Prem Chaitanya Prathi 92ea373520
feat(mix): cover traffic with constant rate
- Integrate ConstantRateCoverTraffic from libp2p mix module with default
  totalSlots = userMessageLimit (or 2) and 10s epoch
- Add --mix-user-message-limit and --mix-disable-spam-protection CLI flags
  with corresponding MixConfBuilder accessors and MixConf fields
- Wrap mixRlnSpamProtection construction so it is skipped when spam
  protection is disabled, with a nil guard in setupSpamProtectionCallbacks
- Add waku/common/option_shims.nim restoring valueOr/withValue templates
  for std/options (removed upstream by results), and import it across
  modules that relied on the old behavior
- Sink chat2mix logs to textlines (stdout) instead of textlines[file] to
  work around a chronicles compile-time macro-eval bug under Nim 2.2.4
- Rename ExtendedKademliaDiscoveryParams -> ExtendedServiceDiscoveryParams
  to match the kad_disco -> service_discovery rename in nim-libp2p
- Bump nim-libp2p to e1bbda4f6 (PR #2243 "cover traffic with constant
  rate") and mix-rln-spam-protection-plugin to 153d0c0 (PR #5 cover
  traffic epoch change support); both pre-libp2p_mix-extraction
- Add simulations/mixnet/check_cover_traffic.sh for monitoring
  mix_cover_* / mix_slot_* metrics, plus per-node cover-traffic configs
2026-05-20 18:30:09 +05:30

82 lines
2.8 KiB
Nim

import chronicles, chronos, results
import std/options
import brokers/broker_context
import
waku/common/option_shims,
waku/node/peer_manager,
waku/waku_core,
waku/waku_lightpush/[common, client, rpc]
import ./[delivery_task, send_processor]
logScope:
topics = "send service lightpush processor"
type LightpushSendProcessor* = ref object of BaseSendProcessor
peerManager: PeerManager
lightpushClient: WakuLightPushClient
proc new*(
T: typedesc[LightpushSendProcessor],
peerManager: PeerManager,
lightpushClient: WakuLightPushClient,
brokerCtx: BrokerContext,
): T =
return
T(peerManager: peerManager, lightpushClient: lightpushClient, brokerCtx: brokerCtx)
proc isLightpushPeerAvailable(
self: LightpushSendProcessor, pubsubTopic: PubsubTopic
): bool =
return self.peerManager.selectPeer(WakuLightPushCodec, some(pubsubTopic)).isSome()
method isValidProcessor*(
self: LightpushSendProcessor, task: DeliveryTask
): bool {.gcsafe.} =
return self.isLightpushPeerAvailable(task.pubsubTopic)
method sendImpl*(
self: LightpushSendProcessor, task: DeliveryTask
): Future[void] {.async.} =
task.tryCount.inc()
info "Trying message delivery via Lightpush",
requestId = task.requestId,
msgHash = task.msgHash.to0xHex(),
tryCount = task.tryCount
let peer = self.peerManager.selectPeer(WakuLightPushCodec, some(task.pubsubTopic)).valueOr:
debug "No peer available for Lightpush, request pushed back for next round",
requestId = task.requestId
task.state = DeliveryState.NextRoundRetry
return
let numLightpushServers = (
await self.lightpushClient.publish(some(task.pubsubTopic), task.msg, peer)
).valueOr:
error "LightpushSendProcessor.sendImpl failed", error = error.desc.get($error.code)
case error.code
of LightPushErrorCode.NO_PEERS_TO_RELAY, LightPushErrorCode.TOO_MANY_REQUESTS,
LightPushErrorCode.OUT_OF_RLN_PROOF, LightPushErrorCode.SERVICE_NOT_AVAILABLE,
LightPushErrorCode.INTERNAL_SERVER_ERROR:
task.state = DeliveryState.NextRoundRetry
else:
# the message is malformed, send error
task.state = DeliveryState.FailedToDeliver
task.errorDesc = error.desc.get($error.code)
task.deliveryTime = Moment.now()
return
if numLightpushServers > 0:
info "Message propagated via Lightpush",
requestId = task.requestId, msgHash = task.msgHash.to0xHex()
task.state = DeliveryState.SuccessfullyPropagated
task.deliveryTime = Moment.now()
# TODO: with a simple retry processor it might be more accurate to say `Sent`
else:
# Controversial state, publish says ok but no peer. It should not happen.
debug "Lightpush publish returned zero peers, request pushed back for next round",
requestId = task.requestId
task.state = DeliveryState.NextRoundRetry
return