diff --git a/libp2p/cid.nim b/libp2p/cid.nim
index 400e285..1a6e6ef 100644
--- a/libp2p/cid.nim
+++ b/libp2p/cid.nim
@@ -62,7 +62,6 @@ const
   ]
 
 proc initCidCodeTable(): Table[int, MultiCodec] {.compileTime.} =
-  result = initTable[int, MultiCodec]()
   for item in ContentIdsList:
     result[int(item)] = item
 
diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim
index 818af37..2a55b57 100644
--- a/libp2p/connmanager.nim
+++ b/libp2p/connmanager.nim
@@ -93,8 +93,6 @@ proc init*(C: type ConnManager,
     raiseAssert "Invalid connection counts!"
 
   C(maxConnsPerPeer: maxConnsPerPeer,
-    conns: initTable[PeerID, HashSet[Connection]](),
-    muxed: initTable[Connection, MuxerHolder](),
    inSema: inSema,
    outSema: outSema)
 
diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim
index 2319a0a..6e1ced9 100644
--- a/libp2p/daemon/daemonapi.nim
+++ b/libp2p/daemon/daemonapi.nim
@@ -645,7 +645,6 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
   api.servers = newSeq[P2PServer]()
   api.pattern = patternForChild
   api.ucounter = 1
-  api.handlers = initTable[string, P2PStreamCallback]()
 
   if len(sockpath) == 0:
     api.flags.excl(NoProcessCtrl)
 
diff --git a/libp2p/multiaddress.nim b/libp2p/multiaddress.nim
index 456c866..628e43b 100644
--- a/libp2p/multiaddress.nim
+++ b/libp2p/multiaddress.nim
@@ -437,7 +437,6 @@ const
 
 proc initMultiAddressCodeTable(): Table[MultiCodec, MAProtocol] {.compileTime.} =
-  result = initTable[MultiCodec, MAProtocol]()
   for item in ProtocolsList:
     result[item.mcodec] = item
 
diff --git a/libp2p/multibase.nim b/libp2p/multibase.nim
index 8d392f1..9867d6f 100644
--- a/libp2p/multibase.nim
+++ b/libp2p/multibase.nim
@@ -328,12 +328,10 @@ const
   ]
 
 proc initMultiBaseCodeTable(): Table[char, MBCodec] {.compileTime.} =
-  result = initTable[char, MBCodec]()
   for item in MultibaseCodecs:
     result[item.code] = item
 
 proc initMultiBaseNameTable(): Table[string, MBCodec] {.compileTime.} =
-  result = initTable[string, MBCodec]()
   for item in MultibaseCodecs:
     result[item.name] = item
 
diff --git a/libp2p/multicodec.nim b/libp2p/multicodec.nim
index 33ef889..175379a 100644
--- a/libp2p/multicodec.nim
+++ b/libp2p/multicodec.nim
@@ -242,12 +242,10 @@ const
   InvalidMultiCodec* = MultiCodec(-1)
 
 proc initMultiCodecNameTable(): Table[string, int] {.compileTime.} =
-  result = initTable[string, int]()
   for item in MultiCodecList:
     result[item[0]] = item[1]
 
 proc initMultiCodecCodeTable(): Table[int, string] {.compileTime.} =
-  result = initTable[int, string]()
   for item in MultiCodecList:
     result[item[1]] = item[0]
 
@@ -271,7 +269,7 @@ proc `$`*(mc: MultiCodec): string =
   ## Returns string representation of MultiCodec ``mc``.
   let name = CodeCodecs.getOrDefault(int(mc), "")
   doAssert(name != "")
-  name 
+  name
 
 proc `==`*(mc: MultiCodec, name: string): bool {.inline.} =
   ## Compares MultiCodec ``mc`` with string ``name``.
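The removed `result = initTable[...]()` lines above rely on the fact that, since Nim 0.20, a `Table` is usable in its default zero-initialized state, so the explicit init at the top of these compile-time builders is redundant; the same reasoning covers the `initTable`/`initHashSet` field initializers dropped from object constructors further down. A minimal standalone sketch of the pattern (hypothetical names, not code from this PR):

# Standalone sketch (hypothetical names, not part of this PR): `result` starts
# out as an empty, usable Table, so no initTable call is needed before filling it.
import std/tables

type Animal = enum Cat, Dog

const AnimalList = [Cat, Dog]

proc initAnimalCodeTable(): Table[int, Animal] {.compileTime.} =
  # fill the implicitly zero-initialized result, exactly like the builders above
  for item in AnimalList:
    result[int(item)] = item

const AnimalCodeTable = initAnimalCodeTable()

when isMainModule:
  doAssert AnimalCodeTable[0] == Cat
  doAssert AnimalCodeTable.len == 2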
diff --git a/libp2p/multihash.nim b/libp2p/multihash.nim
index 4d7d066..09b5f66 100644
--- a/libp2p/multihash.nim
+++ b/libp2p/multihash.nim
@@ -319,7 +319,6 @@ const
   ]
 
 proc initMultiHashCodeTable(): Table[MultiCodec, MHash] {.compileTime.} =
-  result = initTable[MultiCodec, MHash]()
   for item in HashesList:
     result[item.mcodec] = item
 
diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim
index 48c162b..054e061 100644
--- a/libp2p/muxers/mplex/lpchannel.nim
+++ b/libp2p/muxers/mplex/lpchannel.nim
@@ -69,7 +69,7 @@ proc open*(s: LPChannel) {.async, gcsafe.} =
     await s.conn.close()
     raise exc
 
-method closed*(s: LPChannel): bool =
+method closed*(s: LPChannel): bool {.raises: [Defect].} =
   s.closedLocal
 
 proc closeUnderlying(s: LPChannel): Future[void] {.async.} =
 
diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim
index f713095..48bc711 100644
--- a/libp2p/protocols/pubsub/floodsub.nim
+++ b/libp2p/protocols/pubsub/floodsub.nim
@@ -205,6 +205,5 @@ method unsubscribeAll*(f: FloodSub, topic: string) =
 
 method initPubSub*(f: FloodSub) =
   procCall PubSub(f).initPubSub()
-  f.floodsub = initTable[string, HashSet[PubSubPeer]]()
   f.seen = TimedCache[MessageID].init(2.minutes)
   f.init()
 
diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
index 5244bee..2cf50a2 100644
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ -7,7 +7,7 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
 
-import std/[tables, sets, options, sequtils, strutils, random, algorithm]
+import std/[tables, sets, options, sequtils, random]
 import chronos, chronicles, metrics, bearssl
 import ./pubsub,
        ./floodsub,
@@ -159,16 +159,16 @@ method init*(g: GossipSub) =
   g.codecs &= GossipSubCodec_10
 
 method onNewPeer(g: GossipSub, peer: PubSubPeer) =
-  if peer.peerId notin g.peerStats:
-    g.initPeerStats(peer)
-  else:
-    # we knew this peer
-    # restore previously stored score
-    let stats = g.peerStats[peer.peerId]
+  g.withPeerStats(peer.peerId) do (stats: var PeerStats):
+    # Make sure stats and peer information match, even when reloading peer stats
+    # from a previous connection
     peer.score = stats.score
     peer.appScore = stats.appScore
     peer.behaviourPenalty = stats.behaviourPenalty
 
+  peer.iWantBudget = IWantPeerBudget
+  peer.iHaveBudget = IHavePeerBudget
+
 method onPubSubPeerEvent*(p: GossipSub, peer: PubsubPeer, event: PubSubPeerEvent) {.gcsafe.} =
   case event.kind
   of PubSubPeerEventKind.Connected:
@@ -216,7 +216,6 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
       g.fanout.removePeer(t, pubSubPeer)
 
   g.peerStats.withValue(peer, stats):
-    stats[].expire = Moment.now() + g.parameters.retainScore
     for topic, info in stats[].topicInfos.mpairs:
       info.firstMessageDeliveries = 0
 
@@ -294,21 +293,16 @@ method rpcHandler*(g: GossipSub,
         # for every topic in the message
         let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
         # if in mesh add more delivery score
-        g.peerStats.withValue(peer.peerId, pstats):
-          pstats[].topicInfos.withValue(t, stats):
-            if stats[].inMesh:
+        g.withPeerStats(peer.peerId) do (stats: var PeerStats):
+          stats.topicInfos.withValue(t, tstats):
+            if tstats[].inMesh:
              # TODO: take into account meshMessageDeliveriesWindow
              # score only if messages are not too old.
-              stats[].meshMessageDeliveries += 1
-              if stats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
-                stats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
+              tstats[].meshMessageDeliveries += 1
+              if tstats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
+                tstats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
          do: # make sure we don't loose this information
-            pstats[].topicInfos[t] = TopicInfo(meshMessageDeliveries: 1)
-        do: # make sure we don't loose this information
-          g.initPeerStats(peer) do:
-            var stats = PeerStats()
            stats.topicInfos[t] = TopicInfo(meshMessageDeliveries: 1)
-            stats
 
        # onto the next message
        continue
@@ -359,25 +353,20 @@ method rpcHandler*(g: GossipSub,
 
      let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
 
-      g.peerStats.withValue(peer.peerId, pstats):
-        pstats[].topicInfos.withValue(t, stats):
+      g.withPeerStats(peer.peerId) do(stats: var PeerStats):
+        stats.topicInfos.withValue(t, tstats):
          # contribute to peer score first delivery
-          stats[].firstMessageDeliveries += 1
-          if stats[].firstMessageDeliveries > topicParams.firstMessageDeliveriesCap:
-            stats[].firstMessageDeliveries = topicParams.firstMessageDeliveriesCap
+          tstats[].firstMessageDeliveries += 1
+          if tstats[].firstMessageDeliveries > topicParams.firstMessageDeliveriesCap:
+            tstats[].firstMessageDeliveries = topicParams.firstMessageDeliveriesCap
 
          # if in mesh add more delivery score
-          if stats[].inMesh:
-            stats[].meshMessageDeliveries += 1
-            if stats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
-              stats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
+          if tstats[].inMesh:
+            tstats[].meshMessageDeliveries += 1
+            if tstats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
+              tstats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
        do: # make sure we don't loose this information
-          pstats[].topicInfos[t] = TopicInfo(firstMessageDeliveries: 1, meshMessageDeliveries: 1)
-      do: # make sure we don't loose this information
-        g.initPeerStats(peer) do:
-          var stats = PeerStats()
          stats.topicInfos[t] = TopicInfo(firstMessageDeliveries: 1, meshMessageDeliveries: 1)
-          stats
 
      g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
      g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
 
@@ -625,17 +614,10 @@ method initPubSub*(g: GossipSub) =
   randomize()
 
   # init the floodsub stuff here, we customize timedcache in gossip!
-  g.floodsub = initTable[string, HashSet[PubSubPeer]]()
   g.seen = TimedCache[MessageID].init(g.parameters.seenTTL)
 
   # init gossip stuff
   g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength)
-  g.mesh = initTable[string, HashSet[PubSubPeer]]()     # meshes - topic to peer
-  g.fanout = initTable[string, HashSet[PubSubPeer]]()   # fanout - topic to peer
-  g.gossipsub = initTable[string, HashSet[PubSubPeer]]()# topic to peer map of all gossipsub peers
-  g.lastFanoutPubSub = initTable[string, Moment]()      # last publish time for fanout topics
-  g.gossip = initTable[string, seq[ControlIHave]]()     # pending gossip
-  g.control = initTable[string, ControlMessage]()       # pending control messages
 
   var rng = newRng()
   g.randomBytes = newSeqUninitialized[byte](32)
   brHmacDrbgGenerate(rng[], g.randomBytes)
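The rewritten scoring paths in gossipsub.nim all funnel through `withPeerStats` (added in gossipsub/scoring.nim below), which either updates an existing entry or creates one with a fresh expiry, so the old `initPeerStats`-or-lookup branching disappears and the expiry bookkeeping lives in one place. A standalone sketch of the same update-or-insert callback pattern (hypothetical `Stats`/`withStats` names, not code from this PR):

# Standalone sketch (hypothetical names, not part of this PR) of the
# update-or-insert callback idiom used by withPeerStats: callers never touch
# the table directly, so an entry can neither be lost nor duplicated.
import std/tables

type
  Stats = object
    score: float64

var statsTable: Table[string, Stats]

proc withStats(id: string, action: proc (s: var Stats)) =
  # run `action` on the existing entry, or on a freshly inserted default one
  statsTable.withValue(id, existing) do:
    action(existing[])
  do:
    action(statsTable.mgetOrPut(id, Stats()))

when isMainModule:
  withStats("peer-a") do (s: var Stats):
    s.score += 1.0
  withStats("peer-a") do (s: var Stats):
    s.score += 1.0
  doAssert statsTable["peer-a"].score == 2.0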
diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim
index 1a173b1..d734762 100644
--- a/libp2p/protocols/pubsub/gossipsub/behavior.nim
+++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim
@@ -7,7 +7,9 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
 
-import std/[tables, strutils, sequtils, sets, algorithm]
+# {.push raises: [Defect].} TODO compile error on windows due to chronicles?
+
+import std/[tables, sequtils, sets, algorithm]
 import random # for shuffle
 import chronos, chronicles, metrics
 import "."/[types, scoring]
@@ -25,56 +27,48 @@ declareGauge(libp2p_gossipsub_low_peers_topics, "number of topics in mesh with a
 declareGauge(libp2p_gossipsub_healthy_peers_topics, "number of topics in mesh with at least dlow peers (but below dhigh)")
 declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"])
 
-proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) =
-  g.peerStats.withValue(p.peerId, stats):
+proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [Defect].} =
+  g.withPeerStats(p.peerId) do (stats: var PeerStats):
    var info = stats.topicInfos.getOrDefault(topic)
    info.graftTime = Moment.now()
    info.meshTime = 0.seconds
    info.inMesh = true
    info.meshMessageDeliveriesActive = false
 
-    # mgetOrPut does not work, so we gotta do this without referencing
    stats.topicInfos[topic] = info
-    assert(g.peerStats[p.peerId].topicInfos[topic].inMesh == true)
 
    trace "grafted", peer=p, topic
-  do:
-    g.initPeerStats(p)
-    g.grafted(p, topic)
 
-proc pruned*(g: GossipSub, p: PubSubPeer, topic: string) =
+proc pruned*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [Defect].} =
   let backoff = Moment.fromNow(g.parameters.pruneBackoff)
   g.backingOff
-    .mgetOrPut(topic, initTable[PeerID, Moment]())
-    .mgetOrPut(p.peerId, backoff) = backoff
+    .mgetOrPut(topic, initTable[PeerID, Moment]())[p.peerId] = backoff
 
   g.peerStats.withValue(p.peerId, stats):
-    if topic in stats.topicInfos:
-      var info = stats.topicInfos[topic]
-      if topic in g.topicParams:
-        let topicParams = g.topicParams[topic]
+    stats.topicInfos.withValue(topic, info):
+      g.topicParams.withValue(topic, topicParams):
        # penalize a peer that delivered no message
-        let threshold = topicParams.meshMessageDeliveriesThreshold
-        if info.inMesh and info.meshMessageDeliveriesActive and info.meshMessageDeliveries < threshold:
+        let threshold = topicParams[].meshMessageDeliveriesThreshold
+        if info[].inMesh and
+            info[].meshMessageDeliveriesActive and
+            info[].meshMessageDeliveries < threshold:
          let deficit = threshold - info.meshMessageDeliveries
-          info.meshFailurePenalty += deficit * deficit
+          info[].meshFailurePenalty += deficit * deficit
 
      info.inMesh = false
-      # mgetOrPut does not work, so we gotta do this without referencing
-      stats.topicInfos[topic] = info
-
      trace "pruned", peer=p, topic
 
-proc handleBackingOff*(t: var BackoffTable, topic: string) =
+proc handleBackingOff*(t: var BackoffTable, topic: string) {.raises: [Defect].} =
   let now = Moment.now()
   var expired = toSeq(t.getOrDefault(topic).pairs())
   expired.keepIf do (pair: tuple[peer: PeerID, expire: Moment]) -> bool:
     now >= pair.expire
   for (peer, _) in expired:
-    t.mgetOrPut(topic, initTable[PeerID, Moment]()).del(peer)
+    t.withValue(topic, v):
+      v[].del(peer)
 
-proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] =
+proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] {.raises: [Defect].} =
   var peers = g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()).toSeq()
   peers.keepIf do (x: PubSubPeer) -> bool:
     x.score >= 0.0
@@ -85,19 +79,16 @@ proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] =
 
 proc handleGraft*(g: GossipSub,
                   peer: PubSubPeer,
-                  grafts: seq[ControlGraft]): seq[ControlPrune] =
+                  grafts: seq[ControlGraft]): seq[ControlPrune] = # {.raises: [Defect].} TODO chronicles exception on windows
   for graft in grafts:
     let topic = graft.topicID
-    logScope:
-      peer
-      topic
-
-    trace "peer grafted topic"
+    trace "peer grafted topic", peer, topic
 
    # It is an error to GRAFT on a explicit peer
    if peer.peerId in g.parameters.directPeers:
      # receiving a graft from a direct peer should yield a more prominent warning (protocol violation)
-      warn "attempt to graft an explicit peer, peering agreements should be reciprocal", peer=peer.peerId, topic
+      warn "attempt to graft an explicit peer, peering agreements should be reciprocal",
+        peer, topic
      # and such an attempt should be logged and rejected with a PRUNE
      result.add(ControlPrune(
        topicID: topic,
@@ -106,8 +97,7 @@ proc handleGraft*(g: GossipSub,
 
      let backoff = Moment.fromNow(g.parameters.pruneBackoff)
      g.backingOff
-        .mgetOrPut(topic, initTable[PeerID, Moment]())
-        .mgetOrPut(peer.peerId, backoff) = backoff
+        .mgetOrPut(topic, initTable[PeerID, Moment]())[peer.peerId] = backoff
 
      peer.behaviourPenalty += 0.1
 
@@ -116,8 +106,7 @@ proc handleGraft*(g: GossipSub,
    if g.backingOff
      .getOrDefault(topic)
      .getOrDefault(peer.peerId) > Moment.now():
-      debug "attempt to graft a backingOff peer", peer=peer.peerId,
-        topic
+      debug "attempt to graft a backingOff peer", peer, topic
      # and such an attempt should be logged and rejected with a PRUNE
      result.add(ControlPrune(
        topicID: topic,
@@ -126,17 +115,12 @@ proc handleGraft*(g: GossipSub,
 
      let backoff = Moment.fromNow(g.parameters.pruneBackoff)
      g.backingOff
-        .mgetOrPut(topic, initTable[PeerID, Moment]())
-        .mgetOrPut(peer.peerId, backoff) = backoff
+        .mgetOrPut(topic, initTable[PeerID, Moment]())[peer.peerId] = backoff
 
      peer.behaviourPenalty += 0.1
 
      continue
 
-    # Notice this might not be necessary anymore
-    if peer.peerId notin g.peerStats:
-      g.initPeerStats(peer)
-
    # not in the spec exactly, but let's avoid way too low score peers
    # other clients do it too also was an audit recommendation
    if peer.score < g.parameters.publishThreshold:
@@ -153,18 +137,19 @@ proc handleGraft*(g: GossipSub,
          g.grafted(peer, topic)
          g.fanout.removePeer(topic, peer)
        else:
-          trace "peer already in mesh"
+          trace "peer already in mesh", peer, topic
      else:
-        trace "pruning grafting peer, mesh full", peer, score = peer.score, mesh = g.mesh.peers(topic)
+        trace "pruning grafting peer, mesh full",
+          peer, topic, score = peer.score, mesh = g.mesh.peers(topic)
        result.add(ControlPrune(
          topicID: topic,
          peers: g.peerExchangeList(topic),
          backoff: g.parameters.pruneBackoff.seconds.uint64))
    else:
-      trace "peer grafting topic we're not interested in", topic
+      trace "peer grafting topic we're not interested in", peer, topic
      # gossip 1.1, we do not send a control message prune anymore
 
-proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
+proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.raises: [Defect].} =
   for prune in prunes:
     let topic = prune.topicID
 
@@ -185,8 +170,7 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
        current = g.backingOff.getOrDefault(topic).getOrDefault(peer.peerId)
      if backoff > current:
        g.backingOff
-          .mgetOrPut(topic, initTable[PeerID, Moment]())
-          .mgetOrPut(peer.peerId, backoff) = backoff
+          .mgetOrPut(topic, initTable[PeerID, Moment]())[peer.peerId] = backoff
 
    trace "pruning rpc received peer", peer, score = peer.score
    g.pruned(peer, topic)
@@ -198,7 +182,7 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
 
 proc handleIHave*(g: GossipSub,
                   peer: PubSubPeer,
-                  ihaves: seq[ControlIHave]): ControlIWant =
+                  ihaves: seq[ControlIHave]): ControlIWant {.raises: [Defect].} =
   if peer.score < g.parameters.gossipThreshold:
     trace "ihave: ignoring low score peer", peer, score = peer.score
   elif peer.iHaveBudget <= 0:
@@ -229,7 +213,7 @@ proc handleIHave*(g: GossipSub,
 
 proc handleIWant*(g: GossipSub,
                   peer: PubSubPeer,
-                  iwants: seq[ControlIWant]): seq[Message] =
+                  iwants: seq[ControlIWant]): seq[Message] {.raises: [Defect].} =
   if peer.score < g.parameters.gossipThreshold:
     trace "iwant: ignoring low score peer", peer, score = peer.score
   elif peer.iWantBudget <= 0:
@@ -249,7 +233,7 @@ proc handleIWant*(g: GossipSub,
      else:
        return
 
-proc commitMetrics(metrics: var MeshMetrics) =
+proc commitMetrics(metrics: var MeshMetrics) {.raises: [Defect].} =
   libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics)
   libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
   libp2p_gossipsub_under_dout_topics.set(metrics.underDoutTopics)
@@ -258,7 +242,7 @@ proc commitMetrics(metrics: var MeshMetrics) =
   libp2p_gossipsub_peers_per_topic_fanout.set(metrics.otherPeersPerTopicFanout, labelValues = ["other"])
   libp2p_gossipsub_peers_per_topic_mesh.set(metrics.otherPeersPerTopicMesh, labelValues = ["other"])
 
-proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) =
+proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) {.raises: [Defect].} =
   logScope:
     topic
     mesh = g.mesh.peers(topic)
@@ -352,7 +336,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
      libp2p_gossipsub_above_dhigh_condition.inc(labelValues = ["other"])
 
    # prune peers if we've gone over Dhi
-    prunes = toSeq(g.mesh[topic])
+    prunes = toSeq(try: g.mesh[topic] except KeyError: raiseAssert "have peers")
    # avoid pruning peers we are currently grafting in this heartbeat
    prunes.keepIf do (x: PubSubPeer) -> bool: x notin grafts
@@ -400,7 +384,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
 
    # opportunistic grafting, by spec mesh should not be empty...
    if g.mesh.peers(topic) > 1:
-      var peers = toSeq(g.mesh[topic])
+      var peers = toSeq(try: g.mesh[topic] except KeyError: raiseAssert "have peers")
      # grafting so high score has priority
      peers.sort(byScore, SortOrder.Descending)
      let medianIdx = peers.len div 2
@@ -470,37 +454,38 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
            backoff: g.parameters.pruneBackoff.seconds.uint64)])))
    g.broadcast(prunes, prune)
 
-proc dropFanoutPeers*(g: GossipSub) =
+proc dropFanoutPeers*(g: GossipSub) {.raises: [Defect].} =
   # drop peers that we haven't published to in
   # GossipSubFanoutTTL seconds
   let now = Moment.now()
-  for topic in toSeq(g.lastFanoutPubSub.keys):
-    let val = g.lastFanoutPubSub[topic]
+  var drops: seq[string]
+  for topic, val in g.lastFanoutPubSub:
    if now > val:
      g.fanout.del(topic)
-      g.lastFanoutPubSub.del(topic)
+      drops.add topic
      trace "dropping fanout topic", topic
+  for topic in drops:
+    g.lastFanoutPubSub.del topic
 
-proc replenishFanout*(g: GossipSub, topic: string) =
+proc replenishFanout*(g: GossipSub, topic: string) {.raises: [Defect].} =
   ## get fanout peers for a topic
   logScope: topic
   trace "about to replenish fanout"
 
   if g.fanout.peers(topic) < g.parameters.dLow:
     trace "replenishing fanout", peers = g.fanout.peers(topic)
-    if topic in g.gossipsub:
-      for peer in g.gossipsub[topic]:
-        if g.fanout.addPeer(topic, peer):
-          if g.fanout.peers(topic) == g.parameters.d:
-            break
+    for peer in g.gossipsub.getOrDefault(topic):
+      if g.fanout.addPeer(topic, peer):
+        if g.fanout.peers(topic) == g.parameters.d:
+          break
 
   trace "fanout replenished with peers", peers = g.fanout.peers(topic)
 
-proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.} =
+proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises: [Defect].} =
   ## gossip iHave messages to peers
   ##
 
-  libp2p_gossipsub_cache_window_size.set(0)
+  var cacheWindowSize = 0
 
   trace "getting gossip peers (iHave)"
   let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys))
@@ -515,7 +500,7 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.}
 
    var midsSeq = toSeq(mids)
 
-    libp2p_gossipsub_cache_window_size.inc(midsSeq.len.int64)
+    cacheWindowSize += midsSeq.len
 
    # not in spec
    # similar to rust: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/behaviour.rs#L2101
@@ -546,78 +531,76 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.}
      allPeers.setLen(target)
 
    for peer in allPeers:
-      if peer notin result:
-        result[peer] = ControlMessage()
-      result[peer].ihave.add(ihave)
+      result.mGetOrPut(peer, ControlMessage()).ihave.add(ihave)
+
+  libp2p_gossipsub_cache_window_size.set(cacheWindowSize.int64)
+
+proc onHeartbeat(g: GossipSub) {.raises: [Defect].} =
+  # reset IWANT budget
+  # reset IHAVE cap
+  block:
+    for peer in g.peers.values:
+      peer.iWantBudget = IWantPeerBudget
+      peer.iHaveBudget = IHavePeerBudget
+
+  g.updateScores()
+
+  var meshMetrics = MeshMetrics()
+
+  for t in toSeq(g.topics.keys):
+    # remove expired backoffs
+    block:
+      handleBackingOff(g.backingOff, t)
+
+    # prune every negative score peer
+    # do this before relance
+    # in order to avoid grafted -> pruned in the same cycle
+    let meshPeers = g.mesh.getOrDefault(t)
+    var prunes: seq[PubSubPeer]
+    for peer in meshPeers:
+      if peer.score < 0.0:
+        trace "pruning negative score peer", peer, score = peer.score
+        g.pruned(peer, t)
+        g.mesh.removePeer(t, peer)
+        prunes &= peer
+    if prunes.len > 0:
+      let
+        prune = RPCMsg(control: some(ControlMessage(
+          prune: @[ControlPrune(
+            topicID: t,
+            peers: g.peerExchangeList(t),
+            backoff: g.parameters.pruneBackoff.seconds.uint64)])))
+      g.broadcast(prunes, prune)
+
+    # pass by ptr in order to both signal we want to update metrics
+    # and as well update the struct for each topic during this iteration
+    g.rebalanceMesh(t, addr meshMetrics)
+
+  commitMetrics(meshMetrics)
+
+  g.dropFanoutPeers()
+
+  # replenish known topics to the fanout
+  for t in toSeq(g.fanout.keys):
+    g.replenishFanout(t)
+
+  let peers = g.getGossipPeers()
+  for peer, control in peers:
+    # only ihave from here
+    for ihave in control.ihave:
+      if g.knownTopics.contains(ihave.topicID):
+        libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicID])
+      else:
+        libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
+    g.send(peer, RPCMsg(control: some(control)))
+
+  g.mcache.shift() # shift the cache
+
+# {.pop.} # raises [Defect]
 
 proc heartbeat*(g: GossipSub) {.async.} =
   while g.heartbeatRunning:
-    try:
-      trace "running heartbeat", instance = cast[int](g)
-
-      # reset IWANT budget
-      # reset IHAVE cap
-      block:
-        for peer in g.peers.values:
-          peer.iWantBudget = IWantPeerBudget
-          peer.iHaveBudget = IHavePeerBudget
-
-      g.updateScores()
-
-      var meshMetrics = MeshMetrics()
-
-      for t in toSeq(g.topics.keys):
-        # remove expired backoffs
-        block:
-          handleBackingOff(g.backingOff, t)
-
-        # prune every negative score peer
-        # do this before relance
-        # in order to avoid grafted -> pruned in the same cycle
-        let meshPeers = g.mesh.getOrDefault(t)
-        var prunes: seq[PubSubPeer]
-        for peer in meshPeers:
-          if peer.score < 0.0:
-            trace "pruning negative score peer", peer, score = peer.score
-            g.pruned(peer, t)
-            g.mesh.removePeer(t, peer)
-            prunes &= peer
-        if prunes.len > 0:
-          let prune = RPCMsg(control: some(ControlMessage(
-            prune: @[ControlPrune(
-              topicID: t,
-              peers: g.peerExchangeList(t),
-              backoff: g.parameters.pruneBackoff.seconds.uint64)])))
-          g.broadcast(prunes, prune)
-
-        # pass by ptr in order to both signal we want to update metrics
-        # and as well update the struct for each topic during this iteration
-        g.rebalanceMesh(t, addr meshMetrics)
-
-      commitMetrics(meshMetrics)
-
-      g.dropFanoutPeers()
-
-      # replenish known topics to the fanout
-      for t in toSeq(g.fanout.keys):
-        g.replenishFanout(t)
-
-      let peers = g.getGossipPeers()
-      for peer, control in peers:
-        # only ihave from here
-        for ihave in control.ihave:
-          if g.knownTopics.contains(ihave.topicID):
-            libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicID])
-          else:
-            libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
-        g.send(peer, RPCMsg(control: some(control)))
-
-      g.mcache.shift() # shift the cache
-    except CancelledError as exc:
-      raise exc
-    except CatchableError as exc:
-      warn "exception ocurred in gossipsub heartbeat", exc = exc.msg,
-        trace = exc.getStackTrace()
+    trace "running heartbeat", instance = cast[int](g)
+    g.onHeartbeat()
 
    for trigger in g.heartbeatEvents:
      trace "firing heartbeat event", instance = cast[int](g)
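The `try: g.mesh[topic] except KeyError: raiseAssert "have peers"` accesses in rebalanceMesh, and the similar guards added to mcache.nim and peertable.nim below, follow one idiom: when membership has already been checked, a KeyError can only signal a logic bug, so it is converted into a Defect to keep the surrounding proc at `raises: [Defect]`. A standalone sketch of that idiom (hypothetical table and names, not code from this PR):

# Standalone sketch (hypothetical names, not part of this PR): a lookup guarded
# by `in` cannot legitimately miss, so KeyError is turned into a Defect.
import std/tables

proc meshSize(mesh: Table[string, seq[string]], topic: string): int {.raises: [Defect].} =
  if topic in mesh:
    try: mesh[topic].len   # cannot fail, membership was just checked
    except KeyError: raiseAssert "checked with in"
  else:
    0

when isMainModule:
  let mesh = {"chat": @["peerA", "peerB"]}.toTable
  doAssert meshSize(mesh, "chat") == 2
  doAssert meshSize(mesh, "unknown") == 0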
diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim
index cf24bf2..d2db2c2 100644
--- a/libp2p/protocols/pubsub/gossipsub/scoring.nim
+++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim
@@ -7,7 +7,9 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
 
-import std/[tables, strutils, sets, algorithm, options]
+{.push raises: [Defect].}
+
+import std/[tables, sets, options]
 import chronos, chronicles, metrics
 import "."/[types]
 import ".."/[pubsubpeer]
@@ -23,12 +25,17 @@ declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring metric", labels = ["agent"])
 declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"])
 declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"])
 
-proc initPeerStats*(g: GossipSub, peer: PubSubPeer, stats: PeerStats = PeerStats()) =
-  var initialStats = stats
-  initialStats.expire = Moment.now() + g.parameters.retainScore
-  g.peerStats[peer.peerId] = initialStats
-  peer.iWantBudget = IWantPeerBudget
-  peer.iHaveBudget = IHavePeerBudget
+proc withPeerStats*(
+    g: GossipSub, peerId: PeerId,
+    action: proc (stats: var PeerStats) {.gcsafe, raises: [Defect].}) =
+  ## Add or update peer statistics for a particular peer id - the statistics
+  ## are retained across multiple connections until they expire
+  g.peerStats.withValue(peerId, stats) do:
+    action(stats[])
+  do:
+    action(g.peerStats.mgetOrPut(peerId, PeerStats(
+      expire: Moment.now() + g.parameters.retainScore
+    )))
 
 func `/`(a, b: Duration): float64 =
   let
@@ -46,15 +53,16 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
      address = peer.address.get()
    g.peersInIP.mgetOrPut(address, initHashSet[PeerID]()).incl(peer.peerId)
    let
-      ipPeers = g.peersInIP[address]
-      len = ipPeers.len.float64
-    if len > g.parameters.ipColocationFactorThreshold:
-      trace "colocationFactor over threshold", peer, address, len
-      let over = len - g.parameters.ipColocationFactorThreshold
+      ipPeers = g.peersInIP.getOrDefault(address).len().float64
+    if ipPeers > g.parameters.ipColocationFactorThreshold:
+      trace "colocationFactor over threshold", peer, address, ipPeers
+      let over = ipPeers - g.parameters.ipColocationFactorThreshold
      over * over
    else:
      0.0
 
+{.pop.}
+
 proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} =
   when defined(libp2p_agents_metrics):
     let agent =
@@ -77,12 +85,14 @@ proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} =
 
   try:
     await g.switch.disconnect(peer.peerId)
-  except CancelledError:
-    raise
-  except CatchableError as exc:
+  except CatchableError as exc: # Never cancelled
    trace "Failed to close connection", peer, error = exc.name, msg = exc.msg
 
+{.push raises: [Defect].}
+
 proc updateScores*(g: GossipSub) = # avoid async
+  ## https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#the-score-function
+  ##
   trace "updating scores", peers = g.peers.len
 
   let now = Moment.now()
@@ -101,6 +111,7 @@ proc updateScores*(g: GossipSub) = # avoid async
    var
      n_topics = 0
      is_grafted = 0
+      score = 0.0
 
    # Per topic
    for topic, topicParams in g.topicParams:
@@ -144,7 +155,7 @@ proc updateScores*(g: GossipSub) = # avoid async
 
      trace "updated peer topic's scores", peer, topic, info, topicScore
 
-      peer.score += topicScore * topicParams.topicWeight
+      score += topicScore * topicParams.topicWeight
 
      # Score metrics
      when defined(libp2p_agents_metrics):
@@ -193,12 +204,14 @@ proc updateScores*(g: GossipSub) = # avoid async
      # commit our changes, mgetOrPut does NOT work as wanted with value types (lent?)
      stats.topicInfos[topic] = info
 
-    peer.score += peer.appScore * g.parameters.appSpecificWeight
+    score += peer.appScore * g.parameters.appSpecificWeight
 
-    peer.score += peer.behaviourPenalty * peer.behaviourPenalty * g.parameters.behaviourPenaltyWeight
+
+    # The value of the parameter is the square of the counter and is mixed with a negative weight.
+    score += peer.behaviourPenalty * peer.behaviourPenalty * g.parameters.behaviourPenaltyWeight
 
    let colocationFactor = g.colocationFactor(peer)
-    peer.score += colocationFactor * g.parameters.ipColocationFactorWeight
+    score += colocationFactor * g.parameters.ipColocationFactorWeight
 
    # Score metrics
    when defined(libp2p_agents_metrics):
@@ -229,17 +242,19 @@ proc updateScores*(g: GossipSub) = # avoid async
      if peer.behaviourPenalty < g.parameters.decayToZero:
        peer.behaviourPenalty = 0
 
+    peer.score = score
+
    # copy into stats the score to keep until expired
    stats.score = peer.score
    stats.appScore = peer.appScore
    stats.behaviourPenalty = peer.behaviourPenalty
-    stats.expire = Moment.now() + g.parameters.retainScore # refresh expiration
-    assert(g.peerStats[peer.peerId].score == peer.score) # nim sanity check
+    stats.expire = now + g.parameters.retainScore # refresh expiration
+
    trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted
 
    if g.parameters.disconnectBadPeers and stats.score < g.parameters.graylistThreshold:
      debug "disconnecting bad score peer", peer, score = peer.score
-      asyncSpawn g.disconnectPeer(peer)
+      asyncSpawn(try: g.disconnectPeer(peer) except Exception as exc: raiseAssert exc.msg)
 
    when defined(libp2p_agents_metrics):
      libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent])
@@ -257,13 +272,5 @@ proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, topics: seq[string]) =
      continue
 
    # update stats
-    g.peerStats.withValue(peer.peerId, stats):
-      stats[].topicInfos.withValue(t, tstats):
-        tstats[].invalidMessageDeliveries += 1
-      do: # if we have no stats populate!
-        stats[].topicInfos[t] = TopicInfo(invalidMessageDeliveries: 1)
-    do: # if we have no stats populate!
-      g.initPeerStats(peer) do:
-        var stats = PeerStats()
-        stats.topicInfos[t] = TopicInfo(invalidMessageDeliveries: 1)
-        stats
+    g.withPeerStats(peer.peerId) do (stats: var PeerStats):
+      stats.topicInfos.mgetOrPut(t, TopicInfo()).invalidMessageDeliveries += 1
 
diff --git a/libp2p/protocols/pubsub/mcache.nim b/libp2p/protocols/pubsub/mcache.nim
index ff25c94..821fcea 100644
--- a/libp2p/protocols/pubsub/mcache.nim
+++ b/libp2p/protocols/pubsub/mcache.nim
@@ -7,6 +7,8 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
 
+{.push raises: [Defect].}
+
 import std/[sets, tables, options]
 
 import rpc/[messages]
@@ -24,12 +26,13 @@ type
 
 func get*(c: MCache, mid: MessageID): Option[Message] =
   if mid in c.msgs:
-    some(c.msgs[mid])
+    try: some(c.msgs[mid])
+    except KeyError: raiseAssert "checked"
   else:
     none(Message)
 
 func contains*(c: MCache, mid: MessageID): bool =
-  c.get(mid).isSome
+  mid in c.msgs
 
 func put*(c: var MCache, msgId: MessageID, msg: Message) =
   if not c.msgs.hasKeyOrPut(msgId, msg):
@@ -37,8 +40,6 @@ func put*(c: var MCache, msgId: MessageID, msg: Message) =
     c.history[0].add(CacheEntry(mid: msgId, topicIDs: msg.topicIDs))
 
 func window*(c: MCache, topic: string): HashSet[MessageID] =
-  result = initHashSet[MessageID]()
-
   let
     len = min(c.windowSize, c.history.len)
 
diff --git a/libp2p/protocols/pubsub/peertable.nim b/libp2p/protocols/pubsub/peertable.nim
index 944fa2a..09e0830 100644
--- a/libp2p/protocols/pubsub/peertable.nim
+++ b/libp2p/protocols/pubsub/peertable.nim
@@ -7,17 +7,21 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
 
-import tables, sequtils, sets
-import pubsubpeer, ../../peerid
+{.push raises: [Defect].}
+
+import std/[tables, sets]
+import ./pubsubpeer, ../../peerid
 
 type
   PeerTable* = Table[string, HashSet[PubSubPeer]] # topic string to peer map
 
 proc hasPeerID*(t: PeerTable, topic: string, peerId: PeerID): bool =
   if topic in t:
-    for peer in t[topic]:
-      if peer.peerId == peerId:
-        return true
+    try:
+      for peer in t[topic]:
+        if peer.peerId == peerId:
+          return true
+    except KeyError: raiseAssert "checked with in"
   false
 
 func addPeer*(table: var PeerTable, topic: string, peer: PubSubPeer): bool =
@@ -34,10 +38,13 @@ func removePeer*(table: var PeerTable, topic: string, peer: PubSubPeer) =
      table.del(topic)
 
 func hasPeer*(table: PeerTable, topic: string, peer: PubSubPeer): bool =
-  (topic in table) and (peer in table[topic])
+  try:
+    (topic in table) and (peer in table[topic])
+  except KeyError: raiseAssert "checked with in"
 
 func peers*(table: PeerTable, topic: string): int =
   if topic in table:
-    table[topic].len
+    try: table[topic].len
+    except KeyError: raiseAssert "checked with in"
   else:
     0
 
diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim
index f8ee577..a608bfc 100644
--- a/libp2p/protocols/pubsub/pubsub.nim
+++ b/libp2p/protocols/pubsub/pubsub.nim
@@ -119,7 +119,7 @@ method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} =
 
   libp2p_pubsub_peers.set(p.peers.len.int64)
 
-proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) =
+proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [Defect].} =
   ## Attempt to send `msg` to remote peer
   ##
 
@@ -129,7 +129,7 @@ proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) =
 proc broadcast*(
   p: PubSub,
   sendPeers: openArray[PubSubPeer],
-  msg: RPCMsg) = # raises: [Defect]
+  msg: RPCMsg) {.raises: [Defect].} =
   ## Attempt to send `msg` to the given peers
 
   let npeers = sendPeers.len.int64
@@ -278,9 +278,7 @@ proc getOrCreatePeer*(
   proc dropConnAsync(peer: PubsubPeer) {.async.} =
     try:
       await p.switch.disconnect(peer.peerId)
-    except CancelledError:
-      raise
-    except CatchableError as exc:
+    except CatchableError as exc: # never cancelled
      trace "Failed to close connection", peer, error = exc.name, msg = exc.msg
 
   asyncSpawn dropConnAsync(peer)
@@ -530,8 +528,6 @@ proc init*[PubParams: object | bool](
      anonymize: anonymize,
      verifySignature: verifySignature,
      sign: sign,
-      peers: initTable[PeerID, PubSubPeer](),
-      topics: initTable[string, Topic](),
      msgIdProvider: msgIdProvider,
      subscriptionValidator: subscriptionValidator,
      topicsHigh: int.high)
@@ -542,8 +538,6 @@ proc init*[PubParams: object | bool](
      anonymize: anonymize,
      verifySignature: verifySignature,
      sign: sign,
-      peers: initTable[PeerID, PubSubPeer](),
-      topics: initTable[string, Topic](),
      msgIdProvider: msgIdProvider,
      subscriptionValidator: subscriptionValidator,
      parameters: parameters,
 
diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim
index e52bc82..8f64a75 100644
--- a/libp2p/protocols/pubsub/pubsubpeer.nim
+++ b/libp2p/protocols/pubsub/pubsubpeer.nim
@@ -195,9 +195,7 @@ proc connectImpl(p: PubSubPeer) {.async.} =
    # issue so we try to get a new on
    while true:
      await connectOnce(p)
-  except CancelledError:
-    raise
-  except CatchableError as exc:
+  except CatchableError as exc: # never cancelled
    debug "Could not establish send connection", msg = exc.msg
  finally:
    # drop the connection, else we end up with ghost peers
@@ -212,7 +210,7 @@ proc sendImpl(conn: Connection, encoded: seq[byte]) {.async.} =
    await conn.writeLp(encoded)
    trace "sent pubsub message to remote", conn
 
-  except CatchableError as exc:
+  except CatchableError as exc: # never cancelled
    # Because we detach the send call from the currently executing task using
    # asyncSpawn, no exceptions may leak out of it
    trace "Unable to send to remote", conn, msg = exc.msg
@@ -228,7 +226,7 @@ template sendMetrics(msg: RPCMsg): untyped =
        # metrics
        libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])
 
-proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) =
+proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} =
   doAssert(not isNil(p), "pubsubpeer nil!")
 
   let conn = p.sendConn
@@ -261,7 +259,10 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) =
 
   # To limit the size of the closure, we only pass the encoded message and
   # connection to the spawned send task
-  asyncSpawn sendImpl(conn, encoded)
+  asyncSpawn(try:
+    sendImpl(conn, encoded)
+  except Exception as exc: # TODO chronos Exception
+    raiseAssert exc.msg)
 
 proc newPubSubPeer*(peerId: PeerID,
                     getConn: GetConn,
 
diff --git a/libp2p/protocols/pubsub/timedcache.nim b/libp2p/protocols/pubsub/timedcache.nim
index b7647e9..e4cfdf5 100644
--- a/libp2p/protocols/pubsub/timedcache.nim
+++ b/libp2p/protocols/pubsub/timedcache.nim
@@ -7,7 +7,7 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
 
-import std/[hashes, tables]
+import std/[tables]
 
 import chronos/timer
 
diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim
index 60a551d..84deb0d 100644
--- a/libp2p/stream/bufferstream.nim
+++ b/libp2p/stream/bufferstream.nim
@@ -100,7 +100,7 @@ method pushEof*(s: BufferStream) {.base, async.} =
   finally:
     s.pushing = false
 
-method atEof*(s: BufferStream): bool =
+method atEof*(s: BufferStream): bool {.raises: [Defect].} =
   s.isEof and s.readBuf.len == 0
 
 method readOnce*(s: BufferStream,
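Both `updateScores` in scoring.nim and `send` in pubsubpeer.nim above wrap the future-creating call in `try ... except Exception: raiseAssert` before handing it to chronos `asyncSpawn`: constructing the future is the only step that can raise in the caller, so converting that into a Defect keeps the caller's `raises: [Defect]` contract while `asyncSpawn` owns failures inside the future itself. A standalone sketch (hypothetical `ping`/`firePing` procs, assumes chronos is available; not code from this PR):

# Standalone sketch (hypothetical names, not part of this PR): spawning an
# async task from a raises: [Defect] proc, mirroring the pattern above.
import chronos

proc ping(msg: string) {.async.} =
  echo "sending ", msg

proc firePing(msg: string) {.raises: [Defect].} =
  # creating the future is wrapped; anything it raises becomes a Defect,
  # while errors inside the running future are handled by asyncSpawn
  asyncSpawn(try:
    ping(msg)
  except Exception as exc: # the call itself is annotated as raising Exception
    raiseAssert exc.msg)

when isMainModule:
  firePing("hello")
  waitFor sleepAsync(10.milliseconds) # let the spawned task run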
diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim
index c2ac074..259587f 100644
--- a/libp2p/stream/chronosstream.nim
+++ b/libp2p/stream/chronosstream.nim
@@ -7,7 +7,7 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
 
-import std/[oids, strformat, strutils]
+import std/[oids, strformat]
 import chronos, chronicles, metrics
 import connection
 import ../utility
@@ -126,10 +126,10 @@ method write*(s: ChronosStream, msg: seq[byte]) {.async.} =
      if s.tracked:
        libp2p_peers_traffic_write.inc(msg.len.int64, labelValues = [s.shortAgent])
 
-method closed*(s: ChronosStream): bool {.inline.} =
+method closed*(s: ChronosStream): bool {.raises: [Defect].} =
   result = s.client.closed
 
-method atEof*(s: ChronosStream): bool {.inline.} =
+method atEof*(s: ChronosStream): bool {.raises: [Defect].} =
   s.client.atEof()
 
 method closeImpl*(s: ChronosStream) {.async.} =
 
diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim
index 1ced16b..5e480b6 100644
--- a/libp2p/stream/lpstream.nim
+++ b/libp2p/stream/lpstream.nim
@@ -130,10 +130,10 @@ method initStream*(s: LPStream) {.base.} =
 
 proc join*(s: LPStream): Future[void] =
   s.closeEvent.wait()
 
-method closed*(s: LPStream): bool {.base.} =
+method closed*(s: LPStream): bool {.base, raises: [Defect].} =
   s.isClosed
 
-method atEof*(s: LPStream): bool {.base.} =
+method atEof*(s: LPStream): bool {.base, raises: [Defect].} =
   s.isEof
 
 method readOnce*(s: LPStream,
 
diff --git a/libp2p/switch.nim b/libp2p/switch.nim
index 97ededb..19ce0d2 100644
--- a/libp2p/switch.nim
+++ b/libp2p/switch.nim
@@ -8,7 +8,6 @@
 ## those terms.
 
 import std/[tables,
-            sequtils,
            options,
            sets,
            oids,
 
diff --git a/libp2p/utility.nim b/libp2p/utility.nim
index 9f3081a..f866e9e 100644
--- a/libp2p/utility.nim
+++ b/libp2p/utility.nim
@@ -7,8 +7,9 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
 
+{.push raises: [Defect].}
+
 import stew/byteutils
-import strutils
 
 const ShortDumpMax = 12
 
diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim
index 81779d1..921aea8 100644
--- a/tests/pubsub/testgossipinternal.nim
+++ b/tests/pubsub/testgossipinternal.nim
@@ -18,14 +18,20 @@ type
 
 proc noop(data: seq[byte]) {.async, gcsafe.} = discard
 
-proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): auto =
+proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): PubSubPeer =
   proc getConn(): Future[Connection] =
     p.switch.dial(peerId, GossipSubCodec)
 
   proc dropConn(peer: PubSubPeer) =
     discard # we don't care about it here yet
 
-  newPubSubPeer(peerId, getConn, dropConn, nil, GossipSubCodec)
+  let pubSubPeer = newPubSubPeer(peerId, getConn, dropConn, nil, GossipSubCodec)
+  debug "created new pubsub peer", peerId
+
+  p.peers[peerId] = pubSubPeer
+
+  onNewPeer(p, pubSubPeer)
+  pubSubPeer
 
 proc randomPeerInfo(): PeerInfo =
   PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
@@ -53,8 +59,6 @@ suite "GossipSub internal":
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      peer.sendConn = conn
-      gossipSub.onNewPeer(peer)
-      gossipSub.peers[peerInfo.peerId] = peer
      gossipSub.gossipsub[topic].incl(peer)
 
    # test via dynamic dispatch
@@ -96,8 +100,6 @@ suite "GossipSub internal":
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      peer.sendConn = conn
-      gossipSub.onNewPeer(peer)
-      gossipSub.peers[peerInfo.peerId] = peer
      gossipSub.gossipsub[topic].incl(peer)
 
    check gossipSub.peers.len == 15
@@ -125,8 +127,6 @@ suite "GossipSub internal":
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      peer.sendConn = conn
      peer.score = scoreLow
-      gossipSub.onNewPeer(peer)
-      gossipSub.peers[peerInfo.peerId] = peer
      gossipSub.gossipsub[topic].incl(peer)
      scoreLow += 1.0
 
@@ -155,9 +155,7 @@ suite "GossipSub internal":
      let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
-      gossipSub.onNewPeer(peer)
      gossipSub.grafted(peer, topic)
-      gossipSub.peers[peerInfo.peerId] = peer
      gossipSub.mesh[topic].incl(peer)
 
    check gossipSub.mesh[topic].len == 15
@@ -184,7 +182,6 @@ suite "GossipSub internal":
      var peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
-      gossipSub.onNewPeer(peer)
      peer.handler = handler
      gossipSub.gossipsub[topic].incl(peer)
 
@@ -214,7 +211,6 @@ suite "GossipSub internal":
      let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
-      gossipSub.onNewPeer(peer)
      peer.handler = handler
      gossipSub.fanout[topic].incl(peer)
 
@@ -249,7 +245,6 @@ suite "GossipSub internal":
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
-      gossipSub.onNewPeer(peer)
      peer.handler = handler
      gossipSub.fanout[topic1].incl(peer)
      gossipSub.fanout[topic2].incl(peer)
@@ -284,7 +279,6 @@ suite "GossipSub internal":
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
-      gossipSub.onNewPeer(peer)
      peer.handler = handler
      if i mod 2 == 0:
        gossipSub.fanout[topic].incl(peer)
@@ -299,7 +293,6 @@ suite "GossipSub internal":
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
-      gossipSub.onNewPeer(peer)
      peer.handler = handler
      gossipSub.gossipsub[topic].incl(peer)
 
@@ -344,7 +337,6 @@ suite "GossipSub internal":
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
-      gossipSub.onNewPeer(peer)
      peer.handler = handler
      if i mod 2 == 0:
        gossipSub.fanout[topic].incl(peer)
@@ -385,7 +377,6 @@ suite "GossipSub internal":
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
-      gossipSub.onNewPeer(peer)
      peer.handler = handler
      if i mod 2 == 0:
        gossipSub.mesh[topic].incl(peer)
@@ -427,7 +418,6 @@ suite "GossipSub internal":
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
-      gossipSub.onNewPeer(peer)
      peer.handler = handler
      if i mod 2 == 0:
        gossipSub.mesh[topic].incl(peer)
@@ -466,7 +456,6 @@ suite "GossipSub internal":
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
-      gossipSub.onNewPeer(peer)
      peer.handler = handler
 
    # generate messages
@@ -489,7 +478,7 @@ suite "GossipSub internal":
   asyncTest "Disconnect bad peers":
     let gossipSub = TestGossipSub.init(newStandardSwitch())
     gossipSub.parameters.disconnectBadPeers = true
-
+    gossipSub.parameters.appSpecificWeight = 1.0
 
    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
      check false
@@ -501,12 +490,10 @@ suite "GossipSub internal":
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
-      gossipSub.onNewPeer(peer)
      peer.sendConn = conn
      peer.handler = handler
-      peer.score = gossipSub.parameters.graylistThreshold - 1
+      peer.appScore = gossipSub.parameters.graylistThreshold - 1
      gossipSub.gossipsub.mgetOrPut(topic, initHashSet[PubSubPeer]()).incl(peer)
-      gossipSub.peers[peerInfo.peerId] = peer
      gossipSub.switch.connManager.storeConn(conn)
 
    gossipSub.updateScores()
@@ -535,7 +522,6 @@ suite "GossipSub internal":
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
-      gossipSub.peers[peerInfo.peerId] = peer
 
    await gossipSub.rpcHandler(peer, lotOfSubs)
 
@@ -561,8 +547,6 @@ suite "GossipSub internal":
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      peer.sendConn = conn
-      gossipSub.onNewPeer(peer)
-      gossipSub.peers[peerInfo.peerId] = peer
      gossipSub.gossipsub[topic].incl(peer)
      gossipSub.backingOff
        .mgetOrPut(topic, initTable[PeerID, Moment]())
@@ -601,9 +585,7 @@ suite "GossipSub internal":
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      peer.score = 40.0
      peer.sendConn = conn
-      gossipSub.onNewPeer(peer)
      gossipSub.grafted(peer, topic)
-      gossipSub.peers[peerInfo.peerId] = peer
      gossipSub.mesh[topic].incl(peer)
 
    for i in 0..<7:
@@ -615,9 +597,7 @@ suite "GossipSub internal":
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      peer.score = 10.0
      peer.sendConn = conn
-      gossipSub.onNewPeer(peer)
      gossipSub.grafted(peer, topic)
-      gossipSub.peers[peerInfo.peerId] = peer
      gossipSub.mesh[topic].incl(peer)
 
    check gossipSub.mesh[topic].len == 13
@@ -650,10 +630,8 @@ suite "GossipSub internal":
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
-      gossipSub.onNewPeer(peer)
      peer.handler = handler
      gossipSub.grafted(peer, topic)
-      gossipSub.peers[peerInfo.peerId] = peer
      gossipSub.mesh[topic].incl(peer)
 
    block:
@@ -668,7 +646,7 @@
        topicID: topic,
        messageIDs: @[id, id, id]
      )
-      # gossipSub.initPeerStats(peer)
+      peer.iHaveBudget = 0
      let iwants = gossipSub.handleIHave(peer, @[msg])
      check: iwants.messageIDs.len == 0
@@ -684,7 +662,6 @@
        topicID: topic,
        messageIDs: @[id, id, id]
      )
-      gossipSub.initPeerStats(peer)
      let iwants = gossipSub.handleIHave(peer, @[msg])
      check: iwants.messageIDs.len == 1
@@ -700,7 +677,6 @@
      let msg = ControlIWant(
        messageIDs: @[id, id, id]
      )
-      gossipSub.initPeerStats(peer)
      let genmsg = gossipSub.handleIWant(peer, @[msg])
      check: genmsg.len == 1
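Several of the modules touched above (scoring.nim, mcache.nim, peertable.nim, utility.nim) adopt the `{.push raises: [Defect].}` / `{.pop.}` pattern instead of annotating every proc individually. A standalone sketch of how the pragma scopes the exception contract (hypothetical procs, not code from this PR):

# Standalone sketch (hypothetical names, not part of this PR): push applies the
# raises contract to every routine until pop, so only Defects may escape there.
{.push raises: [Defect].}

proc half(x: int): int =
  doAssert x mod 2 == 0, "expected an even number" # AssertionDefect is allowed
  x div 2

{.pop.}

proc parse(s: string): int {.raises: [ValueError].} =
  # outside the push/pop block, other exception contracts are fine again
  if s.len == 0:
    raise newException(ValueError, "empty input")
  s.len

when isMainModule:
  doAssert half(4) == 2
  doAssert parse("abc") == 3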