From b8cdd3390b73fd3b02a4b9d6ca0b1c031de2c36f Mon Sep 17 00:00:00 2001 From: Sergei Tikhomirov Date: Wed, 26 Feb 2025 16:30:15 +0100 Subject: [PATCH] integrate reputation manager into Lightpush behind a feature flag --- tests/incentivization/test_poc_reputation.nim | 2 +- tests/waku_lightpush/lightpush_utils.nim | 6 ++- waku/incentivization/reputation_manager.nim | 13 +++-- waku/waku_lightpush/client.nim | 47 ++++++++++++++++--- 4 files changed, 52 insertions(+), 16 deletions(-) diff --git a/tests/incentivization/test_poc_reputation.nim b/tests/incentivization/test_poc_reputation.nim index b35c4b92f..5c0bdca0c 100644 --- a/tests/incentivization/test_poc_reputation.nim +++ b/tests/incentivization/test_poc_reputation.nim @@ -46,7 +46,7 @@ suite "Waku Incentivization PoC Reputation": let peerId = "peerWithInvalidResponse" let invalidResp = PushResponse(isSuccess: false, info: none(string)) manager.updateReputationFromResponse(peerId, invalidResp) - check manager.getReputation(peerId) == some(false) + check manager.getReputation(peerId) == some(false) test "incentivization PoC: reputation: default is None": let unknownPeerId = "unknown_peer" diff --git a/tests/waku_lightpush/lightpush_utils.nim b/tests/waku_lightpush/lightpush_utils.nim index 45bbe125c..a2a13a415 100644 --- a/tests/waku_lightpush/lightpush_utils.nim +++ b/tests/waku_lightpush/lightpush_utils.nim @@ -8,7 +8,8 @@ import waku/waku_lightpush, waku/waku_lightpush/[client, common], waku/common/rate_limit/setting, - ../testlib/[common, wakucore] + ../testlib/[common, wakucore], + waku/incentivization/reputation_manager proc newTestWakuLightpushNode*( switch: Switch, @@ -26,4 +27,5 @@ proc newTestWakuLightpushNode*( proc newTestWakuLightpushClient*(switch: Switch): WakuLightPushClient = let peerManager = PeerManager.new(switch) - WakuLightPushClient.new(peerManager, rng) + let reputationManager = ReputationManager.new() + WakuLightPushClient.new(peerManager, reputationManager, rng) diff --git a/waku/incentivization/reputation_manager.nim b/waku/incentivization/reputation_manager.nim index d5097b711..95de0a8cb 100644 --- a/waku/incentivization/reputation_manager.nim +++ b/waku/incentivization/reputation_manager.nim @@ -1,9 +1,8 @@ import tables, std/options import waku/waku_lightpush/rpc +import libp2p/peerid type - PeerId = string - ResponseQuality* = enum BadResponse GoodResponse @@ -13,17 +12,17 @@ type # some(false) => BadRep # none(bool) => unknown / not set ReputationManager* = ref object - reputationOf*: Table[PeerId, Option[bool]] + reputationOf*: Table[PeerID, Option[bool]] proc init*(T: type ReputationManager): ReputationManager = - return ReputationManager(reputationOf: initTable[PeerId, Option[bool]]()) + return ReputationManager(reputationOf: initTable[PeerID, Option[bool]]()) proc setReputation*( - manager: var ReputationManager, peer: PeerId, repValue: Option[bool] + manager: var ReputationManager, peer: PeerID, repValue: Option[bool] ) = manager.reputationOf[peer] = repValue -proc getReputation*(manager: ReputationManager, peer: PeerId): Option[bool] = +proc getReputation*(manager: ReputationManager, peer: PeerID): Option[bool] = if peer in manager.reputationOf: result = manager.reputationOf[peer] else: @@ -38,7 +37,7 @@ proc evaluateResponse*(response: PushResponse): ResponseQuality = # Update reputation of the peer based on the quality of the response proc updateReputationFromResponse*( - manager: var ReputationManager, peer: PeerId, response: PushResponse + manager: var ReputationManager, peer: PeerID, response: PushResponse ) = let respQuality = evaluateResponse(response) case respQuality diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim index 3e20bf9e3..66e9fa856 100644 --- a/waku/waku_lightpush/client.nim +++ b/waku/waku_lightpush/client.nim @@ -11,20 +11,27 @@ import ./common, ./protocol_metrics, ./rpc, - ./rpc_codec + ./rpc_codec, + ../incentivization/reputation_manager logScope: topics = "waku lightpush client" type WakuLightPushClient* = ref object peerManager*: PeerManager + reputationManager*: ReputationManager rng*: ref rand.HmacDrbgContext publishObservers: seq[PublishObserver] proc new*( - T: type WakuLightPushClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext + T: type WakuLightPushClient, + peerManager: PeerManager, + reputationManager: ReputationManager, + rng: ref rand.HmacDrbgContext, ): T = - WakuLightPushClient(peerManager: peerManager, rng: rng) + WakuLightPushClient( + peerManager: peerManager, reputationManager: reputationManager, rng: rng + ) proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) = wl.publishObservers.add(obs) @@ -65,6 +72,9 @@ proc sendPushRequest( else: return err("unknown failure") + when defined(reputation): + wl.reputationManager.updateReputationFromResponse(peer.peerId, response) + return ok() proc publish*( @@ -82,13 +92,20 @@ proc publish*( obs.onMessagePublished(pubSubTopic, message) notice "publishing message with lightpush", - pubsubTopic = pubsubTopic, + pubsubTopic = pubSubTopic, contentTopic = message.contentTopic, target_peer_id = peer.peerId, msg_hash = msg_hash_hex_str return ok(msg_hash_hex_str) +proc selectPeerFromPeerManager( + wl: WakuLightPushClient +): Future[Result[RemotePeerInfo, string]] {.async, gcsafe.} = + let peer = wl.peerManager.selectPeer(WakuLightPushCodec, none(PubsubTopic)).valueOr: + return err("could not retrieve a peer supporting WakuLightPushCodec") + return ok(peer) + proc publishToAny*( wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage ): Future[WakuLightPushResult[void]] {.async, gcsafe.} = @@ -97,8 +114,26 @@ proc publishToAny*( info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex - let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr: - return err("could not retrieve a peer supporting WakuLightPushCodec") + var peer: RemotePeerInfo + + when defined(reputation): + const maxReputationAttempts = 10 + var attempts = 0 + while attempts < maxReputationAttempts: + let peerResult = await wl.selectPeerFromPeerManager() + if peerResult.isErr: + return err(peerResult.error) + peer = peerResult.get() + if not (wl.reputationManager.getReputation(peer.peerId) == some(false)): + break + attempts += 1 + if attempts >= maxReputationAttempts: + warn "Maximum reputation-based retries exceeded; continuing with a bad-reputation peer." + else: + let peerResult = await wl.selectPeerFromPeerManager() + if peerResult.isErr: + return err(peerResult.error) + peer = peerResult.get() let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message) ?await wl.sendPushRequest(pushRequest, peer)