deploy: dbe76d29cea1317450cfd476f33bfa877de6b69e

kaiserd 2022-03-01 14:50:10 +00:00
parent 41a21a490c
commit ab3029d6c9
171 changed files with 1595 additions and 793 deletions

.gitmodules vendored
View File

@ -1,8 +1,8 @@
[submodule "vendor/nim-eth"]
path = vendor/nim-eth
url = https://github.com/status-im/nim-eth.git
url = https://github.com/kaiserd/nim-eth.git
ignore = dirty
branch = master
branch = add-selectable-protocol-id-static
[submodule "vendor/nim-secp256k1"]
path = vendor/nim-secp256k1
url = https://github.com/status-im/nim-secp256k1.git

View File

@ -96,6 +96,9 @@ endif
endif
endif
# use a separate waku discv5 network with `protocol-id="d5waku"`
NIM_PARAMS := $(NIM_PARAMS) -d:discv5_protocol_id:d5waku
deps: | deps-common nat-libs waku.nims rlnlib
ifneq ($(USE_LIBBACKTRACE), 0)
deps: | libbacktrace
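For context, a minimal sketch of how such a `-d:` define reaches Nim code (the `demo.nim` file name is hypothetical; the `{.strdefine.}` constant in `encoding.nim` further down is consumed the same way):

```nim
# demo.nim -- compile-time override of a string constant (hypothetical file)
const discv5_protocol_id {.strdefine.} = "discv5"  # default when no -d: is given

when isMainModule:
  # nim c -r demo.nim                               prints "discv5"
  # nim c -r -d:discv5_protocol_id:d5waku demo.nim  prints "d5waku"
  echo discv5_protocol_id
```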

View File

@ -74,7 +74,7 @@ nim-eth dependencies) tests, one can run the following command:
# Install required modules
nimble install
# Run only discovery v5 related test suite
nimble tests_discv5_full
nimble test_discv5_full
```
## dcli

View File

@ -74,6 +74,9 @@ task test_db, "Run db tests":
task test_utp, "Run utp tests":
runTest("tests/utp/all_utp_tests")
task test_common, "Run common tests":
runTest("tests/common/test_eth_types")
task test, "Run all tests":
for filename in [
"test_bloom",
@ -87,6 +90,7 @@ task test, "Run all tests":
test_trie_task()
test_db_task()
test_utp_task()
test_common_task()
task test_discv5_full, "Run discovery v5 and its dependencies tests":
test_keys_task()

View File

@ -244,6 +244,13 @@ func baseFee*(h: BlockHeader | BlockHeaderRef): UInt256 =
template `baseFee=`*(h: BlockHeader | BlockHeaderRef, data: UInt256) =
h.fee = some(data)
# starting from EIP-4399, the `mixHash`/`mixDigest` field will be called `prevRandao`
template prevRandao*(h: BlockHeader | BlockHeaderRef): Hash256 =
h.mixDigest
template `prevRandao=`*(h: BlockHeader | BlockHeaderRef, hash: Hash256) =
h.mixDigest = hash
func toBlockNonce*(n: uint64): BlockNonce =
n.toBytesBE()
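A hedged usage sketch of the new accessors (mirroring the EIP-4399 test added further down; the import path is an assumption):

```nim
import eth/common/eth_types

var header: BlockHeader
header.prevRandao = Hash256.fromHex(
  "0x7a64245f7f95164f6176d90bd4903dbdd3e5433d555dd1385e81787f9672c588")
# the value is stored in, and read back from, the legacy mixDigest field
doAssert header.prevRandao == header.mixDigest
```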

View File

@ -23,7 +23,7 @@ proc writeValue*(w: var JsonWriter, value: StUint) {.inline.} =
proc readValue*(r: var JsonReader, value: var StUint) {.inline.} =
value = parse(r.readValue(string), type(value))
proc writeValue*(w: var JsonWriter, value: Stint) =
proc writeValue*(w: var JsonWriter, value: StInt) =
# The Ethereum Yellow Paper defines the RLP serialization only
# for unsigned integers:
{.error: "RLP serialization of signed integers is not allowed".}

View File

@ -211,6 +211,48 @@ proc exec*[Params, Res](s: SqliteStmt[Params, Res],
discard sqlite3_reset(s) # same return information as step
discard sqlite3_clear_bindings(s) # no errors possible
iterator exec*[Params, Res](s: SqliteStmt[Params, Res],
params: Params, item: var Res): KvResult[void] =
let s = RawStmtPtr s
# we use a mutable `res` variable here to avoid the code bloat that multiple
# `yield` statements cause when inlining the loop body
var res = KvResult[void].ok()
when params is tuple:
var i = 1
for param in fields(params):
if (let v = bindParam(s, i, param); v != SQLITE_OK):
res = KvResult[void].err($sqlite3_errstr(v))
break
inc i
else:
if (let v = bindParam(s, 1, params); v != SQLITE_OK):
res = KvResult[void].err($sqlite3_errstr(v))
defer:
# release implicit transaction
discard sqlite3_reset(s) # same return information as step
discard sqlite3_clear_bindings(s) # no errors possible
while res.isOk():
let v = sqlite3_step(s)
case v
of SQLITE_ROW:
item = readResult(s, Res)
yield KvResult[void].ok()
of SQLITE_DONE:
break
else:
res = KvResult[void].err($sqlite3_errstr(v))
if not res.isOk():
yield res
iterator exec*[Res](s: SqliteStmt[NoParams, Res], item: var Res): KvResult[void] =
for r in exec(s, (), item):
yield r
template exec*(s: SqliteStmt[NoParams, void]): KvResult[void] =
exec(s, ())
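A sketch of how the new iterator is meant to be consumed (the prepared statement and its result type are hypothetical; compare the updated SqStoreRef test further down):

```nim
# assuming `selectStmt` is a prepared SqliteStmt[NoParams, int64]
var row: int64
for rowRes in selectStmt.exec(row):
  rowRes.expect("working db")   # each yield carries any bind/step error
  echo "got row: ", row         # `row` is overwritten on every SQLITE_ROW
```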

View File

@ -3,12 +3,12 @@ const
# Ethereum Foundation Go Bootnodes
"enode://d860a01f9722d78051619d1e2351aba3f43f943f6f00718d1b9baa4101932a1f5011f16bb2b1bb35db20d6fe28fa0bf09636d26a87d31de9ec6203eeedb1f666@18.138.108.67:30303", # bootnode-aws-ap-southeast-1-001
"enode://22a8232c3abc76a16ae9d6c3b164f98775fe226f0917b0ca871128a74a8e9630b458460865bab457221f1d448dd9791d24c4e5d88786180ac185df813a68d4de@3.209.45.79:30303", # bootnode-aws-us-east-1-001
"enode://ca6de62fce278f96aea6ec5a2daadb877e51651247cb96ee310a318def462913b653963c155a0ef6c7d50048bba6e6cea881130857413d9f50a621546b590758@34.255.23.113:30303", # bootnode-aws-eu-west-1-001
"enode://279944d8dcd428dffaa7436f25ca0ca43ae19e7bcf94a8fb7d1641651f92d121e972ac2e8f381414b80cc8e5555811c2ec6e1a99bb009b3f53c4c69923e11bd8@35.158.244.151:30303", # bootnode-aws-eu-central-1-001
"enode://8499da03c47d637b20eee24eec3c356c9a2e6148d6fe25ca195c7949ab8ec2c03e3556126b0d7ed644675e78c4318b08691b7b57de10e5f0d40d05b09238fa0a@52.187.207.27:30303", # bootnode-azure-australiaeast-001
"enode://103858bdb88756c71f15e9b5e09b56dc1be52f0a5021d46301dbbfb7e130029cc9d0d6f73f693bc29b665770fff7da4d34f3c6379fe12721b5d7a0bcb5ca1fc1@191.234.162.198:30303", # bootnode-azure-brazilsouth-001
"enode://715171f50508aba88aecd1250af392a45a330af91d7b90701c436b618c86aaa1589c9184561907bebbb56439b8f8787bc01f49a7c77276c58c1b09822d75e8e8@52.231.165.108:30303", # bootnode-azure-koreasouth-001
"enode://5d6d7cd20d6da4bb83a1d28cadb5d409b64edf314c0335df658c1a54e32c7c4a7ab7823d57c39b6a757556e68ff1df17c748b698544a55cb488b52479a92b60f@104.42.217.25:30303", # bootnode-azure-westus-001
"enode://2b252ab6a1d0f971d9722cb839a42cb81db019ba44c08754628ab4a823487071b5695317c8ccd085219c3a03af063495b2f1da8d18218da2d6a82981b45e6ffc@65.108.70.101:30303", # bootnode-hetzner-hel
"enode://4aeb4ab6c14b23e2c4cfdce879c04b0748a20d8e9b59e25ded2a08143e265c6c25936e74cbc8e641e3312ca288673d91f2f93f8e277de3cfa444ecdaaf982052@157.90.35.166:30303", # bootnode-hetzner-fsn
# Ethereum Foundation C++ Bootnodes
"enode://979b7fa28feeb35a4741660a16076f1943202cb72b6af70d327f053e248bab9ba81760f39d0701ef1d8f89cc1fbd2cacba0710a12cd5314d5e0c9021aa3637f9@5.1.83.226:30303" # DE
]

View File

@ -3,14 +3,15 @@ import
confutils, confutils/std/net, chronicles, chronicles/topics_registry,
chronos, metrics, metrics/chronos_httpserver, stew/byteutils,
../../keys, ../../net/nat,
"."/[enr, node, protocol]
"."/[enr, node],
"."/protocol as discv5_protocol
type
DiscoveryCmd* = enum
noCommand
ping
findnode
talkreq
findNode
talkReq
DiscoveryConf* = object
logLevel* {.
@ -75,7 +76,7 @@ type
argument
desc: "ENR URI of the node to a send ping message"
name: "node" .}: Node
of findnode:
of findNode:
distance* {.
defaultValue: 255
desc: "Distance parameter for the findNode message"
@ -86,10 +87,10 @@ type
argument
desc: "ENR URI of the node to send a findNode message"
name: "node" .}: Node
of talkreq:
talkreqTarget* {.
of talkReq:
talkReqTarget* {.
argument
desc: "ENR URI of the node to send a talkreq message"
desc: "ENR URI of the node to send a talkReq message"
name: "node" .}: Node
func defaultListenAddress*(conf: DiscoveryConf): ValidIpAddress =
@ -131,7 +132,7 @@ proc parseCmdArg*(T: type PrivateKey, p: TaintedString): T =
proc completeCmdArg*(T: type PrivateKey, val: TaintedString): seq[string] =
return @[]
proc discover(d: protocol.Protocol) {.async.} =
proc discover(d: discv5_protocol.Protocol) {.async.} =
while true:
let discovered = await d.queryRandom()
info "Lookup finished", nodes = discovered.len
@ -171,7 +172,7 @@ proc run(config: DiscoveryConf) =
echo pong[]
else:
echo "No Pong message returned"
of findnode:
of findNode:
let nodes = waitFor d.findNode(config.findNodeTarget, @[config.distance])
if nodes.isOk():
echo "Received valid records:"
@ -179,8 +180,8 @@ proc run(config: DiscoveryConf) =
echo $node.record & " - " & shortLog(node)
else:
echo "No Nodes message returned"
of talkreq:
let talkresp = waitFor d.talkreq(config.talkreqTarget, @[], @[])
of talkReq:
let talkresp = waitFor d.talkReq(config.talkReqTarget, @[], @[])
if talkresp.isOk():
echo talkresp[]
else:

View File

@ -31,11 +31,12 @@ logScope:
topics = "discv5"
const
version: uint16 = 1
discv5_protocol_version {.intdefine.} : uint16 = 1
discv5_protocol_id {.strdefine.} = "discv5"
version = discv5_protocol_version
protocolId = toBytes(discv5_protocol_id)
idSignatureText = "discovery v5 identity proof"
keyAgreementPrefix = "discovery v5 key agreement"
protocolIdStr = "discv5"
protocolId = toBytes(protocolIdStr)
gcmNonceSize* = 12
idNonceSize* = 16
gcmTagSize* = 16
@ -394,11 +395,11 @@ proc decodeMessage*(body: openArray[byte]): DecodeResult[Message] =
of unused: return err("Invalid message type")
of ping: rlp.decode(message.ping)
of pong: rlp.decode(message.pong)
of findnode: rlp.decode(message.findNode)
of findNode: rlp.decode(message.findNode)
of nodes: rlp.decode(message.nodes)
of talkreq: rlp.decode(message.talkreq)
of talkresp: rlp.decode(message.talkresp)
of regtopic, ticket, regconfirmation, topicquery:
of talkReq: rlp.decode(message.talkReq)
of talkResp: rlp.decode(message.talkResp)
of regTopic, ticket, regConfirmation, topicQuery:
# We just pass the empty type of this message without attempting to
# decode, so that the protocol knows what was received.
# But we ignore the message as per specification as "the content and

View File

@ -26,14 +26,14 @@ type
ping = 0x01
pong = 0x02
findnode = 0x03
findNode = 0x03
nodes = 0x04
talkreq = 0x05
talkresp = 0x06
regtopic = 0x07
talkReq = 0x05
talkResp = 0x06
regTopic = 0x07
ticket = 0x08
regconfirmation = 0x09
topicquery = 0x0A
regConfirmation = 0x09
topicQuery = 0x0A
RequestId* = object
id*: seq[byte]
@ -76,32 +76,32 @@ type
ping*: PingMessage
of pong:
pong*: PongMessage
of findnode:
findnode*: FindNodeMessage
of findNode:
findNode*: FindNodeMessage
of nodes:
nodes*: NodesMessage
of talkreq:
talkreq*: TalkReqMessage
of talkresp:
talkresp*: TalkRespMessage
of regtopic:
of talkReq:
talkReq*: TalkReqMessage
of talkResp:
talkResp*: TalkRespMessage
of regTopic:
regtopic*: RegTopicMessage
of ticket:
ticket*: TicketMessage
of regconfirmation:
regconfirmation*: RegConfirmationMessage
of topicquery:
topicquery*: TopicQueryMessage
of regConfirmation:
regConfirmation*: RegConfirmationMessage
of topicQuery:
topicQuery*: TopicQueryMessage
else:
discard
template messageKind*(T: typedesc[SomeMessage]): MessageKind =
when T is PingMessage: ping
elif T is PongMessage: pong
elif T is FindNodeMessage: findnode
elif T is FindNodeMessage: findNode
elif T is NodesMessage: nodes
elif T is TalkReqMessage: talkreq
elif T is TalkRespMessage: talkresp
elif T is TalkReqMessage: talkReq
elif T is TalkRespMessage: talkResp
proc read*(rlp: var Rlp, T: type RequestId): T
{.raises: [ValueError, RlpError, Defect].} =
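Because Nim identifiers are style-insensitive after the first character, the rename is purely cosmetic at existing call sites; a small sketch (import path assumed):

```nim
import eth/p2p/discoveryv5/messages

doAssert messageKind(FindNodeMessage) == findNode
doAssert messageKind(TalkReqMessage) == talkReq
doAssert findnode == findNode  # old spelling resolves to the same identifier
```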

View File

@ -116,6 +116,10 @@ const
## call
type
DiscoveryConfig* = object
tableIpLimits*: TableIpLimits
bitsPerHop*: int
Protocol* = ref object
transp: DatagramTransport
localNode*: Node
@ -148,6 +152,11 @@ type
DiscResult*[T] = Result[T, cstring]
const
defaultDiscoveryConfig* = DiscoveryConfig(
tableIpLimits: DefaultTableIpLimits,
bitsPerHop: DefaultBitsPerHop)
proc addNode*(d: Protocol, node: Node): bool =
## Add `Node` to discovery routing table.
##
@ -224,7 +233,7 @@ proc updateRecord*(
# TODO: Would it make sense to actively ping ("broadcast") to all the peers
# we stored a handshake with in order to get that ENR updated?
proc send(d: Protocol, a: Address, data: seq[byte]) =
proc send*(d: Protocol, a: Address, data: seq[byte]) =
let ta = initTAddress(a.ip, a.port)
let f = d.transp.sendTo(ta, data)
f.callback = proc(data: pointer) {.gcsafe.} =
@ -332,13 +341,13 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
of ping:
discovery_message_requests_incoming.inc()
d.handlePing(srcId, fromAddr, message.ping, message.reqId)
of findnode:
of findNode:
discovery_message_requests_incoming.inc()
d.handleFindNode(srcId, fromAddr, message.findnode, message.reqId)
of talkreq:
d.handleFindNode(srcId, fromAddr, message.findNode, message.reqId)
of talkReq:
discovery_message_requests_incoming.inc()
d.handleTalkReq(srcId, fromAddr, message.talkreq, message.reqId)
of regtopic, topicquery:
d.handleTalkReq(srcId, fromAddr, message.talkReq, message.reqId)
of regTopic, topicQuery:
discovery_message_requests_incoming.inc()
discovery_message_requests_incoming.inc(labelValues = ["no_response"])
trace "Received unimplemented message kind", kind = message.kind,
@ -565,7 +574,7 @@ proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]):
d.replaceNode(toNode)
return err(nodes.error)
proc talkreq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
Future[DiscResult[seq[byte]]] {.async.} =
## Send a discovery talkreq message.
##
@ -575,9 +584,9 @@ proc talkreq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
let resp = await d.waitMessage(toNode, reqId)
if resp.isSome():
if resp.get().kind == talkresp:
if resp.get().kind == talkResp:
d.routingTable.setJustSeen(toNode)
return ok(resp.get().talkresp.response)
return ok(resp.get().talkResp.response)
else:
d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
@ -603,7 +612,7 @@ proc lookupWorker(d: Protocol, destNode: Node, target: NodeId):
Future[seq[Node]] {.async.} =
let dists = lookupDistances(target, destNode.id)
# Instead of doing max `lookupRequestLimit` findNode requests, make use
# Instead of doing max `lookupRequestLimit` findNode requests, make use
# of the discv5.1 functionality to request nodes for multiple distances.
let r = await d.findNode(destNode, dists)
if r.isOk:
@ -880,18 +889,32 @@ proc ipMajorityLoop(d: Protocol) {.async.} =
except CancelledError:
trace "ipMajorityLoop canceled"
proc newProtocol*(privKey: PrivateKey,
enrIp: Option[ValidIpAddress],
enrTcpPort, enrUdpPort: Option[Port],
localEnrFields: openArray[(string, seq[byte])] = [],
bootstrapRecords: openArray[Record] = [],
previousRecord = none[enr.Record](),
bindPort: Port,
bindIp = IPv4_any(),
enrAutoUpdate = false,
tableIpLimits = DefaultTableIpLimits,
rng = newRng()):
Protocol =
func init*(
T: type DiscoveryConfig,
tableIpLimit: uint,
bucketIpLimit: uint,
bitsPerHop: int): T =
DiscoveryConfig(
tableIpLimits: TableIpLimits(
tableIpLimit: tableIpLimit,
bucketIpLimit: bucketIpLimit),
bitsPerHop: bitsPerHop
)
proc newProtocol*(
privKey: PrivateKey,
enrIp: Option[ValidIpAddress],
enrTcpPort, enrUdpPort: Option[Port],
localEnrFields: openArray[(string, seq[byte])] = [],
bootstrapRecords: openArray[Record] = [],
previousRecord = none[enr.Record](),
bindPort: Port,
bindIp = IPv4_any(),
enrAutoUpdate = false,
config = defaultDiscoveryConfig,
rng = newRng()):
Protocol =
# TODO: Tried adding bindPort = udpPort as parameter but that gave
# "Error: internal error: environment misses: udpPort" in nim-beacon-chain.
# Anyhow, nim-beacon-chain would also require some changes to support port
@ -910,6 +933,11 @@ proc newProtocol*(privKey: PrivateKey,
record = enr.Record.init(1, privKey, enrIp, enrTcpPort, enrUdpPort,
extraFields).expect("Record within size limits")
debug "Initializing discovery v5",
enrIp, enrTcpPort, enrUdpPort, enrAutoUpdate,
bootstrapEnrs = bootstrapRecords, localEnrFields,
bindPort, bindIp
info "ENR initialized", ip = enrIp, tcp = enrTcpPort, udp = enrUdpPort,
seqNum = record.seqNum, uri = toURI(record)
if enrIp.isNone():
@ -933,7 +961,8 @@ proc newProtocol*(privKey: PrivateKey,
bootstrapRecords: @bootstrapRecords,
ipVote: IpVote.init(),
enrAutoUpdate: enrAutoUpdate,
routingTable: RoutingTable.init(node, DefaultBitsPerHop, tableIpLimits, rng),
routingTable: RoutingTable.init(
node, config.bitsPerHop, config.tableIpLimits, rng),
rng: rng)
template listeningAddress*(p: Protocol): Address =
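A hedged sketch of the new construction path (key, address, and limit values are placeholders; the equivalent update to the discv5 test helper appears further down):

```nim
import std/options
import eth/keys, eth/p2p/discoveryv5/protocol
import stew/shims/net

let
  rng = newRng()
  privKey = PrivateKey.random(rng[])
  ip = ValidIpAddress.init("127.0.0.1")
  # tableIpLimit = 1000, bucketIpLimit = 24, bitsPerHop = 5
  config = DiscoveryConfig.init(1000, 24, 5)
  d = newProtocol(
    privKey,
    some(ip), some(Port(20302)), some(Port(20302)),
    bindPort = Port(20302),
    config = config,  # replaces the removed `tableIpLimits` parameter
    rng = rng)
```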

View File

@ -19,7 +19,7 @@ const targetDelay = milliseconds(100)
# Typically it's less. TCP increases one MSS per RTT, which is 1500
const maxCwndIncreaseBytesPerRtt = 3000
const minWindowSize = 10
const minWindowSize* = 10
proc applyCongestionControl*(
currentMaxWindowSize: uint32,

View File

@ -1,102 +0,0 @@
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
chronos
# Internal uTP data structure to track the send window and properly block when there is
# no free space left while trying to send more bytes
type SendBufferTracker* = ref object
# number of payload bytes in-flight (i.e not counting header sizes)
# packets that have not yet been sent do not count, packets
# that are marked as needing to be re-sent (due to a timeout)
# don't count either
currentWindow*: uint32
# remote receive window updated based on the packet wndSize field
maxRemoteWindow*: uint32
# maximum window size, in bytes, calculated by local congestion controller
maxWindow*: uint32
# configuration option for maximum number of bytes in snd buffer
maxSndBufferSize*: uint32
waiters: seq[(uint32, Future[void])]
proc new*(
T: type SendBufferTracker,
currentWindow: uint32,
maxRemoteWindow: uint32,
maxSndBufferSize: uint32,
maxWindow: uint32): T =
return (
SendBufferTracker(
currentWindow: currentWindow,
maxRemoteWindow: maxRemoteWindow,
maxSndBufferSize: maxSndBufferSize,
maxWindow: maxWindow,
waiters: @[]
)
)
proc currentFreeBytes*(t: SendBufferTracker): uint32 =
let maxSend = min(min(t.maxRemoteWindow, t.maxSndBufferSize), t.maxWindow)
if (maxSend <= t.currentWindow):
return 0
else:
return maxSend - t.currentWindow
proc checkWaiters(t: SendBufferTracker) =
var i = 0
while i < len(t.waiters):
let freeSpace = t.currentFreeBytes()
let (required, fut) = t.waiters[i]
if (required <= freeSpace):
# in case future was cancelled
if (not fut.finished()):
t.currentWindow = t.currentWindow + required
fut.complete()
t.waiters.del(i)
else:
# we do not have room for the next waiter, just finish processing
return
proc updateMaxRemote*(t: SendBufferTracker, newRemoteWindow: uint32) =
t.maxRemoteWindow = newRemoteWindow
t.checkWaiters()
proc updateMaxWindowSize*(t: SendBufferTracker, newRemoteWindow: uint32, maxWindow: uint32) =
t.maxRemoteWindow = newRemoteWindow
t.maxWindow = maxWindow
t.checkWaiters()
proc decreaseCurrentWindow*(t: SendBufferTracker, value: uint32, notifyWaiters: bool) =
doAssert(t.currentWindow >= value)
t.currentWindow = t.currentWindow - value
if (notifyWaiters):
t.checkWaiters()
proc reserveNBytesWait*(t: SendBufferTracker, n: uint32): Future[void] =
let fut = newFuture[void]("SendBufferTracker.reserveNBytesWait")
let free = t.currentFreeBytes()
if (n <= free):
t.currentWindow = t.currentWindow + n
fut.complete()
else:
t.waiters.add((n, fut))
fut
proc reserveNBytes*(t: SendBufferTracker, n: uint32): bool =
let free = t.currentFreeBytes()
if (n <= free):
t.currentWindow = t.currentWindow + n
return true
else:
return false
proc currentBytesInFlight*(t: SendBufferTracker): uint32 = t.currentWindow

View File

@ -7,9 +7,9 @@
{.push raises: [Defect].}
import
std/[hashes],
std/[hashes, sugar],
chronos, chronicles,
../p2p/discoveryv5/protocol,
../p2p/discoveryv5/[protocol, messages, encoding],
./utp_router,
../keys
@ -18,64 +18,82 @@ export utp_router, protocol, chronicles
logScope:
topics = "utp_discv5_protocol"
type UtpDiscv5Protocol* = ref object of TalkProtocol
prot: protocol.Protocol
router: UtpRouter[Node]
type
NodeAddress* = object
nodeId*: NodeId
address*: Address
proc hash(x: UtpSocketKey[Node]): Hash =
UtpDiscv5Protocol* = ref object of TalkProtocol
prot: protocol.Protocol
router: UtpRouter[NodeAddress]
proc init*(T: type NodeAddress, nodeId: NodeId, address: Address): NodeAddress =
NodeAddress(nodeId: nodeId, address: address)
proc init*(T: type NodeAddress, node: Node): Option[NodeAddress] =
node.address.map((address: Address) => NodeAddress(nodeId: node.id, address: address))
proc hash(x: NodeAddress): Hash =
var h = 0
h = h !& x.nodeId.hash
h = h !& x.address.hash
!$h
proc hash(x: UtpSocketKey[NodeAddress]): Hash =
var h = 0
h = h !& x.remoteAddress.hash
h = h !& x.rcvId.hash
!$h
func `$`*(x: UtpSocketKey[Node]): string =
"(remoteId: " & $x.remoteAddress.id &
func `$`*(x: UtpSocketKey[NodeAddress]): string =
"(remoteId: " & $x.remoteAddress.nodeId &
", remoteAddress: " & $x.remoteAddress.address &
", rcvId: "& $x.rcvId &
")"
proc talkReqDirect(p: protocol.Protocol, n: NodeAddress, protocol, request: seq[byte]): Future[void] =
let
reqId = RequestId.init(p.rng[])
message = encodeMessage(TalkReqMessage(protocol: protocol, request: request), reqId)
(data, nonce) = encodeMessagePacket(p.rng[], p.codec, n.nodeId, n.address, message)
trace "Send message packet", dstId = n.nodeId, address = n.address, kind = MessageKind.talkreq
p.send(n.address, data)
proc initSendCallback(
t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[Node] =
t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[NodeAddress] =
return (
proc (to: Node, data: seq[byte]): Future[void] =
proc (to: NodeAddress, data: seq[byte]): Future[void] =
let fut = newFuture[void]()
# TODO: In discovery v5 each talkreq waits for a talkresp, but here we
# would really like the fire and forget semantics (similar to udp).
# For now start talkreq/talkresp in background, and discard its result.
# That way we also lose information about any possible errors.
# Consider adding talkreq proc which does not wait for the response.
discard t.talkreq(to, subProtocolName, data)
# the hidden assumption here is that nodes already have an established discv5 session
# with each other. In our use case this should be true, as opening a stream
# is only done after a successful OFFER/ACCEPT or FINDCONTENT/CONTENT exchange,
# which forces nodes to establish a session with each other.
discard t.talkReqDirect(to, subProtocolName, data)
fut.complete()
return fut
)
proc messageHandler(protocol: TalkProtocol, request: seq[byte],
srcId: NodeId, srcUdpAddress: Address): seq[byte] =
let p = UtpDiscv5Protocol(protocol)
let maybeSender = p.prot.getNode(srcId)
if maybeSender.isSome():
debug "Received utp payload from known node. Start processing"
let sender = maybeSender.unsafeGet()
# processIncomingBytes may respond to remote by using talkreq requests
asyncSpawn p.router.processIncomingBytes(request, sender)
# We always send empty responses as discv5 spec requires that talkreq
# always receives a talkresp.
@[]
else:
debug "Received utp payload from unknown node. Ignore"
@[]
let
p = UtpDiscv5Protocol(protocol)
nodeAddress = NodeAddress.init(srcId, srcUdpAddress)
debug "Received utp payload from known node. Start processing",
nodeId = nodeAddress.nodeId, address = nodeAddress.address
asyncSpawn p.router.processIncomingBytes(request, nodeAddress)
proc new*(
T: type UtpDiscv5Protocol,
p: protocol.Protocol,
subProtocolName: seq[byte],
acceptConnectionCb: AcceptConnectionCallback[Node],
allowConnectionCb: AllowConnectionCallback[Node] = nil,
acceptConnectionCb: AcceptConnectionCallback[NodeAddress],
allowConnectionCb: AllowConnectionCallback[NodeAddress] = nil,
socketConfig: SocketConfig = SocketConfig.init()): UtpDiscv5Protocol =
doAssert(not(isNil(acceptConnectionCb)))
let router = UtpRouter[Node].new(
let router = UtpRouter[NodeAddress].new(
acceptConnectionCb,
allowConnectionCb,
socketConfig,
@ -94,12 +112,12 @@ proc new*(
)
prot
proc connectTo*(r: UtpDiscv5Protocol, address: Node):
Future[ConnectionResult[Node]] =
proc connectTo*(r: UtpDiscv5Protocol, address: NodeAddress):
Future[ConnectionResult[NodeAddress]] =
return r.router.connectTo(address)
proc connectTo*(r: UtpDiscv5Protocol, address: Node, connectionId: uint16):
Future[ConnectionResult[Node]] =
proc connectTo*(r: UtpDiscv5Protocol, address: NodeAddress, connectionId: uint16):
Future[ConnectionResult[NodeAddress]] =
return r.router.connectTo(address, connectionId)
proc shutdown*(r: UtpDiscv5Protocol) =
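A hedged connection sketch against the NodeAddress-based API (assumes a local `utp1` instance and a peer discv5 node `node2`, as in the updated tests further down):

```nim
# NodeAddress.init(node) returns none() when the node's ENR carries no address
let maybeDst = NodeAddress.init(node2.localNode)
doAssert maybeDst.isSome()
let socketRes = await utp1.connectTo(maybeDst.unsafeGet())
let clientSocket = socketRes.get()
```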

View File

@ -23,6 +23,8 @@ type
transport: DatagramTransport
utpRouter: UtpRouter[TransportAddress]
SendCallbackBuilder* = proc (d: DatagramTransport): SendCallback[TransportAddress] {.gcsafe, raises: [Defect].}
# This should probably be defined in the TransportAddress module, as the hash function
# should be consistent with the equality function
# in nim zero arrays always have hash equal to 0, irrespective of array size, to
@ -78,6 +80,7 @@ proc new*(
address: TransportAddress,
socketConfig: SocketConfig = SocketConfig.init(),
allowConnectionCb: AllowConnectionCallback[TransportAddress] = nil,
sendCallbackBuilder: SendCallbackBuilder = nil,
rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} =
doAssert(not(isNil(acceptConnectionCb)))
@ -90,7 +93,12 @@ proc new*(
)
let ta = newDatagramTransport(processDatagram, udata = router, local = address)
router.sendCb = initSendCallback(ta)
if (sendCallbackBuilder == nil):
router.sendCb = initSendCallback(ta)
else:
router.sendCb = sendCallbackBuilder(ta)
UtpProtocol(transport: ta, utpRouter: router)
proc shutdownWait*(p: UtpProtocol): Future[void] {.async.} =
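A hedged sketch of a `SendCallbackBuilder` that injects packet loss, in the spirit of the integration tests added below (`lossyBuilder` and its parameters are hypothetical; assumes chronos and the discv5 `rand` helper are imported):

```nim
proc lossyBuilder(rng: ref BrHmacDrbgContext, dropRate: int): SendCallbackBuilder =
  return (
    proc (d: DatagramTransport): SendCallback[TransportAddress] =
      return (
        proc (to: TransportAddress, data: seq[byte]): Future[void] {.async.} =
          # forward roughly (100 - dropRate)% of datagrams, silently drop the rest
          if rand(rng[], 99) >= dropRate:
            await d.sendTo(to, data)
      )
  )
```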

View File

@ -168,7 +168,7 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
# Initial ackNr is set to incoming packet seqNr
let incomingSocket = newIncomingSocket[A](sender, r.sendCb, r.socketConfig ,p.header.connectionId, p.header.seqNr, r.rng[])
r.registerUtpSocket(incomingSocket)
await incomingSocket.startIncomingSocket()
incomingSocket.startIncomingSocket()
# Based on configuration, socket is passed to upper layer either in SynRecv
# or Connected state
info "Accepting incoming connection",
@ -235,12 +235,6 @@ proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.}=
to = s.socketKey
s.destroy()
return err(OutgoingConnectionError(kind: ConnectionTimedOut))
except CatchableError as e:
info "Outgoing connection failed due to send error",
to = s.socketKey
s.destroy()
# this may only happen if the user-provided callback fails for some reason
return err(OutgoingConnectionError(kind: ErrorWhileSendingSyn, error: e))
# Connect to provided address
# Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732

File diff suppressed because it is too large.

View File

@ -49,3 +49,10 @@ suite "BlockHashOrNumber":
Json.roundtripTest BlockHashOrNumber(isHash: false, number: 1209231231),
"\"1209231231\""
test "EIP-4399 prevRandao field":
let hash = Hash256.fromHex "0x7a64245f7f95164f6176d90bd4903dbdd3e5433d555dd1385e81787f9672c588"
var blk: BlockHeader
blk.prevRandao = hash
let res = blk.prevRandao
check hash == res

View File

@ -225,3 +225,11 @@ procSuite "SqStoreRef":
check:
selectRes.isOk and selectRes.get == true
abc == val
var found = false
var row: selectStmt.Result
for rowRes in selectStmt.exec(row):
rowRes.expect("working db")
check abc == row
found = true
check found

View File

@ -9,26 +9,31 @@ export net
proc localAddress*(port: int): Address =
Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(port))
proc initDiscoveryNode*(rng: ref BrHmacDrbgContext, privKey: PrivateKey,
address: Address,
bootstrapRecords: openArray[Record] = [],
localEnrFields: openArray[(string, seq[byte])] = [],
previousRecord = none[enr.Record]()):
discv5_protocol.Protocol =
proc initDiscoveryNode*(
rng: ref BrHmacDrbgContext,
privKey: PrivateKey,
address: Address,
bootstrapRecords: openArray[Record] = [],
localEnrFields: openArray[(string, seq[byte])] = [],
previousRecord = none[enr.Record]()):
discv5_protocol.Protocol =
# set bucketIpLimit to allow bucket split
let tableIpLimits = TableIpLimits(tableIpLimit: 1000, bucketIpLimit: 24)
let config = DiscoveryConfig.init(1000, 24, 5)
result = newProtocol(privKey,
some(address.ip),
some(address.port), some(address.port),
bindPort = address.port,
bootstrapRecords = bootstrapRecords,
localEnrFields = localEnrFields,
previousRecord = previousRecord,
tableIpLimits = tableIpLimits,
rng = rng)
let protocol = newProtocol(
privKey,
some(address.ip),
some(address.port), some(address.port),
bindPort = address.port,
bootstrapRecords = bootstrapRecords,
localEnrFields = localEnrFields,
previousRecord = previousRecord,
config = config,
rng = rng)
result.open()
protocol.open()
protocol
proc nodeIdInNodes*(id: NodeId, nodes: openArray[Node]): bool =
for n in nodes:

View File

@ -688,7 +688,7 @@ suite "Discovery v5 Tests":
rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20303))
talkresp = await discv5_protocol.talkreq(node1, node2.localNode,
talkresp = await discv5_protocol.talkReq(node1, node2.localNode,
@[byte 0x01], @[])
check:
@ -713,7 +713,7 @@ suite "Discovery v5 Tests":
let echoProtocol = TalkProtocol(protocolHandler: handler)
check node2.registerTalkProtocol(talkProtocol, echoProtocol).isOk()
let talkresp = await discv5_protocol.talkreq(node1, node2.localNode,
let talkresp = await discv5_protocol.talkReq(node1, node2.localNode,
talkProtocol, "hello".toBytes())
check:

View File

@ -65,8 +65,8 @@ suite "Discovery v5.1 Protocol Message Encodings":
let message = decoded.get()
check:
message.reqId == reqId
message.kind == findnode
message.findnode.distances == distances
message.kind == findNode
message.findNode.distances == distances
test "Nodes Response (empty)":
let
@ -125,9 +125,9 @@ suite "Discovery v5.1 Protocol Message Encodings":
let message = decoded.get()
check:
message.reqId == reqId
message.kind == talkreq
message.talkreq.protocol == "echo".toBytes()
message.talkreq.request == "hi".toBytes()
message.kind == talkReq
message.talkReq.protocol == "echo".toBytes()
message.talkReq.request == "hi".toBytes()
test "Talk Response":
let
@ -143,8 +143,8 @@ suite "Discovery v5.1 Protocol Message Encodings":
let message = decoded.get()
check:
message.reqId == reqId
message.kind == talkresp
message.talkresp.response == "hi".toBytes()
message.kind == talkResp
message.talkResp.response == "hi".toBytes()
test "Ping with too large RequestId":
let

View File

@ -12,7 +12,7 @@
import
std/[net, options],
unittest2,
../../eth/p2p/enode
../../eth/p2p/[enode, bootnodes]
suite "ENode":
test "Go-Ethereum tests":
@ -92,3 +92,20 @@ suite "ENode":
check (results[index].isSome and res.error == results[index].get) or
res.isOk
test "Bootnodes test":
proc runBNTest(bns: openArray[string]): bool =
for z in bns:
let res = ENode.fromString(z)
if res.isErr: return false
true
check runBNTest(MainnetBootnodes)
check runBNTest(RopstenBootnodes)
check runBNTest(RinkebyBootnodes)
check runBNTest(GoerliBootnodes)
check runBNTest(DiscoveryV5Bootnodes)
check runBNTest(KovanBootnodes)
check runBNTest(StatusBootNodes)
check runBNTest(StatusBootNodesStaging)
check runBNTest(StatusBootNodesTest)

View File

@ -15,4 +15,5 @@ import
./test_utp_socket,
./test_utp_socket_sack,
./test_utp_router,
./test_clock_drift_calculator
./test_clock_drift_calculator,
./test_protocol_integration

View File

@ -7,37 +7,15 @@
{.used.}
import
std/options,
chronos, bearssl,
stew/shims/net, stew/byteutils,
testutils/unittests,
../../eth/p2p/discoveryv5/[enr, node, routing_table],
../../eth/p2p/discoveryv5/protocol as discv5_protocol,
../../eth/utp/utp_discv5_protocol,
../../eth/keys
proc localAddress*(port: int): Address =
Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(port))
proc initDiscoveryNode*(rng: ref BrHmacDrbgContext,
privKey: PrivateKey,
address: Address,
bootstrapRecords: openArray[Record] = [],
localEnrFields: openArray[(string, seq[byte])] = [],
previousRecord = none[enr.Record]()): discv5_protocol.Protocol =
# set bucketIpLimit to allow bucket split
let tableIpLimits = TableIpLimits(tableIpLimit: 1000, bucketIpLimit: 24)
result = newProtocol(privKey,
some(address.ip),
some(address.port), some(address.port),
bindPort = address.port,
bootstrapRecords = bootstrapRecords,
localEnrFields = localEnrFields,
previousRecord = previousRecord,
tableIpLimits = tableIpLimits,
rng = rng)
result.open()
../../eth/keys,
../p2p/discv5_test_helper
proc generateByteArray(rng: var BrHmacDrbgContext, length: int): seq[byte] =
var bytes = newSeq[byte](length)
@ -48,15 +26,15 @@ procSuite "Utp protocol over discovery v5 tests":
let rng = newRng()
let utpProtId = "test-utp".toBytes()
proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[Node] =
proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[NodeAddress] =
return (
proc(server: UtpRouter[Node], client: UtpSocket[Node]): Future[void] =
proc(server: UtpRouter[NodeAddress], client: UtpSocket[NodeAddress]): Future[void] =
serverSockets.addLast(client)
)
proc allowOneIdCallback(allowedId: uint16): AllowConnectionCallback[Node] =
proc allowOneIdCallback(allowedId: uint16): AllowConnectionCallback[NodeAddress] =
return (
proc(r: UtpRouter[Node], remoteAddress: Node, connectionId: uint16): bool =
proc(r: UtpRouter[NodeAddress], remoteAddress: NodeAddress, connectionId: uint16): bool =
connectionId == allowedId
)
@ -64,7 +42,7 @@ procSuite "Utp protocol over discovery v5 tests":
# from standard utp case
asyncTest "Success connect to remote host":
let
queue = newAsyncQueue[UtpSocket[Node]]()
queue = newAsyncQueue[UtpSocket[NodeAddress]]()
node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode(
@ -73,12 +51,11 @@ procSuite "Utp protocol over discovery v5 tests":
utp1 = UtpDiscv5Protocol.new(node1, utpProtId, registerIncomingSocketCallback(queue))
utp2 = UtpDiscv5Protocol.new(node2, utpProtId, registerIncomingSocketCallback(queue))
# nodes must know about each other
# nodes must have a session with each other
check:
node1.addNode(node2.localNode)
node2.addNode(node1.localNode)
(await node1.ping(node2.localNode)).isOk()
let clientSocketResult = await utp1.connectTo(node2.localNode)
let clientSocketResult = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet())
let clientSocket = clientSocketResult.get()
let serverSocket = await queue.get()
@ -95,7 +72,7 @@ procSuite "Utp protocol over discovery v5 tests":
asyncTest "Success write data over packet size to remote host":
let
queue = newAsyncQueue[UtpSocket[Node]]()
queue = newAsyncQueue[UtpSocket[NodeAddress]]()
node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode(
@ -104,13 +81,12 @@ procSuite "Utp protocol over discovery v5 tests":
utp1 = UtpDiscv5Protocol.new(node1, utpProtId, registerIncomingSocketCallback(queue))
utp2 = UtpDiscv5Protocol.new(node2, utpProtId, registerIncomingSocketCallback(queue))
# nodes must know about each other
# nodes must have a session with each other
check:
node1.addNode(node2.localNode)
node2.addNode(node1.localNode)
(await node1.ping(node2.localNode)).isOk()
let numOfBytes = 5000
let clientSocketResult = await utp1.connectTo(node2.localNode)
let clientSocketResult = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet())
let clientSocket = clientSocketResult.get()
let serverSocket = await queue.get()
@ -135,7 +111,7 @@ procSuite "Utp protocol over discovery v5 tests":
let
allowedId: uint16 = 10
lowSynTimeout = milliseconds(500)
queue = newAsyncQueue[UtpSocket[Node]]()
queue = newAsyncQueue[UtpSocket[NodeAddress]]()
node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode(
@ -154,13 +130,12 @@ procSuite "Utp protocol over discovery v5 tests":
allowOneIdCallback(allowedId),
SocketConfig.init())
# nodes must know about each other
# nodes must have a session with each other
check:
node1.addNode(node2.localNode)
node2.addNode(node1.localNode)
(await node1.ping(node2.localNode)).isOk()
let clientSocketResult1 = await utp1.connectTo(node2.localNode, allowedId)
let clientSocketResult2 = await utp1.connectTo(node2.localNode, allowedId + 1)
let clientSocketResult1 = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet(), allowedId)
let clientSocketResult2 = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet(), allowedId + 1)
check:
clientSocketResult1.isOk()
@ -180,7 +155,7 @@ procSuite "Utp protocol over discovery v5 tests":
asyncTest "Configure incoming connections to be in connected state":
let
queue = newAsyncQueue[UtpSocket[Node]]()
queue = newAsyncQueue[UtpSocket[NodeAddress]]()
node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode(
@ -194,12 +169,11 @@ procSuite "Utp protocol over discovery v5 tests":
socketConfig = SocketConfig.init(incomingSocketReceiveTimeout = none[Duration]())
)
# nodes must know about each other
# nodes must have a session with each other
check:
node1.addNode(node2.localNode)
node2.addNode(node1.localNode)
(await node1.ping(node2.localNode)).isOk()
let clientSocketResult = await utp1.connectTo(node2.localNode)
let clientSocketResult = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet())
let clientSocket = clientSocketResult.get()
let serverSocket = await queue.get()

View File

@ -0,0 +1,230 @@
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import
std/[sequtils, tables, options, sugar],
chronos, bearssl,
testutils/unittests,
./test_utils,
../../eth/utp/utp_router,
../../eth/utp/utp_protocol,
../../eth/keys,
../../eth/p2p/discoveryv5/random2
proc connectTillSuccess(p: UtpProtocol, to: TransportAddress, maxTries: int = 20): Future[UtpSocket[TransportAddress]] {.async.} =
var i = 0
while true:
let res = await p.connectTo(to)
if res.isOk():
return res.unsafeGet()
else:
inc i
if i >= maxTries:
raise newException(CatchableError, "Connection failed")
proc buildAcceptConnection(
t: ref Table[UtpSocketKey[TransportAddress], UtpSocket[TransportAddress]]
): AcceptConnectionCallback[TransportAddress] =
return (
proc (server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] =
let fut = newFuture[void]()
let key = client.socketKey
t[key] = client
fut.complete()
return fut
)
proc getServerSocket(
t: ref Table[UtpSocketKey[TransportAddress], UtpSocket[TransportAddress]],
clientAddress: TransportAddress,
clientConnectionId: uint16): Option[UtpSocket[TransportAddress]] =
let serverSocketKey = UtpSocketKey[TransportAddress](remoteAddress: clientAddress, rcvId: clientConnectionId + 1)
let srvSocket = t.getOrDefault(serverSocketKey)
if srvSocket == nil:
return none[UtpSocket[TransportAddress]]()
else:
return some(srvSocket)
procSuite "Utp protocol over udp tests with loss and delays":
let rng = newRng()
proc sendBuilder(maxDelay: int, packetDropRate: int): SendCallbackBuilder =
return (
proc (d: DatagramTransport): SendCallback[TransportAddress] =
return (
proc (to: TransportAddress, data: seq[byte]): Future[void] {.async.} =
let i = rand(rng[], 99)
if i >= packetDropRate:
let delay = milliseconds(rand(rng[], maxDelay))
await sleepAsync(delay)
await d.sendTo(to, data)
)
)
proc testScenario(maxDelay: int, dropRate: int, cfg: SocketConfig = SocketConfig.init()):
Future[(
UtpProtocol,
UtpSocket[TransportAddress],
UtpProtocol,
UtpSocket[TransportAddress])
] {.async.} =
var connections1 = newTable[UtpSocketKey[TransportAddress], UtpSocket[TransportAddress]]()
let address1 = initTAddress("127.0.0.1", 9080)
let utpProt1 =
UtpProtocol.new(
buildAcceptConnection(connections1),
address1,
socketConfig = cfg,
sendCallbackBuilder = sendBuilder(maxDelay, dropRate),
rng = rng)
var connections2 = newTable[UtpSocketKey[TransportAddress], UtpSocket[TransportAddress]]()
let address2 = initTAddress("127.0.0.1", 9081)
let utpProt2 =
UtpProtocol.new(
buildAcceptConnection(connections2),
address2,
socketConfig = cfg,
sendCallbackBuilder = sendBuilder(maxDelay, dropRate),
rng = rng)
let clientSocket = await utpProt1.connectTillSuccess(address2)
let maybeServerSocket = connections2.getServerSocket(address1, clientSocket.socketKey.rcvId)
let serverSocket = maybeServerSocket.unsafeGet()
return (utpProt1, clientSocket, utpProt2, serverSocket)
type TestCase = object
# in milliseconds
maxDelay: int
dropRate: int
bytesToTransfer: int
bytesPerRead: int
cfg: SocketConfig
proc init(
T: type TestCase,
maxDelay: int,
dropRate: int,
bytesToTransfer: int,
cfg: SocketConfig = SocketConfig.init(),
bytesPerRead: int = 0): TestCase =
TestCase(maxDelay: maxDelay, dropRate: dropRate, bytesToTransfer: bytesToTransfer, cfg: cfg, bytesPerRead: bytesPerRead)
let testCases = @[
TestCase.init(45, 10, 40000),
TestCase.init(25, 15, 40000),
# super small recv buffer which will be constantly on the brink of being full
TestCase.init(15, 5, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5))),
TestCase.init(15, 10, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5)))
]
asyncTest "Write and Read large data in different network conditions":
for testCase in testCases:
let (
clientProtocol,
clientSocket,
serverProtocol,
serverSocket) = await testScenario(testCase.maxDelay, testCase.dropRate, testcase.cfg)
let smallBytes = 10
let smallBytesToTransfer = generateByteArray(rng[], smallBytes)
# first transfer and read to make the server socket connected
let write1 = await clientSocket.write(smallBytesToTransfer)
let read1 = await serverSocket.read(smallBytes)
check:
write1.isOk()
read1 == smallBytesToTransfer
let numBytes = testCase.bytesToTransfer
let bytesToTransfer = generateByteArray(rng[], numBytes)
discard clientSocket.write(bytesToTransfer)
discard serverSocket.write(bytesToTransfer)
let serverReadFut = serverSocket.read(numBytes)
let clientReadFut = clientSocket.read(numBytes)
yield serverReadFut
yield clientReadFut
let clientRead = clientReadFut.read()
let serverRead = serverReadFut.read()
check:
clientRead == bytesToTransfer
serverRead == bytesToTransfer
await clientProtocol.shutdownWait()
await serverProtocol.shutdownWait()
let testCases1 = @[
# small buffers so it will fill up between reads
TestCase.init(15, 5, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5)), 10000),
TestCase.init(15, 10, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5)), 10000),
TestCase.init(15, 15, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5)), 10000)
]
proc readWithMultipleReads(s: UtpSocket[TransportAddress], numOfReads: int, bytesPerRead: int): Future[seq[byte]] {.async.}=
var i = 0
var res: seq[byte] = @[]
while i < numOfReads:
let bytes = await s.read(bytesPerRead)
res.add(bytes)
inc i
return res
asyncTest "Write and Read large data in different network conditions split over several reads":
for testCase in testCases1:
let (
clientProtocol,
clientSocket,
serverProtocol,
serverSocket) = await testScenario(testCase.maxDelay, testCase.dropRate, testcase.cfg)
let smallBytes = 10
let smallBytesToTransfer = generateByteArray(rng[], smallBytes)
# first transfer and read to make the server socket connected
let write1 = await clientSocket.write(smallBytesToTransfer)
let read1 = await serverSocket.read(smallBytes)
check:
read1 == smallBytesToTransfer
let numBytes = testCase.bytesToTransfer
let bytesToTransfer = generateByteArray(rng[], numBytes)
discard clientSocket.write(bytesToTransfer)
discard serverSocket.write(bytesToTransfer)
let numOfReads = int(testCase.bytesToTransfer / testCase.bytesPerRead)
let serverReadFut = serverSocket.readWithMultipleReads(numOfReads, testCase.bytesPerRead)
let clientReadFut = clientSocket.readWithMultipleReads(numOfReads, testCase.bytesPerRead)
yield serverReadFut
yield clientReadFut
let clientRead = clientReadFut.read()
let serverRead = serverReadFut.read()
check:
clientRead == bytesToTransfer
serverRead == bytesToTransfer
await clientProtocol.shutdownWait()
await serverProtocol.shutdownWait()

View File

@ -44,7 +44,7 @@ template connectOutGoingSocket*(
)
await sock1.processPacket(responseAck)
await waitUntil(proc (): bool = sock1.isConnected())
check:
sock1.isConnected()
@ -72,12 +72,14 @@ template connectOutGoingSocketWithIncoming*(
rng[]
)
await incomingSocket.startIncomingSocket()
incomingSocket.startIncomingSocket()
let responseAck = await incomingQueue.get()
await outgoingSocket.processPacket(responseAck)
await waitUntil(proc (): bool = outgoingSocket.isConnected())
check:
outgoingSocket.isConnected()

View File

@ -188,6 +188,8 @@ procSuite "Utp router unit tests":
await router.processIncomingBytes(encodedData, testSender)
await waitUntil(proc (): bool = socket.numOfEventsInEventQueue() == 0)
check:
socket.isConnected()
@ -350,8 +352,8 @@ procSuite "Utp router unit tests":
check:
connectResult.isErr()
connectResult.error().kind == ErrorWhileSendingSyn
cast[TestError](connectResult.error().error) is TestError
# even though send is failing we will just finish with timeout,
connectResult.error().kind == ConnectionTimedOut
router.len() == 0
asyncTest "Router should clear closed outgoing connections":

View File

@ -295,7 +295,6 @@ procSuite "Utp socket unit test":
await outgoingSocket.destroyWait()
asyncTest "Ignoring totally out of order packet":
# TODO test is valid until implementing selective acks
let q = newAsyncQueue[Packet]()
let initalRemoteSeqNr = 10'u16
@ -305,11 +304,11 @@ procSuite "Utp socket unit test":
await outgoingSocket.processPacket(packets[1024])
check:
outgoingSocket.numPacketsInReordedBuffer() == 0
await outgoingSocket.processPacket(packets[1023])
# give some time to process those packets
await sleepAsync(milliseconds(500))
check:
outgoingSocket.numPacketsInReordedBuffer() == 1
@ -349,6 +348,8 @@ procSuite "Utp socket unit test":
await outgoingSocket.processPacket(responseAck)
await waitUntil(proc (): bool = outgoingSocket.numPacketsInOutGoingBuffer() == 0)
check:
outgoingSocket.numPacketsInOutGoingBuffer() == 0
@ -370,7 +371,7 @@ procSuite "Utp socket unit test":
)
await socket.processPacket(ack)
except CancelledError:
echo "foo"
discard
asyncTest "Hitting RTO timeout with packets in flight should not decay window":
let q = newAsyncQueue[Packet]()
@ -427,7 +428,7 @@ procSuite "Utp socket unit test":
let dataToWrite1 = @[0'u8]
let dataToWrite2 = @[1'u8]
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, 0)
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, cfg = SocketConfig.init(optSndBuffer = 0))
let writeFut1 = outgoingSocket.write(dataToWrite1)
let writeFut2 = outgoingSocket.write(dataToWrite2)
@ -531,6 +532,8 @@ procSuite "Utp socket unit test":
await outgoingSocket.processPacket(responseAck)
await waitUntil(proc (): bool = outgoingSocket.isConnected())
check:
outgoingSocket.isConnected()
@ -768,6 +771,8 @@ procSuite "Utp socket unit test":
await outgoingSocket.processPacket(responseAck)
await waitUntil(proc (): bool = not outgoingSocket.isConnected())
check:
not outgoingSocket.isConnected()
@ -1005,6 +1010,8 @@ procSuite "Utp socket unit test":
await outgoingSocket.processPacket(responseAck)
await waitUntil(proc (): bool = int(outgoingSocket.numOfBytesInFlight) == len(dataToWrite))
check:
# only first packet has been acked so there should still be 5 bytes left
int(outgoingSocket.numOfBytesInFlight) == len(dataToWrite)
@ -1052,18 +1059,18 @@ procSuite "Utp socket unit test":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let dataToWrite = @[1'u8, 2, 3, 4, 5]
let dataToWrite = generateByteArray(rng[], 1001)
# remote is initialized with a buffer too small to handle the whole payload
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, uint32(len(dataToWrite) - 1))
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, cfg = SocketConfig.init(optSndBuffer = 1000))
let writeFut = outgoingSocket.write(dataToWrite)
# wait some time to check future is not finished
await sleepAsync(seconds(2))
# write is not finished as the future is blocked from progressing due to a too-small
# send window
# write is not finished as the future is blocked from progressing due to the full
# send buffer
check:
not writeFut.finished()
@ -1071,20 +1078,18 @@ procSuite "Utp socket unit test":
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
uint32(len(dataToWrite)),
initialPacket.header.seqNr + 1,
testBufferSize,
0
)
await outgoingSocket.processPacket(someAckFromRemote)
# after processing packet with increased buffer size write should complete and
# packet should be sent
let sentPacket = await q.get()
# only after processing ack write will progress
let writeResult = await writeFut
check:
sentPacket.payload == dataToWrite
writeFut.finished()
writeResult.isOK()
await outgoingSocket.destroyWait()
@ -1092,30 +1097,21 @@ procSuite "Utp socket unit test":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let dataToWrite = @[1'u8, 2, 3, 4, 5]
let dataToWirte = 1160
# remote is initialized with a buffer too small to handle the whole payload
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
let remoteRcvWindowSize = uint32(outgoingSocket.getPacketSize())
let someAckFromRemote =
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
remoteRcvWindowSize,
0
)
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, cfg = SocketConfig.init(optSndBuffer = 1160))
# we are using ack from remote to setup our snd window size to one packet size on one packet
await outgoingSocket.processPacket(someAckFromRemote)
let twoPacketData = generateByteArray(rng[], int(dataToWirte))
let twoPacketData = generateByteArray(rng[], int(2 * remoteRcvWindowSize))
let writeResult = await outgoingSocket.write(twoPacketData)
check:
writeResult.isOk()
# this write will not progress as snd buffer is full
let writeFut = outgoingSocket.write(twoPacketData)
# after this time the first packet will be sent and will time out, but the write should not
# finish, as timed-out packets do not notify the writer about new space in the snd
# buffer
# we wait for packets to timeout
await sleepAsync(seconds(2))
check:
@ -1162,15 +1158,22 @@ procSuite "Utp socket unit test":
check:
packet.header.pType == ST_DATA
uint32(len(packet.payload)) == remoteRcvWindowSize
not writeFut.finished
let packet1Fut = q.get()
await sleepAsync(milliseconds(500))
check:
not packet1Fut.finished()
await outgoingSocket.processPacket(firstAckFromRemote)
let packet1 = await q.get()
let writeResult = await writeFut
# packet is sent only after first packet is acked
let packet1 = await packet1Fut
check:
packet1.header.pType == ST_DATA
packet1.header.seqNr == packet.header.seqNr + 1
writeFut.finished
await outgoingSocket.destroyWait()
@ -1192,19 +1195,10 @@ procSuite "Utp socket unit test":
check:
outgoingSocket.isConnected()
let writeFut = outgoingSocket.write(someData)
await sleepAsync(seconds(1))
check:
# Even after 1 second write is not finished as we did not receive any message
# so remote rcv window is still zero
not writeFut.finished()
# Ultimately, after 3 seconds the remote rcv window will be reset to the minimal value
# and write will be able to progress
let writeResult = await writeFut
# write result will be successful as the send buffer has space
let writeResult = await outgoingSocket.write(someData)
# this will finish in seconds(3), as only after this time will the window be set to the min value
let p = await q.get()
check:
@ -1331,6 +1325,8 @@ procSuite "Utp socket unit test":
await outgoingSocket.processPacket(dataP1)
let fastResend = await q.get()
let ack = await q.get()
check:
@ -1345,3 +1341,69 @@ procSuite "Utp socket unit test":
thirdSend.header.ackNr > secondSend.header.ackNr
await outgoingSocket.destroyWait()
asyncTest "Should support fast timeout ":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
# small writes to make sure they end up as 3 different packets
let dataToWrite1 = @[1'u8]
let dataToWrite2 = @[1'u8]
let dataToWrite3 = @[1'u8]
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
let writeRes1 = await outgoingSocket.write(dataToWrite1)
let writeRes2 = await outgoingSocket.write(dataToWrite2)
let writeRes3 = await outgoingSocket.write(dataToWrite3)
check:
writeRes1.isOk()
writeRes2.isOk()
writeRes3.isOk()
# drain queue of all sent packets
let sent1 = await q.get()
let sent2 = await q.get()
let sent3 = await q.get()
# wait for first timeout. Socket will enter fast timeout mode
let reSent1 = await q.get()
check:
# check that re-sent packet is the oldest one
reSent1.payload == sent1.payload
reSent1.header.seqNr == sent1.header.seqNr
# ack which will ack our re-sent packet
let responseAck =
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
reSent1.header.seqNr,
testBufferSize,
0
)
await outgoingSocket.processPacket(responseAck)
let fastResentPacket = await q.get()
check:
# second packet is now the oldest unacked packet so it should be the one which
# is sent during fast resend
fastResentPacket.payload == sent2.payload
fastResentPacket.header.seqNr == sent2.header.seqNr
# duplicate ack, processing it should not fast-resend any packet
await outgoingSocket.processPacket(responseAck)
let resent3 = await q.get()
check:
# in next timeout cycle packet nr3 is the only one waiting for re-send
resent3.payload == sent3.payload
resent3.header.seqNr == sent3.header.seqNr
await outgoingSocket.destroyWait()

View File

@ -48,6 +48,8 @@ procSuite "Utp socket selective acks unit test":
for p in dataPackets:
await outgoingSocket.processPacket(p)
await waitUntil(proc (): bool = outgoingSocket.numOfEventsInEventQueue() == 0)
let extArray = outgoingSocket.generateSelectiveAckBitMask()
await outgoingSocket.destroyWait()
@ -133,6 +135,9 @@ procSuite "Utp socket selective acks unit test":
# indexes of packets which should be delivered to remote
packetsDelivered: seq[int]
# indexes of packets which should be re-sent in resend testcases
packetsResent: seq[int]
let selectiveAckTestCases = @[
TestCase(numOfPackets: 2, packetsDelivered: @[1]),
TestCase(numOfPackets: 10, packetsDelivered: @[1, 3, 5, 7, 9]),
@ -144,6 +149,33 @@ procSuite "Utp socket selective acks unit test":
TestCase(numOfPackets: 33, packetsDelivered: toSeq(1..32))
]
proc setupTestCase(
dataToWrite: seq[byte],
initialRemoteSeq: uint16,
outgoingQueue: AsyncQueue[Packet],
incomingQueue: AsyncQueue[Packet],
testCase: TestCase): Future[(UtpSocket[TransportAddress], UtpSocket[TransportAddress], seq[Packet])] {.async.} =
let (outgoingSocket, incomingSocket) =
connectOutGoingSocketWithIncoming(
initialRemoteSeq,
outgoingQueue,
incomingQueue
)
var packets: seq[Packet] = @[]
for _ in 0..<testCase.numOfPackets:
discard await outgoingSocket.write(dataToWrite)
let packet = await outgoingQueue.get()
packets.add(packet)
for toDeliver in testCase.packetsDelivered:
await incomingSocket.processPacket(packets[toDeliver])
await waitUntil(proc (): bool = incomingSocket.numOfEventsInEventQueue() == 0)
return (outgoingSocket, incomingSocket, packets)
asyncTest "Socket should calculate number of bytes acked by selective acks":
let dataSize = 10
let initialRemoteSeq = 10'u16
@ -152,23 +184,14 @@ procSuite "Utp socket selective acks unit test":
for testCase in selectiveAckTestCases:
let outgoingQueue = newAsyncQueue[Packet]()
let incomingQueue = newAsyncQueue[Packet]()
let (outgoingSocket, incomingSocket) =
connectOutGoingSocketWithIncoming(
initialRemoteSeq,
outgoingQueue,
incomingQueue
)
var packets: seq[Packet] = @[]
for _ in 0..<testCase.numOfPackets:
discard await outgoingSocket.write(smallData)
let packet = await outgoingQueue.get()
packets.add(packet)
for toDeliver in testCase.packetsDelivered:
await incomingSocket.processPacket(packets[toDeliver])
let (outgoingSocket, incomingSocket, _) = await setupTestCase(
smallData,
initialRemoteSeq,
outgoingQueue,
incomingQueue,
testCase
)
let finalAck = incomingSocket.generateAckPacket()
@ -201,22 +224,13 @@ procSuite "Utp socket selective acks unit test":
let outgoingQueue = newAsyncQueue[Packet]()
let incomingQueue = newAsyncQueue[Packet]()
let (outgoingSocket, incomingSocket) =
connectOutGoingSocketWithIncoming(
initialRemoteSeq,
outgoingQueue,
incomingQueue
)
var packets: seq[Packet] = @[]
for _ in 0..<testCase.numOfPackets:
discard await outgoingSocket.write(smallData)
let packet = await outgoingQueue.get()
packets.add(packet)
for toDeliver in testCase.packetsDelivered:
await incomingSocket.processPacket(packets[toDeliver])
let (outgoingSocket, incomingSocket, _) = await setupTestCase(
smallData,
initialRemoteSeq,
outgoingQueue,
incomingQueue,
testCase
)
let finalAck = incomingSocket.generateAckPacket()
@ -237,8 +251,78 @@ procSuite "Utp socket selective acks unit test":
await outgoingSocket.processPacket(finalAck)
let expectedPackets = testCase.numOfPackets - len(testCase.packetsDelivered)
await waitUntil(proc (): bool = outgoingSocket.numPacketsInOutGoingBuffer() == expectedPackets)
check:
outgoingSocket.numPacketsInOutGoingBuffer() == testCase.numOfPackets - len(testCase.packetsDelivered)
outgoingSocket.numPacketsInOutGoingBuffer() == expectedPackets
await outgoingSocket.destroyWait()
await incomingSocket.destroyWait()
let packetResendTestCases = @[
TestCase(numOfPackets: 4, packetsDelivered: @[2, 3], packetsResent: @[]),
TestCase(numOfPackets: 4, packetsDelivered: @[1, 2, 3], packetsResent: @[0]),
TestCase(numOfPackets: 5, packetsDelivered: @[2, 3, 4], packetsResent: @[0, 1]),
TestCase(numOfPackets: 6, packetsDelivered: @[3, 4, 5], packetsResent: @[0, 1, 2]),
TestCase(numOfPackets: 7, packetsDelivered: @[4, 5, 6], packetsResent: @[0, 1, 2, 3]),
TestCase(numOfPackets: 8, packetsDelivered: @[5, 6, 7], packetsResent: @[0, 1, 2, 3]),
TestCase(numOfPackets: 10, packetsDelivered: @[3, 7, 8], packetsResent: @[0, 1, 2]),
TestCase(numOfPackets: 10, packetsDelivered: @[1, 2, 3, 7, 8, 9], packetsResent: @[0, 4, 5, 6]),
TestCase(numOfPackets: 10, packetsDelivered: @[1, 8, 9], packetsResent: @[0])
]
asyncTest "Socket should re-send packets when there are at least 3 packets acked ahead":
let dataSize = 10
let initialRemoteSeq = 10'u16
let smallData = generateByteArray(rng[], 10)
for testCase in packetResendTestCases:
let outgoingQueue = newAsyncQueue[Packet]()
let incomingQueue = newAsyncQueue[Packet]()
let (outgoingSocket, incomingSocket, initialPackets) = await setupTestCase(
smallData,
initialRemoteSeq,
outgoingQueue,
incomingQueue,
testCase
)
let initialBufferSize = outgoingSocket.currentMaxWindowSize()
let finalAck = incomingSocket.generateAckPacket()
check:
finalAck.eack.isSome()
let mask = finalAck.eack.unsafeGet().acks
let numOfDeliveredPackets = len(testCase.packetsDelivered)
check:
numOfSetBits(mask) == numOfDeliveredPackets
await outgoingSocket.processPacket(finalAck)
await waitUntil(proc (): bool = outgoingSocket.numOfEventsInEventQueue() == 0)
for idx in testCase.packetsResent:
let resentPacket = await outgoingQueue.get()
check:
resentPacket.header.seqNr == initialPackets[idx].header.seqNr
let endBufferSize = outgoingSocket.currentMaxWindowSize()
if len(testCase.packetsResent) == 0:
check:
# when there is no packet loss (no resent packets), buffer size increases
# due to packets acked by selective ack
endBufferSize > initialBufferSize
else:
check:
# due to ledbat congestion control we cannot assert on the precise end buffer size,
# but due to packet loss we are sure it should be smaller than at the beginning
# because of the 0.5 multiplier
endBufferSize < initialBufferSize

View File

@ -2,7 +2,7 @@
# libtool - Provide generalized library-building support services.
# Generated automatically by config.status (libbacktrace) version-unused
# Libtool was configured on host fv-az190-646:
# Libtool was configured on host fv-az272-955:
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
#
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,

Some files were not shown because too many files have changed in this diff.