refactor(@desktop/general): mailservers doesn't refer to the old `status-lib` code
This commit is contained in:
parent
e220599612
commit
6ba0cdb7aa
|
@ -30,7 +30,7 @@ type
|
|||
meta: Meta
|
||||
|
||||
## Forward declaration
|
||||
proc extractConfig(self: FleetConfiguration, jsonString: string)
|
||||
proc extractConfig(self: FleetConfiguration, jsonString: string) {.gcsafe.}
|
||||
|
||||
proc newFleetConfiguration*(jsonString: string): FleetConfiguration =
|
||||
result = FleetConfiguration()
|
||||
|
@ -39,7 +39,7 @@ proc newFleetConfiguration*(jsonString: string): FleetConfiguration =
|
|||
proc delete*(self: FleetConfiguration) =
|
||||
discard
|
||||
|
||||
proc extractConfig(self: FleetConfiguration, jsonString: string) =
|
||||
proc extractConfig(self: FleetConfiguration, jsonString: string) {.gcsafe.} =
|
||||
let fleetJson = jsonString.parseJSON
|
||||
self.meta.hostname = fleetJson["meta"]["hostname"].getStr
|
||||
self.meta.timestamp = fleetJson["meta"]["timestamp"].getBiggestInt.uint64
|
||||
|
|
|
@ -1,8 +1,13 @@
|
|||
import # std libs
|
||||
strutils
|
||||
import NimQml, times, strutils, json, json_serialization, chronicles
|
||||
|
||||
import # vendor libs
|
||||
chronicles, NimQml, json_serialization
|
||||
import ../../../fleets/fleet_configuration
|
||||
import ../../../../../app_service/service/settings/service_interface as settings_service
|
||||
import ../../../../../app_service/service/node_configuration/service_interface as node_config_service
|
||||
|
||||
import status/statusgo_backend_new/settings as status_settings
|
||||
import status/statusgo_backend_new/node_config as status_node_config
|
||||
import status/statusgo_backend_new/mailservers as status_mailservers
|
||||
import status/statusgo_backend_new/general as status_general
|
||||
|
||||
import events
|
||||
import ../../common as task_runner_common
|
||||
|
@ -12,6 +17,9 @@ import eventemitter
|
|||
logScope:
|
||||
topics = "mailserver controller"
|
||||
|
||||
const STATUS_MAILSERVER_PASS = "status-offline-inbox"
|
||||
const STATUS_STORE_MESSAGES_TIMEOUT = 30
|
||||
|
||||
################################################################################
|
||||
## ##
|
||||
## NOTE: MailserverController runs on the main thread ##
|
||||
|
@ -35,4 +43,164 @@ QtObject:
|
|||
proc receiveEvent(self: MailserverController, eventTuple: string) {.slot.} =
|
||||
let event = Json.decode(eventTuple, tuple[name: string, arg: MailserverArgs])
|
||||
trace "forwarding event from long-running mailserver task to the main thread", event=eventTuple
|
||||
self.events.emit(event.name, event.arg)
|
||||
self.events.emit(event.name, event.arg)
|
||||
|
||||
# In case of mailserver task, we need to fetch data directly from the `status-go`, and that's why direct calls to
|
||||
# `status-lib` are made here. If we use services here, the state remains the same as it was in the moment when certain
|
||||
# service is passed to the mailserver thread.
|
||||
proc getCurrentSettings(self: MailserverController): SettingsDto =
|
||||
try:
|
||||
let response = status_settings.getSettings()
|
||||
let settings = response.result.toSettingsDto()
|
||||
return settings
|
||||
except Exception as e:
|
||||
let errDesription = e.msg
|
||||
error "error: ", methodName="mailserver-getCurrentSettings", errDesription
|
||||
|
||||
proc getCurrentNodeConfiguration(self: MailserverController): NodeConfigDto =
|
||||
try:
|
||||
let response = status_node_config.getNodeConfig()
|
||||
let configuration = response.result.toNodeConfigDto()
|
||||
return configuration
|
||||
except Exception as e:
|
||||
let errDesription = e.msg
|
||||
error "error: ", methodName="mailserver-getCurrentNodeConfiguration", errDesription
|
||||
|
||||
proc getCurrentMailservers*(self: MailserverController): seq[JsonNode] =
|
||||
try:
|
||||
let response = status_mailservers.getMailservers()
|
||||
return response.result.getElems()
|
||||
except Exception as e:
|
||||
let errDesription = e.msg
|
||||
error "error: ", methodName="mailserver-getCurrentMailservers", errDesription
|
||||
|
||||
proc getFleet*(self: MailserverController): string =
|
||||
let settings = self.getCurrentSettings()
|
||||
var fleet = settings_service.DEFAULT_FLEET
|
||||
if(settings.fleet.len > 0):
|
||||
fleet = settings.fleet
|
||||
return fleet
|
||||
|
||||
proc getWakuVersion*(self: MailserverController): int =
|
||||
let nodeConfiguration = self.getCurrentNodeConfiguration()
|
||||
if nodeConfiguration.WakuConfig.Enabled:
|
||||
return WAKU_VERSION_1
|
||||
elif nodeConfiguration.WakuV2Config.Enabled:
|
||||
return WAKU_VERSION_2
|
||||
|
||||
error "error: unsupported waku version", methodName="mailserver-getWakuVersion"
|
||||
return 0
|
||||
|
||||
proc getPinnedMailserver*(self: MailserverController): string =
|
||||
let settings = self.getCurrentSettings()
|
||||
let fleet = self.getFleet()
|
||||
|
||||
if (fleet == $Fleet.Prod):
|
||||
return settings.pinnedMailserver.ethProd
|
||||
elif (fleet == $Fleet.Staging):
|
||||
return settings.pinnedMailserver.ethStaging
|
||||
elif (fleet == $Fleet.Test):
|
||||
return settings.pinnedMailserver.ethTest
|
||||
elif (fleet == $Fleet.WakuV2Prod):
|
||||
return settings.pinnedMailserver.wakuv2Prod
|
||||
elif (fleet == $Fleet.WakuV2Test):
|
||||
return settings.pinnedMailserver.wakuv2Test
|
||||
elif (fleet == $Fleet.GoWakuTest):
|
||||
return settings.pinnedMailserver.goWakuTest
|
||||
return ""
|
||||
|
||||
proc dialPeer*(self: MailserverController, address: string): bool =
|
||||
try:
|
||||
let response = status_general.dialPeer(address)
|
||||
if response.result.hasKey("error"):
|
||||
let errMsg = $response.result
|
||||
error "waku peer could not be dialed", methodName="mailserver-dialPeer", errMsg
|
||||
return false
|
||||
return true
|
||||
except Exception as e:
|
||||
let errDesription = e.msg
|
||||
error "error: ", methodName="mailserver-dialPeer", errDesription
|
||||
return false
|
||||
|
||||
proc generateSymKeyFromPassword*(self: MailserverController): string =
|
||||
try:
|
||||
let response = status_general.generateSymKeyFromPassword(STATUS_MAILSERVER_PASS)
|
||||
let resultAsString = $response.result
|
||||
return resultAsString.strip(chars = {'"'})
|
||||
except Exception as e:
|
||||
let errDesription = e.msg
|
||||
error "error: ", methodName="mailserver-dialPeer", errDesription
|
||||
|
||||
proc setMailserver*(self: MailserverController, peer: string) =
|
||||
try:
|
||||
discard status_mailservers.setMailserver(peer)
|
||||
except Exception as e:
|
||||
let errDesription = e.msg
|
||||
error "error: ", methodName="mailserver-setMailserver", errDesription
|
||||
|
||||
proc update*(self: MailserverController, peer: string) =
|
||||
try:
|
||||
discard status_mailservers.update(peer)
|
||||
except Exception as e:
|
||||
let errDesription = e.msg
|
||||
error "error: ", methodName="mailserver-update", errDesription
|
||||
|
||||
proc requestAllHistoricMessages*(self: MailserverController) =
|
||||
try:
|
||||
discard status_mailservers.requestAllHistoricMessages()
|
||||
except Exception as e:
|
||||
let errDesription = e.msg
|
||||
error "error: ", methodName="mailserver-requestAllHistoricMessages", errDesription
|
||||
|
||||
proc requestStoreMessages*(self: MailserverController, topics: seq[string], symKeyID: string, peer: string,
|
||||
numberOfMessages: int, fromTimestamp: int64, toTimestamp: int64, force: bool) =
|
||||
try:
|
||||
var toValue = toTimestamp
|
||||
if toValue <= 0:
|
||||
toValue = times.toUnix(times.getTime())
|
||||
|
||||
var fromValue = fromTimestamp
|
||||
if fromValue <= 0:
|
||||
fromValue = toValue - 86400
|
||||
|
||||
discard status_mailservers.requestStoreMessages(topics, STATUS_STORE_MESSAGES_TIMEOUT, symKeyID, peer,
|
||||
numberOfMessages, fromValue, toValue, force)
|
||||
except Exception as e:
|
||||
let errDesription = e.msg
|
||||
error "error: ", methodName="mailserver-requestStoreMessages", errDesription
|
||||
|
||||
proc syncChatFromSyncedFrom*(self: MailserverController, chatId: string) =
|
||||
try:
|
||||
discard status_mailservers.syncChatFromSyncedFrom(chatId)
|
||||
except Exception as e:
|
||||
let errDesription = e.msg
|
||||
error "error: ", methodName="mailserver-syncChatFromSyncedFrom", errDesription
|
||||
|
||||
proc fillGaps*(self: MailserverController, chatId: string, messageIds: seq[string]) =
|
||||
try:
|
||||
discard status_mailservers.fillGaps(chatId, messageIds)
|
||||
except Exception as e:
|
||||
let errDesription = e.msg
|
||||
error "error: ", methodName="mailserver-fillGaps", errDesription
|
||||
|
||||
proc ping*(self: MailserverController, addresses: seq[string], timeoutMs: int, isWakuV2: bool): JsonNode =
|
||||
try:
|
||||
let response = status_mailservers.ping(addresses, timeoutMs, isWakuV2)
|
||||
return response.result
|
||||
except Exception as e:
|
||||
let errDesription = e.msg
|
||||
error "error: ", methodName="mailserver-ping", errDesription
|
||||
|
||||
proc dropPeerByID*(self: MailserverController, peer: string) =
|
||||
try:
|
||||
discard status_general.dropPeerByID(peer)
|
||||
except Exception as e:
|
||||
let errDesription = e.msg
|
||||
error "error: ", methodName="mailserver-dropPeerByID", errDesription
|
||||
|
||||
proc removePeer*(self: MailserverController, peer: string) =
|
||||
try:
|
||||
discard status_general.removePeer(peer)
|
||||
except Exception as e:
|
||||
let errDesription = e.msg
|
||||
error "error: ", methodName="mailserver-removePeer", errDesription
|
|
@ -3,13 +3,11 @@ import
|
|||
tables, strutils
|
||||
from times import cpuTime
|
||||
|
||||
import
|
||||
status/statusgo_backend/settings as status_settings,
|
||||
status/statusgo_backend/chat as status_chat,
|
||||
status/statusgo_backend/mailservers as status_mailservers,
|
||||
status/statusgo_backend/core as status_core,
|
||||
status/fleet,
|
||||
./events as mailserver_events
|
||||
import ../../../fleets/fleet_configuration
|
||||
import ../../../../../app_service/service/node_configuration/service_interface as node_config_service
|
||||
|
||||
import controller
|
||||
import events as mailserver_events
|
||||
|
||||
logScope:
|
||||
topics = "mailserver model"
|
||||
|
@ -38,13 +36,9 @@ type
|
|||
nodes*: Table[string, MailserverStatus]
|
||||
activeMailserver*: string
|
||||
lastConnectionAttempt*: float
|
||||
## At this moment we cannot remove FleetModel from `status-lib` easily since the following error occurs:
|
||||
## /desktop-app/src/app/core/tasks/marathon/mailserver/worker.nim(120, 37) template/generic instantiation of `async` from here
|
||||
## /desktop-app/vendor/status-lib/vendor/nim-task-runner/vendor/nim-chronos/chronos/asyncmacro2.nim(210, 31) Error: 'worker' is not GC-safe as it calls 'init'
|
||||
##
|
||||
## But at some point in future we will spend more time and figure out what's the issue.
|
||||
fleet*: FleetModel
|
||||
fleetConfiguration*: FleetConfiguration
|
||||
wakuVersion*: int
|
||||
mailserverController: MailserverController
|
||||
|
||||
MailserverStatus* = enum
|
||||
Unknown = -1,
|
||||
|
@ -68,6 +62,7 @@ proc newMailserverModel*(vptr: ByteAddress): MailserverModel =
|
|||
result.events = newMailserverEvents(vptr)
|
||||
result.nodes = initTable[string, MailserverStatus]()
|
||||
result.activeMailserver = ""
|
||||
result.mailserverController = cast[MailserverController](vptr)
|
||||
|
||||
proc init*(self: MailserverModel) =
|
||||
trace "MailserverModel::init()"
|
||||
|
@ -77,17 +72,16 @@ proc init*(self: MailserverModel) =
|
|||
else:
|
||||
"/../fleets.json"
|
||||
|
||||
self.wakuVersion = status_settings.getWakuVersion()
|
||||
|
||||
let fleetConfig = readFile(joinPath(getAppDir(), fleets))
|
||||
self.fleet = newFleetModel(fleetConfig)
|
||||
self.wakuVersion = status_settings.getWakuVersion()
|
||||
self.fleetConfiguration = newFleetConfiguration(fleetConfig)
|
||||
self.wakuVersion = self.mailserverController.getWakuVersion()
|
||||
|
||||
let fleet = parseEnum[Fleet](status_settings.getFleet())
|
||||
self.mailservers = toSeq(self.fleet.config.getMailservers(fleet, self.wakuVersion == 2).values)
|
||||
let fleet = parseEnum[Fleet](self.mailserverController.getFleet())
|
||||
self.mailservers = toSeq(self.fleetConfiguration.getMailservers(fleet, self.wakuVersion == WAKU_VERSION_2).values)
|
||||
|
||||
for mailserver in status_settings.getMailservers().getElems():
|
||||
self.mailservers.add(mailserver["address"].getStr())
|
||||
let mailservers = self.mailserverController.getCurrentMailservers()
|
||||
for m in mailservers:
|
||||
self.mailservers.add(m["address"].getStr())
|
||||
|
||||
proc getActiveMailserver*(self: MailserverModel): string = self.activeMailserver
|
||||
|
||||
|
@ -108,7 +102,7 @@ proc connect(self: MailserverModel, nodeAddr: string) =
|
|||
warn "Mailserver not known", nodeAddr
|
||||
return
|
||||
|
||||
self.activeMailserver = if self.wakuVersion == 2: peerIdFromMultiAddress(nodeAddr) else: nodeAddr
|
||||
self.activeMailserver = if self.wakuVersion == WAKU_VERSION_2: peerIdFromMultiAddress(nodeAddr) else: nodeAddr
|
||||
self.events.emit("mailserver:changed", MailserverArgs(peer: nodeAddr))
|
||||
|
||||
# Adding a peer and marking it as connected can't be executed sync in WakuV1, because
|
||||
|
@ -121,13 +115,13 @@ proc connect(self: MailserverModel, nodeAddr: string) =
|
|||
connected = true
|
||||
else:
|
||||
# Attempt to connect to mailserver by adding it as a peer
|
||||
if self.wakuVersion == 2:
|
||||
if status_core.dialPeer(nodeAddr): # WakuV2 dial is sync (should it be async?)
|
||||
discard status_mailservers.setMailserver(self.activeMailserver)
|
||||
if self.wakuVersion == WAKU_VERSION_2:
|
||||
if self.mailserverController.dialPeer(nodeAddr): # WakuV2 dial is sync (should it be async?)
|
||||
self.mailserverController.setMailserver(self.activeMailserver)
|
||||
self.nodes[self.activeMailserver] = MailserverStatus.Connected
|
||||
connected = true
|
||||
else:
|
||||
status_mailservers.update(nodeAddr)
|
||||
self.mailserverController.update(nodeAddr)
|
||||
self.nodes[nodeAddr] = MailserverStatus.Connecting
|
||||
|
||||
self.lastConnectionAttempt = cpuTime()
|
||||
|
@ -170,25 +164,25 @@ proc peerSummaryChange*(self: MailserverModel, peers: seq[string]) =
|
|||
|
||||
proc requestMessages*(self: MailserverModel) =
|
||||
info "Requesting messages from", mailserver=self.activeMailserver
|
||||
discard status_mailservers.requestAllHistoricMessages()
|
||||
self.mailserverController.requestAllHistoricMessages()
|
||||
|
||||
proc requestStoreMessages*(self: MailserverModel, topics: seq[string], fromValue: int64 = 0, toValue: int64 = 0, force: bool = false) =
|
||||
info "Requesting messages from", mailserver=self.activeMailserver
|
||||
let generatedSymKey = status_chat.generateSymKeyFromPassword()
|
||||
status_mailservers.requestStoreMessages(topics, generatedSymKey, self.activeMailserver, 1000, fromValue, toValue, force)
|
||||
let generatedSymKey = self.mailserverController.generateSymKeyFromPassword()
|
||||
self.mailserverController.requestStoreMessages(topics, generatedSymKey, self.activeMailserver, 1000, fromValue, toValue, force)
|
||||
|
||||
proc requestMoreMessages*(self: MailserverModel, chatId: string) =
|
||||
info "Requesting more messages from", mailserver=self.activeMailserver, chatId=chatId
|
||||
discard status_mailservers.syncChatFromSyncedFrom(chatId)
|
||||
self.mailserverController.syncChatFromSyncedFrom(chatId)
|
||||
|
||||
proc fillGaps*(self: MailserverModel, chatId: string, messageIds: seq[string]) =
|
||||
info "Requesting fill gaps from", mailserver=self.activeMailserver, chatId=chatId
|
||||
discard status_mailservers.fillGaps(chatId, messageIds)
|
||||
self.mailserverController.fillGaps(chatId, messageIds)
|
||||
|
||||
proc findNewMailserver(self: MailserverModel) =
|
||||
warn "Finding a new mailserver...", wakuVersion=self.wakuVersion
|
||||
|
||||
let mailserversReply = parseJson(status_mailservers.ping(self.mailservers, 500, self.wakuVersion == 2))["result"]
|
||||
let mailserversReply = self.mailserverController.ping(self.mailservers, 500, self.wakuVersion == WAKU_VERSION_2)
|
||||
|
||||
var availableMailservers:seq[(string, int)] = @[]
|
||||
for reply in mailserversReply:
|
||||
|
@ -214,18 +208,21 @@ proc cycleMailservers(self: MailserverModel) =
|
|||
if self.activeMailserver != "":
|
||||
info "Disconnecting active mailserver", peer=self.activeMailserver
|
||||
self.nodes[self.activeMailserver] = MailserverStatus.Disconnected
|
||||
if self.wakuVersion == 2:
|
||||
dropPeerByID(self.activeMailserver)
|
||||
if self.wakuVersion == WAKU_VERSION_2:
|
||||
self.mailserverController.dropPeerByID(self.activeMailserver)
|
||||
else:
|
||||
removePeer(self.activeMailserver)
|
||||
self.mailserverController.removePeer(self.activeMailserver)
|
||||
self.activeMailserver = ""
|
||||
self.findNewMailserver()
|
||||
|
||||
proc checkConnection*(self: MailserverModel) {.async.} =
|
||||
while true:
|
||||
info "Verifying mailserver connection state..."
|
||||
let pinnedMailserver = status_settings.getPinnedMailserver()
|
||||
if self.wakuVersion == 1 and pinnedMailserver != "" and self.activeMailserver != pinnedMailserver:
|
||||
var pinnedMailserver = ""
|
||||
if(not self.mailserverController.isNil):
|
||||
pinnedMailserver = self.mailserverController.getPinnedMailserver()
|
||||
|
||||
if self.wakuVersion == WAKU_VERSION_1 and pinnedMailserver != "" and self.activeMailserver != pinnedMailserver:
|
||||
# connect to current mailserver from the settings
|
||||
self.mailservers.add(pinnedMailserver)
|
||||
self.connect(pinnedMailserver)
|
||||
|
|
|
@ -7,6 +7,7 @@ import # vendor libs
|
|||
import # status-desktop libs
|
||||
../worker, ./model, ../../qt, ../../common as task_runner_common,
|
||||
../common as methuselash_common
|
||||
|
||||
export
|
||||
chronos, task_runner_common, json_serialization
|
||||
|
||||
|
@ -84,6 +85,12 @@ proc processMessage(mailserverModel: MailserverModel, received: string) =
|
|||
mailserverModel.requestMessages()
|
||||
taskArg.finish("") # TODO:
|
||||
|
||||
of "isActiveMailserverAvailable":
|
||||
let
|
||||
taskArg = decode[IsActiveMailserverAvailableTaskArg](received)
|
||||
output = mailserverModel.isActiveMailserverAvailable()
|
||||
taskArg.finish(output)
|
||||
|
||||
of "requestMessages":
|
||||
let taskArg = decode[RequestMessagesTaskArg](received)
|
||||
mailserverModel.requestMessages()
|
||||
|
@ -96,6 +103,16 @@ proc processMessage(mailserverModel: MailserverModel, received: string) =
|
|||
let taskArg = decode[FillGapsTaskArg](received)
|
||||
mailserverModel.fillGaps(taskArg.chatId, taskArg.messageIds)
|
||||
|
||||
of "getActiveMailserver":
|
||||
let
|
||||
taskArg = decode[GetActiveMailserverTaskArg](received)
|
||||
output = mailserverModel.getActiveMailserver()
|
||||
taskArg.finish(output)
|
||||
|
||||
of "peerSummaryChange":
|
||||
let taskArg = decode[PeerSummaryChangeTaskArg](received)
|
||||
mailserverModel.peerSummaryChange(taskArg.peers)
|
||||
|
||||
else:
|
||||
error "unknown message", message=received
|
||||
|
||||
|
@ -122,6 +139,9 @@ proc worker(arg: WorkerThreadArg) {.async, gcsafe, nimcall.} =
|
|||
return
|
||||
else:
|
||||
unprocessedMsgs.add received
|
||||
|
||||
mailserverModel.init()
|
||||
discard mailserverModel.checkConnection()
|
||||
|
||||
for msg in unprocessedMsgs.items:
|
||||
mailserverModel.processMessage(msg)
|
||||
|
|
Loading…
Reference in New Issue