## Waku Relay module. Thin layer on top of GossipSub.
##
## See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-relay.md
## for spec.
{.push raises: [].}

import
  std/strformat,
  stew/byteutils,
  results,
  sequtils,
  chronos,
  chronicles,
  metrics,
  libp2p/multihash,
  libp2p/protocols/pubsub/pubsub,
  libp2p/protocols/pubsub/gossipsub,
  libp2p/protocols/pubsub/rpc/messages,
  libp2p/stream/connection,
  libp2p/switch
import ../waku_core, ./message_id, ../node/delivery_monitor/publish_observer

from ../waku_core/codecs import WakuRelayCodec
export WakuRelayCodec

logScope:
  topics = "waku relay"

# Per-topic scoring parameters; `subscribe` assigns them to every topic it
# subscribes to.
# see: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#overview-of-new-parameters
const TopicParameters = TopicParams(
  topicWeight: 1,

  # p1: favours peers already in the mesh
  timeInMeshWeight: 0.01,
  timeInMeshQuantum: 1.seconds,
  timeInMeshCap: 10.0,

  # p2: rewards fast peers
  firstMessageDeliveriesWeight: 1.0,
  firstMessageDeliveriesDecay: 0.5,
  firstMessageDeliveriesCap: 10.0,

  # p3: penalizes lazy peers. safe low value
  meshMessageDeliveriesWeight: 0.0,
  meshMessageDeliveriesDecay: 0.0,
  meshMessageDeliveriesCap: 0,
  meshMessageDeliveriesThreshold: 0,
  meshMessageDeliveriesWindow: 0.milliseconds,
  meshMessageDeliveriesActivation: 0.seconds,

  # p3b: tracks history of prunes
  meshFailurePenaltyWeight: 0.0,
  meshFailurePenaltyDecay: 0.0,

  # p4: penalizes invalid messages. highly penalize
  # peers sending wrong messages
  invalidMessageDeliveriesWeight: -100.0,
  invalidMessageDeliveriesDecay: 0.5,
)

declareCounter waku_relay_network_bytes,
  "total traffic per topic, distinct gross/net and direction",
  labels = ["topic", "type", "direction"]

# see: https://rfc.vac.dev/spec/29/#gossipsub-v10-parameters
const GossipsubParameters = GossipSubParams.init(
  pruneBackoff = chronos.minutes(1),
  unsubscribeBackoff = chronos.seconds(5),
  floodPublish = true,
  gossipFactor = 0.25,
  d = 6,
  dLow = 4,
  dHigh = 8,
  dScore = 6,
  dOut = 3,
  dLazy = 6,
  heartbeatInterval = chronos.seconds(1),
  historyLength = 6,
  historyGossip = 3,
  fanoutTTL = chronos.minutes(1),
  seenTTL = chronos.minutes(2),

  # no gossip is sent to peers below this score
  gossipThreshold = -100,

  # no self-published msgs are sent to peers below this score
  publishThreshold = -1000,

  # used to trigger disconnections + ignore peer if below this score
  graylistThreshold = -10000,

  # grafts better peers if the mesh median score drops below this. unset.
  opportunisticGraftThreshold = 0,

  # how often peer scoring is updated
  decayInterval = chronos.seconds(12),

  # below this we consider the parameter to be zero
  decayToZero = 0.01,

  # remember peer score during x after it disconnects
  retainScore = chronos.minutes(10),

  # p5: application specific, unset
  appSpecificWeight = 0.0,

  # p6: penalizes peers sharing more than threshold ips
  ipColocationFactorWeight = -50.0,
  ipColocationFactorThreshold = 5.0,

  # p7: penalizes bad behaviour (weight and decay)
  behaviourPenaltyWeight = -10.0,
  behaviourPenaltyDecay = 0.986,

  # triggers disconnections of bad peers aka score

# NOTE(review): this copy of the file is truncated at this point. The rest of
# the `GossipSubParams.init(...)` call above, and everything that followed it
# up to the middle of the async validation proc below, is missing. It has
# deliberately NOT been reconstructed here; restore it from version control.
# The lines below are the surviving tail of that proc: the first one is the
# end of a condition whose beginning was lost.
  w.maxMessageSize:
    let message = fmt"Message size exceeded maximum of {w.maxMessageSize} bytes"
    error "too large Waku message",
      msg_hash = msgHash,
      error = message,
      messageSizeBytes = messageSizeBytes,
      maxMessageSize = w.maxMessageSize

    return err(message)

  # Run the registered validators in order; the first one that does not accept
  # the message ends validation with an error.
  for (validator, message) in w.wakuValidators:
    let validatorRes = await validator(pubsubTopic, msg)
    if validatorRes != ValidationResult.Accept:
      if message.len > 0:
        error "invalid Waku message", msg_hash = msgHash, error = message
        return err(message)
      else:
        ## This should never happen
        error "uncertain invalid Waku message", msg_hash = msgHash, error = message
        return err("validator failed")

  return ok()

proc subscribe*(
    w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler
): TopicHandler =
  ## Subscribes `handler` to `pubsubTopic`.
  ##
  ## The handler is wrapped in a closure that decodes the raw payload into a
  ## `WakuMessage` before calling it. The wrapping closure is returned so the
  ## caller can later pass it to `unsubscribe`.
  debug "subscribe", pubsubTopic = pubsubTopic

  # We need to wrap the handler since gossipsub doesn't understand WakuMessage
  let wrappedHandler = proc(
      pubsubTopic: string, data: seq[byte]
  ): Future[void] {.gcsafe, raises: [].} =
    let decMsg = WakuMessage.decode(data)
    if decMsg.isErr():
      # fine if triggerSelf enabled, since validators are bypassed
      error "failed to decode WakuMessage, validator passed a wrong message",
        pubsubTopic = pubsubTopic, error = decMsg.error
      # Nothing to deliver: hand back an already-completed future.
      let fut = newFuture[void]()
      fut.complete()
      return fut
    else:
      # this subscription handler is called once for every validated message
      # that will be relayed, hence this is the place we can count net incoming traffic
      waku_relay_network_bytes.inc(
        data.len.int64 + pubsubTopic.len.int64, labelValues = [pubsubTopic, "net", "in"]
      )
      return handler(pubsubTopic, decMsg.get())

  # Add the ordered validator to the topic
  # This assumes that if `w.validatorInserted.hasKey(pubsubTopic) is true`, it contains the ordered validator.
  # Otherwise this might lead to unintended behaviour.
  if not w.validatorInserted.hasKey(pubsubTopic):
    procCall GossipSub(w).addValidator(pubsubTopic, w.generateOrderedValidator())
    w.validatorInserted[pubsubTopic] = true

  # set this topic parameters for scoring
  w.topicParams[pubsubTopic] = TopicParameters

  # subscribe to the topic with our wrapped handler
  procCall GossipSub(w).subscribe(pubsubTopic, wrappedHandler)

  return wrappedHandler

proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) =
  ## Unsubscribe all handlers on this pubsub topic
  ##
  ## Also removes the topic from `validatorInserted`, so a later `subscribe`
  ## adds the ordered validator again.
  debug "unsubscribe all", pubsubTopic = pubsubTopic

  procCall GossipSub(w).unsubscribeAll(pubsubTopic)
  w.validatorInserted.del(pubsubTopic)

proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: TopicHandler) =
  ## Unsubscribe this handler on this pubsub topic
  ##
  ## `handler` is the wrapped handler value returned by `subscribe`.
  ## Unlike `unsubscribeAll`, the `validatorInserted` entry is left in place.
  debug "unsubscribe", pubsubTopic = pubsubTopic

  procCall GossipSub(w).unsubscribe(pubsubTopic, handler)

proc publish*(
    w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[int] {.async.} =
  ## Encodes `message` and publishes it on `pubsubTopic` through GossipSub.
  ##
  ## Returns the value reported by the underlying GossipSub `publish` call.
  ## The registered publish observers are notified only when that value is
  ## greater than zero.
  let data = message.encode().buffer
  let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()

  notice "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic

  let relayedPeerCount = await procCall GossipSub(w).publish(pubsubTopic, data)

  if relayedPeerCount > 0:
    for obs in w.publishObservers:
      obs.onMessagePublished(pubsubTopic, message)

  return relayedPeerCount

proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] =
  ## Returns the number of peers in a mesh defined by the passed pubsub topic.
  ## The 'mesh' attribute is defined in the GossipSub ref object.
  ##
  ## Returns an error when the topic has no entry in the mesh table.
  if not w.mesh.hasKey(pubsubTopic):
    return err(
      "getNumPeersInMesh - there is no mesh peer for the given pubsub topic: " &
        pubsubTopic
    )

  # `[]` on the table can raise; `catch` converts that into a Result so the
  # module-wide `raises: []` contract holds.
  let peersRes = catch:
    w.mesh[pubsubTopic]

  let peers: HashSet[PubSubPeer] = peersRes.valueOr:
    return
      err("getNumPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg)

  return ok(peers.len)

proc getNumConnectedPeers*(
    w: WakuRelay, pubsubTopic: PubsubTopic
): Result[int, string] =
  ## Returns the number of connected peers and subscribed to the passed pubsub topic.
  ## The 'gossipsub' attribute is defined in the GossipSub ref object.
  ##
  ## An empty `pubsubTopic` returns a total across all topics instead.
  ## Returns an error when a non-empty topic has no entry in the table.
  if pubsubTopic == "":
    ## Return all the connected peers
    # NOTE(review): this adds up the size of every per-topic peer set, so a
    # peer present under several topics is counted once per topic. Confirm
    # whether a distinct-peer count is what callers expect.
    var numConnPeers = 0
    for k, v in w.gossipsub:
      numConnPeers.inc(v.len)
    return ok(numConnPeers)

  if not w.gossipsub.hasKey(pubsubTopic):
    return err(
      "getNumConnectedPeers - there is no gossipsub peer for the given pubsub topic: " &
        pubsubTopic
    )

  # Same guarded table access as in `getNumPeersInMesh`.
  let peersRes = catch:
    w.gossipsub[pubsubTopic]

  let peers: HashSet[PubSubPeer] = peersRes.valueOr:
    return err(
      "getNumConnectedPeers - exception accessing " & pubsubTopic & ": " & error.msg
    )

  return ok(peers.len)

proc getSubscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
  ## Returns a seq containing the current list of subscribed topics
  ##
  ## The list is taken from the keys of `validatorInserted`, which `subscribe`
  ## fills and `unsubscribeAll` clears.
  return toSeq(w.validatorInserted.keys())