mirror of
https://github.com/status-im/status-web.git
synced 2025-01-26 12:30:45 +00:00
e6680f8e62
* Unify ESLint configuration * Add .eslintignore file * Add Node and Jest ESLint plugins * Fix linting issues * Sort imports and type imports
271 lines
7.3 KiB
TypeScript
271 lines
7.3 KiB
TypeScript
import debug from 'debug'
|
|
import { Waku, WakuMessage } from 'js-waku'
|
|
import { DecryptionMethod } from 'js-waku/build/main/lib/waku_message'
|
|
|
|
import { Chat } from './chat'
|
|
import { ApplicationMetadataMessage_Type } from './proto/status/v1/application_metadata_message'
|
|
import { getLatestUserNickname } from './utils'
|
|
import { ApplicationMetadataMessage } from './wire/application_metadata_message'
|
|
import { ChatMessage } from './wire/chat_message'
|
|
|
|
import type { Identity } from './identity'
|
|
import type { Content } from './wire/chat_message'
|
|
import type { CreateOptions as WakuCreateOptions } from 'js-waku/build/main/lib/waku'
|
|
|
|
// Debug logger for this module, created under the 'communities:messenger' namespace.
const dbg = debug('communities:messenger')
|
|
|
|
/**
 * Chat messenger built on top of a Waku node.
 *
 * It joins chats (registering their symmetric key and a relay observer on the
 * Waku node), sends chat messages, dispatches incoming chat messages to the
 * observers registered per chat id, and can query a Waku Store node for
 * previous messages.
 *
 * Failures are reported by throwing plain strings, as advertised by the
 * `@throws string` tags below.
 */
export class Messenger {
  /** Underlying Waku node used for relay, store and decryption keys. */
  waku: Waku
  /** Chats that have been joined, indexed by chat id. */
  chatsById: Map<string, Chat>
  /** Observers registered through `addObserver`, indexed by chat id. */
  observers: {
    [chatId: string]: Set<
      (
        message: ApplicationMetadataMessage,
        timestamp: Date,
        chatId: string
      ) => void
    >
  }
  /** Identity used to sign outgoing messages; without it `sendMessage` sends nothing. */
  identity: Identity | undefined

  private constructor(identity: Identity | undefined, waku: Waku) {
    this.identity = identity
    this.waku = waku
    this.chatsById = new Map()
    this.observers = {}
  }

  /**
   * Creates a Waku node and wraps it in a new messenger.
   *
   * @param identity Identity used when sending messages, or `undefined`.
   * @param wakuOptions Options forwarded to `Waku.create`. They are merged
   * (shallowly) over the default `{ bootstrap: { default: true } }`.
   */
  public static async create(
    identity: Identity | undefined,
    wakuOptions?: WakuCreateOptions
  ): Promise<Messenger> {
    const _wakuOptions = Object.assign(
      { bootstrap: { default: true } },
      wakuOptions
    )
    const waku = await Waku.create(_wakuOptions)

    return new Messenger(identity, waku)
  }

  /**
   * Joins a public chat using its id.
   *
   * For community chats, prefer [[joinChat]].
   *
   * Use `addObserver` to get messages received on this chat.
   *
   * @throws string If the chat is already joined.
   */
  public async joinChatById(chatId: string): Promise<void> {
    const chat = await Chat.create(chatId)

    await this.joinChat(chat)
  }

  /**
   * Joins several public chats.
   *
   * Use `addObserver` to get messages received on these chats.
   *
   * @throws string If any of the chats is already joined.
   */
  public async joinChats(chats: Iterable<Chat>): Promise<void> {
    await Promise.all(Array.from(chats, chat => this.joinChat(chat)))
  }

  /**
   * Joins a public chat.
   *
   * Registers the chat's symmetric key for its content topic, starts
   * observing that content topic on the relay and records the chat as joined.
   *
   * Use `addObserver` to get messages received on this chat.
   *
   * @throws string If the chat is already joined.
   */
  public async joinChat(chat: Chat): Promise<void> {
    if (this.chatsById.has(chat.id)) {
      throw `Failed to join chat, it is already joined: ${chat.id}`
    }

    this.waku.addDecryptionKey(chat.symKey, {
      method: DecryptionMethod.Symmetric,
      contentTopics: [chat.contentTopic],
    })

    this.waku.relay.addObserver(
      (wakuMessage: WakuMessage) => {
        this._processWakuMessage(chat, wakuMessage, 'Received')
      },
      [chat.contentTopic]
    )

    this.chatsById.set(chat.id, chat)
  }

  /**
   * Sends a message on the given chat Id.
   *
   * Resolves without sending anything when the messenger has no identity.
   *
   * @param chatId Id of a chat that has already been joined.
   * @param content Content of the message.
   * @param responseTo Optional value forwarded to `chat.createMessage`.
   * @throws string If the chat has not been joined.
   */
  public async sendMessage(
    chatId: string,
    content: Content,
    responseTo?: string
  ): Promise<void> {
    const identity = this.identity
    if (!identity) return

    const chat = this.chatsById.get(chatId)
    if (!chat) throw `Failed to send message, chat not joined: ${chatId}`

    const chatMessage = chat.createMessage(content, responseTo)

    const appMetadataMessage = ApplicationMetadataMessage.create(
      chatMessage.encode(),
      ApplicationMetadataMessage_Type.TYPE_CHAT_MESSAGE,
      identity
    )

    const wakuMessage = await WakuMessage.fromBytes(
      appMetadataMessage.encode(),
      chat.contentTopic,
      { symKey: chat.symKey, sigPrivKey: identity.privateKey }
    )

    await this.waku.relay.send(wakuMessage)
  }

  /**
   * Add an observer of new messages received on the given chat id(s).
   *
   * All ids are validated before anything is registered, so when this method
   * throws the observer has not been added to any chat.
   *
   * @param observer Called with the message, its timestamp and the chat id.
   * @param chatId A single chat id or a list of chat ids.
   * @throws string If a chat has not been joined first using [joinChat].
   */
  public addObserver(
    observer: (
      message: ApplicationMetadataMessage,
      timestamp: Date,
      chatId: string
    ) => void,
    chatId: string | string[]
  ): void {
    const chatIds: string[] = typeof chatId === 'string' ? [chatId] : [...chatId]

    // Validate up front: failing half-way through would leave the observer
    // attached to only some of the requested chats.
    for (const id of chatIds) {
      if (!this.chatsById.has(id)) {
        throw 'Cannot add observer on a chat that is not joined.'
      }
    }

    for (const id of chatIds) {
      let chatObservers = this.observers[id]
      if (!chatObservers) {
        chatObservers = new Set()
        this.observers[id] = chatObservers
      }

      chatObservers.add(observer)
    }
  }

  /**
   * Delete an observer of new messages received on the given chat id.
   *
   * Does nothing when no observer was ever registered for that chat id.
   */
  deleteObserver(
    observer: (message: ApplicationMetadataMessage) => void,
    chatId: string
  ): void {
    this.observers[chatId]?.delete(observer)
  }

  /**
   * Stops the messenger by stopping the underlying Waku node.
   */
  public async stop(): Promise<void> {
    await this.waku.stop()
  }

  /**
   * Retrieve previous messages from a Waku Store node for the given chat Id.
   *
   * Every retrieved chat message is also handled like a freshly received one
   * (passed to the chat and to the observers of that chat), whether or not a
   * `callback` is given. Messages that cannot be decoded or that have an
   * unsupported type are skipped.
   *
   * Note: not sure what is the preferred interface: callback or returning all messages
   * Callback is more flexible and allow processing messages as they are retrieved instead of waiting for the
   * full retrieval via paging to be done.
   *
   * @param chatId Id of a chat that has already been joined.
   * @param startTime Start of the time filter passed to the store query.
   * @param endTime End of the time filter passed to the store query.
   * @param callback Optional; called with the chat messages of each batch
   * handed over by the store query.
   * @returns The number of Waku messages returned by the store query.
   * @throws string If the chat has not been joined.
   */
  public async retrievePreviousMessages(
    chatId: string,
    startTime: Date,
    endTime: Date,
    callback?: (messages: ApplicationMetadataMessage[]) => void
  ): Promise<number> {
    const chat = this.chatsById.get(chatId)
    if (!chat) {
      throw `Failed to retrieve messages, chat is not joined: ${chatId}`
    }

    const _callback = (wakuMessages: WakuMessage[]): void => {
      const messages: ApplicationMetadataMessage[] = []

      // Always process the batch, even without a callback, so that the chat
      // and its observers see the retrieved messages.
      for (const wakuMessage of wakuMessages) {
        const message = this._processWakuMessage(chat, wakuMessage, 'Retrieved')
        if (message) messages.push(message)
      }

      if (callback) {
        callback(messages)
      }
    }

    const allMessages = await this.waku.store.queryHistory(
      [chat.contentTopic],
      {
        timeFilter: { startTime, endTime },
        callback: _callback,
      }
    )

    return allMessages.length
  }

  /**
   * Runs `decode` and returns its result, or `undefined` when it throws.
   *
   * Payloads come from the network, so a decoding failure is logged and the
   * message is dropped instead of letting the error escape.
   */
  private static _tryDecode<T>(
    decode: () => T,
    description: string
  ): T | undefined {
    try {
      return decode()
    } catch (error) {
      dbg(`Failed to decode ${description}, ignoring it`, error)
      return undefined
    }
  }

  /**
   * Decodes a Waku message and, when it is a chat message, handles it.
   *
   * @param action Verb used in the debug log for unsupported message types.
   * @returns The decoded message when it is a chat message, `undefined` when
   * the Waku message has no payload or timestamp, cannot be decoded, or has
   * an unsupported type.
   */
  private _processWakuMessage(
    chat: Chat,
    wakuMessage: WakuMessage,
    action: 'Received' | 'Retrieved'
  ): ApplicationMetadataMessage | undefined {
    const { payload, timestamp } = wakuMessage
    if (!payload || !timestamp) return undefined

    const message = Messenger._tryDecode(
      () => ApplicationMetadataMessage.decode(payload),
      'application metadata message'
    )
    if (!message) return undefined

    switch (message.type) {
      case ApplicationMetadataMessage_Type.TYPE_CHAT_MESSAGE:
        this._handleNewChatMessage(chat, message, timestamp)
        return message
      default:
        dbg(`${action} unsupported message type`, message.type)
        return undefined
    }
  }

  /**
   * Passes a chat message to its chat and notifies the observers of that chat.
   *
   * Messages without payload, type or signature, and messages whose payload
   * cannot be decoded as a chat message, are ignored.
   */
  private _handleNewChatMessage(
    chat: Chat,
    message: ApplicationMetadataMessage,
    timestamp: Date
  ): void {
    const payload = message.payload
    if (!payload || !message.type || !message.signature) return

    const chatMessage = Messenger._tryDecode(
      () => ChatMessage.decode(payload),
      'chat message'
    )
    if (!chatMessage) return

    chat.handleNewMessage(chatMessage)

    const chatObservers = this.observers[chat.id]
    if (chatObservers) {
      chatObservers.forEach(observer => {
        observer(message, timestamp, chat.id)
      })
    }
  }

  /**
   * Checks whether a user can be found on the Waku network.
   *
   * @returns `true` when `getLatestUserNickname` reports, for this public
   * key, a clock greater than 0 and a non-empty nickname.
   */
  async checkIfUserInWakuNetwork(publicKey: Uint8Array): Promise<boolean> {
    const { clock, nickname } = await getLatestUserNickname(
      publicKey,
      this.waku
    )

    return clock > 0 && nickname !== ''
  }
}
|