diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index c19f4e4..3c4f13b 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -92,6 +92,10 @@ type protocol*: string transp*: StreamTransport + P2PServer = object + server*: StreamServer + address*: TransportAddress + DaemonAPI* = ref object pool*: TransportPool flags*: set[P2PDaemonFlags] @@ -101,7 +105,7 @@ type ucounter*: int process*: Process handlers*: Table[string, P2PStreamCallback] - servers*: seq[StreamServer] + servers*: seq[P2PServer] PeerInfo* = object peer*: PeerID @@ -482,7 +486,7 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, ## Initialize connections to `go-libp2p-daemon` control socket. var api = new DaemonAPI api.flags = flags - api.servers = newSeq[StreamServer]() + api.servers = newSeq[P2PServer]() api.pattern = pattern api.ucounter = 1 api.handlers = initTable[string, P2PStreamCallback]() @@ -523,9 +527,11 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, args.add("-pubsub") args.add("-pubsubRouter=floodsub") if gossipsubHeartbeatInterval != 0: - args.add("-gossipsubHeartbeatInterval=" & $gossipsubHeartbeatInterval) + let param = $gossipsubHeartbeatInterval & "ms" + args.add("-gossipsubHeartbeatInterval=" & param) if gossipsubHeartbeatDelay != 0: - args.add("-gossipsubHeartbeatInitialDelay=" & $gossipsubHeartbeatDelay) + let param = $gossipsubHeartbeatDelay & "ms" + args.add("-gossipsubHeartbeatInitialDelay=" & param) if api.flags * {P2PDaemonFlags.PSFloodSub, P2PDaemonFlags.PSFloodSub} != {}: if P2PDaemonFlags.PSSign in api.flags: args.add("-pubsubSign=true") @@ -546,7 +552,9 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, # We can't use `existsFile()` because it do not support unix-domain socket # endpoints. if socketExists(api.sockname): - discard tryRemoveFile(api.sockname) + if not tryRemoveFile(api.sockname): + if api.sockname != sockpath: + raise newException(DaemonLocalError, "Socket is already bound!") # Starting daemon process api.process = startProcess(cmd, "", args, options = {poStdErrToStdOut}) # Waiting until daemon will not be bound to control socket. @@ -562,7 +570,7 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, result = api -proc close*(api: DaemonAPI, stream: P2PStream) {.async.} = +proc close*(stream: P2PStream) {.async.} = ## Close ``stream``. if P2PStreamFlags.Closed notin stream.flags: stream.transp.close() @@ -579,16 +587,19 @@ proc close*(api: DaemonAPI) {.async.} = if len(api.servers) > 0: var pending = newSeq[Future[void]]() for server in api.servers: - server.stop() - server.close() - pending.add(server.join()) + server.server.stop() + server.server.close() + pending.add(server.server.join()) await all(pending) + for server in api.servers: + discard tryRemoveFile($(server.address)) + api.servers.setLen(0) # Closing daemon's process. - if api.flags != {}: - api.process.terminate() + api.process.kill() + discard api.process.waitForExit() # Attempt to delete control socket endpoint. - # if socketExists(api.sockname): - # discard tryRemoveFile(api.sockname) + if socketExists(api.sockname): + discard tryRemoveFile(api.sockname) template withMessage(m, body: untyped): untyped = let kind = m.checkResponse() @@ -726,7 +737,7 @@ proc addHandler*(api: DaemonAPI, protocols: seq[string], var pb = await transp.transactMessage(requestStreamHandler(sockname, protocols)) pb.withMessage() do: - api.servers.add(server) + api.servers.add(P2PServer(server: server, address: localaddr)) except: for item in protocols: api.handlers.del(item) diff --git a/tests/testdaemon.nim b/tests/testdaemon.nim index 65975d6..2b88ab9 100644 --- a/tests/testdaemon.nim +++ b/tests/testdaemon.nim @@ -31,6 +31,7 @@ proc connectStreamTest(): Future[bool] {.async.} = doAssert(sent == len(test) + 2) var check = await wait(testFuture, 10000) doAssert(check == test) + await stream.close() await api1.close() await api2.close() result = true