Block download (#304)

* track inflight requests

* preperly handle precense updates

* trace number of of scheduled blocks

* invoke `payForBlocks` at the correct time

* reduntant block info on want list updates

* don't update prices in task handler

* PeerID -> PeerId

* cleanup

* proper log topic

* better chronicles topic filtering

* more trace logging

* sort want blocks

* wip - fix tests

* wip - fix tests, presence changes

* fix small test issue

* return price

* payment related changes

* misc

* re-enable payment tests

* fix warn wording

* fix `u256` conversion

* minor misc changes

* don't idle for so long on `encode`

* logging

* move buff

* disable cache by default

* disable cache by default

* fix streamOneBlock

* log node stopping/exiting

* trace logging

* don't stringify cid

* use `self`

* quick cleanup

* rename enums

* rename enums

* turns out we don't needs this test

* fix wording
This commit is contained in:
Dmitriy Ryajov 2022-11-15 09:46:21 -06:00 committed by GitHub
parent 456e675b8a
commit 5abf80cc69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 477 additions and 267 deletions

View File

@ -43,7 +43,7 @@ when isMainModule:
quit QuitFailure quit QuitFailure
if config.nat == ValidIpAddress.init("127.0.0.1"): if config.nat == ValidIpAddress.init("127.0.0.1"):
warn "`--nat` is set to local loopback, your node wont be properly announce over the DHT" warn "`--nat` is set to loopback, your node wont properly announce over the DHT"
if not(checkAndCreateDataDir((config.dataDir).string)): if not(checkAndCreateDataDir((config.dataDir).string)):
# We are unable to access/create data folder or data folder's # We are unable to access/create data folder or data folder's
@ -89,9 +89,12 @@ when isMainModule:
proc SIGTERMHandler(signal: cint) {.noconv.} = proc SIGTERMHandler(signal: cint) {.noconv.} =
notice "Shutting down after having received SIGTERM" notice "Shutting down after having received SIGTERM"
waitFor server.stop() waitFor server.stop()
notice "Stopped Codex"
c_signal(ansi_c.SIGTERM, SIGTERMHandler) c_signal(ansi_c.SIGTERM, SIGTERMHandler)
waitFor server.start() waitFor server.start()
notice "Exited codex"
of StartUpCommand.initNode: of StartUpCommand.initNode:
discard discard

View File

@ -27,7 +27,7 @@ import ../../stores/blockstore
import ./pendingblocks import ./pendingblocks
logScope: logScope:
topics = "codex discovery engine" topics = "codex discoveryengine"
declareGauge(codex_inflight_discovery, "inflight discovery requests") declareGauge(codex_inflight_discovery, "inflight discovery requests")

View File

@ -15,6 +15,7 @@ import std/algorithm
import pkg/chronos import pkg/chronos
import pkg/chronicles import pkg/chronicles
import pkg/libp2p import pkg/libp2p
import pkg/stint
import ../../stores/blockstore import ../../stores/blockstore
import ../../blocktype as bt import ../../blocktype as bt
@ -33,7 +34,7 @@ import ./pendingblocks
export peers, pendingblocks, payments, discovery export peers, pendingblocks, payments, discovery
logScope: logScope:
topics = "codex blockexc engine" topics = "codex blockexcengine"
const const
DefaultMaxPeersPerRequest* = 10 DefaultMaxPeersPerRequest* = 10
@ -122,9 +123,10 @@ proc requestBlock*(
## Request a block from remotes ## Request a block from remotes
## ##
trace "Requesting block", cid trace "Requesting block", cid, peers = b.peers.len
if cid in b.pendingBlocks: if b.pendingBlocks.isInFlight(cid):
trace "Request handle already pending", cid
return await b.pendingBlocks.getWantHandle(cid, timeout) return await b.pendingBlocks.getWantHandle(cid, timeout)
let let
@ -134,6 +136,7 @@ proc requestBlock*(
peers = b.peers.selectCheapest(cid) peers = b.peers.selectCheapest(cid)
if peers.len <= 0: if peers.len <= 0:
trace "No cheapest peers, selecting first in list", cid
peers = toSeq(b.peers) # Get any peer peers = toSeq(b.peers) # Get any peer
if peers.len <= 0: if peers.len <= 0:
trace "No peers to request blocks from", cid trace "No peers to request blocks from", cid
@ -143,40 +146,67 @@ proc requestBlock*(
let let
blockPeer = peers[0] # get cheapest blockPeer = peers[0] # get cheapest
proc blockHandleMonitor() {.async.} =
try:
trace "Monigoring block handle", cid
b.pendingBlocks.setInFlight(cid, true)
discard await blk
trace "Block handle success", cid
except CatchableError as exc:
trace "Error block handle, disconnecting peer", cid, exc = exc.msg
# TODO: really, this is just a quick and dirty way of
# preventing hitting the same "bad" peer every time, however,
# we might as well discover this on or next iteration, so
# it doesn't mean that we're never talking to this peer again.
# TODO: we need a lot more work around peer selection and
# prioritization
# drop unresponsive peer
await b.network.switch.disconnect(blockPeer.id)
trace "Sending block request to peer", peer = blockPeer.id, cid
# monitor block handle
asyncSpawn blockHandleMonitor()
# request block # request block
await b.network.request.sendWantList( await b.network.request.sendWantList(
blockPeer.id, blockPeer.id,
@[cid], @[cid],
wantType = WantType.wantBlock) # we want this remote to send us a block wantType = WantType.WantBlock) # we want this remote to send us a block
if (peers.len - 1) == 0: if (peers.len - 1) == 0:
trace "Not enough peers to send want list to", cid trace "No peers to send want list to", cid
b.discovery.queueFindBlocksReq(@[cid]) b.discovery.queueFindBlocksReq(@[cid])
return await blk # no peers to send wants to return await blk # no peers to send wants to
# filter out the peer we've already requested from # filter out the peer we've already requested from
let stop = min(peers.high, b.peersPerRequest) let remaining = peers[1..min(peers.high, b.peersPerRequest)]
trace "Sending want list requests to remaining peers", count = stop + 1 trace "Sending want list to remaining peers", count = remaining.len
for p in peers[1..stop]: for p in remaining:
if cid notin p.peerHave: if cid notin p.peerHave:
# just send wants # just send wants
await b.network.request.sendWantList( await b.network.request.sendWantList(
p.id, p.id,
@[cid], @[cid],
wantType = WantType.wantHave) # we only want to know if the peer has the block wantType = WantType.WantHave) # we only want to know if the peer has the block
return await blk return await blk
proc blockPresenceHandler*( proc blockPresenceHandler*(
b: BlockExcEngine, b: BlockExcEngine,
peer: PeerID, peer: PeerId,
blocks: seq[BlockPresence]) {.async.} = blocks: seq[BlockPresence]) {.async.} =
## Handle block presence ## Handle block presence
## ##
trace "Received presence update for peer", peer trace "Received presence update for peer", peer, blocks = blocks.len
let peerCtx = b.peers.get(peer) let
if isNil(peerCtx): peerCtx = b.peers.get(peer)
wantList = toSeq(b.pendingBlocks.wantList)
if peerCtx.isNil:
return return
for blk in blocks: for blk in blocks:
@ -187,19 +217,29 @@ proc blockPresenceHandler*(
price = presence.price price = presence.price
trace "Updating precense" trace "Updating precense"
peerCtx.updatePresence(presence) peerCtx.setPresence(presence)
var let
cids = toSeq(b.pendingBlocks.wantList).filterIt( peerHave = peerCtx.peerHave
it in peerCtx.peerHave dontWantCids = peerHave.filterIt(
it notin wantList
) )
trace "Received presence update for cids", peer, count = cids.len if dontWantCids.len > 0:
if cids.len > 0: trace "Cleaning peer haves", peer, count = dontWantCids.len
await b.network.request.sendWantList( peerCtx.cleanPresence(dontWantCids)
peer,
cids, trace "Peer want/have", items = peerHave.len, wantList = wantList.len
wantType = WantType.wantBlock) # we want this remote to send us a block let
wantCids = wantList.filterIt(
it in peerHave
)
if wantCids.len > 0:
trace "Getting blocks based on updated precense", peer, count = wantCids.len
discard await allFinished(
wantCids.mapIt(b.requestBlock(it)))
trace "Requested blocks based on updated precense", peer, count = wantCids.len
# if none of the connected peers report our wants in their have list, # if none of the connected peers report our wants in their have list,
# fire up discovery # fire up discovery
@ -209,7 +249,7 @@ proc blockPresenceHandler*(
not b.peers.anyIt( cid in it.peerHave )) not b.peers.anyIt( cid in it.peerHave ))
proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} =
trace "Schedule a task for new blocks" trace "Schedule a task for new blocks", items = blocks.len
let let
cids = blocks.mapIt( it.cid ) cids = blocks.mapIt( it.cid )
@ -242,17 +282,19 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} =
proc payForBlocks(engine: BlockExcEngine, proc payForBlocks(engine: BlockExcEngine,
peer: BlockExcPeerCtx, peer: BlockExcPeerCtx,
blocks: seq[bt.Block]) {.async.} = blocks: seq[bt.Block]) {.async.} =
let sendPayment = engine.network.request.sendPayment trace "Paying for blocks", blocks = blocks.len
if sendPayment.isNil:
return
let cids = blocks.mapIt(it.cid) let
if payment =? engine.wallet.pay(peer, peer.price(cids)): sendPayment = engine.network.request.sendPayment
price = peer.price(blocks.mapIt(it.cid))
if payment =? engine.wallet.pay(peer, price):
trace "Sending payment for blocks", price
await sendPayment(peer.id, payment) await sendPayment(peer.id, payment)
proc blocksHandler*( proc blocksHandler*(
b: BlockExcEngine, b: BlockExcEngine,
peer: PeerID, peer: PeerId,
blocks: seq[bt.Block]) {.async.} = blocks: seq[bt.Block]) {.async.} =
## handle incoming blocks ## handle incoming blocks
## ##
@ -263,59 +305,85 @@ proc blocksHandler*(
trace "Unable to store block", cid = blk.cid trace "Unable to store block", cid = blk.cid
await b.resolveBlocks(blocks) await b.resolveBlocks(blocks)
let peerCtx = b.peers.get(peer) let
peerCtx = b.peers.get(peer)
if peerCtx != nil: if peerCtx != nil:
# we don't care about this blocks anymore, lets cleanup the list
await b.payForBlocks(peerCtx, blocks) await b.payForBlocks(peerCtx, blocks)
peerCtx.cleanPresence(blocks.mapIt( it.cid ))
proc wantListHandler*( proc wantListHandler*(
b: BlockExcEngine, b: BlockExcEngine,
peer: PeerID, peer: PeerId,
wantList: WantList) {.async.} = wantList: WantList) {.async.} =
## Handle incoming want lists ## Handle incoming want lists
## ##
trace "Got want list for peer", peer trace "Got want list for peer", peer, items = wantList.entries.len
let peerCtx = b.peers.get(peer) let peerCtx = b.peers.get(peer)
if isNil(peerCtx): if isNil(peerCtx):
return return
var dontHaves: seq[Cid] var
let entries = wantList.entries precense: seq[BlockPresence]
for e in entries:
let idx = peerCtx.peerWants.find(e) for e in wantList.entries:
if idx > -1: let
idx = peerCtx.peerWants.find(e)
logScope:
peer = peerCtx.id
cid = e.cid
wantType = $e.wantType
if idx < 0: # updating entry
trace "Processing new want list entry", cid = e.cid
let
have = await e.cid in b.localStore
price = @(
b.pricing.get(Pricing(price: 0.u256))
.price.toBytesBE)
if not have and e.sendDontHave:
trace "Adding dont have entry to precense response", cid = e.cid
precense.add(
BlockPresence(
cid: e.cid.data.buffer,
`type`: BlockPresenceType.DontHave,
price: price))
elif have and e.wantType == WantType.WantHave:
trace "Adding have entry to precense response", cid = e.cid
precense.add(
BlockPresence(
cid: e.cid.data.buffer,
`type`: BlockPresenceType.Have,
price: price))
elif e.wantType == WantType.WantBlock:
trace "Added entry to peer's want blocks list", cid = e.cid
peerCtx.peerWants.add(e)
else:
# peer doesn't want this block anymore # peer doesn't want this block anymore
if e.cancel: if e.cancel:
trace "Removing entry from peer want list"
peerCtx.peerWants.del(idx) peerCtx.peerWants.del(idx)
continue else:
trace "Updating entry in peer want list"
# peer might want to ask for the same cid with
# different want params
peerCtx.peerWants[idx] = e # update entry
peerCtx.peerWants[idx] = e # update entry if precense.len > 0:
else: trace "Sending precense to remote", items = precense.len
peerCtx.peerWants.add(e) await b.network.request.sendPresence(peer, precense)
trace "Added entry to peer's want list", peer = peerCtx.id, cid = $e.cid
# peer might want to ask for the same cid with
# different want params
if e.sendDontHave:
if not(await e.cid in b.localStore):
dontHaves.add(e.cid)
# send don't have's to remote
if dontHaves.len > 0:
await b.network.request.sendPresence(
peer,
dontHaves.mapIt(
BlockPresence(
cid: it.data.buffer,
`type`: BlockPresenceType.presenceDontHave)))
if not b.scheduleTask(peerCtx): if not b.scheduleTask(peerCtx):
trace "Unable to schedule task for peer", peer trace "Unable to schedule task for peer", peer
proc accountHandler*( proc accountHandler*(
engine: BlockExcEngine, engine: BlockExcEngine,
peer: PeerID, peer: PeerId,
account: Account) {.async.} = account: Account) {.async.} =
let context = engine.peers.get(peer) let context = engine.peers.get(peer)
if context.isNil: if context.isNil:
@ -327,8 +395,11 @@ proc paymentHandler*(
engine: BlockExcEngine, engine: BlockExcEngine,
peer: PeerId, peer: PeerId,
payment: SignedState) {.async.} = payment: SignedState) {.async.} =
trace "Handling payments", peer
without context =? engine.peers.get(peer).option and without context =? engine.peers.get(peer).option and
account =? context.account: account =? context.account:
trace "No context or account for peer", peer
return return
if channel =? context.paymentChannel: if channel =? context.paymentChannel:
@ -337,16 +408,17 @@ proc paymentHandler*(
else: else:
context.paymentChannel = engine.wallet.acceptChannel(payment).option context.paymentChannel = engine.wallet.acceptChannel(payment).option
proc setupPeer*(b: BlockExcEngine, peer: PeerID) {.async.} = proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.async.} =
## Perform initial setup, such as want ## Perform initial setup, such as want
## list exchange ## list exchange
## ##
trace "Setting up new peer", peer
if peer notin b.peers: if peer notin b.peers:
trace "Setting up new peer", peer
b.peers.add(BlockExcPeerCtx( b.peers.add(BlockExcPeerCtx(
id: peer id: peer
)) ))
trace "Added peer", peers = b.peers.len
# broadcast our want list, the other peer will do the same # broadcast our want list, the other peer will do the same
if b.pendingBlocks.len > 0: if b.pendingBlocks.len > 0:
@ -356,7 +428,7 @@ proc setupPeer*(b: BlockExcEngine, peer: PeerID) {.async.} =
if address =? b.pricing.?address: if address =? b.pricing.?address:
await b.network.request.sendAccount(peer, Account(address: address)) await b.network.request.sendAccount(peer, Account(address: address))
proc dropPeer*(b: BlockExcEngine, peer: PeerID) = proc dropPeer*(b: BlockExcEngine, peer: PeerId) =
## Cleanup disconnected peer ## Cleanup disconnected peer
## ##
@ -368,25 +440,32 @@ proc dropPeer*(b: BlockExcEngine, peer: PeerID) =
proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
trace "Handling task for peer", peer = task.id trace "Handling task for peer", peer = task.id
# PART 1: Send to the peer blocks he wants to get, # Send to the peer blocks he wants to get,
# if they present in our local store # if they present in our local store
# TODO: There should be all sorts of accounting of # TODO: There should be all sorts of accounting of
# bytes sent/received here # bytes sent/received here
var wantsBlocks = task.peerWants.filterIt(it.wantType == WantType.wantBlock) var
wantsBlocks = task.peerWants.filterIt(
it.wantType == WantType.WantBlock
)
if wantsBlocks.len > 0: if wantsBlocks.len > 0:
trace "Got peer want blocks list", items = wantsBlocks.len
wantsBlocks.sort(SortOrder.Descending) wantsBlocks.sort(SortOrder.Descending)
let blockFuts = await allFinished(wantsBlocks.mapIt( let
blockFuts = await allFinished(wantsBlocks.mapIt(
b.localStore.getBlock(it.cid) b.localStore.getBlock(it.cid)
)) ))
# Extract succesfully received blocks # Extract successfully received blocks
let blocks = blockFuts let
.filterIt(it.completed and it.read.isOk) blocks = blockFuts
.mapIt(it.read.get) .filterIt(it.completed and it.read.isOk)
.mapIt(it.read.get)
if blocks.len > 0: if blocks.len > 0:
trace "Sending blocks to peer", peer = task.id, blocks = blocks.len trace "Sending blocks to peer", peer = task.id, blocks = blocks.len
@ -394,29 +473,13 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
task.id, task.id,
blocks) blocks)
trace "About to remove entries from peerWants", blocks = blocks.len, items = task.peerWants.len
# Remove successfully sent blocks # Remove successfully sent blocks
task.peerWants.keepIf( task.peerWants.keepIf(
proc(e: Entry): bool = proc(e: Entry): bool =
not blocks.anyIt( it.cid == e.cid ) not blocks.anyIt( it.cid == e.cid )
) )
trace "Removed entries from peerWants", items = task.peerWants.len
# PART 2: Send to the peer prices of the blocks he wants to discover,
# if they present in our local store
var wants: seq[BlockPresence]
# do not remove wants from the queue unless
# we send the block or get a cancel
for e in task.peerWants:
if e.wantType == WantType.wantHave:
var presence = Presence(cid: e.cid)
presence.have = await (presence.cid in b.localStore)
if presence.have and price =? b.pricing.?price:
presence.price = price
wants.add(BlockPresence.init(presence))
if wants.len > 0:
await b.network.request.sendPresence(task.id, wants)
proc blockexcTaskRunner(b: BlockExcEngine) {.async.} = proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
## process tasks ## process tasks
@ -455,7 +518,7 @@ proc new*(
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize), taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
discovery: discovery) discovery: discovery)
proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} =
if event.kind == PeerEventKind.Joined: if event.kind == PeerEventKind.Joined:
await engine.setupPeer(peerId) await engine.setupPeer(peerId)
else: else:
@ -466,17 +529,17 @@ proc new*(
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
proc blockWantListHandler( proc blockWantListHandler(
peer: PeerID, peer: PeerId,
wantList: WantList): Future[void] {.gcsafe.} = wantList: WantList): Future[void] {.gcsafe.} =
engine.wantListHandler(peer, wantList) engine.wantListHandler(peer, wantList)
proc blockPresenceHandler( proc blockPresenceHandler(
peer: PeerID, peer: PeerId,
presence: seq[BlockPresence]): Future[void] {.gcsafe.} = presence: seq[BlockPresence]): Future[void] {.gcsafe.} =
engine.blockPresenceHandler(peer, presence) engine.blockPresenceHandler(peer, presence)
proc blocksHandler( proc blocksHandler(
peer: PeerID, peer: PeerId,
blocks: seq[bt.Block]): Future[void] {.gcsafe.} = blocks: seq[bt.Block]): Future[void] {.gcsafe.} =
engine.blocksHandler(peer, blocks) engine.blocksHandler(peer, blocks)

View File

@ -21,7 +21,7 @@ import pkg/libp2p
import ../../blocktype import ../../blocktype
logScope: logScope:
topics = "codex blockexc pendingblocks" topics = "codex pendingblocks"
const const
DefaultBlockTimeout* = 10.minutes DefaultBlockTimeout* = 10.minutes
@ -48,7 +48,7 @@ proc getWantHandle*(
handle: newFuture[Block]("pendingBlocks.getWantHandle"), handle: newFuture[Block]("pendingBlocks.getWantHandle"),
inFlight: inFlight) inFlight: inFlight)
trace "Adding pending future for block", cid trace "Adding pending future for block", cid, inFlight = p.blocks[cid].inFlight
return await p.blocks[cid].handle.wait(timeout) return await p.blocks[cid].handle.wait(timeout)
except CancelledError as exc: except CancelledError as exc:
@ -76,15 +76,18 @@ proc resolve*(
proc setInFlight*( proc setInFlight*(
p: PendingBlocksManager, p: PendingBlocksManager,
cid: Cid) = cid: Cid,
inFlight = true) =
p.blocks.withValue(cid, pending): p.blocks.withValue(cid, pending):
pending[].inFlight = true pending[].inFlight = inFlight
trace "Setting inflight", cid, inFlight = pending[].inFlight
proc isInFlight*( proc isInFlight*(
p: PendingBlocksManager, p: PendingBlocksManager,
cid: Cid): bool = cid: Cid): bool =
p.blocks.withValue(cid, pending): p.blocks.withValue(cid, pending):
result = pending[].inFlight result = pending[].inFlight
trace "Getting inflight", cid, inFlight = result
proc pending*( proc pending*(
p: PendingBlocksManager, p: PendingBlocksManager,

View File

@ -27,7 +27,7 @@ import ./networkpeer
export network, payments export network, payments
logScope: logScope:
topics = "codex blockexc network" topics = "codex blockexcnetwork"
const const
Codec* = "/codex/blockexc/1.0.0" Codec* = "/codex/blockexc/1.0.0"
@ -44,7 +44,7 @@ type
cids: seq[Cid], cids: seq[Cid],
priority: int32 = 0, priority: int32 = 0,
cancel: bool = false, cancel: bool = false,
wantType: WantType = WantType.wantHave, wantType: WantType = WantType.WantHave,
full: bool = false, full: bool = false,
sendDontHave: bool = false): Future[void] {.gcsafe.} sendDontHave: bool = false): Future[void] {.gcsafe.}
@ -105,7 +105,7 @@ proc makeWantList*(
cids: seq[Cid], cids: seq[Cid],
priority: int = 0, priority: int = 0,
cancel: bool = false, cancel: bool = false,
wantType: WantType = WantType.wantHave, wantType: WantType = WantType.WantHave,
full: bool = false, full: bool = false,
sendDontHave: bool = false): WantList = sendDontHave: bool = false): WantList =
WantList( WantList(
@ -124,7 +124,7 @@ proc sendWantList*(
cids: seq[Cid], cids: seq[Cid],
priority: int32 = 0, priority: int32 = 0,
cancel: bool = false, cancel: bool = false,
wantType: WantType = WantType.wantHave, wantType: WantType = WantType.WantHave,
full: bool = false, full: bool = false,
sendDontHave: bool = false): Future[void] = sendDontHave: bool = false): Future[void] =
## Send a want message to peer ## Send a want message to peer
@ -332,17 +332,18 @@ proc new*(
## Create a new BlockExcNetwork instance ## Create a new BlockExcNetwork instance
## ##
let self = BlockExcNetwork( let
switch: switch, self = BlockExcNetwork(
getConn: connProvider, switch: switch,
inflightSema: newAsyncSemaphore(maxInflight)) getConn: connProvider,
inflightSema: newAsyncSemaphore(maxInflight))
proc sendWantList( proc sendWantList(
id: PeerID, id: PeerID,
cids: seq[Cid], cids: seq[Cid],
priority: int32 = 0, priority: int32 = 0,
cancel: bool = false, cancel: bool = false,
wantType: WantType = WantType.wantHave, wantType: WantType = WantType.WantHave,
full: bool = false, full: bool = false,
sendDontHave: bool = false): Future[void] {.gcsafe.} = sendDontHave: bool = false): Future[void] {.gcsafe.} =
self.sendWantList( self.sendWantList(

View File

@ -18,7 +18,7 @@ import ../protobuf/blockexc
import ../../errors import ../../errors
logScope: logScope:
topics = "codex blockexc networkpeer" topics = "codex blockexcnetworkpeer"
const const
MaxMessageSize = 100 * 1 shl 20 # manifest files can be big MaxMessageSize = 100 * 1 shl 20 # manifest files can be big

View File

@ -9,6 +9,8 @@
import std/sequtils import std/sequtils
import std/tables import std/tables
import pkg/chronicles
import pkg/libp2p import pkg/libp2p
import pkg/chronos import pkg/chronos
import pkg/nitro import pkg/nitro
@ -20,35 +22,40 @@ import ../protobuf/presence
export payments, nitro export payments, nitro
logScope:
topics = "codex peercontext"
type type
BlockExcPeerCtx* = ref object of RootObj BlockExcPeerCtx* = ref object of RootObj
id*: PeerID id*: PeerID
peerPrices*: Table[Cid, UInt256] # remote peer have list including price blocks*: Table[Cid, Presence] # remote peer have list including price
peerWants*: seq[Entry] # remote peers want lists peerWants*: seq[Entry] # remote peers want lists
exchanged*: int # times peer has exchanged with us exchanged*: int # times peer has exchanged with us
lastExchange*: Moment # last time peer has exchanged with us lastExchange*: Moment # last time peer has exchanged with us
account*: ?Account # ethereum account of this peer account*: ?Account # ethereum account of this peer
paymentChannel*: ?ChannelId # payment channel id paymentChannel*: ?ChannelId # payment channel id
proc peerHave*(context: BlockExcPeerCtx): seq[Cid] = proc peerHave*(self: BlockExcPeerCtx): seq[Cid] =
toSeq(context.peerPrices.keys) toSeq(self.blocks.keys)
proc contains*(a: openArray[BlockExcPeerCtx], b: PeerID): bool = proc contains*(self: BlockExcPeerCtx, cid: Cid): bool =
## Convenience method to check for peer prepense cid in self.blocks
##
a.anyIt( it.id == b ) func setPresence*(self: BlockExcPeerCtx, presence: Presence) =
self.blocks[presence.cid] = presence
func updatePresence*(context: BlockExcPeerCtx, presence: Presence) = func cleanPresence*(self: BlockExcPeerCtx, cids: seq[Cid]) =
let cid = presence.cid
let price = presence.price
if cid notin context.peerHave and presence.have:
context.peerPrices[cid] = price
elif cid in context.peerHave and not presence.have:
context.peerPrices.del(cid)
func price*(context: BlockExcPeerCtx, cids: seq[Cid]): UInt256 =
for cid in cids: for cid in cids:
if price =? context.peerPrices.?[cid]: self.blocks.del(cid)
result += price
func cleanPresence*(self: BlockExcPeerCtx, cid: Cid) =
self.cleanPresence(@[cid])
func price*(self: BlockExcPeerCtx, cids: seq[Cid]): UInt256 =
var price = 0.u256
for cid in cids:
self.blocks.withValue(cid, precense):
price += precense[].price
trace "Blocks price", price
price

View File

@ -25,7 +25,7 @@ import ./peercontext
export peercontext export peercontext
logScope: logScope:
topics = "codex blockexc peerctxstore" topics = "codex peerctxstore"
type type
PeerCtxStore* = ref object of RootObj PeerCtxStore* = ref object of RootObj
@ -35,6 +35,12 @@ iterator items*(self: PeerCtxStore): BlockExcPeerCtx =
for p in self.peers.values: for p in self.peers.values:
yield p yield p
proc contains*(a: openArray[BlockExcPeerCtx], b: PeerID): bool =
## Convenience method to check for peer precense
##
a.anyIt( it.id == b )
func contains*(self: PeerCtxStore, peerId: PeerID): bool = func contains*(self: PeerCtxStore, peerId: PeerID): bool =
peerId in self.peers peerId in self.peers
@ -63,13 +69,21 @@ func selectCheapest*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
var var
peers = self.peersHave(cid) peers = self.peersHave(cid)
trace "Selecting cheapest peers", peers = peers.len
func cmp(a, b: BlockExcPeerCtx): int = func cmp(a, b: BlockExcPeerCtx): int =
# Can't do (a - b) without cast[int](a - b) var
if a.peerPrices.getOrDefault(cid, 0.u256) == priceA = 0.u256
b.peerPrices.getOrDefault(cid, 0.u256): priceB = 0.u256
a.blocks.withValue(cid, precense):
priceA = precense[].price
b.blocks.withValue(cid, precense):
priceB = precense[].price
if priceA == priceB:
0 0
elif a.peerPrices.getOrDefault(cid, 0.u256) > elif priceA > priceB:
b.peerPrices.getOrDefault(cid, 0.u256):
1 1
else: else:
-1 -1
@ -79,4 +93,5 @@ func selectCheapest*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
return peers return peers
proc new*(T: type PeerCtxStore): PeerCtxStore = proc new*(T: type PeerCtxStore): PeerCtxStore =
T(peers: initOrderedTable[PeerID, BlockExcPeerCtx]()) T(
peers: initOrderedTable[PeerID, BlockExcPeerCtx]())

View File

@ -8,8 +8,8 @@ import pkg/libp2p/protobuf/minprotobuf
type type
WantType* = enum WantType* = enum
wantBlock = 0, WantBlock = 0,
wantHave = 1 WantHave = 1
Entry* = object Entry* = object
`block`*: seq[byte] # The block cid `block`*: seq[byte] # The block cid
@ -27,8 +27,8 @@ type
data*: seq[byte] data*: seq[byte]
BlockPresenceType* = enum BlockPresenceType* = enum
presenceHave = 0, Have = 0,
presenceDontHave = 1 DontHave = 1
BlockPresence* = object BlockPresence* = object
cid*: seq[byte] # The block cid cid*: seq[byte] # The block cid

View File

@ -18,13 +18,6 @@ type
have*: bool have*: bool
price*: UInt256 price*: UInt256
func init*(_: type PresenceMessage, presence: Presence): PresenceMessage =
PresenceMessage(
cid: presence.cid.data.buffer,
`type`: if presence.have: presenceHave else: presenceDontHave,
price: @(presence.price.toBytesBE)
)
func parse(_: type UInt256, bytes: seq[byte]): ?UInt256 = func parse(_: type UInt256, bytes: seq[byte]): ?UInt256 =
if bytes.len > 32: if bytes.len > 32:
return UInt256.none return UInt256.none
@ -37,6 +30,16 @@ func init*(_: type Presence, message: PresenceMessage): ?Presence =
some Presence( some Presence(
cid: cid, cid: cid,
have: message.`type` == presenceHave, have: message.`type` == BlockPresenceType.Have,
price: price price: price
) )
func init*(_: type PresenceMessage, presence: Presence): PresenceMessage =
PresenceMessage(
cid: presence.cid.data.buffer,
`type`: if presence.have:
BlockPresenceType.Have
else:
BlockPresenceType.DontHave,
price: @(presence.price.toBytesBE)
)

View File

@ -57,7 +57,7 @@ proc getBytes*(c: Chunker): Future[seq[byte]] {.async.} =
if not c.pad and buff.len > read: if not c.pad and buff.len > read:
buff.setLen(read) buff.setLen(read)
return buff return move buff
func new*( func new*(
T: type Chunker, T: type Chunker,

View File

@ -75,7 +75,7 @@ proc start*(s: CodexServer) {.async.} =
s.codexNode.discovery.updateAnnounceRecord(announceAddrs) s.codexNode.discovery.updateAnnounceRecord(announceAddrs)
s.codexNode.discovery.updateDhtRecord(s.config.nat, s.config.discoveryPort) s.codexNode.discovery.updateDhtRecord(s.config.nat, s.config.discoveryPort)
s.runHandle = newFuture[void]() s.runHandle = newFuture[void]("codex.runHandle")
await s.runHandle await s.runHandle
proc stop*(s: CodexServer) {.async.} = proc stop*(s: CodexServer) {.async.} =
@ -116,7 +116,7 @@ proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey):
.build() .build()
var var
cache: CacheStore cache: CacheStore = nil
if config.cacheSize > 0: if config.cacheSize > 0:
cache = CacheStore.new(cacheSize = config.cacheSize * MiB) cache = CacheStore.new(cacheSize = config.cacheSize * MiB)

View File

@ -143,9 +143,9 @@ type
abbr: "p" }: int abbr: "p" }: int
cacheSize* {. cacheSize* {.
desc: "The size in MiB of the block cache, 0 disables the cache" desc: "The size in MiB of the block cache, 0 disables the cache - might help on slow hardrives"
defaultValue: DefaultCacheSizeMiB defaultValue: 0
defaultValueDesc: $DefaultCacheSizeMiB defaultValueDesc: "0"
name: "cache-size" name: "cache-size"
abbr: "c" }: Natural abbr: "c" }: Natural

View File

@ -109,7 +109,7 @@ proc encode*(
# TODO: this is a tight blocking loop so we sleep here to allow # TODO: this is a tight blocking loop so we sleep here to allow
# other events to be processed, this should be addressed # other events to be processed, this should be addressed
# by threading # by threading
await sleepAsync(100.millis) await sleepAsync(10.millis)
for j in 0..<blocks: for j in 0..<blocks:
let idx = blockIdx[j] let idx = blockIdx[j]

View File

@ -147,23 +147,25 @@ proc retrieve*(
let let
stream = BufferStream.new() stream = BufferStream.new()
if blkOrNone =? (await node.blockStore.getBlock(cid)) and blk =? blkOrNone: without blk =? (await node.blockStore.getBlock(cid)), err:
proc streamOneBlock(): Future[void] {.async.} = return failure(err)
try:
await stream.pushData(blk.data)
except CatchableError as exc:
trace "Unable to send block", cid
discard
finally:
await stream.pushEof()
asyncSpawn streamOneBlock() proc streamOneBlock(): Future[void] {.async.} =
return LPStream(stream).success() try:
await stream.pushData(blk.data)
except CatchableError as exc:
trace "Unable to send block", cid
discard
finally:
await stream.pushEof()
asyncSpawn streamOneBlock()
return LPStream(stream).success()
return failure("Unable to retrieve Cid!") return failure("Unable to retrieve Cid!")
proc store*( proc store*(
node: CodexNodeRef, self: CodexNodeRef,
stream: LPStream, stream: LPStream,
blockSize = BlockSize): Future[?!Cid] {.async.} = blockSize = BlockSize): Future[?!Cid] {.async.} =
## Save stream contents as dataset with given blockSize ## Save stream contents as dataset with given blockSize
@ -187,7 +189,7 @@ proc store*(
return failure("Unable to init block from chunk!") return failure("Unable to init block from chunk!")
blockManifest.add(blk.cid) blockManifest.add(blk.cid)
if isErr (await node.blockStore.putBlock(blk)): if isErr (await self.blockStore.putBlock(blk)):
# trace "Unable to store block", cid = blk.cid # trace "Unable to store block", cid = blk.cid
return failure(&"Unable to store block {blk.cid}") return failure(&"Unable to store block {blk.cid}")
@ -209,18 +211,21 @@ proc store*(
trace "Unable to init block from manifest data!" trace "Unable to init block from manifest data!"
return failure("Unable to init block from manifest data!") return failure("Unable to init block from manifest data!")
if isErr (await node.blockStore.putBlock(manifest)): if isErr (await self.blockStore.putBlock(manifest)):
trace "Unable to store manifest", cid = $manifest.cid trace "Unable to store manifest", cid = manifest.cid
return failure("Unable to store manifest " & $manifest.cid) return failure("Unable to store manifest " & $manifest.cid)
without cid =? blockManifest.cid, error: without cid =? blockManifest.cid, error:
trace "Unable to generate manifest Cid!", exc = error.msg trace "Unable to generate manifest Cid!", exc = error.msg
return failure(error.msg) return failure(error.msg)
trace "Stored data", manifestCid = $manifest.cid, trace "Stored data", manifestCid = manifest.cid,
contentCid = cid, contentCid = cid,
blocks = blockManifest.len blocks = blockManifest.len
# Announce manifest
await self.discovery.provide(manifest.cid)
return manifest.cid.success return manifest.cid.success
proc requestStorage*(self: CodexNodeRef, proc requestStorage*(self: CodexNodeRef,
@ -263,7 +268,7 @@ proc requestStorage*(self: CodexNodeRef,
return failure(error) return failure(error)
if isErr (await self.blockStore.putBlock(encodedBlk)): if isErr (await self.blockStore.putBlock(encodedBlk)):
trace "Unable to store encoded manifest block", cid = $encodedBlk.cid trace "Unable to store encoded manifest block", cid = encodedBlk.cid
return failure("Unable to store encoded manifest block") return failure("Unable to store encoded manifest block")
let request = StorageRequest( let request = StorageRequest(

View File

@ -40,7 +40,7 @@ type
const const
MiB* = 1024 * 1024 # bytes, 1 mebibyte = 1,048,576 bytes MiB* = 1024 * 1024 # bytes, 1 mebibyte = 1,048,576 bytes
DefaultCacheSizeMiB* = 100 DefaultCacheSizeMiB* = 5
DefaultCacheSize* = DefaultCacheSizeMiB * MiB # bytes DefaultCacheSize* = DefaultCacheSizeMiB * MiB # bytes
method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} = method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} =

View File

@ -185,7 +185,8 @@ method listBlocks*(self: FSStore, onBlock: OnBlock): Future[?!void] {.async.} =
for (fkind, filename) in folderPath.walkDir(relative = true): for (fkind, filename) in folderPath.walkDir(relative = true):
if fkind != pcFile: continue if fkind != pcFile: continue
let cid = Cid.init(filename) let cid = Cid.init(filename)
if cid.isOk: await onBlock(cid.get()) if cid.isOk:
await onBlock(cid.get())
return success() return success()
@ -199,7 +200,7 @@ proc new*(
T: type FSStore, T: type FSStore,
repoDir: string, repoDir: string,
postfixLen = 2, postfixLen = 2,
cache: BlockStore = CacheStore.new()): T = cache: BlockStore = nil): T =
T( T(
postfixLen: postfixLen, postfixLen: postfixLen,
repoDir: repoDir, repoDir: repoDir,

View File

@ -27,7 +27,7 @@ import ./seekablestream
export stores, blocktype, manifest, chronos export stores, blocktype, manifest, chronos
logScope: logScope:
topics = "dagger storestream" topics = "codex storestream"
type type
# Make SeekableStream from a sequence of blocks stored in Manifest # Make SeekableStream from a sequence of blocks stored in Manifest
@ -69,7 +69,7 @@ method readOnce*(
## Return how many bytes were actually read before EOF was encountered. ## Return how many bytes were actually read before EOF was encountered.
## Raise exception if we are already at EOF. ## Raise exception if we are already at EOF.
trace "Reading from manifest", cid = $self.manifest.cid.get(), blocks = self.manifest.len trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.len
if self.atEof: if self.atEof:
raise newLPStreamEOFError() raise newLPStreamEOFError()

View File

@ -111,12 +111,13 @@ suite "Block Advertising and Discovery":
) )
peerId = PeerID.example peerId = PeerID.example
haves = collect(initTable()): haves = collect(initTable()):
for blk in blocks: {blk.cid: 0.u256} for blk in blocks:
{ blk.cid: Presence(cid: blk.cid, price: 0.u256) }
engine.peers.add( engine.peers.add(
BlockExcPeerCtx( BlockExcPeerCtx(
id: peerId, id: peerId,
peerPrices: haves blocks: haves
)) ))
blockDiscovery.findBlockProvidersHandler = blockDiscovery.findBlockProvidersHandler =

View File

@ -150,12 +150,13 @@ suite "Test Discovery Engine":
blockDiscovery.findBlockProvidersHandler = blockDiscovery.findBlockProvidersHandler =
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, gcsafe.} = proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, gcsafe.} =
check cid == blocks[0].cid check cid == blocks[0].cid
check peerStore.len < minPeers check peerStore.len < minPeers
var var
peerCtx = BlockExcPeerCtx(id: PeerID.example) peerCtx = BlockExcPeerCtx(id: PeerID.example)
peerCtx.peerPrices[cid] = 0.u256 peerCtx.blocks[cid] = Presence(cid: cid, price: 0.u256)
peerStore.add(peerCtx) peerStore.add(peerCtx)
want.fire() want.fire()

View File

@ -20,8 +20,8 @@ import ../../helpers
suite "NetworkStore engine - 2 nodes": suite "NetworkStore engine - 2 nodes":
let let
chunker1 = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) chunker1 = RandomChunker.new(Rng.instance(), size = 2048, chunkSize = 256)
chunker2 = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) chunker2 = RandomChunker.new(Rng.instance(), size = 2048, chunkSize = 256)
var var
nodeCmps1, nodeCmps2: NodesComponents nodeCmps1, nodeCmps2: NodesComponents
@ -57,8 +57,13 @@ suite "NetworkStore engine - 2 nodes":
nodeCmps2.engine.start()) nodeCmps2.engine.start())
# initialize our want lists # initialize our want lists
pendingBlocks1 = blocks2.mapIt( nodeCmps1.pendingBlocks.getWantHandle( it.cid ) ) pendingBlocks1 = blocks2[0..3].mapIt(
pendingBlocks2 = blocks1.mapIt( nodeCmps2.pendingBlocks.getWantHandle( it.cid ) ) nodeCmps1.pendingBlocks.getWantHandle( it.cid )
)
pendingBlocks2 = blocks1[0..3].mapIt(
nodeCmps2.pendingBlocks.getWantHandle( it.cid )
)
pricing1 = Pricing.example() pricing1 = Pricing.example()
pricing2 = Pricing.example() pricing2 = Pricing.example()
@ -88,7 +93,7 @@ suite "NetworkStore engine - 2 nodes":
nodeCmps2.engine.stop(), nodeCmps2.engine.stop(),
nodeCmps2.switch.stop()) nodeCmps2.switch.stop())
test "Should exchange want lists on connect": test "Should exchange blocks on connect":
await allFuturesThrowing( await allFuturesThrowing(
allFinished(pendingBlocks1)) allFinished(pendingBlocks1))
.wait(10.seconds) .wait(10.seconds)
@ -98,11 +103,19 @@ suite "NetworkStore engine - 2 nodes":
.wait(10.seconds) .wait(10.seconds)
check: check:
peerCtx1.peerHave.mapIt( $it ).sorted(cmp[string]) == (await allFinished(
pendingBlocks2.mapIt( $it.read.cid ).sorted(cmp[string]) blocks1[0..3].mapIt(
nodeCmps2.localStore.getBlock( it.cid ) )))
.filterIt( it.completed and it.read.isOk )
.mapIt( $it.read.get.cid ).sorted(cmp[string]) ==
blocks1[0..3].mapIt( $it.cid ).sorted(cmp[string])
peerCtx2.peerHave.mapIt( $it ).sorted(cmp[string]) == (await allFinished(
pendingBlocks1.mapIt( $it.read.cid ).sorted(cmp[string]) blocks2[0..3].mapIt(
nodeCmps1.localStore.getBlock( it.cid ) )))
.filterIt( it.completed and it.read.isOk )
.mapIt( $it.read.get.cid ).sorted(cmp[string]) ==
blocks2[0..3].mapIt( $it.cid ).sorted(cmp[string])
test "Should exchanges accounts on connect": test "Should exchanges accounts on connect":
check peerCtx1.account.?address == pricing1.address.some check peerCtx1.account.?address == pricing1.address.some
@ -116,7 +129,7 @@ suite "NetworkStore engine - 2 nodes":
`block`: blk.cid.data.buffer, `block`: blk.cid.data.buffer,
priority: 1, priority: 1,
cancel: false, cancel: false,
wantType: WantType.wantBlock, wantType: WantType.WantBlock,
sendDontHave: false) sendDontHave: false)
peerCtx1.peerWants.add(entry) peerCtx1.peerWants.add(entry)
@ -128,16 +141,19 @@ suite "NetworkStore engine - 2 nodes":
check eventually (await nodeCmps1.localStore.hasBlock(blk.cid)).tryGet() check eventually (await nodeCmps1.localStore.hasBlock(blk.cid)).tryGet()
test "Should get blocks from remote": test "Should get blocks from remote":
let blocks = await allFinished( let
blocks2.mapIt( nodeCmps1.networkStore.getBlock(it.cid) )) blocks = await allFinished(
check blocks.mapIt( it.read().tryGet() ) == blocks2 blocks2[4..7].mapIt(
nodeCmps1.networkStore.getBlock(it.cid)
))
check blocks.mapIt( it.read().tryGet() ) == blocks2[4..7]
test "Remote should send blocks when available": test "Remote should send blocks when available":
let blk = bt.Block.new("Block 1".toBytes).tryGet() let blk = bt.Block.new("Block 1".toBytes).tryGet()
# should fail retrieving block from remote # should fail retrieving block from remote
check not await nodeCmps1.networkStore.getBlock(blk.cid) check not await blk.cid in nodeCmps1.networkStore
.withTimeout(100.millis) # should expire
# second trigger blockexc to resolve any pending requests # second trigger blockexc to resolve any pending requests
# for the block # for the block
@ -148,15 +164,22 @@ suite "NetworkStore engine - 2 nodes":
.withTimeout(100.millis) # should succeed .withTimeout(100.millis) # should succeed
test "Should receive payments for blocks that were sent": test "Should receive payments for blocks that were sent":
# delete on node1 cached blocks from node2
discard await allFinished( discard await allFinished(
blocks2.mapIt( nodeCmps1.networkStore.delBlock(it.cid) )) blocks2[4..7].mapIt(
nodeCmps2.networkStore.putBlock(it)
))
let blocks = await allFinished( let
blocks2.mapIt( nodeCmps1.networkStore.getBlock(it.cid) )) blocks = await allFinished(
blocks2[4..7].mapIt(
nodeCmps1.networkStore.getBlock(it.cid)
))
let channel = !peerCtx1.paymentChannel # await sleepAsync(10.seconds)
let wallet = nodeCmps2.wallet
let
channel = !peerCtx1.paymentChannel
wallet = nodeCmps2.wallet
check eventually wallet.balance(channel, Asset) > 0 check eventually wallet.balance(channel, Asset) > 0
@ -194,15 +217,24 @@ suite "NetworkStore - multiple nodes":
switch = @[] switch = @[]
networkStore = @[] networkStore = @[]
test "Should receive haves for own want list": test "Should receive blocks for own want list":
let let
downloader = networkStore[4] downloader = networkStore[4]
engine = downloader.engine engine = downloader.engine
# Add blocks from 1st peer to want list # Add blocks from 1st peer to want list
let let
pendingBlocks1 = blocks[0..3].mapIt( engine.pendingBlocks.getWantHandle( it.cid ) ) downloadCids =
pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid )) blocks[0..3].mapIt(
it.cid
) &
blocks[12..15].mapIt(
it.cid
)
pendingBlocks = downloadCids.mapIt(
engine.pendingBlocks.getWantHandle( it )
)
for i in 0..15: for i in 0..15:
(await networkStore[i div 4].engine.localStore.putBlock(blocks[i])).tryGet() (await networkStore[i div 4].engine.localStore.putBlock(blocks[i])).tryGet()
@ -211,18 +243,15 @@ suite "NetworkStore - multiple nodes":
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
await allFuturesThrowing( await allFuturesThrowing(
allFinished(pendingBlocks1), allFinished(pendingBlocks))
allFinished(pendingBlocks2))
let
peers = toSeq(engine.peers)
check: check:
peers[0].peerHave.mapIt($it).sorted(cmp[string]) == (await allFinished(
blocks[0..3].mapIt( $(it.cid) ).sorted(cmp[string]) downloadCids.mapIt(
downloader.localStore.getBlock( it ) )))
peers[3].peerHave.mapIt($it).sorted(cmp[string]) == .filterIt( it.completed and it.read.isOk )
blocks[12..15].mapIt( $(it.cid) ).sorted(cmp[string]) .mapIt( $it.read.get.cid ).sorted(cmp[string]) ==
downloadCids.mapIt( $it ).sorted(cmp[string])
test "Should exchange blocks with multiple nodes": test "Should exchange blocks with multiple nodes":
let let
@ -231,8 +260,12 @@ suite "NetworkStore - multiple nodes":
# Add blocks from 1st peer to want list # Add blocks from 1st peer to want list
let let
pendingBlocks1 = blocks[0..3].mapIt( engine.pendingBlocks.getWantHandle( it.cid ) ) pendingBlocks1 = blocks[0..3].mapIt(
pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid )) engine.pendingBlocks.getWantHandle( it.cid )
)
pendingBlocks2 = blocks[12..15].mapIt(
engine.pendingBlocks.getWantHandle( it.cid )
)
for i in 0..15: for i in 0..15:
(await networkStore[i div 4].engine.localStore.putBlock(blocks[i])).tryGet() (await networkStore[i div 4].engine.localStore.putBlock(blocks[i])).tryGet()

View File

@ -58,7 +58,7 @@ suite "NetworkStore engine basic":
cids: seq[Cid], cids: seq[Cid],
priority: int32 = 0, priority: int32 = 0,
cancel: bool = false, cancel: bool = false,
wantType: WantType = WantType.wantHave, wantType: WantType = WantType.WantHave,
full: bool = false, full: bool = false,
sendDontHave: bool = false) {.gcsafe, async.} = sendDontHave: bool = false) {.gcsafe, async.} =
check cids.mapIt($it).sorted == blocks.mapIt( $it.cid ).sorted check cids.mapIt($it).sorted == blocks.mapIt( $it.cid ).sorted
@ -183,26 +183,55 @@ suite "NetworkStore engine handlers":
id: peerId id: peerId
) )
engine.peers.add(peerCtx) engine.peers.add(peerCtx)
done = newFuture[void]()
test "Should handle want list": test "Should schedule block requests":
let wantList = makeWantList(blocks.mapIt( it.cid )) let
wantList = makeWantList(
blocks.mapIt( it.cid ),
wantType = WantType.WantBlock) # only `wantBlock` are stored in `peerWants`
proc handler() {.async.} = proc handler() {.async.} =
let ctx = await engine.taskQueue.pop() let ctx = await engine.taskQueue.pop()
check ctx.id == peerId check ctx.id == peerId
# only `wantBlock` scheduled
check ctx.peerWants.mapIt( it.cid ) == blocks.mapIt( it.cid ) check ctx.peerWants.mapIt( it.cid ) == blocks.mapIt( it.cid )
let done = handler() let done = handler()
await engine.wantListHandler(peerId, wantList) await engine.wantListHandler(peerId, wantList)
await done await done
test "Should handle want list":
let
done = newFuture[void]()
wantList = makeWantList(blocks.mapIt( it.cid ))
proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) {.gcsafe, async.} =
check presence.mapIt( it.cid ) == wantList.entries.mapIt( it.`block` )
done.complete()
engine.network = BlockExcNetwork(
request: BlockExcRequest(
sendPresence: sendPresence
))
await allFuturesThrowing(
allFinished(blocks.mapIt( localStore.putBlock(it) )))
await engine.wantListHandler(peerId, wantList)
await done
test "Should handle want list - `dont-have`": test "Should handle want list - `dont-have`":
let wantList = makeWantList(blocks.mapIt( it.cid ), sendDontHave = true) let
done = newFuture[void]()
wantList = makeWantList(
blocks.mapIt( it.cid ),
sendDontHave = true)
proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) {.gcsafe, async.} = proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) {.gcsafe, async.} =
check presence.mapIt( it.cid ) == wantList.entries.mapIt( it.`block` ) check presence.mapIt( it.cid ) == wantList.entries.mapIt( it.`block` )
for p in presence: for p in presence:
check: check:
p.`type` == BlockPresenceType.presenceDontHave p.`type` == BlockPresenceType.DontHave
done.complete() done.complete()
@ -211,21 +240,31 @@ suite "NetworkStore engine handlers":
)) ))
await engine.wantListHandler(peerId, wantList) await engine.wantListHandler(peerId, wantList)
await done await done
test "Should handle want list - `dont-have` some blocks": test "Should handle want list - `dont-have` some blocks":
let wantList = makeWantList(blocks.mapIt( it.cid ), sendDontHave = true) let
done = newFuture[void]()
wantList = makeWantList(
blocks.mapIt( it.cid ),
sendDontHave = true)
proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) {.gcsafe, async.} = proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) {.gcsafe, async.} =
check presence.mapIt( it.cid ) == blocks[2..blocks.high].mapIt( it.cid.data.buffer ) let
cid1Buf = blocks[0].cid.data.buffer
cid2Buf = blocks[1].cid.data.buffer
for p in presence: for p in presence:
check: if p.cid != cid1Buf and p.cid != cid2Buf:
p.`type` == BlockPresenceType.presenceDontHave check p.`type` == BlockPresenceType.DontHave
else:
check p.`type` == BlockPresenceType.Have
done.complete() done.complete()
engine.network = BlockExcNetwork(request: BlockExcRequest( engine.network = BlockExcNetwork(
sendPresence: sendPresence request: BlockExcRequest(
sendPresence: sendPresence
)) ))
(await engine.localStore.putBlock(blocks[0])).tryGet() (await engine.localStore.putBlock(blocks[0])).tryGet()
@ -247,24 +286,58 @@ suite "NetworkStore engine handlers":
check present.tryGet() check present.tryGet()
test "Should send payments for received blocks": test "Should send payments for received blocks":
let account = Account(address: EthAddress.example) let
let peerContext = peerStore.get(peerId) done = newFuture[void]()
peerContext.account = account.some account = Account(address: EthAddress.example)
peerContext.peerPrices = blocks.mapIt((it.cid, rand(uint16).u256)).toTable peerContext = peerStore.get(peerId)
engine.network = BlockExcNetwork(request: BlockExcRequest( peerContext.account = account.some
sendPayment: proc(receiver: PeerID, payment: SignedState) {.gcsafe, async.} = peerContext.blocks = blocks.mapIt(
let amount = blocks.mapIt(peerContext.peerPrices[it.cid]).foldl(a+b) (it.cid, Presence(cid: it.cid, price: rand(uint16).u256))
let balances = !payment.state.outcome.balances(Asset) ).toTable
check receiver == peerId
check balances[account.address.toDestination] == amount engine.network = BlockExcNetwork(
done.complete() request: BlockExcRequest(
sendPayment: proc(receiver: PeerID, payment: SignedState) {.gcsafe, async.} =
let
amount =
blocks.mapIt(
peerContext.blocks[it.cid].price
).foldl(a + b)
balances = !payment.state.outcome.balances(Asset)
check receiver == peerId
check balances[account.address.toDestination] == amount
done.complete()
)) ))
await engine.blocksHandler(peerId, blocks) await engine.blocksHandler(peerId, blocks)
await done.wait(100.millis) await done.wait(100.millis)
test "Should handle block presence": test "Should handle block presence":
var
handles: Table[Cid, Future[bt.Block]]
proc sendWantList(
id: PeerID,
cids: seq[Cid],
priority: int32 = 0,
cancel: bool = false,
wantType: WantType = WantType.WantHave,
full: bool = false,
sendDontHave: bool = false) {.gcsafe, async.} =
engine.pendingBlocks.resolve(blocks.filterIt( it.cid in cids ))
engine.network = BlockExcNetwork(
request: BlockExcRequest(
sendWantList: sendWantList
))
# only Cids in peer want lists are requested
handles = blocks.mapIt(
(it.cid, engine.pendingBlocks.getWantHandle( it.cid ))).toTable
let price = UInt256.example let price = UInt256.example
await engine.blockPresenceHandler( await engine.blockPresenceHandler(
peerId, peerId,
@ -277,8 +350,8 @@ suite "NetworkStore engine handlers":
)))) ))))
for cid in blocks.mapIt(it.cid): for cid in blocks.mapIt(it.cid):
check peerCtx.peerHave.contains(cid) check cid in peerCtx.peerHave
check peerCtx.peerPrices[cid] == price check peerCtx.blocks[cid].price == price
suite "Task Handler": suite "Task Handler":
var var
@ -366,7 +439,7 @@ suite "Task Handler":
`block`: blocks[0].cid.data.buffer, `block`: blocks[0].cid.data.buffer,
priority: 49, priority: 49,
cancel: false, cancel: false,
wantType: WantType.wantBlock, wantType: WantType.WantBlock,
sendDontHave: false) sendDontHave: false)
) )
@ -376,7 +449,7 @@ suite "Task Handler":
`block`: blocks[1].cid.data.buffer, `block`: blocks[1].cid.data.buffer,
priority: 50, priority: 50,
cancel: false, cancel: false,
wantType: WantType.wantBlock, wantType: WantType.WantBlock,
sendDontHave: false) sendDontHave: false)
) )
@ -404,7 +477,7 @@ suite "Task Handler":
`block`: present[0].cid.data.buffer, `block`: present[0].cid.data.buffer,
priority: 1, priority: 1,
cancel: false, cancel: false,
wantType: WantType.wantHave, wantType: WantType.WantHave,
sendDontHave: false) sendDontHave: false)
) )
@ -414,7 +487,7 @@ suite "Task Handler":
`block`: present[1].cid.data.buffer, `block`: present[1].cid.data.buffer,
priority: 1, priority: 1,
cancel: false, cancel: false,
wantType: WantType.wantHave, wantType: WantType.WantHave,
sendDontHave: false) sendDontHave: false)
) )
@ -424,7 +497,7 @@ suite "Task Handler":
`block`: missing[0].cid.data.buffer, `block`: missing[0].cid.data.buffer,
priority: 1, priority: 1,
cancel: false, cancel: false,
wantType: WantType.wantHave, wantType: WantType.WantHave,
sendDontHave: false) sendDontHave: false)
) )

View File

@ -18,9 +18,9 @@ suite "block presence protobuf messages":
test "encodes have/donthave": test "encodes have/donthave":
var presence = presence var presence = presence
presence.have = true presence.have = true
check PresenceMessage.init(presence).`type` == presenceHave check PresenceMessage.init(presence).`type` == Have
presence.have = false presence.have = false
check PresenceMessage.init(presence).`type` == presenceDontHave check PresenceMessage.init(presence).`type` == DontHave
test "encodes price": test "encodes price":
check message.price == @(price.toBytesBE) check message.price == @(price.toBytesBE)
@ -35,9 +35,9 @@ suite "block presence protobuf messages":
test "decodes have/donthave": test "decodes have/donthave":
var message = message var message = message
message.`type` = presenceHave message.`type` = BlockPresenceType.Have
check Presence.init(message).?have == true.some check Presence.init(message).?have == true.some
message.`type` = presenceDontHave message.`type` = BlockPresenceType.DontHave
check Presence.init(message).?have == false.some check Presence.init(message).?have == false.some
test "decodes price": test "decodes price":

View File

@ -56,7 +56,7 @@ suite "Network - Handlers":
for b in blocks: for b in blocks:
check b.cid in wantList.entries check b.cid in wantList.entries
let entry = wantList.entries[wantList.entries.find(b.cid)] let entry = wantList.entries[wantList.entries.find(b.cid)]
check entry.wantType == WantType.wantHave check entry.wantType == WantType.WantHave
check entry.priority == 1 check entry.priority == 1
check entry.cancel == true check entry.cancel == true
check entry.sendDontHave == true check entry.sendDontHave == true
@ -67,7 +67,7 @@ suite "Network - Handlers":
let wantList = makeWantList( let wantList = makeWantList(
blocks.mapIt( it.cid ), blocks.mapIt( it.cid ),
1, true, WantType.wantHave, 1, true, WantType.WantHave,
true, true) true, true)
let msg = Message(wantlist: wantList) let msg = Message(wantlist: wantList)
@ -103,7 +103,7 @@ suite "Network - Handlers":
blockPresences: blocks.mapIt( blockPresences: blocks.mapIt(
BlockPresence( BlockPresence(
cid: it.cid.data.buffer, cid: it.cid.data.buffer,
type: BlockPresenceType.presenceHave type: BlockPresenceType.Have
))) )))
await buffer.pushData(lenPrefix(ProtobufEncode(msg))) await buffer.pushData(lenPrefix(ProtobufEncode(msg)))
@ -186,7 +186,7 @@ suite "Network - Senders":
for b in blocks: for b in blocks:
check b.cid in wantList.entries check b.cid in wantList.entries
let entry = wantList.entries[wantList.entries.find(b.cid)] let entry = wantList.entries[wantList.entries.find(b.cid)]
check entry.wantType == WantType.wantHave check entry.wantType == WantType.WantHave
check entry.priority == 1 check entry.priority == 1
check entry.cancel == true check entry.cancel == true
check entry.sendDontHave == true check entry.sendDontHave == true
@ -197,7 +197,7 @@ suite "Network - Senders":
await network1.sendWantList( await network1.sendWantList(
switch2.peerInfo.peerId, switch2.peerInfo.peerId,
blocks.mapIt( it.cid ), blocks.mapIt( it.cid ),
1, true, WantType.wantHave, 1, true, WantType.WantHave,
true, true) true, true)
await done.wait(500.millis) await done.wait(500.millis)
@ -231,7 +231,7 @@ suite "Network - Senders":
blocks.mapIt( blocks.mapIt(
BlockPresence( BlockPresence(
cid: it.cid.data.buffer, cid: it.cid.data.buffer,
type: BlockPresenceType.presenceHave type: BlockPresenceType.Have
))) )))
await done.wait(500.millis) await done.wait(500.millis)

View File

@ -6,6 +6,7 @@ import pkg/libp2p
import pkg/codex/blockexchange/peers import pkg/codex/blockexchange/peers
import pkg/codex/blockexchange/protobuf/blockexc import pkg/codex/blockexchange/protobuf/blockexc
import pkg/codex/blockexchange/protobuf/presence
import ../examples import ../examples
@ -52,13 +53,13 @@ suite "Peer Context Store Peer Selection":
peerCtxs = @[] peerCtxs = @[]
test "Should select peers that have Cid": test "Should select peers that have Cid":
peerCtxs[0].peerPrices = collect(initTable): peerCtxs[0].blocks = collect(initTable):
for i, c in cids: for i, c in cids:
{ c: i.u256 } { c: Presence(cid: c, price: i.u256) }
peerCtxs[5].peerPrices = collect(initTable): peerCtxs[5].blocks = collect(initTable):
for i, c in cids: for i, c in cids:
{ c: i.u256 } { c: Presence(cid: c, price: i.u256) }
let let
peers = store.peersHave(cids[0]) peers = store.peersHave(cids[0])
@ -68,17 +69,17 @@ suite "Peer Context Store Peer Selection":
check peerCtxs[5] in peers check peerCtxs[5] in peers
test "Should select cheapest peers for Cid": test "Should select cheapest peers for Cid":
peerCtxs[0].peerPrices = collect(initTable): peerCtxs[0].blocks = collect(initTable):
for i, c in cids: for i, c in cids:
{ c: (5 + i).u256 } { c: Presence(cid: c, price: (5 + i).u256) }
peerCtxs[5].peerPrices = collect(initTable): peerCtxs[5].blocks = collect(initTable):
for i, c in cids: for i, c in cids:
{ c: (2 + i).u256 } { c: Presence(cid: c, price: (2 + i).u256) }
peerCtxs[9].peerPrices = collect(initTable): peerCtxs[9].blocks = collect(initTable):
for i, c in cids: for i, c in cids:
{ c: i.u256 } { c: Presence(cid: c, price: i.u256) }
let let
peers = store.selectCheapest(cids[0]) peers = store.selectCheapest(cids[0])
@ -95,7 +96,7 @@ suite "Peer Context Store Peer Selection":
`block`: it.data.buffer, `block`: it.data.buffer,
priority: 1, priority: 1,
cancel: false, cancel: false,
wantType: WantType.wantBlock, wantType: WantType.WantBlock,
sendDontHave: false)) sendDontHave: false))
peerCtxs[0].peerWants = entries peerCtxs[0].peerWants = entries