a few more log fixes (#355)
This commit is contained in:
parent
c1856fda53
commit
2b72d485a3
|
@ -44,7 +44,8 @@ declareCounter(libp2p_failed_dials, "failed dials")
|
|||
declareCounter(libp2p_failed_upgrade, "peers failed upgrade")
|
||||
|
||||
type
|
||||
NoPubSubException* = object of CatchableError
|
||||
UpgradeFailedError* = object of CatchableError
|
||||
DialFailedError* = object of CatchableError
|
||||
|
||||
ConnEventKind* {.pure.} = enum
|
||||
Connected, # A connection was made and securely upgraded - there may be
|
||||
|
@ -111,13 +112,13 @@ proc isConnected*(s: Switch, peerId: PeerID): bool =
|
|||
|
||||
proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
|
||||
if s.secureManagers.len <= 0:
|
||||
raise newException(CatchableError, "No secure managers registered!")
|
||||
raise newException(UpgradeFailedError, "No secure managers registered!")
|
||||
|
||||
let manager = await s.ms.select(conn, s.secureManagers.mapIt(it.codec))
|
||||
if manager.len == 0:
|
||||
raise newException(CatchableError, "Unable to negotiate a secure channel!")
|
||||
raise newException(UpgradeFailedError, "Unable to negotiate a secure channel!")
|
||||
|
||||
trace "securing connection", codec = manager, conn
|
||||
trace "Securing connection", codec = manager, conn
|
||||
let secureProtocol = s.secureManagers.filterIt(it.codec == manager)
|
||||
|
||||
# ms.select should deal with the correctness of this
|
||||
|
@ -133,7 +134,7 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} =
|
|||
let info = await s.identity.identify(conn, conn.peerInfo)
|
||||
|
||||
if info.pubKey.isNone and isNil(conn):
|
||||
raise newException(CatchableError,
|
||||
raise newException(UpgradeFailedError,
|
||||
"no public key provided and no existing peer identity found")
|
||||
|
||||
if isNil(conn.peerInfo):
|
||||
|
@ -151,7 +152,7 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} =
|
|||
if info.protos.len > 0:
|
||||
conn.peerInfo.protocols = info.protos
|
||||
|
||||
trace "identify: identified remote peer", conn
|
||||
trace "identified remote peer", conn, peerInfo = shortLog(conn.peerInfo)
|
||||
|
||||
proc identify(s: Switch, muxer: Muxer) {.async, gcsafe.} =
|
||||
# new stream for identify
|
||||
|
@ -168,7 +169,7 @@ proc identify(s: Switch, muxer: Muxer) {.async, gcsafe.} =
|
|||
proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} =
|
||||
## mux incoming connection
|
||||
|
||||
trace "muxing connection", conn
|
||||
trace "Muxing connection", conn
|
||||
if s.muxers.len == 0:
|
||||
warn "no muxers registered, skipping upgrade flow", conn
|
||||
return
|
||||
|
@ -184,7 +185,8 @@ proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} =
|
|||
# install stream handler
|
||||
muxer.streamHandler = s.streamHandler
|
||||
|
||||
s.connManager.storeOutgoing(muxer.connection)
|
||||
s.connManager.storeOutgoing(conn)
|
||||
trace "Storing muxer", conn
|
||||
s.connManager.storeMuxer(muxer)
|
||||
|
||||
trace "found a muxer", name = muxerName, conn
|
||||
|
@ -193,7 +195,7 @@ proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} =
|
|||
let handlerFut = muxer.handle()
|
||||
|
||||
# store it in muxed connections if we have a peer for it
|
||||
trace "adding muxer for peer", conn
|
||||
trace "Storing muxer with handler", conn
|
||||
s.connManager.storeMuxer(muxer, handlerFut) # update muxer with handler
|
||||
|
||||
return muxer
|
||||
|
@ -202,20 +204,21 @@ proc disconnect*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} =
|
|||
s.connManager.dropPeer(peerId)
|
||||
|
||||
proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
|
||||
trace "Upgrading outgoing connection", conn
|
||||
|
||||
let sconn = await s.secure(conn) # secure the connection
|
||||
if isNil(sconn):
|
||||
raise newException(CatchableError,
|
||||
raise newException(UpgradeFailedError,
|
||||
"unable to secure connection, stopping upgrade")
|
||||
|
||||
if sconn.peerInfo.isNil:
|
||||
raise newException(CatchableError,
|
||||
raise newException(UpgradeFailedError,
|
||||
"current version of nim-libp2p requires that secure protocol negotiates peerid")
|
||||
|
||||
trace "upgrading connection", conn
|
||||
let muxer = await s.mux(sconn) # mux it if possible
|
||||
if muxer == nil:
|
||||
# TODO this might be relaxed in the future
|
||||
raise newException(CatchableError,
|
||||
raise newException(UpgradeFailedError,
|
||||
"a muxer is required for outgoing connections")
|
||||
|
||||
try:
|
||||
|
@ -228,28 +231,26 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g
|
|||
|
||||
if isNil(sconn.peerInfo):
|
||||
await sconn.close()
|
||||
raise newException(CatchableError,
|
||||
"unable to identify connection, stopping upgrade")
|
||||
raise newException(UpgradeFailedError,
|
||||
"No peerInfo for connection, stopping upgrade")
|
||||
|
||||
trace "successfully upgraded outgoing connection", conn, sconn
|
||||
trace "Upgraded outgoing connection", conn, sconn
|
||||
|
||||
return sconn
|
||||
|
||||
proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
|
||||
trace "upgrading incoming connection", conn
|
||||
trace "Upgrading incoming connection", conn
|
||||
let ms = newMultistream()
|
||||
|
||||
# secure incoming connections
|
||||
proc securedHandler (conn: Connection,
|
||||
proto: string)
|
||||
{.async, gcsafe, closure.} =
|
||||
|
||||
var sconn: Connection
|
||||
trace "Securing connection", conn
|
||||
let secure = s.secureManagers.filterIt(it.codec == proto)[0]
|
||||
|
||||
try:
|
||||
sconn = await secure.secure(conn, false)
|
||||
var sconn = await secure.secure(conn, false)
|
||||
if isNil(sconn):
|
||||
return
|
||||
|
||||
|
@ -266,7 +267,9 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
|
|||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
debug "ending secured handler", err = exc.msg, conn
|
||||
debug "Exception in secure handler", err = exc.msg, conn
|
||||
|
||||
trace "Ending secured handler", conn
|
||||
|
||||
if (await ms.select(conn)): # just handshake
|
||||
# add the secure handlers
|
||||
|
@ -296,11 +299,12 @@ proc internalConnect(s: Switch,
|
|||
# This connection should already have been removed from the connection
|
||||
# manager - it's essentially a bug that we end up here - we'll fail
|
||||
# for now, hoping that this will clean themselves up later...
|
||||
warn "dead connection in connection manager", peerId
|
||||
warn "dead connection in connection manager", conn
|
||||
await conn.close()
|
||||
raise newException(CatchableError, "Zombie connection encountered")
|
||||
raise newException(DialFailedError, "Zombie connection encountered")
|
||||
|
||||
trace "Reusing existing connection", conn, direction = $conn.dir
|
||||
|
||||
trace "Reusing existing connection", conn
|
||||
return conn
|
||||
|
||||
trace "Dialing peer", peerId
|
||||
|
@ -311,10 +315,10 @@ proc internalConnect(s: Switch,
|
|||
let dialed = try:
|
||||
await t.dial(a)
|
||||
except CancelledError as exc:
|
||||
trace "dialing canceled", exc = exc.msg, peerId
|
||||
trace "Dialing canceled", exc = exc.msg, peerId
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "dialing failed", exc = exc.msg, peerId
|
||||
trace "Dialing failed", exc = exc.msg, peerId
|
||||
libp2p_failed_dials.inc()
|
||||
continue # Try the next address
|
||||
|
||||
|
@ -337,7 +341,7 @@ proc internalConnect(s: Switch,
|
|||
doAssert not isNil(upgraded), "connection died after upgradeOutgoing"
|
||||
|
||||
conn = upgraded
|
||||
trace "dial successful", conn, peerInfo = upgraded.peerInfo
|
||||
trace "Dial successful", conn, peerInfo = conn.peerInfo
|
||||
break
|
||||
finally:
|
||||
if lock.locked():
|
||||
|
@ -371,27 +375,25 @@ proc internalConnect(s: Switch,
|
|||
# All the errors are handled inside `cleanup()` procedure.
|
||||
asyncSpawn peerCleanup()
|
||||
|
||||
|
||||
return conn
|
||||
|
||||
proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} =
|
||||
discard await s.internalConnect(peerId, addrs)
|
||||
|
||||
proc negotiateStream(s: Switch, stream: Connection, proto: string): Future[Connection] {.async.} =
|
||||
trace "Attempting to select remote", proto = proto, stream
|
||||
proc negotiateStream(s: Switch, conn: Connection, proto: string): Future[Connection] {.async.} =
|
||||
trace "Negotiating stream", proto = proto, conn
|
||||
if not await s.ms.select(conn, proto):
|
||||
await conn.close()
|
||||
raise newException(DialFailedError, "Unable to select sub-protocol " & proto)
|
||||
|
||||
if not await s.ms.select(stream, proto):
|
||||
await stream.close()
|
||||
raise newException(CatchableError, "Unable to select sub-protocol" & proto)
|
||||
|
||||
return stream
|
||||
return conn
|
||||
|
||||
proc dial*(s: Switch,
|
||||
peerId: PeerID,
|
||||
proto: string): Future[Connection] {.async.} =
|
||||
let stream = await s.connmanager.getMuxedStream(peerId)
|
||||
if stream.isNil:
|
||||
raise newException(CatchableError, "Couldn't get muxed stream")
|
||||
raise newException(DialFailedError, "Couldn't get muxed stream")
|
||||
|
||||
return await s.negotiateStream(stream, proto)
|
||||
|
||||
|
@ -413,7 +415,7 @@ proc dial*(s: Switch,
|
|||
try:
|
||||
if isNil(stream):
|
||||
await conn.close()
|
||||
raise newException(CatchableError, "Couldn't get muxed stream")
|
||||
raise newException(DialFailedError, "Couldn't get muxed stream")
|
||||
|
||||
return await s.negotiateStream(stream, proto)
|
||||
except CancelledError as exc:
|
||||
|
@ -421,7 +423,7 @@ proc dial*(s: Switch,
|
|||
await cleanup()
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "error dialing", exc = exc.msg, conn
|
||||
trace "Error dialing", exc = exc.msg, conn
|
||||
await cleanup()
|
||||
raise exc
|
||||
|
||||
|
@ -440,14 +442,16 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
|
|||
trace "starting switch for peer", peerInfo = s.peerInfo
|
||||
|
||||
proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} =
|
||||
trace "Incoming connection", conn
|
||||
try:
|
||||
await s.upgradeIncoming(conn) # perform upgrade on incoming connection
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "Error in connection handler", exc = exc.msg, conn
|
||||
trace "Exception occurred in incoming handler", exc = exc.msg, conn
|
||||
finally:
|
||||
await conn.close()
|
||||
trace "Connection handler done", conn
|
||||
|
||||
var startFuts: seq[Future[void]]
|
||||
for t in s.transports: # for each transport
|
||||
|
@ -457,7 +461,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
|
|||
s.peerInfo.addrs[i] = t.ma # update peer's address
|
||||
startFuts.add(server)
|
||||
|
||||
debug "started libp2p node", peerInfo = s.peerInfo
|
||||
debug "Started libp2p node", peer = s.peerInfo
|
||||
result = startFuts # listen for incoming connections
|
||||
|
||||
proc stop*(s: Switch) {.async.} =
|
||||
|
@ -477,13 +481,16 @@ proc stop*(s: Switch) {.async.} =
|
|||
trace "switch stopped"
|
||||
|
||||
proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
|
||||
if muxer.connection.peerInfo.isNil:
|
||||
let
|
||||
conn = muxer.connection
|
||||
|
||||
if conn.peerInfo.isNil:
|
||||
warn "This version of nim-libp2p requires secure protocol to negotiate peerid"
|
||||
await muxer.close()
|
||||
return
|
||||
|
||||
# store incoming connection
|
||||
s.connManager.storeIncoming(muxer.connection)
|
||||
s.connManager.storeIncoming(conn)
|
||||
|
||||
# store muxer and muxed connection
|
||||
s.connManager.storeMuxer(muxer)
|
||||
|
@ -494,10 +501,10 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
|
|||
# Identify is non-essential, though if it fails, it might indicate that
|
||||
# the connection was closed already - this will be picked up by the read
|
||||
# loop
|
||||
debug "Could not identify connection", err = exc.msg, muxer
|
||||
debug "Could not identify connection", err = exc.msg, conn
|
||||
|
||||
try:
|
||||
let peerId = muxer.connection.peerInfo.peerId
|
||||
let peerId = conn.peerInfo.peerId
|
||||
|
||||
proc peerCleanup() {.async.} =
|
||||
try:
|
||||
|
@ -507,10 +514,10 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
|
|||
except CancelledError:
|
||||
# This is top-level procedure which will work as separate task, so it
|
||||
# do not need to propogate CancelledError.
|
||||
debug "Unexpected cancellation in switch muxer cleanup"
|
||||
debug "Unexpected cancellation in switch muxer cleanup", conn
|
||||
except CatchableError as exc:
|
||||
debug "Unexpected exception in switch muxer cleanup",
|
||||
errMsg = exc.msg, muxer
|
||||
err = exc.msg, conn
|
||||
|
||||
proc peerStartup() {.async.} =
|
||||
try:
|
||||
|
@ -520,10 +527,10 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
|
|||
except CancelledError:
|
||||
# This is top-level procedure which will work as separate task, so it
|
||||
# do not need to propogate CancelledError.
|
||||
debug "Unexpected cancellation in switch muxer startup", muxer
|
||||
debug "Unexpected cancellation in switch muxer startup", conn
|
||||
except CatchableError as exc:
|
||||
debug "Unexpected exception in switch muxer startup",
|
||||
errMsg = exc.msg, muxer
|
||||
err = exc.msg, conn
|
||||
|
||||
# All the errors are handled inside `peerStartup()` procedure.
|
||||
asyncSpawn peerStartup()
|
||||
|
@ -537,7 +544,7 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
|
|||
except CatchableError as exc:
|
||||
await muxer.close()
|
||||
libp2p_failed_upgrade.inc()
|
||||
trace "exception in muxer handler", exc = exc.msg, muxer
|
||||
trace "Exception in muxer handler", exc = exc.msg, conn
|
||||
|
||||
proc newSwitch*(peerInfo: PeerInfo,
|
||||
transports: seq[Transport],
|
||||
|
@ -558,17 +565,17 @@ proc newSwitch*(peerInfo: PeerInfo,
|
|||
)
|
||||
|
||||
let s = result # can't capture result
|
||||
result.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
|
||||
result.streamHandler = proc(conn: Connection) {.async, gcsafe.} = # noraises
|
||||
trace "Incoming muxed connection", conn
|
||||
try:
|
||||
trace "handling connection for", stream
|
||||
defer:
|
||||
if not(isNil(stream)):
|
||||
await stream.close()
|
||||
await s.ms.handle(stream) # handle incoming connection
|
||||
await s.ms.handle(conn) # handle incoming connection
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "exception in stream handler", exc = exc.msg, stream
|
||||
trace "exception in stream handler", exc = exc.msg, conn
|
||||
finally:
|
||||
await conn.close()
|
||||
trace "Muxed connection done", conn
|
||||
|
||||
result.mount(identity)
|
||||
for key, val in muxers:
|
||||
|
|
Loading…
Reference in New Issue