From e0cc40c1f7bf3f29b236c4175f8f291be7e9cf9d Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Mon, 4 Mar 2024 09:39:43 +0100 Subject: [PATCH] pubsub, gossipsub, floodsub: undo changes that printed msg_id more intensively --- libp2p/protocols/pubsub/floodsub.nim | 26 ++++++++--------- libp2p/protocols/pubsub/gossipsub.nim | 41 ++++++++++++++------------- libp2p/protocols/pubsub/pubsub.nim | 11 ++++--- 3 files changed, 39 insertions(+), 39 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 914428bc0..819161c0c 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -183,20 +183,8 @@ method init*(f: FloodSub) = method publish*(f: FloodSub, topic: string, data: seq[byte]): Future[int] {.async.} = - let - msg = - if f.anonymize: - Message.init(none(PeerInfo), data, topic, none(uint64), false) - else: - inc f.msgSeqno - Message.init(some(f.peerInfo), data, topic, some(f.msgSeqno), f.sign) - msgId = f.msgIdProvider(msg).valueOr: - trace "Error generating message id, skipping publish", - error = error - return 0 - # base returns always 0 - discard await procCall PubSub(f).publish(topic, data, msgId) + discard await procCall PubSub(f).publish(topic, data) trace "Publishing message on topic", data = data.shortLog, topic @@ -210,6 +198,18 @@ method publish*(f: FloodSub, debug "No peers for topic, skipping publish", topic return 0 + let + msg = + if f.anonymize: + Message.init(none(PeerInfo), data, topic, none(uint64), false) + else: + inc f.msgSeqno + Message.init(some(f.peerInfo), data, topic, some(f.msgSeqno), f.sign) + msgId = f.msgIdProvider(msg).valueOr: + trace "Error generating message id, skipping publish", + error = error + return 0 + trace "Created new message", msg = shortLog(msg), peers = peers.len, topic, msgId diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 948c2e198..e889fc096 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -391,7 +391,7 @@ proc validateAndRelay(g: GossipSub, else: libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"]) - await handleData(g, topic, msg.data, msgId) + await handleData(g, topic, msg.data) except CatchableError as exc: info "validateAndRelay failed", msg=exc.msg @@ -584,25 +584,7 @@ method publish*(g: GossipSub, topic: string, data: seq[byte]): Future[int] {.async.} = # base returns always 0 - let - msg = - if g.anonymize: - Message.init(none(PeerInfo), data, topic, none(uint64), false) - else: - inc g.msgSeqno - Message.init(some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign) - msg_id = g.msgIdProvider(msg).valueOr: - trace "Error generating message id, skipping publish", - error = error - libp2p_gossipsub_failed_publish.inc() - return 0 - msg_hash = g.msgHashProvider(topic, data).valueOr: - trace "Error generating message hash, skipping publish", - error = error - libp2p_gossipsub_failed_publish.inc() - return 0 - - discard await procCall PubSub(g).publish(topic, data, msg_id) + discard await procCall PubSub(g).publish(topic, data) logScope: topic @@ -666,6 +648,25 @@ method publish*(g: GossipSub, topic libp2p_gossipsub_failed_publish.inc() return 0 + + let + msg = + if g.anonymize: + Message.init(none(PeerInfo), data, topic, none(uint64), false) + else: + inc g.msgSeqno + Message.init(some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign) + msg_id = g.msgIdProvider(msg).valueOr: + trace "Error generating message id, skipping publish", + error = error + libp2p_gossipsub_failed_publish.inc() + return 0 + msg_hash = g.msgHashProvider(topic, data).valueOr: + trace "Error generating message hash, skipping publish", + error = error + libp2p_gossipsub_failed_publish.inc() + return 0 + logScope: msg_id = shortLog(msg_id) msg_hash = msg_hash diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 016fc7e24..0bfeef014 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -84,7 +84,7 @@ type InitializationError* = object of LPError TopicHandler* {.public.} = proc(topic: string, - data: seq[byte], msgId: seq[byte]): Future[void] {.gcsafe, raises: [].} + data: seq[byte]): Future[void] {.gcsafe, raises: [].} ValidatorHandler* {.public.} = proc(topic: string, message: Message): Future[ValidationResult] {.gcsafe, raises: [].} @@ -314,7 +314,7 @@ method getOrCreatePeer*( return pubSubPeer -proc handleData*(p: PubSub, topic: string, data: seq[byte], msgId = newSeq[byte]()): Future[void] = +proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] = # Start work on all data handlers without copying data into closure like # happens on {.async.} transformation p.topics.withValue(topic, handlers) do: @@ -322,7 +322,7 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte], msgId = newSeq[byte] for handler in handlers[]: if handler != nil: # allow nil handlers - let fut = handler(topic, data, msgId) + let fut = handler(topic, data) if not fut.completed(): # Fast path for successful sync handlers futs.add(fut) @@ -482,8 +482,7 @@ proc subscribe*(p: PubSub, method publish*(p: PubSub, topic: string, - data: seq[byte], - msgId: seq[byte]): Future[int] {.base, async, public.} = + data: seq[byte]): Future[int] {.base, async, public.} = ## publish to a ``topic`` ## ## The return value is the number of neighbours that we attempted to send the @@ -491,7 +490,7 @@ method publish*(p: PubSub, ## attempts - the number of peers that actually receive the message might ## be lower. if p.triggerSelf: - await handleData(p, topic, data, msgId) + await handleData(p, topic, data) return 0