fix race condition by setting the peers on init

This commit is contained in:
Richard Ramos 2021-08-31 18:51:37 -04:00 committed by Iuri Matias
parent d0514f366d
commit 350472ed03
6 changed files with 46 additions and 102 deletions

View File

@ -21,7 +21,7 @@ proc handleSignals(self: ChatController) =
self.status.events.on(SignalType.PeerStats.event) do(e:Args): self.status.events.on(SignalType.PeerStats.event) do(e:Args):
var data = PeerStatsSignal(e) var data = PeerStatsSignal(e)
let let
mailserverWorker = self.status.tasks.marathon[MailserverWorker().name] mailserverWorker = self.appService.marathon[MailserverWorker().name]
task = PeerSummaryChangeTaskArg( task = PeerSummaryChangeTaskArg(
`method`: "peerSummaryChange", `method`: "peerSummaryChange",
peers: data.peers peers: data.peers

View File

@ -1,5 +1,5 @@
import NimQml, chronicles import NimQml, chronicles
import status/[signals, status, node, network] import status/[signals, status, node, network, settings]
import ../../app_service/[main] import ../../app_service/[main]
import eventemitter import eventemitter
import view import view
@ -13,6 +13,7 @@ type NodeController* = ref object
view*: NodeView view*: NodeView
variant*: QVariant variant*: QVariant
networkAccessMananger*: QNetworkAccessManager networkAccessMananger*: QNetworkAccessManager
isWakuV2: bool
proc newController*(status: Status, appService: AppService, nam: QNetworkAccessManager): NodeController = proc newController*(status: Status, appService: AppService, nam: QNetworkAccessManager): NodeController =
result = NodeController() result = NodeController()
@ -26,25 +27,30 @@ proc delete*(self: NodeController) =
delete self.variant delete self.variant
delete self.view delete self.view
proc setPeers(self: NodeController, peers: seq[string]) =
self.status.network.peerSummaryChange(peers)
self.view.setPeerSize(peers.len)
proc init*(self: NodeController) = proc init*(self: NodeController) =
self.isWakuV2 = self.status.settings.getWakuVersion() == 2
self.status.events.on(SignalType.Wallet.event) do(e:Args): self.status.events.on(SignalType.Wallet.event) do(e:Args):
self.view.setLastMessage($WalletSignal(e).blockNumber) self.view.setLastMessage($WalletSignal(e).blockNumber)
self.status.events.on(SignalType.DiscoverySummary.event) do(e:Args): self.status.events.on(SignalType.DiscoverySummary.event) do(e:Args):
var data = DiscoverySummarySignal(e) var data = DiscoverySummarySignal(e)
self.status.network.peerSummaryChange(data.enodes) self.setPeers(data.enodes)
self.view.setPeerSize(data.enodes.len)
self.status.events.on(SignalType.PeerStats.event) do(e:Args): self.status.events.on(SignalType.PeerStats.event) do(e:Args):
var data = PeerStatsSignal(e) var data = PeerStatsSignal(e)
self.status.network.peerSummaryChange(data.peers) self.setPeers(data.peers)
self.view.setPeerSize(data.peers.len)
self.status.events.on(SignalType.Stats.event) do (e:Args): self.status.events.on(SignalType.Stats.event) do (e:Args):
self.view.setStats(StatsSignal(e).stats) self.view.setStats(StatsSignal(e).stats)
self.view.fetchBitsSet() if not self.isWakuV2: self.view.fetchBitsSet()
self.status.events.on(SignalType.ChroniclesLogs.event) do(e:Args): self.status.events.on(SignalType.ChroniclesLogs.event) do(e:Args):
self.view.log(ChroniclesLogsSignal(e).content) self.view.log(ChroniclesLogsSignal(e).content)
self.view.init() self.view.init()
self.setPeers(self.status.network.fetchPeers())

View File

@ -64,7 +64,7 @@ proc init*(self: ProfileController, account: Account) =
self.view.ens.init() self.view.ens.init()
self.view.initialized() self.view.initialized()
for name, endpoint in self.status.fleet.config.getMailservers(self.status.settings.getFleet()).pairs(): for name, endpoint in self.status.fleet.config.getMailservers(self.status.settings.getFleet(), self.status.settings.getWakuVersion() == 2).pairs():
let mailserver = MailServer(name: name, endpoint: endpoint) let mailserver = MailServer(name: name, endpoint: endpoint)
self.view.mailservers.add(mailserver) self.view.mailservers.add(mailserver)

View File

@ -1,6 +1,6 @@
import import
algorithm, chronos, chronicles, json, math, os, random, sequtils, sets, algorithm, chronos, chronicles, json, math, os, random, sequtils, sets,
tables tables, strutils
from times import cpuTime from times import cpuTime
import import
@ -39,6 +39,7 @@ type
activeMailserver*: string activeMailserver*: string
lastConnectionAttempt*: float lastConnectionAttempt*: float
fleet*: FleetModel fleet*: FleetModel
wakuVersion*: int
MailserverStatus* = enum MailserverStatus* = enum
Unknown = -1, Unknown = -1,
@ -46,6 +47,10 @@ type
Connecting = 1 Connecting = 1
Connected = 2, Connected = 2,
proc peerIdFromMultiAddress(nodeAddr: string): string =
let multiAddressParts = nodeAddr.split("/")
return multiAddressParts[multiAddressParts.len - 1]
proc cmpMailserverReply(x, y: (string, int)): int = proc cmpMailserverReply(x, y: (string, int)): int =
if x[1] > y[1]: 1 if x[1] > y[1]: 1
elif x[1] == y[1]: 0 elif x[1] == y[1]: 0
@ -68,7 +73,8 @@ proc init*(self: MailserverModel) =
"/../fleets.json" "/../fleets.json"
let fleetConfig = readFile(joinPath(getAppDir(), fleets)) let fleetConfig = readFile(joinPath(getAppDir(), fleets))
self.fleet = newFleetModel(fleetConfig) self.fleet = newFleetModel(fleetConfig)
self.mailservers = toSeq(self.fleet.config.getMailservers(status_settings.getFleet()).values) self.wakuVersion = status_settings.getWakuVersion()
self.mailservers = toSeq(self.fleet.config.getMailservers(status_settings.getFleet(), self.wakuVersion == 2).values)
for mailserver in status_settings.getMailservers().getElems(): for mailserver in status_settings.getMailservers().getElems():
self.mailservers.add(mailserver["address"].getStr()) self.mailservers.add(mailserver["address"].getStr())
@ -80,33 +86,39 @@ proc isActiveMailserverAvailable*(self: MailserverModel): bool =
else: else:
result = self.nodes[self.activeMailserver] == MailserverStatus.Connected result = self.nodes[self.activeMailserver] == MailserverStatus.Connected
proc connect(self: MailserverModel, enode: string) = proc connect(self: MailserverModel, nodeAddr: string) =
info "Connecting to mailserver", enode=enode.substr[enode.len-40..enode.len-1] debug "Connecting to mailserver", nodeAddr
var connected = false var connected = false
# TODO: this should come from settings # TODO: this should come from settings
var knownMailservers = initHashSet[string]() var knownMailservers = initHashSet[string]()
for m in self.mailservers: for m in self.mailservers:
knownMailservers.incl m knownMailservers.incl m
if not knownMailservers.contains(enode): if not knownMailservers.contains(nodeAddr):
warn "Mailserver not known", enode warn "Mailserver not known", nodeAddr
return return
self.activeMailserver = enode self.activeMailserver = if self.wakuVersion == 2: peerIdFromMultiAddress(nodeAddr) else: nodeAddr
info "Mailserver changed", enode self.events.emit("mailserver:changed", MailserverArgs(peer: nodeAddr))
self.events.emit("mailserver:changed", MailserverArgs(peer: enode))
# Adding a peer and marking it as connected can't be executed sync, because # Adding a peer and marking it as connected can't be executed sync in WakuV1, because
# There's a delay between requesting a peer being added, and a signal being # There's a delay between requesting a peer being added, and a signal being
# received after the peer was added. So we first set the peer status as # received after the peer was added. So we first set the peer status as
# Connecting and once a peerConnected signal is received, we mark it as # Connecting and once a peerConnected signal is received, we mark it as
# Connected # Connected
if self.nodes.hasKey(enode) and self.nodes[enode] == MailserverStatus.Connected: if self.nodes.hasKey(self.activeMailserver) and self.nodes[self.activeMailserver] == MailserverStatus.Connected:
connected = true connected = true
else: else:
# Attempt to connect to mailserver by adding it as a peer # Attempt to connect to mailserver by adding it as a peer
status_mailservers.update(enode) if self.wakuVersion == 2:
self.nodes[enode] = MailserverStatus.Connecting if status_core.dialPeer(nodeAddr): # WakuV2 dial is sync (should it be async?)
discard status_mailservers.setMailserver(self.activeMailserver)
self.nodes[self.activeMailserver] = MailserverStatus.Connected
connected = true
else:
status_mailservers.update(nodeAddr)
self.nodes[nodeAddr] = MailserverStatus.Connecting
self.lastConnectionAttempt = cpuTime() self.lastConnectionAttempt = cpuTime()
if connected: if connected:
@ -165,7 +177,7 @@ proc fillGaps*(self: MailserverModel, chatId: string, messageIds: seq[string]) =
proc findNewMailserver(self: MailserverModel) = proc findNewMailserver(self: MailserverModel) =
warn "Finding a new mailserver..." warn "Finding a new mailserver..."
let mailserversReply = parseJson(status_mailservers.ping(self.mailservers, 500))["result"] let mailserversReply = parseJson(status_mailservers.ping(self.mailservers, 500, self.wakuVersion == 2))["result"]
var availableMailservers:seq[(string, int)] = @[] var availableMailservers:seq[(string, int)] = @[]
for reply in mailserversReply: for reply in mailserversReply:
@ -191,7 +203,10 @@ proc cycleMailservers(self: MailserverModel) =
if self.activeMailserver != "": if self.activeMailserver != "":
info "Disconnecting active mailserver", peer=self.activeMailserver info "Disconnecting active mailserver", peer=self.activeMailserver
self.nodes[self.activeMailserver] = MailserverStatus.Disconnected self.nodes[self.activeMailserver] = MailserverStatus.Disconnected
removePeer(self.activeMailserver) if self.wakuVersion == 2:
dropPeerByID(self.activeMailserver)
else:
removePeer(self.activeMailserver)
self.activeMailserver = "" self.activeMailserver = ""
self.findNewMailserver() self.findNewMailserver()
@ -199,7 +214,7 @@ proc checkConnection*(self: MailserverModel) {.async.} =
while true: while true:
info "Verifying mailserver connection state..." info "Verifying mailserver connection state..."
let pinnedMailserver = status_settings.getPinnedMailserver() let pinnedMailserver = status_settings.getPinnedMailserver()
if pinnedMailserver != "" and self.activeMailserver != pinnedMailserver: if self.wakuVersion == 1 and pinnedMailserver != "" and self.activeMailserver != pinnedMailserver:
# connect to current mailserver from the settings # connect to current mailserver from the settings
self.mailservers.add(pinnedMailserver) self.mailservers.add(pinnedMailserver)
self.connect(pinnedMailserver) self.connect(pinnedMailserver)

View File

@ -1,77 +0,0 @@
import NimQml, tables, json, chronicles, strutils, json_serialization
import ../types as status_types
import types, messages, discovery, whisperFilter, envelopes, expired, wallet, mailserver, communities, stats, peerstats
import ../status
import ../../eventemitter
logScope:
topics = "signals"
QtObject:
type SignalsController* = ref object of QObject
variant*: QVariant
status*: Status
proc newController*(status: Status): SignalsController =
new(result)
result.status = status
result.setup()
result.variant = newQVariant(result)
proc setup(self: SignalsController) =
self.QObject.setup
proc delete*(self: SignalsController) =
self.variant.delete
self.QObject.delete
proc processSignal(self: SignalsController, statusSignal: string) =
var jsonSignal: JsonNode
try:
jsonSignal = statusSignal.parseJson
except:
error "Invalid signal received", data = statusSignal
return
let signalString = jsonSignal["type"].getStr
trace "Raw signal data", data = $jsonSignal
var signalType: SignalType
try:
signalType = parseEnum[SignalType](signalString)
except:
warn "Unknown signal received", type = signalString
signalType = SignalType.Unknown
return
var signal: Signal = case signalType:
of SignalType.Message: messages.fromEvent(jsonSignal)
of SignalType.EnvelopeSent: envelopes.fromEvent(jsonSignal)
of SignalType.EnvelopeExpired: expired.fromEvent(jsonSignal)
of SignalType.WhisperFilterAdded: whisperFilter.fromEvent(jsonSignal)
of SignalType.Wallet: wallet.fromEvent(jsonSignal)
of SignalType.NodeLogin: Json.decode($jsonSignal, NodeSignal)
of SignalType.PeerStats: peerStats.fromEvent(jsonSignal)
of SignalType.DiscoverySummary: discovery.fromEvent(jsonSignal)
of SignalType.MailserverRequestCompleted: mailserver.fromCompletedEvent(jsonSignal)
of SignalType.MailserverRequestExpired: mailserver.fromExpiredEvent(jsonSignal)
of SignalType.CommunityFound: communities.fromEvent(jsonSignal)
of SignalType.Stats: stats.fromEvent(jsonSignal)
else: Signal()
if(signalType == SignalType.NodeLogin):
if(NodeSignal(signal).event.error != ""):
error "node.login", error=NodeSignal(signal).event.error
if(signalType == SignalType.NodeCrashed):
error "node.crashed", error=statusSignal
self.status.events.emit(signalType.event, signal)
proc signalReceived*(self: SignalsController, signal: string) {.signal.}
proc receiveSignal(self: SignalsController, signal: string) {.slot.} =
self.processSignal(signal)
self.signalReceived(signal)

2
vendor/status-lib vendored

@ -1 +1 @@
Subproject commit e29ba9c9faa391ec6c4c504edc37e044c8ff4736 Subproject commit a80869872d822bdc239cc67f98c6fb3a7f54febe