diff --git a/src/app/chat/event_handling.nim b/src/app/chat/event_handling.nim index 314af2b0ed..5cc91a7c5a 100644 --- a/src/app/chat/event_handling.nim +++ b/src/app/chat/event_handling.nim @@ -91,7 +91,7 @@ proc handleMailserverEvents(self: ChatController) = topic.lastRequest = times.toUnix(times.getTime()) self.status.mailservers.addMailserverTopic(topic) - if(self.status.mailservers.isSelectedMailserverAvailable): + if(self.status.mailservers.isActiveMailserverAvailable): self.status.mailservers.requestMessages(topics.map(t => t.topic)) self.status.events.on("mailserverAvailable") do(e:Args): diff --git a/src/status/mailservers.nim b/src/status/mailservers.nim index c03f09118f..dbbb838f77 100644 --- a/src/status/mailservers.nim +++ b/src/status/mailservers.nim @@ -1,4 +1,4 @@ -import algorithm, json, random, math, os, tables, sets, chronicles, sequtils, locks, sugar +import algorithm, json, random, math, os, tables, sets, chronicles, sequtils, locks, sugar, times import libstatus/core as status_core import libstatus/chat as status_chat import libstatus/mailservers as status_mailservers @@ -9,17 +9,21 @@ import ../eventemitter # # - We send a request to the mailserver, we are only interested in the # messages since `last-request` up to the last seven days -# and the last 24 hours for topics that were just joined TODO: +# and the last 24 hours for topics that were just joined # - The mailserver doesn't directly respond to the request and # instead we start receiving messages in the filters for the requested # topics. # - If the mailserver was not ready when we tried for instance to request # the history of a topic after joining a chat, the request will be done -# as soon as the mailserver becomes available TODO: +# as soon as the mailserver becomes available logScope: topics = "mailserver-model" +var nodesLock: Lock +var activeMailserverLock: Lock + + type MailserverArg* = ref object of Args peer*: string @@ -34,50 +38,58 @@ type MailserverModel* = ref object events*: EventEmitter nodes*: Table[string, MailserverStatus] - selectedMailserver*: string + activeMailserver*: string topics*: HashSet[string] - connThread*: Thread[ptr MailserverModel] - lock*: Lock + lastConnectionAttempt*: float + proc cmpMailserverReply(x, y: (string, int)): int = if x[1] > y[1]: 1 elif x[1] == y[1]: 0 else: -1 + proc poolSize(fleetSize: int): int = ceil(fleetSize / 4).int + +var mailserverModel: MailServerModel +var modelLock: Lock +var connThread: Thread[void] + + proc newMailserverModel*(events: EventEmitter): MailserverModel = result = MailserverModel() result.events = events result.nodes = initTable[string, MailserverStatus]() - result.selectedMailserver = "" - result.lock.initLock() + result.activeMailserver = "" + + mailserverModel = result -proc trustPeer*(self: MailserverModel, enode:string) = + modelLock.initLock() + nodesLock.initLock() + activeMailserverLock.initLock() + + +proc trustPeer(self: MailserverModel, enode:string) = markTrustedPeer(enode) self.nodes[enode] = MailserverStatus.Trusted - if self.selectedMailserver == enode: - debug "Mailserver available", enode - self.events.emit("mailserverAvailable", Args()) -proc selectedServerStatus*(self: MailserverModel): MailserverStatus = - if self.selectedMailserver == "": MailserverStatus.Unknown - else: self.nodes[self.selectedMailserver] -proc isSelectedMailserverAvailable*(self:MailserverModel): bool = - if not self.nodes.hasKey(self.selectedMailserver): return false - self.nodes[self.selectedMailserver] == MailserverStatus.Trusted +proc isActiveMailserverAvailable*(self:MailserverModel): bool = + activeMailserverLock.acquire() + nodesLock.acquire() -proc addPeer(self:MailserverModel, enode: string) = - addPeer(enode) - update(enode) + if not self.nodes.hasKey(self.activeMailserver): + result = false + else: + result = self.nodes[self.activeMailserver] == MailserverStatus.Trusted -proc removePeer(self:MailserverModel, enode: string) = - removePeer(enode) - delete(enode) + nodesLock.release() + activeMailserverLock.release() -proc connect*(self: MailserverModel, enode: string) = - debug "Connecting to mailserver", enode + +proc connect(self: MailserverModel, enode: string) = + debug "Connecting to mailserver", enode=enode.substr[enode.len-40..enode.len-1] # TODO: this should come from settings var knownMailservers = initHashSet[string]() @@ -87,7 +99,10 @@ proc connect*(self: MailserverModel, enode: string) = warn "Mailserver not known", enode return - self.selectedMailserver = enode + activeMailserverLock.acquire() + nodesLock.acquire() + + self.activeMailserver = enode # Adding a peer and marking it as trusted can't be executed sync, because # There's a delay between requesting a peer being added, and a signal being @@ -95,16 +110,16 @@ proc connect*(self: MailserverModel, enode: string) = # Connecting and once a peerConnected signal is received, we mark it as # Connected and then as Trusted - if self.nodes.hasKey(enode) and self.nodes[enode] == MailserverStatus.Connecting: - if self.nodes[enode] == MailserverStatus.Connected: - self.trustPeer(enode) - else: - # Attempt to connect to mailserver by adding it as a peer - self.nodes[enode] = MailserverStatus.Connecting - self.addPeer(enode) - - # TODO: check if connection is made after a connection timeout? - status_mailservers.update(enode) + # Attempt to connect to mailserver by adding it as a peer + self.nodes[enode] = MailserverStatus.Connecting + addPeer(enode) + self.lastConnectionAttempt = cpuTime() + + nodesLock.release() + activeMailserverLock.release() + + + proc peerSummaryChange*(self: MailserverModel, peers: seq[string]) = # When a node is added as a peer, or disconnected @@ -112,27 +127,43 @@ proc peerSummaryChange*(self: MailserverModel, peers: seq[string]) = # change the status of the nodes the app is connected to # Connected / Disconnected and emit peerConnected / peerDisconnected # events. + var mailserverAvailable = false + withLock nodesLock: + for knownPeer in self.nodes.keys: + if not peers.contains(knownPeer) and self.nodes[knownPeer] != MailserverStatus.Disconnected: + debug "Peer disconnected", peer=knownPeer + self.nodes[knownPeer] = MailserverStatus.Disconnected + self.events.emit("peerDisconnected", MailserverArg(peer: knownPeer)) + withLock activeMailserverLock: + if self.activeMailserver == knownPeer: + warn "Active mailserver disconnected!", peer = knownPeer + self.activeMailserver = "" + + for peer in peers: + if self.nodes.hasKey(peer) and (self.nodes[peer] == MailserverStatus.Connected or self.nodes[peer] == MailserverStatus.Trusted): continue + debug "Peer connected", peer + self.nodes[peer] = MailserverStatus.Connected + self.events.emit("peerConnected", MailserverArg(peer: peer)) - for peer in self.nodes.keys: - if not peers.contains(peer): - self.nodes[peer] = MailserverStatus.Disconnected - self.events.emit("peerDisconnected", MailserverArg(peer: peer)) - # TODO: reconnect peer up to N times on 'peerDisconnected' - - var knownMailservers = initHashSet[string]() - for m in getMailservers(): - knownMailservers.incl m[1] + withLock activeMailserverLock: + if peer == self.activeMailserver: + if self.nodes.hasKey(self.activeMailserver): + self.trustPeer(peer) + if self.activeMailserver == peer: + mailserverAvailable = true + + status_mailservers.update(peer) + + if mailserverAvailable: + debug "Mailserver available" + self.events.emit("mailserverAvailable", Args()) - for peer in peers: - if not knownMailservers.contains(peer): continue - if self.nodes.hasKey(peer) and self.nodes[peer] == MailserverStatus.Trusted: continue - self.nodes[peer] = MailserverStatus.Connected - self.events.emit("peerConnected", MailserverArg(peer: peer)) proc requestMessages*(self: MailserverModel, topics: seq[string], fromValue: int64 = 0, toValue: int64 = 0, force: bool = false) = - debug "Requesting messages from", mailserver=self.selectedMailserver - let generatedSymKey = status_chat.generateSymKeyFromPassword() - status_mailservers.requestMessages(topics, generatedSymKey, self.selectedMailserver, 1000, fromValue, toValue, force) + withLock activeMailserverLock: + debug "Requesting messages from", mailserver=self.activeMailserver + let generatedSymKey = status_chat.generateSymKeyFromPassword() + status_mailservers.requestMessages(topics, generatedSymKey, self.activeMailserver, 1000, fromValue, toValue, force) proc getMailserverTopics*(self: MailserverModel): seq[MailserverTopic] = let response = status_mailservers.getMailserverTopics() @@ -148,6 +179,7 @@ proc getMailserverTopics*(self: MailserverModel): seq[MailserverTopic] = lastRequest: topic["last-request"].getInt )) + proc getMailserverTopicsByChatId*(self: MailserverModel, chatId: string): seq[MailServerTopic] = result = self.getMailserverTopics() .filter(topic => topic.chatIds.contains(chatId)) @@ -155,7 +187,10 @@ proc getMailserverTopicsByChatId*(self: MailserverModel, chatId: string): seq[Ma proc addMailserverTopic*(self: MailserverModel, topic: MailserverTopic) = discard status_mailservers.addMailserverTopic(topic) -proc autoConnect*(self: MailserverModel) = + +proc findNewMailserver(self: MailserverModel) = + warn "Finding a new mailserver..." + let mailserversReply = parseJson(status_mailservers.ping(500))["result"] var availableMailservers:seq[(string, int)] = @[] @@ -170,37 +205,39 @@ proc autoConnect*(self: MailserverModel) = # Picks a random mailserver amongs the ones with the lowest latency # The pool size is 1/4 of the mailservers were pinged successfully randomize() + let mailServer = availableMailservers[rand(poolSize(availableMailservers.len - 1))][0] + self.connect(mailserver) -proc changeMailserver*(self: MailserverModel) = - warn "Automatically switching mailserver" - if self.selectedMailserver != "": - self.nodes[self.selectedMailserver] = MailserverStatus.Disconnected - self.removePeer(self.selectedMailserver) - self.selectedMailserver = "" - self.autoConnect() -proc checkConnection*(mailserverPtr: ptr MailserverModel) {.thread.} = +proc cycleMailservers(self: MailserverModel) = + warn "Automatically switching mailserver" + withLock activeMailserverLock: + if self.activeMailserver != "": + warn "Disconnecting Actime Mailserver", peer=self.activeMailserver + withLock nodesLock: + self.nodes[self.activeMailserver] = MailserverStatus.Disconnected + removePeer(self.activeMailserver) + self.activeMailserver = "" + self.findNewMailserver() + +proc checkConnection() {.thread.} = {.gcsafe.}: #TODO: connect to current mailserver from the settings # or setup a random mailserver: let sleepDuration = 10000 while true: - withLock mailserverPtr[].lock: + debug "Verifying mailserver connection state..." + withLock modelLock: # TODO: have a timeout for reconnection before changing to a different server - if not mailserverPtr[].isSelectedMailserverAvailable: - mailserverPtr[].changeMailserver() - sleep(sleepDuration) + if not mailserverModel.isActiveMailserverAvailable: + mailserverModel.cycleMailservers() + sleep(sleepDuration) + proc init*(self: MailserverModel) = - # Reconnect to peer - # Might be a good idea to have a timeout / limit of max number of reconnect attempts? - self.events.on("peerDisconnected") do(e: Args): self.connect(MailserverArg(e).peer) - - # Peer was added. Mark it as trusted. - self.events.on("peerConnected") do(e: Args): self.trustPeer(MailserverArg(e).peer) - - self.connThread.createThread(checkConnection, self.unsafeAddr) + debug "MailserverModel::init()" + connThread.createThread(checkConnection) \ No newline at end of file