Fix half closed (#324)

* don't call `close` in `remoteClose`

* make sure timeout are properly propagted

* fix tests

* adding remote close write test
This commit is contained in:
Dmitriy Ryajov 2020-08-10 16:17:11 -06:00 committed by GitHub
parent 6ffd5be059
commit 2325692f55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 77 additions and 38 deletions

View File

@ -138,12 +138,9 @@ proc closeRemote*(s: LPChannel) {.async.} =
trace "got EOF, closing channel" trace "got EOF, closing channel"
try: try:
await s.drainBuffer() await s.drainBuffer()
s.isEof = true # set EOF immediately to prevent further reads s.isEof = true # set EOF immediately to prevent further reads
await s.close() # close local end # close parent bufferstream to prevent further reads
await procCall BufferStream(s).close()
# call to avoid leaks
await procCall BufferStream(s).close() # close parent bufferstream
trace "channel closed on EOF" trace "channel closed on EOF"
except CancelledError as exc: except CancelledError as exc:

View File

@ -28,11 +28,13 @@ type
proc init*[T: SecureConn](C: type T, proc init*[T: SecureConn](C: type T,
conn: Connection, conn: Connection,
peerInfo: PeerInfo, peerInfo: PeerInfo,
observedAddr: Multiaddress): T = observedAddr: Multiaddress,
timeout: Duration = DefaultConnectionTimeout): T =
result = C(stream: conn, result = C(stream: conn,
peerInfo: peerInfo, peerInfo: peerInfo,
observedAddr: observedAddr, observedAddr: observedAddr,
closeEvent: conn.closeEvent) closeEvent: conn.closeEvent,
timeout: timeout)
result.initStream() result.initStream()
method initStream*(s: SecureConn) = method initStream*(s: SecureConn) =

View File

@ -143,8 +143,10 @@ proc initBufferStream*(s: BufferStream,
trace "created bufferstream", oid = $s.oid trace "created bufferstream", oid = $s.oid
proc newBufferStream*(handler: WriteHandler = nil, proc newBufferStream*(handler: WriteHandler = nil,
size: int = DefaultBufferSize): BufferStream = size: int = DefaultBufferSize,
timeout: Duration = DefaultConnectionTimeout): BufferStream =
new result new result
result.timeout = timeout
result.initBufferStream(handler, size) result.initBufferStream(handler, size)
proc popFirst*(s: BufferStream): byte = proc popFirst*(s: BufferStream): byte =

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import hashes import hashes, oids
import chronicles, chronos, metrics import chronicles, chronos, metrics
import lpstream, import lpstream,
../multiaddress, ../multiaddress,
@ -20,7 +20,7 @@ logScope:
const const
ConnectionTrackerName* = "libp2p.connection" ConnectionTrackerName* = "libp2p.connection"
DefaultConnectionTimeout* = 1.minutes DefaultConnectionTimeout* = 5.minutes
type type
TimeoutHandler* = proc(): Future[void] {.gcsafe.} TimeoutHandler* = proc(): Future[void] {.gcsafe.}
@ -73,8 +73,15 @@ method initStream*(s: Connection) =
procCall LPStream(s).initStream() procCall LPStream(s).initStream()
s.closeEvent = newAsyncEvent() s.closeEvent = newAsyncEvent()
if isNil(s.timeoutHandler):
s.timeoutHandler = proc() {.async.} =
await s.close()
trace "timeout", timeout = $s.timeout.millis
doAssert(isNil(s.timerTaskFut)) doAssert(isNil(s.timerTaskFut))
s.timerTaskFut = s.timeoutMonitor() # doAssert(s.timeout > 0.millis)
if s.timeout > 0.millis:
s.timerTaskFut = s.timeoutMonitor()
inc getConnectionTracker().opened inc getConnectionTracker().opened

View File

@ -135,18 +135,20 @@ suite "Mplex":
let let
conn = newBufferStream( conn = newBufferStream(
proc (data: seq[byte]) {.gcsafe, async.} = proc (data: seq[byte]) {.gcsafe, async.} =
discard discard,
timeout = 5.minutes
) )
chann = LPChannel.init(1, conn, true) chann = LPChannel.init(1, conn, true)
await chann.pushTo(("Hello!").toBytes) await chann.pushTo(("Hello!").toBytes)
let closeFut = chann.closeRemote()
var data = newSeq[byte](6) var data = newSeq[byte](6)
await chann.readExactly(addr data[0], 6) # this should work, since there is data in the buffer await chann.readExactly(addr data[0], 3)
let closeFut = chann.closeRemote() # closing channel
let readFut = chann.readExactly(addr data[3], 3)
await all(closeFut, readFut)
try: try:
await chann.readExactly(addr data[0], 6) # this should throw await chann.readExactly(addr data[0], 6) # this should fail now
await closeFut
except LPStreamEOFError: except LPStreamEOFError:
result = true result = true
finally: finally:
@ -156,6 +158,29 @@ suite "Mplex":
check: check:
waitFor(testClosedForRead()) == true waitFor(testClosedForRead()) == true
test "half closed - channel should allow writting on remote close":
proc testClosedForRead(): Future[bool] {.async.} =
let
testData = "Hello!".toBytes
conn = newBufferStream(
proc (data: seq[byte]) {.gcsafe, async.} =
discard
, timeout = 5.minutes
)
chann = LPChannel.init(1, conn, true)
var data = newSeq[byte](6)
await chann.closeRemote() # closing channel
try:
await chann.writeLp(testData)
return true
finally:
await chann.close()
await conn.close()
check:
waitFor(testClosedForRead()) == true
test "should not allow pushing data to channel when remote end closed": test "should not allow pushing data to channel when remote end closed":
proc testResetWrite(): Future[bool] {.async.} = proc testResetWrite(): Future[bool] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
@ -211,20 +236,20 @@ suite "Mplex":
check: check:
waitFor(testResetWrite()) == true waitFor(testResetWrite()) == true
test "reset - channel should reset on timeout": test "reset - channel should reset on timeout":
proc testResetWrite(): Future[bool] {.async.} = proc testResetWrite(): Future[bool] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let let
conn = newBufferStream(writeHandler) conn = newBufferStream(writeHandler)
chann = LPChannel.init( chann = LPChannel.init(
1, conn, true, timeout = 100.millis) 1, conn, true, timeout = 100.millis)
await chann.closeEvent.wait() await chann.closeEvent.wait()
await conn.close() await conn.close()
result = true result = true
check: check:
waitFor(testResetWrite()) waitFor(testResetWrite())
test "e2e - read/write receiver": test "e2e - read/write receiver":
proc testNewStream() {.async.} = proc testNewStream() {.async.} =
@ -318,17 +343,23 @@ suite "Mplex":
bigseq.add(uint8(rand(uint('A')..uint('z')))) bigseq.add(uint8(rand(uint('A')..uint('z'))))
proc connHandler(conn: Connection) {.async, gcsafe.} = proc connHandler(conn: Connection) {.async, gcsafe.} =
let mplexListen = Mplex.init(conn) try:
mplexListen.streamHandler = proc(stream: Connection) let mplexListen = Mplex.init(conn)
{.async, gcsafe.} = mplexListen.streamHandler = proc(stream: Connection)
let msg = await stream.readLp(MaxMsgSize) {.async, gcsafe.} =
check msg == bigseq let msg = await stream.readLp(MaxMsgSize)
trace "Bigseq check passed!" check msg == bigseq
await stream.close() trace "Bigseq check passed!"
listenJob.complete() await stream.close()
listenJob.complete()
await mplexListen.handle() await mplexListen.handle()
await mplexListen.close() await sleepAsync(1.seconds) # give chronos some slack to process things
await mplexListen.close()
except CancelledError as exc:
raise exc
except CatchableError as exc:
check false
let transport1: TcpTransport = TcpTransport.init() let transport1: TcpTransport = TcpTransport.init()
let listenFut = await transport1.listen(ma, connHandler) let listenFut = await transport1.listen(ma, connHandler)