diff --git a/libp2p/peerinfo.nim b/libp2p/peerinfo.nim
index 31c2a6bd7..38a148dcf 100644
--- a/libp2p/peerinfo.nim
+++ b/libp2p/peerinfo.nim
@@ -36,6 +36,9 @@ type
       privateKey*: PrivateKey
     of HasPublic:
       key: Option[PublicKey]
+    # gossip 1.1 spec related
+    # https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#explicit-peering-agreements
+    maintain*: bool
 
 proc id*(p: PeerInfo): string =
   p.peerId.pretty()
diff --git a/libp2p/switch.nim b/libp2p/switch.nim
index 909a766e0..848ceff69 100644
--- a/libp2p/switch.nim
+++ b/libp2p/switch.nim
@@ -41,6 +41,10 @@ declareCounter(libp2p_failed_upgrade, "peers failed upgrade")
 type
   NoPubSubException = object of CatchableError
 
+  Maintainer = object
+    loopFut: Future[void] # the maintainPeer loop started for one peer
+    sleepFut: Future[void] # that loop's current sleep; cancelled by stop
+
   Switch* = ref object of RootObj
     peerInfo*: PeerInfo
     connections*: Table[string, Connection]
@@ -54,6 +58,8 @@ type
     secureManagers*: seq[Secure]
     pubSub*: Option[PubSub]
     dialedPubSubPeers: HashSet[string]
+    running: bool # NOTE(review): nothing in this patch sets this to true - confirm start() does
+    maintainFuts: Table[string, Maintainer]
 
 proc newNoPubSubException(): ref CatchableError {.inline.} =
   result = newException(NoPubSubException, "no pubsub provided!")
@@ -364,6 +370,25 @@ proc stop*(s: Switch) {.async.} =
   try:
     trace "stopping switch"
 
+    s.running = false
+
+    # Stop explicit peering system (gossip 1.1 related, but useful even with other pubsubs)
+    # Cancel their sleep as it likely is running for 5 mins
+    # running is false so they should exit after that
+    # and so we just wait/ensure all has finished
+    # Maintain has tryAndWarn so we should not be printing any error here
+    # nevertheless use checkFutures!
+    # Notice.. this is ugly but we have no clean way to express a Chain of operations/futures
+    # and simply post a cancelation/stop from the root of the chain...
+    let
+      maintainers = toSeq(s.maintainFuts.values)
+      sleepFuts = maintainers.mapIt(it.sleepFut)
+      loopFuts = maintainers.mapIt(it.loopFut)
+    for f in sleepFuts: f.cancel()
+    checkFutures(await allFinished(sleepFuts))
+    checkFutures(await allFinished(loopFuts))
+    s.maintainFuts.clear() # every loop has ended; allow re-registration later
+
     # we want to report errors but we do not want to fail
     # or crash here, cos we need to clean possibly MANY items
     # and any following conn/transport won't be cleaned up
@@ -386,6 +411,28 @@ proc stop*(s: Switch) {.async.} =
   except CatchableError as exc:
     warn "error stopping switch", exc = exc.msg
 
+proc maintainPeer(s: Switch, peerInfo: PeerInfo) {.async.} =
+  ## Explicit-peering maintenance loop: while the switch is running, look at
+  ## the connection to `peerInfo` and then sleep for 5 minutes. The redial is
+  ## still a placeholder. Each pending sleep is stored in `s.maintainFuts` so
+  ## that `stop` can cancel it and this loop ends without waiting it out.
+  while s.running:
+    tryAndWarn "explicit peer maintain":
+      let conn = s.connections.getOrDefault(peerInfo.id)
+      if conn.isNil or conn.closed:
+        # attempt redial in this case
+        discard
+
+    let sleepFut = sleepAsync(5.minutes) # spec recommended
+    if peerInfo.id in s.maintainFuts:
+      # publish the live sleep (replacing the stub) so `stop` can cancel it
+      s.maintainFuts[peerInfo.id].sleepFut = sleepFut
+    try:
+      await sleepFut
+    except CancelledError:
+      # the sleep was cancelled (e.g. by `stop`): leave the loop quietly
+      break
+
 proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
   trace "about to subscribe to pubsub peer", peer = peerInfo.shortLog()
   ## Subscribe to pub sub peer
@@ -406,6 +453,13 @@ proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
       await conn.close()
     finally:
       s.dialedPubSubPeers.excl(peerInfo.id)
+
+    if peerInfo.maintain and peerInfo.id notin s.maintainFuts:
+      # Insert a complete entry before starting the loop: `[]` on a missing
+      # key raises KeyError, and maintainPeer writes its sleep future into it.
+      s.maintainFuts[peerInfo.id] = Maintainer(
+        sleepFut: newFuture[void]()) # stub until real one happens
+      s.maintainFuts[peerInfo.id].loopFut = maintainPeer(s, peerInfo)
 
 proc subscribe*(s: Switch, topic: string,
                 handler: TopicHandler): Future[void] =