From 450e0494f1cd8a4d20cc01dc4d2b7b3d3fad8988 Mon Sep 17 00:00:00 2001 From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> Date: Mon, 11 Apr 2022 14:58:18 +0200 Subject: [PATCH] fix(store): timestamp validity checks (#876) --- tests/v2/test_waku_store_queue.nim | 33 ++++++++++++++++++- vendor/nim-eth | 2 +- waku/v2/protocol/waku_store/waku_store.nim | 4 +-- .../protocol/waku_store/waku_store_types.nim | 18 +++++++--- 4 files changed, 49 insertions(+), 8 deletions(-) diff --git a/tests/v2/test_waku_store_queue.nim b/tests/v2/test_waku_store_queue.nim index 224d7fc7b..85ad96d5d 100644 --- a/tests/v2/test_waku_store_queue.nim +++ b/tests/v2/test_waku_store_queue.nim @@ -1,7 +1,7 @@ {.used.} import - std/sequtils, + std/[sequtils, strutils], testutils/unittests, ../../waku/v2/protocol/waku_store/waku_store_types, ../../waku/v2/utils/time @@ -40,10 +40,41 @@ procSuite "Sorted store queue": # Add one more. Capacity should not be exceeded. check: stQ.add(genIndexedWakuMessage(capacity.int8 + 1)).isOk() + stQ.len == capacity + + # Attempt to add message with older value than oldest in queue should fail + let + oldestTimestamp = stQ.first().get().index.senderTime + addRes = stQ.add(genIndexedWakuMessage(oldestTimestamp.int8 - 1)) check: + oldestTimestamp == 2 + addRes.isErr() + ($(addRes.error())).contains("too_old") stQ.len == capacity + test "Sender time can't be more than MaxTimeVariance in future": + var stQ = StoreQueueRef.new(capacity) + let + receiverTime = getNanoSecondTime(10) + senderTimeOk = receiverTime + MaxTimeVariance + senderTimeErr = senderTimeOk + 1 + validMessage = IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1], timestamp: senderTimeOk), + index: Index(receiverTime: receiverTime, senderTime: senderTimeOk)) + invalidMessage = IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1], timestamp: senderTimeErr), + index: Index(receiverTime: receiverTime, senderTime: senderTimeErr)) + + # Invalid case + let invalidRes = stQ.add(invalidMessage) + check: + invalidRes.isErr() + ($(invalidRes.error())).contains("future_sender_timestamp") + + # Valid case + let validRes = stQ.add(validMessage) + check: + validRes.isOk() + test "Store queue sort-on-insert works": # Walk forward through the set and verify ascending order var prevSmaller = genIndexedWakuMessage(min(unsortedSet).int8 - 1).index diff --git a/vendor/nim-eth b/vendor/nim-eth index 92cd608a5..d442d84d2 160000 --- a/vendor/nim-eth +++ b/vendor/nim-eth @@ -1 +1 @@ -Subproject commit 92cd608a5f47de1aa55861afa6dcc13bea4ae842 +Subproject commit d442d84d221655ea25271b41bd2de546bafe4914 diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 05175d7cc..971faf8cd 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -405,8 +405,8 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} = let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic)) if addRes.isErr: - trace "Attempt to add message with duplicate index to store", msg=msg, index=index - waku_store_errors.inc(labelValues = ["duplicate"]) + trace "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error() + waku_store_errors.inc(labelValues = [$(addRes.error())]) return # Do not attempt to store in persistent DB waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"]) diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index 8e98098ef..6f4bea68d 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -36,6 +36,8 @@ const MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead + MaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future + DefaultTopic* = "/waku/2/default-waku/proto" @@ -368,20 +370,28 @@ proc new*(T: type StoreQueueRef, capacity: int): T = return StoreQueueRef(items: items, capacity: capacity) proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[void] = - ## Add a message to the queue. + ## Add a message to the queue ## If we're at capacity, we will be removing, ## the oldest (first) item + # Ensure that messages don't "jump" to the front of the queue with future timestamps + if msg.index.senderTime - msg.index.receiverTime > MaxTimeVariance: + return err("future_sender_timestamp") + trace "Adding item to store queue", msg=msg # TODO the below delete block can be removed if we convert to circular buffer if storeQueue.items.len >= storeQueue.capacity: var w = SortedSetWalkRef[Index, IndexedWakuMessage].init(storeQueue.items) - toDelete = w.first + firstItem = w.first - trace "Store queue at capacity. Deleting oldest item.", oldest=toDelete - discard storeQueue.items.delete(toDelete.value.key) + if cmp(msg.index, firstItem.value.key) < 0: + # When at capacity, we won't add if message index is smaller (older) than our oldest item + w.destroy # Clean up walker + return err("too_old") + + discard storeQueue.items.delete(firstItem.value.key) w.destroy # better to destroy walker after a delete operation let res = storeQueue.items.insert(msg.index)