From b1d646c6e0abd3eb039fddc1e202bdd00f34630c Mon Sep 17 00:00:00 2001 From: Sergei Tikhomirov Date: Fri, 14 Mar 2025 11:16:23 +0100 Subject: [PATCH] feat: introduce reputation in PeerManager (#3322) * chore: remove unnecessary comment * move reputation manager logic to peer manager * remove peer selection from lightpush client * chore: lint fix --- tests/incentivization/test_poc_reputation.nim | 10 ---- tests/test_peer_manager.nim | 55 +++++++++++++------ tests/waku_lightpush/lightpush_utils.nim | 7 +-- tests/waku_lightpush/test_client.nim | 41 ++------------ waku/incentivization/reputation_manager.nim | 4 +- waku/node/peer_manager/peer_manager.nim | 29 +++++++++- waku/node/waku_node.nim | 3 +- waku/waku_lightpush/client.nim | 54 ++++-------------- 8 files changed, 86 insertions(+), 117 deletions(-) diff --git a/tests/incentivization/test_poc_reputation.nim b/tests/incentivization/test_poc_reputation.nim index ad1fa3bae..6e21330a2 100644 --- a/tests/incentivization/test_poc_reputation.nim +++ b/tests/incentivization/test_poc_reputation.nim @@ -29,16 +29,6 @@ suite "Waku Incentivization PoC Reputation": manager.setReputation(peerId1, some(true)) # Encodes GoodRep check manager.getReputation(peerId1) == some(true) - #[ - LightPushResponse* = object - requestId*: string - statusCode*: uint32 - statusDesc*: Option[string] - relayPeerCount*: Option[uint32] - - LightpushStatusCode - ]# - test "incentivization PoC: reputation: evaluate LightPushResponse valid": let validLightLightPushResponse = LightPushResponse(requestId: "", statusCode: LightpushStatusCode.SUCCESS.uint32) diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index 4fd148b81..46edfc447 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -30,6 +30,7 @@ import waku_lightpush/common, waku_peer_exchange, waku_metadata, + incentivization/reputation_manager, ], ./testlib/common, ./testlib/testutils, @@ -902,39 +903,57 @@ procSuite "Peer Manager": # Add a peer[0] to the peerstore pm.wakuPeerStore[AddressBook][peers[0].peerId] = peers[0].addrs pm.wakuPeerStore[ProtoBook][peers[0].peerId] = - @[WakuRelayCodec, WakuStoreCodec, WakuFilterSubscribeCodec] + @[WakuRelayCodec, WakuStoreCodec, WakuFilterSubscribeCodec, WakuLightPushCodec] # When no service peers, we get one from the peerstore - let selectedPeer1 = pm.selectPeer(WakuStoreCodec) + let selectedPeerWakuStore = pm.selectPeer(WakuStoreCodec) check: - selectedPeer1.isSome() == true - selectedPeer1.get().peerId == peers[0].peerId + selectedPeerWakuStore.isSome() == true + selectedPeerWakuStore.get().peerId == peers[0].peerId # Same for other protocol - let selectedPeer2 = pm.selectPeer(WakuFilterSubscribeCodec) + let selectedPeerWakuFilter = pm.selectPeer(WakuFilterSubscribeCodec) check: - selectedPeer2.isSome() == true - selectedPeer2.get().peerId == peers[0].peerId + selectedPeerWakuFilter.isSome() == true + selectedPeerWakuFilter.get().peerId == peers[0].peerId # And return none if we dont have any peer for that protocol - let selectedPeer3 = pm.selectPeer(WakuLegacyLightPushCodec) + let selectedPeerWakuLegacyLightpush = pm.selectPeer(WakuLegacyLightPushCodec) check: - selectedPeer3.isSome() == false + selectedPeerWakuLegacyLightpush.isSome() == false - # Now we add service peers for different protocols peer[1..3] - pm.addServicePeer(peers[1], WakuStoreCodec) - pm.addServicePeer(peers[2], WakuLegacyLightPushCodec) + # Reputation: if no reputation is set (neutral-rep), return a peer + let selectedPeerWakuLightpush = pm.selectPeer(WakuLightPushCodec) + check: + selectedPeerWakuLightpush.isSome() == true + # Reputation: avoid negative-reputation peers + if pm.reputationManager.isSome(): + var rm = pm.reputationManager.get() + # assign negative reputation to the peer + rm.setReputation(selectedPeerWakuLightpush.get().peerId, some(false)) + # the peer is not selected because of negative reputation + check: + pm.selectPeer(WakuLightPushCodec).isNone() + # revert reputation to neutral + rm.setReputation(selectedPeerWakuLightpush.get().peerId, none(bool)) + # the peer is selected again + check: + pm.selectPeer(WakuLightPushCodec).isSome() + + # Now we add service peers for different protocols # We no longer get one from the peerstore. Slots are being used instead. - let selectedPeer4 = pm.selectPeer(WakuStoreCodec) + pm.addServicePeer(peers[1], WakuStoreCodec) + let selectedPeerWakuStoreSlotted = pm.selectPeer(WakuStoreCodec) check: - selectedPeer4.isSome() == true - selectedPeer4.get().peerId == peers[1].peerId + selectedPeerWakuStoreSlotted.isSome() == true + selectedPeerWakuStoreSlotted.get().peerId == peers[1].peerId - let selectedPeer5 = pm.selectPeer(WakuLegacyLightPushCodec) + pm.addServicePeer(peers[2], WakuLegacyLightPushCodec) + let selectedPeerWakuLegacyLightpushSlotted = pm.selectPeer(WakuLegacyLightPushCodec) check: - selectedPeer5.isSome() == true - selectedPeer5.get().peerId == peers[2].peerId + selectedPeerWakuLegacyLightpushSlotted.isSome() == true + selectedPeerWakuLegacyLightpushSlotted.get().peerId == peers[2].peerId test "peer manager cant have more max connections than peerstore size": # Peerstore size can't be smaller than max connections diff --git a/tests/waku_lightpush/lightpush_utils.nim b/tests/waku_lightpush/lightpush_utils.nim index f3e0d8999..312ac3f03 100644 --- a/tests/waku_lightpush/lightpush_utils.nim +++ b/tests/waku_lightpush/lightpush_utils.nim @@ -28,8 +28,7 @@ proc newTestWakuLightpushNode*( return proto proc newTestWakuLightpushClient*( - switch: Switch, - reputationEnabled: bool = false + switch: Switch, reputationEnabled: bool = false ): WakuLightPushClient = - let peerManager = PeerManager.new(switch) - WakuLightPushClient.new(peerManager, rng, reputationEnabled) + let peerManager = PeerManager.new(switch, reputationEnabled = reputationEnabled) + WakuLightPushClient.new(peerManager, rng) diff --git a/tests/waku_lightpush/test_client.nim b/tests/waku_lightpush/test_client.nim index f3c0d4a23..3499ae9e4 100644 --- a/tests/waku_lightpush/test_client.nim +++ b/tests/waku_lightpush/test_client.nim @@ -43,6 +43,9 @@ suite "Waku Lightpush Client": contentTopic {.threadvar.}: ContentTopic message {.threadvar.}: WakuMessage + # Use reputation manager (inside the peer manager) for Lightpush Client test instanse + const reputationEnabled = true + asyncSetup: handlerFuture = newPushHandlerFuture() handler = proc( @@ -73,7 +76,7 @@ suite "Waku Lightpush Client": server = await newTestWakuLightpushNode(serverSwitch, handler) serverFailsLightpush = await newTestWakuLightpushNode(serverSwitchFailsLightpush, handlerFailsLightpush) - client = newTestWakuLightpushClient(clientSwitch) + client = newTestWakuLightpushClient(clientSwitch, reputationEnabled) await allFutures( serverSwitch.start(), serverSwitchFailsLightpush.start(), clientSwitch.start() @@ -386,39 +389,3 @@ suite "Waku Lightpush Client": # Then the response is negative check not publishResponse.isOk() - - #[ - # TODO: adapt test for reputation-based peer selection after reputation logic is moved into PeerManager - asyncTest "Peer Selection for Lighpush with Reputation": - # add a peer that does not support the Lightpush protocol to the client's PeerManager - client.peerManager.addPeer(serverRemotePeerInfoFailsLightpush) - - # try publishing via a failing peer - let publishResponse1 = await client.publishToAny(pubsubTopic, message) - - check not publishResponse1.isOk() - - if client.reputationManager.isSome: - client.reputationManager.get().setReputation(serverRemotePeerInfoFailsLightpush.peerId, some(false)) - - # add a peer that supports the Lightpush protocol to the client's PeerManager - client.peerManager.addPeer(serverRemotePeerInfo) # supports Lightpush - - # try publishing again - this time another (good) peer will be selected - let publishResponse2 = await client.publishToAny(pubsubTopic, message) - - check publishResponse2.isOk() - - if client.reputationManager.isSome: - client.reputationManager.get().setReputation(serverRemotePeerInfo.peerId, some(true)) - - if client.reputationManager.isSome: - # the reputation of a failed peer is negative - check client.reputationManager.get().getReputation( - serverRemotePeerInfoFailsLightpush.peerId - ) == some(false) - - # the reputation of a successful peer is positive - check client.reputationManager.get().getReputation(serverRemotePeerInfo.peerId) == - some(true) - ]# diff --git a/waku/incentivization/reputation_manager.nim b/waku/incentivization/reputation_manager.nim index 256b7a2ac..d9ca3b1aa 100644 --- a/waku/incentivization/reputation_manager.nim +++ b/waku/incentivization/reputation_manager.nim @@ -28,6 +28,8 @@ proc getReputation*(manager: ReputationManager, peer: PeerId): Option[bool] = else: result = none(bool) +### Lightpush-specific functionality ### + # Evaluate the quality of a LightPushResponse by checking its status code proc evaluateResponse*(response: LightPushResponse): ResponseQuality = if response.statusCode == LightpushStatusCode.SUCCESS.uint32: @@ -35,7 +37,7 @@ proc evaluateResponse*(response: LightPushResponse): ResponseQuality = else: return BadResponse -# Update reputation of the peer based on the quality of the response +# Update reputation of the peer based on LightPushResponse quality proc updateReputationFromResponse*( manager: var ReputationManager, peer: PeerId, response: LightPushResponse ) = diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index ba04b6b00..c94087d1e 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -21,7 +21,8 @@ import ../../waku_enr/capabilities, ../../waku_metadata, ./peer_store/peer_storage, - ./waku_peer_store + ./waku_peer_store, + ../../incentivization/reputation_manager export waku_peer_store, peer_storage, peers @@ -96,6 +97,7 @@ type PeerManager* = ref object of RootObj started: bool shardedPeerManagement: bool # temp feature flag onConnectionChange*: ConnectionChangeHandler + reputationManager*: Option[ReputationManager] #~~~~~~~~~~~~~~~~~~~# # Helper Functions # @@ -256,9 +258,23 @@ proc selectPeer*( # If not slotted, we select a random peer for the given protocol if peers.len > 0: + # if reputation is enabled, filter out bad-reputation peers + var preSelectedPeers = + if pm.reputationManager.isSome(): + peers.filterIt: + let rep = + try: + pm.reputationManager.get().getReputation(it.peerId) + except KeyError: + none(bool) + rep == none(bool) or rep == some(true) + else: + peers + let selectedPeer = preSelectedPeers[0] trace "Got peer from peerstore", - peerId = peers[0].peerId, multi = peers[0].addrs[0], protocol = proto - return some(peers[0]) + peerId = selectedPeer.peerId, multi = selectedPeer.addrs[0], protocol = proto + return some(selectedPeer) + trace "No peer found for protocol", protocol = proto return none(RemotePeerInfo) @@ -1020,6 +1036,7 @@ proc new*( maxFailedAttempts = MaxFailedAttempts, colocationLimit = DefaultColocationLimit, shardedPeerManagement = false, + reputationEnabled = false, ): PeerManager {.gcsafe.} = let capacity = switch.peerStore.capacity let maxConnections = switch.connManager.inSema.size @@ -1094,4 +1111,10 @@ proc new*( else: trace "no peer storage found" + pm.reputationManager = + if reputationEnabled: + some(ReputationManager.new()) + else: + none(ReputationManager) + return pm diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index c7bef00f9..e10d705ff 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1124,8 +1124,7 @@ proc mountLightPush*( proc mountLightPushClient*(node: WakuNode) = info "mounting light push client" - node.wakuLightpushClient = - WakuLightPushClient.new(node.peerManager, node.rng, reputationEnabled = false) + node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng) proc lightpushPublishHandler( node: WakuNode, diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim index 08127c06c..cdfcad424 100644 --- a/waku/waku_lightpush/client.nim +++ b/waku/waku_lightpush/client.nim @@ -20,23 +20,12 @@ logScope: type WakuLightPushClient* = ref object peerManager*: PeerManager rng*: ref rand.HmacDrbgContext - reputationManager*: Option[ReputationManager] publishObservers: seq[PublishObserver] proc new*( - T: type WakuLightPushClient, - peerManager: PeerManager, - rng: ref rand.HmacDrbgContext, - reputationEnabled: bool, + T: type WakuLightPushClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext ): T = - let reputationManager = - if reputationEnabled: - some(ReputationManager.new()) - else: - none(ReputationManager) - WakuLightPushClient( - peerManager: peerManager, rng: rng, reputationManager: reputationManager - ) + WakuLightPushClient(peerManager: peerManager, rng: rng) proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) = wl.publishObservers.add(obs) @@ -57,8 +46,8 @@ proc sendPushRequest( buffer = await connection.readLp(DefaultMaxRpcSize.int) except LPStreamRemoteClosedError: error "Failed to read responose from peer", error = getCurrentExceptionMsg() - if wl.reputationManager.isSome: - wl.reputationManager.get().setReputation(peer.peerId, some(false)) + if wl.peerManager.reputationManager.isSome: + wl.peerManager.reputationManager.get().setReputation(peer.peerId, some(false)) return lightpushResultInternalError( "Failed to read response from peer: " & getCurrentExceptionMsg() ) @@ -66,20 +55,22 @@ proc sendPushRequest( let response = LightpushResponse.decode(buffer).valueOr: error "failed to decode response" waku_lightpush_v3_errors.inc(labelValues = [decodeRpcFailure]) - if wl.reputationManager.isSome: - wl.reputationManager.get().setReputation(peer.peerId, some(false)) + if wl.peerManager.reputationManager.isSome: + wl.peerManager.reputationManager.get().setReputation(peer.peerId, some(false)) return lightpushResultInternalError(decodeRpcFailure) if response.requestId != req.requestId and response.statusCode != TOO_MANY_REQUESTS.uint32: error "response failure, requestId mismatch", requestId = req.requestId, responseRequestId = response.requestId - if wl.reputationManager.isSome: - wl.reputationManager.get().setReputation(peer.peerId, some(false)) + if wl.peerManager.reputationManager.isSome: + wl.peerManager.reputationManager.get().setReputation(peer.peerId, some(false)) return lightpushResultInternalError("response failure, requestId mismatch") - if wl.reputationManager.isSome: - wl.reputationManager.get().updateReputationFromResponse(peer.peerId, response) + if wl.peerManager.reputationManager.isSome: + wl.peerManager.reputationManager.get().updateReputationFromResponse( + peer.peerId, response + ) return toPushResult(response) @@ -108,27 +99,6 @@ proc publish*( return lightpushSuccessResult(publishedCount) -# TODO: move selectPeerForLightPush logic into PeerManager -proc selectPeerForLightPush*( - wl: WakuLightPushClient -): Future[Result[RemotePeerInfo, string]] {.async, gcsafe.} = - let maxAttempts = if defined(reputation): 10 else: 1 - var attempts = 0 - var peerResult: Result[RemotePeerInfo, string] - while attempts < maxAttempts: - let candidate = wl.peerManager.selectPeer(WakuLightPushCodec, none(PubsubTopic)).valueOr: - return err("could not retrieve a peer supporting WakuLightPushCodec") - if wl.reputationManager.isSome(): - let reputation = wl.reputationManager.get().getReputation(candidate.peerId) - info "Peer selected", - peerId = candidate.peerId, reputation = $reputation, attempts = $attempts - if (reputation == some(false)): - attempts += 1 - continue - return ok(candidate) - warn "Maximum reputation-based retries exceeded; continuing with a bad-reputation peer." - return peerResult - proc publishToAny*( wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage ): Future[WakuLightPushResult] {.async, gcsafe.} =