{.used.}

import
  std/[options, tables, sets],
  testutils/unittests,
  chronos,
  chronicles,
  libp2p/crypto/crypto
import
  ../../waku/common/sqlite,
  ../../waku/v2/node/message_store/sqlite_store,
  ../../waku/v2/node/peer_manager,
  ../../waku/v2/protocol/waku_message,
  ../../waku/v2/protocol/waku_store,
  ./testlib/common,
  ./testlib/switch


proc newTestDatabase(): SqliteDatabase =
  ## Opens a fresh SQLite database for use by a single test fixture.
  # ":memory:" (with the leading colon) is the name SQLite reserves for a
  # private in-memory database. Without the colon the name designates an
  # ordinary on-disk file, which could be shared between fixtures and test runs.
  SqliteDatabase.new(":memory:").tryGet()

proc newTestArchiveDriver(): ArchiveDriver =
  ## Builds an archive driver on top of its own database from `newTestDatabase`.
  SqliteDriver.init(newTestDatabase()).tryGet()

proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): Future[WakuStore] {.async.} =
  ## Creates a `WakuStore` protocol instance backed by `store`, starts it and
  ## mounts it on `switch`. Returns the started protocol instance.
  # NOTE(review): `newTestMessageStore` is not defined in the visible part of
  # this file - presumably it comes from `./testlib/common`; confirm.
  let
    peerManager = PeerManager.new(switch)
    proto = WakuStore.init(peerManager, rng, store)

  await proto.start()
  switch.mount(proto)

  return proto

proc newTestWakuStoreClient(switch: Switch, store: MessageStore = nil): WakuStoreClient =
  ## Creates a `WakuStoreClient` using a new peer manager built on `switch`.
  ## `store` is optional and defaults to `nil`.
  let peerManager = PeerManager.new(switch)
  WakuStoreClient.new(peerManager, rng, store)


procSuite "Waku Store - resume store":
  ## Fixtures

  # `storeA` holds 10 messages with timestamps ts(0)..ts(9), alternating
  # between content topics "2" and "1".
  let storeA = block:
      let store = newTestMessageStore()

      let msgList = @[
        fakeWakuMessage(payload= @[byte 0], contentTopic=ContentTopic("2"), ts=ts(0)),
        fakeWakuMessage(payload= @[byte 1], contentTopic=ContentTopic("1"), ts=ts(1)),
        fakeWakuMessage(payload= @[byte 2], contentTopic=ContentTopic("2"), ts=ts(2)),
        fakeWakuMessage(payload= @[byte 3], contentTopic=ContentTopic("1"), ts=ts(3)),
        fakeWakuMessage(payload= @[byte 4], contentTopic=ContentTopic("2"), ts=ts(4)),
        fakeWakuMessage(payload= @[byte 5], contentTopic=ContentTopic("1"), ts=ts(5)),
        fakeWakuMessage(payload= @[byte 6], contentTopic=ContentTopic("2"), ts=ts(6)),
        fakeWakuMessage(payload= @[byte 7], contentTopic=ContentTopic("1"), ts=ts(7)),
        fakeWakuMessage(payload= @[byte 8], contentTopic=ContentTopic("2"), ts=ts(8)),
        fakeWakuMessage(payload= @[byte 9], contentTopic=ContentTopic("1"), ts=ts(9))
      ]

      for msg in msgList:
        require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()

      store

  # `storeB` holds 8 messages with timestamps ts(0)..ts(7). The entries with
  # payloads 0, 3, 4 and 5 are identical to the ones in `storeA`.
  let storeB = block:
      let store = newTestMessageStore()

      let msgList2 = @[
        fakeWakuMessage(payload= @[byte 0], contentTopic=ContentTopic("2"), ts=ts(0)),
        fakeWakuMessage(payload= @[byte 11], contentTopic=ContentTopic("1"), ts=ts(1)),
        fakeWakuMessage(payload= @[byte 12], contentTopic=ContentTopic("2"), ts=ts(2)),
        fakeWakuMessage(payload= @[byte 3], contentTopic=ContentTopic("1"), ts=ts(3)),
        fakeWakuMessage(payload= @[byte 4], contentTopic=ContentTopic("2"), ts=ts(4)),
        fakeWakuMessage(payload= @[byte 5], contentTopic=ContentTopic("1"), ts=ts(5)),
        fakeWakuMessage(payload= @[byte 13], contentTopic=ContentTopic("2"), ts=ts(6)),
        fakeWakuMessage(payload= @[byte 14], contentTopic=ContentTopic("1"), ts=ts(7))
      ]

      for msg in msgList2:
        require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()

      store

  asyncTest "multiple query to multiple peers with pagination":
    ## Setup
    let
      serverSwitchA = newTestSwitch()
      serverSwitchB = newTestSwitch()
      clientSwitch = newTestSwitch()

    await allFutures(serverSwitchA.start(), serverSwitchB.start(), clientSwitch.start())

    # NOTE(review): `newTestWakuStoreNode` and `testStore` are not defined in
    # the visible part of this file - confirm where they come from.
    let
      serverA = await newTestWakuStoreNode(serverSwitchA, store=testStore)
      serverB = await newTestWakuStoreNode(serverSwitchB, store=testStore)
      client = newTestWakuStoreClient(clientSwitch)

    ## Given
    let peers = @[
      serverSwitchA.peerInfo.toRemotePeerInfo(),
      serverSwitchB.peerInfo.toRemotePeerInfo()
    ]
    let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 5)

    ## When
    let res = await client.queryLoop(req, peers)

    ## Then
    check:
      res.isOk()

    let response = res.tryGet()
    check:
      response.len == 10

    ## Cleanup
    await allFutures(clientSwitch.stop(), serverSwitchA.stop(), serverSwitchB.stop())

  asyncTest "resume message history":
    ## Setup
    let
      serverSwitch = newTestSwitch()
      clientSwitch = newTestSwitch()

    await allFutures(serverSwitch.start(), clientSwitch.start())

    let
      server = await newTestWakuStore(serverSwitch, store=storeA)
      client = await newTestWakuStore(clientSwitch)

    client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())

    ## When
    let res = await client.resume()

    ## Then
    check res.isOk()

    # Every one of the 10 messages in `storeA` must be reported as resumed and
    # must end up in the client's own store.
    let resumedMessagesCount = res.tryGet()
    let storedMessagesCount = client.store.getMessagesCount().tryGet()
    check:
      resumedMessagesCount == 10
      storedMessagesCount == 10

    ## Cleanup
    await allFutures(clientSwitch.stop(), serverSwitch.stop())

  asyncTest "resume history from a list of candidates - offline peer":
    ## Setup
    let
      clientSwitch = newTestSwitch()
      offlineSwitch = newTestSwitch()

    # `offlineSwitch` is intentionally never started.
    await clientSwitch.start()

    let client = await newTestWakuStore(clientSwitch)

    ## Given
    let peers = @[offlineSwitch.peerInfo.toRemotePeerInfo()]

    ## When
    let res = await client.resume(some(peers))

    ## Then
    check res.isErr()

    ## Cleanup
    await clientSwitch.stop()

  asyncTest "resume history from a list of candidates - online and offline peers":
    ## Setup
    let
      offlineSwitch = newTestSwitch()
      serverASwitch = newTestSwitch()
      serverBSwitch = newTestSwitch()
      clientSwitch = newTestSwitch()

    # `offlineSwitch` is intentionally never started.
    await allFutures(serverASwitch.start(), serverBSwitch.start(), clientSwitch.start())

    let
      serverA = await newTestWakuStore(serverASwitch, store=storeA)
      serverB = await newTestWakuStore(serverBSwitch, store=storeB)
      client = await newTestWakuStore(clientSwitch)

    ## Given
    let peers = @[
      offlineSwitch.peerInfo.toRemotePeerInfo(),
      serverASwitch.peerInfo.toRemotePeerInfo(),
      serverBSwitch.peerInfo.toRemotePeerInfo()
    ]

    ## When
    let res = await client.resume(some(peers))

    ## Then
    # `client` is expected to retrieve 14 messages:
    # - The store mounted on `serverA` holds 10 messages (see `storeA` fixture)
    # - The store mounted on `serverB` holds 8 messages (see `storeB` fixture)
    # Both stores share 4 messages (payloads 0, 3, 4 and 5), resulting in
    # 10 + 8 - 4 = 14 unique messages in total
    check res.isOk()

    let restoredMessagesCount = res.tryGet()
    let storedMessagesCount = client.store.getMessagesCount().tryGet()
    check:
      restoredMessagesCount == 14
      storedMessagesCount == 14

    ## Cleanup
    await allFutures(serverASwitch.stop(), serverBSwitch.stop(), clientSwitch.stop())


suite "WakuNode - waku store":

  asyncTest "Resume proc fetches the history":
    ## Setup
    let
      serverKey = generateSecp256k1Key()
      server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60412))
      clientKey = generateSecp256k1Key()
      client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60410))

    await allFutures(client.start(), server.start())

    let driver = newTestArchiveDriver()
    server.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
    await server.mountStore()

    # The same queue instance backs both the client's store protocol and its
    # store client.
    let clientStore = StoreQueueRef.new()
    await client.mountStore(store=clientStore)
    client.mountStoreClient(store=clientStore)

    ## Given
    let message = fakeWakuMessage()
    require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()

    let serverPeer = server.peerInfo.toRemotePeerInfo()

    ## When
    await client.resume(some(@[serverPeer]))

    ## Then
    check:
      client.wakuStore.store.getMessagesCount().tryGet() == 1

    ## Cleanup
    await allFutures(client.stop(), server.stop())

  asyncTest "Resume proc discards duplicate messages":
    ## Setup
    let
      serverKey = generateSecp256k1Key()
      server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60422))
      clientKey = generateSecp256k1Key()
      client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60420))

    await allFutures(server.start(), client.start())
    await server.mountStore(store=StoreQueueRef.new())

    let clientStore = StoreQueueRef.new()
    await client.mountStore(store=clientStore)
    client.mountStoreClient(store=clientStore)

    ## Given
    let timeOrigin = now()
    let
      msg1 = fakeWakuMessage(payload="hello world1", ts=(timeOrigin + getNanoSecondTime(1)))
      msg2 = fakeWakuMessage(payload="hello world2", ts=(timeOrigin + getNanoSecondTime(2)))
      msg3 = fakeWakuMessage(payload="hello world3", ts=(timeOrigin + getNanoSecondTime(3)))

    require server.wakuStore.store.put(DefaultPubsubTopic, msg1).isOk()
    require server.wakuStore.store.put(DefaultPubsubTopic, msg2).isOk()

    # Insert the same message in both node's store
    let
      receivedTime3 = now() + getNanosecondTime(10)
      digest3 = computeDigest(msg3)
    require server.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk()
    require client.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk()

    let serverPeer = server.peerInfo.toRemotePeerInfo()

    ## When
    await client.resume(some(@[serverPeer]))

    ## Then
    check:
      # If the duplicates are discarded properly, then the total number of messages after resume should be 3
      client.wakuStore.store.getMessagesCount().tryGet() == 3

    ## Cleanup
    await allFutures(client.stop(), server.stop())