small fixes (#374)
* add helper to read EOF marker after closing stream (else stream stay alive until timeout/reset) * don't assert on empty channel message * don't loop when writing to chronos (no need)
This commit is contained in:
parent
ec322124ac
commit
25bd0a18f4
|
@ -160,10 +160,12 @@ method readOnce*(s: LPChannel,
|
||||||
raise exc
|
raise exc
|
||||||
|
|
||||||
method write*(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
|
method write*(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
|
||||||
if s.closedLocal:
|
if s.closedLocal or s.conn.closed:
|
||||||
raise newLPStreamClosedError()
|
raise newLPStreamClosedError()
|
||||||
|
|
||||||
doAssert msg.len > 0
|
if msg.len == 0:
|
||||||
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if not s.isOpen:
|
if not s.isOpen:
|
||||||
await s.open()
|
await s.open()
|
||||||
|
|
|
@ -105,10 +105,6 @@ proc newIdentify*(peerInfo: PeerInfo): Identify =
|
||||||
method init*(p: Identify) =
|
method init*(p: Identify) =
|
||||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||||
try:
|
try:
|
||||||
defer:
|
|
||||||
trace "exiting identify handler", conn
|
|
||||||
await conn.close()
|
|
||||||
|
|
||||||
trace "handling identify request", conn
|
trace "handling identify request", conn
|
||||||
var pb = encodeMsg(p.peerInfo, conn.observedAddr)
|
var pb = encodeMsg(p.peerInfo, conn.observedAddr)
|
||||||
await conn.writeLp(pb.buffer)
|
await conn.writeLp(pb.buffer)
|
||||||
|
@ -116,6 +112,9 @@ method init*(p: Identify) =
|
||||||
raise exc
|
raise exc
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "exception in identify handler", exc = exc.msg, conn
|
trace "exception in identify handler", exc = exc.msg, conn
|
||||||
|
finally:
|
||||||
|
trace "exiting identify handler", conn
|
||||||
|
await conn.closeWithEOF()
|
||||||
|
|
||||||
p.handler = handle
|
p.handler = handle
|
||||||
p.codec = IdentifyCodec
|
p.codec = IdentifyCodec
|
||||||
|
|
|
@ -205,7 +205,7 @@ method handleConn*(p: PubSub,
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "exception ocurred in pubsub handle", exc = exc.msg, conn
|
trace "exception ocurred in pubsub handle", exc = exc.msg, conn
|
||||||
finally:
|
finally:
|
||||||
await conn.close()
|
await conn.closeWithEOF()
|
||||||
|
|
||||||
method subscribePeer*(p: PubSub, peer: PeerID) {.base.} =
|
method subscribePeer*(p: PubSub, peer: PeerID) {.base.} =
|
||||||
## subscribe to remote peer to receive/send pubsub
|
## subscribe to remote peer to receive/send pubsub
|
||||||
|
|
|
@ -76,14 +76,16 @@ method write*(s: ChronosStream, msg: seq[byte]) {.async.} =
|
||||||
return
|
return
|
||||||
|
|
||||||
withExceptions:
|
withExceptions:
|
||||||
var written = 0
|
# StreamTransport will only return written < msg.len on fatal failures where
|
||||||
while not s.client.closed and written < msg.len:
|
# further writing is not possible - in such cases, we'll raise here,
|
||||||
written += await s.client.write(msg[written..<msg.len])
|
# since we don't return partial writes lengths
|
||||||
s.activity = true # reset activity flag
|
var written = await s.client.write(msg)
|
||||||
|
|
||||||
if written < msg.len:
|
if written < msg.len:
|
||||||
raise (ref LPStreamClosedError)(msg: "Write couldn't finish writing")
|
raise (ref LPStreamClosedError)(msg: "Write couldn't finish writing")
|
||||||
|
|
||||||
|
s.activity = true # reset activity flag
|
||||||
|
|
||||||
method closed*(s: ChronosStream): bool {.inline.} =
|
method closed*(s: ChronosStream): bool {.inline.} =
|
||||||
result = s.client.closed
|
result = s.client.closed
|
||||||
|
|
||||||
|
|
|
@ -229,3 +229,22 @@ method close*(s: LPStream): Future[void] {.base, async.} = # {.raises [Defect].}
|
||||||
# override `closeImpl`, it is called only once - anyone overriding `close`
|
# override `closeImpl`, it is called only once - anyone overriding `close`
|
||||||
# itself must implement this - once-only check as well, with their own field
|
# itself must implement this - once-only check as well, with their own field
|
||||||
await closeImpl(s)
|
await closeImpl(s)
|
||||||
|
|
||||||
|
proc closeWithEOF*(s: LPStream): Future[void] {.async.} =
|
||||||
|
## Close the stream and wait for EOF - use this with half-closed streams where
|
||||||
|
## an EOF is expected to arrive from the other end.
|
||||||
|
await s.close()
|
||||||
|
|
||||||
|
if s.atEof():
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
var buf: array[8, byte]
|
||||||
|
if (await readOnce(s, addr buf[0], buf.len)) != 0:
|
||||||
|
debug "Unexpected bytes while waiting for EOF", s
|
||||||
|
except LPStreamEOFError:
|
||||||
|
trace "Expected EOF came", s
|
||||||
|
except CancelledError as exc:
|
||||||
|
raise exc
|
||||||
|
except CatchableError as exc:
|
||||||
|
debug "Unexpected error while waiting for EOF", s, msg = exc.msg
|
||||||
|
|
|
@ -137,14 +137,13 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} =
|
||||||
proc identify(s: Switch, muxer: Muxer) {.async, gcsafe.} =
|
proc identify(s: Switch, muxer: Muxer) {.async, gcsafe.} =
|
||||||
# new stream for identify
|
# new stream for identify
|
||||||
var stream = await muxer.newStream()
|
var stream = await muxer.newStream()
|
||||||
|
if stream == nil:
|
||||||
|
return
|
||||||
|
|
||||||
defer:
|
try:
|
||||||
if not(isNil(stream)):
|
await s.identify(stream)
|
||||||
await stream.close() # close identify stream
|
finally:
|
||||||
|
await stream.closeWithEOF()
|
||||||
# do identify first, so that we have a
|
|
||||||
# PeerInfo in case we didn't before
|
|
||||||
await s.identify(stream)
|
|
||||||
|
|
||||||
proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} =
|
proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} =
|
||||||
## mux incoming connection
|
## mux incoming connection
|
||||||
|
@ -361,7 +360,7 @@ proc negotiateStream(s: Switch, conn: Connection, protos: seq[string]): Future[C
|
||||||
trace "Negotiating stream", conn, protos
|
trace "Negotiating stream", conn, protos
|
||||||
let selected = await s.ms.select(conn, protos)
|
let selected = await s.ms.select(conn, protos)
|
||||||
if not protos.contains(selected):
|
if not protos.contains(selected):
|
||||||
await conn.close()
|
await conn.closeWithEOF()
|
||||||
raise newException(DialFailedError, "Unable to select sub-protocol " & $protos)
|
raise newException(DialFailedError, "Unable to select sub-protocol " & $protos)
|
||||||
|
|
||||||
return conn
|
return conn
|
||||||
|
@ -392,7 +391,7 @@ proc dial*(s: Switch,
|
||||||
|
|
||||||
proc cleanup() {.async.} =
|
proc cleanup() {.async.} =
|
||||||
if not(isNil(stream)):
|
if not(isNil(stream)):
|
||||||
await stream.close()
|
await stream.closeWithEOF()
|
||||||
|
|
||||||
if not(isNil(conn)):
|
if not(isNil(conn)):
|
||||||
await conn.close()
|
await conn.close()
|
||||||
|
@ -561,7 +560,7 @@ proc newSwitch*(peerInfo: PeerInfo,
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "exception in stream handler", conn, msg = exc.msg
|
trace "exception in stream handler", conn, msg = exc.msg
|
||||||
finally:
|
finally:
|
||||||
await conn.close()
|
await conn.closeWithEOF()
|
||||||
trace "Stream handler done", conn
|
trace "Stream handler done", conn
|
||||||
|
|
||||||
switch.mount(identity)
|
switch.mount(identity)
|
||||||
|
|
Loading…
Reference in New Issue