Add some changes that enable mixnet adapter. There's also loads of debug prints and whatnot that will get cleaned up later.

This commit is contained in:
Alejandro Cabeza Romero 2024-10-31 10:29:32 +01:00
parent 09fe199b6b
commit 5494c34589
No known key found for this signature in database
GPG Key ID: DA3D14AE478030FD
10 changed files with 81 additions and 26 deletions

View File

@ -54,13 +54,29 @@ proc dialAndUpgrade(
address: MultiAddress, address: MultiAddress,
dir = Direction.Out, dir = Direction.Out,
): Future[Muxer] {.async.} = ): Future[Muxer] {.async.} =
echo "\n\n> Dialer::dialAndUpgrade"
for transport in self.transports: # for each transport for transport in self.transports: # for each transport
if transport.handles(address): # check if it can dial it if transport.handles(address): # check if it can dial it
trace "Dialing address", address, peerId = peerId.get(default(PeerId)), hostname trace "Dialing address", address, peerId = peerId.get(default(PeerId)), hostname
let dialed = let dialed =
try: try:
libp2p_total_dial_attempts.inc() libp2p_total_dial_attempts.inc()
await transport.dial(hostname, address, peerId) echo "> Dialer::dialAndUpgrade::0"
# echo hostname
# echo address
# echo peerId
echo transport.log()
echo type(transport)
echo type(hostname)
echo type(address)
echo type(peerId)
let dialFut = transport.dial(hostname, address, peerId)
echo "> Dialer::dialAndUpgrade::after-dial"
let myD = await dialFut
echo "< Dialer::dialAndUpgrade::after-dial"
echo myD.shortLog()
echo "<<<< Dialer::dialAndUpgrade::after-dial"
myD
except CancelledError as exc: except CancelledError as exc:
trace "Dialing canceled", trace "Dialing canceled",
description = exc.msg, peerId = peerId.get(default(PeerId)) description = exc.msg, peerId = peerId.get(default(PeerId))
@ -70,7 +86,8 @@ proc dialAndUpgrade(
description = exc.msg, peerId = peerId.get(default(PeerId)) description = exc.msg, peerId = peerId.get(default(PeerId))
libp2p_failed_dials.inc() libp2p_failed_dials.inc()
return nil # Try the next address return nil # Try the next address
echo "> Dialer::dialAndUpgrade::1"
echo dialed.shortLog()
libp2p_successful_dials.inc() libp2p_successful_dials.inc()
let mux = let mux =
@ -80,6 +97,11 @@ proc dialAndUpgrade(
# The if below is more general and might handle other use cases in the future. # The if below is more general and might handle other use cases in the future.
if dialed.dir != dir: if dialed.dir != dir:
dialed.dir = dir dialed.dir = dir
echo "> Dialer::dialAndUpgrade::2"
echo transport.log()
echo dialed.shortLog()
# TODO: dialed should be MixnetConnectionAdapter
echo "< Dialer::dialAndUpgrade::2"
await transport.upgrade(dialed, peerId) await transport.upgrade(dialed, peerId)
except CancelledError as exc: except CancelledError as exc:
await dialed.close() await dialed.close()
@ -97,7 +119,7 @@ proc dialAndUpgrade(
# Try other address # Try other address
return nil return nil
echo "> Dialer::dialAndUpgrade::3"
doAssert not isNil(mux), "connection died after upgrade " & $dialed.dir doAssert not isNil(mux), "connection died after upgrade " & $dialed.dir
debug "Dial successful", peerId = mux.connection.peerId debug "Dial successful", peerId = mux.connection.peerId
return mux return mux
@ -170,6 +192,7 @@ proc internalConnect(
reuseConnection = true, reuseConnection = true,
dir = Direction.Out, dir = Direction.Out,
): Future[Muxer] {.async.} = ): Future[Muxer] {.async.} =
echo "> Dialer::internalConnect"
if Opt.some(self.localPeerId) == peerId: if Opt.some(self.localPeerId) == peerId:
raise newException(CatchableError, "can't dial self!") raise newException(CatchableError, "can't dial self!")
@ -184,16 +207,17 @@ proc internalConnect(
return mux return mux
let slot = self.connManager.getOutgoingSlot(forceDial) let slot = self.connManager.getOutgoingSlot(forceDial)
echo "> Dialer::internalConnect::0"
let muxed = let muxed =
try: try:
await self.dialAndUpgrade(peerId, addrs, dir) await self.dialAndUpgrade(peerId, addrs, dir)
except CatchableError as exc: except CatchableError as exc:
slot.release() slot.release()
raise exc raise exc
echo "> Dialer::internalConnect::1"
slot.trackMuxer(muxed) slot.trackMuxer(muxed)
if isNil(muxed): # None of the addresses connected if isNil(muxed): # None of the addresses connected
raise newException(DialFailedError, "Unable to establish outgoing link") raise newException(DialFailedError, "Unable to establish outgoing link")
try: try:
self.connManager.storeMuxer(muxed) self.connManager.storeMuxer(muxed)
await self.peerStore.identify(muxed) await self.peerStore.identify(muxed)
@ -302,7 +326,7 @@ method dial*(
## create a protocol stream and establish ## create a protocol stream and establish
## a connection if one doesn't exist already ## a connection if one doesn't exist already
## ##
echo "> Dialer::dial"
var var
conn: Muxer conn: Muxer
stream: Connection stream: Connection
@ -315,14 +339,18 @@ method dial*(
await conn.close() await conn.close()
try: try:
echo "> Dialer::0"
trace "Dialing (new)", peerId, protos trace "Dialing (new)", peerId, protos
conn = await self.internalConnect(Opt.some(peerId), addrs, forceDial) conn = await self.internalConnect(Opt.some(peerId), addrs, forceDial)
echo "> Dialer::1"
trace "Opening stream", conn trace "Opening stream", conn
stream = await self.connManager.getStream(conn) stream = await self.connManager.getStream(conn)
echo "> Dialer::2"
if isNil(stream): if isNil(stream):
raise newException(DialFailedError, "Couldn't get muxed stream") raise newException(DialFailedError, "Couldn't get muxed stream")
echo "< Dialer::dial"
return await self.negotiateStream(stream, protos) return await self.negotiateStream(stream, protos)
except CancelledError as exc: except CancelledError as exc:
trace "Dial canceled", conn trace "Dial canceled", conn

View File

@ -52,7 +52,9 @@ proc select*(
): Future[string] {.async: (raises: [CancelledError, LPStreamError, MultiStreamError]).} = ): Future[string] {.async: (raises: [CancelledError, LPStreamError, MultiStreamError]).} =
trace "initiating handshake", conn, codec = Codec trace "initiating handshake", conn, codec = Codec
## select a remote protocol ## select a remote protocol
echo "> MultiStreamSelect::select"
await conn.writeLp(Codec & "\n") # write handshake await conn.writeLp(Codec & "\n") # write handshake
echo "> MultiStreamSelect::select - 0"
if proto.len() > 0: if proto.len() > 0:
trace "selecting proto", conn, proto = proto[0] trace "selecting proto", conn, proto = proto[0]
await conn.writeLp((proto[0] & "\n")) # select proto await conn.writeLp((proto[0] & "\n")) # select proto
@ -67,6 +69,7 @@ proc select*(
trace "multistream handshake success", conn trace "multistream handshake success", conn
if proto.len() == 0: # no protocols, must be a handshake call if proto.len() == 0: # no protocols, must be a handshake call
echo "< MultiStreamSelect::select - Handshake"
return Codec return Codec
else: else:
s = string.fromBytes(await conn.readLp(MsgSize)) # read the first proto s = string.fromBytes(await conn.readLp(MsgSize)) # read the first proto
@ -75,6 +78,7 @@ proc select*(
if s == proto[0]: if s == proto[0]:
trace "successfully selected ", conn, proto = proto[0] trace "successfully selected ", conn, proto = proto[0]
conn.protocol = proto[0] conn.protocol = proto[0]
echo "< MultiStreamSelect::select - ", proto[0]
return proto[0] return proto[0]
elif proto.len > 1: elif proto.len > 1:
# Try to negotiate alternatives # Try to negotiate alternatives

View File

@ -52,6 +52,7 @@ proc new*(
method init*(p: Ping) = method init*(p: Ping) =
proc handle(conn: Connection, proto: string) {.async.} = proc handle(conn: Connection, proto: string) {.async.} =
echo "######### Before Ping #########"
try: try:
trace "handling ping", conn trace "handling ping", conn
var buf: array[PingSize, byte] var buf: array[PingSize, byte]
@ -64,13 +65,15 @@ method init*(p: Ping) =
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
trace "exception in ping handler", description = exc.msg, conn trace "exception in ping handler", description = exc.msg, conn
echo "######### After Ping #########"
p.handler = handle p.handler = handle
p.codec = PingCodec p.codec = PingCodec
proc ping*(p: Ping, conn: Connection): Future[Duration] {.async, public.} = proc ping*(p: Ping, conn: Connection): Future[Duration] {.async, public.} =
## Sends ping to `conn`, returns the delay ## Sends ping to `conn`, returns the delay
echo "######### Pinging #########"
echo conn.shortLog()
trace "initiating ping", conn trace "initiating ping", conn
var var
@ -82,9 +85,12 @@ proc ping*(p: Ping, conn: Connection): Future[Duration] {.async, public.} =
let startTime = Moment.now() let startTime = Moment.now()
trace "sending ping", conn trace "sending ping", conn
echo "# Before Write. Is conn closed? ", conn.isClosed, conn.isEof
await conn.write(@randomBuf) await conn.write(@randomBuf)
echo "# After Write. Is conn closed? ", conn.isClosed, conn.isEof
await conn.readExactly(addr resultBuf[0], PingSize) await conn.readExactly(addr resultBuf[0], PingSize)
echo "# After Read. Is conn closed? ", conn.isClosed
let responseDur = Moment.now() - startTime let responseDur = Moment.now() - startTime
@ -95,4 +101,5 @@ proc ping*(p: Ping, conn: Connection): Future[Duration] {.async, public.} =
raise newException(WrongPingAckError, "Incorrect ping data from peer!") raise newException(WrongPingAckError, "Incorrect ping data from peer!")
trace "valid ping response", conn trace "valid ping response", conn
echo "######### Pinged #########"
return responseDur return responseDur

View File

@ -148,6 +148,7 @@ method init*(s: Secure) =
try: try:
# We don't need the result but we # We don't need the result but we
# definitely need to await the handshake # definitely need to await the handshake
echo "Secure::handle"
discard await s.handleConn(conn, false, Opt.none(PeerId)) discard await s.handleConn(conn, false, Opt.none(PeerId))
trace "connection secured", conn trace "connection secured", conn
except CancelledError as exc: except CancelledError as exc:
@ -165,6 +166,7 @@ method secure*(
): Future[Connection] {. ): Future[Connection] {.
async: (raises: [CancelledError, LPStreamError], raw: true), base async: (raises: [CancelledError, LPStreamError], raw: true), base
.} = .} =
echo "> Secure::secure"
s.handleConn(conn, conn.dir == Direction.Out, peerId) s.handleConn(conn, conn.dir == Direction.Out, peerId)
method readOnce*( method readOnce*(

View File

@ -40,14 +40,11 @@ type
proc timeoutMonitor(s: Connection) {.async: (raises: []).} proc timeoutMonitor(s: Connection) {.async: (raises: []).}
func shortLog*(conn: Connection): string = method shortLog*(conn: Connection): string {.raises: [].} =
try: if conn == nil:
if conn == nil: "Connection(nil)"
"Connection(nil)" else:
else: &"{shortLog(conn.peerId)}:{conn.oid}:{conn.protocol}"
&"{shortLog(conn.peerId)}:{conn.oid}"
except ValueError as exc:
raiseAssert(exc.msg)
chronicles.formatIt(Connection): chronicles.formatIt(Connection):
shortLog(it) shortLog(it)

View File

@ -112,7 +112,7 @@ method initStream*(s: LPStream) {.base.} =
trackCounter(s.objName) trackCounter(s.objName)
trace "Stream created", s, objName = s.objName, dir = $s.dir trace "Stream created", s, objName = s.objName, dir = $s.dir
proc join*( method join*(
s: LPStream s: LPStream
): Future[void] {.async: (raises: [CancelledError], raw: true), public.} = ): Future[void] {.async: (raises: [CancelledError], raw: true), public.} =
## Wait for the stream to be closed ## Wait for the stream to be closed
@ -134,9 +134,10 @@ method readOnce*(
## available ## available
raiseAssert("Not implemented!") raiseAssert("Not implemented!")
proc readExactly*( method readExactly*(
s: LPStream, pbytes: pointer, nbytes: int s: LPStream, pbytes: pointer, nbytes: int
): Future[void] {.async: (raises: [CancelledError, LPStreamError]), public.} = ): Future[void] {.async: (raises: [CancelledError, LPStreamError]), public.} =
# echo "readExactly. Is conn closed? ", s.isClosed
## Waits for `nbytes` to be available, then read ## Waits for `nbytes` to be available, then read
## them and return them ## them and return them
if s.atEof: if s.atEof:
@ -159,9 +160,9 @@ proc readExactly*(
if read == 0: if read == 0:
doAssert s.atEof() doAssert s.atEof()
trace "couldn't read all bytes, stream EOF", s, nbytes, read
# Re-readOnce to raise a more specific error than EOF # Re-readOnce to raise a more specific error than EOF
# Raise EOF if it doesn't raise anything(shouldn't happen) # Raise EOF if it doesn't raise anything(shouldn't happen)
# echo "readExactly3. Is conn closed? ", s.isClosed
discard await s.readOnce(addr pbuffer[read], nbytes - read) discard await s.readOnce(addr pbuffer[read], nbytes - read)
warn "Read twice while at EOF" warn "Read twice while at EOF"
raise newLPStreamEOFError() raise newLPStreamEOFError()
@ -170,7 +171,7 @@ proc readExactly*(
trace "couldn't read all bytes, incomplete data", s, nbytes, read trace "couldn't read all bytes, incomplete data", s, nbytes, read
raise newLPStreamIncompleteError() raise newLPStreamIncompleteError()
proc readLine*( method readLine*(
s: LPStream, limit = 0, sep = "\r\n" s: LPStream, limit = 0, sep = "\r\n"
): Future[string] {.async: (raises: [CancelledError, LPStreamError]), public.} = ): Future[string] {.async: (raises: [CancelledError, LPStreamError]), public.} =
## Reads up to `limit` bytes are read, or a `sep` is found ## Reads up to `limit` bytes are read, or a `sep` is found
@ -198,7 +199,7 @@ proc readLine*(
if len(result) == lim: if len(result) == lim:
break break
proc readVarint*( method readVarint*(
conn: LPStream conn: LPStream
): Future[uint64] {.async: (raises: [CancelledError, LPStreamError]), public.} = ): Future[uint64] {.async: (raises: [CancelledError, LPStreamError]), public.} =
var buffer: array[10, byte] var buffer: array[10, byte]
@ -217,7 +218,7 @@ proc readVarint*(
if true: # can't end with a raise apparently if true: # can't end with a raise apparently
raise (ref InvalidVarintError)(msg: "Cannot parse varint") raise (ref InvalidVarintError)(msg: "Cannot parse varint")
proc readLp*( method readLp*(
s: LPStream, maxSize: int s: LPStream, maxSize: int
): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]), public.} = ): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]), public.} =
## read length prefixed msg, with the length encoded as a varint ## read length prefixed msg, with the length encoded as a varint
@ -243,7 +244,7 @@ method write*(
# Write `msg` to stream, waiting for the write to be finished # Write `msg` to stream, waiting for the write to be finished
raiseAssert("Not implemented!") raiseAssert("Not implemented!")
proc writeLp*( method writeLp*(
s: LPStream, msg: openArray[byte] s: LPStream, msg: openArray[byte]
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} = ): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
## Write `msg` with a varint-encoded length prefix ## Write `msg` with a varint-encoded length prefix
@ -253,7 +254,7 @@ proc writeLp*(
buf[vbytes.len ..< buf.len] = msg buf[vbytes.len ..< buf.len] = msg
s.write(buf) s.write(buf)
proc writeLp*( method writeLp*(
s: LPStream, msg: string s: LPStream, msg: string
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} = ): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
writeLp(s, msg.toOpenArrayByte(0, msg.high)) writeLp(s, msg.toOpenArrayByte(0, msg.high))
@ -305,6 +306,7 @@ proc closeWithEOF*(s: LPStream): Future[void] {.async: (raises: []), public.} =
## ##
trace "Closing with EOF", s trace "Closing with EOF", s
echo "> Closing with EOF: ", s.shortLog()
if s.closedWithEOF: if s.closedWithEOF:
trace "Already closed" trace "Already closed"
return return

View File

@ -226,6 +226,7 @@ method accept*(self: TcpTransport): Future[Connection] =
proc impl( proc impl(
self: TcpTransport self: TcpTransport
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} = ): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
echo "> TcpTransport::accept"
proc cancelAcceptFuts() = proc cancelAcceptFuts() =
for fut in self.acceptFuts: for fut in self.acceptFuts:
if not fut.completed(): if not fut.completed():
@ -239,19 +240,23 @@ method accept*(self: TcpTransport): Future[Connection] =
elif self.acceptFuts.len == 0: elif self.acceptFuts.len == 0:
# Holds futures representing ongoing accept calls on multiple servers. # Holds futures representing ongoing accept calls on multiple servers.
self.acceptFuts = self.servers.mapIt(it.accept()) self.acceptFuts = self.servers.mapIt(it.accept())
echo "> TcpTransport::accept - 0"
let let
finished = finished =
try: try:
# Waits for any one of these futures to complete, indicating that a new connection has been accepted on one of the servers. # Waits for any one of these futures to complete, indicating that a new connection has been accepted on one of the servers.
echo "###############################################################"
await one(self.acceptFuts) await one(self.acceptFuts)
except ValueError: except ValueError:
echo "> TcpTransport::accept - 02"
raiseAssert "Accept futures should not be empty" raiseAssert "Accept futures should not be empty"
except CancelledError as exc: except CancelledError as exc:
echo "> TcpTransport::accept - 03"
cancelAcceptFuts() cancelAcceptFuts()
raise exc raise exc
index = self.acceptFuts.find(finished) index = self.acceptFuts.find(finished)
echo "> TcpTransport::accept - 1"
# A new connection has been accepted. The corresponding server should immediately start accepting another connection. # A new connection has been accepted. The corresponding server should immediately start accepting another connection.
# Thus we replace the completed future with a new one by calling accept on the same server again. # Thus we replace the completed future with a new one by calling accept on the same server again.
self.acceptFuts[index] = self.servers[index].accept() self.acceptFuts[index] = self.servers[index].accept()
@ -274,6 +279,7 @@ method accept*(self: TcpTransport): Future[Connection] =
cancelAcceptFuts() cancelAcceptFuts()
raise exc raise exc
echo "> TcpTransport::accept - 2"
if not self.running: # Stopped while waiting if not self.running: # Stopped while waiting
await transp.closeWait() await transp.closeWait()
raise newTransportClosedError() raise newTransportClosedError()
@ -289,6 +295,7 @@ method accept*(self: TcpTransport): Future[Connection] =
let observedAddr = let observedAddr =
MultiAddress.init(remote).expect("Can initialize from remote address") MultiAddress.init(remote).expect("Can initialize from remote address")
echo "- TcpTransport::accept"
self.connHandler(transp, Opt.some(observedAddr), Direction.In) self.connHandler(transp, Opt.some(observedAddr), Direction.In)
impl(self) impl(self)

View File

@ -36,6 +36,9 @@ type
upgrader*: Upgrade upgrader*: Upgrade
networkReachability*: NetworkReachability networkReachability*: NetworkReachability
method log*(self: Transport): string {.base, gcsafe.} =
"<Transport>"
proc newTransportClosedError*(parent: ref Exception = nil): ref TransportError = proc newTransportClosedError*(parent: ref Exception = nil): ref TransportError =
newException(TransportClosedError, "Transport closed, no more connections!", parent) newException(TransportClosedError, "Transport closed, no more connections!", parent)
@ -69,10 +72,10 @@ method dial*(
): Future[Connection] {.base, gcsafe.} = ): Future[Connection] {.base, gcsafe.} =
## dial a peer ## dial a peer
## ##
echo "Transport::dial"
doAssert(false, "Not implemented!") doAssert(false, "Not implemented!")
proc dial*( method dial*(
self: Transport, address: MultiAddress, peerId: Opt[PeerId] = Opt.none(PeerId) self: Transport, address: MultiAddress, peerId: Opt[PeerId] = Opt.none(PeerId)
): Future[Connection] {.gcsafe.} = ): Future[Connection] {.gcsafe.} =
self.dial("", address) self.dial("", address)
@ -83,6 +86,7 @@ method upgrade*(
## base upgrade method that the transport uses to perform ## base upgrade method that the transport uses to perform
## transport specific upgrades ## transport specific upgrades
## ##
echo "> Transport::upgrade"
self.upgrader.upgrade(conn, peerId) self.upgrader.upgrade(conn, peerId)
method handles*(self: Transport, address: MultiAddress): bool {.base, gcsafe.} = method handles*(self: Transport, address: MultiAddress): bool {.base, gcsafe.} =

View File

@ -68,6 +68,8 @@ method upgrade*(
): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} = ): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} =
trace "Upgrading connection", conn, direction = conn.dir trace "Upgrading connection", conn, direction = conn.dir
echo "> MuxedUpgrade::upgrade"
echo "-----"
let sconn = await self.secure(conn, peerId) # secure the connection let sconn = await self.secure(conn, peerId) # secure the connection
if sconn == nil: if sconn == nil:
raise (ref UpgradeFailedError)(msg: "unable to secure connection, stopping upgrade") raise (ref UpgradeFailedError)(msg: "unable to secure connection, stopping upgrade")

View File

@ -47,9 +47,10 @@ method upgrade*(
): Future[Muxer] {.async: (raises: [CancelledError, LPError], raw: true), base.} = ): Future[Muxer] {.async: (raises: [CancelledError, LPError], raw: true), base.} =
raiseAssert("Not implemented!") raiseAssert("Not implemented!")
proc secure*( method secure*(
self: Upgrade, conn: Connection, peerId: Opt[PeerId] self: Upgrade, conn: Connection, peerId: Opt[PeerId]
): Future[Connection] {.async: (raises: [CancelledError, LPError]).} = ): Future[Connection] {.async: (raises: [CancelledError, LPError]).} =
echo "> Upgrade::secure"
if self.secureManagers.len <= 0: if self.secureManagers.len <= 0:
raise (ref UpgradeFailedError)(msg: "No secure managers registered!") raise (ref UpgradeFailedError)(msg: "No secure managers registered!")
@ -68,4 +69,5 @@ proc secure*(
# let's avoid duplicating checks but detect if it fails to do it properly # let's avoid duplicating checks but detect if it fails to do it properly
doAssert(secureProtocol.len > 0) doAssert(secureProtocol.len > 0)
echo "> Upgrade::secure - 0"
await secureProtocol[0].secure(conn, peerId) await secureProtocol[0].secure(conn, peerId)