diff --git a/apps/liteprotocoltester/tester_message.nim b/apps/liteprotocoltester/tester_message.nim index e91a2a370..eeff7b531 100644 --- a/apps/liteprotocoltester/tester_message.nim +++ b/apps/liteprotocoltester/tester_message.nim @@ -87,7 +87,7 @@ proc readValue*( ) size = some(reader.readValue(uint64)) else: - unrecognizedFieldWarning() + unrecognizedFieldWarning(value) if sender.isNone(): reader.raiseUnexpectedValue("Field `sender` is missing") diff --git a/library/events/json_message_event.nim b/library/events/json_message_event.nim index 04dc0b8b4..ffb6065f5 100644 --- a/library/events/json_message_event.nim +++ b/library/events/json_message_event.nim @@ -1,9 +1,10 @@ -import system, results, std/json +import system, results, std/json, std/strutils import stew/byteutils import ../../waku/common/base64, ../../waku/waku_core/message, ../../waku/waku_core/message/message, + ../utils, ./json_base_event type JsonMessage* = ref object # https://rfc.vac.dev/spec/36/#jsonmessage-type @@ -15,16 +16,20 @@ type JsonMessage* = ref object # https://rfc.vac.dev/spec/36/#jsonmessage-type meta*: Base64String proof*: Base64String -func fromJsonNode*(T: type JsonMessage, jsonContent: JsonNode): JsonMessage = +func fromJsonNode*( + T: type JsonMessage, jsonContent: JsonNode +): Result[JsonMessage, string] = # Visit https://rfc.vac.dev/spec/14/ for further details - JsonMessage( - payload: Base64String(jsonContent["payload"].getStr()), - contentTopic: jsonContent["contentTopic"].getStr(), - version: uint32(jsonContent{"version"}.getInt()), - timestamp: int64(jsonContent{"timestamp"}.getBiggestInt()), - ephemeral: jsonContent{"ephemeral"}.getBool(), - meta: Base64String(jsonContent{"meta"}.getStr()), - proof: Base64String(jsonContent{"proof"}.getStr()), + ok( + JsonMessage( + payload: Base64String(jsonContent["payload"].getStr()), + contentTopic: jsonContent["contentTopic"].getStr(), + version: uint32(jsonContent{"version"}.getInt()), + timestamp: (?jsonContent.getProtoInt64("timestamp")).get(0), + ephemeral: jsonContent{"ephemeral"}.getBool(), + meta: Base64String(jsonContent{"meta"}.getStr()), + proof: Base64String(jsonContent{"proof"}.getStr()), + ) ) proc toWakuMessage*(self: JsonMessage): Result[WakuMessage, string] = diff --git a/library/libwaku.nim b/library/libwaku.nim index f51ab4aae..13022f879 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -27,7 +27,8 @@ import ./waku_thread/inter_thread_communication/requests/ping_request, ./waku_thread/inter_thread_communication/waku_thread_request, ./alloc, - ./ffi_types + ./ffi_types, + ../waku/factory/app_callbacks ################################################################################ ### Wrapper around the waku node @@ -138,10 +139,14 @@ proc waku_new( ctx.userData = userData + let appCallbacks = AppCallbacks(relayHandler: onReceivedMessage(ctx)) + let retCode = handleRequest( ctx, RequestType.LIFECYCLE, - NodeLifecycleRequest.createShared(NodeLifecycleMsgType.CREATE_NODE, configJson), + NodeLifecycleRequest.createShared( + NodeLifecycleMsgType.CREATE_NODE, configJson, appCallbacks + ), callback, userData, ) @@ -267,7 +272,8 @@ proc waku_relay_publish( var jsonMessage: JsonMessage try: let jsonContent = parseJson($jwm) - jsonMessage = JsonMessage.fromJsonNode(jsonContent) + jsonMessage = JsonMessage.fromJsonNode(jsonContent).valueOr: + raise newException(JsonParsingError, $error) except JsonParsingError: deallocShared(jwm) let msg = fmt"Error parsing json message: {getCurrentExceptionMsg()}" @@ -371,7 +377,7 @@ proc waku_relay_unsubscribe( ctx, RequestType.RELAY, RelayRequest.createShared( - RelayMsgType.SUBSCRIBE, + RelayMsgType.UNSUBSCRIBE, PubsubTopic($pst), WakuRelayHandler(onReceivedMessage(ctx)), ), @@ -495,7 +501,8 @@ proc waku_lightpush_publish( var jsonMessage: JsonMessage try: let jsonContent = parseJson($jwm) - jsonMessage = JsonMessage.fromJsonNode(jsonContent) + jsonMessage = JsonMessage.fromJsonNode(jsonContent).valueOr: + raise newException(JsonParsingError, $error) except JsonParsingError: let msg = fmt"Error parsing json message: {getCurrentExceptionMsg()}" callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) diff --git a/library/utils.nim b/library/utils.nim new file mode 100644 index 000000000..926ec4e88 --- /dev/null +++ b/library/utils.nim @@ -0,0 +1,20 @@ +import std/[json, options, strutils] +import results + +proc getProtoInt64*(node: JsonNode, key: string): Result[Option[int64], string] = + try: + let (value, ok) = + if node.hasKey(key): + if node[key].kind == JString: + (parseBiggestInt(node[key].getStr()), true) + else: + (node[key].getBiggestInt(), true) + else: + (0, false) + + if ok: + return ok(some(value)) + + return ok(none(int64)) + except CatchableError: + return err("Invalid int64 value in `" & key & "`") diff --git a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim index 2b2edf038..087a78d3e 100644 --- a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim @@ -7,6 +7,7 @@ import ../../../../waku/factory/waku, ../../../../waku/factory/node_factory, ../../../../waku/factory/networks_config, + ../../../../waku/factory/app_callbacks, ../../../alloc type NodeLifecycleMsgType* = enum @@ -17,12 +18,17 @@ type NodeLifecycleMsgType* = enum type NodeLifecycleRequest* = object operation: NodeLifecycleMsgType configJson: cstring ## Only used in 'CREATE_NODE' operation + appCallbacks: AppCallbacks proc createShared*( - T: type NodeLifecycleRequest, op: NodeLifecycleMsgType, configJson: cstring = "" + T: type NodeLifecycleRequest, + op: NodeLifecycleMsgType, + configJson: cstring = "", + appCallbacks: AppCallbacks = nil, ): ptr type T = var ret = createShared(T) ret[].operation = op + ret[].appCallbacks = appCallbacks ret[].configJson = configJson.alloc() return ret @@ -30,7 +36,9 @@ proc destroyShared(self: ptr NodeLifecycleRequest) = deallocShared(self[].configJson) deallocShared(self) -proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} = +proc createWaku( + configJson: cstring, appCallbacks: AppCallbacks = nil +): Future[Result[Waku, string]] {.async.} = var conf = defaultWakuNodeConf().valueOr: return err("Failed creating node: " & error) @@ -59,7 +67,7 @@ proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} = formattedString & ". expected type: " & $typeof(confValue) ) - let wakuRes = Waku.new(conf).valueOr: + let wakuRes = Waku.new(conf, appCallbacks).valueOr: error "waku initialization failed", error = error return err("Failed setting up Waku: " & $error) @@ -73,7 +81,7 @@ proc process*( case self.operation of CREATE_NODE: - waku[] = (await createWaku(self.configJson)).valueOr: + waku[] = (await createWaku(self.configJson, self.appCallbacks)).valueOr: error "CREATE_NODE failed", error = error return err("error processing createWaku request: " & $error) of START_NODE: diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim index ee2b608c3..3e2523fec 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim @@ -3,6 +3,7 @@ import chronos, chronicles, results import ../../../../../waku/factory/waku, ../../../../alloc, + ../../../../utils, ../../../../../waku/waku_core/peers, ../../../../../waku/waku_core/time, ../../../../../waku/waku_core/message/digest, @@ -24,7 +25,7 @@ type StoreRequest* = object func fromJsonNode( T: type JsonStoreQueryRequest, jsonContent: JsonNode -): StoreQueryRequest = +): Result[StoreQueryRequest, string] = let contentTopics = collect(newSeq): for cTopic in jsonContent["content_topics"].getElems(): cTopic.getStr() @@ -45,18 +46,6 @@ func fromJsonNode( else: none(string) - let startTime = - if jsonContent.contains("time_start"): - some(Timestamp(jsonContent["time_start"].getInt())) - else: - none(Timestamp) - - let endTime = - if jsonContent.contains("time_end"): - some(Timestamp(jsonContent["time_end"].getInt())) - else: - none(Timestamp) - let paginationCursor = if jsonContent.contains("pagination_cursor"): var hash: WakuMessageHash @@ -79,17 +68,19 @@ func fromJsonNode( else: none(uint64) - return StoreQueryRequest( - requestId: jsonContent["request_id"].getStr(), - includeData: jsonContent["include_data"].getBool(), - pubsubTopic: pubsubTopic, - contentTopics: contentTopics, - startTime: startTime, - endTime: endTime, - messageHashes: msgHashes, - paginationCursor: paginationCursor, - paginationForward: paginationForward, - paginationLimit: paginationLimit, + return ok( + StoreQueryRequest( + requestId: jsonContent["request_id"].getStr(), + includeData: jsonContent["include_data"].getBool(), + pubsubTopic: pubsubTopic, + contentTopics: contentTopics, + startTime: ?jsonContent.getProtoInt64("time_start"), + endTime: ?jsonContent.getProtoInt64("time_end"), + messageHashes: msgHashes, + paginationCursor: paginationCursor, + paginationForward: paginationForward, + paginationLimit: paginationLimit, + ) ) proc createShared*( @@ -128,7 +119,7 @@ proc process( let peer = peers.parsePeerInfo(($self[].peerAddr).split(",")).valueOr: return err("JsonStoreQueryRequest failed to parse peer addr: " & $error) - let queryResponse = (await waku.node.wakuStoreClient.query(storeQueryRequest, peer)).valueOr: + let queryResponse = (await waku.node.wakuStoreClient.query(?storeQueryRequest, peer)).valueOr: return err("JsonStoreQueryRequest failed store query: " & $error) return ok($(%*queryResponse)) ## returning the response in json format diff --git a/tests/factory/test_node_factory.nim b/tests/factory/test_node_factory.nim index bc3dc0f80..c575c2b81 100644 --- a/tests/factory/test_node_factory.nim +++ b/tests/factory/test_node_factory.nim @@ -17,7 +17,7 @@ suite "Node Factory": node.wakuStore.isNil() node.wakuFilter.isNil() not node.wakuStoreClient.isNil() - not node.rendezvous.isNil() + not node.wakuRendezvous.isNil() test "Set up a node with Store enabled": var conf = defaultTestWakuNodeConf() diff --git a/tests/test_waku_rendezvous.nim b/tests/test_waku_rendezvous.nim index c005a8f34..65967b79a 100644 --- a/tests/test_waku_rendezvous.nim +++ b/tests/test_waku_rendezvous.nim @@ -1,51 +1,63 @@ {.used.} -import chronos, testutils/unittests, libp2p/builders, libp2p/protocols/rendezvous +import std/options, chronos, testutils/unittests, libp2p/builders -import waku/node/waku_switch, ./testlib/common, ./testlib/wakucore - -proc newRendezvousClientSwitch(rdv: RendezVous): Switch = - SwitchBuilder - .new() - .withRng(rng()) - .withAddresses(@[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]) - .withTcpTransport() - .withMplex() - .withNoise() - .withRendezVous(rdv) - .build() +import + waku/waku_core/peers, + waku/node/waku_node, + waku/node/peer_manager/peer_manager, + waku/waku_rendezvous/protocol, + ./testlib/[wakucore, wakunode] procSuite "Waku Rendezvous": - asyncTest "Waku Switch uses Rendezvous": - ## Setup - + asyncTest "Simple remote test": let - wakuClient = RendezVous.new() - sourceClient = RendezVous.new() - destClient = RendezVous.new() - wakuSwitch = newRendezvousClientSwitch(wakuClient) #rendezvous point - sourceSwitch = newRendezvousClientSwitch(sourceClient) #client - destSwitch = newRendezvousClientSwitch(destClient) #client + clusterId = 10.uint16 + node1 = newTestWakuNode( + generateSecp256k1Key(), + parseIpAddress("0.0.0.0"), + Port(0), + clusterId = clusterId, + ) + node2 = newTestWakuNode( + generateSecp256k1Key(), + parseIpAddress("0.0.0.0"), + Port(0), + clusterId = clusterId, + ) + node3 = newTestWakuNode( + generateSecp256k1Key(), + parseIpAddress("0.0.0.0"), + Port(0), + clusterId = clusterId, + ) - # Setup client rendezvous - wakuClient.setup(wakuSwitch) - sourceClient.setup(sourceSwitch) - destClient.setup(destSwitch) + await allFutures( + [node1.mountRendezvous(), node2.mountRendezvous(), node3.mountRendezvous()] + ) + await allFutures([node1.start(), node2.start(), node3.start()]) - await allFutures(wakuSwitch.start(), sourceSwitch.start(), destSwitch.start()) + let peerInfo1 = node1.switch.peerInfo.toRemotePeerInfo() + let peerInfo2 = node2.switch.peerInfo.toRemotePeerInfo() + let peerInfo3 = node3.switch.peerInfo.toRemotePeerInfo() - # Connect clients to the rendezvous point - await sourceSwitch.connect(wakuSwitch.peerInfo.peerId, wakuSwitch.peerInfo.addrs) - await destSwitch.connect(wakuSwitch.peerInfo.peerId, wakuSwitch.peerInfo.addrs) + node1.peerManager.addPeer(peerInfo2) + node2.peerManager.addPeer(peerInfo1) + node2.peerManager.addPeer(peerInfo3) + node3.peerManager.addPeer(peerInfo2) - let res0 = await sourceClient.request("empty") - check res0.len == 0 + let namespace = "test/name/space" + + let res = await node1.wakuRendezvous.batchAdvertise( + namespace, 60.seconds, @[peerInfo2.peerId] + ) + assert res.isOk(), $res.error + + let response = + await node3.wakuRendezvous.batchRequest(namespace, 1, @[peerInfo2.peerId]) + assert response.isOk(), $response.error + let records = response.get() - # Check that source client gets peer info of dest client from rendezvous point - await sourceClient.advertise("foo") - let res1 = await destClient.request("foo") check: - res1.len == 1 - res1[0] == sourceSwitch.peerInfo.signedPeerRecord.data - - await allFutures(wakuSwitch.stop(), sourceSwitch.stop(), destSwitch.stop()) + records.len == 1 + records[0].peerId == peerInfo1.peerId diff --git a/tests/testlib/wakunode.nim b/tests/testlib/wakunode.nim index 01e58697f..8cacf5317 100644 --- a/tests/testlib/wakunode.nim +++ b/tests/testlib/wakunode.nim @@ -40,6 +40,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf = clusterId: DefaultClusterId, shards: @[DefaultShardId], relay: true, + rendezvous: true, storeMessageDbUrl: "sqlite://store.sqlite3", ) diff --git a/tests/wakunode2/test_app.nim b/tests/wakunode2/test_app.nim index 67a6556c8..73ffc8f93 100644 --- a/tests/wakunode2/test_app.nim +++ b/tests/wakunode2/test_app.nim @@ -62,7 +62,7 @@ suite "Wakunode2 - Waku initialization": node.wakuArchive.isNil() node.wakuStore.isNil() not node.wakuStoreClient.isNil() - not node.rendezvous.isNil() + not node.wakuRendezvous.isNil() ## Cleanup waitFor waku.stop() @@ -92,7 +92,7 @@ suite "Wakunode2 - Waku initialization": node.wakuArchive.isNil() node.wakuStore.isNil() not node.wakuStoreClient.isNil() - not node.rendezvous.isNil() + not node.wakuRendezvous.isNil() # DS structures are updated with dynamic ports typedNodeEnr.get().tcp.get() != 0 diff --git a/vendor/nimbus-build-system b/vendor/nimbus-build-system index 741274439..8fafcd0ba 160000 --- a/vendor/nimbus-build-system +++ b/vendor/nimbus-build-system @@ -1 +1 @@ -Subproject commit 741274439ce72162ab3c740e7c0ef624d32725f9 +Subproject commit 8fafcd0bac9f409091b7bcaee62ab6330f57441e diff --git a/waku/factory/app_callbacks.nim b/waku/factory/app_callbacks.nim new file mode 100644 index 000000000..ffab59c24 --- /dev/null +++ b/waku/factory/app_callbacks.nim @@ -0,0 +1,4 @@ +import ../waku_relay/protocol + +type AppCallbacks* = ref object + relayHandler*: WakuRelayHandler diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 4b08086ba..fc45389f5 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -647,6 +647,13 @@ with the drawback of consuming some more bandwidth.""", name: "peer-exchange-node" .}: string + ## Rendez vous + rendezvous* {. + desc: "Enable waku rendezvous discovery server", + defaultValue: true, + name: "rendezvous" + .}: bool + ## websocket config websocketSupport* {. desc: "Enable websocket: true|false", diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index d98a99546..80734d0b8 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -124,6 +124,16 @@ proc getNumShardsInNetwork*(conf: WakuNodeConf): uint32 = # https://github.com/waku-org/specs/blob/master/standards/core/relay-sharding.md#static-sharding return uint32(MaxShardIndex + 1) +proc getAutoshards*( + node: WakuNode, contentTopics: seq[string] +): Result[seq[RelayShard], string] = + var autoShards: seq[RelayShard] + for contentTopic in contentTopics: + let shard = node.wakuSharding.getShard(contentTopic).valueOr: + return err("Could not parse content topic: " & error) + autoShards.add(shard) + return ok(autoshards) + proc setupProtocols( node: WakuNode, conf: WakuNodeConf, nodeKey: crypto.PrivateKey ): Future[Result[void, string]] {.async.} = @@ -169,11 +179,8 @@ proc setupProtocols( peerExchangeHandler = some(handlePeerExchange) - var autoShards: seq[RelayShard] - for contentTopic in conf.contentTopics: - let shard = node.wakuSharding.getShard(contentTopic).valueOr: - return err("Could not parse content topic: " & error) - autoShards.add(shard) + let autoShards = node.getAutoshards(conf.contentTopics).valueOr: + return err("Could not get autoshards: " & error) debug "Shards created from content topics", contentTopics = conf.contentTopics, shards = autoShards @@ -207,12 +214,9 @@ proc setupProtocols( protectedShard = shardKey.shard, publicKey = shardKey.key node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId) - # Enable Rendezvous Discovery protocol when Relay is enabled - try: - await mountRendezvous(node) - except CatchableError: - return - err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg()) + # Only relay nodes should be rendezvous points. + if conf.rendezvous: + await node.mountRendezvous() # Keepalive mounted on all nodes try: diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 8d23cbaa5..4fed2f1dc 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -42,6 +42,7 @@ import ../factory/node_factory, ../factory/internal_config, ../factory/external_config, + ../factory/app_callbacks, ../waku_enr/multiaddr logScope: @@ -67,6 +68,7 @@ type Waku* = ref object restServer*: WakuRestServerRef metricsServer*: MetricsHttpServerRef + appCallbacks*: AppCallbacks proc logConfig(conf: WakuNodeConf) = info "Configuration: Enabled protocols", @@ -146,7 +148,32 @@ proc newCircuitRelay(isRelayClient: bool): Relay = return RelayClient.new() return Relay.new() -proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] = +proc setupAppCallbacks( + node: WakuNode, conf: WakuNodeConf, appCallbacks: AppCallbacks +): Result[void, string] = + if appCallbacks.isNil(): + info "No external callbacks to be set" + return ok() + + if not appCallbacks.relayHandler.isNil(): + if node.wakuRelay.isNil(): + return err("Cannot configure relayHandler callback without Relay mounted") + + let autoShards = node.getAutoshards(conf.contentTopics).valueOr: + return err("Could not get autoshards: " & error) + + let confShards = + conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it))) + let shards = confShards & autoShards + + for shard in shards: + discard node.wakuRelay.subscribe($shard, appCallbacks.relayHandler) + + return ok() + +proc new*( + T: type Waku, confCopy: var WakuNodeConf, appCallbacks: AppCallbacks = nil +): Result[Waku, string] = let rng = crypto.newRng() logging.setupLog(confCopy.logLevel, confCopy.logFormat) @@ -225,6 +252,10 @@ proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] = let node = nodeRes.get() + node.setupAppCallbacks(confCopy, appCallbacks).isOkOr: + error "Failed setting up app callbacks", error = error + return err("Failed setting up app callbacks: " & $error) + ## Delivery Monitor var deliveryMonitor: DeliveryMonitor if confCopy.reliabilityEnabled: @@ -246,6 +277,7 @@ proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] = key: confCopy.nodekey.get(), node: node, deliveryMonitor: deliveryMonitor, + appCallbacks: appCallbacks, ) waku.setupSwitchServices(confCopy, relay, rng) @@ -413,16 +445,6 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = if not waku[].deliveryMonitor.isNil(): waku[].deliveryMonitor.startDeliveryMonitor() - ## libp2p DiscoveryManager - waku[].discoveryMngr = DiscoveryManager() - waku[].discoveryMngr.add( - RendezVousInterface.new(rdv = waku[].node.rendezvous, tta = 1.minutes) - ) - if not isNil(waku[].node.wakuRelay): - for topic in waku[].node.wakuRelay.getSubscribedTopics(): - debug "advertise rendezvous namespace", topic - waku[].discoveryMngr.advertise(RdvNamespace(topic)) - return ok() # Waku shutdown diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 50b65bfc5..591962472 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -17,7 +17,6 @@ import libp2p/protocols/pubsub/rpc/messages, libp2p/protocols/connectivity/autonat/client, libp2p/protocols/connectivity/autonat/service, - libp2p/protocols/rendezvous, libp2p/builders, libp2p/transports/transport, libp2p/transports/tcptransport, @@ -39,6 +38,7 @@ import ../waku_filter_v2/client as filter_client, ../waku_filter_v2/subscriptions as filter_subscriptions, ../waku_metadata, + ../waku_rendezvous/protocol, ../waku_lightpush/client as lightpush_client, ../waku_lightpush/common, ../waku_lightpush/protocol, @@ -110,7 +110,7 @@ type enr*: enr.Record libp2pPing*: Ping rng*: ref rand.HmacDrbgContext - rendezvous*: RendezVous + wakuRendezvous*: WakuRendezVous announcedAddresses*: seq[MultiAddress] started*: bool # Indicates that node has started listening topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] @@ -1217,22 +1217,16 @@ proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) = proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} = info "mounting rendezvous discovery protocol" - try: - node.rendezvous = RendezVous.new(node.switch) - except Exception as e: - error "failed to create rendezvous", error = getCurrentExceptionMsg() + node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr).valueOr: + error "initializing waku rendezvous failed", error = error return - if node.started: - try: - await node.rendezvous.start() - except CatchableError: - error "failed to start rendezvous", error = getCurrentExceptionMsg() + # Always start discovering peers at startup + (await node.wakuRendezvous.initialRequestAll()).isOkOr: + error "rendezvous failed initial requests", error = error - try: - node.switch.mount(node.rendezvous) - except LPError: - error "failed to mount rendezvous", error = getCurrentExceptionMsg() + if node.started: + await node.wakuRendezvous.start() proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool = let inputStr = $inputMultiAdd @@ -1304,6 +1298,9 @@ proc start*(node: WakuNode) {.async.} = if not node.wakuStoreResume.isNil(): await node.wakuStoreResume.start() + if not node.wakuRendezvous.isNil(): + await node.wakuRendezvous.start() + ## The switch uses this mapper to update peer info addrs ## with announced addrs after start let addressMapper = proc( @@ -1346,6 +1343,9 @@ proc stop*(node: WakuNode) {.async.} = if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil(): await node.wakuPeerExchange.pxLoopHandle.cancelAndWait() + if not node.wakuRendezvous.isNil(): + await node.wakuRendezvous.stopWait() + node.started = false proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} = diff --git a/waku/waku_api/rest/admin/types.nim b/waku/waku_api/rest/admin/types.nim index fc6470658..bb7dd2b0c 100644 --- a/waku/waku_api/rest/admin/types.nim +++ b/waku/waku_api/rest/admin/types.nim @@ -83,7 +83,7 @@ proc readValue*( ) connected = some(reader.readValue(bool)) else: - unrecognizedFieldWarning() + unrecognizedFieldWarning(value) if connected.isNone(): reader.raiseUnexpectedValue("Field `connected` is missing") @@ -116,7 +116,7 @@ proc readValue*( reader.raiseUnexpectedField("Multiple `origin` fields found", "WakuPeer") origin = some(reader.readValue(PeerOrigin)) else: - unrecognizedFieldWarning() + unrecognizedFieldWarning(value) if multiaddr.isNone(): reader.raiseUnexpectedValue("Field `multiaddr` is missing") @@ -153,7 +153,7 @@ proc readValue*( ) contentTopic = some(reader.readValue(string)) else: - unrecognizedFieldWarning() + unrecognizedFieldWarning(value) if pubsubTopic.isNone(): reader.raiseUnexpectedValue("Field `pubsubTopic` is missing") @@ -185,7 +185,7 @@ proc readValue*( ) filterCriteria = some(reader.readValue(seq[FilterTopic])) else: - unrecognizedFieldWarning() + unrecognizedFieldWarning(value) if peerId.isNone(): reader.raiseUnexpectedValue("Field `peerId` is missing") diff --git a/waku/waku_api/rest/debug/types.nim b/waku/waku_api/rest/debug/types.nim index 0ca52c4a5..8fa1068f9 100644 --- a/waku/waku_api/rest/debug/types.nim +++ b/waku/waku_api/rest/debug/types.nim @@ -2,6 +2,7 @@ import chronicles, json_serialization, json_serialization/std/options import ../../../waku_node, ../serdes +import std/typetraits #### Types @@ -47,7 +48,7 @@ proc readValue*( reader.raiseUnexpectedField("Multiple `enrUri` fields found", "DebugWakuInfo") enrUri = some(reader.readValue(string)) else: - unrecognizedFieldWarning() + unrecognizedFieldWarning(value) if listenAddresses.isNone(): reader.raiseUnexpectedValue("Field `listenAddresses` is missing") diff --git a/waku/waku_api/rest/filter/types.nim b/waku/waku_api/rest/filter/types.nim index 0506a7a74..6d18e7f6e 100644 --- a/waku/waku_api/rest/filter/types.nim +++ b/waku/waku_api/rest/filter/types.nim @@ -187,7 +187,7 @@ proc readValue*( of "ephemeral": ephemeral = some(reader.readValue(bool)) else: - unrecognizedFieldWarning() + unrecognizedFieldWarning(value) if payload.isNone(): reader.raiseUnexpectedValue("Field `payload` is missing") @@ -225,7 +225,7 @@ proc readValue*( of "contentFilters": contentFilters = some(reader.readValue(seq[ContentTopic])) else: - unrecognizedFieldWarning() + unrecognizedFieldWarning(value) if contentFilters.isNone(): reader.raiseUnexpectedValue("Field `contentFilters` is missing") @@ -262,7 +262,7 @@ proc readValue*( of "requestId": requestId = some(reader.readValue(string)) else: - unrecognizedFieldWarning() + unrecognizedFieldWarning(value) if requestId.isNone(): reader.raiseUnexpectedValue("Field `requestId` is missing") @@ -296,7 +296,7 @@ proc readValue*( of "contentFilters": contentFilters = some(reader.readValue(seq[ContentTopic])) else: - unrecognizedFieldWarning() + unrecognizedFieldWarning(value) if requestId.isNone(): reader.raiseUnexpectedValue("Field `requestId` is missing") @@ -344,7 +344,7 @@ proc readValue*( of "contentFilters": contentFilters = some(reader.readValue(seq[ContentTopic])) else: - unrecognizedFieldWarning() + unrecognizedFieldWarning(value) if requestId.isNone(): reader.raiseUnexpectedValue("Field `requestId` is missing") @@ -385,7 +385,7 @@ proc readValue*( of "requestId": requestId = some(reader.readValue(string)) else: - unrecognizedFieldWarning() + unrecognizedFieldWarning(value) if requestId.isNone(): reader.raiseUnexpectedValue("Field `requestId` is missing") @@ -416,7 +416,7 @@ proc readValue*( of "statusDesc": statusDesc = some(reader.readValue(string)) else: - unrecognizedFieldWarning() + unrecognizedFieldWarning(value) if requestId.isNone(): reader.raiseUnexpectedValue("Field `requestId` is missing") diff --git a/waku/waku_api/rest/health/types.nim b/waku/waku_api/rest/health/types.nim index db76f5b8c..ce58ab711 100644 --- a/waku/waku_api/rest/health/types.nim +++ b/waku/waku_api/rest/health/types.nim @@ -65,7 +65,7 @@ proc readValue*( protocolsHealth = some(reader.readValue(seq[ProtocolHealth])) else: - unrecognizedFieldWarning() + unrecognizedFieldWarning(value) if nodeHealth.isNone(): reader.raiseUnexpectedValue("Field `nodeHealth` is missing") diff --git a/waku/waku_api/rest/lightpush/types.nim b/waku/waku_api/rest/lightpush/types.nim index f499600b7..60368403f 100644 --- a/waku/waku_api/rest/lightpush/types.nim +++ b/waku/waku_api/rest/lightpush/types.nim @@ -52,7 +52,7 @@ proc readValue*( of "message": message = some(reader.readValue(RelayWakuMessage)) else: - unrecognizedFieldWarning() + unrecognizedFieldWarning(value) if message.isNone(): reader.raiseUnexpectedValue("Field `message` is missing") diff --git a/waku/waku_api/rest/relay/types.nim b/waku/waku_api/rest/relay/types.nim index 1ffb59384..6da0f401e 100644 --- a/waku/waku_api/rest/relay/types.nim +++ b/waku/waku_api/rest/relay/types.nim @@ -117,7 +117,7 @@ proc readValue*( of "ephemeral": ephemeral = some(reader.readValue(bool)) else: - unrecognizedFieldWarning() + unrecognizedFieldWarning(value) if payload.isNone() or isEmptyOrWhitespace(string(payload.get())): reader.raiseUnexpectedValue("Field `payload` is missing or empty") diff --git a/waku/waku_api/rest/serdes.nim b/waku/waku_api/rest/serdes.nim index 6c2657415..2c8ebb4b7 100644 --- a/waku/waku_api/rest/serdes.nim +++ b/waku/waku_api/rest/serdes.nim @@ -20,12 +20,12 @@ createJsonFlavor RestJson Json.setWriter JsonWriter, PreferredOutput = string -template unrecognizedFieldWarning*() = +template unrecognizedFieldWarning*(field: typed) = # TODO: There should be a different notification mechanism for informing the # caller of a deserialization routine for unexpected fields. # The chonicles import in this module should be removed. debug "JSON field not recognized by the current version of nwaku. Consider upgrading", - fieldName, typeName = typetraits.name(typeof value) + fieldName, typeName = typetraits.name(typeof field) type SerdesResult*[T] = Result[T, cstring] diff --git a/waku/waku_rendezvous.nim b/waku/waku_rendezvous.nim new file mode 100644 index 000000000..b07f1f727 --- /dev/null +++ b/waku/waku_rendezvous.nim @@ -0,0 +1,3 @@ +import ./waku_rendezvous/protocol + +export protocol diff --git a/waku/waku_rendezvous/common.nim b/waku/waku_rendezvous/common.nim new file mode 100644 index 000000000..9722347cf --- /dev/null +++ b/waku/waku_rendezvous/common.nim @@ -0,0 +1,36 @@ +{.push raises: [].} + +import std/options, chronos + +import ../common/enr, ../waku_enr/capabilities, ../waku_enr/sharding + +const DiscoverLimit* = 1000 +const DefaultRegistrationTTL* = 60.seconds +const DefaultRegistrationInterval* = 10.seconds +const PeersRequestedCount* = 12 + +proc computeNamespace*(clusterId: uint16, shard: uint16): string = + var namespace = "rs/" + + namespace &= $clusterId + namespace &= '/' + namespace &= $shard + + return namespace + +proc computeNamespace*(clusterId: uint16, shard: uint16, cap: Capabilities): string = + var namespace = "rs/" + + namespace &= $clusterId + namespace &= '/' + namespace &= $shard + namespace &= '/' + namespace &= $cap + + return namespace + +proc getRelayShards*(enr: enr.Record): Option[RelayShards] = + let typedRecord = enr.toTyped().valueOr: + return none(RelayShards) + + return typedRecord.relaySharding() diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim new file mode 100644 index 000000000..221321c95 --- /dev/null +++ b/waku/waku_rendezvous/protocol.nim @@ -0,0 +1,267 @@ +{.push raises: [].} + +import + std/[sugar, options], + results, + chronos, + chronicles, + metrics, + libp2p/protocols/rendezvous, + libp2p/switch, + libp2p/utility + +import + ../node/peer_manager, + ../common/enr, + ../waku_enr/capabilities, + ../waku_enr/sharding, + ../waku_core/peers, + ../waku_core/topics, + ./common + +logScope: + topics = "waku rendezvous" + +declarePublicCounter rendezvousPeerFoundTotal, + "total number of peers found via rendezvous" + +type WakuRendezVous* = ref object + rendezvous: Rendezvous + peerManager: PeerManager + + relayShard: RelayShards + capabilities: seq[Capabilities] + + periodicRegistrationFut: Future[void] + +proc batchAdvertise*( + self: WakuRendezVous, + namespace: string, + ttl: Duration = DefaultRegistrationTTL, + peers: seq[PeerId], +): Future[Result[void, string]] {.async: (raises: []).} = + ## Register with all rendezvous peers under a namespace + + # rendezvous.advertise except already opened connections + # must dial first + var futs = collect(newSeq): + for peerId in peers: + self.peerManager.dialPeer(peerId, RendezVousCodec) + + let dialCatch = catch: + await allFinished(futs) + + if dialCatch.isErr(): + return err("batchAdvertise: " & dialCatch.error.msg) + + futs = dialCatch.get() + + let conns = collect(newSeq): + for fut in futs: + let catchable = catch: + fut.read() + + if catchable.isErr(): + error "rendezvous dial failed", error = catchable.error.msg + continue + + let connOpt = catchable.get() + + let conn = connOpt.valueOr: + continue + + conn + + let advertCatch = catch: + await self.rendezvous.advertise(namespace, ttl, peers) + + for conn in conns: + await conn.close() + + if advertCatch.isErr(): + return err("batchAdvertise: " & advertCatch.error.msg) + + return ok() + +proc batchRequest*( + self: WakuRendezVous, + namespace: string, + count: int = DiscoverLimit, + peers: seq[PeerId], +): Future[Result[seq[PeerRecord], string]] {.async: (raises: []).} = + ## Request all records from all rendezvous peers matching a namespace + + # rendezvous.request except already opened connections + # must dial first + var futs = collect(newSeq): + for peerId in peers: + self.peerManager.dialPeer(peerId, RendezVousCodec) + + let dialCatch = catch: + await allFinished(futs) + + if dialCatch.isErr(): + return err("batchRequest: " & dialCatch.error.msg) + + futs = dialCatch.get() + + let conns = collect(newSeq): + for fut in futs: + let catchable = catch: + fut.read() + + if catchable.isErr(): + error "rendezvous dial failed", error = catchable.error.msg + continue + + let connOpt = catchable.get() + + let conn = connOpt.valueOr: + continue + + conn + + let reqCatch = catch: + await self.rendezvous.request(namespace, count, peers) + + for conn in conns: + await conn.close() + + if reqCatch.isErr(): + return err("batchRequest: " & reqCatch.error.msg) + + return ok(reqCatch.get()) + +proc advertiseAll( + self: WakuRendezVous +): Future[Result[void, string]] {.async: (raises: []).} = + debug "waku rendezvous advertisements started" + + let pubsubTopics = self.relayShard.topics() + + let futs = collect(newSeq): + for pubsubTopic in pubsubTopics: + # Get a random RDV peer for that shard + let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: + error "could not get a peer supporting RendezVousCodec" + continue + + let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId) + + # Advertise yourself on that peer + self.batchAdvertise(namespace, DefaultRegistrationTTL, @[rpi.peerId]) + + let catchable = catch: + await allFinished(futs) + + if catchable.isErr(): + return err(catchable.error.msg) + + for fut in catchable.get(): + if fut.failed(): + error "rendezvous advertisement failed", error = fut.error.msg + + debug "waku rendezvous advertisements finished" + + return ok() + +proc initialRequestAll*( + self: WakuRendezVous +): Future[Result[void, string]] {.async: (raises: []).} = + debug "waku rendezvous initial requests started" + + let pubsubTopics = self.relayShard.topics() + + let futs = collect(newSeq): + for pubsubTopic in pubsubTopics: + let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId) + + # Get a random RDV peer for that shard + let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: + error "could not get a peer supporting RendezVousCodec" + continue + + # Ask for peer records for that shard + self.batchRequest(namespace, PeersRequestedCount, @[rpi.peerId]) + + let catchable = catch: + await allFinished(futs) + + if catchable.isErr(): + return err(catchable.error.msg) + + for fut in catchable.get(): + if fut.failed(): + error "rendezvous request failed", error = fut.error.msg + elif fut.finished(): + let res = fut.value() + + let records = res.valueOr: + return err($res.error) + + for record in records: + rendezvousPeerFoundTotal.inc() + self.peerManager.addPeer(record) + + debug "waku rendezvous initial requests finished" + + return ok() + +proc periodicRegistration(self: WakuRendezVous) {.async.} = + debug "waku rendezvous periodic registration started", + interval = DefaultRegistrationInterval + + # infinite loop + while true: + await sleepAsync(DefaultRegistrationInterval) + + (await self.advertiseAll()).isOkOr: + debug "waku rendezvous advertisements failed", error = error + +proc new*( + T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record +): Result[T, string] {.raises: [].} = + let relayshard = getRelayShards(enr).valueOr: + warn "Using default cluster id 0" + RelayShards(clusterID: 0, shardIds: @[]) + + let capabilities = enr.getCapabilities() + + let rvCatchable = catch: + RendezVous.new(switch = switch, minDuration = DefaultRegistrationTTL) + + if rvCatchable.isErr(): + return err(rvCatchable.error.msg) + + let rv = rvCatchable.get() + + let mountCatchable = catch: + switch.mount(rv) + + if mountCatchable.isErr(): + return err(mountCatchable.error.msg) + + var wrv = WakuRendezVous() + wrv.rendezvous = rv + wrv.peerManager = peerManager + wrv.relayshard = relayshard + wrv.capabilities = capabilities + + debug "waku rendezvous initialized", + cluster = relayshard.clusterId, + shards = relayshard.shardIds, + capabilities = capabilities + + return ok(wrv) + +proc start*(self: WakuRendezVous) {.async: (raises: []).} = + # start registering forever + self.periodicRegistrationFut = self.periodicRegistration() + + debug "waku rendezvous discovery started" + +proc stopWait*(self: WakuRendezVous) {.async: (raises: []).} = + if not self.periodicRegistrationFut.isNil(): + await self.periodicRegistrationFut.cancelAndWait() + + debug "waku rendezvous discovery stopped"