fix(store): timestamp validity checks (#876)

This commit is contained in:
Hanno Cornelius 2022-04-11 14:58:18 +02:00 committed by GitHub
parent d2fccb5220
commit 450e0494f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 49 additions and 8 deletions

View File

@ -1,7 +1,7 @@
{.used.}
import
std/sequtils,
std/[sequtils, strutils],
testutils/unittests,
../../waku/v2/protocol/waku_store/waku_store_types,
../../waku/v2/utils/time
@ -40,10 +40,41 @@ procSuite "Sorted store queue":
# Add one more. Capacity should not be exceeded.
check:
stQ.add(genIndexedWakuMessage(capacity.int8 + 1)).isOk()
stQ.len == capacity
# Attempt to add message with older value than oldest in queue should fail
let
oldestTimestamp = stQ.first().get().index.senderTime
addRes = stQ.add(genIndexedWakuMessage(oldestTimestamp.int8 - 1))
check:
oldestTimestamp == 2
addRes.isErr()
($(addRes.error())).contains("too_old")
stQ.len == capacity
test "Sender time can't be more than MaxTimeVariance in future":
var stQ = StoreQueueRef.new(capacity)
let
receiverTime = getNanoSecondTime(10)
senderTimeOk = receiverTime + MaxTimeVariance
senderTimeErr = senderTimeOk + 1
validMessage = IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1], timestamp: senderTimeOk),
index: Index(receiverTime: receiverTime, senderTime: senderTimeOk))
invalidMessage = IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1], timestamp: senderTimeErr),
index: Index(receiverTime: receiverTime, senderTime: senderTimeErr))
# Invalid case
let invalidRes = stQ.add(invalidMessage)
check:
invalidRes.isErr()
($(invalidRes.error())).contains("future_sender_timestamp")
# Valid case
let validRes = stQ.add(validMessage)
check:
validRes.isOk()
test "Store queue sort-on-insert works":
# Walk forward through the set and verify ascending order
var prevSmaller = genIndexedWakuMessage(min(unsortedSet).int8 - 1).index

2
vendor/nim-eth vendored

@ -1 +1 @@
Subproject commit 92cd608a5f47de1aa55861afa6dcc13bea4ae842
Subproject commit d442d84d221655ea25271b41bd2de546bafe4914

View File

@ -405,8 +405,8 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
if addRes.isErr:
trace "Attempt to add message with duplicate index to store", msg=msg, index=index
waku_store_errors.inc(labelValues = ["duplicate"])
trace "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error()
waku_store_errors.inc(labelValues = [$(addRes.error())])
return # Do not attempt to store in persistent DB
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])

View File

@ -36,6 +36,8 @@ const
MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
MaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future
DefaultTopic* = "/waku/2/default-waku/proto"
@ -368,20 +370,28 @@ proc new*(T: type StoreQueueRef, capacity: int): T =
return StoreQueueRef(items: items, capacity: capacity)
proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[void] =
## Add a message to the queue.
## Add a message to the queue
## If we're at capacity, we will be removing,
## the oldest (first) item
# Ensure that messages don't "jump" to the front of the queue with future timestamps
if msg.index.senderTime - msg.index.receiverTime > MaxTimeVariance:
return err("future_sender_timestamp")
trace "Adding item to store queue", msg=msg
# TODO the below delete block can be removed if we convert to circular buffer
if storeQueue.items.len >= storeQueue.capacity:
var
w = SortedSetWalkRef[Index, IndexedWakuMessage].init(storeQueue.items)
toDelete = w.first
firstItem = w.first
trace "Store queue at capacity. Deleting oldest item.", oldest=toDelete
discard storeQueue.items.delete(toDelete.value.key)
if cmp(msg.index, firstItem.value.key) < 0:
# When at capacity, we won't add if message index is smaller (older) than our oldest item
w.destroy # Clean up walker
return err("too_old")
discard storeQueue.items.delete(firstItem.value.key)
w.destroy # better to destroy walker after a delete operation
let res = storeQueue.items.insert(msg.index)