mirror of https://github.com/waku-org/nwaku.git
Change contentTopic to string (#463)
* Change contentTopic to string * Missed a spot * Try to fix Windows CI Co-authored-by: Oskar Thorén <ot@oskarthoren.com>
This commit is contained in:
parent
139f148037
commit
5747ff5be0
|
@ -35,7 +35,7 @@ proc runBackground() {.async.} =
|
|||
|
||||
# Publish to a topic
|
||||
let payload = cast[seq[byte]]("hello world")
|
||||
let message = WakuMessage(payload: payload, contentTopic: ContentTopic(1))
|
||||
let message = WakuMessage(payload: payload, contentTopic: ContentTopic("/waku/2/default-content/proto"))
|
||||
await node.publish(topic, message)
|
||||
|
||||
# TODO Await with try/except here
|
||||
|
|
|
@ -36,8 +36,7 @@ const
|
|||
PayloadV1* {.booldefine.} = false
|
||||
DefaultTopic = "/waku/2/default-waku/proto"
|
||||
|
||||
Dingpu = "dingpu".toBytes
|
||||
DefaultContentTopic = ContentTopic(uint32.fromBytes(Dingpu))
|
||||
DefaultContentTopic = ContentTopic("dingpu")
|
||||
|
||||
# XXX Connected is a bit annoying, because incoming connections don't trigger state change
|
||||
# Could poll connection pool or something here, I suppose
|
||||
|
|
|
@ -32,6 +32,7 @@ createRpcSigs(RpcHttpClient, sigPath)
|
|||
|
||||
procSuite "Waku v2 JSON-RPC API":
|
||||
const defaultTopic = "/waku/2/default-waku/proto"
|
||||
const defaultContentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
const testCodec = "/waku/2/default-waku/codec"
|
||||
|
||||
let
|
||||
|
@ -99,7 +100,7 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
response == true
|
||||
|
||||
# Publish a message on the default topic
|
||||
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(ContentTopic(1))))
|
||||
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic)))
|
||||
|
||||
check:
|
||||
# @TODO poll topic to verify message has been published
|
||||
|
@ -126,7 +127,7 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
|
||||
pubSubTopic = "polling"
|
||||
contentTopic = ContentTopic(1)
|
||||
contentTopic = defaultContentTopic
|
||||
payload = @[byte 9]
|
||||
message = WakuMessage(payload: payload, contentTopic: contentTopic)
|
||||
|
||||
|
@ -241,16 +242,16 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
|
||||
# Now prime it with some history before tests
|
||||
var
|
||||
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)),
|
||||
WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 2], contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 3], contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 4], contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 5], contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 6], contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 7], contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 8], contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic(2))]
|
||||
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
|
||||
WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))]
|
||||
|
||||
for wakuMsg in msgList:
|
||||
waitFor subscriptions.notify(defaultTopic, wakuMsg)
|
||||
|
@ -258,7 +259,7 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
let client = newRpcHttpClient()
|
||||
await client.connect("127.0.0.1", rpcPort)
|
||||
|
||||
let response = await client.get_waku_v2_store_v1_messages(@[ContentTopic(1)], some(StorePagingOptions()))
|
||||
let response = await client.get_waku_v2_store_v1_messages(@[defaultContentTopic], some(StorePagingOptions()))
|
||||
check:
|
||||
response.messages.len() == 8
|
||||
response.pagingOptions.isNone
|
||||
|
@ -290,8 +291,8 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
# Light node has not yet subscribed to any filters
|
||||
node.filters.len() == 0
|
||||
|
||||
let contentFilters = @[ContentFilter(topics: @[ContentTopic(1), ContentTopic(2)]),
|
||||
ContentFilter(topics: @[ContentTopic(3), ContentTopic(4)])]
|
||||
let contentFilters = @[ContentFilter(topics: @[defaultContentTopic, ContentTopic("2")]),
|
||||
ContentFilter(topics: @[ContentTopic("3"), ContentTopic("4")])]
|
||||
var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic))
|
||||
|
||||
check:
|
||||
|
@ -311,8 +312,6 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
waitfor node.stop()
|
||||
|
||||
asyncTest "Filter API: get latest messages":
|
||||
const cTopic = ContentTopic(1)
|
||||
|
||||
waitFor node.start()
|
||||
|
||||
# RPC server setup
|
||||
|
@ -331,22 +330,22 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
|
||||
# First ensure subscription exists
|
||||
|
||||
let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(topics: @[cTopic])], topic = some(defaultTopic))
|
||||
let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(topics: @[defaultContentTopic])], topic = some(defaultTopic))
|
||||
check:
|
||||
sub
|
||||
|
||||
# Now prime the node with some messages before tests
|
||||
var
|
||||
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)),
|
||||
WakuMessage(payload: @[byte 1], contentTopic: cTopic),
|
||||
WakuMessage(payload: @[byte 2], contentTopic: cTopic),
|
||||
WakuMessage(payload: @[byte 3], contentTopic: cTopic),
|
||||
WakuMessage(payload: @[byte 4], contentTopic: cTopic),
|
||||
WakuMessage(payload: @[byte 5], contentTopic: cTopic),
|
||||
WakuMessage(payload: @[byte 6], contentTopic: cTopic),
|
||||
WakuMessage(payload: @[byte 7], contentTopic: cTopic),
|
||||
WakuMessage(payload: @[byte 8], contentTopic: cTopic),
|
||||
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic(2))]
|
||||
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
|
||||
WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))]
|
||||
|
||||
let
|
||||
filters = node.filters
|
||||
|
@ -355,13 +354,13 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
for wakuMsg in msgList:
|
||||
filters.notify(wakuMsg, requestId)
|
||||
|
||||
var response = await client.get_waku_v2_filter_v1_messages(cTopic)
|
||||
var response = await client.get_waku_v2_filter_v1_messages(defaultContentTopic)
|
||||
check:
|
||||
response.len() == 8
|
||||
response.allIt(it.contentTopic == cTopic)
|
||||
response.allIt(it.contentTopic == defaultContentTopic)
|
||||
|
||||
# No new messages
|
||||
response = await client.get_waku_v2_filter_v1_messages(cTopic)
|
||||
response = await client.get_waku_v2_filter_v1_messages(defaultContentTopic)
|
||||
|
||||
check:
|
||||
response.len() == 0
|
||||
|
@ -372,13 +371,13 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
|
||||
for x in 1..(maxSize + 1):
|
||||
# Try to cache 1 more than maximum allowed
|
||||
filters.notify(WakuMessage(payload: @[byte x], contentTopic: cTopic), requestId)
|
||||
filters.notify(WakuMessage(payload: @[byte x], contentTopic: defaultContentTopic), requestId)
|
||||
|
||||
response = await client.get_waku_v2_filter_v1_messages(cTopic)
|
||||
response = await client.get_waku_v2_filter_v1_messages(defaultContentTopic)
|
||||
check:
|
||||
# Max messages has not been exceeded
|
||||
response.len == maxSize
|
||||
response.allIt(it.contentTopic == cTopic)
|
||||
response.allIt(it.contentTopic == defaultContentTopic)
|
||||
# Check that oldest item has been removed
|
||||
response[0].payload == @[byte 2]
|
||||
response[maxSize - 1].payload == @[byte (maxSize + 1)]
|
||||
|
@ -496,7 +495,6 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
await allFutures([node1.stop(), node2.stop(), node3.stop()])
|
||||
|
||||
asyncTest "Admin API: get unmanaged peer information":
|
||||
const cTopic = ContentTopic(1)
|
||||
let
|
||||
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"),
|
||||
|
@ -561,7 +559,7 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
|
||||
pubSubTopic = "polling"
|
||||
contentTopic = ContentTopic(1)
|
||||
contentTopic = defaultContentTopic
|
||||
payload = @[byte 9]
|
||||
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic))
|
||||
topicCache = newTable[string, seq[WakuMessage]]()
|
||||
|
@ -651,7 +649,7 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
|
||||
pubSubTopic = "polling"
|
||||
contentTopic = ContentTopic(1)
|
||||
contentTopic = defaultContentTopic
|
||||
payload = @[byte 9]
|
||||
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic))
|
||||
topicCache = newTable[string, seq[WakuMessage]]()
|
||||
|
|
|
@ -12,7 +12,7 @@ suite "Message Store":
|
|||
let
|
||||
database = SqliteDatabase.init("", inMemory = true)[]
|
||||
store = WakuMessageStore.init(database)[]
|
||||
topic = ContentTopic(1)
|
||||
topic = ContentTopic("/waku/2/default-content/proto")
|
||||
|
||||
var msgs = @[
|
||||
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic),
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
import
|
||||
std/strutils,
|
||||
testutils/unittests,
|
||||
chronicles, chronos, stew/shims/net as stewNet, stew/byteutils,
|
||||
chronicles, chronos, stew/shims/net as stewNet, stew/[byteutils, objects],
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/secp,
|
||||
libp2p/peerid,
|
||||
|
@ -44,8 +44,8 @@ procSuite "WakuBridge":
|
|||
v2NodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
v2Node = WakuNode.init(v2NodeKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
|
||||
topic = [byte 0x00, 0, 0, byte 0x01]
|
||||
contentTopic = ContentTopic(1)
|
||||
contentTopic = ContentTopic("0001")
|
||||
topic = toArray(4, contentTopic.toBytes()[0..3])
|
||||
payloadV1 = "hello from V1".toBytes()
|
||||
payloadV2 = "hello from V2".toBytes()
|
||||
message = WakuMessage(payload: payloadV2, contentTopic: contentTopic)
|
||||
|
|
|
@ -21,7 +21,7 @@ procSuite "Waku Filter":
|
|||
let
|
||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
peer = PeerInfo.init(key)
|
||||
contentTopic = ContentTopic(1)
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
|
@ -70,7 +70,7 @@ procSuite "Waku Filter":
|
|||
let
|
||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
peer = PeerInfo.init(key)
|
||||
contentTopic = ContentTopic(1)
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
|
@ -134,7 +134,7 @@ procSuite "Waku Filter":
|
|||
const defaultTopic = "/waku/2/default-waku/proto"
|
||||
|
||||
let
|
||||
contentTopic = ContentTopic(1)
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
discard await dialSwitch.start()
|
||||
|
|
|
@ -26,9 +26,9 @@ procSuite "pagination":
|
|||
index.receivedTime != 0 # the timestamp should be a non-zero value
|
||||
|
||||
let
|
||||
wm1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(1))
|
||||
wm1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("/waku/2/default-content/proto"))
|
||||
index1 = wm1.computeIndex()
|
||||
wm2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(1))
|
||||
wm2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("/waku/2/default-content/proto"))
|
||||
index2 = wm2.computeIndex()
|
||||
|
||||
check:
|
||||
|
|
|
@ -15,13 +15,15 @@ import
|
|||
../test_helpers, ./utils
|
||||
|
||||
procSuite "Waku Store":
|
||||
const defaultContentTopic = ContentTopic("1")
|
||||
|
||||
asyncTest "handle query":
|
||||
let
|
||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
peer = PeerInfo.init(key)
|
||||
topic = ContentTopic(1)
|
||||
topic = defaultContentTopic
|
||||
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic)
|
||||
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(2))
|
||||
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("2"))
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
discard await dialSwitch.start()
|
||||
|
@ -61,11 +63,11 @@ procSuite "Waku Store":
|
|||
let
|
||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
peer = PeerInfo.init(key)
|
||||
topic = ContentTopic(1)
|
||||
topic = defaultContentTopic
|
||||
database = SqliteDatabase.init("", inMemory = true)[]
|
||||
store = WakuMessageStore.init(database)[]
|
||||
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic)
|
||||
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(2))
|
||||
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("2"))
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
discard await dialSwitch.start()
|
||||
|
@ -129,16 +131,16 @@ procSuite "Waku Store":
|
|||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
peer = PeerInfo.init(key)
|
||||
var
|
||||
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)),
|
||||
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 2],contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 6],contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 7],contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 8],contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic(2))]
|
||||
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
|
||||
WakuMessage(payload: @[byte 1],contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 2],contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 3],contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 4],contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 5],contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 6],contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 7],contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 8],contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("2"))]
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
discard await dialSwitch.start()
|
||||
|
@ -149,7 +151,7 @@ procSuite "Waku Store":
|
|||
let
|
||||
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
|
||||
subscription = proto.subscription()
|
||||
rpc = HistoryQuery(topics: @[ContentTopic(1)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) )
|
||||
rpc = HistoryQuery(topics: @[defaultContentTopic], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) )
|
||||
|
||||
proto.setPeer(listenSwitch.peerInfo)
|
||||
|
||||
|
@ -181,16 +183,16 @@ procSuite "Waku Store":
|
|||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
peer = PeerInfo.init(key)
|
||||
var
|
||||
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)),
|
||||
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 2],contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 6],contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 7],contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 8],contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic(2))]
|
||||
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
|
||||
WakuMessage(payload: @[byte 1],contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 2],contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 3],contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 4],contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 5],contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 6],contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 7],contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 8],contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("2"))]
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
discard await dialSwitch.start()
|
||||
|
@ -220,7 +222,7 @@ procSuite "Waku Store":
|
|||
response.pagingInfo.cursor != Index()
|
||||
completionFut.complete(true)
|
||||
|
||||
let rpc = HistoryQuery(topics: @[ContentTopic(1)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) )
|
||||
let rpc = HistoryQuery(topics: @[defaultContentTopic], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) )
|
||||
await proto.query(rpc, handler)
|
||||
|
||||
check:
|
||||
|
@ -231,16 +233,16 @@ procSuite "Waku Store":
|
|||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
peer = PeerInfo.init(key)
|
||||
var
|
||||
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)),
|
||||
WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 2], contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 3], contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 4], contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 5], contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 6], contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 7], contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 8], contentTopic: ContentTopic(1)),
|
||||
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic(2))]
|
||||
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
|
||||
WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic),
|
||||
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))]
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
discard await dialSwitch.start()
|
||||
|
@ -268,7 +270,7 @@ procSuite "Waku Store":
|
|||
response.pagingInfo == PagingInfo()
|
||||
completionFut.complete(true)
|
||||
|
||||
let rpc = HistoryQuery(topics: @[ContentTopic(1)] )
|
||||
let rpc = HistoryQuery(topics: @[defaultContentTopic] )
|
||||
|
||||
await proto.query(rpc, handler)
|
||||
|
||||
|
@ -277,7 +279,7 @@ procSuite "Waku Store":
|
|||
|
||||
test "Index Protobuf encoder/decoder test":
|
||||
let
|
||||
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)))
|
||||
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
|
||||
pb = index.encode()
|
||||
decodedIndex = Index.init(pb.buffer)
|
||||
|
||||
|
@ -310,7 +312,7 @@ procSuite "Waku Store":
|
|||
|
||||
test "PagingInfo Protobuf encod/init test":
|
||||
let
|
||||
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)))
|
||||
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
|
||||
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
||||
pb = pagingInfo.encode()
|
||||
decodedPagingInfo = PagingInfo.init(pb.buffer)
|
||||
|
@ -332,9 +334,9 @@ procSuite "Waku Store":
|
|||
|
||||
test "HistoryQuery Protobuf encode/init test":
|
||||
let
|
||||
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)))
|
||||
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
|
||||
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
||||
query=HistoryQuery(topics: @[ContentTopic(1)], pagingInfo: pagingInfo, startTime: float64(10), endTime: float64(11))
|
||||
query=HistoryQuery(topics: @[defaultContentTopic], pagingInfo: pagingInfo, startTime: float64(10), endTime: float64(11))
|
||||
pb = query.encode()
|
||||
decodedQuery = HistoryQuery.init(pb.buffer)
|
||||
|
||||
|
@ -355,7 +357,7 @@ procSuite "Waku Store":
|
|||
|
||||
test "HistoryResponse Protobuf encod/init test":
|
||||
let
|
||||
wm = WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1))
|
||||
wm = WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic)
|
||||
index = computeIndex(wm)
|
||||
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
||||
res = HistoryResponse(messages: @[wm], pagingInfo:pagingInfo)
|
||||
|
|
|
@ -54,7 +54,7 @@ procSuite "Waku SWAP Accounting":
|
|||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60001))
|
||||
contentTopic = ContentTopic(1)
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
||||
var completionFut = newFuture[bool]()
|
||||
|
@ -100,7 +100,7 @@ procSuite "Waku SWAP Accounting":
|
|||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60001))
|
||||
contentTopic = ContentTopic(1)
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
||||
var futures = [newFuture[bool](), newFuture[bool]()]
|
||||
|
|
|
@ -27,7 +27,7 @@ procSuite "WakuNode":
|
|||
node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
pubSubTopic = "chat"
|
||||
contentTopic = ContentTopic(1)
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true)
|
||||
message = WakuMessage(payload: "hello world".toBytes(),
|
||||
contentTopic: contentTopic)
|
||||
|
@ -79,7 +79,7 @@ procSuite "WakuNode":
|
|||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
pubSubTopic = "chat"
|
||||
contentTopic = ContentTopic(1)
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true)
|
||||
message = WakuMessage(payload: "hello world".toBytes(),
|
||||
contentTopic: contentTopic)
|
||||
|
@ -141,7 +141,7 @@ procSuite "WakuNode":
|
|||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
contentTopic = ContentTopic(1)
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
||||
var completionFut = newFuture[bool]()
|
||||
|
@ -177,7 +177,7 @@ procSuite "WakuNode":
|
|||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
contentTopic = ContentTopic(1)
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
||||
var completionFut = newFuture[bool]()
|
||||
|
@ -219,7 +219,7 @@ procSuite "WakuNode":
|
|||
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60003))
|
||||
pubSubTopic = "test"
|
||||
contentTopic = ContentTopic(1)
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
payload = "hello world".toBytes()
|
||||
message = WakuMessage(payload: payload, contentTopic: contentTopic)
|
||||
|
||||
|
@ -317,12 +317,12 @@ procSuite "WakuNode":
|
|||
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60003))
|
||||
|
||||
pubSubTopic = "test"
|
||||
contentTopic1 = ContentTopic(1)
|
||||
contentTopic1 = ContentTopic("/waku/2/default-content/proto")
|
||||
payload = "hello world".toBytes()
|
||||
message1 = WakuMessage(payload: payload, contentTopic: contentTopic1)
|
||||
|
||||
payload2 = "you should not see this message!".toBytes()
|
||||
contentTopic2 = ContentTopic(2)
|
||||
contentTopic2 = ContentTopic("2")
|
||||
message2 = WakuMessage(payload: payload2, contentTopic: contentTopic2)
|
||||
|
||||
# start all the nodes
|
||||
|
@ -410,7 +410,7 @@ procSuite "WakuNode":
|
|||
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60003))
|
||||
|
||||
pubSubTopic = "defaultTopic"
|
||||
contentTopic1 = ContentTopic(1)
|
||||
contentTopic1 = ContentTopic("/waku/2/default-content/proto")
|
||||
payload = "hello world".toBytes()
|
||||
message1 = WakuMessage(payload: payload, contentTopic: contentTopic1)
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import
|
||||
std/tables,
|
||||
chronos, confutils, chronicles, chronicles/topics_registry, metrics,
|
||||
stew/endians2,
|
||||
stew/[byteutils, objects],
|
||||
stew/shims/net as stewNet, json_rpc/rpcserver,
|
||||
# Waku v1 imports
|
||||
eth/[keys, p2p], eth/common/utils,
|
||||
|
@ -44,17 +44,22 @@ type
|
|||
func toWakuMessage(env: Envelope): WakuMessage =
|
||||
# Translate a Waku v1 envelope to a Waku v2 message
|
||||
WakuMessage(payload: env.data,
|
||||
contentTopic: ContentTopic(uint32.fromBytes(env.topic, Endianness.bigEndian)),
|
||||
contentTopic: ContentTopic(string.fromBytes(env.topic)),
|
||||
version: 1)
|
||||
|
||||
proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} =
|
||||
waku_bridge_transfers.inc(labelValues = ["v1_to_v2"])
|
||||
|
||||
await bridge.nodev2.publish(defaultBridgeTopic, env.toWakuMessage())
|
||||
|
||||
proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe.} =
|
||||
waku_bridge_transfers.inc(labelValues = ["v2_to_v1"])
|
||||
|
||||
# @TODO: use namespacing to map v2 contentTopics to v1 topics
|
||||
let v1TopicSeq = msg.contentTopic.toBytes()[0..3]
|
||||
|
||||
discard bridge.nodev1.postMessage(ttl = defaultTTL,
|
||||
topic = msg.contentTopic.toBytes(Endianness.bigEndian),
|
||||
topic = toArray(4, v1TopicSeq),
|
||||
payload = msg.payload)
|
||||
|
||||
##############
|
||||
|
|
|
@ -43,7 +43,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
|||
raise newException(ValueError, "Failed to connect to peers: " & $peers)
|
||||
|
||||
rpcsrv.rpc("get_waku_v2_admin_v1_peers") do() -> seq[WakuPeer]:
|
||||
## Returns history for a list of content topics with optional paging
|
||||
## Returns a list of peers registered for this node
|
||||
debug "get_waku_v2_admin_v1_peers"
|
||||
|
||||
# Create a single list of peers from mounted protocols.
|
||||
|
|
|
@ -3,6 +3,7 @@ import
|
|||
eth/keys,
|
||||
../../../v1/node/rpc/hexstrings,
|
||||
../../protocol/waku_store/waku_store_types,
|
||||
../../protocol/waku_message,
|
||||
../waku_payload,
|
||||
./jsonrpc_types
|
||||
|
||||
|
@ -37,15 +38,14 @@ proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse =
|
|||
pagingOptions: if historyResponse.pagingInfo != PagingInfo(): some(historyResponse.pagingInfo.toPagingOptions()) else: none(StorePagingOptions))
|
||||
|
||||
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage =
|
||||
# @TODO global definition for default content topic
|
||||
const defaultCT = 0
|
||||
const defaultCT = ContentTopic("/waku/2/default-content/proto")
|
||||
WakuMessage(payload: relayMessage.payload,
|
||||
contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT,
|
||||
version: version)
|
||||
|
||||
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref BrHmacDrbgContext, symkey: Option[SymKey], pubKey: Option[keys.PublicKey]): WakuMessage =
|
||||
# @TODO global definition for default content topic
|
||||
const defaultCT = 0
|
||||
const defaultCT = ContentTopic("/waku/2/default-content/proto")
|
||||
|
||||
let payload = Payload(payload: relayMessage.payload,
|
||||
dst: pubKey,
|
||||
|
|
|
@ -18,7 +18,7 @@ createRpcSigs(RpcHttpClient, sigWakuPath)
|
|||
|
||||
const defaultTopic = "/waku/2/default-waku/proto"
|
||||
|
||||
const defaultContentTopic = ContentTopic(1)
|
||||
const defaultContentTopic = ContentTopic("waku/2/default-content/proto")
|
||||
|
||||
const topicAmount = 10 #100
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ var node = newRpcHttpClient()
|
|||
waitfor node.connect("localhost", rpcPort)
|
||||
|
||||
let pubSubTopic = "/waku/2/default-waku/proto"
|
||||
let contentTopic = ContentTopic(1)
|
||||
let contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
let relayMessage = WakuRelayMessage(payload: input.toBytes(), contentTopic: some(contentTopic))
|
||||
var res = waitfor node.post_waku_v2_relay_v1_message(pubSubTopic, relayMessage)
|
||||
echo "Waku publish response: ", res
|
||||
|
|
|
@ -32,5 +32,5 @@ echo "Input is:", input
|
|||
var node = newRpcHttpClient()
|
||||
waitfor node.connect("localhost", rpcPort)
|
||||
|
||||
var res = waitfor node.get_waku_v2_store_v1_messages(@[ContentTopic(parseUInt(input))], none(StorePagingOptions))
|
||||
var res = waitfor node.get_waku_v2_store_v1_messages(@[ContentTopic(input)], none(StorePagingOptions))
|
||||
echo "Waku query response: ", res
|
||||
|
|
|
@ -33,6 +33,6 @@ var node = newRpcHttpClient()
|
|||
waitfor node.connect("localhost", rpcPort)
|
||||
|
||||
let pubSubTopic = "/waku/2/default-waku/proto"
|
||||
let contentTopic = ContentTopic(1)
|
||||
let contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
var res = waitfor node.post_waku_v2_filter_v1_subscription(@[ContentFilter(topics: @[contentTopic])], some(pubSubTopic))
|
||||
echo "Waku query response: ", res
|
||||
|
|
|
@ -5,7 +5,7 @@ import
|
|||
libp2p/protocols/protocol,
|
||||
libp2p/protobuf/minprotobuf,
|
||||
libp2p/stream/connection,
|
||||
stew/results,
|
||||
stew/[byteutils, results],
|
||||
./message_store,
|
||||
../sqlite,
|
||||
../../../protocol/waku_message,
|
||||
|
@ -31,7 +31,7 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T]
|
|||
CREATE TABLE IF NOT EXISTS messages (
|
||||
id BLOB PRIMARY KEY,
|
||||
timestamp INTEGER NOT NULL,
|
||||
contentTopic INTEGER NOT NULL,
|
||||
contentTopic BLOB NOT NULL,
|
||||
payload BLOB
|
||||
) WITHOUT ROWID;
|
||||
""", NoParams, void)
|
||||
|
@ -57,14 +57,14 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage): MessageS
|
|||
##
|
||||
let prepare = db.database.prepareStmt(
|
||||
"INSERT INTO messages (id, timestamp, contentTopic, payload) VALUES (?, ?, ?, ?);",
|
||||
(seq[byte], int64, uint32, seq[byte]),
|
||||
(seq[byte], int64, seq[byte], seq[byte]),
|
||||
void
|
||||
)
|
||||
|
||||
if prepare.isErr:
|
||||
return err("failed to prepare")
|
||||
|
||||
let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic, message.payload))
|
||||
let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic.toBytes(), message.payload))
|
||||
if res.isErr:
|
||||
return err("failed")
|
||||
|
||||
|
@ -87,11 +87,14 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto
|
|||
gotMessages = true
|
||||
let
|
||||
timestamp = sqlite3_column_int64(s, 0)
|
||||
topic = sqlite3_column_int(s, 1)
|
||||
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1))
|
||||
topicL = sqlite3_column_bytes(s,1)
|
||||
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2))
|
||||
l = sqlite3_column_bytes(s, 2)
|
||||
|
||||
onData(uint64(timestamp), WakuMessage(contentTopic: ContentTopic(int(topic)), payload: @(toOpenArray(p, 0, l-1))))
|
||||
onData(uint64(timestamp),
|
||||
WakuMessage(contentTopic: ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicL-1)))),
|
||||
payload: @(toOpenArray(p, 0, l-1))))
|
||||
|
||||
let res = db.database.query("SELECT timestamp, contentTopic, payload FROM messages ORDER BY timestamp ASC", msg)
|
||||
if res.isErr:
|
||||
|
|
|
@ -10,7 +10,7 @@ import
|
|||
libp2p/protobuf/minprotobuf
|
||||
|
||||
type
|
||||
ContentTopic* = uint32
|
||||
ContentTopic* = string
|
||||
|
||||
WakuMessage* = object
|
||||
payload*: seq[byte]
|
||||
|
|
Loading…
Reference in New Issue