explicit peering part 1

This commit is contained in:
Giovanni Petrantoni 2020-06-23 14:23:00 +09:00
parent 9eb240fb7d
commit b3aebb18e9
2 changed files with 41 additions and 0 deletions

View File

@ -36,6 +36,9 @@ type
privateKey*: PrivateKey privateKey*: PrivateKey
of HasPublic: of HasPublic:
key: Option[PublicKey] key: Option[PublicKey]
# gossip 1.1 spec related
# https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#explicit-peering-agreements
maintain*: bool
proc id*(p: PeerInfo): string = proc id*(p: PeerInfo): string =
p.peerId.pretty() p.peerId.pretty()

View File

@ -41,6 +41,10 @@ declareCounter(libp2p_failed_upgrade, "peers failed upgrade")
type type
NoPubSubException = object of CatchableError NoPubSubException = object of CatchableError
Maintainer = object
loopFut: Future[void]
sleepFut: Future[void]
Switch* = ref object of RootObj Switch* = ref object of RootObj
peerInfo*: PeerInfo peerInfo*: PeerInfo
connections*: Table[string, Connection] connections*: Table[string, Connection]
@ -54,6 +58,8 @@ type
secureManagers*: seq[Secure] secureManagers*: seq[Secure]
pubSub*: Option[PubSub] pubSub*: Option[PubSub]
dialedPubSubPeers: HashSet[string] dialedPubSubPeers: HashSet[string]
running: bool
maintainFuts: Table[string, Maintainer]
proc newNoPubSubException(): ref CatchableError {.inline.} = proc newNoPubSubException(): ref CatchableError {.inline.} =
result = newException(NoPubSubException, "no pubsub provided!") result = newException(NoPubSubException, "no pubsub provided!")
@ -364,6 +370,24 @@ proc stop*(s: Switch) {.async.} =
try: try:
trace "stopping switch" trace "stopping switch"
s.running = false
# Stop explicit peering system (gossip 1.1 related, but useful even with other pubsubs)
# Cancel their sleep as it likely is running for 5 mins
# running is false so they should exit after that
# and so we just wait/ensure all has finished
# Maintain has tryAndWarn so we should not be priting any error here
# nevertheless use checkFutures!
# Notice.. this is ugly but we have no clean way to express a Chain of operations/futures
# and simply post a cancelation/stop from the root of the chain...
let
maintainers = toSeq(s.maintainFuts.values)
sleepFuts = maintainers.mapIt(it.sleepFut)
loopFuts = maintainers.mapIt(it.loopFut)
for f in sleepFuts: f.cancel()
checkFutures(await allFinished(sleepFuts))
checkFutures(await allFinished(loopFuts))
# we want to report errors but we do not want to fail # we want to report errors but we do not want to fail
# or crash here, cos we need to clean possibly MANY items # or crash here, cos we need to clean possibly MANY items
# and any following conn/transport won't be cleaned up # and any following conn/transport won't be cleaned up
@ -386,6 +410,16 @@ proc stop*(s: Switch) {.async.} =
except CatchableError as exc: except CatchableError as exc:
warn "error stopping switch", exc = exc.msg warn "error stopping switch", exc = exc.msg
proc maintainPeer(s: Switch, peerInfo: PeerInfo) {.async.} =
while s.running:
tryAndWarn "explicit peer maintain":
var conn = s.connections.getOrDefault(peerInfo.id)
if conn.isNil or conn.closed:
# attempt redial in this case
discard
await sleepAsync(5.minutes) # spec recommended
proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
trace "about to subscribe to pubsub peer", peer = peerInfo.shortLog() trace "about to subscribe to pubsub peer", peer = peerInfo.shortLog()
## Subscribe to pub sub peer ## Subscribe to pub sub peer
@ -406,6 +440,10 @@ proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
await conn.close() await conn.close()
finally: finally:
s.dialedPubSubPeers.excl(peerInfo.id) s.dialedPubSubPeers.excl(peerInfo.id)
if peerInfo.maintain:
s.maintainFuts[peerInfo.id].loopFut = maintainPeer(s, peerInfo)
s.maintainFuts[peerInfo.id].sleepFut = newFuture[void]() # stub until real one happens
proc subscribe*(s: Switch, topic: string, proc subscribe*(s: Switch, topic: string,
handler: TopicHandler): Future[void] = handler: TopicHandler): Future[void] =