From c1856fda5307be7e855a42894d2753169ef78e1f Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Sun, 6 Sep 2020 10:31:47 +0200 Subject: [PATCH] simplify and unify logging (#353) * use short format for logging peerid * log peerid:oid for connections --- examples/directchat.nim | 2 +- libp2p/connmanager.nim | 10 +- libp2p/daemon/daemonapi.nim | 17 ---- libp2p/muxers/mplex/coder.nim | 6 +- libp2p/muxers/mplex/lpchannel.nim | 116 +++++++---------------- libp2p/muxers/mplex/mplex.nim | 68 ++++++------- libp2p/muxers/muxer.nim | 10 +- libp2p/peerid.nim | 38 ++++---- libp2p/peerinfo.nim | 11 +-- libp2p/protocols/identify.nim | 14 +-- libp2p/protocols/pubsub/floodsub.nim | 35 +++---- libp2p/protocols/pubsub/gossipsub.nim | 33 ++++--- libp2p/protocols/pubsub/pubsub.nim | 10 +- libp2p/protocols/pubsub/pubsubpeer.nim | 72 +++++++------- libp2p/protocols/pubsub/rpc/message.nim | 2 +- libp2p/protocols/pubsub/rpc/messages.nim | 2 +- libp2p/protocols/pubsub/rpc/protobuf.nim | 2 +- libp2p/protocols/secure/noise.nim | 19 ++-- libp2p/protocols/secure/secio.nim | 23 +++-- libp2p/protocols/secure/secure.nim | 30 +++--- libp2p/stream/bufferstream.nim | 20 ++-- libp2p/stream/chronosstream.nim | 12 ++- libp2p/stream/connection.nim | 23 +++-- libp2p/switch.nim | 87 ++++++++--------- tests/pubsub/testgossipsub.nim | 22 ++--- tests/testpeer.nim | 8 +- 26 files changed, 311 insertions(+), 381 deletions(-) diff --git a/examples/directchat.nim b/examples/directchat.nim index fe1c35374..86d3c8aa5 100644 --- a/examples/directchat.nim +++ b/examples/directchat.nim @@ -190,7 +190,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = let libp2pFuts = await switch.start() chatProto.started = true - let id = peerInfo.peerId.pretty + let id = $peerInfo.peerId echo "PeerID: " & id echo "listening on: " for a in peerInfo.addrs: diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index 4cbafda98..9be88f400 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -113,7 +113,7 @@ proc cleanupConn(c: ConnManager, conn: Connection) {.async.} = finally: await conn.close() - trace "connection cleaned up", peer = $conn.peerInfo + trace "connection cleaned up", conn proc onClose(c: ConnManager, conn: Connection) {.async.} = ## connection close even handler @@ -122,7 +122,7 @@ proc onClose(c: ConnManager, conn: Connection) {.async.} = ## try: await conn.join() - trace "triggering connection cleanup", peer = $conn.peerInfo + trace "triggering connection cleanup", conn await c.cleanupConn(conn) except CancelledError: # This is top-level procedure which will work as separate task, so it @@ -167,7 +167,7 @@ proc selectMuxer*(c: ConnManager, conn: Connection): Muxer = if conn in c.muxed: return c.muxed[conn].muxer else: - debug "no muxer for connection", conn = $conn + debug "no muxer for connection", conn proc storeConn*(c: ConnManager, conn: Connection) = ## store a connection @@ -196,7 +196,7 @@ proc storeConn*(c: ConnManager, conn: Connection) = asyncSpawn c.onClose(conn) libp2p_peers.set(c.conns.len.int64) - trace "stored connection", connections = c.conns.len, peer = peerId + trace "stored connection", connections = c.conns.len, conn proc storeOutgoing*(c: ConnManager, conn: Connection) = conn.dir = Direction.Out @@ -222,7 +222,7 @@ proc storeMuxer*(c: ConnManager, muxer: muxer, handle: handle) - trace "stored muxer", connections = c.conns.len + trace "stored muxer", connections = c.conns.len, muxer proc getMuxedStream*(c: ConnManager, peerId: PeerID, diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index e610fd384..8aee77875 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -1304,20 +1304,3 @@ proc pubsubSubscribe*(api: DaemonAPI, topic: string, except Exception as exc: await api.closeConnection(transp) raise exc - -proc `$`*(pinfo: PeerInfo): string = - ## Get string representation of ``PeerInfo`` object. - result = newStringOfCap(128) - result.add("{PeerID: '") - result.add($pinfo.peer.pretty()) - result.add("' Addresses: [") - let length = len(pinfo.addresses) - for i in 0.. 0: - result = result diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index 3b7ecbe4d..558e266e0 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -31,10 +31,10 @@ proc newInvalidMplexMsgType*(): ref InvalidMplexMsgType = proc readMsg*(conn: Connection): Future[Msg] {.async, gcsafe.} = let header = await conn.readVarint() - trace "read header varint", varint = header + trace "read header varint", varint = header, conn let data = await conn.readLp(MaxMsgSize) - trace "read data", dataLen = data.len, data = shortLog(data) + trace "read data", dataLen = data.len, data = shortLog(data), conn let msgType = header and 0x7 if msgType.int > ord(MessageType.ResetOut): @@ -46,7 +46,7 @@ proc writeMsg*(conn: Connection, id: uint64, msgType: MessageType, data: seq[byte] = @[]) {.async, gcsafe.} = - trace "sending data over mplex", oid = $conn.oid, + trace "sending data over mplex", conn, id, msgType, data = data.len diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index e62dfebf2..4e8a2b534 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import oids, deques +import std/[oids, strformat] import chronos, chronicles, metrics import types, coder, @@ -66,86 +66,60 @@ template withWriteLock(lock: AsyncLock, body: untyped): untyped = if not(isNil(lock)) and lock.locked: lock.release() -proc closeMessage(s: LPChannel) {.async.} = - logScope: - id = s.id - initiator = s.initiator - name = s.name - oid = $s.oid - peer = $s.conn.peerInfo - # stack = getStackTrace() +func shortLog*(s: LPChannel): auto = + if s.isNil: "LPChannel(nil)" + elif s.conn.peerInfo.isNil: $s.oid + elif s.name != $s.oid: &"{shortLog(s.conn.peerInfo.peerId)}:{s.oid}:{s.name}" + else: &"{shortLog(s.conn.peerInfo.peerId)}:{s.oid}" +chronicles.formatIt(LPChannel): shortLog(it) +proc closeMessage(s: LPChannel) {.async.} = ## send close message - this will not raise ## on EOF or Closed withWriteLock(s.writeLock): - trace "sending close message" + trace "sending close message", s await s.conn.writeMsg(s.id, s.closeCode) # write close proc resetMessage(s: LPChannel) {.async.} = - logScope: - id = s.id - initiator = s.initiator - name = s.name - oid = $s.oid - peer = $s.conn.peerInfo - # stack = getStackTrace() - ## send reset message - this will not raise try: withWriteLock(s.writeLock): - trace "sending reset message" + trace "sending reset message", s await s.conn.writeMsg(s.id, s.resetCode) # write reset except CancelledError: # This procedure is called from one place and never awaited, so there no # need to re-raise CancelledError. trace "Unexpected cancellation while resetting channel" except LPStreamEOFError as exc: - trace "muxed connection EOF", exc = exc.msg + trace "muxed connection EOF", exc = exc.msg, s except LPStreamClosedError as exc: - trace "muxed connection closed", exc = exc.msg + trace "muxed connection closed", exc = exc.msg, s except LPStreamIncompleteError as exc: - trace "incomplete message", exc = exc.msg + trace "incomplete message", exc = exc.msg, s except CatchableError as exc: - trace "Unhandled exception leak", exc = exc.msg + debug "Unhandled exception leak", exc = exc.msg, s proc open*(s: LPChannel) {.async, gcsafe.} = - logScope: - id = s.id - initiator = s.initiator - name = s.name - oid = $s.oid - peer = $s.conn.peerInfo - # stack = getStackTrace() - - ## NOTE: Don't call withExcAndLock or withWriteLock, - ## because this already gets called from writeHandler - ## which is locked await s.conn.writeMsg(s.id, MessageType.New, s.name) - trace "opened channel" + trace "opened channel", s s.isOpen = true proc closeRemote*(s: LPChannel) {.async.} = - logScope: - id = s.id - initiator = s.initiator - name = s.name - oid = $s.oid - peer = $s.conn.peerInfo - # stack = getStackTrace() - - trace "got EOF, closing channel" + trace "closing remote", s try: await s.drainBuffer() s.isEof = true # set EOF immediately to prevent further reads # close parent bufferstream to prevent further reads await procCall BufferStream(s).close() - trace "channel closed on EOF" + trace "channel closed on EOF", s except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception closing remote channel", exc = exc.msg + trace "exception closing remote channel", exc = exc.msg, s + + trace "closed remote", s method closed*(s: LPChannel): bool = ## this emulates half-closed behavior @@ -155,19 +129,11 @@ method closed*(s: LPChannel): bool = s.closedLocal method reset*(s: LPChannel) {.base, async, gcsafe.} = - logScope: - id = s.id - initiator = s.initiator - name = s.name - oid = $s.oid - peer = $s.conn.peerInfo - # stack = getStackTrace() - if s.closedLocal and s.isEof: - trace "channel already closed or reset" + trace "channel already closed or reset", s return - trace "resetting channel" + trace "resetting channel", s asyncSpawn s.resetMessage() @@ -182,24 +148,16 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} = except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception in reset", exc = exc.msg + trace "exception in reset", exc = exc.msg, s - trace "channel reset" + trace "channel reset", s method close*(s: LPChannel) {.async, gcsafe.} = - logScope: - id = s.id - initiator = s.initiator - name = s.name - oid = $s.oid - peer = $s.conn.peerInfo - # stack = getStackTrace() - if s.closedLocal: - trace "channel already closed" + trace "channel already closed", s return - trace "closing local lpchannel" + trace "closing local lpchannel", s proc closeInternal() {.async.} = try: @@ -207,15 +165,15 @@ method close*(s: LPChannel) {.async, gcsafe.} = if s.atEof: # already closed by remote close parent buffer immediately await procCall BufferStream(s).close() except CancelledError: - trace "Unexpected cancellation while closing channel" + trace "Unexpected cancellation while closing channel", s await s.reset() # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. except CatchableError as exc: - trace "exception closing channel", exc = exc.msg + trace "exception closing channel", exc = exc.msg, s await s.reset() - trace "lpchannel closed local" + trace "lpchannel closed local", s s.closedLocal = true # All the errors are handled inside `closeInternal()` procedure. @@ -226,7 +184,7 @@ method initStream*(s: LPChannel) = s.objName = "LPChannel" s.timeoutHandler = proc() {.async, gcsafe.} = - trace "idle timeout expired, resetting LPChannel" + trace "idle timeout expired, resetting LPChannel", s await s.reset() procCall BufferStream(s).initStream() @@ -253,27 +211,19 @@ proc init*( resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn, dir: if initiator: Direction.Out else: Direction.In) - logScope: - id = chann.id - initiator = chann.initiator - name = chann.name - oid = $chann.oid - peer = $chann.conn.peerInfo - # stack = getStackTrace() - proc writeHandler(data: seq[byte]) {.async, gcsafe.} = try: if chann.isLazy and not(chann.isOpen): await chann.open() # writes should happen in sequence - trace "sending data", len = data.len + trace "sending data", len = data.len, chann await conn.writeMsg(chann.id, chann.msgCode, data) except CatchableError as exc: - trace "exception in lpchannel write handler", exc = exc.msg + trace "exception in lpchannel write handler", exc = exc.msg, chann await chann.reset() raise exc @@ -281,6 +231,6 @@ proc init*( when chronicles.enabledLogLevel == LogLevel.TRACE: chann.name = if chann.name.len > 0: chann.name else: $chann.oid - trace "created new lpchannel" + trace "created new lpchannel", chann return chann diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index d2611bc2e..37244258b 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -42,6 +42,9 @@ type oid*: Oid maxChannCount: int +func shortLog*(m: MPlex): auto = shortLog(m.connection) +chronicles.formatIt(Mplex): shortLog(it) + proc newTooManyChannels(): ref TooManyChannels = newException(TooManyChannels, "max allowed channel count exceeded") @@ -51,18 +54,19 @@ proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} = try: await chann.join() m.channels[chann.initiator].del(chann.id) - trace "cleaned up channel", id = chann.id, oid = $chann.oid + trace "cleaned up channel", m, chann when defined(libp2p_expensive_metrics): libp2p_mplex_channels.set( m.channels[chann.initiator].len.int64, - labelValues = [$chann.initiator, $m.connection.peerInfo]) + labelValues = [$chann.initiator, $m.connection.peerInfo.peerId]) except CancelledError: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. - trace "Unexpected cancellation in mplex channel cleanup" + trace "Unexpected cancellation in mplex channel cleanup", + m, chann except CatchableError as exc: - trace "error cleaning up mplex channel", exc = exc.msg + trace "error cleaning up mplex channel", exc = exc.msg, m, chann proc newStreamInternal*(m: Mplex, initiator: bool = true, @@ -77,10 +81,10 @@ proc newStreamInternal*(m: Mplex, m.currentId.inc(); m.currentId else: chanId - trace "creating new channel", channelId = id, - initiator = initiator, - name = name, - oid = $m.oid + trace "creating new channel", id, + initiator, + name, + m result = LPChannel.init( id, m.connection, @@ -103,35 +107,30 @@ proc newStreamInternal*(m: Mplex, when defined(libp2p_expensive_metrics): libp2p_mplex_channels.set( m.channels[initiator].len.int64, - labelValues = [$initiator, $m.connection.peerInfo]) + labelValues = [$initiator, $m.connection.peerInfo.peerId]) proc handleStream(m: Mplex, chann: LPChannel) {.async.} = ## call the muxer stream handler for this channel ## try: await m.streamHandler(chann) - trace "finished handling stream" + trace "finished handling stream", m, chann doAssert(chann.closed, "connection not closed by handler!") except CancelledError: - trace "Unexpected cancellation in stream handler" + trace "Unexpected cancellation in stream handler", m, chann await chann.reset() # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. except CatchableError as exc: - trace "Exception in mplex stream handler", exc = exc.msg + trace "Exception in mplex stream handler", + exc = exc.msg, m, chann await chann.reset() method handle*(m: Mplex) {.async, gcsafe.} = - logScope: moid = $m.oid - - trace "starting mplex main loop" + trace "starting mplex main loop", m, peer = m.connection.peerInfo.peerId try: - defer: - trace "stopping mplex main loop" - await m.close() - while not m.connection.atEof: - trace "waiting for data" + trace "waiting for data", m let (id, msgType, data) = await m.connection.readMsg() initiator = bool(ord(msgType) and 1) @@ -142,32 +141,28 @@ method handle*(m: Mplex) {.async, gcsafe.} = msgType = msgType size = data.len - trace "read message from connection", data = data.shortLog + trace "read message from connection", m, data = data.shortLog var channel = if MessageType(msgType) != MessageType.New: let tmp = m.channels[initiator].getOrDefault(id, nil) if tmp == nil: - trace "Channel not found, skipping" + trace "Channel not found, skipping", m continue tmp else: if m.channels[false].len > m.maxChannCount - 1: warn "too many channels created by remote peer", - allowedMax = MaxChannelCount + allowedMax = MaxChannelCount, m raise newTooManyChannels() let name = string.fromBytes(data) m.newStreamInternal(false, id, name, timeout = m.outChannTimeout) - logScope: - name = channel.name - oid = $channel.oid - case msgType: of MessageType.New: - trace "created channel" + trace "created channel", m, channel if not isNil(m.streamHandler): # Launch handler task @@ -177,27 +172,26 @@ method handle*(m: Mplex) {.async, gcsafe.} = of MessageType.MsgIn, MessageType.MsgOut: if data.len > MaxMsgSize: warn "attempting to send a packet larger than allowed", - allowed = MaxMsgSize + allowed = MaxMsgSize, channel raise newLPStreamLimitError() - trace "pushing data to channel" + trace "pushing data to channel", m, channel await channel.pushTo(data) - trace "pushed data to channel" + trace "pushed data to channel", m, channel of MessageType.CloseIn, MessageType.CloseOut: - trace "closing channel" await channel.closeRemote() - trace "closed channel" of MessageType.ResetIn, MessageType.ResetOut: - trace "resetting channel" await channel.reset() - trace "reset channel" except CancelledError: # This procedure is spawned as task and it is not part of public API, so # there no way for this procedure to be cancelled implicitely. trace "Unexpected cancellation in mplex handler" except CatchableError as exc: - trace "Exception occurred", exception = exc.msg, oid = $m.oid + trace "Exception occurred", exception = exc.msg, m + finally: + trace "stopping mplex main loop", m + await m.close() proc init*(M: type Mplex, conn: Connection, @@ -224,7 +218,7 @@ method close*(m: Mplex) {.async, gcsafe.} = if m.isClosed: return - trace "closing mplex muxer", moid = $m.oid + trace "closing mplex muxer", m m.isClosed = true diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index baa76a22d..870a80a79 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -35,6 +35,9 @@ type streamHandler*: StreamHandler # triggered every time there is a new stream, called for any muxer instance muxerHandler*: MuxerHandler # triggered every time there is a new muxed connection created +func shortLog*(m: Muxer): auto = shortLog(m.connection) +chronicles.formatIt(Muxer): shortLog(it) + # muxer interface method newStream*(m: Muxer, name: string = "", lazy: bool = false): Future[Connection] {.base, async, gcsafe.} = discard @@ -49,7 +52,7 @@ proc newMuxerProvider*(creator: MuxerConstructor, codec: string): MuxerProvider method init(c: MuxerProvider) = proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = - trace "starting muxer handler", proto=proto, peer = $conn + trace "starting muxer handler", proto=proto, conn try: let muxer = c.newMuxer(conn) @@ -68,11 +71,8 @@ method init(c: MuxerProvider) = except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception in muxer handler", exc = exc.msg, peer = $conn, proto=proto + trace "exception in muxer handler", exc = exc.msg, conn, proto finally: await conn.close() c.handler = handler - -proc `$`*(m: Muxer): string = - $m.connection diff --git a/libp2p/peerid.nim b/libp2p/peerid.nim index 9eeb46791..cd01ec149 100644 --- a/libp2p/peerid.nim +++ b/libp2p/peerid.nim @@ -11,11 +11,13 @@ {.push raises: [Defect].} -import hashes +import std/hashes +import chronicles import nimcrypto/utils, stew/base58 import crypto/crypto, multicodec, multihash, vbuffer import protobuf/minprotobuf import stew/results + export results const @@ -27,11 +29,24 @@ type PeerID* = object data*: seq[byte] - PeerIDError* = object of CatchableError - -proc pretty*(pid: PeerID): string {.inline.} = +func `$`*(pid: PeerID): string = ## Return base58 encoded ``pid`` representation. - result = Base58.encode(pid.data) + Base58.encode(pid.data) + +func shortLog*(pid: PeerId): string = + ## Returns compact string representation of ``pid``. + var spid = $pid + if len(spid) <= 10: + result = spid + else: + result = newStringOfCap(10) + for i in 0..<2: + result.add(spid[i]) + result.add("*") + for i in (len(spid) - 6)..spid.high: + result.add(spid[i]) + +chronicles.formatIt(PeerID): shortLog(it) proc toBytes*(pid: PeerID, data: var openarray[byte]): int = ## Store PeerID ``pid`` to array of bytes ``data``. @@ -112,19 +127,6 @@ proc extractPublicKey*(pid: PeerID, pubkey: var PublicKey): bool = let length = len(mh.data.buffer) result = pubkey.init(mh.data.buffer.toOpenArray(mh.dpos, length - 1)) -proc `$`*(pid: PeerID): string = - ## Returns compact string representation of ``pid``. - var spid = pid.pretty() - if len(spid) <= 10: - result = spid - else: - result = newStringOfCap(10) - for i in 0..<2: - result.add(spid[i]) - result.add("*") - for i in (len(spid) - 6)..spid.high: - result.add(spid[i]) - proc init*(pid: var PeerID, data: openarray[byte]): bool = ## Initialize peer id from raw binary representation ``data``. ## diff --git a/libp2p/peerinfo.nim b/libp2p/peerinfo.nim index f43905696..3fac63a3b 100644 --- a/libp2p/peerinfo.nim +++ b/libp2p/peerinfo.nim @@ -41,20 +41,15 @@ type of HasPublic: key: Option[PublicKey] -proc id*(p: PeerInfo): string = - if not(isNil(p)): - return p.peerId.pretty() - -proc `$`*(p: PeerInfo): string = p.id - -proc shortLog*(p: PeerInfo): auto = +func shortLog*(p: PeerInfo): auto = ( - id: p.id(), + peerId: $p.peerId, addrs: mapIt(p.addrs, $it), protocols: mapIt(p.protocols, $it), protoVersion: p.protoVersion, agentVersion: p.agentVersion, ) +chronicles.formatIt(PeerInfo): shortLog(it) template postInit(peerinfo: PeerInfo, addrs: openarray[MultiAddress], diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index 611469a66..3c7ce2381 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -106,16 +106,16 @@ method init*(p: Identify) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = try: defer: - trace "exiting identify handler", oid = $conn.oid + trace "exiting identify handler", conn await conn.close() - trace "handling identify request", oid = $conn.oid + trace "handling identify request", conn var pb = encodeMsg(p.peerInfo, conn.observedAddr) await conn.writeLp(pb.buffer) except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception in identify handler", exc = exc.msg + trace "exception in identify handler", exc = exc.msg, conn p.handler = handle p.codec = IdentifyCodec @@ -123,10 +123,10 @@ method init*(p: Identify) = proc identify*(p: Identify, conn: Connection, remotePeerInfo: PeerInfo): Future[IdentifyInfo] {.async, gcsafe.} = - trace "initiating identify", peer = $conn + trace "initiating identify", conn var message = await conn.readLp(64*1024) if len(message) == 0: - trace "identify: Empty message received!" + trace "identify: Empty message received!", conn raise newException(IdentityInvalidMsgError, "Empty message received!") let infoOpt = decodeMsg(message) @@ -144,8 +144,8 @@ proc identify*(p: Identify, # have in most cases if peer.get() != remotePeerInfo.peerId: trace "Peer ids don't match", - remote = peer.get().pretty(), - local = remotePeerInfo.id + remote = peer, + local = remotePeerInfo.peerId raise newException(IdentityNoMatchError, "Peer ids don't match") diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 578ecc89d..11c480251 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -39,18 +39,18 @@ method subscribeTopic*(f: FloodSub, f.floodsub[topic] = initHashSet[PubSubPeer]() if subscribe: - trace "adding subscription for topic", peer = peer.id, name = topic + trace "adding subscription for topic", peer, topic # subscribe the peer to the topic f.floodsub[topic].incl(peer) else: - trace "removing subscription for topic", peer = peer.id, name = topic + trace "removing subscription for topic", peer, topic # unsubscribe the peer from the topic f.floodsub[topic].excl(peer) method unsubscribePeer*(f: FloodSub, peer: PeerID) = ## handle peer disconnects ## - trace "unsubscribing floodsub peer", peer = $peer + trace "unsubscribing floodsub peer", peer let pubSubPeer = f.peers.getOrDefault(peer) if pubSubPeer.isNil: return @@ -67,20 +67,17 @@ method rpcHandler*(f: FloodSub, for msg in rpcMsg.messages: # for every message let msgId = f.msgIdProvider(msg) - logScope: - msgId - peer = peer.id if f.seen.put(msgId): - trace "Dropping already-seen message" + trace "Dropping already-seen message", msgId, peer continue if f.verifySignature and not msg.verify(peer.peerId): - debug "Dropping message due to failed signature verification" + debug "Dropping message due to failed signature verification", msgId, peer continue if not (await f.validate(msg)): - trace "Dropping message due to failed validation" + trace "Dropping message due to failed validation", msgId, peer continue var toSendPeers = initHashSet[PubSubPeer]() @@ -105,9 +102,9 @@ method init*(f: FloodSub) = except CancelledError: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. - trace "Unexpected cancellation in floodsub handler" + trace "Unexpected cancellation in floodsub handler", conn except CatchableError as exc: - trace "FloodSub handler leaks an error", exc = exc.msg + trace "FloodSub handler leaks an error", exc = exc.msg, conn f.handler = handler f.codec = FloodSubCodec @@ -118,17 +115,16 @@ method publish*(f: FloodSub, # base returns always 0 discard await procCall PubSub(f).publish(topic, data) - logScope: topic - trace "Publishing message on topic", data = data.shortLog + trace "Publishing message on topic", data = data.shortLog, topic if topic.len <= 0: # data could be 0/empty - debug "Empty topic, skipping publish" + debug "Empty topic, skipping publish", topic return 0 let peers = toSeq(f.floodsub.getOrDefault(topic)) if peers.len == 0: - debug "No peers for topic, skipping publish" + debug "No peers for topic, skipping publish", topic return 0 inc f.msgSeqno @@ -136,13 +132,12 @@ method publish*(f: FloodSub, msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign) msgId = f.msgIdProvider(msg) - logScope: msgId - - trace "Created new message", msg = shortLog(msg), peers = peers.len + trace "Created new message", + msg = shortLog(msg), peers = peers.len, topic, msgId if f.seen.put(msgId): # custom msgid providers might cause this - trace "Dropping already-seen message" + trace "Dropping already-seen message", msgId, topic return 0 # Try to send to all peers that are known to be interested @@ -151,7 +146,7 @@ method publish*(f: FloodSub, when defined(libp2p_expensive_metrics): libp2p_pubsub_messages_published.inc(labelValues = [topic]) - trace "Published message to peers" + trace "Published message to peers", msgId, topic return peers.len diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 2bdd9a8eb..af6fe77d7 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -79,15 +79,16 @@ method init*(g: GossipSub) = except CancelledError: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. - trace "Unexpected cancellation in gossipsub handler" + trace "Unexpected cancellation in gossipsub handler", conn except CatchableError as exc: - trace "GossipSub handler leaks an error", exc = exc.msg + trace "GossipSub handler leaks an error", exc = exc.msg, conn g.handler = handler g.codec = GossipSubCodec proc replenishFanout(g: GossipSub, topic: string) = ## get fanout peers for a topic + logScope: topic trace "about to replenish fanout" if g.fanout.peers(topic) < GossipSubDLo: @@ -201,7 +202,7 @@ proc getGossipPeers(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.} continue if topic notin g.gossipsub: - trace "topic not in gossip array, skipping", topicID = topic + trace "topic not in gossip array, skipping", topic continue let ihave = ControlIHave(topicID: topic, messageIDs: toSeq(mids)) @@ -251,9 +252,10 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) = ## handle peer disconnects ## - trace "unsubscribing gossipsub peer", peer = $peer + trace "unsubscribing gossipsub peer", peer let pubSubPeer = g.peers.getOrDefault(peer) if pubSubPeer.isNil: + trace "no peer to unsubscribe", peer return for t in toSeq(g.gossipsub.keys): @@ -287,7 +289,7 @@ method subscribeTopic*(g: GossipSub, procCall PubSub(g).subscribeTopic(topic, subscribe, peer) logScope: - peer = $peer.id + peer topic if subscribe: @@ -319,7 +321,7 @@ proc handleGraft(g: GossipSub, for graft in grafts: let topic = graft.topicID logScope: - peer = peer.id + peer topic trace "peer grafted topic" @@ -350,7 +352,7 @@ proc handleGraft(g: GossipSub, proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = for prune in prunes: - trace "peer pruned topic", peer = peer.id, topic = prune.topicID + trace "peer pruned topic", peer, topic = prune.topicID g.mesh.removePeer(prune.topicID, peer) when defined(libp2p_expensive_metrics): @@ -362,7 +364,7 @@ proc handleIHave(g: GossipSub, ihaves: seq[ControlIHave]): ControlIWant = for ihave in ihaves: trace "peer sent ihave", - peer = peer.id, topic = ihave.topicID, msgs = ihave.messageIDs + peer, topic = ihave.topicID, msgs = ihave.messageIDs if ihave.topicID in g.mesh: for m in ihave.messageIDs: @@ -374,7 +376,7 @@ proc handleIWant(g: GossipSub, iwants: seq[ControlIWant]): seq[Message] = for iwant in iwants: for mid in iwant.messageIDs: - trace "peer sent iwant", peer = peer.id, messageID = mid + trace "peer sent iwant", peer, messageID = mid let msg = g.mcache.get(mid) if msg.isSome: result.add(msg.get()) @@ -386,22 +388,19 @@ method rpcHandler*(g: GossipSub, for msg in rpcMsg.messages: # for every message let msgId = g.msgIdProvider(msg) - logScope: - msgId - peer = peer.id if g.seen.put(msgId): - trace "Dropping already-seen message" + trace "Dropping already-seen message", msgId, peer continue g.mcache.put(msgId, msg) if g.verifySignature and not msg.verify(peer.peerId): - debug "Dropping message due to failed signature verification" + debug "Dropping message due to failed signature verification", msgId, peer continue if not (await g.validate(msg)): - trace "Dropping message due to failed validation" + trace "Dropping message due to failed validation", msgId, peer continue var toSendPeers = initHashSet[PubSubPeer]() @@ -414,7 +413,7 @@ method rpcHandler*(g: GossipSub, # In theory, if topics are the same in all messages, we could batch - we'd # also have to be careful to only include validated messages g.broadcast(toSeq(toSendPeers), RPCMsg(messages: @[msg])) - trace "forwared message to peers", peers = toSendPeers.len + trace "forwared message to peers", peers = toSendPeers.len, msgId, peer if rpcMsg.control.isSome: let control = rpcMsg.control.get() @@ -428,7 +427,7 @@ method rpcHandler*(g: GossipSub, if respControl.graft.len > 0 or respControl.prune.len > 0 or respControl.ihave.len > 0 or messages.len > 0: - debug "sending control message", msg = shortLog(respControl) + debug "sending control message", msg = shortLog(respControl), peer g.send( peer, RPCMsg(control: some(respControl), messages: messages)) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index fed1512e0..f89598ef1 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -106,11 +106,9 @@ method rpcHandler*(p: PubSub, peer: PubSubPeer, rpcMsg: RPCMsg) {.async, base.} = ## handle rpc messages - logScope: peer = peer.id - - trace "processing RPC message", msg = rpcMsg.shortLog + trace "processing RPC message", msg = rpcMsg.shortLog, peer for s in rpcMsg.subscriptions: # subscribe/unsubscribe the peer for each topic - trace "about to subscribe to topic", topicId = s.topic + trace "about to subscribe to topic", topicId = s.topic, peer p.subscribeTopic(s.topic, s.subscribe, peer) proc getOrCreatePeer*( @@ -178,11 +176,11 @@ method handleConn*(p: PubSub, try: peer.handler = handler await peer.handle(conn) # spawn peer read loop - trace "pubsub peer handler ended", peer = peer.id + trace "pubsub peer handler ended", conn except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception ocurred in pubsub handle", exc = exc.msg + trace "exception ocurred in pubsub handle", exc = exc.msg, conn finally: await conn.close() diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 3c7894ac2..ac8ca3303 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -49,9 +49,10 @@ func hash*(p: PubSubPeer): Hash = # int is either 32/64, so intptr basically, pubsubpeer is a ref cast[pointer](p).hash -proc id*(p: PubSubPeer): string = - doAssert(not p.isNil, "nil pubsubpeer") - p.peerId.pretty +func shortLog*(p: PubSubPeer): string = + if p.isNil: "PubSubPeer(nil)" + else: shortLog(p.peerId) +chronicles.formatIt(PubSubPeer): shortLog(it) proc connected*(p: PubSubPeer): bool = not p.sendConn.isNil and not @@ -72,25 +73,29 @@ proc sendObservers(p: PubSubPeer, msg: var RPCMsg) = obs.onSend(p, msg) proc handle*(p: PubSubPeer, conn: Connection) {.async.} = - logScope: - oid = $conn.oid - peer = p.id - closed = conn.closed - debug "starting pubsub read loop" + debug "starting pubsub read loop", + conn, peer = p, closed = conn.closed try: try: while not conn.atEof: - trace "waiting for data" + trace "waiting for data", conn, peer = p, closed = conn.closed + let data = await conn.readLp(64 * 1024) - trace "read data from peer", data = data.shortLog + trace "read data from peer", + conn, peer = p, closed = conn.closed, + data = data.shortLog var rmsg = decodeRpcMsg(data) if rmsg.isErr(): - notice "failed to decode msg from peer" + notice "failed to decode msg from peer", + conn, peer = p, closed = conn.closed, + err = rmsg.error() break - trace "decoded msg from peer", msg = rmsg.get().shortLog + trace "decoded msg from peer", + conn, peer = p, closed = conn.closed, + msg = rmsg.get().shortLog # trigger hooks p.recvObservers(rmsg.get()) @@ -98,7 +103,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = for m in rmsg.get().messages: for t in m.topicIDs: # metrics - libp2p_pubsub_received_messages.inc(labelValues = [p.id, t]) + libp2p_pubsub_received_messages.inc(labelValues = [$p.peerId, t]) await p.handler(p, rmsg.get()) finally: @@ -112,9 +117,11 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = # do not need to propogate CancelledError. trace "Unexpected cancellation in PubSubPeer.handle" except CatchableError as exc: - trace "Exception occurred in PubSubPeer.handle", exc = exc.msg + trace "Exception occurred in PubSubPeer.handle", + conn, peer = p, closed = conn.closed, exc = exc.msg finally: - debug "exiting pubsub read loop" + debug "exiting pubsub read loop", + conn, peer = p, closed = conn.closed proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} = ## get a cached send connection or create a new one - will return nil if @@ -127,7 +134,7 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} = if not current.isNil: if not (current.closed() or current.atEof): # The existing send connection looks like it might work - reuse it - trace "Reusing existing connection", oid = $current.oid + trace "Reusing existing connection", current return current # Send connection is set but broken - get rid of it @@ -171,19 +178,15 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} = if newConn.isNil: return nil - trace "Sending handshake", oid = $newConn.oid, handshake = shortLog(handshake) + trace "Sending handshake", newConn, handshake = shortLog(handshake) await newConn.writeLp(encodeRpcMsg(handshake)) - trace "Caching new send connection", oid = $newConn.oid + trace "Caching new send connection", newConn p.sendConn = newConn # Start a read loop on the new connection. # All the errors are handled inside `handle()` procedure. asyncSpawn p.handle(newConn) return newConn - except CancelledError as exc: - raise exc - except CatchableError as exc: - return nil finally: if p.dialLock.locked: p.dialLock.release() @@ -200,11 +203,7 @@ proc connect*(p: PubSubPeer) = proc sendImpl(p: PubSubPeer, msg: RPCMsg) {.async.} = doAssert(not isNil(p), "pubsubpeer nil!") - logScope: - peer = p.id - rpcMsg = shortLog(msg) - - trace "sending msg to peer" + trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg) # trigger send hooks var mm = msg # hooks can modify the message @@ -215,37 +214,34 @@ proc sendImpl(p: PubSubPeer, msg: RPCMsg) {.async.} = info "empty message, skipping" return - logScope: - encoded = shortLog(encoded) - - var conn = await p.getSendConn() + var conn: Connection try: - trace "about to send message" + conn = await p.getSendConn() if conn == nil: - debug "Couldn't get send connection, dropping message" + debug "Couldn't get send connection, dropping message", peer = p return - trace "sending encoded msgs to peer", connId = $conn.oid + trace "sending encoded msgs to peer", conn, encoded = shortLog(encoded) await conn.writeLp(encoded) - trace "sent pubsub message to remote", connId = $conn.oid + trace "sent pubsub message to remote", conn when defined(libp2p_expensive_metrics): for x in mm.messages: for t in x.topicIDs: # metrics - libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t]) + libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t]) except CatchableError as exc: # Because we detach the send call from the currently executing task using # asyncCheck, no exceptions may leak out of it - trace "unable to send to remote", exc = exc.msg + debug "unable to send to remote", exc = exc.msg, peer = p # Next time sendConn is used, it will be have its close flag set and thus # will be recycled if not isNil(conn): await conn.close() # This will clean up the send connection if exc is CancelledError: # TODO not handled - debug "Send cancelled" + debug "Send cancelled", peer = p # We'll ask for a new send connection whenever possible if p.sendConn == conn: diff --git a/libp2p/protocols/pubsub/rpc/message.nim b/libp2p/protocols/pubsub/rpc/message.nim index e5d15d008..090bd374d 100644 --- a/libp2p/protocols/pubsub/rpc/message.nim +++ b/libp2p/protocols/pubsub/rpc/message.nim @@ -28,7 +28,7 @@ declareCounter(libp2p_pubsub_sig_verify_success, "pubsub successfully validated declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messages") func defaultMsgIdProvider*(m: Message): string = - byteutils.toHex(m.seqno) & m.fromPeer.pretty + byteutils.toHex(m.seqno) & $m.fromPeer proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] = ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg))).getBytes()) diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index 92ba5e332..014f065af 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -86,7 +86,7 @@ func shortLog*(c: ControlMessage): auto = func shortLog*(msg: Message): auto = ( - fromPeer: msg.fromPeer.pretty, + fromPeer: msg.fromPeer.shortLog, data: msg.data.shortLog, seqno: msg.seqno.shortLog, topicIDs: $msg.topicIDs, diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index 327b81559..ab6fcc711 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -187,7 +187,7 @@ proc decodeMessage*(pb: ProtoBuffer): ProtoResult[Message] {.inline.} = trace "decodeMessage: decoding message" var msg: Message if ? pb.getField(1, msg.fromPeer): - trace "decodeMessage: read fromPeer", fromPeer = msg.fromPeer.pretty() + trace "decodeMessage: read fromPeer", fromPeer = msg.fromPeer else: trace "decodeMessage: fromPeer is missing" if ? pb.getField(2, msg.data): diff --git a/libp2p/protocols/secure/noise.nim b/libp2p/protocols/secure/noise.nim index 726199259..c48ef5456 100644 --- a/libp2p/protocols/secure/noise.nim +++ b/libp2p/protocols/secure/noise.nim @@ -7,6 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +import std/[oids, strformat] import chronos import chronicles import bearssl @@ -88,6 +89,12 @@ type # Utility +func shortLog*(conn: NoiseConnection): auto = + if conn.isNil: "NoiseConnection(nil)" + elif conn.peerInfo.isNil: $conn.oid + else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}" +chronicles.formatIt(NoiseConnection): shortLog(it) + proc genKeyPair(rng: var BrHmacDrbgContext): KeyPair = result.privateKey = Curve25519Key.random(rng) result.publicKey = result.privateKey.public() @@ -392,13 +399,13 @@ method readMessage*(sconn: NoiseConnection): Future[seq[byte]] {.async.} = var besize: array[2, byte] await sconn.stream.readExactly(addr besize[0], besize.len) let size = uint16.fromBytesBE(besize).int # Cannot overflow - trace "receiveEncryptedMessage", size, peer = $sconn + trace "receiveEncryptedMessage", size, sconn if size > 0: var buffer = newSeq[byte](size) await sconn.stream.readExactly(addr buffer[0], buffer.len) return sconn.readCs.decryptWithAd([], buffer) else: - trace "Received 0-length message", conn = $sconn + trace "Received 0-length message", sconn method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] {.async.} = if message.len == 0: @@ -418,14 +425,14 @@ method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] {.async. lesize = cipher.len.uint16 besize = lesize.toBytesBE outbuf = newSeqOfCap[byte](cipher.len + 2) - trace "sendEncryptedMessage", size = lesize, peer = $sconn, left, offset + trace "sendEncryptedMessage", sconn, size = lesize, left, offset outbuf &= besize outbuf &= cipher await sconn.stream.write(outbuf) sconn.activity = true method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureConn] {.async.} = - trace "Starting Noise handshake", initiator, peer = $conn + trace "Starting Noise handshake", conn, initiator # https://github.com/libp2p/specs/tree/master/noise#libp2p-data-in-handshake-messages let @@ -469,7 +476,7 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon if not remoteSig.verify(verifyPayload, remotePubKey): raise newException(NoiseHandshakeError, "Noise handshake signature verify failed.") else: - trace "Remote signature verified", peer = $conn + trace "Remote signature verified", conn if initiator and not isNil(conn.peerInfo): let pid = PeerID.init(remotePubKey) @@ -480,7 +487,7 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon failedKey: PublicKey discard extractPublicKey(conn.peerInfo.peerId, failedKey) debug "Noise handshake, peer infos don't match!", - initiator, dealt_peer = $conn.peerInfo.id, + initiator, dealt_peer = conn, dealt_key = $failedKey, received_peer = $pid, received_key = $remotePubKey raise newException(NoiseHandshakeError, "Noise handshake, peer infos don't match! " & $pid & " != " & $conn.peerInfo.peerId) diff --git a/libp2p/protocols/secure/secio.nim b/libp2p/protocols/secure/secio.nim index e40ab2d00..71bcb4dc2 100644 --- a/libp2p/protocols/secure/secio.nim +++ b/libp2p/protocols/secure/secio.nim @@ -6,7 +6,8 @@ ## at your option. ## This file may not be copied, modified, or distributed except according to ## those terms. -import chronos, chronicles, oids, stew/endians2, bearssl +import std/[oids, strformat] +import chronos, chronicles, stew/endians2, bearssl import nimcrypto/[hmac, sha2, sha, hash, rijndael, twofish, bcmode] import secure, ../../stream/connection, @@ -69,6 +70,12 @@ type SecioError* = object of CatchableError +func shortLog*(conn: SecioConn): auto = + if conn.isNil: "SecioConn(nil)" + elif conn.peerInfo.isNil: $conn.oid + else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}" +chronicles.formatIt(SecioConn): shortLog(it) + proc init(mac: var SecureMac, hash: string, key: openarray[byte]) = if hash == "SHA256": mac = SecureMac(kind: SecureMacType.Sha256) @@ -184,17 +191,17 @@ proc readRawMessage(conn: Connection): Future[seq[byte]] {.async.} = trace "Recieved message header", header = lengthBuf.shortLog, length = length if length > SecioMaxMessageSize: # Verify length before casting! - trace "Received size of message exceed limits", conn = $conn, length = length + trace "Received size of message exceed limits", conn, length = length raise (ref SecioError)(msg: "Message exceeds maximum length") if length > 0: var buf = newSeq[byte](int(length)) await conn.readExactly(addr buf[0], buf.len) trace "Received message body", - conn = $conn, length = buf.len, buff = buf.shortLog + conn, length = buf.len, buff = buf.shortLog return buf - trace "Discarding 0-length payload", conn = $conn + trace "Discarding 0-length payload", conn method readMessage*(sconn: SecioConn): Future[seq[byte]] {.async.} = ## Read message from channel secure connection ``sconn``. @@ -312,12 +319,12 @@ method handshake*(s: Secio, conn: Connection, initiator: bool = false): Future[S var answer = await transactMessage(conn, request) if len(answer) == 0: - trace "Proposal exchange failed", conn = $conn + trace "Proposal exchange failed", conn raise (ref SecioError)(msg: "Proposal exchange failed") if not decodeProposal(answer, remoteNonce, remoteBytesPubkey, remoteExchanges, remoteCiphers, remoteHashes): - trace "Remote proposal decoding failed", conn = $conn + trace "Remote proposal decoding failed", conn raise (ref SecioError)(msg: "Remote proposal decoding failed") if not remotePubkey.init(remoteBytesPubkey): @@ -354,11 +361,11 @@ method handshake*(s: Secio, conn: Connection, initiator: bool = false): Future[S var localExchange = createExchange(epubkey, signature.getBytes()) var remoteExchange = await transactMessage(conn, localExchange) if len(remoteExchange) == 0: - trace "Corpus exchange failed", conn = $conn + trace "Corpus exchange failed", conn raise (ref SecioError)(msg: "Corpus exchange failed") if not decodeExchange(remoteExchange, remoteEBytesPubkey, remoteEBytesSig): - trace "Remote exchange decoding failed", conn = $conn + trace "Remote exchange decoding failed", conn raise (ref SecioError)(msg: "Remote exchange decoding failed") if not remoteESignature.init(remoteEBytesSig): diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index 3f5338e1b..7caf6686c 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import options +import std/[options, strformat] import chronos, chronicles, bearssl import ../protocol, ../../stream/streamseq, @@ -25,12 +25,18 @@ type stream*: Connection buf: StreamSeq -proc init*[T: SecureConn](C: type T, - conn: Connection, - peerInfo: PeerInfo, - observedAddr: Multiaddress, - timeout: Duration = DefaultConnectionTimeout): T = - result = C(stream: conn, +func shortLog*(conn: SecureConn): auto = + if conn.isNil: "SecureConn(nil)" + elif conn.peerInfo.isNil: $conn.oid + else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}" +chronicles.formatIt(SecureConn): shortLog(it) + +proc init*(T: type SecureConn, + conn: Connection, + peerInfo: PeerInfo, + observedAddr: Multiaddress, + timeout: Duration = DefaultConnectionTimeout): T = + result = T(stream: conn, peerInfo: peerInfo, observedAddr: observedAddr, closeEvent: conn.closeEvent, @@ -71,7 +77,7 @@ proc handleConn*(s: Secure, # do not need to propogate CancelledError. discard except CatchableError as exc: - trace "error cleaning up secure connection", errMsg = exc.msg + trace "error cleaning up secure connection", err = exc.msg, sconn if not isNil(sconn): # All the errors are handled inside `cleanup()` procedure. @@ -83,18 +89,18 @@ method init*(s: Secure) {.gcsafe.} = procCall LPProtocol(s).init() proc handle(conn: Connection, proto: string) {.async, gcsafe.} = - trace "handling connection upgrade", proto + trace "handling connection upgrade", proto, conn try: # We don't need the result but we # definitely need to await the handshake discard await s.handleConn(conn, false) - trace "connection secured" + trace "connection secured", conn except CancelledError as exc: - warn "securing connection canceled" + warn "securing connection canceled", conn await conn.close() raise exc except CatchableError as exc: - warn "securing connection failed", msg = exc.msg + warn "securing connection failed", err = exc.msg, conn await conn.close() s.handler = handle diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index 25840e498..9ca59d744 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -30,7 +30,7 @@ ## will suspend until either the amount of elements in the ## buffer goes below ``maxSize`` or more data becomes available. -import deques, math +import std/[deques, math, strformat] import chronos, chronicles, metrics import ../stream/connection @@ -100,6 +100,12 @@ proc newAlreadyPipedError*(): ref CatchableError {.inline.} = proc newNotWritableError*(): ref CatchableError {.inline.} = result = newException(NotWritableError, "stream is not writable") +func shortLog*(s: BufferStream): auto = + if s.isNil: "BufferStream(nil)" + elif s.peerInfo.isNil: $s.oid + else: &"{shortLog(s.peerInfo.peerId)}:{s.oid}" +chronicles.formatIt(BufferStream): shortLog(it) + proc requestReadBytes(s: BufferStream): Future[void] = ## create a future that will complete when more ## data becomes available in the read buffer @@ -142,7 +148,7 @@ proc initBufferStream*(s: BufferStream, await s.writeLock.acquire() await handler(data) - trace "created bufferstream", oid = $s.oid + trace "created bufferstream", s proc newBufferStream*(handler: WriteHandler = nil, size: int = DefaultBufferSize, @@ -206,7 +212,7 @@ proc drainBuffer*(s: BufferStream) {.async.} = ## wait for all data in the buffer to be consumed ## - trace "draining buffer", len = s.len, oid = $s.oid + trace "draining buffer", len = s.len, s while s.len > 0: await s.dataReadEvent.wait() s.dataReadEvent.clear() @@ -296,7 +302,7 @@ method close*(s: BufferStream) {.async, gcsafe.} = try: ## close the stream and clear the buffer if not s.isClosed: - trace "closing bufferstream", oid = $s.oid + trace "closing bufferstream", s s.isEof = true for r in s.readReqs: if not(isNil(r)) and not(r.finished()): @@ -306,11 +312,11 @@ method close*(s: BufferStream) {.async, gcsafe.} = await procCall Connection(s).close() inc getBufferStreamTracker().closed - trace "bufferstream closed", oid = $s.oid + trace "bufferstream closed", s else: trace "attempt to close an already closed bufferstream", - trace = getStackTrace(), oid = $s.oid + trace = getStackTrace(), s except CancelledError as exc: raise exc except CatchableError as exc: - trace "error closing buffer stream", exc = exc.msg + trace "error closing buffer stream", exc = exc.msg, s diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index e6d28cf26..b6bb99d73 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import oids +import std/[oids, strformat] import chronos, chronicles import connection @@ -21,6 +21,12 @@ type ChronosStream* = ref object of Connection client: StreamTransport +func shortLog*(conn: ChronosStream): string = + if conn.isNil: "ChronosStream(nil)" + elif conn.peerInfo.isNil: $conn.oid + else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}" +chronicles.formatIt(ChronosStream): shortLog(it) + method initStream*(s: ChronosStream) = if s.objName.len == 0: s.objName = "ChronosStream" @@ -88,7 +94,7 @@ method close*(s: ChronosStream) {.async.} = try: if not s.isClosed: trace "shutting down chronos stream", address = $s.client.remoteAddress(), - oid = $s.oid + s if not s.client.closed(): await s.client.closeWait() @@ -96,4 +102,4 @@ method close*(s: ChronosStream) {.async.} = except CancelledError as exc: raise exc except CatchableError as exc: - trace "error closing chronosstream", exc = exc.msg + trace "error closing chronosstream", exc = exc.msg, s diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim index 3359e3344..20343ea81 100644 --- a/libp2p/stream/connection.nim +++ b/libp2p/stream/connection.nim @@ -7,13 +7,13 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import hashes, oids +import std/[hashes, oids, strformat] import chronicles, chronos, metrics import lpstream, ../multiaddress, ../peerinfo -export lpstream +export lpstream, peerinfo logScope: topics = "connection" @@ -66,6 +66,12 @@ proc setupConnectionTracker(): ConnectionTracker = result.isLeaked = leakTransport addTracker(ConnectionTrackerName, result) +func shortLog*(conn: Connection): string = + if conn.isNil: "Connection(nil)" + elif conn.peerInfo.isNil: $conn.oid + else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}" +chronicles.formatIt(Connection): shortLog(it) + method initStream*(s: Connection) = if s.objName.len == 0: s.objName = "Connection" @@ -77,7 +83,7 @@ method initStream*(s: Connection) = s.timeoutHandler = proc() {.async.} = await s.close() - trace "timeout set at", timeout = s.timeout.millis + trace "timeout set at", timeout = s.timeout.millis, s doAssert(isNil(s.timerTaskFut)) # doAssert(s.timeout > 0.millis) if s.timeout > 0.millis: @@ -94,10 +100,6 @@ method close*(s: Connection) {.async.} = await procCall LPStream(s).close() inc getConnectionTracker().closed -proc `$`*(conn: Connection): string = - if not isNil(conn.peerInfo): - result = conn.peerInfo.id - func hash*(p: Connection): Hash = cast[pointer](p).hash @@ -110,9 +112,6 @@ proc timeoutMonitor(s: Connection) {.async, gcsafe.} = ## be reset ## - logScope: - oid = $s.oid - try: while true: await sleepAsync(s.timeout) @@ -127,14 +126,14 @@ proc timeoutMonitor(s: Connection) {.async, gcsafe.} = break # reset channel on innactivity timeout - trace "Connection timed out" + trace "Connection timed out", s if not(isNil(s.timeoutHandler)): await s.timeoutHandler() except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception in timeout", exc = exc.msg + trace "exception in timeout", exc = exc.msg, s proc init*(C: type Connection, peerInfo: PeerInfo, diff --git a/libp2p/switch.nim b/libp2p/switch.nim index d31067c71..b12805289 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -30,9 +30,6 @@ import stream/connection, peerid, errors -chronicles.formatIt(PeerInfo): $it -chronicles.formatIt(PeerID): $it - logScope: topics = "switch" @@ -101,7 +98,7 @@ proc triggerConnEvent(s: Switch, peerId: PeerID, event: ConnEvent) {.async, gcsa except CancelledError as exc: raise exc except CatchableError as exc: # handlers should not raise! - warn "exception in trigger ConnEvents", exc = exc.msg + warn "exception in trigger ConnEvents", exc = exc.msg, peerId proc disconnect*(s: Switch, peerId: PeerID) {.async, gcsafe.} @@ -120,7 +117,7 @@ proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = if manager.len == 0: raise newException(CatchableError, "Unable to negotiate a secure channel!") - trace "securing connection", codec = manager + trace "securing connection", codec = manager, conn let secureProtocol = s.secureManagers.filterIt(it.codec == manager) # ms.select should deal with the correctness of this @@ -154,7 +151,7 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} = if info.protos.len > 0: conn.peerInfo.protocols = info.protos - trace "identify: identified remote peer", peer = $conn.peerInfo + trace "identify: identified remote peer", conn proc identify(s: Switch, muxer: Muxer) {.async, gcsafe.} = # new stream for identify @@ -171,14 +168,14 @@ proc identify(s: Switch, muxer: Muxer) {.async, gcsafe.} = proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} = ## mux incoming connection - trace "muxing connection", peer = $conn + trace "muxing connection", conn if s.muxers.len == 0: - warn "no muxers registered, skipping upgrade flow" + warn "no muxers registered, skipping upgrade flow", conn return let muxerName = await s.ms.select(conn, toSeq(s.muxers.keys())) if muxerName.len == 0 or muxerName == "na": - debug "no muxer available, early exit", peer = $conn + debug "no muxer available, early exit", conn return # create new muxer for connection @@ -190,13 +187,13 @@ proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} = s.connManager.storeOutgoing(muxer.connection) s.connManager.storeMuxer(muxer) - trace "found a muxer", name = muxerName, peer = $conn + trace "found a muxer", name = muxerName, conn # start muxer read loop - the future will complete when loop ends let handlerFut = muxer.handle() # store it in muxed connections if we have a peer for it - trace "adding muxer for peer", peer = conn.peerInfo.id + trace "adding muxer for peer", conn s.connManager.storeMuxer(muxer, handlerFut) # update muxer with handler return muxer @@ -214,7 +211,7 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g raise newException(CatchableError, "current version of nim-libp2p requires that secure protocol negotiates peerid") - trace "upgrading connection", conn = $sconn, uoid = $conn.oid + trace "upgrading connection", conn let muxer = await s.mux(sconn) # mux it if possible if muxer == nil: # TODO this might be relaxed in the future @@ -227,21 +224,19 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g # Identify is non-essential, though if it fails, it might indicate that # the connection was closed already - this will be picked up by the read # loop - debug "Could not identify connection", - err = exc.msg, conn = $conn, uoid = $conn.oid + debug "Could not identify connection", err = exc.msg, conn if isNil(sconn.peerInfo): await sconn.close() raise newException(CatchableError, "unable to identify connection, stopping upgrade") - trace "successfully upgraded outgoing connection", - conn = $sconn, uoid = $conn.oid, oid = $sconn.oid + trace "successfully upgraded outgoing connection", conn, sconn return sconn proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = - trace "upgrading incoming connection", conn = $conn, oid = $conn.oid + trace "upgrading incoming connection", conn let ms = newMultistream() # secure incoming connections @@ -250,7 +245,7 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = {.async, gcsafe, closure.} = var sconn: Connection - trace "Securing connection", oid = $conn.oid + trace "Securing connection", conn let secure = s.secureManagers.filterIt(it.codec == proto)[0] try: @@ -271,8 +266,7 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = except CancelledError as exc: raise exc except CatchableError as exc: - debug "ending secured handler", - err = exc.msg, conn = $conn, oid = $conn.oid + debug "ending secured handler", err = exc.msg, conn if (await ms.select(conn)): # just handshake # add the secure handlers @@ -302,27 +296,25 @@ proc internalConnect(s: Switch, # This connection should already have been removed from the connection # manager - it's essentially a bug that we end up here - we'll fail # for now, hoping that this will clean themselves up later... - warn "dead connection in connection manager", peer = $peerId + warn "dead connection in connection manager", peerId await conn.close() raise newException(CatchableError, "Zombie connection encountered") - trace "Reusing existing connection", - oid = $conn.oid, direction = $conn.dir, peer = $peerId - + trace "Reusing existing connection", conn return conn - trace "Dialing peer" + trace "Dialing peer", peerId for t in s.transports: # for each transport for a in addrs: # for each address if t.handles(a): # check if it can dial it - trace "Dialing address", address = $a + trace "Dialing address", address = $a, peerId let dialed = try: await t.dial(a) except CancelledError as exc: - trace "dialing canceled", exc = exc.msg, peer = $peerId + trace "dialing canceled", exc = exc.msg, peerId raise exc except CatchableError as exc: - trace "dialing failed", exc = exc.msg, peer = $peerId + trace "dialing failed", exc = exc.msg, peerId libp2p_failed_dials.inc() continue # Try the next address @@ -337,7 +329,7 @@ proc internalConnect(s: Switch, # If we failed to establish the connection through one transport, # we won't succeeed through another - no use in trying again await dialed.close() - debug "upgrade failed", exc = exc.msg + debug "Upgrade failed", exc = exc.msg, peerId if exc isnot CancelledError: libp2p_failed_upgrade.inc() raise exc @@ -345,10 +337,7 @@ proc internalConnect(s: Switch, doAssert not isNil(upgraded), "connection died after upgradeOutgoing" conn = upgraded - trace "dial successful", - peer = $peerId, - oid = $upgraded.oid, - peerInfo = shortLog(upgraded.peerInfo) + trace "dial successful", conn, peerInfo = upgraded.peerInfo break finally: if lock.locked(): @@ -374,10 +363,10 @@ proc internalConnect(s: Switch, # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. trace "Unexpected cancellation in switch peer connect cleanup", - peer = $peerId + conn except CatchableError as exc: trace "Unexpected exception in switch peer connect cleanup", - errMsg = exc.msg, peer = $peerId + errMsg = exc.msg, conn # All the errors are handled inside `cleanup()` procedure. asyncSpawn peerCleanup() @@ -389,9 +378,7 @@ proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} = discard await s.internalConnect(peerId, addrs) proc negotiateStream(s: Switch, stream: Connection, proto: string): Future[Connection] {.async.} = - trace "Attempting to select remote", proto = proto, - streamOid = $stream.oid, - oid = $stream.oid + trace "Attempting to select remote", proto = proto, stream if not await s.ms.select(stream, proto): await stream.close() @@ -430,11 +417,11 @@ proc dial*(s: Switch, return await s.negotiateStream(stream, proto) except CancelledError as exc: - trace "dial canceled" + trace "dial canceled", conn await cleanup() raise exc except CatchableError as exc: - trace "error dialing", exc = exc.msg + trace "error dialing", exc = exc.msg, conn await cleanup() raise exc @@ -450,7 +437,7 @@ proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} = s.ms.addHandler(proto.codec, proto) proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = - trace "starting switch for peer", peerInfo = shortLog(s.peerInfo) + trace "starting switch for peer", peerInfo = s.peerInfo proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} = try: @@ -458,7 +445,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = except CancelledError as exc: raise exc except CatchableError as exc: - trace "Exception occurred in Switch.start", exc = exc.msg + trace "Error in connection handler", exc = exc.msg, conn finally: await conn.close() @@ -470,7 +457,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = s.peerInfo.addrs[i] = t.ma # update peer's address startFuts.add(server) - debug "started libp2p node", peer = $s.peerInfo, addrs = s.peerInfo.addrs + debug "started libp2p node", peerInfo = s.peerInfo result = startFuts # listen for incoming connections proc stop*(s: Switch) {.async.} = @@ -507,7 +494,7 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = # Identify is non-essential, though if it fails, it might indicate that # the connection was closed already - this will be picked up by the read # loop - debug "Could not identify connection", err = exc.msg + debug "Could not identify connection", err = exc.msg, muxer try: let peerId = muxer.connection.peerInfo.peerId @@ -523,7 +510,7 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = debug "Unexpected cancellation in switch muxer cleanup" except CatchableError as exc: debug "Unexpected exception in switch muxer cleanup", - errMsg = exc.msg + errMsg = exc.msg, muxer proc peerStartup() {.async.} = try: @@ -533,10 +520,10 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = except CancelledError: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. - debug "Unexpected cancellation in switch muxer startup" + debug "Unexpected cancellation in switch muxer startup", muxer except CatchableError as exc: debug "Unexpected exception in switch muxer startup", - errMsg = exc.msg + errMsg = exc.msg, muxer # All the errors are handled inside `peerStartup()` procedure. asyncSpawn peerStartup() @@ -550,7 +537,7 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = except CatchableError as exc: await muxer.close() libp2p_failed_upgrade.inc() - trace "exception in muxer handler", exc = exc.msg + trace "exception in muxer handler", exc = exc.msg, muxer proc newSwitch*(peerInfo: PeerInfo, transports: seq[Transport], @@ -573,7 +560,7 @@ proc newSwitch*(peerInfo: PeerInfo, let s = result # can't capture result result.streamHandler = proc(stream: Connection) {.async, gcsafe.} = try: - trace "handling connection for", peerInfo = $stream + trace "handling connection for", stream defer: if not(isNil(stream)): await stream.close() @@ -581,7 +568,7 @@ proc newSwitch*(peerInfo: PeerInfo, except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception in stream handler", exc = exc.msg + trace "exception in stream handler", exc = exc.msg, stream result.mount(identity) for key, val in muxers: diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 84870caeb..924836e4a 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -517,9 +517,9 @@ suite "GossipSub": closureScope: var dialerNode = dialer handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} = - if dialerNode.peerInfo.id notin seen: - seen[dialerNode.peerInfo.id] = 0 - seen[dialerNode.peerInfo.id].inc + if $dialerNode.peerInfo.peerId notin seen: + seen[$dialerNode.peerInfo.peerId] = 0 + seen[$dialerNode.peerInfo.peerId].inc check topic == "foobar" if not seenFut.finished() and seen.len >= runs: seenFut.complete() @@ -529,8 +529,8 @@ suite "GossipSub": await allFuturesThrowing(subs) tryPublish await wait(nodes[0].publish("foobar", - cast[seq[byte]]("from node " & - nodes[1].peerInfo.id)), + toBytes("from node " & + $nodes[1].peerInfo.peerId)), 1.minutes), runs, 5.seconds await wait(seenFut, 2.minutes) @@ -567,7 +567,7 @@ suite "GossipSub": await allFuturesThrowing(nodes.mapIt(it.start())) await subscribeNodes(nodes) - var seen: Table[string, int] + var seen: Table[PeerID, int] var subs: seq[Future[void]] var seenFut = newFuture[void]() for dialer in nodes: @@ -576,9 +576,9 @@ suite "GossipSub": var dialerNode = dialer handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} = - if dialerNode.peerInfo.id notin seen: - seen[dialerNode.peerInfo.id] = 0 - seen[dialerNode.peerInfo.id].inc + if dialerNode.peerInfo.peerId notin seen: + seen[dialerNode.peerInfo.peerId] = 0 + seen[dialerNode.peerInfo.peerId].inc check topic == "foobar" if not seenFut.finished() and seen.len >= runs: seenFut.complete() @@ -588,8 +588,8 @@ suite "GossipSub": await allFuturesThrowing(subs) tryPublish await wait(nodes[0].publish("foobar", - cast[seq[byte]]("from node " & - nodes[1].peerInfo.id)), + toBytes("from node " & + $nodes[1].peerInfo.peerId)), 1.minutes), 2, 5.seconds await wait(seenFut, 5.minutes) diff --git a/tests/testpeer.nim b/tests/testpeer.nim index c9425f0cf..3418e2091 100644 --- a/tests/testpeer.nim +++ b/tests/testpeer.nim @@ -200,10 +200,10 @@ suite "Peer testing suite": p1 == p2 p1 == p4 p2 == p4 - p1.pretty() == PeerIDs[i] - p2.pretty() == PeerIDs[i] - p3.pretty() == PeerIDs[i] - p4.pretty() == PeerIDs[i] + $p1 == PeerIDs[i] + $p2 == PeerIDs[i] + $p3 == PeerIDs[i] + $p4 == PeerIDs[i] p1.match(seckey) == true p1.match(pubkey) == true p1.getBytes() == p2.getBytes()