## Waku Relay module. Thin layer on top of GossipSub. ## ## See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-relay.md ## for spec. when (NimMajor, NimMinor) < (1, 4): {.push raises: [Defect].} else: {.push raises: [].} import std/strformat, stew/results, sequtils, chronos, chronicles, metrics, libp2p/multihash, libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/gossipsub, libp2p/protocols/pubsub/rpc/messages, libp2p/stream/connection, libp2p/switch import ../waku_core, ./message_id logScope: topics = "waku relay" const WakuRelayCodec* = "/vac/waku/relay/2.0.0" # see: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#overview-of-new-parameters const TopicParameters = TopicParams( topicWeight: 1, # p1: favours peers already in the mesh timeInMeshWeight: 0.01, timeInMeshQuantum: 1.seconds, timeInMeshCap: 10.0, # p2: rewards fast peers firstMessageDeliveriesWeight: 1.0, firstMessageDeliveriesDecay: 0.5, firstMessageDeliveriesCap: 10.0, # p3: penalizes lazy peers. safe low value meshMessageDeliveriesWeight: 0.0, meshMessageDeliveriesDecay: 0.0, meshMessageDeliveriesCap: 0, meshMessageDeliveriesThreshold: 0, meshMessageDeliveriesWindow: 0.milliseconds, meshMessageDeliveriesActivation: 0.seconds, # p3b: tracks history of prunes meshFailurePenaltyWeight: 0.0, meshFailurePenaltyDecay: 0.0, # p4: penalizes invalid messages. highly penalize # peers sending wrong messages invalidMessageDeliveriesWeight: -100.0, invalidMessageDeliveriesDecay: 0.5 ) # see: https://rfc.vac.dev/spec/29/#gossipsub-v10-parameters const GossipsubParameters = GossipSubParams( explicit: true, pruneBackoff: chronos.minutes(1), unsubscribeBackoff: chronos.seconds(5), floodPublish: true, gossipFactor: 0.25, d: 6, dLow: 4, dHigh: 8, dScore: 6, dOut: 3, dLazy: 6, heartbeatInterval: chronos.seconds(1), historyLength: 6, historyGossip: 3, fanoutTTL: chronos.minutes(1), seenTTL: chronos.minutes(2), # no gossip is sent to peers below this score gossipThreshold: -100, # no self-published msgs are sent to peers below this score publishThreshold: -1000, # used to trigger disconnections + ignore peer if below this score graylistThreshold: -10000, # grafts better peers if the mesh median score drops below this. unset. opportunisticGraftThreshold: 0, # how often peer scoring is updated decayInterval: chronos.seconds(12), # below this we consider the parameter to be zero decayToZero: 0.01, # remember peer score during x after it disconnects retainScore: chronos.minutes(10), # p5: application specific, unset appSpecificWeight: 0.0, # p6: penalizes peers sharing more than threshold ips ipColocationFactorWeight: -50.0, ipColocationFactorThreshold: 5.0, # p7: penalizes bad behaviour (weight and decay) behaviourPenaltyWeight: -10.0, behaviourPenaltyDecay: 0.986, # triggers disconnections of bad peers aka score w.maxMessageSize: let message = fmt"Message size exceeded maximum of {w.maxMessageSize} bytes" debug "Invalid Waku Message", error=message return err(message) for (validator, message) in w.wakuValidators: let validatorRes = await validator(pubsubTopic, msg) if validatorRes != ValidationResult.Accept: if message.len > 0: return err(message) else: return err("Validator failed") return ok() proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler): TopicHandler = debug "subscribe", pubsubTopic=pubsubTopic # We need to wrap the handler since gossipsub doesnt understand WakuMessage let wrappedHandler = proc(pubsubTopic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].} = let decMsg = WakuMessage.decode(data) if decMsg.isErr(): # fine if triggerSelf enabled, since validators are bypassed error "failed to decode WakuMessage, validator passed a wrong message", error = decMsg.error let fut = newFuture[void]() fut.complete() return fut else: return handler(pubsubTopic, decMsg.get()) # Add the ordered validator to the topic # This assumes that if `w.validatorInserted.hasKey(pubSubTopic) is true`, it contains the ordered validator. # Otherwise this might lead to unintended behaviour. if not w.validatorInserted.hasKey(pubSubTopic): procCall GossipSub(w).addValidator(pubSubTopic, w.generateOrderedValidator()) w.validatorInserted[pubSubTopic] = true # set this topic parameters for scoring w.topicParams[pubsubTopic] = TopicParameters # subscribe to the topic with our wrapped handler procCall GossipSub(w).subscribe(pubsubTopic, wrappedHandler) return wrappedHandler proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) = ## Unsubscribe all handlers on this pubsub topic debug "unsubscribe all", pubsubTopic=pubsubTopic procCall GossipSub(w).unsubscribeAll(pubsubTopic) w.validatorInserted.del(pubsubTopic) proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: TopicHandler) = ## Unsubscribe this handler on this pubsub topic debug "unsubscribe", pubsubTopic=pubsubTopic procCall GossipSub(w).unsubscribe(pubsubTopic, handler) proc publish*(w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage): Future[int] {.async.} = trace "publish", pubsubTopic=pubsubTopic let data = message.encode().buffer return await procCall GossipSub(w).publish(pubsubTopic, data)