From e7571bd2a42e22e2fc1fb7ad75cf54dedb25c7a0 Mon Sep 17 00:00:00 2001 From: Eric Mastro Date: Tue, 30 Mar 2021 12:12:10 +1100 Subject: [PATCH] feat: Add Methuselah and Mailserver long-running task Fixes #2143. Methuselah is the a task manager for long-running tasks. It allows fo registration of workers that will each run in their own thead. Each worker can pass messages to and recieve messages from the main thread. MailserverWoker was also introduced which moves all mailserver model logic to a MethuselahWorker. All communication to/from the model is done via a MethuselahTask. Results of the task are returned to the main thread by way of `signal_handler`, which calls the QtObject slot specified in the task. Mailsever also provides a way for the model to emit events inside of the worker. These events are forwarded to the main thread via the `receiveEvent` slot of the `MailserverController`. Co-authored-by: Michael Bradley, Jr --- src/app/chat/core.nim | 2 - src/app/chat/event_handling.nim | 46 ++- src/app/chat/signal_handling.nim | 11 +- src/app/chat/view.nim | 118 +++++++- src/app/profile/core.nim | 4 +- src/app/profile/views/mailservers.nim | 32 +- src/nim_status_client.nim | 17 +- src/status/fleet.nim | 5 +- src/status/libstatus/chat.nim | 2 - src/status/mailservers.nim | 273 ------------------ src/status/status.nim | 8 +- src/status/tasks/common.nim | 4 +- src/status/tasks/marathon.nim | 43 +++ src/status/tasks/marathon/common.nim | 6 + .../tasks/marathon/mailserver/controller.nim | 39 +++ .../tasks/marathon/mailserver/events.nim | 21 ++ .../tasks/marathon/mailserver/model.nim | 240 +++++++++++++++ .../tasks/marathon/mailserver/worker.nim | 177 ++++++++++++ src/status/tasks/marathon/worker.nim | 49 ++++ src/status/tasks/task_runner_impl.nim | 8 +- src/status/tasks/threadpool.nim | 2 +- .../Profile/Sections/SyncContainer.qml | 15 +- 22 files changed, 788 insertions(+), 334 deletions(-) delete mode 100644 src/status/mailservers.nim create mode 100644 src/status/tasks/marathon.nim create mode 100644 src/status/tasks/marathon/common.nim create mode 100644 src/status/tasks/marathon/mailserver/controller.nim create mode 100644 src/status/tasks/marathon/mailserver/events.nim create mode 100644 src/status/tasks/marathon/mailserver/model.nim create mode 100644 src/status/tasks/marathon/mailserver/worker.nim create mode 100644 src/status/tasks/marathon/worker.nim diff --git a/src/app/chat/core.nim b/src/app/chat/core.nim index ee2e904bf9..eb638b8ea5 100644 --- a/src/app/chat/core.nim +++ b/src/app/chat/core.nim @@ -1,6 +1,5 @@ import NimQml, chronicles, tables import ../../status/chat as chat_model -import ../../status/mailservers as mailserver_model import ../../status/messages as messages_model import ../../status/signals/types import ../../status/libstatus/types as status_types @@ -38,7 +37,6 @@ proc init*(self: ChatController) = let pubKey = status_settings.getSetting[string](Setting.PublicKey, "0x0") self.view.pubKey = pubKey - self.status.mailservers.init() self.status.chat.init(pubKey) self.status.stickers.init() self.view.reactions.init() diff --git a/src/app/chat/event_handling.nim b/src/app/chat/event_handling.nim index 784db00bba..4674558b1d 100644 --- a/src/app/chat/event_handling.nim +++ b/src/app/chat/event_handling.nim @@ -1,6 +1,14 @@ -import sugar, sequtils, times, strutils -import ../../status/chat/chat as status_chat -import ./views/communities +import # std libs + sugar, sequtils, times, strutils + +import + stew/faux_closures + +import # status-desktop libs + ../../status/chat/chat as status_chat, ./views/communities, + ../../status/tasks/marathon, + ../../status/tasks/marathon/mailserver/worker, + ../../status/libstatus/mailservers # TODO: needed for MailserverTopic type, remove? proc handleChatEvents(self: ChatController) = # Display already saved messages @@ -113,23 +121,33 @@ proc handleChatEvents(self: ChatController) = else: self.view.stickers.resetBuyAttempt(tx.data.parseInt) + proc handleMailserverEvents(self: ChatController) = + let mailserverWorker = self.status.tasks.marathon[MailserverWorker().name] + self.status.events.on("mailserverTopics") do(e: Args): var topics = TopicArgs(e).topics for topic in topics: topic.lastRequest = times.toUnix(times.getTime()) - self.status.mailservers.addMailserverTopic(topic) + let task = AddMailserverTopicTaskArg( + `method`: "addMailserverTopic", + topic: topic + ) + mailserverWorker.start(task) - if(self.status.mailservers.isActiveMailserverAvailable): - self.view.setLoadingMessages(true) - self.status.mailservers.requestMessages(topics.map(t => t.topic)) + let task = IsActiveMailserverAvailableTaskArg( + `method`: "isActiveMailserverAvailable", + vptr: cast[ByteAddress](self.view.vptr), + slot: "isActiveMailserverResult", + topics: topics + ) + mailserverWorker.start(task) self.status.events.on("mailserverAvailable") do(e:Args): - let mailserverTopics = self.status.mailservers.getMailserverTopics() - var fromValue = times.toUnix(times.getTime()) - 86400 # today - 24 hours - - if mailserverTopics.len > 0: - fromValue = min(mailserverTopics.map(topic => topic.lastRequest)) + let task = GetMailserverTopicsTaskArg( + `method`: "getMailserverTopics", + vptr: cast[ByteAddress](self.view.vptr), + slot: "getMailserverTopicsResult" + ) + mailserverWorker.start(task) - self.view.setLoadingMessages(true) - self.status.mailservers.requestMessages(mailserverTopics.map(t => t.topic), fromValue) diff --git a/src/app/chat/signal_handling.nim b/src/app/chat/signal_handling.nim index 80281fb48c..45613f1ad0 100644 --- a/src/app/chat/signal_handling.nim +++ b/src/app/chat/signal_handling.nim @@ -1,3 +1,6 @@ +import + ../../status/tasks/marathon/mailserver/worker + proc handleSignals(self: ChatController) = self.status.events.on(SignalType.Message.event) do(e:Args): var data = MessageSignal(e) @@ -6,7 +9,13 @@ proc handleSignals(self: ChatController) = self.status.events.on(SignalType.DiscoverySummary.event) do(e:Args): ## Handle mailserver peers being added and removed var data = DiscoverySummarySignal(e) - self.status.mailservers.peerSummaryChange(data.enodes) + let + mailserverWorker = self.status.tasks.marathon[MailserverWorker().name] + task = PeerSummaryChangeTaskArg( + `method`: "peerSummaryChange", + peers: data.enodes + ) + mailserverWorker.start(task) self.status.events.on(SignalType.EnvelopeSent.event) do(e:Args): var data = EnvelopeSentSignal(e) diff --git a/src/app/chat/view.nim b/src/app/chat/view.nim index fdbd044850..cbf4b0f67b 100644 --- a/src/app/chat/view.nim +++ b/src/app/chat/view.nim @@ -1,6 +1,5 @@ import NimQml, Tables, json, sequtils, chronicles, times, re, sugar, strutils, os, strformat, algorithm import ../../status/status -import ../../status/mailservers import ../../status/libstatus/chat as libstatus_chat import ../../status/libstatus/accounts/constants import ../../status/libstatus/mailservers as status_mailservers @@ -17,6 +16,7 @@ import web3/[conversions, ethtypes] import views/[channels_list, message_list, chat_item, suggestions_list, reactions, stickers, groups, transactions, communities, community_list, community_item] import ../utils/image_utils import ../../status/tasks/[qt, task_runner_impl] +import ../../status/tasks/marathon/mailserver/worker logScope: topics = "chats-view" @@ -199,12 +199,24 @@ QtObject: if self.status.chat.lastMessageTimestamps.hasKey(self.activeChannel.id): if force or self.status.chat.lastMessageTimestamps[self.activeChannel.id] <= self.oldestMessageTimestamp: self.oldestMessageTimestamp = self.status.chat.lastMessageTimestamps[self.activeChannel.id] + self.oldestMessageTimestampChanged() else: - let topics = self.status.mailservers.getMailserverTopicsByChatId(self.activeChannel.id) - if topics.len > 0: - self.oldestMessageTimestamp = topics[0].lastRequest - else: - self.oldestMessageTimestamp = times.toUnix(times.getTime()) + let + mailserverWorker = self.status.tasks.marathon[MailserverWorker().name] + task = GetMailserverTopicsByChatIdTaskArg( + `method`: "getMailserverTopicsByChatId", + vptr: cast[ByteAddress](self.vptr), + slot: "getMailserverTopicsByChatIdResult", + chatId: self.activeChannel.id + ) + mailserverWorker.start(task) + + proc getMailserverTopicsByChatIdResult(self: ChatsView, topicsEncoded: string) {.slot.} = + let topicsTuple = decode[tuple[topics: seq[MailserverTopic], fetchRange: int]](topicsEncoded) + if topicsTuple.topics.len > 0: + self.oldestMessageTimestamp = topicsTuple.topics[0].lastRequest + else: + self.oldestMessageTimestamp = times.toUnix(times.getTime()) self.oldestMessageTimestampChanged() proc getChatsList(self: ChatsView): QVariant {.slot.} = @@ -622,22 +634,54 @@ QtObject: write = setLoadingMessages notify = loadingMessagesChanged + proc getMailserverTopicsByChatIdResult2(self: ChatsView, topicsEncoded: string) {.slot.} = + let + topicsTuple = decode[tuple[topics: seq[MailserverTopic], fetchRange: int]](topicsEncoded) + currentOldestMessageTimestamp = self.oldestMessageTimestamp + self.oldestMessageTimestamp = self.oldestMessageTimestamp - topicsTuple.fetchRange + + let + mailserverWorker = self.status.tasks.marathon[MailserverWorker().name] + task = RequestMessagesTaskArg( + `method`: "requestMessages", + vptr: cast[ByteAddress](self.vptr), + slot: "requestMessagesResult", + topics: topicsTuple.topics.map(topic => topic.topic), + fromValue: self.oldestMessageTimestamp, + toValue: currentOldestMessageTimestamp, + force: true + ) + mailserverWorker.start(task) + proc requestMoreMessages*(self: ChatsView, fetchRange: int) {.slot.} = self.loadingMessages = true self.loadingMessagesChanged(true) + let mailserverWorker = self.status.tasks.marathon[MailserverWorker().name] - var allTopics: seq[string] = @[] if(self.activeChannel.isTimelineChat): + var chatIds: seq[string] = @[] for contact in self.status.contacts.getContacts(): - for t in self.status.mailservers.getMailserverTopicsByChatId(getTimelineChatId(contact.id)).map(topic => topic.topic): - allTopics.add(t) + chatIds.add(getTimelineChatId(contact.id)) + + let task = GetMailserverTopicsByChatIdsTaskArg( + `method`: "getMailserverTopicsByChatIds", + vptr: cast[ByteAddress](self.vptr), + slot: "getMailserverTopicsByChatIdResult2", + chatIds: chatIds, + fetchRange: fetchRange + ) + mailserverWorker.start(task) else: - allTopics = self.status.mailservers.getMailserverTopicsByChatId(self.activeChannel.id).map(topic => topic.topic) + let task = GetMailserverTopicsByChatIdTaskArg( + `method`: "getMailserverTopicsByChatId", + vptr: cast[ByteAddress](self.vptr), + slot: "getMailserverTopicsByChatIdResult2", + chatId: self.activeChannel.id, + fetchRange: fetchRange + ) + mailserverWorker.start(task) - let currentOldestMessageTimestamp = self.oldestMessageTimestamp - self.oldestMessageTimestamp = self.oldestMessageTimestamp - fetchRange - - self.status.mailservers.requestMessages(allTopics, self.oldestMessageTimestamp, currentOldestMessageTimestamp, true) + proc requestMessagesResult(self: ChatsView, resultEncoded: string) {.slot.} = self.oldestMessageTimestampChanged() self.messagesLoaded(); @@ -648,11 +692,23 @@ QtObject: if (self.activeChannel.id == selectedChannel.id): self.activeChannel.chatItem = nil self.status.chat.leave(selectedChannel.id) - self.status.mailservers.deleteMailserverTopic(selectedChannel.id) + let + mailserverWorker = self.status.tasks.marathon[MailserverWorker().name] + task = DeleteMailserverTopicTaskArg( + `method`: "deleteMailserverTopic", + chatId: selectedChannel.id + ) + mailserverWorker.start(task) proc leaveActiveChat*(self: ChatsView) {.slot.} = self.status.chat.leave(self.activeChannel.id) - self.status.mailservers.deleteMailserverTopic(self.activeChannel.id) + let + mailserverWorker = self.status.tasks.marathon[MailserverWorker().name] + task = DeleteMailserverTopicTaskArg( + `method`: "deleteMailserverTopic", + chatId: self.activeChannel.id + ) + mailserverWorker.start(task) proc removeChat*(self: ChatsView, chatId: string) = discard self.chats.removeChatItemFromList(chatId) @@ -843,3 +899,33 @@ QtObject: idx = idx + 1 if(id == msg.id): return idx return idx + + proc isActiveMailserverResult(self: ChatsView, resultEncoded: string) {.slot.} = + let arg = decode[tuple[isActiveMailserverAvailable: bool, topics: seq[MailserverTopic]]](resultEncoded) + + if arg.isActiveMailserverAvailable: + self.setLoadingMessages(true) + let + mailserverWorker = self.status.tasks.marathon[MailserverWorker().name] + task = RequestMessagesTaskArg( + `method`: "requestMessages", + topics: arg.topics.map(t => t.topic) + ) + mailserverWorker.start(task) + + proc getMailserverTopicsResult(self: ChatsView, resultEncoded: string) {.slot.} = + let mailserverTopics = decode[seq[MailserverTopic]](resultEncoded) + var fromValue = times.toUnix(times.getTime()) - 86400 # today - 24 hours + + if mailserverTopics.len > 0: + fromValue = min(mailserverTopics.map(topic => topic.lastRequest)) + + self.setLoadingMessages(true) + let + mailserverWorker = self.status.tasks.marathon[MailserverWorker().name] + task = RequestMessagesTaskArg( + `method`: "requestMessages", + topics: mailserverTopics.map(t => t.topic), + fromValue: fromValue + ) + mailserverWorker.start(task) diff --git a/src/app/profile/core.nim b/src/app/profile/core.nim index 3901719487..732fe30f44 100644 --- a/src/app/profile/core.nim +++ b/src/app/profile/core.nim @@ -17,6 +17,7 @@ import view import views/[ens_manager, devices, network, mailservers, contacts] import ../chat/views/channels_list import chronicles +import ../../status/tasks/marathon/mailserver/events type ProfileController* = ref object view*: ProfileView @@ -109,7 +110,8 @@ proc init*(self: ProfileController, account: Account) = self.view.contacts.setContactList(contacts) self.status.events.on("mailserver:changed") do(e: Args): - self.view.mailservers.activeMailserverChanged() + let mailserverArg = MailserverArgs(e) + self.view.mailservers.activeMailserverChanged(mailserverArg.peer) self.status.events.on(SignalType.Message.event) do(e: Args): let msgData = MessageSignal(e); diff --git a/src/app/profile/views/mailservers.nim b/src/app/profile/views/mailservers.nim index ea0db941ef..128cb104a9 100644 --- a/src/app/profile/views/mailservers.nim +++ b/src/app/profile/views/mailservers.nim @@ -1,9 +1,9 @@ import NimQml, chronicles import ../../../status/status -import ../../../status/mailservers import ../../../status/profile/mailserver import mailservers_list import ../../../status/libstatus/settings as status_settings +import ../../../status/tasks/marathon/mailserver/worker logScope: topics = "mailservers-view" @@ -35,14 +35,20 @@ QtObject: QtProperty[QVariant] list: read = getMailserversList + proc activeMailserverChanged*(self: MailserversView, activeMailserverName: string) {.signal.} + proc getActiveMailserver(self: MailserversView): string {.slot.} = - return self.mailserversList.getMailserverName(self.status.mailservers.getActiveMailserver()) + let + mailserverWorker = self.status.tasks.marathon[MailserverWorker().name] + task = GetActiveMailserverTaskArg( + `method`: "getActiveMailserver", + vptr: cast[ByteAddress](self.vptr), + slot: "getActiveMailserverResult" + ) + mailserverWorker.start(task) - proc activeMailserverChanged*(self: MailserversView) {.signal.} - - QtProperty[string] activeMailserver: - read = getActiveMailserver - notify = activeMailserverChanged + proc getActiveMailserverResult*(self: MailserversView, activeMailserver: string) {.slot.} = + self.activeMailserverChanged(activeMailserver) proc getAutomaticSelection(self: MailserversView): bool {.slot.} = status_settings.getPinnedMailserver() == "" @@ -58,7 +64,17 @@ QtObject: if value: status_settings.pinMailserver() else: - status_settings.pinMailserver(self.status.mailservers.getActiveMailserver()) + let + mailserverWorker = self.status.tasks.marathon[MailserverWorker().name] + task = GetActiveMailserverTaskArg( + `method`: "getActiveMailserver", + vptr: cast[ByteAddress](self.vptr), + slot: "getActiveMailserverResult2" + ) + mailserverWorker.start(task) + + proc getActiveMailserverResult2(self: MailserversView, activeMailserver: string) {.slot.} = + status_settings.pinMailserver(activeMailserver) proc save(self: MailserversView, name: string, address: string) {.slot.} = status_settings.saveMailserver(name, address) diff --git a/src/nim_status_client.nim b/src/nim_status_client.nim index 62aa2a73b5..5d7304590f 100644 --- a/src/nim_status_client.nim +++ b/src/nim_status_client.nim @@ -15,8 +15,11 @@ import status/libstatus/accounts/constants import status_go import status/status as statuslib import ./eventemitter +import ./status/tasks/marathon/mailserver/controller as mailserver_controller +import ./status/tasks/marathon/mailserver/worker as mailserver_worker var signalsQObjPointer: pointer +var mailserverQObjPointer: pointer logScope: topics = "main" @@ -28,7 +31,14 @@ proc mainProc() = else: "/../fleets.json" - let status = statuslib.newStatusInstance(readFile(joinPath(getAppDir(), fleets))) + let + fleetConfig = readFile(joinPath(getAppDir(), fleets)) + status = statuslib.newStatusInstance(fleetConfig) + mailserverController = mailserver_controller.newController(status) + mailserverWorker = mailserver_worker.newMailserverWorker(cast[ByteAddress](mailserverController.vptr)) + + # TODO: create and register an ipcWorker + status.tasks.marathon.registerWorker(mailserverWorker) status.initNode() enableHDPI() @@ -117,6 +127,9 @@ proc mainProc() = status.events.once("login") do(a: Args): var args = AccountArgs(a) + + status.tasks.marathon.onLoggedIn() + # Delete login and onboarding from memory to remove any mnemonic that would have been saved in the accounts list login.delete() onboarding.delete() @@ -132,7 +145,6 @@ proc mainProc() = wallet.checkPendingTransactions() wallet.start() - engine.setRootContextProperty("loginModel", login.variant) engine.setRootContextProperty("onboardingModel", onboarding.variant) @@ -183,6 +195,7 @@ proc mainProc() = initControllers() engine.setRootContextProperty("signals", signalController.variant) + engine.setRootContextProperty("mailserver", mailserverController.variant) engine.load(newQUrl("qrc:///main.qml")) diff --git a/src/status/fleet.nim b/src/status/fleet.nim index befc08711c..0d9f48e8e0 100644 --- a/src/status/fleet.nim +++ b/src/status/fleet.nim @@ -1,15 +1,12 @@ -import ../eventemitter import json import libstatus/types type FleetModel* = ref object - events*: EventEmitter config*: FleetConfig -proc newFleetModel*(events: EventEmitter, fleetConfigJson: string): FleetModel = +proc newFleetModel*(fleetConfigJson: string): FleetModel = result = FleetModel() - result.events = events result.config = fleetConfigJson.toFleetConfig() proc delete*(self: FleetModel) = diff --git a/src/status/libstatus/chat.nim b/src/status/libstatus/chat.nim index d2a4779d8f..2db3cb3bdf 100644 --- a/src/status/libstatus/chat.nim +++ b/src/status/libstatus/chat.nim @@ -169,7 +169,6 @@ proc sendImageMessage*(chatId: string, image: string): string = proc sendImageMessages*(chatId: string, images: var seq[string]): string = let preferredUsername = getSetting[string](Setting.PreferredUsername, "") - debugEcho ">>> [status/libstatus/chat.sendImageMessages] about to send images" let imagesJson = %* images.map(image => %* { "chatId": chatId, @@ -179,7 +178,6 @@ proc sendImageMessages*(chatId: string, images: var seq[string]): string = "text": "Update to latest version to see a nice image here!" } ) - debugEcho ">>> [status/libstatus/chat.sendImageMessages] imagesJson:", $imagesJson callPrivateRPC("sendChatMessages".prefix, %* [imagesJson]) proc sendStickerMessage*(chatId: string, sticker: Sticker): string = diff --git a/src/status/mailservers.nim b/src/status/mailservers.nim deleted file mode 100644 index 715ad1f772..0000000000 --- a/src/status/mailservers.nim +++ /dev/null @@ -1,273 +0,0 @@ -import algorithm, json, random, math, os, tables, sets, chronicles, sequtils, locks, sugar, times -import libstatus/core as status_core -import libstatus/chat as status_chat -import libstatus/settings as status_settings -import libstatus/types -import libstatus/mailservers as status_mailservers -import ../eventemitter -import fleet - - -# How do mailserver should work ? -# -# - We send a request to the mailserver, we are only interested in the -# messages since `last-request` up to the last seven days -# and the last 24 hours for topics that were just joined -# - The mailserver doesn't directly respond to the request and -# instead we start receiving messages in the filters for the requested -# topics. -# - If the mailserver was not ready when we tried for instance to request -# the history of a topic after joining a chat, the request will be done -# as soon as the mailserver becomes available - -logScope: - topics = "mailserver-model" - -var nodesLock: Lock -var activeMailserverLock: Lock - - -type - MailserverArg* = ref object of Args - peer*: string - - MailserverStatus* = enum - Unknown = -1, - Disconnected = 0, - Connecting = 1 - Connected = 2, - - MailserverModel* = ref object - mailservers*: seq[string] - events*: EventEmitter - nodes*: Table[string, MailserverStatus] - activeMailserver*: string - topics*: HashSet[string] - lastConnectionAttempt*: float - fleet*: FleetModel - - -proc cmpMailserverReply(x, y: (string, int)): int = - if x[1] > y[1]: 1 - elif x[1] == y[1]: 0 - else: -1 - - -proc poolSize(fleetSize: int): int = ceil(fleetSize / 4).int - - -var mailserverModel: MailServerModel -var modelLock: Lock -var connThread: Thread[void] - - -proc newMailserverModel*(fleet: FleetModel, events: EventEmitter): MailserverModel = - result = MailserverModel() - result.events = events - result.fleet = fleet - result.nodes = initTable[string, MailserverStatus]() - result.activeMailserver = "" - - mailserverModel = result - - modelLock.initLock() - nodesLock.initLock() - activeMailserverLock.initLock() - - -proc getActiveMailserver*(self:MailserverModel): string = - withLock activeMailserverLock: - result = self.activeMailserver - -proc isActiveMailserverAvailable*(self:MailserverModel): bool = - activeMailserverLock.acquire() - nodesLock.acquire() - - if not self.nodes.hasKey(self.activeMailserver): - result = false - else: - result = self.nodes[self.activeMailserver] == MailserverStatus.Connected - - nodesLock.release() - activeMailserverLock.release() - - -proc connect(self: MailserverModel, enode: string) = - debug "Connecting to mailserver", enode=enode.substr[enode.len-40..enode.len-1] - var connected = false - # TODO: this should come from settings - var knownMailservers = initHashSet[string]() - for m in self.mailservers: - knownMailservers.incl m - if not knownMailservers.contains(enode): - warn "Mailserver not known", enode - return - - activeMailserverLock.acquire() - nodesLock.acquire() - - self.activeMailserver = enode - self.events.emit("mailserver:changed", Args()) - - # Adding a peer and marking it as connected can't be executed sync, because - # There's a delay between requesting a peer being added, and a signal being - # received after the peer was added. So we first set the peer status as - # Connecting and once a peerConnected signal is received, we mark it as - # Connected - - if self.nodes.hasKey(enode) and self.nodes[enode] == MailserverStatus.Connected: - status_mailservers.update(enode) - connected = true - else: - # Attempt to connect to mailserver by adding it as a peer - self.nodes[enode] = MailserverStatus.Connecting - addPeer(enode) - self.lastConnectionAttempt = cpuTime() - - nodesLock.release() - activeMailserverLock.release() - if connected: - self.events.emit("mailserverAvailable", Args()) - - - -proc peerSummaryChange*(self: MailserverModel, peers: seq[string]) = - # When a node is added as a peer, or disconnected - # a DiscoverySummary signal is emitted. In here we - # change the status of the nodes the app is connected to - # Connected / Disconnected and emit peerConnected / peerDisconnected - # events. - var mailserverAvailable = false - withLock nodesLock: - for knownPeer in self.nodes.keys: - if not peers.contains(knownPeer) and self.nodes[knownPeer] != MailserverStatus.Disconnected: - debug "Peer disconnected", peer=knownPeer - self.nodes[knownPeer] = MailserverStatus.Disconnected - self.events.emit("peerDisconnected", MailserverArg(peer: knownPeer)) - withLock activeMailserverLock: - if self.activeMailserver == knownPeer: - warn "Active mailserver disconnected!", peer = knownPeer - self.activeMailserver = "" - - for peer in peers: - if self.nodes.hasKey(peer) and (self.nodes[peer] == MailserverStatus.Connected): continue - debug "Peer connected", peer - self.nodes[peer] = MailserverStatus.Connected - self.events.emit("peerConnected", MailserverArg(peer: peer)) - - withLock activeMailserverLock: - if peer == self.activeMailserver: - if self.nodes.hasKey(self.activeMailserver): - if self.activeMailserver == peer: - mailserverAvailable = true - - status_mailservers.update(peer) - - if mailserverAvailable: - debug "Mailserver available" - self.events.emit("mailserverAvailable", Args()) - - -proc requestMessages*(self: MailserverModel, topics: seq[string], fromValue: int64 = 0, toValue: int64 = 0, force: bool = false) = - withLock activeMailserverLock: - debug "Requesting messages from", mailserver=self.activeMailserver - let generatedSymKey = status_chat.generateSymKeyFromPassword() - status_mailservers.requestMessages(topics, generatedSymKey, self.activeMailserver, 1000, fromValue, toValue, force) - -proc getMailserverTopics*(self: MailserverModel): seq[MailserverTopic] = - let response = status_mailservers.getMailserverTopics() - let topics = parseJson(response)["result"] - var newTopic: MailserverTopic - result = @[] - if topics.kind != JNull: - for topic in topics: - newTopic = MailserverTopic( - topic: topic["topic"].getStr, - discovery: topic["discovery?"].getBool, - negotiated: topic["negotiated?"].getBool, - lastRequest: topic["last-request"].getInt - ) - if (topic["chat-ids"].kind != JNull): - newTopic.chatIds = topic["chat-ids"].to(seq[string]) - - result.add(newTopic) - - -proc getMailserverTopicsByChatId*(self: MailserverModel, chatId: string): seq[MailServerTopic] = - result = self.getMailserverTopics() - .filter(topic => topic.chatIds.contains(chatId)) - -proc addMailserverTopic*(self: MailserverModel, topic: MailserverTopic) = - discard status_mailservers.addMailserverTopic(topic) - -proc deleteMailserverTopic*(self: MailserverModel, chatId: string) = - var topics = self.getMailserverTopicsByChatId(chatId) - if topics.len == 0: - return - - var topic:MailserverTopic = topics[0] - if(topic.chatIds.len > 1): - discard status_mailservers.addMailserverTopic(topic) - else: - discard status_mailservers.deleteMailserverTopic(topic.topic) - -proc findNewMailserver(self: MailserverModel) = - warn "Finding a new mailserver..." - - let mailserversReply = parseJson(status_mailservers.ping(self.mailservers, 500))["result"] - - var availableMailservers:seq[(string, int)] = @[] - for reply in mailserversReply: - if(reply["error"].kind != JNull): continue # The results with error are ignored - availableMailservers.add((reply["address"].getStr, reply["rttMs"].getInt)) - availableMailservers.sort(cmpMailserverReply) - - # No mailservers where returned... do nothing. - if availableMailservers.len == 0: return - - # Picks a random mailserver amongs the ones with the lowest latency - # The pool size is 1/4 of the mailservers were pinged successfully - randomize() - - let mailServer = availableMailservers[rand(poolSize(availableMailservers.len - 1))][0] - - self.connect(mailserver) - - -proc cycleMailservers(self: MailserverModel) = - warn "Automatically switching mailserver" - withLock activeMailserverLock: - if self.activeMailserver != "": - warn "Disconnecting active mailserver", peer=self.activeMailserver - withLock nodesLock: - self.nodes[self.activeMailserver] = MailserverStatus.Disconnected - removePeer(self.activeMailserver) - self.activeMailserver = "" - self.findNewMailserver() - -proc checkConnection() {.thread.} = - {.gcsafe.}: - let sleepDuration = 10000 - while true: - trace "Verifying mailserver connection state..." - withLock modelLock: - let pinnedMailserver = status_settings.getPinnedMailserver() - if pinnedMailserver != "" and mailserverModel.getActiveMailserver() != pinnedMailserver: - # connect to current mailserver from the settings - mailserverModel.mailservers.add(pinnedMailserver) - mailserverModel.connect(pinnedMailserver) - else: - # or setup a random mailserver: - if not mailserverModel.isActiveMailserverAvailable: - # TODO: have a timeout for reconnection before changing to a different server - mailserverModel.cycleMailservers() - sleep(sleepDuration) - - -proc init*(self: MailserverModel) = - debug "MailserverModel::init()" - self.mailservers = toSeq(self.fleet.config.getMailservers(status_settings.getFleet()).values) - for mailserver in status_settings.getMailservers().getElems(): - self.mailservers.add(mailserver["address"].getStr()) - connThread.createThread(checkConnection) - \ No newline at end of file diff --git a/src/status/status.nim b/src/status/status.nim index 852f366a26..0ccc8fbde1 100644 --- a/src/status/status.nim +++ b/src/status/status.nim @@ -2,18 +2,17 @@ import libstatus/accounts as libstatus_accounts import libstatus/core as libstatus_core import libstatus/settings as libstatus_settings import libstatus/types as libstatus_types -import chat, accounts, wallet, node, network, mailservers, messages, contacts, profile, stickers, permissions, fleet +import chat, accounts, wallet, node, network, messages, contacts, profile, stickers, permissions, fleet import ../eventemitter import ./tasks/task_runner_impl -export chat, accounts, node, mailservers, messages, contacts, profile, network, permissions, fleet, task_runner_impl +export chat, accounts, node, messages, contacts, profile, network, permissions, fleet, task_runner_impl, eventemitter type Status* = ref object events*: EventEmitter fleet*: FleetModel chat*: ChatModel messages*: MessagesModel - mailservers*: MailserverModel accounts*: AccountModel wallet*: WalletModel node*: NodeModel @@ -28,13 +27,12 @@ proc newStatusInstance*(fleetConfig: string): Status = result = Status() result.tasks = newTaskRunner() result.events = createEventEmitter() - result.fleet = fleet.newFleetModel(result.events, fleetConfig) + result.fleet = fleet.newFleetModel(fleetConfig) result.chat = chat.newChatModel(result.events) result.accounts = accounts.newAccountModel(result.events) result.wallet = wallet.newWalletModel(result.events) result.wallet.initEvents() result.node = node.newNodeModel() - result.mailservers = mailservers.newMailserverModel(result.fleet, result.events) result.messages = messages.newMessagesModel(result.events) result.profile = profile.newProfileModel() result.contacts = contacts.newContactModel(result.events) diff --git a/src/status/tasks/common.nim b/src/status/tasks/common.nim index af30cca370..ae508e5ed7 100644 --- a/src/status/tasks/common.nim +++ b/src/status/tasks/common.nim @@ -1,5 +1,7 @@ import # vendor libs - json_serialization, NimQml, task_runner + json_serialization + +export json_serialization type Task* = proc(arg: string): void {.gcsafe, nimcall.} diff --git a/src/status/tasks/marathon.nim b/src/status/tasks/marathon.nim new file mode 100644 index 0000000000..565cc42119 --- /dev/null +++ b/src/status/tasks/marathon.nim @@ -0,0 +1,43 @@ +import # std libs + strformat, tables + +import # vendor libs + chronicles + +import # status-desktop libs + ./marathon/worker, ./marathon/common as marathon_common +export marathon_common + +logScope: + topics = "marathon" + +type + Marathon* = ref object + workers: Table[string, MarathonWorker] + +proc start*[T: MarathonTaskArg](self: MarathonWorker, arg: T) = + self.chanSendToWorker.sendSync(arg.encode.safe) + +proc newMarathon*(): Marathon = + new(result) + result.workers = initTable[string, MarathonWorker]() + +proc registerWorker*(self: Marathon, worker: MarathonWorker) = + self.workers[worker.name] = worker # overwrite if exists + +proc `[]`*(self: Marathon, name: string): MarathonWorker = + if not self.workers.contains(name): + raise newException(ValueError, &"""Worker '{name}' is not registered. Use 'registerWorker("{name}", {name}Worker)' to register the worker first.""") + self.workers[name] + +proc init*(self: Marathon) = + for worker in self.workers.values: + worker.init() + +proc teardown*(self: Marathon) = + for worker in self.workers.values: + worker.teardown() + +proc onLoggedIn*(self: Marathon) = + for worker in self.workers.values: + worker.onLoggedIn() diff --git a/src/status/tasks/marathon/common.nim b/src/status/tasks/marathon/common.nim new file mode 100644 index 0000000000..f719742953 --- /dev/null +++ b/src/status/tasks/marathon/common.nim @@ -0,0 +1,6 @@ +import # status-desktop libs + ../qt + +type + MarathonTaskArg* = ref object of QObjectTaskArg + `method`*: string diff --git a/src/status/tasks/marathon/mailserver/controller.nim b/src/status/tasks/marathon/mailserver/controller.nim new file mode 100644 index 0000000000..db4d71c494 --- /dev/null +++ b/src/status/tasks/marathon/mailserver/controller.nim @@ -0,0 +1,39 @@ +import # std libs + strutils + +import # vendor libs + chronicles, NimQml, json_serialization + +import # status-desktop libs + ../../../status, ../../common as task_runner_common, ./events + +logScope: + topics = "mailserver controller" + +################################################################################ +## ## +## NOTE: MailserverController runs on the main thread ## +## ## +################################################################################ +QtObject: + type MailserverController* = ref object of QObject + variant*: QVariant + status*: Status + + proc newController*(status: Status): MailserverController = + new(result) + result.status = status + result.setup() + result.variant = newQVariant(result) + + proc setup(self: MailserverController) = + self.QObject.setup + + proc delete*(self: MailserverController) = + self.variant.delete + self.QObject.delete + + proc receiveEvent(self: MailserverController, eventTuple: string) {.slot.} = + let event = Json.decode(eventTuple, tuple[name: string, arg: MailserverArgs]) + trace "forwarding event from long-running mailserver task to the main thread", event=eventTuple + self.status.events.emit(event.name, event.arg) \ No newline at end of file diff --git a/src/status/tasks/marathon/mailserver/events.nim b/src/status/tasks/marathon/mailserver/events.nim new file mode 100644 index 0000000000..31765d7538 --- /dev/null +++ b/src/status/tasks/marathon/mailserver/events.nim @@ -0,0 +1,21 @@ +import # vendor libs + NimQml, json_serialization + +import # status-desktop libs + ../../../../eventemitter + +type + MailserverEvents* = ref object + vptr: ByteAddress + MailserverArgs* = ref object of Args + peer*: string + +const EVENTS_SLOT = "receiveEvent" + +proc newMailserverEvents*(vptr: ByteAddress): MailserverEvents = + new(result) + result.vptr = vptr + +proc emit*(self: MailserverEvents, event: string, arg: MailserverArgs) = + let payload: tuple[event: string, arg: MailserverArgs] = (event, arg) + signal_handler(cast[pointer](self.vptr), Json.encode(payload), EVENTS_SLOT) \ No newline at end of file diff --git a/src/status/tasks/marathon/mailserver/model.nim b/src/status/tasks/marathon/mailserver/model.nim new file mode 100644 index 0000000000..ac393e56e6 --- /dev/null +++ b/src/status/tasks/marathon/mailserver/model.nim @@ -0,0 +1,240 @@ +import + algorithm, chronos, chronicles, json, math, os, random, sequtils, sets, sugar, + tables +from times import cpuTime + +import + ../../../libstatus/settings as status_settings, + ../../../libstatus/mailservers as status_mailservers, + ../../../libstatus/core as status_core, ../../../libstatus/chat as status_chat, + ../../../libstatus/types, ../../../fleet, + ./events as mailserver_events + +logScope: + topics = "mailserver model" + +################################################################################ +## ## +## NOTE: MailserverModel runs on a separate (long-running) thread ## +## ## +## How do mailservers work ? ## +## ## +## - We send a request to the mailserver, we are only interested in the ## +## messages since `last-request` up to the last seven days ## +## and the last 24 hours for topics that were just joined ## +## - The mailserver doesn't directly respond to the request and ## +## instead we start receiving messages in the filters for the requested ## +## topics. ## +## - If the mailserver was not ready when we tried for instance to request ## +## the history of a topic after joining a chat, the request will be done ## +## as soon as the mailserver becomes available ## +## ## +################################################################################ +type + MailserverModel* = ref object + mailservers*: seq[string] + events*: MailserverEvents + nodes*: Table[string, MailserverStatus] + activeMailserver*: string + topics*: HashSet[string] + lastConnectionAttempt*: float + fleet*: FleetModel + + MailserverStatus* = enum + Unknown = -1, + Disconnected = 0, + Connecting = 1 + Connected = 2, + +proc cmpMailserverReply(x, y: (string, int)): int = + if x[1] > y[1]: 1 + elif x[1] == y[1]: 0 + else: -1 + +proc poolSize(fleetSize: int): int = ceil(fleetSize / 4).int + +proc newMailserverModel*(vptr: ByteAddress): MailserverModel = + result = MailserverModel() + result.events = newMailserverEvents(vptr) + result.nodes = initTable[string, MailserverStatus]() + result.activeMailserver = "" + +proc init*(self: MailserverModel) = + trace "MailserverModel::init()" + let fleets = + if defined(windows) and getEnv("NIM_STATUS_CLIENT_DEV").string == "": + "/../resources/fleets.json" + else: + "/../fleets.json" + let fleetConfig = readFile(joinPath(getAppDir(), fleets)) + self.fleet = newFleetModel(fleetConfig) + self.mailservers = toSeq(self.fleet.config.getMailservers(status_settings.getFleet()).values) + for mailserver in status_settings.getMailservers().getElems(): + self.mailservers.add(mailserver["address"].getStr()) + +proc getActiveMailserver*(self: MailserverModel): string = self.activeMailserver + +proc isActiveMailserverAvailable*(self: MailserverModel): bool = + if not self.nodes.hasKey(self.activeMailserver): + result = false + else: + result = self.nodes[self.activeMailserver] == MailserverStatus.Connected + +proc connect(self: MailserverModel, enode: string) = + debug "Connecting to mailserver", enode=enode.substr[enode.len-40..enode.len-1] + var connected = false + # TODO: this should come from settings + var knownMailservers = initHashSet[string]() + for m in self.mailservers: + knownMailservers.incl m + if not knownMailservers.contains(enode): + warn "Mailserver not known", enode + return + + self.activeMailserver = enode + self.events.emit("mailserver:changed", MailserverArgs(peer: enode)) + + # Adding a peer and marking it as connected can't be executed sync, because + # There's a delay between requesting a peer being added, and a signal being + # received after the peer was added. So we first set the peer status as + # Connecting and once a peerConnected signal is received, we mark it as + # Connected + + if self.nodes.hasKey(enode) and self.nodes[enode] == MailserverStatus.Connected: + status_mailservers.update(enode) + connected = true + else: + # Attempt to connect to mailserver by adding it as a peer + self.nodes[enode] = MailserverStatus.Connecting + addPeer(enode) + self.lastConnectionAttempt = cpuTime() + + if connected: + self.events.emit("mailserverAvailable", MailserverArgs()) + +proc peerSummaryChange*(self: MailserverModel, peers: seq[string]) = + # When a node is added as a peer, or disconnected + # a DiscoverySummary signal is emitted. In here we + # change the status of the nodes the app is connected to + # Connected / Disconnected and emit peerConnected / peerDisconnected + # events. + var mailserverAvailable = false + for knownPeer in self.nodes.keys: + if not peers.contains(knownPeer) and self.nodes[knownPeer] != MailserverStatus.Disconnected: + debug "Peer disconnected", peer=knownPeer + self.nodes[knownPeer] = MailserverStatus.Disconnected + self.events.emit("peerDisconnected", MailserverArgs(peer: knownPeer)) + if self.activeMailserver == knownPeer: + warn "Active mailserver disconnected!", peer = knownPeer + self.activeMailserver = "" + + for peer in peers: + if self.nodes.hasKey(peer) and (self.nodes[peer] == MailserverStatus.Connected): continue + debug "Peer connected", peer + self.nodes[peer] = MailserverStatus.Connected + self.events.emit("peerConnected", MailserverArgs(peer: peer)) + + if peer == self.activeMailserver: + if self.nodes.hasKey(self.activeMailserver): + if self.activeMailserver == peer: + mailserverAvailable = true + + status_mailservers.update(peer) + + if mailserverAvailable: + debug "Mailserver available" + self.events.emit("mailserverAvailable", MailserverArgs()) + +proc requestMessages*(self: MailserverModel, topics: seq[string], fromValue: int64 = 0, toValue: int64 = 0, force: bool = false) = + debug "Requesting messages from", mailserver=self.activeMailserver + let generatedSymKey = status_chat.generateSymKeyFromPassword() + status_mailservers.requestMessages(topics, generatedSymKey, self.activeMailserver, 1000, fromValue, toValue, force) + +proc getMailserverTopics*(self: MailserverModel): seq[MailserverTopic] = + let response = status_mailservers.getMailserverTopics() + let topics = parseJson(response)["result"] + var newTopic: MailserverTopic + result = @[] + if topics.kind != JNull: + for topic in topics: + newTopic = MailserverTopic( + topic: topic["topic"].getStr, + discovery: topic["discovery?"].getBool, + negotiated: topic["negotiated?"].getBool, + lastRequest: topic["last-request"].getInt + ) + if (topic["chat-ids"].kind != JNull): + newTopic.chatIds = topic["chat-ids"].to(seq[string]) + + result.add(newTopic) + +proc getMailserverTopicsByChatIds*(self: MailserverModel, chatIds: seq[string]): seq[MailServerTopic] = + var topics: seq[MailserverTopic] = @[] + for chatId in chatIds: + let filtered = self.getMailserverTopics().filter(topic => topic.chatIds.contains(chatId)) + topics = topics.concat(filtered) + result = topics + +proc getMailserverTopicsByChatId*(self: MailserverModel, chatId: string): seq[MailServerTopic] = + result = self.getMailserverTopics() + .filter(topic => topic.chatIds.contains(chatId)) + +proc addMailserverTopic*(self: MailserverModel, topic: MailserverTopic) = + discard status_mailservers.addMailserverTopic(topic) + +proc deleteMailserverTopic*(self: MailserverModel, chatId: string) = + var topics = self.getMailserverTopicsByChatId(chatId) + if topics.len == 0: + return + + var topic:MailserverTopic = topics[0] + if(topic.chatIds.len > 1): + discard status_mailservers.addMailserverTopic(topic) + else: + discard status_mailservers.deleteMailserverTopic(topic.topic) + +proc findNewMailserver(self: MailserverModel) = + warn "Finding a new mailserver..." + + let mailserversReply = parseJson(status_mailservers.ping(self.mailservers, 500))["result"] + + var availableMailservers:seq[(string, int)] = @[] + for reply in mailserversReply: + if(reply["error"].kind != JNull): continue # The results with error are ignored + availableMailservers.add((reply["address"].getStr, reply["rttMs"].getInt)) + availableMailservers.sort(cmpMailserverReply) + + # No mailservers where returned... do nothing. + if availableMailservers.len == 0: return + + # Picks a random mailserver amongs the ones with the lowest latency + # The pool size is 1/4 of the mailservers were pinged successfully + randomize() + + let mailServer = availableMailservers[rand(poolSize(availableMailservers.len - 1))][0] + + self.connect(mailserver) + +proc cycleMailservers(self: MailserverModel) = + warn "Automatically switching mailserver" + if self.activeMailserver != "": + warn "Disconnecting active mailserver", peer=self.activeMailserver + self.nodes[self.activeMailserver] = MailserverStatus.Disconnected + removePeer(self.activeMailserver) + self.activeMailserver = "" + self.findNewMailserver() + +proc checkConnection*(self: MailserverModel) {.async.} = + while true: + debug "Verifying mailserver connection state..." + let pinnedMailserver = status_settings.getPinnedMailserver() + if pinnedMailserver != "" and self.activeMailserver != pinnedMailserver: + # connect to current mailserver from the settings + self.mailservers.add(pinnedMailserver) + self.connect(pinnedMailserver) + else: + # or setup a random mailserver: + if not self.isActiveMailserverAvailable: + # TODO: have a timeout for reconnection before changing to a different server + self.cycleMailservers() + await sleepAsync(10.seconds) diff --git a/src/status/tasks/marathon/mailserver/worker.nim b/src/status/tasks/marathon/mailserver/worker.nim new file mode 100644 index 0000000000..c209ea708c --- /dev/null +++ b/src/status/tasks/marathon/mailserver/worker.nim @@ -0,0 +1,177 @@ +import # std libs + json, tables + +import # vendor libs + chronicles, chronos, json_serialization, task_runner + +import # status-desktop libs + ../worker, ./model, ../../qt, ../../common as task_runner_common, + ../common as methuselash_common, + ../../../libstatus/mailservers # TODO: needed for MailserverTopic type, remove? + +export + chronos, task_runner_common, json_serialization + +logScope: + topics = "mailserver worker" + +type + MailserverWorker* = ref object of MarathonWorker + + # below are all custom marathon task arg definitions + GetMailserverTopicsTaskArg* = ref object of MarathonTaskArg + IsActiveMailserverAvailableTaskArg* = ref object of MarathonTaskArg + topics*: seq[MailserverTopic] + GetActiveMailserverTaskArg* = ref object of MarathonTaskArg + RequestMessagesTaskArg* = ref object of MarathonTaskArg + topics*: seq[string] + fromValue*: int64 + toValue*: int64 + force*: bool + AddMailserverTopicTaskArg* = ref object of MarathonTaskArg + topic*: MailserverTopic + PeerSummaryChangeTaskArg* = ref object of MarathonTaskArg + peers*: seq[string] + GetMailserverTopicsByChatIdTaskArg* = ref object of MarathonTaskArg + chatId*: string + fetchRange*: int + GetMailserverTopicsByChatIdsTaskArg* = ref object of MarathonTaskArg + chatIds*: seq[string] + fetchRange*: int + DeleteMailserverTopicTaskArg* = ref object of MarathonTaskArg + chatId*: string + +const + WORKER_NAME = "mailserver" + +# forward declarations +proc workerThread(arg: WorkerThreadArg) {.thread.} + +proc newMailserverWorker*(vptr: ByteAddress): MailserverWorker = + new(result) + result.chanRecvFromWorker = newAsyncChannel[ThreadSafeString](-1) + result.chanSendToWorker = newAsyncChannel[ThreadSafeString](-1) + result.vptr = vptr + +method name*(self: MailserverWorker): string = WORKER_NAME + +method init*(self: MailserverWorker) = + self.chanRecvFromWorker.open() + self.chanSendToWorker.open() + let arg = WorkerThreadArg( + chanSendToMain: self.chanRecvFromWorker, + chanRecvFromMain: self.chanSendToWorker, + vptr: self.vptr + ) + createThread(self.thread, workerThread, arg) + # block until we receive "ready" + discard $(self.chanRecvFromWorker.recvSync()) + +method teardown*(self: MailserverWorker) = + self.chanSendToWorker.sendSync("shutdown".safe) + self.chanRecvFromWorker.close() + self.chanSendToWorker.close() + trace "waiting for the control thread to stop" + joinThread(self.thread) + +method onLoggedIn*(self: MailserverWorker) = + self.chanSendToWorker.sendSync("loggedIn".safe) + +proc processMessage(mailserverModel: MailserverModel, received: string) = + let + parsed = parseJson(received) + messageType = parsed{"$type"}.getStr + methodName = parsed{"method"}.getStr() + trace "initiating mailserver task", methodName=methodName, messageType=messageType + + case methodName + of "getMailserverTopics": + let + taskArg = decode[GetMailserverTopicsTaskArg](received) + output = mailserverModel.getMailserverTopics() + taskArg.finish(output) + + of "isActiveMailserverAvailable": + let + taskArg = decode[IsActiveMailserverAvailableTaskArg](received) + output = mailserverModel.isActiveMailserverAvailable() + payload: tuple[isActiveMailserverAvailable: bool, topics: seq[MailserverTopic]] = (output, taskArg.topics) + taskArg.finish(payload) + + of "requestMessages": + let taskArg = decode[RequestMessagesTaskArg](received) + mailserverModel.requestMessages(taskArg.topics, taskArg.fromValue, taskArg.toValue, taskArg.force) + + of "getActiveMailserver": + let + taskArg = decode[GetActiveMailserverTaskArg](received) + output = mailserverModel.getActiveMailserver() + taskArg.finish(output) + + of "getMailserverTopicsByChatId": + let + taskArg = decode[GetMailserverTopicsByChatIdTaskArg](received) + output = mailserverModel.getMailserverTopicsByChatId(taskArg.chatId) + payload: tuple[topics: seq[MailserverTopic], fetchRange: int] = (output, taskArg.fetchRange) + taskArg.finish(payload) + + of "getMailserverTopicsByChatIds": + let + taskArg = decode[GetMailserverTopicsByChatIdsTaskArg](received) + output = mailserverModel.getMailserverTopicsByChatIds(taskArg.chatIds) + payload: tuple[topics: seq[MailserverTopic], fetchRange: int] = (output, taskArg.fetchRange) + taskArg.finish(payload) + + of "addMailserverTopic": + let taskArg = decode[AddMailserverTopicTaskArg](received) + mailserverModel.addMailserverTopic(taskArg.topic) + + of "peerSummaryChange": + let taskArg = decode[PeerSummaryChangeTaskArg](received) + mailserverModel.peerSummaryChange(taskArg.peers) + + of "deleteMailserverTopic": + let taskArg = decode[DeleteMailserverTopicTaskArg](received) + mailserverModel.deleteMailserverTopic(taskArg.chatId) + + else: + error "unknown message", message=received + +proc worker(arg: WorkerThreadArg) {.async, gcsafe, nimcall.} = + let + chanSendToMain = arg.chanSendToMain + chanRecvFromMain = arg.chanRecvFromMain + chanSendToMain.open() + chanRecvFromMain.open() + + trace "sending 'ready' to main thread" + await chanSendToMain.send("ready".safe) + let mailserverModel = newMailserverModel(arg.vptr) + + var unprocessedMsgs: seq[string] = @[] + while true: + let received = $(await chanRecvFromMain.recv()) + if received == "loggedIn": + mailserverModel.init() + break + else: + unprocessedMsgs.add received + + discard mailserverModel.checkConnection() + + for msg in unprocessedMsgs.items: + mailserverModel.processMessage(msg) + + while true: + trace "waiting for message" + let received = $(await chanRecvFromMain.recv()) + case received + of "shutdown": + trace "received 'shutdown'" + trace "stopping worker" + break + else: + mailserverModel.processMessage(received) + +proc workerThread(arg: WorkerThreadArg) = + waitFor worker(arg) \ No newline at end of file diff --git a/src/status/tasks/marathon/worker.nim b/src/status/tasks/marathon/worker.nim new file mode 100644 index 0000000000..1e40196be4 --- /dev/null +++ b/src/status/tasks/marathon/worker.nim @@ -0,0 +1,49 @@ +import # std libs + json + +import # vendor libs + chronicles, chronos, json_serialization, task_runner + +import # status-desktop libs + ../common + +export + chronos, common, json_serialization + +logScope: + topics = "task-marathon-worker" + +type + WorkerThreadArg* = object # of RootObj + chanSendToMain*: AsyncChannel[ThreadSafeString] + chanRecvFromMain*: AsyncChannel[ThreadSafeString] + vptr*: ByteAddress + MarathonWorker* = ref object of RootObj + chanSendToWorker*: AsyncChannel[ThreadSafeString] + chanRecvFromWorker*: AsyncChannel[ThreadSafeString] + thread*: Thread[WorkerThreadArg] + vptr*: ByteAddress + +method name*(self: MarathonWorker): string {.base.} = + # override this base method + raise newException(CatchableError, "Method without implementation override") + +method init*(self: MarathonWorker) {.base.} = + # override this base method + raise newException(CatchableError, "Method without implementation override") + +method teardown*(self: MarathonWorker) {.base.} = + # override this base method + raise newException(CatchableError, "Method without implementation override") + +method onLoggedIn*(self: MarathonWorker) {.base.} = + # override this base method + raise newException(CatchableError, "Method without implementation override") + +method worker(arg: WorkerThreadArg) {.async, base, gcsafe, nimcall.} = + # override this base method + raise newException(CatchableError, "Method without implementation override") + +method workerThread(arg: WorkerThreadArg) {.thread, base, gcsafe, nimcall.} = + # override this base method + raise newException(CatchableError, "Method without implementation override") \ No newline at end of file diff --git a/src/status/tasks/task_runner_impl.nim b/src/status/tasks/task_runner_impl.nim index 91727ed1c6..395da24f52 100644 --- a/src/status/tasks/task_runner_impl.nim +++ b/src/status/tasks/task_runner_impl.nim @@ -2,9 +2,9 @@ import # vendor libs chronicles, task_runner import # status-desktop libs - ./threadpool + ./marathon, ./threadpool -export task_runner, threadpool +export marathon, task_runner, threadpool logScope: topics = "task-runner" @@ -12,13 +12,17 @@ logScope: type TaskRunner* = ref object threadpool*: ThreadPool + marathon*: Marathon proc newTaskRunner*(): TaskRunner = new(result) result.threadpool = newThreadPool() + result.marathon = newMarathon() proc init*(self: TaskRunner) = self.threadpool.init() + self.marathon.init() proc teardown*(self: TaskRunner) = self.threadpool.teardown() + self.marathon.teardown() diff --git a/src/status/tasks/threadpool.nim b/src/status/tasks/threadpool.nim index 9bbb9ce4f3..6e8c1b2553 100644 --- a/src/status/tasks/threadpool.nim +++ b/src/status/tasks/threadpool.nim @@ -2,7 +2,7 @@ import # std libs json, sequtils, tables import # vendor libs - chronicles, chronos, json_serialization, NimQml, task_runner + chronicles, chronos, json_serialization, task_runner import # status-desktop libs ./common diff --git a/ui/app/AppLayouts/Profile/Sections/SyncContainer.qml b/ui/app/AppLayouts/Profile/Sections/SyncContainer.qml index c5824d5498..cd8e974652 100644 --- a/ui/app/AppLayouts/Profile/Sections/SyncContainer.qml +++ b/ui/app/AppLayouts/Profile/Sections/SyncContainer.qml @@ -9,7 +9,13 @@ Item { id: syncContainer Layout.fillHeight: true Layout.fillWidth: true - clip: true + + Connections { + target: profileModel.mailservers + onActiveMailserverChanged: (activeMailserver) => { + syncContainer.activeMailserver = activeMailserver + } + } Item { width: profileContainer.profileContentWidth @@ -20,8 +26,9 @@ Item { id: mailserversList StatusRadioButton { + id: rbSetMailsever text: name - checked: name === profileModel.mailservers.activeMailserver + checked: name === activeMailserver.activeMailserver onClicked: { if (checked) { profileModel.mailservers.setMailserver(name); @@ -175,6 +182,10 @@ Item { model: profileModel.mailservers.list delegate: mailserversList visible: !automaticSelectionSwitch.checked + + Component.onCompleted: { + profileModel.mailservers.getActiveMailserver() + } } } }