refactor(wakunode): remove deprecated non-async methods

This commit is contained in:
Lorenzo Delgado 2022-09-21 18:27:40 +02:00 committed by GitHub
parent 11832411a0
commit 5e3a75c56e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 299 additions and 278 deletions

View File

@ -241,7 +241,7 @@ proc publish(c: Chat, line: string) =
c.node.wakuRlnRelay.lastEpoch = message.proof.epoch c.node.wakuRlnRelay.lastEpoch = message.proof.epoch
if not c.node.wakuLightPush.isNil(): if not c.node.wakuLightPush.isNil():
# Attempt lightpush # Attempt lightpush
asyncSpawn c.node.lightpush(DefaultTopic, message, handler) asyncSpawn c.node.lightpush2(DefaultTopic, message)
else: else:
asyncSpawn c.node.publish(DefaultTopic, message, handler) asyncSpawn c.node.publish(DefaultTopic, message, handler)
else: else:
@ -270,7 +270,7 @@ proc publish(c: Chat, line: string) =
if not c.node.wakuLightPush.isNil(): if not c.node.wakuLightPush.isNil():
# Attempt lightpush # Attempt lightpush
asyncSpawn c.node.lightpush(DefaultTopic, message, handler) asyncSpawn c.node.lightpush2(DefaultTopic, message)
else: else:
asyncSpawn c.node.publish(DefaultTopic, message) asyncSpawn c.node.publish(DefaultTopic, message)
@ -475,7 +475,9 @@ proc processInput(rfd: AsyncFD) {.async.} =
echo &"{chatLine}" echo &"{chatLine}"
info "Hit store handler" info "Hit store handler"
await node.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: chat.contentTopic)]), storeHandler) let queryRes = await node.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: chat.contentTopic)]))
if queryRes.isOk():
storeHandler(queryRes.value)
# NOTE Must be mounted after relay # NOTE Must be mounted after relay
if conf.lightpushnode != "": if conf.lightpushnode != "":

View File

@ -1,9 +1,12 @@
{.used.} {.used.}
import import
std/[options, tables, sets], std/[options, tables, sets, times],
stew/byteutils,
stew/shims/net as stewNet,
testutils/unittests, testutils/unittests,
chronos, chronicles, stew/shims/net as stewNet, stew/byteutils, chronos,
chronicles,
libp2p/switch, libp2p/switch,
libp2p/protobuf/minprotobuf, libp2p/protobuf/minprotobuf,
libp2p/stream/[bufferstream, connection], libp2p/stream/[bufferstream, connection],
@ -17,7 +20,31 @@ import
../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/node/wakunode2, ../../waku/v2/node/wakunode2,
../../waku/v2/utils/peers, ../../waku/v2/utils/peers,
../test_helpers, ./utils ../../waku/v2/utils/time,
../test_helpers,
./utils
const
DefaultPubsubTopic = "/waku/2/default-waku/proto"
DefaultContentTopic = ContentTopic("/waku/2/default-content/proto")
proc now(): Timestamp =
getNanosecondTime(getTime().toUnixFloat())
proc fakeWakuMessage(
payload = "TEST-PAYLOAD",
contentTopic = DefaultContentTopic,
ts = now()
): WakuMessage =
WakuMessage(
payload: toBytes(payload),
contentTopic: contentTopic,
version: 1,
timestamp: ts
)
procSuite "Waku SWAP Accounting": procSuite "Waku SWAP Accounting":
test "Handshake Encode/Decode": test "Handshake Encode/Decode":
@ -50,98 +77,91 @@ procSuite "Waku SWAP Accounting":
# With current logic state isn't updated because of bad cheque # With current logic state isn't updated because of bad cheque
# Consider moving this test to e2e test, and/or move swap module to be on by default # Consider moving this test to e2e test, and/or move swap module to be on by default
asyncTest "Update accounting state after store operations": asyncTest "Update accounting state after store operations":
## Setup
let let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60001)) client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000))
let
contentTopic = ContentTopic("/waku/2/default-content/proto")
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
var completionFut = newFuture[bool]()
# Start nodes and mount protocols # Start nodes and mount protocols
await node1.start() await allFutures(client.start(), server.start())
await node1.mountSwap() await server.mountSwap()
await node1.mountStore(store=StoreQueueRef.new()) await server.mountStore(store=StoreQueueRef.new())
await node2.start() await client.mountSwap()
await node2.mountSwap() await client.mountStore()
await node2.mountStore(store=StoreQueueRef.new())
node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo())
client.wakuSwap.setPeer(server.peerInfo.toRemotePeerInfo())
server.wakuSwap.setPeer(client.peerInfo.toRemotePeerInfo())
await sleepAsync(500.millis) ## Given
let message = fakeWakuMessage()
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) server.wakuStore.handleMessage(DefaultPubsubTopic, message)
node1.wakuSwap.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
node2.wakuSwap.setPeer(node1.switch.peerInfo.toRemotePeerInfo())
proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} = ## When
debug "storeHandler hit" let queryRes = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]))
## Then
check queryRes.isOk()
let response = queryRes.get()
check: check:
response.messages[0] == message response.messages == @[message]
completionFut.complete(true)
await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), storeHandler)
check: check:
(await completionFut.withTimeout(5.seconds)) == true client.wakuSwap.accounting[server.peerInfo.peerId] == 1
# Accounting table updated with credit and debit, respectively server.wakuSwap.accounting[client.peerInfo.peerId] == -1
node1.wakuSwap.accounting[node2.switch.peerInfo.peerId] == 1
node2.wakuSwap.accounting[node1.switch.peerInfo.peerId] == -1 ## Cleanup
await node1.stop() await allFutures(client.stop(), server.stop())
await node2.stop()
# TODO Add cheque here
# This test will only Be checked if in Mock mode # This test will only Be checked if in Mock mode
# TODO: Add cheque here
asyncTest "Update accounting state after sending cheque": asyncTest "Update accounting state after sending cheque":
## Setup
let let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60001)) client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000))
let
contentTopic = ContentTopic("/waku/2/default-content/proto")
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
var futures = [newFuture[bool](), newFuture[bool]()]
# Define the waku swap Config for this test # Define the waku swap Config for this test
let swapConfig = SwapConfig(mode: SwapMode.Mock, paymentThreshold: 1, disconnectThreshold: -1) let swapConfig = SwapConfig(mode: SwapMode.Mock, paymentThreshold: 1, disconnectThreshold: -1)
# Start nodes and mount protocols # Start nodes and mount protocols
await node1.start() await allFutures(client.start(), server.start())
await node1.mountSwap(swapConfig) await server.mountSwap(swapConfig)
await node1.mountStore(store=StoreQueueRef.new()) await server.mountStore(store=StoreQueueRef.new())
await node2.start() await client.mountSwap(swapConfig)
await node2.mountSwap(swapConfig) await client.mountStore()
await node2.mountStore(store=StoreQueueRef.new())
node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo())
client.wakuSwap.setPeer(server.peerInfo.toRemotePeerInfo())
server.wakuSwap.setPeer(client.peerInfo.toRemotePeerInfo())
await sleepAsync(500.millis) ## Given
let message = fakeWakuMessage()
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) server.wakuStore.handleMessage(DefaultPubsubTopic, message)
node1.wakuSwap.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
node2.wakuSwap.setPeer(node1.switch.peerInfo.toRemotePeerInfo())
proc handler1(response: HistoryResponse) {.gcsafe, closure.} = ## When
futures[0].complete(true) # TODO: Handshakes - for now we assume implicit, e2e still works for PoC
proc handler2(response: HistoryResponse) {.gcsafe, closure.} = let res1 = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]))
futures[1].complete(true) let res2 = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]))
# TODO Handshakes - for now we assume implicit, e2e still works for PoC ## Then
await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), handler1) check:
await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), handler2) res1.isOk()
res2.isOk()
check: check:
(await allFutures(futures).withTimeout(5.seconds)) == true
# Accounting table updated with credit and debit, respectively # Accounting table updated with credit and debit, respectively
# After sending a cheque the balance is partially adjusted # After sending a cheque the balance is partially adjusted
node1.wakuSwap.accounting[node2.switch.peerInfo.peerId] == 1 client.wakuSwap.accounting[server.peerInfo.peerId] == 1
node2.wakuSwap.accounting[node1.switch.peerInfo.peerId] == -1 server.wakuSwap.accounting[client.peerInfo.peerId] == -1
await node1.stop()
await node2.stop() ## Cleanup
await allFutures(client.stop(), server.stop())

View File

@ -12,72 +12,83 @@ import
../../waku/v2/protocol/waku_lightpush, ../../waku/v2/protocol/waku_lightpush,
../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/utils/peers, ../../waku/v2/utils/peers,
../../waku/v2/utils/time,
../../waku/v2/node/wakunode2 ../../waku/v2/node/wakunode2
from std/times import getTime, toUnixFloat
const
DefaultPubsubTopic = "/waku/2/default-waku/proto"
DefaultContentTopic = ContentTopic("/waku/2/default-content/proto")
proc now(): Timestamp =
getNanosecondTime(getTime().toUnixFloat())
proc fakeWakuMessage(
payload = "TEST-PAYLOAD",
contentTopic = DefaultContentTopic,
ts = now()
): WakuMessage =
WakuMessage(
payload: toBytes(payload),
contentTopic: contentTopic,
version: 1,
timestamp: ts
)
procSuite "WakuNode - Lightpush": procSuite "WakuNode - Lightpush":
let rng = crypto.newRng() let rng = crypto.newRng()
asyncTest "Lightpush message return success": asyncTest "Lightpush message return success":
## Setup
let let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] lightNodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60010)) lightNode = WakuNode.new(lightNodeKey, ValidIpAddress.init("0.0.0.0"), Port(60010))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] bridgeNodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60012)) bridgeNode = WakuNode.new(bridgeNodeKey, ValidIpAddress.init("0.0.0.0"), Port(60012))
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] destNodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60013)) destNode = WakuNode.new(destNodeKey, ValidIpAddress.init("0.0.0.0"), Port(60013))
let await allFutures(destNode.start(), bridgeNode.start(), lightNode.start())
pubSubTopic = "test"
contentTopic = ContentTopic("/waku/2/default-content/proto")
payload = "hello world".toBytes()
message = WakuMessage(payload: payload, contentTopic: contentTopic)
# Light node, only lightpush await destNode.mountRelay(@[DefaultPubsubTopic])
await node1.start() await bridgeNode.mountRelay(@[DefaultPubsubTopic])
await node1.mountLightPush() await bridgeNode.mountLightPush()
await lightNode.mountLightPush()
# Intermediate node discard await lightNode.peerManager.dialPeer(bridgeNode.peerInfo.toRemotePeerInfo(), WakuLightPushCodec)
await node2.start() await sleepAsync(100.milliseconds)
await node2.mountRelay(@[pubSubTopic]) await destNode.connectToNodes(@[bridgeNode.peerInfo.toRemotePeerInfo()])
await node2.mountLightPush()
# Receiving node ## Given
await node3.start() let message = fakeWakuMessage()
await node3.mountRelay(@[pubSubTopic])
discard await node1.peerManager.dialPeer(node2.switch.peerInfo.toRemotePeerInfo(), WakuLightPushCodec)
await sleepAsync(1.seconds)
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
var completionFutLightPush = newFuture[bool]()
var completionFutRelay = newFuture[bool]() var completionFutRelay = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data) let msg = WakuMessage.init(data).get()
if msg.isOk():
let val = msg.value()
check: check:
topic == pubSubTopic pubsubTopic == DefaultPubsubTopic
val.contentTopic == contentTopic msg == message
val.payload == payload
completionFutRelay.complete(true) completionFutRelay.complete(true)
destNode.subscribe(DefaultPubsubTopic, relayHandler)
node3.subscribe(pubSubTopic, relayHandler) # Wait for subscription to take effect
await sleepAsync(500.millis) await sleepAsync(100.millis)
proc handler(response: PushResponse) {.gcsafe, closure.} = ## When
debug "push response handler, expecting true" let lightpushRes = await lightNode.lightpush(DefaultPubsubTopic, message)
require (await completionFutRelay.withTimeout(5.seconds)) == true
## Then
check lightpushRes.isOk()
let response = lightpushRes.get()
check: check:
response.isSuccess == true response.isSuccess == true
completionFutLightPush.complete(true)
# Publishing with lightpush ## Cleanup
await node1.lightpush(pubSubTopic, message, handler) await allFutures(lightNode.stop(), bridgeNode.stop(), destNode.stop())
await sleepAsync(500.millis)
check:
(await completionFutRelay.withTimeout(1.seconds)) == true
(await completionFutLightPush.withTimeout(1.seconds)) == true
await allFutures([node1.stop(), node2.stop(), node3.stop()])

View File

@ -29,172 +29,185 @@ import
from std/times import getTime, toUnixFloat from std/times import getTime, toUnixFloat
const
DefaultPubsubTopic = "/waku/2/default-waku/proto"
DefaultContentTopic = ContentTopic("/waku/2/default-content/proto")
proc now(): Timestamp =
getNanosecondTime(getTime().toUnixFloat())
proc newTestMessageStore(): MessageStore = proc newTestMessageStore(): MessageStore =
let database = SqliteDatabase.init("", inMemory = true)[] let database = SqliteDatabase.init("", inMemory = true)[]
SqliteStore.init(database).tryGet() SqliteStore.init(database).tryGet()
proc fakeWakuMessage(
payload = "TEST-PAYLOAD",
contentTopic = DefaultContentTopic,
ts = now()
): WakuMessage =
WakuMessage(
payload: toBytes(payload),
contentTopic: contentTopic,
version: 1,
timestamp: ts
)
procSuite "WakuNode - Store": procSuite "WakuNode - Store":
let rng = crypto.newRng() let rng = crypto.newRng()
asyncTest "Store protocol returns expected message": asyncTest "Store protocol returns expected message":
## Setup
let let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000))
let
contentTopic = ContentTopic("/waku/2/default-content/proto")
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
var completionFut = newFuture[bool]() await allFutures(client.start(), server.start())
await server.mountStore(store=newTestMessageStore())
await client.mountStore()
await node1.start() client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo())
await node1.mountStore(store=newTestMessageStore())
await node2.start()
await node2.mountStore(store=newTestMessageStore())
node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) ## Given
let message = fakeWakuMessage()
server.wakuStore.handleMessage(DefaultPubsubTopic, message)
await sleepAsync(500.millis) ## When
let req = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])
let queryRes = await client.query(req)
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) ## Then
check queryRes.isOk()
proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} = let response = queryRes.get()
check: check:
response.messages[0] == message response.messages == @[message]
completionFut.complete(true)
await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), storeHandler) # Cleanup
await allFutures(client.stop(), server.stop())
check:
(await completionFut.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
asyncTest "Store protocol returns expected message when relay is disabled and filter enabled": asyncTest "Store protocol returns expected message when relay is disabled and filter enabled":
# See nwaku issue #937: 'Store: ability to decouple store from relay' ## See nwaku issue #937: 'Store: ability to decouple store from relay'
## Setup
let let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] filterSourceKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) filterSource = WakuNode.new(filterSourceKey, ValidIpAddress.init("0.0.0.0"), Port(60004))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000))
let await allFutures(client.start(), server.start(), filterSource.start())
pubSubTopic = "/waku/2/default-waku/proto"
contentTopic = ContentTopic("/waku/2/default-content/proto")
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
let await filterSource.mountFilter()
filterComplFut = newFuture[bool]() await server.mountStore(store=newTestMessageStore())
storeComplFut = newFuture[bool]() await server.mountFilter()
await client.mountStore()
await node1.start() server.wakuFilter.setPeer(filterSource.peerInfo.toRemotePeerInfo())
await node1.mountStore(store=newTestMessageStore()) client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo())
await node1.mountFilter()
await node2.start() ## Given
await node2.mountStore(store=newTestMessageStore()) let message = fakeWakuMessage()
await node2.mountFilter()
node2.wakuFilter.setPeer(node1.switch.peerInfo.toRemotePeerInfo())
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
## Then
let filterFut = newFuture[bool]()
proc filterReqHandler(msg: WakuMessage) {.gcsafe, closure.} = proc filterReqHandler(msg: WakuMessage) {.gcsafe, closure.} =
check: check:
msg == message msg == message
filterComplFut.complete(true) filterFut.complete(true)
await node2.subscribe(FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true), filterReqHandler) let filterReq = FilterRequest(pubSubTopic: DefaultPubsubTopic, contentFilters: @[ContentFilter(contentTopic: DefaultContentTopic)], subscribe: true)
await server.subscribe(filterReq, filterReqHandler)
await sleepAsync(500.millis) await sleepAsync(100.millis)
# Send filter push message to node2 # Send filter push message to server from source node
await node1.wakuFilter.handleMessage(pubSubTopic, message) await filterSource.wakuFilter.handleMessage(DefaultPubsubTopic, message)
await sleepAsync(500.millis) # Wait for the server filter to receive the push message
require (await filterFut.withTimeout(5.seconds))
# Wait for the node2 filter to receive the push message let res = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]))
check:
(await filterComplFut.withTimeout(5.seconds)) == true
proc node1StoreQueryRespHandler(response: HistoryResponse) {.gcsafe, closure.} = ## Then
check res.isOk()
let response = res.get()
check: check:
response.messages.len == 1 response.messages.len == 1
response.messages[0] == message response.messages[0] == message
storeComplFut.complete(true)
await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), node1StoreQueryRespHandler) ## Cleanup
await allFutures(client.stop(), server.stop(), filterSource.stop())
check:
(await storeComplFut.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
asyncTest "Resume proc fetches the history": asyncTest "Resume proc fetches the history":
## Setup
let let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000))
let
contentTopic = ContentTopic("/waku/2/default-content/proto")
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
await node1.start()
await node1.mountStore(store=newTestMessageStore())
await node2.start()
await node2.mountStore(store=StoreQueueRef.new())
node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
await sleepAsync(500.millis)
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
await node1.resume()
check:
# message is correctly stored
node1.wakuStore.store.getMessagesCount().tryGet() == 1
await node1.stop()
await node2.stop()
asyncTest "Resume proc discards duplicate messages":
let timeOrigin = getNanosecondTime(getTime().toUnixFloat())
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
client = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
server = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
let
contentTopic = ContentTopic("/waku/2/default-content/proto")
msg1 = WakuMessage(payload: "hello world1".toBytes(), contentTopic: contentTopic, timestamp: timeOrigin + 1)
msg2 = WakuMessage(payload: "hello world2".toBytes(), contentTopic: contentTopic, timestamp: timeOrigin + 2)
msg3 = WakuMessage(payload: "hello world3".toBytes(), contentTopic: contentTopic, timestamp: timeOrigin + 3)
await allFutures(client.start(), server.start()) await allFutures(client.start(), server.start())
await server.mountStore(store=newTestMessageStore())
await client.mountStore(store=StoreQueueRef.new())
client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo())
## Given
let message = fakeWakuMessage()
server.wakuStore.handleMessage(DefaultPubsubTopic, message)
## When
await client.resume()
# Then
check:
client.wakuStore.store.getMessagesCount().tryGet() == 1
## Cleanup
await allFutures(client.stop(), server.stop())
asyncTest "Resume proc discards duplicate messages":
## Setup
let
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000))
await allFutures(server.start(), client.start())
await client.mountStore(store=StoreQueueRef.new()) await client.mountStore(store=StoreQueueRef.new())
await server.mountStore(store=StoreQueueRef.new()) await server.mountStore(store=StoreQueueRef.new())
client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo())
## Given
let timeOrigin = now()
let
msg1 = fakeWakuMessage(payload="hello world1", ts=(timeOrigin + getNanoSecondTime(1)))
msg2 = fakeWakuMessage(payload="hello world2", ts=(timeOrigin + getNanoSecondTime(2)))
msg3 = fakeWakuMessage(payload="hello world3", ts=(timeOrigin + getNanoSecondTime(3)))
server.wakuStore.handleMessage(DefaultTopic, msg1) server.wakuStore.handleMessage(DefaultTopic, msg1)
server.wakuStore.handleMessage(DefaultTopic, msg2) server.wakuStore.handleMessage(DefaultTopic, msg2)
client.wakuStore.setPeer(server.switch.peerInfo.toRemotePeerInfo())
# Insert the same message in both node's store # Insert the same message in both node's store
let index3 = Index.compute(msg3, getNanosecondTime(getTime().toUnixFloat() + 10.float), DefaultTopic) let index3 = Index.compute(msg3, getNanosecondTime(getTime().toUnixFloat() + 10.float), DefaultTopic)
require server.wakuStore.store.put(index3, msg3, DefaultTopic).isOk() require server.wakuStore.store.put(index3, msg3, DefaultTopic).isOk()
require client.wakuStore.store.put(index3, msg3, DefaultTopic).isOk() require client.wakuStore.store.put(index3, msg3, DefaultTopic).isOk()
# now run the resume proc ## When
await client.resume() await client.resume()
## Then
check: check:
# If the duplicates are discarded properly, then the total number of messages after resume should be 3 # If the duplicates are discarded properly, then the total number of messages after resume should be 3
client.wakuStore.store.getMessagesCount().tryGet() == 3 client.wakuStore.store.getMessagesCount().tryGet() == 3

View File

@ -26,23 +26,20 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
## Returns history for a list of content topics with optional paging ## Returns history for a list of content topics with optional paging
debug "get_waku_v2_store_v1_messages" debug "get_waku_v2_store_v1_messages"
var responseFut = newFuture[StoreResponse]()
proc queryFuncHandler(response: HistoryResponse) {.gcsafe, closure.} =
debug "get_waku_v2_store_v1_messages response"
responseFut.complete(response.toStoreResponse())
let historyQuery = HistoryQuery(pubsubTopic: if pubsubTopicOption.isSome: pubsubTopicOption.get() else: "", let historyQuery = HistoryQuery(pubsubTopic: if pubsubTopicOption.isSome: pubsubTopicOption.get() else: "",
contentFilters: if contentFiltersOption.isSome: contentFiltersOption.get() else: @[], contentFilters: if contentFiltersOption.isSome: contentFiltersOption.get() else: @[],
startTime: if startTime.isSome: startTime.get() else: Timestamp(0), startTime: if startTime.isSome: startTime.get() else: Timestamp(0),
endTime: if endTime.isSome: endTime.get() else: Timestamp(0), endTime: if endTime.isSome: endTime.get() else: Timestamp(0),
pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo()) pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo())
let req = node.query(historyQuery)
await node.query(historyQuery, queryFuncHandler) if not (await req.withTimeout(futTimeout)):
if (await responseFut.withTimeout(futTimeout)):
# Future completed
return responseFut.read()
else:
# Future failed to complete # Future failed to complete
raise newException(ValueError, "No history response received") raise newException(ValueError, "No history response received (timeout)")
let res = req.read()
if res.isErr():
raise newException(ValueError, $res.error())
debug "get_waku_v2_store_v1_messages response"
return res.value.toStoreResponse()

View File

@ -198,6 +198,9 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
return wakuNode return wakuNode
proc peerInfo*(node: WakuNode): PeerInfo =
node.switch.peerInfo
proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) = proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) =
if node.wakuRelay.isNil: if node.wakuRelay.isNil:
error "Invalid API call to `subscribe`. WakuRelay not mounted." error "Invalid API call to `subscribe`. WakuRelay not mounted."
@ -354,19 +357,8 @@ proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage): Future[Waku
let rpc = PushRequest(pubSubTopic: topic, message: message) let rpc = PushRequest(pubSubTopic: topic, message: message)
return await node.wakuLightPush.request(rpc) return await node.wakuLightPush.request(rpc)
proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage, handler: PushResponseHandler) {.async, gcsafe, proc lightpush2*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} =
deprecated: "Use the no-callback version of this method".} = discard await node.lightpush(topic, message)
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
## Returns whether relaying was successful or not in `handler`.
## `WakuMessage` should contain a `contentTopic` field for light node
## functionality.
let rpc = PushRequest(pubSubTopic: topic, message: message)
let res = await node.wakuLightPush.request(rpc)
if res.isOk():
handler(res.value)
else:
error "Message lightpush failed", error=res.error()
proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
## Queries known nodes for historical messages ## Queries known nodes for historical messages
@ -380,17 +372,6 @@ proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[History
# TODO: wakuSwap now part of wakuStore object # TODO: wakuSwap now part of wakuStore object
return await node.wakuStore.queryWithAccounting(query) return await node.wakuStore.queryWithAccounting(query)
proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe,
deprecated: "Use the no-callback version of this method".} =
## Queries known nodes for historical messages. Triggers the handler whenever a response is received.
## QueryHandlerFunc is a method that takes a HistoryResponse.
let res = await node.query(query)
if res.isOk():
handler(res.value)
else:
error "History query failed", error=res.error()
proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} = proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} =
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online
## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages) ## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages)

View File

@ -246,9 +246,6 @@ proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) =
ws.peerManager.addPeer(peer, WakuStoreCodec) ws.peerManager.addPeer(peer, WakuStoreCodec)
waku_store_peers.inc() waku_store_peers.inc()
# TODO: Remove after converting the query method into a non-callback method
type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
proc query(w: WakuStore, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = proc query(w: WakuStore, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec) let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec)
if connOpt.isNone(): if connOpt.isNone():