diff --git a/libp2p/connection.nim b/libp2p/connection.nim index f8b164e28..2cf952777 100644 --- a/libp2p/connection.nim +++ b/libp2p/connection.nim @@ -17,24 +17,33 @@ type peerInfo*: PeerInfo stream: ReadWrite -proc newConnection*(stream: ReadWrite): Connection = +proc newConnection*(stream: ReadWrite): Connection = ## create a new Connection for the specified async reader/writer new result result.stream = stream -method read*(s: Connection, n = -1): Future[seq[byte]] {.async.} = +method read*(s: Connection, n = -1): Future[seq[byte]] {.async.} = result = await s.stream.read(n) -method readExactly*(s: Connection, pbytes: pointer, nbytes: int): Future[void] {.async.} = +method readExactly*(s: Connection, + pbytes: pointer, + nbytes: int): Future[void] {.async.} = await s.stream.readExactly(pbytes, nbytes) -method readLine*(s: Connection, limit = 0, sep = "\r\n"): Future[string] {.async.} = +method readLine*(s: Connection, + limit = 0, + sep = "\r\n"): Future[string] {.async.} = result = await s.stream.readLine(limit, sep) -method readOnce*(s: Connection, pbytes: pointer, nbytes: int): Future[int] {.async.} = +method readOnce*(s: Connection, + pbytes: pointer, + nbytes: int): Future[int] {.async.} = result = await s.stream.readOnce(pbytes, nbytes) -method readUntil*(s: Connection, pbytes: pointer, nbytes: int, sep: seq[byte]): Future[int] {.async.} = +method readUntil*(s: Connection, + pbytes: pointer, + nbytes: int, + sep: seq[byte]): Future[int] {.async.} = result = await s.stream.readUntil(pbytes, nbytes, sep) method write*(s: Connection, pbytes: pointer, nbytes: int) {.async.} = @@ -50,7 +59,7 @@ method close*(s: Connection) {.async.} = await s.stream.close() s.closed = true -proc readLp*(s: Connection): Future[seq[byte]] {.async.} = +proc readLp*(s: Connection): Future[seq[byte]] {.async.} = ## read lenght prefixed msg var size: uint @@ -79,11 +88,12 @@ proc writeLp*(s: Connection, msg: string | seq[byte]) {.async.} = buf.finish() result = s.write(buf.buffer) -method getPeerInfo* (c: Connection): Future[PeerInfo] {.base, async.} = +method getPeerInfo* (c: Connection): Future[PeerInfo] {.base, async.} = ## get up to date peer info ## TODO: implement PeerInfo refresh over identify discard -method getObservedAddrs(c: Connection): Future[seq[MultiAddress]] {.base, async.} = +method getObservedAddrs(c: Connection): Future[seq[MultiAddress]] {.base, + async.} = ## get resolved multiaddresses for the connection discard diff --git a/libp2p/multistreamselect.nim b/libp2p/multistreamselect.nim index 8c8f13f10..8e6985ae8 100644 --- a/libp2p/multistreamselect.nim +++ b/libp2p/multistreamselect.nim @@ -17,7 +17,7 @@ const MultiCodec* = "\x13" & Codec & "\n" const Na = "\x03na\n" const Ls = "\x03ls\n" -type +type MultisteamSelectException = object of CatchableError Handler* = proc (conn: Connection, proto: string): Future[void] @@ -40,12 +40,14 @@ proc newMultistream*(): MultisteamSelect = result.ls = Ls result.na = Na -proc select*(m: MultisteamSelect, conn: Connection, proto: string = ""): Future[bool] {.async.} = +proc select*(m: MultisteamSelect, + conn: Connection, + proto: string = ""): Future[bool] {.async.} = ## select a remote protocol ## TODO: select should support a list of protos to be selected - await conn.write(m.codec) # write handshake - if proto.len() > 0: + await conn.write(m.codec) # write handshake + if proto.len() > 0: await conn.writeLp(proto) # select proto var ms = cast[string](await conn.readLp()) @@ -60,12 +62,13 @@ proc select*(m: MultisteamSelect, conn: Connection, proto: string = ""): Future[ ms.removeSuffix("\n") result = ms == proto -proc list*(m: MultisteamSelect, conn: Connection): Future[seq[string]] {.async.} = +proc list*(m: MultisteamSelect, + conn: Connection): Future[seq[string]] {.async.} = ## list remote protos requests on connection if not (await m.select(conn)): return - await conn.write(m.ls) # send ls + await conn.write(m.ls) # send ls var list = newSeq[string]() let ms = cast[string](await conn.readLp()) @@ -96,7 +99,7 @@ proc handle*(m: MultisteamSelect, conn: Connection) {.async.} = for h in m.handlers: protos &= (h.proto & "\n") await conn.writeLp(cast[seq[byte]](toSeq(protos.items))) - else: + else: for h in m.handlers: if (not isNil(h.match) and h.match(ms)) or ms == h.proto: await conn.writeLp(h.proto & "\n") @@ -104,11 +107,11 @@ proc handle*(m: MultisteamSelect, conn: Connection) {.async.} = return await conn.write(m.na) -proc addHandler*(m: MultisteamSelect, - proto: string, - handler: Handler, - matcher: Matcher = nil) = +proc addHandler*(m: MultisteamSelect, + proto: string, + handler: Handler, + matcher: Matcher = nil) = ## register a handler for the protocol - m.handlers.add(HandlerHolder(proto: proto, - handler: handler, + m.handlers.add(HandlerHolder(proto: proto, + handler: handler, match: matcher)) diff --git a/libp2p/readerwriter.nim b/libp2p/readerwriter.nim index 30e2333e1..df73b1440 100644 --- a/libp2p/readerwriter.nim +++ b/libp2p/readerwriter.nim @@ -12,38 +12,40 @@ import chronos type ReadWrite* = ref object of RootObj closed*: bool -method read*(s: ReadWrite, n = -1): Future[seq[byte]] +method read*(s: ReadWrite, n = -1): Future[seq[byte]] {.base, async.} = discard -method readExactly*(s: ReadWrite, pbytes: pointer, nbytes: int): Future[void] +method readExactly*(s: ReadWrite, pbytes: pointer, nbytes: int): Future[void] {.base, async.} = discard -method readLine*(s: ReadWrite, limit = 0, sep = "\r\n"): Future[string] - {.base, async.} = +method readLine*(s: ReadWrite, limit = 0, sep = "\r\n"): Future[string] + {.base, async.} = discard -method readOnce*(s: ReadWrite, pbytes: pointer, nbytes: int): Future[int] - {.base, async.} = +method readOnce*(s: ReadWrite, pbytes: pointer, nbytes: int): Future[int] + {.base, async.} = discard -method readUntil*(s: ReadWrite, pbytes: pointer, nbytes: int, sep: seq[byte]): Future[int] - {.base, async.} = +method readUntil*(s: ReadWrite, + pbytes: pointer, nbytes: int, + sep: seq[byte]): Future[int] + {.base, async.} = discard method write*(s: ReadWrite, pbytes: pointer, nbytes: int) - {.base, async.} = + {.base, async.} = discard method write*(s: ReadWrite, msg: string, msglen = -1) - {.base, async.} = + {.base, async.} = discard method write*(s: ReadWrite, msg: seq[byte], msglen = -1) - {.base, async.} = + {.base, async.} = discard -method close*(s: ReadWrite) - {.base, async.} = +method close*(s: ReadWrite) + {.base, async.} = discard diff --git a/libp2p/tcptransport.nim b/libp2p/tcptransport.nim index afb93bb44..2bff235c6 100644 --- a/libp2p/tcptransport.nim +++ b/libp2p/tcptransport.nim @@ -8,14 +8,17 @@ ## those terms. import chronos -import transport, wire, connection, multiaddress, connection, multicodec, chronosstream +import transport, wire, connection, + multiaddress, connection, + multicodec, chronosstream type TcpTransport* = ref object of Transport server*: StreamServer method connHandler*(t: Transport, server: StreamServer, - client: StreamTransport): Future[Connection] {.base, gcsafe, async.} = + client: StreamTransport): Future[Connection] + {.base, gcsafe, async.} = let conn: Connection = newConnection(newChronosStream(server, client)) let handlerFut = if t.handler == nil: nil else: t.handler(conn) let connHolder: ConnHolder = ConnHolder(connection: conn, @@ -38,7 +41,9 @@ method close*(t: TcpTransport): Future[void] {.async.} = t.server.stop() await t.server.closeWait() -method listen*(t: TcpTransport, ma: MultiAddress, handler: ConnHandler): Future[void] {.async.} = +method listen*(t: TcpTransport, + ma: MultiAddress, + handler: ConnHandler): Future[void] {.async.} = await procCall Transport(t).listen(ma, handler) # call base ## listen on the transport @@ -50,7 +55,8 @@ method listen*(t: TcpTransport, ma: MultiAddress, handler: ConnHandler): Future[ server.start() listenFuture.complete() -method dial*(t: TcpTransport, address: MultiAddress): Future[Connection] {.async.} = +method dial*(t: TcpTransport, + address: MultiAddress): Future[Connection] {.async.} = ## dial a peer let client: StreamTransport = await connect(address) result = await t.connHandler(t.server, client) diff --git a/libp2p/transport.nim b/libp2p/transport.nim index 825da2367..f1c5a368e 100644 --- a/libp2p/transport.nim +++ b/libp2p/transport.nim @@ -23,11 +23,11 @@ type handler*: ConnHandler multicodec*: MultiCodec -method init*(t: Transport) {.base, error: "not implemented".} = +method init*(t: Transport) {.base, error: "not implemented".} = ## perform protocol initialization discard -proc newTransport*(t: typedesc[Transport]): t = +proc newTransport*(t: typedesc[Transport]): t = new result result.init() @@ -37,16 +37,19 @@ method close*(t: Transport) {.base, async.} = for c in t.connections: await c.connection.close() -method listen*(t: Transport, ma: MultiAddress, handler: ConnHandler) {.base, async.} = +method listen*(t: Transport, + ma: MultiAddress, + handler: ConnHandler) {.base, async.} = ## listen for incoming connections t.ma = ma t.handler = handler -method dial*(t: Transport, address: MultiAddress): Future[Connection] {.base, async.} = +method dial*(t: Transport, + address: MultiAddress): Future[Connection] {.base, async.} = ## dial a peer discard -method supports(t: Transport, address: MultiAddress): bool {.base.} = +method supports(t: Transport, address: MultiAddress): bool {.base.} = ## check if transport supportes the multiaddress # TODO: this should implement generic logic that would use the multicodec # declared in the multicodec field and set by each individual transport diff --git a/tests/testmultistreamselect.nim b/tests/testmultistreamselect.nim index c9852e1f4..383a5ba9c 100644 --- a/tests/testmultistreamselect.nim +++ b/tests/testmultistreamselect.nim @@ -1,6 +1,6 @@ import unittest, strutils, sequtils, sugar import chronos -import ../libp2p/connection, ../libp2p/multistreamselect, +import ../libp2p/connection, ../libp2p/multistreamselect, ../libp2p/readerwriter, ../libp2p/connection, ../libp2p/multiaddress, ../libp2p/transport, ../libp2p/tcptransport @@ -9,7 +9,9 @@ type TestSelectStream = ref object of ReadWrite step*: int -method readExactly*(s: TestSelectStream, pbytes: pointer, nbytes: int): Future[void] {.async.} = +method readExactly*(s: TestSelectStream, + pbytes: pointer, + nbytes: int): Future[void] {.async.} = case s.step: of 1: var buf = newSeq[byte](1) @@ -28,8 +30,10 @@ method readExactly*(s: TestSelectStream, pbytes: pointer, nbytes: int): Future[v of 4: var buf = "/test/proto/1.0.0\n" copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len()) - else: - copyMem(cast[pointer](cast[uint](pbytes)), cstring("\0x3na\n"), "\0x3na\n".len()) + else: + copyMem(cast[pointer](cast[uint](pbytes)), + cstring("\0x3na\n"), + "\0x3na\n".len()) proc newTestSelectStream(): TestSelectStream = new result @@ -40,7 +44,9 @@ type TestHandlesStream = ref object of ReadWrite step*: int -method readExactly*(s: TestHandlesStream, pbytes: pointer, nbytes: int): Future[void] {.async.} = +method readExactly*(s: TestHandlesStream, + pbytes: pointer, + nbytes: int): Future[void] {.async.} = case s.step: of 1: var buf = newSeq[byte](1) @@ -59,8 +65,10 @@ method readExactly*(s: TestHandlesStream, pbytes: pointer, nbytes: int): Future[ of 4: var buf = "/test/proto/1.0.0\n" copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len()) - else: - copyMem(cast[pointer](cast[uint](pbytes)), cstring("\0x3na\n"), "\0x3na\n".len()) + else: + copyMem(cast[pointer](cast[uint](pbytes)), + cstring("\0x3na\n"), + "\0x3na\n".len()) proc newTestHandlesStream(): TestHandlesStream = new result @@ -74,7 +82,9 @@ type step*: int ls*: LsHandler -method readExactly*(s: TestLsStream, pbytes: pointer, nbytes: int): Future[void] {.async.} = +method readExactly*(s: TestLsStream, + pbytes: pointer, + nbytes: int): Future[void] {.async.} = case s.step: of 1: var buf = newSeq[byte](1) @@ -93,8 +103,10 @@ method readExactly*(s: TestLsStream, pbytes: pointer, nbytes: int): Future[void] of 4: var buf = "ls\n" copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len()) - else: - copyMem(cast[pointer](cast[uint](pbytes)), cstring("\0x3na\n"), "\0x3na\n".len()) + else: + copyMem(cast[pointer](cast[uint](pbytes)), + cstring("\0x3na\n"), + "\0x3na\n".len()) method write*(s: TestLsStream, msg: seq[byte], msglen = -1) {.async.} = if s.step == 4: @@ -113,7 +125,9 @@ type step*: int na*: NaHandler -method readExactly*(s: TestNaStream, pbytes: pointer, nbytes: int): Future[void] {.async.} = +method readExactly*(s: TestNaStream, + pbytes: pointer, + nbytes: int): Future[void] {.async.} = case s.step: of 1: var buf = newSeq[byte](1) @@ -132,8 +146,10 @@ method readExactly*(s: TestNaStream, pbytes: pointer, nbytes: int): Future[void] of 4: var buf = "/test/proto/1.0.0\n" copyMem(cast[pointer](cast[uint](pbytes)), addr buf[0], buf.len()) - else: - copyMem(cast[pointer](cast[uint](pbytes)), cstring("\0x3na\n"), "\0x3na\n".len()) + else: + copyMem(cast[pointer](cast[uint](pbytes)), + cstring("\0x3na\n"), + "\0x3na\n".len()) method write*(s: TestNaStream, msg: string, msglen = -1) {.async.} = if s.step == 4: @@ -159,7 +175,8 @@ suite "Multistream select": let ms = newMultistream() let conn = newConnection(newTestHandlesStream()) - proc testHandler(conn: Connection, proto: string): Future[void] {.async.} = + proc testHandler(conn: Connection, + proto: string): Future[void] {.async.} = check proto == "/test/proto/1.0.0" ms.addHandler("/test/proto/1.0.0", testHandler) @@ -168,7 +185,7 @@ suite "Multistream select": check: waitFor(testHandle()) == true - + test "test handle `ls`": proc testLs(): Future[bool] {.async.} = let ms = newMultistream() @@ -181,7 +198,8 @@ suite "Multistream select": check strProto == "\x26/test/proto1/1.0.0\n/test/proto2/1.0.0\n" await conn.close() - proc testHandler(conn: Connection, proto: string): Future[void] {.async.} = discard + proc testHandler(conn: Connection, + proto: string): Future[void] {.async.} = discard ms.addHandler("/test/proto1/1.0.0", testHandler) ms.addHandler("/test/proto2/1.0.0", testHandler) await ms.handle(conn) @@ -201,7 +219,8 @@ suite "Multistream select": check cast[string](msg) == "\x3na\n" await conn.close() - proc testHandler(conn: Connection, proto: string): Future[void] {.async.} = discard + proc testHandler(conn: Connection, + proto: string): Future[void] {.async.} = discard ms.addHandler("/unabvailable/proto/1.0.0", testHandler) await ms.handle(conn) @@ -210,10 +229,11 @@ suite "Multistream select": check: waitFor(testNa()) == true - test "end to end - handle": + test "e2e - handle": proc endToEnd(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53340") - proc testHandler(conn: Connection, proto: string): Future[void] {.async.} = + proc testHandler(conn: Connection, + proto: string): Future[void] {.async.} = check proto == "/test/proto/1.0.0" await conn.writeLp("Hello!") await conn.close() @@ -221,7 +241,7 @@ suite "Multistream select": let msListen = newMultistream() msListen.addHandler("/test/proto/1.0.0", testHandler) - proc connHandler(conn: Connection): Future[void] {.async ,gcsafe.} = + proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = await msListen.handle(conn) let transport1: TcpTransport = newTransport(TcpTransport) @@ -230,7 +250,7 @@ suite "Multistream select": let msDial = newMultistream() let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(ma) - + let res = await msDial.select(conn, "/test/proto/1.0.0") check res == true @@ -241,17 +261,18 @@ suite "Multistream select": check: waitFor(endToEnd()) == true - test "end to end - ls": + test "e2e - ls": proc endToEnd(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53341") let msListen = newMultistream() - proc testHandler(conn: Connection, proto: string): Future[void] {.async.} = discard + proc testHandler(conn: Connection, + proto: string): Future[void] {.async.} = discard msListen.addHandler("/test/proto1/1.0.0", testHandler) msListen.addHandler("/test/proto2/1.0.0", testHandler) let transport1: TcpTransport = newTransport(TcpTransport) - proc connHandler(conn: Connection): Future[void] {.async ,gcsafe.} = + proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = await msListen.handle(conn) await transport1.listen(ma, connHandler) @@ -259,7 +280,7 @@ suite "Multistream select": let msDial = newMultistream() let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(ma) - + let ls = await msDial.list(conn) let protos: seq[string] = @["/test/proto1/1.0.0", "/test/proto2/1.0.0"] await conn.close() diff --git a/tests/testtransport.nim b/tests/testtransport.nim index 09f3e53f2..c54133c77 100644 --- a/tests/testtransport.nim +++ b/tests/testtransport.nim @@ -7,7 +7,7 @@ suite "TCP transport suite": test "test listener: handle write": proc testListener(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53335") - proc connHandler(conn: Connection): Future[void] {.async ,gcsafe.} = + proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = result = conn.write(cstring("Hello!"), 6) let transport: TcpTransport = newTransport(TcpTransport) @@ -20,12 +20,12 @@ suite "TCP transport suite": result = cast[string](msg) == "Hello!" check: - waitFor(testListener()) == true + waitFor(testListener()) == true test "test listener: handle read": proc testListener(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53336") - proc connHandler(conn: Connection): Future[void] {.async ,gcsafe.} = + proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = let msg = await conn.read(6) check cast[string](msg) == "Hello!" @@ -36,7 +36,7 @@ suite "TCP transport suite": result = sent == 6 check: - waitFor(testListener()) == true + waitFor(testListener()) == true test "test dialer: handle write": proc testDialer(address: TransportAddress): Future[bool] {.async.} = @@ -65,36 +65,36 @@ suite "TCP transport suite": check waitFor(testDialer(initTAddress("127.0.0.1:53337"))) == true test "test dialer: handle write": - proc testDialer(address: TransportAddress): Future[bool] {.async.} = - proc serveClient(server: StreamServer, + proc testDialer(address: TransportAddress): Future[bool] {.async.} = + proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} = - var rstream = newAsyncStreamReader(transp) - let msg = await rstream.read(6) - check cast[string](msg) == "Hello!" - - await rstream.closeWait() - await transp.closeWait() - server.stop() - server.close() - - var server = createStreamServer(address, serveClient, {ReuseAddr}) - server.start() - - let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53337") - let transport: TcpTransport = newTransport(TcpTransport) - let conn = await transport.dial(ma) - await conn.write(cstring("Hello!"), 6) - result = true + var rstream = newAsyncStreamReader(transp) + let msg = await rstream.read(6) + check cast[string](msg) == "Hello!" + await rstream.closeWait() + await transp.closeWait() server.stop() server.close() - await server.join() - check waitFor(testDialer(initTAddress("127.0.0.1:53337"))) == true - test "test listener - dialer: handle write": + var server = createStreamServer(address, serveClient, {ReuseAddr}) + server.start() + + let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53337") + let transport: TcpTransport = newTransport(TcpTransport) + let conn = await transport.dial(ma) + await conn.write(cstring("Hello!"), 6) + result = true + + server.stop() + server.close() + await server.join() + check waitFor(testDialer(initTAddress("127.0.0.1:53337"))) == true + + test "e2e: handle write": proc testListenerDialer(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53339") - proc connHandler(conn: Connection): Future[void] {.async ,gcsafe.} = + proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = result = conn.write(cstring("Hello!"), 6) let transport1: TcpTransport = newTransport(TcpTransport) @@ -108,12 +108,12 @@ suite "TCP transport suite": result = cast[string](msg) == "Hello!" check: - waitFor(testListenerDialer()) == true + waitFor(testListenerDialer()) == true - test "test listener - dialer: handle read": + test "e2e: handle read": proc testListenerDialer(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53340") - proc connHandler(conn: Connection): Future[void] {.async ,gcsafe.} = + proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = let msg = await conn.read(6) check cast[string](msg) == "Hello!" @@ -127,4 +127,4 @@ suite "TCP transport suite": result = true check: - waitFor(testListenerDialer()) == true + waitFor(testListenerDialer()) == true