mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-18 02:41:47 +00:00
f54ba10bc7
* queue driver refactor (#2753) * chore(archive): archive refactor (#2752) * chore(archive): sqlite driver refactor (#2754) * chore(archive): postgres driver refactor (#2755) * chore(archive): renaming & copies (#2751) * posgres legacy: stop using the storedAt field * migration script 6: we still need the id column The id column is needed because it contains the message digest which is used in store v2, and we need to keep support to store v2 for a while * legacy archive: set target migration version to 6 * waku_node: try to use wakuLegacyArchive if wakuArchive is nil * node_factory, waku_node: mount legacy and future store simultaneously We want the nwaku node to simultaneously support store-v2 requests and store-v3 requests. Only the legacy archive is in charge of archiving messages, and the archived information is suitable to fulfill both store-v2 and store-v3 needs. * postgres_driver: adding temporary code until store-v2 is removed --------- Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Co-authored-by: Ivan Folgueira Bande <ivansete@status.im>
343 lines
10 KiB
Nim
343 lines
10 KiB
Nim
{.used.}
|
|
|
|
when defined(waku_exp_store_resume):
|
|
# TODO: Review store resume test cases (#1282)
|
|
# Ongoing changes to test code base had ruin this test meanwhile, need to investigate and fix
|
|
|
|
import
|
|
std/[options, tables, sets],
|
|
testutils/unittests,
|
|
chronos,
|
|
chronicles,
|
|
libp2p/crypto/crypto
|
|
import
|
|
waku/[
|
|
common/databases/db_sqlite,
|
|
waku_archive_legacy/driver,
|
|
waku_archive_legacy/driver/sqlite_driver/sqlite_driver,
|
|
node/peer_manager,
|
|
waku_core,
|
|
waku_core/message/digest,
|
|
waku_store_legacy,
|
|
],
|
|
../waku_store_legacy/store_utils,
|
|
../waku_archive_legacy/archive_utils,
|
|
./testlib/common,
|
|
./testlib/switch
|
|
|
|
procSuite "Waku Store - resume store":
|
|
## Fixtures
|
|
let storeA = block:
|
|
let store = newTestMessageStore()
|
|
let msgList =
|
|
@[
|
|
fakeWakuMessage(
|
|
payload = @[byte 0], contentTopic = ContentTopic("2"), ts = ts(0)
|
|
),
|
|
fakeWakuMessage(
|
|
payload = @[byte 1], contentTopic = ContentTopic("1"), ts = ts(1)
|
|
),
|
|
fakeWakuMessage(
|
|
payload = @[byte 2], contentTopic = ContentTopic("2"), ts = ts(2)
|
|
),
|
|
fakeWakuMessage(
|
|
payload = @[byte 3], contentTopic = ContentTopic("1"), ts = ts(3)
|
|
),
|
|
fakeWakuMessage(
|
|
payload = @[byte 4], contentTopic = ContentTopic("2"), ts = ts(4)
|
|
),
|
|
fakeWakuMessage(
|
|
payload = @[byte 5], contentTopic = ContentTopic("1"), ts = ts(5)
|
|
),
|
|
fakeWakuMessage(
|
|
payload = @[byte 6], contentTopic = ContentTopic("2"), ts = ts(6)
|
|
),
|
|
fakeWakuMessage(
|
|
payload = @[byte 7], contentTopic = ContentTopic("1"), ts = ts(7)
|
|
),
|
|
fakeWakuMessage(
|
|
payload = @[byte 8], contentTopic = ContentTopic("2"), ts = ts(8)
|
|
),
|
|
fakeWakuMessage(
|
|
payload = @[byte 9], contentTopic = ContentTopic("1"), ts = ts(9)
|
|
),
|
|
]
|
|
|
|
for msg in msgList:
|
|
require store
|
|
.put(
|
|
DefaultPubsubTopic,
|
|
msg,
|
|
computeDigest(msg),
|
|
computeMessageHash(DefaultPubsubTopic, msg),
|
|
msg.timestamp,
|
|
)
|
|
.isOk()
|
|
|
|
store
|
|
|
|
let storeB = block:
|
|
let store = newTestMessageStore()
|
|
let msgList2 =
|
|
@[
|
|
fakeWakuMessage(
|
|
payload = @[byte 0], contentTopic = ContentTopic("2"), ts = ts(0)
|
|
),
|
|
fakeWakuMessage(
|
|
payload = @[byte 11], contentTopic = ContentTopic("1"), ts = ts(1)
|
|
),
|
|
fakeWakuMessage(
|
|
payload = @[byte 12], contentTopic = ContentTopic("2"), ts = ts(2)
|
|
),
|
|
fakeWakuMessage(
|
|
payload = @[byte 3], contentTopic = ContentTopic("1"), ts = ts(3)
|
|
),
|
|
fakeWakuMessage(
|
|
payload = @[byte 4], contentTopic = ContentTopic("2"), ts = ts(4)
|
|
),
|
|
fakeWakuMessage(
|
|
payload = @[byte 5], contentTopic = ContentTopic("1"), ts = ts(5)
|
|
),
|
|
fakeWakuMessage(
|
|
payload = @[byte 13], contentTopic = ContentTopic("2"), ts = ts(6)
|
|
),
|
|
fakeWakuMessage(
|
|
payload = @[byte 14], contentTopic = ContentTopic("1"), ts = ts(7)
|
|
),
|
|
]
|
|
|
|
for msg in msgList2:
|
|
require store
|
|
.put(
|
|
DefaultPubsubTopic,
|
|
msg,
|
|
computeDigest(msg),
|
|
computeMessageHash(DefaultPubsubTopic, msg),
|
|
msg.timestamp,
|
|
)
|
|
.isOk()
|
|
|
|
store
|
|
|
|
asyncTest "multiple query to multiple peers with pagination":
|
|
## Setup
|
|
let
|
|
serverSwitchA = newTestSwitch()
|
|
serverSwitchB = newTestSwitch()
|
|
clientSwitch = newTestSwitch()
|
|
|
|
await allFutures(
|
|
serverSwitchA.start(), serverSwitchB.start(), clientSwitch.start()
|
|
)
|
|
|
|
let
|
|
serverA = await newTestWakuStoreNode(serverSwitchA, store = testStore)
|
|
serverB = await newTestWakuStoreNode(serverSwitchB, store = testStore)
|
|
client = newTestWakuStoreClient(clientSwitch)
|
|
|
|
## Given
|
|
let peers =
|
|
@[
|
|
serverSwitchA.peerInfo.toRemotePeerInfo(),
|
|
serverSwitchB.peerInfo.toRemotePeerInfo(),
|
|
]
|
|
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 5)
|
|
|
|
## When
|
|
let res = await client.queryLoop(req, peers)
|
|
|
|
## Then
|
|
check:
|
|
res.isOk()
|
|
|
|
let response = res.tryGet()
|
|
check:
|
|
response.len == 10
|
|
|
|
## Cleanup
|
|
await allFutures(clientSwitch.stop(), serverSwitchA.stop(), serverSwitchB.stop())
|
|
|
|
asyncTest "resume message history":
|
|
## Setup
|
|
let
|
|
serverSwitch = newTestSwitch()
|
|
clientSwitch = newTestSwitch()
|
|
|
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
|
|
|
let
|
|
server = await newTestWakuStore(serverSwitch, store = storeA)
|
|
client = await newTestWakuStore(clientSwitch)
|
|
|
|
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
|
|
|
## When
|
|
let res = await client.resume()
|
|
|
|
## Then
|
|
check res.isOk()
|
|
|
|
let resumedMessagesCount = res.tryGet()
|
|
let storedMessagesCount = client.store.getMessagesCount().tryGet()
|
|
check:
|
|
resumedMessagesCount == 10
|
|
storedMessagesCount == 10
|
|
|
|
## Cleanup
|
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
|
|
|
asyncTest "resume history from a list of candidates - offline peer":
|
|
## Setup
|
|
let
|
|
clientSwitch = newTestSwitch()
|
|
offlineSwitch = newTestSwitch()
|
|
|
|
await clientSwitch.start()
|
|
|
|
let client = await newTestWakuStore(clientSwitch)
|
|
|
|
## Given
|
|
let peers = @[offlineSwitch.peerInfo.toRemotePeerInfo()]
|
|
|
|
## When
|
|
let res = await client.resume(some(peers))
|
|
|
|
## Then
|
|
check res.isErr()
|
|
|
|
## Cleanup
|
|
await clientSwitch.stop()
|
|
|
|
asyncTest "resume history from a list of candidates - online and offline peers":
|
|
## Setup
|
|
let
|
|
offlineSwitch = newTestSwitch()
|
|
serverASwitch = newTestSwitch()
|
|
serverBSwitch = newTestSwitch()
|
|
clientSwitch = newTestSwitch()
|
|
|
|
await allFutures(
|
|
serverASwitch.start(), serverBSwitch.start(), clientSwitch.start()
|
|
)
|
|
|
|
let
|
|
serverA = await newTestWakuStore(serverASwitch, store = storeA)
|
|
serverB = await newTestWakuStore(serverBSwitch, store = storeB)
|
|
client = await newTestWakuStore(clientSwitch)
|
|
|
|
## Given
|
|
let peers =
|
|
@[
|
|
offlineSwitch.peerInfo.toRemotePeerInfo(),
|
|
serverASwitch.peerInfo.toRemotePeerInfo(),
|
|
serverBSwitch.peerInfo.toRemotePeerInfo(),
|
|
]
|
|
|
|
## When
|
|
let res = await client.resume(some(peers))
|
|
|
|
## Then
|
|
# `client` is expected to retrieve 14 messages:
|
|
# - The store mounted on `serverB` holds 10 messages (see `storeA` fixture)
|
|
# - The store mounted on `serverB` holds 7 messages (see `storeB` fixture)
|
|
# Both stores share 3 messages, resulting in 14 unique messages in total
|
|
check res.isOk()
|
|
|
|
let restoredMessagesCount = res.tryGet()
|
|
let storedMessagesCount = client.store.getMessagesCount().tryGet()
|
|
check:
|
|
restoredMessagesCount == 14
|
|
storedMessagesCount == 14
|
|
|
|
## Cleanup
|
|
await allFutures(serverASwitch.stop(), serverBSwitch.stop(), clientSwitch.stop())
|
|
|
|
suite "WakuNode - waku store":
|
|
asyncTest "Resume proc fetches the history":
|
|
## Setup
|
|
let
|
|
serverKey = generateSecp256k1Key()
|
|
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
clientKey = generateSecp256k1Key()
|
|
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
|
|
await allFutures(client.start(), server.start())
|
|
|
|
let driver = newSqliteArchiveDriver()
|
|
server.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
|
|
await server.mountStore()
|
|
|
|
let clientStore = StoreQueueRef.new()
|
|
await client.mountStore(store = clientStore)
|
|
client.mountStoreClient(store = clientStore)
|
|
|
|
## Given
|
|
let message = fakeWakuMessage()
|
|
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
|
|
|
|
let serverPeer = server.peerInfo.toRemotePeerInfo()
|
|
|
|
## When
|
|
await client.resume(some(@[serverPeer]))
|
|
|
|
# Then
|
|
check:
|
|
client.wakuStore.store.getMessagesCount().tryGet() == 1
|
|
|
|
## Cleanup
|
|
await allFutures(client.stop(), server.stop())
|
|
|
|
asyncTest "Resume proc discards duplicate messages":
|
|
## Setup
|
|
let
|
|
serverKey = generateSecp256k1Key()
|
|
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
clientKey = generateSecp256k1Key()
|
|
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
|
|
await allFutures(server.start(), client.start())
|
|
await server.mountStore(store = StoreQueueRef.new())
|
|
|
|
let clientStore = StoreQueueRef.new()
|
|
await client.mountStore(store = clientStore)
|
|
client.mountStoreClient(store = clientStore)
|
|
|
|
## Given
|
|
let timeOrigin = now()
|
|
let
|
|
msg1 = fakeWakuMessage(
|
|
payload = "hello world1", ts = (timeOrigin + getNanoSecondTime(1))
|
|
)
|
|
msg2 = fakeWakuMessage(
|
|
payload = "hello world2", ts = (timeOrigin + getNanoSecondTime(2))
|
|
)
|
|
msg3 = fakeWakuMessage(
|
|
payload = "hello world3", ts = (timeOrigin + getNanoSecondTime(3))
|
|
)
|
|
|
|
require server.wakuStore.store.put(DefaultPubsubTopic, msg1).isOk()
|
|
require server.wakuStore.store.put(DefaultPubsubTopic, msg2).isOk()
|
|
|
|
# Insert the same message in both node's store
|
|
let
|
|
receivedTime3 = now() + getNanosecondTime(10)
|
|
digest3 = computeDigest(msg3)
|
|
require server.wakuStore.store
|
|
.put(DefaultPubsubTopic, msg3, digest3, receivedTime3)
|
|
.isOk()
|
|
require client.wakuStore.store
|
|
.put(DefaultPubsubTopic, msg3, digest3, receivedTime3)
|
|
.isOk()
|
|
|
|
let serverPeer = server.peerInfo.toRemotePeerInfo()
|
|
|
|
## When
|
|
await client.resume(some(@[serverPeer]))
|
|
|
|
## Then
|
|
check:
|
|
# If the duplicates are discarded properly, then the total number of messages after resume should be 3
|
|
client.wakuStore.store.getMessagesCount().tryGet() == 3
|
|
|
|
await allFutures(client.stop(), server.stop())
|