testing mplex

This commit is contained in:
Dmitriy Ryajov 2019-09-06 00:51:19 -06:00
parent b7f999d316
commit 8338a16aab
4 changed files with 40 additions and 23 deletions

View File

@ -7,10 +7,12 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import chronos import chronos, strformat
import ../../stream/bufferstream, import ../../stream/bufferstream,
../../stream/lpstream, ../../stream/lpstream,
types, coder, ../../connection ../../connection,
nimcrypto/utils,
types, coder
type type
Channel* = ref object of BufferStream Channel* = ref object of BufferStream
@ -45,7 +47,7 @@ proc newChannel*(id: int,
result.initBufferStream(writeHandler, size) result.initBufferStream(writeHandler, size)
proc closeMessage(s: Channel) {.async, gcsafe.} = proc closeMessage(s: Channel) {.async, gcsafe.} =
await s.conn.writeHeader(s.id, s.closeCode, 0) # write header await s.conn.writeHeader(s.id, s.closeCode) # write header
proc closed*(s: Channel): bool = proc closed*(s: Channel): bool =
s.closedLocal s.closedLocal
@ -58,7 +60,7 @@ method close*(s: Channel) {.async, gcsafe.} =
await s.closeMessage() await s.closeMessage()
proc resetMessage(s: Channel) {.async, gcsafe.} = proc resetMessage(s: Channel) {.async, gcsafe.} =
await s.conn.writeHeader(s.id, s.resetCode, 0) await s.conn.writeHeader(s.id, s.resetCode)
proc resetByRemote*(s: Channel) {.async, gcsafe.} = proc resetByRemote*(s: Channel) {.async, gcsafe.} =
await allFutures(s.close(), s.closedByRemote()) await allFutures(s.close(), s.closedByRemote())

View File

@ -8,9 +8,12 @@
## those terms. ## those terms.
import chronos import chronos
import ../../connection, ../../varint, import types,
../../vbuffer, types, ../../connection,
../../stream/lpstream ../../varint,
../../vbuffer,
../../stream/lpstream,
nimcrypto/utils
proc readHeader*(conn: Connection): Future[(uint, MessageType)] {.async, gcsafe.} = proc readHeader*(conn: Connection): Future[(uint, MessageType)] {.async, gcsafe.} =
var var
@ -29,11 +32,12 @@ proc readHeader*(conn: Connection): Future[(uint, MessageType)] {.async, gcsafe.
return return
except LPStreamIncompleteError: except LPStreamIncompleteError:
buffer.setLen(0) buffer.setLen(0)
raise newException(CatchableError, "Could not decode header!")
proc writeHeader*(conn: Connection, proc writeHeader*(conn: Connection,
id: int, id: int,
msgType: MessageType, msgType: MessageType,
size: int) {.async, gcsafe.} = size: int = 0) {.async, gcsafe.} =
## write lenght prefixed ## write lenght prefixed
var buf = initVBuffer() var buf = initVBuffer()
buf.writeVarint(LPSomeUVarint(id.uint shl 3 or msgType.uint)) buf.writeVarint(LPSomeUVarint(id.uint shl 3 or msgType.uint))

View File

@ -16,13 +16,16 @@
## This still needs to be implemented properly - I'm leaving it ## This still needs to be implemented properly - I'm leaving it
## here to not forget that this needs to be fixed ASAP. ## here to not forget that this needs to be fixed ASAP.
import tables, sequtils import tables, sequtils, strformat
import chronos import chronos
import ../../varint, ../../connection, import coder, types, channel,
../../vbuffer, ../../protocol, ../../varint,
../../connection,
../../vbuffer,
../../protocols/protocol,
../../stream/bufferstream, ../../stream/bufferstream,
../../stream/lpstream, ../muxer, ../../stream/lpstream,
coder, types, channel ../muxer
type type
Mplex* = ref object of Muxer Mplex* = ref object of Muxer
@ -90,7 +93,7 @@ proc newMplex*(conn: Connection,
method newStream*(m: Mplex): Future[Connection] {.async, gcsafe.} = method newStream*(m: Mplex): Future[Connection] {.async, gcsafe.} =
let channel = await m.newStreamInternal() let channel = await m.newStreamInternal()
await m.connection.writeHeader(channel.id, MessageType.New, 0) await m.connection.writeHeader(channel.id, MessageType.New)
result = newConnection(channel) result = newConnection(channel)
method close*(m: Mplex) {.async, gcsafe.} = method close*(m: Mplex) {.async, gcsafe.} =

View File

@ -8,7 +8,8 @@
## those terms. ## those terms.
import chronos import chronos
import ../protocol, ../connection import ../protocols/protocol,
../connection
type type
StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe.} StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe.}
@ -21,21 +22,28 @@ type
# this wraps a creator proc that knows how to make muxers # this wraps a creator proc that knows how to make muxers
MuxerProvider* = ref object of LPProtocol MuxerProvider* = ref object of LPProtocol
newMuxer*: MuxerCreator newMuxer*: MuxerCreator
streamHandler*: StreamHandler
method newStream*(m: Muxer): Future[Connection] {.base, async, gcsafe.} = discard
method close*(m: Muxer) {.base, async, gcsafe.} = discard
method handle*(m: Muxer): Future[void] {.base, async, gcsafe.} = discard
method `=streamHandler`*(m: Muxer, handler: StreamHandler) {.base, gcsafe.} =
m.streamHandler = handler
proc newMuxerProvider*(creator: MuxerCreator, codec: string): MuxerProvider {.gcsafe.} = proc newMuxerProvider*(creator: MuxerCreator, codec: string): MuxerProvider {.gcsafe.} =
new result new result
result.newMuxer = creator result.newMuxer = creator
result.codec = codec result.codec = codec
result.init()
method `=streamHandler`*(m: MuxerProvider, handler: StreamHandler) {.base, gcsafe.} =
m.streamHandler = handler
method init(c: MuxerProvider) = method init(c: MuxerProvider) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let muxer = c.newMuxer(conn) let muxer = c.newMuxer(conn)
if not isNil(c.streamHandler):
muxer.streamHandler = c.streamHandler
await muxer.handle()
c.handler = handler c.handler = handler
method newStream*(m: Muxer): Future[Connection] {.base, async, gcsafe.} = discard
method close*(m: Muxer) {.base, async, gcsafe.} = discard
method handle*(m: Muxer): Future[void] {.base, async, gcsafe.} = discard
method `=streamHandler`*(m: Muxer, handler: StreamHandler) {.base, gcsafe.} =
m.streamHandler = handler