mirror of https://github.com/vacp2p/nim-libp2p.git
make sure keys exist and more metrics (#215)
This commit is contained in:
parent
55a294a5c9
commit
ac04ca6e31
|
@ -78,16 +78,17 @@ proc replenishFanout(g: GossipSub, topic: string) {.async.} =
|
||||||
if topic notin g.fanout:
|
if topic notin g.fanout:
|
||||||
g.fanout[topic] = initHashSet[string]()
|
g.fanout[topic] = initHashSet[string]()
|
||||||
|
|
||||||
if g.fanout[topic].len < GossipSubDLo:
|
if g.fanout.getOrDefault(topic).len < GossipSubDLo:
|
||||||
trace "replenishing fanout", peers = g.fanout[topic].len
|
trace "replenishing fanout", peers = g.fanout.getOrDefault(topic).len
|
||||||
if topic in g.gossipsub:
|
if topic in g.gossipsub:
|
||||||
for p in g.gossipsub[topic]:
|
for p in g.gossipsub.getOrDefault(topic):
|
||||||
if not g.fanout[topic].containsOrIncl(p):
|
if not g.fanout[topic].containsOrIncl(p):
|
||||||
libp2p_gossipsub_peers_per_topic_fanout.set(g.fanout[topic].len.int64, labelValues = [topic])
|
if g.fanout.getOrDefault(topic).len == GossipSubD:
|
||||||
if g.fanout[topic].len == GossipSubD:
|
|
||||||
break
|
break
|
||||||
|
|
||||||
trace "fanout replenished with peers", peers = g.fanout[topic].len
|
libp2p_gossipsub_peers_per_topic_fanout
|
||||||
|
.set(g.fanout.getOrDefault(topic).len.int64, labelValues = [topic])
|
||||||
|
trace "fanout replenished with peers", peers = g.fanout.getOrDefault(topic).len
|
||||||
|
|
||||||
proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||||
try:
|
try:
|
||||||
|
@ -96,47 +97,68 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||||
if topic notin g.mesh:
|
if topic notin g.mesh:
|
||||||
g.mesh[topic] = initHashSet[string]()
|
g.mesh[topic] = initHashSet[string]()
|
||||||
|
|
||||||
if g.mesh[topic].len < GossipSubDlo:
|
if g.mesh.getOrDefault(topic).len < GossipSubDlo:
|
||||||
trace "replenishing mesh"
|
trace "replenishing mesh", topic
|
||||||
# replenish the mesh if we're below GossipSubDlo
|
# replenish the mesh if we're below GossipSubDlo
|
||||||
while g.mesh[topic].len < GossipSubD:
|
while g.mesh.getOrDefault(topic).len < GossipSubD:
|
||||||
trace "gathering peers", peers = g.mesh[topic].len
|
trace "gathering peers", peers = g.mesh.getOrDefault(topic).len
|
||||||
|
await sleepAsync(1.millis) # don't starve the event loop
|
||||||
var id: string
|
var id: string
|
||||||
if topic in g.fanout and g.fanout[topic].len > 0:
|
if topic in g.fanout and g.fanout.getOrDefault(topic).len > 0:
|
||||||
id = sample(toSeq(g.fanout[topic]))
|
trace "getting peer from fanout", topic,
|
||||||
|
peers = g.fanout.getOrDefault(topic).len
|
||||||
|
|
||||||
|
id = sample(toSeq(g.fanout.getOrDefault(topic)))
|
||||||
g.fanout[topic].excl(id)
|
g.fanout[topic].excl(id)
|
||||||
libp2p_gossipsub_peers_per_topic_fanout.set(g.fanout[topic].len.int64, labelValues = [topic])
|
|
||||||
|
if id in g.fanout[topic]:
|
||||||
|
continue # we already have this peer in the mesh, try again
|
||||||
|
|
||||||
trace "got fanout peer", peer = id
|
trace "got fanout peer", peer = id
|
||||||
elif topic in g.gossipsub and g.gossipsub[topic].len > 0:
|
elif topic in g.gossipsub and g.gossipsub.getOrDefault(topic).len > 0:
|
||||||
|
trace "getting peer from gossipsub", topic,
|
||||||
|
peers = g.gossipsub.getOrDefault(topic).len
|
||||||
|
|
||||||
id = sample(toSeq(g.gossipsub[topic]))
|
id = sample(toSeq(g.gossipsub[topic]))
|
||||||
g.gossipsub[topic].excl(id)
|
g.gossipsub[topic].excl(id)
|
||||||
libp2p_gossipsub_peers_per_topic_gossipsub.set(g.gossipsub[topic].len.int64, labelValues = [topic])
|
|
||||||
|
if id in g.mesh[topic]:
|
||||||
|
continue # we already have this peer in the mesh, try again
|
||||||
|
|
||||||
trace "got gossipsub peer", peer = id
|
trace "got gossipsub peer", peer = id
|
||||||
else:
|
else:
|
||||||
trace "no more peers"
|
trace "no more peers"
|
||||||
break
|
break
|
||||||
|
|
||||||
g.mesh[topic].incl(id)
|
g.mesh[topic].incl(id)
|
||||||
libp2p_gossipsub_peers_per_topic_mesh.set(g.mesh[topic].len.int64, labelValues = [topic])
|
|
||||||
if id in g.peers:
|
if id in g.peers:
|
||||||
let p = g.peers[id]
|
let p = g.peers[id]
|
||||||
# send a graft message to the peer
|
# send a graft message to the peer
|
||||||
await p.sendGraft(@[topic])
|
await p.sendGraft(@[topic])
|
||||||
|
|
||||||
# prune peers if we've gone over
|
# prune peers if we've gone over
|
||||||
if g.mesh[topic].len > GossipSubDhi:
|
if g.mesh.getOrDefault(topic).len > GossipSubDhi:
|
||||||
trace "pruning mesh"
|
trace "about to prune mesh", mesh = g.mesh.getOrDefault(topic).len
|
||||||
while g.mesh[topic].len > GossipSubD:
|
while g.mesh.getOrDefault(topic).len > GossipSubD:
|
||||||
trace "pruning peers", peers = g.mesh[topic].len
|
trace "pruning peers", peers = g.mesh[topic].len
|
||||||
let id = toSeq(g.mesh[topic])[rand(0..<g.mesh[topic].len)]
|
let id = toSeq(g.mesh[topic])[rand(0..<g.mesh[topic].len)]
|
||||||
g.mesh[topic].excl(id)
|
g.mesh[topic].excl(id)
|
||||||
libp2p_gossipsub_peers_per_topic_mesh.set(g.mesh[topic].len.int64, labelValues = [topic])
|
|
||||||
|
|
||||||
let p = g.peers[id]
|
let p = g.peers[id]
|
||||||
# send a graft message to the peer
|
# send a graft message to the peer
|
||||||
await p.sendPrune(@[topic])
|
await p.sendPrune(@[topic])
|
||||||
|
|
||||||
trace "mesh balanced, got peers", peers = g.mesh[topic].len.int64, topicId = topic
|
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||||
|
.set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic])
|
||||||
|
|
||||||
|
libp2p_gossipsub_peers_per_topic_fanout
|
||||||
|
.set(g.fanout.getOrDefault(topic).len.int64, labelValues = [topic])
|
||||||
|
|
||||||
|
libp2p_gossipsub_peers_per_topic_mesh
|
||||||
|
.set(g.mesh.getOrDefault(topic).len.int64, labelValues = [topic])
|
||||||
|
|
||||||
|
trace "mesh balanced, got peers", peers = g.mesh.getOrDefault(topic).len,
|
||||||
|
topicId = topic
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "exception occurred re-balancing mesh", exc = exc.msg
|
trace "exception occurred re-balancing mesh", exc = exc.msg
|
||||||
|
|
||||||
|
@ -148,24 +170,19 @@ proc dropFanoutPeers(g: GossipSub) {.async.} =
|
||||||
if Moment.now > val:
|
if Moment.now > val:
|
||||||
dropping.add(topic)
|
dropping.add(topic)
|
||||||
g.fanout.del(topic)
|
g.fanout.del(topic)
|
||||||
|
|
||||||
for topic in dropping:
|
for topic in dropping:
|
||||||
g.lastFanoutPubSub.del(topic)
|
g.lastFanoutPubSub.del(topic)
|
||||||
|
|
||||||
|
libp2p_gossipsub_peers_per_topic_fanout
|
||||||
|
.set(g.fanout.getOrDefault(topic).len.int64, labelValues = [topic])
|
||||||
|
|
||||||
proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
|
proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
|
||||||
## gossip iHave messages to peers
|
## gossip iHave messages to peers
|
||||||
let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys))
|
let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys))
|
||||||
for topic in topics:
|
for topic in topics:
|
||||||
let mesh: HashSet[string] =
|
let mesh: HashSet[string] = g.mesh.getOrDefault(topic)
|
||||||
if topic in g.mesh:
|
let fanout: HashSet[string] = g.fanout.getOrDefault(topic)
|
||||||
g.mesh[topic]
|
|
||||||
else:
|
|
||||||
initHashSet[string]()
|
|
||||||
|
|
||||||
let fanout: HashSet[string] =
|
|
||||||
if topic in g.fanout:
|
|
||||||
g.fanout[topic]
|
|
||||||
else:
|
|
||||||
initHashSet[string]()
|
|
||||||
|
|
||||||
let gossipPeers = mesh + fanout
|
let gossipPeers = mesh + fanout
|
||||||
let mids = g.mcache.window(topic)
|
let mids = g.mcache.window(topic)
|
||||||
|
@ -178,25 +195,27 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
|
||||||
continue
|
continue
|
||||||
|
|
||||||
while result.len < GossipSubD:
|
while result.len < GossipSubD:
|
||||||
if not (g.gossipsub[topic].len > 0):
|
if g.gossipsub.getOrDefault(topic).len == 0:
|
||||||
trace "no peers for topic, skipping", topicID = topic
|
trace "no peers for topic, skipping", topicID = topic
|
||||||
break
|
break
|
||||||
|
|
||||||
let id = toSeq(g.gossipsub[topic]).sample()
|
let id = toSeq(g.gossipsub.getOrDefault(topic)).sample()
|
||||||
g.gossipsub[topic].excl(id)
|
g.gossipsub[topic].excl(id)
|
||||||
libp2p_gossipsub_peers_per_topic_gossipsub.set(g.gossipsub[topic].len.int64, labelValues = [topic])
|
|
||||||
if id notin gossipPeers:
|
if id notin gossipPeers:
|
||||||
if id notin result:
|
if id notin result:
|
||||||
result[id] = ControlMessage()
|
result[id] = ControlMessage()
|
||||||
result[id].ihave.add(ihave)
|
result[id].ihave.add(ihave)
|
||||||
|
|
||||||
|
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||||
|
.set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic])
|
||||||
|
|
||||||
proc heartbeat(g: GossipSub) {.async.} =
|
proc heartbeat(g: GossipSub) {.async.} =
|
||||||
while true:
|
while true:
|
||||||
try:
|
try:
|
||||||
await g.heartbeatLock.acquire()
|
await g.heartbeatLock.acquire()
|
||||||
trace "running heartbeat"
|
trace "running heartbeat"
|
||||||
|
|
||||||
for t in g.mesh.keys:
|
for t in g.topics.keys:
|
||||||
await g.rebalanceMesh(t)
|
await g.rebalanceMesh(t)
|
||||||
|
|
||||||
await g.dropFanoutPeers()
|
await g.dropFanoutPeers()
|
||||||
|
@ -222,15 +241,21 @@ method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async.} =
|
||||||
|
|
||||||
for t in g.gossipsub.keys:
|
for t in g.gossipsub.keys:
|
||||||
g.gossipsub[t].excl(peer.id)
|
g.gossipsub[t].excl(peer.id)
|
||||||
libp2p_gossipsub_peers_per_topic_gossipsub.set(g.gossipsub[t].len.int64, labelValues = [t])
|
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||||
|
.set(g.gossipsub[t].len.int64, labelValues = [t])
|
||||||
|
|
||||||
|
# mostly for metrics
|
||||||
|
await procCall PubSub(g).subscribeTopic(t, false, peer.id)
|
||||||
|
|
||||||
for t in g.mesh.keys:
|
for t in g.mesh.keys:
|
||||||
g.mesh[t].excl(peer.id)
|
g.mesh[t].excl(peer.id)
|
||||||
libp2p_gossipsub_peers_per_topic_mesh.set(g.mesh[t].len.int64, labelValues = [t])
|
libp2p_gossipsub_peers_per_topic_mesh
|
||||||
|
.set(g.mesh[t].len.int64, labelValues = [t])
|
||||||
|
|
||||||
for t in g.fanout.keys:
|
for t in g.fanout.keys:
|
||||||
g.fanout[t].excl(peer.id)
|
g.fanout[t].excl(peer.id)
|
||||||
libp2p_gossipsub_peers_per_topic_fanout.set(g.fanout[t].len.int64, labelValues = [t])
|
libp2p_gossipsub_peers_per_topic_fanout
|
||||||
|
.set(g.fanout[t].len.int64, labelValues = [t])
|
||||||
|
|
||||||
method subscribeToPeer*(p: GossipSub,
|
method subscribeToPeer*(p: GossipSub,
|
||||||
conn: Connection) {.async.} =
|
conn: Connection) {.async.} =
|
||||||
|
@ -250,12 +275,13 @@ method subscribeTopic*(g: GossipSub,
|
||||||
trace "adding subscription for topic", peer = peerId, name = topic
|
trace "adding subscription for topic", peer = peerId, name = topic
|
||||||
# subscribe remote peer to the topic
|
# subscribe remote peer to the topic
|
||||||
g.gossipsub[topic].incl(peerId)
|
g.gossipsub[topic].incl(peerId)
|
||||||
libp2p_gossipsub_peers_per_topic_gossipsub.set(g.gossipsub[topic].len.int64, labelValues = [topic])
|
|
||||||
else:
|
else:
|
||||||
trace "removing subscription for topic", peer = peerId, name = topic
|
trace "removing subscription for topic", peer = peerId, name = topic
|
||||||
# unsubscribe remote peer from the topic
|
# unsubscribe remote peer from the topic
|
||||||
g.gossipsub[topic].excl(peerId)
|
g.gossipsub[topic].excl(peerId)
|
||||||
libp2p_gossipsub_peers_per_topic_gossipsub.set(g.gossipsub[topic].len.int64, labelValues = [topic])
|
|
||||||
|
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||||
|
.set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic])
|
||||||
|
|
||||||
if topic in g.topics:
|
if topic in g.topics:
|
||||||
await g.rebalanceMesh(topic)
|
await g.rebalanceMesh(topic)
|
||||||
|
@ -271,13 +297,17 @@ proc handleGraft(g: GossipSub,
|
||||||
if graft.topicID in g.topics:
|
if graft.topicID in g.topics:
|
||||||
if g.mesh.len < GossipSubD:
|
if g.mesh.len < GossipSubD:
|
||||||
g.mesh[graft.topicID].incl(peer.id)
|
g.mesh[graft.topicID].incl(peer.id)
|
||||||
libp2p_gossipsub_peers_per_topic_mesh.set(g.mesh[graft.topicID].len.int64, labelValues = [graft.topicID])
|
|
||||||
else:
|
else:
|
||||||
g.gossipsub[graft.topicID].incl(peer.id)
|
g.gossipsub[graft.topicID].incl(peer.id)
|
||||||
libp2p_gossipsub_peers_per_topic_gossipsub.set(g.gossipsub[graft.topicID].len.int64, labelValues = [graft.topicID])
|
|
||||||
else:
|
else:
|
||||||
respControl.prune.add(ControlPrune(topicID: graft.topicID))
|
respControl.prune.add(ControlPrune(topicID: graft.topicID))
|
||||||
|
|
||||||
|
libp2p_gossipsub_peers_per_topic_mesh
|
||||||
|
.set(g.mesh[graft.topicID].len.int64, labelValues = [graft.topicID])
|
||||||
|
|
||||||
|
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||||
|
.set(g.gossipsub[graft.topicID].len.int64, labelValues = [graft.topicID])
|
||||||
|
|
||||||
proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
|
proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
|
||||||
for prune in prunes:
|
for prune in prunes:
|
||||||
trace "processing prune message", peer = peer.id,
|
trace "processing prune message", peer = peer.id,
|
||||||
|
@ -285,7 +315,8 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
|
||||||
|
|
||||||
if prune.topicID in g.mesh:
|
if prune.topicID in g.mesh:
|
||||||
g.mesh[prune.topicID].excl(peer.id)
|
g.mesh[prune.topicID].excl(peer.id)
|
||||||
libp2p_gossipsub_peers_per_topic_mesh.set(g.mesh[prune.topicID].len.int64, labelValues = [prune.topicID])
|
libp2p_gossipsub_peers_per_topic_mesh
|
||||||
|
.set(g.mesh[prune.topicID].len.int64, labelValues = [prune.topicID])
|
||||||
|
|
||||||
proc handleIHave(g: GossipSub,
|
proc handleIHave(g: GossipSub,
|
||||||
peer: PubSubPeer,
|
peer: PubSubPeer,
|
||||||
|
@ -406,7 +437,7 @@ method unsubscribe*(g: GossipSub,
|
||||||
for pair in topics:
|
for pair in topics:
|
||||||
let topic = pair.topic
|
let topic = pair.topic
|
||||||
if topic in g.mesh:
|
if topic in g.mesh:
|
||||||
let peers = g.mesh[topic]
|
let peers = g.mesh.getOrDefault(topic)
|
||||||
g.mesh.del(topic)
|
g.mesh.del(topic)
|
||||||
for id in peers:
|
for id in peers:
|
||||||
let p = g.peers[id]
|
let p = g.peers[id]
|
||||||
|
@ -427,11 +458,11 @@ method publish*(g: GossipSub,
|
||||||
|
|
||||||
if topic in g.topics: # if we're subscribed to the topic attempt to build a mesh
|
if topic in g.topics: # if we're subscribed to the topic attempt to build a mesh
|
||||||
await g.rebalanceMesh(topic)
|
await g.rebalanceMesh(topic)
|
||||||
peers = g.mesh[topic]
|
peers = g.mesh.getOrDefault(topic)
|
||||||
else: # send to fanout peers
|
else: # send to fanout peers
|
||||||
await g.replenishFanout(topic)
|
await g.replenishFanout(topic)
|
||||||
if topic in g.fanout:
|
if topic in g.fanout:
|
||||||
peers = g.fanout[topic]
|
peers = g.fanout.getOrDefault(topic)
|
||||||
# set the fanout expiry time
|
# set the fanout expiry time
|
||||||
g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL)
|
g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL)
|
||||||
|
|
||||||
|
|
|
@ -187,6 +187,8 @@ method unsubscribe*(p: PubSub,
|
||||||
topics: seq[TopicPair]) {.base, async.} =
|
topics: seq[TopicPair]) {.base, async.} =
|
||||||
## unsubscribe from a list of ``topic`` strings
|
## unsubscribe from a list of ``topic`` strings
|
||||||
for t in topics:
|
for t in topics:
|
||||||
|
# metrics
|
||||||
|
libp2p_pubsub_topics.dec()
|
||||||
for i, h in p.topics[t.topic].handler:
|
for i, h in p.topics[t.topic].handler:
|
||||||
if h == t.handler:
|
if h == t.handler:
|
||||||
p.topics[t.topic].handler.del(i)
|
p.topics[t.topic].handler.del(i)
|
||||||
|
@ -194,9 +196,6 @@ method unsubscribe*(p: PubSub,
|
||||||
method unsubscribe*(p: PubSub,
|
method unsubscribe*(p: PubSub,
|
||||||
topic: string,
|
topic: string,
|
||||||
handler: TopicHandler): Future[void] {.base.} =
|
handler: TopicHandler): Future[void] {.base.} =
|
||||||
# metrics
|
|
||||||
libp2p_pubsub_topics.dec()
|
|
||||||
|
|
||||||
## unsubscribe from a ``topic`` string
|
## unsubscribe from a ``topic`` string
|
||||||
p.unsubscribe(@[(topic, handler)])
|
p.unsubscribe(@[(topic, handler)])
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,10 @@ import rpc/[messages, message, protobuf],
|
||||||
logScope:
|
logScope:
|
||||||
topics = "pubsubpeer"
|
topics = "pubsubpeer"
|
||||||
|
|
||||||
|
declareCounter(libp2p_pubsub_sent_messages, "number of messages sent", labels = ["id"])
|
||||||
|
declareCounter(libp2p_pubsub_received_messages, "number of messages received", labels = ["id"])
|
||||||
|
declareCounter(libp2p_pubsub_skipped_messages, "number of skipped messages", labels = ["id"])
|
||||||
|
|
||||||
type
|
type
|
||||||
PubSubObserver* = ref object
|
PubSubObserver* = ref object
|
||||||
onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
|
onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
|
||||||
|
@ -41,9 +45,6 @@ type
|
||||||
|
|
||||||
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
|
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
|
||||||
|
|
||||||
declareCounter(libp2p_pubsub_sent_messages, "number of messages sent")
|
|
||||||
declareCounter(libp2p_pubsub_received_messages, "number of messages received")
|
|
||||||
|
|
||||||
proc id*(p: PubSubPeer): string = p.peerInfo.id
|
proc id*(p: PubSubPeer): string = p.peerInfo.id
|
||||||
|
|
||||||
proc isConnected*(p: PubSubPeer): bool =
|
proc isConnected*(p: PubSubPeer): bool =
|
||||||
|
@ -77,6 +78,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
|
||||||
let digest = $(sha256.digest(data))
|
let digest = $(sha256.digest(data))
|
||||||
trace "read data from peer", peer = p.id, data = data.shortLog
|
trace "read data from peer", peer = p.id, data = data.shortLog
|
||||||
if digest in p.recvdRpcCache:
|
if digest in p.recvdRpcCache:
|
||||||
|
libp2p_pubsub_skipped_messages.inc(labelValues = [p.id])
|
||||||
trace "message already received, skipping", peer = p.id
|
trace "message already received, skipping", peer = p.id
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -86,7 +88,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
|
||||||
p.recvObservers(msg)
|
p.recvObservers(msg)
|
||||||
|
|
||||||
# metrics
|
# metrics
|
||||||
libp2p_pubsub_received_messages.inc()
|
libp2p_pubsub_received_messages.inc(labelValues = [p.id])
|
||||||
|
|
||||||
await p.handler(p, @[msg])
|
await p.handler(p, @[msg])
|
||||||
p.recvdRpcCache.put(digest)
|
p.recvdRpcCache.put(digest)
|
||||||
|
@ -127,7 +129,7 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
|
||||||
p.sentRpcCache.put(digest)
|
p.sentRpcCache.put(digest)
|
||||||
|
|
||||||
# metrics
|
# metrics
|
||||||
libp2p_pubsub_sent_messages.inc()
|
libp2p_pubsub_sent_messages.inc(labelValues = [p.id])
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "unable to send to remote", exc = exc.msg
|
trace "unable to send to remote", exc = exc.msg
|
||||||
p.sendConn = nil
|
p.sendConn = nil
|
||||||
|
|
Loading…
Reference in New Issue