diff --git a/apps/wakunode2/wakunode2_setup_metrics.nim b/apps/wakunode2/wakunode2_setup_metrics.nim index e5819a244..b2f347a85 100644 --- a/apps/wakunode2/wakunode2_setup_metrics.nim +++ b/apps/wakunode2/wakunode2_setup_metrics.nim @@ -9,7 +9,7 @@ import metrics/chronos_httpserver import ../../waku/v2/protocol/waku_filter, - ../../waku/v2/protocol/waku_store, + ../../waku/v2/protocol/waku_store/protocol_metrics, ../../waku/v2/protocol/waku_lightpush, ../../waku/v2/protocol/waku_swap/waku_swap, ../../waku/v2/protocol/waku_peer_exchange, diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 8e1efb2e6..801dfe3b5 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -13,6 +13,7 @@ import ./v2/test_message_store_sqlite, ./v2/test_waku_store_rpc_codec, ./v2/test_waku_store, + ./v2/test_waku_store_client, # TODO: Re-enable store resume test cases (#1282) # ./v2/test_waku_store_resume, ./v2/test_wakunode_store, diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 31c96b782..3816872c1 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -11,6 +11,7 @@ import import ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_store, + ../../waku/v2/protocol/waku_store/client, ../../waku/v2/node/storage/sqlite, ../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/node/storage/message/sqlite_store, @@ -53,17 +54,24 @@ proc newTestMessageStore(): MessageStore = let database = newTestDatabase() SqliteStore.init(database).tryGet() -proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): WakuStore = +proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): Future[WakuStore] {.async.} = let peerManager = PeerManager.new(switch) rng = crypto.newRng() proto = WakuStore.init(peerManager, rng, store) - waitFor proto.start() + await proto.start() switch.mount(proto) return proto +proc newTestWakuStoreClient(switch: Switch, store: MessageStore = nil): WakuStoreClient = + let + peerManager = PeerManager.new(switch) + rng = crypto.newRng() + WakuStoreClient.new(peerManager, rng, store) + + procSuite "Waku Store - history query": ## Fixtures let storeA = block: @@ -96,10 +104,10 @@ procSuite "Waku Store - history query": await allFutures(serverSwitch.start(), clientSwitch.start()) let - serverProto = newTestWakuStore(serverSwitch) - clientProto = newTestWakuStore(clientSwitch) + server = await newTestWakuStore(serverSwitch) + client = newTestWakuStoreClient(clientSwitch) - clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) ## Given @@ -108,12 +116,12 @@ procSuite "Waku Store - history query": msg1 = fakeWakuMessage(contentTopic=topic) msg2 = fakeWakuMessage() - serverProto.handleMessage("foo", msg1) - serverProto.handleMessage("foo", msg2) + server.handleMessage("foo", msg1) + server.handleMessage("foo", msg2) ## When let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)]) - let resQuery = await clientProto.query(rpc) + let resQuery = await client.query(rpc) ## Then check: @@ -122,7 +130,7 @@ procSuite "Waku Store - history query": let response = resQuery.tryGet() check: response.messages.len == 1 - response.messages[0] == msg1 + response.messages == @[msg1] ## Cleanup await allFutures(serverSwitch.stop(), clientSwitch.stop()) @@ -136,10 +144,10 @@ procSuite "Waku Store - history query": await allFutures(serverSwitch.start(), clientSwitch.start()) let - serverProto = newTestWakuStore(serverSwitch) - clientProto = newTestWakuStore(clientSwitch) + server = await newTestWakuStore(serverSwitch) + client = newTestWakuStoreClient(clientSwitch) - clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) ## Given let @@ -152,16 +160,16 @@ procSuite "Waku Store - history query": msg2 = fakeWakuMessage(contentTopic=topic2) msg3 = fakeWakuMessage(contentTopic=topic3) - serverProto.handleMessage("foo", msg1) - serverProto.handleMessage("foo", msg2) - serverProto.handleMessage("foo", msg3) + server.handleMessage("foo", msg1) + server.handleMessage("foo", msg2) + server.handleMessage("foo", msg3) ## When let rpc = HistoryQuery(contentFilters: @[ - HistoryContentFilter(contentTopic: topic1), + HistoryContentFilter(contentTopic: topic1), HistoryContentFilter(contentTopic: topic3) ]) - let resQuery = await clientProto.query(rpc) + let resQuery = await client.query(rpc) ## Then check: @@ -185,10 +193,10 @@ procSuite "Waku Store - history query": await allFutures(serverSwitch.start(), clientSwitch.start()) let - serverProto = newTestWakuStore(serverSwitch) - clientProto = newTestWakuStore(clientSwitch) + server = await newTestWakuStore(serverSwitch) + client = newTestWakuStoreClient(clientSwitch) - clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) ## Given let @@ -205,9 +213,9 @@ procSuite "Waku Store - history query": msg2 = fakeWakuMessage(contentTopic=contentTopic2) msg3 = fakeWakuMessage(contentTopic=contentTopic3) - serverProto.handleMessage(pubsubtopic1, msg1) - serverProto.handleMessage(pubsubtopic2, msg2) - serverProto.handleMessage(pubsubtopic2, msg3) + server.handleMessage(pubsubtopic1, msg1) + server.handleMessage(pubsubtopic2, msg2) + server.handleMessage(pubsubtopic2, msg3) ## When # this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3) @@ -216,7 +224,7 @@ procSuite "Waku Store - history query": HistoryContentFilter(contentTopic: contentTopic3)], pubsubTopic: pubsubTopic1 ) - let resQuery = await clientProto.query(rpc) + let resQuery = await client.query(rpc) ## Then check: @@ -239,10 +247,10 @@ procSuite "Waku Store - history query": await allFutures(serverSwitch.start(), clientSwitch.start()) let - serverProto = newTestWakuStore(serverSwitch) - clientProto = newTestWakuStore(clientSwitch) + server = await newTestWakuStore(serverSwitch) + client = newTestWakuStoreClient(clientSwitch) - clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) ## Given let @@ -254,13 +262,13 @@ procSuite "Waku Store - history query": msg2 = fakeWakuMessage() msg3 = fakeWakuMessage() - serverProto.handleMessage(pubsubtopic2, msg1) - serverProto.handleMessage(pubsubtopic2, msg2) - serverProto.handleMessage(pubsubtopic2, msg3) + server.handleMessage(pubsubtopic2, msg1) + server.handleMessage(pubsubtopic2, msg2) + server.handleMessage(pubsubtopic2, msg3) ## When let rpc = HistoryQuery(pubsubTopic: pubsubTopic1) - let res = await clientProto.query(rpc) + let res = await client.query(rpc) ## Then check: @@ -282,10 +290,10 @@ procSuite "Waku Store - history query": await allFutures(serverSwitch.start(), clientSwitch.start()) let - serverProto = newTestWakuStore(serverSwitch) - clientProto = newTestWakuStore(clientSwitch) + server = await newTestWakuStore(serverSwitch) + client = newTestWakuStoreClient(clientSwitch) - clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) ## Given let pubsubTopic = "queried-topic" @@ -295,13 +303,13 @@ procSuite "Waku Store - history query": msg2 = fakeWakuMessage(payload="TEST-2") msg3 = fakeWakuMessage(payload="TEST-3") - serverProto.handleMessage(pubsubTopic, msg1) - serverProto.handleMessage(pubsubTopic, msg2) - serverProto.handleMessage(pubsubTopic, msg3) + server.handleMessage(pubsubTopic, msg1) + server.handleMessage(pubsubTopic, msg2) + server.handleMessage(pubsubTopic, msg3) ## When let rpc = HistoryQuery(pubsubTopic: pubsubTopic) - let res = await clientProto.query(rpc) + let res = await client.query(rpc) ## Then check: @@ -326,10 +334,10 @@ procSuite "Waku Store - history query": await allFutures(serverSwitch.start(), clientSwitch.start()) let - serverProto = newTestWakuStore(serverSwitch) - clientProto = newTestWakuStore(clientSwitch) + server = await newTestWakuStore(serverSwitch) + client = newTestWakuStoreClient(clientSwitch) - clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) ## Given let currentTime = getNanosecondTime(getTime().toUnixFloat()) @@ -347,14 +355,14 @@ procSuite "Waku Store - history query": ] for msg in msgList: - require serverProto.store.put(DefaultPubsubTopic, msg).isOk() + require server.store.put(DefaultPubsubTopic, msg).isOk() ## When var rpc = HistoryQuery( - contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], - pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) - ) - var res = await clientProto.query(rpc) + contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], + pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) + ) + var res = await client.query(rpc) require res.isOk() var @@ -366,13 +374,13 @@ procSuite "Waku Store - history query": require: totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever response.messages.len() == 2 - response.pagingInfo.pageSize == 2 + response.pagingInfo.pageSize == 2 response.pagingInfo.direction == PagingDirection.FORWARD rpc.pagingInfo = response.pagingInfo # Continue querying - res = await clientProto.query(rpc) + res = await client.query(rpc) require res.isOk() response = res.tryGet() totalMessages += response.messages.len() @@ -395,10 +403,10 @@ procSuite "Waku Store - history query": await allFutures(serverSwitch.start(), clientSwitch.start()) let - serverProto = newTestWakuStore(serverSwitch) - clientProto = newTestWakuStore(clientSwitch) + server = await newTestWakuStore(serverSwitch) + client = newTestWakuStoreClient(clientSwitch) - clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) ## Given let currentTime = getNanosecondTime(getTime().toUnixFloat()) @@ -416,16 +424,16 @@ procSuite "Waku Store - history query": ] for msg in msgList: - require serverProto.store.put(DefaultPubsubTopic, msg).isOk() + require server.store.put(DefaultPubsubTopic, msg).isOk() ## When var rpc = HistoryQuery( - contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], - pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) - ) - var res = await clientProto.query(rpc) + contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], + pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) + ) + var res = await client.query(rpc) require res.isOk() - + var response = res.tryGet() totalMessages = response.messages.len() @@ -435,13 +443,13 @@ procSuite "Waku Store - history query": require: totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever response.messages.len() == 2 - response.pagingInfo.pageSize == 2 + response.pagingInfo.pageSize == 2 response.pagingInfo.direction == PagingDirection.BACKWARD rpc.pagingInfo = response.pagingInfo # Continue querying - res = await clientProto.query(rpc) + res = await client.query(rpc) require res.isOk() response = res.tryGet() totalMessages += response.messages.len() @@ -464,10 +472,10 @@ procSuite "Waku Store - history query": await allFutures(serverSwitch.start(), clientSwitch.start()) let - serverProto = newTestWakuStore(serverSwitch) - clientProto = newTestWakuStore(clientSwitch) + server = await newTestWakuStore(serverSwitch) + client = newTestWakuStoreClient(clientSwitch) - clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) ## Given let msgList = @[ @@ -484,11 +492,11 @@ procSuite "Waku Store - history query": ] for msg in msgList: - require serverProto.store.put(DefaultPubsubTopic, msg).isOk() + require server.store.put(DefaultPubsubTopic, msg).isOk() ## When let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]) - let res = await clientProto.query(rpc) + let res = await client.query(rpc) ## Then check: @@ -514,7 +522,7 @@ procSuite "Waku Store - history query": let server = newTestWakuStore(serverSwitch, store=storeA) - client = newTestWakuStore(clientSwitch) + client = newTestWakuStoreClient(clientSwitch) client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) @@ -539,7 +547,7 @@ procSuite "Waku Store - history query": ## Cleanup await allFutures(clientSwitch.stop(), serverSwitch.stop()) - + asyncTest "handle temporal history query with a zero-size time window": # a zero-size window results in an empty list of history messages ## Setup @@ -550,8 +558,8 @@ procSuite "Waku Store - history query": await allFutures(serverSwitch.start(), clientSwitch.start()) let - server = newTestWakuStore(serverSwitch, store=storeA) - client = newTestWakuStore(clientSwitch) + server = await newTestWakuStore(serverSwitch, store=storeA) + client = newTestWakuStoreClient(clientSwitch) client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) @@ -585,8 +593,8 @@ procSuite "Waku Store - history query": await allFutures(serverSwitch.start(), clientSwitch.start()) let - server = newTestWakuStore(serverSwitch, store=storeA) - client = newTestWakuStore(clientSwitch) + server = await newTestWakuStore(serverSwitch, store=storeA) + client = newTestWakuStoreClient(clientSwitch) client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) @@ -617,7 +625,7 @@ suite "Waku Store - message handling": ## Setup let store = StoreQueueRef.new(5) let switch = newTestSwitch() - let proto = newTestWakuStore(switch, store) + let proto = await newTestWakuStore(switch, store) ## Given let validSenderTime = now() @@ -637,7 +645,7 @@ suite "Waku Store - message handling": ## Setup let store = StoreQueueRef.new(10) let switch = newTestSwitch() - let proto = newTestWakuStore(switch, store) + let proto = await newTestWakuStore(switch, store) ## Given let msgList = @[ @@ -663,7 +671,7 @@ suite "Waku Store - message handling": ## Setup let store = StoreQueueRef.new(5) let switch = newTestSwitch() - let proto = newTestWakuStore(switch, store) + let proto = await newTestWakuStore(switch, store) ## Given let invalidSenderTime = 0 @@ -683,7 +691,7 @@ suite "Waku Store - message handling": ## Setup let store = StoreQueueRef.new(5) let switch = newTestSwitch() - let proto = newTestWakuStore(switch, store) + let proto = await newTestWakuStore(switch, store) ## Given let @@ -706,7 +714,7 @@ suite "Waku Store - message handling": ## Setup let store = StoreQueueRef.new(5) let switch = newTestSwitch() - let proto = newTestWakuStore(switch, store) + let proto = await newTestWakuStore(switch, store) ## Given let diff --git a/tests/v2/test_waku_store_client.nim b/tests/v2/test_waku_store_client.nim new file mode 100644 index 000000000..07b667292 --- /dev/null +++ b/tests/v2/test_waku_store_client.nim @@ -0,0 +1,273 @@ +{.used.} + +import + std/[options, tables, sets, times], + stew/byteutils, + testutils/unittests, + chronos, + chronicles, + libp2p/switch, + libp2p/crypto/crypto +import + ../../waku/v2/protocol/waku_message, + ../../waku/v2/protocol/waku_store, + ../../waku/v2/protocol/waku_store/client, + ../../waku/v2/protocol/waku_store/protocol_metrics, + ../../waku/v2/node/storage/sqlite, + ../../waku/v2/node/storage/message/sqlite_store, + ../../waku/v2/node/peer_manager/peer_manager, + ../../waku/v2/utils/time, + ../test_helpers + + +const + DefaultPubsubTopic = "/waku/2/default-waku/proto" + DefaultContentTopic = ContentTopic("/waku/2/default-content/proto") + + +proc now(): Timestamp = + getNanosecondTime(getTime().toUnixFloat()) + +proc newTestDatabase(): SqliteDatabase = + SqliteDatabase.init("", inMemory = true).tryGet() + +proc fakeWakuMessage( + payload = toBytes("TEST-PAYLOAD"), + contentTopic = DefaultContentTopic, + ts = now(), + ephemeral = false, +): WakuMessage = + WakuMessage( + payload: payload, + contentTopic: contentTopic, + version: 1, + timestamp: ts, + ephemeral: ephemeral, + ) + +proc newTestSwitch(key=none(PrivateKey), address=none(MultiAddress)): Switch = + let peerKey = key.get(PrivateKey.random(ECDSA, rng[]).get()) + let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get()) + return newStandardSwitch(some(peerKey), addrs=peerAddr) + +proc newTestStore(): MessageStore = + let database = newTestDatabase() + SqliteStore.init(database).tryGet() + +proc newTestWakuStore(switch: Switch, store=newTestStore()): WakuStore = + let + peerManager = PeerManager.new(switch) + rng = crypto.newRng() + proto = WakuStore.init(peerManager, rng, store) + + waitFor proto.start() + switch.mount(proto) + + return proto + +proc newTestWakuStoreClient(switch: Switch, store: MessageStore = nil): WakuStoreClient = + let + peerManager = PeerManager.new(switch) + rng = crypto.newRng() + WakuStoreClient.new(peerManager, rng, store) + + +procSuite "Waku Store Client": + + ## Fixtures + let testStore = block: + let store = newTestStore() + let msgList = @[ + fakeWakuMessage(payload= @[byte 0], contentTopic=ContentTopic("0")), + fakeWakuMessage(payload= @[byte 1], contentTopic=DefaultContentTopic), + fakeWakuMessage(payload= @[byte 2], contentTopic=DefaultContentTopic), + fakeWakuMessage(payload= @[byte 3], contentTopic=DefaultContentTopic), + fakeWakuMessage(payload= @[byte 4], contentTopic=DefaultContentTopic), + fakeWakuMessage(payload= @[byte 5], contentTopic=DefaultContentTopic), + fakeWakuMessage(payload= @[byte 6], contentTopic=DefaultContentTopic), + fakeWakuMessage(payload= @[byte 7], contentTopic=DefaultContentTopic), + fakeWakuMessage(payload= @[byte 8], contentTopic=DefaultContentTopic), + fakeWakuMessage(payload= @[byte 9], contentTopic=ContentTopic("9")), + fakeWakuMessage(payload= @[byte 10], contentTopic=DefaultContentTopic), + fakeWakuMessage(payload= @[byte 11], contentTopic=ContentTopic("11")), + fakeWakuMessage(payload= @[byte 12], contentTopic=DefaultContentTopic), + ] + + for msg in msgList: + assert store.put(DefaultPubsubTopic, msg).isOk() + + store + + asyncTest "single query to peer": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + server = newTestWakuStore(serverSwitch, store=testStore) + client = newTestWakuStoreClient(clientSwitch) + + ## Given + let peer = serverSwitch.peerInfo.toRemotePeerInfo() + let rpc = HistoryQuery( + contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], + pagingInfo: PagingInfo(pageSize: 8) + ) + + ## When + let res = await client.query(rpc, peer) + + ## Then + check: + res.isOk() + + let response = res.tryGet() + check: + ## No pagination specified. Response will be auto-paginated with + ## up to MaxPageSize messages per page. + response.messages.len() == 8 + response.pagingInfo != PagingInfo() + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + asyncTest "multiple query to peer with pagination": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + server = newTestWakuStore(serverSwitch, store=testStore) + client = newTestWakuStoreClient(clientSwitch) + + ## Given + let peer = serverSwitch.peerInfo.toRemotePeerInfo() + let rpc = HistoryQuery( + contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], + pagingInfo: PagingInfo(pageSize: 5) + ) + + ## When + let res = await client.queryWithPaging(rpc, peer) + + ## Then + check: + res.isOk() + + let response = res.tryGet() + check: + response.len == 10 + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + asyncTest "multiple query to multiple peers with pagination": + ## Setup + let + serverSwitchA = newTestSwitch() + serverSwitchB = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitchA.start(), serverSwitchB.start(), clientSwitch.start()) + + let + serverA = newTestWakuStore(serverSwitchA, store=testStore) + serverB = newTestWakuStore(serverSwitchB, store=testStore) + client = newTestWakuStoreClient(clientSwitch) + + ## Given + let peers = @[ + serverSwitchA.peerInfo.toRemotePeerInfo(), + serverSwitchB.peerInfo.toRemotePeerInfo() + ] + let rpc = HistoryQuery( + contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], + pagingInfo: PagingInfo(pageSize: 5) + ) + + ## When + let res = await client.queryLoop(rpc, peers) + + ## Then + check: + res.isOk() + + let response = res.tryGet() + check: + response.len == 10 + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitchA.stop(), serverSwitchB.stop()) + + asyncTest "single query with no pre-configured store peer should fail": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + server = newTestWakuStore(serverSwitch, store=testStore) + client = newTestWakuStoreClient(clientSwitch) + + ## Given + let rpc = HistoryQuery( + contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], + pagingInfo: PagingInfo(pageSize: 8) + ) + + ## When + let res = await client.query(rpc) + + ## Then + check: + res.isErr() + res.error == peerNotFoundFailure + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + asyncTest "single query to pre-configured store peer": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + server = newTestWakuStore(serverSwitch, store=testStore) + client = newTestWakuStoreClient(clientSwitch) + + ## Given + let peer = serverSwitch.peerInfo.toRemotePeerInfo() + let rpc = HistoryQuery( + contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], + pagingInfo: PagingInfo(pageSize: 8) + ) + + ## When + client.setPeer(peer) + + let res = await client.query(rpc) + + ## Then + check: + res.isOk() + + let response = res.tryGet() + check: + ## No pagination specified. Response will be auto-paginated with + ## up to MaxPageSize messages per page. + response.messages.len() == 8 + response.pagingInfo != PagingInfo() + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) diff --git a/waku/v2/protocol/waku_store/client.nim b/waku/v2/protocol/waku_store/client.nim new file mode 100644 index 000000000..7a1934714 --- /dev/null +++ b/waku/v2/protocol/waku_store/client.nim @@ -0,0 +1,238 @@ +{.push raises: [Defect].} + +import + std/[options, sequtils, times], + stew/results, + chronicles, + chronos, + metrics, + bearssl/rand +import + ../../node/peer_manager/peer_manager, + ../../utils/requests, + ../../utils/time, + ../waku_message, + ../waku_swap/waku_swap, + ./protocol, + ./protocol_metrics, + ./pagination, + ./rpc, + ./rpc_codec, + ./message_store + + +logScope: + topics = "wakustore.client" + + +type WakuStoreClient* = ref object + peerManager: PeerManager + rng: ref rand.HmacDrbgContext + store: MessageStore + wakuSwap: WakuSwap + +proc new*(T: type WakuStoreClient, + peerManager: PeerManager, + rng: ref rand.HmacDrbgContext, + store: MessageStore, + wakuSwap: WakuSwap = nil): T = + WakuStoreClient(peerManager: peerManager, rng: rng, store: store, wakuSwap: wakuSwap) + + +proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = + let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec) + if connOpt.isNone(): + waku_store_errors.inc(labelValues = [dialFailure]) + return err(dialFailure) + let connection = connOpt.get() + + let rpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req) + await connection.writeLP(rpc.encode().buffer) + + var message = await connOpt.get().readLp(MaxRpcSize.int) + let response = HistoryRPC.init(message) + + if response.isErr(): + error "failed to decode response" + waku_store_errors.inc(labelValues = [decodeRpcFailure]) + return err(decodeRpcFailure) + + return ok(response.value.response) + +proc queryWithPaging*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} = + ## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo, + ## it retrieves the historical messages in pages. + ## Returns all the fetched messages, if error occurs, returns an error string + + # Make a copy of the query + var req = query + + var messageList: seq[WakuMessage] = @[] + + while true: + let res = await w.query(req, peer) + if res.isErr(): + return err(res.error) + + let response = res.get() + + messageList.add(response.messages) + + # Check whether it is the last page + if response.pagingInfo == PagingInfo(): + break + + # Update paging cursor + req.pagingInfo.cursor = response.pagingInfo.cursor + + return ok(messageList) + +proc queryLoop*(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} = + ## Loops through the peers candidate list in order and sends the query to each + ## + ## Once all responses have been received, the retrieved messages are consolidated into one deduplicated list. + ## if no messages have been retrieved, the returned future will resolve into a result holding an empty seq. + let queryFuturesList = peers.mapIt(w.queryWithPaging(req, it)) + + await allFutures(queryFuturesList) + + let messagesList = queryFuturesList + .map(proc (fut: Future[WakuStoreResult[seq[WakuMessage]]]): seq[WakuMessage] = + # These futures have been awaited before using allFutures(). Call completed() just as a sanity check. + if not fut.completed() or fut.read().isErr(): + return @[] + + fut.read().value + ) + .concat() + .deduplicate() + + return ok(messagesList) + + +### Set store peer and query for messages + +proc setPeer*(ws: WakuStoreClient, peer: RemotePeerInfo) = + ws.peerManager.addPeer(peer, WakuStoreCodec) + +proc query*(w: WakuStoreClient, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = + # TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service. + # Ideally depending on the query and our set of peers we take a subset of ideal peers. + # This will require us to check for various factors such as: + # - which topics they track + # - latency? + # - default store peer? + + let peerOpt = w.peerManager.selectPeer(WakuStoreCodec) + if peerOpt.isNone(): + error "no suitable remote peers" + waku_store_errors.inc(labelValues = [peerNotFoundFailure]) + return err(peerNotFoundFailure) + + return await w.query(req, peerOpt.get()) + + +## Resume store + +const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds + +proc resume*(w: WakuStoreClient, + peerList = none(seq[RemotePeerInfo]), + pageSize = DefaultPageSize, + pubsubTopic = DefaultTopic): Future[WakuStoreResult[uint64]] {.async, gcsafe.} = + ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online + ## messages are stored in the store node's messages field and in the message db + ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message + ## an offset of 20 second is added to the time window to count for nodes asynchrony + ## peerList indicates the list of peers to query from. + ## The history is fetched from all available peers in this list and then consolidated into one deduplicated list. + ## Such candidates should be found through a discovery method (to be developed). + ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. + ## The history gets fetched successfully if the dialed peer has been online during the queried time window. + ## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string + + # If store has not been provided, don't even try + if w.store.isNil(): + return err("store not provided (nil)") + + # NOTE: Original implementation is based on the message's sender timestamp. At the moment + # of writing, the sqlite store implementation returns the last message's receiver + # timestamp. + # lastSeenTime = lastSeenItem.get().msg.timestamp + let + lastSeenTime = w.store.getNewestMessageTimestamp().get(Timestamp(0)) + now = getNanosecondTime(getTime().toUnixFloat()) + + debug "resuming with offline time window", lastSeenTime=lastSeenTime, currentTime=now + + let + queryEndTime = now + StoreResumeTimeWindowOffset + queryStartTime = max(lastSeenTime - StoreResumeTimeWindowOffset, 0) + + let req = HistoryQuery( + pubsubTopic: pubsubTopic, + startTime: queryStartTime, + endTime: queryEndTime, + pagingInfo: PagingInfo( + direction:PagingDirection.FORWARD, + pageSize: uint64(pageSize) + ) + ) + + var res: WakuStoreResult[seq[WakuMessage]] + if peerList.isSome(): + debug "trying the candidate list to fetch the history" + res = await w.queryLoop(req, peerList.get()) + + else: + debug "no candidate list is provided, selecting a random peer" + # if no peerList is set then query from one of the peers stored in the peer manager + let peerOpt = w.peerManager.selectPeer(WakuStoreCodec) + if peerOpt.isNone(): + warn "no suitable remote peers" + waku_store_errors.inc(labelValues = [peerNotFoundFailure]) + return err("no suitable remote peers") + + debug "a peer is selected from peer manager" + res = await w.queryWithPaging(req, peerOpt.get()) + + if res.isErr(): + debug "failed to resume the history" + return err("failed to resume the history") + + + # Save the retrieved messages in the store + var added: uint = 0 + for msg in res.get(): + let putStoreRes = w.store.put(pubsubTopic, msg) + if putStoreRes.isErr(): + continue + + added.inc() + + return ok(added) + + +## EXPERIMENTAL + +# NOTE: Experimental, maybe incorporate as part of query call +proc queryWithAccounting*(w: WakuStoreClient, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = + if w.wakuSwap.isNil(): + return err("waku swap not fount (nil)") + + let peerOpt = w.peerManager.selectPeer(WakuStoreCodec) + if peerOpt.isNone(): + error "no suitable remote peers" + waku_store_errors.inc(labelValues = [peerNotFoundFailure]) + return err(peerNotFoundFailure) + + let queryRes = await w.query(req, peerOpt.get()) + if queryRes.isErr(): + return err(queryRes.error) + + let response = queryRes.get() + + # Perform accounting operation. Assumes wakuSwap protocol is mounted + w.wakuSwap.debit(peerOpt.get().peerId, response.messages.len) + + return ok(response) \ No newline at end of file diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index 419d163b4..0c6d67bef 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -25,16 +25,10 @@ import ./rpc, ./rpc_codec, ./pagination, - ./message_store + ./message_store, + ./protocol_metrics -declarePublicGauge waku_store_messages, "number of historical messages", ["type"] -declarePublicGauge waku_store_peers, "number of store peers" -declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"] -declarePublicGauge waku_store_queries, "number of store queries received" -declarePublicHistogram waku_store_insert_duration_seconds, "message insertion duration" -declarePublicHistogram waku_store_query_duration_seconds, "history query duration" - logScope: topics = "wakustore" @@ -47,16 +41,6 @@ const MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" -# Error types (metric label values) -const - invalidMessage = "invalid_message" - insertFailure = "insert_failure" - retPolicyFailure = "retpolicy_failure" - dialFailure = "dial_failure" - decodeRpcFailure = "decode_rpc_failure" - peerNotFoundFailure = "peer_not_found_failure" - - type WakuStoreResult*[T] = Result[T, string] @@ -291,11 +275,13 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) = ## CLIENT # TODO: This should probably be an add function and append the peer to an array -proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) = +proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) {. + deprecated: "use waku_store/client methods instead".} = ws.peerManager.addPeer(peer, WakuStoreCodec) waku_store_peers.inc() -proc query(w: WakuStore, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = +proc query(w: WakuStore, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe, + deprecated: "use waku_store/client methods instead".} = let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec) if connOpt.isNone(): waku_store_errors.inc(labelValues = [dialFailure]) @@ -316,7 +302,8 @@ proc query(w: WakuStore, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuSt waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) return ok(response.value.response) -proc query*(w: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = +proc query*(w: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe, + deprecated: "use waku_store/client methods instead".} = # TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service. # Ideally depending on the query and our set of peers we take a subset of ideal peers. # This will require us to check for various factors such as: @@ -337,7 +324,8 @@ proc query*(w: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResp const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds -proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} = +proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe, + deprecated: "use waku_store/client methods instead".} = ## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo, ## it retrieves the historical messages in pages. ## Returns all the fetched messages, if error occurs, returns an error string @@ -366,7 +354,8 @@ proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInf return ok(messageList) -proc queryLoop(w: WakuStore, req: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} = +proc queryLoop(w: WakuStore, req: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe, + deprecated: "use waku_store/client methods instead".} = ## Loops through the peers candidate list in order and sends the query to each ## ## Once all responses have been received, the retrieved messages are consolidated into one deduplicated list. @@ -394,7 +383,8 @@ proc queryLoop(w: WakuStore, req: HistoryQuery, candidateList: seq[RemotePeerInf proc resume*(w: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]), pageSize: uint64 = DefaultPageSize, - pubsubTopic = DefaultTopic): Future[WakuStoreResult[uint64]] {.async, gcsafe.} = + pubsubTopic = DefaultTopic): Future[WakuStoreResult[uint64]] {.async, gcsafe, + deprecated: "use waku_store/client methods instead".} = ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online ## messages are stored in the store node's messages field and in the message db ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message @@ -472,7 +462,8 @@ proc resume*(w: WakuStore, ## EXPERIMENTAL # NOTE: Experimental, maybe incorporate as part of query call -proc queryWithAccounting*(ws: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = +proc queryWithAccounting*(ws: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe, + deprecated: "use waku_store/client methods instead".} = let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec) if peerOpt.isNone(): error "no suitable remote peers" diff --git a/waku/v2/protocol/waku_store/protocol_metrics.nim b/waku/v2/protocol/waku_store/protocol_metrics.nim new file mode 100644 index 000000000..f4de631c4 --- /dev/null +++ b/waku/v2/protocol/waku_store/protocol_metrics.nim @@ -0,0 +1,21 @@ +{.push raises: [Defect].} + +import metrics + + +declarePublicGauge waku_store_messages, "number of historical messages", ["type"] +declarePublicGauge waku_store_peers, "number of store peers" +declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"] +declarePublicGauge waku_store_queries, "number of store queries received" +declarePublicHistogram waku_store_insert_duration_seconds, "message insertion duration" +declarePublicHistogram waku_store_query_duration_seconds, "history query duration" + + +# Error types (metric label values) +const + invalidMessage* = "invalid_message" + insertFailure* = "insert_failure" + retPolicyFailure* = "retpolicy_failure" + dialFailure* = "dial_failure" + decodeRpcFailure* = "decode_rpc_failure" + peerNotFoundFailure* = "peer_not_found_failure" \ No newline at end of file