Await WakuRelay publish on node (#313)

* Await WakuRelay publish on node

* Reflect 'await' change on CHANGELOG
This commit is contained in:
Hanno Cornelius 2020-12-02 10:40:53 +02:00 committed by GitHub
parent d3c5840a79
commit 8a1ca1ff8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 20 additions and 14 deletions

View File

@ -1,5 +1,9 @@
# Changelog
## Next version
- Calls to `publish` a message on `wakunode2` now `await` instead of `discard` dispatched [`WakuRelay`](https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-relay.md) procedures
## 2020-11-30 v0.1
Initial beta release.

View File

@ -40,7 +40,7 @@ proc runBackground() {.async.} =
# Publish to a topic
let payload = cast[seq[byte]]("hello world")
let message = WakuMessage(payload: payload, contentTopic: ContentTopic(1))
node.publish(topic, message)
await node.publish(topic, message)
# TODO Await with try/except here
discard runBackground()

View File

@ -94,14 +94,14 @@ proc publish(c: Chat, line: string) =
if encodedPayload.isOk():
let message = WakuMessage(payload: encodedPayload.get(),
contentTopic: DefaultContentTopic, version: version)
c.node.publish(DefaultTopic, message)
asyncSpawn c.node.publish(DefaultTopic, message)
else:
warn "Payload encoding failed", error = encodedPayload.error
else:
# No payload encoding/encryption from Waku
let message = WakuMessage(payload: line.toBytes(),
contentTopic: DefaultContentTopic, version: 0)
c.node.publish(DefaultTopic, message)
asyncSpawn c.node.publish(DefaultTopic, message)
# TODO This should read or be subscribe handler subscribe
proc readAndPrint(c: Chat) {.async.} =

View File

@ -156,7 +156,7 @@ procSuite "Waku v2 JSON-RPC API":
response == true
# Now publish a message on node1 and see if we receive it on node3
node1.publish(pubSubTopic, message)
await node1.publish(pubSubTopic, message)
await sleepAsync(2000.millis)

View File

@ -57,7 +57,7 @@ procSuite "WakuNode":
await sleepAsync(2000.millis)
node.publish(pubSubTopic, message)
await node.publish(pubSubTopic, message)
check:
(await completionFut.withTimeout(5.seconds)) == true
@ -120,7 +120,7 @@ procSuite "WakuNode":
await sleepAsync(2000.millis)
info "Waking up and publishing"
node2.publish(pubSubTopic, message)
await node2.publish(pubSubTopic, message)
check:
(await completionFut.withTimeout(5.seconds)) == true
@ -243,7 +243,7 @@ procSuite "WakuNode":
await node3.subscribe(pubSubTopic, relayHandler)
await sleepAsync(2000.millis)
node1.publish(pubSubTopic, message)
await node1.publish(pubSubTopic, message)
await sleepAsync(2000.millis)
check:

View File

@ -34,9 +34,12 @@ proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
## Publishes a WakuMessage to a PubSub topic
debug "post_waku_v2_relay_v1_message"
node.publish(topic, message.toWakuMessage(version = 0))
return true
if (await node.publish(topic, message.toWakuMessage(version = 0)).withTimeout(futTimeout)):
# Successfully published message
return true
else:
# Failed to publish message to topic
raise newException(ValueError, "Failed to publish to topic " & topic)
rpcsrv.rpc("get_waku_v2_relay_v1_messages") do(topic: string) -> seq[WakuMessage]:
## Returns all WakuMessages received on a PubSub topic since the

View File

@ -33,7 +33,7 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
warn "waku_publish decode error", msg=msg
debug "waku_publish", topic=topic, payload=payload, msg=msg[]
node.publish(topic, msg[])
await node.publish(topic, msg[])
return true
#if not result:
# raise newException(ValueError, "Message could not be posted")

View File

@ -216,7 +216,7 @@ proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} =
node.filters.removeContentFilters(request.contentFilters)
proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) =
proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} =
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
## `contentTopic` field for light node functionality. This field may be also
## be omitted.
@ -229,8 +229,7 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) =
debug "publish", topic=topic, contentTopic=message.contentTopic
let data = message.encode().buffer
# XXX Consider awaiting here
discard wakuRelay.publish(topic, data)
discard await wakuRelay.publish(topic, data)
proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
## Queries known nodes for historical messages. Triggers the handler whenever a response is received.