mirror of
https://github.com/vacp2p/nim-libp2p-experimental.git
synced 2025-01-12 03:14:15 +00:00
properly close connections
This commit is contained in:
parent
35a48fa560
commit
e4303110a6
@ -44,11 +44,16 @@ proc init*[T: Connection](self: var T, stream: LPStream) =
|
|||||||
# bind stream's close event to connection's close
|
# bind stream's close event to connection's close
|
||||||
# to ensure correct close propagation
|
# to ensure correct close propagation
|
||||||
let this = self
|
let this = self
|
||||||
if not isNil(self.stream.closeEvent):
|
if not isNil(stream.closeEvent):
|
||||||
self.stream.closeEvent.wait().
|
stream.closeEvent.wait().
|
||||||
addCallback do (udata: pointer):
|
addCallback do (udata: pointer):
|
||||||
if not this.closed:
|
trace "wrapped stream closed, about to close conn", closed = this.isClosed,
|
||||||
trace "wrapped stream closed, closing conn"
|
peer = if not isNil(this.peerInfo):
|
||||||
|
this.peerInfo.id else: ""
|
||||||
|
if not this.isClosed:
|
||||||
|
trace "wrapped stream closed, closing conn", closed = this.isClosed,
|
||||||
|
peer = if not isNil(this.peerInfo):
|
||||||
|
this.peerInfo.id else: ""
|
||||||
asyncCheck this.close()
|
asyncCheck this.close()
|
||||||
|
|
||||||
proc newConnection*(stream: LPStream): Connection =
|
proc newConnection*(stream: LPStream): Connection =
|
||||||
@ -108,13 +113,21 @@ method closed*(s: Connection): bool =
|
|||||||
result = s.stream.closed
|
result = s.stream.closed
|
||||||
|
|
||||||
method close*(s: Connection) {.async, gcsafe.} =
|
method close*(s: Connection) {.async, gcsafe.} =
|
||||||
trace "closing connection"
|
trace "closing connection", closed = s.closed,
|
||||||
|
peer = if not isNil(s.peerInfo):
|
||||||
|
s.peerInfo.id else: ""
|
||||||
if not s.closed:
|
if not s.closed:
|
||||||
if not isNil(s.stream) and not s.stream.closed:
|
if not isNil(s.stream) and not s.stream.closed:
|
||||||
|
trace "closing connection", closed = s.closed,
|
||||||
|
peer = if not isNil(s.peerInfo):
|
||||||
|
s.peerInfo.id else: ""
|
||||||
await s.stream.close()
|
await s.stream.close()
|
||||||
|
|
||||||
s.closeEvent.fire()
|
s.closeEvent.fire()
|
||||||
s.isClosed = true
|
s.isClosed = true
|
||||||
trace "connection closed", closed = s.closed
|
trace "connection closed", closed = s.closed,
|
||||||
|
peer = if not isNil(s.peerInfo):
|
||||||
|
s.peerInfo.id else: ""
|
||||||
|
|
||||||
proc readLp*(s: Connection): Future[seq[byte]] {.async, gcsafe.} =
|
proc readLp*(s: Connection): Future[seq[byte]] {.async, gcsafe.} =
|
||||||
## read lenght prefixed msg
|
## read lenght prefixed msg
|
||||||
|
@ -19,6 +19,8 @@ import ../protocol,
|
|||||||
|
|
||||||
type
|
type
|
||||||
Secure* = ref object of LPProtocol # base type for secure managers
|
Secure* = ref object of LPProtocol # base type for secure managers
|
||||||
|
cleanupFut: Future[void]
|
||||||
|
|
||||||
SecureConn* = ref object of Connection
|
SecureConn* = ref object of Connection
|
||||||
|
|
||||||
method readMessage*(c: SecureConn): Future[seq[byte]] {.async, base.} =
|
method readMessage*(c: SecureConn): Future[seq[byte]] {.async, base.} =
|
||||||
@ -32,8 +34,9 @@ method handshake(s: Secure,
|
|||||||
initiator: bool = false): Future[SecureConn] {.async, base.} =
|
initiator: bool = false): Future[SecureConn] {.async, base.} =
|
||||||
doAssert(false, "Not implemented!")
|
doAssert(false, "Not implemented!")
|
||||||
|
|
||||||
proc readLoop(sconn: SecureConn, stream: BufferStream) {.async.} =
|
proc readLoop(sconn: SecureConn, conn: Connection) {.async.} =
|
||||||
try:
|
try:
|
||||||
|
let stream = BufferStream(conn.stream)
|
||||||
while not sconn.closed:
|
while not sconn.closed:
|
||||||
let msg = await sconn.readMessage()
|
let msg = await sconn.readMessage()
|
||||||
if msg.len == 0:
|
if msg.len == 0:
|
||||||
@ -44,6 +47,9 @@ proc readLoop(sconn: SecureConn, stream: BufferStream) {.async.} =
|
|||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "Exception occurred Secure.readLoop", exc = exc.msg
|
trace "Exception occurred Secure.readLoop", exc = exc.msg
|
||||||
finally:
|
finally:
|
||||||
|
if not conn.closed:
|
||||||
|
await conn.close()
|
||||||
|
|
||||||
if not sconn.closed:
|
if not sconn.closed:
|
||||||
await sconn.close()
|
await sconn.close()
|
||||||
trace "ending Secure readLoop", isclosed = sconn.closed()
|
trace "ending Secure readLoop", isclosed = sconn.closed()
|
||||||
@ -54,14 +60,8 @@ proc handleConn*(s: Secure, conn: Connection, initiator: bool = false): Future[C
|
|||||||
trace "sending encrypted bytes", bytes = data.shortLog
|
trace "sending encrypted bytes", bytes = data.shortLog
|
||||||
await sconn.writeMessage(data)
|
await sconn.writeMessage(data)
|
||||||
|
|
||||||
var stream = newBufferStream(writeHandler)
|
result = newConnection(newBufferStream(writeHandler))
|
||||||
asyncCheck readLoop(sconn, stream)
|
asyncCheck readLoop(sconn, result)
|
||||||
result = newConnection(stream)
|
|
||||||
result.closeEvent.wait()
|
|
||||||
.addCallback do (udata: pointer):
|
|
||||||
trace "wrapped connection closed, closing upstream"
|
|
||||||
if not isNil(sconn) and not sconn.closed:
|
|
||||||
asyncCheck sconn.close()
|
|
||||||
|
|
||||||
if not isNil(sconn.peerInfo) and sconn.peerInfo.publicKey.isSome:
|
if not isNil(sconn.peerInfo) and sconn.peerInfo.publicKey.isSome:
|
||||||
result.peerInfo = PeerInfo.init(sconn.peerInfo.publicKey.get())
|
result.peerInfo = PeerInfo.init(sconn.peerInfo.publicKey.get())
|
||||||
|
Loading…
x
Reference in New Issue
Block a user