no empty proto dials and add connect method

This commit is contained in:
Dmitriy Ryajov 2020-02-25 16:13:05 -06:00
parent 9efc08cb2f
commit eb49d4b218
4 changed files with 47 additions and 29 deletions

View File

@ -209,10 +209,8 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
proc subscribeToPeer(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} proc subscribeToPeer(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.}
proc dial*(s: Switch, proc internalConnect(s: Switch,
peer: PeerInfo, peer: PeerInfo): Future[Connection] {.async.} =
proto: string = ""):
Future[Connection] {.async.} =
let id = peer.id let id = peer.id
trace "Dialing peer", peer = id trace "Dialing peer", peer = id
var conn = s.connections.getOrDefault(id) var conn = s.connections.getOrDefault(id)
@ -239,10 +237,25 @@ proc dial*(s: Switch,
else: else:
trace "Reusing existing connection" trace "Reusing existing connection"
await s.subscribeToPeer(peer)
result = conn
proc connect*(s: Switch, peer: PeerInfo) {.async.} =
var conn = s.internalConnect(peer)
if isNil(conn):
raise newException(CatchableError, "Unable to connect to peer")
proc dial*(s: Switch,
peer: PeerInfo,
proto: string):
Future[Connection] {.async.} =
var conn = await s.internalConnect(peer)
if isNil(conn): if isNil(conn):
raise newException(CatchableError, "Unable to establish outgoing link") raise newException(CatchableError, "Unable to establish outgoing link")
if proto.len > 0 and not conn.closed: if conn.closed:
raise newException(CatchableError, "Connection dead on arrival")
let stream = await s.getMuxedStream(peer) let stream = await s.getMuxedStream(peer)
if not isNil(stream): if not isNil(stream):
trace "Connection is muxed, return muxed stream" trace "Connection is muxed, return muxed stream"
@ -250,11 +263,9 @@ proc dial*(s: Switch,
trace "Attempting to select remote", proto = proto trace "Attempting to select remote", proto = proto
if not await s.ms.select(result, proto): if not await s.ms.select(result, proto):
error "Unable to select sub-protocol", proto = proto warn "Unable to select sub-protocol", proto = proto
raise newException(CatchableError, &"unable to select protocol: {proto}") raise newException(CatchableError, &"unable to select protocol: {proto}")
await s.subscribeToPeer(peer)
proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} = proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
if isNil(proto.handler): if isNil(proto.handler):
raise newException(CatchableError, raise newException(CatchableError,
@ -310,7 +321,7 @@ proc subscribeToPeer(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
let conn = await s.dial(peerInfo, s.pubSub.get().codec) let conn = await s.dial(peerInfo, s.pubSub.get().codec)
await s.pubSub.get().subscribeToPeer(conn) await s.pubSub.get().subscribeToPeer(conn)
except CatchableError as exc: except CatchableError as exc:
warn "unable to initiate pubsub", exc = exc.msg trace "unable to initiate pubsub", exc = exc.msg
s.dialedPubSubPeers.excl(peerInfo.id) s.dialedPubSubPeers.excl(peerInfo.id)
proc subscribe*(s: Switch, topic: string, handler: TopicHandler): Future[void] {.gcsafe.} = proc subscribe*(s: Switch, topic: string, handler: TopicHandler): Future[void] {.gcsafe.} =

View File

@ -1,4 +1,3 @@
import options, tables
import chronos import chronos
import ../../libp2p/standard_setup import ../../libp2p/standard_setup
export standard_setup export standard_setup
@ -8,10 +7,10 @@ proc generateNodes*(num: Natural, gossip: bool = false): seq[Switch] =
result.add(newStandardSwitch(gossip = gossip)) result.add(newStandardSwitch(gossip = gossip))
proc subscribeNodes*(nodes: seq[Switch]) {.async.} = proc subscribeNodes*(nodes: seq[Switch]) {.async.} =
var dials: seq[Future[Connection]] var dials: seq[Future[void]]
for dialer in nodes: for dialer in nodes:
for node in nodes: for node in nodes:
if dialer.peerInfo.peerId != node.peerInfo.peerId: if dialer.peerInfo.peerId != node.peerInfo.peerId:
dials.add(dialer.dial(node.peerInfo)) dials.add(dialer.connect(node.peerInfo))
await sleepAsync(100.millis) await sleepAsync(100.millis)
await allFutures(dials) await allFutures(dials)

View File

@ -118,7 +118,7 @@ proc testPubSubDaemonPublish(gossip: bool = false,
if times >= count and not handlerFuture.finished: if times >= count and not handlerFuture.finished:
handlerFuture.complete(true) handlerFuture.complete(true)
discard await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer, await nativeNode.connect(NativePeerInfo.init(daemonPeer.peer,
daemonPeer.addresses)) daemonPeer.addresses))
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
@ -157,7 +157,7 @@ proc testPubSubNodePublish(gossip: bool = false,
let nativePeer = nativeNode.peerInfo let nativePeer = nativeNode.peerInfo
var handlerFuture = newFuture[bool]() var handlerFuture = newFuture[bool]()
discard await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer, await nativeNode.connect(NativePeerInfo.init(daemonPeer.peer,
daemonPeer.addresses)) daemonPeer.addresses))
await sleepAsync(1.seconds) await sleepAsync(1.seconds)

View File

@ -91,16 +91,24 @@ suite "Switch":
var peerInfo1, peerInfo2: PeerInfo var peerInfo1, peerInfo2: PeerInfo
var switch1, switch2: Switch var switch1, switch2: Switch
(switch1, peerInfo1) = createSwitch(ma1)
var awaiters: seq[Future[void]] var awaiters: seq[Future[void]]
awaiters.add(await switch1.start())
(switch1, peerInfo1) = createSwitch(ma1)
let testProto = new TestProto
testProto.init()
testProto.codec = TestCodec
switch1.mount(testProto)
(switch2, peerInfo2) = createSwitch(ma2) (switch2, peerInfo2) = createSwitch(ma2)
awaiters.add(await switch1.start())
awaiters.add(await switch2.start()) awaiters.add(await switch2.start())
var conn = await switch2.dial(switch1.peerInfo) await switch2.connect(switch1.peerInfo)
let conn = await switch2.dial(switch1.peerInfo, TestCodec)
await conn.writeLp("Hello!")
let msg = cast[string](await conn.readLp())
check "Hello!" == msg
check isNil(conn) await allFutures(switch1.stop(), switch2.stop())
discard allFutures(switch1.stop(), switch2.stop())
await allFutures(awaiters) await allFutures(awaiters)
result = true result = true