From f8eaedc9a7c5bdfecd7c2e4c7c68c29bb9578c0e Mon Sep 17 00:00:00 2001 From: Felicio Mununga Date: Thu, 9 Jun 2022 18:23:31 +0200 Subject: [PATCH] add packages/status-js/src/client/community/handle-waku-message.ts --- packages/status-js/src/client.ts | 18 +- .../src/client/community/community.ts | 176 +++++++---- .../client/community/handle-waku-message.ts | 280 ++++++++++++++++++ packages/status-js/src/debug.ts | 1 + 4 files changed, 409 insertions(+), 66 deletions(-) diff --git a/packages/status-js/src/client.ts b/packages/status-js/src/client.ts index aeff3b1b..178778a0 100644 --- a/packages/status-js/src/client.ts +++ b/packages/status-js/src/client.ts @@ -1,10 +1,13 @@ -// todo?: already received (by messageId or event) -// todo: ignore not found // todo: handle updates together with fetching history (chat messages) // todo: handle diff waku messages on diff topics -// todo: tests +// todo?: already received (by messageId or event) +// todo: use clock for sorting +// todo: use Map +// todo: write class for channels +// todo: fetch all history until Map; remove end param +// todo: handle -// todo?: use clock for sorting +// todo: tests // todo: handle disconnections; no messages after sleep; libp2p; // todo: identities/members? @@ -14,6 +17,8 @@ // todo?: call onChannel* separately // todo?: ignore messages of not yet approved users // todo?: ignore messages with invalid signature +// todo?: handle encrypted waku messages +// todo?: handle waku messages/events without identity import { hexToBytes } from 'ethereum-cryptography/utils' import { Waku, WakuMessage } from 'js-waku' @@ -22,6 +27,7 @@ import { ApplicationMetadataMessage } from '~/protos/application-metadata-messag import { Account } from './account' import { Community } from './client/community/community' +import { handleWakuMessage } from './client/community/handle-waku-message' export interface ClientOptions { publicKey: string @@ -110,6 +116,10 @@ class Client { await this.waku.relay.send(wakuMesage) } + + public handleWakuMessage = (wakuMessage: WakuMessage): void => { + handleWakuMessage(wakuMessage, this.community, this.account) + } } export async function createClient(options: ClientOptions): Promise { diff --git a/packages/status-js/src/client/community/community.ts b/packages/status-js/src/client/community/community.ts index 50b181d9..949d9713 100644 --- a/packages/status-js/src/client/community/community.ts +++ b/packages/status-js/src/client/community/community.ts @@ -1,4 +1,4 @@ -import { waku_message } from 'js-waku' +import { PageDirection, waku_message } from 'js-waku' import { hexToBytes } from 'js-waku/build/main/lib/utils' import difference from 'lodash/difference' @@ -9,18 +9,16 @@ import { EmojiReaction } from '~/protos/emoji-reaction' import { idToContentTopic } from '../../contentTopic' import { createSymKeyFromPassword } from '../../encryption' import { createChannelContentTopics } from './create-channel-content-topics' -import { fetchChannelChatMessages } from './fetch-channel-chat-messages' -import { handleChannelChatMessage } from './handle-channel-chat-message' -import { handleCommunity } from './handle-community' import type { Client } from '../../client' import type { CommunityDescription } from '../../wire/community_description' import type { Reactions } from './get-reactions' import type { ImageMessage } from '~/src/proto/communities/v1/chat_message' -import type { Waku, WakuMessage } from 'js-waku' +import type { Waku } from 'js-waku' export type CommunityMetadataType = CommunityDescription['proto'] +// todo?: rename to ChatMessageType export type MessageType = ChatMessage & { messageId: string pinned: boolean @@ -36,13 +34,12 @@ export class Community { // fixme! private communityContentTopic!: string private communityDecryptionKey!: Uint8Array - public communityMetadata!: CommunityMetadataType - public channelMessages: Partial<{ [key: string]: MessageType[] }> = {} - private channelMessagesCallbacks: { + public communityMetadata!: CommunityMetadataType // state + public channelMessages: Partial<{ [key: string]: MessageType[] }> = {} // state + public channelMessagesCallbacks: { [key: string]: (messages: MessageType[]) => void } = {} - - private communityCallback: + public communityCallback: | ((community: CommunityMetadataType) => void) | undefined @@ -83,14 +80,15 @@ export class Community { await this.waku.store.queryHistory([this.communityContentTopic], { decryptionKeys: [this.communityDecryptionKey], callback: wakuMessages => { + // todo: iterate from right for (const wakuMessage of wakuMessages.reverse()) { - const message = handleCommunity(wakuMessage) + this.client.handleWakuMessage(wakuMessage) - if (!message) { + if (!this.communityMetadata) { return shouldStop } - communityMetadata = message + communityMetadata = this.communityMetadata shouldStop = true return shouldStop @@ -111,30 +109,46 @@ export class Community { // todo?: keep in state instead and replace the factory const symKey = await createSymKeyFromPassword(id) - return async (options: { start: Date; end?: Date }) => { - const messages = await fetchChannelChatMessages( - this.waku, - symKey, - channelContentTopic, - this.channelMessages[channelId] ?? [], - options, - callback - ) + return async (options: { start: Date }) => { + const startTime = options.start + const endTime = new Date() - if (!messages.length) { - return + const _messages = this.channelMessages[channelId] || [] + + if (_messages.length) { + const oldestMessageTime = new Date(Number(_messages[0].timestamp)) + + if (oldestMessageTime <= options.start) { + callback(_messages) + } } - // state - this.channelMessages[channelId] = messages + await this.waku.store.queryHistory([channelContentTopic], { + timeFilter: { + startTime: startTime, + endTime: endTime, + }, + pageSize: 50, + // most recent page first + pageDirection: PageDirection.BACKWARD, + decryptionKeys: [symKey], + callback: wakuMessages => { + // oldest message first + for (const wakuMessage of wakuMessages) { + this.client.handleWakuMessage(wakuMessage) + } + }, + }) - return + // todo: call abck only if oldestMessageTime has changed + // callback + callback(this.channelMessages[channelId] ?? []) } } private observeCommunity = () => { this.waku.relay.addDecryptionKey(this.communityDecryptionKey) - this.waku.relay.addObserver(this.handleCommunity, [ + this.waku.relay.addObserver(this.client.handleWakuMessage, [ this.communityContentTopic, ]) } @@ -156,7 +170,7 @@ export class Community { }) const contentTopics = await Promise.all(symKeyPromises) - this.waku.relay.addObserver(this.handleMessage, contentTopics) + this.waku.relay.addObserver(this.client.handleWakuMessage, contentTopics) } private unobserveChannelMessages = (chatIds: string[]) => { @@ -165,53 +179,91 @@ export class Community { this.communityPublicKey ) - this.waku.relay.deleteObserver(this.handleMessage, contentTopics) + this.waku.relay.deleteObserver(this.client.handleWakuMessage, contentTopics) } - private handleCommunity = (wakuMessage: WakuMessage) => { - const communityMetadata = handleCommunity(wakuMessage) + public handleCommunityMetadataEvent = ( + communityMetadata: CommunityMetadataType + ) => { + if (this.communityMetadata) { + // Channels + const removedChats = difference( + Object.keys(this.communityMetadata.chats), + Object.keys(communityMetadata.chats) + ) + const addedChats = difference( + Object.keys(communityMetadata.chats), + Object.keys(communityMetadata.chats) + ) - if (!communityMetadata) { - return - } + if (removedChats.length) { + this.unobserveChannelMessages(removedChats) + } - // Channels - const removedChats = difference( - Object.keys(this.communityMetadata.chats), - Object.keys(communityMetadata.chats) - ) - const addedChats = difference( - Object.keys(communityMetadata.chats), - Object.keys(communityMetadata.chats) - ) - - if (removedChats.length) { - this.unobserveChannelMessages(removedChats) - } - - if (addedChats.length) { - this.observeChannelMessages(addedChats) + if (addedChats.length) { + this.observeChannelMessages(addedChats) + } } // Community + // fixme!: set only if updated + this.communityMetadata = communityMetadata this.communityCallback?.(communityMetadata) } - private handleMessage = (wakuMessage: WakuMessage) => { - const messages = handleChannelChatMessage( - wakuMessage, - this.channelMessages, - this.client.account?.publicKey - ) + public handleChannelChatMessageNewEvent = (chatMessage: MessageType) => { + const _messages = this.channelMessages[chatMessage.channelId] || [] - if (!messages.length) { - return + // findIndexLeft + // const index = _messages.findIndex(({ timestamp }) => { + // new Date(Number(timestamp)) > new Date(Number(message.timestamp)) + // }) + // findIndexRight + let messageIndex = _messages.length + while (messageIndex > 0) { + const _message = _messages[messageIndex - 1] + + // if (_message.messageId === chatMessage.messageId) { + // messageIndex = -1 + + // break + // } + + if ( + new Date(Number(_message.timestamp)) <= + new Date(Number(chatMessage.timestamp)) + ) { + break + } + + messageIndex-- } - // state - const channelId = messages[0].channelId + // // already received + // if (messageIndex < 0) { + // return + // } - this.channelMessages[channelId] = messages + // replied + let responsedToMessageIndex = _messages.length + while (--responsedToMessageIndex >= 0) { + const _message = _messages[responsedToMessageIndex] + + if (_message.messageId === chatMessage.responseTo) { + break + } + } + + if (responsedToMessageIndex >= 0) { + chatMessage.responseToMessage = _messages[responsedToMessageIndex] + } + + _messages.splice(messageIndex, 0, chatMessage) + + // state + const channelId = _messages[0].channelId + + this.channelMessages[channelId] = _messages // callback // todo!: review use of ! why not just use messages defined above? diff --git a/packages/status-js/src/client/community/handle-waku-message.ts b/packages/status-js/src/client/community/handle-waku-message.ts index e69de29b..2b3fa27d 100644 --- a/packages/status-js/src/client/community/handle-waku-message.ts +++ b/packages/status-js/src/client/community/handle-waku-message.ts @@ -0,0 +1,280 @@ +// todo: move file +// fixme: relative paths + +import { bytesToHex } from 'ethereum-cryptography/utils' + +import { ApplicationMetadataMessage } from '../../../protos/application-metadata-message' +import { + ChatMessage, + DeleteMessage, + EditMessage, +} from '../../../protos/chat-message' +import { EmojiReaction } from '../../../protos/emoji-reaction' +import { PinMessage } from '../../../protos/pin-message' +import { ProtocolMessage } from '../../../protos/protocol-message' +import { CommunityDescription } from '../../proto/communities/v1/communities' +import { payloadToId } from '../../utils/payload-to-id' +import { recoverPublicKey } from '../../utils/recover-public-key' +import { getChannelId } from './get-channel-id' +import { getReactions } from './get-reactions' +import { mapChatMessage } from './map-chat-message' + +import type { Account } from '../../account' +import type { Community /*, MessageType*/ } from './community' +import type { WakuMessage } from 'js-waku' + +// todo?: return decoded, possibly mapped, event but do not update state and call callback +// todo?: return void +// todo?: return success (e.g. if this handler should be used in Community's init) +// fixme: +/** + * Argument of type '(wakuMessage: WakuMessage, community: Community) => boolean' + * is not assignable to parameter of type '(message: WakuMessage) => void'.ts(2345) + */ +// type HandlerResult = boolean +// type HandlerResult = CommunityDescription | MessageType | undefined + +export function handleWakuMessage( + wakuMessage: WakuMessage, + // state + community: Community, + account?: Account +): void { + // decode (layers) + if (!wakuMessage.payload) { + return + } + + // todo: explain + if (!wakuMessage.signaturePublicKey) { + return + } + + let messageToDecode = wakuMessage.payload + let decodedProtocol + try { + decodedProtocol = ProtocolMessage.decode(messageToDecode) + if (decodedProtocol) { + messageToDecode = decodedProtocol.publicMessage + } + } catch {} + + const decodedMetadata = ApplicationMetadataMessage.decode(messageToDecode) + if (!decodedMetadata.payload) { + return + } + messageToDecode = decodedMetadata.payload + + // recover public key + const publicKey = recoverPublicKey( + decodedMetadata.signature, + decodedMetadata.payload + ) + + // decode, map and handle (events) + switch (decodedMetadata.type) { + case ApplicationMetadataMessage.Type.TYPE_COMMUNITY_DESCRIPTION: { + // decode + const decodedPayload = CommunityDescription.decode(messageToDecode) + + // todo?: don't use class method + // handle (state and callback) + community.handleCommunityMetadataEvent(decodedPayload) + + break + } + + case ApplicationMetadataMessage.Type.TYPE_CHAT_MESSAGE: { + // decode + const decodedPayload = ChatMessage.decode(messageToDecode) + + if (!decodedProtocol) { + break + } + + // TODO?: ignore community.channelMessages which are messageType !== COMMUNITY_CHAT + + // map + // fixme?: handle decodedProtocol.encryptedMessage + const messageId = payloadToId(decodedProtocol.publicMessage, publicKey) + // todo?: use full chatId (incl. pub key) instead + const channelId = getChannelId(decodedPayload.chatId) + + const chatMessage = mapChatMessage(decodedPayload, { + messageId, + channelId, + }) + + // handle + community.handleChannelChatMessageNewEvent(chatMessage) + + break + } + + case ApplicationMetadataMessage.Type.TYPE_EDIT_MESSAGE: { + const decodedPayload = EditMessage.decode(messageToDecode) + + const messageId = decodedPayload.messageId + const channelId = getChannelId(decodedPayload.chatId) + + // todo?: move to class method (e.g. handleChannelChatMessageEditEvent) + const _messages = community.channelMessages[channelId] || [] + + let index = _messages.length + while (--index >= 0) { + const _message = _messages[index] + + if (_message.messageId === messageId) { + break + } + } + + // original not found + if (index < 0) { + break + } + + const _message = _messages[index] + + // todo?: use mapChatMessage + const message = { + ..._message, + // fixme?: other fields that user can edit + text: decodedPayload.text, + } + + _messages[index] = message + + // state + community.channelMessages[channelId] = _messages + + // callback + community.channelMessagesCallbacks[channelId]?.( + community.channelMessages[channelId]! + ) + + break + } + + case ApplicationMetadataMessage.Type.TYPE_DELETE_MESSAGE: { + const decodedPayload = DeleteMessage.decode(messageToDecode) + + const messageId = decodedPayload.messageId + const channelId = getChannelId(decodedPayload.chatId) + + const _messages = community.channelMessages[channelId] || [] + + let index = _messages.length + while (--index >= 0) { + const _message = _messages[index] + + if (_message.messageId === messageId) { + break + } + } + + // original not found + if (index < 0) { + break + } + + // todo?: use delete; set to null + _messages.splice(index, 1) + + // state + community.channelMessages[channelId] = _messages + + // callback + community.channelMessagesCallbacks[channelId]?.( + community.channelMessages[channelId]! + ) + + break + } + + case ApplicationMetadataMessage.Type.TYPE_PIN_MESSAGE: { + const decodedPayload = PinMessage.decode(messageToDecode) + + const messageId = decodedPayload.messageId + const channelId = getChannelId(decodedPayload.chatId) + + const _messages = community.channelMessages[channelId] || [] + + let index = _messages.length + while (--index >= 0) { + const _message = _messages[index] + + if (_message.messageId === messageId) { + break + } + } + + // original not found + if (index < 0) { + break + } + + _messages[index].pinned = Boolean(decodedPayload.pinned) + + // state + community.channelMessages[channelId] = _messages + + // callback + // todo!: review use of ! + community.channelMessagesCallbacks[channelId]?.( + community.channelMessages[channelId]! + ) + + break + } + + case ApplicationMetadataMessage.Type.TYPE_EMOJI_REACTION: { + const decodedPayload = EmojiReaction.decode(messageToDecode) + + const messageId = decodedPayload.messageId + const channelId = getChannelId(decodedPayload.chatId) + + const _messages = community.channelMessages[channelId] || [] + + let index = _messages.length + while (--index >= 0) { + const _message = _messages[index] + + if (_message.messageId === messageId) { + break + } + } + + // original not found + if (index < 0) { + break + } + + const _message = _messages[index] + const isMe = + account?.publicKey === `0x${bytesToHex(wakuMessage.signaturePublicKey)}` + + // fixme! + _messages[index].reactions = getReactions( + decodedPayload, + _message.reactions, + isMe + ) + + // state + community.channelMessages[channelId] = _messages + + // callback + community.channelMessagesCallbacks[channelId]?.( + community.channelMessages[channelId]! + ) + + break + } + + default: + break + } + + return +} diff --git a/packages/status-js/src/debug.ts b/packages/status-js/src/debug.ts index 76012182..50e2a356 100644 --- a/packages/status-js/src/debug.ts +++ b/packages/status-js/src/debug.ts @@ -14,6 +14,7 @@ const CHANNEL_ID = await client.createAccount() const communityMetadata = await client.community.fetchCommunity() + console.log(communityMetadata) const fetchChannelMessages = await client.community.createFetchChannelMessages(CHANNEL_ID, messages =>