diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 48bc711c5..004b38413 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -7,8 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import std/[sequtils, sets, tables] -import chronos, chronicles, metrics +import std/[sequtils, sets, hashes, tables] +import chronos, chronicles, metrics, bearssl import ./pubsub, ./pubsubpeer, ./timedcache, @@ -27,7 +27,17 @@ const FloodSubCodec* = "/floodsub/1.0.0" type FloodSub* = ref object of PubSub floodsub*: PeerTable # topic to remote peer map - seen*: TimedCache[MessageID] # list of messages forwarded to peers + seen*: TimedCache[MessageID] # message id:s already seen on the network + seenSalt*: seq[byte] + +proc hasSeen*(f: FloodSub, msgId: MessageID): bool = + f.seenSalt & msgId in f.seen + +proc addSeen*(f: FloodSub, msgId: MessageID): bool = + # Salting the seen hash helps avoid attacks against the hash function used + # in the nim hash table + # Return true if the message has already been seen + f.seen.put(f.seenSalt & msgId) method subscribeTopic*(f: FloodSub, topic: string, @@ -88,7 +98,7 @@ method rpcHandler*(f: FloodSub, for msg in rpcMsg.messages: # for every message let msgId = f.msgIdProvider(msg) - if f.seen.put(msgId): + if f.addSeen(msgId): trace "Dropping already-seen message", msgId, peer continue @@ -118,13 +128,15 @@ method rpcHandler*(f: FloodSub, var toSendPeers = initHashSet[PubSubPeer]() for t in msg.topicIDs: # for every topic in the message + if t notin f.topics: + continue f.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) await handleData(f, t, msg.data) # In theory, if topics are the same in all messages, we could batch - we'd # also have to be careful to only include validated messages - f.broadcast(toSeq(toSendPeers), RPCMsg(messages: @[msg])) + f.broadcast(toSendPeers, RPCMsg(messages: @[msg])) trace "Forwared message to peers", peers = toSendPeers.len method init*(f: FloodSub) = @@ -157,7 +169,7 @@ method publish*(f: FloodSub, debug "Empty topic, skipping publish", topic return 0 - let peers = toSeq(f.floodsub.getOrDefault(topic)) + let peers = f.floodsub.getOrDefault(topic) if peers.len == 0: debug "No peers for topic, skipping publish", topic @@ -175,7 +187,7 @@ method publish*(f: FloodSub, trace "Created new message", msg = shortLog(msg), peers = peers.len, topic, msgId - if f.seen.put(msgId): + if f.addSeen(msgId): # custom msgid providers might cause this trace "Dropping already-seen message", msgId, topic return 0 @@ -206,4 +218,8 @@ method unsubscribeAll*(f: FloodSub, topic: string) = method initPubSub*(f: FloodSub) = procCall PubSub(f).initPubSub() f.seen = TimedCache[MessageID].init(2.minutes) + var rng = newRng() + f.seenSalt = newSeqUninitialized[byte](sizeof(Hash)) + brHmacDrbgGenerate(rng[], f.seenSalt) + f.init() diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 2cf50a262..3b408e3b8 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -8,7 +8,7 @@ ## those terms. import std/[tables, sets, options, sequtils, random] -import chronos, chronicles, metrics, bearssl +import chronos, chronicles, metrics import ./pubsub, ./floodsub, ./pubsubpeer, @@ -98,27 +98,6 @@ proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = else: ok() -proc init*(_: type[TopicParams]): TopicParams = - TopicParams( - topicWeight: 0.0, # disabled by default - timeInMeshWeight: 0.01, - timeInMeshQuantum: 1.seconds, - timeInMeshCap: 10.0, - firstMessageDeliveriesWeight: 1.0, - firstMessageDeliveriesDecay: 0.5, - firstMessageDeliveriesCap: 10.0, - meshMessageDeliveriesWeight: -1.0, - meshMessageDeliveriesDecay: 0.5, - meshMessageDeliveriesCap: 10, - meshMessageDeliveriesThreshold: 1, - meshMessageDeliveriesWindow: 5.milliseconds, - meshMessageDeliveriesActivation: 10.seconds, - meshFailurePenaltyWeight: -1.0, - meshFailurePenaltyDecay: 0.5, - invalidMessageDeliveriesWeight: -1.0, - invalidMessageDeliveriesDecay: 0.5 - ) - proc validateParameters*(parameters: TopicParams): Result[void, cstring] = if parameters.timeInMeshWeight <= 0.0 or parameters.timeInMeshWeight > 1.0: err("gossipsub: timeInMeshWeight parameter error, Must be a small positive value") @@ -262,6 +241,37 @@ method subscribeTopic*(g: GossipSub, trace "gossip peers", peers = g.gossipsub.peers(topic), topic +proc handleControl(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) = + if rpcMsg.control.isSome: + let control = rpcMsg.control.get() + g.handlePrune(peer, control.prune) + + var respControl: ControlMessage + respControl.iwant.add(g.handleIHave(peer, control.ihave)) + respControl.prune.add(g.handleGraft(peer, control.graft)) + let messages = g.handleIWant(peer, control.iwant) + + if respControl.graft.len > 0 or respControl.prune.len > 0 or + respControl.ihave.len > 0 or messages.len > 0: + # iwant and prunes from here, also messages + + for smsg in messages: + for topic in smsg.topicIDs: + if g.knownTopics.contains(topic): + libp2p_pubsub_broadcast_messages.inc(labelValues = [topic]) + else: + libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"]) + libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64) + for prune in respControl.prune: + if g.knownTopics.contains(prune.topicID): + libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID]) + else: + libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"]) + trace "sending control message", msg = shortLog(respControl), peer + g.send( + peer, + RPCMsg(control: some(respControl), messages: messages)) + method rpcHandler*(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) {.async.} = @@ -283,26 +293,12 @@ method rpcHandler*(g: GossipSub, # avoid the remote peer from controlling the seen table hashing # by adding random bytes to the ID we ensure we randomize the IDs # we do only for seen as this is the great filter from the external world - if g.seen.put(msgId & g.randomBytes): + if g.addSeen(msgId): trace "Dropping already-seen message", msgId = shortLog(msgId), peer - # make sure to update score tho before continuing - for t in msg.topicIDs: - if t notin g.topics: - continue - # for every topic in the message - let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) - # if in mesh add more delivery score - g.withPeerStats(peer.peerId) do (stats: var PeerStats): - stats.topicInfos.withValue(t, tstats): - if tstats[].inMesh: - # TODO: take into account meshMessageDeliveriesWindow - # score only if messages are not too old. - tstats[].meshMessageDeliveries += 1 - if tstats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap: - tstats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap - do: # make sure we don't loose this information - stats.topicInfos[t] = TopicInfo(meshMessageDeliveries: 1) + # TODO: take into account meshMessageDeliveriesWindow + # score only if messages are not too old. + g.rewardDelivered(peer, msg.topicIDs, false) # onto the next message continue @@ -346,28 +342,13 @@ method rpcHandler*(g: GossipSub, # store in cache only after validation g.mcache.put(msgId, msg) + g.rewardDelivered(peer, msg.topicIDs, true) + var toSendPeers = initHashSet[PubSubPeer]() for t in msg.topicIDs: # for every topic in the message if t notin g.topics: continue - let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) - - g.withPeerStats(peer.peerId) do(stats: var PeerStats): - stats.topicInfos.withValue(t, tstats): - # contribute to peer score first delivery - tstats[].firstMessageDeliveries += 1 - if tstats[].firstMessageDeliveries > topicParams.firstMessageDeliveriesCap: - tstats[].firstMessageDeliveries = topicParams.firstMessageDeliveriesCap - - # if in mesh add more delivery score - if tstats[].inMesh: - tstats[].meshMessageDeliveries += 1 - if tstats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap: - tstats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap - do: # make sure we don't loose this information - stats.topicInfos[t] = TopicInfo(firstMessageDeliveries: 1, meshMessageDeliveries: 1) - g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) @@ -375,44 +356,15 @@ method rpcHandler*(g: GossipSub, # In theory, if topics are the same in all messages, we could batch - we'd # also have to be careful to only include validated messages - let sendingTo = toSeq(toSendPeers) - g.broadcast(sendingTo, RPCMsg(messages: @[msg])) - trace "forwared message to peers", peers = sendingTo.len, msgId, peer + g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) + trace "forwared message to peers", peers = toSendPeers.len, msgId, peer for topic in msg.topicIDs: if g.knownTopics.contains(topic): - libp2p_pubsub_messages_rebroadcasted.inc(sendingTo.len.int64, labelValues = [topic]) + libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic]) else: - libp2p_pubsub_messages_rebroadcasted.inc(sendingTo.len.int64, labelValues = ["generic"]) + libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"]) - if rpcMsg.control.isSome: - let control = rpcMsg.control.get() - g.handlePrune(peer, control.prune) - - var respControl: ControlMessage - respControl.iwant.add(g.handleIHave(peer, control.ihave)) - respControl.prune.add(g.handleGraft(peer, control.graft)) - let messages = g.handleIWant(peer, control.iwant) - - if respControl.graft.len > 0 or respControl.prune.len > 0 or - respControl.ihave.len > 0 or messages.len > 0: - # iwant and prunes from here, also messages - - for smsg in messages: - for topic in smsg.topicIDs: - if g.knownTopics.contains(topic): - libp2p_pubsub_broadcast_messages.inc(labelValues = [topic]) - else: - libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"]) - libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64) - for prune in respControl.prune: - if g.knownTopics.contains(prune.topicID): - libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID]) - else: - libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"]) - trace "sending control message", msg = shortLog(respControl), peer - g.send( - peer, - RPCMsg(control: some(respControl), messages: messages)) + g.handleControl(peer, rpcMsg) method subscribe*(g: GossipSub, topic: string, @@ -437,7 +389,7 @@ proc unsubscribe*(g: GossipSub, topic: string) = # remove mesh peers from gpeers, we send 2 different messages gpeers = gpeers - mpeers # send to peers NOT in mesh first - g.broadcast(toSeq(gpeers), msg) + g.broadcast(gpeers, msg) for peer in mpeers: trace "pruning unsubscribeAll call peer", peer, score = peer.score @@ -452,9 +404,9 @@ proc unsubscribe*(g: GossipSub, topic: string) = backoff: g.parameters.pruneBackoff.seconds.uint64)])) # send to peers IN mesh now - g.broadcast(toSeq(mpeers), msg) + g.broadcast(mpeers, msg) else: - g.broadcast(toSeq(gpeers), msg) + g.broadcast(gpeers, msg) g.topicParams.del(topic) @@ -540,19 +492,19 @@ method publish*(g: GossipSub, trace "Created new message", msg = shortLog(msg), peers = peers.len - if g.seen.put(msgId & g.randomBytes): + if g.addSeen(msgId): # custom msgid providers might cause this trace "Dropping already-seen message" return 0 g.mcache.put(msgId, msg) - let peerSeq = toSeq(peers) - g.broadcast(peerSeq, RPCMsg(messages: @[msg])) + g.broadcast(peers, RPCMsg(messages: @[msg])) + if g.knownTopics.contains(topic): - libp2p_pubsub_messages_published.inc(peerSeq.len.int64, labelValues = [topic]) + libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic]) else: - libp2p_pubsub_messages_published.inc(peerSeq.len.int64, labelValues = ["generic"]) + libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"]) trace "Published message to peers" @@ -618,6 +570,3 @@ method initPubSub*(g: GossipSub) = # init gossip stuff g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength) - var rng = newRng() - g.randomBytes = newSeqUninitialized[byte](32) - brHmacDrbgGenerate(rng[], g.randomBytes) diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index d734762bc..0a2ac6175 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -13,7 +13,7 @@ import std/[tables, sequtils, sets, algorithm] import random # for shuffle import chronos, chronicles, metrics import "."/[types, scoring] -import ".."/[pubsubpeer, peertable, timedcache, mcache, pubsub] +import ".."/[pubsubpeer, peertable, timedcache, mcache, floodsub, pubsub] import "../rpc"/[messages] import "../../.."/[peerid, multiaddress, utility, switch] @@ -198,11 +198,10 @@ proc handleIHave*(g: GossipSub, if ihave.topicID in g.mesh: # also avoid duplicates here! let deIhavesMsgs = ihave.messageIDs.deduplicate() - for m in deIhavesMsgs: - let msgId = m & g.randomBytes - if msgId notin g.seen: + for msgId in deIhavesMsgs: + if not g.hasSeen(msgId): if peer.iHaveBudget > 0: - result.messageIDs.add(m) + result.messageIDs.add(msgId) dec peer.iHaveBudget else: return diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index 3c82d5671..79ff50a39 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -25,6 +25,27 @@ declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"]) +proc init*(_: type[TopicParams]): TopicParams = + TopicParams( + topicWeight: 0.0, # disabled by default + timeInMeshWeight: 0.01, + timeInMeshQuantum: 1.seconds, + timeInMeshCap: 10.0, + firstMessageDeliveriesWeight: 1.0, + firstMessageDeliveriesDecay: 0.5, + firstMessageDeliveriesCap: 10.0, + meshMessageDeliveriesWeight: -1.0, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesCap: 10, + meshMessageDeliveriesThreshold: 1, + meshMessageDeliveriesWindow: 5.milliseconds, + meshMessageDeliveriesActivation: 10.seconds, + meshFailurePenaltyWeight: -1.0, + meshFailurePenaltyDecay: 0.5, + invalidMessageDeliveriesWeight: -1.0, + invalidMessageDeliveriesDecay: 0.5 + ) + proc withPeerStats*( g: GossipSub, peerId: PeerId, action: proc (stats: var PeerStats) {.gcsafe, raises: [Defect].}) = @@ -274,3 +295,26 @@ proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, topics: seq[string]) # update stats g.withPeerStats(peer.peerId) do (stats: var PeerStats): stats.topicInfos.mgetOrPut(t, TopicInfo()).invalidMessageDeliveries += 1 + +proc addCapped*[T](stat: var T, diff, cap: T) = + stat += min(diff, cap - stat) + +proc rewardDelivered*( + g: GossipSub, peer: PubSubPeer, topics: openArray[string], first: bool) = + for t in topics: + if t notin g.topics: + continue + let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) + # if in mesh add more delivery score + + g.withPeerStats(peer.peerId) do (stats: var PeerStats): + stats.topicInfos.withValue(t, tstats): + if tstats[].inMesh: + if first: + tstats[].firstMessageDeliveries.addCapped( + 1, topicParams.firstMessageDeliveriesCap) + + tstats[].meshMessageDeliveries.addCapped( + 1, topicParams.meshMessageDeliveriesCap) + do: # make sure we don't loose this information + stats.topicInfos[t] = TopicInfo(meshMessageDeliveries: 1) diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index bd3a67b9d..05b67769b 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -150,8 +150,6 @@ type heartbeatEvents*: seq[AsyncEvent] - randomBytes*: seq[byte] - MeshMetrics* = object # scratch buffers for metrics otherPeersPerTopicMesh*: int64 diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index a608bfc4e..41ccdc87d 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -9,8 +9,8 @@ import std/[tables, sequtils, sets, strutils] import chronos, chronicles, metrics -import pubsubpeer, - rpc/[message, messages], +import ./pubsubpeer, + ./rpc/[message, messages, protobuf], ../../switch, ../protocol, ../../stream/connection, @@ -128,7 +128,7 @@ proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [Defect].} = proc broadcast*( p: PubSub, - sendPeers: openArray[PubSubPeer], + sendPeers: auto, # Iteratble[PubSubPeer] msg: RPCMsg) {.raises: [Defect].} = ## Attempt to send `msg` to the given peers @@ -174,8 +174,15 @@ proc broadcast*( trace "broadcasting messages to peers", peers = sendPeers.len, msg = shortLog(msg) - for peer in sendPeers: - p.send(peer, msg) + + if anyIt(sendPeers, it.hasObservers): + for peer in sendPeers: + p.send(peer, msg) + else: + # Fast path that only encodes message once + let encoded = encodeRpcMsg(msg, p.anonymize) + for peer in sendPeers: + peer.sendEncoded(encoded) proc sendSubs*(p: PubSub, peer: PubSubPeer, @@ -205,7 +212,7 @@ method subscribeTopic*(p: PubSub, method rpcHandler*(p: PubSub, peer: PubSubPeer, - rpcMsg: RPCMsg) {.async, base.} = + rpcMsg: RPCMsg): Future[void] {.base.} = ## handle rpc messages trace "processing RPC message", msg = rpcMsg.shortLog, peer for i in 0..