diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 2f7cf8ddf..e56ac8c94 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -21,7 +21,7 @@ import ./pubsub, ../../peerinfo, ../../peerid, ../../utility, - ../../crypto/curve25519 + ../../switch import stew/results export results @@ -147,7 +147,9 @@ type behaviourPenaltyWeight*: float64 behaviourPenaltyDecay*: float64 - directPeers*: seq[PeerId] + directPeers*: Table[PeerId, seq[MultiAddress]] + + disconnectBadPeers*: bool GossipSub* = ref object of FloodSub mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic @@ -197,9 +199,8 @@ declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, declareCounter(libp2p_gossipsub_failed_publish, "number of failed publish") declareGauge(libp2p_gossipsub_cache_window_size, "the number of messages in the cache") -when defined(libp2p_agents_metrics): - declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"]) - +declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"]) +declareCounter(libp2p_gossipsub_bad_score_disconnection, "the number of peers disconnected by gossipsub", labels = ["agent"]) declareGauge(libp2p_gossipsub_under_dlow_topics, "number of topics below dlow") declareGauge(libp2p_gossipsub_under_dout_topics, "number of topics below dout") declareGauge(libp2p_gossipsub_under_dhigh_above_dlow_topics, "number of topics below dhigh but above dlow") @@ -236,6 +237,7 @@ proc init*(_: type[GossipSubParams]): GossipSubParams = ipColocationFactorThreshold: 1.0, behaviourPenaltyWeight: -1.0, behaviourPenaltyDecay: 0.999, + disconnectBadPeers: false ) proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = @@ -714,24 +716,56 @@ func `/`(a, b: Duration): float64 = fb = float64(b.nanoseconds) fa / fb +proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} = + when defined(libp2p_agents_metrics): + let agent = + block: + if peer.shortAgent.len > 0: + peer.shortAgent + else: + if peer.sendConn != nil: + let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii() + if KnownLibP2PAgentsSeq.contains(shortAgent): + peer.shortAgent = shortAgent + else: + peer.shortAgent = "unknown" + peer.shortAgent + else: + "unknown" + libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent]) + else: + libp2p_gossipsub_bad_score_disconnection.inc(labelValues = ["unknown"]) + + if peer.sendConn != nil: + try: + await g.switch.disconnect(peer.peerId) + except CancelledError: + raise + except CatchableError as exc: + trace "Failed to close connection", peer, error = exc.name, msg = exc.msg + proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 = - if peer.connections.len == 0: - trace "colocationFactor, no connections", peer + if peer.sendConn == nil: + trace "colocationFactor, no connection", peer 0.0 else: let - address = peer.connections[0].observedAddr - ipPeers = g.peersInIP.getOrDefault(address) + address = peer.sendConn.observedAddr + + g.peersInIP.mgetOrPut(address, initHashSet[PubSubPeer]()).incl(peer) + if address notin g.peersInIP: + g.peersInIP[address] = initHashSet[PubSubPeer]() + g.peersInIP[address].incl(peer) + + let + ipPeers = g.peersInIP[address] len = ipPeers.len.float64 + if len > g.parameters.ipColocationFactorThreshold: trace "colocationFactor over threshold", peer, address, len let over = len - g.parameters.ipColocationFactorThreshold over * over else: - # lazy update peersInIP - if address notin g.peersInIP: - g.peersInIP[address] = initHashSet[PubSubPeer]() - g.peersInIP[address].incl(peer) 0.0 proc updateScores(g: GossipSub) = # avoid async @@ -837,18 +871,18 @@ proc updateScores(g: GossipSub) = # avoid async assert(g.peerStats[peer.peerId].score == peer.score) # nim sanity check trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted + if g.parameters.disconnectBadPeers and stats.score < g.parameters.graylistThreshold: + debug "disconnecting bad score peer", peer, score = peer.score + asyncSpawn g.disconnectPeer(peer) + when defined(libp2p_agents_metrics): let agent = block: if peer.shortAgent.len > 0: peer.shortAgent else: - let connections = peer.connections.filterIt( - not isNil(it.peerInfo) and - it.peerInfo.agentVersion.len > 0 - ) - if connections.len > 0: - let shortAgent = connections[0].peerInfo.agentVersion.split("/")[0].toLowerAscii() + if peer.sendConn != nil: + let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii() if KnownLibP2PAgentsSeq.contains(shortAgent): peer.shortAgent = shortAgent else: @@ -857,6 +891,8 @@ proc updateScores(g: GossipSub) = # avoid async else: "unknown" libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent]) + else: + libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = ["unknown"]) for peer in evicting: g.peerStats.del(peer) @@ -954,9 +990,11 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) = return # remove from peer IPs collection too - if pubSubPeer.connections.len > 0: - g.peersInIP.withValue(pubSubPeer.connections[0].observedAddr, s): + if pubSubPeer.sendConn != nil: + g.peersInIP.withValue(pubSubPeer.sendConn.observedAddr, s): s[].excl(pubSubPeer) + if s[].len == 0: + g.peersInIP.del(pubSubPeer.sendConn.observedAddr) for t in toSeq(g.gossipsub.keys): g.gossipsub.removePeer(t, pubSubPeer) @@ -1472,13 +1510,20 @@ method publish*(g: GossipSub, proc maintainDirectPeers(g: GossipSub) {.async.} = while g.heartbeatRunning: - for id in g.parameters.directPeers: + for id, addrs in g.parameters.directPeers: let peer = g.peers.getOrDefault(id) - if peer == nil: - # this creates a new peer and assigns the current switch to it - # as a result the next time we try to Send we will as well try to open a connection - # see pubsubpeer.nim send and such - discard g.getOrCreatePeer(id, g.codecs) + if isNil(peer): + trace "Attempting to dial a direct peer", peer = id + try: + # dial, internally connection will be stored + let _ = await g.switch.dial(id, addrs, g.codecs) + # populate the peer after it's connected + discard g.getOrCreatePeer(id, g.codecs) + except CancelledError: + trace "Direct peer dial canceled" + raise + except CatchableError as exc: + debug "Direct peer error dialing", msg = exc.msg await sleepAsync(1.minutes) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index b1c925ebd..0ef2013f9 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -48,7 +48,6 @@ type onEvent*: OnEvent # Connectivity updates for peer codec*: string # the protocol that this peer joined from sendConn*: Connection # cached send connection - connections*: seq[Connection] # connections to this peer peerId*: PeerID handler*: RPCHandler observers*: ref seq[PubSubObserver] # ref as in smart_ptr @@ -59,7 +58,7 @@ type outbound*: bool # if this is an outbound connection appScore*: float64 # application specific score behaviourPenalty*: float64 # the eventual penalty score - + when defined(libp2p_agents_metrics): shortAgent*: string diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 89c434d27..7e8914993 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -57,7 +57,7 @@ type Switch* = ref object of RootObj peerInfo*: PeerInfo - connManager: ConnManager + connManager*: ConnManager transports*: seq[Transport] protocols*: seq[LPProtocol] muxers*: Table[string, MuxerProvider] @@ -244,7 +244,9 @@ proc upgradeIncoming(s: Switch, incomingConn: Connection) {.async, gcsafe.} = # await ms.handle(cconn) except CatchableError as exc: debug "Exception in secure handler during incoming upgrade", msg = exc.msg, conn - if not cconn.upgraded.finished: + if not isNil(cconn) and + not isNil(cconn.upgraded) and + not(cconn.upgraded.finished): cconn.upgraded.fail(exc) finally: if not isNil(cconn): @@ -263,10 +265,13 @@ proc upgradeIncoming(s: Switch, incomingConn: Connection) {.async, gcsafe.} = # await ms.handle(incomingConn, active = true) except CatchableError as exc: debug "Exception upgrading incoming", exc = exc.msg - if not incomingConn.upgraded.finished: + if not isNil(incomingConn) and + not isNil(incomingConn.upgraded) and + not(incomingConn.upgraded.finished): incomingConn.upgraded.fail(exc) finally: - await incomingConn.close() + if not isNil(incomingConn): + await incomingConn.close() proc dialAndUpgrade(s: Switch, peerId: PeerID, diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 6ee772934..810a9f847 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -9,6 +9,7 @@ import ../../libp2p/standard_setup import ../../libp2p/errors import ../../libp2p/crypto/crypto import ../../libp2p/stream/bufferstream +import ../../libp2p/switch import ../helpers @@ -422,9 +423,6 @@ suite "GossipSub internal": check false let topic = "foobar" - # gossipSub.topicParams[topic] = TopicParams.init() - # gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - # gossipSub.fanout[topic] = initHashSet[PubSubPeer]() var conns = newSeq[Connection]() for i in 0..<30: let conn = newBufferStream(noop) @@ -451,3 +449,39 @@ suite "GossipSub internal": await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() + + asyncTest "Disconnect bad peers": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + gossipSub.parameters.disconnectBadPeers = true + + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + check false + + let topic = "foobar" + var conns = newSeq[Connection]() + for i in 0..<30: + let conn = newBufferStream(noop) + conns &= conn + let peerInfo = randomPeerInfo() + conn.peerInfo = peerInfo + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + gossipSub.onNewPeer(peer) + peer.sendConn = conn + peer.handler = handler + peer.score = gossipSub.parameters.graylistThreshold - 1 + gossipSub.gossipsub.mgetOrPut(topic, initHashSet[PubSubPeer]()).incl(peer) + gossipSub.peers[peerInfo.peerId] = peer + gossipSub.switch.connManager.storeIncoming(conn) + + gossipSub.updateScores() + + await sleepAsync(100.millis) + + check: + # test our disconnect mechanics + gossipSub.gossipsub.peers(topic) == 0 + # also ensure we cleanup properly the peersInIP table + gossipSub.peersInIP.len == 0 + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 921f25f9b..5748d9db9 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -716,3 +716,60 @@ suite "GossipSub": ) await allFuturesThrowing(nodesFut.concat()) + + asyncTest "GossipSub test directPeers": + var handlerFut = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + handlerFut.complete(true) + + let + nodes = generateNodes(2, gossip = true) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + var gossip = GossipSub(nodes[0]) + gossip.parameters.directPeers[nodes[1].switch.peerInfo.peerId] = nodes[1].switch.peerInfo.addrs + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + let invalidDetected = newFuture[void]() + gossip.subscriptionValidator = + proc(topic: string): bool = + if topic == "foobar": + try: + invalidDetected.complete() + except: + raise newException(Defect, "Exception during subscriptionValidator") + false + else: + true + + # DO NOT SUBSCRIBE, CONNECTION SHOULD HAPPEN + ### await subscribeNodes(nodes) + + nodes[0].subscribe("foobar", handler) + nodes[1].subscribe("foobar", handler) + + await invalidDetected.wait(10.seconds) + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat())