mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-15 01:14:56 +00:00
deploy: 4f93510fc9a938954dd85593f8dc4135a1c367de
This commit is contained in:
parent
03a05d6c08
commit
09b4a4f3df
@ -2,7 +2,7 @@
|
||||
|
||||
# libtool - Provide generalized library-building support services.
|
||||
# Generated automatically by config.status (libbacktrace) version-unused
|
||||
# Libtool was configured on host fv-az178-280:
|
||||
# Libtool was configured on host fv-az204-288:
|
||||
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
||||
#
|
||||
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
||||
|
@ -6,19 +6,34 @@
|
||||
import
|
||||
std/options,
|
||||
stew/results,
|
||||
chronos
|
||||
import
|
||||
../../../protocol/waku_message,
|
||||
../../../protocol/waku_store/rpc,
|
||||
../../../utils/time,
|
||||
../../../utils/pagination
|
||||
|
||||
|
||||
type
|
||||
DataProc* = proc(receiverTimestamp: Timestamp, msg: WakuMessage, pubsubTopic: string) {.closure, raises: [Defect].}
|
||||
const
|
||||
StoreDefaultCapacity* = 25_000
|
||||
StoreMaxOverflow* = 1.3
|
||||
StoreDefaultRetentionTime* = chronos.days(30).seconds
|
||||
StoreMaxPageSize* = 100.uint64
|
||||
StoreMaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future
|
||||
|
||||
|
||||
type
|
||||
MessageStoreResult*[T] = Result[T, string]
|
||||
|
||||
MessageStorePage* = (seq[WakuMessage], Option[PagingInfo])
|
||||
|
||||
MessageStoreRow* = (Timestamp, WakuMessage, string)
|
||||
|
||||
MessageStore* = ref object of RootObj
|
||||
|
||||
# TODO: Deprecate the following type
|
||||
type DataProc* = proc(receiverTimestamp: Timestamp, msg: WakuMessage, pubsubTopic: string) {.closure, raises: [Defect].}
|
||||
|
||||
|
||||
# TODO: Remove after resolving nwaku #1026. Move it back to waku_store_queue.nim
|
||||
type
|
||||
@ -33,8 +48,29 @@ type
|
||||
|
||||
|
||||
# MessageStore interface
|
||||
method getMostRecentMessageTimestamp*(db: MessageStore): MessageStoreResult[Timestamp] {.base.} = discard
|
||||
|
||||
method getOldestMessageTimestamp*(db: MessageStore): MessageStoreResult[Timestamp] {.base.} = discard
|
||||
|
||||
method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard
|
||||
|
||||
|
||||
# TODO: Deprecate the following methods after after #1026
|
||||
method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard
|
||||
method getPage*(db: MessageStore, pred: QueryFilterMatcher, pagingInfo: PagingInfo): MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] {.base.} = discard
|
||||
method getPage*(db: MessageStore, pagingInfo: PagingInfo): MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] {.base.} = discard
|
||||
|
||||
|
||||
# TODO: Move to sqlite store
|
||||
method getAllMessages(db: MessageStore): MessageStoreResult[seq[MessageStoreRow]] {.base.} = discard
|
||||
|
||||
method getMessagesByHistoryQuery*(
|
||||
db: MessageStore,
|
||||
contentTopic = none(seq[ContentTopic]),
|
||||
pubsubTopic = none(string),
|
||||
cursor = none(Index),
|
||||
startTime = none(Timestamp),
|
||||
endTime = none(Timestamp),
|
||||
maxPageSize = StoreMaxPageSize,
|
||||
ascendingOrder = true
|
||||
): MessageStoreResult[MessageStorePage] {.base.} = discard
|
||||
|
@ -21,23 +21,10 @@ logScope:
|
||||
topics = "message_store.storequeue"
|
||||
|
||||
|
||||
const
|
||||
MaxPageSize = uint64(100) # Maximum number of waku messages in each page
|
||||
|
||||
MaxTimeVariance = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future
|
||||
|
||||
type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
|
||||
|
||||
type
|
||||
QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
|
||||
|
||||
QueryResult* = Result[uint64, string]
|
||||
|
||||
MessagesResult* = Result[seq[WakuMessage], string]
|
||||
|
||||
type
|
||||
StoreQueueResult*[T] = Result[T, cstring]
|
||||
|
||||
StoreQueueRef* = ref object
|
||||
StoreQueueRef* = ref object of MessageStore
|
||||
## Bounded repository for indexed messages
|
||||
##
|
||||
## The store queue will keep messages up to its
|
||||
@ -199,7 +186,7 @@ proc bwdPage(storeQueue: StoreQueueRef,
|
||||
lastValidCursor = startCursor.get()
|
||||
|
||||
let cursorEntry = w.rwdToCursor(startCursor.get())
|
||||
if cursorEntry.isErr:
|
||||
if cursorEntry.isErr():
|
||||
# Quick exit here if start cursor not found
|
||||
trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor
|
||||
outSeq = @[]
|
||||
@ -222,8 +209,7 @@ proc bwdPage(storeQueue: StoreQueueRef,
|
||||
## 2. adds entries matching the predicate function to output page
|
||||
## 3. until either the beginning of the queue or maxPageSize is reached
|
||||
var numberOfItems = 0.uint
|
||||
while currentEntry.isOk and numberOfItems < maxPageSize:
|
||||
|
||||
while currentEntry.isOk() and numberOfItems < maxPageSize:
|
||||
trace "Continuing bwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems
|
||||
|
||||
if pred(currentEntry.value.data):
|
||||
@ -247,72 +233,101 @@ proc bwdPage(storeQueue: StoreQueueRef,
|
||||
|
||||
|
||||
#### API
|
||||
|
||||
proc new*(T: type StoreQueueRef, capacity: int = StoreDefaultCapacity): T =
|
||||
var items = SortedSet[Index, IndexedWakuMessage].init()
|
||||
return StoreQueueRef(items: items, capacity: capacity)
|
||||
|
||||
|
||||
proc contains*(store: StoreQueueRef, index: Index): bool =
|
||||
## Return `true` if the store queue already contains the `index`, `false` otherwise.
|
||||
store.items.eq(index).isOk()
|
||||
|
||||
proc len*(store: StoreQueueRef): int {.noSideEffect.} =
|
||||
store.items.len
|
||||
|
||||
proc `$`*(store: StoreQueueRef): string =
|
||||
$store.items
|
||||
|
||||
|
||||
## --- SortedSet accessors ---
|
||||
|
||||
iterator fwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) =
|
||||
## Forward iterator over the entire store queue
|
||||
var
|
||||
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
|
||||
res = w.first
|
||||
while res.isOk:
|
||||
res = w.first()
|
||||
|
||||
while res.isOk():
|
||||
yield (res.value.key, res.value.data)
|
||||
res = w.next
|
||||
w.destroy
|
||||
res = w.next()
|
||||
|
||||
w.destroy()
|
||||
|
||||
iterator bwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) =
|
||||
## Backwards iterator over the entire store queue
|
||||
var
|
||||
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
|
||||
res = w.last
|
||||
while res.isOk:
|
||||
res = w.last()
|
||||
|
||||
while res.isOk():
|
||||
yield (res.value.key, res.value.data)
|
||||
res = w.prev
|
||||
w.destroy
|
||||
res = w.prev()
|
||||
|
||||
proc first*(storeQueue: StoreQueueRef): StoreQueueResult[IndexedWakuMessage] =
|
||||
w.destroy()
|
||||
|
||||
proc first*(store: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] =
|
||||
var
|
||||
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
|
||||
res = w.first
|
||||
w.destroy
|
||||
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(store.items)
|
||||
res = w.first()
|
||||
w.destroy()
|
||||
|
||||
if res.isOk:
|
||||
return ok(res.value.data)
|
||||
else:
|
||||
if res.isErr():
|
||||
return err("Not found")
|
||||
|
||||
proc last*(storeQueue: StoreQueueRef): StoreQueueResult[IndexedWakuMessage] =
|
||||
return ok(res.value.data)
|
||||
|
||||
proc last*(storeQueue: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] =
|
||||
var
|
||||
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
|
||||
res = w.last
|
||||
w.destroy
|
||||
res = w.last()
|
||||
w.destroy()
|
||||
|
||||
if res.isOk:
|
||||
return ok(res.value.data)
|
||||
else:
|
||||
if res.isErr():
|
||||
return err("Not found")
|
||||
|
||||
return ok(res.value.data)
|
||||
|
||||
## --- Queue API ---
|
||||
|
||||
proc new*(T: type StoreQueueRef, capacity: int): T =
|
||||
var items = SortedSet[Index, IndexedWakuMessage].init()
|
||||
method getMostRecentMessageTimestamp*(store: StoreQueueRef): MessageStoreResult[Timestamp] =
|
||||
let message = ?store.last()
|
||||
ok(message.index.receiverTime)
|
||||
|
||||
return StoreQueueRef(items: items, capacity: capacity)
|
||||
method getOldestMessageTimestamp*(store: StoreQueueRef): MessageStoreResult[Timestamp] =
|
||||
let message = ?store.first()
|
||||
ok(message.index.receiverTime)
|
||||
|
||||
proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[void] =
|
||||
|
||||
proc add*(store: StoreQueueRef, msg: IndexedWakuMessage): MessageStoreResult[void] =
|
||||
## Add a message to the queue
|
||||
## If we're at capacity, we will be removing,
|
||||
## the oldest (first) item
|
||||
##
|
||||
## If we're at capacity, we will be removing, the oldest (first) item
|
||||
trace "adding item to store queue", msg=msg
|
||||
|
||||
# Ensure that messages don't "jump" to the front of the queue with future timestamps
|
||||
if msg.index.senderTime - msg.index.receiverTime > MaxTimeVariance:
|
||||
if msg.index.senderTime - msg.index.receiverTime > StoreMaxTimeVariance:
|
||||
return err("future_sender_timestamp")
|
||||
|
||||
trace "Adding item to store queue", msg=msg
|
||||
if store.contains(msg.index):
|
||||
trace "could not add item to store queue. Index already exists", index=msg.index
|
||||
return err("duplicate")
|
||||
|
||||
# TODO the below delete block can be removed if we convert to circular buffer
|
||||
if storeQueue.items.len >= storeQueue.capacity:
|
||||
|
||||
# TODO: the below delete block can be removed if we convert to circular buffer
|
||||
if store.items.len >= store.capacity:
|
||||
var
|
||||
w = SortedSetWalkRef[Index, IndexedWakuMessage].init(storeQueue.items)
|
||||
w = SortedSetWalkRef[Index, IndexedWakuMessage].init(store.items)
|
||||
firstItem = w.first
|
||||
|
||||
if cmp(msg.index, firstItem.value.key) < 0:
|
||||
@ -320,22 +335,18 @@ proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[
|
||||
w.destroy # Clean up walker
|
||||
return err("too_old")
|
||||
|
||||
discard storeQueue.items.delete(firstItem.value.key)
|
||||
discard store.items.delete(firstItem.value.key)
|
||||
w.destroy # better to destroy walker after a delete operation
|
||||
|
||||
let res = storeQueue.items.insert(msg.index)
|
||||
if res.isErr:
|
||||
# This indicates the index already exists in the storeQueue.
|
||||
# TODO: could return error result and log in metrics
|
||||
store.items.insert(msg.index).value.data = msg
|
||||
|
||||
trace "Could not add item to store queue. Index already exists.", index=msg.index
|
||||
return err("duplicate")
|
||||
else:
|
||||
res.value.data = msg
|
||||
|
||||
trace "Successfully added item to store queue.", msg=msg
|
||||
return ok()
|
||||
|
||||
method put*(store: StoreQueueRef, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
|
||||
let message = IndexedWakuMessage(msg: message, index: cursor, pubsubTopic: pubsubTopic)
|
||||
store.add(message)
|
||||
|
||||
|
||||
proc getPage*(storeQueue: StoreQueueRef,
|
||||
pred: QueryFilterMatcher,
|
||||
pagingInfo: PagingInfo):
|
||||
@ -348,8 +359,8 @@ proc getPage*(storeQueue: StoreQueueRef,
|
||||
let
|
||||
cursorOpt = if pagingInfo.cursor == Index(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not!
|
||||
else: some(pagingInfo.cursor)
|
||||
maxPageSize = if pagingInfo.pageSize == 0 or pagingInfo.pageSize > MaxPageSize: MaxPageSize # Used default MaxPageSize for invalid pagingInfos
|
||||
else: pagingInfo.pageSize
|
||||
maxPageSize = if pagingInfo.pageSize <= 0: StoreMaxPageSize
|
||||
else: min(pagingInfo.pageSize, StoreMaxPageSize)
|
||||
|
||||
case pagingInfo.direction
|
||||
of PagingDirection.FORWARD:
|
||||
@ -367,15 +378,55 @@ proc getPage*(storeQueue: StoreQueueRef,
|
||||
|
||||
return getPage(storeQueue, predicate, pagingInfo)
|
||||
|
||||
proc contains*(storeQueue: StoreQueueRef, index: Index): bool =
|
||||
## Return `true` if the store queue already contains the `index`,
|
||||
## `false` otherwise
|
||||
let res = storeQueue.items.eq(index)
|
||||
|
||||
return res.isOk()
|
||||
method getMessagesByHistoryQuery*(
|
||||
store: StoreQueueRef,
|
||||
contentTopic = none(seq[ContentTopic]),
|
||||
pubsubTopic = none(string),
|
||||
cursor = none(Index),
|
||||
startTime = none(Timestamp),
|
||||
endTime = none(Timestamp),
|
||||
maxPageSize = StoreMaxPageSize,
|
||||
ascendingOrder = true
|
||||
): MessageStoreResult[MessageStorePage] =
|
||||
|
||||
proc len*(storeQueue: StoreQueueRef): int {.noSideEffect.} =
|
||||
storeQueue.items.len
|
||||
proc matchesQuery(indMsg: IndexedWakuMessage): bool =
|
||||
trace "Matching indexed message against predicate", msg=indMsg
|
||||
|
||||
proc `$`*(storeQueue: StoreQueueRef): string =
|
||||
$(storeQueue.items)
|
||||
if pubsubTopic.isSome():
|
||||
# filter by pubsub topic
|
||||
if indMsg.pubsubTopic != pubsubTopic.get():
|
||||
trace "Failed to match pubsub topic", criteria=pubsubTopic.get(), actual=indMsg.pubsubTopic
|
||||
return false
|
||||
|
||||
if startTime.isSome() and endTime.isSome():
|
||||
# temporal filtering: select only messages whose sender generated timestamps fall
|
||||
# between the queried start time and end time
|
||||
if indMsg.msg.timestamp > endTime.get() or indMsg.msg.timestamp < startTime.get():
|
||||
trace "Failed to match temporal filter", criteriaStart=startTime.get(), criteriaEnd=endTime.get(), actual=indMsg.msg.timestamp
|
||||
return false
|
||||
|
||||
if contentTopic.isSome():
|
||||
# filter by content topic
|
||||
if indMsg.msg.contentTopic notin contentTopic.get():
|
||||
trace "Failed to match content topic", criteria=contentTopic.get(), actual=indMsg.msg.contentTopic
|
||||
return false
|
||||
|
||||
return true
|
||||
|
||||
|
||||
let queryPagingInfo = PagingInfo(
|
||||
pageSize: maxPageSize,
|
||||
cursor: cursor.get(Index()),
|
||||
direction: if ascendingOrder: PagingDirection.FORWARD
|
||||
else: PagingDirection.BACKWARD
|
||||
)
|
||||
let (messages, pagingInfo, error) = store.getPage(matchesQuery, queryPagingInfo)
|
||||
|
||||
if error == HistoryResponseError.INVALID_CURSOR:
|
||||
return err("invalid cursor")
|
||||
|
||||
if messages.len == 0:
|
||||
return ok((messages, none(PagingInfo)))
|
||||
|
||||
ok((messages, some(pagingInfo)))
|
||||
|
@ -38,14 +38,14 @@ logScope:
|
||||
|
||||
const
|
||||
# Constants required for pagination -------------------------------------------
|
||||
MaxPageSize* = uint64(100) # Maximum number of waku messages in each page
|
||||
MaxPageSize* = StoreMaxPageSize
|
||||
|
||||
# TODO the DefaultPageSize can be changed, it's current value is random
|
||||
DefaultPageSize* = uint64(20) # A recommended default number of waku messages per page
|
||||
|
||||
MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
|
||||
MaxRpcSize* = StoreMaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
|
||||
|
||||
MaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future
|
||||
MaxTimeVariance* = StoreMaxTimeVariance
|
||||
|
||||
DefaultTopic* = "/waku/2/default-waku/proto"
|
||||
|
||||
@ -60,6 +60,8 @@ const
|
||||
decodeRpcFailure = "decode_rpc_failure"
|
||||
|
||||
type
|
||||
WakuStoreResult*[T] = Result[T, string]
|
||||
|
||||
WakuStore* = ref object of LPProtocol
|
||||
peerManager*: PeerManager
|
||||
rng*: ref BrHmacDrbgContext
|
||||
@ -275,7 +277,10 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
|
||||
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
|
||||
handler(response.value.response)
|
||||
|
||||
proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: RemotePeerInfo): Future[QueryResult] {.async, gcsafe.} =
|
||||
|
||||
## 21/WAKU2-FAULT-TOLERANT-STORE
|
||||
|
||||
proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: RemotePeerInfo): Future[WakuStoreResult[uint64]] {.async, gcsafe.} =
|
||||
## sends the query to the given peer
|
||||
## returns the number of retrieved messages if no error occurs, otherwise returns the error string
|
||||
# TODO dialPeer add it to the list of known peers, while it does not cause any issue but might be unnecessary
|
||||
@ -304,7 +309,7 @@ proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, pe
|
||||
handler(response.value.response)
|
||||
return ok(response.value.response.messages.len.uint64)
|
||||
|
||||
proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[MessagesResult] {.async, gcsafe.} =
|
||||
proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
|
||||
## a thin wrapper for queryFrom
|
||||
## sends the query to the given peer
|
||||
## when the query has a valid pagingInfo, it retrieves the historical messages in pages
|
||||
@ -336,17 +341,17 @@ proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInf
|
||||
|
||||
return ok(messageList)
|
||||
|
||||
proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[MessagesResult] {.async, gcsafe.} =
|
||||
proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
|
||||
## loops through the candidateList in order and sends the query to each
|
||||
## once all responses have been received, the retrieved messages are consolidated into one deduplicated list
|
||||
## if no messages have been retrieved, the returned future will resolve into a MessagesResult result holding an empty seq.
|
||||
var futureList: seq[Future[MessagesResult]]
|
||||
var futureList: seq[Future[WakuStoreResult[seq[WakuMessage]]]]
|
||||
for peer in candidateList.items:
|
||||
futureList.add(w.queryFromWithPaging(query, peer))
|
||||
await allFutures(futureList) # all(), which returns a Future[seq[T]], has been deprecated
|
||||
|
||||
let messagesList = futureList
|
||||
.map(proc (fut: Future[MessagesResult]): seq[WakuMessage] =
|
||||
.map(proc (fut: Future[WakuStoreResult[seq[WakuMessage]]]): seq[WakuMessage] =
|
||||
if fut.completed() and fut.read().isOk(): # completed() just as a sanity check. These futures have been awaited before using allFutures()
|
||||
fut.read().value
|
||||
else:
|
||||
@ -360,7 +365,7 @@ proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerI
|
||||
debug "failed to resolve the query"
|
||||
return err("failed to resolve the query")
|
||||
|
||||
proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]), pageSize: uint64 = DefaultPageSize): Future[QueryResult] {.async, gcsafe.} =
|
||||
proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]), pageSize: uint64 = DefaultPageSize): Future[WakuStoreResult[uint64]] {.async, gcsafe.} =
|
||||
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
|
||||
## messages are stored in the store node's messages field and in the message db
|
||||
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
|
||||
@ -457,6 +462,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
|
||||
save(successResult.value)
|
||||
return ok(added)
|
||||
|
||||
|
||||
# NOTE: Experimental, maybe incorporate as part of query call
|
||||
proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
|
||||
# @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service.
|
||||
@ -503,7 +509,6 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand
|
||||
handler(response.value.response)
|
||||
|
||||
|
||||
|
||||
# TODO: Remove the following deprecated method
|
||||
proc computeIndex*(msg: WakuMessage,
|
||||
receivedTime = getNanosecondTime(getTime().toUnixFloat()),
|
||||
|
Loading…
x
Reference in New Issue
Block a user