diff --git a/examples/v2/basic2.nim b/examples/v2/basic2.nim index 2ad659073..ac01c6e8f 100644 --- a/examples/v2/basic2.nim +++ b/examples/v2/basic2.nim @@ -35,7 +35,7 @@ proc runBackground() {.async.} = # Publish to a topic let payload = cast[seq[byte]]("hello world") - let message = WakuMessage(payload: payload, contentTopic: ContentTopic(1)) + let message = WakuMessage(payload: payload, contentTopic: ContentTopic("/waku/2/default-content/proto")) await node.publish(topic, message) # TODO Await with try/except here diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index 39c85b929..1306c75bd 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -36,8 +36,7 @@ const PayloadV1* {.booldefine.} = false DefaultTopic = "/waku/2/default-waku/proto" - Dingpu = "dingpu".toBytes - DefaultContentTopic = ContentTopic(uint32.fromBytes(Dingpu)) + DefaultContentTopic = ContentTopic("dingpu") # XXX Connected is a bit annoying, because incoming connections don't trigger state change # Could poll connection pool or something here, I suppose diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index a82ffe483..7d3542b16 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -32,6 +32,7 @@ createRpcSigs(RpcHttpClient, sigPath) procSuite "Waku v2 JSON-RPC API": const defaultTopic = "/waku/2/default-waku/proto" + const defaultContentTopic = ContentTopic("/waku/2/default-content/proto") const testCodec = "/waku/2/default-waku/codec" let @@ -99,7 +100,7 @@ procSuite "Waku v2 JSON-RPC API": response == true # Publish a message on the default topic - response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(ContentTopic(1)))) + response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic))) check: # @TODO poll topic to verify message has been published @@ -126,7 +127,7 @@ procSuite "Waku v2 JSON-RPC API": nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port)) pubSubTopic = "polling" - contentTopic = ContentTopic(1) + contentTopic = defaultContentTopic payload = @[byte 9] message = WakuMessage(payload: payload, contentTopic: contentTopic) @@ -241,16 +242,16 @@ procSuite "Waku v2 JSON-RPC API": # Now prime it with some history before tests var - msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)), - WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 2], contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 3], contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 4], contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 5], contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 6], contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 7], contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 8], contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 9], contentTopic: ContentTopic(2))] + msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")), + WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))] for wakuMsg in msgList: waitFor subscriptions.notify(defaultTopic, wakuMsg) @@ -258,7 +259,7 @@ procSuite "Waku v2 JSON-RPC API": let client = newRpcHttpClient() await client.connect("127.0.0.1", rpcPort) - let response = await client.get_waku_v2_store_v1_messages(@[ContentTopic(1)], some(StorePagingOptions())) + let response = await client.get_waku_v2_store_v1_messages(@[defaultContentTopic], some(StorePagingOptions())) check: response.messages.len() == 8 response.pagingOptions.isNone @@ -290,8 +291,8 @@ procSuite "Waku v2 JSON-RPC API": # Light node has not yet subscribed to any filters node.filters.len() == 0 - let contentFilters = @[ContentFilter(topics: @[ContentTopic(1), ContentTopic(2)]), - ContentFilter(topics: @[ContentTopic(3), ContentTopic(4)])] + let contentFilters = @[ContentFilter(topics: @[defaultContentTopic, ContentTopic("2")]), + ContentFilter(topics: @[ContentTopic("3"), ContentTopic("4")])] var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic)) check: @@ -311,8 +312,6 @@ procSuite "Waku v2 JSON-RPC API": waitfor node.stop() asyncTest "Filter API: get latest messages": - const cTopic = ContentTopic(1) - waitFor node.start() # RPC server setup @@ -331,22 +330,22 @@ procSuite "Waku v2 JSON-RPC API": # First ensure subscription exists - let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(topics: @[cTopic])], topic = some(defaultTopic)) + let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(topics: @[defaultContentTopic])], topic = some(defaultTopic)) check: sub # Now prime the node with some messages before tests var - msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)), - WakuMessage(payload: @[byte 1], contentTopic: cTopic), - WakuMessage(payload: @[byte 2], contentTopic: cTopic), - WakuMessage(payload: @[byte 3], contentTopic: cTopic), - WakuMessage(payload: @[byte 4], contentTopic: cTopic), - WakuMessage(payload: @[byte 5], contentTopic: cTopic), - WakuMessage(payload: @[byte 6], contentTopic: cTopic), - WakuMessage(payload: @[byte 7], contentTopic: cTopic), - WakuMessage(payload: @[byte 8], contentTopic: cTopic), - WakuMessage(payload: @[byte 9], contentTopic: ContentTopic(2))] + msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")), + WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))] let filters = node.filters @@ -355,13 +354,13 @@ procSuite "Waku v2 JSON-RPC API": for wakuMsg in msgList: filters.notify(wakuMsg, requestId) - var response = await client.get_waku_v2_filter_v1_messages(cTopic) + var response = await client.get_waku_v2_filter_v1_messages(defaultContentTopic) check: response.len() == 8 - response.allIt(it.contentTopic == cTopic) + response.allIt(it.contentTopic == defaultContentTopic) # No new messages - response = await client.get_waku_v2_filter_v1_messages(cTopic) + response = await client.get_waku_v2_filter_v1_messages(defaultContentTopic) check: response.len() == 0 @@ -372,13 +371,13 @@ procSuite "Waku v2 JSON-RPC API": for x in 1..(maxSize + 1): # Try to cache 1 more than maximum allowed - filters.notify(WakuMessage(payload: @[byte x], contentTopic: cTopic), requestId) + filters.notify(WakuMessage(payload: @[byte x], contentTopic: defaultContentTopic), requestId) - response = await client.get_waku_v2_filter_v1_messages(cTopic) + response = await client.get_waku_v2_filter_v1_messages(defaultContentTopic) check: # Max messages has not been exceeded response.len == maxSize - response.allIt(it.contentTopic == cTopic) + response.allIt(it.contentTopic == defaultContentTopic) # Check that oldest item has been removed response[0].payload == @[byte 2] response[maxSize - 1].payload == @[byte (maxSize + 1)] @@ -496,7 +495,6 @@ procSuite "Waku v2 JSON-RPC API": await allFutures([node1.stop(), node2.stop(), node3.stop()]) asyncTest "Admin API: get unmanaged peer information": - const cTopic = ContentTopic(1) let nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"), @@ -561,7 +559,7 @@ procSuite "Waku v2 JSON-RPC API": nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port)) pubSubTopic = "polling" - contentTopic = ContentTopic(1) + contentTopic = defaultContentTopic payload = @[byte 9] message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic)) topicCache = newTable[string, seq[WakuMessage]]() @@ -651,7 +649,7 @@ procSuite "Waku v2 JSON-RPC API": nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port)) pubSubTopic = "polling" - contentTopic = ContentTopic(1) + contentTopic = defaultContentTopic payload = @[byte 9] message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic)) topicCache = newTable[string, seq[WakuMessage]]() diff --git a/tests/v2/test_message_store.nim b/tests/v2/test_message_store.nim index 8a2f4bb33..85ca00136 100644 --- a/tests/v2/test_message_store.nim +++ b/tests/v2/test_message_store.nim @@ -12,7 +12,7 @@ suite "Message Store": let database = SqliteDatabase.init("", inMemory = true)[] store = WakuMessageStore.init(database)[] - topic = ContentTopic(1) + topic = ContentTopic("/waku/2/default-content/proto") var msgs = @[ WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic), diff --git a/tests/v2/test_waku_bridge.nim b/tests/v2/test_waku_bridge.nim index c0243154d..c318e07ad 100644 --- a/tests/v2/test_waku_bridge.nim +++ b/tests/v2/test_waku_bridge.nim @@ -3,7 +3,7 @@ import std/strutils, testutils/unittests, - chronicles, chronos, stew/shims/net as stewNet, stew/byteutils, + chronicles, chronos, stew/shims/net as stewNet, stew/[byteutils, objects], libp2p/crypto/crypto, libp2p/crypto/secp, libp2p/peerid, @@ -44,8 +44,8 @@ procSuite "WakuBridge": v2NodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] v2Node = WakuNode.init(v2NodeKey, ValidIpAddress.init("0.0.0.0"), Port(60002)) - topic = [byte 0x00, 0, 0, byte 0x01] - contentTopic = ContentTopic(1) + contentTopic = ContentTopic("0001") + topic = toArray(4, contentTopic.toBytes()[0..3]) payloadV1 = "hello from V1".toBytes() payloadV2 = "hello from V2".toBytes() message = WakuMessage(payload: payloadV2, contentTopic: contentTopic) diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index 2cc3fdc4c..29579cef4 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -21,7 +21,7 @@ procSuite "Waku Filter": let key = PrivateKey.random(ECDSA, rng[]).get() peer = PeerInfo.init(key) - contentTopic = ContentTopic(1) + contentTopic = ContentTopic("/waku/2/default-content/proto") post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic) var dialSwitch = newStandardSwitch() @@ -70,7 +70,7 @@ procSuite "Waku Filter": let key = PrivateKey.random(ECDSA, rng[]).get() peer = PeerInfo.init(key) - contentTopic = ContentTopic(1) + contentTopic = ContentTopic("/waku/2/default-content/proto") post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic) var dialSwitch = newStandardSwitch() @@ -134,7 +134,7 @@ procSuite "Waku Filter": const defaultTopic = "/waku/2/default-waku/proto" let - contentTopic = ContentTopic(1) + contentTopic = ContentTopic("/waku/2/default-content/proto") var dialSwitch = newStandardSwitch() discard await dialSwitch.start() diff --git a/tests/v2/test_waku_pagination.nim b/tests/v2/test_waku_pagination.nim index 1c89da000..ecc221292 100644 --- a/tests/v2/test_waku_pagination.nim +++ b/tests/v2/test_waku_pagination.nim @@ -26,9 +26,9 @@ procSuite "pagination": index.receivedTime != 0 # the timestamp should be a non-zero value let - wm1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(1)) + wm1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("/waku/2/default-content/proto")) index1 = wm1.computeIndex() - wm2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(1)) + wm2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("/waku/2/default-content/proto")) index2 = wm2.computeIndex() check: diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index eca102919..b8b8a9b2f 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -15,13 +15,15 @@ import ../test_helpers, ./utils procSuite "Waku Store": + const defaultContentTopic = ContentTopic("1") + asyncTest "handle query": let key = PrivateKey.random(ECDSA, rng[]).get() peer = PeerInfo.init(key) - topic = ContentTopic(1) + topic = defaultContentTopic msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic) - msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(2)) + msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("2")) var dialSwitch = newStandardSwitch() discard await dialSwitch.start() @@ -61,11 +63,11 @@ procSuite "Waku Store": let key = PrivateKey.random(ECDSA, rng[]).get() peer = PeerInfo.init(key) - topic = ContentTopic(1) + topic = defaultContentTopic database = SqliteDatabase.init("", inMemory = true)[] store = WakuMessageStore.init(database)[] msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic) - msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(2)) + msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("2")) var dialSwitch = newStandardSwitch() discard await dialSwitch.start() @@ -129,16 +131,16 @@ procSuite "Waku Store": key = PrivateKey.random(ECDSA, rng[]).get() peer = PeerInfo.init(key) var - msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)), - WakuMessage(payload: @[byte 1],contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 2],contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 3],contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 4],contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 5],contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 6],contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 7],contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 8],contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 9],contentTopic: ContentTopic(2))] + msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")), + WakuMessage(payload: @[byte 1],contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 2],contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 3],contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 4],contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 5],contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 6],contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 7],contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 8],contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("2"))] var dialSwitch = newStandardSwitch() discard await dialSwitch.start() @@ -149,7 +151,7 @@ procSuite "Waku Store": let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) subscription = proto.subscription() - rpc = HistoryQuery(topics: @[ContentTopic(1)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) ) + rpc = HistoryQuery(topics: @[defaultContentTopic], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) ) proto.setPeer(listenSwitch.peerInfo) @@ -181,16 +183,16 @@ procSuite "Waku Store": key = PrivateKey.random(ECDSA, rng[]).get() peer = PeerInfo.init(key) var - msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)), - WakuMessage(payload: @[byte 1],contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 2],contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 3],contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 4],contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 5],contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 6],contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 7],contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 8],contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 9],contentTopic: ContentTopic(2))] + msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")), + WakuMessage(payload: @[byte 1],contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 2],contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 3],contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 4],contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 5],contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 6],contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 7],contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 8],contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("2"))] var dialSwitch = newStandardSwitch() discard await dialSwitch.start() @@ -220,7 +222,7 @@ procSuite "Waku Store": response.pagingInfo.cursor != Index() completionFut.complete(true) - let rpc = HistoryQuery(topics: @[ContentTopic(1)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) ) + let rpc = HistoryQuery(topics: @[defaultContentTopic], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) ) await proto.query(rpc, handler) check: @@ -231,16 +233,16 @@ procSuite "Waku Store": key = PrivateKey.random(ECDSA, rng[]).get() peer = PeerInfo.init(key) var - msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)), - WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 2], contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 3], contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 4], contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 5], contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 6], contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 7], contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 8], contentTopic: ContentTopic(1)), - WakuMessage(payload: @[byte 9], contentTopic: ContentTopic(2))] + msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")), + WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))] var dialSwitch = newStandardSwitch() discard await dialSwitch.start() @@ -268,7 +270,7 @@ procSuite "Waku Store": response.pagingInfo == PagingInfo() completionFut.complete(true) - let rpc = HistoryQuery(topics: @[ContentTopic(1)] ) + let rpc = HistoryQuery(topics: @[defaultContentTopic] ) await proto.query(rpc, handler) @@ -277,7 +279,7 @@ procSuite "Waku Store": test "Index Protobuf encoder/decoder test": let - index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1))) + index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic)) pb = index.encode() decodedIndex = Index.init(pb.buffer) @@ -310,7 +312,7 @@ procSuite "Waku Store": test "PagingInfo Protobuf encod/init test": let - index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1))) + index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic)) pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD) pb = pagingInfo.encode() decodedPagingInfo = PagingInfo.init(pb.buffer) @@ -332,9 +334,9 @@ procSuite "Waku Store": test "HistoryQuery Protobuf encode/init test": let - index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1))) + index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic)) pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD) - query=HistoryQuery(topics: @[ContentTopic(1)], pagingInfo: pagingInfo, startTime: float64(10), endTime: float64(11)) + query=HistoryQuery(topics: @[defaultContentTopic], pagingInfo: pagingInfo, startTime: float64(10), endTime: float64(11)) pb = query.encode() decodedQuery = HistoryQuery.init(pb.buffer) @@ -355,7 +357,7 @@ procSuite "Waku Store": test "HistoryResponse Protobuf encod/init test": let - wm = WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)) + wm = WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic) index = computeIndex(wm) pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD) res = HistoryResponse(messages: @[wm], pagingInfo:pagingInfo) diff --git a/tests/v2/test_waku_swap.nim b/tests/v2/test_waku_swap.nim index f2fcfed75..5ef7832d2 100644 --- a/tests/v2/test_waku_swap.nim +++ b/tests/v2/test_waku_swap.nim @@ -54,7 +54,7 @@ procSuite "Waku SWAP Accounting": nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60001)) - contentTopic = ContentTopic(1) + contentTopic = ContentTopic("/waku/2/default-content/proto") message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) var completionFut = newFuture[bool]() @@ -100,7 +100,7 @@ procSuite "Waku SWAP Accounting": nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60001)) - contentTopic = ContentTopic(1) + contentTopic = ContentTopic("/waku/2/default-content/proto") message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) var futures = [newFuture[bool](), newFuture[bool]()] diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index cb33697ca..b89167090 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -27,7 +27,7 @@ procSuite "WakuNode": node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60000)) pubSubTopic = "chat" - contentTopic = ContentTopic(1) + contentTopic = ContentTopic("/waku/2/default-content/proto") filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true) message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) @@ -79,7 +79,7 @@ procSuite "WakuNode": node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) pubSubTopic = "chat" - contentTopic = ContentTopic(1) + contentTopic = ContentTopic("/waku/2/default-content/proto") filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true) message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) @@ -141,7 +141,7 @@ procSuite "WakuNode": nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) - contentTopic = ContentTopic(1) + contentTopic = ContentTopic("/waku/2/default-content/proto") message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) var completionFut = newFuture[bool]() @@ -177,7 +177,7 @@ procSuite "WakuNode": nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) - contentTopic = ContentTopic(1) + contentTopic = ContentTopic("/waku/2/default-content/proto") message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) var completionFut = newFuture[bool]() @@ -219,7 +219,7 @@ procSuite "WakuNode": node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60003)) pubSubTopic = "test" - contentTopic = ContentTopic(1) + contentTopic = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message = WakuMessage(payload: payload, contentTopic: contentTopic) @@ -317,12 +317,12 @@ procSuite "WakuNode": node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60003)) pubSubTopic = "test" - contentTopic1 = ContentTopic(1) + contentTopic1 = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message1 = WakuMessage(payload: payload, contentTopic: contentTopic1) payload2 = "you should not see this message!".toBytes() - contentTopic2 = ContentTopic(2) + contentTopic2 = ContentTopic("2") message2 = WakuMessage(payload: payload2, contentTopic: contentTopic2) # start all the nodes @@ -410,7 +410,7 @@ procSuite "WakuNode": node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60003)) pubSubTopic = "defaultTopic" - contentTopic1 = ContentTopic(1) + contentTopic1 = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message1 = WakuMessage(payload: payload, contentTopic: contentTopic1) diff --git a/waku/common/wakubridge.nim b/waku/common/wakubridge.nim index 8ecbf1d8a..d3e59362b 100644 --- a/waku/common/wakubridge.nim +++ b/waku/common/wakubridge.nim @@ -1,7 +1,7 @@ import std/tables, chronos, confutils, chronicles, chronicles/topics_registry, metrics, - stew/endians2, + stew/[byteutils, objects], stew/shims/net as stewNet, json_rpc/rpcserver, # Waku v1 imports eth/[keys, p2p], eth/common/utils, @@ -44,17 +44,22 @@ type func toWakuMessage(env: Envelope): WakuMessage = # Translate a Waku v1 envelope to a Waku v2 message WakuMessage(payload: env.data, - contentTopic: ContentTopic(uint32.fromBytes(env.topic, Endianness.bigEndian)), + contentTopic: ContentTopic(string.fromBytes(env.topic)), version: 1) proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} = waku_bridge_transfers.inc(labelValues = ["v1_to_v2"]) + await bridge.nodev2.publish(defaultBridgeTopic, env.toWakuMessage()) proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe.} = waku_bridge_transfers.inc(labelValues = ["v2_to_v1"]) + + # @TODO: use namespacing to map v2 contentTopics to v1 topics + let v1TopicSeq = msg.contentTopic.toBytes()[0..3] + discard bridge.nodev1.postMessage(ttl = defaultTTL, - topic = msg.contentTopic.toBytes(Endianness.bigEndian), + topic = toArray(4, v1TopicSeq), payload = msg.payload) ############## diff --git a/waku/v2/node/jsonrpc/admin_api.nim b/waku/v2/node/jsonrpc/admin_api.nim index 21686ec4a..5ae06e758 100644 --- a/waku/v2/node/jsonrpc/admin_api.nim +++ b/waku/v2/node/jsonrpc/admin_api.nim @@ -43,7 +43,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = raise newException(ValueError, "Failed to connect to peers: " & $peers) rpcsrv.rpc("get_waku_v2_admin_v1_peers") do() -> seq[WakuPeer]: - ## Returns history for a list of content topics with optional paging + ## Returns a list of peers registered for this node debug "get_waku_v2_admin_v1_peers" # Create a single list of peers from mounted protocols. diff --git a/waku/v2/node/jsonrpc/jsonrpc_utils.nim b/waku/v2/node/jsonrpc/jsonrpc_utils.nim index 92347feec..82facd700 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_utils.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_utils.nim @@ -3,6 +3,7 @@ import eth/keys, ../../../v1/node/rpc/hexstrings, ../../protocol/waku_store/waku_store_types, + ../../protocol/waku_message, ../waku_payload, ./jsonrpc_types @@ -37,15 +38,14 @@ proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse = pagingOptions: if historyResponse.pagingInfo != PagingInfo(): some(historyResponse.pagingInfo.toPagingOptions()) else: none(StorePagingOptions)) proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage = - # @TODO global definition for default content topic - const defaultCT = 0 + const defaultCT = ContentTopic("/waku/2/default-content/proto") WakuMessage(payload: relayMessage.payload, contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT, version: version) proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref BrHmacDrbgContext, symkey: Option[SymKey], pubKey: Option[keys.PublicKey]): WakuMessage = # @TODO global definition for default content topic - const defaultCT = 0 + const defaultCT = ContentTopic("/waku/2/default-content/proto") let payload = Payload(payload: relayMessage.payload, dst: pubKey, diff --git a/waku/v2/node/quicksim2.nim b/waku/v2/node/quicksim2.nim index c8a524fd1..83f37918b 100644 --- a/waku/v2/node/quicksim2.nim +++ b/waku/v2/node/quicksim2.nim @@ -18,7 +18,7 @@ createRpcSigs(RpcHttpClient, sigWakuPath) const defaultTopic = "/waku/2/default-waku/proto" -const defaultContentTopic = ContentTopic(1) +const defaultContentTopic = ContentTopic("waku/2/default-content/proto") const topicAmount = 10 #100 diff --git a/waku/v2/node/scripts/rpc_publish.nim b/waku/v2/node/scripts/rpc_publish.nim index 7edef3055..a23682a23 100644 --- a/waku/v2/node/scripts/rpc_publish.nim +++ b/waku/v2/node/scripts/rpc_publish.nim @@ -33,7 +33,7 @@ var node = newRpcHttpClient() waitfor node.connect("localhost", rpcPort) let pubSubTopic = "/waku/2/default-waku/proto" -let contentTopic = ContentTopic(1) +let contentTopic = ContentTopic("/waku/2/default-content/proto") let relayMessage = WakuRelayMessage(payload: input.toBytes(), contentTopic: some(contentTopic)) var res = waitfor node.post_waku_v2_relay_v1_message(pubSubTopic, relayMessage) echo "Waku publish response: ", res diff --git a/waku/v2/node/scripts/rpc_query.nim b/waku/v2/node/scripts/rpc_query.nim index 97fd9e318..c8aeb8e67 100644 --- a/waku/v2/node/scripts/rpc_query.nim +++ b/waku/v2/node/scripts/rpc_query.nim @@ -32,5 +32,5 @@ echo "Input is:", input var node = newRpcHttpClient() waitfor node.connect("localhost", rpcPort) -var res = waitfor node.get_waku_v2_store_v1_messages(@[ContentTopic(parseUInt(input))], none(StorePagingOptions)) +var res = waitfor node.get_waku_v2_store_v1_messages(@[ContentTopic(input)], none(StorePagingOptions)) echo "Waku query response: ", res diff --git a/waku/v2/node/scripts/rpc_subscribe_filter.nim b/waku/v2/node/scripts/rpc_subscribe_filter.nim index be8e86167..ad7f14190 100644 --- a/waku/v2/node/scripts/rpc_subscribe_filter.nim +++ b/waku/v2/node/scripts/rpc_subscribe_filter.nim @@ -33,6 +33,6 @@ var node = newRpcHttpClient() waitfor node.connect("localhost", rpcPort) let pubSubTopic = "/waku/2/default-waku/proto" -let contentTopic = ContentTopic(1) +let contentTopic = ContentTopic("/waku/2/default-content/proto") var res = waitfor node.post_waku_v2_filter_v1_subscription(@[ContentFilter(topics: @[contentTopic])], some(pubSubTopic)) echo "Waku query response: ", res diff --git a/waku/v2/node/storage/message/waku_message_store.nim b/waku/v2/node/storage/message/waku_message_store.nim index a4fa09299..aaa8925e9 100644 --- a/waku/v2/node/storage/message/waku_message_store.nim +++ b/waku/v2/node/storage/message/waku_message_store.nim @@ -5,7 +5,7 @@ import libp2p/protocols/protocol, libp2p/protobuf/minprotobuf, libp2p/stream/connection, - stew/results, + stew/[byteutils, results], ./message_store, ../sqlite, ../../../protocol/waku_message, @@ -31,7 +31,7 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T] CREATE TABLE IF NOT EXISTS messages ( id BLOB PRIMARY KEY, timestamp INTEGER NOT NULL, - contentTopic INTEGER NOT NULL, + contentTopic BLOB NOT NULL, payload BLOB ) WITHOUT ROWID; """, NoParams, void) @@ -57,14 +57,14 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage): MessageS ## let prepare = db.database.prepareStmt( "INSERT INTO messages (id, timestamp, contentTopic, payload) VALUES (?, ?, ?, ?);", - (seq[byte], int64, uint32, seq[byte]), + (seq[byte], int64, seq[byte], seq[byte]), void ) if prepare.isErr: return err("failed to prepare") - let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic, message.payload)) + let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic.toBytes(), message.payload)) if res.isErr: return err("failed") @@ -87,11 +87,14 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto gotMessages = true let timestamp = sqlite3_column_int64(s, 0) - topic = sqlite3_column_int(s, 1) + topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1)) + topicL = sqlite3_column_bytes(s,1) p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2)) l = sqlite3_column_bytes(s, 2) - onData(uint64(timestamp), WakuMessage(contentTopic: ContentTopic(int(topic)), payload: @(toOpenArray(p, 0, l-1)))) + onData(uint64(timestamp), + WakuMessage(contentTopic: ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicL-1)))), + payload: @(toOpenArray(p, 0, l-1)))) let res = db.database.query("SELECT timestamp, contentTopic, payload FROM messages ORDER BY timestamp ASC", msg) if res.isErr: diff --git a/waku/v2/protocol/waku_message.nim b/waku/v2/protocol/waku_message.nim index 9661faab3..73ea690f8 100644 --- a/waku/v2/protocol/waku_message.nim +++ b/waku/v2/protocol/waku_message.nim @@ -10,7 +10,7 @@ import libp2p/protobuf/minprotobuf type - ContentTopic* = uint32 + ContentTopic* = string WakuMessage* = object payload*: seq[byte]