diff --git a/Dockerfile.lightpushWithMix.compile b/Dockerfile.lightpushWithMix.compile index e39b88d91..381ee60ef 100644 --- a/Dockerfile.lightpushWithMix.compile +++ b/Dockerfile.lightpushWithMix.compile @@ -7,7 +7,7 @@ ARG NIM_COMMIT ARG LOG_LEVEL=TRACE # Get build tools and required header files -RUN apk add --no-cache bash git build-base openssl-dev pcre-dev linux-headers curl jq +RUN apk add --no-cache bash git build-base openssl-dev linux-headers curl jq WORKDIR /app COPY . . @@ -40,14 +40,12 @@ LABEL version="unknown" EXPOSE 30303 60000 8545 # Referenced in the binary -RUN apk add --no-cache libgcc pcre-dev libpq-dev \ +RUN apk add --no-cache libgcc libpq-dev \ wget \ iproute2 \ python3 \ jq -# Fix for 'Error loading shared library libpcre.so.3: No such file or directory' -RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3 COPY --from=nim-build /app/build/lightpush_publisher_mix /usr/bin/ RUN chmod +x /usr/bin/lightpush_publisher_mix diff --git a/apps/benchmarks/benchmarks.nim b/apps/benchmarks/benchmarks.nim index 75686c8b9..7082f7a35 100644 --- a/apps/benchmarks/benchmarks.nim +++ b/apps/benchmarks/benchmarks.nim @@ -28,10 +28,9 @@ proc benchmark( iter = i, elapsed_ms = (getTime() - start_time).inMilliseconds discard await manager.updateRoots() - let proofResult = await manager.fetchMerkleProofElements() - if proofResult.isErr(): - error "Failed to fetch Merkle proof", error = proofResult.error - manager.merkleProofCache = proofResult.get() + manager.merkleProofCache = (await manager.fetchMerkleProofElements()).valueOr: + error "Failed to fetch Merkle proof", error = error + quit(QuitFailure) let epoch = default(Epoch) debug "epoch in bytes", epochHex = epoch.inHex() diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 1531a4652..8a28b3b72 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -132,25 +132,14 @@ proc showChatPrompt(c: Chat) = except IOError: discard -proc getChatLine(c: Chat, msg: WakuMessage): Result[string, string] = +proc getChatLine(payload: seq[byte]): string = # No payload encoding/encryption from Waku - let - pb = Chat2Message.init(msg.payload) - chatLine = - if pb.isOk: - pb[].toString() - else: - string.fromBytes(msg.payload) - return ok(chatline) + let pb = Chat2Message.init(payload).valueOr: + return string.fromBytes(payload) + return $pb proc printReceivedMessage(c: Chat, msg: WakuMessage) = - let - pb = Chat2Message.init(msg.payload) - chatLine = - if pb.isOk: - pb[].toString() - else: - string.fromBytes(msg.payload) + let chatLine = getChatLine(msg.payload) try: echo &"{chatLine}" except ValueError: @@ -173,18 +162,16 @@ proc startMetricsServer( ): Result[MetricsHttpServerRef, string] = info "Starting metrics HTTP server", serverIp = $serverIp, serverPort = $serverPort - let metricsServerRes = MetricsHttpServerRef.new($serverIp, serverPort) - if metricsServerRes.isErr(): - return err("metrics HTTP server start failed: " & $metricsServerRes.error) + let server = MetricsHttpServerRef.new($serverIp, serverPort).valueOr: + return err("metrics HTTP server start failed: " & $error) - let server = metricsServerRes.value try: waitFor server.start() except CatchableError: return err("metrics HTTP server start failed: " & getCurrentExceptionMsg()) info "Metrics HTTP server started", serverIp = $serverIp, serverPort = $serverPort - ok(metricsServerRes.value) + ok(server) proc publish(c: Chat, line: string) = # First create a Chat2Message protobuf with this line of text @@ -202,19 +189,17 @@ proc publish(c: Chat, line: string) = version: 0, timestamp: getNanosecondTime(time), ) + if not isNil(c.node.wakuRlnRelay): # for future version when we support more than one rln protected content topic, # we should check the message content topic as well - let appendRes = c.node.wakuRlnRelay.appendRLNProof(message, float64(time)) - if appendRes.isErr(): + if c.node.wakuRlnRelay.appendRLNProof(message, float64(time)).isErr(): debug "could not append rate limit proof to the message" else: debug "rate limit proof is appended to the message" - let decodeRes = RateLimitProof.init(message.proof) - if decodeRes.isErr(): + let proof = RateLimitProof.init(message.proof).valueOr: error "could not decode the RLN proof" - - let proof = decodeRes.get() + return # TODO move it to log after dogfooding let msgEpoch = fromEpoch(proof.epoch) if fromEpoch(c.node.wakuRlnRelay.lastEpoch) == msgEpoch: @@ -438,7 +423,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = let resolved = await dnsResolver.resolveTxt(domain) return resolved[0] # Use only first answer - var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl.get(), resolver) + let wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl.get(), resolver) if wakuDnsDiscovery.isOk: let discoveredPeers = await wakuDnsDiscovery.get().findPeers() if discoveredPeers.isOk: @@ -446,8 +431,10 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = discoveredNodes = discoveredPeers.get() echo "Discovered and connecting to " & $discoveredNodes waitFor chat.node.connectToNodes(discoveredNodes) + else: + warn "Failed to find peers via DNS discovery", error = discoveredPeers.error else: - warn "Failed to init Waku DNS discovery" + warn "Failed to init Waku DNS discovery", error = wakuDnsDiscovery.error let peerInfo = node.switch.peerInfo let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId @@ -483,21 +470,19 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = else: newSeq[byte](0) - let - pb = Chat2Message.init(payload) - chatLine = - if pb.isOk: - pb[].toString() - else: - string.fromBytes(payload) + let chatLine = getChatLine(payload) echo &"{chatLine}" info "Hit store handler" - let queryRes = await node.query( - StoreQueryRequest(contentTopics: @[chat.contentTopic]), storenode.get() - ) - if queryRes.isOk(): - storeHandler(queryRes.value) + block storeQueryBlock: + let queryRes = ( + await node.query( + StoreQueryRequest(contentTopics: @[chat.contentTopic]), storenode.get() + ) + ).valueOr: + error "Store query failed", error = error + break storeQueryBlock + storeHandler(queryRes) # NOTE Must be mounted after relay if conf.lightpushnode != "": @@ -511,8 +496,9 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = error = peerInfo.error if conf.filternode != "": - let peerInfo = parsePeerInfo(conf.filternode) - if peerInfo.isOk(): + if (let peerInfo = parsePeerInfo(conf.filternode); peerInfo.isErr()): + error "Filter not mounted. Couldn't parse conf.filternode", error = peerInfo.error + else: await node.mountFilter() await node.mountFilterClient() @@ -523,8 +509,6 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = chat.printReceivedMessage(msg) # TODO: Here to support FilterV2 relevant subscription. - else: - error "Filter not mounted. Couldn't parse conf.filternode", error = peerInfo.error # Subscribe to a topic, if relay is mounted if conf.relay: @@ -545,11 +529,8 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = proc spamHandler(wakuMessage: WakuMessage) {.gcsafe, closure.} = debug "spam handler is called" - let chatLineResult = chat.getChatLine(wakuMessage) - if chatLineResult.isOk(): - echo "A spam message is found and discarded : ", chatLineResult.value - else: - echo "A spam message is found and discarded" + let chatLineResult = getChatLine(wakuMessage.payload) + echo "spam message is found and discarded : " & chatLineResult chat.prompt = false showChatPrompt(chat) diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index c2bf9c032..279a5fefa 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -136,9 +136,7 @@ proc toMatterbridge( proc pollMatterbridge(cmb: Chat2MatterBridge, handler: MbMessageHandler) {.async.} = while cmb.running: - let getRes = cmb.mbClient.getMessages() - - if getRes.isOk(): + if (let getRes = cmb.mbClient.getMessages(); getRes.isOk()): for jsonNode in getRes[]: await handler(jsonNode) else: @@ -169,9 +167,7 @@ proc new*( let mbClient = MatterbridgeClient.new(mbHostUri, mbGateway) # Let's verify the Matterbridge configuration before continuing - let clientHealth = mbClient.isHealthy() - - if clientHealth.isOk() and clientHealth[]: + if mbClient.isHealthy().valueOr(false): info "Reached Matterbridge host", host = mbClient.host else: raise newException(ValueError, "Matterbridge client not reachable/healthy") diff --git a/apps/chat2bridge/config_chat2bridge.nim b/apps/chat2bridge/config_chat2bridge.nim index c7d8bb56a..abb5e329f 100644 --- a/apps/chat2bridge/config_chat2bridge.nim +++ b/apps/chat2bridge/config_chat2bridge.nim @@ -91,7 +91,7 @@ type Chat2MatterbridgeConf* = object name: "filternode" .}: string - # Matterbridge options + # Matterbridge options mbHostAddress* {. desc: "Listening address of the Matterbridge host", defaultValue: parseIpAddress("127.0.0.1"), @@ -126,11 +126,9 @@ proc completeCmdArg*(T: type keys.KeyPair, val: string): seq[string] = return @[] proc parseCmdArg*(T: type crypto.PrivateKey, p: string): T = - let key = SkPrivateKey.init(p) - if key.isOk(): - crypto.PrivateKey(scheme: Secp256k1, skkey: key.get()) - else: + let key = SkPrivateKey.init(p).valueOr: raise newException(ValueError, "Invalid private key") + return crypto.PrivateKey(scheme: Secp256k1, skkey: key) proc completeCmdArg*(T: type crypto.PrivateKey, val: string): seq[string] = return @[] diff --git a/apps/chat2mix/chat2mix.nim b/apps/chat2mix/chat2mix.nim index b4af7fbd6..21cdcd0d2 100644 --- a/apps/chat2mix/chat2mix.nim +++ b/apps/chat2mix/chat2mix.nim @@ -146,25 +146,14 @@ proc showChatPrompt(c: Chat) = except IOError: discard -proc getChatLine(c: Chat, msg: WakuMessage): Result[string, string] = +proc getChatLine(payload: seq[byte]): string = # No payload encoding/encryption from Waku - let - pb = Chat2Message.init(msg.payload) - chatLine = - if pb.isOk: - pb[].toString() - else: - string.fromBytes(msg.payload) - return ok(chatline) + let pb = Chat2Message.init(payload).valueOr: + return string.fromBytes(payload) + return $pb proc printReceivedMessage(c: Chat, msg: WakuMessage) = - let - pb = Chat2Message.init(msg.payload) - chatLine = - if pb.isOk: - pb[].toString() - else: - string.fromBytes(msg.payload) + let chatLine = getChatLine(msg.payload) try: echo &"{chatLine}" except ValueError: @@ -375,18 +364,17 @@ proc maintainSubscription( let peerOpt = selectRandomServicePeer( wakuNode.peerManager, some(actualFilterPeer), WakuFilterSubscribeCodec ) - if peerOpt.isOk(): - actualFilterPeer = peerOpt.get() - - info "Found new peer for codec", - codec = filterPubsubTopic, peer = constructMultiaddrStr(actualFilterPeer) - - noFailedSubscribes = 0 - continue # try again with new peer without delay - else: + peerOpt.isOkOr: error "Failed to find new service peer. Exiting." noFailedServiceNodeSwitches += 1 break + + actualFilterPeer = peerOpt.get() + info "Found new peer for codec", + codec = filterPubsubTopic, peer = constructMultiaddrStr(actualFilterPeer) + + noFailedSubscribes = 0 + continue # try again with new peer without delay else: if noFailedSubscribes > 0: noFailedSubscribes -= 1 @@ -530,7 +518,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = let resolved = await dnsResolver.resolveTxt(domain) return resolved[0] # Use only first answer - var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl.get(), resolver) + let wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl.get(), resolver) if wakuDnsDiscovery.isOk: let discoveredPeers = await wakuDnsDiscovery.get().findPeers() if discoveredPeers.isOk: @@ -538,8 +526,10 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = discoveredNodes = discoveredPeers.get() echo "Discovered and connecting to " & $discoveredNodes waitFor chat.node.connectToNodes(discoveredNodes) + else: + warn "Failed to find peers via DNS discovery", error = discoveredPeers.error else: - warn "Failed to init Waku DNS discovery" + warn "Failed to init Waku DNS discovery", error = wakuDnsDiscovery.error let peerInfo = node.switch.peerInfo let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId @@ -575,13 +565,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = else: newSeq[byte](0) - let - pb = Chat2Message.init(payload) - chatLine = - if pb.isOk: - pb[].toString() - else: - string.fromBytes(payload) + let chatLine = getChatLine(payload) echo &"{chatLine}" info "Hit store handler" diff --git a/apps/liteprotocoltester/Dockerfile.liteprotocoltester b/apps/liteprotocoltester/Dockerfile.liteprotocoltester index 1948300d0..4c4b122b6 100644 --- a/apps/liteprotocoltester/Dockerfile.liteprotocoltester +++ b/apps/liteprotocoltester/Dockerfile.liteprotocoltester @@ -1,37 +1,33 @@ - # TESTING IMAGE -------------------------------------------------------------- +# TESTING IMAGE -------------------------------------------------------------- - ## NOTICE: This is a short cut build file for ubuntu users who compiles nwaku in ubuntu distro. - ## This is used for faster turnaround time for testing the compiled binary. - ## Prerequisites: compiled liteprotocoltester binary in build/ directory +## NOTICE: This is a short cut build file for ubuntu users who compiles nwaku in ubuntu distro. +## This is used for faster turnaround time for testing the compiled binary. +## Prerequisites: compiled liteprotocoltester binary in build/ directory - FROM ubuntu:noble AS prod +FROM ubuntu:noble AS prod - LABEL maintainer="zoltan@status.im" - LABEL source="https://github.com/waku-org/nwaku" - LABEL description="Lite Protocol Tester: Waku light-client" - LABEL commit="unknown" - LABEL version="unknown" +LABEL maintainer="zoltan@status.im" +LABEL source="https://github.com/waku-org/nwaku" +LABEL description="Lite Protocol Tester: Waku light-client" +LABEL commit="unknown" +LABEL version="unknown" - # DevP2P, LibP2P, and JSON RPC ports - EXPOSE 30303 60000 8545 +# DevP2P, LibP2P, and JSON RPC ports +EXPOSE 30303 60000 8545 - # Referenced in the binary - RUN apt-get update && apt-get install -y --no-install-recommends \ - libgcc1 \ - libpcre3 \ - libpq-dev \ - wget \ - iproute2 \ - && rm -rf /var/lib/apt/lists/* +# Referenced in the binary +RUN apt-get update && apt-get install -y --no-install-recommends \ + libgcc1 \ + libpq-dev \ + wget \ + iproute2 \ + && rm -rf /var/lib/apt/lists/* - # Fix for 'Error loading shared library libpcre.so.3: No such file or directory' - RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3 +COPY build/liteprotocoltester /usr/bin/ +COPY apps/liteprotocoltester/run_tester_node.sh /usr/bin/ +COPY apps/liteprotocoltester/run_tester_node_on_fleet.sh /usr/bin/ - COPY build/liteprotocoltester /usr/bin/ - COPY apps/liteprotocoltester/run_tester_node.sh /usr/bin/ - COPY apps/liteprotocoltester/run_tester_node_on_fleet.sh /usr/bin/ +ENTRYPOINT ["/usr/bin/run_tester_node.sh", "/usr/bin/liteprotocoltester"] - ENTRYPOINT ["/usr/bin/run_tester_node.sh", "/usr/bin/liteprotocoltester"] - - # # By default just show help if called without arguments - CMD ["--help"] +# # By default just show help if called without arguments +CMD ["--help"] diff --git a/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile b/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile index 497570c75..9e2432051 100644 --- a/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile +++ b/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile @@ -45,9 +45,6 @@ RUN apk add --no-cache libgcc libpq-dev \ iproute2 \ python3 -# Fix for 'Error loading shared library libpcre.so.3: No such file or directory' -RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3 - COPY --from=nim-build /app/build/liteprotocoltester /usr/bin/ RUN chmod +x /usr/bin/liteprotocoltester diff --git a/apps/liteprotocoltester/publisher.nim b/apps/liteprotocoltester/publisher.nim index d8031473d..1debfdf56 100644 --- a/apps/liteprotocoltester/publisher.nim +++ b/apps/liteprotocoltester/publisher.nim @@ -190,25 +190,22 @@ proc publishMessages( ) if not preventPeerSwitch and noFailedPush > maxFailedPush: info "Max push failure limit reached, Try switching peer." - let peerOpt = selectRandomServicePeer( + actualServicePeer = selectRandomServicePeer( wakuNode.peerManager, some(actualServicePeer), WakuLightPushCodec - ) - if peerOpt.isOk(): - actualServicePeer = peerOpt.get() - - info "New service peer in use", - codec = lightpushPubsubTopic, - peer = constructMultiaddrStr(actualServicePeer) - - noFailedPush = 0 - noOfServicePeerSwitches += 1 - lpt_change_service_peer_count.inc(labelValues = ["publisher"]) - continue # try again with new peer without delay - else: + ).valueOr: error "Failed to find new service peer. Exiting." noFailedServiceNodeSwitches += 1 break + info "New service peer in use", + codec = lightpushPubsubTopic, + peer = constructMultiaddrStr(actualServicePeer) + + noFailedPush = 0 + noOfServicePeerSwitches += 1 + lpt_change_service_peer_count.inc(labelValues = ["publisher"]) + continue # try again with new peer without delay + await sleepAsync(messageInterval) proc setupAndPublish*( diff --git a/apps/liteprotocoltester/receiver.nim b/apps/liteprotocoltester/receiver.nim index f0f41b1c5..0e6638c61 100644 --- a/apps/liteprotocoltester/receiver.nim +++ b/apps/liteprotocoltester/receiver.nim @@ -89,23 +89,20 @@ proc maintainSubscription( await sleepAsync(2.seconds) # Wait a bit before retrying continue elif not preventPeerSwitch: - let peerOpt = selectRandomServicePeer( + actualFilterPeer = selectRandomServicePeer( wakuNode.peerManager, some(actualFilterPeer), WakuFilterSubscribeCodec - ) - if peerOpt.isOk(): - actualFilterPeer = peerOpt.get() - - info "Found new peer for codec", - codec = filterPubsubTopic, peer = constructMultiaddrStr(actualFilterPeer) - - noFailedSubscribes = 0 - lpt_change_service_peer_count.inc(labelValues = ["receiver"]) - isFirstPingOnNewPeer = true - continue # try again with new peer without delay - else: + ).valueOr: error "Failed to find new service peer. Exiting." noFailedServiceNodeSwitches += 1 break + + info "Found new peer for codec", + codec = filterPubsubTopic, peer = constructMultiaddrStr(actualFilterPeer) + + noFailedSubscribes = 0 + lpt_change_service_peer_count.inc(labelValues = ["receiver"]) + isFirstPingOnNewPeer = true + continue # try again with new peer without delay else: if noFailedSubscribes > 0: noFailedSubscribes -= 1 diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim index a9144ae22..b78a0cec1 100644 --- a/apps/networkmonitor/networkmonitor.nim +++ b/apps/networkmonitor/networkmonitor.nim @@ -1,7 +1,7 @@ {.push raises: [].} import - std/[net, tables, strutils, times, sequtils, random], + std/[net, tables, strutils, times, sequtils, random, sugar], results, chronicles, chronicles/topics_registry, @@ -183,16 +183,14 @@ proc setConnectedPeersMetrics( for maddr in peerInfo.addrs: if $maddr notin customPeerInfo.maddrs: customPeerInfo.maddrs.add $maddr - let typedRecord = discNode.toTypedRecord() - if not typedRecord.isOk(): + let typedRecord = discNode.toTypedRecord().valueOr: warn "could not convert record to typed record", record = discNode continue - if not typedRecord.get().ip.isSome(): - warn "ip field is not set", record = typedRecord.get() + let ipAddr = typedRecord.ip.valueOr: + warn "ip field is not set", record = typedRecord continue - let ip = $typedRecord.get().ip.get().join(".") - customPeerInfo.ip = ip + customPeerInfo.ip = $ipAddr.join(".") # try to ping the peer if shouldReconnect(customPeerInfo): @@ -374,14 +372,9 @@ proc retrieveDynamicBootstrapNodes( if resolved.len > 0: return resolved[0] # Use only first answer - var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver) - if wakuDnsDiscovery.isOk(): - return (await wakuDnsDiscovery.get().findPeers()).mapErr( - proc(e: cstring): string = - $e - ) - else: - warn "Failed to init Waku DNS discovery" + var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver).errorOr: + return (await value.findPeers()).mapErr(e => $e) + warn "Failed to init Waku DNS discovery" debug "No method for retrieving dynamic bootstrap nodes specified." ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default @@ -391,11 +384,10 @@ proc getBootstrapFromDiscDns( ): Future[Result[seq[enr.Record], string]] {.async.} = try: let dnsNameServers = @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")] - let dynamicBootstrapNodesRes = + let dynamicBootstrapNodes = ( await retrieveDynamicBootstrapNodes(conf.dnsDiscoveryUrl, dnsNameServers) - if not dynamicBootstrapNodesRes.isOk(): - error("failed discovering peers from DNS") - let dynamicBootstrapNodes = dynamicBootstrapNodesRes.get() + ).valueOr: + return err("Failed retrieving dynamic bootstrap nodes: " & $error) # select dynamic bootstrap nodes that have an ENR containing a udp port. # Discv5 only supports UDP https://github.com/ethereum/devp2p/blob/master/discv5/discv5-theory.md) @@ -411,7 +403,7 @@ proc getBootstrapFromDiscDns( discv5BootstrapEnrs.add(enr) return ok(discv5BootstrapEnrs) except CatchableError: - error("failed discovering peers from DNS") + error("failed discovering peers from DNS: " & getCurrentExceptionMsg()) proc initAndStartApp( conf: NetworkMonitorConf diff --git a/docker/binaries/Dockerfile.bn.local b/docker/binaries/Dockerfile.bn.local index 79445d14f..7a2305bd1 100644 --- a/docker/binaries/Dockerfile.bn.local +++ b/docker/binaries/Dockerfile.bn.local @@ -14,12 +14,9 @@ EXPOSE 30303 60000 8545 # Referenced in the binary RUN apt-get update &&\ - apt-get install -y libpcre3 libpq-dev curl iproute2 wget jq dnsutils &&\ + apt-get install -y libpq-dev curl iproute2 wget jq dnsutils &&\ apt-get clean && rm -rf /var/lib/apt/lists/* -# Fix for 'Error loading shared library libpcre.so.3: No such file or directory' -RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3 - # Copy to separate location to accomodate different MAKE_TARGET values ADD ./build/$MAKE_TARGET /usr/local/bin/ diff --git a/examples/lightpush_publisher.nim b/examples/lightpush_publisher.nim index e9fa2174d..70ebd9c53 100644 --- a/examples/lightpush_publisher.nim +++ b/examples/lightpush_publisher.nim @@ -86,7 +86,9 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = timestamp: now(), ) # current timestamp - let lightpushPeer = parsePeerInfo(LightpushPeer).get() + let lightpushPeer = parsePeerInfo(LightpushPeer).valueOr: + error "failed to parse LightpushPeer", error = error + quit(QuitFailure) let res = await node.legacyLightpushPublish( some(LightpushPubsubTopic), message, lightpushPeer diff --git a/library/waku_thread_requests/requests/discovery_request.nim b/library/waku_thread_requests/requests/discovery_request.nim index 8fec0dd9f..6f6780a2f 100644 --- a/library/waku_thread_requests/requests/discovery_request.nim +++ b/library/waku_thread_requests/requests/discovery_request.nim @@ -6,6 +6,7 @@ import ../../../waku/discovery/waku_discv5, ../../../waku/waku_core/peers, ../../../waku/node/waku_node, + ../../../waku/node/api, ../../alloc type DiscoveryMsgType* = enum diff --git a/library/waku_thread_requests/requests/protocols/filter_request.nim b/library/waku_thread_requests/requests/protocols/filter_request.nim index 274ec32ea..c0a99f1f9 100644 --- a/library/waku_thread_requests/requests/protocols/filter_request.nim +++ b/library/waku_thread_requests/requests/protocols/filter_request.nim @@ -8,6 +8,7 @@ import ../../../../waku/waku_core/subscription/push_handler, ../../../../waku/node/peer_manager/peer_manager, ../../../../waku/node/waku_node, + ../../../../waku/node/api, ../../../../waku/waku_core/topics/pubsub_topic, ../../../../waku/waku_core/topics/content_topic, ../../../alloc diff --git a/tests/node/test_wakunode_filter.nim b/tests/node/test_wakunode_filter.nim index abf555b68..34f5c1e8c 100644 --- a/tests/node/test_wakunode_filter.nim +++ b/tests/node/test_wakunode_filter.nim @@ -12,6 +12,7 @@ import waku_core, node/peer_manager, node/waku_node, + node/api, waku_filter_v2, waku_filter_v2/client, waku_filter_v2/subscriptions, diff --git a/tests/node/test_wakunode_legacy_lightpush.nim b/tests/node/test_wakunode_legacy_lightpush.nim index b769abbfd..868c370cf 100644 --- a/tests/node/test_wakunode_legacy_lightpush.nim +++ b/tests/node/test_wakunode_legacy_lightpush.nim @@ -12,6 +12,7 @@ import waku_core, node/peer_manager, node/waku_node, + node/api, waku_lightpush_legacy, waku_lightpush_legacy/common, waku_lightpush_legacy/protocol_metrics, diff --git a/tests/node/test_wakunode_legacy_store.nim b/tests/node/test_wakunode_legacy_store.nim index beed3c1c6..1863066bc 100644 --- a/tests/node/test_wakunode_legacy_store.nim +++ b/tests/node/test_wakunode_legacy_store.nim @@ -6,6 +6,7 @@ import waku/[ common/paging, node/waku_node, + node/api, node/peer_manager, waku_core, waku_store_legacy, diff --git a/tests/node/test_wakunode_lightpush.nim b/tests/node/test_wakunode_lightpush.nim index e4ccb60fd..ec48ff1d6 100644 --- a/tests/node/test_wakunode_lightpush.nim +++ b/tests/node/test_wakunode_lightpush.nim @@ -8,7 +8,14 @@ import libp2p/crypto/crypto import - waku/[waku_core, node/peer_manager, node/waku_node, waku_lightpush, waku_rln_relay], + waku/[ + waku_core, + node/peer_manager, + node/waku_node, + node/api, + waku_lightpush, + waku_rln_relay, + ], ../testlib/[wakucore, wakunode, testasync, futures], ../resources/payloads, ../waku_rln_relay/[rln/waku_rln_relay_utils, utils_onchain] diff --git a/tests/node/test_wakunode_peer_exchange.nim b/tests/node/test_wakunode_peer_exchange.nim index 3075fa83f..4ebeae4ae 100644 --- a/tests/node/test_wakunode_peer_exchange.nim +++ b/tests/node/test_wakunode_peer_exchange.nim @@ -12,8 +12,14 @@ import eth/p2p/discoveryv5/enr import - waku/ - [waku_node, discovery/waku_discv5, waku_peer_exchange, node/peer_manager, waku_core], + waku/[ + waku_node, + node/api, + discovery/waku_discv5, + waku_peer_exchange, + node/peer_manager, + waku_core, + ], ../waku_peer_exchange/utils, ../testlib/[wakucore, wakunode, testasync] diff --git a/tests/node/test_wakunode_peer_manager.nim b/tests/node/test_wakunode_peer_manager.nim index 0ef2b1a13..6b1c2a427 100644 --- a/tests/node/test_wakunode_peer_manager.nim +++ b/tests/node/test_wakunode_peer_manager.nim @@ -17,6 +17,7 @@ import waku_core, node/peer_manager, node/waku_node, + node/api, discovery/waku_discv5, waku_filter_v2/common, waku_relay/protocol, diff --git a/tests/node/test_wakunode_relay_rln.nim b/tests/node/test_wakunode_relay_rln.nim index 4bc74fcf1..1acf6b590 100644 --- a/tests/node/test_wakunode_relay_rln.nim +++ b/tests/node/test_wakunode_relay_rln.nim @@ -17,6 +17,7 @@ import node/peer_manager, waku_core, waku_node, + node/api, common/error_handling, waku_rln_relay, waku_rln_relay/rln, diff --git a/tests/node/test_wakunode_sharding.nim b/tests/node/test_wakunode_sharding.nim index 5b99689be..945c22eee 100644 --- a/tests/node/test_wakunode_sharding.nim +++ b/tests/node/test_wakunode_sharding.nim @@ -16,6 +16,7 @@ import waku_core/topics/sharding, waku_store_legacy/common, node/waku_node, + node/api, common/paging, waku_core, waku_store/common, diff --git a/tests/node/test_wakunode_store.nim b/tests/node/test_wakunode_store.nim index 00dbfb7ee..284b32e64 100644 --- a/tests/node/test_wakunode_store.nim +++ b/tests/node/test_wakunode_store.nim @@ -6,6 +6,7 @@ import waku/[ common/paging, node/waku_node, + node/api, node/peer_manager, waku_core, waku_core/message/digest, diff --git a/tests/waku_discv5/test_waku_discv5.nim b/tests/waku_discv5/test_waku_discv5.nim index abdf09626..d1cd6c46f 100644 --- a/tests/waku_discv5/test_waku_discv5.nim +++ b/tests/waku_discv5/test_waku_discv5.nim @@ -22,6 +22,7 @@ import factory/conf_builder/conf_builder, factory/waku, node/waku_node, + node/api, node/peer_manager, ], ../testlib/[wakucore, testasync, assertions, futures, wakunode, testutils], diff --git a/tests/waku_filter_v2/test_waku_client.nim b/tests/waku_filter_v2/test_waku_client.nim index 2c3e2f4ec..6ae1f2902 100644 --- a/tests/waku_filter_v2/test_waku_client.nim +++ b/tests/waku_filter_v2/test_waku_client.nim @@ -3,7 +3,7 @@ import std/[options, sequtils, json], testutils/unittests, results, chronos import - waku/node/[peer_manager, waku_node], + waku/node/[peer_manager, waku_node, api], waku/waku_core, waku/waku_filter_v2/[common, client, subscriptions, protocol, rpc_codec], ../testlib/[wakucore, testasync, testutils, futures, sequtils, wakunode], diff --git a/tests/waku_peer_exchange/test_protocol.nim b/tests/waku_peer_exchange/test_protocol.nim index 1d10cf270..204338a85 100644 --- a/tests/waku_peer_exchange/test_protocol.nim +++ b/tests/waku_peer_exchange/test_protocol.nim @@ -17,7 +17,6 @@ import waku_peer_exchange/rpc_codec, waku_peer_exchange/protocol, waku_peer_exchange/client, - node/peer_manager, waku_core, common/enr/builder, waku_enr/sharding, diff --git a/waku.nimble b/waku.nimble index 6de73ad96..609ae16bf 100644 --- a/waku.nimble +++ b/waku.nimble @@ -20,6 +20,7 @@ requires "nim >= 2.2.4", "json_rpc", "libbacktrace", "nimcrypto", + "serialization", "stew", "stint", "metrics", @@ -30,7 +31,6 @@ requires "nim >= 2.2.4", "results", "db_connector", "minilru", - "quic", "https://github.com/vacp2p/mix#0.1.0" ### Helper functions diff --git a/waku/common/rate_limit/service_metrics.nim b/waku/common/rate_limit/service_metrics.nim index bff91f622..4453dc5bd 100644 --- a/waku/common/rate_limit/service_metrics.nim +++ b/waku/common/rate_limit/service_metrics.nim @@ -1,7 +1,6 @@ {.push raises: [].} import std/options -import chronos/timer import metrics, setting export metrics diff --git a/waku/discovery/waku_discv5.nim b/waku/discovery/waku_discv5.nim index 5ad07385c..29249df6c 100644 --- a/waku/discovery/waku_discv5.nim +++ b/waku/discovery/waku_discv5.nim @@ -421,13 +421,11 @@ proc setupDiscoveryV5*( addBootstrapNode(enrUri, discv5BootstrapEnrs) for enr in discv5BootstrapEnrs: - let peerInfoRes = enr.toRemotePeerInfo() - if peerInfoRes.isOk(): - nodePeerManager.addPeer(peerInfoRes.get(), PeerOrigin.Discv5) - else: + let peerInfo = enr.toRemotePeerInfo().valueOr: debug "could not convert discv5 bootstrap node to peerInfo, not adding peer to Peer Store", - enr = enr.toUri(), error = peerInfoRes.error - + enr = enr.toUri(), error = error + continue + nodePeerManager.addPeer(peerInfo, PeerOrigin.Discv5) discv5BootstrapEnrs.add(dynamicBootstrapEnrs) let discv5Config = diff --git a/waku/discovery/waku_dnsdisc.nim b/waku/discovery/waku_dnsdisc.nim index 995732a02..3204ad6bd 100644 --- a/waku/discovery/waku_dnsdisc.nim +++ b/waku/discovery/waku_dnsdisc.nim @@ -6,7 +6,7 @@ ## EIP-1459 is defined in https://eips.ethereum.org/EIPS/eip-1459 import - std/[options, net, sequtils], + std/[options, net, sequtils, sugar], chronicles, chronos, metrics, @@ -67,14 +67,11 @@ proc findPeers*( for enr in discoveredEnr: # Convert discovered ENR to RemotePeerInfo and add to discovered nodes - let res = enr.toRemotePeerInfo() - - if res.isOk(): - discoveredNodes.add(res.get()) - else: - error "Failed to convert ENR to peer info", enr = $enr, err = res.error() + let peerInfo = enr.toRemotePeerInfo().valueOr: + error "Failed to convert ENR to peer info", enr = $enr, error = error waku_dnsdisc_errors.inc(labelValues = ["peer_info_failure"]) - + continue + discoveredNodes.add(peerInfo) if discoveredNodes.len > 0: info "Successfully discovered nodes", count = discoveredNodes.len waku_dnsdisc_discovered.inc(discoveredNodes.len.int64) @@ -117,14 +114,9 @@ proc retrieveDynamicBootstrapNodes*( if resolved.len > 0: return resolved[0] # Use only first answer - var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver) - if wakuDnsDiscovery.isOk(): - return (await wakuDnsDiscovery.get().findPeers()).mapErr( - proc(e: cstring): string = - $e - ) - else: - warn "Failed to init Waku DNS discovery" + var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver).errorOr: + return (await value.findPeers()).mapErr(e => $e) + warn "Failed to init Waku DNS discovery" debug "No method for retrieving dynamic bootstrap nodes specified." ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 016132306..c4cff6c68 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -251,21 +251,15 @@ proc setupProtocols( mountStoreClient(node) if conf.remoteStoreNode.isSome(): - let storeNode = parsePeerInfo(conf.remoteStoreNode.get()) - if storeNode.isOk(): - node.peerManager.addServicePeer(storeNode.value, store_common.WakuStoreCodec) - else: - return err("failed to set node waku store peer: " & storeNode.error) + let storeNode = parsePeerInfo(conf.remoteStoreNode.get()).valueOr: + return err("failed to set node waku store peer: " & error) + node.peerManager.addServicePeer(storeNode, WakuStoreCodec) mountLegacyStoreClient(node) if conf.remoteStoreNode.isSome(): - let storeNode = parsePeerInfo(conf.remoteStoreNode.get()) - if storeNode.isOk(): - node.peerManager.addServicePeer( - storeNode.value, legacy_common.WakuLegacyStoreCodec - ) - else: - return err("failed to set node waku legacy store peer: " & storeNode.error) + let storeNode = parsePeerInfo(conf.remoteStoreNode.get()).valueOr: + return err("failed to set node waku legacy store peer: " & error) + node.peerManager.addServicePeer(storeNode, WakuLegacyStoreCodec) if conf.storeServiceConf.isSome and conf.storeServiceConf.get().resume: node.setupStoreResume() @@ -377,12 +371,10 @@ proc setupProtocols( mountLightPushClient(node) mountLegacyLightPushClient(node) if conf.remoteLightPushNode.isSome(): - let lightPushNode = parsePeerInfo(conf.remoteLightPushNode.get()) - if lightPushNode.isOk(): - node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec) - node.peerManager.addServicePeer(lightPushNode.value, WakuLegacyLightPushCodec) - else: - return err("failed to set node waku lightpush peer: " & lightPushNode.error) + let lightPushNode = parsePeerInfo(conf.remoteLightPushNode.get()).valueOr: + return err("failed to set node waku lightpush peer: " & error) + node.peerManager.addServicePeer(lightPushNode, WakuLightPushCodec) + node.peerManager.addServicePeer(lightPushNode, WakuLegacyLightPushCodec) # Filter setup. NOTE Must be mounted after relay if conf.filterServiceConf.isSome(): @@ -400,16 +392,13 @@ proc setupProtocols( await node.mountFilterClient() if conf.remoteFilterNode.isSome(): - let filterNode = parsePeerInfo(conf.remoteFilterNode.get()) - if filterNode.isOk(): - try: - node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec) - except CatchableError: - return err( - "failed to mount waku filter client protocol: " & getCurrentExceptionMsg() - ) - else: - return err("failed to set node waku filter peer: " & filterNode.error) + let filterNode = parsePeerInfo(conf.remoteFilterNode.get()).valueOr: + return err("failed to set node waku filter peer: " & error) + try: + node.peerManager.addServicePeer(filterNode, WakuFilterSubscribeCodec) + except CatchableError: + return + err("failed to mount waku filter client protocol: " & getCurrentExceptionMsg()) # waku peer exchange setup if conf.peerExchangeService: @@ -422,12 +411,9 @@ proc setupProtocols( err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg()) if conf.remotePeerExchangeNode.isSome(): - let peerExchangeNode = parsePeerInfo(conf.remotePeerExchangeNode.get()) - if peerExchangeNode.isOk(): - node.peerManager.addServicePeer(peerExchangeNode.value, WakuPeerExchangeCodec) - else: - return - err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error) + let peerExchangeNode = parsePeerInfo(conf.remotePeerExchangeNode.get()).valueOr: + return err("failed to set node waku peer-exchange peer: " & error) + node.peerManager.addServicePeer(peerExchangeNode, WakuPeerExchangeCodec) if conf.peerExchangeDiscovery: await node.mountPeerExchangeClient() @@ -481,7 +467,7 @@ proc startNode*( error "error while fetching peers from peer exchange", error = error # TODO: behavior described by comment is undesired. PX as client should be used in tandem with discv5. - # + # # Use px to periodically get peers if discv5 is disabled, as discv5 nodes have their own # periodic loop to find peers and px returned peers actually come from discv5 if conf.peerExchangeDiscovery and not conf.discv5Conf.isSome(): diff --git a/waku/factory/validator_signed.nim b/waku/factory/validator_signed.nim index 0da380ab5..bd6ad9ba6 100644 --- a/waku/factory/validator_signed.nim +++ b/waku/factory/validator_signed.nim @@ -67,9 +67,8 @@ proc addSignedShardsValidator*( if msg.timestamp != 0: if msg.withinTimeWindow(): let msgHash = SkMessage(topic.msgHash(msg)) - let recoveredSignature = SkSignature.fromRaw(msg.meta) - if recoveredSignature.isOk(): - if recoveredSignature.get.verify(msgHash, protectedShard.key): + SkSignature.fromRaw(msg.meta).isErrOr: + if value.verify(msgHash, protectedShard.key): outcome = errors.ValidationResult.Accept if outcome != errors.ValidationResult.Accept: diff --git a/waku/node/api.nim b/waku/node/api.nim new file mode 100644 index 000000000..6f8f1cdd9 --- /dev/null +++ b/waku/node/api.nim @@ -0,0 +1,9 @@ +import + ./api/filter as filter_api, + ./api/lightpush as lightpush_api, + ./api/store as store_api, + ./api/relay as relay_api, + ./api/peer_exchange as peer_exchange_api, + ./api/ping as ping_api + +export filter_api, lightpush_api, store_api, relay_api, peer_exchange_api, ping_api diff --git a/waku/node/api/filter.nim b/waku/node/api/filter.nim new file mode 100644 index 000000000..242640a44 --- /dev/null +++ b/waku/node/api/filter.nim @@ -0,0 +1,297 @@ +{.push raises: [].} + +import + std/[options, sugar, tables, sequtils, os, net], + chronos, + chronicles, + metrics, + results, + stew/byteutils, + eth/keys, + nimcrypto, + eth/p2p/discoveryv5/enr, + libp2p/crypto/crypto, + libp2p/protocols/ping, + libp2p/protocols/pubsub/gossipsub, + libp2p/protocols/pubsub/rpc/messages, + libp2p/builders, + libp2p/transports/tcptransport, + libp2p/transports/wstransport, + libp2p/utility + +import + ../waku_node, + ../../waku_core, + ../../waku_core/topics/sharding, + ../../waku_filter_v2, + ../../waku_filter_v2/client as filter_client, + ../../waku_filter_v2/subscriptions as filter_subscriptions, + ../../common/rate_limit/setting, + ../peer_manager + +logScope: + topics = "waku node filter api" + +## Waku filter + +proc mountFilter*( + node: WakuNode, + subscriptionTimeout: Duration = + filter_subscriptions.DefaultSubscriptionTimeToLiveSec, + maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers, + maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer, + messageCacheTTL: Duration = filter_subscriptions.MessageCacheTTL, + rateLimitSetting: RateLimitSetting = FilterDefaultPerPeerRateLimit, +) {.async: (raises: []).} = + ## Mounting filter v2 protocol + + info "mounting filter protocol" + node.wakuFilter = WakuFilter.new( + node.peerManager, + subscriptionTimeout, + maxFilterPeers, + maxFilterCriteriaPerPeer, + messageCacheTTL, + some(rateLimitSetting), + ) + + try: + await node.wakuFilter.start() + except CatchableError: + error "failed to start wakuFilter", error = getCurrentExceptionMsg() + + try: + node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterSubscribeCodec)) + except LPError: + error "failed to mount wakuFilter", error = getCurrentExceptionMsg() + +proc filterHandleMessage*( + node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage +) {.async.} = + if node.wakuFilter.isNil(): + error "cannot handle filter message", error = "waku filter is required" + return + + await node.wakuFilter.handleMessage(pubsubTopic, message) + +proc mountFilterClient*(node: WakuNode) {.async: (raises: []).} = + ## Mounting both filter + ## Giving option for application level to choose btw own push message handling or + ## rely on node provided cache. - This only applies for v2 filter client + info "mounting filter client" + + if not node.wakuFilterClient.isNil(): + trace "Filter client already mounted." + return + + node.wakuFilterClient = WakuFilterClient.new(node.peerManager, node.rng) + + try: + await node.wakuFilterClient.start() + except CatchableError: + error "failed to start wakuFilterClient", error = getCurrentExceptionMsg() + + try: + node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterSubscribeCodec)) + except LPError: + error "failed to mount wakuFilterClient", error = getCurrentExceptionMsg() + +proc filterSubscribe*( + node: WakuNode, + pubsubTopic: Option[PubsubTopic], + contentTopics: ContentTopic | seq[ContentTopic], + peer: RemotePeerInfo | string, +): Future[FilterSubscribeResult] {.async: (raises: []).} = + ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. + if node.wakuFilterClient.isNil(): + error "cannot register filter subscription to topic", + error = "waku filter client is not set up" + return err(FilterSubscribeError.serviceUnavailable()) + + let remotePeerRes = parsePeerInfo(peer) + if remotePeerRes.isErr(): + error "Couldn't parse the peer info properly", error = remotePeerRes.error + return err(FilterSubscribeError.serviceUnavailable("No peers available")) + + let remotePeer = remotePeerRes.value + + if pubsubTopic.isSome(): + info "registering filter subscription to content", + pubsubTopic = pubsubTopic.get(), + contentTopics = contentTopics, + peer = remotePeer.peerId + + when (contentTopics is ContentTopic): + let contentTopics = @[contentTopics] + let subRes = await node.wakuFilterClient.subscribe( + remotePeer, pubsubTopic.get(), contentTopics + ) + if subRes.isOk(): + info "v2 subscribed to topic", + pubsubTopic = pubsubTopic, contentTopics = contentTopics + + # Purpose is to update Waku Metadata + node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic.get())) + else: + error "failed filter v2 subscription", error = subRes.error + waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) + + return subRes + elif node.wakuAutoSharding.isNone(): + error "Failed filter subscription, pubsub topic must be specified with static sharding" + waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) + else: + # No pubsub topic, autosharding is used to deduce it + # but content topics must be well-formed for this + let topicMapRes = + node.wakuAutoSharding.get().getShardsFromContentTopics(contentTopics) + + let topicMap = + if topicMapRes.isErr(): + error "can't get shard", error = topicMapRes.error + return err(FilterSubscribeError.badResponse("can't get shard")) + else: + topicMapRes.get() + + var futures = collect(newSeq): + for shard, topics in topicMap.pairs: + info "registering filter subscription to content", + shard = shard, contentTopics = topics, peer = remotePeer.peerId + let content = topics.mapIt($it) + node.wakuFilterClient.subscribe(remotePeer, $shard, content) + + var subRes: FilterSubscribeResult = FilterSubscribeResult.ok() + try: + let finished = await allFinished(futures) + + for fut in finished: + let res = fut.read() + + if res.isErr(): + error "failed filter subscription", error = res.error + waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) + subRes = FilterSubscribeResult.err(res.error) + + for pubsub, topics in topicMap.pairs: + info "subscribed to topic", pubsubTopic = pubsub, contentTopics = topics + + # Purpose is to update Waku Metadata + node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: $pubsub)) + except CatchableError: + let errMsg = "exception in filterSubscribe: " & getCurrentExceptionMsg() + error "exception in filterSubscribe", error = getCurrentExceptionMsg() + waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) + subRes = + FilterSubscribeResult.err(FilterSubscribeError.serviceUnavailable(errMsg)) + + # return the last error or ok + return subRes + +proc filterUnsubscribe*( + node: WakuNode, + pubsubTopic: Option[PubsubTopic], + contentTopics: ContentTopic | seq[ContentTopic], + peer: RemotePeerInfo | string, +): Future[FilterSubscribeResult] {.async: (raises: []).} = + ## Unsubscribe from a content filter V2". + + let remotePeerRes = parsePeerInfo(peer) + if remotePeerRes.isErr(): + error "couldn't parse remotePeerInfo", error = remotePeerRes.error + return err(FilterSubscribeError.serviceUnavailable("No peers available")) + + let remotePeer = remotePeerRes.value + + if pubsubTopic.isSome(): + info "deregistering filter subscription to content", + pubsubTopic = pubsubTopic.get(), + contentTopics = contentTopics, + peer = remotePeer.peerId + + let unsubRes = await node.wakuFilterClient.unsubscribe( + remotePeer, pubsubTopic.get(), contentTopics + ) + if unsubRes.isOk(): + info "unsubscribed from topic", + pubsubTopic = pubsubTopic.get(), contentTopics = contentTopics + + # Purpose is to update Waku Metadata + node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic.get())) + else: + error "failed filter unsubscription", error = unsubRes.error + waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) + + return unsubRes + elif node.wakuAutoSharding.isNone(): + error "Failed filter un-subscription, pubsub topic must be specified with static sharding" + waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) + else: # pubsubTopic.isNone + let topicMapRes = + node.wakuAutoSharding.get().getShardsFromContentTopics(contentTopics) + + let topicMap = + if topicMapRes.isErr(): + error "can't get shard", error = topicMapRes.error + return err(FilterSubscribeError.badResponse("can't get shard")) + else: + topicMapRes.get() + + var futures = collect(newSeq): + for shard, topics in topicMap.pairs: + info "deregistering filter subscription to content", + shard = shard, contentTopics = topics, peer = remotePeer.peerId + let content = topics.mapIt($it) + node.wakuFilterClient.unsubscribe(remotePeer, $shard, content) + + var unsubRes: FilterSubscribeResult = FilterSubscribeResult.ok() + try: + let finished = await allFinished(futures) + + for fut in finished: + let res = fut.read() + + if res.isErr(): + error "failed filter unsubscription", error = res.error + waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) + unsubRes = FilterSubscribeResult.err(res.error) + + for pubsub, topics in topicMap.pairs: + info "unsubscribed from topic", pubsubTopic = pubsub, contentTopics = topics + + # Purpose is to update Waku Metadata + node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: $pubsub)) + except CatchableError: + let errMsg = "exception in filterUnsubscribe: " & getCurrentExceptionMsg() + error "exception in filterUnsubscribe", error = getCurrentExceptionMsg() + waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) + unsubRes = + FilterSubscribeResult.err(FilterSubscribeError.serviceUnavailable(errMsg)) + + # return the last error or ok + return unsubRes + +proc filterUnsubscribeAll*( + node: WakuNode, peer: RemotePeerInfo | string +): Future[FilterSubscribeResult] {.async: (raises: []).} = + ## Unsubscribe from a content filter V2". + + let remotePeerRes = parsePeerInfo(peer) + if remotePeerRes.isErr(): + error "couldn't parse remotePeerInfo", error = remotePeerRes.error + return err(FilterSubscribeError.serviceUnavailable("No peers available")) + + let remotePeer = remotePeerRes.value + + info "deregistering all filter subscription to content", peer = remotePeer.peerId + + let unsubRes = await node.wakuFilterClient.unsubscribeAll(remotePeer) + if unsubRes.isOk(): + info "unsubscribed from all content-topic", peerId = remotePeer.peerId + else: + error "failed filter unsubscription from all content-topic", error = unsubRes.error + waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) + + return unsubRes + +# NOTICE: subscribe / unsubscribe methods are removed - they were already depricated +# yet incompatible to handle both type of filters - use specific filter registration instead diff --git a/waku/node/api/lightpush.nim b/waku/node/api/lightpush.nim new file mode 100644 index 000000000..c5ab75195 --- /dev/null +++ b/waku/node/api/lightpush.nim @@ -0,0 +1,288 @@ +{.push raises: [].} + +import + std/[hashes, options, tables, net], + chronos, + chronicles, + metrics, + results, + stew/byteutils, + eth/keys, + eth/p2p/discoveryv5/enr, + libp2p/crypto/crypto, + libp2p/protocols/ping, + libp2p/protocols/pubsub/gossipsub, + libp2p/protocols/pubsub/rpc/messages, + libp2p/builders, + libp2p/transports/tcptransport, + libp2p/transports/wstransport, + libp2p/utility, + mix + +import + ../waku_node, + ../../waku_core, + ../../waku_core/topics/sharding, + ../../waku_lightpush_legacy/client as legacy_lightpush_client, + ../../waku_lightpush_legacy as legacy_lightpush_protocol, + ../../waku_lightpush/client as lightpush_client, + ../../waku_lightpush as lightpush_protocol, + ../peer_manager, + ../../common/rate_limit/setting, + ../../waku_rln_relay + +logScope: + topics = "waku node lightpush api" + +## Waku lightpush +proc mountLegacyLightPush*( + node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit +) {.async.} = + info "mounting legacy light push" + + let pushHandler = + if node.wakuRelay.isNil: + debug "mounting legacy lightpush without relay (nil)" + legacy_lightpush_protocol.getNilPushHandler() + else: + debug "mounting legacy lightpush with relay" + let rlnPeer = + if isNil(node.wakuRlnRelay): + debug "mounting legacy lightpush without rln-relay" + none(WakuRLNRelay) + else: + debug "mounting legacy lightpush with rln-relay" + some(node.wakuRlnRelay) + legacy_lightpush_protocol.getRelayPushHandler(node.wakuRelay, rlnPeer) + + node.wakuLegacyLightPush = + WakuLegacyLightPush.new(node.peerManager, node.rng, pushHandler, some(rateLimit)) + + if node.started: + # Node has started already. Let's start lightpush too. + await node.wakuLegacyLightPush.start() + + node.switch.mount(node.wakuLegacyLightPush, protocolMatcher(WakuLegacyLightPushCodec)) + +proc mountLegacyLightPushClient*(node: WakuNode) = + info "mounting legacy light push client" + + if node.wakuLegacyLightpushClient.isNil(): + node.wakuLegacyLightpushClient = + WakuLegacyLightPushClient.new(node.peerManager, node.rng) + +proc legacyLightpushPublish*( + node: WakuNode, + pubsubTopic: Option[PubsubTopic], + message: WakuMessage, + peer: RemotePeerInfo, +): Future[legacy_lightpush_protocol.WakuLightPushResult[string]] {.async, gcsafe.} = + ## Pushes a `WakuMessage` to a node which relays it further on PubSub topic. + ## Returns whether relaying was successful or not. + ## `WakuMessage` should contain a `contentTopic` field for light node + ## functionality. + if node.wakuLegacyLightpushClient.isNil() and node.wakuLegacyLightPush.isNil(): + error "failed to publish message as legacy lightpush not available" + return err("Waku lightpush not available") + + let internalPublish = proc( + node: WakuNode, + pubsubTopic: PubsubTopic, + message: WakuMessage, + peer: RemotePeerInfo, + ): Future[legacy_lightpush_protocol.WakuLightPushResult[string]] {.async, gcsafe.} = + let msgHash = pubsubTopic.computeMessageHash(message).to0xHex() + if not node.wakuLegacyLightpushClient.isNil(): + notice "publishing message with legacy lightpush", + pubsubTopic = pubsubTopic, + contentTopic = message.contentTopic, + target_peer_id = peer.peerId, + msg_hash = msgHash + return await node.wakuLegacyLightpushClient.publish(pubsubTopic, message, peer) + + if not node.wakuLegacyLightPush.isNil(): + notice "publishing message with self hosted legacy lightpush", + pubsubTopic = pubsubTopic, + contentTopic = message.contentTopic, + target_peer_id = peer.peerId, + msg_hash = msgHash + return + await node.wakuLegacyLightPush.handleSelfLightPushRequest(pubsubTopic, message) + try: + if pubsubTopic.isSome(): + return await internalPublish(node, pubsubTopic.get(), message, peer) + + if node.wakuAutoSharding.isNone(): + return err("Pubsub topic must be specified when static sharding is enabled") + let topicMapRes = + node.wakuAutoSharding.get().getShardsFromContentTopics(message.contentTopic) + + let topicMap = + if topicMapRes.isErr(): + return err(topicMapRes.error) + else: + topicMapRes.get() + + for pubsub, _ in topicMap.pairs: # There's only one pair anyway + return await internalPublish(node, $pubsub, message, peer) + except CatchableError: + return err(getCurrentExceptionMsg()) + +# TODO: Move to application module (e.g., wakunode2.nim) +proc legacyLightpushPublish*( + node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage +): Future[legacy_lightpush_protocol.WakuLightPushResult[string]] {. + async, gcsafe, deprecated: "Use 'node.legacyLightpushPublish()' instead" +.} = + if node.wakuLegacyLightpushClient.isNil() and node.wakuLegacyLightPush.isNil(): + error "failed to publish message as legacy lightpush not available" + return err("waku legacy lightpush not available") + + var peerOpt: Option[RemotePeerInfo] = none(RemotePeerInfo) + if not node.wakuLegacyLightpushClient.isNil(): + peerOpt = node.peerManager.selectPeer(WakuLegacyLightPushCodec) + if peerOpt.isNone(): + let msg = "no suitable remote peers" + error "failed to publish message", err = msg + return err(msg) + elif not node.wakuLegacyLightPush.isNil(): + peerOpt = some(RemotePeerInfo.init($node.switch.peerInfo.peerId)) + + return await node.legacyLightpushPublish(pubsubTopic, message, peer = peerOpt.get()) + +proc mountLightPush*( + node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit +) {.async.} = + info "mounting light push" + + let pushHandler = + if node.wakuRelay.isNil(): + debug "mounting lightpush v2 without relay (nil)" + lightpush_protocol.getNilPushHandler() + else: + debug "mounting lightpush with relay" + let rlnPeer = + if isNil(node.wakuRlnRelay): + debug "mounting lightpush without rln-relay" + none(WakuRLNRelay) + else: + debug "mounting lightpush with rln-relay" + some(node.wakuRlnRelay) + lightpush_protocol.getRelayPushHandler(node.wakuRelay, rlnPeer) + + node.wakuLightPush = WakuLightPush.new( + node.peerManager, node.rng, pushHandler, node.wakuAutoSharding, some(rateLimit) + ) + + if node.started: + # Node has started already. Let's start lightpush too. + await node.wakuLightPush.start() + + node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec)) + +proc mountLightPushClient*(node: WakuNode) = + info "mounting light push client" + + if node.wakuLightpushClient.isNil(): + node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng) + +proc lightpushPublishHandler( + node: WakuNode, + pubsubTopic: PubsubTopic, + message: WakuMessage, + peer: RemotePeerInfo | PeerInfo, + mixify: bool = false, +): Future[lightpush_protocol.WakuLightPushResult] {.async.} = + let msgHash = pubsubTopic.computeMessageHash(message).to0xHex() + + if not node.wakuLightpushClient.isNil(): + notice "publishing message with lightpush", + pubsubTopic = pubsubTopic, + contentTopic = message.contentTopic, + target_peer_id = peer.peerId, + msg_hash = msgHash, + mixify = mixify + if mixify: #indicates we want to use mix to send the message + #TODO: How to handle multiple addresses? + let conn = node.wakuMix.toConnection( + MixDestination.init(peer.peerId, peer.addrs[0]), + WakuLightPushCodec, + Opt.some( + MixParameters(expectReply: Opt.some(true), numSurbs: Opt.some(byte(1))) + # indicating we only want a single path to be used for reply hence numSurbs = 1 + ), + ).valueOr: + error "could not create mix connection" + return lighpushErrorResult( + LightPushErrorCode.SERVICE_NOT_AVAILABLE, + "Waku lightpush with mix not available", + ) + + return await node.wakuLightpushClient.publishWithConn( + pubsubTopic, message, conn, peer.peerId + ) + else: + return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer) + + if not node.wakuLightPush.isNil(): + if mixify: + error "mixify is not supported with self hosted lightpush" + return lighpushErrorResult( + LightPushErrorCode.SERVICE_NOT_AVAILABLE, + "Waku lightpush with mix not available", + ) + notice "publishing message with self hosted lightpush", + pubsubTopic = pubsubTopic, + contentTopic = message.contentTopic, + target_peer_id = peer.peerId, + msg_hash = msgHash + return + await node.wakuLightPush.handleSelfLightPushRequest(some(pubsubTopic), message) + +proc lightpushPublish*( + node: WakuNode, + pubsubTopic: Option[PubsubTopic], + message: WakuMessage, + peerOpt: Option[RemotePeerInfo] = none(RemotePeerInfo), + mixify: bool = false, +): Future[lightpush_protocol.WakuLightPushResult] {.async.} = + if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil(): + error "failed to publish message as lightpush not available" + return lighpushErrorResult( + LightPushErrorCode.SERVICE_NOT_AVAILABLE, "Waku lightpush not available" + ) + if mixify and node.wakuMix.isNil(): + error "failed to publish message using mix as mix protocol is not mounted" + return lighpushErrorResult( + LightPushErrorCode.SERVICE_NOT_AVAILABLE, "Waku lightpush with mix not available" + ) + let toPeer: RemotePeerInfo = peerOpt.valueOr: + if not node.wakuLightPush.isNil(): + RemotePeerInfo.init(node.peerId()) + elif not node.wakuLightpushClient.isNil(): + node.peerManager.selectPeer(WakuLightPushCodec).valueOr: + let msg = "no suitable remote peers" + error "failed to publish message", msg = msg + return lighpushErrorResult(LightPushErrorCode.NO_PEERS_TO_RELAY, msg) + else: + return lighpushErrorResult( + LightPushErrorCode.NO_PEERS_TO_RELAY, "no suitable remote peers" + ) + + let pubsubForPublish = pubSubTopic.valueOr: + if node.wakuAutoSharding.isNone(): + let msg = "Pubsub topic must be specified when static sharding is enabled" + error "lightpush publish error", error = msg + return lighpushErrorResult(LightPushErrorCode.INVALID_MESSAGE, msg) + + let parsedTopic = NsContentTopic.parse(message.contentTopic).valueOr: + let msg = "Invalid content-topic:" & $error + error "lightpush request handling error", error = msg + return lighpushErrorResult(LightPushErrorCode.INVALID_MESSAGE, msg) + + node.wakuAutoSharding.get().getShard(parsedTopic).valueOr: + let msg = "Autosharding error: " & error + error "lightpush publish error", error = msg + return lighpushErrorResult(LightPushErrorCode.INTERNAL_SERVER_ERROR, msg) + + return await lightpushPublishHandler(node, pubsubForPublish, message, toPeer, mixify) diff --git a/waku/node/api/peer_exchange.nim b/waku/node/api/peer_exchange.nim new file mode 100644 index 000000000..d2e0f5575 --- /dev/null +++ b/waku/node/api/peer_exchange.nim @@ -0,0 +1,120 @@ +{.push raises: [].} + +import + std/[options, tables, net], + chronos, + chronicles, + metrics, + results, + eth/keys, + eth/p2p/discoveryv5/enr, + libp2p/crypto/crypto, + libp2p/protocols/ping, + libp2p/protocols/pubsub/gossipsub, + libp2p/protocols/pubsub/rpc/messages, + libp2p/builders, + libp2p/transports/tcptransport, + libp2p/transports/wstransport, + libp2p/utility + +import + ../waku_node, + ../../waku_peer_exchange, + ../../waku_core, + ../peer_manager, + ../../common/rate_limit/setting + +logScope: + topics = "waku node peerexchange api" + +## Waku peer-exchange + +proc mountPeerExchange*( + node: WakuNode, + cluster: Option[uint16] = none(uint16), + rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit, +) {.async: (raises: []).} = + info "mounting waku peer exchange" + + node.wakuPeerExchange = + WakuPeerExchange.new(node.peerManager, cluster, some(rateLimit)) + + if node.started: + try: + await node.wakuPeerExchange.start() + except CatchableError: + error "failed to start wakuPeerExchange", error = getCurrentExceptionMsg() + + try: + node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec)) + except LPError: + error "failed to mount wakuPeerExchange", error = getCurrentExceptionMsg() + +proc mountPeerExchangeClient*(node: WakuNode) {.async: (raises: []).} = + info "mounting waku peer exchange client" + if node.wakuPeerExchangeClient.isNil(): + node.wakuPeerExchangeClient = WakuPeerExchangeClient.new(node.peerManager) + +proc fetchPeerExchangePeers*( + node: Wakunode, amount = DefaultPXNumPeersReq +): Future[Result[int, PeerExchangeResponseStatus]] {.async: (raises: []).} = + if node.wakuPeerExchangeClient.isNil(): + error "could not get peers from px, waku peer-exchange-client is nil" + return err( + ( + status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, + status_desc: some("PeerExchangeClient is not mounted"), + ) + ) + + info "Retrieving peer info via peer exchange protocol", amount + let pxPeersRes = await node.wakuPeerExchangeClient.request(amount) + if pxPeersRes.isOk(): + var validPeers = 0 + let peers = pxPeersRes.get().peerInfos + for pi in peers: + var record: enr.Record + if enr.fromBytes(record, pi.enr): + node.peerManager.addPeer(record.toRemotePeerInfo().get, PeerExchange) + validPeers += 1 + info "Retrieved peer info via peer exchange protocol", + validPeers = validPeers, totalPeers = peers.len + return ok(validPeers) + else: + warn "failed to retrieve peer info via peer exchange protocol", + error = pxPeersRes.error + return err(pxPeersRes.error) + +proc peerExchangeLoop(node: WakuNode) {.async.} = + while true: + if not node.started: + await sleepAsync(5.seconds) + continue + (await node.fetchPeerExchangePeers()).isOkOr: + warn "Cannot fetch peers from peer exchange", cause = error + await sleepAsync(1.minutes) + +proc startPeerExchangeLoop*(node: WakuNode) = + if node.wakuPeerExchangeClient.isNil(): + error "startPeerExchangeLoop: Peer Exchange is not mounted" + return + info "Starting peer exchange loop" + node.wakuPeerExchangeClient.pxLoopHandle = node.peerExchangeLoop() + +# TODO: Move to application module (e.g., wakunode2.nim) +proc setPeerExchangePeer*( + node: WakuNode, peer: RemotePeerInfo | MultiAddress | string +) = + if node.wakuPeerExchange.isNil(): + error "could not set peer, waku peer-exchange is nil" + return + + info "Set peer-exchange peer", peer = peer + + let remotePeerRes = parsePeerInfo(peer) + if remotePeerRes.isErr(): + error "could not parse peer info", error = remotePeerRes.error + return + + node.peerManager.addPeer(remotePeerRes.value, PeerExchange) + waku_px_peers.inc() diff --git a/waku/node/api/ping.nim b/waku/node/api/ping.nim new file mode 100644 index 000000000..ceaad6696 --- /dev/null +++ b/waku/node/api/ping.nim @@ -0,0 +1,87 @@ +{.push raises: [].} + +import + std/[options], + chronos, + chronicles, + metrics, + results, + libp2p/protocols/ping, + libp2p/builders, + libp2p/transports/tcptransport, + libp2p/utility + +import ../waku_node, ../peer_manager + +logScope: + topics = "waku node ping api" + +proc mountLibp2pPing*(node: WakuNode) {.async: (raises: []).} = + info "mounting libp2p ping protocol" + + try: + node.libp2pPing = Ping.new(rng = node.rng) + except Exception as e: + error "failed to create ping", error = getCurrentExceptionMsg() + + if node.started: + # Node has started already. Let's start ping too. + try: + await node.libp2pPing.start() + except CatchableError: + error "failed to start libp2pPing", error = getCurrentExceptionMsg() + + try: + node.switch.mount(node.libp2pPing) + except LPError: + error "failed to mount libp2pPing", error = getCurrentExceptionMsg() + +proc pingPeer(node: WakuNode, peerId: PeerId): Future[Result[void, string]] {.async.} = + ## Ping a single peer and return the result + + try: + # Establish a stream + let stream = (await node.peerManager.dialPeer(peerId, PingCodec)).valueOr: + error "pingPeer: failed dialing peer", peerId = peerId + return err("pingPeer failed dialing peer peerId: " & $peerId) + defer: + # Always close the stream + try: + await stream.close() + except CatchableError as e: + debug "Error closing ping connection", peerId = peerId, error = e.msg + + # Perform ping + let pingDuration = await node.libp2pPing.ping(stream) + + trace "Ping successful", peerId = peerId, duration = pingDuration + return ok() + except CatchableError as e: + error "pingPeer: exception raised pinging peer", peerId = peerId, error = e.msg + return err("pingPeer: exception raised pinging peer: " & e.msg) + +# Returns the number of succesful pings performed +proc parallelPings*(node: WakuNode, peerIds: seq[PeerId]): Future[int] {.async.} = + if len(peerIds) == 0: + return 0 + + var pingFuts: seq[Future[Result[void, string]]] + + # Create ping futures for each peer + for i, peerId in peerIds: + let fut = pingPeer(node, peerId) + pingFuts.add(fut) + + # Wait for all pings to complete + discard await allFutures(pingFuts).withTimeout(5.seconds) + + var successCount = 0 + for fut in pingFuts: + if not fut.completed() or fut.failed(): + continue + + let res = fut.read() + if res.isOk(): + successCount.inc() + + return successCount diff --git a/waku/node/api/relay.nim b/waku/node/api/relay.nim new file mode 100644 index 000000000..f456bf2fc --- /dev/null +++ b/waku/node/api/relay.nim @@ -0,0 +1,256 @@ +{.push raises: [].} + +import + std/[options, net], + chronos, + chronicles, + metrics, + results, + stew/byteutils, + eth/keys, + eth/p2p/discoveryv5/enr, + libp2p/crypto/crypto, + libp2p/protocols/ping, + libp2p/protocols/pubsub/gossipsub, + libp2p/protocols/pubsub/rpc/messages, + libp2p/builders, + libp2p/transports/tcptransport, + libp2p/transports/wstransport, + libp2p/utility + +import + ../waku_node, + ../../waku_relay, + ../../waku_core, + ../../waku_core/topics/sharding, + ../../waku_filter_v2, + ../../waku_archive_legacy, + ../../waku_archive, + ../../waku_store_sync, + ../peer_manager, + ../../waku_rln_relay + +declarePublicHistogram waku_histogram_message_size, + "message size histogram in kB", + buckets = [ + 0.0, 1.0, 3.0, 5.0, 15.0, 50.0, 75.0, 100.0, 125.0, 150.0, 500.0, 700.0, 1000.0, Inf + ] + +logScope: + topics = "waku node relay api" + +## Waku relay + +proc registerRelayHandler( + node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler +) = + ## Registers the only handler for the given topic. + ## Notice that this handler internally calls other handlers, such as filter, + ## archive, etc, plus the handler provided by the application. + + if node.wakuRelay.isSubscribed(topic): + return + + proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + let msgSizeKB = msg.payload.len / 1000 + + waku_node_messages.inc(labelValues = ["relay"]) + waku_histogram_message_size.observe(msgSizeKB) + + proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + if node.wakuFilter.isNil(): + return + + await node.wakuFilter.handleMessage(topic, msg) + + proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + if not node.wakuLegacyArchive.isNil(): + ## we try to store with legacy archive + await node.wakuLegacyArchive.handleMessage(topic, msg) + return + + if node.wakuArchive.isNil(): + return + + await node.wakuArchive.handleMessage(topic, msg) + + proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + if node.wakuStoreReconciliation.isNil(): + return + + node.wakuStoreReconciliation.messageIngress(topic, msg) + + let uniqueTopicHandler = proc( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await traceHandler(topic, msg) + await filterHandler(topic, msg) + await archiveHandler(topic, msg) + await syncHandler(topic, msg) + await appHandler(topic, msg) + + node.wakuRelay.subscribe(topic, uniqueTopicHandler) + +proc subscribe*( + node: WakuNode, subscription: SubscriptionEvent, handler: WakuRelayHandler +): Result[void, string] = + ## Subscribes to a PubSub or Content topic. Triggers handler when receiving messages on + ## this topic. WakuRelayHandler is a method that takes a topic and a Waku message. + + if node.wakuRelay.isNil(): + error "Invalid API call to `subscribe`. WakuRelay not mounted." + return err("Invalid API call to `subscribe`. WakuRelay not mounted.") + + let (pubsubTopic, contentTopicOp) = + case subscription.kind + of ContentSub: + if node.wakuAutoSharding.isSome(): + let shard = node.wakuAutoSharding.get().getShard((subscription.topic)).valueOr: + error "Autosharding error", error = error + return err("Autosharding error: " & error) + ($shard, some(subscription.topic)) + else: + return err( + "Static sharding is used, relay subscriptions must specify a pubsub topic" + ) + of PubsubSub: + (subscription.topic, none(ContentTopic)) + else: + return err("Unsupported subscription type in relay subscribe") + + if node.wakuRelay.isSubscribed(pubsubTopic): + warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic + return ok() + + node.registerRelayHandler(pubsubTopic, handler) + node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) + + return ok() + +proc unsubscribe*( + node: WakuNode, subscription: SubscriptionEvent +): Result[void, string] = + ## Unsubscribes from a specific PubSub or Content topic. + + if node.wakuRelay.isNil(): + error "Invalid API call to `unsubscribe`. WakuRelay not mounted." + return err("Invalid API call to `unsubscribe`. WakuRelay not mounted.") + + let (pubsubTopic, contentTopicOp) = + case subscription.kind + of ContentUnsub: + if node.wakuAutoSharding.isSome(): + let shard = node.wakuAutoSharding.get().getShard((subscription.topic)).valueOr: + error "Autosharding error", error = error + return err("Autosharding error: " & error) + ($shard, some(subscription.topic)) + else: + return err( + "Static sharding is used, relay subscriptions must specify a pubsub topic" + ) + of PubsubUnsub: + (subscription.topic, none(ContentTopic)) + else: + return err("Unsupported subscription type in relay unsubscribe") + + if not node.wakuRelay.isSubscribed(pubsubTopic): + warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic + return ok() + + debug "unsubscribe", pubsubTopic, contentTopicOp + node.wakuRelay.unsubscribe(pubsubTopic) + node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic)) + + return ok() + +proc publish*( + node: WakuNode, pubsubTopicOp: Option[PubsubTopic], message: WakuMessage +): Future[Result[void, string]] {.async, gcsafe.} = + ## Publish a `WakuMessage`. Pubsub topic contains; none, a named or static shard. + ## `WakuMessage` should contain a `contentTopic` field for light node functionality. + ## It is also used to determine the shard. + + if node.wakuRelay.isNil(): + let msg = + "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead." + error "publish error", err = msg + # TODO: Improve error handling + return err(msg) + + let pubsubTopic = pubsubTopicOp.valueOr: + if node.wakuAutoSharding.isNone(): + return err("Pubsub topic must be specified when static sharding is enabled.") + node.wakuAutoSharding.get().getShard(message.contentTopic).valueOr: + let msg = "Autosharding error: " & error + return err(msg) + + #TODO instead of discard return error when 0 peers received the message + discard await node.wakuRelay.publish(pubsubTopic, message) + + notice "waku.relay published", + peerId = node.peerId, + pubsubTopic = pubsubTopic, + msg_hash = pubsubTopic.computeMessageHash(message).to0xHex(), + publishTime = getNowInNanosecondTime() + + return ok() + +proc mountRelay*( + node: WakuNode, + peerExchangeHandler = none(RoutingRecordsHandler), + maxMessageSize = int(DefaultMaxWakuMessageSize), +): Future[Result[void, string]] {.async.} = + if not node.wakuRelay.isNil(): + error "wakuRelay already mounted, skipping" + return err("wakuRelay already mounted, skipping") + + ## The default relay topics is the union of all configured topics plus default PubsubTopic(s) + info "mounting relay protocol" + + node.wakuRelay = WakuRelay.new(node.switch, maxMessageSize).valueOr: + error "failed mounting relay protocol", error = error + return err("failed mounting relay protocol: " & error) + + ## Add peer exchange handler + if peerExchangeHandler.isSome(): + node.wakuRelay.parameters.enablePX = true + # Feature flag for peer exchange in nim-libp2p + node.wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get()) + + if node.started: + await node.startRelay() + + node.switch.mount(node.wakuRelay, protocolMatcher(WakuRelayCodec)) + + info "relay mounted successfully" + return ok() + + ## Waku RLN Relay + +proc mountRlnRelay*( + node: WakuNode, + rlnConf: WakuRlnConfig, + spamHandler = none(SpamHandler), + registrationHandler = none(RegistrationHandler), +) {.async.} = + info "mounting rln relay" + + if node.wakuRelay.isNil(): + raise newException( + CatchableError, "WakuRelay protocol is not mounted, cannot mount WakuRlnRelay" + ) + + let rlnRelayRes = await WakuRlnRelay.new(rlnConf, registrationHandler) + if rlnRelayRes.isErr(): + raise + newException(CatchableError, "failed to mount WakuRlnRelay: " & rlnRelayRes.error) + let rlnRelay = rlnRelayRes.get() + if (rlnConf.userMessageLimit > rlnRelay.groupManager.rlnRelayMaxMessageLimit): + error "rln-relay-user-message-limit can't exceed the MAX_MESSAGE_LIMIT in the rln contract" + let validator = generateRlnValidator(rlnRelay, spamHandler) + + # register rln validator as default validator + debug "Registering RLN validator" + node.wakuRelay.addValidator(validator, "RLN validation failed") + + node.wakuRlnRelay = rlnRelay diff --git a/waku/node/api/store.nim b/waku/node/api/store.nim new file mode 100644 index 000000000..ddac5fbfd --- /dev/null +++ b/waku/node/api/store.nim @@ -0,0 +1,309 @@ +{.push raises: [].} + +import + std/[options], + chronos, + chronicles, + metrics, + results, + eth/keys, + eth/p2p/discoveryv5/enr, + libp2p/crypto/crypto, + libp2p/protocols/ping, + libp2p/protocols/pubsub/gossipsub, + libp2p/protocols/pubsub/rpc/messages, + libp2p/builders, + libp2p/transports/tcptransport, + libp2p/transports/wstransport, + libp2p/utility + +import + ../waku_node, + ../../waku_core, + ../../waku_store_legacy/protocol as legacy_store, + ../../waku_store_legacy/client as legacy_store_client, + ../../waku_store_legacy/common as legacy_store_common, + ../../waku_store/protocol as store, + ../../waku_store/client as store_client, + ../../waku_store/common as store_common, + ../../waku_store/resume, + ../peer_manager, + ../../common/rate_limit/setting, + ../../waku_archive, + ../../waku_archive_legacy + +logScope: + topics = "waku node store api" + +## Waku archive +proc mountArchive*( + node: WakuNode, + driver: waku_archive.ArchiveDriver, + retentionPolicy = none(waku_archive.RetentionPolicy), +): Result[void, string] = + node.wakuArchive = waku_archive.WakuArchive.new( + driver = driver, retentionPolicy = retentionPolicy + ).valueOr: + return err("error in mountArchive: " & error) + + node.wakuArchive.start() + + return ok() + +proc mountLegacyArchive*( + node: WakuNode, driver: waku_archive_legacy.ArchiveDriver +): Result[void, string] = + node.wakuLegacyArchive = waku_archive_legacy.WakuArchive.new(driver = driver).valueOr: + return err("error in mountLegacyArchive: " & error) + + return ok() + +## Legacy Waku Store + +# TODO: Review this mapping logic. Maybe, move it to the appplication code +proc toArchiveQuery( + request: legacy_store_common.HistoryQuery +): waku_archive_legacy.ArchiveQuery = + waku_archive_legacy.ArchiveQuery( + pubsubTopic: request.pubsubTopic, + contentTopics: request.contentTopics, + cursor: request.cursor.map( + proc(cursor: HistoryCursor): waku_archive_legacy.ArchiveCursor = + waku_archive_legacy.ArchiveCursor( + pubsubTopic: cursor.pubsubTopic, + senderTime: cursor.senderTime, + storeTime: cursor.storeTime, + digest: cursor.digest, + ) + ), + startTime: request.startTime, + endTime: request.endTime, + pageSize: request.pageSize.uint, + direction: request.direction, + requestId: request.requestId, + ) + +# TODO: Review this mapping logic. Maybe, move it to the appplication code +proc toHistoryResult*( + res: waku_archive_legacy.ArchiveResult +): legacy_store_common.HistoryResult = + if res.isErr(): + let error = res.error + case res.error.kind + of waku_archive_legacy.ArchiveErrorKind.DRIVER_ERROR, + waku_archive_legacy.ArchiveErrorKind.INVALID_QUERY: + err(HistoryError(kind: HistoryErrorKind.BAD_REQUEST, cause: res.error.cause)) + else: + err(HistoryError(kind: HistoryErrorKind.UNKNOWN)) + else: + let response = res.get() + ok( + HistoryResponse( + messages: response.messages, + cursor: response.cursor.map( + proc(cursor: waku_archive_legacy.ArchiveCursor): HistoryCursor = + HistoryCursor( + pubsubTopic: cursor.pubsubTopic, + senderTime: cursor.senderTime, + storeTime: cursor.storeTime, + digest: cursor.digest, + ) + ), + ) + ) + +proc mountLegacyStore*( + node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit +) {.async.} = + info "mounting waku legacy store protocol" + + if node.wakuLegacyArchive.isNil(): + error "failed to mount waku legacy store protocol", error = "waku archive not set" + return + + # TODO: Review this handler logic. Maybe, move it to the appplication code + let queryHandler: HistoryQueryHandler = proc( + request: HistoryQuery + ): Future[legacy_store_common.HistoryResult] {.async.} = + if request.cursor.isSome(): + request.cursor.get().checkHistCursor().isOkOr: + return err(error) + + let request = request.toArchiveQuery() + let response = await node.wakuLegacyArchive.findMessagesV2(request) + return response.toHistoryResult() + + node.wakuLegacyStore = legacy_store.WakuStore.new( + node.peerManager, node.rng, queryHandler, some(rateLimit) + ) + + if node.started: + # Node has started already. Let's start store too. + await node.wakuLegacyStore.start() + + node.switch.mount( + node.wakuLegacyStore, protocolMatcher(legacy_store_common.WakuLegacyStoreCodec) + ) + +proc mountLegacyStoreClient*(node: WakuNode) = + info "mounting legacy store client" + + node.wakuLegacyStoreClient = + legacy_store_client.WakuStoreClient.new(node.peerManager, node.rng) + +proc query*( + node: WakuNode, query: legacy_store_common.HistoryQuery, peer: RemotePeerInfo +): Future[legacy_store_common.WakuStoreResult[legacy_store_common.HistoryResponse]] {. + async, gcsafe +.} = + ## Queries known nodes for historical messages + if node.wakuLegacyStoreClient.isNil(): + return err("waku legacy store client is nil") + + let queryRes = await node.wakuLegacyStoreClient.query(query, peer) + if queryRes.isErr(): + return err("legacy store client query error: " & $queryRes.error) + + let response = queryRes.get() + + return ok(response) + +# TODO: Move to application module (e.g., wakunode2.nim) +proc query*( + node: WakuNode, query: legacy_store_common.HistoryQuery +): Future[legacy_store_common.WakuStoreResult[legacy_store_common.HistoryResponse]] {. + async, gcsafe, deprecated: "Use 'node.query()' with peer destination instead" +.} = + ## Queries known nodes for historical messages + if node.wakuLegacyStoreClient.isNil(): + return err("waku legacy store client is nil") + + let peerOpt = node.peerManager.selectPeer(legacy_store_common.WakuLegacyStoreCodec) + if peerOpt.isNone(): + error "no suitable remote peers" + return err("peer_not_found_failure") + + return await node.query(query, peerOpt.get()) + +when defined(waku_exp_store_resume): + # TODO: Move to application module (e.g., wakunode2.nim) + proc resume*( + node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]) + ) {.async, gcsafe.} = + ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online + ## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages) + ## messages are stored in the wakuStore's messages field and in the message db + ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message + ## an offset of 20 second is added to the time window to count for nodes asynchrony + ## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed). + ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. + ## The history gets fetched successfully if the dialed peer has been online during the queried time window. + if node.wakuLegacyStoreClient.isNil(): + return + + let retrievedMessages = await node.wakuLegacyStoreClient.resume(peerList) + if retrievedMessages.isErr(): + error "failed to resume store", error = retrievedMessages.error + return + + info "the number of retrieved messages since the last online time: ", + number = retrievedMessages.value + +## Waku Store + +proc toArchiveQuery(request: StoreQueryRequest): waku_archive.ArchiveQuery = + var query = waku_archive.ArchiveQuery() + + query.includeData = request.includeData + query.pubsubTopic = request.pubsubTopic + query.contentTopics = request.contentTopics + query.startTime = request.startTime + query.endTime = request.endTime + query.hashes = request.messageHashes + query.cursor = request.paginationCursor + query.direction = request.paginationForward + query.requestId = request.requestId + + if request.paginationLimit.isSome(): + query.pageSize = uint(request.paginationLimit.get()) + + return query + +proc toStoreResult(res: waku_archive.ArchiveResult): StoreQueryResult = + let response = res.valueOr: + return err(StoreError.new(300, "archive error: " & $error)) + + var res = StoreQueryResponse() + + res.statusCode = 200 + res.statusDesc = "OK" + + for i in 0 ..< response.hashes.len: + let hash = response.hashes[i] + + let kv = store_common.WakuMessageKeyValue(messageHash: hash) + + res.messages.add(kv) + + for i in 0 ..< response.messages.len: + res.messages[i].message = some(response.messages[i]) + res.messages[i].pubsubTopic = some(response.topics[i]) + + res.paginationCursor = response.cursor + + return ok(res) + +proc mountStore*( + node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit +) {.async.} = + if node.wakuArchive.isNil(): + error "failed to mount waku store protocol", error = "waku archive not set" + return + + info "mounting waku store protocol" + + let requestHandler: StoreQueryRequestHandler = proc( + request: StoreQueryRequest + ): Future[StoreQueryResult] {.async.} = + let request = request.toArchiveQuery() + let response = await node.wakuArchive.findMessages(request) + + return response.toStoreResult() + + node.wakuStore = + store.WakuStore.new(node.peerManager, node.rng, requestHandler, some(rateLimit)) + + if node.started: + await node.wakuStore.start() + + node.switch.mount(node.wakuStore, protocolMatcher(store_common.WakuStoreCodec)) + +proc mountStoreClient*(node: WakuNode) = + info "mounting store client" + + node.wakuStoreClient = store_client.WakuStoreClient.new(node.peerManager, node.rng) + +proc query*( + node: WakuNode, request: store_common.StoreQueryRequest, peer: RemotePeerInfo +): Future[store_common.WakuStoreResult[store_common.StoreQueryResponse]] {. + async, gcsafe +.} = + ## Queries known nodes for historical messages + if node.wakuStoreClient.isNil(): + return err("waku store v3 client is nil") + + let response = (await node.wakuStoreClient.query(request, peer)).valueOr: + var res = StoreQueryResponse() + res.statusCode = uint32(error.kind) + res.statusDesc = $error + + return ok(res) + + return ok(response) + +proc setupStoreResume*(node: WakuNode) = + node.wakuStoreResume = StoreResume.new( + node.peerManager, node.wakuArchive, node.wakuStoreClient + ).valueOr: + error "Failed to setup Store Resume", error = $error + return diff --git a/waku/node/health_monitor/node_health_monitor.nim b/waku/node/health_monitor/node_health_monitor.nim index c73a2de05..a98e6577a 100644 --- a/waku/node/health_monitor/node_health_monitor.nim +++ b/waku/node/health_monitor/node_health_monitor.nim @@ -1,13 +1,14 @@ {.push raises: [].} import - std/[options, sets, strformat, random, sequtils], + std/[options, sets, random, sequtils], chronos, chronicles, libp2p/protocols/rendezvous import ../waku_node, + ../api, ../../waku_rln_relay, ../../waku_relay, ../peer_manager, diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 35c91a698..1bb4eb881 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1,15 +1,15 @@ {.push raises: [].} import - std/[hashes, options, sugar, tables, strutils, sequtils, os, net, random], + std/[options, tables, strutils, sequtils, os, net, random], chronos, chronicles, metrics, results, - stew/byteutils, eth/keys, nimcrypto, bearssl/rand, + stew/byteutils, eth/p2p/discoveryv5/enr, libp2p/crypto/crypto, libp2p/crypto/curve25519, @@ -42,7 +42,6 @@ import ../waku_store_sync, ../waku_filter_v2, ../waku_filter_v2/client as filter_client, - ../waku_filter_v2/subscriptions as filter_subscriptions, ../waku_metadata, ../waku_rendezvous/protocol, ../waku_lightpush_legacy/client as legacy_ligntpuhs_client, @@ -60,11 +59,6 @@ import ../waku_mix declarePublicCounter waku_node_messages, "number of messages received", ["type"] -declarePublicHistogram waku_histogram_message_size, - "message size histogram in kB", - buckets = [ - 0.0, 1.0, 3.0, 5.0, 15.0, 50.0, 75.0, 100.0, 125.0, 150.0, 500.0, 700.0, 1000.0, Inf - ] declarePublicGauge waku_version, "Waku version info (in git describe format)", ["version"] @@ -323,162 +317,6 @@ proc mountStoreSync*( return ok() -## Waku relay - -proc registerRelayHandler( - node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler -) = - ## Registers the only handler for the given topic. - ## Notice that this handler internally calls other handlers, such as filter, - ## archive, etc, plus the handler provided by the application. - - if node.wakuRelay.isSubscribed(topic): - return - - proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = - let msgSizeKB = msg.payload.len / 1000 - - waku_node_messages.inc(labelValues = ["relay"]) - waku_histogram_message_size.observe(msgSizeKB) - - proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = - if node.wakuFilter.isNil(): - return - - await node.wakuFilter.handleMessage(topic, msg) - - proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = - if not node.wakuLegacyArchive.isNil(): - ## we try to store with legacy archive - await node.wakuLegacyArchive.handleMessage(topic, msg) - return - - if node.wakuArchive.isNil(): - return - - await node.wakuArchive.handleMessage(topic, msg) - - proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = - if node.wakuStoreReconciliation.isNil(): - return - - node.wakuStoreReconciliation.messageIngress(topic, msg) - - let uniqueTopicHandler = proc( - topic: PubsubTopic, msg: WakuMessage - ): Future[void] {.async, gcsafe.} = - await traceHandler(topic, msg) - await filterHandler(topic, msg) - await archiveHandler(topic, msg) - await syncHandler(topic, msg) - await appHandler(topic, msg) - - node.wakuRelay.subscribe(topic, uniqueTopicHandler) - -proc subscribe*( - node: WakuNode, subscription: SubscriptionEvent, handler: WakuRelayHandler -): Result[void, string] = - ## Subscribes to a PubSub or Content topic. Triggers handler when receiving messages on - ## this topic. WakuRelayHandler is a method that takes a topic and a Waku message. - - if node.wakuRelay.isNil(): - error "Invalid API call to `subscribe`. WakuRelay not mounted." - return err("Invalid API call to `subscribe`. WakuRelay not mounted.") - - let (pubsubTopic, contentTopicOp) = - case subscription.kind - of ContentSub: - if node.wakuAutoSharding.isSome(): - let shard = node.wakuAutoSharding.get().getShard((subscription.topic)).valueOr: - error "Autosharding error", error = error - return err("Autosharding error: " & error) - ($shard, some(subscription.topic)) - else: - return err( - "Static sharding is used, relay subscriptions must specify a pubsub topic" - ) - of PubsubSub: - (subscription.topic, none(ContentTopic)) - else: - return err("Unsupported subscription type in relay subscribe") - - if node.wakuRelay.isSubscribed(pubsubTopic): - warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic - return ok() - - node.registerRelayHandler(pubsubTopic, handler) - node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) - - return ok() - -proc unsubscribe*( - node: WakuNode, subscription: SubscriptionEvent -): Result[void, string] = - ## Unsubscribes from a specific PubSub or Content topic. - - if node.wakuRelay.isNil(): - error "Invalid API call to `unsubscribe`. WakuRelay not mounted." - return err("Invalid API call to `unsubscribe`. WakuRelay not mounted.") - - let (pubsubTopic, contentTopicOp) = - case subscription.kind - of ContentUnsub: - if node.wakuAutoSharding.isSome(): - let shard = node.wakuAutoSharding.get().getShard((subscription.topic)).valueOr: - error "Autosharding error", error = error - return err("Autosharding error: " & error) - ($shard, some(subscription.topic)) - else: - return err( - "Static sharding is used, relay subscriptions must specify a pubsub topic" - ) - of PubsubUnsub: - (subscription.topic, none(ContentTopic)) - else: - return err("Unsupported subscription type in relay unsubscribe") - - if not node.wakuRelay.isSubscribed(pubsubTopic): - warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic - return ok() - - debug "unsubscribe", pubsubTopic, contentTopicOp - node.wakuRelay.unsubscribe(pubsubTopic) - node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic)) - - return ok() - -proc publish*( - node: WakuNode, pubsubTopicOp: Option[PubsubTopic], message: WakuMessage -): Future[Result[void, string]] {.async, gcsafe.} = - ## Publish a `WakuMessage`. Pubsub topic contains; none, a named or static shard. - ## `WakuMessage` should contain a `contentTopic` field for light node functionality. - ## It is also used to determine the shard. - - if node.wakuRelay.isNil(): - let msg = - "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead." - error "publish error", err = msg - # TODO: Improve error handling - return err(msg) - - let pubsubTopic = pubsubTopicOp.valueOr: - if node.wakuAutoSharding.isNone(): - return err("Pubsub topic must be specified when static sharding is enabled.") - node.wakuAutoSharding.get().getShard(message.contentTopic).valueOr: - let msg = "Autosharding error: " & error - return err(msg) - - #TODO instead of discard return error when 0 peers received the message - discard await node.wakuRelay.publish(pubsubTopic, message) - - notice "waku.relay published", - peerId = node.peerId, - pubsubTopic = pubsubTopic, - msg_hash = pubsubTopic.computeMessageHash(message).to0xHex(), - publishTime = getNowInNanosecondTime() - - return ok() - proc startRelay*(node: WakuNode) {.async.} = ## Setup and start relay protocol info "starting relay protocol" @@ -504,1024 +342,11 @@ proc startRelay*(node: WakuNode) {.async.} = info "relay started successfully" -proc mountRelay*( - node: WakuNode, - peerExchangeHandler = none(RoutingRecordsHandler), - maxMessageSize = int(DefaultMaxWakuMessageSize), -): Future[Result[void, string]] {.async.} = - if not node.wakuRelay.isNil(): - error "wakuRelay already mounted, skipping" - return err("wakuRelay already mounted, skipping") - - ## The default relay topics is the union of all configured topics plus default PubsubTopic(s) - info "mounting relay protocol" - - node.wakuRelay = WakuRelay.new(node.switch, maxMessageSize).valueOr: - error "failed mounting relay protocol", error = error - return err("failed mounting relay protocol: " & error) - - ## Add peer exchange handler - if peerExchangeHandler.isSome(): - node.wakuRelay.parameters.enablePX = true - # Feature flag for peer exchange in nim-libp2p - node.wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get()) - - if node.started: - await node.startRelay() - - node.switch.mount(node.wakuRelay, protocolMatcher(WakuRelayCodec)) - - info "relay mounted successfully" - return ok() - -## Waku filter - -proc mountFilter*( - node: WakuNode, - subscriptionTimeout: Duration = - filter_subscriptions.DefaultSubscriptionTimeToLiveSec, - maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers, - maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer, - messageCacheTTL: Duration = filter_subscriptions.MessageCacheTTL, - rateLimitSetting: RateLimitSetting = FilterDefaultPerPeerRateLimit, -) {.async: (raises: []).} = - ## Mounting filter v2 protocol - - info "mounting filter protocol" - node.wakuFilter = WakuFilter.new( - node.peerManager, - subscriptionTimeout, - maxFilterPeers, - maxFilterCriteriaPerPeer, - messageCacheTTL, - some(rateLimitSetting), - ) - - try: - await node.wakuFilter.start() - except CatchableError: - error "failed to start wakuFilter", error = getCurrentExceptionMsg() - - try: - node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterSubscribeCodec)) - except LPError: - error "failed to mount wakuFilter", error = getCurrentExceptionMsg() - -proc filterHandleMessage*( - node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage -) {.async.} = - if node.wakuFilter.isNil(): - error "cannot handle filter message", error = "waku filter is required" - return - - await node.wakuFilter.handleMessage(pubsubTopic, message) - -proc mountFilterClient*(node: WakuNode) {.async: (raises: []).} = - ## Mounting both filter - ## Giving option for application level to choose btw own push message handling or - ## rely on node provided cache. - This only applies for v2 filter client - info "mounting filter client" - - if not node.wakuFilterClient.isNil(): - trace "Filter client already mounted." - return - - node.wakuFilterClient = WakuFilterClient.new(node.peerManager, node.rng) - - try: - await node.wakuFilterClient.start() - except CatchableError: - error "failed to start wakuFilterClient", error = getCurrentExceptionMsg() - - try: - node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterSubscribeCodec)) - except LPError: - error "failed to mount wakuFilterClient", error = getCurrentExceptionMsg() - -proc filterSubscribe*( - node: WakuNode, - pubsubTopic: Option[PubsubTopic], - contentTopics: ContentTopic | seq[ContentTopic], - peer: RemotePeerInfo | string, -): Future[FilterSubscribeResult] {.async: (raises: []).} = - ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. - if node.wakuFilterClient.isNil(): - error "cannot register filter subscription to topic", - error = "waku filter client is not set up" - return err(FilterSubscribeError.serviceUnavailable()) - - let remotePeerRes = parsePeerInfo(peer) - if remotePeerRes.isErr(): - error "Couldn't parse the peer info properly", error = remotePeerRes.error - return err(FilterSubscribeError.serviceUnavailable("No peers available")) - - let remotePeer = remotePeerRes.value - - if pubsubTopic.isSome(): - info "registering filter subscription to content", - pubsubTopic = pubsubTopic.get(), - contentTopics = contentTopics, - peer = remotePeer.peerId - - when (contentTopics is ContentTopic): - let contentTopics = @[contentTopics] - let subRes = await node.wakuFilterClient.subscribe( - remotePeer, pubsubTopic.get(), contentTopics - ) - if subRes.isOk(): - info "v2 subscribed to topic", - pubsubTopic = pubsubTopic, contentTopics = contentTopics - - # Purpose is to update Waku Metadata - node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic.get())) - else: - error "failed filter v2 subscription", error = subRes.error - waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) - - return subRes - elif node.wakuAutoSharding.isNone(): - error "Failed filter subscription, pubsub topic must be specified with static sharding" - waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) - else: - # No pubsub topic, autosharding is used to deduce it - # but content topics must be well-formed for this - let topicMapRes = - node.wakuAutoSharding.get().getShardsFromContentTopics(contentTopics) - - let topicMap = - if topicMapRes.isErr(): - error "can't get shard", error = topicMapRes.error - return err(FilterSubscribeError.badResponse("can't get shard")) - else: - topicMapRes.get() - - var futures = collect(newSeq): - for shard, topics in topicMap.pairs: - info "registering filter subscription to content", - shard = shard, contentTopics = topics, peer = remotePeer.peerId - let content = topics.mapIt($it) - node.wakuFilterClient.subscribe(remotePeer, $shard, content) - - var subRes: FilterSubscribeResult = FilterSubscribeResult.ok() - try: - let finished = await allFinished(futures) - - for fut in finished: - let res = fut.read() - - if res.isErr(): - error "failed filter subscription", error = res.error - waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) - subRes = FilterSubscribeResult.err(res.error) - - for pubsub, topics in topicMap.pairs: - info "subscribed to topic", pubsubTopic = pubsub, contentTopics = topics - - # Purpose is to update Waku Metadata - node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: $pubsub)) - except CatchableError: - let errMsg = "exception in filterSubscribe: " & getCurrentExceptionMsg() - error "exception in filterSubscribe", error = getCurrentExceptionMsg() - waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) - subRes = - FilterSubscribeResult.err(FilterSubscribeError.serviceUnavailable(errMsg)) - - # return the last error or ok - return subRes - -proc filterUnsubscribe*( - node: WakuNode, - pubsubTopic: Option[PubsubTopic], - contentTopics: ContentTopic | seq[ContentTopic], - peer: RemotePeerInfo | string, -): Future[FilterSubscribeResult] {.async: (raises: []).} = - ## Unsubscribe from a content filter V2". - - let remotePeerRes = parsePeerInfo(peer) - if remotePeerRes.isErr(): - error "couldn't parse remotePeerInfo", error = remotePeerRes.error - return err(FilterSubscribeError.serviceUnavailable("No peers available")) - - let remotePeer = remotePeerRes.value - - if pubsubTopic.isSome(): - info "deregistering filter subscription to content", - pubsubTopic = pubsubTopic.get(), - contentTopics = contentTopics, - peer = remotePeer.peerId - - let unsubRes = await node.wakuFilterClient.unsubscribe( - remotePeer, pubsubTopic.get(), contentTopics - ) - if unsubRes.isOk(): - info "unsubscribed from topic", - pubsubTopic = pubsubTopic.get(), contentTopics = contentTopics - - # Purpose is to update Waku Metadata - node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic.get())) - else: - error "failed filter unsubscription", error = unsubRes.error - waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) - - return unsubRes - elif node.wakuAutoSharding.isNone(): - error "Failed filter un-subscription, pubsub topic must be specified with static sharding" - waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) - else: # pubsubTopic.isNone - let topicMapRes = - node.wakuAutoSharding.get().getShardsFromContentTopics(contentTopics) - - let topicMap = - if topicMapRes.isErr(): - error "can't get shard", error = topicMapRes.error - return err(FilterSubscribeError.badResponse("can't get shard")) - else: - topicMapRes.get() - - var futures = collect(newSeq): - for shard, topics in topicMap.pairs: - info "deregistering filter subscription to content", - shard = shard, contentTopics = topics, peer = remotePeer.peerId - let content = topics.mapIt($it) - node.wakuFilterClient.unsubscribe(remotePeer, $shard, content) - - var unsubRes: FilterSubscribeResult = FilterSubscribeResult.ok() - try: - let finished = await allFinished(futures) - - for fut in finished: - let res = fut.read() - - if res.isErr(): - error "failed filter unsubscription", error = res.error - waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) - unsubRes = FilterSubscribeResult.err(res.error) - - for pubsub, topics in topicMap.pairs: - info "unsubscribed from topic", pubsubTopic = pubsub, contentTopics = topics - - # Purpose is to update Waku Metadata - node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: $pubsub)) - except CatchableError: - let errMsg = "exception in filterUnsubscribe: " & getCurrentExceptionMsg() - error "exception in filterUnsubscribe", error = getCurrentExceptionMsg() - waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) - unsubRes = - FilterSubscribeResult.err(FilterSubscribeError.serviceUnavailable(errMsg)) - - # return the last error or ok - return unsubRes - -proc filterUnsubscribeAll*( - node: WakuNode, peer: RemotePeerInfo | string -): Future[FilterSubscribeResult] {.async: (raises: []).} = - ## Unsubscribe from a content filter V2". - - let remotePeerRes = parsePeerInfo(peer) - if remotePeerRes.isErr(): - error "couldn't parse remotePeerInfo", error = remotePeerRes.error - return err(FilterSubscribeError.serviceUnavailable("No peers available")) - - let remotePeer = remotePeerRes.value - - info "deregistering all filter subscription to content", peer = remotePeer.peerId - - let unsubRes = await node.wakuFilterClient.unsubscribeAll(remotePeer) - if unsubRes.isOk(): - info "unsubscribed from all content-topic", peerId = remotePeer.peerId - else: - error "failed filter unsubscription from all content-topic", error = unsubRes.error - waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) - - return unsubRes - -# NOTICE: subscribe / unsubscribe methods are removed - they were already depricated -# yet incompatible to handle both type of filters - use specific filter registration instead - -## Waku archive -proc mountArchive*( - node: WakuNode, - driver: waku_archive.ArchiveDriver, - retentionPolicy = none(waku_archive.RetentionPolicy), -): Result[void, string] = - node.wakuArchive = waku_archive.WakuArchive.new( - driver = driver, retentionPolicy = retentionPolicy - ).valueOr: - return err("error in mountArchive: " & error) - - node.wakuArchive.start() - - return ok() - -proc mountLegacyArchive*( - node: WakuNode, driver: waku_archive_legacy.ArchiveDriver -): Result[void, string] = - node.wakuLegacyArchive = waku_archive_legacy.WakuArchive.new(driver = driver).valueOr: - return err("error in mountLegacyArchive: " & error) - - return ok() - -## Legacy Waku Store - -# TODO: Review this mapping logic. Maybe, move it to the appplication code -proc toArchiveQuery( - request: legacy_store_common.HistoryQuery -): waku_archive_legacy.ArchiveQuery = - waku_archive_legacy.ArchiveQuery( - pubsubTopic: request.pubsubTopic, - contentTopics: request.contentTopics, - cursor: request.cursor.map( - proc(cursor: HistoryCursor): waku_archive_legacy.ArchiveCursor = - waku_archive_legacy.ArchiveCursor( - pubsubTopic: cursor.pubsubTopic, - senderTime: cursor.senderTime, - storeTime: cursor.storeTime, - digest: cursor.digest, - ) - ), - startTime: request.startTime, - endTime: request.endTime, - pageSize: request.pageSize.uint, - direction: request.direction, - requestId: request.requestId, - ) - -# TODO: Review this mapping logic. Maybe, move it to the appplication code -proc toHistoryResult*( - res: waku_archive_legacy.ArchiveResult -): legacy_store_common.HistoryResult = - if res.isErr(): - let error = res.error - case res.error.kind - of waku_archive_legacy.ArchiveErrorKind.DRIVER_ERROR, - waku_archive_legacy.ArchiveErrorKind.INVALID_QUERY: - err(HistoryError(kind: HistoryErrorKind.BAD_REQUEST, cause: res.error.cause)) - else: - err(HistoryError(kind: HistoryErrorKind.UNKNOWN)) - else: - let response = res.get() - ok( - HistoryResponse( - messages: response.messages, - cursor: response.cursor.map( - proc(cursor: waku_archive_legacy.ArchiveCursor): HistoryCursor = - HistoryCursor( - pubsubTopic: cursor.pubsubTopic, - senderTime: cursor.senderTime, - storeTime: cursor.storeTime, - digest: cursor.digest, - ) - ), - ) - ) - -proc mountLegacyStore*( - node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit -) {.async.} = - info "mounting waku legacy store protocol" - - if node.wakuLegacyArchive.isNil(): - error "failed to mount waku legacy store protocol", error = "waku archive not set" - return - - # TODO: Review this handler logic. Maybe, move it to the appplication code - let queryHandler: HistoryQueryHandler = proc( - request: HistoryQuery - ): Future[legacy_store_common.HistoryResult] {.async.} = - if request.cursor.isSome(): - request.cursor.get().checkHistCursor().isOkOr: - return err(error) - - let request = request.toArchiveQuery() - let response = await node.wakuLegacyArchive.findMessagesV2(request) - return response.toHistoryResult() - - node.wakuLegacyStore = legacy_store.WakuStore.new( - node.peerManager, node.rng, queryHandler, some(rateLimit) - ) - - if node.started: - # Node has started already. Let's start store too. - await node.wakuLegacyStore.start() - - node.switch.mount( - node.wakuLegacyStore, protocolMatcher(legacy_store_common.WakuLegacyStoreCodec) - ) - -proc mountLegacyStoreClient*(node: WakuNode) = - info "mounting legacy store client" - - node.wakuLegacyStoreClient = - legacy_store_client.WakuStoreClient.new(node.peerManager, node.rng) - -proc query*( - node: WakuNode, query: legacy_store_common.HistoryQuery, peer: RemotePeerInfo -): Future[legacy_store_common.WakuStoreResult[legacy_store_common.HistoryResponse]] {. - async, gcsafe -.} = - ## Queries known nodes for historical messages - if node.wakuLegacyStoreClient.isNil(): - return err("waku legacy store client is nil") - - let queryRes = await node.wakuLegacyStoreClient.query(query, peer) - if queryRes.isErr(): - return err("legacy store client query error: " & $queryRes.error) - - let response = queryRes.get() - - return ok(response) - -# TODO: Move to application module (e.g., wakunode2.nim) -proc query*( - node: WakuNode, query: legacy_store_common.HistoryQuery -): Future[legacy_store_common.WakuStoreResult[legacy_store_common.HistoryResponse]] {. - async, gcsafe, deprecated: "Use 'node.query()' with peer destination instead" -.} = - ## Queries known nodes for historical messages - if node.wakuLegacyStoreClient.isNil(): - return err("waku legacy store client is nil") - - let peerOpt = node.peerManager.selectPeer(legacy_store_common.WakuLegacyStoreCodec) - if peerOpt.isNone(): - error "no suitable remote peers" - return err("peer_not_found_failure") - - return await node.query(query, peerOpt.get()) - -when defined(waku_exp_store_resume): - # TODO: Move to application module (e.g., wakunode2.nim) - proc resume*( - node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]) - ) {.async, gcsafe.} = - ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online - ## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages) - ## messages are stored in the wakuStore's messages field and in the message db - ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message - ## an offset of 20 second is added to the time window to count for nodes asynchrony - ## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed). - ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. - ## The history gets fetched successfully if the dialed peer has been online during the queried time window. - if node.wakuLegacyStoreClient.isNil(): - return - - let retrievedMessages = await node.wakuLegacyStoreClient.resume(peerList) - if retrievedMessages.isErr(): - error "failed to resume store", error = retrievedMessages.error - return - - info "the number of retrieved messages since the last online time: ", - number = retrievedMessages.value - -## Waku Store - -proc toArchiveQuery(request: StoreQueryRequest): waku_archive.ArchiveQuery = - var query = waku_archive.ArchiveQuery() - - query.includeData = request.includeData - query.pubsubTopic = request.pubsubTopic - query.contentTopics = request.contentTopics - query.startTime = request.startTime - query.endTime = request.endTime - query.hashes = request.messageHashes - query.cursor = request.paginationCursor - query.direction = request.paginationForward - query.requestId = request.requestId - - if request.paginationLimit.isSome(): - query.pageSize = uint(request.paginationLimit.get()) - - return query - -proc toStoreResult(res: waku_archive.ArchiveResult): StoreQueryResult = - let response = res.valueOr: - return err(StoreError.new(300, "archive error: " & $error)) - - var res = StoreQueryResponse() - - res.statusCode = 200 - res.statusDesc = "OK" - - for i in 0 ..< response.hashes.len: - let hash = response.hashes[i] - - let kv = store_common.WakuMessageKeyValue(messageHash: hash) - - res.messages.add(kv) - - for i in 0 ..< response.messages.len: - res.messages[i].message = some(response.messages[i]) - res.messages[i].pubsubTopic = some(response.topics[i]) - - res.paginationCursor = response.cursor - - return ok(res) - -proc mountStore*( - node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit -) {.async.} = - if node.wakuArchive.isNil(): - error "failed to mount waku store protocol", error = "waku archive not set" - return - - info "mounting waku store protocol" - - let requestHandler: StoreQueryRequestHandler = proc( - request: StoreQueryRequest - ): Future[StoreQueryResult] {.async.} = - let request = request.toArchiveQuery() - let response = await node.wakuArchive.findMessages(request) - - return response.toStoreResult() - - node.wakuStore = - store.WakuStore.new(node.peerManager, node.rng, requestHandler, some(rateLimit)) - - if node.started: - await node.wakuStore.start() - - node.switch.mount(node.wakuStore, protocolMatcher(store_common.WakuStoreCodec)) - -proc mountStoreClient*(node: WakuNode) = - info "mounting store client" - - node.wakuStoreClient = store_client.WakuStoreClient.new(node.peerManager, node.rng) - -proc query*( - node: WakuNode, request: store_common.StoreQueryRequest, peer: RemotePeerInfo -): Future[store_common.WakuStoreResult[store_common.StoreQueryResponse]] {. - async, gcsafe -.} = - ## Queries known nodes for historical messages - if node.wakuStoreClient.isNil(): - return err("waku store v3 client is nil") - - let response = (await node.wakuStoreClient.query(request, peer)).valueOr: - var res = StoreQueryResponse() - res.statusCode = uint32(error.kind) - res.statusDesc = $error - - return ok(res) - - return ok(response) - -proc setupStoreResume*(node: WakuNode) = - node.wakuStoreResume = StoreResume.new( - node.peerManager, node.wakuArchive, node.wakuStoreClient - ).valueOr: - error "Failed to setup Store Resume", error = $error - return - -## Waku lightpush -proc mountLegacyLightPush*( - node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit -) {.async.} = - info "mounting legacy light push" - - let pushHandler = - if node.wakuRelay.isNil: - debug "mounting legacy lightpush without relay (nil)" - legacy_lightpush_protocol.getNilPushHandler() - else: - debug "mounting legacy lightpush with relay" - let rlnPeer = - if isNil(node.wakuRlnRelay): - debug "mounting legacy lightpush without rln-relay" - none(WakuRLNRelay) - else: - debug "mounting legacy lightpush with rln-relay" - some(node.wakuRlnRelay) - legacy_lightpush_protocol.getRelayPushHandler(node.wakuRelay, rlnPeer) - - node.wakuLegacyLightPush = - WakuLegacyLightPush.new(node.peerManager, node.rng, pushHandler, some(rateLimit)) - - if node.started: - # Node has started already. Let's start lightpush too. - await node.wakuLegacyLightPush.start() - - node.switch.mount(node.wakuLegacyLightPush, protocolMatcher(WakuLegacyLightPushCodec)) - -proc mountLegacyLightPushClient*(node: WakuNode) = - info "mounting legacy light push client" - - if node.wakuLegacyLightpushClient.isNil(): - node.wakuLegacyLightpushClient = - WakuLegacyLightPushClient.new(node.peerManager, node.rng) - -proc legacyLightpushPublish*( - node: WakuNode, - pubsubTopic: Option[PubsubTopic], - message: WakuMessage, - peer: RemotePeerInfo, -): Future[legacy_lightpush_protocol.WakuLightPushResult[string]] {.async, gcsafe.} = - ## Pushes a `WakuMessage` to a node which relays it further on PubSub topic. - ## Returns whether relaying was successful or not. - ## `WakuMessage` should contain a `contentTopic` field for light node - ## functionality. - if node.wakuLegacyLightpushClient.isNil() and node.wakuLegacyLightPush.isNil(): - error "failed to publish message as legacy lightpush not available" - return err("Waku lightpush not available") - - let internalPublish = proc( - node: WakuNode, - pubsubTopic: PubsubTopic, - message: WakuMessage, - peer: RemotePeerInfo, - ): Future[legacy_lightpush_protocol.WakuLightPushResult[string]] {.async, gcsafe.} = - let msgHash = pubsubTopic.computeMessageHash(message).to0xHex() - if not node.wakuLegacyLightpushClient.isNil(): - notice "publishing message with legacy lightpush", - pubsubTopic = pubsubTopic, - contentTopic = message.contentTopic, - target_peer_id = peer.peerId, - msg_hash = msgHash - return await node.wakuLegacyLightpushClient.publish(pubsubTopic, message, peer) - - if not node.wakuLegacyLightPush.isNil(): - notice "publishing message with self hosted legacy lightpush", - pubsubTopic = pubsubTopic, - contentTopic = message.contentTopic, - target_peer_id = peer.peerId, - msg_hash = msgHash - return - await node.wakuLegacyLightPush.handleSelfLightPushRequest(pubsubTopic, message) - try: - if pubsubTopic.isSome(): - return await internalPublish(node, pubsubTopic.get(), message, peer) - - if node.wakuAutoSharding.isNone(): - return err("Pubsub topic must be specified when static sharding is enabled") - let topicMapRes = - node.wakuAutoSharding.get().getShardsFromContentTopics(message.contentTopic) - - let topicMap = - if topicMapRes.isErr(): - return err(topicMapRes.error) - else: - topicMapRes.get() - - for pubsub, _ in topicMap.pairs: # There's only one pair anyway - return await internalPublish(node, $pubsub, message, peer) - except CatchableError: - return err(getCurrentExceptionMsg()) - -# TODO: Move to application module (e.g., wakunode2.nim) -proc legacyLightpushPublish*( - node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage -): Future[legacy_lightpush_protocol.WakuLightPushResult[string]] {. - async, gcsafe, deprecated: "Use 'node.legacyLightpushPublish()' instead" -.} = - if node.wakuLegacyLightpushClient.isNil() and node.wakuLegacyLightPush.isNil(): - error "failed to publish message as legacy lightpush not available" - return err("waku legacy lightpush not available") - - var peerOpt: Option[RemotePeerInfo] = none(RemotePeerInfo) - if not node.wakuLegacyLightpushClient.isNil(): - peerOpt = node.peerManager.selectPeer(WakuLegacyLightPushCodec) - if peerOpt.isNone(): - let msg = "no suitable remote peers" - error "failed to publish message", err = msg - return err(msg) - elif not node.wakuLegacyLightPush.isNil(): - peerOpt = some(RemotePeerInfo.init($node.switch.peerInfo.peerId)) - - return await node.legacyLightpushPublish(pubsubTopic, message, peer = peerOpt.get()) - -proc mountLightPush*( - node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit -) {.async.} = - info "mounting light push" - - let pushHandler = - if node.wakuRelay.isNil(): - debug "mounting lightpush v2 without relay (nil)" - lightpush_protocol.getNilPushHandler() - else: - debug "mounting lightpush with relay" - let rlnPeer = - if isNil(node.wakuRlnRelay): - debug "mounting lightpush without rln-relay" - none(WakuRLNRelay) - else: - debug "mounting lightpush with rln-relay" - some(node.wakuRlnRelay) - lightpush_protocol.getRelayPushHandler(node.wakuRelay, rlnPeer) - - node.wakuLightPush = WakuLightPush.new( - node.peerManager, node.rng, pushHandler, node.wakuAutoSharding, some(rateLimit) - ) - - if node.started: - # Node has started already. Let's start lightpush too. - await node.wakuLightPush.start() - - node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec)) - -proc mountLightPushClient*(node: WakuNode) = - info "mounting light push client" - - if node.wakuLightpushClient.isNil(): - node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng) - -proc lightpushPublishHandler( - node: WakuNode, - pubsubTopic: PubsubTopic, - message: WakuMessage, - peer: RemotePeerInfo | PeerInfo, - mixify: bool = false, -): Future[lightpush_protocol.WakuLightPushResult] {.async.} = - let msgHash = pubsubTopic.computeMessageHash(message).to0xHex() - - if not node.wakuLightpushClient.isNil(): - notice "publishing message with lightpush", - pubsubTopic = pubsubTopic, - contentTopic = message.contentTopic, - target_peer_id = peer.peerId, - msg_hash = msgHash, - mixify = mixify - if mixify: #indicates we want to use mix to send the message - #TODO: How to handle multiple addresses? - let conn = node.wakuMix.toConnection( - MixDestination.init(peer.peerId, peer.addrs[0]), - WakuLightPushCodec, - Opt.some( - MixParameters(expectReply: Opt.some(true), numSurbs: Opt.some(byte(1))) - # indicating we only want a single path to be used for reply hence numSurbs = 1 - ), - ).valueOr: - error "could not create mix connection" - return lighpushErrorResult( - LightPushErrorCode.SERVICE_NOT_AVAILABLE, - "Waku lightpush with mix not available", - ) - - return await node.wakuLightpushClient.publishWithConn( - pubsubTopic, message, conn, peer.peerId - ) - else: - return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer) - - if not node.wakuLightPush.isNil(): - if mixify: - error "mixify is not supported with self hosted lightpush" - return lighpushErrorResult( - LightPushErrorCode.SERVICE_NOT_AVAILABLE, - "Waku lightpush with mix not available", - ) - notice "publishing message with self hosted lightpush", - pubsubTopic = pubsubTopic, - contentTopic = message.contentTopic, - target_peer_id = peer.peerId, - msg_hash = msgHash - return - await node.wakuLightPush.handleSelfLightPushRequest(some(pubsubTopic), message) - -proc lightpushPublish*( - node: WakuNode, - pubsubTopic: Option[PubsubTopic], - message: WakuMessage, - peerOpt: Option[RemotePeerInfo] = none(RemotePeerInfo), - mixify: bool = false, -): Future[lightpush_protocol.WakuLightPushResult] {.async.} = - if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil(): - error "failed to publish message as lightpush not available" - return lighpushErrorResult( - LightPushErrorCode.SERVICE_NOT_AVAILABLE, "Waku lightpush not available" - ) - if mixify and node.wakuMix.isNil(): - error "failed to publish message using mix as mix protocol is not mounted" - return lighpushErrorResult( - LightPushErrorCode.SERVICE_NOT_AVAILABLE, "Waku lightpush with mix not available" - ) - let toPeer: RemotePeerInfo = peerOpt.valueOr: - if not node.wakuLightPush.isNil(): - RemotePeerInfo.init(node.peerId()) - elif not node.wakuLightpushClient.isNil(): - node.peerManager.selectPeer(WakuLightPushCodec).valueOr: - let msg = "no suitable remote peers" - error "failed to publish message", msg = msg - return lighpushErrorResult(LightPushErrorCode.NO_PEERS_TO_RELAY, msg) - else: - return lighpushErrorResult( - LightPushErrorCode.NO_PEERS_TO_RELAY, "no suitable remote peers" - ) - - let pubsubForPublish = pubSubTopic.valueOr: - if node.wakuAutoSharding.isNone(): - let msg = "Pubsub topic must be specified when static sharding is enabled" - error "lightpush publish error", error = msg - return lighpushErrorResult(LightPushErrorCode.INVALID_MESSAGE, msg) - - let parsedTopic = NsContentTopic.parse(message.contentTopic).valueOr: - let msg = "Invalid content-topic:" & $error - error "lightpush request handling error", error = msg - return lighpushErrorResult(LightPushErrorCode.INVALID_MESSAGE, msg) - - node.wakuAutoSharding.get().getShard(parsedTopic).valueOr: - let msg = "Autosharding error: " & error - error "lightpush publish error", error = msg - return lighpushErrorResult(LightPushErrorCode.INTERNAL_SERVER_ERROR, msg) - - return await lightpushPublishHandler(node, pubsubForPublish, message, toPeer, mixify) - -## Waku RLN Relay -proc mountRlnRelay*( - node: WakuNode, - rlnConf: WakuRlnConfig, - spamHandler = none(SpamHandler), - registrationHandler = none(RegistrationHandler), -) {.async.} = - info "mounting rln relay" - - if node.wakuRelay.isNil(): - raise newException( - CatchableError, "WakuRelay protocol is not mounted, cannot mount WakuRlnRelay" - ) - - let rlnRelayRes = await WakuRlnRelay.new(rlnConf, registrationHandler) - if rlnRelayRes.isErr(): - raise - newException(CatchableError, "failed to mount WakuRlnRelay: " & rlnRelayRes.error) - let rlnRelay = rlnRelayRes.get() - if (rlnConf.userMessageLimit > rlnRelay.groupManager.rlnRelayMaxMessageLimit): - error "rln-relay-user-message-limit can't exceed the MAX_MESSAGE_LIMIT in the rln contract" - let validator = generateRlnValidator(rlnRelay, spamHandler) - - # register rln validator as default validator - debug "Registering RLN validator" - node.wakuRelay.addValidator(validator, "RLN validation failed") - - node.wakuRlnRelay = rlnRelay - -## Waku peer-exchange - -proc mountPeerExchange*( - node: WakuNode, - cluster: Option[uint16] = none(uint16), - rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit, -) {.async: (raises: []).} = - info "mounting waku peer exchange" - - node.wakuPeerExchange = - WakuPeerExchange.new(node.peerManager, cluster, some(rateLimit)) - - if node.started: - try: - await node.wakuPeerExchange.start() - except CatchableError: - error "failed to start wakuPeerExchange", error = getCurrentExceptionMsg() - - try: - node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec)) - except LPError: - error "failed to mount wakuPeerExchange", error = getCurrentExceptionMsg() - -proc mountPeerExchangeClient*(node: WakuNode) {.async: (raises: []).} = - info "mounting waku peer exchange client" - if node.wakuPeerExchangeClient.isNil(): - node.wakuPeerExchangeClient = WakuPeerExchangeClient.new(node.peerManager) - -proc fetchPeerExchangePeers*( - node: Wakunode, amount = DefaultPXNumPeersReq -): Future[Result[int, PeerExchangeResponseStatus]] {.async: (raises: []).} = - if node.wakuPeerExchangeClient.isNil(): - error "could not get peers from px, waku peer-exchange-client is nil" - return err( - ( - status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, - status_desc: some("PeerExchangeClient is not mounted"), - ) - ) - - info "Retrieving peer info via peer exchange protocol", amount - let pxPeersRes = await node.wakuPeerExchangeClient.request(amount) - if pxPeersRes.isOk(): - var validPeers = 0 - let peers = pxPeersRes.get().peerInfos - for pi in peers: - var record: enr.Record - if enr.fromBytes(record, pi.enr): - node.peerManager.addPeer(record.toRemotePeerInfo().get, PeerExchange) - validPeers += 1 - info "Retrieved peer info via peer exchange protocol", - validPeers = validPeers, totalPeers = peers.len - return ok(validPeers) - else: - warn "failed to retrieve peer info via peer exchange protocol", - error = pxPeersRes.error - return err(pxPeersRes.error) - -proc peerExchangeLoop(node: WakuNode) {.async.} = - while true: - if not node.started: - await sleepAsync(5.seconds) - continue - (await node.fetchPeerExchangePeers()).isOkOr: - warn "Cannot fetch peers from peer exchange", cause = error - await sleepAsync(1.minutes) - -proc startPeerExchangeLoop*(node: WakuNode) = - if node.wakuPeerExchangeClient.isNil(): - error "startPeerExchangeLoop: Peer Exchange is not mounted" - return - info "Starting peer exchange loop" - node.wakuPeerExchangeClient.pxLoopHandle = node.peerExchangeLoop() - -# TODO: Move to application module (e.g., wakunode2.nim) -proc setPeerExchangePeer*( - node: WakuNode, peer: RemotePeerInfo | MultiAddress | string -) = - if node.wakuPeerExchange.isNil(): - error "could not set peer, waku peer-exchange is nil" - return - - info "Set peer-exchange peer", peer = peer - - let remotePeerRes = parsePeerInfo(peer) - if remotePeerRes.isErr(): - error "could not parse peer info", error = remotePeerRes.error - return - - node.peerManager.addPeer(remotePeerRes.value, PeerExchange) - waku_px_peers.inc() - -## Other protocols - -proc mountLibp2pPing*(node: WakuNode) {.async: (raises: []).} = - info "mounting libp2p ping protocol" - - try: - node.libp2pPing = Ping.new(rng = node.rng) - except Exception as e: - error "failed to create ping", error = getCurrentExceptionMsg() - - if node.started: - # Node has started already. Let's start ping too. - try: - await node.libp2pPing.start() - except CatchableError: - error "failed to start libp2pPing", error = getCurrentExceptionMsg() - - try: - node.switch.mount(node.libp2pPing) - except LPError: - error "failed to mount libp2pPing", error = getCurrentExceptionMsg() - -proc pingPeer(node: WakuNode, peerId: PeerId): Future[Result[void, string]] {.async.} = - ## Ping a single peer and return the result - - try: - # Establish a stream - let stream = (await node.peerManager.dialPeer(peerId, PingCodec)).valueOr: - error "pingPeer: failed dialing peer", peerId = peerId - return err("pingPeer failed dialing peer peerId: " & $peerId) - defer: - # Always close the stream - try: - await stream.close() - except CatchableError as e: - debug "Error closing ping connection", peerId = peerId, error = e.msg - - # Perform ping - let pingDuration = await node.libp2pPing.ping(stream) - - trace "Ping successful", peerId = peerId, duration = pingDuration - return ok() - except CatchableError as e: - error "pingPeer: exception raised pinging peer", peerId = peerId, error = e.msg - return err("pingPeer: exception raised pinging peer: " & e.msg) - proc selectRandomPeers*(peers: seq[PeerId], numRandomPeers: int): seq[PeerId] = var randomPeers = peers shuffle(randomPeers) return randomPeers[0 ..< min(len(randomPeers), numRandomPeers)] -# Returns the number of succesful pings performed -proc parallelPings*(node: WakuNode, peerIds: seq[PeerId]): Future[int] {.async.} = - if len(peerIds) == 0: - return 0 - - var pingFuts: seq[Future[Result[void, string]]] - - # Create ping futures for each peer - for i, peerId in peerIds: - let fut = pingPeer(node, peerId) - pingFuts.add(fut) - - # Wait for all pings to complete - discard await allFutures(pingFuts).withTimeout(5.seconds) - - var successCount = 0 - for fut in pingFuts: - if not fut.completed() or fut.failed(): - continue - - let res = fut.read() - if res.isOk(): - successCount.inc() - - return successCount - proc mountRendezvous*(node: WakuNode, clusterId: uint16) {.async: (raises: []).} = info "mounting rendezvous discovery protocol" diff --git a/waku/utils/noise.nim b/waku/utils/noise.nim index 2e0159203..c225ad9bf 100644 --- a/waku/utils/noise.nim +++ b/waku/utils/noise.nim @@ -12,11 +12,9 @@ proc decodePayloadV2*( case message.version of 2: # We attempt to decode the WakuMessage payload - let deserializedPayload2 = deserializePayloadV2(message.payload) - if deserializedPayload2.isOk(): - return ok(deserializedPayload2.get()) - else: + let deserializedPayload2 = deserializePayloadV2(message.payload).valueOr: return err("Failed to decode WakuMessage") + return ok(deserializedPayload2) else: return err("Wrong message version while decoding payload") @@ -28,13 +26,11 @@ proc encodePayloadV2*( raises: [NoiseMalformedHandshake, NoisePublicKeyError] .} = # We attempt to encode the PayloadV2 - let serializedPayload2 = serializePayloadV2(payload2) - if not serializedPayload2.isOk(): + let serializedPayload2 = serializePayloadV2(payload2).valueOr: return err("Failed to encode PayloadV2") # If successful, we create and return a WakuMessage - let msg = WakuMessage( - payload: serializedPayload2.get(), version: 2, contentTopic: contentTopic - ) + let msg = + WakuMessage(payload: serializedPayload2, version: 2, contentTopic: contentTopic) return ok(msg) diff --git a/waku/waku_api/rest/legacy_store/handlers.nim b/waku/waku_api/rest/legacy_store/handlers.nim index d960f24ea..0113111bd 100644 --- a/waku/waku_api/rest/legacy_store/handlers.nim +++ b/waku/waku_api/rest/legacy_store/handlers.nim @@ -70,30 +70,27 @@ proc parseCursor( digest: Option[string], ): Result[Option[HistoryCursor], string] = # Parse sender time - let parsedSenderTime = parseTime(senderTime) - if not parsedSenderTime.isOk(): - return err(parsedSenderTime.error) + let parsedSenderTime = parseTime(senderTime).valueOr: + return err(error) # Parse store time - let parsedStoreTime = parseTime(storeTime) - if not parsedStoreTime.isOk(): - return err(parsedStoreTime.error) + let parsedStoreTime = parseTime(storeTime).valueOr: + return err(error) # Parse message digest - let parsedMsgDigest = parseMsgDigest(digest) - if not parsedMsgDigest.isOk(): - return err(parsedMsgDigest.error) + let parsedMsgDigest = parseMsgDigest(digest).valueOr: + return err(error) # Parse cursor information - if parsedPubsubTopic.isSome() and parsedSenderTime.value.isSome() and - parsedStoreTime.value.isSome() and parsedMsgDigest.value.isSome(): + if parsedPubsubTopic.isSome() and parsedSenderTime.isSome() and + parsedStoreTime.isSome() and parsedMsgDigest.isSome(): return ok( some( HistoryCursor( pubsubTopic: parsedPubsubTopic.get(), - senderTime: parsedSenderTime.value.get(), - storeTime: parsedStoreTime.value.get(), - digest: parsedMsgDigest.value.get(), + senderTime: parsedSenderTime.get(), + storeTime: parsedStoreTime.get(), + digest: parsedMsgDigest.get(), ) ) ) @@ -225,16 +222,14 @@ proc installStoreApiHandlers*( endTime.toOpt(), pageSize.toOpt(), ascending.toOpt(), - ) - - if not histQuery.isOk(): - return RestApiResponse.badRequest(histQuery.error) + ).valueOr: + return RestApiResponse.badRequest(error) if peerAddr.isNone() and not node.wakuLegacyStore.isNil(): ## The user didn't specify a peer address and self-node is configured as a store node. ## In this case we assume that the user is willing to retrieve the messages stored by ## the local/self store node. - return await node.retrieveMsgsFromSelfNode(histQuery.get()) + return await node.retrieveMsgsFromSelfNode(histQuery) # Parse the peer address parameter let parsedPeerAddr = parseUrlPeerAddr(peerAddr.toOpt()).valueOr: @@ -253,4 +248,4 @@ proc installStoreApiHandlers*( "No suitable service peer & none discovered" ) - return await node.performHistoryQuery(histQuery.value, peerAddr) + return await node.performHistoryQuery(histQuery, peerAddr) diff --git a/waku/waku_api/rest/legacy_store/types.nim b/waku/waku_api/rest/legacy_store/types.nim index eee3ac2d8..53a96bd69 100644 --- a/waku/waku_api/rest/legacy_store/types.nim +++ b/waku/waku_api/rest/legacy_store/types.nim @@ -60,13 +60,11 @@ proc parseMsgDigest*( return ok(none(waku_store_common.MessageDigest)) let decodedUrl = decodeUrl(input.get()) - let base64Decoded = base64.decode(Base64String(decodedUrl)) + let base64DecodedArr = base64.decode(Base64String(decodedUrl)).valueOr: + return err(error) + var messageDigest = waku_store_common.MessageDigest() - if not base64Decoded.isOk(): - return err(base64Decoded.error) - - let base64DecodedArr = base64Decoded.get() # Next snippet inspired by "nwaku/waku/waku_archive/archive.nim" # TODO: Improve coherence of MessageDigest type messageDigest = block: @@ -220,10 +218,9 @@ proc readValue*( of "data": if data.isSome(): reader.raiseUnexpectedField("Multiple `data` fields found", "MessageDigest") - let decoded = base64.decode(reader.readValue(Base64String)) - if not decoded.isOk(): + let decoded = base64.decode(reader.readValue(Base64String)).valueOr: reader.raiseUnexpectedField("Failed decoding data", "MessageDigest") - data = some(decoded.get()) + data = some(decoded) else: reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName)) diff --git a/waku/waku_api/rest/server.nim b/waku/waku_api/rest/server.nim index f16dfe83f..e5db5ee5e 100644 --- a/waku/waku_api/rest/server.nim +++ b/waku/waku_api/rest/server.nim @@ -91,7 +91,7 @@ proc new*( ): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} = discard - let sres = HttpServerRef.new( + server.httpServer = HttpServerRef.new( address, defaultProcessCallback, serverFlags, @@ -106,12 +106,9 @@ proc new*( maxRequestBodySize, dualstack = dualstack, middlewares = middlewares, - ) - if sres.isOk(): - server.httpServer = sres.get() - ok(server) - else: - err(sres.error) + ).valueOr: + return err(error) + return ok(server) proc getRouter(): RestRouter = # TODO: Review this `validate` method. Check in nim-presto what is this used for. diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 1518f7a3b..3d5260641 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -51,7 +51,7 @@ const SelectNoCursorAscStmtDef = const SelectNoCursorNoDataAscStmtName = "SelectWithoutCursorAndDataAsc" const SelectNoCursorNoDataAscStmtDef = - """SELECT messageHash FROM messages + """SELECT messageHash FROM messages WHERE contentTopic IN ($1) AND messageHash IN ($2) AND pubsubTopic = $3 AND @@ -196,7 +196,7 @@ proc hashCallbackImpl( pqResult: ptr PGresult, rows: var seq[(WakuMessageHash, PubsubTopic, WakuMessage)] ) = ## Callback to get a hash out of the DB. - ## Used when queries only ask for hashes + ## Used when queries only ask for hashes let numFields = pqResult.pqnfields() if numFields != 1: @@ -1233,33 +1233,30 @@ proc refreshPartitionsInfo( debug "refreshPartitionsInfo" self.partitionMngr.clearPartitionInfo() - let partitionNamesRes = await self.getPartitionsList() - if not partitionNamesRes.isOk(): - return err("Could not retrieve partitions list: " & $partitionNamesRes.error) - else: - let partitionNames = partitionNamesRes.get() - for partitionName in partitionNames: - ## partitionName contains something like 'messages_1708449815_1708449875' - let bothTimes = partitionName.replace("messages_", "") - let times = bothTimes.split("_") - if times.len != 2: - return err(fmt"loopPartitionFactory wrong partition name {partitionName}") + let partitionNames = (await self.getPartitionsList()).valueOr: + return err("could not get partitions list: " & $error) + for partitionName in partitionNames: + ## partitionName contains something like 'messages_1708449815_1708449875' + let bothTimes = partitionName.replace("messages_", "") + let times = bothTimes.split("_") + if times.len != 2: + return err(fmt"loopPartitionFactory wrong partition name {partitionName}") - var beginning: int64 - try: - beginning = parseInt(times[0]) - except ValueError: - return err("Could not parse beginning time: " & getCurrentExceptionMsg()) + var beginning: int64 + try: + beginning = parseInt(times[0]) + except ValueError: + return err("Could not parse beginning time: " & getCurrentExceptionMsg()) - var `end`: int64 - try: - `end` = parseInt(times[1]) - except ValueError: - return err("Could not parse end time: " & getCurrentExceptionMsg()) + var `end`: int64 + try: + `end` = parseInt(times[1]) + except ValueError: + return err("Could not parse end time: " & getCurrentExceptionMsg()) - self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`) + self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`) - return ok() + return ok() const DefaultDatabasePartitionCheckTimeInterval = timer.minutes(10) @@ -1338,10 +1335,7 @@ proc removePartition( let partitionName = partition.getName() debug "beginning of removePartition", partitionName - var partSize = "" - let partSizeRes = await self.getTableSize(partitionName) - if partSizeRes.isOk(): - partSize = partSizeRes.get() + let partSize = (await self.getTableSize(partitionName)).valueOr("") ## Detach and remove the partition concurrently to not block the parent table (messages) let detachPartitionQuery = diff --git a/waku/waku_archive/driver/postgres_driver/postgres_healthcheck.nim b/waku/waku_archive/driver/postgres_driver/postgres_healthcheck.nim index 43d205cf6..2138a54f7 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_healthcheck.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_healthcheck.nim @@ -24,8 +24,7 @@ proc checkConnectivity*( var numTrial = 0 while numTrial < MaxNumTrials: - let res = await connPool.pgQuery(HealthCheckQuery) - if res.isOk(): + (await connPool.pgQuery(HealthCheckQuery)).isErrOr: ## Connection resumed. Let's go back to the normal healthcheck. break errorBlock diff --git a/waku/waku_archive_legacy/driver/postgres_driver/postgres_healthcheck.nim b/waku/waku_archive_legacy/driver/postgres_driver/postgres_healthcheck.nim index 4c9f170c9..23678538e 100644 --- a/waku/waku_archive_legacy/driver/postgres_driver/postgres_healthcheck.nim +++ b/waku/waku_archive_legacy/driver/postgres_driver/postgres_healthcheck.nim @@ -24,8 +24,7 @@ proc checkConnectivity*( var numTrial = 0 while numTrial < MaxNumTrials: - let res = await connPool.pgQuery(HealthCheckQuery) - if res.isOk(): + (await connPool.pgQuery(HealthCheckQuery)).isErrOr: ## Connection resumed. Let's go back to the normal healthcheck. break errorBlock diff --git a/waku/waku_metadata/protocol.nim b/waku/waku_metadata/protocol.nim index 01aaf027c..e7f92e103 100644 --- a/waku/waku_metadata/protocol.nim +++ b/waku/waku_metadata/protocol.nim @@ -1,7 +1,7 @@ {.push raises: [].} import - std/[options, sequtils, sets], + std/[options, sequtils], results, chronicles, chronos, diff --git a/waku/waku_mix/protocol.nim b/waku/waku_mix/protocol.nim index fbb9beee2..ba0e8fc92 100644 --- a/waku/waku_mix/protocol.nim +++ b/waku/waku_mix/protocol.nim @@ -7,7 +7,6 @@ import mix/mix_protocol, mix/mix_node, mix/mix_metrics, - mix/tag_manager, libp2p/[multiaddress, multicodec, peerid], eth/common/keys diff --git a/waku/waku_node.nim b/waku/waku_node.nim index 74415e9de..c81e49bb6 100644 --- a/waku/waku_node.nim +++ b/waku/waku_node.nim @@ -2,6 +2,7 @@ import ./node/net_config, ./node/waku_switch as switch, ./node/waku_node as node, - ./node/health_monitor as health_monitor + ./node/health_monitor as health_monitor, + ./node/api as api -export net_config, switch, node, health_monitor +export net_config, switch, node, health_monitor, api diff --git a/waku/waku_rln_relay/group_manager/group_manager_base.nim b/waku/waku_rln_relay/group_manager/group_manager_base.nim index 9ddcdee54..de2962e42 100644 --- a/waku/waku_rln_relay/group_manager/group_manager_base.nim +++ b/waku/waku_rln_relay/group_manager/group_manager_base.nim @@ -1,10 +1,6 @@ -import - ../../common/error_handling, - ../protocol_types, - ../protocol_metrics, - ../constants, - ../rln -import options, chronos, results, std/[deques, sequtils] +import ../../common/error_handling, ../protocol_types, ../protocol_metrics, ../constants + +import options, chronos, results, std/[deques] export options, chronos, results, protocol_types, protocol_metrics, deques diff --git a/waku/waku_rln_relay/group_manager/on_chain/rpc_wrapper.nim b/waku/waku_rln_relay/group_manager/on_chain/rpc_wrapper.nim index 867e9e7f0..2c47b11fa 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/rpc_wrapper.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/rpc_wrapper.nim @@ -1,5 +1,4 @@ import - os, web3, web3/eth_api_types, web3/primitives, @@ -8,14 +7,12 @@ import nimcrypto/keccak as keccak, stint, json, - std/[strutils, tables, algorithm], - stew/[byteutils, arrayops], + std/strutils, + stew/byteutils, sequtils import ../../../waku_keystore, - ../../rln, - ../../rln/rln_interface, ../../conversion_utils, ../../protocol_types, ../group_manager_base diff --git a/waku/waku_rln_relay/protocol_metrics.nim b/waku/waku_rln_relay/protocol_metrics.nim index 6a21146e1..1551f022e 100644 --- a/waku/waku_rln_relay/protocol_metrics.nim +++ b/waku/waku_rln_relay/protocol_metrics.nim @@ -1,6 +1,6 @@ {.push raises: [].} -import chronicles, metrics, metrics/chronos_httpserver, ./constants, ../utils/collector +import chronicles, metrics, metrics/chronos_httpserver, ../utils/collector export metrics diff --git a/waku/waku_rln_relay/rln/wrappers.nim b/waku/waku_rln_relay/rln/wrappers.nim index a5f870122..eebdc5851 100644 --- a/waku/waku_rln_relay/rln/wrappers.nim +++ b/waku/waku_rln_relay/rln/wrappers.nim @@ -6,7 +6,7 @@ import stew/[arrayops, byteutils, endians2], stint, results, - std/[sequtils, strutils, tables, tempfiles] + std/[sequtils, strutils, tables] import ./rln_interface, ../conversion_utils, ../protocol_types, ../protocol_metrics import ../../waku_core, ../../waku_keystore