gossipsub: unsubscribe fixes (#569)

* gossipsub: unsubscribe fixes

* fix KeyError when updating metric of unsubscribed topic
* fix unsubscribe message not being sent to all peers causing them to
keep thinking we're still subscribed
* release memory earlier in a few places

* floodsub fix
Jacek Sieka 2021-05-07 00:43:45 +02:00 committed by GitHub
parent 9f301964ed
commit 83a20a992a
6 changed files with 221 additions and 253 deletions
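
The common thread in the table-related fixes is replacing direct Table[key] indexing, which raises KeyError for missing keys, with mgetOrPut/withValue access, so touching an unknown or already-unsubscribed topic becomes a no-op. A minimal standalone Nim sketch of that pattern (illustrative only; the names below are made up and are not code from this commit):

import std/[sets, tables]

var subs: Table[string, HashSet[string]]

proc handleSubscribe(topic, peer: string, subscribe: bool) =
  if subscribe:
    # mgetOrPut creates the entry on first use instead of raising KeyError
    subs.mgetOrPut(topic, HashSet[string]()).incl(peer)
  else:
    # withValue runs its body only when the key exists, so unknown topics are ignored
    subs.withValue(topic, peers):
      peers[].excl(peer)

handleSubscribe("blocks", "peerA", true)
handleSubscribe("never-subscribed", "peerA", false) # no KeyError here
echo subs

The diff below applies the same idea to the floodsub/gossipsub peer tables, the validator table, and the per-topic handler metrics.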

View File

@@ -39,10 +39,10 @@ proc addSeen*(f: FloodSub, msgId: MessageID): bool =
   # Return true if the message has already been seen
   f.seen.put(f.seenSalt & msgId)
 
-method subscribeTopic*(f: FloodSub,
-                       topic: string,
-                       subscribe: bool,
-                       peer: PubsubPeer) {.gcsafe.} =
+proc handleSubscribe*(f: FloodSub,
+                      peer: PubsubPeer,
+                      topic: string,
+                      subscribe: bool) =
   logScope:
     peer
     topic

@@ -61,21 +61,16 @@ method subscribeTopic*(f: FloodSub,
     return
 
   if subscribe:
-    if topic notin f.floodsub:
-      f.floodsub[topic] = initHashSet[PubSubPeer]()
-
     trace "adding subscription for topic", peer, topic
 
     # subscribe the peer to the topic
-    f.floodsub[topic].incl(peer)
+    f.floodsub.mgetOrPut(topic, HashSet[PubSubPeer]()).incl(peer)
   else:
-    if topic notin f.floodsub:
-      return
-
-    trace "removing subscription for topic", peer, topic
-
-    # unsubscribe the peer from the topic
-    f.floodsub[topic].excl(peer)
+    f.floodsub.withValue(topic, peers):
+      trace "removing subscription for topic", peer, topic
+
+      # unsubscribe the peer from the topic
+      peers[].excl(peer)
 
 method unsubscribePeer*(f: FloodSub, peer: PeerID) =
   ## handle peer disconnects

@@ -93,7 +88,9 @@ method unsubscribePeer*(f: FloodSub, peer: PeerID) =
 method rpcHandler*(f: FloodSub,
                    peer: PubSubPeer,
                    rpcMsg: RPCMsg) {.async.} =
-  await procCall PubSub(f).rpcHandler(peer, rpcMsg)
+  for i in 0..<min(f.topicsHigh, rpcMsg.subscriptions.len):
+    template sub: untyped = rpcMsg.subscriptions[i]
+    f.handleSubscribe(peer, sub.topic, sub.subscribe)
 
   for msg in rpcMsg.messages:                       # for every message
     let msgId = f.msgIdProvider(msg)

@@ -139,6 +136,8 @@ method rpcHandler*(f: FloodSub,
     f.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
     trace "Forwared message to peers", peers = toSendPeers.len
 
+  f.updateMetrics(rpcMsg)
+
 method init*(f: FloodSub) =
   proc handler(conn: Connection, proto: string) {.async.} =
     ## main protocol handler that gets triggered on every

@@ -202,19 +201,6 @@ method publish*(f: FloodSub,
 
   return peers.len
 
-method unsubscribe*(f: FloodSub,
-                    topics: seq[TopicPair]) =
-  procCall PubSub(f).unsubscribe(topics)
-
-  for p in f.peers.values:
-    f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false)
-
-method unsubscribeAll*(f: FloodSub, topic: string) =
-  procCall PubSub(f).unsubscribeAll(topic)
-
-  for p in f.peers.values:
-    f.sendSubs(p, @[topic], false)
-
 method initPubSub*(f: FloodSub) =
   procCall PubSub(f).initPubSub()
   f.seen = TimedCache[MessageID].init(2.minutes)

View File

@@ -200,29 +200,29 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
   procCall FloodSub(g).unsubscribePeer(peer)
 
-method subscribeTopic*(g: GossipSub,
-                       topic: string,
-                       subscribe: bool,
-                       peer: PubSubPeer) {.gcsafe.} =
+proc handleSubscribe*(g: GossipSub,
+                      peer: PubSubPeer,
+                      topic: string,
+                      subscribe: bool) =
   logScope:
     peer
     topic
 
-  # this is a workaround for a race condition
-  # that can happen if we disconnect the peer very early
-  # in the future we might use this as a test case
-  # and eventually remove this workaround
-  if subscribe and peer.peerId notin g.peers:
-    trace "ignoring unknown peer"
-    return
-
-  if subscribe and not(isNil(g.subscriptionValidator)) and not(g.subscriptionValidator(topic)):
-    # this is a violation, so warn should be in order
-    trace "ignoring invalid topic subscription", topic, peer
-    libp2p_gossipsub_invalid_topic_subscription.inc()
-    return
-
   if subscribe:
+    # this is a workaround for a race condition
+    # that can happen if we disconnect the peer very early
+    # in the future we might use this as a test case
+    # and eventually remove this workaround
+    if peer.peerId notin g.peers:
+      trace "ignoring unknown peer"
+      return
+
+    if not(isNil(g.subscriptionValidator)) and not(g.subscriptionValidator(topic)):
+      # this is a violation, so warn should be in order
+      trace "ignoring invalid topic subscription", topic, peer
+      libp2p_gossipsub_invalid_topic_subscription.inc()
+      return
+
     trace "peer subscribed to topic"
 
     # subscribe remote peer to the topic
@@ -241,50 +241,48 @@ method subscribeTopic*(g: GossipSub,
     trace "gossip peers", peers = g.gossipsub.peers(topic), topic
 
-proc handleControl(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) =
-  if rpcMsg.control.isSome:
-    let control = rpcMsg.control.get()
-    g.handlePrune(peer, control.prune)
+proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
+  g.handlePrune(peer, control.prune)
 
   var respControl: ControlMessage
   let iwant = g.handleIHave(peer, control.ihave)
   if iwant.messageIDs.len > 0:
     respControl.iwant.add(iwant)
   respControl.prune.add(g.handleGraft(peer, control.graft))
   let messages = g.handleIWant(peer, control.iwant)
 
   if
     respControl.prune.len > 0 or
     respControl.iwant.len > 0 or
     messages.len > 0:
     # iwant and prunes from here, also messages
 
     for smsg in messages:
       for topic in smsg.topicIDs:
         if g.knownTopics.contains(topic):
           libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
         else:
           libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])
 
     libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
 
     for prune in respControl.prune:
       if g.knownTopics.contains(prune.topicID):
         libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID])
       else:
         libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
 
     trace "sending control message", msg = shortLog(respControl), peer
     g.send(
       peer,
       RPCMsg(control: some(respControl), messages: messages))
 
 method rpcHandler*(g: GossipSub,
                    peer: PubSubPeer,
                    rpcMsg: RPCMsg) {.async.} =
-  # base will check the amount of subscriptions and process subscriptions
-  # also will update some metrics
-  await procCall PubSub(g).rpcHandler(peer, rpcMsg)
+  for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
+    template sub: untyped = rpcMsg.subscriptions[i]
+    g.handleSubscribe(peer, sub.topic, sub.subscribe)
 
   # the above call applied limtis to subs number
   # in gossipsub we want to apply scoring as well
@@ -294,7 +292,8 @@ method rpcHandler*(g: GossipSub,
             limit = g.topicsHigh
     peer.behaviourPenalty += 0.1
 
-  for msg in rpcMsg.messages:                       # for every message
+  for i in 0..<rpcMsg.messages.len():               # for every message
+    template msg: untyped = rpcMsg.messages[i]
     let msgId = g.msgIdProvider(msg)
 
     # avoid the remote peer from controlling the seen table hashing
@ -371,66 +370,41 @@ method rpcHandler*(g: GossipSub,
else: else:
libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"]) libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"])
g.handleControl(peer, rpcMsg) if rpcMsg.control.isSome():
g.handleControl(peer, rpcMsg.control.unsafeGet())
method subscribe*(g: GossipSub, g.updateMetrics(rpcMsg)
topic: string,
handler: TopicHandler) =
procCall PubSub(g).subscribe(topic, handler)
# if we have a fanout on this topic break it method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) =
if topic in g.fanout: if subscribed:
g.fanout.del(topic) procCall PubSub(g).onTopicSubscription(topic, subscribed)
# rebalance but don't update metrics here, we do that only in the heartbeat # if we have a fanout on this topic break it
g.rebalanceMesh(topic, metrics = nil) if topic in g.fanout:
g.fanout.del(topic)
proc unsubscribe*(g: GossipSub, topic: string) = # rebalance but don't update metrics here, we do that only in the heartbeat
var g.rebalanceMesh(topic, metrics = nil)
msg = RPCMsg.withSubs(@[topic], subscribe = false) else:
gpeers = g.gossipsub.getOrDefault(topic)
if topic in g.mesh:
let mpeers = g.mesh.getOrDefault(topic) let mpeers = g.mesh.getOrDefault(topic)
# remove mesh peers from gpeers, we send 2 different messages # Remove peers from the mesh since we're no longer both interested
gpeers = gpeers - mpeers # in the topic
# send to peers NOT in mesh first let msg = RPCMsg(control: some(ControlMessage(
g.broadcast(gpeers, msg) prune: @[ControlPrune(
topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
g.broadcast(mpeers, msg)
for peer in mpeers: for peer in mpeers:
trace "pruning unsubscribeAll call peer", peer, score = peer.score
g.pruned(peer, topic) g.pruned(peer, topic)
g.mesh.del(topic) g.mesh.del(topic)
msg.control =
some(ControlMessage(prune:
@[ControlPrune(topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.pruneBackoff.seconds.uint64)]))
# send to peers IN mesh now # Send unsubscribe (in reverse order to sub/graft)
g.broadcast(mpeers, msg) procCall PubSub(g).onTopicSubscription(topic, subscribed)
else:
g.broadcast(gpeers, msg)
g.topicParams.del(topic)
method unsubscribeAll*(g: GossipSub, topic: string) =
g.unsubscribe(topic)
# finally let's remove from g.topics, do that by calling PubSub
procCall PubSub(g).unsubscribeAll(topic)
method unsubscribe*(g: GossipSub,
topics: seq[TopicPair]) =
procCall PubSub(g).unsubscribe(topics)
for (topic, handler) in topics:
# delete from mesh only if no handlers are left
# (handlers are removed in pubsub unsubscribe above)
if topic notin g.topics:
g.unsubscribe(topic)
method publish*(g: GossipSub, method publish*(g: GossipSub,
topic: string, topic: string,

View File

@@ -69,13 +69,13 @@ declarePublicCounter(libp2p_pubsub_received_prune, "pubsub broadcast prune", lab
 
 type
   TopicHandler* = proc(topic: string,
-                       data: seq[byte]): Future[void] {.gcsafe.}
+                       data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].}
 
   ValidationResult* {.pure.} = enum
     Accept, Reject, Ignore
 
   ValidatorHandler* = proc(topic: string,
-                           message: Message): Future[ValidationResult] {.gcsafe.}
+                           message: Message): Future[ValidationResult] {.gcsafe, raises: [Defect].}
 
   TopicPair* = tuple[topic: string, handler: TopicHandler]

@@ -85,15 +85,10 @@ type
   SubscriptionValidator* =
     proc(topic: string): bool {.raises: [Defect], gcsafe.}
 
-  Topic* = object
-    # make this a variant type if one day we have different Params structs
-    name*: string
-    handler*: seq[TopicHandler]
-
   PubSub* = ref object of LPProtocol
     switch*: Switch                    # the switch used to dial/connect to peers
     peerInfo*: PeerInfo                # this peer's info
-    topics*: Table[string, Topic]      # local topics
+    topics*: Table[string, seq[TopicHandler]] # the topics that _we_ are interested in
     peers*: Table[PeerID, PubSubPeer]  ##\
       ## Peers that we are interested to gossip with (but not necessarily
       ## yet connected to)

@@ -106,7 +101,7 @@ type
     msgSeqno*: uint64
     anonymize*: bool                   # if we omit fromPeer and seqno from RPC messages we send
     subscriptionValidator*: SubscriptionValidator # callback used to validate subscriptions
-    topicsHigh*: int                   # the maximum number of topics we allow in a subscription message (application specific, defaults to int max)
+    topicsHigh*: int                   # the maximum number of topics a peer is allowed to subscribe to
 
     knownTopics*: HashSet[string]
@@ -186,10 +181,11 @@ proc broadcast*(
 
 proc sendSubs*(p: PubSub,
                peer: PubSubPeer,
-               topics: seq[string],
+               topics: openArray[string],
                subscribe: bool) =
   ## send subscriptions to remote peer
   p.send(peer, RPCMsg.withSubs(topics, subscribe))
+
   for topic in topics:
     if subscribe:
       if p.knownTopics.contains(topic):

@@ -202,26 +198,9 @@ proc sendSubs*(p: PubSub,
     else:
       libp2p_pubsub_broadcast_unsubscriptions.inc(labelValues = ["generic"])
 
-method subscribeTopic*(p: PubSub,
-                       topic: string,
-                       subscribe: bool,
-                       peer: PubSubPeer) {.base.} =
-  # both gossipsub and floodsub diverge, and this super call is not necessary right now
-  # if necessary remove the assertion
-  doAssert(false, "unexpected call to pubsub.subscribeTopic")
-
-method rpcHandler*(p: PubSub,
-                   peer: PubSubPeer,
-                   rpcMsg: RPCMsg): Future[void] {.base.} =
-  ## handle rpc messages
-  trace "processing RPC message", msg = rpcMsg.shortLog, peer
+proc updateMetrics*(p: PubSub, rpcMsg: RPCMsg) =
   for i in 0..<min(rpcMsg.subscriptions.len, p.topicsHigh):
-    let s = rpcMsg.subscriptions[i]
-    trace "about to subscribe to topic", topicId = s.topic, peer, subscribe = s.subscribe
-    p.subscribeTopic(s.topic, s.subscribe, peer)
-
-  for i in 0..<min(rpcMsg.subscriptions.len, p.topicsHigh):
-    let sub = rpcMsg.subscriptions[i]
+    template sub(): untyped = rpcMsg.subscriptions[i]
     if sub.subscribe:
       if p.knownTopics.contains(sub.topic):
         libp2p_pubsub_received_subscriptions.inc(labelValues = [sub.topic])
@@ -233,8 +212,10 @@ method rpcHandler*(p: PubSub,
       else:
         libp2p_pubsub_received_unsubscriptions.inc(labelValues = ["generic"])
 
-  for smsg in rpcMsg.messages:
-    for topic in smsg.topicIDs:
+  for i in 0..<rpcMsg.messages.len():
+    template smsg: untyped = rpcMsg.messages[i]
+    for j in 0..<smsg.topicIDs.len():
+      template topic: untyped = smsg.topicIDs[j]
       if p.knownTopics.contains(topic):
         libp2p_pubsub_received_messages.inc(labelValues = [topic])
       else:

@@ -242,8 +223,7 @@ method rpcHandler*(p: PubSub,
   if rpcMsg.control.isSome():
     libp2p_pubsub_received_iwant.inc(rpcMsg.control.get().iwant.len.int64)
-
-    let control = rpcMsg.control.get()
+    template control: untyped = rpcMsg.control.unsafeGet()
     for ihave in control.ihave:
       if p.knownTopics.contains(ihave.topicID):
         libp2p_pubsub_received_ihave.inc(labelValues = [ihave.topicID])

@@ -260,11 +240,11 @@ method rpcHandler*(p: PubSub,
       else:
         libp2p_pubsub_received_prune.inc(labelValues = ["generic"])
 
-  # Avoid async transformation to avoid copying of rpcMsg into closure - this
-  # is an unnecessary hotspot in gossip
-  var res = newFuture[void]("PubSub.rpcHandler")
-  res.complete()
-  return res
+method rpcHandler*(p: PubSub,
+                   peer: PubSubPeer,
+                   rpcMsg: RPCMsg): Future[void] {.base.} =
+  ## Handler that must be overridden by concrete implementation
+  raiseAssert "Unimplemented"
 
 method onNewPeer(p: PubSub, peer: PubSubPeer) {.base.} = discard
@@ -278,14 +258,14 @@ method onPubSubPeerEvent*(p: PubSub, peer: PubsubPeer, event: PubsubPeerEvent) {
   discard
 
 proc getOrCreatePeer*(
   p: PubSub,
-  peer: PeerID,
+  peerId: PeerID,
   protos: seq[string]): PubSubPeer =
-  if peer in p.peers:
-    return p.peers[peer]
+  p.peers.withValue(peerId, peer):
+    return peer[]
 
   proc getConn(): Future[Connection] =
-    p.switch.dial(peer, protos)
+    p.switch.dial(peerId, protos)
 
   proc dropConn(peer: PubSubPeer) =
     proc dropConnAsync(peer: PubsubPeer) {.async.} =

@@ -299,10 +279,10 @@ proc getOrCreatePeer*(
     p.onPubSubPeerEvent(peer, event)
 
   # create new pubsub peer
-  let pubSubPeer = newPubSubPeer(peer, getConn, dropConn, onEvent, protos[0])
-  debug "created new pubsub peer", peer
-  p.peers[peer] = pubSubPeer
+  let pubSubPeer = newPubSubPeer(peerId, getConn, dropConn, onEvent, protos[0])
+  debug "created new pubsub peer", peerId
+  p.peers[peerId] = pubSubPeer
   pubSubPeer.observers = p.observers
 
   onNewPeer(p, pubSubPeer)
@@ -314,26 +294,40 @@ proc getOrCreatePeer*(
 
   return pubSubPeer
 
-proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] {.async.} =
-  if topic notin p.topics: return # Not subscribed
-
-  # gather all futures without yielding to scheduler
-  var futs = p.topics[topic].handler.mapIt(it(topic, data))
-  if futs.len() == 0: return # No handlers
-
-  try:
-    futs = await allFinished(futs)
-  except CancelledError:
-    # propagate cancellation
-    for fut in futs:
-      if not(fut.finished):
-        fut.cancel()
-
-  # check for errors in futures
-  for fut in futs:
-    if fut.failed:
-      let err = fut.readError()
-      warn "Error in topic handler", msg = err.msg
+proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] =
+  # Start work on all data handlers without copying data into closure like
+  # happens on {.async.} transformation
+  p.topics.withValue(topic, handlers) do:
+    var futs = newSeq[Future[void]]()
+    for handler in handlers[]:
+      if handler != nil: # allow nil handlers
+        let fut = handler(topic, data)
+        if not fut.completed(): # Fast path for successful sync handlers
+          futs.add(fut)
+
+    if futs.len() > 0:
+      proc waiter(): Future[void] {.async.} =
+        # slow path - we have to wait for the handlers to complete
+        try:
+          futs = await allFinished(futs)
+        except CancelledError:
+          # propagate cancellation
+          for fut in futs:
+            if not(fut.finished):
+              fut.cancel()
+
+        # check for errors in futures
+        for fut in futs:
+          if fut.failed:
+            let err = fut.readError()
+            warn "Error in topic handler", msg = err.msg
+
+      return waiter()
+
+  # Fast path - futures finished synchronously or nobody cared about data
+  var res = newFuture[void]()
+  res.complete()
+  return res
 
 method handleConn*(p: PubSub,
                    conn: Connection,
@@ -381,54 +375,65 @@ method subscribePeer*(p: PubSub, peer: PeerID) {.base.} =
 
 proc updateTopicMetrics(p: PubSub, topic: string) =
   # metrics
   libp2p_pubsub_topics.set(p.topics.len.int64)
 
   if p.knownTopics.contains(topic):
-    libp2p_pubsub_topic_handlers.set(p.topics[topic].handler.len.int64, labelValues = [topic])
+    p.topics.withValue(topic, handlers) do:
+      libp2p_pubsub_topic_handlers.set(handlers[].len.int64, labelValues = [topic])
+    do:
+      libp2p_pubsub_topic_handlers.set(0, labelValues = [topic])
   else:
-    libp2p_pubsub_topic_handlers.set(0, labelValues = ["other"])
+    var others: int64 = 0
     for key, val in p.topics:
-      if not p.knownTopics.contains(key):
-        libp2p_pubsub_topic_handlers.inc(val.handler.len.int64, labelValues = ["other"])
+      if key notin p.knownTopics: others += 1
+
+    libp2p_pubsub_topic_handlers.set(others, labelValues = ["other"])
 
-method unsubscribe*(p: PubSub,
-                    topics: seq[TopicPair]) {.base.} =
-  ## unsubscribe from a list of ``topic`` strings
-  for t in topics:
-    let
-      handler = t.handler
-      ttopic = t.topic
-    closureScope:
-      p.topics.withValue(ttopic, topic):
-        topic[].handler.keepIf(proc (x: auto): bool = x != handler)
-
-        if topic[].handler.len == 0:
-          # make sure we delete the topic if
-          # no more handlers are left
-          p.topics.del(ttopic)
-
-          p.updateTopicMetrics(ttopic)
-
-          libp2p_pubsub_unsubscriptions.inc()
+method onTopicSubscription*(p: PubSub, topic: string, subscribed: bool) {.base.} =
+  # Called when subscribe is called the first time for a topic or unsubscribe
+  # removes the last handler
+
+  # Notify others that we are no longer interested in the topic
+  for _, peer in p.peers:
+    p.sendSubs(peer, [topic], subscribed)
+
+  if subscribed:
+    libp2p_pubsub_subscriptions.inc()
+  else:
+    libp2p_pubsub_unsubscriptions.inc()
 
 proc unsubscribe*(p: PubSub,
                   topic: string,
                   handler: TopicHandler) =
   ## unsubscribe from a ``topic`` string
   ##
-  p.unsubscribe(@[(topic, handler)])
+  p.topics.withValue(topic, handlers):
+    handlers[].keepItIf(it != handler)
 
-method unsubscribeAll*(p: PubSub, topic: string) {.base.} =
+    if handlers[].len() == 0:
+      p.topics.del(topic)
+      p.onTopicSubscription(topic, false)
+
+  p.updateTopicMetrics(topic)
+
+proc unsubscribe*(p: PubSub, topics: openArray[TopicPair]) =
+  ## unsubscribe from a list of ``topic`` handlers
+  for t in topics:
+    p.unsubscribe(t.topic, t.handler)
+
+proc unsubscribeAll*(p: PubSub, topic: string) =
   if topic notin p.topics:
     debug "unsubscribeAll called for an unknown topic", topic
   else:
     p.topics.del(topic)
+    p.onTopicSubscription(topic, false)
 
-    p.updateTopicMetrics(topic)
+  p.updateTopicMetrics(topic)
 
-    libp2p_pubsub_unsubscriptions.inc()
-
-method subscribe*(p: PubSub,
-                  topic: string,
-                  handler: TopicHandler) {.base.} =
+proc subscribe*(p: PubSub,
+                topic: string,
+                handler: TopicHandler) =
   ## subscribe to a topic
   ##
   ## ``topic`` - a string topic to subscribe to
@@ -437,19 +442,19 @@ method subscribe*(p: PubSub,
   ##               that will be triggered
   ##               on every received message
   ##
-  if topic notin p.topics:
+  p.topics.withValue(topic, handlers) do:
+    # Already subscribed, just adding another handler
+    handlers[].add(handler)
+  do:
     trace "subscribing to topic", name = topic
-    p.topics[topic] = Topic(name: topic)
-
-  p.topics[topic].handler.add(handler)
+    p.topics[topic] = @[handler]
 
-  for _, peer in p.peers:
-    p.sendSubs(peer, @[topic], true)
+    # Notify on first handler
+    p.onTopicSubscription(topic, true)
 
   p.updateTopicMetrics(topic)
 
-  libp2p_pubsub_subscriptions.inc()
-
 method publish*(p: PubSub,
                 topic: string,
                 data: seq[byte]): Future[int] {.base, async.} =
@@ -481,18 +486,17 @@ method addValidator*(p: PubSub,
                      topic: varargs[string],
                      hook: ValidatorHandler) {.base.} =
   for t in topic:
-    if t notin p.validators:
-      p.validators[t] = initHashSet[ValidatorHandler]()
-
     trace "adding validator for topic", topicId = t
-    p.validators[t].incl(hook)
+    p.validators.mgetOrPut(t, HashSet[ValidatorHandler]()).incl(hook)
 
 method removeValidator*(p: PubSub,
                         topic: varargs[string],
                         hook: ValidatorHandler) {.base.} =
   for t in topic:
-    if t in p.validators:
-      p.validators[t].excl(hook)
+    p.validators.withValue(t, validators):
+      validators[].excl(hook)
+      if validators[].len() == 0:
+        p.validators.del(t)
 
 method validate*(p: PubSub, message: Message): Future[ValidationResult] {.async, base.} =
   var pending: seq[Future[ValidationResult]]

View File

@@ -116,12 +116,14 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
       while not conn.atEof:
         trace "waiting for data", conn, peer = p, closed = conn.closed
 
-        let data = await conn.readLp(64 * 1024)
+        var data = await conn.readLp(64 * 1024)
         trace "read data from peer",
               conn, peer = p, closed = conn.closed,
               data = data.shortLog
 
         var rmsg = decodeRpcMsg(data)
+        data = newSeq[byte]() # Release memory
         if rmsg.isErr():
           notice "failed to decode msg from peer",
                  conn, peer = p, closed = conn.closed,
@@ -204,20 +206,25 @@ proc connectImpl(p: PubSubPeer) {.async.} =
 proc connect*(p: PubSubPeer) =
   asyncSpawn connectImpl(p)
 
-proc sendImpl(conn: Connection, encoded: seq[byte]) {.async.} =
-  try:
-    trace "sending encoded msgs to peer", conn, encoded = shortLog(encoded)
-    await conn.writeLp(encoded)
-    trace "sent pubsub message to remote", conn
-
-  except CatchableError as exc: # never cancelled
-    # Because we detach the send call from the currently executing task using
-    # asyncSpawn, no exceptions may leak out of it
-    trace "Unable to send to remote", conn, msg = exc.msg
-    # Next time sendConn is used, it will be have its close flag set and thus
-    # will be recycled
-    await conn.close() # This will clean up the send connection
+proc sendImpl(conn: Connection, encoded: seq[byte]): Future[void] {.raises: [Defect].} =
+  trace "sending encoded msgs to peer", conn, encoded = shortLog(encoded)
+
+  let fut = conn.writeLp(encoded) # Avoid copying `encoded` into future
+  proc sendWaiter(): Future[void] {.async.} =
+    try:
+      await fut
+      trace "sent pubsub message to remote", conn
+
+    except CatchableError as exc: # never cancelled
+      # Because we detach the send call from the currently executing task using
+      # asyncSpawn, no exceptions may leak out of it
+      trace "Unable to send to remote", conn, msg = exc.msg
+      # Next time sendConn is used, it will be have its close flag set and thus
+      # will be recycled
+      await conn.close() # This will clean up the send connection
+
+  return sendWaiter()
 
 template sendMetrics(msg: RPCMsg): untyped =
   when defined(libp2p_expensive_metrics):
@@ -240,10 +247,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect].} =
   # To limit the size of the closure, we only pass the encoded message and
   # connection to the spawned send task
-  asyncSpawn(try:
-    sendImpl(conn, msg)
-  except Exception as exc: # TODO chronos Exception
-    raiseAssert exc.msg)
+  asyncSpawn sendImpl(conn, msg)
 
 proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} =
   trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)

View File

@@ -236,7 +236,7 @@ proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe.} =
   await s.readExactly(addr res[0], res.len)
   return res
 
-method write*(s: LPStream, msg: seq[byte]): Future[void] {.base.} =
+method write*(s: LPStream, msg: seq[byte]): Future[void] {.base, raises: [Defect].} =
   doAssert(false, "not implemented!")
 
 proc writeLp*(s: LPStream, msg: openArray[byte]): Future[void] =

View File

@@ -87,7 +87,7 @@ template rng*(): ref BrHmacDrbgContext =
   getRng()
 
 type
-  WriteHandler* = proc(data: seq[byte]): Future[void] {.gcsafe.}
+  WriteHandler* = proc(data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].}
 
   TestBufferStream* = ref object of BufferStream
     writeHandler*: WriteHandler