mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-15 20:33:10 +00:00
integrate reputation manager into Lightpush behind a feature flag
This commit is contained in:
parent
a1901a044e
commit
b8cdd3390b
@ -46,7 +46,7 @@ suite "Waku Incentivization PoC Reputation":
|
||||
let peerId = "peerWithInvalidResponse"
|
||||
let invalidResp = PushResponse(isSuccess: false, info: none(string))
|
||||
manager.updateReputationFromResponse(peerId, invalidResp)
|
||||
check manager.getReputation(peerId) == some(false)
|
||||
check manager.getReputation(peerId) == some(false)
|
||||
|
||||
test "incentivization PoC: reputation: default is None":
|
||||
let unknownPeerId = "unknown_peer"
|
||||
|
||||
@ -8,7 +8,8 @@ import
|
||||
waku/waku_lightpush,
|
||||
waku/waku_lightpush/[client, common],
|
||||
waku/common/rate_limit/setting,
|
||||
../testlib/[common, wakucore]
|
||||
../testlib/[common, wakucore],
|
||||
waku/incentivization/reputation_manager
|
||||
|
||||
proc newTestWakuLightpushNode*(
|
||||
switch: Switch,
|
||||
@ -26,4 +27,5 @@ proc newTestWakuLightpushNode*(
|
||||
|
||||
proc newTestWakuLightpushClient*(switch: Switch): WakuLightPushClient =
|
||||
let peerManager = PeerManager.new(switch)
|
||||
WakuLightPushClient.new(peerManager, rng)
|
||||
let reputationManager = ReputationManager.new()
|
||||
WakuLightPushClient.new(peerManager, reputationManager, rng)
|
||||
|
||||
@ -1,9 +1,8 @@
|
||||
import tables, std/options
|
||||
import waku/waku_lightpush/rpc
|
||||
import libp2p/peerid
|
||||
|
||||
type
|
||||
PeerId = string
|
||||
|
||||
ResponseQuality* = enum
|
||||
BadResponse
|
||||
GoodResponse
|
||||
@ -13,17 +12,17 @@ type
|
||||
# some(false) => BadRep
|
||||
# none(bool) => unknown / not set
|
||||
ReputationManager* = ref object
|
||||
reputationOf*: Table[PeerId, Option[bool]]
|
||||
reputationOf*: Table[PeerID, Option[bool]]
|
||||
|
||||
proc init*(T: type ReputationManager): ReputationManager =
|
||||
return ReputationManager(reputationOf: initTable[PeerId, Option[bool]]())
|
||||
return ReputationManager(reputationOf: initTable[PeerID, Option[bool]]())
|
||||
|
||||
proc setReputation*(
|
||||
manager: var ReputationManager, peer: PeerId, repValue: Option[bool]
|
||||
manager: var ReputationManager, peer: PeerID, repValue: Option[bool]
|
||||
) =
|
||||
manager.reputationOf[peer] = repValue
|
||||
|
||||
proc getReputation*(manager: ReputationManager, peer: PeerId): Option[bool] =
|
||||
proc getReputation*(manager: ReputationManager, peer: PeerID): Option[bool] =
|
||||
if peer in manager.reputationOf:
|
||||
result = manager.reputationOf[peer]
|
||||
else:
|
||||
@ -38,7 +37,7 @@ proc evaluateResponse*(response: PushResponse): ResponseQuality =
|
||||
|
||||
# Update reputation of the peer based on the quality of the response
|
||||
proc updateReputationFromResponse*(
|
||||
manager: var ReputationManager, peer: PeerId, response: PushResponse
|
||||
manager: var ReputationManager, peer: PeerID, response: PushResponse
|
||||
) =
|
||||
let respQuality = evaluateResponse(response)
|
||||
case respQuality
|
||||
|
||||
@ -11,20 +11,27 @@ import
|
||||
./common,
|
||||
./protocol_metrics,
|
||||
./rpc,
|
||||
./rpc_codec
|
||||
./rpc_codec,
|
||||
../incentivization/reputation_manager
|
||||
|
||||
logScope:
|
||||
topics = "waku lightpush client"
|
||||
|
||||
type WakuLightPushClient* = ref object
|
||||
peerManager*: PeerManager
|
||||
reputationManager*: ReputationManager
|
||||
rng*: ref rand.HmacDrbgContext
|
||||
publishObservers: seq[PublishObserver]
|
||||
|
||||
proc new*(
|
||||
T: type WakuLightPushClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext
|
||||
T: type WakuLightPushClient,
|
||||
peerManager: PeerManager,
|
||||
reputationManager: ReputationManager,
|
||||
rng: ref rand.HmacDrbgContext,
|
||||
): T =
|
||||
WakuLightPushClient(peerManager: peerManager, rng: rng)
|
||||
WakuLightPushClient(
|
||||
peerManager: peerManager, reputationManager: reputationManager, rng: rng
|
||||
)
|
||||
|
||||
proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) =
|
||||
wl.publishObservers.add(obs)
|
||||
@ -65,6 +72,9 @@ proc sendPushRequest(
|
||||
else:
|
||||
return err("unknown failure")
|
||||
|
||||
when defined(reputation):
|
||||
wl.reputationManager.updateReputationFromResponse(peer.peerId, response)
|
||||
|
||||
return ok()
|
||||
|
||||
proc publish*(
|
||||
@ -82,13 +92,20 @@ proc publish*(
|
||||
obs.onMessagePublished(pubSubTopic, message)
|
||||
|
||||
notice "publishing message with lightpush",
|
||||
pubsubTopic = pubsubTopic,
|
||||
pubsubTopic = pubSubTopic,
|
||||
contentTopic = message.contentTopic,
|
||||
target_peer_id = peer.peerId,
|
||||
msg_hash = msg_hash_hex_str
|
||||
|
||||
return ok(msg_hash_hex_str)
|
||||
|
||||
proc selectPeerFromPeerManager(
|
||||
wl: WakuLightPushClient
|
||||
): Future[Result[RemotePeerInfo, string]] {.async, gcsafe.} =
|
||||
let peer = wl.peerManager.selectPeer(WakuLightPushCodec, none(PubsubTopic)).valueOr:
|
||||
return err("could not retrieve a peer supporting WakuLightPushCodec")
|
||||
return ok(peer)
|
||||
|
||||
proc publishToAny*(
|
||||
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage
|
||||
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||
@ -97,8 +114,26 @@ proc publishToAny*(
|
||||
|
||||
info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
|
||||
|
||||
let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr:
|
||||
return err("could not retrieve a peer supporting WakuLightPushCodec")
|
||||
var peer: RemotePeerInfo
|
||||
|
||||
when defined(reputation):
|
||||
const maxReputationAttempts = 10
|
||||
var attempts = 0
|
||||
while attempts < maxReputationAttempts:
|
||||
let peerResult = await wl.selectPeerFromPeerManager()
|
||||
if peerResult.isErr:
|
||||
return err(peerResult.error)
|
||||
peer = peerResult.get()
|
||||
if not (wl.reputationManager.getReputation(peer.peerId) == some(false)):
|
||||
break
|
||||
attempts += 1
|
||||
if attempts >= maxReputationAttempts:
|
||||
warn "Maximum reputation-based retries exceeded; continuing with a bad-reputation peer."
|
||||
else:
|
||||
let peerResult = await wl.selectPeerFromPeerManager()
|
||||
if peerResult.isErr:
|
||||
return err(peerResult.error)
|
||||
peer = peerResult.get()
|
||||
|
||||
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
|
||||
?await wl.sendPushRequest(pushRequest, peer)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user