Dial peerid (#308)

* prefer PeerID in switch api

This avoids ref issues like ref identity and nil

* use existing peerinfo instance if possible

* remove secureCodec

there may be multiple connections per peerinfo with different codecs

* avoid some extra async::
This commit is contained in:
Jacek Sieka 2020-08-06 09:29:27 +02:00 committed by GitHub
parent 9bbe5e4841
commit c6c0c152c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 127 additions and 104 deletions

View File

@ -204,12 +204,9 @@ method unsubscribePeer*(p: PubSub, peerInfo: PeerInfo) {.base, async.} =
if not(isNil(peer)) and not(isNil(peer.conn)): if not(isNil(peer)) and not(isNil(peer.conn)):
await peer.conn.close() await peer.conn.close()
proc connected*(p: PubSub, peerInfo: PeerInfo): bool = proc connected*(p: PubSub, peerId: PeerID): bool =
if peerInfo.id in p.peers: p.peers.withValue($peerId, peer):
let peer = p.peers[peerInfo.id] return peer[] != nil and peer[].connected
if not(isNil(peer)):
return peer.connected
method unsubscribe*(p: PubSub, method unsubscribe*(p: PubSub,
topics: seq[TopicPair]) {.base, async.} = topics: seq[TopicPair]) {.base, async.} =
@ -388,3 +385,6 @@ proc removeObserver*(p: PubSub; observer: PubSubObserver) =
let idx = p.observers[].find(observer) let idx = p.observers[].find(observer)
if idx != -1: if idx != -1:
p.observers[].del(idx) p.observers[].del(idx)
proc connected*(p: PubSub, peerInfo: PeerInfo): bool {.deprecated: "Use PeerID version".} =
peerInfo != nil and connected(p, peerInfo.peerId)

View File

@ -455,14 +455,14 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon
let r1 = remoteProof.getField(1, remotePubKeyBytes) let r1 = remoteProof.getField(1, remotePubKeyBytes)
let r2 = remoteProof.getField(2, remoteSigBytes) let r2 = remoteProof.getField(2, remoteSigBytes)
if r1.isErr() or not(r1.get()): if r1.isErr() or not(r1.get()):
raise newException(NoiseHandshakeError, "Failed to deserialize remote public key bytes. (initiator: " & $initiator & ", peer: " & $conn.peerInfo.peerId & ")") raise newException(NoiseHandshakeError, "Failed to deserialize remote public key bytes. (initiator: " & $initiator & ")")
if r2.isErr() or not(r2.get()): if r2.isErr() or not(r2.get()):
raise newException(NoiseHandshakeError, "Failed to deserialize remote signature bytes. (initiator: " & $initiator & ", peer: " & $conn.peerInfo.peerId & ")") raise newException(NoiseHandshakeError, "Failed to deserialize remote signature bytes. (initiator: " & $initiator & ")")
if not remotePubKey.init(remotePubKeyBytes): if not remotePubKey.init(remotePubKeyBytes):
raise newException(NoiseHandshakeError, "Failed to decode remote public key. (initiator: " & $initiator & ", peer: " & $conn.peerInfo.peerId & ")") raise newException(NoiseHandshakeError, "Failed to decode remote public key. (initiator: " & $initiator & ")")
if not remoteSig.init(remoteSigBytes): if not remoteSig.init(remoteSigBytes):
raise newException(NoiseHandshakeError, "Failed to decode remote signature. (initiator: " & $initiator & ", peer: " & $conn.peerInfo.peerId & ")") raise newException(NoiseHandshakeError, "Failed to decode remote signature. (initiator: " & $initiator & ")")
let verifyPayload = PayloadString.toBytes & handshakeRes.rs.getBytes let verifyPayload = PayloadString.toBytes & handshakeRes.rs.getBytes
if not remoteSig.verify(verifyPayload, remotePubKey): if not remoteSig.verify(verifyPayload, remotePubKey):
@ -478,11 +478,17 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon
var var
failedKey: PublicKey failedKey: PublicKey
discard extractPublicKey(conn.peerInfo.peerId, failedKey) discard extractPublicKey(conn.peerInfo.peerId, failedKey)
debug "Noise handshake, peer infos don't match!", initiator, dealt_peer = $conn.peerInfo.id, dealt_key = $failedKey, received_peer = $pid, received_key = $remotePubKey debug "Noise handshake, peer infos don't match!",
initiator, dealt_peer = $conn.peerInfo.id,
dealt_key = $failedKey, received_peer = $pid,
received_key = $remotePubKey
raise newException(NoiseHandshakeError, "Noise handshake, peer infos don't match! " & $pid & " != " & $conn.peerInfo.peerId) raise newException(NoiseHandshakeError, "Noise handshake, peer infos don't match! " & $pid & " != " & $conn.peerInfo.peerId)
var tmp = NoiseConnection.init( let peerInfo =
conn, PeerInfo.init(remotePubKey), conn.observedAddr) if conn.peerInfo != nil: conn.peerInfo
else: PeerInfo.init(remotePubKey)
var tmp = NoiseConnection.init(conn, peerInfo, conn.observedAddr)
if initiator: if initiator:
tmp.readCs = handshakeRes.cs2 tmp.readCs = handshakeRes.cs2
@ -494,7 +500,7 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon
finally: finally:
burnMem(handshakeRes) burnMem(handshakeRes)
trace "Noise handshake completed!", initiator, peer = $secure.peerInfo trace "Noise handshake completed!", initiator, peer = shortLog(secure.peerInfo)
return secure return secure

View File

@ -246,9 +246,12 @@ proc newSecioConn(conn: Connection,
## Create new secure stream/lpstream, using specified hash algorithm ``hash``, ## Create new secure stream/lpstream, using specified hash algorithm ``hash``,
## cipher algorithm ``cipher``, stretched keys ``secrets`` and order ## cipher algorithm ``cipher``, stretched keys ``secrets`` and order
## ``order``. ## ``order``.
result = SecioConn.init(conn,
PeerInfo.init(remotePubKey), let peerInfo =
conn.observedAddr) if conn.peerInfo != nil: conn.peerInfo
else: PeerInfo.init(remotePubKey)
result = SecioConn.init(conn, peerInfo, conn.observedAddr)
let i0 = if order < 0: 1 else: 0 let i0 = if order < 0: 1 else: 0
let i1 = if order < 0: 0 else: 1 let i1 = if order < 0: 0 else: 1

View File

@ -95,9 +95,9 @@ proc triggerHooks(s: Switch, peer: PeerInfo, cycle: Lifecycle) {.async, gcsafe.}
except CatchableError as exc: except CatchableError as exc:
trace "exception in trigger hooks", exc = exc.msg trace "exception in trigger hooks", exc = exc.msg
proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} proc disconnect*(s: Switch, peerId: PeerID) {.async, gcsafe.}
proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} proc subscribePeer*(s: Switch, peerId: PeerID) {.async, gcsafe.}
proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} proc subscribePeerInternal(s: Switch, peerId: PeerID) {.async, gcsafe.}
proc cleanupPubSubPeer(s: Switch, conn: Connection) {.async.} = proc cleanupPubSubPeer(s: Switch, conn: Connection) {.async.} =
try: try:
@ -114,12 +114,12 @@ proc cleanupPubSubPeer(s: Switch, conn: Connection) {.async.} =
except CatchableError as exc: except CatchableError as exc:
trace "exception cleaning pubsub peer", exc = exc.msg trace "exception cleaning pubsub peer", exc = exc.msg
proc isConnected*(s: Switch, peer: PeerInfo): bool = proc isConnected*(s: Switch, peerId: PeerID): bool =
## returns true if the peer has one or more ## returns true if the peer has one or more
## associated connections (sockets) ## associated connections (sockets)
## ##
peer.peerId in s.connManager peerId in s.connManager
proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
if s.secureManagers.len <= 0: if s.secureManagers.len <= 0:
@ -211,9 +211,8 @@ proc mux(s: Switch, conn: Connection) {.async, gcsafe.} =
trace "adding muxer for peer", peer = conn.peerInfo.id trace "adding muxer for peer", peer = conn.peerInfo.id
s.connManager.storeMuxer(muxer, handlerFut) # update muxer with handler s.connManager.storeMuxer(muxer, handlerFut) # update muxer with handler
proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} = proc disconnect*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} =
if not peer.isNil: s.connManager.dropPeer(peerId)
await s.connManager.dropPeer(peer.peerId)
proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
logScope: logScope:
@ -279,101 +278,99 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
await ms.handle(conn, active = true) await ms.handle(conn, active = true)
proc internalConnect(s: Switch, proc internalConnect(s: Switch,
peer: PeerInfo): Future[Connection] {.async.} = peerId: PeerID,
addrs: seq[MultiAddress]): Future[Connection] {.async.} =
if s.peerInfo.peerId == peer.peerId: if s.peerInfo.peerId == peerId:
raise newException(CatchableError, "can't dial self!") raise newException(CatchableError, "can't dial self!")
let id = peer.id var conn = s.connManager.selectConn(peerId)
var conn: Connection if conn != nil and not conn.atEof and not conn.closed:
let lock = s.dialLock.mgetOrPut(id, newAsyncLock()) trace "Reusing existing connection", oid = $conn.oid,
direction = $conn.dir,
peer = peerId
return conn
let lock = s.dialLock.mgetOrPut($peerId, newAsyncLock())
try: try:
await lock.acquire() await lock.acquire()
trace "about to dial peer", peer = id trace "Dialing peer", peer = peerId
conn = s.connManager.selectConn(peer.peerId) for t in s.transports: # for each transport
if conn.isNil or (conn.closed or conn.atEof): for a in addrs: # for each address
trace "Dialing peer", peer = id if t.handles(a): # check if it can dial it
for t in s.transports: # for each transport trace "Dialing address", address = $a, peer = peerId
for a in peer.addrs: # for each address try:
if t.handles(a): # check if it can dial it conn = await t.dial(a)
trace "Dialing address", address = $a, peer = id # make sure to assign the peer to the connection
try: conn.peerInfo = PeerInfo.init(peerId, addrs)
conn = await t.dial(a)
# make sure to assign the peer to the connection
conn.peerInfo = peer
conn.closeEvent.wait() conn.closeEvent.wait()
.addCallback do(udata: pointer): .addCallback do(udata: pointer):
asyncCheck s.triggerHooks( asyncCheck s.triggerHooks(
conn.peerInfo, conn.peerInfo,
Lifecycle.Disconnected) Lifecycle.Disconnected)
asyncCheck s.triggerHooks(conn.peerInfo, Lifecycle.Connected) asyncCheck s.triggerHooks(conn.peerInfo, Lifecycle.Connected)
libp2p_dialed_peers.inc() libp2p_dialed_peers.inc()
except CancelledError as exc: except CancelledError as exc:
trace "dialing canceled", exc = exc.msg trace "dialing canceled", exc = exc.msg, peer = peerId
raise raise
except CatchableError as exc: except CatchableError as exc:
trace "dialing failed", exc = exc.msg trace "dialing failed", exc = exc.msg, peer = peerId
libp2p_failed_dials.inc() libp2p_failed_dials.inc()
continue continue
try: try:
let uconn = await s.upgradeOutgoing(conn) let uconn = await s.upgradeOutgoing(conn)
s.connManager.storeOutgoing(uconn) s.connManager.storeOutgoing(uconn)
asyncCheck s.triggerHooks(uconn.peerInfo, Lifecycle.Upgraded) asyncCheck s.triggerHooks(uconn.peerInfo, Lifecycle.Upgraded)
conn = uconn conn = uconn
trace "dial successful", oid = $conn.oid, peer = $conn.peerInfo trace "dial successful", oid = $conn.oid, peer = $conn.peerInfo
except CatchableError as exc: except CatchableError as exc:
if not(isNil(conn)): if not(isNil(conn)):
await conn.close() await conn.close()
trace "Unable to establish outgoing link", exc = exc.msg trace "Unable to establish outgoing link", exc = exc.msg, peer = peerId
raise exc raise exc
if isNil(conn): if isNil(conn):
libp2p_failed_upgrade.inc() libp2p_failed_upgrade.inc()
continue continue
break break
else:
trace "Reusing existing connection", oid = $conn.oid,
direction = $conn.dir,
peer = $conn.peerInfo
finally: finally:
if lock.locked(): if lock.locked():
lock.release() lock.release()
if isNil(conn): if isNil(conn):
raise newException(CatchableError, raise newException(CatchableError, "Unable to establish outgoing link")
"Unable to establish outgoing link")
if conn.closed or conn.atEof: if conn.closed or conn.atEof:
await conn.close() await conn.close()
raise newException(CatchableError, raise newException(CatchableError, "Connection dead on arrival")
"Connection dead on arrival")
doAssert(conn in s.connManager, "connection not tracked!") doAssert(conn in s.connManager, "connection not tracked!")
trace "dial successful", oid = $conn.oid, trace "dial successful", oid = $conn.oid,
peer = $conn.peerInfo peer = shortLog(conn.peerInfo)
asyncCheck s.cleanupPubSubPeer(conn) asyncCheck s.cleanupPubSubPeer(conn)
asyncCheck s.subscribePeer(conn.peerInfo) asyncCheck s.subscribePeer(peerId)
trace "got connection", oid = $conn.oid, trace "got connection", oid = $conn.oid,
direction = $conn.dir, direction = $conn.dir,
peer = $conn.peerInfo peer = shortLog(conn.peerInfo)
return conn return conn
proc connect*(s: Switch, peer: PeerInfo) {.async.} = proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} =
discard await s.internalConnect(peer) discard await s.internalConnect(peerId, addrs)
proc dial*(s: Switch, proc dial*(s: Switch,
peer: PeerInfo, peerId: PeerID,
addrs: seq[MultiAddress],
proto: string): proto: string):
Future[Connection] {.async.} = Future[Connection] {.async.} =
let conn = await s.internalConnect(peer) let conn = await s.internalConnect(peerId, addrs)
let stream = await s.connManager.getMuxedStream(conn) let stream = await s.connManager.getMuxedStream(conn)
proc cleanup() {.async.} = proc cleanup() {.async.} =
@ -472,17 +469,17 @@ proc stop*(s: Switch) {.async.} =
trace "switch stopped" trace "switch stopped"
proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = proc subscribePeerInternal(s: Switch, peerId: PeerID) {.async, gcsafe.} =
## Subscribe to pub sub peer ## Subscribe to pub sub peer
## ##
if s.pubSub.isSome and not s.pubSub.get().connected(peerInfo): if s.pubSub.isSome and not s.pubSub.get().connected(peerId):
trace "about to subscribe to pubsub peer", peer = peerInfo.shortLog() trace "about to subscribe to pubsub peer", peer = peerId
var stream: Connection var stream: Connection
try: try:
stream = await s.connManager.getMuxedStream(peerInfo.peerId) stream = await s.connManager.getMuxedStream(peerId)
if isNil(stream): if isNil(stream):
trace "unable to subscribe to peer", peer = peerInfo.shortLog trace "unable to subscribe to peer", peer = peerId
return return
if not await s.ms.select(stream, s.pubSub.get().codec): if not await s.ms.select(stream, s.pubSub.get().codec):
@ -499,38 +496,37 @@ proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
trace "exception in subscribe to peer", peer = peerInfo.shortLog, trace "exception in subscribe to peer", peer = peerId,
exc = exc.msg exc = exc.msg
if not(isNil(stream)): if not(isNil(stream)):
await stream.close() await stream.close()
proc pubsubMonitor(s: Switch, peer: PeerInfo) {.async.} = proc pubsubMonitor(s: Switch, peerId: PeerID) {.async.} =
## while peer connected maintain a ## while peer connected maintain a
## pubsub connection as well ## pubsub connection as well
## ##
while s.isConnected(peer): while s.isConnected(peerId):
try: try:
trace "subscribing to pubsub peer", peer = $peer trace "subscribing to pubsub peer", peer = peerId
await s.subscribePeerInternal(peer) await s.subscribePeerInternal(peerId)
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
trace "exception in pubsub monitor", peer = $peer, exc = exc.msg trace "exception in pubsub monitor", peer = peerId, exc = exc.msg
finally: finally:
trace "sleeping before trying pubsub peer", peer = $peer trace "sleeping before trying pubsub peer", peer = peerId
await sleepAsync(1.seconds) # allow the peer to cooldown await sleepAsync(1.seconds) # allow the peer to cooldown
trace "exiting pubsub monitor", peer = $peer trace "exiting pubsub monitor", peer = peerId
proc subscribePeer*(s: Switch, peerInfo: PeerInfo): Future[void] {.gcsafe.} = proc subscribePeer*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} =
## Waits until ``server`` is not closed. ## Waits until ``server`` is not closed.
## ##
var retFuture = newFuture[void]("stream.transport.server.join") var retFuture = newFuture[void]("stream.transport.server.join")
let pubsubFut = s.pubsubMonitors.mgetOrPut( let pubsubFut = s.pubsubMonitors.mgetOrPut(
peerInfo.peerId, peerId, s.pubsubMonitor(peerId))
s.pubsubMonitor(peerInfo))
proc continuation(udata: pointer) {.gcsafe.} = proc continuation(udata: pointer) {.gcsafe.} =
retFuture.complete() retFuture.complete()
@ -633,7 +629,7 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
# try establishing a pubsub connection # try establishing a pubsub connection
asyncCheck s.cleanupPubSubPeer(muxer.connection) asyncCheck s.cleanupPubSubPeer(muxer.connection)
asyncCheck s.subscribePeer(muxer.connection.peerInfo) asyncCheck s.subscribePeer(muxer.connection.peerInfo.peerId)
except CancelledError as exc: except CancelledError as exc:
await muxer.close() await muxer.close()
@ -684,3 +680,21 @@ proc newSwitch*(peerInfo: PeerInfo,
if pubSub.isSome: if pubSub.isSome:
result.pubSub = pubSub result.pubSub = pubSub
result.mount(pubSub.get()) result.mount(pubSub.get())
proc isConnected*(s: Switch, peerInfo: PeerInfo): bool {.deprecated: "Use PeerID version".} =
not isNil(peerInfo) and isConnected(s, peerInfo.peerId)
proc disconnect*(s: Switch, peerInfo: PeerInfo): Future[void] {.deprecated: "Use PeerID version", gcsafe.} =
disconnect(s, peerInfo.peerId)
proc connect*(s: Switch, peerInfo: PeerInfo): Future[void] {.deprecated: "Use PeerID version".} =
connect(s, peerInfo.peerId, peerInfo.addrs)
proc dial*(s: Switch,
peerInfo: PeerInfo,
proto: string):
Future[Connection] {.deprecated: "Use PeerID version".} =
dial(s, peerInfo.peerId, peerInfo.addrs, proto)
proc subscribePeer*(s: Switch, peerInfo: PeerInfo): Future[void] {.deprecated: "Use PeerID version", gcsafe.} =
subscribePeer(s, peerInfo.peerId)