From 034a1e8b1b3657856390d39a64bad0bd867f31b4 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 23 Nov 2020 15:02:23 -0600 Subject: [PATCH] small cleanups from tcp-limits2 (#446) --- libp2p/muxers/mplex/lpchannel.nim | 2 +- libp2p/protocols/pubsub/floodsub.nim | 2 +- libp2p/protocols/pubsub/gossipsub.nim | 21 +++++++++++---------- libp2p/protocols/pubsub/pubsubpeer.nim | 19 +++++++++++-------- 4 files changed, 24 insertions(+), 20 deletions(-) diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index f577f6356..bb7eb1a45 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -216,6 +216,6 @@ proc init*( when chronicles.enabledLogLevel == LogLevel.TRACE: chann.name = if chann.name.len > 0: chann.name else: $chann.oid - trace "Created new lpchannel", chann, id, initiator + trace "Created new lpchannel", s = chann, id, initiator return chann diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 295997e93..fab4b0f25 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -117,7 +117,7 @@ method init*(f: FloodSub) = await f.handleConn(conn, proto) except CancelledError: # This is top-level procedure which will work as separate task, so it - # do not need to propogate CancelledError. + # do not need to propagate CancelledError. trace "Unexpected cancellation in floodsub handler", conn except CatchableError as exc: trace "FloodSub handler leaks an error", exc = exc.msg, conn diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 7716e899d..7e50a092a 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -32,7 +32,7 @@ const GossipSubCodec_10* = "/meshsub/1.0.0" # overlay parameters -const +const GossipSubD* = 6 GossipSubDlo* = 4 GossipSubDhi* = 12 @@ -447,7 +447,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = meshPeers.keepIf do (x: PubSubPeer) -> bool: x.outbound if meshPeers.len < g.parameters.dOut: trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic) - + grafts = toSeq( g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) - g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()) @@ -508,7 +508,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = inbound &= peer # ensure that there are at least D_out peers first and rebalance to g.d after that - let maxOutboundPrunes = + let maxOutboundPrunes = block: var count = 0 for peer in mesh: @@ -815,7 +815,7 @@ proc heartbeat(g: GossipSub) {.async.} = g.broadcast(prunes, prune) await g.rebalanceMesh(t) - + libp2p_gossipsub_peers_mesh_sum.set(totalMeshPeers.int64) libp2p_gossipsub_peers_gossipsub_sum.set(totalGossipPeers.int64) @@ -836,7 +836,8 @@ proc heartbeat(g: GossipSub) {.async.} = except CancelledError as exc: raise exc except CatchableError as exc: - warn "exception ocurred in gossipsub heartbeat", exc = exc.msg, trace = exc.getStackTrace() + warn "exception ocurred in gossipsub heartbeat", exc = exc.msg, + trace = exc.getStackTrace() for trigger in g.heartbeatEvents: trace "firing heartbeat event", instance = cast[int](g) @@ -961,7 +962,7 @@ proc handleGraft(g: GossipSub, backoff: g.parameters.pruneBackoff.seconds.uint64)) g.punishPeer(peer, @[topic]) - + continue if peer.peerId in g.backingOff and g.backingOff[peer.peerId] > Moment.now(): @@ -973,9 +974,9 @@ proc handleGraft(g: GossipSub, topicID: graft.topicID, peers: @[], # omitting heavy computation here as the remote did something illegal backoff: g.parameters.pruneBackoff.seconds.uint64)) - + g.punishPeer(peer, @[topic]) - + continue if peer notin g.peerStats: @@ -1055,8 +1056,8 @@ proc handleIHave(g: GossipSub, dec peer.iHaveBudget else: return - - # shuffling result.messageIDs before sending it out to increase the likelihood + + # shuffling result.messageIDs before sending it out to increase the likelihood # of getting an answer if the peer truncates the list due to internal size restrictions. shuffle(result.messageIDs) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 9f5d70903..75c990041 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -129,7 +129,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = await conn.close() except CancelledError: # This is top-level procedure which will work as separate task, so it - # do not need to propogate CancelledError. + # do not need to propagate CancelledError. trace "Unexpected cancellation in PubSubPeer.handle" except CatchableError as exc: trace "Exception occurred in PubSubPeer.handle", @@ -193,6 +193,13 @@ proc sendImpl(conn: Connection, encoded: seq[byte]) {.async.} = await conn.close() # This will clean up the send connection +template sendMetrics(msg: RPCMsg): untyped = + when defined(libp2p_expensive_metrics): + for x in msg.messages: + for t in x.topicIDs: + # metrics + libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t]) + proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) = doAssert(not isNil(p), "pubsubpeer nil!") @@ -209,13 +216,15 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) = # some forms of valid but redundantly encoded protobufs with unknown or # duplicated fields let encoded = if p.hasObservers(): + var mm = msg # trigger send hooks - var mm = msg # hooks can modify the message p.sendObservers(mm) + sendMetrics(mm) encodeRpcMsg(mm, anonymize) else: # If there are no send hooks, we redundantly re-encode the message to # protobuf for every peer - this could easily be improved! + sendMetrics(msg) encodeRpcMsg(msg, anonymize) if encoded.len <= 0: @@ -226,12 +235,6 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) = # connection to the spawned send task asyncSpawn sendImpl(conn, encoded) - when defined(libp2p_expensive_metrics): - for x in msg.messages: - for t in x.topicIDs: - # metrics - libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t]) - proc newPubSubPeer*(peerId: PeerID, getConn: GetConn, onEvent: OnEvent,