* First implementation

* Add persistent net key option

* Working DHT setup

* Bootstrap nodes

* Implement DaggerNode.findPeer

* Remove irrelevant comment

* Added discovery to blockexchange requestBlock

* add FSStore.blockList

* Block advertisement

* Tests compile

* Green tests

* toDiscoveryId instead of toNodeId

* remove stopAdvertisingBlock

* Removed nim-eth dependency

* Move discovery stuff to discovery.nim

* Add missing file, start of discovery tests

* Better discovery logic

* Add tests

* Address comment

* Better E2E test
Tanguy 2022-04-13 18:32:35 +02:00 committed by GitHub
parent 4bf28f1619
commit 4d681102e5
23 changed files with 615 additions and 98 deletions

.gitmodules vendored

@ -178,3 +178,8 @@
[submodule "vendor/nim-leopard"]
path = vendor/nim-leopard
url = https://github.com/status-im/nim-leopard.git
[submodule "vendor/nim-libp2p-dht"]
path = vendor/nim-libp2p-dht
url = https://github.com/status-im/nim-libp2p-dht.git
ignore = untracked
branch = master

View File

@ -66,6 +66,8 @@ switch("warning", "ObservableStores:off")
# Too many false positives for "Warning: method has lock level <unknown>, but another method has 0 [LockLevel]"
switch("warning", "LockLevel:off")
switch("define", "libp2p_pki_schemes=secp256k1")
switch("define", "chronicles_sinks=textlines[dynamic],json[dynamic]")
# begin Nimble config (version 1)

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import std/[sequtils, sets, tables, sugar]
import pkg/chronos
import pkg/chronicles
@ -16,6 +16,7 @@ import pkg/libp2p
import ../stores/blockstore
import ../blocktype as bt
import ../utils/asyncheapqueue
import ../discovery
import ./protobuf/blockexc
import ./protobuf/presence
@ -37,15 +38,29 @@ const
DefaultConcurrentTasks = 10
DefaultMaxRetries = 3
# The current advertisement loop favours efficiency over strict
# correctness, so blocks may be advertised more slowly than this
# frequency suggests; keep some margin
BlockAdvertisementFrequency = 30.minutes
type
TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.}
TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.}
BlockDiscovery* = ref object
discoveredProvider: AsyncEvent
discoveryLoop: Future[void]
toDiscover: Cid
treatedPeer: HashSet[PeerId]
inflightIWant: HashSet[PeerId]
gotIWantResponse: AsyncEvent
provides: seq[PeerId]
lastDhtQuery: Moment
BlockExcEngine* = ref object of RootObj
localStore*: BlockStore # where we localStore blocks for this instance
network*: BlockExcNetwork # network interface
peers*: seq[BlockExcPeerCtx] # peers we're currently actively exchanging with
wantList*: seq[Cid] # local wants list
taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # peers we're currently processing tasks for
concurrentTasks: int # number of concurrent peers we're serving at any given time
maxRetries: int # max number of tries for a failed block
@ -55,6 +70,12 @@ type
peersPerRequest: int # max number of peers to request from
wallet*: WalletRef # nitro wallet for micropayments
pricing*: ?Pricing # optional bandwidth pricing
advertisedBlocks: seq[Cid]
advertisedIndex: int
advertisementFrequency: Duration
runningDiscoveries*: Table[Cid, BlockDiscovery]
blockAdded: AsyncEvent
discovery*: Discovery
Pricing* = object
address*: EthAddress
@ -79,6 +100,7 @@ proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} =
b.taskQueue.pushOrUpdateNoWait(task).isOk()
proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.}
proc advertiseLoop(b: BlockExcEngine): Future[void] {.gcsafe.}
proc start*(b: BlockExcEngine) {.async.} =
## Start the blockexc task
@ -94,6 +116,14 @@ proc start*(b: BlockExcEngine) {.async.} =
for i in 0..<b.concurrentTasks:
b.blockexcTasks.add(blockexcTaskRunner(b))
info "Getting existing block list"
let blocks = await b.localStore.blockList()
b.advertisedBlocks = blocks
# Start with a faster frequency so everything gets published ASAP
b.advertisementFrequency = 5.seconds
b.blockexcTasks.add(b.advertiseLoop())
proc stop*(b: BlockExcEngine) {.async.} =
## Stop the blockexc engine
##
@ -110,64 +140,155 @@ proc stop*(b: BlockExcEngine) {.async.} =
await t.cancelAndWait()
trace "Task stopped"
for _, bd in b.runningDiscoveries:
await bd.discoveryLoop.cancelAndWait()
b.runningDiscoveries.clear()
trace "NetworkStore stopped"
proc discoverOnDht(b: BlockExcEngine, bd: BlockDiscovery) {.async.} =
bd.lastDhtQuery = Moment.fromNow(10.hours)
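# Push the next allowed DHT query far into the future so discoverLoop won't
# start a second query while this one is in flight; the defer below resets it
# to the actual completion time.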
defer: bd.lastDhtQuery = Moment.now()
let discoveredProviders = await b.discovery.findBlockProviders(bd.toDiscover)
for peer in discoveredProviders:
asyncSpawn b.network.dialPeer(peer.data)
proc discoverLoop(b: BlockExcEngine, bd: BlockDiscovery) {.async.} =
# First, try connected peers
# After a percentage of peers have declined, or a timeout has passed, query the DHT
# rinse & repeat
#
# TODO add a global timeout
debug "starting block discovery", cid=bd.toDiscover
bd.gotIWantResponse.fire()
while true:
# wait for iwant replies
await bd.gotIWantResponse.wait()
bd.gotIWantResponse.clear()
var foundPeerNew = false
for p in b.peers:
if bd.toDiscover in p.peerHave and p.id notin bd.treatedPeer:
bd.provides.add(p.id)
bd.treatedPeer.incl(p.id)
bd.inflightIWant.excl(p.id)
foundPeerNew = true
if foundPeerNew:
bd.discoveredProvider.fire()
continue
trace "asking peers", cid=bd.toDiscover, peers=b.peers.len, treated=bd.treatedPeer.len, inflight=bd.inflightIWant.len
for p in b.peers:
if p.id notin bd.treatedPeer and p.id notin bd.inflightIWant:
# just send wants
bd.inflightIWant.incl(p.id)
b.network.request.sendWantList(
p.id,
@[bd.toDiscover],
wantType = WantType.wantHave,
sendDontHave = true)
if bd.inflightIWant.len < 3 and #TODO or a timeout
bd.lastDhtQuery < Moment.now() - 5.seconds:
#start query
asyncSpawn b.discoverOnDht(bd)
proc discoverBlock*(b: BlockExcEngine, cid: Cid): BlockDiscovery =
if cid in b.runningDiscoveries:
return b.runningDiscoveries[cid]
else:
result = BlockDiscovery(
toDiscover: cid,
discoveredProvider: newAsyncEvent(),
gotIWantResponse: newAsyncEvent(),
)
result.discoveryLoop = b.discoverLoop(result)
b.runningDiscoveries[cid] = result
return result
proc stopDiscovery(b: BlockExcEngine, cid: Cid) =
if cid in b.runningDiscoveries:
b.runningDiscoveries[cid].discoveryLoop.cancel()
b.runningDiscoveries.del(cid)
proc requestBlock*(
b: BlockExcEngine,
cid: Cid,
timeout = DefaultBlockTimeout): Future[bt.Block] =
timeout = DefaultBlockTimeout): Future[bt.Block] {.async.} =
## Request a block from remotes
##
debug "requesting block", cid
# TODO
# we could optimize "groups of related chunks"
# be requesting multiple chunks, and running discovery
# less often
if cid in b.localStore:
return (await b.localStore.getBlock(cid)).get()
# be careful, don't give back control to main loop here
# otherwise, the block might slip in
if cid in b.pendingBlocks:
return await b.pendingBlocks.blocks[cid].wait(timeout)
# We are the first one to request this block, so we handle it
let
blk = b.pendingBlocks.addOrAwait(cid).wait(timeout)
timeoutFut = sleepAsync(timeout)
blk = b.pendingBlocks.addOrAwait(cid)
discovery = b.discoverBlock(cid)
if b.peers.len <= 0:
warn "No peers to request blocks from"
# TODO: run discovery here to get peers for the block
return blk
# Just take the first discovered peer
try:
await timeoutFut or blk or discovery.discoveredProvider.wait()
discovery.discoveredProvider.clear()
except CancelledError as exc:
#TODO also wrong, same issue as below
blk.cancel()
b.stopDiscovery(cid)
raise exc
var peers = b.peers
if timeoutFut.finished:
# TODO this is wrong, because other users may rely on us
# to handle this block. This proc should be asyncSpawned
#
# Other people may be using the discovery or blk
# so don't kill them
blk.cancel()
b.stopDiscovery(cid)
raise newException(AsyncTimeoutError, "")
# get the first peer with at least one (any)
# matching cid
var blockPeer: BlockExcPeerCtx
for i, p in peers:
if cid in p.peerHave:
blockPeer = p
break
if blk.finished:
# a peer sent us the block out of the blue, why not
b.stopDiscovery(cid)
return await blk
# didn't find any peer with matching cids
# use the first one in the sorted array
if isNil(blockPeer):
blockPeer = peers[0]
# We got a provider
# Currently, we just ask him for the block, and hope he gives it to us
#
# In reality, we could keep discovering until we find a suitable price, etc
b.stopDiscovery(cid)
timeoutFut.cancel()
peers.keepItIf(
it != blockPeer
)
assert discovery.provides.len > 0
trace "Requesting block from peer", peer = blockPeer.id, cid
# request block
b.network.request.sendWantList(
blockPeer.id,
discovery.provides[0],
@[cid],
wantType = WantType.wantBlock) # we want this remote to send us a block
if peers.len == 0:
return blk # no peers to send wants to
# filter out the peer we've already requested from
let stop = min(peers.high, b.peersPerRequest)
trace "Sending want list requests to remaining peers", count = stop + 1
for p in peers[0..stop]:
if cid notin p.peerHave:
# just send wants
b.network.request.sendWantList(
p.id,
@[cid],
wantType = WantType.wantHave) # we only want to know if the peer has the block
return blk
#TODO subtract the discovery time
return await blk.wait(timeout)
proc blockPresenceHandler*(
b: BlockExcEngine,
@ -177,12 +298,17 @@ proc blockPresenceHandler*(
##
let peerCtx = b.getPeerCtx(peer)
if isNil(peerCtx):
return
for blk in blocks:
if presence =? Presence.init(blk):
peerCtx.updatePresence(presence)
if not isNil(peerCtx):
peerCtx.updatePresence(presence)
if presence.cid in b.runningDiscoveries:
let bd = b.runningDiscoveries[presence.cid]
if not presence.have:
bd.inflightIWant.excl(peer)
bd.treatedPeer.incl(peer)
bd.gotIWantResponse.fire()
proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) =
trace "Schedule a task for new blocks"
@ -204,8 +330,20 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) =
##
trace "Resolving blocks"
b.pendingBlocks.resolve(blocks)
b.scheduleTasks(blocks)
var gotNewBlocks = false
for bl in blocks:
if bl.cid notin b.advertisedBlocks: #TODO that's very slow, maybe an ordered hashset instead
#TODO could do some smarter ordering here (insert it just before b.advertisedIndex, or similar)
b.advertisedBlocks.add(bl.cid)
asyncSpawn b.discovery.publishProvide(bl.cid)
gotNewBlocks = true
if gotNewBlocks:
b.pendingBlocks.resolve(blocks)
b.scheduleTasks(blocks)
b.blockAdded.fire()
proc payForBlocks(engine: BlockExcEngine,
peer: BlockExcPeerCtx,
@ -311,8 +449,13 @@ proc setupPeer*(b: BlockExcEngine, peer: PeerID) =
))
# broadcast our want list, the other peer will do the same
if b.wantList.len > 0:
b.network.request.sendWantList(peer, b.wantList, full = true)
let wantList = collect(newSeqOfCap(b.runningDiscoveries.len)):
for cid, bd in b.runningDiscoveries:
bd.inflightIWant.incl(peer)
cid
if wantList.len > 0:
b.network.request.sendWantList(peer, wantList, full = true, sendDontHave = true)
if address =? b.pricing.?address:
b.network.request.sendAccount(peer, Account(address: address))
@ -326,6 +469,31 @@ proc dropPeer*(b: BlockExcEngine, peer: PeerID) =
# drop the peer from the peers table
b.peers.keepItIf( it.id != peer )
proc advertiseLoop(b: BlockExcEngine) {.async, gcsafe.} =
while true:
if b.advertisedIndex >= b.advertisedBlocks.len:
b.advertisedIndex = 0
b.advertisementFrequency = BlockAdvertisementFrequency
# check that we still have this block.
while
b.advertisedIndex < b.advertisedBlocks.len and
not(b.localStore.contains(b.advertisedBlocks[b.advertisedIndex])):
b.advertisedBlocks.delete(b.advertisedIndex)
#publish it
if b.advertisedIndex < b.advertisedBlocks.len:
asyncSpawn b.discovery.publishProvide(b.advertisedBlocks[b.advertisedIndex])
inc b.advertisedIndex
let toSleep =
if b.advertisedBlocks.len > 0:
b.advertisementFrequency div b.advertisedBlocks.len
else:
30.minutes
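# e.g. with the default 30-minute frequency and 600 advertised blocks, one
# provide goes out roughly every 3 seconds, so the whole set is re-published
# about every 30 minutes (illustrative numbers)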
await sleepAsync(toSleep) or b.blockAdded.wait()
b.blockAdded.clear()
proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
trace "Handling task for peer", peer = task.id
@ -386,6 +554,7 @@ proc new*(
localStore: BlockStore,
wallet: WalletRef,
network: BlockExcNetwork,
discovery: Discovery,
concurrentTasks = DefaultConcurrentTasks,
maxRetries = DefaultMaxRetries,
peersPerRequest = DefaultMaxPeersPerRequest): T =
@ -393,11 +562,13 @@ proc new*(
let engine = BlockExcEngine(
localStore: localStore,
pendingBlocks: PendingBlocksManager.new(),
blockAdded: newAsyncEvent(),
peersPerRequest: peersPerRequest,
network: network,
wallet: wallet,
concurrentTasks: concurrentTasks,
maxRetries: maxRetries,
discovery: discovery,
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize))
proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} =
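
For orientation, a rough caller-side sketch of the new discovery-aware request path (illustrative only, not part of the diff; names are taken from the code above, and cid is assumed to be in scope):

# engine is a started BlockExcEngine. requestBlock starts (or joins) a
# BlockDiscovery for the cid, sends wantHave to connected peers, falls back
# to the DHT after a few seconds, then sends wantBlock to the first provider
# that answers.
let blk: bt.Block = await engine.requestBlock(cid, timeout = DefaultBlockTimeout)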

View File

@ -8,6 +8,7 @@
## those terms.
import std/tables
import std/sequtils
import pkg/chronicles
import pkg/chronos
@ -289,6 +290,12 @@ proc setupPeer*(b: BlockExcNetwork, peer: PeerID) =
discard b.getOrCreatePeer(peer)
proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} =
try:
await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address))
except CatchableError as exc:
debug "Failed to connect to peer", error=exc.msg
proc dropPeer*(b: BlockExcNetwork, peer: PeerID) =
## Cleanup disconnected peer
##

View File

@ -19,14 +19,14 @@ import std/typetraits
import pkg/chronicles
import pkg/chronicles/topics_registry
import pkg/confutils/defs
import pkg/confutils/std/net
import pkg/stew/shims/net as stewnet
import pkg/libp2p
import ./discovery
import ./stores/cachestore
export DefaultCacheSizeMiB
const
DefaultTcpListenMultiAddr = "/ip4/0.0.0.0/tcp/0"
export DefaultCacheSizeMiB, net
type
StartUpCommand* {.pure.} = enum
@ -66,17 +66,38 @@ type
defaultValue: noCommand }: StartUpCommand
of noCommand:
listenAddrs* {.
desc: "Specifies one or more listening multiaddrs for the node to listen on."
defaultValue: @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
defaultValueDesc: "/ip4/0.0.0.0/tcp/0"
abbr: "a"
name: "listen-addrs" }: seq[MultiAddress]
listenPorts* {.
desc: "Specifies one or more listening ports for the node to listen on."
defaultValue: @[Port(0)]
defaultValueDesc: "0"
abbr: "l"
name: "listen-port" }: seq[Port]
# TODO We should have two options: the listen IP and the public IP
# Currently, they are tied together, so we can't be discoverable
# behind a NAT
listenIp* {.
desc: "The public IP"
defaultValue: ValidIpAddress.init("0.0.0.0")
defaultValueDesc: "0.0.0.0"
abbr: "i"
name: "listen-ip" }: ValidIpAddress
discoveryPort* {.
desc: "Specify the discovery (UDP) port"
defaultValue: Port(8090)
defaultValueDesc: "8090"
name: "udp-port" }: Port
netPrivKeyFile* {.
desc: "Source of network (secp256k1) private key file (random|<path>)"
defaultValue: "random"
name: "net-privkey" }: string
bootstrapNodes* {.
desc: "Specifies one or more bootstrap nodes to use when connecting to the network."
abbr: "b"
name: "bootstrap-nodes" }: seq[MultiAddress]
name: "bootstrap-nodes" }: seq[SignedPeerRecord]
maxPeers* {.
desc: "The maximum number of peers to connect to"
@ -119,6 +140,17 @@ func parseCmdArg*(T: type MultiAddress, input: TaintedString): T
{.raises: [ValueError, LPError, Defect].} =
MultiAddress.init($input).tryGet()
proc parseCmdArg*(T: type SignedPeerRecord, uri: TaintedString): T =
var res: SignedPeerRecord
try:
if not res.fromURI(uri):
warn "Invalid SignedPeerRecord uri", uri=uri
quit QuitFailure
except CatchableError as exc:
warn "Invalid SignedPeerRecord uri", uri=uri, error=exc.msg
quit QuitFailure
res
# silly chronicles, colors is a compile-time property
proc stripAnsi(v: string): string =
var

View File

@ -9,6 +9,7 @@
import std/sequtils
import std/os
import std/sugar
import pkg/chronicles
import pkg/chronos
@ -18,6 +19,7 @@ import pkg/confutils
import pkg/confutils/defs
import pkg/nitro
import pkg/stew/io2
import pkg/stew/shims/net as stewnet
import ./node
import ./conf
@ -27,6 +29,7 @@ import ./stores
import ./blockexchange
import ./utils/fileutils
import ./erasure
import ./discovery
type
DaggerServer* = ref object
@ -50,15 +53,51 @@ proc stop*(s: DaggerServer) {.async.} =
proc new*(T: type DaggerServer, config: DaggerConf): T =
const SafePermissions = {UserRead, UserWrite}
let
privateKey =
if config.netPrivKeyFile == "random":
PrivateKey.random(Rng.instance()[]).get()
else:
let path =
if config.netPrivKeyFile.isAbsolute:
config.netPrivKeyFile
else:
config.dataDir / config.netPrivKeyFile
if path.fileAccessible({AccessFlags.Find}):
info "Found a network private key"
if path.getPermissionsSet().get() != SafePermissions:
warn "The network private key file is not safe, aborting"
quit QuitFailure
PrivateKey.init(path.readAllBytes().expect("accessible private key file")).
expect("valid private key file")
else:
info "Creating a private key and saving it"
let
res = PrivateKey.random(Rng.instance()[]).get()
bytes = res.getBytes().get()
path.writeFile(bytes, SafePermissions.toInt()).expect("writing private key file")
PrivateKey.init(bytes).expect("valid key bytes")
let
addresses =
config.listenPorts.mapIt(MultiAddress.init("/ip4/" & $config.listenIp & "/tcp/" & $(it.int)).tryGet()) &
@[MultiAddress.init("/ip4/" & $config.listenIp & "/udp/" & $(config.discoveryPort.int)).tryGet()]
switch = SwitchBuilder
.new()
.withAddresses(config.listenAddrs)
.withPrivateKey(privateKey)
.withAddresses(addresses)
.withRng(Rng.instance())
.withNoise()
.withMplex(5.minutes, 5.minutes)
.withMaxConnections(config.maxPeers)
.withAgentVersion(config.agentString)
.withSignedPeerRecord(true)
.withTcpTransport({ServerFlags.ReuseAddr})
.build()
@ -69,13 +108,20 @@ proc new*(T: type DaggerServer, config: DaggerConf): T =
CacheStore.new()
let
discoveryBootstrapNodes = config.bootstrapNodes
discovery = Discovery.new(
switch.peerInfo,
discoveryPort = config.discoveryPort,
bootstrapNodes = discoveryBootstrapNodes
)
wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch)
localStore = FSStore.new(config.dataDir / "repo", cache = cache)
engine = BlockExcEngine.new(localStore, wallet, network)
engine = BlockExcEngine.new(localStore, wallet, network, discovery)
store = NetworkStore.new(engine, localStore)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
daggerNode = DaggerNodeRef.new(switch, store, engine, erasure)
daggerNode = DaggerNodeRef.new(switch, store, engine, erasure, discovery)
restServer = RestServerRef.new(
daggerNode.initRestApi(),
initTAddress("127.0.0.1" , config.apiPort),
@ -87,4 +133,5 @@ proc new*(T: type DaggerServer, config: DaggerConf): T =
T(
config: config,
daggerNode: daggerNode,
restServer: restServer)
restServer: restServer,
)

dagger/discovery.nim Normal file

@ -0,0 +1,74 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/stew/shims/net
import pkg/libp2pdht/discv5/protocol as discv5
import rng
export discv5
type
Discovery* = ref object
protocol: discv5.Protocol
localInfo: PeerInfo
proc new*(
T: type Discovery,
localInfo: PeerInfo,
discoveryPort: Port,
bootstrapNodes = newSeq[SignedPeerRecord](),
): T =
T(
protocol: newProtocol(
localInfo.privateKey,
bindPort = discoveryPort,
record = localInfo.signedPeerRecord,
bootstrapRecords = bootstrapNodes,
rng = Rng.instance()
),
localInfo: localInfo
)
proc findPeer*(
d: Discovery,
peerId: PeerID): Future[?PeerRecord] {.async.} =
let node = await d.protocol.resolve(toNodeId(peerId))
return
if node.isSome():
some(node.get().record.data)
else:
none(PeerRecord)
proc toDiscoveryId*(cid: Cid): NodeId =
## To discovery id
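## (the provider record for a block is keyed by the keccak-256 digest of the
## raw CID bytes, so publishProvide and findBlockProviders below use the same key)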
readUintBE[256](keccak256.digest(cid.data.buffer).data)
proc findBlockProviders*(
d: Discovery,
cid: Cid): Future[seq[SignedPeerRecord]] {.async.} =
return (await d.protocol.getProviders(cid.toDiscoveryId())).get()
proc publishProvide*(d: Discovery, cid: Cid) {.async.} =
let bid = cid.toDiscoveryId()
discard await d.protocol.addProvider(bid, d.localInfo.signedPeerRecord)
proc start*(d: Discovery) {.async.} =
d.protocol.updateRecord(d.localInfo.signedPeerRecord).expect("updating SPR")
d.protocol.open()
d.protocol.start()
proc stop*(d: Discovery) {.async.} =
await d.protocol.closeWait()
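
As a quick orientation for this new module, a minimal usage sketch (illustrative only, not part of the diff; assumes a started switch and a cid in scope, and uses the default discovery port from conf.nim — the real wiring lives in dagger.nim above):

let disc = Discovery.new(switch.peerInfo, discoveryPort = Port(8090))
await disc.start()                                  # open and start discv5
await disc.publishProvide(cid)                      # advertise ourselves as a provider
let providers = await disc.findBlockProviders(cid)  # seq[SignedPeerRecord]
await disc.stop()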

View File

@ -27,6 +27,7 @@ import ./stores/blockstore
import ./blockexchange
import ./streams
import ./erasure
import ./discovery
logScope:
topics = "dagger node"
@ -40,11 +41,13 @@ type
blockStore*: BlockStore
engine*: BlockExcEngine
erasure*: Erasure
discovery*: Discovery
proc start*(node: DaggerNodeRef) {.async.} =
await node.switch.start()
await node.engine.start()
await node.erasure.start()
await node.discovery.start()
node.networkId = node.switch.peerInfo.peerId
notice "Started dagger node", id = $node.networkId, addrs = node.switch.peerInfo.addrs
@ -55,11 +58,12 @@ proc stop*(node: DaggerNodeRef) {.async.} =
await node.engine.stop()
await node.switch.stop()
await node.erasure.stop()
await node.discovery.stop()
proc findPeer*(
node: DaggerNodeRef,
peerId: PeerID): Future[?!PeerRecord] {.async.} =
discard
peerId: PeerID): Future[?PeerRecord] {.async.} =
return await node.discovery.findPeer(peerId)
proc connect*(
node: DaggerNodeRef,
@ -230,9 +234,11 @@ proc new*(
switch: Switch,
store: BlockStore,
engine: BlockExcEngine,
erasure: Erasure): T =
erasure: Erasure,
discovery: Discovery): T =
T(
switch: switch,
blockStore: store,
engine: engine,
erasure: erasure)
erasure: erasure,
discovery: discovery)

View File

@ -106,15 +106,11 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter =
let addresses = if addrs.isOk and addrs.get().len > 0:
addrs.get()
else:
let peerRecord = await node.findPeer(peerId.get())
if peerRecord.isErr:
without peerRecord =? (await node.findPeer(peerId.get())):
return RestApiResponse.error(
Http400,
"Unable to find Peer!")
peerRecord.get().addresses.mapIt(
it.address
)
peerRecord.addresses.mapIt(it.address)
try:
await node.connect(peerId.get(), addresses)
return RestApiResponse.response("Successfully connected to peer")

View File

@ -52,5 +52,11 @@ method hasBlock*(s: BlockStore, cid: Cid): bool {.base.} =
return false
method blockList*(s: BlockStore): Future[seq[Cid]] {.base.} =
## Get the list of blocks in the BlockStore. This is an intensive operation
##
raiseAssert("Not implemented!")
proc contains*(s: BlockStore, blk: Cid): bool =
s.hasBlock(blk)

View File

@ -7,6 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import pkg/upraises
push: {.upraises: [].}
@ -67,6 +68,9 @@ method hasBlock*(self: CacheStore, cid: Cid): bool =
cid in self.cache
method blockList*(s: CacheStore): Future[seq[Cid]] {.async.} =
return toSeq(s.cache.keys)
func putBlockSync(self: CacheStore, blk: Block): bool =
let blkSize = blk.data.len # in bytes

View File

@ -129,6 +129,22 @@ method hasBlock*(self: FSStore, cid: Cid): bool =
self.blockPath(cid).isFile()
method blockList*(s: FSStore): Future[seq[Cid]] {.async.} =
## Very expensive AND blocking!
debug "finding all blocks in store"
for (pkind, folderPath) in s.repoDir.walkDir():
if pkind != pcDir: continue
let baseName = basename(folderPath)
if baseName.len != s.postfixLen: continue
for (fkind, filePath) in folderPath.walkDir(false):
if fkind != pcFile: continue
let cid = Cid.init(basename(filePath))
if cid.isOk:
result.add(cid.get())
return result
proc new*(
T: type FSStore,
repoDir: string,

tests/config.nims Normal file

@ -0,0 +1 @@
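# Substitute the DHT-backed dagger/discovery module with the mock
# implementation for every test build, so unit tests never touch a real DHT.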
patchFile("dagger", "discovery", "dagger/mockdiscovery")

View File

@ -1,4 +1,5 @@
import std/sequtils
import std/sugar
import std/algorithm
import pkg/asynctest
@ -7,11 +8,13 @@ import pkg/stew/byteutils
import pkg/libp2p
import pkg/libp2p/errors
import pkg/libp2pdht/discv5/protocol as discv5
import pkg/dagger/rng
import pkg/dagger/stores
import pkg/dagger/blockexchange
import pkg/dagger/chunker
import pkg/dagger/discovery
import pkg/dagger/blocktype as bt
import ../helpers
@ -34,6 +37,7 @@ suite "NetworkStore engine - 2 nodes":
blocks1, blocks2: seq[bt.Block]
engine1, engine2: BlockExcEngine
localStore1, localStore2: BlockStore
discovery1, discovery2: Discovery
setup:
while true:
@ -63,14 +67,16 @@ suite "NetworkStore engine - 2 nodes":
peerId2 = switch2.peerInfo.peerId
localStore1 = CacheStore.new(blocks1.mapIt( it ))
discovery1 = Discovery.new(switch1.peerInfo, Port(0))
network1 = BlockExcNetwork.new(switch = switch1)
engine1 = BlockExcEngine.new(localStore1, wallet1, network1)
engine1 = BlockExcEngine.new(localStore1, wallet1, network1, discovery1)
blockexc1 = NetworkStore.new(engine1, localStore1)
switch1.mount(network1)
localStore2 = CacheStore.new(blocks2.mapIt( it ))
discovery2 = Discovery.new(switch2.peerInfo, Port(0))
network2 = BlockExcNetwork.new(switch = switch2)
engine2 = BlockExcEngine.new(localStore2, wallet2, network2)
engine2 = BlockExcEngine.new(localStore2, wallet2, network2, discovery2)
blockexc2 = NetworkStore.new(engine2, localStore2)
switch2.mount(network2)
@ -80,8 +86,8 @@ suite "NetworkStore engine - 2 nodes":
)
# initialize our want lists
blockexc1.engine.wantList = blocks2.mapIt( it.cid )
blockexc2.engine.wantList = blocks1.mapIt( it.cid )
for b in blocks2: discard blockexc1.engine.discoverBlock(b.cid)
for b in blocks1: discard blockexc2.engine.discoverBlock(b.cid)
pricing1.address = wallet1.address
pricing2.address = wallet2.address
@ -92,7 +98,7 @@ suite "NetworkStore engine - 2 nodes":
switch2.peerInfo.peerId,
switch2.peerInfo.addrs)
await sleepAsync(1.seconds) # give some time to exchange lists
await sleepAsync(100.milliseconds) # give some time to exchange lists
peerCtx2 = blockexc1.engine.getPeerCtx(peerId2)
peerCtx1 = blockexc2.engine.getPeerCtx(peerId1)
@ -109,10 +115,10 @@ suite "NetworkStore engine - 2 nodes":
check:
peerCtx1.peerHave.mapIt( $it ).sorted(cmp[string]) ==
blockexc2.engine.wantList.mapIt( $it ).sorted(cmp[string])
toSeq(blockexc2.engine.runningDiscoveries.keys()).mapIt( $it ).sorted(cmp[string])
peerCtx2.peerHave.mapIt( $it ).sorted(cmp[string]) ==
blockexc1.engine.wantList.mapIt( $it ).sorted(cmp[string])
toSeq(blockexc1.engine.runningDiscoveries.keys()).mapIt( $it ).sorted(cmp[string])
test "exchanges accounts on connect":
check peerCtx1.account.?address == pricing1.address.some
@ -169,8 +175,7 @@ suite "NetworkStore engine - 2 nodes":
check wallet2.balance(channel, Asset) > 0
suite "NetworkStore - multiple nodes":
let
chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
var
switch: seq[Switch]
@ -208,8 +213,10 @@ suite "NetworkStore - multiple nodes":
engine = downloader.engine
# Add blocks from 1st peer to want list
engine.wantList &= blocks[0..3].mapIt( it.cid )
engine.wantList &= blocks[12..15].mapIt( it.cid )
for b in blocks[0..3]:
discard engine.discoverBlock(b.cid)
for b in blocks[12..15]:
discard engine.discoverBlock(b.cid)
await allFutures(
blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) ))
@ -236,8 +243,10 @@ suite "NetworkStore - multiple nodes":
engine = downloader.engine
# Add blocks from 1st peer to want list
engine.wantList &= blocks[0..3].mapIt( it.cid )
engine.wantList &= blocks[12..15].mapIt( it.cid )
for b in blocks[0..3]:
discard engine.discoverBlock(b.cid)
for b in blocks[12..15]:
discard engine.discoverBlock(b.cid)
await allFutures(
blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) ))
@ -254,3 +263,71 @@ suite "NetworkStore - multiple nodes":
let wantListBlocks = await allFinished(
blocks[0..3].mapIt( downloader.getBlock(it.cid) ))
check wantListBlocks.mapIt( !it.read ) == blocks[0..3]
suite "NetworkStore - discovery":
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
var
switch: seq[Switch]
blockexc: seq[NetworkStore]
blocks: seq[bt.Block]
setup:
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
blocks.add(bt.Block.new(chunk).tryGet())
for e in generateNodes(4):
switch.add(e.switch)
blockexc.add(e.blockexc)
await e.blockexc.engine.start()
await allFuturesThrowing(
switch.mapIt( it.start() )
)
teardown:
await allFuturesThrowing(
switch.mapIt( it.stop() )
)
switch = @[]
blockexc = @[]
test "Shouldn't launch discovery request if we are already connected":
await blockexc[0].engine.blocksHandler(switch[1].peerInfo.peerId, blocks)
blockexc[0].engine.discovery.findBlockProviders_var = proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] =
check false
await connectNodes(switch)
let blk = await blockexc[1].engine.requestBlock(blocks[0].cid)
test "E2E discovery":
# Distribute the blocks amongst 1..3
# Ask 0 to download everything without connecting to it beforehand
var advertised: Table[Cid, SignedPeerRecord]
blockexc[1].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) =
advertised[cid] = switch[1].peerInfo.signedPeerRecord
blockexc[2].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) =
advertised[cid] = switch[2].peerInfo.signedPeerRecord
blockexc[3].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) =
advertised[cid] = switch[3].peerInfo.signedPeerRecord
await blockexc[1].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[0..5])
await blockexc[2].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[4..10])
await blockexc[3].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[10..15])
blockexc[0].engine.discovery.findBlockProviders_var = proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] =
if cid in advertised:
result.add(advertised[cid])
let futs = collect(newSeq):
for b in blocks:
blockexc[0].engine.requestBlock(b.cid)
await allFutures(futs)

View File

@ -1,15 +1,19 @@
import std/sequtils
import std/random
import std/algorithm
import pkg/stew/byteutils
import pkg/asynctest
import pkg/chronos
import pkg/libp2p
import pkg/libp2p/routing_record
import pkg/libp2pdht/discv5/protocol as discv5
import pkg/dagger/rng
import pkg/dagger/blockexchange
import pkg/dagger/stores
import pkg/dagger/chunker
import pkg/dagger/discovery
import pkg/dagger/blocktype as bt
import pkg/dagger/utils/asyncheapqueue
@ -23,6 +27,7 @@ suite "NetworkStore engine basic":
peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet()
chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
wallet = WalletRef.example
discovery = Discovery.new()
var
blocks: seq[bt.Block]
@ -47,7 +52,7 @@ suite "NetworkStore engine basic":
wantType: WantType = WantType.wantHave,
full: bool = false,
sendDontHave: bool = false) {.gcsafe.} =
check cids == blocks.mapIt( it.cid )
check cids.mapIt($it).sorted == blocks.mapIt( $it.cid ).sorted
done.complete()
@ -59,8 +64,10 @@ suite "NetworkStore engine basic":
engine = BlockExcEngine.new(
CacheStore.new(blocks.mapIt( it )),
wallet,
network)
engine.wantList = blocks.mapIt( it.cid )
network,
discovery)
for b in blocks:
discard engine.discoverBlock(b.cid)
engine.setupPeer(peerId)
await done
@ -77,7 +84,7 @@ suite "NetworkStore engine basic":
sendAccount: sendAccount,
))
engine = BlockExcEngine.new(CacheStore.new, wallet, network)
engine = BlockExcEngine.new(CacheStore.new, wallet, network, discovery)
engine.pricing = pricing.some
engine.setupPeer(peerId)
@ -90,6 +97,7 @@ suite "NetworkStore engine handlers":
peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet()
chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
wallet = WalletRef.example
discovery = Discovery.new()
var
engine: BlockExcEngine
@ -106,7 +114,7 @@ suite "NetworkStore engine handlers":
blocks.add(bt.Block.new(chunk).tryGet())
done = newFuture[void]()
engine = BlockExcEngine.new(CacheStore.new(), wallet, BlockExcNetwork())
engine = BlockExcEngine.new(CacheStore.new(), wallet, BlockExcNetwork(), discovery)
peerCtx = BlockExcPeerCtx(
id: peerId
)
@ -230,7 +238,7 @@ suite "Task Handler":
blocks.add(bt.Block.new(chunk).tryGet())
done = newFuture[void]()
engine = BlockExcEngine.new(CacheStore.new(), wallet, BlockExcNetwork())
engine = BlockExcEngine.new(CacheStore.new(), wallet, BlockExcNetwork(), Discovery.new())
peersCtx = @[]
for i in 0..3:

View File

@ -3,6 +3,7 @@ import std/sequtils
import pkg/chronos
import pkg/libp2p
import pkg/dagger/discovery
import pkg/dagger/stores
import pkg/dagger/blocktype as bt
@ -17,16 +18,15 @@ proc generateNodes*(
for i in 0..<num:
let
switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
discovery = Discovery.new(switch.peerInfo, Port(0))
wallet = WalletRef.example
network = BlockExcNetwork.new(switch)
localStore = CacheStore.new(blocks.mapIt( it ))
engine = BlockExcEngine.new(localStore, wallet, network)
engine = BlockExcEngine.new(localStore, wallet, network, discovery)
networkStore = NetworkStore.new(engine, localStore)
switch.mount(network)
# initialize our want lists
engine.wantList = blocks.mapIt( it.cid )
switch.mount(network)
result.add((switch, networkStore))

View File

@ -0,0 +1,54 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/stew/shims/net
import pkg/libp2pdht/discv5/protocol as discv5
export discv5
type
Discovery* = ref object
findBlockProviders_var*: proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] {.gcsafe.}
publishProvide_var*: proc(d: Discovery, cid: Cid) {.gcsafe.}
proc new*(
T: type Discovery,
localInfo: PeerInfo,
discoveryPort: Port,
bootstrapNodes = newSeq[SignedPeerRecord](),
): T =
T()
proc findPeer*(
d: Discovery,
peerId: PeerID): Future[?PeerRecord] {.async.} =
return none(PeerRecord)
proc findBlockProviders*(
d: Discovery,
cid: Cid): Future[seq[SignedPeerRecord]] {.async.} =
if isNil(d.findBlockProviders_var): return
return d.findBlockProviders_var(d, cid)
proc publishProvide*(d: Discovery, cid: Cid) {.async.} =
if isNil(d.publishProvide_var): return
d.publishProvide_var(d, cid)
proc start*(d: Discovery) {.async.} =
discard
proc stop*(d: Discovery) {.async.} =
discard

View File

@ -52,6 +52,12 @@ suite "FS Store":
check store.hasBlock(newBlock.cid)
test "blockList":
createDir(store.blockPath(newBlock.cid).parentDir)
writeFile(store.blockPath(newBlock.cid), newBlock.data)
check (await store.blockList()) == @[newBlock.cid]
test "fail hasBlock":
check not store.hasBlock(newBlock.cid)

View File

@ -8,12 +8,14 @@ import pkg/stew/byteutils
import pkg/nitro
import pkg/libp2p
import pkg/libp2pdht/discv5/protocol as discv5
import pkg/dagger/stores
import pkg/dagger/blockexchange
import pkg/dagger/chunker
import pkg/dagger/node
import pkg/dagger/manifest
import pkg/dagger/discovery
import pkg/dagger/blocktype as bt
import ./helpers
@ -32,6 +34,7 @@ suite "Test Node":
engine: BlockExcEngine
store: NetworkStore
node: DaggerNodeRef
discovery: Discovery
setup:
file = open(path.splitFile().dir /../ "fixtures" / "test.jpg")
@ -40,9 +43,10 @@ suite "Test Node":
wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch)
localStore = CacheStore.new()
engine = BlockExcEngine.new(localStore, wallet, network)
discovery = Discovery.new(switch.peerInfo, Port(0))
engine = BlockExcEngine.new(localStore, wallet, network, discovery)
store = NetworkStore.new(engine, localStore)
node = DaggerNodeRef.new(switch, store, engine, nil) # TODO: pass `Erasure`
node = DaggerNodeRef.new(switch, store, engine, nil, discovery) # TODO: pass `Erasure`
await node.start()

@ -1 +1 @@
Subproject commit 536cc6b7933e5f86590bb27083c0ffeab31255f9
Subproject commit fbb76f8af8a33ab818184a7d4406d9fee20993be

vendor/lrucache.nim vendored

@ -1 +1 @@
Subproject commit 717abe4e612b5bd5c8c71ee14939d139a8e633e3
Subproject commit 8767ade0b76ea5b5d4ce24a52d0c58a6ebeb66cd

vendor/nim-libp2p vendored

@ -1 +1 @@
Subproject commit e72d03bc78d3bc896ae5912ab45e2ecd53849aa5
Subproject commit b2d980f258ebeabdaa0eedfe3722ebf949f6dadb

vendor/nim-libp2p-dht vendored Submodule

@ -0,0 +1 @@
Subproject commit 6c4ce518f55007a861fe577b48cabdca7eaa32df