From d4f3ebc867e2ade0dfffe479f442f69a5561e572 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Thu, 19 May 2022 16:28:53 -0600 Subject: [PATCH] reorg engine directory structure (#104) --- codex/blockexchange/engine.nim | 479 +----------------- codex/blockexchange/engine/discovery.nim | 2 +- codex/blockexchange/engine/engine.nim | 479 ++++++++++++++++++ codex/blockexchange/engine/payments.nim | 9 + .../{ => engine}/pendingblocks.nim | 2 +- codex/blockexchange/network.nim | 377 +------------- codex/blockexchange/network/network.nim | 375 ++++++++++++++ .../{ => network}/networkpeer.nim | 2 +- codex/blockexchange/peers.nim | 3 +- .../blockexchange/{ => peers}/peercontext.nim | 15 +- codex/blockexchange/peers/peerctxstore.nim | 2 +- .../discovery/testdiscovery.nim | 0 .../discovery/testdiscoveryengine.nim | 0 .../engine}/testblockexc.nim | 4 +- .../engine}/testengine.nim | 4 +- .../engine/testpayments.nim | 0 .../protobuf/testpayments.nim | 0 .../protobuf/testpresence.nim | 0 tests/codex/blockexchange/testdiscovery.nim | 4 + tests/codex/blockexchange/testengine.nim | 5 + .../testnetwork.nim | 0 .../testpeerctxstore.nim | 0 tests/codex/blockexchange/testprotobuf.nim | 4 + tests/codex/testblockexc.nim | 14 +- 24 files changed, 909 insertions(+), 871 deletions(-) create mode 100644 codex/blockexchange/engine/engine.nim rename codex/blockexchange/{ => engine}/pendingblocks.nim (98%) create mode 100644 codex/blockexchange/network/network.nim rename codex/blockexchange/{ => network}/networkpeer.nim (98%) rename codex/blockexchange/{ => peers}/peercontext.nim (76%) rename tests/codex/{blockexc => blockexchange}/discovery/testdiscovery.nim (100%) rename tests/codex/{blockexc => blockexchange}/discovery/testdiscoveryengine.nim (100%) rename tests/codex/{blockexc => blockexchange/engine}/testblockexc.nim (99%) rename tests/codex/{blockexc => blockexchange/engine}/testengine.nim (99%) rename tests/codex/{blockexc => blockexchange}/engine/testpayments.nim (100%) rename tests/codex/{blockexc => blockexchange}/protobuf/testpayments.nim (100%) rename tests/codex/{blockexc => blockexchange}/protobuf/testpresence.nim (100%) create mode 100644 tests/codex/blockexchange/testdiscovery.nim create mode 100644 tests/codex/blockexchange/testengine.nim rename tests/codex/{blockexc => blockexchange}/testnetwork.nim (100%) rename tests/codex/{blockexc => blockexchange}/testpeerctxstore.nim (100%) create mode 100644 tests/codex/blockexchange/testprotobuf.nim diff --git a/codex/blockexchange/engine.nim b/codex/blockexchange/engine.nim index 6223ee13..a9cd9160 100644 --- a/codex/blockexchange/engine.nim +++ b/codex/blockexchange/engine.nim @@ -1,478 +1,5 @@ -## Nim-Codex -## Copyright (c) 2021 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. - -import std/sequtils -import std/sets - -import pkg/chronos -import pkg/chronicles -import pkg/libp2p - -import ../stores/blockstore -import ../blocktype as bt -import ../utils - -import ./protobuf/blockexc -import ./protobuf/presence - -import ./network -import ./peers -import ./engine/payments import ./engine/discovery -import ./pendingblocks +import ./engine/engine +import ./engine/payments -export peers, pendingblocks, payments, discovery - -logScope: - topics = "codex blockexc engine" - -const - DefaultMaxPeersPerRequest* = 10 - DefaultTaskQueueSize = 100 - DefaultConcurrentTasks = 10 - DefaultMaxRetries = 3 - DefaultConcurrentDiscRequests = 10 - DefaultConcurrentAdvertRequests = 10 - DefaultDiscoveryTimeout = 1.minutes - DefaultMaxQueriedBlocksCache = 1000 - DefaultMinPeersPerBlock = 3 - -type - TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.} - TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.} - - BlockExcEngine* = ref object of RootObj - localStore*: BlockStore # Local block store for this instance - network*: BlockExcNetwork # Petwork interface - peers*: PeerCtxStore # Peers we're currently actively exchanging with - taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # Peers we're currently processing tasks for - concurrentTasks: int # Number of concurrent peers we're serving at any given time - blockexcTasks: seq[Future[void]] # Future to control blockexc task - blockexcRunning: bool # Indicates if the blockexc task is running - pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved - peersPerRequest: int # Max number of peers to request from - wallet*: WalletRef # Nitro wallet for micropayments - pricing*: ?Pricing # Optional bandwidth pricing - discovery*: DiscoveryEngine - - Pricing* = object - address*: EthAddress - price*: UInt256 - -proc contains*(a: AsyncHeapQueue[Entry], b: Cid): bool = - ## Convenience method to check for entry prepense - ## - - a.anyIt( it.cid == b ) - -# attach task scheduler to engine -proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} = - b.taskQueue.pushOrUpdateNoWait(task).isOk() - -proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.} - -proc start*(b: BlockExcEngine) {.async.} = - ## Start the blockexc task - ## - - await b.discovery.start() - - trace "Blockexc starting with concurrent tasks", tasks = b.concurrentTasks - if b.blockexcRunning: - warn "Starting blockexc twice" - return - - b.blockexcRunning = true - for i in 0.. 0: - b.network.request.sendWantList( - peer, - cids, - wantType = WantType.wantBlock) # we want this remote to send us a block - - # if none of the connected peers report our wants in their have list, - # fire up discovery - b.discovery.queueFindBlocksReq( - toSeq(b.pendingBlocks.wantList) - .filter do(cid: Cid) -> bool: - not b.peers.anyIt( cid in it.peerHave )) - -proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) = - trace "Schedule a task for new blocks" - - let - cids = blocks.mapIt( it.cid ) - - # schedule any new peers to provide blocks to - for p in b.peers: - for c in cids: # for each cid - # schedule a peer if it wants at least one - # cid and we have it in our local store - if c in p.peerWants and c in b.localStore: - if b.scheduleTask(p): - trace "Task scheduled for peer", peer = p.id - else: - trace "Unable to schedule task for peer", peer = p.id - - break # do next peer - -proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) = - ## Resolve pending blocks from the pending blocks manager - ## and schedule any new task to be ran - ## - - trace "Resolving blocks", blocks = blocks.len - - b.pendingBlocks.resolve(blocks) - b.scheduleTasks(blocks) - b.discovery.queueProvideBlocksReq(blocks.mapIt( it.cid )) - -proc payForBlocks(engine: BlockExcEngine, - peer: BlockExcPeerCtx, - blocks: seq[bt.Block]) = - let sendPayment = engine.network.request.sendPayment - if sendPayment.isNil: - return - - let cids = blocks.mapIt(it.cid) - if payment =? engine.wallet.pay(peer, peer.price(cids)): - sendPayment(peer.id, payment) - -proc blocksHandler*( - b: BlockExcEngine, - peer: PeerID, - blocks: seq[bt.Block]) {.async.} = - ## handle incoming blocks - ## - - trace "Got blocks from peer", peer, len = blocks.len - for blk in blocks: - if not (await b.localStore.putBlock(blk)): - trace "Unable to store block", cid = blk.cid - continue - - b.resolveBlocks(blocks) - let peerCtx = b.peers.get(peer) - if peerCtx != nil: - b.payForBlocks(peerCtx, blocks) - -proc wantListHandler*( - b: BlockExcEngine, - peer: PeerID, - wantList: WantList) {.async.} = - ## Handle incoming want lists - ## - - trace "Got want list for peer", peer - let peerCtx = b.peers.get(peer) - if isNil(peerCtx): - return - - var dontHaves: seq[Cid] - let entries = wantList.entries - for e in entries: - let idx = peerCtx.peerWants.find(e) - if idx > -1: - # peer doesn't want this block anymore - if e.cancel: - peerCtx.peerWants.del(idx) - continue - - peerCtx.peerWants[idx] = e # update entry - else: - peerCtx.peerWants.add(e) - - trace "Added entry to peer's want list", peer = peerCtx.id, cid = $e.cid - - # peer might want to ask for the same cid with - # different want params - if e.sendDontHave and e.cid notin b.localStore: - dontHaves.add(e.cid) - - # send don't have's to remote - if dontHaves.len > 0: - b.network.request.sendPresence( - peer, - dontHaves.mapIt( - BlockPresence( - cid: it.data.buffer, - `type`: BlockPresenceType.presenceDontHave))) - - if not b.scheduleTask(peerCtx): - trace "Unable to schedule task for peer", peer - -proc accountHandler*( - engine: BlockExcEngine, - peer: PeerID, - account: Account) {.async.} = - let context = engine.peers.get(peer) - if context.isNil: - return - - context.account = account.some - -proc paymentHandler*( - engine: BlockExcEngine, - peer: PeerId, - payment: SignedState) {.async.} = - without context =? engine.peers.get(peer).option and - account =? context.account: - return - - if channel =? context.paymentChannel: - let sender = account.address - discard engine.wallet.acceptPayment(channel, Asset, sender, payment) - else: - context.paymentChannel = engine.wallet.acceptChannel(payment).option - -proc setupPeer*(b: BlockExcEngine, peer: PeerID) = - ## Perform initial setup, such as want - ## list exchange - ## - - trace "Setting up new peer", peer - if peer notin b.peers: - b.peers.add(BlockExcPeerCtx( - id: peer - )) - - # broadcast our want list, the other peer will do the same - if b.pendingBlocks.len > 0: - b.network.request.sendWantList(peer, toSeq(b.pendingBlocks.wantList), full = true) - - if address =? b.pricing.?address: - b.network.request.sendAccount(peer, Account(address: address)) - -proc dropPeer*(b: BlockExcEngine, peer: PeerID) = - ## Cleanup disconnected peer - ## - - trace "Dropping peer", peer - - # drop the peer from the peers table - b.peers.remove(peer) - -proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = - trace "Handling task for peer", peer = task.id - - var wantsBlocks = newAsyncHeapQueue[Entry](queueType = QueueType.Max) - # get blocks and wants to send to the remote - for e in task.peerWants: - if e.wantType == WantType.wantBlock: - await wantsBlocks.push(e) - - # TODO: There should be all sorts of accounting of - # bytes sent/received here - if wantsBlocks.len > 0: - let blockFuts = await allFinished(wantsBlocks.mapIt( - b.localStore.getBlock(it.cid) - )) - - let blocks = blockFuts - .filterIt((not it.failed) and it.read.isOk) - .mapIt(!it.read) - - if blocks.len > 0: - trace "Sending blocks to peer", peer = task.id, blocks = blocks.len - b.network.request.sendBlocks( - task.id, - blocks) - - # Remove successfully sent blocks - task.peerWants.keepIf( - proc(e: Entry): bool = - not blocks.anyIt( it.cid == e.cid ) - ) - - var wants: seq[BlockPresence] - # do not remove wants from the queue unless - # we send the block or get a cancel - for e in task.peerWants: - if e.wantType == WantType.wantHave: - var presence = Presence(cid: e.cid) - presence.have = b.localStore.hasblock(presence.cid) - if presence.have and price =? b.pricing.?price: - presence.price = price - wants.add(BlockPresence.init(presence)) - - if wants.len > 0: - b.network.request.sendPresence(task.id, wants) - -proc blockexcTaskRunner(b: BlockExcEngine) {.async.} = - ## process tasks - ## - - trace "Starting blockexc task runner" - while b.blockexcRunning: - let - peerCtx = await b.taskQueue.pop() - - trace "Got new task from queue", peerId = peerCtx.id - await b.taskHandler(peerCtx) - - trace "Exiting blockexc task runner" - -proc new*( - T: type BlockExcEngine, - localStore: BlockStore, - wallet: WalletRef, - network: BlockExcNetwork, - discovery: DiscoveryEngine, - peerStore: PeerCtxStore, - pendingBlocks: PendingBlocksManager, - concurrentTasks = DefaultConcurrentTasks, - peersPerRequest = DefaultMaxPeersPerRequest): T = - - let - engine = BlockExcEngine( - localStore: localStore, - peers: peerStore, - pendingBlocks: pendingBlocks, - peersPerRequest: peersPerRequest, - network: network, - wallet: wallet, - concurrentTasks: concurrentTasks, - taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize), - discovery: discovery) - - proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = - if event.kind == PeerEventKind.Joined: - engine.setupPeer(peerId) - else: - engine.dropPeer(peerId) - - if not isNil(network.switch): - network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) - network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) - - proc blockWantListHandler( - peer: PeerID, - wantList: WantList): Future[void] {.gcsafe.} = - engine.wantListHandler(peer, wantList) - - proc blockPresenceHandler( - peer: PeerID, - presence: seq[BlockPresence]): Future[void] {.gcsafe.} = - engine.blockPresenceHandler(peer, presence) - - proc blocksHandler( - peer: PeerID, - blocks: seq[bt.Block]): Future[void] {.gcsafe.} = - engine.blocksHandler(peer, blocks) - - proc accountHandler(peer: PeerId, account: Account): Future[void] {.gcsafe.} = - engine.accountHandler(peer, account) - - proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} = - engine.paymentHandler(peer, payment) - - network.handlers = BlockExcHandlers( - onWantList: blockWantListHandler, - onBlocks: blocksHandler, - onPresence: blockPresenceHandler, - onAccount: accountHandler, - onPayment: paymentHandler) - - return engine +export discovery, engine, payments diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index aeb88557..107084d8 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -22,7 +22,7 @@ import ../../utils import ../../discovery import ../../stores/blockstore -import ../pendingblocks +import ./pendingblocks logScope: topics = "codex discovery engine" diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim new file mode 100644 index 00000000..69fda741 --- /dev/null +++ b/codex/blockexchange/engine/engine.nim @@ -0,0 +1,479 @@ +## Nim-Codex +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/sequtils +import std/sets + +import pkg/chronos +import pkg/chronicles +import pkg/libp2p + +import ../../stores/blockstore +import ../../blocktype as bt +import ../../utils + +import ../protobuf/blockexc +import ../protobuf/presence + +import ../network +import ../peers + +import ./payments +import ./discovery +import ./pendingblocks + +export peers, pendingblocks, payments, discovery + +logScope: + topics = "codex blockexc engine" + +const + DefaultMaxPeersPerRequest* = 10 + DefaultTaskQueueSize = 100 + DefaultConcurrentTasks = 10 + DefaultMaxRetries = 3 + DefaultConcurrentDiscRequests = 10 + DefaultConcurrentAdvertRequests = 10 + DefaultDiscoveryTimeout = 1.minutes + DefaultMaxQueriedBlocksCache = 1000 + DefaultMinPeersPerBlock = 3 + +type + TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.} + TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.} + + BlockExcEngine* = ref object of RootObj + localStore*: BlockStore # Local block store for this instance + network*: BlockExcNetwork # Petwork interface + peers*: PeerCtxStore # Peers we're currently actively exchanging with + taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # Peers we're currently processing tasks for + concurrentTasks: int # Number of concurrent peers we're serving at any given time + blockexcTasks: seq[Future[void]] # Future to control blockexc task + blockexcRunning: bool # Indicates if the blockexc task is running + pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved + peersPerRequest: int # Max number of peers to request from + wallet*: WalletRef # Nitro wallet for micropayments + pricing*: ?Pricing # Optional bandwidth pricing + discovery*: DiscoveryEngine + + Pricing* = object + address*: EthAddress + price*: UInt256 + +proc contains*(a: AsyncHeapQueue[Entry], b: Cid): bool = + ## Convenience method to check for entry prepense + ## + + a.anyIt( it.cid == b ) + +# attach task scheduler to engine +proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} = + b.taskQueue.pushOrUpdateNoWait(task).isOk() + +proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.} + +proc start*(b: BlockExcEngine) {.async.} = + ## Start the blockexc task + ## + + await b.discovery.start() + + trace "Blockexc starting with concurrent tasks", tasks = b.concurrentTasks + if b.blockexcRunning: + warn "Starting blockexc twice" + return + + b.blockexcRunning = true + for i in 0.. 0: + b.network.request.sendWantList( + peer, + cids, + wantType = WantType.wantBlock) # we want this remote to send us a block + + # if none of the connected peers report our wants in their have list, + # fire up discovery + b.discovery.queueFindBlocksReq( + toSeq(b.pendingBlocks.wantList) + .filter do(cid: Cid) -> bool: + not b.peers.anyIt( cid in it.peerHave )) + +proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) = + trace "Schedule a task for new blocks" + + let + cids = blocks.mapIt( it.cid ) + + # schedule any new peers to provide blocks to + for p in b.peers: + for c in cids: # for each cid + # schedule a peer if it wants at least one + # cid and we have it in our local store + if c in p.peerWants and c in b.localStore: + if b.scheduleTask(p): + trace "Task scheduled for peer", peer = p.id + else: + trace "Unable to schedule task for peer", peer = p.id + + break # do next peer + +proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) = + ## Resolve pending blocks from the pending blocks manager + ## and schedule any new task to be ran + ## + + trace "Resolving blocks", blocks = blocks.len + + b.pendingBlocks.resolve(blocks) + b.scheduleTasks(blocks) + b.discovery.queueProvideBlocksReq(blocks.mapIt( it.cid )) + +proc payForBlocks(engine: BlockExcEngine, + peer: BlockExcPeerCtx, + blocks: seq[bt.Block]) = + let sendPayment = engine.network.request.sendPayment + if sendPayment.isNil: + return + + let cids = blocks.mapIt(it.cid) + if payment =? engine.wallet.pay(peer, peer.price(cids)): + sendPayment(peer.id, payment) + +proc blocksHandler*( + b: BlockExcEngine, + peer: PeerID, + blocks: seq[bt.Block]) {.async.} = + ## handle incoming blocks + ## + + trace "Got blocks from peer", peer, len = blocks.len + for blk in blocks: + if not (await b.localStore.putBlock(blk)): + trace "Unable to store block", cid = blk.cid + continue + + b.resolveBlocks(blocks) + let peerCtx = b.peers.get(peer) + if peerCtx != nil: + b.payForBlocks(peerCtx, blocks) + +proc wantListHandler*( + b: BlockExcEngine, + peer: PeerID, + wantList: WantList) {.async.} = + ## Handle incoming want lists + ## + + trace "Got want list for peer", peer + let peerCtx = b.peers.get(peer) + if isNil(peerCtx): + return + + var dontHaves: seq[Cid] + let entries = wantList.entries + for e in entries: + let idx = peerCtx.peerWants.find(e) + if idx > -1: + # peer doesn't want this block anymore + if e.cancel: + peerCtx.peerWants.del(idx) + continue + + peerCtx.peerWants[idx] = e # update entry + else: + peerCtx.peerWants.add(e) + + trace "Added entry to peer's want list", peer = peerCtx.id, cid = $e.cid + + # peer might want to ask for the same cid with + # different want params + if e.sendDontHave and e.cid notin b.localStore: + dontHaves.add(e.cid) + + # send don't have's to remote + if dontHaves.len > 0: + b.network.request.sendPresence( + peer, + dontHaves.mapIt( + BlockPresence( + cid: it.data.buffer, + `type`: BlockPresenceType.presenceDontHave))) + + if not b.scheduleTask(peerCtx): + trace "Unable to schedule task for peer", peer + +proc accountHandler*( + engine: BlockExcEngine, + peer: PeerID, + account: Account) {.async.} = + let context = engine.peers.get(peer) + if context.isNil: + return + + context.account = account.some + +proc paymentHandler*( + engine: BlockExcEngine, + peer: PeerId, + payment: SignedState) {.async.} = + without context =? engine.peers.get(peer).option and + account =? context.account: + return + + if channel =? context.paymentChannel: + let sender = account.address + discard engine.wallet.acceptPayment(channel, Asset, sender, payment) + else: + context.paymentChannel = engine.wallet.acceptChannel(payment).option + +proc setupPeer*(b: BlockExcEngine, peer: PeerID) = + ## Perform initial setup, such as want + ## list exchange + ## + + trace "Setting up new peer", peer + if peer notin b.peers: + b.peers.add(BlockExcPeerCtx( + id: peer + )) + + # broadcast our want list, the other peer will do the same + if b.pendingBlocks.len > 0: + b.network.request.sendWantList(peer, toSeq(b.pendingBlocks.wantList), full = true) + + if address =? b.pricing.?address: + b.network.request.sendAccount(peer, Account(address: address)) + +proc dropPeer*(b: BlockExcEngine, peer: PeerID) = + ## Cleanup disconnected peer + ## + + trace "Dropping peer", peer + + # drop the peer from the peers table + b.peers.remove(peer) + +proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = + trace "Handling task for peer", peer = task.id + + var wantsBlocks = newAsyncHeapQueue[Entry](queueType = QueueType.Max) + # get blocks and wants to send to the remote + for e in task.peerWants: + if e.wantType == WantType.wantBlock: + await wantsBlocks.push(e) + + # TODO: There should be all sorts of accounting of + # bytes sent/received here + if wantsBlocks.len > 0: + let blockFuts = await allFinished(wantsBlocks.mapIt( + b.localStore.getBlock(it.cid) + )) + + let blocks = blockFuts + .filterIt((not it.failed) and it.read.isOk) + .mapIt(!it.read) + + if blocks.len > 0: + trace "Sending blocks to peer", peer = task.id, blocks = blocks.len + b.network.request.sendBlocks( + task.id, + blocks) + + # Remove successfully sent blocks + task.peerWants.keepIf( + proc(e: Entry): bool = + not blocks.anyIt( it.cid == e.cid ) + ) + + var wants: seq[BlockPresence] + # do not remove wants from the queue unless + # we send the block or get a cancel + for e in task.peerWants: + if e.wantType == WantType.wantHave: + var presence = Presence(cid: e.cid) + presence.have = b.localStore.hasblock(presence.cid) + if presence.have and price =? b.pricing.?price: + presence.price = price + wants.add(BlockPresence.init(presence)) + + if wants.len > 0: + b.network.request.sendPresence(task.id, wants) + +proc blockexcTaskRunner(b: BlockExcEngine) {.async.} = + ## process tasks + ## + + trace "Starting blockexc task runner" + while b.blockexcRunning: + let + peerCtx = await b.taskQueue.pop() + + trace "Got new task from queue", peerId = peerCtx.id + await b.taskHandler(peerCtx) + + trace "Exiting blockexc task runner" + +proc new*( + T: type BlockExcEngine, + localStore: BlockStore, + wallet: WalletRef, + network: BlockExcNetwork, + discovery: DiscoveryEngine, + peerStore: PeerCtxStore, + pendingBlocks: PendingBlocksManager, + concurrentTasks = DefaultConcurrentTasks, + peersPerRequest = DefaultMaxPeersPerRequest): T = + + let + engine = BlockExcEngine( + localStore: localStore, + peers: peerStore, + pendingBlocks: pendingBlocks, + peersPerRequest: peersPerRequest, + network: network, + wallet: wallet, + concurrentTasks: concurrentTasks, + taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize), + discovery: discovery) + + proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = + if event.kind == PeerEventKind.Joined: + engine.setupPeer(peerId) + else: + engine.dropPeer(peerId) + + if not isNil(network.switch): + network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) + network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) + + proc blockWantListHandler( + peer: PeerID, + wantList: WantList): Future[void] {.gcsafe.} = + engine.wantListHandler(peer, wantList) + + proc blockPresenceHandler( + peer: PeerID, + presence: seq[BlockPresence]): Future[void] {.gcsafe.} = + engine.blockPresenceHandler(peer, presence) + + proc blocksHandler( + peer: PeerID, + blocks: seq[bt.Block]): Future[void] {.gcsafe.} = + engine.blocksHandler(peer, blocks) + + proc accountHandler(peer: PeerId, account: Account): Future[void] {.gcsafe.} = + engine.accountHandler(peer, account) + + proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} = + engine.paymentHandler(peer, payment) + + network.handlers = BlockExcHandlers( + onWantList: blockWantListHandler, + onBlocks: blocksHandler, + onPresence: blockPresenceHandler, + onAccount: accountHandler, + onPayment: paymentHandler) + + return engine diff --git a/codex/blockexchange/engine/payments.nim b/codex/blockexchange/engine/payments.nim index 15079086..35d38e29 100644 --- a/codex/blockexchange/engine/payments.nim +++ b/codex/blockexchange/engine/payments.nim @@ -1,3 +1,12 @@ +## Nim-Codex +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + import std/math import pkg/nitro import pkg/questionable/results diff --git a/codex/blockexchange/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim similarity index 98% rename from codex/blockexchange/pendingblocks.nim rename to codex/blockexchange/engine/pendingblocks.nim index 5539734f..c24c7519 100644 --- a/codex/blockexchange/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -18,7 +18,7 @@ import pkg/chronicles import pkg/chronos import pkg/libp2p -import ../blocktype +import ../../blocktype logScope: topics = "codex blockexc pendingblocks" diff --git a/codex/blockexchange/network.nim b/codex/blockexchange/network.nim index 0c14e51c..ccaa6207 100644 --- a/codex/blockexchange/network.nim +++ b/codex/blockexchange/network.nim @@ -1,375 +1,4 @@ -## Nim-Codex -## Copyright (c) 2021 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. +import ./network/network +import ./network/networkpeer -import std/tables -import std/sequtils - -import pkg/chronicles -import pkg/chronos - -import pkg/libp2p -import pkg/questionable -import pkg/questionable/results - -import ../blocktype as bt -import ./protobuf/blockexc as pb -import ./protobuf/payments - -import ./networkpeer - -export networkpeer, payments - -logScope: - topics = "codex blockexc network" - -const Codec* = "/codex/blockexc/1.0.0" - -type - WantListHandler* = proc(peer: PeerID, wantList: WantList): Future[void] {.gcsafe.} - BlocksHandler* = proc(peer: PeerID, blocks: seq[bt.Block]): Future[void] {.gcsafe.} - BlockPresenceHandler* = proc(peer: PeerID, precense: seq[BlockPresence]): Future[void] {.gcsafe.} - AccountHandler* = proc(peer: PeerID, account: Account): Future[void] {.gcsafe.} - PaymentHandler* = proc(peer: PeerID, payment: SignedState): Future[void] {.gcsafe.} - - BlockExcHandlers* = object - onWantList*: WantListHandler - onBlocks*: BlocksHandler - onPresence*: BlockPresenceHandler - onAccount*: AccountHandler - onPayment*: PaymentHandler - - WantListBroadcaster* = proc( - id: PeerID, - cids: seq[Cid], - priority: int32 = 0, - cancel: bool = false, - wantType: WantType = WantType.wantHave, - full: bool = false, - sendDontHave: bool = false) {.gcsafe.} - - BlocksBroadcaster* = proc(peer: PeerID, presence: seq[bt.Block]) {.gcsafe.} - PresenceBroadcaster* = proc(peer: PeerID, presence: seq[BlockPresence]) {.gcsafe.} - AccountBroadcaster* = proc(peer: PeerID, account: Account) {.gcsafe.} - PaymentBroadcaster* = proc(peer: PeerID, payment: SignedState) {.gcsafe.} - - BlockExcRequest* = object - sendWantList*: WantListBroadcaster - sendBlocks*: BlocksBroadcaster - sendPresence*: PresenceBroadcaster - sendAccount*: AccountBroadcaster - sendPayment*: PaymentBroadcaster - - BlockExcNetwork* = ref object of LPProtocol - peers*: Table[PeerID, NetworkPeer] - switch*: Switch - handlers*: BlockExcHandlers - request*: BlockExcRequest - getConn: ConnProvider - -proc handleWantList( - b: BlockExcNetwork, - peer: NetworkPeer, - list: WantList): Future[void] = - ## Handle incoming want list - ## - - if isNil(b.handlers.onWantList): - return - - trace "Handling want list for peer", peer = peer.id - b.handlers.onWantList(peer.id, list) - -# TODO: make into a template -proc makeWantList*( - cids: seq[Cid], - priority: int = 0, - cancel: bool = false, - wantType: WantType = WantType.wantHave, - full: bool = false, - sendDontHave: bool = false): WantList = - var entries: seq[Entry] - for cid in cids: - entries.add(Entry( - `block`: cid.data.buffer, - priority: priority.int32, - cancel: cancel, - wantType: wantType, - sendDontHave: sendDontHave)) - - WantList(entries: entries, full: full) - -proc broadcastWantList*( - b: BlockExcNetwork, - id: PeerID, - cids: seq[Cid], - priority: int32 = 0, - cancel: bool = false, - wantType: WantType = WantType.wantHave, - full: bool = false, - sendDontHave: bool = false) = - ## send a want message to peer - ## - - if id notin b.peers: - return - - trace "Sending want list to peer", peer = id, `type` = $wantType, len = cids.len - - let - wantList = makeWantList( - cids, - priority, - cancel, - wantType, - full, - sendDontHave) - b.peers.withValue(id, peer): - peer[].broadcast(Message(wantlist: wantList)) - -proc handleBlocks( - b: BlockExcNetwork, - peer: NetworkPeer, - blocks: seq[pb.Block]): Future[void] = - ## Handle incoming blocks - ## - - if isNil(b.handlers.onBlocks): - return - - trace "Handling blocks for peer", peer = peer.id - - var blks: seq[bt.Block] - for blob in blocks: - without cid =? Cid.init(blob.prefix): - trace "Unable to initialize Cid from protobuf message" - - without blk =? bt.Block.new(cid, blob.data, verify = true): - trace "Unable to initialize Block from data" - - blks.add(blk) - - b.handlers.onBlocks(peer.id, blks) - -template makeBlocks*(blocks: seq[bt.Block]): seq[pb.Block] = - var blks: seq[pb.Block] - for blk in blocks: - blks.add(pb.Block( - prefix: blk.cid.data.buffer, - data: blk.data - )) - - blks - -proc broadcastBlocks*( - b: BlockExcNetwork, - id: PeerID, - blocks: seq[bt.Block]) = - ## Send blocks to remote - ## - - if id notin b.peers: - trace "Unable to send blocks, peer disconnected", peer = id - return - - b.peers.withValue(id, peer): - trace "Sending blocks to peer", peer = id, len = blocks.len - peer[].broadcast(pb.Message(payload: makeBlocks(blocks))) - -proc handleBlockPresence( - b: BlockExcNetwork, - peer: NetworkPeer, - presence: seq[BlockPresence]): Future[void] = - ## Handle block presence - ## - - if isNil(b.handlers.onPresence): - return - - trace "Handling block presence for peer", peer = peer.id - b.handlers.onPresence(peer.id, presence) - -proc broadcastBlockPresence*( - b: BlockExcNetwork, - id: PeerID, - presence: seq[BlockPresence]) = - ## Send presence to remote - ## - - if id notin b.peers: - return - - trace "Sending presence to peer", peer = id - b.peers.withValue(id, peer): - peer[].broadcast(Message(blockPresences: @presence)) - -proc handleAccount(network: BlockExcNetwork, - peer: NetworkPeer, - account: Account): Future[void] = - if network.handlers.onAccount.isNil: - return - network.handlers.onAccount(peer.id, account) - -proc broadcastAccount*(network: BlockExcNetwork, - id: PeerId, - account: Account) = - if id notin network.peers: - return - - let message = Message(account: AccountMessage.init(account)) - network.peers.withValue(id, peer): - peer[].broadcast(message) - -proc broadcastPayment*(network: BlockExcNetwork, - id: PeerId, - payment: SignedState) = - if id notin network.peers: - return - - let message = Message(payment: StateChannelUpdate.init(payment)) - network.peers.withValue(id, peer): - peer[].broadcast(message) - -proc handlePayment(network: BlockExcNetwork, - peer: NetworkPeer, - payment: SignedState): Future[void] = - if network.handlers.onPayment.isNil: - return - network.handlers.onPayment(peer.id, payment) - -proc rpcHandler(b: BlockExcNetwork, peer: NetworkPeer, msg: Message) {.async.} = - try: - if msg.wantlist.entries.len > 0: - asyncSpawn b.handleWantList(peer, msg.wantlist) - - if msg.payload.len > 0: - asyncSpawn b.handleBlocks(peer, msg.payload) - - if msg.blockPresences.len > 0: - asyncSpawn b.handleBlockPresence(peer, msg.blockPresences) - - if account =? Account.init(msg.account): - asyncSpawn b.handleAccount(peer, account) - - if payment =? SignedState.init(msg.payment): - asyncSpawn b.handlePayment(peer, payment) - - except CatchableError as exc: - trace "Exception in blockexc rpc handler", exc = exc.msg - -proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerID): NetworkPeer = - ## Creates or retrieves a BlockExcNetwork Peer - ## - - if peer in b.peers: - return b.peers.getOrDefault(peer, nil) - - var getConn = proc(): Future[Connection] {.async.} = - try: - return await b.switch.dial(peer, Codec) - except CatchableError as exc: - trace "Unable to connect to blockexc peer", exc = exc.msg - - if not isNil(b.getConn): - getConn = b.getConn - - let rpcHandler = proc (p: NetworkPeer, msg: Message): Future[void] = - b.rpcHandler(p, msg) - - # create new pubsub peer - let blockExcPeer = NetworkPeer.new(peer, getConn, rpcHandler) - debug "Created new blockexc peer", peer - - b.peers[peer] = blockExcPeer - - return blockExcPeer - -proc setupPeer*(b: BlockExcNetwork, peer: PeerID) = - ## Perform initial setup, such as want - ## list exchange - ## - - discard b.getOrCreatePeer(peer) - -proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} = - try: - await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address)) - except CatchableError as exc: - debug "Failed to connect to peer", error = exc.msg, peer - -proc dropPeer*(b: BlockExcNetwork, peer: PeerID) = - ## Cleanup disconnected peer - ## - - b.peers.del(peer) - -method init*(b: BlockExcNetwork) = - ## Perform protocol initialization - ## - - proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = - if event.kind == PeerEventKind.Joined: - b.setupPeer(peerId) - else: - b.dropPeer(peerId) - - b.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) - b.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) - - proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = - let peerId = conn.peerId - let blockexcPeer = b.getOrCreatePeer(peerId) - await blockexcPeer.readLoop(conn) # attach read loop - - b.handler = handle - b.codec = Codec - -proc new*( - T: type BlockExcNetwork, - switch: Switch, - connProvider: ConnProvider = nil): T = - ## Create a new BlockExcNetwork instance - ## - - let b = BlockExcNetwork( - switch: switch, - getConn: connProvider) - - proc sendWantList( - id: PeerID, - cids: seq[Cid], - priority: int32 = 0, - cancel: bool = false, - wantType: WantType = WantType.wantHave, - full: bool = false, - sendDontHave: bool = false) {.gcsafe.} = - b.broadcastWantList( - id, cids, priority, cancel, - wantType, full, sendDontHave) - - proc sendBlocks(id: PeerID, blocks: seq[bt.Block]) {.gcsafe.} = - b.broadcastBlocks(id, blocks) - - proc sendPresence(id: PeerID, presence: seq[BlockPresence]) {.gcsafe.} = - b.broadcastBlockPresence(id, presence) - - proc sendAccount(id: PeerID, account: Account) = - b.broadcastAccount(id, account) - - proc sendPayment(id: PeerID, payment: SignedState) = - b.broadcastPayment(id, payment) - - b.request = BlockExcRequest( - sendWantList: sendWantList, - sendBlocks: sendBlocks, - sendPresence: sendPresence, - sendAccount: sendAccount, - sendPayment: sendPayment) - - b.init() - return b +export network, networkpeer diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim new file mode 100644 index 00000000..d50bfd84 --- /dev/null +++ b/codex/blockexchange/network/network.nim @@ -0,0 +1,375 @@ +## Nim-Codex +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/tables +import std/sequtils + +import pkg/chronicles +import pkg/chronos + +import pkg/libp2p +import pkg/questionable +import pkg/questionable/results + +import ../../blocktype as bt +import ../protobuf/blockexc as pb +import ../protobuf/payments + +import ./networkpeer + +export network, payments + +logScope: + topics = "codex blockexc network" + +const Codec* = "/codex/blockexc/1.0.0" + +type + WantListHandler* = proc(peer: PeerID, wantList: WantList): Future[void] {.gcsafe.} + BlocksHandler* = proc(peer: PeerID, blocks: seq[bt.Block]): Future[void] {.gcsafe.} + BlockPresenceHandler* = proc(peer: PeerID, precense: seq[BlockPresence]): Future[void] {.gcsafe.} + AccountHandler* = proc(peer: PeerID, account: Account): Future[void] {.gcsafe.} + PaymentHandler* = proc(peer: PeerID, payment: SignedState): Future[void] {.gcsafe.} + + BlockExcHandlers* = object + onWantList*: WantListHandler + onBlocks*: BlocksHandler + onPresence*: BlockPresenceHandler + onAccount*: AccountHandler + onPayment*: PaymentHandler + + WantListBroadcaster* = proc( + id: PeerID, + cids: seq[Cid], + priority: int32 = 0, + cancel: bool = false, + wantType: WantType = WantType.wantHave, + full: bool = false, + sendDontHave: bool = false) {.gcsafe.} + + BlocksBroadcaster* = proc(peer: PeerID, presence: seq[bt.Block]) {.gcsafe.} + PresenceBroadcaster* = proc(peer: PeerID, presence: seq[BlockPresence]) {.gcsafe.} + AccountBroadcaster* = proc(peer: PeerID, account: Account) {.gcsafe.} + PaymentBroadcaster* = proc(peer: PeerID, payment: SignedState) {.gcsafe.} + + BlockExcRequest* = object + sendWantList*: WantListBroadcaster + sendBlocks*: BlocksBroadcaster + sendPresence*: PresenceBroadcaster + sendAccount*: AccountBroadcaster + sendPayment*: PaymentBroadcaster + + BlockExcNetwork* = ref object of LPProtocol + peers*: Table[PeerID, NetworkPeer] + switch*: Switch + handlers*: BlockExcHandlers + request*: BlockExcRequest + getConn: ConnProvider + +proc handleWantList( + b: BlockExcNetwork, + peer: NetworkPeer, + list: WantList): Future[void] = + ## Handle incoming want list + ## + + if isNil(b.handlers.onWantList): + return + + trace "Handling want list for peer", peer = peer.id + b.handlers.onWantList(peer.id, list) + +# TODO: make into a template +proc makeWantList*( + cids: seq[Cid], + priority: int = 0, + cancel: bool = false, + wantType: WantType = WantType.wantHave, + full: bool = false, + sendDontHave: bool = false): WantList = + var entries: seq[Entry] + for cid in cids: + entries.add(Entry( + `block`: cid.data.buffer, + priority: priority.int32, + cancel: cancel, + wantType: wantType, + sendDontHave: sendDontHave)) + + WantList(entries: entries, full: full) + +proc broadcastWantList*( + b: BlockExcNetwork, + id: PeerID, + cids: seq[Cid], + priority: int32 = 0, + cancel: bool = false, + wantType: WantType = WantType.wantHave, + full: bool = false, + sendDontHave: bool = false) = + ## send a want message to peer + ## + + if id notin b.peers: + return + + trace "Sending want list to peer", peer = id, `type` = $wantType, len = cids.len + + let + wantList = makeWantList( + cids, + priority, + cancel, + wantType, + full, + sendDontHave) + b.peers.withValue(id, peer): + peer[].broadcast(Message(wantlist: wantList)) + +proc handleBlocks( + b: BlockExcNetwork, + peer: NetworkPeer, + blocks: seq[pb.Block]): Future[void] = + ## Handle incoming blocks + ## + + if isNil(b.handlers.onBlocks): + return + + trace "Handling blocks for peer", peer = peer.id + + var blks: seq[bt.Block] + for blob in blocks: + without cid =? Cid.init(blob.prefix): + trace "Unable to initialize Cid from protobuf message" + + without blk =? bt.Block.new(cid, blob.data, verify = true): + trace "Unable to initialize Block from data" + + blks.add(blk) + + b.handlers.onBlocks(peer.id, blks) + +template makeBlocks*(blocks: seq[bt.Block]): seq[pb.Block] = + var blks: seq[pb.Block] + for blk in blocks: + blks.add(pb.Block( + prefix: blk.cid.data.buffer, + data: blk.data + )) + + blks + +proc broadcastBlocks*( + b: BlockExcNetwork, + id: PeerID, + blocks: seq[bt.Block]) = + ## Send blocks to remote + ## + + if id notin b.peers: + trace "Unable to send blocks, peer disconnected", peer = id + return + + b.peers.withValue(id, peer): + trace "Sending blocks to peer", peer = id, len = blocks.len + peer[].broadcast(pb.Message(payload: makeBlocks(blocks))) + +proc handleBlockPresence( + b: BlockExcNetwork, + peer: NetworkPeer, + presence: seq[BlockPresence]): Future[void] = + ## Handle block presence + ## + + if isNil(b.handlers.onPresence): + return + + trace "Handling block presence for peer", peer = peer.id + b.handlers.onPresence(peer.id, presence) + +proc broadcastBlockPresence*( + b: BlockExcNetwork, + id: PeerID, + presence: seq[BlockPresence]) = + ## Send presence to remote + ## + + if id notin b.peers: + return + + trace "Sending presence to peer", peer = id + b.peers.withValue(id, peer): + peer[].broadcast(Message(blockPresences: @presence)) + +proc handleAccount(network: BlockExcNetwork, + peer: NetworkPeer, + account: Account): Future[void] = + if network.handlers.onAccount.isNil: + return + network.handlers.onAccount(peer.id, account) + +proc broadcastAccount*(network: BlockExcNetwork, + id: PeerId, + account: Account) = + if id notin network.peers: + return + + let message = Message(account: AccountMessage.init(account)) + network.peers.withValue(id, peer): + peer[].broadcast(message) + +proc broadcastPayment*(network: BlockExcNetwork, + id: PeerId, + payment: SignedState) = + if id notin network.peers: + return + + let message = Message(payment: StateChannelUpdate.init(payment)) + network.peers.withValue(id, peer): + peer[].broadcast(message) + +proc handlePayment(network: BlockExcNetwork, + peer: NetworkPeer, + payment: SignedState): Future[void] = + if network.handlers.onPayment.isNil: + return + network.handlers.onPayment(peer.id, payment) + +proc rpcHandler(b: BlockExcNetwork, peer: NetworkPeer, msg: Message) {.async.} = + try: + if msg.wantlist.entries.len > 0: + asyncSpawn b.handleWantList(peer, msg.wantlist) + + if msg.payload.len > 0: + asyncSpawn b.handleBlocks(peer, msg.payload) + + if msg.blockPresences.len > 0: + asyncSpawn b.handleBlockPresence(peer, msg.blockPresences) + + if account =? Account.init(msg.account): + asyncSpawn b.handleAccount(peer, account) + + if payment =? SignedState.init(msg.payment): + asyncSpawn b.handlePayment(peer, payment) + + except CatchableError as exc: + trace "Exception in blockexc rpc handler", exc = exc.msg + +proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerID): NetworkPeer = + ## Creates or retrieves a BlockExcNetwork Peer + ## + + if peer in b.peers: + return b.peers.getOrDefault(peer, nil) + + var getConn = proc(): Future[Connection] {.async.} = + try: + return await b.switch.dial(peer, Codec) + except CatchableError as exc: + trace "Unable to connect to blockexc peer", exc = exc.msg + + if not isNil(b.getConn): + getConn = b.getConn + + let rpcHandler = proc (p: NetworkPeer, msg: Message): Future[void] = + b.rpcHandler(p, msg) + + # create new pubsub peer + let blockExcPeer = NetworkPeer.new(peer, getConn, rpcHandler) + debug "Created new blockexc peer", peer + + b.peers[peer] = blockExcPeer + + return blockExcPeer + +proc setupPeer*(b: BlockExcNetwork, peer: PeerID) = + ## Perform initial setup, such as want + ## list exchange + ## + + discard b.getOrCreatePeer(peer) + +proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} = + try: + await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address)) + except CatchableError as exc: + debug "Failed to connect to peer", error = exc.msg, peer + +proc dropPeer*(b: BlockExcNetwork, peer: PeerID) = + ## Cleanup disconnected peer + ## + + b.peers.del(peer) + +method init*(b: BlockExcNetwork) = + ## Perform protocol initialization + ## + + proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = + if event.kind == PeerEventKind.Joined: + b.setupPeer(peerId) + else: + b.dropPeer(peerId) + + b.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) + b.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) + + proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = + let peerId = conn.peerId + let blockexcPeer = b.getOrCreatePeer(peerId) + await blockexcPeer.readLoop(conn) # attach read loop + + b.handler = handle + b.codec = Codec + +proc new*( + T: type BlockExcNetwork, + switch: Switch, + connProvider: ConnProvider = nil): T = + ## Create a new BlockExcNetwork instance + ## + + let b = BlockExcNetwork( + switch: switch, + getConn: connProvider) + + proc sendWantList( + id: PeerID, + cids: seq[Cid], + priority: int32 = 0, + cancel: bool = false, + wantType: WantType = WantType.wantHave, + full: bool = false, + sendDontHave: bool = false) {.gcsafe.} = + b.broadcastWantList( + id, cids, priority, cancel, + wantType, full, sendDontHave) + + proc sendBlocks(id: PeerID, blocks: seq[bt.Block]) {.gcsafe.} = + b.broadcastBlocks(id, blocks) + + proc sendPresence(id: PeerID, presence: seq[BlockPresence]) {.gcsafe.} = + b.broadcastBlockPresence(id, presence) + + proc sendAccount(id: PeerID, account: Account) = + b.broadcastAccount(id, account) + + proc sendPayment(id: PeerID, payment: SignedState) = + b.broadcastPayment(id, payment) + + b.request = BlockExcRequest( + sendWantList: sendWantList, + sendBlocks: sendBlocks, + sendPresence: sendPresence, + sendAccount: sendAccount, + sendPayment: sendPayment) + + b.init() + return b diff --git a/codex/blockexchange/networkpeer.nim b/codex/blockexchange/network/networkpeer.nim similarity index 98% rename from codex/blockexchange/networkpeer.nim rename to codex/blockexchange/network/networkpeer.nim index 1c44c3c2..964a8586 100644 --- a/codex/blockexchange/networkpeer.nim +++ b/codex/blockexchange/network/networkpeer.nim @@ -12,7 +12,7 @@ import pkg/chronicles import pkg/protobuf_serialization import pkg/libp2p -import ./protobuf/blockexc +import ../protobuf/blockexc logScope: topics = "codex blockexc networkpeer" diff --git a/codex/blockexchange/peers.nim b/codex/blockexchange/peers.nim index e159806a..940f66e4 100644 --- a/codex/blockexchange/peers.nim +++ b/codex/blockexchange/peers.nim @@ -1,3 +1,4 @@ import ./peers/peerctxstore +import ./peers/peercontext -export peerctxstore +export peerctxstore, peercontext diff --git a/codex/blockexchange/peercontext.nim b/codex/blockexchange/peers/peercontext.nim similarity index 76% rename from codex/blockexchange/peercontext.nim rename to codex/blockexchange/peers/peercontext.nim index 0d303703..9aab35b2 100644 --- a/codex/blockexchange/peercontext.nim +++ b/codex/blockexchange/peers/peercontext.nim @@ -1,3 +1,12 @@ +## Nim-Codex +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + import std/sequtils import std/tables import pkg/libp2p @@ -5,9 +14,9 @@ import pkg/chronos import pkg/nitro import pkg/questionable -import ./protobuf/blockexc -import ./protobuf/payments -import ./protobuf/presence +import ../protobuf/blockexc +import ../protobuf/payments +import ../protobuf/presence export payments, nitro diff --git a/codex/blockexchange/peers/peerctxstore.nim b/codex/blockexchange/peers/peerctxstore.nim index 77ad6f60..442dd206 100644 --- a/codex/blockexchange/peers/peerctxstore.nim +++ b/codex/blockexchange/peers/peerctxstore.nim @@ -21,7 +21,7 @@ import pkg/libp2p import ../protobuf/blockexc -import ../peercontext +import ./peercontext export peercontext logScope: diff --git a/tests/codex/blockexc/discovery/testdiscovery.nim b/tests/codex/blockexchange/discovery/testdiscovery.nim similarity index 100% rename from tests/codex/blockexc/discovery/testdiscovery.nim rename to tests/codex/blockexchange/discovery/testdiscovery.nim diff --git a/tests/codex/blockexc/discovery/testdiscoveryengine.nim b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim similarity index 100% rename from tests/codex/blockexc/discovery/testdiscoveryengine.nim rename to tests/codex/blockexchange/discovery/testdiscoveryengine.nim diff --git a/tests/codex/blockexc/testblockexc.nim b/tests/codex/blockexchange/engine/testblockexc.nim similarity index 99% rename from tests/codex/blockexc/testblockexc.nim rename to tests/codex/blockexchange/engine/testblockexc.nim index 351dfe0c..6c2d6557 100644 --- a/tests/codex/blockexc/testblockexc.nim +++ b/tests/codex/blockexchange/engine/testblockexc.nim @@ -15,8 +15,8 @@ import pkg/codex/chunker import pkg/codex/discovery import pkg/codex/blocktype as bt -import ../helpers -import ../examples +import ../../helpers +import ../../examples suite "NetworkStore engine - 2 nodes": let diff --git a/tests/codex/blockexc/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim similarity index 99% rename from tests/codex/blockexc/testengine.nim rename to tests/codex/blockexchange/engine/testengine.nim index eb128327..e6193e70 100644 --- a/tests/codex/blockexc/testengine.nim +++ b/tests/codex/blockexchange/engine/testengine.nim @@ -17,8 +17,8 @@ import pkg/codex/discovery import pkg/codex/blocktype as bt import pkg/codex/utils/asyncheapqueue -import ../helpers -import ../examples +import ../../helpers +import ../../examples suite "NetworkStore engine basic": var diff --git a/tests/codex/blockexc/engine/testpayments.nim b/tests/codex/blockexchange/engine/testpayments.nim similarity index 100% rename from tests/codex/blockexc/engine/testpayments.nim rename to tests/codex/blockexchange/engine/testpayments.nim diff --git a/tests/codex/blockexc/protobuf/testpayments.nim b/tests/codex/blockexchange/protobuf/testpayments.nim similarity index 100% rename from tests/codex/blockexc/protobuf/testpayments.nim rename to tests/codex/blockexchange/protobuf/testpayments.nim diff --git a/tests/codex/blockexc/protobuf/testpresence.nim b/tests/codex/blockexchange/protobuf/testpresence.nim similarity index 100% rename from tests/codex/blockexc/protobuf/testpresence.nim rename to tests/codex/blockexchange/protobuf/testpresence.nim diff --git a/tests/codex/blockexchange/testdiscovery.nim b/tests/codex/blockexchange/testdiscovery.nim new file mode 100644 index 00000000..6f443014 --- /dev/null +++ b/tests/codex/blockexchange/testdiscovery.nim @@ -0,0 +1,4 @@ +import ./discovery/testdiscovery +import ./discovery/testdiscoveryengine + +{.warning[UnusedImport]: off.} diff --git a/tests/codex/blockexchange/testengine.nim b/tests/codex/blockexchange/testengine.nim new file mode 100644 index 00000000..5277e027 --- /dev/null +++ b/tests/codex/blockexchange/testengine.nim @@ -0,0 +1,5 @@ +import ./engine/testengine +import ./engine/testblockexc +import ./engine/testpayments + +{.warning[UnusedImport]: off.} diff --git a/tests/codex/blockexc/testnetwork.nim b/tests/codex/blockexchange/testnetwork.nim similarity index 100% rename from tests/codex/blockexc/testnetwork.nim rename to tests/codex/blockexchange/testnetwork.nim diff --git a/tests/codex/blockexc/testpeerctxstore.nim b/tests/codex/blockexchange/testpeerctxstore.nim similarity index 100% rename from tests/codex/blockexc/testpeerctxstore.nim rename to tests/codex/blockexchange/testpeerctxstore.nim diff --git a/tests/codex/blockexchange/testprotobuf.nim b/tests/codex/blockexchange/testprotobuf.nim new file mode 100644 index 00000000..1d41d6b4 --- /dev/null +++ b/tests/codex/blockexchange/testprotobuf.nim @@ -0,0 +1,4 @@ +import ./protobuf/testpayments +import ./protobuf/testpresence + +{.warning[UnusedImport]: off.} diff --git a/tests/codex/testblockexc.nim b/tests/codex/testblockexc.nim index e12e9e0d..bb7ace3d 100644 --- a/tests/codex/testblockexc.nim +++ b/tests/codex/testblockexc.nim @@ -1,11 +1,7 @@ -import ./blockexc/testengine -import ./blockexc/testnetwork -import ./blockexc/testblockexc -import ./blockexc/testpeerctxstore -import ./blockexc/discovery/testdiscovery -import ./blockexc/discovery/testdiscoveryengine -import ./blockexc/protobuf/testpayments as testprotobufpayments -import ./blockexc/protobuf/testpresence -import ./blockexc/engine/testpayments as testenginepayments +import ./blockexchange/testengine +import ./blockexchange/testnetwork +import ./blockexchange/testpeerctxstore +import ./blockexchange/testdiscovery +import ./blockexchange/testprotobuf {.warning[UnusedImport]: off.}