From 3c4d4dc29f718f6cee7019b8d77b9856c1695683 Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Thu, 8 Apr 2021 14:27:49 +0200 Subject: [PATCH] Exchange pricing when connecting to new peer --- dagger/bitswap.nim | 4 ++++ dagger/bitswap/engine.nim | 12 ++++++++++++ dagger/bitswap/network.nim | 6 ++++++ tests/dagger/bitswap/testbitswap.nim | 11 +++++++++++ tests/dagger/bitswap/testengine.nim | 16 ++++++++++++++++ 5 files changed, 49 insertions(+) diff --git a/dagger/bitswap.nim b/dagger/bitswap.nim index 0e3d9a4d..0c15bcaf 100644 --- a/dagger/bitswap.nim +++ b/dagger/bitswap.nim @@ -174,10 +174,14 @@ proc new*( blocks: seq[bt.Block]) {.gcsafe.} = engine.blocksHandler(peer, blocks) + proc pricingHandler(peer: PeerId, pricing: Pricing) = + engine.pricingHandler(peer, pricing) + network.handlers = BitswapHandlers( onWantList: blockWantListHandler, onBlocks: blocksHandler, onPresence: blockPresenceHandler, + onPricing: pricingHandler ) return b diff --git a/dagger/bitswap/engine.nim b/dagger/bitswap/engine.nim index 87de93d0..22044aad 100644 --- a/dagger/bitswap/engine.nim +++ b/dagger/bitswap/engine.nim @@ -42,6 +42,7 @@ type bytesRecv*: int # bytes received from remote exchanged*: int # times peer has exchanged with us lastExchange*: Moment # last time peer has exchanged with us + pricing*: ?Pricing # optional bandwidth price for this peer BitswapEngine* = ref object of RootObj localStore*: BlockStore # where we localStore blocks for this instance @@ -51,6 +52,7 @@ type peersPerRequest: int # max number of peers to request from scheduleTask*: TaskScheduler # schedule a new task with the task runner request*: BitswapRequest # bitswap network requests + pricing*: ?Pricing # optional bandwidth pricing proc contains*(a: AsyncHeapQueue[Entry], b: Cid): bool = ## Convenience method to check for entry prepense @@ -259,6 +261,13 @@ proc wantListHandler*( if not b.scheduleTask(peerCtx): trace "Unable to schedule task for peer", peer +proc pricingHandler*(engine: BitswapEngine, peer: PeerID, pricing: Pricing) = + let context = engine.getPeerCtx(peer) + if context.isNil: + return + + context.pricing = pricing.some + proc setupPeer*(b: BitswapEngine, peer: PeerID) = ## Perform initial setup, such as want ## list exchange @@ -274,6 +283,9 @@ proc setupPeer*(b: BitswapEngine, peer: PeerID) = if b.wantList.len > 0: b.request.sendWantList(peer, b.wantList, full = true) + if b.pricing.isSome: + b.request.sendPricing(peer, b.pricing.get) + proc dropPeer*(b: BitswapEngine, peer: PeerID) = ## Cleanup disconnected peer ## diff --git a/dagger/bitswap/network.nim b/dagger/bitswap/network.nim index bdf93eb8..50ff7373 100644 --- a/dagger/bitswap/network.nim +++ b/dagger/bitswap/network.nim @@ -54,11 +54,13 @@ type BlocksBroadcaster* = proc(peer: PeerID, presence: seq[bt.Block]) {.gcsafe.} PresenceBroadcaster* = proc(peer: PeerID, presence: seq[BlockPresence]) {.gcsafe.} + PricingBroadcaster* = proc(peer: PeerID, pricing: Pricing) {.gcsafe.} BitswapRequest* = object sendWantList*: WantListBroadcaster sendBlocks*: BlocksBroadcaster sendPresence*: PresenceBroadcaster + sendPricing*: PricingBroadcaster BitswapNetwork* = ref object of LPProtocol peers*: Table[PeerID, NetworkPeer] @@ -348,10 +350,14 @@ proc new*( proc sendPresence(id: PeerID, presence: seq[BlockPresence]) {.gcsafe.} = b.broadcastBlockPresence(id, presence) + proc sendPricing(id: PeerID, pricing: Pricing) = + b.broadcastPricing(id, pricing) + b.request = BitswapRequest( sendWantList: sendWantList, sendBlocks: sendBlocks, sendPresence: sendPresence, + sendPricing: sendPricing ) b.init() diff --git a/tests/dagger/bitswap/testbitswap.nim b/tests/dagger/bitswap/testbitswap.nim index 1a4f434c..5dcbb8ac 100644 --- a/tests/dagger/bitswap/testbitswap.nim +++ b/tests/dagger/bitswap/testbitswap.nim @@ -16,6 +16,7 @@ import pkg/dagger/utils/asyncheapqueue import ./utils import ../helpers +import ../examples suite "Bitswap engine - 2 nodes": @@ -28,6 +29,7 @@ suite "Bitswap engine - 2 nodes": var switch1, switch2: Switch wallet1, wallet2: Wallet + pricing1, pricing2: Pricing network1, network2: BitswapNetwork bitswap1, bitswap2: Bitswap awaiters: seq[Future[void]] @@ -42,6 +44,8 @@ suite "Bitswap engine - 2 nodes": switch2 = newStandardSwitch() wallet1 = Wallet.init(EthPrivateKey.random()) wallet2 = Wallet.init(EthPrivateKey.random()) + pricing1 = Pricing.example + pricing2 = Pricing.example awaiters.add(await switch1.start()) awaiters.add(await switch2.start()) @@ -65,6 +69,9 @@ suite "Bitswap engine - 2 nodes": bitswap1.engine.wantList = blocks2.mapIt( it.cid ) bitswap2.engine.wantList = blocks1.mapIt( it.cid ) + bitswap1.engine.pricing = pricing1.some + bitswap2.engine.pricing = pricing2.some + await switch1.connect( switch2.peerInfo.peerId, switch2.peerInfo.addrs) @@ -93,6 +100,10 @@ suite "Bitswap engine - 2 nodes": peerCtx2.peerHave.mapIt( $it ).sorted(cmp[string]) == bitswap1.engine.wantList.mapIt( $it ).sorted(cmp[string]) + test "exchanges pricing on connect": + check peerCtx1.pricing == pricing1.some + check peerCtx2.pricing == pricing2.some + test "should send want-have for block": let blk = bt.Block.new("Block 1".toBytes) bitswap2.engine.localStore.putBlocks(@[blk]) diff --git a/tests/dagger/bitswap/testengine.nim b/tests/dagger/bitswap/testengine.nim index de6209f5..0e865f2a 100644 --- a/tests/dagger/bitswap/testengine.nim +++ b/tests/dagger/bitswap/testengine.nim @@ -14,6 +14,7 @@ import pkg/dagger/chunker import pkg/dagger/blocktype as bt import ../helpers +import ../examples suite "Bitswap engine basic": let @@ -52,6 +53,21 @@ suite "Bitswap engine basic": await done + test "sends pricing to new peers": + let pricing = Pricing.example + + proc sendPricing(peer: PeerID, toBeSent: Pricing) = + check toBeSent == pricing + done.complete() + + let request = BitswapRequest(sendPricing: sendPricing) + let engine = BitswapEngine.new(MemoryStore.new, request) + engine.pricing = pricing.some + + engine.setupPeer(peerId) + + await done.wait(100.millis) + suite "Bitswap engine handlers": let rng = Rng.instance()