fix up merge

This commit is contained in:
Giovanni Petrantoni 2020-07-04 11:51:09 +09:00
parent afb5cb88f9
commit be0375f840
4 changed files with 32 additions and 32 deletions

2
.gitignore vendored
View File

@ -10,3 +10,5 @@ build/
*.exe *.exe
*.dll *.dll
.vscode/ .vscode/
.DS_Store
tests/pubsub/testgossipsub

View File

@ -319,14 +319,14 @@ proc newPubSub*[PubParams: object | bool](P: typedesc[PubSub],
verifySignature: verifySignature, verifySignature: verifySignature,
sign: sign, sign: sign,
cleanupLock: newAsyncLock(), cleanupLock: newAsyncLock(),
msgIdProvider) msgIdProvider: msgIdProvider)
else: else:
result = P(peerInfo: peerInfo, result = P(peerInfo: peerInfo,
triggerSelf: triggerSelf, triggerSelf: triggerSelf,
verifySignature: verifySignature, verifySignature: verifySignature,
sign: sign, sign: sign,
cleanupLock: newAsyncLock(), cleanupLock: newAsyncLock(),
msgIdProvider, msgIdProvider: msgIdProvider,
parameters: params) parameters: params)
result.initPubSub() result.initPubSub()

View File

@ -28,7 +28,6 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
address = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(), address = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
triggerSelf = false, triggerSelf = false,
gossip = false, gossip = false,
gossipParams = GossipSubParams.init(),
secureManagers: openarray[SecureProtocol] = [ secureManagers: openarray[SecureProtocol] = [
# array cos order matters # array cos order matters
SecureProtocol.Secio, SecureProtocol.Secio,
@ -64,8 +63,7 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
triggerSelf = triggerSelf, triggerSelf = triggerSelf,
verifySignature = verifySignature, verifySignature = verifySignature,
sign = sign, sign = sign,
msgIdProvider = msgIdProvider, msgIdProvider = msgIdProvider).PubSub
gossipParams).PubSub
else: else:
newPubSub(FloodSub, newPubSub(FloodSub,
peerInfo = peerInfo, peerInfo = peerInfo,

View File

@ -509,29 +509,29 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
proc stop*(s: Switch) {.async.} = proc stop*(s: Switch) {.async.} =
trace "stopping switch" trace "stopping switch"
s.running = false s.running = false
# Stop explicit peering system (gossip 1.1 related, but useful even with other pubsubs) # Stop explicit peering system (gossip 1.1 related, but useful even with other pubsubs)
# Cancel their sleep as it likely is running for 5 mins # Cancel their sleep as it likely is running for 5 mins
# running is false so they should exit after that # running is false so they should exit after that
# and so we just wait/ensure all has finished # and so we just wait/ensure all has finished
# Maintain has tryAndWarn so we should not be priting any error here # Maintain has tryAndWarn so we should not be priting any error here
# nevertheless use checkFutures! # nevertheless use checkFutures!
# Notice.. this is ugly but we have no clean way to express a Chain of operations/futures # Notice.. this is ugly but we have no clean way to express a Chain of operations/futures
# and simply post a cancelation/stop from the root of the chain... # and simply post a cancelation/stop from the root of the chain...
let let
maintainers = toSeq(s.maintainFuts.values) maintainers = toSeq(s.maintainFuts.values)
sleepFuts = maintainers.mapIt(it.sleepFut) sleepFuts = maintainers.mapIt(it.sleepFut)
loopFuts = maintainers.mapIt(it.loopFut) loopFuts = maintainers.mapIt(it.loopFut)
for f in sleepFuts: f.cancel() for f in sleepFuts: f.cancel()
checkFutures(await allFinished(sleepFuts)) checkFutures(await allFinished(sleepFuts))
checkFutures(await allFinished(loopFuts)) checkFutures(await allFinished(loopFuts))
# we want to report errors but we do not want to fail # we want to report errors but we do not want to fail
# or crash here, cos we need to clean possibly MANY items # or crash here, cos we need to clean possibly MANY items
# and any following conn/transport won't be cleaned up # and any following conn/transport won't be cleaned up
if s.pubSub.isSome: if s.pubSub.isSome:
await s.pubSub.get().stop() await s.pubSub.get().stop()
for conns in toSeq(s.connections.values): for conns in toSeq(s.connections.values):
for conn in conns: for conn in conns:
@ -555,8 +555,8 @@ proc stop*(s: Switch) {.async.} =
proc maintainPeer(s: Switch, peerInfo: PeerInfo) {.async.} = proc maintainPeer(s: Switch, peerInfo: PeerInfo) {.async.} =
while s.running: while s.running:
tryAndWarn "explicit peer maintain": tryAndWarn "explicit peer maintain":
var conn = s.connections.getOrDefault(peerInfo.id) var conns = s.connections.getOrDefault(peerInfo.id)
if conn.isNil or conn.closed: if conns.len == 0:
# attempt re-connect in this case # attempt re-connect in this case
trace "explicit peering, trying to re-connect", peer=peerInfo trace "explicit peering, trying to re-connect", peer=peerInfo
await s.connect(peerInfo) await s.connect(peerInfo)
@ -587,13 +587,13 @@ proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
s.dialedPubSubPeers.incl(peerInfo.id) s.dialedPubSubPeers.incl(peerInfo.id)
try: try:
if (await s.ms.select(conn, s.pubSub.get().codec)): if (await s.ms.select(stream, s.pubSub.get().codec)):
await s.pubSub.get().subscribeToPeer(conn) await s.pubSub.get().subscribeToPeer(stream)
else: else:
await conn.close() await stream.close()
except CatchableError as exc: except CatchableError as exc:
trace "exception in subscribe to peer", peer = peerInfo.shortLog, exc = exc.msg trace "exception in subscribe to peer", peer = peerInfo.shortLog, exc = exc.msg
await conn.close() await stream.close()
finally: finally:
s.dialedPubSubPeers.excl(peerInfo.id) s.dialedPubSubPeers.excl(peerInfo.id)