Mirror of https://github.com/waku-org/nwaku.git
Synced 2025-01-27 15:16:05 +00:00

deploy: 450e0494f1cd8a4d20cc01dc4d2b7b3d3fad8988

This commit is contained in:
  parent e444594791
  commit 03ad0fa8ce
@@ -1,7 +1,7 @@
 {.used.}
 
 import
-  std/sequtils,
+  std/[sequtils, strutils],
   testutils/unittests,
   ../../waku/v2/protocol/waku_store/waku_store_types,
   ../../waku/v2/utils/time
@@ -40,10 +40,41 @@ procSuite "Sorted store queue":
 
     # Add one more. Capacity should not be exceeded.
     check:
       stQ.add(genIndexedWakuMessage(capacity.int8 + 1)).isOk()
       stQ.len == capacity
+
+    # Attempt to add message with older value than oldest in queue should fail
+    let
+      oldestTimestamp = stQ.first().get().index.senderTime
+      addRes = stQ.add(genIndexedWakuMessage(oldestTimestamp.int8 - 1))
+
+    check:
+      oldestTimestamp == 2
+      addRes.isErr()
+      ($(addRes.error())).contains("too_old")
+      stQ.len == capacity
+
+  test "Sender time can't be more than MaxTimeVariance in future":
+    var stQ = StoreQueueRef.new(capacity)
+    let
+      receiverTime = getNanoSecondTime(10)
+      senderTimeOk = receiverTime + MaxTimeVariance
+      senderTimeErr = senderTimeOk + 1
+      validMessage = IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1], timestamp: senderTimeOk),
+                                        index: Index(receiverTime: receiverTime, senderTime: senderTimeOk))
+      invalidMessage = IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1], timestamp: senderTimeErr),
+                                          index: Index(receiverTime: receiverTime, senderTime: senderTimeErr))
+
+    # Invalid case
+    let invalidRes = stQ.add(invalidMessage)
+    check:
+      invalidRes.isErr()
+      ($(invalidRes.error())).contains("future_sender_timestamp")
+
+    # Valid case
+    let validRes = stQ.add(validMessage)
+    check:
+      validRes.isOk()
+
   test "Store queue sort-on-insert works":
     # Walk forward through the set and verify ascending order
     var prevSmaller = genIndexedWakuMessage(min(unsortedSet).int8 - 1).index
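Note on the tests above: they pin down the two rejection paths this commit adds to StoreQueueRef.add. At capacity, a message whose index sorts before the queue's oldest entry is refused with "too_old"; independently of capacity, a sender timestamp running more than MaxTimeVariance ahead of the receiver timestamp is refused with "future_sender_timestamp". The boundary case the second test exercises can be checked by hand; a minimal sketch, reusing only names visible in this diff:

    # Boundary arithmetic behind the MaxTimeVariance test (sketch).
    let
      receiverTime = getNanoSecondTime(10)
      senderTimeOk = receiverTime + MaxTimeVariance # exactly at the limit: accepted
      senderTimeErr = senderTimeOk + 1              # one nanosecond beyond: rejected
    doAssert senderTimeOk - receiverTime <= MaxTimeVariance
    doAssert senderTimeErr - receiverTime > MaxTimeVariance

The check in add uses a strict greater-than, so a drift of exactly MaxTimeVariance is still valid; that is why senderTimeOk passes and senderTimeErr fails.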
vendor/nim-eth/eth/utp/utp_socket.nim (vendored, 26 changes)
@@ -1048,7 +1048,6 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
     socketAckNr = socket.ackNr,
     socketSeqNr = socket.seqNr,
     windowPackets = socket.curWindowPackets,
-    rcvBufferSize = socket.offset,
     packetType = p.header.pType,
     seqNr = p.header.seqNr,
     ackNr = p.header.ackNr,
@@ -1464,12 +1463,6 @@ template shiftBuffer(t, c: untyped) =
   (t).offset = 0
 
 proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult =
-  debug "Handling incoming read",
-    rcvBufferSize = socket.offset,
-    reorderBufferSize = socket.inBufferBytes,
-    socketAtEOF = socket.atEof(),
-    readTillEOF = readReq.bytesToRead == 0
-
   if readReq.reader.finished():
     return ReadCancelled
 
@@ -1484,18 +1477,9 @@ proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult =
     readReq.bytesAvailable.add(socket.rcvBuffer.toOpenArray(0, socket.offset - 1))
     socket.shiftBuffer(socket.offset)
     if (socket.atEof()):
-
-      debug "Read finished",
-        bytesRead = len(readReq.bytesAvailable),
-        socektAtEof = socket.atEof()
-
       readReq.reader.complete(readReq.bytesAvailable)
       return ReadFinished
     else:
-      debug "Read not finished",
-        bytesRead = len(readReq.bytesAvailable),
-        socektAtEof = socket.atEof()
-
       return ReadNotFinished
   else:
     let bytesAlreadyRead = len(readReq.bytesAvailable)
@@ -1504,17 +1488,9 @@ proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult =
     readReq.bytesAvailable.add(socket.rcvBuffer.toOpenArray(0, count - 1))
     socket.shiftBuffer(count)
     if (len(readReq.bytesAvailable) == readReq.bytesToRead):
-      debug "Read finished",
-        bytesRead = len(readReq.bytesAvailable),
-        socektAtEof = socket.atEof()
-
      readReq.reader.complete(readReq.bytesAvailable)
      return ReadFinished
    else:
-      debug "Read not finished",
-        bytesRead = len(readReq.bytesAvailable),
-        socektAtEof = socket.atEof()
-
      return ReadNotFinished
 
 proc eventLoop(socket: UtpSocket) {.async.} =
@@ -1527,7 +1503,7 @@ proc eventLoop(socket: UtpSocket) {.async.} =
 
       # we processed a packet and rcv buffer size is larger than 0,
       # check if we can finish some pending readers
-      while socket.pendingReads.len() > 0:
+      while socket.pendingReads.len() > 0 and socket.offset > 0:
         let readResult = socket.onRead(socket.pendingReads[0])
         case readResult
         of ReadFinished:
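The added `and socket.offset > 0` condition is the functional core of this vendor bump: socket.offset counts the bytes currently sitting in the receive buffer, and onRead can only hand out buffered bytes. Without the guard, a pending reader that needs more data than is buffered would keep returning ReadNotFinished and the drain loop would spin without making progress. A sketch of the intended loop shape, assuming pendingReads behaves like a std/deques Deque (popFirst here is illustrative, not the vendored code verbatim):

    # Drain pending readers only while buffered bytes remain (sketch).
    while socket.pendingReads.len() > 0 and socket.offset > 0:
      case socket.onRead(socket.pendingReads[0])
      of ReadFinished, ReadCancelled:
        discard socket.pendingReads.popFirst() # done with this request (illustrative)
      of ReadNotFinished:
        break # reader wants more than is buffered; wait for the next packet

In practice ReadNotFinished also implies onRead has consumed everything buffered, so the new `socket.offset > 0` check would end the loop on the next iteration anyway; either way, the loop no longer busy-waits on an unsatisfiable reader.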
@@ -120,6 +120,7 @@ procSuite "Utp protocol over udp tests with loss and delays":
                     bytesPerRead: int = 0): TestCase =
     TestCase(maxDelay: maxDelay, dropRate: dropRate, bytesToTransfer: bytesToTransfer, cfg: cfg, bytesPerRead: bytesPerRead)
 
+
   let testCases = @[
     TestCase.init(45, 10, 40000),
     TestCase.init(25, 15, 40000),
@@ -227,32 +228,3 @@ procSuite "Utp protocol over udp tests with loss and delays":
       await clientProtocol.shutdownWait()
       await serverProtocol.shutdownWait()
 
-  let testCase2 = @[
-    TestCase.init(45, 0, 40000),
-    TestCase.init(45, 0, 80000),
-    TestCase.init(25, 15, 40000),
-    TestCase.init(15, 5, 40000, SocketConfig.init(optRcvBuffer = uint32(10000), remoteWindowResetTimeout = seconds(5)))
-  ]
-
-  asyncTest "Write large data and read till EOF":
-    for testCase in testCase2:
-      let (
-        clientProtocol,
-        clientSocket,
-        serverProtocol,
-        serverSocket) = await testScenario(testCase.maxDelay, testCase.dropRate, testcase.cfg)
-
-
-      let numBytes = testCase.bytesToTransfer
-      let bytesToTransfer = generateByteArray(rng[], numBytes)
-
-      discard await clientSocket.write(bytesToTransfer)
-      clientSocket.close()
-
-      let read = await serverSocket.read()
-
-      check:
-        read == bytesToTransfer
-
-      await clientProtocol.shutdownWait()
-      await serverProtocol.shutdownWait()
@@ -2,7 +2,7 @@
 
 # libtool - Provide generalized library-building support services.
 # Generated automatically by config.status (libbacktrace) version-unused
-# Libtool was configured on host fv-az129-796:
+# Libtool was configured on host fv-az457-821:
 # NOTE: Changes made to this file will be lost: look at ltmain.sh.
 #
 # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
@@ -405,8 +405,8 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
   let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
 
   if addRes.isErr:
-    trace "Attempt to add message with duplicate index to store", msg=msg, index=index
-    waku_store_errors.inc(labelValues = ["duplicate"])
+    trace "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error()
+    waku_store_errors.inc(labelValues = [$(addRes.error())])
     return # Do not attempt to store in persistent DB
 
   waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
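Since messages.add can now fail for more than one reason, the handler stops hard-coding the "duplicate" metric label and derives it from the error value instead, so each rejection reason gets its own counter series. With the error strings introduced in this commit, the fan-out presumably looks like this (label values are the queue's error strings; the exact label name is whatever the counter was registered with):

    # Possible label values after this change (illustrative):
    #   "duplicate"                -- index already present in the queue
    #   "too_old"                  -- older than the oldest entry of a full queue
    #   "future_sender_timestamp"  -- sender clock more than MaxTimeVariance ahead
    waku_store_errors.inc(labelValues = [$(addRes.error())])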
@@ -36,6 +36,8 @@ const
 
   MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
 
+  MaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future
+
   DefaultTopic* = "/waku/2/default-waku/proto"
 
 
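getNanoSecondTime converts seconds to the nanosecond Timestamp resolution used by Index.senderTime and Index.receiverTime, so the new constant tolerates at most 20 s of forward clock drift. A quick sanity check of the arithmetic, assuming getNanoSecondTime(s) == s * 1_000_000_000 as its name and its use alongside nanosecond timestamps suggest:

    # Sketch: the tolerated window in nanoseconds (assumed conversion).
    doAssert getNanoSecondTime(20) == 20_000_000_000
    # A sender timestamp 20 s ahead of receiver time sits exactly on the limit:
    doAssert getNanoSecondTime(30) - getNanoSecondTime(10) <= MaxTimeVariance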
@@ -368,20 +370,28 @@ proc new*(T: type StoreQueueRef, capacity: int): T =
   return StoreQueueRef(items: items, capacity: capacity)
 
 proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[void] =
-  ## Add a message to the queue.
+  ## Add a message to the queue
   ## If we're at capacity, we will be removing,
   ## the oldest (first) item
+
+  # Ensure that messages don't "jump" to the front of the queue with future timestamps
+  if msg.index.senderTime - msg.index.receiverTime > MaxTimeVariance:
+    return err("future_sender_timestamp")
 
   trace "Adding item to store queue", msg=msg
 
   # TODO the below delete block can be removed if we convert to circular buffer
   if storeQueue.items.len >= storeQueue.capacity:
     var
       w = SortedSetWalkRef[Index, IndexedWakuMessage].init(storeQueue.items)
-      toDelete = w.first
+      firstItem = w.first
 
-    trace "Store queue at capacity. Deleting oldest item.", oldest=toDelete
-    discard storeQueue.items.delete(toDelete.value.key)
+    if cmp(msg.index, firstItem.value.key) < 0:
+      # When at capacity, we won't add if message index is smaller (older) than our oldest item
+      w.destroy # Clean up walker
+      return err("too_old")
+
+    discard storeQueue.items.delete(firstItem.value.key)
     w.destroy # better to destroy walker after a delete operation
 
   let res = storeQueue.items.insert(msg.index)
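End-to-end, add now guards three invariants before anything reaches persistent storage: no sender timestamp more than MaxTimeVariance ahead of receipt, no insertion of a message older than everything a full queue already holds, and no silent overwrite of a duplicate index (the failure handleMessage previously hard-coded as "duplicate"). A sketch of how a caller can branch on the outcome, with storeQueue and msg assumed in scope and error strings as introduced above:

    # Handling the distinct failure modes of StoreQueueRef.add (sketch).
    let res = storeQueue.add(msg)
    if res.isErr():
      case $(res.error())
      of "too_old":
        trace "Rejected: older than the oldest entry of a full queue"
      of "future_sender_timestamp":
        trace "Rejected: sender clock too far ahead of receiver clock"
      else:
        trace "Store queue insert failed", error=res.error()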