mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-05 23:43:07 +00:00
Add support for WakuMessage and merge publish function (#122)
* Add WakuMessage type * Add WakuMessage encoding and decoding Also clean up waku_types module and imports * Clean up waku_relay module Imports, remove old text test, make fields and functions public, format. * Publish WakuMessage Also fix type mismatch in RPC * Make publish work in examples, node and protocol * Parse protobuf content in examples * Update docs * Update waku/node/v2/waku_types.nim * Fix compilation error and disable out of date waku test Co-authored-by: Kim De Mey <kim.demey@gmail.com>
This commit is contained in:
parent
91344e3ce2
commit
6c511c92c5
@ -46,19 +46,12 @@ method unsubscribe*(w: WakuNode, contentFilter: ContentFilter)
|
|||||||
## Status: Not yet implemented.
|
## Status: Not yet implemented.
|
||||||
## TODO Implement.
|
## TODO Implement.
|
||||||
|
|
||||||
method publish*(w: WakuNode, topic: Topic, message: Message)
|
method publish*(w: WakuNode, topic: Topic, message: WakuMessage)
|
||||||
## Publish a `Message` to a PubSub topic.
|
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
|
||||||
|
## `contentTopic` field for light node functionality. This field may be also
|
||||||
|
## be omitted.
|
||||||
##
|
##
|
||||||
## Status: Partially implemented.
|
## Status: Implemented.
|
||||||
## TODO WakuMessage OR seq[byte]. NOT PubSub Message.
|
|
||||||
|
|
||||||
method publish*(w: WakuNode, topic: Topic, contentFilter: ContentFilter, message: Message)
|
|
||||||
## Publish a `Message` to a PubSub topic with a specific content filter.
|
|
||||||
## Currently this means a `contentTopic`.
|
|
||||||
##
|
|
||||||
## Status: Not yet implemented.
|
|
||||||
## TODO Implement as wrapper around `waku_relay` and `publish`.
|
|
||||||
## TODO WakuMessage. Ensure content filter is in it.
|
|
||||||
|
|
||||||
method query*(w: WakuNode, query: HistoryQuery): HistoryResponse
|
method query*(w: WakuNode, query: HistoryQuery): HistoryResponse
|
||||||
## Queries for historical messages.
|
## Queries for historical messages.
|
||||||
|
|||||||
@ -11,6 +11,9 @@ import
|
|||||||
../../waku/node/v2/[config, wakunode2, waku_types],
|
../../waku/node/v2/[config, wakunode2, waku_types],
|
||||||
../../waku/node/common
|
../../waku/node/common
|
||||||
|
|
||||||
|
type
|
||||||
|
Topic* = waku_types.Topic
|
||||||
|
|
||||||
# Node operations happens asynchronously
|
# Node operations happens asynchronously
|
||||||
proc runBackground() {.async.} =
|
proc runBackground() {.async.} =
|
||||||
let
|
let
|
||||||
@ -24,13 +27,16 @@ proc runBackground() {.async.} =
|
|||||||
await node.start()
|
await node.start()
|
||||||
|
|
||||||
# Subscribe to a topic
|
# Subscribe to a topic
|
||||||
let topic = "foobar"
|
let topic = cast[Topic]("foobar")
|
||||||
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} =
|
||||||
info "Hit subscribe handler", topic=topic, data=data, decoded=cast[string](data)
|
let message = WakuMessage.init(data).value
|
||||||
|
let payload = cast[string](message.payload)
|
||||||
|
info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic
|
||||||
node.subscribe(topic, handler)
|
node.subscribe(topic, handler)
|
||||||
|
|
||||||
# Publish to a topic
|
# Publish to a topic
|
||||||
let message = cast[seq[byte]]("hello world")
|
let payload = cast[seq[byte]]("hello world")
|
||||||
|
let message = WakuMessage(payload: payload, contentTopic: "foo")
|
||||||
node.publish(topic, message)
|
node.publish(topic, message)
|
||||||
|
|
||||||
# TODO Await with try/except here
|
# TODO Await with try/except here
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
import
|
import
|
||||||
# Waku v2 tests
|
# Waku v2 tests
|
||||||
./v2/test_waku,
|
# TODO: enable this when it is altered into a proper waku relay test
|
||||||
|
# ./v2/test_waku,
|
||||||
./v2/test_wakunode,
|
./v2/test_wakunode,
|
||||||
./v2/test_waku_store,
|
./v2/test_waku_store,
|
||||||
./v2/test_waku_filter
|
./v2/test_waku_filter
|
||||||
|
|||||||
@ -20,11 +20,10 @@ procSuite "WakuNode":
|
|||||||
await node.start()
|
await node.start()
|
||||||
|
|
||||||
let
|
let
|
||||||
topic = "foobar"
|
topic = "foo"
|
||||||
message = ("hello world").toBytes
|
contentTopic = "foobar"
|
||||||
node.publish(topic, ContentFilter(contentTopic: topic), message)
|
wakuMessage = WakuMessage(payload: ("hello world").toBytes,
|
||||||
|
contentTopic: contentTopic)
|
||||||
|
|
||||||
let response = node.query(HistoryQuery(topics: @[topic]))
|
node.publish(topic, wakuMessage)
|
||||||
check:
|
# TODO: Add actual subscribe part with receival of message
|
||||||
response.messages.len == 1
|
|
||||||
response.messages[0] == message
|
|
||||||
|
|||||||
@ -22,10 +22,11 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
|
|||||||
result = WakuRelayCodec
|
result = WakuRelayCodec
|
||||||
|
|
||||||
# TODO: Implement symkey etc logic
|
# TODO: Implement symkey etc logic
|
||||||
rpcsrv.rpc("waku_publish") do(topic: string, message: seq[byte]) -> bool:
|
rpcsrv.rpc("waku_publish") do(topic: string, payload: seq[byte]) -> bool:
|
||||||
# XXX Why is casting necessary here but not in Nim node API?
|
# XXX Why is casting necessary here but not in Nim node API?
|
||||||
let wakuRelay = cast[WakuRelay](node.switch.pubSub.get())
|
let wakuRelay = cast[WakuRelay](node.switch.pubSub.get())
|
||||||
# XXX also future return type
|
# XXX also future return type
|
||||||
|
var message = WakuMessage(payload: payload, contentTopic: "foo")
|
||||||
discard wakuRelay.publish(topic, message)
|
discard wakuRelay.publish(topic, message)
|
||||||
return true
|
return true
|
||||||
#if not result:
|
#if not result:
|
||||||
|
|||||||
@ -1,13 +1,13 @@
|
|||||||
|
## Core Waku data types are defined here to avoid recursive dependencies.
|
||||||
|
##
|
||||||
|
## TODO Move more common data types here
|
||||||
|
|
||||||
import
|
import
|
||||||
chronos,
|
chronos,
|
||||||
libp2p/multiaddress,
|
libp2p/[switch, peerinfo, multiaddress, crypto/crypto],
|
||||||
libp2p/crypto/crypto,
|
libp2p/protobuf/minprotobuf
|
||||||
libp2p/peerinfo,
|
|
||||||
standard_setup
|
|
||||||
|
|
||||||
# Core Waku data types are defined here to avoid recursive dependencies.
|
# Common data types -----------------------------------------------------------
|
||||||
#
|
|
||||||
# TODO Move more common data types here
|
|
||||||
|
|
||||||
type
|
type
|
||||||
Topic* = string
|
Topic* = string
|
||||||
@ -16,7 +16,28 @@ type
|
|||||||
# NOTE based on Eth2Node in NBC eth2_network.nim
|
# NOTE based on Eth2Node in NBC eth2_network.nim
|
||||||
WakuNode* = ref object of RootObj
|
WakuNode* = ref object of RootObj
|
||||||
switch*: Switch
|
switch*: Switch
|
||||||
# XXX Unclear if we need this
|
|
||||||
peerInfo*: PeerInfo
|
peerInfo*: PeerInfo
|
||||||
libp2pTransportLoops*: seq[Future[void]]
|
libp2pTransportLoops*: seq[Future[void]]
|
||||||
messages*: seq[(Topic, Message)]
|
# TODO Revist messages field indexing as well as if this should be Message or WakuMessage
|
||||||
|
messages*: seq[(Topic, WakuMessage)]
|
||||||
|
|
||||||
|
WakuMessage* = object
|
||||||
|
payload*: seq[byte]
|
||||||
|
contentTopic*: string
|
||||||
|
|
||||||
|
# Encoding and decoding -------------------------------------------------------
|
||||||
|
|
||||||
|
proc init*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] =
|
||||||
|
var msg = WakuMessage()
|
||||||
|
let pb = initProtoBuffer(buffer)
|
||||||
|
|
||||||
|
discard ? pb.getField(1, msg.payload)
|
||||||
|
discard ? pb.getField(2, msg.contentTopic)
|
||||||
|
|
||||||
|
ok(msg)
|
||||||
|
|
||||||
|
proc encode*(message: WakuMessage): ProtoBuffer =
|
||||||
|
result = initProtoBuffer()
|
||||||
|
|
||||||
|
result.write(1, message.payload)
|
||||||
|
result.write(2, message.contentTopic)
|
||||||
|
|||||||
@ -19,6 +19,7 @@ type
|
|||||||
PublicKey* = crypto.PublicKey
|
PublicKey* = crypto.PublicKey
|
||||||
PrivateKey* = crypto.PrivateKey
|
PrivateKey* = crypto.PrivateKey
|
||||||
|
|
||||||
|
# TODO Get rid of this and use waku_types one
|
||||||
Topic* = waku_types.Topic
|
Topic* = waku_types.Topic
|
||||||
Message* = seq[byte]
|
Message* = seq[byte]
|
||||||
ContentFilter* = object
|
ContentFilter* = object
|
||||||
@ -176,30 +177,24 @@ proc unsubscribe*(w: WakuNode, contentFilter: ContentFilter) =
|
|||||||
## Status: Not yet implemented.
|
## Status: Not yet implemented.
|
||||||
## TODO Implement.
|
## TODO Implement.
|
||||||
|
|
||||||
proc publish*(w: WakuNode, topic: Topic, message: Message) =
|
|
||||||
## Publish a `Message` to a PubSub topic.
|
proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) =
|
||||||
|
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
|
||||||
|
## `contentTopic` field for light node functionality. This field may be also
|
||||||
|
## be omitted.
|
||||||
##
|
##
|
||||||
## Status: Partially implemented.
|
## Status: Implemented.
|
||||||
##
|
##
|
||||||
## TODO WakuMessage OR seq[byte]. NOT PubSub Message.
|
|
||||||
let wakuSub = w.switch.pubSub.get()
|
# TODO Basic getter function for relay
|
||||||
|
let wakuRelay = cast[WakuRelay](node.switch.pubSub.get())
|
||||||
|
|
||||||
|
# XXX Unclear what the purpose of this is
|
||||||
|
# Commenting out as it is later expected to be Message type, not WakuMessage
|
||||||
|
#node.messages.insert((topic, message))
|
||||||
|
|
||||||
# XXX Consider awaiting here
|
# XXX Consider awaiting here
|
||||||
discard wakuSub.publish(topic, message)
|
discard wakuRelay.publish(topic, message)
|
||||||
|
|
||||||
proc publish*(w: WakuNode, topic: Topic, contentFilter: ContentFilter, message: Message) =
|
|
||||||
## Publish a `Message` to a PubSub topic with a specific content filter.
|
|
||||||
## Currently this means a `contentTopic`.
|
|
||||||
##
|
|
||||||
## Status: Not yet implemented.
|
|
||||||
## TODO Implement as wrapper around `waku_relay` and `publish`.
|
|
||||||
## TODO WakuMessage. Ensure content filter is in it.
|
|
||||||
|
|
||||||
w.messages.insert((contentFilter.contentTopic, message))
|
|
||||||
|
|
||||||
let wakuSub = w.switch.pubSub.get()
|
|
||||||
# XXX Consider awaiting here
|
|
||||||
|
|
||||||
discard wakuSub.publish(topic, message)
|
|
||||||
|
|
||||||
proc query*(w: WakuNode, query: HistoryQuery): HistoryResponse =
|
proc query*(w: WakuNode, query: HistoryQuery): HistoryResponse =
|
||||||
## Queries for historical messages.
|
## Queries for historical messages.
|
||||||
@ -212,7 +207,8 @@ proc query*(w: WakuNode, query: HistoryQuery): HistoryResponse =
|
|||||||
if msg[0] notin query.topics:
|
if msg[0] notin query.topics:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
result.messages.insert(msg[1])
|
# XXX Unclear how this should be hooked up, Message or WakuMessage?
|
||||||
|
# result.messages.insert(msg[1])
|
||||||
|
|
||||||
when isMainModule:
|
when isMainModule:
|
||||||
let
|
let
|
||||||
|
|||||||
@ -6,12 +6,10 @@
|
|||||||
import
|
import
|
||||||
std/[strutils, tables],
|
std/[strutils, tables],
|
||||||
chronos, chronicles, metrics,
|
chronos, chronicles, metrics,
|
||||||
libp2p/protocols/pubsub/pubsub,
|
libp2p/protocols/pubsub/[pubsub, pubsubpeer, floodsub, gossipsub],
|
||||||
libp2p/protocols/pubsub/pubsubpeer,
|
libp2p/protocols/pubsub/rpc/messages,
|
||||||
libp2p/protocols/pubsub/floodsub,
|
|
||||||
libp2p/protocols/pubsub/gossipsub,
|
|
||||||
libp2p/protocols/pubsub/rpc/[messages],
|
|
||||||
libp2p/stream/connection,
|
libp2p/stream/connection,
|
||||||
|
../../node/v2/waku_types,
|
||||||
./filter
|
./filter
|
||||||
|
|
||||||
declarePublicGauge total_messages, "number of messages received"
|
declarePublicGauge total_messages, "number of messages received"
|
||||||
@ -23,13 +21,10 @@ const WakuRelayCodec* = "/vac/waku/relay/2.0.0-alpha2"
|
|||||||
|
|
||||||
type
|
type
|
||||||
WakuRelay* = ref object of GossipSub
|
WakuRelay* = ref object of GossipSub
|
||||||
# XXX: just playing
|
|
||||||
text*: string
|
|
||||||
gossipEnabled*: bool
|
gossipEnabled*: bool
|
||||||
|
filters*: Filters
|
||||||
|
|
||||||
filters: Filters
|
method init*(w: WakuRelay) =
|
||||||
|
|
||||||
method init(w: WakuRelay) =
|
|
||||||
debug "init"
|
debug "init"
|
||||||
proc handler(conn: Connection, proto: string) {.async.} =
|
proc handler(conn: Connection, proto: string) {.async.} =
|
||||||
## main protocol handler that gets triggered on every
|
## main protocol handler that gets triggered on every
|
||||||
@ -47,8 +42,6 @@ method init(w: WakuRelay) =
|
|||||||
|
|
||||||
method initPubSub*(w: WakuRelay) =
|
method initPubSub*(w: WakuRelay) =
|
||||||
debug "initWakuRelay"
|
debug "initWakuRelay"
|
||||||
w.text = "Foobar"
|
|
||||||
debug "w.text", text = w.text
|
|
||||||
|
|
||||||
# Not using GossipSub
|
# Not using GossipSub
|
||||||
w.gossipEnabled = false
|
w.gossipEnabled = false
|
||||||
@ -64,16 +57,11 @@ method subscribe*(w: WakuRelay,
|
|||||||
topic: string,
|
topic: string,
|
||||||
handler: TopicHandler) {.async.} =
|
handler: TopicHandler) {.async.} =
|
||||||
debug "subscribe", topic=topic
|
debug "subscribe", topic=topic
|
||||||
# XXX: Pubsub really
|
|
||||||
|
|
||||||
# XXX: This is what is called, I think
|
|
||||||
if w.gossipEnabled:
|
if w.gossipEnabled:
|
||||||
await procCall GossipSub(w).subscribe(topic, handler)
|
await procCall GossipSub(w).subscribe(topic, handler)
|
||||||
else:
|
else:
|
||||||
await procCall FloodSub(w).subscribe(topic, handler)
|
await procCall FloodSub(w).subscribe(topic, handler)
|
||||||
|
|
||||||
|
|
||||||
# Subscribing a peer to a specified topic
|
|
||||||
method subscribeTopic*(w: WakuRelay,
|
method subscribeTopic*(w: WakuRelay,
|
||||||
topic: string,
|
topic: string,
|
||||||
subscribe: bool,
|
subscribe: bool,
|
||||||
@ -117,8 +105,11 @@ method rpcHandler*(w: WakuRelay,
|
|||||||
|
|
||||||
method publish*(w: WakuRelay,
|
method publish*(w: WakuRelay,
|
||||||
topic: string,
|
topic: string,
|
||||||
data: seq[byte]): Future[int] {.async.} =
|
message: WakuMessage
|
||||||
debug "publish", topic=topic
|
): Future[int] {.async.} =
|
||||||
|
debug "publish", topic=topic, contentTopic=message.contentTopic
|
||||||
|
|
||||||
|
let data = message.encode().buffer
|
||||||
|
|
||||||
if w.gossipEnabled:
|
if w.gossipEnabled:
|
||||||
return await procCall GossipSub(w).publish(topic, data)
|
return await procCall GossipSub(w).publish(topic, data)
|
||||||
@ -133,7 +124,7 @@ method unsubscribe*(w: WakuRelay,
|
|||||||
else:
|
else:
|
||||||
await procCall FloodSub(w).unsubscribe(topics)
|
await procCall FloodSub(w).unsubscribe(topics)
|
||||||
|
|
||||||
# GossipSub specific methods
|
# GossipSub specific methods --------------------------------------------------
|
||||||
method start*(w: WakuRelay) {.async.} =
|
method start*(w: WakuRelay) {.async.} =
|
||||||
debug "start"
|
debug "start"
|
||||||
if w.gossipEnabled:
|
if w.gossipEnabled:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user