diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 31ee45228..34faecb63 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -474,7 +474,8 @@ proc processInput(rfd: AsyncFD) {.async.} = # We have a viable storenode. Let's query it for historical messages. echo "Connecting to storenode: " & $(storenode.get()) - node.wakuStore.setPeer(storenode.get()) + node.mountStoreClient() + node.setStorePeer(storenode.get()) proc storeHandler(response: HistoryResponse) {.gcsafe.} = for msg in response.messages: diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index 572e42577..2541e5cb9 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -393,11 +393,15 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf, executeMessageRetentionPolicy(node) startMessageRetentionPolicyPeriodicTask(node, interval=MessageStoreDefaultRetentionPolicyInterval) - if conf.storenode != "": - try: - setStorePeer(node, conf.storenode) - except: - return err("failed to set node waku store peer: " & getCurrentExceptionMsg()) + if conf.storenode != "": + try: + # TODO: Use option instead of nil in store client + let mStorage = if mStore.isNone(): nil + else: mStore.get() + mountStoreClient(node, store=mStorage) + setStorePeer(node, conf.storenode) + except: + return err("failed to set node waku store peer: " & getCurrentExceptionMsg()) # NOTE Must be mounted after relay if conf.lightpush: diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 696af045d..1c7691cf9 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -28,17 +28,14 @@ import ../../waku/v2/protocol/waku_swap/waku_swap, ../../waku/v2/protocol/waku_filter, ../../waku/v2/utils/peers, - ../../waku/v2/utils/time + ../../waku/v2/utils/time, + ./testlib/common template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0] const sigPath = sourceDir / ParDir / ParDir / "waku" / "v2" / "node" / "jsonrpc" / "jsonrpc_callsigs.nim" createRpcSigs(RpcHttpClient, sigPath) procSuite "Waku v2 JSON-RPC API": - const defaultTopic = "/waku/2/default-waku/proto" - const defaultContentTopic = ContentTopic("/waku/2/default-content/proto") - const testCodec = "/waku/2/default-waku/codec" - let rng = crypto.newRng() privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet() @@ -48,7 +45,7 @@ procSuite "Waku v2 JSON-RPC API": node = WakuNode.new(privkey, bindIp, port, some(extIp), some(port)) asyncTest "Debug API: get node info": - waitFor node.start() + await node.start() await node.mountRelay() @@ -72,10 +69,10 @@ procSuite "Waku v2 JSON-RPC API": await server.stop() await server.closeWait() - waitfor node.stop() + await node.stop() asyncTest "Relay API: publish and subscribe/unsubscribe": - waitFor node.start() + await node.start() await node.mountRelay() @@ -105,7 +102,7 @@ procSuite "Waku v2 JSON-RPC API": response == true # Publish a message on the default topic - response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime())))) + response = await client.post_waku_v2_relay_v1_message(DefaultPubsubTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(DefaultContentTopic), timestamp: some(getNanosecondTime(epochTime())))) check: # @TODO poll topic to verify message has been published @@ -122,18 +119,18 @@ procSuite "Waku v2 JSON-RPC API": await server.stop() await server.closeWait() - waitfor node.stop() + await node.stop() asyncTest "Relay API: get latest messages": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, bindIp, Port(60000)) + node1 = WakuNode.new(nodeKey1, bindIp, Port(60300)) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, bindIp, Port(60002)) + node2 = WakuNode.new(nodeKey2, bindIp, Port(60302)) nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node3 = WakuNode.new(nodeKey3, bindIp, Port(60003), some(extIp), some(port)) + node3 = WakuNode.new(nodeKey3, bindIp, Port(60303), some(extIp), some(port)) pubSubTopic = "polling" - contentTopic = defaultContentTopic + contentTopic = DefaultContentTopic payload1 = @[byte 9] message1 = WakuMessage(payload: payload1, contentTopic: contentTopic) payload2 = @[byte 8] @@ -165,11 +162,11 @@ procSuite "Waku v2 JSON-RPC API": await client.connect("127.0.0.1", rpcPort, false) # First see if we can retrieve messages published on the default topic (node is already subscribed) - await node2.publish(defaultTopic, message1) + await node2.publish(DefaultPubsubTopic, message1) await sleepAsync(2000.millis) - var messages = await client.get_waku_v2_relay_v1_messages(defaultTopic) + var messages = await client.get_waku_v2_relay_v1_messages(DefaultPubsubTopic) check: messages.len == 1 @@ -215,8 +212,8 @@ procSuite "Waku v2 JSON-RPC API": await node2.stop() await node3.stop() - asyncTest "Store API: retrieve historical messages": - waitFor node.start() + asyncTest "Store API: retrieve historical messages": + await node.start() await node.mountRelay() @@ -234,12 +231,14 @@ procSuite "Waku v2 JSON-RPC API": key = crypto.PrivateKey.random(ECDSA, rng[]).get() peer = PeerInfo.new(key) - await node.mountStore(store=StoreQueueRef.new()) + let store = StoreQueueRef.new() + await node.mountStore(store=store) + node.mountStoreClient(store=store) var listenSwitch = newStandardSwitch(some(key)) - waitFor listenSwitch.start() + await listenSwitch.start() - node.wakuStore.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) + node.setStorePeer(listenSwitch.peerInfo.toRemotePeerInfo()) listenSwitch.mount(node.wakuRelay) listenSwitch.mount(node.wakuStore) @@ -247,23 +246,23 @@ procSuite "Waku v2 JSON-RPC API": # Now prime it with some history before tests var msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: 0), - WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic, timestamp: 1), - WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic, timestamp: 2), - WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic, timestamp: 3), - WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic, timestamp: 4), - WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic, timestamp: 5), - WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic, timestamp: 6), - WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic, timestamp: 7), - WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic, timestamp: 8), + WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic, timestamp: 1), + WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic, timestamp: 2), + WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic, timestamp: 3), + WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic, timestamp: 4), + WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic, timestamp: 5), + WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic, timestamp: 6), + WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic, timestamp: 7), + WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic, timestamp: 8), WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: 9)] for wakuMsg in msgList: - require node.wakuStore.store.put(defaultTopic, wakuMsg).isOk() + require node.wakuStore.store.put(DefaultPubsubTopic, wakuMsg).isOk() let client = newRpcHttpClient() await client.connect("127.0.0.1", rpcPort, false) - let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions())) + let response = await client.get_waku_v2_store_v1_messages(some(DefaultPubsubTopic), some(@[HistoryContentFilter(contentTopic: DefaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions())) check: response.messages.len() == 8 response.pagingOptions.isNone() @@ -271,10 +270,10 @@ procSuite "Waku v2 JSON-RPC API": await server.stop() await server.closeWait() - waitfor node.stop() + await node.stop() asyncTest "Filter API: subscribe/unsubscribe": - waitFor node.start() + await node.start() await node.mountRelay() @@ -296,19 +295,19 @@ procSuite "Waku v2 JSON-RPC API": # Light node has not yet subscribed to any filters node.filters.len() == 0 - let contentFilters = @[ContentFilter(contentTopic: defaultContentTopic), + let contentFilters = @[ContentFilter(contentTopic: DefaultContentTopic), ContentFilter(contentTopic: ContentTopic("2")), ContentFilter(contentTopic: ContentTopic("3")), ContentFilter(contentTopic: ContentTopic("4")), ] - var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic)) + var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(DefaultPubsubTopic)) check: # Light node has successfully subscribed to a single filter node.filters.len() == 1 response == true - response = await client.delete_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic)) + response = await client.delete_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(DefaultPubsubTopic)) check: # Light node has successfully unsubscribed from all filters @@ -318,10 +317,10 @@ procSuite "Waku v2 JSON-RPC API": await server.stop() await server.closeWait() - waitfor node.stop() + await node.stop() asyncTest "Filter API: get latest messages": - waitFor node.start() + await node.start() # RPC server setup let @@ -339,21 +338,21 @@ procSuite "Waku v2 JSON-RPC API": # First ensure subscription exists - let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(contentTopic: defaultContentTopic)], topic = some(defaultTopic)) + let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(contentTopic: DefaultContentTopic)], topic = some(DefaultPubsubTopic)) check: sub # Now prime the node with some messages before tests var msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")), - WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic), - WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic), + WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic), WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))] let @@ -363,13 +362,13 @@ procSuite "Waku v2 JSON-RPC API": for wakuMsg in msgList: filters.notify(wakuMsg, requestId) - var response = await client.get_waku_v2_filter_v1_messages(defaultContentTopic) + var response = await client.get_waku_v2_filter_v1_messages(DefaultContentTopic) check: response.len() == 8 - response.allIt(it.contentTopic == defaultContentTopic) + response.allIt(it.contentTopic == DefaultContentTopic) # No new messages - response = await client.get_waku_v2_filter_v1_messages(defaultContentTopic) + response = await client.get_waku_v2_filter_v1_messages(DefaultContentTopic) check: response.len() == 0 @@ -380,15 +379,15 @@ procSuite "Waku v2 JSON-RPC API": for x in 1..(maxSize + 1): # Try to cache 1 more than maximum allowed - filters.notify(WakuMessage(payload: @[byte x], contentTopic: defaultContentTopic), requestId) + filters.notify(WakuMessage(payload: @[byte x], contentTopic: DefaultContentTopic), requestId) await sleepAsync(2000.millis) - response = await client.get_waku_v2_filter_v1_messages(defaultContentTopic) + response = await client.get_waku_v2_filter_v1_messages(DefaultContentTopic) check: # Max messages has not been exceeded response.len == maxSize - response.allIt(it.contentTopic == defaultContentTopic) + response.allIt(it.contentTopic == DefaultContentTopic) # Check that oldest item has been removed response[0].payload == @[byte 2] response[maxSize - 1].payload == @[byte (maxSize + 1)] @@ -396,21 +395,18 @@ procSuite "Waku v2 JSON-RPC API": await server.stop() await server.closeWait() - waitfor node.stop() + await node.stop() asyncTest "Admin API: connect to ad-hoc peers": # Create a couple of nodes let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - Port(60000)) + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60600)) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - Port(60002)) + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60602)) peerInfo2 = node2.switch.peerInfo nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), - Port(60004)) + node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60604)) peerInfo3 = node3.switch.peerInfo await allFutures([node1.start(), node2.start(), node3.start()]) @@ -459,15 +455,12 @@ procSuite "Waku v2 JSON-RPC API": # Create a couple of nodes let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - Port(60000)) + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60220)) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - Port(60002)) + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60222)) peerInfo2 = node2.switch.peerInfo nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), - Port(60004)) + node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60224)) peerInfo3 = node3.switch.peerInfo await allFutures([node1.start(), node2.start(), node3.start()]) @@ -511,10 +504,9 @@ procSuite "Waku v2 JSON-RPC API": asyncTest "Admin API: get unmanaged peer information": let nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), - Port(60000)) + node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60523)) - waitFor node.start() + await node.start() # RPC server setup let @@ -530,7 +522,9 @@ procSuite "Waku v2 JSON-RPC API": await node.mountFilter() await node.mountSwap() - await node.mountStore(store=StoreQueueRef.new()) + let store = StoreQueueRef.new() + await node.mountStore(store=store) + node.mountStoreClient(store=store) # Create and set some peers let @@ -547,7 +541,8 @@ procSuite "Waku v2 JSON-RPC API": node.wakuFilter.setPeer(filterPeer.toRemotePeerInfo()) node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo()) - node.wakuStore.setPeer(storePeer.toRemotePeerInfo()) + + node.setStorePeer(storePeer.toRemotePeerInfo()) let response = await client.get_waku_v2_admin_v1_peers() @@ -563,7 +558,7 @@ procSuite "Waku v2 JSON-RPC API": await server.stop() await server.closeWait() - waitfor node.stop() + await node.stop() asyncTest "Private API: generate asymmetric keys and encrypt/decrypt communication": let @@ -574,7 +569,7 @@ procSuite "Waku v2 JSON-RPC API": nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] node3 = WakuNode.new(nodeKey3, bindIp, Port(60003), some(extIp), some(port)) pubSubTopic = "polling" - contentTopic = defaultContentTopic + contentTopic = DefaultContentTopic payload = @[byte 9] message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime()))) topicCache = newTable[string, seq[WakuMessage]]() @@ -665,7 +660,7 @@ procSuite "Waku v2 JSON-RPC API": nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] node3 = WakuNode.new(nodeKey3, bindIp, Port(60003), some(extIp), some(port)) pubSubTopic = "polling" - contentTopic = defaultContentTopic + contentTopic = DefaultContentTopic payload = @[byte 9] message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime()))) topicCache = newTable[string, seq[WakuMessage]]() diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index 31a08d313..720b55728 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -104,11 +104,12 @@ procSuite "Peer Manager": await node.mountFilter() await node.mountSwap() - await node.mountStore() + node.mountStoreClient() node.wakuFilter.setPeer(filterPeer.toRemotePeerInfo()) node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo()) - node.wakuStore.setPeer(storePeer.toRemotePeerInfo()) + + node.setStorePeer(storePeer.toRemotePeerInfo()) # Check peers were successfully added to peer manager check: @@ -217,7 +218,7 @@ procSuite "Peer Manager": await allFutures([node1.stop(), node2.stop(), node3.stop()]) -asyncTest "Peer manager support multiple protocol IDs when reconnecting to peers": + asyncTest "Peer manager support multiple protocol IDs when reconnecting to peers": let database = SqliteDatabase.init("2", inMemory = true)[] storage = WakuPeerStorage.new(database)[] diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index b76248aca..4d359699b 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -31,7 +31,7 @@ proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): Future[WakuS let peerManager = PeerManager.new(switch) rng = crypto.newRng() - proto = WakuStore.init(peerManager, rng, store) + proto = WakuStore.new(peerManager, rng, store) await proto.start() switch.mount(proto) @@ -80,9 +80,6 @@ procSuite "Waku Store - history query": server = await newTestWakuStore(serverSwitch) client = newTestWakuStoreClient(clientSwitch) - client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) - - ## Given let topic = ContentTopic("1") let @@ -92,9 +89,11 @@ procSuite "Waku Store - history query": server.handleMessage("foo", msg1) server.handleMessage("foo", msg2) + let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() + ## When let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)]) - let resQuery = await client.query(rpc) + let resQuery = await client.query(rpc, peer=serverPeerInfo) ## Then check: @@ -120,8 +119,6 @@ procSuite "Waku Store - history query": server = await newTestWakuStore(serverSwitch) client = newTestWakuStoreClient(clientSwitch) - client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) - ## Given let topic1 = ContentTopic("1") @@ -137,12 +134,14 @@ procSuite "Waku Store - history query": server.handleMessage("foo", msg2) server.handleMessage("foo", msg3) + let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() + ## When let rpc = HistoryQuery(contentFilters: @[ HistoryContentFilter(contentTopic: topic1), HistoryContentFilter(contentTopic: topic3) ]) - let resQuery = await client.query(rpc) + let resQuery = await client.query(rpc, peer=serverPeerInfo) ## Then check: @@ -169,8 +168,6 @@ procSuite "Waku Store - history query": server = await newTestWakuStore(serverSwitch) client = newTestWakuStoreClient(clientSwitch) - client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) - ## Given let pubsubTopic1 = "queried-topic" @@ -190,6 +187,8 @@ procSuite "Waku Store - history query": server.handleMessage(pubsubtopic2, msg2) server.handleMessage(pubsubtopic2, msg3) + let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() + ## When # this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3) let rpc = HistoryQuery( @@ -197,7 +196,7 @@ procSuite "Waku Store - history query": HistoryContentFilter(contentTopic: contentTopic3)], pubsubTopic: pubsubTopic1 ) - let resQuery = await client.query(rpc) + let resQuery = await client.query(rpc, peer=serverPeerInfo) ## Then check: @@ -223,8 +222,6 @@ procSuite "Waku Store - history query": server = await newTestWakuStore(serverSwitch) client = newTestWakuStoreClient(clientSwitch) - client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) - ## Given let pubsubtopic1 = "queried-topic" @@ -239,9 +236,11 @@ procSuite "Waku Store - history query": server.handleMessage(pubsubtopic2, msg2) server.handleMessage(pubsubtopic2, msg3) + let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() + ## When let rpc = HistoryQuery(pubsubTopic: pubsubTopic1) - let res = await client.query(rpc) + let res = await client.query(rpc, peer=serverPeerInfo) ## Then check: @@ -266,8 +265,6 @@ procSuite "Waku Store - history query": server = await newTestWakuStore(serverSwitch) client = newTestWakuStoreClient(clientSwitch) - client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) - ## Given let pubsubTopic = "queried-topic" @@ -280,9 +277,11 @@ procSuite "Waku Store - history query": server.handleMessage(pubsubTopic, msg2) server.handleMessage(pubsubTopic, msg3) + let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() + ## When let rpc = HistoryQuery(pubsubTopic: pubsubTopic) - let res = await client.query(rpc) + let res = await client.query(rpc, peer=serverPeerInfo) ## Then check: @@ -310,8 +309,6 @@ procSuite "Waku Store - history query": server = await newTestWakuStore(serverSwitch) client = newTestWakuStoreClient(clientSwitch) - client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) - ## Given let currentTime = now() let msgList = @[ @@ -329,13 +326,15 @@ procSuite "Waku Store - history query": for msg in msgList: require server.store.put(DefaultPubsubTopic, msg).isOk() + + let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() ## When var rpc = HistoryQuery( contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) ) - var res = await client.query(rpc) + var res = await client.query(rpc, peer=serverPeerInfo) require res.isOk() var @@ -353,7 +352,7 @@ procSuite "Waku Store - history query": rpc.pagingInfo = response.pagingInfo # Continue querying - res = await client.query(rpc) + res = await client.query(rpc, peer=serverPeerInfo) require res.isOk() response = res.tryGet() totalMessages += response.messages.len() @@ -379,8 +378,6 @@ procSuite "Waku Store - history query": server = await newTestWakuStore(serverSwitch) client = newTestWakuStoreClient(clientSwitch) - client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) - ## Given let currentTime = now() let msgList = @[ @@ -399,12 +396,14 @@ procSuite "Waku Store - history query": for msg in msgList: require server.store.put(DefaultPubsubTopic, msg).isOk() + let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() + ## When var rpc = HistoryQuery( contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) ) - var res = await client.query(rpc) + var res = await client.query(rpc, peer=serverPeerInfo) require res.isOk() var @@ -422,7 +421,7 @@ procSuite "Waku Store - history query": rpc.pagingInfo = response.pagingInfo # Continue querying - res = await client.query(rpc) + res = await client.query(rpc, peer=serverPeerInfo) require res.isOk() response = res.tryGet() totalMessages += response.messages.len() @@ -448,8 +447,6 @@ procSuite "Waku Store - history query": server = await newTestWakuStore(serverSwitch) client = newTestWakuStoreClient(clientSwitch) - client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) - ## Given let msgList = @[ WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")), @@ -467,9 +464,11 @@ procSuite "Waku Store - history query": for msg in msgList: require server.store.put(DefaultPubsubTopic, msg).isOk() + let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() + ## When let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]) - let res = await client.query(rpc) + let res = await client.query(rpc, peer=serverPeerInfo) ## Then check: @@ -497,8 +496,6 @@ procSuite "Waku Store - history query": server = newTestWakuStore(serverSwitch, store=storeA) client = newTestWakuStoreClient(clientSwitch) - client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) - ## Given let rpc = HistoryQuery( contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], @@ -506,8 +503,10 @@ procSuite "Waku Store - history query": endTime: Timestamp(5) ) + let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() + ## When - let res = await client.query(rpc) + let res = await client.query(rpc, peer=serverPeerInfo) ## Then check res.isOk() @@ -534,8 +533,6 @@ procSuite "Waku Store - history query": server = await newTestWakuStore(serverSwitch, store=storeA) client = newTestWakuStoreClient(clientSwitch) - client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) - ## Given let rpc = HistoryQuery( contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], @@ -543,8 +540,10 @@ procSuite "Waku Store - history query": endTime: Timestamp(2) ) + let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() + ## When - let res = await client.query(rpc) + let res = await client.query(rpc, peer=serverPeerInfo) ## Then check res.isOk() @@ -569,8 +568,6 @@ procSuite "Waku Store - history query": server = await newTestWakuStore(serverSwitch, store=storeA) client = newTestWakuStoreClient(clientSwitch) - client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) - ## Given let rpc = HistoryQuery( contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], @@ -578,8 +575,10 @@ procSuite "Waku Store - history query": endTime: Timestamp(2) ) + let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() + ## When - let res = await client.query(rpc) + let res = await client.query(rpc, peer=serverPeerInfo) ## Then check res.isOk() @@ -592,7 +591,7 @@ procSuite "Waku Store - history query": await allFutures(clientSwitch.stop(), serverSwitch.stop()) -suite "Waku Store - message handling": +suite "Message Store - message handling": asyncTest "it should store a valid and non-ephemeral message": ## Setup diff --git a/tests/v2/test_waku_store_client.nim b/tests/v2/test_waku_store_client.nim index 955ba7bcb..7903b2314 100644 --- a/tests/v2/test_waku_store_client.nim +++ b/tests/v2/test_waku_store_client.nim @@ -25,11 +25,11 @@ proc newTestStore(): MessageStore = let database = newTestDatabase() SqliteStore.init(database).tryGet() -proc newTestWakuStore(switch: Switch, store=newTestStore()): Future[WakuStore] {.async.} = +proc newTestWakuStoreNode(switch: Switch, store=newTestStore()): Future[WakuStore] {.async.} = let peerManager = PeerManager.new(switch) rng = crypto.newRng() - proto = WakuStore.init(peerManager, rng, store) + proto = WakuStore.new(peerManager, rng, store) await proto.start() switch.mount(proto) @@ -78,7 +78,7 @@ procSuite "Waku Store Client": await allFutures(serverSwitch.start(), clientSwitch.start()) let - server = await newTestWakuStore(serverSwitch, store=testStore) + server = await newTestWakuStoreNode(serverSwitch, store=testStore) client = newTestWakuStoreClient(clientSwitch) ## Given @@ -114,7 +114,7 @@ procSuite "Waku Store Client": await allFutures(serverSwitch.start(), clientSwitch.start()) let - server = await newTestWakuStore(serverSwitch, store=testStore) + server = await newTestWakuStoreNode(serverSwitch, store=testStore) client = newTestWakuStoreClient(clientSwitch) ## Given @@ -148,8 +148,8 @@ procSuite "Waku Store Client": await allFutures(serverSwitchA.start(), serverSwitchB.start(), clientSwitch.start()) let - serverA = await newTestWakuStore(serverSwitchA, store=testStore) - serverB = await newTestWakuStore(serverSwitchB, store=testStore) + serverA = await newTestWakuStoreNode(serverSwitchA, store=testStore) + serverB = await newTestWakuStoreNode(serverSwitchB, store=testStore) client = newTestWakuStoreClient(clientSwitch) ## Given @@ -175,70 +175,3 @@ procSuite "Waku Store Client": ## Cleanup await allFutures(clientSwitch.stop(), serverSwitchA.stop(), serverSwitchB.stop()) - - asyncTest "single query with no pre-configured store peer should fail": - ## Setup - let - serverSwitch = newTestSwitch() - clientSwitch = newTestSwitch() - - await allFutures(serverSwitch.start(), clientSwitch.start()) - - let - server = await newTestWakuStore(serverSwitch, store=testStore) - client = newTestWakuStoreClient(clientSwitch) - - ## Given - let rpc = HistoryQuery( - contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], - pagingInfo: PagingInfo(pageSize: 8) - ) - - ## When - let res = await client.query(rpc) - - ## Then - check: - res.isErr() - res.error == peerNotFoundFailure - - ## Cleanup - await allFutures(clientSwitch.stop(), serverSwitch.stop()) - - asyncTest "single query to pre-configured store peer": - ## Setup - let - serverSwitch = newTestSwitch() - clientSwitch = newTestSwitch() - - await allFutures(serverSwitch.start(), clientSwitch.start()) - - let - server = await newTestWakuStore(serverSwitch, store=testStore) - client = newTestWakuStoreClient(clientSwitch) - - ## Given - let peer = serverSwitch.peerInfo.toRemotePeerInfo() - let rpc = HistoryQuery( - contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], - pagingInfo: PagingInfo(pageSize: 8) - ) - - ## When - client.setPeer(peer) - - let res = await client.query(rpc) - - ## Then - check: - res.isOk() - - let response = res.tryGet() - check: - ## No pagination specified. Response will be auto-paginated with - ## up to MaxPageSize messages per page. - response.messages.len() == 8 - response.pagingInfo != PagingInfo() - - ## Cleanup - await allFutures(clientSwitch.stop(), serverSwitch.stop()) diff --git a/tests/v2/test_waku_swap.nim b/tests/v2/test_waku_swap.nim index 0055fb6a3..9055ac143 100644 --- a/tests/v2/test_waku_swap.nim +++ b/tests/v2/test_waku_swap.nim @@ -1,7 +1,7 @@ {.used.} import - std/[options, tables, sets, times], + std/[options, tables, sets], stew/byteutils, stew/shims/net as stewNet, testutils/unittests, @@ -20,7 +20,6 @@ import ../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/node/waku_node, ../../waku/v2/utils/peers, - ../../waku/v2/utils/time, ../test_helpers, ./utils, ./testlib/common @@ -53,35 +52,40 @@ procSuite "Waku SWAP Accounting": decodedCheque.isErr == false decodedCheque.get() == cheque - # TODO To do this reliably we need access to contract node + # TODO: To do this reliably we need access to contract node # With current logic state isn't updated because of bad cheque # Consider moving this test to e2e test, and/or move swap module to be on by default asyncTest "Update accounting state after store operations": ## Setup let serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002)) + server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60102)) clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000)) + client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60100)) - # Start nodes and mount protocols await allFutures(client.start(), server.start()) + await server.mountSwap() await server.mountStore(store=StoreQueueRef.new()) await client.mountSwap() await client.mountStore() + client.mountStoreClient() - client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo()) client.wakuSwap.setPeer(server.peerInfo.toRemotePeerInfo()) server.wakuSwap.setPeer(client.peerInfo.toRemotePeerInfo()) + client.setStorePeer(server.peerInfo.toRemotePeerInfo()) + server.setStorePeer(client.peerInfo.toRemotePeerInfo()) + ## Given let message = fakeWakuMessage() + require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk() - server.wakuStore.handleMessage(DefaultPubsubTopic, message) + let serverPeer = server.peerInfo.toRemotePeerInfo() + let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]) ## When - let queryRes = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])) + let queryRes = await client.query(rpc, peer=serverPeer) ## Then check queryRes.isOk() @@ -104,9 +108,9 @@ procSuite "Waku SWAP Accounting": ## Setup let serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002)) + server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60202)) clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000)) + client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60200)) # Define the waku swap Config for this test let swapConfig = SwapConfig(mode: SwapMode.Mock, paymentThreshold: 1, disconnectThreshold: -1) @@ -117,26 +121,31 @@ procSuite "Waku SWAP Accounting": await server.mountStore(store=StoreQueueRef.new()) await client.mountSwap(swapConfig) await client.mountStore() + client.mountStoreClient() - client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo()) client.wakuSwap.setPeer(server.peerInfo.toRemotePeerInfo()) server.wakuSwap.setPeer(client.peerInfo.toRemotePeerInfo()) + + client.setStorePeer(server.peerInfo.toRemotePeerInfo()) + server.setStorePeer(client.peerInfo.toRemotePeerInfo()) ## Given let message = fakeWakuMessage() - - server.wakuStore.handleMessage(DefaultPubsubTopic, message) + require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk() + + let serverPeer = server.peerInfo.toRemotePeerInfo() + let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]) ## When # TODO: Handshakes - for now we assume implicit, e2e still works for PoC - let res1 = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])) - let res2 = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])) + let res1 = await client.query(rpc, peer=serverPeer) + let res2 = await client.query(rpc, peer=serverPeer) - ## Then - check: + require: res1.isOk() res2.isOk() + ## Then check: # Accounting table updated with credit and debit, respectively # After sending a cheque the balance is partially adjusted diff --git a/tests/v2/test_wakunode_store.nim b/tests/v2/test_wakunode_store.nim index 66b049b8b..07f0cadb6 100644 --- a/tests/v2/test_wakunode_store.nim +++ b/tests/v2/test_wakunode_store.nim @@ -46,16 +46,17 @@ procSuite "WakuNode - Store": await allFutures(client.start(), server.start()) await server.mountStore(store=newTestMessageStore()) await client.mountStore() - - client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo()) + client.mountStoreClient() ## Given let message = fakeWakuMessage() require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk() + let serverPeer = server.peerInfo.toRemotePeerInfo() + ## When let req = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]) - let queryRes = await client.query(req) + let queryRes = await client.query(req, peer=serverPeer) ## Then check queryRes.isOk() @@ -84,13 +85,13 @@ procSuite "WakuNode - Store": await server.mountStore(store=newTestMessageStore()) await server.mountFilter() await client.mountStore() + client.mountStoreClient() server.wakuFilter.setPeer(filterSource.peerInfo.toRemotePeerInfo()) - client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo()) ## Given let message = fakeWakuMessage() - + let serverPeer = server.peerInfo.toRemotePeerInfo() ## Then let filterFut = newFuture[bool]() proc filterReqHandler(msg: WakuMessage) {.gcsafe, closure.} = @@ -109,7 +110,7 @@ procSuite "WakuNode - Store": # Wait for the server filter to receive the push message require (await filterFut.withTimeout(5.seconds)) - let res = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])) + let res = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]), peer=serverPeer) ## Then check res.isOk() @@ -134,16 +135,19 @@ procSuite "WakuNode - Store": await allFutures(client.start(), server.start()) await server.mountStore(store=newTestMessageStore()) - await client.mountStore(store=StoreQueueRef.new()) - - client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo()) + + let clientStore = StoreQueueRef.new() + await client.mountStore(store=clientStore) + client.mountStoreClient(store=clientStore) ## Given let message = fakeWakuMessage() require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk() + let serverPeer = server.peerInfo.toRemotePeerInfo() + ## When - await client.resume() + await client.resume(some(@[serverPeer])) # Then check: @@ -162,10 +166,11 @@ procSuite "WakuNode - Store": client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000)) await allFutures(server.start(), client.start()) - await client.mountStore(store=StoreQueueRef.new()) await server.mountStore(store=StoreQueueRef.new()) - - client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo()) + + let clientStore = StoreQueueRef.new() + await client.mountStore(store=clientStore) + client.mountStoreClient(store=clientStore) ## Given let timeOrigin = now() @@ -184,8 +189,10 @@ procSuite "WakuNode - Store": require server.wakuStore.store.put(DefaultTopic, msg3, digest3, receivedTime3).isOk() require client.wakuStore.store.put(DefaultTopic, msg3, digest3, receivedTime3).isOk() + let serverPeer = server.peerInfo.toRemotePeerInfo() + ## When - await client.resume() + await client.resume(some(@[serverPeer])) ## Then check: diff --git a/waku/v2/node/jsonrpc/store_api.nim b/waku/v2/node/jsonrpc/store_api.nim index 582c9531f..6a130ea75 100644 --- a/waku/v2/node/jsonrpc/store_api.nim +++ b/waku/v2/node/jsonrpc/store_api.nim @@ -5,6 +5,7 @@ import chronicles, json_rpc/rpcserver import + ../peer_manager/peer_manager, ../waku_node, ../../protocol/waku_store, ../../utils/time, @@ -21,24 +22,27 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = ## Store API version 1 definitions - rpcsrv.rpc("get_waku_v2_store_v1_messages") do(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]) -> StoreResponse: + rpcsrv.rpc("get_waku_v2_store_v1_messages") do (pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]) -> StoreResponse: ## Returns history for a list of content topics with optional paging debug "get_waku_v2_store_v1_messages" + let peerOpt = node.peerManager.selectPeer(WakuStoreCodec) + if peerOpt.isNone(): + raise newException(ValueError, "no suitable remote store peers") + let historyQuery = HistoryQuery(pubsubTopic: if pubsubTopicOption.isSome: pubsubTopicOption.get() else: "", contentFilters: if contentFiltersOption.isSome: contentFiltersOption.get() else: @[], startTime: if startTime.isSome: startTime.get() else: Timestamp(0), endTime: if endTime.isSome: endTime.get() else: Timestamp(0), pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo()) - let req = node.query(historyQuery) + let queryFut = node.query(historyQuery, peerOpt.get()) - if not (await req.withTimeout(futTimeout)): - # Future failed to complete + if not await queryFut.withTimeout(futTimeout): raise newException(ValueError, "No history response received (timeout)") - let res = req.read() + let res = queryFut.read() if res.isErr(): - raise newException(ValueError, $res.error()) + raise newException(ValueError, $res.error) debug "get_waku_v2_store_v1_messages response" return res.value.toStoreResponse() diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 3358add31..9fc039a69 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -18,6 +18,7 @@ import import ../protocol/[waku_relay, waku_message], ../protocol/waku_store, + ../protocol/waku_store/client, ../protocol/waku_swap/waku_swap, ../protocol/waku_filter, ../protocol/waku_lightpush, @@ -39,6 +40,7 @@ declarePublicCounter waku_node_messages, "number of messages received", ["type"] declarePublicGauge waku_node_filters, "number of content filter subscriptions" declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"] declarePublicGauge waku_lightpush_peers, "number of lightpush peers" +declarePublicGauge waku_store_peers, "number of store peers" logScope: @@ -75,6 +77,7 @@ type switch*: Switch wakuRelay*: WakuRelay wakuStore*: WakuStore + wakuStoreClient*: WakuStoreClient wakuFilter*: WakuFilter wakuSwap*: WakuSwap wakuRlnRelay*: WakuRLNRelay @@ -499,6 +502,11 @@ proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.as ## Waku store +proc mountStoreClient*(node: WakuNode, store: MessageStore = nil) = + info "mounting store client" + + node.wakuStoreClient = WakuStoreClient.new(node.peerManager, node.rng, store) + const MessageStoreDefaultRetentionPolicyInterval* = 30.minutes proc executeMessageRetentionPolicy*(node: WakuNode) = @@ -534,7 +542,7 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none else: info "mounting waku store protocol with waku swap support" - node.wakuStore = WakuStore.init( + node.wakuStore = WakuStore.new( node.peerManager, node.rng, store, @@ -548,29 +556,53 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec)) -proc setStorePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} = - if node.wakuStore.isNil(): - error "could not set peer, waku store is nil" +proc query*(node: WakuNode, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = + ## Queries known nodes for historical messages + if node.wakuStoreClient.isNil(): + return err("waku store client is nil") + + let queryRes = await node.wakuStoreClient.query(query, peer) + if queryRes.isErr(): + return err(queryRes.error) + + let response = queryRes.get() + + if not node.wakuSwap.isNil(): + # Perform accounting operation + node.wakuSwap.debit(peer.peerId, response.messages.len) + + return ok(response) + + +# TODO: Move to application module (e.g., wakunode2.nim) +proc setStorePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError], + deprecated: "Use 'node.query()' with peer destination instead".} = + if node.wakuStoreClient.isNil(): + error "could not set peer, waku store client is nil" return - info "Set store peer", peer=peer + info "set store peer", peer=peer let remotePeer = when peer is string: parseRemotePeerInfo(peer) else: peer - node.wakuStore.setPeer(remotePeer) + node.peerManager.addPeer(remotePeer, WakuStoreCodec) + waku_store_peers.inc() -proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = +# TODO: Move to application module (e.g., wakunode2.nim) +proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe, + deprecated: "Use 'node.query()' with peer destination instead".} = ## Queries known nodes for historical messages + if node.wakuStoreClient.isNil(): + return err("waku store client is nil") - # TODO: Once waku swap is less experimental, this can simplified - if node.wakuSwap.isNil(): - debug "Using default query" - return await node.wakuStore.query(query) - else: - debug "Using SWAP accounting query" - # TODO: wakuSwap now part of wakuStore object - return await node.wakuStore.queryWithAccounting(query) + let peerOpt = node.peerManager.selectPeer(WakuStoreCodec) + if peerOpt.isNone(): + error "no suitable remote peers" + return err("peer_not_found_failure") + return await node.query(query, peerOpt.get()) + +# TODO: Move to application module (e.g., wakunode2.nim) proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} = ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online ## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages) @@ -580,10 +612,10 @@ proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[Re ## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed). ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. ## The history gets fetched successfully if the dialed peer has been online during the queried time window. - if node.wakuStore.isNil(): + if node.wakuStoreClient.isNil(): return - let retrievedMessages = await node.wakuStore.resume(peerList) + let retrievedMessages = await node.wakuStoreClient.resume(peerList) if retrievedMessages.isErr(): error "failed to resume store", error=retrievedMessages.error return diff --git a/waku/v2/protocol/waku_store/client.nim b/waku/v2/protocol/waku_store/client.nim index b10819156..4cc7bfc4c 100644 --- a/waku/v2/protocol/waku_store/client.nim +++ b/waku/v2/protocol/waku_store/client.nim @@ -34,9 +34,8 @@ type WakuStoreClient* = ref object proc new*(T: type WakuStoreClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, - store: MessageStore, - wakuSwap: WakuSwap = nil): T = - WakuStoreClient(peerManager: peerManager, rng: rng, store: store, wakuSwap: wakuSwap) + store: MessageStore): T = + WakuStoreClient(peerManager: peerManager, rng: rng, store: store) proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = @@ -113,30 +112,6 @@ proc queryLoop*(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo return ok(messagesList) - -### Set store peer and query for messages - -proc setPeer*(ws: WakuStoreClient, peer: RemotePeerInfo) = - ws.peerManager.addPeer(peer, WakuStoreCodec) - waku_store_peers.inc() - -proc query*(w: WakuStoreClient, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = - # TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service. - # Ideally depending on the query and our set of peers we take a subset of ideal peers. - # This will require us to check for various factors such as: - # - which topics they track - # - latency? - # - default store peer? - - let peerOpt = w.peerManager.selectPeer(WakuStoreCodec) - if peerOpt.isNone(): - error "no suitable remote peers" - waku_store_errors.inc(labelValues = [peerNotFoundFailure]) - return err(peerNotFoundFailure) - - return await w.query(req, peerOpt.get()) - - ## Resume store const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds @@ -216,28 +191,3 @@ proc resume*(w: WakuStoreClient, added.inc() return ok(added) - - -## EXPERIMENTAL - -# NOTE: Experimental, maybe incorporate as part of query call -proc queryWithAccounting*(w: WakuStoreClient, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = - if w.wakuSwap.isNil(): - return err("waku swap not fount (nil)") - - let peerOpt = w.peerManager.selectPeer(WakuStoreCodec) - if peerOpt.isNone(): - error "no suitable remote peers" - waku_store_errors.inc(labelValues = [peerNotFoundFailure]) - return err(peerNotFoundFailure) - - let queryRes = await w.query(req, peerOpt.get()) - if queryRes.isErr(): - return err(queryRes.error) - - let response = queryRes.get() - - # Perform accounting operation. Assumes wakuSwap protocol is mounted - w.wakuSwap.debit(peerOpt.get().peerId, response.messages.len) - - return ok(response) \ No newline at end of file diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index b39993665..aaf542b7a 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -19,7 +19,6 @@ import ../../node/storage/message/waku_store_queue, ../../node/peer_manager/peer_manager, ../../utils/time, - ../../utils/requests, ../waku_message, ../waku_swap/waku_swap, ./rpc, @@ -201,7 +200,7 @@ proc initProtocolHandler*(ws: WakuStore) = ws.handler = handler ws.codec = WakuStoreCodec -proc init*(T: type WakuStore, +proc new*(T: type WakuStore, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, store: MessageStore, @@ -270,217 +269,3 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) = let insertDuration = getTime().toUnixFloat() - insertStartTime waku_store_insert_duration_seconds.observe(insertDuration) - - -## CLIENT - -# TODO: This should probably be an add function and append the peer to an array -proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) {. - deprecated: "use waku_store/client methods instead".} = - ws.peerManager.addPeer(peer, WakuStoreCodec) - waku_store_peers.inc() - -proc query(w: WakuStore, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe, - deprecated: "use waku_store/client methods instead".} = - let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec) - if connOpt.isNone(): - waku_store_errors.inc(labelValues = [dialFailure]) - return err(dialFailure) - let connection = connOpt.get() - - let rpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req) - await connection.writeLP(rpc.encode().buffer) - - var message = await connOpt.get().readLp(MaxRpcSize.int) - let response = HistoryRPC.init(message) - - if response.isErr(): - error "failed to decode response" - waku_store_errors.inc(labelValues = [decodeRpcFailure]) - return err(decodeRpcFailure) - - waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) - return ok(response.value.response) - -proc query*(w: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe, - deprecated: "use waku_store/client methods instead".} = - # TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service. - # Ideally depending on the query and our set of peers we take a subset of ideal peers. - # This will require us to check for various factors such as: - # - which topics they track - # - latency? - # - default store peer? - - let peerOpt = w.peerManager.selectPeer(WakuStoreCodec) - if peerOpt.isNone(): - error "no suitable remote peers" - waku_store_errors.inc(labelValues = [peerNotFoundFailure]) - return err(peerNotFoundFailure) - - return await w.query(req, peerOpt.get()) - - -## 21/WAKU2-FAULT-TOLERANT-STORE - -const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds - -proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe, - deprecated: "use waku_store/client methods instead".} = - ## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo, - ## it retrieves the historical messages in pages. - ## Returns all the fetched messages, if error occurs, returns an error string - - # Make a copy of the query - var req = query - - var messageList: seq[WakuMessage] = @[] - - # Fetch the history in pages - while true: - let res = await w.query(req, peer) - if res.isErr(): - return err(res.error) - - let response = res.get() - - messageList.add(response.messages) - - # Check whether it is the last page - if response.pagingInfo.pageSize == 0: - break - - # Update paging cursor - req.pagingInfo.cursor = response.pagingInfo.cursor - - return ok(messageList) - -proc queryLoop(w: WakuStore, req: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe, - deprecated: "use waku_store/client methods instead".} = - ## Loops through the peers candidate list in order and sends the query to each - ## - ## Once all responses have been received, the retrieved messages are consolidated into one deduplicated list. - ## if no messages have been retrieved, the returned future will resolve into a result holding an empty seq. - let queriesList = candidateList.mapIt(w.queryFromWithPaging(req, it)) - - await allFutures(queriesList) - - let messagesList = queriesList - .map(proc (fut: Future[WakuStoreResult[seq[WakuMessage]]]): seq[WakuMessage] = - try: - # fut.read() can raise a CatchableError - # These futures have been awaited before using allFutures(). Call completed() just as a sanity check. - if not fut.completed() or fut.read().isErr(): - return @[] - - fut.read().value - except CatchableError: - return @[] - ) - .concat() - .deduplicate() - - if messagesList.len == 0: - return err("failed to resolve the query") - - return ok(messagesList) - -proc resume*(w: WakuStore, - peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]), - pageSize: uint64 = DefaultPageSize, - pubsubTopic = DefaultTopic): Future[WakuStoreResult[uint64]] {.async, gcsafe, - deprecated: "use waku_store/client methods instead".} = - ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online - ## messages are stored in the store node's messages field and in the message db - ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message - ## an offset of 20 second is added to the time window to count for nodes asynchrony - ## peerList indicates the list of peers to query from. - ## The history is fetched from all available peers in this list and then consolidated into one deduplicated list. - ## Such candidates should be found through a discovery method (to be developed). - ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. - ## The history gets fetched successfully if the dialed peer has been online during the queried time window. - ## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string - - # If store has not been provided, don't even try - if w.store.isNil(): - return err("store not provided (nil)") - - # NOTE: Original implementation is based on the message's sender timestamp. At the moment - # of writing, the sqlite store implementation returns the last message's receiver - # timestamp. - # lastSeenTime = lastSeenItem.get().msg.timestamp - let - lastSeenTime = w.store.getNewestMessageTimestamp().get(Timestamp(0)) - now = getNanosecondTime(getTime().toUnixFloat()) - - debug "resuming with offline time window", lastSeenTime=lastSeenTime, currentTime=now - - let - queryEndTime = now + StoreResumeTimeWindowOffset - queryStartTime = max(lastSeenTime - StoreResumeTimeWindowOffset, 0) - - let req = HistoryQuery( - pubsubTopic: pubsubTopic, - startTime: queryStartTime, - endTime: queryEndTime, - pagingInfo: PagingInfo( - direction:PagingDirection.FORWARD, - pageSize: pageSize - ) - ) - - var res: WakuStoreResult[seq[WakuMessage]] - if peerList.isSome(): - debug "trying the candidate list to fetch the history" - res = await w.queryLoop(req, peerList.get()) - - else: - debug "no candidate list is provided, selecting a random peer" - # if no peerList is set then query from one of the peers stored in the peer manager - let peerOpt = w.peerManager.selectPeer(WakuStoreCodec) - if peerOpt.isNone(): - warn "no suitable remote peers" - waku_store_errors.inc(labelValues = [peerNotFoundFailure]) - return err("no suitable remote peers") - - debug "a peer is selected from peer manager" - res = await w.queryFromWithPaging(req, peerOpt.get()) - - if res.isErr(): - debug "failed to resume the history" - return err("failed to resume the history") - - - # Save the retrieved messages in the store - var added: uint = 0 - for msg in res.get(): - let putStoreRes = w.store.put(pubsubTopic, msg) - if putStoreRes.isErr(): - warn "failed to insert resumed message into store", error=putStoreRes.error - continue - - added.inc() - - return ok(added) - - -## EXPERIMENTAL - -# NOTE: Experimental, maybe incorporate as part of query call -proc queryWithAccounting*(ws: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe, - deprecated: "use waku_store/client methods instead".} = - let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec) - if peerOpt.isNone(): - error "no suitable remote peers" - waku_store_errors.inc(labelValues = [peerNotFoundFailure]) - return err(peerNotFoundFailure) - - let res = await ws.query(req, peerOpt.get()) - if res.isErr(): - return err(res.error) - - let response = res.get() - - # Perform accounting operation. Assumes wakuSwap protocol is mounted - ws.wakuSwap.debit(peerOpt.get().peerId, response.messages.len) - - return ok(response) diff --git a/waku/v2/protocol/waku_store/protocol_metrics.nim b/waku/v2/protocol/waku_store/protocol_metrics.nim index f4de631c4..c38318028 100644 --- a/waku/v2/protocol/waku_store/protocol_metrics.nim +++ b/waku/v2/protocol/waku_store/protocol_metrics.nim @@ -4,7 +4,6 @@ import metrics declarePublicGauge waku_store_messages, "number of historical messages", ["type"] -declarePublicGauge waku_store_peers, "number of store peers" declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"] declarePublicGauge waku_store_queries, "number of store queries received" declarePublicHistogram waku_store_insert_duration_seconds, "message insertion duration"