Rotate mailservers when not connected and load old messages
This commit is contained in:
parent
61b70fb969
commit
24c31e04f6
|
@ -29,6 +29,11 @@ proc delete*(self: ChatController) =
|
|||
delete self.variant
|
||||
|
||||
proc handleChatEvents(self: ChatController) =
|
||||
self.status.events.on("messagesLoaded") do(e:Args):
|
||||
for message in MsgsLoadedArgs(e).messages:
|
||||
self.view.pushMessage(message.chatId, message.toChatMessage())
|
||||
|
||||
|
||||
self.status.events.on("messageSent") do(e: Args):
|
||||
var sentMessage = MsgArgs(e)
|
||||
var chatMessage = sentMessage.payload.toChatMessage()
|
||||
|
@ -40,6 +45,7 @@ proc handleChatEvents(self: ChatController) =
|
|||
var channelMessage = ChannelArgs(e)
|
||||
let chatItem = newChatItem(id = channelMessage.channel, channelMessage.chatTypeInt)
|
||||
discard self.view.chats.addChatItemToList(chatItem)
|
||||
self.status.chat.chatMessages(channelMessage.channel)
|
||||
|
||||
self.status.events.on("channelLeft") do(e: Args):
|
||||
discard self.view.chats.removeChatItemFromList(self.view.activeChannel.chatItem.id)
|
||||
|
|
|
@ -49,7 +49,7 @@ QtObject:
|
|||
let signalString = $jsonSignal["type"].getStr
|
||||
|
||||
trace "Raw signal data", data = $jsonSignal
|
||||
|
||||
|
||||
var signalType: SignalType
|
||||
|
||||
try:
|
||||
|
|
|
@ -27,6 +27,9 @@ type ChatArgs* = ref object of Args
|
|||
type TopicArgs* = ref object of Args
|
||||
topics*: seq[string]
|
||||
|
||||
type MsgsLoadedArgs* = ref object of Args
|
||||
messages*: seq[Message]
|
||||
|
||||
type
|
||||
ChatModel* = ref object
|
||||
events*: EventEmitter
|
||||
|
@ -117,3 +120,7 @@ proc sendMessage*(self: ChatModel, chatId: string, msg: string): string =
|
|||
var parsedMessage = parseJson(sentMessage)["result"]["chats"][0]["lastMessage"]
|
||||
self.events.emit("messageSent", MsgArgs(message: msg, chatId: chatId, payload: parsedMessage))
|
||||
sentMessage
|
||||
|
||||
proc chatMessages*(self: ChatModel, chatId: string) =
|
||||
let msgs = status_chat.chatMessages(chatId)
|
||||
self.events.emit("messagesLoaded", MsgsLoadedArgs(messages: msgs))
|
||||
|
|
|
@ -71,8 +71,12 @@ proc loadChats*(): seq[Chat] =
|
|||
if chat.active and chat.chatType != ChatType.Unknown:
|
||||
result.add(jsonChat.toChat)
|
||||
|
||||
proc chatMessages*(chatId: string) =
|
||||
discard callPrivateRPC("chatMessages".prefix, %* [chatId, nil, 20])
|
||||
proc chatMessages*(chatId: string): seq[Message] =
|
||||
result = @[]
|
||||
let rpcResult = parseJson(callPrivateRPC("chatMessages".prefix, %* [chatId, nil, 5]))["result"]
|
||||
if rpcResult["messages"].kind != JNull:
|
||||
for jsonMsg in rpcResult["messages"]:
|
||||
result.add(jsonMsg.toMessage)
|
||||
|
||||
# TODO this probably belongs in another file
|
||||
proc generateSymKeyFromPassword*(): string =
|
||||
|
|
|
@ -36,7 +36,10 @@ proc startMessenger*() =
|
|||
discard callPrivateRPC("startMessenger".prefix)
|
||||
|
||||
proc addPeer*(peer: string) =
|
||||
discard libstatus.addPeer(peer)
|
||||
discard callPrivateRPC("admin_addPeer", %* [peer])
|
||||
|
||||
proc removePeer*(peer: string) =
|
||||
echo "TODO: removePeer"
|
||||
|
||||
proc markTrustedPeer*(peer: string) =
|
||||
discard callPrivateRPC("markTrustedPeer".prefix(false), %* [peer])
|
||||
|
|
|
@ -53,5 +53,8 @@ proc ping*(timeoutMs: int): string =
|
|||
}
|
||||
])
|
||||
|
||||
proc update*(peer: string): string =
|
||||
result = callPrivateRPC("updateMailservers".prefix, %* [[peer]])
|
||||
proc update*(peer: string) =
|
||||
discard callPrivateRPC("updateMailservers".prefix, %* [[peer]])
|
||||
|
||||
proc delete*(peer: string) =
|
||||
discard callPrivateRPC("mailservers_deleteMailserver".prefix, %* [[peer]])
|
|
@ -1,4 +1,4 @@
|
|||
import algorithm, json, random, math
|
||||
import algorithm, json, random, math, os
|
||||
import libstatus/core as status_core
|
||||
import libstatus/chat as status_chat
|
||||
import libstatus/mailservers as status_mailservers
|
||||
|
@ -7,7 +7,7 @@ import sets
|
|||
import chronicles
|
||||
import eventemitter
|
||||
import sequtils
|
||||
|
||||
import locks
|
||||
|
||||
logScope:
|
||||
topics = "mailserver-model"
|
||||
|
@ -28,7 +28,8 @@ type
|
|||
nodes*: Table[string, MailserverStatus]
|
||||
selectedMailserver*: string
|
||||
topics*: HashSet[string]
|
||||
|
||||
connThread*: Thread[ptr MailserverModel]
|
||||
lock*: Lock
|
||||
|
||||
proc cmpMailserverReply(x, y: (string, int)): int =
|
||||
if x[1] > y[1]: 1
|
||||
|
@ -43,6 +44,7 @@ proc newMailserverModel*(events: EventEmitter): MailserverModel =
|
|||
result.nodes = initTable[string, MailserverStatus]()
|
||||
result.selectedMailserver = ""
|
||||
result.topics = initHashSet[string]()
|
||||
result.lock.initLock()
|
||||
|
||||
proc addTopics*(self: MailserverModel, topics: seq[string]) =
|
||||
for t in topics: self.topics.incl(t)
|
||||
|
@ -61,24 +63,43 @@ proc selectedServerStatus*(self: MailserverModel): MailserverStatus =
|
|||
proc isSelectedMailserverAvailable*(self:MailserverModel): bool =
|
||||
self.nodes[self.selectedMailserver] == MailserverStatus.Trusted
|
||||
|
||||
proc addPeer(self:MailserverModel, enode: string) =
|
||||
addPeer(enode)
|
||||
update(enode)
|
||||
|
||||
proc removePeer(self:MailserverModel, enode: string) =
|
||||
removePeer(enode)
|
||||
delete(enode)
|
||||
|
||||
proc connect*(self: MailserverModel, enode: string) =
|
||||
debug "Connecting to mailserver", enode
|
||||
|
||||
# TODO: this should come from settings
|
||||
var knownMailservers = initHashSet[string]()
|
||||
for m in getMailservers():
|
||||
knownMailservers.incl m[1]
|
||||
if not knownMailservers.contains(enode):
|
||||
warn "Mailserver not known", enode
|
||||
return
|
||||
|
||||
self.selectedMailserver = enode
|
||||
if self.nodes.hasKey(enode):
|
||||
if self.nodes[enode] == MailserverStatus.Connected:
|
||||
self.trustPeer(enode)
|
||||
else:
|
||||
self.nodes[enode] = MailserverStatus.Connecting
|
||||
addPeer(enode)
|
||||
# TODO: check if connection is made after a connection timeout?
|
||||
echo status_mailservers.update(enode)
|
||||
self.addPeer(enode)
|
||||
|
||||
# TODO: check if connection is made after a connection timeout?
|
||||
status_mailservers.update(enode)
|
||||
|
||||
proc peerSummaryChange*(self: MailserverModel, peers: seq[string]) =
|
||||
# TODO: check if peer received is a mailserver from the list before doing any operation
|
||||
|
||||
for p in peers:
|
||||
debug "Peer summary", p
|
||||
for peer in self.nodes.keys:
|
||||
if not peers.contains(peer):
|
||||
self.nodes[peer] = MailserverStatus.Disconnected
|
||||
warn "Peer disconnected", peer
|
||||
self.events.emit("peerDisconnected", MailserverArg(peer: peer))
|
||||
# TODO: reconnect peer up to N times on 'peerDisconnected'
|
||||
|
||||
|
@ -93,33 +114,48 @@ proc peerSummaryChange*(self: MailserverModel, peers: seq[string]) =
|
|||
self.nodes[peer] = MailserverStatus.Connected
|
||||
self.events.emit("peerConnected", MailserverArg(peer: peer))
|
||||
|
||||
proc requestMessages*(self: MailserverModel) =
|
||||
debug "Requesting messages from", mailserver=self.selectedMailserver
|
||||
let generatedSymKey = status_chat.generateSymKeyFromPassword()
|
||||
status_chat.requestMessages(toSeq(self.topics), generatedSymKey, self.selectedMailserver, 1000)
|
||||
|
||||
proc init*(self: MailserverModel) =
|
||||
self.events.on("peerConnected") do(e: Args):
|
||||
let arg = MailserverArg(e)
|
||||
self.trustPeer(arg.peer)
|
||||
|
||||
|
||||
#TODO: connect to current mailserver from the settings
|
||||
|
||||
# or setup a random one:
|
||||
proc autoConnect(self: MailserverModel) =
|
||||
let mailserversReply = parseJson(status_mailservers.ping(500))["result"]
|
||||
var availableMailservers:seq[(string, int)] = @[]
|
||||
|
||||
for reply in mailserversReply:
|
||||
if(reply["error"].kind != JNull): continue # The results with error are ignored
|
||||
availableMailservers.add((reply["address"].getStr, reply["rttMs"].getInt))
|
||||
|
||||
availableMailservers.sort(cmpMailserverReply)
|
||||
|
||||
# Picks a random mailserver amongs the ones with the lowest latency
|
||||
# The pool size is 1/4 of the mailservers were pinged successfully
|
||||
randomize()
|
||||
let mailServer = availableMailservers[rand(poolSize(availableMailservers.len))][0]
|
||||
self.connect(mailserver)
|
||||
|
||||
proc requestMessages*(self: MailserverModel) =
|
||||
debug "Requesting messages from", mailserver=self.selectedMailserver
|
||||
let generatedSymKey = status_chat.generateSymKeyFromPassword()
|
||||
status_chat.requestMessages(toSeq(self.topics), generatedSymKey, self.selectedMailserver, 1000)
|
||||
proc changeMailserver*(self: MailserverModel) =
|
||||
warn "Automatically switching mailserver"
|
||||
self.nodes[self.selectedMailserver] = MailserverStatus.Disconnected
|
||||
self.removePeer(self.selectedMailserver)
|
||||
self.selectedMailserver = ""
|
||||
self.autoConnect()
|
||||
|
||||
proc checkConnection*(mailserverPtr: ptr MailserverModel) {.thread.} =
|
||||
let sleepDuration = 10000
|
||||
while true:
|
||||
{.gcsafe.}:
|
||||
withLock mailserverPtr[].lock:
|
||||
sleep(sleepDuration)
|
||||
# TODO: have a timeout for reconnection before changing to a different server
|
||||
if not mailserverPtr[].isSelectedMailserverAvailable:
|
||||
mailserverPtr[].changeMailserver()
|
||||
|
||||
proc init*(self: MailserverModel) =
|
||||
self.events.on("peerDisconnected") do(e: Args): self.connect(MailserverArg(e).peer)
|
||||
self.events.on("peerConnected") do(e: Args): self.trustPeer(MailserverArg(e).peer)
|
||||
|
||||
self.connThread.createThread(checkConnection, self.unsafeAddr)
|
||||
|
||||
#TODO: connect to current mailserver from the settings
|
||||
# or setup a random one:
|
||||
self.autoConnect()
|
||||
|
||||
|
|
Loading…
Reference in New Issue