Handling peer summary changes (required for mailservers)
This commit is contained in:
parent
366d50bb74
commit
886f58491d
|
@ -1,5 +1,8 @@
|
|||
import NimQml
|
||||
import json, eventemitter, chronicles
|
||||
import ../../status/chat as chat_model
|
||||
import ../../status/mailservers as mailserver_model
|
||||
import ../../signals/types
|
||||
import ../../status/libstatus/types as status_types
|
||||
import ../../signals/types
|
||||
import ../../status/chat
|
||||
|
@ -25,7 +28,7 @@ proc delete*(self: ChatController) =
|
|||
delete self.view
|
||||
delete self.variant
|
||||
|
||||
proc init*(self: ChatController) =
|
||||
proc handleChatEvents(self: ChatController) =
|
||||
self.status.events.on("messageSent") do(e: Args):
|
||||
var sentMessage = MsgArgs(e)
|
||||
var chatMessage = sentMessage.payload.toChatMessage()
|
||||
|
@ -38,13 +41,18 @@ proc init*(self: ChatController) =
|
|||
let chatItem = newChatItem(id = channelMessage.channel, channelMessage.chatTypeInt)
|
||||
discard self.view.chats.addChatItemToList(chatItem)
|
||||
|
||||
self.status.events.on("channelLeft") do(e: Args):
|
||||
self.chatModel.events.on("channelLeft") do(e: Args):
|
||||
discard self.view.chats.removeChatItemFromList(self.view.activeChannel)
|
||||
|
||||
self.status.events.on("activeChannelChanged") do(e: Args):
|
||||
self.chatModel.events.on("activeChannelChanged") do(e: Args):
|
||||
self.view.setActiveChannel(ChannelArgs(e).channel)
|
||||
|
||||
self.status.chat.load()
|
||||
proc init*(self: ChatController) =
|
||||
self.handleChatEvents()
|
||||
|
||||
self.chatModel.init()
|
||||
self.mailserverModel.init()
|
||||
|
||||
self.view.setActiveChannelByIndex(0)
|
||||
|
||||
proc handleMessage(self: ChatController, data: Signal) =
|
||||
|
@ -58,12 +66,13 @@ proc handleMessage(self: ChatController, data: Signal) =
|
|||
let chatMessage = message.toChatMessage()
|
||||
self.view.pushMessage(message.localChatId, chatMessage)
|
||||
|
||||
proc handleWhisperFilter(self: ChatController, data: Signal) =
|
||||
echo "Do something"
|
||||
proc handleDiscoverySummary(self: ChatController, data: Signal) =
|
||||
var discovery = DiscoverySummarySignal(data)
|
||||
self.mailserverModel.peerSummaryChange(discovery.enodes)
|
||||
|
||||
method onSignal(self: ChatController, data: Signal) =
|
||||
case data.signalType:
|
||||
of SignalType.Message: handleMessage(self, data)
|
||||
of SignalType.WhisperFilterAdded: handleWhisperFilter(self, data)
|
||||
of SignalType.DiscoverySummary: handleDiscoverySummary(self, data)
|
||||
else:
|
||||
warn "Unhandled signal received", signalType = data.signalType
|
||||
|
|
|
@ -74,7 +74,7 @@ proc mainProc() =
|
|||
signalController.addSubscriber(SignalType.Wallet, wallet)
|
||||
signalController.addSubscriber(SignalType.Wallet, node)
|
||||
signalController.addSubscriber(SignalType.Message, chat)
|
||||
signalController.addSubscriber(SignalType.WhisperFilterAdded, chat)
|
||||
signalController.addSubscriber(SignalType.DiscoverySummary, chat)
|
||||
signalController.addSubscriber(SignalType.NodeLogin, login)
|
||||
signalController.addSubscriber(SignalType.NodeLogin, onboarding)
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import tables
|
|||
import json
|
||||
import types
|
||||
import messages
|
||||
import discovery
|
||||
import chronicles
|
||||
import whisperFilter
|
||||
import strutils
|
||||
|
@ -69,6 +70,8 @@ QtObject:
|
|||
signal = WalletSignal(content: $jsonSignal)
|
||||
of SignalType.NodeLogin:
|
||||
signal = Json.decode($jsonSignal, NodeSignal)
|
||||
of SignalType.DiscoverySummary:
|
||||
signal = discovery.fromEvent(jsonSignal)
|
||||
else:
|
||||
discard
|
||||
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
import json
|
||||
import types
|
||||
|
||||
proc fromEvent*(jsonSignal: JsonNode): Signal =
|
||||
var signal:DiscoverySummarySignal = DiscoverySummarySignal()
|
||||
if jsonSignal["event"].kind != JNull:
|
||||
for discoveryItem in jsonSignal["event"]:
|
||||
signal.enodes.add(discoveryItem["enode"].getStr)
|
||||
result = signal
|
|
@ -53,6 +53,8 @@ type ChatType* = enum
|
|||
Public = 2,
|
||||
PrivateGroupChat = 3
|
||||
|
||||
proc isOneToOne*(self: ChatType): bool = self == ChatType.OneToOne
|
||||
|
||||
type Chat* = object
|
||||
id*: string # ID is the id of the chat, for public chats it is the name e.g. status, for one-to-one is the hex encoded public key and for group chats is a random uuid appended with the hex encoded pk of the creator of the chat
|
||||
name*: string
|
||||
|
@ -81,4 +83,7 @@ type Filter* = object
|
|||
topic*: string
|
||||
|
||||
type WhisperFilterSignal* = ref object of Signal
|
||||
filters*: seq[Filter]
|
||||
filters*: seq[Filter]
|
||||
|
||||
type DiscoverySummarySignal* = ref object of Signal
|
||||
enodes*: seq[string]
|
|
@ -1,7 +1,9 @@
|
|||
import eventemitter, sets, json, strutils
|
||||
import sequtils
|
||||
import libstatus/utils
|
||||
import libstatus/core as status_core
|
||||
import libstatus/chat as status_chat
|
||||
import libstatus/mailservers as status_mailservers
|
||||
import chronicles
|
||||
import ../signals/types
|
||||
import chat/chat_item
|
||||
|
@ -43,23 +45,18 @@ proc hasChannel*(self: ChatModel, chatId: string): bool =
|
|||
proc getActiveChannel*(self: ChatModel): string =
|
||||
if (self.channels.len == 0): "" else: self.channels.toSeq[self.channels.len - 1]
|
||||
|
||||
proc join*(self: ChatModel, chatId: string, chatTypeInt: ChatType, isNewChat: bool = true) =
|
||||
proc join*(self: ChatModel, chatId: string, chatType: ChatType) =
|
||||
if self.hasChannel(chatId): return
|
||||
|
||||
self.channels.incl chatId
|
||||
|
||||
let generatedSymKey = status_chat.generateSymKeyFromPassword()
|
||||
|
||||
# TODO get this from the connection or something
|
||||
#TODO get this from the connection or something
|
||||
let peer = "enode://44160e22e8b42bd32a06c1532165fa9e096eebedd7fa6d6e5f8bbef0440bc4a4591fe3651be68193a7ec029021cdb496cfe1d7f9f1dc69eb99226e6f39a7a5d4@35.225.221.245:443"
|
||||
|
||||
let oneToOne = isOneToOneChat(chatId)
|
||||
|
||||
if isNewChat: status_chat.saveChat(chatId, oneToOne)
|
||||
|
||||
let filterResult = status_chat.loadFilters(chatId = chatId, oneToOne = oneToOne)
|
||||
|
||||
status_chat.chatMessages(chatId)
|
||||
status_chat.saveChat(chatId, chatType.isOneToOne)
|
||||
let filterResult = status_chat.loadFilters(@[status_chat.buildFilter(chatId = chatId, oneToOne = chatType.isOneToOne)])
|
||||
|
||||
let parsedResult = parseJson(filterResult)["result"]
|
||||
|
||||
|
@ -75,15 +72,44 @@ proc join*(self: ChatModel, chatId: string, chatTypeInt: ChatType, isNewChat: bo
|
|||
else:
|
||||
status_chat.requestMessages(topics, generatedSymKey, peer, 20)
|
||||
|
||||
self.events.emit("channelJoined", ChannelArgs(channel: chatId, chatTypeInt: chatTypeInt))
|
||||
self.events.emit("channelJoined", ChannelArgs(channel: chatId, chatTypeInt: chatType))
|
||||
self.events.emit("activeChannelChanged", ChannelArgs(channel: self.getActiveChannel()))
|
||||
|
||||
|
||||
proc load*(self: ChatModel) =
|
||||
proc init*(self: ChatModel) =
|
||||
let chatList = status_chat.loadChats()
|
||||
let generatedSymKey = status_chat.generateSymKeyFromPassword()
|
||||
|
||||
let peer = "enode://c42f368a23fa98ee546fd247220759062323249ef657d26d357a777443aec04db1b29a3a22ef3e7c548e18493ddaf51a31b0aed6079bd6ebe5ae838fcfaf3a49@178.128.142.54:443"
|
||||
# TODO this is needed for now for the retrieving of past messages. We'll either move or remove it later
|
||||
status_core.addPeer(peer)
|
||||
|
||||
var filters:seq[JsonNode] = @[]
|
||||
for chat in chatList:
|
||||
self.join(chat.id, chat.chatType, false)
|
||||
if self.hasChannel(chat.id): continue
|
||||
filters.add status_chat.buildFilter(chatId = chat.id, oneToOne = chat.chatType.isOneToOne)
|
||||
self.channels.incl chat.id
|
||||
self.events.emit("channelJoined", ChannelArgs(channel: chat.id, chatTypeInt: chat.chatType))
|
||||
self.events.emit("activeChannelChanged", ChannelArgs(channel: self.getActiveChannel()))
|
||||
|
||||
if filters.len == 0: return
|
||||
|
||||
let filterResult = status_chat.loadFilters(filters)
|
||||
|
||||
self.events.emit("chatsLoaded", ChatArgs(chats: chatList))
|
||||
|
||||
let parsedResult = parseJson(filterResult)["result"]
|
||||
var topics = newSeq[string](0)
|
||||
for topicObj in parsedResult:
|
||||
topics.add($topicObj["topic"].getStr)
|
||||
self.filters[$topicObj["chatId"].getStr] = topicObj["filterId"].getStr
|
||||
|
||||
if (topics.len == 0):
|
||||
warn "No topic found for the chat. Cannot load past messages"
|
||||
else:
|
||||
status_chat.requestMessages(topics, generatedSymKey, peer, 20)
|
||||
|
||||
|
||||
proc leave*(self: ChatModel, chatId: string) =
|
||||
status_chat.removeFilters(chatId, self.filters[chatId])
|
||||
status_chat.deactivateChat(chatId)
|
||||
|
|
|
@ -7,21 +7,22 @@ import chronicles
|
|||
import ../../signals/types
|
||||
import ../../signals/messages
|
||||
|
||||
proc loadFilters*(chatId: string, filterId: string = "", symKeyId: string = "", oneToOne: bool = false, identity: string = "", topic: string = "", discovery: bool = false, negotiated: bool = false, listen: bool = true): string =
|
||||
result = callPrivateRPC("loadFilters".prefix, %* [
|
||||
[{
|
||||
"ChatID": chatId, # identifier of the chat
|
||||
"FilterID": filterId, # whisper filter id generated
|
||||
"SymKeyID": symKeyId, # symmetric key id used for symmetric filters
|
||||
"OneToOne": oneToOne, # if asymmetric encryption is used for this chat
|
||||
"Identity": identity, # public key of the other recipient for non-public filters.
|
||||
# FIXME: passing empty string to the topic makes it error
|
||||
# "Topic": topic, # whisper topic
|
||||
"Discovery": discovery,
|
||||
"Negotiated": negotiated,
|
||||
"Listen": listen # whether we are actually listening for messages on this chat, or the filter is only created in order to be able to post on the topic
|
||||
}]
|
||||
])
|
||||
proc buildFilter*(chatId: string, filterId: string = "", symKeyId: string = "", oneToOne: bool = false, identity: string = "", topic: string = "", discovery: bool = false, negotiated: bool = false, listen: bool = true):JsonNode =
|
||||
result = %* {
|
||||
"ChatID": chatId, # identifier of the chat
|
||||
"FilterID": filterId, # whisper filter id generated
|
||||
"SymKeyID": symKeyId, # symmetric key id used for symmetric filters
|
||||
"OneToOne": oneToOne, # if asymmetric encryption is used for this chat
|
||||
"Identity": identity, # public key of the other recipient for non-public filters.
|
||||
# FIXME: passing empty string to the topic makes it error
|
||||
# "Topic": topic, # whisper topic
|
||||
"Discovery": discovery,
|
||||
"Negotiated": negotiated,
|
||||
"Listen": listen # whether we are actually listening for messages on this chat, or the filter is only created in order to be able to post on the topic
|
||||
}
|
||||
|
||||
proc loadFilters*(filters: seq[JsonNode]): string =
|
||||
result = callPrivateRPC("loadFilters".prefix, %* [filters])
|
||||
|
||||
proc removeFilters*(chatId: string, filterId: string) =
|
||||
discard callPrivateRPC("removeFilters".prefix, %* [
|
||||
|
@ -67,7 +68,8 @@ proc loadChats*(): seq[Chat] =
|
|||
if jsonResponse["result"].kind != JNull:
|
||||
for jsonChat in jsonResponse{"result"}:
|
||||
let chat = jsonChat.toChat
|
||||
if chat.active: result.add(jsonChat.toChat)
|
||||
if chat.active and chat.chatType != ChatType.Unknown:
|
||||
result.add(jsonChat.toChat)
|
||||
|
||||
proc chatMessages*(chatId: string) =
|
||||
discard callPrivateRPC("chatMessages".prefix, %* [chatId, nil, 20])
|
||||
|
@ -80,7 +82,7 @@ proc generateSymKeyFromPassword*(): string =
|
|||
]))["result"]).strip(chars = {'"'})
|
||||
|
||||
proc requestMessages*(topics: seq[string], symKeyID: string, peer: string, numberOfMessages: int) =
|
||||
discard callPrivateRPC("requestMessages".prefix, %* [
|
||||
echo callPrivateRPC("requestMessages".prefix, %* [
|
||||
{
|
||||
"topics": topics,
|
||||
"mailServerPeer": peer,
|
||||
|
|
|
@ -27,3 +27,9 @@ proc sendTransaction*(inputJSON: string, password: string): string =
|
|||
|
||||
proc startMessenger*() =
|
||||
discard callPrivateRPC("startMessenger".prefix)
|
||||
|
||||
proc addPeer*(peer: string) =
|
||||
echo libstatus.addPeer(peer)
|
||||
|
||||
proc markTrustedPeer*(peer: string) =
|
||||
echo callPrivateRPC("markTrustedPeer".prefix(false), %* [peer])
|
||||
|
|
|
@ -1,3 +1,7 @@
|
|||
import core
|
||||
import json
|
||||
import utils
|
||||
|
||||
proc getMailservers*(): array[0..8, (string, string)] =
|
||||
result = [
|
||||
(
|
||||
|
@ -37,3 +41,17 @@ proc getMailservers*(): array[0..8, (string, string)] =
|
|||
"enode://44160e22e8b42bd32a06c1532165fa9e096eebedd7fa6d6e5f8bbef0440bc4a4591fe3651be68193a7ec029021cdb496cfe1d7f9f1dc69eb99226e6f39a7a5d4@35.225.221.245:443"
|
||||
)
|
||||
]
|
||||
|
||||
proc ping*(timeoutMs: int): string =
|
||||
var addresses: seq[string] = @[]
|
||||
for mailserver in getMailservers():
|
||||
addresses.add(mailserver[1])
|
||||
result = callPrivateRPC("mailservers_ping", %* [
|
||||
{
|
||||
"addresses": addresses,
|
||||
"timeoutMs": timeoutMs
|
||||
}
|
||||
])
|
||||
|
||||
proc update*(peer: string): string =
|
||||
result = callPrivateRPC("updateMailservers".prefix, %* [[peer]])
|
|
@ -8,8 +8,9 @@ import accounts/signing_phrases
|
|||
proc isWakuEnabled(): bool =
|
||||
true # TODO:
|
||||
|
||||
proc prefix*(methodName: string): string =
|
||||
result = if isWakuEnabled(): "wakuext_" else: "shhext_"
|
||||
proc prefix*(methodName: string, isExt:bool = true): string =
|
||||
result = if isWakuEnabled(): "waku" else: "shh"
|
||||
result = result & (if isExt: "ext_" else: "_")
|
||||
result = result & methodName
|
||||
|
||||
proc isOneToOneChat*(chatId: string): bool =
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
import algorithm, json, random, math
|
||||
import libstatus/core as status_core
|
||||
import libstatus/chat as status_chat
|
||||
import libstatus/mailservers as status_mailservers
|
||||
import tables
|
||||
import chronicles
|
||||
import eventemitter
|
||||
|
||||
|
||||
logScope:
|
||||
topics = "mailserver-model"
|
||||
|
||||
type
|
||||
MailserverArg* = ref object of Args
|
||||
peer*: string
|
||||
|
||||
MailserverStatus* = enum
|
||||
Disconnected = 0,
|
||||
Connecting = 1
|
||||
Connected = 2,
|
||||
Trusted = 3,
|
||||
|
||||
MailserverModel* = ref object
|
||||
events*: EventEmitter
|
||||
nodes*: Table[string, MailserverStatus]
|
||||
|
||||
|
||||
proc cmpMailserverReply(x, y: (string, int)): int =
|
||||
if x[1] > y[1]: 1
|
||||
elif x[1] == y[1]: 0
|
||||
else: -1
|
||||
|
||||
proc poolSize(fleetSize: int): int = ceil(fleetSize / 4).int
|
||||
|
||||
proc newMailserverModel*(): MailserverModel =
|
||||
result = MailserverModel()
|
||||
result.events = createEventEmitter()
|
||||
result.nodes = initTable[string, MailserverStatus]()
|
||||
|
||||
proc trustPeer*(self: MailserverModel, enode:string) =
|
||||
markTrustedPeer(enode)
|
||||
|
||||
proc connect*(self: MailserverModel, enode: string) =
|
||||
if self.nodes.hasKey(enode):
|
||||
if self.nodes[enode] == MailserverStatus.Connected:
|
||||
self.trustPeer(enode)
|
||||
else:
|
||||
self.nodes[enode] = MailserverStatus.Connecting
|
||||
addPeer(enode)
|
||||
# TODO: check if connection is made after a connection timeout?
|
||||
|
||||
echo status_mailservers.update(enode)
|
||||
|
||||
proc peerSummaryChange*(self: MailserverModel, peers: seq[string]) =
|
||||
for peer in self.nodes.keys:
|
||||
if not peers.contains(peer):
|
||||
self.nodes[peer] = MailserverStatus.Disconnected
|
||||
self.events.emit("peerDisconnected", MailserverArg(peer: peer))
|
||||
# TODO: reconnect peer up to N times on 'peerDisconnected'
|
||||
|
||||
for peer in peers:
|
||||
if not self.nodes.hasKey(peer) or self.nodes[peer] == MailserverStatus.Disconnected:
|
||||
self.nodes[peer] = MailserverStatus.Connected
|
||||
self.events.emit("peerConnected", MailserverArg(peer: peer))
|
||||
|
||||
|
||||
proc init*(self: MailserverModel) =
|
||||
self.events.on("peerConnected") do(e: Args):
|
||||
let arg = MailserverArg(e)
|
||||
self.trustPeer(arg.peer)
|
||||
|
||||
|
||||
#TODO: connect to current mailserver from the settings
|
||||
|
||||
# or setup a random one:
|
||||
let mailserversReply = parseJson(status_mailservers.ping(500))["result"]
|
||||
var availableMailservers:seq[(string, int)] = @[]
|
||||
|
||||
for reply in mailserversReply:
|
||||
if(reply["error"].kind != JNull): continue # The results with error are ignored
|
||||
availableMailservers.add((reply["address"].getStr, reply["rttMs"].getInt))
|
||||
|
||||
availableMailservers.sort(cmpMailserverReply)
|
||||
|
||||
# Picks a random mailserver amongs the ones with the lowest latency
|
||||
# The pool size is 1/4 of the mailservers were pinged successfully
|
||||
randomize()
|
||||
let mailServer = availableMailservers[rand(poolSize(availableMailservers.len))][0]
|
||||
self.connect(mailserver)
|
||||
|
||||
|
Loading…
Reference in New Issue