Extract Discovery engine (#99)

* don't force logging syncs

* Add failing test

* wip discovery engine

* re-add chronicles sinks

* wip

* move network related stuff to own folder

* move peer related stuff to own folder

* extract discovery into it's own engine

* update imports

* move pending blocks into engine module

* add top level exports

* update imports

* update import paths

* update imports

* support for inflight request filtering and tests

* use `remove` instead of `del`

* fix sorting in `selectCheapest`

* re-org test file structure

* fix to use discovery engine

* file re-org

* fix compilation

* fixup discovery to use async handlers

* more re-org

* rework with support for discovery engine

* add logging

* use defaults

* wip: reworking with discoveryengine

* wip: more test fixes

* more logging

* use ordered table

* use `bt` for blocktype Block

* fix tests

* make tests work with discovery engine

* expose all node components

* fix to work with discovery engine

* wip

* propagate cancellation in listBlocks

* start/stop disc engine in blockexc engine

* remove disc engine start/stop

* wire up discovery engine

* misc comments and imports

* pass discovery to dagger node

* set sleep timers

* unused imports

* misc

* don't spawn a task, await it

* don't await handlers

* trace logging

* reduce default sleep time

Co-authored-by: Tanguy <tanguy@status.im>
This commit is contained in:
Dmitriy Ryajov 2022-05-18 20:29:15 -06:00 committed by GitHub
parent d669e344bc
commit d3dbbc75fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 1164 additions and 475 deletions

View File

@ -1,11 +1,10 @@
import ./blockexchange/[
network,
engine,
peercontext]
peers]
import ./blockexchange/protobuf/[
blockexc,
payments,
presence]
export network, engine, peercontext, blockexc, payments, presence
export network, engine, blockexc, presence, peers

View File

@ -17,17 +17,17 @@ import pkg/libp2p
import ../stores/blockstore
import ../blocktype as bt
import ../utils
import ../discovery
import ./protobuf/blockexc
import ./protobuf/presence
import ./network
import ./pendingblocks
import ./peercontext
import ./peers
import ./engine/payments
import ./engine/discovery
import ./pendingblocks
export peercontext, payments, pendingblocks
export peers, pendingblocks, payments, discovery
logScope:
topics = "dagger blockexc engine"
@ -48,28 +48,18 @@ type
TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.}
BlockExcEngine* = ref object of RootObj
localStore*: BlockStore # where we localStore blocks for this instance
network*: BlockExcNetwork # network interface
peers*: seq[BlockExcPeerCtx] # peers we're currently actively exchanging with
taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # peers we're currently processing tasks for
concurrentTasks: int # number of concurrent peers we're serving at any given time
maxRetries: int # max number of tries for a failed block
blockexcTasks: seq[Future[void]] # future to control blockexc task
blockexcRunning: bool # indicates if the blockexc task is running
pendingBlocks*: PendingBlocksManager # blocks we're awaiting to be resolved
peersPerRequest: int # max number of peers to request from
wallet*: WalletRef # nitro wallet for micropayments
pricing*: ?Pricing # optional bandwidth pricing
discovery*: Discovery # Discovery interface
concurrentAdvReqs: int # Concurrent advertise requests
advertiseLoop*: Future[void] # Advertise loop task handle
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
advertiseTasks*: seq[Future[void]] # Advertise tasks
concurrentDiscReqs: int # Concurrent discovery requests
discoveryLoop*: Future[void] # Discovery loop task handle
discoveryTasks*: seq[Future[void]] # Discovery tasks
discoveryQueue*: AsyncQueue[Cid] # Discovery queue
minPeersPerBlock*: int # Max number of peers with block
localStore*: BlockStore # Local block store for this instance
network*: BlockExcNetwork # Petwork interface
peers*: PeerCtxStore # Peers we're currently actively exchanging with
taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # Peers we're currently processing tasks for
concurrentTasks: int # Number of concurrent peers we're serving at any given time
blockexcTasks: seq[Future[void]] # Future to control blockexc task
blockexcRunning: bool # Indicates if the blockexc task is running
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
peersPerRequest: int # Max number of peers to request from
wallet*: WalletRef # Nitro wallet for micropayments
pricing*: ?Pricing # Optional bandwidth pricing
discovery*: DiscoveryEngine
Pricing* = object
address*: EthAddress
@ -81,115 +71,19 @@ proc contains*(a: AsyncHeapQueue[Entry], b: Cid): bool =
a.anyIt( it.cid == b )
proc getPeerCtx*(b: BlockExcEngine, peerId: PeerID): BlockExcPeerCtx =
## Get the peer's context
##
let peer = b.peers.filterIt( it.id == peerId )
if peer.len > 0:
return peer[0]
# attach task scheduler to engine
proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} =
b.taskQueue.pushOrUpdateNoWait(task).isOk()
proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.}
proc discoveryLoopRunner(b: BlockExcEngine) {.async.} =
while b.blockexcRunning:
for cid in toSeq(b.pendingBlocks.wantList):
try:
await b.discoveryQueue.put(cid)
except CatchableError as exc:
trace "Exception in discovery loop", exc = exc.msg
trace "About to sleep, number of wanted blocks", wanted = b.pendingBlocks.len
await sleepAsync(30.seconds)
proc advertiseLoopRunner*(b: BlockExcEngine) {.async.} =
proc onBlock(cid: Cid) {.async.} =
try:
await b.advertiseQueue.put(cid)
except CatchableError as exc:
trace "Exception listing blocks", exc = exc.msg
while b.blockexcRunning:
await b.localStore.listBlocks(onBlock)
await sleepAsync(30.seconds)
trace "Exiting advertise task loop"
proc advertiseTaskRunner(b: BlockExcEngine) {.async.} =
## Run advertise tasks
##
while b.blockexcRunning:
try:
let cid = await b.advertiseQueue.get()
await b.discovery.provideBlock(cid)
except CatchableError as exc:
trace "Exception in advertise task runner", exc = exc.msg
trace "Exiting advertise task runner"
proc discoveryTaskRunner(b: BlockExcEngine) {.async.} =
## Run discovery tasks
##
while b.blockexcRunning:
try:
let
cid = await b.discoveryQueue.get()
haves = b.peers.filterIt(
it.peerHave.anyIt( it == cid )
)
trace "Got peers for block", cid = $cid, count = haves.len
let
providers =
if haves.len < b.minPeersPerBlock:
await b.discovery
.findBlockProviders(cid)
.wait(DefaultDiscoveryTimeout)
else:
@[]
checkFutures providers.mapIt( b.network.dialPeer(it.data) )
except CatchableError as exc:
trace "Exception in discovery task runner", exc = exc.msg
trace "Exiting discovery task runner"
template queueFindBlocksReq(b: BlockExcEngine, cids: seq[Cid]) =
proc queueReq() {.async.} =
try:
for cid in cids:
if cid notin b.discoveryQueue:
trace "Queueing find block request", cid = $cid
await b.discoveryQueue.put(cid)
except CatchableError as exc:
trace "Exception queueing discovery request", exc = exc.msg
asyncSpawn queueReq()
template queueProvideBlocksReq(b: BlockExcEngine, cids: seq[Cid]) =
proc queueReq() {.async.} =
try:
for cid in cids:
if cid notin b.advertiseQueue:
trace "Queueing provide block request", cid = $cid
await b.advertiseQueue.put(cid)
except CatchableError as exc:
trace "Exception queueing discovery request", exc = exc.msg
asyncSpawn queueReq()
proc start*(b: BlockExcEngine) {.async.} =
## Start the blockexc task
##
trace "blockexc start"
await b.discovery.start()
trace "Blockexc starting with concurrent tasks", tasks = b.concurrentTasks
if b.blockexcRunning:
warn "Starting blockexc twice"
return
@ -198,19 +92,12 @@ proc start*(b: BlockExcEngine) {.async.} =
for i in 0..<b.concurrentTasks:
b.blockexcTasks.add(blockexcTaskRunner(b))
for i in 0..<b.concurrentAdvReqs:
b.advertiseTasks.add(advertiseTaskRunner(b))
for i in 0..<b.concurrentDiscReqs:
b.discoveryTasks.add(discoveryTaskRunner(b))
b.advertiseLoop = advertiseLoopRunner(b)
b.discoveryLoop = discoveryLoopRunner(b)
proc stop*(b: BlockExcEngine) {.async.} =
## Stop the blockexc blockexc
##
await b.discovery.stop()
trace "NetworkStore stop"
if not b.blockexcRunning:
warn "Stopping blockexc without starting it"
@ -223,28 +110,6 @@ proc stop*(b: BlockExcEngine) {.async.} =
await t.cancelAndWait()
trace "Task stopped"
for t in b.advertiseTasks:
if not t.finished:
trace "Awaiting task to stop"
await t.cancelAndWait()
trace "Task stopped"
for t in b.discoveryTasks:
if not t.finished:
trace "Awaiting task to stop"
await t.cancelAndWait()
trace "Task stopped"
if not b.advertiseLoop.isNil and not b.advertiseLoop.finished:
trace "Awaiting advertise loop to stop"
await b.advertiseLoop.cancelAndWait()
trace "Advertise loop stopped"
if not b.discoveryLoop.isNil and not b.discoveryLoop.finished:
trace "Awaiting discovery loop to stop"
await b.discoveryLoop.cancelAndWait()
trace "Discovery loop stopped"
trace "NetworkStore stopped"
proc requestBlock*(
@ -262,30 +127,18 @@ proc requestBlock*(
let
blk = b.pendingBlocks.getWantHandle(cid, timeout)
if b.peers.len <= 0:
var
peers = b.peers.selectCheapest(cid)
if peers.len <= 0:
peers = toSeq(b.peers) # Get any peer
if peers.len <= 0:
trace "No peers to request blocks from", cid = $cid
b.queueFindBlocksReq(@[cid])
b.discovery.queueFindBlocksReq(@[cid])
return blk
var peers = b.peers
# get the first peer with at least one (any)
# matching cid
# TODO: this should be sorted by best to worst
var blockPeer: BlockExcPeerCtx
for p in peers:
if cid in p.peerHave:
blockPeer = p
break
# didn't find any peer with matching cids
if isNil(blockPeer):
blockPeer = peers[0]
trace "No peers with block, sending to first peer", peer = blockPeer.id
peers.keepItIf(
it != blockPeer and cid notin it.peerHave
)
let
blockPeer = peers[0] # get cheapest
# request block
b.network.request.sendWantList(
@ -293,15 +146,15 @@ proc requestBlock*(
@[cid],
wantType = WantType.wantBlock) # we want this remote to send us a block
if peers.len == 0:
if (peers.len - 1) == 0:
trace "Not enough peers to send want list to", cid = $cid
b.queueFindBlocksReq(@[cid])
b.discovery.queueFindBlocksReq(@[cid])
return blk # no peers to send wants to
# filter out the peer we've already requested from
let stop = min(peers.high, b.peersPerRequest)
trace "Sending want list requests to remaining peers", count = stop + 1
for p in peers[0..stop]:
for p in peers[1..stop]:
if cid notin p.peerHave:
# just send wants
b.network.request.sendWantList(
@ -319,7 +172,7 @@ proc blockPresenceHandler*(
##
trace "Received presence update for peer", peer
let peerCtx = b.getPeerCtx(peer)
let peerCtx = b.peers.get(peer)
if isNil(peerCtx):
return
@ -332,8 +185,7 @@ proc blockPresenceHandler*(
it in peerCtx.peerHave
)
trace "Received presence update for cids", peer, cids = $cids
trace "Received presence update for cids", peer, count = cids.len
if cids.len > 0:
b.network.request.sendWantList(
peer,
@ -342,22 +194,28 @@ proc blockPresenceHandler*(
# if none of the connected peers report our wants in their have list,
# fire up discovery
b.queueFindBlocksReq(toSeq(b.pendingBlocks.wantList)
.filter(proc(cid: Cid): bool =
(not b.peers.anyIt( cid in it.peerHave ))))
b.discovery.queueFindBlocksReq(
toSeq(b.pendingBlocks.wantList)
.filter do(cid: Cid) -> bool:
not b.peers.anyIt( cid in it.peerHave ))
proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) =
trace "Schedule a task for new blocks"
let cids = blocks.mapIt( it.cid )
let
cids = blocks.mapIt( it.cid )
# schedule any new peers to provide blocks to
for p in b.peers:
for c in cids: # for each cid
# schedule a peer if it wants at least one
# cid and we have it in our local store
if c in p.peerWants and c in b.localStore:
if not b.scheduleTask(p):
if b.scheduleTask(p):
trace "Task scheduled for peer", peer = p.id
else:
trace "Unable to schedule task for peer", peer = p.id
break # do next peer
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) =
@ -369,7 +227,7 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) =
b.pendingBlocks.resolve(blocks)
b.scheduleTasks(blocks)
b.queueProvideBlocksReq(blocks.mapIt( it.cid ))
b.discovery.queueProvideBlocksReq(blocks.mapIt( it.cid ))
proc payForBlocks(engine: BlockExcEngine,
peer: BlockExcPeerCtx,
@ -396,7 +254,7 @@ proc blocksHandler*(
continue
b.resolveBlocks(blocks)
let peerCtx = b.getPeerCtx(peer)
let peerCtx = b.peers.get(peer)
if peerCtx != nil:
b.payForBlocks(peerCtx, blocks)
@ -408,7 +266,7 @@ proc wantListHandler*(
##
trace "Got want list for peer", peer
let peerCtx = b.getPeerCtx(peer)
let peerCtx = b.peers.get(peer)
if isNil(peerCtx):
return
@ -449,7 +307,7 @@ proc accountHandler*(
engine: BlockExcEngine,
peer: PeerID,
account: Account) {.async.} =
let context = engine.getPeerCtx(peer)
let context = engine.peers.get(peer)
if context.isNil:
return
@ -459,7 +317,7 @@ proc paymentHandler*(
engine: BlockExcEngine,
peer: PeerId,
payment: SignedState) {.async.} =
without context =? engine.getPeerCtx(peer).option and
without context =? engine.peers.get(peer).option and
account =? context.account:
return
@ -494,7 +352,7 @@ proc dropPeer*(b: BlockExcEngine, peer: PeerID) =
trace "Dropping peer", peer
# drop the peer from the peers table
b.peers.keepItIf( it.id != peer )
b.peers.remove(peer)
proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
trace "Handling task for peer", peer = task.id
@ -546,9 +404,13 @@ proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
## process tasks
##
trace "Starting blockexc task runner"
while b.blockexcRunning:
let peerCtx = await b.taskQueue.pop()
asyncSpawn b.taskHandler(peerCtx)
let
peerCtx = await b.taskQueue.pop()
trace "Got new task from queue", peerId = peerCtx.id
await b.taskHandler(peerCtx)
trace "Exiting blockexc task runner"
@ -557,30 +419,23 @@ proc new*(
localStore: BlockStore,
wallet: WalletRef,
network: BlockExcNetwork,
discovery: Discovery,
discovery: DiscoveryEngine,
peerStore: PeerCtxStore,
pendingBlocks: PendingBlocksManager,
concurrentTasks = DefaultConcurrentTasks,
maxRetries = DefaultMaxRetries,
peersPerRequest = DefaultMaxPeersPerRequest,
concurrentAdvReqs = DefaultConcurrentAdvertRequests,
concurrentDiscReqs = DefaultConcurrentDiscRequests,
minPeersPerBlock = DefaultMinPeersPerBlock): T =
peersPerRequest = DefaultMaxPeersPerRequest): T =
let
engine = BlockExcEngine(
localStore: localStore,
pendingBlocks: PendingBlocksManager.new(),
peers: peerStore,
pendingBlocks: pendingBlocks,
peersPerRequest: peersPerRequest,
network: network,
wallet: wallet,
concurrentTasks: concurrentTasks,
concurrentAdvReqs: concurrentAdvReqs,
concurrentDiscReqs: concurrentDiscReqs,
maxRetries: maxRetries,
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
discovery: discovery,
advertiseQueue: newAsyncQueue[Cid](DefaultTaskQueueSize),
discoveryQueue: newAsyncQueue[Cid](DefaultTaskQueueSize),
minPeersPerBlock: minPeersPerBlock)
discovery: discovery)
proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} =
if event.kind == PeerEventKind.Joined:

View File

@ -0,0 +1,261 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import pkg/chronos
import pkg/chronicles
import pkg/libp2p
import ../protobuf/presence
import ../network
import ../peers
import ../../utils
import ../../discovery
import ../../stores/blockstore
import ../pendingblocks
logScope:
topics = "dagger discovery engine"
const
DefaultConcurrentDiscRequests = 10
DefaultConcurrentAdvertRequests = 10
DefaultDiscoveryTimeout = 1.minutes
DefaultMinPeersPerBlock = 3
DefaultDiscoveryLoopSleep = 3.seconds
DefaultAdvertiseLoopSleep = 3.seconds
type
DiscoveryEngine* = ref object of RootObj
localStore*: BlockStore # Local block store for this instance
peers*: PeerCtxStore # Peer context store
network*: BlockExcNetwork # Network interface
discovery*: Discovery # Discovery interface
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
discEngineRunning*: bool # Indicates if discovery is running
concurrentAdvReqs: int # Concurrent advertise requests
concurrentDiscReqs: int # Concurrent discovery requests
advertiseLoop*: Future[void] # Advertise loop task handle
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
advertiseTasks*: seq[Future[void]] # Advertise tasks
discoveryLoop*: Future[void] # Discovery loop task handle
discoveryTasks*: seq[Future[void]] # Discovery tasks
discoveryQueue*: AsyncQueue[Cid] # Discovery queue
minPeersPerBlock*: int # Max number of peers with block
discoveryLoopSleep: Duration # Discovery loop sleep
advertiseLoopSleep: Duration # Advertise loop sleep
inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests
proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
while b.discEngineRunning:
for cid in toSeq(b.pendingBlocks.wantList):
try:
await b.discoveryQueue.put(cid)
except CatchableError as exc:
trace "Exception in discovery loop", exc = exc.msg
logScope:
sleep = b.discoveryLoopSleep
wanted = b.pendingBlocks.len
trace "About to sleep discovery loop"
await sleepAsync(b.discoveryLoopSleep)
proc advertiseQueueLoop*(b: DiscoveryEngine) {.async.} =
proc onBlock(cid: Cid) {.async.} =
try:
await b.advertiseQueue.put(cid)
except CancelledError as exc:
trace "Cancelling block listing"
raise exc
except CatchableError as exc:
trace "Exception listing blocks", exc = exc.msg
while b.discEngineRunning:
await b.localStore.listBlocks(onBlock)
trace "About to sleep advertise loop", sleep = b.advertiseLoopSleep
await sleepAsync(b.advertiseLoopSleep)
trace "Exiting advertise task loop"
proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
## Run advertise tasks
##
while b.discEngineRunning:
try:
let
cid = await b.advertiseQueue.get()
if cid in b.inFlightAdvReqs:
trace "Advertise request already in progress", cid = $cid
continue
try:
trace "Advertising block", cid = $cid
let request = b.discovery.provideBlock(cid)
b.inFlightAdvReqs[cid] = request
await request
finally:
b.inFlightAdvReqs.del(cid)
except CatchableError as exc:
trace "Exception in advertise task runner", exc = exc.msg
trace "Exiting advertise task runner"
proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
## Run discovery tasks
##
while b.discEngineRunning:
try:
let
cid = await b.discoveryQueue.get()
if cid in b.inFlightDiscReqs:
trace "Discovery request already in progress", cid = $cid
continue
let
haves = b.peers.peersHave(cid)
trace "Current number of peers for block", cid = $cid, count = haves.len
if haves.len < b.minPeersPerBlock:
trace "Discovering block", cid = $cid
try:
let
request = b.discovery
.findBlockProviders(cid)
.wait(DefaultDiscoveryTimeout)
b.inFlightDiscReqs[cid] = request
let
peers = await request
trace "Discovered peers", peers = peers.len
checkFutures(
await allFinished(peers.mapIt( b.network.dialPeer(it.data))))
finally:
b.inFlightDiscReqs.del(cid)
except CatchableError as exc:
trace "Exception in discovery task runner", exc = exc.msg
trace "Exiting discovery task runner"
proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
proc queueReq() {.async.} =
try:
for cid in cids:
if cid notin b.discoveryQueue:
trace "Queueing find block request", cid = $cid
await b.discoveryQueue.put(cid)
except CatchableError as exc:
trace "Exception queueing discovery request", exc = exc.msg
asyncSpawn queueReq()
proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
proc queueReq() {.async.} =
try:
for cid in cids:
if cid notin b.advertiseQueue:
trace "Queueing provide block request", cid = $cid
await b.advertiseQueue.put(cid)
except CatchableError as exc:
trace "Exception queueing discovery request", exc = exc.msg
asyncSpawn queueReq()
proc start*(b: DiscoveryEngine) {.async.} =
## Start the discengine task
##
trace "discovery engine start"
if b.discEngineRunning:
warn "Starting discovery engine twice"
return
b.discEngineRunning = true
for i in 0..<b.concurrentAdvReqs:
b.advertiseTasks.add(advertiseTaskLoop(b))
for i in 0..<b.concurrentDiscReqs:
b.discoveryTasks.add(discoveryTaskLoop(b))
b.advertiseLoop = advertiseQueueLoop(b)
b.discoveryLoop = discoveryQueueLoop(b)
proc stop*(b: DiscoveryEngine) {.async.} =
## Stop the discovery engine
##
trace "Discovery engine stop"
if not b.discEngineRunning:
warn "Stopping discovery engine without starting it"
return
b.discEngineRunning = false
for t in b.advertiseTasks:
if not t.finished:
trace "Awaiting advertise task to stop"
await t.cancelAndWait()
trace "Advertise task stopped"
for t in b.discoveryTasks:
if not t.finished:
trace "Awaiting discovery task to stop"
await t.cancelAndWait()
trace "Discovery task stopped"
if not b.advertiseLoop.isNil and not b.advertiseLoop.finished:
trace "Awaiting advertise loop to stop"
await b.advertiseLoop.cancelAndWait()
trace "Advertise loop stopped"
if not b.discoveryLoop.isNil and not b.discoveryLoop.finished:
trace "Awaiting discovery loop to stop"
await b.discoveryLoop.cancelAndWait()
trace "Discovery loop stopped"
trace "Discovery engine stopped"
proc new*(
T: type DiscoveryEngine,
localStore: BlockStore,
peers: PeerCtxStore,
network: BlockExcNetwork,
discovery: Discovery,
pendingBlocks: PendingBlocksManager,
concurrentAdvReqs = DefaultConcurrentAdvertRequests,
concurrentDiscReqs = DefaultConcurrentDiscRequests,
discoveryLoopSleep = DefaultDiscoveryLoopSleep,
advertiseLoopSleep = DefaultAdvertiseLoopSleep,
minPeersPerBlock = DefaultMinPeersPerBlock,): DiscoveryEngine =
T(
localStore: localStore,
peers: peers,
network: network,
discovery: discovery,
pendingBlocks: pendingBlocks,
concurrentAdvReqs: concurrentAdvReqs,
concurrentDiscReqs: concurrentDiscReqs,
advertiseQueue: newAsyncQueue[Cid](concurrentAdvReqs),
discoveryQueue: newAsyncQueue[Cid](concurrentDiscReqs),
inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](),
inFlightAdvReqs: initTable[Cid, Future[void]](),
discoveryLoopSleep: discoveryLoopSleep,
advertiseLoopSleep: advertiseLoopSleep,
minPeersPerBlock: minPeersPerBlock)

View File

@ -1,11 +1,10 @@
import std/math
import pkg/nitro
import pkg/questionable/results
import ../peercontext
import ../peers
export nitro
export results
export peercontext
push: {.upraises: [].}

View File

@ -20,6 +20,7 @@ import pkg/questionable/results
import ../blocktype as bt
import ./protobuf/blockexc as pb
import ./protobuf/payments
import ./networkpeer
export networkpeer, payments
@ -173,10 +174,11 @@ proc broadcastBlocks*(
##
if id notin b.peers:
trace "Unable to send blocks, peer disconnected", peer = id
return
trace "Sending blocks to peer", peer = id, len = blocks.len
b.peers.withValue(id, peer):
trace "Sending blocks to peer", peer = id, len = blocks.len
peer[].broadcast(pb.Message(payload: makeBlocks(blocks)))
proc handleBlockPresence(
@ -243,19 +245,19 @@ proc handlePayment(network: BlockExcNetwork,
proc rpcHandler(b: BlockExcNetwork, peer: NetworkPeer, msg: Message) {.async.} =
try:
if msg.wantlist.entries.len > 0:
await b.handleWantList(peer, msg.wantlist)
asyncSpawn b.handleWantList(peer, msg.wantlist)
if msg.payload.len > 0:
await b.handleBlocks(peer, msg.payload)
asyncSpawn b.handleBlocks(peer, msg.payload)
if msg.blockPresences.len > 0:
await b.handleBlockPresence(peer, msg.blockPresences)
asyncSpawn b.handleBlockPresence(peer, msg.blockPresences)
if account =? Account.init(msg.account):
await b.handleAccount(peer, account)
asyncSpawn b.handleAccount(peer, account)
if payment =? SignedState.init(msg.payment):
await b.handlePayment(peer, payment)
asyncSpawn b.handlePayment(peer, payment)
except CatchableError as exc:
trace "Exception in blockexc rpc handler", exc = exc.msg
@ -298,7 +300,7 @@ proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} =
try:
await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address))
except CatchableError as exc:
debug "Failed to connect to peer", error=exc.msg
debug "Failed to connect to peer", error = exc.msg, peer
proc dropPeer*(b: BlockExcNetwork, peer: PeerID) =
## Cleanup disconnected peer

View File

@ -17,7 +17,8 @@ import ./protobuf/blockexc
logScope:
topics = "dagger blockexc networkpeer"
const MaxMessageSize = 100 * 1024 * 1024 # manifest files can be big
const
MaxMessageSize = 100 * 1024 * 1024 # manifest files can be big
type
RPCHandler* = proc(peer: NetworkPeer, msg: Message): Future[void] {.gcsafe.}

View File

@ -4,6 +4,7 @@ import pkg/libp2p
import pkg/chronos
import pkg/nitro
import pkg/questionable
import ./protobuf/blockexc
import ./protobuf/payments
import ./protobuf/presence

View File

@ -0,0 +1,3 @@
import ./peers/peerctxstore
export peerctxstore

View File

@ -0,0 +1,82 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import std/tables
import std/algorithm
import pkg/upraises
push: {.upraises: [].}
import pkg/chronos
import pkg/chronicles
import pkg/libp2p
import ../protobuf/blockexc
import ../peercontext
export peercontext
logScope:
topics = "dagger blockexc peerctxstore"
type
PeerCtxStore* = ref object of RootObj
peers*: OrderedTable[PeerID, BlockExcPeerCtx]
iterator items*(self: PeerCtxStore): BlockExcPeerCtx =
for p in self.peers.values:
yield p
func contains*(self: PeerCtxStore, peerId: PeerID): bool =
peerId in self.peers
func add*(self: PeerCtxStore, peer: BlockExcPeerCtx) =
trace "Adding peer to peer context store", peer = peer.id
self.peers[peer.id] = peer
func remove*(self: PeerCtxStore, peerId: PeerID) =
trace "Removing peer from peer context store", peer = peerId
self.peers.del(peerId)
func get*(self: PeerCtxStore, peerId: PeerID): BlockExcPeerCtx =
trace "Retrieving peer from peer context store", peer = peerId
self.peers.getOrDefault(peerId, nil)
func len*(self: PeerCtxStore): int =
self.peers.len
func peersHave*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt( it.peerHave.anyIt( it == cid ) )
func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it.cid == cid ) )
func selectCheapest*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
var
peers = self.peersHave(cid)
func cmp(a, b: BlockExcPeerCtx): int =
# Can't do (a - b) without cast[int](a - b)
if a.peerPrices.getOrDefault(cid, 0.u256) ==
b.peerPrices.getOrDefault(cid, 0.u256):
0
elif a.peerPrices.getOrDefault(cid, 0.u256) >
b.peerPrices.getOrDefault(cid, 0.u256):
1
else:
-1
peers.sort(cmp)
trace "Selected cheapest peers", peers = peers.len
return peers
proc new*(T: type PeerCtxStore): PeerCtxStore =
T(peers: initOrderedTable[PeerID, BlockExcPeerCtx]())

View File

@ -8,7 +8,6 @@
## those terms.
import std/tables
import std/sequtils
import pkg/upraises
@ -52,12 +51,6 @@ proc getWantHandle*(
finally:
p.blocks.del(cid)
proc addOrAwait*(
p: PendingBlocksManager,
cid: Cid,
timeout = DefaultBlockTimeout): Future[Block] {.deprecated: "Use getWantHandle".} =
p.getWantHandle(cid, timeout)
proc resolve*(
p: PendingBlocksManager,
blocks: seq[Block]) =
@ -94,5 +87,4 @@ func len*(p: PendingBlocksManager): int =
func new*(T: type PendingBlocksManager): T =
T(
blocks: initTable[Cid, Future[Block]]()
)
blocks: initTable[Cid, Future[Block]]())

View File

@ -110,7 +110,7 @@ proc new*(T: type DaggerServer, config: DaggerConf): T =
let
discoveryBootstrapNodes = config.bootstrapNodes
discovery = Discovery.new(
blockDiscovery = Discovery.new(
switch.peerInfo,
discoveryPort = config.discoveryPort,
bootstrapNodes = discoveryBootstrapNodes
@ -119,7 +119,10 @@ proc new*(T: type DaggerServer, config: DaggerConf): T =
wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch)
localStore = FSStore.new(config.dataDir / "repo", cache = cache)
engine = BlockExcEngine.new(localStore, wallet, network, discovery)
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks)
engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks)
store = NetworkStore.new(engine, localStore)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
contracts = ContractInteractions.new(
@ -127,7 +130,7 @@ proc new*(T: type DaggerServer, config: DaggerConf): T =
config.ethDeployment,
config.ethAccount
)
daggerNode = DaggerNodeRef.new(switch, store, engine, erasure, discovery, contracts)
daggerNode = DaggerNodeRef.new(switch, store, engine, erasure, blockDiscovery, contracts)
restServer = RestServerRef.new(
daggerNode.initRestApi(config),
initTAddress("127.0.0.1" , config.apiPort),

View File

@ -28,8 +28,8 @@ type
proc new*(
T: type Discovery,
localInfo: PeerInfo,
discoveryPort: Port,
bootstrapNodes = newSeq[SignedPeerRecord](),
discoveryPort = 0.Port,
bootstrapNodes: seq[SignedPeerRecord] = @[],
): T =
T(
@ -40,8 +40,7 @@ proc new*(
bootstrapRecords = bootstrapNodes,
rng = Rng.instance()
),
localInfo: localInfo
)
localInfo: localInfo)
proc findPeer*(
d: Discovery,

View File

@ -19,7 +19,7 @@ import pkg/chronicles
import ../manifest
import ../stores
import ../errors
import ../blocktype
import ../blocktype as bt
import ./backend
@ -96,7 +96,7 @@ proc encode*(
try:
for i in 0..<encoded.steps:
# TODO: Don't allocate a new seq everytime, allocate once and zero out
# TODO: Don't allocate a new seq every time, allocate once and zero out
var
data = newSeq[seq[byte]](blocks) # number of blocks to encode
parityData = newSeqWith[seq[byte]](parity, newSeq[byte](manifest.blockSize))
@ -133,7 +133,7 @@ proc encode*(
for j in 0..<parity:
let idx = encoded.rounded + blockIdx[j]
without blk =? Block.new(parityData[j]), error:
without blk =? bt.Block.new(parityData[j]), error:
trace "Unable to create parity block", err = error.msg
return failure(error)
@ -173,7 +173,7 @@ proc decode*(
try:
for i in 0..<encoded.steps:
# TODO: Don't allocate a new seq everytime, allocate once and zero out
# TODO: Don't allocate a new seq every time, allocate once and zero out
let
# calculate block indexes to retrieve
blockIdx = toSeq(countup(i, encoded.len - 1, encoded.steps))
@ -237,7 +237,7 @@ proc decode*(
for i in 0..<encoded.K:
if data[i].len <= 0:
without blk =? Block.new(recovered[i]), error:
without blk =? bt.Block.new(recovered[i]), error:
trace "Unable to create block!", exc = error.msg
return failure(error)

View File

@ -130,7 +130,7 @@ method hasBlock*(self: FSStore, cid: Cid): bool =
self.blockPath(cid).isFile()
method listBlocks*(self: FSStore, onBlock: OnBlock) {.async.} =
debug "Finding all blocks in store"
debug "Listing all blocks in store"
for (pkind, folderPath) in self.repoDir.walkDir():
if pkind != pcDir: continue
let baseName = basename(folderPath)
@ -144,9 +144,14 @@ method listBlocks*(self: FSStore, onBlock: OnBlock) {.async.} =
# compilation error if using different syntax/construct bellow
try:
await onBlock(cid.get())
except CancelledError as exc:
trace "Cancelling list blocks"
raise exc
except CatchableError as exc:
trace "Couldn't get block", cid = $(cid.get())
# TODO: this should run on a thread which
# wouldn't need the sleep
await sleepAsync(100.millis) # avoid blocking
proc new*(

View File

@ -19,11 +19,9 @@ import ../blocktype as bt
import ../utils/asyncheapqueue
import ./blockstore
import ../blockexchange/network
import ../blockexchange/engine
import ../blockexchange/peercontext
import ../blockexchange
export blockstore, network, engine, asyncheapqueue
export blockstore, blockexchange, asyncheapqueue
logScope:
topics = "dagger networkstore"

View File

@ -7,8 +7,6 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/strformat
import pkg/upraises
push: {.upraises: [].}

View File

@ -1,11 +1,9 @@
import std/sequtils
import std/sugar
import std/algorithm
import std/tables
import pkg/asynctest
import pkg/chronos
import pkg/stew/byteutils
import pkg/libp2p
import pkg/libp2p/errors
@ -16,7 +14,7 @@ import pkg/dagger/blockexchange
import pkg/dagger/chunker
import pkg/dagger/blocktype as bt
import ./mockdiscovery
import ../../helpers/mockdiscovery
import ../../helpers
import ../../examples
@ -27,11 +25,14 @@ suite "Block Advertising and Discovery":
var
blocks: seq[bt.Block]
switch: Switch
discovery: MockDiscovery
peerStore: PeerCtxStore
blockDiscovery: MockDiscovery
discovery: DiscoveryEngine
wallet: WalletRef
network: BlockExcNetwork
localStore: CacheStore
engine: BlockExcEngine
pendingBlocks: PendingBlocksManager
setup:
while true:
@ -42,11 +43,29 @@ suite "Block Advertising and Discovery":
blocks.add(bt.Block.new(chunk).tryGet())
switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
discovery = MockDiscovery.new(switch.peerInfo, 0.Port)
blockDiscovery = MockDiscovery.new(switch.peerInfo, 0.Port)
wallet = WalletRef.example
network = BlockExcNetwork.new(switch)
localStore = CacheStore.new(blocks.mapIt( it ))
engine = BlockExcEngine.new(localStore, wallet, network, discovery, minPeersPerBlock = 1)
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
discovery = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks,
minPeersPerBlock = 1)
engine = BlockExcEngine.new(
localStore,
wallet,
network,
discovery,
peerStore,
pendingBlocks)
switch.mount(network)
test "Should discover want list":
@ -55,9 +74,14 @@ suite "Block Advertising and Discovery":
engine.pendingBlocks.getWantHandle(it.cid)
)
await engine.start() # fire up discovery loop
discovery.findBlockProvidersHandler =
proc(d: MockDiscovery, cid: Cid): seq[SignedPeerRecord] =
await engine.start()
blockDiscovery.publishProvideHandler =
proc(d: MockDiscovery, cid: Cid): Future[void] {.async, gcsafe.} =
return
blockDiscovery.findBlockProvidersHandler =
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async.} =
engine.resolveBlocks(blocks.filterIt( it.cid == cid ))
await allFuturesThrowing(
@ -70,7 +94,7 @@ suite "Block Advertising and Discovery":
advertised = initTable.collect:
for b in blocks: {b.cid: newFuture[void]()}
discovery.publishProvideHandler = proc(d: MockDiscovery, cid: Cid) =
blockDiscovery.publishProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} =
if cid in advertised and not advertised[cid].finished():
advertised[cid].complete()
@ -95,8 +119,8 @@ suite "Block Advertising and Discovery":
peerPrices: haves
))
discovery.findBlockProvidersHandler =
proc(d: MockDiscovery, cid: Cid): seq[SignedPeerRecord] =
blockDiscovery.findBlockProvidersHandler =
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] =
check false
await engine.start() # fire up discovery loop
@ -126,11 +150,28 @@ suite "E2E - Multiple Nodes Discovery":
for _ in 0..<4:
let
s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
discovery = MockDiscovery.new(s.peerInfo, 0.Port)
blockDiscovery = MockDiscovery.new(s.peerInfo, 0.Port)
wallet = WalletRef.example
network = BlockExcNetwork.new(s)
localStore = CacheStore.new()
engine = BlockExcEngine.new(localStore, wallet, network, discovery, minPeersPerBlock = 1)
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
discovery = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks,
minPeersPerBlock = 1)
engine = BlockExcEngine.new(
localStore,
wallet,
network,
discovery,
peerStore,
pendingBlocks)
networkStore = NetworkStore.new(engine, localStore)
s.mount(network)
@ -147,24 +188,25 @@ suite "E2E - Multiple Nodes Discovery":
var advertised: Table[Cid, SignedPeerRecord]
MockDiscovery(blockexc[1].engine.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid) =
MockDiscovery(blockexc[1].engine.discovery.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
advertised.add(cid, switch[1].peerInfo.signedPeerRecord)
MockDiscovery(blockexc[2].engine.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid) =
MockDiscovery(blockexc[2].engine.discovery.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
advertised.add(cid, switch[2].peerInfo.signedPeerRecord)
MockDiscovery(blockexc[3].engine.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid) =
MockDiscovery(blockexc[3].engine.discovery.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
advertised.add(cid, switch[3].peerInfo.signedPeerRecord)
await blockexc[1].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[0..5])
await blockexc[2].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[4..10])
await blockexc[3].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[10..15])
MockDiscovery(blockexc[0].engine.discovery)
.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): seq[SignedPeerRecord] =
MockDiscovery(blockexc[0].engine.discovery.discovery)
.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid):
Future[seq[SignedPeerRecord]] {.async.} =
if cid in advertised:
result.add(advertised[cid])
@ -174,15 +216,13 @@ suite "E2E - Multiple Nodes Discovery":
await allFuturesThrowing(
switch.mapIt( it.start() ) &
blockexc.mapIt( it.engine.start() )
)
blockexc.mapIt( it.engine.start() ))
await allFutures(futs)
await allFuturesThrowing(
blockexc.mapIt( it.engine.stop() ) &
switch.mapIt( it.stop() )
)
switch.mapIt( it.stop() ))
test "E2E - Should advertise and discover blocks with peers already connected":
# Distribute the blocks amongst 1..3
@ -190,43 +230,37 @@ suite "E2E - Multiple Nodes Discovery":
var advertised: Table[Cid, SignedPeerRecord]
MockDiscovery(blockexc[1].engine.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid) =
advertised.add(cid, switch[1].peerInfo.signedPeerRecord)
MockDiscovery(blockexc[1].engine.discovery.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
advertised[cid] = switch[1].peerInfo.signedPeerRecord
MockDiscovery(blockexc[2].engine.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid) =
advertised.add(cid, switch[2].peerInfo.signedPeerRecord)
MockDiscovery(blockexc[2].engine.discovery.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
advertised[cid] = switch[2].peerInfo.signedPeerRecord
MockDiscovery(blockexc[3].engine.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid) =
advertised.add(cid, switch[3].peerInfo.signedPeerRecord)
MockDiscovery(blockexc[3].engine.discovery.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
advertised[cid] = switch[3].peerInfo.signedPeerRecord
await blockexc[1].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[0..5])
await blockexc[2].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[4..10])
await blockexc[3].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[10..15])
MockDiscovery(blockexc[0].engine.discovery)
.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): seq[SignedPeerRecord] =
MockDiscovery(blockexc[0].engine.discovery.discovery)
.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid):
Future[seq[SignedPeerRecord]] {.async.} =
if cid in advertised:
result.add(advertised[cid])
return @[advertised[cid]]
let
futs = blocks.mapIt( blockexc[0].engine.requestBlock( it.cid ) )
await allFuturesThrowing(
switch.mapIt( it.start() ) &
blockexc.mapIt( it.engine.start() )
)
# Connect to the two first nodes
discard await blockexc[0].engine.requestBlock(blocks[0].cid)
discard await blockexc[0].engine.requestBlock(blocks[6].cid)
let futs = collect(newSeq):
for b in blocks:
blockexc[0].engine.requestBlock(b.cid)
blockexc.mapIt( it.engine.start() ))
await allFutures(futs).wait(10.seconds)
await allFuturesThrowing(
blockexc.mapIt( it.engine.stop() ) &
switch.mapIt( it.stop() )
)
switch.mapIt( it.stop() ))

View File

@ -0,0 +1,237 @@
import std/sequtils
import std/sugar
import std/tables
import pkg/asynctest
import pkg/chronos
import pkg/chronicles
import pkg/libp2p
import pkg/dagger/rng
import pkg/dagger/stores
import pkg/dagger/blockexchange
import pkg/dagger/chunker
import pkg/dagger/blocktype as bt
import pkg/dagger/blockexchange/engine
import ../../helpers/mockdiscovery
import ../../helpers
import ../../examples
suite "Test Discovery Engine":
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
var
blocks: seq[bt.Block]
switch: Switch
peerStore: PeerCtxStore
blockDiscovery: MockDiscovery
pendingBlocks: PendingBlocksManager
localStore: CacheStore
network: BlockExcNetwork
setup:
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
blocks.add(bt.Block.new(chunk).tryGet())
switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
network = BlockExcNetwork.new(switch)
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
blockDiscovery = MockDiscovery.new()
test "Should Query Wants":
var
localStore = CacheStore.new()
discoveryEngine = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks,
discoveryLoopSleep = 100.millis)
wants = blocks.mapIt( pendingBlocks.getWantHandle(it.cid) )
blockDiscovery.findBlockProvidersHandler =
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, gcsafe.} =
pendingBlocks.resolve(blocks.filterIt( it.cid == cid))
await discoveryEngine.start()
await allFuturesThrowing(allFinished(wants)).wait(1.seconds)
await discoveryEngine.stop()
test "Should Advertise Haves":
var
localStore = CacheStore.new(blocks.mapIt( it ))
discoveryEngine = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks,
discoveryLoopSleep = 100.millis)
haves = collect(initTable):
for b in blocks:
{ b.cid: newFuture[void]() }
blockDiscovery.publishProvideHandler =
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
if not haves[cid].finished:
haves[cid].complete
await discoveryEngine.start()
await allFuturesThrowing(
allFinished(toSeq(haves.values))).wait(1.seconds)
await discoveryEngine.stop()
test "Should queue discovery request":
var
localStore = CacheStore.new()
discoveryEngine = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks,
discoveryLoopSleep = 100.millis)
want = newFuture[void]()
blockDiscovery.findBlockProvidersHandler =
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, gcsafe.} =
check cid == blocks[0].cid
if not want.finished:
want.complete()
await discoveryEngine.start()
discoveryEngine.queueFindBlocksReq(@[blocks[0].cid])
await want.wait(1.seconds)
await discoveryEngine.stop()
test "Should queue advertise request":
var
localStore = CacheStore.new(@[blocks[0]])
discoveryEngine = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks,
discoveryLoopSleep = 100.millis)
have = newFuture[void]()
blockDiscovery.publishProvideHandler =
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
check cid == blocks[0].cid
if not have.finished:
have.complete()
await discoveryEngine.start()
await have.wait(1.seconds)
await discoveryEngine.stop()
test "Should not request more than minPeersPerBlock":
var
localStore = CacheStore.new()
minPeers = 2
discoveryEngine = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks,
discoveryLoopSleep = 5.minutes,
minPeersPerBlock = minPeers)
want = newAsyncEvent()
blockDiscovery.findBlockProvidersHandler =
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, gcsafe.} =
check cid == blocks[0].cid
check peerStore.len < minPeers
var
peerCtx = BlockExcPeerCtx(id: PeerID.example)
peerCtx.peerPrices[cid] = 0.u256
peerStore.add(peerCtx)
want.fire()
await discoveryEngine.start()
while peerStore.len < minPeers:
discoveryEngine.queueFindBlocksReq(@[blocks[0].cid])
await want.wait()
want.clear()
check peerStore.len == minPeers
await discoveryEngine.stop()
test "Should not request if there is already an inflight discovery request":
var
localStore = CacheStore.new()
discoveryEngine = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks,
discoveryLoopSleep = 100.millis,
concurrentDiscReqs = 2)
reqs = newFuture[void]()
count = 0
blockDiscovery.findBlockProvidersHandler =
proc(d: MockDiscovery, cid: Cid):
Future[seq[SignedPeerRecord]] {.gcsafe, async.} =
check cid == blocks[0].cid
if count > 0:
check false
count.inc
await reqs # queue the request
await discoveryEngine.start()
discoveryEngine.queueFindBlocksReq(@[blocks[0].cid])
await sleepAsync(200.millis)
discoveryEngine.queueFindBlocksReq(@[blocks[0].cid])
await sleepAsync(200.millis)
reqs.complete()
await discoveryEngine.stop()
test "Should not request if there is already an inflight advertise request":
var
localStore = CacheStore.new()
discoveryEngine = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks,
discoveryLoopSleep = 100.millis,
concurrentAdvReqs = 2)
reqs = newFuture[void]()
count = 0
blockDiscovery.publishProvideHandler =
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
check cid == blocks[0].cid
if count > 0:
check false
count.inc
await reqs # queue the request
await discoveryEngine.start()
discoveryEngine.queueProvideBlocksReq(@[blocks[0].cid])
await sleepAsync(200.millis)
discoveryEngine.queueProvideBlocksReq(@[blocks[0].cid])
await sleepAsync(200.millis)
reqs.complete()
await discoveryEngine.stop()

View File

@ -1,4 +1,3 @@
import std/sequtils
import pkg/asynctest
import pkg/chronos
import pkg/libp2p

View File

@ -19,23 +19,15 @@ import ../helpers
import ../examples
suite "NetworkStore engine - 2 nodes":
let
chunker1 = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
chunker2 = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
var
switch1, switch2: Switch
wallet1, wallet2: WalletRef
pricing1, pricing2: Pricing
network1, network2: BlockExcNetwork
blockexc1, blockexc2: NetworkStore
peerId1, peerId2: PeerID
nodeCmps1, nodeCmps2: NodesComponents
peerCtx1, peerCtx2: BlockExcPeerCtx
pricing1, pricing2: Pricing
blocks1, blocks2: seq[bt.Block]
engine1, engine2: BlockExcEngine
localStore1, localStore2: BlockStore
discovery1, discovery2: Discovery
pendingBlocks1, pendingBlocks2: seq[Future[bt.Block]]
setup:
@ -53,70 +45,54 @@ suite "NetworkStore engine - 2 nodes":
blocks2.add(bt.Block.new(chunk).tryGet())
switch1 = newStandardSwitch()
switch2 = newStandardSwitch()
wallet1 = WalletRef.example
wallet2 = WalletRef.example
pricing1 = Pricing.example
pricing2 = Pricing.example
await switch1.start()
await switch2.start()
peerId1 = switch1.peerInfo.peerId
peerId2 = switch2.peerInfo.peerId
localStore1 = CacheStore.new(blocks1.mapIt( it ))
discovery1 = Discovery.new(switch1.peerInfo, Port(0))
network1 = BlockExcNetwork.new(switch = switch1)
engine1 = BlockExcEngine.new(localStore1, wallet1, network1, discovery1)
blockexc1 = NetworkStore.new(engine1, localStore1)
switch1.mount(network1)
localStore2 = CacheStore.new(blocks2.mapIt( it ))
discovery2 = Discovery.new(switch2.peerInfo, Port(0))
network2 = BlockExcNetwork.new(switch = switch2)
engine2 = BlockExcEngine.new(localStore2, wallet2, network2, discovery2)
blockexc2 = NetworkStore.new(engine2, localStore2)
switch2.mount(network2)
nodeCmps1 = generateNodes(1, blocks1)[0]
nodeCmps2 = generateNodes(1, blocks2)[0]
await allFuturesThrowing(
engine1.start(),
engine2.start(),
)
nodeCmps1.switch.start(),
nodeCmps1.blockDiscovery.start(),
nodeCmps1.engine.start(),
nodeCmps2.switch.start(),
nodeCmps2.blockDiscovery.start(),
nodeCmps2.engine.start())
# initialize our want lists
pendingBlocks1 = blocks2.mapIt( blockexc1.engine.pendingBlocks.getWantHandle( it.cid ) )
pendingBlocks2 = blocks1.mapIt( blockexc2.engine.pendingBlocks.getWantHandle( it.cid ) )
pendingBlocks1 = blocks2.mapIt( nodeCmps1.pendingBlocks.getWantHandle( it.cid ) )
pendingBlocks2 = blocks1.mapIt( nodeCmps2.pendingBlocks.getWantHandle( it.cid ) )
pricing1.address = wallet1.address
pricing2.address = wallet2.address
blockexc1.engine.pricing = pricing1.some
blockexc2.engine.pricing = pricing2.some
pricing1.address = nodeCmps1.wallet.address
pricing2.address = nodeCmps2.wallet.address
nodeCmps1.engine.pricing = pricing1.some
nodeCmps2.engine.pricing = pricing2.some
await switch1.connect(
switch2.peerInfo.peerId,
switch2.peerInfo.addrs)
await nodeCmps1.switch.connect(
nodeCmps2.switch.peerInfo.peerId,
nodeCmps2.switch.peerInfo.addrs)
await sleepAsync(1.seconds) # give some time to exchange lists
peerCtx2 = blockexc1.engine.getPeerCtx(peerId2)
peerCtx1 = blockexc2.engine.getPeerCtx(peerId1)
peerCtx2 = nodeCmps1.peerStore.get(nodeCmps2.switch.peerInfo.peerId)
peerCtx1 = nodeCmps2.peerStore.get(nodeCmps1.switch.peerInfo.peerId)
check isNil(peerCtx1).not
check isNil(peerCtx2).not
teardown:
await allFuturesThrowing(
engine1.stop(),
engine2.stop(),
switch1.stop(),
switch2.stop())
test "should exchange want lists on connect":
check not isNil(peerCtx1)
check not isNil(peerCtx2)
nodeCmps1.blockDiscovery.stop(),
nodeCmps1.engine.stop(),
nodeCmps1.switch.stop(),
nodeCmps2.blockDiscovery.stop(),
nodeCmps2.engine.stop(),
nodeCmps2.switch.stop())
test "Should exchange want lists on connect":
await allFuturesThrowing(
allFinished(pendingBlocks1))
.wait(10.seconds)
await allFuturesThrowing(
allFinished(pendingBlocks2))
.wait(10.seconds)
check:
peerCtx1.peerHave.mapIt( $it ).sorted(cmp[string]) ==
@ -125,13 +101,13 @@ suite "NetworkStore engine - 2 nodes":
peerCtx2.peerHave.mapIt( $it ).sorted(cmp[string]) ==
pendingBlocks1.mapIt( $it.read.cid ).sorted(cmp[string])
test "exchanges accounts on connect":
test "Should exchanges accounts on connect":
check peerCtx1.account.?address == pricing1.address.some
check peerCtx2.account.?address == pricing2.address.some
test "should send want-have for block":
test "Should send want-have for block":
let blk = bt.Block.new("Block 1".toBytes).tryGet()
check await blockexc2.engine.localStore.putBlock(blk)
check await nodeCmps2.localStore.putBlock(blk)
let entry = Entry(
`block`: blk.cid.data.buffer,
@ -141,43 +117,43 @@ suite "NetworkStore engine - 2 nodes":
sendDontHave: false)
peerCtx1.peerWants.add(entry)
check blockexc2
check nodeCmps2
.engine
.taskQueue
.pushOrUpdateNoWait(peerCtx1).isOk
await sleepAsync(100.millis)
check blockexc1.engine.localStore.hasBlock(blk.cid)
check nodeCmps1.localStore.hasBlock(blk.cid)
test "should get blocks from remote":
test "Should get blocks from remote":
let blocks = await allFinished(
blocks2.mapIt( blockexc1.getBlock(it.cid) ))
blocks2.mapIt( nodeCmps1.networkStore.getBlock(it.cid) ))
check blocks.mapIt( !it.read ) == blocks2
test "remote should send blocks when available":
test "Remote should send blocks when available":
let blk = bt.Block.new("Block 1".toBytes).tryGet()
# should fail retrieving block from remote
check not await blockexc1.getBlock(blk.cid)
check not await nodeCmps1.networkStore.getBlock(blk.cid)
.withTimeout(100.millis) # should expire
# first put the required block in the local store
check await blockexc2.engine.localStore.putBlock(blk)
# second trigger blockexc to resolve any pending requests
# for the block
check await blockexc2.putBlock(blk)
check await nodeCmps2.networkStore.putBlock(blk)
# should succeed retrieving block from remote
check await blockexc1.getBlock(blk.cid)
.withTimeout(100.millis) # should succede
check await nodeCmps1.networkStore.getBlock(blk.cid)
.withTimeout(100.millis) # should succeed
test "receives payments for blocks that were sent":
test "Should receive payments for blocks that were sent":
let blocks = await allFinished(
blocks2.mapIt( blockexc1.getBlock(it.cid) ))
blocks2.mapIt( nodeCmps1.networkStore.getBlock(it.cid) ))
await sleepAsync(100.millis)
let channel = !peerCtx1.paymentChannel
check wallet2.balance(channel, Asset) > 0
let
channel = !peerCtx1.paymentChannel
check nodeCmps2.wallet.balance(channel, Asset) > 0
suite "NetworkStore - multiple nodes":
let
@ -185,7 +161,7 @@ suite "NetworkStore - multiple nodes":
var
switch: seq[Switch]
blockexc: seq[NetworkStore]
networkStore: seq[NetworkStore]
blocks: seq[bt.Block]
setup:
@ -198,8 +174,8 @@ suite "NetworkStore - multiple nodes":
for e in generateNodes(5):
switch.add(e.switch)
blockexc.add(e.blockexc)
await e.blockexc.engine.start()
networkStore.add(e.networkStore)
await e.engine.start()
await allFuturesThrowing(
switch.mapIt( it.start() )
@ -211,11 +187,11 @@ suite "NetworkStore - multiple nodes":
)
switch = @[]
blockexc = @[]
networkStore = @[]
test "should receive haves for own want list":
test "Should receive haves for own want list":
let
downloader = blockexc[4]
downloader = networkStore[4]
engine = downloader.engine
# Add blocks from 1st peer to want list
@ -224,13 +200,13 @@ suite "NetworkStore - multiple nodes":
pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid ))
await allFutures(
blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) ))
blocks[0..3].mapIt( networkStore[0].engine.localStore.putBlock(it) ))
await allFutures(
blocks[4..7].mapIt( blockexc[1].engine.localStore.putBlock(it) ))
blocks[4..7].mapIt( networkStore[1].engine.localStore.putBlock(it) ))
await allFutures(
blocks[8..11].mapIt( blockexc[2].engine.localStore.putBlock(it) ))
blocks[8..11].mapIt( networkStore[2].engine.localStore.putBlock(it) ))
await allFutures(
blocks[12..15].mapIt( blockexc[3].engine.localStore.putBlock(it) ))
blocks[12..15].mapIt( networkStore[3].engine.localStore.putBlock(it) ))
await connectNodes(switch)
await sleepAsync(1.seconds)
@ -239,16 +215,19 @@ suite "NetworkStore - multiple nodes":
allFinished(pendingBlocks1),
allFinished(pendingBlocks2))
let
peers = toSeq(engine.peers)
check:
engine.peers[0].peerHave.mapIt($it).sorted(cmp[string]) ==
peers[0].peerHave.mapIt($it).sorted(cmp[string]) ==
blocks[0..3].mapIt( $(it.cid) ).sorted(cmp[string])
engine.peers[3].peerHave.mapIt($it).sorted(cmp[string]) ==
peers[3].peerHave.mapIt($it).sorted(cmp[string]) ==
blocks[12..15].mapIt( $(it.cid) ).sorted(cmp[string])
test "should exchange blocks with multiple nodes":
test "Should exchange blocks with multiple nodes":
let
downloader = blockexc[4]
downloader = networkStore[4]
engine = downloader.engine
# Add blocks from 1st peer to want list
@ -257,13 +236,13 @@ suite "NetworkStore - multiple nodes":
pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid ))
await allFutures(
blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) ))
blocks[0..3].mapIt( networkStore[0].engine.localStore.putBlock(it) ))
await allFutures(
blocks[4..7].mapIt( blockexc[1].engine.localStore.putBlock(it) ))
blocks[4..7].mapIt( networkStore[1].engine.localStore.putBlock(it) ))
await allFutures(
blocks[8..11].mapIt( blockexc[2].engine.localStore.putBlock(it) ))
blocks[8..11].mapIt( networkStore[2].engine.localStore.putBlock(it) ))
await allFutures(
blocks[12..15].mapIt( blockexc[3].engine.localStore.putBlock(it) ))
blocks[12..15].mapIt( networkStore[3].engine.localStore.putBlock(it) ))
await connectNodes(switch)
await sleepAsync(1.seconds)

View File

@ -21,19 +21,28 @@ import ../helpers
import ../examples
suite "NetworkStore engine basic":
let
var
rng: Rng
seckey: PrivateKey
peerId: PeerID
chunker: Chunker
wallet: WalletRef
blockDiscovery: Discovery
peerStore: PeerCtxStore
pendingBlocks: PendingBlocksManager
blocks: seq[bt.Block]
done: Future[void]
setup:
rng = Rng.instance()
seckey = PrivateKey.random(rng[]).tryGet()
peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet()
chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
wallet = WalletRef.example
discovery = Discovery.new()
blockDiscovery = Discovery.new()
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
var
blocks: seq[bt.Block]
done: Future[void]
setup:
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
@ -43,7 +52,7 @@ suite "NetworkStore engine basic":
done = newFuture[void]()
test "should send want list to new peers":
test "Should send want list to new peers":
proc sendWantList(
id: PeerID,
cids: seq[Cid],
@ -53,7 +62,6 @@ suite "NetworkStore engine basic":
full: bool = false,
sendDontHave: bool = false) {.gcsafe.} =
check cids.mapIt($it).sorted == blocks.mapIt( $it.cid ).sorted
done.complete()
let
@ -61,19 +69,29 @@ suite "NetworkStore engine basic":
sendWantList: sendWantList,
))
localStore = CacheStore.new(blocks.mapIt( it ))
discovery = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks)
engine = BlockExcEngine.new(
CacheStore.new(blocks.mapIt( it )),
localStore,
wallet,
network,
discovery)
discovery,
peerStore,
pendingBlocks)
for b in blocks:
discard engine.pendingBlocks.getWantHandle(b.cid)
engine.setupPeer(peerId)
await done
await done.wait(100.millis)
test "sends account to new peers":
test "Should send account to new peers":
let pricing = Pricing.example
proc sendAccount(peer: PeerID, account: Account) =
@ -82,31 +100,52 @@ suite "NetworkStore engine basic":
let
network = BlockExcNetwork(request: BlockExcRequest(
sendAccount: sendAccount,
sendAccount: sendAccount
))
engine = BlockExcEngine.new(CacheStore.new, wallet, network, discovery)
localStore = CacheStore.new()
discovery = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks)
engine = BlockExcEngine.new(
localStore,
wallet,
network,
discovery,
peerStore,
pendingBlocks)
engine.pricing = pricing.some
engine.setupPeer(peerId)
await done.wait(100.millis)
suite "NetworkStore engine handlers":
let
rng = Rng.instance()
seckey = PrivateKey.random(rng[]).tryGet()
peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet()
chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
wallet = WalletRef.example
discovery = Discovery.new()
var
rng: Rng
seckey: PrivateKey
peerId: PeerID
chunker: Chunker
wallet: WalletRef
blockDiscovery: Discovery
peerStore: PeerCtxStore
pendingBlocks: PendingBlocksManager
network: BlockExcNetwork
engine: BlockExcEngine
discovery: DiscoveryEngine
peerCtx: BlockExcPeerCtx
localStore: BlockStore
done: Future[void]
blocks: seq[bt.Block]
setup:
rng = Rng.instance()
chunker = RandomChunker.new(rng, size = 1024, chunkSize = 256)
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
@ -114,14 +153,38 @@ suite "NetworkStore engine handlers":
blocks.add(bt.Block.new(chunk).tryGet())
done = newFuture[void]()
engine = BlockExcEngine.new(CacheStore.new(), wallet, BlockExcNetwork(), discovery)
seckey = PrivateKey.random(rng[]).tryGet()
peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet()
wallet = WalletRef.example
blockDiscovery = Discovery.new()
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
localStore = CacheStore.new()
network = BlockExcNetwork()
discovery = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks)
engine = BlockExcEngine.new(
localStore,
wallet,
network,
discovery,
peerStore,
pendingBlocks)
peerCtx = BlockExcPeerCtx(
id: peerId
)
engine.peers.add(peerCtx)
done = newFuture[void]()
test "should handle want list":
test "Should handle want list":
let wantList = makeWantList(blocks.mapIt( it.cid ))
proc handler() {.async.} =
let ctx = await engine.taskQueue.pop()
@ -132,7 +195,7 @@ suite "NetworkStore engine handlers":
await engine.wantListHandler(peerId, wantList)
await done
test "should handle want list - `dont-have`":
test "Should handle want list - `dont-have`":
let wantList = makeWantList(blocks.mapIt( it.cid ), sendDontHave = true)
proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) =
check presence.mapIt( it.cid ) == wantList.entries.mapIt( it.`block` )
@ -150,7 +213,7 @@ suite "NetworkStore engine handlers":
await done
test "should handle want list - `dont-have` some blocks":
test "Should handle want list - `dont-have` some blocks":
let wantList = makeWantList(blocks.mapIt( it.cid ), sendDontHave = true)
proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) =
check presence.mapIt( it.cid ) == blocks[2..blocks.high].mapIt( it.cid.data.buffer )
@ -170,7 +233,7 @@ suite "NetworkStore engine handlers":
await done
test "stores blocks in local store":
test "Should store blocks in local store":
let pending = blocks.mapIt(
engine.pendingBlocks.getWantHandle( it.cid )
)
@ -181,9 +244,9 @@ suite "NetworkStore engine handlers":
for b in blocks:
check engine.localStore.hasBlock(b.cid)
test "sends payments for received blocks":
test "Should send payments for received blocks":
let account = Account(address: EthAddress.example)
let peerContext = engine.getPeerCtx(peerId)
let peerContext = peerStore.get(peerId)
peerContext.account = account.some
peerContext.peerPrices = blocks.mapIt((it.cid, rand(uint16).u256)).toTable
@ -197,10 +260,9 @@ suite "NetworkStore engine handlers":
))
await engine.blocksHandler(peerId, blocks)
await done.wait(100.millis)
test "should handle block presence":
test "Should handle block presence":
let price = UInt256.example
await engine.blockPresenceHandler(
peerId,
@ -217,20 +279,28 @@ suite "NetworkStore engine handlers":
check peerCtx.peerPrices[cid] == price
suite "Task Handler":
let
rng = Rng.instance()
chunker = RandomChunker.new(Rng.instance(), size = 2048, chunkSize = 256)
wallet = WalletRef.example
var
rng: Rng
seckey: PrivateKey
peerId: PeerID
chunker: Chunker
wallet: WalletRef
blockDiscovery: Discovery
peerStore: PeerCtxStore
pendingBlocks: PendingBlocksManager
network: BlockExcNetwork
engine: BlockExcEngine
discovery: DiscoveryEngine
peerCtx: BlockExcPeerCtx
localStore: BlockStore
peersCtx: seq[BlockExcPeerCtx]
peers: seq[PeerID]
done: Future[void]
blocks: seq[bt.Block]
setup:
rng = Rng.instance()
chunker = RandomChunker.new(rng, size = 1024, chunkSize = 256)
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
@ -238,8 +308,30 @@ suite "Task Handler":
blocks.add(bt.Block.new(chunk).tryGet())
done = newFuture[void]()
engine = BlockExcEngine.new(CacheStore.new(), wallet, BlockExcNetwork(), Discovery.new())
seckey = PrivateKey.random(rng[]).tryGet()
peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet()
wallet = WalletRef.example
blockDiscovery = Discovery.new()
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
localStore = CacheStore.new()
network = BlockExcNetwork()
discovery = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks)
engine = BlockExcEngine.new(
localStore,
wallet,
network,
discovery,
peerStore,
pendingBlocks)
peersCtx = @[]
for i in 0..3:
@ -249,8 +341,8 @@ suite "Task Handler":
peersCtx.add(BlockExcPeerCtx(
id: peers[i]
))
peerStore.add(peersCtx[i])
engine.peers = peersCtx
engine.pricing = Pricing.example.some
test "Should send want-blocks in priority order":

View File

@ -0,0 +1,109 @@
import std/sugar
import std/sequtils
import pkg/unittest2
import pkg/libp2p
import pkg/dagger/blockexchange/peers
import pkg/dagger/blockexchange/protobuf/blockexc
import ../examples
suite "Peer Context Store":
var
store: PeerCtxStore
peerCtx: BlockExcPeerCtx
setup:
store = PeerCtxStore.new()
peerCtx = BlockExcPeerCtx.example
store.add(peerCtx)
test "Should add peer":
check peerCtx.id in store
test "Should remove peer":
store.remove(peerCtx.id)
check peerCtx.id notin store
test "Should get peer":
check store.get(peerCtx.id) == peerCtx
suite "Peer Context Store Peer Selection":
var
store: PeerCtxStore
peerCtxs: seq[BlockExcPeerCtx]
cids: seq[Cid]
setup:
store = PeerCtxStore.new()
cids = collect(newSeq):
for i in 0..<10: Cid.example
peerCtxs = collect(newSeq):
for i in 0..<10: BlockExcPeerCtx.example
for p in peerCtxs:
store.add(p)
teardown:
store = nil
cids = @[]
peerCtxs = @[]
test "Should select peers that have Cid":
peerCtxs[0].peerPrices = collect(initTable):
for i, c in cids:
{ c: i.u256 }
peerCtxs[5].peerPrices = collect(initTable):
for i, c in cids:
{ c: i.u256 }
let
peers = store.peersHave(cids[0])
check peers.len == 2
check peerCtxs[0] in peers
check peerCtxs[5] in peers
test "Should select cheapest peers for Cid":
peerCtxs[0].peerPrices = collect(initTable):
for i, c in cids:
{ c: (5 + i).u256 }
peerCtxs[5].peerPrices = collect(initTable):
for i, c in cids:
{ c: (2 + i).u256 }
peerCtxs[9].peerPrices = collect(initTable):
for i, c in cids:
{ c: i.u256 }
let
peers = store.selectCheapest(cids[0])
check peers.len == 3
check peers[0] == peerCtxs[9]
check peers[1] == peerCtxs[5]
check peers[2] == peerCtxs[0]
test "Should select peers that want Cid":
let
entries = cids.mapIt(
Entry(
`block`: it.data.buffer,
priority: 1,
cancel: false,
wantType: WantType.wantBlock,
sendDontHave: false))
peerCtxs[0].peerWants = entries
peerCtxs[5].peerWants = entries
let
peers = store.peersWant(cids[4])
check peers.len == 2
check peerCtxs[0] in peers
check peerCtxs[5] in peers

View File

@ -5,7 +5,7 @@ import pkg/nitro
import pkg/stint
import pkg/dagger/rng
import pkg/dagger/stores
import pkg/dagger/blocktype
import pkg/dagger/blocktype as bt
import pkg/dagger/sales
import ../examples
@ -38,10 +38,10 @@ proc example*(_: type Pricing): Pricing =
price: uint32.rand.u256
)
proc example*(_: type Block): Block =
proc example*(_: type bt.Block): bt.Block =
let length = rand(4096)
let bytes = newSeqWith(length, rand(uint8))
Block.new(bytes).tryGet()
bt.Block.new(bytes).tryGet()
proc example*(_: type PeerId): PeerID =
let key = PrivateKey.random(Rng.instance[]).get
@ -51,7 +51,7 @@ proc example*(_: type BlockExcPeerCtx): BlockExcPeerCtx =
BlockExcPeerCtx(id: PeerID.example)
proc example*(_: type Cid): Cid =
Block.example.cid
bt.Block.example.cid
proc example*(_: type Availability): Availability =
Availability.init(uint16.example, uint16.example, uint64.example.u256)

View File

@ -16,8 +16,10 @@ import pkg/dagger/discovery
type
MockDiscovery* = ref object of Discovery
findBlockProvidersHandler*: proc(d: MockDiscovery, cid: Cid): seq[SignedPeerRecord] {.gcsafe.}
publishProvideHandler*: proc(d: MockDiscovery, cid: Cid) {.gcsafe.}
findBlockProvidersHandler*: proc(d: MockDiscovery, cid: Cid):
Future[seq[SignedPeerRecord]] {.gcsafe.}
publishProvideHandler*: proc(d: MockDiscovery, cid: Cid):
Future[void] {.gcsafe.}
proc new*(
T: type MockDiscovery,
@ -36,13 +38,16 @@ proc findPeer*(
method findBlockProviders*(
d: MockDiscovery,
cid: Cid): Future[seq[SignedPeerRecord]] {.async.} =
if isNil(d.findBlockProvidersHandler): return
if isNil(d.findBlockProvidersHandler):
return
return d.findBlockProvidersHandler(d, cid)
return await d.findBlockProvidersHandler(d, cid)
method provideBlock*(d: MockDiscovery, cid: Cid) {.async.} =
if isNil(d.publishProvideHandler): return
d.publishProvideHandler(d, cid)
method provideBlock*(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
if isNil(d.publishProvideHandler):
return
await d.publishProvideHandler(d, cid)
proc start*(d: Discovery) {.async.} =
discard

View File

@ -6,25 +6,51 @@ import pkg/libp2p
import pkg/dagger/discovery
import pkg/dagger/stores
import pkg/dagger/blocktype as bt
import pkg/dagger/blockexchange
import ../examples
type
NodesComponents* = tuple[
switch: Switch,
blockDiscovery: Discovery,
wallet: WalletRef,
network: BlockExcNetwork,
localStore: BlockStore,
peerStore: PeerCtxStore,
pendingBlocks: PendingBlocksManager,
discovery: DiscoveryEngine,
engine: BlockExcEngine,
networkStore: NetworkStore]
proc generateNodes*(
num: Natural,
blocks: openArray[bt.Block] = []):
seq[tuple[switch: Switch, blockexc: NetworkStore]] =
blocks: openArray[bt.Block] = []): seq[NodesComponents] =
for i in 0..<num:
let
switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
discovery = Discovery.new(switch.peerInfo, Port(0))
blockDiscovery = Discovery.new(switch.peerInfo, Port(0))
wallet = WalletRef.example
network = BlockExcNetwork.new(switch)
localStore = CacheStore.new(blocks.mapIt( it ))
engine = BlockExcEngine.new(localStore, wallet, network, discovery)
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks)
engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks)
networkStore = NetworkStore.new(engine, localStore)
switch.mount(network)
result.add((switch, networkStore))
result.add((
switch,
blockDiscovery,
wallet,
network,
localStore,
peerStore,
pendingBlocks,
discovery,
engine,
networkStore))
proc connectNodes*(nodes: seq[Switch]) {.async.} =
for dialer in nodes:

View File

@ -11,6 +11,7 @@ import pkg/stew/byteutils
import pkg/dagger/stores/cachestore
import pkg/dagger/chunker
import pkg/dagger/stores
import pkg/dagger/blocktype as bt
import ../helpers
@ -21,7 +22,7 @@ suite "FS Store":
var
store: FSStore
repoDir: string
newBlock = Block.new("New Block".toBytes()).tryGet()
newBlock = bt.Block.new("New Block".toBytes()).tryGet()
setup:
repoDir = path.parentDir / "repo"

View File

@ -1,9 +1,11 @@
import ./blockexc/testengine
import ./blockexc/testnetwork
import ./blockexc/testblockexc
import ./blockexc/testpeerctxstore
import ./blockexc/discovery/testdiscovery
import ./blockexc/discovery/testdiscoveryengine
import ./blockexc/protobuf/testpayments as testprotobufpayments
import ./blockexc/protobuf/testpresence
import ./blockexc/engine/testpayments as testenginepayments
import ./blockexc/testblockexc
import ./blockexc/discovery/testdiscovery
{.warning[UnusedImport]: off.}

View File

@ -9,7 +9,7 @@ import pkg/questionable/results
import pkg/dagger/erasure
import pkg/dagger/manifest
import pkg/dagger/stores
import pkg/dagger/blocktype
import pkg/dagger/blocktype as bt
import pkg/dagger/rng
import ./helpers
@ -32,7 +32,7 @@ suite "Erasure encode/decode":
let chunk = await chunker.getBytes();
chunk.len > 0):
let blk = Block.new(chunk).tryGet()
let blk = bt.Block.new(chunk).tryGet()
manifest.add(blk.cid)
check (await store.putBlock(blk))
@ -84,7 +84,7 @@ suite "Erasure encode/decode":
let chunk = await chunker.getBytes();
chunk.len > 0):
let blk = Block.new(chunk).tryGet()
let blk = bt.Block.new(chunk).tryGet()
manifest.add(blk.cid)
check (await store.putBlock(blk))
@ -134,7 +134,7 @@ suite "Erasure encode/decode":
let chunk = await chunker.getBytes();
chunk.len > 0):
let blk = Block.new(chunk).tryGet()
let blk = bt.Block.new(chunk).tryGet()
manifest.add(blk.cid)
check (await store.putBlock(blk))
@ -187,7 +187,7 @@ suite "Erasure encode/decode":
let chunk = await chunker.getBytes();
chunk.len > 0):
let blk = Block.new(chunk).tryGet()
let blk = bt.Block.new(chunk).tryGet()
manifest.add(blk.cid)
check (await store.putBlock(blk))
@ -246,7 +246,7 @@ suite "Erasure encode/decode":
let chunk = await chunker.getBytes();
chunk.len > 0):
let blk = Block.new(chunk).tryGet()
let blk = bt.Block.new(chunk).tryGet()
manifest.add(blk.cid)
check (await store.putBlock(blk))
@ -287,7 +287,7 @@ suite "Erasure encode/decode":
let chunk = await chunker.getBytes();
chunk.len > 0):
let blk = Block.new(chunk).tryGet()
let blk = bt.Block.new(chunk).tryGet()
manifest.add(blk.cid)
check (await store.putBlock(blk))

View File

@ -35,7 +35,10 @@ suite "Test Node":
engine: BlockExcEngine
store: NetworkStore
node: DaggerNodeRef
discovery: Discovery
blockDiscovery: Discovery
peerStore: PeerCtxStore
pendingBlocks: PendingBlocksManager
discovery: DiscoveryEngine
contracts: ?ContractInteractions
setup:
@ -45,11 +48,14 @@ suite "Test Node":
wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch)
localStore = CacheStore.new()
discovery = Discovery.new(switch.peerInfo, Port(0))
engine = BlockExcEngine.new(localStore, wallet, network, discovery)
blockDiscovery = Discovery.new(switch.peerInfo, Port(0))
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks)
engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks)
store = NetworkStore.new(engine, localStore)
contracts = ContractInteractions.new()
node = DaggerNodeRef.new(switch, store, engine, nil, discovery, contracts) # TODO: pass `Erasure`
node = DaggerNodeRef.new(switch, store, engine, nil, blockDiscovery, contracts) # TODO: pass `Erasure`
await node.start()

View File

@ -9,6 +9,7 @@ import pkg/dagger/streams
import pkg/dagger/stores
import pkg/dagger/manifest
import pkg/dagger/rng
import pkg/dagger/blocktype as bt
suite "StoreStream":
var
@ -37,7 +38,7 @@ suite "StoreStream":
for d in data:
let
blk = Block.new(d).tryGet()
blk = bt.Block.new(d).tryGet()
manifest.add(blk.cid)
if not (await store.putBlock(blk)):