Add block prices to peer context
This commit is contained in:
parent
0ac876dbc3
commit
0e34b69d7d
|
@ -16,6 +16,7 @@ import pkg/libp2p
|
||||||
import pkg/libp2p/errors
|
import pkg/libp2p/errors
|
||||||
|
|
||||||
import ./protobuf/bitswap as pb
|
import ./protobuf/bitswap as pb
|
||||||
|
import ./protobuf/presence
|
||||||
import ../blocktype as bt
|
import ../blocktype as bt
|
||||||
import ../stores/blockstore
|
import ../stores/blockstore
|
||||||
import ../utils/asyncheapqueue
|
import ../utils/asyncheapqueue
|
||||||
|
@ -151,7 +152,7 @@ proc requestBlocks*(
|
||||||
proc blockPresenceHandler*(
|
proc blockPresenceHandler*(
|
||||||
b: BitswapEngine,
|
b: BitswapEngine,
|
||||||
peer: PeerID,
|
peer: PeerID,
|
||||||
presence: seq[BlockPresence]) =
|
blocks: seq[BlockPresence]) =
|
||||||
## Handle block presence
|
## Handle block presence
|
||||||
##
|
##
|
||||||
|
|
||||||
|
@ -159,11 +160,9 @@ proc blockPresenceHandler*(
|
||||||
if isNil(peerCtx):
|
if isNil(peerCtx):
|
||||||
return
|
return
|
||||||
|
|
||||||
for blk in presence:
|
for blk in blocks:
|
||||||
let cid = Cid.init(blk.cid).get()
|
if presence =? Presence.init(blk):
|
||||||
if cid notin peerCtx.peerHave:
|
peerCtx.updatePresence(presence)
|
||||||
if blk.type == BlockPresenceType.presenceHave:
|
|
||||||
peerCtx.peerHave.add(cid)
|
|
||||||
|
|
||||||
proc scheduleTasks(b: BitswapEngine, blocks: seq[bt.Block]) =
|
proc scheduleTasks(b: BitswapEngine, blocks: seq[bt.Block]) =
|
||||||
trace "Schedule a task for new blocks"
|
trace "Schedule a task for new blocks"
|
||||||
|
|
|
@ -1,10 +1,12 @@
|
||||||
import std/sequtils
|
import std/sequtils
|
||||||
|
import std/tables
|
||||||
import pkg/libp2p
|
import pkg/libp2p
|
||||||
import pkg/chronos
|
import pkg/chronos
|
||||||
import pkg/nitro
|
import pkg/nitro
|
||||||
import pkg/questionable
|
import pkg/questionable
|
||||||
import ./protobuf/bitswap
|
import ./protobuf/bitswap
|
||||||
import ./protobuf/payments
|
import ./protobuf/payments
|
||||||
|
import ./protobuf/presence
|
||||||
|
|
||||||
export payments
|
export payments
|
||||||
export nitro
|
export nitro
|
||||||
|
@ -12,6 +14,7 @@ export nitro
|
||||||
type
|
type
|
||||||
BitswapPeerCtx* = ref object of RootObj
|
BitswapPeerCtx* = ref object of RootObj
|
||||||
id*: PeerID
|
id*: PeerID
|
||||||
|
peerPrices*: Table[Cid, UInt256] # remote peer have list including price
|
||||||
peerHave*: seq[Cid] # remote peers have lists
|
peerHave*: seq[Cid] # remote peers have lists
|
||||||
peerWants*: seq[Entry] # remote peers want lists
|
peerWants*: seq[Entry] # remote peers want lists
|
||||||
bytesSent*: int # bytes sent to remote
|
bytesSent*: int # bytes sent to remote
|
||||||
|
@ -33,3 +36,13 @@ proc debtRatio*(b: BitswapPeerCtx): float =
|
||||||
proc `<`*(a, b: BitswapPeerCtx): bool =
|
proc `<`*(a, b: BitswapPeerCtx): bool =
|
||||||
a.debtRatio < b.debtRatio
|
a.debtRatio < b.debtRatio
|
||||||
|
|
||||||
|
func updatePresence*(context: BitswapPeerCtx, presence: Presence) =
|
||||||
|
let cid = presence.cid
|
||||||
|
let price = presence.price
|
||||||
|
|
||||||
|
if cid notin context.peerHave and presence.have:
|
||||||
|
context.peerHave.add(cid)
|
||||||
|
context.peerPrices[cid] = price
|
||||||
|
elif cid in context.peerHave and not presence.have:
|
||||||
|
context.peerHave.keepItIf(it != cid)
|
||||||
|
context.peerPrices.del(cid)
|
||||||
|
|
|
@ -10,6 +10,7 @@ import pkg/dagger/p2p/rng
|
||||||
import pkg/dagger/bitswap
|
import pkg/dagger/bitswap
|
||||||
import pkg/dagger/bitswap/pendingblocks
|
import pkg/dagger/bitswap/pendingblocks
|
||||||
import pkg/dagger/bitswap/engine/payments
|
import pkg/dagger/bitswap/engine/payments
|
||||||
|
import pkg/dagger/bitswap/protobuf/presence
|
||||||
import pkg/dagger/stores/memorystore
|
import pkg/dagger/stores/memorystore
|
||||||
import pkg/dagger/chunker
|
import pkg/dagger/chunker
|
||||||
import pkg/dagger/blocktype as bt
|
import pkg/dagger/blocktype as bt
|
||||||
|
@ -166,15 +167,20 @@ suite "Bitswap engine handlers":
|
||||||
await done.wait(100.millis)
|
await done.wait(100.millis)
|
||||||
|
|
||||||
test "should handle block presence":
|
test "should handle block presence":
|
||||||
|
let price = UInt256.example
|
||||||
engine.blockPresenceHandler(
|
engine.blockPresenceHandler(
|
||||||
peerId,
|
peerId,
|
||||||
blocks.mapIt(
|
blocks.mapIt(
|
||||||
BlockPresence(
|
PresenceMessage.init(
|
||||||
cid: it.cid.data.buffer,
|
Presence(
|
||||||
`type`: BlockPresenceType.presenceHave
|
cid: it.cid,
|
||||||
)))
|
have: true,
|
||||||
|
price: price
|
||||||
|
))))
|
||||||
|
|
||||||
check peerCtx.peerHave == blocks.mapIt( it.cid )
|
for cid in blocks.mapIt(it.cid):
|
||||||
|
check peerCtx.peerHave.contains(cid)
|
||||||
|
check peerCtx.peerPrices[cid] == price
|
||||||
|
|
||||||
suite "Bitswap engine blocks":
|
suite "Bitswap engine blocks":
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue