mirror of https://github.com/status-im/nim-eth.git
Fix some typos (#557)
This commit is contained in:
parent
ff0b1a330a
commit
e7d3de6ebf
|
@ -41,7 +41,7 @@ will be advanced just past the end of the consumed object.
|
|||
|
||||
The `toXX` and `read` family of procs may raise a `RlpTypeMismatch` in case
|
||||
of type mismatch with the stream contents under the cursor. A corrupted
|
||||
RLP stream or an attemp to read past the stream end will be signaled
|
||||
RLP stream or an attempt to read past the stream end will be signaled
|
||||
with the `MalformedRlpError` exception. If the RLP stream includes data
|
||||
that cannot be processed on the current platform (e.g. an integer value
|
||||
that is too large), the library will raise an `UnsupportedRlpError` exception.
|
||||
|
|
|
@ -86,7 +86,7 @@ Constructor API:
|
|||
* init(BinaryTrie, DB, rootHash[optional])
|
||||
|
||||
Normally you would not set the rootHash when constructing an empty Binary-trie.
|
||||
Setting the rootHash occured in a scenario where you have a populated DB
|
||||
Setting the rootHash occurred in a scenario where you have a populated DB
|
||||
with existing trie structure and you know the rootHash,
|
||||
and then you want to continue/resume the trie operations.
|
||||
|
||||
|
@ -123,7 +123,7 @@ What kind of lie? actually, `delete` and `deleteSubtrie` doesn't remove the
|
|||
'deleted' node from the underlying DB. It only make the node inaccessible
|
||||
from the user of the trie. The same also happened if you update the value of a key,
|
||||
the old value node is not removed from the underlying DB.
|
||||
A more subtle lie also happened when you add new entrie into the trie using `set` operation.
|
||||
A more subtle lie also happened when you add new entries into the trie using `set` operation.
|
||||
The previous hash of affected branch become obsolete and replaced by new hash,
|
||||
the old hash become inaccessible to the user.
|
||||
You may think that is a waste of storage space.
|
||||
|
@ -230,7 +230,7 @@ Then we can write the clean tree into a new DB instance to replace the old one.
|
|||
## Sparse Merkle Trie
|
||||
|
||||
Sparse Merkle Trie(SMT) is a variant of Binary Trie which uses binary encoding to
|
||||
represent path during trie travelsal. When Binary Trie uses three types of node,
|
||||
represent path during trie traversal. When Binary Trie uses three types of node,
|
||||
SMT only use one type of node without any additional special encoding to store it's key-path.
|
||||
|
||||
Actually, it doesn't even store it's key-path anywhere like Binary Trie,
|
||||
|
@ -280,7 +280,7 @@ Constructor API:
|
|||
* init(SparseBinaryTrie, DB, rootHash[optional])
|
||||
|
||||
Normally you would not set the rootHash when constructing an empty Sparse Merkle Trie.
|
||||
Setting the rootHash occured in a scenario where you have a populated DB
|
||||
Setting the rootHash occurred in a scenario where you have a populated DB
|
||||
with existing trie structure and you know the rootHash,
|
||||
and then you want to continue/resume the trie operations.
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ export results
|
|||
type
|
||||
MemStoreRef* = ref object of RootObj
|
||||
records: Table[seq[byte], seq[byte]]
|
||||
# TODO interaction with this table would benefit from heterogenous lookup
|
||||
# TODO interaction with this table would benefit from heterogeneous lookup
|
||||
# (see `@key` below)
|
||||
# https://github.com/nim-lang/Nim/issues/7457
|
||||
|
||||
|
@ -51,7 +51,7 @@ template put*(dbParam: KvStoreRef, key, val: openArray[byte]): KvResult[void] =
|
|||
db.putProc(db.obj, key, val)
|
||||
|
||||
template get*(dbParam: KvStoreRef, key: openArray[byte], onData: untyped): KvResult[bool] =
|
||||
## Retrive value at ``key`` and call ``onData`` with the value. The data is
|
||||
## Retrieve value at ``key`` and call ``onData`` with the value. The data is
|
||||
## valid for the duration of the callback.
|
||||
## ``onData``: ``proc(data: openArray[byte])``
|
||||
## returns true if found and false otherwise.
|
||||
|
|
|
@ -59,7 +59,7 @@ template dispose*(db: SqliteStmt) =
|
|||
|
||||
func isInsideTransaction*(db: SqStoreRef): bool =
|
||||
sqlite3_get_autocommit(db.env) == 0
|
||||
|
||||
|
||||
proc release[T](x: var AutoDisposed[T]): T =
|
||||
result = x.val
|
||||
x.val = nil
|
||||
|
@ -132,7 +132,7 @@ proc exec*[P](s: SqliteStmt[P, void], params: P): KvResult[void] =
|
|||
else:
|
||||
ok()
|
||||
|
||||
# release implict transaction
|
||||
# release implicit transaction
|
||||
discard sqlite3_reset(s) # same return information as step
|
||||
discard sqlite3_clear_bindings(s) # no errors possible
|
||||
|
||||
|
@ -620,12 +620,12 @@ proc customScalarBlobFunction(ctx: ptr sqlite3_context, n: cint, v: ptr ptr sqli
|
|||
try:
|
||||
if s.isOk():
|
||||
let bytes = s.unsafeGet()
|
||||
# try is necessessary as otherwise nim marks SQLITE_TRANSIENT as throwning
|
||||
# try is necessary as otherwise nim marks SQLITE_TRANSIENT as throwing
|
||||
# unlisted exception.
|
||||
# Using SQLITE_TRANSIENT destructor type, as it inform sqlite that data
|
||||
# under provided pointer may be deleted at any moment, which is the case
|
||||
# for seq[byte] as it is managed by nim gc. With this flag sqlite copy bytes
|
||||
# under pointer and then realeases them itself.
|
||||
# under pointer and then releases them itself.
|
||||
sqlite3_result_blob(ctx, unsafeAddr bytes[0], bytes.len.cint, SQLITE_TRANSIENT)
|
||||
else:
|
||||
let errMsg = s.error
|
||||
|
|
|
@ -313,7 +313,7 @@ proc createKeyFileJson*(seckey: PrivateKey,
|
|||
## ``version`` - version of keyfile format (default is 3)
|
||||
## ``cryptkind`` - algorithm for private key encryption
|
||||
## (default is AES128-CTR)
|
||||
## ``kdfkind`` - algorithm for key deriviation function (default is PBKDF2)
|
||||
## ``kdfkind`` - algorithm for key derivation function (default is PBKDF2)
|
||||
## ``workfactor`` - Key deriviation function work factor, 0 is to use
|
||||
## default workfactor.
|
||||
var iv: array[aes128.sizeBlock, byte]
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
# working with keys and signatures as they appear in Ethereum in particular:
|
||||
#
|
||||
# * Public keys as serialized in uncompressed format without the initial byte
|
||||
# * Shared secrets are serialized in raw format without the intial byte
|
||||
# * Shared secrets are serialized in raw format without the initial byte
|
||||
# * distinct types are used to avoid confusion with the "standard" secp types
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
|
|
@ -65,6 +65,6 @@ proc getRouteIpv4*(): Result[ValidIpAddress, cstring] =
|
|||
let ip = try: route.source.address()
|
||||
except ValueError as e:
|
||||
# This should not occur really.
|
||||
error "Address convertion error", exception = e.name, msg = e.msg
|
||||
error "Address conversion error", exception = e.name, msg = e.msg
|
||||
return err("Invalid IP address")
|
||||
ok(ValidIpAddress.init(ip))
|
||||
|
|
|
@ -25,7 +25,7 @@ const
|
|||
MAC_SIZE = 256 div 8 # 32
|
||||
SIG_SIZE = 520 div 8 # 65
|
||||
HEAD_SIZE = MAC_SIZE + SIG_SIZE # 97
|
||||
EXPIRATION = 60 # let messages expire after N secondes
|
||||
EXPIRATION = 60 # let messages expire after N seconds
|
||||
PROTO_VERSION = 4
|
||||
|
||||
type
|
||||
|
|
|
@ -520,7 +520,7 @@ proc decodeHandshakePacket(c: var Codec, fromAddr: Address, nonce: AESGCMNonce,
|
|||
# Differently from an ordinary message, this is seen as an error as the
|
||||
# secrets just got negotiated in the handshake and thus decryption should
|
||||
# always work. We do not send a new Whoareyou on these as it probably means
|
||||
# there is a compatiblity issue and we might loop forever in failed
|
||||
# there is a compatibility issue and we might loop forever in failed
|
||||
# handshakes with this peer.
|
||||
return err("Decryption of message failed in handshake packet")
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
##
|
||||
## This allows the selection of a node its own public IP based on address
|
||||
## information that is received from other nodes.
|
||||
## This can be used in conjuction with discovery v5 ping-pong request responses
|
||||
## This can be used in conjunction with discovery v5 ping-pong request responses
|
||||
## that provide this information.
|
||||
## To select the right address, a majority count is done. This is done over a
|
||||
## sort of moving window as votes expire after `IpVoteTimeout`.
|
||||
|
|
|
@ -5,7 +5,7 @@ import std/[tables, lists, options]
|
|||
type
|
||||
LRUCache*[K, V] = object of RootObj
|
||||
list: DoublyLinkedList[(K, V)] # Head is MRU k:v and tail is LRU k:v
|
||||
table: Table[K, DoublyLinkedNode[(K, V)]] # DoublyLinkedNode is alraedy ref
|
||||
table: Table[K, DoublyLinkedNode[(K, V)]] # DoublyLinkedNode is already ref
|
||||
capacity: int
|
||||
|
||||
func init*[K, V](T: type LRUCache[K, V], capacity: int): LRUCache[K, V] =
|
||||
|
|
|
@ -40,7 +40,7 @@ proc verifyNodesRecords(enrs: openArray[Record], fromNode: Node, nodesLimit: int
|
|||
# The discovery v5 specification specifies no limit on the amount of ENRs
|
||||
# that can be returned, but clients usually stick with the bucket size limit
|
||||
# as in original Kademlia. Because of this it is chosen not to fail
|
||||
# immediatly, but still process maximum `findNodeResultLimit`.
|
||||
# immediately, but still process maximum `findNodeResultLimit`.
|
||||
if count >= nodesLimit:
|
||||
debug "Too many ENRs", enrs = enrs.len(),
|
||||
limit = nodesLimit, sender = fromNode.record.toURI
|
||||
|
|
|
@ -490,7 +490,7 @@ proc replaceNode(d: Protocol, n: Node) =
|
|||
# peers in the routing table.
|
||||
debug "Message request to bootstrap node failed", enr = toURI(n.record)
|
||||
|
||||
# TODO: This could be improved to do the clean-up immediatily in case a non
|
||||
# TODO: This could be improved to do the clean-up immediately in case a non
|
||||
# whoareyou response does arrive, but we would need to store the AuthTag
|
||||
# somewhere
|
||||
proc registerRequest(d: Protocol, n: Node, message: seq[byte],
|
||||
|
|
|
@ -53,7 +53,7 @@ type
|
|||
## time seen. First entry (head) is considered the most recently seen node
|
||||
## and the last entry (tail) is considered the least recently seen node.
|
||||
## Here "seen" means a successful request-response. This can also not have
|
||||
## occured yet.
|
||||
## occurred yet.
|
||||
replacementCache: seq[Node] ## Nodes that could not be added to the `nodes`
|
||||
## seq as it is full and without stale nodes. This is practically a small
|
||||
## LRU cache.
|
||||
|
@ -70,12 +70,12 @@ type
|
|||
## is possible that a malicious node could fill (poison) the routing table or
|
||||
## a specific bucket with ENRs with IPs it does not control. The effect of
|
||||
## this would be that a node that actually owns the IP could have a difficult
|
||||
## time getting its ENR distrubuted in the DHT and as a consequence would
|
||||
## time getting its ENR distributed in the DHT and as a consequence would
|
||||
## not be reached from the outside as much (or at all). However, that node can
|
||||
## still search and find nodes to connect to. So it would practically be a
|
||||
## similar situation as a node that is not reachable behind the NAT because
|
||||
## port mapping is not set up properly.
|
||||
## There is the possiblity to set the IP limit on verified (=contacted) nodes
|
||||
## There is the possibility to set the IP limit on verified (=contacted) nodes
|
||||
## only, but that would allow for lookups to be done on a higher set of nodes
|
||||
## owned by the same identity. This is a worse alternative.
|
||||
## Next, doing lookups only on verified nodes would slow down discovery start
|
||||
|
@ -321,7 +321,7 @@ proc addNode*(r: var RoutingTable, n: Node): NodeStatus =
|
|||
## Try to add the node to the routing table.
|
||||
##
|
||||
## First, an attempt will be done to add the node to the bucket in its range.
|
||||
## If this fails, the bucket will be split if it is eligable for splitting.
|
||||
## If this fails, the bucket will be split if it is eligible for splitting.
|
||||
## If so, a new attempt will be done to add the node. If not, the node will be
|
||||
## added to the replacement cache.
|
||||
##
|
||||
|
|
|
@ -667,7 +667,7 @@ proc randomNodes*(k: KademliaProtocol, count: int): seq[Node] =
|
|||
result = newSeqOfCap[Node](count)
|
||||
var seen = initHashSet[Node]()
|
||||
|
||||
# This is a rather inneficient way of randomizing nodes from all buckets, but even if we
|
||||
# This is a rather inefficient way of randomizing nodes from all buckets, but even if we
|
||||
# iterate over all nodes in the routing table, the time it takes would still be
|
||||
# insignificant compared to the time it takes for the network roundtrips when connecting
|
||||
# to nodes.
|
||||
|
|
|
@ -54,7 +54,7 @@ type
|
|||
## Cached ident for the timeout parameter
|
||||
|
||||
extraDefs*: NimNode
|
||||
## The reponse procs have extra templates that must become
|
||||
## The response procs have extra templates that must become
|
||||
## part of the generated code
|
||||
|
||||
P2PProtocol* = ref object
|
||||
|
@ -253,7 +253,7 @@ proc refreshParam(n: NimNode): NimNode =
|
|||
result = copyNimTree(n)
|
||||
if n.kind == nnkIdentDefs:
|
||||
for i in 0..<n.len-2:
|
||||
if n[i].kind == nnkSym:
|
||||
if n[i].kind == nnkSym:
|
||||
result[i] = genSym(symKind(n[i]), $n[i])
|
||||
|
||||
iterator typedInputParams(procDef: NimNode, skip = 0): (NimNode, NimNode) =
|
||||
|
@ -414,7 +414,7 @@ proc newMsg(protocol: P2PProtocol, kind: MessageKind, id: int,
|
|||
procDef: NimNode, response: Message = nil): Message =
|
||||
|
||||
if procDef[0].kind == nnkPostfix:
|
||||
error("p2pProcotol procs are public by default. " &
|
||||
error("p2pProtocol procs are public by default. " &
|
||||
"Please remove the postfix `*`.", procDef)
|
||||
|
||||
var
|
||||
|
@ -588,7 +588,7 @@ proc createSendProc*(msg: Message,
|
|||
# A response proc must be called with a response object that originates
|
||||
# from a certain request. Here we change the Peer parameter at position
|
||||
# 1 to the correct strongly-typed ResponderType. The incoming procs still
|
||||
# gets the normal Peer paramter.
|
||||
# gets the normal Peer parameter.
|
||||
let
|
||||
ResponderType = msg.ResponderType
|
||||
sendProcName = msg.ident
|
||||
|
@ -687,7 +687,7 @@ proc useStandardBody*(sendProc: SendProc,
|
|||
preSerialization = if preSerializationStep.isNil: newStmtList()
|
||||
else: preSerializationStep(outputStream)
|
||||
|
||||
serilization = writeParamsAsRecord(sendProc.msgParams,
|
||||
serialization = writeParamsAsRecord(sendProc.msgParams,
|
||||
outputStream, Format, msgRecName)
|
||||
|
||||
postSerialization = if postSerializationStep.isNil: newStmtList()
|
||||
|
@ -708,7 +708,7 @@ proc useStandardBody*(sendProc: SendProc,
|
|||
|
||||
var `outputStream` = memoryOutput()
|
||||
`preSerialization`
|
||||
`serilization`
|
||||
`serialization`
|
||||
`postSerialization`
|
||||
`tracing`
|
||||
let `msgBytes` = getOutput(`outputStream`)
|
||||
|
@ -846,7 +846,7 @@ proc processProtocolBody*(p: P2PProtocol, protocolBody: NimNode) =
|
|||
elif eqIdent(n[0], "requestResponse"):
|
||||
# `requestResponse` can be given a block of 2 or more procs.
|
||||
# The last one is considered to be a response message, while
|
||||
# all preceeding ones are requests triggering the response.
|
||||
# all preceding ones are requests triggering the response.
|
||||
# The system makes sure to automatically insert a hidden `reqId`
|
||||
# parameter used to discriminate the individual messages.
|
||||
let procs = expectBlockWithProcs(n)
|
||||
|
@ -982,7 +982,7 @@ macro emitForSingleBackend(
|
|||
version: static[int],
|
||||
backend: static[BackendFactory],
|
||||
body: untyped,
|
||||
# TODO Nim can't handle a proper duration paramter here
|
||||
# TODO Nim can't handle a proper duration parameter here
|
||||
timeouts: static[int64] = defaultReqTimeout.milliseconds,
|
||||
useRequestIds: static[bool] = true,
|
||||
rlpxName: static[string] = "",
|
||||
|
|
|
@ -14,7 +14,7 @@ import
|
|||
./private/p2p_types, "."/[kademlia, auth, rlpxcrypt, enode, p2p_protocol_dsl]
|
||||
|
||||
const
|
||||
# Insane kludge for suppporting chunked messages when syncing against clients
|
||||
# Insane kludge for supporting chunked messages when syncing against clients
|
||||
# like Nethermind.
|
||||
#
|
||||
# The original specs which are now obsoleted can be found here:
|
||||
|
@ -556,7 +556,7 @@ proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
|
|||
# JACEK:
|
||||
# or pass it in, allowing the caller to choose - they'll likely be in a
|
||||
# better position to decide if buffer should be reused or not. this will
|
||||
# also be useuful for chunked messages where part of the buffer may have
|
||||
# also be useful for chunked messages where part of the buffer may have
|
||||
# been processed and needs filling in
|
||||
var encryptedBytes = newSeq[byte](remainingBytes)
|
||||
await peer.transport.readExactly(addr encryptedBytes[0], len(encryptedBytes))
|
||||
|
@ -776,7 +776,7 @@ proc dispatchMessages*(peer: Peer) {.async.} =
|
|||
msg = peer.getMsgName(msgId), err = e.msg
|
||||
|
||||
# TODO: Hmm, this can be safely moved into the message handler thunk.
|
||||
# The documentation will need to be updated, explaning the fact that
|
||||
# The documentation will need to be updated, explaining the fact that
|
||||
# nextMsg will be resolved only if the message handler has executed
|
||||
# successfully.
|
||||
if msgId >= 0 and msgId < peer.awaitedMessages.len and
|
||||
|
@ -1146,9 +1146,9 @@ proc postHelloSteps(peer: Peer, h: DevP2P.hello) {.async.} =
|
|||
if protocol.handshake != nil:
|
||||
subProtocolsHandshakes.add((protocol.handshake)(peer))
|
||||
|
||||
# The `dispatchMesssages` loop must be started after this.
|
||||
# The `dispatchMessages` loop must be started after this.
|
||||
# Otherwise, we risk that some of the handshake packets sent by
|
||||
# the other peer may arrrive too early and be processed before
|
||||
# the other peer may arrive too early and be processed before
|
||||
# the handshake code got a change to wait for them.
|
||||
#
|
||||
var messageProcessingLoop = peer.dispatchMessages()
|
||||
|
|
|
@ -399,7 +399,7 @@ proc toNodes*(self: var Rlp): RlpNode =
|
|||
result.bytes = self.toBytes()
|
||||
self.position = self.currentElemEnd()
|
||||
|
||||
# We define a single `read` template with a pretty low specifity
|
||||
# We define a single `read` template with a pretty low specificity
|
||||
# score in order to facilitate easier overloading with user types:
|
||||
template read*(rlp: var Rlp, T: type): auto =
|
||||
readImpl(rlp, T)
|
||||
|
|
|
@ -201,7 +201,7 @@ proc appendImpl(self: var RlpWriter, data: object) {.inline.} =
|
|||
proc appendImpl(self: var RlpWriter, data: tuple) {.inline.} =
|
||||
self.appendRecordType(data)
|
||||
|
||||
# We define a single `append` template with a pretty low specifity
|
||||
# We define a single `append` template with a pretty low specificity
|
||||
# score in order to facilitate easier overloading with user types:
|
||||
template append*[T](w: var RlpWriter; data: T) =
|
||||
when data is float64:
|
||||
|
|
|
@ -86,7 +86,7 @@ proc put*(db: MemoryLayer, key, val: openArray[byte]) =
|
|||
|
||||
if key.len != 32:
|
||||
# This is not a Trie key, but a regular system mutable key
|
||||
# (e.g. the cannonical head hash). We don't need to ref count such keys.
|
||||
# (e.g. the canonical head hash). We don't need to ref count such keys.
|
||||
db.records[key] = MemDBRec(refCount: 1, value: @val)
|
||||
else:
|
||||
db.records.withValue(key, v) do:
|
||||
|
|
|
@ -19,7 +19,7 @@ type
|
|||
DelayHistogram* = object
|
||||
delayBase*: uint32
|
||||
currentDelayHistory: array[currentDelaySize, uint32]
|
||||
currentDelyIdx: int
|
||||
currentDelayIdx: int
|
||||
delayBaseHistory: array[delayBaseHistory, uint32]
|
||||
delayBaseIdx: int
|
||||
delayBaseTime: Moment
|
||||
|
@ -49,8 +49,8 @@ proc addSample*(h: var DelayHistogram, sample: uint32, currentTime: Moment) =
|
|||
|
||||
let delay = sample - h.delayBase
|
||||
|
||||
h.currentDelayHistory[h.currentDelyIdx] = delay
|
||||
h.currentDelyIdx = (h.currentDelyIdx + 1) mod currentDelaySize
|
||||
h.currentDelayHistory[h.currentDelayIdx] = delay
|
||||
h.currentDelayIdx = (h.currentDelayIdx + 1) mod currentDelaySize
|
||||
|
||||
if (currentTime - h.delayBaseTime > delayBaseUpdateInterval):
|
||||
h.delayBaseTime = currentTime
|
||||
|
@ -64,7 +64,7 @@ proc addSample*(h: var DelayHistogram, sample: uint32, currentTime: Moment) =
|
|||
|
||||
proc getValue*(h: DelayHistogram): Duration =
|
||||
var value = uint32.high
|
||||
# this will return zero if not all samples are colected
|
||||
# this will return zero if not all samples are collected
|
||||
for sample in h.currentDelayHistory:
|
||||
value = min(sample, value)
|
||||
|
||||
|
|
|
@ -12,9 +12,9 @@ export options
|
|||
# Buffer implementation similar to the one in in reference implementation.
|
||||
# Main rationale for it, is to refer to items in buffer by their sequence number,
|
||||
# and support out of order packets.
|
||||
# Therefore it is super specific data structure, and it mostly usefull for
|
||||
# Therefore it is super specific data structure, and it mostly usefully for
|
||||
# utp implementation.
|
||||
# Another alternative would be to use standard deque from deques module, and caluclate
|
||||
# Another alternative would be to use standard deque from deques module, and calculate
|
||||
# item indexes from their sequence numbers.
|
||||
type
|
||||
GrowableCircularBuffer*[A] = object
|
||||
|
|
|
@ -24,7 +24,7 @@ const minWindowSize* = 10
|
|||
proc applyCongestionControl*(
|
||||
currentMaxWindowSize: uint32,
|
||||
currentSlowStart: bool,
|
||||
currentSlowStartTreshold: uint32,
|
||||
currentSlowStartThreshold: uint32,
|
||||
maxSndBufferSize: uint32,
|
||||
currentPacketSize: uint32,
|
||||
actualDelay: Duration,
|
||||
|
@ -34,7 +34,7 @@ proc applyCongestionControl*(
|
|||
clockDrift: int32
|
||||
): (uint32, uint32, bool) =
|
||||
if (actualDelay.isZero() or minRtt.isZero() or numOfAckedBytes == 0):
|
||||
return (currentMaxWindowSize, currentSlowStartTreshold, currentSlowStart)
|
||||
return (currentMaxWindowSize, currentSlowStartThreshold, currentSlowStart)
|
||||
|
||||
let ourDelay = min(minRtt, calculatedDelay)
|
||||
|
||||
|
@ -50,7 +50,7 @@ proc applyCongestionControl*(
|
|||
# people trying to "cheat" uTP by making their clock run slower,
|
||||
# and this definitely catches that without any risk of false positives
|
||||
# if clock_drift < -200000 start applying a penalty delay proportional
|
||||
# to how far beoynd -200000 the clock drift is
|
||||
# to how far beyond -200000 the clock drift is
|
||||
let clockDriftPenalty: int64 =
|
||||
if (clockDrift < -200000):
|
||||
let penalty = (-clockDrift - 200000) div 7
|
||||
|
@ -81,17 +81,17 @@ proc applyCongestionControl*(
|
|||
|
||||
var newSlowStart = currentSlowStart
|
||||
var newMaxWindowSize = currentMaxWindowSize
|
||||
var newSlowStartTreshold = currentSlowStartTreshold
|
||||
var newSlowStartThreshold = currentSlowStartThreshold
|
||||
|
||||
if currentSlowStart:
|
||||
let slowStartCwnd = currentMaxWindowSize + uint32(windowFactor * float64(currentPacketSize))
|
||||
|
||||
if (slowStartCwnd > currentSlowStartTreshold):
|
||||
if (slowStartCwnd > currentSlowStartThreshold):
|
||||
newSlowStart = false
|
||||
elif float64(ourDelay.microseconds()) > float64(target.microseconds()) * 0.9:
|
||||
# we are just a litte under target delay, discontinute slows start
|
||||
# we are just a little under target delay, discontinue slows start
|
||||
newSlowStart = false
|
||||
newSlowStartTreshold = currentMaxWindowSize
|
||||
newSlowStartThreshold = currentMaxWindowSize
|
||||
else:
|
||||
newMaxWindowSize = max(slowStartCwnd, ledbatCwnd)
|
||||
else:
|
||||
|
@ -99,4 +99,4 @@ proc applyCongestionControl*(
|
|||
|
||||
newMaxWindowSize = clamp(newMaxWindowSize, minWindowSize, maxSndBufferSize)
|
||||
|
||||
(newMaxWindowSize, newSlowStartTreshold, newSlowStart)
|
||||
(newMaxWindowSize, newSlowStartThreshold, newSlowStart)
|
||||
|
|
|
@ -63,7 +63,7 @@ type
|
|||
# 1. Microsecond precisions
|
||||
# 2. Monotonicity
|
||||
# Reference lib have a lot of checks to assume that this is monotonic on
|
||||
# every system, and warnings when monotonic clock is not avaialable.
|
||||
# every system, and warnings when monotonic clock is not available.
|
||||
proc getMonoTimestamp*(): TimeStampInfo =
|
||||
let currentMoment = Moment.now()
|
||||
|
||||
|
@ -109,7 +109,7 @@ proc encodeHeaderStream(s: var OutputStream, h: PacketHeaderV1) =
|
|||
|
||||
proc encodeExtensionStream(s: var OutputStream, e: SelectiveAckExtension) =
|
||||
try:
|
||||
# writing 0 as there is not further extensions after selectiv ack
|
||||
# writing 0 as there is not further extensions after selective ack
|
||||
s.write(0'u8)
|
||||
s.write(acksArrayLength)
|
||||
s.write(e.acks)
|
||||
|
@ -180,7 +180,7 @@ proc decodePacket*(bytes: openArray[byte]): Result[Packet, string] =
|
|||
|
||||
# As selective ack is only supported extension the byte for nextExtension
|
||||
# must be equal to 0.
|
||||
# As for extLength, specificaiton says that it must be at least 4, and in multiples of 4
|
||||
# As for extLength, specification says that it must be at least 4, and in multiples of 4
|
||||
# but reference implementation always uses 4 bytes bit mask which makes sense
|
||||
# as 4byte bit mask is able to ack 32 packets in the future which is more than enough
|
||||
if (nextExtension != 0 or extLength != 4):
|
||||
|
@ -221,7 +221,7 @@ proc synPacket*(seqNr: uint16, rcvConnectionId: uint16, bufferSize: uint32): Pac
|
|||
timestampDiff: 0'u32,
|
||||
wndSize: bufferSize,
|
||||
seqNr: seqNr,
|
||||
# Initialy we did not receive any acks
|
||||
# Initially we did not receive any acks
|
||||
ackNr: 0'u16
|
||||
)
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
import
|
||||
chronos, stew/[results, byteutils],
|
||||
./utp_router,
|
||||
./utp_socket,
|
||||
|
@ -21,7 +21,7 @@ import
|
|||
when isMainModule:
|
||||
proc echoIncomingSocketCallBack(): AcceptConnectionCallback[TransportAddress] =
|
||||
return (
|
||||
proc (server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] {.gcsafe, raises: [Defect].} =
|
||||
proc (server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] {.gcsafe, raises: [Defect].} =
|
||||
echo "received incoming connection"
|
||||
let fakeFuture = newFuture[void]()
|
||||
fakeFuture.complete()
|
||||
|
@ -37,7 +37,7 @@ when isMainModule:
|
|||
|
||||
doAssert(soc.numPacketsInOutGoingBuffer() == 0)
|
||||
|
||||
let helloUtp = "Helllo from nim implementation"
|
||||
let helloUtp = "Hello from nim implementation"
|
||||
let bytes = helloUtp.toBytes()
|
||||
|
||||
discard waitFor soc.write(bytes)
|
||||
|
|
|
@ -67,8 +67,8 @@ proc initSendCallback(
|
|||
proc (to: NodeAddress, data: seq[byte]): Future[void] =
|
||||
let fut = newFuture[void]()
|
||||
# hidden assumption here is that nodes already have established discv5 session
|
||||
# between each other. In our use case this should be true as openning stream
|
||||
# is only done after succesful OFFER/ACCEPT or FINDCONTENT/CONTENT exchange
|
||||
# between each other. In our use case this should be true as opening stream
|
||||
# is only done after successful OFFER/ACCEPT or FINDCONTENT/CONTENT exchange
|
||||
# which forces nodes to establish session between each other.
|
||||
discard t.talkReqDirect(to, subProtocolName, data)
|
||||
fut.complete()
|
||||
|
|
|
@ -16,7 +16,7 @@ logScope:
|
|||
topics = "utp"
|
||||
|
||||
type
|
||||
# For now utp protocol is tied to udp transport, but ultimatly we would like to
|
||||
# For now utp protocol is tied to udp transport, but ultimately we would like to
|
||||
# abstract underlying transport to be able to run utp over udp, discoveryv5 or
|
||||
# maybe some test transport
|
||||
UtpProtocol* = ref object
|
||||
|
@ -26,9 +26,9 @@ type
|
|||
SendCallbackBuilder* = proc (d: DatagramTransport): SendCallback[TransportAddress] {.gcsafe, raises: [Defect].}
|
||||
|
||||
# This should probably be defined in TransportAddress module, as hash function should
|
||||
# be consitent with equality function
|
||||
# be consistent with equality function
|
||||
# in nim zero arrays always have hash equal to 0, irrespectively of array size, to
|
||||
# avoid clashes betweend different types of addresses, each type have mixed different
|
||||
# avoid clashes between different types of addresses, each type have mixed different
|
||||
# magic number
|
||||
proc hash(x: TransportAddress): Hash =
|
||||
var h: Hash = 0
|
||||
|
|
|
@ -29,7 +29,7 @@ declareCounter utp_allowed_incoming,
|
|||
declareCounter utp_declined_incoming,
|
||||
"Total number of declined incoming connections"
|
||||
declareCounter utp_success_outgoing,
|
||||
"Total number of succesful outgoing connections"
|
||||
"Total number of successful outgoing connections"
|
||||
declareCounter utp_failed_outgoing,
|
||||
"Total number of failed outgoing connections"
|
||||
|
||||
|
@ -104,7 +104,7 @@ proc registerUtpSocket[A](p: UtpRouter, s: UtpSocket[A]) =
|
|||
|
||||
proc registerIfAbsent[A](p: UtpRouter, s: UtpSocket[A]): bool =
|
||||
## Registers socket only if it's not already existing in the active sockets
|
||||
## table. Returns true if socket has been succesfuly registered.
|
||||
## table. Returns true if socket has been successfully registered.
|
||||
if p.sockets.hasKey(s.socketKey):
|
||||
false
|
||||
else:
|
||||
|
@ -165,7 +165,7 @@ proc getSocketOnReset[A](
|
|||
# id is our recv id
|
||||
let recvKey = UtpSocketKey[A].init(sender, id)
|
||||
|
||||
# id is our send id, and we did nitiate the connection, our recv id is id - 1
|
||||
# id is our send id, and we did initiate the connection, our recv id is id - 1
|
||||
let sendInitKey = UtpSocketKey[A].init(sender, id - 1)
|
||||
|
||||
# id is our send id, and we did not initiate the connection, so our recv id is id + 1
|
||||
|
@ -194,11 +194,11 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
|
|||
if maybeSocket.isSome():
|
||||
debug "Received RST packet on known connection, closing socket"
|
||||
let socket = maybeSocket.unsafeGet()
|
||||
# reference implementation acutally changes the socket state to reset state unless
|
||||
# reference implementation actually changes the socket state to reset state unless
|
||||
# user explicitly closed socket before. The only difference between reset and destroy
|
||||
# state is that socket in destroy state is ultimatly deleted from active connection
|
||||
# state is that socket in destroy state is ultimately deleted from active connection
|
||||
# list but socket in reset state lingers there until user of library closes it
|
||||
# explictly.
|
||||
# explicitly.
|
||||
socket.destroy()
|
||||
else:
|
||||
debug "Received RST packet for unknown connection, ignoring"
|
||||
|
@ -335,7 +335,7 @@ proc connectTo*[A](
|
|||
return connFut
|
||||
|
||||
# Connect to provided address with provided connection id, if socket with this id
|
||||
# and address already exsits return error
|
||||
# and address already exists return error
|
||||
proc connectTo*[A](
|
||||
r: UtpRouter[A], address: A, connectionId: uint16):
|
||||
Future[ConnectionResult[A]] =
|
||||
|
|
|
@ -56,14 +56,14 @@ type
|
|||
# means that initial connection would timeout after 21s, which seems rather long
|
||||
initialSynTimeout*: Duration
|
||||
|
||||
# Number of resend re-tries of each data packet, before daclaring connection
|
||||
# Number of resend re-tries of each data packet, before declaring connection
|
||||
# failed
|
||||
dataResendsBeforeFailure*: uint16
|
||||
|
||||
# Maximnal size of receive buffer in bytes
|
||||
# Maximal size of receive buffer in bytes
|
||||
optRcvBuffer*: uint32
|
||||
|
||||
# Maximnal size of send buffer in bytes
|
||||
# Maximal size of send buffer in bytes
|
||||
optSndBuffer*: uint32
|
||||
|
||||
# If set to some(`Duration`), the incoming socket will be initialized in
|
||||
|
@ -153,10 +153,10 @@ type
|
|||
connectionIdSnd*: uint16
|
||||
# Sequence number for the next packet to be sent.
|
||||
seqNr: uint16
|
||||
# All seq number up to this havve been correctly acked by us
|
||||
# All seq number up to this have been correctly acked by us
|
||||
ackNr: uint16
|
||||
|
||||
# Should be completed after succesful connection to remote host or after timeout
|
||||
# Should be completed after successful connection to remote host or after timeout
|
||||
# for the first syn packet
|
||||
connectionFuture: Future[void]
|
||||
|
||||
|
@ -196,7 +196,7 @@ type
|
|||
rtt: Duration
|
||||
# calculated round trip time variance
|
||||
rttVar: Duration
|
||||
# Round trip timeout dynamicaly updated based on acks received from remote
|
||||
# Round trip timeout dynamically updated based on acks received from remote
|
||||
# peer
|
||||
rto: Duration
|
||||
|
||||
|
@ -215,10 +215,10 @@ type
|
|||
# loop called every 500ms to check for on going timeout status
|
||||
checkTimeoutsLoop: Future[void]
|
||||
|
||||
# number on consecutive re-transsmisions
|
||||
# number on consecutive re-transmissions
|
||||
retransmitCount: uint32
|
||||
|
||||
# Event which will complete whenever socket gets in destory state
|
||||
# Event which will complete whenever socket gets in destroy state
|
||||
closeEvent: AsyncEvent
|
||||
|
||||
# All callback to be called whenever socket gets in destroy state
|
||||
|
@ -261,8 +261,8 @@ type
|
|||
# indicator if we're in slow-start (exponential growth) phase
|
||||
slowStart: bool
|
||||
|
||||
# indiciator if we're in fast time out mode i.e we will resent
|
||||
# oldest packet un-acket in case of newer packet arriving
|
||||
# indicator if we're in fast time out mode i.e we will resend
|
||||
# oldest packet un-acked in case of newer packet arriving
|
||||
fastTimeout: bool
|
||||
|
||||
# Sequence number of the next packet we are allowed to fast-resend. This is
|
||||
|
@ -276,7 +276,7 @@ type
|
|||
duplicateAck: uint16
|
||||
|
||||
#the slow-start threshold, in bytes
|
||||
slowStartTreshold: uint32
|
||||
slowStartThreshold: uint32
|
||||
|
||||
# history of our delays
|
||||
ourHistogram: DelayHistogram
|
||||
|
@ -284,7 +284,7 @@ type
|
|||
# history of remote delays
|
||||
remoteHistogram: DelayHistogram
|
||||
|
||||
# calculator of drifiting between local and remote clocks
|
||||
# calculator of drifting between local and remote clocks
|
||||
driftCalculator: ClockDriftCalculator
|
||||
|
||||
# socket identifier
|
||||
|
@ -312,7 +312,7 @@ const
|
|||
# Default maximum size of the data packet payload. With such configuration
|
||||
# data packets will have 508 bytes (488 + 20 header).
|
||||
# 508 bytes of udp payload can translate into 576 bytes udp packet i.e
|
||||
# 508bytes paylaod + 60bytes (max ip header) + 8bytes (udp header) = 576bytes.
|
||||
# 508bytes payload + 60bytes (max ip header) + 8bytes (udp header) = 576bytes.
|
||||
# 576bytes is defined as minimum reassembly buffer size i.e
|
||||
# the minimum datagram size that we are guaranteed any implementation must support.
|
||||
# from RFC791: All hosts must be prepared
|
||||
|
@ -323,7 +323,7 @@ const
|
|||
# How often each socket check its different on going timers
|
||||
checkTimeoutsLoopInterval = milliseconds(500)
|
||||
|
||||
# Defualt initial timeout for first Syn packet
|
||||
# Default initial timeout for first Syn packet
|
||||
defaultInitialSynTimeout = milliseconds(3000)
|
||||
|
||||
# Initial timeout to receive first Data data packet after receiving initial Syn
|
||||
|
@ -359,13 +359,13 @@ const
|
|||
|
||||
duplicateAcksBeforeResend = 3
|
||||
|
||||
# minimal time before subseqent window decays
|
||||
# minimal time before subsequent window decays
|
||||
maxWindowDecay = milliseconds(100)
|
||||
|
||||
# Maximal size of reorder buffer as fraction of optRcvBuffer size following
|
||||
# semantics apply bases on rcvBuffer set to 1000 bytes:
|
||||
# if there are already 1000 bytes in rcv buffer no more bytes will be accepted to reorder buffer
|
||||
# if there are already 500 bytes in reoreder buffer, no more bytes will be accepted
|
||||
# if there are already 500 bytes in reorder buffer, no more bytes will be accepted
|
||||
# to it, and only 500 bytes can be accepted to rcv buffer
|
||||
# this way there is always a space in rcv buffer to fit new data if the reordering
|
||||
# happens
|
||||
|
@ -550,7 +550,7 @@ proc checkTimeouts(socket: UtpSocket) =
|
|||
curWindowPackets = socket.curWindowPackets,
|
||||
curWindowBytes = socket.currentWindow
|
||||
|
||||
# TODO add handling of probe time outs. Reference implemenation has mechanism
|
||||
# TODO add handling of probe time outs. Reference implementation has mechanism
|
||||
# of sending probes to determine mtu size. Probe timeouts do not count to standard
|
||||
# timeouts calculations
|
||||
|
||||
|
@ -594,7 +594,7 @@ proc checkTimeouts(socket: UtpSocket) =
|
|||
# due to high delay window has shrunk below packet size
|
||||
# which means that we cannot send more data
|
||||
# reset it to fit at least one packet
|
||||
debug "Reseting window size do fit a least one packet",
|
||||
debug "Resetting window size do fit a least one packet",
|
||||
oldWindowSize = socket.maxWindow,
|
||||
newWindowSize = currentPacketSize
|
||||
|
||||
|
@ -604,7 +604,7 @@ proc checkTimeouts(socket: UtpSocket) =
|
|||
socket.slowStart = true
|
||||
|
||||
# This will have much more sense when we will add handling of selective acks
|
||||
# as then every selecivly acked packet restes timeout timer and removes packet
|
||||
# as then every selectively acked packet resets timeout timer and removes packet
|
||||
# from out buffer.
|
||||
markAllPacketAsLost(socket)
|
||||
|
||||
|
@ -709,14 +709,14 @@ proc isClosedAndCleanedUpAllResources*(socket: UtpSocket): bool =
|
|||
|
||||
proc destroy*(s: UtpSocket) =
|
||||
debug "Destroying socket", to = s.socketKey
|
||||
## Moves socket to destroy state and clean all reasources.
|
||||
## Moves socket to destroy state and clean all resources.
|
||||
## Remote is not notified in any way about socket end of life
|
||||
s.state = Destroy
|
||||
s.eventLoop.cancel()
|
||||
# This procedure initiate cleanup process which goes like:
|
||||
# Cancel EventLoop -> Cancel timeoutsLoop -> Fire closeEvent
|
||||
# This is necessary due to how evenLoop look like i.e it has only one await
|
||||
# point on `eventQueue.get` which trigger cancellation excepion only when
|
||||
# point on `eventQueue.get` which trigger cancellation exception only when
|
||||
# someone will try run `eventQueue.put`. Without `eventQueue.put` , eventLoop
|
||||
# future shows as cancelled, but handler for CancelledError is not run
|
||||
|
||||
|
@ -784,7 +784,7 @@ proc ackPacket(socket: UtpSocket, seqNr: uint16, currentTime: Moment): AckResult
|
|||
debug "Acked packet (deleted from outgoing buffer)",
|
||||
pkSeqNr = seqNr,
|
||||
pkTransmissions = packet.transmissions,
|
||||
pkNeedReesend = packet.needResend
|
||||
pkNeedResend = packet.needResend
|
||||
|
||||
# from spec: The rtt and rtt_var is only updated for packets that were sent only once.
|
||||
# This avoids problems with figuring out which packet was acked, the first or the second one.
|
||||
|
@ -868,7 +868,7 @@ proc isAckNrInvalid(socket: UtpSocket, packet: Packet): bool =
|
|||
|
||||
# counts the number of bytes acked by selective ack header
|
||||
proc calculateSelectiveAckBytes*(socket: UtpSocket, receivedPackedAckNr: uint16, ext: SelectiveAckExtension): uint32 =
|
||||
# we add 2, as the first bit in the mask therefore represents ackNr + 2 becouse
|
||||
# we add 2, as the first bit in the mask therefore represents ackNr + 2 because
|
||||
# ackNr + 1 (i.e next expected packet) is considered lost.
|
||||
let base = receivedPackedAckNr + 2
|
||||
|
||||
|
@ -914,11 +914,11 @@ proc tryDecayWindow(socket: UtpSocket, now: Moment) =
|
|||
|
||||
socket.maxWindow = newMaxWindow
|
||||
socket.slowStart = false
|
||||
socket.slowStartTreshold = newMaxWindow
|
||||
socket.slowStartThreshold = newMaxWindow
|
||||
|
||||
# ack packets (removes them from out going buffer) based on selective ack extension header
|
||||
proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: SelectiveAckExtension, currentTime: Moment): void =
|
||||
# we add 2, as the first bit in the mask therefore represents ackNr + 2 becouse
|
||||
# we add 2, as the first bit in the mask therefore represents ackNr + 2 because
|
||||
# ackNr + 1 (i.e next expected packet) is considered lost.
|
||||
let base = receivedPackedAckNr + 2
|
||||
|
||||
|
@ -1070,7 +1070,7 @@ proc sendAck(socket: UtpSocket) =
|
|||
|
||||
proc tryfinalizeConnection(socket: UtpSocket, p: Packet) =
|
||||
# To avoid amplification attacks, server socket is in SynRecv state until
|
||||
# it receices first data transfer
|
||||
# it receives first data transfer
|
||||
# https://www.usenix.org/system/files/conference/woot15/woot15-paper-adamsky.pdf
|
||||
# Socket is in SynRecv state only when recv timeout is configured
|
||||
if (socket.state == SynRecv and p.header.pType == ST_DATA):
|
||||
|
@ -1087,7 +1087,7 @@ proc tryfinalizeConnection(socket: UtpSocket, p: Packet) =
|
|||
socket.connectionFuture.complete()
|
||||
|
||||
# TODO at socket level we should handle only FIN/DATA/ACK packets. Refactor to make
|
||||
# it enforcable by type system
|
||||
# it enforceable by type system
|
||||
proc processPacketInternal(socket: UtpSocket, p: Packet) =
|
||||
|
||||
debug "Process packet",
|
||||
|
@ -1114,7 +1114,7 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
|
|||
return
|
||||
|
||||
## Updates socket state based on received packet, and sends ack when necessary.
|
||||
## Shoyuld be called in main packet receiving loop
|
||||
## Should be called in main packet receiving loop
|
||||
let pkSeqNr = p.header.seqNr
|
||||
let pkAckNr = p.header.ackNr
|
||||
|
||||
|
@ -1122,7 +1122,7 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
|
|||
|
||||
# number of packets past the expected
|
||||
# ack_nr is the last acked, seq_nr is the
|
||||
# current. Subtracring 1 makes 0 mean "this is the next expected packet"
|
||||
# current. Subtracting 1 makes 0 mean "this is the next expected packet"
|
||||
let pastExpected = pkSeqNr - socket.ackNr - 1
|
||||
|
||||
# acks is the number of packets that was acked, in normal case - no selective
|
||||
|
@ -1154,21 +1154,21 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
|
|||
p.header.pType == ST_STATE:
|
||||
inc socket.duplicateAck
|
||||
|
||||
debug "Recevied duplicated ack",
|
||||
debug "Received duplicated ack",
|
||||
pkAckNr = pkAckNr,
|
||||
duplicatAckCounter = socket.duplicateAck
|
||||
duplicateAckCounter = socket.duplicateAck
|
||||
else:
|
||||
socket.duplicateAck = 0
|
||||
# spec says that in case of duplicate ack counter larger that duplicateAcksBeforeResend
|
||||
# we should re-send oldest packet, on the other hand refrence implementation
|
||||
# we should re-send oldest packet, on the other hand reference implementation
|
||||
# has code path which does it commented out with todo. Currently to be as close
|
||||
# to refrence impl we do not resend packets in that case
|
||||
# to reference impl we do not resend packets in that case
|
||||
|
||||
debug "Packet state variables",
|
||||
pastExpected = pastExpected,
|
||||
acks = acks
|
||||
|
||||
# If packet is totally of the mark short circout the processing
|
||||
# If packet is totally off the mark, short-circuit the processing
|
||||
if pastExpected >= reorderBufferMaxSize:
|
||||
|
||||
# if `pastExpected` is really big number (for example: uint16.high) then most
|
||||
|
@ -1202,7 +1202,7 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
|
|||
|
||||
# we are using uint32 not a Duration, to wrap a round in case of
|
||||
# sentTimeRemote > receipTimestamp. This can happen as local and remote
|
||||
# clock can be not synchornized or even using different system clock.
|
||||
# clock can be not synchronized or even using different system clock.
|
||||
# i.e this number itself does not tell anything and is only used to feedback it
|
||||
# to remote peer with each sent packet
|
||||
let remoteDelay =
|
||||
|
@ -1238,11 +1238,11 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
|
|||
socket.ourHistogram.shift(diff)
|
||||
|
||||
let currentPacketSize = socket.getPacketSize()
|
||||
let (newMaxWindow, newSlowStartTreshold, newSlowStart) =
|
||||
let (newMaxWindow, newSlowStartThreshold, newSlowStart) =
|
||||
applyCongestionControl(
|
||||
socket.maxWindow,
|
||||
socket.slowStart,
|
||||
socket.slowStartTreshold,
|
||||
socket.slowStartThreshold,
|
||||
socket.socketConfig.optSndBuffer,
|
||||
currentPacketSize,
|
||||
microseconds(actualDelay),
|
||||
|
@ -1256,12 +1256,12 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
|
|||
socket.maxWindow = newMaxWindow
|
||||
socket.maxRemoteWindow = p.header.wndSize
|
||||
socket.slowStart = newSlowStart
|
||||
socket.slowStartTreshold = newSlowStartTreshold
|
||||
socket.slowStartThreshold = newSlowStartThreshold
|
||||
|
||||
debug "Applied ledbat congestion controller",
|
||||
maxWindow = newMaxWindow,
|
||||
remoteWindow = p.header.wndSize,
|
||||
slowStartTreshold = newSlowStartTreshold,
|
||||
slowStartThreshold = newSlowStartThreshold,
|
||||
slowstart = newSlowStart
|
||||
|
||||
if (socket.zeroWindowTimer.isNone() and socket.maxRemoteWindow <= currentPacketSize):
|
||||
|
@ -1382,14 +1382,14 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
|
|||
reorderCount = socket.reorderCount
|
||||
|
||||
while true:
|
||||
# We are doing this in reoreder loop, to handle the case when we already received
|
||||
# We are doing this in reorder loop, to handle the case when we already received
|
||||
# fin but there were some gaps before eof
|
||||
# we have reached remote eof, and should not receive more packets from remote
|
||||
if ((not socket.reachedFin) and socket.gotFin and socket.eofPktNr == socket.ackNr):
|
||||
debug "Reached socket EOF"
|
||||
# In case of reaching eof, it is up to user of library what to to with
|
||||
# it. With the current implementation, the most apropriate way would be to
|
||||
# destory it (as with our implementation we know that remote is destroying its acked fin)
|
||||
# it. With the current implementation, the most appropriate way would be to
|
||||
# destroy it (as with our implementation we know that remote is destroying its acked fin)
|
||||
# as any other send will either generate timeout, or socket will be forcefully
|
||||
# closed by reset
|
||||
socket.reachedFin = true
|
||||
|
@ -1416,7 +1416,7 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
|
|||
packetSeqNr = packet.header.seqNr,
|
||||
packetAckNr = packet.header.ackNr,
|
||||
socketSeqNr = socket.seqNr,
|
||||
socektAckNr = socket.ackNr,
|
||||
socketAckNr = socket.ackNr,
|
||||
rcvbufferSize = socket.offset,
|
||||
reorderBufferSize = socket.inBufferBytes
|
||||
|
||||
|
@ -1473,7 +1473,7 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
|
|||
packetSeqNr = p.header.seqNr,
|
||||
packetAckNr = p.header.ackNr,
|
||||
socketSeqNr = socket.seqNr,
|
||||
socektAckNr = socket.ackNr,
|
||||
socketAckNr = socket.ackNr,
|
||||
rcvbufferSize = socket.offset,
|
||||
reorderBufferSize = socket.inBufferBytes
|
||||
|
||||
|
@ -1482,7 +1482,7 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
|
|||
socket.inBufferBytes = socket.inBufferBytes + payloadLength
|
||||
debug "added out of order packet to reorder buffer",
|
||||
reorderCount = socket.reorderCount
|
||||
# we send ack packet, as we reoreder count is > 0, so the eack bitmask will be
|
||||
# we send ack packet, as we reorder count is > 0, so the eack bitmask will be
|
||||
# generated
|
||||
socket.sendAck()
|
||||
|
||||
|
@ -1521,14 +1521,14 @@ proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult =
|
|||
|
||||
debug "Read finished",
|
||||
bytesRead = len(readReq.bytesAvailable),
|
||||
socektAtEof = socket.atEof()
|
||||
socketAtEof = socket.atEof()
|
||||
|
||||
readReq.reader.complete(readReq.bytesAvailable)
|
||||
return ReadFinished
|
||||
else:
|
||||
debug "Read not finished",
|
||||
bytesRead = len(readReq.bytesAvailable),
|
||||
socektAtEof = socket.atEof()
|
||||
socketAtEof = socket.atEof()
|
||||
|
||||
return ReadNotFinished
|
||||
else:
|
||||
|
@ -1540,14 +1540,14 @@ proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult =
|
|||
if (len(readReq.bytesAvailable) == readReq.bytesToRead):
|
||||
debug "Read finished",
|
||||
bytesRead = len(readReq.bytesAvailable),
|
||||
socektAtEof = socket.atEof()
|
||||
socketAtEof = socket.atEof()
|
||||
|
||||
readReq.reader.complete(readReq.bytesAvailable)
|
||||
return ReadFinished
|
||||
else:
|
||||
debug "Read not finished",
|
||||
bytesRead = len(readReq.bytesAvailable),
|
||||
socektAtEof = socket.atEof()
|
||||
socketAtEof = socket.atEof()
|
||||
|
||||
return ReadNotFinished
|
||||
|
||||
|
@ -1568,7 +1568,7 @@ proc eventLoop(socket: UtpSocket) {.async.} =
|
|||
discard socket.pendingReads.popFirst()
|
||||
of ReadNotFinished:
|
||||
# there was not enough bytes in buffer to finish this read request,
|
||||
# stop processing fruther reeads
|
||||
# stop processing further reads
|
||||
break
|
||||
else:
|
||||
# read was cancelled or socket is already finished move on to next read
|
||||
|
@ -1625,7 +1625,7 @@ proc eventLoop(socket: UtpSocket) {.async.} =
|
|||
# check if the writer was not cancelled in mean time
|
||||
if (not ev.readReq.reader.finished()):
|
||||
if (socket.pendingReads.len() > 0):
|
||||
# there is already pending unfininshed read request, schedule this one for
|
||||
# there is already pending unfinished read request, schedule this one for
|
||||
# later
|
||||
socket.pendingReads.addLast(ev.readReq)
|
||||
else:
|
||||
|
@ -1645,8 +1645,8 @@ proc eventLoop(socket: UtpSocket) {.async.} =
|
|||
w.writer.complete(res)
|
||||
for r in socket.pendingReads.items():
|
||||
# complete every reader with already read bytes
|
||||
# TODO: it maybe better to refine read api to returl Future[Result[seq[byte], E]]
|
||||
# and return erros for not finished reads
|
||||
# TODO: it maybe better to refine read api to return Future[Result[seq[byte], E]]
|
||||
# and return errors for not finished reads
|
||||
if (not r.reader.finished()):
|
||||
r.reader.complete(r.bytesAvailable)
|
||||
socket.pendingWrites.clear()
|
||||
|
@ -1668,7 +1668,7 @@ proc readingClosed(socket: UtpSocket): bool =
|
|||
socket.atEof() or socket.state == Destroy
|
||||
|
||||
proc close*(socket: UtpSocket) =
|
||||
## Gracefully closes conneciton (send FIN) if socket is in connected state
|
||||
## Gracefully closes connection (send FIN) if socket is in connected state
|
||||
## does not wait for socket to close
|
||||
if socket.state != Destroy:
|
||||
case socket.state
|
||||
|
@ -1692,11 +1692,11 @@ proc close*(socket: UtpSocket) =
|
|||
socket.destroy()
|
||||
|
||||
proc closeWait*(socket: UtpSocket) {.async.} =
|
||||
## Gracefully closes conneciton (send FIN) if socket is in connected state
|
||||
## Gracefully closes connection (send FIN) if socket is in connected state
|
||||
## and waits for socket to be closed.
|
||||
## Warning: if FIN packet for some reason will be lost, then socket will be closed
|
||||
## due to retransmission failure which may take some time.
|
||||
## default is 4 retransmissions with doubling of rto between each retranssmision
|
||||
## default is 4 retransmissions with doubling of rto between each retransmission
|
||||
socket.close()
|
||||
await socket.closeEvent.wait()
|
||||
|
||||
|
@ -1782,7 +1782,7 @@ proc read*(socket: UtpSocket): Future[seq[byte]] =
|
|||
|
||||
return fut
|
||||
|
||||
# Check how many packets are still in the out going buffer, usefull for tests or
|
||||
# Check how many packets are still in the out going buffer, usefully for tests or
|
||||
# debugging.
|
||||
proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int =
|
||||
var num = 0
|
||||
|
@ -1797,10 +1797,10 @@ proc numOfBytesInFlight*(socket: UtpSocket): uint32 = socket.currentWindow
|
|||
# Check how many bytes are in incoming buffer
|
||||
proc numOfBytesInIncomingBuffer*(socket: UtpSocket): uint32 = uint32(socket.offset)
|
||||
|
||||
# Check how many packets are still in the reorder buffer, usefull for tests or
|
||||
# Check how many packets are still in the reorder buffer, useful for tests or
|
||||
# debugging.
|
||||
# It throws assertion error when number of elements in buffer do not equal kept counter
|
||||
proc numPacketsInReordedBuffer*(socket: UtpSocket): int =
|
||||
proc numPacketsInReorderedBuffer*(socket: UtpSocket): int =
|
||||
var num = 0
|
||||
for e in socket.inBuffer.items():
|
||||
if e.isSome():
|
||||
|
@ -1840,9 +1840,9 @@ proc new[A](
|
|||
let currentTime = getMonoTimestamp().moment
|
||||
|
||||
# Initial max window size. Reference implementation uses value which enables one packet
|
||||
# to be transfered.
|
||||
# to be transferred.
|
||||
# We use value two times higher as we do not yet have proper mtu estimation, and
|
||||
# our impl should work over udp and discovery v5 (where proper estmation may be harder
|
||||
# our impl should work over udp and discovery v5 (where proper estimation may be harder
|
||||
# as packets already have discoveryv5 envelope)
|
||||
let initMaxWindow = 2 * cfg.payloadSize
|
||||
T(
|
||||
|
@ -1880,7 +1880,7 @@ proc new[A](
|
|||
fastTimeout: false,
|
||||
fastResendSeqNr: initialSeqNr,
|
||||
lastWindowDecay: currentTime - maxWindowDecay,
|
||||
slowStartTreshold: cfg.optSndBuffer,
|
||||
slowStartThreshold: cfg.optSndBuffer,
|
||||
ourHistogram: DelayHistogram.init(currentTime),
|
||||
remoteHistogram: DelayHistogram.init(currentTime),
|
||||
driftCalculator: ClockDriftCalculator.init(currentTime),
|
||||
|
@ -1906,7 +1906,7 @@ proc newOutgoingSocket*[A](
|
|||
rcvConnectionId,
|
||||
sndConnectionId,
|
||||
initialSeqNr,
|
||||
# Initialy ack nr is 0, as we do not know remote inital seqnr
|
||||
# Initially ack nr is 0, as we do not know remote initial seqnr
|
||||
0,
|
||||
cfg.initialSynTimeout
|
||||
)
|
||||
|
@ -1926,8 +1926,8 @@ proc newIncomingSocket*[A](
|
|||
# it does not matter what timeout value we put here, as socket will be in
|
||||
# connected state without outgoing packets in buffer so any timeout hit will
|
||||
# just double rto without any penalties
|
||||
# although we cannont use 0, as then timeout will be constantly re-set to 500ms
|
||||
# and there will be a lot of not usefull work done
|
||||
# although we cannot use 0, as then timeout will be constantly re-set to 500ms
|
||||
# and there will be a lot of not useful work done
|
||||
(Connected, defaultInitialSynTimeout)
|
||||
else:
|
||||
let timeout = cfg.incomingSocketReceiveTimeout.unsafeGet()
|
||||
|
@ -1962,7 +1962,7 @@ proc startOutgoingSocket*(socket: UtpSocket): Future[void] =
|
|||
seqNr = packet.header.seqNr,
|
||||
connectionId = packet.header.connectionId
|
||||
# set number of transmissions to 1 as syn packet will be send just after
|
||||
# initiliazation
|
||||
# initialization
|
||||
let outgoingPacket = OutgoingPacket.init(encodePacket(packet), 1, false, 0)
|
||||
socket.registerOutgoingPacket(outgoingPacket)
|
||||
socket.startEventLoop()
|
||||
|
|
|
@ -259,7 +259,7 @@ procSuite "SqStoreRef":
|
|||
|
||||
var sums: seq[seq[byte]] = @[]
|
||||
|
||||
# Use custom function, which interprest blobs as uint32 numbers and sums
|
||||
# Use custom function, which interprets blobs as uint32 numbers and sums
|
||||
# them together
|
||||
let sumKeyVal = db.prepareStmt(
|
||||
"SELECT sum32(key, value) FROM kvstore;",
|
||||
|
@ -274,10 +274,10 @@ procSuite "SqStoreRef":
|
|||
|
||||
discard sumKeyVal.exec do (res: seq[byte]):
|
||||
sums.add(res)
|
||||
|
||||
|
||||
check:
|
||||
len(sums) == 1
|
||||
|
||||
|
||||
let sum = uint32.fromBytesBE(sums[0])
|
||||
|
||||
check:
|
||||
|
|
|
@ -38,7 +38,7 @@ test:
|
|||
|
||||
try:
|
||||
targetNode.receive(address, msg)
|
||||
# These errors are also catched in `processClient` in discovery.nim
|
||||
# These errors are also caught in `processClient` in discovery.nim
|
||||
# TODO: move them a layer down in discovery so we can do a cleaner test there?
|
||||
except RlpError as e:
|
||||
debug "Receive failed", err = e.msg
|
||||
|
|
|
@ -13,12 +13,12 @@ init:
|
|||
|
||||
enrRecA = enr.Record.init(1, privKeyA,
|
||||
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
|
||||
some(Port(9000))).expect("Properly intialized private key")
|
||||
some(Port(9000))).expect("Properly initialized private key")
|
||||
nodeA = newNode(enrRecA).expect("Properly initialized record")
|
||||
|
||||
enrRecB = enr.Record.init(1, privKeyB,
|
||||
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
|
||||
some(Port(9000))).expect("Properly intialized private key")
|
||||
some(Port(9000))).expect("Properly initialized private key")
|
||||
nodeB = newNode(enrRecB).expect("Properly initialized record")
|
||||
|
||||
var codecB = Codec(localNode: nodeB, privKey: privKeyB,
|
||||
|
@ -34,4 +34,4 @@ test:
|
|||
|
||||
let decoded = decodePacket(codecB, nodeA.address.get(), @iv & maskedHeader)
|
||||
if decoded.isErr():
|
||||
debug "Error occured", error = decoded.error
|
||||
debug "Error occurred", error = decoded.error
|
||||
|
|
|
@ -44,7 +44,7 @@ proc generateNode*(privKey: PrivateKey, port: int = 20302,
|
|||
localEnrFields: openArray[FieldPair] = []): Node =
|
||||
let port = Port(port)
|
||||
let enr = enr.Record.init(1, privKey, some(ip),
|
||||
some(port), some(port), localEnrFields).expect("Properly intialized private key")
|
||||
some(port), some(port), localEnrFields).expect("Properly initialized private key")
|
||||
result = newNode(enr).expect("Properly initialized node")
|
||||
|
||||
proc generateNRandomNodes*(rng: var HmacDrbgContext, n: int): seq[Node] =
|
||||
|
|
|
@ -95,7 +95,7 @@ const data = [
|
|||
e7c301a0c05559f4c25db65e36820b4b909a226171a60ac6cb7beea09376d6d8""")
|
||||
]
|
||||
|
||||
# Thies test vectors was copied from EIP8 specfication
|
||||
# These test vectors were copied from EIP8 specification
|
||||
# https://github.com/ethereum/EIPs/blob/master/EIPS/eip-8.md
|
||||
const eip8data = [
|
||||
("initiator_private_key",
|
||||
|
|
|
@ -135,10 +135,10 @@ procSuite "Discovery Tests":
|
|||
let neighbours1Future = bootNode.kademlia.findNode(nodesSeen, targetNodeId, peerNode)
|
||||
|
||||
# This will raise an assertion error if `findNode` doesn't check for and ignore
|
||||
# this second call to the same target and peer in rapid successfion.
|
||||
# this second call to the same target and peer in rapid succession.
|
||||
let neighbours2Future = bootNode.kademlia.findNode(nodesSeen, targetNodeId, peerNode)
|
||||
|
||||
# Just for completness, verify the result is empty from the second call.
|
||||
# Just for completeness, verify the result is empty from the second call.
|
||||
let neighbours2 = await neighbours2Future
|
||||
check(neighbours2.len == 0)
|
||||
|
||||
|
|
|
@ -622,7 +622,7 @@ suite "Discovery v5 Tests":
|
|||
privKey = PrivateKey.random(rng[])
|
||||
enrRec = enr.Record.init(1, privKey,
|
||||
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
|
||||
some(Port(9000))).expect("Properly intialized private key")
|
||||
some(Port(9000))).expect("Properly initialized private key")
|
||||
sendNode = newNode(enrRec).expect("Properly initialized record")
|
||||
var codec = Codec(localNode: sendNode, privKey: privKey, sessions: Sessions.init(5))
|
||||
|
||||
|
@ -651,7 +651,7 @@ suite "Discovery v5 Tests":
|
|||
privKey = PrivateKey.random(rng[])
|
||||
enrRec = enr.Record.init(1, privKey,
|
||||
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
|
||||
some(Port(9000))).expect("Properly intialized private key")
|
||||
some(Port(9000))).expect("Properly initialized private key")
|
||||
sendNode = newNode(enrRec).expect("Properly initialized record")
|
||||
var codec = Codec(localNode: sendNode, privKey: privKey, sessions: Sessions.init(5))
|
||||
for i in 0 ..< 5:
|
||||
|
@ -682,7 +682,7 @@ suite "Discovery v5 Tests":
|
|||
privKey = PrivateKey.random(rng[])
|
||||
enrRec = enr.Record.init(1, privKey,
|
||||
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
|
||||
some(Port(9000))).expect("Properly intialized private key")
|
||||
some(Port(9000))).expect("Properly initialized private key")
|
||||
sendNode = newNode(enrRec).expect("Properly initialized record")
|
||||
var codec = Codec(localNode: sendNode, privKey: privKey, sessions: Sessions.init(5))
|
||||
|
||||
|
|
|
@ -259,12 +259,12 @@ suite "Discovery v5.1 Packet Encodings Test Vectors":
|
|||
|
||||
enrRecA = enr.Record.init(1, privKeyA,
|
||||
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
|
||||
some(Port(9000))).expect("Properly intialized private key")
|
||||
some(Port(9000))).expect("Properly initialized private key")
|
||||
nodeA = newNode(enrRecA).expect("Properly initialized record")
|
||||
|
||||
enrRecB = enr.Record.init(1, privKeyB,
|
||||
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
|
||||
some(Port(9000))).expect("Properly intialized private key")
|
||||
some(Port(9000))).expect("Properly initialized private key")
|
||||
nodeB = newNode(enrRecB).expect("Properly initialized record")
|
||||
|
||||
var
|
||||
|
@ -480,12 +480,12 @@ suite "Discovery v5.1 Additional Encode/Decode":
|
|||
|
||||
enrRecA = enr.Record.init(1, privKeyA,
|
||||
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
|
||||
some(Port(9000))).expect("Properly intialized private key")
|
||||
some(Port(9000))).expect("Properly initialized private key")
|
||||
nodeA = newNode(enrRecA).expect("Properly initialized record")
|
||||
|
||||
enrRecB = enr.Record.init(1, privKeyB,
|
||||
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
|
||||
some(Port(9000))).expect("Properly intialized private key")
|
||||
some(Port(9000))).expect("Properly initialized private key")
|
||||
nodeB = newNode(enrRecB).expect("Properly initialized record")
|
||||
|
||||
var
|
||||
|
|
|
@ -89,7 +89,7 @@ suite "Routing Table Tests":
|
|||
# Adding another should fail as both buckets will be full and not be
|
||||
# allowed to split another time.
|
||||
check table.addNode(node.nodeAtDistance(rng[], 256)) == ReplacementAdded
|
||||
# And also when targetting one of the two specific buckets.
|
||||
# And also when targeting one of the two specific buckets.
|
||||
check table.addNode(firstNode.nodeAtDistance(rng[], 255)) == ReplacementAdded
|
||||
check table.addNode(firstNode.nodeAtDistance(rng[], 254)) == ReplacementAdded
|
||||
# This add should be allowed as it is on the branch where the own node's id
|
||||
|
@ -535,7 +535,7 @@ suite "Routing Table Tests":
|
|||
let neighbours = table.neighbours(local.id)
|
||||
check len(neighbours) == numNodes
|
||||
|
||||
# check that neighbours are sorted by provdied custom distance funciton
|
||||
# check that neighbours are sorted by provided custom distance function
|
||||
for i in 0..numNodes-2:
|
||||
let prevDist = customDistance(local.id, neighbours[i].id)
|
||||
let nextDist = customDistance(local.id, neighbours[i + 1].id)
|
||||
|
|
|
@ -46,7 +46,7 @@ suite "Utp ring buffer":
|
|||
buff.get(13) == some(13)
|
||||
buff.get(14) == some(14)
|
||||
|
||||
test "Modifing existing element in buffer":
|
||||
test "Modifying existing element in buffer":
|
||||
var buff = GrowableCircularBuffer[TestObj].init(size = 4)
|
||||
let oldText = "test"
|
||||
let newText = "testChanged"
|
||||
|
@ -104,7 +104,7 @@ suite "Utp ring buffer":
|
|||
buff.put(15, 15)
|
||||
|
||||
check:
|
||||
# it growed to next power of two
|
||||
# it grew to next power of two
|
||||
buff.len() == 8
|
||||
buff.get(11) == some(11)
|
||||
buff.get(12) == some(12)
|
||||
|
@ -157,7 +157,7 @@ suite "Utp ring buffer":
|
|||
buff.ensureSize(3, 4)
|
||||
buff.put(3, 3)
|
||||
|
||||
# all elements should be available thorugh old indexes
|
||||
# all elements should be available through old indexes
|
||||
check:
|
||||
buff.get(65535) == some(65535)
|
||||
buff.get(0) == some(0)
|
||||
|
|
|
@ -38,7 +38,7 @@ suite "Clock drift calculator":
|
|||
# first sample which will be treated as a base sample
|
||||
calculator.addSample(10, currentTime + seconds(3))
|
||||
|
||||
# second sample in the first inteval it will be treated in relation to first one
|
||||
# second sample in the first interval it will be treated in relation to first one
|
||||
# so correct first drift should be: (50 - 10) / 2 == 20
|
||||
calculator.addSample(50, currentTime + seconds(6))
|
||||
|
||||
|
@ -46,7 +46,7 @@ suite "Clock drift calculator":
|
|||
calculator.clockDrift == 2
|
||||
calculator.lastClockDrift == 20
|
||||
|
||||
test "Clock drift should properly calcuated when clock drifts to two sides":
|
||||
test "Clock drift should properly calculated when clock drifts to two sides":
|
||||
let currentTime = Moment.now()
|
||||
var calculator1 = ClockDriftCalculator.init(currentTime)
|
||||
var calculator2 = ClockDriftCalculator.init(currentTime)
|
||||
|
|
|
@ -205,7 +205,7 @@ procSuite "Utp protocol over udp tests":
|
|||
|
||||
await utpProt1.shutdownWait()
|
||||
|
||||
asyncTest "Success connect to remote host which initialy was offline":
|
||||
asyncTest "Success connect to remote host which initially was offline":
|
||||
let server1Called = newAsyncEvent()
|
||||
let address = initTAddress("127.0.0.1", 9079)
|
||||
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address, nil, SocketConfig.init(milliseconds(500)))
|
||||
|
@ -214,7 +214,7 @@ procSuite "Utp protocol over udp tests":
|
|||
|
||||
let futSock = utpProt1.connectTo(address1)
|
||||
|
||||
# waiting 400 milisecond will trigger at least one re-send
|
||||
# waiting 400 millisecond will trigger at least one re-send
|
||||
await sleepAsync(milliseconds(400))
|
||||
|
||||
var server2Called = newAsyncEvent()
|
||||
|
@ -276,7 +276,7 @@ procSuite "Utp protocol over udp tests":
|
|||
let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer)
|
||||
let bytesReceivedFromServer = await transferData(s.serverSocket, s.clientSocket, bytesToTransfer)
|
||||
|
||||
# ultimatly all send packets will acked, and outgoing buffer will be empty
|
||||
# ultimately all send packets will acked, and outgoing buffer will be empty
|
||||
await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0)
|
||||
await waitUntil(proc (): bool = s.serverSocket.numPacketsInOutGoingBuffer() == 0)
|
||||
|
||||
|
@ -316,7 +316,7 @@ procSuite "Utp protocol over udp tests":
|
|||
|
||||
let bytesReceived = await s.serverSocket.read(len(bytesToTransfer) + len(bytesToTransfer1))
|
||||
|
||||
# ultimatly all send packets will acked, and outgoing buffer will be empty
|
||||
# ultimately all send packets will acked, and outgoing buffer will be empty
|
||||
await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0)
|
||||
|
||||
check:
|
||||
|
@ -350,7 +350,7 @@ procSuite "Utp protocol over udp tests":
|
|||
|
||||
await s.close()
|
||||
|
||||
asyncTest "Gracefull stop of the socket":
|
||||
asyncTest "Gracefully stop of the socket":
|
||||
let s = await initClientServerScenario()
|
||||
check:
|
||||
s.clientSocket.isConnected()
|
||||
|
@ -443,7 +443,7 @@ procSuite "Utp protocol over udp tests":
|
|||
check:
|
||||
allowedSocketRes.isOk()
|
||||
notAllowedSocketRes.isErr()
|
||||
# remote did not allow this connection, and utlimatly it did time out
|
||||
# remote did not allow this connection, and ultimately it did time out
|
||||
notAllowedSocketRes.error().kind == ConnectionTimedOut
|
||||
|
||||
let clientSocket = allowedSocketRes.get()
|
||||
|
@ -475,12 +475,12 @@ procSuite "Utp protocol over udp tests":
|
|||
|
||||
let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer)
|
||||
|
||||
# ultimatly all send packets will acked, and outgoing buffer will be empty
|
||||
# ultimately all send packets will acked, and outgoing buffer will be empty
|
||||
await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0)
|
||||
|
||||
check:
|
||||
# we can only assert that window has grown, becouse specific values depends on
|
||||
# particual timings
|
||||
# we can only assert that window has grown, because specific values depends on
|
||||
# particular timings
|
||||
s.clientSocket.currentMaxWindowSize > startMaxWindow
|
||||
s.serverSocket.isConnected()
|
||||
s.clientSocket.numPacketsInOutGoingBuffer() == 0
|
||||
|
@ -507,14 +507,14 @@ procSuite "Utp protocol over udp tests":
|
|||
|
||||
let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer)
|
||||
|
||||
# ultimatly all send packets will acked, and outgoing buffer will be empty
|
||||
# ultimately all send packets will acked, and outgoing buffer will be empty
|
||||
await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0)
|
||||
|
||||
let maximumMaxWindow = s.clientSocket.currentMaxWindowSize
|
||||
|
||||
check:
|
||||
# we can only assert that window has grown, becouse specific values depends on
|
||||
# particual timings
|
||||
# we can only assert that window has grown, because specific values depends on
|
||||
# particular timings
|
||||
maximumMaxWindow > startMaxWindow
|
||||
s.serverSocket.isConnected()
|
||||
s.clientSocket.numPacketsInOutGoingBuffer() == 0
|
||||
|
|
|
@ -103,7 +103,7 @@ procSuite "Utp protocol over udp tests with loss and delays":
|
|||
return (utpProt1, clientSocket, utpProt2, serverSocket)
|
||||
|
||||
type TestCase = object
|
||||
# in miliseconds
|
||||
# in milliseconds
|
||||
maxDelay: int
|
||||
dropRate: int
|
||||
bytesToTransfer: int
|
||||
|
@ -137,7 +137,7 @@ procSuite "Utp protocol over udp tests with loss and delays":
|
|||
|
||||
let smallBytes = 10
|
||||
let smallBytesToTransfer = rng[].generateBytes(smallBytes)
|
||||
# first transfer and read to make server socket connecteced
|
||||
# first transfer and read to make server socket connected
|
||||
let write1 = await clientSocket.write(smallBytesToTransfer)
|
||||
let read1 = await serverSocket.read(smallBytes)
|
||||
|
||||
|
@ -193,7 +193,7 @@ procSuite "Utp protocol over udp tests with loss and delays":
|
|||
|
||||
let smallBytes = 10
|
||||
let smallBytesToTransfer = rng[].generateBytes(smallBytes)
|
||||
# first transfer and read to make server socket connecteced
|
||||
# first transfer and read to make server socket connected
|
||||
let write1 = await clientSocket.write(smallBytesToTransfer)
|
||||
let read1 = await serverSocket.read(smallBytes)
|
||||
|
||||
|
|
|
@ -75,7 +75,7 @@ procSuite "Utp router unit tests":
|
|||
let outgoingSocket = await connectFuture
|
||||
(outgoingSocket.get(), initialPacket)
|
||||
|
||||
asyncTest "Router should ingnore non utp packets":
|
||||
asyncTest "Router should ignore non utp packets":
|
||||
let q = newAsyncQueue[UtpSocket[int]]()
|
||||
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(), rng)
|
||||
router.sendCb = testSend
|
||||
|
@ -116,7 +116,7 @@ procSuite "Utp router unit tests":
|
|||
check:
|
||||
router.len() == connectionsLimit
|
||||
|
||||
asyncTest "Incoming connection should be closed when not receving data for period of time when configured":
|
||||
asyncTest "Incoming connection should be closed when not receiving data for period of time when configured":
|
||||
let q = newAsyncQueue[UtpSocket[int]]()
|
||||
let router =
|
||||
UtpRouter[int].new(
|
||||
|
@ -213,7 +213,7 @@ procSuite "Utp router unit tests":
|
|||
check:
|
||||
socket.isConnected()
|
||||
|
||||
asyncTest "Router should create new incoming socket when receiving same syn packet from diffrent sender":
|
||||
asyncTest "Router should create new incoming socket when receiving same syn packet from different sender":
|
||||
let q = newAsyncQueue[UtpSocket[int]]()
|
||||
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(), rng)
|
||||
router.sendCb = testSend
|
||||
|
@ -366,7 +366,7 @@ procSuite "Utp router unit tests":
|
|||
router.sendCb =
|
||||
proc (to: int, data: seq[byte]): Future[void] =
|
||||
let f = newFuture[void]()
|
||||
f.fail(newException(TestError, "faile"))
|
||||
f.fail(newException(TestError, "failed"))
|
||||
return f
|
||||
|
||||
let connectResult = await router.connectTo(testSender2)
|
||||
|
|
|
@ -96,14 +96,14 @@ procSuite "Utp socket unit test":
|
|||
|
||||
asyncTest "Processing in order data packet should upload it to buffer and ack packet":
|
||||
let q = newAsyncQueue[Packet]()
|
||||
let initalRemoteSeqNr = 10'u16
|
||||
let initialRemoteSeqNr = 10'u16
|
||||
let data = @[1'u8, 2'u8, 3'u8]
|
||||
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q)
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeqNr, q)
|
||||
|
||||
let dataP1 =
|
||||
dataPacket(
|
||||
initalRemoteSeqNr,
|
||||
initialRemoteSeqNr,
|
||||
initialPacket.header.connectionId,
|
||||
initialPacket.header.seqNr,
|
||||
testBufferSize,
|
||||
|
@ -116,7 +116,7 @@ procSuite "Utp socket unit test":
|
|||
|
||||
check:
|
||||
ack1.header.pType == ST_STATE
|
||||
ack1.header.ackNr == initalRemoteSeqNr
|
||||
ack1.header.ackNr == initialRemoteSeqNr
|
||||
|
||||
let receivedBytes = await outgoingSocket.read(len(data))
|
||||
|
||||
|
@ -127,14 +127,14 @@ procSuite "Utp socket unit test":
|
|||
|
||||
asyncTest "Processing duplicated fresh data packet should ack it and stop processing":
|
||||
let q = newAsyncQueue[Packet]()
|
||||
let initalRemoteSeqNr = 10'u16
|
||||
let initialRemoteSeqNr = 10'u16
|
||||
let data = @[1'u8, 2'u8, 3'u8]
|
||||
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q)
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeqNr, q)
|
||||
|
||||
let dataP1 =
|
||||
dataPacket(
|
||||
initalRemoteSeqNr,
|
||||
initialRemoteSeqNr,
|
||||
initialPacket.header.connectionId,
|
||||
initialPacket.header.seqNr,
|
||||
testBufferSize,
|
||||
|
@ -148,7 +148,7 @@ procSuite "Utp socket unit test":
|
|||
|
||||
check:
|
||||
ack1.header.pType == ST_STATE
|
||||
ack1.header.ackNr == initalRemoteSeqNr
|
||||
ack1.header.ackNr == initialRemoteSeqNr
|
||||
|
||||
let receivedBytes = await outgoingSocket.read(len(data))
|
||||
|
||||
|
@ -162,7 +162,7 @@ procSuite "Utp socket unit test":
|
|||
|
||||
check:
|
||||
ack2.header.pType == ST_STATE
|
||||
ack2.header.ackNr == initalRemoteSeqNr
|
||||
ack2.header.ackNr == initialRemoteSeqNr
|
||||
# we do not upload data one more time
|
||||
outgoingSocket.numOfBytesInIncomingBuffer() == 0'u32
|
||||
|
||||
|
@ -171,12 +171,12 @@ procSuite "Utp socket unit test":
|
|||
asyncTest "Processing out of order data packet should buffer it until receiving in order one":
|
||||
# TODO test is valid until implementing selective acks
|
||||
let q = newAsyncQueue[Packet]()
|
||||
let initalRemoteSeqNr = 10'u16
|
||||
let initialRemoteSeqNr = 10'u16
|
||||
let numOfPackets = 10'u16
|
||||
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q)
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeqNr, q)
|
||||
|
||||
var packets = generateDataPackets(numOfPackets, initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, rng[])
|
||||
var packets = generateDataPackets(numOfPackets, initialRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, rng[])
|
||||
|
||||
let data = packetsToBytes(packets)
|
||||
|
||||
|
@ -195,7 +195,7 @@ procSuite "Utp socket unit test":
|
|||
# all packets except last one should be selective acks, without bumped ackNr
|
||||
for i in 0'u16..<numOfPackets - 1:
|
||||
check:
|
||||
sentAcks[i].header.ackNr == initalRemoteSeqNr - 1
|
||||
sentAcks[i].header.ackNr == initialRemoteSeqNr - 1
|
||||
sentAcks[i].eack.isSome()
|
||||
|
||||
# last ack should be normal ack packet (not selective one), and it should ack
|
||||
|
@ -205,7 +205,7 @@ procSuite "Utp socket unit test":
|
|||
check:
|
||||
lastAck.header.pType == ST_STATE
|
||||
# we are acking in one shot whole 10 packets
|
||||
lastAck.header.ackNr == initalRemoteSeqNr + uint16(len(packets) - 1)
|
||||
lastAck.header.ackNr == initialRemoteSeqNr + uint16(len(packets) - 1)
|
||||
|
||||
lastAck.eack.isNone()
|
||||
|
||||
|
@ -219,12 +219,12 @@ procSuite "Utp socket unit test":
|
|||
asyncTest "Processing out of order data packet should ignore duplicated not ordered packets":
|
||||
# TODO test is valid until implementing selective acks
|
||||
let q = newAsyncQueue[Packet]()
|
||||
let initalRemoteSeqNr = 10'u16
|
||||
let initialRemoteSeqNr = 10'u16
|
||||
let numOfPackets = 3'u16
|
||||
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q)
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeqNr, q)
|
||||
|
||||
var packets = generateDataPackets(numOfPackets, initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, rng[])
|
||||
var packets = generateDataPackets(numOfPackets, initialRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, rng[])
|
||||
|
||||
let data = packetsToBytes(packets)
|
||||
|
||||
|
@ -247,7 +247,7 @@ procSuite "Utp socket unit test":
|
|||
# all packets except last one should be selective acks, without bumped ackNr
|
||||
for i in 0'u16..<numOfPackets - 1:
|
||||
check:
|
||||
sentAcks[i].header.ackNr == initalRemoteSeqNr - 1
|
||||
sentAcks[i].header.ackNr == initialRemoteSeqNr - 1
|
||||
sentAcks[i].eack.isSome()
|
||||
|
||||
# last ack should be normal ack packet (not selective one), and it should ack
|
||||
|
@ -257,7 +257,7 @@ procSuite "Utp socket unit test":
|
|||
check:
|
||||
lastAck.header.pType == ST_STATE
|
||||
# we are acking in one shot whole 10 packets
|
||||
lastAck.header.ackNr == initalRemoteSeqNr + uint16(len(packets) - 1)
|
||||
lastAck.header.ackNr == initialRemoteSeqNr + uint16(len(packets) - 1)
|
||||
|
||||
lastAck.eack.isNone()
|
||||
|
||||
|
@ -270,11 +270,11 @@ procSuite "Utp socket unit test":
|
|||
|
||||
asyncTest "Processing packets in random order":
|
||||
let q = newAsyncQueue[Packet]()
|
||||
let initalRemoteSeqNr = 10'u16
|
||||
let initialRemoteSeqNr = 10'u16
|
||||
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q)
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeqNr, q)
|
||||
|
||||
var packets = generateDataPackets(30, initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, rng[])
|
||||
var packets = generateDataPackets(30, initialRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, rng[])
|
||||
|
||||
let data = packetsToBytes(packets)
|
||||
|
||||
|
@ -288,7 +288,7 @@ procSuite "Utp socket unit test":
|
|||
let receivedData = await outgoingSocket.read(len(data))
|
||||
|
||||
check:
|
||||
# with packets totally out of order we cannont assert on acks
|
||||
# with packets totally out of order we cannot assert on acks
|
||||
# as they can be fired at any point. What matters is that data is passed
|
||||
# in same order as received.
|
||||
receivedData == data
|
||||
|
@ -297,11 +297,11 @@ procSuite "Utp socket unit test":
|
|||
|
||||
asyncTest "Ignoring totally out of order packet":
|
||||
let q = newAsyncQueue[Packet]()
|
||||
let initalRemoteSeqNr = 10'u16
|
||||
let initialRemoteSeqNr = 10'u16
|
||||
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q)
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeqNr, q)
|
||||
|
||||
var packets = generateDataPackets(1025, initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, rng[])
|
||||
var packets = generateDataPackets(1025, initialRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, rng[])
|
||||
|
||||
await outgoingSocket.processPacket(packets[1024])
|
||||
|
||||
|
@ -311,7 +311,7 @@ procSuite "Utp socket unit test":
|
|||
await sleepAsync(milliseconds(500))
|
||||
|
||||
check:
|
||||
outgoingSocket.numPacketsInReordedBuffer() == 1
|
||||
outgoingSocket.numPacketsInReorderedBuffer() == 1
|
||||
|
||||
await outgoingSocket.destroyWait()
|
||||
|
||||
|
@ -390,11 +390,11 @@ procSuite "Utp socket unit test":
|
|||
|
||||
await waitUntil(proc (): bool = outgoingSocket.numPacketsInOutGoingBuffer() == 0)
|
||||
|
||||
let maxWindowAfterSuccesfulSends = outgoingSocket.currentMaxWindowSize()
|
||||
let maxWindowAfterSuccessfulSends = outgoingSocket.currentMaxWindowSize()
|
||||
|
||||
check:
|
||||
# after processing a lot of data, our window size should be a lot bigger than our packet size
|
||||
maxWindowAfterSuccesfulSends > uint32(outgoingSocket.getPacketSize())
|
||||
maxWindowAfterSuccessfulSends > uint32(outgoingSocket.getPacketSize())
|
||||
|
||||
# cancel acking process, next writes will for sure timeout
|
||||
await acker.cancelAndWait()
|
||||
|
@ -418,7 +418,7 @@ procSuite "Utp socket unit test":
|
|||
check:
|
||||
# After standard timeout window should not decay and must be bigger than packet size
|
||||
maxWindowAfterTimeout > uint32(outgoingSocket.getPacketSize())
|
||||
maxWindowAfterTimeout == maxWindowAfterSuccesfulSends
|
||||
maxWindowAfterTimeout == maxWindowAfterSuccessfulSends
|
||||
|
||||
await outgoingSocket.destroyWait()
|
||||
|
||||
|
@ -505,7 +505,7 @@ procSuite "Utp socket unit test":
|
|||
|
||||
asyncTest "Socket should re-send data packet configurable number of times before declaring failure":
|
||||
let q = newAsyncQueue[Packet]()
|
||||
let initalRemoteSeqNr = 10'u16
|
||||
let initialRemoteSeqNr = 10'u16
|
||||
|
||||
let outgoingSocket = newOutgoingSocket[TransportAddress](
|
||||
testAddress,
|
||||
|
@ -524,7 +524,7 @@ procSuite "Utp socket unit test":
|
|||
|
||||
let responseAck =
|
||||
ackPacket(
|
||||
initalRemoteSeqNr,
|
||||
initialRemoteSeqNr,
|
||||
initialPacket.header.connectionId,
|
||||
initialPacket.header.seqNr,
|
||||
testBufferSize,
|
||||
|
@ -863,17 +863,17 @@ procSuite "Utp socket unit test":
|
|||
|
||||
await outgoingSocket.destroyWait()
|
||||
|
||||
asyncTest "Reading data from the buffer shoud increase receive window":
|
||||
asyncTest "Reading data from the buffer should increase receive window":
|
||||
let q = newAsyncQueue[Packet]()
|
||||
let initalRemoteSeqNr = 10'u16
|
||||
let initialRemoteSeqNr = 10'u16
|
||||
let initialRcvBufferSize = 10'u32
|
||||
let data = @[1'u8, 2'u8, 3'u8]
|
||||
let sCfg = SocketConfig.init(optRcvBuffer = initialRcvBufferSize)
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q, testBufferSize, sCfg)
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeqNr, q, testBufferSize, sCfg)
|
||||
|
||||
let dataP1 =
|
||||
dataPacket(
|
||||
initalRemoteSeqNr,
|
||||
initialRemoteSeqNr,
|
||||
initialPacket.header.connectionId,
|
||||
initialPacket.header.seqNr,
|
||||
testBufferSize,
|
||||
|
@ -887,7 +887,7 @@ procSuite "Utp socket unit test":
|
|||
|
||||
check:
|
||||
ack1.header.pType == ST_STATE
|
||||
ack1.header.ackNr == initalRemoteSeqNr
|
||||
ack1.header.ackNr == initialRemoteSeqNr
|
||||
ack1.header.wndSize == initialRcvBufferSize - uint32(len(data))
|
||||
|
||||
let readData = await outgoingSocket.read(data.len())
|
||||
|
@ -1043,7 +1043,7 @@ procSuite "Utp socket unit test":
|
|||
check:
|
||||
int(outgoingSocket.numOfBytesInFlight) == len(dataToWrite) + len(dataToWrite1)
|
||||
|
||||
# after timeout oldest packet will be immediatly re-sent
|
||||
# after timeout oldest packet will be immediately re-sent
|
||||
let reSentFirstPacket = await q.get()
|
||||
|
||||
check:
|
||||
|
@ -1098,11 +1098,11 @@ procSuite "Utp socket unit test":
|
|||
let q = newAsyncQueue[Packet]()
|
||||
let initialRemoteSeq = 10'u16
|
||||
|
||||
let dataToWirte = 1160
|
||||
let dataToWrite = 1160
|
||||
# remote is initialized with buffer to small to handle whole payload
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, cfg = SocketConfig.init(optSndBuffer = 1160))
|
||||
|
||||
let twoPacketData = rng[].generateBytes(int(dataToWirte))
|
||||
let twoPacketData = rng[].generateBytes(int(dataToWrite))
|
||||
|
||||
let writeResult = await outgoingSocket.write(twoPacketData)
|
||||
|
||||
|
@ -1179,7 +1179,7 @@ procSuite "Utp socket unit test":
|
|||
|
||||
await outgoingSocket.destroyWait()
|
||||
|
||||
asyncTest "Remote window should be reseted to minimal value after configured amount of time":
|
||||
asyncTest "Remote window should be reset to minimal value after configured amount of time":
|
||||
let q = newAsyncQueue[Packet]()
|
||||
let initialRemoteSeq = 10'u16
|
||||
let someData = @[1'u8]
|
||||
|
@ -1196,7 +1196,7 @@ procSuite "Utp socket unit test":
|
|||
check:
|
||||
outgoingSocket.isConnected()
|
||||
|
||||
# write result will be successfull as send buffer has space
|
||||
# write result will be successful as send buffer has space
|
||||
let writeResult = await outgoingSocket.write(someData)
|
||||
|
||||
# this will finish in seconds(3) as only after this time window will be set to min value
|
||||
|
@ -1225,12 +1225,12 @@ procSuite "Utp socket unit test":
|
|||
check:
|
||||
outgoingSocket.isConnected()
|
||||
|
||||
# snd buffer got 1 byte of space so this future shold finish
|
||||
# snd buffer got 1 byte of space so this future should finish
|
||||
let write1 = await outgoingSocket.write(someData1)
|
||||
|
||||
let writeFut2 = outgoingSocket.write(someData2)
|
||||
|
||||
# wait until 2 re-sends to check we do not accidently free buffer during re-sends
|
||||
# wait until 2 re-sends to check we do not accidentally free buffer during re-sends
|
||||
discard await q.get()
|
||||
discard await q.get()
|
||||
let firstPacket = await q.get()
|
||||
|
@ -1295,9 +1295,9 @@ procSuite "Utp socket unit test":
|
|||
|
||||
asyncTest "Re-sent packet should have updated timestamps and ack numbers":
|
||||
let q = newAsyncQueue[Packet]()
|
||||
let initalRemoteSeqNr = 10'u16
|
||||
let initialRemoteSeqNr = 10'u16
|
||||
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q)
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeqNr, q)
|
||||
|
||||
let writeResult = await outgoingSocket.write(@[1'u8])
|
||||
|
||||
|
@ -1316,7 +1316,7 @@ procSuite "Utp socket unit test":
|
|||
|
||||
let dataP1 =
|
||||
dataPacket(
|
||||
initalRemoteSeqNr,
|
||||
initialRemoteSeqNr,
|
||||
initialPacket.header.connectionId,
|
||||
initialPacket.header.seqNr,
|
||||
testBufferSize,
|
||||
|
@ -1413,10 +1413,10 @@ procSuite "Utp socket unit test":
|
|||
let q = newAsyncQueue[Packet]()
|
||||
let initialRemoteSeq = 10'u16
|
||||
let cfg = SocketConfig.init()
|
||||
let remoteReciveBuffer = 1024'u32
|
||||
let remoteReceiveBuffer = 1024'u32
|
||||
|
||||
let dataDropped = @[1'u8]
|
||||
let dataRecived = @[2'u8]
|
||||
let dataReceived = @[2'u8]
|
||||
|
||||
let sock1 = newOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), cfg, defaultRcvOutgoingId, rng[])
|
||||
|
||||
|
@ -1441,7 +1441,7 @@ procSuite "Utp socket unit test":
|
|||
initialPacket.header.connectionId,
|
||||
initialPacket.header.seqNr,
|
||||
testBufferSize,
|
||||
dataRecived,
|
||||
dataReceived,
|
||||
0
|
||||
)
|
||||
|
||||
|
@ -1450,7 +1450,7 @@ procSuite "Utp socket unit test":
|
|||
initialRemoteSeq,
|
||||
initialPacket.header.connectionId,
|
||||
initialPacket.header.seqNr,
|
||||
remoteReciveBuffer,
|
||||
remoteReceiveBuffer,
|
||||
0
|
||||
)
|
||||
|
||||
|
@ -1463,7 +1463,7 @@ procSuite "Utp socket unit test":
|
|||
let receivedData = await sock1.read(1)
|
||||
|
||||
check:
|
||||
receivedData == dataRecived
|
||||
receivedData == dataReceived
|
||||
|
||||
await sock1.destroyWait()
|
||||
|
||||
|
@ -1477,7 +1477,7 @@ procSuite "Utp socket unit test":
|
|||
|
||||
let bytesWritten = await outgoingSocket.write(dataToWrite)
|
||||
# this future will never finish as there is not place in write buffer
|
||||
# although it should get properly clearead up when socket is closed
|
||||
# although it should get properly cleared up when socket is closed
|
||||
let writeFut = outgoingSocket.write(dataToWrite)
|
||||
|
||||
check:
|
||||
|
@ -1529,12 +1529,12 @@ procSuite "Utp socket unit test":
|
|||
|
||||
asyncTest "Maximum payload size should be configurable":
|
||||
let q = newAsyncQueue[Packet]()
|
||||
let initalRemoteSeqNr = 10'u16
|
||||
let initialRemoteSeqNr = 10'u16
|
||||
let d = rng[].generateBytes(5000)
|
||||
let maxPayloadSize = 800'u32
|
||||
let config = SocketConfig.init(payloadSize = maxPayloadSize)
|
||||
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q, cfg = config)
|
||||
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeqNr, q, cfg = config)
|
||||
|
||||
let wr = await outgoingSocket.write(d)
|
||||
|
||||
|
|
|
@ -131,7 +131,7 @@ procSuite "Utp socket selective acks unit test":
|
|||
len(bitMask) == 4
|
||||
|
||||
type TestCase = object
|
||||
# number of packet to generate by writitng side
|
||||
# number of packet to generate by writing side
|
||||
numOfPackets: int
|
||||
# indexes of packets which should be delivered to remote
|
||||
packetsDelivered: seq[int]
|
||||
|
@ -324,6 +324,6 @@ procSuite "Utp socket selective acks unit test":
|
|||
else:
|
||||
check:
|
||||
# due to ledbat congestion control we cannot assert on precise end buffer size,
|
||||
# but due to packet loss we are sure it shoul be smaller that at the beginning
|
||||
# becouse of 0.5 muliplayer
|
||||
# but due to packet loss we are sure it should be smaller that at the beginning
|
||||
# because of 0.5 multiplayer
|
||||
endBufferSize < initialBufferSize
|
||||
|
|
Loading…
Reference in New Issue