mirror of https://github.com/waku-org/nwaku.git
Merge branch 'master' into noise-handshakestates
This commit is contained in:
commit
7a832c1bc2
|
@ -1,7 +1,7 @@
|
||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/sequtils,
|
std/[sequtils, strutils],
|
||||||
testutils/unittests,
|
testutils/unittests,
|
||||||
../../waku/v2/protocol/waku_store/waku_store_types,
|
../../waku/v2/protocol/waku_store/waku_store_types,
|
||||||
../../waku/v2/utils/time
|
../../waku/v2/utils/time
|
||||||
|
@ -40,10 +40,41 @@ procSuite "Sorted store queue":
|
||||||
# Add one more. Capacity should not be exceeded.
|
# Add one more. Capacity should not be exceeded.
|
||||||
check:
|
check:
|
||||||
stQ.add(genIndexedWakuMessage(capacity.int8 + 1)).isOk()
|
stQ.add(genIndexedWakuMessage(capacity.int8 + 1)).isOk()
|
||||||
|
stQ.len == capacity
|
||||||
|
|
||||||
|
# Attempt to add message with older value than oldest in queue should fail
|
||||||
|
let
|
||||||
|
oldestTimestamp = stQ.first().get().index.senderTime
|
||||||
|
addRes = stQ.add(genIndexedWakuMessage(oldestTimestamp.int8 - 1))
|
||||||
|
|
||||||
check:
|
check:
|
||||||
|
oldestTimestamp == 2
|
||||||
|
addRes.isErr()
|
||||||
|
($(addRes.error())).contains("too_old")
|
||||||
stQ.len == capacity
|
stQ.len == capacity
|
||||||
|
|
||||||
|
test "Sender time can't be more than MaxTimeVariance in future":
|
||||||
|
var stQ = StoreQueueRef.new(capacity)
|
||||||
|
let
|
||||||
|
receiverTime = getNanoSecondTime(10)
|
||||||
|
senderTimeOk = receiverTime + MaxTimeVariance
|
||||||
|
senderTimeErr = senderTimeOk + 1
|
||||||
|
validMessage = IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1], timestamp: senderTimeOk),
|
||||||
|
index: Index(receiverTime: receiverTime, senderTime: senderTimeOk))
|
||||||
|
invalidMessage = IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1], timestamp: senderTimeErr),
|
||||||
|
index: Index(receiverTime: receiverTime, senderTime: senderTimeErr))
|
||||||
|
|
||||||
|
# Invalid case
|
||||||
|
let invalidRes = stQ.add(invalidMessage)
|
||||||
|
check:
|
||||||
|
invalidRes.isErr()
|
||||||
|
($(invalidRes.error())).contains("future_sender_timestamp")
|
||||||
|
|
||||||
|
# Valid case
|
||||||
|
let validRes = stQ.add(validMessage)
|
||||||
|
check:
|
||||||
|
validRes.isOk()
|
||||||
|
|
||||||
test "Store queue sort-on-insert works":
|
test "Store queue sort-on-insert works":
|
||||||
# Walk forward through the set and verify ascending order
|
# Walk forward through the set and verify ascending order
|
||||||
var prevSmaller = genIndexedWakuMessage(min(unsortedSet).int8 - 1).index
|
var prevSmaller = genIndexedWakuMessage(min(unsortedSet).int8 - 1).index
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit 1682096306ddba8185dcfac360a8c3f952d721e4
|
Subproject commit 972f25d6c3a324848728d2d05796209f1b9d120e
|
|
@ -1 +1 @@
|
||||||
Subproject commit 87197230779002a2bfa8642f0e2ae07e2349e304
|
Subproject commit bb4c3298f56ba7bc69fbccd08fd6e5474c410262
|
|
@ -1 +1 @@
|
||||||
Subproject commit 1a85b3d50bf9a6dbbd58df045633dae0907c51c2
|
Subproject commit 5a5bfd3c09887cdf80b052d14bff7b9a0a919938
|
|
@ -1 +1 @@
|
||||||
Subproject commit b4bab89abdde3653939abe36ab9f6cae4aa1cbd1
|
Subproject commit 335f292a5816910aebf215e3a88db8a665133e0e
|
|
@ -1 +1 @@
|
||||||
Subproject commit 461fd03edb300b7946544b34442d1a05d4ef2270
|
Subproject commit 074cd026e61675708d518839e6914d55baa4b8ca
|
|
@ -1 +1 @@
|
||||||
Subproject commit 284b3aac05a9d96c27044c389a5d27a84d8e8f4b
|
Subproject commit 2ea147a71c4c05d64fdff5e7b8d8990eb5821399
|
|
@ -1 +1 @@
|
||||||
Subproject commit eaa72dcdbe26d8236c64711e32cf0c2f8ac65e64
|
Subproject commit 9973b9466dd062d7406d0178c750249b7e48471f
|
|
@ -1 +1 @@
|
||||||
Subproject commit 858f73b7d3ae992333a7ffab35da87e3b7b81356
|
Subproject commit 11edec862f96e42374bc2d584c84cc88d5d1f95f
|
|
@ -1 +1 @@
|
||||||
Subproject commit 8994b67b07813955c61bebddf4bd2325439c3535
|
Subproject commit 11df74552d3a3abe2c722c536c8075ef6814d5fa
|
|
@ -1 +1 @@
|
||||||
Subproject commit 419903c9a31ab253cf5cf19f24d9a912dc4b5154
|
Subproject commit cdb1f213d073fd2ecbdaf35a866417657da9294c
|
|
@ -1 +1 @@
|
||||||
Subproject commit e656ad40d1815f618c12c7636a37e6971959aa62
|
Subproject commit c05f75a8dae5f0066db5008dbe41a803ecbfbbcf
|
|
@ -1 +1 @@
|
||||||
Subproject commit 1f66448152a03e45ebb556c00300e13565cd8adf
|
Subproject commit b6867213f289f12d8f15acdd154a32b98df332bf
|
|
@ -1 +1 @@
|
||||||
Subproject commit c3b386b1e88c0d60bc4ff9da215f969b23ee2f39
|
Subproject commit d260915a0c2111e64ac49b9f75083734c58e5f03
|
|
@ -1 +1 @@
|
||||||
Subproject commit 4a7a058843cdb7a6a4fd25a55f0959c51b0b5847
|
Subproject commit 14d852e0d40ce71d916545b54ac86c2e35315f62
|
|
@ -1 +1 @@
|
||||||
Subproject commit 041544c2b7681040d160f7bd95c0ecc010c06968
|
Subproject commit a8ab2dc39aad4d69ba3be72868772d851b4b9741
|
|
@ -7,11 +7,10 @@
|
||||||
|
|
||||||
{.push raises: [Defect].}
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
import std/[oids, options, math, tables]
|
import std/[oids, options, strutils, tables]
|
||||||
import chronos
|
import chronos
|
||||||
import chronicles
|
import chronicles
|
||||||
import bearssl
|
import bearssl
|
||||||
import strutils
|
|
||||||
import stew/[results, endians2, byteutils]
|
import stew/[results, endians2, byteutils]
|
||||||
import nimcrypto/[utils, sha2, hmac]
|
import nimcrypto/[utils, sha2, hmac]
|
||||||
|
|
||||||
|
@ -765,7 +764,7 @@ proc serializePayloadV2*(self: PayloadV2): Result[seq[byte], cstring] =
|
||||||
payload.add self.transportMessage
|
payload.add self.transportMessage
|
||||||
|
|
||||||
return ok(payload)
|
return ok(payload)
|
||||||
|
|
||||||
|
|
||||||
# Deserializes a byte sequence to a PayloadV2 object according to https://rfc.vac.dev/spec/35/.
|
# Deserializes a byte sequence to a PayloadV2 object according to https://rfc.vac.dev/spec/35/.
|
||||||
# The input serialized payload concatenates the output PayloadV2 object fields as
|
# The input serialized payload concatenates the output PayloadV2 object fields as
|
||||||
|
|
|
@ -405,8 +405,8 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
|
||||||
let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
|
let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
|
||||||
|
|
||||||
if addRes.isErr:
|
if addRes.isErr:
|
||||||
trace "Attempt to add message with duplicate index to store", msg=msg, index=index
|
trace "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error()
|
||||||
waku_store_errors.inc(labelValues = ["duplicate"])
|
waku_store_errors.inc(labelValues = [$(addRes.error())])
|
||||||
return # Do not attempt to store in persistent DB
|
return # Do not attempt to store in persistent DB
|
||||||
|
|
||||||
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
|
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
|
||||||
|
|
|
@ -36,6 +36,8 @@ const
|
||||||
|
|
||||||
MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
|
MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
|
||||||
|
|
||||||
|
MaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future
|
||||||
|
|
||||||
DefaultTopic* = "/waku/2/default-waku/proto"
|
DefaultTopic* = "/waku/2/default-waku/proto"
|
||||||
|
|
||||||
|
|
||||||
|
@ -368,20 +370,28 @@ proc new*(T: type StoreQueueRef, capacity: int): T =
|
||||||
return StoreQueueRef(items: items, capacity: capacity)
|
return StoreQueueRef(items: items, capacity: capacity)
|
||||||
|
|
||||||
proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[void] =
|
proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[void] =
|
||||||
## Add a message to the queue.
|
## Add a message to the queue
|
||||||
## If we're at capacity, we will be removing,
|
## If we're at capacity, we will be removing,
|
||||||
## the oldest (first) item
|
## the oldest (first) item
|
||||||
|
|
||||||
|
# Ensure that messages don't "jump" to the front of the queue with future timestamps
|
||||||
|
if msg.index.senderTime - msg.index.receiverTime > MaxTimeVariance:
|
||||||
|
return err("future_sender_timestamp")
|
||||||
|
|
||||||
trace "Adding item to store queue", msg=msg
|
trace "Adding item to store queue", msg=msg
|
||||||
|
|
||||||
# TODO the below delete block can be removed if we convert to circular buffer
|
# TODO the below delete block can be removed if we convert to circular buffer
|
||||||
if storeQueue.items.len >= storeQueue.capacity:
|
if storeQueue.items.len >= storeQueue.capacity:
|
||||||
var
|
var
|
||||||
w = SortedSetWalkRef[Index, IndexedWakuMessage].init(storeQueue.items)
|
w = SortedSetWalkRef[Index, IndexedWakuMessage].init(storeQueue.items)
|
||||||
toDelete = w.first
|
firstItem = w.first
|
||||||
|
|
||||||
trace "Store queue at capacity. Deleting oldest item.", oldest=toDelete
|
if cmp(msg.index, firstItem.value.key) < 0:
|
||||||
discard storeQueue.items.delete(toDelete.value.key)
|
# When at capacity, we won't add if message index is smaller (older) than our oldest item
|
||||||
|
w.destroy # Clean up walker
|
||||||
|
return err("too_old")
|
||||||
|
|
||||||
|
discard storeQueue.items.delete(firstItem.value.key)
|
||||||
w.destroy # better to destroy walker after a delete operation
|
w.destroy # better to destroy walker after a delete operation
|
||||||
|
|
||||||
let res = storeQueue.items.insert(msg.index)
|
let res = storeQueue.items.insert(msg.index)
|
||||||
|
|
Loading…
Reference in New Issue