diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 016d6a20c..b7f0e44c3 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -258,7 +258,7 @@ procSuite "Waku v2 JSON-RPC API": WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: 9)] for wakuMsg in msgList: - waitFor node.wakuStore.handleMessage(defaultTopic, wakuMsg) + node.wakuStore.handleMessage(defaultTopic, wakuMsg) let client = newRpcHttpClient() await client.connect("127.0.0.1", rpcPort, false) diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index da98272b8..01d3494d5 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -97,8 +97,8 @@ suite "Waku Store": msg1 = fakeWakuMessage(contentTopic=topic) msg2 = fakeWakuMessage() - await serverProto.handleMessage("foo", msg1) - await serverProto.handleMessage("foo", msg2) + serverProto.handleMessage("foo", msg1) + serverProto.handleMessage("foo", msg2) ## When let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)]) @@ -141,9 +141,9 @@ suite "Waku Store": msg2 = fakeWakuMessage(contentTopic=topic2) msg3 = fakeWakuMessage(contentTopic=topic3) - await serverProto.handleMessage("foo", msg1) - await serverProto.handleMessage("foo", msg2) - await serverProto.handleMessage("foo", msg3) + serverProto.handleMessage("foo", msg1) + serverProto.handleMessage("foo", msg2) + serverProto.handleMessage("foo", msg3) ## When let rpc = HistoryQuery(contentFilters: @[ @@ -194,9 +194,9 @@ suite "Waku Store": msg2 = fakeWakuMessage(contentTopic=contentTopic2) msg3 = fakeWakuMessage(contentTopic=contentTopic3) - await serverProto.handleMessage(pubsubtopic1, msg1) - await serverProto.handleMessage(pubsubtopic2, msg2) - await serverProto.handleMessage(pubsubtopic2, msg3) + serverProto.handleMessage(pubsubtopic1, msg1) + serverProto.handleMessage(pubsubtopic2, msg2) + serverProto.handleMessage(pubsubtopic2, msg3) ## When # this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3) @@ -243,9 +243,9 @@ suite "Waku Store": msg2 = fakeWakuMessage() msg3 = fakeWakuMessage() - await serverProto.handleMessage(pubsubtopic2, msg1) - await serverProto.handleMessage(pubsubtopic2, msg2) - await serverProto.handleMessage(pubsubtopic2, msg3) + serverProto.handleMessage(pubsubtopic2, msg1) + serverProto.handleMessage(pubsubtopic2, msg2) + serverProto.handleMessage(pubsubtopic2, msg3) ## When let rpc = HistoryQuery(pubsubTopic: pubsubTopic1) @@ -284,9 +284,9 @@ suite "Waku Store": msg2 = fakeWakuMessage(payload="TEST-2") msg3 = fakeWakuMessage(payload="TEST-3") - await serverProto.handleMessage(pubsubTopic, msg1) - await serverProto.handleMessage(pubsubTopic, msg2) - await serverProto.handleMessage(pubsubTopic, msg3) + serverProto.handleMessage(pubsubTopic, msg1) + serverProto.handleMessage(pubsubTopic, msg2) + serverProto.handleMessage(pubsubTopic, msg3) ## When let rpc = HistoryQuery(pubsubTopic: pubsubTopic) @@ -335,7 +335,7 @@ suite "Waku Store": ] for msg in msgList: - await serverProto.handleMessage("foo", msg) + serverProto.handleMessage("foo", msg) ## When let rpc = HistoryQuery( @@ -387,7 +387,7 @@ suite "Waku Store": ] for msg in msgList: - await serverProto.handleMessage("foo", msg) + serverProto.handleMessage("foo", msg) ## When let rpc = HistoryQuery( @@ -439,7 +439,7 @@ suite "Waku Store": ] for msg in msgList: - await serverProto.handleMessage("foo", msg) + serverProto.handleMessage("foo", msg) ## When let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]) @@ -475,7 +475,7 @@ suite "Waku Store": ] for msg in msgList: - await proto.handleMessage(DefaultPubsubTopic, msg) + proto.handleMessage(DefaultPubsubTopic, msg) check: store.len == 2 @@ -529,7 +529,7 @@ procSuite "Waku Store - fault tolerant store": ] for msg in msgList: - await proto.handleMessage(DefaultPubsubTopic, msg) + proto.handleMessage(DefaultPubsubTopic, msg) let (listenSwitch2, dialSwitch2, proto2) = await newTestWakuStore() let msgList2 = @[ @@ -544,7 +544,7 @@ procSuite "Waku Store - fault tolerant store": ] for msg in msgList2: - await proto2.handleMessage(DefaultPubsubTopic, msg) + proto2.handleMessage(DefaultPubsubTopic, msg) asyncTest "handle temporal history query with a valid time window": diff --git a/tests/v2/test_waku_swap.nim b/tests/v2/test_waku_swap.nim index 890d1db3b..fb8a1a31d 100644 --- a/tests/v2/test_waku_swap.nim +++ b/tests/v2/test_waku_swap.nim @@ -70,7 +70,7 @@ procSuite "Waku SWAP Accounting": await node2.mountSwap() await node2.mountStore(store=StoreQueueRef.new()) - await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) + node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) await sleepAsync(500.millis) @@ -120,7 +120,7 @@ procSuite "Waku SWAP Accounting": await node2.mountSwap(swapConfig) await node2.mountStore(store=StoreQueueRef.new()) - await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) + node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) await sleepAsync(500.millis) diff --git a/tests/v2/test_wakunode_store.nim b/tests/v2/test_wakunode_store.nim index b5c3b7c3c..e751f72e8 100644 --- a/tests/v2/test_wakunode_store.nim +++ b/tests/v2/test_wakunode_store.nim @@ -54,7 +54,7 @@ procSuite "WakuNode - Store": await node2.start() await node2.mountStore(store=newTestMessageStore()) - await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) + node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) await sleepAsync(500.millis) @@ -149,7 +149,7 @@ procSuite "WakuNode - Store": await node2.start() await node2.mountStore(store=StoreQueueRef.new()) - await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) + node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) await sleepAsync(500.millis) @@ -182,8 +182,8 @@ procSuite "WakuNode - Store": await client.mountStore(store=StoreQueueRef.new()) await server.mountStore(store=StoreQueueRef.new()) - await server.wakuStore.handleMessage(DefaultTopic, msg1) - await server.wakuStore.handleMessage(DefaultTopic, msg2) + server.wakuStore.handleMessage(DefaultTopic, msg1) + server.wakuStore.handleMessage(DefaultTopic, msg2) client.wakuStore.setPeer(server.switch.peerInfo.toRemotePeerInfo()) diff --git a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool index 1c1797019..2a852a250 100755 --- a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool +++ b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool @@ -2,7 +2,7 @@ # libtool - Provide generalized library-building support services. # Generated automatically by config.status (libbacktrace) version-unused -# Libtool was configured on host fv-az246-635: +# Libtool was configured on host fv-az208-287: # NOTE: Changes made to this file will be lost: look at ltmain.sh. # # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 00577bcb5..7f30ba00f 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -211,15 +211,20 @@ proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) = trace "Hit default handler", topic=topic, data=data let msg = WakuMessage.init(data) - if msg.isOk(): - # Notify mounted protocols of new message - if (not node.wakuFilter.isNil): - await node.wakuFilter.handleMessage(topic, msg.value()) - - if (not node.wakuStore.isNil): - await node.wakuStore.handleMessage(topic, msg.value()) + if msg.isErr(): + # TODO: Add metric to track waku message decode errors + return + + + # Notify mounted protocols of new message + if not node.wakuFilter.isNil(): + await node.wakuFilter.handleMessage(topic, msg.value) + + if not node.wakuStore.isNil(): + node.wakuStore.handleMessage(topic, msg.value) + + waku_node_messages.inc(labelValues = ["relay"]) - waku_node_messages.inc(labelValues = ["relay"]) let wakuRelay = node.wakuRelay @@ -432,7 +437,7 @@ proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) { if not node.wakuStore.isNil and (requestId in node.filters): let pubSubTopic = node.filters[requestId].pubSubTopic - await node.wakuStore.handleMessage(pubSubTopic, message) + node.wakuStore.handleMessage(pubSubTopic, message) waku_node_messages.inc(labelValues = ["filter"]) @@ -456,7 +461,37 @@ proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.as node.switch.mount(node.wakuSwap, protocolMatcher(WakuSwapCodec)) -proc mountStore*(node: WakuNode, store: MessageStore = nil, capacity = StoreDefaultCapacity, retentionPolicy=none(MessageRetentionPolicy) ) {.async, raises: [Defect, LPError].} = + +const MessageStoreDefaultRetentionPolicyInterval = 30.minutes + +proc executeMessageRetentionPolicy(node: WakuNode) = + if node.wakuStore.isNil(): + return + + if node.wakuStore.store.isNil(): + return + + debug "executing message retention policy" + + node.wakuStore.executeMessageRetentionPolicy() + node.wakuStore.reportStoredMessagesMetric() + +proc startMessageRetentionPolicyPeriodicTask(node: WakuNode, interval: Duration) = + if node.wakuStore.isNil(): + return + + if node.wakuStore.store.isNil(): + return + + # https://github.com/nim-lang/Nim/issues/17369 + var executeRetentionPolicy: proc(udata: pointer) {.gcsafe, raises: [Defect].} + executeRetentionPolicy = proc(udata: pointer) {.gcsafe.} = + executeMessageRetentionPolicy(node) + discard setTimer(Moment.fromNow(interval), executeRetentionPolicy) + + discard setTimer(Moment.fromNow(interval), executeRetentionPolicy) + +proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none(MessageRetentionPolicy) ) {.async, raises: [Defect, LPError].} = if node.wakuSwap.isNil(): info "mounting waku store protocol (no waku swap)" else: @@ -469,7 +504,7 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil, capacity = StoreDefa wakuSwap=node.wakuSwap, retentionPolicy=retentionPolicy ) - + if node.started: # Node has started already. Let's start store too. await node.wakuStore.start() @@ -1061,6 +1096,9 @@ when isMainModule: else: CapacityRetentionPolicy.init(conf.storeCapacity) waitFor mountStore(node, mStorage, retentionPolicy=some(retentionPolicy)) + executeMessageRetentionPolicy(node) + startMessageRetentionPolicyPeriodicTask(node, interval=MessageStoreDefaultRetentionPolicyInterval) + if conf.storenode != "": setStorePeer(node, conf.storenode) diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index 6ebc04fc9..1d2804a91 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -75,16 +75,19 @@ type retentionPolicy: Option[MessageRetentionPolicy] -proc executeMessageRetentionPolicy*(w: WakuStore): WakuStoreResult[void] = +proc executeMessageRetentionPolicy*(w: WakuStore) = if w.retentionPolicy.isNone(): - return ok() + return + + if w.store.isNil(): + return let policy = w.retentionPolicy.get() - if w.store.isNil(): - return err("no message store provided (nil)") - - policy.execute(w.store) + let retPolicyRes = policy.execute(w.store) + if retPolicyRes.isErr(): + waku_store_errors.inc(labelValues = [retPolicyFailure]) + debug "failed execution of retention policy", error=retPolicyRes.error proc reportStoredMessagesMetric*(w: WakuStore) = @@ -198,14 +201,6 @@ proc init*(T: type WakuStore, ) ws.initProtocolHandler() - # TODO: Move to wakunode - # Execute retention policy on initialization - let retPolicyRes = ws.executeMessageRetentionPolicy() - if retPolicyRes.isErr(): - warn "an error occurred while applying the retention policy at init", error=retPolicyRes.error - - ws.reportStoredMessagesMetric() - return ws proc init*(T: type WakuStore, @@ -217,7 +212,7 @@ proc init*(T: type WakuStore, WakuStore.init(peerManager, rng, store, wakuSwap, retentionPolicy) -proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async.} = +proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) = if w.store.isNil(): # Messages should not be stored return @@ -240,14 +235,6 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async waku_store_errors.inc(labelValues = [insertFailure]) return - # Execute the retention policy after insertion - let retPolicyRes = w.executeMessageRetentionPolicy() - if retPolicyRes.isErr(): - debug "message retention policy failure", error=retPolicyRes.error - waku_store_errors.inc(labelValues = [retPolicyFailure]) - - w.reportStoredMessagesMetric() - let insertDuration = getTime().toUnixFloat() - insertStartTime waku_store_insert_duration_seconds.observe(insertDuration) @@ -435,16 +422,6 @@ proc resume*(w: WakuStore, added.inc() - debug "resume finished successfully", retrievedMessages=res.get().len, addedMessages=added - - # Execute the retention policy after insertion - let retPolicyRes = w.executeMessageRetentionPolicy() - if retPolicyRes.isErr(): - debug "message retention policy failure", error=retPolicyRes.error - waku_store_errors.inc(labelValues = [retPolicyFailure]) - - w.reportStoredMessagesMetric() - return ok(added)