diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 1a6ddc4c2..349088121 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -318,7 +318,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) = discard proc grafted(g: GossipSub, p: PubSubPeer, topic: string) = - g.peerStats.withValue(p, stats) do: + g.peerStats.withValue(p, stats): var info = stats.topicInfos.getOrDefault(topic) info.graftTime = Moment.now() info.meshTime = 0.seconds @@ -335,7 +335,7 @@ proc grafted(g: GossipSub, p: PubSubPeer, topic: string) = g.grafted(p, topic) proc pruned(g: GossipSub, p: PubSubPeer, topic: string) = - g.peerStats.withValue(p, stats) do: + g.peerStats.withValue(p, stats): when not defined(release): g.prunedPeers.incl(p) @@ -400,7 +400,7 @@ method onPubSubPeerEvent*(p: GossipSub, peer: PubsubPeer, event: PubSubPeerEvent procCall FloodSub(p).onPubSubPeerEvent(peer, event) -proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = +proc rebalanceMesh(g: GossipSub, topic: string) = logScope: topic mesh = g.mesh.peers(topic) @@ -529,6 +529,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = trace "pruning", prunes = prunes.len for peer in prunes: + trace "pruning peer on rebalance", peer, score = peer.score g.pruned(peer, topic) g.mesh.removePeer(topic, peer) @@ -659,6 +660,7 @@ func `/`(a, b: Duration): float64 = proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 = if peer.connections.len == 0: + debug "colocationFactor, no connections", peer 0.0 else: let @@ -666,6 +668,7 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 = ipPeers = g.peersInIP.getOrDefault(address) len = ipPeers.len.float64 if len > g.parameters.ipColocationFactorThreshold: + debug "colocationFactor over threshold", peer, address, len let over = len - g.parameters.ipColocationFactorThreshold over * over else: @@ -697,42 +700,44 @@ proc updateScores(g: GossipSub) = # avoid async var info = stats.topicInfos.getOrDefault(topic) inc n_topics - # Scoring - var topicScore = 0'f64 + # if weight is 0.0 avoid wasting time + if topicParams.topicWeight != 0.0: + # Scoring + var topicScore = 0'f64 - if info.inMesh: - inc is_grafted - info.meshTime = now - info.graftTime - if info.meshTime > topicParams.meshMessageDeliveriesActivation: - info.meshMessageDeliveriesActive = true + if info.inMesh: + inc is_grafted + info.meshTime = now - info.graftTime + if info.meshTime > topicParams.meshMessageDeliveriesActivation: + info.meshMessageDeliveriesActive = true - var p1 = info.meshTime / topicParams.timeInMeshQuantum - if p1 > topicParams.timeInMeshCap: - p1 = topicParams.timeInMeshCap - trace "p1", peer, p1 - topicScore += p1 * topicParams.timeInMeshWeight - else: - info.meshMessageDeliveriesActive = false + var p1 = info.meshTime / topicParams.timeInMeshQuantum + if p1 > topicParams.timeInMeshCap: + p1 = topicParams.timeInMeshCap + trace "p1", peer, p1, topic, topicScore + topicScore += p1 * topicParams.timeInMeshWeight + else: + info.meshMessageDeliveriesActive = false - topicScore += info.firstMessageDeliveries * topicParams.firstMessageDeliveriesWeight - trace "p2", peer, p2 = info.firstMessageDeliveries + topicScore += info.firstMessageDeliveries * topicParams.firstMessageDeliveriesWeight + trace "p2", peer, p2 = info.firstMessageDeliveries, topic, topicScore - if info.meshMessageDeliveriesActive: - if info.meshMessageDeliveries < topicParams.meshMessageDeliveriesThreshold: - let deficit = topicParams.meshMessageDeliveriesThreshold - info.meshMessageDeliveries - let p3 = deficit * deficit - trace "p3", peer, p3 - topicScore += p3 * topicParams.meshMessageDeliveriesWeight + if info.meshMessageDeliveriesActive: + if info.meshMessageDeliveries < topicParams.meshMessageDeliveriesThreshold: + let deficit = topicParams.meshMessageDeliveriesThreshold - info.meshMessageDeliveries + let p3 = deficit * deficit + trace "p3", peer, p3, topic, topicScore + topicScore += p3 * topicParams.meshMessageDeliveriesWeight - topicScore += info.meshFailurePenalty * topicParams.meshFailurePenaltyWeight - trace "p3b", peer, p3b = info.meshFailurePenalty + topicScore += info.meshFailurePenalty * topicParams.meshFailurePenaltyWeight + trace "p3b", peer, p3b = info.meshFailurePenalty, topic, topicScore - topicScore += info.invalidMessageDeliveries * info.invalidMessageDeliveries * topicParams.invalidMessageDeliveriesWeight - trace "p4", p4 = info.invalidMessageDeliveries * info.invalidMessageDeliveries + topicScore += info.invalidMessageDeliveries * info.invalidMessageDeliveries * topicParams.invalidMessageDeliveriesWeight + trace "p4", p4 = info.invalidMessageDeliveries * info.invalidMessageDeliveries, topic, topicScore - trace "updated peer topic's scores", peer, topic, info, topicScore + trace "updated peer topic's scores", peer, topic, info, topicScore - peer.score += topicScore * topicParams.topicWeight + peer.score += topicScore * topicParams.topicWeight # Score decay info.firstMessageDeliveries *= topicParams.firstMessageDeliveriesDecay @@ -811,6 +816,7 @@ proc heartbeat(g: GossipSub) {.async.} = var prunes: seq[PubSubPeer] for peer in meshPeers: if peer.score < 0.0: + trace "pruning negative score peer", peer, score = peer.score g.pruned(peer, t) g.mesh.removePeer(t, peer) prunes &= peer @@ -821,7 +827,7 @@ proc heartbeat(g: GossipSub) {.async.} = backoff: g.parameters.pruneBackoff.seconds.uint64)]))) g.broadcast(prunes, prune) - await g.rebalanceMesh(t) + g.rebalanceMesh(t) libp2p_gossipsub_peers_mesh_sum.set(totalMeshPeers.int64) libp2p_gossipsub_peers_gossipsub_sum.set(totalGossipPeers.int64) @@ -834,7 +840,7 @@ proc heartbeat(g: GossipSub) {.async.} = let peers = g.getGossipPeers() for peer, control in peers: - g.peers.withValue(peer.peerId, pubsubPeer) do: + g.peers.withValue(peer.peerId, pubsubPeer): g.send( pubsubPeer[], RPCMsg(control: some(control))) @@ -864,7 +870,7 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) = # remove from peer IPs collection too if pubSubPeer.connections.len > 0: - g.peersInIP.withValue(pubSubPeer.connections[0].observedAddr, s) do: + g.peersInIP.withValue(pubSubPeer.connections[0].observedAddr, s): s[].excl(pubSubPeer) for t in toSeq(g.gossipsub.keys): @@ -877,6 +883,7 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) = .set(g.gossipsub.peers(t).int64, labelValues = [t]) for t in toSeq(g.mesh.keys): + trace "pruning unsubscribing peer", pubSubPeer, score = pubSubPeer.score g.pruned(pubSubPeer, t) g.mesh.removePeer(t, pubSubPeer) @@ -891,8 +898,9 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) = libp2p_gossipsub_peers_per_topic_fanout .set(g.fanout.peers(t).int64, labelValues = [t]) - g.peerStats[pubSubPeer].expire = Moment.now() + g.parameters.retainScore - for topic, info in g.peerStats[pubSubPeer].topicInfos.mpairs: + g.peerStats.withValue(pubSubPeer, stats): + stats[].expire = Moment.now() + g.parameters.retainScore + for topic, info in stats[].topicInfos.mpairs: info.firstMessageDeliveries = 0 procCall FloodSub(g).unsubscribePeer(peer) @@ -942,9 +950,18 @@ proc punishPeer(g: GossipSub, peer: PubSubPeer, topics: seq[string]) = # ensure we init a new topic if unknown let _ = g.topicParams.mgetOrPut(t, TopicParams.init()) # update stats - var tstats = g.peerStats[peer].topicInfos.getOrDefault(t) - tstats.invalidMessageDeliveries += 1 - g.peerStats[peer].topicInfos[t] = tstats + g.peerStats.withValue(peer, stats): + stats[].topicInfos.withValue(t, tstats): + tstats[].invalidMessageDeliveries += 1 + do: # if we have no stats populate! + stats[].topicInfos[t] = TopicInfo(invalidMessageDeliveries: 1) + do: # if we have no stats populate! + g.peerStats[peer] = + block: + var stats = PeerStats() + stats.topicInfos[t] = TopicInfo(invalidMessageDeliveries: 1) + stats + proc handleGraft(g: GossipSub, peer: PubSubPeer, @@ -1007,6 +1024,7 @@ proc handleGraft(g: GossipSub, else: trace "peer already in mesh" else: + trace "pruning grafting peer, mesh full", peer, score = peer.score, mesh = g.mesh.peers(topic) result.add(ControlPrune( topicID: topic, peers: g.peerExchangeList(topic), @@ -1032,6 +1050,7 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = if backoff > current: g.backingOff[peer.peerId] = backoff + trace "pruning rpc received peer", peer, score = peer.score g.pruned(peer, prune.topicID) g.mesh.removePeer(prune.topicID, peer) @@ -1104,19 +1123,26 @@ method rpcHandler*(g: GossipSub, for t in msg.topicIDs: # for every topic in the message let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) # if in mesh add more delivery score - var stats = g.peerStats[peer].topicInfos.getOrDefault(t) - if stats.inMesh: - stats.meshMessageDeliveries += 1 - if stats.meshMessageDeliveries > topicParams.meshMessageDeliveriesCap: - stats.meshMessageDeliveries = topicParams.meshMessageDeliveriesCap - - # commit back to the table - g.peerStats[peer].topicInfos[t] = stats - + g.peerStats.withValue(peer, pstats): + pstats[].topicInfos.withValue(t, stats): + if stats[].inMesh: + # TODO: take into account meshMessageDeliveriesWindow + # score only if messages are not too old. + stats[].meshMessageDeliveries += 1 + if stats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap: + stats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap + do: # make sure we don't loose this information + pstats[].topicInfos[t] = TopicInfo(meshMessageDeliveries: 1) + do: # make sure we don't loose this information + g.peerStats[peer] = + block: + var stats = PeerStats() + stats.topicInfos[t] = TopicInfo(meshMessageDeliveries: 1) + stats + + # onto the next message continue - g.mcache.put(msgId, msg) - if (msg.signature.len > 0 or g.verifySignature) and not msg.verify(): # always validate if signature is present or required debug "Dropping message due to failed signature verification", msgId, peer @@ -1144,24 +1170,33 @@ method rpcHandler*(g: GossipSub, of ValidationResult.Accept: discard + # store in cache only after validation + g.mcache.put(msgId, msg) + var toSendPeers = initHashSet[PubSubPeer]() for t in msg.topicIDs: # for every topic in the message let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) - # contribute to peer score first delivery - var stats = g.peerStats[peer].topicInfos.getOrDefault(t) - stats.firstMessageDeliveries += 1 - if stats.firstMessageDeliveries > topicParams.firstMessageDeliveriesCap: - stats.firstMessageDeliveries = topicParams.firstMessageDeliveriesCap + g.peerStats.withValue(peer, pstats): + pstats[].topicInfos.withValue(t, stats): + # contribute to peer score first delivery + stats[].firstMessageDeliveries += 1 + if stats[].firstMessageDeliveries > topicParams.firstMessageDeliveriesCap: + stats[].firstMessageDeliveries = topicParams.firstMessageDeliveriesCap - # if in mesh add more delivery score - if stats.inMesh: - stats.meshMessageDeliveries += 1 - if stats.meshMessageDeliveries > topicParams.meshMessageDeliveriesCap: - stats.meshMessageDeliveries = topicParams.meshMessageDeliveriesCap - - # commit back to the table - g.peerStats[peer].topicInfos[t] = stats + # if in mesh add more delivery score + if stats[].inMesh: + stats[].meshMessageDeliveries += 1 + if stats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap: + stats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap + do: # make sure we don't loose this information + pstats[].topicInfos[t] = TopicInfo(firstMessageDeliveries: 1, meshMessageDeliveries: 1) + do: # make sure we don't loose this information + g.peerStats[peer] = + block: + var stats = PeerStats() + stats.topicInfos[t] = TopicInfo(firstMessageDeliveries: 1, meshMessageDeliveries: 1) + stats g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) @@ -1200,7 +1235,7 @@ method subscribe*(g: GossipSub, if topic in g.fanout: g.fanout.del(topic) - await g.rebalanceMesh(topic) + g.rebalanceMesh(topic) method unsubscribe*(g: GossipSub, topics: seq[TopicPair]) {.async.} = @@ -1214,6 +1249,7 @@ method unsubscribe*(g: GossipSub, g.mesh.del(topic) g.topicParams.del(topic) for peer in peers: + trace "pruning unsubscribe call peer", peer, score = peer.score g.pruned(peer, topic) let prune = RPCMsg(control: some(ControlMessage( prune: @[ControlPrune( @@ -1229,6 +1265,7 @@ method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = let peers = g.mesh.getOrDefault(topic) g.mesh.del(topic) for peer in peers: + trace "pruning unsubscribeAll call peer", peer, score = peer.score g.pruned(peer, topic) let prune = RPCMsg(control: some(ControlMessage( prune: @[ControlPrune( diff --git a/libp2p/protocols/pubsub/gossipsub10.nim b/libp2p/protocols/pubsub/gossipsub10.nim index 8908fb120..3d22d76f0 100644 --- a/libp2p/protocols/pubsub/gossipsub10.nim +++ b/libp2p/protocols/pubsub/gossipsub10.nim @@ -157,7 +157,7 @@ method onPubSubPeerEvent*(p: GossipSub, peer: PubsubPeer, event: PubSubPeerEvent procCall FloodSub(p).onPubSubPeerEvent(peer, event) -proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = +proc rebalanceMesh(g: GossipSub, topic: string) = logScope: topic mesh = g.mesh.peers(topic) @@ -277,7 +277,7 @@ proc heartbeat(g: GossipSub) {.async.} = trace "running heartbeat" for t in toSeq(g.topics.keys): - await g.rebalanceMesh(t) + g.rebalanceMesh(t) g.dropFanoutPeers() @@ -508,7 +508,7 @@ method subscribe*(g: GossipSub, topic: string, handler: TopicHandler) {.async.} = await procCall PubSub(g).subscribe(topic, handler) - await g.rebalanceMesh(topic) + g.rebalanceMesh(topic) method unsubscribe*(g: GossipSub, topics: seq[TopicPair]) {.async.} = diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 0679631b6..5805d72ed 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -55,7 +55,7 @@ suite "GossipSub internal": gossipSub.gossipsub[topic].incl(peer) check gossipSub.peers.len == 15 - await gossipSub.rebalanceMesh(topic) + gossipSub.rebalanceMesh(topic) check gossipSub.mesh[topic].len == gossipSub.parameters.d # + 2 # account opportunistic grafts await allFuturesThrowing(conns.mapIt(it.close())) @@ -82,7 +82,7 @@ suite "GossipSub internal": gossipSub.mesh[topic].incl(peer) check gossipSub.mesh[topic].len == 15 - await gossipSub.rebalanceMesh(topic) + gossipSub.rebalanceMesh(topic) check gossipSub.mesh[topic].len == gossipSub.parameters.d + gossipSub.parameters.dScore await allFuturesThrowing(conns.mapIt(it.close())) diff --git a/tests/pubsub/testgossipinternal10.nim b/tests/pubsub/testgossipinternal10.nim index 7a5a46e7c..3d684202e 100644 --- a/tests/pubsub/testgossipinternal10.nim +++ b/tests/pubsub/testgossipinternal10.nim @@ -49,7 +49,7 @@ suite "GossipSub internal": gossipSub.mesh[topic].incl(peer) check gossipSub.peers.len == 15 - await gossipSub.rebalanceMesh(topic) + gossipSub.rebalanceMesh(topic) check gossipSub.mesh[topic].len == GossipSubD await allFuturesThrowing(conns.mapIt(it.close())) @@ -74,7 +74,7 @@ suite "GossipSub internal": gossipSub.mesh[topic].incl(peer) check gossipSub.mesh[topic].len == 15 - await gossipSub.rebalanceMesh(topic) + gossipSub.rebalanceMesh(topic) check gossipSub.mesh[topic].len == GossipSubD await allFuturesThrowing(conns.mapIt(it.close())) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 723af5382..e0bf6fdbe 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -133,7 +133,7 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) - asyncTest "GossipSub validation should fail": + asyncTest "GossipSub validation should fail (reject)": proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = check false # if we get here, it should fail @@ -195,6 +195,68 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) + asyncTest "GossipSub validation should fail (ignore)": + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check false # if we get here, it should fail + + let + nodes = generateNodes(2, gossip = true) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + await subscribeNodes(nodes) + + await nodes[0].subscribe("foobar", handler) + await nodes[1].subscribe("foobar", handler) + + var subs: seq[Future[void]] + subs &= waitSub(nodes[1], nodes[0], "foobar") + subs &= waitSub(nodes[0], nodes[1], "foobar") + + await allFuturesThrowing(subs) + + let gossip1 = GossipSub(nodes[0]) + let gossip2 = GossipSub(nodes[1]) + + check: + gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout + gossip2.mesh["foobar"].len == 1 and "foobar" notin gossip2.fanout + + var validatorFut = newFuture[bool]() + proc validator(topic: string, + message: Message): + Future[ValidationResult] {.async.} = + result = ValidationResult.Ignore + validatorFut.complete(true) + + nodes[1].addValidator("foobar", validator) + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 + + check (await validatorFut) == true + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) + asyncTest "GossipSub validation one fails and one succeeds": var handlerFut = newFuture[bool]() proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =