diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index ce7c31a2e..3fab923ea 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -241,7 +241,7 @@ proc publish(c: Chat, line: string) = c.node.wakuRlnRelay.lastEpoch = message.proof.epoch if not c.node.wakuLightPush.isNil(): # Attempt lightpush - asyncSpawn c.node.lightpush(DefaultTopic, message, handler) + asyncSpawn c.node.lightpush2(DefaultTopic, message) else: asyncSpawn c.node.publish(DefaultTopic, message, handler) else: @@ -270,7 +270,7 @@ proc publish(c: Chat, line: string) = if not c.node.wakuLightPush.isNil(): # Attempt lightpush - asyncSpawn c.node.lightpush(DefaultTopic, message, handler) + asyncSpawn c.node.lightpush2(DefaultTopic, message) else: asyncSpawn c.node.publish(DefaultTopic, message) @@ -475,7 +475,9 @@ proc processInput(rfd: AsyncFD) {.async.} = echo &"{chatLine}" info "Hit store handler" - await node.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: chat.contentTopic)]), storeHandler) + let queryRes = await node.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: chat.contentTopic)])) + if queryRes.isOk(): + storeHandler(queryRes.value) # NOTE Must be mounted after relay if conf.lightpushnode != "": diff --git a/tests/v2/test_waku_swap.nim b/tests/v2/test_waku_swap.nim index fb8a1a31d..1bc8def80 100644 --- a/tests/v2/test_waku_swap.nim +++ b/tests/v2/test_waku_swap.nim @@ -1,9 +1,12 @@ {.used.} import - std/[options, tables, sets], + std/[options, tables, sets, times], + stew/byteutils, + stew/shims/net as stewNet, testutils/unittests, - chronos, chronicles, stew/shims/net as stewNet, stew/byteutils, + chronos, + chronicles, libp2p/switch, libp2p/protobuf/minprotobuf, libp2p/stream/[bufferstream, connection], @@ -17,7 +20,31 @@ import ../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/node/wakunode2, ../../waku/v2/utils/peers, - ../test_helpers, ./utils + ../../waku/v2/utils/time, + ../test_helpers, + ./utils + + +const + DefaultPubsubTopic = "/waku/2/default-waku/proto" + DefaultContentTopic = ContentTopic("/waku/2/default-content/proto") + + +proc now(): Timestamp = + getNanosecondTime(getTime().toUnixFloat()) + +proc fakeWakuMessage( + payload = "TEST-PAYLOAD", + contentTopic = DefaultContentTopic, + ts = now() +): WakuMessage = + WakuMessage( + payload: toBytes(payload), + contentTopic: contentTopic, + version: 1, + timestamp: ts + ) + procSuite "Waku SWAP Accounting": test "Handshake Encode/Decode": @@ -50,98 +77,91 @@ procSuite "Waku SWAP Accounting": # With current logic state isn't updated because of bad cheque # Consider moving this test to e2e test, and/or move swap module to be on by default asyncTest "Update accounting state after store operations": + ## Setup let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60001)) - - let - contentTopic = ContentTopic("/waku/2/default-content/proto") - message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) - - var completionFut = newFuture[bool]() + serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002)) + clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000)) # Start nodes and mount protocols - await node1.start() - await node1.mountSwap() - await node1.mountStore(store=StoreQueueRef.new()) - await node2.start() - await node2.mountSwap() - await node2.mountStore(store=StoreQueueRef.new()) + await allFutures(client.start(), server.start()) + await server.mountSwap() + await server.mountStore(store=StoreQueueRef.new()) + await client.mountSwap() + await client.mountStore() - node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) + client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo()) + client.wakuSwap.setPeer(server.peerInfo.toRemotePeerInfo()) + server.wakuSwap.setPeer(client.peerInfo.toRemotePeerInfo()) + + ## Given + let message = fakeWakuMessage() - await sleepAsync(500.millis) + server.wakuStore.handleMessage(DefaultPubsubTopic, message) - node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) - node1.wakuSwap.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) - node2.wakuSwap.setPeer(node1.switch.peerInfo.toRemotePeerInfo()) + ## When + let queryRes = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])) - proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} = - debug "storeHandler hit" - check: - response.messages[0] == message - completionFut.complete(true) + ## Then + check queryRes.isOk() - await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), storeHandler) + let response = queryRes.get() + check: + response.messages == @[message] check: - (await completionFut.withTimeout(5.seconds)) == true - # Accounting table updated with credit and debit, respectively - node1.wakuSwap.accounting[node2.switch.peerInfo.peerId] == 1 - node2.wakuSwap.accounting[node1.switch.peerInfo.peerId] == -1 - await node1.stop() - await node2.stop() + client.wakuSwap.accounting[server.peerInfo.peerId] == 1 + server.wakuSwap.accounting[client.peerInfo.peerId] == -1 + + ## Cleanup + await allFutures(client.stop(), server.stop()) + - # TODO Add cheque here # This test will only Be checked if in Mock mode + # TODO: Add cheque here asyncTest "Update accounting state after sending cheque": + ## Setup let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60001)) - - let - contentTopic = ContentTopic("/waku/2/default-content/proto") - message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) - - var futures = [newFuture[bool](), newFuture[bool]()] - + serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002)) + clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000)) + # Define the waku swap Config for this test let swapConfig = SwapConfig(mode: SwapMode.Mock, paymentThreshold: 1, disconnectThreshold: -1) # Start nodes and mount protocols - await node1.start() - await node1.mountSwap(swapConfig) - await node1.mountStore(store=StoreQueueRef.new()) - await node2.start() - await node2.mountSwap(swapConfig) - await node2.mountStore(store=StoreQueueRef.new()) + await allFutures(client.start(), server.start()) + await server.mountSwap(swapConfig) + await server.mountStore(store=StoreQueueRef.new()) + await client.mountSwap(swapConfig) + await client.mountStore() - node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) + client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo()) + client.wakuSwap.setPeer(server.peerInfo.toRemotePeerInfo()) + server.wakuSwap.setPeer(client.peerInfo.toRemotePeerInfo()) + + ## Given + let message = fakeWakuMessage() - await sleepAsync(500.millis) + server.wakuStore.handleMessage(DefaultPubsubTopic, message) - node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) - node1.wakuSwap.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) - node2.wakuSwap.setPeer(node1.switch.peerInfo.toRemotePeerInfo()) + ## When + # TODO: Handshakes - for now we assume implicit, e2e still works for PoC + let res1 = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])) + let res2 = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])) - proc handler1(response: HistoryResponse) {.gcsafe, closure.} = - futures[0].complete(true) - proc handler2(response: HistoryResponse) {.gcsafe, closure.} = - futures[1].complete(true) - - # TODO Handshakes - for now we assume implicit, e2e still works for PoC - await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), handler1) - await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), handler2) + ## Then + check: + res1.isOk() + res2.isOk() check: - (await allFutures(futures).withTimeout(5.seconds)) == true # Accounting table updated with credit and debit, respectively # After sending a cheque the balance is partially adjusted - node1.wakuSwap.accounting[node2.switch.peerInfo.peerId] == 1 - node2.wakuSwap.accounting[node1.switch.peerInfo.peerId] == -1 - await node1.stop() - await node2.stop() + client.wakuSwap.accounting[server.peerInfo.peerId] == 1 + server.wakuSwap.accounting[client.peerInfo.peerId] == -1 + + ## Cleanup + await allFutures(client.stop(), server.stop()) diff --git a/tests/v2/test_wakunode_lightpush.nim b/tests/v2/test_wakunode_lightpush.nim index f977c70b9..635ce8956 100644 --- a/tests/v2/test_wakunode_lightpush.nim +++ b/tests/v2/test_wakunode_lightpush.nim @@ -12,72 +12,83 @@ import ../../waku/v2/protocol/waku_lightpush, ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/utils/peers, + ../../waku/v2/utils/time, ../../waku/v2/node/wakunode2 +from std/times import getTime, toUnixFloat + + +const + DefaultPubsubTopic = "/waku/2/default-waku/proto" + DefaultContentTopic = ContentTopic("/waku/2/default-content/proto") + +proc now(): Timestamp = + getNanosecondTime(getTime().toUnixFloat()) + +proc fakeWakuMessage( + payload = "TEST-PAYLOAD", + contentTopic = DefaultContentTopic, + ts = now() +): WakuMessage = + WakuMessage( + payload: toBytes(payload), + contentTopic: contentTopic, + version: 1, + timestamp: ts + ) + procSuite "WakuNode - Lightpush": let rng = crypto.newRng() asyncTest "Lightpush message return success": + ## Setup let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60010)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60012)) - nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60013)) + lightNodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + lightNode = WakuNode.new(lightNodeKey, ValidIpAddress.init("0.0.0.0"), Port(60010)) + bridgeNodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + bridgeNode = WakuNode.new(bridgeNodeKey, ValidIpAddress.init("0.0.0.0"), Port(60012)) + destNodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + destNode = WakuNode.new(destNodeKey, ValidIpAddress.init("0.0.0.0"), Port(60013)) - let - pubSubTopic = "test" - contentTopic = ContentTopic("/waku/2/default-content/proto") - payload = "hello world".toBytes() - message = WakuMessage(payload: payload, contentTopic: contentTopic) + await allFutures(destNode.start(), bridgeNode.start(), lightNode.start()) - # Light node, only lightpush - await node1.start() - await node1.mountLightPush() + await destNode.mountRelay(@[DefaultPubsubTopic]) + await bridgeNode.mountRelay(@[DefaultPubsubTopic]) + await bridgeNode.mountLightPush() + await lightNode.mountLightPush() + + discard await lightNode.peerManager.dialPeer(bridgeNode.peerInfo.toRemotePeerInfo(), WakuLightPushCodec) + await sleepAsync(100.milliseconds) + await destNode.connectToNodes(@[bridgeNode.peerInfo.toRemotePeerInfo()]) - # Intermediate node - await node2.start() - await node2.mountRelay(@[pubSubTopic]) - await node2.mountLightPush() + ## Given + let message = fakeWakuMessage() - # Receiving node - await node3.start() - await node3.mountRelay(@[pubSubTopic]) - - discard await node1.peerManager.dialPeer(node2.switch.peerInfo.toRemotePeerInfo(), WakuLightPushCodec) - await sleepAsync(1.seconds) - await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - - var completionFutLightPush = newFuture[bool]() var completionFutRelay = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) - if msg.isOk(): - let val = msg.value() - check: - topic == pubSubTopic - val.contentTopic == contentTopic - val.payload == payload - completionFutRelay.complete(true) - - node3.subscribe(pubSubTopic, relayHandler) - await sleepAsync(500.millis) - - proc handler(response: PushResponse) {.gcsafe, closure.} = - debug "push response handler, expecting true" + proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data).get() check: - response.isSuccess == true - completionFutLightPush.complete(true) + pubsubTopic == DefaultPubsubTopic + msg == message + completionFutRelay.complete(true) + destNode.subscribe(DefaultPubsubTopic, relayHandler) - # Publishing with lightpush - await node1.lightpush(pubSubTopic, message, handler) - await sleepAsync(500.millis) + # Wait for subscription to take effect + await sleepAsync(100.millis) + ## When + let lightpushRes = await lightNode.lightpush(DefaultPubsubTopic, message) + + require (await completionFutRelay.withTimeout(5.seconds)) == true + + ## Then + check lightpushRes.isOk() + + let response = lightpushRes.get() check: - (await completionFutRelay.withTimeout(1.seconds)) == true - (await completionFutLightPush.withTimeout(1.seconds)) == true + response.isSuccess == true - await allFutures([node1.stop(), node2.stop(), node3.stop()]) + ## Cleanup + await allFutures(lightNode.stop(), bridgeNode.stop(), destNode.stop()) \ No newline at end of file diff --git a/tests/v2/test_wakunode_store.nim b/tests/v2/test_wakunode_store.nim index e751f72e8..fd529de5a 100644 --- a/tests/v2/test_wakunode_store.nim +++ b/tests/v2/test_wakunode_store.nim @@ -29,172 +29,185 @@ import from std/times import getTime, toUnixFloat + +const + DefaultPubsubTopic = "/waku/2/default-waku/proto" + DefaultContentTopic = ContentTopic("/waku/2/default-content/proto") + +proc now(): Timestamp = + getNanosecondTime(getTime().toUnixFloat()) + proc newTestMessageStore(): MessageStore = let database = SqliteDatabase.init("", inMemory = true)[] SqliteStore.init(database).tryGet() +proc fakeWakuMessage( + payload = "TEST-PAYLOAD", + contentTopic = DefaultContentTopic, + ts = now() +): WakuMessage = + WakuMessage( + payload: toBytes(payload), + contentTopic: contentTopic, + version: 1, + timestamp: ts + ) + procSuite "WakuNode - Store": let rng = crypto.newRng() asyncTest "Store protocol returns expected message": + ## Setup let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) - let - contentTopic = ContentTopic("/waku/2/default-content/proto") - message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) + serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002)) + clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000)) - var completionFut = newFuture[bool]() + await allFutures(client.start(), server.start()) + await server.mountStore(store=newTestMessageStore()) + await client.mountStore() - await node1.start() - await node1.mountStore(store=newTestMessageStore()) - await node2.start() - await node2.mountStore(store=newTestMessageStore()) + client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo()) - node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) + ## Given + let message = fakeWakuMessage() + server.wakuStore.handleMessage(DefaultPubsubTopic, message) - await sleepAsync(500.millis) - - node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) - - proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} = - check: - response.messages[0] == message - completionFut.complete(true) - - await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), storeHandler) + ## When + let req = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]) + let queryRes = await client.query(req) + + ## Then + check queryRes.isOk() + let response = queryRes.get() check: - (await completionFut.withTimeout(5.seconds)) == true - await node1.stop() - await node2.stop() + response.messages == @[message] + + # Cleanup + await allFutures(client.stop(), server.stop()) asyncTest "Store protocol returns expected message when relay is disabled and filter enabled": - # See nwaku issue #937: 'Store: ability to decouple store from relay' - + ## See nwaku issue #937: 'Store: ability to decouple store from relay' + ## Setup let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) + filterSourceKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + filterSource = WakuNode.new(filterSourceKey, ValidIpAddress.init("0.0.0.0"), Port(60004)) + serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002)) + clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000)) - let - pubSubTopic = "/waku/2/default-waku/proto" - contentTopic = ContentTopic("/waku/2/default-content/proto") - message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) + await allFutures(client.start(), server.start(), filterSource.start()) - let - filterComplFut = newFuture[bool]() - storeComplFut = newFuture[bool]() + await filterSource.mountFilter() + await server.mountStore(store=newTestMessageStore()) + await server.mountFilter() + await client.mountStore() - await node1.start() - await node1.mountStore(store=newTestMessageStore()) - await node1.mountFilter() + server.wakuFilter.setPeer(filterSource.peerInfo.toRemotePeerInfo()) + client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo()) - await node2.start() - await node2.mountStore(store=newTestMessageStore()) - await node2.mountFilter() - - node2.wakuFilter.setPeer(node1.switch.peerInfo.toRemotePeerInfo()) - node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) + ## Given + let message = fakeWakuMessage() + ## Then + let filterFut = newFuture[bool]() proc filterReqHandler(msg: WakuMessage) {.gcsafe, closure.} = check: msg == message - filterComplFut.complete(true) + filterFut.complete(true) - await node2.subscribe(FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true), filterReqHandler) + let filterReq = FilterRequest(pubSubTopic: DefaultPubsubTopic, contentFilters: @[ContentFilter(contentTopic: DefaultContentTopic)], subscribe: true) + await server.subscribe(filterReq, filterReqHandler) - await sleepAsync(500.millis) + await sleepAsync(100.millis) - # Send filter push message to node2 - await node1.wakuFilter.handleMessage(pubSubTopic, message) + # Send filter push message to server from source node + await filterSource.wakuFilter.handleMessage(DefaultPubsubTopic, message) - await sleepAsync(500.millis) + # Wait for the server filter to receive the push message + require (await filterFut.withTimeout(5.seconds)) - # Wait for the node2 filter to receive the push message + let res = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])) + + ## Then + check res.isOk() + + let response = res.get() check: - (await filterComplFut.withTimeout(5.seconds)) == true + response.messages.len == 1 + response.messages[0] == message - proc node1StoreQueryRespHandler(response: HistoryResponse) {.gcsafe, closure.} = - check: - response.messages.len == 1 - response.messages[0] == message - storeComplFut.complete(true) + ## Cleanup + await allFutures(client.stop(), server.stop(), filterSource.stop()) - await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), node1StoreQueryRespHandler) - - check: - (await storeComplFut.withTimeout(5.seconds)) == true - - await node1.stop() - await node2.stop() asyncTest "Resume proc fetches the history": + ## Setup let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) - - let - contentTopic = ContentTopic("/waku/2/default-content/proto") - message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) - - await node1.start() - await node1.mountStore(store=newTestMessageStore()) - await node2.start() - await node2.mountStore(store=StoreQueueRef.new()) - - node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) - - await sleepAsync(500.millis) - - node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) - - await node1.resume() - - check: - # message is correctly stored - node1.wakuStore.store.getMessagesCount().tryGet() == 1 - - await node1.stop() - await node2.stop() - - asyncTest "Resume proc discards duplicate messages": - let timeOrigin = getNanosecondTime(getTime().toUnixFloat()) - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - client = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - server = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) - - let - contentTopic = ContentTopic("/waku/2/default-content/proto") - msg1 = WakuMessage(payload: "hello world1".toBytes(), contentTopic: contentTopic, timestamp: timeOrigin + 1) - msg2 = WakuMessage(payload: "hello world2".toBytes(), contentTopic: contentTopic, timestamp: timeOrigin + 2) - msg3 = WakuMessage(payload: "hello world3".toBytes(), contentTopic: contentTopic, timestamp: timeOrigin + 3) + serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002)) + clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000)) await allFutures(client.start(), server.start()) + + await server.mountStore(store=newTestMessageStore()) + await client.mountStore(store=StoreQueueRef.new()) + + client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo()) + + ## Given + let message = fakeWakuMessage() + server.wakuStore.handleMessage(DefaultPubsubTopic, message) + + ## When + await client.resume() + + # Then + check: + client.wakuStore.store.getMessagesCount().tryGet() == 1 + + ## Cleanup + await allFutures(client.stop(), server.stop()) + + + asyncTest "Resume proc discards duplicate messages": + ## Setup + let + serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002)) + clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000)) + + await allFutures(server.start(), client.start()) await client.mountStore(store=StoreQueueRef.new()) await server.mountStore(store=StoreQueueRef.new()) + client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo()) + + ## Given + let timeOrigin = now() + let + msg1 = fakeWakuMessage(payload="hello world1", ts=(timeOrigin + getNanoSecondTime(1))) + msg2 = fakeWakuMessage(payload="hello world2", ts=(timeOrigin + getNanoSecondTime(2))) + msg3 = fakeWakuMessage(payload="hello world3", ts=(timeOrigin + getNanoSecondTime(3))) + server.wakuStore.handleMessage(DefaultTopic, msg1) server.wakuStore.handleMessage(DefaultTopic, msg2) - client.wakuStore.setPeer(server.switch.peerInfo.toRemotePeerInfo()) - # Insert the same message in both node's store let index3 = Index.compute(msg3, getNanosecondTime(getTime().toUnixFloat() + 10.float), DefaultTopic) require server.wakuStore.store.put(index3, msg3, DefaultTopic).isOk() require client.wakuStore.store.put(index3, msg3, DefaultTopic).isOk() - # now run the resume proc + ## When await client.resume() + ## Then check: # If the duplicates are discarded properly, then the total number of messages after resume should be 3 client.wakuStore.store.getMessagesCount().tryGet() == 3 diff --git a/waku/v2/node/jsonrpc/store_api.nim b/waku/v2/node/jsonrpc/store_api.nim index 0097c2cda..e7ecc8655 100644 --- a/waku/v2/node/jsonrpc/store_api.nim +++ b/waku/v2/node/jsonrpc/store_api.nim @@ -26,23 +26,20 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = ## Returns history for a list of content topics with optional paging debug "get_waku_v2_store_v1_messages" - var responseFut = newFuture[StoreResponse]() - - proc queryFuncHandler(response: HistoryResponse) {.gcsafe, closure.} = - debug "get_waku_v2_store_v1_messages response" - responseFut.complete(response.toStoreResponse()) - let historyQuery = HistoryQuery(pubsubTopic: if pubsubTopicOption.isSome: pubsubTopicOption.get() else: "", contentFilters: if contentFiltersOption.isSome: contentFiltersOption.get() else: @[], startTime: if startTime.isSome: startTime.get() else: Timestamp(0), endTime: if endTime.isSome: endTime.get() else: Timestamp(0), pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo()) - - await node.query(historyQuery, queryFuncHandler) + let req = node.query(historyQuery) - if (await responseFut.withTimeout(futTimeout)): - # Future completed - return responseFut.read() - else: + if not (await req.withTimeout(futTimeout)): # Future failed to complete - raise newException(ValueError, "No history response received") \ No newline at end of file + raise newException(ValueError, "No history response received (timeout)") + + let res = req.read() + if res.isErr(): + raise newException(ValueError, $res.error()) + + debug "get_waku_v2_store_v1_messages response" + return res.value.toStoreResponse() diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 7f30ba00f..a69a68f96 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -198,6 +198,9 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey, return wakuNode +proc peerInfo*(node: WakuNode): PeerInfo = + node.switch.peerInfo + proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) = if node.wakuRelay.isNil: error "Invalid API call to `subscribe`. WakuRelay not mounted." @@ -354,19 +357,8 @@ proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage): Future[Waku let rpc = PushRequest(pubSubTopic: topic, message: message) return await node.wakuLightPush.request(rpc) -proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage, handler: PushResponseHandler) {.async, gcsafe, - deprecated: "Use the no-callback version of this method".} = - ## Pushes a `WakuMessage` to a node which relays it further on PubSub topic. - ## Returns whether relaying was successful or not in `handler`. - ## `WakuMessage` should contain a `contentTopic` field for light node - ## functionality. - - let rpc = PushRequest(pubSubTopic: topic, message: message) - let res = await node.wakuLightPush.request(rpc) - if res.isOk(): - handler(res.value) - else: - error "Message lightpush failed", error=res.error() +proc lightpush2*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} = + discard await node.lightpush(topic, message) proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = ## Queries known nodes for historical messages @@ -380,17 +372,6 @@ proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[History # TODO: wakuSwap now part of wakuStore object return await node.wakuStore.queryWithAccounting(query) -proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe, - deprecated: "Use the no-callback version of this method".} = - ## Queries known nodes for historical messages. Triggers the handler whenever a response is received. - ## QueryHandlerFunc is a method that takes a HistoryResponse. - - let res = await node.query(query) - if res.isOk(): - handler(res.value) - else: - error "History query failed", error=res.error() - proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} = ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online ## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages) diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index 1d2804a91..270456db1 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -246,9 +246,6 @@ proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) = ws.peerManager.addPeer(peer, WakuStoreCodec) waku_store_peers.inc() -# TODO: Remove after converting the query method into a non-callback method -type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.} - proc query(w: WakuStore, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec) if connOpt.isNone():