small cleanups from tcp-limits2 (#446)
parent 1d16d22f5f
commit 034a1e8b1b
@@ -216,6 +216,6 @@ proc init*(
  when chronicles.enabledLogLevel == LogLevel.TRACE:
    chann.name = if chann.name.len > 0: chann.name else: $chann.oid

-  trace "Created new lpchannel", chann, id, initiator
+  trace "Created new lpchannel", s = chann, id, initiator

  return chann
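
Note on the one-line change above: with chronicles, a bare identifier is logged under its own name, while `key = value` picks the field name explicitly. A minimal sketch of that difference, assuming the chronicles package is available and using a plain string in place of the channel object:

  import chronicles

  let chann = "lpchannel-example"               # hypothetical stand-in for the channel
  trace "Created new lpchannel", chann          # field logged as chann=...
  trace "Created new lpchannel", s = chann      # field logged as s=...
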
@@ -117,7 +117,7 @@ method init*(f: FloodSub) =
      await f.handleConn(conn, proto)
    except CancelledError:
      # This is top-level procedure which will work as separate task, so it
-     # do not need to propogate CancelledError.
+     # do not need to propagate CancelledError.
      trace "Unexpected cancellation in floodsub handler", conn
    except CatchableError as exc:
      trace "FloodSub handler leaks an error", exc = exc.msg, conn
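
The comment spells out the convention for detached tasks: a handler spawned as its own task must not let exceptions escape, so cancellation is swallowed and other errors are merely logged. A rough sketch of that shape, assuming chronos; the proc name and the sleep are illustrative only:

  import chronos

  proc topLevelHandler() {.async.} =
    try:
      await sleepAsync(10.milliseconds)    # stands in for `await f.handleConn(conn, proto)`
    except CancelledError:
      discard                              # nothing above us to propagate cancellation to
    except CatchableError as exc:
      echo "handler leaked an error: ", exc.msg

  asyncSpawn topLevelHandler()             # detached task: exceptions must be handled inside
  waitFor sleepAsync(20.milliseconds)
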
@@ -32,7 +32,7 @@ const
  GossipSubCodec_10* = "/meshsub/1.0.0"

# overlay parameters
const
  GossipSubD* = 6
  GossipSubDlo* = 4
  GossipSubDhi* = 12
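
For context, GossipSubD is the target mesh degree and GossipSubDlo/GossipSubDhi are the low and high watermarks around it: the heartbeat grafts peers when the mesh falls below Dlo and prunes when it grows above Dhi, in both cases aiming back at D. A standalone arithmetic sketch of that rule in plain Nim (names and helper are illustrative):

  const
    GossipSubD = 6
    GossipSubDlo = 4
    GossipSubDhi = 12

  proc meshAdjustment(meshLen: int): int =
    ## >0 means graft this many peers, <0 means prune this many, 0 means leave alone.
    if meshLen < GossipSubDlo:
      GossipSubD - meshLen      # grow back up to D
    elif meshLen > GossipSubDhi:
      GossipSubD - meshLen      # negative: shrink back down to D
    else:
      0

  assert meshAdjustment(2) == 4     # graft 4 peers
  assert meshAdjustment(13) == -7   # prune 7 peers
  assert meshAdjustment(8) == 0     # within [Dlo, Dhi], untouched
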
@@ -447,7 +447,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
      meshPeers.keepIf do (x: PubSubPeer) -> bool: x.outbound
      if meshPeers.len < g.parameters.dOut:
        trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic)

        grafts = toSeq(
          g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
          g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
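
The visible lines show the selection pattern: start from peers known to be subscribed (g.gossipsub), subtract those already in the mesh, and separately filter existing mesh peers with keepIf to look only at outbound ones. A self-contained sketch of the same set arithmetic on plain strings (illustrative data, not the real PubSubPeer type):

  import std/[sets, sequtils]

  let
    gossipsub = toHashSet(["p1", "p2", "p3", "p4"])   # peers known to subscribe to the topic
    mesh      = toHashSet(["p1", "p2"])               # peers already grafted into the mesh

  # graft candidates: subscribed peers that are not yet in the mesh
  let grafts = toSeq(gossipsub - mesh)
  assert grafts.len == 2                              # p3 and p4, in hash order

  # keepIf filters a seq in place, as meshPeers.keepIf does with x.outbound above
  var meshPeers = toSeq(mesh)
  meshPeers.keepIf do (x: string) -> bool: x == "p1"  # "p1" stands in for the outbound check
  assert meshPeers == @["p1"]
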
@@ -508,7 +508,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
          inbound &= peer

    # ensure that there are at least D_out peers first and rebalance to g.d after that
    let maxOutboundPrunes =
      block:
        var count = 0
        for peer in mesh:
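
maxOutboundPrunes is computed with a `block:` expression, the Nim idiom for assigning the result of a small multi-statement computation to a `let`. A tiny standalone sketch of the counting pattern (illustrative data):

  let mesh = @[("p1", true), ("p2", false), ("p3", true)]   # (peerId, outbound)

  let outboundCount =
    block:
      var count = 0
      for p in mesh:
        if p[1]:        # p[1] is the outbound flag
          inc count
      count             # the block's last expression becomes outboundCount

  assert outboundCount == 2
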
@@ -815,7 +815,7 @@ proc heartbeat(g: GossipSub) {.async.} =
        g.broadcast(prunes, prune)

      await g.rebalanceMesh(t)

    libp2p_gossipsub_peers_mesh_sum.set(totalMeshPeers.int64)
    libp2p_gossipsub_peers_gossipsub_sum.set(totalGossipPeers.int64)

@@ -836,7 +836,8 @@ proc heartbeat(g: GossipSub) {.async.} =
    except CancelledError as exc:
      raise exc
    except CatchableError as exc:
-     warn "exception ocurred in gossipsub heartbeat", exc = exc.msg, trace = exc.getStackTrace()
+     warn "exception ocurred in gossipsub heartbeat", exc = exc.msg,
+       trace = exc.getStackTrace()

    for trigger in g.heartbeatEvents:
      trace "firing heartbeat event", instance = cast[int](g)
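
In contrast to the detached handlers, the heartbeat loop is cancelled deliberately when the gossipsub instance shuts down, so CancelledError is re-raised to break out of the loop while any other error is only warned about. A rough sketch of the loop shape, assuming chronos; the interval, logging, and shutdown call are illustrative:

  import chronos

  proc heartbeat() {.async.} =
    while true:
      try:
        # ... rebalance meshes, emit gossip, update metrics ...
        await sleepAsync(1.seconds)
      except CancelledError as exc:
        raise exc                          # shutdown: let cancellation end the loop
      except CatchableError as exc:
        echo "exception occurred in heartbeat: ", exc.msg   # log and keep the loop alive

  let hb = heartbeat()
  waitFor hb.cancelAndWait()               # stopping the instance cancels the loop
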
@@ -961,7 +962,7 @@ proc handleGraft(g: GossipSub,
          backoff: g.parameters.pruneBackoff.seconds.uint64))

        g.punishPeer(peer, @[topic])

        continue

      if peer.peerId in g.backingOff and g.backingOff[peer.peerId] > Moment.now():
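
The GRAFT handling around this hunk rejects peers that are still inside their prune backoff window: such a peer gets another PRUNE with a fresh backoff, is penalised, and its GRAFT is skipped. A small standalone sketch of that kind of backoff check using std/times; the table and helper are hypothetical, not the library's API:

  import std/[tables, times]

  var backingOff: Table[string, Time]      # peerId -> moment the backoff expires

  proc stillBackedOff(peerId: string): bool =
    peerId in backingOff and backingOff[peerId] > getTime()

  backingOff["p1"] = getTime() + initDuration(seconds = 60)
  assert stillBackedOff("p1")              # a GRAFT from p1 would be refused
  assert not stillBackedOff("p2")          # unknown peer: no backoff recorded
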
@@ -973,9 +974,9 @@ proc handleGraft(g: GossipSub,
          topicID: graft.topicID,
          peers: @[], # omitting heavy computation here as the remote did something illegal
          backoff: g.parameters.pruneBackoff.seconds.uint64))

        g.punishPeer(peer, @[topic])

        continue

      if peer notin g.peerStats:
@@ -1055,8 +1056,8 @@ proc handleIHave(g: GossipSub,
        dec peer.iHaveBudget
      else:
        return

  # shuffling result.messageIDs before sending it out to increase the likelihood
  # of getting an answer if the peer truncates the list due to internal size restrictions.
  shuffle(result.messageIDs)

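
Two throttles are visible here: each peer has an IHAVE budget that is decremented per accepted message id, and the ids that are answered are shuffled so a peer that truncates the reply still sees a random sample. A standalone sketch of both using std/random (illustrative numbers):

  import std/random

  randomize()

  var iHaveBudget = 3                      # per-peer allowance
  var wanted: seq[string]

  for id in ["m1", "m2", "m3", "m4", "m5"]:
    if iHaveBudget > 0:
      wanted.add id
      dec iHaveBudget
    else:
      break                                # budget exhausted, ignore the rest

  shuffle(wanted)                          # randomise before replying, like shuffle(result.messageIDs)
  assert wanted.len == 3
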
@@ -129,7 +129,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
      await conn.close()
  except CancelledError:
    # This is top-level procedure which will work as separate task, so it
-   # do not need to propogate CancelledError.
+   # do not need to propagate CancelledError.
    trace "Unexpected cancellation in PubSubPeer.handle"
  except CatchableError as exc:
    trace "Exception occurred in PubSubPeer.handle",
@@ -193,6 +193,13 @@ proc sendImpl(conn: Connection, encoded: seq[byte]) {.async.} =

    await conn.close() # This will clean up the send connection

+template sendMetrics(msg: RPCMsg): untyped =
+  when defined(libp2p_expensive_metrics):
+    for x in msg.messages:
+      for t in x.topicIDs:
+        # metrics
+        libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])
+
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) =
  doAssert(not isNil(p), "pubsubpeer nil!")

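
sendMetrics is introduced as a template rather than a proc: its body is expanded textually at the call sites inside send*, so it can refer to `p` from that scope without taking it as a parameter, and under `when defined(...)` it disappears entirely in normal builds. A minimal sketch of that property with a plain counter standing in for the metrics library (all names here are illustrative):

  var sentCount = 0                             # stands in for libp2p_pubsub_sent_messages

  template sendMetricsSketch(msgs: untyped): untyped =
    when defined(example_expensive_metrics):    # compiled out unless -d:example_expensive_metrics
      for m in msgs:
        inc sentCount
        echo "counted a message for ", peerId   # `peerId` is not a parameter: it resolves
                                                # where the template is expanded, below

  proc send(peerId: string, msgs: seq[string]) =
    sendMetricsSketch(msgs)                     # expands here, inside send's scope
    discard                                     # ... encode and transmit ...

  send("p1", @["m1", "m2"])
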
@@ -209,13 +216,15 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) =
  # some forms of valid but redundantly encoded protobufs with unknown or
  # duplicated fields
  let encoded = if p.hasObservers():
+   var mm = msg
    # trigger send hooks
-   var mm = msg # hooks can modify the message
    p.sendObservers(mm)
+   sendMetrics(mm)
    encodeRpcMsg(mm, anonymize)
  else:
    # If there are no send hooks, we redundantly re-encode the message to
    # protobuf for every peer - this could easily be improved!
+   sendMetrics(msg)
    encodeRpcMsg(msg, anonymize)

  if encoded.len <= 0:
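
One detail worth noting in the observers branch: `var mm = msg` takes a copy before the hooks run (assuming RPCMsg is a plain object type, as the copy-then-mutate pattern suggests), so observers can rewrite the outgoing message for this peer without touching the caller's original. A tiny sketch of that copy semantics with a stand-in type:

  type Msg = object                  # hypothetical stand-in for RPCMsg
    topics: seq[string]

  proc sendObserversSketch(m: var Msg) =
    m.topics.add "injected"          # hooks may modify the message copy

  let msg = Msg(topics: @["a"])
  var mm = msg                       # value assignment: full copy, like `var mm = msg` above
  sendObserversSketch(mm)

  assert mm.topics  == @["a", "injected"]
  assert msg.topics == @["a"]        # the caller's message is untouched
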
@@ -226,12 +235,6 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) =
  # connection to the spawned send task
  asyncSpawn sendImpl(conn, encoded)

- when defined(libp2p_expensive_metrics):
-   for x in msg.messages:
-     for t in x.topicIDs:
-       # metrics
-       libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])
-
proc newPubSubPeer*(peerId: PeerID,
                    getConn: GetConn,
                    onEvent: OnEvent,