add packages/status-js/src/client/community/handle-waku-message.ts
This commit is contained in:
parent
a41091a680
commit
f8eaedc9a7
|
@ -1,10 +1,13 @@
|
|||
// todo?: already received (by messageId or event)
|
||||
// todo: ignore not found
|
||||
// todo: handle updates together with fetching history (chat messages)
|
||||
// todo: handle diff waku messages on diff topics
|
||||
// todo: tests
|
||||
// todo?: already received (by messageId or event)
|
||||
// todo: use clock for sorting
|
||||
// todo: use Map
|
||||
// todo: write class for channels
|
||||
// todo: fetch all history until Map; remove end param
|
||||
// todo: handle
|
||||
|
||||
// todo?: use clock for sorting
|
||||
// todo: tests
|
||||
|
||||
// todo: handle disconnections; no messages after sleep; libp2p;
|
||||
// todo: identities/members?
|
||||
|
@ -14,6 +17,8 @@
|
|||
// todo?: call onChannel* separately
|
||||
// todo?: ignore messages of not yet approved users
|
||||
// todo?: ignore messages with invalid signature
|
||||
// todo?: handle encrypted waku messages
|
||||
// todo?: handle waku messages/events without identity
|
||||
|
||||
import { hexToBytes } from 'ethereum-cryptography/utils'
|
||||
import { Waku, WakuMessage } from 'js-waku'
|
||||
|
@ -22,6 +27,7 @@ import { ApplicationMetadataMessage } from '~/protos/application-metadata-messag
|
|||
|
||||
import { Account } from './account'
|
||||
import { Community } from './client/community/community'
|
||||
import { handleWakuMessage } from './client/community/handle-waku-message'
|
||||
|
||||
export interface ClientOptions {
|
||||
publicKey: string
|
||||
|
@ -110,6 +116,10 @@ class Client {
|
|||
|
||||
await this.waku.relay.send(wakuMesage)
|
||||
}
|
||||
|
||||
public handleWakuMessage = (wakuMessage: WakuMessage): void => {
|
||||
handleWakuMessage(wakuMessage, this.community, this.account)
|
||||
}
|
||||
}
|
||||
|
||||
export async function createClient(options: ClientOptions): Promise<Client> {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import { waku_message } from 'js-waku'
|
||||
import { PageDirection, waku_message } from 'js-waku'
|
||||
import { hexToBytes } from 'js-waku/build/main/lib/utils'
|
||||
import difference from 'lodash/difference'
|
||||
|
||||
|
@ -9,18 +9,16 @@ import { EmojiReaction } from '~/protos/emoji-reaction'
|
|||
import { idToContentTopic } from '../../contentTopic'
|
||||
import { createSymKeyFromPassword } from '../../encryption'
|
||||
import { createChannelContentTopics } from './create-channel-content-topics'
|
||||
import { fetchChannelChatMessages } from './fetch-channel-chat-messages'
|
||||
import { handleChannelChatMessage } from './handle-channel-chat-message'
|
||||
import { handleCommunity } from './handle-community'
|
||||
|
||||
import type { Client } from '../../client'
|
||||
import type { CommunityDescription } from '../../wire/community_description'
|
||||
import type { Reactions } from './get-reactions'
|
||||
import type { ImageMessage } from '~/src/proto/communities/v1/chat_message'
|
||||
import type { Waku, WakuMessage } from 'js-waku'
|
||||
import type { Waku } from 'js-waku'
|
||||
|
||||
export type CommunityMetadataType = CommunityDescription['proto']
|
||||
|
||||
// todo?: rename to ChatMessageType
|
||||
export type MessageType = ChatMessage & {
|
||||
messageId: string
|
||||
pinned: boolean
|
||||
|
@ -36,13 +34,12 @@ export class Community {
|
|||
// fixme!
|
||||
private communityContentTopic!: string
|
||||
private communityDecryptionKey!: Uint8Array
|
||||
public communityMetadata!: CommunityMetadataType
|
||||
public channelMessages: Partial<{ [key: string]: MessageType[] }> = {}
|
||||
private channelMessagesCallbacks: {
|
||||
public communityMetadata!: CommunityMetadataType // state
|
||||
public channelMessages: Partial<{ [key: string]: MessageType[] }> = {} // state
|
||||
public channelMessagesCallbacks: {
|
||||
[key: string]: (messages: MessageType[]) => void
|
||||
} = {}
|
||||
|
||||
private communityCallback:
|
||||
public communityCallback:
|
||||
| ((community: CommunityMetadataType) => void)
|
||||
| undefined
|
||||
|
||||
|
@ -83,14 +80,15 @@ export class Community {
|
|||
await this.waku.store.queryHistory([this.communityContentTopic], {
|
||||
decryptionKeys: [this.communityDecryptionKey],
|
||||
callback: wakuMessages => {
|
||||
// todo: iterate from right
|
||||
for (const wakuMessage of wakuMessages.reverse()) {
|
||||
const message = handleCommunity(wakuMessage)
|
||||
this.client.handleWakuMessage(wakuMessage)
|
||||
|
||||
if (!message) {
|
||||
if (!this.communityMetadata) {
|
||||
return shouldStop
|
||||
}
|
||||
|
||||
communityMetadata = message
|
||||
communityMetadata = this.communityMetadata
|
||||
shouldStop = true
|
||||
|
||||
return shouldStop
|
||||
|
@ -111,30 +109,46 @@ export class Community {
|
|||
// todo?: keep in state instead and replace the factory
|
||||
const symKey = await createSymKeyFromPassword(id)
|
||||
|
||||
return async (options: { start: Date; end?: Date }) => {
|
||||
const messages = await fetchChannelChatMessages(
|
||||
this.waku,
|
||||
symKey,
|
||||
channelContentTopic,
|
||||
this.channelMessages[channelId] ?? [],
|
||||
options,
|
||||
callback
|
||||
)
|
||||
return async (options: { start: Date }) => {
|
||||
const startTime = options.start
|
||||
const endTime = new Date()
|
||||
|
||||
if (!messages.length) {
|
||||
return
|
||||
const _messages = this.channelMessages[channelId] || []
|
||||
|
||||
if (_messages.length) {
|
||||
const oldestMessageTime = new Date(Number(_messages[0].timestamp))
|
||||
|
||||
if (oldestMessageTime <= options.start) {
|
||||
callback(_messages)
|
||||
}
|
||||
}
|
||||
|
||||
// state
|
||||
this.channelMessages[channelId] = messages
|
||||
await this.waku.store.queryHistory([channelContentTopic], {
|
||||
timeFilter: {
|
||||
startTime: startTime,
|
||||
endTime: endTime,
|
||||
},
|
||||
pageSize: 50,
|
||||
// most recent page first
|
||||
pageDirection: PageDirection.BACKWARD,
|
||||
decryptionKeys: [symKey],
|
||||
callback: wakuMessages => {
|
||||
// oldest message first
|
||||
for (const wakuMessage of wakuMessages) {
|
||||
this.client.handleWakuMessage(wakuMessage)
|
||||
}
|
||||
},
|
||||
})
|
||||
|
||||
return
|
||||
// todo: call abck only if oldestMessageTime has changed
|
||||
// callback
|
||||
callback(this.channelMessages[channelId] ?? [])
|
||||
}
|
||||
}
|
||||
|
||||
private observeCommunity = () => {
|
||||
this.waku.relay.addDecryptionKey(this.communityDecryptionKey)
|
||||
this.waku.relay.addObserver(this.handleCommunity, [
|
||||
this.waku.relay.addObserver(this.client.handleWakuMessage, [
|
||||
this.communityContentTopic,
|
||||
])
|
||||
}
|
||||
|
@ -156,7 +170,7 @@ export class Community {
|
|||
})
|
||||
const contentTopics = await Promise.all(symKeyPromises)
|
||||
|
||||
this.waku.relay.addObserver(this.handleMessage, contentTopics)
|
||||
this.waku.relay.addObserver(this.client.handleWakuMessage, contentTopics)
|
||||
}
|
||||
|
||||
private unobserveChannelMessages = (chatIds: string[]) => {
|
||||
|
@ -165,53 +179,91 @@ export class Community {
|
|||
this.communityPublicKey
|
||||
)
|
||||
|
||||
this.waku.relay.deleteObserver(this.handleMessage, contentTopics)
|
||||
this.waku.relay.deleteObserver(this.client.handleWakuMessage, contentTopics)
|
||||
}
|
||||
|
||||
private handleCommunity = (wakuMessage: WakuMessage) => {
|
||||
const communityMetadata = handleCommunity(wakuMessage)
|
||||
public handleCommunityMetadataEvent = (
|
||||
communityMetadata: CommunityMetadataType
|
||||
) => {
|
||||
if (this.communityMetadata) {
|
||||
// Channels
|
||||
const removedChats = difference(
|
||||
Object.keys(this.communityMetadata.chats),
|
||||
Object.keys(communityMetadata.chats)
|
||||
)
|
||||
const addedChats = difference(
|
||||
Object.keys(communityMetadata.chats),
|
||||
Object.keys(communityMetadata.chats)
|
||||
)
|
||||
|
||||
if (!communityMetadata) {
|
||||
return
|
||||
}
|
||||
if (removedChats.length) {
|
||||
this.unobserveChannelMessages(removedChats)
|
||||
}
|
||||
|
||||
// Channels
|
||||
const removedChats = difference(
|
||||
Object.keys(this.communityMetadata.chats),
|
||||
Object.keys(communityMetadata.chats)
|
||||
)
|
||||
const addedChats = difference(
|
||||
Object.keys(communityMetadata.chats),
|
||||
Object.keys(communityMetadata.chats)
|
||||
)
|
||||
|
||||
if (removedChats.length) {
|
||||
this.unobserveChannelMessages(removedChats)
|
||||
}
|
||||
|
||||
if (addedChats.length) {
|
||||
this.observeChannelMessages(addedChats)
|
||||
if (addedChats.length) {
|
||||
this.observeChannelMessages(addedChats)
|
||||
}
|
||||
}
|
||||
|
||||
// Community
|
||||
// fixme!: set only if updated
|
||||
this.communityMetadata = communityMetadata
|
||||
this.communityCallback?.(communityMetadata)
|
||||
}
|
||||
|
||||
private handleMessage = (wakuMessage: WakuMessage) => {
|
||||
const messages = handleChannelChatMessage(
|
||||
wakuMessage,
|
||||
this.channelMessages,
|
||||
this.client.account?.publicKey
|
||||
)
|
||||
public handleChannelChatMessageNewEvent = (chatMessage: MessageType) => {
|
||||
const _messages = this.channelMessages[chatMessage.channelId] || []
|
||||
|
||||
if (!messages.length) {
|
||||
return
|
||||
// findIndexLeft
|
||||
// const index = _messages.findIndex(({ timestamp }) => {
|
||||
// new Date(Number(timestamp)) > new Date(Number(message.timestamp))
|
||||
// })
|
||||
// findIndexRight
|
||||
let messageIndex = _messages.length
|
||||
while (messageIndex > 0) {
|
||||
const _message = _messages[messageIndex - 1]
|
||||
|
||||
// if (_message.messageId === chatMessage.messageId) {
|
||||
// messageIndex = -1
|
||||
|
||||
// break
|
||||
// }
|
||||
|
||||
if (
|
||||
new Date(Number(_message.timestamp)) <=
|
||||
new Date(Number(chatMessage.timestamp))
|
||||
) {
|
||||
break
|
||||
}
|
||||
|
||||
messageIndex--
|
||||
}
|
||||
|
||||
// state
|
||||
const channelId = messages[0].channelId
|
||||
// // already received
|
||||
// if (messageIndex < 0) {
|
||||
// return
|
||||
// }
|
||||
|
||||
this.channelMessages[channelId] = messages
|
||||
// replied
|
||||
let responsedToMessageIndex = _messages.length
|
||||
while (--responsedToMessageIndex >= 0) {
|
||||
const _message = _messages[responsedToMessageIndex]
|
||||
|
||||
if (_message.messageId === chatMessage.responseTo) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if (responsedToMessageIndex >= 0) {
|
||||
chatMessage.responseToMessage = _messages[responsedToMessageIndex]
|
||||
}
|
||||
|
||||
_messages.splice(messageIndex, 0, chatMessage)
|
||||
|
||||
// state
|
||||
const channelId = _messages[0].channelId
|
||||
|
||||
this.channelMessages[channelId] = _messages
|
||||
|
||||
// callback
|
||||
// todo!: review use of ! why not just use messages defined above?
|
||||
|
|
|
@ -0,0 +1,280 @@
|
|||
// todo: move file
|
||||
// fixme: relative paths
|
||||
|
||||
import { bytesToHex } from 'ethereum-cryptography/utils'
|
||||
|
||||
import { ApplicationMetadataMessage } from '../../../protos/application-metadata-message'
|
||||
import {
|
||||
ChatMessage,
|
||||
DeleteMessage,
|
||||
EditMessage,
|
||||
} from '../../../protos/chat-message'
|
||||
import { EmojiReaction } from '../../../protos/emoji-reaction'
|
||||
import { PinMessage } from '../../../protos/pin-message'
|
||||
import { ProtocolMessage } from '../../../protos/protocol-message'
|
||||
import { CommunityDescription } from '../../proto/communities/v1/communities'
|
||||
import { payloadToId } from '../../utils/payload-to-id'
|
||||
import { recoverPublicKey } from '../../utils/recover-public-key'
|
||||
import { getChannelId } from './get-channel-id'
|
||||
import { getReactions } from './get-reactions'
|
||||
import { mapChatMessage } from './map-chat-message'
|
||||
|
||||
import type { Account } from '../../account'
|
||||
import type { Community /*, MessageType*/ } from './community'
|
||||
import type { WakuMessage } from 'js-waku'
|
||||
|
||||
// todo?: return decoded, possibly mapped, event but do not update state and call callback
|
||||
// todo?: return void
|
||||
// todo?: return success (e.g. if this handler should be used in Community's init)
|
||||
// fixme:
|
||||
/**
|
||||
* Argument of type '(wakuMessage: WakuMessage, community: Community) => boolean'
|
||||
* is not assignable to parameter of type '(message: WakuMessage) => void'.ts(2345)
|
||||
*/
|
||||
// type HandlerResult = boolean
|
||||
// type HandlerResult = CommunityDescription | MessageType | undefined
|
||||
|
||||
export function handleWakuMessage(
|
||||
wakuMessage: WakuMessage,
|
||||
// state
|
||||
community: Community,
|
||||
account?: Account
|
||||
): void {
|
||||
// decode (layers)
|
||||
if (!wakuMessage.payload) {
|
||||
return
|
||||
}
|
||||
|
||||
// todo: explain
|
||||
if (!wakuMessage.signaturePublicKey) {
|
||||
return
|
||||
}
|
||||
|
||||
let messageToDecode = wakuMessage.payload
|
||||
let decodedProtocol
|
||||
try {
|
||||
decodedProtocol = ProtocolMessage.decode(messageToDecode)
|
||||
if (decodedProtocol) {
|
||||
messageToDecode = decodedProtocol.publicMessage
|
||||
}
|
||||
} catch {}
|
||||
|
||||
const decodedMetadata = ApplicationMetadataMessage.decode(messageToDecode)
|
||||
if (!decodedMetadata.payload) {
|
||||
return
|
||||
}
|
||||
messageToDecode = decodedMetadata.payload
|
||||
|
||||
// recover public key
|
||||
const publicKey = recoverPublicKey(
|
||||
decodedMetadata.signature,
|
||||
decodedMetadata.payload
|
||||
)
|
||||
|
||||
// decode, map and handle (events)
|
||||
switch (decodedMetadata.type) {
|
||||
case ApplicationMetadataMessage.Type.TYPE_COMMUNITY_DESCRIPTION: {
|
||||
// decode
|
||||
const decodedPayload = CommunityDescription.decode(messageToDecode)
|
||||
|
||||
// todo?: don't use class method
|
||||
// handle (state and callback)
|
||||
community.handleCommunityMetadataEvent(decodedPayload)
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
case ApplicationMetadataMessage.Type.TYPE_CHAT_MESSAGE: {
|
||||
// decode
|
||||
const decodedPayload = ChatMessage.decode(messageToDecode)
|
||||
|
||||
if (!decodedProtocol) {
|
||||
break
|
||||
}
|
||||
|
||||
// TODO?: ignore community.channelMessages which are messageType !== COMMUNITY_CHAT
|
||||
|
||||
// map
|
||||
// fixme?: handle decodedProtocol.encryptedMessage
|
||||
const messageId = payloadToId(decodedProtocol.publicMessage, publicKey)
|
||||
// todo?: use full chatId (incl. pub key) instead
|
||||
const channelId = getChannelId(decodedPayload.chatId)
|
||||
|
||||
const chatMessage = mapChatMessage(decodedPayload, {
|
||||
messageId,
|
||||
channelId,
|
||||
})
|
||||
|
||||
// handle
|
||||
community.handleChannelChatMessageNewEvent(chatMessage)
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
case ApplicationMetadataMessage.Type.TYPE_EDIT_MESSAGE: {
|
||||
const decodedPayload = EditMessage.decode(messageToDecode)
|
||||
|
||||
const messageId = decodedPayload.messageId
|
||||
const channelId = getChannelId(decodedPayload.chatId)
|
||||
|
||||
// todo?: move to class method (e.g. handleChannelChatMessageEditEvent)
|
||||
const _messages = community.channelMessages[channelId] || []
|
||||
|
||||
let index = _messages.length
|
||||
while (--index >= 0) {
|
||||
const _message = _messages[index]
|
||||
|
||||
if (_message.messageId === messageId) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// original not found
|
||||
if (index < 0) {
|
||||
break
|
||||
}
|
||||
|
||||
const _message = _messages[index]
|
||||
|
||||
// todo?: use mapChatMessage
|
||||
const message = {
|
||||
..._message,
|
||||
// fixme?: other fields that user can edit
|
||||
text: decodedPayload.text,
|
||||
}
|
||||
|
||||
_messages[index] = message
|
||||
|
||||
// state
|
||||
community.channelMessages[channelId] = _messages
|
||||
|
||||
// callback
|
||||
community.channelMessagesCallbacks[channelId]?.(
|
||||
community.channelMessages[channelId]!
|
||||
)
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
case ApplicationMetadataMessage.Type.TYPE_DELETE_MESSAGE: {
|
||||
const decodedPayload = DeleteMessage.decode(messageToDecode)
|
||||
|
||||
const messageId = decodedPayload.messageId
|
||||
const channelId = getChannelId(decodedPayload.chatId)
|
||||
|
||||
const _messages = community.channelMessages[channelId] || []
|
||||
|
||||
let index = _messages.length
|
||||
while (--index >= 0) {
|
||||
const _message = _messages[index]
|
||||
|
||||
if (_message.messageId === messageId) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// original not found
|
||||
if (index < 0) {
|
||||
break
|
||||
}
|
||||
|
||||
// todo?: use delete; set to null
|
||||
_messages.splice(index, 1)
|
||||
|
||||
// state
|
||||
community.channelMessages[channelId] = _messages
|
||||
|
||||
// callback
|
||||
community.channelMessagesCallbacks[channelId]?.(
|
||||
community.channelMessages[channelId]!
|
||||
)
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
case ApplicationMetadataMessage.Type.TYPE_PIN_MESSAGE: {
|
||||
const decodedPayload = PinMessage.decode(messageToDecode)
|
||||
|
||||
const messageId = decodedPayload.messageId
|
||||
const channelId = getChannelId(decodedPayload.chatId)
|
||||
|
||||
const _messages = community.channelMessages[channelId] || []
|
||||
|
||||
let index = _messages.length
|
||||
while (--index >= 0) {
|
||||
const _message = _messages[index]
|
||||
|
||||
if (_message.messageId === messageId) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// original not found
|
||||
if (index < 0) {
|
||||
break
|
||||
}
|
||||
|
||||
_messages[index].pinned = Boolean(decodedPayload.pinned)
|
||||
|
||||
// state
|
||||
community.channelMessages[channelId] = _messages
|
||||
|
||||
// callback
|
||||
// todo!: review use of !
|
||||
community.channelMessagesCallbacks[channelId]?.(
|
||||
community.channelMessages[channelId]!
|
||||
)
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
case ApplicationMetadataMessage.Type.TYPE_EMOJI_REACTION: {
|
||||
const decodedPayload = EmojiReaction.decode(messageToDecode)
|
||||
|
||||
const messageId = decodedPayload.messageId
|
||||
const channelId = getChannelId(decodedPayload.chatId)
|
||||
|
||||
const _messages = community.channelMessages[channelId] || []
|
||||
|
||||
let index = _messages.length
|
||||
while (--index >= 0) {
|
||||
const _message = _messages[index]
|
||||
|
||||
if (_message.messageId === messageId) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// original not found
|
||||
if (index < 0) {
|
||||
break
|
||||
}
|
||||
|
||||
const _message = _messages[index]
|
||||
const isMe =
|
||||
account?.publicKey === `0x${bytesToHex(wakuMessage.signaturePublicKey)}`
|
||||
|
||||
// fixme!
|
||||
_messages[index].reactions = getReactions(
|
||||
decodedPayload,
|
||||
_message.reactions,
|
||||
isMe
|
||||
)
|
||||
|
||||
// state
|
||||
community.channelMessages[channelId] = _messages
|
||||
|
||||
// callback
|
||||
community.channelMessagesCallbacks[channelId]?.(
|
||||
community.channelMessages[channelId]!
|
||||
)
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
default:
|
||||
break
|
||||
}
|
||||
|
||||
return
|
||||
}
|
|
@ -14,6 +14,7 @@ const CHANNEL_ID =
|
|||
await client.createAccount()
|
||||
|
||||
const communityMetadata = await client.community.fetchCommunity()
|
||||
console.log(communityMetadata)
|
||||
|
||||
const fetchChannelMessages =
|
||||
await client.community.createFetchChannelMessages(CHANNEL_ID, messages =>
|
||||
|
|
Loading…
Reference in New Issue