feat: Add Methuselah and Mailserver long-running task

Fixes #2143.

Methuselah is the a task manager for long-running tasks. It allows fo registration of workers that will each run in their own thead. Each worker can pass messages to and recieve messages from the main thread.

MailserverWoker was also introduced which moves all mailserver model logic to a MethuselahWorker. All communication to/from the model is done via a MethuselahTask. Results of the task are returned to the main thread by way of `signal_handler`, which calls the QtObject slot specified in the task.

Mailsever also provides a way for the model to emit events inside of the worker. These events are forwarded to the main thread via the `receiveEvent` slot of the `MailserverController`.

Co-authored-by: Michael Bradley, Jr <michaelsbradleyjr@gmail.com>
This commit is contained in:
Eric Mastro 2021-03-30 12:12:10 +11:00 committed by Iuri Matias
parent 0d677eb156
commit e7571bd2a4
22 changed files with 788 additions and 334 deletions

View File

@ -1,6 +1,5 @@
import NimQml, chronicles, tables import NimQml, chronicles, tables
import ../../status/chat as chat_model import ../../status/chat as chat_model
import ../../status/mailservers as mailserver_model
import ../../status/messages as messages_model import ../../status/messages as messages_model
import ../../status/signals/types import ../../status/signals/types
import ../../status/libstatus/types as status_types import ../../status/libstatus/types as status_types
@ -38,7 +37,6 @@ proc init*(self: ChatController) =
let pubKey = status_settings.getSetting[string](Setting.PublicKey, "0x0") let pubKey = status_settings.getSetting[string](Setting.PublicKey, "0x0")
self.view.pubKey = pubKey self.view.pubKey = pubKey
self.status.mailservers.init()
self.status.chat.init(pubKey) self.status.chat.init(pubKey)
self.status.stickers.init() self.status.stickers.init()
self.view.reactions.init() self.view.reactions.init()

View File

@ -1,6 +1,14 @@
import sugar, sequtils, times, strutils import # std libs
import ../../status/chat/chat as status_chat sugar, sequtils, times, strutils
import ./views/communities
import
stew/faux_closures
import # status-desktop libs
../../status/chat/chat as status_chat, ./views/communities,
../../status/tasks/marathon,
../../status/tasks/marathon/mailserver/worker,
../../status/libstatus/mailservers # TODO: needed for MailserverTopic type, remove?
proc handleChatEvents(self: ChatController) = proc handleChatEvents(self: ChatController) =
# Display already saved messages # Display already saved messages
@ -113,23 +121,33 @@ proc handleChatEvents(self: ChatController) =
else: else:
self.view.stickers.resetBuyAttempt(tx.data.parseInt) self.view.stickers.resetBuyAttempt(tx.data.parseInt)
proc handleMailserverEvents(self: ChatController) = proc handleMailserverEvents(self: ChatController) =
let mailserverWorker = self.status.tasks.marathon[MailserverWorker().name]
self.status.events.on("mailserverTopics") do(e: Args): self.status.events.on("mailserverTopics") do(e: Args):
var topics = TopicArgs(e).topics var topics = TopicArgs(e).topics
for topic in topics: for topic in topics:
topic.lastRequest = times.toUnix(times.getTime()) topic.lastRequest = times.toUnix(times.getTime())
self.status.mailservers.addMailserverTopic(topic) let task = AddMailserverTopicTaskArg(
`method`: "addMailserverTopic",
topic: topic
)
mailserverWorker.start(task)
if(self.status.mailservers.isActiveMailserverAvailable): let task = IsActiveMailserverAvailableTaskArg(
self.view.setLoadingMessages(true) `method`: "isActiveMailserverAvailable",
self.status.mailservers.requestMessages(topics.map(t => t.topic)) vptr: cast[ByteAddress](self.view.vptr),
slot: "isActiveMailserverResult",
topics: topics
)
mailserverWorker.start(task)
self.status.events.on("mailserverAvailable") do(e:Args): self.status.events.on("mailserverAvailable") do(e:Args):
let mailserverTopics = self.status.mailservers.getMailserverTopics() let task = GetMailserverTopicsTaskArg(
var fromValue = times.toUnix(times.getTime()) - 86400 # today - 24 hours `method`: "getMailserverTopics",
vptr: cast[ByteAddress](self.view.vptr),
slot: "getMailserverTopicsResult"
)
mailserverWorker.start(task)
if mailserverTopics.len > 0:
fromValue = min(mailserverTopics.map(topic => topic.lastRequest))
self.view.setLoadingMessages(true)
self.status.mailservers.requestMessages(mailserverTopics.map(t => t.topic), fromValue)

View File

@ -1,3 +1,6 @@
import
../../status/tasks/marathon/mailserver/worker
proc handleSignals(self: ChatController) = proc handleSignals(self: ChatController) =
self.status.events.on(SignalType.Message.event) do(e:Args): self.status.events.on(SignalType.Message.event) do(e:Args):
var data = MessageSignal(e) var data = MessageSignal(e)
@ -6,7 +9,13 @@ proc handleSignals(self: ChatController) =
self.status.events.on(SignalType.DiscoverySummary.event) do(e:Args): self.status.events.on(SignalType.DiscoverySummary.event) do(e:Args):
## Handle mailserver peers being added and removed ## Handle mailserver peers being added and removed
var data = DiscoverySummarySignal(e) var data = DiscoverySummarySignal(e)
self.status.mailservers.peerSummaryChange(data.enodes) let
mailserverWorker = self.status.tasks.marathon[MailserverWorker().name]
task = PeerSummaryChangeTaskArg(
`method`: "peerSummaryChange",
peers: data.enodes
)
mailserverWorker.start(task)
self.status.events.on(SignalType.EnvelopeSent.event) do(e:Args): self.status.events.on(SignalType.EnvelopeSent.event) do(e:Args):
var data = EnvelopeSentSignal(e) var data = EnvelopeSentSignal(e)

View File

@ -1,6 +1,5 @@
import NimQml, Tables, json, sequtils, chronicles, times, re, sugar, strutils, os, strformat, algorithm import NimQml, Tables, json, sequtils, chronicles, times, re, sugar, strutils, os, strformat, algorithm
import ../../status/status import ../../status/status
import ../../status/mailservers
import ../../status/libstatus/chat as libstatus_chat import ../../status/libstatus/chat as libstatus_chat
import ../../status/libstatus/accounts/constants import ../../status/libstatus/accounts/constants
import ../../status/libstatus/mailservers as status_mailservers import ../../status/libstatus/mailservers as status_mailservers
@ -17,6 +16,7 @@ import web3/[conversions, ethtypes]
import views/[channels_list, message_list, chat_item, suggestions_list, reactions, stickers, groups, transactions, communities, community_list, community_item] import views/[channels_list, message_list, chat_item, suggestions_list, reactions, stickers, groups, transactions, communities, community_list, community_item]
import ../utils/image_utils import ../utils/image_utils
import ../../status/tasks/[qt, task_runner_impl] import ../../status/tasks/[qt, task_runner_impl]
import ../../status/tasks/marathon/mailserver/worker
logScope: logScope:
topics = "chats-view" topics = "chats-view"
@ -199,10 +199,22 @@ QtObject:
if self.status.chat.lastMessageTimestamps.hasKey(self.activeChannel.id): if self.status.chat.lastMessageTimestamps.hasKey(self.activeChannel.id):
if force or self.status.chat.lastMessageTimestamps[self.activeChannel.id] <= self.oldestMessageTimestamp: if force or self.status.chat.lastMessageTimestamps[self.activeChannel.id] <= self.oldestMessageTimestamp:
self.oldestMessageTimestamp = self.status.chat.lastMessageTimestamps[self.activeChannel.id] self.oldestMessageTimestamp = self.status.chat.lastMessageTimestamps[self.activeChannel.id]
self.oldestMessageTimestampChanged()
else: else:
let topics = self.status.mailservers.getMailserverTopicsByChatId(self.activeChannel.id) let
if topics.len > 0: mailserverWorker = self.status.tasks.marathon[MailserverWorker().name]
self.oldestMessageTimestamp = topics[0].lastRequest task = GetMailserverTopicsByChatIdTaskArg(
`method`: "getMailserverTopicsByChatId",
vptr: cast[ByteAddress](self.vptr),
slot: "getMailserverTopicsByChatIdResult",
chatId: self.activeChannel.id
)
mailserverWorker.start(task)
proc getMailserverTopicsByChatIdResult(self: ChatsView, topicsEncoded: string) {.slot.} =
let topicsTuple = decode[tuple[topics: seq[MailserverTopic], fetchRange: int]](topicsEncoded)
if topicsTuple.topics.len > 0:
self.oldestMessageTimestamp = topicsTuple.topics[0].lastRequest
else: else:
self.oldestMessageTimestamp = times.toUnix(times.getTime()) self.oldestMessageTimestamp = times.toUnix(times.getTime())
self.oldestMessageTimestampChanged() self.oldestMessageTimestampChanged()
@ -622,22 +634,54 @@ QtObject:
write = setLoadingMessages write = setLoadingMessages
notify = loadingMessagesChanged notify = loadingMessagesChanged
proc getMailserverTopicsByChatIdResult2(self: ChatsView, topicsEncoded: string) {.slot.} =
let
topicsTuple = decode[tuple[topics: seq[MailserverTopic], fetchRange: int]](topicsEncoded)
currentOldestMessageTimestamp = self.oldestMessageTimestamp
self.oldestMessageTimestamp = self.oldestMessageTimestamp - topicsTuple.fetchRange
let
mailserverWorker = self.status.tasks.marathon[MailserverWorker().name]
task = RequestMessagesTaskArg(
`method`: "requestMessages",
vptr: cast[ByteAddress](self.vptr),
slot: "requestMessagesResult",
topics: topicsTuple.topics.map(topic => topic.topic),
fromValue: self.oldestMessageTimestamp,
toValue: currentOldestMessageTimestamp,
force: true
)
mailserverWorker.start(task)
proc requestMoreMessages*(self: ChatsView, fetchRange: int) {.slot.} = proc requestMoreMessages*(self: ChatsView, fetchRange: int) {.slot.} =
self.loadingMessages = true self.loadingMessages = true
self.loadingMessagesChanged(true) self.loadingMessagesChanged(true)
let mailserverWorker = self.status.tasks.marathon[MailserverWorker().name]
var allTopics: seq[string] = @[]
if(self.activeChannel.isTimelineChat): if(self.activeChannel.isTimelineChat):
var chatIds: seq[string] = @[]
for contact in self.status.contacts.getContacts(): for contact in self.status.contacts.getContacts():
for t in self.status.mailservers.getMailserverTopicsByChatId(getTimelineChatId(contact.id)).map(topic => topic.topic): chatIds.add(getTimelineChatId(contact.id))
allTopics.add(t)
let task = GetMailserverTopicsByChatIdsTaskArg(
`method`: "getMailserverTopicsByChatIds",
vptr: cast[ByteAddress](self.vptr),
slot: "getMailserverTopicsByChatIdResult2",
chatIds: chatIds,
fetchRange: fetchRange
)
mailserverWorker.start(task)
else: else:
allTopics = self.status.mailservers.getMailserverTopicsByChatId(self.activeChannel.id).map(topic => topic.topic) let task = GetMailserverTopicsByChatIdTaskArg(
`method`: "getMailserverTopicsByChatId",
vptr: cast[ByteAddress](self.vptr),
slot: "getMailserverTopicsByChatIdResult2",
chatId: self.activeChannel.id,
fetchRange: fetchRange
)
mailserverWorker.start(task)
let currentOldestMessageTimestamp = self.oldestMessageTimestamp proc requestMessagesResult(self: ChatsView, resultEncoded: string) {.slot.} =
self.oldestMessageTimestamp = self.oldestMessageTimestamp - fetchRange
self.status.mailservers.requestMessages(allTopics, self.oldestMessageTimestamp, currentOldestMessageTimestamp, true)
self.oldestMessageTimestampChanged() self.oldestMessageTimestampChanged()
self.messagesLoaded(); self.messagesLoaded();
@ -648,11 +692,23 @@ QtObject:
if (self.activeChannel.id == selectedChannel.id): if (self.activeChannel.id == selectedChannel.id):
self.activeChannel.chatItem = nil self.activeChannel.chatItem = nil
self.status.chat.leave(selectedChannel.id) self.status.chat.leave(selectedChannel.id)
self.status.mailservers.deleteMailserverTopic(selectedChannel.id) let
mailserverWorker = self.status.tasks.marathon[MailserverWorker().name]
task = DeleteMailserverTopicTaskArg(
`method`: "deleteMailserverTopic",
chatId: selectedChannel.id
)
mailserverWorker.start(task)
proc leaveActiveChat*(self: ChatsView) {.slot.} = proc leaveActiveChat*(self: ChatsView) {.slot.} =
self.status.chat.leave(self.activeChannel.id) self.status.chat.leave(self.activeChannel.id)
self.status.mailservers.deleteMailserverTopic(self.activeChannel.id) let
mailserverWorker = self.status.tasks.marathon[MailserverWorker().name]
task = DeleteMailserverTopicTaskArg(
`method`: "deleteMailserverTopic",
chatId: self.activeChannel.id
)
mailserverWorker.start(task)
proc removeChat*(self: ChatsView, chatId: string) = proc removeChat*(self: ChatsView, chatId: string) =
discard self.chats.removeChatItemFromList(chatId) discard self.chats.removeChatItemFromList(chatId)
@ -843,3 +899,33 @@ QtObject:
idx = idx + 1 idx = idx + 1
if(id == msg.id): return idx if(id == msg.id): return idx
return idx return idx
proc isActiveMailserverResult(self: ChatsView, resultEncoded: string) {.slot.} =
let arg = decode[tuple[isActiveMailserverAvailable: bool, topics: seq[MailserverTopic]]](resultEncoded)
if arg.isActiveMailserverAvailable:
self.setLoadingMessages(true)
let
mailserverWorker = self.status.tasks.marathon[MailserverWorker().name]
task = RequestMessagesTaskArg(
`method`: "requestMessages",
topics: arg.topics.map(t => t.topic)
)
mailserverWorker.start(task)
proc getMailserverTopicsResult(self: ChatsView, resultEncoded: string) {.slot.} =
let mailserverTopics = decode[seq[MailserverTopic]](resultEncoded)
var fromValue = times.toUnix(times.getTime()) - 86400 # today - 24 hours
if mailserverTopics.len > 0:
fromValue = min(mailserverTopics.map(topic => topic.lastRequest))
self.setLoadingMessages(true)
let
mailserverWorker = self.status.tasks.marathon[MailserverWorker().name]
task = RequestMessagesTaskArg(
`method`: "requestMessages",
topics: mailserverTopics.map(t => t.topic),
fromValue: fromValue
)
mailserverWorker.start(task)

View File

@ -17,6 +17,7 @@ import view
import views/[ens_manager, devices, network, mailservers, contacts] import views/[ens_manager, devices, network, mailservers, contacts]
import ../chat/views/channels_list import ../chat/views/channels_list
import chronicles import chronicles
import ../../status/tasks/marathon/mailserver/events
type ProfileController* = ref object type ProfileController* = ref object
view*: ProfileView view*: ProfileView
@ -109,7 +110,8 @@ proc init*(self: ProfileController, account: Account) =
self.view.contacts.setContactList(contacts) self.view.contacts.setContactList(contacts)
self.status.events.on("mailserver:changed") do(e: Args): self.status.events.on("mailserver:changed") do(e: Args):
self.view.mailservers.activeMailserverChanged() let mailserverArg = MailserverArgs(e)
self.view.mailservers.activeMailserverChanged(mailserverArg.peer)
self.status.events.on(SignalType.Message.event) do(e: Args): self.status.events.on(SignalType.Message.event) do(e: Args):
let msgData = MessageSignal(e); let msgData = MessageSignal(e);

View File

@ -1,9 +1,9 @@
import NimQml, chronicles import NimQml, chronicles
import ../../../status/status import ../../../status/status
import ../../../status/mailservers
import ../../../status/profile/mailserver import ../../../status/profile/mailserver
import mailservers_list import mailservers_list
import ../../../status/libstatus/settings as status_settings import ../../../status/libstatus/settings as status_settings
import ../../../status/tasks/marathon/mailserver/worker
logScope: logScope:
topics = "mailservers-view" topics = "mailservers-view"
@ -35,14 +35,20 @@ QtObject:
QtProperty[QVariant] list: QtProperty[QVariant] list:
read = getMailserversList read = getMailserversList
proc activeMailserverChanged*(self: MailserversView, activeMailserverName: string) {.signal.}
proc getActiveMailserver(self: MailserversView): string {.slot.} = proc getActiveMailserver(self: MailserversView): string {.slot.} =
return self.mailserversList.getMailserverName(self.status.mailservers.getActiveMailserver()) let
mailserverWorker = self.status.tasks.marathon[MailserverWorker().name]
task = GetActiveMailserverTaskArg(
`method`: "getActiveMailserver",
vptr: cast[ByteAddress](self.vptr),
slot: "getActiveMailserverResult"
)
mailserverWorker.start(task)
proc activeMailserverChanged*(self: MailserversView) {.signal.} proc getActiveMailserverResult*(self: MailserversView, activeMailserver: string) {.slot.} =
self.activeMailserverChanged(activeMailserver)
QtProperty[string] activeMailserver:
read = getActiveMailserver
notify = activeMailserverChanged
proc getAutomaticSelection(self: MailserversView): bool {.slot.} = proc getAutomaticSelection(self: MailserversView): bool {.slot.} =
status_settings.getPinnedMailserver() == "" status_settings.getPinnedMailserver() == ""
@ -58,7 +64,17 @@ QtObject:
if value: if value:
status_settings.pinMailserver() status_settings.pinMailserver()
else: else:
status_settings.pinMailserver(self.status.mailservers.getActiveMailserver()) let
mailserverWorker = self.status.tasks.marathon[MailserverWorker().name]
task = GetActiveMailserverTaskArg(
`method`: "getActiveMailserver",
vptr: cast[ByteAddress](self.vptr),
slot: "getActiveMailserverResult2"
)
mailserverWorker.start(task)
proc getActiveMailserverResult2(self: MailserversView, activeMailserver: string) {.slot.} =
status_settings.pinMailserver(activeMailserver)
proc save(self: MailserversView, name: string, address: string) {.slot.} = proc save(self: MailserversView, name: string, address: string) {.slot.} =
status_settings.saveMailserver(name, address) status_settings.saveMailserver(name, address)

View File

@ -15,8 +15,11 @@ import status/libstatus/accounts/constants
import status_go import status_go
import status/status as statuslib import status/status as statuslib
import ./eventemitter import ./eventemitter
import ./status/tasks/marathon/mailserver/controller as mailserver_controller
import ./status/tasks/marathon/mailserver/worker as mailserver_worker
var signalsQObjPointer: pointer var signalsQObjPointer: pointer
var mailserverQObjPointer: pointer
logScope: logScope:
topics = "main" topics = "main"
@ -28,7 +31,14 @@ proc mainProc() =
else: else:
"/../fleets.json" "/../fleets.json"
let status = statuslib.newStatusInstance(readFile(joinPath(getAppDir(), fleets))) let
fleetConfig = readFile(joinPath(getAppDir(), fleets))
status = statuslib.newStatusInstance(fleetConfig)
mailserverController = mailserver_controller.newController(status)
mailserverWorker = mailserver_worker.newMailserverWorker(cast[ByteAddress](mailserverController.vptr))
# TODO: create and register an ipcWorker
status.tasks.marathon.registerWorker(mailserverWorker)
status.initNode() status.initNode()
enableHDPI() enableHDPI()
@ -117,6 +127,9 @@ proc mainProc() =
status.events.once("login") do(a: Args): status.events.once("login") do(a: Args):
var args = AccountArgs(a) var args = AccountArgs(a)
status.tasks.marathon.onLoggedIn()
# Delete login and onboarding from memory to remove any mnemonic that would have been saved in the accounts list # Delete login and onboarding from memory to remove any mnemonic that would have been saved in the accounts list
login.delete() login.delete()
onboarding.delete() onboarding.delete()
@ -132,7 +145,6 @@ proc mainProc() =
wallet.checkPendingTransactions() wallet.checkPendingTransactions()
wallet.start() wallet.start()
engine.setRootContextProperty("loginModel", login.variant) engine.setRootContextProperty("loginModel", login.variant)
engine.setRootContextProperty("onboardingModel", onboarding.variant) engine.setRootContextProperty("onboardingModel", onboarding.variant)
@ -183,6 +195,7 @@ proc mainProc() =
initControllers() initControllers()
engine.setRootContextProperty("signals", signalController.variant) engine.setRootContextProperty("signals", signalController.variant)
engine.setRootContextProperty("mailserver", mailserverController.variant)
engine.load(newQUrl("qrc:///main.qml")) engine.load(newQUrl("qrc:///main.qml"))

View File

@ -1,15 +1,12 @@
import ../eventemitter
import json import json
import libstatus/types import libstatus/types
type type
FleetModel* = ref object FleetModel* = ref object
events*: EventEmitter
config*: FleetConfig config*: FleetConfig
proc newFleetModel*(events: EventEmitter, fleetConfigJson: string): FleetModel = proc newFleetModel*(fleetConfigJson: string): FleetModel =
result = FleetModel() result = FleetModel()
result.events = events
result.config = fleetConfigJson.toFleetConfig() result.config = fleetConfigJson.toFleetConfig()
proc delete*(self: FleetModel) = proc delete*(self: FleetModel) =

View File

@ -169,7 +169,6 @@ proc sendImageMessage*(chatId: string, image: string): string =
proc sendImageMessages*(chatId: string, images: var seq[string]): string = proc sendImageMessages*(chatId: string, images: var seq[string]): string =
let let
preferredUsername = getSetting[string](Setting.PreferredUsername, "") preferredUsername = getSetting[string](Setting.PreferredUsername, "")
debugEcho ">>> [status/libstatus/chat.sendImageMessages] about to send images"
let imagesJson = %* images.map(image => %* let imagesJson = %* images.map(image => %*
{ {
"chatId": chatId, "chatId": chatId,
@ -179,7 +178,6 @@ proc sendImageMessages*(chatId: string, images: var seq[string]): string =
"text": "Update to latest version to see a nice image here!" "text": "Update to latest version to see a nice image here!"
} }
) )
debugEcho ">>> [status/libstatus/chat.sendImageMessages] imagesJson:", $imagesJson
callPrivateRPC("sendChatMessages".prefix, %* [imagesJson]) callPrivateRPC("sendChatMessages".prefix, %* [imagesJson])
proc sendStickerMessage*(chatId: string, sticker: Sticker): string = proc sendStickerMessage*(chatId: string, sticker: Sticker): string =

View File

@ -1,273 +0,0 @@
import algorithm, json, random, math, os, tables, sets, chronicles, sequtils, locks, sugar, times
import libstatus/core as status_core
import libstatus/chat as status_chat
import libstatus/settings as status_settings
import libstatus/types
import libstatus/mailservers as status_mailservers
import ../eventemitter
import fleet
# How do mailserver should work ?
#
# - We send a request to the mailserver, we are only interested in the
# messages since `last-request` up to the last seven days
# and the last 24 hours for topics that were just joined
# - The mailserver doesn't directly respond to the request and
# instead we start receiving messages in the filters for the requested
# topics.
# - If the mailserver was not ready when we tried for instance to request
# the history of a topic after joining a chat, the request will be done
# as soon as the mailserver becomes available
logScope:
topics = "mailserver-model"
var nodesLock: Lock
var activeMailserverLock: Lock
type
MailserverArg* = ref object of Args
peer*: string
MailserverStatus* = enum
Unknown = -1,
Disconnected = 0,
Connecting = 1
Connected = 2,
MailserverModel* = ref object
mailservers*: seq[string]
events*: EventEmitter
nodes*: Table[string, MailserverStatus]
activeMailserver*: string
topics*: HashSet[string]
lastConnectionAttempt*: float
fleet*: FleetModel
proc cmpMailserverReply(x, y: (string, int)): int =
if x[1] > y[1]: 1
elif x[1] == y[1]: 0
else: -1
proc poolSize(fleetSize: int): int = ceil(fleetSize / 4).int
var mailserverModel: MailServerModel
var modelLock: Lock
var connThread: Thread[void]
proc newMailserverModel*(fleet: FleetModel, events: EventEmitter): MailserverModel =
result = MailserverModel()
result.events = events
result.fleet = fleet
result.nodes = initTable[string, MailserverStatus]()
result.activeMailserver = ""
mailserverModel = result
modelLock.initLock()
nodesLock.initLock()
activeMailserverLock.initLock()
proc getActiveMailserver*(self:MailserverModel): string =
withLock activeMailserverLock:
result = self.activeMailserver
proc isActiveMailserverAvailable*(self:MailserverModel): bool =
activeMailserverLock.acquire()
nodesLock.acquire()
if not self.nodes.hasKey(self.activeMailserver):
result = false
else:
result = self.nodes[self.activeMailserver] == MailserverStatus.Connected
nodesLock.release()
activeMailserverLock.release()
proc connect(self: MailserverModel, enode: string) =
debug "Connecting to mailserver", enode=enode.substr[enode.len-40..enode.len-1]
var connected = false
# TODO: this should come from settings
var knownMailservers = initHashSet[string]()
for m in self.mailservers:
knownMailservers.incl m
if not knownMailservers.contains(enode):
warn "Mailserver not known", enode
return
activeMailserverLock.acquire()
nodesLock.acquire()
self.activeMailserver = enode
self.events.emit("mailserver:changed", Args())
# Adding a peer and marking it as connected can't be executed sync, because
# There's a delay between requesting a peer being added, and a signal being
# received after the peer was added. So we first set the peer status as
# Connecting and once a peerConnected signal is received, we mark it as
# Connected
if self.nodes.hasKey(enode) and self.nodes[enode] == MailserverStatus.Connected:
status_mailservers.update(enode)
connected = true
else:
# Attempt to connect to mailserver by adding it as a peer
self.nodes[enode] = MailserverStatus.Connecting
addPeer(enode)
self.lastConnectionAttempt = cpuTime()
nodesLock.release()
activeMailserverLock.release()
if connected:
self.events.emit("mailserverAvailable", Args())
proc peerSummaryChange*(self: MailserverModel, peers: seq[string]) =
# When a node is added as a peer, or disconnected
# a DiscoverySummary signal is emitted. In here we
# change the status of the nodes the app is connected to
# Connected / Disconnected and emit peerConnected / peerDisconnected
# events.
var mailserverAvailable = false
withLock nodesLock:
for knownPeer in self.nodes.keys:
if not peers.contains(knownPeer) and self.nodes[knownPeer] != MailserverStatus.Disconnected:
debug "Peer disconnected", peer=knownPeer
self.nodes[knownPeer] = MailserverStatus.Disconnected
self.events.emit("peerDisconnected", MailserverArg(peer: knownPeer))
withLock activeMailserverLock:
if self.activeMailserver == knownPeer:
warn "Active mailserver disconnected!", peer = knownPeer
self.activeMailserver = ""
for peer in peers:
if self.nodes.hasKey(peer) and (self.nodes[peer] == MailserverStatus.Connected): continue
debug "Peer connected", peer
self.nodes[peer] = MailserverStatus.Connected
self.events.emit("peerConnected", MailserverArg(peer: peer))
withLock activeMailserverLock:
if peer == self.activeMailserver:
if self.nodes.hasKey(self.activeMailserver):
if self.activeMailserver == peer:
mailserverAvailable = true
status_mailservers.update(peer)
if mailserverAvailable:
debug "Mailserver available"
self.events.emit("mailserverAvailable", Args())
proc requestMessages*(self: MailserverModel, topics: seq[string], fromValue: int64 = 0, toValue: int64 = 0, force: bool = false) =
withLock activeMailserverLock:
debug "Requesting messages from", mailserver=self.activeMailserver
let generatedSymKey = status_chat.generateSymKeyFromPassword()
status_mailservers.requestMessages(topics, generatedSymKey, self.activeMailserver, 1000, fromValue, toValue, force)
proc getMailserverTopics*(self: MailserverModel): seq[MailserverTopic] =
let response = status_mailservers.getMailserverTopics()
let topics = parseJson(response)["result"]
var newTopic: MailserverTopic
result = @[]
if topics.kind != JNull:
for topic in topics:
newTopic = MailserverTopic(
topic: topic["topic"].getStr,
discovery: topic["discovery?"].getBool,
negotiated: topic["negotiated?"].getBool,
lastRequest: topic["last-request"].getInt
)
if (topic["chat-ids"].kind != JNull):
newTopic.chatIds = topic["chat-ids"].to(seq[string])
result.add(newTopic)
proc getMailserverTopicsByChatId*(self: MailserverModel, chatId: string): seq[MailServerTopic] =
result = self.getMailserverTopics()
.filter(topic => topic.chatIds.contains(chatId))
proc addMailserverTopic*(self: MailserverModel, topic: MailserverTopic) =
discard status_mailservers.addMailserverTopic(topic)
proc deleteMailserverTopic*(self: MailserverModel, chatId: string) =
var topics = self.getMailserverTopicsByChatId(chatId)
if topics.len == 0:
return
var topic:MailserverTopic = topics[0]
if(topic.chatIds.len > 1):
discard status_mailservers.addMailserverTopic(topic)
else:
discard status_mailservers.deleteMailserverTopic(topic.topic)
proc findNewMailserver(self: MailserverModel) =
warn "Finding a new mailserver..."
let mailserversReply = parseJson(status_mailservers.ping(self.mailservers, 500))["result"]
var availableMailservers:seq[(string, int)] = @[]
for reply in mailserversReply:
if(reply["error"].kind != JNull): continue # The results with error are ignored
availableMailservers.add((reply["address"].getStr, reply["rttMs"].getInt))
availableMailservers.sort(cmpMailserverReply)
# No mailservers where returned... do nothing.
if availableMailservers.len == 0: return
# Picks a random mailserver amongs the ones with the lowest latency
# The pool size is 1/4 of the mailservers were pinged successfully
randomize()
let mailServer = availableMailservers[rand(poolSize(availableMailservers.len - 1))][0]
self.connect(mailserver)
proc cycleMailservers(self: MailserverModel) =
warn "Automatically switching mailserver"
withLock activeMailserverLock:
if self.activeMailserver != "":
warn "Disconnecting active mailserver", peer=self.activeMailserver
withLock nodesLock:
self.nodes[self.activeMailserver] = MailserverStatus.Disconnected
removePeer(self.activeMailserver)
self.activeMailserver = ""
self.findNewMailserver()
proc checkConnection() {.thread.} =
{.gcsafe.}:
let sleepDuration = 10000
while true:
trace "Verifying mailserver connection state..."
withLock modelLock:
let pinnedMailserver = status_settings.getPinnedMailserver()
if pinnedMailserver != "" and mailserverModel.getActiveMailserver() != pinnedMailserver:
# connect to current mailserver from the settings
mailserverModel.mailservers.add(pinnedMailserver)
mailserverModel.connect(pinnedMailserver)
else:
# or setup a random mailserver:
if not mailserverModel.isActiveMailserverAvailable:
# TODO: have a timeout for reconnection before changing to a different server
mailserverModel.cycleMailservers()
sleep(sleepDuration)
proc init*(self: MailserverModel) =
debug "MailserverModel::init()"
self.mailservers = toSeq(self.fleet.config.getMailservers(status_settings.getFleet()).values)
for mailserver in status_settings.getMailservers().getElems():
self.mailservers.add(mailserver["address"].getStr())
connThread.createThread(checkConnection)

View File

@ -2,18 +2,17 @@ import libstatus/accounts as libstatus_accounts
import libstatus/core as libstatus_core import libstatus/core as libstatus_core
import libstatus/settings as libstatus_settings import libstatus/settings as libstatus_settings
import libstatus/types as libstatus_types import libstatus/types as libstatus_types
import chat, accounts, wallet, node, network, mailservers, messages, contacts, profile, stickers, permissions, fleet import chat, accounts, wallet, node, network, messages, contacts, profile, stickers, permissions, fleet
import ../eventemitter import ../eventemitter
import ./tasks/task_runner_impl import ./tasks/task_runner_impl
export chat, accounts, node, mailservers, messages, contacts, profile, network, permissions, fleet, task_runner_impl export chat, accounts, node, messages, contacts, profile, network, permissions, fleet, task_runner_impl, eventemitter
type Status* = ref object type Status* = ref object
events*: EventEmitter events*: EventEmitter
fleet*: FleetModel fleet*: FleetModel
chat*: ChatModel chat*: ChatModel
messages*: MessagesModel messages*: MessagesModel
mailservers*: MailserverModel
accounts*: AccountModel accounts*: AccountModel
wallet*: WalletModel wallet*: WalletModel
node*: NodeModel node*: NodeModel
@ -28,13 +27,12 @@ proc newStatusInstance*(fleetConfig: string): Status =
result = Status() result = Status()
result.tasks = newTaskRunner() result.tasks = newTaskRunner()
result.events = createEventEmitter() result.events = createEventEmitter()
result.fleet = fleet.newFleetModel(result.events, fleetConfig) result.fleet = fleet.newFleetModel(fleetConfig)
result.chat = chat.newChatModel(result.events) result.chat = chat.newChatModel(result.events)
result.accounts = accounts.newAccountModel(result.events) result.accounts = accounts.newAccountModel(result.events)
result.wallet = wallet.newWalletModel(result.events) result.wallet = wallet.newWalletModel(result.events)
result.wallet.initEvents() result.wallet.initEvents()
result.node = node.newNodeModel() result.node = node.newNodeModel()
result.mailservers = mailservers.newMailserverModel(result.fleet, result.events)
result.messages = messages.newMessagesModel(result.events) result.messages = messages.newMessagesModel(result.events)
result.profile = profile.newProfileModel() result.profile = profile.newProfileModel()
result.contacts = contacts.newContactModel(result.events) result.contacts = contacts.newContactModel(result.events)

View File

@ -1,5 +1,7 @@
import # vendor libs import # vendor libs
json_serialization, NimQml, task_runner json_serialization
export json_serialization
type type
Task* = proc(arg: string): void {.gcsafe, nimcall.} Task* = proc(arg: string): void {.gcsafe, nimcall.}

View File

@ -0,0 +1,43 @@
import # std libs
strformat, tables
import # vendor libs
chronicles
import # status-desktop libs
./marathon/worker, ./marathon/common as marathon_common
export marathon_common
logScope:
topics = "marathon"
type
Marathon* = ref object
workers: Table[string, MarathonWorker]
proc start*[T: MarathonTaskArg](self: MarathonWorker, arg: T) =
self.chanSendToWorker.sendSync(arg.encode.safe)
proc newMarathon*(): Marathon =
new(result)
result.workers = initTable[string, MarathonWorker]()
proc registerWorker*(self: Marathon, worker: MarathonWorker) =
self.workers[worker.name] = worker # overwrite if exists
proc `[]`*(self: Marathon, name: string): MarathonWorker =
if not self.workers.contains(name):
raise newException(ValueError, &"""Worker '{name}' is not registered. Use 'registerWorker("{name}", {name}Worker)' to register the worker first.""")
self.workers[name]
proc init*(self: Marathon) =
for worker in self.workers.values:
worker.init()
proc teardown*(self: Marathon) =
for worker in self.workers.values:
worker.teardown()
proc onLoggedIn*(self: Marathon) =
for worker in self.workers.values:
worker.onLoggedIn()

View File

@ -0,0 +1,6 @@
import # status-desktop libs
../qt
type
MarathonTaskArg* = ref object of QObjectTaskArg
`method`*: string

View File

@ -0,0 +1,39 @@
import # std libs
strutils
import # vendor libs
chronicles, NimQml, json_serialization
import # status-desktop libs
../../../status, ../../common as task_runner_common, ./events
logScope:
topics = "mailserver controller"
################################################################################
## ##
## NOTE: MailserverController runs on the main thread ##
## ##
################################################################################
QtObject:
type MailserverController* = ref object of QObject
variant*: QVariant
status*: Status
proc newController*(status: Status): MailserverController =
new(result)
result.status = status
result.setup()
result.variant = newQVariant(result)
proc setup(self: MailserverController) =
self.QObject.setup
proc delete*(self: MailserverController) =
self.variant.delete
self.QObject.delete
proc receiveEvent(self: MailserverController, eventTuple: string) {.slot.} =
let event = Json.decode(eventTuple, tuple[name: string, arg: MailserverArgs])
trace "forwarding event from long-running mailserver task to the main thread", event=eventTuple
self.status.events.emit(event.name, event.arg)

View File

@ -0,0 +1,21 @@
import # vendor libs
NimQml, json_serialization
import # status-desktop libs
../../../../eventemitter
type
MailserverEvents* = ref object
vptr: ByteAddress
MailserverArgs* = ref object of Args
peer*: string
const EVENTS_SLOT = "receiveEvent"
proc newMailserverEvents*(vptr: ByteAddress): MailserverEvents =
new(result)
result.vptr = vptr
proc emit*(self: MailserverEvents, event: string, arg: MailserverArgs) =
let payload: tuple[event: string, arg: MailserverArgs] = (event, arg)
signal_handler(cast[pointer](self.vptr), Json.encode(payload), EVENTS_SLOT)

View File

@ -0,0 +1,240 @@
import
algorithm, chronos, chronicles, json, math, os, random, sequtils, sets, sugar,
tables
from times import cpuTime
import
../../../libstatus/settings as status_settings,
../../../libstatus/mailservers as status_mailservers,
../../../libstatus/core as status_core, ../../../libstatus/chat as status_chat,
../../../libstatus/types, ../../../fleet,
./events as mailserver_events
logScope:
topics = "mailserver model"
################################################################################
## ##
## NOTE: MailserverModel runs on a separate (long-running) thread ##
## ##
## How do mailservers work ? ##
## ##
## - We send a request to the mailserver, we are only interested in the ##
## messages since `last-request` up to the last seven days ##
## and the last 24 hours for topics that were just joined ##
## - The mailserver doesn't directly respond to the request and ##
## instead we start receiving messages in the filters for the requested ##
## topics. ##
## - If the mailserver was not ready when we tried for instance to request ##
## the history of a topic after joining a chat, the request will be done ##
## as soon as the mailserver becomes available ##
## ##
################################################################################
type
MailserverModel* = ref object
mailservers*: seq[string]
events*: MailserverEvents
nodes*: Table[string, MailserverStatus]
activeMailserver*: string
topics*: HashSet[string]
lastConnectionAttempt*: float
fleet*: FleetModel
MailserverStatus* = enum
Unknown = -1,
Disconnected = 0,
Connecting = 1
Connected = 2,
proc cmpMailserverReply(x, y: (string, int)): int =
if x[1] > y[1]: 1
elif x[1] == y[1]: 0
else: -1
proc poolSize(fleetSize: int): int = ceil(fleetSize / 4).int
proc newMailserverModel*(vptr: ByteAddress): MailserverModel =
result = MailserverModel()
result.events = newMailserverEvents(vptr)
result.nodes = initTable[string, MailserverStatus]()
result.activeMailserver = ""
proc init*(self: MailserverModel) =
trace "MailserverModel::init()"
let fleets =
if defined(windows) and getEnv("NIM_STATUS_CLIENT_DEV").string == "":
"/../resources/fleets.json"
else:
"/../fleets.json"
let fleetConfig = readFile(joinPath(getAppDir(), fleets))
self.fleet = newFleetModel(fleetConfig)
self.mailservers = toSeq(self.fleet.config.getMailservers(status_settings.getFleet()).values)
for mailserver in status_settings.getMailservers().getElems():
self.mailservers.add(mailserver["address"].getStr())
proc getActiveMailserver*(self: MailserverModel): string = self.activeMailserver
proc isActiveMailserverAvailable*(self: MailserverModel): bool =
if not self.nodes.hasKey(self.activeMailserver):
result = false
else:
result = self.nodes[self.activeMailserver] == MailserverStatus.Connected
proc connect(self: MailserverModel, enode: string) =
debug "Connecting to mailserver", enode=enode.substr[enode.len-40..enode.len-1]
var connected = false
# TODO: this should come from settings
var knownMailservers = initHashSet[string]()
for m in self.mailservers:
knownMailservers.incl m
if not knownMailservers.contains(enode):
warn "Mailserver not known", enode
return
self.activeMailserver = enode
self.events.emit("mailserver:changed", MailserverArgs(peer: enode))
# Adding a peer and marking it as connected can't be executed sync, because
# There's a delay between requesting a peer being added, and a signal being
# received after the peer was added. So we first set the peer status as
# Connecting and once a peerConnected signal is received, we mark it as
# Connected
if self.nodes.hasKey(enode) and self.nodes[enode] == MailserverStatus.Connected:
status_mailservers.update(enode)
connected = true
else:
# Attempt to connect to mailserver by adding it as a peer
self.nodes[enode] = MailserverStatus.Connecting
addPeer(enode)
self.lastConnectionAttempt = cpuTime()
if connected:
self.events.emit("mailserverAvailable", MailserverArgs())
proc peerSummaryChange*(self: MailserverModel, peers: seq[string]) =
# When a node is added as a peer, or disconnected
# a DiscoverySummary signal is emitted. In here we
# change the status of the nodes the app is connected to
# Connected / Disconnected and emit peerConnected / peerDisconnected
# events.
var mailserverAvailable = false
for knownPeer in self.nodes.keys:
if not peers.contains(knownPeer) and self.nodes[knownPeer] != MailserverStatus.Disconnected:
debug "Peer disconnected", peer=knownPeer
self.nodes[knownPeer] = MailserverStatus.Disconnected
self.events.emit("peerDisconnected", MailserverArgs(peer: knownPeer))
if self.activeMailserver == knownPeer:
warn "Active mailserver disconnected!", peer = knownPeer
self.activeMailserver = ""
for peer in peers:
if self.nodes.hasKey(peer) and (self.nodes[peer] == MailserverStatus.Connected): continue
debug "Peer connected", peer
self.nodes[peer] = MailserverStatus.Connected
self.events.emit("peerConnected", MailserverArgs(peer: peer))
if peer == self.activeMailserver:
if self.nodes.hasKey(self.activeMailserver):
if self.activeMailserver == peer:
mailserverAvailable = true
status_mailservers.update(peer)
if mailserverAvailable:
debug "Mailserver available"
self.events.emit("mailserverAvailable", MailserverArgs())
proc requestMessages*(self: MailserverModel, topics: seq[string], fromValue: int64 = 0, toValue: int64 = 0, force: bool = false) =
debug "Requesting messages from", mailserver=self.activeMailserver
let generatedSymKey = status_chat.generateSymKeyFromPassword()
status_mailservers.requestMessages(topics, generatedSymKey, self.activeMailserver, 1000, fromValue, toValue, force)
proc getMailserverTopics*(self: MailserverModel): seq[MailserverTopic] =
let response = status_mailservers.getMailserverTopics()
let topics = parseJson(response)["result"]
var newTopic: MailserverTopic
result = @[]
if topics.kind != JNull:
for topic in topics:
newTopic = MailserverTopic(
topic: topic["topic"].getStr,
discovery: topic["discovery?"].getBool,
negotiated: topic["negotiated?"].getBool,
lastRequest: topic["last-request"].getInt
)
if (topic["chat-ids"].kind != JNull):
newTopic.chatIds = topic["chat-ids"].to(seq[string])
result.add(newTopic)
proc getMailserverTopicsByChatIds*(self: MailserverModel, chatIds: seq[string]): seq[MailServerTopic] =
var topics: seq[MailserverTopic] = @[]
for chatId in chatIds:
let filtered = self.getMailserverTopics().filter(topic => topic.chatIds.contains(chatId))
topics = topics.concat(filtered)
result = topics
proc getMailserverTopicsByChatId*(self: MailserverModel, chatId: string): seq[MailServerTopic] =
result = self.getMailserverTopics()
.filter(topic => topic.chatIds.contains(chatId))
proc addMailserverTopic*(self: MailserverModel, topic: MailserverTopic) =
discard status_mailservers.addMailserverTopic(topic)
proc deleteMailserverTopic*(self: MailserverModel, chatId: string) =
var topics = self.getMailserverTopicsByChatId(chatId)
if topics.len == 0:
return
var topic:MailserverTopic = topics[0]
if(topic.chatIds.len > 1):
discard status_mailservers.addMailserverTopic(topic)
else:
discard status_mailservers.deleteMailserverTopic(topic.topic)
proc findNewMailserver(self: MailserverModel) =
warn "Finding a new mailserver..."
let mailserversReply = parseJson(status_mailservers.ping(self.mailservers, 500))["result"]
var availableMailservers:seq[(string, int)] = @[]
for reply in mailserversReply:
if(reply["error"].kind != JNull): continue # The results with error are ignored
availableMailservers.add((reply["address"].getStr, reply["rttMs"].getInt))
availableMailservers.sort(cmpMailserverReply)
# No mailservers where returned... do nothing.
if availableMailservers.len == 0: return
# Picks a random mailserver amongs the ones with the lowest latency
# The pool size is 1/4 of the mailservers were pinged successfully
randomize()
let mailServer = availableMailservers[rand(poolSize(availableMailservers.len - 1))][0]
self.connect(mailserver)
proc cycleMailservers(self: MailserverModel) =
warn "Automatically switching mailserver"
if self.activeMailserver != "":
warn "Disconnecting active mailserver", peer=self.activeMailserver
self.nodes[self.activeMailserver] = MailserverStatus.Disconnected
removePeer(self.activeMailserver)
self.activeMailserver = ""
self.findNewMailserver()
proc checkConnection*(self: MailserverModel) {.async.} =
while true:
debug "Verifying mailserver connection state..."
let pinnedMailserver = status_settings.getPinnedMailserver()
if pinnedMailserver != "" and self.activeMailserver != pinnedMailserver:
# connect to current mailserver from the settings
self.mailservers.add(pinnedMailserver)
self.connect(pinnedMailserver)
else:
# or setup a random mailserver:
if not self.isActiveMailserverAvailable:
# TODO: have a timeout for reconnection before changing to a different server
self.cycleMailservers()
await sleepAsync(10.seconds)

View File

@ -0,0 +1,177 @@
import # std libs
json, tables
import # vendor libs
chronicles, chronos, json_serialization, task_runner
import # status-desktop libs
../worker, ./model, ../../qt, ../../common as task_runner_common,
../common as methuselash_common,
../../../libstatus/mailservers # TODO: needed for MailserverTopic type, remove?
export
chronos, task_runner_common, json_serialization
logScope:
topics = "mailserver worker"
type
MailserverWorker* = ref object of MarathonWorker
# below are all custom marathon task arg definitions
GetMailserverTopicsTaskArg* = ref object of MarathonTaskArg
IsActiveMailserverAvailableTaskArg* = ref object of MarathonTaskArg
topics*: seq[MailserverTopic]
GetActiveMailserverTaskArg* = ref object of MarathonTaskArg
RequestMessagesTaskArg* = ref object of MarathonTaskArg
topics*: seq[string]
fromValue*: int64
toValue*: int64
force*: bool
AddMailserverTopicTaskArg* = ref object of MarathonTaskArg
topic*: MailserverTopic
PeerSummaryChangeTaskArg* = ref object of MarathonTaskArg
peers*: seq[string]
GetMailserverTopicsByChatIdTaskArg* = ref object of MarathonTaskArg
chatId*: string
fetchRange*: int
GetMailserverTopicsByChatIdsTaskArg* = ref object of MarathonTaskArg
chatIds*: seq[string]
fetchRange*: int
DeleteMailserverTopicTaskArg* = ref object of MarathonTaskArg
chatId*: string
const
WORKER_NAME = "mailserver"
# forward declarations
proc workerThread(arg: WorkerThreadArg) {.thread.}
proc newMailserverWorker*(vptr: ByteAddress): MailserverWorker =
new(result)
result.chanRecvFromWorker = newAsyncChannel[ThreadSafeString](-1)
result.chanSendToWorker = newAsyncChannel[ThreadSafeString](-1)
result.vptr = vptr
method name*(self: MailserverWorker): string = WORKER_NAME
method init*(self: MailserverWorker) =
self.chanRecvFromWorker.open()
self.chanSendToWorker.open()
let arg = WorkerThreadArg(
chanSendToMain: self.chanRecvFromWorker,
chanRecvFromMain: self.chanSendToWorker,
vptr: self.vptr
)
createThread(self.thread, workerThread, arg)
# block until we receive "ready"
discard $(self.chanRecvFromWorker.recvSync())
method teardown*(self: MailserverWorker) =
self.chanSendToWorker.sendSync("shutdown".safe)
self.chanRecvFromWorker.close()
self.chanSendToWorker.close()
trace "waiting for the control thread to stop"
joinThread(self.thread)
method onLoggedIn*(self: MailserverWorker) =
self.chanSendToWorker.sendSync("loggedIn".safe)
proc processMessage(mailserverModel: MailserverModel, received: string) =
let
parsed = parseJson(received)
messageType = parsed{"$type"}.getStr
methodName = parsed{"method"}.getStr()
trace "initiating mailserver task", methodName=methodName, messageType=messageType
case methodName
of "getMailserverTopics":
let
taskArg = decode[GetMailserverTopicsTaskArg](received)
output = mailserverModel.getMailserverTopics()
taskArg.finish(output)
of "isActiveMailserverAvailable":
let
taskArg = decode[IsActiveMailserverAvailableTaskArg](received)
output = mailserverModel.isActiveMailserverAvailable()
payload: tuple[isActiveMailserverAvailable: bool, topics: seq[MailserverTopic]] = (output, taskArg.topics)
taskArg.finish(payload)
of "requestMessages":
let taskArg = decode[RequestMessagesTaskArg](received)
mailserverModel.requestMessages(taskArg.topics, taskArg.fromValue, taskArg.toValue, taskArg.force)
of "getActiveMailserver":
let
taskArg = decode[GetActiveMailserverTaskArg](received)
output = mailserverModel.getActiveMailserver()
taskArg.finish(output)
of "getMailserverTopicsByChatId":
let
taskArg = decode[GetMailserverTopicsByChatIdTaskArg](received)
output = mailserverModel.getMailserverTopicsByChatId(taskArg.chatId)
payload: tuple[topics: seq[MailserverTopic], fetchRange: int] = (output, taskArg.fetchRange)
taskArg.finish(payload)
of "getMailserverTopicsByChatIds":
let
taskArg = decode[GetMailserverTopicsByChatIdsTaskArg](received)
output = mailserverModel.getMailserverTopicsByChatIds(taskArg.chatIds)
payload: tuple[topics: seq[MailserverTopic], fetchRange: int] = (output, taskArg.fetchRange)
taskArg.finish(payload)
of "addMailserverTopic":
let taskArg = decode[AddMailserverTopicTaskArg](received)
mailserverModel.addMailserverTopic(taskArg.topic)
of "peerSummaryChange":
let taskArg = decode[PeerSummaryChangeTaskArg](received)
mailserverModel.peerSummaryChange(taskArg.peers)
of "deleteMailserverTopic":
let taskArg = decode[DeleteMailserverTopicTaskArg](received)
mailserverModel.deleteMailserverTopic(taskArg.chatId)
else:
error "unknown message", message=received
proc worker(arg: WorkerThreadArg) {.async, gcsafe, nimcall.} =
let
chanSendToMain = arg.chanSendToMain
chanRecvFromMain = arg.chanRecvFromMain
chanSendToMain.open()
chanRecvFromMain.open()
trace "sending 'ready' to main thread"
await chanSendToMain.send("ready".safe)
let mailserverModel = newMailserverModel(arg.vptr)
var unprocessedMsgs: seq[string] = @[]
while true:
let received = $(await chanRecvFromMain.recv())
if received == "loggedIn":
mailserverModel.init()
break
else:
unprocessedMsgs.add received
discard mailserverModel.checkConnection()
for msg in unprocessedMsgs.items:
mailserverModel.processMessage(msg)
while true:
trace "waiting for message"
let received = $(await chanRecvFromMain.recv())
case received
of "shutdown":
trace "received 'shutdown'"
trace "stopping worker"
break
else:
mailserverModel.processMessage(received)
proc workerThread(arg: WorkerThreadArg) =
waitFor worker(arg)

View File

@ -0,0 +1,49 @@
import # std libs
json
import # vendor libs
chronicles, chronos, json_serialization, task_runner
import # status-desktop libs
../common
export
chronos, common, json_serialization
logScope:
topics = "task-marathon-worker"
type
WorkerThreadArg* = object # of RootObj
chanSendToMain*: AsyncChannel[ThreadSafeString]
chanRecvFromMain*: AsyncChannel[ThreadSafeString]
vptr*: ByteAddress
MarathonWorker* = ref object of RootObj
chanSendToWorker*: AsyncChannel[ThreadSafeString]
chanRecvFromWorker*: AsyncChannel[ThreadSafeString]
thread*: Thread[WorkerThreadArg]
vptr*: ByteAddress
method name*(self: MarathonWorker): string {.base.} =
# override this base method
raise newException(CatchableError, "Method without implementation override")
method init*(self: MarathonWorker) {.base.} =
# override this base method
raise newException(CatchableError, "Method without implementation override")
method teardown*(self: MarathonWorker) {.base.} =
# override this base method
raise newException(CatchableError, "Method without implementation override")
method onLoggedIn*(self: MarathonWorker) {.base.} =
# override this base method
raise newException(CatchableError, "Method without implementation override")
method worker(arg: WorkerThreadArg) {.async, base, gcsafe, nimcall.} =
# override this base method
raise newException(CatchableError, "Method without implementation override")
method workerThread(arg: WorkerThreadArg) {.thread, base, gcsafe, nimcall.} =
# override this base method
raise newException(CatchableError, "Method without implementation override")

View File

@ -2,9 +2,9 @@ import # vendor libs
chronicles, task_runner chronicles, task_runner
import # status-desktop libs import # status-desktop libs
./threadpool ./marathon, ./threadpool
export task_runner, threadpool export marathon, task_runner, threadpool
logScope: logScope:
topics = "task-runner" topics = "task-runner"
@ -12,13 +12,17 @@ logScope:
type type
TaskRunner* = ref object TaskRunner* = ref object
threadpool*: ThreadPool threadpool*: ThreadPool
marathon*: Marathon
proc newTaskRunner*(): TaskRunner = proc newTaskRunner*(): TaskRunner =
new(result) new(result)
result.threadpool = newThreadPool() result.threadpool = newThreadPool()
result.marathon = newMarathon()
proc init*(self: TaskRunner) = proc init*(self: TaskRunner) =
self.threadpool.init() self.threadpool.init()
self.marathon.init()
proc teardown*(self: TaskRunner) = proc teardown*(self: TaskRunner) =
self.threadpool.teardown() self.threadpool.teardown()
self.marathon.teardown()

View File

@ -2,7 +2,7 @@ import # std libs
json, sequtils, tables json, sequtils, tables
import # vendor libs import # vendor libs
chronicles, chronos, json_serialization, NimQml, task_runner chronicles, chronos, json_serialization, task_runner
import # status-desktop libs import # status-desktop libs
./common ./common

View File

@ -9,7 +9,13 @@ Item {
id: syncContainer id: syncContainer
Layout.fillHeight: true Layout.fillHeight: true
Layout.fillWidth: true Layout.fillWidth: true
clip: true
Connections {
target: profileModel.mailservers
onActiveMailserverChanged: (activeMailserver) => {
syncContainer.activeMailserver = activeMailserver
}
}
Item { Item {
width: profileContainer.profileContentWidth width: profileContainer.profileContentWidth
@ -20,8 +26,9 @@ Item {
id: mailserversList id: mailserversList
StatusRadioButton { StatusRadioButton {
id: rbSetMailsever
text: name text: name
checked: name === profileModel.mailservers.activeMailserver checked: name === activeMailserver.activeMailserver
onClicked: { onClicked: {
if (checked) { if (checked) {
profileModel.mailservers.setMailserver(name); profileModel.mailservers.setMailserver(name);
@ -175,6 +182,10 @@ Item {
model: profileModel.mailservers.list model: profileModel.mailservers.list
delegate: mailserversList delegate: mailserversList
visible: !automaticSelectionSwitch.checked visible: !automaticSelectionSwitch.checked
Component.onCompleted: {
profileModel.mailservers.getActiveMailserver()
}
} }
} }
} }