Update advertising (#862)

* Setting up advertiser

* Wires up advertiser

* cleanup

* test compiles

* tests pass

* setting up test for advertiser

* Finishes advertiser tests

* fixes commonstore tests

* Review comments by Giuliano

* Race condition found by Giuliano

* Review comment by Dmitriy

Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
Signed-off-by: Ben Bierens <39762930+benbierens@users.noreply.github.com>

* fixes tests

---------

Signed-off-by: Ben Bierens <39762930+benbierens@users.noreply.github.com>
Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
This commit is contained in:
Ben Bierens 2024-08-26 15:18:59 +02:00 committed by GitHub
parent e017b05cf1
commit 1e2ad95659
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 369 additions and 235 deletions

View File

@ -1,5 +1,6 @@
import ./engine/discovery import ./engine/discovery
import ./engine/advertiser
import ./engine/engine import ./engine/engine
import ./engine/payments import ./engine/payments
export discovery, engine, payments export discovery, advertiser, engine, payments

View File

@ -0,0 +1,177 @@
## Nim-Codex
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import pkg/libp2p/cid
import pkg/libp2p/multicodec
import pkg/metrics
import pkg/questionable
import pkg/questionable/results
import ../protobuf/presence
import ../peers
import ../../utils
import ../../discovery
import ../../stores/blockstore
import ../../logutils
import ../../manifest
logScope:
topics = "codex discoveryengine advertiser"
declareGauge(codexInflightAdvertise, "inflight advertise requests")
const
DefaultConcurrentAdvertRequests = 10
DefaultAdvertiseLoopSleep = 30.minutes
type
Advertiser* = ref object of RootObj
localStore*: BlockStore # Local block store for this instance
discovery*: Discovery # Discovery interface
advertiserRunning*: bool # Indicates if discovery is running
concurrentAdvReqs: int # Concurrent advertise requests
advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
advertiseTasks*: seq[Future[void]] # Advertise tasks
advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests
proc addCidToQueue(b: Advertiser, cid: Cid) {.async.} =
if cid notin b.advertiseQueue:
await b.advertiseQueue.put(cid)
trace "Advertising", cid
proc advertiseBlock(b: Advertiser, cid: Cid) {.async.} =
without isM =? cid.isManifest, err:
warn "Unable to determine if cid is manifest"
return
if isM:
without blk =? await b.localStore.getBlock(cid), err:
error "Error retrieving manifest block", cid, err = err.msg
return
without manifest =? Manifest.decode(blk), err:
error "Unable to decode as manifest", err = err.msg
return
# announce manifest cid and tree cid
await b.addCidToQueue(cid)
await b.addCidToQueue(manifest.treeCid)
proc advertiseLocalStoreLoop(b: Advertiser) {.async.} =
while b.advertiserRunning:
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
trace "Advertiser begins iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Advertiser iterating blocks finished."
await sleepAsync(b.advertiseLocalStoreLoopSleep)
info "Exiting advertise task loop"
proc processQueueLoop(b: Advertiser) {.async.} =
while b.advertiserRunning:
try:
let
cid = await b.advertiseQueue.get()
if cid in b.inFlightAdvReqs:
continue
try:
let
request = b.discovery.provide(cid)
b.inFlightAdvReqs[cid] = request
codexInflightAdvertise.set(b.inFlightAdvReqs.len.int64)
await request
finally:
b.inFlightAdvReqs.del(cid)
codexInflightAdvertise.set(b.inFlightAdvReqs.len.int64)
except CancelledError:
trace "Advertise task cancelled"
return
except CatchableError as exc:
warn "Exception in advertise task runner", exc = exc.msg
info "Exiting advertise task runner"
proc start*(b: Advertiser) {.async.} =
## Start the advertiser
##
trace "Advertiser start"
proc onBlock(cid: Cid) {.async.} =
await b.advertiseBlock(cid)
doAssert(b.localStore.onBlockStored.isNone())
b.localStore.onBlockStored = onBlock.some
if b.advertiserRunning:
warn "Starting advertiser twice"
return
b.advertiserRunning = true
for i in 0..<b.concurrentAdvReqs:
b.advertiseTasks.add(processQueueLoop(b))
b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b)
proc stop*(b: Advertiser) {.async.} =
## Stop the advertiser
##
trace "Advertiser stop"
if not b.advertiserRunning:
warn "Stopping advertiser without starting it"
return
b.advertiserRunning = false
# Stop incoming tasks from callback and localStore loop
b.localStore.onBlockStored = CidCallback.none
if not b.advertiseLocalStoreLoop.isNil and not b.advertiseLocalStoreLoop.finished:
trace "Awaiting advertise loop to stop"
await b.advertiseLocalStoreLoop.cancelAndWait()
trace "Advertise loop stopped"
# Clear up remaining tasks
for task in b.advertiseTasks:
if not task.finished:
trace "Awaiting advertise task to stop"
await task.cancelAndWait()
trace "Advertise task stopped"
trace "Advertiser stopped"
proc new*(
T: type Advertiser,
localStore: BlockStore,
discovery: Discovery,
concurrentAdvReqs = DefaultConcurrentAdvertRequests,
advertiseLocalStoreLoopSleep = DefaultAdvertiseLoopSleep
): Advertiser =
## Create a advertiser instance
##
Advertiser(
localStore: localStore,
discovery: discovery,
concurrentAdvReqs: concurrentAdvReqs,
advertiseQueue: newAsyncQueue[Cid](concurrentAdvReqs),
inFlightAdvReqs: initTable[Cid, Future[void]](),
advertiseLocalStoreLoopSleep: advertiseLocalStoreLoopSleep)

View File

@ -35,11 +35,9 @@ declareGauge(codexInflightDiscovery, "inflight discovery requests")
const const
DefaultConcurrentDiscRequests = 10 DefaultConcurrentDiscRequests = 10
DefaultConcurrentAdvertRequests = 10
DefaultDiscoveryTimeout = 1.minutes DefaultDiscoveryTimeout = 1.minutes
DefaultMinPeersPerBlock = 3 DefaultMinPeersPerBlock = 3
DefaultDiscoveryLoopSleep = 3.seconds DefaultDiscoveryLoopSleep = 3.seconds
DefaultAdvertiseLoopSleep = 30.minutes
type type
DiscoveryEngine* = ref object of RootObj DiscoveryEngine* = ref object of RootObj
@ -49,20 +47,13 @@ type
discovery*: Discovery # Discovery interface discovery*: Discovery # Discovery interface
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
discEngineRunning*: bool # Indicates if discovery is running discEngineRunning*: bool # Indicates if discovery is running
concurrentAdvReqs: int # Concurrent advertise requests
concurrentDiscReqs: int # Concurrent discovery requests concurrentDiscReqs: int # Concurrent discovery requests
advertiseLoop*: Future[void] # Advertise loop task handle
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
advertiseTasks*: seq[Future[void]] # Advertise tasks
discoveryLoop*: Future[void] # Discovery loop task handle discoveryLoop*: Future[void] # Discovery loop task handle
discoveryQueue*: AsyncQueue[Cid] # Discovery queue discoveryQueue*: AsyncQueue[Cid] # Discovery queue
discoveryTasks*: seq[Future[void]] # Discovery tasks discoveryTasks*: seq[Future[void]] # Discovery tasks
minPeersPerBlock*: int # Max number of peers with block minPeersPerBlock*: int # Max number of peers with block
discoveryLoopSleep: Duration # Discovery loop sleep discoveryLoopSleep: Duration # Discovery loop sleep
advertiseLoopSleep: Duration # Advertise loop sleep
inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests
advertiseType*: BlockType # Advertice blocks, manifests or both
proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
while b.discEngineRunning: while b.discEngineRunning:
@ -81,68 +72,6 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
await sleepAsync(b.discoveryLoopSleep) await sleepAsync(b.discoveryLoopSleep)
proc advertiseBlock(b: DiscoveryEngine, cid: Cid) {.async.} =
without isM =? cid.isManifest, err:
warn "Unable to determine if cid is manifest"
return
if isM:
without blk =? await b.localStore.getBlock(cid), err:
error "Error retrieving manifest block", cid, err = err.msg
return
without manifest =? Manifest.decode(blk), err:
error "Unable to decode as manifest", err = err.msg
return
# announce manifest cid and tree cid
await b.advertiseQueue.put(cid)
await b.advertiseQueue.put(manifest.treeCid)
proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} =
while b.discEngineRunning:
if cids =? await b.localStore.listBlocks(blockType = b.advertiseType):
trace "Begin iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Iterating blocks finished."
await sleepAsync(b.advertiseLoopSleep)
info "Exiting advertise task loop"
proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
## Run advertise tasks
##
while b.discEngineRunning:
try:
let
cid = await b.advertiseQueue.get()
if cid in b.inFlightAdvReqs:
continue
try:
let
request = b.discovery.provide(cid)
b.inFlightAdvReqs[cid] = request
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
await request
finally:
b.inFlightAdvReqs.del(cid)
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
except CancelledError:
trace "Advertise task cancelled"
return
except CatchableError as exc:
warn "Exception in advertise task runner", exc = exc.msg
info "Exiting advertise task runner"
proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
## Run discovery tasks ## Run discovery tasks
## ##
@ -167,7 +96,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
.wait(DefaultDiscoveryTimeout) .wait(DefaultDiscoveryTimeout)
b.inFlightDiscReqs[cid] = request b.inFlightDiscReqs[cid] = request
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) codexInflightDiscovery.set(b.inFlightDiscReqs.len.int64)
let let
peers = await request peers = await request
@ -181,7 +110,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
finally: finally:
b.inFlightDiscReqs.del(cid) b.inFlightDiscReqs.del(cid)
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) codexInflightDiscovery.set(b.inFlightDiscReqs.len.int64)
except CancelledError: except CancelledError:
trace "Discovery task cancelled" trace "Discovery task cancelled"
return return
@ -198,14 +127,6 @@ proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
except CatchableError as exc: except CatchableError as exc:
warn "Exception queueing discovery request", exc = exc.msg warn "Exception queueing discovery request", exc = exc.msg
proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
for cid in cids:
if cid notin b.advertiseQueue:
try:
b.advertiseQueue.putNoWait(cid)
except CatchableError as exc:
warn "Exception queueing discovery request", exc = exc.msg
proc start*(b: DiscoveryEngine) {.async.} = proc start*(b: DiscoveryEngine) {.async.} =
## Start the discengine task ## Start the discengine task
## ##
@ -217,13 +138,9 @@ proc start*(b: DiscoveryEngine) {.async.} =
return return
b.discEngineRunning = true b.discEngineRunning = true
for i in 0..<b.concurrentAdvReqs:
b.advertiseTasks.add(advertiseTaskLoop(b))
for i in 0..<b.concurrentDiscReqs: for i in 0..<b.concurrentDiscReqs:
b.discoveryTasks.add(discoveryTaskLoop(b)) b.discoveryTasks.add(discoveryTaskLoop(b))
b.advertiseLoop = advertiseQueueLoop(b)
b.discoveryLoop = discoveryQueueLoop(b) b.discoveryLoop = discoveryQueueLoop(b)
proc stop*(b: DiscoveryEngine) {.async.} = proc stop*(b: DiscoveryEngine) {.async.} =
@ -236,23 +153,12 @@ proc stop*(b: DiscoveryEngine) {.async.} =
return return
b.discEngineRunning = false b.discEngineRunning = false
for task in b.advertiseTasks:
if not task.finished:
trace "Awaiting advertise task to stop"
await task.cancelAndWait()
trace "Advertise task stopped"
for task in b.discoveryTasks: for task in b.discoveryTasks:
if not task.finished: if not task.finished:
trace "Awaiting discovery task to stop" trace "Awaiting discovery task to stop"
await task.cancelAndWait() await task.cancelAndWait()
trace "Discovery task stopped" trace "Discovery task stopped"
if not b.advertiseLoop.isNil and not b.advertiseLoop.finished:
trace "Awaiting advertise loop to stop"
await b.advertiseLoop.cancelAndWait()
trace "Advertise loop stopped"
if not b.discoveryLoop.isNil and not b.discoveryLoop.finished: if not b.discoveryLoop.isNil and not b.discoveryLoop.finished:
trace "Awaiting discovery loop to stop" trace "Awaiting discovery loop to stop"
await b.discoveryLoop.cancelAndWait() await b.discoveryLoop.cancelAndWait()
@ -267,12 +173,9 @@ proc new*(
network: BlockExcNetwork, network: BlockExcNetwork,
discovery: Discovery, discovery: Discovery,
pendingBlocks: PendingBlocksManager, pendingBlocks: PendingBlocksManager,
concurrentAdvReqs = DefaultConcurrentAdvertRequests,
concurrentDiscReqs = DefaultConcurrentDiscRequests, concurrentDiscReqs = DefaultConcurrentDiscRequests,
discoveryLoopSleep = DefaultDiscoveryLoopSleep, discoveryLoopSleep = DefaultDiscoveryLoopSleep,
advertiseLoopSleep = DefaultAdvertiseLoopSleep, minPeersPerBlock = DefaultMinPeersPerBlock
minPeersPerBlock = DefaultMinPeersPerBlock,
advertiseType = BlockType.Manifest
): DiscoveryEngine = ): DiscoveryEngine =
## Create a discovery engine instance for advertising services ## Create a discovery engine instance for advertising services
## ##
@ -282,13 +185,8 @@ proc new*(
network: network, network: network,
discovery: discovery, discovery: discovery,
pendingBlocks: pendingBlocks, pendingBlocks: pendingBlocks,
concurrentAdvReqs: concurrentAdvReqs,
concurrentDiscReqs: concurrentDiscReqs, concurrentDiscReqs: concurrentDiscReqs,
advertiseQueue: newAsyncQueue[Cid](concurrentAdvReqs),
discoveryQueue: newAsyncQueue[Cid](concurrentDiscReqs), discoveryQueue: newAsyncQueue[Cid](concurrentDiscReqs),
inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](), inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](),
inFlightAdvReqs: initTable[Cid, Future[void]](),
discoveryLoopSleep: discoveryLoopSleep, discoveryLoopSleep: discoveryLoopSleep,
advertiseLoopSleep: advertiseLoopSleep, minPeersPerBlock: minPeersPerBlock)
minPeersPerBlock: minPeersPerBlock,
advertiseType: advertiseType)

View File

@ -34,6 +34,7 @@ import ../peers
import ./payments import ./payments
import ./discovery import ./discovery
import ./advertiser
import ./pendingblocks import ./pendingblocks
export peers, pendingblocks, payments, discovery export peers, pendingblocks, payments, discovery
@ -77,6 +78,7 @@ type
pricing*: ?Pricing # Optional bandwidth pricing pricing*: ?Pricing # Optional bandwidth pricing
blockFetchTimeout*: Duration # Timeout for fetching blocks over the network blockFetchTimeout*: Duration # Timeout for fetching blocks over the network
discovery*: DiscoveryEngine discovery*: DiscoveryEngine
advertiser*: Advertiser
Pricing* = object Pricing* = object
address*: EthAddress address*: EthAddress
@ -93,6 +95,7 @@ proc start*(b: BlockExcEngine) {.async.} =
## ##
await b.discovery.start() await b.discovery.start()
await b.advertiser.start()
trace "Blockexc starting with concurrent tasks", tasks = b.concurrentTasks trace "Blockexc starting with concurrent tasks", tasks = b.concurrentTasks
if b.blockexcRunning: if b.blockexcRunning:
@ -108,6 +111,7 @@ proc stop*(b: BlockExcEngine) {.async.} =
## ##
await b.discovery.stop() await b.discovery.stop()
await b.advertiser.stop()
trace "NetworkStore stop" trace "NetworkStore stop"
if not b.blockexcRunning: if not b.blockexcRunning:
@ -284,27 +288,11 @@ proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} =
if failed.len > 0: if failed.len > 0:
warn "Failed to send block request cancellations to peers", peers = failed.len warn "Failed to send block request cancellations to peers", peers = failed.len
proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] =
var cids = initHashSet[Cid]()
for bd in blocksDelivery:
if bd.address.leaf:
cids.incl(bd.address.treeCid)
else:
without isM =? bd.address.cid.isManifest, err:
warn "Unable to determine if cid is manifest"
continue
if isM:
cids.incl(bd.address.cid)
return cids.toSeq
proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
b.pendingBlocks.resolve(blocksDelivery) b.pendingBlocks.resolve(blocksDelivery)
await b.scheduleTasks(blocksDelivery) await b.scheduleTasks(blocksDelivery)
let announceCids = getAnnouceCids(blocksDelivery)
await b.cancelBlocks(blocksDelivery.mapIt(it.address)) await b.cancelBlocks(blocksDelivery.mapIt(it.address))
b.discovery.queueProvideBlocksReq(announceCids)
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} = proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} =
await b.resolveBlocks( await b.resolveBlocks(
blocks.mapIt( blocks.mapIt(
@ -596,6 +584,7 @@ proc new*(
wallet: WalletRef, wallet: WalletRef,
network: BlockExcNetwork, network: BlockExcNetwork,
discovery: DiscoveryEngine, discovery: DiscoveryEngine,
advertiser: Advertiser,
peerStore: PeerCtxStore, peerStore: PeerCtxStore,
pendingBlocks: PendingBlocksManager, pendingBlocks: PendingBlocksManager,
concurrentTasks = DefaultConcurrentTasks, concurrentTasks = DefaultConcurrentTasks,
@ -616,6 +605,7 @@ proc new*(
concurrentTasks: concurrentTasks, concurrentTasks: concurrentTasks,
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize), taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
discovery: discovery, discovery: discovery,
advertiser: advertiser,
blockFetchTimeout: blockFetchTimeout) blockFetchTimeout: blockFetchTimeout)
proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} = proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} =

View File

@ -268,8 +268,9 @@ proc new*(
peerStore = PeerCtxStore.new() peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new() pendingBlocks = PendingBlocksManager.new()
advertiser = Advertiser.new(repoStore, discovery)
blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks) blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks)
engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks) engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, advertiser, peerStore, pendingBlocks)
store = NetworkStore.new(engine, repoStore) store = NetworkStore.new(engine, repoStore)
prover = if config.prover: prover = if config.prover:
if not fileAccessible($config.circomR1cs, {AccessFlags.Read}) and if not fileAccessible($config.circomR1cs, {AccessFlags.Read}) and

View File

@ -366,9 +366,6 @@ proc store*(
blocks = manifest.blocksCount, blocks = manifest.blocksCount,
datasetSize = manifest.datasetSize datasetSize = manifest.datasetSize
await self.discovery.provide(manifestBlk.cid)
await self.discovery.provide(treeCid)
return manifestBlk.cid.success return manifestBlk.cid.success
proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} = proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =

View File

@ -29,7 +29,9 @@ type
BlockType* {.pure.} = enum BlockType* {.pure.} = enum
Manifest, Block, Both Manifest, Block, Both
CidCallback* = proc(cid: Cid): Future[void] {.gcsafe, raises:[].}
BlockStore* = ref object of RootObj BlockStore* = ref object of RootObj
onBlockStored*: ?CidCallback
method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base.} = method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base.} =
## Get a block from the blockstore ## Get a block from the blockstore

View File

@ -197,6 +197,9 @@ method putBlock*(
return success() return success()
discard self.putBlockSync(blk) discard self.putBlockSync(blk)
if onBlock =? self.onBlockStored:
await onBlock(blk.cid)
return success() return success()
method putCidAndProof*( method putCidAndProof*(
@ -282,7 +285,8 @@ proc new*(
cache: cache, cache: cache,
cidAndProofCache: cidAndProofCache, cidAndProofCache: cidAndProofCache,
currentSize: currentSize, currentSize: currentSize,
size: cacheSize) size: cacheSize,
onBlockStored: CidCallback.none)
for blk in blocks: for blk in blocks:
discard store.putBlockSync(blk) discard store.putBlockSync(blk)

View File

@ -189,6 +189,9 @@ method putBlock*(
if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption: if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption:
return failure(err) return failure(err)
if onBlock =? self.onBlockStored:
await onBlock(blk.cid)
else: else:
trace "Block already exists" trace "Block already exists"

View File

@ -11,6 +11,7 @@ import pkg/chronos
import pkg/datastore import pkg/datastore
import pkg/datastore/typedds import pkg/datastore/typedds
import pkg/libp2p/cid import pkg/libp2p/cid
import pkg/questionable
import ../blockstore import ../blockstore
import ../../clock import ../../clock
@ -103,5 +104,6 @@ func new*(
clock: clock, clock: clock,
postFixLen: postFixLen, postFixLen: postFixLen,
quotaMaxBytes: quotaMaxBytes, quotaMaxBytes: quotaMaxBytes,
blockTtl: blockTtl blockTtl: blockTtl,
onBlockStored: CidCallback.none
) )

View File

@ -32,6 +32,7 @@ asyncchecksuite "Block Advertising and Discovery":
peerStore: PeerCtxStore peerStore: PeerCtxStore
blockDiscovery: MockDiscovery blockDiscovery: MockDiscovery
discovery: DiscoveryEngine discovery: DiscoveryEngine
advertiser: Advertiser
wallet: WalletRef wallet: WalletRef
network: BlockExcNetwork network: BlockExcNetwork
localStore: CacheStore localStore: CacheStore
@ -68,11 +69,17 @@ asyncchecksuite "Block Advertising and Discovery":
pendingBlocks, pendingBlocks,
minPeersPerBlock = 1) minPeersPerBlock = 1)
advertiser = Advertiser.new(
localStore,
blockDiscovery
)
engine = BlockExcEngine.new( engine = BlockExcEngine.new(
localStore, localStore,
wallet, wallet,
network, network,
discovery, discovery,
advertiser,
peerStore, peerStore,
pendingBlocks) pendingBlocks)
@ -200,11 +207,17 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
pendingBlocks, pendingBlocks,
minPeersPerBlock = 1) minPeersPerBlock = 1)
advertiser = Advertiser.new(
localStore,
blockDiscovery
)
engine = BlockExcEngine.new( engine = BlockExcEngine.new(
localStore, localStore,
wallet, wallet,
network, network,
discovery, discovery,
advertiser,
peerStore, peerStore,
pendingBlocks) pendingBlocks)
networkStore = NetworkStore.new(engine, localStore) networkStore = NetworkStore.new(engine, localStore)

View File

@ -74,30 +74,6 @@ asyncchecksuite "Test Discovery Engine":
await allFuturesThrowing(allFinished(wants)).wait(1.seconds) await allFuturesThrowing(allFinished(wants)).wait(1.seconds)
await discoveryEngine.stop() await discoveryEngine.stop()
test "Should Advertise Haves":
var
localStore = CacheStore.new(blocks.mapIt(it))
discoveryEngine = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks,
discoveryLoopSleep = 100.millis)
haves = collect(initTable):
for cid in @[manifestBlock.cid, manifest.treeCid]:
{ cid: newFuture[void]() }
blockDiscovery.publishBlockProvideHandler =
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
if not haves[cid].finished:
haves[cid].complete
await discoveryEngine.start()
await allFuturesThrowing(
allFinished(toSeq(haves.values))).wait(5.seconds)
await discoveryEngine.stop()
test "Should queue discovery request": test "Should queue discovery request":
var var
localStore = CacheStore.new() localStore = CacheStore.new()
@ -191,36 +167,3 @@ asyncchecksuite "Test Discovery Engine":
reqs.complete() reqs.complete()
await discoveryEngine.stop() await discoveryEngine.stop()
test "Should not request if there is already an inflight advertise request":
var
localStore = CacheStore.new()
discoveryEngine = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks,
discoveryLoopSleep = 100.millis,
concurrentAdvReqs = 2)
reqs = newFuture[void]()
count = 0
blockDiscovery.publishBlockProvideHandler =
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
check cid == blocks[0].cid
if count > 0:
check false
count.inc
await reqs # queue the request
await discoveryEngine.start()
discoveryEngine.queueProvideBlocksReq(@[blocks[0].cid])
await sleepAsync(200.millis)
discoveryEngine.queueProvideBlocksReq(@[blocks[0].cid])
await sleepAsync(200.millis)
reqs.complete()
await discoveryEngine.stop()

View File

@ -0,0 +1,106 @@
import std/sequtils
import std/random
import pkg/chronos
import pkg/libp2p/routing_record
import pkg/codexdht/discv5/protocol as discv5
import pkg/codex/blockexchange
import pkg/codex/stores
import pkg/codex/chunker
import pkg/codex/discovery
import pkg/codex/blocktype as bt
import pkg/codex/manifest
import ../../../asynctest
import ../../helpers
import ../../helpers/mockdiscovery
import ../../examples
asyncchecksuite "Advertiser":
var
blockDiscovery: MockDiscovery
localStore: BlockStore
advertiser: Advertiser
let
manifest = Manifest.new(
treeCid = Cid.example,
blockSize = 123.NBytes,
datasetSize = 234.NBytes)
manifestBlk = Block.new(data = manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
setup:
blockDiscovery = MockDiscovery.new()
localStore = CacheStore.new()
advertiser = Advertiser.new(
localStore,
blockDiscovery
)
await advertiser.start()
teardown:
await advertiser.stop()
test "blockStored should queue manifest Cid for advertising":
(await localStore.putBlock(manifestBlk)).tryGet()
check:
manifestBlk.cid in advertiser.advertiseQueue
test "blockStored should queue tree Cid for advertising":
(await localStore.putBlock(manifestBlk)).tryGet()
check:
manifest.treeCid in advertiser.advertiseQueue
test "blockStored should not queue non-manifest non-tree CIDs for discovery":
let blk = bt.Block.example
(await localStore.putBlock(blk)).tryGet()
check:
blk.cid notin advertiser.advertiseQueue
test "Should not queue if there is already an inflight advertise request":
var
reqs = newFuture[void]()
manifestCount = 0
treeCount = 0
blockDiscovery.publishBlockProvideHandler =
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
if cid == manifestBlk.cid:
inc manifestCount
if cid == manifest.treeCid:
inc treeCount
await reqs # queue the request
(await localStore.putBlock(manifestBlk)).tryGet()
(await localStore.putBlock(manifestBlk)).tryGet()
reqs.complete()
check eventually manifestCount == 1
check eventually treeCount == 1
test "Should advertise existing manifests and their trees":
let
newStore = CacheStore.new([manifestBlk])
await advertiser.stop()
advertiser = Advertiser.new(
newStore,
blockDiscovery
)
await advertiser.start()
check eventually manifestBlk.cid in advertiser.advertiseQueue
check eventually manifest.treeCid in advertiser.advertiseQueue
test "Stop should clear onBlockStored callback":
await advertiser.stop()
check:
localStore.onBlockStored.isNone()

View File

@ -78,11 +78,17 @@ asyncchecksuite "NetworkStore engine basic":
blockDiscovery, blockDiscovery,
pendingBlocks) pendingBlocks)
advertiser = Advertiser.new(
localStore,
blockDiscovery
)
engine = BlockExcEngine.new( engine = BlockExcEngine.new(
localStore, localStore,
wallet, wallet,
network, network,
discovery, discovery,
advertiser,
peerStore, peerStore,
pendingBlocks) pendingBlocks)
@ -113,11 +119,17 @@ asyncchecksuite "NetworkStore engine basic":
blockDiscovery, blockDiscovery,
pendingBlocks) pendingBlocks)
advertiser = Advertiser.new(
localStore,
blockDiscovery
)
engine = BlockExcEngine.new( engine = BlockExcEngine.new(
localStore, localStore,
wallet, wallet,
network, network,
discovery, discovery,
advertiser,
peerStore, peerStore,
pendingBlocks) pendingBlocks)
@ -139,6 +151,7 @@ asyncchecksuite "NetworkStore engine handlers":
network: BlockExcNetwork network: BlockExcNetwork
engine: BlockExcEngine engine: BlockExcEngine
discovery: DiscoveryEngine discovery: DiscoveryEngine
advertiser: Advertiser
peerCtx: BlockExcPeerCtx peerCtx: BlockExcPeerCtx
localStore: BlockStore localStore: BlockStore
blocks: seq[Block] blocks: seq[Block]
@ -176,11 +189,17 @@ asyncchecksuite "NetworkStore engine handlers":
blockDiscovery, blockDiscovery,
pendingBlocks) pendingBlocks)
advertiser = Advertiser.new(
localStore,
blockDiscovery
)
engine = BlockExcEngine.new( engine = BlockExcEngine.new(
localStore, localStore,
wallet, wallet,
network, network,
discovery, discovery,
advertiser,
peerStore, peerStore,
pendingBlocks) pendingBlocks)
@ -390,51 +409,6 @@ asyncchecksuite "NetworkStore engine handlers":
discard await allFinished(pending) discard await allFinished(pending)
await allFuturesThrowing(cancellations.values().toSeq) await allFuturesThrowing(cancellations.values().toSeq)
test "resolveBlocks should queue manifest CIDs for discovery":
engine.network = BlockExcNetwork(
request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc))
let
manifest = Manifest.new(
treeCid = Cid.example,
blockSize = 123.NBytes,
datasetSize = 234.NBytes
)
let manifestBlk = Block.new(data = manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
let blks = @[manifestBlk]
await engine.resolveBlocks(blks)
check:
manifestBlk.cid in engine.discovery.advertiseQueue
test "resolveBlocks should queue tree CIDs for discovery":
engine.network = BlockExcNetwork(
request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc))
let
tCid = Cid.example
delivery = BlockDelivery(blk: Block.example, address: BlockAddress(leaf: true, treeCid: tCid))
await engine.resolveBlocks(@[delivery])
check:
tCid in engine.discovery.advertiseQueue
test "resolveBlocks should not queue non-manifest non-tree CIDs for discovery":
engine.network = BlockExcNetwork(
request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc))
let
blkCid = Cid.example
delivery = BlockDelivery(blk: Block.example, address: BlockAddress(leaf: false, cid: blkCid))
await engine.resolveBlocks(@[delivery])
check:
blkCid notin engine.discovery.advertiseQueue
asyncchecksuite "Task Handler": asyncchecksuite "Task Handler":
var var
rng: Rng rng: Rng
@ -448,6 +422,7 @@ asyncchecksuite "Task Handler":
network: BlockExcNetwork network: BlockExcNetwork
engine: BlockExcEngine engine: BlockExcEngine
discovery: DiscoveryEngine discovery: DiscoveryEngine
advertiser: Advertiser
localStore: BlockStore localStore: BlockStore
peersCtx: seq[BlockExcPeerCtx] peersCtx: seq[BlockExcPeerCtx]
@ -481,11 +456,17 @@ asyncchecksuite "Task Handler":
blockDiscovery, blockDiscovery,
pendingBlocks) pendingBlocks)
advertiser = Advertiser.new(
localStore,
blockDiscovery
)
engine = BlockExcEngine.new( engine = BlockExcEngine.new(
localStore, localStore,
wallet, wallet,
network, network,
discovery, discovery,
advertiser,
peerStore, peerStore,
pendingBlocks) pendingBlocks)
peersCtx = @[] peersCtx = @[]

View File

@ -1,5 +1,6 @@
import ./engine/testengine import ./engine/testengine
import ./engine/testblockexc import ./engine/testblockexc
import ./engine/testpayments import ./engine/testpayments
import ./engine/testadvertiser
{.warning[UnusedImport]: off.} {.warning[UnusedImport]: off.}

View File

@ -40,8 +40,9 @@ proc generateNodes*(
localStore = CacheStore.new(blocks.mapIt( it )) localStore = CacheStore.new(blocks.mapIt( it ))
peerStore = PeerCtxStore.new() peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new() pendingBlocks = PendingBlocksManager.new()
advertiser = Advertiser.new(localStore, discovery)
blockDiscovery = DiscoveryEngine.new(localStore, peerStore, network, discovery, pendingBlocks) blockDiscovery = DiscoveryEngine.new(localStore, peerStore, network, discovery, pendingBlocks)
engine = BlockExcEngine.new(localStore, wallet, network, blockDiscovery, peerStore, pendingBlocks) engine = BlockExcEngine.new(localStore, wallet, network, blockDiscovery, advertiser, peerStore, pendingBlocks)
networkStore = NetworkStore.new(engine, localStore) networkStore = NetworkStore.new(engine, localStore)
switch.mount(network) switch.mount(network)

View File

@ -82,6 +82,7 @@ template setupAndTearDown*() {.dirty.} =
peerStore: PeerCtxStore peerStore: PeerCtxStore
pendingBlocks: PendingBlocksManager pendingBlocks: PendingBlocksManager
discovery: DiscoveryEngine discovery: DiscoveryEngine
advertiser: Advertiser
taskpool: Taskpool taskpool: Taskpool
let let
@ -109,7 +110,8 @@ template setupAndTearDown*() {.dirty.} =
peerStore = PeerCtxStore.new() peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new() pendingBlocks = PendingBlocksManager.new()
discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks) discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks)
engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks) advertiser = Advertiser.new(localStore, blockDiscovery)
engine = BlockExcEngine.new(localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks)
store = NetworkStore.new(engine, localStore) store = NetworkStore.new(engine, localStore)
taskpool = Taskpool.new(num_threads = countProcessors()) taskpool = Taskpool.new(num_threads = countProcessors())
node = CodexNodeRef.new( node = CodexNodeRef.new(
@ -120,8 +122,6 @@ template setupAndTearDown*() {.dirty.} =
discovery = blockDiscovery, discovery = blockDiscovery,
taskpool = taskpool) taskpool = taskpool)
await node.start()
teardown: teardown:
close(file) close(file)
await node.stop() await node.stop()

View File

@ -49,6 +49,9 @@ privateAccess(CodexNodeRef) # enable access to private fields
asyncchecksuite "Test Node - Basic": asyncchecksuite "Test Node - Basic":
setupAndTearDown() setupAndTearDown()
setup:
await node.start()
test "Fetch Manifest": test "Fetch Manifest":
let let
manifest = await storeDataGetManifest(localStore, chunker) manifest = await storeDataGetManifest(localStore, chunker)

View File

@ -15,6 +15,7 @@ import pkg/codex/utils
import ../../asynctest import ../../asynctest
import ../helpers import ../helpers
import ../examples
type type
StoreProvider* = proc(): BlockStore {.gcsafe.} StoreProvider* = proc(): BlockStore {.gcsafe.}
@ -56,6 +57,16 @@ proc commonBlockStoreTests*(name: string,
(await store.putBlock(newBlock1)).tryGet() (await store.putBlock(newBlock1)).tryGet()
check (await store.hasBlock(newBlock1.cid)).tryGet() check (await store.hasBlock(newBlock1.cid)).tryGet()
test "putBlock raises onBlockStored":
var storedCid = Cid.example
proc onStored(cid: Cid) {.async.} =
storedCid = cid
store.onBlockStored = onStored.some()
(await store.putBlock(newBlock1)).tryGet()
check storedCid == newBlock1.cid
test "getBlock": test "getBlock":
(await store.putBlock(newBlock)).tryGet() (await store.putBlock(newBlock)).tryGet()
let blk = await store.getBlock(newBlock.cid) let blk = await store.getBlock(newBlock.cid)