diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 3a534d42e..b30f82ae0 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -343,14 +343,16 @@ proc publish*(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async, discard await node.wakuRelay.publish(topic, data) proc startRelay*(node: WakuNode) {.async.} = + ## Setup and start relay protocol + info "starting relay protocol" + if node.wakuRelay.isNil(): trace "Failed to start relay. Not mounted." return - ## Setup and start relay protocol - info "starting relay" + ## Setup relay protocol - # PubsubTopic subscriptions + # Subscribe to the default PubSub topics for topic in node.wakuRelay.defaultPubsubTopics: node.subscribe(topic, none(TopicHandler)) @@ -371,46 +373,33 @@ proc startRelay*(node: WakuNode) {.async.} = info "relay started successfully" proc mountRelay*(node: WakuNode, - topics: seq[string] = newSeq[string](), + topics: seq[string] = @[], triggerSelf = true, - peerExchangeHandler = none(RoutingRecordsHandler)) - # TODO: Better error handling: CatchableError is raised by `waitFor` - {.async, gcsafe, raises: [Defect, InitializationError, LPError, CatchableError].} = - - proc msgIdProvider(m: messages.Message): Result[MessageID, ValidationResult] = - let mh = MultiHash.digest("sha2-256", m.data) - if mh.isOk(): - return ok(mh[].data.buffer) - else: - return ok(($m.data.hash).toBytes()) - - let wakuRelay = WakuRelay.init( - switch = node.switch, - msgIdProvider = msgIdProvider, - triggerSelf = triggerSelf, - sign = false, - verifySignature = false, - maxMessageSize = MaxWakuMessageSize - ) + peerExchangeHandler = none(RoutingRecordsHandler)) {.async, gcsafe.} = + ## The default relay topics is the union of all configured topics plus default PubsubTopic(s) + info "mounting relay protocol" - info "mounting relay" + let initRes = WakuRelay.new( + node.peerManager, + defaultPubsubTopics = concat(@[DefaultPubsubTopic], topics), + triggerSelf = triggerSelf + ) + if initRes.isErr(): + error "failed mountin relay protocol", error=initRes.error + return - ## The default relay topics is the union of - ## all configured topics plus the hard-coded defaultTopic(s) - wakuRelay.defaultPubsubTopics = concat(@[DefaultPubsubTopic], topics) + node.wakuRelay = initRes.value ## Add peer exchange handler if peerExchangeHandler.isSome(): - wakuRelay.parameters.enablePX = true # Feature flag for peer exchange in nim-libp2p - wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get()) + node.wakuRelay.parameters.enablePX = true # Feature flag for peer exchange in nim-libp2p + node.wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get()) - node.wakuRelay = wakuRelay if node.started: - # Node has started already. Let's start relay too. await node.startRelay() - node.switch.mount(wakuRelay, protocolMatcher(WakuRelayCodec)) - + node.switch.mount(node.wakuRelay, protocolMatcher(WakuRelayCodec)) + info "relay mounted successfully" diff --git a/waku/v2/protocol/waku_relay.nim b/waku/v2/protocol/waku_relay.nim index b3e749072..3bfd4d182 100644 --- a/waku/v2/protocol/waku_relay.nim +++ b/waku/v2/protocol/waku_relay.nim @@ -8,13 +8,17 @@ else: {.push raises: [].} import + stew/results, chronos, chronicles, metrics, + libp2p/multihash, libp2p/protocols/pubsub/pubsub, + libp2p/protocols/pubsub/rpc/messages, libp2p/protocols/pubsub/gossipsub, libp2p/stream/connection import + ../node/peer_manager/peer_manager, ./waku_message logScope: @@ -23,37 +27,45 @@ logScope: const WakuRelayCodec* = "/vac/waku/relay/2.0.0" + +type WakuRelayResult*[T] = Result[T, string] + +type + PubsubRawHandler* = proc(pubsubTopic: PubsubTopic, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].} + SubsciptionHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.gcsafe, raises: [Defect].} + type WakuRelay* = ref object of GossipSub + peerManager: PeerManager defaultPubsubTopics*: seq[PubsubTopic] # Default configured PubSub topics -method init*(w: WakuRelay) = - debug "init WakuRelay" - + +proc initProtocolHandler(w: WakuRelay) = proc handler(conn: Connection, proto: string) {.async.} = ## main protocol handler that gets triggered on every ## connection for a protocol string ## e.g. ``/wakusub/0.0.1``, etc... - ## + debug "Incoming WakuRelay connection", connection=conn, protocol=proto - debug "Incoming WakuRelay connection" try: await w.handleConn(conn, proto) except CancelledError: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. - trace "Unexpected cancellation in relay handler", conn - except CatchableError as exc: - trace "WakuRelay handler leaks an error", exc = exc.msg, conn + error "Unexpected cancellation in relay handler", conn=conn, error=getCurrentExceptionMsg() + except CatchableError: + error "WakuRelay handler leaks an error", conn=conn, error=getCurrentExceptionMsg() # XXX: Handler hijack GossipSub here? w.handler = handler w.codec = WakuRelayCodec -method initPubSub*(w: WakuRelay) {.raises: [Defect, InitializationError].} = - debug "initWakuRelay" +method initPubSub(w: WakuRelay) {.raises: [InitializationError].} = + ## NOTE: This method overrides GossipSub initPubSub method; it called by the + ## parent protocol, PubSub. + debug "init waku relay" - # after discussions with @sinkingsugar, this is essentially what is needed for + # After discussions with @sinkingsugar: This is essentially what is needed for # the libp2p `StrictNoSign` policy w.anonymize = true w.verifySignature = false @@ -61,41 +73,86 @@ method initPubSub*(w: WakuRelay) {.raises: [Defect, InitializationError].} = procCall GossipSub(w).initPubSub() - w.init() + w.initProtocolHandler() -method subscribe*(w: WakuRelay, - pubSubTopic: PubsubTopic, - handler: TopicHandler) = - debug "subscribe", pubSubTopic=pubSubTopic - procCall GossipSub(w).subscribe(pubSubTopic, handler) +proc new*(T: type WakuRelay, + peerManager: PeerManager, + defaultPubsubTopics: seq[PubsubTopic] = @[], + triggerSelf: bool = true): WakuRelayResult[T] = + + proc msgIdProvider(msg: messages.Message): Result[MessageID, ValidationResult] = + let hash = MultiHash.digest("sha2-256", msg.data) + if hash.isErr(): + ok(($msg.data.hash).toBytes()) + else: + ok(hash.value.data.buffer) -method publish*(w: WakuRelay, - pubSubTopic: PubsubTopic, - message: seq[byte] - ): Future[int] {.async.} = - trace "publish", pubSubTopic=pubSubTopic, message=message + var wr: WakuRelay + try: + wr = WakuRelay.init( + switch = peerManager.switch, + msgIdProvider = msgIdProvider, + triggerSelf = triggerSelf, + sign = false, + verifySignature = false, + maxMessageSize = MaxWakuMessageSize + ) + except InitializationError: + return err("initialization error: " & getCurrentExceptionMsg()) - return await procCall GossipSub(w).publish(pubSubTopic, message) + wr.peerManager = peerManager + wr.defaultPubsubTopics = defaultPubsubTopics -method unsubscribe*(w: WakuRelay, - topics: seq[TopicPair]) = - debug "unsubscribe" + ok(wr) - procCall GossipSub(w).unsubscribe(topics) -method unsubscribeAll*(w: WakuRelay, - pubSubTopic: PubsubTopic) = - debug "unsubscribeAll" - - procCall GossipSub(w).unsubscribeAll(pubSubTopic) - -# GossipSub specific methods -------------------------------------------------- method start*(w: WakuRelay) {.async.} = debug "start" await procCall GossipSub(w).start() method stop*(w: WakuRelay) {.async.} = debug "stop" - await procCall GossipSub(w).stop() + + +method subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: SubsciptionHandler|PubsubRawHandler) = + debug "subscribe", pubsubTopic=pubsubTopic + + var subsHandler: PubsubRawHandler + when handler is SubsciptionHandler: + subsHandler = proc(pubsubTopic: PubsubTopic, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].} = + let decodeRes = WakuMessage.decode(data) + if decodeRes.isErr(): + debug "message decode failure", pubsubTopic=pubsubTopic, error=decodeRes.error + return + + handler(pubsubTopic, decodeRes.value) + else: + subsHandler = handler + + procCall GossipSub(w).subscribe(pubsubTopic, subsHandler) + +method unsubscribe*(w: WakuRelay, topics: seq[TopicPair]) = + debug "unsubscribe", topics=topics + + procCall GossipSub(w).unsubscribe(topics) + +method unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) = + debug "unsubscribeAll", pubsubTopic=pubsubTopic + + procCall GossipSub(w).unsubscribeAll(pubsubTopic) + + +method publish*(w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage|seq[byte]): Future[int] {.async.} = + trace "publish", pubsubTopic=pubsubTopic + + var data: seq[byte] + when message is WakuMessage: + data = message.encode() + else: + data = message + + return await procCall GossipSub(w).publish(pubsubTopic, message) + +