diff --git a/.appveyor.yml b/.appveyor.yml index 251d4be..0e604b0 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -39,7 +39,7 @@ install: - CD .. # install and build go-libp2p-daemon - - bash scripts/build_p2pd.sh p2pdCache v0.2.4 + - bash scripts/build_p2pd.sh p2pdCache v0.3.0 build_script: - nimble install -y --depsOnly diff --git a/.gitignore b/.gitignore index cadf353..2ca1f19 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,5 @@ build/ *.exe *.dll .vscode/ +.DS_Store +tests/pubsub/testgossipsub diff --git a/.travis.yml b/.travis.yml index 8fc1f77..da8517f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -45,7 +45,7 @@ install: - export PATH="$PWD/Nim/bin:$GOPATH/bin:$PATH" # install and build go-libp2p-daemon - - bash scripts/build_p2pd.sh p2pdCache v0.2.4 + - bash scripts/build_p2pd.sh p2pdCache v0.3.0 script: - nimble install -y --depsOnly diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 3c1eedd..3ffad46 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -112,7 +112,7 @@ steps: echo "PATH=${PATH}" # we can't seem to be able to build a 32-bit p2pd - env PATH="/c/custom/mingw64/bin:${PATH}" bash scripts/build_p2pd.sh p2pdCache v0.2.4 + env PATH="/c/custom/mingw64/bin:${PATH}" bash scripts/build_p2pd.sh p2pdCache v0.3.0 # install dependencies nimble refresh diff --git a/examples/directchat.nim b/examples/directchat.nim index 200fbaf..dc48aa5 100644 --- a/examples/directchat.nim +++ b/examples/directchat.nim @@ -124,7 +124,7 @@ proc readWriteLoop(p: ChatProto) {.async.} = asyncCheck p.readAndPrint() proc newChatProto(switch: Switch, transp: StreamTransport): ChatProto = - var chatproto = ChatProto(switch: switch, transp: transp, codec: ChatCodec) + var chatproto = ChatProto(switch: switch, transp: transp, codecs: @[ChatCodec]) # create handler for incoming connection proc handle(stream: Connection, proto: string) {.async.} = diff --git a/libp2p.nimble b/libp2p.nimble index 4b42427..c21ebc7 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -47,8 +47,12 @@ task testinterop, "Runs interop tests": runTest("testinterop") task testpubsub, "Runs pubsub tests": + runTest("pubsub/testgossipinternal", sign = false, verify = false, moreoptions = "-d:pubsub_internal_testing") runTest("pubsub/testpubsub") runTest("pubsub/testpubsub", sign = false, verify = false) + runTest("pubsub/testgossipinternal10", sign = false, verify = false, moreoptions = "-d:pubsub_internal_testing") + runTest("pubsub/testpubsub", moreoptions = "-d:fallback_gossipsub_10") + runTest("pubsub/testpubsub", sign = false, verify = false, moreoptions = "-d:fallback_gossipsub_10") task testfilter, "Run PKI filter test": runTest("testpkifilter", diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index 8aee778..8784d93 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -8,8 +8,8 @@ ## those terms. ## This module implementes API for `go-libp2p-daemon`. -import os, osproc, strutils, tables, strtabs -import chronos +import std/[os, osproc, strutils, tables, strtabs] +import chronos, chronicles import ../varint, ../multiaddress, ../multicodec, ../cid, ../peerid import ../wire, ../multihash, ../protobuf/minprotobuf import ../crypto/crypto @@ -737,10 +737,12 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, opt.add $address args.add(opt) args.add("-noise=true") + args.add("-quic=false") args.add("-listen=" & $api.address) # We are trying to get absolute daemon path. 
let cmd = findExe(daemon) + trace "p2pd cmd", cmd, args if len(cmd) == 0: raise newException(DaemonLocalError, "Could not find daemon executable!") diff --git a/libp2p/multiaddress.nim b/libp2p/multiaddress.nim index f07c2a2..859c119 100644 --- a/libp2p/multiaddress.nim +++ b/libp2p/multiaddress.nim @@ -11,7 +11,7 @@ {.push raises: [Defect].} -import nativesockets +import nativesockets, hashes import tables, strutils, stew/shims/net import chronos import multicodec, multihash, multibase, transcoder, vbuffer, peerid, @@ -56,6 +56,13 @@ const IPPROTO_TCP = Protocol.IPPROTO_TCP IPPROTO_UDP = Protocol.IPPROTO_UDP +proc hash*(a: MultiAddress): Hash = + var h: Hash = 0 + h = h !& hash(a.data.buffer) + h = h !& hash(a.data.offset) + h = h !& hash(a.data.length) + !$h + proc ip4StB(s: string, vb: var VBuffer): bool = ## IPv4 stringToBuffer() implementation. try: diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index 525ffe9..baa46f5 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import strutils +import std/[strutils] import chronos, chronicles, stew/byteutils import stream/connection, vbuffer, @@ -28,7 +28,7 @@ type Matcher* = proc (proto: string): bool {.gcsafe.} HandlerHolder* = object - proto*: string + protos*: seq[string] protocol*: LPProtocol match*: Matcher @@ -147,7 +147,8 @@ proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.asy trace "handle: listing protos", conn var protos = "" for h in m.handlers: - protos &= (h.proto & "\n") + for proto in h.protos: + protos &= (proto & "\n") await conn.writeLp(protos) of Codec: if not handshaked: @@ -159,9 +160,9 @@ proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.asy await conn.write(Na) else: for h in m.handlers: - if (not isNil(h.match) and h.match(ms)) or ms == h.proto: + if (not isNil(h.match) and h.match(ms)) or h.protos.contains(ms): trace "found handler", conn, protocol = ms - await conn.writeLp((h.proto & "\n")) + await conn.writeLp(ms & "\n") await h.protocol.handler(conn, ms) return debug "no handlers", conn, protocol = ms @@ -175,27 +176,31 @@ proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.asy trace "Stopped multistream handler", conn +proc addHandler*(m: MultistreamSelect, + codecs: seq[string], + protocol: LPProtocol, + matcher: Matcher = nil) = + trace "registering protocols", protos = codecs + m.handlers.add(HandlerHolder(protos: codecs, + protocol: protocol, + match: matcher)) + proc addHandler*(m: MultistreamSelect, codec: string, protocol: LPProtocol, matcher: Matcher = nil) = - ## register a protocol - trace "registering protocol", codec = codec - m.handlers.add(HandlerHolder(proto: codec, - protocol: protocol, - match: matcher)) + addHandler(m, @[codec], protocol, matcher) proc addHandler*(m: MultistreamSelect, codec: string, handler: LPProtoHandler, matcher: Matcher = nil) = ## helper to allow registering pure handlers - - trace "registering proto handler", codec = codec + trace "registering proto handler", proto = codec let protocol = new LPProtocol protocol.codec = codec protocol.handler = handler - m.handlers.add(HandlerHolder(proto: codec, + m.handlers.add(HandlerHolder(protos: @[codec], protocol: protocol, match: matcher)) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index f5b18b3..850834b 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ 
-185,7 +185,7 @@ method handle*(m: Mplex) {.async, gcsafe.} = await channel.reset() except CancelledError: # This procedure is spawned as task and it is not part of public API, so - # there no way for this procedure to be cancelled implicitely. + # there no way for this procedure to be cancelled implicitly. debug "Unexpected cancellation in mplex handler", m except LPStreamEOFError as exc: trace "Stream EOF", m, msg = exc.msg diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index 3c7ce23..8a5aa27 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -87,7 +87,7 @@ proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] = iinfo.protoVersion = some(protoVersion) if r6.get(): iinfo.agentVersion = some(agentVersion) - trace "decodeMsg: decoded message", pubkey = ($pubKey).shortLog, + debug "decodeMsg: decoded message", pubkey = ($pubKey).shortLog, addresses = $iinfo.addrs, protocols = $iinfo.protos, observable_address = $iinfo.observedAddr, proto_version = $iinfo.protoVersion, diff --git a/libp2p/protocols/protocol.nim b/libp2p/protocols/protocol.nim index 30e1c36..79badf3 100644 --- a/libp2p/protocols/protocol.nim +++ b/libp2p/protocols/protocol.nim @@ -16,7 +16,16 @@ type Future[void] {.gcsafe, closure.} LPProtocol* = ref object of RootObj - codec*: string + codecs*: seq[string] handler*: LPProtoHandler ## this handler gets invoked by the protocol negotiator method init*(p: LPProtocol) {.base, gcsafe.} = discard + +func codec*(p: LPProtocol): string = + assert(p.codecs.len > 0, "Codecs sequence was empty!") + p.codecs[0] + +func `codec=`*(p: LPProtocol, codec: string) = + # always insert as first codec + # if we use this abstraction + p.codecs.insert(codec, 0) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index af6fe77..c54181e 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. 
-import std/[options, random, sequtils, sets, tables] +import std/[tables, sets, options, sequtils, random, algorithm] import chronos, chronicles, metrics import ./pubsub, ./floodsub, @@ -21,33 +21,119 @@ import ./pubsub, ../../peerinfo, ../../peerid, ../../utility +import stew/results +export results logScope: topics = "gossipsub" -const GossipSubCodec* = "/meshsub/1.0.0" +const + GossipSubCodec* = "/meshsub/1.1.0" + GossipSubCodec_10* = "/meshsub/1.0.0" # overlay parameters -const GossipSubD* = 6 -const GossipSubDlo* = 4 -const GossipSubDhi* = 12 +const + GossipSubD* = 6 + GossipSubDlo* = 4 + GossipSubDhi* = 12 # gossip parameters -const GossipSubHistoryLength* = 5 -const GossipSubHistoryGossip* = 3 +const + GossipSubHistoryLength* = 5 + GossipSubHistoryGossip* = 3 + GossipBackoffPeriod* = 1.minutes # heartbeat interval -const GossipSubHeartbeatInitialDelay* = 100.millis -const GossipSubHeartbeatInterval* = 1.seconds +const + GossipSubHeartbeatInitialDelay* = 100.millis + GossipSubHeartbeatInterval* = 1.seconds # fanout ttl -const GossipSubFanoutTTL* = 1.minutes +const + GossipSubFanoutTTL* = 1.minutes + +const + BackoffSlackTime = 2 # seconds + IWantPeerBudget = 25 # 25 messages per second ( reset every heartbeat ) + IHavePeerBudget = 10 type + TopicInfo* = object + # gossip 1.1 related + graftTime: Moment + meshTime: Duration + inMesh: bool + meshMessageDeliveriesActive: bool + firstMessageDeliveries: float64 + meshMessageDeliveries: float64 + meshFailurePenalty: float64 + invalidMessageDeliveries: float64 + + TopicParams* = object + topicWeight*: float64 + + # p1 + timeInMeshWeight*: float64 + timeInMeshQuantum*: Duration + timeInMeshCap*: float64 + + # p2 + firstMessageDeliveriesWeight*: float64 + firstMessageDeliveriesDecay*: float64 + firstMessageDeliveriesCap*: float64 + + # p3 + meshMessageDeliveriesWeight*: float64 + meshMessageDeliveriesDecay*: float64 + meshMessageDeliveriesThreshold*: float64 + meshMessageDeliveriesCap*: float64 + meshMessageDeliveriesActivation*: Duration + meshMessageDeliveriesWindow*: Duration + + # p3b + meshFailurePenaltyWeight*: float64 + meshFailurePenaltyDecay*: float64 + + # p4 + invalidMessageDeliveriesWeight*: float64 + invalidMessageDeliveriesDecay*: float64 + + PeerStats* = object + topicInfos*: Table[string, TopicInfo] + expire*: Moment # updated on disconnect, to retain scores until expire + + GossipSubParams* = object + explicit: bool + pruneBackoff*: Duration + floodPublish*: bool + gossipFactor*: float64 + dScore*: int + dOut*: int + dLazy*: int + + gossipThreshold*: float64 + publishThreshold*: float64 + graylistThreshold*: float64 + acceptPXThreshold*: float64 + opportunisticGraftThreshold*: float64 + decayInterval*: Duration + decayToZero*: float64 + retainScore*: Duration + + appSpecificWeight*: float64 + ipColocationFactorWeight*: float64 + ipColocationFactorThreshold*: float64 + behaviourPenaltyWeight*: float64 + behaviourPenaltyDecay*: float64 + + directPeers*: seq[PeerId] + GossipSub* = ref object of FloodSub mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic gossipsub*: PeerTable # peers that are subscribed to a topic + explicit*: PeerTable # directpeers that we keep alive explicitly + backingOff*: Table[PeerID, Moment] # explicit (always connected/forward) peers lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics gossip*: Table[string, seq[ControlIHave]] # pending gossip control*: 
Table[string, ControlMessage] # pending control messages @@ -55,6 +141,17 @@ type heartbeatFut: Future[void] # cancellation future for heartbeat interval heartbeatRunning: bool + peerStats: Table[PubSubPeer, PeerStats] + parameters*: GossipSubParams + topicParams*: Table[string, TopicParams] + directPeersLoop: Future[void] + peersInIP: Table[MultiAddress, HashSet[PubSubPeer]] + + heartbeatEvents*: seq[AsyncEvent] + + when not defined(release): + prunedPeers: HashSet[PubSubPeer] + when defined(libp2p_expensive_metrics): declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", @@ -68,6 +165,103 @@ when defined(libp2p_expensive_metrics): "gossipsub peers per topic in gossipsub", labels = ["topic"]) +proc init*(_: type[GossipSubParams]): GossipSubParams = + GossipSubParams( + explicit: true, + pruneBackoff: 1.minutes, + floodPublish: true, + gossipFactor: 0.25, + dScore: 4, + dOut: GossipSubDlo - 1, + dLazy: GossipSubD, + gossipThreshold: -10, + publishThreshold: -100, + graylistThreshold: -10000, + opportunisticGraftThreshold: 1, + decayInterval: 1.seconds, + decayToZero: 0.01, + retainScore: 10.seconds, + appSpecificWeight: 0.0, + ipColocationFactorWeight: 0.0, + ipColocationFactorThreshold: 1.0, + behaviourPenaltyWeight: -1.0, + behaviourPenaltyDecay: 0.999, + ) + +proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = + if (parameters.dOut >= GossipSubDlo) or + (parameters.dOut > (GossipSubD div 2)): + err("gossipsub: dOut parameter error, Number of outbound connections to keep in the mesh. Must be less than D_lo and at most D/2") + elif parameters.gossipThreshold >= 0: + err("gossipsub: gossipThreshold parameter error, Must be < 0") + elif parameters.publishThreshold >= parameters.gossipThreshold: + err("gossipsub: publishThreshold parameter error, Must be < gossipThreshold") + elif parameters.graylistThreshold >= parameters.publishThreshold: + err("gossipsub: graylistThreshold parameter error, Must be < publishThreshold") + elif parameters.acceptPXThreshold < 0: + err("gossipsub: acceptPXThreshold parameter error, Must be >= 0") + elif parameters.opportunisticGraftThreshold < 0: + err("gossipsub: opportunisticGraftThreshold parameter error, Must be >= 0") + elif parameters.decayToZero > 0.5 or parameters.decayToZero <= 0.0: + err("gossipsub: decayToZero parameter error, Should be close to 0.0") + elif parameters.appSpecificWeight < 0: + err("gossipsub: appSpecificWeight parameter error, Must be positive") + elif parameters.ipColocationFactorWeight > 0: + err("gossipsub: ipColocationFactorWeight parameter error, Must be negative or 0") + elif parameters.ipColocationFactorThreshold < 1.0: + err("gossipsub: ipColocationFactorThreshold parameter error, Must be at least 1") + elif parameters.behaviourPenaltyWeight >= 0: + err("gossipsub: behaviourPenaltyWeight parameter error, Must be negative") + elif parameters.behaviourPenaltyDecay < 0 or parameters.behaviourPenaltyDecay >= 1: + err("gossipsub: behaviourPenaltyDecay parameter error, Must be between 0 and 1") + else: + ok() + +proc init*(_: type[TopicParams]): TopicParams = + TopicParams( + topicWeight: 0.0, # disable score + timeInMeshWeight: 0.01, + timeInMeshQuantum: 1.seconds, + timeInMeshCap: 10.0, + firstMessageDeliveriesWeight: 1.0, + firstMessageDeliveriesDecay: 0.5, + firstMessageDeliveriesCap: 10.0, + meshMessageDeliveriesWeight: -1.0, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesCap: 10, + meshMessageDeliveriesThreshold: 1, + meshMessageDeliveriesWindow: 
5.milliseconds, + meshMessageDeliveriesActivation: 10.seconds, + meshFailurePenaltyWeight: -1.0, + meshFailurePenaltyDecay: 0.5, + invalidMessageDeliveriesWeight: -1.0, + invalidMessageDeliveriesDecay: 0.5 + ) + +proc validateParameters*(parameters: TopicParams): Result[void, cstring] = + if parameters.timeInMeshWeight <= 0.0 or parameters.timeInMeshWeight > 1.0: + err("gossipsub: timeInMeshWeight parameter error, Must be a small positive value") + elif parameters.timeInMeshCap <= 0.0: + err("gossipsub: timeInMeshCap parameter error, Should be a positive value") + elif parameters.firstMessageDeliveriesWeight <= 0.0: + err("gossipsub: firstMessageDeliveriesWeight parameter error, Should be a positive value") + elif parameters.meshMessageDeliveriesWeight >= 0.0: + err("gossipsub: meshMessageDeliveriesWeight parameter error, Should be a negative value") + elif parameters.meshMessageDeliveriesThreshold <= 0.0: + err("gossipsub: meshMessageDeliveriesThreshold parameter error, Should be a positive value") + elif parameters.meshMessageDeliveriesCap < parameters.meshMessageDeliveriesThreshold: + err("gossipsub: meshMessageDeliveriesCap parameter error, Should be >= meshMessageDeliveriesThreshold") + elif parameters.meshMessageDeliveriesWindow > 100.milliseconds: + err("gossipsub: meshMessageDeliveriesWindow parameter error, Should be small, 1-5ms") + elif parameters.meshFailurePenaltyWeight >= 0.0: + err("gossipsub: meshFailurePenaltyWeight parameter error, Should be a negative value") + elif parameters.invalidMessageDeliveriesWeight >= 0.0: + err("gossipsub: invalidMessageDeliveriesWeight parameter error, Should be a negative value") + else: + ok() + +func byScore(x,y: PubSubPeer): int = (x.score - y.score).int + method init*(g: GossipSub) = proc handler(conn: Connection, proto: string) {.async.} = ## main protocol handler that gets triggered on every @@ -84,7 +278,67 @@ method init*(g: GossipSub) = trace "GossipSub handler leaks an error", exc = exc.msg, conn g.handler = handler - g.codec = GossipSubCodec + g.codecs &= GossipSubCodec + g.codecs &= GossipSubCodec_10 + +method onNewPeer(g: GossipSub, peer: PubSubPeer) = + if peer notin g.peerStats: + # new peer + g.peerStats[peer] = PeerStats() + peer.iWantBudget = IWantPeerBudget + peer.iHaveBudget = IHavePeerBudget + return + else: + # we knew this peer + discard + +proc grafted(g: GossipSub, p: PubSubPeer, topic: string) = + g.peerStats.withValue(p, stats) do: + var info = stats.topicInfos.getOrDefault(topic) + info.graftTime = Moment.now() + info.meshTime = 0.seconds + info.inMesh = true + info.meshMessageDeliveriesActive = false + + # mgetOrPut does not work, so we gotta do this without referencing + stats.topicInfos[topic] = info + assert(g.peerStats[p].topicInfos[topic].inMesh == true) + + trace "grafted", p + do: + g.onNewPeer(p) + g.grafted(p, topic) + +proc pruned(g: GossipSub, p: PubSubPeer, topic: string) = + g.peerStats.withValue(p, stats) do: + when not defined(release): + g.prunedPeers.incl(p) + + if topic in stats.topicInfos: + var info = stats.topicInfos[topic] + let topicParams = g.topicParams.mgetOrPut(topic, TopicParams.init()) + + # penalize a peer that delivered no message + let threshold = topicParams.meshMessageDeliveriesThreshold + if info.inMesh and info.meshMessageDeliveriesActive and info.meshMessageDeliveries < threshold: + let deficit = threshold - info.meshMessageDeliveries + info.meshFailurePenalty += deficit * deficit + + info.inMesh = false + + # mgetOrPut does not work, so we gotta do this without referencing + 
stats.topicInfos[topic] = info
+
+    trace "pruned", p
+
+proc peerExchangeList(g: GossipSub, topic: string): seq[PeerInfoMsg] =
+  var peers = g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()).toSeq()
+  peers.keepIf do (x: PubSubPeer) -> bool:
+    x.score >= 0.0
+  # by spec, larger than Dhi, but let's put some hard caps
+  peers.setLen(min(peers.len, GossipSubDhi * 2))
+  peers.map do (x: PubSubPeer) -> PeerInfoMsg:
+    PeerInfoMsg(peerID: x.peerId.getBytes())
 
 proc replenishFanout(g: GossipSub, topic: string) =
   ## get fanout peers for a topic
@@ -116,9 +370,10 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
   # create a mesh topic that we're subscribing to
 
   var
-    grafts, prunes: seq[PubSubPeer]
+    grafts, prunes, grafting: seq[PubSubPeer]
 
-  if g.mesh.peers(topic) < GossipSubDlo:
+  let npeers = g.mesh.peers(topic)
+  if npeers < GossipSubDlo:
     trace "replenishing mesh", peers = g.mesh.peers(topic)
     # replenish the mesh if we're below Dlo
     grafts = toSeq(
@@ -126,27 +381,142 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
       g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
     )
 
+    grafts.keepIf do (x: PubSubPeer) -> bool:
+      # avoid negative score peers
+      x.score >= 0.0 and
+      # don't pick explicit peers
+      x.peerId notin g.parameters.directPeers and
+      # and avoid peers we are backing off
+      x.peerId notin g.backingOff
+
+    # shuffle anyway, score might be not used
     shuffle(grafts)
 
+    # sort peers by score
+    grafts.sort(byScore)
+
     # Graft peers so we reach a count of D
     grafts.setLen(min(grafts.len, GossipSubD - g.mesh.peers(topic)))
 
     trace "grafting", grafts = grafts.len
+
+    for peer in grafts:
+      if g.mesh.addPeer(topic, peer):
+        g.grafted(peer, topic)
+        g.fanout.removePeer(topic, peer)
+        grafting &= peer
+
+  elif npeers < g.parameters.dOut:
+    trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic)
+    # replenish the mesh if we're below dOut
+    grafts = toSeq(
+      g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
+      g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
+    )
+
+    grafts.keepIf do (x: PubSubPeer) -> bool:
+      # get only outbound ones
+      x.outbound and
+      # avoid negative score peers
+      x.score >= 0.0 and
+      # don't pick explicit peers
+      x.peerId notin g.parameters.directPeers and
+      # and avoid peers we are backing off
+      x.peerId notin g.backingOff
+
+    # shuffle anyway, score might be not used
+    shuffle(grafts)
+
+    # sort peers by score
+    grafts.sort(byScore)
+
+    # Graft peers so we reach a count of D
+    grafts.setLen(min(grafts.len, g.parameters.dOut - g.mesh.peers(topic)))
+
+    trace "grafting outbound peers", topic, peers = grafts.len
 
     for peer in grafts:
       if g.mesh.addPeer(topic, peer):
+        g.grafted(peer, topic)
         g.fanout.removePeer(topic, peer)
+        grafting &= peer
+
 
   if g.mesh.peers(topic) > GossipSubDhi:
     # prune peers if we've gone over Dhi
     prunes = toSeq(g.mesh[topic])
+
+    # shuffle anyway, score might be not used
     shuffle(prunes)
-    prunes.setLen(prunes.len - GossipSubD) # ..
down to D peers - trace "pruning", prunes = prunes.len - for peer in prunes: - g.mesh.removePeer(topic, peer) + # sort peers by score (inverted) + prunes.sort(byScore) + # keep high score peers + if prunes.len > g.parameters.dScore: + prunes.setLen(prunes.len - g.parameters.dScore) + # we must try to keep outbound peers + # to keep an outbound mesh quota + # so we try to first prune inbound peers + # if none we add up some outbound + var outbound: seq[PubSubPeer] + var inbound: seq[PubSubPeer] + for peer in prunes: + if peer.outbound: + outbound &= peer + else: + inbound &= peer + + let pruneLen = inbound.len - GossipSubD + if pruneLen > 0: + # Ok we got some peers to prune, + # for this heartbeat let's prune those + shuffle(inbound) + inbound.setLen(pruneLen) + else: + # We could not find any inbound to prune + # Yet we are on Hi, so we need to cull outbound peers + let keepDOutLen = outbound.len - g.parameters.dOut + if keepDOutLen > 0: + shuffle(outbound) + outbound.setLen(keepDOutLen) + inbound &= outbound + + trace "pruning", prunes = inbound.len + for peer in inbound: + g.pruned(peer, topic) + g.mesh.removePeer(topic, peer) + + # opportunistic grafting, by spec mesh should not be empty... + if g.mesh.peers(topic) > 1: + var peers = toSeq(g.mesh[topic]) + peers.sort(byScore) + let medianIdx = peers.len div 2 + let median = peers[medianIdx] + if median.score < g.parameters.opportunisticGraftThreshold: + trace "median score below opportunistic threshold", score = median.score + var avail = toSeq( + g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) - + g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()) + ) + + avail.keepIf do (x: PubSubPeer) -> bool: + # avoid negative score peers + x.score >= median.score and + # don't pick explicit peers + x.peerId notin g.parameters.directPeers and + # and avoid peers we are backing off + x.peerId notin g.backingOff + + # by spec, grab only 2 + if avail.len > 2: + avail.setLen(2) + + for peer in avail: + if g.mesh.addPeer(topic, peer): + g.grafted(peer, topic) + grafting &= peer + trace "opportunistic grafting", peer = $peer + when defined(libp2p_expensive_metrics): libp2p_gossipsub_peers_per_topic_gossipsub .set(g.gossipsub.peers(topic).int64, labelValues = [topic]) @@ -164,7 +534,11 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)]))) g.broadcast(grafts, graft) if prunes.len > 0: - let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) + let prune = RPCMsg(control: some(ControlMessage( + prune: @[ControlPrune( + topicID: topic, + peers: g.peerExchangeList(topic), + backoff: g.parameters.pruneBackoff.seconds.uint64)]))) g.broadcast(prunes, prune) proc dropFanoutPeers(g: GossipSub) = @@ -188,43 +562,204 @@ proc getGossipPeers(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.} trace "getting gossip peers (iHave)" let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys)) - let controlMsg = ControlMessage() for topic in topics: - var allPeers = toSeq(g.gossipsub.getOrDefault(topic)) - shuffle(allPeers) + if topic notin g.gossipsub: + trace "topic not in gossip array, skipping", topicID = topic + continue - let mesh = g.mesh.getOrDefault(topic) - let fanout = g.fanout.getOrDefault(topic) - - let gossipPeers = mesh + fanout let mids = g.mcache.window(topic) if not mids.len > 0: continue - if topic notin g.gossipsub: - trace "topic not in gossip array, skipping", topic - continue - let 
ihave = ControlIHave(topicID: topic, messageIDs: toSeq(mids)) - for peer in allPeers: - if result.len >= GossipSubD: - trace "got gossip peers", peers = result.len - break - if peer in gossipPeers: + let mesh = g.mesh.getOrDefault(topic) + let fanout = g.fanout.getOrDefault(topic) + let gossipPeers = mesh + fanout + var allPeers = toSeq(g.gossipsub.getOrDefault(topic)) + + allPeers.keepIf do (x: PubSubPeer) -> bool: + x.peerId notin g.parameters.directPeers and + x notin gossipPeers and + x.score >= g.parameters.gossipThreshold + + var target = g.parameters.dLazy + let factor = (g.parameters.gossipFactor.float * allPeers.len.float).int + if factor > target: + target = min(factor, allPeers.len) + + if target < allPeers.len: + shuffle(allPeers) + allPeers.setLen(target) + + for peer in allPeers: + if peer notin result: + result[peer] = ControlMessage() + result[peer].ihave.add(ihave) + +func `/`(a, b: Duration): float64 = + let + fa = float64(a.nanoseconds) + fb = float64(b.nanoseconds) + fa / fb + +proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 = + if peer.connections.len == 0: + 0.0 + else: + let + address = peer.connections[0].observedAddr + ipPeers = g.peersInIP.getOrDefault(address) + len = ipPeers.len.float64 + if len > g.parameters.ipColocationFactorThreshold: + let over = len - g.parameters.ipColocationFactorThreshold + over * over + else: + # lazy update peersInIP + if address notin g.peersInIP: + g.peersInIP[address] = initHashSet[PubSubPeer]() + g.peersInIP[address].incl(peer) + 0.0 + +proc updateScores(g: GossipSub) = # avoid async + trace "updating scores", peers = g.peers.len + + let now = Moment.now() + var evicting: seq[PubSubPeer] + + for peer, stats in g.peerStats.mpairs: + trace "updating peer score", peer + var n_topics = 0 + var is_grafted = 0 + + if not peer.connected: + if now > stats.expire: + evicting.add(peer) + trace "evicted peer from memory", peer continue - if peer notin result: - result[peer] = controlMsg + # Per topic + for topic, topicParams in g.topicParams: + var info = stats.topicInfos.getOrDefault(topic) + inc n_topics - result[peer].ihave.add(ihave) + # Scoring + var topicScore = 0'f64 + + if info.inMesh: + inc is_grafted + info.meshTime = now - info.graftTime + if info.meshTime > topicParams.meshMessageDeliveriesActivation: + info.meshMessageDeliveriesActive = true + + var p1 = info.meshTime / topicParams.timeInMeshQuantum + if p1 > topicParams.timeInMeshCap: + p1 = topicParams.timeInMeshCap + trace "p1", peer, p1 + topicScore += p1 * topicParams.timeInMeshWeight + else: + info.meshMessageDeliveriesActive = false + + topicScore += info.firstMessageDeliveries * topicParams.firstMessageDeliveriesWeight + trace "p2", peer, p2 = info.firstMessageDeliveries + + if info.meshMessageDeliveriesActive: + if info.meshMessageDeliveries < topicParams.meshMessageDeliveriesThreshold: + let deficit = topicParams.meshMessageDeliveriesThreshold - info.meshMessageDeliveries + let p3 = deficit * deficit + trace "p3", peer, p3 + topicScore += p3 * topicParams.meshMessageDeliveriesWeight + + topicScore += info.meshFailurePenalty * topicParams.meshFailurePenaltyWeight + trace "p3b", peer, p3b = info.meshFailurePenalty + + topicScore += info.invalidMessageDeliveries * info.invalidMessageDeliveries * topicParams.invalidMessageDeliveriesWeight + trace "p4", p4 = info.invalidMessageDeliveries * info.invalidMessageDeliveries + + trace "updated peer topic's scores", peer, topic, info, topicScore + + peer.score += topicScore * topicParams.topicWeight + + # Score 
decay
+      info.firstMessageDeliveries *= topicParams.firstMessageDeliveriesDecay
+      if info.firstMessageDeliveries < g.parameters.decayToZero:
+        info.firstMessageDeliveries = 0
+
+      info.meshMessageDeliveries *= topicParams.meshMessageDeliveriesDecay
+      if info.meshMessageDeliveries < g.parameters.decayToZero:
+        info.meshMessageDeliveries = 0
+
+      info.meshFailurePenalty *= topicParams.meshFailurePenaltyDecay
+      if info.meshFailurePenalty < g.parameters.decayToZero:
+        info.meshFailurePenalty = 0
+
+      info.invalidMessageDeliveries *= topicParams.invalidMessageDeliveriesDecay
+      if info.invalidMessageDeliveries < g.parameters.decayToZero:
+        info.invalidMessageDeliveries = 0
+
+      # Wrap up
+      # commit our changes, mgetOrPut does NOT work as wanted with value types (lent?)
+      stats.topicInfos[topic] = info
+
+    peer.score += peer.appScore * g.parameters.appSpecificWeight
+
+    peer.score += peer.behaviourPenalty * peer.behaviourPenalty * g.parameters.behaviourPenaltyWeight
+
+    peer.score += g.colocationFactor(peer) * g.parameters.ipColocationFactorWeight
+
+    # decay behaviourPenalty
+    peer.behaviourPenalty *= g.parameters.behaviourPenaltyDecay
+    if peer.behaviourPenalty < g.parameters.decayToZero:
+      peer.behaviourPenalty = 0
+
+    debug "updated peer's score", peer, score = peer.score, n_topics, is_grafted
+
+  for peer in evicting:
+    g.peerStats.del(peer)
+
+  trace "updated scores", peers = g.peers.len
 
 proc heartbeat(g: GossipSub) {.async.} =
   while g.heartbeatRunning:
     try:
-      trace "running heartbeat"
+      trace "running heartbeat", instance = cast[int](g)
+
+      # remove expired backoffs
+      block:
+        let now = Moment.now()
+        var expired = toSeq(g.backingOff.pairs())
+        expired.keepIf do (pair: tuple[peer: PeerID, expire: Moment]) -> bool:
+          now >= pair.expire
+        for (peer, _) in expired:
+          g.backingOff.del(peer)
+
+      # reset IWANT budget
+      # reset IHAVE cap
+      block:
+        for peer in g.peers.values:
+          peer.iWantBudget = IWantPeerBudget
+          peer.iHaveBudget = IHavePeerBudget
+
+      g.updateScores()
 
       for t in toSeq(g.topics.keys):
+        # prune every negative score peer
+        # do this before rebalance
+        # in order to avoid grafted -> pruned in the same cycle
+        let meshPeers = g.mesh.getOrDefault(t)
+        var prunes: seq[PubSubPeer]
+        for peer in meshPeers:
+          if peer.score < 0.0:
+            g.pruned(peer, t)
+            g.mesh.removePeer(t, peer)
+            prunes &= peer
+        let prune = RPCMsg(control: some(ControlMessage(
+          prune: @[ControlPrune(
+            topicID: t,
+            peers: g.peerExchangeList(t),
+            backoff: g.parameters.pruneBackoff.seconds.uint64)])))
+        g.broadcast(prunes, prune)
+
         await g.rebalanceMesh(t)
 
       g.dropFanoutPeers()
@@ -244,7 +779,11 @@ proc heartbeat(g: GossipSub) {.async.} =
     except CancelledError as exc:
       raise exc
     except CatchableError as exc:
-      warn "exception ocurred in gossipsub heartbeat", exc = exc.msg
+      warn "exception occurred in gossipsub heartbeat", exc = exc.msg, trace = exc.getStackTrace()
+
+    for trigger in g.heartbeatEvents:
+      trace "firing heartbeat event", instance = cast[int](g)
+      trigger.fire()
 
     await sleepAsync(GossipSubHeartbeatInterval)
 
@@ -258,14 +797,22 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
     trace "no peer to unsubscribe", peer
     return
 
+  # remove from peer IPs collection too
+  if pubSubPeer.connections.len > 0:
+    g.peersInIP.withValue(pubSubPeer.connections[0].observedAddr, s) do:
+      s[].excl(pubSubPeer)
+
   for t in toSeq(g.gossipsub.keys):
     g.gossipsub.removePeer(t, pubSubPeer)
+    # also try to remove from explicit table here
+    g.explicit.removePeer(t, pubSubPeer)
 
     when defined(libp2p_expensive_metrics):
      libp2p_gossipsub_peers_per_topic_gossipsub
        .set(g.gossipsub.peers(t).int64, labelValues = [t])
 
   for t in toSeq(g.mesh.keys):
+    g.pruned(pubSubPeer, t)
     g.mesh.removePeer(t, pubSubPeer)
 
     when defined(libp2p_expensive_metrics):
@@ -278,6 +825,15 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
     when defined(libp2p_expensive_metrics):
       libp2p_gossipsub_peers_per_topic_fanout
         .set(g.fanout.peers(t).int64, labelValues = [t])
+
+  # don't retain bad score peers
+  if pubSubPeer.score < 0.0:
+    g.peerStats.del(pubSubPeer)
+    return
+
+  g.peerStats[pubSubPeer].expire = Moment.now() + g.parameters.retainScore
+  for topic, info in g.peerStats[pubSubPeer].topicInfos.mpairs:
+    info.firstMessageDeliveries = 0
 
   procCall FloodSub(g).unsubscribePeer(peer)
 
@@ -291,17 +847,23 @@ method subscribeTopic*(g: GossipSub,
   logScope:
     peer
     topic
+
+  g.onNewPeer(peer)
 
   if subscribe:
     trace "peer subscribed to topic"
     # subscribe remote peer to the topic
     discard g.gossipsub.addPeer(topic, peer)
+    if peer.peerId in g.parameters.directPeers:
+      discard g.explicit.addPeer(topic, peer)
   else:
     trace "peer unsubscribed from topic"
     # unsubscribe remote peer from the topic
     g.gossipsub.removePeer(topic, peer)
     g.mesh.removePeer(topic, peer)
     g.fanout.removePeer(topic, peer)
+    if peer.peerId in g.parameters.directPeers:
+      g.explicit.removePeer(topic, peer)
 
   when defined(libp2p_expensive_metrics):
     libp2p_gossipsub_peers_per_topic_mesh
@@ -326,23 +888,51 @@ proc handleGraft(g: GossipSub,
 
     trace "peer grafted topic"
 
-    # If they send us a graft before they send us a subscribe, what should
-    # we do? For now, we add them to mesh but don't add them to gossipsub.
+    # It is an error to GRAFT on an explicit peer
+    if peer.peerId in g.parameters.directPeers:
+      trace "attempt to graft an explicit peer", peer=peer.id,
+                                                 topicID=graft.topicID
+      # and such an attempt should be logged and rejected with a PRUNE
+      result.add(ControlPrune(
+        topicID: graft.topicID,
+        peers: @[], # omitting heavy computation here as the remote did something illegal
+        backoff: g.parameters.pruneBackoff.seconds.uint64))
+      continue
+
+    if peer.peerId in g.backingOff:
+      trace "attempt to graft a backingOff peer", peer=peer.id,
+                                                  topicID=graft.topicID,
+                                                  expire=g.backingOff[peer.peerId]
+      # and such an attempt should be logged and rejected with a PRUNE
+      result.add(ControlPrune(
+        topicID: graft.topicID,
+        peers: @[], # omitting heavy computation here as the remote did something illegal
+        backoff: g.parameters.pruneBackoff.seconds.uint64))
+      continue
+
+    if peer notin g.peerStats:
+      g.peerStats[peer] = PeerStats()
+
+    # If they send us a graft before they send us a subscribe, what should
+    # we do? For now, we add them to mesh but don't add them to gossipsub.
if topic in g.topics: - if g.mesh.peers(topic) < GossipSubDHi: + if g.mesh.peers(topic) < GossipSubDHi or peer.outbound: # In the spec, there's no mention of DHi here, but implicitly, a # peer will be removed from the mesh on next rebalance, so we don't want # this peer to push someone else out if g.mesh.addPeer(topic, peer): + g.grafted(peer, topic) g.fanout.removePeer(topic, peer) else: trace "peer already in mesh" else: - result.add(ControlPrune(topicID: topic)) + result.add(ControlPrune( + topicID: topic, + peers: g.peerExchangeList(topic), + backoff: g.parameters.pruneBackoff.seconds.uint64)) else: - debug "peer grafting topic we're not interested in" - result.add(ControlPrune(topicID: topic)) + trace "peer grafting topic we're not interested in", topic + # gossip 1.1, we do not send a control message prune anymore when defined(libp2p_expensive_metrics): libp2p_gossipsub_peers_per_topic_mesh @@ -354,7 +944,16 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = for prune in prunes: trace "peer pruned topic", peer, topic = prune.topicID + # add peer backoff + if prune.backoff > 0: + let backoff = Moment.fromNow((prune.backoff + BackoffSlackTime).int64.seconds) + let current = g.backingOff.getOrDefault(peer.peerId) + if backoff > current: + g.backingOff[peer.peerId] = backoff + + g.pruned(peer, prune.topicID) g.mesh.removePeer(prune.topicID, peer) + when defined(libp2p_expensive_metrics): libp2p_gossipsub_peers_per_topic_mesh .set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID]) @@ -362,24 +961,47 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = proc handleIHave(g: GossipSub, peer: PubSubPeer, ihaves: seq[ControlIHave]): ControlIWant = - for ihave in ihaves: - trace "peer sent ihave", - peer, topic = ihave.topicID, msgs = ihave.messageIDs + if peer.score < g.parameters.gossipThreshold: + trace "ihave: ignoring low score peer", peer = $peer, score = peer.score + elif peer.iHaveBudget == 0: + trace "ihave: ignoring out of budget peer", peer = $peer, score = peer.score + else: + dec peer.iHaveBudget + for ihave in ihaves: + trace "peer sent ihave", + peer, topic = ihave.topicID, msgs = ihave.messageIDs - if ihave.topicID in g.mesh: - for m in ihave.messageIDs: - if m notin g.seen: - result.messageIDs.add(m) + if ihave.topicID in g.mesh: + for m in ihave.messageIDs: + if m notin g.seen: + result.messageIDs.add(m) proc handleIWant(g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]): seq[Message] = - for iwant in iwants: - for mid in iwant.messageIDs: - trace "peer sent iwant", peer, messageID = mid - let msg = g.mcache.get(mid) - if msg.isSome: - result.add(msg.get()) + if peer.score < g.parameters.gossipThreshold: + trace "iwant: ignoring low score peer", peer = $peer, score = peer.score + else: + for iwant in iwants: + for mid in iwant.messageIDs: + trace "peer sent iwant", peer, messageID = mid + let msg = g.mcache.get(mid) + if msg.isSome: + # avoid spam + if peer.iWantBudget > 0: + result.add(msg.get()) + dec peer.iWantBudget + else: + return + +proc punishPeer(g: GossipSub, peer: PubSubPeer, msg: Message) = + for t in msg.topicIDs: + # ensure we init a new topic if unknown + let _ = g.topicParams.mgetOrPut(t, TopicParams.init()) + # update stats + var tstats = g.peerStats[peer].topicInfos.getOrDefault(t) + tstats.invalidMessageDeliveries += 1 + g.peerStats[peer].topicInfos[t] = tstats method rpcHandler*(g: GossipSub, peer: PubSubPeer, @@ -391,20 +1013,53 @@ method rpcHandler*(g: GossipSub, if 
g.seen.put(msgId): trace "Dropping already-seen message", msgId, peer + + # make sure to update score tho before continuing + for t in msg.topicIDs: # for every topic in the message + let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) + # if in mesh add more delivery score + var stats = g.peerStats[peer].topicInfos.getOrDefault(t) + if stats.inMesh: + stats.meshMessageDeliveries += 1 + if stats.meshMessageDeliveries > topicParams.meshMessageDeliveriesCap: + stats.meshMessageDeliveries = topicParams.meshMessageDeliveriesCap + + # commit back to the table + g.peerStats[peer].topicInfos[t] = stats + continue g.mcache.put(msgId, msg) if g.verifySignature and not msg.verify(peer.peerId): debug "Dropping message due to failed signature verification", msgId, peer + g.punishPeer(peer, msg) continue if not (await g.validate(msg)): - trace "Dropping message due to failed validation", msgId, peer + debug "Dropping message due to failed validation", msgId, peer + g.punishPeer(peer, msg) continue var toSendPeers = initHashSet[PubSubPeer]() - for t in msg.topicIDs: # for every topic in the message + for t in msg.topicIDs: # for every topic in the message + let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) + + # contribute to peer score first delivery + var stats = g.peerStats[peer].topicInfos.getOrDefault(t) + stats.firstMessageDeliveries += 1 + if stats.firstMessageDeliveries > topicParams.firstMessageDeliveriesCap: + stats.firstMessageDeliveries = topicParams.firstMessageDeliveriesCap + + # if in mesh add more delivery score + if stats.inMesh: + stats.meshMessageDeliveries += 1 + if stats.meshMessageDeliveries > topicParams.meshMessageDeliveriesCap: + stats.meshMessageDeliveries = topicParams.meshMessageDeliveriesCap + + # commit back to the table + g.peerStats[peer].topicInfos[t] = stats + g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) @@ -427,7 +1082,7 @@ method rpcHandler*(g: GossipSub, if respControl.graft.len > 0 or respControl.prune.len > 0 or respControl.ihave.len > 0 or messages.len > 0: - debug "sending control message", msg = shortLog(respControl), peer + trace "sending control message", msg = shortLog(respControl), peer g.send( peer, RPCMsg(control: some(respControl), messages: messages)) @@ -436,6 +1091,11 @@ method subscribe*(g: GossipSub, topic: string, handler: TopicHandler) {.async.} = await procCall PubSub(g).subscribe(topic, handler) + + # if we have a fanout on this topic break it + if topic in g.fanout: + g.fanout.del(topic) + await g.rebalanceMesh(topic) method unsubscribe*(g: GossipSub, @@ -448,9 +1108,14 @@ method unsubscribe*(g: GossipSub, if topic in g.mesh: let peers = g.mesh[topic] g.mesh.del(topic) - - let prune = RPCMsg( - control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) + g.topicParams.del(topic) + for peer in peers: + g.pruned(peer, topic) + let prune = RPCMsg(control: some(ControlMessage( + prune: @[ControlPrune( + topicID: topic, + peers: g.peerExchangeList(topic), + backoff: g.parameters.pruneBackoff.seconds.uint64)]))) g.broadcast(toSeq(peers), prune) method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = @@ -459,8 +1124,13 @@ method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = if topic in g.mesh: let peers = g.mesh.getOrDefault(topic) g.mesh.del(topic) - - let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) + for peer in peers: + g.pruned(peer, topic) + let prune = RPCMsg(control: 
some(ControlMessage(
+      prune: @[ControlPrune(
+        topicID: topic,
+        peers: g.peerExchangeList(topic),
+        backoff: g.parameters.pruneBackoff.seconds.uint64)])))
     g.broadcast(toSeq(peers), prune)
 
 method publish*(g: GossipSub,
@@ -475,17 +1145,29 @@ method publish*(g: GossipSub,
   if topic.len <= 0: # data could be 0/empty
     debug "Empty topic, skipping publish"
     return 0
-
+  var peers: HashSet[PubSubPeer]
+
+  if g.parameters.floodPublish:
+    # With flood publishing enabled, the mesh is used when propagating messages from other peers,
+    # but a peer's own messages will always be published to all known peers in the topic.
+    for peer in g.gossipsub.getOrDefault(topic):
+      if peer.score >= g.parameters.publishThreshold:
+        trace "publish: including flood/high score peer", peer = $peer
+        peers.incl(peer)
+
+  # always add direct peers
+  peers.incl(g.explicit.getOrDefault(topic))
+
   if topic in g.topics: # if we're subscribed use the mesh
-    peers = g.mesh.getOrDefault(topic)
+    peers.incl(g.mesh.getOrDefault(topic))
   else: # not subscribed, send to fanout peers
     # try optimistically
-    peers = g.fanout.getOrDefault(topic)
+    peers.incl(g.fanout.getOrDefault(topic))
     if peers.len == 0:
       # ok we had nothing.. let's try replenish inline
       g.replenishFanout(topic)
-      peers = g.fanout.getOrDefault(topic)
+      peers.incl(g.fanout.getOrDefault(topic))
 
   # even if we couldn't publish,
   # we still attempted to publish
@@ -523,6 +1205,18 @@ method publish*(g: GossipSub,
 
   return peers.len
 
+proc maintainDirectPeers(g: GossipSub) {.async.} =
+  while g.heartbeatRunning:
+    for id in g.parameters.directPeers:
+      let peer = g.peers.getOrDefault(id)
+      if peer == nil:
+        # this creates a new peer and assigns the current switch to it
+        # as a result, the next time we try to send we will also try to open a connection
+        # see pubsubpeer.nim send and such
+        discard g.getOrCreatePeer(id, g.codecs)
+
+    await sleepAsync(1.minutes)
+
 method start*(g: GossipSub) {.async.} =
   trace "gossipsub start"
 
@@ -532,6 +1226,7 @@ method start*(g: GossipSub) {.async.} =
 
   g.heartbeatRunning = true
   g.heartbeatFut = g.heartbeat()
+  g.directPeersLoop = g.maintainDirectPeers()
 
 method stop*(g: GossipSub) {.async.} =
   trace "gossipsub stop"
@@ -541,16 +1236,21 @@ method stop*(g: GossipSub) {.async.} =
 
   # stop heartbeat interval
   g.heartbeatRunning = false
+  g.directPeersLoop.cancel()
   if not g.heartbeatFut.finished:
     trace "awaiting last heartbeat"
     await g.heartbeatFut
     trace "heartbeat stopped"
     g.heartbeatFut = nil
-
 method initPubSub*(g: GossipSub) =
   procCall FloodSub(g).initPubSub()
 
+  if not g.parameters.explicit:
+    g.parameters = GossipSubParams.init()
+
+  g.parameters.validateParameters().tryGet()
+
   randomize()
   g.mcache = MCache.init(GossipSubHistoryGossip, GossipSubHistoryLength)
   g.mesh = initTable[string, HashSet[PubSubPeer]]() # meshes - topic to peer
diff --git a/libp2p/protocols/pubsub/gossipsub10.nim b/libp2p/protocols/pubsub/gossipsub10.nim
new file mode 100644
index 0000000..0238bac
--- /dev/null
+++ b/libp2p/protocols/pubsub/gossipsub10.nim
@@ -0,0 +1,601 @@
+## Nim-LibP2P
+## Copyright (c) 2019 Status Research & Development GmbH
+## Licensed under either of
+## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
+## * MIT license ([LICENSE-MIT](LICENSE-MIT))
+## at your option.
+## This file may not be copied, modified, or distributed except according to
+## those terms.
+
+# TODO: this module is temporary to allow
+# for quick switchover from 1.1 to 1.0.
+# This should be removed once 1.1 is stable
+# enough.
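Note: the nimble task changes earlier in this patch build the pubsub tests a second time with -d:fallback_gossipsub_10, which is the switch this temporary module exists to serve. A minimal sketch of how a consumer could pick the implementation at compile time; the when/import wiring below is illustrative only and is not taken from this patch:

when defined(fallback_gossipsub_10):
  # assumed fallback: use the frozen /meshsub/1.0.0 module under the old name
  import libp2p/protocols/pubsub/gossipsub10 as gossipsub
else:
  # default: the gossipsub 1.1 implementation modified above
  import libp2p/protocols/pubsub/gossipsub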
+
+import std/[options, random, sequtils, sets, tables]
+import chronos, chronicles, metrics
+import ./pubsub,
+       ./floodsub,
+       ./pubsubpeer,
+       ./peertable,
+       ./mcache,
+       ./timedcache,
+       ./rpc/[messages, message],
+       ../protocol,
+       ../../stream/connection,
+       ../../peerinfo,
+       ../../peerid,
+       ../../utility
+
+logScope:
+  topics = "gossipsub"
+
+const GossipSubCodec* = "/meshsub/1.0.0"
+
+# overlay parameters
+const GossipSubD* = 6
+const GossipSubDlo* = 4
+const GossipSubDhi* = 12
+
+# gossip parameters
+const GossipSubHistoryLength* = 5
+const GossipSubHistoryGossip* = 3
+
+# heartbeat interval
+const GossipSubHeartbeatInitialDelay* = 100.millis
+const GossipSubHeartbeatInterval* = 1.seconds
+
+# fanout ttl
+const GossipSubFanoutTTL* = 1.minutes
+
+type
+  GossipSub* = ref object of FloodSub
+    mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic
+    fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic
+    gossipsub*: PeerTable # peers that are subscribed to a topic
+    lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
+    gossip*: Table[string, seq[ControlIHave]] # pending gossip
+    control*: Table[string, ControlMessage] # pending control messages
+    mcache*: MCache # messages cache
+    heartbeatFut: Future[void] # cancellation future for heartbeat interval
+    heartbeatRunning: bool
+    heartbeatEvents*: seq[AsyncEvent]
+    parameters*: GossipSubParams
+
+  GossipSubParams* = object
+    # stubs
+    explicit: bool
+    pruneBackoff*: Duration
+    floodPublish*: bool
+    gossipFactor*: float64
+    dScore*: int
+    dOut*: int
+    dLazy*: int
+
+    gossipThreshold*: float64
+    publishThreshold*: float64
+    graylistThreshold*: float64
+    acceptPXThreshold*: float64
+    opportunisticGraftThreshold*: float64
+    decayInterval*: Duration
+    decayToZero*: float64
+    retainScore*: Duration
+
+    appSpecificWeight*: float64
+    ipColocationFactorWeight*: float64
+    ipColocationFactorThreshold*: float64
+    behaviourPenaltyWeight*: float64
+    behaviourPenaltyDecay*: float64
+
+    directPeers*: seq[PeerId]
+
+proc init*(G: type[GossipSubParams]): G = discard
+
+when defined(libp2p_expensive_metrics):
+  declareGauge(libp2p_gossipsub_peers_per_topic_mesh,
+    "gossipsub peers per topic in mesh",
+    labels = ["topic"])
+
+  declareGauge(libp2p_gossipsub_peers_per_topic_fanout,
+    "gossipsub peers per topic in fanout",
+    labels = ["topic"])
+
+  declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub,
+    "gossipsub peers per topic in gossipsub",
+    labels = ["topic"])
+
+method init*(g: GossipSub) =
+  proc handler(conn: Connection, proto: string) {.async.} =
+    ## main protocol handler that gets triggered on every
+    ## connection for a protocol string
+    ## e.g. ``/floodsub/1.0.0``, etc...
+    ##
+    try:
+      await g.handleConn(conn, proto)
+    except CancelledError:
+      # This is a top-level procedure which will work as a separate task, so it
+      # does not need to propagate CancelledError.
+ trace "Unexpected cancellation in gossipsub handler", conn + except CatchableError as exc: + trace "GossipSub handler leaks an error", exc = exc.msg, conn + + g.handler = handler + g.codec = GossipSubCodec + +proc replenishFanout(g: GossipSub, topic: string) = + ## get fanout peers for a topic + logScope: topic + trace "about to replenish fanout" + + if g.fanout.peers(topic) < GossipSubDLo: + trace "replenishing fanout", peers = g.fanout.peers(topic) + if topic in g.gossipsub: + for peer in g.gossipsub[topic]: + if g.fanout.addPeer(topic, peer): + if g.fanout.peers(topic) == GossipSubD: + break + + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_peers_per_topic_fanout + .set(g.fanout.peers(topic).int64, labelValues = [topic]) + + trace "fanout replenished with peers", peers = g.fanout.peers(topic) + +proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = + logScope: + topic + mesh = g.mesh.peers(topic) + gossipsub = g.gossipsub.peers(topic) + + trace "rebalancing mesh" + + # create a mesh topic that we're subscribing to + + var + grafts, prunes: seq[PubSubPeer] + + if g.mesh.peers(topic) < GossipSubDlo: + trace "replenishing mesh", peers = g.mesh.peers(topic) + # replenish the mesh if we're below Dlo + grafts = toSeq( + g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) - + g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()) + ) + + shuffle(grafts) + + # Graft peers so we reach a count of D + grafts.setLen(min(grafts.len, GossipSubD - g.mesh.peers(topic))) + + trace "grafting", grafts = grafts.len + + for peer in grafts: + if g.mesh.addPeer(topic, peer): + g.fanout.removePeer(topic, peer) + + if g.mesh.peers(topic) > GossipSubDhi: + # prune peers if we've gone over Dhi + prunes = toSeq(g.mesh[topic]) + shuffle(prunes) + prunes.setLen(prunes.len - GossipSubD) # .. 
down to D peers + + trace "pruning", prunes = prunes.len + for peer in prunes: + g.mesh.removePeer(topic, peer) + + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_peers_per_topic_gossipsub + .set(g.gossipsub.peers(topic).int64, labelValues = [topic]) + + libp2p_gossipsub_peers_per_topic_fanout + .set(g.fanout.peers(topic).int64, labelValues = [topic]) + + libp2p_gossipsub_peers_per_topic_mesh + .set(g.mesh.peers(topic).int64, labelValues = [topic]) + + trace "mesh balanced" + + # Send changes to peers after table updates to avoid stale state + if grafts.len > 0: + let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)]))) + g.broadcast(grafts, graft) + if prunes.len > 0: + let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) + g.broadcast(prunes, prune) + +proc dropFanoutPeers(g: GossipSub) = + # drop peers that we haven't published to in + # GossipSubFanoutTTL seconds + let now = Moment.now() + for topic in toSeq(g.lastFanoutPubSub.keys): + let val = g.lastFanoutPubSub[topic] + if now > val: + g.fanout.del(topic) + g.lastFanoutPubSub.del(topic) + trace "dropping fanout topic", topic + + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_peers_per_topic_fanout + .set(g.fanout.peers(topic).int64, labelValues = [topic]) + +proc getGossipPeers(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.} = + ## gossip iHave messages to peers + ## + + trace "getting gossip peers (iHave)" + let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys)) + let controlMsg = ControlMessage() + for topic in topics: + var allPeers = toSeq(g.gossipsub.getOrDefault(topic)) + shuffle(allPeers) + + let mesh = g.mesh.getOrDefault(topic) + let fanout = g.fanout.getOrDefault(topic) + + let gossipPeers = mesh + fanout + let mids = g.mcache.window(topic) + if not mids.len > 0: + continue + + if topic notin g.gossipsub: + trace "topic not in gossip array, skipping", topic + continue + + let ihave = ControlIHave(topicID: topic, messageIDs: toSeq(mids)) + for peer in allPeers: + if result.len >= GossipSubD: + trace "got gossip peers", peers = result.len + break + + if peer in gossipPeers: + continue + + if peer notin result: + result[peer] = controlMsg + + result[peer].ihave.add(ihave) + +proc heartbeat(g: GossipSub) {.async.} = + while g.heartbeatRunning: + try: + trace "running heartbeat" + + for t in toSeq(g.topics.keys): + await g.rebalanceMesh(t) + + g.dropFanoutPeers() + + # replenish known topics to the fanout + for t in toSeq(g.fanout.keys): + g.replenishFanout(t) + + let peers = g.getGossipPeers() + for peer, control in peers: + g.peers.withValue(peer.peerId, pubsubPeer) do: + g.send( + pubsubPeer[], + RPCMsg(control: some(control))) + + g.mcache.shift() # shift the cache + except CancelledError as exc: + raise exc + except CatchableError as exc: + warn "exception ocurred in gossipsub heartbeat", exc = exc.msg + + for trigger in g.heartbeatEvents: + trace "firing heartbeat event", instance = cast[int](g) + trigger.fire() + + await sleepAsync(GossipSubHeartbeatInterval) + +method unsubscribePeer*(g: GossipSub, peer: PeerID) = + ## handle peer disconnects + ## + + trace "unsubscribing gossipsub peer", peer + let pubSubPeer = g.peers.getOrDefault(peer) + if pubSubPeer.isNil: + trace "no peer to unsubscribe", peer + return + + for t in toSeq(g.gossipsub.keys): + g.gossipsub.removePeer(t, pubSubPeer) + + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_peers_per_topic_gossipsub + 
.set(g.gossipsub.peers(t).int64, labelValues = [t]) + + for t in toSeq(g.mesh.keys): + g.mesh.removePeer(t, pubSubPeer) + + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_peers_per_topic_mesh + .set(g.mesh.peers(t).int64, labelValues = [t]) + + for t in toSeq(g.fanout.keys): + g.fanout.removePeer(t, pubSubPeer) + + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_peers_per_topic_fanout + .set(g.fanout.peers(t).int64, labelValues = [t]) + + procCall FloodSub(g).unsubscribePeer(peer) + +method subscribeTopic*(g: GossipSub, + topic: string, + subscribe: bool, + peer: PubSubPeer) {.gcsafe.} = + # Skip floodsub - we don't want it to add the peer to `g.floodsub` + procCall PubSub(g).subscribeTopic(topic, subscribe, peer) + + logScope: + peer + topic + + if subscribe: + trace "peer subscribed to topic" + # subscribe remote peer to the topic + discard g.gossipsub.addPeer(topic, peer) + else: + trace "peer unsubscribed from topic" + # unsubscribe remote peer from the topic + g.gossipsub.removePeer(topic, peer) + g.mesh.removePeer(topic, peer) + g.fanout.removePeer(topic, peer) + + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_peers_per_topic_mesh + .set(g.mesh.peers(topic).int64, labelValues = [topic]) + libp2p_gossipsub_peers_per_topic_fanout + .set(g.fanout.peers(topic).int64, labelValues = [topic]) + + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_peers_per_topic_gossipsub + .set(g.gossipsub.peers(topic).int64, labelValues = [topic]) + + trace "gossip peers", peers = g.gossipsub.peers(topic), topic + +proc handleGraft(g: GossipSub, + peer: PubSubPeer, + grafts: seq[ControlGraft]): seq[ControlPrune] = + for graft in grafts: + let topic = graft.topicID + logScope: + peer + topic + + trace "peer grafted topic" + + # If they send us a graft before they send us a subscribe, what should + # we do? For now, we add them to mesh but don't add them to gossipsub. 
+
+    if topic in g.topics:
+      if g.mesh.peers(topic) < GossipSubDHi:
+        # In the spec, there's no mention of DHi here, but implicitly, a
+        # peer will be removed from the mesh on next rebalance, so we don't want
+        # this peer to push someone else out
+        if g.mesh.addPeer(topic, peer):
+          g.fanout.removePeer(topic, peer)
+        else:
+          trace "peer already in mesh"
+      else:
+        result.add(ControlPrune(topicID: topic))
+    else:
+      debug "peer grafting topic we're not interested in"
+      result.add(ControlPrune(topicID: topic))
+
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_peers_per_topic_mesh
+        .set(g.mesh.peers(topic).int64, labelValues = [topic])
+      libp2p_gossipsub_peers_per_topic_fanout
+        .set(g.fanout.peers(topic).int64, labelValues = [topic])
+
+proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
+  for prune in prunes:
+    trace "peer pruned topic", peer, topic = prune.topicID
+
+    g.mesh.removePeer(prune.topicID, peer)
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_peers_per_topic_mesh
+        .set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID])
+
+proc handleIHave(g: GossipSub,
+                 peer: PubSubPeer,
+                 ihaves: seq[ControlIHave]): ControlIWant =
+  for ihave in ihaves:
+    trace "peer sent ihave",
+      peer, topic = ihave.topicID, msgs = ihave.messageIDs
+
+    if ihave.topicID in g.mesh:
+      for m in ihave.messageIDs:
+        if m notin g.seen:
+          result.messageIDs.add(m)
+
+proc handleIWant(g: GossipSub,
+                 peer: PubSubPeer,
+                 iwants: seq[ControlIWant]): seq[Message] =
+  for iwant in iwants:
+    for mid in iwant.messageIDs:
+      trace "peer sent iwant", peer, messageID = mid
+      let msg = g.mcache.get(mid)
+      if msg.isSome:
+        result.add(msg.get())
+
+method rpcHandler*(g: GossipSub,
+                   peer: PubSubPeer,
+                   rpcMsg: RPCMsg) {.async.} =
+  await procCall PubSub(g).rpcHandler(peer, rpcMsg)
+
+  for msg in rpcMsg.messages: # for every message
+    let msgId = g.msgIdProvider(msg)
+
+    if g.seen.put(msgId):
+      trace "Dropping already-seen message", msgId, peer
+      continue
+
+    g.mcache.put(msgId, msg)
+
+    if g.verifySignature and not msg.verify(peer.peerId):
+      debug "Dropping message due to failed signature verification", msgId, peer
+      continue
+
+    if not (await g.validate(msg)):
+      trace "Dropping message due to failed validation", msgId, peer
+      continue
+
+    var toSendPeers = initHashSet[PubSubPeer]()
+    for t in msg.topicIDs: # for every topic in the message
+      g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
+      g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
+
+      await handleData(g, t, msg.data)
+
+    # In theory, if topics are the same in all messages, we could batch - we'd
+    # also have to be careful to only include validated messages
+    g.broadcast(toSeq(toSendPeers), RPCMsg(messages: @[msg]))
+    trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
+
+  if rpcMsg.control.isSome:
+    let control = rpcMsg.control.get()
+    g.handlePrune(peer, control.prune)
+
+    var respControl: ControlMessage
+    respControl.iwant.add(g.handleIHave(peer, control.ihave))
+    respControl.prune.add(g.handleGraft(peer, control.graft))
+    let messages = g.handleIWant(peer, control.iwant)
+
+    if respControl.graft.len > 0 or respControl.prune.len > 0 or
+      respControl.ihave.len > 0 or messages.len > 0:
+
+      debug "sending control message", msg = shortLog(respControl), peer
+      g.send(
+        peer,
+        RPCMsg(control: some(respControl), messages: messages))
+
+method subscribe*(g: GossipSub,
+                  topic: string,
+                  handler: TopicHandler) {.async.} =
+  await procCall PubSub(g).subscribe(topic, handler)
+
+
await g.rebalanceMesh(topic) + +method unsubscribe*(g: GossipSub, + topics: seq[TopicPair]) {.async.} = + await procCall PubSub(g).unsubscribe(topics) + + for (topic, handler) in topics: + # delete from mesh only if no handlers are left + if topic notin g.topics: + if topic in g.mesh: + let peers = g.mesh[topic] + g.mesh.del(topic) + + let prune = RPCMsg( + control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) + g.broadcast(toSeq(peers), prune) + +method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = + await procCall PubSub(g).unsubscribeAll(topic) + + if topic in g.mesh: + let peers = g.mesh.getOrDefault(topic) + g.mesh.del(topic) + + let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) + g.broadcast(toSeq(peers), prune) + +method publish*(g: GossipSub, + topic: string, + data: seq[byte]): Future[int] {.async.} = + # base always returns 0 + discard await procCall PubSub(g).publish(topic, data) + + logScope: topic + trace "Publishing message on topic", data = data.shortLog + + if topic.len <= 0: # data could be 0/empty + debug "Empty topic, skipping publish" + return 0 + + var peers: HashSet[PubSubPeer] + if topic in g.topics: # if we're subscribed use the mesh + peers = g.mesh.getOrDefault(topic) + else: # not subscribed, send to fanout peers + # try optimistically + peers = g.fanout.getOrDefault(topic) + if peers.len == 0: + # ok we had nothing.. let's try to replenish inline + g.replenishFanout(topic) + peers = g.fanout.getOrDefault(topic) + + # even if we couldn't publish, + # we still attempted to publish + # on the topic, so it makes sense + # to update the last topic publish + # time + g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) + + if peers.len == 0: + debug "No peers for topic, skipping publish" + return 0 + + inc g.msgSeqno + let + msg = Message.init(g.peerInfo, data, topic, g.msgSeqno, g.sign) + msgId = g.msgIdProvider(msg) + + logScope: msgId + + trace "Created new message", msg = shortLog(msg), peers = peers.len + + if g.seen.put(msgId): + # custom msgid providers might cause this + trace "Dropping already-seen message" + return 0 + + g.mcache.put(msgId, msg) + + g.broadcast(toSeq(peers), RPCMsg(messages: @[msg])) + when defined(libp2p_expensive_metrics): + if peers.len > 0: + libp2p_pubsub_messages_published.inc(labelValues = [topic]) + + trace "Published message to peers" + + return peers.len + +method start*(g: GossipSub) {.async.} = + trace "gossipsub start" + + if not g.heartbeatFut.isNil: + warn "Starting gossipsub twice" + return + + g.heartbeatRunning = true + g.heartbeatFut = g.heartbeat() + +method stop*(g: GossipSub) {.async.} = + trace "gossipsub stop" + if g.heartbeatFut.isNil: + warn "Stopping gossipsub without starting it" + return + + # stop heartbeat interval + g.heartbeatRunning = false + if not g.heartbeatFut.finished: + trace "awaiting last heartbeat" + await g.heartbeatFut + trace "heartbeat stopped" + g.heartbeatFut = nil + + +method initPubSub*(g: GossipSub) = + procCall FloodSub(g).initPubSub() + + randomize() + g.mcache = MCache.init(GossipSubHistoryGossip, GossipSubHistoryLength) + g.mesh = initTable[string, HashSet[PubSubPeer]]() # meshes - topic to peer + g.fanout = initTable[string, HashSet[PubSubPeer]]() # fanout - topic to peer + g.gossipsub = initTable[string, HashSet[PubSubPeer]]() # topic to peer map of all gossipsub peers + g.lastFanoutPubSub = initTable[string, Moment]() # last publish time for fanout topics + g.gossip = initTable[string, seq[ControlIHave]]() #
pending gossip + g.control = initTable[string, ControlMessage]() # pending control messages \ No newline at end of file diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 2ef0914..23e1950 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -18,8 +18,13 @@ import pubsubpeer, ../../peerinfo, ../../errors +import metrics +import stew/results +export results + export PubSubPeer export PubSubObserver +export protocol logScope: topics = "pubsub" @@ -44,6 +49,7 @@ type proc(m: Message): string {.noSideEffect, raises: [Defect], nimcall, gcsafe.} Topic* = object + # make this a variant type if one day we have different Params structs name*: string handler*: seq[TopicHandler] @@ -111,24 +117,29 @@ method rpcHandler*(p: PubSub, trace "about to subscribe to topic", topicId = s.topic, peer p.subscribeTopic(s.topic, s.subscribe, peer) +method onNewPeer(p: PubSub, peer: PubSubPeer) {.base.} = discard + proc getOrCreatePeer*( p: PubSub, peer: PeerID, - proto: string): PubSubPeer = + protos: seq[string]): PubSubPeer = if peer in p.peers: return p.peers[peer] proc getConn(): Future[(Connection, RPCMsg)] {.async.} = - let conn = await p.switch.dial(peer, proto) + let conn = await p.switch.dial(peer, protos) return (conn, RPCMsg.withSubs(toSeq(p.topics.keys), true)) # create new pubsub peer - let pubSubPeer = newPubSubPeer(peer, getConn, proto) + let pubSubPeer = newPubSubPeer(peer, getConn, protos[0]) trace "created new pubsub peer", peerId = $peer p.peers[peer] = pubSubPeer pubSubPeer.observers = p.observers + onNewPeer(p, pubSubPeer) + + # metrics libp2p_pubsub_peers.set(p.peers.len.int64) pubsubPeer.connect() @@ -171,7 +182,7 @@ method handleConn*(p: PubSub, # call pubsub rpc handler p.rpcHandler(peer, msg) - let peer = p.getOrCreatePeer(conn.peerInfo.peerId, proto) + let peer = p.getOrCreatePeer(conn.peerInfo.peerId, @[proto]) try: peer.handler = handler @@ -189,7 +200,8 @@ method subscribePeer*(p: PubSub, peer: PeerID) {.base.} = ## messages ## - discard p.getOrCreatePeer(peer, p.codec) + let peer = p.getOrCreatePeer(peer, p.codecs) + peer.outbound = true # flag as outbound method unsubscribe*(p: PubSub, topics: seq[TopicPair]) {.base, async.} = @@ -302,23 +314,35 @@ method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} = else: libp2p_pubsub_validation_failure.inc() -proc init*( +proc init*[PubParams: object | bool]( P: typedesc[PubSub], switch: Switch, triggerSelf: bool = false, verifySignature: bool = true, sign: bool = true, - msgIdProvider: MsgIdProvider = defaultMsgIdProvider): P = - - let pubsub = P( - switch: switch, - peerInfo: switch.peerInfo, - triggerSelf: triggerSelf, - verifySignature: verifySignature, - sign: sign, - peers: initTable[PeerID, PubSubPeer](), - topics: initTable[string, Topic](), - msgIdProvider: msgIdProvider) + msgIdProvider: MsgIdProvider = defaultMsgIdProvider, + parameters: PubParams = false): P = + let pubsub = + when PubParams is bool: + P(switch: switch, + peerInfo: switch.peerInfo, + triggerSelf: triggerSelf, + verifySignature: verifySignature, + sign: sign, + peers: initTable[PeerID, PubSubPeer](), + topics: initTable[string, Topic](), + msgIdProvider: msgIdProvider) + else: + P(switch: switch, + peerInfo: switch.peerInfo, + triggerSelf: triggerSelf, + verifySignature: verifySignature, + sign: sign, + peers: initTable[PeerID, PubSubPeer](), + topics: initTable[string, Topic](), + msgIdProvider: msgIdProvider, + parameters: parameters) + pubsub.initPubSub() proc 
peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = if event == PeerEvent.Joined: @@ -332,8 +356,8 @@ proc init*( pubsub.initPubSub() return pubsub -proc addObserver*(p: PubSub; observer: PubSubObserver) = - p.observers[] &= observer + +proc addObserver*(p: PubSub; observer: PubSubObserver) = p.observers[] &= observer proc removeObserver*(p: PubSub; observer: PubSubObserver) = let idx = p.observers[].find(observer) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index caf378c..c254f2f 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import std/[hashes, options, strutils, tables] +import std/[sequtils, strutils, tables, hashes, sets] import chronos, chronicles, nimcrypto/sha2, metrics import rpc/[messages, message, protobuf], ../../peerid, @@ -43,9 +43,18 @@ type observers*: ref seq[PubSubObserver] # ref as in smart_ptr dialLock: AsyncLock + score*: float64 + iWantBudget*: int + iHaveBudget*: int + outbound*: bool # if this is an outbound connection + appScore*: float64 # application specific score + behaviourPenalty*: float64 # the eventual penalty score + RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe.} -func hash*(p: PubSubPeer): Hash = +chronicles.formatIt(PubSubPeer): $it.peerId + +func hash*(p: PubSubPeer): Hash = # int is either 32/64, so intptr basically, pubsubpeer is a ref cast[pointer](p).hash @@ -177,6 +186,7 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} = # Grab a new send connection let (newConn, handshake) = await p.getConn() # ...and here if newConn.isNil: + debug "Failed to get a new send connection" return nil trace "Sending handshake", newConn, handshake = shortLog(handshake) diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index 014f065..301b8a3 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -14,6 +14,10 @@ import ../../../peerid export options type + PeerInfoMsg* = object + peerID*: seq[byte] + signedPeerRecord*: seq[byte] + SubOpts* = object subscribe*: bool topic*: string @@ -44,6 +48,8 @@ type ControlPrune* = object topicID*: string + peers*: seq[PeerInfoMsg] + backoff*: uint64 RPCMsg* = object subscriptions*: seq[SubOpts] diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index f5a4fe6..28755cd 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -23,9 +23,19 @@ proc write*(pb: var ProtoBuffer, field: int, graft: ControlGraft) = ipb.finish() pb.write(field, ipb) +proc write*(pb: var ProtoBuffer, field: int, infoMsg: PeerInfoMsg) = + var ipb = initProtoBuffer() + ipb.write(1, infoMsg.peerID) + ipb.write(2, infoMsg.signedPeerRecord) + ipb.finish() + pb.write(field, ipb) + proc write*(pb: var ProtoBuffer, field: int, prune: ControlPrune) = var ipb = initProtoBuffer() ipb.write(1, prune.topicID) + for peer in prune.peers: + ipb.write(2, peer) + ipb.write(3, prune.backoff) ipb.finish() pb.write(field, ipb) @@ -103,6 +113,7 @@ proc decodePrune*(pb: ProtoBuffer): ProtoResult[ControlPrune] {. trace "decodePrune: read topicId", topic_id = control.topicId else: trace "decodePrune: topicId is missing" + # TODO gossip 1.1 fields ok(control) proc decodeIHave*(pb: ProtoBuffer): ProtoResult[ControlIHave] {. 
diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index e477be1..c7c9b90 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -15,6 +15,8 @@ import ../protocol, ../../multiaddress, ../../peerinfo +export protocol + logScope: topics = "secure" diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 20f579a..16400cb 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -316,7 +316,7 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = # add the muxer for muxer in s.muxers.values: - ms.addHandler(muxer.codec, muxer) + ms.addHandler(muxer.codecs, muxer) # handle subsequent secure requests await ms.handle(sconn) @@ -436,30 +436,35 @@ proc internalConnect(s: Switch, proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} = discard await s.internalConnect(peerId, addrs) -proc negotiateStream(s: Switch, conn: Connection, proto: string): Future[Connection] {.async.} = - trace "Negotiating stream", conn, proto - if not await s.ms.select(conn, proto): +proc negotiateStream(s: Switch, conn: Connection, protos: seq[string]): Future[Connection] {.async.} = + trace "Negotiating stream", conn, protos + let selected = await s.ms.select(conn, protos) + if not protos.contains(selected): await conn.close() - raise newException(DialFailedError, "Unable to select sub-protocol " & proto) + raise newException(DialFailedError, "Unable to select sub-protocol " & $protos) return conn proc dial*(s: Switch, peerId: PeerID, - proto: string): Future[Connection] {.async.} = - trace "Dialling (existing)", peerId, proto + protos: seq[string]): Future[Connection] {.async.} = + trace "Dialing (existing)", peerId, protos let stream = await s.connManager.getMuxedStream(peerId) if stream.isNil: raise newException(DialFailedError, "Couldn't get muxed stream") - return await s.negotiateStream(stream, proto) + return await s.negotiateStream(stream, protos) + +proc dial*(s: Switch, + peerId: PeerID, + proto: string): Future[Connection] = dial(s, peerId, @[proto]) proc dial*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress], - proto: string): + protos: seq[string]): Future[Connection] {.async.} = - trace "Dialling (new)", peerId, proto + trace "Dialing (new)", peerId, protos let conn = await s.internalConnect(peerId, addrs) trace "Opening stream", conn let stream = await s.connManager.getMuxedStream(conn) @@ -476,16 +481,22 @@ proc dial*(s: Switch, await conn.close() raise newException(DialFailedError, "Couldn't get muxed stream") - return await s.negotiateStream(stream, proto) + return await s.negotiateStream(stream, protos) except CancelledError as exc: trace "Dial canceled", conn await cleanup() raise exc except CatchableError as exc: - trace "Error dialing", conn, msg = exc.msg + debug "Error dialing", conn, msg = exc.msg await cleanup() raise exc +proc dial*(s: Switch, + peerId: PeerID, + addrs: seq[MultiAddress], + proto: string): + Future[Connection] = dial(s, peerId, addrs, @[proto]) + proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} = if isNil(proto.handler): raise newException(CatchableError, @@ -495,7 +506,7 @@ proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} = raise newException(CatchableError, "Protocol has to define a codec string") - s.ms.addHandler(proto.codec, proto) + s.ms.addHandler(proto.codecs, proto) proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = trace "starting switch for peer", peerInfo = s.peerInfo diff --git a/scripts/build_p2pd.sh 
b/scripts/build_p2pd.sh index fedaed9..1f1765e 100644 --- a/scripts/build_p2pd.sh +++ b/scripts/build_p2pd.sh @@ -17,7 +17,7 @@ if [[ ! -e "$SUBREPO_DIR" ]]; then # we're probably in nim-libp2p's CI SUBREPO_DIR="go-libp2p-daemon" rm -rf "$SUBREPO_DIR" - git clone -q https://github.com/status-im/go-libp2p-daemon + git clone -q https://github.com/libp2p/go-libp2p-daemon cd "$SUBREPO_DIR" git checkout -q $LIBP2P_COMMIT cd .. diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 5d48b19..4899ee1 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -32,12 +32,23 @@ suite "GossipSub internal": # echo tracker.dump() check tracker.isLeaked() == false + test "topic params": + proc testRun(): Future[bool] {.async.} = + let params = TopicParams.init() + params.validateParameters().tryGet() + + return true + + check: + waitFor(testRun()) == true + test "`rebalanceMesh` Degree Lo": proc testRun(): Future[bool] {.async.} = let gossipSub = TestGossipSub.init(newStandardSwitch()) let topic = "foobar" gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.topicParams[topic] = TopicParams.init() var conns = newSeq[Connection]() gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() @@ -47,12 +58,13 @@ suite "GossipSub internal": let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + gossipSub.onNewPeer(peer) gossipSub.peers[peerInfo.peerId] = peer - gossipSub.mesh[topic].incl(peer) + gossipSub.gossipsub[topic].incl(peer) check gossipSub.peers.len == 15 await gossipSub.rebalanceMesh(topic) - check gossipSub.mesh[topic].len == GossipSubD + check gossipSub.mesh[topic].len == GossipSubD + 2 # account opportunistic grafts await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() @@ -67,22 +79,24 @@ suite "GossipSub internal": let topic = "foobar" gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - gossipSub.topics[topic] = Topic() # has to be in topics to rebalance + gossipSub.topicParams[topic] = TopicParams.init() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() var conns = newSeq[Connection]() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() for i in 0..<15: let conn = newBufferStream(noop) conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) conn.peerInfo = peerInfo let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + gossipSub.onNewPeer(peer) + gossipSub.grafted(peer, topic) gossipSub.peers[peerInfo.peerId] = peer gossipSub.mesh[topic].incl(peer) check gossipSub.mesh[topic].len == 15 await gossipSub.rebalanceMesh(topic) - check gossipSub.mesh[topic].len == GossipSubD + check gossipSub.mesh[topic].len == GossipSubD + gossipSub.parameters.dScore await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() @@ -101,6 +115,7 @@ suite "GossipSub internal": let topic = "foobar" gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + gossipSub.topicParams[topic] = TopicParams.init() var conns = newSeq[Connection]() for i in 0..<15: @@ -109,6 +124,7 @@ suite "GossipSub internal": var peerInfo = randomPeerInfo() conn.peerInfo = peerInfo let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + gossipSub.onNewPeer(peer) peer.handler = handler gossipSub.gossipsub[topic].incl(peer) @@ -132,6 +148,7 @@ suite "GossipSub internal": discard let topic = "foobar" + gossipSub.topicParams[topic] = TopicParams.init() gossipSub.fanout[topic] = initHashSet[PubSubPeer]() gossipSub.lastFanoutPubSub[topic] = 
Moment.fromNow(1.millis) await sleepAsync(5.millis) # allow the topic to expire @@ -143,6 +160,7 @@ suite "GossipSub internal": let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) conn.peerInfo = peerInfo let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + gossipSub.onNewPeer(peer) peer.handler = handler gossipSub.fanout[topic].incl(peer) @@ -168,6 +186,8 @@ suite "GossipSub internal": let topic1 = "foobar1" let topic2 = "foobar2" + gossipSub.topicParams[topic1] = TopicParams.init() + gossipSub.topicParams[topic2] = TopicParams.init() gossipSub.fanout[topic1] = initHashSet[PubSubPeer]() gossipSub.fanout[topic2] = initHashSet[PubSubPeer]() gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(1.millis) @@ -181,6 +201,7 @@ suite "GossipSub internal": let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + gossipSub.onNewPeer(peer) peer.handler = handler gossipSub.fanout[topic1].incl(peer) gossipSub.fanout[topic2].incl(peer) @@ -208,6 +229,7 @@ suite "GossipSub internal": discard let topic = "foobar" + gossipSub.topicParams[topic] = TopicParams.init() gossipSub.mesh[topic] = initHashSet[PubSubPeer]() gossipSub.fanout[topic] = initHashSet[PubSubPeer]() gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() @@ -220,10 +242,12 @@ suite "GossipSub internal": let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + gossipSub.onNewPeer(peer) peer.handler = handler if i mod 2 == 0: gossipSub.fanout[topic].incl(peer) else: + gossipSub.grafted(peer, topic) gossipSub.mesh[topic].incl(peer) # generate gossipsub (free standing) peers @@ -233,6 +257,7 @@ suite "GossipSub internal": let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + gossipSub.onNewPeer(peer) peer.handler = handler gossipSub.gossipsub[topic].incl(peer) @@ -273,6 +298,7 @@ suite "GossipSub internal": discard let topic = "foobar" + gossipSub.topicParams[topic] = TopicParams.init() gossipSub.fanout[topic] = initHashSet[PubSubPeer]() gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() var conns = newSeq[Connection]() @@ -282,6 +308,7 @@ suite "GossipSub internal": let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + gossipSub.onNewPeer(peer) peer.handler = handler if i mod 2 == 0: gossipSub.fanout[topic].incl(peer) @@ -318,6 +345,7 @@ suite "GossipSub internal": discard let topic = "foobar" + gossipSub.topicParams[topic] = TopicParams.init() gossipSub.mesh[topic] = initHashSet[PubSubPeer]() gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() var conns = newSeq[Connection]() @@ -327,9 +355,11 @@ suite "GossipSub internal": let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + gossipSub.onNewPeer(peer) peer.handler = handler if i mod 2 == 0: gossipSub.mesh[topic].incl(peer) + gossipSub.grafted(peer, topic) else: gossipSub.gossipsub[topic].incl(peer) @@ -363,6 +393,7 @@ suite "GossipSub internal": discard let topic = "foobar" + gossipSub.topicParams[topic] = TopicParams.init() gossipSub.mesh[topic] = initHashSet[PubSubPeer]() gossipSub.fanout[topic] = initHashSet[PubSubPeer]() var conns = newSeq[Connection]() @@ -372,9 +403,11 @@ suite "GossipSub internal": let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + gossipSub.onNewPeer(peer) peer.handler = handler if i mod 2 == 0: 
gossipSub.mesh[topic].incl(peer) + gossipSub.grafted(peer, topic) else: gossipSub.fanout[topic].incl(peer) diff --git a/tests/pubsub/testgossipinternal10.nim b/tests/pubsub/testgossipinternal10.nim new file mode 100644 index 0000000..4f74e23 --- /dev/null +++ b/tests/pubsub/testgossipinternal10.nim @@ -0,0 +1,401 @@ +include ../../libp2p/protocols/pubsub/gossipsub10 + +{.used.} + +import unittest, bearssl +import stew/byteutils +import ../../libp2p/standard_setup +import ../../libp2p/errors +import ../../libp2p/crypto/crypto +import ../../libp2p/stream/bufferstream + +import ../helpers + +type + TestGossipSub = ref object of GossipSub + +proc noop(data: seq[byte]) {.async, gcsafe.} = discard + +proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): auto = + proc getConn(): Future[(Connection, RPCMsg)] {.async.} = + let conn = await p.switch.dial(peerId, GossipSubCodec) + return (conn, RPCMsg.withSubs(toSeq(p.topics.keys), true)) + + newPubSubPeer(peerId, getConn, GossipSubCodec) + +proc randomPeerInfo(): PeerInfo = + PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) + +suite "GossipSub internal": + teardown: + for tracker in testTrackers(): + # echo tracker.dump() + check tracker.isLeaked() == false + + test "`rebalanceMesh` Degree Lo": + proc testRun(): Future[bool] {.async.} = + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + let topic = "foobar" + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + + var conns = newSeq[Connection]() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + for i in 0..<15: + let conn = newBufferStream(noop) + conns &= conn + let peerInfo = randomPeerInfo() + conn.peerInfo = peerInfo + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + gossipSub.peers[peerInfo.peerId] = peer + gossipSub.mesh[topic].incl(peer) + + check gossipSub.peers.len == 15 + await gossipSub.rebalanceMesh(topic) + check gossipSub.mesh[topic].len == GossipSubD + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + result = true + + check: + waitFor(testRun()) == true + + test "`rebalanceMesh` Degree Hi": + proc testRun(): Future[bool] {.async.} = + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + let topic = "foobar" + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.topics[topic] = Topic() # has to be in topics to rebalance + + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + var conns = newSeq[Connection]() + for i in 0..<15: + let conn = newBufferStream(noop) + conns &= conn + let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) + conn.peerInfo = peerInfo + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + gossipSub.peers[peerInfo.peerId] = peer + gossipSub.mesh[topic].incl(peer) + + check gossipSub.mesh[topic].len == 15 + await gossipSub.rebalanceMesh(topic) + check gossipSub.mesh[topic].len == GossipSubD + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + result = true + + check: + waitFor(testRun()) == true + + test "`replenishFanout` Degree Lo": + proc testRun(): Future[bool] {.async.} = + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + discard + + let topic = "foobar" + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + + var conns = newSeq[Connection]() + for i in 0..<15: + let conn = newBufferStream(noop) + conns &= conn + var peerInfo = randomPeerInfo() + conn.peerInfo = peerInfo + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + peer.handler = handler + 
gossipSub.gossipsub[topic].incl(peer) + + check gossipSub.gossipsub[topic].len == 15 + gossipSub.replenishFanout(topic) + check gossipSub.fanout[topic].len == GossipSubD + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + result = true + + check: + waitFor(testRun()) == true + + test "`dropFanoutPeers` drop expired fanout topics": + proc testRun(): Future[bool] {.async.} = + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + discard + + let topic = "foobar" + gossipSub.fanout[topic] = initHashSet[PubSubPeer]() + gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(1.millis) + await sleepAsync(5.millis) # allow the topic to expire + + var conns = newSeq[Connection]() + for i in 0..<6: + let conn = newBufferStream(noop) + conns &= conn + let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) + conn.peerInfo = peerInfo + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + peer.handler = handler + gossipSub.fanout[topic].incl(peer) + + check gossipSub.fanout[topic].len == GossipSubD + + gossipSub.dropFanoutPeers() + check topic notin gossipSub.fanout + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + result = true + + check: + waitFor(testRun()) == true + + test "`dropFanoutPeers` leave unexpired fanout topics": + proc testRun(): Future[bool] {.async.} = + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + discard + + let topic1 = "foobar1" + let topic2 = "foobar2" + gossipSub.fanout[topic1] = initHashSet[PubSubPeer]() + gossipSub.fanout[topic2] = initHashSet[PubSubPeer]() + gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(1.millis) + gossipSub.lastFanoutPubSub[topic2] = Moment.fromNow(1.minutes) + await sleepAsync(5.millis) # allow the topic to expire + + var conns = newSeq[Connection]() + for i in 0..<6: + let conn = newBufferStream(noop) + conns &= conn + let peerInfo = randomPeerInfo() + conn.peerInfo = peerInfo + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + peer.handler = handler + gossipSub.fanout[topic1].incl(peer) + gossipSub.fanout[topic2].incl(peer) + + check gossipSub.fanout[topic1].len == GossipSubD + check gossipSub.fanout[topic2].len == GossipSubD + + gossipSub.dropFanoutPeers() + check topic1 notin gossipSub.fanout + check topic2 in gossipSub.fanout + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + result = true + + check: + waitFor(testRun()) == true + + test "`getGossipPeers` - should gather up to degree D non intersecting peers": + proc testRun(): Future[bool] {.async.} = + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + discard + + let topic = "foobar" + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.fanout[topic] = initHashSet[PubSubPeer]() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + var conns = newSeq[Connection]() + + # generate mesh and fanout peers + for i in 0..<30: + let conn = newBufferStream(noop) + conns &= conn + let peerInfo = randomPeerInfo() + conn.peerInfo = peerInfo + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + peer.handler = handler + if i mod 2 == 0: + gossipSub.fanout[topic].incl(peer) + else: + gossipSub.mesh[topic].incl(peer) + + # generate gossipsub (free standing) peers + for i in 0..<15: + let conn = newBufferStream(noop) + conns &= conn + let peerInfo = randomPeerInfo() + 
conn.peerInfo = peerInfo + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + peer.handler = handler + gossipSub.gossipsub[topic].incl(peer) + + # generate messages + var seqno = 0'u64 + for i in 0..5: + let conn = newBufferStream(noop) + conns &= conn + let peerInfo = randomPeerInfo() + conn.peerInfo = peerInfo + inc seqno + let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, seqno, false) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) + + check gossipSub.fanout[topic].len == 15 + check gossipSub.mesh[topic].len == 15 + check gossipSub.gossipsub[topic].len == 15 + + let peers = gossipSub.getGossipPeers() + check peers.len == GossipSubD + for p in peers.keys: + check not gossipSub.fanout.hasPeerID(topic, p.peerId) + check not gossipSub.mesh.hasPeerID(topic, p.peerId) + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + result = true + + check: + waitFor(testRun()) == true + + test "`getGossipPeers` - should not crash on missing topics in mesh": + proc testRun(): Future[bool] {.async.} = + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + discard + + let topic = "foobar" + gossipSub.fanout[topic] = initHashSet[PubSubPeer]() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + var conns = newSeq[Connection]() + for i in 0..<30: + let conn = newBufferStream(noop) + conns &= conn + let peerInfo = randomPeerInfo() + conn.peerInfo = peerInfo + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + peer.handler = handler + if i mod 2 == 0: + gossipSub.fanout[topic].incl(peer) + else: + gossipSub.gossipsub[topic].incl(peer) + + # generate messages + var seqno = 0'u64 + for i in 0..5: + let conn = newBufferStream(noop) + conns &= conn + let peerInfo = randomPeerInfo() + conn.peerInfo = peerInfo + inc seqno + let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, seqno, false) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) + + let peers = gossipSub.getGossipPeers() + check peers.len == GossipSubD + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + result = true + + check: + waitFor(testRun()) == true + + test "`getGossipPeers` - should not crash on missing topics in fanout": + proc testRun(): Future[bool] {.async.} = + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + discard + + let topic = "foobar" + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + var conns = newSeq[Connection]() + for i in 0..<30: + let conn = newBufferStream(noop) + conns &= conn + let peerInfo = randomPeerInfo() + conn.peerInfo = peerInfo + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + peer.handler = handler + if i mod 2 == 0: + gossipSub.mesh[topic].incl(peer) + else: + gossipSub.gossipsub[topic].incl(peer) + + # generate messages + var seqno = 0'u64 + for i in 0..5: + let conn = newBufferStream(noop) + conns &= conn + let peerInfo = randomPeerInfo() + conn.peerInfo = peerInfo + inc seqno + let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, seqno, false) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) + + let peers = gossipSub.getGossipPeers() + check peers.len == GossipSubD + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + result = true + + check: + waitFor(testRun()) == true + + test "`getGossipPeers` - should not crash on missing 
topics in gossip": + proc testRun(): Future[bool] {.async.} = + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + discard + + let topic = "foobar" + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.fanout[topic] = initHashSet[PubSubPeer]() + var conns = newSeq[Connection]() + for i in 0..<30: + let conn = newBufferStream(noop) + conns &= conn + let peerInfo = randomPeerInfo() + conn.peerInfo = peerInfo + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + peer.handler = handler + if i mod 2 == 0: + gossipSub.mesh[topic].incl(peer) + else: + gossipSub.fanout[topic].incl(peer) + + # generate messages + var seqno = 0'u64 + for i in 0..5: + let conn = newBufferStream(noop) + conns &= conn + let peerInfo = randomPeerInfo() + conn.peerInfo = peerInfo + inc seqno + let msg = Message.init(peerInfo, ("bar" & $i).toBytes(), topic, seqno, false) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) + + let peers = gossipSub.getGossipPeers() + check peers.len == 0 + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + result = true + + check: + waitFor(testRun()) == true diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 924836e..9f56b2f 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -20,10 +20,14 @@ import utils, ../../libp2p/[errors, crypto/crypto, protocols/pubsub/pubsub, protocols/pubsub/pubsubpeer, - protocols/pubsub/gossipsub, protocols/pubsub/peertable, protocols/pubsub/rpc/messages] +when defined(fallback_gossipsub_10): + import ../../libp2p/protocols/pubsub/gossipsub10 +else: + import ../../libp2p/protocols/pubsub/gossipsub + import ../helpers proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = @@ -34,6 +38,13 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = # peers can be inside `mesh` and `fanout`, not just `gossipsub` var ceil = 15 let fsub = GossipSub(sender) + let ev = newAsyncEvent() + fsub.heartbeatEvents.add(ev) + + # await first heartbeat + await ev.wait() + ev.clear() + while (not fsub.gossipsub.hasKey(key) or not fsub.gossipsub.hasPeerID(key, receiver.peerInfo.peerId)) and (not fsub.mesh.hasKey(key) or @@ -41,7 +52,11 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = (not fsub.fanout.hasKey(key) or not fsub.fanout.hasPeerID(key , receiver.peerInfo.peerId)): trace "waitSub sleeping..." 
- await sleepAsync(1.seconds) + + # await more heartbeats + await ev.wait() + ev.clear() + dec ceil doAssert(ceil > 0, "waitSub timeout!") @@ -90,6 +105,12 @@ suite "GossipSub": await nodes[0].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler) + var subs: seq[Future[void]] + subs &= waitSub(nodes[1], nodes[0], "foobar") + subs &= waitSub(nodes[0], nodes[1], "foobar") + + await allFuturesThrowing(subs) + var validatorFut = newFuture[bool]() proc validator(topic: string, message: Message): @@ -143,6 +164,19 @@ suite "GossipSub": await nodes[0].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler) + var subs: seq[Future[void]] + subs &= waitSub(nodes[1], nodes[0], "foobar") + subs &= waitSub(nodes[0], nodes[1], "foobar") + + await allFuturesThrowing(subs) + + let gossip1 = GossipSub(nodes[0]) + let gossip2 = GossipSub(nodes[1]) + + check: + gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout + gossip2.mesh["foobar"].len == 1 and "foobar" notin gossip2.fanout + var validatorFut = newFuture[bool]() proc validator(topic: string, message: Message): @@ -155,13 +189,6 @@ suite "GossipSub": check (await validatorFut) == true - let gossip1 = GossipSub(nodes[0]) - let gossip2 = GossipSub(nodes[1]) - - check: - gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout - gossip2.mesh["foobar"].len == 1 and "foobar" notin gossip2.fanout - await allFuturesThrowing( nodes[0].switch.stop(), nodes[1].switch.stop() @@ -526,7 +553,7 @@ suite "GossipSub": subs &= dialer.subscribe("foobar", handler) - await allFuturesThrowing(subs) + await allFuturesThrowing(subs).wait(30.seconds) tryPublish await wait(nodes[0].publish("foobar", toBytes("from node " & @@ -543,8 +570,6 @@ suite "GossipSub": check: "foobar" in gossip.gossipsub - gossip.fanout.len == 0 - gossip.mesh["foobar"].len > 0 await allFuturesThrowing( nodes.mapIt( @@ -584,9 +609,9 @@ suite "GossipSub": seenFut.complete() subs &= dialer.subscribe("foobar", handler) + subs &= waitSub(nodes[0], dialer, "foobar") await allFuturesThrowing(subs) - tryPublish await wait(nodes[0].publish("foobar", toBytes("from node " & $nodes[1].peerInfo.peerId)), diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 0c0a1d7..030af27 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -8,9 +8,13 @@ import chronos import ../../libp2p/[standard_setup, protocols/pubsub/pubsub, protocols/pubsub/floodsub, - protocols/pubsub/gossipsub, protocols/secure/secure] +when defined(fallback_gossipsub_10): + import ../../libp2p/protocols/pubsub/gossipsub10 +else: + import ../../libp2p/protocols/pubsub/gossipsub + export standard_setup randomize() @@ -18,9 +22,7 @@ randomize() proc generateNodes*( num: Natural, secureManagers: openarray[SecureProtocol] = [ - # array cos order matters - SecureProtocol.Secio, - SecureProtocol.Noise, + SecureProtocol.Noise ], msgIdProvider: MsgIdProvider = nil, gossip: bool = false, @@ -36,7 +38,8 @@ proc generateNodes*( triggerSelf = triggerSelf, verifySignature = verifySignature, sign = sign, - msgIdProvider = msgIdProvider).PubSub + msgIdProvider = msgIdProvider, + parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p)).PubSub else: FloodSub.init( switch = switch, diff --git a/tests/testinterop.nim b/tests/testinterop.nim index c2fb965..e684611 100644 --- a/tests/testinterop.nim +++ b/tests/testinterop.nim @@ -25,8 +25,8 @@ import ../libp2p/[daemon/daemonapi, transports/tcptransport, protocols/secure/secure, protocols/pubsub/pubsub, - 
protocols/pubsub/gossipsub, - protocols/pubsub/floodsub] + protocols/pubsub/floodsub, + protocols/pubsub/gossipsub] type # TODO: Unify both PeerInfo structs @@ -139,7 +139,7 @@ proc testPubSubNodePublish(gossip: bool = false, let daemonNode = await newDaemonApi(flags) let daemonPeer = await daemonNode.identity() let nativeNode = newStandardSwitch( - secureManagers = [SecureProtocol.Secio], + secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes) let pubsub = if gossip: @@ -203,6 +203,8 @@ suite "Interop": # # echo tracker.dump() # # check tracker.isLeaked() == false + # TODO: this test is failing sometimes on windows + # For some reason we receive EOF before test 4 sometimes test "native -> daemon multiple reads and writes": proc runTests(): Future[bool] {.async.} = var protos = @["/test-stream"] @@ -264,7 +266,7 @@ suite "Interop": copyMem(addr expect[0], addr buffer.buffer[0], len(expect)) let nativeNode = newStandardSwitch( - secureManagers = [SecureProtocol.Secio], + secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes) let awaiters = await nativeNode.start() @@ -358,7 +360,7 @@ suite "Interop": proto.codec = protos[0] # codec let nativeNode = newStandardSwitch( - secureManagers = [SecureProtocol.Secio], outTimeout = 5.minutes) + secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes) nativeNode.mount(proto)
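For reference, a minimal usage sketch of the seq-of-codecs dial overloads and the parameterised GossipSub constructor introduced in this patch; import paths, the remote peer's identity/addresses and the "/chat/..." protocol strings are placeholders, and error handling is omitted:

import chronos
import libp2p/standard_setup                        # assumed import path
import libp2p/protocols/pubsub/[pubsub, gossipsub]  # assumed import path

proc example(remote: PeerID, addrs: seq[MultiAddress]) {.async.} =
  let switch = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
  # GossipSub can now be built with explicit (gossipsub 1.1) parameters
  let gossip = GossipSub.init(
    switch = switch,
    triggerSelf = true,
    parameters = GossipSubParams.init())
  switch.mount(gossip)          # registers all of the protocol's codecs
  discard await switch.start()
  await gossip.start()          # starts the heartbeat loop

  # dial now accepts a list of candidate codecs; whichever one the remote
  # peer agrees on is negotiated and the resulting stream is returned
  let conn = await switch.dial(remote, addrs, @["/chat/1.1.0", "/chat/1.0.0"])
  await conn.close()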