Lazy channels (#78)

* Implemented lazy stream opening for mplex connections

* Properly fix newStream usage

* Make lazy channel open optional

* Add Lazy channel test

* Cleanup mplex test

* Move lazyness properly into LPChannel

* Connection writeLp back to proc
This commit is contained in:
Giovanni Petrantoni 2020-02-12 02:30:36 +09:00 committed by GitHub
parent 8c406fb9e5
commit 23712ecf3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 71 additions and 25 deletions

View File

@ -28,22 +28,26 @@ type
proc newInvalidVarintException*(): ref InvalidVarintException = proc newInvalidVarintException*(): ref InvalidVarintException =
newException(InvalidVarintException, "unable to prase varint") newException(InvalidVarintException, "unable to prase varint")
proc newConnection*(stream: LPStream): Connection = proc init*[T: Connection](self: var T, stream: LPStream) =
## create a new Connection for the specified async reader/writer ## create a new Connection for the specified async reader/writer
new result new self
result.stream = stream self.stream = stream
result.closeEvent = newAsyncEvent() self.closeEvent = newAsyncEvent()
# bind stream's close event to connection's close # bind stream's close event to connection's close
# to ensure correct close propagation # to ensure correct close propagation
let this = result let this = self
if not isNil(result.stream.closeEvent): if not isNil(self.stream.closeEvent):
result.stream.closeEvent.wait(). self.stream.closeEvent.wait().
addCallback do (udata: pointer): addCallback do (udata: pointer):
if not this.closed: if not this.closed:
trace "closing this connection because wrapped stream closed" trace "closing this connection because wrapped stream closed"
asyncCheck this.close() asyncCheck this.close()
proc newConnection*(stream: LPStream): Connection =
## create a new Connection for the specified async reader/writer
result.init(stream)
method read*(s: Connection, n = -1): Future[seq[byte]] {.gcsafe.} = method read*(s: Connection, n = -1): Future[seq[byte]] {.gcsafe.} =
s.stream.read(n) s.stream.read(n)

View File

@ -26,6 +26,8 @@ type
name*: string name*: string
conn*: Connection conn*: Connection
initiator*: bool initiator*: bool
isLazy*: bool
isOpen*: bool
isReset*: bool isReset*: bool
closedLocal*: bool closedLocal*: bool
closedRemote*: bool closedRemote*: bool
@ -39,7 +41,8 @@ proc newChannel*(id: uint,
conn: Connection, conn: Connection,
initiator: bool, initiator: bool,
name: string = "", name: string = "",
size: int = DefaultChannelSize): LPChannel = size: int = DefaultChannelSize,
lazy: bool = false): LPChannel =
new result new result
result.id = id result.id = id
result.name = name result.name = name
@ -49,6 +52,7 @@ proc newChannel*(id: uint,
result.closeCode = if initiator: MessageType.CloseOut else: MessageType.CloseIn result.closeCode = if initiator: MessageType.CloseOut else: MessageType.CloseIn
result.resetCode = if initiator: MessageType.ResetOut else: MessageType.ResetIn result.resetCode = if initiator: MessageType.ResetOut else: MessageType.ResetIn
result.asyncLock = newAsyncLock() result.asyncLock = newAsyncLock()
result.isLazy = lazy
let chan = result let chan = result
proc writeHandler(data: seq[byte]): Future[void] {.async.} = proc writeHandler(data: seq[byte]): Future[void] {.async.} =
@ -76,6 +80,7 @@ proc cleanUp*(s: LPChannel): Future[void] =
result = procCall close(BufferStream(s)) result = procCall close(BufferStream(s))
proc open*(s: LPChannel): Future[void] = proc open*(s: LPChannel): Future[void] =
s.isOpen = true
s.conn.writeMsg(s.id, MessageType.New, s.name) s.conn.writeMsg(s.id, MessageType.New, s.name)
method close*(s: LPChannel) {.async, gcsafe.} = method close*(s: LPChannel) {.async, gcsafe.} =
@ -142,19 +147,22 @@ method readUntil*(s: LPChannel,
raise newLPStreamEOFError() raise newLPStreamEOFError()
result = procCall readOnce(BufferStream(s), pbytes, nbytes) result = procCall readOnce(BufferStream(s), pbytes, nbytes)
method write*(s: LPChannel, template writePrefix: untyped =
pbytes: pointer, if s.isLazy and not s.isOpen:
nbytes: int): Future[void] = await s.open()
if s.closedLocal or s.isReset: if s.closedLocal or s.isReset:
raise newLPStreamEOFError() raise newLPStreamEOFError()
method write*(s: LPChannel,
pbytes: pointer,
nbytes: int): Future[void] {.async.} =
writePrefix()
result = procCall write(BufferStream(s), pbytes, nbytes) result = procCall write(BufferStream(s), pbytes, nbytes)
method write*(s: LPChannel, msg: string, msglen = -1) {.async.} = method write*(s: LPChannel, msg: string, msglen = -1) {.async.} =
if s.closedLocal or s.isReset: writePrefix()
raise newLPStreamEOFError()
result = procCall write(BufferStream(s), msg, msglen) result = procCall write(BufferStream(s), msg, msglen)
method write*(s: LPChannel, msg: seq[byte], msglen = -1) {.async.} = method write*(s: LPChannel, msg: seq[byte], msglen = -1) {.async.} =
if s.closedLocal or s.isReset: writePrefix()
raise newLPStreamEOFError()
result = procCall write(BufferStream(s), msg, msglen) result = procCall write(BufferStream(s), msg, msglen)

View File

@ -41,12 +41,13 @@ proc getChannelList(m: Mplex, initiator: bool): var Table[uint, LPChannel] =
proc newStreamInternal*(m: Mplex, proc newStreamInternal*(m: Mplex,
initiator: bool = true, initiator: bool = true,
chanId: uint = 0, chanId: uint = 0,
name: string = ""): name: string = "",
lazy: bool = false):
Future[LPChannel] {.async, gcsafe.} = Future[LPChannel] {.async, gcsafe.} =
## create new channel/stream ## create new channel/stream
let id = if initiator: m.currentId.inc(); m.currentId else: chanId let id = if initiator: m.currentId.inc(); m.currentId else: chanId
trace "creating new channel", channelId = id, initiator = initiator trace "creating new channel", channelId = id, initiator = initiator
result = newChannel(id, m.connection, initiator, name) result = newChannel(id, m.connection, initiator, name, lazy = lazy)
m.getChannelList(initiator)[id] = result m.getChannelList(initiator)[id] = result
proc cleanupChann(m: Mplex, chann: LPChannel, initiator: bool) {.async, inline.} = proc cleanupChann(m: Mplex, chann: LPChannel, initiator: bool) {.async, inline.} =
@ -141,11 +142,11 @@ proc newMplex*(conn: Connection,
.addCallback do (udata: pointer): .addCallback do (udata: pointer):
trace "connection closed, cleaning up mplex" trace "connection closed, cleaning up mplex"
asyncCheck m.close() asyncCheck m.close()
method newStream*(m: Mplex, name: string = ""): Future[Connection] {.async, gcsafe.} = method newStream*(m: Mplex, name: string = "", lazy: bool = false): Future[Connection] {.async, gcsafe.} =
let channel = await m.newStreamInternal() let channel = await m.newStreamInternal(lazy = lazy)
# TODO: open the channel (this should be lazy) if not lazy:
await channel.open() await channel.open()
result = newConnection(channel) result = newConnection(channel)
result.peerInfo = m.connection.peerInfo result.peerInfo = m.connection.peerInfo

View File

@ -32,7 +32,7 @@ type
muxerHandler*: MuxerHandler # triggered every time there is a new muxed connection created muxerHandler*: MuxerHandler # triggered every time there is a new muxed connection created
# muxer interface # muxer interface
method newStream*(m: Muxer, name: string = ""): Future[Connection] {.base, async, gcsafe.} = discard method newStream*(m: Muxer, name: string = "", lazy: bool = false): Future[Connection] {.base, async, gcsafe.} = discard
method close*(m: Muxer) {.base, async, gcsafe.} = discard method close*(m: Muxer) {.base, async, gcsafe.} = discard
method handle*(m: Muxer): Future[void] {.base, async, gcsafe.} = discard method handle*(m: Muxer): Future[void] {.base, async, gcsafe.} = discard

View File

@ -116,7 +116,7 @@ suite "Mplex":
check: check:
waitFor(testDecodeHeader()) == true waitFor(testDecodeHeader()) == true
test "e2e - read/write receiver": test "e2e - read/write receiver":
proc testNewStream(): Future[bool] {.async.} = proc testNewStream(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
@ -138,9 +138,42 @@ suite "Mplex":
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn) let mplexDial = newMplex(conn)
let stream = await mplexDial.newStream() let stream = await mplexDial.newStream()
let openState = cast[LPChannel](stream.stream).isOpen
await stream.writeLp("Hello from stream!") await stream.writeLp("Hello from stream!")
await conn.close() await conn.close()
check openState # not lazy
result = true
check:
waitFor(testNewStream()) == true
test "e2e - read/write receiver lazy":
proc testNewStream(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
proc connHandler(conn: Connection) {.async, gcsafe.} =
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
let msg = await stream.readLp()
check cast[string](msg) == "Hello from stream!"
await stream.close()
let mplexListen = newMplex(conn)
mplexListen.streamHandler = handleMplexListen
discard mplexListen.handle()
let transport1: TcpTransport = newTransport(TcpTransport)
discard await transport1.listen(ma, connHandler)
let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let stream = await mplexDial.newStream("", true)
let openState = cast[LPChannel](stream.stream).isOpen
await stream.writeLp("Hello from stream!")
await conn.close()
check not openState # assert lazy
result = true result = true
check: check: