chore(store): move constants to retention policies modules

This commit is contained in:
Lorenzo Delgado 2022-09-22 11:17:38 +02:00 committed by GitHub
parent 5e3a75c56e
commit aebda46a53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 35 additions and 28 deletions

View File

@ -6,6 +6,7 @@ import
testutils/unittests, testutils/unittests,
nimcrypto/hash nimcrypto/hash
import import
../../waku/v2/node/storage/message/message_store,
../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_store,
@ -93,7 +94,7 @@ procSuite "Sorted store queue":
let store = StoreQueueRef.new(capacity) let store = StoreQueueRef.new(capacity)
let let
receiverTime = getNanoSecondTime(10) receiverTime = getNanoSecondTime(10)
senderTimeOk = receiverTime + MaxTimeVariance senderTimeOk = receiverTime + StoreMaxTimeVariance
senderTimeErr = senderTimeOk + 1 senderTimeErr = senderTimeOk + 1
let invalidMessage = IndexedWakuMessage( let invalidMessage = IndexedWakuMessage(

View File

@ -22,7 +22,7 @@ type DualMessageStore* = ref object of MessageStore
persistent: SqliteStore persistent: SqliteStore
proc init*(T: type DualMessageStore, db: SqliteDatabase, capacity=StoreDefaultCapacity): MessageStoreResult[T] = proc init*(T: type DualMessageStore, db: SqliteDatabase, capacity: int): MessageStoreResult[T] =
let let
inmemory = StoreQueueRef.new(capacity) inmemory = StoreQueueRef.new(capacity)
persistent = ?SqliteStore.init(db) persistent = ?SqliteStore.init(db)
@ -60,7 +60,7 @@ method getMessagesByHistoryQuery*(
cursor = none(Index), cursor = none(Index),
startTime = none(Timestamp), startTime = none(Timestamp),
endTime = none(Timestamp), endTime = none(Timestamp),
maxPageSize = StoreMaxPageSize, maxPageSize = MaxPageSize,
ascendingOrder = true ascendingOrder = true
): MessageStoreResult[MessageStorePage] = ): MessageStoreResult[MessageStorePage] =
s.inmemory.getMessagesByHistoryQuery(contentTopic, pubsubTopic, cursor, startTime, endTime, maxPageSize, ascendingOrder) s.inmemory.getMessagesByHistoryQuery(contentTopic, pubsubTopic, cursor, startTime, endTime, maxPageSize, ascendingOrder)

View File

@ -11,6 +11,8 @@ logScope:
topics = "message_store.sqlite_store.retention_policy.capacity" topics = "message_store.sqlite_store.retention_policy.capacity"
const StoreDefaultCapacity*: int = 25_000
const StoreMaxOverflow = 1.3 const StoreMaxOverflow = 1.3
type type

View File

@ -14,6 +14,9 @@ logScope:
topics = "message_store.sqlite_store.retention_policy.time" topics = "message_store.sqlite_store.retention_policy.time"
const StoreDefaultRetentionTime*: int64 = 30.days.seconds
type TimeRetentionPolicy* = ref object of MessageRetentionPolicy type TimeRetentionPolicy* = ref object of MessageRetentionPolicy
retentionTime: chronos.Duration retentionTime: chronos.Duration

View File

@ -13,11 +13,8 @@ import
../../../utils/pagination ../../../utils/pagination
const # TODO: Remove this constant after moving time variance checks to waku store protocol
StoreDefaultCapacity* = 25_000 const StoreMaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future
StoreDefaultRetentionTime* = chronos.days(30).seconds
StoreMaxPageSize* = 100.uint64
StoreMaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future
type type
@ -42,7 +39,7 @@ method getMessagesByHistoryQuery*(
cursor = none(Index), cursor = none(Index),
startTime = none(Timestamp), startTime = none(Timestamp),
endTime = none(Timestamp), endTime = none(Timestamp),
maxPageSize = StoreMaxPageSize, maxPageSize = MaxPageSize,
ascendingOrder = true ascendingOrder = true
): MessageStoreResult[MessageStorePage] {.base.} = discard ): MessageStoreResult[MessageStorePage] {.base.} = discard

View File

@ -96,11 +96,11 @@ method getMessagesByHistoryQuery*(
cursor = none(Index), cursor = none(Index),
startTime = none(Timestamp), startTime = none(Timestamp),
endTime = none(Timestamp), endTime = none(Timestamp),
maxPageSize = StoreMaxPageSize, maxPageSize = MaxPageSize,
ascendingOrder = true ascendingOrder = true
): MessageStoreResult[MessageStorePage] = ): MessageStoreResult[MessageStorePage] =
let pageSizeLimit = if maxPageSize <= 0: StoreMaxPageSize let pageSizeLimit = if maxPageSize <= 0: MaxPageSize
else: min(maxPageSize, StoreMaxPageSize) else: min(maxPageSize, MaxPageSize)
let rows = ?s.db.selectMessagesByHistoryQueryWithLimit( let rows = ?s.db.selectMessagesByHistoryQueryWithLimit(
contentTopic, contentTopic,

View File

@ -17,6 +17,9 @@ logScope:
topics = "message_store.storequeue" topics = "message_store.storequeue"
const StoreQueueDefaultMaxCapacity* = 25_000
type type
IndexedWakuMessage* = object IndexedWakuMessage* = object
# TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message # TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message
@ -239,7 +242,7 @@ proc bwdPage(storeQueue: StoreQueueRef,
#### API #### API
proc new*(T: type StoreQueueRef, capacity: int = StoreDefaultCapacity): T = proc new*(T: type StoreQueueRef, capacity: int = StoreQueueDefaultMaxCapacity): T =
var items = SortedSet[Index, IndexedWakuMessage].init() var items = SortedSet[Index, IndexedWakuMessage].init()
return StoreQueueRef(items: items, capacity: capacity) return StoreQueueRef(items: items, capacity: capacity)
@ -356,8 +359,8 @@ proc getPage*(storeQueue: StoreQueueRef,
let let
cursorOpt = if pagingInfo.cursor == Index(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not! cursorOpt = if pagingInfo.cursor == Index(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not!
else: some(pagingInfo.cursor) else: some(pagingInfo.cursor)
maxPageSize = if pagingInfo.pageSize <= 0: StoreMaxPageSize maxPageSize = if pagingInfo.pageSize <= 0: MaxPageSize
else: min(pagingInfo.pageSize, StoreMaxPageSize) else: min(pagingInfo.pageSize, MaxPageSize)
case pagingInfo.direction case pagingInfo.direction
of PagingDirection.FORWARD: of PagingDirection.FORWARD:
@ -383,7 +386,7 @@ method getMessagesByHistoryQuery*(
cursor = none(Index), cursor = none(Index),
startTime = none(Timestamp), startTime = none(Timestamp),
endTime = none(Timestamp), endTime = none(Timestamp),
maxPageSize = StoreMaxPageSize, maxPageSize = MaxPageSize,
ascendingOrder = true ascendingOrder = true
): MessageStoreResult[MessageStorePage] = ): MessageStoreResult[MessageStorePage] =

View File

@ -4,7 +4,7 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import import
std/[tables, times, sequtils, options, math], std/[tables, times, sequtils, options],
stew/results, stew/results,
chronicles, chronicles,
chronos, chronos,
@ -38,21 +38,13 @@ declarePublicHistogram waku_store_query_duration_seconds, "history query duratio
logScope: logScope:
topics = "wakustore" topics = "wakustore"
const const
WakuStoreCodec* = "/vac/waku/store/2.0.0-beta4" WakuStoreCodec* = "/vac/waku/store/2.0.0-beta4"
DefaultTopic* = "/waku/2/default-waku/proto" DefaultTopic* = "/waku/2/default-waku/proto"
# Constants required for pagination ------------------------------------------- MaxMessageTimestampVariance* = Timestamp(20.seconds.nanoseconds) # 20 seconds maximum allowable sender timestamp "drift"
MaxPageSize* = StoreMaxPageSize
# TODO the DefaultPageSize can be changed, it's current value is random
DefaultPageSize* = uint64(20) # A recommended default number of waku messages per page
MaxTimeVariance* = StoreMaxTimeVariance
const MaxRpcSize = StoreMaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
# Error types (metric label values) # Error types (metric label values)
@ -208,7 +200,7 @@ proc init*(T: type WakuStore,
rng: ref rand.HmacDrbgContext, rng: ref rand.HmacDrbgContext,
wakuSwap: WakuSwap = nil, wakuSwap: WakuSwap = nil,
retentionPolicy=none(MessageRetentionPolicy)): T = retentionPolicy=none(MessageRetentionPolicy)): T =
let store = StoreQueueRef.new(StoreDefaultCapacity) let store = StoreQueueRef.new()
WakuStore.init(peerManager, rng, store, wakuSwap, retentionPolicy) WakuStore.init(peerManager, rng, store, wakuSwap, retentionPolicy)

View File

@ -12,6 +12,9 @@ import
./rpc ./rpc
const MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
proc encode*(index: Index): ProtoBuffer = proc encode*(index: Index): ProtoBuffer =
## Encode an Index object into a ProtoBuffer ## Encode an Index object into a ProtoBuffer
## returns the resultant ProtoBuffer ## returns the resultant ProtoBuffer

View File

@ -9,6 +9,12 @@ import
./time ./time
const
MaxPageSize*: uint64 = 100
DefaultPageSize*: uint64 = 20 # A recommended default number of waku messages per page
type Index* = object type Index* = object
## This type contains the description of an Index used in the pagination of WakuMessages ## This type contains the description of an Index used in the pagination of WakuMessages
pubsubTopic*: string pubsubTopic*: string