mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-19 02:16:33 +00:00
deploy: d0cf3ed1f9299618bbab920c66ee06372f24c1fd
This commit is contained in:
parent
3d81c61058
commit
61a8e0f234
@ -9,6 +9,7 @@
|
||||
|
||||
- Waku v1 <> v2 bridge now supports DNS `multiaddrs`
|
||||
- Waku v1 <> v2 bridge now validates content topics before attempting to bridge a message from Waku v2 to Waku v1
|
||||
- Message store now auto deletes messages once over specified `--store-capacity`. This can significantly improve node start-up times.
|
||||
|
||||
### Fixes
|
||||
|
||||
|
@ -3,12 +3,15 @@
|
||||
import
|
||||
std/[unittest, options, tables, sets, times, os, strutils],
|
||||
chronos,
|
||||
sqlite3_abi,
|
||||
stew/byteutils,
|
||||
../../waku/v2/node/storage/message/waku_message_store,
|
||||
../../waku/v2/node/storage/sqlite,
|
||||
../../waku/v2/protocol/waku_store/waku_store,
|
||||
../../waku/v2/utils/time,
|
||||
./utils
|
||||
|
||||
|
||||
suite "Message Store":
|
||||
test "set and get works":
|
||||
let
|
||||
@ -125,57 +128,67 @@ suite "Message Store":
|
||||
ver.isErr == false
|
||||
ver.value == 10
|
||||
|
||||
test "get works with limit":
|
||||
let
|
||||
database = SqliteDatabase.init("", inMemory = true)[]
|
||||
store = WakuMessageStore.init(database)[]
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
pubsubTopic = "/waku/2/default-waku/proto"
|
||||
capacity = 10
|
||||
|
||||
defer: store.close()
|
||||
|
||||
for i in 1..capacity:
|
||||
test "number of messages retrieved by getAll is bounded by storeCapacity":
|
||||
let
|
||||
msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
|
||||
index = computeIndex(msg)
|
||||
output = store.put(index, msg, pubsubTopic)
|
||||
|
||||
waitFor sleepAsync(1.millis) # Ensure stored messages have increasing receiver timestamp
|
||||
check output.isOk
|
||||
database = SqliteDatabase.init("", inMemory = true)[]
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
pubsubTopic = "/waku/2/default-waku/proto"
|
||||
capacity = 10
|
||||
store = WakuMessageStore.init(database, capacity)[]
|
||||
|
||||
var
|
||||
responseCount = 0
|
||||
lastMessageTimestamp = Timestamp(0)
|
||||
defer: store.close()
|
||||
|
||||
proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
|
||||
responseCount += 1
|
||||
lastMessageTimestamp = msg.timestamp
|
||||
for i in 1..capacity:
|
||||
let
|
||||
msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
|
||||
index = computeIndex(msg)
|
||||
output = store.put(index, msg, pubsubTopic)
|
||||
check output.isOk
|
||||
|
||||
# Test limited getAll function when store is at capacity
|
||||
let resMax = store.getAll(data, some(capacity))
|
||||
|
||||
check:
|
||||
resMax.isOk
|
||||
responseCount == capacity # We retrieved all items
|
||||
lastMessageTimestamp == Timestamp(capacity) # Returned rows were ordered correctly
|
||||
var
|
||||
responseCount = 0
|
||||
lastMessageTimestamp = Timestamp(0)
|
||||
|
||||
# Now test getAll with a limit smaller than total stored items
|
||||
responseCount = 0 # Reset response count
|
||||
lastMessageTimestamp = 0
|
||||
let resLimit = store.getAll(data, some(capacity - 2))
|
||||
proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
|
||||
responseCount += 1
|
||||
lastMessageTimestamp = msg.timestamp
|
||||
|
||||
check:
|
||||
resLimit.isOk
|
||||
responseCount == capacity - 2 # We retrieved limited number of items
|
||||
lastMessageTimestamp == Timestamp(capacity) # We retrieved the youngest items in the store, in order
|
||||
|
||||
# Test zero limit
|
||||
responseCount = 0 # Reset response count
|
||||
lastMessageTimestamp = 0
|
||||
let resZero = store.getAll(data, some(0))
|
||||
# Test limited getAll function when store is at capacity
|
||||
let resMax = store.getAll(data)
|
||||
|
||||
check:
|
||||
resMax.isOk
|
||||
responseCount == capacity # We retrieved all items
|
||||
lastMessageTimestamp == Timestamp(capacity) # Returned rows were ordered correctly # TODO: not representative because the timestamp only has second resolution
|
||||
|
||||
test "DB store capacity":
|
||||
let
|
||||
database = SqliteDatabase.init("", inMemory = true)[]
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
pubsubTopic = "/waku/2/default-waku/proto"
|
||||
capacity = 100
|
||||
overload = 65
|
||||
store = WakuMessageStore.init(database, capacity)[]
|
||||
|
||||
defer: store.close()
|
||||
|
||||
for i in 1..capacity+overload:
|
||||
let
|
||||
msg = WakuMessage(payload: ($i).toBytes(), contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
|
||||
index = computeIndex(msg)
|
||||
output = store.put(index, msg, pubsubTopic)
|
||||
check output.isOk
|
||||
|
||||
# count messages in DB
|
||||
var numMessages: int64
|
||||
proc handler(s: ptr sqlite3_stmt) =
|
||||
numMessages = sqlite3_column_int64(s, 0)
|
||||
let countQuery = "SELECT COUNT(*) FROM message" # the table name is set in a const in waku_message_store
|
||||
discard store.database.query(countQuery, handler)
|
||||
|
||||
check:
|
||||
# expected number of messages is 120 because
|
||||
# (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete)
|
||||
# the window size changes when changing `const maxStoreOverflow = 1.3 in waku_message_store
|
||||
numMessages == 120
|
||||
|
||||
check:
|
||||
resZero.isOk
|
||||
responseCount == 0 # No items retrieved
|
||||
lastMessageTimestamp == Timestamp(0) # No items retrieved
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
# libtool - Provide generalized library-building support services.
|
||||
# Generated automatically by config.status (libbacktrace) version-unused
|
||||
# Libtool was configured on host fv-az132-586:
|
||||
# Libtool was configured on host fv-az449-957:
|
||||
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
||||
#
|
||||
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
||||
|
@ -20,5 +20,5 @@ type
|
||||
|
||||
# MessageStore interface
|
||||
method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard
|
||||
method getAll*(db: MessageStore, onData: DataProc, limit = none(int)): MessageStoreResult[bool] {.base.} = discard
|
||||
method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard
|
||||
|
||||
|
@ -4,6 +4,7 @@ import
|
||||
std/[options, tables],
|
||||
sqlite3_abi,
|
||||
stew/[byteutils, results],
|
||||
chronicles,
|
||||
./message_store,
|
||||
../sqlite,
|
||||
../../../protocol/waku_message,
|
||||
@ -12,18 +13,59 @@ import
|
||||
|
||||
export sqlite
|
||||
|
||||
logScope:
|
||||
topics = "wakuMessageStore"
|
||||
|
||||
const TABLE_TITLE = "Message"
|
||||
const MaxStoreOverflow = 1.3 # has to be > 1.0
|
||||
|
||||
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
|
||||
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
|
||||
#
|
||||
# Most of it is a direct copy, the only unique functions being `get` and `put`.
|
||||
|
||||
type
|
||||
# WakuMessageStore implements auto deletion as follows:
|
||||
# The sqlite DB will store up to `storeMaxLoad = storeCapacity` * `MaxStoreOverflow` messages, giving an overflow window of (storeCapacity*MaxStoreOverflow - storeCapacity).
|
||||
# In case of an overflow, messages are sorted by `receiverTimestamp` and the oldest ones are deleted. The number of messages that get deleted is (overflow window / 2) = deleteWindow,
|
||||
# bringing the total number of stored messages back to `storeCapacity + (overflow window / 2)`. The rationale for batch deleting is efficiency.
|
||||
# We keep half of the overflow window in addition to `storeCapacity` because we delete the oldest messages with respect to `receiverTimestamp` instead of `senderTimestamp`.
|
||||
# `ReceiverTimestamp` is guaranteed to be set, while senders could omit setting `senderTimestamp`.
|
||||
# However, `receiverTimestamp` can differ from node to node for the same message.
|
||||
# So sorting by `receiverTimestamp` might (slightly) prioritize some actually older messages and we compensate that by keeping half of the overflow window.
|
||||
WakuMessageStore* = ref object of MessageStore
|
||||
database*: SqliteDatabase
|
||||
numMessages: int
|
||||
storeCapacity: int # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`.
|
||||
storeMaxLoad: int # = storeCapacity * MaxStoreOverflow
|
||||
deleteWindow: int # = (storeCapacity * MaxStoreOverflow - storeCapacity)/2; half of the overflow window, the amount of messages deleted when overflow occurs
|
||||
|
||||
proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T] =
|
||||
proc messageCount(db: SqliteDatabase): MessageStoreResult[int64] =
|
||||
var numMessages: int64
|
||||
proc handler(s: ptr sqlite3_stmt) =
|
||||
numMessages = sqlite3_column_int64(s, 0)
|
||||
let countQuery = "SELECT COUNT(*) FROM " & TABLE_TITLE
|
||||
let countRes = db.query(countQuery, handler)
|
||||
if countRes.isErr:
|
||||
return err("failed to count number of messages in DB")
|
||||
ok(numMessages)
|
||||
|
||||
proc deleteOldest(db: WakuMessageStore): MessageStoreResult[void] =
|
||||
var deleteQuery = "DELETE FROM " & TABLE_TITLE & " " &
|
||||
"WHERE id NOT IN " &
|
||||
"(SELECT id FROM " & TABLE_TITLE & " " &
|
||||
"ORDER BY receiverTimestamp DESC " &
|
||||
"LIMIT " & $(db.storeCapacity + db.deleteWindow) & ")"
|
||||
let res = db.database.query(deleteQuery, proc(s: ptr sqlite3_stmt) = discard)
|
||||
if res.isErr:
|
||||
return err(res.error)
|
||||
db.numMessages = db.storeCapacity + db.deleteWindow # sqlite3 DELETE does not return the number of deleted rows; Ideally we would subtract the number of actually deleted messages. We could run a separate COUNT.
|
||||
|
||||
when defined(debug):
|
||||
let numMessages = messageCount(db.database).get() # requires another SELECT query, so only run in debug mode
|
||||
debug "Oldest messages deleted from DB due to overflow.", storeCapacity=db.storeCapacity, maxStore=db.storeMaxLoad, deleteWindow=db.deleteWindow, messagesLeft=numMessages
|
||||
|
||||
ok()
|
||||
|
||||
proc init*(T: type WakuMessageStore, db: SqliteDatabase, storeCapacity: int = 50000): MessageStoreResult[T] =
|
||||
## Table is the SQL query for creating the messages Table.
|
||||
## It contains:
|
||||
## - 4-Byte ContentTopic stored as an Integer
|
||||
@ -45,11 +87,28 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T]
|
||||
if prepare.isErr:
|
||||
return err("failed to prepare")
|
||||
|
||||
let res = prepare.value.exec(())
|
||||
if res.isErr:
|
||||
let prepareRes = prepare.value.exec(())
|
||||
if prepareRes.isErr:
|
||||
return err("failed to exec")
|
||||
|
||||
ok(WakuMessageStore(database: db))
|
||||
let numMessages = messageCount(db).get()
|
||||
debug "number of messages in sqlite database", messageNum=numMessages
|
||||
|
||||
let storeMaxLoad = int(float(storeCapacity) * MaxStoreOverflow)
|
||||
let deleteWindow = int(float(storeMaxLoad - storeCapacity) / 2)
|
||||
let wms = WakuMessageStore(database: db,
|
||||
numMessages: int(numMessages),
|
||||
storeCapacity: storeCapacity,
|
||||
storeMaxLoad: storeMaxLoad,
|
||||
deleteWindow: deleteWindow)
|
||||
|
||||
# If the loaded db is already over max load, delete the oldest messages before returning the WakuMessageStore object
|
||||
if wms.numMessages >= wms.storeMaxLoad:
|
||||
let res = wms.deleteOldest()
|
||||
if res.isErr: return err("deleting oldest messages failed")
|
||||
|
||||
ok(wms)
|
||||
|
||||
|
||||
method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
|
||||
## Adds a message to the storage.
|
||||
@ -74,11 +133,17 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop
|
||||
if res.isErr:
|
||||
return err("failed")
|
||||
|
||||
db.numMessages += 1
|
||||
if db.numMessages >= db.storeMaxLoad:
|
||||
let res = db.deleteOldest()
|
||||
if res.isErr: return err("deleting oldest failed")
|
||||
|
||||
ok()
|
||||
|
||||
method getAll*(db: WakuMessageStore, onData: message_store.DataProc, limit = none(int)): MessageStoreResult[bool] =
|
||||
## Retrieves all messages from the storage.
|
||||
## Optionally limits the number of rows returned.
|
||||
|
||||
|
||||
method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] =
|
||||
## Retrieves `storeCapacity` many messages from the storage.
|
||||
##
|
||||
## **Example:**
|
||||
##
|
||||
@ -120,10 +185,10 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc, limit = non
|
||||
var selectQuery = "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " &
|
||||
"FROM " & TABLE_TITLE & " " &
|
||||
"ORDER BY receiverTimestamp ASC"
|
||||
if limit.isSome():
|
||||
# Optional limit applies. This works because SQLITE will perform the time-based ORDER BY before applying the limit.
|
||||
selectQuery &= " LIMIT " & $(limit.get()) &
|
||||
" OFFSET cast((SELECT count(*) FROM " & TABLE_TITLE & ") AS INT) - " & $(limit.get()) # offset = total_row_count - limit
|
||||
|
||||
# Apply limit. This works because SQLITE will perform the time-based ORDER BY before applying the limit.
|
||||
selectQuery &= " LIMIT " & $db.storeCapacity &
|
||||
" OFFSET cast((SELECT count(*) FROM " & TABLE_TITLE & ") AS INT) - " & $db.storeCapacity # offset = total_row_count - limit
|
||||
|
||||
let res = db.database.query(selectQuery, msg)
|
||||
if res.isErr:
|
||||
|
@ -1018,7 +1018,7 @@ when isMainModule:
|
||||
|
||||
if conf.persistMessages:
|
||||
# Historical message persistence enable. Set up Message table in storage
|
||||
let res = WakuMessageStore.init(sqliteDatabase)
|
||||
let res = WakuMessageStore.init(sqliteDatabase, conf.storeCapacity)
|
||||
|
||||
if res.isErr:
|
||||
warn "failed to init WakuMessageStore", err = res.error
|
||||
|
@ -369,7 +369,7 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
|
||||
|
||||
info "attempting to load messages from persistent storage"
|
||||
|
||||
let res = ws.store.getAll(onData, some(capacity))
|
||||
let res = ws.store.getAll(onData)
|
||||
if res.isErr:
|
||||
warn "failed to load messages from store", err = res.error
|
||||
waku_store_errors.inc(labelValues = ["store_load_failure"])
|
||||
|
@ -110,8 +110,8 @@ type
|
||||
WakuStore* = ref object of LPProtocol
|
||||
peerManager*: PeerManager
|
||||
rng*: ref BrHmacDrbgContext
|
||||
messages*: StoreQueueRef
|
||||
store*: MessageStore
|
||||
messages*: StoreQueueRef # in-memory message store
|
||||
store*: MessageStore # sqlite DB handle
|
||||
wakuSwap*: WakuSwap
|
||||
persistMessages*: bool
|
||||
|
||||
@ -439,4 +439,4 @@ proc len*(storeQueue: StoreQueueRef): int {.noSideEffect.} =
|
||||
storeQueue.items.len
|
||||
|
||||
proc `$`*(storeQueue: StoreQueueRef): string =
|
||||
$(storeQueue.items)
|
||||
$(storeQueue.items)
|
||||
|
Loading…
x
Reference in New Issue
Block a user