## Waku Relay module. Thin layer on top of GossipSub.
##
## See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-relay.md
## for spec.
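##
## A minimal usage sketch (illustrative only; assumes an already-configured and
## started libp2p `Switch` named `switch`, and an async context for the awaits):
##
## .. code-block:: nim
##   let relay = WakuRelay.new(switch).expect("relay creation")
##   proc handler(topic: PubsubTopic, msg: WakuMessage) {.async.} =
##     echo "message received on ", topic
##   discard relay.subscribe("/waku/2/rs/0/0", handler)
##   await relay.start()
##   discard await relay.publish(
##     "/waku/2/rs/0/0", WakuMessage(payload: toBytes("hello"))
##   )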

{.push raises: [].}

import
  std/[sequtils, sets, strformat, tables],
  stew/byteutils,
  results,
  chronos,
  chronicles,
  metrics,
  libp2p/multihash,
  libp2p/protocols/pubsub/pubsub,
  libp2p/protocols/pubsub/gossipsub,
  libp2p/protocols/pubsub/rpc/messages,
  libp2p/stream/connection,
  libp2p/switch

import ../waku_core, ./message_id, ../node/delivery_monitor/publish_observer

from ../waku_core/codecs import WakuRelayCodec
export WakuRelayCodec

logScope:
  topics = "waku relay"

# see: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#overview-of-new-parameters
const TopicParameters = TopicParams(
  topicWeight: 1,
  # p1: favours peers already in the mesh
  timeInMeshWeight: 0.01,
  timeInMeshQuantum: 1.seconds,
  timeInMeshCap: 10.0,
  # p2: rewards fast peers
  firstMessageDeliveriesWeight: 1.0,
  firstMessageDeliveriesDecay: 0.5,
  firstMessageDeliveriesCap: 10.0,
  # p3: penalizes lazy peers. disabled (zero weight) as a safe default
  meshMessageDeliveriesWeight: 0.0,
  meshMessageDeliveriesDecay: 0.0,
  meshMessageDeliveriesCap: 0,
  meshMessageDeliveriesThreshold: 0,
  meshMessageDeliveriesWindow: 0.milliseconds,
  meshMessageDeliveriesActivation: 0.seconds,
  # p3b: tracks history of prunes
  meshFailurePenaltyWeight: 0.0,
  meshFailurePenaltyDecay: 0.0,
  # p4: heavily penalizes peers sending invalid messages
  invalidMessageDeliveriesWeight: -100.0,
  invalidMessageDeliveriesDecay: 0.5,
)
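
# Rough intuition for the values above (a sketch of the GossipSub v1.1 scoring
# function, not the exact libp2p implementation): the per-topic score is
# approximately
#   topicWeight * (P1*timeInMeshWeight + P2*firstMessageDeliveriesWeight
#                  + P4*invalidMessageDeliveriesWeight)
# where P4 grows with the square of the invalid-deliveries counter, so with
# P3/P3b disabled above a single invalid message outweighs any accumulated
# positive score.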

declareCounter waku_relay_network_bytes,
  "total traffic per topic, distinct gross/net and direction",
  labels = ["topic", "type", "direction"]

# see: https://rfc.vac.dev/spec/29/#gossipsub-v10-parameters
const GossipsubParameters = GossipSubParams.init(
  pruneBackoff = chronos.minutes(1),
  unsubscribeBackoff = chronos.seconds(5),
  floodPublish = true,
  gossipFactor = 0.25,
  d = 6,
  dLow = 4,
  dHigh = 8,
  dScore = 6,
  dOut = 3,
  dLazy = 6,
  heartbeatInterval = chronos.seconds(1),
  historyLength = 6,
  historyGossip = 3,
  fanoutTTL = chronos.minutes(1),
  seenTTL = chronos.minutes(2),

  # no gossip is sent to peers below this score
  gossipThreshold = -100,

  # no self-published msgs are sent to peers below this score
  publishThreshold = -1000,

  # used to trigger disconnections + ignore peer if below this score
  graylistThreshold = -10000,

  # grafts better peers if the mesh median score drops below this. unset.
  opportunisticGraftThreshold = 0,

  # how often peer scoring is updated
  decayInterval = chronos.seconds(12),

  # below this we consider the parameter to be zero
  decayToZero = 0.01,

  # remember a peer's score for this long after it disconnects
  retainScore = chronos.minutes(10),

  # p5: application specific, unset
  appSpecificWeight = 0.0,

  # p6: penalizes peers sharing more than threshold ips
  ipColocationFactorWeight = -50.0,
  ipColocationFactorThreshold = 5.0,

  # p7: penalizes bad behaviour (weight and decay)
  behaviourPenaltyWeight = -10.0,
  behaviourPenaltyDecay = 0.986,

  # triggers disconnection of bad peers, i.e. score < graylistThreshold
  disconnectBadPeers = true,
)
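
# For reference (summary of the GossipSub v1.1 spec, not Waku-specific): d is
# the target mesh degree; at each heartbeat the mesh is grafted back up when it
# falls below dLow and pruned back down towards d when it exceeds dHigh, while
# up to dLazy non-mesh peers receive gossip (IHAVE) instead of full messages.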

type
  WakuRelayResult*[T] = Result[T, string]
  WakuRelayHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.
    gcsafe, raises: [Defect]
  .}
  WakuValidatorHandler* = proc(
    pubsubTopic: PubsubTopic, message: WakuMessage
  ): Future[ValidationResult] {.gcsafe, raises: [Defect].}
  WakuRelay* = ref object of GossipSub
    # seq of tuples: the first entry holds the validator that is called for every
    # topic, the second holds the error message returned when that validator fails
    wakuValidators: seq[tuple[handler: WakuValidatorHandler, errorMessage: string]]
    # tracks whether the ordered validator has already been attached to a pubsub topic
    validatorInserted: Table[PubsubTopic, bool]
    publishObservers: seq[PublishObserver]

proc initProtocolHandler(w: WakuRelay) =
  proc handler(conn: Connection, proto: string) {.async.} =
    ## main protocol handler that gets triggered on every
    ## connection for a protocol string
    ## e.g. ``/wakusub/0.0.1``, etc...
    debug "Incoming WakuRelay connection", connection = conn, protocol = proto

    try:
      await w.handleConn(conn, proto)
    except CancelledError:
      # This is a top-level procedure that runs as a separate task, so it
      # does not need to propagate CancelledError.
      error "Unexpected cancellation in relay handler",
        conn = conn, error = getCurrentExceptionMsg()
    except CatchableError:
      error "WakuRelay handler leaks an error",
        conn = conn, error = getCurrentExceptionMsg()

  # XXX: Handler hijack GossipSub here?
  w.handler = handler
  w.codec = WakuRelayCodec

proc logMessageInfo*(
    w: WakuRelay,
    remotePeerId: string,
    topic: string,
    msg_id_short: string,
    msg: WakuMessage,
    onRecv: bool,
) =
  let msg_hash = computeMessageHash(topic, msg).to0xHex()

  if onRecv:
    notice "received relay message",
      my_peer_id = w.switch.peerInfo.peerId,
      msg_hash = msg_hash,
      msg_id = msg_id_short,
      from_peer_id = remotePeerId,
      topic = topic,
      receivedTime = getNowInNanosecondTime(),
      payloadSizeBytes = msg.payload.len
  else:
    notice "sent relay message",
      my_peer_id = w.switch.peerInfo.peerId,
      msg_hash = msg_hash,
      msg_id = msg_id_short,
      to_peer_id = remotePeerId,
      topic = topic,
      sentTime = getNowInNanosecondTime(),
      payloadSizeBytes = msg.payload.len

proc initRelayObservers(w: WakuRelay) =
  proc decodeRpcMessageInfo(
      peer: PubSubPeer, msg: Message
  ): Result[
      tuple[msgId: string, topic: string, wakuMessage: WakuMessage, msgSize: int], void
  ] =
    let msg_id = w.msgIdProvider(msg).valueOr:
      warn "Error generating message id",
        my_peer_id = w.switch.peerInfo.peerId,
        from_peer_id = peer.peerId,
        pubsub_topic = msg.topic,
        error = $error
      return err()

    let msg_id_short = shortLog(msg_id)

    let wakuMessage = WakuMessage.decode(msg.data).valueOr:
      warn "Error decoding to Waku Message",
        my_peer_id = w.switch.peerInfo.peerId,
        msg_id = msg_id_short,
        from_peer_id = peer.peerId,
        pubsub_topic = msg.topic,
        error = $error
      return err()

    let msgSize = msg.data.len + msg.topic.len
    return ok((msg_id_short, msg.topic, wakuMessage, msgSize))

  proc updateMetrics(
      peer: PubSubPeer,
      pubsub_topic: string,
      msg: WakuMessage,
      msgSize: int,
      onRecv: bool,
  ) =
    if onRecv:
      waku_relay_network_bytes.inc(
        msgSize.int64, labelValues = [pubsub_topic, "gross", "in"]
      )
    else:
      # sent traffic can only be counted as "net"
      # TODO: if we could measure unsuccessful sends, we could also distinguish
      # gross from net on the outbound direction
      waku_relay_network_bytes.inc(
        msgSize.int64, labelValues = [pubsub_topic, "net", "out"]
      )
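
  # Accounting note (derived from the hooks below and the subscribe handler):
  # "gross in" counts every received message, including duplicates and messages
  # that later fail validation, while "net" counts only messages that passed
  # validation and are actually delivered or relayed.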

  proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
    for msg in msgs.messages:
      let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr:
        continue
      # the receive log happens in the onValidated observer, since onRecv is
      # called before any validation checks
      updateMetrics(peer, topic, wakuMessage, msgSize, onRecv = true)

  proc onValidated(peer: PubSubPeer, msg: Message, msgId: MessageId) =
    let msg_id_short = shortLog(msgId)
    let wakuMessage = WakuMessage.decode(msg.data).valueOr:
      warn "onValidated: failed decoding to Waku Message",
        my_peer_id = w.switch.peerInfo.peerId,
        msg_id = msg_id_short,
        from_peer_id = peer.peerId,
        pubsub_topic = msg.topic,
        error = $error
      return

    logMessageInfo(
      w, shortLog(peer.peerId), msg.topic, msg_id_short, wakuMessage, onRecv = true
    )

  proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
    for msg in msgs.messages:
      let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr:
        warn "onSend: failed decoding RPC info",
          my_peer_id = w.switch.peerInfo.peerId, to_peer_id = peer.peerId
        continue
      logMessageInfo(
        w, shortLog(peer.peerId), topic, msg_id_short, wakuMessage, onRecv = false
      )
      updateMetrics(peer, topic, wakuMessage, msgSize, onRecv = false)

  let administrativeObserver =
    PubSubObserver(onRecv: onRecv, onSend: onSend, onValidated: onValidated)

  w.addObserver(administrativeObserver)

proc new*(
    T: type WakuRelay, switch: Switch, maxMessageSize = int(DefaultMaxWakuMessageSize)
): WakuRelayResult[T] =
  ## maxMessageSize: maximum number of bytes allowed for a WakuMessage

  var w: WakuRelay
  try:
    w = WakuRelay.init(
      switch = switch,
      anonymize = true,
      verifySignature = false,
      sign = false,
      triggerSelf = true,
      msgIdProvider = defaultMessageIdProvider,
      maxMessageSize = maxMessageSize,
      parameters = GossipsubParameters,
    )

    procCall GossipSub(w).initPubSub()
    w.initProtocolHandler()
    w.initRelayObservers()
  except InitializationError:
    return err("initialization error: " & getCurrentExceptionMsg())

  return ok(w)

proc addValidator*(
    w: WakuRelay, handler: WakuValidatorHandler, errorMessage: string = ""
) {.gcsafe.} =
  w.wakuValidators.add((handler, errorMessage))
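
# Example of a conforming validator (hypothetical allowlist, for illustration
# only; the content topic and error string are not part of this module):
#
#   proc allowChatOnly(
#       pubsubTopic: PubsubTopic, message: WakuMessage
#   ): Future[ValidationResult] {.async.} =
#     if message.contentTopic == "/my-app/1/chat/proto":
#       return ValidationResult.Accept
#     return ValidationResult.Reject
#
#   relay.addValidator(allowChatOnly, "only chat messages are relayed")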

proc addPublishObserver*(w: WakuRelay, obs: PublishObserver) =
  ## Registers an observer notified when the API client performs a publish
  ## operation. Initially aimed at adding a layer of delivery reliability on
  ## top of store.
  w.publishObservers.add(obs)

proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
  ## Observes when a message is sent/received from the GossipSub PoV
  procCall GossipSub(w).addObserver(observer)

method start*(w: WakuRelay) {.async, base.} =
  debug "start"
  await procCall GossipSub(w).start()

method stop*(w: WakuRelay) {.async, base.} =
  debug "stop"
  await procCall GossipSub(w).stop()

proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
  GossipSub(w).topics.hasKey(topic)

proc subscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
  return toSeq(GossipSub(w).topics.keys())

proc generateOrderedValidator(w: WakuRelay): auto {.gcsafe.} =
  # rejects messages that are not WakuMessage
  let wrappedValidator = proc(
      pubsubTopic: string, message: messages.Message
  ): Future[ValidationResult] {.async.} =
    # can be optimized by checking if the message is a WakuMessage without
    # allocating memory; see the nim-libp2p protobuf library
    let msg = WakuMessage.decode(message.data).valueOr:
      error "protocol generateOrderedValidator reject decode error",
        pubsubTopic = pubsubTopic, error = $error
      return ValidationResult.Reject

    # now sequentially validate the message
    for (validator, errorMessage) in w.wakuValidators:
      let validatorRes = await validator(pubsubTopic, msg)

      if validatorRes != ValidationResult.Accept:
        let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()
        error "protocol generateOrderedValidator reject waku validator",
          msg_hash = msgHash,
          pubsubTopic = pubsubTopic,
          contentTopic = msg.contentTopic,
          validatorRes = validatorRes,
          error = errorMessage

        return validatorRes

    return ValidationResult.Accept

  return wrappedValidator

proc validateMessage*(
    w: WakuRelay, pubsubTopic: string, msg: WakuMessage
): Future[Result[void, string]] {.async.} =
  let messageSizeBytes = msg.encode().buffer.len
  let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()

  if messageSizeBytes > w.maxMessageSize:
    let message = fmt"Message size exceeded maximum of {w.maxMessageSize} bytes"
    error "too large Waku message",
      msg_hash = msgHash,
      error = message,
      messageSizeBytes = messageSizeBytes,
      maxMessageSize = w.maxMessageSize

    return err(message)

  for (validator, message) in w.wakuValidators:
    let validatorRes = await validator(pubsubTopic, msg)
    if validatorRes != ValidationResult.Accept:
      if message.len > 0:
        error "invalid Waku message", msg_hash = msgHash, error = message
        return err(message)
      else:
        ## This should never happen
        error "uncertain invalid Waku message", msg_hash = msgHash, error = message
        return err("validator failed")

  return ok()

proc subscribe*(
    w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler
): TopicHandler =
  debug "subscribe", pubsubTopic = pubsubTopic

  # We need to wrap the handler since GossipSub doesn't understand WakuMessage
  let wrappedHandler = proc(
      pubsubTopic: string, data: seq[byte]
  ): Future[void] {.gcsafe, raises: [].} =
    let decMsg = WakuMessage.decode(data)
    if decMsg.isErr():
      # fine if triggerSelf is enabled, since validators are bypassed
      error "failed to decode WakuMessage, validator passed a wrong message",
        pubsubTopic = pubsubTopic, error = decMsg.error
      let fut = newFuture[void]()
      fut.complete()
      return fut
    else:
      # this subscription handler is called once for every validated message
      # that will be relayed, hence this is the place where we can count net
      # incoming traffic
      waku_relay_network_bytes.inc(
        data.len.int64 + pubsubTopic.len.int64, labelValues = [pubsubTopic, "net", "in"]
      )

      return handler(pubsubTopic, decMsg.get())

  # Add the ordered validator to the topic.
  # This assumes that if `w.validatorInserted.hasKey(pubSubTopic)` is true, the
  # topic already has the ordered validator; otherwise this might lead to
  # unintended behaviour.
  if not w.validatorInserted.hasKey(pubSubTopic):
    procCall GossipSub(w).addValidator(pubSubTopic, w.generateOrderedValidator())
    w.validatorInserted[pubSubTopic] = true

  # set this topic's parameters for scoring
  w.topicParams[pubsubTopic] = TopicParameters

  # subscribe to the topic with our wrapped handler
  procCall GossipSub(w).subscribe(pubsubTopic, wrappedHandler)

  return wrappedHandler
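
# Note: the wrapped handler returned by `subscribe` is the one actually
# registered with GossipSub, so keep a reference to it if you later need to
# remove this specific handler via `unsubscribe`.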

proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) =
  ## Unsubscribe all handlers on this pubsub topic

  debug "unsubscribe all", pubsubTopic = pubsubTopic

  procCall GossipSub(w).unsubscribeAll(pubsubTopic)
  w.validatorInserted.del(pubsubTopic)

proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: TopicHandler) =
  ## Unsubscribe this handler on this pubsub topic

  debug "unsubscribe", pubsubTopic = pubsubTopic

  procCall GossipSub(w).unsubscribe(pubsubTopic, handler)

proc publish*(
    w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[int] {.async.} =
  let data = message.encode().buffer

  let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
  notice "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic

  let relayedPeerCount = await procCall GossipSub(w).publish(pubsubTopic, data)

  if relayedPeerCount > 0:
    for obs in w.publishObservers:
      obs.onMessagePublished(pubSubTopic, message)

  return relayedPeerCount
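
# Note for callers: `publish` returns the number of peers the message was sent
# to; 0 means the message was not relayed to anyone, which callers will
# typically want to treat as a delivery failure.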

proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] =
  ## Returns the number of peers in the mesh defined by the passed pubsub topic.
  ## The 'mesh' attribute is defined in the GossipSub ref object.

  if not w.mesh.hasKey(pubsubTopic):
    return err(
      "getNumPeersInMesh - there is no mesh peer for the given pubsub topic: " &
      pubsubTopic
    )

  let peersRes = catch:
    w.mesh[pubsubTopic]

  let peers: HashSet[PubSubPeer] = peersRes.valueOr:
    return
      err("getNumPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg)

  return ok(peers.len)

proc getNumConnectedPeers*(
    w: WakuRelay, pubsubTopic: PubsubTopic
): Result[int, string] =
  ## Returns the number of peers that are connected and subscribed to the passed
  ## pubsub topic. The 'gossipsub' attribute is defined in the GossipSub ref object.

  if pubsubTopic == "":
    ## Return all the connected peers
    var numConnPeers = 0
    for k, v in w.gossipsub:
      numConnPeers.inc(v.len)
    return ok(numConnPeers)

  if not w.gossipsub.hasKey(pubsubTopic):
    return err(
      "getNumConnectedPeers - there is no gossipsub peer for the given pubsub topic: " &
      pubsubTopic
    )

  let peersRes = catch:
    w.gossipsub[pubsubTopic]

  let peers: HashSet[PubSubPeer] = peersRes.valueOr:
    return err(
      "getNumConnectedPeers - exception accessing " & pubsubTopic & ": " & error.msg
    )

  return ok(peers.len)

proc getSubscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
  ## Returns a seq containing the current list of subscribed topics
  var topics: seq[PubsubTopic]
  for t in w.validatorInserted.keys():
    topics.add(t)
  return topics