From 68aeda48c3a0e89a867ccf78f03d4cc0e5e9d983 Mon Sep 17 00:00:00 2001 From: LNSD Date: Mon, 21 Nov 2022 09:25:37 +0000 Subject: [PATCH] deploy: e85b5cbae1875c3c036f7cf12453f88031ffa592 --- tests/v2/test_waku_swap.nim | 113 +-------------- .../vendor/libbacktrace-upstream/libtool | 2 +- waku/v2/node/waku_node.nim | 134 ++++++++---------- waku/v2/protocol/waku_store/client.nim | 40 +++--- waku/v2/protocol/waku_store/protocol.nim | 70 ++++----- 5 files changed, 115 insertions(+), 244 deletions(-) diff --git a/tests/v2/test_waku_swap.nim b/tests/v2/test_waku_swap.nim index 8c0e98711..8b82f455d 100644 --- a/tests/v2/test_waku_swap.nim +++ b/tests/v2/test_waku_swap.nim @@ -2,10 +2,10 @@ import std/tables, - stew/shims/net as stewNet, + stew/shims/net as stewNet, testutils/unittests, - chronos, - chronicles, + chronos, + chronicles, libp2p/switch, libp2p/protobuf/minprotobuf, libp2p/stream/bufferstream, @@ -15,11 +15,9 @@ import eth/keys import ../../waku/v2/node/waku_node, - ../../waku/v2/node/message_store/queue_store, - ../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_swap/waku_swap, ../../waku/v2/utils/peers, - ../test_helpers, + ../test_helpers, ./utils, ./testlib/common @@ -50,106 +48,3 @@ procSuite "Waku SWAP Accounting": check: decodedCheque.isErr == false decodedCheque.get() == cheque - - # TODO: To do this reliably we need access to contract node - # With current logic state isn't updated because of bad cheque - # Consider moving this test to e2e test, and/or move swap module to be on by default - asyncTest "Update accounting state after store operations": - ## Setup - let - serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60102)) - clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60100)) - - await allFutures(client.start(), server.start()) - - await server.mountSwap() - await server.mountStore(store=StoreQueueRef.new()) - await client.mountSwap() - await client.mountStore() - client.mountStoreClient() - - client.wakuSwap.setPeer(server.peerInfo.toRemotePeerInfo()) - server.wakuSwap.setPeer(client.peerInfo.toRemotePeerInfo()) - - client.setStorePeer(server.peerInfo.toRemotePeerInfo()) - server.setStorePeer(client.peerInfo.toRemotePeerInfo()) - - ## Given - let message = fakeWakuMessage() - require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk() - - let serverPeer = server.peerInfo.toRemotePeerInfo() - let req = HistoryQuery(contentTopics: @[DefaultContentTopic]) - - ## When - let queryRes = await client.query(req, peer=serverPeer) - - ## Then - check queryRes.isOk() - - let response = queryRes.get() - check: - response.messages == @[message] - - check: - client.wakuSwap.accounting[server.peerInfo.peerId] == 1 - server.wakuSwap.accounting[client.peerInfo.peerId] == -1 - - ## Cleanup - await allFutures(client.stop(), server.stop()) - - - # This test will only Be checked if in Mock mode - # TODO: Add cheque here - asyncTest "Update accounting state after sending cheque": - ## Setup - let - serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60202)) - clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60200)) - - # Define the waku swap Config for this test - let swapConfig = SwapConfig(mode: SwapMode.Mock, paymentThreshold: 1, disconnectThreshold: -1) - - # Start nodes and mount protocols - await allFutures(client.start(), server.start()) - await server.mountSwap(swapConfig) - await server.mountStore(store=StoreQueueRef.new()) - await client.mountSwap(swapConfig) - await client.mountStore() - client.mountStoreClient() - - client.wakuSwap.setPeer(server.peerInfo.toRemotePeerInfo()) - server.wakuSwap.setPeer(client.peerInfo.toRemotePeerInfo()) - - client.setStorePeer(server.peerInfo.toRemotePeerInfo()) - server.setStorePeer(client.peerInfo.toRemotePeerInfo()) - - ## Given - let message = fakeWakuMessage() - require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk() - - let serverPeer = server.peerInfo.toRemotePeerInfo() - let req = HistoryQuery(contentTopics: @[DefaultContentTopic]) - - ## When - # TODO: Handshakes - for now we assume implicit, e2e still works for PoC - let res1 = await client.query(req, peer=serverPeer) - let res2 = await client.query(req, peer=serverPeer) - - require: - res1.isOk() - res2.isOk() - - ## Then - check: - # Accounting table updated with credit and debit, respectively - # After sending a cheque the balance is partially adjusted - client.wakuSwap.accounting[server.peerInfo.peerId] == 1 - server.wakuSwap.accounting[client.peerInfo.peerId] == -1 - - ## Cleanup - await allFutures(client.stop(), server.stop()) diff --git a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool index bdf06b3f8..48b691232 100755 --- a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool +++ b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool @@ -2,7 +2,7 @@ # libtool - Provide generalized library-building support services. # Generated automatically by config.status (libbacktrace) version-unused -# Libtool was configured on host fv-az41-370: +# Libtool was configured on host fv-az508-423: # NOTE: Changes made to this file will be lost: look at ltmain.sh. # # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index b30f82ae0..5d27d1cac 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -32,7 +32,7 @@ import ../protocol/waku_lightpush, ../protocol/waku_lightpush/client as lightpush_client, ../protocol/waku_peer_exchange, - ../utils/peers, + ../utils/peers, ../utils/wakuenr, ./peer_manager/peer_manager, ./message_store/message_retention_policy, @@ -128,11 +128,11 @@ template wsFlag(wssEnabled: bool): MultiAddress = if wssEnabled: MultiAddress.init("/wss").tryGet() else: MultiAddress.init("/ws").tryGet() -proc new*(T: type WakuNode, +proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey, - bindIp: ValidIpAddress, + bindIp: ValidIpAddress, bindPort: Port, - extIp = none(ValidIpAddress), + extIp = none(ValidIpAddress), extPort = none(Port), peerStorage: PeerStorage = nil, maxConnections = builders.MaxConnections, @@ -160,7 +160,7 @@ proc new*(T: type WakuNode, # Setup external addresses, if available var hostExtAddress, wsExtAddress = none(MultiAddress) - + if (dns4DomainName.isSome()): # Use dns4 for externally announced addresses hostExtAddress = some(dns4TcpEndPoint(dns4DomainName.get(), extPort.get())) @@ -180,12 +180,12 @@ proc new*(T: type WakuNode, announcedAddresses.add(hostExtAddress.get()) else: announcedAddresses.add(hostAddress) # We always have at least a bind address for the host - + if wsExtAddress.isSome(): announcedAddresses.add(wsExtAddress.get()) elif wsHostAddress.isSome(): announcedAddresses.add(wsHostAddress.get()) - + ## Initialize peer let rng = crypto.newRng() @@ -202,7 +202,7 @@ proc new*(T: type WakuNode, discv5UdpPort, wakuFlags, enrMultiaddrs) - + info "Initializing networking", addrs=announcedAddresses let switch = newWakuSwitch( @@ -210,7 +210,7 @@ proc new*(T: type WakuNode, hostAddress, wsHostAddress, transportFlags = {ServerFlags.ReuseAddr}, - rng = rng, + rng = rng, maxConnections = maxConnections, wssEnabled = wssEnabled, secureKeyPath = secureKey, @@ -219,7 +219,7 @@ proc new*(T: type WakuNode, sendSignedPeerRecord = sendSignedPeerRecord, agentString = agentString ) - + let wakuNode = WakuNode( peerManager: PeerManager.new(switch, peerStorage), switch: switch, @@ -231,7 +231,7 @@ proc new*(T: type WakuNode, return wakuNode -proc peerInfo*(node: WakuNode): PeerInfo = +proc peerInfo*(node: WakuNode): PeerInfo = node.switch.peerInfo # TODO: Extend with more relevant info: topics, peers, memory usage, online time, etc @@ -239,7 +239,7 @@ proc info*(node: WakuNode): WakuInfo = ## Returns information about the Node, such as what multiaddress it can be reached at. let peerInfo = node.switch.peerInfo - + var listenStr : seq[string] for address in node.announcedAddresses: var fulladdr = $address & "/p2p/" & $peerInfo.peerId @@ -277,7 +277,7 @@ proc subscribe(node: WakuNode, topic: PubsubTopic, handler: Option[TopicHandler] # Notify mounted protocols of new message if not node.wakuFilter.isNil(): await node.wakuFilter.handleMessage(topic, msg.value) - + if not node.wakuStore.isNil(): node.wakuStore.handleMessage(topic, msg.value) @@ -308,7 +308,7 @@ proc unsubscribe*(node: WakuNode, topic: PubsubTopic, handler: TopicHandler) = error "Invalid API call to `unsubscribe`. WakuRelay not mounted." # TODO: improved error handling return - + info "unsubscribe", topic=topic let wakuRelay = node.wakuRelay @@ -316,22 +316,22 @@ proc unsubscribe*(node: WakuNode, topic: PubsubTopic, handler: TopicHandler) = proc unsubscribeAll*(node: WakuNode, topic: PubsubTopic) = ## Unsubscribes all handlers registered on a specific PubSub topic. - + if node.wakuRelay.isNil(): error "Invalid API call to `unsubscribeAll`. WakuRelay not mounted." # TODO: improved error handling return - + info "unsubscribeAll", topic=topic let wakuRelay = node.wakuRelay wakuRelay.unsubscribeAll(topic) - + proc publish*(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = ## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a ## `contentTopic` field for light node functionality. This field may be also ## be omitted. - + if node.wakuRelay.isNil(): error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead." # TODO: Improve error handling @@ -351,7 +351,7 @@ proc startRelay*(node: WakuNode) {.async.} = return ## Setup relay protocol - + # Subscribe to the default PubSub topics for topic in node.wakuRelay.defaultPubsubTopics: node.subscribe(topic, none(TopicHandler)) @@ -359,14 +359,14 @@ proc startRelay*(node: WakuNode) {.async.} = # Resume previous relay connections if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)): info "Found previous WakuRelay peers. Reconnecting." - + # Reconnect to previous relay peers. This will respect a backoff period, if necessary let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime) await node.peerManager.reconnectPeers(WakuRelayCodec, protocolMatcher(WakuRelayCodec), backoffPeriod) - + # Start the WakuRelay protocol await node.wakuRelay.start() @@ -378,7 +378,7 @@ proc mountRelay*(node: WakuNode, peerExchangeHandler = none(RoutingRecordsHandler)) {.async, gcsafe.} = ## The default relay topics is the union of all configured topics plus default PubsubTopic(s) info "mounting relay protocol" - + let initRes = WakuRelay.new( node.peerManager, defaultPubsubTopics = concat(@[DefaultPubsubTopic], topics), @@ -432,14 +432,14 @@ proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} = node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterCodec)) -proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], +proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], handler: FilterPushHandler, peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} = ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. if node.wakuFilterClient.isNil(): error "cannot register filter subscription to topic", error="waku filter client is nil" return - - let remotePeer = when peer is string: parseRemotePeerInfo(peer) + + let remotePeer = when peer is string: parseRemotePeerInfo(peer) else: peer info "registering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer @@ -466,11 +466,11 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: error "cannot unregister filter subscription to content", error="waku filter client is nil" return - let remotePeer = when peer is string: parseRemotePeerInfo(peer) + let remotePeer = when peer is string: parseRemotePeerInfo(peer) else: peer - + info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer - + let unsubRes = await node.wakuFilterClient.unsubscribe(pubsubTopic, contentTopics, peer=remotePeer) if unsubRes.isOk(): info "unsubscribed from topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics @@ -501,12 +501,12 @@ proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Content if node.wakuFilterClient.isNil(): error "cannot register filter subscription to topic", error="waku filter client is nil" return - + let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) if peerOpt.isNone(): error "cannot register filter subscription to topic", error="no suitable remote peers" return - + await node.filterSubscribe(pubsubTopic, contentTopics, handler, peer=peerOpt.get()) # TODO: Move to application module (e.g., wakunode2.nim) @@ -516,7 +516,7 @@ proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Conte if node.wakuFilterClient.isNil(): error "cannot unregister filter subscription to content", error="waku filter client is nil" return - + let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) if peerOpt.isNone(): error "cannot register filter subscription to topic", error="no suitable remote peers" @@ -565,23 +565,19 @@ proc startMessageRetentionPolicyPeriodicTask*(node: WakuNode, interval: Duration # https://github.com/nim-lang/Nim/issues/17369 var executeRetentionPolicy: proc(udata: pointer) {.gcsafe, raises: [Defect].} - executeRetentionPolicy = proc(udata: pointer) {.gcsafe.} = + executeRetentionPolicy = proc(udata: pointer) {.gcsafe.} = executeMessageRetentionPolicy(node) discard setTimer(Moment.fromNow(interval), executeRetentionPolicy) - + discard setTimer(Moment.fromNow(interval), executeRetentionPolicy) proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none(MessageRetentionPolicy) ) {.async, raises: [Defect, LPError].} = - if node.wakuSwap.isNil(): - info "mounting waku store protocol (no waku swap)" - else: - info "mounting waku store protocol with waku swap support" + info "mounting waku store protocol" node.wakuStore = WakuStore.new( - node.peerManager, - node.rng, - store, - wakuSwap=node.wakuSwap, + node.peerManager, + node.rng, + store, retentionPolicy=retentionPolicy ) @@ -605,12 +601,8 @@ proc query*(node: WakuNode, query: HistoryQuery, peer: RemotePeerInfo): Future[W let queryRes = await node.wakuStoreClient.query(query, peer) if queryRes.isErr(): return err($queryRes.error) - + let response = queryRes.get() - - if not node.wakuSwap.isNil(): - # Perform accounting operation - node.wakuSwap.debit(peer.peerId, response.messages.len) return ok(response) @@ -645,13 +637,13 @@ proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[History # TODO: Move to application module (e.g., wakunode2.nim) proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} = - ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online + ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online ## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages) ## messages are stored in the the wakuStore's messages field and in the message db - ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message + ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message ## an offset of 20 second is added to the time window to count for nodes asynchrony ## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed). - ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. + ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. ## The history gets fetched successfully if the dialed peer has been online during the queried time window. if node.wakuStoreClient.isNil(): return @@ -660,7 +652,7 @@ proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[Re if retrievedMessages.isErr(): error "failed to resume store", error=retrievedMessages.error return - + info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value @@ -672,10 +664,10 @@ proc mountLightPush*(node: WakuNode) {.async.} = var pushHandler: PushMessageHandler if node.wakuRelay.isNil(): debug "mounting lightpush without relay (nil)" - pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} = + pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} = return err("no waku relay found") else: - pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} = + pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} = discard await node.wakuRelay.publish(pubsubTopic, message.encode().buffer) return ok() @@ -723,7 +715,7 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe if node.wakuLightpushClient.isNil(): error "failed to publish message", error="waku lightpush client is nil" return - + let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec) if peerOpt.isNone(): error "failed to publish message", error="no suitable remote peers" @@ -777,11 +769,11 @@ proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} = # This is necessary as `Ping.new*` does not have explicit `raises` requirement # @TODO: remove exception handling once explicit `raises` in ping module raise newException(LPError, "Failed to initialize ping protocol") - + if node.started: # Node has started already. Let's start ping too. await node.libp2pPing.start() - + node.switch.mount(node.libp2pPing) proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} = @@ -805,7 +797,7 @@ proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} = return discard await node.libp2pPing.ping(connOpt.get()) # Ping connection - + await sleepAsync(keepalive) proc startKeepalive*(node: WakuNode) = @@ -832,7 +824,7 @@ proc runDiscv5Loop(node: WakuNode) {.async.} = if discoveredPeers.isOk(): ## Let's attempt to connect to peers we ## have not encountered before - + trace "Discovered peers", count=discoveredPeers.get().len() let newPeers = discoveredPeers.get().filterIt( @@ -850,9 +842,9 @@ proc runDiscv5Loop(node: WakuNode) {.async.} = proc startDiscv5*(node: WakuNode): Future[bool] {.async.} = ## Start Discovery v5 service - + info "Starting discovery v5 service" - + if not node.wakuDiscv5.isNil(): ## First start listening on configured port try: @@ -861,12 +853,12 @@ proc startDiscv5*(node: WakuNode): Future[bool] {.async.} = except CatchableError: error "Failed to start discovery service. UDP port may be already in use" return false - + ## Start Discovery v5 trace "Start discv5 service" node.wakuDiscv5.start() trace "Start discovering new peers using discv5" - + asyncSpawn node.runDiscv5Loop() debug "Successfully started discovery v5 service" @@ -877,10 +869,10 @@ proc startDiscv5*(node: WakuNode): Future[bool] {.async.} = proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} = ## Stop Discovery v5 service - + if not node.wakuDiscv5.isNil(): info "Stopping discovery v5 service" - + ## Stop Discovery v5 process and close listening port if node.wakuDiscv5.listening: trace "Stop listening on discv5 port" @@ -892,17 +884,17 @@ proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} = proc start*(node: WakuNode) {.async.} = ## Starts a created Waku Node and ## all its mounted protocols. - + waku_version.set(1, labelValues=[git_version]) info "Starting Waku node", version=git_version - + let peerInfo = node.switch.peerInfo info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs var listenStr = "" for address in node.announcedAddresses: - var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]" + var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]" listenStr &= fulladdr - + ## XXX: this should be /ip4..., / stripped? info "Listening on", full = listenStr info "DNS: discoverable ENR ", enr = node.enr.toUri() @@ -910,7 +902,7 @@ proc start*(node: WakuNode) {.async.} = # Perform relay-specific startup tasks TODO: this should be rethought if not node.wakuRelay.isNil(): await node.startRelay() - + ## The switch uses this mapper to update peer info addrs ## with announced addrs after start let addressMapper = @@ -922,16 +914,16 @@ proc start*(node: WakuNode) {.async.} = await node.switch.start() node.started = true - + info "Node started successfully" proc stop*(node: WakuNode) {.async.} = if not node.wakuRelay.isNil(): await node.wakuRelay.stop() - + if not node.wakuDiscv5.isNil(): discard await node.stopDiscv5() await node.switch.stop() - node.started = false \ No newline at end of file + node.started = false diff --git a/waku/v2/protocol/waku_store/client.nim b/waku/v2/protocol/waku_store/client.nim index 068056a7b..c280cb75a 100644 --- a/waku/v2/protocol/waku_store/client.nim +++ b/waku/v2/protocol/waku_store/client.nim @@ -15,7 +15,6 @@ import ../../utils/requests, ../../utils/time, ../waku_message, - ../waku_swap/waku_swap, ./protocol_metrics, ./common, ./rpc, @@ -27,7 +26,7 @@ logScope: topics = "waku store client" -const +const DefaultPageSize*: uint64 = 20 # A recommended default number of waku messages per page @@ -35,12 +34,11 @@ type WakuStoreClient* = ref object peerManager: PeerManager rng: ref rand.HmacDrbgContext store: MessageStore - wakuSwap: WakuSwap proc new*(T: type WakuStoreClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, - store: MessageStore): T = + store: MessageStore): T = WakuStoreClient(peerManager: peerManager, rng: rng, store: store) proc sendHistoryQueryRPC(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future[HistoryResult] {.async, gcsafe.} = @@ -49,7 +47,7 @@ proc sendHistoryQueryRPC(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeer if connOpt.isNone(): waku_store_errors.inc(labelValues = [dialFailure]) return err(HistoryError(kind: HistoryErrorKind.PEER_DIAL_FAILURE, address: $peer)) - + let connection = connOpt.get() @@ -82,7 +80,7 @@ proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future return await w.sendHistoryQueryRPC(req, peer) proc queryAll*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} = - ## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo, + ## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo, ## it retrieves the historical messages in pages. ## Returns all the fetched messages, if error occurs, returns an error string @@ -93,7 +91,7 @@ proc queryAll*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): F while true: let queryRes = await w.query(req, peer) - if queryRes.isErr(): + if queryRes.isErr(): return err($queryRes.error) let response = queryRes.get() @@ -114,7 +112,7 @@ proc queryAll*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): F const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds -proc queryLoop(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} = +proc queryLoop(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} = ## Loops through the peers candidate list in order and sends the query to each ## ## Once all responses have been received, the retrieved messages are consolidated into one deduplicated list. @@ -127,8 +125,8 @@ proc queryLoop(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo] .map(proc (fut: Future[WakuStoreResult[seq[WakuMessage]]]): seq[WakuMessage] = try: # fut.read() can raise a CatchableError - # These futures have been awaited before using allFutures(). Call completed() just as a sanity check. - if not fut.completed() or fut.read().isErr(): + # These futures have been awaited before using allFutures(). Call completed() just as a sanity check. + if not fut.completed() or fut.read().isErr(): return @[] fut.read().value @@ -140,30 +138,30 @@ proc queryLoop(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo] return ok(messagesList) -proc resume*(w: WakuStoreClient, - peerList = none(seq[RemotePeerInfo]), +proc resume*(w: WakuStoreClient, + peerList = none(seq[RemotePeerInfo]), pageSize = DefaultPageSize, pubsubTopic = DefaultPubsubTopic): Future[WakuStoreResult[uint64]] {.async, gcsafe.} = - ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online + ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online ## messages are stored in the store node's messages field and in the message db - ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message + ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message ## an offset of 20 second is added to the time window to count for nodes asynchrony ## peerList indicates the list of peers to query from. ## The history is fetched from all available peers in this list and then consolidated into one deduplicated list. ## Such candidates should be found through a discovery method (to be developed). - ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. + ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. ## The history gets fetched successfully if the dialed peer has been online during the queried time window. ## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string - + # If store has not been provided, don't even try if w.store.isNil(): return err("store not provided (nil)") # NOTE: Original implementation is based on the message's sender timestamp. At the moment - # of writing, the sqlite store implementation returns the last message's receiver + # of writing, the sqlite store implementation returns the last message's receiver # timestamp. # lastSeenTime = lastSeenItem.get().msg.timestamp - let + let lastSeenTime = w.store.getNewestMessageTimestamp().get(Timestamp(0)) now = getNanosecondTime(getTime().toUnixFloat()) @@ -175,7 +173,7 @@ proc resume*(w: WakuStoreClient, let req = HistoryQuery( pubsubTopic: some(pubsubTopic), - startTime: some(queryStartTime), + startTime: some(queryStartTime), endTime: some(queryEndTime), pageSize: uint64(pageSize), ascending: true @@ -188,7 +186,7 @@ proc resume*(w: WakuStoreClient, else: debug "no candidate list is provided, selecting a random peer" - # if no peerList is set then query from one of the peers stored in the peer manager + # if no peerList is set then query from one of the peers stored in the peer manager let peerOpt = w.peerManager.selectPeer(WakuStoreCodec) if peerOpt.isNone(): warn "no suitable remote peers" @@ -198,7 +196,7 @@ proc resume*(w: WakuStoreClient, debug "a peer is selected from peer manager" res = await w.queryAll(req, peerOpt.get()) - if res.isErr(): + if res.isErr(): debug "failed to resume the history" return err("failed to resume the history") diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index 8ffdee1e9..4227fb3d7 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -10,7 +10,7 @@ import std/[tables, times, sequtils, options, algorithm], stew/results, chronicles, - chronos, + chronos, bearssl/rand, libp2p/crypto/crypto, libp2p/protocols/protocol, @@ -22,7 +22,6 @@ import ../../node/peer_manager/peer_manager, ../../utils/time, ../waku_message, - ../waku_swap/waku_swap, ./common, ./rpc, ./rpc_codec, @@ -34,7 +33,7 @@ logScope: topics = "waku store" -const +const MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" @@ -43,7 +42,6 @@ type peerManager*: PeerManager rng*: ref rand.HmacDrbgContext store*: MessageStore - wakuSwap*: WakuSwap retentionPolicy: Option[MessageRetentionPolicy] @@ -63,7 +61,7 @@ proc executeMessageRetentionPolicy*(w: WakuStore) = debug "failed execution of retention policy", error=retPolicyRes.error # TODO: Move to a message store wrapper -proc reportStoredMessagesMetric*(w: WakuStore) = +proc reportStoredMessagesMetric*(w: WakuStore) = if w.store.isNil(): return @@ -78,7 +76,7 @@ proc isValidMessage(msg: WakuMessage): bool = if msg.timestamp == 0: return true - let + let now = getNanosecondTime(getTime().toUnixFloat()) lowerBound = now - MaxMessageTimestampVariance upperBound = now + MaxMessageTimestampVariance @@ -94,19 +92,19 @@ proc handleMessage*(w: WakuStore, pubsubTopic: PubsubTopic, msg: WakuMessage) = if msg.ephemeral: # The message is ephemeral, should not be stored return - + if not isValidMessage(msg): waku_store_errors.inc(labelValues = [invalidMessage]) return let insertStartTime = getTime().toUnixFloat() - + block: let - msgDigest = computeDigest(msg) + msgDigest = computeDigest(msg) msgReceivedTime = if msg.timestamp > 0: msg.timestamp - else: getNanosecondTime(getTime().toUnixFloat()) + else: getNanosecondTime(getTime().toUnixFloat()) trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigest @@ -123,14 +121,14 @@ proc handleMessage*(w: WakuStore, pubsubTopic: PubsubTopic, msg: WakuMessage) = # TODO: Move to a message store wrapper proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} = ## Query history to return a single page of messages matching the query - + # Extract query criteria. All query criteria are optional let qContentTopics = if query.contentTopics.len == 0: none(seq[ContentTopic]) else: some(query.contentTopics) qPubSubTopic = query.pubsubTopic qCursor = query.cursor - qStartTime = query.startTime + qStartTime = query.startTime qEndTime = query.endTime qMaxPageSize = if query.pageSize <= 0: DefaultPageSize else: min(query.pageSize, MaxPageSize) @@ -138,7 +136,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} = let queryStartTime = getTime().toUnixFloat() - + let queryRes = w.store.getMessagesByHistoryQuery( contentTopic = qContentTopics, pubsubTopic = qPubSubTopic, @@ -159,15 +157,15 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} = return err(HistoryError(kind: HistoryErrorKind.UNKNOWN)) let rows = queryRes.get() - + if rows.len <= 0: return ok(HistoryResponse( - messages: @[], + messages: @[], pageSize: 0, ascending: qAscendingOrder, cursor: none(HistoryCursor) )) - + var messages = if rows.len <= int(qMaxPageSize): rows.mapIt(it[1]) else: rows[0..^2].mapIt(it[1]) @@ -177,7 +175,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} = if not qAscendingOrder: messages.reverse() - + if rows.len > int(qMaxPageSize): ## Build last message cursor ## The cursor is built from the last message INCLUDED in the response @@ -190,7 +188,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} = messageDigest[i] = digest[i] cursor = some(HistoryCursor( - pubsubTopic: pubsubTopic, + pubsubTopic: pubsubTopic, senderTime: message.timestamp, storeTime: storeTimestamp, digest: MessageDigest(data: messageDigest) @@ -198,7 +196,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} = ok(HistoryResponse( - messages: messages, + messages: messages, pageSize: uint64(messages.len), ascending: qAscendingOrder, cursor: cursor @@ -207,8 +205,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} = ## Protocol -proc initProtocolHandler*(ws: WakuStore) = - +proc initProtocolHandler(ws: WakuStore) = + proc handler(conn: Connection, proto: string) {.async.} = let buf = await conn.readLp(MaxRpcSize.int) @@ -233,9 +231,9 @@ proc initProtocolHandler*(ws: WakuStore) = waku_store_queries.inc() - if ws.store.isNil(): + if ws.store.isNil(): let respErr = HistoryError(kind: HistoryErrorKind.SERVICE_UNAVAILABLE) - + error "history query failed", peerId=conn.peerId, requestId=reqRpc.requestId, error= $respErr let resp = HistoryResponseRPC(error: respErr.toRPC()) @@ -245,7 +243,7 @@ proc initProtocolHandler*(ws: WakuStore) = let query = reqRpc.query.get().toAPI() - + let respRes = ws.findMessages(query) if respRes.isErr(): @@ -259,16 +257,6 @@ proc initProtocolHandler*(ws: WakuStore) = let resp = respRes.toRPC() - if not ws.wakuSwap.isNil(): - info "handle store swap", peerId=conn.peerId, requestId=reqRpc.requestId, text=ws.wakuSwap.text - - # Perform accounting operation - # TODO: Do accounting here, response is HistoryResponseRPC. How do we get node or swap context? - let peerId = conn.peerId - let messages = resp.messages - ws.wakuSwap.credit(peerId, messages.len) - - info "sending history response", peerId=conn.peerId, requestId=reqRpc.requestId, messages=resp.messages.len let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(resp)) @@ -277,17 +265,15 @@ proc initProtocolHandler*(ws: WakuStore) = ws.handler = handler ws.codec = WakuStoreCodec -proc new*(T: type WakuStore, - peerManager: PeerManager, +proc new*(T: type WakuStore, + peerManager: PeerManager, rng: ref rand.HmacDrbgContext, - store: MessageStore, - wakuSwap: WakuSwap = nil, + store: MessageStore, retentionPolicy=none(MessageRetentionPolicy)): T = let ws = WakuStore( - rng: rng, - peerManager: peerManager, - store: store, - wakuSwap: wakuSwap, + rng: rng, + peerManager: peerManager, + store: store, retentionPolicy: retentionPolicy ) ws.initProtocolHandler()