From 8a1ca1ff8fc88c47c6ff4a4f0f35ce450337eb6d Mon Sep 17 00:00:00 2001 From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> Date: Wed, 2 Dec 2020 10:40:53 +0200 Subject: [PATCH] Await WakuRelay publish on node (#313) * Await WakuRelay publish on node * Reflect 'await' change on CHANGELOG --- CHANGELOG.md | 4 ++++ examples/v2/basic2.nim | 2 +- examples/v2/chat2.nim | 4 ++-- tests/v2/test_jsonrpc_waku.nim | 2 +- tests/v2/test_wakunode.nim | 6 +++--- waku/v2/node/jsonrpc/relay_api.nim | 9 ++++++--- waku/v2/node/rpc/wakurpc.nim | 2 +- waku/v2/node/wakunode2.nim | 5 ++--- 8 files changed, 20 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 44fef462c..7bc6a0dd6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Next version + +- Calls to `publish` a message on `wakunode2` now `await` instead of `discard` dispatched [`WakuRelay`](https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-relay.md) procedures + ## 2020-11-30 v0.1 Initial beta release. diff --git a/examples/v2/basic2.nim b/examples/v2/basic2.nim index 63dd6acaa..bf64703a3 100644 --- a/examples/v2/basic2.nim +++ b/examples/v2/basic2.nim @@ -40,7 +40,7 @@ proc runBackground() {.async.} = # Publish to a topic let payload = cast[seq[byte]]("hello world") let message = WakuMessage(payload: payload, contentTopic: ContentTopic(1)) - node.publish(topic, message) + await node.publish(topic, message) # TODO Await with try/except here discard runBackground() diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index dbe0e7169..f86808e69 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -94,14 +94,14 @@ proc publish(c: Chat, line: string) = if encodedPayload.isOk(): let message = WakuMessage(payload: encodedPayload.get(), contentTopic: DefaultContentTopic, version: version) - c.node.publish(DefaultTopic, message) + asyncSpawn c.node.publish(DefaultTopic, message) else: warn "Payload encoding failed", error = encodedPayload.error else: # No payload encoding/encryption from Waku let message = WakuMessage(payload: line.toBytes(), contentTopic: DefaultContentTopic, version: 0) - c.node.publish(DefaultTopic, message) + asyncSpawn c.node.publish(DefaultTopic, message) # TODO This should read or be subscribe handler subscribe proc readAndPrint(c: Chat) {.async.} = diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 001aab3ed..13e5d95e7 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -156,7 +156,7 @@ procSuite "Waku v2 JSON-RPC API": response == true # Now publish a message on node1 and see if we receive it on node3 - node1.publish(pubSubTopic, message) + await node1.publish(pubSubTopic, message) await sleepAsync(2000.millis) diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index f8ec8242e..51a211191 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -57,7 +57,7 @@ procSuite "WakuNode": await sleepAsync(2000.millis) - node.publish(pubSubTopic, message) + await node.publish(pubSubTopic, message) check: (await completionFut.withTimeout(5.seconds)) == true @@ -120,7 +120,7 @@ procSuite "WakuNode": await sleepAsync(2000.millis) info "Waking up and publishing" - node2.publish(pubSubTopic, message) + await node2.publish(pubSubTopic, message) check: (await completionFut.withTimeout(5.seconds)) == true @@ -243,7 +243,7 @@ procSuite "WakuNode": await node3.subscribe(pubSubTopic, relayHandler) await sleepAsync(2000.millis) - node1.publish(pubSubTopic, message) + await node1.publish(pubSubTopic, message) await sleepAsync(2000.millis) check: diff --git a/waku/v2/node/jsonrpc/relay_api.nim b/waku/v2/node/jsonrpc/relay_api.nim index d0b75c1f1..b346ca5c3 100644 --- a/waku/v2/node/jsonrpc/relay_api.nim +++ b/waku/v2/node/jsonrpc/relay_api.nim @@ -34,9 +34,12 @@ proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = ## Publishes a WakuMessage to a PubSub topic debug "post_waku_v2_relay_v1_message" - node.publish(topic, message.toWakuMessage(version = 0)) - - return true + if (await node.publish(topic, message.toWakuMessage(version = 0)).withTimeout(futTimeout)): + # Successfully published message + return true + else: + # Failed to publish message to topic + raise newException(ValueError, "Failed to publish to topic " & topic) rpcsrv.rpc("get_waku_v2_relay_v1_messages") do(topic: string) -> seq[WakuMessage]: ## Returns all WakuMessages received on a PubSub topic since the diff --git a/waku/v2/node/rpc/wakurpc.nim b/waku/v2/node/rpc/wakurpc.nim index 540b79e4d..de35f08e6 100644 --- a/waku/v2/node/rpc/wakurpc.nim +++ b/waku/v2/node/rpc/wakurpc.nim @@ -33,7 +33,7 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) = warn "waku_publish decode error", msg=msg debug "waku_publish", topic=topic, payload=payload, msg=msg[] - node.publish(topic, msg[]) + await node.publish(topic, msg[]) return true #if not result: # raise newException(ValueError, "Message could not be posted") diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 75e23f4a1..5d4c1f173 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -216,7 +216,7 @@ proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} = node.filters.removeContentFilters(request.contentFilters) -proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) = +proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} = ## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a ## `contentTopic` field for light node functionality. This field may be also ## be omitted. @@ -229,8 +229,7 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) = debug "publish", topic=topic, contentTopic=message.contentTopic let data = message.encode().buffer - # XXX Consider awaiting here - discard wakuRelay.publish(topic, data) + discard await wakuRelay.publish(topic, data) proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = ## Queries known nodes for historical messages. Triggers the handler whenever a response is received.