From 2c975597e122279b8a02cd42f6efe6eb810f0e90 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Wed, 5 Oct 2022 17:58:24 +0200 Subject: [PATCH] fix(store): fix waku store resume tests --- tests/all_tests_v2.nim | 1 + tests/v2/test_waku_store.nim | 342 +++++++----------- tests/v2/test_waku_store_resume.nim | 203 +++++++++++ waku/v2/protocol/waku_store/message_store.nim | 3 +- waku/v2/protocol/waku_store/protocol.nim | 1 + 5 files changed, 339 insertions(+), 211 deletions(-) create mode 100644 tests/v2/test_waku_store_resume.nim diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 4b1f813ef..9227fe6d3 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -13,6 +13,7 @@ import ./v2/test_message_store_sqlite, ./v2/test_waku_store_rpc_codec, ./v2/test_waku_store, + ./v2/test_waku_store_resume, ./v2/test_wakunode_store, # Waku Filter ./v2/test_waku_filter, diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 0ae864c88..3a8d3a4de 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -49,11 +49,11 @@ proc newTestSwitch(key=none(PrivateKey), address=none(MultiAddress)): Switch = let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get()) return newStandardSwitch(some(peerKey), addrs=peerAddr) -proc newTestStore(): MessageStore = +proc newTestMessageStore(): MessageStore = let database = newTestDatabase() SqliteStore.init(database).tryGet() -proc newTestWakuStore(switch: Switch, store=newTestStore()): WakuStore = +proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): WakuStore = let peerManager = PeerManager.new(switch) rng = crypto.newRng() @@ -64,8 +64,29 @@ proc newTestWakuStore(switch: Switch, store=newTestStore()): WakuStore = return proto -suite "Waku Store - history query": - +procSuite "Waku Store - history query": + ## Fixtures + let storeA = block: + let store = newTestMessageStore() + + let msgList = @[ + fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(0)), + fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(1)), + fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(2)), + fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(3)), + fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(4)), + fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(5)), + fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(6)), + fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(7)), + fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(8)), + fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(9)) + ] + + for msg in msgList: + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + store + asyncTest "handle query": ## Setup let @@ -449,6 +470,113 @@ suite "Waku Store - history query": ## Cleanup await allFutures(clientSwitch.stop(), serverSwitch.stop()) + asyncTest "handle temporal history query with a valid time window": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + server = newTestWakuStore(serverSwitch, store=storeA) + client = newTestWakuStore(clientSwitch) + + client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + + ## Given + let rpc = HistoryQuery( + contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], + startTime: Timestamp(2), + endTime: Timestamp(5) + ) + + ## When + let res = await client.query(rpc) + + ## Then + check res.isOk() + + let response = res.tryGet() + check: + response.messages.len() == 2 + response.messages.anyIt(it.timestamp == Timestamp(3)) + response.messages.anyIt(it.timestamp == Timestamp(5)) + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + asyncTest "handle temporal history query with a zero-size time window": + # a zero-size window results in an empty list of history messages + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + server = newTestWakuStore(serverSwitch, store=storeA) + client = newTestWakuStore(clientSwitch) + + client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + + ## Given + let rpc = HistoryQuery( + contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], + startTime: Timestamp(2), + endTime: Timestamp(2) + ) + + ## When + let res = await client.query(rpc) + + ## Then + check res.isOk() + + let response = res.tryGet() + check: + response.messages.len == 0 + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + asyncTest "handle temporal history query with an invalid time window": + # A history query with an invalid time range results in an empty list of history messages + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + server = newTestWakuStore(serverSwitch, store=storeA) + client = newTestWakuStore(clientSwitch) + + client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + + ## Given + let rpc = HistoryQuery( + contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], + startTime: Timestamp(5), + endTime: Timestamp(2) + ) + + ## When + let res = await client.query(rpc) + + ## Then + check res.isOk() + + let response = res.tryGet() + check: + response.messages.len == 0 + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + suite "Waku Store - message handling": asyncTest "it should store a valid and non-ephemeral message": @@ -562,209 +690,3 @@ suite "Waku Store - message handling": ## Cleanup await switch.stop() - - -# TODO: Review this test suite test cases -procSuite "Waku Store - fault tolerant store": - - proc newTestWakuStore(peer=none(RemotePeerInfo)): Future[(Switch, Switch, WakuStore)] {.async.} = - let - key = PrivateKey.random(ECDSA, rng[]).get() - listenSwitch = newStandardSwitch(some(key)) - await listenSwitch.start() - - let dialSwitch = newStandardSwitch() - await dialSwitch.start() - - let - peerManager = PeerManager.new(dialsWitch) - rng = crypto.newRng() - database = newTestDatabase() - store = SqliteStore.init(database).tryGet() - proto = WakuStore.init(peerManager, rng, store) - - let storePeer = peer.get(listenSwitch.peerInfo.toRemotePeerInfo()) - proto.setPeer(storePeer) - - await proto.start() - listenSwitch.mount(proto) - - return (listenSwitch, dialSwitch, proto) - - - asyncTest "temporal history queries": - ## Setup - let (listenSwitch, dialSwitch, proto) = await newTestWakuStore() - let msgList = @[ - WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)), - WakuMessage(payload: @[byte 1], contentTopic: ContentTopic("1"), timestamp: Timestamp(1)), - WakuMessage(payload: @[byte 2], contentTopic: ContentTopic("2"), timestamp: Timestamp(2)), - WakuMessage(payload: @[byte 3], contentTopic: ContentTopic("1"), timestamp: Timestamp(3)), - WakuMessage(payload: @[byte 4], contentTopic: ContentTopic("2"), timestamp: Timestamp(4)), - WakuMessage(payload: @[byte 5], contentTopic: ContentTopic("1"), timestamp: Timestamp(5)), - WakuMessage(payload: @[byte 6], contentTopic: ContentTopic("2"), timestamp: Timestamp(6)), - WakuMessage(payload: @[byte 7], contentTopic: ContentTopic("1"), timestamp: Timestamp(7)), - WakuMessage(payload: @[byte 8], contentTopic: ContentTopic("2"), timestamp: Timestamp(8)), - WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("1"), timestamp: Timestamp(9)) - ] - - for msg in msgList: - require proto.store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() - - let (listenSwitch2, dialSwitch2, proto2) = await newTestWakuStore() - let msgList2 = @[ - WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)), - WakuMessage(payload: @[byte 11], contentTopic: ContentTopic("1"), timestamp: Timestamp(1)), - WakuMessage(payload: @[byte 12], contentTopic: ContentTopic("2"), timestamp: Timestamp(2)), - WakuMessage(payload: @[byte 3], contentTopic: ContentTopic("1"), timestamp: Timestamp(3)), - WakuMessage(payload: @[byte 4], contentTopic: ContentTopic("2"), timestamp: Timestamp(4)), - WakuMessage(payload: @[byte 5], contentTopic: ContentTopic("1"), timestamp: Timestamp(5)), - WakuMessage(payload: @[byte 13], contentTopic: ContentTopic("2"), timestamp: Timestamp(6)), - WakuMessage(payload: @[byte 14], contentTopic: ContentTopic("1"), timestamp: Timestamp(7)) - ] - - for msg in msgList2: - require proto2.store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() - - - asyncTest "handle temporal history query with a valid time window": - ## Given - let rpc = HistoryQuery( - contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], - startTime: Timestamp(2), - endTime: Timestamp(5) - ) - - ## When - let res = await proto.query(rpc) - - ## Then - check res.isOk() - - let response = res.tryGet() - check: - response.messages.len() == 2 - response.messages.anyIt(it.timestamp == Timestamp(3)) - response.messages.anyIt(it.timestamp == Timestamp(5)) - - asyncTest "handle temporal history query with a zero-size time window": - # a zero-size window results in an empty list of history messages - ## Given - let rpc = HistoryQuery( - contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], - startTime: Timestamp(2), - endTime: Timestamp(2) - ) - - ## When - let res = await proto.query(rpc) - - ## Then - check res.isOk() - - let response = res.tryGet() - check: - response.messages.len == 0 - - asyncTest "handle temporal history query with an invalid time window": - # A history query with an invalid time range results in an empty list of history messages - ## Given - let rpc = HistoryQuery( - contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], - startTime: Timestamp(5), - endTime: Timestamp(2) - ) - - ## When - let res = await proto.query(rpc) - - ## Then - check res.isOk() - - let response = res.tryGet() - check: - response.messages.len == 0 - - asyncTest "resume message history": - ## Given - # Start a new node - let (listenSwitch3, dialSwitch3, proto3) = await newTestWakuStore(peer=some(listenSwitch.peerInfo.toRemotePeerInfo())) - - ## When - let successResult = await proto3.resume() - - ## Then - check: - successResult.isOk() - successResult.value == 10 - proto3.store.getMessagesCount().tryGet() == 10 - - ## Cleanup - await allFutures(dialSwitch3.stop(), listenSwitch3.stop()) - - asyncTest "queryFromWithPaging - no pagingInfo": - ## Given - let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5)) - - ## When - let res = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo()) - - ## Then - check res.isOk() - - let response = res.tryGet() - check: - response.len == 4 - - asyncTest "queryFromWithPaging - with pagination": - var pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: 1) - let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5), pagingInfo: pinfo) - - let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo()) - - check: - messagesResult.isOk - messagesResult.value.len == 4 - - asyncTest "resume history from a list of offline peers": - var offListenSwitch = newStandardSwitch(some(PrivateKey.random(ECDSA, rng[]).get())) - var dialSwitch3 = newStandardSwitch() - await dialSwitch3.start() - let proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng()) - let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo.toRemotePeerInfo()])) - check: - successResult.isErr - - #free resources - await allFutures(dialSwitch3.stop(), - offListenSwitch.stop()) - - asyncTest "resume history from a list of candidate peers": - - let offListenSwitch = newStandardSwitch(some(PrivateKey.random(ECDSA, rng[]).get())) - let (listenSwitch3, dialSwitch3, proto3) = await newTestWakuStore() - - ## When - let res = await proto3.resume(some(@[ - offListenSwitch.peerInfo.toRemotePeerInfo(), - listenSwitch.peerInfo.toRemotePeerInfo(), - listenSwitch2.peerInfo.toRemotePeerInfo() - ])) - - ## Then - # `proto3` is expected to retrieve 14 messages because: - # - the store mounted on `listenSwitch` holds 10 messages (`msgList`) - # - the store mounted on `listenSwitch2` holds 7 messages (see `msgList2`) - # - both stores share 3 messages, resulting in 14 unique messages in total - check res.isOk() - - let response = res.tryGet() - check: - response == 14 - proto3.store.getMessagesCount().tryGet() == 14 - - ## Cleanup - await allFutures(listenSwitch3.stop(), dialSwitch3.stop(), offListenSwitch.stop()) - - ## Cleanup - await allFutures(dialSwitch.stop(), dialSwitch2.stop(), listenSwitch.stop(), listenSwitch2.stop()) diff --git a/tests/v2/test_waku_store_resume.nim b/tests/v2/test_waku_store_resume.nim new file mode 100644 index 000000000..8a5f7b39e --- /dev/null +++ b/tests/v2/test_waku_store_resume.nim @@ -0,0 +1,203 @@ +{.used.} + +import + std/[options, tables, sets, times], + stew/byteutils, + testutils/unittests, + chronos, + chronicles, + libp2p/switch, + libp2p/crypto/crypto +import + ../../waku/v2/protocol/waku_message, + ../../waku/v2/protocol/waku_store, + ../../waku/v2/node/storage/sqlite, + ../../waku/v2/node/storage/message/sqlite_store, + ../../waku/v2/node/peer_manager/peer_manager, + ../../waku/v2/utils/time, + ../test_helpers + + +const + DefaultPubsubTopic = "/waku/2/default-waku/proto" + DefaultContentTopic = ContentTopic("/waku/2/default-content/proto") + + +proc now(): Timestamp = + getNanosecondTime(getTime().toUnixFloat()) + +proc ts(offset=0, origin=now()): Timestamp = + origin + getNanosecondTime(offset) + +proc newTestDatabase(): SqliteDatabase = + SqliteDatabase.init("", inMemory = true).tryGet() + +proc fakeWakuMessage( + payload = toBytes("TEST-PAYLOAD"), + contentTopic = DefaultContentTopic, + ts = now(), + ephemeral = false, +): WakuMessage = + WakuMessage( + payload: payload, + contentTopic: contentTopic, + version: 1, + timestamp: ts, + ephemeral: ephemeral, + ) + +proc newTestSwitch(key=none(PrivateKey), address=none(MultiAddress)): Switch = + let peerKey = key.get(PrivateKey.random(ECDSA, rng[]).get()) + let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get()) + return newStandardSwitch(some(peerKey), addrs=peerAddr) + +proc newTestMessageStore(): MessageStore = + let database = newTestDatabase() + SqliteStore.init(database).tryGet() + +proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): WakuStore = + let + peerManager = PeerManager.new(switch) + rng = crypto.newRng() + proto = WakuStore.init(peerManager, rng, store) + + waitFor proto.start() + switch.mount(proto) + + return proto + + +procSuite "Waku Store - resume store": + ## Fixtures + let storeA = block: + let store = newTestMessageStore() + + let msgList = @[ + fakeWakuMessage(payload= @[byte 0], contentTopic=ContentTopic("2"), ts=ts(0)), + fakeWakuMessage(payload= @[byte 1], contentTopic=ContentTopic("1"), ts=ts(1)), + fakeWakuMessage(payload= @[byte 2], contentTopic=ContentTopic("2"), ts=ts(2)), + fakeWakuMessage(payload= @[byte 3], contentTopic=ContentTopic("1"), ts=ts(3)), + fakeWakuMessage(payload= @[byte 4], contentTopic=ContentTopic("2"), ts=ts(4)), + fakeWakuMessage(payload= @[byte 5], contentTopic=ContentTopic("1"), ts=ts(5)), + fakeWakuMessage(payload= @[byte 6], contentTopic=ContentTopic("2"), ts=ts(6)), + fakeWakuMessage(payload= @[byte 7], contentTopic=ContentTopic("1"), ts=ts(7)), + fakeWakuMessage(payload= @[byte 8], contentTopic=ContentTopic("2"), ts=ts(8)), + fakeWakuMessage(payload= @[byte 9], contentTopic=ContentTopic("1"), ts=ts(9)) + ] + + for msg in msgList: + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + store + + let storeB = block: + let store = newTestMessageStore() + let msgList2 = @[ + fakeWakuMessage(payload= @[byte 0], contentTopic=ContentTopic("2"), ts=ts(0)), + fakeWakuMessage(payload= @[byte 11], contentTopic=ContentTopic("1"), ts=ts(1)), + fakeWakuMessage(payload= @[byte 12], contentTopic=ContentTopic("2"), ts=ts(2)), + fakeWakuMessage(payload= @[byte 3], contentTopic=ContentTopic("1"), ts=ts(3)), + fakeWakuMessage(payload= @[byte 4], contentTopic=ContentTopic("2"), ts=ts(4)), + fakeWakuMessage(payload= @[byte 5], contentTopic=ContentTopic("1"), ts=ts(5)), + fakeWakuMessage(payload= @[byte 13], contentTopic=ContentTopic("2"), ts=ts(6)), + fakeWakuMessage(payload= @[byte 14], contentTopic=ContentTopic("1"), ts=ts(7)) + ] + + for msg in msgList2: + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + store + + + asyncTest "resume message history": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + _ = newTestWakuStore(serverSwitch, store=storeA) + client = newTestWakuStore(clientSwitch) + + client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + + ## When + let res = await client.resume() + + ## Then + check res.isOk() + + let resumedMessagesCount = res.tryGet() + let storedMessagesCount = client.store.getMessagesCount().tryGet() + check: + resumedMessagesCount == 10 + storedMessagesCount == 10 + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + asyncTest "resume history from a list of candidates - offline peer": + ## Setup + let + clientSwitch = newTestSwitch() + offlineSwitch = newTestSwitch() + + await clientSwitch.start() + + let client = newTestWakuStore(clientSwitch) + + ## Given + let peers = @[offlineSwitch.peerInfo.toRemotePeerInfo()] + + ## When + let res = await client.resume(some(peers)) + + ## Then + check res.isErr() + + ## Cleanup + await clientSwitch.stop() + + asyncTest "resume history from a list of candidates - online and offline peers": + ## Setup + let + offlineSwitch = newTestSwitch() + serverASwitch = newTestSwitch() + serverBSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverASwitch.start(), serverBSwitch.start(), clientSwitch.start()) + + let + serverA = newTestWakuStore(serverASwitch, store=storeA) + serverB = newTestWakuStore(serverBSwitch, store=storeB) + client = newTestWakuStore(clientSwitch) + + ## Given + let peers = @[ + offlineSwitch.peerInfo.toRemotePeerInfo(), + serverASwitch.peerInfo.toRemotePeerInfo(), + serverBSwitch.peerInfo.toRemotePeerInfo() + ] + + ## When + let res = await client.resume(some(peers)) + + ## Then + # `client` is expected to retrieve 14 messages: + # - The store mounted on `serverB` holds 10 messages (see `storeA` fixture) + # - The store mounted on `serverB` holds 7 messages (see `storeB` fixture) + # Both stores share 3 messages, resulting in 14 unique messages in total + check res.isOk() + + let restoredMessagesCount = res.tryGet() + let storedMessagesCount = client.store.getMessagesCount().tryGet() + check: + restoredMessagesCount == 14 + storedMessagesCount == 14 + + ## Cleanup + await allFutures(serverASwitch.stop(), serverBSwitch.stop(), clientSwitch.stop()) + diff --git a/waku/v2/protocol/waku_store/message_store.nim b/waku/v2/protocol/waku_store/message_store.nim index 30e8440e4..d1ee0b22e 100644 --- a/waku/v2/protocol/waku_store/message_store.nim +++ b/waku/v2/protocol/waku_store/message_store.nim @@ -26,7 +26,8 @@ method put*(ms: MessageStore, pubsubTopic: string, message: WakuMessage, digest: method put*(ms: MessageStore, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] = let digest = computeDigest(message) - receivedTime = getNanosecondTime(getTime().toUnixFloat()) + receivedTime = if message.timestamp > 0: message.timestamp + else: getNanosecondTime(getTime().toUnixFloat()) ms.put(pubsubTopic, message, digest, receivedTime) diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index e666b0a4d..d20c4ac53 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -459,6 +459,7 @@ proc resume*(w: WakuStore, for msg in res.get(): let putStoreRes = w.store.put(pubsubTopic, msg) if putStoreRes.isErr(): + warn "failed to insert resumed message into store", error=putStoreRes.error continue added.inc()