From 1c4d0832ce658093449150b8fe32be541816691d Mon Sep 17 00:00:00 2001 From: Tanguy Date: Wed, 21 Jun 2023 10:40:10 +0200 Subject: [PATCH] Add GossipSub ping (#912) --- libp2p/protocols/pubsub/gossipsub.nim | 4 ++++ libp2p/protocols/pubsub/gossipsub/behavior.nim | 1 + libp2p/protocols/pubsub/gossipsub/types.nim | 1 + libp2p/protocols/pubsub/pubsubpeer.nim | 1 + libp2p/protocols/pubsub/rpc/messages.nim | 2 ++ libp2p/protocols/pubsub/rpc/protobuf.nim | 8 ++++++++ 6 files changed, 17 insertions(+) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 75fc48d9a..0743aa67a 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -156,6 +156,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) = peer.behaviourPenalty = stats.behaviourPenalty peer.iHaveBudget = IHavePeerBudget + peer.pingBudget = PingsPeerBudget method onPubSubPeerEvent*(p: GossipSub, peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} = case event.kind @@ -352,6 +353,9 @@ proc validateAndRelay(g: GossipSub, method rpcHandler*(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) {.async.} = + if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0: + g.send(peer, RPCMsg(pong: rpcMsg.ping)) + peer.pingBudget.dec for i in 0.. g.parameters.historyLength: discard peer.sentIHaves.popLast() peer.iHaveBudget = IHavePeerBudget + peer.pingBudget = PingsPeerBudget var meshMetrics = MeshMetrics() diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 6827460cd..0f5fe55c7 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -45,6 +45,7 @@ const const BackoffSlackTime* = 2 # seconds + PingsPeerBudget* = 100 # maximum of 6.4kb/heartbeat (6.4kb/s with default 1 second/hb) IHavePeerBudget* = 10 # the max amount of IHave to expose, not by spec, but go as example # rust sigp: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/config.rs#L572 diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index e1fc989bb..371a6f3e7 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -61,6 +61,7 @@ type score*: float64 sentIHaves*: Deque[HashSet[MessageId]] iHaveBudget*: int + pingBudget*: int maxMessageSize: int appScore*: float64 # application specific score behaviourPenalty*: float64 # the eventual penalty score diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index 6192be920..bd67edfe3 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -62,6 +62,8 @@ type subscriptions*: seq[SubOpts] messages*: seq[Message] control*: Option[ControlMessage] + ping*: seq[byte] + pong*: seq[byte] func withSubs*( T: type RPCMsg, topics: openArray[string], subscribe: bool): T = diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index 03f710e27..11dc27740 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -316,6 +316,12 @@ proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] = pb.write(2, item, anonymize) if msg.control.isSome(): pb.write(3, msg.control.get()) + # nim-libp2p extension, using fields which are unlikely to be used + # by other extensions + if msg.ping.len > 0: + pb.write(60, msg.ping) + if msg.pong.len > 0: + pb.write(61, msg.pong) if len(pb.buffer) > 0: pb.finish() pb.buffer @@ -327,4 +333,6 @@ proc decodeRpcMsg*(msg: seq[byte]): ProtoResult[RPCMsg] {.inline.} = assign(rpcMsg.get().messages, ? pb.decodeMessages()) assign(rpcMsg.get().subscriptions, ? pb.decodeSubscriptions()) assign(rpcMsg.get().control, ? pb.decodeControl()) + discard ? pb.getField(60, rpcMsg.get().ping) + discard ? pb.getField(61, rpcMsg.get().pong) rpcMsg