Add go-libp2p-daemon logging to file.
Fix mistypes in tests.
Fix issues with connection to dedicated go-libp2p2-daemon.
This commit is contained in:
cheatfate 2019-08-22 00:42:44 +03:00
parent 2670cd8433
commit 2ab6b2c3ca
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
3 changed files with 57 additions and 83 deletions

View File

@ -14,7 +14,7 @@ import ../varint, ../multiaddress, ../multicodec, ../base58, ../cid, ../peer
import ../wire, ../multihash, ../protobuf/minprotobuf import ../wire, ../multihash, ../protobuf/minprotobuf
import ../crypto/crypto import ../crypto/crypto
export peer, multiaddress, multicodec, multihash, cid, crypto export peer, multiaddress, multicodec, multihash, cid, crypto, wire
when not defined(windows): when not defined(windows):
import posix import posix
@ -28,6 +28,9 @@ const
DefaultDaemonFile* = "p2pd" DefaultDaemonFile* = "p2pd"
type type
IpfsLogLevel* {.pure.} = enum
Critical, Error, Warning, Notice, Info, Debug, Trace
RequestType* {.pure.} = enum RequestType* {.pure.} = enum
IDENTITY = 0, IDENTITY = 0,
CONNECT = 1, CONNECT = 1,
@ -91,13 +94,12 @@ type
Bootstrap, ## Start daemon with bootstrap Bootstrap, ## Start daemon with bootstrap
WaitBootstrap, ## Start daemon with bootstrap and wait until daemon WaitBootstrap, ## Start daemon with bootstrap and wait until daemon
## establish connection to at least 2 peers ## establish connection to at least 2 peers
Logging, ## Enable capture daemon `stderr`
Verbose, ## Set daemon logging to DEBUG level
PSFloodSub, ## Enable `FloodSub` protocol in daemon PSFloodSub, ## Enable `FloodSub` protocol in daemon
PSGossipSub, ## Enable `GossipSub` protocol in daemon PSGossipSub, ## Enable `GossipSub` protocol in daemon
PSNoSign, ## Disable pubsub message signing (default true) PSNoSign, ## Disable pubsub message signing (default true)
PSStrictSign, ## Force strict checking pubsub message signature PSStrictSign, ## Force strict checking pubsub message signature
NATPortMap ## Force daemon to use NAT-PMP. NATPortMap, ## Force daemon to use NAT-PMP.
NoProcessCtrl ## Process was not spawned.
P2PStream* = ref object P2PStream* = ref object
flags*: set[P2PStreamFlags] flags*: set[P2PStreamFlags]
@ -119,8 +121,6 @@ type
process*: Process process*: Process
handlers*: Table[string, P2PStreamCallback] handlers*: Table[string, P2PStreamCallback]
servers*: seq[P2PServer] servers*: seq[P2PServer]
log*: string
loggerFut*: Future[void]
userData*: RootRef userData*: RootRef
PeerInfo* = object PeerInfo* = object
@ -131,6 +131,7 @@ type
topic*: string topic*: string
handler*: P2PPubSubCallback handler*: P2PPubSubCallback
transp*: StreamTransport transp*: StreamTransport
loopFut*: Future[void]
PubSubMessage* = object PubSubMessage* = object
peer*: PeerID peer*: PeerID
@ -500,42 +501,14 @@ proc socketExists(address: MultiAddress): Future[bool] {.async.} =
except: except:
result = false result = false
when not defined(windows): when defined(windows):
proc loggingHandler(api: DaemonAPI): Future[void] =
var retFuture = newFuture[void]("logging.handler")
var loop = getGlobalDispatcher()
let pfd = SocketHandle(api.process.outputHandle)
var fd = AsyncFD(pfd)
if not setSocketBlocking(pfd, false):
discard close(cint(pfd))
retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
proc readOutputLoop(udata: pointer) {.gcsafe.} =
var buffer: array[2048, char]
let res = posix.read(cint(fd), addr buffer[0], 2000)
if res == -1 or res == 0:
removeReader(fd)
retFuture.complete()
else:
var cstr = cast[cstring](addr buffer[0])
api.log.add(cstr)
register(AsyncFD(pfd))
addReader(fd, readOutputLoop, nil)
result = retFuture
proc getProcessId(): int =
result = posix.getpid()
else:
proc getCurrentProcessId(): uint32 {.stdcall, dynlib: "kernel32", proc getCurrentProcessId(): uint32 {.stdcall, dynlib: "kernel32",
importc: "GetCurrentProcessId".} importc: "GetCurrentProcessId".}
proc loggingHandler(api: DaemonAPI): Future[void] =
# Not ready yet.
discard
proc getProcessId(): int = proc getProcessId(): int =
# Not ready yet
result = cast[int](getCurrentProcessId()) result = cast[int](getCurrentProcessId())
else:
proc getProcessId(): int =
result = cast[int](posix.getpid())
proc getSocket(pattern: string, proc getSocket(pattern: string,
count: ptr int): Future[MultiAddress] {.async.} = count: ptr int): Future[MultiAddress] {.async.} =
@ -568,12 +541,11 @@ proc getSocket(pattern: string,
# This is forward declaration needed for newDaemonApi() # This is forward declaration needed for newDaemonApi()
proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.}
when not defined(windows): proc copyEnv(): StringTableRef =
proc copyEnv(): StringTableRef = ## This procedure copy all environment variables into StringTable.
## This procedure copy all environment variables into StringTable. result = newStringTable(modeStyleInsensitive)
result = newStringTable(modeStyleInsensitive) for key, val in envPairs():
for key, val in envPairs(): result[key] = val
result[key] = val
proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
bootstrapNodes: seq[string] = @[], bootstrapNodes: seq[string] = @[],
@ -587,7 +559,9 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
poolSize = 10, poolSize = 10,
gossipsubHeartbeatInterval = 0, gossipsubHeartbeatInterval = 0,
gossipsubHeartbeatDelay = 0, gossipsubHeartbeatDelay = 0,
peersRequired = 2): Future[DaemonAPI] {.async.} = peersRequired = 2,
logFile = "",
logLevel = IpfsLogLevel.Debug): Future[DaemonAPI] {.async.} =
## Initialize connection to `go-libp2p-daemon` control socket. ## Initialize connection to `go-libp2p-daemon` control socket.
## ##
## ``flags`` - set of P2PDaemonFlags. ## ``flags`` - set of P2PDaemonFlags.
@ -630,6 +604,12 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
## ``peersRequired`` - Wait until `go-libp2p-daemon` will connect to at least ## ``peersRequired`` - Wait until `go-libp2p-daemon` will connect to at least
## ``peersRequired`` peers before return from `newDaemonApi()` procedure ## ``peersRequired`` peers before return from `newDaemonApi()` procedure
## (default: 2). ## (default: 2).
##
## ``logFile`` - Enable ``go-libp2p-daemon`` logging and store it to file
## ``logFile`` (default: "", no logging)
##
## ``logLevel`` - Set ``go-libp2p-daemon`` logging verbosity level to
## ``logLevel`` (default: Trace)
var api = new DaemonAPI var api = new DaemonAPI
var args = newSeq[string]() var args = newSeq[string]()
var env: StringTableRef var env: StringTableRef
@ -660,12 +640,16 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
api.handlers = initTable[string, P2PStreamCallback]() api.handlers = initTable[string, P2PStreamCallback]()
if len(sockpath) == 0: if len(sockpath) == 0:
api.flags.excl(NoProcessCtrl)
api.address = await getSocket(patternForSocket, addr daemonsCount) api.address = await getSocket(patternForSocket, addr daemonsCount)
else: else:
api.address = MultiAddress.init(sockpath) api.address = MultiAddress.init(sockpath)
api.flags.incl(NoProcessCtrl)
let res = await socketExists(api.address) let res = await socketExists(api.address)
if not res: if not res:
raise newException(DaemonLocalError, "Could not connect to remote daemon") raise newException(DaemonLocalError, "Could not connect to remote daemon")
result = api
return
# DHTFull and DHTClient could not be present at the same time # DHTFull and DHTClient could not be present at the same time
if DHTFull in flags and DHTClient in flags: if DHTFull in flags and DHTClient in flags:
@ -679,16 +663,27 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
args.add("-dhtClient") args.add("-dhtClient")
if {Bootstrap, WaitBootstrap} * api.flags != {}: if {Bootstrap, WaitBootstrap} * api.flags != {}:
args.add("-b") args.add("-b")
if Verbose in api.flags: if len(logFile) != 0:
when defined(windows): env = copyEnv()
# Currently enabling logging output is not a good idea, because we can't env["IPFS_LOGGING_FMT"] = "nocolor"
# properly read process' stdout/stderr it can stuck on Windows. env["GOLOG_FILE"] = logFile
env = nil case logLevel
else: of IpfsLogLevel.Critical:
env = copyEnv() env["IPFS_LOGGING"] = "CRITICAL"
env["IPFS_LOGGING"] = "debug" of IpfsLogLevel.Error:
else: env["IPFS_LOGGING"] = "ERROR"
env = nil of IpfsLogLevel.Warning:
env["IPFS_LOGGING"] = "WARNING"
of IpfsLogLevel.Notice:
env["IPFS_LOGGING"] = "NOTICE"
of IpfsLogLevel.Info:
env["IPFS_LOGGING"] = "INFO"
of IpfsLogLevel.Debug:
env["IPFS_LOGGING"] = "DEBUG"
of IpfsLogLevel.Trace:
env["IPFS_LOGGING"] = "DEBUG"
env["GOLOG_TRACING_FILE"] = logFile
if PSGossipSub in api.flags: if PSGossipSub in api.flags:
args.add("-pubsub") args.add("-pubsub")
args.add("-pubsubRouter=gossipsub") args.add("-pubsubRouter=gossipsub")
@ -745,9 +740,6 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
break break
await sleepAsync(500.milliseconds) await sleepAsync(500.milliseconds)
if Logging in api.flags:
api.loggerFut = loggingHandler(api)
if WaitBootstrap in api.flags: if WaitBootstrap in api.flags:
while true: while true:
var peers = await listPeers(api) var peers = await listPeers(api)
@ -783,11 +775,9 @@ proc close*(api: DaemonAPI) {.async.} =
discard tryRemoveFile($address) discard tryRemoveFile($address)
api.servers.setLen(0) api.servers.setLen(0)
# Closing daemon's process. # Closing daemon's process.
api.process.kill() if NoProcessCtrl in api.flags:
discard api.process.waitForExit() api.process.terminate()
# Waiting for logger loop to exit discard api.process.waitForExit()
if not isNil(api.loggerFut):
await api.loggerFut
# Attempt to delete unix socket endpoint. # Attempt to delete unix socket endpoint.
let address = initTAddress(api.address) let address = initTAddress(api.address)
if address.family == AddressFamily.Unix: if address.family == AddressFamily.Unix:
@ -1291,7 +1281,7 @@ proc pubsubSubscribe*(api: DaemonAPI, topic: string,
ticket.topic = topic ticket.topic = topic
ticket.handler = handler ticket.handler = handler
ticket.transp = transp ticket.transp = transp
asyncCheck pubsubLoop(api, ticket) ticket.loopFut = pubsubLoop(api, ticket)
result = ticket result = ticket
except: except:
await api.closeConnection(transp) await api.closeConnection(transp)

View File

@ -66,24 +66,13 @@ proc provideCidTest(): Future[bool] {.async.} =
await api1.close() await api1.close()
await api2.close() await api2.close()
# proc getOnlyOneIPv4Address(addresses: seq[MultiAddress]): seq[MultiAddress] =
# ## We doing this becuase of bug in `go-pubsub`
# ## https://github.com/libp2p/go-libp2p-pubsub/issues/130
# if len(addresses) > 0:
# result = newSeqOfCap[MultiAddress](len(addresses))
# let ip4 = multiCodec("ip4")
# for item in addresses:
# if item.protoCode() == ip4:
# result.add(item)
# break
proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} = proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
var pubsubData = "TEST MESSAGE" var pubsubData = "TEST MESSAGE"
var msgData = cast[seq[byte]](pubsubData) var msgData = cast[seq[byte]](pubsubData)
var api1, api2: DaemonAPI var api1, api2: DaemonAPI
api1 = await newDaemonApi(f + {Verbose, Logging}) api1 = await newDaemonApi(f)
api2 = await newDaemonApi(f + {Verbose, Logging}) api2 = await newDaemonApi(f)
var id1 = await api1.identity() var id1 = await api1.identity()
var id2 = await api2.identity() var id2 = await api2.identity()
@ -138,11 +127,6 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
await api2.close() await api2.close()
if resultsCount == 2: if resultsCount == 2:
result = true result = true
else:
echo " -- CLIENT1 -- "
echo api1.log
echo " -- CLIENT2 -- "
echo api2.log
when isMainModule: when isMainModule:
suite "libp2p-daemon test suite": suite "libp2p-daemon test suite":

View File

@ -555,7 +555,7 @@ suite "EC NIST-P256/384/521 test suite":
isNil(shared2) == false isNil(shared2) == false
shared1 == shared2 shared1 == shared2
test "[secp384r1] ECDHE test vectors": test "[secp521r1] ECDHE test vectors":
for i in 6..<9: for i in 6..<9:
var key1 = fromHex(stripSpaces(ECDHEPrivateKeys[i * 2])) var key1 = fromHex(stripSpaces(ECDHEPrivateKeys[i * 2]))
var key2 = fromHex(stripSpaces(ECDHEPrivateKeys[i * 2 + 1])) var key2 = fromHex(stripSpaces(ECDHEPrivateKeys[i * 2 + 1]))