feat(store): execute retention policies periodically (#1155)

This commit is contained in:
Lorenzo Delgado 2022-09-21 11:32:59 +02:00 committed by GitHub
parent efe82eade2
commit 11832411a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 86 additions and 71 deletions

View File

@ -258,7 +258,7 @@ procSuite "Waku v2 JSON-RPC API":
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: 9)]
for wakuMsg in msgList:
waitFor node.wakuStore.handleMessage(defaultTopic, wakuMsg)
node.wakuStore.handleMessage(defaultTopic, wakuMsg)
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)

View File

@ -97,8 +97,8 @@ suite "Waku Store":
msg1 = fakeWakuMessage(contentTopic=topic)
msg2 = fakeWakuMessage()
await serverProto.handleMessage("foo", msg1)
await serverProto.handleMessage("foo", msg2)
serverProto.handleMessage("foo", msg1)
serverProto.handleMessage("foo", msg2)
## When
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)])
@ -141,9 +141,9 @@ suite "Waku Store":
msg2 = fakeWakuMessage(contentTopic=topic2)
msg3 = fakeWakuMessage(contentTopic=topic3)
await serverProto.handleMessage("foo", msg1)
await serverProto.handleMessage("foo", msg2)
await serverProto.handleMessage("foo", msg3)
serverProto.handleMessage("foo", msg1)
serverProto.handleMessage("foo", msg2)
serverProto.handleMessage("foo", msg3)
## When
let rpc = HistoryQuery(contentFilters: @[
@ -194,9 +194,9 @@ suite "Waku Store":
msg2 = fakeWakuMessage(contentTopic=contentTopic2)
msg3 = fakeWakuMessage(contentTopic=contentTopic3)
await serverProto.handleMessage(pubsubtopic1, msg1)
await serverProto.handleMessage(pubsubtopic2, msg2)
await serverProto.handleMessage(pubsubtopic2, msg3)
serverProto.handleMessage(pubsubtopic1, msg1)
serverProto.handleMessage(pubsubtopic2, msg2)
serverProto.handleMessage(pubsubtopic2, msg3)
## When
# this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3)
@ -243,9 +243,9 @@ suite "Waku Store":
msg2 = fakeWakuMessage()
msg3 = fakeWakuMessage()
await serverProto.handleMessage(pubsubtopic2, msg1)
await serverProto.handleMessage(pubsubtopic2, msg2)
await serverProto.handleMessage(pubsubtopic2, msg3)
serverProto.handleMessage(pubsubtopic2, msg1)
serverProto.handleMessage(pubsubtopic2, msg2)
serverProto.handleMessage(pubsubtopic2, msg3)
## When
let rpc = HistoryQuery(pubsubTopic: pubsubTopic1)
@ -284,9 +284,9 @@ suite "Waku Store":
msg2 = fakeWakuMessage(payload="TEST-2")
msg3 = fakeWakuMessage(payload="TEST-3")
await serverProto.handleMessage(pubsubTopic, msg1)
await serverProto.handleMessage(pubsubTopic, msg2)
await serverProto.handleMessage(pubsubTopic, msg3)
serverProto.handleMessage(pubsubTopic, msg1)
serverProto.handleMessage(pubsubTopic, msg2)
serverProto.handleMessage(pubsubTopic, msg3)
## When
let rpc = HistoryQuery(pubsubTopic: pubsubTopic)
@ -335,7 +335,7 @@ suite "Waku Store":
]
for msg in msgList:
await serverProto.handleMessage("foo", msg)
serverProto.handleMessage("foo", msg)
## When
let rpc = HistoryQuery(
@ -387,7 +387,7 @@ suite "Waku Store":
]
for msg in msgList:
await serverProto.handleMessage("foo", msg)
serverProto.handleMessage("foo", msg)
## When
let rpc = HistoryQuery(
@ -439,7 +439,7 @@ suite "Waku Store":
]
for msg in msgList:
await serverProto.handleMessage("foo", msg)
serverProto.handleMessage("foo", msg)
## When
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])
@ -475,7 +475,7 @@ suite "Waku Store":
]
for msg in msgList:
await proto.handleMessage(DefaultPubsubTopic, msg)
proto.handleMessage(DefaultPubsubTopic, msg)
check:
store.len == 2
@ -529,7 +529,7 @@ procSuite "Waku Store - fault tolerant store":
]
for msg in msgList:
await proto.handleMessage(DefaultPubsubTopic, msg)
proto.handleMessage(DefaultPubsubTopic, msg)
let (listenSwitch2, dialSwitch2, proto2) = await newTestWakuStore()
let msgList2 = @[
@ -544,7 +544,7 @@ procSuite "Waku Store - fault tolerant store":
]
for msg in msgList2:
await proto2.handleMessage(DefaultPubsubTopic, msg)
proto2.handleMessage(DefaultPubsubTopic, msg)
asyncTest "handle temporal history query with a valid time window":

View File

@ -70,7 +70,7 @@ procSuite "Waku SWAP Accounting":
await node2.mountSwap()
await node2.mountStore(store=StoreQueueRef.new())
await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
await sleepAsync(500.millis)
@ -120,7 +120,7 @@ procSuite "Waku SWAP Accounting":
await node2.mountSwap(swapConfig)
await node2.mountStore(store=StoreQueueRef.new())
await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
await sleepAsync(500.millis)

View File

@ -54,7 +54,7 @@ procSuite "WakuNode - Store":
await node2.start()
await node2.mountStore(store=newTestMessageStore())
await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
await sleepAsync(500.millis)
@ -149,7 +149,7 @@ procSuite "WakuNode - Store":
await node2.start()
await node2.mountStore(store=StoreQueueRef.new())
await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
await sleepAsync(500.millis)
@ -182,8 +182,8 @@ procSuite "WakuNode - Store":
await client.mountStore(store=StoreQueueRef.new())
await server.mountStore(store=StoreQueueRef.new())
await server.wakuStore.handleMessage(DefaultTopic, msg1)
await server.wakuStore.handleMessage(DefaultTopic, msg2)
server.wakuStore.handleMessage(DefaultTopic, msg1)
server.wakuStore.handleMessage(DefaultTopic, msg2)
client.wakuStore.setPeer(server.switch.peerInfo.toRemotePeerInfo())

View File

@ -211,15 +211,20 @@ proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) =
trace "Hit default handler", topic=topic, data=data
let msg = WakuMessage.init(data)
if msg.isOk():
# Notify mounted protocols of new message
if (not node.wakuFilter.isNil):
await node.wakuFilter.handleMessage(topic, msg.value())
if (not node.wakuStore.isNil):
await node.wakuStore.handleMessage(topic, msg.value())
if msg.isErr():
# TODO: Add metric to track waku message decode errors
return
# Notify mounted protocols of new message
if not node.wakuFilter.isNil():
await node.wakuFilter.handleMessage(topic, msg.value)
if not node.wakuStore.isNil():
node.wakuStore.handleMessage(topic, msg.value)
waku_node_messages.inc(labelValues = ["relay"])
waku_node_messages.inc(labelValues = ["relay"])
let wakuRelay = node.wakuRelay
@ -432,7 +437,7 @@ proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {
if not node.wakuStore.isNil and (requestId in node.filters):
let pubSubTopic = node.filters[requestId].pubSubTopic
await node.wakuStore.handleMessage(pubSubTopic, message)
node.wakuStore.handleMessage(pubSubTopic, message)
waku_node_messages.inc(labelValues = ["filter"])
@ -456,7 +461,37 @@ proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.as
node.switch.mount(node.wakuSwap, protocolMatcher(WakuSwapCodec))
proc mountStore*(node: WakuNode, store: MessageStore = nil, capacity = StoreDefaultCapacity, retentionPolicy=none(MessageRetentionPolicy) ) {.async, raises: [Defect, LPError].} =
const MessageStoreDefaultRetentionPolicyInterval = 30.minutes
proc executeMessageRetentionPolicy(node: WakuNode) =
if node.wakuStore.isNil():
return
if node.wakuStore.store.isNil():
return
debug "executing message retention policy"
node.wakuStore.executeMessageRetentionPolicy()
node.wakuStore.reportStoredMessagesMetric()
proc startMessageRetentionPolicyPeriodicTask(node: WakuNode, interval: Duration) =
if node.wakuStore.isNil():
return
if node.wakuStore.store.isNil():
return
# https://github.com/nim-lang/Nim/issues/17369
var executeRetentionPolicy: proc(udata: pointer) {.gcsafe, raises: [Defect].}
executeRetentionPolicy = proc(udata: pointer) {.gcsafe.} =
executeMessageRetentionPolicy(node)
discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none(MessageRetentionPolicy) ) {.async, raises: [Defect, LPError].} =
if node.wakuSwap.isNil():
info "mounting waku store protocol (no waku swap)"
else:
@ -469,7 +504,7 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil, capacity = StoreDefa
wakuSwap=node.wakuSwap,
retentionPolicy=retentionPolicy
)
if node.started:
# Node has started already. Let's start store too.
await node.wakuStore.start()
@ -1061,6 +1096,9 @@ when isMainModule:
else: CapacityRetentionPolicy.init(conf.storeCapacity)
waitFor mountStore(node, mStorage, retentionPolicy=some(retentionPolicy))
executeMessageRetentionPolicy(node)
startMessageRetentionPolicyPeriodicTask(node, interval=MessageStoreDefaultRetentionPolicyInterval)
if conf.storenode != "":
setStorePeer(node, conf.storenode)

View File

@ -75,16 +75,19 @@ type
retentionPolicy: Option[MessageRetentionPolicy]
proc executeMessageRetentionPolicy*(w: WakuStore): WakuStoreResult[void] =
proc executeMessageRetentionPolicy*(w: WakuStore) =
if w.retentionPolicy.isNone():
return ok()
return
if w.store.isNil():
return
let policy = w.retentionPolicy.get()
if w.store.isNil():
return err("no message store provided (nil)")
policy.execute(w.store)
let retPolicyRes = policy.execute(w.store)
if retPolicyRes.isErr():
waku_store_errors.inc(labelValues = [retPolicyFailure])
debug "failed execution of retention policy", error=retPolicyRes.error
proc reportStoredMessagesMetric*(w: WakuStore) =
@ -198,14 +201,6 @@ proc init*(T: type WakuStore,
)
ws.initProtocolHandler()
# TODO: Move to wakunode
# Execute retention policy on initialization
let retPolicyRes = ws.executeMessageRetentionPolicy()
if retPolicyRes.isErr():
warn "an error occurred while applying the retention policy at init", error=retPolicyRes.error
ws.reportStoredMessagesMetric()
return ws
proc init*(T: type WakuStore,
@ -217,7 +212,7 @@ proc init*(T: type WakuStore,
WakuStore.init(peerManager, rng, store, wakuSwap, retentionPolicy)
proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async.} =
proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) =
if w.store.isNil():
# Messages should not be stored
return
@ -240,14 +235,6 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async
waku_store_errors.inc(labelValues = [insertFailure])
return
# Execute the retention policy after insertion
let retPolicyRes = w.executeMessageRetentionPolicy()
if retPolicyRes.isErr():
debug "message retention policy failure", error=retPolicyRes.error
waku_store_errors.inc(labelValues = [retPolicyFailure])
w.reportStoredMessagesMetric()
let insertDuration = getTime().toUnixFloat() - insertStartTime
waku_store_insert_duration_seconds.observe(insertDuration)
@ -435,16 +422,6 @@ proc resume*(w: WakuStore,
added.inc()
debug "resume finished successfully", retrievedMessages=res.get().len, addedMessages=added
# Execute the retention policy after insertion
let retPolicyRes = w.executeMessageRetentionPolicy()
if retPolicyRes.isErr():
debug "message retention policy failure", error=retPolicyRes.error
waku_store_errors.inc(labelValues = [retPolicyFailure])
w.reportStoredMessagesMetric()
return ok(added)