feat(archive): add waku archive implementation

Lorenzo Delgado 2022-11-23 10:08:00 +01:00 committed by GitHub
parent 71cb709a23
commit a9a3e77685
34 changed files with 751 additions and 4580 deletions

View File

@@ -445,7 +445,7 @@ when isMainModule:
waitFor connectToNodes(bridge.nodev2, conf.staticnodesV2)
if conf.storenode != "":
mountStoreClient(bridge.nodev2, store=nil)
mountStoreClient(bridge.nodev2)
setStorePeer(bridge.nodev2, conf.storenode)
if conf.filternode != "":

View File

@@ -205,7 +205,7 @@ type
name: "storenode" }: string
storeMessageRetentionPolicy* {.
desc: "Message store retention policy. Time retention policy: 'time:<seconds>'. Capacity retention policy: 'capacity:<count>'",
desc: "Message store retention policy. Time retention policy: 'time:<seconds>'. Capacity retention policy: 'capacity:<count>'. Set to 'none' to disable.",
defaultValue: "time:" & $2.days.seconds,
name: "store-message-retention-policy" }: string
@@ -486,7 +486,7 @@ let DbUrlRegex = re"^[\w\+]+:\/\/[\w\/\\\.\:\@]+$"
proc validateDbUrl*(val: string): ConfResult[string] =
let val = val.strip()
if val == "" or val.match(DbUrlRegex):
if val == "" or val == "none" or val.match(DbUrlRegex):
ok(val)
else:
err("invalid 'db url' option format: " & val)
@@ -497,7 +497,7 @@ let StoreMessageRetentionPolicyRegex = re"^\w+:\w+$"
proc validateStoreMessageRetentionPolicy*(val: string): ConfResult[string] =
let val = val.strip()
if val == "" or val.match(StoreMessageRetentionPolicyRegex):
if val == "" or val == "none" or val.match(StoreMessageRetentionPolicyRegex):
ok(val)
else:
err("invalid 'store message retention policy' option format: " & val)

View File

@@ -28,16 +28,16 @@ import
../../waku/v2/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
../../waku/v2/node/dnsdisc/waku_dnsdisc,
../../waku/v2/node/discv5/waku_discv5,
../../waku/v2/node/message_store/queue_store,
../../waku/v2/node/message_store/sqlite_store,
../../waku/v2/node/message_store/sqlite_store/migrations as message_store_sqlite_migrations,
../../waku/v2/node/message_store/message_retention_policy,
../../waku/v2/node/message_store/message_retention_policy_capacity,
../../waku/v2/node/message_store/message_retention_policy_time,
../../waku/v2/node/wakuswitch,
../../waku/v2/node/waku_node,
../../waku/v2/node/waku_metrics,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_archive,
../../waku/v2/protocol/waku_archive/driver/queue_driver,
../../waku/v2/protocol/waku_archive/driver/sqlite_driver,
../../waku/v2/protocol/waku_archive/driver/sqlite_driver/migrations as archive_driver_sqlite_migrations,
../../waku/v2/protocol/waku_archive/retention_policy,
../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_capacity,
../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_time,
../../waku/v2/protocol/waku_peer_exchange,
../../waku/v2/utils/peers,
../../waku/v2/utils/wakuenr,
@@ -60,7 +60,7 @@ type SetupResult[T] = Result[T, string]
proc setupDatabaseConnection(dbUrl: string): SetupResult[Option[SqliteDatabase]] =
## dbUrl mimics SQLAlchemy Database URL schema
## See: https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls
if dbUrl == "":
if dbUrl == "" or dbUrl == "none":
return ok(none(SqliteDatabase))
let dbUrlParts = dbUrl.split("://", 1)
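For illustration, a small sketch of the URL forms this proc accepts (the sqlite path is a made-up example, not taken from the commit):

  import std/strutils

  # "" and "none" both mean "no database": the caller falls back to the
  # in-memory archive driver instead of SQLite.
  # Any other value must be an SQLAlchemy-style URL, e.g. "sqlite://store.sqlite3".
  let dbUrlParts = "sqlite://store.sqlite3".split("://", 1)
  assert dbUrlParts[0] == "sqlite"         # engine
  assert dbUrlParts[1] == "store.sqlite3"  # database path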
@@ -118,23 +118,9 @@ proc setupPeerStorage(): SetupResult[Option[WakuPeerStorage]] =
ok(some(res.value))
proc setupMessagesStore(db: Option[SqliteDatabase], storeCapacity: int = high(int)): SetupResult[MessageStore] =
if db.isSome():
debug "setting up sqlite-only message store"
let res = SqliteStore.init(db.get())
if res.isErr():
return err("failed to init sqlite message store: " & res.error)
return ok(res.value)
else:
debug "setting up in-memory message store"
let store = StoreQueueRef.new(storeCapacity)
return ok(store)
proc setupMessageStoreRetentionPolicy(retentionPolicy: string): SetupResult[Option[MessageRetentionPolicy]] =
if retentionPolicy == "":
return ok(none(MessageRetentionPolicy))
proc setupWakuArchiveRetentionPolicy(retentionPolicy: string): SetupResult[Option[RetentionPolicy]] =
if retentionPolicy == "" or retentionPolicy == "none":
return ok(none(RetentionPolicy))
let rententionPolicyParts = retentionPolicy.split(":", 1)
let
@@ -149,7 +135,7 @@ proc setupMessageStoreRetentionPolicy(retentionPolicy: string): SetupResult[Opti
except ValueError:
return err("invalid time retention policy argument")
let retPolicy: MessageRetentionPolicy = TimeRetentionPolicy.init(retentionTimeSeconds)
let retPolicy: RetentionPolicy = TimeRetentionPolicy.init(retentionTimeSeconds)
return ok(some(retPolicy))
elif policy == "capacity":
@@ -159,13 +145,13 @@ proc setupMessageStoreRetentionPolicy(retentionPolicy: string): SetupResult[Opti
except ValueError:
return err("invalid capacity retention policy argument")
let retPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity)
let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity)
return ok(some(retPolicy))
else:
return err("unknown retention policy")
proc setupMessageStorage(dbUrl: string, vacuum: bool, migrate: bool): SetupResult[MessageStore] =
proc setupWakuArchiveDriver(dbUrl: string, vacuum: bool, migrate: bool): SetupResult[ArchiveDriver] =
let db = ?setupDatabaseConnection(dbUrl)
if db.isSome():
@@ -179,10 +165,20 @@ proc setupMessageStorage(dbUrl: string, vacuum: bool, migrate: bool): SetupResul
# Database migration
if migrate:
?message_store_sqlite_migrations.migrate(db.get())
?archive_driver_sqlite_migrations.migrate(db.get())
# TODO: Extract capacity from `messageRetentionPolicy`
return setupMessagesStore(db, storeCapacity=high(int))
if db.isSome():
debug "setting up sqlite waku archive driver"
let res = SqliteDriver.new(db.get())
if res.isErr():
return err("failed to init sqlite archive driver: " & res.error)
ok(res.value)
else:
debug "setting up in-memory waku archive driver"
let driver = QueueDriver.new() # Defaults to a capacity of 25,000 messages
ok(driver)
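In short, the driver now follows the database URL; a usage sketch (the sqlite path is illustrative):

  # SQLite-backed archive driver, optionally vacuumed and migrated:
  let persistent = setupWakuArchiveDriver("sqlite://store.sqlite3",
                                          vacuum=false, migrate=true)

  # "none" (or "") skips the database entirely and yields the in-memory
  # QueueDriver with its default capacity:
  let ephemeral = setupWakuArchiveDriver("none", vacuum=false, migrate=false)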
proc retrieveDynamicBootstrapNodes(dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[ValidIpAddress]): SetupResult[seq[RemotePeerInfo]] =
@@ -318,8 +314,8 @@ proc initNode(conf: WakuNodeConf,
ok(node)
proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
mStore: Option[MessageStore],
mStoreRetentionPolicy: Option[MessageRetentionPolicy]): Future[SetupResult[void]] {.async.} =
archiveDriver: Option[ArchiveDriver],
archiveRetentionPolicy: Option[RetentionPolicy]): Future[SetupResult[void]] {.async.} =
## Setup configured protocols on an existing Waku v2 node.
## Optionally include persistent message storage.
## No protocols are started yet.
@@ -385,27 +381,25 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
except:
return err("failed to mount waku swap protocol: " & getCurrentExceptionMsg())
# Store setup
if conf.store:
# Archive setup
let messageValidator: MessageValidator = DefaultMessageValidator()
mountArchive(node, archiveDriver, messageValidator=some(messageValidator), retentionPolicy=archiveRetentionPolicy)
# Store setup
try:
# TODO: Decouple message store and message retention policy from waku store protocol object
let mStorage = if mStore.isNone(): nil
else: mStore.get()
await mountStore(node, mStorage, retentionPolicy=mStoreRetentionPolicy)
await mountStore(node)
except:
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())
# TODO: Move this to storage setup phase
if mStoreRetentionPolicy.isSome():
if archiveRetentionPolicy.isSome():
executeMessageRetentionPolicy(node)
startMessageRetentionPolicyPeriodicTask(node, interval=MessageStoreDefaultRetentionPolicyInterval)
startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval)
if conf.storenode != "":
try:
# TODO: Use option instead of nil in store client
let mStorage = if mStore.isNone(): nil
else: mStore.get()
mountStoreClient(node, store=mStorage)
mountStoreClient(node)
setStorePeer(node, conf.storenode)
except:
return err("failed to set node waku store peer: " & getCurrentExceptionMsg())
@@ -504,6 +498,7 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,
return ok()
when defined(waku_exp_store_resume):
proc resumeMessageStore(node: WakuNode, address: string): Future[SetupResult[void]] {.async.} =
# Resume historical messages; this has to be called after the node has started
if address != "":
@@ -584,9 +579,9 @@ when isMainModule:
error "failed to setup peer store", error=peerStoreRes.error
waku_node_errors.inc(labelValues = ["init_store_failure"])
## Message store
var messageStore = none(MessageStore)
var messageStoreRetentionPolicy = none(MessageRetentionPolicy)
## Waku archive
var archiveDriver = none(ArchiveDriver)
var archiveRetentionPolicy = none(RetentionPolicy)
if conf.store:
# Message storage
@@ -595,11 +590,11 @@ when isMainModule:
error "failed to configure the message store database connection", error=dbUrlValidationRes.error
quit(QuitFailure)
let messageStoreRes = setupMessageStorage(dbUrlValidationRes.get(), vacuum=conf.storeMessageDbVacuum, migrate=conf.storeMessageDbMigration)
if messageStoreRes.isOk():
messageStore = some(messageStoreRes.get())
let archiveDriverRes = setupWakuArchiveDriver(dbUrlValidationRes.get(), vacuum=conf.storeMessageDbVacuum, migrate=conf.storeMessageDbMigration)
if archiveDriverRes.isOk():
archiveDriver = some(archiveDriverRes.get())
else:
error "failed to configure message store", error=messageStoreRes.error
error "failed to configure archive driver", error=archiveDriverRes.error
quit(QuitFailure)
# Message store retention policy
@@ -608,17 +603,17 @@ when isMainModule:
error "invalid store message retention policy configuration", error=storeMessageRetentionPolicyRes.error
quit(QuitFailure)
let messageStoreRetentionPolicyRes = setupMessageStoreRetentionPolicy(storeMessageRetentionPolicyRes.get())
if messageStoreRetentionPolicyRes.isOk():
messageStoreRetentionPolicy = messageStoreRetentionPolicyRes.get()
let archiveRetentionPolicyRes = setupWakuArchiveRetentionPolicy(storeMessageRetentionPolicyRes.get())
if archiveRetentionPolicyRes.isOk():
archiveRetentionPolicy = archiveRetentionPolicyRes.get()
else:
error "failed to configure the message retention policy", error=messageStoreRetentionPolicyRes.error
error "failed to configure the message retention policy", error=archiveRetentionPolicyRes.error
quit(QuitFailure)
# TODO: Move retention policy execution here
# if messageStoreRetentionPolicy.isSome():
# if archiveRetentionPolicy.isSome():
# executeMessageRetentionPolicy(node)
# startMessageRetentionPolicyPeriodicTask(node, interval=MessageStoreDefaultRetentionPolicyInterval)
# startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval)
debug "2/7 Retrieve dynamic bootstrap nodes"
@@ -643,7 +638,7 @@ when isMainModule:
debug "4/7 Mounting protocols"
let setupProtocolsRes = waitFor setupProtocols(node, conf, messageStore, messageStoreRetentionPolicy)
let setupProtocolsRes = waitFor setupProtocols(node, conf, archiveDriver, archiveRetentionPolicy)
if setupProtocolsRes.isErr():
error "4/7 Mounting protocols failed. Continuing in current state.", error=setupProtocolsRes.error
@@ -653,6 +648,8 @@ when isMainModule:
if startNodeRes.isErr():
error "5/7 Starting node and mounted protocols failed. Continuing in current state.", error=startNodeRes.error
when defined(waku_exp_store_resume):
# Resume message store on boot
if conf.storeResumePeer != "":
let resumeMessageStoreRes = waitFor resumeMessageStore(node, conf.storeResumePeer)

View File

@@ -13,14 +13,7 @@ import
./v2/waku_archive/test_driver_sqlite_query,
./v2/waku_archive/test_driver_sqlite,
./v2/waku_archive/test_retention_policy,
./v2/waku_archive/test_waku_archive,
# TODO: Remove with the implementation
./v2/test_message_store_queue_index,
./v2/test_message_store_queue_pagination,
./v2/test_message_store_queue,
./v2/test_message_store_sqlite_query,
./v2/test_message_store_sqlite
./v2/waku_archive/test_waku_archive
import
# Waku v2 tests
@@ -29,9 +22,6 @@ import
# Waku Store
./v2/test_waku_store_rpc_codec,
./v2/test_waku_store,
./v2/test_waku_store_client,
# TODO: Re-enable store resume test cases (#1282)
# ./v2/test_waku_store_resume,
./v2/test_wakunode_store,
# Waku LightPush
./v2/test_waku_lightpush,
@@ -65,6 +55,9 @@ import
# Utils
./v2/test_utils_keyfile
## Experimental
when defined(rln):
import
./v2/test_waku_rln_relay,
@@ -72,7 +65,11 @@ when defined(rln):
when defined(onchain_rln):
import ./v2/test_waku_rln_relay_onchain
when defined(waku_exp_store_resume):
# TODO: Review store resume test cases (#1282)
import ./v2/test_waku_store_resume
# TODO Only enable this once swap module is integrated more nicely as a dependency, i.e. as submodule with CI etc
# TODO: Only enable this once swap module is integrated more nicely as a dependency, i.e. as submodule with CI etc
# For PoC execute it manually and run separate module here: https://github.com/vacp2p/swap-contracts-module
# ./v2/test_waku_swap_contracts

View File

@@ -14,7 +14,6 @@ import
libp2p/protocols/pubsub/rpc/message
import
../../waku/v1/node/rpc/hexstrings,
../../waku/v2/node/message_store/queue_store,
../../waku/v2/node/waku_node,
../../waku/v2/node/jsonrpc/[store_api,
relay_api,
@@ -24,6 +23,8 @@ import
private_api],
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_relay,
../../waku/v2/protocol/waku_archive,
../../waku/v2/protocol/waku_archive/driver/queue_driver,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_store/rpc,
../../waku/v2/protocol/waku_swap/waku_swap,
@@ -38,6 +39,14 @@ template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0]
const sigPath = sourceDir / ParDir / ParDir / "waku" / "v2" / "node" / "jsonrpc" / "jsonrpc_callsigs.nim"
createRpcSigs(RpcHttpClient, sigPath)
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] =
  ## Test helper: derive the message digest and received-time the same way
  ## the archive does, then insert the message through the driver.
  let
    digest = waku_archive.computeDigest(message)
    receivedTime = if message.timestamp > 0: message.timestamp
                   else: getNanosecondTime(getTime().toUnixFloat())
  store.put(pubsubTopic, message, digest, receivedTime)
procSuite "Waku v2 JSON-RPC API":
let
rng = crypto.newRng()
@@ -234,9 +243,10 @@ procSuite "Waku v2 JSON-RPC API":
key = crypto.PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.new(key)
let store = StoreQueueRef.new()
await node.mountStore(store=store)
node.mountStoreClient(store=store)
let driver: ArchiveDriver = QueueDriver.new()
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
await node.mountStore()
node.mountStoreClient()
var listenSwitch = newStandardSwitch(some(key))
await listenSwitch.start()
@@ -247,20 +257,21 @@
listenSwitch.mount(node.wakuStore)
# Now prime it with some history before tests
var
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: 0),
WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic, timestamp: 1),
WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic, timestamp: 2),
WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic, timestamp: 3),
WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic, timestamp: 4),
WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic, timestamp: 5),
WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic, timestamp: 6),
WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic, timestamp: 7),
WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic, timestamp: 8),
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: 9)]
let msgList = @[
fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("2"), ts=0),
fakeWakuMessage(@[byte 1], ts=1),
fakeWakuMessage(@[byte 2], ts=2),
fakeWakuMessage(@[byte 3], ts=3),
fakeWakuMessage(@[byte 4], ts=4),
fakeWakuMessage(@[byte 5], ts=5),
fakeWakuMessage(@[byte 6], ts=6),
fakeWakuMessage(@[byte 7], ts=7),
fakeWakuMessage(@[byte 8], ts=8),
fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("2"), ts=9)
]
for wakuMsg in msgList:
require node.wakuStore.store.put(DefaultPubsubTopic, wakuMsg).isOk()
for msg in msgList:
require driver.put(DefaultPubsubTopic, msg).isOk()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
@@ -455,9 +466,10 @@ procSuite "Waku v2 JSON-RPC API":
await node.mountFilter()
await node.mountFilterClient()
await node.mountSwap()
let store = StoreQueueRef.new()
await node.mountStore(store=store)
node.mountStoreClient(store=store)
let driver: ArchiveDriver = QueueDriver.new()
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
await node.mountStore()
node.mountStoreClient()
# Create and set some peers
let

View File

@@ -1,185 +0,0 @@
{.used.}
import
std/[options, sequtils, algorithm],
stew/results,
testutils/unittests
import
../../waku/v2/node/message_store/queue_store,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,
../../waku/v2/utils/time
# Helper functions
proc genIndexedWakuMessage(i: int8): IndexedWakuMessage =
## Use i to generate an IndexedWakuMessage
var data {.noinit.}: array[32, byte]
for x in data.mitems: x = i.byte
let
message = WakuMessage(payload: @[byte i], timestamp: Timestamp(i))
cursor = Index(
receiverTime: Timestamp(i),
senderTime: Timestamp(i),
digest: MessageDigest(data: data),
pubsubTopic: "test-pubsub-topic"
)
IndexedWakuMessage(msg: message, index: cursor)
proc getPrepopulatedTestStore(unsortedSet: auto, capacity: int): StoreQueueRef =
let store = StoreQueueRef.new(capacity)
for i in unsortedSet:
let message = genIndexedWakuMessage(i.int8)
discard store.add(message)
store
procSuite "Sorted store queue":
test "Store capacity - add a message over the limit":
## Given
let capacity = 5
let store = StoreQueueRef.new(capacity)
## When
# Fill up the queue
for i in 1..capacity:
let message = genIndexedWakuMessage(i.int8)
require(store.add(message).isOk())
# Add one more. Capacity should not be exceeded
let message = genIndexedWakuMessage(capacity.int8 + 1)
require(store.add(message).isOk())
## Then
check:
store.len == capacity
test "Store capacity - add message older than oldest in the queue":
## Given
let capacity = 5
let store = StoreQueueRef.new(capacity)
## When
# Fill up the queue
for i in 1..capacity:
let message = genIndexedWakuMessage(i.int8)
require(store.add(message).isOk())
# Attempt to add message with older value than oldest in queue should fail
let
oldestTimestamp = store.first().get().index.senderTime
message = genIndexedWakuMessage(oldestTimestamp.int8 - 1)
addRes = store.add(message)
## Then
check:
addRes.isErr()
addRes.error() == "too_old"
check:
store.len == capacity
test "Store queue sort-on-insert works":
## Given
let
capacity = 5
unsortedSet = [5,1,3,2,4]
let store = getPrepopulatedTestStore(unsortedSet, capacity)
# Walk forward through the set and verify ascending order
var prevSmaller = genIndexedWakuMessage(min(unsortedSet).int8 - 1).index
for i in store.fwdIterator:
let (index, indexedWakuMessage) = i
check cmp(index, prevSmaller) > 0
prevSmaller = index
# Walk backward through the set and verify descending order
var prevLarger = genIndexedWakuMessage(max(unsortedSet).int8 + 1).index
for i in store.bwdIterator:
let (index, indexedWakuMessage) = i
check cmp(index, prevLarger) < 0
prevLarger = index
test "access first item from store queue":
## Given
let
capacity = 5
unsortedSet = [5,1,3,2,4]
let store = getPrepopulatedTestStore(unsortedSet, capacity)
## When
let firstRes = store.first()
## Then
check:
firstRes.isOk()
let first = firstRes.tryGet()
check:
first.msg.timestamp == Timestamp(1)
test "get first item from empty store should fail":
## Given
let capacity = 5
let store = StoreQueueRef.new(capacity)
## When
let firstRes = store.first()
## Then
check:
firstRes.isErr()
firstRes.error() == "Not found"
test "access last item from store queue":
## Given
let
capacity = 5
unsortedSet = [5,1,3,2,4]
let store = getPrepopulatedTestStore(unsortedSet, capacity)
## When
let lastRes = store.last()
## Then
check:
lastRes.isOk()
let last = lastRes.tryGet()
check:
last.msg.timestamp == Timestamp(5)
test "get last item from empty store should fail":
## Given
let capacity = 5
let store = StoreQueueRef.new(capacity)
## When
let lastRes = store.last()
## Then
check:
lastRes.isErr()
lastRes.error() == "Not found"
test "verify if store queue contains an index":
## Given
let
capacity = 5
unsortedSet = [5,1,3,2,4]
let store = getPrepopulatedTestStore(unsortedSet, capacity)
let
existingIndex = genIndexedWakuMessage(4).index
nonExistingIndex = genIndexedWakuMessage(99).index
## Then
check:
store.contains(existingIndex) == true
store.contains(nonExistingIndex) == false

View File

@@ -1,183 +0,0 @@
{.used.}
import
std/times,
stew/byteutils,
testutils/unittests,
nimcrypto
import
../../waku/v2/protocol/waku_message,
../../waku/v2/utils/time,
../../waku/v2/node/message_store/queue_store/index,
./testlib/common
## Helpers
proc getTestTimestamp(offset=0): Timestamp =
let now = getNanosecondTime(epochTime() + float(offset))
Timestamp(now)
proc hashFromStr(input: string): MDigest[256] =
var ctx: sha256
ctx.init()
ctx.update(input.toBytes())
let hashed = ctx.finish()
ctx.clear()
return hashed
suite "Pagination - Index":
## Test vars
let
smallIndex1 = Index(digest: hashFromStr("1234"),
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000))
smallIndex2 = Index(digest: hashFromStr("1234567"), # digest is less significant than senderTime
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000))
largeIndex1 = Index(digest: hashFromStr("1234"),
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(9000)) # only senderTime differ from smallIndex1
largeIndex2 = Index(digest: hashFromStr("12345"), # only digest differs from smallIndex1
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000))
eqIndex1 = Index(digest: hashFromStr("0003"),
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(54321))
eqIndex2 = Index(digest: hashFromStr("0003"),
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(54321))
eqIndex3 = Index(digest: hashFromStr("0003"),
receiverTime: getNanosecondTime(9999), # receiverTime difference should have no effect on comparisons
senderTime: getNanosecondTime(54321))
diffPsTopic = Index(digest: hashFromStr("1234"),
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000),
pubsubTopic: "zzzz")
noSenderTime1 = Index(digest: hashFromStr("1234"),
receiverTime: getNanosecondTime(1100),
senderTime: getNanosecondTime(0),
pubsubTopic: "zzzz")
noSenderTime2 = Index(digest: hashFromStr("1234"),
receiverTime: getNanosecondTime(10000),
senderTime: getNanosecondTime(0),
pubsubTopic: "zzzz")
noSenderTime3 = Index(digest: hashFromStr("1234"),
receiverTime: getNanosecondTime(1200),
senderTime: getNanosecondTime(0),
pubsubTopic: "aaaa")
noSenderTime4 = Index(digest: hashFromStr("0"),
receiverTime: getNanosecondTime(1200),
senderTime: getNanosecondTime(0),
pubsubTopic: "zzzz")
test "Index comparison":
# Index comparison with senderTime diff
check:
cmp(smallIndex1, largeIndex1) < 0
cmp(smallIndex2, largeIndex1) < 0
# Index comparison with digest diff
check:
cmp(smallIndex1, smallIndex2) < 0
cmp(smallIndex1, largeIndex2) < 0
cmp(smallIndex2, largeIndex2) > 0
cmp(largeIndex1, largeIndex2) > 0
# Index comparison when equal
check:
cmp(eqIndex1, eqIndex2) == 0
# pubsubTopic difference
check:
cmp(smallIndex1, diffPsTopic) < 0
# receiverTime diff plays no role when senderTime set
check:
cmp(eqIndex1, eqIndex3) == 0
# receiverTime diff plays no role when digest/pubsubTopic equal
check:
cmp(noSenderTime1, noSenderTime2) == 0
# sort on receiverTime with no senderTimestamp and unequal pubsubTopic
check:
cmp(noSenderTime1, noSenderTime3) < 0
# sort on receiverTime with no senderTimestamp and unequal digest
check:
cmp(noSenderTime1, noSenderTime4) < 0
# sort on receiverTime if no senderTimestamp on only one side
check:
cmp(smallIndex1, noSenderTime1) < 0
cmp(noSenderTime1, smallIndex1) > 0 # Test symmetry
cmp(noSenderTime2, eqIndex3) < 0
cmp(eqIndex3, noSenderTime2) > 0 # Test symmetry
test "Index equality":
# Exactly equal
check:
eqIndex1 == eqIndex2
# Receiver time plays no role, even without sender time
check:
eqIndex1 == eqIndex3
noSenderTime1 == noSenderTime2 # only receiver time differs, indices are equal
noSenderTime1 != noSenderTime3 # pubsubTopics differ
noSenderTime1 != noSenderTime4 # digests differ
# Unequal sender time
check:
smallIndex1 != largeIndex1
# Unequal digest
check:
smallIndex1 != smallIndex2
# Unequal hash and digest
check:
smallIndex1 != eqIndex1
# Unequal pubsubTopic
check:
smallIndex1 != diffPsTopic
test "Index computation should not be empty":
## Given
let ts = getTestTimestamp()
let wm = WakuMessage(payload: @[byte 1, 2, 3], timestamp: ts)
## When
let ts2 = getTestTimestamp() + 10
let index = Index.compute(wm, ts2, DefaultContentTopic)
## Then
check:
index.digest.data.len != 0
index.digest.data.len == 32 # sha2 output length in bytes
index.receiverTime == ts2 # the receiver timestamp should be a non-zero value
index.senderTime == ts
index.pubsubTopic == DefaultContentTopic
test "Index digest of two identical messsage should be the same":
## Given
let topic = ContentTopic("test-content-topic")
let
wm1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic)
wm2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic)
## When
let ts = getTestTimestamp()
let
index1 = Index.compute(wm1, ts, DefaultPubsubTopic)
index2 = Index.compute(wm2, ts, DefaultPubsubTopic)
## Then
check:
index1.digest == index2.digest

View File

@@ -1,403 +0,0 @@
{.used.}
import
std/[options, sequtils, algorithm],
testutils/unittests,
libp2p/protobuf/minprotobuf
import
../../waku/v2/node/message_store/queue_store/queue_store {.all.},
../../waku/v2/node/message_store/queue_store/index,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_message,
../../waku/v2/utils/time,
./testlib/common
proc getTestStoreQueue(numMessages: int): StoreQueueRef =
let testStoreQueue = StoreQueueRef.new(numMessages)
var data {.noinit.}: array[32, byte]
for x in data.mitems: x = 1
for i in 0..<numMessages:
let msg = IndexedWakuMessage(
msg: WakuMessage(payload: @[byte i], timestamp: Timestamp(i)),
index: Index(
receiverTime: Timestamp(i),
senderTime: Timestamp(i),
digest: MessageDigest(data: data)
)
)
discard testStoreQueue.add(msg)
return testStoreQueue
procSuite "Queue store - pagination":
let store = getTestStoreQueue(10)
let
indexList: seq[Index] = toSeq(store.fwdIterator()).mapIt(it[0])
msgList: seq[WakuMessage] = toSeq(store.fwdIterator()).mapIt(it[1].msg)
test "Forward pagination - normal pagination":
## Given
let
pageSize: uint64 = 2
cursor: Option[Index] = some(indexList[3])
forward: bool = true
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let data = page.tryGet().mapIt(it[1])
check:
data.len == 2
data == msgList[4..5]
test "Forward pagination - initial pagination request with an empty cursor":
## Given
let
pageSize: uint64 = 2
cursor: Option[Index] = none(Index)
forward: bool = true
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let data = page.tryGet().mapIt(it[1])
check:
data.len == 2
data == msgList[0..1]
test "Forward pagination - initial pagination request with an empty cursor to fetch the entire history":
## Given
let
pageSize: uint64 = 13
cursor: Option[Index] = none(Index)
forward: bool = true
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let data = page.tryGet().mapIt(it[1])
check:
data.len == 10
data == msgList[0..9]
test "Forward pagination - empty msgList":
## Given
let store = getTestStoreQueue(0)
let
pageSize: uint64 = 2
cursor: Option[Index] = none(Index)
forward: bool = true
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let data = page.tryGet().mapIt(it[1])
check:
data.len == 0
test "Forward pagination - page size larger than the remaining messages":
## Given
let
pageSize: uint64 = 10
cursor: Option[Index] = some(indexList[3])
forward: bool = true
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let data = page.tryGet().mapIt(it[1])
check:
data.len == 6
data == msgList[4..9]
test "Forward pagination - page size larger than the maximum allowed page size":
## Given
let
pageSize: uint64 = MaxPageSize + 1
cursor: Option[Index] = some(indexList[3])
forward: bool = true
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let data = page.tryGet().mapIt(it[1])
check:
uint64(data.len) <= MaxPageSize
test "Forward pagination - cursor pointing to the end of the message list":
## Given
let
pageSize: uint64 = 10
cursor: Option[Index] = some(indexList[9])
forward: bool = true
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let data = page.tryGet().mapIt(it[1])
check:
data.len == 0
test "Forward pagination - invalid cursor":
## Given
let msg = fakeWakuMessage(payload= @[byte 10])
let index = HistoryCursor(
pubsubTopic: DefaultPubsubTopic,
senderTime: msg.timestamp,
storeTime: msg.timestamp,
digest: computeDigest(msg)
).toIndex()
let
pageSize: uint64 = 10
cursor: Option[Index] = some(index)
forward: bool = true
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let error = page.tryError()
check:
error == StoreQueueErrorKind.INVALID_CURSOR
test "Forward pagination - initial paging query over a message list with one message":
## Given
let store = getTestStoreQueue(1)
let
pageSize: uint64 = 10
cursor: Option[Index] = none(Index)
forward: bool = true
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let data = page.tryGet().mapIt(it[1])
check:
data.len == 1
test "Forward pagination - pagination over a message list with one message":
## Given
let store = getTestStoreQueue(1)
let
pageSize: uint64 = 10
cursor: Option[Index] = some(indexList[0])
forward: bool = true
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let data = page.tryGet().mapIt(it[1])
check:
data.len == 0
test "Forward pagination - with pradicate":
## Given
let
pageSize: uint64 = 3
cursor: Option[Index] = none(Index)
forward = true
proc onlyEvenTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 == 0
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor, predicate=onlyEvenTimes)
## Then
let data = page.tryGet().mapIt(it[1])
check:
data.mapIt(it.timestamp.int) == @[0, 2, 4]
test "Backward pagination - normal pagination":
## Given
let
pageSize: uint64 = 2
cursor: Option[Index] = some(indexList[3])
forward: bool = false
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let data = page.tryGet().mapIt(it[1])
check:
data == msgList[1..2].reversed
test "Backward pagination - empty msgList":
## Given
let store = getTestStoreQueue(0)
let
pageSize: uint64 = 2
cursor: Option[Index] = none(Index)
forward: bool = false
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let data = page.tryGet().mapIt(it[1])
check:
data.len == 0
test "Backward pagination - initial pagination request with an empty cursor":
## Given
let
pageSize: uint64 = 2
cursor: Option[Index] = none(Index)
forward: bool = false
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let data = page.tryGet().mapIt(it[1])
check:
data.len == 2
data == msgList[8..9].reversed
test "Backward pagination - initial pagination request with an empty cursor to fetch the entire history":
## Given
let
pageSize: uint64 = 13
cursor: Option[Index] = none(Index)
forward: bool = false
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let data = page.tryGet().mapIt(it[1])
check:
data.len == 10
data == msgList[0..9].reversed
test "Backward pagination - page size larger than the remaining messages":
## Given
let
pageSize: uint64 = 5
cursor: Option[Index] = some(indexList[3])
forward: bool = false
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let data = page.tryGet().mapIt(it[1])
check:
data == msgList[0..2].reversed
test "Backward pagination - page size larger than the Maximum allowed page size":
## Given
let
pageSize: uint64 = MaxPageSize + 1
cursor: Option[Index] = some(indexList[3])
forward: bool = false
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let data = page.tryGet().mapIt(it[1])
check:
uint64(data.len) <= MaxPageSize
test "Backward pagination - cursor pointing to the begining of the message list":
## Given
let
pageSize: uint64 = 5
cursor: Option[Index] = some(indexList[0])
forward: bool = false
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let data = page.tryGet().mapIt(it[1])
check:
data.len == 0
test "Backward pagination - invalid cursor":
## Given
let msg = fakeWakuMessage(payload= @[byte 10])
let index = HistoryCursor(
pubsubTopic: DefaultPubsubTopic,
senderTime: msg.timestamp,
storeTime: msg.timestamp,
digest: computeDigest(msg)
).toIndex()
let
pageSize: uint64 = 2
cursor: Option[Index] = some(index)
forward: bool = false
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let error = page.tryError()
check:
error == StoreQueueErrorKind.INVALID_CURSOR
test "Backward pagination - initial paging query over a message list with one message":
## Given
let store = getTestStoreQueue(1)
let
pageSize: uint64 = 10
cursor: Option[Index] = none(Index)
forward: bool = false
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let data = page.tryGet().mapIt(it[1])
check:
data.len == 1
test "Backward pagination - paging query over a message list with one message":
## Given
let store = getTestStoreQueue(1)
let
pageSize: uint64 = 10
cursor: Option[Index] = some(indexList[0])
forward: bool = false
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
## Then
let data = page.tryGet().mapIt(it[1])
check:
data.len == 0
test "Backward pagination - with predicate":
## Given
let
pageSize: uint64 = 3
cursor: Option[Index] = none(Index)
forward = false
proc onlyOddTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 != 0
## When
let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor, predicate=onlyOddTimes)
## Then
let data = page.tryGet().mapIt(it[1])
check:
data.mapIt(it.timestamp.int) == @[5, 7, 9].reversed

View File

@@ -1,287 +0,0 @@
{.used.}
import
std/[unittest, options, sequtils],
stew/byteutils,
chronos
import
../../waku/common/sqlite,
../../waku/v2/node/message_store/sqlite_store,
../../waku/v2/node/message_store/message_retention_policy,
../../waku/v2/node/message_store/message_retention_policy_capacity,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,
../../waku/v2/utils/time,
./utils,
./testlib/common
proc newTestDatabase(): SqliteDatabase =
SqliteDatabase.new(":memory:").tryGet()
suite "SQLite message store - init store":
test "init store":
## Given
let database = newTestDatabase()
## When
let resStore = SqliteStore.init(database)
## Then
check:
resStore.isOk()
let store = resStore.tryGet()
check:
not store.isNil()
## Teardown
store.close()
test "init store with prepopulated database with messages older than retention policy":
# TODO: Implement initialization test cases
discard
test "init store with prepopulated database with messsage count greater than max capacity":
# TODO: Implement initialization test cases
discard
# TODO: Add test cases to cover the store retention time functionality
suite "SQLite message store - insert messages":
test "insert a message":
## Given
const contentTopic = "test-content-topic"
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
let message = fakeWakuMessage(contentTopic=contentTopic)
## When
let resPut = store.put(DefaultPubsubTopic, message)
## Then
check:
resPut.isOk()
let storedMsg = store.getAllMessages().tryGet()
check:
storedMsg.len == 1
storedMsg.all do (item: auto) -> bool:
let (pubsubTopic, msg, digest, storeTimestamp) = item
msg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic
## Teardown
store.close()
test "store capacity should be limited":
## Given
const storeCapacity = 5
const contentTopic = "test-content-topic"
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity)
let messages = @[
fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=ts(1)),
fakeWakuMessage(contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage(contentTopic=contentTopic, ts=ts(3)),
fakeWakuMessage(contentTopic=contentTopic, ts=ts(4)),
fakeWakuMessage(contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage(contentTopic=contentTopic, ts=ts(6))
]
## When
for msg in messages:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require retentionPolicy.execute(store).isOk()
## Then
let storedMsg = store.getAllMessages().tryGet()
check:
storedMsg.len == storeCapacity
storedMsg.all do (item: auto) -> bool:
let (pubsubTopic, msg, digest, storeTimestamp) = item
msg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic
## Teardown
store.close()
# TODO: Review the following suite test cases
suite "Message Store":
test "set and get works":
## Given
let
database = newTestDatabase()
store = SqliteStore.init(database).get()
let
t1 = ts(0)
t2 = ts(1)
t3 = high(int64)
var msgs = @[
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic, version: uint32(0), timestamp: t1),
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: DefaultContentTopic, version: uint32(1), timestamp: t2),
# high(uint32) is the largest value that fits in uint32; this makes sure there is no overflow in the storage
WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: DefaultContentTopic, version: high(uint32), timestamp: t3),
]
var indexes: seq[HistoryCursor] = @[]
for msg in msgs:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let cursor = HistoryCursor(
pubsubTopic: DefaultPubsubTopic,
senderTime: msg.timestamp,
storeTime: msg.timestamp,
digest: computeDigest(msg)
)
indexes.add(cursor)
## When
let res = store.getAllMessages()
## Then
check:
res.isOk()
let result = res.value
check:
result.len == 3
# flags for version
var v0Flag, v1Flag, vMaxFlag: bool = false
# flags for sender timestamp
var t1Flag, t2Flag, t3Flag: bool = false
# flags for receiver timestamp
var rt1Flag, rt2Flag, rt3Flag: bool = false
for (pubsubTopic, msg, digest, receiverTimestamp) in result:
check:
pubsubTopic == DefaultPubsubTopic
# check correct retrieval of receiver timestamps
if receiverTimestamp == indexes[0].storeTime: rt1Flag = true
if receiverTimestamp == indexes[1].storeTime: rt2Flag = true
if receiverTimestamp == indexes[2].storeTime: rt3Flag = true
check:
msg in msgs
# check the correct retrieval of versions
if msg.version == uint32(0): v0Flag = true
if msg.version == uint32(1): v1Flag = true
if msg.version == high(uint32): vMaxFlag = true
# check correct retrieval of sender timestamps
if msg.timestamp == t1: t1Flag = true
if msg.timestamp == t2: t2Flag = true
if msg.timestamp == t3: t3Flag = true
check:
# check version
v0Flag == true
v1Flag == true
vMaxFlag == true
# check sender timestamp
t1Flag == true
t2Flag == true
t3Flag == true
# check receiver timestamp
rt1Flag == true
rt2Flag == true
rt3Flag == true
## Cleanup
store.close()
test "set and get user version":
## Given
let
database = newTestDatabase()
store = SqliteStore.init(database).get()
## When
let resSetVersion = database.setUserVersion(5)
let resGetVersion = database.getUserVersion()
## Then
check:
resSetVersion.isOk()
resGetVersion.isOk()
let version = resGetVersion.tryGet()
check:
version == 5
## Cleanup
store.close()
# TODO: Move this test case to retention policy test suite
test "number of messages retrieved by getAll is bounded by storeCapacity":
let capacity = 10
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)
for i in 1..capacity:
let msg = WakuMessage(payload: @[byte i], contentTopic: DefaultContentTopic, version: 0, timestamp: Timestamp(i))
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require retentionPolicy.execute(store).isOk()
## Then
# Test limited getAll function when store is at capacity
let resMax = store.getAllMessages()
check:
resMax.isOk()
let response = resMax.tryGet()
let lastMessageTimestamp = response[^1][1].timestamp
check:
response.len == capacity # We retrieved all items
lastMessageTimestamp == Timestamp(capacity) # Returned rows were ordered correctly # TODO: not representative because the timestamp only has second resolution
## Cleanup
store.close()
# TODO: Move this test case to retention policy test suite
test "DB store capacity":
let
contentTopic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"
capacity = 100
overload = 65
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)
for i in 1..capacity+overload:
let msg = WakuMessage(payload: ($i).toBytes(), contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
require store.put(pubsubTopic, msg).isOk()
require retentionPolicy.execute(store).isOk()
# count messages in DB
let numMessages = store.getMessagesCount().tryGet()
check:
# expected number of messages is 120 because
# (capacity = 100) + (half of the overflow window = 15) + (5 messages added after the last delete)
# the window size changes when changing `const maxStoreOverflow = 1.3` in sqlite_store
numMessages == 120
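To make the arithmetic in the comment above concrete, one consistent reading (assuming pruning triggers when the count reaches capacity * 1.3 and deletes down to capacity plus half the overflow window):

  # Threshold: 100 * 1.3 = 130 messages; each prune deletes down to 100 + 30/2 = 115.
  # With 165 inserts, pruning triggers at insert #130, #145 and #160
  # (every 15 inserts after the first), leaving:
  assert 115 + (165 - 160) == 120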
## Teardown
store.close()

View File

@@ -1,642 +0,0 @@
{.used.}
import
std/[options, sequtils, algorithm],
unittest2,
chronos
import
../../waku/common/sqlite,
../../waku/v2/node/message_store/sqlite_store,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,
./utils,
./testlib/common
proc newTestDatabase(): SqliteDatabase =
SqliteDatabase.new(":memory:").tryGet()
suite "message store - history query":
test "single content topic":
## Given
const contentTopic = "test-content-topic"
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
let messages = @[
fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)),
]
for msg in messages:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
maxPageSize=2,
ascendingOrder=true
)
## Then
check:
res.isOk()
let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages.len == 2
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic == contentTopic
filteredMessages == messages[2..3]
## Teardown
store.close()
test "single content topic and descending order":
## Given
const contentTopic = "test-content-topic"
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
let messages = @[
fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)),
]
for msg in messages:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
maxPageSize=2,
ascendingOrder=false
)
## Then
check:
res.isOk()
let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages.len == 2
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic == contentTopic
filteredMessages == messages[6..7].reversed
## Teardown
store.close()
test "multiple content topic":
## Given
const contentTopic1 = "test-content-topic-1"
const contentTopic2 = "test-content-topic-2"
const contentTopic3 = "test-content-topic-3"
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
let messages = @[
fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic1, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic2, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=contentTopic3, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=contentTopic1, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic2, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic3, ts=ts(7)),
]
for msg in messages:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic1, contentTopic2]),
maxPageSize=2,
ascendingOrder=true
)
## Then
check:
res.isOk()
let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages.len == 2
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic in @[contentTopic1, contentTopic2]
filteredMessages == messages[2..3]
## Teardown
store.close()
test "content topic and pubsub topic":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
let messages1 = @[
fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)),
]
for msg in messages1:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let messages2 = @[
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)),
]
for msg in messages2:
require store.put(pubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
pubsubTopic=some(pubsubTopic),
maxPageSize=2,
ascendingOrder=true
)
## Then
check:
res.isOk()
let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages.len == 2
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic == contentTopic
filteredMessages == messages2[0..1]
## Teardown
store.close()
test "content topic and cursor":
## Given
const contentTopic = "test-content-topic"
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
let messages = @[
fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)),
]
for msg in messages:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let cursor = HistoryCursor(
pubsubTopic: DefaultPubsubTopic,
senderTime: messages[4].timestamp,
storeTime: messages[4].timestamp,
digest: computeDigest(messages[4])
)
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
cursor=some(cursor),
maxPageSize=2,
ascendingOrder=true
)
## Then
check:
res.isOk()
let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages.len == 2
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic == contentTopic
filteredMessages == messages[5..6]
## Teardown
store.close()
test "content topic, cursor and descending order":
## Given
const contentTopic = "test-content-topic"
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
let messages = @[
fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)),
]
for msg in messages:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let cursor = HistoryCursor(
pubsubTopic: DefaultPubsubTopic,
senderTime: messages[6].timestamp,
storeTime: messages[6].timestamp,
digest: computeDigest(messages[6])
)
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
cursor=some(cursor),
maxPageSize=2,
ascendingOrder=false
)
## Then
check:
res.isOk()
let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages.len == 2
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic == contentTopic
filteredMessages == messages[4..5].reversed
## Teardown
store.close()
test "content topic, pubsub topic and cursor":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
let messages1 = @[
fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)),
]
for msg in messages1:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let messages2 = @[
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)),
]
for msg in messages2:
require store.put(pubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let cursor = HistoryCursor(
pubsubTopic: DefaultPubsubTopic,
senderTime: messages2[0].timestamp,
storeTime: messages2[0].timestamp,
digest: computeDigest(messages2[0])
)
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
pubsubTopic=some(pubsubTopic),
cursor=some(cursor),
maxPageSize=2,
ascendingOrder=true
)
## Then
check:
res.isOk()
let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages.len == 2
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic == contentTopic
filteredMessages == messages2[0..1]
## Teardown
store.close()
test "single content topic - no results":
## Given
const contentTopic = "test-content-topic"
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
let messages = @[
fakeWakuMessage("MSG-01", contentTopic=DefaultContentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=DefaultContentTopic, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=DefaultContentTopic, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=DefaultContentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=DefaultContentTopic, ts=ts(6)),
]
for msg in messages:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
maxPageSize=2,
ascendingOrder=true
)
## Then
check:
res.isOk()
let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages.len == 0
## Teardown
store.close()
test "content topic and page size":
## Given
let pageSize: uint64 = 50
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
for t in 0..<70:
let msg = fakeWakuMessage("MSG-" & $t, DefaultContentTopic, ts=ts(t))
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[DefaultContentTopic]),
maxPageSize=pageSize,
ascendingOrder=true
)
## Then
check:
res.isOk()
let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages.len == 50
## Teardown
store.close()
test "content topic and page size - not enough messages stored":
## Given
let pageSize: uint64 = 50
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
for t in 0..<40:
let msg = fakeWakuMessage("MSG-" & $t, DefaultContentTopic, ts=ts(t))
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[DefaultContentTopic]),
maxPageSize=pageSize,
ascendingOrder=true
)
## Then
check:
res.isOk()
let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages.len == 40
## Teardown
store.close()
test "single content topic and valid time range":
## Given
const contentTopic = "test-content-topic"
let timeOrigin = now()
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
let messages = @[
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=timeOrigin + 10),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=timeOrigin + 20),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=timeOrigin + 30),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=timeOrigin + 50),
]
for msg in messages:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
startTime=some(timeOrigin + 5),
endTime=some(timeOrigin + 35),
maxPageSize=2,
ascendingOrder=true
)
## Then
check:
res.isOk()
let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages.len == 2
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic == contentTopic
filteredMessages == messages[1..2]
## Teardown
store.close()
test "single content topic and invalid time range - no results":
## Given
const contentTopic = "test-content-topic"
let timeOrigin = now()
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
let messages = @[
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=timeOrigin + 10),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=timeOrigin + 20),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=timeOrigin + 30),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=timeOrigin + 50),
]
for msg in messages:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
startTime=some(timeOrigin + 35),
endTime=some(timeOrigin + 10),
maxPageSize=2,
ascendingOrder=true
)
check:
res.isOk()
let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages.len == 0
## Teardown
store.close()
test "single content topic and only time range start":
## Given
const contentTopic = "test-content-topic"
let timeOrigin = now()
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
let messages = @[
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=timeOrigin + 10),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=timeOrigin + 20),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=timeOrigin + 30),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=timeOrigin + 50),
]
for msg in messages:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
startTime=some(timeOrigin + 15),
ascendingOrder=false
)
check:
res.isOk()
let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages.len == 3
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic == contentTopic
filteredMessages == messages[2..4].reversed
## Teardown
store.close()
test "single content topic, cursor and only time range start":
## Given
const contentTopic = "test-content-topic"
let timeOrigin = now()
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
let messages = @[
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=timeOrigin + 10),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=timeOrigin + 20),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=timeOrigin + 30),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=timeOrigin + 50),
]
for msg in messages:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let cursor = HistoryCursor(
pubsubTopic: DefaultPubsubTopic,
senderTime: messages[3].timestamp,
storeTime: messages[3].timestamp,
digest: computeDigest(messages[3])
)
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
cursor=some(cursor),
startTime=some(timeOrigin + 15),
maxPageSize=2,
ascendingOrder=true
)
## Then
check:
res.isOk()
let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages.len == 1
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic == contentTopic
filteredMessages == @[messages[^1]]
## Teardown
store.close()

View File

@ -1,41 +1,20 @@
{.used.}
import
std/[options, sequtils],
std/options,
testutils/unittests,
chronos,
chronicles,
libp2p/crypto/crypto
import
../../waku/common/sqlite,
../../waku/v2/node/message_store/queue_store,
../../waku/v2/node/message_store/sqlite_store,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_store/client,
../../waku/v2/utils/time,
./testlib/common,
./testlib/switch
proc newTestDatabase(): SqliteDatabase =
SqliteDatabase.new(":memory:").tryGet()
proc newTestMessageStore(): MessageStore =
let database = newTestDatabase()
SqliteStore.init(database).tryGet()
proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): Future[WakuStore] {.async.} =
let
peerManager = PeerManager.new(switch)
rng = crypto.newRng()
proto = WakuStore.new(peerManager, rng, store)
await proto.start()
switch.mount(proto)
return proto
proc newTestWakuStore(switch: Switch, handler: HistoryQueryHandler): Future[WakuStore] {.async.} =
let
@ -48,11 +27,11 @@ proc newTestWakuStore(switch: Switch, handler: HistoryQueryHandler): Future[Waku
return proto
proc newTestWakuStoreClient(switch: Switch, store: MessageStore = nil): WakuStoreClient =
proc newTestWakuStoreClient(switch: Switch): WakuStoreClient =
let
peerManager = PeerManager.new(switch)
rng = crypto.newRng()
WakuStoreClient.new(peerManager, rng, store)
WakuStoreClient.new(peerManager, rng)
suite "Waku Store - query handler":
@ -141,662 +120,3 @@ suite "Waku Store - query handler":
## Cleanup
await allFutures(serverSwitch.stop(), clientSwitch.stop())
procSuite "Waku Store - history query":
## Fixtures
let storeA = block:
let store = newTestMessageStore()
let msgList = @[
fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(0)),
fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(1)),
fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(2)),
fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(3)),
fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(4)),
fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(5)),
fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(6)),
fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(7)),
fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(8)),
fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(9))
]
for msg in msgList:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
store
asyncTest "handle query":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuStore(serverSwitch)
client = newTestWakuStoreClient(clientSwitch)
## Given
let topic = ContentTopic("1")
let
msg1 = fakeWakuMessage(contentTopic=topic)
msg2 = fakeWakuMessage()
server.handleMessage("foo", msg1)
server.handleMessage("foo", msg2)
let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
## When
let req = HistoryQuery(contentTopics: @[topic])
let queryRes = await client.query(req, peer=serverPeerInfo)
## Then
check:
queryRes.isOk()
let response = queryRes.tryGet()
check:
response.messages.len == 1
response.messages == @[msg1]
## Cleanup
await allFutures(serverSwitch.stop(), clientSwitch.stop())
asyncTest "handle query with multiple content filters":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuStore(serverSwitch)
client = newTestWakuStoreClient(clientSwitch)
## Given
let
topic1 = ContentTopic("1")
topic2 = ContentTopic("2")
topic3 = ContentTopic("3")
let
msg1 = fakeWakuMessage(contentTopic=topic1)
msg2 = fakeWakuMessage(contentTopic=topic2)
msg3 = fakeWakuMessage(contentTopic=topic3)
server.handleMessage("foo", msg1)
server.handleMessage("foo", msg2)
server.handleMessage("foo", msg3)
let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
## When
let req = HistoryQuery(contentTopics: @[topic1, topic3])
let queryRes = await client.query(req, peer=serverPeerInfo)
## Then
check:
queryRes.isOk()
let response = queryRes.tryGet()
check:
response.messages.len() == 2
response.messages.anyIt(it == msg1)
response.messages.anyIt(it == msg3)
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "handle query with pubsub topic filter":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuStore(serverSwitch)
client = newTestWakuStoreClient(clientSwitch)
## Given
let
pubsubTopic1 = "queried-topic"
pubsubTopic2 = "non-queried-topic"
let
contentTopic1 = ContentTopic("1")
contentTopic2 = ContentTopic("2")
contentTopic3 = ContentTopic("3")
let
msg1 = fakeWakuMessage(contentTopic=contentTopic1)
msg2 = fakeWakuMessage(contentTopic=contentTopic2)
msg3 = fakeWakuMessage(contentTopic=contentTopic3)
server.handleMessage(pubsubTopic1, msg1)
server.handleMessage(pubsubTopic2, msg2)
server.handleMessage(pubsubTopic2, msg3)
let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
## When
# This query targets: pubsubTopic1 AND (contentTopic1 OR contentTopic3)
let req = HistoryQuery(
pubsubTopic: some(pubsubTopic1),
contentTopics: @[contentTopic1, contentTopic3]
)
let queryRes = await client.query(req, peer=serverPeerInfo)
## Then
check:
queryRes.isOk()
let response = queryRes.tryGet()
check:
response.messages.len() == 1
response.messages.anyIt(it == msg1)
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "handle query with pubsub topic filter - no match":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuStore(serverSwitch)
client = newTestWakuStoreClient(clientSwitch)
## Given
let
pubsubTopic1 = "queried-topic"
pubsubTopic2 = "non-queried-topic"
let
msg1 = fakeWakuMessage()
msg2 = fakeWakuMessage()
msg3 = fakeWakuMessage()
server.handleMessage(pubsubTopic2, msg1)
server.handleMessage(pubsubTopic2, msg2)
server.handleMessage(pubsubTopic2, msg3)
let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
## When
let req = HistoryQuery(pubsubTopic: some(pubsubTopic1))
let res = await client.query(req, peer=serverPeerInfo)
## Then
check:
res.isOk()
let response = res.tryGet()
check:
response.messages.len() == 0
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "handle query with pubsub topic filter - match the entire stored messages":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuStore(serverSwitch)
client = newTestWakuStoreClient(clientSwitch)
## Given
let pubsubTopic = "queried-topic"
let
msg1 = fakeWakuMessage(payload="TEST-1")
msg2 = fakeWakuMessage(payload="TEST-2")
msg3 = fakeWakuMessage(payload="TEST-3")
server.handleMessage(pubsubTopic, msg1)
server.handleMessage(pubsubTopic, msg2)
server.handleMessage(pubsubTopic, msg3)
let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
## When
let req = HistoryQuery(pubsubTopic: some(pubsubTopic))
let res = await client.query(req, peer=serverPeerInfo)
## Then
check:
res.isOk()
let response = res.tryGet()
check:
response.messages.len() == 3
response.messages.anyIt(it == msg1)
response.messages.anyIt(it == msg2)
response.messages.anyIt(it == msg3)
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "handle query with forward pagination":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuStore(serverSwitch)
client = newTestWakuStoreClient(clientSwitch)
## Given
let currentTime = now()
let msgList = @[
WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: currentTime - 9),
WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic, timestamp: currentTime - 8),
WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic, timestamp: currentTime - 7),
WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic, timestamp: currentTime - 6),
WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic, timestamp: currentTime - 5),
WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic, timestamp: currentTime - 4),
WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic, timestamp: currentTime - 3),
WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic, timestamp: currentTime - 2),
WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic, timestamp: currentTime - 1),
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: currentTime)
]
for msg in msgList:
require server.store.put(DefaultPubsubTopic, msg).isOk()
let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
## When
var req = HistoryQuery(
contentTopics: @[DefaultContentTopic],
pageSize: 2,
ascending: true
)
var res = await client.query(req, peer=serverPeerInfo)
require res.isOk()
var
response = res.tryGet()
totalMessages = response.messages.len()
totalQueries = 1
while response.cursor.isSome():
require:
totalQueries <= 4 # Sanity check to guarantee that the test will not run forever
response.messages.len() == 2
response.pageSize == 2
response.ascending == true
req.cursor = response.cursor
# Continue querying
res = await client.query(req, peer=serverPeerInfo)
require res.isOk()
response = res.tryGet()
totalMessages += response.messages.len()
totalQueries += 1
## Then
check:
totalQueries == 4 # 4 queries of pageSize 2
totalMessages == 8 # 8 messages in total
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "handle query with backward pagination":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuStore(serverSwitch)
client = newTestWakuStoreClient(clientSwitch)
## Given
let currentTime = now()
let msgList = @[
WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: currentTime - 9),
WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic, timestamp: currentTime - 8),
WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic, timestamp: currentTime - 7),
WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic, timestamp: currentTime - 6),
WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic, timestamp: currentTime - 5),
WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic, timestamp: currentTime - 4),
WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic, timestamp: currentTime - 3),
WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic, timestamp: currentTime - 2),
WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic, timestamp: currentTime - 1),
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: currentTime)
]
for msg in msgList:
require server.store.put(DefaultPubsubTopic, msg).isOk()
let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
## When
var req = HistoryQuery(
contentTopics: @[DefaultContentTopic],
pageSize: 2,
ascending: false
)
var res = await client.query(req, peer=serverPeerInfo)
require res.isOk()
var
response = res.tryGet()
totalMessages = response.messages.len()
totalQueries = 1
while response.cursor.isSome():
require:
totalQueries <= 4 # Sanity check to guarantee that the test will not run forever
response.messages.len() == 2
response.pageSize == 2
response.ascending == false
req.cursor = response.cursor
# Continue querying
res = await client.query(req, peer=serverPeerInfo)
require res.isOk()
response = res.tryGet()
totalMessages += response.messages.len()
totalQueries += 1
## Then
check:
totalQueries == 4 # 4 queries of pageSize 2
totalMessages == 8 # 8 messages in total
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "handle query with no paging info - auto-pagination":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuStore(serverSwitch)
client = newTestWakuStoreClient(clientSwitch)
## Given
let msgList = @[
WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))
]
for msg in msgList:
require server.store.put(DefaultPubsubTopic, msg).isOk()
let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
## When
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
let res = await client.query(req, peer=serverPeerInfo)
## Then
check:
res.isOk()
let response = res.tryGet()
check:
## No pagination specified. Response will be auto-paginated with
## up to MaxPageSize messages per page.
response.messages.len() == 8
response.cursor.isNone()
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "handle temporal history query with a valid time window":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuStore(serverSwitch, store=storeA)
client = newTestWakuStoreClient(clientSwitch)
## Given
let req = HistoryQuery(
contentTopics: @[ContentTopic("1")],
startTime: some(Timestamp(2)),
endTime: some(Timestamp(5))
)
let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
## When
let res = await client.query(req, peer=serverPeerInfo)
## Then
check res.isOk()
let response = res.tryGet()
check:
response.messages.len() == 2
response.messages.anyIt(it.timestamp == Timestamp(3))
response.messages.anyIt(it.timestamp == Timestamp(5))
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "handle temporal history query with a zero-size time window":
# a zero-size window results in an empty list of history messages
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuStore(serverSwitch, store=storeA)
client = newTestWakuStoreClient(clientSwitch)
## Given
let req = HistoryQuery(
contentTopics: @[ContentTopic("1")],
startTime: some(Timestamp(2)),
endTime: some(Timestamp(2))
)
let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
## When
let res = await client.query(req, peer=serverPeerInfo)
## Then
check res.isOk()
let response = res.tryGet()
check:
response.messages.len == 0
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "handle temporal history query with an invalid time window":
# A history query with an invalid time range results in an empty list of history messages
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuStore(serverSwitch, store=storeA)
client = newTestWakuStoreClient(clientSwitch)
## Given
let req = HistoryQuery(
contentTopics: @[ContentTopic("1")],
startTime: some(Timestamp(5)),
endTime: some(Timestamp(2))
)
let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
## When
let res = await client.query(req, peer=serverPeerInfo)
## Then
check res.isOk()
let response = res.tryGet()
check:
response.messages.len == 0
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
suite "Message Store - message handling":
asyncTest "it should store a valid and non-ephemeral message":
## Setup
let store = StoreQueueRef.new(5)
let switch = newTestSwitch()
let proto = await newTestWakuStore(switch, store)
## Given
let validSenderTime = now()
let message = fakeWakuMessage(ephemeral=false, ts=validSenderTime)
## When
proto.handleMessage(DefaultPubsubTopic, message)
## Then
check:
store.getMessagesCount().tryGet() == 1
## Cleanup
await switch.stop()
asyncTest "it should not store an ephemeral message":
## Setup
let store = StoreQueueRef.new(10)
let switch = newTestSwitch()
let proto = await newTestWakuStore(switch, store)
## Given
let msgList = @[
fakeWakuMessage(ephemeral = false, payload = "1"),
fakeWakuMessage(ephemeral = true, payload = "2"),
fakeWakuMessage(ephemeral = true, payload = "3"),
fakeWakuMessage(ephemeral = true, payload = "4"),
fakeWakuMessage(ephemeral = false, payload = "5"),
]
## When
for msg in msgList:
proto.handleMessage(DefaultPubsubTopic, msg)
## Then
check:
store.len == 2
## Cleanup
await switch.stop()
asyncTest "it should store a message with no sender timestamp":
## Setup
let store = StoreQueueRef.new(5)
let switch = newTestSwitch()
let proto = await newTestWakuStore(switch, store)
## Given
let invalidSenderTime = 0
let message = fakeWakuMessage(ts=invalidSenderTime)
## When
proto.handleMessage(DefaultPubsubTopic, message)
## Then
check:
store.getMessagesCount().tryGet() == 1
## Cleanup
await switch.stop()
asyncTest "it should not store a message with a sender time variance greater than max time variance (future)":
## Setup
let store = StoreQueueRef.new(5)
let switch = newTestSwitch()
let proto = await newTestWakuStore(switch, store)
## Given
let
now = now()
invalidSenderTime = now + MaxMessageTimestampVariance + 1
let message = fakeWakuMessage(ts=invalidSenderTime)
## When
proto.handleMessage(DefaultPubsubTopic, message)
## Then
check:
store.getMessagesCount().tryGet() == 0
## Cleanup
await switch.stop()
asyncTest "it should not store a message with a sender time variance greater than max time variance (past)":
## Setup
let store = StoreQueueRef.new(5)
let switch = newTestSwitch()
let proto = await newTestWakuStore(switch, store)
## Given
let
now = now()
invalidSenderTime = now - MaxMessageTimestampVariance - 1
let message = fakeWakuMessage(ts=invalidSenderTime)
## When
proto.handleMessage(DefaultPubsubTopic, message)
## Then
check:
store.getMessagesCount().tryGet() == 0
## Cleanup
await switch.stop()

View File

@ -1,169 +0,0 @@
{.used.}
import
std/options,
testutils/unittests,
chronicles,
chronos,
libp2p/crypto/crypto
import
../../waku/common/sqlite,
../../waku/v2/node/message_store/sqlite_store,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_store/client {.all.},
./testlib/common,
./testlib/switch
proc newTestDatabase(): SqliteDatabase =
SqliteDatabase.new(":memory:").tryGet()
proc newTestStore(): MessageStore =
let database = newTestDatabase()
SqliteStore.init(database).tryGet()
proc newTestWakuStoreNode(switch: Switch, store=newTestStore()): Future[WakuStore] {.async.} =
let
peerManager = PeerManager.new(switch)
rng = crypto.newRng()
proto = WakuStore.new(peerManager, rng, store)
await proto.start()
switch.mount(proto)
return proto
proc newTestWakuStoreClient(switch: Switch, store: MessageStore = nil): WakuStoreClient =
let
peerManager = PeerManager.new(switch)
rng = crypto.newRng()
WakuStoreClient.new(peerManager, rng, store)
procSuite "Waku Store Client":
## Fixtures
let testStore = block:
let store = newTestStore()
let msgList = @[
fakeWakuMessage(payload= @[byte 0], contentTopic=ContentTopic("0")),
fakeWakuMessage(payload= @[byte 1], contentTopic=DefaultContentTopic),
fakeWakuMessage(payload= @[byte 2], contentTopic=DefaultContentTopic),
fakeWakuMessage(payload= @[byte 3], contentTopic=DefaultContentTopic),
fakeWakuMessage(payload= @[byte 4], contentTopic=DefaultContentTopic),
fakeWakuMessage(payload= @[byte 5], contentTopic=DefaultContentTopic),
fakeWakuMessage(payload= @[byte 6], contentTopic=DefaultContentTopic),
fakeWakuMessage(payload= @[byte 7], contentTopic=DefaultContentTopic),
fakeWakuMessage(payload= @[byte 8], contentTopic=DefaultContentTopic),
fakeWakuMessage(payload= @[byte 9], contentTopic=ContentTopic("9")),
fakeWakuMessage(payload= @[byte 10], contentTopic=DefaultContentTopic),
fakeWakuMessage(payload= @[byte 11], contentTopic=ContentTopic("11")),
fakeWakuMessage(payload= @[byte 12], contentTopic=DefaultContentTopic),
]
for msg in msgList:
assert store.put(DefaultPubsubTopic, msg).isOk()
store
asyncTest "single query to peer":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuStoreNode(serverSwitch, store=testStore)
client = newTestWakuStoreClient(clientSwitch)
## Given
let peer = serverSwitch.peerInfo.toRemotePeerInfo()
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 8)
## When
let res = await client.query(req, peer)
## Then
check:
res.isOk()
let response = res.tryGet()
check:
## A page size of 8 was specified. The response contains the first
## page of 8 messages and a cursor to continue querying from.
response.messages.len() == 8
response.cursor.isSome()
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "multiple query to peer with pagination":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuStoreNode(serverSwitch, store=testStore)
client = newTestWakuStoreClient(clientSwitch)
## Given
let peer = serverSwitch.peerInfo.toRemotePeerInfo()
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 5)
## When
let res = await client.queryAll(req, peer)
## Then
check:
res.isOk()
let response = res.tryGet()
check:
response.len == 10
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
# TODO: Move to resume test suite
asyncTest "multiple query to multiple peers with pagination":
## Setup
let
serverSwitchA = newTestSwitch()
serverSwitchB = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitchA.start(), serverSwitchB.start(), clientSwitch.start())
let
serverA = await newTestWakuStoreNode(serverSwitchA, store=testStore)
serverB = await newTestWakuStoreNode(serverSwitchB, store=testStore)
client = newTestWakuStoreClient(clientSwitch)
## Given
let peers = @[
serverSwitchA.peerInfo.toRemotePeerInfo(),
serverSwitchB.peerInfo.toRemotePeerInfo()
]
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 5)
## When
let res = await client.queryLoop(req, peers)
## Then
check:
res.isOk()
let response = res.tryGet()
check:
response.len == 10
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitchA.stop(), serverSwitchB.stop())

View File

@ -19,9 +19,10 @@ import
proc newTestDatabase(): SqliteDatabase =
SqliteDatabase.new("memory:").tryGet()
proc newTestMessageStore(): MessageStore =
let database = newTestDatabase()
SqliteStore.init(database).tryGet()
proc newTestArchiveDriver(): ArchiveDriver =
let database = SqliteDatabase.new(":memory:").tryGet()
SqliteDriver.init(database).tryGet()
proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): Future[WakuStore] {.async.} =
let
@ -34,6 +35,12 @@ proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): Future[WakuS
return proto
proc newTestWakuStoreClient(switch: Switch, store: MessageStore = nil): WakuStoreClient =
let
peerManager = PeerManager.new(switch)
rng = crypto.newRng()
WakuStoreClient.new(peerManager, rng, store)
procSuite "Waku Store - resume store":
## Fixtures
@ -75,6 +82,40 @@ procSuite "Waku Store - resume store":
store
asyncTest "multiple query to multiple peers with pagination":
## Setup
let
serverSwitchA = newTestSwitch()
serverSwitchB = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitchA.start(), serverSwitchB.start(), clientSwitch.start())
let
serverA = await newTestWakuStoreNode(serverSwitchA, store=testStore)
serverB = await newTestWakuStoreNode(serverSwitchB, store=testStore)
client = newTestWakuStoreClient(clientSwitch)
## Given
let peers = @[
serverSwitchA.peerInfo.toRemotePeerInfo(),
serverSwitchB.peerInfo.toRemotePeerInfo()
]
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 5)
## When
let res = await client.queryLoop(req, peers)
## Then
check:
res.isOk()
let response = res.tryGet()
check:
response.len == 10
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitchA.stop(), serverSwitchB.stop())
asyncTest "resume message history":
## Setup
@ -168,3 +209,83 @@ procSuite "Waku Store - resume store":
## Cleanup
await allFutures(serverASwitch.stop(), serverBSwitch.stop(), clientSwitch.stop())
suite "WakuNode - waku store":
asyncTest "Resume proc fetches the history":
## Setup
let
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60412))
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60410))
await allFutures(client.start(), server.start())
let driver = newTestArchiveDriver()
server.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
await server.mountStore()
let clientStore = StoreQueueRef.new()
await client.mountStore(store=clientStore)
client.mountStoreClient(store=clientStore)
## Given
let message = fakeWakuMessage()
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
let serverPeer = server.peerInfo.toRemotePeerInfo()
## When
await client.resume(some(@[serverPeer]))
## Then
check:
client.wakuStore.store.getMessagesCount().tryGet() == 1
## Cleanup
await allFutures(client.stop(), server.stop())
asyncTest "Resume proc discards duplicate messages":
## Setup
let
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60422))
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60420))
await allFutures(server.start(), client.start())
await server.mountStore(store=StoreQueueRef.new())
let clientStore = StoreQueueRef.new()
await client.mountStore(store=clientStore)
client.mountStoreClient(store=clientStore)
## Given
let timeOrigin = now()
let
msg1 = fakeWakuMessage(payload="hello world1", ts=(timeOrigin + getNanoSecondTime(1)))
msg2 = fakeWakuMessage(payload="hello world2", ts=(timeOrigin + getNanoSecondTime(2)))
msg3 = fakeWakuMessage(payload="hello world3", ts=(timeOrigin + getNanoSecondTime(3)))
require server.wakuStore.store.put(DefaultPubsubTopic, msg1).isOk()
require server.wakuStore.store.put(DefaultPubsubTopic, msg2).isOk()
# Insert the same message in both node's store
let
receivedTime3 = now() + getNanosecondTime(10)
digest3 = computeDigest(msg3)
require server.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk()
require client.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk()
let serverPeer = server.peerInfo.toRemotePeerInfo()
## When
await client.resume(some(@[serverPeer]))
## Then
check:
# If the duplicates are discarded properly, then the total number of messages after resume should be 3
client.wakuStore.store.getMessagesCount().tryGet() == 3
## Cleanup
await allFutures(client.stop(), server.stop())

View File

@ -14,10 +14,10 @@ import
libp2p/protocols/pubsub/gossipsub
import
../../waku/common/sqlite,
../../waku/v2/node/message_store/sqlite_store,
../../waku/v2/node/message_store/queue_store,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_archive,
../../waku/v2/protocol/waku_archive/driver/sqlite_driver,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_filter,
../../waku/v2/utils/peers,
@ -25,10 +25,20 @@ import
../../waku/v2/node/waku_node,
./testlib/common
from std/times import getTime, toUnixFloat
proc newTestMessageStore(): MessageStore =
proc newTestArchiveDriver(): ArchiveDriver =
let database = SqliteDatabase.new(":memory:").tryGet()
SqliteStore.init(database).tryGet()
SqliteDriver.new(database).tryGet()
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] =
let
digest = waku_archive.computeDigest(message)
receivedTime = if message.timestamp > 0: message.timestamp
else: getNanosecondTime(getTime().toUnixFloat())
store.put(pubsubTopic, message, digest, receivedTime)
procSuite "WakuNode - Store":
@ -43,13 +53,16 @@ procSuite "WakuNode - Store":
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60430))
await allFutures(client.start(), server.start())
await server.mountStore(store=newTestMessageStore())
await client.mountStore()
let driver = newTestArchiveDriver()
server.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
await server.mountStore()
client.mountStoreClient()
## Given
let message = fakeWakuMessage()
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
require driver.put(DefaultPubsubTopic, message).isOk()
let serverPeer = server.peerInfo.toRemotePeerInfo()
@ -81,7 +94,9 @@ procSuite "WakuNode - Store":
await allFutures(client.start(), server.start(), filterSource.start())
await filterSource.mountFilter()
await server.mountStore(store=newTestMessageStore())
let driver = newTestArchiveDriver()
server.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
await server.mountStore()
await server.mountFilterClient()
client.mountStoreClient()
@ -123,81 +138,3 @@ procSuite "WakuNode - Store":
## Cleanup
await allFutures(client.stop(), server.stop(), filterSource.stop())
asyncTest "Resume proc fetches the history":
## Setup
let
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60412))
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60410))
await allFutures(client.start(), server.start())
await server.mountStore(store=newTestMessageStore())
let clientStore = StoreQueueRef.new()
await client.mountStore(store=clientStore)
client.mountStoreClient(store=clientStore)
## Given
let message = fakeWakuMessage()
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
let serverPeer = server.peerInfo.toRemotePeerInfo()
## When
await client.resume(some(@[serverPeer]))
## Then
check:
client.wakuStore.store.getMessagesCount().tryGet() == 1
## Cleanup
await allFutures(client.stop(), server.stop())
asyncTest "Resume proc discards duplicate messages":
## Setup
let
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60422))
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60420))
await allFutures(server.start(), client.start())
await server.mountStore(store=StoreQueueRef.new())
let clientStore = StoreQueueRef.new()
await client.mountStore(store=clientStore)
client.mountStoreClient(store=clientStore)
## Given
let timeOrigin = now()
let
msg1 = fakeWakuMessage(payload="hello world1", ts=(timeOrigin + getNanoSecondTime(1)))
msg2 = fakeWakuMessage(payload="hello world2", ts=(timeOrigin + getNanoSecondTime(2)))
msg3 = fakeWakuMessage(payload="hello world3", ts=(timeOrigin + getNanoSecondTime(3)))
require server.wakuStore.store.put(DefaultPubsubTopic, msg1).isOk()
require server.wakuStore.store.put(DefaultPubsubTopic, msg2).isOk()
# Insert the same message in both node's store
let
receivedTime3 = now() + getNanosecondTime(10)
digest3 = computeDigest(msg3)
require server.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk()
require client.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk()
let serverPeer = server.peerInfo.toRemotePeerInfo()
## When
await client.resume(some(@[serverPeer]))
## Then
check:
# If the duplicates are discarded properly, then the total number of messages after resume should be 3
client.wakuStore.store.getMessagesCount().tryGet() == 3
## Cleanup
await allFutures(client.stop(), server.stop())

View File

@ -51,8 +51,8 @@ proc toJsonRPCStoreResponse*(response: HistoryResponse): StoreResponse =
messages: response.messages,
pagingOptions: if response.cursor.isNone(): none(StorePagingOptions)
else: some(StorePagingOptions(
pageSize: response.pageSize,
forward: response.ascending,
pageSize: uint64(response.messages.len), # This field will be deprecated soon
forward: true, # Hardcoded. This field will be deprecated soon
cursor: response.cursor.map(toRPC)
))
)

View File

@ -1,16 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
stew/results
import
../../protocol/waku_store/message_store
type RetentionPolicyResult*[T] = Result[T, string]
type MessageRetentionPolicy* = ref object of RootObj
method execute*(p: MessageRetentionPolicy, store: MessageStore): RetentionPolicyResult[void] {.base.} = discard

View File

@ -1,73 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
stew/results,
chronicles
import
../../protocol/waku_store/message_store,
./message_retention_policy
logScope:
topics = "waku node message_store retention_policy"
const StoreDefaultCapacity*: int = 25_000
const StoreMaxOverflow = 1.3
type
# CapacityRetentionPolicy implements auto deletion as follows:
# - The sqlite DB will store up to `totalCapacity = capacity * StoreMaxOverflow` messages,
# giving an overflow window of `overflowWindow = capacity * (StoreMaxOverflow - 1)`.
#
# - In case of an overflow, messages are sorted by `receiverTimestamp` and the oldest ones are
# deleted. The number of messages that get deleted is `deleteWindow = overflowWindow / 2`,
# bringing the total number of stored messages back to `capacity + deleteWindow`.
#
# The rationale for batch deleting is efficiency. We keep half of the overflow window in addition
# to `capacity` because we delete the oldest messages with respect to `receiverTimestamp` instead of
# `senderTimestamp`. `receiverTimestamp` is guaranteed to be set, while senders could omit setting
# `senderTimestamp`. However, `receiverTimestamp` can differ from node to node for the same message,
# so sorting by `receiverTimestamp` might (slightly) prioritize some actually older messages; we
# compensate for that by keeping half of the overflow window.
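#
# Worked example (illustrative, using the defaults above): with `capacity = 25_000` and
# `StoreMaxOverflow = 1.3`, `totalCapacity = 32_500`, `overflowWindow = 7_500` and
# `deleteWindow = 3_750`. Once the message count reaches 32_500, the oldest messages are
# batch-deleted down to `capacity + deleteWindow = 28_750`.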
CapacityRetentionPolicy* = ref object of MessageRetentionPolicy
capacity: int # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`.
totalCapacity: int # = capacity * StoreMaxOverflow
deleteWindow: int # = capacity * (StoreMaxOverflow - 1) / 2; half of the overflow window, the amount of messages deleted when overflow occurs
proc calculateTotalCapacity(capacity: int, overflow: float): int =
int(float(capacity) * overflow)
proc calculateOverflowWindow(capacity: int, overflow: float): int =
int(float(capacity) * (overflow - 1))
proc calculateDeleteWindow(capacity: int, overflow: float): int =
calculateOverflowWindow(capacity, overflow) div 2
proc init*(T: type CapacityRetentionPolicy, capacity=StoreDefaultCapacity): T =
let
totalCapacity = calculateTotalCapacity(capacity, StoreMaxOverflow)
deleteWindow = calculateDeleteWindow(capacity, StoreMaxOverflow)
CapacityRetentionPolicy(
capacity: capacity,
totalCapacity: totalCapacity,
deleteWindow: deleteWindow
)
method execute*(p: CapacityRetentionPolicy, store: MessageStore): RetentionPolicyResult[void] =
let numMessages = ?store.getMessagesCount().mapErr(proc(err: string): string = "failed to get messages count: " & err)
if numMessages < p.totalCapacity:
return ok()
let res = store.deleteOldestMessagesNotWithinLimit(limit=p.capacity + p.deleteWindow)
if res.isErr():
return err("deleting oldest messages failed: " & res.error())
ok()

View File

@ -1,49 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/times,
stew/results,
chronicles,
chronos
import
../../protocol/waku_store/message_store,
../../utils/time,
./message_retention_policy
logScope:
topics = "waku node message_store retention_policy"
const StoreDefaultRetentionTime*: int64 = 30.days.seconds
type TimeRetentionPolicy* = ref object of MessageRetentionPolicy
retentionTime: chronos.Duration
proc init*(T: type TimeRetentionPolicy, retentionTime=StoreDefaultRetentionTime): T =
TimeRetentionPolicy(
retentionTime: retentionTime.seconds
)
method execute*(p: TimeRetentionPolicy, store: MessageStore): RetentionPolicyResult[void] =
## Delete messages that exceed the retention time by 10% or more (batch delete for efficiency)
let oldestReceiverTimestamp = ?store.getOldestMessageTimestamp().mapErr(proc(err: string): string = "failed to get oldest message timestamp: " & err)
let now = getNanosecondTime(getTime().toUnixFloat())
let retentionTimestamp = now - p.retentionTime.nanoseconds
let thresholdTimestamp = retentionTimestamp - p.retentionTime.nanoseconds div 10
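# Example (illustrative): with the default 30-day retention time, the batch delete is
# triggered only once the oldest stored message is more than 33 days old (retention time
# plus 10% slack); it then removes every message older than 30 days.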
if thresholdTimestamp <= oldestReceiverTimestamp:
return ok()
let res = store.deleteMessagesOlderThanTimestamp(ts=retentionTimestamp)
if res.isErr():
return err("failed to delete oldest messages: " & res.error())
ok()

View File

@ -1,12 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
./queue_store/index,
./queue_store/queue_store
export
queue_store,
index

View File

@ -1,92 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
stew/byteutils,
nimcrypto/sha2
import
../../../protocol/waku_message,
../../../protocol/waku_store/common,
../../../utils/time
type Index* = object
## This type contains the description of an Index used in the pagination of WakuMessages
pubsubTopic*: string
senderTime*: Timestamp # the time at which the message is generated
receiverTime*: Timestamp
digest*: MessageDigest # calculated over payload and content topic
proc compute*(T: type Index, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: PubsubTopic): T =
## Takes a WakuMessage with received timestamp and returns its Index.
let
digest = computeDigest(msg)
senderTime = msg.timestamp
Index(
pubsubTopic: pubsubTopic,
senderTime: senderTime,
receiverTime: receivedTime,
digest: digest
)
proc toHistoryCursor*(index: Index): HistoryCursor =
HistoryCursor(
pubsubTopic: index.pubsubTopic,
senderTime: index.senderTime,
storeTime: index.receiverTime,
digest: index.digest
)
proc toIndex*(index: HistoryCursor): Index =
Index(
pubsubTopic: index.pubsubTopic,
senderTime: index.senderTime,
receiverTime: index.storeTime,
digest: index.digest
)
proc `==`*(x, y: Index): bool =
## receiverTime plays no role in index equality
(x.senderTime == y.senderTime) and
(x.digest == y.digest) and
(x.pubsubTopic == y.pubsubTopic)
proc cmp*(x, y: Index): int =
## compares x and y
## returns 0 if they are equal
## returns -1 if x < y
## returns 1 if x > y
##
## Default sorting order priority is:
## 1. senderTimestamp
## 2. receiverTimestamp (used as a fallback on either side where senderTimestamp is unset)
## 3. message digest
## 4. pubsubTopic
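##
## Example (illustrative): an index with senderTime = 5 sorts after one with
## senderTime = 3 regardless of their receiverTime values; when an index has
## senderTime = 0, its receiverTime is compared instead.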
if x == y:
# Quick exit ensures receiver time does not affect index equality
return 0
# Timestamp has a higher priority for comparison
let
# Use receiverTime where senderTime is unset
xTimestamp = if x.senderTime == 0: x.receiverTime
else: x.senderTime
yTimestamp = if y.senderTime == 0: y.receiverTime
else: y.senderTime
let timecmp = cmp(xTimestamp, yTimestamp)
if timecmp != 0:
return timecmp
# Continue only when timestamps are equal
let digestcmp = cmp(x.digest.data, y.digest.data)
if digestcmp != 0:
return digestcmp
return cmp(x.pubsubTopic, y.pubsubTopic)

View File

@ -1,330 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/options,
stew/results,
stew/sorted_set,
chronicles
import
../../../protocol/waku_message,
../../../protocol/waku_store/common,
../../../protocol/waku_store/message_store,
../../../utils/time,
./index
logScope:
topics = "waku node message_store storequeue"
const StoreQueueDefaultMaxCapacity* = 25_000
type
IndexedWakuMessage* = object
# TODO: may need to rename this object as it holds both the index and the pubsub topic of a waku message
## This type is used to encapsulate a WakuMessage and its Index
msg*: WakuMessage
index*: Index
pubsubTopic*: string
QueryFilterMatcher = proc(indexedWakuMsg: IndexedWakuMessage): bool {.gcsafe, closure.}
type
StoreQueueErrorKind {.pure.} = enum
INVALID_CURSOR
StoreQueueGetPageResult = Result[seq[MessageStoreRow], StoreQueueErrorKind]
type StoreQueueRef* = ref object of MessageStore
## Bounded repository for indexed messages
##
## The store queue will keep messages up to its
## configured capacity. As soon as this capacity
## is reached and a new message is added, the oldest
## item will be removed to make space for the new one.
## This implies both a `delete` and `add` operation
## for new items.
##
## TODO: a circular/ring buffer may be a more efficient implementation
## TODO: we don't need to store the Index twice (as key and in the value)
items: SortedSet[Index, IndexedWakuMessage] # sorted set of stored messages
capacity: int # Maximum amount of messages to keep
### Helpers
proc ffdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
startCursor: Index):
SortedSetResult[Index, IndexedWakuMessage] =
## Fast forward `w` to start cursor
## TODO: can probably improve performance here with a binary/tree search
var nextItem = w.first
trace "Fast forwarding to start cursor", startCursor=startCursor, firstItem=nextItem
## Fast forward until we reach the startCursor
while nextItem.isOk():
if nextItem.value.key == startCursor:
# Exit ffd loop when we find the start cursor
break
# Not yet at cursor. Continue advancing
nextItem = w.next
trace "Continuing ffd to start cursor", nextItem=nextItem
return nextItem
proc rwdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
startCursor: Index):
SortedSetResult[Index, IndexedWakuMessage] =
## Rewind `w` to start cursor
## TODO: can probably improve performance here with a binary/tree search
var prevItem = w.last
trace "Rewinding to start cursor", startCursor=startCursor, lastItem=prevItem
## Rewind until we reach the startCursor
while prevItem.isOk():
if prevItem.value.key == startCursor:
# Exit rwd loop when we find the start cursor
break
# Not yet at cursor. Continue rewinding.
prevItem = w.prev
trace "Continuing rewind to start cursor", prevItem=prevItem
return prevItem
#### API
proc new*(T: type StoreQueueRef, capacity: int = StoreQueueDefaultMaxCapacity): T =
var items = SortedSet[Index, IndexedWakuMessage].init()
return StoreQueueRef(items: items, capacity: capacity)
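# Usage sketch (illustrative): `StoreQueueRef.new(capacity = 3)` keeps at most the 3
# newest messages by `Index` order. Adding a 4th message evicts the oldest entry, and
# a message older than everything in a full queue is rejected with a "too_old" error.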
proc contains*(store: StoreQueueRef, index: Index): bool =
## Return `true` if the store queue already contains the `index`, `false` otherwise.
store.items.eq(index).isOk()
proc len*(store: StoreQueueRef): int {.noSideEffect.} =
store.items.len
proc getPage(store: StoreQueueRef,
pageSize: uint64 = 0,
forward: bool = true,
cursor: Option[Index] = none(Index),
predicate: QueryFilterMatcher = nil): StoreQueueGetPageResult =
## Populate a single page in the given direction.
## Start at the `cursor` (exclusive), or at the first/last entry (inclusive) if no cursor is defined.
## At most `pageSize` entries are returned.
## Each entry must match the `predicate` (if given).
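##
## Example (illustrative): with entries [A, B, C, D] in index order, a forward page
## with cursor = B and pageSize = 2 returns [C, D]; a backward page with cursor = C
## and pageSize = 2 returns [B, A].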
var outSeq: seq[MessageStoreRow]
var w = SortedSetWalkRef[Index,IndexedWakuMessage].init(store.items)
defer: w.destroy()
var currentEntry: SortedSetResult[Index, IndexedWakuMessage]
# Find starting entry
if cursor.isSome():
let cursorEntry = if forward: w.ffdToCursor(cursor.get())
else: w.rwdToCursor(cursor.get())
if cursorEntry.isErr():
return err(StoreQueueErrorKind.INVALID_CURSOR)
# Advance walker once more
currentEntry = if forward: w.next()
else: w.prev()
else:
# Start from the beginning of the queue
currentEntry = if forward: w.first()
else: w.last()
trace "Starting page query", currentEntry=currentEntry
## This loop walks over the queue in the given direction:
## 1. from the given cursor (or the first/last entry, if not provided)
## 2. adds entries matching the predicate function to the output page
## 3. until either the end of the queue or `pageSize` is reached
var numberOfItems = 0.uint
while currentEntry.isOk() and numberOfItems < pageSize:
trace "Continuing page query", currentEntry=currentEntry, numberOfItems=numberOfItems
if predicate.isNil() or predicate(currentEntry.value.data):
let
key = currentEntry.value.key
data = currentEntry.value.data
numberOfItems += 1
outSeq.add((key.pubsubTopic, data.msg, @(key.digest.data), key.receiverTime))
currentEntry = if forward: w.next()
else: w.prev()
trace "Successfully retrieved page", len=outSeq.len
return ok(outSeq)
## --- SortedSet accessors ---
iterator fwdIterator*(store: StoreQueueRef): (Index, IndexedWakuMessage) =
## Forward iterator over the entire store queue
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(store.items)
res = w.first()
while res.isOk():
yield (res.value.key, res.value.data)
res = w.next()
w.destroy()
iterator bwdIterator*(store: StoreQueueRef): (Index, IndexedWakuMessage) =
## Backwards iterator over the entire store queue
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(store.items)
res = w.last()
while res.isOk():
yield (res.value.key, res.value.data)
res = w.prev()
w.destroy()
proc first*(store: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] =
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(store.items)
res = w.first()
w.destroy()
if res.isErr():
return err("Not found")
return ok(res.value.data)
proc last*(store: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] =
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(store.items)
res = w.last()
w.destroy()
if res.isErr():
return err("Not found")
return ok(res.value.data)
## --- Queue API ---
proc add*(store: StoreQueueRef, msg: IndexedWakuMessage): MessageStoreResult[void] =
## Add a message to the queue
##
## If we're at capacity, the oldest (first) item will be removed to make space
if store.contains(msg.index):
trace "could not add item to store queue. Index already exists", index=msg.index
return err("duplicate")
# TODO: the below delete block can be removed if we convert to circular buffer
if store.items.len >= store.capacity:
var
w = SortedSetWalkRef[Index, IndexedWakuMessage].init(store.items)
firstItem = w.first
if cmp(msg.index, firstItem.value.key) < 0:
# When at capacity, we won't add if message index is smaller (older) than our oldest item
w.destroy # Clean up walker
return err("too_old")
discard store.items.delete(firstItem.value.key)
w.destroy # better to destroy walker after a delete operation
store.items.insert(msg.index).value.data = msg
return ok()
method put*(store: StoreQueueRef, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] =
let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest)
let indexedMessage = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic)
store.add(indexedMessage)
method put*(store: StoreQueueRef, pubsubTopic: PubsubTopic, message: WakuMessage): MessageStoreResult[void] =
## Inserts a message into the store
procCall MessageStore(store).put(pubsubTopic, message)
method getAllMessages*(store: StoreQueueRef): MessageStoreResult[seq[MessageStoreRow]] =
# TODO: Implement this message_store method
err("interface method not implemented")
method getMessagesByHistoryQuery*(
store: StoreQueueRef,
contentTopic = none(seq[ContentTopic]),
pubsubTopic = none(PubsubTopic),
cursor = none(HistoryCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true
): MessageStoreResult[seq[MessageStoreRow]] =
let cursor = cursor.map(toIndex)
let matchesQuery: QueryFilterMatcher = proc(indMsg: IndexedWakuMessage): bool =
if pubsubTopic.isSome():
if indMsg.pubsubTopic != pubsubTopic.get():
return false
if startTime.isSome() and endTime.isSome():
# temporal filtering: select only messages whose sender generated timestamps fall
# between the queried start time and end time
if indMsg.msg.timestamp > endTime.get() or indMsg.msg.timestamp < startTime.get():
return false
if contentTopic.isSome():
if indMsg.msg.contentTopic notin contentTopic.get():
return false
return true
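# Example (illustrative): a query with pubsubTopic = "pt", contentTopic = ["ct1"],
# startTime = 10 and endTime = 20 matches only messages published on "pt" with content
# topic "ct1" and a sender timestamp in [10, 20]. Note that the temporal filter above
# is applied only when both bounds are provided.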
var pageRes: StoreQueueGetPageResult
try:
pageRes = store.getPage(maxPageSize, ascendingOrder, cursor, matchesQuery)
except:
return err(getCurrentExceptionMsg())
if pageRes.isErr():
case pageRes.error:
of StoreQueueErrorKind.INVALID_CURSOR:
return err("invalid cursor")
ok(pageRes.value)
method getMessagesCount*(s: StoreQueueRef): MessageStoreResult[int64] =
ok(int64(s.len()))
method getOldestMessageTimestamp*(s: StoreQueueRef): MessageStoreResult[Timestamp] =
s.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)
method getNewestMessageTimestamp*(s: StoreQueueRef): MessageStoreResult[Timestamp] =
s.last().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)
method deleteMessagesOlderThanTimestamp*(s: StoreQueueRef, ts: Timestamp): MessageStoreResult[void] =
# TODO: Implement this message_store method
err("interface method not implemented")
method deleteOldestMessagesNotWithinLimit*(s: StoreQueueRef, limit: int): MessageStoreResult[void] =
# TODO: Implement this message_store method
err("interface method not implemented")

View File

@ -1,8 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import ./sqlite_store/sqlite_store
export sqlite_store

View File

@ -1,38 +0,0 @@
{.push raises: [].}
import
std/[tables, strutils, os],
stew/results,
chronicles
import
../../../../common/sqlite,
../../../../common/sqlite/migrations
logScope:
topics = "message_store.migration"
const SchemaVersion* = 7 # increase this when there is an update in the database schema
template projectRoot: string = currentSourcePath.rsplit(DirSep, 1)[0] / ".." / ".." / ".." / ".." / ".."
const MessageStoreMigrationPath: string = projectRoot / "migrations" / "message_store"
proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult[void] =
## Compares the `user_version` of the sqlite database with the provided `targetVersion`, and
## runs the migration scripts if the `user_version` is outdated. The `migrationScriptsDir` path
## points to the directory holding the migration scripts. Once the db is updated, it sets the
## `user_version` to the `targetVersion`.
##
## If no `targetVersion` is provided, it defaults to `SchemaVersion`.
##
## NOTE: Down migration is not currently supported.
debug "starting message store's sqlite database migration"
let migrationRes = migrations.migrate(db, targetVersion, migrationsScriptsDir=MessageStoreMigrationPath)
if migrationRes.isErr():
return err("failed to execute migration scripts: " & migrationRes.error)
debug "finished message store's sqlite database migration"
ok()

View File

@ -1,400 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[options, sequtils],
stew/[results, byteutils],
sqlite3_abi
import
../../../../common/sqlite,
../../../protocol/waku_message,
../../../utils/time
const DbTable = "Message"
type SqlQueryStr = string
type DbCursor* = (Timestamp, seq[byte], PubsubTopic)
### SQLite column helper methods
proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCol, versionCol, senderTimestampCol: cint): WakuMessage =
let
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, contentTopicCol))
topicLength = sqlite3_column_bytes(s, contentTopicCol)
contentTopic = string.fromBytes(@(toOpenArray(topic, 0, topicLength-1)))
let
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, payloadCol))
length = sqlite3_column_bytes(s, payloadCol)
payload = @(toOpenArray(p, 0, length-1))
let version = sqlite3_column_int64(s, versionCol)
let senderTimestamp = sqlite3_column_int64(s, senderTimestampCol)
WakuMessage(
contentTopic: ContentTopic(contentTopic),
payload: payload,
version: uint32(version),
timestamp: Timestamp(senderTimestamp)
)
proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, storedAtCol: cint): Timestamp =
let storedAt = sqlite3_column_int64(s, storedAtCol)
Timestamp(storedAt)
proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): PubsubTopic =
let
pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, pubsubTopicCol))
pubsubTopicLength = sqlite3_column_bytes(s, pubsubTopicCol)
pubsubTopic = string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength-1)))
pubsubTopic
proc queryRowDigestCallback(s: ptr sqlite3_stmt, digestCol: cint): seq[byte] =
let
digestPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, digestCol))
digestLength = sqlite3_column_bytes(s, digestCol)
digest = @(toOpenArray(digestPointer, 0, digestLength-1))
digest
### SQLite queries
## Create table
proc createTableQuery(table: string): SqlQueryStr =
"CREATE TABLE IF NOT EXISTS " & table & " (" &
" pubsubTopic BLOB NOT NULL," &
" contentTopic BLOB NOT NULL," &
" payload BLOB," &
" version INTEGER NOT NULL," &
" timestamp INTEGER NOT NULL," &
" id BLOB," &
" storedAt INTEGER NOT NULL," &
" CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" &
") WITHOUT ROWID;"
proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
let query = createTableQuery(DbTable)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
## Create indices
proc createOldestMessageTimestampIndexQuery(table: string): SqlQueryStr =
"CREATE INDEX IF NOT EXISTS i_ts ON " & table & " (storedAt);"
proc createOldestMessageTimestampIndex*(db: SqliteDatabase): DatabaseResult[void] =
let query = createOldestMessageTimestampIndexQuery(DbTable)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
proc createHistoryQueryIndexQuery(table: string): SqlQueryStr =
"CREATE INDEX IF NOT EXISTS i_query ON " & table & " (contentTopic, pubsubTopic, storedAt, id);"
proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] =
let query = createHistoryQueryIndexQuery(DbTable)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
## Insert message
type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
proc insertMessageQuery(table: string): SqlQueryStr =
"INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" &
" VALUES (?, ?, ?, ?, ?, ?, ?);"
proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] =
let query = insertMessageQuery(DbTable)
db.prepareStmt(query, InsertMessageParams, void).expect("this is a valid statement")
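# Note (illustrative): the `InsertMessageParams` tuple fields bind positionally to the
# statement placeholders, i.e. (id, storedAt, contentTopic, payload, pubsubTopic,
# version, timestamp).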
## Count table messages
proc countMessagesQuery(table: string): SqlQueryStr =
"SELECT COUNT(*) FROM " & table
proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] =
var count: int64
proc queryRowCallback(s: ptr sqlite3_stmt) =
count = sqlite3_column_int64(s, 0)
let query = countMessagesQuery(DbTable)
let res = db.query(query, queryRowCallback)
if res.isErr():
return err("failed to count number of messages in the database")
ok(count)
## Get oldest message receiver timestamp
proc selectOldestMessageTimestampQuery(table: string): SqlQueryStr =
"SELECT MIN(storedAt) FROM " & table
proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.} =
var timestamp: Timestamp
proc queryRowCallback(s: ptr sqlite3_stmt) =
timestamp = queryRowReceiverTimestampCallback(s, 0)
let query = selectOldestMessageTimestampQuery(DbTable)
let res = db.query(query, queryRowCallback)
if res.isErr():
return err("failed to get the oldest receiver timestamp from the database")
ok(timestamp)
## Get newest message receiver timestamp
proc selectNewestMessageTimestampQuery(table: string): SqlQueryStr =
"SELECT MAX(storedAt) FROM " & table
proc selectNewestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.} =
var timestamp: Timestamp
proc queryRowCallback(s: ptr sqlite3_stmt) =
timestamp = queryRowReceiverTimestampCallback(s, 0)
let query = selectNewestMessageTimestampQuery(DbTable)
let res = db.query(query, queryRowCallback)
if res.isErr():
return err("failed to get the newest receiver timestamp from the database")
ok(timestamp)
## Delete messages older than timestamp
proc deleteMessagesOlderThanTimestampQuery(table: string, ts: Timestamp): SqlQueryStr =
"DELETE FROM " & table & " WHERE storedAt < " & $ts
proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseResult[void] =
let query = deleteMessagesOlderThanTimestampQuery(DbTable, ts)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
## Delete oldest messages not within limit
proc deleteOldestMessagesNotWithinLimitQuery(table: string, limit: int): SqlQueryStr =
"DELETE FROM " & table & " WHERE id NOT IN (" &
" SELECT id FROM " & table &
" ORDER BY storedAt DESC" &
" LIMIT " & $limit &
");"
proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): DatabaseResult[void] =
# NOTE: The word `limit` here refers to the store capacity, i.e. the maximum number of messages allowed
let query = deleteOldestMessagesNotWithinLimitQuery(DbTable, limit=limit)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
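# Usage sketch for the two retention deletes (illustrative values; assumes an open
# `db: SqliteDatabase` and the time helpers used elsewhere in this changeset):
#
#   # time retention, e.g. configured as 'time:2592000' (30 days):
#   let cutoff = getNanosecondTime(getTime().toUnixFloat() - 2_592_000.0)
#   discard db.deleteMessagesOlderThanTimestamp(cutoff)
#
#   # capacity retention, e.g. configured as 'capacity:100000':
#   discard db.deleteOldestMessagesNotWithinLimit(limit=100_000)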
## Select all messages
proc selectAllMessagesQuery(table: string): SqlQueryStr =
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id" &
" FROM " & table &
" ORDER BY storedAt ASC"
proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]] =
## Retrieve all messages from the store.
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]
proc queryRowCallback(s: ptr sqlite3_stmt) =
let
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
digest = queryRowDigestCallback(s, digestCol=6)
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)
rows.add((pubsubTopic, wakuMessage, digest, storedAt))
let query = selectAllMessagesQuery(DbTable)
let res = db.query(query, queryRowCallback)
if res.isErr():
return err(res.error())
ok(rows)
## Select messages by history query with limit
proc contentTopicWhereClause(contentTopic: Option[seq[ContentTopic]]): Option[string] =
if contentTopic.isNone():
return none(string)
let topic = contentTopic.get()
if topic.len <= 0:
return none(string)
var contentTopicWhere = "("
contentTopicWhere &= "contentTopic = (?)"
for _ in topic[1..^1]:
contentTopicWhere &= " OR contentTopic = (?)"
contentTopicWhere &= ")"
some(contentTopicWhere)
proc cursorWhereClause(cursor: Option[DbCursor], ascending=true): Option[string] =
if cursor.isNone():
return none(string)
let comp = if ascending: ">" else: "<"
let whereClause = "(storedAt, id, pubsubTopic) " & comp & " (?, ?, ?)"
some(whereClause)
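# NOTE: the cursor clause relies on SQLite's row-value comparison (SQLite >= 3.15),
# e.g. for an ascending page:
#   (storedAt, id, pubsubTopic) > (?, ?, ?)
# The tuple is compared lexicographically against the composite primary key, so
# pagination stays stable even when several messages share the same storedAt.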
proc pubsubWhereClause(pubsubTopic: Option[PubsubTopic]): Option[string] =
if pubsubTopic.isNone():
return none(string)
some("pubsubTopic = (?)")
proc timeRangeWhereClause(startTime: Option[Timestamp], endTime: Option[Timestamp]): Option[string] =
if startTime.isNone() and endTime.isNone():
return none(string)
var where = "("
if startTime.isSome():
where &= "storedAt >= (?)"
if startTime.isSome() and endTime.isSome():
where &= " AND "
if endTime.isSome():
where &= "storedAt <= (?)"
where &= ")"
some(where)
proc whereClause(clauses: varargs[Option[string]]): Option[string] =
if clauses.len <= 0 or @clauses.all(proc(clause: Option[string]): bool = clause.isNone()):
return none(string)
let whereList = @clauses
.filter(proc(clause: Option[string]): bool = clause.isSome())
.map(proc(clause: Option[string]): string = clause.get())
var where: string = whereList[0]
for clause in whereList[1..^1]:
where &= " AND " & clause
some(where)
proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: uint64, ascending=true): SqlQueryStr =
let order = if ascending: "ASC" else: "DESC"
var query: string
query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id"
query &= " FROM " & table
if where.isSome():
query &= " WHERE " & where.get()
query &= " ORDER BY storedAt " & order & ", id " & order & ", pubsubTopic " & order
query &= " LIMIT " & $limit & ";"
query
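# Example (illustrative): for one content topic, a time range, limit=20 and
# ascending=true, the assembled query is roughly:
#   SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id
#   FROM <table>
#   WHERE (contentTopic = (?)) AND (storedAt >= (?) AND storedAt <= (?))
#   ORDER BY storedAt ASC, id ASC, pubsubTopic ASC LIMIT 20;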
proc prepareSelectMessagesWithLimitStmt(db: SqliteDatabase, stmt: string): DatabaseResult[SqliteStmt[void, void]] =
var s: RawStmtPtr
checkErr sqlite3_prepare_v2(db.env, stmt, stmt.len.cint, addr s, nil)
ok(SqliteStmt[void, void](s))
proc execSelectMessagesWithLimitStmt(s: SqliteStmt,
contentTopic: Option[seq[ContentTopic]],
pubsubTopic: Option[PubsubTopic],
cursor: Option[DbCursor],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
onRowCallback: DataProc): DatabaseResult[void] =
let s = RawStmtPtr(s)
# Bind params
var paramIndex = 1
if contentTopic.isSome():
for topic in contentTopic.get():
checkErr bindParam(s, paramIndex, topic.toBytes())
paramIndex += 1
if cursor.isSome(): # cursor = storedAt, id, pubsubTopic
let (storedAt, id, pubsubTopic) = cursor.get()
checkErr bindParam(s, paramIndex, storedAt)
paramIndex += 1
checkErr bindParam(s, paramIndex, id)
paramIndex += 1
checkErr bindParam(s, paramIndex, pubsubTopic.toBytes())
paramIndex += 1
if pubsubTopic.isSome():
let pubsubTopic = toBytes(pubsubTopic.get())
checkErr bindParam(s, paramIndex, pubsubTopic)
paramIndex += 1
if startTime.isSome():
let time = startTime.get()
checkErr bindParam(s, paramIndex, time)
paramIndex += 1
if endTime.isSome():
let time = endTime.get()
checkErr bindParam(s, paramIndex, time)
paramIndex += 1
try:
while true:
let v = sqlite3_step(s)
case v
of SQLITE_ROW:
onRowCallback(s)
of SQLITE_DONE:
return ok()
else:
return err($sqlite3_errstr(v))
finally:
# release implicit transaction
discard sqlite3_reset(s) # same return information as step
discard sqlite3_clear_bindings(s) # no errors possible
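# NOTE: the positional bind order above (content topics, cursor, pubsub topic,
# start time, end time) must mirror the order in which the WHERE clauses are
# concatenated in selectMessagesByHistoryQueryWithLimit below; reordering either
# side would silently bind parameters to the wrong placeholders.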
proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase,
contentTopic: Option[seq[ContentTopic]],
pubsubTopic: Option[PubsubTopic],
cursor: Option[DbCursor],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
limit: uint64,
ascending: bool): DatabaseResult[seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]] =
var messages: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] = @[]
proc queryRowCallback(s: ptr sqlite3_stmt) =
let
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
digest = queryRowDigestCallback(s, digestCol=6)
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)
messages.add((pubsubTopic, message, digest, storedAt))
let query = block:
let
contentTopicClause = contentTopicWhereClause(contentTopic)
cursorClause = cursorWhereClause(cursor, ascending)
pubsubClause = pubsubWhereClause(pubsubTopic)
timeRangeClause = timeRangeWhereClause(startTime, endTime)
let where = whereClause(contentTopicClause, cursorClause, pubsubClause, timeRangeClause)
selectMessagesWithLimitQuery(DbTable, where, limit, ascending)
let dbStmt = ?db.prepareSelectMessagesWithLimitStmt(query)
?dbStmt.execSelectMessagesWithLimitStmt(
contentTopic,
pubsubTopic,
cursor,
startTime,
endTime,
queryRowCallback
)
dbStmt.dispose()
ok(messages)
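# Usage sketch (illustrative; assumes an open `db: SqliteDatabase` and that
# ContentTopic is string-compatible, as it is used elsewhere in this changeset):
#
#   let res = db.selectMessagesByHistoryQueryWithLimit(
#     contentTopic = some(@[ContentTopic("my-content-topic")]),
#     pubsubTopic = none(PubsubTopic),
#     cursor = none(DbCursor),
#     startTime = some(Timestamp(0)),
#     endTime = none(Timestamp),
#     limit = 20,
#     ascending = true
#   )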

View File

@ -1,135 +0,0 @@
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[options, tables],
stew/[byteutils, results],
chronicles
import
../../../../common/sqlite,
../../../protocol/waku_message,
../../../protocol/waku_store/common,
../../../protocol/waku_store/message_store,
../../../utils/time,
./queries
logScope:
topics = "waku node message_store sqlite"
proc init(db: SqliteDatabase): MessageStoreResult[void] =
## Misconfiguration can lead to nil DB
if db.isNil():
return err("db not initialized")
# Create table, if it doesn't exist
let resCreate = createTable(db)
if resCreate.isErr():
return err("failed to create table: " & resCreate.error())
# Create indices, if they don't exist
let resRtIndex = createOldestMessageTimestampIndex(db)
if resRtIndex.isErr():
return err("failed to create i_rt index: " & resRtIndex.error())
let resMsgIndex = createHistoryQueryIndex(db)
if resMsgIndex.isErr():
return err("failed to create i_msg index: " & resMsgIndex.error())
ok()
type SqliteStore* = ref object of MessageStore
db: SqliteDatabase
insertStmt: SqliteStmt[InsertMessageParams, void]
proc init*(T: type SqliteStore, db: SqliteDatabase): MessageStoreResult[T] =
# Database initialization
let resInit = init(db)
if resInit.isErr():
return err(resInit.error())
# General initialization
let insertStmt = db.prepareInsertMessageStmt()
ok(SqliteStore(db: db, insertStmt: insertStmt))
proc close*(s: SqliteStore) =
## Close the database connection
# Dispose statements
s.insertStmt.dispose()
# Close connection
s.db.close()
method put*(s: SqliteStore, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] =
## Inserts a message into the store
let res = s.insertStmt.exec((
@(digest.data), # id
receivedTime, # storedAt
toBytes(message.contentTopic), # contentTopic
message.payload, # payload
toBytes(pubsubTopic), # pubsubTopic
int64(message.version), # version
message.timestamp # senderTimestamp
))
if res.isErr():
return err("message insert failed: " & res.error)
ok()
method put*(s: SqliteStore, pubsubTopic: PubsubTopic, message: WakuMessage): MessageStoreResult[void] =
## Inserts a message into the store
procCall MessageStore(s).put(pubsubTopic, message)
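# NOTE: procCall dispatches to the base MessageStore.put, which computes the message
# digest and the received time (sender timestamp when positive, wall clock otherwise)
# and then calls back into the digest-and-timestamp put overload above.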
method getAllMessages*(s: SqliteStore): MessageStoreResult[seq[MessageStoreRow]] =
## Retrieve all messages from the store.
s.db.selectAllMessages()
method getMessagesByHistoryQuery*(
s: SqliteStore,
contentTopic = none(seq[ContentTopic]),
pubsubTopic = none(PubsubTopic),
cursor = none(HistoryCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true
): MessageStoreResult[seq[MessageStoreRow]] =
let cursor = cursor.map(proc(c: HistoryCursor): DbCursor = (c.storeTime, @(c.digest.data), c.pubsubTopic))
return s.db.selectMessagesByHistoryQueryWithLimit(
contentTopic,
pubsubTopic,
cursor,
startTime,
endTime,
limit=maxPageSize,
ascending=ascendingOrder
)
method getMessagesCount*(s: SqliteStore): MessageStoreResult[int64] =
s.db.getMessageCount()
method getOldestMessageTimestamp*(s: SqliteStore): MessageStoreResult[Timestamp] =
s.db.selectOldestReceiverTimestamp()
method getNewestMessageTimestamp*(s: SqliteStore): MessageStoreResult[Timestamp] =
s.db.selectNewestReceiverTimestamp()
method deleteMessagesOlderThanTimestamp*(s: SqliteStore, ts: Timestamp): MessageStoreResult[void] =
s.db.deleteMessagesOlderThanTimestamp(ts)
method deleteOldestMessagesNotWithinLimit*(s: SqliteStore, limit: int): MessageStoreResult[void] =
s.db.deleteOldestMessagesNotWithinLimit(limit)

View File

@ -6,8 +6,9 @@ else:
import
std/[hashes, options, tables, strutils, sequtils, os],
chronos, chronicles, metrics,
stew/shims/net as stewNet,
stew/results,
stew/byteutils,
stew/shims/net as stewNet,
eth/keys,
nimcrypto,
bearssl/rand,
@ -24,6 +25,7 @@ import
import
../protocol/waku_message,
../protocol/waku_relay,
../protocol/waku_archive,
../protocol/waku_store,
../protocol/waku_store/client as store_client,
../protocol/waku_swap/waku_swap,
@ -35,9 +37,6 @@ import
../utils/peers,
../utils/wakuenr,
./peer_manager/peer_manager,
./message_store/message_retention_policy,
./message_store/message_retention_policy_capacity,
./message_store/message_retention_policy_time,
./dnsdisc/waku_dnsdisc,
./discv5/waku_discv5,
./wakuswitch
@ -84,6 +83,7 @@ type
peerManager*: PeerManager
switch*: Switch
wakuRelay*: WakuRelay
wakuArchive*: WakuArchive
wakuStore*: WakuStore
wakuStoreClient*: WakuStoreClient
wakuFilter*: WakuFilter
@ -278,8 +278,8 @@ proc subscribe(node: WakuNode, topic: PubsubTopic, handler: Option[TopicHandler]
if not node.wakuFilter.isNil():
await node.wakuFilter.handleMessage(topic, msg.value)
if not node.wakuStore.isNil():
node.wakuStore.handleMessage(topic, msg.value)
if not node.wakuArchive.isNil():
node.wakuArchive.handleMessage(topic, msg.value)
waku_node_messages.inc(labelValues = ["relay"])
@ -448,7 +448,7 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: C
# TODO: Move this logic to wakunode2 app
let handlerWrapper: FilterPushHandler = proc(pubsubTopic: string, message: WakuMessage) {.raises: [Exception].} =
if node.wakuRelay.isNil() and not node.wakuStore.isNil():
node.wakuStore.handleMessage(pubSubTopic, message)
node.wakuArchive.handleMessage(pubSubTopic, message)
handler(pubsubTopic, message)
@ -540,27 +540,33 @@ proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.as
node.switch.mount(node.wakuSwap, protocolMatcher(WakuSwapCodec))
## Waku store
## Waku archive
const MessageStoreDefaultRetentionPolicyInterval* = 30.minutes
proc mountArchive*(node: WakuNode,
driver: Option[ArchiveDriver],
messageValidator: Option[MessageValidator],
retentionPolicy: Option[RetentionPolicy]) =
proc executeMessageRetentionPolicy*(node: WakuNode) =
if node.wakuStore.isNil():
if driver.isNone():
error "failed to mount waku archive protocol", error="archive driver not set"
return
if node.wakuStore.store.isNil():
node.wakuArchive = WakuArchive.new(driver.get(), messageValidator, retentionPolicy)
# TODO: Review this periodic task. Maybe move it to the application code
const WakuArchiveDefaultRetentionPolicyInterval* = 30.minutes
proc executeMessageRetentionPolicy*(node: WakuNode) =
if node.wakuArchive.isNil():
return
debug "executing message retention policy"
node.wakuStore.executeMessageRetentionPolicy()
node.wakuStore.reportStoredMessagesMetric()
node.wakuArchive.executeMessageRetentionPolicy()
node.wakuArchive.reportStoredMessagesMetric()
proc startMessageRetentionPolicyPeriodicTask*(node: WakuNode, interval: Duration) =
if node.wakuStore.isNil():
return
if node.wakuStore.store.isNil():
if node.wakuArchive.isNil():
return
# https://github.com/nim-lang/Nim/issues/17369
@ -571,15 +577,56 @@ proc startMessageRetentionPolicyPeriodicTask*(node: WakuNode, interval: Duration
discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none(MessageRetentionPolicy) ) {.async, raises: [Defect, LPError].} =
## Waku store
# TODO: Review this mapping logic. Maybe move it to the application code
proc toArchiveQuery(request: HistoryQuery): ArchiveQuery =
ArchiveQuery(
pubsubTopic: request.pubsubTopic,
contentTopics: request.contentTopics,
cursor: request.cursor.map(proc(cursor: HistoryCursor): ArchiveCursor = ArchiveCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, digest: cursor.digest)),
startTime: request.startTime,
endTime: request.endTime,
pageSize: request.pageSize.uint,
ascending: request.ascending
)
# TODO: Review this mapping logic. Maybe move it to the application code
proc toHistoryResult*(res: ArchiveResult): HistoryResult =
if res.isErr():
let error = res.error
case res.error.kind:
of ArchiveErrorKind.DRIVER_ERROR, ArchiveErrorKind.INVALID_QUERY:
err(HistoryError(
kind: HistoryErrorKind.BAD_REQUEST,
cause: res.error.cause
))
else:
err(HistoryError(kind: HistoryErrorKind.UNKNOWN))
else:
let response = res.get()
ok(HistoryResponse(
messages: response.messages,
cursor: response.cursor.map(proc(cursor: ArchiveCursor): HistoryCursor = HistoryCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, digest: cursor.digest)),
))
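# NOTE: both DRIVER_ERROR and INVALID_QUERY are surfaced to store clients as
# BAD_REQUEST; every other archive error kind collapses to UNKNOWN.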
proc mountStore*(node: WakuNode) {.async, raises: [Defect, LPError].} =
info "mounting waku store protocol"
node.wakuStore = WakuStore.new(
node.peerManager,
node.rng,
store,
retentionPolicy=retentionPolicy
)
if node.wakuArchive.isNil():
error "failed to mount waku store protocol", error="waku archive not set"
return
# TODO: Review this handler logic. Maybe move it to the application code
let queryHandler: HistoryQueryHandler = proc(request: HistoryQuery): HistoryResult =
let request = request.toArchiveQuery()
let response = node.wakuArchive.findMessages(request)
response.toHistoryResult()
node.wakuStore = WakuStore.new(node.peerManager, node.rng, queryHandler)
if node.started:
# Node has started already. Let's start store too.
@ -588,10 +635,10 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none
node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec))
proc mountStoreClient*(node: WakuNode, store: MessageStore = nil) =
proc mountStoreClient*(node: WakuNode) =
info "mounting store client"
node.wakuStoreClient = WakuStoreClient.new(node.peerManager, node.rng, store)
node.wakuStoreClient = WakuStoreClient.new(node.peerManager, node.rng)
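# Mount-order sketch (illustrative; the driver constructor name is an assumption
# based on the sqlite_driver module imported by the application code):
#
#   let driver: ArchiveDriver = SqliteDriver.new(db).expect("valid driver")
#   node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
#   await node.mountStore()   # logs an error and returns if no archive is mounted
#   node.mountStoreClient()
#   node.startMessageRetentionPolicyPeriodicTask(WakuArchiveDefaultRetentionPolicyInterval)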
proc query*(node: WakuNode, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
## Queries known nodes for historical messages
@ -635,6 +682,7 @@ proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[History
return await node.query(query, peerOpt.get())
when defined(waku_exp_store_resume):
# TODO: Move to application module (e.g., wakunode2.nim)
proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} =
## `resume` retrieves the history of Waku messages published on the default Waku pubsub topic since the last time the node was online

View File

@ -67,7 +67,7 @@ method validate*(validator: DefaultMessageValidator, msg: WakuMessage): Validati
type
WakuArchive* = ref object
driver*: ArchiveDriver
driver*: ArchiveDriver # TODO: Make this field private. Remove asterisk
validator: MessageValidator
retentionPolicy: RetentionPolicy

View File

@ -5,8 +5,8 @@ import
stew/results,
chronicles
import
../../../../common/sqlite,
../../../../common/sqlite/migrations
../../../../../common/sqlite,
../../../../../common/sqlite/migrations
logScope:

View File

@ -1,9 +1,7 @@
import
./waku_store/common,
./waku_store/message_store,
./waku_store/protocol
export
common,
message_store,
protocol

View File

@ -4,7 +4,7 @@ else:
{.push raises: [].}
import
std/[options, sequtils, times],
std/options,
stew/results,
chronicles,
chronos,
@ -18,28 +18,32 @@ import
./protocol_metrics,
./common,
./rpc,
./rpc_codec,
./message_store
./rpc_codec
when defined(waku_exp_store_resume):
import std/[sequtils, times]
import ../waku_archive
logScope:
topics = "waku store client"
const
DefaultPageSize*: uint64 = 20 # A recommended default number of waku messages per page
const DefaultPageSize*: uint = 20 # A recommended default number of waku messages per page
type WakuStoreClient* = ref object
peerManager: PeerManager
rng: ref rand.HmacDrbgContext
store: MessageStore
# TODO: Move outside of the client
when defined(waku_exp_store_resume):
store: ArchiveDriver
proc new*(T: type WakuStoreClient,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
store: MessageStore): T =
WakuStoreClient(peerManager: peerManager, rng: rng, store: store)
rng: ref rand.HmacDrbgContext): T =
WakuStoreClient(peerManager: peerManager, rng: rng)
proc sendHistoryQueryRPC(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future[HistoryResult] {.async, gcsafe.} =
@ -79,7 +83,22 @@ proc sendHistoryQueryRPC(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeer
proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future[HistoryResult] {.async, gcsafe.} =
return await w.sendHistoryQueryRPC(req, peer)
proc queryAll*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
# TODO: Move outside of the client
when defined(waku_exp_store_resume):
## Resume store
const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds
proc new*(T: type WakuStoreClient,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
store: ArchiveDriver): T =
WakuStoreClient(peerManager: peerManager, rng: rng, store: store)
proc queryAll(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
## A thin wrapper around `query`. Sends the query to the given peer. When the query has a valid pagingInfo,
## it retrieves the historical messages in pages.
## Returns all the fetched messages; if an error occurs, returns an error string.
@ -107,11 +126,6 @@ proc queryAll*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): F
return ok(messageList)
## Resume store
const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds
proc queryLoop(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
## Loops through the peers candidate list in order and sends the query to each
##
@ -138,6 +152,15 @@ proc queryLoop(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]
return ok(messagesList)
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] =
let
digest = waku_archive.computeDigest(message)
receivedTime = if message.timestamp > 0: message.timestamp
else: getNanosecondTime(getTime().toUnixFloat())
store.put(pubsubTopic, message, digest, receivedTime)
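# NOTE: this helper mirrors the default put of the removed MessageStore interface:
# digest computed from the message, received time taken from the sender timestamp
# when positive and from the wall clock otherwise.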
proc resume*(w: WakuStoreClient,
peerList = none(seq[RemotePeerInfo]),
pageSize = DefaultPageSize,

View File

@ -60,8 +60,6 @@ type
HistoryResponse* = object
messages*: seq[WakuMessage]
pageSize*: uint64
ascending*: bool
cursor*: Option[HistoryCursor]
HistoryErrorKind* {.pure.} = enum

View File

@ -1,64 +0,0 @@
## This module defines a message store interface. Implementations of
## MessageStore are used by the `WakuStore` protocol to store and
## retrieve historical messages
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[options, times],
stew/results
import
../../utils/time,
../waku_message,
./common
type
MessageStoreResult*[T] = Result[T, string]
MessageStoreRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp)
MessageStore* = ref object of RootObj
# MessageStore interface
method put*(ms: MessageStore, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] {.base.} = discard
method put*(ms: MessageStore, pubsubTopic: PubsubTopic, message: WakuMessage): MessageStoreResult[void] {.base.} =
let
digest = computeDigest(message)
receivedTime = if message.timestamp > 0: message.timestamp
else: getNanosecondTime(getTime().toUnixFloat())
ms.put(pubsubTopic, message, digest, receivedTime)
method getAllMessages*(ms: MessageStore): MessageStoreResult[seq[MessageStoreRow]] {.base.} = discard
method getMessagesByHistoryQuery*(
ms: MessageStore,
contentTopic = none(seq[ContentTopic]),
pubsubTopic = none(PubsubTopic),
cursor = none(HistoryCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true
): MessageStoreResult[seq[MessageStoreRow]] {.base.} = discard
# Store manipulation
method getMessagesCount*(ms: MessageStore): MessageStoreResult[int64] {.base.} = discard
method getOldestMessageTimestamp*(ms: MessageStore): MessageStoreResult[Timestamp] {.base.} = discard
method getNewestMessageTimestamp*(ms: MessageStore): MessageStoreResult[Timestamp] {.base.} = discard
method deleteMessagesOlderThanTimestamp*(ms: MessageStore, ts: Timestamp): MessageStoreResult[void] {.base.} = discard
method deleteOldestMessagesNotWithinLimit*(ms: MessageStore, limit: int): MessageStoreResult[void] {.base.} = discard

View File

@ -18,14 +18,12 @@ import
libp2p/stream/connection,
metrics
import
../../node/message_store/message_retention_policy,
../../node/peer_manager/peer_manager,
../../utils/time,
../waku_message,
./common,
./rpc,
./rpc_codec,
./message_store,
./protocol_metrics
@ -44,167 +42,6 @@ type
peerManager: PeerManager
rng: ref rand.HmacDrbgContext
queryHandler: HistoryQueryHandler
store*: MessageStore
retentionPolicy: Option[MessageRetentionPolicy]
# TODO: Move to a message store wrapper
proc executeMessageRetentionPolicy*(w: WakuStore) =
if w.retentionPolicy.isNone():
return
if w.store.isNil():
return
let policy = w.retentionPolicy.get()
let retPolicyRes = policy.execute(w.store)
if retPolicyRes.isErr():
waku_store_errors.inc(labelValues = [retPolicyFailure])
debug "failed execution of retention policy", error=retPolicyRes.error
# TODO: Move to a message store wrapper
proc reportStoredMessagesMetric*(w: WakuStore) =
if w.store.isNil():
return
let resCount = w.store.getMessagesCount()
if resCount.isErr():
return
waku_store_messages.set(resCount.value, labelValues = ["stored"])
# TODO: Move to a message store wrapper
proc isValidMessage(msg: WakuMessage): bool =
if msg.timestamp == 0:
return true
let
now = getNanosecondTime(getTime().toUnixFloat())
lowerBound = now - MaxMessageTimestampVariance
upperBound = now + MaxMessageTimestampVariance
return lowerBound <= msg.timestamp and msg.timestamp <= upperBound
# TODO: Move to a message store wrapper
proc handleMessage*(w: WakuStore, pubsubTopic: PubsubTopic, msg: WakuMessage) =
if w.store.isNil():
# Messages should not be stored
return
if msg.ephemeral:
# The message is ephemeral, should not be stored
return
if not isValidMessage(msg):
waku_store_errors.inc(labelValues = [invalidMessage])
return
let insertStartTime = getTime().toUnixFloat()
block:
let
msgDigest = computeDigest(msg)
msgReceivedTime = if msg.timestamp > 0: msg.timestamp
else: getNanosecondTime(getTime().toUnixFloat())
trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigest
let putStoreRes = w.store.put(pubsubTopic, msg, msgDigest, msgReceivedTime)
if putStoreRes.isErr():
debug "failed to insert message into the store", err=putStoreRes.error
waku_store_errors.inc(labelValues = [insertFailure])
return
let insertDuration = getTime().toUnixFloat() - insertStartTime
waku_store_insert_duration_seconds.observe(insertDuration)
# TODO: Move to a message store wrapper
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
## Query history to return a single page of messages matching the query
# Extract query criteria. All query criteria are optional
let
qContentTopics = if query.contentTopics.len == 0: none(seq[ContentTopic])
else: some(query.contentTopics)
qPubSubTopic = query.pubsubTopic
qCursor = query.cursor
qStartTime = query.startTime
qEndTime = query.endTime
qMaxPageSize = if query.pageSize <= 0: DefaultPageSize
else: min(query.pageSize, MaxPageSize)
qAscendingOrder = query.ascending
let queryStartTime = getTime().toUnixFloat()
let queryRes = w.store.getMessagesByHistoryQuery(
contentTopic = qContentTopics,
pubsubTopic = qPubSubTopic,
cursor = qCursor,
startTime = qStartTime,
endTime = qEndTime,
maxPageSize = qMaxPageSize + 1,
ascendingOrder = qAscendingOrder
)
let queryDuration = getTime().toUnixFloat() - queryStartTime
waku_store_query_duration_seconds.observe(queryDuration)
# Build response
if queryRes.isErr():
# TODO: Improve error reporting
return err(HistoryError(kind: HistoryErrorKind.UNKNOWN))
let rows = queryRes.get()
if rows.len <= 0:
return ok(HistoryResponse(
messages: @[],
pageSize: 0,
ascending: qAscendingOrder,
cursor: none(HistoryCursor)
))
var messages = if rows.len <= int(qMaxPageSize): rows.mapIt(it[1])
else: rows[0..^2].mapIt(it[1])
var cursor = none(HistoryCursor)
# The retrieved messages list should always be in chronological order
if not qAscendingOrder:
messages.reverse()
if rows.len > int(qMaxPageSize):
## Build last message cursor
## The cursor is built from the last message INCLUDED in the response
## (i.e. the second-to-last message in the rows list)
let (pubsubTopic, message, digest, storeTimestamp) = rows[^2]
# TODO: Improve coherence of MessageDigest type
var messageDigest: array[32, byte]
for i in 0..<min(digest.len, 32):
messageDigest[i] = digest[i]
cursor = some(HistoryCursor(
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: storeTimestamp,
digest: MessageDigest(data: messageDigest)
))
ok(HistoryResponse(
messages: messages,
pageSize: uint64(messages.len),
ascending: qAscendingOrder,
cursor: cursor
))
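# NOTE: the store is queried for qMaxPageSize + 1 rows; receiving more rows than the
# page size means another page exists, so the extra row is dropped from the response
# and the cursor is built from the last row actually included (rows[^2]).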
## Protocol
@ -257,20 +94,6 @@ proc initProtocolHandler(ws: WakuStore) =
ws.handler = handler
ws.codec = WakuStoreCodec
proc new*(T: type WakuStore,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
store: MessageStore,
retentionPolicy=none(MessageRetentionPolicy)): T =
let ws = WakuStore(
rng: rng,
peerManager: peerManager,
store: store,
retentionPolicy: retentionPolicy
)
ws.queryHandler = proc(request: HistoryQuery): HistoryResult = ws.findMessages(request)
ws.initProtocolHandler()
ws
proc new*(T: type WakuStore,
peerManager: PeerManager,

View File

@ -208,16 +208,7 @@ proc toRPC*(res: HistoryResult): HistoryResponseRPC =
if resp.cursor.isNone():
none(PagingInfoRPC)
else:
let
pageSize = some(resp.pageSize)
cursor = resp.cursor.map(toRPC)
direction = if resp.ascending: some(PagingDirectionRPC.FORWARD)
else: some(PagingDirectionRPC.BACKWARD)
some(PagingInfoRPC(
pageSize: pageSize,
cursor: cursor,
direction: direction
))
some(PagingInfoRPC(cursor: resp.cursor.map(toRPC)))
error = HistoryResponseErrorRPC.NONE
@ -234,18 +225,10 @@ proc toAPI*(rpc: HistoryResponseRPC): HistoryResult =
let
messages = rpc.messages
pageSize = if rpc.pagingInfo.isNone(): 0'u64
else: rpc.pagingInfo.get().pageSize.get(0'u64)
ascending = if rpc.pagingInfo.isNone(): true
else: rpc.pagingInfo.get().direction.get(PagingDirectionRPC.FORWARD) == PagingDirectionRPC.FORWARD
cursor = if rpc.pagingInfo.isNone(): none(HistoryCursor)
else: rpc.pagingInfo.get().cursor.map(toAPI)
ok(HistoryResponse(
messages: messages,
pageSize: pageSize,
ascending: ascending,
cursor: cursor
))