From 7e7bba4a983c3321f036b73a72f9621d0baaa14d Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Tue, 25 Oct 2022 14:55:31 +0200 Subject: [PATCH] feat(lightpush): add waku lightpush protocol client --- tests/all_tests_v2.nim | 4 +- tests/v2/test_waku_lightpush.nim | 143 +++++++++++------- tests/v2/test_wakunode_lightpush.nim | 2 +- tests/v2/testlib/common.nim | 3 +- waku/v2/node/waku_metrics.nim | 4 +- waku/v2/node/waku_node.nim | 34 +++-- waku/v2/protocol/waku_lightpush/client.nim | 79 ++++++++++ waku/v2/protocol/waku_lightpush/protocol.nim | 115 ++++++-------- .../waku_lightpush/protocol_metrics.nim | 18 +++ waku/v2/protocol/waku_lightpush/rpc_codec.nim | 3 + waku/v2/protocol/waku_store/client.nim | 3 +- 11 files changed, 263 insertions(+), 145 deletions(-) create mode 100644 waku/v2/protocol/waku_lightpush/client.nim create mode 100644 waku/v2/protocol/waku_lightpush/protocol_metrics.nim diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 801dfe3b5..308c195c7 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -4,7 +4,6 @@ import # ./v2/test_waku, ./v2/test_wakunode, ./v2/test_wakunode_relay, - ./v2/test_wakunode_lightpush, # Waku Store ./v2/test_message_store_queue_index, ./v2/test_message_store_queue_pagination, @@ -17,6 +16,9 @@ import # TODO: Re-enable store resume test cases (#1282) # ./v2/test_waku_store_resume, ./v2/test_wakunode_store, + # Waku LightPush + ./v2/test_waku_lightpush, + ./v2/test_wakunode_lightpush, # Waku Filter ./v2/test_waku_filter, ./v2/test_wakunode_filter, diff --git a/tests/v2/test_waku_lightpush.nim b/tests/v2/test_waku_lightpush.nim index 4ace16011..34ba8c3d2 100644 --- a/tests/v2/test_waku_lightpush.nim +++ b/tests/v2/test_waku_lightpush.nim @@ -5,78 +5,119 @@ import testutils/unittests, chronicles, chronos, - libp2p/switch, libp2p/crypto/crypto import ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_lightpush, - ../test_helpers, - ./testlib/common + ../../waku/v2/protocol/waku_lightpush/client, + ./testlib/common, + ./testlib/switch -# TODO: Extend lightpush protocol test coverage -procSuite "Waku Lightpush": +proc newTestWakuLightpushNode(switch: Switch, handler: PushMessageHandler): Future[WakuLightPush] {.async.} = + let + peerManager = PeerManager.new(switch) + rng = crypto.newRng() + proto = WakuLightPush.new(peerManager, rng, handler) - asyncTest "handle light push request success": - # TODO: Move here the test case at test_wakunode: light push request success - discard + await proto.start() + switch.mount(proto) - asyncTest "handle light push request fail": - let - key = PrivateKey.random(ECDSA, rng[]).get() - listenSwitch = newStandardSwitch(some(key)) - await listenSwitch.start() + return proto - let dialSwitch = newStandardSwitch() - await dialSwitch.start() +proc newTestWakuLightpushClient(switch: Switch): WakuLightPushClient = + let + peerManager = PeerManager.new(switch) + rng = crypto.newRng() + WakuLightPushClient.new(peerManager, rng) - proc requestHandler(requestId: string, msg: PushRequest) {.gcsafe, closure.} = - # TODO Success return here - debug "handle push req" - check: - 1 == 0 +suite "Waku Lightpush": - # FIXME Unclear how we want to use subscriptions, if at all - let - peerManager = PeerManager.new(dialSwitch) - rng = crypto.newRng() - proto = WakuLightPush.init(peerManager, rng, requestHandler) - - proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) - waitFor proto.start() - dialSwitch.mount(proto) - - - # TODO Can possibly get rid of this if it isn't dynamic - proc requestHandler2(requestId: string, msg: PushRequest) {.gcsafe, closure.} = - debug "push request handler" - # TODO: Also relay message - # TODO: Here we want to send back response with is_success true - discard - - let - peerManager2 = PeerManager.new(listenSwitch) - rng2 = crypto.newRng() - proto2 = WakuLightPush.init(peerManager2, rng2, requestHandler2) - waitFor proto2.start() - listenSwitch.mount(proto2) + asyncTest "push message to pubsub topic is successful": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + await allFutures(serverSwitch.start(), clientSwitch.start()) ## Given + let handlerFuture = newFuture[(string, WakuMessage)]() + let handler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} = + handlerFuture.complete((pubsubTopic, message)) + return ok() + let - msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic) - rpc = PushRequest(message: msg, pubSubTopic: DefaultPubsubTopic) + server = await newTestWakuLightpushNode(serverSwitch, handler) + client = newTestWakuLightpushClient(clientSwitch) + + let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo() + + let + topic = DefaultPubsubTopic + message = fakeWakuMessage() ## When - let res = await proto.request(rpc) + let rpc = PushRequest(pubSubTopic: topic, message: message) + let requestRes = await client.request(rpc, serverPeerId) + + require await handlerFuture.withTimeout(100.millis) ## Then - check res.isOk() - let response = res.get() check: - not response.isSuccess + requestRes.isOk() + handlerFuture.finished() + + let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read() + check: + handledMessagePubsubTopic == topic + handledMessage == message ## Cleanup - await allFutures(listenSwitch.stop(), dialSwitch.stop()) \ No newline at end of file + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + asyncTest "push message to pubsub topic should fail": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + ## Given + let error = "test_failure" + + let handlerFuture = newFuture[void]() + let handler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} = + handlerFuture.complete() + return err(error) + + let + server = await newTestWakuLightpushNode(serverSwitch, handler) + client = newTestWakuLightpushClient(clientSwitch) + + let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo() + + let + topic = DefaultPubsubTopic + message = fakeWakuMessage() + + ## When + let rpc = PushRequest(pubSubTopic: topic, message: message) + let requestRes = await client.request(rpc, serverPeerId) + + require await handlerFuture.withTimeout(100.millis) + + ## Then + check: + requestRes.isErr() + handlerFuture.finished() + + let requestError = requestRes.error + check: + requestError == error + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) diff --git a/tests/v2/test_wakunode_lightpush.nim b/tests/v2/test_wakunode_lightpush.nim index 5b7e7f882..1e4ebc54f 100644 --- a/tests/v2/test_wakunode_lightpush.nim +++ b/tests/v2/test_wakunode_lightpush.nim @@ -59,7 +59,7 @@ procSuite "WakuNode - Lightpush": ## When let lightpushRes = await lightNode.lightpush(DefaultPubsubTopic, message) - require (await completionFutRelay.withTimeout(5.seconds)) == true + require await completionFutRelay.withTimeout(5.seconds) ## Then check lightpushRes.isOk() diff --git a/tests/v2/testlib/common.nim b/tests/v2/testlib/common.nim index 6551fe1ae..2f3416580 100644 --- a/tests/v2/testlib/common.nim +++ b/tests/v2/testlib/common.nim @@ -1,5 +1,6 @@ import - std/times + std/times, + stew/byteutils import ../../../waku/v2/protocol/waku_message, ../../../waku/v2/utils/time diff --git a/waku/v2/node/waku_metrics.nim b/waku/v2/node/waku_metrics.nim index a8f5f40d1..02f17338c 100644 --- a/waku/v2/node/waku_metrics.nim +++ b/waku/v2/node/waku_metrics.nim @@ -9,8 +9,8 @@ import metrics/chronos_httpserver import ../protocol/waku_filter, - ../protocol/waku_store/protocol_metrics, - ../protocol/waku_lightpush, + ../protocol/waku_store/protocol_metrics as store_metrics, + ../protocol/waku_lightpush/protocol_metrics as lightpush_metrics, ../protocol/waku_swap/waku_swap, ../protocol/waku_peer_exchange, ../utils/collector, diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 1261292d0..59ee67c7f 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -361,21 +361,16 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsaf ## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a ## `contentTopic` field for light node functionality. This field may be also ## be omitted. - ## - ## Status: Implemented. - if node.wakuRelay.isNil: + if node.wakuRelay.isNil(): error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead." - # @TODO improved error handling + # TODO: Improve error handling return - let wakuRelay = node.wakuRelay trace "publish", topic=topic, contentTopic=message.contentTopic - var publishingMessage = message let data = message.encode().buffer - - discard await wakuRelay.publish(topic, data) + discard await node.wakuRelay.publish(topic, data) proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage): Future[WakuLightpushResult[PushResponse]] {.async, gcsafe.} = ## Pushes a `WakuMessage` to a node which relays it further on PubSub topic. @@ -421,7 +416,7 @@ proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[Re info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value -# TODO Extend with more relevant info: topics, peers, memory usage, online time, etc +# TODO: Extend with more relevant info: topics, peers, memory usage, online time, etc proc info*(node: WakuNode): WakuInfo = ## Returns information about the Node, such as what multiaddress it can be reached at. ## @@ -594,16 +589,23 @@ proc mountRelay*(node: WakuNode, info "relay mounted successfully" -proc mountLightPush*(node: WakuNode) {.async, raises: [Defect, LPError].} = +proc mountLightPush*(node: WakuNode) {.async.} = info "mounting light push" - if node.wakuRelay.isNil: - debug "mounting lightpush without relay" - node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil) + var pushHandler: PushMessageHandler + if node.wakuRelay.isNil(): + debug "mounting lightpush without relay (nil)" + # TODO: Remove after using waku lightpush client + pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} = + return err("no waku relay found") else: - debug "mounting lightpush with relay" - node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil, node.wakuRelay) - + pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} = + discard await node.wakuRelay.publish(pubsubTopic, message.encode().buffer) + return ok() + + debug "mounting lightpush with relay" + node.wakuLightPush = WakuLightPush.new(node.peerManager, node.rng, pushHandler) + if node.started: # Node has started already. Let's start lightpush too. await node.wakuLightPush.start() diff --git a/waku/v2/protocol/waku_lightpush/client.nim b/waku/v2/protocol/waku_lightpush/client.nim new file mode 100644 index 000000000..5da0ab1d1 --- /dev/null +++ b/waku/v2/protocol/waku_lightpush/client.nim @@ -0,0 +1,79 @@ +{.push raises: [Defect].} + +import + std/options, + stew/results, + chronicles, + chronos, + metrics, + bearssl/rand +import + ../../node/peer_manager/peer_manager, + ../../utils/requests, + ./protocol, + ./protocol_metrics, + ./rpc, + ./rpc_codec + + +logScope: + topics = "wakulightpush.client" + + +type WakuLightPushClient* = ref object + peerManager*: PeerManager + rng*: ref rand.HmacDrbgContext + + +proc new*(T: type WakuLightPushClient, + peerManager: PeerManager, + rng: ref rand.HmacDrbgContext): T = + WakuLightPushClient(peerManager: peerManager, rng: rng) + + +proc request*(wl: WakuLightPushClient, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} = + let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec) + if connOpt.isNone(): + waku_lightpush_errors.inc(labelValues = [dialFailure]) + return err(dialFailure) + let connection = connOpt.get() + + let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: req) + await connection.writeLP(rpc.encode().buffer) + + var message = await connection.readLp(MaxRpcSize.int) + let decodeRespRes = PushRPC.init(message) + if decodeRespRes.isErr(): + error "failed to decode response" + waku_lightpush_errors.inc(labelValues = [decodeRpcFailure]) + return err(decodeRpcFailure) + + let pushResponseRes = decodeRespRes.get() + if pushResponseRes.response == PushResponse(): + waku_lightpush_errors.inc(labelValues = [emptyResponseBodyFailure]) + return err(emptyResponseBodyFailure) + + let response = pushResponseRes.response + if not response.isSuccess: + if response.info != "": + return err(response.info) + else: + return err("unknown failure") + + return ok() + + +### Set lightpush peer and send push requests + +proc setPeer*(wl: WakuLightPushClient, peer: RemotePeerInfo) = + wl.peerManager.addPeer(peer, WakuLightPushCodec) + waku_lightpush_peers.inc() + +proc request*(wl: WakuLightPushClient, req: PushRequest): Future[WakuLightPushResult[void]] {.async, gcsafe.} = + let peerOpt = wl.peerManager.selectPeer(WakuLightPushCodec) + if peerOpt.isNone(): + error "no suitable remote peers" + waku_lightpush_errors.inc(labelValues = [peerNotFoundFailure]) + return err(peerNotFoundFailure) + + return await wl.request(req, peerOpt.get()) diff --git a/waku/v2/protocol/waku_lightpush/protocol.nim b/waku/v2/protocol/waku_lightpush/protocol.nim index 557f7badb..ce9678c5a 100644 --- a/waku/v2/protocol/waku_lightpush/protocol.nim +++ b/waku/v2/protocol/waku_lightpush/protocol.nim @@ -6,116 +6,86 @@ import chronicles, chronos, metrics, - bearssl/rand, - libp2p/crypto/crypto - + bearssl/rand import ../waku_message, ../waku_relay, ../../node/peer_manager/peer_manager, ../../utils/requests, ./rpc, - ./rpc_codec + ./rpc_codec, + ./protocol_metrics logScope: topics = "wakulightpush" -declarePublicGauge waku_lightpush_peers, "number of lightpush peers" -declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"] -declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"] - -const - WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1" - -const - MaxRpcSize* = MaxWakuMessageSize + 64 * 1024 # We add a 64kB safety buffer for protocol overhead - -# Error types (metric label values) -const - dialFailure = "dial_failure" - decodeRpcFailure = "decode_rpc_failure" +const WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1" type - PushResponseHandler* = proc(response: PushResponse) {.gcsafe, closure.} - - PushRequestHandler* = proc(requestId: string, msg: PushRequest) {.gcsafe, closure.} - WakuLightPushResult*[T] = Result[T, string] + + PushMessageHandler* = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.gcsafe, closure.} WakuLightPush* = ref object of LPProtocol rng*: ref rand.HmacDrbgContext peerManager*: PeerManager - requestHandler*: PushRequestHandler - relayReference*: WakuRelay - - -proc init*(wl: WakuLightPush) = + pushHandler*: PushMessageHandler +proc initProtocolHandler*(wl: WakuLightPush) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = - let message = await conn.readLp(MaxRpcSize.int) - let res = PushRPC.init(message) - if res.isErr(): + let buffer = await conn.readLp(MaxRpcSize.int) + let reqDecodeRes = PushRPC.init(buffer) + if reqDecodeRes.isErr(): error "failed to decode rpc" waku_lightpush_errors.inc(labelValues = [decodeRpcFailure]) return - let rpc = res.get() + let req = reqDecodeRes.get() + if req.request == PushRequest(): + error "invalid lightpush rpc received", error=emptyRequestBodyFailure + waku_lightpush_errors.inc(labelValues = [emptyRequestBodyFailure]) + return - if rpc.request != PushRequest(): - info "lightpush push request" - waku_lightpush_messages.inc(labelValues = ["PushRequest"]) + waku_lightpush_messages.inc(labelValues = ["PushRequest"]) + let + pubSubTopic = req.request.pubSubTopic + message = req.request.message + debug "push request", peerId=conn.peerId, requestId=req.requestId, pubsubTopic=pubsubTopic - let - pubSubTopic = rpc.request.pubSubTopic - message = rpc.request.message - debug "PushRequest", pubSubTopic=pubSubTopic, msg=message + var response: PushResponse + let handleRes = await wl.pushHandler(conn.peerId, pubsubTopic, message) + if handleRes.isOk(): + response = PushResponse(is_success: true, info: "OK") + else: + response = PushResponse(is_success: false, info: handleRes.error) + waku_lightpush_errors.inc(labelValues = [messagePushFailure]) + error "pushed message handling failed", error=handleRes.error - var response: PushResponse - if not wl.relayReference.isNil(): - let data = message.encode().buffer - - # Assumimng success, should probably be extended to check for network, peers, etc - discard wl.relayReference.publish(pubSubTopic, data) - response = PushResponse(is_success: true, info: "Totally.") - else: - debug "No relay protocol present, unsuccesssful push" - response = PushResponse(is_success: false, info: "No relay protocol") - - - let rpc = PushRPC(requestId: rpc.requestId, response: response) - await conn.writeLp(rpc.encode().buffer) - - if rpc.response != PushResponse(): - waku_lightpush_messages.inc(labelValues = ["PushResponse"]) - if rpc.response.isSuccess: - info "lightpush message success" - else: - info "lightpush message failure", info=rpc.response.info + let rpc = PushRPC(requestId: req.requestId, response: response) + await conn.writeLp(rpc.encode().buffer) wl.handler = handle wl.codec = WakuLightPushCodec -proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil): T = - debug "init" - let rng = crypto.newRng() - let wl = WakuLightPush(rng: rng, - peerManager: peerManager, - requestHandler: handler, - relayReference: relay) - wl.init() - +proc new*(T: type WakuLightPush, + peerManager: PeerManager, + rng: ref rand.HmacDrbgContext, + pushHandler: PushMessageHandler): T = + let wl = WakuLightPush(rng: rng, peerManager: peerManager, pushHandler: pushHandler) + wl.initProtocolHandler() return wl -proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) = +proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) {. + deprecated: "Use 'WakuLightPushClient.setPeer()' instead" .} = wlp.peerManager.addPeer(peer, WakuLightPushCodec) waku_lightpush_peers.inc() - -proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe.} = +proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe, + deprecated: "Use 'WakuLightPushClient.request()' instead" .} = let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec) if connOpt.isNone(): waku_lightpush_errors.inc(labelValues = [dialFailure]) @@ -139,7 +109,8 @@ proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[ return ok(rpcRes.response) -proc request*(wl: WakuLightPush, req: PushRequest): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe.} = +proc request*(wl: WakuLightPush, req: PushRequest): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe, + deprecated: "Use 'WakuLightPushClient.request()' instead" .} = let peerOpt = wl.peerManager.selectPeer(WakuLightPushCodec) if peerOpt.isNone(): waku_lightpush_errors.inc(labelValues = [dialFailure]) diff --git a/waku/v2/protocol/waku_lightpush/protocol_metrics.nim b/waku/v2/protocol/waku_lightpush/protocol_metrics.nim new file mode 100644 index 000000000..469da1c94 --- /dev/null +++ b/waku/v2/protocol/waku_lightpush/protocol_metrics.nim @@ -0,0 +1,18 @@ +{.push raises: [Defect].} + +import metrics + + +declarePublicGauge waku_lightpush_peers, "number of lightpush peers" +declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"] +declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"] + + +# Error types (metric label values) +const + dialFailure* = "dial_failure" + decodeRpcFailure* = "decode_rpc_failure" + peerNotFoundFailure* = "peer_not_found_failure" + emptyRequestBodyFailure* = "empty_request_body_failure" + emptyResponseBodyFailure* = "empty_response_body_failure" + messagePushFailure* = "message_push_failure" diff --git a/waku/v2/protocol/waku_lightpush/rpc_codec.nim b/waku/v2/protocol/waku_lightpush/rpc_codec.nim index 7d4c0e21b..9647afcf9 100644 --- a/waku/v2/protocol/waku_lightpush/rpc_codec.nim +++ b/waku/v2/protocol/waku_lightpush/rpc_codec.nim @@ -9,6 +9,9 @@ import ./rpc +const MaxRpcSize* = MaxWakuMessageSize + 64 * 1024 # We add a 64kB safety buffer for protocol overhead + + proc encode*(rpc: PushRequest): ProtoBuffer = var output = initProtoBuffer() output.write3(1, rpc.pubSubTopic) diff --git a/waku/v2/protocol/waku_store/client.nim b/waku/v2/protocol/waku_store/client.nim index 7a1934714..a75522650 100644 --- a/waku/v2/protocol/waku_store/client.nim +++ b/waku/v2/protocol/waku_store/client.nim @@ -49,7 +49,7 @@ proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future let rpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req) await connection.writeLP(rpc.encode().buffer) - var message = await connOpt.get().readLp(MaxRpcSize.int) + var message = await connection.readLp(MaxRpcSize.int) let response = HistoryRPC.init(message) if response.isErr(): @@ -114,6 +114,7 @@ proc queryLoop*(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo proc setPeer*(ws: WakuStoreClient, peer: RemotePeerInfo) = ws.peerManager.addPeer(peer, WakuStoreCodec) + waku_store_peers.inc() proc query*(w: WakuStoreClient, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = # TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service.