refactor: mailserver cycle

This commit is contained in:
Richard Ramos 2022-01-06 13:19:02 -04:00
parent 35388b0f6d
commit e88a936193
13 changed files with 59 additions and 355 deletions

View File

@ -35,7 +35,6 @@ include event_handling
include signal_handling
proc init*(self: ChatController) =
self.handleMailserverEvents()
self.handleChatEvents()
self.handleSystemEvents()
self.handleSignals()

View File

@ -198,16 +198,6 @@ proc handleChatEvents(self: ChatController) =
#Notifying communities about this change.
self.view.communities.markNotificationsAsRead(markAsReadProps)
proc handleMailserverEvents(self: ChatController) =
let mailserverWorker = self.appService.marathon[MailserverWorker().name]
self.status.events.on("mailserverAvailable") do(e:Args):
self.view.messageView.setLoadingMessages(true)
let task = RequestMessagesTaskArg(
`method`: "requestMessages",
vptr: cast[ByteAddress](self.view.vptr),
slot: "requestAllHistoricMessagesResult"
)
mailserverWorker.start(task)
proc handleSystemEvents(self: ChatController) =
self.status.events.on("osNotificationClicked") do(e:Args):

View File

@ -7,27 +7,6 @@ proc handleSignals(self: ChatController) =
var data = MessageSignal(e)
self.status.chat.update(data.chats, data.messages, data.emojiReactions, data.communities, data.membershipRequests, data.pinnedMessages, data.activityCenterNotification, data.statusUpdates, data.deletedMessages)
self.status.events.on(SignalType.DiscoverySummary.event) do(e:Args):
## Handle mailserver peers being added and removed
var data = DiscoverySummarySignal(e)
let
mailserverWorker = self.appService.marathon[MailserverWorker().name]
task = PeerSummaryChangeTaskArg(
`method`: "peerSummaryChange",
peers: data.enodes
)
mailserverWorker.start(task)
self.status.events.on(SignalType.PeerStats.event) do(e:Args):
var data = PeerStatsSignal(e)
let
mailserverWorker = self.appService.marathon[MailserverWorker().name]
task = PeerSummaryChangeTaskArg(
`method`: "peerSummaryChange",
peers: data.peers
)
mailserverWorker.start(task)
self.status.events.on(SignalType.EnvelopeSent.event) do(e:Args):
var data = EnvelopeSentSignal(e)
self.status.messages.updateStatus(data.messageIds)
@ -52,3 +31,14 @@ proc handleSignals(self: ChatController) =
# TODO: retry mailserver request up to N times or change mailserver
# If > N, then
self.view.hideLoadingIndicator()
let mailserverWorker = self.appService.marathon[MailserverWorker().name]
self.status.events.on(SignalType.MailserverAvailable.event) do(e:Args):
var data = MailserverAvailableSignal(e)
info "active mailserver changed", node=data.address, topics="mailserver-interaction"
self.view.messageView.setLoadingMessages(true)
let task = RequestMessagesTaskArg(
`method`: "requestMessages",
vptr: cast[ByteAddress](self.view.vptr)
)
mailserverWorker.start(task)

View File

@ -3,7 +3,6 @@ import status/[status]
import status/utils as status_utils
import status/chat as status_chat
import status/messages as status_messages
import status/mailservers
import status/contacts as status_contacts
import status/ens as status_ens
import status/chat/[chat]
@ -377,17 +376,6 @@ QtObject:
QtProperty[QVariant] transactions:
read = getTransactions
proc isActiveMailserverResult(self: ChatsView, resultEncoded: string) {.slot.} =
let isActiveMailserverAvailable = decode[bool](resultEncoded)
if isActiveMailserverAvailable:
self.messageView.setLoadingMessages(true)
let
mailserverWorker = self.appService.marathon[MailserverWorker().name]
task = RequestMessagesTaskArg(`method`: "requestMessages")
mailserverWorker.start(task)
proc requestAllHistoricMessagesResult(self: ChatsView, resultEncoded: string) {.slot.} =
self.messageView.setLoadingMessages(true)
proc createCommunityChannel*(self: ChatsView, communityId: string, name: string, description: string, categoryId: string): string {.slot.} =
try:
@ -485,15 +473,6 @@ QtObject:
proc markMessageAsSent*(self: ChatsView, chat: string, messageId: string) =
self.messageView.markMessageAsSent(chat, messageId)
# TODO: this method was created just to test the store functionality.
# It should be removed, once peer management is added to status-go
proc requestAllHistoricMessages(self: ChatsView) {.slot.} =
debug "Requesting messages"
# TODO: the mailservers must change depending on whether we are using wakuV1 or wakuV2
# in the meantime I'm hardcoding a specific mailserver
echo self.status.mailservers.setMailserver("16Uiu2HAm4v86W3bmT1BiH6oSPzcsSr24iDQpSN5Qa992BCjjwgrD")
echo self.status.mailservers.requestAllHistoricMessages()
proc switchTo*(self: ChatsView, communityId: string, channelId: string,
messageId: string) =
## This method displays community with communityId as an active one (if

View File

@ -8,7 +8,6 @@ import status/chat/chat
import status/wallet
import status/types/[account, transaction, setting, profile, mailserver]
import ../../app_service/[main]
import ../../app_service/tasks/marathon/mailserver/events
import eventemitter
import view
import views/[ens_manager, devices, network, mailservers, muted_chats]
@ -95,9 +94,10 @@ proc init*(self: ProfileController, account: Account) =
var evArgs = ChatUpdateArgs(e)
self.view.mutedChats.updateChats(evArgs.chats)
self.status.events.on("mailserver:changed") do(e: Args):
let mailserverArg = MailserverArgs(e)
self.view.mailservers.activeMailserverChanged(mailserverArg.peer)
self.status.events.on(SignalType.MailserverChanged.event) do(e: Args):
let m = MailserverChangedSignal(e)
info "active mailserver changed", node=m.address, topics="mailserver-interaction"
self.view.mailservers.setActiveMailserver(m.address)
self.status.events.on(SignalType.HistoryRequestStarted.event) do(e: Args):
let h = HistoryRequestStartedSignal(e)

View File

@ -15,6 +15,7 @@ QtObject:
status: Status
appService: AppService
mailserversList*: MailServersList
activeMailserver: string
proc setup(self: MailserversView) =
self.QObject.setup
@ -39,21 +40,16 @@ QtObject:
QtProperty[QVariant] list:
read = getMailserversList
proc activeMailserverChanged*(self: MailserversView, activeMailserverName: string) {.signal.}
proc activeMailserverChanged*(self: MailserversView, activeMailserver: string) {.signal.}
proc getActiveMailserver(self: MailserversView): string {.slot.} =
let
mailserverWorker = self.appService.marathon[MailserverWorker().name]
task = GetActiveMailserverTaskArg(
`method`: "getActiveMailserver",
vptr: cast[ByteAddress](self.vptr),
slot: "getActiveMailserverResult"
)
mailserverWorker.start(task)
proc getActiveMailserverResult*(self: MailserversView, activeMailserver: string) {.slot.} =
proc setActiveMailserver*(self: MailserversView, activeMailserver: string) =
self.activeMailserver = activeMailserver
self.activeMailserverChanged(activeMailserver)
QtProperty[string] activeMailserver:
read = activeMailserver
notify = activeMailserverChanged
proc getAutomaticSelection(self: MailserversView): bool {.slot.} =
self.status.settings.getPinnedMailserver() == ""
@ -66,19 +62,10 @@ QtObject:
proc enableAutomaticSelection(self: MailserversView, value: bool) {.slot.} =
if value:
self.activeMailserverChanged(self.activeMailserver)
self.status.settings.pinMailserver()
else:
let
mailserverWorker = self.appService.marathon[MailserverWorker().name]
task = GetActiveMailserverTaskArg(
`method`: "getActiveMailserver",
vptr: cast[ByteAddress](self.vptr),
slot: "getActiveMailserverResult2"
)
mailserverWorker.start(task)
proc getActiveMailserverResult2(self: MailserversView, activeMailserver: string) {.slot.} =
self.status.settings.pinMailserver(activeMailserver)
self.activeMailserverChanged("")
proc save(self: MailserversView, name: string, address: string) {.slot.} =
self.status.settings.saveMailserver(name, address)

View File

@ -146,6 +146,7 @@ var NODE_CONFIG* = %* {
},
"ShhextConfig": {
"BackupDisabledDataDir": "./",
"EnableMailserverCycle": true,
"DataSyncEnabled": true,
"InstallationID": "aef27732-8d86-5039-a32e-bdbe094d8791",
"MailServerConfirmations": true,

View File

@ -1,11 +1,8 @@
import # std libs
strutils
import # vendor libs
chronicles, NimQml, json_serialization
import # status-desktop libs
status/status, ../../common as task_runner_common, ./events
status/status, ../../common as task_runner_common
logScope:
topics = "mailserver controller"
@ -32,8 +29,3 @@ QtObject:
proc delete*(self: MailserverController) =
self.variant.delete
self.QObject.delete
proc receiveEvent(self: MailserverController, eventTuple: string) {.slot.} =
let event = Json.decode(eventTuple, tuple[name: string, arg: MailserverArgs])
trace "forwarding event from long-running mailserver task to the main thread", event=eventTuple
self.status.events.emit(event.name, event.arg)

View File

@ -1,21 +0,0 @@
import # vendor libs
NimQml, json_serialization
import # status-desktop libs
eventemitter
type
MailserverEvents* = ref object
vptr: ByteAddress
MailserverArgs* = ref object of Args
peer*: string
const EVENTS_SLOT = "receiveEvent"
proc newMailserverEvents*(vptr: ByteAddress): MailserverEvents =
new(result)
result.vptr = vptr
proc emit*(self: MailserverEvents, event: string, arg: MailserverArgs) =
let payload: tuple[event: string, arg: MailserverArgs] = (event, arg)
signal_handler(cast[pointer](self.vptr), Json.encode(payload), EVENTS_SLOT)

View File

@ -1,238 +1,46 @@
import
algorithm, chronos, chronicles, json, math, os, random, sequtils, sets,
tables, strutils
from times import cpuTime
chronos, chronicles
import
status/statusgo_backend/settings as status_settings,
status/statusgo_backend/chat as status_chat,
status/statusgo_backend/mailservers as status_mailservers,
status/statusgo_backend/core as status_core,
status/fleet,
./events as mailserver_events
status/statusgo_backend_new/mailservers as status_mailservers,
status/fleet
logScope:
topics = "mailserver model"
################################################################################
## ##
## NOTE: MailserverModel runs on a separate (long-running) thread ##
## ##
## How do mailservers work ? ##
## ##
## - We send a request to the mailserver, we are only interested in the ##
## messages since `last-request` up to the last seven days ##
## and the last 24 hours for topics that were just joined ##
## - The mailserver doesn't directly respond to the request and ##
## instead we start receiving messages in the filters for the requested ##
## topics. ##
## - If the mailserver was not ready when we tried for instance to request ##
## the history of a topic after joining a chat, the request will be done ##
## as soon as the mailserver becomes available ##
## ##
################################################################################
type
MailserverModel* = ref object
mailservers*: seq[string]
events*: MailserverEvents
nodes*: Table[string, MailserverStatus]
activeMailserver*: string
lastConnectionAttempt*: float
wakuVersion*: int
MailserverStatus* = enum
Unknown = -1,
Disconnected = 0,
Connecting = 1
Connected = 2,
proc peerIdFromMultiAddress(nodeAddr: string): string =
let multiAddressParts = nodeAddr.split("/")
return multiAddressParts[multiAddressParts.len - 1]
proc cmpMailserverReply(x, y: (string, int)): int =
if x[1] > y[1]: 1
elif x[1] == y[1]: 0
else: -1
proc poolSize(fleetSize: int): int = ceil(fleetSize / 4).int
proc newMailserverModel*(vptr: ByteAddress): MailserverModel =
result = MailserverModel()
result.events = newMailserverEvents(vptr)
result.nodes = initTable[string, MailserverStatus]()
result.activeMailserver = ""
result.mailservers = @[]
proc init*(self: MailserverModel) =
trace "MailserverModel::init()"
self.wakuVersion = status_settings.getWakuVersion()
let nodeConfig = status_settings.getNodeConfig()
if self.wakuVersion == 2:
# TODO:
# Instead of obtaining the waku2 fleet from fleet.json, expose a method in status-go that will
# return the list of store nodes. (The cluster config can contain dns-discovery urls so it cannot be
# used to populate the list of mailservers)
let fleets = if defined(windows) and defined(production):
"/../resources/fleets.json"
else:
"/../fleets.json"
let fleetConfig = readFile(joinPath(getAppDir(), fleets))
let fleetModel = newFleetModel(fleetConfig)
self.mailservers = toSeq(fleetModel.config.getMailservers(status_settings.getFleet(), isWakuV2=true).values)
else:
for mailserver in nodeConfig["ClusterConfig"]["TrustedMailServers"].getElems():
self.mailservers.add(mailserver.getStr())
for mailserver in status_settings.getMailservers().getElems():
self.mailservers.add(mailserver["address"].getStr())
proc getActiveMailserver*(self: MailserverModel): string = self.activeMailserver
proc isActiveMailserverAvailable*(self: MailserverModel): bool =
if not self.nodes.hasKey(self.activeMailserver):
result = false
else:
result = self.nodes[self.activeMailserver] == MailserverStatus.Connected
proc connect(self: MailserverModel, nodeAddr: string) =
debug "Connecting to mailserver", nodeAddr
var connected = false
# TODO: this should come from settings
var knownMailservers = initHashSet[string]()
for m in self.mailservers:
knownMailservers.incl m
if not knownMailservers.contains(nodeAddr):
warn "Mailserver not known", nodeAddr
return
self.activeMailserver = if self.wakuVersion == 2: peerIdFromMultiAddress(nodeAddr) else: nodeAddr
self.events.emit("mailserver:changed", MailserverArgs(peer: nodeAddr))
# Adding a peer and marking it as connected can't be executed sync in WakuV1, because
# There's a delay between requesting a peer being added, and a signal being
# received after the peer was added. So we first set the peer status as
# Connecting and once a peerConnected signal is received, we mark it as
# Connected
if self.nodes.hasKey(self.activeMailserver) and self.nodes[self.activeMailserver] == MailserverStatus.Connected:
connected = true
else:
# Attempt to connect to mailserver by adding it as a peer
if self.wakuVersion == 2:
if status_core.dialPeer(nodeAddr): # WakuV2 dial is sync (should it be async?)
discard status_mailservers.setMailserver(self.activeMailserver)
self.nodes[self.activeMailserver] = MailserverStatus.Connected
connected = true
else:
status_mailservers.update(nodeAddr)
self.nodes[nodeAddr] = MailserverStatus.Connecting
self.lastConnectionAttempt = cpuTime()
if connected:
info "Mailserver available"
self.events.emit("mailserverAvailable", MailserverArgs())
proc peerSummaryChange*(self: MailserverModel, peers: seq[string]) =
# When a node is added as a peer, or disconnected
# a DiscoverySummary signal is emitted. In here we
# change the status of the nodes the app is connected to
# Connected / Disconnected and emit peerConnected / peerDisconnected
# events.
var mailserverAvailable = false
for knownPeer in self.nodes.keys:
if not peers.contains(knownPeer) and (self.nodes[knownPeer] == MailserverStatus.Connected or (self.nodes[knownPeer] == MailserverStatus.Connecting and (cpuTime() - self.lastConnectionAttempt) > 8)):
info "Peer disconnected", peer=knownPeer
self.nodes[knownPeer] = MailserverStatus.Disconnected
self.events.emit("peerDisconnected", MailserverArgs(peer: knownPeer))
if self.activeMailserver == knownPeer:
warn "Active mailserver disconnected!", peer = knownPeer
self.activeMailserver = ""
for peer in peers:
if self.nodes.hasKey(peer) and (self.nodes[peer] == MailserverStatus.Connected): continue
info "Peer connected", peer
self.nodes[peer] = MailserverStatus.Connected
self.events.emit("peerConnected", MailserverArgs(peer: peer))
if peer == self.activeMailserver:
if self.nodes.hasKey(self.activeMailserver):
if self.activeMailserver == peer:
mailserverAvailable = true
if mailserverAvailable:
info "Mailserver available"
self.events.emit("mailserverAvailable", MailserverArgs())
proc disconnectActiveMailserver(self: MailserverModel) =
try:
warn "Disconnecting active mailserver due to error"
discard status_mailservers.disconnectActiveMailserver()
except Exception as e:
error "error: ", errDescription=e.msg
proc requestMessages*(self: MailserverModel) =
info "Requesting messages from", mailserver=self.activeMailserver
discard status_mailservers.requestAllHistoricMessages()
proc requestStoreMessages*(self: MailserverModel, topics: seq[string], fromValue: int64 = 0, toValue: int64 = 0, force: bool = false) =
info "Requesting messages from", mailserver=self.activeMailserver
let generatedSymKey = status_chat.generateSymKeyFromPassword()
status_mailservers.requestStoreMessages(topics, generatedSymKey, self.activeMailserver, 1000, fromValue, toValue, force)
try:
info "Requesting message history"
discard status_mailservers.requestAllHistoricMessages()
except Exception as e:
error "error: ", errDescription=e.msg
self.disconnectActiveMailserver()
proc requestMoreMessages*(self: MailserverModel, chatId: string) =
info "Requesting more messages from", mailserver=self.activeMailserver, chatId=chatId
discard status_mailservers.syncChatFromSyncedFrom(chatId)
try:
info "Requesting more messages for", chatId=chatId
discard status_mailservers.syncChatFromSyncedFrom(chatId)
except Exception as e:
error "error: ", errDescription=e.msg
self.disconnectActiveMailserver()
proc fillGaps*(self: MailserverModel, chatId: string, messageIds: seq[string]) =
info "Requesting fill gaps from", mailserver=self.activeMailserver, chatId=chatId
discard status_mailservers.fillGaps(chatId, messageIds)
proc findNewMailserver(self: MailserverModel) =
warn "Finding a new mailserver...", wakuVersion=self.wakuVersion
let mailserversReply = parseJson(status_mailservers.ping(self.mailservers, 500, self.wakuVersion == 2))["result"]
var availableMailservers:seq[(string, int)] = @[]
for reply in mailserversReply:
if(reply["error"].kind != JNull): continue # The results with error are ignored
availableMailservers.add((reply["address"].getStr, reply["rttMs"].getInt))
availableMailservers.sort(cmpMailserverReply)
# No mailservers where returned... do nothing.
if availableMailservers.len == 0:
warn "No mailservers available"
return
# Picks a random mailserver amongs the ones with the lowest latency
# The pool size is 1/4 of the mailservers were pinged successfully
randomize()
let mailServer = availableMailservers[rand(poolSize(availableMailservers.len - 1))][0]
self.connect(mailserver)
proc cycleMailservers(self: MailserverModel) =
warn "Automatically switching mailserver"
if self.activeMailserver != "":
info "Disconnecting active mailserver", peer=self.activeMailserver
self.nodes[self.activeMailserver] = MailserverStatus.Disconnected
if self.wakuVersion == 2:
dropPeerByID(self.activeMailserver)
else:
removePeer(self.activeMailserver)
self.activeMailserver = ""
self.findNewMailserver()
proc checkConnection*(self: MailserverModel) {.async.} =
while true:
info "Verifying mailserver connection state..."
let pinnedMailserver = status_settings.getPinnedMailserver()
if self.wakuVersion == 1 and pinnedMailserver != "" and self.activeMailserver != pinnedMailserver:
# connect to current mailserver from the settings
self.mailservers.add(pinnedMailserver)
self.connect(pinnedMailserver)
else:
# or setup a random mailserver:
if not self.isActiveMailserverAvailable:
# TODO: have a timeout for reconnection before changing to a different server
self.cycleMailservers()
await sleepAsync(10.seconds)
try:
info "Requesting fill gaps from", chatId=chatId
discard status_mailservers.fillGaps(chatId, messageIds)
except Exception as e:
error "error: ", errDescription=e.msg
self.disconnectActiveMailserver()

View File

@ -6,9 +6,7 @@ import # vendor libs
import # status-desktop libs
../worker, ./model, ../../qt, ../../common as task_runner_common,
../common as methuselash_common,
status/statusgo_backend/mailservers # TODO: needed for MailserverTopic type, remove?
../common as methuselash_common
export
chronos, task_runner_common, json_serialization
@ -86,12 +84,6 @@ proc processMessage(mailserverModel: MailserverModel, received: string) =
mailserverModel.requestMessages()
taskArg.finish("") # TODO:
of "isActiveMailserverAvailable":
let
taskArg = decode[IsActiveMailserverAvailableTaskArg](received)
output = mailserverModel.isActiveMailserverAvailable()
taskArg.finish(output)
of "requestMessages":
let taskArg = decode[RequestMessagesTaskArg](received)
mailserverModel.requestMessages()
@ -104,16 +96,6 @@ proc processMessage(mailserverModel: MailserverModel, received: string) =
let taskArg = decode[FillGapsTaskArg](received)
mailserverModel.fillGaps(taskArg.chatId, taskArg.messageIds)
of "getActiveMailserver":
let
taskArg = decode[GetActiveMailserverTaskArg](received)
output = mailserverModel.getActiveMailserver()
taskArg.finish(output)
of "peerSummaryChange":
let taskArg = decode[PeerSummaryChangeTaskArg](received)
mailserverModel.peerSummaryChange(taskArg.peers)
else:
error "unknown message", message=received
@ -140,9 +122,6 @@ proc worker(arg: WorkerThreadArg) {.async, gcsafe, nimcall.} =
return
else:
unprocessedMsgs.add received
mailserverModel.init()
discard mailserverModel.checkConnection()
for msg in unprocessedMsgs.items:
mailserverModel.processMessage(msg)

View File

@ -20,8 +20,8 @@ Item {
Connections {
target: root.store.mailservers
onActiveMailserverChanged: function(activeMailserverName){
var mName = root.store.getMailserverName(activeMailserverName)
onActiveMailserverChanged: function(activeMailserver){
var mName = root.store.getMailserverName(activeMailserver)
root.activeMailserver = mName
}
}

2
vendor/status-lib vendored

@ -1 +1 @@
Subproject commit ebd6f2674b6cf030991f5833a46be5d75593e569
Subproject commit 571492e9a6e39bf07d4ae60fa3e8553ab469abe7