diff --git a/libp2p/builders.nim b/libp2p/builders.nim index 7d2e797f9..9d8a3bd50 100644 --- a/libp2p/builders.nim +++ b/libp2p/builders.nim @@ -169,7 +169,7 @@ proc build*(b: SwitchBuilder): Switch transports = block: var transports: seq[Transport] if b.tcpTransportOpts.enable: - transports.add(Transport(TcpTransport.init(b.tcpTransportOpts.flags, muxedUpgrade))) + transports.add(Transport(TcpTransport.new(b.tcpTransportOpts.flags, muxedUpgrade))) transports if b.secureManagers.len == 0: diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index ebb55c636..1b6edfc72 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -114,21 +114,25 @@ proc connHandler*(self: TcpTransport, return conn -func init*( - T: type TcpTransport, +proc init*( + T: typedesc[TcpTransport], + flags: set[ServerFlags] = {}, + upgrade: Upgrade): T {.deprecated: "use .new".} = + + T.new(flags, upgrade) + +proc new*( + T: typedesc[TcpTransport], flags: set[ServerFlags] = {}, upgrade: Upgrade): T = - result = T( + let transport = T( flags: flags, upgrader: upgrade ) - result.initTransport() - -method initTransport*(self: TcpTransport) = - self.multicodec = multiCodec("tcp") inc getTcpTransportTracker().opened + return transport method start*( self: TcpTransport, @@ -150,7 +154,6 @@ method start*( # always get the resolved address in case we're bound to 0.0.0.0:0 self.ma = MultiAddress.init(self.server.sock.getLocalAddress()).tryGet() - self.running = true trace "Listening on", address = self.ma @@ -158,8 +161,6 @@ method stop*(self: TcpTransport) {.async, gcsafe.} = ## stop the transport ## - self.running = false # mark stopped as soon as possible - try: trace "Stopping TCP transport" await procCall Transport(self).stop() # call base @@ -179,24 +180,6 @@ method stop*(self: TcpTransport) {.async, gcsafe.} = except CatchableError as exc: trace "Error shutting down tcp transport", exc = exc.msg -method upgradeIncoming*( - self: TcpTransport, - conn: Connection): Future[void] {.gcsafe.} = - ## base upgrade method that the transport uses to perform - ## transport specific upgrades - ## - - self.upgrader.upgradeIncoming(conn) - -method upgradeOutgoing*( - self: TcpTransport, - conn: Connection): Future[Connection] {.gcsafe.} = - ## base upgrade method that the transport uses to perform - ## transport specific upgrades - ## - - self.upgrader.upgradeOutgoing(conn) - method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} = ## accept a new TCP connection ## diff --git a/libp2p/transports/transport.nim b/libp2p/transports/transport.nim index c92df5a4f..699b605d5 100644 --- a/libp2p/transports/transport.nim +++ b/libp2p/transports/transport.nim @@ -28,40 +28,35 @@ type ma*: Multiaddress running*: bool upgrader*: Upgrade - multicodec*: MultiCodec proc newTransportClosedError*(parent: ref Exception = nil): ref LPError = newException(TransportClosedError, "Transport closed, no more connections!", parent) -method initTransport*(self: Transport) {.base, gcsafe, locks: "unknown".} = - ## perform protocol initialization - ## - - discard - method start*( self: Transport, ma: MultiAddress): Future[void] {.base, async.} = ## start the transport ## - self.ma = ma trace "starting transport", address = $ma + self.ma = ma + self.running = true method stop*(self: Transport): Future[void] {.base, async.} = ## stop and cleanup the transport ## including all outstanding connections ## - discard + trace "stopping transport", address = $self.ma + self.running = false method accept*(self: Transport): Future[Connection] {.base, gcsafe.} = ## accept incoming connections ## - discard + doAssert(false, "Not implemented!") method dial*( self: Transport, @@ -69,7 +64,7 @@ method dial*( ## dial a peer ## - discard + doAssert(false, "Not implemented!") method upgradeIncoming*( self: Transport, @@ -78,7 +73,7 @@ method upgradeIncoming*( ## transport specific upgrades ## - doAssert(false, "Not implemented!") + self.upgrader.upgradeIncoming(conn) method upgradeOutgoing*( self: Transport, @@ -87,7 +82,7 @@ method upgradeOutgoing*( ## transport specific upgrades ## - doAssert(false, "Not implemented!") + self.upgrader.upgradeOutgoing(conn) method handles*( self: Transport, @@ -99,9 +94,3 @@ method handles*( # having to repeat the check in every transport if address.protocols.isOk: return address.protocols.get().filterIt( it == multiCodec("p2p-circuit") ).len == 0 - -method localAddress*(self: Transport): MultiAddress {.base, gcsafe.} = - ## get the local address of the transport in case started with 0.0.0.0:0 - ## - - discard diff --git a/tests/commontransport.nim b/tests/commontransport.nim new file mode 100644 index 000000000..caaa75bc7 --- /dev/null +++ b/tests/commontransport.nim @@ -0,0 +1,93 @@ +{.used.} + +import sequtils +import chronos, stew/byteutils +import ../libp2p/[stream/connection, + transports/transport, + upgrademngrs/upgrade, + multiaddress, + errors, + wire] + +import ./helpers + +proc commonTransportTest*(transportType: typedesc[Transport], ma: string) = + suite $transportType & " common": + teardown: + checkTrackers() + asyncTest "e2e: handle write": + let ma: MultiAddress = Multiaddress.init(ma).tryGet() + + let transport1: transportType = transportType.new(upgrade = Upgrade()) + await transport1.start(ma) + + proc acceptHandler() {.async, gcsafe.} = + let conn = await transport1.accept() + await conn.write("Hello!") + await conn.close() + + let handlerWait = acceptHandler() + + let transport2: transportType = transportType.new(upgrade = Upgrade()) + let conn = await transport2.dial(transport1.ma) + var msg = newSeq[byte](6) + await conn.readExactly(addr msg[0], 6) + + await conn.close() #for some protocols, closing requires actively, so we must close here + await handlerWait.wait(1.seconds) # when no issues will not wait that long! + + await transport2.stop() + await transport1.stop() + + check string.fromBytes(msg) == "Hello!" + + asyncTest "e2e: handle read": + let ma: MultiAddress = Multiaddress.init(ma).tryGet() + let transport1: transportType = transportType.new(upgrade = Upgrade()) + asyncSpawn transport1.start(ma) + + proc acceptHandler() {.async, gcsafe.} = + let conn = await transport1.accept() + var msg = newSeq[byte](6) + await conn.readExactly(addr msg[0], 6) + check string.fromBytes(msg) == "Hello!" + await conn.close() + + let handlerWait = acceptHandler() + + let transport2: transportType = transportType.new(upgrade = Upgrade()) + let conn = await transport2.dial(transport1.ma) + await conn.write("Hello!") + + await conn.close() #for some protocols, closing requires actively, so we must close here + await handlerWait.wait(1.seconds) # when no issues will not wait that long! + + await transport2.stop() + await transport1.stop() + + asyncTest "e2e: handle dial cancellation": + let ma: MultiAddress = Multiaddress.init(ma).tryGet() + + let transport1: transportType = transportType.new(upgrade = Upgrade()) + await transport1.start(ma) + + let transport2: transportType = transportType.new(upgrade = Upgrade()) + let cancellation = transport2.dial(transport1.ma) + + await cancellation.cancelAndWait() + check cancellation.cancelled + + await transport2.stop() + await transport1.stop() + + asyncTest "e2e: handle accept cancellation": + let ma: MultiAddress = Multiaddress.init(ma).tryGet() + + let transport1: transportType = transportType.new(upgrade = Upgrade()) + await transport1.start(ma) + + let acceptHandler = transport1.accept() + await acceptHandler.cancelAndWait() + check acceptHandler.cancelled + + await transport1.stop() diff --git a/tests/testidentify.nim b/tests/testidentify.nim index b23caed41..06087f4af 100644 --- a/tests/testidentify.nim +++ b/tests/testidentify.nim @@ -41,8 +41,8 @@ suite "Identify": remotePeerInfo = PeerInfo.init( remoteSecKey, [ma], ["/test/proto1/1.0.0", "/test/proto2/1.0.0"]) - transport1 = TcpTransport.init(upgrade = Upgrade()) - transport2 = TcpTransport.init(upgrade = Upgrade()) + transport1 = TcpTransport.new(upgrade = Upgrade()) + transport2 = TcpTransport.new(upgrade = Upgrade()) identifyProto1 = Identify.new(remotePeerInfo) identifyProto2 = Identify.new(remotePeerInfo) diff --git a/tests/testmplex.nim b/tests/testmplex.nim index f5787b2f9..4b106af6d 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -380,7 +380,7 @@ suite "Mplex": asyncTest "read/write receiver": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade()) + let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) proc acceptHandler() {.async, gcsafe.} = @@ -396,7 +396,7 @@ suite "Mplex": await mplexListen.close() let acceptFut = acceptHandler() - let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) + let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.ma) let mplexDial = Mplex.init(conn) @@ -417,7 +417,7 @@ suite "Mplex": asyncTest "read/write receiver lazy": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade()) + let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) proc acceptHandler() {.async, gcsafe.} = @@ -433,7 +433,7 @@ suite "Mplex": await mplexListen.close() let acceptFut = acceptHandler() - let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) + let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.ma) let mplexDial = Mplex.init(conn) @@ -461,7 +461,7 @@ suite "Mplex": for _ in 0..