From 48368893c91b4a0a597e6a4e5d729379b694ea1c Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Fri, 29 Jul 2022 10:19:34 -0600 Subject: [PATCH] Add limits for inflight requests (#169) * convert network to async * use async network api * test with async network * test concurrent send limits --- codex/blockexchange/engine/engine.nim | 37 +-- codex/blockexchange/network/network.nim | 265 +++++++++--------- .../codex/blockexchange/engine/testengine.nim | 23 +- tests/codex/blockexchange/testnetwork.nim | 75 +++-- 4 files changed, 221 insertions(+), 179 deletions(-) diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 964b38eb..24396745 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -118,14 +118,14 @@ proc stop*(b: BlockExcEngine) {.async.} = proc requestBlock*( b: BlockExcEngine, cid: Cid, - timeout = DefaultBlockTimeout): Future[bt.Block] = + timeout = DefaultBlockTimeout): Future[bt.Block] {.async.} = ## Request a block from remotes ## trace "Requesting block", cid = $cid if cid in b.pendingBlocks: - return b.pendingBlocks.getWantHandle(cid, timeout) + return await b.pendingBlocks.getWantHandle(cid, timeout) let blk = b.pendingBlocks.getWantHandle(cid, timeout) @@ -138,13 +138,13 @@ proc requestBlock*( if peers.len <= 0: trace "No peers to request blocks from", cid = $cid b.discovery.queueFindBlocksReq(@[cid]) - return blk + return await blk let blockPeer = peers[0] # get cheapest # request block - b.network.request.sendWantList( + await b.network.request.sendWantList( blockPeer.id, @[cid], wantType = WantType.wantBlock) # we want this remote to send us a block @@ -152,7 +152,7 @@ proc requestBlock*( if (peers.len - 1) == 0: trace "Not enough peers to send want list to", cid = $cid b.discovery.queueFindBlocksReq(@[cid]) - return blk # no peers to send wants to + return await blk # no peers to send wants to # filter out the peer we've already requested from let stop = min(peers.high, b.peersPerRequest) @@ -160,12 +160,12 @@ proc requestBlock*( for p in peers[1..stop]: if cid notin p.peerHave: # just send wants - b.network.request.sendWantList( + await b.network.request.sendWantList( p.id, @[cid], wantType = WantType.wantHave) # we only want to know if the peer has the block - return blk + return await blk proc blockPresenceHandler*( b: BlockExcEngine, @@ -190,7 +190,7 @@ proc blockPresenceHandler*( trace "Received presence update for cids", peer, count = cids.len if cids.len > 0: - b.network.request.sendWantList( + await b.network.request.sendWantList( peer, cids, wantType = WantType.wantBlock) # we want this remote to send us a block @@ -235,14 +235,14 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = proc payForBlocks(engine: BlockExcEngine, peer: BlockExcPeerCtx, - blocks: seq[bt.Block]) = + blocks: seq[bt.Block]) {.async.} = let sendPayment = engine.network.request.sendPayment if sendPayment.isNil: return let cids = blocks.mapIt(it.cid) if payment =? engine.wallet.pay(peer, peer.price(cids)): - sendPayment(peer.id, payment) + await sendPayment(peer.id, payment) proc blocksHandler*( b: BlockExcEngine, @@ -259,7 +259,7 @@ proc blocksHandler*( await b.resolveBlocks(blocks) let peerCtx = b.peers.get(peer) if peerCtx != nil: - b.payForBlocks(peerCtx, blocks) + await b.payForBlocks(peerCtx, blocks) proc wantListHandler*( b: BlockExcEngine, @@ -297,7 +297,7 @@ proc wantListHandler*( # send don't have's to remote if dontHaves.len > 0: - b.network.request.sendPresence( + await b.network.request.sendPresence( peer, dontHaves.mapIt( BlockPresence( @@ -331,7 +331,7 @@ proc paymentHandler*( else: context.paymentChannel = engine.wallet.acceptChannel(payment).option -proc setupPeer*(b: BlockExcEngine, peer: PeerID) = +proc setupPeer*(b: BlockExcEngine, peer: PeerID) {.async.} = ## Perform initial setup, such as want ## list exchange ## @@ -344,10 +344,11 @@ proc setupPeer*(b: BlockExcEngine, peer: PeerID) = # broadcast our want list, the other peer will do the same if b.pendingBlocks.len > 0: - b.network.request.sendWantList(peer, toSeq(b.pendingBlocks.wantList), full = true) + await b.network.request.sendWantList( + peer, toSeq(b.pendingBlocks.wantList), full = true) if address =? b.pricing.?address: - b.network.request.sendAccount(peer, Account(address: address)) + await b.network.request.sendAccount(peer, Account(address: address)) proc dropPeer*(b: BlockExcEngine, peer: PeerID) = ## Cleanup disconnected peer @@ -383,7 +384,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = if blocks.len > 0: trace "Sending blocks to peer", peer = task.id, blocks = blocks.len - b.network.request.sendBlocks( + await b.network.request.sendBlocks( task.id, blocks) @@ -409,7 +410,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = wants.add(BlockPresence.init(presence)) if wants.len > 0: - b.network.request.sendPresence(task.id, wants) + await b.network.request.sendPresence(task.id, wants) proc blockexcTaskRunner(b: BlockExcEngine) {.async.} = ## process tasks @@ -450,7 +451,7 @@ proc new*( proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = if event.kind == PeerEventKind.Joined: - engine.setupPeer(peerId) + await engine.setupPeer(peerId) else: engine.dropPeer(peerId) diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index 40ba33a2..4960a2fc 100644 --- a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -14,6 +14,7 @@ import pkg/chronicles import pkg/chronos import pkg/libp2p +import pkg/libp2p/utils/semaphore import pkg/questionable import pkg/questionable/results @@ -28,7 +29,9 @@ export network, payments logScope: topics = "codex blockexc network" -const Codec* = "/codex/blockexc/1.0.0" +const + Codec* = "/codex/blockexc/1.0.0" + MaxInflight* = 100 type WantListHandler* = proc(peer: PeerID, wantList: WantList): Future[void] {.gcsafe.} @@ -36,6 +39,14 @@ type BlockPresenceHandler* = proc(peer: PeerID, precense: seq[BlockPresence]): Future[void] {.gcsafe.} AccountHandler* = proc(peer: PeerID, account: Account): Future[void] {.gcsafe.} PaymentHandler* = proc(peer: PeerID, payment: SignedState): Future[void] {.gcsafe.} + WantListSender* = proc( + id: PeerID, + cids: seq[Cid], + priority: int32 = 0, + cancel: bool = false, + wantType: WantType = WantType.wantHave, + full: bool = false, + sendDontHave: bool = false): Future[void] {.gcsafe.} BlockExcHandlers* = object onWantList*: WantListHandler @@ -44,26 +55,17 @@ type onAccount*: AccountHandler onPayment*: PaymentHandler - WantListBroadcaster* = proc( - id: PeerID, - cids: seq[Cid], - priority: int32 = 0, - cancel: bool = false, - wantType: WantType = WantType.wantHave, - full: bool = false, - sendDontHave: bool = false) {.gcsafe.} - - BlocksBroadcaster* = proc(peer: PeerID, presence: seq[bt.Block]) {.gcsafe.} - PresenceBroadcaster* = proc(peer: PeerID, presence: seq[BlockPresence]) {.gcsafe.} - AccountBroadcaster* = proc(peer: PeerID, account: Account) {.gcsafe.} - PaymentBroadcaster* = proc(peer: PeerID, payment: SignedState) {.gcsafe.} + BlocksSender* = proc(peer: PeerID, presence: seq[bt.Block]): Future[void] {.gcsafe.} + PresenceSender* = proc(peer: PeerID, presence: seq[BlockPresence]): Future[void] {.gcsafe.} + AccountSender* = proc(peer: PeerID, account: Account): Future[void] {.gcsafe.} + PaymentSender* = proc(peer: PeerID, payment: SignedState): Future[void] {.gcsafe.} BlockExcRequest* = object - sendWantList*: WantListBroadcaster - sendBlocks*: BlocksBroadcaster - sendPresence*: PresenceBroadcaster - sendAccount*: AccountBroadcaster - sendPayment*: PaymentBroadcaster + sendWantList*: WantListSender + sendBlocks*: BlocksSender + sendPresence*: PresenceSender + sendAccount*: AccountSender + sendPayment*: PaymentSender BlockExcNetwork* = ref object of LPProtocol peers*: Table[PeerID, NetworkPeer] @@ -71,19 +73,32 @@ type handlers*: BlockExcHandlers request*: BlockExcRequest getConn: ConnProvider + inflightSema: AsyncSemaphore + +proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} = + ## Send message to peer + ## + + b.peers.withValue(id, peer): + try: + await b.inflightSema.acquire() + trace "Sending message to peer", peer = id + await peer[].send(msg) + finally: + b.inflightSema.release() + do: + trace "Unable to send, peer not found", peerId = id proc handleWantList( b: BlockExcNetwork, peer: NetworkPeer, - list: WantList): Future[void] = + list: WantList) {.async.} = ## Handle incoming want list ## - if isNil(b.handlers.onWantList): - return - - trace "Handling want list for peer", peer = peer.id, items = list.entries.len - b.handlers.onWantList(peer.id, list) + if not b.handlers.onWantList.isNil: + trace "Handling want list for peer", peer = peer.id, items = list.entries.len + await b.handlers.onWantList(peer.id, list) # TODO: make into a template proc makeWantList*( @@ -93,18 +108,17 @@ proc makeWantList*( wantType: WantType = WantType.wantHave, full: bool = false, sendDontHave: bool = false): WantList = - var entries: seq[Entry] - for cid in cids: - entries.add(Entry( - `block`: cid.data.buffer, - priority: priority.int32, - cancel: cancel, - wantType: wantType, - sendDontHave: sendDontHave)) + WantList( + entries: cids.mapIt( + Entry( + `block`: it.data.buffer, + priority: priority.int32, + cancel: cancel, + wantType: wantType, + sendDontHave: sendDontHave) ), + full: full) - WantList(entries: entries, full: full) - -proc broadcastWantList*( +proc sendWantList*( b: BlockExcNetwork, id: PeerID, cids: seq[Cid], @@ -112,49 +126,42 @@ proc broadcastWantList*( cancel: bool = false, wantType: WantType = WantType.wantHave, full: bool = false, - sendDontHave: bool = false) = - ## send a want message to peer + sendDontHave: bool = false): Future[void] = + ## Send a want message to peer ## - if id notin b.peers: - return - trace "Sending want list to peer", peer = id, `type` = $wantType, items = cids.len + let msg = makeWantList( + cids, + priority, + cancel, + wantType, + full, + sendDontHave) - let - wantList = makeWantList( - cids, - priority, - cancel, - wantType, - full, - sendDontHave) - b.peers.withValue(id, peer): - peer[].broadcast(Message(wantlist: wantList)) + b.send(id, Message(wantlist: msg)) proc handleBlocks( b: BlockExcNetwork, peer: NetworkPeer, - blocks: seq[pb.Block]): Future[void] = + blocks: seq[pb.Block]) {.async.} = ## Handle incoming blocks ## - if isNil(b.handlers.onBlocks): - return + if not b.handlers.onBlocks.isNil: + trace "Handling blocks for peer", peer = peer.id, items = blocks.len - trace "Handling blocks for peer", peer = peer.id, items = blocks.len + var blks: seq[bt.Block] + for blob in blocks: + without cid =? Cid.init(blob.prefix): + trace "Unable to initialize Cid from protobuf message" - var blks: seq[bt.Block] - for blob in blocks: - without cid =? Cid.init(blob.prefix): - trace "Unable to initialize Cid from protobuf message" + without blk =? bt.Block.new(cid, blob.data, verify = true): + trace "Unable to initialize Block from data" - without blk =? bt.Block.new(cid, blob.data, verify = true): - trace "Unable to initialize Block from data" + blks.add(blk) - blks.add(blk) - - b.handlers.onBlocks(peer.id, blks) + await b.handlers.onBlocks(peer.id, blks) template makeBlocks*(blocks: seq[bt.Block]): seq[pb.Block] = var blks: seq[pb.Block] @@ -166,81 +173,72 @@ template makeBlocks*(blocks: seq[bt.Block]): seq[pb.Block] = blks -proc broadcastBlocks*( +proc sendBlocks*( b: BlockExcNetwork, id: PeerID, - blocks: seq[bt.Block]) = + blocks: seq[bt.Block]): Future[void] = ## Send blocks to remote ## - if id notin b.peers: - trace "Unable to send blocks, peer disconnected", peer = id - return - - b.peers.withValue(id, peer): - trace "Sending blocks to peer", peer = id, items = blocks.len - peer[].broadcast(pb.Message(payload: makeBlocks(blocks))) + b.send(id, pb.Message(payload: makeBlocks(blocks))) proc handleBlockPresence( b: BlockExcNetwork, peer: NetworkPeer, - presence: seq[BlockPresence]): Future[void] = + presence: seq[BlockPresence]) {.async.} = ## Handle block presence ## - if isNil(b.handlers.onPresence): - return + if not b.handlers.onPresence.isNil: + trace "Handling block presence for peer", peer = peer.id, items = presence.len + await b.handlers.onPresence(peer.id, presence) - trace "Handling block presence for peer", peer = peer.id, items = presence.len - b.handlers.onPresence(peer.id, presence) - -proc broadcastBlockPresence*( +proc sendBlockPresence*( b: BlockExcNetwork, id: PeerID, - presence: seq[BlockPresence]) = + presence: seq[BlockPresence]): Future[void] = ## Send presence to remote ## - if id notin b.peers: - return + b.send(id, Message(blockPresences: @presence)) - trace "Sending presence to peer", peer = id, items = presence.len - b.peers.withValue(id, peer): - peer[].broadcast(Message(blockPresences: @presence)) +proc handleAccount( + network: BlockExcNetwork, + peer: NetworkPeer, + account: Account) {.async.} = + ## Handle account info + ## -proc handleAccount(network: BlockExcNetwork, - peer: NetworkPeer, - account: Account): Future[void] = - if network.handlers.onAccount.isNil: - return - network.handlers.onAccount(peer.id, account) + if not network.handlers.onAccount.isNil: + await network.handlers.onAccount(peer.id, account) -proc broadcastAccount*(network: BlockExcNetwork, - id: PeerId, - account: Account) = - if id notin network.peers: - return +proc sendAccount*( + b: BlockExcNetwork, + id: PeerId, + account: Account): Future[void] = + ## Send account info to remote + ## - let message = Message(account: AccountMessage.init(account)) - network.peers.withValue(id, peer): - peer[].broadcast(message) + b.send(id, Message(account: AccountMessage.init(account))) -proc broadcastPayment*(network: BlockExcNetwork, - id: PeerId, - payment: SignedState) = - if id notin network.peers: - return +proc sendPayment*( + b: BlockExcNetwork, + id: PeerId, + payment: SignedState): Future[void] = + ## Send payment to remote + ## - let message = Message(payment: StateChannelUpdate.init(payment)) - network.peers.withValue(id, peer): - peer[].broadcast(message) + b.send(id, Message(payment: StateChannelUpdate.init(payment))) -proc handlePayment(network: BlockExcNetwork, - peer: NetworkPeer, - payment: SignedState): Future[void] = - if network.handlers.onPayment.isNil: - return - network.handlers.onPayment(peer.id, payment) +proc handlePayment( + network: BlockExcNetwork, + peer: NetworkPeer, + payment: SignedState) {.async.} = + ## Handle payment + ## + + if not network.handlers.onPayment.isNil: + await network.handlers.onPayment(peer.id, payment) proc rpcHandler(b: BlockExcNetwork, peer: NetworkPeer, msg: Message) {.async.} = try: @@ -297,10 +295,7 @@ proc setupPeer*(b: BlockExcNetwork, peer: PeerID) = discard b.getOrCreatePeer(peer) proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} = - try: - await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address)) - except CatchableError as exc: - debug "Failed to connect to peer", error = exc.msg, peer + await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address)) proc dropPeer*(b: BlockExcNetwork, peer: PeerID) = ## Cleanup disconnected peer @@ -332,13 +327,15 @@ method init*(b: BlockExcNetwork) = proc new*( T: type BlockExcNetwork, switch: Switch, - connProvider: ConnProvider = nil): T = + connProvider: ConnProvider = nil, + maxInflight = MaxInflight): T = ## Create a new BlockExcNetwork instance ## - let b = BlockExcNetwork( + let self = BlockExcNetwork( switch: switch, - getConn: connProvider) + getConn: connProvider, + inflightSema: newAsyncSemaphore(maxInflight)) proc sendWantList( id: PeerID, @@ -347,29 +344,29 @@ proc new*( cancel: bool = false, wantType: WantType = WantType.wantHave, full: bool = false, - sendDontHave: bool = false) {.gcsafe.} = - b.broadcastWantList( + sendDontHave: bool = false): Future[void] {.gcsafe.} = + self.sendWantList( id, cids, priority, cancel, wantType, full, sendDontHave) - proc sendBlocks(id: PeerID, blocks: seq[bt.Block]) {.gcsafe.} = - b.broadcastBlocks(id, blocks) + proc sendBlocks(id: PeerID, blocks: seq[bt.Block]): Future[void] {.gcsafe.} = + self.sendBlocks(id, blocks) - proc sendPresence(id: PeerID, presence: seq[BlockPresence]) {.gcsafe.} = - b.broadcastBlockPresence(id, presence) + proc sendPresence(id: PeerID, presence: seq[BlockPresence]): Future[void] {.gcsafe.} = + self.sendBlockPresence(id, presence) - proc sendAccount(id: PeerID, account: Account) = - b.broadcastAccount(id, account) + proc sendAccount(id: PeerID, account: Account): Future[void] {.gcsafe.} = + self.sendAccount(id, account) - proc sendPayment(id: PeerID, payment: SignedState) = - b.broadcastPayment(id, payment) + proc sendPayment(id: PeerID, payment: SignedState): Future[void] {.gcsafe.} = + self.sendPayment(id, payment) - b.request = BlockExcRequest( + self.request = BlockExcRequest( sendWantList: sendWantList, sendBlocks: sendBlocks, sendPresence: sendPresence, sendAccount: sendAccount, sendPayment: sendPayment) - b.init() - return b + self.init() + return self diff --git a/tests/codex/blockexchange/engine/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim index 5fa2b803..7a360597 100644 --- a/tests/codex/blockexchange/engine/testengine.nim +++ b/tests/codex/blockexchange/engine/testengine.nim @@ -60,7 +60,7 @@ suite "NetworkStore engine basic": cancel: bool = false, wantType: WantType = WantType.wantHave, full: bool = false, - sendDontHave: bool = false) {.gcsafe.} = + sendDontHave: bool = false) {.gcsafe, async.} = check cids.mapIt($it).sorted == blocks.mapIt( $it.cid ).sorted done.complete() @@ -87,20 +87,21 @@ suite "NetworkStore engine basic": for b in blocks: discard engine.pendingBlocks.getWantHandle(b.cid) - engine.setupPeer(peerId) + await engine.setupPeer(peerId) await done.wait(100.millis) test "Should send account to new peers": let pricing = Pricing.example - proc sendAccount(peer: PeerID, account: Account) = + proc sendAccount(peer: PeerID, account: Account) {.gcsafe, async.} = check account.address == pricing.address done.complete() let - network = BlockExcNetwork(request: BlockExcRequest( - sendAccount: sendAccount + network = BlockExcNetwork( + request: BlockExcRequest( + sendAccount: sendAccount )) localStore = CacheStore.new() @@ -120,7 +121,7 @@ suite "NetworkStore engine basic": pendingBlocks) engine.pricing = pricing.some - engine.setupPeer(peerId) + await engine.setupPeer(peerId) await done.wait(100.millis) @@ -197,7 +198,7 @@ suite "NetworkStore engine handlers": test "Should handle want list - `dont-have`": let wantList = makeWantList(blocks.mapIt( it.cid ), sendDontHave = true) - proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) = + proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) {.gcsafe, async.} = check presence.mapIt( it.cid ) == wantList.entries.mapIt( it.`block` ) for p in presence: check: @@ -215,7 +216,7 @@ suite "NetworkStore engine handlers": test "Should handle want list - `dont-have` some blocks": let wantList = makeWantList(blocks.mapIt( it.cid ), sendDontHave = true) - proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) = + proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) {.gcsafe, async.} = check presence.mapIt( it.cid ) == blocks[2..blocks.high].mapIt( it.cid.data.buffer ) for p in presence: check: @@ -252,7 +253,7 @@ suite "NetworkStore engine handlers": peerContext.peerPrices = blocks.mapIt((it.cid, rand(uint16).u256)).toTable engine.network = BlockExcNetwork(request: BlockExcRequest( - sendPayment: proc(receiver: PeerID, payment: SignedState) = + sendPayment: proc(receiver: PeerID, payment: SignedState) {.gcsafe, async.} = let amount = blocks.mapIt(peerContext.peerPrices[it.cid]).foldl(a+b) let balances = !payment.state.outcome.balances(Asset) check receiver == peerId @@ -349,7 +350,7 @@ suite "Task Handler": test "Should send want-blocks in priority order": proc sendBlocks( id: PeerID, - blks: seq[bt.Block]) {.gcsafe.} = + blks: seq[bt.Block]) {.gcsafe, async.} = check blks.len == 2 check: blks[1].cid == blocks[0].cid @@ -386,7 +387,7 @@ suite "Task Handler": let missing = @[bt.Block.new("missing".toBytes).tryGet()] let price = (!engine.pricing).price - proc sendPresence(id: PeerID, presence: seq[BlockPresence]) = + proc sendPresence(id: PeerID, presence: seq[BlockPresence]) {.gcsafe, async.} = check presence.mapIt(!Presence.init(it)) == @[ Presence(cid: present[0].cid, have: true, price: price), Presence(cid: present[1].cid, have: true, price: price), diff --git a/tests/codex/blockexchange/testnetwork.nim b/tests/codex/blockexchange/testnetwork.nim index fe955d35..1b01422d 100644 --- a/tests/codex/blockexchange/testnetwork.nim +++ b/tests/codex/blockexchange/testnetwork.nim @@ -15,7 +15,7 @@ import pkg/codex/blockexchange import ../helpers import ../examples -suite "NetworkStore network": +suite "Network - Handlers": let rng = Rng.instance() seckey = PrivateKey.random(rng[]).tryGet() @@ -110,7 +110,7 @@ suite "NetworkStore network": await done.wait(500.millis) - test "handles account messages": + test "Handles account messages": let account = Account(address: EthAddress.example) proc handleAccount(peer: PeerID, received: Account) {.gcsafe, async.} = @@ -124,7 +124,7 @@ suite "NetworkStore network": await done.wait(100.millis) - test "handles payment messages": + test "Handles payment messages": let payment = SignedState.example proc handlePayment(peer: PeerID, received: SignedState) {.gcsafe, async.} = @@ -138,7 +138,7 @@ suite "NetworkStore network": await done.wait(100.millis) -suite "NetworkStore Network - e2e": +suite "Network - Senders": let chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) @@ -179,7 +179,7 @@ suite "NetworkStore Network - e2e": switch1.stop(), switch2.stop()) - test "broadcast want list": + test "Send want list": proc wantListHandler(peer: PeerID, wantList: WantList) {.gcsafe, async.} = # check that we got the correct amount of entries check wantList.entries.len == 4 @@ -195,7 +195,7 @@ suite "NetworkStore Network - e2e": done.complete() network2.handlers.onWantList = wantListHandler - network1.broadcastWantList( + await network1.sendWantList( switch2.peerInfo.peerId, blocks.mapIt( it.cid ), 1, true, WantType.wantHave, @@ -203,19 +203,19 @@ suite "NetworkStore Network - e2e": await done.wait(500.millis) - test "broadcast blocks": + test "send blocks": proc blocksHandler(peer: PeerID, blks: seq[bt.Block]) {.gcsafe, async.} = check blks == blocks done.complete() network2.handlers.onBlocks = blocksHandler - network1.broadcastBlocks( + await network1.sendBlocks( switch2.peerInfo.peerId, blocks) await done.wait(500.millis) - test "broadcast presence": + test "send presence": proc presenceHandler( peer: PeerID, precense: seq[BlockPresence]) {.gcsafe, async.} = @@ -227,7 +227,7 @@ suite "NetworkStore Network - e2e": network2.handlers.onPresence = presenceHandler - network1.broadcastBlockPresence( + await network1.sendBlockPresence( switch2.peerInfo.peerId, blocks.mapIt( BlockPresence( @@ -237,7 +237,7 @@ suite "NetworkStore Network - e2e": await done.wait(500.millis) - test "broadcasts account": + test "send account": let account = Account(address: EthAddress.example) proc handleAccount(peer: PeerID, received: Account) {.gcsafe, async.} = @@ -246,11 +246,10 @@ suite "NetworkStore Network - e2e": network2.handlers.onAccount = handleAccount - network1.broadcastAccount(switch2.peerInfo.peerId, account) - + await network1.sendAccount(switch2.peerInfo.peerId, account) await done.wait(500.millis) - test "broadcasts payment": + test "send payment": let payment = SignedState.example proc handlePayment(peer: PeerID, received: SignedState) {.gcsafe, async.} = @@ -259,6 +258,50 @@ suite "NetworkStore Network - e2e": network2.handlers.onPayment = handlePayment - network1.broadcastPayment(switch2.peerInfo.peerId, payment) - + await network1.sendPayment(switch2.peerInfo.peerId, payment) await done.wait(500.millis) + +suite "Network - Test Limits": + var + switch1, switch2: Switch + network1, network2: BlockExcNetwork + blocks: seq[bt.Block] + done: Future[void] + + setup: + done = newFuture[void]() + switch1 = newStandardSwitch() + switch2 = newStandardSwitch() + await switch1.start() + await switch2.start() + + network1 = BlockExcNetwork.new( + switch = switch1, + maxInflight = 0) + switch1.mount(network1) + + network2 = BlockExcNetwork.new( + switch = switch2) + switch2.mount(network2) + + await switch1.connect( + switch2.peerInfo.peerId, + switch2.peerInfo.addrs) + + teardown: + await allFuturesThrowing( + switch1.stop(), + switch2.stop()) + + test "Concurrent Sends": + let account = Account(address: EthAddress.example) + network2.handlers.onAccount = + proc(peer: PeerID, received: Account) {.gcsafe, async.} = + check false + + let fut = network1.send( + switch2.peerInfo.peerId, + Message(account: AccountMessage.init(account))) + + await sleepAsync(100.millis) + check not fut.finished