diff --git a/tests/v2/test_message_store_queue.nim b/tests/v2/test_message_store_queue.nim index fc4b40da8..dd2dbd75e 100644 --- a/tests/v2/test_message_store_queue.nim +++ b/tests/v2/test_message_store_queue.nim @@ -6,6 +6,7 @@ import testutils/unittests, nimcrypto/hash import + ../../waku/v2/node/storage/message/message_store, ../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_store, @@ -93,7 +94,7 @@ procSuite "Sorted store queue": let store = StoreQueueRef.new(capacity) let receiverTime = getNanoSecondTime(10) - senderTimeOk = receiverTime + MaxTimeVariance + senderTimeOk = receiverTime + StoreMaxTimeVariance senderTimeErr = senderTimeOk + 1 let invalidMessage = IndexedWakuMessage( diff --git a/waku/v2/node/storage/message/dual_message_store.nim b/waku/v2/node/storage/message/dual_message_store.nim index d5abe90fe..204262807 100644 --- a/waku/v2/node/storage/message/dual_message_store.nim +++ b/waku/v2/node/storage/message/dual_message_store.nim @@ -22,7 +22,7 @@ type DualMessageStore* = ref object of MessageStore persistent: SqliteStore -proc init*(T: type DualMessageStore, db: SqliteDatabase, capacity=StoreDefaultCapacity): MessageStoreResult[T] = +proc init*(T: type DualMessageStore, db: SqliteDatabase, capacity: int): MessageStoreResult[T] = let inmemory = StoreQueueRef.new(capacity) persistent = ?SqliteStore.init(db) @@ -60,7 +60,7 @@ method getMessagesByHistoryQuery*( cursor = none(Index), startTime = none(Timestamp), endTime = none(Timestamp), - maxPageSize = StoreMaxPageSize, + maxPageSize = MaxPageSize, ascendingOrder = true ): MessageStoreResult[MessageStorePage] = s.inmemory.getMessagesByHistoryQuery(contentTopic, pubsubTopic, cursor, startTime, endTime, maxPageSize, ascendingOrder) diff --git a/waku/v2/node/storage/message/message_retention_policy_capacity.nim b/waku/v2/node/storage/message/message_retention_policy_capacity.nim index 1fb6dac6c..c827e28d7 100644 --- a/waku/v2/node/storage/message/message_retention_policy_capacity.nim +++ b/waku/v2/node/storage/message/message_retention_policy_capacity.nim @@ -11,6 +11,8 @@ logScope: topics = "message_store.sqlite_store.retention_policy.capacity" +const StoreDefaultCapacity*: int = 25_000 + const StoreMaxOverflow = 1.3 type diff --git a/waku/v2/node/storage/message/message_retention_policy_time.nim b/waku/v2/node/storage/message/message_retention_policy_time.nim index 849599b27..1ef677ba6 100644 --- a/waku/v2/node/storage/message/message_retention_policy_time.nim +++ b/waku/v2/node/storage/message/message_retention_policy_time.nim @@ -14,6 +14,9 @@ logScope: topics = "message_store.sqlite_store.retention_policy.time" +const StoreDefaultRetentionTime*: int64 = 30.days.seconds + + type TimeRetentionPolicy* = ref object of MessageRetentionPolicy retentionTime: chronos.Duration diff --git a/waku/v2/node/storage/message/message_store.nim b/waku/v2/node/storage/message/message_store.nim index 86727c01f..ceb1290d3 100644 --- a/waku/v2/node/storage/message/message_store.nim +++ b/waku/v2/node/storage/message/message_store.nim @@ -13,11 +13,8 @@ import ../../../utils/pagination -const - StoreDefaultCapacity* = 25_000 - StoreDefaultRetentionTime* = chronos.days(30).seconds - StoreMaxPageSize* = 100.uint64 - StoreMaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future +# TODO: Remove this constant after moving time variance checks to waku store protocol +const StoreMaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future type @@ -42,7 +39,7 @@ method getMessagesByHistoryQuery*( cursor = none(Index), startTime = none(Timestamp), endTime = none(Timestamp), - maxPageSize = StoreMaxPageSize, + maxPageSize = MaxPageSize, ascendingOrder = true ): MessageStoreResult[MessageStorePage] {.base.} = discard diff --git a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim index 8a164934d..d2355c845 100644 --- a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim +++ b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim @@ -96,11 +96,11 @@ method getMessagesByHistoryQuery*( cursor = none(Index), startTime = none(Timestamp), endTime = none(Timestamp), - maxPageSize = StoreMaxPageSize, + maxPageSize = MaxPageSize, ascendingOrder = true ): MessageStoreResult[MessageStorePage] = - let pageSizeLimit = if maxPageSize <= 0: StoreMaxPageSize - else: min(maxPageSize, StoreMaxPageSize) + let pageSizeLimit = if maxPageSize <= 0: MaxPageSize + else: min(maxPageSize, MaxPageSize) let rows = ?s.db.selectMessagesByHistoryQueryWithLimit( contentTopic, diff --git a/waku/v2/node/storage/message/waku_store_queue.nim b/waku/v2/node/storage/message/waku_store_queue.nim index ca2452717..7359df369 100644 --- a/waku/v2/node/storage/message/waku_store_queue.nim +++ b/waku/v2/node/storage/message/waku_store_queue.nim @@ -17,6 +17,9 @@ logScope: topics = "message_store.storequeue" +const StoreQueueDefaultMaxCapacity* = 25_000 + + type IndexedWakuMessage* = object # TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message @@ -239,7 +242,7 @@ proc bwdPage(storeQueue: StoreQueueRef, #### API -proc new*(T: type StoreQueueRef, capacity: int = StoreDefaultCapacity): T = +proc new*(T: type StoreQueueRef, capacity: int = StoreQueueDefaultMaxCapacity): T = var items = SortedSet[Index, IndexedWakuMessage].init() return StoreQueueRef(items: items, capacity: capacity) @@ -356,8 +359,8 @@ proc getPage*(storeQueue: StoreQueueRef, let cursorOpt = if pagingInfo.cursor == Index(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not! else: some(pagingInfo.cursor) - maxPageSize = if pagingInfo.pageSize <= 0: StoreMaxPageSize - else: min(pagingInfo.pageSize, StoreMaxPageSize) + maxPageSize = if pagingInfo.pageSize <= 0: MaxPageSize + else: min(pagingInfo.pageSize, MaxPageSize) case pagingInfo.direction of PagingDirection.FORWARD: @@ -383,7 +386,7 @@ method getMessagesByHistoryQuery*( cursor = none(Index), startTime = none(Timestamp), endTime = none(Timestamp), - maxPageSize = StoreMaxPageSize, + maxPageSize = MaxPageSize, ascendingOrder = true ): MessageStoreResult[MessageStorePage] = diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index 270456db1..3e35952b5 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -4,7 +4,7 @@ {.push raises: [Defect].} import - std/[tables, times, sequtils, options, math], + std/[tables, times, sequtils, options], stew/results, chronicles, chronos, @@ -38,21 +38,13 @@ declarePublicHistogram waku_store_query_duration_seconds, "history query duratio logScope: topics = "wakustore" + const WakuStoreCodec* = "/vac/waku/store/2.0.0-beta4" DefaultTopic* = "/waku/2/default-waku/proto" - # Constants required for pagination ------------------------------------------- - MaxPageSize* = StoreMaxPageSize - - # TODO the DefaultPageSize can be changed, it's current value is random - DefaultPageSize* = uint64(20) # A recommended default number of waku messages per page - - MaxTimeVariance* = StoreMaxTimeVariance - - -const MaxRpcSize = StoreMaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead + MaxMessageTimestampVariance* = Timestamp(20.seconds.nanoseconds) # 20 seconds maximum allowable sender timestamp "drift" # Error types (metric label values) @@ -208,7 +200,7 @@ proc init*(T: type WakuStore, rng: ref rand.HmacDrbgContext, wakuSwap: WakuSwap = nil, retentionPolicy=none(MessageRetentionPolicy)): T = - let store = StoreQueueRef.new(StoreDefaultCapacity) + let store = StoreQueueRef.new() WakuStore.init(peerManager, rng, store, wakuSwap, retentionPolicy) diff --git a/waku/v2/protocol/waku_store/rpc_codec.nim b/waku/v2/protocol/waku_store/rpc_codec.nim index fa7dda965..857f48349 100644 --- a/waku/v2/protocol/waku_store/rpc_codec.nim +++ b/waku/v2/protocol/waku_store/rpc_codec.nim @@ -12,6 +12,9 @@ import ./rpc +const MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead + + proc encode*(index: Index): ProtoBuffer = ## Encode an Index object into a ProtoBuffer ## returns the resultant ProtoBuffer diff --git a/waku/v2/utils/pagination.nim b/waku/v2/utils/pagination.nim index 83b04e4fa..914bc147e 100644 --- a/waku/v2/utils/pagination.nim +++ b/waku/v2/utils/pagination.nim @@ -9,6 +9,12 @@ import ./time +const + MaxPageSize*: uint64 = 100 + + DefaultPageSize*: uint64 = 20 # A recommended default number of waku messages per page + + type Index* = object ## This type contains the description of an Index used in the pagination of WakuMessages pubsubTopic*: string