refactor: improve and simplify mailserver logic to make it thread safe

This commit is contained in:
Richard Ramos 2020-10-24 15:38:48 -04:00 committed by Iuri Matias
parent cc6fb42d14
commit 0d717d7919
2 changed files with 113 additions and 76 deletions

View File

@ -91,7 +91,7 @@ proc handleMailserverEvents(self: ChatController) =
topic.lastRequest = times.toUnix(times.getTime())
self.status.mailservers.addMailserverTopic(topic)
if(self.status.mailservers.isSelectedMailserverAvailable):
if(self.status.mailservers.isActiveMailserverAvailable):
self.status.mailservers.requestMessages(topics.map(t => t.topic))
self.status.events.on("mailserverAvailable") do(e:Args):

View File

@ -1,4 +1,4 @@
import algorithm, json, random, math, os, tables, sets, chronicles, sequtils, locks, sugar
import algorithm, json, random, math, os, tables, sets, chronicles, sequtils, locks, sugar, times
import libstatus/core as status_core
import libstatus/chat as status_chat
import libstatus/mailservers as status_mailservers
@ -9,17 +9,21 @@ import ../eventemitter
#
# - We send a request to the mailserver, we are only interested in the
# messages since `last-request` up to the last seven days
# and the last 24 hours for topics that were just joined TODO:
# and the last 24 hours for topics that were just joined
# - The mailserver doesn't directly respond to the request and
# instead we start receiving messages in the filters for the requested
# topics.
# - If the mailserver was not ready when we tried for instance to request
# the history of a topic after joining a chat, the request will be done
# as soon as the mailserver becomes available TODO:
# as soon as the mailserver becomes available
logScope:
topics = "mailserver-model"
var nodesLock: Lock
var activeMailserverLock: Lock
type
MailserverArg* = ref object of Args
peer*: string
@ -34,50 +38,58 @@ type
MailserverModel* = ref object
events*: EventEmitter
nodes*: Table[string, MailserverStatus]
selectedMailserver*: string
activeMailserver*: string
topics*: HashSet[string]
connThread*: Thread[ptr MailserverModel]
lock*: Lock
lastConnectionAttempt*: float
proc cmpMailserverReply(x, y: (string, int)): int =
if x[1] > y[1]: 1
elif x[1] == y[1]: 0
else: -1
proc poolSize(fleetSize: int): int = ceil(fleetSize / 4).int
var mailserverModel: MailServerModel
var modelLock: Lock
var connThread: Thread[void]
proc newMailserverModel*(events: EventEmitter): MailserverModel =
result = MailserverModel()
result.events = events
result.nodes = initTable[string, MailserverStatus]()
result.selectedMailserver = ""
result.lock.initLock()
result.activeMailserver = ""
proc trustPeer*(self: MailserverModel, enode:string) =
mailserverModel = result
modelLock.initLock()
nodesLock.initLock()
activeMailserverLock.initLock()
proc trustPeer(self: MailserverModel, enode:string) =
markTrustedPeer(enode)
self.nodes[enode] = MailserverStatus.Trusted
if self.selectedMailserver == enode:
debug "Mailserver available", enode
self.events.emit("mailserverAvailable", Args())
proc selectedServerStatus*(self: MailserverModel): MailserverStatus =
if self.selectedMailserver == "": MailserverStatus.Unknown
else: self.nodes[self.selectedMailserver]
proc isSelectedMailserverAvailable*(self:MailserverModel): bool =
if not self.nodes.hasKey(self.selectedMailserver): return false
self.nodes[self.selectedMailserver] == MailserverStatus.Trusted
proc isActiveMailserverAvailable*(self:MailserverModel): bool =
activeMailserverLock.acquire()
nodesLock.acquire()
proc addPeer(self:MailserverModel, enode: string) =
addPeer(enode)
update(enode)
if not self.nodes.hasKey(self.activeMailserver):
result = false
else:
result = self.nodes[self.activeMailserver] == MailserverStatus.Trusted
proc removePeer(self:MailserverModel, enode: string) =
removePeer(enode)
delete(enode)
nodesLock.release()
activeMailserverLock.release()
proc connect*(self: MailserverModel, enode: string) =
debug "Connecting to mailserver", enode
proc connect(self: MailserverModel, enode: string) =
debug "Connecting to mailserver", enode=enode.substr[enode.len-40..enode.len-1]
# TODO: this should come from settings
var knownMailservers = initHashSet[string]()
@ -87,7 +99,10 @@ proc connect*(self: MailserverModel, enode: string) =
warn "Mailserver not known", enode
return
self.selectedMailserver = enode
activeMailserverLock.acquire()
nodesLock.acquire()
self.activeMailserver = enode
# Adding a peer and marking it as trusted can't be executed sync, because
# There's a delay between requesting a peer being added, and a signal being
@ -95,16 +110,16 @@ proc connect*(self: MailserverModel, enode: string) =
# Connecting and once a peerConnected signal is received, we mark it as
# Connected and then as Trusted
if self.nodes.hasKey(enode) and self.nodes[enode] == MailserverStatus.Connecting:
if self.nodes[enode] == MailserverStatus.Connected:
self.trustPeer(enode)
else:
# Attempt to connect to mailserver by adding it as a peer
self.nodes[enode] = MailserverStatus.Connecting
self.addPeer(enode)
addPeer(enode)
self.lastConnectionAttempt = cpuTime()
nodesLock.release()
activeMailserverLock.release()
# TODO: check if connection is made after a connection timeout?
status_mailservers.update(enode)
proc peerSummaryChange*(self: MailserverModel, peers: seq[string]) =
# When a node is added as a peer, or disconnected
@ -112,27 +127,43 @@ proc peerSummaryChange*(self: MailserverModel, peers: seq[string]) =
# change the status of the nodes the app is connected to
# Connected / Disconnected and emit peerConnected / peerDisconnected
# events.
for peer in self.nodes.keys:
if not peers.contains(peer):
self.nodes[peer] = MailserverStatus.Disconnected
self.events.emit("peerDisconnected", MailserverArg(peer: peer))
# TODO: reconnect peer up to N times on 'peerDisconnected'
var knownMailservers = initHashSet[string]()
for m in getMailservers():
knownMailservers.incl m[1]
var mailserverAvailable = false
withLock nodesLock:
for knownPeer in self.nodes.keys:
if not peers.contains(knownPeer) and self.nodes[knownPeer] != MailserverStatus.Disconnected:
debug "Peer disconnected", peer=knownPeer
self.nodes[knownPeer] = MailserverStatus.Disconnected
self.events.emit("peerDisconnected", MailserverArg(peer: knownPeer))
withLock activeMailserverLock:
if self.activeMailserver == knownPeer:
warn "Active mailserver disconnected!", peer = knownPeer
self.activeMailserver = ""
for peer in peers:
if not knownMailservers.contains(peer): continue
if self.nodes.hasKey(peer) and self.nodes[peer] == MailserverStatus.Trusted: continue
if self.nodes.hasKey(peer) and (self.nodes[peer] == MailserverStatus.Connected or self.nodes[peer] == MailserverStatus.Trusted): continue
debug "Peer connected", peer
self.nodes[peer] = MailserverStatus.Connected
self.events.emit("peerConnected", MailserverArg(peer: peer))
withLock activeMailserverLock:
if peer == self.activeMailserver:
if self.nodes.hasKey(self.activeMailserver):
self.trustPeer(peer)
if self.activeMailserver == peer:
mailserverAvailable = true
status_mailservers.update(peer)
if mailserverAvailable:
debug "Mailserver available"
self.events.emit("mailserverAvailable", Args())
proc requestMessages*(self: MailserverModel, topics: seq[string], fromValue: int64 = 0, toValue: int64 = 0, force: bool = false) =
debug "Requesting messages from", mailserver=self.selectedMailserver
withLock activeMailserverLock:
debug "Requesting messages from", mailserver=self.activeMailserver
let generatedSymKey = status_chat.generateSymKeyFromPassword()
status_mailservers.requestMessages(topics, generatedSymKey, self.selectedMailserver, 1000, fromValue, toValue, force)
status_mailservers.requestMessages(topics, generatedSymKey, self.activeMailserver, 1000, fromValue, toValue, force)
proc getMailserverTopics*(self: MailserverModel): seq[MailserverTopic] =
let response = status_mailservers.getMailserverTopics()
@ -148,6 +179,7 @@ proc getMailserverTopics*(self: MailserverModel): seq[MailserverTopic] =
lastRequest: topic["last-request"].getInt
))
proc getMailserverTopicsByChatId*(self: MailserverModel, chatId: string): seq[MailServerTopic] =
result = self.getMailserverTopics()
.filter(topic => topic.chatIds.contains(chatId))
@ -155,7 +187,10 @@ proc getMailserverTopicsByChatId*(self: MailserverModel, chatId: string): seq[Ma
proc addMailserverTopic*(self: MailserverModel, topic: MailserverTopic) =
discard status_mailservers.addMailserverTopic(topic)
proc autoConnect*(self: MailserverModel) =
proc findNewMailserver(self: MailserverModel) =
warn "Finding a new mailserver..."
let mailserversReply = parseJson(status_mailservers.ping(500))["result"]
var availableMailservers:seq[(string, int)] = @[]
@ -170,37 +205,39 @@ proc autoConnect*(self: MailserverModel) =
# Picks a random mailserver amongs the ones with the lowest latency
# The pool size is 1/4 of the mailservers were pinged successfully
randomize()
let mailServer = availableMailservers[rand(poolSize(availableMailservers.len - 1))][0]
self.connect(mailserver)
proc changeMailserver*(self: MailserverModel) =
warn "Automatically switching mailserver"
if self.selectedMailserver != "":
self.nodes[self.selectedMailserver] = MailserverStatus.Disconnected
self.removePeer(self.selectedMailserver)
self.selectedMailserver = ""
self.autoConnect()
proc checkConnection*(mailserverPtr: ptr MailserverModel) {.thread.} =
proc cycleMailservers(self: MailserverModel) =
warn "Automatically switching mailserver"
withLock activeMailserverLock:
if self.activeMailserver != "":
warn "Disconnecting Actime Mailserver", peer=self.activeMailserver
withLock nodesLock:
self.nodes[self.activeMailserver] = MailserverStatus.Disconnected
removePeer(self.activeMailserver)
self.activeMailserver = ""
self.findNewMailserver()
proc checkConnection() {.thread.} =
{.gcsafe.}:
#TODO: connect to current mailserver from the settings
# or setup a random mailserver:
let sleepDuration = 10000
while true:
withLock mailserverPtr[].lock:
debug "Verifying mailserver connection state..."
withLock modelLock:
# TODO: have a timeout for reconnection before changing to a different server
if not mailserverPtr[].isSelectedMailserverAvailable:
mailserverPtr[].changeMailserver()
if not mailserverModel.isActiveMailserverAvailable:
mailserverModel.cycleMailservers()
sleep(sleepDuration)
proc init*(self: MailserverModel) =
# Reconnect to peer
# Might be a good idea to have a timeout / limit of max number of reconnect attempts?
self.events.on("peerDisconnected") do(e: Args): self.connect(MailserverArg(e).peer)
# Peer was added. Mark it as trusted.
self.events.on("peerConnected") do(e: Args): self.trustPeer(MailserverArg(e).peer)
self.connThread.createThread(checkConnection, self.unsafeAddr)
debug "MailserverModel::init()"
connThread.createThread(checkConnection)