diff --git a/libp2p/connection.nim b/libp2p/connection.nim index 854b92082..3e29c6ac9 100644 --- a/libp2p/connection.nim +++ b/libp2p/connection.nim @@ -21,15 +21,13 @@ type Connection* = ref object of LPStream peerInfo*: PeerInfo stream*: LPStream + observedAddrs*: Multiaddress proc newConnection*(stream: LPStream): Connection = ## create a new Connection for the specified async reader/writer new result result.stream = stream -proc `=peerInfo`*(s: Connection, peerInfo: PeerInfo) = - s.peerInfo = peerInfo - method read*(s: Connection, n = -1): Future[seq[byte]] {.gcsafe.} = result = s.stream.read(n) @@ -115,7 +113,7 @@ proc writeLp*(s: Connection, msg: string | seq[byte]): Future[void] {.gcsafe.} = method getObservedAddrs*(c: Connection): Future[MultiAddress] {.base, async, gcsafe.} = ## get resolved multiaddresses for the connection - discard + result = c.observedAddrs proc `$`*(conn: Connection): string = if conn.peerInfo.peerId.isSome: diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index 092e32236..8cac34882 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -10,9 +10,9 @@ import chronos, options, sequtils, strformat import nimcrypto/utils, chronicles import types, - ../../connection, - ../../varint, - ../../vbuffer, + ../../connection, + ../../varint, + ../../vbuffer, ../../stream/lpstream logScope: diff --git a/libp2p/muxers/mplex/channel.nim b/libp2p/muxers/mplex/lpchannel.nim similarity index 78% rename from libp2p/muxers/mplex/channel.nim rename to libp2p/muxers/mplex/lpchannel.nim index fd14c8421..030cbbbef 100644 --- a/libp2p/muxers/mplex/channel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -9,11 +9,11 @@ import strformat import chronos, chronicles -import types, +import types, coder, nimcrypto/utils, - ../../stream/bufferstream, - ../../stream/lpstream, + ../../stream/bufferstream, + ../../stream/lpstream, ../../connection logScope: @@ -22,7 +22,7 @@ logScope: const DefaultChannelSize* = DefaultBufferSize * 64 # 64kb type - Channel* = ref object of BufferStream + LPChannel* = ref object of BufferStream id*: uint name*: string conn*: Connection @@ -40,7 +40,7 @@ proc newChannel*(id: uint, conn: Connection, initiator: bool, name: string = "", - size: int = DefaultChannelSize): Channel = + size: int = DefaultChannelSize): LPChannel = new result result.id = id result.name = name @@ -61,46 +61,46 @@ proc newChannel*(id: uint, result.initBufferStream(writeHandler, size) -proc closeMessage(s: Channel) {.async, gcsafe.} = +proc closeMessage(s: LPChannel) {.async, gcsafe.} = await s.conn.writeMsg(s.id, s.closeCode) # write header -proc closed*(s: Channel): bool = +proc closed*(s: LPChannel): bool = s.closedLocal and s.closedLocal -proc closedByRemote*(s: Channel) {.async.} = +proc closedByRemote*(s: LPChannel) {.async.} = s.closedRemote = true -proc cleanUp*(s: Channel): Future[void] = +proc cleanUp*(s: LPChannel): Future[void] = result = procCall close(BufferStream(s)) -method close*(s: Channel) {.async, gcsafe.} = +method close*(s: LPChannel) {.async, gcsafe.} = s.closedLocal = true await s.closeMessage() -proc resetMessage(s: Channel) {.async, gcsafe.} = +proc resetMessage(s: LPChannel) {.async, gcsafe.} = await s.conn.writeMsg(s.id, s.resetCode) -proc resetByRemote*(s: Channel) {.async, gcsafe.} = +proc resetByRemote*(s: LPChannel) {.async, gcsafe.} = await allFutures(s.close(), s.closedByRemote()) s.isReset = true -proc reset*(s: Channel) {.async.} = +proc reset*(s: LPChannel) {.async.} = await allFutures(s.resetMessage(), s.resetByRemote()) -proc isReadEof(s: Channel): bool = +proc isReadEof(s: LPChannel): bool = bool((s.closedRemote or s.closedLocal) and s.len() < 1) -proc pushTo*(s: Channel, data: seq[byte]): Future[void] {.gcsafe.} = +proc pushTo*(s: LPChannel, data: seq[byte]): Future[void] {.gcsafe.} = if s.closedRemote: raise newLPStreamClosedError() result = procCall pushTo(BufferStream(s), data) -method read*(s: Channel, n = -1): Future[seq[byte]] {.gcsafe.} = +method read*(s: LPChannel, n = -1): Future[seq[byte]] {.gcsafe.} = if s.isReadEof(): raise newLPStreamClosedError() result = procCall read(BufferStream(s), n) -method readExactly*(s: Channel, +method readExactly*(s: LPChannel, pbytes: pointer, nbytes: int): Future[void] {.gcsafe.} = @@ -108,7 +108,7 @@ method readExactly*(s: Channel, raise newLPStreamClosedError() result = procCall readExactly(BufferStream(s), pbytes, nbytes) -method readLine*(s: Channel, +method readLine*(s: LPChannel, limit = 0, sep = "\r\n"): Future[string] {.gcsafe.} = @@ -116,7 +116,7 @@ method readLine*(s: Channel, raise newLPStreamClosedError() result = procCall readLine(BufferStream(s), limit, sep) -method readOnce*(s: Channel, +method readOnce*(s: LPChannel, pbytes: pointer, nbytes: int): Future[int] {.gcsafe.} = @@ -124,7 +124,7 @@ method readOnce*(s: Channel, raise newLPStreamClosedError() result = procCall readOnce(BufferStream(s), pbytes, nbytes) -method readUntil*(s: Channel, +method readUntil*(s: LPChannel, pbytes: pointer, nbytes: int, sep: seq[byte]): Future[int] {.gcsafe.} = @@ -132,19 +132,19 @@ method readUntil*(s: Channel, raise newLPStreamClosedError() result = procCall readOnce(BufferStream(s), pbytes, nbytes) -method write*(s: Channel, +method write*(s: LPChannel, pbytes: pointer, nbytes: int): Future[void] {.gcsafe.} = if s.closedLocal: raise newLPStreamClosedError() result = procCall write(BufferStream(s), pbytes, nbytes) -method write*(s: Channel, msg: string, msglen = -1) {.async, gcsafe.} = +method write*(s: LPChannel, msg: string, msglen = -1) {.async, gcsafe.} = if s.closedLocal: raise newLPStreamClosedError() result = procCall write(BufferStream(s), msg, msglen) -method write*(s: Channel, msg: seq[byte], msglen = -1) {.async, gcsafe.} = +method write*(s: LPChannel, msg: seq[byte], msglen = -1) {.async, gcsafe.} = if s.closedLocal: raise newLPStreamClosedError() result = procCall write(BufferStream(s), msg, msglen) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 0cb57ad2c..8b2c39886 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -18,7 +18,7 @@ import tables, sequtils, options, strformat import chronos, chronicles -import coder, types, channel, +import coder, types, lpchannel, ../muxer, ../../varint, ../../connection, @@ -32,15 +32,15 @@ logScope: type Mplex* = ref object of Muxer - remote*: Table[uint, Channel] - local*: Table[uint, Channel] + remote*: Table[uint, LPChannel] + local*: Table[uint, LPChannel] currentId*: uint maxChannels*: uint proc newMplexUnknownMsgError(): ref MplexUnknownMsgError = result = newException(MplexUnknownMsgError, "Unknown mplex message type") -proc getChannelList(m: Mplex, initiator: bool): var Table[uint, Channel] = +proc getChannelList(m: Mplex, initiator: bool): var Table[uint, LPChannel] = if initiator: result = m.remote else: @@ -50,7 +50,7 @@ proc newStreamInternal*(m: Mplex, initiator: bool = true, chanId: uint = 0, name: string = ""): - Future[Channel] {.async, gcsafe.} = + Future[LPChannel] {.async, gcsafe.} = ## create new channel/stream let id = if initiator: m.currentId.inc(); m.currentId else: chanId result = newChannel(id, m.connection, initiator, name) @@ -66,7 +66,7 @@ method handle*(m: Mplex) {.async, gcsafe.} = let (id, msgType, data) = msgRes.get() let initiator = bool(ord(msgType) and 1) - var channel: Channel + var channel: LPChannel if MessageType(msgType) != MessageType.New: let channels = m.getChannelList(initiator) if not channels.contains(id): @@ -116,8 +116,8 @@ proc newMplex*(conn: Connection, new result result.connection = conn result.maxChannels = maxChanns - result.remote = initTable[uint, Channel]() - result.local = initTable[uint, Channel]() + result.remote = initTable[uint, LPChannel]() + result.local = initTable[uint, LPChannel]() method newStream*(m: Mplex, name: string = ""): Future[Connection] {.async, gcsafe.} = let channel = await m.newStreamInternal() diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index aa466ea2e..253e5c3db 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -8,7 +8,7 @@ ## those terms. import chronos, chronicles -import ../protocols/protocol, +import ../protocols/protocol, ../connection logScope: @@ -32,7 +32,7 @@ type method newStream*(m: Muxer, name: string = ""): Future[Connection] {.base, async, gcsafe.} = discard method close*(m: Muxer) {.base, async, gcsafe.} = discard method handle*(m: Muxer): Future[void] {.base, async, gcsafe.} = discard -method `=streamHandler`*(m: Muxer, handler: StreamHandler) {.base, gcsafe.} = +method `streamHandler=`*(m: Muxer, handler: StreamHandler) {.base, gcsafe.} = m.streamHandler = handler proc newMuxerProvider*(creator: MuxerCreator, codec: string): MuxerProvider {.gcsafe.} = @@ -41,7 +41,7 @@ proc newMuxerProvider*(creator: MuxerCreator, codec: string): MuxerProvider {.gc result.codec = codec result.init() -method `=streamHandler`*(m: MuxerProvider, handler: StreamHandler) {.base, gcsafe.} = +method `streamHandler=`*(m: MuxerProvider, handler: StreamHandler) {.base, gcsafe.} = ## new stream (channels) handler ## ## triggered every time there is a new @@ -50,7 +50,7 @@ method `=streamHandler`*(m: MuxerProvider, handler: StreamHandler) {.base, gcsaf ## m.streamHandler = handler -method `=muxerHandler`*(m: MuxerProvider, handler: MuxerHandler) {.base, gcsafe.} = +method `muxerHandler=`*(m: MuxerProvider, handler: MuxerHandler) {.base, gcsafe.} = ## new muxer (muxed connections) handler ## ## triggered every time there is a new muxed diff --git a/libp2p/transports/transport.nim b/libp2p/transports/transport.nim index 55d67a399..ff89458d4 100644 --- a/libp2p/transports/transport.nim +++ b/libp2p/transports/transport.nim @@ -62,3 +62,7 @@ method handles*(t: Transport, address: MultiAddress): bool {.base, gcsafe.} = # TODO: this should implement generic logic that would use the multicodec # declared in the multicodec field and set by each individual transport discard + +method localAddress*(t: Transport): MultiAddress {.base, gcsafe.} = + ## get the local address of the transport in case started with 0.0.0.0:0 + discard diff --git a/tests/testmplex.nim b/tests/testmplex.nim index 49d58abb6..c55f45764 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -10,7 +10,7 @@ import ../libp2p/connection, ../libp2p/muxers/mplex/mplex, ../libp2p/muxers/mplex/coder, ../libp2p/muxers/mplex/types, - ../libp2p/muxers/mplex/channel + ../libp2p/muxers/mplex/lpchannel suite "Mplex": test "encode header with channel id 0": @@ -118,6 +118,35 @@ suite "Mplex": check: waitFor(testDecodeHeader()) == true + test "e2e - read/write receiver": + proc testNewStream(): Future[bool] {.async.} = + let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53381") + + proc connHandler(conn: Connection) {.async, gcsafe.} = + proc handleMplexListen(stream: Connection) {.async, gcsafe.} = + let msg = await stream.readLp() + check cast[string](msg) == "Hello from stream!" + await stream.close() + + let mplexListen = newMplex(conn) + mplexListen.streamHandler = handleMplexListen + asyncCheck mplexListen.handle() + + let transport1: TcpTransport = newTransport(TcpTransport) + discard await transport1.listen(ma, connHandler) + + let transport2: TcpTransport = newTransport(TcpTransport) + let conn = await transport2.dial(ma) + + let mplexDial = newMplex(conn) + let stream = await mplexDial.newStream() + await stream.writeLp("Hello from stream!") + await conn.close() + result = true + + check: + waitFor(testNewStream()) == true + test "e2e - read/write initiator": proc testNewStream(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53380") @@ -132,7 +161,7 @@ suite "Mplex": await mplexListen.handle() let transport1: TcpTransport = newTransport(TcpTransport) - asyncCheck transport1.listen(ma, connHandler) + discard await transport1.listen(ma, connHandler) let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(ma) @@ -149,35 +178,6 @@ suite "Mplex": check: waitFor(testNewStream()) == true - test "e2e - read/write receiver": - proc testNewStream(): Future[bool] {.async.} = - let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53381") - - proc connHandler(conn: Connection) {.async, gcsafe.} = - proc handleMplexListen(stream: Connection) {.async, gcsafe.} = - let msg = await stream.readLp() - check cast[string](msg) == "Hello from stream!" - await stream.close() - - let mplexListen = newMplex(conn) - mplexListen.streamHandler = handleMplexListen - await mplexListen.handle() - - let transport1: TcpTransport = newTransport(TcpTransport) - asyncCheck transport1.listen(ma, connHandler) - - let transport2: TcpTransport = newTransport(TcpTransport) - let conn = await transport2.dial(ma) - - let mplexDial = newMplex(conn) - let stream = await mplexDial.newStream() - await stream.writeLp("Hello from stream!") - await conn.close() - result = true - - check: - waitFor(testNewStream()) == true - test "e2e - multiple streams": proc testNewStream(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53382") @@ -194,12 +194,10 @@ suite "Mplex": listenConn = conn let mplexListen = newMplex(conn) mplexListen.streamHandler = handleMplexListen - mplexListen.handle() - .addCallback(proc(udata: pointer) = - debug "handle completed") + await mplexListen.handle() let transport1: TcpTransport = newTransport(TcpTransport) - asyncCheck transport1.listen(ma, connHandler) + discard await transport1.listen(ma, connHandler) let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(ma)