mirror of https://github.com/status-im/nim-eth.git
Fix chronos related deprecated warnings in discv5 and uTP (#667)
* Fix chronos related deprecated warnings in discv5 * Fix chronos related deprecated warnings in uTP code * Improve discv5 closeWait order of events and add noCancel
This commit is contained in:
parent
b75ee36576
commit
496bcdab74
|
@ -24,7 +24,6 @@ let commonParams =
|
||||||
" --warning[ObservableStores]:off" &
|
" --warning[ObservableStores]:off" &
|
||||||
" --styleCheck:usages --styleCheck:error" &
|
" --styleCheck:usages --styleCheck:error" &
|
||||||
" " & getEnv("NIMFLAGS") &
|
" " & getEnv("NIMFLAGS") &
|
||||||
" -d:chronosStrictException" &
|
|
||||||
" -d:chronicles_log_level=TRACE"
|
" -d:chronicles_log_level=TRACE"
|
||||||
|
|
||||||
proc runTest(path: string, release: bool = true) =
|
proc runTest(path: string, release: bool = true) =
|
||||||
|
|
|
@ -468,7 +468,7 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) =
|
||||||
trace "Packet decoding error", error = decoded.error, address = a
|
trace "Packet decoding error", error = decoded.error, address = a
|
||||||
|
|
||||||
proc processClient(transp: DatagramTransport, raddr: TransportAddress):
|
proc processClient(transp: DatagramTransport, raddr: TransportAddress):
|
||||||
Future[void] {.async.} =
|
Future[void] {.async: (raises: []).} =
|
||||||
let proto = getUserData[Protocol](transp)
|
let proto = getUserData[Protocol](transp)
|
||||||
|
|
||||||
# TODO: should we use `peekMessage()` to avoid allocation?
|
# TODO: should we use `peekMessage()` to avoid allocation?
|
||||||
|
@ -1036,28 +1036,20 @@ proc start*(d: Protocol) =
|
||||||
d.revalidateLoop = revalidateLoop(d)
|
d.revalidateLoop = revalidateLoop(d)
|
||||||
d.ipMajorityLoop = ipMajorityLoop(d)
|
d.ipMajorityLoop = ipMajorityLoop(d)
|
||||||
|
|
||||||
proc close*(d: Protocol) =
|
proc closeWait*(d: Protocol) {.async: (raises: []).} =
|
||||||
doAssert(not d.transp.closed)
|
doAssert(not d.transp.closed)
|
||||||
|
|
||||||
debug "Closing discovery node", node = d.localNode
|
debug "Closing discovery node", node = d.localNode
|
||||||
|
var futures: seq[Future[void]]
|
||||||
if not d.revalidateLoop.isNil:
|
if not d.revalidateLoop.isNil:
|
||||||
d.revalidateLoop.cancel()
|
futures.add(d.revalidateLoop.cancelAndWait())
|
||||||
if not d.refreshLoop.isNil:
|
if not d.refreshLoop.isNil:
|
||||||
d.refreshLoop.cancel()
|
futures.add(d.refreshLoop.cancelAndWait())
|
||||||
if not d.ipMajorityLoop.isNil:
|
if not d.ipMajorityLoop.isNil:
|
||||||
d.ipMajorityLoop.cancel()
|
futures.add(d.ipMajorityLoop.cancelAndWait())
|
||||||
|
|
||||||
d.transp.close()
|
await noCancel(allFutures(futures))
|
||||||
|
await noCancel(d.transp.closeWait())
|
||||||
|
|
||||||
proc closeWait*(d: Protocol) {.async.} =
|
proc close*(d: Protocol) {.deprecated: "Please use closeWait() instead".} =
|
||||||
doAssert(not d.transp.closed)
|
asyncSpawn d.closeWait()
|
||||||
|
|
||||||
debug "Closing discovery node", node = d.localNode
|
|
||||||
if not d.revalidateLoop.isNil:
|
|
||||||
await d.revalidateLoop.cancelAndWait()
|
|
||||||
if not d.refreshLoop.isNil:
|
|
||||||
await d.refreshLoop.cancelAndWait()
|
|
||||||
if not d.ipMajorityLoop.isNil:
|
|
||||||
await d.ipMajorityLoop.cancelAndWait()
|
|
||||||
|
|
||||||
await d.transp.closeWait()
|
|
||||||
|
|
|
@ -738,7 +738,7 @@ proc destroy*(s: UtpSocket) =
|
||||||
## Moves socket to destroy state and clean all resources.
|
## Moves socket to destroy state and clean all resources.
|
||||||
## Remote is not notified in any way about socket end of life.
|
## Remote is not notified in any way about socket end of life.
|
||||||
s.state = Destroy
|
s.state = Destroy
|
||||||
s.eventLoop.cancel()
|
s.eventLoop.cancelSoon()
|
||||||
# This procedure initiate cleanup process which goes like:
|
# This procedure initiate cleanup process which goes like:
|
||||||
# Cancel EventLoop -> Cancel timeoutsLoop -> Fire closeEvent
|
# Cancel EventLoop -> Cancel timeoutsLoop -> Fire closeEvent
|
||||||
# This is necessary due to how evenLoop look like i.e it has only one await
|
# This is necessary due to how evenLoop look like i.e it has only one await
|
||||||
|
@ -1762,7 +1762,7 @@ proc eventLoop(socket: UtpSocket) {.async.} =
|
||||||
socket.pendingWrites.clear()
|
socket.pendingWrites.clear()
|
||||||
socket.pendingReads.clear()
|
socket.pendingReads.clear()
|
||||||
# main eventLoop has been cancelled, try to cancel `checkTimeoutsLoop`
|
# main eventLoop has been cancelled, try to cancel `checkTimeoutsLoop`
|
||||||
socket.checkTimeoutsLoop.cancel()
|
socket.checkTimeoutsLoop.cancelSoon()
|
||||||
trace "main socket event loop cancelled"
|
trace "main socket event loop cancelled"
|
||||||
raise exc
|
raise exc
|
||||||
|
|
||||||
|
|
|
@ -48,7 +48,7 @@ procSuite "uTP socket":
|
||||||
initialPacket.header.wndSize == defaultConfig.optRcvBuffer
|
initialPacket.header.wndSize == defaultConfig.optRcvBuffer
|
||||||
|
|
||||||
await socket.destroyWait()
|
await socket.destroyWait()
|
||||||
fut.cancel()
|
fut.cancelSoon()
|
||||||
|
|
||||||
asyncTest "Outgoing socket should re-send SYN packet 2 times before declaring failure":
|
asyncTest "Outgoing socket should re-send SYN packet 2 times before declaring failure":
|
||||||
let q = newAsyncQueue[Packet]()
|
let q = newAsyncQueue[Packet]()
|
||||||
|
@ -82,7 +82,7 @@ procSuite "uTP socket":
|
||||||
not socket.isConnected()
|
not socket.isConnected()
|
||||||
|
|
||||||
await socket.destroyWait()
|
await socket.destroyWait()
|
||||||
fut1.cancel()
|
fut1.cancelSoon()
|
||||||
|
|
||||||
asyncTest "Processing in order ack should make socket connected":
|
asyncTest "Processing in order ack should make socket connected":
|
||||||
let q = newAsyncQueue[Packet]()
|
let q = newAsyncQueue[Packet]()
|
||||||
|
|
|
@ -183,7 +183,7 @@ proc discover(d: discv5_protocol.Protocol, psFile: string) {.async.} =
|
||||||
let newLine = str % [pubkey.get().toHex, dNode.id.toHex, forkDigest[0..3].toHex, $dNode.address.get(), attnets.get().toHex, $bits]
|
let newLine = str % [pubkey.get().toHex, dNode.id.toHex, forkDigest[0..3].toHex, $dNode.address.get(), attnets.get().toHex, $bits]
|
||||||
|
|
||||||
ps.write(newLine)
|
ps.write(newLine)
|
||||||
await sleepAsync(1000) # 1 sec of delay
|
await sleepAsync(1.seconds) # 1 sec of delay
|
||||||
|
|
||||||
|
|
||||||
proc run(config: DiscoveryConf) {.raises: [CatchableError].} =
|
proc run(config: DiscoveryConf) {.raises: [CatchableError].} =
|
||||||
|
|
Loading…
Reference in New Issue