diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index b17bbf4..3e91fba 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -184,8 +184,8 @@ proc storeConn*(c: ConnManager, conn: Connection) = let peerId = conn.peerInfo.peerId if c.conns.getOrDefault(peerId).len > c.maxConns: - debug "too many connections", peer = conn, - conns = c.conns.getOrDefault(peerId).len + debug "too many connections", + conn, conns = c.conns.getOrDefault(peerId).len raise newTooManyConnections() @@ -200,7 +200,7 @@ proc storeConn*(c: ConnManager, conn: Connection) = libp2p_peers.set(c.conns.len.int64) trace "Stored connection", - connections = c.conns.len, conn, direction = $conn.dir + conn, direction = $conn.dir, connections = c.conns.len proc storeOutgoing*(c: ConnManager, conn: Connection) = conn.dir = Direction.Out @@ -226,7 +226,8 @@ proc storeMuxer*(c: ConnManager, muxer: muxer, handle: handle) - trace "Stored muxer", connections = c.conns.len, muxer + trace "Stored muxer", + muxer, handle = not handle.isNil, connections = c.conns.len proc getMuxedStream*(c: ConnManager, peerId: PeerID, @@ -263,7 +264,6 @@ proc dropPeer*(c: ConnManager, peerId: PeerID) {.async.} = trace "Dropping peer", peerId let conns = c.conns.getOrDefault(peerId) for conn in conns: - trace "Removing connection", conn delConn(c, conn) var muxers: seq[MuxerHolder] @@ -277,12 +277,15 @@ proc dropPeer*(c: ConnManager, peerId: PeerID) {.async.} = for conn in conns: await conn.close() + trace "Dropped peer", peerId proc close*(c: ConnManager) {.async.} = ## cleanup resources for the connection ## manager ## + + trace "Closing ConnManager" let conns = c.conns c.conns.clear() @@ -295,3 +298,5 @@ proc close*(c: ConnManager) {.async.} = for _, conns2 in conns: for conn in conns2: await conn.close() + + trace "Closed ConnManager" diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index 9bc120b..525ffe9 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -50,42 +50,42 @@ proc select*(m: MultistreamSelect, conn: Connection, proto: seq[string]): Future[string] {.async.} = - trace "initiating handshake", codec = m.codec + trace "initiating handshake", conn, codec = m.codec ## select a remote protocol await conn.write(m.codec) # write handshake if proto.len() > 0: - trace "selecting proto", proto = proto[0] + trace "selecting proto", conn, proto = proto[0] await conn.writeLp((proto[0] & "\n")) # select proto - var s = string.fromBytes((await conn.readLp(1024))) # read ms header + var s = string.fromBytes((await conn.readLp(MsgSize))) # read ms header validateSuffix(s) if s != Codec: - notice "handshake failed", codec = s + notice "handshake failed", conn, codec = s raise newException(CatchableError, "MultistreamSelect handshake failed") else: - trace "multistream handshake success" + trace "multistream handshake success", conn if proto.len() == 0: # no protocols, must be a handshake call return Codec else: - s = string.fromBytes(await conn.readLp(1024)) # read the first proto + s = string.fromBytes(await conn.readLp(MsgSize)) # read the first proto validateSuffix(s) - trace "reading first requested proto" + trace "reading first requested proto", conn if s == proto[0]: - trace "successfully selected ", proto = proto[0] + trace "successfully selected ", conn, proto = proto[0] return proto[0] elif proto.len > 1: # Try to negotiate alternatives let protos = proto[1.. 0: list.add(s) @@ -120,31 +120,31 @@ proc list*(m: MultistreamSelect, result = list proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.async, gcsafe.} = - trace "handle: starting multistream handling", handshaked = active + trace "Starting multistream handler", conn, handshaked = active var handshaked = active try: while not conn.atEof: - var ms = string.fromBytes(await conn.readLp(1024)) + var ms = string.fromBytes(await conn.readLp(MsgSize)) validateSuffix(ms) if not handshaked and ms != Codec: - error "expected handshake message", instead=ms + notice "expected handshake message", conn, instead=ms raise newException(CatchableError, "MultistreamSelect handling failed, invalid first message") - trace "handle: got request", ms + trace "handle: got request", conn, ms if ms.len() <= 0: - trace "handle: invalid proto" + trace "handle: invalid proto", conn await conn.write(Na) if m.handlers.len() == 0: - trace "handle: sending `na` for protocol", protocol = ms + trace "handle: sending `na` for protocol", conn, protocol = ms await conn.write(Na) continue case ms: of "ls": - trace "handle: listing protos" + trace "handle: listing protos", conn var protos = "" for h in m.handlers: protos &= (h.proto & "\n") @@ -154,25 +154,26 @@ proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.asy await conn.write(m.codec) handshaked = true else: - trace "handle: sending `na` for duplicate handshake while handshaked" + trace "handle: sending `na` for duplicate handshake while handshaked", + conn await conn.write(Na) else: for h in m.handlers: if (not isNil(h.match) and h.match(ms)) or ms == h.proto: - trace "found handler", protocol = ms + trace "found handler", conn, protocol = ms await conn.writeLp((h.proto & "\n")) await h.protocol.handler(conn, ms) return - debug "no handlers", protocol = ms + debug "no handlers", conn, protocol = ms await conn.write(Na) except CancelledError as exc: - await conn.close() raise exc except CatchableError as exc: - trace "exception in multistream", exc = exc.msg - await conn.close() + trace "Exception in multistream", conn, msg = exc.msg finally: - trace "leaving multistream loop" + await conn.close() + + trace "Stopped multistream handler", conn proc addHandler*(m: MultistreamSelect, codec: string, diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 54c2a8f..530ccb6 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -133,7 +133,7 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} = trace "channel already closed or reset", s return - trace "resetting channel", s + trace "Resetting channel", s asyncSpawn s.resetMessage() @@ -148,9 +148,9 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} = except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception in reset", exc = exc.msg, s + trace "Exception in reset", exc = exc.msg, s - trace "channel reset", s + trace "Channel reset", s method close*(s: LPChannel) {.async, gcsafe.} = if s.closedLocal: diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ac8ca33..47962d7 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -218,7 +218,7 @@ proc sendImpl(p: PubSubPeer, msg: RPCMsg) {.async.} = try: conn = await p.getSendConn() if conn == nil: - debug "Couldn't get send connection, dropping message", peer = p + trace "Couldn't get send connection, dropping message", peer = p return trace "sending encoded msgs to peer", conn, encoded = shortLog(encoded) @@ -234,7 +234,7 @@ proc sendImpl(p: PubSubPeer, msg: RPCMsg) {.async.} = except CatchableError as exc: # Because we detach the send call from the currently executing task using # asyncCheck, no exceptions may leak out of it - debug "unable to send to remote", exc = exc.msg, peer = p + trace "Unable to send to remote", conn, exc = exc.msg # Next time sendConn is used, it will be have its close flag set and thus # will be recycled if not isNil(conn): diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim index 20343ea..d4eac09 100644 --- a/libp2p/stream/connection.nim +++ b/libp2p/stream/connection.nim @@ -31,7 +31,7 @@ type Connection* = ref object of LPStream activity*: bool # reset every time data is sent or received timeout*: Duration # channel timeout if no activity - timerTaskFut: Future[void] # the current timer instanse + timerTaskFut: Future[void] # the current timer instance timeoutHandler*: TimeoutHandler # timeout handler peerInfo*: PeerInfo observedAddr*: Multiaddress @@ -83,10 +83,11 @@ method initStream*(s: Connection) = s.timeoutHandler = proc() {.async.} = await s.close() - trace "timeout set at", timeout = s.timeout.millis, s doAssert(isNil(s.timerTaskFut)) # doAssert(s.timeout > 0.millis) if s.timeout > 0.millis: + trace "Monitoring for timeout", s, timeout = s.timeout + s.timerTaskFut = s.timeoutMonitor() inc getConnectionTracker().opened diff --git a/libp2p/stream/streamseq.nim b/libp2p/stream/streamseq.nim index 30f62ad..b972edb 100644 --- a/libp2p/stream/streamseq.nim +++ b/libp2p/stream/streamseq.nim @@ -1,5 +1,7 @@ import stew/bitops2 +{.push raises: [Defect].} + type StreamSeq* = object # Seq adapted to the stream use case where we add data at the back and diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 9269898..c8bf3c2 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -99,9 +99,8 @@ proc triggerConnEvent(s: Switch, peerId: PeerID, event: ConnEvent) {.async, gcsa except CancelledError as exc: raise exc except CatchableError as exc: # handlers should not raise! - warn "exception in trigger ConnEvents", exc = exc.msg, peerId - -proc disconnect*(s: Switch, peerId: PeerID) {.async, gcsafe.} + warn "Exception in triggerConnEvents", + msg = exc.msg, peerId, event = $event proc isConnected*(s: Switch, peerId: PeerID): bool = ## returns true if the peer has one or more @@ -114,18 +113,18 @@ proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = if s.secureManagers.len <= 0: raise newException(UpgradeFailedError, "No secure managers registered!") - let manager = await s.ms.select(conn, s.secureManagers.mapIt(it.codec)) - if manager.len == 0: + let codec = await s.ms.select(conn, s.secureManagers.mapIt(it.codec)) + if codec.len == 0: raise newException(UpgradeFailedError, "Unable to negotiate a secure channel!") - trace "Securing connection", codec = manager, conn - let secureProtocol = s.secureManagers.filterIt(it.codec == manager) + trace "Securing connection", conn, codec + let secureProtocol = s.secureManagers.filterIt(it.codec == codec) # ms.select should deal with the correctness of this # let's avoid duplicating checks but detect if it fails to do it properly doAssert(secureProtocol.len > 0) - result = await secureProtocol[0].secure(conn, true) + return await secureProtocol[0].secure(conn, true) proc identify(s: Switch, conn: Connection) {.async, gcsafe.} = ## identify the connection @@ -179,6 +178,8 @@ proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} = debug "no muxer available, early exit", conn return + trace "Found a muxer", conn, muxerName + # create new muxer for connection let muxer = s.muxers[muxerName].newMuxer(conn) @@ -186,16 +187,12 @@ proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} = muxer.streamHandler = s.streamHandler s.connManager.storeOutgoing(conn) - trace "Storing muxer", conn s.connManager.storeMuxer(muxer) - trace "found a muxer", name = muxerName, conn - # start muxer read loop - the future will complete when loop ends let handlerFut = muxer.handle() # store it in muxed connections if we have a peer for it - trace "Storing muxer with handler", conn s.connManager.storeMuxer(muxer, handlerFut) # update muxer with handler return muxer @@ -227,7 +224,7 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g # Identify is non-essential, though if it fails, it might indicate that # the connection was closed already - this will be picked up by the read # loop - debug "Could not identify connection", err = exc.msg, conn + debug "Could not identify connection", conn, msg = exc.msg if isNil(sconn.peerInfo): await sconn.close() @@ -246,7 +243,7 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = proc securedHandler (conn: Connection, proto: string) {.async, gcsafe, closure.} = - trace "Securing connection", conn + trace "Starting secure handler", conn let secure = s.secureManagers.filterIt(it.codec == proto)[0] try: @@ -267,9 +264,9 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = except CancelledError as exc: raise exc except CatchableError as exc: - debug "Exception in secure handler", err = exc.msg, conn + debug "Exception in secure handler", msg = exc.msg, conn - trace "Ending secured handler", conn + trace "Stopped secure handler", conn if (await ms.select(conn)): # just handshake # add the secure handlers @@ -315,10 +312,10 @@ proc internalConnect(s: Switch, let dialed = try: await t.dial(a) except CancelledError as exc: - trace "Dialing canceled", exc = exc.msg, peerId + trace "Dialing canceled", msg = exc.msg, peerId raise exc except CatchableError as exc: - trace "Dialing failed", exc = exc.msg, peerId + trace "Dialing failed", msg = exc.msg, peerId libp2p_failed_dials.inc() continue # Try the next address @@ -333,7 +330,7 @@ proc internalConnect(s: Switch, # If we failed to establish the connection through one transport, # we won't succeeed through another - no use in trying again await dialed.close() - debug "Upgrade failed", exc = exc.msg, peerId + debug "Upgrade failed", msg = exc.msg, peerId if exc isnot CancelledError: libp2p_failed_upgrade.inc() raise exc @@ -363,14 +360,11 @@ proc internalConnect(s: Switch, await conn.closeEvent.wait() await s.triggerConnEvent(peerId, ConnEvent(kind: ConnEventKind.Disconnected)) - except CancelledError: - # This is top-level procedure which will work as separate task, so it - # do not need to propogate CancelledError. - trace "Unexpected cancellation in switch peer connect cleanup", - conn except CatchableError as exc: - trace "Unexpected exception in switch peer connect cleanup", - errMsg = exc.msg, conn + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError and should handle other errors + warn "Unexpected exception in switch peer connect cleanup", + conn, msg = exc.msg # All the errors are handled inside `cleanup()` procedure. asyncSpawn peerCleanup() @@ -381,7 +375,7 @@ proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} = discard await s.internalConnect(peerId, addrs) proc negotiateStream(s: Switch, conn: Connection, proto: string): Future[Connection] {.async.} = - trace "Negotiating stream", proto = proto, conn + trace "Negotiating stream", conn, proto if not await s.ms.select(conn, proto): await conn.close() raise newException(DialFailedError, "Unable to select sub-protocol " & proto) @@ -391,7 +385,8 @@ proc negotiateStream(s: Switch, conn: Connection, proto: string): Future[Connect proc dial*(s: Switch, peerId: PeerID, proto: string): Future[Connection] {.async.} = - let stream = await s.connmanager.getMuxedStream(peerId) + trace "Dialling (existing)", peerId, proto + let stream = await s.connManager.getMuxedStream(peerId) if stream.isNil: raise newException(DialFailedError, "Couldn't get muxed stream") @@ -402,7 +397,9 @@ proc dial*(s: Switch, addrs: seq[MultiAddress], proto: string): Future[Connection] {.async.} = + trace "Dialling (new)", peerId, proto let conn = await s.internalConnect(peerId, addrs) + trace "Opening stream", conn let stream = await s.connManager.getMuxedStream(conn) proc cleanup() {.async.} = @@ -419,11 +416,11 @@ proc dial*(s: Switch, return await s.negotiateStream(stream, proto) except CancelledError as exc: - trace "dial canceled", conn + trace "Dial canceled", conn await cleanup() raise exc except CatchableError as exc: - trace "Error dialing", exc = exc.msg, conn + trace "Error dialing", conn, msg = exc.msg await cleanup() raise exc @@ -448,7 +445,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = except CancelledError as exc: raise exc except CatchableError as exc: - trace "Exception occurred in incoming handler", exc = exc.msg, conn + trace "Exception occurred in incoming handler", conn, msg = exc.msg finally: await conn.close() trace "Connection handler done", conn @@ -465,7 +462,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = result = startFuts # listen for incoming connections proc stop*(s: Switch) {.async.} = - trace "stopping switch" + trace "Stopping switch" # close and cleanup all connections await s.connManager.close() @@ -476,9 +473,9 @@ proc stop*(s: Switch) {.async.} = except CancelledError as exc: raise exc except CatchableError as exc: - warn "error cleaning up transports" + warn "error cleaning up transports", msg = exc.msg - trace "switch stopped" + trace "Switch stopped" proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = let @@ -501,7 +498,7 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = # Identify is non-essential, though if it fails, it might indicate that # the connection was closed already - this will be picked up by the read # loop - debug "Could not identify connection", err = exc.msg, conn + debug "Could not identify connection", conn, msg = exc.msg try: let peerId = conn.peerInfo.peerId @@ -511,26 +508,22 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = await muxer.connection.closeEvent.wait() await s.triggerConnEvent(peerId, ConnEvent(kind: ConnEventKind.Disconnected)) - except CancelledError: - # This is top-level procedure which will work as separate task, so it - # do not need to propogate CancelledError. - debug "Unexpected cancellation in switch muxer cleanup", conn except CatchableError as exc: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError and shouldn't leak others debug "Unexpected exception in switch muxer cleanup", - err = exc.msg, conn + conn, msg = exc.msg proc peerStartup() {.async.} = try: await s.triggerConnEvent(peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: true)) - except CancelledError: - # This is top-level procedure which will work as separate task, so it - # do not need to propogate CancelledError. - debug "Unexpected cancellation in switch muxer startup", conn except CatchableError as exc: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError and shouldn't leak others debug "Unexpected exception in switch muxer startup", - err = exc.msg, conn + conn, msg = exc.msg # All the errors are handled inside `peerStartup()` procedure. asyncSpawn peerStartup() @@ -544,7 +537,7 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = except CatchableError as exc: await muxer.close() libp2p_failed_upgrade.inc() - trace "Exception in muxer handler", exc = exc.msg, conn + trace "Exception in muxer handler", conn, msg = exc.msg proc newSwitch*(peerInfo: PeerInfo, transports: seq[Transport], @@ -566,16 +559,16 @@ proc newSwitch*(peerInfo: PeerInfo, let s = result # can't capture result result.streamHandler = proc(conn: Connection) {.async, gcsafe.} = # noraises - trace "Incoming muxed connection", conn + trace "Starting stream handler", conn try: await s.ms.handle(conn) # handle incoming connection except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception in stream handler", exc = exc.msg, conn + trace "exception in stream handler", conn, msg = exc.msg finally: await conn.close() - trace "Muxed connection done", conn + trace "Stream handler done", conn result.mount(identity) for key, val in muxers: