diff --git a/libp2p/peerinfo.nim b/libp2p/peerinfo.nim index f86f9e9..f439056 100644 --- a/libp2p/peerinfo.nim +++ b/libp2p/peerinfo.nim @@ -7,6 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +{.push raises: [Defect].} + import options, sequtils, hashes import chronos, chronicles import peerid, multiaddress, crypto/crypto @@ -30,7 +32,6 @@ type peerId*: PeerID addrs*: seq[MultiAddress] protocols*: seq[string] - lifefut: Future[void] protoVersion*: string agentVersion*: string secure*: string @@ -62,12 +63,12 @@ template postInit(peerinfo: PeerInfo, peerinfo.addrs = @addrs if len(protocols) > 0: peerinfo.protocols = @protocols - peerinfo.lifefut = newFuture[void]("libp2p.peerinfo.lifetime") proc init*(p: typedesc[PeerInfo], key: PrivateKey, addrs: openarray[MultiAddress] = [], - protocols: openarray[string] = []): PeerInfo {.inline.} = + protocols: openarray[string] = []): PeerInfo {. + raises: [Defect, ResultError[cstring]].} = result = PeerInfo(keyType: HasPrivate, peerId: PeerID.init(key).tryGet(), privateKey: key) result.postInit(addrs, protocols) @@ -75,55 +76,31 @@ proc init*(p: typedesc[PeerInfo], proc init*(p: typedesc[PeerInfo], peerId: PeerID, addrs: openarray[MultiAddress] = [], - protocols: openarray[string] = []): PeerInfo {.inline.} = + protocols: openarray[string] = []): PeerInfo = result = PeerInfo(keyType: HasPublic, peerId: peerId) result.postInit(addrs, protocols) proc init*(p: typedesc[PeerInfo], peerId: string, addrs: openarray[MultiAddress] = [], - protocols: openarray[string] = []): PeerInfo {.inline.} = + protocols: openarray[string] = []): PeerInfo {. + raises: [Defect, ResultError[cstring]].} = result = PeerInfo(keyType: HasPublic, peerId: PeerID.init(peerId).tryGet()) result.postInit(addrs, protocols) proc init*(p: typedesc[PeerInfo], key: PublicKey, addrs: openarray[MultiAddress] = [], - protocols: openarray[string] = []): PeerInfo {.inline.} = + protocols: openarray[string] = []): PeerInfo {. + raises: [Defect, ResultError[cstring]].}= result = PeerInfo(keyType: HasPublic, peerId: PeerID.init(key).tryGet(), key: some(key)) result.postInit(addrs, protocols) -proc close*(p: PeerInfo) {.inline.} = - if not p.lifefut.finished: - p.lifefut.complete() - else: - # TODO this should ideally not happen - notice "Closing closed peer", peer = p.id - -proc join*(p: PeerInfo): Future[void] {.inline.} = - var retFuture = newFuture[void]() - proc continuation(udata: pointer) {.gcsafe.} = - if not(retFuture.finished()): - retFuture.complete() - proc cancellation(udata: pointer) {.gcsafe.} = - p.lifefut.removeCallback(continuation) - if p.lifefut.finished: - retFuture.complete() - else: - p.lifefut.addCallback(continuation) - retFuture.cancelCallback = cancellation - return retFuture - -proc isClosed*(p: PeerInfo): bool {.inline.} = - result = p.lifefut.finished() - -proc lifeFuture*(p: PeerInfo): Future[void] {.inline.} = - result = p.lifefut - -proc publicKey*(p: PeerInfo): Option[PublicKey] {.inline.} = +proc publicKey*(p: PeerInfo): Option[PublicKey] {. + raises: [Defect, ResultError[CryptoError]].} = if p.keyType == HasPublic: if p.peerId.hasPublicKey(): var pubKey: PublicKey diff --git a/libp2p/protocols/secure/noise.nim b/libp2p/protocols/secure/noise.nim index 1919714..67884b1 100644 --- a/libp2p/protocols/secure/noise.nim +++ b/libp2p/protocols/secure/noise.nim @@ -134,7 +134,7 @@ proc decryptWithAd(state: var CipherState, ad, data: openArray[byte]): seq[byte] ChaChaPoly.decrypt(state.k, nonce, tagOut, result, ad) trace "decryptWithAd", tagIn = tagIn.shortLog, tagOut = tagOut.shortLog, nonce = state.n if tagIn != tagOut: - error "decryptWithAd failed", data = byteutils.toHex(data) + debug "decryptWithAd failed", data = shortLog(data) raise newException(NoiseDecryptTagError, "decryptWithAd failed tag authentication.") inc state.n if state.n > NonceMax: diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 32be1ab..2b3411c 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -50,12 +50,22 @@ const type NoPubSubException* = object of CatchableError - Lifecycle* {.pure.} = enum - Connected, - Upgraded, - Disconnected + ConnEventKind* {.pure.} = enum + Connected, # A connection was made and securely upgraded - there may be + # more than one concurrent connection thus more than one upgrade + # event per peer. + Disconnected # Peer disconnected - this event is fired once per upgrade + # when the associated connection is terminated. - Hook* = proc(peer: PeerInfo, cycle: Lifecycle): Future[void] {.gcsafe.} + ConnEvent* = object + case kind*: ConnEventKind + of ConnEventKind.Connected: + incoming*: bool + else: + discard + + ConnEventHandler* = + proc(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} Switch* = ref object of RootObj peerInfo*: PeerInfo @@ -69,31 +79,35 @@ type secureManagers*: seq[Secure] pubSub*: Option[PubSub] dialLock: Table[PeerID, AsyncLock] - hooks: Table[Lifecycle, HashSet[Hook]] + ConnEvents: Table[ConnEventKind, HashSet[ConnEventHandler]] pubsubMonitors: Table[PeerId, Future[void]] proc newNoPubSubException(): ref NoPubSubException {.inline.} = result = newException(NoPubSubException, "no pubsub provided!") -proc addHook*(s: Switch, hook: Hook, cycle: Lifecycle) = - s.hooks.mgetOrPut(cycle, initHashSet[Hook]()).incl(hook) +proc addConnEventHandler*(s: Switch, + handler: ConnEventHandler, kind: ConnEventKind) = + ## Add peer event handler - handlers must not raise exceptions! + if isNil(handler): return + s.ConnEvents.mgetOrPut(kind, initHashSet[ConnEventHandler]()).incl(handler) -proc removeHook*(s: Switch, hook: Hook, cycle: Lifecycle) = - s.hooks.mgetOrPut(cycle, initHashSet[Hook]()).excl(hook) +proc removeConnEventHandler*(s: Switch, + handler: ConnEventHandler, kind: ConnEventKind) = + s.ConnEvents.withValue(kind, handlers) do: + handlers[].excl(handler) -proc triggerHooks(s: Switch, peer: PeerInfo, cycle: Lifecycle) {.async, gcsafe.} = +proc triggerConnEvent(s: Switch, peerId: PeerID, event: ConnEvent) {.async, gcsafe.} = try: - if cycle in s.hooks: - var hooks: seq[Future[void]] - for h in s.hooks[cycle]: - if not(isNil(h)): - hooks.add(h(peer, cycle)) + if event.kind in s.ConnEvents: + var ConnEvents: seq[Future[void]] + for h in s.ConnEvents[event.kind]: + ConnEvents.add(h(peerId, event)) - checkFutures(await allFinished(hooks)) + checkFutures(await allFinished(ConnEvents)) except CancelledError as exc: raise exc - except CatchableError as exc: - trace "exception in trigger hooks", exc = exc.msg + except CatchableError as exc: # handlers should not raise! + warn "exception in trigger ConnEvents", exc = exc.msg proc disconnect*(s: Switch, peerId: PeerID) {.async, gcsafe.} proc subscribePeer*(s: Switch, peerId: PeerID) {.async, gcsafe.} @@ -280,86 +294,96 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = proc internalConnect(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]): Future[Connection] {.async.} = + logScope: peer = peerId + if s.peerInfo.peerId == peerId: raise newException(CatchableError, "can't dial self!") - var conn = s.connManager.selectConn(peerId) - if conn != nil and not conn.atEof and not conn.closed: - trace "Reusing existing connection", oid = $conn.oid, - direction = $conn.dir, - peer = peerId - - return conn - + var conn: Connection + # Ensure there's only one in-flight attempt per peer let lock = s.dialLock.mgetOrPut(peerId, newAsyncLock()) - try: await lock.acquire() - trace "Dialing peer", peer = peerId + + # Check if we have a connection already and try to reuse it + conn = s.connManager.selectConn(peerId) + if conn != nil: + if conn.atEof or conn.closed: + # This connection should already have been removed from the connection + # manager - it's essentially a bug that we end up here - we'll fail + # for now, hoping that this will clean themselves up later... + warn "dead connection in connection manager" + await conn.close() + raise newException(CatchableError, "Zombie connection encountered") + + trace "Reusing existing connection", oid = $conn.oid, + direction = $conn.dir + + return conn + + trace "Dialing peer" for t in s.transports: # for each transport for a in addrs: # for each address if t.handles(a): # check if it can dial it - trace "Dialing address", address = $a, peer = peerId - try: - conn = await t.dial(a) - # make sure to assign the peer to the connection - conn.peerInfo = PeerInfo.init(peerId, addrs) + trace "Dialing address", address = $a + let dialed = try: + await t.dial(a) + except CancelledError as exc: + trace "dialing canceled", exc = exc.msg + raise exc + except CatchableError as exc: + trace "dialing failed", exc = exc.msg + libp2p_failed_dials.inc() + continue # Try the next address - conn.closeEvent.wait() - .addCallback do(udata: pointer): - asyncCheck s.triggerHooks( - conn.peerInfo, - Lifecycle.Disconnected) + # make sure to assign the peer to the connection + dialed.peerInfo = PeerInfo.init(peerId, addrs) - asyncCheck s.triggerHooks(conn.peerInfo, Lifecycle.Connected) - libp2p_dialed_peers.inc() - except CancelledError as exc: - trace "dialing canceled", exc = exc.msg, peer = peerId - raise exc - except CatchableError as exc: - trace "dialing failed", exc = exc.msg, peer = peerId - libp2p_failed_dials.inc() - continue + libp2p_dialed_peers.inc() - try: - let uconn = await s.upgradeOutgoing(conn) - s.connManager.storeOutgoing(uconn) - asyncCheck s.triggerHooks(uconn.peerInfo, Lifecycle.Upgraded) - conn = uconn - trace "dial successful", oid = $conn.oid, peer = $conn.peerInfo - except CatchableError as exc: - if not(isNil(conn)): - await conn.close() + let upgraded = try: + await s.upgradeOutgoing(dialed) + except CatchableError as exc: + # If we failed to establish the connection through one transport, + # we won't succeeed through another - no use in trying again + await dialed.close() + debug "upgrade failed", exc = exc.msg + if exc isnot CancelledError: + libp2p_failed_upgrade.inc() + raise exc - trace "Unable to establish outgoing link", exc = exc.msg, peer = peerId - raise exc + doAssert not isNil(upgraded), "checked in upgradeOutgoing" - if isNil(conn): - libp2p_failed_upgrade.inc() - continue + s.connManager.storeOutgoing(upgraded) + trace "dial successful", + oid = $conn.oid, + peerInfo = shortLog(upgraded.peerInfo) + + conn = upgraded break finally: if lock.locked(): lock.release() - if isNil(conn): + if isNil(conn): # None of the addresses connected raise newException(CatchableError, "Unable to establish outgoing link") - if conn.closed or conn.atEof: - await conn.close() - raise newException(CatchableError, "Connection dead on arrival") + conn.closeEvent.wait() + .addCallback do(udata: pointer): + asyncCheck s.triggerConnEvent( + peerId, ConnEvent(kind: ConnEventKind.Disconnected)) - doAssert(conn in s.connManager, "connection not tracked!") + await s.triggerConnEvent( + peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: false)) - trace "dial successful", oid = $conn.oid, - peer = shortLog(conn.peerInfo) + if conn.closed(): + # This can happen if one of the peer event handlers deems the peer + # unworthy and disconnects it + raise newException(CatchableError, "Connection closed during handshake") asyncCheck s.cleanupPubSubPeer(conn) asyncCheck s.subscribePeer(peerId) - trace "got connection", oid = $conn.oid, - direction = $conn.dir, - peer = shortLog(conn.peerInfo) return conn proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} = @@ -418,13 +442,6 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} = try: - conn.closeEvent.wait() - .addCallback do(udata: pointer): - asyncCheck s.triggerHooks( - conn.peerInfo, - Lifecycle.Disconnected) - - asyncCheck s.triggerHooks(conn.peerInfo, Lifecycle.Connected) await s.upgradeIncoming(conn) # perform upgrade on incoming connection except CancelledError as exc: raise exc @@ -616,7 +633,10 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = await muxer.close() return - muxer.connection.peerInfo = stream.peerInfo + let + peerInfo = stream.peerInfo + peerId = peerInfo.peerId + muxer.connection.peerInfo = peerInfo # store incoming connection s.connManager.storeIncoming(muxer.connection) @@ -624,12 +644,19 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = # store muxer and muxed connection s.connManager.storeMuxer(muxer) - trace "got new muxer", peer = $muxer.connection.peerInfo - asyncCheck s.triggerHooks(muxer.connection.peerInfo, Lifecycle.Upgraded) + trace "got new muxer", peer = shortLog(peerInfo) + + muxer.connection.closeEvent.wait() + .addCallback do(udata: pointer): + asyncCheck s.triggerConnEvent( + peerId, ConnEvent(kind: ConnEventKind.Disconnected)) + + asyncCheck s.triggerConnEvent( + peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: true)) # try establishing a pubsub connection asyncCheck s.cleanupPubSubPeer(muxer.connection) - asyncCheck s.subscribePeer(muxer.connection.peerInfo.peerId) + asyncCheck s.subscribePeer(peerId) except CancelledError as exc: await muxer.close() diff --git a/tests/testpeerinfo.nim b/tests/testpeerinfo.nim index 4428640..0bc8c71 100644 --- a/tests/testpeerinfo.nim +++ b/tests/testpeerinfo.nim @@ -55,16 +55,3 @@ suite "PeerInfo": test "Should return some if pubkey is present in id": let peerInfo = PeerInfo.init(PeerID.init(PrivateKey.random(Ed25519, rng[]).get()).get()) check peerInfo.publicKey.isSome - - test "join() and isClosed() test": - proc testJoin(): Future[bool] {.async, gcsafe.} = - let peerInfo = PeerInfo.init(PeerID.init(PrivateKey.random(Ed25519, rng[]).get()).get()) - check peerInfo.isClosed() == false - var joinFut = peerInfo.join() - check joinFut.finished() == false - peerInfo.close() - await wait(joinFut, 100.milliseconds) - check peerInfo.isClosed() == true - check (joinFut.finished() == true) and (joinFut.cancelled() == false) - result = true - check waitFor(testJoin()) == true diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 89fe5c4..c77b494 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -237,38 +237,26 @@ suite "Switch": let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Secio]) var step = 0 - var cycles: set[Lifecycle] - proc hook(peer: PeerInfo, cycle: Lifecycle) {.async, gcsafe.} = - cycles = cycles + {cycle} + var kinds: set[ConnEventKind] + proc hook(peerId: PeerID, event: ConnEvent) {.async, gcsafe.} = + kinds = kinds + {event.kind} case step: of 0: - check cycle == Lifecycle.Connected - check if not(isNil(peer)): - peer.peerId == switch2.peerInfo.peerId - else: - true + check: + event.kind == ConnEventKind.Connected + peerId == switch2.peerInfo.peerId of 1: - assert(isNil(peer) == false) check: - cycle == Lifecycle.Upgraded - peer.peerId == switch2.peerInfo.peerId - of 2: - check: - cycle == Lifecycle.Disconnected + event.kind == ConnEventKind.Disconnected - check if not(isNil(peer)): - peer.peerId == switch2.peerInfo.peerId - else: - true + check peerId == switch2.peerInfo.peerId else: - echo "unkown cycle! ", $cycle check false step.inc() - switch1.addHook(hook, Lifecycle.Connected) - switch1.addHook(hook, Lifecycle.Upgraded) - switch1.addHook(hook, Lifecycle.Disconnected) + switch1.addConnEventHandler(hook, ConnEventKind.Connected) + switch1.addConnEventHandler(hook, ConnEventKind.Disconnected) awaiters.add(await switch1.start()) awaiters.add(await switch2.start()) @@ -294,10 +282,9 @@ suite "Switch": check connTracker.isLeaked() == false check: - cycles == { - Lifecycle.Connected, - Lifecycle.Upgraded, - Lifecycle.Disconnected + kinds == { + ConnEventKind.Connected, + ConnEventKind.Disconnected } await allFuturesThrowing(