Sergei Tikhomirov e0edb717d0 fix merge issues
2025-07-10 19:42:26 +02:00

130 lines
4.8 KiB
Nim

{.push raises: [].}
import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils
import libp2p/peerid
import
../waku_core/peers,
../node/peer_manager,
../node/delivery_monitor/publish_observer,
../utils/requests,
../waku_core,
./common,
./protocol_metrics,
./rpc,
./rpc_codec,
../incentivization/[reputation_manager, eligibility_manager, rpc]
logScope:
topics = "waku lightpush client"
type WakuLightPushClient* = ref object
peerManager*: PeerManager
rng*: ref rand.HmacDrbgContext
publishObservers: seq[PublishObserver]
proc new*(
T: type WakuLightPushClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext
): T =
WakuLightPushClient(peerManager: peerManager, rng: rng)
proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) =
wl.publishObservers.add(obs)
proc sendPushRequest(
wl: WakuLightPushClient, req: LightPushRequest, peer: PeerId | RemotePeerInfo
): Future[WakuLightPushResult] {.async.} =
let connection = (await wl.peerManager.dialPeer(peer, WakuLightPushCodec)).valueOr:
waku_lightpush_v3_errors.inc(labelValues = [dialFailure])
return lighpushErrorResult(
LightPushErrorCode.NO_PEERS_TO_RELAY,
dialFailure & ": " & $peer & " is not accessible",
)
await connection.writeLP(req.encode().buffer)
var buffer: seq[byte]
try:
buffer = await connection.readLp(DefaultMaxRpcSize.int)
except LPStreamRemoteClosedError:
error "Failed to read responose from peer", error = getCurrentExceptionMsg()
if wl.peerManager.reputationManager.isSome:
wl.peerManager.reputationManager.get().setReputation(peer.peerId, some(false))
return lightpushResultInternalError(
"Failed to read response from peer: " & getCurrentExceptionMsg()
)
let response = LightpushResponse.decode(buffer).valueOr:
error "failed to decode response"
waku_lightpush_v3_errors.inc(labelValues = [decodeRpcFailure])
if wl.peerManager.reputationManager.isSome:
wl.peerManager.reputationManager.get().setReputation(peer.peerId, some(false))
return lightpushResultInternalError(decodeRpcFailure)
if response.requestId != req.requestId and
response.statusCode != LightPushErrorCode.TOO_MANY_REQUESTS:
error "response failure, requestId mismatch",
requestId = req.requestId, responseRequestId = response.requestId
if wl.peerManager.reputationManager.isSome:
wl.peerManager.reputationManager.get().setReputation(peer.peerId, some(false))
return lightpushResultInternalError("response failure, requestId mismatch")
if wl.peerManager.reputationManager.isSome:
wl.peerManager.reputationManager.get().updateReputationFromResponse(
peer.peerId, response
)
return toPushResult(response)
proc publish*(
wl: WakuLightPushClient,
pubSubTopic: Option[PubsubTopic] = none(PubsubTopic),
wakuMessage: WakuMessage,
eligibilityProof: Option[EligibilityProof] = none(EligibilityProof),
peer: PeerId | RemotePeerInfo,
): Future[WakuLightPushResult] {.async, gcsafe.} =
var message = wakuMessage
if message.timestamp == 0:
message.timestamp = getNowInNanosecondTime()
# FIXME: make more DRY
when peer is PeerId:
info "publish",
peerId = shortLog(peer),
msg_hash = computeMessageHash(pubsubTopic.get(""), message).to0xHex
else:
info "publish",
peerId = shortLog(peer.peerId),
msg_hash = computeMessageHash(pubsubTopic.get(""), message).to0xHex
# i13n POC: adding eligibilityProof to the request
let pushRequest = LightpushRequest(
requestId: generateRequestId(wl.rng), pubSubTopic: pubSubTopic, message: message, eligibilityProof: eligibilityProof
)
debug "in Lightpush client publish: created pushRequest: ", pushRequest
let publishedCount = ?await wl.sendPushRequest(pushRequest, peer)
for obs in wl.publishObservers:
obs.onMessagePublished(pubSubTopic.get(""), message)
return lightpushSuccessResult(publishedCount)
proc publishToAny*(
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, wakuMessage: WakuMessage,
eligibilityproof: Option[EligibilityProof] = none(EligibilityProof)
): Future[WakuLightPushResult] {.async, gcsafe.} =
## This proc is similar to the publish one but in this case
## we don't specify a particular peer and instead we get it from peer manager
var message = wakuMessage
if message.timestamp == 0:
message.timestamp = getNowInNanosecondTime()
info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr:
# TODO: check if it is matches the situation - shall we distinguish client side missing peers from server side?
return lighpushErrorResult(
LightPushErrorCode.NO_PEERS_TO_RELAY, "no suitable remote peers"
)
return await wl.publish(some(pubSubTopic), message, eligibilityproof, peer)