From c6c0c152c0b08e2af776b4f0073081fb657f513b Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Thu, 6 Aug 2020 09:29:27 +0200 Subject: [PATCH] Dial peerid (#308) * prefer PeerID in switch api This avoids ref issues like ref identity and nil * use existing peerinfo instance if possible * remove secureCodec there may be multiple connections per peerinfo with different codecs * avoid some extra async:: --- libp2p/protocols/pubsub/pubsub.nim | 12 +- libp2p/protocols/secure/noise.nim | 22 ++-- libp2p/protocols/secure/secio.nim | 9 +- libp2p/switch.nim | 188 ++++++++++++++++------------- 4 files changed, 127 insertions(+), 104 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 0415150a1..90559b26d 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -204,12 +204,9 @@ method unsubscribePeer*(p: PubSub, peerInfo: PeerInfo) {.base, async.} = if not(isNil(peer)) and not(isNil(peer.conn)): await peer.conn.close() -proc connected*(p: PubSub, peerInfo: PeerInfo): bool = - if peerInfo.id in p.peers: - let peer = p.peers[peerInfo.id] - - if not(isNil(peer)): - return peer.connected +proc connected*(p: PubSub, peerId: PeerID): bool = + p.peers.withValue($peerId, peer): + return peer[] != nil and peer[].connected method unsubscribe*(p: PubSub, topics: seq[TopicPair]) {.base, async.} = @@ -388,3 +385,6 @@ proc removeObserver*(p: PubSub; observer: PubSubObserver) = let idx = p.observers[].find(observer) if idx != -1: p.observers[].del(idx) + +proc connected*(p: PubSub, peerInfo: PeerInfo): bool {.deprecated: "Use PeerID version".} = + peerInfo != nil and connected(p, peerInfo.peerId) diff --git a/libp2p/protocols/secure/noise.nim b/libp2p/protocols/secure/noise.nim index 56e9774dc..191971446 100644 --- a/libp2p/protocols/secure/noise.nim +++ b/libp2p/protocols/secure/noise.nim @@ -455,14 +455,14 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon let r1 = remoteProof.getField(1, remotePubKeyBytes) let r2 = remoteProof.getField(2, remoteSigBytes) if r1.isErr() or not(r1.get()): - raise newException(NoiseHandshakeError, "Failed to deserialize remote public key bytes. (initiator: " & $initiator & ", peer: " & $conn.peerInfo.peerId & ")") + raise newException(NoiseHandshakeError, "Failed to deserialize remote public key bytes. (initiator: " & $initiator & ")") if r2.isErr() or not(r2.get()): - raise newException(NoiseHandshakeError, "Failed to deserialize remote signature bytes. (initiator: " & $initiator & ", peer: " & $conn.peerInfo.peerId & ")") + raise newException(NoiseHandshakeError, "Failed to deserialize remote signature bytes. (initiator: " & $initiator & ")") if not remotePubKey.init(remotePubKeyBytes): - raise newException(NoiseHandshakeError, "Failed to decode remote public key. (initiator: " & $initiator & ", peer: " & $conn.peerInfo.peerId & ")") + raise newException(NoiseHandshakeError, "Failed to decode remote public key. (initiator: " & $initiator & ")") if not remoteSig.init(remoteSigBytes): - raise newException(NoiseHandshakeError, "Failed to decode remote signature. (initiator: " & $initiator & ", peer: " & $conn.peerInfo.peerId & ")") + raise newException(NoiseHandshakeError, "Failed to decode remote signature. (initiator: " & $initiator & ")") let verifyPayload = PayloadString.toBytes & handshakeRes.rs.getBytes if not remoteSig.verify(verifyPayload, remotePubKey): @@ -478,11 +478,17 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon var failedKey: PublicKey discard extractPublicKey(conn.peerInfo.peerId, failedKey) - debug "Noise handshake, peer infos don't match!", initiator, dealt_peer = $conn.peerInfo.id, dealt_key = $failedKey, received_peer = $pid, received_key = $remotePubKey + debug "Noise handshake, peer infos don't match!", + initiator, dealt_peer = $conn.peerInfo.id, + dealt_key = $failedKey, received_peer = $pid, + received_key = $remotePubKey raise newException(NoiseHandshakeError, "Noise handshake, peer infos don't match! " & $pid & " != " & $conn.peerInfo.peerId) - var tmp = NoiseConnection.init( - conn, PeerInfo.init(remotePubKey), conn.observedAddr) + let peerInfo = + if conn.peerInfo != nil: conn.peerInfo + else: PeerInfo.init(remotePubKey) + + var tmp = NoiseConnection.init(conn, peerInfo, conn.observedAddr) if initiator: tmp.readCs = handshakeRes.cs2 @@ -494,7 +500,7 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon finally: burnMem(handshakeRes) - trace "Noise handshake completed!", initiator, peer = $secure.peerInfo + trace "Noise handshake completed!", initiator, peer = shortLog(secure.peerInfo) return secure diff --git a/libp2p/protocols/secure/secio.nim b/libp2p/protocols/secure/secio.nim index 4cd2e6bc5..52c3905b5 100644 --- a/libp2p/protocols/secure/secio.nim +++ b/libp2p/protocols/secure/secio.nim @@ -246,9 +246,12 @@ proc newSecioConn(conn: Connection, ## Create new secure stream/lpstream, using specified hash algorithm ``hash``, ## cipher algorithm ``cipher``, stretched keys ``secrets`` and order ## ``order``. - result = SecioConn.init(conn, - PeerInfo.init(remotePubKey), - conn.observedAddr) + + let peerInfo = + if conn.peerInfo != nil: conn.peerInfo + else: PeerInfo.init(remotePubKey) + + result = SecioConn.init(conn, peerInfo, conn.observedAddr) let i0 = if order < 0: 1 else: 0 let i1 = if order < 0: 0 else: 1 diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 582fbdebc..ce3943be1 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -95,9 +95,9 @@ proc triggerHooks(s: Switch, peer: PeerInfo, cycle: Lifecycle) {.async, gcsafe.} except CatchableError as exc: trace "exception in trigger hooks", exc = exc.msg -proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} -proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} -proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} +proc disconnect*(s: Switch, peerId: PeerID) {.async, gcsafe.} +proc subscribePeer*(s: Switch, peerId: PeerID) {.async, gcsafe.} +proc subscribePeerInternal(s: Switch, peerId: PeerID) {.async, gcsafe.} proc cleanupPubSubPeer(s: Switch, conn: Connection) {.async.} = try: @@ -114,12 +114,12 @@ proc cleanupPubSubPeer(s: Switch, conn: Connection) {.async.} = except CatchableError as exc: trace "exception cleaning pubsub peer", exc = exc.msg -proc isConnected*(s: Switch, peer: PeerInfo): bool = +proc isConnected*(s: Switch, peerId: PeerID): bool = ## returns true if the peer has one or more ## associated connections (sockets) ## - peer.peerId in s.connManager + peerId in s.connManager proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = if s.secureManagers.len <= 0: @@ -211,9 +211,8 @@ proc mux(s: Switch, conn: Connection) {.async, gcsafe.} = trace "adding muxer for peer", peer = conn.peerInfo.id s.connManager.storeMuxer(muxer, handlerFut) # update muxer with handler -proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} = - if not peer.isNil: - await s.connManager.dropPeer(peer.peerId) +proc disconnect*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} = + s.connManager.dropPeer(peerId) proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = logScope: @@ -279,101 +278,99 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = await ms.handle(conn, active = true) proc internalConnect(s: Switch, - peer: PeerInfo): Future[Connection] {.async.} = - - if s.peerInfo.peerId == peer.peerId: + peerId: PeerID, + addrs: seq[MultiAddress]): Future[Connection] {.async.} = + if s.peerInfo.peerId == peerId: raise newException(CatchableError, "can't dial self!") - let id = peer.id - var conn: Connection - let lock = s.dialLock.mgetOrPut(id, newAsyncLock()) + var conn = s.connManager.selectConn(peerId) + if conn != nil and not conn.atEof and not conn.closed: + trace "Reusing existing connection", oid = $conn.oid, + direction = $conn.dir, + peer = peerId + + return conn + + let lock = s.dialLock.mgetOrPut($peerId, newAsyncLock()) try: await lock.acquire() - trace "about to dial peer", peer = id - conn = s.connManager.selectConn(peer.peerId) - if conn.isNil or (conn.closed or conn.atEof): - trace "Dialing peer", peer = id - for t in s.transports: # for each transport - for a in peer.addrs: # for each address - if t.handles(a): # check if it can dial it - trace "Dialing address", address = $a, peer = id - try: - conn = await t.dial(a) - # make sure to assign the peer to the connection - conn.peerInfo = peer + trace "Dialing peer", peer = peerId + for t in s.transports: # for each transport + for a in addrs: # for each address + if t.handles(a): # check if it can dial it + trace "Dialing address", address = $a, peer = peerId + try: + conn = await t.dial(a) + # make sure to assign the peer to the connection + conn.peerInfo = PeerInfo.init(peerId, addrs) - conn.closeEvent.wait() - .addCallback do(udata: pointer): - asyncCheck s.triggerHooks( - conn.peerInfo, - Lifecycle.Disconnected) + conn.closeEvent.wait() + .addCallback do(udata: pointer): + asyncCheck s.triggerHooks( + conn.peerInfo, + Lifecycle.Disconnected) - asyncCheck s.triggerHooks(conn.peerInfo, Lifecycle.Connected) - libp2p_dialed_peers.inc() - except CancelledError as exc: - trace "dialing canceled", exc = exc.msg - raise - except CatchableError as exc: - trace "dialing failed", exc = exc.msg - libp2p_failed_dials.inc() - continue + asyncCheck s.triggerHooks(conn.peerInfo, Lifecycle.Connected) + libp2p_dialed_peers.inc() + except CancelledError as exc: + trace "dialing canceled", exc = exc.msg, peer = peerId + raise + except CatchableError as exc: + trace "dialing failed", exc = exc.msg, peer = peerId + libp2p_failed_dials.inc() + continue - try: - let uconn = await s.upgradeOutgoing(conn) - s.connManager.storeOutgoing(uconn) - asyncCheck s.triggerHooks(uconn.peerInfo, Lifecycle.Upgraded) - conn = uconn - trace "dial successful", oid = $conn.oid, peer = $conn.peerInfo - except CatchableError as exc: - if not(isNil(conn)): - await conn.close() + try: + let uconn = await s.upgradeOutgoing(conn) + s.connManager.storeOutgoing(uconn) + asyncCheck s.triggerHooks(uconn.peerInfo, Lifecycle.Upgraded) + conn = uconn + trace "dial successful", oid = $conn.oid, peer = $conn.peerInfo + except CatchableError as exc: + if not(isNil(conn)): + await conn.close() - trace "Unable to establish outgoing link", exc = exc.msg - raise exc + trace "Unable to establish outgoing link", exc = exc.msg, peer = peerId + raise exc - if isNil(conn): - libp2p_failed_upgrade.inc() - continue - break - else: - trace "Reusing existing connection", oid = $conn.oid, - direction = $conn.dir, - peer = $conn.peerInfo + if isNil(conn): + libp2p_failed_upgrade.inc() + continue + break finally: if lock.locked(): lock.release() if isNil(conn): - raise newException(CatchableError, - "Unable to establish outgoing link") + raise newException(CatchableError, "Unable to establish outgoing link") if conn.closed or conn.atEof: await conn.close() - raise newException(CatchableError, - "Connection dead on arrival") + raise newException(CatchableError, "Connection dead on arrival") doAssert(conn in s.connManager, "connection not tracked!") trace "dial successful", oid = $conn.oid, - peer = $conn.peerInfo + peer = shortLog(conn.peerInfo) asyncCheck s.cleanupPubSubPeer(conn) - asyncCheck s.subscribePeer(conn.peerInfo) + asyncCheck s.subscribePeer(peerId) trace "got connection", oid = $conn.oid, direction = $conn.dir, - peer = $conn.peerInfo + peer = shortLog(conn.peerInfo) return conn -proc connect*(s: Switch, peer: PeerInfo) {.async.} = - discard await s.internalConnect(peer) +proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} = + discard await s.internalConnect(peerId, addrs) proc dial*(s: Switch, - peer: PeerInfo, + peerId: PeerID, + addrs: seq[MultiAddress], proto: string): Future[Connection] {.async.} = - let conn = await s.internalConnect(peer) + let conn = await s.internalConnect(peerId, addrs) let stream = await s.connManager.getMuxedStream(conn) proc cleanup() {.async.} = @@ -472,17 +469,17 @@ proc stop*(s: Switch) {.async.} = trace "switch stopped" -proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = +proc subscribePeerInternal(s: Switch, peerId: PeerID) {.async, gcsafe.} = ## Subscribe to pub sub peer ## - if s.pubSub.isSome and not s.pubSub.get().connected(peerInfo): - trace "about to subscribe to pubsub peer", peer = peerInfo.shortLog() + if s.pubSub.isSome and not s.pubSub.get().connected(peerId): + trace "about to subscribe to pubsub peer", peer = peerId var stream: Connection try: - stream = await s.connManager.getMuxedStream(peerInfo.peerId) + stream = await s.connManager.getMuxedStream(peerId) if isNil(stream): - trace "unable to subscribe to peer", peer = peerInfo.shortLog + trace "unable to subscribe to peer", peer = peerId return if not await s.ms.select(stream, s.pubSub.get().codec): @@ -499,38 +496,37 @@ proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = raise exc except CatchableError as exc: - trace "exception in subscribe to peer", peer = peerInfo.shortLog, + trace "exception in subscribe to peer", peer = peerId, exc = exc.msg if not(isNil(stream)): await stream.close() -proc pubsubMonitor(s: Switch, peer: PeerInfo) {.async.} = +proc pubsubMonitor(s: Switch, peerId: PeerID) {.async.} = ## while peer connected maintain a ## pubsub connection as well ## - while s.isConnected(peer): + while s.isConnected(peerId): try: - trace "subscribing to pubsub peer", peer = $peer - await s.subscribePeerInternal(peer) + trace "subscribing to pubsub peer", peer = peerId + await s.subscribePeerInternal(peerId) except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception in pubsub monitor", peer = $peer, exc = exc.msg + trace "exception in pubsub monitor", peer = peerId, exc = exc.msg finally: - trace "sleeping before trying pubsub peer", peer = $peer + trace "sleeping before trying pubsub peer", peer = peerId await sleepAsync(1.seconds) # allow the peer to cooldown - trace "exiting pubsub monitor", peer = $peer + trace "exiting pubsub monitor", peer = peerId -proc subscribePeer*(s: Switch, peerInfo: PeerInfo): Future[void] {.gcsafe.} = +proc subscribePeer*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} = ## Waits until ``server`` is not closed. ## var retFuture = newFuture[void]("stream.transport.server.join") let pubsubFut = s.pubsubMonitors.mgetOrPut( - peerInfo.peerId, - s.pubsubMonitor(peerInfo)) + peerId, s.pubsubMonitor(peerId)) proc continuation(udata: pointer) {.gcsafe.} = retFuture.complete() @@ -633,7 +629,7 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = # try establishing a pubsub connection asyncCheck s.cleanupPubSubPeer(muxer.connection) - asyncCheck s.subscribePeer(muxer.connection.peerInfo) + asyncCheck s.subscribePeer(muxer.connection.peerInfo.peerId) except CancelledError as exc: await muxer.close() @@ -684,3 +680,21 @@ proc newSwitch*(peerInfo: PeerInfo, if pubSub.isSome: result.pubSub = pubSub result.mount(pubSub.get()) + +proc isConnected*(s: Switch, peerInfo: PeerInfo): bool {.deprecated: "Use PeerID version".} = + not isNil(peerInfo) and isConnected(s, peerInfo.peerId) + +proc disconnect*(s: Switch, peerInfo: PeerInfo): Future[void] {.deprecated: "Use PeerID version", gcsafe.} = + disconnect(s, peerInfo.peerId) + +proc connect*(s: Switch, peerInfo: PeerInfo): Future[void] {.deprecated: "Use PeerID version".} = + connect(s, peerInfo.peerId, peerInfo.addrs) + +proc dial*(s: Switch, + peerInfo: PeerInfo, + proto: string): + Future[Connection] {.deprecated: "Use PeerID version".} = + dial(s, peerInfo.peerId, peerInfo.addrs, proto) + +proc subscribePeer*(s: Switch, peerInfo: PeerInfo): Future[void] {.deprecated: "Use PeerID version", gcsafe.} = + subscribePeer(s, peerInfo.peerId)