diff --git a/tests/v2/test_message_store_sqlite.nim b/tests/v2/test_message_store_sqlite.nim index cc54d2f3a..f3f3804ac 100644 --- a/tests/v2/test_message_store_sqlite.nim +++ b/tests/v2/test_message_store_sqlite.nim @@ -8,6 +8,8 @@ import sqlite3_abi import ../../waku/v2/node/storage/message/sqlite_store, + ../../waku/v2/node/storage/message/message_retention_policy, + ../../waku/v2/node/storage/message/message_retention_policy_capacity, ../../waku/v2/node/storage/sqlite, ../../waku/v2/protocol/waku_message, ../../waku/v2/utils/time, @@ -42,14 +44,10 @@ proc fakeWakuMessage( suite "SQLite message store - init store": test "init store": ## Given - const storeCapacity = 20 - let database = newTestDatabase() ## When - let - retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity) - resStore = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)) + let resStore = SqliteStore.init(database) ## Then check: @@ -109,8 +107,8 @@ suite "SQLite message store - insert messages": let database = newTestDatabase() + store = SqliteStore.init(database).tryGet() retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity) - store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet() let messages = @[ fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), @@ -126,8 +124,8 @@ suite "SQLite message store - insert messages": ## When for msg in messages: let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - let resPut = store.put(index, msg, DefaultPubsubTopic) - require(resPut.isOk()) + require store.put(index, msg, DefaultPubsubTopic).isOk() + require retentionPolicy.execute(store).isOk() ## Then let storedMsg = store.getAllMessages().tryGet() @@ -274,22 +272,21 @@ suite "Message Store": capacity = 10 let - database = SqliteDatabase.init("", inMemory = true)[] + database = newTestDatabase() + store = SqliteStore.init(database).tryGet() retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity) - store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet() for i in 1..capacity: let msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i)) index = Index.compute(msg, getTestTimestamp(), DefaultPubsubTopic) - output = store.put(index, msg, pubsubTopic) - check output.isOk - + require store.put(index, msg, pubsubTopic).isOk() + require retentionPolicy.execute(store).isOk() + + ## Then # Test limited getAll function when store is at capacity let resMax = store.getAllMessages() - - ## THen check: resMax.isOk() @@ -311,16 +308,15 @@ suite "Message Store": let database = SqliteDatabase.init("", inMemory = true)[] + store = SqliteStore.init(database).tryGet() retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity) - store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet() - defer: store.close() for i in 1..capacity+overload: let msg = WakuMessage(payload: ($i).toBytes(), contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i)) index = Index.compute(msg, getTestTimestamp(), DefaultPubsubTopic) - output = store.put(index, msg, pubsubTopic) - check output.isOk + require store.put(index, msg, pubsubTopic).isOk() + require retentionPolicy.execute(store).isOk() # count messages in DB var numMessages: int64 @@ -333,4 +329,7 @@ suite "Message Store": # expected number of messages is 120 because # (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete) # the window size changes when changing `const maxStoreOverflow = 1.3 in sqlite_store - numMessages == 120 \ No newline at end of file + numMessages == 120 + + ## Teardown + store.close() \ No newline at end of file diff --git a/tests/v2/test_message_store_sqlite_query.nim b/tests/v2/test_message_store_sqlite_query.nim index 87bf5fbce..40b2c8b17 100644 --- a/tests/v2/test_message_store_sqlite_query.nim +++ b/tests/v2/test_message_store_sqlite_query.nim @@ -39,13 +39,11 @@ suite "message store - history query": test "single content topic": ## Given - const storeCapacity = 20 const contentTopic = "test-content-topic" let database = newTestDatabase() - retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity) - store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet() + store = SqliteStore.init(database).tryGet() let messages = @[ fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), @@ -62,8 +60,7 @@ suite "message store - history query": for msg in messages: let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - let resPut = store.put(index, msg, DefaultPubsubTopic) - require(resPut.isOk()) + require store.put(index, msg, DefaultPubsubTopic).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -91,13 +88,11 @@ suite "message store - history query": test "single content topic and descending order": ## Given - const storeCapacity = 20 const contentTopic = "test-content-topic" let database = newTestDatabase() - retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity) - store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet() + store = SqliteStore.init(database).tryGet() let messages = @[ fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), @@ -114,8 +109,7 @@ suite "message store - history query": for msg in messages: let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - let resPut = store.put(index, msg, DefaultPubsubTopic) - require(resPut.isOk()) + require store.put(index, msg, DefaultPubsubTopic).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -143,15 +137,13 @@ suite "message store - history query": test "multiple content topic": ## Given - const storeCapacity = 20 const contentTopic1 = "test-content-topic-1" const contentTopic2 = "test-content-topic-2" const contentTopic3 = "test-content-topic-3" let database = newTestDatabase() - retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity) - store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet() + store = SqliteStore.init(database).tryGet() let messages = @[ fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), @@ -168,8 +160,7 @@ suite "message store - history query": for msg in messages: let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - let resPut = store.put(index, msg, DefaultPubsubTopic) - require(resPut.isOk()) + require store.put(index, msg, DefaultPubsubTopic).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -197,14 +188,12 @@ suite "message store - history query": test "content topic and pubsub topic": ## Given - const storeCapacity = 20 const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" let database = newTestDatabase() - retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity) - store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet() + store = SqliteStore.init(database).tryGet() let messages1 = @[ fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), @@ -215,8 +204,8 @@ suite "message store - history query": ] for msg in messages1: let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - let resPut = store.put(index, msg, DefaultPubsubTopic) - require(resPut.isOk()) + require store.put(index, msg, DefaultPubsubTopic).isOk() + let messages2 = @[ fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), @@ -226,9 +215,8 @@ suite "message store - history query": ] for msg in messages2: let index = Index.compute(msg, msg.timestamp, pubsubTopic) - let resPut = store.put(index, msg, pubsubTopic) - require(resPut.isOk()) - + require store.put(index, msg, pubsubTopic).isOk() + ## When let res = store.getMessagesByHistoryQuery( contentTopic=some(@[contentTopic]), @@ -256,13 +244,11 @@ suite "message store - history query": test "content topic and cursor": ## Given - const storeCapacity = 20 const contentTopic = "test-content-topic" let database = newTestDatabase() - retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity) - store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet() + store = SqliteStore.init(database).tryGet() let messages = @[ fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), @@ -279,8 +265,7 @@ suite "message store - history query": for msg in messages: let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - let resPut = store.put(index, msg, DefaultPubsubTopic) - require(resPut.isOk()) + require store.put(index, msg, DefaultPubsubTopic).isOk() let cursor = Index.compute(messages[4], messages[4].timestamp, DefaultPubsubTopic) @@ -311,13 +296,11 @@ suite "message store - history query": test "content topic, cursor and descending order": ## Given - const storeCapacity = 20 const contentTopic = "test-content-topic" let database = newTestDatabase() - retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity) - store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet() + store = SqliteStore.init(database).tryGet() let messages = @[ fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), @@ -334,8 +317,7 @@ suite "message store - history query": for msg in messages: let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - let resPut = store.put(index, msg, DefaultPubsubTopic) - require(resPut.isOk()) + require store.put(index, msg, DefaultPubsubTopic).isOk() let cursor = Index.compute(messages[6], messages[6].timestamp, DefaultPubsubTopic) @@ -366,14 +348,12 @@ suite "message store - history query": test "content topic, pubsub topic and cursor": ## Given - const storeCapacity = 20 const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" let database = newTestDatabase() - retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity) - store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet() + store = SqliteStore.init(database).tryGet() let messages1 = @[ fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), @@ -384,8 +364,7 @@ suite "message store - history query": ] for msg in messages1: let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - let resPut = store.put(index, msg, DefaultPubsubTopic) - require(resPut.isOk()) + require store.put(index, msg, DefaultPubsubTopic).isOk() let messages2 = @[ fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), @@ -394,8 +373,7 @@ suite "message store - history query": ] for msg in messages2: let index = Index.compute(msg, msg.timestamp, pubsubTopic) - let resPut = store.put(index, msg, pubsubTopic) - require(resPut.isOk()) + require store.put(index, msg, pubsubTopic).isOk() let cursor = Index.compute(messages2[0], messages2[0].timestamp, DefaultPubsubTopic) @@ -427,13 +405,11 @@ suite "message store - history query": test "single content topic - no results": ## Given - const storeCapacity = 10 const contentTopic = "test-content-topic" let database = newTestDatabase() - retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity) - store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet() + store = SqliteStore.init(database).tryGet() let messages = @[ fakeWakuMessage("MSG-01", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 2), @@ -445,8 +421,7 @@ suite "message store - history query": for msg in messages: let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - let resPut = store.put(index, msg, DefaultPubsubTopic) - require(resPut.isOk()) + require store.put(index, msg, DefaultPubsubTopic).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -469,15 +444,12 @@ suite "message store - history query": test "single content topic and valid time range": ## Given - let - storeCapacity = 10 - contentTopic = "test-content-topic" - timeOrigin = getNanosecondTime(epochTime()) + const contentTopic = "test-content-topic" + let timeOrigin = getNanosecondTime(epochTime()) let database = newTestDatabase() - retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity) - store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet() + store = SqliteStore.init(database).tryGet() let messages = @[ fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00), @@ -491,8 +463,7 @@ suite "message store - history query": for msg in messages: let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - let resPut = store.put(index, msg, DefaultPubsubTopic) - require(resPut.isOk()) + require store.put(index, msg, DefaultPubsubTopic).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -521,15 +492,12 @@ suite "message store - history query": test "single content topic and invalid time range - no results": ## Given - let - storeCapacity = 10 - contentTopic = "test-content-topic" - timeOrigin = getNanosecondTime(epochTime()) + const contentTopic = "test-content-topic" + let timeOrigin = getNanosecondTime(epochTime()) let database = newTestDatabase() - retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity) - store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet() + store = SqliteStore.init(database).tryGet() let messages = @[ fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00), @@ -541,8 +509,7 @@ suite "message store - history query": for msg in messages: let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - let resPut = store.put(index, msg, DefaultPubsubTopic) - require(resPut.isOk()) + require store.put(index, msg, DefaultPubsubTopic).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -566,15 +533,12 @@ suite "message store - history query": test "single content topic and only time range start": ## Given - let - storeCapacity = 10 - contentTopic = "test-content-topic" - timeOrigin = getNanosecondTime(epochTime()) + const contentTopic = "test-content-topic" + let timeOrigin = getNanosecondTime(epochTime()) let database = newTestDatabase() - retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity) - store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet() + store = SqliteStore.init(database).tryGet() let messages = @[ fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00), @@ -587,8 +551,7 @@ suite "message store - history query": for msg in messages: let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - let resPut = store.put(index, msg, DefaultPubsubTopic) - require(resPut.isOk()) + require store.put(index, msg, DefaultPubsubTopic).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -615,15 +578,12 @@ suite "message store - history query": test "single content topic, cursor and only time range start": ## Given - let - storeCapacity = 10 - contentTopic = "test-content-topic" - timeOrigin = getNanosecondTime(epochTime()) + const contentTopic = "test-content-topic" + let timeOrigin = getNanosecondTime(epochTime()) let database = newTestDatabase() - retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity) - store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet() + store = SqliteStore.init(database).tryGet() let messages = @[ fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00), @@ -637,8 +597,7 @@ suite "message store - history query": for msg in messages: let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - let resPut = store.put(index, msg, DefaultPubsubTopic) - require(resPut.isOk()) + require store.put(index, msg, DefaultPubsubTopic).isOk() let cursor = Index.compute(messages[3], messages[3].timestamp, DefaultPubsubTopic) diff --git a/waku/v2/node/config.nim b/waku/v2/node/config.nim index cb47a80f0..4cf1f94d8 100644 --- a/waku/v2/node/config.nim +++ b/waku/v2/node/config.nim @@ -209,6 +209,7 @@ type defaultValue: false name: "sqlite-store" }: bool + ## TODO: Rename this command-line option to `storeRetentionTime` sqliteRetentionTime* {. desc: "time the sqlite-only store keeps messages (in seconds)", defaultValue: 30.days.seconds diff --git a/waku/v2/node/storage/message/sqlite_store/retention_policy.nim b/waku/v2/node/storage/message/message_retention_policy.nim similarity index 55% rename from waku/v2/node/storage/message/sqlite_store/retention_policy.nim rename to waku/v2/node/storage/message/message_retention_policy.nim index dc60c4fe2..4465d4090 100644 --- a/waku/v2/node/storage/message/sqlite_store/retention_policy.nim +++ b/waku/v2/node/storage/message/message_retention_policy.nim @@ -3,12 +3,11 @@ import stew/results import - ../../sqlite - + ./message_store type RetentionPolicyResult*[T] = Result[T, string] type MessageRetentionPolicy* = ref object of RootObj -method execute*(p: MessageRetentionPolicy, db: SqliteDatabase): RetentionPolicyResult[void] {.base.} = discard \ No newline at end of file +method execute*(p: MessageRetentionPolicy, store: MessageStore): RetentionPolicyResult[void] {.base.} = discard \ No newline at end of file diff --git a/waku/v2/node/storage/message/sqlite_store/retention_policy_capacity.nim b/waku/v2/node/storage/message/message_retention_policy_capacity.nim similarity index 87% rename from waku/v2/node/storage/message/sqlite_store/retention_policy_capacity.nim rename to waku/v2/node/storage/message/message_retention_policy_capacity.nim index 4f0933672..1fb6dac6c 100644 --- a/waku/v2/node/storage/message/sqlite_store/retention_policy_capacity.nim +++ b/waku/v2/node/storage/message/message_retention_policy_capacity.nim @@ -4,10 +4,8 @@ import stew/results, chronicles import - ../../sqlite, - ../message_store, - ./queries, - ./retention_policy + ./message_store, + ./message_retention_policy logScope: topics = "message_store.sqlite_store.retention_policy.capacity" @@ -57,13 +55,13 @@ proc init*(T: type CapacityRetentionPolicy, capacity=StoreDefaultCapacity): T = deleteWindow: deleteWindow ) -method execute*(p: CapacityRetentionPolicy, db: SqliteDatabase): RetentionPolicyResult[void] = - let numMessages = ?db.getMessageCount().mapErr(proc(err: string): string = "failed to get messages count: " & err) +method execute*(p: CapacityRetentionPolicy, store: MessageStore): RetentionPolicyResult[void] = + let numMessages = ?store.getMessagesCount().mapErr(proc(err: string): string = "failed to get messages count: " & err) if numMessages < p.totalCapacity: return ok() - let res = db.deleteOldestMessagesNotWithinLimit(limit=p.capacity + p.deleteWindow) + let res = store.deleteOldestMessagesNotWithinLimit(limit=p.capacity + p.deleteWindow) if res.isErr(): return err("deleting oldest messages failed: " & res.error()) diff --git a/waku/v2/node/storage/message/sqlite_store/retention_policy_time.nim b/waku/v2/node/storage/message/message_retention_policy_time.nim similarity index 67% rename from waku/v2/node/storage/message/sqlite_store/retention_policy_time.nim rename to waku/v2/node/storage/message/message_retention_policy_time.nim index 0f75ee54c..849599b27 100644 --- a/waku/v2/node/storage/message/sqlite_store/retention_policy_time.nim +++ b/waku/v2/node/storage/message/message_retention_policy_time.nim @@ -6,11 +6,9 @@ import chronicles, chronos import - ../../../../utils/time, - ../../sqlite, - ../message_store, - ../sqlite_store/queries, - ./retention_policy + ../../../utils/time, + ./message_store, + ./message_retention_policy logScope: topics = "message_store.sqlite_store.retention_policy.time" @@ -26,10 +24,10 @@ proc init*(T: type TimeRetentionPolicy, retentionTime=StoreDefaultRetentionTime) ) -method execute*(p: TimeRetentionPolicy, db: SqliteDatabase): RetentionPolicyResult[void] = +method execute*(p: TimeRetentionPolicy, store: MessageStore): RetentionPolicyResult[void] = ## Delete messages that exceed the retention time by 10% and more (batch delete for efficiency) - let oldestReceiverTimestamp = ?db.selectOldestReceiverTimestamp().mapErr(proc(err: string): string = "failed to get oldest message timestamp: " & err) + let oldestReceiverTimestamp = ?store.getOldestMessageTimestamp().mapErr(proc(err: string): string = "failed to get oldest message timestamp: " & err) let now = getNanosecondTime(getTime().toUnixFloat()) let retentionTimestamp = now - p.retentionTime.nanoseconds @@ -38,7 +36,7 @@ method execute*(p: TimeRetentionPolicy, db: SqliteDatabase): RetentionPolicyResu if thresholdTimestamp <= oldestReceiverTimestamp: return ok() - let res = db.deleteMessagesOlderThanTimestamp(ts=retentionTimestamp) + let res = store.deleteMessagesOlderThanTimestamp(ts=retentionTimestamp) if res.isErr(): return err("failed to delete oldest messages: " & res.error()) diff --git a/waku/v2/node/storage/message/sqlite_store.nim b/waku/v2/node/storage/message/sqlite_store.nim index 28edd9a33..e9889a7c9 100644 --- a/waku/v2/node/storage/message/sqlite_store.nim +++ b/waku/v2/node/storage/message/sqlite_store.nim @@ -1,13 +1,5 @@ {.push raises: [Defect].} -import - ./sqlite_store/retention_policy, - ./sqlite_store/retention_policy_capacity, - ./sqlite_store/retention_policy_time, - ./sqlite_store/sqlite_store +import ./sqlite_store/sqlite_store -export - retention_policy, - retention_policy_capacity, - retention_policy_time, - sqlite_store \ No newline at end of file +export sqlite_store \ No newline at end of file diff --git a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim index 65b8b8ee4..8a164934d 100644 --- a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim +++ b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim @@ -13,10 +13,7 @@ import ../../../../utils/time, ../../sqlite, ../message_store, - ./queries, - ./retention_policy, - ./retention_policy_capacity, - ./retention_policy_time + ./queries logScope: topics = "message_store.sqlite" @@ -42,10 +39,9 @@ proc init(db: SqliteDatabase): MessageStoreResult[void] = type SqliteStore* = ref object of MessageStore db: SqliteDatabase - retentionPolicy: Option[MessageRetentionPolicy] insertStmt: SqliteStmt[InsertMessageParams, void] - -proc init*(T: type SqliteStore, db: SqliteDatabase, retentionPolicy=none(MessageRetentionPolicy)): MessageStoreResult[T] = + +proc init*(T: type SqliteStore, db: SqliteDatabase): MessageStoreResult[T] = # Database initialization let resInit = init(db) @@ -54,18 +50,7 @@ proc init*(T: type SqliteStore, db: SqliteDatabase, retentionPolicy=none(Message # General initialization let insertStmt = db.prepareInsertMessageStmt() - let s = SqliteStore( - db: db, - retentionPolicy: retentionPolicy, - insertStmt: insertStmt, - ) - - if retentionPolicy.isSome(): - let res = retentionPolicy.get().execute(db) - if res.isErr(): - return err("failed to execute the retention policy: " & res.error()) - - ok(s) + ok(SqliteStore(db: db, insertStmt: insertStmt)) proc close*(s: SqliteStore) = ## Close the database connection @@ -96,11 +81,6 @@ method put*(s: SqliteStore, cursor: Index, message: WakuMessage, pubsubTopic: st if res.isErr(): return err("message insert failed: " & res.error()) - if s.retentionPolicy.isSome(): - let res = s.retentionPolicy.get().execute(s.db) - if res.isErr(): - return err("failed to execute the retention policy: " & res.error()) - ok() diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index fcabdfdd0..df75504b9 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -25,6 +25,9 @@ import ./peer_manager/peer_manager, ./storage/message/waku_store_queue, ./storage/message/message_store, + ./storage/message/message_retention_policy, + ./storage/message/message_retention_policy_capacity, + ./storage/message/message_retention_policy_time, ./dnsdisc/waku_dnsdisc, ./discv5/waku_discv5, ./wakuswitch, @@ -449,15 +452,25 @@ proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.as node.switch.mount(node.wakuSwap, protocolMatcher(WakuSwapCodec)) -proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false, capacity = StoreDefaultCapacity, isSqliteOnly = false) {.async, raises: [Defect, LPError].} = - info "mounting store" - - if node.wakuSwap.isNil: - debug "mounting store without swap" - node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, persistMessages=persistMessages, capacity=capacity, isSqliteOnly=isSqliteOnly) +proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false, capacity = StoreDefaultCapacity, retentionTime = StoreDefaultRetentionTime, isSqliteOnly = false) {.async, raises: [Defect, LPError].} = + if node.wakuSwap.isNil(): + info "mounting waku store protocol (no waku swap)" else: - debug "mounting store with swap" - node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, node.wakuSwap, persistMessages=persistMessages, capacity=capacity, isSqliteOnly=isSqliteOnly) + info "mounting waku store protocol with waku swap support" + + let retentionPolicy = if isSqliteOnly: TimeRetentionPolicy.init(retentionTime) + else: CapacityRetentionPolicy.init(capacity) + + node.wakuStore = WakuStore.init( + node.peerManager, + node.rng, + store, + wakuSwap=node.wakuSwap, + persistMessages=persistMessages, + capacity=capacity, + isSqliteOnly=isSqliteOnly, + retentionPolicy=some(retentionPolicy) + ) if node.started: # Node has started already. Let's start store too. @@ -889,9 +902,7 @@ when isMainModule: if conf.persistMessages: # Historical message persistence enable. Set up Message table in storage - let retentionPolicy = if conf.sqliteStore: TimeRetentionPolicy.init(conf.sqliteRetentionTime) - else: CapacityRetentionPolicy.init(conf.storeCapacity) - let res = SqliteStore.init(sqliteDatabase, retentionPolicy=some(retentionPolicy)) + let res = SqliteStore.init(sqliteDatabase) if res.isErr(): warn "failed to init SqliteStore", err = res.error waku_node_errors.inc(labelValues = ["init_store_failure"]) @@ -1073,7 +1084,7 @@ when isMainModule: # Store setup if (conf.storenode != "") or (conf.store): - waitFor mountStore(node, mStorage, conf.persistMessages, conf.storeCapacity, conf.sqliteStore) + waitFor mountStore(node, mStorage, conf.persistMessages, conf.storeCapacity, conf.sqliteRetentionTime, conf.sqliteStore) if conf.storenode != "": setStorePeer(node, conf.storenode) diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index e74694f17..8a9e321a4 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -16,6 +16,7 @@ import metrics import ../../node/storage/message/message_store, + ../../node/storage/message/message_retention_policy, ../../node/storage/message/waku_store_queue, ../../node/peer_manager/peer_manager, ../../utils/time, @@ -74,6 +75,7 @@ type persistMessages*: bool #TODO: SqliteStore currenly also holds isSqliteOnly; put it in single place. isSqliteOnly: bool # if true, don't use in memory-store and answer history queries from the sqlite DB + retentionPolicy: Option[MessageRetentionPolicy] proc reportMessagesCountMetric(store: MessageStore) = @@ -181,12 +183,18 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) = ws.codec = WakuStoreCodec ws.messages = StoreQueueRef.new(capacity) - if ws.isSqliteOnly: if ws.store.isNil(): warn "store not provided (nil)" return + # Execute retention policy on initialization + if not ws.retentionPolicy.isNone(): + let policy = ws.retentionPolicy.get() + let resRetPolicy = policy.execute(ws.store) + if resRetPolicy.isErr(): + warn "an error occurred while applying the retention policy at init", error=resRetPolicy.error() + info "SQLite-only store initialized. Messages are *not* loaded into memory." let numMessages = ws.store.getMessagesCount() @@ -200,6 +208,13 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) = if ws.store.isNil(): return + # Execute retention policy before loading any messages into in-memory store + if not ws.retentionPolicy.isNone(): + let policy = ws.retentionPolicy.get() + let resRetPolicy = policy.execute(ws.store) + if resRetPolicy.isErr(): + warn "an error occurred while applying the retention policy at init", error=resRetPolicy.error() + info "loading messages from persistent storage" let res = ws.store.getAllMessages() @@ -218,13 +233,28 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) = waku_store_messages.set(numMessages.value, labelValues = ["stored"]) -proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, - store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true, - capacity = StoreDefaultCapacity, isSqliteOnly = false): T = - let ws = WakuStore(rng: rng, peerManager: peerManager, store: store, wakuSwap: wakuSwap, persistMessages: persistMessages, isSqliteOnly: isSqliteOnly) +proc init*(T: type WakuStore, + peerManager: PeerManager, + rng: ref rand.HmacDrbgContext, + store: MessageStore = nil, + wakuSwap: WakuSwap = nil, + persistMessages = true, + capacity = StoreDefaultCapacity, + isSqliteOnly = false, + retentionPolicy=none(MessageRetentionPolicy)): T = + let ws = WakuStore( + rng: rng, + peerManager: peerManager, + store: store, + wakuSwap: wakuSwap, + persistMessages: persistMessages, + isSqliteOnly: isSqliteOnly, + retentionPolicy: retentionPolicy + ) ws.init(capacity) return ws + # TODO: This should probably be an add function and append the peer to an array proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) = ws.peerManager.addPeer(peer, WakuStoreCodec) @@ -257,6 +287,15 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async if resPutStore.isErr(): debug "failed to insert message to persistent store", index=index, err=resPutStore.error() waku_store_errors.inc(labelValues = [insertFailure]) + return + + # Execute the retention policy after insertion + if not w.retentionPolicy.isNone(): + let policy = w.retentionPolicy.get() + let resRetPolicy = policy.execute(w.store) + if resRetPolicy.isErr(): + debug "message retention policy failure", error=resRetPolicy.error() + waku_store_errors.inc(labelValues = [insertFailure]) reportMessagesCountMetric(w.store) @@ -282,6 +321,14 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async waku_store_errors.inc(labelValues = [insertFailure]) return + # Execute the retention policy after insertion + if not w.retentionPolicy.isNone(): + let policy = w.retentionPolicy.get() + let resRetPolicy = policy.execute(w.store) + if resRetPolicy.isErr(): + debug "message retention policy failure", error=resRetPolicy.error() + waku_store_errors.inc(labelValues = [insertFailure]) + let insertDuration = getTime().toUnixFloat() - insertStartTime waku_store_insert_duration_seconds.observe(insertDuration) @@ -465,6 +512,14 @@ proc resume*(w: WakuStore, debug "failed to insert message to persistent store", index=index, err=resPutStore.error() waku_store_errors.inc(labelValues = [insertFailure]) continue + + # Execute the retention policy after insertion + if not w.retentionPolicy.isNone(): + let policy = w.retentionPolicy.get() + let resRetPolicy = policy.execute(w.store) + if resRetPolicy.isErr(): + debug "message retention policy failure", error=resRetPolicy.error() + waku_store_errors.inc(labelValues = [insertFailure]) # TODO: Move this logic, together with the load from persistent store on init # into a "dual-store" message store implementation. @@ -491,6 +546,14 @@ proc resume*(w: WakuStore, debug "failed to insert message to persistent store", index=index, err=resPutStore.error() waku_store_errors.inc(labelValues = [insertFailure]) continue + + # Execute the retention policy after insertion + if not w.retentionPolicy.isNone(): + let policy = w.retentionPolicy.get() + let resRetPolicy = policy.execute(w.store) + if resRetPolicy.isErr(): + debug "message retention policy failure", error=resRetPolicy.error() + waku_store_errors.inc(labelValues = [insertFailure]) added.inc()