Add limits for inflight requests (#169)

* convert network to async

* use async network api

* test with async network

* test concurrent send limits
This commit is contained in:
Dmitriy Ryajov 2022-07-29 10:19:34 -06:00 committed by GitHub
parent 0b786b383f
commit 48368893c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 221 additions and 179 deletions

View File

@ -118,14 +118,14 @@ proc stop*(b: BlockExcEngine) {.async.} =
proc requestBlock*( proc requestBlock*(
b: BlockExcEngine, b: BlockExcEngine,
cid: Cid, cid: Cid,
timeout = DefaultBlockTimeout): Future[bt.Block] = timeout = DefaultBlockTimeout): Future[bt.Block] {.async.} =
## Request a block from remotes ## Request a block from remotes
## ##
trace "Requesting block", cid = $cid trace "Requesting block", cid = $cid
if cid in b.pendingBlocks: if cid in b.pendingBlocks:
return b.pendingBlocks.getWantHandle(cid, timeout) return await b.pendingBlocks.getWantHandle(cid, timeout)
let let
blk = b.pendingBlocks.getWantHandle(cid, timeout) blk = b.pendingBlocks.getWantHandle(cid, timeout)
@ -138,13 +138,13 @@ proc requestBlock*(
if peers.len <= 0: if peers.len <= 0:
trace "No peers to request blocks from", cid = $cid trace "No peers to request blocks from", cid = $cid
b.discovery.queueFindBlocksReq(@[cid]) b.discovery.queueFindBlocksReq(@[cid])
return blk return await blk
let let
blockPeer = peers[0] # get cheapest blockPeer = peers[0] # get cheapest
# request block # request block
b.network.request.sendWantList( await b.network.request.sendWantList(
blockPeer.id, blockPeer.id,
@[cid], @[cid],
wantType = WantType.wantBlock) # we want this remote to send us a block wantType = WantType.wantBlock) # we want this remote to send us a block
@ -152,7 +152,7 @@ proc requestBlock*(
if (peers.len - 1) == 0: if (peers.len - 1) == 0:
trace "Not enough peers to send want list to", cid = $cid trace "Not enough peers to send want list to", cid = $cid
b.discovery.queueFindBlocksReq(@[cid]) b.discovery.queueFindBlocksReq(@[cid])
return blk # no peers to send wants to return await blk # no peers to send wants to
# filter out the peer we've already requested from # filter out the peer we've already requested from
let stop = min(peers.high, b.peersPerRequest) let stop = min(peers.high, b.peersPerRequest)
@ -160,12 +160,12 @@ proc requestBlock*(
for p in peers[1..stop]: for p in peers[1..stop]:
if cid notin p.peerHave: if cid notin p.peerHave:
# just send wants # just send wants
b.network.request.sendWantList( await b.network.request.sendWantList(
p.id, p.id,
@[cid], @[cid],
wantType = WantType.wantHave) # we only want to know if the peer has the block wantType = WantType.wantHave) # we only want to know if the peer has the block
return blk return await blk
proc blockPresenceHandler*( proc blockPresenceHandler*(
b: BlockExcEngine, b: BlockExcEngine,
@ -190,7 +190,7 @@ proc blockPresenceHandler*(
trace "Received presence update for cids", peer, count = cids.len trace "Received presence update for cids", peer, count = cids.len
if cids.len > 0: if cids.len > 0:
b.network.request.sendWantList( await b.network.request.sendWantList(
peer, peer,
cids, cids,
wantType = WantType.wantBlock) # we want this remote to send us a block wantType = WantType.wantBlock) # we want this remote to send us a block
@ -235,14 +235,14 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} =
proc payForBlocks(engine: BlockExcEngine, proc payForBlocks(engine: BlockExcEngine,
peer: BlockExcPeerCtx, peer: BlockExcPeerCtx,
blocks: seq[bt.Block]) = blocks: seq[bt.Block]) {.async.} =
let sendPayment = engine.network.request.sendPayment let sendPayment = engine.network.request.sendPayment
if sendPayment.isNil: if sendPayment.isNil:
return return
let cids = blocks.mapIt(it.cid) let cids = blocks.mapIt(it.cid)
if payment =? engine.wallet.pay(peer, peer.price(cids)): if payment =? engine.wallet.pay(peer, peer.price(cids)):
sendPayment(peer.id, payment) await sendPayment(peer.id, payment)
proc blocksHandler*( proc blocksHandler*(
b: BlockExcEngine, b: BlockExcEngine,
@ -259,7 +259,7 @@ proc blocksHandler*(
await b.resolveBlocks(blocks) await b.resolveBlocks(blocks)
let peerCtx = b.peers.get(peer) let peerCtx = b.peers.get(peer)
if peerCtx != nil: if peerCtx != nil:
b.payForBlocks(peerCtx, blocks) await b.payForBlocks(peerCtx, blocks)
proc wantListHandler*( proc wantListHandler*(
b: BlockExcEngine, b: BlockExcEngine,
@ -297,7 +297,7 @@ proc wantListHandler*(
# send don't have's to remote # send don't have's to remote
if dontHaves.len > 0: if dontHaves.len > 0:
b.network.request.sendPresence( await b.network.request.sendPresence(
peer, peer,
dontHaves.mapIt( dontHaves.mapIt(
BlockPresence( BlockPresence(
@ -331,7 +331,7 @@ proc paymentHandler*(
else: else:
context.paymentChannel = engine.wallet.acceptChannel(payment).option context.paymentChannel = engine.wallet.acceptChannel(payment).option
proc setupPeer*(b: BlockExcEngine, peer: PeerID) = proc setupPeer*(b: BlockExcEngine, peer: PeerID) {.async.} =
## Perform initial setup, such as want ## Perform initial setup, such as want
## list exchange ## list exchange
## ##
@ -344,10 +344,11 @@ proc setupPeer*(b: BlockExcEngine, peer: PeerID) =
# broadcast our want list, the other peer will do the same # broadcast our want list, the other peer will do the same
if b.pendingBlocks.len > 0: if b.pendingBlocks.len > 0:
b.network.request.sendWantList(peer, toSeq(b.pendingBlocks.wantList), full = true) await b.network.request.sendWantList(
peer, toSeq(b.pendingBlocks.wantList), full = true)
if address =? b.pricing.?address: if address =? b.pricing.?address:
b.network.request.sendAccount(peer, Account(address: address)) await b.network.request.sendAccount(peer, Account(address: address))
proc dropPeer*(b: BlockExcEngine, peer: PeerID) = proc dropPeer*(b: BlockExcEngine, peer: PeerID) =
## Cleanup disconnected peer ## Cleanup disconnected peer
@ -383,7 +384,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
if blocks.len > 0: if blocks.len > 0:
trace "Sending blocks to peer", peer = task.id, blocks = blocks.len trace "Sending blocks to peer", peer = task.id, blocks = blocks.len
b.network.request.sendBlocks( await b.network.request.sendBlocks(
task.id, task.id,
blocks) blocks)
@ -409,7 +410,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
wants.add(BlockPresence.init(presence)) wants.add(BlockPresence.init(presence))
if wants.len > 0: if wants.len > 0:
b.network.request.sendPresence(task.id, wants) await b.network.request.sendPresence(task.id, wants)
proc blockexcTaskRunner(b: BlockExcEngine) {.async.} = proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
## process tasks ## process tasks
@ -450,7 +451,7 @@ proc new*(
proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} =
if event.kind == PeerEventKind.Joined: if event.kind == PeerEventKind.Joined:
engine.setupPeer(peerId) await engine.setupPeer(peerId)
else: else:
engine.dropPeer(peerId) engine.dropPeer(peerId)

View File

@ -14,6 +14,7 @@ import pkg/chronicles
import pkg/chronos import pkg/chronos
import pkg/libp2p import pkg/libp2p
import pkg/libp2p/utils/semaphore
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
@ -28,7 +29,9 @@ export network, payments
logScope: logScope:
topics = "codex blockexc network" topics = "codex blockexc network"
const Codec* = "/codex/blockexc/1.0.0" const
Codec* = "/codex/blockexc/1.0.0"
MaxInflight* = 100
type type
WantListHandler* = proc(peer: PeerID, wantList: WantList): Future[void] {.gcsafe.} WantListHandler* = proc(peer: PeerID, wantList: WantList): Future[void] {.gcsafe.}
@ -36,6 +39,14 @@ type
BlockPresenceHandler* = proc(peer: PeerID, precense: seq[BlockPresence]): Future[void] {.gcsafe.} BlockPresenceHandler* = proc(peer: PeerID, precense: seq[BlockPresence]): Future[void] {.gcsafe.}
AccountHandler* = proc(peer: PeerID, account: Account): Future[void] {.gcsafe.} AccountHandler* = proc(peer: PeerID, account: Account): Future[void] {.gcsafe.}
PaymentHandler* = proc(peer: PeerID, payment: SignedState): Future[void] {.gcsafe.} PaymentHandler* = proc(peer: PeerID, payment: SignedState): Future[void] {.gcsafe.}
WantListSender* = proc(
id: PeerID,
cids: seq[Cid],
priority: int32 = 0,
cancel: bool = false,
wantType: WantType = WantType.wantHave,
full: bool = false,
sendDontHave: bool = false): Future[void] {.gcsafe.}
BlockExcHandlers* = object BlockExcHandlers* = object
onWantList*: WantListHandler onWantList*: WantListHandler
@ -44,26 +55,17 @@ type
onAccount*: AccountHandler onAccount*: AccountHandler
onPayment*: PaymentHandler onPayment*: PaymentHandler
WantListBroadcaster* = proc( BlocksSender* = proc(peer: PeerID, presence: seq[bt.Block]): Future[void] {.gcsafe.}
id: PeerID, PresenceSender* = proc(peer: PeerID, presence: seq[BlockPresence]): Future[void] {.gcsafe.}
cids: seq[Cid], AccountSender* = proc(peer: PeerID, account: Account): Future[void] {.gcsafe.}
priority: int32 = 0, PaymentSender* = proc(peer: PeerID, payment: SignedState): Future[void] {.gcsafe.}
cancel: bool = false,
wantType: WantType = WantType.wantHave,
full: bool = false,
sendDontHave: bool = false) {.gcsafe.}
BlocksBroadcaster* = proc(peer: PeerID, presence: seq[bt.Block]) {.gcsafe.}
PresenceBroadcaster* = proc(peer: PeerID, presence: seq[BlockPresence]) {.gcsafe.}
AccountBroadcaster* = proc(peer: PeerID, account: Account) {.gcsafe.}
PaymentBroadcaster* = proc(peer: PeerID, payment: SignedState) {.gcsafe.}
BlockExcRequest* = object BlockExcRequest* = object
sendWantList*: WantListBroadcaster sendWantList*: WantListSender
sendBlocks*: BlocksBroadcaster sendBlocks*: BlocksSender
sendPresence*: PresenceBroadcaster sendPresence*: PresenceSender
sendAccount*: AccountBroadcaster sendAccount*: AccountSender
sendPayment*: PaymentBroadcaster sendPayment*: PaymentSender
BlockExcNetwork* = ref object of LPProtocol BlockExcNetwork* = ref object of LPProtocol
peers*: Table[PeerID, NetworkPeer] peers*: Table[PeerID, NetworkPeer]
@ -71,19 +73,32 @@ type
handlers*: BlockExcHandlers handlers*: BlockExcHandlers
request*: BlockExcRequest request*: BlockExcRequest
getConn: ConnProvider getConn: ConnProvider
inflightSema: AsyncSemaphore
proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} =
## Send message to peer
##
b.peers.withValue(id, peer):
try:
await b.inflightSema.acquire()
trace "Sending message to peer", peer = id
await peer[].send(msg)
finally:
b.inflightSema.release()
do:
trace "Unable to send, peer not found", peerId = id
proc handleWantList( proc handleWantList(
b: BlockExcNetwork, b: BlockExcNetwork,
peer: NetworkPeer, peer: NetworkPeer,
list: WantList): Future[void] = list: WantList) {.async.} =
## Handle incoming want list ## Handle incoming want list
## ##
if isNil(b.handlers.onWantList): if not b.handlers.onWantList.isNil:
return trace "Handling want list for peer", peer = peer.id, items = list.entries.len
await b.handlers.onWantList(peer.id, list)
trace "Handling want list for peer", peer = peer.id, items = list.entries.len
b.handlers.onWantList(peer.id, list)
# TODO: make into a template # TODO: make into a template
proc makeWantList*( proc makeWantList*(
@ -93,18 +108,17 @@ proc makeWantList*(
wantType: WantType = WantType.wantHave, wantType: WantType = WantType.wantHave,
full: bool = false, full: bool = false,
sendDontHave: bool = false): WantList = sendDontHave: bool = false): WantList =
var entries: seq[Entry] WantList(
for cid in cids: entries: cids.mapIt(
entries.add(Entry( Entry(
`block`: cid.data.buffer, `block`: it.data.buffer,
priority: priority.int32, priority: priority.int32,
cancel: cancel, cancel: cancel,
wantType: wantType, wantType: wantType,
sendDontHave: sendDontHave)) sendDontHave: sendDontHave) ),
full: full)
WantList(entries: entries, full: full) proc sendWantList*(
proc broadcastWantList*(
b: BlockExcNetwork, b: BlockExcNetwork,
id: PeerID, id: PeerID,
cids: seq[Cid], cids: seq[Cid],
@ -112,49 +126,42 @@ proc broadcastWantList*(
cancel: bool = false, cancel: bool = false,
wantType: WantType = WantType.wantHave, wantType: WantType = WantType.wantHave,
full: bool = false, full: bool = false,
sendDontHave: bool = false) = sendDontHave: bool = false): Future[void] =
## send a want message to peer ## Send a want message to peer
## ##
if id notin b.peers:
return
trace "Sending want list to peer", peer = id, `type` = $wantType, items = cids.len trace "Sending want list to peer", peer = id, `type` = $wantType, items = cids.len
let msg = makeWantList(
cids,
priority,
cancel,
wantType,
full,
sendDontHave)
let b.send(id, Message(wantlist: msg))
wantList = makeWantList(
cids,
priority,
cancel,
wantType,
full,
sendDontHave)
b.peers.withValue(id, peer):
peer[].broadcast(Message(wantlist: wantList))
proc handleBlocks( proc handleBlocks(
b: BlockExcNetwork, b: BlockExcNetwork,
peer: NetworkPeer, peer: NetworkPeer,
blocks: seq[pb.Block]): Future[void] = blocks: seq[pb.Block]) {.async.} =
## Handle incoming blocks ## Handle incoming blocks
## ##
if isNil(b.handlers.onBlocks): if not b.handlers.onBlocks.isNil:
return trace "Handling blocks for peer", peer = peer.id, items = blocks.len
trace "Handling blocks for peer", peer = peer.id, items = blocks.len var blks: seq[bt.Block]
for blob in blocks:
without cid =? Cid.init(blob.prefix):
trace "Unable to initialize Cid from protobuf message"
var blks: seq[bt.Block] without blk =? bt.Block.new(cid, blob.data, verify = true):
for blob in blocks: trace "Unable to initialize Block from data"
without cid =? Cid.init(blob.prefix):
trace "Unable to initialize Cid from protobuf message"
without blk =? bt.Block.new(cid, blob.data, verify = true): blks.add(blk)
trace "Unable to initialize Block from data"
blks.add(blk) await b.handlers.onBlocks(peer.id, blks)
b.handlers.onBlocks(peer.id, blks)
template makeBlocks*(blocks: seq[bt.Block]): seq[pb.Block] = template makeBlocks*(blocks: seq[bt.Block]): seq[pb.Block] =
var blks: seq[pb.Block] var blks: seq[pb.Block]
@ -166,81 +173,72 @@ template makeBlocks*(blocks: seq[bt.Block]): seq[pb.Block] =
blks blks
proc broadcastBlocks*( proc sendBlocks*(
b: BlockExcNetwork, b: BlockExcNetwork,
id: PeerID, id: PeerID,
blocks: seq[bt.Block]) = blocks: seq[bt.Block]): Future[void] =
## Send blocks to remote ## Send blocks to remote
## ##
if id notin b.peers: b.send(id, pb.Message(payload: makeBlocks(blocks)))
trace "Unable to send blocks, peer disconnected", peer = id
return
b.peers.withValue(id, peer):
trace "Sending blocks to peer", peer = id, items = blocks.len
peer[].broadcast(pb.Message(payload: makeBlocks(blocks)))
proc handleBlockPresence( proc handleBlockPresence(
b: BlockExcNetwork, b: BlockExcNetwork,
peer: NetworkPeer, peer: NetworkPeer,
presence: seq[BlockPresence]): Future[void] = presence: seq[BlockPresence]) {.async.} =
## Handle block presence ## Handle block presence
## ##
if isNil(b.handlers.onPresence): if not b.handlers.onPresence.isNil:
return trace "Handling block presence for peer", peer = peer.id, items = presence.len
await b.handlers.onPresence(peer.id, presence)
trace "Handling block presence for peer", peer = peer.id, items = presence.len proc sendBlockPresence*(
b.handlers.onPresence(peer.id, presence)
proc broadcastBlockPresence*(
b: BlockExcNetwork, b: BlockExcNetwork,
id: PeerID, id: PeerID,
presence: seq[BlockPresence]) = presence: seq[BlockPresence]): Future[void] =
## Send presence to remote ## Send presence to remote
## ##
if id notin b.peers: b.send(id, Message(blockPresences: @presence))
return
trace "Sending presence to peer", peer = id, items = presence.len proc handleAccount(
b.peers.withValue(id, peer): network: BlockExcNetwork,
peer[].broadcast(Message(blockPresences: @presence)) peer: NetworkPeer,
account: Account) {.async.} =
## Handle account info
##
proc handleAccount(network: BlockExcNetwork, if not network.handlers.onAccount.isNil:
peer: NetworkPeer, await network.handlers.onAccount(peer.id, account)
account: Account): Future[void] =
if network.handlers.onAccount.isNil:
return
network.handlers.onAccount(peer.id, account)
proc broadcastAccount*(network: BlockExcNetwork, proc sendAccount*(
id: PeerId, b: BlockExcNetwork,
account: Account) = id: PeerId,
if id notin network.peers: account: Account): Future[void] =
return ## Send account info to remote
##
let message = Message(account: AccountMessage.init(account)) b.send(id, Message(account: AccountMessage.init(account)))
network.peers.withValue(id, peer):
peer[].broadcast(message)
proc broadcastPayment*(network: BlockExcNetwork, proc sendPayment*(
id: PeerId, b: BlockExcNetwork,
payment: SignedState) = id: PeerId,
if id notin network.peers: payment: SignedState): Future[void] =
return ## Send payment to remote
##
let message = Message(payment: StateChannelUpdate.init(payment)) b.send(id, Message(payment: StateChannelUpdate.init(payment)))
network.peers.withValue(id, peer):
peer[].broadcast(message)
proc handlePayment(network: BlockExcNetwork, proc handlePayment(
peer: NetworkPeer, network: BlockExcNetwork,
payment: SignedState): Future[void] = peer: NetworkPeer,
if network.handlers.onPayment.isNil: payment: SignedState) {.async.} =
return ## Handle payment
network.handlers.onPayment(peer.id, payment) ##
if not network.handlers.onPayment.isNil:
await network.handlers.onPayment(peer.id, payment)
proc rpcHandler(b: BlockExcNetwork, peer: NetworkPeer, msg: Message) {.async.} = proc rpcHandler(b: BlockExcNetwork, peer: NetworkPeer, msg: Message) {.async.} =
try: try:
@ -297,10 +295,7 @@ proc setupPeer*(b: BlockExcNetwork, peer: PeerID) =
discard b.getOrCreatePeer(peer) discard b.getOrCreatePeer(peer)
proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} = proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} =
try: await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address))
await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address))
except CatchableError as exc:
debug "Failed to connect to peer", error = exc.msg, peer
proc dropPeer*(b: BlockExcNetwork, peer: PeerID) = proc dropPeer*(b: BlockExcNetwork, peer: PeerID) =
## Cleanup disconnected peer ## Cleanup disconnected peer
@ -332,13 +327,15 @@ method init*(b: BlockExcNetwork) =
proc new*( proc new*(
T: type BlockExcNetwork, T: type BlockExcNetwork,
switch: Switch, switch: Switch,
connProvider: ConnProvider = nil): T = connProvider: ConnProvider = nil,
maxInflight = MaxInflight): T =
## Create a new BlockExcNetwork instance ## Create a new BlockExcNetwork instance
## ##
let b = BlockExcNetwork( let self = BlockExcNetwork(
switch: switch, switch: switch,
getConn: connProvider) getConn: connProvider,
inflightSema: newAsyncSemaphore(maxInflight))
proc sendWantList( proc sendWantList(
id: PeerID, id: PeerID,
@ -347,29 +344,29 @@ proc new*(
cancel: bool = false, cancel: bool = false,
wantType: WantType = WantType.wantHave, wantType: WantType = WantType.wantHave,
full: bool = false, full: bool = false,
sendDontHave: bool = false) {.gcsafe.} = sendDontHave: bool = false): Future[void] {.gcsafe.} =
b.broadcastWantList( self.sendWantList(
id, cids, priority, cancel, id, cids, priority, cancel,
wantType, full, sendDontHave) wantType, full, sendDontHave)
proc sendBlocks(id: PeerID, blocks: seq[bt.Block]) {.gcsafe.} = proc sendBlocks(id: PeerID, blocks: seq[bt.Block]): Future[void] {.gcsafe.} =
b.broadcastBlocks(id, blocks) self.sendBlocks(id, blocks)
proc sendPresence(id: PeerID, presence: seq[BlockPresence]) {.gcsafe.} = proc sendPresence(id: PeerID, presence: seq[BlockPresence]): Future[void] {.gcsafe.} =
b.broadcastBlockPresence(id, presence) self.sendBlockPresence(id, presence)
proc sendAccount(id: PeerID, account: Account) = proc sendAccount(id: PeerID, account: Account): Future[void] {.gcsafe.} =
b.broadcastAccount(id, account) self.sendAccount(id, account)
proc sendPayment(id: PeerID, payment: SignedState) = proc sendPayment(id: PeerID, payment: SignedState): Future[void] {.gcsafe.} =
b.broadcastPayment(id, payment) self.sendPayment(id, payment)
b.request = BlockExcRequest( self.request = BlockExcRequest(
sendWantList: sendWantList, sendWantList: sendWantList,
sendBlocks: sendBlocks, sendBlocks: sendBlocks,
sendPresence: sendPresence, sendPresence: sendPresence,
sendAccount: sendAccount, sendAccount: sendAccount,
sendPayment: sendPayment) sendPayment: sendPayment)
b.init() self.init()
return b return self

View File

@ -60,7 +60,7 @@ suite "NetworkStore engine basic":
cancel: bool = false, cancel: bool = false,
wantType: WantType = WantType.wantHave, wantType: WantType = WantType.wantHave,
full: bool = false, full: bool = false,
sendDontHave: bool = false) {.gcsafe.} = sendDontHave: bool = false) {.gcsafe, async.} =
check cids.mapIt($it).sorted == blocks.mapIt( $it.cid ).sorted check cids.mapIt($it).sorted == blocks.mapIt( $it.cid ).sorted
done.complete() done.complete()
@ -87,20 +87,21 @@ suite "NetworkStore engine basic":
for b in blocks: for b in blocks:
discard engine.pendingBlocks.getWantHandle(b.cid) discard engine.pendingBlocks.getWantHandle(b.cid)
engine.setupPeer(peerId) await engine.setupPeer(peerId)
await done.wait(100.millis) await done.wait(100.millis)
test "Should send account to new peers": test "Should send account to new peers":
let pricing = Pricing.example let pricing = Pricing.example
proc sendAccount(peer: PeerID, account: Account) = proc sendAccount(peer: PeerID, account: Account) {.gcsafe, async.} =
check account.address == pricing.address check account.address == pricing.address
done.complete() done.complete()
let let
network = BlockExcNetwork(request: BlockExcRequest( network = BlockExcNetwork(
sendAccount: sendAccount request: BlockExcRequest(
sendAccount: sendAccount
)) ))
localStore = CacheStore.new() localStore = CacheStore.new()
@ -120,7 +121,7 @@ suite "NetworkStore engine basic":
pendingBlocks) pendingBlocks)
engine.pricing = pricing.some engine.pricing = pricing.some
engine.setupPeer(peerId) await engine.setupPeer(peerId)
await done.wait(100.millis) await done.wait(100.millis)
@ -197,7 +198,7 @@ suite "NetworkStore engine handlers":
test "Should handle want list - `dont-have`": test "Should handle want list - `dont-have`":
let wantList = makeWantList(blocks.mapIt( it.cid ), sendDontHave = true) let wantList = makeWantList(blocks.mapIt( it.cid ), sendDontHave = true)
proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) = proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) {.gcsafe, async.} =
check presence.mapIt( it.cid ) == wantList.entries.mapIt( it.`block` ) check presence.mapIt( it.cid ) == wantList.entries.mapIt( it.`block` )
for p in presence: for p in presence:
check: check:
@ -215,7 +216,7 @@ suite "NetworkStore engine handlers":
test "Should handle want list - `dont-have` some blocks": test "Should handle want list - `dont-have` some blocks":
let wantList = makeWantList(blocks.mapIt( it.cid ), sendDontHave = true) let wantList = makeWantList(blocks.mapIt( it.cid ), sendDontHave = true)
proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) = proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) {.gcsafe, async.} =
check presence.mapIt( it.cid ) == blocks[2..blocks.high].mapIt( it.cid.data.buffer ) check presence.mapIt( it.cid ) == blocks[2..blocks.high].mapIt( it.cid.data.buffer )
for p in presence: for p in presence:
check: check:
@ -252,7 +253,7 @@ suite "NetworkStore engine handlers":
peerContext.peerPrices = blocks.mapIt((it.cid, rand(uint16).u256)).toTable peerContext.peerPrices = blocks.mapIt((it.cid, rand(uint16).u256)).toTable
engine.network = BlockExcNetwork(request: BlockExcRequest( engine.network = BlockExcNetwork(request: BlockExcRequest(
sendPayment: proc(receiver: PeerID, payment: SignedState) = sendPayment: proc(receiver: PeerID, payment: SignedState) {.gcsafe, async.} =
let amount = blocks.mapIt(peerContext.peerPrices[it.cid]).foldl(a+b) let amount = blocks.mapIt(peerContext.peerPrices[it.cid]).foldl(a+b)
let balances = !payment.state.outcome.balances(Asset) let balances = !payment.state.outcome.balances(Asset)
check receiver == peerId check receiver == peerId
@ -349,7 +350,7 @@ suite "Task Handler":
test "Should send want-blocks in priority order": test "Should send want-blocks in priority order":
proc sendBlocks( proc sendBlocks(
id: PeerID, id: PeerID,
blks: seq[bt.Block]) {.gcsafe.} = blks: seq[bt.Block]) {.gcsafe, async.} =
check blks.len == 2 check blks.len == 2
check: check:
blks[1].cid == blocks[0].cid blks[1].cid == blocks[0].cid
@ -386,7 +387,7 @@ suite "Task Handler":
let missing = @[bt.Block.new("missing".toBytes).tryGet()] let missing = @[bt.Block.new("missing".toBytes).tryGet()]
let price = (!engine.pricing).price let price = (!engine.pricing).price
proc sendPresence(id: PeerID, presence: seq[BlockPresence]) = proc sendPresence(id: PeerID, presence: seq[BlockPresence]) {.gcsafe, async.} =
check presence.mapIt(!Presence.init(it)) == @[ check presence.mapIt(!Presence.init(it)) == @[
Presence(cid: present[0].cid, have: true, price: price), Presence(cid: present[0].cid, have: true, price: price),
Presence(cid: present[1].cid, have: true, price: price), Presence(cid: present[1].cid, have: true, price: price),

View File

@ -15,7 +15,7 @@ import pkg/codex/blockexchange
import ../helpers import ../helpers
import ../examples import ../examples
suite "NetworkStore network": suite "Network - Handlers":
let let
rng = Rng.instance() rng = Rng.instance()
seckey = PrivateKey.random(rng[]).tryGet() seckey = PrivateKey.random(rng[]).tryGet()
@ -110,7 +110,7 @@ suite "NetworkStore network":
await done.wait(500.millis) await done.wait(500.millis)
test "handles account messages": test "Handles account messages":
let account = Account(address: EthAddress.example) let account = Account(address: EthAddress.example)
proc handleAccount(peer: PeerID, received: Account) {.gcsafe, async.} = proc handleAccount(peer: PeerID, received: Account) {.gcsafe, async.} =
@ -124,7 +124,7 @@ suite "NetworkStore network":
await done.wait(100.millis) await done.wait(100.millis)
test "handles payment messages": test "Handles payment messages":
let payment = SignedState.example let payment = SignedState.example
proc handlePayment(peer: PeerID, received: SignedState) {.gcsafe, async.} = proc handlePayment(peer: PeerID, received: SignedState) {.gcsafe, async.} =
@ -138,7 +138,7 @@ suite "NetworkStore network":
await done.wait(100.millis) await done.wait(100.millis)
suite "NetworkStore Network - e2e": suite "Network - Senders":
let let
chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
@ -179,7 +179,7 @@ suite "NetworkStore Network - e2e":
switch1.stop(), switch1.stop(),
switch2.stop()) switch2.stop())
test "broadcast want list": test "Send want list":
proc wantListHandler(peer: PeerID, wantList: WantList) {.gcsafe, async.} = proc wantListHandler(peer: PeerID, wantList: WantList) {.gcsafe, async.} =
# check that we got the correct amount of entries # check that we got the correct amount of entries
check wantList.entries.len == 4 check wantList.entries.len == 4
@ -195,7 +195,7 @@ suite "NetworkStore Network - e2e":
done.complete() done.complete()
network2.handlers.onWantList = wantListHandler network2.handlers.onWantList = wantListHandler
network1.broadcastWantList( await network1.sendWantList(
switch2.peerInfo.peerId, switch2.peerInfo.peerId,
blocks.mapIt( it.cid ), blocks.mapIt( it.cid ),
1, true, WantType.wantHave, 1, true, WantType.wantHave,
@ -203,19 +203,19 @@ suite "NetworkStore Network - e2e":
await done.wait(500.millis) await done.wait(500.millis)
test "broadcast blocks": test "send blocks":
proc blocksHandler(peer: PeerID, blks: seq[bt.Block]) {.gcsafe, async.} = proc blocksHandler(peer: PeerID, blks: seq[bt.Block]) {.gcsafe, async.} =
check blks == blocks check blks == blocks
done.complete() done.complete()
network2.handlers.onBlocks = blocksHandler network2.handlers.onBlocks = blocksHandler
network1.broadcastBlocks( await network1.sendBlocks(
switch2.peerInfo.peerId, switch2.peerInfo.peerId,
blocks) blocks)
await done.wait(500.millis) await done.wait(500.millis)
test "broadcast presence": test "send presence":
proc presenceHandler( proc presenceHandler(
peer: PeerID, peer: PeerID,
precense: seq[BlockPresence]) {.gcsafe, async.} = precense: seq[BlockPresence]) {.gcsafe, async.} =
@ -227,7 +227,7 @@ suite "NetworkStore Network - e2e":
network2.handlers.onPresence = presenceHandler network2.handlers.onPresence = presenceHandler
network1.broadcastBlockPresence( await network1.sendBlockPresence(
switch2.peerInfo.peerId, switch2.peerInfo.peerId,
blocks.mapIt( blocks.mapIt(
BlockPresence( BlockPresence(
@ -237,7 +237,7 @@ suite "NetworkStore Network - e2e":
await done.wait(500.millis) await done.wait(500.millis)
test "broadcasts account": test "send account":
let account = Account(address: EthAddress.example) let account = Account(address: EthAddress.example)
proc handleAccount(peer: PeerID, received: Account) {.gcsafe, async.} = proc handleAccount(peer: PeerID, received: Account) {.gcsafe, async.} =
@ -246,11 +246,10 @@ suite "NetworkStore Network - e2e":
network2.handlers.onAccount = handleAccount network2.handlers.onAccount = handleAccount
network1.broadcastAccount(switch2.peerInfo.peerId, account) await network1.sendAccount(switch2.peerInfo.peerId, account)
await done.wait(500.millis) await done.wait(500.millis)
test "broadcasts payment": test "send payment":
let payment = SignedState.example let payment = SignedState.example
proc handlePayment(peer: PeerID, received: SignedState) {.gcsafe, async.} = proc handlePayment(peer: PeerID, received: SignedState) {.gcsafe, async.} =
@ -259,6 +258,50 @@ suite "NetworkStore Network - e2e":
network2.handlers.onPayment = handlePayment network2.handlers.onPayment = handlePayment
network1.broadcastPayment(switch2.peerInfo.peerId, payment) await network1.sendPayment(switch2.peerInfo.peerId, payment)
await done.wait(500.millis) await done.wait(500.millis)
suite "Network - Test Limits":
var
switch1, switch2: Switch
network1, network2: BlockExcNetwork
blocks: seq[bt.Block]
done: Future[void]
setup:
done = newFuture[void]()
switch1 = newStandardSwitch()
switch2 = newStandardSwitch()
await switch1.start()
await switch2.start()
network1 = BlockExcNetwork.new(
switch = switch1,
maxInflight = 0)
switch1.mount(network1)
network2 = BlockExcNetwork.new(
switch = switch2)
switch2.mount(network2)
await switch1.connect(
switch2.peerInfo.peerId,
switch2.peerInfo.addrs)
teardown:
await allFuturesThrowing(
switch1.stop(),
switch2.stop())
test "Concurrent Sends":
let account = Account(address: EthAddress.example)
network2.handlers.onAccount =
proc(peer: PeerID, received: Account) {.gcsafe, async.} =
check false
let fut = network1.send(
switch2.peerInfo.peerId,
Message(account: AccountMessage.init(account)))
await sleepAsync(100.millis)
check not fut.finished