From 0eb649c9908062b83fcd9a67dc9d62050e124641 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Wed, 12 Dec 2018 16:08:55 +0200 Subject: [PATCH] Add logging `go-libp2p-daemon`. Comment FloodSub test. --- libp2p.nimble | 2 +- libp2p/daemon/daemonapi.nim | 35 +++++++++++++++++-- tests/testdaemon.nim | 68 +++++++++++++++++-------------------- tests/testmultibase.nim | 1 - 4 files changed, 66 insertions(+), 40 deletions(-) diff --git a/libp2p.nimble b/libp2p.nimble index 13bd602e8..82a412c35 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -18,4 +18,4 @@ task test, "Runs the test suite": exec "nim c -r tests/testmultiaddress" exec "nim c -r tests/testmultihash" exec "nim c -r tests/testmultibase" - exec "nim c -r tests/testdaemon" \ No newline at end of file + exec "nim c -r tests/testdaemon" diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index b56f9f94b..be1569063 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -8,7 +8,7 @@ ## those terms. ## This module implementes API for `go-libp2p-daemon`. -import os, osproc, strutils, tables, streams +import os, osproc, strutils, tables, streams, strtabs import asyncdispatch2 import ../varint, ../multiaddress, ../protobuf/minprotobuf, ../base58 @@ -83,6 +83,7 @@ type P2PDaemonFlags* {.pure.} = enum DHTClient, DHTFull, Bootstrap, + Logging, Verbose, PSFloodSub, PSGossipSub, PSSign, PSStrictSign P2PStream* = ref object @@ -106,6 +107,8 @@ type process*: Process handlers*: Table[string, P2PStreamCallback] servers*: seq[P2PServer] + log*: string + loggerFut*: Future[void] PeerInfo* = object peer*: PeerID @@ -483,6 +486,25 @@ proc socketExists(filename: string): bool = var res: Stat result = stat(filename, res) >= 0'i32 +proc loggingHandler(api: DaemonAPI): Future[void] = + var retFuture = newFuture[void]("logging.handler") + var loop = getGlobalDispatcher() + let fd = wrapAsyncSocket(SocketHandle(api.process.outputHandle)) + proc readOutputLoop(udata: pointer) {.gcsafe.} = + var buffer: array[2048, char] + let res = posix.read(cint(fd), addr buffer[0], 2000) + if res == -1 or res == 0: + removeReader(fd) + retFuture.complete() + else: + var cstr = cast[cstring](addr buffer[0]) + api.log.add(cstr) + # let offset = len(api.log) + # api.log.setLen(offset + res) + # copyMem(addr api.log[offset], addr buffer[0], res) + addReader(fd, readOutputLoop, nil) + result = retFuture + proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, bootstrapNodes: seq[string] = @[], id: string = "", @@ -495,6 +517,8 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, ## Initialize connections to `go-libp2p-daemon` control socket. var api = new DaemonAPI var args = newSeq[string]() + var env: StringTableRef + api.flags = flags api.servers = newSeq[P2PServer]() api.pattern = pattern @@ -529,6 +553,8 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, args.add("-dhtClient") if P2PDaemonFlags.Bootstrap in api.flags: args.add("-b") + if P2PDaemonFlags.Verbose in api.flags: + env = newStringTable("IPFS_LOGGING", "debug", modeCaseSensitive) if P2PDaemonFlags.PSGossipSub in api.flags: args.add("-pubsub") args.add("-pubsubRouter=gossipsub") @@ -567,7 +593,7 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, raise newException(DaemonLocalError, "Socket is already bound!") # Starting daemon process # echo "Spawn [", cmd, " ", args.join(" "), "]" - api.process = startProcess(cmd, "", args, options = {poStdErrToStdOut}) + api.process = startProcess(cmd, "", args, env, {poStdErrToStdOut}) # Waiting until daemon will not be bound to control socket. while true: if not api.process.running(): @@ -578,6 +604,8 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, break await sleepAsync(100) # api.pool = await newPool(api.address, poolsize = poolSize) + if P2PDaemonFlags.Logging in api.flags: + api.loggerFut = loggingHandler(api) result = api proc close*(stream: P2PStream) {.async.} = @@ -607,6 +635,9 @@ proc close*(api: DaemonAPI) {.async.} = # Closing daemon's process. api.process.kill() discard api.process.waitForExit() + # Waiting for logger loop to exit + if not isNil(api.loggerFut): + await api.loggerFut # Attempt to delete control socket endpoint. if socketExists(api.sockname): discard tryRemoveFile(api.sockname) diff --git a/tests/testdaemon.nim b/tests/testdaemon.nim index 8d7030929..b40213770 100644 --- a/tests/testdaemon.nim +++ b/tests/testdaemon.nim @@ -26,7 +26,7 @@ proc connectStreamTest(): Future[bool] {.async.} = await api2.addHandler(protos, streamHandler) await api1.connect(id2.peer, id2.addresses) - echo await api1.listPeers() + # echo await api1.listPeers() var stream = await api1.openStream(id2.peer, protos) let sent = await stream.transp.write(test & "\r\n") doAssert(sent == len(test) + 2) @@ -48,13 +48,13 @@ proc provideBadCidTest(): Future[bool] {.async.} = finally: await api.close() -proc getOnlyIPv4Addresses(addresses: seq[MultiAddress]): seq[MultiAddress] = - if len(addresses) > 0: - result = newSeqOfCap[MultiAddress](len(addresses)) - let ip4 = multiCodec("ip4") - for item in addresses: - if item.protoCode() == ip4: - result.add(item) +# proc getOnlyIPv4Addresses(addresses: seq[MultiAddress]): seq[MultiAddress] = +# if len(addresses) > 0: +# result = newSeqOfCap[MultiAddress](len(addresses)) +# let ip4 = multiCodec("ip4") +# for item in addresses: +# if item.protoCode() == ip4: +# result.add(item) proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} = var pubsubData = "TEST MESSAGE" @@ -67,15 +67,12 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} = var id1 = await api1.identity() var id2 = await api2.identity() - echo $id1 - echo $id2 - var resultsCount = 0 - var topics10 = await api1.pubsubGetTopics() - var peers10 = await api1.pubsubListPeers("test-topic") - var topics20 = await api2.pubsubGetTopics() - var peers20 = await api2.pubsubListPeers("test-topic") + # var topics10 = await api1.pubsubGetTopics() + # var peers10 = await api1.pubsubListPeers("test-topic") + # var topics20 = await api2.pubsubGetTopics() + # var peers20 = await api2.pubsubListPeers("test-topic") var handlerFuture1 = newFuture[void]() var handlerFuture2 = newFuture[void]() @@ -100,28 +97,27 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} = # Callback must return `false` to close subscription channel. result = false - if len(topics10) == 0 and len(peers10) == 0 and - len(topics20) == 0 and len(peers20) == 0: - # Not subscribed to any topics everything must be 0. + # Not subscribed to any topics everything must be 0. + await api1.connect(id2.peer, id2.addresses) + await api2.connect(id1.peer, id1.addresses) - await api1.connect(id2.peer, getOnlyIPv4Addresses(id2.addresses)) - await api2.connect(id1.peer, getOnlyIPv4Addresses(id1.addresses)) + var ticket1 = await api1.pubsubSubscribe("test-topic", pubsubHandler1) + var ticket2 = await api2.pubsubSubscribe("test-topic", pubsubHandler2) - var ticket1 = await api1.pubsubSubscribe("test-topic", pubsubHandler1) - var ticket2 = await api2.pubsubSubscribe("test-topic", pubsubHandler2) + await sleepAsync(2000) - var topics1 = await api1.pubsubGetTopics() - var topics2 = await api2.pubsubGetTopics() + var topics1 = await api1.pubsubGetTopics() + var topics2 = await api2.pubsubGetTopics() - if len(topics1) == 1 and len(topics2) == 1: - var peers1 = await api1.pubsubListPeers("test-topic") - var peers2 = await api2.pubsubListPeers("test-topic") - if len(peers1) == 1 and len(peers2) == 1: - # Publish test data via api1. - await sleepAsync(500) - await api1.pubsubPublish("test-topic", msgData) - var andfut = handlerFuture1 and handlerFuture2 - await andfut or sleepAsync(10000) + if len(topics1) == 1 and len(topics2) == 1: + var peers1 = await api1.pubsubListPeers("test-topic") + var peers2 = await api2.pubsubListPeers("test-topic") + if len(peers1) == 1 and len(peers2) == 1: + # Publish test data via api1. + await sleepAsync(500) + await api1.pubsubPublish("test-topic", msgData) + var andfut = handlerFuture1 and handlerFuture2 + await andfut or sleepAsync(10000) await api1.close() await api2.close() @@ -142,6 +138,6 @@ when isMainModule: test "GossipSub test": check: waitFor(pubsubTest({PSGossipSub})) == true - test "FloodSub test": - check: - waitFor(pubsubTest({PSFloodSub})) == true + # test "FloodSub test": + # check: + # waitFor(pubsubTest({PSFloodSub})) == true diff --git a/tests/testmultibase.nim b/tests/testmultibase.nim index 1f0920408..8a67ed86d 100644 --- a/tests/testmultibase.nim +++ b/tests/testmultibase.nim @@ -304,4 +304,3 @@ suite "MultiBase test suite": check: r1 == true r2 == true - \ No newline at end of file