From 4c6a123d316c699eaa265b66ba5733250aa01a77 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni Date: Tue, 21 Apr 2020 10:24:42 +0900 Subject: [PATCH] Add chronos trackers and used them to sanitize resource disposal (#131) * Add chronos trackers and used them to sanitize resource disposal * Chronos trackers for transport tests wip * No more chronos leaks in testtransport * Make tcp transport and test more robust when closing * Test async leaking tracking wip * Fix a regression in wire connect * Add chronos trackers to more tests and sanitize resource closure * Wip fixing floodsub tests * Floodsub wip * Made floodsub basically deterministic, hit a nim bug with captures tho * Wrap up floodsub tests refactor * Wrapping up * Add allFuturesThrowing utility * Fix missing allFuturesThrowing in noise tests! * Make tests green * attempt fixing gossipsub failing cases * Make sure to check also fanout in waitSub * More verbose traces * Gossipsub test improvments * Refactor TcpTransport remove asyncCheck * Add Connection trackers * Add stricter connection tracking, wip mplex fix * More asynccheck removal, in order to avoid connection leaks * bump chronicles requirement * Enable tracker dump to check CI output * Wait for more futures in testmplex * Remove tracker dump messages * add tryAndWarn utility, fix mplex issue with go interop * All allFuturesThrowing to directchat too * make sure to cleanup on transport close --- examples/directchat.nim | 3 +- libp2p.nimble | 4 +- libp2p/connection.nim | 60 ++++++- libp2p/errors.nim | 19 +++ libp2p/multistream.nim | 15 +- libp2p/muxers/mplex/mplex.nim | 13 +- libp2p/protocols/pubsub/gossipsub.nim | 66 +++++++- libp2p/protocols/pubsub/rpc/protobuf.nim | 5 +- libp2p/protocols/secure/noise.nim | 1 + libp2p/protocols/secure/secio.nim | 2 + libp2p/protocols/secure/secure.nim | 2 +- libp2p/stream/bufferstream.nim | 38 ++++- libp2p/switch.nim | 2 + libp2p/transports/tcptransport.nim | 84 ++++++++-- libp2p/wire.nim | 9 +- tests/pubsub/testfloodsub.nim | 204 ++++++++++++++++------- tests/pubsub/testgossipsub.nim | 171 +++++++++++++------ tests/pubsub/utils.nim | 1 - tests/testbufferstream.nim | 75 ++++++++- tests/testidentify.nim | 37 +++- tests/testmplex.nim | 191 +++++++++++++++++---- tests/testmultistream.nim | 68 +++++++- tests/testnative.nim | 20 ++- tests/testnoise.nim | 101 +++++++---- tests/testpeerinfo.nim | 17 ++ tests/testswitch.nim | 50 +++++- tests/testtransport.nim | 96 +++++++++-- 27 files changed, 1079 insertions(+), 275 deletions(-) diff --git a/examples/directchat.nim b/examples/directchat.nim index 4ba596d..47bbae5 100644 --- a/examples/directchat.nim +++ b/examples/directchat.nim @@ -6,6 +6,7 @@ import chronos # an efficient library for async import ../libp2p/[switch, # manage transports, a single entry point for dialing and listening multistream, # tag stream with short header to identify it crypto/crypto, # cryptographic functions + errors, # error handling utilities protocols/identify, # identify the peer info of a peer connection, # create and close stream read / write connections transports/transport, # listen and dial to other peers using p2p protocol @@ -196,7 +197,7 @@ proc processInput(rfd: AsyncFD) {.async.} = echo &"{a}/ipfs/{id}" await chatProto.readWriteLoop() - await allFutures(libp2pFuts) + await allFuturesThrowing(libp2pFuts) proc main() {.async.} = let (rfd, wfd) = createAsyncPipe() diff --git a/libp2p.nimble b/libp2p.nimble index 5fed406..c8c3074 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -9,12 +9,10 @@ skipDirs = @["tests", "examples", "Nim"] requires "nim >= 1.2.0", "nimcrypto >= 0.4.1", - "chronos >= 2.3.8", "bearssl >= 0.1.4", - "chronicles >= 0.7.1", + "chronicles >= 0.7.2", "chronos >= 2.3.8", "metrics", - "nimcrypto >= 0.4.1", "secp256k1", "stew" diff --git a/libp2p/connection.nim b/libp2p/connection.nim index 46fc5cc..50ed590 100644 --- a/libp2p/connection.nim +++ b/libp2p/connection.nim @@ -10,6 +10,7 @@ import oids import chronos, chronicles, metrics import peerinfo, + errors, multiaddress, stream/lpstream, peerinfo, @@ -19,17 +20,50 @@ import peerinfo, logScope: topic = "Connection" -const DefaultReadSize* = 1 shl 20 +const + DefaultReadSize* = 1 shl 20 + ConnectionTrackerName* = "libp2p.connection" type Connection* = ref object of LPStream peerInfo*: PeerInfo stream*: LPStream observedAddrs*: Multiaddress + # notice this is a ugly circular reference collection + # (we got many actually :-)) + readLoops*: seq[Future[void]] InvalidVarintException = object of LPStreamError InvalidVarintSizeException = object of LPStreamError + ConnectionTracker* = ref object of TrackerBase + opened*: uint64 + closed*: uint64 + +proc setupConnectionTracker(): ConnectionTracker {.gcsafe.} + +proc getConnectionTracker*(): ConnectionTracker {.gcsafe.} = + result = cast[ConnectionTracker](getTracker(ConnectionTrackerName)) + if isNil(result): + result = setupConnectionTracker() + +proc dumpTracking(): string {.gcsafe.} = + var tracker = getConnectionTracker() + result = "Opened conns: " & $tracker.opened & "\n" & + "Closed conns: " & $tracker.closed + +proc leakTransport(): bool {.gcsafe.} = + var tracker = getConnectionTracker() + result = (tracker.opened != tracker.closed) + +proc setupConnectionTracker(): ConnectionTracker = + result = new ConnectionTracker + result.opened = 0 + result.closed = 0 + result.dump = dumpTracking + result.isLeaked = leakTransport + addTracker(ConnectionTrackerName, result) + declareGauge libp2p_open_connection, "open Connection instances" proc newInvalidVarintException*(): ref InvalidVarintException = @@ -50,9 +84,9 @@ proc bindStreamClose(conn: Connection) {.async.} = trace "wrapped stream closed, closing conn", closed = conn.isClosed, peer = if not isNil(conn.peerInfo): conn.peerInfo.id else: "" - asyncCheck conn.close() + await conn.close() -proc init*[T: Connection](self: var T, stream: LPStream): T = +proc init[T: Connection](self: var T, stream: LPStream): T = ## create a new Connection for the specified async reader/writer new self self.stream = stream @@ -60,6 +94,7 @@ proc init*[T: Connection](self: var T, stream: LPStream): T = when chronicles.enabledLogLevel == LogLevel.TRACE: self.oid = genOid() asyncCheck self.bindStreamClose() + inc getConnectionTracker().opened libp2p_open_connection.inc() return self @@ -116,15 +151,18 @@ method write*(s: Connection, method closed*(s: Connection): bool = if isNil(s.stream): - return false + return true result = s.stream.closed method close*(s: Connection) {.async, gcsafe.} = - if not s.closed: - trace "about to close connection", closed = s.closed, - peer = if not isNil(s.peerInfo): - s.peerInfo.id else: "" + trace "about to close connection", closed = s.closed, + peer = if not isNil(s.peerInfo): + s.peerInfo.id else: "" + + if not s.isClosed: + s.isClosed = true + inc getConnectionTracker().closed if not isNil(s.stream) and not s.stream.closed: trace "closing child stream", closed = s.closed, @@ -133,7 +171,11 @@ method close*(s: Connection) {.async, gcsafe.} = await s.stream.close() s.closeEvent.fire() - s.isClosed = true + + trace "waiting readloops", count=s.readLoops.len + let loopFuts = await allFinished(s.readLoops) + checkFutures(loopFuts) + s.readLoops = @[] trace "connection closed", closed = s.closed, peer = if not isNil(s.peerInfo): diff --git a/libp2p/errors.nim b/libp2p/errors.nim index 35ea14b..8f63158 100644 --- a/libp2p/errors.nim +++ b/libp2p/errors.nim @@ -37,3 +37,22 @@ macro checkFutures*[T](futs: seq[Future[T]], exclude: untyped = []): untyped = # We still don't abort but warn warn "Something went wrong in a future", error=exc.name, file=pos.filename, line=pos.line + +proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] = + var futs: seq[Future[T]] + for fut in args: + futs &= fut + proc call() {.async.} = + futs = await allFinished(futs) + for fut in futs: + if fut.failed: + raise fut.readError() + return call() + +template tryAndWarn*(msg: static[string]; body: untyped): untyped = + try: + body + except CancelledError as ex: + raise ex + except CatchableError as ex: + warn "ignored an error", name=ex.name, msg=msg diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index 2910f10..e31e357 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -11,6 +11,7 @@ import strutils import chronos, chronicles import connection, vbuffer, + errors, protocols/protocol logScope: @@ -116,7 +117,7 @@ proc list*(m: MultistreamSelect, proc handle*(m: MultistreamSelect, conn: Connection) {.async, gcsafe.} = trace "handle: starting multistream handling" - try: + tryAndWarn "multistream handle": while not conn.closed: var ms = cast[string]((await conn.readLp())) ms.removeSuffix("\n") @@ -145,18 +146,14 @@ proc handle*(m: MultistreamSelect, conn: Connection) {.async, gcsafe.} = if (not isNil(h.match) and h.match(ms)) or ms == h.proto: trace "found handler for", protocol = ms await conn.writeLp((h.proto & "\n")) - try: + tryAndWarn "multistream handle handler": await h.protocol.handler(conn, ms) return - except CatchableError as exc: - warn "exception while handling", msg = exc.msg - return warn "no handlers for ", protocol = ms await conn.write(m.na) - except CatchableError as exc: - trace "Exception occurred", exc = exc.msg - finally: - trace "leaving multistream loop" + trace "leaving multistream loop" + # we might be tempted to close conn here but that would be a bad idea! + # we indeed will reuse it later on proc addHandler*[T: LPProtocol](m: MultistreamSelect, codec: string, diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 85290e0..5e234ab 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -92,12 +92,15 @@ method handle*(m: Mplex) {.async, gcsafe.} = let stream = newConnection(channel) stream.peerInfo = m.connection.peerInfo - # cleanup channel once handler is finished - # stream.closeEvent.wait().addCallback( - # proc(udata: pointer) = - # asyncCheck cleanupChann(m, channel, initiator)) + proc handler() {.async.} = + tryAndWarn "mplex channel handler": + await m.streamHandler(stream) + # TODO closing stream + # or doing cleanupChann + # will make go interop tests fail + # need to investigate why - asyncCheck m.streamHandler(stream) + asynccheck handler() continue of MessageType.MsgIn, MessageType.MsgOut: trace "pushing data to channel", id = id, diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 8afee01..f7c9e5c 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -87,7 +87,10 @@ method init(g: GossipSub) = method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async.} = ## handle peer disconnects + trace "peer disconnected", peer=peer.id + await procCall FloodSub(g).handleDisconnect(peer) + for t in g.gossipsub.keys: g.gossipsub[t].excl(peer.id) @@ -263,9 +266,9 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = if g.mesh[topic].len < GossipSubDlo: trace "replenishing mesh" - # replenish the mesh if we're bellow GossipSubDlo + # replenish the mesh if we're below GossipSubDlo while g.mesh[topic].len < GossipSubD: - trace "gattering peers", peers = g.mesh[topic].len + trace "gathering peers", peers = g.mesh[topic].len var id: string if topic in g.fanout and g.fanout[topic].len > 0: id = g.fanout[topic].pop() @@ -457,12 +460,31 @@ when isMainModule: ## import unittest + import ../../errors import ../../stream/bufferstream type TestGossipSub = ref object of GossipSub + const + StreamTransportTrackerName = "stream.transport" + StreamServerTrackerName = "stream.server" + suite "GossipSub": + teardown: + let + trackers = [ + getTracker(BufferStreamTrackerName), + getTracker(AsyncStreamWriterTrackerName), + getTracker(AsyncStreamReaderTrackerName), + getTracker(StreamTransportTrackerName), + getTracker(StreamServerTrackerName) + ] + for tracker in trackers: + if not isNil(tracker): + # echo tracker.dump() + check tracker.isLeaked() == false + test "`rebalanceMesh` Degree Lo": proc testRun(): Future[bool] {.async.} = let gossipSub = newPubSub(TestGossipSub, @@ -473,8 +495,10 @@ when isMainModule: proc writeHandler(data: seq[byte]) {.async.} = discard + var conns = newSeq[Connection]() for i in 0..<15: let conn = newConnection(newBufferStream(writeHandler)) + conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) @@ -485,6 +509,8 @@ when isMainModule: await gossipSub.rebalanceMesh(topic) check gossipSub.mesh[topic].len == GossipSubD + await allFuturesThrowing(conns.mapIt(it.close())) + result = true check: @@ -500,8 +526,10 @@ when isMainModule: proc writeHandler(data: seq[byte]) {.async.} = discard + var conns = newSeq[Connection]() for i in 0..<15: let conn = newConnection(newBufferStream(writeHandler)) + conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) @@ -512,6 +540,8 @@ when isMainModule: await gossipSub.rebalanceMesh(topic) check gossipSub.mesh[topic].len == GossipSubD + await allFuturesThrowing(conns.mapIt(it.close())) + result = true check: @@ -530,8 +560,10 @@ when isMainModule: proc writeHandler(data: seq[byte]) {.async.} = discard + var conns = newSeq[Connection]() for i in 0..<15: let conn = newConnection(newBufferStream(writeHandler)) + conns &= conn var peerInfo = PeerInfo.init(PrivateKey.random(RSA)) conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) @@ -542,6 +574,8 @@ when isMainModule: await gossipSub.replenishFanout(topic) check gossipSub.fanout[topic].len == GossipSubD + await allFuturesThrowing(conns.mapIt(it.close())) + result = true check: @@ -561,8 +595,10 @@ when isMainModule: proc writeHandler(data: seq[byte]) {.async.} = discard + var conns = newSeq[Connection]() for i in 0..<6: let conn = newConnection(newBufferStream(writeHandler)) + conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) @@ -574,6 +610,8 @@ when isMainModule: await gossipSub.dropFanoutPeers() check topic notin gossipSub.fanout + await allFuturesThrowing(conns.mapIt(it.close())) + result = true check: @@ -597,8 +635,10 @@ when isMainModule: proc writeHandler(data: seq[byte]) {.async.} = discard + var conns = newSeq[Connection]() for i in 0..<6: let conn = newConnection(newBufferStream(writeHandler)) + conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) @@ -613,6 +653,8 @@ when isMainModule: check topic1 notin gossipSub.fanout check topic2 in gossipSub.fanout + await allFuturesThrowing(conns.mapIt(it.close())) + result = true check: @@ -633,8 +675,10 @@ when isMainModule: gossipSub.mesh[topic] = initHashSet[string]() gossipSub.fanout[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[string]() + var conns = newSeq[Connection]() for i in 0..<30: let conn = newConnection(newBufferStream(writeHandler)) + conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) @@ -646,6 +690,7 @@ when isMainModule: for i in 0..<15: let conn = newConnection(newBufferStream(writeHandler)) + conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) @@ -662,6 +707,8 @@ when isMainModule: check p notin gossipSub.fanout[topic] check p notin gossipSub.mesh[topic] + await allFuturesThrowing(conns.mapIt(it.close())) + result = true check: @@ -681,8 +728,10 @@ when isMainModule: let topic = "foobar" gossipSub.fanout[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[string]() + var conns = newSeq[Connection]() for i in 0..<30: let conn = newConnection(newBufferStream(writeHandler)) + conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) @@ -694,6 +743,9 @@ when isMainModule: let peers = gossipSub.getGossipPeers() check peers.len == GossipSubD + + await allFuturesThrowing(conns.mapIt(it.close())) + result = true check: @@ -713,8 +765,10 @@ when isMainModule: let topic = "foobar" gossipSub.mesh[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[string]() + var conns = newSeq[Connection]() for i in 0..<30: let conn = newConnection(newBufferStream(writeHandler)) + conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) @@ -726,6 +780,9 @@ when isMainModule: let peers = gossipSub.getGossipPeers() check peers.len == GossipSubD + + await allFuturesThrowing(conns.mapIt(it.close())) + result = true check: @@ -745,8 +802,10 @@ when isMainModule: let topic = "foobar" gossipSub.mesh[topic] = initHashSet[string]() gossipSub.fanout[topic] = initHashSet[string]() + var conns = newSeq[Connection]() for i in 0..<30: let conn = newConnection(newBufferStream(writeHandler)) + conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) @@ -758,6 +817,9 @@ when isMainModule: let peers = gossipSub.getGossipPeers() check peers.len == 0 + + await allFuturesThrowing(conns.mapIt(it.close())) + result = true check: diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index 9362beb..d5b7f7b 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -23,7 +23,6 @@ proc decodeGraft*(pb: var ProtoBuffer): seq[ControlGraft] {.gcsafe.} = while true: var topic: string if pb.getString(1, topic) < 0: - trace "unable to read topic field from graft msg, breaking" break trace "read topic field from graft msg", topicID = topic @@ -38,8 +37,8 @@ proc decodePrune*(pb: var ProtoBuffer): seq[ControlPrune] {.gcsafe.} = var topic: string if pb.getString(1, topic) < 0: break - trace "read topic field", topicID = topic - + + trace "read topic field from prune msg", topicID = topic result.add(ControlPrune(topicID: topic)) proc encodeIHave*(ihave: ControlIHave, pb: var ProtoBuffer) {.gcsafe.} = diff --git a/libp2p/protocols/secure/noise.nim b/libp2p/protocols/secure/noise.nim index 7d98d42..369c737 100644 --- a/libp2p/protocols/secure/noise.nim +++ b/libp2p/protocols/secure/noise.nim @@ -499,6 +499,7 @@ method handshake*(p: Noise, conn: Connection, initiator: bool = false): Future[S raise newException(NoiseHandshakeError, "Noise handshake, peer infos don't match! " & $pid & " != " & $conn.peerInfo.peerId) var secure = new NoiseConnection + inc getConnectionTracker().opened secure.stream = conn secure.closeEvent = newAsyncEvent() secure.peerInfo = PeerInfo.init(remotePubKey) diff --git a/libp2p/protocols/secure/secio.nim b/libp2p/protocols/secure/secio.nim index 935aa45..e90c012 100644 --- a/libp2p/protocols/secure/secio.nim +++ b/libp2p/protocols/secure/secio.nim @@ -266,6 +266,8 @@ proc newSecioConn(conn: Connection, when chronicles.enabledLogLevel == LogLevel.TRACE: result.oid = genOid() + inc getConnectionTracker().opened + proc transactMessage(conn: Connection, msg: seq[byte]): Future[seq[byte]] {.async.} = var buf = newSeq[byte](4) diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index 00105d1..6c7859a 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -61,7 +61,7 @@ proc handleConn*(s: Secure, conn: Connection, initiator: bool = false): Future[C await sconn.writeMessage(data) result = newConnection(newBufferStream(writeHandler)) - asyncCheck readLoop(sconn, result) + conn.readLoops &= readLoop(sconn, result) if not isNil(sconn.peerInfo) and sconn.peerInfo.publicKey.isSome: result.peerInfo = PeerInfo.init(sconn.peerInfo.publicKey.get()) diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index f2c168a..b6aba58 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -34,7 +34,9 @@ import deques, math, oids import chronos, chronicles, metrics import ../stream/lpstream -const DefaultBufferSize* = 1024 +const + BufferStreamTrackerName* = "libp2p.bufferstream" + DefaultBufferSize* = 1024 type # TODO: figure out how to make this generic to avoid casts @@ -52,6 +54,33 @@ type AlreadyPipedError* = object of CatchableError NotWritableError* = object of CatchableError + BufferStreamTracker* = ref object of TrackerBase + opened*: uint64 + closed*: uint64 + +proc setupBufferStreamTracker(): BufferStreamTracker {.gcsafe.} + +proc getBufferStreamTracker(): BufferStreamTracker {.gcsafe.} = + result = cast[BufferStreamTracker](getTracker(BufferStreamTrackerName)) + if isNil(result): + result = setupBufferStreamTracker() + +proc dumpTracking(): string {.gcsafe.} = + var tracker = getBufferStreamTracker() + result = "Opened buffers: " & $tracker.opened & "\n" & + "Closed buffers: " & $tracker.closed + +proc leakTransport(): bool {.gcsafe.} = + var tracker = getBufferStreamTracker() + result = (tracker.opened != tracker.closed) + +proc setupBufferStreamTracker(): BufferStreamTracker = + result = new BufferStreamTracker + result.opened = 0 + result.closed = 0 + result.dump = dumpTracking + result.isLeaked = leakTransport + addTracker(BufferStreamTrackerName, result) declareGauge libp2p_open_bufferstream, "open BufferStream instances" proc newAlreadyPipedError*(): ref Exception {.inline.} = @@ -77,6 +106,7 @@ proc initBufferStream*(s: BufferStream, s.lock = newAsyncLock() s.writeHandler = handler s.closeEvent = newAsyncEvent() + inc getBufferStreamTracker().opened when chronicles.enabledLogLevel == LogLevel.TRACE: s.oid = genOid() s.isClosed = false @@ -181,7 +211,7 @@ method readExactly*(s: BufferStream, try: buff = await s.read(nbytes) except LPStreamEOFError as exc: - trace "Exception occured", exc = exc.msg + trace "Exception occurred", exc = exc.msg if nbytes > buff.len(): raise newLPStreamIncompleteError() @@ -399,5 +429,7 @@ method close*(s: BufferStream) {.async.} = s.readBuf.clear() s.closeEvent.fire() s.isClosed = true + inc getBufferStreamTracker().closed libp2p_open_bufferstream.dec() - + else: + trace "attempt to close an already closed bufferstream", trace=getStackTrace() diff --git a/libp2p/switch.nim b/libp2p/switch.nim index dfca8a4..c6387dc 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -324,6 +324,8 @@ proc stop*(s: Switch) {.async.} = futs = await allFinished(futs) checkFutures(futs) + trace "switch stopped" + proc subscribeToPeer(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = ## Subscribe to pub sub peer if s.pubSub.isSome and peerInfo.id notin s.dialedPubSubPeers: diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index 6e26004..285b6f7 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -7,8 +7,9 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import chronos, chronicles, sequtils +import chronos, chronicles, sequtils, sets import transport, + ../errors, ../wire, ../connection, ../multiaddress, @@ -18,40 +19,78 @@ import transport, logScope: topic = "TcpTransport" -type TcpTransport* = ref object of Transport - server*: StreamServer +const + TcpTransportTrackerName* = "libp2p.tcptransport" + +type + TcpTransport* = ref object of Transport + server*: StreamServer + cleanups*: seq[Future[void]] + handlers*: seq[Future[void]] + + TcpTransportTracker* = ref object of TrackerBase + opened*: uint64 + closed*: uint64 + +proc setupTcpTransportTracker(): TcpTransportTracker {.gcsafe.} + +proc getTcpTransportTracker(): TcpTransportTracker {.gcsafe.} = + result = cast[TcpTransportTracker](getTracker(TcpTransportTrackerName)) + if isNil(result): + result = setupTcpTransportTracker() + +proc dumpTracking(): string {.gcsafe.} = + var tracker = getTcpTransportTracker() + result = "Opened transports: " & $tracker.opened & "\n" & + "Closed transports: " & $tracker.closed + +proc leakTransport(): bool {.gcsafe.} = + var tracker = getTcpTransportTracker() + result = (tracker.opened != tracker.closed) + +proc setupTcpTransportTracker(): TcpTransportTracker = + result = new TcpTransportTracker + result.opened = 0 + result.closed = 0 + result.dump = dumpTracking + result.isLeaked = leakTransport + addTracker(TcpTransportTrackerName, result) proc cleanup(t: Transport, conn: Connection) {.async.} = await conn.closeEvent.wait() + trace "connection cleanup event wait ended" t.connections.keepItIf(it != conn) -proc connHandler*(t: Transport, +proc connHandler*(t: TcpTransport, server: StreamServer, client: StreamTransport, - initiator: bool = false): - Future[Connection] {.async, gcsafe.} = + initiator: bool): Connection = trace "handling connection for", address = $client.remoteAddress let conn: Connection = newConnection(newChronosStream(server, client)) conn.observedAddrs = MultiAddress.init(client.remoteAddress) if not initiator: if not isNil(t.handler): - asyncCheck t.handler(conn) + t.handlers &= t.handler(conn) t.connections.add(conn) - asyncCheck t.cleanup(conn) + t.cleanups &= t.cleanup(conn) result = conn proc connCb(server: StreamServer, client: StreamTransport) {.async, gcsafe.} = trace "incomming connection for", address = $client.remoteAddress - let t: Transport = cast[Transport](server.udata) - asyncCheck t.connHandler(server, client) + let t = cast[TcpTransport](server.udata) + # we don't need result connection in this case + # as it's added inside connHandler + discard t.connHandler(server, client, false) method init*(t: TcpTransport) = t.multicodec = multiCodec("tcp") -method close*(t: TcpTransport): Future[void] {.async, gcsafe.} = + inc getTcpTransportTracker().opened + +method close*(t: TcpTransport) {.async, gcsafe.} = ## start the transport trace "stopping transport" await procCall Transport(t).close() # call base @@ -59,11 +98,28 @@ method close*(t: TcpTransport): Future[void] {.async, gcsafe.} = # server can be nil if not isNil(t.server): t.server.stop() - t.server.close() - await t.server.join() + await t.server.closeWait() + + t.server = nil + + for fut in t.handlers: + if not fut.finished: + fut.cancel() + t.handlers = await allFinished(t.handlers) + checkFutures(t.handlers) + t.handlers = @[] + + for fut in t.cleanups: + if not fut.finished: + fut.cancel() + t.cleanups = await allFinished(t.cleanups) + checkFutures(t.cleanups) + t.cleanups = @[] trace "transport stopped" + inc getTcpTransportTracker().closed + method listen*(t: TcpTransport, ma: MultiAddress, handler: ConnHandler): @@ -85,7 +141,7 @@ method dial*(t: TcpTransport, trace "dialing remote peer", address = $address ## dial a peer let client: StreamTransport = await connect(address) - result = await t.connHandler(t.server, client, true) + result = t.connHandler(t.server, client, true) method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} = if procCall Transport(t).handles(address): diff --git a/libp2p/wire.nim b/libp2p/wire.nim index 1b7dc72..16d087f 100644 --- a/libp2p/wire.nim +++ b/libp2p/wire.nim @@ -56,7 +56,7 @@ proc initTAddress*(ma: MultiAddress): TransportAddress = "Could not initialize address!") proc connect*(ma: MultiAddress, bufferSize = DefaultStreamBufferSize, - child: StreamTransport = nil): Future[StreamTransport] = + child: StreamTransport = nil): Future[StreamTransport] {.async.} = ## Open new connection to remote peer with address ``ma`` and create ## new transport object ``StreamTransport`` for established connection. ## ``bufferSize`` is size of internal buffer for transport. @@ -64,11 +64,8 @@ proc connect*(ma: MultiAddress, bufferSize = DefaultStreamBufferSize, let address = initTAddress(ma) if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}: if ma[1].protoCode() != multiCodec("tcp"): - var retFuture = newFuture[StreamTransport]() - retFuture.fail(newException(TransportAddressError, - "Incorrect address type!")) - return retFuture - result = connect(address, bufferSize, child) + raise newException(TransportAddressError, "Incorrect address type!") + result = await connect(address, bufferSize, child) proc createStreamServer*[T](ma: MultiAddress, cbproc: StreamCallback, diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index c66d94f..46a154f 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -7,16 +7,49 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import unittest, sequtils +import unittest, sequtils, options, tables, sets import chronos import utils, - ../../libp2p/[switch, + ../../libp2p/[errors, + switch, + connection, + stream/bufferstream, crypto/crypto, protocols/pubsub/pubsub, + protocols/pubsub/floodsub, protocols/pubsub/rpc/messages, protocols/pubsub/rpc/message] +const + StreamTransportTrackerName = "stream.transport" + StreamServerTrackerName = "stream.server" + +proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = + # turn things deterministic + # this is for testing purposes only + var ceil = 15 + let fsub = cast[FloodSub](sender.pubSub.get()) + while not fsub.floodsub.hasKey(key) or + not fsub.floodsub[key].contains(receiver.peerInfo.id): + await sleepAsync(100.millis) + dec ceil + doAssert(ceil > 0, "waitSub timeout!") + suite "FloodSub": + teardown: + let + trackers = [ + # getTracker(ConnectionTrackerName), + getTracker(BufferStreamTrackerName), + getTracker(AsyncStreamWriterTrackerName), + getTracker(AsyncStreamReaderTrackerName), + getTracker(StreamTransportTrackerName), + getTracker(StreamServerTrackerName) + ] + for tracker in trackers: + if not isNil(tracker): + check tracker.isLeaked() == false + test "FloodSub basic publish/subscribe A -> B": proc runTests(): Future[bool] {.async.} = var completionFut = newFuture[bool]() @@ -24,21 +57,30 @@ suite "FloodSub": check topic == "foobar" completionFut.complete(true) - var nodes = generateNodes(2) - var awaiters: seq[Future[void]] - awaiters.add((await nodes[0].start())) - awaiters.add((await nodes[1].start())) + let + nodes = generateNodes(2) + nodesFut = await allFinished( + nodes[0].start(), + nodes[1].start() + ) await subscribeNodes(nodes) + await nodes[1].subscribe("foobar", handler) - await sleepAsync(1000.millis) + await waitSub(nodes[0], nodes[1], "foobar") await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) - result = await completionFut - await allFutures(nodes[0].stop(), nodes[1].stop()) - await allFutures(awaiters) + result = await completionFut.wait(5.seconds) + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + for fut in nodesFut: + let res = fut.read() + await allFuturesThrowing(res) check: waitFor(runTests()) == true @@ -55,14 +97,16 @@ suite "FloodSub": awaiters.add((await nodes[1].start())) await subscribeNodes(nodes) + await nodes[0].subscribe("foobar", handler) - await sleepAsync(1000.millis) + await waitSub(nodes[1], nodes[0], "foobar") await nodes[1].publish("foobar", cast[seq[byte]]("Hello!")) - result = await completionFut - await allFutures(nodes[0].stop(), nodes[1].stop()) - await allFutures(awaiters) + result = await completionFut.wait(5.seconds) + + await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) + await allFuturesThrowing(awaiters) check: waitFor(runTests()) == true @@ -81,7 +125,7 @@ suite "FloodSub": await subscribeNodes(nodes) await nodes[1].subscribe("foobar", handler) - await sleepAsync(1000.millis) + await waitSub(nodes[0], nodes[1], "foobar") var validatorFut = newFuture[bool]() proc validator(topic: string, @@ -91,11 +135,12 @@ suite "FloodSub": result = true nodes[1].addValidator("foobar", validator) + await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) - await allFutures(handlerFut, handlerFut) - await allFutures(nodes[0].stop(), nodes[1].stop()) - await allFutures(awaiters) + await allFuturesThrowing(handlerFut, handlerFut) + await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) + await allFuturesThrowing(awaiters) result = true check: @@ -113,7 +158,7 @@ suite "FloodSub": await subscribeNodes(nodes) await nodes[1].subscribe("foobar", handler) - await sleepAsync(100.millis) + await waitSub(nodes[0], nodes[1], "foobar") var validatorFut = newFuture[bool]() proc validator(topic: string, @@ -122,9 +167,11 @@ suite "FloodSub": result = false nodes[1].addValidator("foobar", validator) + await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) - await allFutures(nodes[0].stop(), nodes[1].stop()) - await allFutures(awaiters) + + await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) + await allFuturesThrowing(awaiters) result = true check: @@ -144,8 +191,9 @@ suite "FloodSub": await subscribeNodes(nodes) await nodes[1].subscribe("foo", handler) + await waitSub(nodes[0], nodes[1], "foo") await nodes[1].subscribe("bar", handler) - await sleepAsync(1000.millis) + await waitSub(nodes[0], nodes[1], "bar") proc validator(topic: string, message: Message): Future[bool] {.async.} = @@ -155,12 +203,12 @@ suite "FloodSub": result = false nodes[1].addValidator("foo", "bar", validator) + await nodes[0].publish("foo", cast[seq[byte]]("Hello!")) await nodes[0].publish("bar", cast[seq[byte]]("Hello!")) - await sleepAsync(100.millis) - await allFutures(nodes[0].stop(), nodes[1].stop()) - await allFutures(awaiters) + await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) + await allFuturesThrowing(awaiters) result = true check: @@ -169,65 +217,107 @@ suite "FloodSub": test "FloodSub multiple peers, no self trigger": proc runTests(): Future[bool] {.async.} = var passed = 0 - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" - passed.inc() + + var futs = newSeq[(Future[void], TopicHandler, ref int)](10) + for i in 0..<10: + closureScope: + var + fut = newFuture[void]() + counter = new int + futs[i] = ( + fut, + (proc(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + inc counter[] + if counter[] == 9: + fut.complete()), + counter + ) var nodes: seq[Switch] = newSeq[Switch]() for i in 0..<10: - nodes.add(newStandardSwitch()) + nodes.add newStandardSwitch() + var awaitters: seq[Future[void]] - for node in nodes: - awaitters.add(await node.start()) - await node.subscribe("foobar", handler) - await sleepAsync(100.millis) - + for i in 0..<10: + awaitters.add(await nodes[i].start()) + await subscribeNodes(nodes) - await sleepAsync(1000.millis) - for node in nodes: - await node.publish("foobar", cast[seq[byte]]("Hello!")) - await sleepAsync(100.millis) + for i in 0..<10: + await nodes[i].subscribe("foobar", futs[i][1]) - await sleepAsync(1.minutes) - await allFutures(nodes.mapIt(it.stop())) - await allFutures(awaitters) + var subs: seq[Future[void]] + for i in 0..<10: + for y in 0..<10: + if y != i: + subs &= waitSub(nodes[i], nodes[y], "foobar") + await allFuturesThrowing(subs) - result = passed >= 10 # non deterministic, so at least 10 times + var pubs: seq[Future[void]] + for i in 0..<10: + pubs &= nodes[i].publish("foobar", cast[seq[byte]]("Hello!")) + await allFuturesThrowing(pubs) + await allFuturesThrowing(futs.mapIt(it[0])) + await allFuturesThrowing(nodes.mapIt(it.stop())) + await allFuturesThrowing(awaitters) + + result = true check: waitFor(runTests()) == true test "FloodSub multiple peers, with self trigger": proc runTests(): Future[bool] {.async.} = var passed = 0 - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" - passed.inc() + + var futs = newSeq[(Future[void], TopicHandler, ref int)](10) + for i in 0..<10: + closureScope: + var + fut = newFuture[void]() + counter = new int + futs[i] = ( + fut, + (proc(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + inc counter[] + if counter[] == 10: + fut.complete()), + counter + ) var nodes: seq[Switch] = newSeq[Switch]() for i in 0..<10: nodes.add newStandardSwitch(triggerSelf = true) + var awaitters: seq[Future[void]] - for node in nodes: - awaitters.add((await node.start())) - await node.subscribe("foobar", handler) - await sleepAsync(100.millis) + for i in 0..<10: + awaitters.add(await nodes[i].start()) await subscribeNodes(nodes) - await sleepAsync(1000.millis) - for node in nodes: - await node.publish("foobar", cast[seq[byte]]("Hello!")) - await sleepAsync(100.millis) + for i in 0..<10: + await nodes[i].subscribe("foobar", futs[i][1]) - await sleepAsync(1.minutes) - await allFutures(nodes.mapIt(it.stop())) - await allFutures(awaitters) + var subs: seq[Future[void]] + for i in 0..<10: + for y in 0..<10: + if y != i: + subs &= waitSub(nodes[i], nodes[y], "foobar") + await allFuturesThrowing(subs) - result = passed >= 20 # non deterministic, so at least 10 times + var pubs: seq[Future[void]] + for i in 0..<10: + pubs &= nodes[i].publish("foobar", cast[seq[byte]]("Hello!")) + await allFuturesThrowing(pubs) + await allFuturesThrowing(futs.mapIt(it[0])) + await allFuturesThrowing(nodes.mapIt(it.stop())) + await allFuturesThrowing(awaitters) + + result = true check: waitFor(runTests()) == true diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 136f621..1cffc2d 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -9,7 +9,9 @@ import unittest, sequtils, options, tables, sets import chronos -import utils, ../../libp2p/[peer, +import chronicles +import utils, ../../libp2p/[errors, + peer, peerinfo, connection, crypto/crypto, @@ -18,11 +20,48 @@ import utils, ../../libp2p/[peer, protocols/pubsub/gossipsub, protocols/pubsub/rpc/messages] +const + StreamTransportTrackerName = "stream.transport" + StreamServerTrackerName = "stream.server" + proc createGossipSub(): GossipSub = var peerInfo = PeerInfo.init(PrivateKey.random(RSA)) result = newPubSub(GossipSub, peerInfo) +proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = + if sender == receiver: + return + # turn things deterministic + # this is for testing purposes only + # peers can be inside `mesh` and `fanout`, not just `gossipsub` + var ceil = 15 + let fsub = cast[GossipSub](sender.pubSub.get()) + while (not fsub.gossipsub.hasKey(key) or + not fsub.gossipsub[key].contains(receiver.peerInfo.id)) and + (not fsub.mesh.hasKey(key) or + not fsub.mesh[key].contains(receiver.peerInfo.id)) and + (not fsub.fanout.hasKey(key) or + not fsub.fanout[key].contains(receiver.peerInfo.id)): + trace "waitSub sleeping...", peers=fsub.gossipsub[key] + await sleepAsync(100.millis) + dec ceil + doAssert(ceil > 0, "waitSub timeout!") + suite "GossipSub": + teardown: + let + trackers = [ + getTracker(BufferStreamTrackerName), + getTracker(AsyncStreamWriterTrackerName), + getTracker(AsyncStreamReaderTrackerName), + getTracker(StreamTransportTrackerName), + getTracker(StreamServerTrackerName) + ] + for tracker in trackers: + if not isNil(tracker): + # echo tracker.dump() + check tracker.isLeaked() == false + test "GossipSub validation should succeed": proc runTests(): Future[bool] {.async.} = var handlerFut = newFuture[bool]() @@ -35,10 +74,12 @@ suite "GossipSub": awaiters.add((await nodes[0].start())) awaiters.add((await nodes[1].start())) - await nodes[0].subscribe("foobar", handler) - await nodes[1].subscribe("foobar", handler) await subscribeNodes(nodes) - await sleepAsync(100.millis) + + await nodes[0].subscribe("foobar", handler) + await waitSub(nodes[1], nodes[0], "foobar") + await nodes[1].subscribe("foobar", handler) + await waitSub(nodes[0], nodes[1], "foobar") var validatorFut = newFuture[bool]() proc validator(topic: string, @@ -52,8 +93,8 @@ suite "GossipSub": await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) result = (await validatorFut) and (await handlerFut) - await allFutures(nodes[0].stop(), nodes[1].stop()) - await allFutures(awaiters) + await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) + await allFuturesThrowing(awaiters) check: waitFor(runTests()) == true @@ -69,8 +110,9 @@ suite "GossipSub": awaiters.add((await nodes[1].start())) await subscribeNodes(nodes) + await nodes[1].subscribe("foobar", handler) - await sleepAsync(100.millis) + await waitSub(nodes[0], nodes[1], "foobar") var validatorFut = newFuture[bool]() proc validator(topic: string, @@ -82,10 +124,9 @@ suite "GossipSub": nodes[1].addValidator("foobar", validator) await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) - await sleepAsync(100.millis) result = await validatorFut - await allFutures(nodes[0].stop(), nodes[1].stop()) - await allFutures(awaiters) + await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) + await allFuturesThrowing(awaiters) check: waitFor(runTests()) == true @@ -102,10 +143,11 @@ suite "GossipSub": awaiters.add((await nodes[0].start())) awaiters.add((await nodes[1].start())) - await nodes[1].subscribe("foo", handler) - await nodes[1].subscribe("bar", handler) await subscribeNodes(nodes) - await sleepAsync(100.millis) + await nodes[1].subscribe("foo", handler) + await waitSub(nodes[0], nodes[1], "foo") + await nodes[1].subscribe("bar", handler) + await waitSub(nodes[0], nodes[1], "bar") var passed, failed: Future[bool] = newFuture[bool]() proc validator(topic: string, @@ -123,8 +165,8 @@ suite "GossipSub": await nodes[0].publish("bar", cast[seq[byte]]("Hello!")) result = ((await passed) and (await failed) and (await handlerFut)) - await allFutures(nodes[0].stop(), nodes[1].stop()) - await allFutures(awaiters) + await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) + await allFuturesThrowing(awaiters) result = true check: waitFor(runTests()) == true @@ -151,12 +193,17 @@ suite "GossipSub": asyncCheck gossip2.handleConn(conn1, GossipSubCodec) await gossip1.subscribe("foobar", handler) - await sleepAsync(10.millis) + await sleepAsync(1.seconds) check: "foobar" in gossip2.gossipsub gossip1.peerInfo.id in gossip2.gossipsub["foobar"] + await allFuturesThrowing( + buf1.close(), + buf2.close() + ) + result = true check: @@ -175,9 +222,9 @@ suite "GossipSub": for node in nodes: awaitters.add(await node.start()) - await nodes[1].subscribe("foobar", handler) await subscribeNodes(nodes) - await sleepAsync(100.millis) + await nodes[1].subscribe("foobar", handler) + await sleepAsync(1.seconds) let gossip1 = GossipSub(nodes[0].pubSub.get()) let gossip2 = GossipSub(nodes[1].pubSub.get()) @@ -187,8 +234,8 @@ suite "GossipSub": "foobar" in gossip1.gossipsub gossip2.peerInfo.id in gossip1.gossipsub["foobar"] - await allFutures(nodes.mapIt(it.stop())) - await allFutures(awaitters) + await allFuturesThrowing(nodes.mapIt(it.stop())) + await allFuturesThrowing(awaitters) result = true @@ -221,7 +268,7 @@ suite "GossipSub": await gossip1.subscribe("foobar", handler) await gossip2.subscribe("foobar", handler) - await sleepAsync(100.millis) + await sleepAsync(1.seconds) check: "foobar" in gossip1.topics @@ -236,6 +283,11 @@ suite "GossipSub": gossip1.peerInfo.id in gossip1.gossipsub["foobar"] gossip2.peerInfo.id in gossip2.gossipsub["foobar"] + await allFuturesThrowing( + buf1.close(), + buf2.close() + ) + result = true check: @@ -254,13 +306,19 @@ suite "GossipSub": for node in nodes: awaitters.add(await node.start()) + await subscribeNodes(nodes) + await nodes[0].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler) - await subscribeNodes(nodes) - await sleepAsync(100.millis) - let gossip1 = GossipSub(nodes[0].pubSub.get()) - let gossip2 = GossipSub(nodes[1].pubSub.get()) + var subs: seq[Future[void]] + subs &= waitSub(nodes[1], nodes[0], "foobar") + subs &= waitSub(nodes[0], nodes[1], "foobar") + await allFuturesThrowing(subs) + + let + gossip1 = GossipSub(nodes[0].pubSub.get()) + gossip2 = GossipSub(nodes[1].pubSub.get()) check: "foobar" in gossip1.topics @@ -269,11 +327,14 @@ suite "GossipSub": "foobar" in gossip1.gossipsub "foobar" in gossip2.gossipsub - gossip1.peerInfo.id in gossip2.gossipsub["foobar"] - gossip2.peerInfo.id in gossip1.gossipsub["foobar"] + gossip2.peerInfo.id in gossip1.gossipsub["foobar"] or + gossip2.peerInfo.id in gossip1.mesh["foobar"] - await allFutures(nodes.mapIt(it.stop())) - await allFutures(awaitters) + gossip1.peerInfo.id in gossip2.gossipsub["foobar"] or + gossip1.peerInfo.id in gossip2.mesh["foobar"] + + await allFuturesThrowing(nodes.mapIt(it.stop())) + await allFuturesThrowing(awaitters) result = true @@ -322,10 +383,10 @@ suite "GossipSub": test "e2e - GossipSub send over fanout A -> B": proc runTests(): Future[bool] {.async.} = - var passed: bool + var passed = newFuture[void]() proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = check topic == "foobar" - passed = true + passed.complete() var nodes = generateNodes(2, true) var wait = newSeq[Future[void]]() @@ -335,33 +396,36 @@ suite "GossipSub": await subscribeNodes(nodes) await nodes[1].subscribe("foobar", handler) - await sleepAsync(1000.millis) + await waitSub(nodes[0], nodes[1], "foobar") await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) - await sleepAsync(1000.millis) var gossipSub1: GossipSub = GossipSub(nodes[0].pubSub.get()) check: "foobar" in gossipSub1.gossipsub - await nodes[1].stop() - await nodes[0].stop() + await passed.wait(5.seconds) - await allFutures(wait) - result = passed + trace "test done, stopping..." + + await nodes[0].stop() + await nodes[1].stop() + await allFuturesThrowing(wait) + + result = true check: waitFor(runTests()) == true # test "send over mesh A -> B": # proc runTests(): Future[bool] {.async.} = - # var passed: bool + # var passed = newFuture[void]() # proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = # check: # topic == "foobar" # cast[string](data) == "Hello!" - # passed = true + # passed.complete() # let gossip1 = createGossipSub() # let gossip2 = createGossipSub() @@ -387,7 +451,11 @@ suite "GossipSub": # await gossip2.publish("foobar", cast[seq[byte]]("Hello!")) # await sleepAsync(1.seconds) - # result = passed + + # await passed.wait(5.seconds) + # result = true + + # await allFuturesThrowing(buf1.close(), buf2.close()) # check: # waitFor(runTests()) == true @@ -405,17 +473,17 @@ suite "GossipSub": wait.add(await nodes[1].start()) await subscribeNodes(nodes) - await sleepAsync(100.millis) await nodes[1].subscribe("foobar", handler) - await sleepAsync(100.millis) + await waitSub(nodes[0], nodes[1], "foobar") await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) + result = await passed await nodes[0].stop() await nodes[1].stop() - await allFutures(wait) + await allFuturesThrowing(wait) check: waitFor(runTests()) == true @@ -466,8 +534,8 @@ suite "GossipSub": # nodes[1].peerInfo.peerId.get().pretty)) # await sleepAsync(1000.millis) - # await allFutures(nodes.mapIt(it.stop())) - # await allFutures(awaitters) + # await allFuturesThrowing(nodes.mapIt(it.stop())) + # await allFuturesThrowing(awaitters) # check: seen.len == 9 # for k, v in seen.pairs: @@ -487,6 +555,8 @@ suite "GossipSub": nodes.add newStandardSwitch(triggerSelf = true, gossip = true) awaitters.add((await nodes[i].start())) + await subscribeNodes(nodes) + var seen: Table[string, int] var subs: seq[Future[void]] var seenFut = newFuture[void]() @@ -502,10 +572,9 @@ suite "GossipSub": if not seenFut.finished() and seen.len == 10: seenFut.complete() - subs.add(dialer.subscribe("foobar", handler)) - await allFutures(subs) - await subscribeNodes(nodes) - await sleepAsync(1.seconds) + subs.add(allFutures(dialer.subscribe("foobar", handler), waitSub(nodes[0], dialer, "foobar"))) + + await allFuturesThrowing(subs) await wait(nodes[0].publish("foobar", cast[seq[byte]]("from node " & @@ -517,8 +586,8 @@ suite "GossipSub": for k, v in seen.pairs: check: v == 1 - await allFutures(nodes.mapIt(it.stop())) - await allFutures(awaitters) + await allFuturesThrowing(nodes.mapIt(it.stop())) + await allFuturesThrowing(awaitters) result = true check: diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 22babb4..f462168 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -12,5 +12,4 @@ proc subscribeNodes*(nodes: seq[Switch]) {.async.} = for node in nodes: if dialer.peerInfo.peerId != node.peerInfo.peerId: dials.add(dialer.connect(node.peerInfo)) - await sleepAsync(100.millis) await allFutures(dials) diff --git a/tests/testbufferstream.nim b/tests/testbufferstream.nim index ee22096..427641a 100644 --- a/tests/testbufferstream.nim +++ b/tests/testbufferstream.nim @@ -1,10 +1,15 @@ import unittest, strformat import chronos +import ../libp2p/errors import ../libp2p/stream/bufferstream when defined(nimHasUsed): {.used.} suite "BufferStream": + teardown: + # echo getTracker("libp2p.bufferstream").dump() + check getTracker("libp2p.bufferstream").isLeaked() == false + test "push data to buffer": proc testPushTo(): Future[bool] {.async.} = proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard @@ -16,6 +21,8 @@ suite "BufferStream": check buff.len == 5 result = true + await buff.close() + check: waitFor(testPushTo()) == true @@ -33,6 +40,8 @@ suite "BufferStream": result = true + await buff.close() + check: waitFor(testPushTo()) == true @@ -47,6 +56,8 @@ suite "BufferStream": result = true + await buff.close() + check: waitFor(testRead()) == true @@ -62,6 +73,8 @@ suite "BufferStream": result = true + await buff.close() + check: waitFor(testRead()) == true @@ -81,6 +94,8 @@ suite "BufferStream": result = true + await buff.close() + check: waitFor(testRead()) == true @@ -102,8 +117,11 @@ suite "BufferStream": var fut = reader() await buff.pushTo(cast[seq[byte]](@"12345")) await fut + result = true + await buff.close() + check: waitFor(testRead()) == true @@ -118,8 +136,11 @@ suite "BufferStream": var data: seq[byte] = newSeq[byte](2) await buff.readExactly(addr data[0], 2) check cast[string](data) == @['1', '2'] + result = true + await buff.close() + check: waitFor(testReadExactly()) == true @@ -132,8 +153,11 @@ suite "BufferStream": await buff.pushTo(cast[seq[byte]](@"12345\n67890")) check buff.len == 11 check "12345" == await buff.readLine(0, "\n") + result = true + await buff.close() + check: waitFor(testReadLine()) == true @@ -150,8 +174,11 @@ suite "BufferStream": check (await readFut) == 3 check cast[string](data) == @['1', '2', '3'] + result = true + await buff.close() + check: waitFor(testReadOnce()) == true @@ -168,8 +195,11 @@ suite "BufferStream": check (await readFut) == 4 check cast[string](data) == @['1', '2', '3'] + result = true + await buff.close() + check: waitFor(testReadUntil()) == true @@ -183,8 +213,11 @@ suite "BufferStream": var data = "Hello!" await buff.write(addr data[0], data.len) + result = true + await buff.close() + check: waitFor(testWritePtr()) == true @@ -197,8 +230,11 @@ suite "BufferStream": check buff.len == 0 await buff.write("Hello!", 6) + result = true + await buff.close() + check: waitFor(testWritePtr()) == true @@ -211,8 +247,11 @@ suite "BufferStream": check buff.len == 0 await buff.write(cast[seq[byte]]("Hello!"), 6) + result = true + await buff.close() + check: waitFor(testWritePtr()) == true @@ -236,8 +275,11 @@ suite "BufferStream": await buff.write("Msg 8") await buff.write("Msg 9") await buff.write("Msg 10") + result = true + await buff.close() + check: waitFor(testWritePtr()) == true @@ -265,6 +307,8 @@ suite "BufferStream": result = true + await buff.close() + check: waitFor(testWritePtr()) == true @@ -295,7 +339,7 @@ suite "BufferStream": await buf1.pushTo(cast[seq[byte]]("Hello2!")) await buf2.pushTo(cast[seq[byte]]("Hello1!")) - await allFutures(readFut1, readFut2) + await allFuturesThrowing(readFut1, readFut2) check: res1 == cast[seq[byte]]("Hello2!") @@ -303,6 +347,9 @@ suite "BufferStream": result = true + await buf1.close() + await buf2.close() + check: waitFor(pipeTest()) == true @@ -321,6 +368,9 @@ suite "BufferStream": result = true + await buf1.close() + await buf2.close() + check: waitFor(pipeTest()) == true @@ -339,7 +389,7 @@ suite "BufferStream": await buf1.write(cast[seq[byte]]("Hello1!")) await buf2.write(cast[seq[byte]]("Hello2!")) - await allFutures(readFut1, readFut2) + await allFuturesThrowing(readFut1, readFut2) check: res1 == cast[seq[byte]]("Hello2!") @@ -347,6 +397,9 @@ suite "BufferStream": result = true + await buf1.close() + await buf2.close() + check: waitFor(pipeTest()) == true @@ -368,6 +421,8 @@ suite "BufferStream": result = true + await buf1.close() + check: waitFor(pipeTest()) == true @@ -386,6 +441,9 @@ suite "BufferStream": result = true + await buf1.close() + await buf2.close() + check: waitFor(pipeTest()) == true @@ -404,7 +462,7 @@ suite "BufferStream": await buf1.write(cast[seq[byte]]("Hello1!")) await buf2.write(cast[seq[byte]]("Hello2!")) - await allFutures(readFut1, readFut2) + await allFuturesThrowing(readFut1, readFut2) check: res1 == cast[seq[byte]]("Hello2!") @@ -412,6 +470,9 @@ suite "BufferStream": result = true + await buf1.close() + await buf2.close() + check: waitFor(pipeTest()) == true @@ -433,6 +494,8 @@ suite "BufferStream": result = true + await buf1.close() + check: waitFor(pipeTest()) == true @@ -458,9 +521,11 @@ suite "BufferStream": var writerFut = writer() var readerFut = reader() - await allFutures(readerFut, writerFut) + await allFuturesThrowing(readerFut, writerFut) result = true + await buf1.close() + check: waitFor(pipeTest()) == true @@ -481,6 +546,8 @@ suite "BufferStream": except AsyncTimeoutError: result = false + await stream.close() + check: waitFor(closeTest()) == true diff --git a/tests/testidentify.nim b/tests/testidentify.nim index e47a611..27bfb4c 100644 --- a/tests/testidentify.nim +++ b/tests/testidentify.nim @@ -13,7 +13,25 @@ import ../libp2p/[protocols/identify, when defined(nimHasUsed): {.used.} +const + StreamTransportTrackerName = "stream.transport" + StreamServerTrackerName = "stream.server" + suite "Identify": + teardown: + let + trackers = [ + getTracker(AsyncStreamWriterTrackerName), + getTracker(TcpTransportTrackerName), + getTracker(AsyncStreamReaderTrackerName), + getTracker(StreamTransportTrackerName), + getTracker(StreamServerTrackerName) + ] + for tracker in trackers: + if not isNil(tracker): + # echo tracker.dump() + check tracker.isLeaked() == false + test "handle identify message": proc testHandle(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") @@ -51,8 +69,11 @@ suite "Identify": await conn.close() await transport1.close() await serverFut + result = true + await transport2.close() + check: waitFor(testHandle()) == true @@ -63,9 +84,13 @@ suite "Identify": let identifyProto1 = newIdentify(remotePeerInfo) let msListen = newMultistream() + let done = newFuture[void]() + msListen.addHandler(IdentifyCodec, identifyProto1) proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = await msListen.handle(conn) + await conn.close() + done.complete() let transport1: TcpTransport = newTransport(TcpTransport) asyncCheck transport1.listen(ma, connHandler) @@ -76,9 +101,15 @@ suite "Identify": var localPeerInfo = PeerInfo.init(PrivateKey.random(RSA), [ma]) let identifyProto2 = newIdentify(localPeerInfo) - discard await msDial.select(conn, IdentifyCodec) - discard await identifyProto2.identify(conn, PeerInfo.init(PrivateKey.random(RSA))) - await conn.close() + + try: + discard await msDial.select(conn, IdentifyCodec) + discard await identifyProto2.identify(conn, PeerInfo.init(PrivateKey.random(RSA))) + finally: + await done.wait(5000.millis) # when no issues will not wait that long! + await conn.close() + await transport2.close() + await transport1.close() expect IdentityNoMatchError: waitFor(testHandleError()) diff --git a/tests/testmplex.nim b/tests/testmplex.nim index b538152..12fa5d8 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -1,6 +1,7 @@ import unittest, sequtils, sugar, strformat, options, strformat, random import chronos, nimcrypto/utils, chronicles -import ../libp2p/[connection, +import ../libp2p/[errors, + connection, stream/lpstream, stream/bufferstream, transports/tcptransport, @@ -16,7 +17,26 @@ import ../libp2p/[connection, when defined(nimHasUsed): {.used.} +const + StreamTransportTrackerName = "stream.transport" + StreamServerTrackerName = "stream.server" + suite "Mplex": + teardown: + let + trackers = [ + getTracker(BufferStreamTrackerName), + getTracker(AsyncStreamWriterTrackerName), + getTracker(TcpTransportTrackerName), + getTracker(AsyncStreamReaderTrackerName), + getTracker(StreamTransportTrackerName), + getTracker(StreamServerTrackerName) + ] + for tracker in trackers: + if not isNil(tracker): + # echo tracker.dump() + check tracker.isLeaked() == false + test "encode header with channel id 0": proc testEncodeHeader(): Future[bool] {.async.} = proc encHandler(msg: seq[byte]) {.async.} = @@ -25,8 +45,11 @@ suite "Mplex": let stream = newBufferStream(encHandler) let conn = newConnection(stream) await conn.writeMsg(0, MessageType.New, cast[seq[byte]]("stream 1")) + result = true + await stream.close() + check: waitFor(testEncodeHeader()) == true @@ -38,8 +61,11 @@ suite "Mplex": let stream = newBufferStream(encHandler) let conn = newConnection(stream) await conn.writeMsg(17, MessageType.New, cast[seq[byte]]("stream 1")) + result = true + await stream.close() + check: waitFor(testEncodeHeader()) == true @@ -52,8 +78,11 @@ suite "Mplex": let stream = newBufferStream(encHandler) let conn = newConnection(stream) await conn.writeMsg(0, MessageType.MsgOut, cast[seq[byte]]("stream 1")) + result = true + await stream.close() + check: waitFor(testEncodeHeaderBody()) == true @@ -67,8 +96,11 @@ suite "Mplex": let conn = newConnection(stream) await conn.writeMsg(17, MessageType.MsgOut, cast[seq[byte]]("stream 1")) await conn.close() + result = true + await stream.close() + check: waitFor(testEncodeHeaderBody()) == true @@ -81,8 +113,11 @@ suite "Mplex": check msg.id == 0 check msg.msgType == MessageType.New + result = true + await stream.close() + check: waitFor(testDecodeHeader()) == true @@ -96,8 +131,11 @@ suite "Mplex": check msg.id == 0 check msg.msgType == MessageType.MsgOut check cast[string](msg.data) == "hello from channel 0!!" + result = true + await stream.close() + check: waitFor(testDecodeHeader()) == true @@ -111,8 +149,11 @@ suite "Mplex": check msg.id == 17 check msg.msgType == MessageType.MsgOut check cast[string](msg.data) == "hello from channel 0!!" + result = true + await stream.close() + check: waitFor(testDecodeHeader()) == true @@ -120,21 +161,25 @@ suite "Mplex": proc testNewStream(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") + var + done = newFuture[void]() + done2 = newFuture[void]() + proc connHandler(conn: Connection) {.async, gcsafe.} = proc handleMplexListen(stream: Connection) {.async, gcsafe.} = let msg = await stream.readLp() check cast[string](msg) == "Hello from stream!" await stream.close() + done.complete() let mplexListen = newMplex(conn) mplexListen.streamHandler = handleMplexListen - discard mplexListen.handle() + await mplexListen.handle() + await conn.close() + done2.complete() let transport1: TcpTransport = newTransport(TcpTransport) - discard await transport1.listen(ma, connHandler) - - defer: - await transport1.close() + let lfut = await transport1.listen(ma, connHandler) let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(transport1.ma) @@ -145,8 +190,17 @@ suite "Mplex": await stream.writeLp("Hello from stream!") await conn.close() check openState # not lazy + result = true + await done.wait(5000.millis) + await done2.wait(5000.millis) + await stream.close() + await conn.close() + await transport2.close() + await transport1.close() + await lfut + check: waitFor(testNewStream()) == true @@ -154,15 +208,21 @@ suite "Mplex": proc testNewStream(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") + var + done = newFuture[void]() + done2 = newFuture[void]() + proc connHandler(conn: Connection) {.async, gcsafe.} = proc handleMplexListen(stream: Connection) {.async, gcsafe.} = let msg = await stream.readLp() check cast[string](msg) == "Hello from stream!" await stream.close() + done.complete() let mplexListen = newMplex(conn) mplexListen.streamHandler = handleMplexListen - discard mplexListen.handle() + await mplexListen.handle() + done2.complete() let transport1: TcpTransport = newTransport(TcpTransport) let listenFut = await transport1.listen(ma, connHandler) @@ -179,7 +239,12 @@ suite "Mplex": check not openState # assert lazy result = true + await done.wait(5000.millis) + await done2.wait(5000.millis) + await conn.close() + await stream.close() await mplexDial.close() + await transport2.close() await transport1.close() await listenFut @@ -214,8 +279,6 @@ suite "Mplex": let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(transport1.ma) - defer: - await conn.close() let mplexDial = newMplex(conn) let stream = await mplexDial.newStream() @@ -228,7 +291,10 @@ suite "Mplex": result = true + await stream.close() await mplexDial.close() + await conn.close() + await transport2.close() await transport1.close() await listenFut @@ -239,10 +305,13 @@ suite "Mplex": proc testNewStream(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") + let done = newFuture[void]() + proc connHandler(conn: Connection) {.async, gcsafe.} = proc handleMplexListen(stream: Connection) {.async, gcsafe.} = await stream.writeLp("Hello from stream!") await stream.close() + done.complete() let mplexListen = newMplex(conn) mplexListen.streamHandler = handleMplexListen @@ -259,11 +328,15 @@ suite "Mplex": let stream = await mplexDial.newStream("DIALER") let msg = cast[string](await stream.readLp()) check msg == "Hello from stream!" - await conn.close() + # await dialFut result = true + await done.wait(5000.millis) + await stream.close() + await conn.close() await mplexDial.close() + await transport2.close() await transport1.close() await listenFut @@ -274,6 +347,8 @@ suite "Mplex": proc testNewStream(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") + let done = newFuture[void]() + var count = 1 var listenConn: Connection proc connHandler(conn: Connection) {.async, gcsafe.} = @@ -282,6 +357,8 @@ suite "Mplex": check cast[string](msg) == &"stream {count}!" count.inc await stream.close() + if count == 10: + done.complete() listenConn = conn let mplexListen = newMplex(conn) @@ -300,9 +377,9 @@ suite "Mplex": await stream.writeLp(&"stream {i}!") await stream.close() - await sleepAsync(1.seconds) # allow messages to get to the handler - await conn.close() # TODO: chronos sockets don't seem to have half-closed functionality - + await done.wait(5000.millis) + await conn.close() + await transport2.close() await mplexDial.close() await listenConn.close() await transport1.close() @@ -318,8 +395,8 @@ suite "Mplex": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") var count = 1 - var listenFut: Future[void] var listenConn: Connection + let done = newFuture[void]() proc connHandler(conn: Connection) {.async, gcsafe.} = listenConn = conn proc handleMplexListen(stream: Connection) {.async, gcsafe.} = @@ -328,12 +405,12 @@ suite "Mplex": await stream.writeLp(&"stream {count} from listener!") count.inc await stream.close() + if count == 10: + done.complete() let mplexListen = newMplex(conn) mplexListen.streamHandler = handleMplexListen - listenFut = mplexListen.handle() - listenFut.addCallback(proc(udata: pointer) {.gcsafe.} - = trace "completed listener") + await mplexListen.handle() let transport1: TcpTransport = newTransport(TcpTransport) let transportFut = await transport1.listen(ma, connHandler) @@ -352,9 +429,12 @@ suite "Mplex": check cast[string](msg) == &"stream {i} from listener!" await stream.close() + await done.wait(5.seconds) await conn.close() await listenConn.close() - await allFutures(dialFut, listenFut) + await allFuturesThrowing(dialFut) + await mplexDial.close() + await transport2.close() await transport1.close() await transportFut result = true @@ -365,9 +445,16 @@ suite "Mplex": test "half closed - channel should close for write": proc testClosedForWrite(): Future[void] {.async.} = proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard - let chann = newChannel(1, newConnection(newBufferStream(writeHandler)), true) - await chann.close() - await chann.write("Hello") + let + buff = newBufferStream(writeHandler) + conn = newConnection(buff) + chann = newChannel(1, conn, true) + try: + await chann.close() + await chann.write("Hello") + finally: + await chann.cleanUp() + await conn.close() expect LPStreamEOFError: waitFor(testClosedForWrite()) @@ -375,12 +462,19 @@ suite "Mplex": test "half closed - channel should close for read by remote": proc testClosedForRead(): Future[void] {.async.} = proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard - let chann = newChannel(1, newConnection(newBufferStream(writeHandler)), true) + let + buff = newBufferStream(writeHandler) + conn = newConnection(buff) + chann = newChannel(1, conn, true) - await chann.pushTo(cast[seq[byte]]("Hello!")) - await chann.closedByRemote() - discard await chann.read() # this should work, since there is data in the buffer - discard await chann.read() # this should throw + try: + await chann.pushTo(cast[seq[byte]]("Hello!")) + await chann.closedByRemote() + discard await chann.read() # this should work, since there is data in the buffer + discard await chann.read() # this should throw + finally: + await chann.cleanUp() + await conn.close() expect LPStreamEOFError: waitFor(testClosedForRead()) @@ -445,6 +539,7 @@ suite "Mplex": await conn.close() await complete + await transport2.close() await transport1.close() await listenFut @@ -502,7 +597,7 @@ suite "Mplex": await stream.close() await conn.close() await complete - + await transport2.close() await transport1.close() await listenFut @@ -514,10 +609,18 @@ suite "Mplex": test "reset - channel should fail reading": proc testResetRead(): Future[void] {.async.} = proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard - let chann = newChannel(1, newConnection(newBufferStream(writeHandler)), true) - await chann.reset() - var data = await chann.read() - doAssert(len(data) == 1) + let + buff = newBufferStream(writeHandler) + conn = newConnection(buff) + chann = newChannel(1, conn, true) + + try: + await chann.reset() + var data = await chann.read() + doAssert(len(data) == 1) + finally: + await chann.cleanUp() + await conn.close() expect LPStreamEOFError: waitFor(testResetRead()) @@ -525,9 +628,16 @@ suite "Mplex": test "reset - channel should fail writing": proc testResetWrite(): Future[void] {.async.} = proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard - let chann = newChannel(1, newConnection(newBufferStream(writeHandler)), true) - await chann.reset() - await chann.write(cast[seq[byte]]("Hello!")) + let + buff = newBufferStream(writeHandler) + conn = newConnection(buff) + chann = newChannel(1, conn, true) + try: + await chann.reset() + await chann.write(cast[seq[byte]]("Hello!")) + finally: + await chann.cleanUp() + await conn.close() expect LPStreamEOFError: waitFor(testResetWrite()) @@ -535,9 +645,16 @@ suite "Mplex": test "should not allow pushing data to channel when remote end closed": proc testResetWrite(): Future[void] {.async.} = proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard - let chann = newChannel(1, newConnection(newBufferStream(writeHandler)), true) - await chann.closedByRemote() - await chann.pushTo(@[byte(1)]) + let + buff = newBufferStream(writeHandler) + conn = newConnection(buff) + chann = newChannel(1, conn, true) + try: + await chann.closedByRemote() + await chann.pushTo(@[byte(1)]) + finally: + await chann.cleanUp() + await conn.close() expect LPStreamEOFError: waitFor(testResetWrite()) diff --git a/tests/testmultistream.nim b/tests/testmultistream.nim index 91a32e8..9072aac 100644 --- a/tests/testmultistream.nim +++ b/tests/testmultistream.nim @@ -1,6 +1,7 @@ import unittest, strutils, sequtils, strformat, options import chronos -import ../libp2p/connection, +import ../libp2p/errors, + ../libp2p/connection, ../libp2p/multistream, ../libp2p/stream/lpstream, ../libp2p/stream/bufferstream, @@ -20,6 +21,10 @@ type TestSelectStream = ref object of LPStream step*: int +const + StreamTransportTrackerName = "stream.transport" + StreamServerTrackerName = "stream.server" + method readExactly*(s: TestSelectStream, pbytes: pointer, nbytes: int): Future[void] {.async, gcsafe.} = @@ -155,11 +160,27 @@ proc newTestNaStream(na: NaHandler): TestNaStream = result.step = 1 suite "Multistream select": + teardown: + let + trackers = [ + # getTracker(ConnectionTrackerName), + getTracker(AsyncStreamWriterTrackerName), + getTracker(TcpTransportTrackerName), + getTracker(AsyncStreamReaderTrackerName), + getTracker(StreamTransportTrackerName), + getTracker(StreamServerTrackerName) + ] + for tracker in trackers: + if not isNil(tracker): + # echo tracker.dump() + check tracker.isLeaked() == false + test "test select custom proto": proc testSelect(): Future[bool] {.async.} = let ms = newMultistream() let conn = newConnection(newTestSelectStream()) result = (await ms.select(conn, @["/test/proto/1.0.0"])) == "/test/proto/1.0.0" + await conn.close() check: waitFor(testSelect()) == true @@ -190,10 +211,12 @@ suite "Multistream select": proc testLsHandler(proto: seq[byte]) {.async, gcsafe.} # forward declaration let conn = newConnection(newTestLsStream(testLsHandler)) + let done = newFuture[void]() proc testLsHandler(proto: seq[byte]) {.async, gcsafe.} = var strProto: string = cast[string](proto) check strProto == "\x26/test/proto1/1.0.0\n/test/proto2/1.0.0\n" await conn.close() + done.complete() proc testHandler(conn: Connection, proto: string): Future[void] {.async, gcsafe.} = discard @@ -204,6 +227,8 @@ suite "Multistream select": await ms.handle(conn) result = true + await done.wait(5.seconds) + check: waitFor(testLs()) == true @@ -235,6 +260,10 @@ suite "Multistream select": proc endToEnd(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") + let + handlerWait1 = newFuture[void]() + handlerWait2 = newFuture[void]() + var protocol: LPProtocol = new LPProtocol proc testHandler(conn: Connection, proto: string): @@ -242,6 +271,7 @@ suite "Multistream select": check proto == "/test/proto/1.0.0" await conn.writeLp("Hello!") await conn.close() + handlerWait1.complete() protocol.handler = testHandler let msListen = newMultistream() @@ -249,6 +279,8 @@ suite "Multistream select": proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = await msListen.handle(conn) + await conn.close() + handlerWait2.complete() let transport1: TcpTransport = newTransport(TcpTransport) asyncCheck transport1.listen(ma, connHandler) @@ -262,6 +294,11 @@ suite "Multistream select": let hello = cast[string](await conn.readLp()) result = hello == "Hello!" await conn.close() + + await transport2.close() + await transport1.close() + + await allFuturesThrowing(handlerWait1.wait(5000.millis) #[if OK won't happen!!]#, handlerWait2.wait(5000.millis) #[if OK won't happen!!]#) check: waitFor(endToEnd()) == true @@ -270,13 +307,21 @@ suite "Multistream select": proc endToEnd(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") + let + handlerWait = newFuture[void]() + let msListen = newMultistream() var protocol: LPProtocol = new LPProtocol protocol.handler = proc(conn: Connection, proto: string) {.async, gcsafe.} = - await conn.close() + # never reached + discard + proc testHandler(conn: Connection, proto: string): - Future[void] {.async.} = discard + Future[void] {.async.} = + # never reached + discard + protocol.handler = testHandler msListen.addHandler("/test/proto1/1.0.0", protocol) msListen.addHandler("/test/proto2/1.0.0", protocol) @@ -284,6 +329,8 @@ suite "Multistream select": let transport1: TcpTransport = newTransport(TcpTransport) proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = await msListen.handle(conn) + handlerWait.complete() + asyncCheck transport1.listen(ma, connHandler) let msDial = newMultistream() @@ -292,9 +339,15 @@ suite "Multistream select": let ls = await msDial.list(conn) let protos: seq[string] = @["/test/proto1/1.0.0", "/test/proto2/1.0.0"] - await conn.close() + result = ls == protos + await conn.close() + await transport2.close() + await transport1.close() + + await handlerWait.wait(5000.millis) # when no issues will not wait that long! + check: waitFor(endToEnd()) == true @@ -329,7 +382,11 @@ suite "Multistream select": let hello = cast[string](await conn.readLp()) result = hello == "Hello!" + await conn.close() + await transport2.close() + await transport1.close() + check: waitFor(endToEnd()) == true @@ -363,7 +420,10 @@ suite "Multistream select": check (await msDial.select(conn, @["/test/proto2/1.0.0", "/test/proto1/1.0.0"])) == "/test/proto2/1.0.0" result = cast[string](await conn.readLp()) == "Hello from /test/proto2/1.0.0!" + await conn.close() + await transport2.close() + await transport1.close() check: waitFor(endToEnd()) == true diff --git a/tests/testnative.nim b/tests/testnative.nim index a5dc079..be262fc 100644 --- a/tests/testnative.nim +++ b/tests/testnative.nim @@ -1,6 +1,16 @@ import testvarint -import testrsa, testecnist, tested25519, testsecp256k1, testcrypto -import testmultibase, testmultihash, testmultiaddress, testcid, testpeer + +import testrsa, + testecnist, + tested25519, + testsecp256k1, + testcrypto + +import testmultibase, + testmultihash, + testmultiaddress, + testcid, + testpeer import testtransport, testmultistream, @@ -9,7 +19,5 @@ import testtransport, testswitch, testnoise, testpeerinfo, - pubsub/testpubsub, - # TODO: placing this before pubsub tests, - # breaks some flood and gossip tests - no idea why - testmplex + testmplex, + pubsub/testpubsub diff --git a/tests/testnoise.nim b/tests/testnoise.nim index 560496a..035bd7c 100644 --- a/tests/testnoise.nim +++ b/tests/testnoise.nim @@ -13,7 +13,9 @@ import chronicles import nimcrypto/sysrand import ../libp2p/crypto/crypto import ../libp2p/[switch, + errors, multistream, + stream/bufferstream, protocols/identify, connection, transports/transport, @@ -29,7 +31,10 @@ import ../libp2p/[switch, protocols/secure/noise, protocols/secure/secure] -const TestCodec = "/test/proto/1.0.0" +const + TestCodec = "/test/proto/1.0.0" + StreamTransportTrackerName = "stream.transport" + StreamServerTrackerName = "stream.server" type TestProto = ref object of LPProtocol @@ -64,6 +69,21 @@ proc createSwitch(ma: MultiAddress; outgoing: bool): (Switch, PeerInfo) = result = (switch, peerInfo) suite "Noise": + teardown: + let + trackers = [ + getTracker(BufferStreamTrackerName), + getTracker(AsyncStreamWriterTrackerName), + getTracker(TcpTransportTrackerName), + getTracker(AsyncStreamReaderTrackerName), + getTracker(StreamTransportTrackerName), + getTracker(StreamServerTrackerName) + ] + for tracker in trackers: + if not isNil(tracker): + # echo tracker.dump() + check tracker.isLeaked() == false + test "e2e: handle write + noise": proc testListenerDialer(): Future[bool] {.async.} = let @@ -75,6 +95,7 @@ suite "Noise": let sconn = await serverNoise.secure(conn) defer: await sconn.close() + await conn.close() await sconn.write(cstring("Hello!"), 6) let @@ -91,7 +112,9 @@ suite "Noise": msg = await sconn.read(6) await sconn.close() + await conn.close() await transport1.close() + await transport2.close() result = cast[string](msg) == "Hello!" @@ -110,6 +133,7 @@ suite "Noise": let sconn = await serverNoise.secure(conn) defer: await sconn.close() + await conn.close() let msg = await sconn.read(6) check cast[string](msg) == "Hello!" readTask.complete() @@ -128,53 +152,58 @@ suite "Noise": await sconn.write("Hello!".cstring, 6) await readTask await sconn.close() + await conn.close() await transport1.close() + await transport2.close() result = true check: waitFor(testListenerDialer()) == true - # test "e2e: handle read + noise fragmented": - # proc testListenerDialer(): Future[bool] {.async.} = - # let - # server: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") - # serverInfo = PeerInfo.init(PrivateKey.random(RSA), [server]) - # serverNoise = newNoise(serverInfo.privateKey, outgoing = false) - # readTask = newFuture[void]() + test "e2e: handle read + noise fragmented": + proc testListenerDialer(): Future[bool] {.async.} = + let + server: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") + serverInfo = PeerInfo.init(PrivateKey.random(RSA), [server]) + serverNoise = newNoise(serverInfo.privateKey, outgoing = false) + readTask = newFuture[void]() - # var hugePayload = newSeq[byte](0xFFFFF) - # check randomBytes(hugePayload) == hugePayload.len - # trace "Sending huge payload", size = hugePayload.len + var hugePayload = newSeq[byte](0xFFFFF) + check randomBytes(hugePayload) == hugePayload.len + trace "Sending huge payload", size = hugePayload.len - # proc connHandler(conn: Connection) {.async, gcsafe.} = - # let sconn = await serverNoise.secure(conn) - # defer: - # await sconn.close() - # let msg = await sconn.readLp() - # check msg == hugePayload - # readTask.complete() + proc connHandler(conn: Connection) {.async, gcsafe.} = + let sconn = await serverNoise.secure(conn) + defer: + await sconn.close() + let msg = await sconn.readLp() + check msg == hugePayload + readTask.complete() - # let - # transport1: TcpTransport = newTransport(TcpTransport) - # asyncCheck await transport1.listen(server, connHandler) + let + transport1: TcpTransport = newTransport(TcpTransport) + asyncCheck await transport1.listen(server, connHandler) - # let - # transport2: TcpTransport = newTransport(TcpTransport) - # clientInfo = PeerInfo.init(PrivateKey.random(RSA), [transport1.ma]) - # clientNoise = newNoise(clientInfo.privateKey, outgoing = true) - # conn = await transport2.dial(transport1.ma) - # sconn = await clientNoise.secure(conn) + let + transport2: TcpTransport = newTransport(TcpTransport) + clientInfo = PeerInfo.init(PrivateKey.random(RSA), [transport1.ma]) + clientNoise = newNoise(clientInfo.privateKey, outgoing = true) + conn = await transport2.dial(transport1.ma) + sconn = await clientNoise.secure(conn) - # await sconn.writeLp(hugePayload) - # await readTask - # await sconn.close() - # await transport1.close() + await sconn.writeLp(hugePayload) + await readTask - # result = true + await sconn.close() + await conn.close() + await transport2.close() + await transport1.close() - # check: - # waitFor(testListenerDialer()) == true + result = true + + check: + waitFor(testListenerDialer()) == true test "e2e use switch dial proto string": proc testSwitch(): Future[bool] {.async, gcsafe.} = @@ -199,8 +228,8 @@ suite "Noise": let msg = cast[string](await conn.readLp()) check "Hello!" == msg - await allFutures(switch1.stop(), switch2.stop()) - await allFutures(awaiters) + await allFuturesThrowing(switch1.stop(), switch2.stop()) + await allFuturesThrowing(awaiters) result = true check: diff --git a/tests/testpeerinfo.nim b/tests/testpeerinfo.nim index 0396a30..c2bdc9c 100644 --- a/tests/testpeerinfo.nim +++ b/tests/testpeerinfo.nim @@ -5,7 +5,24 @@ import ../libp2p/crypto/crypto, ../libp2p/peerinfo, ../libp2p/peer +const + StreamTransportTrackerName = "stream.transport" + StreamServerTrackerName = "stream.server" + suite "PeerInfo": + teardown: + let + trackers = [ + getTracker(AsyncStreamWriterTrackerName), + getTracker(AsyncStreamReaderTrackerName), + getTracker(StreamTransportTrackerName), + getTracker(StreamServerTrackerName) + ] + for tracker in trackers: + if not isNil(tracker): + # echo tracker.dump() + check tracker.isLeaked() == false + test "Should init with private key": let seckey = PrivateKey.random(RSA) var peerInfo = PeerInfo.init(seckey) diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 3e4e28c..de054dd 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -2,8 +2,10 @@ import unittest, tables import chronos import chronicles import nimcrypto/sysrand -import ../libp2p/[switch, +import ../libp2p/[errors, + switch, multistream, + stream/bufferstream, protocols/identify, connection, transports/transport, @@ -22,7 +24,10 @@ import ../libp2p/[switch, when defined(nimHasUsed): {.used.} -const TestCodec = "/test/proto/1.0.0" +const + TestCodec = "/test/proto/1.0.0" + StreamTransportTrackerName = "stream.transport" + StreamServerTrackerName = "stream.server" type TestProto = ref object of LPProtocol @@ -47,6 +52,22 @@ proc createSwitch(ma: MultiAddress): (Switch, PeerInfo) = result = (switch, peerInfo) suite "Switch": + teardown: + let + trackers = [ + # getTracker(ConnectionTrackerName), + getTracker(BufferStreamTrackerName), + getTracker(AsyncStreamWriterTrackerName), + getTracker(TcpTransportTrackerName), + getTracker(AsyncStreamReaderTrackerName), + getTracker(StreamTransportTrackerName), + getTracker(StreamServerTrackerName) + ] + for tracker in trackers: + if not isNil(tracker): + # echo tracker.dump() + check tracker.isLeaked() == false + test "e2e use switch dial proto string": proc testSwitch(): Future[bool] {.async, gcsafe.} = let ma1: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") @@ -58,11 +79,14 @@ suite "Switch": (switch1, peerInfo1) = createSwitch(ma1) + let done = newFuture[void]() + proc handle(conn: Connection, proto: string) {.async, gcsafe.} = let msg = cast[string](await conn.readLp()) check "Hello!" == msg await conn.writeLp("Hello!") await conn.close() + done.complete() let testProto = new TestProto testProto.codec = TestCodec @@ -83,8 +107,15 @@ suite "Switch": except LPStreamError: result = false - await allFutures(switch1.stop(), switch2.stop()) - await allFutures(awaiters) + await allFuturesThrowing( + done.wait(5000.millis) #[if OK won't happen!!]#, + conn.close(), + switch1.stop(), + switch2.stop(), + ) + + # this needs to go at end + await allFuturesThrowing(awaiters) check: waitFor(testSwitch()) == true @@ -125,8 +156,12 @@ suite "Switch": except LPStreamError: result = false - await allFutures(switch1.stop(), switch2.stop()) - await allFutures(awaiters) + await allFuturesThrowing( + conn.close(), + switch1.stop(), + switch2.stop() + ) + await allFuturesThrowing(awaiters) check: waitFor(testSwitch()) == true @@ -164,7 +199,10 @@ suite "Switch": # await sconn.write(hugePayload) # await readTask + # await sconn.close() + # await conn.close() + # await transport2.close() # await transport1.close() # result = true diff --git a/tests/testtransport.nim b/tests/testtransport.nim index 1c0205b..a0992da 100644 --- a/tests/testtransport.nim +++ b/tests/testtransport.nim @@ -1,6 +1,7 @@ import unittest import chronos -import ../libp2p/[connection, +import ../libp2p/[errors, + connection, transports/transport, transports/tcptransport, multiaddress, @@ -8,19 +9,45 @@ import ../libp2p/[connection, when defined(nimHasUsed): {.used.} +const + StreamTransportTrackerName = "stream.transport" + StreamServerTrackerName = "stream.server" + +template ignoreErrors(body: untyped): untyped = + try: + body + except: + echo getCurrentExceptionMsg() + suite "TCP transport": + teardown: + check: + # getTracker(ConnectionTrackerName).isLeaked() == false + getTracker(AsyncStreamReaderTrackerName).isLeaked() == false + getTracker(AsyncStreamWriterTrackerName).isLeaked() == false + getTracker(StreamTransportTrackerName).isLeaked() == false + getTracker(StreamServerTrackerName).isLeaked() == false + test "test listener: handle write": proc testListener(): Future[bool] {.async, gcsafe.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") - proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = - result = conn.write(cstring("Hello!"), 6) + let handlerWait = newFuture[void]() + proc connHandler(conn: Connection) {.async, gcsafe.} = + await conn.write(cstring("Hello!"), 6) + await conn.close() + handlerWait.complete() let transport: TcpTransport = newTransport(TcpTransport) - asyncCheck await transport.listen(ma, connHandler) - let streamTransport: StreamTransport = await connect(transport.ma) + + asyncCheck transport.listen(ma, connHandler) + + let streamTransport = await connect(transport.ma) + let msg = await streamTransport.read(6) - await transport.close() + + await handlerWait.wait(5000.millis) # when no issues will not wait that long! await streamTransport.closeWait() + await transport.close() result = cast[string](msg) == "Hello!" @@ -30,14 +57,22 @@ suite "TCP transport": test "test listener: handle read": proc testListener(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") - proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = + let handlerWait = newFuture[void]() + proc connHandler(conn: Connection) {.async, gcsafe.} = let msg = await conn.read(6) check cast[string](msg) == "Hello!" + await conn.close() + handlerWait.complete() let transport: TcpTransport = newTransport(TcpTransport) asyncCheck await transport.listen(ma, connHandler) let streamTransport: StreamTransport = await connect(transport.ma) let sent = await streamTransport.write("Hello!", 6) + + await handlerWait.wait(5000.millis) # when no issues will not wait that long! + await streamTransport.closeWait() + await transport.close() + result = sent == 6 check: @@ -45,6 +80,7 @@ suite "TCP transport": test "test dialer: handle write": proc testDialer(address: TransportAddress): Future[bool] {.async.} = + let handlerWait = newFuture[void]() proc serveClient(server: StreamServer, transp: StreamTransport) {.async, gcsafe.} = var wstream = newAsyncStreamWriter(transp) @@ -54,6 +90,7 @@ suite "TCP transport": await transp.closeWait() server.stop() server.close() + handlerWait.complete() var server = createStreamServer(address, serveClient, {ReuseAddr}) server.start() @@ -64,13 +101,21 @@ suite "TCP transport": let msg = await conn.read(6) result = cast[string](msg) == "Hello!" + await handlerWait.wait(5000.millis) # when no issues will not wait that long! + + await conn.close() + await transport.close() + server.stop() server.close() await server.join() - check waitFor(testDialer(initTAddress("0.0.0.0:0"))) == true + + check: + waitFor(testDialer(initTAddress("0.0.0.0:0"))) == true test "test dialer: handle write": proc testDialer(address: TransportAddress): Future[bool] {.async, gcsafe.} = + let handlerWait = newFuture[void]() proc serveClient(server: StreamServer, transp: StreamTransport) {.async, gcsafe.} = var rstream = newAsyncStreamReader(transp) @@ -81,6 +126,7 @@ suite "TCP transport": await transp.closeWait() server.stop() server.close() + handlerWait.complete() var server = createStreamServer(address, serveClient, {ReuseAddr}) server.start() @@ -91,23 +137,37 @@ suite "TCP transport": await conn.write(cstring("Hello!"), 6) result = true + await handlerWait.wait(5000.millis) # when no issues will not wait that long! + + await conn.close() + await transport.close() + server.stop() server.close() await server.join() - check waitFor(testDialer(initTAddress("0.0.0.0:0"))) == true + check: + waitFor(testDialer(initTAddress("0.0.0.0:0"))) == true test "e2e: handle write": proc testListenerDialer(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") - proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = - result = conn.write(cstring("Hello!"), 6) + let handlerWait = newFuture[void]() + proc connHandler(conn: Connection) {.async, gcsafe.} = + await conn.write(cstring("Hello!"), 6) + await conn.close() + handlerWait.complete() let transport1: TcpTransport = newTransport(TcpTransport) - asyncCheck await transport1.listen(ma, connHandler) + asyncCheck transport1.listen(ma, connHandler) let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(transport1.ma) let msg = await conn.read(6) + + await handlerWait.wait(5000.millis) # when no issues will not wait that long! + + await conn.close() + await transport2.close() await transport1.close() result = cast[string](msg) == "Hello!" @@ -118,16 +178,24 @@ suite "TCP transport": test "e2e: handle read": proc testListenerDialer(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") - proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = + let handlerWait = newFuture[void]() + proc connHandler(conn: Connection) {.async, gcsafe.} = let msg = await conn.read(6) check cast[string](msg) == "Hello!" + await conn.close() + handlerWait.complete() let transport1: TcpTransport = newTransport(TcpTransport) - asyncCheck await transport1.listen(ma, connHandler) + asyncCheck transport1.listen(ma, connHandler) let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(transport1.ma) await conn.write(cstring("Hello!"), 6) + + await handlerWait.wait(5000.millis) # when no issues will not wait that long! + + await conn.close() + await transport2.close() await transport1.close() result = true