From e655a510cdba16c5c61fcf333f7c9146fb95f4b3 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Sun, 2 Aug 2020 12:22:49 +0200 Subject: [PATCH] misc cleanups (#303) --- libp2p/multistream.nim | 10 +++++----- libp2p/muxers/mplex/lpchannel.nim | 2 +- libp2p/protocols/identify.nim | 4 ++-- libp2p/protocols/pubsub/gossipsub.nim | 6 +++--- libp2p/protocols/pubsub/pubsub.nim | 5 ++--- libp2p/protocols/pubsub/rpc/protobuf.nim | 2 +- libp2p/stream/chronosstream.nim | 3 ++- libp2p/stream/connection.nim | 5 ++++- libp2p/stream/lpstream.nim | 3 +++ libp2p/switch.nim | 15 ++++++++------- libp2p/transports/tcptransport.nim | 3 ++- 11 files changed, 33 insertions(+), 25 deletions(-) diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index b82c518..535de0c 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -129,16 +129,16 @@ proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.asy if not handshaked and ms != Codec: error "expected handshake message", instead=ms - raise newException(CatchableError, + raise newException(CatchableError, "MultistreamSelect handling failed, invalid first message") - trace "handle: got request for ", ms + trace "handle: got request", ms if ms.len() <= 0: trace "handle: invalid proto" await conn.write(Na) if m.handlers.len() == 0: - trace "handle: sending `na` for protocol ", protocol = ms + trace "handle: sending `na` for protocol", protocol = ms await conn.write(Na) continue @@ -159,11 +159,11 @@ proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.asy else: for h in m.handlers: if (not isNil(h.match) and h.match(ms)) or ms == h.proto: - trace "found handler for", protocol = ms + trace "found handler", protocol = ms await conn.writeLp((h.proto & "\n")) await h.protocol.handler(conn, ms) return - debug "no handlers for ", protocol = ms + debug "no handlers", protocol = ms await conn.write(Na) except CancelledError as exc: await conn.close() diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 74f0b30..4671c22 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -81,7 +81,7 @@ template withEOFExceptions(body: untyped): untyped = proc cleanupTimer(s: LPChannel) {.async.} = ## cleanup timers - if not s.timerTaskFut.finished: + if not isNil(s.timerTaskFut) and not s.timerTaskFut.finished: await s.timerTaskFut.cancelAndWait() proc closeMessage(s: LPChannel) {.async.} = diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index 4de5d49..611469a 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -106,10 +106,10 @@ method init*(p: Identify) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = try: defer: - trace "exiting identify handler", oid = conn.oid + trace "exiting identify handler", oid = $conn.oid await conn.close() - trace "handling identify request", oid = conn.oid + trace "handling identify request", oid = $conn.oid var pb = encodeMsg(p.peerInfo, conn.observedAddr) await conn.writeLp(pb.buffer) except CancelledError as exc: diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 94c1302..969adfd 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -228,9 +228,9 @@ proc heartbeat(g: GossipSub) {.async.} = let peers = g.getGossipPeers() var sent: seq[Future[void]] - for peer in peers.keys: - if peer in g.peers: - sent &= g.peers[peer].send(RPCMsg(control: some(peers[peer]))) + for peer, control in peers: + g.peers.withValue(peer, pubsubPeer) do: + sent &= pubsubPeer[].send(RPCMsg(control: some(control))) checkFutures(await allFinished(sent)) g.mcache.shift() # shift the cache diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 251a091..8f1822f 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -184,9 +184,8 @@ method handleConn*(p: PubSub, let peer = p.getOrCreatePeer(conn.peerInfo, proto) - let topics = toSeq(p.topics.keys) - if topics.len > 0: - await p.sendSubs(peer, topics, true) + if p.topics.len > 0: + await p.sendSubs(peer, toSeq(p.topics.keys), true) try: peer.handler = handler diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index 4958320..327b815 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -195,7 +195,7 @@ proc decodeMessage*(pb: ProtoBuffer): ProtoResult[Message] {.inline.} = else: trace "decodeMessage: data is missing" if ? pb.getField(3, msg.seqno): - trace "decodeMessage: read seqno", seqno = msg.data.shortLog() + trace "decodeMessage: read seqno", seqno = msg.seqno else: trace "decodeMessage: seqno is missing" if ? pb.getRepeatedField(4, msg.topicIDs): diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index 1d200d5..33c17b3 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -7,6 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +import oids import chronos, chronicles import connection @@ -73,7 +74,7 @@ method close*(s: ChronosStream) {.async.} = try: if not s.isClosed: trace "shutting down chronos stream", address = $s.client.remoteAddress(), - oid = s.oid + oid = $s.oid if not s.client.closed(): await s.client.closeWait() diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim index 60d5be8..4c060c3 100644 --- a/libp2p/stream/connection.nim +++ b/libp2p/stream/connection.nim @@ -8,13 +8,16 @@ ## those terms. import hashes -import chronos, metrics +import chronicles, chronos, metrics import lpstream, ../multiaddress, ../peerinfo export lpstream +logScope: + topics = "connection" + const ConnectionTrackerName* = "libp2p.connection" diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index cea0933..1c42691 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -16,6 +16,9 @@ import ../varint, declareGauge(libp2p_open_streams, "open stream instances", labels = ["type"]) +logScope: + topics = "lpstream" + type LPStream* = ref object of RootObj closeEvent*: AsyncEvent diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 1accfe5..4cb3057 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -167,12 +167,11 @@ proc mux(s: Switch, conn: Connection) {.async, gcsafe.} = ## mux incoming connection trace "muxing connection", peer = $conn - let muxers = toSeq(s.muxers.keys) - if muxers.len == 0: + if s.muxers.len == 0: warn "no muxers registered, skipping upgrade flow" return - let muxerName = await s.ms.select(conn, muxers) + let muxerName = await s.ms.select(conn, toSeq(s.muxers.keys())) if muxerName.len == 0 or muxerName == "na": debug "no muxer available, early exit", peer = $conn return @@ -387,7 +386,9 @@ proc dial*(s: Switch, await conn.close() raise newException(CatchableError, "Couldn't get muxed stream") - trace "Attempting to select remote", proto = proto, oid = conn.oid + trace "Attempting to select remote", proto = proto, + streamOid = $stream.oid, + oid = $conn.oid if not await s.ms.select(stream, proto): await stream.close() raise newException(CatchableError, "Unable to select sub-protocol " & proto) @@ -498,7 +499,7 @@ proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = if not(isNil(stream)): await stream.close() -proc pubsubMonitor(switch: Switch, peer: PeerInfo) {.async.} = +proc pubsubMonitor(s: Switch, peer: PeerInfo) {.async.} = ## while peer connected maintain a ## pubsub connection as well ## @@ -506,11 +507,11 @@ proc pubsubMonitor(switch: Switch, peer: PeerInfo) {.async.} = var tries = 0 var backoffFactor = 5 # up to ~10 mins var backoff = 1.seconds - while switch.isConnected(peer) and + while s.isConnected(peer) and tries < MaxPubsubReconnectAttempts: try: debug "subscribing to pubsub peer", peer = $peer - await switch.subscribePeerInternal(peer) + await s.subscribePeerInternal(peer) except CancelledError as exc: raise exc except CatchableError as exc: diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index 032b3e5..63283ab 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -7,6 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +import oids import chronos, chronicles, sequtils import transport, ../errors, @@ -71,7 +72,7 @@ proc connHandler*(t: TcpTransport, proc cleanup() {.async.} = try: await client.join() - trace "cleaning up client", addrs = $client.remoteAddress, connoid = conn.oid + trace "cleaning up client", addrs = $client.remoteAddress, connoid = $conn.oid if not(isNil(conn)): await conn.close() t.clients.keepItIf(it != client)