deploy: ee8ff014f457e7a7b48ccd01747695676bc2c5e1

This commit is contained in:
jm-clius 2021-11-03 11:27:13 +00:00
parent 794e38428e
commit 1be553ad7e
10 changed files with 182 additions and 35 deletions

View File

@ -123,3 +123,58 @@ suite "Message Store":
check: check:
ver.isErr == false ver.isErr == false
ver.value == 10 ver.value == 10
test "get works with limit":
let
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[]
contentTopic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"
capacity = 10
defer: store.close()
for i in 1..capacity:
let
msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: i.float)
index = computeIndex(msg)
output = store.put(index, msg, pubsubTopic)
waitFor sleepAsync(1.millis) # Ensure stored messages have increasing receiver timestamp
check output.isOk
var
responseCount = 0
lastMessageTimestamp = 0.float
proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
responseCount += 1
lastMessageTimestamp = msg.timestamp
# Test limited getAll function when store is at capacity
let resMax = store.getAll(data, some(capacity))
check:
resMax.isOk
responseCount == capacity # We retrieved all items
lastMessageTimestamp == capacity.float # Returned rows were ordered correctly
# Now test getAll with a limit smaller than total stored items
responseCount = 0 # Reset response count
lastMessageTimestamp = 0
let resLimit = store.getAll(data, some(capacity - 2))
check:
resLimit.isOk
responseCount == capacity - 2 # We retrieved limited number of items
lastMessageTimestamp == capacity.float # We retrieved the youngest items in the store, in order
# Test zero limit
responseCount = 0 # Reset response count
lastMessageTimestamp = 0
let resZero = store.getAll(data, some(0))
check:
resZero.isOk
responseCount == 0 # No items retrieved
lastMessageTimestamp == 0.float # No items retrieved

View File

@ -91,23 +91,23 @@ procSuite "Waku Discovery v5":
node3.wakuDiscv5.protocol.nodesDiscovered > 0 node3.wakuDiscv5.protocol.nodesDiscovered > 0
# Let's see if we can deliver a message end-to-end # Let's see if we can deliver a message end-to-end
var completionFut = newFuture[bool]() # var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = # proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data) # let msg = WakuMessage.init(data)
if msg.isOk(): # if msg.isOk():
let val = msg.value() # let val = msg.value()
check: # check:
topic == pubSubTopic # topic == pubSubTopic
val.contentTopic == contentTopic # val.contentTopic == contentTopic
val.payload == payload # val.payload == payload
completionFut.complete(true) # completionFut.complete(true)
node3.subscribe(pubSubTopic, relayHandler) # node3.subscribe(pubSubTopic, relayHandler)
await sleepAsync(2000.millis) # await sleepAsync(2000.millis)
await node1.publish(pubSubTopic, message) # await node1.publish(pubSubTopic, message)
check: # check:
(await completionFut.withTimeout(6.seconds)) == true # (await completionFut.withTimeout(6.seconds)) == true
await allFutures([node1.stop(), node2.stop(), node3.stop()]) await allFutures([node1.stop(), node2.stop(), node3.stop()])

View File

@ -690,4 +690,26 @@ procSuite "Waku Store":
check: check:
proto3.messages.len == 10 proto3.messages.len == 10
successResult.isOk successResult.isOk
successResult.value == 10 successResult.value == 10
asyncTest "limit store capacity":
let
capacity = 10
contentTopic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"
let store = WakuStore.init(PeerManager.new(newStandardSwitch()), crypto.newRng(), capacity = capacity)
for i in 1..capacity:
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte i], contentTopic: contentTopic))
await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically
check:
store.messages.len == capacity # Store is at capacity
# Test that capacity holds
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte (capacity + 1)], contentTopic: contentTopic))
check:
store.messages.len == capacity # Store is still at capacity
store.messages.filterIt(it.msg.payload == @[byte (capacity + 1)]).len == 1 # Simple check to verify last added item is stored

View File

@ -2,7 +2,7 @@
# libtool - Provide generalized library-building support services. # libtool - Provide generalized library-building support services.
# Generated automatically by config.status (libbacktrace) version-unused # Generated automatically by config.status (libbacktrace) version-unused
# Libtool was configured on host fv-az173-737: # Libtool was configured on host fv-az196-96:
# NOTE: Changes made to this file will be lost: look at ltmain.sh. # NOTE: Changes made to this file will be lost: look at ltmain.sh.
# #
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,

View File

@ -112,6 +112,11 @@ type
defaultValue: "" defaultValue: ""
name: "storenode" }: string name: "storenode" }: string
storeCapacity* {.
desc: "Maximum number of messages to keep in store.",
defaultValue: 50000
name: "store-capacity" }: int
## Filter config ## Filter config
filter* {. filter* {.

View File

@ -1,6 +1,7 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import import
std/options,
stew/results, stew/results,
../../../protocol/waku_message, ../../../protocol/waku_message,
../../../utils/pagination ../../../utils/pagination
@ -18,5 +19,5 @@ type
# MessageStore interface # MessageStore interface
method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard
method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard method getAll*(db: MessageStore, onData: DataProc, limit = none(int)): MessageStoreResult[bool] {.base.} = discard

View File

@ -1,7 +1,7 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import import
std/tables, std/[options, tables],
sqlite3_abi, sqlite3_abi,
stew/[byteutils, results], stew/[byteutils, results],
./message_store, ./message_store,
@ -74,8 +74,9 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop
ok() ok()
method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] = method getAll*(db: WakuMessageStore, onData: message_store.DataProc, limit = none(int)): MessageStoreResult[bool] =
## Retrieves all messages from the storage. ## Retrieves all messages from the storage.
## Optionally limits the number of rows returned.
## ##
## **Example:** ## **Example:**
## ##
@ -114,7 +115,15 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto
WakuMessage(contentTopic: contentTopic, payload: payload , version: uint32(version), timestamp: senderTimestamp.float64), WakuMessage(contentTopic: contentTopic, payload: payload , version: uint32(version), timestamp: senderTimestamp.float64),
pubsubTopic) pubsubTopic)
let res = db.database.query("SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp FROM " & TABLE_TITLE & " ORDER BY receiverTimestamp ASC", msg) var selectQuery = "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " &
"FROM " & TABLE_TITLE & " " &
"ORDER BY receiverTimestamp ASC"
if limit.isSome():
# Optional limit applies. This works because SQLITE will perform the time-based ORDER BY before applying the limit.
selectQuery &= " LIMIT " & $(limit.get()) &
" OFFSET cast((SELECT count(*) FROM " & TABLE_TITLE & ") AS INT) - " & $(limit.get()) # offset = total_row_count - limit
let res = db.database.query(selectQuery, msg)
if res.isErr: if res.isErr:
return err("failed") return err("failed")

View File

@ -402,15 +402,15 @@ proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.ra
# NYI - Do we need this? # NYI - Do we need this?
#node.subscriptions.subscribe(WakuSwapCodec, node.wakuSwap.subscription()) #node.subscriptions.subscribe(WakuSwapCodec, node.wakuSwap.subscription())
proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false) {.raises: [Defect, LPError].} = proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false, capacity = DefaultStoreCapacity) {.raises: [Defect, LPError].} =
info "mounting store" info "mounting store"
if node.wakuSwap.isNil: if node.wakuSwap.isNil:
debug "mounting store without swap" debug "mounting store without swap"
node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, persistMessages=persistMessages) node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, persistMessages=persistMessages, capacity=capacity)
else: else:
debug "mounting store with swap" debug "mounting store with swap"
node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, node.wakuSwap, persistMessages=persistMessages) node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, node.wakuSwap, persistMessages=persistMessages, capacity=capacity)
node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec)) node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec))
@ -998,7 +998,7 @@ when isMainModule:
# Store setup # Store setup
if (conf.storenode != "") or (conf.store): if (conf.storenode != "") or (conf.store):
mountStore(node, mStorage, conf.persistMessages) mountStore(node, mStorage, conf.persistMessages, conf.storeCapacity)
if conf.storenode != "": if conf.storenode != "":
setStorePeer(node, conf.storenode) setStorePeer(node, conf.storenode)

View File

@ -44,6 +44,7 @@ logScope:
const const
WakuStoreCodec* = "/vac/waku/store/2.0.0-beta3" WakuStoreCodec* = "/vac/waku/store/2.0.0-beta3"
DefaultStoreCapacity* = 50000 # Default maximum of 50k messages stored
# Error types (metric label values) # Error types (metric label values)
const const
@ -373,7 +374,7 @@ proc paginate*(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[IndexedWa
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse = proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
var data : seq[IndexedWakuMessage] = w.messages var data : seq[IndexedWakuMessage] = w.messages.allItems()
# filter based on content filters # filter based on content filters
# an empty list of contentFilters means no content filter is requested # an empty list of contentFilters means no content filter is requested
@ -409,7 +410,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
return historyRes return historyRes
proc init*(ws: WakuStore) = proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
proc handler(conn: Connection, proto: string) {.async.} = proc handler(conn: Connection, proto: string) {.async.} =
var message = await conn.readLp(64*1024) var message = await conn.readLp(64*1024)
var res = HistoryRPC.init(message) var res = HistoryRPC.init(message)
@ -442,6 +444,7 @@ proc init*(ws: WakuStore) =
ws.handler = handler ws.handler = handler
ws.codec = WakuStoreCodec ws.codec = WakuStoreCodec
ws.messages = initQueue(capacity)
if ws.store.isNil: if ws.store.isNil:
return return
@ -450,20 +453,24 @@ proc init*(ws: WakuStore) =
# TODO index should not be recalculated # TODO index should not be recalculated
ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(), pubsubTopic: pubsubTopic)) ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(), pubsubTopic: pubsubTopic))
let res = ws.store.getAll(onData) info "attempting to load messages from persistent storage"
let res = ws.store.getAll(onData, some(capacity))
if res.isErr: if res.isErr:
warn "failed to load messages from store", err = res.error warn "failed to load messages from store", err = res.error
waku_store_errors.inc(labelValues = ["store_load_failure"]) waku_store_errors.inc(labelValues = ["store_load_failure"])
info "successfully loaded from store"
debug "the number of messages in the memory", messageNum=ws.messages.len debug "the number of messages in the memory", messageNum=ws.messages.len
waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"]) waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"])
proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext, proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext,
store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true): T = store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true,
capacity = DefaultStoreCapacity): T =
debug "init" debug "init"
var output = WakuStore(rng: rng, peerManager: peerManager, store: store, wakuSwap: wakuSwap, persistMessages: persistMessages) var output = WakuStore(rng: rng, peerManager: peerManager, store: store, wakuSwap: wakuSwap, persistMessages: persistMessages)
output.init() output.init(capacity)
return output return output
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY # @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
@ -487,7 +494,7 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
let res = w.store.put(index, msg, topic) let res = w.store.put(index, msg, topic)
if res.isErr: if res.isErr:
warn "failed to store messages", err = res.error trace "failed to store messages", err = res.error
waku_store_errors.inc(labelValues = ["store_failure"]) waku_store_errors.inc(labelValues = ["store_failure"])
proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
@ -623,7 +630,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string ## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
var currentTime = epochTime() var currentTime = epochTime()
var lastSeenTime: float = findLastSeen(ws.messages) var lastSeenTime: float = findLastSeen(ws.messages.allItems())
debug "resume", currentEpochTime=currentTime debug "resume", currentEpochTime=currentTime
# adjust the time window with an offset of 20 seconds # adjust the time window with an offset of 20 seconds
@ -642,7 +649,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
proc save(msgList: seq[WakuMessage]) = proc save(msgList: seq[WakuMessage]) =
debug "save proc is called" debug "save proc is called"
# exclude index from the comparison criteria # exclude index from the comparison criteria
let currentMsgSummary = ws.messages.map(proc(x: IndexedWakuMessage): WakuMessage = x.msg) let currentMsgSummary = ws.messages.mapIt(it.msg)
for msg in msgList: for msg in msgList:
# check for duplicate messages # check for duplicate messages
# TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic # TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic
@ -658,7 +665,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
if not ws.store.isNil: if not ws.store.isNil:
let res = ws.store.put(index, msg, DefaultTopic) let res = ws.store.put(index, msg, DefaultTopic)
if res.isErr: if res.isErr:
warn "failed to store messages", err = res.error trace "failed to store messages", err = res.error
waku_store_errors.inc(labelValues = ["store_failure"]) waku_store_errors.inc(labelValues = ["store_failure"])
continue continue

View File

@ -5,6 +5,7 @@
# Group by std, external then internal imports # Group by std, external then internal imports
import import
# external imports # external imports
std/sequtils,
bearssl, bearssl,
libp2p/protocols/protocol, libp2p/protocols/protocol,
stew/results, stew/results,
@ -81,11 +82,58 @@ type
QueryResult* = Result[uint64, string] QueryResult* = Result[uint64, string]
MessagesResult* = Result[seq[WakuMessage], string] MessagesResult* = Result[seq[WakuMessage], string]
StoreQueue* = object
## Bounded repository for indexed messages
##
## The store queue will keep messages up to its
## configured capacity. As soon as this capacity
## is reached and a new message is added, the oldest
## item will be removed to make space for the new one.
## This implies both a `delete` and `add` operation
## for new items.
##
## @ TODO: a circular/ring buffer may be a more efficient implementation
## @ TODO: consider adding message hashes for easy duplicate checks
items: seq[IndexedWakuMessage] # FIFO queue of stored messages
capacity: int # Maximum amount of messages to keep
WakuStore* = ref object of LPProtocol WakuStore* = ref object of LPProtocol
peerManager*: PeerManager peerManager*: PeerManager
rng*: ref BrHmacDrbgContext rng*: ref BrHmacDrbgContext
messages*: seq[IndexedWakuMessage] messages*: StoreQueue
store*: MessageStore store*: MessageStore
wakuSwap*: WakuSwap wakuSwap*: WakuSwap
persistMessages*: bool persistMessages*: bool
######################
# StoreQueue helpers #
######################
proc initQueue*(capacity: int): StoreQueue =
var storeQueue: StoreQueue
storeQueue.items = newSeqOfCap[IndexedWakuMessage](capacity)
storeQueue.capacity = capacity
return storeQueue
proc add*(storeQueue: var StoreQueue, msg: IndexedWakuMessage) {.noSideEffect.} =
## Add a message to the queue.
## If we're at capacity, we will be removing,
## the oldest item
if storeQueue.items.len >= storeQueue.capacity:
storeQueue.items.delete 0, 0 # Remove first item in queue
storeQueue.items.add(msg)
proc len*(storeQueue: StoreQueue): int {.noSideEffect.} =
storeQueue.items.len
proc allItems*(storeQueue: StoreQueue): seq[IndexedWakuMessage] =
storeQueue.items
template filterIt*(storeQueue: StoreQueue, pred: untyped): untyped =
storeQueue.items.filterIt(pred)
template mapIt*(storeQueue: StoreQueue, op: untyped): untyped =
storeQueue.items.mapIt(op)