mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-26 14:51:49 +00:00
refactor(wakunode): split wakunode store tests into a new file
This commit is contained in:
parent
30f0e19d79
commit
797c82b030
@ -6,6 +6,7 @@ import
|
||||
./v2/test_wakunode_lightpush,
|
||||
./v2/test_waku_store_rpc_codec,
|
||||
./v2/test_waku_store,
|
||||
./v2/test_wakunode_store,
|
||||
./v2/test_waku_filter,
|
||||
./v2/test_wakunode_filter,
|
||||
./v2/test_waku_payload,
|
||||
|
@ -12,21 +12,13 @@ import
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/nameresolving/mockresolver,
|
||||
eth/keys,
|
||||
../../waku/v2/node/storage/sqlite,
|
||||
../../waku/v2/node/storage/message/waku_message_store,
|
||||
../../waku/v2/node/storage/message/waku_store_queue,
|
||||
../../waku/v2/protocol/[waku_relay, waku_message],
|
||||
../../waku/v2/protocol/waku_store,
|
||||
../../waku/v2/protocol/waku_filter,
|
||||
../../waku/v2/node/peer_manager/peer_manager,
|
||||
../../waku/v2/utils/peers,
|
||||
../../waku/v2/utils/pagination,
|
||||
../../waku/v2/utils/time,
|
||||
../../waku/v2/node/wakunode2
|
||||
|
||||
|
||||
from std/times import epochTime
|
||||
|
||||
|
||||
template sourceDir: string = currentSourcePath.parentDir()
|
||||
const KEY_PATH = sourceDir / "resources/test_key.pem"
|
||||
@ -34,102 +26,7 @@ const CERT_PATH = sourceDir / "resources/test_cert.pem"
|
||||
|
||||
procSuite "WakuNode":
|
||||
let rng = crypto.newRng()
|
||||
|
||||
asyncTest "Store protocol returns expected message":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
||||
var completionFut = newFuture[bool]()
|
||||
|
||||
await node1.start()
|
||||
await node1.mountStore(persistMessages = true)
|
||||
await node2.start()
|
||||
await node2.mountStore(persistMessages = true)
|
||||
|
||||
await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
|
||||
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
|
||||
|
||||
proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} =
|
||||
check:
|
||||
response.messages[0] == message
|
||||
completionFut.complete(true)
|
||||
|
||||
await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), storeHandler)
|
||||
|
||||
check:
|
||||
(await completionFut.withTimeout(5.seconds)) == true
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
|
||||
asyncTest "Store protocol returns expected message when relay is disabled and filter enabled":
|
||||
# See nwaku issue #937: 'Store: ability to decouple store from relay'
|
||||
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
|
||||
pubSubTopic = "/waku/2/default-waku/proto"
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
||||
filterComplFut = newFuture[bool]()
|
||||
storeComplFut = newFuture[bool]()
|
||||
|
||||
await node1.start()
|
||||
await node1.mountStore(persistMessages = true)
|
||||
await node1.mountFilter()
|
||||
|
||||
await node2.start()
|
||||
await node2.mountStore(persistMessages = true)
|
||||
await node2.mountFilter()
|
||||
|
||||
node2.wakuFilter.setPeer(node1.switch.peerInfo.toRemotePeerInfo())
|
||||
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
|
||||
|
||||
proc filterReqHandler(msg: WakuMessage) {.gcsafe, closure.} =
|
||||
check:
|
||||
msg == message
|
||||
filterComplFut.complete(true)
|
||||
|
||||
await node2.subscribe(FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true), filterReqHandler)
|
||||
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
# Send filter push message to node2
|
||||
await node1.wakuFilter.handleMessage(pubSubTopic, message)
|
||||
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
# Wait for the node2 filter to receive the push message
|
||||
check:
|
||||
(await filterComplFut.withTimeout(5.seconds)) == true
|
||||
|
||||
proc node1StoreQueryRespHandler(response: HistoryResponse) {.gcsafe, closure.} =
|
||||
check:
|
||||
response.messages.len == 1
|
||||
response.messages[0] == message
|
||||
storeComplFut.complete(true)
|
||||
|
||||
await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), node1StoreQueryRespHandler)
|
||||
|
||||
check:
|
||||
(await storeComplFut.withTimeout(5.seconds)) == true
|
||||
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
|
||||
|
||||
asyncTest "Messages are correctly relayed":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
@ -399,94 +296,6 @@ procSuite "WakuNode":
|
||||
|
||||
await allFutures([node1.stop(), node2.stop()])
|
||||
|
||||
asyncTest "Resume proc fetches the history":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
||||
await node1.start()
|
||||
await node1.mountStore(persistMessages = true)
|
||||
await node2.start()
|
||||
await node2.mountStore(persistMessages = true)
|
||||
|
||||
await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
|
||||
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
|
||||
|
||||
await node1.resume()
|
||||
|
||||
check:
|
||||
# message is correctly stored
|
||||
node1.wakuStore.messages.len == 1
|
||||
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
|
||||
asyncTest "Resume proc discards duplicate messages":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
msg1 = WakuMessage(payload: "hello world1".toBytes(), contentTopic: contentTopic, timestamp: 1)
|
||||
msg2 = WakuMessage(payload: "hello world2".toBytes(), contentTopic: contentTopic, timestamp: 2)
|
||||
|
||||
# setup sqlite database for node1
|
||||
let
|
||||
database = SqliteDatabase.init("", inMemory = true)[]
|
||||
store = WakuMessageStore.init(database)[]
|
||||
|
||||
|
||||
var completionFut = newFuture[bool]()
|
||||
|
||||
await node1.start()
|
||||
await node1.mountStore(persistMessages = true, store = store)
|
||||
await node2.start()
|
||||
await node2.mountStore(persistMessages = true)
|
||||
|
||||
await node2.wakuStore.handleMessage(DefaultTopic, msg1)
|
||||
await node2.wakuStore.handleMessage(DefaultTopic, msg2)
|
||||
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
|
||||
|
||||
|
||||
# populate db with msg1 to be a duplicate
|
||||
let index1 = Index.compute(msg1, getNanosecondTime(epochTime()), DefaultTopic)
|
||||
let output1 = store.put(index1, msg1, DefaultTopic)
|
||||
check output1.isOk
|
||||
discard node1.wakuStore.messages.put(index1, msg1, DefaultTopic)
|
||||
|
||||
# now run the resume proc
|
||||
await node1.resume()
|
||||
|
||||
# count the total number of retrieved messages from the database
|
||||
let res = store.getAllMessages()
|
||||
check:
|
||||
res.isOk()
|
||||
|
||||
check:
|
||||
# if the duplicates are discarded properly, then the total number of messages after resume should be 2
|
||||
# check no duplicates is in the messages field
|
||||
node1.wakuStore.messages.len == 2
|
||||
# check no duplicates is in the db
|
||||
res.value.len == 2
|
||||
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
|
||||
asyncTest "Maximum connections can be configured":
|
||||
let
|
||||
maxConnections = 2
|
||||
|
216
tests/v2/test_wakunode_store.nim
Normal file
216
tests/v2/test_wakunode_store.nim
Normal file
@ -0,0 +1,216 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
stew/byteutils,
|
||||
stew/shims/net as stewNet,
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/peerid,
|
||||
libp2p/multiaddress,
|
||||
libp2p/switch,
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
libp2p/protocols/pubsub/pubsub,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
eth/keys
|
||||
import
|
||||
../../waku/v2/node/storage/sqlite,
|
||||
../../waku/v2/node/storage/message/waku_message_store,
|
||||
../../waku/v2/node/storage/message/waku_store_queue,
|
||||
../../waku/v2/protocol/[waku_relay, waku_message],
|
||||
../../waku/v2/protocol/waku_store,
|
||||
../../waku/v2/protocol/waku_filter,
|
||||
../../waku/v2/node/peer_manager/peer_manager,
|
||||
../../waku/v2/utils/peers,
|
||||
../../waku/v2/utils/pagination,
|
||||
../../waku/v2/utils/time,
|
||||
../../waku/v2/node/wakunode2
|
||||
|
||||
from std/times import epochTime
|
||||
|
||||
|
||||
procSuite "WakuNode - Store":
|
||||
let rng = keys.newRng()
|
||||
|
||||
asyncTest "Store protocol returns expected message":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
||||
var completionFut = newFuture[bool]()
|
||||
|
||||
await node1.start()
|
||||
await node1.mountStore(persistMessages = true)
|
||||
await node2.start()
|
||||
await node2.mountStore(persistMessages = true)
|
||||
|
||||
await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
|
||||
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
|
||||
|
||||
proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} =
|
||||
check:
|
||||
response.messages[0] == message
|
||||
completionFut.complete(true)
|
||||
|
||||
await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), storeHandler)
|
||||
|
||||
check:
|
||||
(await completionFut.withTimeout(5.seconds)) == true
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
|
||||
asyncTest "Store protocol returns expected message when relay is disabled and filter enabled":
|
||||
# See nwaku issue #937: 'Store: ability to decouple store from relay'
|
||||
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
|
||||
let
|
||||
pubSubTopic = "/waku/2/default-waku/proto"
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
||||
let
|
||||
filterComplFut = newFuture[bool]()
|
||||
storeComplFut = newFuture[bool]()
|
||||
|
||||
await node1.start()
|
||||
await node1.mountStore(persistMessages = true)
|
||||
await node1.mountFilter()
|
||||
|
||||
await node2.start()
|
||||
await node2.mountStore(persistMessages = true)
|
||||
await node2.mountFilter()
|
||||
|
||||
node2.wakuFilter.setPeer(node1.switch.peerInfo.toRemotePeerInfo())
|
||||
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
|
||||
|
||||
proc filterReqHandler(msg: WakuMessage) {.gcsafe, closure.} =
|
||||
check:
|
||||
msg == message
|
||||
filterComplFut.complete(true)
|
||||
|
||||
await node2.subscribe(FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true), filterReqHandler)
|
||||
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
# Send filter push message to node2
|
||||
await node1.wakuFilter.handleMessage(pubSubTopic, message)
|
||||
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
# Wait for the node2 filter to receive the push message
|
||||
check:
|
||||
(await filterComplFut.withTimeout(5.seconds)) == true
|
||||
|
||||
proc node1StoreQueryRespHandler(response: HistoryResponse) {.gcsafe, closure.} =
|
||||
check:
|
||||
response.messages.len == 1
|
||||
response.messages[0] == message
|
||||
storeComplFut.complete(true)
|
||||
|
||||
await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), node1StoreQueryRespHandler)
|
||||
|
||||
check:
|
||||
(await storeComplFut.withTimeout(5.seconds)) == true
|
||||
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
|
||||
asyncTest "Resume proc fetches the history":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
|
||||
let
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
||||
await node1.start()
|
||||
await node1.mountStore(persistMessages = true)
|
||||
await node2.start()
|
||||
await node2.mountStore(persistMessages = true)
|
||||
|
||||
await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
|
||||
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
|
||||
|
||||
await node1.resume()
|
||||
|
||||
check:
|
||||
# message is correctly stored
|
||||
node1.wakuStore.messages.len == 1
|
||||
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
|
||||
asyncTest "Resume proc discards duplicate messages":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
|
||||
let
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
msg1 = WakuMessage(payload: "hello world1".toBytes(), contentTopic: contentTopic, timestamp: 1)
|
||||
msg2 = WakuMessage(payload: "hello world2".toBytes(), contentTopic: contentTopic, timestamp: 2)
|
||||
|
||||
# setup sqlite database for node1
|
||||
let
|
||||
database = SqliteDatabase.init("", inMemory = true)[]
|
||||
store = WakuMessageStore.init(database)[]
|
||||
|
||||
await node1.start()
|
||||
await node1.mountStore(persistMessages = true, store = store)
|
||||
await node2.start()
|
||||
await node2.mountStore(persistMessages = true)
|
||||
|
||||
await node2.wakuStore.handleMessage(DefaultTopic, msg1)
|
||||
await node2.wakuStore.handleMessage(DefaultTopic, msg2)
|
||||
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
|
||||
|
||||
|
||||
# populate db with msg1 to be a duplicate
|
||||
let index1 = Index.compute(msg1, getNanosecondTime(epochTime()), DefaultTopic)
|
||||
let output1 = store.put(index1, msg1, DefaultTopic)
|
||||
check output1.isOk
|
||||
discard node1.wakuStore.messages.put(index1, msg1, DefaultTopic)
|
||||
|
||||
# now run the resume proc
|
||||
await node1.resume()
|
||||
|
||||
# count the total number of retrieved messages from the database
|
||||
let res = store.getAllMessages()
|
||||
check:
|
||||
res.isOk()
|
||||
|
||||
check:
|
||||
# if the duplicates are discarded properly, then the total number of messages after resume should be 2
|
||||
# check no duplicates is in the messages field
|
||||
node1.wakuStore.messages.len == 2
|
||||
# check no duplicates is in the db
|
||||
res.value.len == 2
|
||||
|
||||
await node1.stop()
|
||||
await node2.stop()
|
Loading…
x
Reference in New Issue
Block a user