Channel timeout (#278)

* add support for channel timeouts

* tests for channel timeout

* add timeouts to standard switch

* fix mplex init

* cleanup timer on stream close

* add comment for `isConnected`

* move cleanup event
This commit is contained in:
Dmitriy Ryajov 2020-07-17 12:44:41 -06:00 committed by GitHub
parent 0348773ec9
commit ba071cafa6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 261 additions and 119 deletions

View File

@ -172,7 +172,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
# a constructor for building different multiplexers under various connections
proc createMplex(conn: Connection): Muxer =
result = newMplex(conn)
result = Mplex.init(conn)
let mplexProvider = newMuxerProvider(createMplex, MplexCodec)
let transports = @[Transport(TcpTransport.init())]

View File

@ -11,6 +11,7 @@ import oids, deques
import chronos, chronicles, metrics
import types,
coder,
../muxer,
nimcrypto/utils,
../../stream/connection,
../../stream/bufferstream,
@ -45,6 +46,8 @@ logScope:
type
LPChannel* = ref object of BufferStream
id*: uint64 # channel id
timeout: Duration # channel timeout if no activity
activity: bool # reset every time data is sent or received
name*: string # name of the channel (for debugging)
conn*: Connection # wrapped connection used for writing
initiator*: bool # initiated remotely or locally flag
@ -54,6 +57,8 @@ type
msgCode*: MessageType # cached in/out message code
closeCode*: MessageType # cached in/out close code
resetCode*: MessageType # cached in/out reset code
timerFut: Future[void] # the current timer instance
timerTaskFut: Future[void] # the timeout monitor task
proc open*(s: LPChannel) {.async, gcsafe.}
@ -75,60 +80,6 @@ template withEOFExceptions(body: untyped): untyped =
except LPStreamIncompleteError as exc:
trace "incomplete message", exc = exc.msg
method reset*(s: LPChannel) {.base, async, gcsafe.}
method initStream*(s: LPChannel) =
if s.objName.len == 0:
s.objName = "LPChannel"
procCall BufferStream(s).initStream()
proc newChannel*(id: uint64,
conn: Connection,
initiator: bool,
name: string = "",
size: int = DefaultBufferSize,
lazy: bool = false): LPChannel =
result = LPChannel(id: id,
name: name,
conn: conn,
initiator: initiator,
msgCode: if initiator: MessageType.MsgOut else: MessageType.MsgIn,
closeCode: if initiator: MessageType.CloseOut else: MessageType.CloseIn,
resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn,
isLazy: lazy)
let chan = result
logScope:
id = chan.id
initiator = chan.initiator
name = chan.name
oid = $chan.oid
peer = $chan.conn.peerInfo
# stack = getStackTrace()
proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} =
try:
if chan.isLazy and not(chan.isOpen):
await chan.open()
# writes should happen in sequence
trace "sending data"
await conn.writeMsg(chan.id,
chan.msgCode,
data).wait(2.minutes) # write header
except CatchableError as exc:
trace "exception in lpchannel write handler", exc = exc.msg
await chan.reset()
raise exc
result.initBufferStream(writeHandler, size)
when chronicles.enabledLogLevel == LogLevel.TRACE:
result.name = if result.name.len > 0: result.name else: $result.oid
trace "created new lpchannel"
proc closeMessage(s: LPChannel) {.async.} =
logScope:
id = s.id
@ -187,6 +138,7 @@ proc closeRemote*(s: LPChannel) {.async.} =
# stack = getStackTrace()
trace "got EOF, closing channel"
try:
await s.drainBuffer()
s.isEof = true # set EOF immediately to prevent further reads
@ -195,6 +147,10 @@ proc closeRemote*(s: LPChannel) {.async.} =
# call to avoid leaks
await procCall BufferStream(s).close() # close parent bufferstream
trace "channel closed on EOF"
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception closing remote channel", exc = exc.msg
method closed*(s: LPChannel): bool =
## this emulates half-closed behavior
@ -223,6 +179,7 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} =
# optimistic
asyncCheck s.resetMessage()
try:
# drain the buffer before closing
await s.drainBuffer()
await procCall BufferStream(s).close()
@ -230,6 +187,11 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} =
s.isEof = true
s.closedLocal = true
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in reset", exc = exc.msg
trace "channel reset"
method close*(s: LPChannel) {.async, gcsafe.} =
@ -263,3 +225,123 @@ method close*(s: LPChannel) {.async, gcsafe.} =
s.closedLocal = true
asyncCheck closeInternal()
proc cleanupOnClose(s: LPChannel) {.async.} =
## Wait for this stream's close event, then
## clean up the inactivity timer and other resources.
##
## The timer future is only cancelled when it exists
## and has not completed yet.
##
await s.closeEvent.wait()
# a timer that is nil or already finished needs no cancellation
if not(isNil(s.timerFut)) and
not(s.timerFut.finished):
s.timerFut.cancel()
# wait for the task stored in `timerTaskFut` to complete
await s.timerTaskFut
proc timeoutMonitor(s: LPChannel) {.async.} =
  ## Watch the channel for inactivity.
  ##
  ## Sleeps for `s.timeout` in a loop. If a whole sleep period
  ## ends without the `activity` flag having been raised, neither
  ## incoming nor outgoing traffic was seen and the channel is
  ## reset. The monitor returns without resetting once the
  ## channel is closed or at EOF, and it does nothing when a
  ## timer future is already installed on the channel.
  ##
  ## Catchable errors raised while sleeping or resetting are
  ## traced and not propagated.
  ##
  if not s.timerFut.isNil:
    # a timer was already started for this channel
    return

  try:
    while true:
      s.timerFut = sleepAsync(s.timeout)
      await s.timerFut

      if s.closed or s.atEof:
        return

      if not s.activity:
        # a full period elapsed without traffic
        break

      # traffic was seen: clear the flag and wait another period
      s.activity = false

    # reset channel on inactivity timeout
    trace "channel timed out, resetting"
    await s.reset()
  except CatchableError as exc:
    trace "exception in timeout", exc = exc.msg
method initStream*(s: LPChannel) =
  ## Assign the default object name `LPChannel` when none has
  ## been set, then run the `BufferStream` initializer.
  if s.objName == "":
    s.objName = "LPChannel"

  procCall BufferStream(s).initStream()
method readOnce*(s: LPChannel,
                 pbytes: pointer,
                 nbytes: int):
                 Future[int] =
  ## Raise the channel's `activity` flag, then delegate the
  ## read to the `BufferStream` implementation.
  s.activity = true
  result = procCall BufferStream(s).readOnce(pbytes, nbytes)
method write*(s: LPChannel, msg: seq[byte]): Future[void] =
  ## Raise the channel's `activity` flag, then delegate the
  ## write to the `BufferStream` implementation.
  s.activity = true
  result = procCall BufferStream(s).write(msg)
proc init*(
L: type LPChannel,
id: uint64,
conn: Connection,
initiator: bool,
name: string = "",
size: int = DefaultBufferSize,
lazy: bool = false,
timeout: Duration = DefaultChanTimeout): LPChannel =
## Create a new `LPChannel` with the given `id` on top of `conn`.
##
## `initiator` selects the cached message, close and reset codes
## (`*Out` when true, `*In` otherwise) and the stream direction.
## `name`, `lazy` and `timeout` are stored on the channel and
## `size` is forwarded to `initBufferStream`.
##
## Besides building the object, this starts the close-cleanup
## task and the inactivity timeout monitor for the channel.
##
let chann = L(
id: id,
name: name,
conn: conn,
initiator: initiator,
isLazy: lazy,
timeout: timeout,
msgCode: if initiator: MessageType.MsgOut else: MessageType.MsgIn,
closeCode: if initiator: MessageType.CloseOut else: MessageType.CloseIn,
resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn,
dir: if initiator: Direction.Out else: Direction.In)
logScope:
id = chann.id
initiator = chann.initiator
name = chann.name
oid = $chann.oid
peer = $chann.conn.peerInfo
# stack = getStackTrace()
# handler installed on the buffer stream; sends the data over `conn`
proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
try:
# a lazy channel is opened when it is first written to
if chann.isLazy and not(chann.isOpen):
await chann.open()
# writes should happen in sequence
trace "sending data"
await conn.writeMsg(chann.id,
chann.msgCode,
data)
# on failure the channel is reset before the error is re-raised
except CatchableError as exc:
trace "exception in lpchannel write handler", exc = exc.msg
await chann.reset()
raise exc
chann.initBufferStream(writeHandler, size)
# in TRACE builds fall back to the oid when no name was given
when chronicles.enabledLogLevel == LogLevel.TRACE:
chann.name = if chann.name.len > 0: chann.name else: $chann.oid
# launch task to cancel and cleanup
# timer on stream close
asyncCheck chann.cleanupOnClose()
# keep the monitor's future so the cleanup task can await it
chann.timerTaskFut = chann.timeoutMonitor()
trace "created new lpchannel"
return chann

View File

@ -18,6 +18,8 @@ import ../muxer,
types,
lpchannel
export muxer
logScope:
topics = "mplex"
@ -29,8 +31,9 @@ type
local: Table[uint64, LPChannel]
currentId*: uint64
maxChannels*: uint64
inChannTimeout: Duration
outChannTimeout: Duration
isClosed: bool
when chronicles.enabledLogLevel == LogLevel.TRACE:
oid*: Oid
proc getChannelList(m: Mplex, initiator: bool): var Table[uint64, LPChannel] =
@ -45,7 +48,8 @@ proc newStreamInternal*(m: Mplex,
initiator: bool = true,
chanId: uint64 = 0,
name: string = "",
lazy: bool = false):
lazy: bool = false,
timeout: Duration):
Future[LPChannel] {.async, gcsafe.} =
## create new channel/stream
##
@ -57,11 +61,13 @@ proc newStreamInternal*(m: Mplex,
initiator = initiator,
name = name,
oid = $m.oid
result = newChannel(id,
result = LPChannel.init(
id,
m.connection,
initiator,
name,
lazy = lazy)
lazy = lazy,
timeout = timeout)
result.peerInfo = m.connection.peerInfo
result.observedAddr = m.connection.observedAddr
@ -142,7 +148,11 @@ method handle*(m: Mplex) {.async, gcsafe.} =
case msgType:
of MessageType.New:
let name = string.fromBytes(data)
channel = await m.newStreamInternal(false, id, name)
channel = await m.newStreamInternal(
false,
id,
name,
timeout = m.outChannTimeout)
trace "created channel", name = channel.name,
oid = $channel.oid
@ -189,21 +199,24 @@ method handle*(m: Mplex) {.async, gcsafe.} =
except CatchableError as exc:
trace "Exception occurred", exception = exc.msg, oid = $m.oid
proc newMplex*(conn: Connection,
maxChanns: uint = MaxChannels): Mplex =
new result
result.connection = conn
result.maxChannels = maxChanns
result.remote = initTable[uint64, LPChannel]()
result.local = initTable[uint64, LPChannel]()
when chronicles.enabledLogLevel == LogLevel.TRACE:
result.oid = genOid()
proc init*(M: type Mplex,
           conn: Connection,
           maxChanns: uint = MaxChannels,
           inTimeout, outTimeout: Duration = DefaultChanTimeout): Mplex =
  ## Build an `Mplex` muxer over `conn`.
  ##
  ## `inTimeout` is stored as `inChannTimeout`, which `newStream`
  ## passes to the channels it creates. `outTimeout` is stored as
  ## `outChannTimeout`, which `handle` passes to channels created
  ## for `MessageType.New`. Both channel tables start empty and a
  ## fresh oid is generated.
  ##
  result = M(
    connection: conn,
    maxChannels: maxChanns,
    inChannTimeout: inTimeout,
    outChannTimeout: outTimeout,
    remote: initTable[uint64, LPChannel](),
    local: initTable[uint64, LPChannel](),
    oid: genOid())
method newStream*(m: Mplex,
name: string = "",
lazy: bool = false): Future[Connection] {.async, gcsafe.} =
let channel = await m.newStreamInternal(lazy = lazy)
let channel = await m.newStreamInternal(
lazy = lazy, timeout = m.inChannTimeout)
if not lazy:
await channel.open()

View File

@ -15,6 +15,9 @@ import ../protocols/protocol,
logScope:
topics = "muxer"
const
# default channel timeout, used when the caller does not supply one
DefaultChanTimeout* = 1.minutes
type
StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe.}
MuxerHandler* = proc(muxer: Muxer): Future[void] {.gcsafe.}

View File

@ -37,9 +37,14 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
sign = libp2p_pubsub_sign,
transportFlags: set[ServerFlags] = {},
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
rng = newRng()): Switch =
rng = newRng(),
inTimeout: Duration = 1.minutes,
outTimeout: Duration = 5.minutes): Switch =
proc createMplex(conn: Connection): Muxer =
newMplex(conn)
Mplex.init(
conn,
inTimeout = inTimeout,
outTimeout = outTimeout)
if rng == nil: # newRng could fail
raise (ref CatchableError)(msg: "Cannot initialize RNG")

View File

@ -71,6 +71,10 @@ proc cleanupPubSubPeer(s: Switch, conn: Connection) {.async.} =
await s.pubSub.get().unsubscribePeer(conn.peerInfo)
proc isConnected*(s: Switch, peer: PeerInfo): bool =
## returns true if the peer has one or more
## associated connections (sockets)
##
peer.peerId in s.connManager
proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =

View File

@ -71,7 +71,11 @@ proc testPubSubDaemonPublish(gossip: bool = false,
let daemonNode = await newDaemonApi(flags)
let daemonPeer = await daemonNode.identity()
let nativeNode = newStandardSwitch(gossip = gossip, secureManagers = [SecureProtocol.Noise])
let nativeNode = newStandardSwitch(
gossip = gossip,
secureManagers = [SecureProtocol.Noise],
outTimeout = 5.minutes)
let awaiters = nativeNode.start()
let nativePeer = nativeNode.peerInfo
@ -122,7 +126,11 @@ proc testPubSubNodePublish(gossip: bool = false,
let daemonNode = await newDaemonApi(flags)
let daemonPeer = await daemonNode.identity()
let nativeNode = newStandardSwitch(gossip = gossip, secureManagers = [SecureProtocol.Secio])
let nativeNode = newStandardSwitch(
gossip = gossip,
secureManagers = [SecureProtocol.Secio],
outTimeout = 5.minutes)
let awaiters = nativeNode.start()
let nativePeer = nativeNode.peerInfo
@ -175,7 +183,10 @@ suite "Interop":
proc runTests(): Future[bool] {.async.} =
var protos = @["/test-stream"]
let nativeNode = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
let nativeNode = newStandardSwitch(
secureManagers = [SecureProtocol.Noise],
outTimeout = 5.minutes)
let awaiters = await nativeNode.start()
let daemonNode = await newDaemonApi()
let daemonPeer = await daemonNode.identity()
@ -228,7 +239,10 @@ suite "Interop":
var expect = newString(len(buffer) - 2)
copyMem(addr expect[0], addr buffer.buffer[0], len(expect))
let nativeNode = newStandardSwitch(secureManagers = [SecureProtocol.Secio])
let nativeNode = newStandardSwitch(
secureManagers = [SecureProtocol.Secio],
outTimeout = 5.minutes)
let awaiters = await nativeNode.start()
let daemonNode = await newDaemonApi()
@ -275,7 +289,9 @@ suite "Interop":
proto.handler = nativeHandler
proto.codec = protos[0] # codec
let nativeNode = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
let nativeNode = newStandardSwitch(
secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes)
nativeNode.mount(proto)
let awaiters = await nativeNode.start()
@ -317,7 +333,9 @@ suite "Interop":
proto.handler = nativeHandler
proto.codec = protos[0] # codec
let nativeNode = newStandardSwitch(secureManagers = [SecureProtocol.Secio])
let nativeNode = newStandardSwitch(
secureManagers = [SecureProtocol.Secio], outTimeout = 5.minutes)
nativeNode.mount(proto)
let awaiters = await nativeNode.start()
@ -366,7 +384,9 @@ suite "Interop":
proto.handler = nativeHandler
proto.codec = protos[0] # codec
let nativeNode = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
let nativeNode = newStandardSwitch(
secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes)
nativeNode.mount(proto)
let awaiters = await nativeNode.start()

View File

@ -117,7 +117,7 @@ suite "Mplex":
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
conn = newBufferStream(writeHandler)
chann = newChannel(1, conn, true)
chann = LPChannel.init(1, conn, true)
await chann.close()
try:
await chann.write("Hello")
@ -137,7 +137,7 @@ suite "Mplex":
proc (data: seq[byte]) {.gcsafe, async.} =
result = nil
)
chann = newChannel(1, conn, true)
chann = LPChannel.init(1, conn, true)
await chann.pushTo(("Hello!").toBytes)
let closeFut = chann.closeRemote()
@ -161,7 +161,7 @@ suite "Mplex":
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
conn = newBufferStream(writeHandler)
chann = newChannel(1, conn, true)
chann = LPChannel.init(1, conn, true)
await chann.closeRemote()
try:
await chann.pushTo(@[byte(1)])
@ -179,7 +179,7 @@ suite "Mplex":
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
conn = newBufferStream(writeHandler)
chann = newChannel(1, conn, true)
chann = LPChannel.init(1, conn, true)
await chann.reset()
var data = newSeq[byte](1)
@ -199,7 +199,7 @@ suite "Mplex":
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
conn = newBufferStream(writeHandler)
chann = newChannel(1, conn, true)
chann = LPChannel.init(1, conn, true)
await chann.reset()
try:
await chann.write(("Hello!").toBytes)
@ -211,13 +211,28 @@ suite "Mplex":
check:
waitFor(testResetWrite()) == true
test "timeout, channel should reset":
  proc testTimeout(): Future[bool] {.async.} =
    proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
    let
      conn = newBufferStream(writeHandler)
      chann = LPChannel.init(
        1, conn, true, timeout = 100.millis)

    # Nothing is read from or written to the channel, so the
    # inactivity monitor is expected to reset it. The wait is
    # bounded so that a regression fails the test instead of
    # hanging the whole suite.
    await wait(chann.closeEvent.wait(), 5.seconds)
    await conn.close()
    result = true

  check:
    waitFor(testTimeout())
test "e2e - read/write receiver":
proc testNewStream() {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var done = newFuture[void]()
proc connHandler(conn: Connection) {.async, gcsafe.} =
let mplexListen = newMplex(conn)
let mplexListen = Mplex.init(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
let msg = await stream.readLp(1024)
@ -234,7 +249,7 @@ suite "Mplex":
let transport2: TcpTransport = TcpTransport.init()
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let mplexDial = Mplex.init(conn)
let mplexDialFut = mplexDial.handle()
let stream = await mplexDial.newStream()
await stream.writeLp("HELLO")
@ -257,7 +272,7 @@ suite "Mplex":
var done = newFuture[void]()
proc connHandler(conn: Connection) {.async, gcsafe.} =
let mplexListen = newMplex(conn)
let mplexListen = Mplex.init(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
let msg = await stream.readLp(1024)
@ -274,7 +289,7 @@ suite "Mplex":
let transport2: TcpTransport = TcpTransport.init()
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let mplexDial = Mplex.init(conn)
let stream = await mplexDial.newStream(lazy = true)
let mplexDialFut = mplexDial.handle()
check not LPChannel(stream).isOpen # assert lazy
@ -303,7 +318,7 @@ suite "Mplex":
bigseq.add(uint8(rand(uint('A')..uint('z'))))
proc connHandler(conn: Connection) {.async, gcsafe.} =
let mplexListen = newMplex(conn)
let mplexListen = Mplex.init(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
let msg = await stream.readLp(MaxMsgSize)
@ -321,7 +336,7 @@ suite "Mplex":
let transport2: TcpTransport = TcpTransport.init()
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let mplexDial = Mplex.init(conn)
let mplexDialFut = mplexDial.handle()
let stream = await mplexDial.newStream()
@ -347,7 +362,7 @@ suite "Mplex":
let done = newFuture[void]()
proc connHandler(conn: Connection) {.async, gcsafe.} =
let mplexListen = newMplex(conn)
let mplexListen = Mplex.init(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
await stream.writeLp("Hello from stream!")
@ -363,7 +378,7 @@ suite "Mplex":
let transport2: TcpTransport = TcpTransport.init()
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let mplexDial = Mplex.init(conn)
let mplexDialFut = mplexDial.handle()
let stream = await mplexDial.newStream("DIALER")
let msg = string.fromBytes(await stream.readLp(1024))
@ -387,7 +402,7 @@ suite "Mplex":
let done = newFuture[void]()
proc connHandler(conn: Connection) {.async, gcsafe.} =
var count = 1
let mplexListen = newMplex(conn)
let mplexListen = Mplex.init(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
let msg = await stream.readLp(1024)
@ -406,7 +421,7 @@ suite "Mplex":
let transport2: TcpTransport = TcpTransport.init()
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let mplexDial = Mplex.init(conn)
# TODO: Reenable once half-closed is working properly
let mplexDialFut = mplexDial.handle()
for i in 1..10:
@ -431,7 +446,7 @@ suite "Mplex":
let done = newFuture[void]()
proc connHandler(conn: Connection) {.async, gcsafe.} =
var count = 1
let mplexListen = newMplex(conn)
let mplexListen = Mplex.init(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
let msg = await stream.readLp(1024)
@ -451,7 +466,7 @@ suite "Mplex":
let transport2: TcpTransport = TcpTransport.init()
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let mplexDial = Mplex.init(conn)
let mplexDialFut = mplexDial.handle()
for i in 1..10:
let stream = await mplexDial.newStream("dialer stream")
@ -477,7 +492,7 @@ suite "Mplex":
var complete = newFuture[void]()
const MsgSize = 1024
proc connHandler(conn: Connection) {.async, gcsafe.} =
let mplexListen = newMplex(conn)
let mplexListen = Mplex.init(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
let msg = await stream.readLp(MsgSize)
@ -494,7 +509,7 @@ suite "Mplex":
let transport2: TcpTransport = TcpTransport.init()
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let mplexDial = Mplex.init(conn)
let mplexDialFut = mplexDial.handle()
let stream = await mplexDial.newStream()
var bigseq = newSeqOfCap[uint8](MaxMsgSize + 1)
@ -547,7 +562,7 @@ suite "Mplex":
var complete = newFuture[void]()
const MsgSize = 512
proc connHandler(conn: Connection) {.async, gcsafe.} =
let mplexListen = newMplex(conn)
let mplexListen = Mplex.init(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
let msg = await stream.readLp(MsgSize)
@ -564,7 +579,7 @@ suite "Mplex":
let transport2: TcpTransport = TcpTransport.init()
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let mplexDial = Mplex.init(conn)
let stream = await mplexDial.newStream()
let mplexDialFut = mplexDial.handle()
var bigseq = newSeqOfCap[uint8](MsgSize + 1)

View File

@ -54,7 +54,7 @@ proc createSwitch(ma: MultiAddress; outgoing: bool): (Switch, PeerInfo) =
let identify = newIdentify(peerInfo)
proc createMplex(conn: Connection): Muxer =
result = newMplex(conn)
result = Mplex.init(conn)
let mplexProvider = newMuxerProvider(createMplex, MplexCodec)
let transports = @[Transport(TcpTransport.init())]

View File

@ -114,14 +114,14 @@ suite "Switch":
await sleepAsync(2.seconds) # wait a little for cleanup to happen
var bufferTracker = getTracker(BufferStreamTrackerName)
echo bufferTracker.dump()
# echo bufferTracker.dump()
# plus 4 for the pubsub streams
check (BufferStreamTracker(bufferTracker).opened ==
(BufferStreamTracker(bufferTracker).closed + 4.uint64))
var connTracker = getTracker(ConnectionTrackerName)
echo connTracker.dump()
# echo connTracker.dump()
# plus 8 is for the secured connection and the socket
# and the pubsub streams that won't clean up until