Fix utp connection leak on cancel (#1107)
* Fix utp connection leak on cancel
This commit is contained in:
parent
b975d35c84
commit
5c78fe64e4
|
@ -615,36 +615,63 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
|
||||||
id = dst.id
|
id = dst.id
|
||||||
return err("Trying to connect to node with unknown address")
|
return err("Trying to connect to node with unknown address")
|
||||||
|
|
||||||
let connectionResult =
|
let connFuture = p.stream.connectTo(
|
||||||
await p.stream.connectTo(
|
|
||||||
nodeAddress.unsafeGet(),
|
nodeAddress.unsafeGet(),
|
||||||
uint16.fromBytesBE(m.connectionId)
|
uint16.fromBytesBE(m.connectionId)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
yield connFuture
|
||||||
|
|
||||||
|
var connectionResult: Result[UtpSocket[NodeAddress], string]
|
||||||
|
|
||||||
|
if connFuture.completed():
|
||||||
|
connectionResult = connFuture.read()
|
||||||
|
else:
|
||||||
|
raise connFuture.error
|
||||||
|
|
||||||
if connectionResult.isErr():
|
if connectionResult.isErr():
|
||||||
debug "Utp connection error while trying to find content",
|
debug "Utp connection error while trying to find content",
|
||||||
error = connectionResult.error
|
error = connectionResult.error
|
||||||
return err("Error connecting uTP socket")
|
return err("Error connecting uTP socket")
|
||||||
|
|
||||||
let socket = connectionResult.get()
|
let socket = connectionResult.get()
|
||||||
# Read all bytes from the socket
|
|
||||||
# This will either end with a FIN, or because the read action times out.
|
|
||||||
# A FIN does not necessarily mean that the data read is complete. Further
|
|
||||||
# validation is required, using a length prefix here might be beneficial for
|
|
||||||
# this.
|
|
||||||
let readData = socket.read()
|
|
||||||
readData.cancelCallback = proc(udate: pointer) {.gcsafe.} =
|
|
||||||
# In case this `findContent` gets cancelled while reading the data,
|
|
||||||
# send a FIN and clean up the socket.
|
|
||||||
socket.close()
|
|
||||||
|
|
||||||
if await readData.withTimeout(p.stream.readTimeout):
|
try:
|
||||||
let content = readData.read
|
# Read all bytes from the socket
|
||||||
await socket.destroyWait()
|
# This will either end with a FIN, or because the read action times out.
|
||||||
return ok(FoundContent(src: dst, kind: Content, content: content))
|
# A FIN does not necessarily mean that the data read is complete. Further
|
||||||
else:
|
# validation is required, using a length prefix here might be beneficial for
|
||||||
|
# this.
|
||||||
|
let readFut = socket.read()
|
||||||
|
|
||||||
|
readFut.cancelCallback = proc(udate: pointer) {.gcsafe.} =
|
||||||
|
debug "Socket read cancelled",
|
||||||
|
socketKey = socket.socketKey
|
||||||
|
# In case this `findContent` gets cancelled while reading the data,
|
||||||
|
# send a FIN and clean up the socket.
|
||||||
|
socket.close()
|
||||||
|
|
||||||
|
if await readFut.withTimeout(p.stream.readTimeout):
|
||||||
|
let content = readFut.read
|
||||||
|
# socket received remote FIN and drained whole buffer, it can be
|
||||||
|
# safely destroyed without notifing remote
|
||||||
|
debug "Socket read fully",
|
||||||
|
socketKey = socket.socketKey
|
||||||
|
socket.destroy()
|
||||||
|
return ok(FoundContent(src: dst, kind: Content, content: content))
|
||||||
|
else :
|
||||||
|
debug "Socket read time-out",
|
||||||
|
socketKey = socket.socketKey
|
||||||
|
socket.close()
|
||||||
|
return err("Reading data from socket timed out, content request failed")
|
||||||
|
except CancelledError as exc:
|
||||||
|
# even though we already installed cancelCallback on readFut, it is worth
|
||||||
|
# catching CancelledError in case that withTimeout throws CancelledError
|
||||||
|
# but readFut have already finished.
|
||||||
|
debug "Socket read cancelled",
|
||||||
|
socketKey = socket.socketKey
|
||||||
socket.close()
|
socket.close()
|
||||||
return err("Reading data from socket timed out, content request failed")
|
raise exc
|
||||||
of contentType:
|
of contentType:
|
||||||
return ok(FoundContent(src: dst, kind: Content, content: m.content.asSeq()))
|
return ok(FoundContent(src: dst, kind: Content, content: m.content.asSeq()))
|
||||||
of enrsType:
|
of enrsType:
|
||||||
|
|
|
@ -140,7 +140,18 @@ proc connectTo*(
|
||||||
nodeAddress: NodeAddress,
|
nodeAddress: NodeAddress,
|
||||||
connectionId: uint16):
|
connectionId: uint16):
|
||||||
Future[Result[UtpSocket[NodeAddress], string]] {.async.} =
|
Future[Result[UtpSocket[NodeAddress], string]] {.async.} =
|
||||||
let socketRes = await stream.transport.connectTo(nodeAddress, connectionId)
|
let connectFut = stream.transport.connectTo(nodeAddress, connectionId)
|
||||||
|
|
||||||
|
# using yield, not await, as await does not play nice with cancellation
|
||||||
|
# interacting with async procs which allocates some resource
|
||||||
|
yield connectFut
|
||||||
|
|
||||||
|
var socketRes: ConnectionResult[NodeAddress]
|
||||||
|
|
||||||
|
if connectFut.completed():
|
||||||
|
socketRes = connectFut.read()
|
||||||
|
else:
|
||||||
|
raise connectFut.error
|
||||||
|
|
||||||
if socketRes.isErr():
|
if socketRes.isErr():
|
||||||
case socketRes.error.kind
|
case socketRes.error.kind
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit 4463a28fd615561b3614806b69f2c0592fe91047
|
Subproject commit dffaa78cbedd47d3ee00ba1fdf2b130c47e75793
|
Loading…
Reference in New Issue