From 7f7594a675f920c756171377eb729bea117d4e9f Mon Sep 17 00:00:00 2001 From: Sergei Tikhomirov Date: Thu, 27 Feb 2025 17:51:39 +0100 Subject: [PATCH] refactor lightpush client --- waku/waku_lightpush/client.nim | 64 ++++++++++++++-------------------- 1 file changed, 26 insertions(+), 38 deletions(-) diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim index e63411d3c..06f4da4df 100644 --- a/waku/waku_lightpush/client.nim +++ b/waku/waku_lightpush/client.nim @@ -19,8 +19,8 @@ logScope: type WakuLightPushClient* = ref object peerManager*: PeerManager - reputationManager*: ReputationManager rng*: ref rand.HmacDrbgContext + reputationManager*: ReputationManager publishObservers: seq[PublishObserver] proc new*( @@ -87,7 +87,8 @@ proc publish*( peer: RemotePeerInfo, ): Future[WakuLightPushResult[string]] {.async, gcsafe.} = ## On success, returns the msg_hash of the published message - let msg_hash_hex_str = computeMessageHash(pubsubTopic, message).to0xHex() + let msg_hash = computeMessageHash(pubsubTopic, message).to0xHex() + let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message) ?await wl.sendPushRequest(pushRequest, peer) @@ -98,50 +99,37 @@ proc publish*( pubsubTopic = pubSubTopic, contentTopic = message.contentTopic, target_peer_id = peer.peerId, - msg_hash = msg_hash_hex_str + msg_hash = msg_hash - return ok(msg_hash_hex_str) + return ok(msg_hash) -proc selectPeerFromPeerManager( +proc selectPeerForLightPush*( wl: WakuLightPushClient ): Future[Result[RemotePeerInfo, string]] {.async, gcsafe.} = - let peer = wl.peerManager.selectPeer(WakuLightPushCodec, none(PubsubTopic)).valueOr: - return err("could not retrieve a peer supporting WakuLightPushCodec") - return ok(peer) + ## If reputation flag is defined, try to ensure the selected peer is not bad-rep. + ## Repeat peer selection until either maxAttempts is exceeded, + ## or a good-rep or neutral-rep peer is found. + ## Note: this procedure CAN return a bad-rep peer if maxAttempts is exceeded. + let maxAttempts = if defined(reputation): 10 else: 1 + var attempts = 0 + var peerResult: Result[RemotePeerInfo, string] + while attempts < maxAttempts: + let candidate = wl.peerManager.selectPeer(WakuLightPushCodec, none(PubsubTopic)).valueOr: + return err("could not retrieve a peer supporting WakuLightPushCodec") + if not (wl.reputationManager.getReputation(candidate.peerId) == some(false)): + return ok(candidate) + attempts += 1 + warn "Maximum reputation-based retries exceeded; continuing with a bad-reputation peer." + # Return last candidate even if it has bad reputation + return peerResult proc publishToAny*( wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage -): Future[WakuLightPushResult[void]] {.async, gcsafe.} = +): Future[WakuLightPushResult[string]] {.async, gcsafe.} = ## This proc is similar to the publish one but in this case ## we don't specify a particular peer and instead we get it from peer manager - info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex + info "publishToAny", msg_hash = computeMessageHash(pubSubTopic, message).to0xHex - var peer: RemotePeerInfo - - when defined(reputation): - const maxReputationAttempts = 10 - var attempts = 0 - while attempts < maxReputationAttempts: - let peerResult = await wl.selectPeerFromPeerManager() - if peerResult.isErr: - return err(peerResult.error) - peer = peerResult.get() - if not (wl.reputationManager.getReputation(peer.peerId) == some(false)): - break - attempts += 1 - if attempts >= maxReputationAttempts: - warn "Maximum reputation-based retries exceeded; continuing with a bad-reputation peer." - else: - let peerResult = await wl.selectPeerFromPeerManager() - if peerResult.isErr: - return err(peerResult.error) - peer = peerResult.get() - - let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message) - ?await wl.sendPushRequest(pushRequest, peer) - - for obs in wl.publishObservers: - obs.onMessagePublished(pubSubTopic, message) - - return ok() + let peer = ?await wl.selectPeerForLightPush() + return await wl.publish(pubSubTopic, message, peer)