From c53bce85783c7243ef99071151f4ee249ccd02e8 Mon Sep 17 00:00:00 2001 From: LNSD Date: Wed, 17 Aug 2022 15:58:04 +0000 Subject: [PATCH] deploy: 126cc3451da46ec04055229115d76b70c2fd53ac --- tests/v2/test_waku_store.nim | 1141 ++++++++++------------ waku/v2/protocol/waku_store/protocol.nim | 392 ++++---- 2 files changed, 712 insertions(+), 821 deletions(-) diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index ac0b22dfd..6293d0fea 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -1,677 +1,603 @@ {.used.} import - std/[options, tables, sets, sequtils], + std/[options, tables, sets, sequtils, times], + stew/byteutils, + testutils/unittests, chronos, chronicles, - testutils/unittests, libp2p/switch, - libp2p/protobuf/minprotobuf, - libp2p/stream/[bufferstream, connection], - libp2p/crypto/crypto, - libp2p/protocols/pubsub/rpc/message + libp2p/crypto/crypto import ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_store, - ../../waku/v2/node/storage/message/waku_message_store, ../../waku/v2/node/storage/message/waku_store_queue, + ../../waku/v2/node/storage/message/waku_message_store, ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/utils/pagination, ../../waku/v2/utils/time, - ../test_helpers, - ./utils + ../test_helpers -procSuite "Waku Store": - const defaultContentTopic = ContentTopic("1") + +const + DefaultPubsubTopic = "/waku/2/default-waku/proto" + DefaultContentTopic = ContentTopic("/waku/2/default-content/proto") + + +proc newTestDatabase(): SqliteDatabase = + SqliteDatabase.init("", inMemory = true).tryGet() + +proc fakeWakuMessage( + payload = "TEST-PAYLOAD", + contentTopic = DefaultContentTopic, + ts = getNanosecondTime(epochTime()) +): WakuMessage = + WakuMessage( + payload: toBytes(payload), + contentTopic: contentTopic, + version: 1, + timestamp: ts + ) + +proc newTestSwitch(key=none(PrivateKey), address=none(MultiAddress)): Switch = + let peerKey = key.get(PrivateKey.random(ECDSA, rng[]).get()) + let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get()) + return newStandardSwitch(some(peerKey), addrs=peerAddr) + + +proc newTestWakuStore(switch: Switch): WakuStore = + let + peerManager = PeerManager.new(switch) + rng = crypto.newRng() + database = newTestDatabase() + store = WakuMessageStore.init(database).tryGet() + proto = WakuStore.init(peerManager, rng, store) + + switch.mount(proto) + + return proto + + +suite "Waku Store": asyncTest "handle query": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + serverProto = newTestWakuStore(serverSwitch) + clientProto = newTestWakuStore(clientSwitch) + + clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + + + ## Given + let topic = ContentTopic("1") let - key = PrivateKey.random(ECDSA, rng[]).get() - peer = PeerInfo.new(key) - topic = defaultContentTopic - msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic) - msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("2")) + msg1 = fakeWakuMessage(contentTopic=topic) + msg2 = fakeWakuMessage() - var dialSwitch = newStandardSwitch() - await dialSwitch.start() + await serverProto.handleMessage("foo", msg1) + await serverProto.handleMessage("foo", msg2) - var listenSwitch = newStandardSwitch(some(key)) - await listenSwitch.start() - - let - database = SqliteDatabase.init("", inMemory = true)[] - store = WakuMessageStore.init(database)[] - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) - rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)]) - - proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) - - listenSwitch.mount(proto) - - await proto.handleMessage("foo", msg) - await proto.handleMessage("foo", msg2) - - var completionFut = newFuture[bool]() - - proc handler(response: HistoryResponse) {.gcsafe, closure.} = - check: - response.messages.len() == 1 - response.messages[0] == msg - completionFut.complete(true) - - await proto.query(rpc, handler) + ## When + let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)]) + let resQuery = await clientProto.query(rpc) + ## Then check: - (await completionFut.withTimeout(5.seconds)) == true + resQuery.isOk() - # free resources - await allFutures(dialSwitch.stop(), - listenSwitch.stop()) + let response = resQuery.tryGet() + check: + response.messages.len == 1 + response.messages[0] == msg1 + + ## Cleanup + await allFutures(serverSwitch.stop(), clientSwitch.stop()) asyncTest "handle query with multiple content filters": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + serverProto = newTestWakuStore(serverSwitch) + clientProto = newTestWakuStore(clientSwitch) + + clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + + ## Given let - key = PrivateKey.random(ECDSA, rng[]).get() - peer = PeerInfo.new(key) - topic1 = defaultContentTopic + topic1 = ContentTopic("1") topic2 = ContentTopic("2") topic3 = ContentTopic("3") - msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic1) - msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic2) - msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic3) - - var dialSwitch = newStandardSwitch() - await dialSwitch.start() - - var listenSwitch = newStandardSwitch(some(key)) - await listenSwitch.start() let - database = SqliteDatabase.init("", inMemory = true)[] - store = WakuMessageStore.init(database)[] - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) - rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic1), HistoryContentFilter(contentTopic: topic3)]) + msg1 = fakeWakuMessage(contentTopic=topic1) + msg2 = fakeWakuMessage(contentTopic=topic2) + msg3 = fakeWakuMessage(contentTopic=topic3) - proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) - - listenSwitch.mount(proto) - - await proto.handleMessage("foo", msg1) - await proto.handleMessage("foo", msg2) - await proto.handleMessage("foo", msg3) - - var completionFut = newFuture[bool]() - - proc handler(response: HistoryResponse) {.gcsafe, closure.} = - check: - response.messages.len() == 2 - response.messages.anyIt(it == msg1) - response.messages.anyIt(it == msg3) - completionFut.complete(true) - - await proto.query(rpc, handler) + await serverProto.handleMessage("foo", msg1) + await serverProto.handleMessage("foo", msg2) + await serverProto.handleMessage("foo", msg3) + + ## When + let rpc = HistoryQuery(contentFilters: @[ + HistoryContentFilter(contentTopic: topic1), + HistoryContentFilter(contentTopic: topic3) + ]) + let resQuery = await clientProto.query(rpc) + ## Then check: - (await completionFut.withTimeout(5.seconds)) == true + resQuery.isOk() - # free resources - await allFutures(dialSwitch.stop(), - listenSwitch.stop()) + let response = resQuery.tryGet() + check: + response.messages.len() == 2 + response.messages.anyIt(it == msg1) + response.messages.anyIt(it == msg3) + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) asyncTest "handle query with pubsub topic filter": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + serverProto = newTestWakuStore(serverSwitch) + clientProto = newTestWakuStore(clientSwitch) + + clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + + ## Given let - key = PrivateKey.random(ECDSA, rng[]).get() - peer = PeerInfo.new(key) - contentTopic1 = defaultContentTopic + pubsubTopic1 = "queried-topic" + pubsubTopic2 = "non-queried-topic" + + let + contentTopic1 = ContentTopic("1") contentTopic2 = ContentTopic("2") contentTopic3 = ContentTopic("3") - msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic1) - msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic2) - msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic3) - - var dialSwitch = newStandardSwitch() - await dialSwitch.start() - - var listenSwitch = newStandardSwitch(some(key)) - await listenSwitch.start() let - database = SqliteDatabase.init("", inMemory = true)[] - store = WakuMessageStore.init(database)[] - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) - pubsubtopic1 = "queried topic" - pubsubtopic2 = "non queried topic" - # this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3) - rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic1), HistoryContentFilter(contentTopic: contentTopic3)], pubsubTopic: pubsubTopic1) + msg1 = fakeWakuMessage(contentTopic=contentTopic1) + msg2 = fakeWakuMessage(contentTopic=contentTopic2) + msg3 = fakeWakuMessage(contentTopic=contentTopic3) - proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) - - listenSwitch.mount(proto) - - # publish messages - await proto.handleMessage(pubsubtopic1, msg1) - await proto.handleMessage(pubsubtopic2, msg2) - await proto.handleMessage(pubsubtopic2, msg3) - - var completionFut = newFuture[bool]() - - proc handler(response: HistoryResponse) {.gcsafe, closure.} = - check: - response.messages.len() == 1 - # msg1 is the only match for the query predicate pubsubtopic1 AND (contentTopic1 OR contentTopic3) - response.messages.anyIt(it == msg1) - completionFut.complete(true) - - await proto.query(rpc, handler) - - check: - (await completionFut.withTimeout(5.seconds)) == true - - # free resources - await allFutures(dialSwitch.stop(), - listenSwitch.stop()) - - asyncTest "handle query with pubsub topic filter with no match": - let - key = PrivateKey.random(ECDSA, rng[]).get() - peer = PeerInfo.new(key) - msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic) - msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic) - msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic) - - var dialSwitch = newStandardSwitch() - await dialSwitch.start() - - var listenSwitch = newStandardSwitch(some(key)) - await listenSwitch.start() - - let - database = SqliteDatabase.init("", inMemory = true)[] - store = WakuMessageStore.init(database)[] - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) - pubsubtopic1 = "queried topic" - pubsubtopic2 = "non queried topic" - # this query targets: pubsubtopic1 - rpc = HistoryQuery(pubsubTopic: pubsubTopic1) - - proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) - - listenSwitch.mount(proto) - - # publish messages - await proto.handleMessage(pubsubtopic2, msg1) - await proto.handleMessage(pubsubtopic2, msg2) - await proto.handleMessage(pubsubtopic2, msg3) - - var completionFut = newFuture[bool]() - - proc handler(response: HistoryResponse) {.gcsafe, closure.} = - check: - response.messages.len() == 0 - completionFut.complete(true) - - await proto.query(rpc, handler) - - check: - (await completionFut.withTimeout(5.seconds)) == true - - # free resources - await allFutures(dialSwitch.stop(), - listenSwitch.stop()) - - asyncTest "handle query with pubsub topic filter matching the entire stored messages": - let - key = PrivateKey.random(ECDSA, rng[]).get() - peer = PeerInfo.new(key) - msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic) - msg2 = WakuMessage(payload: @[byte 4, 5, 6], contentTopic: defaultContentTopic) - msg3 = WakuMessage(payload: @[byte 7, 8, 9,], contentTopic: defaultContentTopic) - - var dialSwitch = newStandardSwitch() - await dialSwitch.start() - - var listenSwitch = newStandardSwitch(some(key)) - await listenSwitch.start() - - let - database = SqliteDatabase.init("", inMemory = true)[] - store = WakuMessageStore.init(database)[] - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) - pubsubtopic = "queried topic" - # this query targets: pubsubtopic - rpc = HistoryQuery(pubsubTopic: pubsubtopic) - - proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) - - listenSwitch.mount(proto) - - # publish messages - await proto.handleMessage(pubsubtopic, msg1) - await proto.handleMessage(pubsubtopic, msg2) - await proto.handleMessage(pubsubtopic, msg3) - - var completionFut = newFuture[bool]() - - proc handler(response: HistoryResponse) {.gcsafe, closure.} = - check: - response.messages.len() == 3 - response.messages.anyIt(it == msg1) - response.messages.anyIt(it == msg2) - response.messages.anyIt(it == msg3) - completionFut.complete(true) - - await proto.query(rpc, handler) - - check: - (await completionFut.withTimeout(5.seconds)) == true - - # free resources - await allFutures(dialSwitch.stop(), - listenSwitch.stop()) - - asyncTest "handle query with store and restarts": - let - key = PrivateKey.random(ECDSA, rng[]).get() - peer = PeerInfo.new(key) - topic = defaultContentTopic - database = SqliteDatabase.init("", inMemory = true)[] - store = WakuMessageStore.init(database)[] - msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic) - msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("2")) - - var dialSwitch = newStandardSwitch() - await dialSwitch.start() - - var listenSwitch = newStandardSwitch(some(key)) - await listenSwitch.start() - - let - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) - rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)]) - - proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) - - listenSwitch.mount(proto) - - await proto.handleMessage("foo", msg) - await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically - await proto.handleMessage("foo", msg2) - - var completionFut = newFuture[bool]() - - proc handler(response: HistoryResponse) {.gcsafe, closure.} = - check: - response.messages.len() == 1 - response.messages[0] == msg - completionFut.complete(true) - - await proto.query(rpc, handler) - - check: - (await completionFut.withTimeout(5.seconds)) == true - - let - proto2 = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) - key2 = PrivateKey.random(ECDSA, rng[]).get() - - var listenSwitch2 = newStandardSwitch(some(key2)) - await listenSwitch2.start() - - proto2.setPeer(listenSwitch2.peerInfo.toRemotePeerInfo()) - - listenSwitch2.mount(proto2) - - var completionFut2 = newFuture[bool]() - proc handler2(response: HistoryResponse) {.gcsafe, closure.} = - check: - response.messages.len() == 1 - response.messages[0] == msg - completionFut2.complete(true) - - await proto2.query(rpc, handler2) - - check: - (await completionFut2.withTimeout(5.seconds)) == true + await serverProto.handleMessage(pubsubtopic1, msg1) + await serverProto.handleMessage(pubsubtopic2, msg2) + await serverProto.handleMessage(pubsubtopic2, msg3) - # free resources - await allFutures(dialSwitch.stop(), - listenSwitch.stop()) + ## When + # this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3) + let rpc = HistoryQuery( + contentFilters: @[HistoryContentFilter(contentTopic: contentTopic1), + HistoryContentFilter(contentTopic: contentTopic3)], + pubsubTopic: pubsubTopic1 + ) + let resQuery = await clientProto.query(rpc) + + ## Then + check: + resQuery.isOk() + + let response = resQuery.tryGet() + check: + response.messages.len() == 1 + response.messages.anyIt(it == msg1) + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + asyncTest "handle query with pubsub topic filter - no match": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + serverProto = newTestWakuStore(serverSwitch) + clientProto = newTestWakuStore(clientSwitch) + + clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + + ## Given + let + pubsubtopic1 = "queried-topic" + pubsubtopic2 = "non-queried-topic" + + let + msg1 = fakeWakuMessage() + msg2 = fakeWakuMessage() + msg3 = fakeWakuMessage() + + await serverProto.handleMessage(pubsubtopic2, msg1) + await serverProto.handleMessage(pubsubtopic2, msg2) + await serverProto.handleMessage(pubsubtopic2, msg3) + + ## When + let rpc = HistoryQuery(pubsubTopic: pubsubTopic1) + let res = await clientProto.query(rpc) + + ## Then + check: + res.isOk() + + let response = res.tryGet() + check: + response.messages.len() == 0 + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + asyncTest "handle query with pubsub topic filter - match the entire stored messages": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + serverProto = newTestWakuStore(serverSwitch) + clientProto = newTestWakuStore(clientSwitch) + + clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + + ## Given + let pubsubTopic = "queried-topic" + + let + msg1 = fakeWakuMessage(payload="TEST-1") + msg2 = fakeWakuMessage(payload="TEST-2") + msg3 = fakeWakuMessage(payload="TEST-3") + + await serverProto.handleMessage(pubsubTopic, msg1) + await serverProto.handleMessage(pubsubTopic, msg2) + await serverProto.handleMessage(pubsubTopic, msg3) + + ## When + let rpc = HistoryQuery(pubsubTopic: pubsubTopic) + let res = await clientProto.query(rpc) + + ## Then + check: + res.isOk() + + let response = res.tryGet() + check: + response.messages.len() == 3 + response.messages.anyIt(it == msg1) + response.messages.anyIt(it == msg2) + response.messages.anyIt(it == msg3) + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) asyncTest "handle query with forward pagination": - let - key = PrivateKey.random(ECDSA, rng[]).get() - peer = PeerInfo.new(key) - var - msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")), - WakuMessage(payload: @[byte 1],contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 2],contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 3],contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 4],contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 5],contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 6],contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 7],contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 8],contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("2"))] - - var dialSwitch = newStandardSwitch() - await dialSwitch.start() - - var listenSwitch = newStandardSwitch(some(key)) - await listenSwitch.start() - - let - database = SqliteDatabase.init("", inMemory = true)[] - store = WakuMessageStore.init(database)[] - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) - rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) ) + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) - proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) + let + serverProto = newTestWakuStore(serverSwitch) + clientProto = newTestWakuStore(clientSwitch) - listenSwitch.mount(proto) + clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) - for wakuMsg in msgList: - await proto.handleMessage("foo", wakuMsg) - await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically + ## Given + let msgList = @[ + WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")), + WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2")) + ] - var completionFut = newFuture[bool]() + for msg in msgList: + await serverProto.handleMessage("foo", msg) - proc handler(response: HistoryResponse) {.gcsafe, closure.} = - check: - response.messages.len() == 2 - response.pagingInfo.pageSize == 2 - response.pagingInfo.direction == PagingDirection.FORWARD - response.pagingInfo.cursor != Index() - completionFut.complete(true) - - await proto.query(rpc, handler) + ## When + let rpc = HistoryQuery( + contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], + pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) + ) + let res = await clientProto.query(rpc) + ## Then check: - (await completionFut.withTimeout(5.seconds)) == true + res.isOk() - # free resources - await allFutures(dialSwitch.stop(), - listenSwitch.stop()) + let response = res.tryGet() + check: + response.messages.len() == 2 + response.pagingInfo.pageSize == 2 + response.pagingInfo.direction == PagingDirection.FORWARD + response.pagingInfo.cursor != Index() + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) asyncTest "handle query with backward pagination": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + serverProto = newTestWakuStore(serverSwitch) + clientProto = newTestWakuStore(clientSwitch) + + clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + + ## Given + let msgList = @[ + WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")), + WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2")) + ] + + for msg in msgList: + await serverProto.handleMessage("foo", msg) + + ## When + let rpc = HistoryQuery( + contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], + pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) + ) + let res = await clientProto.query(rpc) + + ## Then + check: + res.isOk() + + let response = res.tryGet() + check: + response.messages.len() == 2 + response.pagingInfo.pageSize == 2 + response.pagingInfo.direction == PagingDirection.BACKWARD + response.pagingInfo.cursor != Index() + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + asyncTest "handle query with no paging info - auto-pagination": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + serverProto = newTestWakuStore(serverSwitch) + clientProto = newTestWakuStore(clientSwitch) + + clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + + ## Given + let msgList = @[ + WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")), + WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2")) + ] + + for msg in msgList: + await serverProto.handleMessage("foo", msg) + + ## When + let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]) + let res = await clientProto.query(rpc) + + ## Then + check: + res.isOk() + + let response = res.tryGet() + check: + ## No pagination specified. Response will be auto-paginated with + ## up to MaxPageSize messages per page. + response.messages.len() == 8 + response.pagingInfo.pageSize == 8 + response.pagingInfo.direction == PagingDirection.BACKWARD + response.pagingInfo.cursor != Index() + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + +# TODO: Review this test suite test cases +procSuite "Waku Store - fault tolerant store": + + proc newTestWakuStore(peer=none(RemotePeerInfo)): Future[(Switch, Switch, WakuStore)] {.async.} = let key = PrivateKey.random(ECDSA, rng[]).get() - peer = PeerInfo.new(key) - var - msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")), - WakuMessage(payload: @[byte 1],contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 2],contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 3],contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 4],contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 5],contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 6],contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 7],contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 8],contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("2"))] - - var dialSwitch = newStandardSwitch() - await dialSwitch.start() - - var listenSwitch = newStandardSwitch(some(key)) + listenSwitch = newStandardSwitch(some(key)) await listenSwitch.start() + let dialSwitch = newStandardSwitch() + await dialSwitch.start() + let - database = SqliteDatabase.init("", inMemory = true)[] - store = WakuMessageStore.init(database)[] - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) + peerManager = PeerManager.new(dialsWitch) + rng = crypto.newRng() + database = newTestDatabase() + store = WakuMessageStore.init(database).tryGet() + proto = WakuStore.init(peerManager, rng, store) - proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) + let storePeer = peer.get(listenSwitch.peerInfo.toRemotePeerInfo()) + proto.setPeer(storePeer) listenSwitch.mount(proto) - for wakuMsg in msgList: - await proto.handleMessage("foo", wakuMsg) - await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically - var completionFut = newFuture[bool]() + return (listenSwitch, dialSwitch, proto) - proc handler(response: HistoryResponse) {.gcsafe, closure.} = - check: - response.messages.len() == 2 - response.pagingInfo.pageSize == 2 - response.pagingInfo.direction == PagingDirection.BACKWARD - response.pagingInfo.cursor != Index() - completionFut.complete(true) - - let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) ) - await proto.query(rpc, handler) - - check: - (await completionFut.withTimeout(5.seconds)) == true - - # free resources - await allFutures(dialSwitch.stop(), - listenSwitch.stop()) - - asyncTest "handle queries with no paging info (auto-paginate)": - let - key = PrivateKey.random(ECDSA, rng[]).get() - peer = PeerInfo.new(key) - var - msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")), - WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))] - - var dialSwitch = newStandardSwitch() - await dialSwitch.start() - - var listenSwitch = newStandardSwitch(some(key)) - await listenSwitch.start() - - let - database = SqliteDatabase.init("", inMemory = true)[] - store = WakuMessageStore.init(database)[] - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) - - proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) - - listenSwitch.mount(proto) - - for wakuMsg in msgList: - await proto.handleMessage("foo", wakuMsg) - await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically - var completionFut = newFuture[bool]() - - proc handler(response: HistoryResponse) {.gcsafe, closure.} = - check: - ## No pagination specified. Response will be auto-paginated with - ## up to MaxPageSize messages per page. - response.messages.len() == 8 - response.pagingInfo.pageSize == 8 - response.pagingInfo.direction == PagingDirection.BACKWARD - response.pagingInfo.cursor != Index() - completionFut.complete(true) - - let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)] ) - - await proto.query(rpc, handler) - - check: - (await completionFut.withTimeout(5.seconds)) == true - - # free resources - await allFutures(dialSwitch.stop(), - listenSwitch.stop()) asyncTest "temporal history queries": - let - key = PrivateKey.random(ECDSA, rng[]).get() - peer = PeerInfo.new(key) - key2 = PrivateKey.random(ECDSA, rng[]).get() - # peer2 = PeerInfo.new(key2) - var - msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)), - WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: Timestamp(1)), - WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: Timestamp(2)), - WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: Timestamp(3)), - WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: Timestamp(4)), - WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: Timestamp(5)), - WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: Timestamp(6)), - WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: Timestamp(7)), - WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: Timestamp(8)), - WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: Timestamp(9))] + ## Setup + let (listenSwitch, dialSwitch, proto) = await newTestWakuStore() + let msgList = @[ + WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)), + WakuMessage(payload: @[byte 1], contentTopic: ContentTopic("1"), timestamp: Timestamp(1)), + WakuMessage(payload: @[byte 2], contentTopic: ContentTopic("2"), timestamp: Timestamp(2)), + WakuMessage(payload: @[byte 3], contentTopic: ContentTopic("1"), timestamp: Timestamp(3)), + WakuMessage(payload: @[byte 4], contentTopic: ContentTopic("2"), timestamp: Timestamp(4)), + WakuMessage(payload: @[byte 5], contentTopic: ContentTopic("1"), timestamp: Timestamp(5)), + WakuMessage(payload: @[byte 6], contentTopic: ContentTopic("2"), timestamp: Timestamp(6)), + WakuMessage(payload: @[byte 7], contentTopic: ContentTopic("1"), timestamp: Timestamp(7)), + WakuMessage(payload: @[byte 8], contentTopic: ContentTopic("2"), timestamp: Timestamp(8)), + WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("1"), timestamp: Timestamp(9)) + ] - msgList2 = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)), - WakuMessage(payload: @[byte 11],contentTopic: ContentTopic("1"), timestamp: Timestamp(1)), - WakuMessage(payload: @[byte 12],contentTopic: ContentTopic("2"), timestamp: Timestamp(2)), - WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: Timestamp(3)), - WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: Timestamp(4)), - WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: Timestamp(5)), - WakuMessage(payload: @[byte 13],contentTopic: ContentTopic("2"), timestamp: Timestamp(6)), - WakuMessage(payload: @[byte 14],contentTopic: ContentTopic("1"), timestamp: Timestamp(7))] + for msg in msgList: + await proto.handleMessage(DEFAULT_PUBSUB_TOPIC, msg) - #-------------------- - # setup default test store - #-------------------- - var dialSwitch = newStandardSwitch() - await dialSwitch.start() + let (listenSwitch2, dialSwitch2, proto2) = await newTestWakuStore() + let msgList2 = @[ + WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)), + WakuMessage(payload: @[byte 11], contentTopic: ContentTopic("1"), timestamp: Timestamp(1)), + WakuMessage(payload: @[byte 12], contentTopic: ContentTopic("2"), timestamp: Timestamp(2)), + WakuMessage(payload: @[byte 3], contentTopic: ContentTopic("1"), timestamp: Timestamp(3)), + WakuMessage(payload: @[byte 4], contentTopic: ContentTopic("2"), timestamp: Timestamp(4)), + WakuMessage(payload: @[byte 5], contentTopic: ContentTopic("1"), timestamp: Timestamp(5)), + WakuMessage(payload: @[byte 13], contentTopic: ContentTopic("2"), timestamp: Timestamp(6)), + WakuMessage(payload: @[byte 14], contentTopic: ContentTopic("1"), timestamp: Timestamp(7)) + ] - # to be connected to - var listenSwitch = newStandardSwitch(some(key)) - await listenSwitch.start() + for msg in msgList2: + await proto2.handleMessage(DEFAULT_PUBSUB_TOPIC, msg) - let - database = SqliteDatabase.init("", inMemory = true)[] - store = WakuMessageStore.init(database)[] - proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) - - proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) - - listenSwitch.mount(proto) - - for wakuMsg in msgList: - # the pubsub topic should be DefaultTopic - await proto.handleMessage(DefaultTopic, wakuMsg) - - #-------------------- - # setup 2nd test store - #-------------------- - var dialSwitch2 = newStandardSwitch() - await dialSwitch2.start() - - # to be connected to - var listenSwitch2 = newStandardSwitch(some(key2)) - await listenSwitch2.start() - - let - database2 = SqliteDatabase.init("", inMemory = true)[] - store2 = WakuMessageStore.init(database2)[] - proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng(), store2) - - proto2.setPeer(listenSwitch2.peerInfo.toRemotePeerInfo()) - - listenSwitch2.mount(proto2) - - for wakuMsg in msgList2: - # the pubsub topic should be DefaultTopic - await proto2.handleMessage(DefaultTopic, wakuMsg) asyncTest "handle temporal history query with a valid time window": - var completionFut = newFuture[bool]() + ## Given + let rpc = HistoryQuery( + contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], + startTime: Timestamp(2), + endTime: Timestamp(5) + ) + + ## When + let res = await proto.query(rpc) - proc handler(response: HistoryResponse) {.gcsafe, closure.} = - check: - response.messages.len() == 2 - response.messages.anyIt(it.timestamp == Timestamp(3)) - response.messages.anyIt(it.timestamp == Timestamp(5)) - completionFut.complete(true) - - let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(2), endTime: Timestamp(5)) - await proto.query(rpc, handler) + ## Then + check res.isOk() + let response = res.tryGet() check: - (await completionFut.withTimeout(5.seconds)) == true - + response.messages.len() == 2 + response.messages.anyIt(it.timestamp == Timestamp(3)) + response.messages.anyIt(it.timestamp == Timestamp(5)) + asyncTest "handle temporal history query with a zero-size time window": # a zero-size window results in an empty list of history messages - var completionFut = newFuture[bool]() + ## Given + let rpc = HistoryQuery( + contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], + startTime: Timestamp(2), + endTime: Timestamp(2) + ) - proc handler(response: HistoryResponse) {.gcsafe, closure.} = - check: - # a zero-size window results in an empty list of history messages - response.messages.len() == 0 - completionFut.complete(true) + ## When + let res = await proto.query(rpc) - let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(2), endTime: Timestamp(2)) - await proto.query(rpc, handler) + ## Then + check res.isOk() + let response = res.tryGet() check: - (await completionFut.withTimeout(5.seconds)) == true + response.messages.len == 0 asyncTest "handle temporal history query with an invalid time window": - # a history query with an invalid time range results in an empty list of history messages - var completionFut = newFuture[bool]() + # A history query with an invalid time range results in an empty list of history messages + ## Given + let rpc = HistoryQuery( + contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], + startTime: Timestamp(5), + endTime: Timestamp(2) + ) - proc handler(response: HistoryResponse) {.gcsafe, closure.} = - check: - # a history query with an invalid time range results in an empty list of history messages - response.messages.len() == 0 - completionFut.complete(true) + ## When + let res = await proto.query(rpc) - # time window is invalid since start time > end time - let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(5), endTime: Timestamp(2)) - await proto.query(rpc, handler) + ## Then + check res.isOk() + let response = res.tryGet() check: - (await completionFut.withTimeout(5.seconds)) == true + response.messages.len == 0 asyncTest "resume message history": - # starts a new node - var dialSwitch3 = newStandardSwitch() - await dialSwitch3.start() - - let - database3 = SqliteDatabase.init("", inMemory = true)[] - store3 = WakuMessageStore.init(database3)[] - proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng(), store3) - - proto3.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) + ## Given + # Start a new node + let (listenSwitch3, dialSwitch3, proto3) = await newTestWakuStore(peer=some(listenSwitch.peerInfo.toRemotePeerInfo())) + ## When let successResult = await proto3.resume() + + ## Then check: - successResult.isOk + successResult.isOk() successResult.value == 10 proto3.messages.len == 10 - await dialSwitch3.stop() - - asyncTest "queryFrom": - - var completionFut = newFuture[bool]() - - proc handler(response: HistoryResponse) {.gcsafe, closure.} = - check: - response.messages.len() == 4 - completionFut.complete(true) - - let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5)) - let successResult = await proto.queryFrom(rpc, handler, listenSwitch.peerInfo.toRemotePeerInfo()) - - check: - (await completionFut.withTimeout(5.seconds)) == true - successResult.isOk - successResult.value == 4 - - asyncTest "queryFromWithPaging with empty pagingInfo": + ## Cleanup + await allFutures(dialSwitch3.stop(), listenSwitch3.stop()) + asyncTest "queryFromWithPaging - no pagingInfo": + ## Given let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5)) - let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo()) + ## When + let res = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo()) + ## Then + check res.isOk() + + let response = res.tryGet() check: - messagesResult.isOk - messagesResult.value.len == 4 + response.len == 4 - asyncTest "queryFromWithPaging with pagination": + asyncTest "queryFromWithPaging - with pagination": var pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: 1) let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5), pagingInfo: pinfo) @@ -696,57 +622,32 @@ procSuite "Waku Store": asyncTest "resume history from a list of candidate peers": - var offListenSwitch = newStandardSwitch(some(PrivateKey.random(ECDSA, rng[]).get())) + let offListenSwitch = newStandardSwitch(some(PrivateKey.random(ECDSA, rng[]).get())) + let (listenSwitch3, dialSwitch3, proto3) = await newTestWakuStore() - # starts a new node - var dialSwitch3 = newStandardSwitch() - await dialSwitch3.start() + ## When + let res = await proto3.resume(some(@[ + offListenSwitch.peerInfo.toRemotePeerInfo(), + listenSwitch.peerInfo.toRemotePeerInfo(), + listenSwitch2.peerInfo.toRemotePeerInfo() + ])) - let - database3 = SqliteDatabase.init("", inMemory = true)[] - store3 = WakuMessageStore.init(database3)[] - proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng(), store3) + ## Then + # `proto3` is expected to retrieve 14 messages because: + # - the store mounted on `listenSwitch` holds 10 messages (`msgList`) + # - the store mounted on `listenSwitch2` holds 7 messages (see `msgList2`) + # - both stores share 3 messages, resulting in 14 unique messages in total + check res.isOk() - let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo.toRemotePeerInfo(), - listenSwitch.peerInfo.toRemotePeerInfo(), - listenSwitch2.peerInfo.toRemotePeerInfo()])) + let response = res.tryGet() + check: + response == 14 + check: - # `proto3` is expected to retrieve 14 messages because: - # - the store mounted on `listenSwitch` holds 10 messages (`msgList`) - # - the store mounted on `listenSwitch2` holds 7 messages (see `msgList2`) - # - both stores share 3 messages, resulting in 14 unique messages in total proto3.messages.len == 14 - successResult.isOk - successResult.value == 14 - #free resources - await allFutures(dialSwitch3.stop(), - offListenSwitch.stop()) + ## Cleanup + await allFutures(listenSwitch3.stop(), dialSwitch3.stop(), offListenSwitch.stop()) - #free resources - await allFutures(dialSwitch.stop(), - dialSwitch2.stop(), - listenSwitch.stop()) - - - asyncTest "limit store capacity": - let - capacity = 10 - contentTopic = ContentTopic("/waku/2/default-content/proto") - pubsubTopic = "/waku/2/default-waku/proto" - - let store = WakuStore.init(PeerManager.new(newStandardSwitch()), crypto.newRng(), capacity = capacity) - - for i in 1..capacity: - await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte i], contentTopic: contentTopic, timestamp: Timestamp(i))) - await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically - - check: - store.messages.len == capacity # Store is at capacity - - # Test that capacity holds - await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte (capacity + 1)], contentTopic: contentTopic, timestamp: Timestamp(capacity + 1))) - - check: - store.messages.len == capacity # Store is still at capacity - store.messages.last().get().msg.payload == @[byte (capacity + 1)] # Simple check to verify last added item is stored + ## Cleanup + await allFutures(dialSwitch.stop(), dialSwitch2.stop(), listenSwitch.stop(), listenSwitch2.stop()) diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index ccb3ed62d..f406eda55 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -54,6 +54,7 @@ const MaxRpcSize = StoreMaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64 # Error types (metric label values) const + storeFailure = "store_failure" dialFailure = "dial_failure" decodeRpcFailure = "decode_rpc_failure" peerNotFoundFailure = "peer_not_found_failure" @@ -202,8 +203,9 @@ proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) = ws.peerManager.addPeer(peer, WakuStoreCodec) waku_store_peers.inc() + proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} = - if (not w.persistMessages): + if not w.persistMessages: # Store is mounted but new messages should not be stored return @@ -220,10 +222,10 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} = let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic)) - if addRes.isErr: + if addRes.isErr(): trace "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error() waku_store_errors.inc(labelValues = [$(addRes.error())]) - return # Do not attempt to store in persistent DB + return waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"]) @@ -231,16 +233,37 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} = return let res = w.store.put(index, msg, topic) - if res.isErr: - trace "failed to store messages", err = res.error - waku_store_errors.inc(labelValues = ["store_failure"]) + if res.isErr(): + trace "failed to store messages", err=res.error() + waku_store_errors.inc(labelValues = [storeFailure]) # TODO: Remove after converting the query method into a non-callback method type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.} -proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = - # @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service. +proc query(w: WakuStore, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = + let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec) + if connOpt.isNone(): + waku_store_errors.inc(labelValues = [dialFailure]) + return err(dialFailure) + let connection = connOpt.get() + + let rpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req) + await connection.writeLP(rpc.encode().buffer) + + var message = await connOpt.get().readLp(MaxRpcSize.int) + let response = HistoryRPC.init(message) + + if response.isErr(): + error "failed to decode response" + waku_store_errors.inc(labelValues = [decodeRpcFailure]) + return err(decodeRpcFailure) + + waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) + return ok(response.value.response) + +proc query*(w: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = + # TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service. # Ideally depending on the query and our set of peers we take a subset of ideal peers. # This will require us to check for various factors such as: # - which topics they track @@ -248,123 +271,98 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn # - default store peer? let peerOpt = w.peerManager.selectPeer(WakuStoreCodec) - if peerOpt.isNone(): error "no suitable remote peers" waku_store_errors.inc(labelValues = [peerNotFoundFailure]) + return err(peerNotFoundFailure) + + return await w.query(req, peerOpt.get()) + + +proc query*(w: WakuStore, req: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe, + deprecated: "Use the no-callback version of this method".} = + + let response = await w.query(req) + if response.isErr(): + error "history query failed", error=response.error() return - let connOpt = await w.peerManager.dialPeer(peerOpt.get(), WakuStoreCodec) - - if connOpt.isNone(): - # @TODO more sophisticated error handling here - error "failed to connect to remote peer" - waku_store_errors.inc(labelValues = [dialFailure]) - return - - await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(w.rng), - query: query).encode().buffer) - - var message = await connOpt.get().readLp(MaxRpcSize.int) - let response = HistoryRPC.init(message) - - if response.isErr: - error "failed to decode response" - waku_store_errors.inc(labelValues = [decodeRpcFailure]) - return - - waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) - handler(response.value.response) + handler(response.get()) ## 21/WAKU2-FAULT-TOLERANT-STORE -proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: RemotePeerInfo): Future[WakuStoreResult[uint64]] {.async, gcsafe.} = - ## sends the query to the given peer - ## returns the number of retrieved messages if no error occurs, otherwise returns the error string - # TODO dialPeer add it to the list of known peers, while it does not cause any issue but might be unnecessary - let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec) +proc queryFrom*(w: WakuStore, req: HistoryQuery, handler: QueryHandlerFunc, peer: RemotePeerInfo): Future[WakuStoreResult[uint64]] {.async, gcsafe, + deprecated: "Use the query() no-callback procedure instead".} = + ## Sends the query to the given peer. Returns the number of retrieved messages if no error occurs, otherwise returns the error string + # TODO: dialPeer add it to the list of known peers, while it does not cause any issue but might be unnecessary + let res = await w.query(req, peer) + if res.isErr(): + return err(res.error()) + + let response = res.get() - if connOpt.isNone(): - error "failed to connect to remote peer" - waku_store_errors.inc(labelValues = [dialFailure]) - return err("failed to connect to remote peer") - - await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(w.rng), - query: query).encode().buffer) - debug "query is sent", query=query - var message = await connOpt.get().readLp(MaxRpcSize.int) - let response = HistoryRPC.init(message) - - debug "response is received" - - if response.isErr: - error "failed to decode response" - waku_store_errors.inc(labelValues = [decodeRpcFailure]) - return err("failed to decode response") - - - waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) - handler(response.value.response) - return ok(response.value.response.messages.len.uint64) + handler(response) + return ok(response.messages.len.uint64) proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} = - ## a thin wrapper for queryFrom - ## sends the query to the given peer - ## when the query has a valid pagingInfo, it retrieves the historical messages in pages - ## returns all the fetched messages if no error occurs, otherwise returns an error string - debug "queryFromWithPaging is called" - var messageList: seq[WakuMessage] - # make a copy of the query - var q = query - debug "query is", q=q + ## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo, + ## it retrieves the historical messages in pages. + ## Returns all the fetched messages, if error occurs, returns an error string - var hasNextPage = true - proc handler(response: HistoryResponse) {.gcsafe.} = - # store messages - for m in response.messages.items: messageList.add(m) - - # check whether it is the last page - hasNextPage = (response.pagingInfo.pageSize != 0) - debug "hasNextPage", hasNextPage=hasNextPage + # Make a copy of the query + var req = query - # update paging cursor - q.pagingInfo.cursor = response.pagingInfo.cursor - debug "next paging info", pagingInfo=q.pagingInfo + var messageList: seq[WakuMessage] = @[] - # fetch the history in pages - while (hasNextPage): - let successResult = await w.queryFrom(q, handler, peer) - if not successResult.isOk: return err("failed to resolve the query") - debug "hasNextPage", hasNextPage=hasNextPage + # Fetch the history in pages + while true: + let res = await w.query(req, peer) + if res.isErr(): + return err(res.error()) + + let response = res.get() + + messageList.add(response.messages) + + # Check whether it is the last page + if response.pagingInfo.pageSize == 0: + break + + # Update paging cursor + req.pagingInfo.cursor = response.pagingInfo.cursor return ok(messageList) -proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} = - ## loops through the candidateList in order and sends the query to each - ## once all responses have been received, the retrieved messages are consolidated into one deduplicated list - ## if no messages have been retrieved, the returned future will resolve into a MessagesResult result holding an empty seq. - var futureList: seq[Future[WakuStoreResult[seq[WakuMessage]]]] - for peer in candidateList.items: - futureList.add(w.queryFromWithPaging(query, peer)) - await allFutures(futureList) # all(), which returns a Future[seq[T]], has been deprecated +proc queryLoop(w: WakuStore, req: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} = + ## Loops through the peers candidate list in order and sends the query to each + ## + ## Once all responses have been received, the retrieved messages are consolidated into one deduplicated list. + ## if no messages have been retrieved, the returned future will resolve into a result holding an empty seq. + let queriesList = candidateList.mapIt(w.queryFromWithPaging(req, it)) - let messagesList = futureList + await allFutures(queriesList) + + let messagesList = queriesList .map(proc (fut: Future[WakuStoreResult[seq[WakuMessage]]]): seq[WakuMessage] = - if fut.completed() and fut.read().isOk(): # completed() just as a sanity check. These futures have been awaited before using allFutures() - fut.read().value - else: - @[] + # These futures have been awaited before using allFutures(). Call completed() just as a sanity check. + if not fut.completed() or fut.read().isErr(): + return @[] + + fut.read().value ) .concat() + .deduplicate() - if messagesList.len != 0: - return ok(messagesList.deduplicate()) - else: - debug "failed to resolve the query" + if messagesList.len == 0: return err("failed to resolve the query") -proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]), pageSize: uint64 = DefaultPageSize): Future[WakuStoreResult[uint64]] {.async, gcsafe.} = + return ok(messagesList) + +proc resume*(w: WakuStore, + peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]), + pageSize: uint64 = DefaultPageSize, + pubsubTopic = DefaultTopic): Future[WakuStoreResult[uint64]] {.async, gcsafe.} = ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online ## messages are stored in the store node's messages field and in the message db ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message @@ -375,134 +373,126 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. ## The history gets fetched successfully if the dialed peer has been online during the queried time window. ## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string - - + var lastSeenTime = Timestamp(0) var currentTime = getNanosecondTime(epochTime()) - debug "resume", currentEpochTime=currentTime - let lastSeenItem = ws.messages.last() + let lastSeenItem = w.messages.last() + if lastSeenItem.isOk(): + lastSeenTime = lastSeenItem.get().msg.timestamp + - var lastSeenTime = if lastSeenItem.isOk(): lastSeenItem.get().msg.timestamp - else: Timestamp(0) - # adjust the time window with an offset of 20 seconds let offset: Timestamp = getNanosecondTime(20) currentTime = currentTime + offset lastSeenTime = max(lastSeenTime - offset, 0) + debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime - let - pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: pageSize) - rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime, pagingInfo: pinfo) - var dismissed: uint = 0 - var added: uint = 0 - proc save(msgList: seq[WakuMessage]) = - debug "save proc is called" - # exclude index from the comparison criteria + let req = HistoryQuery( + pubsubTopic: pubsubTopic, + startTime: lastSeenTime, + endTime: currentTime, + pagingInfo: PagingInfo( + direction:PagingDirection.FORWARD, + pageSize: pageSize + ) + ) - for msg in msgList: - let index = Index.compute( - msg, - receivedTime = getNanosecondTime(getTime().toUnixFloat()), - pubsubTopic = DefaultTopic - ) - - # check for duplicate messages - # TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic - if ws.messages.contains(index): - dismissed = dismissed + 1 - continue - - # store the new message - let indexedWakuMsg = IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic) - - # store in db if exists - if not ws.store.isNil: - let res = ws.store.put(index, msg, DefaultTopic) - if res.isErr: - trace "failed to store messages", err = res.error - waku_store_errors.inc(labelValues = ["store_failure"]) - continue - - discard ws.messages.add(indexedWakuMsg) - added = added + 1 - - waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"]) - - debug "number of duplicate messages found in resume", dismissed=dismissed - debug "number of messages added via resume", added=added - - if peerList.isSome: + var res: WakuStoreResult[seq[WakuMessage]] + if peerList.isSome(): debug "trying the candidate list to fetch the history" - let successResult = await ws.queryLoop(rpc, peerList.get()) - if successResult.isErr: - debug "failed to resume the history from the list of candidates" - return err("failed to resume the history from the list of candidates") - debug "resume is done successfully" - save(successResult.value) - return ok(added) + res = await w.queryLoop(req, peerList.get()) + else: debug "no candidate list is provided, selecting a random peer" # if no peerList is set then query from one of the peers stored in the peer manager - let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec) + let peerOpt = w.peerManager.selectPeer(WakuStoreCodec) if peerOpt.isNone(): warn "no suitable remote peers" waku_store_errors.inc(labelValues = [peerNotFoundFailure]) return err("no suitable remote peers") debug "a peer is selected from peer manager" - let remotePeerInfo = peerOpt.get() - let successResult = await ws.queryFromWithPaging(rpc, remotePeerInfo) - if successResult.isErr: - debug "failed to resume the history" - return err("failed to resume the history") - debug "resume is done successfully" - save(successResult.value) - return ok(added) + res = await w.queryFromWithPaging(req, peerOpt.get()) + if res.isErr(): + debug "failed to resume the history" + return err("failed to resume the history") + + + # Save the retrieved messages in the store + var dismissed: uint = 0 + var added: uint = 0 + + for msg in res.get(): + let index = Index.compute( + msg, + receivedTime = getNanosecondTime(getTime().toUnixFloat()), + pubsubTopic = pubsubTopic + ) + + # check for duplicate messages + # TODO: Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic + if w.messages.contains(index): + dismissed.inc() + continue + + # store the new message + let resPut = w.messages.put(index, msg, pubsubTopic) + if resPut.isErr(): + trace "failed to store messages", err = resPut.error() + waku_store_errors.inc(labelValues = [storeFailure]) + continue + + # store in db if exists + if not w.store.isNil(): + let resPut = w.store.put(index, msg, pubsubTopic) + if resPut.isErr(): + trace "failed to store messages", err = resPut.error() + waku_store_errors.inc(labelValues = [storeFailure]) + continue + + added.inc() + + waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"]) + + debug "resume finished successfully", addedMessages=added, dimissedMessages=dismissed + return ok(added) + + +## EXPERIMENTAL # NOTE: Experimental, maybe incorporate as part of query call -proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = - # @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service. +proc queryWithAccounting*(ws: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = + let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec) + if peerOpt.isNone(): + error "no suitable remote peers" + waku_store_errors.inc(labelValues = [peerNotFoundFailure]) + return err(peerNotFoundFailure) + + let res = await ws.query(req, peerOpt.get()) + if res.isErr(): + return err(res.error()) + + let response = res.get() + + # Perform accounting operation. Assumes wakuSwap protocol is mounted + ws.wakuSwap.debit(peerOpt.get().peerId, response.messages.len) + + return ok(response) + +proc queryWithAccounting*(ws: WakuStore, req: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe, + deprecated: "Use the no-callback procedure instead".} = + # TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service. # Ideally depending on the query and our set of peers we take a subset of ideal peers. # This will require us to check for various factors such as: # - which topics they track # - latency? # - default store peer? - - let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec) - - if peerOpt.isNone(): - error "no suitable remote peers" - waku_store_errors.inc(labelValues = [peerNotFoundFailure]) + let response = await ws.queryWithAccounting(req) + if response.isErr(): + error "history query failed", error=response.error() return - - let connOpt = await ws.peerManager.dialPeer(peerOpt.get(), WakuStoreCodec) - - if connOpt.isNone(): - # @TODO more sophisticated error handling here - error "failed to connect to remote peer" - waku_store_errors.inc(labelValues = [dialFailure]) - return - - await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(ws.rng), - query: query).encode().buffer) - - var message = await connOpt.get().readLp(MaxRpcSize.int) - let response = HistoryRPC.init(message) - - if response.isErr: - error "failed to decode response" - waku_store_errors.inc(labelValues = [decodeRpcFailure]) - return - - # NOTE Perform accounting operation - # Assumes wakuSwap protocol is mounted - let remotePeerInfo = peerOpt.get() - let messages = response.value.response.messages - ws.wakuSwap.debit(remotePeerInfo.peerId, messages.len) - - waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) - - handler(response.value.response) + + handler(response.get())