Remove debt ratio
Rationale: selecting peers based on debt ratio in bytes is no longer needed when the bandwidth for each chunk is compensated.
This commit is contained in:
parent
fa05bffeac
commit
e3b68f96d8
|
@ -43,8 +43,7 @@ type
|
||||||
taskHandler: TaskHandler # handler provided by the engine called by the runner
|
taskHandler: TaskHandler # handler provided by the engine called by the runner
|
||||||
|
|
||||||
proc bitswapTaskRunner(b: Bitswap) {.async.} =
|
proc bitswapTaskRunner(b: Bitswap) {.async.} =
|
||||||
## process tasks in order of least amount of
|
## process tasks
|
||||||
## debt ratio
|
|
||||||
##
|
##
|
||||||
|
|
||||||
while b.bitswapRunning:
|
while b.bitswapRunning:
|
||||||
|
|
|
@ -87,25 +87,13 @@ proc requestBlocks*(
|
||||||
blocks.add(
|
blocks.add(
|
||||||
b.pendingBlocks.addOrAwait(c).wait(timeout))
|
b.pendingBlocks.addOrAwait(c).wait(timeout))
|
||||||
|
|
||||||
proc cmp(a, b: BitswapPeerCtx): int =
|
|
||||||
if a.debtRatio == b.debtRatio:
|
|
||||||
0
|
|
||||||
elif a.debtRatio > b.debtRatio:
|
|
||||||
1
|
|
||||||
else:
|
|
||||||
-1
|
|
||||||
|
|
||||||
# sort the peers so that we request
|
var peers = b.peers
|
||||||
# the blocks from a peer with the lowest
|
|
||||||
# debt ratio
|
|
||||||
var sortedPeers = b.peers.sorted(
|
|
||||||
cmp
|
|
||||||
)
|
|
||||||
|
|
||||||
# get the first peer with at least one (any)
|
# get the first peer with at least one (any)
|
||||||
# matching cid
|
# matching cid
|
||||||
var blockPeer: BitswapPeerCtx
|
var blockPeer: BitswapPeerCtx
|
||||||
for i, p in sortedPeers:
|
for i, p in peers:
|
||||||
let has = cids.anyIt(
|
let has = cids.anyIt(
|
||||||
it in p.peerHave
|
it in p.peerHave
|
||||||
)
|
)
|
||||||
|
@ -117,9 +105,9 @@ proc requestBlocks*(
|
||||||
# didn't find any peer with matching cids
|
# didn't find any peer with matching cids
|
||||||
# use the first one in the sorted array
|
# use the first one in the sorted array
|
||||||
if isNil(blockPeer):
|
if isNil(blockPeer):
|
||||||
blockPeer = sortedPeers[0]
|
blockPeer = peers[0]
|
||||||
|
|
||||||
sortedPeers.keepItIf(
|
peers.keepItIf(
|
||||||
it != blockPeer
|
it != blockPeer
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -130,7 +118,7 @@ proc requestBlocks*(
|
||||||
cids,
|
cids,
|
||||||
wantType = WantType.wantBlock) # we want this remote to send us a block
|
wantType = WantType.wantBlock) # we want this remote to send us a block
|
||||||
|
|
||||||
if sortedPeers.len == 0:
|
if peers.len == 0:
|
||||||
return blocks # no peers to send wants to
|
return blocks # no peers to send wants to
|
||||||
|
|
||||||
template sendWants(ctx: BitswapPeerCtx) =
|
template sendWants(ctx: BitswapPeerCtx) =
|
||||||
|
@ -141,10 +129,10 @@ proc requestBlocks*(
|
||||||
wantType = WantType.wantHave) # we only want to know if the peer has the block
|
wantType = WantType.wantHave) # we only want to know if the peer has the block
|
||||||
|
|
||||||
# filter out the peer we've already requested from
|
# filter out the peer we've already requested from
|
||||||
var stop = sortedPeers.high
|
var stop = peers.high
|
||||||
if stop > b.peersPerRequest: stop = b.peersPerRequest
|
if stop > b.peersPerRequest: stop = b.peersPerRequest
|
||||||
trace "Sending want list requests to remaining peers", count = stop + 1
|
trace "Sending want list requests to remaining peers", count = stop + 1
|
||||||
for p in sortedPeers[0..stop]:
|
for p in peers[0..stop]:
|
||||||
sendWants(p)
|
sendWants(p)
|
||||||
|
|
||||||
return blocks
|
return blocks
|
||||||
|
|
|
@ -15,10 +15,7 @@ type
|
||||||
BitswapPeerCtx* = ref object of RootObj
|
BitswapPeerCtx* = ref object of RootObj
|
||||||
id*: PeerID
|
id*: PeerID
|
||||||
peerPrices*: Table[Cid, UInt256] # remote peer have list including price
|
peerPrices*: Table[Cid, UInt256] # remote peer have list including price
|
||||||
peerHave*: seq[Cid] # remote peers have lists
|
|
||||||
peerWants*: seq[Entry] # remote peers want lists
|
peerWants*: seq[Entry] # remote peers want lists
|
||||||
bytesSent*: int # bytes sent to remote
|
|
||||||
bytesRecv*: int # bytes received from remote
|
|
||||||
exchanged*: int # times peer has exchanged with us
|
exchanged*: int # times peer has exchanged with us
|
||||||
lastExchange*: Moment # last time peer has exchanged with us
|
lastExchange*: Moment # last time peer has exchanged with us
|
||||||
pricing*: ?Pricing # optional bandwidth price for this peer
|
pricing*: ?Pricing # optional bandwidth price for this peer
|
||||||
|
@ -30,12 +27,6 @@ proc contains*(a: openArray[BitswapPeerCtx], b: PeerID): bool =
|
||||||
|
|
||||||
a.anyIt( it.id == b )
|
a.anyIt( it.id == b )
|
||||||
|
|
||||||
proc debtRatio*(b: BitswapPeerCtx): float =
|
|
||||||
b.bytesSent / (b.bytesRecv + 1)
|
|
||||||
|
|
||||||
proc `<`*(a, b: BitswapPeerCtx): bool =
|
|
||||||
a.debtRatio < b.debtRatio
|
|
||||||
|
|
||||||
func updatePresence*(context: BitswapPeerCtx, presence: Presence) =
|
func updatePresence*(context: BitswapPeerCtx, presence: Presence) =
|
||||||
let cid = presence.cid
|
let cid = presence.cid
|
||||||
let price = presence.price
|
let price = presence.price
|
||||||
|
|
|
@ -185,99 +185,6 @@ suite "Bitswap engine handlers":
|
||||||
check peerCtx.peerHave.contains(cid)
|
check peerCtx.peerHave.contains(cid)
|
||||||
check peerCtx.peerPrices[cid] == price
|
check peerCtx.peerPrices[cid] == price
|
||||||
|
|
||||||
suite "Bitswap engine blocks":
|
|
||||||
|
|
||||||
let
|
|
||||||
rng = Rng.instance()
|
|
||||||
chunker = newRandomChunker(Rng.instance(), size = 2048, chunkSize = 256)
|
|
||||||
blocks = chunker.mapIt( bt.Block.new(it) )
|
|
||||||
wallet = WalletRef.example
|
|
||||||
|
|
||||||
var
|
|
||||||
engine: BitswapEngine
|
|
||||||
peersCtx: seq[BitswapPeerCtx]
|
|
||||||
peers: seq[PeerID]
|
|
||||||
done: Future[void]
|
|
||||||
|
|
||||||
setup:
|
|
||||||
done = newFuture[void]()
|
|
||||||
engine = BitswapEngine.new(MemoryStore.new(), wallet)
|
|
||||||
peersCtx = @[]
|
|
||||||
|
|
||||||
for i in 0..3:
|
|
||||||
let seckey = PrivateKey.random(rng[]).tryGet()
|
|
||||||
peers.add(PeerID.init(seckey.getKey().tryGet()).tryGet())
|
|
||||||
|
|
||||||
peersCtx.add(BitswapPeerCtx(
|
|
||||||
id: peers[i]
|
|
||||||
))
|
|
||||||
|
|
||||||
# set debt ratios
|
|
||||||
|
|
||||||
# ratio > 1
|
|
||||||
peersCtx[0].bytesSent = 1000
|
|
||||||
peersCtx[0].bytesRecv = 100
|
|
||||||
|
|
||||||
# ratio < 1
|
|
||||||
peersCtx[1].bytesSent = 100
|
|
||||||
peersCtx[1].bytesRecv = 1000
|
|
||||||
|
|
||||||
# ratio > 1
|
|
||||||
peersCtx[2].bytesSent = 100
|
|
||||||
peersCtx[2].bytesRecv = 99
|
|
||||||
|
|
||||||
# ratio == 0
|
|
||||||
peersCtx[3].bytesSent = 100
|
|
||||||
peersCtx[3].bytesRecv = 100
|
|
||||||
|
|
||||||
engine.peers = peersCtx
|
|
||||||
|
|
||||||
test "should select peer with least debt ratio":
|
|
||||||
proc sendWantList(
|
|
||||||
id: PeerID,
|
|
||||||
cids: seq[Cid],
|
|
||||||
priority: int32 = 0,
|
|
||||||
cancel: bool = false,
|
|
||||||
wantType: WantType = WantType.wantHave,
|
|
||||||
full: bool = false,
|
|
||||||
sendDontHave: bool = false) {.gcsafe.} =
|
|
||||||
check cids == blocks.mapIt( it.cid )
|
|
||||||
if peersCtx[1].id == id: # second peer has the least debt ratio
|
|
||||||
check wantType == WantType.wantBlock
|
|
||||||
engine.resolveBlocks(blocks)
|
|
||||||
else:
|
|
||||||
check wantType == WantType.wantHave
|
|
||||||
|
|
||||||
engine.request.sendWantList = sendWantList
|
|
||||||
|
|
||||||
let pending = engine.requestBlocks(blocks.mapIt( it.cid ))
|
|
||||||
let resolved = await allFinished(pending)
|
|
||||||
check resolved.mapIt( it.read ) == blocks
|
|
||||||
|
|
||||||
test "should select peer with least debt ratio and have CIDs":
|
|
||||||
proc sendWantList(
|
|
||||||
id: PeerID,
|
|
||||||
cids: seq[Cid],
|
|
||||||
priority: int32 = 0,
|
|
||||||
cancel: bool = false,
|
|
||||||
wantType: WantType = WantType.wantHave,
|
|
||||||
full: bool = false,
|
|
||||||
sendDontHave: bool = false) {.gcsafe.} =
|
|
||||||
check cids == blocks.mapIt( it.cid )
|
|
||||||
if peersCtx[3].id == id: # 4th peer has the least debt ratio and has cids
|
|
||||||
check wantType == WantType.wantBlock
|
|
||||||
engine.resolveBlocks(blocks)
|
|
||||||
else:
|
|
||||||
check wantType == WantType.wantHave
|
|
||||||
|
|
||||||
engine.request.sendWantList = sendWantList
|
|
||||||
|
|
||||||
peersCtx[3].peerHave = blocks.mapIt( it.cid )
|
|
||||||
let pending = engine.requestBlocks(blocks.mapIt( it.cid ))
|
|
||||||
let resolved = await allFinished(pending)
|
|
||||||
check resolved.mapIt( it.read ) == blocks
|
|
||||||
|
|
||||||
|
|
||||||
suite "Task Handler":
|
suite "Task Handler":
|
||||||
|
|
||||||
let
|
let
|
||||||
|
|
Loading…
Reference in New Issue