Merge branch 'feat/incentivization-poc-lightpush-integration-reputation' into feat/service-incentivization-poc

This commit is contained in:
Sergei Tikhomirov 2025-03-11 13:33:08 +01:00
commit 5edfdfa387
7 changed files with 191 additions and 36 deletions

View File

@ -6,7 +6,8 @@ import
stew/byteutils,
stint,
strutils,
tests/testlib/testasync
tests/testlib/testasync,
libp2p/[peerid, crypto/crypto]
import
waku/[node/peer_manager, waku_core],
@ -15,16 +16,18 @@ import
suite "Waku Incentivization PoC Reputation":
var manager {.threadvar.}: ReputationManager
var peerId1 {.threadvar.}: PeerId
setup:
manager = ReputationManager.init()
peerId1 = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
test "incentivization PoC: reputation: reputation table is empty after initialization":
check manager.reputationOf.len == 0
test "incentivization PoC: reputation: set and get reputation":
manager.setReputation("peer1", some(true)) # Encodes GoodRep
check manager.getReputation("peer1") == some(true)
manager.setReputation(peerId1, some(true)) # Encodes GoodRep
check manager.getReputation(peerId1) == some(true)
test "incentivization PoC: reputation: evaluate PushResponse valid":
let validLightpushResponse =
@ -37,18 +40,14 @@ suite "Waku Incentivization PoC Reputation":
check evaluateResponse(invalidLightpushResponse) == BadResponse
test "incentivization PoC: reputation: updateReputationFromResponse valid":
let peerId = "peerWithValidResponse"
let validResp = PushResponse(isSuccess: true, info: some("All good"))
manager.updateReputationFromResponse(peerId, validResp)
check manager.getReputation(peerId) == some(true)
manager.updateReputationFromResponse(peerId1, validResp)
check manager.getReputation(peerId1) == some(true)
test "incentivization PoC: reputation: updateReputationFromResponse invalid":
let peerId = "peerWithInvalidResponse"
let invalidResp = PushResponse(isSuccess: false, info: none(string))
manager.updateReputationFromResponse(peerId, invalidResp)
check manager.getReputation(peerId) == some(false)
manager.updateReputationFromResponse(peerId1, invalidResp)
check manager.getReputation(peerId1) == some(false)
test "incentivization PoC: reputation: default is None":
let unknownPeerId = "unknown_peer"
# The peer is not in the table yet
check manager.getReputation(unknownPeerId) == none(bool)
check manager.getReputation(peerId1) == none(bool)

View File

@ -9,7 +9,8 @@ import
waku/waku_lightpush,
waku/waku_lightpush/[client, common],
waku/common/rate_limit/setting,
../testlib/[common, wakucore]
../testlib/[common, wakucore],
waku/incentivization/reputation_manager
proc newTestWakuLightpushNode*(
switch: Switch,
@ -26,6 +27,9 @@ proc newTestWakuLightpushNode*(
return proto
proc newTestWakuLightpushClient*(switch: Switch): WakuLightPushClient =
proc newTestWakuLightpushClient*(
switch: Switch,
reputationEnabled: bool = false
): WakuLightPushClient =
let peerManager = PeerManager.new(switch)
WakuLightPushClient.new(peerManager, rng)
WakuLightPushClient.new(peerManager, rng, reputationEnabled)

View File

@ -14,6 +14,9 @@ import
waku_lightpush,
waku_lightpush/client,
waku_lightpush/protocol_metrics,
waku_lightpush/rpc,
waku_lightpush/rpc_codec,
/incentivization/reputation_manager,
],
../testlib/[assertions, wakucore, testasync, futures, testutils],
./lightpush_utils,
@ -22,19 +25,29 @@ import
suite "Waku Lightpush Client":
var
handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)]
handlerFutureFailsLightpush {.threadvar.}: Future[void]
handler {.threadvar.}: PushMessageHandler
handlerFailsLightpush {.threadvar.}: PushMessageHandler
serverSwitch {.threadvar.}: Switch
serverSwitchFailsLightpush {.threadvar.}: Switch
clientSwitch {.threadvar.}: Switch
server {.threadvar.}: WakuLightPush
serverFailsLightpush {.threadvar.}: WakuLightPush
client {.threadvar.}: WakuLightPushClient
serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
serverRemotePeerInfoFailsLightpush {.threadvar.}: RemotePeerInfo
clientPeerId {.threadvar.}: PeerId
pubsubTopic {.threadvar.}: PubsubTopic
contentTopic {.threadvar.}: ContentTopic
message {.threadvar.}: WakuMessage
const handlerError = "handler-error"
asyncSetup:
handlerFuture = newPushHandlerFuture()
handler = proc(
@ -45,24 +58,41 @@ suite "Waku Lightpush Client":
return
lighpushErrorResult(PAYLOAD_TOO_LARGE, "length greater than maxMessageSize")
handlerFuture.complete((pubsubTopic, message))
# return that we published the message to 1 peer.
return ok(1)
return ok()
# A Lightpush server that fails
handlerFutureFailsLightpush = newFuture[void]()
handlerFailsLightpush = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFutureFailsLightpush.complete()
return err(handlerError)
serverSwitch = newTestSwitch()
serverSwitchFailsLightpush = newTestSwitch()
clientSwitch = newTestSwitch()
server = await newTestWakuLightpushNode(serverSwitch, handler)
client = newTestWakuLightpushClient(clientSwitch)
await allFutures(serverSwitch.start(), clientSwitch.start())
server = await newTestWakuLightpushNode(serverSwitch, handler)
serverFailsLightpush =
await newTestWakuLightpushNode(serverSwitchFailsLightpush, handlerFailsLightpush)
client = newTestWakuLightpushClient(clientSwitch, reputationEnabled = true)
await allFutures(
serverSwitch.start(), serverSwitchFailsLightpush.start(), clientSwitch.start()
)
serverRemotePeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
serverRemotePeerInfoFailsLightpush =
serverSwitchFailsLightpush.peerInfo.toRemotePeerInfo()
clientPeerId = clientSwitch.peerInfo.peerId
pubsubTopic = DefaultPubsubTopic
contentTopic = DefaultContentTopic
message = fakeWakuMessage()
asyncTeardown:
await allFutures(clientSwitch.stop(), serverSwitch.stop())
await allFutures(
clientSwitch.stop(), serverSwitch.stop(), serverSwitchFailsLightpush.stop()
)
suite "Verification of PushRequest Payload":
asyncTest "Valid Payload Types":
@ -302,6 +332,7 @@ suite "Waku Lightpush Client":
# When publishing a payload
let publishResponse =
await client.publish(some(pubsubTopic), message, serverRemotePeerInfo2)
await client.publish(pubsubTopic, message, serverRemotePeerInfoFailsLightpush)
# Then the response is negative
check:
@ -311,6 +342,8 @@ suite "Waku Lightpush Client":
# Cleanup
await serverSwitch2.stop()
publishResponse.error() == handlerError
(await handlerFutureFailsLightpush.waitForResult()).isOk()
suite "Verification of PushResponse Payload":
asyncTest "Positive Responses":
@ -321,19 +354,83 @@ suite "Waku Lightpush Client":
# Then the response is positive
assertResultOk publishResponse
if client.reputationManager.isSome:
check client.reputationManager.get().getReputation(serverRemotePeerInfo.peerId) ==
some(true)
# TODO: Improve: Add more negative responses variations
asyncTest "Negative Responses":
# Given a server that does not support Waku Lightpush
let
serverSwitch2 = newTestSwitch()
serverRemotePeerInfo2 = serverSwitch2.peerInfo.toRemotePeerInfo()
serverSwitchFailsLightpush = newTestSwitch()
serverRemotePeerInfoFailsLightpush =
serverSwitchFailsLightpush.peerInfo.toRemotePeerInfo()
await serverSwitch2.start()
await serverSwitchFailsLightpush.start()
# When sending an invalid PushRequest
let publishResponse =
await client.publish(some(pubsubTopic), message, serverRemotePeerInfo2)
await client.publish(pubsubTopic, message, serverRemotePeerInfoFailsLightpush)
# Then the response is negative
check not publishResponse.isOk()
check publishResponse.error.code == LightpushStatusCode.NO_PEERS_TO_RELAY
if client.reputationManager.isSome:
check client.reputationManager.get().getReputation(
serverRemotePeerInfoFailsLightpush.peerId
) == some(false)
asyncTest "Positive Publish To Any":
# add a peer that supports the Lightpush protocol to the client's PeerManager
client.peerManager.addPeer(serverRemotePeerInfo) # supports Lightpush
# When sending a valid PushRequest using publishToAny
let publishResponse = await client.publishToAny(pubsubTopic, message)
# Then the response is positive
check publishResponse.isOk()
asyncTest "Negative Publish To Any":
# add a peer that does not support the Lightpush protocol to the client's PeerManager
client.peerManager.addPeer(serverRemotePeerInfoFailsLightpush)
# does not support Lightpush
# When sending a PushRequest using publishToAny to the only peer that doesn't support Lightpush
let publishResponse = await client.publishToAny(pubsubTopic, message)
# Then the response is negative
check not publishResponse.isOk()
asyncTest "Peer Selection for Lighpush with Reputation":
# add a peer that does not support the Lightpush protocol to the client's PeerManager
client.peerManager.addPeer(serverRemotePeerInfoFailsLightpush)
# try publishing via a failing peer
let publishResponse1 = await client.publishToAny(pubsubTopic, message)
check not publishResponse1.isOk()
if client.reputationManager.isSome:
client.reputationManager.get().setReputation(serverRemotePeerInfoFailsLightpush.peerId, some(false))
# add a peer that supports the Lightpush protocol to the client's PeerManager
client.peerManager.addPeer(serverRemotePeerInfo) # supports Lightpush
# try publishing again - this time another (good) peer will be selected
let publishResponse2 = await client.publishToAny(pubsubTopic, message)
check publishResponse2.isOk()
if client.reputationManager.isSome:
client.reputationManager.get().setReputation(serverRemotePeerInfo.peerId, some(true))
if client.reputationManager.isSome:
# the reputation of a failed peer is negative
check client.reputationManager.get().getReputation(
serverRemotePeerInfoFailsLightpush.peerId
) == some(false)
# the reputation of a successful peer is positive
check client.reputationManager.get().getReputation(serverRemotePeerInfo.peerId) ==
some(true)

View File

@ -496,6 +496,13 @@ hence would have reachability issues.""",
name: "lightpushnode"
.}: string
## Reputation config
reputationEnabled* {.
desc: "Enable client-side reputation for light protocols: true|false",
defaultValue: false,
name: "reputation"
.}: bool
## Reliability config
reliabilityEnabled* {.
desc:

View File

@ -374,6 +374,8 @@ proc setupProtocols(
else:
return err("failed to set node waku lightpush peer: " & lightPushNode.error)
## TODO: initialize reputation manager here??
# Filter setup. NOTE Must be mounted after relay
if conf.filter:
try:

View File

@ -1,9 +1,8 @@
import tables, std/options
import ../waku_lightpush_legacy/rpc
import ../waku_lightpush/[rpc, common]
import libp2p/peerid
type
PeerId = string
ResponseQuality* = enum
BadResponse
GoodResponse
@ -29,16 +28,16 @@ proc getReputation*(manager: ReputationManager, peer: PeerId): Option[bool] =
else:
result = none(bool)
# Evaluate the quality of a PushResponse by checking its isSuccess field
proc evaluateResponse*(response: PushResponse): ResponseQuality =
if response.isSuccess:
# Evaluate the quality of a LightPushResponse by checking its status code
proc evaluateResponse*(response: LightPushResponse): ResponseQuality =
if response.statusCode == LightpushStatusCode.SUCCESS.uint32:
return GoodResponse
else:
return BadResponse
# Update reputation of the peer based on the quality of the response
proc updateReputationFromResponse*(
manager: var ReputationManager, peer: PeerId, response: PushResponse
manager: var ReputationManager, peer: PeerId, response: LightPushResponse
) =
let respQuality = evaluateResponse(response)
case respQuality

View File

@ -11,7 +11,8 @@ import
./common,
./protocol_metrics,
./rpc,
./rpc_codec
./rpc_codec,
../incentivization/reputation_manager
logScope:
topics = "waku lightpush client"
@ -19,12 +20,23 @@ logScope:
type WakuLightPushClient* = ref object
peerManager*: PeerManager
rng*: ref rand.HmacDrbgContext
reputationManager*: Option[ReputationManager]
publishObservers: seq[PublishObserver]
proc new*(
T: type WakuLightPushClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext
T: type WakuLightPushClient,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
reputationEnabled: bool,
): T =
WakuLightPushClient(peerManager: peerManager, rng: rng)
let reputationManager =
if reputationEnabled:
some(ReputationManager.new())
else:
none(ReputationManager)
WakuLightPushClient(
peerManager: peerManager, rng: rng, reputationManager: reputationManager
)
proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) =
wl.publishObservers.add(obs)
@ -60,6 +72,9 @@ proc sendPushRequest(
requestId = req.requestId, responseRequestId = response.requestId
return lightpushResultInternalError("response failure, requestId mismatch")
if wl.reputationManager.isSome:
wl.reputationManager.get().updateReputationFromResponse(peer.peerId, response)
return toPushResult(response)
proc publish*(
@ -82,11 +97,36 @@ proc publish*(
)
let publishedCount = ?await wl.sendPushRequest(pushRequest, peer)
# FIXME: adapt for Lightpush v3 error reporting
if pushResult.isErr:
if wl.reputationManager.isSome:
wl.reputationManager.get().setReputation(peer.peerId, some(false))
return err(pushResult.error)
for obs in wl.publishObservers:
obs.onMessagePublished(pubSubTopic.get(""), message)
return lightpushSuccessResult(publishedCount)
proc selectPeerForLightPush*(
wl: WakuLightPushClient
): Future[Result[RemotePeerInfo, string]] {.async, gcsafe.} =
let maxAttempts = if defined(reputation): 10 else: 1
var attempts = 0
var peerResult: Result[RemotePeerInfo, string]
while attempts < maxAttempts:
let candidate = wl.peerManager.selectPeer(WakuLightPushCodec, none(PubsubTopic)).valueOr:
return err("could not retrieve a peer supporting WakuLightPushCodec")
if wl.reputationManager.isSome():
let reputation = wl.reputationManager.get().getReputation(candidate.peerId)
info "Peer selected", peerId = candidate.peerId, reputation = $reputation, attempts = $attempts
if (reputation == some(false)):
attempts += 1
continue
return ok(candidate)
warn "Maximum reputation-based retries exceeded; continuing with a bad-reputation peer."
return peerResult
proc publishToAny*(
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult] {.async, gcsafe.} =
@ -94,7 +134,6 @@ proc publishToAny*(
## we don't specify a particular peer and instead we get it from peer manager
info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr:
# TODO: check if it is matches the situation - shall we distinguish client side missing peers from server side?
return lighpushErrorResult(NO_PEERS_TO_RELAY, "no suitable remote peers")
@ -106,6 +145,14 @@ proc publishToAny*(
)
let publishedCount = ?await wl.sendPushRequest(pushRequest, peer)
# FIXME
#[
let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr:
return err("could not retrieve a peer supporting WakuLightPushCodec")
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
?await wl.sendPushRequest(pushRequest, peer)
]#
for obs in wl.publishObservers:
obs.onMessagePublished(pubSubTopic, message)