refactor lightpush client

This commit is contained in:
Sergei Tikhomirov 2025-02-27 17:51:39 +01:00
parent ef67d542ab
commit 7f7594a675

View File

@ -19,8 +19,8 @@ logScope:
type WakuLightPushClient* = ref object
peerManager*: PeerManager
reputationManager*: ReputationManager
rng*: ref rand.HmacDrbgContext
reputationManager*: ReputationManager
publishObservers: seq[PublishObserver]
proc new*(
@ -87,7 +87,8 @@ proc publish*(
peer: RemotePeerInfo,
): Future[WakuLightPushResult[string]] {.async, gcsafe.} =
## On success, returns the msg_hash of the published message
let msg_hash_hex_str = computeMessageHash(pubsubTopic, message).to0xHex()
let msg_hash = computeMessageHash(pubsubTopic, message).to0xHex()
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
?await wl.sendPushRequest(pushRequest, peer)
@ -98,50 +99,37 @@ proc publish*(
pubsubTopic = pubSubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
msg_hash = msg_hash_hex_str
msg_hash = msg_hash
return ok(msg_hash_hex_str)
return ok(msg_hash)
proc selectPeerFromPeerManager(
proc selectPeerForLightPush*(
wl: WakuLightPushClient
): Future[Result[RemotePeerInfo, string]] {.async, gcsafe.} =
let peer = wl.peerManager.selectPeer(WakuLightPushCodec, none(PubsubTopic)).valueOr:
return err("could not retrieve a peer supporting WakuLightPushCodec")
return ok(peer)
## If reputation flag is defined, try to ensure the selected peer is not bad-rep.
## Repeat peer selection until either maxAttempts is exceeded,
## or a good-rep or neutral-rep peer is found.
## Note: this procedure CAN return a bad-rep peer if maxAttempts is exceeded.
let maxAttempts = if defined(reputation): 10 else: 1
var attempts = 0
var peerResult: Result[RemotePeerInfo, string]
while attempts < maxAttempts:
let candidate = wl.peerManager.selectPeer(WakuLightPushCodec, none(PubsubTopic)).valueOr:
return err("could not retrieve a peer supporting WakuLightPushCodec")
if not (wl.reputationManager.getReputation(candidate.peerId) == some(false)):
return ok(candidate)
attempts += 1
warn "Maximum reputation-based retries exceeded; continuing with a bad-reputation peer."
# Return last candidate even if it has bad reputation
return peerResult
proc publishToAny*(
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
): Future[WakuLightPushResult[string]] {.async, gcsafe.} =
## This proc is similar to the publish one but in this case
## we don't specify a particular peer and instead we get it from peer manager
info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
info "publishToAny", msg_hash = computeMessageHash(pubSubTopic, message).to0xHex
var peer: RemotePeerInfo
when defined(reputation):
const maxReputationAttempts = 10
var attempts = 0
while attempts < maxReputationAttempts:
let peerResult = await wl.selectPeerFromPeerManager()
if peerResult.isErr:
return err(peerResult.error)
peer = peerResult.get()
if not (wl.reputationManager.getReputation(peer.peerId) == some(false)):
break
attempts += 1
if attempts >= maxReputationAttempts:
warn "Maximum reputation-based retries exceeded; continuing with a bad-reputation peer."
else:
let peerResult = await wl.selectPeerFromPeerManager()
if peerResult.isErr:
return err(peerResult.error)
peer = peerResult.get()
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
?await wl.sendPushRequest(pushRequest, peer)
for obs in wl.publishObservers:
obs.onMessagePublished(pubSubTopic, message)
return ok()
let peer = ?await wl.selectPeerForLightPush()
return await wl.publish(pubSubTopic, message, peer)