fix peer score accumulation (#541)

* fix accumulating peer score
* fix missing exception handling
* remove unnecessary initHashSet/initTable calls
* simplify peer stats management
* clean up tests a little
* fix some missing raises annotations
This commit is contained in:
Jacek Sieka 2021-03-09 13:22:52 +01:00 committed by GitHub
parent 269d3df351
commit 70deac9e0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 230 additions and 290 deletions

View File

@ -62,7 +62,6 @@ const
] ]
proc initCidCodeTable(): Table[int, MultiCodec] {.compileTime.} = proc initCidCodeTable(): Table[int, MultiCodec] {.compileTime.} =
result = initTable[int, MultiCodec]()
for item in ContentIdsList: for item in ContentIdsList:
result[int(item)] = item result[int(item)] = item

View File

@ -93,8 +93,6 @@ proc init*(C: type ConnManager,
raiseAssert "Invalid connection counts!" raiseAssert "Invalid connection counts!"
C(maxConnsPerPeer: maxConnsPerPeer, C(maxConnsPerPeer: maxConnsPerPeer,
conns: initTable[PeerID, HashSet[Connection]](),
muxed: initTable[Connection, MuxerHolder](),
inSema: inSema, inSema: inSema,
outSema: outSema) outSema: outSema)

View File

@ -645,7 +645,6 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
api.servers = newSeq[P2PServer]() api.servers = newSeq[P2PServer]()
api.pattern = patternForChild api.pattern = patternForChild
api.ucounter = 1 api.ucounter = 1
api.handlers = initTable[string, P2PStreamCallback]()
if len(sockpath) == 0: if len(sockpath) == 0:
api.flags.excl(NoProcessCtrl) api.flags.excl(NoProcessCtrl)

View File

@ -437,7 +437,6 @@ const
proc initMultiAddressCodeTable(): Table[MultiCodec, proc initMultiAddressCodeTable(): Table[MultiCodec,
MAProtocol] {.compileTime.} = MAProtocol] {.compileTime.} =
result = initTable[MultiCodec, MAProtocol]()
for item in ProtocolsList: for item in ProtocolsList:
result[item.mcodec] = item result[item.mcodec] = item

View File

@ -328,12 +328,10 @@ const
] ]
proc initMultiBaseCodeTable(): Table[char, MBCodec] {.compileTime.} = proc initMultiBaseCodeTable(): Table[char, MBCodec] {.compileTime.} =
result = initTable[char, MBCodec]()
for item in MultibaseCodecs: for item in MultibaseCodecs:
result[item.code] = item result[item.code] = item
proc initMultiBaseNameTable(): Table[string, MBCodec] {.compileTime.} = proc initMultiBaseNameTable(): Table[string, MBCodec] {.compileTime.} =
result = initTable[string, MBCodec]()
for item in MultibaseCodecs: for item in MultibaseCodecs:
result[item.name] = item result[item.name] = item

View File

@ -242,12 +242,10 @@ const
InvalidMultiCodec* = MultiCodec(-1) InvalidMultiCodec* = MultiCodec(-1)
proc initMultiCodecNameTable(): Table[string, int] {.compileTime.} = proc initMultiCodecNameTable(): Table[string, int] {.compileTime.} =
result = initTable[string, int]()
for item in MultiCodecList: for item in MultiCodecList:
result[item[0]] = item[1] result[item[0]] = item[1]
proc initMultiCodecCodeTable(): Table[int, string] {.compileTime.} = proc initMultiCodecCodeTable(): Table[int, string] {.compileTime.} =
result = initTable[int, string]()
for item in MultiCodecList: for item in MultiCodecList:
result[item[1]] = item[0] result[item[1]] = item[0]
@ -271,7 +269,7 @@ proc `$`*(mc: MultiCodec): string =
## Returns string representation of MultiCodec ``mc``. ## Returns string representation of MultiCodec ``mc``.
let name = CodeCodecs.getOrDefault(int(mc), "") let name = CodeCodecs.getOrDefault(int(mc), "")
doAssert(name != "") doAssert(name != "")
name name
proc `==`*(mc: MultiCodec, name: string): bool {.inline.} = proc `==`*(mc: MultiCodec, name: string): bool {.inline.} =
## Compares MultiCodec ``mc`` with string ``name``. ## Compares MultiCodec ``mc`` with string ``name``.

View File

@ -319,7 +319,6 @@ const
] ]
proc initMultiHashCodeTable(): Table[MultiCodec, MHash] {.compileTime.} = proc initMultiHashCodeTable(): Table[MultiCodec, MHash] {.compileTime.} =
result = initTable[MultiCodec, MHash]()
for item in HashesList: for item in HashesList:
result[item.mcodec] = item result[item.mcodec] = item

View File

@ -69,7 +69,7 @@ proc open*(s: LPChannel) {.async, gcsafe.} =
await s.conn.close() await s.conn.close()
raise exc raise exc
method closed*(s: LPChannel): bool = method closed*(s: LPChannel): bool {.raises: [Defect].} =
s.closedLocal s.closedLocal
proc closeUnderlying(s: LPChannel): Future[void] {.async.} = proc closeUnderlying(s: LPChannel): Future[void] {.async.} =

View File

@ -205,6 +205,5 @@ method unsubscribeAll*(f: FloodSub, topic: string) =
method initPubSub*(f: FloodSub) = method initPubSub*(f: FloodSub) =
procCall PubSub(f).initPubSub() procCall PubSub(f).initPubSub()
f.floodsub = initTable[string, HashSet[PubSubPeer]]()
f.seen = TimedCache[MessageID].init(2.minutes) f.seen = TimedCache[MessageID].init(2.minutes)
f.init() f.init()

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import std/[tables, sets, options, sequtils, strutils, random, algorithm] import std/[tables, sets, options, sequtils, random]
import chronos, chronicles, metrics, bearssl import chronos, chronicles, metrics, bearssl
import ./pubsub, import ./pubsub,
./floodsub, ./floodsub,
@ -159,16 +159,16 @@ method init*(g: GossipSub) =
g.codecs &= GossipSubCodec_10 g.codecs &= GossipSubCodec_10
method onNewPeer(g: GossipSub, peer: PubSubPeer) = method onNewPeer(g: GossipSub, peer: PubSubPeer) =
if peer.peerId notin g.peerStats: g.withPeerStats(peer.peerId) do (stats: var PeerStats):
g.initPeerStats(peer) # Make sure stats and peer information match, even when reloading peer stats
else: # from a previous connection
# we knew this peer
# restore previously stored score
let stats = g.peerStats[peer.peerId]
peer.score = stats.score peer.score = stats.score
peer.appScore = stats.appScore peer.appScore = stats.appScore
peer.behaviourPenalty = stats.behaviourPenalty peer.behaviourPenalty = stats.behaviourPenalty
peer.iWantBudget = IWantPeerBudget
peer.iHaveBudget = IHavePeerBudget
method onPubSubPeerEvent*(p: GossipSub, peer: PubsubPeer, event: PubSubPeerEvent) {.gcsafe.} = method onPubSubPeerEvent*(p: GossipSub, peer: PubsubPeer, event: PubSubPeerEvent) {.gcsafe.} =
case event.kind case event.kind
of PubSubPeerEventKind.Connected: of PubSubPeerEventKind.Connected:
@ -216,7 +216,6 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
g.fanout.removePeer(t, pubSubPeer) g.fanout.removePeer(t, pubSubPeer)
g.peerStats.withValue(peer, stats): g.peerStats.withValue(peer, stats):
stats[].expire = Moment.now() + g.parameters.retainScore
for topic, info in stats[].topicInfos.mpairs: for topic, info in stats[].topicInfos.mpairs:
info.firstMessageDeliveries = 0 info.firstMessageDeliveries = 0
@ -294,21 +293,16 @@ method rpcHandler*(g: GossipSub,
# for every topic in the message # for every topic in the message
let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
# if in mesh add more delivery score # if in mesh add more delivery score
g.peerStats.withValue(peer.peerId, pstats): g.withPeerStats(peer.peerId) do (stats: var PeerStats):
pstats[].topicInfos.withValue(t, stats): stats.topicInfos.withValue(t, tstats):
if stats[].inMesh: if tstats[].inMesh:
# TODO: take into account meshMessageDeliveriesWindow # TODO: take into account meshMessageDeliveriesWindow
# score only if messages are not too old. # score only if messages are not too old.
stats[].meshMessageDeliveries += 1 tstats[].meshMessageDeliveries += 1
if stats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap: if tstats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
stats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap tstats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
do: # make sure we don't loose this information do: # make sure we don't loose this information
pstats[].topicInfos[t] = TopicInfo(meshMessageDeliveries: 1)
do: # make sure we don't loose this information
g.initPeerStats(peer) do:
var stats = PeerStats()
stats.topicInfos[t] = TopicInfo(meshMessageDeliveries: 1) stats.topicInfos[t] = TopicInfo(meshMessageDeliveries: 1)
stats
# onto the next message # onto the next message
continue continue
@ -359,25 +353,20 @@ method rpcHandler*(g: GossipSub,
let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
g.peerStats.withValue(peer.peerId, pstats): g.withPeerStats(peer.peerId) do(stats: var PeerStats):
pstats[].topicInfos.withValue(t, stats): stats.topicInfos.withValue(t, tstats):
# contribute to peer score first delivery # contribute to peer score first delivery
stats[].firstMessageDeliveries += 1 tstats[].firstMessageDeliveries += 1
if stats[].firstMessageDeliveries > topicParams.firstMessageDeliveriesCap: if tstats[].firstMessageDeliveries > topicParams.firstMessageDeliveriesCap:
stats[].firstMessageDeliveries = topicParams.firstMessageDeliveriesCap tstats[].firstMessageDeliveries = topicParams.firstMessageDeliveriesCap
# if in mesh add more delivery score # if in mesh add more delivery score
if stats[].inMesh: if tstats[].inMesh:
stats[].meshMessageDeliveries += 1 tstats[].meshMessageDeliveries += 1
if stats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap: if tstats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
stats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap tstats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
do: # make sure we don't loose this information do: # make sure we don't loose this information
pstats[].topicInfos[t] = TopicInfo(firstMessageDeliveries: 1, meshMessageDeliveries: 1)
do: # make sure we don't loose this information
g.initPeerStats(peer) do:
var stats = PeerStats()
stats.topicInfos[t] = TopicInfo(firstMessageDeliveries: 1, meshMessageDeliveries: 1) stats.topicInfos[t] = TopicInfo(firstMessageDeliveries: 1, meshMessageDeliveries: 1)
stats
g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
@ -625,17 +614,10 @@ method initPubSub*(g: GossipSub) =
randomize() randomize()
# init the floodsub stuff here, we customize timedcache in gossip! # init the floodsub stuff here, we customize timedcache in gossip!
g.floodsub = initTable[string, HashSet[PubSubPeer]]()
g.seen = TimedCache[MessageID].init(g.parameters.seenTTL) g.seen = TimedCache[MessageID].init(g.parameters.seenTTL)
# init gossip stuff # init gossip stuff
g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength) g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength)
g.mesh = initTable[string, HashSet[PubSubPeer]]() # meshes - topic to peer
g.fanout = initTable[string, HashSet[PubSubPeer]]() # fanout - topic to peer
g.gossipsub = initTable[string, HashSet[PubSubPeer]]()# topic to peer map of all gossipsub peers
g.lastFanoutPubSub = initTable[string, Moment]() # last publish time for fanout topics
g.gossip = initTable[string, seq[ControlIHave]]() # pending gossip
g.control = initTable[string, ControlMessage]() # pending control messages
var rng = newRng() var rng = newRng()
g.randomBytes = newSeqUninitialized[byte](32) g.randomBytes = newSeqUninitialized[byte](32)
brHmacDrbgGenerate(rng[], g.randomBytes) brHmacDrbgGenerate(rng[], g.randomBytes)

View File

@ -7,7 +7,9 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import std/[tables, strutils, sequtils, sets, algorithm] # {.push raises: [Defect].} TODO compile error on windows due to chronicles?
import std/[tables, sequtils, sets, algorithm]
import random # for shuffle import random # for shuffle
import chronos, chronicles, metrics import chronos, chronicles, metrics
import "."/[types, scoring] import "."/[types, scoring]
@ -25,56 +27,48 @@ declareGauge(libp2p_gossipsub_low_peers_topics, "number of topics in mesh with a
declareGauge(libp2p_gossipsub_healthy_peers_topics, "number of topics in mesh with at least dlow peers (but below dhigh)") declareGauge(libp2p_gossipsub_healthy_peers_topics, "number of topics in mesh with at least dlow peers (but below dhigh)")
declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"]) declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"])
proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) = proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [Defect].} =
g.peerStats.withValue(p.peerId, stats): g.withPeerStats(p.peerId) do (stats: var PeerStats):
var info = stats.topicInfos.getOrDefault(topic) var info = stats.topicInfos.getOrDefault(topic)
info.graftTime = Moment.now() info.graftTime = Moment.now()
info.meshTime = 0.seconds info.meshTime = 0.seconds
info.inMesh = true info.inMesh = true
info.meshMessageDeliveriesActive = false info.meshMessageDeliveriesActive = false
# mgetOrPut does not work, so we gotta do this without referencing
stats.topicInfos[topic] = info stats.topicInfos[topic] = info
assert(g.peerStats[p.peerId].topicInfos[topic].inMesh == true)
trace "grafted", peer=p, topic trace "grafted", peer=p, topic
do:
g.initPeerStats(p)
g.grafted(p, topic)
proc pruned*(g: GossipSub, p: PubSubPeer, topic: string) = proc pruned*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [Defect].} =
let backoff = Moment.fromNow(g.parameters.pruneBackoff) let backoff = Moment.fromNow(g.parameters.pruneBackoff)
g.backingOff g.backingOff
.mgetOrPut(topic, initTable[PeerID, Moment]()) .mgetOrPut(topic, initTable[PeerID, Moment]())[p.peerId] = backoff
.mgetOrPut(p.peerId, backoff) = backoff
g.peerStats.withValue(p.peerId, stats): g.peerStats.withValue(p.peerId, stats):
if topic in stats.topicInfos: stats.topicInfos.withValue(topic, info):
var info = stats.topicInfos[topic] g.topicParams.withValue(topic, topicParams):
if topic in g.topicParams:
let topicParams = g.topicParams[topic]
# penalize a peer that delivered no message # penalize a peer that delivered no message
let threshold = topicParams.meshMessageDeliveriesThreshold let threshold = topicParams[].meshMessageDeliveriesThreshold
if info.inMesh and info.meshMessageDeliveriesActive and info.meshMessageDeliveries < threshold: if info[].inMesh and
info[].meshMessageDeliveriesActive and
info[].meshMessageDeliveries < threshold:
let deficit = threshold - info.meshMessageDeliveries let deficit = threshold - info.meshMessageDeliveries
info.meshFailurePenalty += deficit * deficit info[].meshFailurePenalty += deficit * deficit
info.inMesh = false info.inMesh = false
# mgetOrPut does not work, so we gotta do this without referencing
stats.topicInfos[topic] = info
trace "pruned", peer=p, topic trace "pruned", peer=p, topic
proc handleBackingOff*(t: var BackoffTable, topic: string) = proc handleBackingOff*(t: var BackoffTable, topic: string) {.raises: [Defect].} =
let now = Moment.now() let now = Moment.now()
var expired = toSeq(t.getOrDefault(topic).pairs()) var expired = toSeq(t.getOrDefault(topic).pairs())
expired.keepIf do (pair: tuple[peer: PeerID, expire: Moment]) -> bool: expired.keepIf do (pair: tuple[peer: PeerID, expire: Moment]) -> bool:
now >= pair.expire now >= pair.expire
for (peer, _) in expired: for (peer, _) in expired:
t.mgetOrPut(topic, initTable[PeerID, Moment]()).del(peer) t.withValue(topic, v):
v[].del(peer)
proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] = proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] {.raises: [Defect].} =
var peers = g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()).toSeq() var peers = g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()).toSeq()
peers.keepIf do (x: PubSubPeer) -> bool: peers.keepIf do (x: PubSubPeer) -> bool:
x.score >= 0.0 x.score >= 0.0
@ -85,19 +79,16 @@ proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] =
proc handleGraft*(g: GossipSub, proc handleGraft*(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
grafts: seq[ControlGraft]): seq[ControlPrune] = grafts: seq[ControlGraft]): seq[ControlPrune] = # {.raises: [Defect].} TODO chronicles exception on windows
for graft in grafts: for graft in grafts:
let topic = graft.topicID let topic = graft.topicID
logScope: trace "peer grafted topic", peer, topic
peer
topic
trace "peer grafted topic"
# It is an error to GRAFT on a explicit peer # It is an error to GRAFT on a explicit peer
if peer.peerId in g.parameters.directPeers: if peer.peerId in g.parameters.directPeers:
# receiving a graft from a direct peer should yield a more prominent warning (protocol violation) # receiving a graft from a direct peer should yield a more prominent warning (protocol violation)
warn "attempt to graft an explicit peer, peering agreements should be reciprocal", peer=peer.peerId, topic warn "attempt to graft an explicit peer, peering agreements should be reciprocal",
peer, topic
# and such an attempt should be logged and rejected with a PRUNE # and such an attempt should be logged and rejected with a PRUNE
result.add(ControlPrune( result.add(ControlPrune(
topicID: topic, topicID: topic,
@ -106,8 +97,7 @@ proc handleGraft*(g: GossipSub,
let backoff = Moment.fromNow(g.parameters.pruneBackoff) let backoff = Moment.fromNow(g.parameters.pruneBackoff)
g.backingOff g.backingOff
.mgetOrPut(topic, initTable[PeerID, Moment]()) .mgetOrPut(topic, initTable[PeerID, Moment]())[peer.peerId] = backoff
.mgetOrPut(peer.peerId, backoff) = backoff
peer.behaviourPenalty += 0.1 peer.behaviourPenalty += 0.1
@ -116,8 +106,7 @@ proc handleGraft*(g: GossipSub,
if g.backingOff if g.backingOff
.getOrDefault(topic) .getOrDefault(topic)
.getOrDefault(peer.peerId) > Moment.now(): .getOrDefault(peer.peerId) > Moment.now():
debug "attempt to graft a backingOff peer", peer=peer.peerId, debug "attempt to graft a backingOff peer", peer, topic
topic
# and such an attempt should be logged and rejected with a PRUNE # and such an attempt should be logged and rejected with a PRUNE
result.add(ControlPrune( result.add(ControlPrune(
topicID: topic, topicID: topic,
@ -126,17 +115,12 @@ proc handleGraft*(g: GossipSub,
let backoff = Moment.fromNow(g.parameters.pruneBackoff) let backoff = Moment.fromNow(g.parameters.pruneBackoff)
g.backingOff g.backingOff
.mgetOrPut(topic, initTable[PeerID, Moment]()) .mgetOrPut(topic, initTable[PeerID, Moment]())[peer.peerId] = backoff
.mgetOrPut(peer.peerId, backoff) = backoff
peer.behaviourPenalty += 0.1 peer.behaviourPenalty += 0.1
continue continue
# Notice this might not be necessary anymore
if peer.peerId notin g.peerStats:
g.initPeerStats(peer)
# not in the spec exactly, but let's avoid way too low score peers # not in the spec exactly, but let's avoid way too low score peers
# other clients do it too also was an audit recommendation # other clients do it too also was an audit recommendation
if peer.score < g.parameters.publishThreshold: if peer.score < g.parameters.publishThreshold:
@ -153,18 +137,19 @@ proc handleGraft*(g: GossipSub,
g.grafted(peer, topic) g.grafted(peer, topic)
g.fanout.removePeer(topic, peer) g.fanout.removePeer(topic, peer)
else: else:
trace "peer already in mesh" trace "peer already in mesh", peer, topic
else: else:
trace "pruning grafting peer, mesh full", peer, score = peer.score, mesh = g.mesh.peers(topic) trace "pruning grafting peer, mesh full",
peer, topic, score = peer.score, mesh = g.mesh.peers(topic)
result.add(ControlPrune( result.add(ControlPrune(
topicID: topic, topicID: topic,
peers: g.peerExchangeList(topic), peers: g.peerExchangeList(topic),
backoff: g.parameters.pruneBackoff.seconds.uint64)) backoff: g.parameters.pruneBackoff.seconds.uint64))
else: else:
trace "peer grafting topic we're not interested in", topic trace "peer grafting topic we're not interested in", peer, topic
# gossip 1.1, we do not send a control message prune anymore # gossip 1.1, we do not send a control message prune anymore
proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.raises: [Defect].} =
for prune in prunes: for prune in prunes:
let topic = prune.topicID let topic = prune.topicID
@ -185,8 +170,7 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
current = g.backingOff.getOrDefault(topic).getOrDefault(peer.peerId) current = g.backingOff.getOrDefault(topic).getOrDefault(peer.peerId)
if backoff > current: if backoff > current:
g.backingOff g.backingOff
.mgetOrPut(topic, initTable[PeerID, Moment]()) .mgetOrPut(topic, initTable[PeerID, Moment]())[peer.peerId] = backoff
.mgetOrPut(peer.peerId, backoff) = backoff
trace "pruning rpc received peer", peer, score = peer.score trace "pruning rpc received peer", peer, score = peer.score
g.pruned(peer, topic) g.pruned(peer, topic)
@ -198,7 +182,7 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
proc handleIHave*(g: GossipSub, proc handleIHave*(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
ihaves: seq[ControlIHave]): ControlIWant = ihaves: seq[ControlIHave]): ControlIWant {.raises: [Defect].} =
if peer.score < g.parameters.gossipThreshold: if peer.score < g.parameters.gossipThreshold:
trace "ihave: ignoring low score peer", peer, score = peer.score trace "ihave: ignoring low score peer", peer, score = peer.score
elif peer.iHaveBudget <= 0: elif peer.iHaveBudget <= 0:
@ -229,7 +213,7 @@ proc handleIHave*(g: GossipSub,
proc handleIWant*(g: GossipSub, proc handleIWant*(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
iwants: seq[ControlIWant]): seq[Message] = iwants: seq[ControlIWant]): seq[Message] {.raises: [Defect].} =
if peer.score < g.parameters.gossipThreshold: if peer.score < g.parameters.gossipThreshold:
trace "iwant: ignoring low score peer", peer, score = peer.score trace "iwant: ignoring low score peer", peer, score = peer.score
elif peer.iWantBudget <= 0: elif peer.iWantBudget <= 0:
@ -249,7 +233,7 @@ proc handleIWant*(g: GossipSub,
else: else:
return return
proc commitMetrics(metrics: var MeshMetrics) = proc commitMetrics(metrics: var MeshMetrics) {.raises: [Defect].} =
libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics) libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics)
libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics) libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
libp2p_gossipsub_under_dout_topics.set(metrics.underDoutTopics) libp2p_gossipsub_under_dout_topics.set(metrics.underDoutTopics)
@ -258,7 +242,7 @@ proc commitMetrics(metrics: var MeshMetrics) =
libp2p_gossipsub_peers_per_topic_fanout.set(metrics.otherPeersPerTopicFanout, labelValues = ["other"]) libp2p_gossipsub_peers_per_topic_fanout.set(metrics.otherPeersPerTopicFanout, labelValues = ["other"])
libp2p_gossipsub_peers_per_topic_mesh.set(metrics.otherPeersPerTopicMesh, labelValues = ["other"]) libp2p_gossipsub_peers_per_topic_mesh.set(metrics.otherPeersPerTopicMesh, labelValues = ["other"])
proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) = proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) {.raises: [Defect].} =
logScope: logScope:
topic topic
mesh = g.mesh.peers(topic) mesh = g.mesh.peers(topic)
@ -352,7 +336,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
libp2p_gossipsub_above_dhigh_condition.inc(labelValues = ["other"]) libp2p_gossipsub_above_dhigh_condition.inc(labelValues = ["other"])
# prune peers if we've gone over Dhi # prune peers if we've gone over Dhi
prunes = toSeq(g.mesh[topic]) prunes = toSeq(try: g.mesh[topic] except KeyError: raiseAssert "have peers")
# avoid pruning peers we are currently grafting in this heartbeat # avoid pruning peers we are currently grafting in this heartbeat
prunes.keepIf do (x: PubSubPeer) -> bool: x notin grafts prunes.keepIf do (x: PubSubPeer) -> bool: x notin grafts
@ -400,7 +384,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
# opportunistic grafting, by spec mesh should not be empty... # opportunistic grafting, by spec mesh should not be empty...
if g.mesh.peers(topic) > 1: if g.mesh.peers(topic) > 1:
var peers = toSeq(g.mesh[topic]) var peers = toSeq(try: g.mesh[topic] except KeyError: raiseAssert "have peers")
# grafting so high score has priority # grafting so high score has priority
peers.sort(byScore, SortOrder.Descending) peers.sort(byScore, SortOrder.Descending)
let medianIdx = peers.len div 2 let medianIdx = peers.len div 2
@ -470,37 +454,38 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
backoff: g.parameters.pruneBackoff.seconds.uint64)]))) backoff: g.parameters.pruneBackoff.seconds.uint64)])))
g.broadcast(prunes, prune) g.broadcast(prunes, prune)
proc dropFanoutPeers*(g: GossipSub) = proc dropFanoutPeers*(g: GossipSub) {.raises: [Defect].} =
# drop peers that we haven't published to in # drop peers that we haven't published to in
# GossipSubFanoutTTL seconds # GossipSubFanoutTTL seconds
let now = Moment.now() let now = Moment.now()
for topic in toSeq(g.lastFanoutPubSub.keys): var drops: seq[string]
let val = g.lastFanoutPubSub[topic] for topic, val in g.lastFanoutPubSub:
if now > val: if now > val:
g.fanout.del(topic) g.fanout.del(topic)
g.lastFanoutPubSub.del(topic) drops.add topic
trace "dropping fanout topic", topic trace "dropping fanout topic", topic
for topic in drops:
g.lastFanoutPubSub.del topic
proc replenishFanout*(g: GossipSub, topic: string) = proc replenishFanout*(g: GossipSub, topic: string) {.raises: [Defect].} =
## get fanout peers for a topic ## get fanout peers for a topic
logScope: topic logScope: topic
trace "about to replenish fanout" trace "about to replenish fanout"
if g.fanout.peers(topic) < g.parameters.dLow: if g.fanout.peers(topic) < g.parameters.dLow:
trace "replenishing fanout", peers = g.fanout.peers(topic) trace "replenishing fanout", peers = g.fanout.peers(topic)
if topic in g.gossipsub: for peer in g.gossipsub.getOrDefault(topic):
for peer in g.gossipsub[topic]: if g.fanout.addPeer(topic, peer):
if g.fanout.addPeer(topic, peer): if g.fanout.peers(topic) == g.parameters.d:
if g.fanout.peers(topic) == g.parameters.d: break
break
trace "fanout replenished with peers", peers = g.fanout.peers(topic) trace "fanout replenished with peers", peers = g.fanout.peers(topic)
proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.} = proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises: [Defect].} =
## gossip iHave messages to peers ## gossip iHave messages to peers
## ##
libp2p_gossipsub_cache_window_size.set(0) var cacheWindowSize = 0
trace "getting gossip peers (iHave)" trace "getting gossip peers (iHave)"
let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys)) let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys))
@ -515,7 +500,7 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.}
var midsSeq = toSeq(mids) var midsSeq = toSeq(mids)
libp2p_gossipsub_cache_window_size.inc(midsSeq.len.int64) cacheWindowSize += midsSeq.len
# not in spec # not in spec
# similar to rust: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/behaviour.rs#L2101 # similar to rust: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/behaviour.rs#L2101
@ -546,78 +531,76 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.}
allPeers.setLen(target) allPeers.setLen(target)
for peer in allPeers: for peer in allPeers:
if peer notin result: result.mGetOrPut(peer, ControlMessage()).ihave.add(ihave)
result[peer] = ControlMessage()
result[peer].ihave.add(ihave) libp2p_gossipsub_cache_window_size.set(cacheWindowSize.int64)
proc onHeartbeat(g: GossipSub) {.raises: [Defect].} =
# reset IWANT budget
# reset IHAVE cap
block:
for peer in g.peers.values:
peer.iWantBudget = IWantPeerBudget
peer.iHaveBudget = IHavePeerBudget
g.updateScores()
var meshMetrics = MeshMetrics()
for t in toSeq(g.topics.keys):
# remove expired backoffs
block:
handleBackingOff(g.backingOff, t)
# prune every negative score peer
# do this before relance
# in order to avoid grafted -> pruned in the same cycle
let meshPeers = g.mesh.getOrDefault(t)
var prunes: seq[PubSubPeer]
for peer in meshPeers:
if peer.score < 0.0:
trace "pruning negative score peer", peer, score = peer.score
g.pruned(peer, t)
g.mesh.removePeer(t, peer)
prunes &= peer
if prunes.len > 0:
let prune = RPCMsg(control: some(ControlMessage(
prune: @[ControlPrune(
topicID: t,
peers: g.peerExchangeList(t),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
g.broadcast(prunes, prune)
# pass by ptr in order to both signal we want to update metrics
# and as well update the struct for each topic during this iteration
g.rebalanceMesh(t, addr meshMetrics)
commitMetrics(meshMetrics)
g.dropFanoutPeers()
# replenish known topics to the fanout
for t in toSeq(g.fanout.keys):
g.replenishFanout(t)
let peers = g.getGossipPeers()
for peer, control in peers:
# only ihave from here
for ihave in control.ihave:
if g.knownTopics.contains(ihave.topicID):
libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicID])
else:
libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
g.send(peer, RPCMsg(control: some(control)))
g.mcache.shift() # shift the cache
# {.pop.} # raises [Defect]
proc heartbeat*(g: GossipSub) {.async.} = proc heartbeat*(g: GossipSub) {.async.} =
while g.heartbeatRunning: while g.heartbeatRunning:
try: trace "running heartbeat", instance = cast[int](g)
trace "running heartbeat", instance = cast[int](g) g.onHeartbeat()
# reset IWANT budget
# reset IHAVE cap
block:
for peer in g.peers.values:
peer.iWantBudget = IWantPeerBudget
peer.iHaveBudget = IHavePeerBudget
g.updateScores()
var meshMetrics = MeshMetrics()
for t in toSeq(g.topics.keys):
# remove expired backoffs
block:
handleBackingOff(g.backingOff, t)
# prune every negative score peer
# do this before relance
# in order to avoid grafted -> pruned in the same cycle
let meshPeers = g.mesh.getOrDefault(t)
var prunes: seq[PubSubPeer]
for peer in meshPeers:
if peer.score < 0.0:
trace "pruning negative score peer", peer, score = peer.score
g.pruned(peer, t)
g.mesh.removePeer(t, peer)
prunes &= peer
if prunes.len > 0:
let prune = RPCMsg(control: some(ControlMessage(
prune: @[ControlPrune(
topicID: t,
peers: g.peerExchangeList(t),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
g.broadcast(prunes, prune)
# pass by ptr in order to both signal we want to update metrics
# and as well update the struct for each topic during this iteration
g.rebalanceMesh(t, addr meshMetrics)
commitMetrics(meshMetrics)
g.dropFanoutPeers()
# replenish known topics to the fanout
for t in toSeq(g.fanout.keys):
g.replenishFanout(t)
let peers = g.getGossipPeers()
for peer, control in peers:
# only ihave from here
for ihave in control.ihave:
if g.knownTopics.contains(ihave.topicID):
libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicID])
else:
libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
g.send(peer, RPCMsg(control: some(control)))
g.mcache.shift() # shift the cache
except CancelledError as exc:
raise exc
except CatchableError as exc:
warn "exception ocurred in gossipsub heartbeat", exc = exc.msg,
trace = exc.getStackTrace()
for trigger in g.heartbeatEvents: for trigger in g.heartbeatEvents:
trace "firing heartbeat event", instance = cast[int](g) trace "firing heartbeat event", instance = cast[int](g)

View File

@ -7,7 +7,9 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import std/[tables, strutils, sets, algorithm, options] {.push raises: [Defect].}
import std/[tables, sets, options]
import chronos, chronicles, metrics import chronos, chronicles, metrics
import "."/[types] import "."/[types]
import ".."/[pubsubpeer] import ".."/[pubsubpeer]
@ -23,12 +25,17 @@ declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring
declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"])
declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"])
proc initPeerStats*(g: GossipSub, peer: PubSubPeer, stats: PeerStats = PeerStats()) = proc withPeerStats*(
var initialStats = stats g: GossipSub, peerId: PeerId,
initialStats.expire = Moment.now() + g.parameters.retainScore action: proc (stats: var PeerStats) {.gcsafe, raises: [Defect].}) =
g.peerStats[peer.peerId] = initialStats ## Add or update peer statistics for a particular peer id - the statistics
peer.iWantBudget = IWantPeerBudget ## are retained across multiple connections until they expire
peer.iHaveBudget = IHavePeerBudget g.peerStats.withValue(peerId, stats) do:
action(stats[])
do:
action(g.peerStats.mgetOrPut(peerId, PeerStats(
expire: Moment.now() + g.parameters.retainScore
)))
func `/`(a, b: Duration): float64 = func `/`(a, b: Duration): float64 =
let let
@ -46,15 +53,16 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
address = peer.address.get() address = peer.address.get()
g.peersInIP.mgetOrPut(address, initHashSet[PeerID]()).incl(peer.peerId) g.peersInIP.mgetOrPut(address, initHashSet[PeerID]()).incl(peer.peerId)
let let
ipPeers = g.peersInIP[address] ipPeers = g.peersInIP.getOrDefault(address).len().float64
len = ipPeers.len.float64 if ipPeers > g.parameters.ipColocationFactorThreshold:
if len > g.parameters.ipColocationFactorThreshold: trace "colocationFactor over threshold", peer, address, ipPeers
trace "colocationFactor over threshold", peer, address, len let over = ipPeers - g.parameters.ipColocationFactorThreshold
let over = len - g.parameters.ipColocationFactorThreshold
over * over over * over
else: else:
0.0 0.0
{.pop.}
proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} = proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} =
when defined(libp2p_agents_metrics): when defined(libp2p_agents_metrics):
let agent = let agent =
@ -77,12 +85,14 @@ proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} =
try: try:
await g.switch.disconnect(peer.peerId) await g.switch.disconnect(peer.peerId)
except CancelledError: except CatchableError as exc: # Never cancelled
raise
except CatchableError as exc:
trace "Failed to close connection", peer, error = exc.name, msg = exc.msg trace "Failed to close connection", peer, error = exc.name, msg = exc.msg
{.push raises: [Defect].}
proc updateScores*(g: GossipSub) = # avoid async proc updateScores*(g: GossipSub) = # avoid async
## https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#the-score-function
##
trace "updating scores", peers = g.peers.len trace "updating scores", peers = g.peers.len
let now = Moment.now() let now = Moment.now()
@ -101,6 +111,7 @@ proc updateScores*(g: GossipSub) = # avoid async
var var
n_topics = 0 n_topics = 0
is_grafted = 0 is_grafted = 0
score = 0.0
# Per topic # Per topic
for topic, topicParams in g.topicParams: for topic, topicParams in g.topicParams:
@ -144,7 +155,7 @@ proc updateScores*(g: GossipSub) = # avoid async
trace "updated peer topic's scores", peer, topic, info, topicScore trace "updated peer topic's scores", peer, topic, info, topicScore
peer.score += topicScore * topicParams.topicWeight score += topicScore * topicParams.topicWeight
# Score metrics # Score metrics
when defined(libp2p_agents_metrics): when defined(libp2p_agents_metrics):
@ -193,12 +204,14 @@ proc updateScores*(g: GossipSub) = # avoid async
# commit our changes, mgetOrPut does NOT work as wanted with value types (lent?) # commit our changes, mgetOrPut does NOT work as wanted with value types (lent?)
stats.topicInfos[topic] = info stats.topicInfos[topic] = info
peer.score += peer.appScore * g.parameters.appSpecificWeight score += peer.appScore * g.parameters.appSpecificWeight
peer.score += peer.behaviourPenalty * peer.behaviourPenalty * g.parameters.behaviourPenaltyWeight
# The value of the parameter is the square of the counter and is mixed with a negative weight.
score += peer.behaviourPenalty * peer.behaviourPenalty * g.parameters.behaviourPenaltyWeight
let colocationFactor = g.colocationFactor(peer) let colocationFactor = g.colocationFactor(peer)
peer.score += colocationFactor * g.parameters.ipColocationFactorWeight score += colocationFactor * g.parameters.ipColocationFactorWeight
# Score metrics # Score metrics
when defined(libp2p_agents_metrics): when defined(libp2p_agents_metrics):
@ -229,17 +242,19 @@ proc updateScores*(g: GossipSub) = # avoid async
if peer.behaviourPenalty < g.parameters.decayToZero: if peer.behaviourPenalty < g.parameters.decayToZero:
peer.behaviourPenalty = 0 peer.behaviourPenalty = 0
peer.score = score
# copy into stats the score to keep until expired # copy into stats the score to keep until expired
stats.score = peer.score stats.score = peer.score
stats.appScore = peer.appScore stats.appScore = peer.appScore
stats.behaviourPenalty = peer.behaviourPenalty stats.behaviourPenalty = peer.behaviourPenalty
stats.expire = Moment.now() + g.parameters.retainScore # refresh expiration stats.expire = now + g.parameters.retainScore # refresh expiration
assert(g.peerStats[peer.peerId].score == peer.score) # nim sanity check
trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted
if g.parameters.disconnectBadPeers and stats.score < g.parameters.graylistThreshold: if g.parameters.disconnectBadPeers and stats.score < g.parameters.graylistThreshold:
debug "disconnecting bad score peer", peer, score = peer.score debug "disconnecting bad score peer", peer, score = peer.score
asyncSpawn g.disconnectPeer(peer) asyncSpawn(try: g.disconnectPeer(peer) except Exception as exc: raiseAssert exc.msg)
when defined(libp2p_agents_metrics): when defined(libp2p_agents_metrics):
libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent]) libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent])
@ -257,13 +272,5 @@ proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, topics: seq[string])
continue continue
# update stats # update stats
g.peerStats.withValue(peer.peerId, stats): g.withPeerStats(peer.peerId) do (stats: var PeerStats):
stats[].topicInfos.withValue(t, tstats): stats.topicInfos.mgetOrPut(t, TopicInfo()).invalidMessageDeliveries += 1
tstats[].invalidMessageDeliveries += 1
do: # if we have no stats populate!
stats[].topicInfos[t] = TopicInfo(invalidMessageDeliveries: 1)
do: # if we have no stats populate!
g.initPeerStats(peer) do:
var stats = PeerStats()
stats.topicInfos[t] = TopicInfo(invalidMessageDeliveries: 1)
stats

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
{.push raises: [Defect].}
import std/[sets, tables, options] import std/[sets, tables, options]
import rpc/[messages] import rpc/[messages]
@ -24,12 +26,13 @@ type
func get*(c: MCache, mid: MessageID): Option[Message] = func get*(c: MCache, mid: MessageID): Option[Message] =
if mid in c.msgs: if mid in c.msgs:
some(c.msgs[mid]) try: some(c.msgs[mid])
except KeyError: raiseAssert "checked"
else: else:
none(Message) none(Message)
func contains*(c: MCache, mid: MessageID): bool = func contains*(c: MCache, mid: MessageID): bool =
c.get(mid).isSome mid in c.msgs
func put*(c: var MCache, msgId: MessageID, msg: Message) = func put*(c: var MCache, msgId: MessageID, msg: Message) =
if not c.msgs.hasKeyOrPut(msgId, msg): if not c.msgs.hasKeyOrPut(msgId, msg):
@ -37,8 +40,6 @@ func put*(c: var MCache, msgId: MessageID, msg: Message) =
c.history[0].add(CacheEntry(mid: msgId, topicIDs: msg.topicIDs)) c.history[0].add(CacheEntry(mid: msgId, topicIDs: msg.topicIDs))
func window*(c: MCache, topic: string): HashSet[MessageID] = func window*(c: MCache, topic: string): HashSet[MessageID] =
result = initHashSet[MessageID]()
let let
len = min(c.windowSize, c.history.len) len = min(c.windowSize, c.history.len)

View File

@ -7,17 +7,21 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import tables, sequtils, sets {.push raises: [Defect].}
import pubsubpeer, ../../peerid
import std/[tables, sets]
import ./pubsubpeer, ../../peerid
type type
PeerTable* = Table[string, HashSet[PubSubPeer]] # topic string to peer map PeerTable* = Table[string, HashSet[PubSubPeer]] # topic string to peer map
proc hasPeerID*(t: PeerTable, topic: string, peerId: PeerID): bool = proc hasPeerID*(t: PeerTable, topic: string, peerId: PeerID): bool =
if topic in t: if topic in t:
for peer in t[topic]: try:
if peer.peerId == peerId: for peer in t[topic]:
return true if peer.peerId == peerId:
return true
except KeyError: raiseAssert "checked with in"
false false
func addPeer*(table: var PeerTable, topic: string, peer: PubSubPeer): bool = func addPeer*(table: var PeerTable, topic: string, peer: PubSubPeer): bool =
@ -34,10 +38,13 @@ func removePeer*(table: var PeerTable, topic: string, peer: PubSubPeer) =
table.del(topic) table.del(topic)
func hasPeer*(table: PeerTable, topic: string, peer: PubSubPeer): bool = func hasPeer*(table: PeerTable, topic: string, peer: PubSubPeer): bool =
(topic in table) and (peer in table[topic]) try:
(topic in table) and (peer in table[topic])
except KeyError: raiseAssert "checked with in"
func peers*(table: PeerTable, topic: string): int = func peers*(table: PeerTable, topic: string): int =
if topic in table: if topic in table:
table[topic].len try: table[topic].len
except KeyError: raiseAssert "checked with in"
else: else:
0 0

View File

@ -119,7 +119,7 @@ method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} =
libp2p_pubsub_peers.set(p.peers.len.int64) libp2p_pubsub_peers.set(p.peers.len.int64)
proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) = proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [Defect].} =
## Attempt to send `msg` to remote peer ## Attempt to send `msg` to remote peer
## ##
@ -129,7 +129,7 @@ proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) =
proc broadcast*( proc broadcast*(
p: PubSub, p: PubSub,
sendPeers: openArray[PubSubPeer], sendPeers: openArray[PubSubPeer],
msg: RPCMsg) = # raises: [Defect] msg: RPCMsg) {.raises: [Defect].} =
## Attempt to send `msg` to the given peers ## Attempt to send `msg` to the given peers
let npeers = sendPeers.len.int64 let npeers = sendPeers.len.int64
@ -278,9 +278,7 @@ proc getOrCreatePeer*(
proc dropConnAsync(peer: PubsubPeer) {.async.} = proc dropConnAsync(peer: PubsubPeer) {.async.} =
try: try:
await p.switch.disconnect(peer.peerId) await p.switch.disconnect(peer.peerId)
except CancelledError: except CatchableError as exc: # never cancelled
raise
except CatchableError as exc:
trace "Failed to close connection", peer, error = exc.name, msg = exc.msg trace "Failed to close connection", peer, error = exc.name, msg = exc.msg
asyncSpawn dropConnAsync(peer) asyncSpawn dropConnAsync(peer)
@ -530,8 +528,6 @@ proc init*[PubParams: object | bool](
anonymize: anonymize, anonymize: anonymize,
verifySignature: verifySignature, verifySignature: verifySignature,
sign: sign, sign: sign,
peers: initTable[PeerID, PubSubPeer](),
topics: initTable[string, Topic](),
msgIdProvider: msgIdProvider, msgIdProvider: msgIdProvider,
subscriptionValidator: subscriptionValidator, subscriptionValidator: subscriptionValidator,
topicsHigh: int.high) topicsHigh: int.high)
@ -542,8 +538,6 @@ proc init*[PubParams: object | bool](
anonymize: anonymize, anonymize: anonymize,
verifySignature: verifySignature, verifySignature: verifySignature,
sign: sign, sign: sign,
peers: initTable[PeerID, PubSubPeer](),
topics: initTable[string, Topic](),
msgIdProvider: msgIdProvider, msgIdProvider: msgIdProvider,
subscriptionValidator: subscriptionValidator, subscriptionValidator: subscriptionValidator,
parameters: parameters, parameters: parameters,

View File

@ -195,9 +195,7 @@ proc connectImpl(p: PubSubPeer) {.async.} =
# issue so we try to get a new on # issue so we try to get a new on
while true: while true:
await connectOnce(p) await connectOnce(p)
except CancelledError: except CatchableError as exc: # never cancelled
raise
except CatchableError as exc:
debug "Could not establish send connection", msg = exc.msg debug "Could not establish send connection", msg = exc.msg
finally: finally:
# drop the connection, else we end up with ghost peers # drop the connection, else we end up with ghost peers
@ -212,7 +210,7 @@ proc sendImpl(conn: Connection, encoded: seq[byte]) {.async.} =
await conn.writeLp(encoded) await conn.writeLp(encoded)
trace "sent pubsub message to remote", conn trace "sent pubsub message to remote", conn
except CatchableError as exc: except CatchableError as exc: # never cancelled
# Because we detach the send call from the currently executing task using # Because we detach the send call from the currently executing task using
# asyncSpawn, no exceptions may leak out of it # asyncSpawn, no exceptions may leak out of it
trace "Unable to send to remote", conn, msg = exc.msg trace "Unable to send to remote", conn, msg = exc.msg
@ -228,7 +226,7 @@ template sendMetrics(msg: RPCMsg): untyped =
# metrics # metrics
libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t]) libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) = proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} =
doAssert(not isNil(p), "pubsubpeer nil!") doAssert(not isNil(p), "pubsubpeer nil!")
let conn = p.sendConn let conn = p.sendConn
@ -261,7 +259,10 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) =
# To limit the size of the closure, we only pass the encoded message and # To limit the size of the closure, we only pass the encoded message and
# connection to the spawned send task # connection to the spawned send task
asyncSpawn sendImpl(conn, encoded) asyncSpawn(try:
sendImpl(conn, encoded)
except Exception as exc: # TODO chronos Exception
raiseAssert exc.msg)
proc newPubSubPeer*(peerId: PeerID, proc newPubSubPeer*(peerId: PeerID,
getConn: GetConn, getConn: GetConn,

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import std/[hashes, tables] import std/[tables]
import chronos/timer import chronos/timer

View File

@ -100,7 +100,7 @@ method pushEof*(s: BufferStream) {.base, async.} =
finally: finally:
s.pushing = false s.pushing = false
method atEof*(s: BufferStream): bool = method atEof*(s: BufferStream): bool {.raises: [Defect].} =
s.isEof and s.readBuf.len == 0 s.isEof and s.readBuf.len == 0
method readOnce*(s: BufferStream, method readOnce*(s: BufferStream,

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import std/[oids, strformat, strutils] import std/[oids, strformat]
import chronos, chronicles, metrics import chronos, chronicles, metrics
import connection import connection
import ../utility import ../utility
@ -126,10 +126,10 @@ method write*(s: ChronosStream, msg: seq[byte]) {.async.} =
if s.tracked: if s.tracked:
libp2p_peers_traffic_write.inc(msg.len.int64, labelValues = [s.shortAgent]) libp2p_peers_traffic_write.inc(msg.len.int64, labelValues = [s.shortAgent])
method closed*(s: ChronosStream): bool {.inline.} = method closed*(s: ChronosStream): bool {.raises: [Defect].} =
result = s.client.closed result = s.client.closed
method atEof*(s: ChronosStream): bool {.inline.} = method atEof*(s: ChronosStream): bool {.raises: [Defect].} =
s.client.atEof() s.client.atEof()
method closeImpl*(s: ChronosStream) {.async.} = method closeImpl*(s: ChronosStream) {.async.} =

View File

@ -130,10 +130,10 @@ method initStream*(s: LPStream) {.base.} =
proc join*(s: LPStream): Future[void] = proc join*(s: LPStream): Future[void] =
s.closeEvent.wait() s.closeEvent.wait()
method closed*(s: LPStream): bool {.base.} = method closed*(s: LPStream): bool {.base, raises: [Defect].} =
s.isClosed s.isClosed
method atEof*(s: LPStream): bool {.base.} = method atEof*(s: LPStream): bool {.base, raises: [Defect].} =
s.isEof s.isEof
method readOnce*(s: LPStream, method readOnce*(s: LPStream,

View File

@ -8,7 +8,6 @@
## those terms. ## those terms.
import std/[tables, import std/[tables,
sequtils,
options, options,
sets, sets,
oids, oids,

View File

@ -7,8 +7,9 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
{.push raises: [Defect].}
import stew/byteutils import stew/byteutils
import strutils
const const
ShortDumpMax = 12 ShortDumpMax = 12

View File

@ -18,14 +18,20 @@ type
proc noop(data: seq[byte]) {.async, gcsafe.} = discard proc noop(data: seq[byte]) {.async, gcsafe.} = discard
proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): auto = proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): PubSubPeer =
proc getConn(): Future[Connection] = proc getConn(): Future[Connection] =
p.switch.dial(peerId, GossipSubCodec) p.switch.dial(peerId, GossipSubCodec)
proc dropConn(peer: PubSubPeer) = proc dropConn(peer: PubSubPeer) =
discard # we don't care about it here yet discard # we don't care about it here yet
newPubSubPeer(peerId, getConn, dropConn, nil, GossipSubCodec) let pubSubPeer = newPubSubPeer(peerId, getConn, dropConn, nil, GossipSubCodec)
debug "created new pubsub peer", peerId
p.peers[peerId] = pubSubPeer
onNewPeer(p, pubSubPeer)
pubSubPeer
proc randomPeerInfo(): PeerInfo = proc randomPeerInfo(): PeerInfo =
PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
@ -53,8 +59,6 @@ suite "GossipSub internal":
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
peer.sendConn = conn peer.sendConn = conn
gossipSub.onNewPeer(peer)
gossipSub.peers[peerInfo.peerId] = peer
gossipSub.gossipsub[topic].incl(peer) gossipSub.gossipsub[topic].incl(peer)
# test via dynamic dispatch # test via dynamic dispatch
@ -96,8 +100,6 @@ suite "GossipSub internal":
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
peer.sendConn = conn peer.sendConn = conn
gossipSub.onNewPeer(peer)
gossipSub.peers[peerInfo.peerId] = peer
gossipSub.gossipsub[topic].incl(peer) gossipSub.gossipsub[topic].incl(peer)
check gossipSub.peers.len == 15 check gossipSub.peers.len == 15
@ -125,8 +127,6 @@ suite "GossipSub internal":
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
peer.sendConn = conn peer.sendConn = conn
peer.score = scoreLow peer.score = scoreLow
gossipSub.onNewPeer(peer)
gossipSub.peers[peerInfo.peerId] = peer
gossipSub.gossipsub[topic].incl(peer) gossipSub.gossipsub[topic].incl(peer)
scoreLow += 1.0 scoreLow += 1.0
@ -155,9 +155,7 @@ suite "GossipSub internal":
let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
gossipSub.grafted(peer, topic) gossipSub.grafted(peer, topic)
gossipSub.peers[peerInfo.peerId] = peer
gossipSub.mesh[topic].incl(peer) gossipSub.mesh[topic].incl(peer)
check gossipSub.mesh[topic].len == 15 check gossipSub.mesh[topic].len == 15
@ -184,7 +182,6 @@ suite "GossipSub internal":
var peerInfo = randomPeerInfo() var peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler peer.handler = handler
gossipSub.gossipsub[topic].incl(peer) gossipSub.gossipsub[topic].incl(peer)
@ -214,7 +211,6 @@ suite "GossipSub internal":
let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler peer.handler = handler
gossipSub.fanout[topic].incl(peer) gossipSub.fanout[topic].incl(peer)
@ -249,7 +245,6 @@ suite "GossipSub internal":
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler peer.handler = handler
gossipSub.fanout[topic1].incl(peer) gossipSub.fanout[topic1].incl(peer)
gossipSub.fanout[topic2].incl(peer) gossipSub.fanout[topic2].incl(peer)
@ -284,7 +279,6 @@ suite "GossipSub internal":
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler peer.handler = handler
if i mod 2 == 0: if i mod 2 == 0:
gossipSub.fanout[topic].incl(peer) gossipSub.fanout[topic].incl(peer)
@ -299,7 +293,6 @@ suite "GossipSub internal":
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler peer.handler = handler
gossipSub.gossipsub[topic].incl(peer) gossipSub.gossipsub[topic].incl(peer)
@ -344,7 +337,6 @@ suite "GossipSub internal":
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler peer.handler = handler
if i mod 2 == 0: if i mod 2 == 0:
gossipSub.fanout[topic].incl(peer) gossipSub.fanout[topic].incl(peer)
@ -385,7 +377,6 @@ suite "GossipSub internal":
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler peer.handler = handler
if i mod 2 == 0: if i mod 2 == 0:
gossipSub.mesh[topic].incl(peer) gossipSub.mesh[topic].incl(peer)
@ -427,7 +418,6 @@ suite "GossipSub internal":
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler peer.handler = handler
if i mod 2 == 0: if i mod 2 == 0:
gossipSub.mesh[topic].incl(peer) gossipSub.mesh[topic].incl(peer)
@ -466,7 +456,6 @@ suite "GossipSub internal":
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler peer.handler = handler
# generate messages # generate messages
@ -489,7 +478,7 @@ suite "GossipSub internal":
asyncTest "Disconnect bad peers": asyncTest "Disconnect bad peers":
let gossipSub = TestGossipSub.init(newStandardSwitch()) let gossipSub = TestGossipSub.init(newStandardSwitch())
gossipSub.parameters.disconnectBadPeers = true gossipSub.parameters.disconnectBadPeers = true
gossipSub.parameters.appSpecificWeight = 1.0
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
check false check false
@ -501,12 +490,10 @@ suite "GossipSub internal":
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.sendConn = conn peer.sendConn = conn
peer.handler = handler peer.handler = handler
peer.score = gossipSub.parameters.graylistThreshold - 1 peer.appScore = gossipSub.parameters.graylistThreshold - 1
gossipSub.gossipsub.mgetOrPut(topic, initHashSet[PubSubPeer]()).incl(peer) gossipSub.gossipsub.mgetOrPut(topic, initHashSet[PubSubPeer]()).incl(peer)
gossipSub.peers[peerInfo.peerId] = peer
gossipSub.switch.connManager.storeConn(conn) gossipSub.switch.connManager.storeConn(conn)
gossipSub.updateScores() gossipSub.updateScores()
@ -535,7 +522,6 @@ suite "GossipSub internal":
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.peers[peerInfo.peerId] = peer
await gossipSub.rpcHandler(peer, lotOfSubs) await gossipSub.rpcHandler(peer, lotOfSubs)
@ -561,8 +547,6 @@ suite "GossipSub internal":
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
peer.sendConn = conn peer.sendConn = conn
gossipSub.onNewPeer(peer)
gossipSub.peers[peerInfo.peerId] = peer
gossipSub.gossipsub[topic].incl(peer) gossipSub.gossipsub[topic].incl(peer)
gossipSub.backingOff gossipSub.backingOff
.mgetOrPut(topic, initTable[PeerID, Moment]()) .mgetOrPut(topic, initTable[PeerID, Moment]())
@ -601,9 +585,7 @@ suite "GossipSub internal":
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
peer.score = 40.0 peer.score = 40.0
peer.sendConn = conn peer.sendConn = conn
gossipSub.onNewPeer(peer)
gossipSub.grafted(peer, topic) gossipSub.grafted(peer, topic)
gossipSub.peers[peerInfo.peerId] = peer
gossipSub.mesh[topic].incl(peer) gossipSub.mesh[topic].incl(peer)
for i in 0..<7: for i in 0..<7:
@ -615,9 +597,7 @@ suite "GossipSub internal":
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
peer.score = 10.0 peer.score = 10.0
peer.sendConn = conn peer.sendConn = conn
gossipSub.onNewPeer(peer)
gossipSub.grafted(peer, topic) gossipSub.grafted(peer, topic)
gossipSub.peers[peerInfo.peerId] = peer
gossipSub.mesh[topic].incl(peer) gossipSub.mesh[topic].incl(peer)
check gossipSub.mesh[topic].len == 13 check gossipSub.mesh[topic].len == 13
@ -650,10 +630,8 @@ suite "GossipSub internal":
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId) let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler peer.handler = handler
gossipSub.grafted(peer, topic) gossipSub.grafted(peer, topic)
gossipSub.peers[peerInfo.peerId] = peer
gossipSub.mesh[topic].incl(peer) gossipSub.mesh[topic].incl(peer)
block: block:
@ -668,7 +646,7 @@ suite "GossipSub internal":
topicID: topic, topicID: topic,
messageIDs: @[id, id, id] messageIDs: @[id, id, id]
) )
# gossipSub.initPeerStats(peer) peer.iHaveBudget = 0
let iwants = gossipSub.handleIHave(peer, @[msg]) let iwants = gossipSub.handleIHave(peer, @[msg])
check: iwants.messageIDs.len == 0 check: iwants.messageIDs.len == 0
@ -684,7 +662,6 @@ suite "GossipSub internal":
topicID: topic, topicID: topic,
messageIDs: @[id, id, id] messageIDs: @[id, id, id]
) )
gossipSub.initPeerStats(peer)
let iwants = gossipSub.handleIHave(peer, @[msg]) let iwants = gossipSub.handleIHave(peer, @[msg])
check: iwants.messageIDs.len == 1 check: iwants.messageIDs.len == 1
@ -700,7 +677,6 @@ suite "GossipSub internal":
let msg = ControlIWant( let msg = ControlIWant(
messageIDs: @[id, id, id] messageIDs: @[id, id, id]
) )
gossipSub.initPeerStats(peer)
let genmsg = gossipSub.handleIWant(peer, @[msg]) let genmsg = gossipSub.handleIWant(peer, @[msg])
check: genmsg.len == 1 check: genmsg.len == 1