diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 5228bc5ec..28366a067 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -339,33 +339,52 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsaf discard await wakuRelay.publish(topic, data) -proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage, handler: PushResponseHandler) {.async, gcsafe.} = +proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage): Future[WakuLightpushResult[PushResponse]] {.async, gcsafe.} = ## Pushes a `WakuMessage` to a node which relays it further on PubSub topic. - ## Returns whether relaying was successful or not in `handler`. + ## Returns whether relaying was successful or not. ## `WakuMessage` should contain a `contentTopic` field for light node - ## functionality. This field may be also be omitted. - ## - ## Status: Implemented. - + ## functionality. debug "Publishing with lightpush", topic=topic, contentTopic=message.contentTopic let rpc = PushRequest(pubSubTopic: topic, message: message) - await node.wakuLightPush.request(rpc, handler) + return await node.wakuLightPush.request(rpc) -proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = - ## Queries known nodes for historical messages. Triggers the handler whenever a response is received. - ## QueryHandlerFunc is a method that takes a HistoryResponse. - ## - ## Status: Implemented. +proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage, handler: PushResponseHandler) {.async, gcsafe, + deprecated: "Use the no-callback version of this method".} = + ## Pushes a `WakuMessage` to a node which relays it further on PubSub topic. + ## Returns whether relaying was successful or not in `handler`. + ## `WakuMessage` should contain a `contentTopic` field for light node + ## functionality. - # TODO Once waku swap is less experimental, this can simplified + let rpc = PushRequest(pubSubTopic: topic, message: message) + let res = await node.wakuLightPush.request(rpc) + if res.isOk(): + handler(res.value) + else: + error "Message lightpush failed", error=res.error() + +proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = + ## Queries known nodes for historical messages + + # TODO: Once waku swap is less experimental, this can simplified if node.wakuSwap.isNil: debug "Using default query" - await node.wakuStore.query(query, handler) + return await node.wakuStore.query(query) else: - debug "Using SWAPAccounting query" - # TODO wakuSwap now part of wakuStore object - await node.wakuStore.queryWithAccounting(query, handler) + debug "Using SWAP accounting query" + # TODO: wakuSwap now part of wakuStore object + return await node.wakuStore.queryWithAccounting(query) + +proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe, + deprecated: "Use the no-callback version of this method".} = + ## Queries known nodes for historical messages. Triggers the handler whenever a response is received. + ## QueryHandlerFunc is a method that takes a HistoryResponse. + + let res = await node.query(query) + if res.isOk(): + handler(res.value) + else: + error "History query failed", error=res.error() proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} = ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online diff --git a/waku/v2/protocol/waku_lightpush/protocol.nim b/waku/v2/protocol/waku_lightpush/protocol.nim index 628614575..8644ef7d2 100644 --- a/waku/v2/protocol/waku_lightpush/protocol.nim +++ b/waku/v2/protocol/waku_lightpush/protocol.nim @@ -146,11 +146,3 @@ proc request*(wl: WakuLightPush, req: PushRequest): Future[WakuLightPushResult[P return err(dialFailure) return await wl.request(req, peerOpt.get()) - -proc request*(wl: WakuLightPush, req: PushRequest, handler: PushResponseHandler) {.async, gcsafe, - deprecated: "Use the no-callback request() procedure".} = - let res = await wl.request(req) - if res.isErr(): - return - - handler(res.get()) \ No newline at end of file diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index ad308967e..2b97eb9a8 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -273,32 +273,8 @@ proc query*(w: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResp return await w.query(req, peerOpt.get()) -proc query*(w: WakuStore, req: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe, - deprecated: "Use the no-callback version of this method".} = - - let response = await w.query(req) - if response.isErr(): - error "history query failed", error=response.error() - return - - handler(response.get()) - - ## 21/WAKU2-FAULT-TOLERANT-STORE -proc queryFrom*(w: WakuStore, req: HistoryQuery, handler: QueryHandlerFunc, peer: RemotePeerInfo): Future[WakuStoreResult[uint64]] {.async, gcsafe, - deprecated: "Use the query() no-callback procedure instead".} = - ## Sends the query to the given peer. Returns the number of retrieved messages if no error occurs, otherwise returns the error string - # TODO: dialPeer add it to the list of known peers, while it does not cause any issue but might be unnecessary - let res = await w.query(req, peer) - if res.isErr(): - return err(res.error()) - - let response = res.get() - - handler(response) - return ok(response.messages.len.uint64) - proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} = ## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo, ## it retrieves the historical messages in pages. @@ -475,18 +451,3 @@ proc queryWithAccounting*(ws: WakuStore, req: HistoryQuery): Future[WakuStoreRes ws.wakuSwap.debit(peerOpt.get().peerId, response.messages.len) return ok(response) - -proc queryWithAccounting*(ws: WakuStore, req: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe, - deprecated: "Use the no-callback procedure instead".} = - # TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service. - # Ideally depending on the query and our set of peers we take a subset of ideal peers. - # This will require us to check for various factors such as: - # - which topics they track - # - latency? - # - default store peer? - let response = await ws.queryWithAccounting(req) - if response.isErr(): - error "history query failed", error=response.error() - return - - handler(response.get())