feat: introduce reputation in PeerManager (#3322)

* chore: remove unnecessary comment

* move reputation manager logic to peer manager

* remove peer selection from lightpush client

* chore: lint fix
This commit is contained in:
Sergei Tikhomirov 2025-03-14 11:16:23 +01:00 committed by GitHub
parent df66fcd279
commit b1d646c6e0
8 changed files with 86 additions and 117 deletions

View File

@ -29,16 +29,6 @@ suite "Waku Incentivization PoC Reputation":
manager.setReputation(peerId1, some(true)) # Encodes GoodRep
check manager.getReputation(peerId1) == some(true)
#[
LightPushResponse* = object
requestId*: string
statusCode*: uint32
statusDesc*: Option[string]
relayPeerCount*: Option[uint32]
LightpushStatusCode
]#
test "incentivization PoC: reputation: evaluate LightPushResponse valid":
let validLightLightPushResponse =
LightPushResponse(requestId: "", statusCode: LightpushStatusCode.SUCCESS.uint32)

View File

@ -30,6 +30,7 @@ import
waku_lightpush/common,
waku_peer_exchange,
waku_metadata,
incentivization/reputation_manager,
],
./testlib/common,
./testlib/testutils,
@ -902,39 +903,57 @@ procSuite "Peer Manager":
# Add a peer[0] to the peerstore
pm.wakuPeerStore[AddressBook][peers[0].peerId] = peers[0].addrs
pm.wakuPeerStore[ProtoBook][peers[0].peerId] =
@[WakuRelayCodec, WakuStoreCodec, WakuFilterSubscribeCodec]
@[WakuRelayCodec, WakuStoreCodec, WakuFilterSubscribeCodec, WakuLightPushCodec]
# When no service peers, we get one from the peerstore
let selectedPeer1 = pm.selectPeer(WakuStoreCodec)
let selectedPeerWakuStore = pm.selectPeer(WakuStoreCodec)
check:
selectedPeer1.isSome() == true
selectedPeer1.get().peerId == peers[0].peerId
selectedPeerWakuStore.isSome() == true
selectedPeerWakuStore.get().peerId == peers[0].peerId
# Same for other protocol
let selectedPeer2 = pm.selectPeer(WakuFilterSubscribeCodec)
let selectedPeerWakuFilter = pm.selectPeer(WakuFilterSubscribeCodec)
check:
selectedPeer2.isSome() == true
selectedPeer2.get().peerId == peers[0].peerId
selectedPeerWakuFilter.isSome() == true
selectedPeerWakuFilter.get().peerId == peers[0].peerId
# And return none if we dont have any peer for that protocol
let selectedPeer3 = pm.selectPeer(WakuLegacyLightPushCodec)
let selectedPeerWakuLegacyLightpush = pm.selectPeer(WakuLegacyLightPushCodec)
check:
selectedPeer3.isSome() == false
selectedPeerWakuLegacyLightpush.isSome() == false
# Now we add service peers for different protocols peer[1..3]
pm.addServicePeer(peers[1], WakuStoreCodec)
pm.addServicePeer(peers[2], WakuLegacyLightPushCodec)
# Reputation: if no reputation is set (neutral-rep), return a peer
let selectedPeerWakuLightpush = pm.selectPeer(WakuLightPushCodec)
check:
selectedPeerWakuLightpush.isSome() == true
# Reputation: avoid negative-reputation peers
if pm.reputationManager.isSome():
var rm = pm.reputationManager.get()
# assign negative reputation to the peer
rm.setReputation(selectedPeerWakuLightpush.get().peerId, some(false))
# the peer is not selected because of negative reputation
check:
pm.selectPeer(WakuLightPushCodec).isNone()
# revert reputation to neutral
rm.setReputation(selectedPeerWakuLightpush.get().peerId, none(bool))
# the peer is selected again
check:
pm.selectPeer(WakuLightPushCodec).isSome()
# Now we add service peers for different protocols
# We no longer get one from the peerstore. Slots are being used instead.
let selectedPeer4 = pm.selectPeer(WakuStoreCodec)
pm.addServicePeer(peers[1], WakuStoreCodec)
let selectedPeerWakuStoreSlotted = pm.selectPeer(WakuStoreCodec)
check:
selectedPeer4.isSome() == true
selectedPeer4.get().peerId == peers[1].peerId
selectedPeerWakuStoreSlotted.isSome() == true
selectedPeerWakuStoreSlotted.get().peerId == peers[1].peerId
let selectedPeer5 = pm.selectPeer(WakuLegacyLightPushCodec)
pm.addServicePeer(peers[2], WakuLegacyLightPushCodec)
let selectedPeerWakuLegacyLightpushSlotted = pm.selectPeer(WakuLegacyLightPushCodec)
check:
selectedPeer5.isSome() == true
selectedPeer5.get().peerId == peers[2].peerId
selectedPeerWakuLegacyLightpushSlotted.isSome() == true
selectedPeerWakuLegacyLightpushSlotted.get().peerId == peers[2].peerId
test "peer manager cant have more max connections than peerstore size":
# Peerstore size can't be smaller than max connections

View File

@ -28,8 +28,7 @@ proc newTestWakuLightpushNode*(
return proto
proc newTestWakuLightpushClient*(
switch: Switch,
reputationEnabled: bool = false
switch: Switch, reputationEnabled: bool = false
): WakuLightPushClient =
let peerManager = PeerManager.new(switch)
WakuLightPushClient.new(peerManager, rng, reputationEnabled)
let peerManager = PeerManager.new(switch, reputationEnabled = reputationEnabled)
WakuLightPushClient.new(peerManager, rng)

View File

@ -43,6 +43,9 @@ suite "Waku Lightpush Client":
contentTopic {.threadvar.}: ContentTopic
message {.threadvar.}: WakuMessage
# Use reputation manager (inside the peer manager) for Lightpush Client test instanse
const reputationEnabled = true
asyncSetup:
handlerFuture = newPushHandlerFuture()
handler = proc(
@ -73,7 +76,7 @@ suite "Waku Lightpush Client":
server = await newTestWakuLightpushNode(serverSwitch, handler)
serverFailsLightpush =
await newTestWakuLightpushNode(serverSwitchFailsLightpush, handlerFailsLightpush)
client = newTestWakuLightpushClient(clientSwitch)
client = newTestWakuLightpushClient(clientSwitch, reputationEnabled)
await allFutures(
serverSwitch.start(), serverSwitchFailsLightpush.start(), clientSwitch.start()
@ -386,39 +389,3 @@ suite "Waku Lightpush Client":
# Then the response is negative
check not publishResponse.isOk()
#[
# TODO: adapt test for reputation-based peer selection after reputation logic is moved into PeerManager
asyncTest "Peer Selection for Lighpush with Reputation":
# add a peer that does not support the Lightpush protocol to the client's PeerManager
client.peerManager.addPeer(serverRemotePeerInfoFailsLightpush)
# try publishing via a failing peer
let publishResponse1 = await client.publishToAny(pubsubTopic, message)
check not publishResponse1.isOk()
if client.reputationManager.isSome:
client.reputationManager.get().setReputation(serverRemotePeerInfoFailsLightpush.peerId, some(false))
# add a peer that supports the Lightpush protocol to the client's PeerManager
client.peerManager.addPeer(serverRemotePeerInfo) # supports Lightpush
# try publishing again - this time another (good) peer will be selected
let publishResponse2 = await client.publishToAny(pubsubTopic, message)
check publishResponse2.isOk()
if client.reputationManager.isSome:
client.reputationManager.get().setReputation(serverRemotePeerInfo.peerId, some(true))
if client.reputationManager.isSome:
# the reputation of a failed peer is negative
check client.reputationManager.get().getReputation(
serverRemotePeerInfoFailsLightpush.peerId
) == some(false)
# the reputation of a successful peer is positive
check client.reputationManager.get().getReputation(serverRemotePeerInfo.peerId) ==
some(true)
]#

View File

@ -28,6 +28,8 @@ proc getReputation*(manager: ReputationManager, peer: PeerId): Option[bool] =
else:
result = none(bool)
### Lightpush-specific functionality ###
# Evaluate the quality of a LightPushResponse by checking its status code
proc evaluateResponse*(response: LightPushResponse): ResponseQuality =
if response.statusCode == LightpushStatusCode.SUCCESS.uint32:
@ -35,7 +37,7 @@ proc evaluateResponse*(response: LightPushResponse): ResponseQuality =
else:
return BadResponse
# Update reputation of the peer based on the quality of the response
# Update reputation of the peer based on LightPushResponse quality
proc updateReputationFromResponse*(
manager: var ReputationManager, peer: PeerId, response: LightPushResponse
) =

View File

@ -21,7 +21,8 @@ import
../../waku_enr/capabilities,
../../waku_metadata,
./peer_store/peer_storage,
./waku_peer_store
./waku_peer_store,
../../incentivization/reputation_manager
export waku_peer_store, peer_storage, peers
@ -96,6 +97,7 @@ type PeerManager* = ref object of RootObj
started: bool
shardedPeerManagement: bool # temp feature flag
onConnectionChange*: ConnectionChangeHandler
reputationManager*: Option[ReputationManager]
#~~~~~~~~~~~~~~~~~~~#
# Helper Functions #
@ -256,9 +258,23 @@ proc selectPeer*(
# If not slotted, we select a random peer for the given protocol
if peers.len > 0:
# if reputation is enabled, filter out bad-reputation peers
var preSelectedPeers =
if pm.reputationManager.isSome():
peers.filterIt:
let rep =
try:
pm.reputationManager.get().getReputation(it.peerId)
except KeyError:
none(bool)
rep == none(bool) or rep == some(true)
else:
peers
let selectedPeer = preSelectedPeers[0]
trace "Got peer from peerstore",
peerId = peers[0].peerId, multi = peers[0].addrs[0], protocol = proto
return some(peers[0])
peerId = selectedPeer.peerId, multi = selectedPeer.addrs[0], protocol = proto
return some(selectedPeer)
trace "No peer found for protocol", protocol = proto
return none(RemotePeerInfo)
@ -1020,6 +1036,7 @@ proc new*(
maxFailedAttempts = MaxFailedAttempts,
colocationLimit = DefaultColocationLimit,
shardedPeerManagement = false,
reputationEnabled = false,
): PeerManager {.gcsafe.} =
let capacity = switch.peerStore.capacity
let maxConnections = switch.connManager.inSema.size
@ -1094,4 +1111,10 @@ proc new*(
else:
trace "no peer storage found"
pm.reputationManager =
if reputationEnabled:
some(ReputationManager.new())
else:
none(ReputationManager)
return pm

View File

@ -1124,8 +1124,7 @@ proc mountLightPush*(
proc mountLightPushClient*(node: WakuNode) =
info "mounting light push client"
node.wakuLightpushClient =
WakuLightPushClient.new(node.peerManager, node.rng, reputationEnabled = false)
node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng)
proc lightpushPublishHandler(
node: WakuNode,

View File

@ -20,23 +20,12 @@ logScope:
type WakuLightPushClient* = ref object
peerManager*: PeerManager
rng*: ref rand.HmacDrbgContext
reputationManager*: Option[ReputationManager]
publishObservers: seq[PublishObserver]
proc new*(
T: type WakuLightPushClient,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
reputationEnabled: bool,
T: type WakuLightPushClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext
): T =
let reputationManager =
if reputationEnabled:
some(ReputationManager.new())
else:
none(ReputationManager)
WakuLightPushClient(
peerManager: peerManager, rng: rng, reputationManager: reputationManager
)
WakuLightPushClient(peerManager: peerManager, rng: rng)
proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) =
wl.publishObservers.add(obs)
@ -57,8 +46,8 @@ proc sendPushRequest(
buffer = await connection.readLp(DefaultMaxRpcSize.int)
except LPStreamRemoteClosedError:
error "Failed to read responose from peer", error = getCurrentExceptionMsg()
if wl.reputationManager.isSome:
wl.reputationManager.get().setReputation(peer.peerId, some(false))
if wl.peerManager.reputationManager.isSome:
wl.peerManager.reputationManager.get().setReputation(peer.peerId, some(false))
return lightpushResultInternalError(
"Failed to read response from peer: " & getCurrentExceptionMsg()
)
@ -66,20 +55,22 @@ proc sendPushRequest(
let response = LightpushResponse.decode(buffer).valueOr:
error "failed to decode response"
waku_lightpush_v3_errors.inc(labelValues = [decodeRpcFailure])
if wl.reputationManager.isSome:
wl.reputationManager.get().setReputation(peer.peerId, some(false))
if wl.peerManager.reputationManager.isSome:
wl.peerManager.reputationManager.get().setReputation(peer.peerId, some(false))
return lightpushResultInternalError(decodeRpcFailure)
if response.requestId != req.requestId and
response.statusCode != TOO_MANY_REQUESTS.uint32:
error "response failure, requestId mismatch",
requestId = req.requestId, responseRequestId = response.requestId
if wl.reputationManager.isSome:
wl.reputationManager.get().setReputation(peer.peerId, some(false))
if wl.peerManager.reputationManager.isSome:
wl.peerManager.reputationManager.get().setReputation(peer.peerId, some(false))
return lightpushResultInternalError("response failure, requestId mismatch")
if wl.reputationManager.isSome:
wl.reputationManager.get().updateReputationFromResponse(peer.peerId, response)
if wl.peerManager.reputationManager.isSome:
wl.peerManager.reputationManager.get().updateReputationFromResponse(
peer.peerId, response
)
return toPushResult(response)
@ -108,27 +99,6 @@ proc publish*(
return lightpushSuccessResult(publishedCount)
# TODO: move selectPeerForLightPush logic into PeerManager
proc selectPeerForLightPush*(
wl: WakuLightPushClient
): Future[Result[RemotePeerInfo, string]] {.async, gcsafe.} =
let maxAttempts = if defined(reputation): 10 else: 1
var attempts = 0
var peerResult: Result[RemotePeerInfo, string]
while attempts < maxAttempts:
let candidate = wl.peerManager.selectPeer(WakuLightPushCodec, none(PubsubTopic)).valueOr:
return err("could not retrieve a peer supporting WakuLightPushCodec")
if wl.reputationManager.isSome():
let reputation = wl.reputationManager.get().getReputation(candidate.peerId)
info "Peer selected",
peerId = candidate.peerId, reputation = $reputation, attempts = $attempts
if (reputation == some(false)):
attempts += 1
continue
return ok(candidate)
warn "Maximum reputation-based retries exceeded; continuing with a bad-reputation peer."
return peerResult
proc publishToAny*(
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult] {.async, gcsafe.} =