From 797c82b0302ffe32f49641027b50b4822fb2f7c2 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Fri, 9 Sep 2022 15:54:16 +0200 Subject: [PATCH] refactor(wakunode): split wakunode store tests into a new file --- tests/all_tests_v2.nim | 1 + tests/v2/test_wakunode.nim | 193 +-------------------------- tests/v2/test_wakunode_store.nim | 216 +++++++++++++++++++++++++++++++ 3 files changed, 218 insertions(+), 192 deletions(-) create mode 100644 tests/v2/test_wakunode_store.nim diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 9c02d1a28..0dd1c672c 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -6,6 +6,7 @@ import ./v2/test_wakunode_lightpush, ./v2/test_waku_store_rpc_codec, ./v2/test_waku_store, + ./v2/test_wakunode_store, ./v2/test_waku_filter, ./v2/test_wakunode_filter, ./v2/test_waku_payload, diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 6d9e77ea9..70471bd2c 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -12,21 +12,13 @@ import libp2p/protocols/pubsub/gossipsub, libp2p/nameresolving/mockresolver, eth/keys, - ../../waku/v2/node/storage/sqlite, - ../../waku/v2/node/storage/message/waku_message_store, - ../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/protocol/[waku_relay, waku_message], - ../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_filter, ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/utils/peers, - ../../waku/v2/utils/pagination, - ../../waku/v2/utils/time, ../../waku/v2/node/wakunode2 -from std/times import epochTime - template sourceDir: string = currentSourcePath.parentDir() const KEY_PATH = sourceDir / "resources/test_key.pem" @@ -34,102 +26,7 @@ const CERT_PATH = sourceDir / "resources/test_cert.pem" procSuite "WakuNode": let rng = crypto.newRng() - - asyncTest "Store protocol returns expected message": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - Port(60000)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - Port(60002)) - contentTopic = ContentTopic("/waku/2/default-content/proto") - message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) - - var completionFut = newFuture[bool]() - - await node1.start() - await node1.mountStore(persistMessages = true) - await node2.start() - await node2.mountStore(persistMessages = true) - - await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) - - await sleepAsync(2000.millis) - - node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) - - proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} = - check: - response.messages[0] == message - completionFut.complete(true) - - await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), storeHandler) - - check: - (await completionFut.withTimeout(5.seconds)) == true - await node1.stop() - await node2.stop() - - asyncTest "Store protocol returns expected message when relay is disabled and filter enabled": - # See nwaku issue #937: 'Store: ability to decouple store from relay' - - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) - - pubSubTopic = "/waku/2/default-waku/proto" - contentTopic = ContentTopic("/waku/2/default-content/proto") - message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) - - filterComplFut = newFuture[bool]() - storeComplFut = newFuture[bool]() - - await node1.start() - await node1.mountStore(persistMessages = true) - await node1.mountFilter() - - await node2.start() - await node2.mountStore(persistMessages = true) - await node2.mountFilter() - - node2.wakuFilter.setPeer(node1.switch.peerInfo.toRemotePeerInfo()) - node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) - - proc filterReqHandler(msg: WakuMessage) {.gcsafe, closure.} = - check: - msg == message - filterComplFut.complete(true) - - await node2.subscribe(FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true), filterReqHandler) - - await sleepAsync(2000.millis) - - # Send filter push message to node2 - await node1.wakuFilter.handleMessage(pubSubTopic, message) - - await sleepAsync(2000.millis) - - # Wait for the node2 filter to receive the push message - check: - (await filterComplFut.withTimeout(5.seconds)) == true - - proc node1StoreQueryRespHandler(response: HistoryResponse) {.gcsafe, closure.} = - check: - response.messages.len == 1 - response.messages[0] == message - storeComplFut.complete(true) - - await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), node1StoreQueryRespHandler) - - check: - (await storeComplFut.withTimeout(5.seconds)) == true - - await node1.stop() - await node2.stop() - + asyncTest "Messages are correctly relayed": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] @@ -399,94 +296,6 @@ procSuite "WakuNode": await allFutures([node1.stop(), node2.stop()]) - asyncTest "Resume proc fetches the history": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - Port(60000)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - Port(60002)) - contentTopic = ContentTopic("/waku/2/default-content/proto") - message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) - - await node1.start() - await node1.mountStore(persistMessages = true) - await node2.start() - await node2.mountStore(persistMessages = true) - - await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) - - await sleepAsync(2000.millis) - - node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) - - await node1.resume() - - check: - # message is correctly stored - node1.wakuStore.messages.len == 1 - - await node1.stop() - await node2.stop() - - asyncTest "Resume proc discards duplicate messages": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - Port(60000)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - Port(60002)) - contentTopic = ContentTopic("/waku/2/default-content/proto") - msg1 = WakuMessage(payload: "hello world1".toBytes(), contentTopic: contentTopic, timestamp: 1) - msg2 = WakuMessage(payload: "hello world2".toBytes(), contentTopic: contentTopic, timestamp: 2) - - # setup sqlite database for node1 - let - database = SqliteDatabase.init("", inMemory = true)[] - store = WakuMessageStore.init(database)[] - - - var completionFut = newFuture[bool]() - - await node1.start() - await node1.mountStore(persistMessages = true, store = store) - await node2.start() - await node2.mountStore(persistMessages = true) - - await node2.wakuStore.handleMessage(DefaultTopic, msg1) - await node2.wakuStore.handleMessage(DefaultTopic, msg2) - - await sleepAsync(2000.millis) - - node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) - - - # populate db with msg1 to be a duplicate - let index1 = Index.compute(msg1, getNanosecondTime(epochTime()), DefaultTopic) - let output1 = store.put(index1, msg1, DefaultTopic) - check output1.isOk - discard node1.wakuStore.messages.put(index1, msg1, DefaultTopic) - - # now run the resume proc - await node1.resume() - - # count the total number of retrieved messages from the database - let res = store.getAllMessages() - check: - res.isOk() - - check: - # if the duplicates are discarded properly, then the total number of messages after resume should be 2 - # check no duplicates is in the messages field - node1.wakuStore.messages.len == 2 - # check no duplicates is in the db - res.value.len == 2 - - await node1.stop() - await node2.stop() - asyncTest "Maximum connections can be configured": let maxConnections = 2 diff --git a/tests/v2/test_wakunode_store.nim b/tests/v2/test_wakunode_store.nim new file mode 100644 index 000000000..78b034af5 --- /dev/null +++ b/tests/v2/test_wakunode_store.nim @@ -0,0 +1,216 @@ +{.used.} + +import + stew/byteutils, + stew/shims/net as stewNet, + testutils/unittests, + chronicles, + chronos, + libp2p/crypto/crypto, + libp2p/peerid, + libp2p/multiaddress, + libp2p/switch, + libp2p/protocols/pubsub/rpc/messages, + libp2p/protocols/pubsub/pubsub, + libp2p/protocols/pubsub/gossipsub, + eth/keys +import + ../../waku/v2/node/storage/sqlite, + ../../waku/v2/node/storage/message/waku_message_store, + ../../waku/v2/node/storage/message/waku_store_queue, + ../../waku/v2/protocol/[waku_relay, waku_message], + ../../waku/v2/protocol/waku_store, + ../../waku/v2/protocol/waku_filter, + ../../waku/v2/node/peer_manager/peer_manager, + ../../waku/v2/utils/peers, + ../../waku/v2/utils/pagination, + ../../waku/v2/utils/time, + ../../waku/v2/node/wakunode2 + +from std/times import epochTime + + +procSuite "WakuNode - Store": + let rng = keys.newRng() + + asyncTest "Store protocol returns expected message": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), + Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), + Port(60002)) + contentTopic = ContentTopic("/waku/2/default-content/proto") + message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) + + var completionFut = newFuture[bool]() + + await node1.start() + await node1.mountStore(persistMessages = true) + await node2.start() + await node2.mountStore(persistMessages = true) + + await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) + + await sleepAsync(500.millis) + + node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) + + proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} = + check: + response.messages[0] == message + completionFut.complete(true) + + await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), storeHandler) + + check: + (await completionFut.withTimeout(5.seconds)) == true + await node1.stop() + await node2.stop() + + asyncTest "Store protocol returns expected message when relay is disabled and filter enabled": + # See nwaku issue #937: 'Store: ability to decouple store from relay' + + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) + + let + pubSubTopic = "/waku/2/default-waku/proto" + contentTopic = ContentTopic("/waku/2/default-content/proto") + message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) + + let + filterComplFut = newFuture[bool]() + storeComplFut = newFuture[bool]() + + await node1.start() + await node1.mountStore(persistMessages = true) + await node1.mountFilter() + + await node2.start() + await node2.mountStore(persistMessages = true) + await node2.mountFilter() + + node2.wakuFilter.setPeer(node1.switch.peerInfo.toRemotePeerInfo()) + node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) + + proc filterReqHandler(msg: WakuMessage) {.gcsafe, closure.} = + check: + msg == message + filterComplFut.complete(true) + + await node2.subscribe(FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true), filterReqHandler) + + await sleepAsync(500.millis) + + # Send filter push message to node2 + await node1.wakuFilter.handleMessage(pubSubTopic, message) + + await sleepAsync(500.millis) + + # Wait for the node2 filter to receive the push message + check: + (await filterComplFut.withTimeout(5.seconds)) == true + + proc node1StoreQueryRespHandler(response: HistoryResponse) {.gcsafe, closure.} = + check: + response.messages.len == 1 + response.messages[0] == message + storeComplFut.complete(true) + + await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), node1StoreQueryRespHandler) + + check: + (await storeComplFut.withTimeout(5.seconds)) == true + + await node1.stop() + await node2.stop() + + asyncTest "Resume proc fetches the history": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) + + let + contentTopic = ContentTopic("/waku/2/default-content/proto") + message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) + + await node1.start() + await node1.mountStore(persistMessages = true) + await node2.start() + await node2.mountStore(persistMessages = true) + + await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) + + await sleepAsync(500.millis) + + node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) + + await node1.resume() + + check: + # message is correctly stored + node1.wakuStore.messages.len == 1 + + await node1.stop() + await node2.stop() + + asyncTest "Resume proc discards duplicate messages": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) + + let + contentTopic = ContentTopic("/waku/2/default-content/proto") + msg1 = WakuMessage(payload: "hello world1".toBytes(), contentTopic: contentTopic, timestamp: 1) + msg2 = WakuMessage(payload: "hello world2".toBytes(), contentTopic: contentTopic, timestamp: 2) + + # setup sqlite database for node1 + let + database = SqliteDatabase.init("", inMemory = true)[] + store = WakuMessageStore.init(database)[] + + await node1.start() + await node1.mountStore(persistMessages = true, store = store) + await node2.start() + await node2.mountStore(persistMessages = true) + + await node2.wakuStore.handleMessage(DefaultTopic, msg1) + await node2.wakuStore.handleMessage(DefaultTopic, msg2) + + await sleepAsync(500.millis) + + node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) + + + # populate db with msg1 to be a duplicate + let index1 = Index.compute(msg1, getNanosecondTime(epochTime()), DefaultTopic) + let output1 = store.put(index1, msg1, DefaultTopic) + check output1.isOk + discard node1.wakuStore.messages.put(index1, msg1, DefaultTopic) + + # now run the resume proc + await node1.resume() + + # count the total number of retrieved messages from the database + let res = store.getAllMessages() + check: + res.isOk() + + check: + # if the duplicates are discarded properly, then the total number of messages after resume should be 2 + # check no duplicates is in the messages field + node1.wakuStore.messages.len == 2 + # check no duplicates is in the db + res.value.len == 2 + + await node1.stop() + await node2.stop() \ No newline at end of file