refactor: mailserver_cycle (base_bc)

This commit is contained in:
Richard Ramos 2022-01-07 14:09:35 -04:00 committed by Sale Djenic
parent f24d6e968b
commit 4cb3126513
16 changed files with 125 additions and 415 deletions

View File

@ -183,26 +183,6 @@ proc handleChatEvents(self: ChatController) =
#Notifying communities about this change. #Notifying communities about this change.
self.view.communities.markNotificationsAsRead(markAsReadProps) self.view.communities.markNotificationsAsRead(markAsReadProps)
proc handleMailserverEvents(self: ChatController) =
let mailserverWorker = self.statusFoundation.marathon[MailserverWorker().name]
# TODO: test mailserver topics when joining chat
self.status.events.on("channelJoined") do(e:Args):
let task = IsActiveMailserverAvailableTaskArg(
`method`: "isActiveMailserverAvailable",
vptr: cast[ByteAddress](self.view.vptr),
slot: "isActiveMailserverResult"
)
mailserverWorker.start(task)
self.status.events.on("mailserverAvailable") do(e:Args):
self.view.messageView.setLoadingMessages(true)
let task = RequestMessagesTaskArg(
`method`: "requestMessages",
vptr: cast[ByteAddress](self.view.vptr),
slot: "requestAllHistoricMessagesResult"
)
mailserverWorker.start(task)
proc handleSystemEvents(self: ChatController) = proc handleSystemEvents(self: ChatController) =
discard discard
# Not refactored yet - don't delete # Not refactored yet - don't delete

View File

@ -27,6 +27,17 @@ proc handleSignals(self: ChatController) =
) )
mailserverWorker.start(task) mailserverWorker.start(task)
self.status.events.on(SignalType.MailserverAvailable.event) do(e:Args):
var data = MailserverAvailableSignal(e)
info "active mailserver changed", node=data.address, topics="mailserver-interaction"
self.view.messageView.setLoadingMessages(true)
let
mailserverWorker = self.statusFoundation.marathon[MailserverWorker().name]
task = RequestMessagesTaskArg(
`method`: "requestMessages"
)
mailserverWorker.start(task)
self.status.events.on(SignalType.EnvelopeSent.event) do(e:Args): self.status.events.on(SignalType.EnvelopeSent.event) do(e:Args):
var data = EnvelopeSentSignal(e) var data = EnvelopeSentSignal(e)
self.status.messages.updateStatus(data.messageIds) self.status.messages.updateStatus(data.messageIds)

View File

@ -387,18 +387,6 @@ QtObject:
QtProperty[QVariant] transactions: QtProperty[QVariant] transactions:
read = getTransactions read = getTransactions
proc isActiveMailserverResult(self: ChatsView, resultEncoded: string) {.slot.} =
let isActiveMailserverAvailable = decode[bool](resultEncoded)
if isActiveMailserverAvailable:
self.messageView.setLoadingMessages(true)
let
mailserverWorker = self.statusFoundation.marathon[MailserverWorker().name]
task = RequestMessagesTaskArg(`method`: "requestMessages")
mailserverWorker.start(task)
proc requestAllHistoricMessagesResult(self: ChatsView, resultEncoded: string) {.slot.} =
self.messageView.setLoadingMessages(true)
proc createCommunityChannel*(self: ChatsView, communityId: string, name: string, description: string, categoryId: string): string {.slot.} = proc createCommunityChannel*(self: ChatsView, communityId: string, name: string, description: string, categoryId: string): string {.slot.} =
try: try:
let chat = self.status.chat.createCommunityChannel(communityId, name, description) let chat = self.status.chat.createCommunityChannel(communityId, name, description)
@ -492,15 +480,6 @@ QtObject:
proc markMessageAsSent*(self: ChatsView, chat: string, messageId: string) = proc markMessageAsSent*(self: ChatsView, chat: string, messageId: string) =
self.messageView.markMessageAsSent(chat, messageId) self.messageView.markMessageAsSent(chat, messageId)
# TODO: this method was created just to test the store functionality.
# It should be removed, once peer management is added to status-go
proc requestAllHistoricMessages(self: ChatsView) {.slot.} =
debug "Requesting messages"
# TODO: the mailservers must change depending on whether we are using wakuV1 or wakuV2
# in the meantime I'm hardcoding a specific mailserver
echo self.status.mailservers.setMailserver("16Uiu2HAm4v86W3bmT1BiH6oSPzcsSr24iDQpSN5Qa992BCjjwgrD")
echo self.status.mailservers.requestAllHistoricMessages()
proc switchTo*(self: ChatsView, communityId: string, channelId: string, proc switchTo*(self: ChatsView, communityId: string, channelId: string,
messageId: string) = messageId: string) =
## This method displays community with communityId as an active one (if ## This method displays community with communityId as an active one (if

View File

@ -30,6 +30,12 @@ type HistoryRequestFailedSignal* = ref object of Signal
errorMessage*: string errorMessage*: string
error*: bool error*: bool
type MailserverAvailableSignal* = ref object of Signal
address*: string
type MailserverChangedSignal* = ref object of Signal
address*: string
proc fromEvent*(T: type MailserverRequestCompletedSignal, jsonSignal: JsonNode): MailserverRequestCompletedSignal = proc fromEvent*(T: type MailserverRequestCompletedSignal, jsonSignal: JsonNode): MailserverRequestCompletedSignal =
result = MailserverRequestCompletedSignal() result = MailserverRequestCompletedSignal()
result.signalType = SignalType.MailserverRequestCompleted result.signalType = SignalType.MailserverRequestCompleted
@ -70,3 +76,13 @@ proc fromEvent*(T: type HistoryRequestFailedSignal, jsonSignal: JsonNode): Histo
if jsonSignal["event"].kind != JNull: if jsonSignal["event"].kind != JNull:
result.errorMessage = jsonSignal["event"]{"errorMessage"}.getStr() result.errorMessage = jsonSignal["event"]{"errorMessage"}.getStr()
result.error = result.errorMessage != "" result.error = result.errorMessage != ""
proc fromEvent*(T: type MailserverAvailableSignal, jsonSignal: JsonNode): MailserverAvailableSignal =
result = MailserverAvailableSignal()
result.signalType = SignalType.MailserverAvailable
result.address = jsonSignal["event"]{"address"}.getStr()
proc fromEvent*(T: type MailserverChangedSignal, jsonSignal: JsonNode): MailserverChangedSignal =
result = MailserverChangedSignal()
result.signalType = SignalType.MailserverChanged
result.address = jsonSignal["event"]{"address"}.getStr()

View File

@ -27,6 +27,8 @@ type SignalType* {.pure.} = enum
HistoryRequestFailed = "history.request.failed" HistoryRequestFailed = "history.request.failed"
HistoryRequestBatchProcessed = "history.request.batch.processed" HistoryRequestBatchProcessed = "history.request.batch.processed"
KeycardConnected = "keycard.connected" KeycardConnected = "keycard.connected"
MailserverAvailable = "mailserver.available"
MailserverChanged = "mailserver.changed"
Unknown Unknown
proc event*(self:SignalType):string = proc event*(self:SignalType):string =

View File

@ -82,6 +82,8 @@ QtObject:
of SignalType.HistoryRequestFailed: HistoryRequestFailedSignal.fromEvent(jsonSignal) of SignalType.HistoryRequestFailed: HistoryRequestFailedSignal.fromEvent(jsonSignal)
of SignalType.HistoryRequestBatchProcessed: HistoryRequestBatchProcessedSignal.fromEvent(jsonSignal) of SignalType.HistoryRequestBatchProcessed: HistoryRequestBatchProcessedSignal.fromEvent(jsonSignal)
of SignalType.KeycardConnected: KeycardConnectedSignal.fromEvent(jsonSignal) of SignalType.KeycardConnected: KeycardConnectedSignal.fromEvent(jsonSignal)
of SignalType.MailserverAvailable: MailserverAvailableSignal.fromEvent(jsonSignal)
of SignalType.MailserverChanged: MailserverChangedSignal.fromEvent(jsonSignal)
else: Signal() else: Signal()
result.signalType = signalType result.signalType = signalType

View File

@ -1,4 +1,4 @@
import NimQml, times, strutils, json, json_serialization, chronicles import NimQml, json_serialization, chronicles
import ../../../eventemitter import ../../../eventemitter
import ../../../fleets/fleet_configuration import ../../../fleets/fleet_configuration
@ -6,11 +6,8 @@ import ../../../../../app_service/service/settings/service_interface as settings
import ../../../../../app_service/service/node_configuration/service_interface as node_config_service import ../../../../../app_service/service/node_configuration/service_interface as node_config_service
import status/statusgo_backend_new/settings as status_settings import status/statusgo_backend_new/settings as status_settings
import status/statusgo_backend_new/node_config as status_node_config
import status/statusgo_backend_new/mailservers as status_mailservers import status/statusgo_backend_new/mailservers as status_mailservers
import status/statusgo_backend_new/general as status_general
import events
import ../../common as task_runner_common import ../../common as task_runner_common
logScope: logScope:
@ -39,11 +36,6 @@ QtObject:
proc delete*(self: MailserverController) = proc delete*(self: MailserverController) =
self.QObject.delete self.QObject.delete
proc receiveEvent(self: MailserverController, eventTuple: string) {.slot.} =
let event = Json.decode(eventTuple, tuple[name: string, arg: MailserverArgs])
trace "forwarding event from long-running mailserver task to the main thread", event=eventTuple
self.events.emit(event.name, event.arg)
# In case of mailserver task, we need to fetch data directly from the `status-go`, and that's why direct calls to # In case of mailserver task, we need to fetch data directly from the `status-go`, and that's why direct calls to
# `status-lib` are made here. If we use services here, the state remains the same as it was in the moment when certain # `status-lib` are made here. If we use services here, the state remains the same as it was in the moment when certain
# service is passed to the mailserver thread. # service is passed to the mailserver thread.
@ -56,22 +48,6 @@ QtObject:
let errDesription = e.msg let errDesription = e.msg
error "error: ", methodName="mailserver-getCurrentSettings", errDesription error "error: ", methodName="mailserver-getCurrentSettings", errDesription
proc getCurrentNodeConfiguration(self: MailserverController): NodeConfigDto =
try:
let response = status_node_config.getNodeConfig()
let configuration = response.result.toNodeConfigDto()
return configuration
except Exception as e:
let errDesription = e.msg
error "error: ", methodName="mailserver-getCurrentNodeConfiguration", errDesription
proc getCurrentMailservers*(self: MailserverController): seq[JsonNode] =
try:
let response = status_mailservers.getMailservers()
return response.result.getElems()
except Exception as e:
let errDesription = e.msg
error "error: ", methodName="mailserver-getCurrentMailservers", errDesription
proc getFleet*(self: MailserverController): string = proc getFleet*(self: MailserverController): string =
let settings = self.getCurrentSettings() let settings = self.getCurrentSettings()
@ -80,16 +56,6 @@ QtObject:
fleet = settings.fleet fleet = settings.fleet
return fleet return fleet
proc getWakuVersion*(self: MailserverController): int =
let nodeConfiguration = self.getCurrentNodeConfiguration()
if nodeConfiguration.WakuConfig.Enabled:
return WAKU_VERSION_1
elif nodeConfiguration.WakuV2Config.Enabled:
return WAKU_VERSION_2
error "error: unsupported waku version", methodName="mailserver-getWakuVersion"
return 0
proc getPinnedMailserver*(self: MailserverController): string = proc getPinnedMailserver*(self: MailserverController): string =
let settings = self.getCurrentSettings() let settings = self.getCurrentSettings()
let fleet = self.getFleet() let fleet = self.getFleet()
@ -108,42 +74,6 @@ QtObject:
return settings.pinnedMailserver.goWakuTest return settings.pinnedMailserver.goWakuTest
return "" return ""
proc dialPeer*(self: MailserverController, address: string): bool =
try:
let response = status_general.dialPeer(address)
if response.result.hasKey("error"):
let errMsg = $response.result
error "waku peer could not be dialed", methodName="mailserver-dialPeer", errMsg
return false
return true
except Exception as e:
let errDesription = e.msg
error "error: ", methodName="mailserver-dialPeer", errDesription
return false
proc generateSymKeyFromPassword*(self: MailserverController): string =
try:
let response = status_general.generateSymKeyFromPassword(STATUS_MAILSERVER_PASS)
let resultAsString = $response.result
return resultAsString.strip(chars = {'"'})
except Exception as e:
let errDesription = e.msg
error "error: ", methodName="mailserver-dialPeer", errDesription
proc setMailserver*(self: MailserverController, peer: string) =
try:
discard status_mailservers.setMailserver(peer)
except Exception as e:
let errDesription = e.msg
error "error: ", methodName="mailserver-setMailserver", errDesription
proc update*(self: MailserverController, peer: string) =
try:
discard status_mailservers.update(peer)
except Exception as e:
let errDesription = e.msg
error "error: ", methodName="mailserver-update", errDesription
proc requestAllHistoricMessages*(self: MailserverController) = proc requestAllHistoricMessages*(self: MailserverController) =
try: try:
discard status_mailservers.requestAllHistoricMessages() discard status_mailservers.requestAllHistoricMessages()
@ -151,23 +81,6 @@ QtObject:
let errDesription = e.msg let errDesription = e.msg
error "error: ", methodName="mailserver-requestAllHistoricMessages", errDesription error "error: ", methodName="mailserver-requestAllHistoricMessages", errDesription
proc requestStoreMessages*(self: MailserverController, topics: seq[string], symKeyID: string, peer: string,
numberOfMessages: int, fromTimestamp: int64, toTimestamp: int64, force: bool) =
try:
var toValue = toTimestamp
if toValue <= 0:
toValue = times.toUnix(times.getTime())
var fromValue = fromTimestamp
if fromValue <= 0:
fromValue = toValue - 86400
discard status_mailservers.requestStoreMessages(topics, STATUS_STORE_MESSAGES_TIMEOUT, symKeyID, peer,
numberOfMessages, fromValue, toValue, force)
except Exception as e:
let errDesription = e.msg
error "error: ", methodName="mailserver-requestStoreMessages", errDesription
proc syncChatFromSyncedFrom*(self: MailserverController, chatId: string) = proc syncChatFromSyncedFrom*(self: MailserverController, chatId: string) =
try: try:
discard status_mailservers.syncChatFromSyncedFrom(chatId) discard status_mailservers.syncChatFromSyncedFrom(chatId)
@ -182,24 +95,9 @@ QtObject:
let errDesription = e.msg let errDesription = e.msg
error "error: ", methodName="mailserver-fillGaps", errDesription error "error: ", methodName="mailserver-fillGaps", errDesription
proc ping*(self: MailserverController, addresses: seq[string], timeoutMs: int, isWakuV2: bool): JsonNode = proc disconnectActiveMailserver*(self: MailserverController) =
try: try:
let response = status_mailservers.ping(addresses, timeoutMs, isWakuV2) discard status_mailservers.disconnectActiveMailserver()
return response.result
except Exception as e: except Exception as e:
let errDesription = e.msg let errDesription = e.msg
error "error: ", methodName="mailserver-ping", errDesription error "error: ", methodName="mailserver-disconnectActiveMailserver", errDesription
proc dropPeerByID*(self: MailserverController, peer: string) =
try:
discard status_general.dropPeerByID(peer)
except Exception as e:
let errDesription = e.msg
error "error: ", methodName="mailserver-dropPeerByID", errDesription
proc removePeer*(self: MailserverController, peer: string) =
try:
discard status_general.removePeer(peer)
except Exception as e:
let errDesription = e.msg
error "error: ", methodName="mailserver-removePeer", errDesription

View File

@ -1,19 +0,0 @@
import NimQml, json_serialization
import ../../../eventemitter
type
MailserverEvents* = ref object
vptr: ByteAddress
MailserverArgs* = ref object of Args
peer*: string
const EVENTS_SLOT = "receiveEvent"
proc newMailserverEvents*(vptr: ByteAddress): MailserverEvents =
new(result)
result.vptr = vptr
proc emit*(self: MailserverEvents, event: string, arg: MailserverArgs) =
let payload: tuple[event: string, arg: MailserverArgs] = (event, arg)
signal_handler(cast[pointer](self.vptr), Json.encode(payload), EVENTS_SLOT)

View File

@ -1,234 +1,45 @@
import import chronos, chronicles
algorithm, chronos, chronicles, json, math, os, random, sequtils, sets,
tables, strutils
from times import cpuTime
import ../../../fleets/fleet_configuration
import ../../../../../app_service/service/node_configuration/service_interface as node_config_service import ../../../../../app_service/service/node_configuration/service_interface as node_config_service
import controller import controller
import events as mailserver_events
logScope: logScope:
topics = "mailserver model" topics = "mailserver model"
################################################################################
## ##
## NOTE: MailserverModel runs on a separate (long-running) thread ##
## ##
## How do mailservers work ? ##
## ##
## - We send a request to the mailserver, we are only interested in the ##
## messages since `last-request` up to the last seven days ##
## and the last 24 hours for topics that were just joined ##
## - The mailserver doesn't directly respond to the request and ##
## instead we start receiving messages in the filters for the requested ##
## topics. ##
## - If the mailserver was not ready when we tried for instance to request ##
## the history of a topic after joining a chat, the request will be done ##
## as soon as the mailserver becomes available ##
## ##
################################################################################
type type
MailserverModel* = ref object MailserverModel* = ref object
mailservers*: seq[string]
events*: MailserverEvents
nodes*: Table[string, MailserverStatus]
activeMailserver*: string
lastConnectionAttempt*: float
fleetConfiguration*: FleetConfiguration
wakuVersion*: int
mailserverController: MailserverController mailserverController: MailserverController
MailserverStatus* = enum
Unknown = -1,
Disconnected = 0,
Connecting = 1
Connected = 2,
proc peerIdFromMultiAddress(nodeAddr: string): string =
let multiAddressParts = nodeAddr.split("/")
return multiAddressParts[multiAddressParts.len - 1]
proc cmpMailserverReply(x, y: (string, int)): int =
if x[1] > y[1]: 1
elif x[1] == y[1]: 0
else: -1
proc poolSize(fleetSize: int): int = ceil(fleetSize / 4).int
proc newMailserverModel*(vptr: ByteAddress): MailserverModel = proc newMailserverModel*(vptr: ByteAddress): MailserverModel =
result = MailserverModel() result = MailserverModel()
result.events = newMailserverEvents(vptr)
result.nodes = initTable[string, MailserverStatus]()
result.activeMailserver = ""
result.mailserverController = cast[MailserverController](vptr) result.mailserverController = cast[MailserverController](vptr)
proc init*(self: MailserverModel) = proc disconnectActiveMailserver(self: MailserverModel) =
trace "MailserverModel::init()" try:
let fleets = warn "Disconnecting active mailserver due to error"
if defined(windows) and defined(production): self.mailserverController.disconnectActiveMailserver()
"/../resources/fleets.json" except Exception as e:
else: error "error: ", errDescription=e.msg
"/../fleets.json"
let fleetConfig = readFile(joinPath(getAppDir(), fleets))
self.fleetConfiguration = newFleetConfiguration(fleetConfig)
self.wakuVersion = self.mailserverController.getWakuVersion()
let fleet = parseEnum[Fleet](self.mailserverController.getFleet())
self.mailservers = toSeq(self.fleetConfiguration.getMailservers(fleet, self.wakuVersion == WAKU_VERSION_2).values)
let mailservers = self.mailserverController.getCurrentMailservers()
for m in mailservers:
self.mailservers.add(m["address"].getStr())
proc getActiveMailserver*(self: MailserverModel): string = self.activeMailserver
proc isActiveMailserverAvailable*(self: MailserverModel): bool =
if not self.nodes.hasKey(self.activeMailserver):
result = false
else:
result = self.nodes[self.activeMailserver] == MailserverStatus.Connected
proc connect(self: MailserverModel, nodeAddr: string) =
debug "Connecting to mailserver", nodeAddr
var connected = false
# TODO: this should come from settings
var knownMailservers = initHashSet[string]()
for m in self.mailservers:
knownMailservers.incl m
if not knownMailservers.contains(nodeAddr):
warn "Mailserver not known", nodeAddr
return
self.activeMailserver = if self.wakuVersion == WAKU_VERSION_2: peerIdFromMultiAddress(nodeAddr) else: nodeAddr
self.events.emit("mailserver:changed", MailserverArgs(peer: nodeAddr))
# Adding a peer and marking it as connected can't be executed sync in WakuV1, because
# There's a delay between requesting a peer being added, and a signal being
# received after the peer was added. So we first set the peer status as
# Connecting and once a peerConnected signal is received, we mark it as
# Connected
if self.nodes.hasKey(self.activeMailserver) and self.nodes[self.activeMailserver] == MailserverStatus.Connected:
connected = true
else:
# Attempt to connect to mailserver by adding it as a peer
if self.wakuVersion == WAKU_VERSION_2:
if self.mailserverController.dialPeer(nodeAddr): # WakuV2 dial is sync (should it be async?)
self.mailserverController.setMailserver(self.activeMailserver)
self.nodes[self.activeMailserver] = MailserverStatus.Connected
connected = true
else:
self.mailserverController.update(nodeAddr)
self.nodes[nodeAddr] = MailserverStatus.Connecting
self.lastConnectionAttempt = cpuTime()
if connected:
info "Mailserver available"
self.events.emit("mailserverAvailable", MailserverArgs())
proc peerSummaryChange*(self: MailserverModel, peers: seq[string]) =
# When a node is added as a peer, or disconnected
# a DiscoverySummary signal is emitted. In here we
# change the status of the nodes the app is connected to
# Connected / Disconnected and emit peerConnected / peerDisconnected
# events.
var mailserverAvailable = false
for knownPeer in self.nodes.keys:
if not peers.contains(knownPeer) and (self.nodes[knownPeer] == MailserverStatus.Connected or (self.nodes[knownPeer] == MailserverStatus.Connecting and (cpuTime() - self.lastConnectionAttempt) > 8)):
info "Peer disconnected", peer=knownPeer
self.nodes[knownPeer] = MailserverStatus.Disconnected
self.events.emit("peerDisconnected", MailserverArgs(peer: knownPeer))
if self.activeMailserver == knownPeer:
warn "Active mailserver disconnected!", peer = knownPeer
self.activeMailserver = ""
for peer in peers:
if self.nodes.hasKey(peer) and (self.nodes[peer] == MailserverStatus.Connected): continue
info "Peer connected", peer
self.nodes[peer] = MailserverStatus.Connected
self.events.emit("peerConnected", MailserverArgs(peer: peer))
if peer == self.activeMailserver:
if self.nodes.hasKey(self.activeMailserver):
if self.activeMailserver == peer:
mailserverAvailable = true
if mailserverAvailable:
info "Mailserver available"
self.events.emit("mailserverAvailable", MailserverArgs())
proc requestMessages*(self: MailserverModel) = proc requestMessages*(self: MailserverModel) =
info "Requesting messages from", mailserver=self.activeMailserver try:
info "Requesting message history"
self.mailserverController.requestAllHistoricMessages() self.mailserverController.requestAllHistoricMessages()
except Exception as e:
proc requestStoreMessages*(self: MailserverModel, topics: seq[string], fromValue: int64 = 0, toValue: int64 = 0, force: bool = false) = error "error: ", errDescription=e.msg
info "Requesting messages from", mailserver=self.activeMailserver self.disconnectActiveMailserver()
let generatedSymKey = self.mailserverController.generateSymKeyFromPassword()
self.mailserverController.requestStoreMessages(topics, generatedSymKey, self.activeMailserver, 1000, fromValue, toValue, force)
proc requestMoreMessages*(self: MailserverModel, chatId: string) = proc requestMoreMessages*(self: MailserverModel, chatId: string) =
info "Requesting more messages from", mailserver=self.activeMailserver, chatId=chatId try:
info "Requesting more messages for", chatId=chatId
self.mailserverController.syncChatFromSyncedFrom(chatId) self.mailserverController.syncChatFromSyncedFrom(chatId)
except Exception as e:
error "error: ", errDescription=e.msg
self.disconnectActiveMailserver()
proc fillGaps*(self: MailserverModel, chatId: string, messageIds: seq[string]) = proc fillGaps*(self: MailserverModel, chatId: string, messageIds: seq[string]) =
info "Requesting fill gaps from", mailserver=self.activeMailserver, chatId=chatId try:
info "Requesting fill gaps from", chatId=chatId
self.mailserverController.fillGaps(chatId, messageIds) self.mailserverController.fillGaps(chatId, messageIds)
except Exception as e:
proc findNewMailserver(self: MailserverModel) = error "error: ", errDescription=e.msg
warn "Finding a new mailserver...", wakuVersion=self.wakuVersion self.disconnectActiveMailserver()
let mailserversReply = self.mailserverController.ping(self.mailservers, 500, self.wakuVersion == WAKU_VERSION_2)
var availableMailservers:seq[(string, int)] = @[]
for reply in mailserversReply:
if(reply["error"].kind != JNull): continue # The results with error are ignored
availableMailservers.add((reply["address"].getStr, reply["rttMs"].getInt))
availableMailservers.sort(cmpMailserverReply)
# No mailservers where returned... do nothing.
if availableMailservers.len == 0:
warn "No mailservers available"
return
# Picks a random mailserver amongs the ones with the lowest latency
# The pool size is 1/4 of the mailservers were pinged successfully
randomize()
let mailServer = availableMailservers[rand(poolSize(availableMailservers.len - 1))][0]
self.connect(mailserver)
proc cycleMailservers(self: MailserverModel) =
warn "Automatically switching mailserver"
if self.activeMailserver != "":
info "Disconnecting active mailserver", peer=self.activeMailserver
self.nodes[self.activeMailserver] = MailserverStatus.Disconnected
if self.wakuVersion == WAKU_VERSION_2:
self.mailserverController.dropPeerByID(self.activeMailserver)
else:
self.mailserverController.removePeer(self.activeMailserver)
self.activeMailserver = ""
self.findNewMailserver()
proc checkConnection*(self: MailserverModel) {.async.} =
while true:
info "Verifying mailserver connection state..."
var pinnedMailserver = ""
if(not self.mailserverController.isNil):
pinnedMailserver = self.mailserverController.getPinnedMailserver()
if self.wakuVersion == WAKU_VERSION_1 and pinnedMailserver != "" and self.activeMailserver != pinnedMailserver:
# connect to current mailserver from the settings
self.mailservers.add(pinnedMailserver)
self.connect(pinnedMailserver)
else:
# or setup a random mailserver:
if not self.isActiveMailserverAvailable:
# TODO: have a timeout for reconnection before changing to a different server
self.cycleMailservers()
await sleepAsync(10.seconds)

View File

@ -85,12 +85,6 @@ proc processMessage(mailserverModel: MailserverModel, received: string) =
mailserverModel.requestMessages() mailserverModel.requestMessages()
taskArg.finish("") # TODO: taskArg.finish("") # TODO:
of "isActiveMailserverAvailable":
let
taskArg = decode[IsActiveMailserverAvailableTaskArg](received)
output = mailserverModel.isActiveMailserverAvailable()
taskArg.finish(output)
of "requestMessages": of "requestMessages":
let taskArg = decode[RequestMessagesTaskArg](received) let taskArg = decode[RequestMessagesTaskArg](received)
mailserverModel.requestMessages() mailserverModel.requestMessages()
@ -103,16 +97,6 @@ proc processMessage(mailserverModel: MailserverModel, received: string) =
let taskArg = decode[FillGapsTaskArg](received) let taskArg = decode[FillGapsTaskArg](received)
mailserverModel.fillGaps(taskArg.chatId, taskArg.messageIds) mailserverModel.fillGaps(taskArg.chatId, taskArg.messageIds)
of "getActiveMailserver":
let
taskArg = decode[GetActiveMailserverTaskArg](received)
output = mailserverModel.getActiveMailserver()
taskArg.finish(output)
of "peerSummaryChange":
let taskArg = decode[PeerSummaryChangeTaskArg](received)
mailserverModel.peerSummaryChange(taskArg.peers)
else: else:
error "unknown message", message=received error "unknown message", message=received
@ -140,9 +124,6 @@ proc worker(arg: WorkerThreadArg) {.async, gcsafe, nimcall.} =
else: else:
unprocessedMsgs.add received unprocessedMsgs.add received
mailserverModel.init()
discard mailserverModel.checkConnection()
for msg in unprocessedMsgs.items: for msg in unprocessedMsgs.items:
mailserverModel.processMessage(msg) mailserverModel.processMessage(msg)

View File

@ -126,6 +126,7 @@ var NODE_CONFIG* = %* {
} }
}, },
"ShhextConfig": { "ShhextConfig": {
"EnableMailserverCycle": true,
"BackupDisabledDataDir": "./", "BackupDisabledDataDir": "./",
"DataSyncEnabled": true, "DataSyncEnabled": true,
"InstallationID": "aef27732-8d86-5039-a32e-bdbe094d8791", "InstallationID": "aef27732-8d86-5039-a32e-bdbe094d8791",
@ -137,6 +138,12 @@ var NODE_CONFIG* = %* {
"VerifyTransactionChainID": 1, "VerifyTransactionChainID": 1,
"VerifyTransactionURL": "https://mainnet.infura.io/v3/" & INFURA_TOKEN_RESOLVED "VerifyTransactionURL": "https://mainnet.infura.io/v3/" & INFURA_TOKEN_RESOLVED
}, },
"Web3ProviderConfig": {
"Enabled": true
},
"EnsConfig": {
"Enabled": true
},
"StatusAccountsConfig": { "StatusAccountsConfig": {
"Enabled": true "Enabled": true
}, },

View File

@ -298,8 +298,26 @@ method login*(self: Service, account: AccountDto, password: string): string =
elif(img.imgType == "large"): elif(img.imgType == "large"):
largeImage = img.uri largeImage = img.uri
# This is moved from `status-lib` here
# TODO:
# If you added a new value in the nodeconfig in status-go, old accounts will not have this value, since the node config
# is stored in the database, and it's not easy to migrate using .sql
# While this is fixed, you can add here any missing attribute on the node config, and it will be merged with whatever
# the account has in the db
var nodeCfg = %* {
"ShhextConfig": %* {
"EnableMailserverCycle": true
},
"Web3ProviderConfig": %* {
"Enabled": true
},
"EnsConfig": %* {
"Enabled": true
},
}
let response = status_account.login(account.name, account.keyUid, hashedPassword, account.identicon, thumbnailImage, let response = status_account.login(account.name, account.keyUid, hashedPassword, account.identicon, thumbnailImage,
largeImage) largeImage, $nodeCfg)
var error = "response doesn't contain \"error\"" var error = "response doesn't contain \"error\""
if(response.result.contains("error")): if(response.result.contains("error")):

View File

@ -4,7 +4,6 @@ import ./dto/mailserver as mailserver_dto
import ../../../app/core/signals/types import ../../../app/core/signals/types
import ../../../app/core/fleets/fleet_configuration import ../../../app/core/fleets/fleet_configuration
import ../../../app/core/[main] import ../../../app/core/[main]
import ../../../app/core/tasks/marathon/mailserver/events
import ../../../app/core/tasks/marathon/mailserver/worker import ../../../app/core/tasks/marathon/mailserver/worker
import ../settings/service_interface as settings_service import ../settings/service_interface as settings_service
import ../node_configuration/service_interface as node_configuration_service import ../node_configuration/service_interface as node_configuration_service
@ -54,11 +53,11 @@ QtObject:
self.fetchMailservers() self.fetchMailservers()
proc doConnect(self: Service) = proc doConnect(self: Service) =
self.events.on("mailserver:changed") do(e: Args): self.events.on(SignalType.MailserverChanged.event) do(e: Args):
let receivedData = MailserverArgs(e) let receivedData = MailserverChangedSignal(e)
let peer = receivedData.peer let address = receivedData.address
info "mailserver changed to ", peer info "active mailserver changed", node=address
let data = ActiveMailserverChangedArgs(nodeAddress: peer) let data = ActiveMailserverChangedArgs(nodeAddress: address)
self.events.emit(SIGNAL_ACTIVE_MAILSERVER_CHANGED, data) self.events.emit(SIGNAL_ACTIVE_MAILSERVER_CHANGED, data)
self.events.on(SignalType.HistoryRequestStarted.event) do(e: Args): self.events.on(SignalType.HistoryRequestStarted.event) do(e: Args):
@ -124,13 +123,14 @@ QtObject:
let fleet = self.settingsService.getFleet() let fleet = self.settingsService.getFleet()
discard self.settingsService.unpinMailserver(fleet) discard self.settingsService.unpinMailserver(fleet)
else: else:
let mailserverWorker = self.marathon[MailserverWorker().name] discard # TODO: handle pin mailservers in status-go (in progress)
let task = GetActiveMailserverTaskArg( #let mailserverWorker = self.marathon[MailserverWorker().name]
`method`: "getActiveMailserver", #let task = GetActiveMailserverTaskArg(
vptr: cast[ByteAddress](self.vptr), # `method`: "getActiveMailserver",
slot: "onActiveMailserverResult" # vptr: cast[ByteAddress](self.vptr),
) # slot: "onActiveMailserverResult"
mailserverWorker.start(task) # )
#mailserverWorker.start(task)
proc onActiveMailserverResult*(self: Service, response: string) {.slot.} = proc onActiveMailserverResult*(self: Service, response: string) {.slot.} =
let fleet = self.settingsService.getFleet() let fleet = self.settingsService.getFleet()

View File

@ -87,6 +87,7 @@ type
ShhextConfig* = object ShhextConfig* = object
PFSEnabled*: bool PFSEnabled*: bool
EnableMailserverCycle*: bool
BackupDisabledDataDir*: string BackupDisabledDataDir*: string
InstallationID*: string InstallationID*: string
MailServerConfirmations*: bool MailServerConfirmations*: bool
@ -128,6 +129,12 @@ type
MailserversConfig* = object MailserversConfig* = object
Enabled*: bool Enabled*: bool
Web3ProviderConfig* = object
Enabled*: bool
EnsConfig* = object
Enabled*: bool
SwarmConfig* = object SwarmConfig* = object
Enabled*: bool Enabled*: bool
@ -191,6 +198,8 @@ type
BrowsersConfig*: BrowsersConfig BrowsersConfig*: BrowsersConfig
PermissionsConfig*: PermissionsConfig PermissionsConfig*: PermissionsConfig
MailserversConfig*: MailserversConfig MailserversConfig*: MailserversConfig
Web3ProviderConfig*: Web3ProviderConfig
EnsConfig*: EnsConfig
SwarmConfig*: SwarmConfig SwarmConfig*: SwarmConfig
RegisterTopics*: seq[string] RegisterTopics*: seq[string]
RequireTopics*: RequireTopics RequireTopics*: RequireTopics
@ -323,6 +332,7 @@ proc toWakuConfig*(jsonObj: JsonNode): WakuConfig =
proc toShhextConfig*(jsonObj: JsonNode): ShhextConfig = proc toShhextConfig*(jsonObj: JsonNode): ShhextConfig =
discard jsonObj.getProp("PFSEnabled", result.PFSEnabled) discard jsonObj.getProp("PFSEnabled", result.PFSEnabled)
discard jsonObj.getProp("EnableMailserverCycle", result.EnableMailserverCycle)
discard jsonObj.getProp("BackupDisabledDataDir", result.BackupDisabledDataDir) discard jsonObj.getProp("BackupDisabledDataDir", result.BackupDisabledDataDir)
discard jsonObj.getProp("InstallationID", result.InstallationID) discard jsonObj.getProp("InstallationID", result.InstallationID)
discard jsonObj.getProp("MailServerConfirmations", result.MailServerConfirmations) discard jsonObj.getProp("MailServerConfirmations", result.MailServerConfirmations)
@ -369,6 +379,12 @@ proc toPermissionsConfig*(jsonObj: JsonNode): PermissionsConfig =
proc toMailserversConfig*(jsonObj: JsonNode): MailserversConfig = proc toMailserversConfig*(jsonObj: JsonNode): MailserversConfig =
discard jsonObj.getProp("Enabled", result.Enabled) discard jsonObj.getProp("Enabled", result.Enabled)
proc toWeb3ProviderConfig*(jsonObj: JsonNode): Web3ProviderConfig =
discard jsonObj.getProp("Enabled", result.Enabled)
proc toEnsConfig*(jsonObj: JsonNode): EnsConfig =
discard jsonObj.getProp("Enabled", result.Enabled)
proc toSwarmConfig*(jsonObj: JsonNode): SwarmConfig = proc toSwarmConfig*(jsonObj: JsonNode): SwarmConfig =
discard jsonObj.getProp("Enabled", result.Enabled) discard jsonObj.getProp("Enabled", result.Enabled)
@ -485,6 +501,14 @@ proc toNodeConfigDto*(jsonObj: JsonNode): NodeConfigDto =
if(jsonObj.getProp("MailserversConfig", mailserversConfigObj)): if(jsonObj.getProp("MailserversConfig", mailserversConfigObj)):
result.MailserversConfig = toMailserversConfig(mailserversConfigObj) result.MailserversConfig = toMailserversConfig(mailserversConfigObj)
var web3ProviderConfig: JsonNode
if(jsonObj.getProp("Web3ProviderConfig", web3ProviderConfig)):
result.Web3ProviderConfig = toWeb3ProviderConfig(web3ProviderConfig)
var ensConfig: JsonNode
if(jsonObj.getProp("EnsConfig", ensConfig)):
result.EnsConfig = toEnsConfig(ensConfig)
var swarmConfigObj: JsonNode var swarmConfigObj: JsonNode
if(jsonObj.getProp("SwarmConfig", swarmConfigObj)): if(jsonObj.getProp("SwarmConfig", swarmConfigObj)):
result.SwarmConfig = toSwarmConfig(swarmConfigObj) result.SwarmConfig = toSwarmConfig(swarmConfigObj)