nimpretty

This commit is contained in:
Dmitriy Ryajov 2019-08-26 09:37:15 -06:00
parent ebab744106
commit d23398f498
7 changed files with 145 additions and 100 deletions

View File

@ -17,24 +17,33 @@ type
peerInfo*: PeerInfo peerInfo*: PeerInfo
stream: ReadWrite stream: ReadWrite
proc newConnection*(stream: ReadWrite): Connection = proc newConnection*(stream: ReadWrite): Connection =
## create a new Connection for the specified async reader/writer ## create a new Connection for the specified async reader/writer
new result new result
result.stream = stream result.stream = stream
method read*(s: Connection, n = -1): Future[seq[byte]] {.async.} = method read*(s: Connection, n = -1): Future[seq[byte]] {.async.} =
result = await s.stream.read(n) result = await s.stream.read(n)
method readExactly*(s: Connection, pbytes: pointer, nbytes: int): Future[void] {.async.} = method readExactly*(s: Connection,
pbytes: pointer,
nbytes: int): Future[void] {.async.} =
await s.stream.readExactly(pbytes, nbytes) await s.stream.readExactly(pbytes, nbytes)
method readLine*(s: Connection, limit = 0, sep = "\r\n"): Future[string] {.async.} = method readLine*(s: Connection,
limit = 0,
sep = "\r\n"): Future[string] {.async.} =
result = await s.stream.readLine(limit, sep) result = await s.stream.readLine(limit, sep)
method readOnce*(s: Connection, pbytes: pointer, nbytes: int): Future[int] {.async.} = method readOnce*(s: Connection,
pbytes: pointer,
nbytes: int): Future[int] {.async.} =
result = await s.stream.readOnce(pbytes, nbytes) result = await s.stream.readOnce(pbytes, nbytes)
method readUntil*(s: Connection, pbytes: pointer, nbytes: int, sep: seq[byte]): Future[int] {.async.} = method readUntil*(s: Connection,
pbytes: pointer,
nbytes: int,
sep: seq[byte]): Future[int] {.async.} =
result = await s.stream.readUntil(pbytes, nbytes, sep) result = await s.stream.readUntil(pbytes, nbytes, sep)
method write*(s: Connection, pbytes: pointer, nbytes: int) {.async.} = method write*(s: Connection, pbytes: pointer, nbytes: int) {.async.} =
@ -50,7 +59,7 @@ method close*(s: Connection) {.async.} =
await s.stream.close() await s.stream.close()
s.closed = true s.closed = true
proc readLp*(s: Connection): Future[seq[byte]] {.async.} = proc readLp*(s: Connection): Future[seq[byte]] {.async.} =
## read lenght prefixed msg ## read lenght prefixed msg
var var
size: uint size: uint
@ -79,11 +88,12 @@ proc writeLp*(s: Connection, msg: string | seq[byte]) {.async.} =
buf.finish() buf.finish()
result = s.write(buf.buffer) result = s.write(buf.buffer)
method getPeerInfo* (c: Connection): Future[PeerInfo] {.base, async.} = method getPeerInfo* (c: Connection): Future[PeerInfo] {.base, async.} =
## get up to date peer info ## get up to date peer info
## TODO: implement PeerInfo refresh over identify ## TODO: implement PeerInfo refresh over identify
discard discard
method getObservedAddrs(c: Connection): Future[seq[MultiAddress]] {.base, async.} = method getObservedAddrs(c: Connection): Future[seq[MultiAddress]] {.base,
async.} =
## get resolved multiaddresses for the connection ## get resolved multiaddresses for the connection
discard discard

View File

@ -17,7 +17,7 @@ const MultiCodec* = "\x13" & Codec & "\n"
const Na = "\x03na\n" const Na = "\x03na\n"
const Ls = "\x03ls\n" const Ls = "\x03ls\n"
type type
MultisteamSelectException = object of CatchableError MultisteamSelectException = object of CatchableError
Handler* = proc (conn: Connection, proto: string): Future[void] Handler* = proc (conn: Connection, proto: string): Future[void]
@ -40,12 +40,14 @@ proc newMultistream*(): MultisteamSelect =
result.ls = Ls result.ls = Ls
result.na = Na result.na = Na
proc select*(m: MultisteamSelect, conn: Connection, proto: string = ""): Future[bool] {.async.} = proc select*(m: MultisteamSelect,
conn: Connection,
proto: string = ""): Future[bool] {.async.} =
## select a remote protocol ## select a remote protocol
## TODO: select should support a list of protos to be selected ## TODO: select should support a list of protos to be selected
await conn.write(m.codec) # write handshake await conn.write(m.codec) # write handshake
if proto.len() > 0: if proto.len() > 0:
await conn.writeLp(proto) # select proto await conn.writeLp(proto) # select proto
var ms = cast[string](await conn.readLp()) var ms = cast[string](await conn.readLp())
@ -60,12 +62,13 @@ proc select*(m: MultisteamSelect, conn: Connection, proto: string = ""): Future[
ms.removeSuffix("\n") ms.removeSuffix("\n")
result = ms == proto result = ms == proto
proc list*(m: MultisteamSelect, conn: Connection): Future[seq[string]] {.async.} = proc list*(m: MultisteamSelect,
conn: Connection): Future[seq[string]] {.async.} =
## list remote protos requests on connection ## list remote protos requests on connection
if not (await m.select(conn)): if not (await m.select(conn)):
return return
await conn.write(m.ls) # send ls await conn.write(m.ls) # send ls
var list = newSeq[string]() var list = newSeq[string]()
let ms = cast[string](await conn.readLp()) let ms = cast[string](await conn.readLp())
@ -96,7 +99,7 @@ proc handle*(m: MultisteamSelect, conn: Connection) {.async.} =
for h in m.handlers: for h in m.handlers:
protos &= (h.proto & "\n") protos &= (h.proto & "\n")
await conn.writeLp(cast[seq[byte]](toSeq(protos.items))) await conn.writeLp(cast[seq[byte]](toSeq(protos.items)))
else: else:
for h in m.handlers: for h in m.handlers:
if (not isNil(h.match) and h.match(ms)) or ms == h.proto: if (not isNil(h.match) and h.match(ms)) or ms == h.proto:
await conn.writeLp(h.proto & "\n") await conn.writeLp(h.proto & "\n")
@ -104,11 +107,11 @@ proc handle*(m: MultisteamSelect, conn: Connection) {.async.} =
return return
await conn.write(m.na) await conn.write(m.na)
proc addHandler*(m: MultisteamSelect, proc addHandler*(m: MultisteamSelect,
proto: string, proto: string,
handler: Handler, handler: Handler,
matcher: Matcher = nil) = matcher: Matcher = nil) =
## register a handler for the protocol ## register a handler for the protocol
m.handlers.add(HandlerHolder(proto: proto, m.handlers.add(HandlerHolder(proto: proto,
handler: handler, handler: handler,
match: matcher)) match: matcher))

View File

@ -12,38 +12,40 @@ import chronos
type ReadWrite* = ref object of RootObj type ReadWrite* = ref object of RootObj
closed*: bool closed*: bool
method read*(s: ReadWrite, n = -1): Future[seq[byte]] method read*(s: ReadWrite, n = -1): Future[seq[byte]]
{.base, async.} = {.base, async.} =
discard discard
method readExactly*(s: ReadWrite, pbytes: pointer, nbytes: int): Future[void] method readExactly*(s: ReadWrite, pbytes: pointer, nbytes: int): Future[void]
{.base, async.} = {.base, async.} =
discard discard
method readLine*(s: ReadWrite, limit = 0, sep = "\r\n"): Future[string] method readLine*(s: ReadWrite, limit = 0, sep = "\r\n"): Future[string]
{.base, async.} = {.base, async.} =
discard discard
method readOnce*(s: ReadWrite, pbytes: pointer, nbytes: int): Future[int] method readOnce*(s: ReadWrite, pbytes: pointer, nbytes: int): Future[int]
{.base, async.} = {.base, async.} =
discard discard
method readUntil*(s: ReadWrite, pbytes: pointer, nbytes: int, sep: seq[byte]): Future[int] method readUntil*(s: ReadWrite,
{.base, async.} = pbytes: pointer, nbytes: int,
sep: seq[byte]): Future[int]
{.base, async.} =
discard discard
method write*(s: ReadWrite, pbytes: pointer, nbytes: int) method write*(s: ReadWrite, pbytes: pointer, nbytes: int)
{.base, async.} = {.base, async.} =
discard discard
method write*(s: ReadWrite, msg: string, msglen = -1) method write*(s: ReadWrite, msg: string, msglen = -1)
{.base, async.} = {.base, async.} =
discard discard
method write*(s: ReadWrite, msg: seq[byte], msglen = -1) method write*(s: ReadWrite, msg: seq[byte], msglen = -1)
{.base, async.} = {.base, async.} =
discard discard
method close*(s: ReadWrite) method close*(s: ReadWrite)
{.base, async.} = {.base, async.} =
discard discard

View File

@ -8,14 +8,17 @@
## those terms. ## those terms.
import chronos import chronos
import transport, wire, connection, multiaddress, connection, multicodec, chronosstream import transport, wire, connection,
multiaddress, connection,
multicodec, chronosstream
type TcpTransport* = ref object of Transport type TcpTransport* = ref object of Transport
server*: StreamServer server*: StreamServer
method connHandler*(t: Transport, method connHandler*(t: Transport,
server: StreamServer, server: StreamServer,
client: StreamTransport): Future[Connection] {.base, gcsafe, async.} = client: StreamTransport): Future[Connection]
{.base, gcsafe, async.} =
let conn: Connection = newConnection(newChronosStream(server, client)) let conn: Connection = newConnection(newChronosStream(server, client))
let handlerFut = if t.handler == nil: nil else: t.handler(conn) let handlerFut = if t.handler == nil: nil else: t.handler(conn)
let connHolder: ConnHolder = ConnHolder(connection: conn, let connHolder: ConnHolder = ConnHolder(connection: conn,
@ -38,7 +41,9 @@ method close*(t: TcpTransport): Future[void] {.async.} =
t.server.stop() t.server.stop()
await t.server.closeWait() await t.server.closeWait()
method listen*(t: TcpTransport, ma: MultiAddress, handler: ConnHandler): Future[void] {.async.} = method listen*(t: TcpTransport,
ma: MultiAddress,
handler: ConnHandler): Future[void] {.async.} =
await procCall Transport(t).listen(ma, handler) # call base await procCall Transport(t).listen(ma, handler) # call base
## listen on the transport ## listen on the transport
@ -50,7 +55,8 @@ method listen*(t: TcpTransport, ma: MultiAddress, handler: ConnHandler): Future[
server.start() server.start()
listenFuture.complete() listenFuture.complete()
method dial*(t: TcpTransport, address: MultiAddress): Future[Connection] {.async.} = method dial*(t: TcpTransport,
address: MultiAddress): Future[Connection] {.async.} =
## dial a peer ## dial a peer
let client: StreamTransport = await connect(address) let client: StreamTransport = await connect(address)
result = await t.connHandler(t.server, client) result = await t.connHandler(t.server, client)

View File

@ -23,11 +23,11 @@ type
handler*: ConnHandler handler*: ConnHandler
multicodec*: MultiCodec multicodec*: MultiCodec
method init*(t: Transport) {.base, error: "not implemented".} = method init*(t: Transport) {.base, error: "not implemented".} =
## perform protocol initialization ## perform protocol initialization
discard discard
proc newTransport*(t: typedesc[Transport]): t = proc newTransport*(t: typedesc[Transport]): t =
new result new result
result.init() result.init()
@ -37,16 +37,19 @@ method close*(t: Transport) {.base, async.} =
for c in t.connections: for c in t.connections:
await c.connection.close() await c.connection.close()
method listen*(t: Transport, ma: MultiAddress, handler: ConnHandler) {.base, async.} = method listen*(t: Transport,
ma: MultiAddress,
handler: ConnHandler) {.base, async.} =
## listen for incoming connections ## listen for incoming connections
t.ma = ma t.ma = ma
t.handler = handler t.handler = handler
method dial*(t: Transport, address: MultiAddress): Future[Connection] {.base, async.} = method dial*(t: Transport,
address: MultiAddress): Future[Connection] {.base, async.} =
## dial a peer ## dial a peer
discard discard
method supports(t: Transport, address: MultiAddress): bool {.base.} = method supports(t: Transport, address: MultiAddress): bool {.base.} =
## check if transport supportes the multiaddress ## check if transport supportes the multiaddress
# TODO: this should implement generic logic that would use the multicodec # TODO: this should implement generic logic that would use the multicodec
# declared in the multicodec field and set by each individual transport # declared in the multicodec field and set by each individual transport

View File

@ -1,6 +1,6 @@
import unittest, strutils, sequtils, sugar import unittest, strutils, sequtils, sugar
import chronos import chronos
import ../libp2p/connection, ../libp2p/multistreamselect, import ../libp2p/connection, ../libp2p/multistreamselect,
../libp2p/readerwriter, ../libp2p/connection, ../libp2p/multiaddress, ../libp2p/readerwriter, ../libp2p/connection, ../libp2p/multiaddress,
../libp2p/transport, ../libp2p/tcptransport ../libp2p/transport, ../libp2p/tcptransport
@ -9,7 +9,9 @@ type
TestSelectStream = ref object of ReadWrite TestSelectStream = ref object of ReadWrite
step*: int step*: int
method readExactly*(s: TestSelectStream, pbytes: pointer, nbytes: int): Future[void] {.async.} = method readExactly*(s: TestSelectStream,
pbytes: pointer,
nbytes: int): Future[void] {.async.} =
case s.step: case s.step:
of 1: of 1:
var buf = newSeq[byte](1) var buf = newSeq[byte](1)
@ -28,8 +30,10 @@ method readExactly*(s: TestSelectStream, pbytes: pointer, nbytes: int): Future[v
of 4: of 4:
var buf = "/test/proto/1.0.0\n" var buf = "/test/proto/1.0.0\n"
copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len()) copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len())
else: else:
copyMem(cast[pointer](cast[uint](pbytes)), cstring("\0x3na\n"), "\0x3na\n".len()) copyMem(cast[pointer](cast[uint](pbytes)),
cstring("\0x3na\n"),
"\0x3na\n".len())
proc newTestSelectStream(): TestSelectStream = proc newTestSelectStream(): TestSelectStream =
new result new result
@ -40,7 +44,9 @@ type
TestHandlesStream = ref object of ReadWrite TestHandlesStream = ref object of ReadWrite
step*: int step*: int
method readExactly*(s: TestHandlesStream, pbytes: pointer, nbytes: int): Future[void] {.async.} = method readExactly*(s: TestHandlesStream,
pbytes: pointer,
nbytes: int): Future[void] {.async.} =
case s.step: case s.step:
of 1: of 1:
var buf = newSeq[byte](1) var buf = newSeq[byte](1)
@ -59,8 +65,10 @@ method readExactly*(s: TestHandlesStream, pbytes: pointer, nbytes: int): Future[
of 4: of 4:
var buf = "/test/proto/1.0.0\n" var buf = "/test/proto/1.0.0\n"
copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len()) copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len())
else: else:
copyMem(cast[pointer](cast[uint](pbytes)), cstring("\0x3na\n"), "\0x3na\n".len()) copyMem(cast[pointer](cast[uint](pbytes)),
cstring("\0x3na\n"),
"\0x3na\n".len())
proc newTestHandlesStream(): TestHandlesStream = proc newTestHandlesStream(): TestHandlesStream =
new result new result
@ -74,7 +82,9 @@ type
step*: int step*: int
ls*: LsHandler ls*: LsHandler
method readExactly*(s: TestLsStream, pbytes: pointer, nbytes: int): Future[void] {.async.} = method readExactly*(s: TestLsStream,
pbytes: pointer,
nbytes: int): Future[void] {.async.} =
case s.step: case s.step:
of 1: of 1:
var buf = newSeq[byte](1) var buf = newSeq[byte](1)
@ -93,8 +103,10 @@ method readExactly*(s: TestLsStream, pbytes: pointer, nbytes: int): Future[void]
of 4: of 4:
var buf = "ls\n" var buf = "ls\n"
copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len()) copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len())
else: else:
copyMem(cast[pointer](cast[uint](pbytes)), cstring("\0x3na\n"), "\0x3na\n".len()) copyMem(cast[pointer](cast[uint](pbytes)),
cstring("\0x3na\n"),
"\0x3na\n".len())
method write*(s: TestLsStream, msg: seq[byte], msglen = -1) {.async.} = method write*(s: TestLsStream, msg: seq[byte], msglen = -1) {.async.} =
if s.step == 4: if s.step == 4:
@ -113,7 +125,9 @@ type
step*: int step*: int
na*: NaHandler na*: NaHandler
method readExactly*(s: TestNaStream, pbytes: pointer, nbytes: int): Future[void] {.async.} = method readExactly*(s: TestNaStream,
pbytes: pointer,
nbytes: int): Future[void] {.async.} =
case s.step: case s.step:
of 1: of 1:
var buf = newSeq[byte](1) var buf = newSeq[byte](1)
@ -132,8 +146,10 @@ method readExactly*(s: TestNaStream, pbytes: pointer, nbytes: int): Future[void]
of 4: of 4:
var buf = "/test/proto/1.0.0\n" var buf = "/test/proto/1.0.0\n"
copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len()) copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len())
else: else:
copyMem(cast[pointer](cast[uint](pbytes)), cstring("\0x3na\n"), "\0x3na\n".len()) copyMem(cast[pointer](cast[uint](pbytes)),
cstring("\0x3na\n"),
"\0x3na\n".len())
method write*(s: TestNaStream, msg: string, msglen = -1) {.async.} = method write*(s: TestNaStream, msg: string, msglen = -1) {.async.} =
if s.step == 4: if s.step == 4:
@ -159,7 +175,8 @@ suite "Multistream select":
let ms = newMultistream() let ms = newMultistream()
let conn = newConnection(newTestHandlesStream()) let conn = newConnection(newTestHandlesStream())
proc testHandler(conn: Connection, proto: string): Future[void] {.async.} = proc testHandler(conn: Connection,
proto: string): Future[void] {.async.} =
check proto == "/test/proto/1.0.0" check proto == "/test/proto/1.0.0"
ms.addHandler("/test/proto/1.0.0", testHandler) ms.addHandler("/test/proto/1.0.0", testHandler)
@ -168,7 +185,7 @@ suite "Multistream select":
check: check:
waitFor(testHandle()) == true waitFor(testHandle()) == true
test "test handle `ls`": test "test handle `ls`":
proc testLs(): Future[bool] {.async.} = proc testLs(): Future[bool] {.async.} =
let ms = newMultistream() let ms = newMultistream()
@ -181,7 +198,8 @@ suite "Multistream select":
check strProto == "\x26/test/proto1/1.0.0\n/test/proto2/1.0.0\n" check strProto == "\x26/test/proto1/1.0.0\n/test/proto2/1.0.0\n"
await conn.close() await conn.close()
proc testHandler(conn: Connection, proto: string): Future[void] {.async.} = discard proc testHandler(conn: Connection,
proto: string): Future[void] {.async.} = discard
ms.addHandler("/test/proto1/1.0.0", testHandler) ms.addHandler("/test/proto1/1.0.0", testHandler)
ms.addHandler("/test/proto2/1.0.0", testHandler) ms.addHandler("/test/proto2/1.0.0", testHandler)
await ms.handle(conn) await ms.handle(conn)
@ -201,7 +219,8 @@ suite "Multistream select":
check cast[string](msg) == "\x3na\n" check cast[string](msg) == "\x3na\n"
await conn.close() await conn.close()
proc testHandler(conn: Connection, proto: string): Future[void] {.async.} = discard proc testHandler(conn: Connection,
proto: string): Future[void] {.async.} = discard
ms.addHandler("/unabvailable/proto/1.0.0", testHandler) ms.addHandler("/unabvailable/proto/1.0.0", testHandler)
await ms.handle(conn) await ms.handle(conn)
@ -210,10 +229,11 @@ suite "Multistream select":
check: check:
waitFor(testNa()) == true waitFor(testNa()) == true
test "end to end - handle": test "e2e - handle":
proc endToEnd(): Future[bool] {.async.} = proc endToEnd(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53340") let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53340")
proc testHandler(conn: Connection, proto: string): Future[void] {.async.} = proc testHandler(conn: Connection,
proto: string): Future[void] {.async.} =
check proto == "/test/proto/1.0.0" check proto == "/test/proto/1.0.0"
await conn.writeLp("Hello!") await conn.writeLp("Hello!")
await conn.close() await conn.close()
@ -221,7 +241,7 @@ suite "Multistream select":
let msListen = newMultistream() let msListen = newMultistream()
msListen.addHandler("/test/proto/1.0.0", testHandler) msListen.addHandler("/test/proto/1.0.0", testHandler)
proc connHandler(conn: Connection): Future[void] {.async ,gcsafe.} = proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
await msListen.handle(conn) await msListen.handle(conn)
let transport1: TcpTransport = newTransport(TcpTransport) let transport1: TcpTransport = newTransport(TcpTransport)
@ -230,7 +250,7 @@ suite "Multistream select":
let msDial = newMultistream() let msDial = newMultistream()
let transport2: TcpTransport = newTransport(TcpTransport) let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(ma) let conn = await transport2.dial(ma)
let res = await msDial.select(conn, "/test/proto/1.0.0") let res = await msDial.select(conn, "/test/proto/1.0.0")
check res == true check res == true
@ -241,17 +261,18 @@ suite "Multistream select":
check: check:
waitFor(endToEnd()) == true waitFor(endToEnd()) == true
test "end to end - ls": test "e2e - ls":
proc endToEnd(): Future[bool] {.async.} = proc endToEnd(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53341") let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53341")
let msListen = newMultistream() let msListen = newMultistream()
proc testHandler(conn: Connection, proto: string): Future[void] {.async.} = discard proc testHandler(conn: Connection,
proto: string): Future[void] {.async.} = discard
msListen.addHandler("/test/proto1/1.0.0", testHandler) msListen.addHandler("/test/proto1/1.0.0", testHandler)
msListen.addHandler("/test/proto2/1.0.0", testHandler) msListen.addHandler("/test/proto2/1.0.0", testHandler)
let transport1: TcpTransport = newTransport(TcpTransport) let transport1: TcpTransport = newTransport(TcpTransport)
proc connHandler(conn: Connection): Future[void] {.async ,gcsafe.} = proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
await msListen.handle(conn) await msListen.handle(conn)
await transport1.listen(ma, connHandler) await transport1.listen(ma, connHandler)
@ -259,7 +280,7 @@ suite "Multistream select":
let msDial = newMultistream() let msDial = newMultistream()
let transport2: TcpTransport = newTransport(TcpTransport) let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(ma) let conn = await transport2.dial(ma)
let ls = await msDial.list(conn) let ls = await msDial.list(conn)
let protos: seq[string] = @["/test/proto1/1.0.0", "/test/proto2/1.0.0"] let protos: seq[string] = @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
await conn.close() await conn.close()

View File

@ -7,7 +7,7 @@ suite "TCP transport suite":
test "test listener: handle write": test "test listener: handle write":
proc testListener(): Future[bool] {.async.} = proc testListener(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53335") let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53335")
proc connHandler(conn: Connection): Future[void] {.async ,gcsafe.} = proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
result = conn.write(cstring("Hello!"), 6) result = conn.write(cstring("Hello!"), 6)
let transport: TcpTransport = newTransport(TcpTransport) let transport: TcpTransport = newTransport(TcpTransport)
@ -20,12 +20,12 @@ suite "TCP transport suite":
result = cast[string](msg) == "Hello!" result = cast[string](msg) == "Hello!"
check: check:
waitFor(testListener()) == true waitFor(testListener()) == true
test "test listener: handle read": test "test listener: handle read":
proc testListener(): Future[bool] {.async.} = proc testListener(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53336") let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53336")
proc connHandler(conn: Connection): Future[void] {.async ,gcsafe.} = proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
let msg = await conn.read(6) let msg = await conn.read(6)
check cast[string](msg) == "Hello!" check cast[string](msg) == "Hello!"
@ -36,7 +36,7 @@ suite "TCP transport suite":
result = sent == 6 result = sent == 6
check: check:
waitFor(testListener()) == true waitFor(testListener()) == true
test "test dialer: handle write": test "test dialer: handle write":
proc testDialer(address: TransportAddress): Future[bool] {.async.} = proc testDialer(address: TransportAddress): Future[bool] {.async.} =
@ -65,36 +65,36 @@ suite "TCP transport suite":
check waitFor(testDialer(initTAddress("127.0.0.1:53337"))) == true check waitFor(testDialer(initTAddress("127.0.0.1:53337"))) == true
test "test dialer: handle write": test "test dialer: handle write":
proc testDialer(address: TransportAddress): Future[bool] {.async.} = proc testDialer(address: TransportAddress): Future[bool] {.async.} =
proc serveClient(server: StreamServer, proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} = transp: StreamTransport) {.async.} =
var rstream = newAsyncStreamReader(transp) var rstream = newAsyncStreamReader(transp)
let msg = await rstream.read(6) let msg = await rstream.read(6)
check cast[string](msg) == "Hello!" check cast[string](msg) == "Hello!"
await rstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53337")
let transport: TcpTransport = newTransport(TcpTransport)
let conn = await transport.dial(ma)
await conn.write(cstring("Hello!"), 6)
result = true
await rstream.closeWait()
await transp.closeWait()
server.stop() server.stop()
server.close() server.close()
await server.join()
check waitFor(testDialer(initTAddress("127.0.0.1:53337"))) == true
test "test listener - dialer: handle write": var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53337")
let transport: TcpTransport = newTransport(TcpTransport)
let conn = await transport.dial(ma)
await conn.write(cstring("Hello!"), 6)
result = true
server.stop()
server.close()
await server.join()
check waitFor(testDialer(initTAddress("127.0.0.1:53337"))) == true
test "e2e: handle write":
proc testListenerDialer(): Future[bool] {.async.} = proc testListenerDialer(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53339") let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53339")
proc connHandler(conn: Connection): Future[void] {.async ,gcsafe.} = proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
result = conn.write(cstring("Hello!"), 6) result = conn.write(cstring("Hello!"), 6)
let transport1: TcpTransport = newTransport(TcpTransport) let transport1: TcpTransport = newTransport(TcpTransport)
@ -108,12 +108,12 @@ suite "TCP transport suite":
result = cast[string](msg) == "Hello!" result = cast[string](msg) == "Hello!"
check: check:
waitFor(testListenerDialer()) == true waitFor(testListenerDialer()) == true
test "test listener - dialer: handle read": test "e2e: handle read":
proc testListenerDialer(): Future[bool] {.async.} = proc testListenerDialer(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53340") let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53340")
proc connHandler(conn: Connection): Future[void] {.async ,gcsafe.} = proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
let msg = await conn.read(6) let msg = await conn.read(6)
check cast[string](msg) == "Hello!" check cast[string](msg) == "Hello!"
@ -127,4 +127,4 @@ suite "TCP transport suite":
result = true result = true
check: check:
waitFor(testListenerDialer()) == true waitFor(testListenerDialer()) == true