From c6561b88514398c7aef7366ee14935e23681e111 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 6 Jan 2020 23:34:46 -0600 Subject: [PATCH] add timeouts to connection/secio --- libp2p/connection.nim | 22 +++++++++++++--------- libp2p/protocols/secure/secio.nim | 5 +++-- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/libp2p/connection.nim b/libp2p/connection.nim index 5395be965..70e0b0c4f 100644 --- a/libp2p/connection.nim +++ b/libp2p/connection.nim @@ -16,23 +16,27 @@ import peerinfo, vbuffer const DefaultReadSize*: uint = 64 * 1024 +const DefaultRWTimeout*: Duration = 2.minutes type Connection* = ref object of LPStream peerInfo*: PeerInfo stream*: LPStream observedAddrs*: Multiaddress + timeout*: Duration InvalidVarintException = object of LPStreamError proc newInvalidVarintException*(): ref InvalidVarintException = newException(InvalidVarintException, "unable to prase varint") -proc newConnection*(stream: LPStream): Connection = +proc newConnection*(stream: LPStream, + timeout: Duration = DefaultRWTimeout): Connection = ## create a new Connection for the specified async reader/writer new result result.stream = stream result.closeEvent = newAsyncEvent() + result.timeout = timeout # bind stream's close event to connection's close # to ensure correct close propagation @@ -45,50 +49,50 @@ proc newConnection*(stream: LPStream): Connection = asyncCheck this.close() method read*(s: Connection, n = -1): Future[seq[byte]] {.gcsafe.} = - s.stream.read(n) + wait(s.stream.read(n), s.timeout) method readExactly*(s: Connection, pbytes: pointer, nbytes: int): Future[void] {.gcsafe.} = - s.stream.readExactly(pbytes, nbytes) + wait(s.stream.readExactly(pbytes, nbytes), s.timeout) method readLine*(s: Connection, limit = 0, sep = "\r\n"): Future[string] {.gcsafe.} = - s.stream.readLine(limit, sep) + wait(s.stream.readLine(limit, sep), s.timeout) method readOnce*(s: Connection, pbytes: pointer, nbytes: int): Future[int] {.gcsafe.} = - s.stream.readOnce(pbytes, nbytes) + wait(s.stream.readOnce(pbytes, nbytes), s.timeout) method readUntil*(s: Connection, pbytes: pointer, nbytes: int, sep: seq[byte]): Future[int] {.gcsafe.} = - s.stream.readUntil(pbytes, nbytes, sep) + wait(s.stream.readUntil(pbytes, nbytes, sep), s.timeout) method write*(s: Connection, pbytes: pointer, nbytes: int): Future[void] {.gcsafe.} = - s.stream.write(pbytes, nbytes) + wait(s.stream.write(pbytes, nbytes), s.timeout) method write*(s: Connection, msg: string, msglen = -1): Future[void] {.gcsafe.} = - s.stream.write(msg, msglen) + wait(s.stream.write(msg, msglen), s.timeout) method write*(s: Connection, msg: seq[byte], msglen = -1): Future[void] {.gcsafe.} = - s.stream.write(msg, msglen) + wait(s.stream.write(msg, msglen), s.timeout) method closed*(s: Connection): bool = if isNil(s.stream): diff --git a/libp2p/protocols/secure/secio.nim b/libp2p/protocols/secure/secio.nim index 4c3760e54..fd6307103 100644 --- a/libp2p/protocols/secure/secio.nim +++ b/libp2p/protocols/secure/secio.nim @@ -31,6 +31,7 @@ const SecioExchanges = "P-256,P-384,P-521" SecioCiphers = "TwofishCTR,AES-256,AES-128" SecioHashes = "SHA256,SHA512" + SecioRWTimeout = 2.minutes type Secio = ref object of Secure @@ -233,6 +234,7 @@ proc newSecureConnection(conn: Connection, new result result.stream = conn + result.timeout = SecioRWTimeout result.closeEvent = newAsyncEvent() let i0 = if order < 0: 1 else: 0 @@ -331,7 +333,6 @@ proc handshake(s: Secio, conn: Connection): Future[SecureConnection] {.async.} = remotePeerId = PeerID.init(remotePubkey) # TODO: PeerID check against supplied PeerID - let order = getOrder(remoteBytesPubkey, localNonce, localBytesPubkey, remoteNonce) trace "Remote proposal", schemes = remoteExchanges, ciphers = remoteCiphers, @@ -451,7 +452,7 @@ method init(s: Secio) {.gcsafe.} = proc handle(conn: Connection, proto: string) {.async, gcsafe.} = trace "handling connection" try: - discard await s.handleConn(conn) + asyncCheck s.handleConn(conn) trace "connection secured" except CatchableError as exc: if not conn.closed():