Add GossipSub ping (#912)

This commit is contained in:
Tanguy 2023-06-21 10:40:10 +02:00 committed by GitHub
parent 224f92e172
commit 1c4d0832ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 17 additions and 0 deletions

View File

@ -156,6 +156,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) =
peer.behaviourPenalty = stats.behaviourPenalty peer.behaviourPenalty = stats.behaviourPenalty
peer.iHaveBudget = IHavePeerBudget peer.iHaveBudget = IHavePeerBudget
peer.pingBudget = PingsPeerBudget
method onPubSubPeerEvent*(p: GossipSub, peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} = method onPubSubPeerEvent*(p: GossipSub, peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} =
case event.kind case event.kind
@ -352,6 +353,9 @@ proc validateAndRelay(g: GossipSub,
method rpcHandler*(g: GossipSub, method rpcHandler*(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
rpcMsg: RPCMsg) {.async.} = rpcMsg: RPCMsg) {.async.} =
if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
g.send(peer, RPCMsg(pong: rpcMsg.ping))
peer.pingBudget.dec
for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len): for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
template sub: untyped = rpcMsg.subscriptions[i] template sub: untyped = rpcMsg.subscriptions[i]
g.handleSubscribe(peer, sub.topic, sub.subscribe) g.handleSubscribe(peer, sub.topic, sub.subscribe)

View File

@ -644,6 +644,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
if peer.sentIHaves.len > g.parameters.historyLength: if peer.sentIHaves.len > g.parameters.historyLength:
discard peer.sentIHaves.popLast() discard peer.sentIHaves.popLast()
peer.iHaveBudget = IHavePeerBudget peer.iHaveBudget = IHavePeerBudget
peer.pingBudget = PingsPeerBudget
var meshMetrics = MeshMetrics() var meshMetrics = MeshMetrics()

View File

@ -45,6 +45,7 @@ const
const const
BackoffSlackTime* = 2 # seconds BackoffSlackTime* = 2 # seconds
PingsPeerBudget* = 100 # maximum of 6.4kb/heartbeat (6.4kb/s with default 1 second/hb)
IHavePeerBudget* = 10 IHavePeerBudget* = 10
# the max amount of IHave to expose, not by spec, but go as example # the max amount of IHave to expose, not by spec, but go as example
# rust sigp: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/config.rs#L572 # rust sigp: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/config.rs#L572

View File

@ -61,6 +61,7 @@ type
score*: float64 score*: float64
sentIHaves*: Deque[HashSet[MessageId]] sentIHaves*: Deque[HashSet[MessageId]]
iHaveBudget*: int iHaveBudget*: int
pingBudget*: int
maxMessageSize: int maxMessageSize: int
appScore*: float64 # application specific score appScore*: float64 # application specific score
behaviourPenalty*: float64 # the eventual penalty score behaviourPenalty*: float64 # the eventual penalty score

View File

@ -62,6 +62,8 @@ type
subscriptions*: seq[SubOpts] subscriptions*: seq[SubOpts]
messages*: seq[Message] messages*: seq[Message]
control*: Option[ControlMessage] control*: Option[ControlMessage]
ping*: seq[byte]
pong*: seq[byte]
func withSubs*( func withSubs*(
T: type RPCMsg, topics: openArray[string], subscribe: bool): T = T: type RPCMsg, topics: openArray[string], subscribe: bool): T =

View File

@ -316,6 +316,12 @@ proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] =
pb.write(2, item, anonymize) pb.write(2, item, anonymize)
if msg.control.isSome(): if msg.control.isSome():
pb.write(3, msg.control.get()) pb.write(3, msg.control.get())
# nim-libp2p extension, using fields which are unlikely to be used
# by other extensions
if msg.ping.len > 0:
pb.write(60, msg.ping)
if msg.pong.len > 0:
pb.write(61, msg.pong)
if len(pb.buffer) > 0: if len(pb.buffer) > 0:
pb.finish() pb.finish()
pb.buffer pb.buffer
@ -327,4 +333,6 @@ proc decodeRpcMsg*(msg: seq[byte]): ProtoResult[RPCMsg] {.inline.} =
assign(rpcMsg.get().messages, ? pb.decodeMessages()) assign(rpcMsg.get().messages, ? pb.decodeMessages())
assign(rpcMsg.get().subscriptions, ? pb.decodeSubscriptions()) assign(rpcMsg.get().subscriptions, ? pb.decodeSubscriptions())
assign(rpcMsg.get().control, ? pb.decodeControl()) assign(rpcMsg.get().control, ? pb.decodeControl())
discard ? pb.getField(60, rpcMsg.get().ping)
discard ? pb.getField(61, rpcMsg.get().pong)
rpcMsg rpcMsg