2023-02-27 14:38:24 +00:00
|
|
|
|
## Waku Relay module. Thin layer on top of GossipSub.
|
|
|
|
|
##
|
|
|
|
|
## See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-relay.md
|
|
|
|
|
## for spec.
|
|
|
|
|
# Turn on exception tracking for every routine declared after this point.
# Compilers older than Nim 1.4 get `Defect` listed in the `raises` set, newer
# ones get an empty set.
when (NimMajor, NimMinor) < (1, 4):
  {.push raises: [Defect].}
else:
  {.push raises: [].}
|
|
|
|
|
|
|
|
|
|
import
|
2024-02-01 17:16:10 +00:00
|
|
|
|
std/strformat,
|
2023-02-27 14:38:24 +00:00
|
|
|
|
stew/results,
|
2023-09-05 09:05:07 +00:00
|
|
|
|
sequtils,
|
2023-02-27 14:38:24 +00:00
|
|
|
|
chronos,
|
|
|
|
|
chronicles,
|
|
|
|
|
metrics,
|
|
|
|
|
libp2p/multihash,
|
|
|
|
|
libp2p/protocols/pubsub/pubsub,
|
|
|
|
|
libp2p/protocols/pubsub/gossipsub,
|
2023-06-06 17:28:47 +00:00
|
|
|
|
libp2p/protocols/pubsub/rpc/messages,
|
2023-02-27 14:38:24 +00:00
|
|
|
|
libp2p/stream/connection,
|
|
|
|
|
libp2p/switch
|
|
|
|
|
import
|
2023-04-19 11:29:23 +00:00
|
|
|
|
../waku_core,
|
2023-02-27 14:38:24 +00:00
|
|
|
|
./message_id
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Tag the log statements of this module with the "waku relay" topic.
logScope:
  topics = "waku relay"
|
|
|
|
|
|
|
|
|
|
const
  # Protocol identifier of Waku Relay; `initProtocolHandler` installs it as
  # the codec of a `WakuRelay` instance.
  WakuRelayCodec* = "/vac/waku/relay/2.0.0"
|
|
|
|
|
|
2023-06-06 17:28:47 +00:00
|
|
|
|
# Per-topic peer scoring parameters. `subscribe` assigns this same value to
# every pubsub topic it subscribes to.
# see: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#overview-of-new-parameters
const TopicParameters = TopicParams(
  topicWeight: 1,

  # p1: favours peers already in the mesh
  timeInMeshWeight: 0.01,
  timeInMeshQuantum: 1.seconds,
  timeInMeshCap: 10.0,

  # p2: rewards fast peers
  firstMessageDeliveriesWeight: 1.0,
  firstMessageDeliveriesDecay: 0.5,
  firstMessageDeliveriesCap: 10.0,

  # p3: penalizes lazy peers. Every value is zero, chosen as a safe low
  # setting (presumably this leaves the component without effect).
  meshMessageDeliveriesWeight: 0.0,
  meshMessageDeliveriesDecay: 0.0,
  meshMessageDeliveriesCap: 0,
  meshMessageDeliveriesThreshold: 0,
  meshMessageDeliveriesWindow: 0.milliseconds,
  meshMessageDeliveriesActivation: 0.seconds,

  # p3b: tracks history of prunes. Zero weight and decay here as well.
  meshFailurePenaltyWeight: 0.0,
  meshFailurePenaltyDecay: 0.0,

  # p4: penalizes invalid messages. The large negative weight heavily
  # penalizes peers sending wrong messages.
  invalidMessageDeliveriesWeight: -100.0,
  invalidMessageDeliveriesDecay: 0.5
)
|
|
|
|
|
|
|
|
|
|
# GossipSub configuration handed to `WakuRelay.init` by `new`.
# see: https://rfc.vac.dev/spec/29/#gossipsub-v10-parameters
const GossipsubParameters = GossipSubParams(
  explicit: true,
  pruneBackoff: 1.minutes,
  unsubscribeBackoff: 5.seconds,
  floodPublish: true,
  gossipFactor: 0.25,

  # mesh degree settings
  d: 6,
  dLow: 4,
  dHigh: 8,
  dScore: 6,
  dOut: 3,
  dLazy: 6,

  # heartbeat, message history and cache lifetimes
  heartbeatInterval: 1.seconds,
  historyLength: 6,
  historyGossip: 3,
  fanoutTTL: 1.minutes,
  seenTTL: 2.minutes,

  # peers scoring below this receive no gossip
  gossipThreshold: -100,

  # peers scoring below this receive no self-published messages
  publishThreshold: -1000,

  # peers scoring below this are ignored and may be disconnected
  graylistThreshold: -10000,

  # better peers are grafted when the mesh median score drops below this.
  # unset.
  opportunisticGraftThreshold: 0,

  # period between two peer score updates
  decayInterval: 12.seconds,

  # score parameters below this value are treated as zero
  decayToZero: 0.01,

  # how long the score of a peer is remembered after it disconnects
  retainScore: 10.minutes,

  # p5: application specific, unset
  appSpecificWeight: 0.0,

  # p6: penalizes peers sharing more than threshold ips
  ipColocationFactorWeight: -50.0,
  ipColocationFactorThreshold: 5.0,

  # p7: penalizes bad behaviour (weight and decay)
  behaviourPenaltyWeight: -10.0,
  behaviourPenaltyDecay: 0.986,

  # disconnect bad peers, i.e. those with score < graylistThreshold
  disconnectBadPeers: true
)
|
2023-02-27 14:38:24 +00:00
|
|
|
|
|
|
|
|
|
type
  # Result type of this module; failures carry a human-readable message.
  WakuRelayResult*[T] = Result[T, string]
  # Application callback that `subscribe` invokes with every message it
  # managed to decode for the subscribed pubsub topic.
  WakuRelayHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.gcsafe, raises: [Defect].}
  # Validation callback. Any result other than `ValidationResult.Accept`
  # stops the validator chain.
  WakuValidatorHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[ValidationResult] {.gcsafe, raises: [Defect].}
  WakuRelay* = ref object of GossipSub
    # seq of tuples: the first entry of each tuple is a validator that is
    # called for every pubsub topic, in the order the validators were added;
    # the second entry is the error message `validateMessage` returns when
    # that validator does not accept the message
    wakuValidators: seq[tuple[handler: WakuValidatorHandler, errorMessage: string]]
    # pubsub topics for which `subscribe` has already registered the ordered
    # validator with GossipSub. Used as a set: the stored value is always
    # `true`, and `unsubscribeAll` deletes the entry.
    validatorInserted: Table[PubsubTopic, bool]
|
2023-02-27 14:38:24 +00:00
|
|
|
|
|
|
|
|
|
proc initProtocolHandler(w: WakuRelay) =
  ## Installs the connection handler and `WakuRelayCodec` on `w`.
  ##
  ## The handler forwards each incoming connection to `handleConn` and logs,
  ## instead of re-raising, any `CancelledError` or `CatchableError` that
  ## escapes from it.
  proc relayHandler(conn: Connection, proto: string) {.async.} =
    ## Called with every incoming connection and the protocol string it was
    ## opened for.
    debug "Incoming WakuRelay connection", connection=conn, protocol=proto

    try:
      await w.handleConn(conn, proto)
    except CancelledError as exc:
      # This handler runs as a separate top-level task, so the cancellation
      # does not need to be propagated; it is only logged.
      error "Unexpected cancellation in relay handler", conn=conn, error=exc.msg
    except CatchableError as exc:
      error "WakuRelay handler leaks an error", conn=conn, error=exc.msg

  # XXX: Handler hijack GossipSub here?
  w.codec = WakuRelayCodec
  w.handler = relayHandler
|
|
|
|
|
|
2024-01-03 12:11:50 +00:00
|
|
|
|
proc new*(T: type WakuRelay,
          switch: Switch,
          maxMessageSize = int(MaxWakuMessageSize)): WakuRelayResult[T] =
  ## Builds a relay on top of `switch`, initialises its pubsub state and
  ## installs the Waku Relay protocol handler.
  ##
  ## maxMessageSize: max num bytes that are allowed for the WakuMessage
  ##
  ## Returns the relay, or an error text starting with
  ## "initialization error: " when an `InitializationError` is raised.
  try:
    let relay = WakuRelay.init(
      switch = switch,
      anonymize = true,
      verifySignature = false,
      sign = false,
      triggerSelf = true,
      msgIdProvider = defaultMessageIdProvider,
      maxMessageSize = maxMessageSize,
      parameters = GossipsubParameters
    )

    procCall GossipSub(relay).initPubSub()
    relay.initProtocolHandler()

    return ok(relay)
  except InitializationError as exc:
    return err("initialization error: " & exc.msg)
|
2023-02-27 14:38:24 +00:00
|
|
|
|
|
2023-10-30 15:17:39 +00:00
|
|
|
|
proc addValidator*(w: WakuRelay,
                   handler: WakuValidatorHandler,
                   errorMessage: string = "") {.gcsafe.} =
  ## Appends `handler` to the relay's list of validators.
  ##
  ## `errorMessage` is stored next to the validator; `validateMessage` returns
  ## it when this validator does not accept a message.
  w.wakuValidators.add((handler: handler, errorMessage: errorMessage))
|
2024-01-29 15:11:26 +00:00
|
|
|
|
|
2023-02-27 14:38:24 +00:00
|
|
|
|
method start*(w: WakuRelay) {.async.} =
  ## Starts the relay by delegating to the `GossipSub` implementation of
  ## `start`.
  debug "start"
  # `procCall` dispatches statically to the base method.
  await procCall GossipSub(w).start()
|
|
|
|
|
|
|
|
|
|
method stop*(w: WakuRelay) {.async.} =
  ## Stops the relay by delegating to the `GossipSub` implementation of
  ## `stop`.
  debug "stop"
  # `procCall` dispatches statically to the base method.
  await procCall GossipSub(w).stop()
|
|
|
|
|
|
|
|
|
|
proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
  ## Returns `true` when `topic` has an entry in the GossipSub topic table.
  topic in GossipSub(w).topics
|
|
|
|
|
|
2023-09-01 13:03:59 +00:00
|
|
|
|
proc subscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
  ## Returns the keys of the GossipSub topic table as a sequence.
  let gossip = GossipSub(w)
  toSeq(gossip.topics.keys)
|
2023-02-27 14:38:24 +00:00
|
|
|
|
|
2023-09-05 09:05:07 +00:00
|
|
|
|
proc generateOrderedValidator*(w: WakuRelay): auto {.gcsafe.} =
  ## Builds the GossipSub validator that `subscribe` registers for a topic.
  ##
  ## The returned closure rejects payloads that do not decode as a
  ## `WakuMessage`. Otherwise it runs the validators of `w.wakuValidators` in
  ## order and returns the first result that is not `Accept`, or `Accept`
  ## when all of them accept.
  let orderedValidator = proc(pubsubTopic: string,
                              message: messages.Message): Future[ValidationResult] {.async.} =
    # can be optimized by checking if the message is a WakuMessage without
    # allocating memory
    # see nim-libp2p protobuf library
    let decoded = WakuMessage.decode(message.data)
    if decoded.isErr():
      trace "protocol generateOrderedValidator reject decode error", error=decoded.error
      return ValidationResult.Reject

    let wakuMsg = decoded.get()

    # the validators are awaited one after another; the first one that does
    # not accept decides the outcome
    for (validate, _) in w.wakuValidators:
      let verdict = await validate(pubsubTopic, wakuMsg)
      if verdict != ValidationResult.Accept:
        return verdict

    return ValidationResult.Accept

  return orderedValidator
|
|
|
|
|
|
2024-02-01 17:16:10 +00:00
|
|
|
|
proc isValidSize(message: WakuMessage): Future[Result[void, string]] {.async.} =
  ## Checks that the encoded `message` is not larger than
  ## `MaxWakuMessageSize`. Returns an error with a descriptive text (also
  ## logged at debug level) when it is.
  let encodedLen = uint64(message.encode().buffer.len)

  if encodedLen <= MaxWakuMessageSize:
    return ok()

  let reason = fmt"Message size exceeded maximum of {DefaultMaxWakuMessageSizeStr}"
  debug "Invalid Waku Message", error=reason
  return err(reason)
|
|
|
|
|
|
|
|
|
|
proc validateMessage*(w: WakuRelay, pubsubTopic: string, msg: WakuMessage):
    Future[Result[void, string]] {.async.} =
  ## Validates `msg` for `pubsubTopic`: first its size, then every validator
  ## of `w.wakuValidators` in order.
  ##
  ## Returns the size error, or the error message stored with the first
  ## validator that does not accept the message ("Validator failed" when that
  ## stored message is empty).
  let sizeCheck = await msg.isValidSize()
  if sizeCheck.isErr():
    return err(sizeCheck.error)

  for (validate, failureText) in w.wakuValidators:
    let verdict = await validate(pubsubTopic, msg)
    if verdict == ValidationResult.Accept:
      continue

    if failureText.len == 0:
      return err("Validator failed")
    return err(failureText)

  return ok()
|
|
|
|
|
|
2023-09-26 11:33:52 +00:00
|
|
|
|
proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler): TopicHandler =
  ## Subscribes `handler` to `pubsubTopic`.
  ##
  ## Returns the GossipSub-level handler that wraps `handler`; pass it to
  ## `unsubscribe` to remove this subscription.
  debug "subscribe", pubsubTopic=pubsubTopic

  # GossipSub delivers raw bytes, so they are decoded into a WakuMessage
  # before `handler` is called.
  let rawHandler =
    proc(pubsubTopic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].} =
      let decoded = WakuMessage.decode(data)
      if decoded.isOk():
        return handler(pubsubTopic, decoded.get())

      # fine if triggerSelf enabled, since validators are bypassed
      error "failed to decode WakuMessage, validator passed a wrong message", error = decoded.error
      # nothing to deliver: hand back an already completed future
      let done = newFuture[void]()
      done.complete()
      return done

  # Register the ordered validator once per topic.
  # An entry in `w.validatorInserted` is taken to mean that the ordered
  # validator is already registered; otherwise this might lead to unintended
  # behaviour.
  if pubsubTopic notin w.validatorInserted:
    procCall GossipSub(w).addValidator(pubsubTopic, w.generateOrderedValidator())
    w.validatorInserted[pubsubTopic] = true

  # scoring parameters for this topic
  w.topicParams[pubsubTopic] = TopicParameters

  # subscribe to the topic with the wrapping handler
  procCall GossipSub(w).subscribe(pubsubTopic, rawHandler)

  return rawHandler
|
|
|
|
|
|
|
|
|
|
proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) =
  ## Unsubscribe all handlers on this pubsub topic

  debug "unsubscribe all", pubsubTopic=pubsubTopic

  procCall GossipSub(w).unsubscribeAll(pubsubTopic)
  # Forget that the ordered validator was registered for this topic, so the
  # next `subscribe` registers one again.
  # NOTE(review): no validator is removed from GossipSub here. Confirm that
  # the base `unsubscribeAll` drops the topic's validators; otherwise a
  # re-subscribe would leave two ordered validators on the topic.
  w.validatorInserted.del(pubsubTopic)
|
2023-02-27 14:38:24 +00:00
|
|
|
|
|
2023-09-26 11:33:52 +00:00
|
|
|
|
proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: TopicHandler) =
  ## Unsubscribe this handler on this pubsub topic
  ##
  ## `handler` is a GossipSub-level `TopicHandler`, such as the value
  ## returned by `subscribe`.

  debug "unsubscribe", pubsubTopic=pubsubTopic

  # Unlike `unsubscribeAll`, this leaves `w.validatorInserted` untouched.
  procCall GossipSub(w).unsubscribe(pubsubTopic, handler)
|
|
|
|
|
|
2023-06-06 17:28:47 +00:00
|
|
|
|
proc publish*(w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage): Future[int] {.async.} =
  ## Encodes `message` and publishes the resulting bytes on `pubsubTopic`
  ## through GossipSub.
  ##
  ## Returns the value reported by the GossipSub `publish` (presumably the
  ## number of peers the message was sent to; confirm against nim-libp2p).
  trace "publish", pubsubTopic=pubsubTopic

  let
    encoded = message.encode().buffer
    sentTo = await procCall GossipSub(w).publish(pubsubTopic, encoded)

  return sentTo
|