better pubsub metrics (#214)
parent 6b196ad7b4
commit 55a294a5c9
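This commit replaces the per-topic inc()/dec() bookkeeping on the gossipsub metrics with set() calls that publish the actual size of the mesh, fanout, and gossipsub peer sets, and renames the pubsubpeer counters from encoded/decoded to sent/received so that they count messages that actually cross the wire.

A minimal sketch of the pattern the gossipsub hunks move to, assuming the libp2p_gossipsub_peers_per_topic_* metrics are declared as labelled gauges via nim-metrics' declareGauge (their declarations sit outside this diff; the names below are hypothetical):

import sets, metrics

# a labelled gauge, analogous to the per-topic metrics in the diff
declareGauge(example_peers_per_topic, "peers per topic", labels = ["topic"])

var peers = initHashSet[string]()

proc trackPeer(topic, peer: string) =
  peers.incl(peer)
  # publish the authoritative container size instead of incrementing:
  # idempotent, so a missed or repeated update cannot skew the value
  example_peers_per_topic.set(peers.len.int64, labelValues = [topic])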
@@ -83,7 +83,7 @@ proc replenishFanout(g: GossipSub, topic: string) {.async.} =
     if topic in g.gossipsub:
       for p in g.gossipsub[topic]:
         if not g.fanout[topic].containsOrIncl(p):
-          libp2p_gossipsub_peers_per_topic_fanout.inc(labelValues = [topic])
+          libp2p_gossipsub_peers_per_topic_fanout.set(g.fanout[topic].len.int64, labelValues = [topic])
           if g.fanout[topic].len == GossipSubD:
             break

@@ -105,19 +105,19 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
       if topic in g.fanout and g.fanout[topic].len > 0:
         id = sample(toSeq(g.fanout[topic]))
         g.fanout[topic].excl(id)
-        libp2p_gossipsub_peers_per_topic_fanout.dec(labelValues = [topic])
+        libp2p_gossipsub_peers_per_topic_fanout.set(g.fanout[topic].len.int64, labelValues = [topic])
         trace "got fanout peer", peer = id
       elif topic in g.gossipsub and g.gossipsub[topic].len > 0:
         id = sample(toSeq(g.gossipsub[topic]))
         g.gossipsub[topic].excl(id)
-        libp2p_gossipsub_peers_per_topic_gossipsub.dec(labelValues = [topic])
+        libp2p_gossipsub_peers_per_topic_gossipsub.set(g.gossipsub[topic].len.int64, labelValues = [topic])
         trace "got gossipsub peer", peer = id
       else:
         trace "no more peers"
         break

       g.mesh[topic].incl(id)
-      libp2p_gossipsub_peers_per_topic_mesh.inc(labelValues = [topic])
+      libp2p_gossipsub_peers_per_topic_mesh.set(g.mesh[topic].len.int64, labelValues = [topic])
       if id in g.peers:
         let p = g.peers[id]
         # send a graft message to the peer
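A side note on the hunk above: HashSet.excl is a no-op when the element is absent, so an unconditional dec() after it can drive the metric below the set's real size, which set(len) cannot do. A standalone Nim illustration of that drift (hypothetical variables, not part of the commit):

import sets

var peers = initHashSet[string]()
var tracked = 0                # paired inc/dec bookkeeping, as before this commit

peers.incl("a"); inc tracked   # tracked = 1, peers.len = 1, in sync
peers.excl("b"); dec tracked   # "b" was never a member: excl does nothing,
                               # but the decrement happens anyway
assert tracked == 0 and peers.len == 1   # drifted apart

tracked = peers.len            # the set()-style update is always authoritative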
@@ -130,13 +130,13 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
       trace "pruning peers", peers = g.mesh[topic].len
       let id = toSeq(g.mesh[topic])[rand(0..<g.mesh[topic].len)]
       g.mesh[topic].excl(id)
-      libp2p_gossipsub_peers_per_topic_mesh.dec(labelValues = [topic])
+      libp2p_gossipsub_peers_per_topic_mesh.set(g.mesh[topic].len.int64, labelValues = [topic])

       let p = g.peers[id]
       # send a prune message to the peer
       await p.sendPrune(@[topic])

-    trace "mesh balanced, got peers", peers = g.mesh[topic].len, topicId = topic
+    trace "mesh balanced, got peers", peers = g.mesh[topic].len.int64, topicId = topic
   except CatchableError as exc:
     trace "exception occurred re-balancing mesh", exc = exc.msg

@@ -184,7 +184,7 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =

       let id = toSeq(g.gossipsub[topic]).sample()
       g.gossipsub[topic].excl(id)
-      libp2p_gossipsub_peers_per_topic_gossipsub.dec(labelValues = [topic])
+      libp2p_gossipsub_peers_per_topic_gossipsub.set(g.gossipsub[topic].len.int64, labelValues = [topic])
       if id notin gossipPeers:
         if id notin result:
           result[id] = ControlMessage()
@@ -222,15 +222,15 @@ method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async.} =

   for t in g.gossipsub.keys:
     g.gossipsub[t].excl(peer.id)
-    libp2p_gossipsub_peers_per_topic_gossipsub.dec(labelValues = [t])
+    libp2p_gossipsub_peers_per_topic_gossipsub.set(g.gossipsub[t].len.int64, labelValues = [t])

   for t in g.mesh.keys:
     g.mesh[t].excl(peer.id)
-    libp2p_gossipsub_peers_per_topic_mesh.dec(labelValues = [t])
+    libp2p_gossipsub_peers_per_topic_mesh.set(g.mesh[t].len.int64, labelValues = [t])

   for t in g.fanout.keys:
     g.fanout[t].excl(peer.id)
-    libp2p_gossipsub_peers_per_topic_fanout.dec(labelValues = [t])
+    libp2p_gossipsub_peers_per_topic_fanout.set(g.fanout[t].len.int64, labelValues = [t])

 method subscribeToPeer*(p: GossipSub,
                         conn: Connection) {.async.} =
@@ -250,12 +250,12 @@ method subscribeTopic*(g: GossipSub,
     trace "adding subscription for topic", peer = peerId, name = topic
     # subscribe remote peer to the topic
     g.gossipsub[topic].incl(peerId)
-    libp2p_gossipsub_peers_per_topic_gossipsub.inc(labelValues = [topic])
+    libp2p_gossipsub_peers_per_topic_gossipsub.set(g.gossipsub[topic].len.int64, labelValues = [topic])
   else:
     trace "removing subscription for topic", peer = peerId, name = topic
     # unsubscribe remote peer from the topic
     g.gossipsub[topic].excl(peerId)
-    libp2p_gossipsub_peers_per_topic_gossipsub.dec(labelValues = [topic])
+    libp2p_gossipsub_peers_per_topic_gossipsub.set(g.gossipsub[topic].len.int64, labelValues = [topic])

   if topic in g.topics:
     await g.rebalanceMesh(topic)
@@ -271,10 +271,10 @@ proc handleGraft(g: GossipSub,
     if graft.topicID in g.topics:
       if g.mesh.len < GossipSubD:
         g.mesh[graft.topicID].incl(peer.id)
-        libp2p_gossipsub_peers_per_topic_mesh.inc(labelValues = [graft.topicID])
+        libp2p_gossipsub_peers_per_topic_mesh.set(g.mesh[graft.topicID].len.int64, labelValues = [graft.topicID])
       else:
         g.gossipsub[graft.topicID].incl(peer.id)
-        libp2p_gossipsub_peers_per_topic_gossipsub.inc(labelValues = [graft.topicID])
+        libp2p_gossipsub_peers_per_topic_gossipsub.set(g.gossipsub[graft.topicID].len.int64, labelValues = [graft.topicID])
     else:
       respControl.prune.add(ControlPrune(topicID: graft.topicID))

@@ -285,7 +285,7 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =

     if prune.topicID in g.mesh:
       g.mesh[prune.topicID].excl(peer.id)
-      libp2p_gossipsub_peers_per_topic_mesh.dec(labelValues = [prune.topicID])
+      libp2p_gossipsub_peers_per_topic_mesh.set(g.mesh[prune.topicID].len.int64, labelValues = [prune.topicID])

 proc handleIHave(g: GossipSub,
                  peer: PubSubPeer,
@@ -473,10 +473,10 @@ method initPubSub*(g: GossipSub) =

   randomize()
   g.mcache = newMCache(GossipSubHistoryGossip, GossipSubHistoryLength)
   g.mesh = initTable[string, HashSet[string]]() # meshes - topic to peer
   g.fanout = initTable[string, HashSet[string]]() # fanout - topic to peer
-  g.gossipsub = initTable[string, HashSet[string]]() # topic to peer map of all gossipsub peers
+  g.gossipsub = initTable[string, HashSet[string]]()# topic to peer map of all gossipsub peers
   g.lastFanoutPubSub = initTable[string, Moment]() # last publish time for fanout topics
   g.gossip = initTable[string, seq[ControlIHave]]() # pending gossip
   g.control = initTable[string, ControlMessage]() # pending control messages
   g.heartbeatLock = newAsyncLock()
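The remaining hunks touch the peer-level pubsub code (its logScope sets topics = "pubsubpeer"): the metrics import folds into the existing import line, and the encoded/decoded counters become sent/received counters incremented only at actual I/O.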
@@ -8,7 +8,7 @@
 ## those terms.

 import options, hashes, strutils, tables, hashes
-import chronos, chronicles, nimcrypto/sha2
+import chronos, chronicles, nimcrypto/sha2, metrics
 import rpc/[messages, message, protobuf],
        timedcache,
        ../../peer,
@@ -18,7 +18,6 @@ import rpc/[messages, message, protobuf],
        ../../crypto/crypto,
        ../../protobuf/minprotobuf,
        ../../utility
-import metrics

 logScope:
   topics = "pubsubpeer"
@@ -42,8 +41,8 @@ type

   RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}

-declareCounter(libp2p_pubsub_encoded_messages, "number of messages encoded")
-declareCounter(libp2p_pubsub_decoded_messages, "number of messages decoded")
+declareCounter(libp2p_pubsub_sent_messages, "number of messages sent")
+declareCounter(libp2p_pubsub_received_messages, "number of messages received")

 proc id*(p: PubSubPeer): string = p.peerInfo.id

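The rename matches what is now measured: messages actually sent and received, not merely encoded or decoded. Scraped via nim-metrics, the renamed counters would surface in Prometheus exposition format roughly as follows (illustrative values):

# HELP libp2p_pubsub_sent_messages number of messages sent
# TYPE libp2p_pubsub_sent_messages counter
libp2p_pubsub_sent_messages 42.0
# HELP libp2p_pubsub_received_messages number of messages received
# TYPE libp2p_pubsub_received_messages counter
libp2p_pubsub_received_messages 17.0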
@@ -86,11 +85,11 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
       # trigger hooks
       p.recvObservers(msg)

+      # metrics
+      libp2p_pubsub_received_messages.inc()
+
       await p.handler(p, @[msg])
       p.recvdRpcCache.put(digest)
-
-      # metrics
-      libp2p_pubsub_decoded_messages.inc()
   finally:
     trace "exiting pubsub peer read loop", peer = p.id
     await conn.close()
@@ -105,9 +104,6 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
       # trigger send hooks
       var mm = m # hooks can modify the message
       p.sendObservers(mm)

-      # metrics
-      libp2p_pubsub_encoded_messages.inc()
-
       let encoded = encodeRpcMsg(mm)
       if encoded.buffer.len <= 0:
@@ -129,6 +125,9 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
                                      encoded = encoded.buffer.shortLog
       await p.sendConn.writeLp(encoded.buffer)
       p.sentRpcCache.put(digest)
+
+      # metrics
+      libp2p_pubsub_sent_messages.inc()
     except CatchableError as exc:
       trace "unable to send to remote", exc = exc.msg
       p.sendConn = nil