## Waku Relay module. Thin layer on top of GossipSub.
##
## See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-relay.md
## for spec.
{.push raises: [].}

import
  std/[strformat, strutils],
  stew/byteutils,
  results,
  sequtils,
  chronos,
  chronicles,
  metrics,
  libp2p/multihash,
  libp2p/protocols/pubsub/gossipsub,
  libp2p/protocols/pubsub/rpc/messages,
  libp2p/stream/connection,
  libp2p/switch
import
  ../waku_core,
  ./message_id,
  ./topic_health,
  ../node/delivery_monitor/publish_observer

from ../waku_core/codecs import WakuRelayCodec
export WakuRelayCodec

logScope:
  topics = "waku relay"

# Per-topic peer-scoring parameters. `subscribe` assigns this same value to
# every topic it subscribes to.
# see: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#overview-of-new-parameters
const TopicParameters = TopicParams(
  topicWeight: 1,
  # p1: favours peers already in the mesh
  timeInMeshWeight: 0.01,
  timeInMeshQuantum: 1.seconds,
  timeInMeshCap: 10.0,
  # p2: rewards fast peers
  firstMessageDeliveriesWeight: 1.0,
  firstMessageDeliveriesDecay: 0.5,
  firstMessageDeliveriesCap: 10.0,
  # p3: penalizes lazy peers. safe low value
  meshMessageDeliveriesWeight: 0.0,
  meshMessageDeliveriesDecay: 0.0,
  meshMessageDeliveriesCap: 0,
  meshMessageDeliveriesThreshold: 0,
  meshMessageDeliveriesWindow: 0.milliseconds,
  meshMessageDeliveriesActivation: 0.seconds,
  # p3b: tracks history of prunes
  meshFailurePenaltyWeight: 0.0,
  meshFailurePenaltyDecay: 0.0,
  # p4: penalizes invalid messages. highly penalize
  # peers sending wrong messages
  invalidMessageDeliveriesWeight: -100.0,
  invalidMessageDeliveriesDecay: 0.5,
)

# Traffic counter labelled by topic, type and direction. In the code visible
# here it is only incremented with type "net" / direction "in" (see the
# wrapped handler in `subscribe`).
declareCounter waku_relay_network_bytes,
  "total traffic per topic, distinct gross/net and direction",
  labels = ["topic", "type", "direction"]

# see: https://rfc.vac.dev/spec/29/#gossipsub-v10-parameters
const GossipsubParameters = GossipSubParams.init(
  pruneBackoff = chronos.minutes(1),
  unsubscribeBackoff = chronos.seconds(5),
  floodPublish = true,
  gossipFactor = 0.25,
  d = 6,
  dLow = 4,
  dHigh = 8,
  dScore = 6,
  dOut = 3,
  dLazy = 6,
  heartbeatInterval = chronos.seconds(1),
  historyLength = 6,
  historyGossip = 3,
  fanoutTTL = chronos.minutes(1),
  seenTTL = chronos.minutes(2),
  # no gossip is sent to peers below this score
  gossipThreshold = -100,
  # no self-published msgs are sent to peers below this score
  publishThreshold = -1000,
  # used to trigger disconnections + ignore peer if below this score
  graylistThreshold = -10000,
  # grafts better peers if the mesh median score drops below this. unset.
  opportunisticGraftThreshold = 0,
  # how often peer scoring is updated
  decayInterval = chronos.seconds(12),
  # below this we consider the parameter to be zero
  decayToZero = 0.01,
  # remember peer score during x after it disconnects
  retainScore = chronos.minutes(10),
  # p5: application specific, unset
  appSpecificWeight = 0.0,
  # p6: penalizes peers sharing more than threshold ips
  ipColocationFactorWeight = -50.0,
  ipColocationFactorThreshold = 5.0,
  # p7: penalizes bad behaviour (weight and decay)
  behaviourPenaltyWeight = -10.0,
  behaviourPenaltyDecay = 0.986,
  # NOTE(review): the file text appears to be truncated inside the line below.
  # The remainder of `GossipsubParameters`, everything that followed it, and
  # the head of what looks like a topic-health update routine are missing from
  # this view. The damaged span is kept verbatim on one line; restore it from
  # version control instead of reconstructing it by hand.
  # triggers disconnections of bad peers aka score 0: # slow path - we have to wait for the handlers to complete try: futs = await allFinished(futs) except CancelledError: # check for errors in futures for fut in futs: if fut.failed: let err = fut.readError() warn "Error in health change handler", description = err.msg

proc topicsHealthLoop(wakuRelay: WakuRelay) {.async.} =
  ## Endless loop: awaits `updateTopicsHealth`, sleeps 10 seconds, repeats.
  ## The loop has no exit condition of its own; `stop` cancels its future.
  while true:
    await wakuRelay.updateTopicsHealth()
    await sleepAsync(10.seconds)

method start*(w: WakuRelay) {.async, base.} =
  ## Starts the underlying GossipSub, then launches the topic-health loop and
  ## keeps its future in `topicHealthLoopHandle`.
  debug "start"
  await procCall GossipSub(w).start()
  w.topicHealthLoopHandle = w.topicsHealthLoop()

method stop*(w: WakuRelay) {.async, base.} =
  ## Stops the underlying GossipSub and then, if a health loop was launched,
  ## cancels it and waits for the cancellation to finish.
  debug "stop"
  await procCall GossipSub(w).stop()

  # The handle is nil when `start` was never called.
  if not w.topicHealthLoopHandle.isNil():
    await w.topicHealthLoopHandle.cancelAndWait()

proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
  ## True when GossipSub's `topics` table has an entry for `topic`.
  GossipSub(w).topics.hasKey(topic)

proc subscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
  ## Returns the keys of GossipSub's `topics` table.
  return toSeq(GossipSub(w).topics.keys())

proc generateOrderedValidator(w: WakuRelay): ValidatorHandler {.gcsafe.} =
  ## Builds a GossipSub validator closure that first decodes the payload as a
  ## `WakuMessage` (rejecting it on failure) and then runs every entry of
  ## `w.wakuValidators` in order, returning the first non-`Accept` result.
  ## Each call returns a fresh closure.
  # rejects messages that are not WakuMessage
  let wrappedValidator = proc(
      pubsubTopic: string, message: messages.Message
  ): Future[ValidationResult] {.async.} =
    # can be optimized by checking if the message is a WakuMessage without allocating memory
    # see nim-libp2p protobuf library
    let msg = WakuMessage.decode(message.data).valueOr:
      error "protocol generateOrderedValidator reject decode error",
        pubsubTopic = pubsubTopic, error = $error
      return ValidationResult.Reject

    # now sequentially validate the message
    for (validator, errorMessage) in w.wakuValidators:
      let validatorRes = await validator(pubsubTopic, msg)

      if validatorRes != ValidationResult.Accept:
        # The hash is only computed on the failure path, for logging.
        let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()
        error "protocol generateOrderedValidator reject waku validator",
          msg_hash = msgHash,
          pubsubTopic = pubsubTopic,
          contentTopic = msg.contentTopic,
          validatorRes = validatorRes,
          error = errorMessage

        return validatorRes

    return ValidationResult.Accept

  return wrappedValidator

proc validateMessage*(
    w: WakuRelay, pubsubTopic: string, msg: WakuMessage
): Future[Result[void, string]] {.async.} =
  ## Checks `msg` against the size limit and the registered Waku validators.
  ##
  ## Returns `ok()` when the encoded message is not larger than
  ## `w.maxMessageSize` and every validator in `w.wakuValidators` answers
  ## `Accept`. Otherwise returns `err` with the size error, the failing
  ## validator's own message, or "validator failed" when that message is empty.
  let messageSizeBytes = msg.encode().buffer.len
  let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()

  if messageSizeBytes > w.maxMessageSize:
    let message = fmt"Message size exceeded maximum of {w.maxMessageSize} bytes"
    error "too large Waku message",
      msg_hash = msgHash,
      error = message,
      messageSizeBytes = messageSizeBytes,
      maxMessageSize = w.maxMessageSize

    return err(message)

  # Validators run in registration order; the first rejection wins.
  for (validator, message) in w.wakuValidators:
    let validatorRes = await validator(pubsubTopic, msg)
    if validatorRes != ValidationResult.Accept:
      if message.len > 0:
        error "invalid Waku message", msg_hash = msgHash, error = message
        return err(message)
      else:
        ## This should never happen
        error "uncertain invalid Waku message", msg_hash = msgHash, error = message
        return err("validator failed")

  return ok()

proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler) =
  ## Subscribes to `pubsubTopic`, delivering decoded `WakuMessage`s to `handler`.
  ##
  ## Side effects: installs the ordered validator for the topic if none is
  ## recorded yet, sets the topic's scoring parameters to `TopicParameters`,
  ## and records the wrapped handler in `w.topicHandlers`. Only one handler is
  ## remembered per topic; a later `subscribe` on the same topic overwrites the
  ## stored entry.
  debug "subscribe", pubsubTopic = pubsubTopic

  # We need to wrap the handler since gossipsub doesnt understand WakuMessage
  let topicHandler = proc(
      pubsubTopic: string, data: seq[byte]
  ): Future[void] {.gcsafe, raises: [].} =
    let decMsg = WakuMessage.decode(data)
    if decMsg.isErr():
      # fine if triggerSelf enabled, since validators are bypassed
      error "failed to decode WakuMessage, validator passed a wrong message",
        pubsubTopic = pubsubTopic, error = decMsg.error
      # Nothing to deliver: hand back an already-completed future.
      let fut = newFuture[void]()
      fut.complete()
      return fut
    else:
      # this subscription handler is called once for every validated message
      # that will be relayed, hence this is the place we can count net incoming traffic
      waku_relay_network_bytes.inc(
        data.len.int64 + pubsubTopic.len.int64, labelValues = [pubsubTopic, "net", "in"]
      )
      return handler(pubsubTopic, decMsg.get())

  # Add the ordered validator to the topic
  # This assumes that if `w.topicValidator.hasKey(pubSubTopic)` is true, it
  # contains the ordered validator. Otherwise this might lead to unintended
  # behaviour.
  if not w.topicValidator.hasKey(pubSubTopic):
    let newValidator = w.generateOrderedValidator()
    # Register the SAME closure instance that is stored below. `unsubscribe`
    # passes the stored instance to `removeValidator`, so registering a second,
    # freshly generated closure would leave a validator that can never be
    # removed.
    procCall GossipSub(w).addValidator(pubSubTopic, newValidator)
    w.topicValidator[pubSubTopic] = newValidator

  # set this topic parameters for scoring
  w.topicParams[pubsubTopic] = TopicParameters

  # subscribe to the topic with our wrapped handler
  procCall GossipSub(w).subscribe(pubsubTopic, topicHandler)

  w.topicHandlers[pubsubTopic] = topicHandler

proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) =
  ## Unsubscribe all handlers on this pubsub topic and forget the validator and
  ## handler recorded for it.
  debug "unsubscribe all", pubsubTopic = pubsubTopic

  procCall GossipSub(w).unsubscribeAll(pubsubTopic)
  # NOTE(review): unlike `unsubscribe`, this path does not call
  # `removeValidator` before dropping the stored validator - confirm whether
  # GossipSub's `unsubscribeAll` also removes topic validators.
  w.topicValidator.del(pubsubTopic)
  w.topicHandlers.del(pubsubTopic)

proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) =
  ## Removes the handler and validator that `subscribe` recorded for
  ## `pubsubTopic`. Logs an error and returns without changes when either of
  ## them is not recorded.
  if not w.topicValidator.hasKey(pubsubTopic):
    error "unsubscribe no validator for this topic", pubsubTopic
    return

  if not w.topicHandlers.hasKey(pubsubTopic):
    error "not subscribed to the given topic", pubsubTopic
    return

  var topicHandler: TopicHandler
  var topicValidator: ValidatorHandler
  # `[]` on a table can raise KeyError, which must be handled here because the
  # module is compiled with `{.push raises: [].}`.
  try:
    topicHandler = w.topicHandlers[pubsubTopic]
    topicValidator = w.topicValidator[pubsubTopic]
  except KeyError:
    error "exception in unsubscribe", pubsubTopic, error = getCurrentExceptionMsg()
    return

  debug "unsubscribe", pubsubTopic
  procCall GossipSub(w).unsubscribe(pubsubTopic, topicHandler)
  procCall GossipSub(w).removeValidator(pubsubTopic, topicValidator)

  w.topicValidator.del(pubsubTopic)
  w.topicHandlers.del(pubsubTopic)

proc publish*(
    w: WakuRelay, pubsubTopic: PubsubTopic, wakuMessage: WakuMessage
): Future[Result[int, PublishOutcome]] {.async.} =
  ## Publishes `wakuMessage` on `pubsubTopic` through GossipSub.
  ##
  ## Returns `err(NoTopicSpecified)` for an empty or whitespace-only topic and
  ## `err(NoPeersToPublish)` when GossipSub reports a count of zero or less.
  ## On success returns that count, after notifying every entry of
  ## `w.publishObservers`.
  if pubsubTopic.isEmptyOrWhitespace():
    return err(NoTopicSpecified)

  # Work on a copy so a missing timestamp can be filled in.
  var message = wakuMessage
  if message.timestamp == 0:
    message.timestamp = getNowInNanosecondTime()

  let data = message.encode().buffer

  let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
  notice "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic

  let relayedPeerCount = await procCall GossipSub(w).publish(pubsubTopic, data)

  if relayedPeerCount <= 0:
    return err(NoPeersToPublish)

  # Observers receive the message as published, including any filled-in
  # timestamp.
  for obs in w.publishObservers:
    obs.onMessagePublished(pubSubTopic, message)

  return ok(relayedPeerCount)

proc getConnectedPubSubPeers*(
    w: WakuRelay, pubsubTopic: PubsubTopic
): Result[HashSet[PubsubPeer], string] =
  ## Returns the set of peers recorded in the `gossipsub` table for the passed
  ## pubsub topic. An empty topic returns the union over all topics.
  ## The 'gossipsub' attribute is defined in the GossipSub ref object.
  ##
  ## Returns `err` when the table has no entry for a non-empty `pubsubTopic`.
  if pubsubTopic == "":
    ## Return all the connected peers
    var peerIds = initHashSet[PubsubPeer]()
    for topicPeers in w.gossipsub.values:
      # Merge in place instead of building a new set on every iteration.
      peerIds.incl(topicPeers)
    return ok(peerIds)

  if not w.gossipsub.hasKey(pubsubTopic):
    return err(
      "getConnectedPeers - there is no gossipsub peer for the given pubsub topic: " &
        pubsubTopic
    )

  # `catch` turns a possible KeyError into a Result.
  let peersRes = catch:
    w.gossipsub[pubsubTopic]

  let peers: HashSet[PubSubPeer] = peersRes.valueOr:
    return
      err("getConnectedPeers - exception accessing " & pubsubTopic & ": " & error.msg)

  return ok(peers)

proc getConnectedPeers*(
    w: WakuRelay, pubsubTopic: PubsubTopic
): Result[seq[PeerId], string] =
  ## Returns the peer ids of the peers reported by `getConnectedPubSubPeers`
  ## for the passed pubsub topic, forwarding its error unchanged.
  ## The 'gossipsub' attribute is defined in the GossipSub ref object.
  let peers = w.getConnectedPubSubPeers(pubsubTopic).valueOr:
    return err(error)

  let peerIds = toSeq(peers).mapIt(it.peerId)
  return ok(peerIds)

proc getNumConnectedPeers*(
    w: WakuRelay, pubsubTopic: PubsubTopic
): Result[int, string] =
  ## Returns the number of peers reported by `getConnectedPubSubPeers` for the
  ## passed pubsub topic. An empty topic counts the peers of all topics.
  let peers = w.getConnectedPubSubPeers(pubsubTopic).valueOr:
    return err(
      "getNumConnectedPeers - failed retrieving peers in mesh: " & pubsubTopic & ": " &
        error
    )

  return ok(peers.len)

proc getSubscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
  ## Returns a seq containing the current list of subscribed topics
  # No cast is needed: `subscribedTopics` returns these same keys as
  # `seq[PubsubTopic]` directly.
  return toSeq(PubSub(w).topics.keys)