refactor(waku-store): major code reorganization, move StoreQueue to message_store folder

This commit is contained in:
Lorenzo Delgado 2022-07-25 13:01:37 +02:00 committed by Lorenzo Delgado
parent a54e25972f
commit 888f7cb312
29 changed files with 471 additions and 406 deletions
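Most of the 29 files change only their import lists: the split waku_store/waku_store and waku_store/waku_store_types modules collapse into a single waku_store entry point, and the in-memory StoreQueueRef moves under node/storage/message. A sketch of the typical consumer-side change, using the paths exactly as they appear in the hunks below:

# Before this commit (illustrative consumer imports):
import
  ../../waku/v2/protocol/waku_store/waku_store,
  ../../waku/v2/protocol/waku_store/waku_store_types

# After this commit (illustrative consumer imports):
import
  ../../waku/v2/node/storage/message/waku_store_queue,  # StoreQueueRef, IndexedWakuMessage
  ../../waku/v2/protocol/waku_store                     # re-exports protocol, rpc and rpc_codec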

View File

@ -22,7 +22,8 @@ import libp2p/[switch, # manage transports, a single entry poi
nameresolving/dnsresolver,# define DNS resolution
muxers/muxer] # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection
import ../../waku/v2/node/[wakunode2, waku_payload],
../../waku/v2/node/./dnsdisc/waku_dnsdisc,
../../waku/v2/node/dnsdisc/waku_dnsdisc,
../../waku/v2/protocol/waku_store,
../../waku/v2/utils/[peers,time],
../../waku/common/utils/nat,
./config_chat2

View File

@ -21,7 +21,7 @@ import
admin_api,
private_api],
../../waku/v2/protocol/waku_relay,
../../waku/v2/protocol/waku_store/[waku_store, waku_store_types],
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_swap/waku_swap,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/utils/peers,

View File

@ -6,9 +6,12 @@ import
sqlite3_abi,
stew/byteutils,
../../waku/v2/node/storage/message/waku_message_store,
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/node/storage/sqlite,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,
../../waku/v2/utils/time,
../../waku/v2/utils/pagination,
./utils

View File

@ -2,13 +2,15 @@
import
std/[sequtils, options],
stew/shims/net,
testutils/unittests,
chronicles,
chronos,
libp2p/crypto/crypto,
libp2p/protocols/pubsub/gossipsub,
stew/shims/net,
testutils/unittests,
libp2p/protocols/pubsub/gossipsub
import
../../waku/v2/node/wakunode2,
../../waku/v2/utils/peers,
../test_helpers
procSuite "Peer Exchange":

View File

@ -16,7 +16,7 @@ import
../../waku/common/wakubridge,
../../waku/v1/protocol/waku_protocol,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/node/[wakunode2, waku_payload],
../../waku/v2/utils/peers,

View File

@ -2,14 +2,16 @@
import
std/[sequtils, tables],
chronicles,
chronos,
testutils/unittests,
stew/shims/net,
stew/[base32, results],
chronicles,
chronos,
libp2p/crypto/crypto,
eth/keys,
discovery/dnsdisc/builder,
discovery/dnsdisc/builder
import
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/node/dnsdisc/waku_dnsdisc,
../../waku/v2/node/wakunode2,
../test_helpers

View File

@ -1,10 +1,16 @@
{.used.}
import
std/[options, sequtils],
testutils/unittests, nimcrypto/sha2,
libp2p/protobuf/minprotobuf,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/utils/time
testutils/unittests,
nimcrypto/sha2,
libp2p/protobuf/minprotobuf
import
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_message,
../../waku/v2/utils/time,
../../waku/v2/utils/pagination
proc createSampleStoreQueue(s: int): StoreQueueRef =

View File

@ -2,18 +2,24 @@
import
std/[options, tables, sets, sequtils],
testutils/unittests, chronos, chronicles,
chronos,
chronicles,
testutils/unittests,
libp2p/switch,
libp2p/protobuf/minprotobuf,
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,
libp2p/protocols/pubsub/rpc/message,
libp2p/protocols/pubsub/rpc/message
import
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_store,
../../waku/v2/node/storage/message/waku_message_store,
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/utils/pagination,
../../waku/v2/utils/time,
../test_helpers, ./utils
../test_helpers,
./utils
procSuite "Waku Store":
const defaultContentTopic = ContentTopic("1")

View File

@ -2,9 +2,16 @@
import
std/[sequtils, strutils],
stew/results,
testutils/unittests,
../../waku/v2/protocol/waku_store/waku_store_types,
../../waku/v2/utils/time
nimcrypto/hash
import
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,
../../waku/v2/utils/time,
../../waku/v2/utils/pagination
procSuite "Sorted store queue":

View File

@ -11,7 +11,7 @@ import
libp2p/switch,
eth/keys,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_swap/waku_swap,
../../waku/v2/node/wakunode2,
../../waku/v2/utils/peers,

View File

@ -15,8 +15,9 @@ import
eth/keys,
../../waku/v2/node/storage/sqlite,
../../waku/v2/node/storage/message/waku_message_store,
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/protocol/[waku_relay, waku_message],
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/protocol/waku_lightpush/waku_lightpush,
../../waku/v2/node/peer_manager/peer_manager,

View File

@ -4,9 +4,10 @@ import
std/[options, json],
eth/keys,
../../../v1/node/rpc/hexstrings,
../../protocol/waku_store/waku_store_types,
../../protocol/waku_store,
../../protocol/waku_message,
../../utils/time,
../../utils/pagination,
../waku_payload,
./jsonrpc_types

View File

@ -3,10 +3,14 @@
import
std/options,
chronicles,
json_rpc/rpcserver,
json_rpc/rpcserver
import
../wakunode2,
../../protocol/waku_store,
../../utils/time,
./jsonrpc_types, ./jsonrpc_utils
../../utils/pagination,
./jsonrpc_types,
./jsonrpc_utils
export jsonrpc_types

View File

@ -1,16 +1,13 @@
# Group by std, external then internal imports
import
# std imports
std/ [os, strutils, times, options], #options as what # TODO: Huh? Redefinition?
# external imports
std/[os, strutils, times, options], #options as what # TODO: Huh? Redefinition?
chronicles,
eth/common as eth_common,
eth/keys,
json_rpc/[rpcclient, rpcserver],
libp2p/protobuf/minprotobuf,
# internal imports
libp2p/protobuf/minprotobuf
import
../protocol/waku_filter/waku_filter_types,
../protocol/waku_store/waku_store_types,
../protocol/waku_store,
../protocol/waku_message,
../utils/time,
./wakunode2,

View File

@ -9,7 +9,7 @@ import
../waku_payload,
../jsonrpc/jsonrpc_types,
../../protocol/waku_filter/waku_filter_types,
../../protocol/waku_store/waku_store_types,
../../protocol/waku_store,
../../../v1/node/rpc/hexstrings
from strutils import rsplit

View File

@ -9,7 +9,7 @@ import
../waku_payload,
../jsonrpc/jsonrpc_types,
../../protocol/waku_filter/waku_filter_types,
../../protocol/waku_store/waku_store_types,
../../protocol/waku_store,
../../../v1/node/rpc/hexstrings
from strutils import rsplit

View File

@ -9,7 +9,7 @@ import
../waku_payload,
../jsonrpc/jsonrpc_types,
../../protocol/waku_filter/waku_filter_types,
../../protocol/waku_store/waku_store_types,
../../protocol/waku_store,
../../../v1/node/rpc/hexstrings
from strutils import rsplit

View File

@ -8,7 +8,7 @@ import
../waku_payload,
../jsonrpc/jsonrpc_types,
../../protocol/waku_filter/waku_filter_types,
../../protocol/waku_store/waku_store_types,
../../protocol/waku_store,
../../../v1/node/rpc/hexstrings
from strutils import rsplit

View File

@ -9,7 +9,7 @@ import
../waku_payload,
../jsonrpc/jsonrpc_types,
../../protocol/waku_filter/waku_filter_types,
../../protocol/waku_store/waku_store_types,
../../protocol/waku_store,
../../../v1/node/rpc/hexstrings
from strutils import rsplit

View File

@ -1,16 +1,16 @@
## This module defines a message store interface. Implementations of
## MessageStore are used by the `WakuStore` protocol to store and
## retrieve historical messages
{.push raises: [Defect].}
import
std/options,
stew/results,
../../../protocol/waku_message,
../../../protocol/waku_store/waku_store_types,
../../../protocol/waku_store/rpc,
../../../utils/time,
../../../utils/pagination
## This module defines a message store interface. Implementations of
## MessageStore are used by the `WakuStore` protocol to store and
## retrieve historical messages
type
DataProc* = proc(receiverTimestamp: Timestamp, msg: WakuMessage, pubsubTopic: string) {.closure, raises: [Defect].}
@ -19,6 +19,19 @@ type
MessageStore* = ref object of RootObj
# TODO: Remove after resolving nwaku #1026. Move it back to waku_store_queue.nim
type
IndexedWakuMessage* = object
# TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message
## This type is used to encapsulate a WakuMessage and its Index
msg*: WakuMessage
index*: Index
pubsubTopic*: string
QueryFilterMatcher* = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.}
# MessageStore interface
method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard
method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard
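The base put/getAll methods above only discard; concrete stores (the SQLite store and the StoreQueueRef moved by this commit) override them. A minimal in-memory sketch of an implementation, assuming MessageStoreResult is the stew/results based result type used by this interface; the SeqStore name and its rows field are hypothetical:

import
  stew/results,
  ../../../protocol/waku_message,
  ../../../utils/pagination,
  ../../../utils/time,
  ./message_store

type SeqStore = ref object of MessageStore
  # (receiverTime, message, pubsubTopic) rows kept in insertion order
  rows: seq[tuple[receiverTime: Timestamp, msg: WakuMessage, pubsubTopic: string]]

method put(db: SeqStore, cursor: Index, message: WakuMessage,
           pubsubTopic: string): MessageStoreResult[void] =
  # keep the receiver timestamp carried by the cursor alongside the message
  db.rows.add((receiverTime: cursor.receiverTime, msg: message, pubsubTopic: pubsubTopic))
  ok()

method getAll(db: SeqStore, onData: DataProc): MessageStoreResult[bool] =
  # replay every stored row through the caller-supplied closure
  for row in db.rows:
    onData(row.receiverTime, row.msg, row.pubsubTopic)
  ok(db.rows.len > 0)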

View File

@ -2,16 +2,18 @@
import
std/[options, tables, times],
sqlite3_abi,
stew/[byteutils, results],
chronicles,
chronos,
./message_store,
../sqlite,
chronicles,
sqlite3_abi
import
../../../protocol/waku_message,
../../../protocol/waku_store/waku_store,
../../../protocol/waku_store,
../../../utils/pagination,
../../../utils/time
../../../utils/time,
../sqlite,
./message_store,
./waku_store_queue
export sqlite

View File

@ -1,95 +1,42 @@
## Types for waku_store protocol.
{.push raises: [Defect].}
# Group by std, external then internal imports
import
std/[algorithm, options],
# external imports
bearssl,
chronicles,
libp2p/protocols/protocol,
std/[options, algorithm],
stew/[results, sorted_set],
# internal imports
../../utils/pagination,
../../utils/time,
../../node/peer_manager/peer_manager,
../waku_swap/waku_swap_types,
../waku_message
chronicles
import
../../../protocol/waku_message,
../../../protocol/waku_store/rpc,
../../../utils/pagination,
../../../utils/time,
./message_store
# TODO: Remove after resolving nwaku #1026
export
message_store
logScope:
topics = "message_store.storequeue"
# export all modules whose types are used in public functions/types
export
bearssl,
results,
peer_manager,
waku_swap_types,
waku_message,
pagination
const
# Constants required for pagination -------------------------------------------
MaxPageSize* = uint64(100) # Maximum number of waku messages in each page
# TODO the DefaultPageSize can be changed, its current value is random
DefaultPageSize* = uint64(20) # A recommended default number of waku messages per page
MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
MaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future
DefaultTopic* = "/waku/2/default-waku/proto"
MaxPageSize = uint64(100) # Maximum number of waku messages in each page
MaxTimeVariance = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future
type
HistoryContentFilter* = object
contentTopic*: ContentTopic
QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
QueryFilterMatcher* = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.}
IndexedWakuMessage* = object
# TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message
## This type is used to encapsulate a WakuMessage and its Index
msg*: WakuMessage
index*: Index
pubsubTopic*: string
PagingDirection* {.pure.} = enum
## PagingDirection determines the direction of pagination
BACKWARD = uint32(0)
FORWARD = uint32(1)
PagingInfo* = object
## This type holds the information needed for the pagination
pageSize*: uint64
cursor*: Index
direction*: PagingDirection
HistoryResponseError* {.pure.} = enum
## HistoryResponseError contains error message to inform the querying node about the state of its request
NONE = uint32(0)
INVALID_CURSOR = uint32(1)
HistoryQuery* = object
contentFilters*: seq[HistoryContentFilter]
pubsubTopic*: string
pagingInfo*: PagingInfo # used for pagination
startTime*: Timestamp # used for time-window query
endTime*: Timestamp # used for time-window query
HistoryResponse* = object
messages*: seq[WakuMessage]
pagingInfo*: PagingInfo # used for pagination
error*: HistoryResponseError
HistoryRPC* = object
requestId*: string
query*: HistoryQuery
response*: HistoryResponse
QueryResult* = Result[uint64, string]
MessagesResult* = Result[seq[WakuMessage], string]
type
StoreQueueResult*[T] = Result[T, cstring]
StoreQueueRef* = ref object
## Bounded repository for indexed messages
##
@ -105,15 +52,8 @@ type
items: SortedSet[Index, IndexedWakuMessage] # sorted set of stored messages
capacity: int # Maximum amount of messages to keep
StoreQueueResult*[T] = Result[T, cstring]
######################
# StoreQueue helpers #
######################
logScope:
topics = "wakustorequeue"
### Helpers
proc ffdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
startCursor: Index):
@ -305,10 +245,8 @@ proc bwdPage(storeQueue: StoreQueueRef,
outPagingInfo,
outError)
##################
# StoreQueue API #
##################
#### API
## --- SortedSet accessors ---
iterator fwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) =
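Call sites later in this diff (ws.messages.add(...), ws.messages.contains(index), ws.messages.len) show the queue's public surface after the move. A rough usage sketch; the StoreQueueRef.new(capacity) constructor spelling is an assumption, not shown in this hunk:

import
  ../../waku/v2/node/storage/message/waku_store_queue,
  ../../waku/v2/protocol/waku_message,
  ../../waku/v2/utils/pagination,
  ../../waku/v2/utils/time

let queue = StoreQueueRef.new(capacity = 25)  # assumed constructor; bounds the queue to 25 messages

let msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("1"))
let index = Index.compute(msg, getNanosecondTime(10.0), "/waku/2/default-waku/proto")

# add() returns a result which the call sites in this diff discard
discard queue.add(IndexedWakuMessage(msg: msg, index: index,
                                     pubsubTopic: "/waku/2/default-waku/proto"))

assert queue.contains(index)
assert queue.len == 1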

View File

@ -13,18 +13,21 @@ import
libp2p/protocols/pubsub/[gossipsub, rpc/messages],
libp2p/nameresolving/nameresolver,
libp2p/[builders, multihash],
libp2p/transports/[transport, tcptransport, wstransport],
libp2p/transports/[transport, tcptransport, wstransport]
import
../node/storage/message/waku_store_queue,
../protocol/[waku_relay, waku_message],
../protocol/waku_store/waku_store,
../protocol/waku_store,
../protocol/waku_swap/waku_swap,
../protocol/waku_filter/waku_filter,
../protocol/waku_lightpush/waku_lightpush,
../protocol/waku_rln_relay/[waku_rln_relay_types],
../utils/[peers, requests, wakuswitch, wakuenr],
./peer_manager/peer_manager,
./storage/message/message_store,
./dnsdisc/waku_dnsdisc,
./discv5/waku_discv5,
wakunode2_types
./wakunode2_types
export
builders,

View File

@ -3,7 +3,7 @@ import
libp2p/crypto/crypto,
libp2p/protocols/ping,
../protocol/waku_relay,
../protocol/waku_store/waku_store,
../protocol/waku_store,
../protocol/waku_swap/waku_swap,
../protocol/waku_filter/waku_filter,
../protocol/waku_lightpush/waku_lightpush,

View File

@ -0,0 +1,9 @@
import
./waku_store/protocol,
./waku_store/rpc,
./waku_store/rpc_codec
export
protocol,
rpc,
rpc_codec
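This new barrel module lets consumers import a single path and reach the whole store API through the re-exports; a brief sketch of what becomes visible (symbol names taken from the other hunks in this commit):

import ../../waku/v2/protocol/waku_store

# from ./waku_store/protocol:  WakuStore, WakuStoreCodec, DefaultStoreCapacity, ...
# from ./waku_store/rpc:       HistoryQuery, HistoryResponse, HistoryRPC, HistoryContentFilter
# from ./waku_store/rpc_codec: encode(...) / init(...) for the RPC types
echo WakuStoreCodec  # "/vac/waku/store/2.0.0-beta4"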

View File

@ -1,42 +1,31 @@
## Waku Store protocol for historical messaging support.
## See spec for more details:
## https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md
{.push raises: [Defect].}
# Group by std, external then internal imports
import
# std imports
std/[tables, times, sequtils, options, math],
# external imports
bearssl,
stew/[results, byteutils],
chronicles,
chronos,
bearssl,
libp2p/crypto/crypto,
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
libp2p/varint,
metrics,
stew/[results, byteutils],
# internal imports
metrics
import
../../node/storage/message/message_store,
../../node/storage/message/waku_store_queue,
../../node/peer_manager/peer_manager,
../../utils/protobuf,
../../utils/requests,
../../utils/time,
../../utils/pagination,
../../utils/requests,
../waku_message,
../waku_swap/waku_swap,
./waku_store_types
./rpc,
./rpc_codec
# export all modules whose types are used in public functions/types
export
options,
chronos,
bearssl,
minprotobuf,
peer_manager,
waku_store_types,
message_store
declarePublicGauge waku_store_messages, "number of historical messages", ["type"]
declarePublicGauge waku_store_peers, "number of store peers"
@ -46,18 +35,30 @@ declarePublicGauge waku_store_queries, "number of store queries received"
logScope:
topics = "wakustore"
const
# Constants required for pagination -------------------------------------------
MaxPageSize* = uint64(100) # Maximum number of waku messages in each page
# TODO the DefaultPageSize can be changed, its current value is random
DefaultPageSize* = uint64(20) # A recommended default number of waku messages per page
MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
MaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future
DefaultTopic* = "/waku/2/default-waku/proto"
const
WakuStoreCodec* = "/vac/waku/store/2.0.0-beta4"
DefaultStoreCapacity* = 50000 # Default maximum of 50k messages stored
DefaultStoreCapacity* = 50_000 # Default maximum of 50k messages stored
# Error types (metric label values)
const
dialFailure = "dial_failure"
decodeRpcFailure = "decode_rpc_failure"
# TODO Move serialization function to separate file, too noisy
# TODO Move pagination to separate file, self-contained logic
type
WakuStore* = ref object of LPProtocol
peerManager*: PeerManager
@ -70,230 +71,6 @@ type
isSqliteOnly: bool # if true, don't use in memory-store and answer history queries from the sqlite DB
proc computeIndex*(msg: WakuMessage,
receivedTime = getNanosecondTime(getTime().toUnixFloat()),
pubsubTopic = DefaultTopic): Index =
## Takes a WakuMessage with received timestamp and returns its Index.
## Received timestamp will default to system time if not provided.
var ctx: sha256
ctx.init()
ctx.update(msg.contentTopic.toBytes()) # converts the contentTopic to bytes
ctx.update(msg.payload)
let digest = ctx.finish() # computes the hash
ctx.clear()
let
receiverTime = receivedTime
index = Index(digest:digest,
receiverTime: receiverTime,
senderTime: msg.timestamp,
pubsubTopic: pubsubTopic)
return index
proc encode*(index: Index): ProtoBuffer =
## encodes an Index object into a ProtoBuffer
## returns the resultant ProtoBuffer
# initiate a ProtoBuffer
var output = initProtoBuffer()
# encodes index
output.write3(1, index.digest.data)
output.write3(2, zint64(index.receiverTime))
output.write3(3, zint64(index.senderTime))
output.write3(4, index.pubsubTopic)
output.finish3()
return output
proc encode*(pinfo: PagingInfo): ProtoBuffer =
## encodes a PagingInfo object into a ProtoBuffer
## returns the resultant ProtoBuffer
# initiate a ProtoBuffer
var output = initProtoBuffer()
# encodes pinfo
output.write3(1, pinfo.pageSize)
output.write3(2, pinfo.cursor.encode())
output.write3(3, uint32(ord(pinfo.direction)))
output.finish3()
return output
proc init*(T: type Index, buffer: seq[byte]): ProtoResult[T] =
## creates and returns an Index object out of buffer
var index = Index()
let pb = initProtoBuffer(buffer)
var data: seq[byte]
discard ? pb.getField(1, data)
# create digest from data
index.digest = MDigest[256]()
for count, b in data:
index.digest.data[count] = b
# read the timestamp
var receiverTime: zint64
discard ? pb.getField(2, receiverTime)
index.receiverTime = Timestamp(receiverTime)
# read the timestamp
var senderTime: zint64
discard ? pb.getField(3, senderTime)
index.senderTime = Timestamp(senderTime)
# read the pubsubTopic
discard ? pb.getField(4, index.pubsubTopic)
return ok(index)
proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] =
## creates and returns a PagingInfo object out of buffer
var pagingInfo = PagingInfo()
let pb = initProtoBuffer(buffer)
var pageSize: uint64
discard ? pb.getField(1, pageSize)
pagingInfo.pageSize = pageSize
var cursorBuffer: seq[byte]
discard ? pb.getField(2, cursorBuffer)
pagingInfo.cursor = ? Index.init(cursorBuffer)
var direction: uint32
discard ? pb.getField(3, direction)
pagingInfo.direction = PagingDirection(direction)
return ok(pagingInfo)
proc init*(T: type HistoryContentFilter, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
# ContentTopic corresponds to the contentTopic field of waku message (not to be confused with pubsub topic)
var contentTopic: ContentTopic
discard ? pb.getField(1, contentTopic)
ok(HistoryContentFilter(contentTopic: contentTopic))
proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
var msg = HistoryQuery()
let pb = initProtoBuffer(buffer)
discard ? pb.getField(2, msg.pubsubTopic)
var buffs: seq[seq[byte]]
discard ? pb.getRepeatedField(3, buffs)
for buf in buffs:
msg.contentFilters.add(? HistoryContentFilter.init(buf))
var pagingInfoBuffer: seq[byte]
discard ? pb.getField(4, pagingInfoBuffer)
msg.pagingInfo = ? PagingInfo.init(pagingInfoBuffer)
var startTime: zint64
discard ? pb.getField(5, startTime)
msg.startTime = Timestamp(startTime)
var endTime: zint64
discard ? pb.getField(6, endTime)
msg.endTime = Timestamp(endTime)
return ok(msg)
proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
var msg = HistoryResponse()
let pb = initProtoBuffer(buffer)
var messages: seq[seq[byte]]
discard ? pb.getRepeatedField(2, messages)
for buf in messages:
msg.messages.add(? WakuMessage.init(buf))
var pagingInfoBuffer: seq[byte]
discard ? pb.getField(3, pagingInfoBuffer)
msg.pagingInfo= ? PagingInfo.init(pagingInfoBuffer)
var error: uint32
discard ? pb.getField(4, error)
msg.error = HistoryResponseError(error)
return ok(msg)
proc init*(T: type HistoryRPC, buffer: seq[byte]): ProtoResult[T] =
var rpc = HistoryRPC()
let pb = initProtoBuffer(buffer)
discard ? pb.getField(1, rpc.requestId)
var queryBuffer: seq[byte]
discard ? pb.getField(2, queryBuffer)
rpc.query = ? HistoryQuery.init(queryBuffer)
var responseBuffer: seq[byte]
discard ? pb.getField(3, responseBuffer)
rpc.response = ? HistoryResponse.init(responseBuffer)
return ok(rpc)
proc encode*(filter: HistoryContentFilter): ProtoBuffer =
var output = initProtoBuffer()
output.write3(1, filter.contentTopic)
output.finish3()
return output
proc encode*(query: HistoryQuery): ProtoBuffer =
var output = initProtoBuffer()
output.write3(2, query.pubsubTopic)
for filter in query.contentFilters:
output.write3(3, filter.encode())
output.write3(4, query.pagingInfo.encode())
output.write3(5, zint64(query.startTime))
output.write3(6, zint64(query.endTime))
output.finish3()
return output
proc encode*(response: HistoryResponse): ProtoBuffer =
var output = initProtoBuffer()
for msg in response.messages:
output.write3(2, msg.encode())
output.write3(3, response.pagingInfo.encode())
output.write3(4, uint32(ord(response.error)))
output.finish3()
return output
proc encode*(rpc: HistoryRPC): ProtoBuffer =
var output = initProtoBuffer()
output.write3(1, rpc.requestId)
output.write3(2, rpc.query.encode())
output.write3(3, rpc.response.encode())
output.finish3()
return output
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} =
## Query history to return a single page of messages matching the query
@ -397,7 +174,11 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
proc onData(receiverTime: Timestamp, msg: WakuMessage, pubsubTopic: string) =
# TODO index should not be recalculated
discard ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(receiverTime, pubsubTopic), pubsubTopic: pubsubTopic))
discard ws.messages.add(IndexedWakuMessage(
msg: msg,
index: Index.compute(msg, receiverTime, pubsubTopic),
pubsubTopic: pubsubTopic
))
info "attempting to load messages from persistent storage"
@ -411,7 +192,6 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
debug "the number of messages in the memory", messageNum=ws.messages.len
waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"])
proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext,
store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true,
capacity = DefaultStoreCapacity, isSqliteOnly = false): T =
@ -430,7 +210,11 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
# Store is mounted but new messages should not be stored
return
let index = msg.computeIndex(pubsubTopic = topic)
let index = Index.compute(
msg,
receivedTime = getNanosecondTime(getTime().toUnixFloat()),
pubsubTopic = topic
)
# add message to in-memory store
if not w.isSqliteOnly:
@ -614,7 +398,12 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
# exclude index from the comparison criteria
for msg in msgList:
let index = msg.computeIndex(pubsubTopic = DefaultTopic)
let index = Index.compute(
msg,
receivedTime = getNanosecondTime(getTime().toUnixFloat()),
pubsubTopic = DefaultTopic
)
# check for duplicate messages
# TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic
if ws.messages.contains(index):
@ -712,3 +501,11 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
handler(response.value.response)
# TODO: Remove the following deprecated method
proc computeIndex*(msg: WakuMessage,
receivedTime = getNanosecondTime(getTime().toUnixFloat()),
pubsubTopic = DefaultTopic): Index {.deprecated: "Use Index.compute() instead".}=
Index.compute(msg, receivedTime, pubsubTopic)
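The deprecated shim keeps old call sites compiling while pointing them at the new static constructor. A migration sketch, assuming msg: WakuMessage and topic: string are in scope (the call forms mirror the replacements made earlier in this file):

# old form, now emits a deprecation warning:
let oldIndex = msg.computeIndex(pubsubTopic = topic)

# new form used throughout this commit:
let newIndex = Index.compute(
  msg,
  receivedTime = getNanosecondTime(getTime().toUnixFloat()),
  pubsubTopic = topic
)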

View File

@ -0,0 +1,35 @@
{.push raises: [Defect].}
import
nimcrypto/hash
import
../waku_message,
../../utils/pagination,
../../utils/time
type
HistoryContentFilter* = object
contentTopic*: ContentTopic
HistoryQuery* = object
contentFilters*: seq[HistoryContentFilter]
pubsubTopic*: string
pagingInfo*: PagingInfo # used for pagination
startTime*: Timestamp # used for time-window query
endTime*: Timestamp # used for time-window query
HistoryResponseError* {.pure.} = enum
## HistoryResponseError contains error message to inform the querying node about the state of its request
NONE = uint32(0)
INVALID_CURSOR = uint32(1)
HistoryResponse* = object
messages*: seq[WakuMessage]
pagingInfo*: PagingInfo # used for pagination
error*: HistoryResponseError
HistoryRPC* = object
requestId*: string
query*: HistoryQuery
response*: HistoryResponse
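These are plain object types; a hedged example of building a time-window query for a single content topic (Index, PagingInfo and Timestamp come from utils/pagination and utils/time, shown at the end of this diff):

import
  std/times,
  ../waku_message,
  ../../utils/pagination,
  ../../utils/time,
  ./rpc

let query = HistoryQuery(
  contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))],
  pubsubTopic: "/waku/2/default-waku/proto",
  pagingInfo: PagingInfo(pageSize: 20, direction: PagingDirection.FORWARD),
  startTime: Timestamp(0),
  endTime: getNanosecondTime(getTime().toUnixFloat())
)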

View File

@ -0,0 +1,204 @@
{.push raises: [Defect].}
import
nimcrypto/hash,
libp2p/protobuf/minprotobuf,
libp2p/varint
import
../waku_message,
../../utils/protobuf,
../../utils/pagination,
../../utils/time,
./rpc
proc encode*(index: Index): ProtoBuffer =
## Encode an Index object into a ProtoBuffer
## returns the resultant ProtoBuffer
var output = initProtoBuffer()
output.write3(1, index.digest.data)
output.write3(2, zint64(index.receiverTime))
output.write3(3, zint64(index.senderTime))
output.write3(4, index.pubsubTopic)
output.finish3()
return output
proc init*(T: type Index, buffer: seq[byte]): ProtoResult[T] =
## creates and returns an Index object out of buffer
var index = Index()
let pb = initProtoBuffer(buffer)
var data: seq[byte]
discard ?pb.getField(1, data)
# create digest from data
index.digest = MDigest[256]()
for count, b in data:
index.digest.data[count] = b
# read the timestamp
var receiverTime: zint64
discard ?pb.getField(2, receiverTime)
index.receiverTime = Timestamp(receiverTime)
# read the timestamp
var senderTime: zint64
discard ?pb.getField(3, senderTime)
index.senderTime = Timestamp(senderTime)
# read the pubsubTopic
discard ?pb.getField(4, index.pubsubTopic)
return ok(index)
proc encode*(pinfo: PagingInfo): ProtoBuffer =
## Encodes a PagingInfo object into a ProtoBuffer
## returns the resultant ProtoBuffer
var output = initProtoBuffer()
output.write3(1, pinfo.pageSize)
output.write3(2, pinfo.cursor.encode())
output.write3(3, uint32(ord(pinfo.direction)))
output.finish3()
return output
proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] =
## creates and returns a PagingInfo object out of buffer
var pagingInfo = PagingInfo()
let pb = initProtoBuffer(buffer)
var pageSize: uint64
discard ?pb.getField(1, pageSize)
pagingInfo.pageSize = pageSize
var cursorBuffer: seq[byte]
discard ?pb.getField(2, cursorBuffer)
pagingInfo.cursor = ?Index.init(cursorBuffer)
var direction: uint32
discard ?pb.getField(3, direction)
pagingInfo.direction = PagingDirection(direction)
return ok(pagingInfo)
proc encode*(filter: HistoryContentFilter): ProtoBuffer =
var output = initProtoBuffer()
output.write3(1, filter.contentTopic)
output.finish3()
return output
proc init*(T: type HistoryContentFilter, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
var contentTopic: ContentTopic
discard ?pb.getField(1, contentTopic)
ok(HistoryContentFilter(contentTopic: contentTopic))
proc encode*(query: HistoryQuery): ProtoBuffer =
var output = initProtoBuffer()
output.write3(2, query.pubsubTopic)
for filter in query.contentFilters:
output.write3(3, filter.encode())
output.write3(4, query.pagingInfo.encode())
output.write3(5, zint64(query.startTime))
output.write3(6, zint64(query.endTime))
output.finish3()
return output
proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
var msg = HistoryQuery()
let pb = initProtoBuffer(buffer)
discard ?pb.getField(2, msg.pubsubTopic)
var buffs: seq[seq[byte]]
discard ?pb.getRepeatedField(3, buffs)
for buf in buffs:
msg.contentFilters.add(? HistoryContentFilter.init(buf))
var pagingInfoBuffer: seq[byte]
discard ?pb.getField(4, pagingInfoBuffer)
msg.pagingInfo = ?PagingInfo.init(pagingInfoBuffer)
var startTime: zint64
discard ?pb.getField(5, startTime)
msg.startTime = Timestamp(startTime)
var endTime: zint64
discard ?pb.getField(6, endTime)
msg.endTime = Timestamp(endTime)
return ok(msg)
proc encode*(response: HistoryResponse): ProtoBuffer =
var output = initProtoBuffer()
for msg in response.messages:
output.write3(2, msg.encode())
output.write3(3, response.pagingInfo.encode())
output.write3(4, uint32(ord(response.error)))
output.finish3()
return output
proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
var msg = HistoryResponse()
let pb = initProtoBuffer(buffer)
var messages: seq[seq[byte]]
discard ?pb.getRepeatedField(2, messages)
for buf in messages:
let message = ?WakuMessage.init(buf)
msg.messages.add(message)
var pagingInfoBuffer: seq[byte]
discard ?pb.getField(3, pagingInfoBuffer)
msg.pagingInfo = ?PagingInfo.init(pagingInfoBuffer)
var error: uint32
discard ?pb.getField(4, error)
msg.error = HistoryResponseError(error)
return ok(msg)
proc encode*(rpc: HistoryRPC): ProtoBuffer =
var output = initProtoBuffer()
output.write3(1, rpc.requestId)
output.write3(2, rpc.query.encode())
output.write3(3, rpc.response.encode())
output.finish3()
return output
proc init*(T: type HistoryRPC, buffer: seq[byte]): ProtoResult[T] =
var rpc = HistoryRPC()
let pb = initProtoBuffer(buffer)
discard ?pb.getField(1, rpc.requestId)
var queryBuffer: seq[byte]
discard ?pb.getField(2, queryBuffer)
rpc.query = ?HistoryQuery.init(queryBuffer)
var responseBuffer: seq[byte]
discard ?pb.getField(3, responseBuffer)
rpc.response = ?HistoryResponse.init(responseBuffer)
return ok(rpc)
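The codec is symmetric: encode produces a ProtoBuffer and init parses one back. A roundtrip sketch for PagingInfo; reading the raw bytes through ProtoBuffer's buffer field is an assumption about the minprotobuf API, mirroring how the tests drive these codecs:

let pinfo = PagingInfo(pageSize: 10, direction: PagingDirection.BACKWARD)

let encoded = pinfo.encode()                   # ProtoBuffer
let decoded = PagingInfo.init(encoded.buffer)  # ProtoResult[PagingInfo]

assert decoded.isOk()
assert decoded.get().pageSize == 10
assert decoded.get().direction == PagingDirection.BACKWARD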

View File

@ -1,23 +1,44 @@
## Contains types and utilities for pagination.
##
## Used by both message store and store protocol.
{.push raises: [Defect].}
import
./time,
nimcrypto/hash,
stew/byteutils
stew/byteutils,
nimcrypto
import
../protocol/waku_message,
./time
export hash
type
Index* = object
## This type contains the description of an Index used in the pagination of WakuMessages
digest*: MDigest[256] # calculated over payload and content topic
receiverTime*: Timestamp
senderTime*: Timestamp # the time at which the message is generated
pubsubTopic*: string
type Index* = object
## This type contains the description of an Index used in the pagination of WakuMessages
digest*: MDigest[256] # calculated over payload and content topic
receiverTime*: Timestamp
senderTime*: Timestamp # the time at which the message is generated
pubsubTopic*: string
proc compute*(T: type Index, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: string): T =
## Takes a WakuMessage, its received timestamp and the pubsub topic it was published on, and returns its Index.
let
contentTopic = toBytes(msg.contentTopic)
payload = msg.payload
senderTime = msg.timestamp
var ctx: sha256
ctx.init()
ctx.update(contentTopic)
ctx.update(payload)
let digest = ctx.finish() # computes the hash
ctx.clear()
Index(
digest: digest,
receiverTime: receivedTime,
senderTime: senderTime,
pubsubTopic: pubsubTopic
)
proc `==`*(x, y: Index): bool =
## receiverTime plays no role in index equality
@ -59,3 +80,16 @@ proc cmp*(x, y: Index): int =
return digestcmp
return cmp(x.pubsubTopic, y.pubsubTopic)
type
PagingDirection* {.pure.} = enum
## PagingDirection determines the direction of pagination
BACKWARD = uint32(0)
FORWARD = uint32(1)
PagingInfo* = object
## This type holds the information needed for the pagination
pageSize*: uint64
cursor*: Index
direction*: PagingDirection
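Equality on Index deliberately ignores receiverTime (see the == comment above), so two indices computed for the same message at different receive times compare equal; this is what lets the store skip duplicates when resuming. A small sketch under that reading, assuming WakuMessage and the time utilities are imported:

let msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("1"))

let a = Index.compute(msg, receivedTime = getNanosecondTime(1.0),
                      pubsubTopic = "/waku/2/default-waku/proto")
let b = Index.compute(msg, receivedTime = getNanosecondTime(2.0),
                      pubsubTopic = "/waku/2/default-waku/proto")

assert a == b  # digest, senderTime and pubsubTopic match; receiverTime plays no role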