implicitelly dial pubsub if enabled

This commit is contained in:
Dmitriy Ryajov 2020-02-20 19:14:39 -06:00
parent cb156f3260
commit fbcef69891
4 changed files with 30 additions and 16 deletions

View File

@ -125,7 +125,7 @@ method handleConn*(p: PubSub,
return return
proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
# call floodsub rpc handler # call pubsub rpc handler
await p.rpcHandler(peer, msgs) await p.rpcHandler(peer, msgs)
let peer = p.getPeer(conn.peerInfo, proto) let peer = p.getPeer(conn.peerInfo, proto)

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import tables, sequtils, options, strformat import tables, sequtils, options, strformat, sets
import chronos, chronicles import chronos, chronicles
import connection, import connection,
transports/transport, transports/transport,
@ -45,6 +45,7 @@ type
streamHandler*: StreamHandler streamHandler*: StreamHandler
secureManagers*: Table[string, Secure] secureManagers*: Table[string, Secure]
pubSub*: Option[PubSub] pubSub*: Option[PubSub]
dialedPubSubPeers: HashSet[string]
proc newNoPubSubException(): ref Exception {.inline.} = proc newNoPubSubException(): ref Exception {.inline.} =
result = newException(NoPubSubException, "no pubsub provided!") result = newException(NoPubSubException, "no pubsub provided!")
@ -144,6 +145,9 @@ proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} =
await s.connections[id].close() await s.connections[id].close()
s.connections.del(id) s.connections.del(id)
if id in s.dialedPubSubPeers:
s.dialedPubSubPeers.excl(id)
# TODO: Investigate cleanupConn() always called twice for one peer. # TODO: Investigate cleanupConn() always called twice for one peer.
if not(conn.peerInfo.isClosed()): if not(conn.peerInfo.isClosed()):
conn.peerInfo.close() conn.peerInfo.close()
@ -204,6 +208,8 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
# handle secured connections # handle secured connections
await ms.handle(conn) await ms.handle(conn)
proc subscribeToPeer(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.}
proc dial*(s: Switch, proc dial*(s: Switch,
peer: PeerInfo, peer: PeerInfo,
proto: string = ""): proto: string = ""):
@ -244,6 +250,8 @@ proc dial*(s: Switch,
error "Unable to select sub-protocol", proto = proto error "Unable to select sub-protocol", proto = proto
raise newException(CatchableError, &"unable to select protocol: {proto}") raise newException(CatchableError, &"unable to select protocol: {proto}")
await s.subscribeToPeer(peer)
proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} = proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
if isNil(proto.handler): if isNil(proto.handler):
raise newException(CatchableError, raise newException(CatchableError,
@ -291,11 +299,16 @@ proc stop*(s: Switch) {.async.} =
await allFutures(toSeq(s.connections.values).mapIt(s.cleanupConn(it))) await allFutures(toSeq(s.connections.values).mapIt(s.cleanupConn(it)))
await allFutures(s.transports.mapIt(it.close())) await allFutures(s.transports.mapIt(it.close()))
proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = proc subscribeToPeer(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
## Subscribe to pub sub peer ## Subscribe to pub sub peer
if s.pubSub.isSome: if s.pubSub.isSome and peerInfo.id notin s.dialedPubSubPeers:
try:
s.dialedPubSubPeers.incl(peerInfo.id)
let conn = await s.dial(peerInfo, s.pubSub.get().codec) let conn = await s.dial(peerInfo, s.pubSub.get().codec)
await s.pubSub.get().subscribeToPeer(conn) await s.pubSub.get().subscribeToPeer(conn)
except CatchableError as exc:
trace "unable to initiate pubsub", exc = exc.msg
s.dialedPubSubPeers.excl(peerInfo.id)
proc subscribe*(s: Switch, topic: string, handler: TopicHandler): Future[void] {.gcsafe.} = proc subscribe*(s: Switch, topic: string, handler: TopicHandler): Future[void] {.gcsafe.} =
## subscribe to a pubsub topic ## subscribe to a pubsub topic
@ -351,6 +364,7 @@ proc newSwitch*(peerInfo: PeerInfo,
result.identity = identity result.identity = identity
result.muxers = muxers result.muxers = muxers
result.secureManagers = initTable[string, Secure]() result.secureManagers = initTable[string, Secure]()
result.dialedPubSubPeers = initHashSet[string]()
let s = result # can't capture result let s = result # can't capture result
result.streamHandler = proc(stream: Connection) {.async, gcsafe.} = result.streamHandler = proc(stream: Connection) {.async, gcsafe.} =

View File

@ -8,10 +8,10 @@ proc generateNodes*(num: Natural, gossip: bool = false): seq[Switch] =
result.add(newStandardSwitch(gossip = gossip)) result.add(newStandardSwitch(gossip = gossip))
proc subscribeNodes*(nodes: seq[Switch]) {.async.} = proc subscribeNodes*(nodes: seq[Switch]) {.async.} =
var dials: seq[Future[void]] var dials: seq[Future[Connection]]
for dialer in nodes: for dialer in nodes:
for node in nodes: for node in nodes:
if dialer.peerInfo.peerId != node.peerInfo.peerId: if dialer.peerInfo.peerId != node.peerInfo.peerId:
dials.add(dialer.subscribeToPeer(node.peerInfo)) dials.add(dialer.dial(node.peerInfo))
await sleepAsync(100.millis) await sleepAsync(100.millis)
await allFutures(dials) await allFutures(dials)

View File

@ -93,8 +93,8 @@ proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey),
secureManagers = secureManagers, secureManagers = secureManagers,
pubSub = pubSub) pubSub = pubSub)
proc testPubSubDaemonPublish(gossip: bool = false, count: int = 1): Future[ proc testPubSubDaemonPublish(gossip: bool = false,
bool] {.async.} = count: int = 1): Future[bool] {.async.} =
var pubsubData = "TEST MESSAGE" var pubsubData = "TEST MESSAGE"
var testTopic = "test-topic" var testTopic = "test-topic"
var msgData = cast[seq[byte]](pubsubData) var msgData = cast[seq[byte]](pubsubData)
@ -118,7 +118,7 @@ proc testPubSubDaemonPublish(gossip: bool = false, count: int = 1): Future[
if times >= count and not handlerFuture.finished: if times >= count and not handlerFuture.finished:
handlerFuture.complete(true) handlerFuture.complete(true)
await nativeNode.subscribeToPeer(NativePeerInfo.init(daemonPeer.peer, discard await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer,
daemonPeer.addresses)) daemonPeer.addresses))
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
@ -140,8 +140,8 @@ proc testPubSubDaemonPublish(gossip: bool = false, count: int = 1): Future[
await allFutures(awaiters) await allFutures(awaiters)
await daemonNode.close() await daemonNode.close()
proc testPubSubNodePublish(gossip: bool = false, count: int = 1): Future[ proc testPubSubNodePublish(gossip: bool = false,
bool] {.async.} = count: int = 1): Future[bool] {.async.} =
var pubsubData = "TEST MESSAGE" var pubsubData = "TEST MESSAGE"
var testTopic = "test-topic" var testTopic = "test-topic"
var msgData = cast[seq[byte]](pubsubData) var msgData = cast[seq[byte]](pubsubData)
@ -157,7 +157,7 @@ proc testPubSubNodePublish(gossip: bool = false, count: int = 1): Future[
let nativePeer = nativeNode.peerInfo let nativePeer = nativeNode.peerInfo
var handlerFuture = newFuture[bool]() var handlerFuture = newFuture[bool]()
await nativeNode.subscribeToPeer(NativePeerInfo.init(daemonPeer.peer, discard await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer,
daemonPeer.addresses)) daemonPeer.addresses))
await sleepAsync(1.seconds) await sleepAsync(1.seconds)