mirror of https://github.com/status-im/nim-eth.git
Minor adjustments in utp_discv5_protocol (#459)
- Move SocketConfig parameter location - Reuse rng from disc5 protocol - add exports - Some whitespace clean-up
This commit is contained in:
parent
0f18272315
commit
26ab9b078e
|
@ -13,7 +13,7 @@ import
|
||||||
./utp_router,
|
./utp_router,
|
||||||
../keys
|
../keys
|
||||||
|
|
||||||
export utp_router
|
export utp_router, protocol
|
||||||
|
|
||||||
type UtpDiscv5Protocol* = ref object of TalkProtocol
|
type UtpDiscv5Protocol* = ref object of TalkProtocol
|
||||||
prot: protocol.Protocol
|
prot: protocol.Protocol
|
||||||
|
@ -26,8 +26,8 @@ proc hash(x: UtpSocketKey[Node]): Hash =
|
||||||
!$h
|
!$h
|
||||||
|
|
||||||
func `$`*(x: UtpSocketKey[Node]): string =
|
func `$`*(x: UtpSocketKey[Node]): string =
|
||||||
"(remoteId: " & $x.remoteAddress.id &
|
"(remoteId: " & $x.remoteAddress.id &
|
||||||
", remoteAddress: " & $x.remoteAddress.address &
|
", remoteAddress: " & $x.remoteAddress.address &
|
||||||
", rcvId: "& $x.rcvId &
|
", rcvId: "& $x.rcvId &
|
||||||
")"
|
")"
|
||||||
|
|
||||||
|
@ -66,16 +66,15 @@ proc new*(
|
||||||
p: protocol.Protocol,
|
p: protocol.Protocol,
|
||||||
subProtocolName: seq[byte],
|
subProtocolName: seq[byte],
|
||||||
acceptConnectionCb: AcceptConnectionCallback[Node],
|
acceptConnectionCb: AcceptConnectionCallback[Node],
|
||||||
socketConfig: SocketConfig = SocketConfig.init(),
|
|
||||||
allowConnectionCb: AllowConnectionCallback[Node] = nil,
|
allowConnectionCb: AllowConnectionCallback[Node] = nil,
|
||||||
rng = newRng()): UtpDiscv5Protocol =
|
socketConfig: SocketConfig = SocketConfig.init()): UtpDiscv5Protocol =
|
||||||
doAssert(not(isNil(acceptConnectionCb)))
|
doAssert(not(isNil(acceptConnectionCb)))
|
||||||
|
|
||||||
let router = UtpRouter[Node].new(
|
let router = UtpRouter[Node].new(
|
||||||
acceptConnectionCb,
|
acceptConnectionCb,
|
||||||
allowConnectionCb,
|
allowConnectionCb,
|
||||||
socketConfig,
|
socketConfig,
|
||||||
rng
|
p.rng
|
||||||
)
|
)
|
||||||
router.sendCb = initSendCallback(p, subProtocolName)
|
router.sendCb = initSendCallback(p, subProtocolName)
|
||||||
|
|
||||||
|
|
|
@ -113,8 +113,8 @@ proc new*[A](
|
||||||
rng = newRng()): UtpRouter[A] =
|
rng = newRng()): UtpRouter[A] =
|
||||||
UtpRouter[A].new(acceptConnectionCb, nil, socketConfig, rng)
|
UtpRouter[A].new(acceptConnectionCb, nil, socketConfig, rng)
|
||||||
|
|
||||||
# There are different possiblites how connection was established, and we need to
|
# There are different possibilities on how the connection got established, need
|
||||||
# check every case
|
# to check every case.
|
||||||
proc getSocketOnReset[A](r: UtpRouter[A], sender: A, id: uint16): Option[UtpSocket[A]] =
|
proc getSocketOnReset[A](r: UtpRouter[A], sender: A, id: uint16): Option[UtpSocket[A]] =
|
||||||
# id is our recv id
|
# id is our recv id
|
||||||
let recvKey = UtpSocketKey[A].init(sender, id)
|
let recvKey = UtpSocketKey[A].init(sender, id)
|
||||||
|
|
|
@ -907,7 +907,7 @@ proc calculateSelectiveAckBytes*(socket: UtpSocket, receivedPackedAckNr: uint16
|
||||||
var ackedBytes = 0'u32
|
var ackedBytes = 0'u32
|
||||||
|
|
||||||
var bits = (len(ext.acks)) * 8 - 1
|
var bits = (len(ext.acks)) * 8 - 1
|
||||||
|
|
||||||
while bits >= 0:
|
while bits >= 0:
|
||||||
let v = base + uint16(bits)
|
let v = base + uint16(bits)
|
||||||
|
|
||||||
|
@ -920,12 +920,12 @@ proc calculateSelectiveAckBytes*(socket: UtpSocket, receivedPackedAckNr: uint16
|
||||||
if (maybePacket.isNone() or maybePacket.unsafeGet().transmissions == 0):
|
if (maybePacket.isNone() or maybePacket.unsafeGet().transmissions == 0):
|
||||||
dec bits
|
dec bits
|
||||||
continue
|
continue
|
||||||
|
|
||||||
let pkt = maybePacket.unsafeGet()
|
let pkt = maybePacket.unsafeGet()
|
||||||
|
|
||||||
if (getBit(ext.acks, bits)):
|
if (getBit(ext.acks, bits)):
|
||||||
ackedBytes = ackedBytes + pkt.payloadLength
|
ackedBytes = ackedBytes + pkt.payloadLength
|
||||||
|
|
||||||
dec bits
|
dec bits
|
||||||
|
|
||||||
return ackedBytes
|
return ackedBytes
|
||||||
|
@ -940,7 +940,7 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S
|
||||||
return
|
return
|
||||||
|
|
||||||
var bits = (len(ext.acks)) * 8 - 1
|
var bits = (len(ext.acks)) * 8 - 1
|
||||||
|
|
||||||
while bits >= 0:
|
while bits >= 0:
|
||||||
let v = base + uint16(bits)
|
let v = base + uint16(bits)
|
||||||
|
|
||||||
|
@ -953,12 +953,12 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S
|
||||||
if (maybePacket.isNone() or maybePacket.unsafeGet().transmissions == 0):
|
if (maybePacket.isNone() or maybePacket.unsafeGet().transmissions == 0):
|
||||||
dec bits
|
dec bits
|
||||||
continue
|
continue
|
||||||
|
|
||||||
let pkt = maybePacket.unsafeGet()
|
let pkt = maybePacket.unsafeGet()
|
||||||
|
|
||||||
if (getBit(ext.acks, bits)):
|
if (getBit(ext.acks, bits)):
|
||||||
discard socket.ackPacket(v, currentTime)
|
discard socket.ackPacket(v, currentTime)
|
||||||
|
|
||||||
dec bits
|
dec bits
|
||||||
|
|
||||||
# TODO Add handling of fast timeouts and duplicate acks counting
|
# TODO Add handling of fast timeouts and duplicate acks counting
|
||||||
|
@ -970,7 +970,7 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S
|
||||||
# The bitmask has reverse byte order. The first byte represents packets [ack_nr + 2, ack_nr + 2 + 7] in reverse order
|
# The bitmask has reverse byte order. The first byte represents packets [ack_nr + 2, ack_nr + 2 + 7] in reverse order
|
||||||
# The least significant bit in the byte represents ack_nr + 2, the most significant bit in the byte represents ack_nr + 2 + 7
|
# The least significant bit in the byte represents ack_nr + 2, the most significant bit in the byte represents ack_nr + 2 + 7
|
||||||
# The next byte in the mask represents [ack_nr + 2 + 8, ack_nr + 2 + 15] in reverse order, and so on
|
# The next byte in the mask represents [ack_nr + 2 + 8, ack_nr + 2 + 15] in reverse order, and so on
|
||||||
proc generateSelectiveAckBitMask*(socket: UtpSocket): array[4, byte] =
|
proc generateSelectiveAckBitMask*(socket: UtpSocket): array[4, byte] =
|
||||||
let window = min(32, socket.inBuffer.len())
|
let window = min(32, socket.inBuffer.len())
|
||||||
var arr: array[4, uint8] = [0'u8, 0, 0, 0]
|
var arr: array[4, uint8] = [0'u8, 0, 0, 0]
|
||||||
var i = 0
|
var i = 0
|
||||||
|
@ -981,7 +981,7 @@ proc generateSelectiveAckBitMask*(socket: UtpSocket): array[4, byte] =
|
||||||
return arr
|
return arr
|
||||||
|
|
||||||
# Generates ack packet based on current state of the socket.
|
# Generates ack packet based on current state of the socket.
|
||||||
proc generateAckPacket*(socket: UtpSocket): Packet =
|
proc generateAckPacket*(socket: UtpSocket): Packet =
|
||||||
let bitmask =
|
let bitmask =
|
||||||
if (socket.reorderCount != 0 and (not socket.reachedFin)):
|
if (socket.reorderCount != 0 and (not socket.reachedFin)):
|
||||||
some(socket.generateSelectiveAckBitMask())
|
some(socket.generateSelectiveAckBitMask())
|
||||||
|
@ -1017,16 +1017,16 @@ proc startIncomingSocket*(socket: UtpSocket) {.async.} =
|
||||||
# running
|
# running
|
||||||
proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
|
proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
|
||||||
|
|
||||||
debug "Process packet",
|
debug "Process packet",
|
||||||
socketKey = socket.socketKey,
|
socketKey = socket.socketKey,
|
||||||
packetType = p.header.pType,
|
packetType = p.header.pType,
|
||||||
seqNr = p.header.seqNr,
|
seqNr = p.header.seqNr,
|
||||||
ackNr = p.header.ackNr,
|
ackNr = p.header.ackNr,
|
||||||
timestamp = p.header.timestamp,
|
timestamp = p.header.timestamp,
|
||||||
timestampDiff = p.header.timestampDiff
|
timestampDiff = p.header.timestampDiff
|
||||||
|
|
||||||
let timestampInfo = getMonoTimestamp()
|
let timestampInfo = getMonoTimestamp()
|
||||||
|
|
||||||
if socket.isAckNrInvalid(p):
|
if socket.isAckNrInvalid(p):
|
||||||
debug "Received packet with invalid ack number",
|
debug "Received packet with invalid ack number",
|
||||||
ackNr = p.header.ackNr,
|
ackNr = p.header.ackNr,
|
||||||
|
@ -1178,7 +1178,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
|
||||||
debug "Received FIN packet",
|
debug "Received FIN packet",
|
||||||
eofPktNr = pkSeqNr,
|
eofPktNr = pkSeqNr,
|
||||||
curAckNr = socket.ackNr
|
curAckNr = socket.ackNr
|
||||||
|
|
||||||
socket.gotFin = true
|
socket.gotFin = true
|
||||||
socket.eofPktNr = pkSeqNr
|
socket.eofPktNr = pkSeqNr
|
||||||
|
|
||||||
|
|
|
@ -145,14 +145,14 @@ procSuite "Utp protocol over discovery v5 tests":
|
||||||
node1,
|
node1,
|
||||||
utpProtId,
|
utpProtId,
|
||||||
registerIncomingSocketCallback(queue),
|
registerIncomingSocketCallback(queue),
|
||||||
SocketConfig.init(lowSynTimeout))
|
socketConfig = SocketConfig.init(lowSynTimeout))
|
||||||
utp2 =
|
utp2 =
|
||||||
UtpDiscv5Protocol.new(
|
UtpDiscv5Protocol.new(
|
||||||
node2,
|
node2,
|
||||||
utpProtId,
|
utpProtId,
|
||||||
registerIncomingSocketCallback(queue),
|
registerIncomingSocketCallback(queue),
|
||||||
SocketConfig.init(),
|
allowOneIdCallback(allowedId),
|
||||||
allowOneIdCallback(allowedId))
|
SocketConfig.init())
|
||||||
|
|
||||||
# nodes must know about each other
|
# nodes must know about each other
|
||||||
check:
|
check:
|
||||||
|
@ -191,7 +191,7 @@ procSuite "Utp protocol over discovery v5 tests":
|
||||||
node2,
|
node2,
|
||||||
utpProtId,
|
utpProtId,
|
||||||
registerIncomingSocketCallback(queue),
|
registerIncomingSocketCallback(queue),
|
||||||
SocketConfig.init(incomingSocketReceiveTimeout = none[Duration]())
|
socketConfig = SocketConfig.init(incomingSocketReceiveTimeout = none[Duration]())
|
||||||
)
|
)
|
||||||
|
|
||||||
# nodes must know about each other
|
# nodes must know about each other
|
||||||
|
|
Loading…
Reference in New Issue