pubsub, gossipsub, floodsub: undo changes that printed msg_id more intensively

This commit is contained in:
Ivan Folgueira Bande 2024-03-04 09:39:43 +01:00
parent 34043dbfe7
commit e0cc40c1f7
No known key found for this signature in database
GPG Key ID: 3C117481F89E24A7
3 changed files with 39 additions and 39 deletions

View File

@ -183,20 +183,8 @@ method init*(f: FloodSub) =
method publish*(f: FloodSub,
topic: string,
data: seq[byte]): Future[int] {.async.} =
let
msg =
if f.anonymize:
Message.init(none(PeerInfo), data, topic, none(uint64), false)
else:
inc f.msgSeqno
Message.init(some(f.peerInfo), data, topic, some(f.msgSeqno), f.sign)
msgId = f.msgIdProvider(msg).valueOr:
trace "Error generating message id, skipping publish",
error = error
return 0
# base returns always 0
discard await procCall PubSub(f).publish(topic, data, msgId)
discard await procCall PubSub(f).publish(topic, data)
trace "Publishing message on topic", data = data.shortLog, topic
@ -210,6 +198,18 @@ method publish*(f: FloodSub,
debug "No peers for topic, skipping publish", topic
return 0
let
msg =
if f.anonymize:
Message.init(none(PeerInfo), data, topic, none(uint64), false)
else:
inc f.msgSeqno
Message.init(some(f.peerInfo), data, topic, some(f.msgSeqno), f.sign)
msgId = f.msgIdProvider(msg).valueOr:
trace "Error generating message id, skipping publish",
error = error
return 0
trace "Created new message",
msg = shortLog(msg), peers = peers.len, topic, msgId

View File

@ -391,7 +391,7 @@ proc validateAndRelay(g: GossipSub,
else:
libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"])
await handleData(g, topic, msg.data, msgId)
await handleData(g, topic, msg.data)
except CatchableError as exc:
info "validateAndRelay failed", msg=exc.msg
@ -584,25 +584,7 @@ method publish*(g: GossipSub,
topic: string,
data: seq[byte]): Future[int] {.async.} =
# base returns always 0
let
msg =
if g.anonymize:
Message.init(none(PeerInfo), data, topic, none(uint64), false)
else:
inc g.msgSeqno
Message.init(some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign)
msg_id = g.msgIdProvider(msg).valueOr:
trace "Error generating message id, skipping publish",
error = error
libp2p_gossipsub_failed_publish.inc()
return 0
msg_hash = g.msgHashProvider(topic, data).valueOr:
trace "Error generating message hash, skipping publish",
error = error
libp2p_gossipsub_failed_publish.inc()
return 0
discard await procCall PubSub(g).publish(topic, data, msg_id)
discard await procCall PubSub(g).publish(topic, data)
logScope:
topic
@ -666,6 +648,25 @@ method publish*(g: GossipSub,
topic
libp2p_gossipsub_failed_publish.inc()
return 0
let
msg =
if g.anonymize:
Message.init(none(PeerInfo), data, topic, none(uint64), false)
else:
inc g.msgSeqno
Message.init(some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign)
msg_id = g.msgIdProvider(msg).valueOr:
trace "Error generating message id, skipping publish",
error = error
libp2p_gossipsub_failed_publish.inc()
return 0
msg_hash = g.msgHashProvider(topic, data).valueOr:
trace "Error generating message hash, skipping publish",
error = error
libp2p_gossipsub_failed_publish.inc()
return 0
logScope:
msg_id = shortLog(msg_id)
msg_hash = msg_hash

View File

@ -84,7 +84,7 @@ type
InitializationError* = object of LPError
TopicHandler* {.public.} = proc(topic: string,
data: seq[byte], msgId: seq[byte]): Future[void] {.gcsafe, raises: [].}
data: seq[byte]): Future[void] {.gcsafe, raises: [].}
ValidatorHandler* {.public.} = proc(topic: string,
message: Message): Future[ValidationResult] {.gcsafe, raises: [].}
@ -314,7 +314,7 @@ method getOrCreatePeer*(
return pubSubPeer
proc handleData*(p: PubSub, topic: string, data: seq[byte], msgId = newSeq[byte]()): Future[void] =
proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] =
# Start work on all data handlers without copying data into closure like
# happens on {.async.} transformation
p.topics.withValue(topic, handlers) do:
@ -322,7 +322,7 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte], msgId = newSeq[byte]
for handler in handlers[]:
if handler != nil: # allow nil handlers
let fut = handler(topic, data, msgId)
let fut = handler(topic, data)
if not fut.completed(): # Fast path for successful sync handlers
futs.add(fut)
@ -482,8 +482,7 @@ proc subscribe*(p: PubSub,
method publish*(p: PubSub,
topic: string,
data: seq[byte],
msgId: seq[byte]): Future[int] {.base, async, public.} =
data: seq[byte]): Future[int] {.base, async, public.} =
## publish to a ``topic``
##
## The return value is the number of neighbours that we attempted to send the
@ -491,7 +490,7 @@ method publish*(p: PubSub,
## attempts - the number of peers that actually receive the message might
## be lower.
if p.triggerSelf:
await handleData(p, topic, data, msgId)
await handleData(p, topic, data)
return 0