mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-06-06 22:19:30 +00:00
Combines five dep-and-build changes that all flow from the libp2p v2.0.0
upgrade and the move to the extracted libp2p_mix / mix-rln plugin stack:
waku.nimble:
* libp2p: ff8d51857 -> c43199378 (release/v2.0.0 tip; sha-pinned until
vacp2p cuts a v2.0.0 tag).
* Drop the bare `zlib < 0.2` cap — no longer needed by the upgraded
libp2p.
* websock: bare ">= 0.4.0" — replaces the d4cd68b URL+SHA workaround
that pinned through a libp2p commit-specific websock SHA.
* nim-json-rpc: switch to chaitanyaprem/nim-json-rpc#f05fad25 — relaxes
websock cap to allow >=0.4.0. TODO: revert to status-im/nim-json-rpc
once status-im/nim-json-rpc#277 merges and a tag is cut.
* lsquic: bare ">= 0.4.1" (drops URL form).
* Add mix-rln-spam-protection-plugin pin (23b278b4) and nim-libp2p-mix
pin (50c4ab4f — PR #14 HEAD); the plugin pins the same libp2p_mix
SHA so the diamond dep collapses to a single source.
waku/factory/waku.nim:
* Explicit HPService.setup(switch) / AutonatService.setup(switch)
calls. libp2p v2.0.0's Service lifecycle refactor (libp2p#2462)
removed switch.start's auto-setup loop, so any caller that assigns
directly to switch.services (we do) is responsible for calling
setup() themselves. Without it, AutonatService.addressMapper stays
nil and peerInfo.expandAddrs SIGSEGVs during start(). Wrapped in
try/except for ServiceSetupError so a setup failure surfaces as a
logged error rather than a crash.
Build / scripts:
* scripts/build_rln_mix.sh removed and Makefile simplified — librln
is now a single shared archive built from zerokit's `stateless`
features (no separate librln_mix archive).
* simulations/mixnet/build_setup.sh + setup_credentials.nim updated
to use librln_v2.0.2.a directly and run RLN keystore setup before
nodes start.
Validated:
* Cold local-cache nimble setup --localdeps -y.
* wakunode2 and chat2mix link cleanly.
* Mixnet roundtrip sim: [PASS] bob received message from alice.
* RLN proof generation + verification on every in-path mix node:
5 gen_called == 5 verified, 0 SPAM_PROOF_* errors.
754 lines
24 KiB
Nim
754 lines
24 KiB
Nim
## Waku Relay module. Thin layer on top of GossipSub.
##
## See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-relay.md
## for spec.
|
||
{.push raises: [].}
|
||
|
||
import
|
||
std/[strformat, strutils, sets],
|
||
stew/byteutils,
|
||
results,
|
||
sequtils,
|
||
chronos,
|
||
chronicles,
|
||
metrics,
|
||
libp2p/multihash,
|
||
libp2p/crypto/rng,
|
||
libp2p/protocols/pubsub/gossipsub,
|
||
libp2p/protocols/pubsub/rpc/messages,
|
||
libp2p/stream/connection,
|
||
libp2p/switch,
|
||
brokers/broker_context
|
||
|
||
import
|
||
waku/waku_core,
|
||
waku/node/health_monitor/topic_health,
|
||
waku/requests/health_requests,
|
||
waku/events/health_events,
|
||
./message_id,
|
||
waku/events/peer_events
|
||
|
||
from waku/waku_core/codecs import WakuRelayCodec
|
||
export WakuRelayCodec
|
||
|
||
type ShardMetrics = object
  # Running payload-size statistics, stored per pubsub topic (shard) in
  # `WakuRelay.msgMetricsPerShard`.
  count: float64 # number of messages accounted for so far
  sizeSum: float64 # accumulated payload size, in bytes
  avgSize: float64 # sizeSum / count
  maxSize: float64 # largest payload size seen so far
|
||
|
||
logScope:
  topics = "waku relay"

# Byte counter labelled by pubsub topic, accounting type ("gross"/"net") and
# direction ("in"/"out").
declareCounter(
  waku_relay_network_bytes,
  "total traffic per topic, distinct gross/net and direction",
  labels = ["topic", "type", "direction"],
)

# Per-shard payload size gauges; all three are labelled by shard.
declarePublicGauge waku_relay_total_msg_bytes_per_shard,
  "total length of messages seen per shard", labels = ["shard"]

declarePublicGauge waku_relay_max_msg_bytes_per_shard,
  "Maximum length of messages seen per shard", labels = ["shard"]

declarePublicGauge waku_relay_avg_msg_bytes_per_shard,
  "Average length of messages seen per shard", labels = ["shard"]
|
||
|
||
# Per-topic peer scoring parameters.
# see: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#overview-of-new-parameters
const TopicParameters = TopicParams(
  topicWeight: 1,

  # P1 (time in mesh): favours peers that are already in the mesh
  timeInMeshWeight: 0.01,
  timeInMeshQuantum: 1.seconds,
  timeInMeshCap: 10.0,

  # P2 (first message deliveries): rewards fast peers
  firstMessageDeliveriesWeight: 1.0,
  firstMessageDeliveriesDecay: 0.5,
  firstMessageDeliveriesCap: 10.0,

  # P3 (mesh message deliveries): penalizes lazy peers. Every value is zero
  # here, i.e. the safe low setting.
  meshMessageDeliveriesWeight: 0.0,
  meshMessageDeliveriesDecay: 0.0,
  meshMessageDeliveriesCap: 0,
  meshMessageDeliveriesThreshold: 0,
  meshMessageDeliveriesWindow: 0.milliseconds,
  meshMessageDeliveriesActivation: 0.seconds,

  # P3b (mesh failure penalty): tracks the history of prunes
  meshFailurePenaltyWeight: 0.0,
  meshFailurePenaltyDecay: 0.0,

  # P4 (invalid messages): heavily penalizes peers that send wrong messages
  invalidMessageDeliveriesWeight: -100.0,
  invalidMessageDeliveriesDecay: 0.5,
)
|
||
|
||
# GossipSub router and peer-scoring configuration used by the relay.
# see: https://rfc.vac.dev/spec/29/#gossipsub-v10-parameters
const GossipsubParameters = GossipSubParams.init(
  pruneBackoff = chronos.minutes(1),
  unsubscribeBackoff = chronos.seconds(5),
  floodPublish = true,
  gossipFactor = 0.25,
  d = 6,
  dLow = 4,
  dHigh = 8,
  dScore = 6,
  dOut = 3,
  dLazy = 6,
  heartbeatInterval = chronos.seconds(1),
  historyLength = 6,
  historyGossip = 3,
  fanoutTTL = chronos.minutes(1),
  seenTTL = chronos.minutes(2),

  # peers scoring below this receive no gossip
  gossipThreshold = -100,

  # peers scoring below this do not receive self-published messages
  publishThreshold = -1000,

  # peers scoring below this are ignored and may be disconnected
  graylistThreshold = -10000,

  # better peers are grafted when the mesh median score drops below this. unset.
  opportunisticGraftThreshold = 0,

  # interval between peer score updates
  decayInterval = chronos.seconds(12),

  # a parameter below this value is considered to be zero
  decayToZero = 0.01,

  # how long a peer score is remembered after the peer disconnects
  retainScore = chronos.minutes(10),

  # P5: application specific score, unset
  appSpecificWeight = 0.0,

  # P6: penalizes peers sharing more than `threshold` IPs
  ipColocationFactorWeight = -50.0,
  ipColocationFactorThreshold = 5.0,

  # P7: penalizes bad behaviour (weight and decay)
  behaviourPenaltyWeight = -10.0,
  behaviourPenaltyDecay = 0.986,

  # disconnect bad peers, i.e. those with score < graylistThreshold
  disconnectBadPeers = true,
)
|
||
|
||
type
  # Result alias of the relay API: errors are carried as plain strings.
  WakuRelayResult*[T] = Result[T, string]

  # Application callback receiving a decoded message for a pubsub topic.
  WakuRelayHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.
    gcsafe, raises: [Defect]
  .}

  # Validation callback run on a decoded message before it is accepted.
  WakuValidatorHandler* = proc(
    pubsubTopic: PubsubTopic, message: WakuMessage
  ): Future[ValidationResult] {.gcsafe, raises: [Defect].}

  WakuRelay* = ref object of GossipSub
    brokerCtx: BrokerContext
    peerEventListener: WakuPeerEventListener

    # Validators called for every topic, each paired with the error message
    # to be returned when that validator fails.
    wakuValidators: seq[tuple[handler: WakuValidatorHandler, errorMessage: string]]

    # Maps a topic to the validator assigned to it within pubsub.
    topicValidator: Table[PubsubTopic, ValidatorHandler]

    # Maps a topic to the TopicHandler proc in charge of attending the
    # topic's incoming message events.
    topicHandlers: Table[PubsubTopic, TopicHandler]

    topicsHealth*: Table[string, TopicHealth]
    onTopicHealthChange*: TopicHealthChangeHandler
    topicHealthLoopHandle*: Future[void]
    topicHealthUpdateEvent: AsyncEvent

    # Topics whose health must be updated on the next update event.
    topicHealthDirty: HashSet[string]

    # True when every topic must have its health refreshed on the next
    # update event.
    topicHealthCheckAll: bool

    msgMetricsPerShard*: Table[string, ShardMetrics]
|
||
|
||
# Detailed failure reasons reported when publishing a new message.
type PublishOutcome* {.pure.} = enum
  NoTopicSpecified ## the pubsub topic was empty or whitespace only
  DuplicateMessage ## NOTE(review): not produced by the code visible in this file
  NoPeersToPublish ## the message was relayed to no peer
  CannotGenerateMessageId
    ## NOTE(review): not produced by the code visible in this file
|
||
|
||
proc initProtocolHandler(w: WakuRelay) =
  ## Installs the relay's connection handler and sets its codec to
  ## `WakuRelayCodec`.
  proc onConnection(
      conn: Connection, proto: string
  ) {.async: (raises: [CancelledError]).} =
    ## Main protocol handler, triggered on every connection for a protocol
    ## string, e.g. ``/wakusub/0.0.1``.
    info "Incoming WakuRelay connection", connection = conn, protocol = proto

    try:
      await w.handleConn(conn, proto)
    except CancelledError as exc:
      # This is a top-level procedure running as a separate task, so the
      # cancellation is logged here and not propagated.
      error "Unexpected cancellation in relay handler", conn = conn, error = exc.msg
    except CatchableError as exc:
      error "WakuRelay handler leaks an error", conn = conn, error = exc.msg

  # XXX: Handler hijack GossipSub here?
  w.handler = onConnection
  w.codec = WakuRelayCodec
|
||
|
||
proc logMessageInfo*(
    w: WakuRelay,
    remotePeerId: string,
    topic: string,
    msg_id_short: string,
    msg: WakuMessage,
    onRecv: bool,
) =
  ## Debug-logs a relay message (received when `onRecv` is true, sent
  ## otherwise), then folds its payload size into the statistics kept for
  ## `topic` and publishes them through the per-shard gauges.
  let
    payloadSize = float64(msg.payload.len)
    msgHash = computeMessageHash(topic, msg).to0xHex()

  if not onRecv:
    debug "sent relay message",
      my_peer_id = w.switch.peerInfo.peerId,
      msg_hash = msgHash,
      msg_id = msg_id_short,
      to_peer_id = remotePeerId,
      topic = topic,
      contentTopic = msg.contentTopic,
      sentTime = getNowInNanosecondTime(),
      payloadSizeBytes = payloadSize
  else:
    debug "received relay message",
      my_peer_id = w.switch.peerInfo.peerId,
      msg_hash = msgHash,
      msg_id = msg_id_short,
      from_peer_id = remotePeerId,
      topic = topic,
      contentTopic = msg.contentTopic,
      receivedTime = getNowInNanosecondTime(),
      payloadSizeBytes = payloadSize

  # Start from zeroed statistics when this topic has not been seen before.
  var stats = w.msgMetricsPerShard.getOrDefault(topic, ShardMetrics())
  stats.count += 1
  stats.sizeSum += payloadSize
  stats.maxSize = max(stats.maxSize, payloadSize)
  stats.avgSize = stats.sizeSum / stats.count
  w.msgMetricsPerShard[topic] = stats

  waku_relay_max_msg_bytes_per_shard.set(stats.maxSize, labelValues = [topic])
  waku_relay_avg_msg_bytes_per_shard.set(stats.avgSize, labelValues = [topic])
  waku_relay_total_msg_bytes_per_shard.set(stats.sizeSum, labelValues = [topic])
|
||
|
||
proc initRelayObservers(w: WakuRelay) =
  ## Registers a `PubSubObserver` on the relay that keeps the traffic
  ## metrics, the message logs and the topic-health dirty set up to date.

  proc decodeRpcMessageInfo(
      peer: PubSubPeer, msg: Message
  ): Result[
      tuple[msgId: string, topic: string, wakuMessage: WakuMessage, msgSize: int], void
  ] =
    ## Extracts the short message id, topic, decoded WakuMessage and size
    ## (data length plus topic length) of `msg`. Failures are logged and
    ## reported as an empty error.
    let fullId = w.msgIdProvider(msg).valueOr:
      warn "Error generating message id",
        my_peer_id = w.switch.peerInfo.peerId,
        from_peer_id = peer.peerId,
        pubsub_topic = msg.topic,
        error = $error
      return err()

    let shortId = shortLog(fullId)

    let decoded = WakuMessage.decode(msg.data).valueOr:
      warn "Error decoding to Waku Message",
        my_peer_id = w.switch.peerInfo.peerId,
        msg_id = shortId,
        from_peer_id = peer.peerId,
        pubsub_topic = msg.topic,
        error = $error
      return err()

    return ok((shortId, msg.topic, decoded, msg.data.len + msg.topic.len))

  proc updateMetrics(
      peer: PubSubPeer,
      pubsub_topic: string,
      msg: WakuMessage,
      msgSize: int,
      onRecv: bool,
  ) =
    ## Adds `msgSize` to the traffic counter of `pubsub_topic`.
    # Received traffic is counted as "gross". Sent traffic can only be "net".
    # TODO: If we can measure unsuccessful sends would mean a possible distinction between gross/net
    let (accounting, direction) =
      if onRecv:
        ("gross", "in")
      else:
        ("net", "out")

    waku_relay_network_bytes.inc(
      msgSize.int64, labelValues = [pubsub_topic, accounting, direction]
    )

  proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
    if msgs.control.isSome():
      let ctrl = msgs.control.get()

      # Every grafted or pruned topic needs its health recomputed.
      for graft in ctrl.graft:
        w.topicHealthDirty.incl(graft.topicID)
      for prune in ctrl.prune:
        w.topicHealthDirty.incl(prune.topicID)

      if ctrl.graft.len > 0 or ctrl.prune.len > 0:
        w.topicHealthUpdateEvent.fire()

    for msg in msgs.messages:
      let decoded = decodeRpcMessageInfo(peer, msg).valueOr:
        continue
      # message receive log happens in onValidated observer as onRecv is called before checks
      updateMetrics(
        peer, decoded.topic, decoded.wakuMessage, decoded.msgSize, onRecv = true
      )

  proc onValidated(peer: PubSubPeer, msg: Message, msgId: MessageId) =
    let shortId = shortLog(msgId)
    let decoded = WakuMessage.decode(msg.data).valueOr:
      warn "onValidated: failed decoding to Waku Message",
        my_peer_id = w.switch.peerInfo.peerId,
        msg_id = shortId,
        from_peer_id = peer.peerId,
        pubsub_topic = msg.topic,
        error = $error
      return

    logMessageInfo(
      w, shortLog(peer.peerId), msg.topic, shortId, decoded, onRecv = true
    )

  proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
    for msg in msgs.messages:
      let decoded = decodeRpcMessageInfo(peer, msg).valueOr:
        warn "onSend: failed decoding RPC info",
          my_peer_id = w.switch.peerInfo.peerId, to_peer_id = peer.peerId
        continue

      logMessageInfo(
        w,
        shortLog(peer.peerId),
        decoded.topic,
        decoded.msgId,
        decoded.wakuMessage,
        onRecv = false,
      )
      updateMetrics(
        peer, decoded.topic, decoded.wakuMessage, decoded.msgSize, onRecv = false
      )

  w.addObserver(
    PubSubObserver(onRecv: onRecv, onSend: onSend, onValidated: onValidated)
  )
|
||
|
||
proc new*(
    T: type WakuRelay, switch: Switch, maxMessageSize = int(DefaultMaxWakuMessageSize)
): WakuRelayResult[T] =
  ## Builds a WakuRelay on top of `switch`: initialises the underlying
  ## GossipSub, the topic-health bookkeeping, the protocol handler, the
  ## observers and the peer-event listener.
  ##
  ## maxMessageSize: max num bytes that are allowed for the WakuMessage
  ##
  ## Returns an error string when initialisation fails or the peer-event
  ## subscription cannot be created.
  var relay: WakuRelay

  try:
    relay = WakuRelay.init(
      switch = switch,
      anonymize = true,
      verifySignature = false,
      sign = false,
      triggerSelf = true,
      msgIdProvider = defaultMessageIdProvider,
      maxMessageSize = maxMessageSize,
      # libp2p 1.15.3 made `rng` a required parameter of PubSub.init.
      rng = newRng(),
      parameters = GossipsubParameters,
    )
    relay.brokerCtx = globalBrokerContext()

    procCall GossipSub(relay).initPubSub()

    relay.topicsHealth = initTable[string, TopicHealth]()
    relay.topicHealthUpdateEvent = newAsyncEvent()
    relay.topicHealthDirty = initHashSet[string]()
    relay.topicHealthCheckAll = false

    relay.initProtocolHandler()
    relay.initRelayObservers()

    # A peer disconnection requests a health refresh of every topic.
    relay.peerEventListener = WakuPeerEvent.listen(
      relay.brokerCtx,
      proc(evt: WakuPeerEvent): Future[void] {.async: (raises: []), gcsafe.} =
        if evt.kind == WakuPeerEventKind.EventDisconnected:
          relay.topicHealthCheckAll = true
          relay.topicHealthUpdateEvent.fire()
      ,
    ).valueOr:
      return err("Failed to subscribe to peer events: " & error)
  except InitializationError as exc:
    return err("initialization error: " & exc.msg)

  return ok(relay)
|
||
|
||
proc addValidator*(
    w: WakuRelay, handler: WakuValidatorHandler, errorMessage: string = ""
) {.gcsafe.} =
  ## Appends `handler` to the relay's list of Waku validators, paired with
  ## the error message reported when that validator does not accept a message.
  w.wakuValidators.add((handler: handler, errorMessage: errorMessage))
|
||
|
||
proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
  ## Registers `observer` on the underlying GossipSub, so that it observes
  ## messages sent/received from the GossipSub point of view.
  procCall GossipSub(w).addObserver(observer)
|
||
|
||
proc getDHigh*(T: type WakuRelay): int =
  ## Returns the `dHigh` value of the relay's GossipSub parameters.
  GossipsubParameters.dHigh
|
||
|
||
proc getPubSubPeersInMesh*(
    w: WakuRelay, pubsubTopic: PubsubTopic
): Result[HashSet[PubSubPeer], string] =
  ## Returns the PubSubPeers in the mesh of the given pubsub topic.
  ## The 'mesh' attribute is defined in the GossipSub ref object.
  ##
  ## An empty `pubsubTopic` yields the peers in mesh for any pubsub topic.
  ## A topic without a mesh yields an empty set.

  if pubsubTopic == "":
    var meshPeers = initHashSet[PubSubPeer]()
    for topicMesh in w.mesh.values:
      meshPeers.incl(topicMesh)
    return ok(meshPeers)

  if pubsubTopic notin w.mesh:
    info "getPubSubPeersInMesh - there is no mesh peer for the given pubsub topic",
      pubsubTopic = pubsubTopic
    return ok(initHashSet[PubSubPeer]())

  let lookup = catch:
    w.mesh[pubsubTopic]

  let topicPeers: HashSet[PubSubPeer] = lookup.valueOr:
    return err(
      "getPubSubPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg
    )

  return ok(topicPeers)
|
||
|
||
proc getPeersInMesh*(
    w: WakuRelay, pubsubTopic: PubsubTopic = ""
): Result[seq[PeerId], string] =
  ## Returns the peerIds in the mesh of the given pubsub topic.
  ## The 'mesh' attribute is defined in the GossipSub ref object.
  ## Any error from `getPubSubPeersInMesh` is propagated unchanged.
  let meshPeers = ?w.getPubSubPeersInMesh(pubsubTopic)

  var peerIds = newSeqOfCap[PeerId](meshPeers.len)
  for meshPeer in meshPeers:
    peerIds.add(meshPeer.peerId)

  return ok(peerIds)
|
||
|
||
proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] =
  ## Returns the number of peers in the mesh of the given pubsub topic.
  let meshPeers = w.getPubSubPeersInMesh(pubsubTopic)

  if meshPeers.isErr():
    return err(
      "getNumPeersInMesh - failed retrieving peers in mesh: " & pubsubTopic & ": " &
        meshPeers.error
    )

  return ok(meshPeers.get().len)
|
||
|
||
proc calculateTopicHealth(wakuRelay: WakuRelay, topic: string): TopicHealth =
  ## Derives the health of `topic` from the number of peers in its mesh:
  ## UNHEALTHY with no peer (or when the count cannot be obtained),
  ## MINIMALLY_HEALTHY below the relay's `dLow`, SUFFICIENTLY_HEALTHY otherwise.
  let numPeersInMesh = wakuRelay.getNumPeersInMesh(topic).valueOr:
    error "Could not calculate topic health", topic = topic, error = error
    return TopicHealth.UNHEALTHY

  if numPeersInMesh < 1:
    TopicHealth.UNHEALTHY
  elif numPeersInMesh < wakuRelay.parameters.dLow:
    TopicHealth.MINIMALLY_HEALTHY
  else:
    TopicHealth.SUFFICIENTLY_HEALTHY
|
||
|
||
proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
  ## True when `topic` is present in the GossipSub topics table.
  topic in GossipSub(w).topics
|
||
|
||
proc subscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
  ## Returns the keys of the GossipSub topics table.
  GossipSub(w).topics.keys.toSeq()
|
||
|
||
proc topicsHealthLoop(w: WakuRelay) {.async.} =
  ## Background loop that recomputes topic health each time
  ## `topicHealthUpdateEvent` fires.
  ##
  ## Every pass checks either all subscribed topics (when
  ## `topicHealthCheckAll` is set) or only the topics in `topicHealthDirty`.
  ## For each topic whose health changed it stores the new value, emits
  ## `EventShardTopicHealthChange` and calls `onTopicHealthChange` when set.
  while true:
    await w.topicHealthUpdateEvent.wait()
    w.topicHealthUpdateEvent.clear()

    let topicsToCheck =
      if w.topicHealthCheckAll:
        toSeq(w.topics.keys)
      else:
        toSeq(w.topicHealthDirty)

    # Reset the pending work before processing it.
    w.topicHealthCheckAll = false
    w.topicHealthDirty.clear()

    var futs = newSeq[Future[void]]()

    for topic in topicsToCheck:
      # guard against topic being unsubscribed since fire()
      if not w.isSubscribed(topic):
        continue

      let
        oldHealth = w.topicsHealth.getOrDefault(topic, TopicHealth.UNHEALTHY)
        currentHealth = w.calculateTopicHealth(topic)

      if oldHealth == currentHealth:
        continue

      w.topicsHealth[topic] = currentHealth

      EventShardTopicHealthChange.emit(w.brokerCtx, topic, currentHealth)

      if not w.onTopicHealthChange.isNil():
        futs.add(w.onTopicHealthChange(topic, currentHealth))

    if futs.len() > 0:
      try:
        # `allFinished` completes once every future has finished and does not
        # fail when one of them fails, so each outcome is inspected explicitly
        # to avoid silently dropping callback errors.
        let finished = await allFinished(futs)
        for fut in finished:
          if fut.failed():
            warn "Error in topic health callback", error = fut.error.msg
      except CancelledError:
        break
      except CatchableError as e:
        warn "Error in topic health callback", error = e.msg

    # safety cooldown to protect from edge cases
    await sleepAsync(100.milliseconds)
|
||
|
||
method start*(w: WakuRelay) {.async: (raises: [CancelledError]).} =
  ## Starts the underlying GossipSub, then launches the topic health loop and
  ## keeps its future in `topicHealthLoopHandle`.
  info "start"

  await procCall GossipSub(w).start()

  w.topicHealthLoopHandle = w.topicsHealthLoop()
|
||
|
||
method stop*(w: WakuRelay) {.async: (raises: []).} =
  ## Stops the underlying GossipSub, drops the peer-event listener and
  ## cancels the topic health loop when one was started.
  info "stop"

  await procCall GossipSub(w).stop()
  await WakuPeerEvent.dropListener(w.brokerCtx, w.peerEventListener)

  # No loop handle means the loop was never launched.
  if w.topicHealthLoopHandle.isNil():
    return

  await w.topicHealthLoopHandle.cancelAndWait()
|
||
|
||
proc generateOrderedValidator(w: WakuRelay): ValidatorHandler {.gcsafe.} =
  ## Builds the pubsub validator: it rejects payloads that are not a
  ## WakuMessage, then runs the relay's Waku validators in order and returns
  ## the first result that is not `Accept`.
  let orderedValidator = proc(
      pubsubTopic: string, message: messages.Message
  ): Future[ValidationResult] {.async.} =
    # can be optimized by checking if the message is a WakuMessage without allocating memory
    # see nim-libp2p protobuf library
    let wakuMsg = WakuMessage.decode(message.data).valueOr:
      error "protocol generateOrderedValidator reject decode error",
        pubsubTopic = pubsubTopic, error = $error
      return ValidationResult.Reject

    # validators run sequentially, in registration order
    for entry in w.wakuValidators:
      let outcome = await entry.handler(pubsubTopic, wakuMsg)

      if outcome == ValidationResult.Accept:
        continue

      let msgHash = computeMessageHash(pubsubTopic, wakuMsg).to0xHex()
      error "protocol generateOrderedValidator reject waku validator",
        msg_hash = msgHash,
        pubsubTopic = pubsubTopic,
        contentTopic = wakuMsg.contentTopic,
        validatorRes = outcome,
        error = entry.errorMessage

      return outcome

    return ValidationResult.Accept

  return orderedValidator
|
||
|
||
proc validateMessage*(
    w: WakuRelay, pubsubTopic: string, msg: WakuMessage
): Future[Result[void, string]] {.async.} =
  ## Checks `msg` against the relay's maximum message size and then against
  ## every registered Waku validator, in order. Returns the error message of
  ## the first failing check.
  let
    messageSizeBytes = msg.encode().buffer.len
    msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()

  if messageSizeBytes > w.maxMessageSize:
    let sizeError = fmt"Message size exceeded maximum of {w.maxMessageSize} bytes"
    error "too large Waku message",
      msg_hash = msgHash,
      error = sizeError,
      messageSizeBytes = messageSizeBytes,
      maxMessageSize = w.maxMessageSize

    return err(sizeError)

  for entry in w.wakuValidators:
    let outcome = await entry.handler(pubsubTopic, msg)
    if outcome == ValidationResult.Accept:
      continue

    if entry.errorMessage.len == 0:
      ## This should never happen
      error "uncertain invalid Waku message",
        msg_hash = msgHash, error = entry.errorMessage
      return err("validator failed")

    error "invalid Waku message", msg_hash = msgHash, error = entry.errorMessage
    return err(entry.errorMessage)

  return ok()
|
||
|
||
proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler) =
  ## Subscribes to `pubsubTopic`: installs the ordered validator when the
  ## topic has none yet, sets the topic scoring parameters, registers a
  ## wrapped `handler` on GossipSub and requests a health update of the topic.
  info "subscribe", pubsubTopic = pubsubTopic

  # gossipsub doesnt understand WakuMessage, so the handler is wrapped
  let wrappedHandler = proc(
      pubsubTopic: string, data: seq[byte]
  ): Future[void] {.gcsafe, raises: [].} =
    let wakuMsg = WakuMessage.decode(data).valueOr:
      # fine if triggerSelf enabled, since validators are bypassed
      error "failed to decode WakuMessage, validator passed a wrong message",
        pubsubTopic = pubsubTopic, error = error
      let completed = newFuture[void]()
      completed.complete()
      return completed

    # this subscription handler is called once for every validated message
    # that will be relayed, hence this is the place we can count net incoming traffic
    let netBytes = data.len.int64 + pubsubTopic.len.int64
    waku_relay_network_bytes.inc(netBytes, labelValues = [pubsubTopic, "net", "in"])

    return handler(pubsubTopic, wakuMsg)

  # Add the ordered validator to the topic.
  # This assumes that an existing `w.topicValidator` entry for the topic holds
  # the ordered validator. Otherwise this might lead to unintended behaviour.
  if pubsubTopic notin w.topicValidator:
    let orderedValidator = w.generateOrderedValidator()
    procCall GossipSub(w).addValidator(pubsubTopic, orderedValidator)
    w.topicValidator[pubsubTopic] = orderedValidator

  # scoring parameters of this topic
  w.topicParams[pubsubTopic] = TopicParameters

  # subscribe to the topic with the wrapped handler
  procCall GossipSub(w).subscribe(pubsubTopic, wrappedHandler)
  w.topicHandlers[pubsubTopic] = wrappedHandler

  w.topicHealthDirty.incl(pubsubTopic)
  w.topicHealthUpdateEvent.fire()
|
||
|
||
proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) =
  ## Unsubscribes all handlers on this pubsub topic and drops the relay's
  ## bookkeeping for it (validator, handler, health and dirty flag).
  info "unsubscribe all", pubsubTopic = pubsubTopic

  procCall GossipSub(w).unsubscribeAll(pubsubTopic)

  w.topicValidator.del(pubsubTopic)
  w.topicHandlers.del(pubsubTopic)
  w.topicsHealth.del(pubsubTopic)
  w.topicHealthDirty.excl(pubsubTopic)
|
||
|
||
proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) =
  ## Removes the relay's handler and validator of `pubsubTopic` from GossipSub
  ## and drops the relay's bookkeeping for the topic. Logs an error and does
  ## nothing when the topic has no registered validator or handler.
  if pubsubTopic notin w.topicValidator:
    error "unsubscribe no validator for this topic", pubsubTopic
    return

  if pubsubTopic notin w.topicHandlers:
    error "not subscribed to the given topic", pubsubTopic
    return

  let (registeredHandler, registeredValidator) =
    try:
      (w.topicHandlers[pubsubTopic], w.topicValidator[pubsubTopic])
    except KeyError as exc:
      error "exception in unsubscribe", pubsubTopic, error = exc.msg
      return

  info "unsubscribe", pubsubTopic

  procCall GossipSub(w).unsubscribe(pubsubTopic, registeredHandler)
  procCall GossipSub(w).removeValidator(pubsubTopic, registeredValidator)

  w.topicValidator.del(pubsubTopic)
  w.topicHandlers.del(pubsubTopic)
  w.topicsHealth.del(pubsubTopic)
  w.topicHealthDirty.excl(pubsubTopic)
|
||
|
||
proc publish*(
    w: WakuRelay, pubsubTopic: PubsubTopic, wakuMessage: WakuMessage
): Future[Result[int, PublishOutcome]] {.async.} =
  ## Publishes `wakuMessage` on `pubsubTopic` through GossipSub and returns
  ## the number of peers it was relayed to.
  ##
  ## A zero timestamp is replaced with the current time before encoding.
  ## Fails with `NoTopicSpecified` for an empty or whitespace-only topic and
  ## with `NoPeersToPublish` when the message reached no peer.
  if pubsubTopic.isEmptyOrWhitespace():
    return err(PublishOutcome.NoTopicSpecified)

  var outgoing = wakuMessage
  if outgoing.timestamp == 0:
    outgoing.timestamp = getNowInNanosecondTime()

  let
    payload = outgoing.encode().buffer
    msgHash = computeMessageHash(pubsubTopic, outgoing).to0xHex()

  notice "start publish Waku message",
    msg_hash = msgHash, pubsubTopic = pubsubTopic, contentTopic = outgoing.contentTopic

  let relayedPeerCount = await procCall GossipSub(w).publish(pubsubTopic, payload)

  if relayedPeerCount > 0:
    return ok(relayedPeerCount)

  return err(PublishOutcome.NoPeersToPublish)
|
||
|
||
proc getConnectedPubSubPeers*(
    w: WakuRelay, pubsubTopic: PubsubTopic
): Result[HashSet[PubsubPeer], string] =
  ## Returns the connected peers subscribed to the passed pubsub topic.
  ## The 'gossipsub' attribute is defined in the GossipSub ref object.
  ##
  ## An empty `pubsubTopic` yields the peers of every topic. A topic with no
  ## entry in the 'gossipsub' table yields an error.

  if pubsubTopic == "":
    ## Return all the connected peers
    var connected = initHashSet[PubsubPeer]()
    for topicPeers in w.gossipsub.values:
      connected.incl(topicPeers)
    return ok(connected)

  if pubsubTopic notin w.gossipsub:
    return err(
      "getConnectedPeers - there is no gossipsub peer for the given pubsub topic: " &
        pubsubTopic
    )

  let lookup = catch:
    w.gossipsub[pubsubTopic]

  let topicPeers: HashSet[PubSubPeer] = lookup.valueOr:
    return
      err("getConnectedPeers - exception accessing " & pubsubTopic & ": " & error.msg)

  return ok(topicPeers)
|
||
|
||
proc getConnectedPeers*(
    w: WakuRelay, pubsubTopic: PubsubTopic
): Result[seq[PeerId], string] =
  ## Returns the peerIds of the connected peers subscribed to the passed
  ## pubsub topic. The 'gossipsub' attribute is defined in the GossipSub ref
  ## object. Any error from `getConnectedPubSubPeers` is propagated unchanged.
  let connected = ?w.getConnectedPubSubPeers(pubsubTopic)

  var peerIds = newSeqOfCap[PeerId](connected.len)
  for connectedPeer in connected:
    peerIds.add(connectedPeer.peerId)

  return ok(peerIds)
|
||
|
||
proc getNumConnectedPeers*(
    w: WakuRelay, pubsubTopic: PubsubTopic
): Result[int, string] =
  ## Returns the number of connected peers subscribed to the passed pubsub
  ## topic. An empty `pubsubTopic` counts the connected peers of every topic.
  ##
  ## Returns an error when the connected peers cannot be retrieved.
  let peers = w.getConnectedPubSubPeers(pubsubTopic).valueOr:
    # The message names connected peers: this proc does not look at the mesh.
    return err(
      "getNumConnectedPeers - failed retrieving connected peers: " & pubsubTopic & ": " &
        error
    )

  return ok(peers.len)
|
||
|
||
proc getSubscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
  ## Returns a seq containing the current list of subscribed topics
  # The topic keys are already usable as `PubsubTopic` (see `subscribedTopics`),
  # so no `cast` is needed.
  return PubSub(w).topics.keys.toSeq()
|