diff --git a/.gitmodules b/.gitmodules
index a4d16568..1da07b1b 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -178,3 +178,8 @@
 [submodule "vendor/nim-leopard"]
 	path = vendor/nim-leopard
 	url = https://github.com/status-im/nim-leopard.git
+[submodule "vendor/nim-libp2p-dht"]
+	path = vendor/nim-libp2p-dht
+	url = https://github.com/status-im/nim-libp2p-dht.git
+	ignore = untracked
+	branch = master
diff --git a/config.nims b/config.nims
index 7d12c021..068353f1 100644
--- a/config.nims
+++ b/config.nims
@@ -66,6 +66,8 @@ switch("warning", "ObservableStores:off")
 # Too many false positives for "Warning: method has lock level <unknown>, but another method has 0 [LockLevel]"
 switch("warning", "LockLevel:off")
+
+switch("define", "libp2p_pki_schemes=secp256k1")
 switch("define", "chronicles_sinks=textlines[dynamic],json[dynamic]")
 
 # begin Nimble config (version 1)
diff --git a/dagger/blockexchange/engine.nim b/dagger/blockexchange/engine.nim
index acd866fd..4b5a2203 100644
--- a/dagger/blockexchange/engine.nim
+++ b/dagger/blockexchange/engine.nim
@@ -7,7 +7,7 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
 
-import std/sequtils
+import std/[sequtils, sets, tables, sugar]
 
 import pkg/chronos
 import pkg/chronicles
@@ -16,6 +16,7 @@ import pkg/libp2p
 import ../stores/blockstore
 import ../blocktype as bt
 import ../utils/asyncheapqueue
+import ../discovery
 
 import ./protobuf/blockexc
 import ./protobuf/presence
@@ -37,15 +38,29 @@ const
   DefaultConcurrentTasks = 10
   DefaultMaxRetries = 3
 
+  # The current advertisement mechanism favours efficiency over
+  # correctness, so blocks may be advertised more slowly than this
+  # frequency suggests; the value leaves some margin.
+  BlockAdvertisementFrequency = 30.minutes
+
 type
   TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.}
   TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.}
 
+  BlockDiscovery* = ref object
+    discoveredProvider: AsyncEvent
+    discoveryLoop: Future[void]
+    toDiscover: Cid
+    treatedPeer: HashSet[PeerId]
+    inflightIWant: HashSet[PeerId]
+    gotIWantResponse: AsyncEvent
+    provides: seq[PeerId]
+    lastDhtQuery: Moment
+
   BlockExcEngine* = ref object of RootObj
     localStore*: BlockStore                     # where we localStore blocks for this instance
     network*: BlockExcNetwork                   # network interface
     peers*: seq[BlockExcPeerCtx]                # peers we're currently actively exchanging with
-    wantList*: seq[Cid]                         # local wants list
     taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # peers we're currently processing tasks for
     concurrentTasks: int                        # number of concurrent peers we're serving at any given time
     maxRetries: int                             # max number of tries for a failed block
@@ -55,6 +70,12 @@ type
     peersPerRequest: int                        # max number of peers to request from
     wallet*: WalletRef                          # nitro wallet for micropayments
     pricing*: ?Pricing                          # optional bandwidth pricing
+    advertisedBlocks: seq[Cid]
+    advertisedIndex: int
+    advertisementFrequency: Duration
+    runningDiscoveries*: Table[Cid, BlockDiscovery]
+    blockAdded: AsyncEvent
+    discovery*: Discovery
 
   Pricing* = object
     address*: EthAddress
@@ -79,6 +100,7 @@ proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} =
   b.taskQueue.pushOrUpdateNoWait(task).isOk()
 
 proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.}
+proc advertiseLoop(b: BlockExcEngine): Future[void] {.gcsafe.}
 
 proc start*(b: BlockExcEngine) {.async.} =
   ## Start the blockexc task
@@ -94,6 +116,14 @@ proc start*(b: BlockExcEngine) {.async.} =
   for i in 0..
0 trace "Requesting block from peer", peer = blockPeer.id, cid # request block b.network.request.sendWantList( - blockPeer.id, + discovery.provides[0], @[cid], wantType = WantType.wantBlock) # we want this remote to send us a block - if peers.len == 0: - return blk # no peers to send wants to - - # filter out the peer we've already requested from - let stop = min(peers.high, b.peersPerRequest) - trace "Sending want list requests to remaining peers", count = stop + 1 - for p in peers[0..stop]: - if cid notin p.peerHave: - # just send wants - b.network.request.sendWantList( - p.id, - @[cid], - wantType = WantType.wantHave) # we only want to know if the peer has the block - - return blk + #TODO substract the discovery time + return await blk.wait(timeout) proc blockPresenceHandler*( b: BlockExcEngine, @@ -177,12 +298,17 @@ proc blockPresenceHandler*( ## let peerCtx = b.getPeerCtx(peer) - if isNil(peerCtx): - return for blk in blocks: if presence =? Presence.init(blk): - peerCtx.updatePresence(presence) + if not isNil(peerCtx): + peerCtx.updatePresence(presence) + if presence.cid in b.runningDiscoveries: + let bd = b.runningDiscoveries[presence.cid] + if not presence.have: + bd.inflightIWant.excl(peer) + bd.treatedPeer.incl(peer) + bd.gotIWantResponse.fire() proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) = trace "Schedule a task for new blocks" @@ -204,8 +330,20 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) = ## trace "Resolving blocks" - b.pendingBlocks.resolve(blocks) - b.scheduleTasks(blocks) + + var gotNewBlocks = false + for bl in blocks: + if bl.cid notin b.advertisedBlocks: #TODO that's very slow, maybe a ordered hashset instead + #TODO could do some smarter ordering here (insert it just before b.advertisedIndex, or similar) + b.advertisedBlocks.add(bl.cid) + asyncSpawn b.discovery.publishProvide(bl.cid) + gotNewBlocks = true + + if gotNewBlocks: + b.pendingBlocks.resolve(blocks) + b.scheduleTasks(blocks) + + b.blockAdded.fire() proc payForBlocks(engine: BlockExcEngine, peer: BlockExcPeerCtx, @@ -311,8 +449,13 @@ proc setupPeer*(b: BlockExcEngine, peer: PeerID) = )) # broadcast our want list, the other peer will do the same - if b.wantList.len > 0: - b.network.request.sendWantList(peer, b.wantList, full = true) + let wantList = collect(newSeqOfCap(b.runningDiscoveries.len)): + for cid, bd in b.runningDiscoveries: + bd.inflightIWant.incl(peer) + cid + + if wantList.len > 0: + b.network.request.sendWantList(peer, wantList, full = true, sendDontHave = true) if address =? b.pricing.?address: b.network.request.sendAccount(peer, Account(address: address)) @@ -326,6 +469,31 @@ proc dropPeer*(b: BlockExcEngine, peer: PeerID) = # drop the peer from the peers table b.peers.keepItIf( it.id != peer ) +proc advertiseLoop(b: BlockExcEngine) {.async, gcsafe.} = + while true: + if b.advertisedIndex >= b.advertisedBlocks.len: + b.advertisedIndex = 0 + b.advertisementFrequency = BlockAdvertisementFrequency + + # check that we still have this block. 
+    while
+      b.advertisedIndex < b.advertisedBlocks.len and
+      not(b.localStore.contains(b.advertisedBlocks[b.advertisedIndex])):
+      b.advertisedBlocks.delete(b.advertisedIndex)
+
+    # publish it
+    if b.advertisedIndex < b.advertisedBlocks.len:
+      asyncSpawn b.discovery.publishProvide(b.advertisedBlocks[b.advertisedIndex])
+
+    inc b.advertisedIndex
+    let toSleep =
+      if b.advertisedBlocks.len > 0:
+        b.advertisementFrequency div b.advertisedBlocks.len
+      else:
+        30.minutes
+    await sleepAsync(toSleep) or b.blockAdded.wait()
+    b.blockAdded.clear()
+
 proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
   trace "Handling task for peer", peer = task.id
@@ -386,6 +554,7 @@ proc new*(
   localStore: BlockStore,
   wallet: WalletRef,
   network: BlockExcNetwork,
+  discovery: Discovery,
   concurrentTasks = DefaultConcurrentTasks,
   maxRetries = DefaultMaxRetries,
   peersPerRequest = DefaultMaxPeersPerRequest): T =
@@ -393,11 +562,13 @@ proc new*(
   let
     engine = BlockExcEngine(
       localStore: localStore,
       pendingBlocks: PendingBlocksManager.new(),
+      blockAdded: newAsyncEvent(),
       peersPerRequest: peersPerRequest,
       network: network,
       wallet: wallet,
       concurrentTasks: concurrentTasks,
       maxRetries: maxRetries,
+      discovery: discovery,
       taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize))
 
   proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} =
diff --git a/dagger/blockexchange/network.nim b/dagger/blockexchange/network.nim
index 53753cb5..3f8729c6 100644
--- a/dagger/blockexchange/network.nim
+++ b/dagger/blockexchange/network.nim
@@ -8,6 +8,7 @@
 ## those terms.
 
 import std/tables
+import std/sequtils
 
 import pkg/chronicles
 import pkg/chronos
@@ -289,6 +290,12 @@ proc setupPeer*(b: BlockExcNetwork, peer: PeerID) =
 
   discard b.getOrCreatePeer(peer)
 
+proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} =
+  try:
+    await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address))
+  except CatchableError as exc:
+    debug "Failed to connect to peer", error=exc.msg
+
 proc dropPeer*(b: BlockExcNetwork, peer: PeerID) =
   ## Cleanup disconnected peer
   ##
diff --git a/dagger/conf.nim b/dagger/conf.nim
index c99c0f93..6e07aad9 100644
--- a/dagger/conf.nim
+++ b/dagger/conf.nim
@@ -19,14 +19,14 @@ import std/typetraits
 import pkg/chronicles
 import pkg/chronicles/topics_registry
 import pkg/confutils/defs
+import pkg/confutils/std/net
+import pkg/stew/shims/net as stewnet
 import pkg/libp2p
 
+import ./discovery
 import ./stores/cachestore
 
-export DefaultCacheSizeMiB
-
-const
-  DefaultTcpListenMultiAddr = "/ip4/0.0.0.0/tcp/0"
+export DefaultCacheSizeMiB, net
 
 type
   StartUpCommand* {.pure.} = enum
@@ -66,17 +66,38 @@ type
         defaultValue: noCommand }: StartUpCommand
 
     of noCommand:
-      listenAddrs* {.
-        desc: "Specifies one or more listening multiaddrs for the node to listen on."
-        defaultValue: @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
-        defaultValueDesc: "/ip4/0.0.0.0/tcp/0"
-        abbr: "a"
-        name: "listen-addrs" }: seq[MultiAddress]
+      listenPorts* {.
+        desc: "Specifies one or more listening ports for the node to listen on."
+        defaultValue: @[Port(0)]
+        defaultValueDesc: "0"
+        abbr: "l"
+        name: "listen-port" }: seq[Port]
+
+      # TODO We should have two options: the listen IP and the public IP
+      # Currently, they are tied together, so we can't be discoverable
+      # behind a NAT
+      listenIp* {.
+        desc: "The public IP"
+        defaultValue: ValidIpAddress.init("0.0.0.0")
+        defaultValueDesc: "0.0.0.0"
+        abbr: "i"
+        name: "listen-ip" }: ValidIpAddress
+
+      discoveryPort* {.
+ desc: "Specify the discovery (UDP) port" + defaultValue: Port(8090) + defaultValueDesc: "8090" + name: "udp-port" }: Port + + netPrivKeyFile* {. + desc: "Source of network (secp256k1) private key file (random|)" + defaultValue: "random" + name: "net-privkey" }: string bootstrapNodes* {. desc: "Specifies one or more bootstrap nodes to use when connecting to the network." abbr: "b" - name: "bootstrap-nodes" }: seq[MultiAddress] + name: "bootstrap-nodes" }: seq[SignedPeerRecord] maxPeers* {. desc: "The maximum number of peers to connect to" @@ -119,6 +140,17 @@ func parseCmdArg*(T: type MultiAddress, input: TaintedString): T {.raises: [ValueError, LPError, Defect].} = MultiAddress.init($input).tryGet() +proc parseCmdArg*(T: type SignedPeerRecord, uri: TaintedString): T = + var res: SignedPeerRecord + try: + if not res.fromURI(uri): + warn "Invalid SignedPeerRecord uri", uri=uri + quit QuitFailure + except CatchableError as exc: + warn "Invalid SignedPeerRecord uri", uri=uri, error=exc.msg + quit QuitFailure + res + # silly chronicles, colors is a compile-time property proc stripAnsi(v: string): string = var diff --git a/dagger/dagger.nim b/dagger/dagger.nim index 83a96307..db9a0222 100644 --- a/dagger/dagger.nim +++ b/dagger/dagger.nim @@ -9,6 +9,7 @@ import std/sequtils import std/os +import std/sugar import pkg/chronicles import pkg/chronos @@ -18,6 +19,7 @@ import pkg/confutils import pkg/confutils/defs import pkg/nitro import pkg/stew/io2 +import pkg/stew/shims/net as stewnet import ./node import ./conf @@ -27,6 +29,7 @@ import ./stores import ./blockexchange import ./utils/fileutils import ./erasure +import ./discovery type DaggerServer* = ref object @@ -50,15 +53,51 @@ proc stop*(s: DaggerServer) {.async.} = proc new*(T: type DaggerServer, config: DaggerConf): T = + const SafePermissions = {UserRead, UserWrite} let + privateKey = + if config.netPrivKeyFile == "random": + PrivateKey.random(Rng.instance()[]).get() + else: + let path = + if config.netPrivKeyFile.isAbsolute: + config.netPrivKeyFile + else: + config.dataDir / config.netPrivKeyFile + + if path.fileAccessible({AccessFlags.Find}): + info "Found a network private key" + + if path.getPermissionsSet().get() != SafePermissions: + warn "The network private key file is not safe, aborting" + quit QuitFailure + + PrivateKey.init(path.readAllBytes().expect("accessible private key file")). 
+ expect("valid private key file") + else: + info "Creating a private key and saving it" + let + res = PrivateKey.random(Rng.instance()[]).get() + bytes = res.getBytes().get() + + path.writeFile(bytes, SafePermissions.toInt()).expect("writing private key file") + + PrivateKey.init(bytes).expect("valid key bytes") + + let + addresses = + config.listenPorts.mapIt(MultiAddress.init("/ip4/" & $config.listenIp & "/tcp/" & $(it.int)).tryGet()) & + @[MultiAddress.init("/ip4/" & $config.listenIp & "/udp/" & $(config.discoveryPort.int)).tryGet()] switch = SwitchBuilder .new() - .withAddresses(config.listenAddrs) + .withPrivateKey(privateKey) + .withAddresses(addresses) .withRng(Rng.instance()) .withNoise() .withMplex(5.minutes, 5.minutes) .withMaxConnections(config.maxPeers) .withAgentVersion(config.agentString) + .withSignedPeerRecord(true) .withTcpTransport({ServerFlags.ReuseAddr}) .build() @@ -69,13 +108,20 @@ proc new*(T: type DaggerServer, config: DaggerConf): T = CacheStore.new() let + discoveryBootstrapNodes = config.bootstrapNodes + discovery = Discovery.new( + switch.peerInfo, + discoveryPort = config.discoveryPort, + bootstrapNodes = discoveryBootstrapNodes + ) + wallet = WalletRef.new(EthPrivateKey.random()) network = BlockExcNetwork.new(switch) localStore = FSStore.new(config.dataDir / "repo", cache = cache) - engine = BlockExcEngine.new(localStore, wallet, network) + engine = BlockExcEngine.new(localStore, wallet, network, discovery) store = NetworkStore.new(engine, localStore) erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) - daggerNode = DaggerNodeRef.new(switch, store, engine, erasure) + daggerNode = DaggerNodeRef.new(switch, store, engine, erasure, discovery) restServer = RestServerRef.new( daggerNode.initRestApi(), initTAddress("127.0.0.1" , config.apiPort), @@ -87,4 +133,5 @@ proc new*(T: type DaggerServer, config: DaggerConf): T = T( config: config, daggerNode: daggerNode, - restServer: restServer) + restServer: restServer, + ) diff --git a/dagger/discovery.nim b/dagger/discovery.nim new file mode 100644 index 00000000..60d9f999 --- /dev/null +++ b/dagger/discovery.nim @@ -0,0 +1,74 @@ +## Nim-Dagger +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. 
+
+import pkg/chronos
+import pkg/libp2p
+import pkg/questionable
+import pkg/questionable/results
+import pkg/stew/shims/net
+import pkg/libp2pdht/discv5/protocol as discv5
+
+import rng
+
+export discv5
+
+type
+  Discovery* = ref object
+    protocol: discv5.Protocol
+    localInfo: PeerInfo
+
+proc new*(
+  T: type Discovery,
+  localInfo: PeerInfo,
+  discoveryPort: Port,
+  bootstrapNodes = newSeq[SignedPeerRecord](),
+  ): T =
+
+  T(
+    protocol: newProtocol(
+      localInfo.privateKey,
+      bindPort = discoveryPort,
+      record = localInfo.signedPeerRecord,
+      bootstrapRecords = bootstrapNodes,
+      rng = Rng.instance()
+    ),
+    localInfo: localInfo
+  )
+
+proc findPeer*(
+  d: Discovery,
+  peerId: PeerID): Future[?PeerRecord] {.async.} =
+  let node = await d.protocol.resolve(toNodeId(peerId))
+  return
+    if node.isSome():
+      some(node.get().record.data)
+    else:
+      none(PeerRecord)
+
+proc toDiscoveryId*(cid: Cid): NodeId =
+  ## To discovery id
+  readUintBE[256](keccak256.digest(cid.data.buffer).data)
+
+proc findBlockProviders*(
+  d: Discovery,
+  cid: Cid): Future[seq[SignedPeerRecord]] {.async.} =
+  return (await d.protocol.getProviders(cid.toDiscoveryId())).get()
+
+proc publishProvide*(d: Discovery, cid: Cid) {.async.} =
+  let bid = cid.toDiscoveryId()
+  discard await d.protocol.addProvider(bid, d.localInfo.signedPeerRecord)
+
+
+proc start*(d: Discovery) {.async.} =
+  d.protocol.updateRecord(d.localInfo.signedPeerRecord).expect("updating SPR")
+  d.protocol.open()
+  d.protocol.start()
+
+proc stop*(d: Discovery) {.async.} =
+  await d.protocol.closeWait()
diff --git a/dagger/node.nim b/dagger/node.nim
index 926bcf88..5ef9d1c1 100644
--- a/dagger/node.nim
+++ b/dagger/node.nim
@@ -27,6 +27,7 @@ import ./stores/blockstore
 import ./blockexchange
 import ./streams
 import ./erasure
+import ./discovery
 
 logScope:
   topics = "dagger node"
@@ -40,11 +41,13 @@ type
     blockStore*: BlockStore
     engine*: BlockExcEngine
     erasure*: Erasure
+    discovery*: Discovery
 
 proc start*(node: DaggerNodeRef) {.async.} =
   await node.switch.start()
   await node.engine.start()
   await node.erasure.start()
+  await node.discovery.start()
 
   node.networkId = node.switch.peerInfo.peerId
   notice "Started dagger node", id = $node.networkId, addrs = node.switch.peerInfo.addrs
@@ -55,11 +58,12 @@ proc stop*(node: DaggerNodeRef) {.async.} =
   await node.engine.stop()
   await node.switch.stop()
   await node.erasure.stop()
+  await node.discovery.stop()
 
 proc findPeer*(
   node: DaggerNodeRef,
-  peerId: PeerID): Future[?!PeerRecord] {.async.} =
-  discard
+  peerId: PeerID): Future[?PeerRecord] {.async.} =
+  return await node.discovery.findPeer(peerId)
 
 proc connect*(
   node: DaggerNodeRef,
@@ -230,9 +234,11 @@ proc new*(
   switch: Switch,
   store: BlockStore,
   engine: BlockExcEngine,
-  erasure: Erasure): T =
+  erasure: Erasure,
+  discovery: Discovery): T =
   T(
     switch: switch,
     blockStore: store,
     engine: engine,
-    erasure: erasure)
+    erasure: erasure,
+    discovery: discovery)
diff --git a/dagger/rest/api.nim b/dagger/rest/api.nim
index 0d44cb7a..a6143688 100644
--- a/dagger/rest/api.nim
+++ b/dagger/rest/api.nim
@@ -106,15 +106,11 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter =
       let addresses = if addrs.isOk and addrs.get().len > 0:
             addrs.get()
          else:
-            let peerRecord = await node.findPeer(peerId.get())
-            if peerRecord.isErr:
+            without peerRecord =? (await node.findPeer(peerId.get())):
               return RestApiResponse.error(
                 Http400,
                 "Unable to find Peer!")
-
-            peerRecord.get().addresses.mapIt(
-              it.address
-            )
+            peerRecord.addresses.mapIt(it.address)
       try:
         await node.connect(peerId.get(), addresses)
         return RestApiResponse.response("Successfully connected to peer")
diff --git a/dagger/stores/blockstore.nim b/dagger/stores/blockstore.nim
index 4e8f39a5..90a06173 100644
--- a/dagger/stores/blockstore.nim
+++ b/dagger/stores/blockstore.nim
@@ -52,5 +52,11 @@ method hasBlock*(s: BlockStore, cid: Cid): bool {.base.} =
 
   return false
 
+method blockList*(s: BlockStore): Future[seq[Cid]] {.base.} =
+  ## Get the list of blocks in the BlockStore. This is an intensive operation
+  ##
+
+  raiseAssert("Not implemented!")
+
 proc contains*(s: BlockStore, blk: Cid): bool =
   s.hasBlock(blk)
diff --git a/dagger/stores/cachestore.nim b/dagger/stores/cachestore.nim
index ba879398..ddcd730f 100644
--- a/dagger/stores/cachestore.nim
+++ b/dagger/stores/cachestore.nim
@@ -7,6 +7,7 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
 
+import std/sequtils
 import pkg/upraises
 
 push: {.upraises: [].}
@@ -67,6 +68,9 @@ method hasBlock*(self: CacheStore, cid: Cid): bool =
 
   cid in self.cache
 
+method blockList*(s: CacheStore): Future[seq[Cid]] {.async.} =
+  return toSeq(s.cache.keys)
+
 func putBlockSync(self: CacheStore, blk: Block): bool =
 
   let blkSize = blk.data.len # in bytes
diff --git a/dagger/stores/fsstore.nim b/dagger/stores/fsstore.nim
index 29027166..4870e3fd 100644
--- a/dagger/stores/fsstore.nim
+++ b/dagger/stores/fsstore.nim
@@ -129,6 +129,22 @@ method hasBlock*(self: FSStore, cid: Cid): bool =
 
   self.blockPath(cid).isFile()
 
+method blockList*(s: FSStore): Future[seq[Cid]] {.async.} =
+  ## Very expensive AND blocking!
+ + debug "finding all blocks in store" + for (pkind, folderPath) in s.repoDir.walkDir(): + if pkind != pcDir: continue + let baseName = basename(folderPath) + if baseName.len != s.postfixLen: continue + + for (fkind, filePath) in folderPath.walkDir(false): + if fkind != pcFile: continue + let cid = Cid.init(basename(filePath)) + if cid.isOk: + result.add(cid.get()) + return result + proc new*( T: type FSStore, repoDir: string, diff --git a/tests/config.nims b/tests/config.nims new file mode 100644 index 00000000..c8062b36 --- /dev/null +++ b/tests/config.nims @@ -0,0 +1 @@ +patchFile("dagger", "discovery", "dagger/mockdiscovery") diff --git a/tests/dagger/blockexc/testblockexc.nim b/tests/dagger/blockexc/testblockexc.nim index 2657ef47..361c772f 100644 --- a/tests/dagger/blockexc/testblockexc.nim +++ b/tests/dagger/blockexc/testblockexc.nim @@ -1,4 +1,5 @@ import std/sequtils +import std/sugar import std/algorithm import pkg/asynctest @@ -7,11 +8,13 @@ import pkg/stew/byteutils import pkg/libp2p import pkg/libp2p/errors +import pkg/libp2pdht/discv5/protocol as discv5 import pkg/dagger/rng import pkg/dagger/stores import pkg/dagger/blockexchange import pkg/dagger/chunker +import pkg/dagger/discovery import pkg/dagger/blocktype as bt import ../helpers @@ -34,6 +37,7 @@ suite "NetworkStore engine - 2 nodes": blocks1, blocks2: seq[bt.Block] engine1, engine2: BlockExcEngine localStore1, localStore2: BlockStore + discovery1, discovery2: Discovery setup: while true: @@ -63,14 +67,16 @@ suite "NetworkStore engine - 2 nodes": peerId2 = switch2.peerInfo.peerId localStore1 = CacheStore.new(blocks1.mapIt( it )) + discovery1 = Discovery.new(switch1.peerInfo, Port(0)) network1 = BlockExcNetwork.new(switch = switch1) - engine1 = BlockExcEngine.new(localStore1, wallet1, network1) + engine1 = BlockExcEngine.new(localStore1, wallet1, network1, discovery1) blockexc1 = NetworkStore.new(engine1, localStore1) switch1.mount(network1) localStore2 = CacheStore.new(blocks2.mapIt( it )) + discovery2 = Discovery.new(switch2.peerInfo, Port(0)) network2 = BlockExcNetwork.new(switch = switch2) - engine2 = BlockExcEngine.new(localStore2, wallet2, network2) + engine2 = BlockExcEngine.new(localStore2, wallet2, network2, discovery2) blockexc2 = NetworkStore.new(engine2, localStore2) switch2.mount(network2) @@ -80,8 +86,8 @@ suite "NetworkStore engine - 2 nodes": ) # initialize our want lists - blockexc1.engine.wantList = blocks2.mapIt( it.cid ) - blockexc2.engine.wantList = blocks1.mapIt( it.cid ) + for b in blocks2: discard blockexc1.engine.discoverBlock(b.cid) + for b in blocks1: discard blockexc2.engine.discoverBlock(b.cid) pricing1.address = wallet1.address pricing2.address = wallet2.address @@ -92,7 +98,7 @@ suite "NetworkStore engine - 2 nodes": switch2.peerInfo.peerId, switch2.peerInfo.addrs) - await sleepAsync(1.seconds) # give some time to exchange lists + await sleepAsync(100.milliseconds) # give some time to exchange lists peerCtx2 = blockexc1.engine.getPeerCtx(peerId2) peerCtx1 = blockexc2.engine.getPeerCtx(peerId1) @@ -109,10 +115,10 @@ suite "NetworkStore engine - 2 nodes": check: peerCtx1.peerHave.mapIt( $it ).sorted(cmp[string]) == - blockexc2.engine.wantList.mapIt( $it ).sorted(cmp[string]) + toSeq(blockexc2.engine.runningDiscoveries.keys()).mapIt( $it ).sorted(cmp[string]) peerCtx2.peerHave.mapIt( $it ).sorted(cmp[string]) == - blockexc1.engine.wantList.mapIt( $it ).sorted(cmp[string]) + toSeq(blockexc1.engine.runningDiscoveries.keys()).mapIt( $it ).sorted(cmp[string]) test "exchanges accounts on 
connect": check peerCtx1.account.?address == pricing1.address.some @@ -169,8 +175,7 @@ suite "NetworkStore engine - 2 nodes": check wallet2.balance(channel, Asset) > 0 suite "NetworkStore - multiple nodes": - let - chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) + let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) var switch: seq[Switch] @@ -208,8 +213,10 @@ suite "NetworkStore - multiple nodes": engine = downloader.engine # Add blocks from 1st peer to want list - engine.wantList &= blocks[0..3].mapIt( it.cid ) - engine.wantList &= blocks[12..15].mapIt( it.cid ) + for b in blocks[0..3]: + discard engine.discoverBlock(b.cid) + for b in blocks[12..15]: + discard engine.discoverBlock(b.cid) await allFutures( blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) )) @@ -236,8 +243,10 @@ suite "NetworkStore - multiple nodes": engine = downloader.engine # Add blocks from 1st peer to want list - engine.wantList &= blocks[0..3].mapIt( it.cid ) - engine.wantList &= blocks[12..15].mapIt( it.cid ) + for b in blocks[0..3]: + discard engine.discoverBlock(b.cid) + for b in blocks[12..15]: + discard engine.discoverBlock(b.cid) await allFutures( blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) )) @@ -254,3 +263,71 @@ suite "NetworkStore - multiple nodes": let wantListBlocks = await allFinished( blocks[0..3].mapIt( downloader.getBlock(it.cid) )) check wantListBlocks.mapIt( !it.read ) == blocks[0..3] + +suite "NetworkStore - discovery": + let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) + + var + switch: seq[Switch] + blockexc: seq[NetworkStore] + blocks: seq[bt.Block] + + setup: + while true: + let chunk = await chunker.getBytes() + if chunk.len <= 0: + break + + blocks.add(bt.Block.new(chunk).tryGet()) + + for e in generateNodes(4): + switch.add(e.switch) + blockexc.add(e.blockexc) + await e.blockexc.engine.start() + + await allFuturesThrowing( + switch.mapIt( it.start() ) + ) + + teardown: + await allFuturesThrowing( + switch.mapIt( it.stop() ) + ) + + switch = @[] + blockexc = @[] + + test "Shouldn't launch discovery request if we are already connected": + await blockexc[0].engine.blocksHandler(switch[1].peerInfo.peerId, blocks) + blockexc[0].engine.discovery.findBlockProviders_var = proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] = + check false + await connectNodes(switch) + let blk = await blockexc[1].engine.requestBlock(blocks[0].cid) + + test "E2E discovery": + # Distribute the blocks amongst 1..3 + # Ask 0 to download everything without connecting him beforehand + + var advertised: Table[Cid, SignedPeerRecord] + + blockexc[1].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) = + advertised[cid] = switch[1].peerInfo.signedPeerRecord + + blockexc[2].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) = + advertised[cid] = switch[2].peerInfo.signedPeerRecord + + blockexc[3].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) = + advertised[cid] = switch[3].peerInfo.signedPeerRecord + + await blockexc[1].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[0..5]) + await blockexc[2].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[4..10]) + await blockexc[3].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[10..15]) + + blockexc[0].engine.discovery.findBlockProviders_var = proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] = + if cid in advertised: + result.add(advertised[cid]) + + let futs = collect(newSeq): + for b in blocks: + 
+        blockexc[0].engine.requestBlock(b.cid)
+    await allFutures(futs)
diff --git a/tests/dagger/blockexc/testengine.nim b/tests/dagger/blockexc/testengine.nim
index 885de748..9940c9c6 100644
--- a/tests/dagger/blockexc/testengine.nim
+++ b/tests/dagger/blockexc/testengine.nim
@@ -1,15 +1,19 @@
 import std/sequtils
 import std/random
+import std/algorithm
 
 import pkg/stew/byteutils
 import pkg/asynctest
 import pkg/chronos
 import pkg/libp2p
+import pkg/libp2p/routing_record
+import pkg/libp2pdht/discv5/protocol as discv5
 
 import pkg/dagger/rng
 import pkg/dagger/blockexchange
 import pkg/dagger/stores
 import pkg/dagger/chunker
+import pkg/dagger/discovery
 import pkg/dagger/blocktype as bt
 import pkg/dagger/utils/asyncheapqueue
 
@@ -23,6 +27,7 @@ suite "NetworkStore engine basic":
     peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet()
     chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
     wallet = WalletRef.example
+    discovery = Discovery.new()
 
   var
     blocks: seq[bt.Block]
@@ -47,7 +52,7 @@
       wantType: WantType = WantType.wantHave,
      full: bool = false,
       sendDontHave: bool = false) {.gcsafe.} =
-        check cids == blocks.mapIt( it.cid )
+        check cids.mapIt($it).sorted == blocks.mapIt( $it.cid ).sorted
 
         done.complete()
 
@@ -59,8 +64,10 @@
      engine = BlockExcEngine.new(
        CacheStore.new(blocks.mapIt( it )),
        wallet,
-        network)
-    engine.wantList = blocks.mapIt( it.cid )
+        network,
+        discovery)
+    for b in blocks:
+      discard engine.discoverBlock(b.cid)
     engine.setupPeer(peerId)
 
     await done
@@ -77,7 +84,7 @@
         sendAccount: sendAccount,
       ))
 
-      engine = BlockExcEngine.new(CacheStore.new, wallet, network)
+      engine = BlockExcEngine.new(CacheStore.new, wallet, network, discovery)
 
     engine.pricing = pricing.some
     engine.setupPeer(peerId)
@@ -90,6 +97,7 @@ suite "NetworkStore engine handlers":
     peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet()
     chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
     wallet = WalletRef.example
+    discovery = Discovery.new()
 
   var
     engine: BlockExcEngine
@@ -106,7 +114,7 @@
      blocks.add(bt.Block.new(chunk).tryGet())
 
     done = newFuture[void]()
-    engine = BlockExcEngine.new(CacheStore.new(), wallet, BlockExcNetwork())
+    engine = BlockExcEngine.new(CacheStore.new(), wallet, BlockExcNetwork(), discovery)
     peerCtx = BlockExcPeerCtx(
       id: peerId
    )
@@ -230,7 +238,7 @@ suite "Task Handler":
      blocks.add(bt.Block.new(chunk).tryGet())
 
     done = newFuture[void]()
-    engine = BlockExcEngine.new(CacheStore.new(), wallet, BlockExcNetwork())
+    engine = BlockExcEngine.new(CacheStore.new(), wallet, BlockExcNetwork(), Discovery.new())
     peersCtx = @[]
 
     for i in 0..3:
diff --git a/tests/dagger/helpers/nodeutils.nim b/tests/dagger/helpers/nodeutils.nim
index dc71d19f..e1ce5027 100644
--- a/tests/dagger/helpers/nodeutils.nim
+++ b/tests/dagger/helpers/nodeutils.nim
@@ -3,6 +3,7 @@ import std/sequtils
 import pkg/chronos
 import pkg/libp2p
 
+import pkg/dagger/discovery
 import pkg/dagger/stores
 import pkg/dagger/blocktype as bt
 
@@ -17,16 +18,15 @@ proc generateNodes*(
   for i in 0..