mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-24 13:50:25 +00:00
f54ba10bc7
* queue driver refactor (#2753) * chore(archive): archive refactor (#2752) * chore(archive): sqlite driver refactor (#2754) * chore(archive): postgres driver refactor (#2755) * chore(archive): renaming & copies (#2751) * postgres legacy: stop using the storedAt field * migration script 6: we still need the id column The id column is needed because it contains the message digest which is used in store v2, and we need to keep support for store v2 for a while * legacy archive: set target migration version to 6 * waku_node: try to use wakuLegacyArchive if wakuArchive is nil * node_factory, waku_node: mount legacy and future store simultaneously We want the nwaku node to simultaneously support store-v2 requests and store-v3 requests. Only the legacy archive is in charge of archiving messages, and the archived information is suitable to fulfill both store-v2 and store-v3 needs. * postgres_driver: adding temporary code until store-v2 is removed --------- Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Co-authored-by: Ivan Folgueira Bande <ivansete@status.im>
183 lines
4.5 KiB
Nim
183 lines
4.5 KiB
Nim
{.used.}
|
|
|
|
import std/options, stew/results, testutils/unittests
|
|
import
|
|
waku/waku_archive_legacy,
|
|
waku/waku_archive_legacy/driver/queue_driver/queue_driver {.all.},
|
|
waku/waku_archive_legacy/driver/queue_driver/index,
|
|
waku/waku_core
|
|
|
|
# Helper functions
|
|
|
|
proc genIndexedWakuMessage(i: int8): (Index, WakuMessage) =
  ## Builds an (Index, WakuMessage) pair derived entirely from the seed `i`.
  ##
  ## `i` becomes the single payload byte, the message timestamp, both the
  ## sender and receiver time of the index, and the value of each of the 32
  ## digest bytes. The pubsub topic is always "test-pubsub-topic".
  let
    seedByte = byte(i)
    seedTime = Timestamp(i)
    topicName = "test-pubsub-topic"

  # Every slot is overwritten below, so zero-initialisation is skipped.
  var digestBytes {.noinit.}: array[32, byte]
  for pos in 0 ..< digestBytes.len:
    digestBytes[pos] = seedByte

  let msg = WakuMessage(payload: @[seedByte], timestamp: seedTime)
  let idx = Index(
    receiverTime: seedTime,
    senderTime: seedTime,
    digest: MessageDigest(data: digestBytes),
    pubsubTopic: topicName,
    hash: computeMessageHash(topicName, msg),
  )

  result = (idx, msg)
|
|
|
|
proc getPrepopulatedTestQueue(unsortedSet: auto, capacity: int): QueueDriver =
  ## Creates a QueueDriver with the given `capacity` and inserts one generated
  ## message per element of `unsortedSet`, in iteration order.
  ##
  ## Each element is converted to `int8` and used as the seed for
  ## `genIndexedWakuMessage`. The result of every insertion is discarded, so
  ## a failed `add` does not stop the population.
  result = QueueDriver.new(capacity)

  for seed in unsortedSet:
    let generated = genIndexedWakuMessage(seed.int8)
    discard result.add(generated[0], generated[1])
|
|
|
|
procSuite "Sorted driver queue":
  # Covers QueueDriver insertion at capacity, iteration order in both
  # directions, first/last access and index membership checks.

  test "queue capacity - add a message over the limit":
    ## Given
    let maxItems = 5
    let queue = QueueDriver.new(maxItems)

    ## When
    # Insert exactly as many messages as the queue can hold
    for seed in 1 .. maxItems:
      let entry = genIndexedWakuMessage(seed.int8)
      require queue.add(entry[0], entry[1]).isOk()

    # Insert one extra message; the queue length must stay at the capacity
    let extra = genIndexedWakuMessage(maxItems.int8 + 1)
    require queue.add(extra[0], extra[1]).isOk()

    ## Then
    check queue.len == maxItems

  test "queue capacity - add message older than oldest in the queue":
    ## Given
    let maxItems = 5
    let queue = QueueDriver.new(maxItems)

    ## When
    # Insert exactly as many messages as the queue can hold
    for seed in 1 .. maxItems:
      let entry = genIndexedWakuMessage(seed.int8)
      require queue.add(entry[0], entry[1]).isOk()

    # Adding a message older than the oldest queued one is expected to fail
    let oldestTime = queue.first().get().senderTime
    let stale = genIndexedWakuMessage(oldestTime.int8 - 1)
    let outcome = queue.add(stale[0], stale[1])

    ## Then
    check outcome.isErr()
    check outcome.error() == "too_old"

    check queue.len == maxItems

  test "queue sort-on-insert":
    ## Given
    let maxItems = 5
    let seeds = [5, 1, 3, 2, 4]
    let queue = getPrepopulatedTestQueue(seeds, maxItems)

    # Forward walk: every index must compare greater than the previous one.
    # The starting reference is generated from a seed below the smallest one.
    var floorIndex = genIndexedWakuMessage(min(seeds).int8 - 1)[0]
    for entry in queue.fwdIterator:
      let current = entry[0]
      check cmp(current, floorIndex) > 0
      floorIndex = current

    # Backward walk: every index must compare less than the previous one.
    # The starting reference is generated from a seed above the largest one.
    var ceilingIndex = genIndexedWakuMessage(max(seeds).int8 + 1)[0]
    for entry in queue.bwdIterator:
      let current = entry[0]
      check cmp(current, ceilingIndex) < 0
      ceilingIndex = current

  test "access first item from queue":
    ## Given
    let maxItems = 5
    let seeds = [5, 1, 3, 2, 4]
    let queue = getPrepopulatedTestQueue(seeds, maxItems)

    ## When
    let headRes = queue.first()

    ## Then
    check headRes.isOk()

    let head = headRes.tryGet()
    check head.senderTime == Timestamp(1)

  test "get first item from empty queue should fail":
    ## Given
    let maxItems = 5
    let queue = QueueDriver.new(maxItems)

    ## When
    let headRes = queue.first()

    ## Then
    check headRes.isErr()
    check headRes.error() == "Not found"

  test "access last item from queue":
    ## Given
    let maxItems = 5
    let seeds = [5, 1, 3, 2, 4]
    let queue = getPrepopulatedTestQueue(seeds, maxItems)

    ## When
    let tailRes = queue.last()

    ## Then
    check tailRes.isOk()

    let tail = tailRes.tryGet()
    check tail.senderTime == Timestamp(5)

  test "get last item from empty queue should fail":
    ## Given
    let maxItems = 5
    let queue = QueueDriver.new(maxItems)

    ## When
    let tailRes = queue.last()

    ## Then
    check tailRes.isErr()
    check tailRes.error() == "Not found"

  test "verify if queue contains an index":
    ## Given
    let maxItems = 5
    let seeds = [5, 1, 3, 2, 4]
    let queue = getPrepopulatedTestQueue(seeds, maxItems)

    # Seed 4 was inserted above; seed 99 was not
    let presentIndex = genIndexedWakuMessage(4)[0]
    let absentIndex = genIndexedWakuMessage(99)[0]

    ## Then
    check queue.contains(presentIndex)
    check not queue.contains(absentIndex)
|