gossipsub audit fixes (#412)
* [SEC] gossipsub - rebalanceMesh grafts peers giving preference to low scores #405 * comment score choices * compiler warning fixes/bug fixes (unsubscribe) * rebalanceMesh does not enforce D_out quota * fix outbound grafting * fight the nim compiler * fix closure capture bs... * another closure fix * #403 rebalance prune fixes * more test fixing * #403 fixes * #402 avoid removing scores on unsub * #401 handleGraft improvements * [SEC] handleIHAVE/handleIWANT recommendations * add a note about peer exchange handling
This commit is contained in:
parent
1de1d49223
commit
75b023c9e5
|
@ -510,7 +510,7 @@ proc getRawBytes*(pubkey: EcPublicKey): EcResult[seq[byte]] =
|
||||||
let length = ? pubkey.toRawBytes(res)
|
let length = ? pubkey.toRawBytes(res)
|
||||||
res.setLen(length)
|
res.setLen(length)
|
||||||
discard ? pubkey.toRawBytes(res)
|
discard ? pubkey.toRawBytes(res)
|
||||||
ok(res)
|
return ok(res)
|
||||||
else:
|
else:
|
||||||
return err(EcKeyIncorrectError)
|
return err(EcKeyIncorrectError)
|
||||||
|
|
||||||
|
|
|
@ -873,7 +873,9 @@ proc init*(mtype: typedesc[MultiAddress], address: TransportAddress,
|
||||||
let protoProto = case protocol
|
let protoProto = case protocol
|
||||||
of IPPROTO_TCP: getProtocol("tcp")
|
of IPPROTO_TCP: getProtocol("tcp")
|
||||||
of IPPROTO_UDP: getProtocol("udp")
|
of IPPROTO_UDP: getProtocol("udp")
|
||||||
else: return err("multiaddress: protocol should be either TCP or UDP")
|
else: default(MAProtocol)
|
||||||
|
if protoProto.size == 0:
|
||||||
|
return err("multiaddress: protocol should be either TCP or UDP")
|
||||||
if address.family == AddressFamily.IPv4:
|
if address.family == AddressFamily.IPv4:
|
||||||
res.data.write(getProtocol("ip4").mcodec)
|
res.data.write(getProtocol("ip4").mcodec)
|
||||||
res.data.writeArray(address.address_v4)
|
res.data.writeArray(address.address_v4)
|
||||||
|
|
|
@ -260,7 +260,7 @@ proc validateParameters*(parameters: TopicParams): Result[void, cstring] =
|
||||||
else:
|
else:
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
func byScore(x,y: PubSubPeer): int = (x.score - y.score).int
|
func byScore(x,y: PubSubPeer): int = system.cmp(x.score, y.score)
|
||||||
|
|
||||||
method init*(g: GossipSub) =
|
method init*(g: GossipSub) =
|
||||||
proc handler(conn: Connection, proto: string) {.async.} =
|
proc handler(conn: Connection, proto: string) {.async.} =
|
||||||
|
@ -408,8 +408,8 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||||
# shuffle anyway, score might be not used
|
# shuffle anyway, score might be not used
|
||||||
shuffle(grafts)
|
shuffle(grafts)
|
||||||
|
|
||||||
# sort peers by score
|
# sort peers by score, high score first since we graft
|
||||||
grafts.sort(byScore)
|
grafts.sort(byScore, SortOrder.Descending)
|
||||||
|
|
||||||
# Graft peers so we reach a count of D
|
# Graft peers so we reach a count of D
|
||||||
grafts.setLen(min(grafts.len, GossipSubD - g.mesh.peers(topic)))
|
grafts.setLen(min(grafts.len, GossipSubD - g.mesh.peers(topic)))
|
||||||
|
@ -421,9 +421,12 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||||
g.fanout.removePeer(topic, peer)
|
g.fanout.removePeer(topic, peer)
|
||||||
grafting &= peer
|
grafting &= peer
|
||||||
|
|
||||||
elif npeers < g.parameters.dOut:
|
else:
|
||||||
|
var meshPeers = toSeq(g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()))
|
||||||
|
meshPeers.keepIf do (x: PubSubPeer) -> bool: x.outbound
|
||||||
|
if meshPeers.len < g.parameters.dOut:
|
||||||
trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic)
|
trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic)
|
||||||
# replenish the mesh if we're below Dlo
|
|
||||||
grafts = toSeq(
|
grafts = toSeq(
|
||||||
g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
|
g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
|
||||||
g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
|
g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
|
||||||
|
@ -442,11 +445,11 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||||
# shuffle anyway, score might be not used
|
# shuffle anyway, score might be not used
|
||||||
shuffle(grafts)
|
shuffle(grafts)
|
||||||
|
|
||||||
# sort peers by score
|
# sort peers by score, high score first, we are grafting
|
||||||
grafts.sort(byScore)
|
grafts.sort(byScore, SortOrder.Descending)
|
||||||
|
|
||||||
# Graft peers so we reach a count of D
|
# Graft peers so we reach a count of D
|
||||||
grafts.setLen(min(grafts.len, g.parameters.dOut - g.mesh.peers(topic)))
|
grafts.setLen(min(grafts.len, g.parameters.dOut))
|
||||||
|
|
||||||
trace "grafting outbound peers", topic, peers = grafts.len
|
trace "grafting outbound peers", topic, peers = grafts.len
|
||||||
|
|
||||||
|
@ -460,20 +463,21 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||||
if g.mesh.peers(topic) > GossipSubDhi:
|
if g.mesh.peers(topic) > GossipSubDhi:
|
||||||
# prune peers if we've gone over Dhi
|
# prune peers if we've gone over Dhi
|
||||||
prunes = toSeq(g.mesh[topic])
|
prunes = toSeq(g.mesh[topic])
|
||||||
|
# avoid pruning peers we are currently grafting in this heartbeat
|
||||||
|
prunes.keepIf do (x: PubSubPeer) -> bool: x notin grafting
|
||||||
|
let mesh = prunes
|
||||||
|
|
||||||
# shuffle anyway, score might be not used
|
# shuffle anyway, score might be not used
|
||||||
shuffle(prunes)
|
shuffle(prunes)
|
||||||
|
|
||||||
# sort peers by score (inverted)
|
# sort peers by score (inverted), pruning, so low score peers are on top
|
||||||
prunes.sort(byScore)
|
prunes.sort(byScore, SortOrder.Ascending)
|
||||||
|
|
||||||
# keep high score peers
|
# keep high score peers
|
||||||
if prunes.len > g.parameters.dScore:
|
if prunes.len > g.parameters.dScore:
|
||||||
prunes.setLen(prunes.len - g.parameters.dScore)
|
prunes.setLen(prunes.len - g.parameters.dScore)
|
||||||
# we must try to keep outbound peers
|
|
||||||
# to keep an outbound mesh quota
|
# collect inbound/outbound info
|
||||||
# so we try to first prune inbound peers
|
|
||||||
# if none we add up some outbound
|
|
||||||
var outbound: seq[PubSubPeer]
|
var outbound: seq[PubSubPeer]
|
||||||
var inbound: seq[PubSubPeer]
|
var inbound: seq[PubSubPeer]
|
||||||
for peer in prunes:
|
for peer in prunes:
|
||||||
|
@ -482,20 +486,25 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||||
else:
|
else:
|
||||||
inbound &= peer
|
inbound &= peer
|
||||||
|
|
||||||
|
# ensure that there are at least D_out peers first and rebalance to GossipSubD after that
|
||||||
|
let maxOutboundPrunes =
|
||||||
|
block:
|
||||||
|
var count = 0
|
||||||
|
for peer in mesh:
|
||||||
|
if peer.outbound:
|
||||||
|
inc count
|
||||||
|
count - g.parameters.dOut
|
||||||
|
outbound.setLen(min(outbound.len, max(0, maxOutboundPrunes)))
|
||||||
|
|
||||||
|
# concat remaining outbound peers
|
||||||
|
inbound &= outbound
|
||||||
|
|
||||||
let pruneLen = inbound.len - GossipSubD
|
let pruneLen = inbound.len - GossipSubD
|
||||||
if pruneLen > 0:
|
if pruneLen > 0:
|
||||||
# Ok we got some peers to prune,
|
# Ok we got some peers to prune,
|
||||||
# for this heartbeat let's prune those
|
# for this heartbeat let's prune those
|
||||||
shuffle(inbound)
|
shuffle(inbound)
|
||||||
inbound.setLen(pruneLen)
|
inbound.setLen(pruneLen)
|
||||||
else:
|
|
||||||
# We could not find any inbound to prune
|
|
||||||
# Yet we are on Hi, so we need to cull outbound peers
|
|
||||||
let keepDOutLen = outbound.len - g.parameters.dOut
|
|
||||||
if keepDOutLen > 0:
|
|
||||||
shuffle(outbound)
|
|
||||||
outbound.setLen(keepDOutLen)
|
|
||||||
inbound &= outbound
|
|
||||||
|
|
||||||
trace "pruning", prunes = inbound.len
|
trace "pruning", prunes = inbound.len
|
||||||
for peer in inbound:
|
for peer in inbound:
|
||||||
|
@ -505,7 +514,8 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||||
# opportunistic grafting, by spec mesh should not be empty...
|
# opportunistic grafting, by spec mesh should not be empty...
|
||||||
if g.mesh.peers(topic) > 1:
|
if g.mesh.peers(topic) > 1:
|
||||||
var peers = toSeq(g.mesh[topic])
|
var peers = toSeq(g.mesh[topic])
|
||||||
peers.sort(byScore)
|
# grafting so high score has priority
|
||||||
|
peers.sort(byScore, SortOrder.Descending)
|
||||||
let medianIdx = peers.len div 2
|
let medianIdx = peers.len div 2
|
||||||
let median = peers[medianIdx]
|
let median = peers[medianIdx]
|
||||||
if median.score < g.parameters.opportunisticGraftThreshold:
|
if median.score < g.parameters.opportunisticGraftThreshold:
|
||||||
|
@ -728,7 +738,7 @@ proc updateScores(g: GossipSub) = # avoid async
|
||||||
if peer.behaviourPenalty < g.parameters.decayToZero:
|
if peer.behaviourPenalty < g.parameters.decayToZero:
|
||||||
peer.behaviourPenalty = 0
|
peer.behaviourPenalty = 0
|
||||||
|
|
||||||
debug "updated peer's score", peer, score = peer.score, n_topics, is_grafted
|
trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted
|
||||||
|
|
||||||
for peer in evicting:
|
for peer in evicting:
|
||||||
g.peerStats.del(peer)
|
g.peerStats.del(peer)
|
||||||
|
@ -842,11 +852,6 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
|
||||||
libp2p_gossipsub_peers_per_topic_fanout
|
libp2p_gossipsub_peers_per_topic_fanout
|
||||||
.set(g.fanout.peers(t).int64, labelValues = [t])
|
.set(g.fanout.peers(t).int64, labelValues = [t])
|
||||||
|
|
||||||
# don't retain bad score peers
|
|
||||||
if pubSubPeer.score < 0.0:
|
|
||||||
g.peerStats.del(pubSubPeer)
|
|
||||||
return
|
|
||||||
|
|
||||||
g.peerStats[pubSubPeer].expire = Moment.now() + g.parameters.retainScore
|
g.peerStats[pubSubPeer].expire = Moment.now() + g.parameters.retainScore
|
||||||
for topic, info in g.peerStats[pubSubPeer].topicInfos.mpairs:
|
for topic, info in g.peerStats[pubSubPeer].topicInfos.mpairs:
|
||||||
info.firstMessageDeliveries = 0
|
info.firstMessageDeliveries = 0
|
||||||
|
@ -893,6 +898,15 @@ method subscribeTopic*(g: GossipSub,
|
||||||
|
|
||||||
trace "gossip peers", peers = g.gossipsub.peers(topic), topic
|
trace "gossip peers", peers = g.gossipsub.peers(topic), topic
|
||||||
|
|
||||||
|
proc punishPeer(g: GossipSub, peer: PubSubPeer, topics: seq[string]) =
|
||||||
|
for t in topics:
|
||||||
|
# ensure we init a new topic if unknown
|
||||||
|
let _ = g.topicParams.mgetOrPut(t, TopicParams.init())
|
||||||
|
# update stats
|
||||||
|
var tstats = g.peerStats[peer].topicInfos.getOrDefault(t)
|
||||||
|
tstats.invalidMessageDeliveries += 1
|
||||||
|
g.peerStats[peer].topicInfos[t] = tstats
|
||||||
|
|
||||||
proc handleGraft(g: GossipSub,
|
proc handleGraft(g: GossipSub,
|
||||||
peer: PubSubPeer,
|
peer: PubSubPeer,
|
||||||
grafts: seq[ControlGraft]): seq[ControlPrune] =
|
grafts: seq[ControlGraft]): seq[ControlPrune] =
|
||||||
|
@ -906,17 +920,21 @@ proc handleGraft(g: GossipSub,
|
||||||
|
|
||||||
# It is an error to GRAFT on a explicit peer
|
# It is an error to GRAFT on a explicit peer
|
||||||
if peer.peerId in g.parameters.directPeers:
|
if peer.peerId in g.parameters.directPeers:
|
||||||
trace "attempt to graft an explicit peer", peer=peer.id,
|
# receiving a graft from a direct peer should yield a more prominent warning (protocol violation)
|
||||||
|
warn "attempt to graft an explicit peer", peer=peer.id,
|
||||||
topicID=graft.topicID
|
topicID=graft.topicID
|
||||||
# and such an attempt should be logged and rejected with a PRUNE
|
# and such an attempt should be logged and rejected with a PRUNE
|
||||||
result.add(ControlPrune(
|
result.add(ControlPrune(
|
||||||
topicID: graft.topicID,
|
topicID: graft.topicID,
|
||||||
peers: @[], # omitting heavy computation here as the remote did something illegal
|
peers: @[], # omitting heavy computation here as the remote did something illegal
|
||||||
backoff: g.parameters.pruneBackoff.seconds.uint64))
|
backoff: g.parameters.pruneBackoff.seconds.uint64))
|
||||||
|
|
||||||
|
g.punishPeer(peer, @[topic])
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if peer.peerId in g.backingOff:
|
if peer.peerId in g.backingOff and g.backingOff[peer.peerId] > Moment.now():
|
||||||
trace "attempt to graft an backingOff peer", peer=peer.id,
|
trace "attempt to graft a backingOff peer", peer=peer.id,
|
||||||
topicID=graft.topicID,
|
topicID=graft.topicID,
|
||||||
expire=g.backingOff[peer.peerId]
|
expire=g.backingOff[peer.peerId]
|
||||||
# and such an attempt should be logged and rejected with a PRUNE
|
# and such an attempt should be logged and rejected with a PRUNE
|
||||||
|
@ -924,10 +942,18 @@ proc handleGraft(g: GossipSub,
|
||||||
topicID: graft.topicID,
|
topicID: graft.topicID,
|
||||||
peers: @[], # omitting heavy computation here as the remote did something illegal
|
peers: @[], # omitting heavy computation here as the remote did something illegal
|
||||||
backoff: g.parameters.pruneBackoff.seconds.uint64))
|
backoff: g.parameters.pruneBackoff.seconds.uint64))
|
||||||
|
|
||||||
|
g.punishPeer(peer, @[topic])
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if peer notin g.peerStats:
|
if peer notin g.peerStats:
|
||||||
g.peerStats[peer] = PeerStats()
|
g.onNewPeer(peer)
|
||||||
|
|
||||||
|
# not in the spec exactly, but let's avoid way too low score peers
|
||||||
|
# other clients do it too also was an audit recommendation
|
||||||
|
if peer.score < g.parameters.publishThreshold:
|
||||||
|
continue
|
||||||
|
|
||||||
# If they send us a graft before they send us a subscribe, what should
|
# If they send us a graft before they send us a subscribe, what should
|
||||||
# we do? For now, we add them to mesh but don't add them to gossipsub.
|
# we do? For now, we add them to mesh but don't add them to gossipsub.
|
||||||
|
@ -970,6 +996,10 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
|
||||||
g.pruned(peer, prune.topicID)
|
g.pruned(peer, prune.topicID)
|
||||||
g.mesh.removePeer(prune.topicID, peer)
|
g.mesh.removePeer(prune.topicID, peer)
|
||||||
|
|
||||||
|
# TODO peer exchange, we miss ambient peer discovery in libp2p, so we are blocked by that
|
||||||
|
# another option could be to implement signed peer records
|
||||||
|
## if peer.score > g.parameters.gossipThreshold and prunes.peers.len > 0:
|
||||||
|
|
||||||
when defined(libp2p_expensive_metrics):
|
when defined(libp2p_expensive_metrics):
|
||||||
libp2p_gossipsub_peers_per_topic_mesh
|
libp2p_gossipsub_peers_per_topic_mesh
|
||||||
.set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID])
|
.set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID])
|
||||||
|
@ -979,26 +1009,36 @@ proc handleIHave(g: GossipSub,
|
||||||
ihaves: seq[ControlIHave]): ControlIWant =
|
ihaves: seq[ControlIHave]): ControlIWant =
|
||||||
if peer.score < g.parameters.gossipThreshold:
|
if peer.score < g.parameters.gossipThreshold:
|
||||||
trace "ihave: ignoring low score peer", peer, score = peer.score
|
trace "ihave: ignoring low score peer", peer, score = peer.score
|
||||||
elif peer.iHaveBudget == 0:
|
elif peer.iHaveBudget <= 0:
|
||||||
trace "ihave: ignoring out of budget peer", peer, score = peer.score
|
trace "ihave: ignoring out of budget peer", peer, score = peer.score
|
||||||
else:
|
else:
|
||||||
dec peer.iHaveBudget
|
var deIhaves = ihaves.deduplicate()
|
||||||
for ihave in ihaves:
|
for ihave in deIhaves.mitems:
|
||||||
trace "peer sent ihave",
|
trace "peer sent ihave",
|
||||||
peer, topic = ihave.topicID, msgs = ihave.messageIDs
|
peer, topic = ihave.topicID, msgs = ihave.messageIDs
|
||||||
|
|
||||||
if ihave.topicID in g.mesh:
|
if ihave.topicID in g.mesh:
|
||||||
for m in ihave.messageIDs:
|
for m in ihave.messageIDs:
|
||||||
if m notin g.seen:
|
if m notin g.seen:
|
||||||
|
if peer.iHaveBudget > 0:
|
||||||
result.messageIDs.add(m)
|
result.messageIDs.add(m)
|
||||||
|
dec peer.iHaveBudget
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
|
||||||
|
# shuffling result.messageIDs before sending it out to increase the likelihood
|
||||||
|
# of getting an answer if the peer truncates the list due to internal size restrictions.
|
||||||
|
shuffle(result.messageIDs)
|
||||||
|
|
||||||
proc handleIWant(g: GossipSub,
|
proc handleIWant(g: GossipSub,
|
||||||
peer: PubSubPeer,
|
peer: PubSubPeer,
|
||||||
iwants: seq[ControlIWant]): seq[Message] =
|
iwants: seq[ControlIWant]): seq[Message] =
|
||||||
if peer.score < g.parameters.gossipThreshold:
|
if peer.score < g.parameters.gossipThreshold:
|
||||||
trace "iwant: ignoring low score peer", peer, score = peer.score
|
trace "iwant: ignoring low score peer", peer, score = peer.score
|
||||||
|
elif peer.iWantBudget <= 0:
|
||||||
|
trace "iwant: ignoring out of budget peer", peer, score = peer.score
|
||||||
else:
|
else:
|
||||||
for iwant in iwants:
|
var deIwants = iwants.deduplicate()
|
||||||
|
for iwant in deIwants:
|
||||||
for mid in iwant.messageIDs:
|
for mid in iwant.messageIDs:
|
||||||
trace "peer sent iwant", peer, messageID = mid
|
trace "peer sent iwant", peer, messageID = mid
|
||||||
let msg = g.mcache.get(mid)
|
let msg = g.mcache.get(mid)
|
||||||
|
@ -1010,15 +1050,6 @@ proc handleIWant(g: GossipSub,
|
||||||
else:
|
else:
|
||||||
return
|
return
|
||||||
|
|
||||||
proc punishPeer(g: GossipSub, peer: PubSubPeer, msg: Message) =
|
|
||||||
for t in msg.topicIDs:
|
|
||||||
# ensure we init a new topic if unknown
|
|
||||||
let _ = g.topicParams.mgetOrPut(t, TopicParams.init())
|
|
||||||
# update stats
|
|
||||||
var tstats = g.peerStats[peer].topicInfos.getOrDefault(t)
|
|
||||||
tstats.invalidMessageDeliveries += 1
|
|
||||||
g.peerStats[peer].topicInfos[t] = tstats
|
|
||||||
|
|
||||||
method rpcHandler*(g: GossipSub,
|
method rpcHandler*(g: GossipSub,
|
||||||
peer: PubSubPeer,
|
peer: PubSubPeer,
|
||||||
rpcMsg: RPCMsg) {.async.} =
|
rpcMsg: RPCMsg) {.async.} =
|
||||||
|
@ -1050,13 +1081,13 @@ method rpcHandler*(g: GossipSub,
|
||||||
if (msg.signature.len > 0 or g.verifySignature) and not msg.verify():
|
if (msg.signature.len > 0 or g.verifySignature) and not msg.verify():
|
||||||
# always validate if signature is present or required
|
# always validate if signature is present or required
|
||||||
debug "Dropping message due to failed signature verification", msgId, peer
|
debug "Dropping message due to failed signature verification", msgId, peer
|
||||||
g.punishPeer(peer, msg)
|
g.punishPeer(peer, msg.topicIDs)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if msg.seqno.len > 0 and msg.seqno.len != 8:
|
if msg.seqno.len > 0 and msg.seqno.len != 8:
|
||||||
# if we have seqno should be 8 bytes long
|
# if we have seqno should be 8 bytes long
|
||||||
debug "Dropping message due to invalid seqno length", msgId, peer
|
debug "Dropping message due to invalid seqno length", msgId, peer
|
||||||
g.punishPeer(peer, msg)
|
g.punishPeer(peer, msg.topicIDs)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# g.anonymize needs no evaluation when receiving messages
|
# g.anonymize needs no evaluation when receiving messages
|
||||||
|
@ -1066,7 +1097,7 @@ method rpcHandler*(g: GossipSub,
|
||||||
case validation
|
case validation
|
||||||
of ValidationResult.Reject:
|
of ValidationResult.Reject:
|
||||||
debug "Dropping message due to failed validation", msgId, peer
|
debug "Dropping message due to failed validation", msgId, peer
|
||||||
g.punishPeer(peer, msg)
|
g.punishPeer(peer, msg.topicIDs)
|
||||||
continue
|
continue
|
||||||
of ValidationResult.Ignore:
|
of ValidationResult.Ignore:
|
||||||
debug "Dropping message due to ignored validation", msgId, peer
|
debug "Dropping message due to ignored validation", msgId, peer
|
||||||
|
|
|
@ -225,13 +225,17 @@ method unsubscribe*(p: PubSub,
|
||||||
topics: seq[TopicPair]) {.base, async.} =
|
topics: seq[TopicPair]) {.base, async.} =
|
||||||
## unsubscribe from a list of ``topic`` strings
|
## unsubscribe from a list of ``topic`` strings
|
||||||
for t in topics:
|
for t in topics:
|
||||||
p.topics.withValue(t.topic, topic):
|
let
|
||||||
topic[].handler.keepIf(proc (x: auto): bool = x != t.handler)
|
handler = t.handler
|
||||||
|
ttopic = t.topic
|
||||||
|
closureScope:
|
||||||
|
p.topics.withValue(ttopic, topic):
|
||||||
|
topic[].handler.keepIf(proc (x: auto): bool = x != handler)
|
||||||
|
|
||||||
if topic[].handler.len == 0:
|
if topic[].handler.len == 0:
|
||||||
# make sure we delete the topic if
|
# make sure we delete the topic if
|
||||||
# no more handlers are left
|
# no more handlers are left
|
||||||
p.topics.del(t.topic)
|
p.topics.del(ttopic)
|
||||||
|
|
||||||
libp2p_pubsub_topics.set(p.topics.len.int64)
|
libp2p_pubsub_topics.set(p.topics.len.int64)
|
||||||
|
|
||||||
|
|
|
@ -539,11 +539,11 @@ suite "GossipSub":
|
||||||
for dialer in nodes:
|
for dialer in nodes:
|
||||||
var handler: TopicHandler
|
var handler: TopicHandler
|
||||||
closureScope:
|
closureScope:
|
||||||
var dialerNode = dialer
|
var peerName = $dialer.peerInfo.peerId
|
||||||
handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} =
|
handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} =
|
||||||
if $dialerNode.peerInfo.peerId notin seen:
|
if peerName notin seen:
|
||||||
seen[$dialerNode.peerInfo.peerId] = 0
|
seen[peerName] = 0
|
||||||
seen[$dialerNode.peerInfo.peerId].inc
|
seen[peerName].inc
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
if not seenFut.finished() and seen.len >= runs:
|
if not seenFut.finished() and seen.len >= runs:
|
||||||
seenFut.complete()
|
seenFut.complete()
|
||||||
|
@ -588,17 +588,16 @@ suite "GossipSub":
|
||||||
await allFuturesThrowing(nodes.mapIt(it.start()))
|
await allFuturesThrowing(nodes.mapIt(it.start()))
|
||||||
await subscribeNodes(nodes)
|
await subscribeNodes(nodes)
|
||||||
|
|
||||||
var seen: Table[PeerID, int]
|
var seen: Table[string, int]
|
||||||
var seenFut = newFuture[void]()
|
var seenFut = newFuture[void]()
|
||||||
for dialer in nodes:
|
for dialer in nodes:
|
||||||
var handler: TopicHandler
|
var handler: TopicHandler
|
||||||
closureScope:
|
closureScope:
|
||||||
var dialerNode = dialer
|
var peerName = $dialer.peerInfo.peerId
|
||||||
handler = proc(topic: string, data: seq[byte])
|
handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} =
|
||||||
{.async, gcsafe, closure.} =
|
if peerName notin seen:
|
||||||
if dialerNode.peerInfo.peerId notin seen:
|
seen[peerName] = 0
|
||||||
seen[dialerNode.peerInfo.peerId] = 0
|
seen[peerName].inc
|
||||||
seen[dialerNode.peerInfo.peerId].inc
|
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
if not seenFut.finished() and seen.len >= runs:
|
if not seenFut.finished() and seen.len >= runs:
|
||||||
seenFut.complete()
|
seenFut.complete()
|
||||||
|
|
Loading…
Reference in New Issue