add packages/status-js/src/client/community/handle-waku-message.ts

This commit is contained in:
Felicio Mununga 2022-06-09 18:23:31 +02:00
parent c3840f8725
commit faa60f6ad4
No known key found for this signature in database
GPG Key ID: 0EB8D75C775AB6F1
4 changed files with 409 additions and 66 deletions

View File

@ -1,10 +1,13 @@
// todo?: already received (by messageId or event)
// todo: ignore not found
// todo: handle updates together with fetching history (chat messages)
// todo: handle diff waku messages on diff topics
// todo: tests
// todo?: already received (by messageId or event)
// todo: use clock for sorting
// todo: use Map
// todo: write class for channels
// todo: fetch all history until Map; remove end param
// todo: handle
// todo?: use clock for sorting
// todo: tests
// todo: handle disconnections; no messages after sleep; libp2p;
// todo: identities/members?
@ -14,6 +17,8 @@
// todo?: call onChannel* separately
// todo?: ignore messages of not yet approved users
// todo?: ignore messages with invalid signature
// todo?: handle encrypted waku messages
// todo?: handle waku messages/events without identity
import { hexToBytes } from 'ethereum-cryptography/utils'
import { Waku, WakuMessage } from 'js-waku'
@ -22,6 +27,7 @@ import { ApplicationMetadataMessage } from '~/protos/application-metadata-messag
import { Account } from './account'
import { Community } from './client/community/community'
import { handleWakuMessage } from './client/community/handle-waku-message'
export interface ClientOptions {
publicKey: string
@ -110,6 +116,10 @@ class Client {
await this.waku.relay.send(wakuMesage)
}
public handleWakuMessage = (wakuMessage: WakuMessage): void => {
handleWakuMessage(wakuMessage, this.community, this.account)
}
}
export async function createClient(options: ClientOptions): Promise<Client> {

View File

@ -1,4 +1,4 @@
import { waku_message } from 'js-waku'
import { PageDirection, waku_message } from 'js-waku'
import { hexToBytes } from 'js-waku/build/main/lib/utils'
import difference from 'lodash/difference'
@ -9,18 +9,16 @@ import { EmojiReaction } from '~/protos/emoji-reaction'
import { idToContentTopic } from '../../contentTopic'
import { createSymKeyFromPassword } from '../../encryption'
import { createChannelContentTopics } from './create-channel-content-topics'
import { fetchChannelChatMessages } from './fetch-channel-chat-messages'
import { handleChannelChatMessage } from './handle-channel-chat-message'
import { handleCommunity } from './handle-community'
import type { Client } from '../../client'
import type { CommunityDescription } from '../../wire/community_description'
import type { Reactions } from './get-reactions'
import type { ImageMessage } from '~/src/proto/communities/v1/chat_message'
import type { Waku, WakuMessage } from 'js-waku'
import type { Waku } from 'js-waku'
export type CommunityMetadataType = CommunityDescription['proto']
// todo?: rename to ChatMessageType
export type MessageType = ChatMessage & {
messageId: string
pinned: boolean
@ -36,13 +34,12 @@ export class Community {
// fixme!
private communityContentTopic!: string
private communityDecryptionKey!: Uint8Array
public communityMetadata!: CommunityMetadataType
public channelMessages: Partial<{ [key: string]: MessageType[] }> = {}
private channelMessagesCallbacks: {
public communityMetadata!: CommunityMetadataType // state
public channelMessages: Partial<{ [key: string]: MessageType[] }> = {} // state
public channelMessagesCallbacks: {
[key: string]: (messages: MessageType[]) => void
} = {}
private communityCallback:
public communityCallback:
| ((community: CommunityMetadataType) => void)
| undefined
@ -83,14 +80,15 @@ export class Community {
await this.waku.store.queryHistory([this.communityContentTopic], {
decryptionKeys: [this.communityDecryptionKey],
callback: wakuMessages => {
// todo: iterate from right
for (const wakuMessage of wakuMessages.reverse()) {
const message = handleCommunity(wakuMessage)
this.client.handleWakuMessage(wakuMessage)
if (!message) {
if (!this.communityMetadata) {
return shouldStop
}
communityMetadata = message
communityMetadata = this.communityMetadata
shouldStop = true
return shouldStop
@ -111,30 +109,46 @@ export class Community {
// todo?: keep in state instead and replace the factory
const symKey = await createSymKeyFromPassword(id)
return async (options: { start: Date; end?: Date }) => {
const messages = await fetchChannelChatMessages(
this.waku,
symKey,
channelContentTopic,
this.channelMessages[channelId] ?? [],
options,
callback
)
return async (options: { start: Date }) => {
const startTime = options.start
const endTime = new Date()
if (!messages.length) {
return
const _messages = this.channelMessages[channelId] || []
if (_messages.length) {
const oldestMessageTime = new Date(Number(_messages[0].timestamp))
if (oldestMessageTime <= options.start) {
callback(_messages)
}
}
// state
this.channelMessages[channelId] = messages
await this.waku.store.queryHistory([channelContentTopic], {
timeFilter: {
startTime: startTime,
endTime: endTime,
},
pageSize: 50,
// most recent page first
pageDirection: PageDirection.BACKWARD,
decryptionKeys: [symKey],
callback: wakuMessages => {
// oldest message first
for (const wakuMessage of wakuMessages) {
this.client.handleWakuMessage(wakuMessage)
}
},
})
return
// todo: call abck only if oldestMessageTime has changed
// callback
callback(this.channelMessages[channelId] ?? [])
}
}
private observeCommunity = () => {
this.waku.relay.addDecryptionKey(this.communityDecryptionKey)
this.waku.relay.addObserver(this.handleCommunity, [
this.waku.relay.addObserver(this.client.handleWakuMessage, [
this.communityContentTopic,
])
}
@ -156,7 +170,7 @@ export class Community {
})
const contentTopics = await Promise.all(symKeyPromises)
this.waku.relay.addObserver(this.handleMessage, contentTopics)
this.waku.relay.addObserver(this.client.handleWakuMessage, contentTopics)
}
private unobserveChannelMessages = (chatIds: string[]) => {
@ -165,16 +179,13 @@ export class Community {
this.communityPublicKey
)
this.waku.relay.deleteObserver(this.handleMessage, contentTopics)
}
private handleCommunity = (wakuMessage: WakuMessage) => {
const communityMetadata = handleCommunity(wakuMessage)
if (!communityMetadata) {
return
this.waku.relay.deleteObserver(this.client.handleWakuMessage, contentTopics)
}
public handleCommunityMetadataEvent = (
communityMetadata: CommunityMetadataType
) => {
if (this.communityMetadata) {
// Channels
const removedChats = difference(
Object.keys(this.communityMetadata.chats),
@ -192,26 +203,67 @@ export class Community {
if (addedChats.length) {
this.observeChannelMessages(addedChats)
}
}
// Community
// fixme!: set only if updated
this.communityMetadata = communityMetadata
this.communityCallback?.(communityMetadata)
}
private handleMessage = (wakuMessage: WakuMessage) => {
const messages = handleChannelChatMessage(
wakuMessage,
this.channelMessages,
this.client.account?.publicKey
)
public handleChannelChatMessageNewEvent = (chatMessage: MessageType) => {
const _messages = this.channelMessages[chatMessage.channelId] || []
if (!messages.length) {
return
// findIndexLeft
// const index = _messages.findIndex(({ timestamp }) => {
// new Date(Number(timestamp)) > new Date(Number(message.timestamp))
// })
// findIndexRight
let messageIndex = _messages.length
while (messageIndex > 0) {
const _message = _messages[messageIndex - 1]
// if (_message.messageId === chatMessage.messageId) {
// messageIndex = -1
// break
// }
if (
new Date(Number(_message.timestamp)) <=
new Date(Number(chatMessage.timestamp))
) {
break
}
// state
const channelId = messages[0].channelId
messageIndex--
}
this.channelMessages[channelId] = messages
// // already received
// if (messageIndex < 0) {
// return
// }
// replied
let responsedToMessageIndex = _messages.length
while (--responsedToMessageIndex >= 0) {
const _message = _messages[responsedToMessageIndex]
if (_message.messageId === chatMessage.responseTo) {
break
}
}
if (responsedToMessageIndex >= 0) {
chatMessage.responseToMessage = _messages[responsedToMessageIndex]
}
_messages.splice(messageIndex, 0, chatMessage)
// state
const channelId = _messages[0].channelId
this.channelMessages[channelId] = _messages
// callback
// todo!: review use of ! why not just use messages defined above?

View File

@ -0,0 +1,280 @@
// todo: move file
// fixme: relative paths
import { bytesToHex } from 'ethereum-cryptography/utils'
import { ApplicationMetadataMessage } from '../../../protos/application-metadata-message'
import {
ChatMessage,
DeleteMessage,
EditMessage,
} from '../../../protos/chat-message'
import { EmojiReaction } from '../../../protos/emoji-reaction'
import { PinMessage } from '../../../protos/pin-message'
import { ProtocolMessage } from '../../../protos/protocol-message'
import { CommunityDescription } from '../../proto/communities/v1/communities'
import { payloadToId } from '../../utils/payload-to-id'
import { recoverPublicKey } from '../../utils/recover-public-key'
import { getChannelId } from './get-channel-id'
import { getReactions } from './get-reactions'
import { mapChatMessage } from './map-chat-message'
import type { Account } from '../../account'
import type { Community /*, MessageType*/ } from './community'
import type { WakuMessage } from 'js-waku'
// todo?: return decoded, possibly mapped, event but do not update state and call callback
// todo?: return void
// todo?: return success (e.g. if this handler should be used in Community's init)
// fixme:
/**
* Argument of type '(wakuMessage: WakuMessage, community: Community) => boolean'
* is not assignable to parameter of type '(message: WakuMessage) => void'.ts(2345)
*/
// type HandlerResult = boolean
// type HandlerResult = CommunityDescription | MessageType | undefined
export function handleWakuMessage(
wakuMessage: WakuMessage,
// state
community: Community,
account?: Account
): void {
// decode (layers)
if (!wakuMessage.payload) {
return
}
// todo: explain
if (!wakuMessage.signaturePublicKey) {
return
}
let messageToDecode = wakuMessage.payload
let decodedProtocol
try {
decodedProtocol = ProtocolMessage.decode(messageToDecode)
if (decodedProtocol) {
messageToDecode = decodedProtocol.publicMessage
}
} catch {}
const decodedMetadata = ApplicationMetadataMessage.decode(messageToDecode)
if (!decodedMetadata.payload) {
return
}
messageToDecode = decodedMetadata.payload
// recover public key
const publicKey = recoverPublicKey(
decodedMetadata.signature,
decodedMetadata.payload
)
// decode, map and handle (events)
switch (decodedMetadata.type) {
case ApplicationMetadataMessage.Type.TYPE_COMMUNITY_DESCRIPTION: {
// decode
const decodedPayload = CommunityDescription.decode(messageToDecode)
// todo?: don't use class method
// handle (state and callback)
community.handleCommunityMetadataEvent(decodedPayload)
break
}
case ApplicationMetadataMessage.Type.TYPE_CHAT_MESSAGE: {
// decode
const decodedPayload = ChatMessage.decode(messageToDecode)
if (!decodedProtocol) {
break
}
// TODO?: ignore community.channelMessages which are messageType !== COMMUNITY_CHAT
// map
// fixme?: handle decodedProtocol.encryptedMessage
const messageId = payloadToId(decodedProtocol.publicMessage, publicKey)
// todo?: use full chatId (incl. pub key) instead
const channelId = getChannelId(decodedPayload.chatId)
const chatMessage = mapChatMessage(decodedPayload, {
messageId,
channelId,
})
// handle
community.handleChannelChatMessageNewEvent(chatMessage)
break
}
case ApplicationMetadataMessage.Type.TYPE_EDIT_MESSAGE: {
const decodedPayload = EditMessage.decode(messageToDecode)
const messageId = decodedPayload.messageId
const channelId = getChannelId(decodedPayload.chatId)
// todo?: move to class method (e.g. handleChannelChatMessageEditEvent)
const _messages = community.channelMessages[channelId] || []
let index = _messages.length
while (--index >= 0) {
const _message = _messages[index]
if (_message.messageId === messageId) {
break
}
}
// original not found
if (index < 0) {
break
}
const _message = _messages[index]
// todo?: use mapChatMessage
const message = {
..._message,
// fixme?: other fields that user can edit
text: decodedPayload.text,
}
_messages[index] = message
// state
community.channelMessages[channelId] = _messages
// callback
community.channelMessagesCallbacks[channelId]?.(
community.channelMessages[channelId]!
)
break
}
case ApplicationMetadataMessage.Type.TYPE_DELETE_MESSAGE: {
const decodedPayload = DeleteMessage.decode(messageToDecode)
const messageId = decodedPayload.messageId
const channelId = getChannelId(decodedPayload.chatId)
const _messages = community.channelMessages[channelId] || []
let index = _messages.length
while (--index >= 0) {
const _message = _messages[index]
if (_message.messageId === messageId) {
break
}
}
// original not found
if (index < 0) {
break
}
// todo?: use delete; set to null
_messages.splice(index, 1)
// state
community.channelMessages[channelId] = _messages
// callback
community.channelMessagesCallbacks[channelId]?.(
community.channelMessages[channelId]!
)
break
}
case ApplicationMetadataMessage.Type.TYPE_PIN_MESSAGE: {
const decodedPayload = PinMessage.decode(messageToDecode)
const messageId = decodedPayload.messageId
const channelId = getChannelId(decodedPayload.chatId)
const _messages = community.channelMessages[channelId] || []
let index = _messages.length
while (--index >= 0) {
const _message = _messages[index]
if (_message.messageId === messageId) {
break
}
}
// original not found
if (index < 0) {
break
}
_messages[index].pinned = Boolean(decodedPayload.pinned)
// state
community.channelMessages[channelId] = _messages
// callback
// todo!: review use of !
community.channelMessagesCallbacks[channelId]?.(
community.channelMessages[channelId]!
)
break
}
case ApplicationMetadataMessage.Type.TYPE_EMOJI_REACTION: {
const decodedPayload = EmojiReaction.decode(messageToDecode)
const messageId = decodedPayload.messageId
const channelId = getChannelId(decodedPayload.chatId)
const _messages = community.channelMessages[channelId] || []
let index = _messages.length
while (--index >= 0) {
const _message = _messages[index]
if (_message.messageId === messageId) {
break
}
}
// original not found
if (index < 0) {
break
}
const _message = _messages[index]
const isMe =
account?.publicKey === `0x${bytesToHex(wakuMessage.signaturePublicKey)}`
// fixme!
_messages[index].reactions = getReactions(
decodedPayload,
_message.reactions,
isMe
)
// state
community.channelMessages[channelId] = _messages
// callback
community.channelMessagesCallbacks[channelId]?.(
community.channelMessages[channelId]!
)
break
}
default:
break
}
return
}

View File

@ -14,6 +14,7 @@ const CHANNEL_ID =
await client.createAccount()
const communityMetadata = await client.community.fetchCommunity()
console.log(communityMetadata)
const fetchChannelMessages =
await client.community.createFetchChannelMessages(CHANNEL_ID, messages =>