Fix cid with proper CIDv1 code.

Fix daemonapi to use proper Cid type.
Make daemonapi cid test more complex.
This commit is contained in:
cheatfate 2018-12-16 15:51:12 +02:00
parent d96756f6e3
commit bf2737525d
3 changed files with 80 additions and 45 deletions

View File

@ -207,9 +207,8 @@ proc init*(ctype: typedesc[Cid], version: CidVersion, content: MultiCodec,
raise newException(CidError, "Incorrect content type") raise newException(CidError, "Incorrect content type")
result.mcodec = mcodec result.mcodec = mcodec
result.data = initVBuffer() result.data = initVBuffer()
result.data.writeVarint(cast[uint64](version)) result.data.writeVarint(cast[uint64](1))
result.data.write(hash.mcodec) result.data.write(mcodec)
result.data.writeVarint(cast[uint64](content))
result.hpos = len(result.data.buffer) result.hpos = len(result.data.buffer)
result.data.write(hash) result.data.write(hash)
result.data.finish() result.data.finish()

View File

@ -10,7 +10,8 @@
## This module implementes API for `go-libp2p-daemon`. ## This module implementes API for `go-libp2p-daemon`.
import os, osproc, strutils, tables, streams, strtabs import os, osproc, strutils, tables, streams, strtabs
import asyncdispatch2 import asyncdispatch2
import ../varint, ../multiaddress, ../protobuf/minprotobuf, ../base58 import ../varint, ../multiaddress, ../base58, ../cid
import ../protobuf/minprotobuf
when not defined(windows): when not defined(windows):
import posix import posix
@ -74,17 +75,24 @@ type
PeerID* = seq[byte] PeerID* = seq[byte]
MultiProtocol* = string MultiProtocol* = string
CID* = seq[byte]
LibP2PPublicKey* = seq[byte] LibP2PPublicKey* = seq[byte]
DHTValue* = seq[byte] DHTValue* = seq[byte]
P2PStreamFlags* {.pure.} = enum P2PStreamFlags* {.pure.} = enum
None, Closed, Inbound, Outbound None, Closed, Inbound, Outbound
P2PDaemonFlags* {.pure.} = enum P2PDaemonFlags* = enum
DHTClient, DHTFull, Bootstrap, DHTClient, ## Start daemon in DHT client mode
Logging, Verbose, DHTFull, ## Start daemon with full DHT support
PSFloodSub, PSGossipSub, PSSign, PSStrictSign Bootstrap, ## Start daemon with bootstrap
WaitBootstrap, ## Start daemon with bootstrap and wait until daemon
## establish connection to at least 2 peers
Logging, ## Enable capture daemon `stderr`
Verbose, ## Set daemon logging to DEBUG level
PSFloodSub, ## Enable `FloodSub` protocol in daemon
PSGossipSub, ## Enable `GossipSub` protocol in daemon
PSNoSign, ## Disable pubsub message signing (default true)
PSStrictSign ## Force strict checking pubsub message signature
P2PStream* = ref object P2PStream* = ref object
flags*: set[P2PStreamFlags] flags*: set[P2PStreamFlags]
@ -239,7 +247,7 @@ proc requestDHTFindPeersConnectedToPeer(peer: PeerID,
result.write(initProtoField(5, msg)) result.write(initProtoField(5, msg))
result.finish() result.finish()
proc requestDHTFindProviders(cid: CID, proc requestDHTFindProviders(cid: Cid,
count: uint32, timeout = 0): ProtoBuffer = count: uint32, timeout = 0): ProtoBuffer =
## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go
## Processing function `doDHTFindProviders(req *pb.DHTRequest)`. ## Processing function `doDHTFindProviders(req *pb.DHTRequest)`.
@ -247,7 +255,7 @@ proc requestDHTFindProviders(cid: CID,
result = initProtoBuffer({WithVarintLength}) result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer() var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid)) msg.write(initProtoField(1, msgid))
msg.write(initProtoField(3, cid)) msg.write(initProtoField(3, cid.data.buffer))
msg.write(initProtoField(6, count)) msg.write(initProtoField(6, count))
if timeout > 0: if timeout > 0:
msg.write(initProtoField(7, uint(timeout))) msg.write(initProtoField(7, uint(timeout)))
@ -333,14 +341,14 @@ proc requestDHTPutValue(key: string, value: openarray[byte],
result.write(initProtoField(5, msg)) result.write(initProtoField(5, msg))
result.finish() result.finish()
proc requestDHTProvide(cid: CID, timeout = 0): ProtoBuffer = proc requestDHTProvide(cid: Cid, timeout = 0): ProtoBuffer =
## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go
## Processing function `doDHTProvide(req *pb.DHTRequest)`. ## Processing function `doDHTProvide(req *pb.DHTRequest)`.
let msgid = cast[uint](DHTRequestType.PROVIDE) let msgid = cast[uint](DHTRequestType.PROVIDE)
result = initProtoBuffer({WithVarintLength}) result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer() var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid)) msg.write(initProtoField(1, msgid))
msg.write(initProtoField(3, cid)) msg.write(initProtoField(3, cid.data.buffer))
if timeout > 0: if timeout > 0:
msg.write(initProtoField(7, uint(timeout))) msg.write(initProtoField(7, uint(timeout)))
msg.finish() msg.finish()
@ -515,6 +523,9 @@ else:
# Not ready yet. # Not ready yet.
discard discard
# This is forward declaration needed for newDaemonApi()
proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.}
proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
bootstrapNodes: seq[string] = @[], bootstrapNodes: seq[string] = @[],
id: string = "", id: string = "",
@ -523,8 +534,9 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
pattern = "/tmp/nim-p2pd-$1.sock", pattern = "/tmp/nim-p2pd-$1.sock",
poolSize = 10, poolSize = 10,
gossipsubHeartbeatInterval = 0, gossipsubHeartbeatInterval = 0,
gossipsubHeartbeatDelay = 0): Future[DaemonAPI] {.async.} = gossipsubHeartbeatDelay = 0,
## Initialize connections to `go-libp2p-daemon` control socket. peersRequired = 2): Future[DaemonAPI] {.async.} =
## Initialize connection to `go-libp2p-daemon` control socket.
var api = new DaemonAPI var api = new DaemonAPI
var args = newSeq[string]() var args = newSeq[string]()
var env: StringTableRef var env: StringTableRef
@ -551,21 +563,20 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
discard discard
else: else:
# DHTFull and DHTClient could not be present at the same time # DHTFull and DHTClient could not be present at the same time
if P2PDaemonFlags.DHTFull in flags and P2PDaemonFlags.DHTClient in flags: if DHTFull in flags and DHTClient in flags:
api.flags.excl(DHTClient) api.flags.excl(DHTClient)
# PSGossipSub and PSFloodSub could not be present at the same time # PSGossipSub and PSFloodSub could not be present at the same time
if P2PDaemonFlags.PSGossipSub in flags and if PSGossipSub in flags and PSFloodSub in flags:
P2PDaemonFlags.PSFloodSub in flags:
api.flags.excl(PSFloodSub) api.flags.excl(PSFloodSub)
if P2PDaemonFlags.DHTFull in api.flags: if DHTFull in api.flags:
args.add("-dht") args.add("-dht")
if P2PDaemonFlags.DHTClient in api.flags: if DHTClient in api.flags:
args.add("-dhtClient") args.add("-dhtClient")
if P2PDaemonFlags.Bootstrap in api.flags: if {Bootstrap, WaitBootstrap} * api.flags != {}:
args.add("-b") args.add("-b")
if P2PDaemonFlags.Verbose in api.flags: if Verbose in api.flags:
env = newStringTable("IPFS_LOGGING", "debug", modeCaseSensitive) env = newStringTable("IPFS_LOGGING", "debug", modeCaseSensitive)
if P2PDaemonFlags.PSGossipSub in api.flags: if PSGossipSub in api.flags:
args.add("-pubsub") args.add("-pubsub")
args.add("-pubsubRouter=gossipsub") args.add("-pubsubRouter=gossipsub")
if gossipsubHeartbeatInterval != 0: if gossipsubHeartbeatInterval != 0:
@ -574,13 +585,13 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
if gossipsubHeartbeatDelay != 0: if gossipsubHeartbeatDelay != 0:
let param = $gossipsubHeartbeatDelay & "ms" let param = $gossipsubHeartbeatDelay & "ms"
args.add("-gossipsubHeartbeatInitialDelay=" & param) args.add("-gossipsubHeartbeatInitialDelay=" & param)
if P2PDaemonFlags.PSFloodSub in api.flags: if PSFloodSub in api.flags:
args.add("-pubsub") args.add("-pubsub")
args.add("-pubsubRouter=floodsub") args.add("-pubsubRouter=floodsub")
if api.flags * {P2PDaemonFlags.PSFloodSub, P2PDaemonFlags.PSFloodSub} != {}: if api.flags * {PSFloodSub, PSGossipSub} != {}:
if P2PDaemonFlags.PSSign in api.flags: if PSNoSign in api.flags:
args.add("-pubsubSign=true") args.add("-pubsubSign=false")
if P2PDaemonFlags.PSStrictSign in api.flags: if PSStrictSign in api.flags:
args.add("-pubsubSignStrict=true") args.add("-pubsubSignStrict=true")
if len(bootstrapNodes) > 0: if len(bootstrapNodes) > 0:
args.add("-bootstrapPeers=" & bootstrapNodes.join(",")) args.add("-bootstrapPeers=" & bootstrapNodes.join(","))
@ -602,7 +613,6 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
if api.sockname != sockpath: if api.sockname != sockpath:
raise newException(DaemonLocalError, "Socket is already bound!") raise newException(DaemonLocalError, "Socket is already bound!")
# Starting daemon process # Starting daemon process
# echo "Spawn [", cmd, " ", args.join(" "), "]"
api.process = startProcess(cmd, "", args, env, {poStdErrToStdOut}) api.process = startProcess(cmd, "", args, env, {poStdErrToStdOut})
# Waiting until daemon will not be bound to control socket. # Waiting until daemon will not be bound to control socket.
while true: while true:
@ -614,8 +624,17 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
break break
await sleepAsync(100) await sleepAsync(100)
# api.pool = await newPool(api.address, poolsize = poolSize) # api.pool = await newPool(api.address, poolsize = poolSize)
if P2PDaemonFlags.Logging in api.flags: if Logging in api.flags:
api.loggerFut = loggingHandler(api) api.loggerFut = loggingHandler(api)
if WaitBootstrap in api.flags:
while true:
var peers = await listPeers(api)
echo len(peers)
if len(peers) >= peersRequired:
break
await sleepAsync(1000)
result = api result = api
proc close*(stream: P2PStream) {.async.} = proc close*(stream: P2PStream) {.async.} =
@ -946,7 +965,7 @@ proc dhtPutValue*(api: DaemonAPI, key: string, value: seq[byte],
finally: finally:
await api.closeConnection(transp) await api.closeConnection(transp)
proc dhtProvide*(api: DaemonAPI, cid: CID, timeout = 0) {.async.} = proc dhtProvide*(api: DaemonAPI, cid: Cid, timeout = 0) {.async.} =
## Provide content with id ``cid``. ## Provide content with id ``cid``.
## ##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@ -1009,7 +1028,7 @@ proc dhtGetClosestPeers*(api: DaemonAPI, key: string,
finally: finally:
await api.closeConnection(transp) await api.closeConnection(transp)
proc dhtFindProviders*(api: DaemonAPI, cid: CID, count: uint32, proc dhtFindProviders*(api: DaemonAPI, cid: Cid, count: uint32,
timeout = 0): Future[seq[PeerInfo]] {.async.} = timeout = 0): Future[seq[PeerInfo]] {.async.} =
## Get ``count`` providers for content with id ``cid``. ## Get ``count`` providers for content with id ``cid``.
## ##

View File

@ -1,6 +1,7 @@
import unittest import unittest
import asyncdispatch2 import asyncdispatch2
import ../libp2p/daemon/daemonapi, ../libp2p/multiaddress, ../libp2p/multicodec import ../libp2p/daemon/daemonapi, ../libp2p/multiaddress, ../libp2p/multicodec,
../libp2p/cid, ../libp2p/multihash
proc identitySpawnTest(): Future[bool] {.async.} = proc identitySpawnTest(): Future[bool] {.async.} =
var api = await newDaemonApi() var api = await newDaemonApi()
@ -37,16 +38,32 @@ proc connectStreamTest(): Future[bool] {.async.} =
await api2.close() await api2.close()
result = true result = true
proc provideBadCidTest(): Future[bool] {.async.} = proc provideCidTest(): Future[bool] {.async.} =
var cid = newSeq[byte](10) var api1 = await newDaemonApi({DHTFull})
var api = await newDaemonApi({DHTFull}) var api2 = await newDaemonApi({DHTFull})
try: var msg = "ethereum2-beacon-chain"
await api.dhtProvide(cid) var bmsg = cast[seq[byte]](msg)
result = false var mh = MultiHash.digest("sha2-256", bmsg)
except DaemonRemoteError: var cid = Cid.init(CIDv1, multiCodec("dag-pb"), mh)
result = true
finally: var id1 = await api1.identity()
await api.close() var id2 = await api2.identity()
await api1.connect(id2.peer, id2.addresses)
while true:
var peers = await api1.listPeers()
if len(peers) != 0:
break
await api1.dhtProvide(cid)
var peers = await api2.dhtFindProviders(cid, 10)
if len(peers) == 1:
if peers[0].peer == id1.peer:
result = true
await api1.close()
await api2.close()
# proc getOnlyOneIPv4Address(addresses: seq[MultiAddress]): seq[MultiAddress] = # proc getOnlyOneIPv4Address(addresses: seq[MultiAddress]): seq[MultiAddress] =
# ## We doing this becuase of bug in `go-pubsub` # ## We doing this becuase of bug in `go-pubsub`
@ -134,9 +151,9 @@ when isMainModule:
test "Connect/Accept peer/stream test": test "Connect/Accept peer/stream test":
check: check:
waitFor(connectStreamTest()) == true waitFor(connectStreamTest()) == true
test "DHT provide bad CID test": test "Provide CID test":
check: check:
waitFor(provideBadCidTest()) == true waitFor(provideCidTest()) == true
test "GossipSub test": test "GossipSub test":
check: check:
waitFor(pubsubTest({PSGossipSub})) == true waitFor(pubsubTest({PSGossipSub})) == true