diff --git a/libp2p/connection.nim b/libp2p/connection.nim index 0eeacff7f..8eb925bd1 100644 --- a/libp2p/connection.nim +++ b/libp2p/connection.nim @@ -102,7 +102,7 @@ proc readLp*(s: Connection): Future[seq[byte]] {.async, gcsafe.} = if size > 0.uint: await s.readExactly(addr result[0], int(size)) except LPStreamIncompleteError, LPStreamReadError: - warn "readLp: could not read from remote, this is usually ok", exception = getCurrentExceptionMsg() + trace "remote connection closed", exc = getCurrentExceptionMsg() proc writeLp*(s: Connection, msg: string | seq[byte]): Future[void] {.gcsafe.} = ## write lenght prefixed diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index 7fb8b4545..c73bf8d9a 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -22,11 +22,10 @@ const Codec* = "/multistream/1.0.0" MSCodec* = "\x13" & Codec & "\n" - Na = "\x03na\n" - Ls = "\x03ls\n" + Na* = "\x03na\n" + Ls* = "\x03ls\n" type - MultisteamSelectException = object of CatchableError Matcher* = proc (proto: string): bool {.gcsafe.} HandlerHolder* = object diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index a86c8be80..7b8d760de 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -32,9 +32,6 @@ type currentId*: uint maxChannels*: uint -proc newMplexUnknownMsgError(): ref MplexUnknownMsgError = - result = newException(MplexUnknownMsgError, "Unknown mplex message type") - proc getChannelList(m: Mplex, initiator: bool): var Table[uint, LPChannel] = if initiator: result = m.remote @@ -102,7 +99,6 @@ method handle*(m: Mplex) {.async, gcsafe.} = trace "handle: resetting channel ", id = id await channel.resetByRemote() break - else: raise newMplexUnknownMsgError() except: error "exception occurred", exception = getCurrentExceptionMsg() finally: diff --git a/libp2p/muxers/mplex/types.nim b/libp2p/muxers/mplex/types.nim index b08b79f43..af09049c0 100644 --- a/libp2p/muxers/mplex/types.nim +++ b/libp2p/muxers/mplex/types.nim @@ -16,7 +16,6 @@ const MplexCodec* = "/mplex/6.7.0" const MaxReadWriteTime* = 5.seconds type - MplexUnknownMsgError* = object of CatchableError MplexNoSuchChannel* = object of CatchableError MessageType* {.pure.} = enum diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index 4cc921302..e85445173 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -100,5 +100,4 @@ method close*(s: ChronosStream) {.async, gcsafe.} = await s.writer.closeWait() await s.reader.closeWait() await s.client.closeWait() - s.closed = true diff --git a/tests/testmultistream.nim b/tests/testmultistream.nim index c6f139f26..16629ab7e 100644 --- a/tests/testmultistream.nim +++ b/tests/testmultistream.nim @@ -57,7 +57,7 @@ proc newTestSelectStream(): TestSelectStream = ## Mock stream for handles `ls` test type - LsHandler = proc(procs: seq[byte]): Future[void] + LsHandler = proc(procs: seq[byte]): Future[void] {.gcsafe.} TestLsStream = ref object of LPStream step*: int @@ -66,7 +66,7 @@ type method readExactly*(s: TestLsStream, pbytes: pointer, nbytes: int): - Future[void] {.async, gcsafe.} = + Future[void] {.async.} = case s.step: of 1: var buf = newSeq[byte](1) @@ -86,7 +86,7 @@ method readExactly*(s: TestLsStream, var buf = "ls\n" copyMem(pbytes, addr buf[0], buf.len()) else: - copyMem(pbytes, cstring("\0x3na\n"), "\0x3na\n".len()) + copyMem(pbytes, cstring(Na), Na.len()) method write*(s: TestLsStream, msg: seq[byte], msglen = -1) {.async, gcsafe.} = if s.step == 4: @@ -97,14 +97,14 @@ method write*(s: TestLsStream, msg: string, msglen = -1) method close(s: TestLsStream) {.async, gcsafe.} = s.closed = true -proc newTestLsStream(ls: LsHandler): TestLsStream = +proc newTestLsStream(ls: LsHandler): TestLsStream {.gcsafe.} = new result result.ls = ls result.step = 1 ## Mock stream for handles `na` test type - NaHandler = proc(procs: string): Future[void] + NaHandler = proc(procs: string): Future[void] {.gcsafe.} TestNaStream = ref object of LPStream step*: int @@ -185,10 +185,9 @@ suite "Multistream select": proc testLs(): Future[bool] {.async.} = let ms = newMultistream() - proc testLs(proto: seq[byte]): Future[void] {.async.} - let conn = newConnection(newTestLsStream(testLs)) - - proc testLs(proto: seq[byte]): Future[void] {.async.} = + proc testLsHandler(proto: seq[byte]) {.async, gcsafe.} # forward declaration + let conn = newConnection(newTestLsStream(testLsHandler)) + proc testLsHandler(proto: seq[byte]) {.async, gcsafe.} = var strProto: string = cast[string](proto) check strProto == "\x26/test/proto1/1.0.0\n/test/proto2/1.0.0\n" await conn.close() @@ -198,8 +197,8 @@ suite "Multistream select": peerInfo.peerId = some(PeerID.init(seckey)) var protocol: LPProtocol = new LPProtocol proc testHandler(conn: Connection, - proto: string): - Future[void] {.async, gcsafe.} = discard + proto: string): + Future[void] {.async, gcsafe.} = discard protocol.handler = testHandler ms.addHandler("/test/proto1/1.0.0", protocol) ms.addHandler("/test/proto2/1.0.0", protocol) @@ -213,11 +212,11 @@ suite "Multistream select": proc testNa(): Future[bool] {.async.} = let ms = newMultistream() - proc testNa(msg: string): Future[void] {.async.} - let conn = newConnection(newTestNaStream(testNa)) + proc testNaHandler(msg: string): Future[void] {.async, gcsafe.} + let conn = newConnection(newTestNaStream(testNaHandler)) - proc testNa(msg: string): Future[void] {.async.} = - check cast[string](msg) == "\x3na\n" + proc testNaHandler(msg: string): Future[void] {.async, gcsafe.} = + check cast[string](msg) == Na await conn.close() let seckey = PrivateKey.random(RSA) @@ -238,7 +237,7 @@ suite "Multistream select": test "e2e - handle": proc endToEnd(): Future[bool] {.async.} = - let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53350") + let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let seckey = PrivateKey.random(RSA) var peerInfo: PeerInfo @@ -263,7 +262,7 @@ suite "Multistream select": let msDial = newMultistream() let transport2: TcpTransport = newTransport(TcpTransport) - let conn = await transport2.dial(ma) + let conn = await transport2.dial(transport1.ma) check (await msDial.select(conn, "/test/proto/1.0.0")) == true @@ -276,7 +275,7 @@ suite "Multistream select": test "e2e - ls": proc endToEnd(): Future[bool] {.async.} = - let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53351") + let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let msListen = newMultistream() let seckey = PrivateKey.random(RSA) @@ -299,7 +298,7 @@ suite "Multistream select": let msDial = newMultistream() let transport2: TcpTransport = newTransport(TcpTransport) - let conn = await transport2.dial(ma) + let conn = await transport2.dial(transport1.ma) let ls = await msDial.list(conn) let protos: seq[string] = @["/test/proto1/1.0.0", "/test/proto2/1.0.0"] @@ -311,7 +310,7 @@ suite "Multistream select": test "e2e - select one from a list with unsupported protos": proc endToEnd(): Future[bool] {.async.} = - let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53352") + let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let seckey = PrivateKey.random(RSA) var peerInfo: PeerInfo @@ -336,7 +335,7 @@ suite "Multistream select": let msDial = newMultistream() let transport2: TcpTransport = newTransport(TcpTransport) - let conn = await transport2.dial(ma) + let conn = await transport2.dial(transport1.ma) check (await msDial.select(conn, @["/test/proto/1.0.0", "/test/no/proto/1.0.0"])) == "/test/proto/1.0.0" @@ -350,7 +349,7 @@ suite "Multistream select": test "e2e - select one with both valid": proc endToEnd(): Future[bool] {.async.} = - let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53353") + let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let seckey = PrivateKey.random(RSA) var peerInfo: PeerInfo @@ -375,7 +374,7 @@ suite "Multistream select": let msDial = newMultistream() let transport2: TcpTransport = newTransport(TcpTransport) - let conn = await transport2.dial(ma) + let conn = await transport2.dial(transport1.ma) check (await msDial.select(conn, @["/test/proto2/1.0.0", "/test/proto1/1.0.0"])) == "/test/proto2/1.0.0"