From 8368c3f1dcb79bc65921be077ff404c779a3ec1a Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Thu, 20 Feb 2025 09:08:51 +0100 Subject: [PATCH] simplify code upon review findings. --- libp2p/protocols/pubsub/errors.nim | 13 ------------- libp2p/protocols/pubsub/floodsub.nim | 4 ++-- libp2p/protocols/pubsub/gossipsub.nim | 4 ++-- libp2p/protocols/pubsub/pubsub.nim | 9 ++++++--- libp2p/protocols/pubsub/pubsubpeer.nim | 25 +++++++++++-------------- 5 files changed, 21 insertions(+), 34 deletions(-) diff --git a/libp2p/protocols/pubsub/errors.nim b/libp2p/protocols/pubsub/errors.nim index 9f28aa151..c200c7f84 100644 --- a/libp2p/protocols/pubsub/errors.nim +++ b/libp2p/protocols/pubsub/errors.nim @@ -13,16 +13,3 @@ type PublishOutcome* {.pure, public.} = enum DuplicateMessage NoPeersToPublish CannotGenerateMessageId - -proc `$`*(publishOutcome: PublishOutcome): string = - case publishOutcome - of NoTopicSpecified: - return "NoTopicSpecified" - of DuplicateMessage: - return "DuplicateMessage" - of NoPeersToPublish: - return "NoPeersToPublish" - of CannotGenerateMessageId: - return "CannotGenerateMessageId" - else: - return "unknown" diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index f975c0957..1387b49ec 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -195,7 +195,7 @@ method init*(f: FloodSub) = method doPublish*( f: FloodSub, topic: string, data: seq[byte] -): Future[Result[int, PublishOutcome]] {.async: (raises: []).} = +): Future[PublishResult] {.async: (raises: []).} = # base returns always 0 discard await procCall PubSub(f).doPublish(topic, data) @@ -212,7 +212,7 @@ method doPublish*( return err(NoPeersToPublish) let (msg, msgId) = f.createMessage(topic, data).valueOr: - trace "Error generating message id, skipping publish", error = error + trace "Error creating message, skipping publish", error = error return err(CannotGenerateMessageId) trace "Created new message", payload = shortLog(msg), peers = peers.len, topic, msgId diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index b731566c7..747c3f2ad 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -765,7 +765,7 @@ proc collectPeersForPublish( method doPublish*( g: GossipSub, topic: string, data: seq[byte] -): Future[Result[int, PublishOutcome]] {.async: (raises: []).} = +): Future[PublishResult] {.async: (raises: []).} = logScope: topic @@ -788,7 +788,7 @@ method doPublish*( return err(NoPeersToPublish) let (msg, msgId) = g.createMessage(topic, data).valueOr: - trace "Error generating message id, skipping publish", error = error + error "Error creating message, skipping publish", error = error return err(CannotGenerateMessageId) logScope: diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 831ca18d0..6f3470aeb 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -145,6 +145,8 @@ type ## we have to store it, which may be an attack vector. ## This callback can be used to reject topic we're not interested in + PublishResult* {.public.} = Result[int, PublishOutcome] + PubSub* {.public.} = ref object of LPProtocol switch*: Switch # the switch used to dial/connect to peers peerInfo*: PeerInfo # this peer's info @@ -555,7 +557,7 @@ proc subscribe*(p: PubSub, topic: string, handler: TopicHandler) {.public.} = method createMessage*( p: PubSub, topic: string, data: seq[byte] -): Result[(Message, MessageId), ValidationResult] {.base, gcsafe, raises: [].} = +): Result[(Message, MessageId), string] {.base, gcsafe, raises: [].} = let msg = if p.anonymize: @@ -563,7 +565,8 @@ method createMessage*( else: inc p.msgSeqno Message.init(some(p.peerInfo), data, topic, some(p.msgSeqno), p.sign) - msgId = ?p.msgIdProvider(msg) + msgId = p.msgIdProvider(msg).valueOr: + return err("Failed to generate message id") return ok((msg, msgId)) @@ -572,7 +575,7 @@ method createMessage*( # but call `publish`. method doPublish*( p: PubSub, topic: string, data: seq[byte] -): Future[Result[int, PublishOutcome]] {.base, async: (raises: []).} = +): Future[PublishResult] {.base, async: (raises: []).} = ## publish to a ``topic`` ## ## The return value is the number of neighbours that we attempted to send the diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index a6fa85b7c..7eaf0d387 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -179,34 +179,31 @@ proc hasBeforeSendObservers*(p: PubSubPeer): bool = proc recvObservers*(p: PubSubPeer, msg: var RPCMsg) = # trigger hooks - if not (isNil(p.observers)) and p.observers[].len > 0: + if not (isNil(p.observers)): for obs in p.observers[]: - if not (isNil(obs)): # TODO: should never be nil, but... - if not (isNil(obs.onRecv)): - obs.onRecv(p, msg) + if not (isNil(obs)) and not (isNil(obs.onRecv)): + obs.onRecv(p, msg) proc validatedObservers*(p: PubSubPeer, msg: Message, msgId: MessageId) = # trigger hooks - if not (isNil(p.observers)) and p.observers[].len > 0: + if not (isNil(p.observers)): for obs in p.observers[]: - if not (isNil(obs.onValidated)): + if not (isNil(obs)) and not (isNil(obs.onValidated)): obs.onValidated(p, msg, msgId) proc beforeSendObservers(p: PubSubPeer, msg: var RPCMsg) = # trigger hooks - if not (isNil(p.observers)) and p.observers[].len > 0: + if not (isNil(p.observers)): for obs in p.observers[]: - if not (isNil(obs)): # TODO: should never be nil, but... - if not (isNil(obs.onBeforeSend)): - obs.onBeforeSend(p, msg) + if not (isNil(obs)) and not (isNil(obs.onBeforeSend)): + obs.onBeforeSend(p, msg) proc afterSentObservers(p: PubSubPeer, msg: RPCMsg) = # trigger hooks - if not (isNil(p.observers)) and p.observers[].len > 0: + if not (isNil(p.observers)): for obs in p.observers[]: - if not (isNil(obs)): # TODO: should never be nil, but... - if not (isNil(obs.onAfterSent)): - obs.onAfterSent(p, msg) + if not (isNil(obs)) and not (isNil(obs.onAfterSent)): + obs.onAfterSent(p, msg) proc handle*(p: PubSubPeer, conn: Connection) {.async: (raises: []).} = debug "starting pubsub read loop", conn, peer = p, closed = conn.closed