nwaku/waku/waku_relay/protocol.nim

456 lines
15 KiB
Nim
Raw Normal View History

## Waku Relay module. Thin layer on top of GossipSub.
##
## See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-relay.md
## for spec.
{.push raises: [].}
import
std/strformat,
stew/byteutils,
results,
sequtils,
chronos,
chronicles,
metrics,
libp2p/multihash,
libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/pubsub/rpc/messages,
libp2p/stream/connection,
libp2p/switch
import ../waku_core, ./message_id, ../node/delivery_monitor/publish_observer
logScope:
topics = "waku relay"
const WakuRelayCodec* = "/vac/waku/relay/2.0.0"
# see: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#overview-of-new-parameters
const TopicParameters = TopicParams(
topicWeight: 1,
# p1: favours peers already in the mesh
timeInMeshWeight: 0.01,
timeInMeshQuantum: 1.seconds,
timeInMeshCap: 10.0,
# p2: rewards fast peers
firstMessageDeliveriesWeight: 1.0,
firstMessageDeliveriesDecay: 0.5,
firstMessageDeliveriesCap: 10.0,
# p3: penalizes lazy peers. safe low value
meshMessageDeliveriesWeight: 0.0,
meshMessageDeliveriesDecay: 0.0,
meshMessageDeliveriesCap: 0,
meshMessageDeliveriesThreshold: 0,
meshMessageDeliveriesWindow: 0.milliseconds,
meshMessageDeliveriesActivation: 0.seconds,
# p3b: tracks history of prunes
meshFailurePenaltyWeight: 0.0,
meshFailurePenaltyDecay: 0.0,
# p4: penalizes invalid messages. highly penalize
# peers sending wrong messages
invalidMessageDeliveriesWeight: -100.0,
invalidMessageDeliveriesDecay: 0.5,
)
declareCounter waku_relay_network_bytes,
"total traffic per topic, distinct gross/net and direction",
labels = ["topic", "type", "direction"]
# see: https://rfc.vac.dev/spec/29/#gossipsub-v10-parameters
const GossipsubParameters = GossipSubParams.init(
pruneBackoff = chronos.minutes(1),
unsubscribeBackoff = chronos.seconds(5),
floodPublish = true,
gossipFactor = 0.25,
d = 6,
dLow = 4,
dHigh = 8,
dScore = 6,
dOut = 3,
dLazy = 6,
heartbeatInterval = chronos.seconds(1),
historyLength = 6,
historyGossip = 3,
fanoutTTL = chronos.minutes(1),
seenTTL = chronos.minutes(2),
# no gossip is sent to peers below this score
gossipThreshold = -100,
# no self-published msgs are sent to peers below this score
publishThreshold = -1000,
# used to trigger disconnections + ignore peer if below this score
graylistThreshold = -10000,
# grafts better peers if the mesh median score drops below this. unset.
opportunisticGraftThreshold = 0,
# how often peer scoring is updated
decayInterval = chronos.seconds(12),
# below this we consider the parameter to be zero
decayToZero = 0.01,
# remember peer score during x after it disconnects
retainScore = chronos.minutes(10),
# p5: application specific, unset
appSpecificWeight = 0.0,
# p6: penalizes peers sharing more than threshold ips
ipColocationFactorWeight = -50.0,
ipColocationFactorThreshold = 5.0,
# p7: penalizes bad behaviour (weight and decay)
behaviourPenaltyWeight = -10.0,
behaviourPenaltyDecay = 0.986,
# triggers disconnections of bad peers aka score <graylistThreshold
disconnectBadPeers = true,
)
type
WakuRelayResult*[T] = Result[T, string]
WakuRelayHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.
gcsafe, raises: [Defect]
.}
WakuValidatorHandler* = proc(
pubsubTopic: PubsubTopic, message: WakuMessage
): Future[ValidationResult] {.gcsafe, raises: [Defect].}
WakuRelay* = ref object of GossipSub
# seq of tuples: the first entry in the tuple contains the validators are called for every topic
# the second entry contains the error messages to be returned when the validator fails
wakuValidators: seq[tuple[handler: WakuValidatorHandler, errorMessage: string]]
# a map of validators to error messages to return when validation fails
validatorInserted: Table[PubsubTopic, bool]
publishObservers: seq[PublishObserver]
proc initProtocolHandler(w: WakuRelay) =
proc handler(conn: Connection, proto: string) {.async.} =
## main protocol handler that gets triggered on every
## connection for a protocol string
## e.g. ``/wakusub/0.0.1``, etc...
debug "Incoming WakuRelay connection", connection = conn, protocol = proto
try:
await w.handleConn(conn, proto)
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
error "Unexpected cancellation in relay handler",
conn = conn, error = getCurrentExceptionMsg()
except CatchableError:
error "WakuRelay handler leaks an error",
conn = conn, error = getCurrentExceptionMsg()
# XXX: Handler hijack GossipSub here?
w.handler = handler
w.codec = WakuRelayCodec
proc logMessageInfo*(
w: WakuRelay,
remotePeerId: string,
topic: string,
msg_id_short: string,
msg: WakuMessage,
onRecv: bool,
) =
let msg_hash = computeMessageHash(topic, msg).to0xHex()
if onRecv:
notice "received relay message",
my_peer_id = w.switch.peerInfo.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
from_peer_id = remotePeerId,
topic = topic,
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len
else:
notice "sent relay message",
my_peer_id = w.switch.peerInfo.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
to_peer_id = remotePeerId,
topic = topic,
sentTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len
proc initRelayObservers(w: WakuRelay) =
proc decodeRpcMessageInfo(
peer: PubSubPeer, msg: Message
): Result[
tuple[msgId: string, topic: string, wakuMessage: WakuMessage, msgSize: int], void
] =
let msg_id = w.msgIdProvider(msg).valueOr:
warn "Error generating message id",
my_peer_id = w.switch.peerInfo.peerId,
from_peer_id = peer.peerId,
pubsub_topic = msg.topic,
error = $error
return err()
let msg_id_short = shortLog(msg_id)
let wakuMessage = WakuMessage.decode(msg.data).valueOr:
warn "Error decoding to Waku Message",
my_peer_id = w.switch.peerInfo.peerId,
msg_id = msg_id_short,
from_peer_id = peer.peerId,
pubsub_topic = msg.topic,
error = $error
return err()
let msgSize = msg.data.len + msg.topic.len
return ok((msg_id_short, msg.topic, wakuMessage, msgSize))
proc updateMetrics(
peer: PubSubPeer,
pubsub_topic: string,
msg: WakuMessage,
msgSize: int,
onRecv: bool,
) =
if onRecv:
waku_relay_network_bytes.inc(
msgSize.int64, labelValues = [pubsub_topic, "gross", "in"]
)
else:
# sent traffic can only be "net"
# TODO: If we can measure unsuccessful sends would mean a possible distinction between gross/net
waku_relay_network_bytes.inc(
msgSize.int64, labelValues = [pubsub_topic, "net", "out"]
)
proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
for msg in msgs.messages:
let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr:
continue
# message receive log happens in onValidated observer as onRecv is called before checks
updateMetrics(peer, topic, wakuMessage, msgSize, onRecv = true)
discard
proc onValidated(peer: PubSubPeer, msg: Message, msgId: MessageId) =
let msg_id_short = shortLog(msgId)
let wakuMessage = WakuMessage.decode(msg.data).valueOr:
warn "onValidated: failed decoding to Waku Message",
my_peer_id = w.switch.peerInfo.peerId,
msg_id = msg_id_short,
from_peer_id = peer.peerId,
pubsub_topic = msg.topic,
error = $error
return
logMessageInfo(
w, shortLog(peer.peerId), msg.topic, msg_id_short, wakuMessage, onRecv = true
)
proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
for msg in msgs.messages:
let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr:
warn "onSend: failed decoding RPC info",
my_peer_id = w.switch.peerInfo.peerId, to_peer_id = peer.peerId
continue
logMessageInfo(
w, shortLog(peer.peerId), topic, msg_id_short, wakuMessage, onRecv = false
)
updateMetrics(peer, topic, wakuMessage, msgSize, onRecv = false)
let administrativeObserver =
PubSubObserver(onRecv: onRecv, onSend: onSend, onValidated: onValidated)
w.addObserver(administrativeObserver)
proc new*(
T: type WakuRelay, switch: Switch, maxMessageSize = int(DefaultMaxWakuMessageSize)
): WakuRelayResult[T] =
## maxMessageSize: max num bytes that are allowed for the WakuMessage
var w: WakuRelay
try:
w = WakuRelay.init(
switch = switch,
anonymize = true,
verifySignature = false,
sign = false,
triggerSelf = true,
msgIdProvider = defaultMessageIdProvider,
maxMessageSize = maxMessageSize,
parameters = GossipsubParameters,
)
procCall GossipSub(w).initPubSub()
w.initProtocolHandler()
w.initRelayObservers()
except InitializationError:
return err("initialization error: " & getCurrentExceptionMsg())
return ok(w)
proc addValidator*(
w: WakuRelay, handler: WakuValidatorHandler, errorMessage: string = ""
) {.gcsafe.} =
w.wakuValidators.add((handler, errorMessage))
proc addPublishObserver*(w: WakuRelay, obs: PublishObserver) =
## Observer when the api client performed a publish operation. This
## is initially aimed for bringing an additional layer of delivery reliability thanks
## to store
w.publishObservers.add(obs)
proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
## Observes when a message is sent/received from the GossipSub PoV
procCall GossipSub(w).addObserver(observer)
method start*(w: WakuRelay) {.async, base.} =
debug "start"
await procCall GossipSub(w).start()
method stop*(w: WakuRelay) {.async, base.} =
debug "stop"
await procCall GossipSub(w).stop()
proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
GossipSub(w).topics.hasKey(topic)
proc subscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
return toSeq(GossipSub(w).topics.keys())
proc generateOrderedValidator(w: WakuRelay): auto {.gcsafe.} =
# rejects messages that are not WakuMessage
let wrappedValidator = proc(
pubsubTopic: string, message: messages.Message
): Future[ValidationResult] {.async.} =
# can be optimized by checking if the message is a WakuMessage without allocating memory
# see nim-libp2p protobuf library
let msg = WakuMessage.decode(message.data).valueOr:
error "protocol generateOrderedValidator reject decode error",
pubsubTopic = pubsubTopic, error = $error
return ValidationResult.Reject
# now sequentially validate the message
for (validator, errorMessage) in w.wakuValidators:
let validatorRes = await validator(pubsubTopic, msg)
if validatorRes != ValidationResult.Accept:
let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()
error "protocol generateOrderedValidator reject waku validator",
msg_hash = msgHash,
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
validatorRes = validatorRes,
error = errorMessage
return validatorRes
return ValidationResult.Accept
return wrappedValidator
proc validateMessage*(
w: WakuRelay, pubsubTopic: string, msg: WakuMessage
): Future[Result[void, string]] {.async.} =
let messageSizeBytes = msg.encode().buffer.len
let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()
if messageSizeBytes > w.maxMessageSize:
let message = fmt"Message size exceeded maximum of {w.maxMessageSize} bytes"
error "too large Waku message",
msg_hash = msgHash,
error = message,
messageSizeBytes = messageSizeBytes,
maxMessageSize = w.maxMessageSize
return err(message)
for (validator, message) in w.wakuValidators:
let validatorRes = await validator(pubsubTopic, msg)
if validatorRes != ValidationResult.Accept:
if message.len > 0:
error "invalid Waku message", msg_hash = msgHash, error = message
return err(message)
else:
## This should never happen
error "uncertain invalid Waku message", msg_hash = msgHash, error = message
return err("validator failed")
return ok()
proc subscribe*(
w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler
): TopicHandler =
debug "subscribe", pubsubTopic = pubsubTopic
# We need to wrap the handler since gossipsub doesnt understand WakuMessage
let wrappedHandler = proc(
pubsubTopic: string, data: seq[byte]
): Future[void] {.gcsafe, raises: [].} =
let decMsg = WakuMessage.decode(data)
if decMsg.isErr():
# fine if triggerSelf enabled, since validators are bypassed
error "failed to decode WakuMessage, validator passed a wrong message",
pubsubTopic = pubsubTopic, error = decMsg.error
let fut = newFuture[void]()
fut.complete()
return fut
else:
# this subscription handler is called once for every validated message
# that will be relayed, hence this is the place we can count net incoming traffic
waku_relay_network_bytes.inc(
data.len.int64 + pubsubTopic.len.int64, labelValues = [pubsubTopic, "net", "in"]
)
return handler(pubsubTopic, decMsg.get())
# Add the ordered validator to the topic
# This assumes that if `w.validatorInserted.hasKey(pubSubTopic) is true`, it contains the ordered validator.
# Otherwise this might lead to unintended behaviour.
if not w.validatorInserted.hasKey(pubSubTopic):
procCall GossipSub(w).addValidator(pubSubTopic, w.generateOrderedValidator())
w.validatorInserted[pubSubTopic] = true
# set this topic parameters for scoring
w.topicParams[pubsubTopic] = TopicParameters
# subscribe to the topic with our wrapped handler
procCall GossipSub(w).subscribe(pubsubTopic, wrappedHandler)
return wrappedHandler
proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) =
## Unsubscribe all handlers on this pubsub topic
debug "unsubscribe all", pubsubTopic = pubsubTopic
procCall GossipSub(w).unsubscribeAll(pubsubTopic)
w.validatorInserted.del(pubsubTopic)
proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: TopicHandler) =
## Unsubscribe this handler on this pubsub topic
debug "unsubscribe", pubsubTopic = pubsubTopic
procCall GossipSub(w).unsubscribe(pubsubTopic, handler)
proc publish*(
w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[int] {.async.} =
let data = message.encode().buffer
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
notice "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic
let relayedPeerCount = await procCall GossipSub(w).publish(pubsubTopic, data)
if relayedPeerCount > 0:
for obs in w.publishObservers:
obs.onMessagePublished(pubSubTopic, message)
return relayedPeerCount