From dc48170b0dd9301d75e4d40b4496db82ab463ff5 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com> Date: Wed, 13 Jan 2021 23:49:44 +0900 Subject: [PATCH] Gossip subscription improvements (#497) * salt ids in seen table * add subscription validation callback and avoid processing topics we don't care of * apply penalty on bad subscription * fix IHave handling IDs * reduce indenting, add some comments * fix gossip randombytes generation * do not descore unwanted topics (might happen, due to timing, needs improvements) * cleaning up and added tests * validate subscriptions only when subscribing * set notice level for failed publish * fix floodsub behavior --- libp2p/protocols/pubsub/floodsub.nim | 16 +++++-- libp2p/protocols/pubsub/gossipsub.nim | 63 ++++++++++++++++++++++----- libp2p/protocols/pubsub/pubsub.nim | 18 +++++--- tests/pubsub/testgossipinternal.nim | 37 ++++++++++++++++ tests/pubsub/testgossipsub.nim | 54 +++++++++++++++++++++++ 5 files changed, 167 insertions(+), 21 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index a604b73..f713095 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -45,17 +45,25 @@ method subscribeTopic*(f: FloodSub, trace "ignoring unknown peer" return - procCall PubSub(f).subscribeTopic(topic, subscribe, peer) - - if topic notin f.floodsub: - f.floodsub[topic] = initHashSet[PubSubPeer]() + if subscribe and not(isNil(f.subscriptionValidator)) and not(f.subscriptionValidator(topic)): + # this is a violation, so warn should be in order + warn "ignoring invalid topic subscription", topic, peer + return if subscribe: + if topic notin f.floodsub: + f.floodsub[topic] = initHashSet[PubSubPeer]() + trace "adding subscription for topic", peer, topic + # subscribe the peer to the topic f.floodsub[topic].incl(peer) else: + if topic notin f.floodsub: + return + trace "removing subscription for topic", peer, topic + # unsubscribe the peer from the topic f.floodsub[topic].excl(peer) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index daec486..2f7cf8d 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -8,7 +8,7 @@ ## those terms. import std/[tables, sets, options, sequtils, strutils, random, algorithm] -import chronos, chronicles, metrics +import chronos, chronicles, metrics, bearssl import ./pubsub, ./floodsub, ./pubsubpeer, @@ -20,7 +20,8 @@ import ./pubsub, ../../stream/connection, ../../peerinfo, ../../peerid, - ../../utility + ../../utility, + ../../crypto/curve25519 import stew/results export results @@ -106,7 +107,10 @@ type PeerStats* = object topicInfos*: Table[string, TopicInfo] expire*: Moment # updated on disconnect, to retain scores until expire + # the following are copies from PubSubPeer, in order to restore them on re-connection score*: float64 # a copy of the score to keep in case the peer is disconnected + appScore*: float64 # application specific score + behaviourPenalty*: float64 # the eventual penalty score GossipSubParams* = object explicit: bool @@ -166,6 +170,8 @@ type heartbeatEvents*: seq[AsyncEvent] + randomBytes: seq[byte] + MeshMetrics = object # scratch buffers for metrics otherPeersPerTopicMesh: int64 @@ -334,7 +340,10 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) = else: # we knew this peer # restore previously stored score - peer.score = g.peerStats[peer.peerId].score + let stats = g.peerStats[peer.peerId] + peer.score = stats.score + peer.appScore = stats.appScore + peer.behaviourPenalty = stats.behaviourPenalty proc grafted(g: GossipSub, p: PubSubPeer, topic: string) = g.peerStats.withValue(p.peerId, stats): @@ -823,6 +832,8 @@ proc updateScores(g: GossipSub) = # avoid async # copy into stats the score to keep until expired stats.score = peer.score + stats.appScore = peer.appScore + stats.behaviourPenalty = peer.behaviourPenalty assert(g.peerStats[peer.peerId].score == peer.score) # nim sanity check trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted @@ -882,7 +893,6 @@ proc heartbeat(g: GossipSub) {.async.} = # do this before relance # in order to avoid grafted -> pruned in the same cycle let meshPeers = g.mesh.getOrDefault(t) - let gossipPeers = g.gossipsub.getOrDefault(t) var prunes: seq[PubSubPeer] for peer in meshPeers: if peer.score < 0.0: @@ -984,17 +994,23 @@ method subscribeTopic*(g: GossipSub, trace "ignoring unknown peer" return - # Skip floodsub - we don't want it to add the peer to `g.floodsub` - procCall PubSub(g).subscribeTopic(topic, subscribe, peer) + if subscribe and not(isNil(g.subscriptionValidator)) and not(g.subscriptionValidator(topic)): + # this is a violation, so warn should be in order + warn "ignoring invalid topic subscription", topic, peer + # also punish + peer.behaviourPenalty += 1 + return if subscribe: trace "peer subscribed to topic" + # subscribe remote peer to the topic discard g.gossipsub.addPeer(topic, peer) if peer.peerId in g.parameters.directPeers: discard g.explicit.addPeer(topic, peer) else: trace "peer unsubscribed from topic" + # unsubscribe remote peer from the topic g.gossipsub.removePeer(topic, peer) g.mesh.removePeer(topic, peer) @@ -1006,6 +1022,9 @@ method subscribeTopic*(g: GossipSub, proc punishPeer(g: GossipSub, peer: PubSubPeer, topics: seq[string]) = for t in topics: + if t notin g.topics: + continue + # ensure we init a new topic if unknown let _ = g.topicParams.mgetOrPut(t, TopicParams.init()) # update stats @@ -1125,7 +1144,8 @@ proc handleIHave(g: GossipSub, peer, topic = ihave.topicID, msgs = ihave.messageIDs if ihave.topicID in g.mesh: for m in ihave.messageIDs: - if m notin g.seen: + let msgId = m & g.randomBytes + if msgId notin g.seen: if peer.iHaveBudget > 0: result.messageIDs.add(m) dec peer.iHaveBudget @@ -1165,11 +1185,17 @@ method rpcHandler*(g: GossipSub, for msg in rpcMsg.messages: # for every message let msgId = g.msgIdProvider(msg) - if g.seen.put(msgId): + # avoid the remote peer from controlling the seen table hashing + # by adding random bytes to the ID we ensure we randomize the IDs + # we do only for seen as this is the great filter from the external world + if g.seen.put(msgId & g.randomBytes): trace "Dropping already-seen message", msgId = shortLog(msgId), peer # make sure to update score tho before continuing - for t in msg.topicIDs: # for every topic in the message + for t in msg.topicIDs: + if t notin g.topics: + continue + # for every topic in the message let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) # if in mesh add more delivery score g.peerStats.withValue(peer.peerId, pstats): @@ -1192,6 +1218,11 @@ method rpcHandler*(g: GossipSub, # onto the next message continue + # avoid processing messages we are not interested in + if msg.topicIDs.allIt(it notin g.topics): + debug "Dropping message of topic without subscription", msgId = shortLog(msgId), peer + continue + if (msg.signature.len > 0 or g.verifySignature) and not msg.verify(): # always validate if signature is present or required debug "Dropping message due to failed signature verification", @@ -1228,6 +1259,9 @@ method rpcHandler*(g: GossipSub, var toSendPeers = initHashSet[PubSubPeer]() for t in msg.topicIDs: # for every topic in the message + if t notin g.topics: + continue + let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) g.peerStats.withValue(peer.peerId, pstats): @@ -1360,7 +1394,9 @@ method publish*(g: GossipSub, # base returns always 0 discard await procCall PubSub(g).publish(topic, data) - logScope: topic + logScope: + topic + trace "Publishing message on topic", data = data.shortLog if topic.len <= 0: # data could be 0/empty @@ -1398,7 +1434,7 @@ method publish*(g: GossipSub, g.lastFanoutPubSub[topic] = Moment.fromNow(g.parameters.fanoutTTL) if peers.len == 0: - debug "No peers for topic, skipping publish" + notice "No peers for topic, skipping publish" # skipping topic as our metrics finds that heavy libp2p_gossipsub_failed_publish.inc() return 0 @@ -1416,7 +1452,7 @@ method publish*(g: GossipSub, trace "Created new message", msg = shortLog(msg), peers = peers.len - if g.seen.put(msgId): + if g.seen.put(msgId & g.randomBytes): # custom msgid providers might cause this trace "Dropping already-seen message" return 0 @@ -1494,3 +1530,6 @@ method initPubSub*(g: GossipSub) = g.lastFanoutPubSub = initTable[string, Moment]() # last publish time for fanout topics g.gossip = initTable[string, seq[ControlIHave]]() # pending gossip g.control = initTable[string, ControlMessage]() # pending control messages + var rng = newRng() + g.randomBytes = newSeqUninitialized[byte](32) + brHmacDrbgGenerate(rng[], g.randomBytes) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 740b6ee..77e944f 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -75,12 +75,15 @@ type Accept, Reject, Ignore ValidatorHandler* = proc(topic: string, - message: Message): Future[ValidationResult] {.gcsafe, closure.} + message: Message): Future[ValidationResult] {.gcsafe.} TopicPair* = tuple[topic: string, handler: TopicHandler] MsgIdProvider* = - proc(m: Message): MessageID {.noSideEffect, raises: [Defect], nimcall, gcsafe.} + proc(m: Message): MessageID {.noSideEffect, raises: [Defect], gcsafe.} + + SubscriptionValidator* = + proc(topic: string): bool {.raises: [Defect], gcsafe.} Topic* = object # make this a variant type if one day we have different Params structs @@ -102,6 +105,7 @@ type msgIdProvider*: MsgIdProvider # Turn message into message id (not nil) msgSeqno*: uint64 anonymize*: bool # if we omit fromPeer and seqno from RPC messages we send + subscriptionValidator*: SubscriptionValidator # callback used to validate subscriptions knownTopics*: HashSet[string] @@ -194,8 +198,9 @@ method subscribeTopic*(p: PubSub, topic: string, subscribe: bool, peer: PubSubPeer) {.base.} = - # called when remote peer subscribes to a topic - discard + # both gossipsub and floodsub diverge, and this super call is not necessary right now + # if necessary remove the assertion + doAssert(false, "unexpected call to pubsub.subscribeTopic") method rpcHandler*(p: PubSub, peer: PubSubPeer, @@ -503,6 +508,7 @@ proc init*[PubParams: object | bool]( verifySignature: bool = true, sign: bool = true, msgIdProvider: MsgIdProvider = defaultMsgIdProvider, + subscriptionValidator: SubscriptionValidator = nil, parameters: PubParams = false): P = let pubsub = when PubParams is bool: @@ -514,7 +520,8 @@ proc init*[PubParams: object | bool]( sign: sign, peers: initTable[PeerID, PubSubPeer](), topics: initTable[string, Topic](), - msgIdProvider: msgIdProvider) + msgIdProvider: msgIdProvider, + subscriptionValidator: subscriptionValidator) else: P(switch: switch, peerInfo: switch.peerInfo, @@ -525,6 +532,7 @@ proc init*[PubParams: object | bool]( peers: initTable[PeerID, PubSubPeer](), topics: initTable[string, Topic](), msgIdProvider: msgIdProvider, + subscriptionValidator: subscriptionValidator, parameters: parameters) proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 273afaf..6ee7729 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -414,3 +414,40 @@ suite "GossipSub internal": await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() + + asyncTest "Drop messages of topics without subscription": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + check false + + let topic = "foobar" + # gossipSub.topicParams[topic] = TopicParams.init() + # gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + # gossipSub.fanout[topic] = initHashSet[PubSubPeer]() + var conns = newSeq[Connection]() + for i in 0..<30: + let conn = newBufferStream(noop) + conns &= conn + let peerInfo = randomPeerInfo() + conn.peerInfo = peerInfo + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + gossipSub.onNewPeer(peer) + peer.handler = handler + + # generate messages + var seqno = 0'u64 + for i in 0..5: + let conn = newBufferStream(noop) + conns &= conn + let peerInfo = randomPeerInfo() + conn.peerInfo = peerInfo + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + inc seqno + let msg = Message.init(some(peerInfo), ("bar" & $i).toBytes(), topic, some(seqno), false) + await gossipSub.rpcHandler(peer, RPCMsg(messages: @[msg])) + + check gossipSub.mcache.msgs.len == 0 + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index e4b8392..921f25f 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -662,3 +662,57 @@ suite "GossipSub": it.switch.stop()))) await allFuturesThrowing(nodesFut) + + asyncTest "GossipSub invalid topic subscription": + var handlerFut = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + handlerFut.complete(true) + + let + nodes = generateNodes(2, gossip = true) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + var gossip = GossipSub(nodes[0]) + let invalidDetected = newFuture[void]() + gossip.subscriptionValidator = + proc(topic: string): bool = + if topic == "foobar": + try: + invalidDetected.complete() + except: + raise newException(Defect, "Exception during subscriptionValidator") + false + else: + true + + await subscribeNodes(nodes) + + nodes[0].subscribe("foobar", handler) + nodes[1].subscribe("foobar", handler) + + await invalidDetected.wait(10.seconds) + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat())