diff --git a/examples/v2/basic2.nim b/examples/v2/basic2.nim index d12d92d98..a17aa892c 100644 --- a/examples/v2/basic2.nim +++ b/examples/v2/basic2.nim @@ -37,7 +37,7 @@ proc runBackground() {.async.} = # Publish to a topic let payload = cast[seq[byte]]("hello world") - let message = WakuMessage(payload: payload, contentTopic: "foo") + let message = WakuMessage(payload: payload, contentTopic: ContentTopic(1)) node.publish(topic, message) # TODO Await with try/except here diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index b456eaabe..b91756fe0 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -3,7 +3,7 @@ when not(compileOption("threads")): import std/[tables, strformat, strutils] import confutils, chronicles, chronos, stew/shims/net as stewNet, - eth/keys, bearssl + eth/keys, bearssl, stew/[byteutils, endians2] import libp2p/[switch, # manage transports, a single entry point for dialing and listening multistream, # tag stream with short header to identify it crypto/crypto, # cryptographic functions @@ -31,7 +31,9 @@ const Help = """ """ const DefaultTopic = "waku" -const DefaultContentTopic = "dingpu" + +const Dingpu = "dingpu".toBytes +const DefaultContentTopic = ContentTopic(uint32.fromBytes(Dingpu)) # XXX Connected is a bit annoying, because incoming connections don't trigger state change # Could poll connection pool or something here, I suppose diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index eaf4e290b..d0ea0b1b0 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -21,7 +21,8 @@ procSuite "Waku Filter": let key = PrivateKey.random(ECDSA, rng[]).get() peer = PeerInfo.init(key) - post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "pew2") + contentTopic = ContentTopic(1) + post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic) var dialSwitch = newStandardSwitch() discard await dialSwitch.start() @@ -38,7 +39,7 @@ procSuite "Waku Filter": let proto = WakuFilter.init(dialSwitch, crypto.newRng(), handle) - rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @["pew", "pew2"])], topic: "topic") + rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: "topic") dialSwitch.mount(proto) proto.setPeer(listenSwitch.peerInfo) diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 2c4603059..95c5bee1f 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -20,8 +20,9 @@ procSuite "Waku Store": let key = PrivateKey.random(ECDSA, rng[]).get() peer = PeerInfo.init(key) - msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "topic") - msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "topic2") + topic = ContentTopic(1) + msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic) + msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(2)) var dialSwitch = newStandardSwitch() discard await dialSwitch.start() @@ -32,7 +33,7 @@ procSuite "Waku Store": let proto = WakuStore.init(dialSwitch, crypto.newRng()) subscription = proto.subscription() - rpc = HistoryQuery(topics: @["topic"]) + rpc = HistoryQuery(topics: @[topic]) proto.setPeer(listenSwitch.peerInfo) diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index e1752c562..026680168 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -19,7 +19,7 @@ procSuite "WakuNode": node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60000)) pubSubTopic = "chat" - contentTopic = "foobar" + contentTopic = ContentTopic(1) filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])]) message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) @@ -71,7 +71,7 @@ procSuite "WakuNode": node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) pubSubTopic = "chat" - contentTopic = "foobar" + contentTopic = ContentTopic(1) filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])]) message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) @@ -133,7 +133,7 @@ procSuite "WakuNode": nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) - contentTopic = "foobar" + contentTopic = ContentTopic(1) message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) var completionFut = newFuture[bool]() @@ -169,7 +169,7 @@ procSuite "WakuNode": nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) - contentTopic = "foobar" + contentTopic = ContentTopic(1) message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) var completionFut = newFuture[bool]() diff --git a/waku/node/v2/rpc/wakurpc.nim b/waku/node/v2/rpc/wakurpc.nim index 1ee8b205f..3d155b09e 100644 --- a/waku/node/v2/rpc/wakurpc.nim +++ b/waku/node/v2/rpc/wakurpc.nim @@ -57,17 +57,21 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) = #if not result: # raise newException(ValueError, "Message could not be posted") - rpcsrv.rpc("waku_query") do(topics: seq[string]) -> bool: + rpcsrv.rpc("waku_query") do(topics: seq[int]) -> bool: debug "waku_query" # XXX: Hacky in-line handler proc handler(response: HistoryResponse) {.gcsafe.} = info "Hit response handler", messages=response.messages - await node.query(HistoryQuery(topics: topics), handler) + var contentTopics = newSeq[ContentTopic]() + for topic in topics: + contentTopics.add(ContentTopic(topic)) + + await node.query(HistoryQuery(topics: contentTopics), handler) return true - rpcsrv.rpc("waku_subscribe_filter") do(topic: string, contentFilters: seq[seq[string]]) -> bool: + rpcsrv.rpc("waku_subscribe_filter") do(topic: string, contentFilters: seq[seq[int]]) -> bool: debug "waku_subscribe_filter" # XXX: Hacky in-line handler @@ -76,7 +80,10 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) = var filters = newSeq[ContentFilter]() for topics in contentFilters: - filters.add(ContentFilter(topics: topics)) + var contentTopics = newSeq[ContentTopic]() + for topic in topics: + contentTopics.add(ContentTopic(topic)) + filters.add(ContentFilter(topics: contentTopics)) await node.subscribe(FilterRequest(topic: topic, contentFilters: filters), handler) return true diff --git a/waku/node/v2/waku_types.nim b/waku/node/v2/waku_types.nim index 47e0a7722..b83265425 100644 --- a/waku/node/v2/waku_types.nim +++ b/waku/node/v2/waku_types.nim @@ -4,25 +4,26 @@ import std/[tables, times], - chronos, bearssl, stew/byteutils, + chronos, bearssl, stew/[byteutils, endians2], libp2p/[switch, peerinfo, multiaddress, crypto/crypto], libp2p/protobuf/minprotobuf, libp2p/protocols/protocol, libp2p/switch, libp2p/stream/connection, libp2p/protocols/pubsub/[pubsub, gossipsub], - nimcrypto/sha2, - stew/byteutils + nimcrypto/sha2 # Common data types ----------------------------------------------------------- type + ContentTopic* = uint32 + Topic* = string Message* = seq[byte] WakuMessage* = object payload*: seq[byte] - contentTopic*: string + contentTopic*: ContentTopic MessageNotificationHandler* = proc(topic: string, msg: WakuMessage): Future[void] {.gcsafe, closure.} @@ -51,7 +52,7 @@ type direction*: bool HistoryQuery* = object - topics*: seq[string] + topics*: seq[ContentTopic] pagingInfo*: PagingInfo HistoryResponse* = object @@ -102,7 +103,7 @@ type pushHandler*: MessagePushHandler ContentFilter* = object - topics*: seq[string] + topics*: seq[ContentTopic] ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure.} @@ -179,7 +180,7 @@ proc computeIndex*(msg: WakuMessage): Index = ## Takes a WakuMessage and returns its index var ctx: sha256 ctx.init() - if msg.contentTopic.len != 0: # checks for non-empty contentTopic + if msg.contentTopic != 0: # checks for non-empty contentTopic ctx.update(msg.contentTopic.toBytes()) # converts the topic to bytes ctx.update(msg.payload) let digest = ctx.finish() # computes the hash diff --git a/waku/protocol/v2/waku_filter.nim b/waku/protocol/v2/waku_filter.nim index 0bc79171b..ae3dd3ea9 100644 --- a/waku/protocol/v2/waku_filter.nim +++ b/waku/protocol/v2/waku_filter.nim @@ -40,7 +40,7 @@ proc encode*(rpc: FilterRequest): ProtoBuffer = proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) - var topics: seq[string] + var topics: seq[ContentTopic] discard ? pb.getRepeatedField(1, topics) ok(ContentFilter(topics: topics)) diff --git a/waku/protocol/v2/waku_store.nim b/waku/protocol/v2/waku_store.nim index 116f11f5e..f5fa4b9e5 100644 --- a/waku/protocol/v2/waku_store.nim +++ b/waku/protocol/v2/waku_store.nim @@ -20,7 +20,7 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = var msg = HistoryQuery() let pb = initProtoBuffer(buffer) - var topics: seq[string] + var topics: seq[ContentTopic] discard ? pb.getRepeatedField(1, topics)