mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-04 06:53:12 +00:00
Change contentTopic to string (#463)
* Change contentTopic to string * Missed a spot * Try to fix Windows CI Co-authored-by: Oskar Thorén <ot@oskarthoren.com>
This commit is contained in:
parent
70798cd9d9
commit
d09ba25dd6
@ -35,7 +35,7 @@ proc runBackground() {.async.} =
|
|||||||
|
|
||||||
# Publish to a topic
|
# Publish to a topic
|
||||||
let payload = cast[seq[byte]]("hello world")
|
let payload = cast[seq[byte]]("hello world")
|
||||||
let message = WakuMessage(payload: payload, contentTopic: ContentTopic(1))
|
let message = WakuMessage(payload: payload, contentTopic: ContentTopic("/waku/2/default-content/proto"))
|
||||||
await node.publish(topic, message)
|
await node.publish(topic, message)
|
||||||
|
|
||||||
# TODO Await with try/except here
|
# TODO Await with try/except here
|
||||||
|
|||||||
@ -36,8 +36,7 @@ const
|
|||||||
PayloadV1* {.booldefine.} = false
|
PayloadV1* {.booldefine.} = false
|
||||||
DefaultTopic = "/waku/2/default-waku/proto"
|
DefaultTopic = "/waku/2/default-waku/proto"
|
||||||
|
|
||||||
Dingpu = "dingpu".toBytes
|
DefaultContentTopic = ContentTopic("dingpu")
|
||||||
DefaultContentTopic = ContentTopic(uint32.fromBytes(Dingpu))
|
|
||||||
|
|
||||||
# XXX Connected is a bit annoying, because incoming connections don't trigger state change
|
# XXX Connected is a bit annoying, because incoming connections don't trigger state change
|
||||||
# Could poll connection pool or something here, I suppose
|
# Could poll connection pool or something here, I suppose
|
||||||
|
|||||||
@ -32,6 +32,7 @@ createRpcSigs(RpcHttpClient, sigPath)
|
|||||||
|
|
||||||
procSuite "Waku v2 JSON-RPC API":
|
procSuite "Waku v2 JSON-RPC API":
|
||||||
const defaultTopic = "/waku/2/default-waku/proto"
|
const defaultTopic = "/waku/2/default-waku/proto"
|
||||||
|
const defaultContentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||||
const testCodec = "/waku/2/default-waku/codec"
|
const testCodec = "/waku/2/default-waku/codec"
|
||||||
|
|
||||||
let
|
let
|
||||||
@ -99,7 +100,7 @@ procSuite "Waku v2 JSON-RPC API":
|
|||||||
response == true
|
response == true
|
||||||
|
|
||||||
# Publish a message on the default topic
|
# Publish a message on the default topic
|
||||||
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(ContentTopic(1))))
|
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic)))
|
||||||
|
|
||||||
check:
|
check:
|
||||||
# @TODO poll topic to verify message has been published
|
# @TODO poll topic to verify message has been published
|
||||||
@ -126,7 +127,7 @@ procSuite "Waku v2 JSON-RPC API":
|
|||||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
|
node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
|
||||||
pubSubTopic = "polling"
|
pubSubTopic = "polling"
|
||||||
contentTopic = ContentTopic(1)
|
contentTopic = defaultContentTopic
|
||||||
payload = @[byte 9]
|
payload = @[byte 9]
|
||||||
message = WakuMessage(payload: payload, contentTopic: contentTopic)
|
message = WakuMessage(payload: payload, contentTopic: contentTopic)
|
||||||
|
|
||||||
@ -241,16 +242,16 @@ procSuite "Waku v2 JSON-RPC API":
|
|||||||
|
|
||||||
# Now prime it with some history before tests
|
# Now prime it with some history before tests
|
||||||
var
|
var
|
||||||
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)),
|
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
|
||||||
WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 2], contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 3], contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 4], contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 5], contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 6], contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 7], contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 8], contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic(2))]
|
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))]
|
||||||
|
|
||||||
for wakuMsg in msgList:
|
for wakuMsg in msgList:
|
||||||
waitFor subscriptions.notify(defaultTopic, wakuMsg)
|
waitFor subscriptions.notify(defaultTopic, wakuMsg)
|
||||||
@ -258,7 +259,7 @@ procSuite "Waku v2 JSON-RPC API":
|
|||||||
let client = newRpcHttpClient()
|
let client = newRpcHttpClient()
|
||||||
await client.connect("127.0.0.1", rpcPort)
|
await client.connect("127.0.0.1", rpcPort)
|
||||||
|
|
||||||
let response = await client.get_waku_v2_store_v1_messages(@[ContentTopic(1)], some(StorePagingOptions()))
|
let response = await client.get_waku_v2_store_v1_messages(@[defaultContentTopic], some(StorePagingOptions()))
|
||||||
check:
|
check:
|
||||||
response.messages.len() == 8
|
response.messages.len() == 8
|
||||||
response.pagingOptions.isNone
|
response.pagingOptions.isNone
|
||||||
@ -290,8 +291,8 @@ procSuite "Waku v2 JSON-RPC API":
|
|||||||
# Light node has not yet subscribed to any filters
|
# Light node has not yet subscribed to any filters
|
||||||
node.filters.len() == 0
|
node.filters.len() == 0
|
||||||
|
|
||||||
let contentFilters = @[ContentFilter(topics: @[ContentTopic(1), ContentTopic(2)]),
|
let contentFilters = @[ContentFilter(topics: @[defaultContentTopic, ContentTopic("2")]),
|
||||||
ContentFilter(topics: @[ContentTopic(3), ContentTopic(4)])]
|
ContentFilter(topics: @[ContentTopic("3"), ContentTopic("4")])]
|
||||||
var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic))
|
var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic))
|
||||||
|
|
||||||
check:
|
check:
|
||||||
@ -311,8 +312,6 @@ procSuite "Waku v2 JSON-RPC API":
|
|||||||
waitfor node.stop()
|
waitfor node.stop()
|
||||||
|
|
||||||
asyncTest "Filter API: get latest messages":
|
asyncTest "Filter API: get latest messages":
|
||||||
const cTopic = ContentTopic(1)
|
|
||||||
|
|
||||||
waitFor node.start()
|
waitFor node.start()
|
||||||
|
|
||||||
# RPC server setup
|
# RPC server setup
|
||||||
@ -331,22 +330,22 @@ procSuite "Waku v2 JSON-RPC API":
|
|||||||
|
|
||||||
# First ensure subscription exists
|
# First ensure subscription exists
|
||||||
|
|
||||||
let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(topics: @[cTopic])], topic = some(defaultTopic))
|
let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(topics: @[defaultContentTopic])], topic = some(defaultTopic))
|
||||||
check:
|
check:
|
||||||
sub
|
sub
|
||||||
|
|
||||||
# Now prime the node with some messages before tests
|
# Now prime the node with some messages before tests
|
||||||
var
|
var
|
||||||
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)),
|
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
|
||||||
WakuMessage(payload: @[byte 1], contentTopic: cTopic),
|
WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 2], contentTopic: cTopic),
|
WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 3], contentTopic: cTopic),
|
WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 4], contentTopic: cTopic),
|
WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 5], contentTopic: cTopic),
|
WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 6], contentTopic: cTopic),
|
WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 7], contentTopic: cTopic),
|
WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 8], contentTopic: cTopic),
|
WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic(2))]
|
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))]
|
||||||
|
|
||||||
let
|
let
|
||||||
filters = node.filters
|
filters = node.filters
|
||||||
@ -355,13 +354,13 @@ procSuite "Waku v2 JSON-RPC API":
|
|||||||
for wakuMsg in msgList:
|
for wakuMsg in msgList:
|
||||||
filters.notify(wakuMsg, requestId)
|
filters.notify(wakuMsg, requestId)
|
||||||
|
|
||||||
var response = await client.get_waku_v2_filter_v1_messages(cTopic)
|
var response = await client.get_waku_v2_filter_v1_messages(defaultContentTopic)
|
||||||
check:
|
check:
|
||||||
response.len() == 8
|
response.len() == 8
|
||||||
response.allIt(it.contentTopic == cTopic)
|
response.allIt(it.contentTopic == defaultContentTopic)
|
||||||
|
|
||||||
# No new messages
|
# No new messages
|
||||||
response = await client.get_waku_v2_filter_v1_messages(cTopic)
|
response = await client.get_waku_v2_filter_v1_messages(defaultContentTopic)
|
||||||
|
|
||||||
check:
|
check:
|
||||||
response.len() == 0
|
response.len() == 0
|
||||||
@ -372,13 +371,13 @@ procSuite "Waku v2 JSON-RPC API":
|
|||||||
|
|
||||||
for x in 1..(maxSize + 1):
|
for x in 1..(maxSize + 1):
|
||||||
# Try to cache 1 more than maximum allowed
|
# Try to cache 1 more than maximum allowed
|
||||||
filters.notify(WakuMessage(payload: @[byte x], contentTopic: cTopic), requestId)
|
filters.notify(WakuMessage(payload: @[byte x], contentTopic: defaultContentTopic), requestId)
|
||||||
|
|
||||||
response = await client.get_waku_v2_filter_v1_messages(cTopic)
|
response = await client.get_waku_v2_filter_v1_messages(defaultContentTopic)
|
||||||
check:
|
check:
|
||||||
# Max messages has not been exceeded
|
# Max messages has not been exceeded
|
||||||
response.len == maxSize
|
response.len == maxSize
|
||||||
response.allIt(it.contentTopic == cTopic)
|
response.allIt(it.contentTopic == defaultContentTopic)
|
||||||
# Check that oldest item has been removed
|
# Check that oldest item has been removed
|
||||||
response[0].payload == @[byte 2]
|
response[0].payload == @[byte 2]
|
||||||
response[maxSize - 1].payload == @[byte (maxSize + 1)]
|
response[maxSize - 1].payload == @[byte (maxSize + 1)]
|
||||||
@ -496,7 +495,6 @@ procSuite "Waku v2 JSON-RPC API":
|
|||||||
await allFutures([node1.stop(), node2.stop(), node3.stop()])
|
await allFutures([node1.stop(), node2.stop(), node3.stop()])
|
||||||
|
|
||||||
asyncTest "Admin API: get unmanaged peer information":
|
asyncTest "Admin API: get unmanaged peer information":
|
||||||
const cTopic = ContentTopic(1)
|
|
||||||
let
|
let
|
||||||
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"),
|
node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"),
|
||||||
@ -561,7 +559,7 @@ procSuite "Waku v2 JSON-RPC API":
|
|||||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
|
node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
|
||||||
pubSubTopic = "polling"
|
pubSubTopic = "polling"
|
||||||
contentTopic = ContentTopic(1)
|
contentTopic = defaultContentTopic
|
||||||
payload = @[byte 9]
|
payload = @[byte 9]
|
||||||
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic))
|
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic))
|
||||||
topicCache = newTable[string, seq[WakuMessage]]()
|
topicCache = newTable[string, seq[WakuMessage]]()
|
||||||
@ -651,7 +649,7 @@ procSuite "Waku v2 JSON-RPC API":
|
|||||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
|
node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
|
||||||
pubSubTopic = "polling"
|
pubSubTopic = "polling"
|
||||||
contentTopic = ContentTopic(1)
|
contentTopic = defaultContentTopic
|
||||||
payload = @[byte 9]
|
payload = @[byte 9]
|
||||||
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic))
|
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic))
|
||||||
topicCache = newTable[string, seq[WakuMessage]]()
|
topicCache = newTable[string, seq[WakuMessage]]()
|
||||||
|
|||||||
@ -12,7 +12,7 @@ suite "Message Store":
|
|||||||
let
|
let
|
||||||
database = SqliteDatabase.init("", inMemory = true)[]
|
database = SqliteDatabase.init("", inMemory = true)[]
|
||||||
store = WakuMessageStore.init(database)[]
|
store = WakuMessageStore.init(database)[]
|
||||||
topic = ContentTopic(1)
|
topic = ContentTopic("/waku/2/default-content/proto")
|
||||||
|
|
||||||
var msgs = @[
|
var msgs = @[
|
||||||
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic),
|
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic),
|
||||||
|
|||||||
@ -3,7 +3,7 @@
|
|||||||
import
|
import
|
||||||
std/strutils,
|
std/strutils,
|
||||||
testutils/unittests,
|
testutils/unittests,
|
||||||
chronicles, chronos, stew/shims/net as stewNet, stew/byteutils,
|
chronicles, chronos, stew/shims/net as stewNet, stew/[byteutils, objects],
|
||||||
libp2p/crypto/crypto,
|
libp2p/crypto/crypto,
|
||||||
libp2p/crypto/secp,
|
libp2p/crypto/secp,
|
||||||
libp2p/peerid,
|
libp2p/peerid,
|
||||||
@ -44,8 +44,8 @@ procSuite "WakuBridge":
|
|||||||
v2NodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
v2NodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
v2Node = WakuNode.init(v2NodeKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
v2Node = WakuNode.init(v2NodeKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||||
|
|
||||||
topic = [byte 0x00, 0, 0, byte 0x01]
|
contentTopic = ContentTopic("0001")
|
||||||
contentTopic = ContentTopic(1)
|
topic = toArray(4, contentTopic.toBytes()[0..3])
|
||||||
payloadV1 = "hello from V1".toBytes()
|
payloadV1 = "hello from V1".toBytes()
|
||||||
payloadV2 = "hello from V2".toBytes()
|
payloadV2 = "hello from V2".toBytes()
|
||||||
message = WakuMessage(payload: payloadV2, contentTopic: contentTopic)
|
message = WakuMessage(payload: payloadV2, contentTopic: contentTopic)
|
||||||
|
|||||||
@ -21,7 +21,7 @@ procSuite "Waku Filter":
|
|||||||
let
|
let
|
||||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||||
peer = PeerInfo.init(key)
|
peer = PeerInfo.init(key)
|
||||||
contentTopic = ContentTopic(1)
|
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||||
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
|
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
|
||||||
|
|
||||||
var dialSwitch = newStandardSwitch()
|
var dialSwitch = newStandardSwitch()
|
||||||
@ -70,7 +70,7 @@ procSuite "Waku Filter":
|
|||||||
let
|
let
|
||||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||||
peer = PeerInfo.init(key)
|
peer = PeerInfo.init(key)
|
||||||
contentTopic = ContentTopic(1)
|
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||||
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
|
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
|
||||||
|
|
||||||
var dialSwitch = newStandardSwitch()
|
var dialSwitch = newStandardSwitch()
|
||||||
@ -134,7 +134,7 @@ procSuite "Waku Filter":
|
|||||||
const defaultTopic = "/waku/2/default-waku/proto"
|
const defaultTopic = "/waku/2/default-waku/proto"
|
||||||
|
|
||||||
let
|
let
|
||||||
contentTopic = ContentTopic(1)
|
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||||
|
|
||||||
var dialSwitch = newStandardSwitch()
|
var dialSwitch = newStandardSwitch()
|
||||||
discard await dialSwitch.start()
|
discard await dialSwitch.start()
|
||||||
|
|||||||
@ -26,9 +26,9 @@ procSuite "pagination":
|
|||||||
index.receivedTime != 0 # the timestamp should be a non-zero value
|
index.receivedTime != 0 # the timestamp should be a non-zero value
|
||||||
|
|
||||||
let
|
let
|
||||||
wm1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(1))
|
wm1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("/waku/2/default-content/proto"))
|
||||||
index1 = wm1.computeIndex()
|
index1 = wm1.computeIndex()
|
||||||
wm2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(1))
|
wm2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("/waku/2/default-content/proto"))
|
||||||
index2 = wm2.computeIndex()
|
index2 = wm2.computeIndex()
|
||||||
|
|
||||||
check:
|
check:
|
||||||
|
|||||||
@ -15,13 +15,15 @@ import
|
|||||||
../test_helpers, ./utils
|
../test_helpers, ./utils
|
||||||
|
|
||||||
procSuite "Waku Store":
|
procSuite "Waku Store":
|
||||||
|
const defaultContentTopic = ContentTopic("1")
|
||||||
|
|
||||||
asyncTest "handle query":
|
asyncTest "handle query":
|
||||||
let
|
let
|
||||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||||
peer = PeerInfo.init(key)
|
peer = PeerInfo.init(key)
|
||||||
topic = ContentTopic(1)
|
topic = defaultContentTopic
|
||||||
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic)
|
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic)
|
||||||
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(2))
|
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("2"))
|
||||||
|
|
||||||
var dialSwitch = newStandardSwitch()
|
var dialSwitch = newStandardSwitch()
|
||||||
discard await dialSwitch.start()
|
discard await dialSwitch.start()
|
||||||
@ -61,11 +63,11 @@ procSuite "Waku Store":
|
|||||||
let
|
let
|
||||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||||
peer = PeerInfo.init(key)
|
peer = PeerInfo.init(key)
|
||||||
topic = ContentTopic(1)
|
topic = defaultContentTopic
|
||||||
database = SqliteDatabase.init("", inMemory = true)[]
|
database = SqliteDatabase.init("", inMemory = true)[]
|
||||||
store = WakuMessageStore.init(database)[]
|
store = WakuMessageStore.init(database)[]
|
||||||
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic)
|
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic)
|
||||||
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(2))
|
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("2"))
|
||||||
|
|
||||||
var dialSwitch = newStandardSwitch()
|
var dialSwitch = newStandardSwitch()
|
||||||
discard await dialSwitch.start()
|
discard await dialSwitch.start()
|
||||||
@ -129,16 +131,16 @@ procSuite "Waku Store":
|
|||||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||||
peer = PeerInfo.init(key)
|
peer = PeerInfo.init(key)
|
||||||
var
|
var
|
||||||
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)),
|
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
|
||||||
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 1],contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 2],contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 2],contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 3],contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 4],contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 5],contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 6],contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 6],contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 7],contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 7],contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 8],contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 8],contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic(2))]
|
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("2"))]
|
||||||
|
|
||||||
var dialSwitch = newStandardSwitch()
|
var dialSwitch = newStandardSwitch()
|
||||||
discard await dialSwitch.start()
|
discard await dialSwitch.start()
|
||||||
@ -149,7 +151,7 @@ procSuite "Waku Store":
|
|||||||
let
|
let
|
||||||
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
|
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
|
||||||
subscription = proto.subscription()
|
subscription = proto.subscription()
|
||||||
rpc = HistoryQuery(topics: @[ContentTopic(1)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) )
|
rpc = HistoryQuery(topics: @[defaultContentTopic], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) )
|
||||||
|
|
||||||
proto.setPeer(listenSwitch.peerInfo)
|
proto.setPeer(listenSwitch.peerInfo)
|
||||||
|
|
||||||
@ -181,16 +183,16 @@ procSuite "Waku Store":
|
|||||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||||
peer = PeerInfo.init(key)
|
peer = PeerInfo.init(key)
|
||||||
var
|
var
|
||||||
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)),
|
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
|
||||||
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 1],contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 2],contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 2],contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 3],contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 4],contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 5],contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 6],contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 6],contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 7],contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 7],contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 8],contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 8],contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic(2))]
|
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("2"))]
|
||||||
|
|
||||||
var dialSwitch = newStandardSwitch()
|
var dialSwitch = newStandardSwitch()
|
||||||
discard await dialSwitch.start()
|
discard await dialSwitch.start()
|
||||||
@ -220,7 +222,7 @@ procSuite "Waku Store":
|
|||||||
response.pagingInfo.cursor != Index()
|
response.pagingInfo.cursor != Index()
|
||||||
completionFut.complete(true)
|
completionFut.complete(true)
|
||||||
|
|
||||||
let rpc = HistoryQuery(topics: @[ContentTopic(1)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) )
|
let rpc = HistoryQuery(topics: @[defaultContentTopic], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) )
|
||||||
await proto.query(rpc, handler)
|
await proto.query(rpc, handler)
|
||||||
|
|
||||||
check:
|
check:
|
||||||
@ -231,16 +233,16 @@ procSuite "Waku Store":
|
|||||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||||
peer = PeerInfo.init(key)
|
peer = PeerInfo.init(key)
|
||||||
var
|
var
|
||||||
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)),
|
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
|
||||||
WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 2], contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 3], contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 4], contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 5], contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 6], contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 7], contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 8], contentTopic: ContentTopic(1)),
|
WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic),
|
||||||
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic(2))]
|
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))]
|
||||||
|
|
||||||
var dialSwitch = newStandardSwitch()
|
var dialSwitch = newStandardSwitch()
|
||||||
discard await dialSwitch.start()
|
discard await dialSwitch.start()
|
||||||
@ -268,7 +270,7 @@ procSuite "Waku Store":
|
|||||||
response.pagingInfo == PagingInfo()
|
response.pagingInfo == PagingInfo()
|
||||||
completionFut.complete(true)
|
completionFut.complete(true)
|
||||||
|
|
||||||
let rpc = HistoryQuery(topics: @[ContentTopic(1)] )
|
let rpc = HistoryQuery(topics: @[defaultContentTopic] )
|
||||||
|
|
||||||
await proto.query(rpc, handler)
|
await proto.query(rpc, handler)
|
||||||
|
|
||||||
@ -277,7 +279,7 @@ procSuite "Waku Store":
|
|||||||
|
|
||||||
test "Index Protobuf encoder/decoder test":
|
test "Index Protobuf encoder/decoder test":
|
||||||
let
|
let
|
||||||
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)))
|
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
|
||||||
pb = index.encode()
|
pb = index.encode()
|
||||||
decodedIndex = Index.init(pb.buffer)
|
decodedIndex = Index.init(pb.buffer)
|
||||||
|
|
||||||
@ -310,7 +312,7 @@ procSuite "Waku Store":
|
|||||||
|
|
||||||
test "PagingInfo Protobuf encod/init test":
|
test "PagingInfo Protobuf encod/init test":
|
||||||
let
|
let
|
||||||
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)))
|
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
|
||||||
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
||||||
pb = pagingInfo.encode()
|
pb = pagingInfo.encode()
|
||||||
decodedPagingInfo = PagingInfo.init(pb.buffer)
|
decodedPagingInfo = PagingInfo.init(pb.buffer)
|
||||||
@ -332,9 +334,9 @@ procSuite "Waku Store":
|
|||||||
|
|
||||||
test "HistoryQuery Protobuf encode/init test":
|
test "HistoryQuery Protobuf encode/init test":
|
||||||
let
|
let
|
||||||
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)))
|
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
|
||||||
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
||||||
query=HistoryQuery(topics: @[ContentTopic(1)], pagingInfo: pagingInfo, startTime: float64(10), endTime: float64(11))
|
query=HistoryQuery(topics: @[defaultContentTopic], pagingInfo: pagingInfo, startTime: float64(10), endTime: float64(11))
|
||||||
pb = query.encode()
|
pb = query.encode()
|
||||||
decodedQuery = HistoryQuery.init(pb.buffer)
|
decodedQuery = HistoryQuery.init(pb.buffer)
|
||||||
|
|
||||||
@ -355,7 +357,7 @@ procSuite "Waku Store":
|
|||||||
|
|
||||||
test "HistoryResponse Protobuf encod/init test":
|
test "HistoryResponse Protobuf encod/init test":
|
||||||
let
|
let
|
||||||
wm = WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1))
|
wm = WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic)
|
||||||
index = computeIndex(wm)
|
index = computeIndex(wm)
|
||||||
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
||||||
res = HistoryResponse(messages: @[wm], pagingInfo:pagingInfo)
|
res = HistoryResponse(messages: @[wm], pagingInfo:pagingInfo)
|
||||||
|
|||||||
@ -54,7 +54,7 @@ procSuite "Waku SWAP Accounting":
|
|||||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||||
Port(60001))
|
Port(60001))
|
||||||
contentTopic = ContentTopic(1)
|
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||||
|
|
||||||
var completionFut = newFuture[bool]()
|
var completionFut = newFuture[bool]()
|
||||||
@ -100,7 +100,7 @@ procSuite "Waku SWAP Accounting":
|
|||||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||||
Port(60001))
|
Port(60001))
|
||||||
contentTopic = ContentTopic(1)
|
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||||
|
|
||||||
var futures = [newFuture[bool](), newFuture[bool]()]
|
var futures = [newFuture[bool](), newFuture[bool]()]
|
||||||
|
|||||||
@ -27,7 +27,7 @@ procSuite "WakuNode":
|
|||||||
node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"),
|
node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"),
|
||||||
Port(60000))
|
Port(60000))
|
||||||
pubSubTopic = "chat"
|
pubSubTopic = "chat"
|
||||||
contentTopic = ContentTopic(1)
|
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||||
filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true)
|
filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true)
|
||||||
message = WakuMessage(payload: "hello world".toBytes(),
|
message = WakuMessage(payload: "hello world".toBytes(),
|
||||||
contentTopic: contentTopic)
|
contentTopic: contentTopic)
|
||||||
@ -79,7 +79,7 @@ procSuite "WakuNode":
|
|||||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||||
Port(60002))
|
Port(60002))
|
||||||
pubSubTopic = "chat"
|
pubSubTopic = "chat"
|
||||||
contentTopic = ContentTopic(1)
|
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||||
filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true)
|
filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true)
|
||||||
message = WakuMessage(payload: "hello world".toBytes(),
|
message = WakuMessage(payload: "hello world".toBytes(),
|
||||||
contentTopic: contentTopic)
|
contentTopic: contentTopic)
|
||||||
@ -141,7 +141,7 @@ procSuite "WakuNode":
|
|||||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||||
Port(60002))
|
Port(60002))
|
||||||
contentTopic = ContentTopic(1)
|
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||||
|
|
||||||
var completionFut = newFuture[bool]()
|
var completionFut = newFuture[bool]()
|
||||||
@ -177,7 +177,7 @@ procSuite "WakuNode":
|
|||||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||||
Port(60002))
|
Port(60002))
|
||||||
contentTopic = ContentTopic(1)
|
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||||
|
|
||||||
var completionFut = newFuture[bool]()
|
var completionFut = newFuture[bool]()
|
||||||
@ -219,7 +219,7 @@ procSuite "WakuNode":
|
|||||||
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"),
|
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"),
|
||||||
Port(60003))
|
Port(60003))
|
||||||
pubSubTopic = "test"
|
pubSubTopic = "test"
|
||||||
contentTopic = ContentTopic(1)
|
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||||
payload = "hello world".toBytes()
|
payload = "hello world".toBytes()
|
||||||
message = WakuMessage(payload: payload, contentTopic: contentTopic)
|
message = WakuMessage(payload: payload, contentTopic: contentTopic)
|
||||||
|
|
||||||
@ -317,12 +317,12 @@ procSuite "WakuNode":
|
|||||||
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60003))
|
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60003))
|
||||||
|
|
||||||
pubSubTopic = "test"
|
pubSubTopic = "test"
|
||||||
contentTopic1 = ContentTopic(1)
|
contentTopic1 = ContentTopic("/waku/2/default-content/proto")
|
||||||
payload = "hello world".toBytes()
|
payload = "hello world".toBytes()
|
||||||
message1 = WakuMessage(payload: payload, contentTopic: contentTopic1)
|
message1 = WakuMessage(payload: payload, contentTopic: contentTopic1)
|
||||||
|
|
||||||
payload2 = "you should not see this message!".toBytes()
|
payload2 = "you should not see this message!".toBytes()
|
||||||
contentTopic2 = ContentTopic(2)
|
contentTopic2 = ContentTopic("2")
|
||||||
message2 = WakuMessage(payload: payload2, contentTopic: contentTopic2)
|
message2 = WakuMessage(payload: payload2, contentTopic: contentTopic2)
|
||||||
|
|
||||||
# start all the nodes
|
# start all the nodes
|
||||||
@ -410,7 +410,7 @@ procSuite "WakuNode":
|
|||||||
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60003))
|
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60003))
|
||||||
|
|
||||||
pubSubTopic = "defaultTopic"
|
pubSubTopic = "defaultTopic"
|
||||||
contentTopic1 = ContentTopic(1)
|
contentTopic1 = ContentTopic("/waku/2/default-content/proto")
|
||||||
payload = "hello world".toBytes()
|
payload = "hello world".toBytes()
|
||||||
message1 = WakuMessage(payload: payload, contentTopic: contentTopic1)
|
message1 = WakuMessage(payload: payload, contentTopic: contentTopic1)
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
import
|
import
|
||||||
std/tables,
|
std/tables,
|
||||||
chronos, confutils, chronicles, chronicles/topics_registry, metrics,
|
chronos, confutils, chronicles, chronicles/topics_registry, metrics,
|
||||||
stew/endians2,
|
stew/[byteutils, objects],
|
||||||
stew/shims/net as stewNet, json_rpc/rpcserver,
|
stew/shims/net as stewNet, json_rpc/rpcserver,
|
||||||
# Waku v1 imports
|
# Waku v1 imports
|
||||||
eth/[keys, p2p], eth/common/utils,
|
eth/[keys, p2p], eth/common/utils,
|
||||||
@ -44,17 +44,22 @@ type
|
|||||||
func toWakuMessage(env: Envelope): WakuMessage =
|
func toWakuMessage(env: Envelope): WakuMessage =
|
||||||
# Translate a Waku v1 envelope to a Waku v2 message
|
# Translate a Waku v1 envelope to a Waku v2 message
|
||||||
WakuMessage(payload: env.data,
|
WakuMessage(payload: env.data,
|
||||||
contentTopic: ContentTopic(uint32.fromBytes(env.topic, Endianness.bigEndian)),
|
contentTopic: ContentTopic(string.fromBytes(env.topic)),
|
||||||
version: 1)
|
version: 1)
|
||||||
|
|
||||||
proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} =
|
proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} =
|
||||||
waku_bridge_transfers.inc(labelValues = ["v1_to_v2"])
|
waku_bridge_transfers.inc(labelValues = ["v1_to_v2"])
|
||||||
|
|
||||||
await bridge.nodev2.publish(defaultBridgeTopic, env.toWakuMessage())
|
await bridge.nodev2.publish(defaultBridgeTopic, env.toWakuMessage())
|
||||||
|
|
||||||
proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe.} =
|
proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe.} =
|
||||||
waku_bridge_transfers.inc(labelValues = ["v2_to_v1"])
|
waku_bridge_transfers.inc(labelValues = ["v2_to_v1"])
|
||||||
|
|
||||||
|
# @TODO: use namespacing to map v2 contentTopics to v1 topics
|
||||||
|
let v1TopicSeq = msg.contentTopic.toBytes()[0..3]
|
||||||
|
|
||||||
discard bridge.nodev1.postMessage(ttl = defaultTTL,
|
discard bridge.nodev1.postMessage(ttl = defaultTTL,
|
||||||
topic = msg.contentTopic.toBytes(Endianness.bigEndian),
|
topic = toArray(4, v1TopicSeq),
|
||||||
payload = msg.payload)
|
payload = msg.payload)
|
||||||
|
|
||||||
##############
|
##############
|
||||||
|
|||||||
@ -43,7 +43,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
|||||||
raise newException(ValueError, "Failed to connect to peers: " & $peers)
|
raise newException(ValueError, "Failed to connect to peers: " & $peers)
|
||||||
|
|
||||||
rpcsrv.rpc("get_waku_v2_admin_v1_peers") do() -> seq[WakuPeer]:
|
rpcsrv.rpc("get_waku_v2_admin_v1_peers") do() -> seq[WakuPeer]:
|
||||||
## Returns history for a list of content topics with optional paging
|
## Returns a list of peers registered for this node
|
||||||
debug "get_waku_v2_admin_v1_peers"
|
debug "get_waku_v2_admin_v1_peers"
|
||||||
|
|
||||||
# Create a single list of peers from mounted protocols.
|
# Create a single list of peers from mounted protocols.
|
||||||
|
|||||||
@ -3,6 +3,7 @@ import
|
|||||||
eth/keys,
|
eth/keys,
|
||||||
../../../v1/node/rpc/hexstrings,
|
../../../v1/node/rpc/hexstrings,
|
||||||
../../protocol/waku_store/waku_store_types,
|
../../protocol/waku_store/waku_store_types,
|
||||||
|
../../protocol/waku_message,
|
||||||
../waku_payload,
|
../waku_payload,
|
||||||
./jsonrpc_types
|
./jsonrpc_types
|
||||||
|
|
||||||
@ -37,15 +38,14 @@ proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse =
|
|||||||
pagingOptions: if historyResponse.pagingInfo != PagingInfo(): some(historyResponse.pagingInfo.toPagingOptions()) else: none(StorePagingOptions))
|
pagingOptions: if historyResponse.pagingInfo != PagingInfo(): some(historyResponse.pagingInfo.toPagingOptions()) else: none(StorePagingOptions))
|
||||||
|
|
||||||
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage =
|
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage =
|
||||||
# @TODO global definition for default content topic
|
const defaultCT = ContentTopic("/waku/2/default-content/proto")
|
||||||
const defaultCT = 0
|
|
||||||
WakuMessage(payload: relayMessage.payload,
|
WakuMessage(payload: relayMessage.payload,
|
||||||
contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT,
|
contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT,
|
||||||
version: version)
|
version: version)
|
||||||
|
|
||||||
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref BrHmacDrbgContext, symkey: Option[SymKey], pubKey: Option[keys.PublicKey]): WakuMessage =
|
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref BrHmacDrbgContext, symkey: Option[SymKey], pubKey: Option[keys.PublicKey]): WakuMessage =
|
||||||
# @TODO global definition for default content topic
|
# @TODO global definition for default content topic
|
||||||
const defaultCT = 0
|
const defaultCT = ContentTopic("/waku/2/default-content/proto")
|
||||||
|
|
||||||
let payload = Payload(payload: relayMessage.payload,
|
let payload = Payload(payload: relayMessage.payload,
|
||||||
dst: pubKey,
|
dst: pubKey,
|
||||||
|
|||||||
@ -18,7 +18,7 @@ createRpcSigs(RpcHttpClient, sigWakuPath)
|
|||||||
|
|
||||||
const defaultTopic = "/waku/2/default-waku/proto"
|
const defaultTopic = "/waku/2/default-waku/proto"
|
||||||
|
|
||||||
const defaultContentTopic = ContentTopic(1)
|
const defaultContentTopic = ContentTopic("waku/2/default-content/proto")
|
||||||
|
|
||||||
const topicAmount = 10 #100
|
const topicAmount = 10 #100
|
||||||
|
|
||||||
|
|||||||
@ -33,7 +33,7 @@ var node = newRpcHttpClient()
|
|||||||
waitfor node.connect("localhost", rpcPort)
|
waitfor node.connect("localhost", rpcPort)
|
||||||
|
|
||||||
let pubSubTopic = "/waku/2/default-waku/proto"
|
let pubSubTopic = "/waku/2/default-waku/proto"
|
||||||
let contentTopic = ContentTopic(1)
|
let contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||||
let relayMessage = WakuRelayMessage(payload: input.toBytes(), contentTopic: some(contentTopic))
|
let relayMessage = WakuRelayMessage(payload: input.toBytes(), contentTopic: some(contentTopic))
|
||||||
var res = waitfor node.post_waku_v2_relay_v1_message(pubSubTopic, relayMessage)
|
var res = waitfor node.post_waku_v2_relay_v1_message(pubSubTopic, relayMessage)
|
||||||
echo "Waku publish response: ", res
|
echo "Waku publish response: ", res
|
||||||
|
|||||||
@ -32,5 +32,5 @@ echo "Input is:", input
|
|||||||
var node = newRpcHttpClient()
|
var node = newRpcHttpClient()
|
||||||
waitfor node.connect("localhost", rpcPort)
|
waitfor node.connect("localhost", rpcPort)
|
||||||
|
|
||||||
var res = waitfor node.get_waku_v2_store_v1_messages(@[ContentTopic(parseUInt(input))], none(StorePagingOptions))
|
var res = waitfor node.get_waku_v2_store_v1_messages(@[ContentTopic(input)], none(StorePagingOptions))
|
||||||
echo "Waku query response: ", res
|
echo "Waku query response: ", res
|
||||||
|
|||||||
@ -33,6 +33,6 @@ var node = newRpcHttpClient()
|
|||||||
waitfor node.connect("localhost", rpcPort)
|
waitfor node.connect("localhost", rpcPort)
|
||||||
|
|
||||||
let pubSubTopic = "/waku/2/default-waku/proto"
|
let pubSubTopic = "/waku/2/default-waku/proto"
|
||||||
let contentTopic = ContentTopic(1)
|
let contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||||
var res = waitfor node.post_waku_v2_filter_v1_subscription(@[ContentFilter(topics: @[contentTopic])], some(pubSubTopic))
|
var res = waitfor node.post_waku_v2_filter_v1_subscription(@[ContentFilter(topics: @[contentTopic])], some(pubSubTopic))
|
||||||
echo "Waku query response: ", res
|
echo "Waku query response: ", res
|
||||||
|
|||||||
@ -5,7 +5,7 @@ import
|
|||||||
libp2p/protocols/protocol,
|
libp2p/protocols/protocol,
|
||||||
libp2p/protobuf/minprotobuf,
|
libp2p/protobuf/minprotobuf,
|
||||||
libp2p/stream/connection,
|
libp2p/stream/connection,
|
||||||
stew/results,
|
stew/[byteutils, results],
|
||||||
./message_store,
|
./message_store,
|
||||||
../sqlite,
|
../sqlite,
|
||||||
../../../protocol/waku_message,
|
../../../protocol/waku_message,
|
||||||
@ -31,7 +31,7 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T]
|
|||||||
CREATE TABLE IF NOT EXISTS messages (
|
CREATE TABLE IF NOT EXISTS messages (
|
||||||
id BLOB PRIMARY KEY,
|
id BLOB PRIMARY KEY,
|
||||||
timestamp INTEGER NOT NULL,
|
timestamp INTEGER NOT NULL,
|
||||||
contentTopic INTEGER NOT NULL,
|
contentTopic BLOB NOT NULL,
|
||||||
payload BLOB
|
payload BLOB
|
||||||
) WITHOUT ROWID;
|
) WITHOUT ROWID;
|
||||||
""", NoParams, void)
|
""", NoParams, void)
|
||||||
@ -57,14 +57,14 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage): MessageS
|
|||||||
##
|
##
|
||||||
let prepare = db.database.prepareStmt(
|
let prepare = db.database.prepareStmt(
|
||||||
"INSERT INTO messages (id, timestamp, contentTopic, payload) VALUES (?, ?, ?, ?);",
|
"INSERT INTO messages (id, timestamp, contentTopic, payload) VALUES (?, ?, ?, ?);",
|
||||||
(seq[byte], int64, uint32, seq[byte]),
|
(seq[byte], int64, seq[byte], seq[byte]),
|
||||||
void
|
void
|
||||||
)
|
)
|
||||||
|
|
||||||
if prepare.isErr:
|
if prepare.isErr:
|
||||||
return err("failed to prepare")
|
return err("failed to prepare")
|
||||||
|
|
||||||
let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic, message.payload))
|
let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic.toBytes(), message.payload))
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
return err("failed")
|
return err("failed")
|
||||||
|
|
||||||
@ -87,11 +87,14 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto
|
|||||||
gotMessages = true
|
gotMessages = true
|
||||||
let
|
let
|
||||||
timestamp = sqlite3_column_int64(s, 0)
|
timestamp = sqlite3_column_int64(s, 0)
|
||||||
topic = sqlite3_column_int(s, 1)
|
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1))
|
||||||
|
topicL = sqlite3_column_bytes(s,1)
|
||||||
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2))
|
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2))
|
||||||
l = sqlite3_column_bytes(s, 2)
|
l = sqlite3_column_bytes(s, 2)
|
||||||
|
|
||||||
onData(uint64(timestamp), WakuMessage(contentTopic: ContentTopic(int(topic)), payload: @(toOpenArray(p, 0, l-1))))
|
onData(uint64(timestamp),
|
||||||
|
WakuMessage(contentTopic: ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicL-1)))),
|
||||||
|
payload: @(toOpenArray(p, 0, l-1))))
|
||||||
|
|
||||||
let res = db.database.query("SELECT timestamp, contentTopic, payload FROM messages ORDER BY timestamp ASC", msg)
|
let res = db.database.query("SELECT timestamp, contentTopic, payload FROM messages ORDER BY timestamp ASC", msg)
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
|
|||||||
@ -10,7 +10,7 @@ import
|
|||||||
libp2p/protobuf/minprotobuf
|
libp2p/protobuf/minprotobuf
|
||||||
|
|
||||||
type
|
type
|
||||||
ContentTopic* = uint32
|
ContentTopic* = string
|
||||||
|
|
||||||
WakuMessage* = object
|
WakuMessage* = object
|
||||||
payload*: seq[byte]
|
payload*: seq[byte]
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user