mplex testing/interop

This commit is contained in:
Dmitriy Ryajov 2019-09-06 15:27:55 -06:00
parent 100f6220b3
commit e5b782f094
5 changed files with 47 additions and 21 deletions

View File

@ -14,9 +14,12 @@ import ../../stream/bufferstream,
nimcrypto/utils, nimcrypto/utils,
types, coder types, coder
const DefaultChannelSize* = DefaultBufferSize * 64 # 64kb
type type
Channel* = ref object of BufferStream Channel* = ref object of BufferStream
id*: int id*: int
name*: string
conn*: Connection conn*: Connection
initiator*: bool initiator*: bool
isReset*: bool isReset*: bool
@ -30,9 +33,11 @@ type
proc newChannel*(id: int, proc newChannel*(id: int,
conn: Connection, conn: Connection,
initiator: bool, initiator: bool,
size: int = MaxMsgSize): Channel = name: string = "",
size: int = DefaultChannelSize): Channel =
new result new result
result.id = id result.id = id
result.name = name
result.conn = conn result.conn = conn
result.initiator = initiator result.initiator = initiator
result.msgCode = if initiator: MessageType.MsgOut else: MessageType.MsgIn result.msgCode = if initiator: MessageType.MsgOut else: MessageType.MsgIn
@ -41,7 +46,7 @@ proc newChannel*(id: int,
let chan = result let chan = result
proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} = proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} =
await conn.writeHeader(id, chan.msgCode, data.len) # write header await conn.writeHeader(chan.id, chan.msgCode, data.len) # write header
await conn.write(data) await conn.write(data)
result.initBufferStream(writeHandler, size) result.initBufferStream(writeHandler, size)

View File

@ -15,6 +15,9 @@ import types,
../../stream/lpstream, ../../stream/lpstream,
nimcrypto/utils nimcrypto/utils
type
Phase = enum Header, Size
proc readHeader*(conn: Connection): Future[(uint, MessageType)] {.async, gcsafe.} = proc readHeader*(conn: Connection): Future[(uint, MessageType)] {.async, gcsafe.} =
var var
header: uint header: uint
@ -26,13 +29,14 @@ proc readHeader*(conn: Connection): Future[(uint, MessageType)] {.async, gcsafe.
await conn.readExactly(addr buffer[i], 1) await conn.readExactly(addr buffer[i], 1)
res = LP.getUVarint(buffer.toOpenArray(0, i), length, header) res = LP.getUVarint(buffer.toOpenArray(0, i), length, header)
if res == VarintStatus.Success: if res == VarintStatus.Success:
let (id, msg) = (header shr 3, MessageType(header and 0x7))
return (header shr 3, MessageType(header and 0x7)) return (header shr 3, MessageType(header and 0x7))
if res != VarintStatus.Success: if res != VarintStatus.Success:
buffer.setLen(0) buffer.setLen(0)
return return
except LPStreamIncompleteError: except TransportIncompleteError:
buffer.setLen(0) buffer.setLen(0)
raise newException(CatchableError, "Could not decode header!") raise newLPStreamIncompleteError()
proc writeHeader*(conn: Connection, proc writeHeader*(conn: Connection,
id: int, id: int,
@ -40,8 +44,7 @@ proc writeHeader*(conn: Connection,
size: int = 0) {.async, gcsafe.} = size: int = 0) {.async, gcsafe.} =
## write lenght prefixed ## write lenght prefixed
var buf = initVBuffer() var buf = initVBuffer()
buf.writeVarint(LPSomeUVarint(id.uint shl 3 or msgType.uint)) buf.writeVarint((id.uint shl 3) or msgType.uint)
if size > 0: buf.writeVarint(size.uint) # size should be always sent
buf.writeVarint(LPSomeUVarint(size.uint))
buf.finish() buf.finish()
result = conn.write(buf.buffer) await conn.write(buf.buffer)

View File

@ -16,7 +16,7 @@
## This still needs to be implemented properly - I'm leaving it ## This still needs to be implemented properly - I'm leaving it
## here to not forget that this needs to be fixed ASAP. ## here to not forget that this needs to be fixed ASAP.
import tables, sequtils, strformat import tables, sequtils, strformat, options
import chronos import chronos
import coder, types, channel, import coder, types, channel,
../../varint, ../../varint,
@ -34,6 +34,9 @@ type
currentId*: int currentId*: int
maxChannels*: uint maxChannels*: uint
proc newMplexNoSuchChannel(id: int, msgType: MessageType): ref MplexNoSuchChannel =
result = newException(MplexNoSuchChannel, &"No such channel id {$id} and message {$msgType}")
proc newMplexUnknownMsgError(): ref MplexUnknownMsgError = proc newMplexUnknownMsgError(): ref MplexUnknownMsgError =
result = newException(MplexUnknownMsgError, "Unknown mplex message type") result = newException(MplexUnknownMsgError, "Unknown mplex message type")
@ -45,11 +48,12 @@ proc getChannelList(m: Mplex, initiator: bool): var Table[int, Channel] =
proc newStreamInternal*(m: Mplex, proc newStreamInternal*(m: Mplex,
initiator: bool = true, initiator: bool = true,
chanId: int): chanId: int,
name: string = ""):
Future[Channel] {.async, gcsafe.} = Future[Channel] {.async, gcsafe.} =
## create new channel/stream ## create new channel/stream
let id = if initiator: m.currentId.inc(); m.currentId else: chanId let id = if initiator: m.currentId.inc(); m.currentId else: chanId
result = newChannel(id, m.connection, initiator) result = newChannel(id, m.connection, initiator, name)
m.getChannelList(initiator)[id] = result m.getChannelList(initiator)[id] = result
proc newStreamInternal*(m: Mplex): Future[Channel] {.gcsafe.} = proc newStreamInternal*(m: Mplex): Future[Channel] {.gcsafe.} =
@ -60,26 +64,36 @@ method handle*(m: Mplex): Future[void] {.async, gcsafe.} =
while not m.connection.closed: while not m.connection.closed:
let (id, msgType) = await m.connection.readHeader() let (id, msgType) = await m.connection.readHeader()
let initiator = bool(ord(msgType) and 1) let initiator = bool(ord(msgType) and 1)
var channel: Channel
if MessageType(msgType) != MessageType.New:
let channels = m.getChannelList(initiator)
if not channels.contains(id.int):
raise newMplexNoSuchChannel(id.int, msgType)
channel = channels[id.int]
case msgType: case msgType:
of MessageType.New: of MessageType.New:
let channel = await m.newStreamInternal(false, id.int) var name: seq[byte]
try:
name = await m.connection.readLp()
except LPStreamIncompleteError as exc:
echo exc.msg
except Exception as exc:
echo exc.msg
raise
let channel = await m.newStreamInternal(false, id.int, cast[string](name))
if not isNil(m.streamHandler): if not isNil(m.streamHandler):
channel.handlerFuture = m.streamHandler(newConnection(channel)) channel.handlerFuture = m.streamHandler(newConnection(channel))
of MessageType.MsgIn, MessageType.MsgOut: of MessageType.MsgIn, MessageType.MsgOut:
let channel = m.getChannelList(initiator)[id.int]
let msg = await m.connection.readLp() let msg = await m.connection.readLp()
await channel.pushTo(msg) await channel.pushTo(msg)
of MessageType.CloseIn, MessageType.CloseOut: of MessageType.CloseIn, MessageType.CloseOut:
let channel = m.getChannelList(initiator)[id.int]
await channel.closedByRemote() await channel.closedByRemote()
m.getChannelList(initiator).del(id.int) m.getChannelList(initiator).del(id.int)
of MessageType.ResetIn, MessageType.ResetOut: of MessageType.ResetIn, MessageType.ResetOut:
let channel = m.getChannelList(initiator)[id.int]
await channel.resetByRemote() await channel.resetByRemote()
else: raise newMplexUnknownMsgError() else: raise newMplexUnknownMsgError()
except Exception as exc:
#TODO: add proper loging
discard
finally: finally:
await m.connection.close() await m.connection.close()
@ -91,9 +105,11 @@ proc newMplex*(conn: Connection,
result.remote = initTable[int, Channel]() result.remote = initTable[int, Channel]()
result.local = initTable[int, Channel]() result.local = initTable[int, Channel]()
method newStream*(m: Mplex): Future[Connection] {.async, gcsafe.} = method newStream*(m: Mplex, name: string = ""): Future[Connection] {.async, gcsafe.} =
let channel = await m.newStreamInternal() let channel = await m.newStreamInternal()
await m.connection.writeHeader(channel.id, MessageType.New) await m.connection.writeHeader(channel.id, MessageType.New, len(name))
if name.len > 0:
await m.connection.write(name)
result = newConnection(channel) result = newConnection(channel)
method close*(m: Mplex) {.async, gcsafe.} = method close*(m: Mplex) {.async, gcsafe.} =

View File

@ -17,6 +17,8 @@ const MaxReadWriteTime* = 5.seconds
type type
MplexUnknownMsgError* = object of CatchableError MplexUnknownMsgError* = object of CatchableError
MplexNoSuchChannel* = object of CatchableError
MessageType* {.pure.} = enum MessageType* {.pure.} = enum
New, New,
MsgIn, MsgIn,

View File

@ -24,7 +24,7 @@ type
newMuxer*: MuxerCreator newMuxer*: MuxerCreator
streamHandler*: StreamHandler streamHandler*: StreamHandler
method newStream*(m: Muxer): Future[Connection] {.base, async, gcsafe.} = discard method newStream*(m: Muxer, name: string = ""): Future[Connection] {.base, async, gcsafe.} = discard
method close*(m: Muxer) {.base, async, gcsafe.} = discard method close*(m: Muxer) {.base, async, gcsafe.} = discard
method handle*(m: Muxer): Future[void] {.base, async, gcsafe.} = discard method handle*(m: Muxer): Future[void] {.base, async, gcsafe.} = discard
method `=streamHandler`*(m: Muxer, handler: StreamHandler) {.base, gcsafe.} = method `=streamHandler`*(m: Muxer, handler: StreamHandler) {.base, gcsafe.} =