Add logging go-libp2p-daemon.

Comment FloodSub test.
This commit is contained in:
cheatfate 2018-12-12 16:08:55 +02:00
parent 7d60e22782
commit 0eb649c990
4 changed files with 66 additions and 40 deletions

View File

@ -18,4 +18,4 @@ task test, "Runs the test suite":
exec "nim c -r tests/testmultiaddress"
exec "nim c -r tests/testmultihash"
exec "nim c -r tests/testmultibase"
exec "nim c -r tests/testdaemon"
exec "nim c -r tests/testdaemon"

View File

@ -8,7 +8,7 @@
## those terms.
## This module implementes API for `go-libp2p-daemon`.
import os, osproc, strutils, tables, streams
import os, osproc, strutils, tables, streams, strtabs
import asyncdispatch2
import ../varint, ../multiaddress, ../protobuf/minprotobuf, ../base58
@ -83,6 +83,7 @@ type
P2PDaemonFlags* {.pure.} = enum
DHTClient, DHTFull, Bootstrap,
Logging, Verbose,
PSFloodSub, PSGossipSub, PSSign, PSStrictSign
P2PStream* = ref object
@ -106,6 +107,8 @@ type
process*: Process
handlers*: Table[string, P2PStreamCallback]
servers*: seq[P2PServer]
log*: string
loggerFut*: Future[void]
PeerInfo* = object
peer*: PeerID
@ -483,6 +486,25 @@ proc socketExists(filename: string): bool =
var res: Stat
result = stat(filename, res) >= 0'i32
proc loggingHandler(api: DaemonAPI): Future[void] =
var retFuture = newFuture[void]("logging.handler")
var loop = getGlobalDispatcher()
let fd = wrapAsyncSocket(SocketHandle(api.process.outputHandle))
proc readOutputLoop(udata: pointer) {.gcsafe.} =
var buffer: array[2048, char]
let res = posix.read(cint(fd), addr buffer[0], 2000)
if res == -1 or res == 0:
removeReader(fd)
retFuture.complete()
else:
var cstr = cast[cstring](addr buffer[0])
api.log.add(cstr)
# let offset = len(api.log)
# api.log.setLen(offset + res)
# copyMem(addr api.log[offset], addr buffer[0], res)
addReader(fd, readOutputLoop, nil)
result = retFuture
proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
bootstrapNodes: seq[string] = @[],
id: string = "",
@ -495,6 +517,8 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
## Initialize connections to `go-libp2p-daemon` control socket.
var api = new DaemonAPI
var args = newSeq[string]()
var env: StringTableRef
api.flags = flags
api.servers = newSeq[P2PServer]()
api.pattern = pattern
@ -529,6 +553,8 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
args.add("-dhtClient")
if P2PDaemonFlags.Bootstrap in api.flags:
args.add("-b")
if P2PDaemonFlags.Verbose in api.flags:
env = newStringTable("IPFS_LOGGING", "debug", modeCaseSensitive)
if P2PDaemonFlags.PSGossipSub in api.flags:
args.add("-pubsub")
args.add("-pubsubRouter=gossipsub")
@ -567,7 +593,7 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
raise newException(DaemonLocalError, "Socket is already bound!")
# Starting daemon process
# echo "Spawn [", cmd, " ", args.join(" "), "]"
api.process = startProcess(cmd, "", args, options = {poStdErrToStdOut})
api.process = startProcess(cmd, "", args, env, {poStdErrToStdOut})
# Waiting until daemon will not be bound to control socket.
while true:
if not api.process.running():
@ -578,6 +604,8 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
break
await sleepAsync(100)
# api.pool = await newPool(api.address, poolsize = poolSize)
if P2PDaemonFlags.Logging in api.flags:
api.loggerFut = loggingHandler(api)
result = api
proc close*(stream: P2PStream) {.async.} =
@ -607,6 +635,9 @@ proc close*(api: DaemonAPI) {.async.} =
# Closing daemon's process.
api.process.kill()
discard api.process.waitForExit()
# Waiting for logger loop to exit
if not isNil(api.loggerFut):
await api.loggerFut
# Attempt to delete control socket endpoint.
if socketExists(api.sockname):
discard tryRemoveFile(api.sockname)

View File

@ -26,7 +26,7 @@ proc connectStreamTest(): Future[bool] {.async.} =
await api2.addHandler(protos, streamHandler)
await api1.connect(id2.peer, id2.addresses)
echo await api1.listPeers()
# echo await api1.listPeers()
var stream = await api1.openStream(id2.peer, protos)
let sent = await stream.transp.write(test & "\r\n")
doAssert(sent == len(test) + 2)
@ -48,13 +48,13 @@ proc provideBadCidTest(): Future[bool] {.async.} =
finally:
await api.close()
proc getOnlyIPv4Addresses(addresses: seq[MultiAddress]): seq[MultiAddress] =
if len(addresses) > 0:
result = newSeqOfCap[MultiAddress](len(addresses))
let ip4 = multiCodec("ip4")
for item in addresses:
if item.protoCode() == ip4:
result.add(item)
# proc getOnlyIPv4Addresses(addresses: seq[MultiAddress]): seq[MultiAddress] =
# if len(addresses) > 0:
# result = newSeqOfCap[MultiAddress](len(addresses))
# let ip4 = multiCodec("ip4")
# for item in addresses:
# if item.protoCode() == ip4:
# result.add(item)
proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
var pubsubData = "TEST MESSAGE"
@ -67,15 +67,12 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
var id1 = await api1.identity()
var id2 = await api2.identity()
echo $id1
echo $id2
var resultsCount = 0
var topics10 = await api1.pubsubGetTopics()
var peers10 = await api1.pubsubListPeers("test-topic")
var topics20 = await api2.pubsubGetTopics()
var peers20 = await api2.pubsubListPeers("test-topic")
# var topics10 = await api1.pubsubGetTopics()
# var peers10 = await api1.pubsubListPeers("test-topic")
# var topics20 = await api2.pubsubGetTopics()
# var peers20 = await api2.pubsubListPeers("test-topic")
var handlerFuture1 = newFuture[void]()
var handlerFuture2 = newFuture[void]()
@ -100,28 +97,27 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
# Callback must return `false` to close subscription channel.
result = false
if len(topics10) == 0 and len(peers10) == 0 and
len(topics20) == 0 and len(peers20) == 0:
# Not subscribed to any topics everything must be 0.
# Not subscribed to any topics everything must be 0.
await api1.connect(id2.peer, id2.addresses)
await api2.connect(id1.peer, id1.addresses)
await api1.connect(id2.peer, getOnlyIPv4Addresses(id2.addresses))
await api2.connect(id1.peer, getOnlyIPv4Addresses(id1.addresses))
var ticket1 = await api1.pubsubSubscribe("test-topic", pubsubHandler1)
var ticket2 = await api2.pubsubSubscribe("test-topic", pubsubHandler2)
var ticket1 = await api1.pubsubSubscribe("test-topic", pubsubHandler1)
var ticket2 = await api2.pubsubSubscribe("test-topic", pubsubHandler2)
await sleepAsync(2000)
var topics1 = await api1.pubsubGetTopics()
var topics2 = await api2.pubsubGetTopics()
var topics1 = await api1.pubsubGetTopics()
var topics2 = await api2.pubsubGetTopics()
if len(topics1) == 1 and len(topics2) == 1:
var peers1 = await api1.pubsubListPeers("test-topic")
var peers2 = await api2.pubsubListPeers("test-topic")
if len(peers1) == 1 and len(peers2) == 1:
# Publish test data via api1.
await sleepAsync(500)
await api1.pubsubPublish("test-topic", msgData)
var andfut = handlerFuture1 and handlerFuture2
await andfut or sleepAsync(10000)
if len(topics1) == 1 and len(topics2) == 1:
var peers1 = await api1.pubsubListPeers("test-topic")
var peers2 = await api2.pubsubListPeers("test-topic")
if len(peers1) == 1 and len(peers2) == 1:
# Publish test data via api1.
await sleepAsync(500)
await api1.pubsubPublish("test-topic", msgData)
var andfut = handlerFuture1 and handlerFuture2
await andfut or sleepAsync(10000)
await api1.close()
await api2.close()
@ -142,6 +138,6 @@ when isMainModule:
test "GossipSub test":
check:
waitFor(pubsubTest({PSGossipSub})) == true
test "FloodSub test":
check:
waitFor(pubsubTest({PSFloodSub})) == true
# test "FloodSub test":
# check:
# waitFor(pubsubTest({PSFloodSub})) == true

View File

@ -304,4 +304,3 @@ suite "MultiBase test suite":
check:
r1 == true
r2 == true