diff --git a/libp2p/cid.nim b/libp2p/cid.nim index 3594faaf1..93c88a42f 100644 --- a/libp2p/cid.nim +++ b/libp2p/cid.nim @@ -207,9 +207,8 @@ proc init*(ctype: typedesc[Cid], version: CidVersion, content: MultiCodec, raise newException(CidError, "Incorrect content type") result.mcodec = mcodec result.data = initVBuffer() - result.data.writeVarint(cast[uint64](version)) - result.data.write(hash.mcodec) - result.data.writeVarint(cast[uint64](content)) + result.data.writeVarint(cast[uint64](1)) + result.data.write(mcodec) result.hpos = len(result.data.buffer) result.data.write(hash) result.data.finish() diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index ccbdbae16..ec3de5190 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -10,7 +10,8 @@ ## This module implementes API for `go-libp2p-daemon`. import os, osproc, strutils, tables, streams, strtabs import asyncdispatch2 -import ../varint, ../multiaddress, ../protobuf/minprotobuf, ../base58 +import ../varint, ../multiaddress, ../base58, ../cid +import ../protobuf/minprotobuf when not defined(windows): import posix @@ -74,17 +75,24 @@ type PeerID* = seq[byte] MultiProtocol* = string - CID* = seq[byte] LibP2PPublicKey* = seq[byte] DHTValue* = seq[byte] P2PStreamFlags* {.pure.} = enum None, Closed, Inbound, Outbound - P2PDaemonFlags* {.pure.} = enum - DHTClient, DHTFull, Bootstrap, - Logging, Verbose, - PSFloodSub, PSGossipSub, PSSign, PSStrictSign + P2PDaemonFlags* = enum + DHTClient, ## Start daemon in DHT client mode + DHTFull, ## Start daemon with full DHT support + Bootstrap, ## Start daemon with bootstrap + WaitBootstrap, ## Start daemon with bootstrap and wait until daemon + ## establish connection to at least 2 peers + Logging, ## Enable capture daemon `stderr` + Verbose, ## Set daemon logging to DEBUG level + PSFloodSub, ## Enable `FloodSub` protocol in daemon + PSGossipSub, ## Enable `GossipSub` protocol in daemon + PSNoSign, ## Disable pubsub message signing (default true) + PSStrictSign ## Force strict checking pubsub message signature P2PStream* = ref object flags*: set[P2PStreamFlags] @@ -239,7 +247,7 @@ proc requestDHTFindPeersConnectedToPeer(peer: PeerID, result.write(initProtoField(5, msg)) result.finish() -proc requestDHTFindProviders(cid: CID, +proc requestDHTFindProviders(cid: Cid, count: uint32, timeout = 0): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go ## Processing function `doDHTFindProviders(req *pb.DHTRequest)`. @@ -247,7 +255,7 @@ proc requestDHTFindProviders(cid: CID, result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(initProtoField(1, msgid)) - msg.write(initProtoField(3, cid)) + msg.write(initProtoField(3, cid.data.buffer)) msg.write(initProtoField(6, count)) if timeout > 0: msg.write(initProtoField(7, uint(timeout))) @@ -333,14 +341,14 @@ proc requestDHTPutValue(key: string, value: openarray[byte], result.write(initProtoField(5, msg)) result.finish() -proc requestDHTProvide(cid: CID, timeout = 0): ProtoBuffer = +proc requestDHTProvide(cid: Cid, timeout = 0): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go ## Processing function `doDHTProvide(req *pb.DHTRequest)`. let msgid = cast[uint](DHTRequestType.PROVIDE) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(initProtoField(1, msgid)) - msg.write(initProtoField(3, cid)) + msg.write(initProtoField(3, cid.data.buffer)) if timeout > 0: msg.write(initProtoField(7, uint(timeout))) msg.finish() @@ -515,6 +523,9 @@ else: # Not ready yet. discard +# This is forward declaration needed for newDaemonApi() +proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} + proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, bootstrapNodes: seq[string] = @[], id: string = "", @@ -523,8 +534,9 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, pattern = "/tmp/nim-p2pd-$1.sock", poolSize = 10, gossipsubHeartbeatInterval = 0, - gossipsubHeartbeatDelay = 0): Future[DaemonAPI] {.async.} = - ## Initialize connections to `go-libp2p-daemon` control socket. + gossipsubHeartbeatDelay = 0, + peersRequired = 2): Future[DaemonAPI] {.async.} = + ## Initialize connection to `go-libp2p-daemon` control socket. var api = new DaemonAPI var args = newSeq[string]() var env: StringTableRef @@ -551,21 +563,20 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, discard else: # DHTFull and DHTClient could not be present at the same time - if P2PDaemonFlags.DHTFull in flags and P2PDaemonFlags.DHTClient in flags: + if DHTFull in flags and DHTClient in flags: api.flags.excl(DHTClient) # PSGossipSub and PSFloodSub could not be present at the same time - if P2PDaemonFlags.PSGossipSub in flags and - P2PDaemonFlags.PSFloodSub in flags: + if PSGossipSub in flags and PSFloodSub in flags: api.flags.excl(PSFloodSub) - if P2PDaemonFlags.DHTFull in api.flags: + if DHTFull in api.flags: args.add("-dht") - if P2PDaemonFlags.DHTClient in api.flags: + if DHTClient in api.flags: args.add("-dhtClient") - if P2PDaemonFlags.Bootstrap in api.flags: + if {Bootstrap, WaitBootstrap} * api.flags != {}: args.add("-b") - if P2PDaemonFlags.Verbose in api.flags: + if Verbose in api.flags: env = newStringTable("IPFS_LOGGING", "debug", modeCaseSensitive) - if P2PDaemonFlags.PSGossipSub in api.flags: + if PSGossipSub in api.flags: args.add("-pubsub") args.add("-pubsubRouter=gossipsub") if gossipsubHeartbeatInterval != 0: @@ -574,13 +585,13 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, if gossipsubHeartbeatDelay != 0: let param = $gossipsubHeartbeatDelay & "ms" args.add("-gossipsubHeartbeatInitialDelay=" & param) - if P2PDaemonFlags.PSFloodSub in api.flags: + if PSFloodSub in api.flags: args.add("-pubsub") args.add("-pubsubRouter=floodsub") - if api.flags * {P2PDaemonFlags.PSFloodSub, P2PDaemonFlags.PSFloodSub} != {}: - if P2PDaemonFlags.PSSign in api.flags: - args.add("-pubsubSign=true") - if P2PDaemonFlags.PSStrictSign in api.flags: + if api.flags * {PSFloodSub, PSGossipSub} != {}: + if PSNoSign in api.flags: + args.add("-pubsubSign=false") + if PSStrictSign in api.flags: args.add("-pubsubSignStrict=true") if len(bootstrapNodes) > 0: args.add("-bootstrapPeers=" & bootstrapNodes.join(",")) @@ -602,7 +613,6 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, if api.sockname != sockpath: raise newException(DaemonLocalError, "Socket is already bound!") # Starting daemon process - # echo "Spawn [", cmd, " ", args.join(" "), "]" api.process = startProcess(cmd, "", args, env, {poStdErrToStdOut}) # Waiting until daemon will not be bound to control socket. while true: @@ -614,8 +624,17 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, break await sleepAsync(100) # api.pool = await newPool(api.address, poolsize = poolSize) - if P2PDaemonFlags.Logging in api.flags: + if Logging in api.flags: api.loggerFut = loggingHandler(api) + + if WaitBootstrap in api.flags: + while true: + var peers = await listPeers(api) + echo len(peers) + if len(peers) >= peersRequired: + break + await sleepAsync(1000) + result = api proc close*(stream: P2PStream) {.async.} = @@ -946,7 +965,7 @@ proc dhtPutValue*(api: DaemonAPI, key: string, value: seq[byte], finally: await api.closeConnection(transp) -proc dhtProvide*(api: DaemonAPI, cid: CID, timeout = 0) {.async.} = +proc dhtProvide*(api: DaemonAPI, cid: Cid, timeout = 0) {.async.} = ## Provide content with id ``cid``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value @@ -1009,7 +1028,7 @@ proc dhtGetClosestPeers*(api: DaemonAPI, key: string, finally: await api.closeConnection(transp) -proc dhtFindProviders*(api: DaemonAPI, cid: CID, count: uint32, +proc dhtFindProviders*(api: DaemonAPI, cid: Cid, count: uint32, timeout = 0): Future[seq[PeerInfo]] {.async.} = ## Get ``count`` providers for content with id ``cid``. ## diff --git a/tests/testdaemon.nim b/tests/testdaemon.nim index 13d8b38a2..a7a0e12b7 100644 --- a/tests/testdaemon.nim +++ b/tests/testdaemon.nim @@ -1,6 +1,7 @@ import unittest import asyncdispatch2 -import ../libp2p/daemon/daemonapi, ../libp2p/multiaddress, ../libp2p/multicodec +import ../libp2p/daemon/daemonapi, ../libp2p/multiaddress, ../libp2p/multicodec, + ../libp2p/cid, ../libp2p/multihash proc identitySpawnTest(): Future[bool] {.async.} = var api = await newDaemonApi() @@ -37,16 +38,32 @@ proc connectStreamTest(): Future[bool] {.async.} = await api2.close() result = true -proc provideBadCidTest(): Future[bool] {.async.} = - var cid = newSeq[byte](10) - var api = await newDaemonApi({DHTFull}) - try: - await api.dhtProvide(cid) - result = false - except DaemonRemoteError: - result = true - finally: - await api.close() +proc provideCidTest(): Future[bool] {.async.} = + var api1 = await newDaemonApi({DHTFull}) + var api2 = await newDaemonApi({DHTFull}) + var msg = "ethereum2-beacon-chain" + var bmsg = cast[seq[byte]](msg) + var mh = MultiHash.digest("sha2-256", bmsg) + var cid = Cid.init(CIDv1, multiCodec("dag-pb"), mh) + + var id1 = await api1.identity() + var id2 = await api2.identity() + + await api1.connect(id2.peer, id2.addresses) + while true: + var peers = await api1.listPeers() + if len(peers) != 0: + break + + await api1.dhtProvide(cid) + var peers = await api2.dhtFindProviders(cid, 10) + + if len(peers) == 1: + if peers[0].peer == id1.peer: + result = true + + await api1.close() + await api2.close() # proc getOnlyOneIPv4Address(addresses: seq[MultiAddress]): seq[MultiAddress] = # ## We doing this becuase of bug in `go-pubsub` @@ -134,9 +151,9 @@ when isMainModule: test "Connect/Accept peer/stream test": check: waitFor(connectStreamTest()) == true - test "DHT provide bad CID test": + test "Provide CID test": check: - waitFor(provideBadCidTest()) == true + waitFor(provideCidTest()) == true test "GossipSub test": check: waitFor(pubsubTest({PSGossipSub})) == true