From d3dbbc75fa7767bae704f88c7bb8ce8b10d8a53c Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 18 May 2022 20:29:15 -0600 Subject: [PATCH] Extract Discovery engine (#99) * don't force logging syncs * Add failing test * wip discovery engine * re-add chronicles sinks * wip * move network related stuff to own folder * move peer related stuff to own folder * extract discovery into it's own engine * update imports * move pending blocks into engine module * add top level exports * update imports * update import paths * update imports * support for inflight request filtering and tests * use `remove` instead of `del` * fix sorting in `selectCheapest` * re-org test file structure * fix to use discovery engine * file re-org * fix compilation * fixup discovery to use async handlers * more re-org * rework with support for discovery engine * add logging * use defaults * wip: reworking with discoveryengine * wip: more test fixes * more logging * use ordered table * use `bt` for blocktype Block * fix tests * make tests work with discovery engine * expose all node components * fix to work with discovery engine * wip * propagate cancellation in listBlocks * start/stop disc engine in blockexc engine * remove disc engine start/stop * wire up discovery engine * misc comments and imports * pass discovery to dagger node * set sleep timers * unused imports * misc * don't spawn a task, await it * don't await handlers * trace logging * reduce default sleep time Co-authored-by: Tanguy --- dagger/blockexchange.nim | 5 +- dagger/blockexchange/engine.nim | 285 +++++------------- dagger/blockexchange/engine/discovery.nim | 261 ++++++++++++++++ dagger/blockexchange/engine/payments.nim | 3 +- dagger/blockexchange/network.nim | 16 +- dagger/blockexchange/networkpeer.nim | 3 +- dagger/blockexchange/peercontext.nim | 1 + dagger/blockexchange/peers.nim | 3 + dagger/blockexchange/peers/peerctxstore.nim | 82 +++++ dagger/blockexchange/pendingblocks.nim | 10 +- dagger/dagger.nim | 9 +- dagger/discovery.nim | 7 +- dagger/erasure/erasure.nim | 10 +- dagger/stores/fsstore.nim | 7 +- dagger/stores/networkstore.nim | 6 +- dagger/streams/storestream.nim | 2 - .../blockexc/discovery/testdiscovery.nim | 134 +++++--- .../discovery/testdiscoveryengine.nim | 237 +++++++++++++++ .../dagger/blockexc/protobuf/testpresence.nim | 1 - tests/dagger/blockexc/testblockexc.nim | 167 +++++----- tests/dagger/blockexc/testengine.nim | 178 ++++++++--- tests/dagger/blockexc/testpeerctxstore.nim | 109 +++++++ tests/dagger/examples.nim | 8 +- .../discovery => helpers}/mockdiscovery.nim | 19 +- tests/dagger/helpers/nodeutils.nim | 36 ++- tests/dagger/stores/testfsstore.nim | 3 +- tests/dagger/testblockexc.nim | 6 +- tests/dagger/testerasure.nim | 14 +- tests/dagger/testnode.nim | 14 +- tests/dagger/teststorestream.nim | 3 +- 30 files changed, 1164 insertions(+), 475 deletions(-) create mode 100644 dagger/blockexchange/engine/discovery.nim create mode 100644 dagger/blockexchange/peers.nim create mode 100644 dagger/blockexchange/peers/peerctxstore.nim create mode 100644 tests/dagger/blockexc/discovery/testdiscoveryengine.nim create mode 100644 tests/dagger/blockexc/testpeerctxstore.nim rename tests/dagger/{blockexc/discovery => helpers}/mockdiscovery.nim (71%) diff --git a/dagger/blockexchange.nim b/dagger/blockexchange.nim index 125f50bf..1c90ae4d 100644 --- a/dagger/blockexchange.nim +++ b/dagger/blockexchange.nim @@ -1,11 +1,10 @@ import ./blockexchange/[ network, engine, - peercontext] + peers] import ./blockexchange/protobuf/[ blockexc, - payments, presence] -export network, engine, peercontext, blockexc, payments, presence +export network, engine, blockexc, presence, peers diff --git a/dagger/blockexchange/engine.nim b/dagger/blockexchange/engine.nim index e30ba674..6a67d964 100644 --- a/dagger/blockexchange/engine.nim +++ b/dagger/blockexchange/engine.nim @@ -17,17 +17,17 @@ import pkg/libp2p import ../stores/blockstore import ../blocktype as bt import ../utils -import ../discovery import ./protobuf/blockexc import ./protobuf/presence import ./network -import ./pendingblocks -import ./peercontext +import ./peers import ./engine/payments +import ./engine/discovery +import ./pendingblocks -export peercontext, payments, pendingblocks +export peers, pendingblocks, payments, discovery logScope: topics = "dagger blockexc engine" @@ -48,28 +48,18 @@ type TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.} BlockExcEngine* = ref object of RootObj - localStore*: BlockStore # where we localStore blocks for this instance - network*: BlockExcNetwork # network interface - peers*: seq[BlockExcPeerCtx] # peers we're currently actively exchanging with - taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # peers we're currently processing tasks for - concurrentTasks: int # number of concurrent peers we're serving at any given time - maxRetries: int # max number of tries for a failed block - blockexcTasks: seq[Future[void]] # future to control blockexc task - blockexcRunning: bool # indicates if the blockexc task is running - pendingBlocks*: PendingBlocksManager # blocks we're awaiting to be resolved - peersPerRequest: int # max number of peers to request from - wallet*: WalletRef # nitro wallet for micropayments - pricing*: ?Pricing # optional bandwidth pricing - discovery*: Discovery # Discovery interface - concurrentAdvReqs: int # Concurrent advertise requests - advertiseLoop*: Future[void] # Advertise loop task handle - advertiseQueue*: AsyncQueue[Cid] # Advertise queue - advertiseTasks*: seq[Future[void]] # Advertise tasks - concurrentDiscReqs: int # Concurrent discovery requests - discoveryLoop*: Future[void] # Discovery loop task handle - discoveryTasks*: seq[Future[void]] # Discovery tasks - discoveryQueue*: AsyncQueue[Cid] # Discovery queue - minPeersPerBlock*: int # Max number of peers with block + localStore*: BlockStore # Local block store for this instance + network*: BlockExcNetwork # Petwork interface + peers*: PeerCtxStore # Peers we're currently actively exchanging with + taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # Peers we're currently processing tasks for + concurrentTasks: int # Number of concurrent peers we're serving at any given time + blockexcTasks: seq[Future[void]] # Future to control blockexc task + blockexcRunning: bool # Indicates if the blockexc task is running + pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved + peersPerRequest: int # Max number of peers to request from + wallet*: WalletRef # Nitro wallet for micropayments + pricing*: ?Pricing # Optional bandwidth pricing + discovery*: DiscoveryEngine Pricing* = object address*: EthAddress @@ -81,115 +71,19 @@ proc contains*(a: AsyncHeapQueue[Entry], b: Cid): bool = a.anyIt( it.cid == b ) -proc getPeerCtx*(b: BlockExcEngine, peerId: PeerID): BlockExcPeerCtx = - ## Get the peer's context - ## - - let peer = b.peers.filterIt( it.id == peerId ) - if peer.len > 0: - return peer[0] - # attach task scheduler to engine proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} = b.taskQueue.pushOrUpdateNoWait(task).isOk() proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.} -proc discoveryLoopRunner(b: BlockExcEngine) {.async.} = - while b.blockexcRunning: - for cid in toSeq(b.pendingBlocks.wantList): - try: - await b.discoveryQueue.put(cid) - except CatchableError as exc: - trace "Exception in discovery loop", exc = exc.msg - - trace "About to sleep, number of wanted blocks", wanted = b.pendingBlocks.len - await sleepAsync(30.seconds) - -proc advertiseLoopRunner*(b: BlockExcEngine) {.async.} = - proc onBlock(cid: Cid) {.async.} = - try: - await b.advertiseQueue.put(cid) - except CatchableError as exc: - trace "Exception listing blocks", exc = exc.msg - - while b.blockexcRunning: - await b.localStore.listBlocks(onBlock) - await sleepAsync(30.seconds) - - trace "Exiting advertise task loop" - -proc advertiseTaskRunner(b: BlockExcEngine) {.async.} = - ## Run advertise tasks - ## - - while b.blockexcRunning: - try: - let cid = await b.advertiseQueue.get() - await b.discovery.provideBlock(cid) - except CatchableError as exc: - trace "Exception in advertise task runner", exc = exc.msg - - trace "Exiting advertise task runner" - -proc discoveryTaskRunner(b: BlockExcEngine) {.async.} = - ## Run discovery tasks - ## - - while b.blockexcRunning: - try: - let - cid = await b.discoveryQueue.get() - haves = b.peers.filterIt( - it.peerHave.anyIt( it == cid ) - ) - - trace "Got peers for block", cid = $cid, count = haves.len - let - providers = - if haves.len < b.minPeersPerBlock: - await b.discovery - .findBlockProviders(cid) - .wait(DefaultDiscoveryTimeout) - else: - @[] - - checkFutures providers.mapIt( b.network.dialPeer(it.data) ) - except CatchableError as exc: - trace "Exception in discovery task runner", exc = exc.msg - - trace "Exiting discovery task runner" - -template queueFindBlocksReq(b: BlockExcEngine, cids: seq[Cid]) = - proc queueReq() {.async.} = - try: - for cid in cids: - if cid notin b.discoveryQueue: - trace "Queueing find block request", cid = $cid - await b.discoveryQueue.put(cid) - except CatchableError as exc: - trace "Exception queueing discovery request", exc = exc.msg - - asyncSpawn queueReq() - -template queueProvideBlocksReq(b: BlockExcEngine, cids: seq[Cid]) = - proc queueReq() {.async.} = - try: - for cid in cids: - if cid notin b.advertiseQueue: - trace "Queueing provide block request", cid = $cid - await b.advertiseQueue.put(cid) - except CatchableError as exc: - trace "Exception queueing discovery request", exc = exc.msg - - asyncSpawn queueReq() - proc start*(b: BlockExcEngine) {.async.} = ## Start the blockexc task ## - trace "blockexc start" + await b.discovery.start() + trace "Blockexc starting with concurrent tasks", tasks = b.concurrentTasks if b.blockexcRunning: warn "Starting blockexc twice" return @@ -198,19 +92,12 @@ proc start*(b: BlockExcEngine) {.async.} = for i in 0.. 0: b.network.request.sendWantList( peer, @@ -342,23 +194,29 @@ proc blockPresenceHandler*( # if none of the connected peers report our wants in their have list, # fire up discovery - b.queueFindBlocksReq(toSeq(b.pendingBlocks.wantList) - .filter(proc(cid: Cid): bool = - (not b.peers.anyIt( cid in it.peerHave )))) + b.discovery.queueFindBlocksReq( + toSeq(b.pendingBlocks.wantList) + .filter do(cid: Cid) -> bool: + not b.peers.anyIt( cid in it.peerHave )) proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) = trace "Schedule a task for new blocks" - let cids = blocks.mapIt( it.cid ) + let + cids = blocks.mapIt( it.cid ) + # schedule any new peers to provide blocks to for p in b.peers: for c in cids: # for each cid - # schedule a peer if it wants at least one - # cid and we have it in our local store - if c in p.peerWants and c in b.localStore: - if not b.scheduleTask(p): - trace "Unable to schedule task for peer", peer = p.id - break # do next peer + # schedule a peer if it wants at least one + # cid and we have it in our local store + if c in p.peerWants and c in b.localStore: + if b.scheduleTask(p): + trace "Task scheduled for peer", peer = p.id + else: + trace "Unable to schedule task for peer", peer = p.id + + break # do next peer proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) = ## Resolve pending blocks from the pending blocks manager @@ -369,7 +227,7 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) = b.pendingBlocks.resolve(blocks) b.scheduleTasks(blocks) - b.queueProvideBlocksReq(blocks.mapIt( it.cid )) + b.discovery.queueProvideBlocksReq(blocks.mapIt( it.cid )) proc payForBlocks(engine: BlockExcEngine, peer: BlockExcPeerCtx, @@ -396,7 +254,7 @@ proc blocksHandler*( continue b.resolveBlocks(blocks) - let peerCtx = b.getPeerCtx(peer) + let peerCtx = b.peers.get(peer) if peerCtx != nil: b.payForBlocks(peerCtx, blocks) @@ -408,7 +266,7 @@ proc wantListHandler*( ## trace "Got want list for peer", peer - let peerCtx = b.getPeerCtx(peer) + let peerCtx = b.peers.get(peer) if isNil(peerCtx): return @@ -449,7 +307,7 @@ proc accountHandler*( engine: BlockExcEngine, peer: PeerID, account: Account) {.async.} = - let context = engine.getPeerCtx(peer) + let context = engine.peers.get(peer) if context.isNil: return @@ -459,7 +317,7 @@ proc paymentHandler*( engine: BlockExcEngine, peer: PeerId, payment: SignedState) {.async.} = - without context =? engine.getPeerCtx(peer).option and + without context =? engine.peers.get(peer).option and account =? context.account: return @@ -494,7 +352,7 @@ proc dropPeer*(b: BlockExcEngine, peer: PeerID) = trace "Dropping peer", peer # drop the peer from the peers table - b.peers.keepItIf( it.id != peer ) + b.peers.remove(peer) proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = trace "Handling task for peer", peer = task.id @@ -546,9 +404,13 @@ proc blockexcTaskRunner(b: BlockExcEngine) {.async.} = ## process tasks ## + trace "Starting blockexc task runner" while b.blockexcRunning: - let peerCtx = await b.taskQueue.pop() - asyncSpawn b.taskHandler(peerCtx) + let + peerCtx = await b.taskQueue.pop() + + trace "Got new task from queue", peerId = peerCtx.id + await b.taskHandler(peerCtx) trace "Exiting blockexc task runner" @@ -557,30 +419,23 @@ proc new*( localStore: BlockStore, wallet: WalletRef, network: BlockExcNetwork, - discovery: Discovery, + discovery: DiscoveryEngine, + peerStore: PeerCtxStore, + pendingBlocks: PendingBlocksManager, concurrentTasks = DefaultConcurrentTasks, - maxRetries = DefaultMaxRetries, - peersPerRequest = DefaultMaxPeersPerRequest, - concurrentAdvReqs = DefaultConcurrentAdvertRequests, - concurrentDiscReqs = DefaultConcurrentDiscRequests, - minPeersPerBlock = DefaultMinPeersPerBlock): T = + peersPerRequest = DefaultMaxPeersPerRequest): T = let engine = BlockExcEngine( localStore: localStore, - pendingBlocks: PendingBlocksManager.new(), + peers: peerStore, + pendingBlocks: pendingBlocks, peersPerRequest: peersPerRequest, network: network, wallet: wallet, concurrentTasks: concurrentTasks, - concurrentAdvReqs: concurrentAdvReqs, - concurrentDiscReqs: concurrentDiscReqs, - maxRetries: maxRetries, taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize), - discovery: discovery, - advertiseQueue: newAsyncQueue[Cid](DefaultTaskQueueSize), - discoveryQueue: newAsyncQueue[Cid](DefaultTaskQueueSize), - minPeersPerBlock: minPeersPerBlock) + discovery: discovery) proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = if event.kind == PeerEventKind.Joined: diff --git a/dagger/blockexchange/engine/discovery.nim b/dagger/blockexchange/engine/discovery.nim new file mode 100644 index 00000000..aba3ccd0 --- /dev/null +++ b/dagger/blockexchange/engine/discovery.nim @@ -0,0 +1,261 @@ +## Nim-Dagger +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/sequtils + +import pkg/chronos +import pkg/chronicles +import pkg/libp2p + +import ../protobuf/presence + +import ../network +import ../peers + +import ../../utils +import ../../discovery +import ../../stores/blockstore + +import ../pendingblocks + +logScope: + topics = "dagger discovery engine" + +const + DefaultConcurrentDiscRequests = 10 + DefaultConcurrentAdvertRequests = 10 + DefaultDiscoveryTimeout = 1.minutes + DefaultMinPeersPerBlock = 3 + DefaultDiscoveryLoopSleep = 3.seconds + DefaultAdvertiseLoopSleep = 3.seconds + +type + DiscoveryEngine* = ref object of RootObj + localStore*: BlockStore # Local block store for this instance + peers*: PeerCtxStore # Peer context store + network*: BlockExcNetwork # Network interface + discovery*: Discovery # Discovery interface + pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved + discEngineRunning*: bool # Indicates if discovery is running + concurrentAdvReqs: int # Concurrent advertise requests + concurrentDiscReqs: int # Concurrent discovery requests + advertiseLoop*: Future[void] # Advertise loop task handle + advertiseQueue*: AsyncQueue[Cid] # Advertise queue + advertiseTasks*: seq[Future[void]] # Advertise tasks + discoveryLoop*: Future[void] # Discovery loop task handle + discoveryTasks*: seq[Future[void]] # Discovery tasks + discoveryQueue*: AsyncQueue[Cid] # Discovery queue + minPeersPerBlock*: int # Max number of peers with block + discoveryLoopSleep: Duration # Discovery loop sleep + advertiseLoopSleep: Duration # Advertise loop sleep + inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests + inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests + +proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = + while b.discEngineRunning: + for cid in toSeq(b.pendingBlocks.wantList): + try: + await b.discoveryQueue.put(cid) + except CatchableError as exc: + trace "Exception in discovery loop", exc = exc.msg + + logScope: + sleep = b.discoveryLoopSleep + wanted = b.pendingBlocks.len + + trace "About to sleep discovery loop" + await sleepAsync(b.discoveryLoopSleep) + +proc advertiseQueueLoop*(b: DiscoveryEngine) {.async.} = + proc onBlock(cid: Cid) {.async.} = + try: + await b.advertiseQueue.put(cid) + except CancelledError as exc: + trace "Cancelling block listing" + raise exc + except CatchableError as exc: + trace "Exception listing blocks", exc = exc.msg + + while b.discEngineRunning: + await b.localStore.listBlocks(onBlock) + + trace "About to sleep advertise loop", sleep = b.advertiseLoopSleep + await sleepAsync(b.advertiseLoopSleep) + + trace "Exiting advertise task loop" + +proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} = + ## Run advertise tasks + ## + + while b.discEngineRunning: + try: + let + cid = await b.advertiseQueue.get() + + if cid in b.inFlightAdvReqs: + trace "Advertise request already in progress", cid = $cid + continue + + try: + trace "Advertising block", cid = $cid + let request = b.discovery.provideBlock(cid) + b.inFlightAdvReqs[cid] = request + await request + finally: + b.inFlightAdvReqs.del(cid) + except CatchableError as exc: + trace "Exception in advertise task runner", exc = exc.msg + + trace "Exiting advertise task runner" + +proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = + ## Run discovery tasks + ## + + while b.discEngineRunning: + try: + let + cid = await b.discoveryQueue.get() + + if cid in b.inFlightDiscReqs: + trace "Discovery request already in progress", cid = $cid + continue + + let + haves = b.peers.peersHave(cid) + + trace "Current number of peers for block", cid = $cid, count = haves.len + if haves.len < b.minPeersPerBlock: + trace "Discovering block", cid = $cid + try: + let + request = b.discovery + .findBlockProviders(cid) + .wait(DefaultDiscoveryTimeout) + + b.inFlightDiscReqs[cid] = request + let + peers = await request + + trace "Discovered peers", peers = peers.len + checkFutures( + await allFinished(peers.mapIt( b.network.dialPeer(it.data)))) + finally: + b.inFlightDiscReqs.del(cid) + except CatchableError as exc: + trace "Exception in discovery task runner", exc = exc.msg + + trace "Exiting discovery task runner" + +proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} = + proc queueReq() {.async.} = + try: + for cid in cids: + if cid notin b.discoveryQueue: + trace "Queueing find block request", cid = $cid + await b.discoveryQueue.put(cid) + except CatchableError as exc: + trace "Exception queueing discovery request", exc = exc.msg + + asyncSpawn queueReq() + +proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} = + proc queueReq() {.async.} = + try: + for cid in cids: + if cid notin b.advertiseQueue: + trace "Queueing provide block request", cid = $cid + await b.advertiseQueue.put(cid) + except CatchableError as exc: + trace "Exception queueing discovery request", exc = exc.msg + + asyncSpawn queueReq() + +proc start*(b: DiscoveryEngine) {.async.} = + ## Start the discengine task + ## + + trace "discovery engine start" + + if b.discEngineRunning: + warn "Starting discovery engine twice" + return + + b.discEngineRunning = true + for i in 0.. 0: - await b.handleWantList(peer, msg.wantlist) + asyncSpawn b.handleWantList(peer, msg.wantlist) if msg.payload.len > 0: - await b.handleBlocks(peer, msg.payload) + asyncSpawn b.handleBlocks(peer, msg.payload) if msg.blockPresences.len > 0: - await b.handleBlockPresence(peer, msg.blockPresences) + asyncSpawn b.handleBlockPresence(peer, msg.blockPresences) if account =? Account.init(msg.account): - await b.handleAccount(peer, account) + asyncSpawn b.handleAccount(peer, account) if payment =? SignedState.init(msg.payment): - await b.handlePayment(peer, payment) + asyncSpawn b.handlePayment(peer, payment) except CatchableError as exc: trace "Exception in blockexc rpc handler", exc = exc.msg @@ -298,7 +300,7 @@ proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} = try: await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address)) except CatchableError as exc: - debug "Failed to connect to peer", error=exc.msg + debug "Failed to connect to peer", error = exc.msg, peer proc dropPeer*(b: BlockExcNetwork, peer: PeerID) = ## Cleanup disconnected peer diff --git a/dagger/blockexchange/networkpeer.nim b/dagger/blockexchange/networkpeer.nim index 2ef8d473..bf84384a 100644 --- a/dagger/blockexchange/networkpeer.nim +++ b/dagger/blockexchange/networkpeer.nim @@ -17,7 +17,8 @@ import ./protobuf/blockexc logScope: topics = "dagger blockexc networkpeer" -const MaxMessageSize = 100 * 1024 * 1024 # manifest files can be big +const + MaxMessageSize = 100 * 1024 * 1024 # manifest files can be big type RPCHandler* = proc(peer: NetworkPeer, msg: Message): Future[void] {.gcsafe.} diff --git a/dagger/blockexchange/peercontext.nim b/dagger/blockexchange/peercontext.nim index 2fb24216..0d303703 100644 --- a/dagger/blockexchange/peercontext.nim +++ b/dagger/blockexchange/peercontext.nim @@ -4,6 +4,7 @@ import pkg/libp2p import pkg/chronos import pkg/nitro import pkg/questionable + import ./protobuf/blockexc import ./protobuf/payments import ./protobuf/presence diff --git a/dagger/blockexchange/peers.nim b/dagger/blockexchange/peers.nim new file mode 100644 index 00000000..e159806a --- /dev/null +++ b/dagger/blockexchange/peers.nim @@ -0,0 +1,3 @@ +import ./peers/peerctxstore + +export peerctxstore diff --git a/dagger/blockexchange/peers/peerctxstore.nim b/dagger/blockexchange/peers/peerctxstore.nim new file mode 100644 index 00000000..9453a6f8 --- /dev/null +++ b/dagger/blockexchange/peers/peerctxstore.nim @@ -0,0 +1,82 @@ +## Nim-Dagger +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/sequtils +import std/tables +import std/algorithm + +import pkg/upraises + +push: {.upraises: [].} + +import pkg/chronos +import pkg/chronicles +import pkg/libp2p + +import ../protobuf/blockexc + +import ../peercontext +export peercontext + +logScope: + topics = "dagger blockexc peerctxstore" + +type + PeerCtxStore* = ref object of RootObj + peers*: OrderedTable[PeerID, BlockExcPeerCtx] + +iterator items*(self: PeerCtxStore): BlockExcPeerCtx = + for p in self.peers.values: + yield p + +func contains*(self: PeerCtxStore, peerId: PeerID): bool = + peerId in self.peers + +func add*(self: PeerCtxStore, peer: BlockExcPeerCtx) = + trace "Adding peer to peer context store", peer = peer.id + self.peers[peer.id] = peer + +func remove*(self: PeerCtxStore, peerId: PeerID) = + trace "Removing peer from peer context store", peer = peerId + self.peers.del(peerId) + +func get*(self: PeerCtxStore, peerId: PeerID): BlockExcPeerCtx = + trace "Retrieving peer from peer context store", peer = peerId + self.peers.getOrDefault(peerId, nil) + +func len*(self: PeerCtxStore): int = + self.peers.len + +func peersHave*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = + toSeq(self.peers.values).filterIt( it.peerHave.anyIt( it == cid ) ) + +func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = + toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it.cid == cid ) ) + +func selectCheapest*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = + var + peers = self.peersHave(cid) + + func cmp(a, b: BlockExcPeerCtx): int = + # Can't do (a - b) without cast[int](a - b) + if a.peerPrices.getOrDefault(cid, 0.u256) == + b.peerPrices.getOrDefault(cid, 0.u256): + 0 + elif a.peerPrices.getOrDefault(cid, 0.u256) > + b.peerPrices.getOrDefault(cid, 0.u256): + 1 + else: + -1 + + peers.sort(cmp) + trace "Selected cheapest peers", peers = peers.len + return peers + +proc new*(T: type PeerCtxStore): PeerCtxStore = + T(peers: initOrderedTable[PeerID, BlockExcPeerCtx]()) diff --git a/dagger/blockexchange/pendingblocks.nim b/dagger/blockexchange/pendingblocks.nim index f3524e81..f1e30f6c 100644 --- a/dagger/blockexchange/pendingblocks.nim +++ b/dagger/blockexchange/pendingblocks.nim @@ -8,7 +8,6 @@ ## those terms. import std/tables -import std/sequtils import pkg/upraises @@ -52,12 +51,6 @@ proc getWantHandle*( finally: p.blocks.del(cid) -proc addOrAwait*( - p: PendingBlocksManager, - cid: Cid, - timeout = DefaultBlockTimeout): Future[Block] {.deprecated: "Use getWantHandle".} = - p.getWantHandle(cid, timeout) - proc resolve*( p: PendingBlocksManager, blocks: seq[Block]) = @@ -94,5 +87,4 @@ func len*(p: PendingBlocksManager): int = func new*(T: type PendingBlocksManager): T = T( - blocks: initTable[Cid, Future[Block]]() - ) + blocks: initTable[Cid, Future[Block]]()) diff --git a/dagger/dagger.nim b/dagger/dagger.nim index cf92ab82..f938bd8b 100644 --- a/dagger/dagger.nim +++ b/dagger/dagger.nim @@ -110,7 +110,7 @@ proc new*(T: type DaggerServer, config: DaggerConf): T = let discoveryBootstrapNodes = config.bootstrapNodes - discovery = Discovery.new( + blockDiscovery = Discovery.new( switch.peerInfo, discoveryPort = config.discoveryPort, bootstrapNodes = discoveryBootstrapNodes @@ -119,7 +119,10 @@ proc new*(T: type DaggerServer, config: DaggerConf): T = wallet = WalletRef.new(EthPrivateKey.random()) network = BlockExcNetwork.new(switch) localStore = FSStore.new(config.dataDir / "repo", cache = cache) - engine = BlockExcEngine.new(localStore, wallet, network, discovery) + peerStore = PeerCtxStore.new() + pendingBlocks = PendingBlocksManager.new() + discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks) + engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks) store = NetworkStore.new(engine, localStore) erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) contracts = ContractInteractions.new( @@ -127,7 +130,7 @@ proc new*(T: type DaggerServer, config: DaggerConf): T = config.ethDeployment, config.ethAccount ) - daggerNode = DaggerNodeRef.new(switch, store, engine, erasure, discovery, contracts) + daggerNode = DaggerNodeRef.new(switch, store, engine, erasure, blockDiscovery, contracts) restServer = RestServerRef.new( daggerNode.initRestApi(config), initTAddress("127.0.0.1" , config.apiPort), diff --git a/dagger/discovery.nim b/dagger/discovery.nim index 96c961a9..44dfd04d 100644 --- a/dagger/discovery.nim +++ b/dagger/discovery.nim @@ -28,8 +28,8 @@ type proc new*( T: type Discovery, localInfo: PeerInfo, - discoveryPort: Port, - bootstrapNodes = newSeq[SignedPeerRecord](), + discoveryPort = 0.Port, + bootstrapNodes: seq[SignedPeerRecord] = @[], ): T = T( @@ -40,8 +40,7 @@ proc new*( bootstrapRecords = bootstrapNodes, rng = Rng.instance() ), - localInfo: localInfo - ) + localInfo: localInfo) proc findPeer*( d: Discovery, diff --git a/dagger/erasure/erasure.nim b/dagger/erasure/erasure.nim index cf52ba94..c47f57cd 100644 --- a/dagger/erasure/erasure.nim +++ b/dagger/erasure/erasure.nim @@ -19,7 +19,7 @@ import pkg/chronicles import ../manifest import ../stores import ../errors -import ../blocktype +import ../blocktype as bt import ./backend @@ -96,7 +96,7 @@ proc encode*( try: for i in 0.. 0: + check false + count.inc + + await reqs # queue the request + + await discoveryEngine.start() + discoveryEngine.queueFindBlocksReq(@[blocks[0].cid]) + await sleepAsync(200.millis) + + discoveryEngine.queueFindBlocksReq(@[blocks[0].cid]) + await sleepAsync(200.millis) + + reqs.complete() + await discoveryEngine.stop() + + test "Should not request if there is already an inflight advertise request": + var + localStore = CacheStore.new() + discoveryEngine = DiscoveryEngine.new( + localStore, + peerStore, + network, + blockDiscovery, + pendingBlocks, + discoveryLoopSleep = 100.millis, + concurrentAdvReqs = 2) + reqs = newFuture[void]() + count = 0 + + blockDiscovery.publishProvideHandler = + proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = + check cid == blocks[0].cid + if count > 0: + check false + count.inc + + await reqs # queue the request + + await discoveryEngine.start() + discoveryEngine.queueProvideBlocksReq(@[blocks[0].cid]) + await sleepAsync(200.millis) + + discoveryEngine.queueProvideBlocksReq(@[blocks[0].cid]) + await sleepAsync(200.millis) + + reqs.complete() + await discoveryEngine.stop() diff --git a/tests/dagger/blockexc/protobuf/testpresence.nim b/tests/dagger/blockexc/protobuf/testpresence.nim index b4a1bc33..65969d0c 100644 --- a/tests/dagger/blockexc/protobuf/testpresence.nim +++ b/tests/dagger/blockexc/protobuf/testpresence.nim @@ -1,4 +1,3 @@ -import std/sequtils import pkg/asynctest import pkg/chronos import pkg/libp2p diff --git a/tests/dagger/blockexc/testblockexc.nim b/tests/dagger/blockexc/testblockexc.nim index d80f643b..b878b6ff 100644 --- a/tests/dagger/blockexc/testblockexc.nim +++ b/tests/dagger/blockexc/testblockexc.nim @@ -19,23 +19,15 @@ import ../helpers import ../examples suite "NetworkStore engine - 2 nodes": - let chunker1 = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) chunker2 = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) var - switch1, switch2: Switch - wallet1, wallet2: WalletRef - pricing1, pricing2: Pricing - network1, network2: BlockExcNetwork - blockexc1, blockexc2: NetworkStore - peerId1, peerId2: PeerID + nodeCmps1, nodeCmps2: NodesComponents peerCtx1, peerCtx2: BlockExcPeerCtx + pricing1, pricing2: Pricing blocks1, blocks2: seq[bt.Block] - engine1, engine2: BlockExcEngine - localStore1, localStore2: BlockStore - discovery1, discovery2: Discovery pendingBlocks1, pendingBlocks2: seq[Future[bt.Block]] setup: @@ -53,70 +45,54 @@ suite "NetworkStore engine - 2 nodes": blocks2.add(bt.Block.new(chunk).tryGet()) - switch1 = newStandardSwitch() - switch2 = newStandardSwitch() - wallet1 = WalletRef.example - wallet2 = WalletRef.example - pricing1 = Pricing.example - pricing2 = Pricing.example - await switch1.start() - await switch2.start() - - peerId1 = switch1.peerInfo.peerId - peerId2 = switch2.peerInfo.peerId - - localStore1 = CacheStore.new(blocks1.mapIt( it )) - discovery1 = Discovery.new(switch1.peerInfo, Port(0)) - network1 = BlockExcNetwork.new(switch = switch1) - engine1 = BlockExcEngine.new(localStore1, wallet1, network1, discovery1) - blockexc1 = NetworkStore.new(engine1, localStore1) - switch1.mount(network1) - - localStore2 = CacheStore.new(blocks2.mapIt( it )) - discovery2 = Discovery.new(switch2.peerInfo, Port(0)) - network2 = BlockExcNetwork.new(switch = switch2) - engine2 = BlockExcEngine.new(localStore2, wallet2, network2, discovery2) - blockexc2 = NetworkStore.new(engine2, localStore2) - switch2.mount(network2) + nodeCmps1 = generateNodes(1, blocks1)[0] + nodeCmps2 = generateNodes(1, blocks2)[0] await allFuturesThrowing( - engine1.start(), - engine2.start(), - ) + nodeCmps1.switch.start(), + nodeCmps1.blockDiscovery.start(), + nodeCmps1.engine.start(), + nodeCmps2.switch.start(), + nodeCmps2.blockDiscovery.start(), + nodeCmps2.engine.start()) # initialize our want lists - pendingBlocks1 = blocks2.mapIt( blockexc1.engine.pendingBlocks.getWantHandle( it.cid ) ) - pendingBlocks2 = blocks1.mapIt( blockexc2.engine.pendingBlocks.getWantHandle( it.cid ) ) + pendingBlocks1 = blocks2.mapIt( nodeCmps1.pendingBlocks.getWantHandle( it.cid ) ) + pendingBlocks2 = blocks1.mapIt( nodeCmps2.pendingBlocks.getWantHandle( it.cid ) ) - pricing1.address = wallet1.address - pricing2.address = wallet2.address - blockexc1.engine.pricing = pricing1.some - blockexc2.engine.pricing = pricing2.some + pricing1.address = nodeCmps1.wallet.address + pricing2.address = nodeCmps2.wallet.address + nodeCmps1.engine.pricing = pricing1.some + nodeCmps2.engine.pricing = pricing2.some - await switch1.connect( - switch2.peerInfo.peerId, - switch2.peerInfo.addrs) + await nodeCmps1.switch.connect( + nodeCmps2.switch.peerInfo.peerId, + nodeCmps2.switch.peerInfo.addrs) await sleepAsync(1.seconds) # give some time to exchange lists - peerCtx2 = blockexc1.engine.getPeerCtx(peerId2) - peerCtx1 = blockexc2.engine.getPeerCtx(peerId1) + peerCtx2 = nodeCmps1.peerStore.get(nodeCmps2.switch.peerInfo.peerId) + peerCtx1 = nodeCmps2.peerStore.get(nodeCmps1.switch.peerInfo.peerId) + + check isNil(peerCtx1).not + check isNil(peerCtx2).not teardown: await allFuturesThrowing( - engine1.stop(), - engine2.stop(), - switch1.stop(), - switch2.stop()) - - test "should exchange want lists on connect": - check not isNil(peerCtx1) - check not isNil(peerCtx2) + nodeCmps1.blockDiscovery.stop(), + nodeCmps1.engine.stop(), + nodeCmps1.switch.stop(), + nodeCmps2.blockDiscovery.stop(), + nodeCmps2.engine.stop(), + nodeCmps2.switch.stop()) + test "Should exchange want lists on connect": await allFuturesThrowing( allFinished(pendingBlocks1)) + .wait(10.seconds) await allFuturesThrowing( allFinished(pendingBlocks2)) + .wait(10.seconds) check: peerCtx1.peerHave.mapIt( $it ).sorted(cmp[string]) == @@ -125,13 +101,13 @@ suite "NetworkStore engine - 2 nodes": peerCtx2.peerHave.mapIt( $it ).sorted(cmp[string]) == pendingBlocks1.mapIt( $it.read.cid ).sorted(cmp[string]) - test "exchanges accounts on connect": + test "Should exchanges accounts on connect": check peerCtx1.account.?address == pricing1.address.some check peerCtx2.account.?address == pricing2.address.some - test "should send want-have for block": + test "Should send want-have for block": let blk = bt.Block.new("Block 1".toBytes).tryGet() - check await blockexc2.engine.localStore.putBlock(blk) + check await nodeCmps2.localStore.putBlock(blk) let entry = Entry( `block`: blk.cid.data.buffer, @@ -141,43 +117,43 @@ suite "NetworkStore engine - 2 nodes": sendDontHave: false) peerCtx1.peerWants.add(entry) - check blockexc2 + check nodeCmps2 .engine .taskQueue .pushOrUpdateNoWait(peerCtx1).isOk await sleepAsync(100.millis) - check blockexc1.engine.localStore.hasBlock(blk.cid) + check nodeCmps1.localStore.hasBlock(blk.cid) - test "should get blocks from remote": + test "Should get blocks from remote": let blocks = await allFinished( - blocks2.mapIt( blockexc1.getBlock(it.cid) )) + blocks2.mapIt( nodeCmps1.networkStore.getBlock(it.cid) )) check blocks.mapIt( !it.read ) == blocks2 - test "remote should send blocks when available": + test "Remote should send blocks when available": let blk = bt.Block.new("Block 1".toBytes).tryGet() # should fail retrieving block from remote - check not await blockexc1.getBlock(blk.cid) + check not await nodeCmps1.networkStore.getBlock(blk.cid) .withTimeout(100.millis) # should expire - # first put the required block in the local store - check await blockexc2.engine.localStore.putBlock(blk) - # second trigger blockexc to resolve any pending requests # for the block - check await blockexc2.putBlock(blk) + check await nodeCmps2.networkStore.putBlock(blk) # should succeed retrieving block from remote - check await blockexc1.getBlock(blk.cid) - .withTimeout(100.millis) # should succede + check await nodeCmps1.networkStore.getBlock(blk.cid) + .withTimeout(100.millis) # should succeed - test "receives payments for blocks that were sent": + test "Should receive payments for blocks that were sent": let blocks = await allFinished( - blocks2.mapIt( blockexc1.getBlock(it.cid) )) + blocks2.mapIt( nodeCmps1.networkStore.getBlock(it.cid) )) + await sleepAsync(100.millis) - let channel = !peerCtx1.paymentChannel - check wallet2.balance(channel, Asset) > 0 + + let + channel = !peerCtx1.paymentChannel + check nodeCmps2.wallet.balance(channel, Asset) > 0 suite "NetworkStore - multiple nodes": let @@ -185,7 +161,7 @@ suite "NetworkStore - multiple nodes": var switch: seq[Switch] - blockexc: seq[NetworkStore] + networkStore: seq[NetworkStore] blocks: seq[bt.Block] setup: @@ -198,8 +174,8 @@ suite "NetworkStore - multiple nodes": for e in generateNodes(5): switch.add(e.switch) - blockexc.add(e.blockexc) - await e.blockexc.engine.start() + networkStore.add(e.networkStore) + await e.engine.start() await allFuturesThrowing( switch.mapIt( it.start() ) @@ -211,11 +187,11 @@ suite "NetworkStore - multiple nodes": ) switch = @[] - blockexc = @[] + networkStore = @[] - test "should receive haves for own want list": + test "Should receive haves for own want list": let - downloader = blockexc[4] + downloader = networkStore[4] engine = downloader.engine # Add blocks from 1st peer to want list @@ -224,13 +200,13 @@ suite "NetworkStore - multiple nodes": pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid )) await allFutures( - blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) )) + blocks[0..3].mapIt( networkStore[0].engine.localStore.putBlock(it) )) await allFutures( - blocks[4..7].mapIt( blockexc[1].engine.localStore.putBlock(it) )) + blocks[4..7].mapIt( networkStore[1].engine.localStore.putBlock(it) )) await allFutures( - blocks[8..11].mapIt( blockexc[2].engine.localStore.putBlock(it) )) + blocks[8..11].mapIt( networkStore[2].engine.localStore.putBlock(it) )) await allFutures( - blocks[12..15].mapIt( blockexc[3].engine.localStore.putBlock(it) )) + blocks[12..15].mapIt( networkStore[3].engine.localStore.putBlock(it) )) await connectNodes(switch) await sleepAsync(1.seconds) @@ -239,16 +215,19 @@ suite "NetworkStore - multiple nodes": allFinished(pendingBlocks1), allFinished(pendingBlocks2)) + let + peers = toSeq(engine.peers) + check: - engine.peers[0].peerHave.mapIt($it).sorted(cmp[string]) == + peers[0].peerHave.mapIt($it).sorted(cmp[string]) == blocks[0..3].mapIt( $(it.cid) ).sorted(cmp[string]) - engine.peers[3].peerHave.mapIt($it).sorted(cmp[string]) == + peers[3].peerHave.mapIt($it).sorted(cmp[string]) == blocks[12..15].mapIt( $(it.cid) ).sorted(cmp[string]) - test "should exchange blocks with multiple nodes": + test "Should exchange blocks with multiple nodes": let - downloader = blockexc[4] + downloader = networkStore[4] engine = downloader.engine # Add blocks from 1st peer to want list @@ -257,13 +236,13 @@ suite "NetworkStore - multiple nodes": pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid )) await allFutures( - blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) )) + blocks[0..3].mapIt( networkStore[0].engine.localStore.putBlock(it) )) await allFutures( - blocks[4..7].mapIt( blockexc[1].engine.localStore.putBlock(it) )) + blocks[4..7].mapIt( networkStore[1].engine.localStore.putBlock(it) )) await allFutures( - blocks[8..11].mapIt( blockexc[2].engine.localStore.putBlock(it) )) + blocks[8..11].mapIt( networkStore[2].engine.localStore.putBlock(it) )) await allFutures( - blocks[12..15].mapIt( blockexc[3].engine.localStore.putBlock(it) )) + blocks[12..15].mapIt( networkStore[3].engine.localStore.putBlock(it) )) await connectNodes(switch) await sleepAsync(1.seconds) diff --git a/tests/dagger/blockexc/testengine.nim b/tests/dagger/blockexc/testengine.nim index d741a120..7813b626 100644 --- a/tests/dagger/blockexc/testengine.nim +++ b/tests/dagger/blockexc/testengine.nim @@ -21,19 +21,28 @@ import ../helpers import ../examples suite "NetworkStore engine basic": - let + var + rng: Rng + seckey: PrivateKey + peerId: PeerID + chunker: Chunker + wallet: WalletRef + blockDiscovery: Discovery + peerStore: PeerCtxStore + pendingBlocks: PendingBlocksManager + blocks: seq[bt.Block] + done: Future[void] + + setup: rng = Rng.instance() seckey = PrivateKey.random(rng[]).tryGet() peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet() chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) wallet = WalletRef.example - discovery = Discovery.new() + blockDiscovery = Discovery.new() + peerStore = PeerCtxStore.new() + pendingBlocks = PendingBlocksManager.new() - var - blocks: seq[bt.Block] - done: Future[void] - - setup: while true: let chunk = await chunker.getBytes() if chunk.len <= 0: @@ -43,7 +52,7 @@ suite "NetworkStore engine basic": done = newFuture[void]() - test "should send want list to new peers": + test "Should send want list to new peers": proc sendWantList( id: PeerID, cids: seq[Cid], @@ -53,7 +62,6 @@ suite "NetworkStore engine basic": full: bool = false, sendDontHave: bool = false) {.gcsafe.} = check cids.mapIt($it).sorted == blocks.mapIt( $it.cid ).sorted - done.complete() let @@ -61,19 +69,29 @@ suite "NetworkStore engine basic": sendWantList: sendWantList, )) + localStore = CacheStore.new(blocks.mapIt( it )) + discovery = DiscoveryEngine.new( + localStore, + peerStore, + network, + blockDiscovery, + pendingBlocks) + engine = BlockExcEngine.new( - CacheStore.new(blocks.mapIt( it )), + localStore, wallet, network, - discovery) + discovery, + peerStore, + pendingBlocks) for b in blocks: discard engine.pendingBlocks.getWantHandle(b.cid) engine.setupPeer(peerId) - await done + await done.wait(100.millis) - test "sends account to new peers": + test "Should send account to new peers": let pricing = Pricing.example proc sendAccount(peer: PeerID, account: Account) = @@ -82,31 +100,52 @@ suite "NetworkStore engine basic": let network = BlockExcNetwork(request: BlockExcRequest( - sendAccount: sendAccount, + sendAccount: sendAccount )) - engine = BlockExcEngine.new(CacheStore.new, wallet, network, discovery) + localStore = CacheStore.new() + discovery = DiscoveryEngine.new( + localStore, + peerStore, + network, + blockDiscovery, + pendingBlocks) + + engine = BlockExcEngine.new( + localStore, + wallet, + network, + discovery, + peerStore, + pendingBlocks) engine.pricing = pricing.some engine.setupPeer(peerId) + await done.wait(100.millis) suite "NetworkStore engine handlers": - let - rng = Rng.instance() - seckey = PrivateKey.random(rng[]).tryGet() - peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet() - chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) - wallet = WalletRef.example - discovery = Discovery.new() - var + rng: Rng + seckey: PrivateKey + peerId: PeerID + chunker: Chunker + wallet: WalletRef + blockDiscovery: Discovery + peerStore: PeerCtxStore + pendingBlocks: PendingBlocksManager + network: BlockExcNetwork engine: BlockExcEngine + discovery: DiscoveryEngine peerCtx: BlockExcPeerCtx + localStore: BlockStore done: Future[void] blocks: seq[bt.Block] setup: + rng = Rng.instance() + chunker = RandomChunker.new(rng, size = 1024, chunkSize = 256) + while true: let chunk = await chunker.getBytes() if chunk.len <= 0: @@ -114,14 +153,38 @@ suite "NetworkStore engine handlers": blocks.add(bt.Block.new(chunk).tryGet()) - done = newFuture[void]() - engine = BlockExcEngine.new(CacheStore.new(), wallet, BlockExcNetwork(), discovery) + seckey = PrivateKey.random(rng[]).tryGet() + peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet() + wallet = WalletRef.example + blockDiscovery = Discovery.new() + peerStore = PeerCtxStore.new() + pendingBlocks = PendingBlocksManager.new() + + localStore = CacheStore.new() + network = BlockExcNetwork() + + discovery = DiscoveryEngine.new( + localStore, + peerStore, + network, + blockDiscovery, + pendingBlocks) + + engine = BlockExcEngine.new( + localStore, + wallet, + network, + discovery, + peerStore, + pendingBlocks) + peerCtx = BlockExcPeerCtx( id: peerId ) engine.peers.add(peerCtx) + done = newFuture[void]() - test "should handle want list": + test "Should handle want list": let wantList = makeWantList(blocks.mapIt( it.cid )) proc handler() {.async.} = let ctx = await engine.taskQueue.pop() @@ -132,7 +195,7 @@ suite "NetworkStore engine handlers": await engine.wantListHandler(peerId, wantList) await done - test "should handle want list - `dont-have`": + test "Should handle want list - `dont-have`": let wantList = makeWantList(blocks.mapIt( it.cid ), sendDontHave = true) proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) = check presence.mapIt( it.cid ) == wantList.entries.mapIt( it.`block` ) @@ -150,7 +213,7 @@ suite "NetworkStore engine handlers": await done - test "should handle want list - `dont-have` some blocks": + test "Should handle want list - `dont-have` some blocks": let wantList = makeWantList(blocks.mapIt( it.cid ), sendDontHave = true) proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) = check presence.mapIt( it.cid ) == blocks[2..blocks.high].mapIt( it.cid.data.buffer ) @@ -170,7 +233,7 @@ suite "NetworkStore engine handlers": await done - test "stores blocks in local store": + test "Should store blocks in local store": let pending = blocks.mapIt( engine.pendingBlocks.getWantHandle( it.cid ) ) @@ -181,9 +244,9 @@ suite "NetworkStore engine handlers": for b in blocks: check engine.localStore.hasBlock(b.cid) - test "sends payments for received blocks": + test "Should send payments for received blocks": let account = Account(address: EthAddress.example) - let peerContext = engine.getPeerCtx(peerId) + let peerContext = peerStore.get(peerId) peerContext.account = account.some peerContext.peerPrices = blocks.mapIt((it.cid, rand(uint16).u256)).toTable @@ -197,10 +260,9 @@ suite "NetworkStore engine handlers": )) await engine.blocksHandler(peerId, blocks) - await done.wait(100.millis) - test "should handle block presence": + test "Should handle block presence": let price = UInt256.example await engine.blockPresenceHandler( peerId, @@ -217,20 +279,28 @@ suite "NetworkStore engine handlers": check peerCtx.peerPrices[cid] == price suite "Task Handler": - - let - rng = Rng.instance() - chunker = RandomChunker.new(Rng.instance(), size = 2048, chunkSize = 256) - wallet = WalletRef.example - var + rng: Rng + seckey: PrivateKey + peerId: PeerID + chunker: Chunker + wallet: WalletRef + blockDiscovery: Discovery + peerStore: PeerCtxStore + pendingBlocks: PendingBlocksManager + network: BlockExcNetwork engine: BlockExcEngine + discovery: DiscoveryEngine + peerCtx: BlockExcPeerCtx + localStore: BlockStore + peersCtx: seq[BlockExcPeerCtx] peers: seq[PeerID] - done: Future[void] blocks: seq[bt.Block] setup: + rng = Rng.instance() + chunker = RandomChunker.new(rng, size = 1024, chunkSize = 256) while true: let chunk = await chunker.getBytes() if chunk.len <= 0: @@ -238,8 +308,30 @@ suite "Task Handler": blocks.add(bt.Block.new(chunk).tryGet()) - done = newFuture[void]() - engine = BlockExcEngine.new(CacheStore.new(), wallet, BlockExcNetwork(), Discovery.new()) + seckey = PrivateKey.random(rng[]).tryGet() + peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet() + wallet = WalletRef.example + blockDiscovery = Discovery.new() + peerStore = PeerCtxStore.new() + pendingBlocks = PendingBlocksManager.new() + + localStore = CacheStore.new() + network = BlockExcNetwork() + + discovery = DiscoveryEngine.new( + localStore, + peerStore, + network, + blockDiscovery, + pendingBlocks) + + engine = BlockExcEngine.new( + localStore, + wallet, + network, + discovery, + peerStore, + pendingBlocks) peersCtx = @[] for i in 0..3: @@ -249,8 +341,8 @@ suite "Task Handler": peersCtx.add(BlockExcPeerCtx( id: peers[i] )) + peerStore.add(peersCtx[i]) - engine.peers = peersCtx engine.pricing = Pricing.example.some test "Should send want-blocks in priority order": diff --git a/tests/dagger/blockexc/testpeerctxstore.nim b/tests/dagger/blockexc/testpeerctxstore.nim new file mode 100644 index 00000000..21f75cb3 --- /dev/null +++ b/tests/dagger/blockexc/testpeerctxstore.nim @@ -0,0 +1,109 @@ +import std/sugar +import std/sequtils + +import pkg/unittest2 +import pkg/libp2p + +import pkg/dagger/blockexchange/peers +import pkg/dagger/blockexchange/protobuf/blockexc + +import ../examples + +suite "Peer Context Store": + var + store: PeerCtxStore + peerCtx: BlockExcPeerCtx + + setup: + store = PeerCtxStore.new() + peerCtx = BlockExcPeerCtx.example + store.add(peerCtx) + + test "Should add peer": + check peerCtx.id in store + + test "Should remove peer": + store.remove(peerCtx.id) + check peerCtx.id notin store + + test "Should get peer": + check store.get(peerCtx.id) == peerCtx + +suite "Peer Context Store Peer Selection": + var + store: PeerCtxStore + peerCtxs: seq[BlockExcPeerCtx] + cids: seq[Cid] + + setup: + store = PeerCtxStore.new() + cids = collect(newSeq): + for i in 0..<10: Cid.example + + peerCtxs = collect(newSeq): + for i in 0..<10: BlockExcPeerCtx.example + + for p in peerCtxs: + store.add(p) + + teardown: + store = nil + cids = @[] + peerCtxs = @[] + + test "Should select peers that have Cid": + peerCtxs[0].peerPrices = collect(initTable): + for i, c in cids: + { c: i.u256 } + + peerCtxs[5].peerPrices = collect(initTable): + for i, c in cids: + { c: i.u256 } + + let + peers = store.peersHave(cids[0]) + + check peers.len == 2 + check peerCtxs[0] in peers + check peerCtxs[5] in peers + + test "Should select cheapest peers for Cid": + peerCtxs[0].peerPrices = collect(initTable): + for i, c in cids: + { c: (5 + i).u256 } + + peerCtxs[5].peerPrices = collect(initTable): + for i, c in cids: + { c: (2 + i).u256 } + + peerCtxs[9].peerPrices = collect(initTable): + for i, c in cids: + { c: i.u256 } + + let + peers = store.selectCheapest(cids[0]) + + check peers.len == 3 + check peers[0] == peerCtxs[9] + check peers[1] == peerCtxs[5] + check peers[2] == peerCtxs[0] + + test "Should select peers that want Cid": + let + entries = cids.mapIt( + Entry( + `block`: it.data.buffer, + priority: 1, + cancel: false, + wantType: WantType.wantBlock, + sendDontHave: false)) + + peerCtxs[0].peerWants = entries + peerCtxs[5].peerWants = entries + + let + peers = store.peersWant(cids[4]) + + check peers.len == 2 + check peerCtxs[0] in peers + check peerCtxs[5] in peers diff --git a/tests/dagger/examples.nim b/tests/dagger/examples.nim index ee09fbde..3d330092 100644 --- a/tests/dagger/examples.nim +++ b/tests/dagger/examples.nim @@ -5,7 +5,7 @@ import pkg/nitro import pkg/stint import pkg/dagger/rng import pkg/dagger/stores -import pkg/dagger/blocktype +import pkg/dagger/blocktype as bt import pkg/dagger/sales import ../examples @@ -38,10 +38,10 @@ proc example*(_: type Pricing): Pricing = price: uint32.rand.u256 ) -proc example*(_: type Block): Block = +proc example*(_: type bt.Block): bt.Block = let length = rand(4096) let bytes = newSeqWith(length, rand(uint8)) - Block.new(bytes).tryGet() + bt.Block.new(bytes).tryGet() proc example*(_: type PeerId): PeerID = let key = PrivateKey.random(Rng.instance[]).get @@ -51,7 +51,7 @@ proc example*(_: type BlockExcPeerCtx): BlockExcPeerCtx = BlockExcPeerCtx(id: PeerID.example) proc example*(_: type Cid): Cid = - Block.example.cid + bt.Block.example.cid proc example*(_: type Availability): Availability = Availability.init(uint16.example, uint16.example, uint64.example.u256) diff --git a/tests/dagger/blockexc/discovery/mockdiscovery.nim b/tests/dagger/helpers/mockdiscovery.nim similarity index 71% rename from tests/dagger/blockexc/discovery/mockdiscovery.nim rename to tests/dagger/helpers/mockdiscovery.nim index 137db9c8..9b671a9a 100644 --- a/tests/dagger/blockexc/discovery/mockdiscovery.nim +++ b/tests/dagger/helpers/mockdiscovery.nim @@ -16,8 +16,10 @@ import pkg/dagger/discovery type MockDiscovery* = ref object of Discovery - findBlockProvidersHandler*: proc(d: MockDiscovery, cid: Cid): seq[SignedPeerRecord] {.gcsafe.} - publishProvideHandler*: proc(d: MockDiscovery, cid: Cid) {.gcsafe.} + findBlockProvidersHandler*: proc(d: MockDiscovery, cid: Cid): + Future[seq[SignedPeerRecord]] {.gcsafe.} + publishProvideHandler*: proc(d: MockDiscovery, cid: Cid): + Future[void] {.gcsafe.} proc new*( T: type MockDiscovery, @@ -36,13 +38,16 @@ proc findPeer*( method findBlockProviders*( d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async.} = - if isNil(d.findBlockProvidersHandler): return + if isNil(d.findBlockProvidersHandler): + return - return d.findBlockProvidersHandler(d, cid) + return await d.findBlockProvidersHandler(d, cid) -method provideBlock*(d: MockDiscovery, cid: Cid) {.async.} = - if isNil(d.publishProvideHandler): return - d.publishProvideHandler(d, cid) +method provideBlock*(d: MockDiscovery, cid: Cid): Future[void] {.async.} = + if isNil(d.publishProvideHandler): + return + + await d.publishProvideHandler(d, cid) proc start*(d: Discovery) {.async.} = discard diff --git a/tests/dagger/helpers/nodeutils.nim b/tests/dagger/helpers/nodeutils.nim index f190746f..bfc21a24 100644 --- a/tests/dagger/helpers/nodeutils.nim +++ b/tests/dagger/helpers/nodeutils.nim @@ -6,25 +6,51 @@ import pkg/libp2p import pkg/dagger/discovery import pkg/dagger/stores import pkg/dagger/blocktype as bt +import pkg/dagger/blockexchange import ../examples +type + NodesComponents* = tuple[ + switch: Switch, + blockDiscovery: Discovery, + wallet: WalletRef, + network: BlockExcNetwork, + localStore: BlockStore, + peerStore: PeerCtxStore, + pendingBlocks: PendingBlocksManager, + discovery: DiscoveryEngine, + engine: BlockExcEngine, + networkStore: NetworkStore] + proc generateNodes*( num: Natural, - blocks: openArray[bt.Block] = []): - seq[tuple[switch: Switch, blockexc: NetworkStore]] = + blocks: openArray[bt.Block] = []): seq[NodesComponents] = for i in 0.. 0): - let blk = Block.new(chunk).tryGet() + let blk = bt.Block.new(chunk).tryGet() manifest.add(blk.cid) check (await store.putBlock(blk)) @@ -84,7 +84,7 @@ suite "Erasure encode/decode": let chunk = await chunker.getBytes(); chunk.len > 0): - let blk = Block.new(chunk).tryGet() + let blk = bt.Block.new(chunk).tryGet() manifest.add(blk.cid) check (await store.putBlock(blk)) @@ -134,7 +134,7 @@ suite "Erasure encode/decode": let chunk = await chunker.getBytes(); chunk.len > 0): - let blk = Block.new(chunk).tryGet() + let blk = bt.Block.new(chunk).tryGet() manifest.add(blk.cid) check (await store.putBlock(blk)) @@ -187,7 +187,7 @@ suite "Erasure encode/decode": let chunk = await chunker.getBytes(); chunk.len > 0): - let blk = Block.new(chunk).tryGet() + let blk = bt.Block.new(chunk).tryGet() manifest.add(blk.cid) check (await store.putBlock(blk)) @@ -246,7 +246,7 @@ suite "Erasure encode/decode": let chunk = await chunker.getBytes(); chunk.len > 0): - let blk = Block.new(chunk).tryGet() + let blk = bt.Block.new(chunk).tryGet() manifest.add(blk.cid) check (await store.putBlock(blk)) @@ -287,7 +287,7 @@ suite "Erasure encode/decode": let chunk = await chunker.getBytes(); chunk.len > 0): - let blk = Block.new(chunk).tryGet() + let blk = bt.Block.new(chunk).tryGet() manifest.add(blk.cid) check (await store.putBlock(blk)) diff --git a/tests/dagger/testnode.nim b/tests/dagger/testnode.nim index 8c115351..b0038fdf 100644 --- a/tests/dagger/testnode.nim +++ b/tests/dagger/testnode.nim @@ -35,7 +35,10 @@ suite "Test Node": engine: BlockExcEngine store: NetworkStore node: DaggerNodeRef - discovery: Discovery + blockDiscovery: Discovery + peerStore: PeerCtxStore + pendingBlocks: PendingBlocksManager + discovery: DiscoveryEngine contracts: ?ContractInteractions setup: @@ -45,11 +48,14 @@ suite "Test Node": wallet = WalletRef.new(EthPrivateKey.random()) network = BlockExcNetwork.new(switch) localStore = CacheStore.new() - discovery = Discovery.new(switch.peerInfo, Port(0)) - engine = BlockExcEngine.new(localStore, wallet, network, discovery) + blockDiscovery = Discovery.new(switch.peerInfo, Port(0)) + peerStore = PeerCtxStore.new() + pendingBlocks = PendingBlocksManager.new() + discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks) + engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks) store = NetworkStore.new(engine, localStore) contracts = ContractInteractions.new() - node = DaggerNodeRef.new(switch, store, engine, nil, discovery, contracts) # TODO: pass `Erasure` + node = DaggerNodeRef.new(switch, store, engine, nil, blockDiscovery, contracts) # TODO: pass `Erasure` await node.start() diff --git a/tests/dagger/teststorestream.nim b/tests/dagger/teststorestream.nim index 9d2d41fb..3d457fee 100644 --- a/tests/dagger/teststorestream.nim +++ b/tests/dagger/teststorestream.nim @@ -9,6 +9,7 @@ import pkg/dagger/streams import pkg/dagger/stores import pkg/dagger/manifest import pkg/dagger/rng +import pkg/dagger/blocktype as bt suite "StoreStream": var @@ -37,7 +38,7 @@ suite "StoreStream": for d in data: let - blk = Block.new(d).tryGet() + blk = bt.Block.new(d).tryGet() manifest.add(blk.cid) if not (await store.putBlock(blk)):