diff --git a/tests/v2/test_waku_store_queue.nim b/tests/v2/test_waku_store_queue.nim index 224d7fc7b..85ad96d5d 100644 --- a/tests/v2/test_waku_store_queue.nim +++ b/tests/v2/test_waku_store_queue.nim @@ -1,7 +1,7 @@ {.used.} import - std/sequtils, + std/[sequtils, strutils], testutils/unittests, ../../waku/v2/protocol/waku_store/waku_store_types, ../../waku/v2/utils/time @@ -40,10 +40,41 @@ procSuite "Sorted store queue": # Add one more. Capacity should not be exceeded. check: stQ.add(genIndexedWakuMessage(capacity.int8 + 1)).isOk() + stQ.len == capacity + + # Attempt to add message with older value than oldest in queue should fail + let + oldestTimestamp = stQ.first().get().index.senderTime + addRes = stQ.add(genIndexedWakuMessage(oldestTimestamp.int8 - 1)) check: + oldestTimestamp == 2 + addRes.isErr() + ($(addRes.error())).contains("too_old") stQ.len == capacity + test "Sender time can't be more than MaxTimeVariance in future": + var stQ = StoreQueueRef.new(capacity) + let + receiverTime = getNanoSecondTime(10) + senderTimeOk = receiverTime + MaxTimeVariance + senderTimeErr = senderTimeOk + 1 + validMessage = IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1], timestamp: senderTimeOk), + index: Index(receiverTime: receiverTime, senderTime: senderTimeOk)) + invalidMessage = IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1], timestamp: senderTimeErr), + index: Index(receiverTime: receiverTime, senderTime: senderTimeErr)) + + # Invalid case + let invalidRes = stQ.add(invalidMessage) + check: + invalidRes.isErr() + ($(invalidRes.error())).contains("future_sender_timestamp") + + # Valid case + let validRes = stQ.add(validMessage) + check: + validRes.isOk() + test "Store queue sort-on-insert works": # Walk forward through the set and verify ascending order var prevSmaller = genIndexedWakuMessage(min(unsortedSet).int8 - 1).index diff --git a/vendor/nim-chronicles b/vendor/nim-chronicles index 168209630..972f25d6c 160000 --- a/vendor/nim-chronicles +++ b/vendor/nim-chronicles @@ -1 +1 @@ -Subproject commit 1682096306ddba8185dcfac360a8c3f952d721e4 +Subproject commit 972f25d6c3a324848728d2d05796209f1b9d120e diff --git a/vendor/nim-chronos b/vendor/nim-chronos index 871972307..bb4c3298f 160000 --- a/vendor/nim-chronos +++ b/vendor/nim-chronos @@ -1 +1 @@ -Subproject commit 87197230779002a2bfa8642f0e2ae07e2349e304 +Subproject commit bb4c3298f56ba7bc69fbccd08fd6e5474c410262 diff --git a/vendor/nim-faststreams b/vendor/nim-faststreams index 1a85b3d50..5a5bfd3c0 160000 --- a/vendor/nim-faststreams +++ b/vendor/nim-faststreams @@ -1 +1 @@ -Subproject commit 1a85b3d50bf9a6dbbd58df045633dae0907c51c2 +Subproject commit 5a5bfd3c09887cdf80b052d14bff7b9a0a919938 diff --git a/vendor/nim-json-rpc b/vendor/nim-json-rpc index b4bab89ab..335f292a5 160000 --- a/vendor/nim-json-rpc +++ b/vendor/nim-json-rpc @@ -1 +1 @@ -Subproject commit b4bab89abdde3653939abe36ab9f6cae4aa1cbd1 +Subproject commit 335f292a5816910aebf215e3a88db8a665133e0e diff --git a/vendor/nim-json-serialization b/vendor/nim-json-serialization index 461fd03ed..074cd026e 160000 --- a/vendor/nim-json-serialization +++ b/vendor/nim-json-serialization @@ -1 +1 @@ -Subproject commit 461fd03edb300b7946544b34442d1a05d4ef2270 +Subproject commit 074cd026e61675708d518839e6914d55baa4b8ca diff --git a/vendor/nim-libbacktrace b/vendor/nim-libbacktrace index 284b3aac0..2ea147a71 160000 --- a/vendor/nim-libbacktrace +++ b/vendor/nim-libbacktrace @@ -1 +1 @@ -Subproject commit 284b3aac05a9d96c27044c389a5d27a84d8e8f4b +Subproject commit 2ea147a71c4c05d64fdff5e7b8d8990eb5821399 diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index eaa72dcdb..9973b9466 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit eaa72dcdbe26d8236c64711e32cf0c2f8ac65e64 +Subproject commit 9973b9466dd062d7406d0178c750249b7e48471f diff --git a/vendor/nim-metrics b/vendor/nim-metrics index 858f73b7d..11edec862 160000 --- a/vendor/nim-metrics +++ b/vendor/nim-metrics @@ -1 +1 @@ -Subproject commit 858f73b7d3ae992333a7ffab35da87e3b7b81356 +Subproject commit 11edec862f96e42374bc2d584c84cc88d5d1f95f diff --git a/vendor/nim-nat-traversal b/vendor/nim-nat-traversal index 8994b67b0..11df74552 160000 --- a/vendor/nim-nat-traversal +++ b/vendor/nim-nat-traversal @@ -1 +1 @@ -Subproject commit 8994b67b07813955c61bebddf4bd2325439c3535 +Subproject commit 11df74552d3a3abe2c722c536c8075ef6814d5fa diff --git a/vendor/nim-stew b/vendor/nim-stew index 419903c9a..cdb1f213d 160000 --- a/vendor/nim-stew +++ b/vendor/nim-stew @@ -1 +1 @@ -Subproject commit 419903c9a31ab253cf5cf19f24d9a912dc4b5154 +Subproject commit cdb1f213d073fd2ecbdaf35a866417657da9294c diff --git a/vendor/nim-stint b/vendor/nim-stint index e656ad40d..c05f75a8d 160000 --- a/vendor/nim-stint +++ b/vendor/nim-stint @@ -1 +1 @@ -Subproject commit e656ad40d1815f618c12c7636a37e6971959aa62 +Subproject commit c05f75a8dae5f0066db5008dbe41a803ecbfbbcf diff --git a/vendor/nim-testutils b/vendor/nim-testutils index 1f6644815..b6867213f 160000 --- a/vendor/nim-testutils +++ b/vendor/nim-testutils @@ -1 +1 @@ -Subproject commit 1f66448152a03e45ebb556c00300e13565cd8adf +Subproject commit b6867213f289f12d8f15acdd154a32b98df332bf diff --git a/vendor/nim-web3 b/vendor/nim-web3 index c3b386b1e..d260915a0 160000 --- a/vendor/nim-web3 +++ b/vendor/nim-web3 @@ -1 +1 @@ -Subproject commit c3b386b1e88c0d60bc4ff9da215f969b23ee2f39 +Subproject commit d260915a0c2111e64ac49b9f75083734c58e5f03 diff --git a/vendor/nim-websock b/vendor/nim-websock index 4a7a05884..14d852e0d 160000 --- a/vendor/nim-websock +++ b/vendor/nim-websock @@ -1 +1 @@ -Subproject commit 4a7a058843cdb7a6a4fd25a55f0959c51b0b5847 +Subproject commit 14d852e0d40ce71d916545b54ac86c2e35315f62 diff --git a/vendor/nimbus-build-system b/vendor/nimbus-build-system index 041544c2b..a8ab2dc39 160000 --- a/vendor/nimbus-build-system +++ b/vendor/nimbus-build-system @@ -1 +1 @@ -Subproject commit 041544c2b7681040d160f7bd95c0ecc010c06968 +Subproject commit a8ab2dc39aad4d69ba3be72868772d851b4b9741 diff --git a/waku/v2/protocol/waku_noise/noise.nim b/waku/v2/protocol/waku_noise/noise.nim index 7850b7607..b4bbe0788 100644 --- a/waku/v2/protocol/waku_noise/noise.nim +++ b/waku/v2/protocol/waku_noise/noise.nim @@ -7,11 +7,10 @@ {.push raises: [Defect].} -import std/[oids, options, math, tables] +import std/[oids, options, strutils, tables] import chronos import chronicles import bearssl -import strutils import stew/[results, endians2, byteutils] import nimcrypto/[utils, sha2, hmac] @@ -765,7 +764,7 @@ proc serializePayloadV2*(self: PayloadV2): Result[seq[byte], cstring] = payload.add self.transportMessage return ok(payload) - + # Deserializes a byte sequence to a PayloadV2 object according to https://rfc.vac.dev/spec/35/. # The input serialized payload concatenates the output PayloadV2 object fields as diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 05175d7cc..971faf8cd 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -405,8 +405,8 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} = let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic)) if addRes.isErr: - trace "Attempt to add message with duplicate index to store", msg=msg, index=index - waku_store_errors.inc(labelValues = ["duplicate"]) + trace "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error() + waku_store_errors.inc(labelValues = [$(addRes.error())]) return # Do not attempt to store in persistent DB waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"]) diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index 8e98098ef..6f4bea68d 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -36,6 +36,8 @@ const MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead + MaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future + DefaultTopic* = "/waku/2/default-waku/proto" @@ -368,20 +370,28 @@ proc new*(T: type StoreQueueRef, capacity: int): T = return StoreQueueRef(items: items, capacity: capacity) proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[void] = - ## Add a message to the queue. + ## Add a message to the queue ## If we're at capacity, we will be removing, ## the oldest (first) item + # Ensure that messages don't "jump" to the front of the queue with future timestamps + if msg.index.senderTime - msg.index.receiverTime > MaxTimeVariance: + return err("future_sender_timestamp") + trace "Adding item to store queue", msg=msg # TODO the below delete block can be removed if we convert to circular buffer if storeQueue.items.len >= storeQueue.capacity: var w = SortedSetWalkRef[Index, IndexedWakuMessage].init(storeQueue.items) - toDelete = w.first + firstItem = w.first - trace "Store queue at capacity. Deleting oldest item.", oldest=toDelete - discard storeQueue.items.delete(toDelete.value.key) + if cmp(msg.index, firstItem.value.key) < 0: + # When at capacity, we won't add if message index is smaller (older) than our oldest item + w.destroy # Clean up walker + return err("too_old") + + discard storeQueue.items.delete(firstItem.value.key) w.destroy # better to destroy walker after a delete operation let res = storeQueue.items.insert(msg.index)