nwaku/tests/v2/waku_store/test_resume.nim

290 lines
9.3 KiB
Nim

{.used.}
import
std/[options, tables, sets],
testutils/unittests,
chronos,
chronicles,
libp2p/crypto/crypto
import
../../waku/common/sqlite,
../../waku/v2/node/message_store/sqlite_store,
../../waku/v2/node/peer_manager,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,
./testlib/common,
./testlib/switch
proc newTestDatabase(): SqliteDatabase =
SqliteDatabase.new("memory:").tryGet()
proc newTestArchiveDriver(): ArchiveDriver =
let database = SqliteDatabase.new(":memory:").tryGet()
SqliteDriver.init(database).tryGet()
proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): Future[WakuStore] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuStore.init(peerManager, rng, store)
await proto.start()
switch.mount(proto)
return proto
proc newTestWakuStoreClient(switch: Switch, store: MessageStore = nil): WakuStoreClient =
let
peerManager = PeerManager.new(switch)
WakuStoreClient.new(peerManager, rng, store)
procSuite "Waku Store - resume store":
## Fixtures
let storeA = block:
let store = newTestMessageStore()
let msgList = @[
fakeWakuMessage(payload= @[byte 0], contentTopic=ContentTopic("2"), ts=ts(0)),
fakeWakuMessage(payload= @[byte 1], contentTopic=ContentTopic("1"), ts=ts(1)),
fakeWakuMessage(payload= @[byte 2], contentTopic=ContentTopic("2"), ts=ts(2)),
fakeWakuMessage(payload= @[byte 3], contentTopic=ContentTopic("1"), ts=ts(3)),
fakeWakuMessage(payload= @[byte 4], contentTopic=ContentTopic("2"), ts=ts(4)),
fakeWakuMessage(payload= @[byte 5], contentTopic=ContentTopic("1"), ts=ts(5)),
fakeWakuMessage(payload= @[byte 6], contentTopic=ContentTopic("2"), ts=ts(6)),
fakeWakuMessage(payload= @[byte 7], contentTopic=ContentTopic("1"), ts=ts(7)),
fakeWakuMessage(payload= @[byte 8], contentTopic=ContentTopic("2"), ts=ts(8)),
fakeWakuMessage(payload= @[byte 9], contentTopic=ContentTopic("1"), ts=ts(9))
]
for msg in msgList:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
store
let storeB = block:
let store = newTestMessageStore()
let msgList2 = @[
fakeWakuMessage(payload= @[byte 0], contentTopic=ContentTopic("2"), ts=ts(0)),
fakeWakuMessage(payload= @[byte 11], contentTopic=ContentTopic("1"), ts=ts(1)),
fakeWakuMessage(payload= @[byte 12], contentTopic=ContentTopic("2"), ts=ts(2)),
fakeWakuMessage(payload= @[byte 3], contentTopic=ContentTopic("1"), ts=ts(3)),
fakeWakuMessage(payload= @[byte 4], contentTopic=ContentTopic("2"), ts=ts(4)),
fakeWakuMessage(payload= @[byte 5], contentTopic=ContentTopic("1"), ts=ts(5)),
fakeWakuMessage(payload= @[byte 13], contentTopic=ContentTopic("2"), ts=ts(6)),
fakeWakuMessage(payload= @[byte 14], contentTopic=ContentTopic("1"), ts=ts(7))
]
for msg in msgList2:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
store
asyncTest "multiple query to multiple peers with pagination":
## Setup
let
serverSwitchA = newTestSwitch()
serverSwitchB = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitchA.start(), serverSwitchB.start(), clientSwitch.start())
let
serverA = await newTestWakuStoreNode(serverSwitchA, store=testStore)
serverB = await newTestWakuStoreNode(serverSwitchB, store=testStore)
client = newTestWakuStoreClient(clientSwitch)
## Given
let peers = @[
serverSwitchA.peerInfo.toRemotePeerInfo(),
serverSwitchB.peerInfo.toRemotePeerInfo()
]
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 5)
## When
let res = await client.queryLoop(req, peers)
## Then
check:
res.isOk()
let response = res.tryGet()
check:
response.len == 10
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitchA.stop(), serverSwitchB.stop())
asyncTest "resume message history":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuStore(serverSwitch, store=storeA)
client = await newTestWakuStore(clientSwitch)
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
## When
let res = await client.resume()
## Then
check res.isOk()
let resumedMessagesCount = res.tryGet()
let storedMessagesCount = client.store.getMessagesCount().tryGet()
check:
resumedMessagesCount == 10
storedMessagesCount == 10
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "resume history from a list of candidates - offline peer":
## Setup
let
clientSwitch = newTestSwitch()
offlineSwitch = newTestSwitch()
await clientSwitch.start()
let client = await newTestWakuStore(clientSwitch)
## Given
let peers = @[offlineSwitch.peerInfo.toRemotePeerInfo()]
## When
let res = await client.resume(some(peers))
## Then
check res.isErr()
## Cleanup
await clientSwitch.stop()
asyncTest "resume history from a list of candidates - online and offline peers":
## Setup
let
offlineSwitch = newTestSwitch()
serverASwitch = newTestSwitch()
serverBSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverASwitch.start(), serverBSwitch.start(), clientSwitch.start())
let
serverA = await newTestWakuStore(serverASwitch, store=storeA)
serverB = await newTestWakuStore(serverBSwitch, store=storeB)
client = await newTestWakuStore(clientSwitch)
## Given
let peers = @[
offlineSwitch.peerInfo.toRemotePeerInfo(),
serverASwitch.peerInfo.toRemotePeerInfo(),
serverBSwitch.peerInfo.toRemotePeerInfo()
]
## When
let res = await client.resume(some(peers))
## Then
# `client` is expected to retrieve 14 messages:
# - The store mounted on `serverB` holds 10 messages (see `storeA` fixture)
# - The store mounted on `serverB` holds 7 messages (see `storeB` fixture)
# Both stores share 3 messages, resulting in 14 unique messages in total
check res.isOk()
let restoredMessagesCount = res.tryGet()
let storedMessagesCount = client.store.getMessagesCount().tryGet()
check:
restoredMessagesCount == 14
storedMessagesCount == 14
## Cleanup
await allFutures(serverASwitch.stop(), serverBSwitch.stop(), clientSwitch.stop())
suite "WakuNode - waku store":
asyncTest "Resume proc fetches the history":
## Setup
let
serverKey = generateSecp256k1Key()
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0))
clientKey = generateSecp256k1Key()
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
await allFutures(client.start(), server.start())
let driver = newTestArchiveDriver()
server.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
await server.mountStore()
let clientStore = StoreQueueRef.new()
await client.mountStore(store=clientStore)
client.mountStoreClient(store=clientStore)
## Given
let message = fakeWakuMessage()
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
let serverPeer = server.peerInfo.toRemotePeerInfo()
## When
await client.resume(some(@[serverPeer]))
# Then
check:
client.wakuStore.store.getMessagesCount().tryGet() == 1
## Cleanup
await allFutures(client.stop(), server.stop())
asyncTest "Resume proc discards duplicate messages":
## Setup
let
serverKey = generateSecp256k1Key()
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0))
clientKey = generateSecp256k1Key()
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
await allFutures(server.start(), client.start())
await server.mountStore(store=StoreQueueRef.new())
let clientStore = StoreQueueRef.new()
await client.mountStore(store=clientStore)
client.mountStoreClient(store=clientStore)
## Given
let timeOrigin = now()
let
msg1 = fakeWakuMessage(payload="hello world1", ts=(timeOrigin + getNanoSecondTime(1)))
msg2 = fakeWakuMessage(payload="hello world2", ts=(timeOrigin + getNanoSecondTime(2)))
msg3 = fakeWakuMessage(payload="hello world3", ts=(timeOrigin + getNanoSecondTime(3)))
require server.wakuStore.store.put(DefaultPubsubTopic, msg1).isOk()
require server.wakuStore.store.put(DefaultPubsubTopic, msg2).isOk()
# Insert the same message in both node's store
let
receivedTime3 = now() + getNanosecondTime(10)
digest3 = computeDigest(msg3)
require server.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk()
require client.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk()
let serverPeer = server.peerInfo.toRemotePeerInfo()
## When
await client.resume(some(@[serverPeer]))
## Then
check:
# If the duplicates are discarded properly, then the total number of messages after resume should be 3
client.wakuStore.store.getMessagesCount().tryGet() == 3
await allFutures(client.stop(), server.stop())