diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 004b38413..6260b3bbf 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -39,10 +39,10 @@ proc addSeen*(f: FloodSub, msgId: MessageID): bool = # Return true if the message has already been seen f.seen.put(f.seenSalt & msgId) -method subscribeTopic*(f: FloodSub, - topic: string, - subscribe: bool, - peer: PubsubPeer) {.gcsafe.} = +proc handleSubscribe*(f: FloodSub, + peer: PubsubPeer, + topic: string, + subscribe: bool) = logScope: peer topic @@ -61,21 +61,16 @@ method subscribeTopic*(f: FloodSub, return if subscribe: - if topic notin f.floodsub: - f.floodsub[topic] = initHashSet[PubSubPeer]() - trace "adding subscription for topic", peer, topic # subscribe the peer to the topic - f.floodsub[topic].incl(peer) + f.floodsub.mgetOrPut(topic, HashSet[PubSubPeer]()).incl(peer) else: - if topic notin f.floodsub: - return + f.floodsub.withValue(topic, peers): + trace "removing subscription for topic", peer, topic - trace "removing subscription for topic", peer, topic - - # unsubscribe the peer from the topic - f.floodsub[topic].excl(peer) + # unsubscribe the peer from the topic + peers[].excl(peer) method unsubscribePeer*(f: FloodSub, peer: PeerID) = ## handle peer disconnects @@ -93,7 +88,9 @@ method unsubscribePeer*(f: FloodSub, peer: PeerID) = method rpcHandler*(f: FloodSub, peer: PubSubPeer, rpcMsg: RPCMsg) {.async.} = - await procCall PubSub(f).rpcHandler(peer, rpcMsg) + for i in 0.. 0: - respControl.iwant.add(iwant) - respControl.prune.add(g.handleGraft(peer, control.graft)) - let messages = g.handleIWant(peer, control.iwant) + var respControl: ControlMessage + let iwant = g.handleIHave(peer, control.ihave) + if iwant.messageIDs.len > 0: + respControl.iwant.add(iwant) + respControl.prune.add(g.handleGraft(peer, control.graft)) + let messages = g.handleIWant(peer, control.iwant) - if - respControl.prune.len > 0 or - respControl.iwant.len > 0 or - messages.len > 0: - # iwant and prunes from here, also messages + if + respControl.prune.len > 0 or + respControl.iwant.len > 0 or + messages.len > 0: + # iwant and prunes from here, also messages - for smsg in messages: - for topic in smsg.topicIDs: - if g.knownTopics.contains(topic): - libp2p_pubsub_broadcast_messages.inc(labelValues = [topic]) - else: - libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"]) - - libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64) - - for prune in respControl.prune: - if g.knownTopics.contains(prune.topicID): - libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID]) + for smsg in messages: + for topic in smsg.topicIDs: + if g.knownTopics.contains(topic): + libp2p_pubsub_broadcast_messages.inc(labelValues = [topic]) else: - libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"]) + libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"]) - trace "sending control message", msg = shortLog(respControl), peer - g.send( - peer, - RPCMsg(control: some(respControl), messages: messages)) + libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64) + + for prune in respControl.prune: + if g.knownTopics.contains(prune.topicID): + libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID]) + else: + libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"]) + + trace "sending control message", msg = shortLog(respControl), peer + g.send( + peer, + RPCMsg(control: some(respControl), messages: messages)) method rpcHandler*(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) {.async.} = - # base will check the amount of subscriptions and process subscriptions - # also will update some metrics - await procCall PubSub(g).rpcHandler(peer, rpcMsg) + for i in 0.. 0: + proc waiter(): Future[void] {.async.} = + # slow path - we have to wait for the handlers to complete + try: + futs = await allFinished(futs) + except CancelledError: + # propagate cancellation + for fut in futs: + if not(fut.finished): + fut.cancel() - # check for errors in futures - for fut in futs: - if fut.failed: - let err = fut.readError() - warn "Error in topic handler", msg = err.msg + # check for errors in futures + for fut in futs: + if fut.failed: + let err = fut.readError() + warn "Error in topic handler", msg = err.msg + return waiter() + + # Fast path - futures finished synchronously or nobody cared about data + var res = newFuture[void]() + res.complete() + return res method handleConn*(p: PubSub, conn: Connection, @@ -381,54 +375,65 @@ method subscribePeer*(p: PubSub, peer: PeerID) {.base.} = proc updateTopicMetrics(p: PubSub, topic: string) = # metrics libp2p_pubsub_topics.set(p.topics.len.int64) + if p.knownTopics.contains(topic): - libp2p_pubsub_topic_handlers.set(p.topics[topic].handler.len.int64, labelValues = [topic]) + p.topics.withValue(topic, handlers) do: + libp2p_pubsub_topic_handlers.set(handlers[].len.int64, labelValues = [topic]) + do: + libp2p_pubsub_topic_handlers.set(0, labelValues = [topic]) else: - libp2p_pubsub_topic_handlers.set(0, labelValues = ["other"]) + var others: int64 = 0 for key, val in p.topics: - if not p.knownTopics.contains(key): - libp2p_pubsub_topic_handlers.inc(val.handler.len.int64, labelValues = ["other"]) + if key notin p.knownTopics: others += 1 -method unsubscribe*(p: PubSub, - topics: seq[TopicPair]) {.base.} = - ## unsubscribe from a list of ``topic`` strings - for t in topics: - let - handler = t.handler - ttopic = t.topic - closureScope: - p.topics.withValue(ttopic, topic): - topic[].handler.keepIf(proc (x: auto): bool = x != handler) + libp2p_pubsub_topic_handlers.set(others, labelValues = ["other"]) - if topic[].handler.len == 0: - # make sure we delete the topic if - # no more handlers are left - p.topics.del(ttopic) +method onTopicSubscription*(p: PubSub, topic: string, subscribed: bool) {.base.} = + # Called when subscribe is called the first time for a topic or unsubscribe + # removes the last handler - p.updateTopicMetrics(ttopic) + # Notify others that we are no longer interested in the topic + for _, peer in p.peers: + p.sendSubs(peer, [topic], subscribed) - libp2p_pubsub_unsubscriptions.inc() + if subscribed: + libp2p_pubsub_subscriptions.inc() + else: + libp2p_pubsub_unsubscriptions.inc() proc unsubscribe*(p: PubSub, topic: string, handler: TopicHandler) = ## unsubscribe from a ``topic`` string ## - p.unsubscribe(@[(topic, handler)]) + p.topics.withValue(topic, handlers): + handlers[].keepItIf(it != handler) -method unsubscribeAll*(p: PubSub, topic: string) {.base.} = + if handlers[].len() == 0: + p.topics.del(topic) + + p.onTopicSubscription(topic, false) + + p.updateTopicMetrics(topic) + +proc unsubscribe*(p: PubSub, topics: openArray[TopicPair]) = + ## unsubscribe from a list of ``topic`` handlers + for t in topics: + p.unsubscribe(t.topic, t.handler) + +proc unsubscribeAll*(p: PubSub, topic: string) = if topic notin p.topics: debug "unsubscribeAll called for an unknown topic", topic else: p.topics.del(topic) + p.onTopicSubscription(topic, false) + p.updateTopicMetrics(topic) - libp2p_pubsub_unsubscriptions.inc() - -method subscribe*(p: PubSub, - topic: string, - handler: TopicHandler) {.base.} = +proc subscribe*(p: PubSub, + topic: string, + handler: TopicHandler) = ## subscribe to a topic ## ## ``topic`` - a string topic to subscribe to @@ -437,19 +442,19 @@ method subscribe*(p: PubSub, ## that will be triggered ## on every received message ## - if topic notin p.topics: + + p.topics.withValue(topic, handlers) do: + # Already subscribed, just adding another handler + handlers[].add(handler) + do: trace "subscribing to topic", name = topic - p.topics[topic] = Topic(name: topic) + p.topics[topic] = @[handler] - p.topics[topic].handler.add(handler) - - for _, peer in p.peers: - p.sendSubs(peer, @[topic], true) + # Notify on first handler + p.onTopicSubscription(topic, true) p.updateTopicMetrics(topic) - libp2p_pubsub_subscriptions.inc() - method publish*(p: PubSub, topic: string, data: seq[byte]): Future[int] {.base, async.} = @@ -481,18 +486,17 @@ method addValidator*(p: PubSub, topic: varargs[string], hook: ValidatorHandler) {.base.} = for t in topic: - if t notin p.validators: - p.validators[t] = initHashSet[ValidatorHandler]() - trace "adding validator for topic", topicId = t - p.validators[t].incl(hook) + p.validators.mgetOrPut(t, HashSet[ValidatorHandler]()).incl(hook) method removeValidator*(p: PubSub, topic: varargs[string], hook: ValidatorHandler) {.base.} = for t in topic: - if t in p.validators: - p.validators[t].excl(hook) + p.validators.withValue(t, validators): + validators[].excl(hook) + if validators[].len() == 0: + p.validators.del(t) method validate*(p: PubSub, message: Message): Future[ValidationResult] {.async, base.} = var pending: seq[Future[ValidationResult]] diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 66ebe5c4a..a8ed07aad 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -116,12 +116,14 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = while not conn.atEof: trace "waiting for data", conn, peer = p, closed = conn.closed - let data = await conn.readLp(64 * 1024) + var data = await conn.readLp(64 * 1024) trace "read data from peer", conn, peer = p, closed = conn.closed, data = data.shortLog var rmsg = decodeRpcMsg(data) + data = newSeq[byte]() # Release memory + if rmsg.isErr(): notice "failed to decode msg from peer", conn, peer = p, closed = conn.closed, @@ -204,20 +206,25 @@ proc connectImpl(p: PubSubPeer) {.async.} = proc connect*(p: PubSubPeer) = asyncSpawn connectImpl(p) -proc sendImpl(conn: Connection, encoded: seq[byte]) {.async.} = - try: - trace "sending encoded msgs to peer", conn, encoded = shortLog(encoded) - await conn.writeLp(encoded) - trace "sent pubsub message to remote", conn +proc sendImpl(conn: Connection, encoded: seq[byte]): Future[void] {.raises: [Defect].} = + trace "sending encoded msgs to peer", conn, encoded = shortLog(encoded) - except CatchableError as exc: # never cancelled - # Because we detach the send call from the currently executing task using - # asyncSpawn, no exceptions may leak out of it - trace "Unable to send to remote", conn, msg = exc.msg - # Next time sendConn is used, it will be have its close flag set and thus - # will be recycled + let fut = conn.writeLp(encoded) # Avoid copying `encoded` into future + proc sendWaiter(): Future[void] {.async.} = + try: + await fut + trace "sent pubsub message to remote", conn - await conn.close() # This will clean up the send connection + except CatchableError as exc: # never cancelled + # Because we detach the send call from the currently executing task using + # asyncSpawn, no exceptions may leak out of it + trace "Unable to send to remote", conn, msg = exc.msg + # Next time sendConn is used, it will be have its close flag set and thus + # will be recycled + + await conn.close() # This will clean up the send connection + + return sendWaiter() template sendMetrics(msg: RPCMsg): untyped = when defined(libp2p_expensive_metrics): @@ -240,10 +247,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect].} = # To limit the size of the closure, we only pass the encoded message and # connection to the spawned send task - asyncSpawn(try: - sendImpl(conn, msg) - except Exception as exc: # TODO chronos Exception - raiseAssert exc.msg) + asyncSpawn sendImpl(conn, msg) proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} = trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg) diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index 923e052d9..6bf0f856c 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -236,7 +236,7 @@ proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe.} = await s.readExactly(addr res[0], res.len) return res -method write*(s: LPStream, msg: seq[byte]): Future[void] {.base.} = +method write*(s: LPStream, msg: seq[byte]): Future[void] {.base, raises: [Defect].} = doAssert(false, "not implemented!") proc writeLp*(s: LPStream, msg: openArray[byte]): Future[void] = diff --git a/tests/helpers.nim b/tests/helpers.nim index 29096ed27..390070439 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -87,7 +87,7 @@ template rng*(): ref BrHmacDrbgContext = getRng() type - WriteHandler* = proc(data: seq[byte]): Future[void] {.gcsafe.} + WriteHandler* = proc(data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].} TestBufferStream* = ref object of BufferStream writeHandler*: WriteHandler