Update messages data structure (#278)

* Update messages data structure

* remove setting of DeletedChatMessage

* delete unathorized events

* use event pks

* remove comment

* remove else

* fix ui

* fix more ui

* fix sendReaction
This commit is contained in:
Felicio Mununga 2022-06-23 16:47:14 +02:00 committed by GitHub
parent 53b1ed4f1b
commit fed1dd210f
No known key found for this signature in database
GPG Key ID: 0EB8D75C775AB6F1
5 changed files with 291 additions and 173 deletions

View File

@ -26,9 +26,13 @@ export type ChatMessage = ChatMessageProto & {
reactions: Reactions
chatUuid: string
signer: string
responseToMessage?: Omit<ChatMessage, 'responseToMessage'>
responseToMessage?: ChatMessage
edittedClock?: bigint
pinnedClock?: bigint
}
type FetchedMessage = { messageId: string; timestamp?: Date }
export class Chat {
private readonly client: Client
@ -39,7 +43,14 @@ export class Chat {
public readonly symmetricKey: Uint8Array
public description: CommunityChat
public readonly chatCallbacks: Set<(description: CommunityChat) => void>
public messages: ChatMessage[]
#messages: Map<string, ChatMessage>
#editTextEvents: Map<string, Pick<ChatMessage, 'clock' | 'signer' | 'text'>>
#pinEvents: Map<string, Pick<ChatMessage, 'clock' | 'pinned'>>
#reactEvents: Map<string, Pick<ChatMessage, 'clock' | 'reactions'>>
#deleteEvents: Map<string, Pick<ChatMessage, 'clock' | 'signer'>>
#fetchingMessages?: boolean
#previousFetchedStartTime?: Date
#oldestFetchedMessage?: FetchedMessage
public readonly messageCallbacks: Set<(messages: ChatMessage[]) => void>
constructor(options: {
@ -61,7 +72,11 @@ export class Chat {
this.description = options.description
this.chatCallbacks = new Set()
this.messages = []
this.#messages = new Map()
this.#editTextEvents = new Map()
this.#pinEvents = new Map()
this.#reactEvents = new Map()
this.#deleteEvents = new Map()
this.messageCallbacks = new Set()
}
@ -87,12 +102,40 @@ export class Chat {
})
}
/**
* Returns chat messages soreted in ascending order and reply references resolved.
*/
public getMessages = () => {
return this.messages
const messages: ChatMessage[] = []
for (const message of this.#messages.values()) {
// resolve references
const referencedMessage = this.#messages.get(message.responseTo)
if (referencedMessage) {
message.responseToMessage = referencedMessage
}
messages.push(message)
}
// sort
messages.sort((a, b) => {
if (a.clock < b.clock) {
return -1
}
if (a.clock > b.clock) {
return 1
}
return 0
})
return messages
}
public getMessage = (id: string) => {
return this.messages.find(message => message.messageId === id)
return this.#messages.get(id)
}
public onChange = (callback: (description: CommunityChat) => void) => {
@ -117,26 +160,24 @@ export class Chat {
}
}
public fetchMessages = async (
options: { start: Date },
callback: (messages: ChatMessage[]) => void
) => {
public fetchMessages = async (options: { start: Date }) => {
const previousOldestMessage = this.#oldestFetchedMessage
const startTime = options.start
const endTime = new Date()
// nothing to fetch
if (
previousOldestMessage &&
previousOldestMessage.timestamp &&
previousOldestMessage.timestamp < options.start
) {
return
}
let _oldestClock: bigint | undefined
let _oldestMessageTime: Date | undefined
if (this.messages.length) {
_oldestClock = this.messages[0].clock
_oldestMessageTime = new Date(Number(this.messages[0].timestamp))
// already handled
if (_oldestMessageTime <= options.start) {
callback(this.messages)
return
}
let endTime: Date
if (this.#previousFetchedStartTime) {
endTime = this.#previousFetchedStartTime
} else {
endTime = new Date()
}
await this.client.waku.store.queryHistory([this.contentTopic], {
@ -149,31 +190,48 @@ export class Chat {
pageDirection: PageDirection.BACKWARD,
decryptionKeys: [this.symmetricKey],
callback: (wakuMessages: WakuMessage[]) => {
// oldest message first
for (const wakuMessage of wakuMessages) {
this.client.handleWakuMessage(wakuMessage)
let index = wakuMessages.length
this.#fetchingMessages = true
// most recent message first
while (--index >= 0) {
this.client.handleWakuMessage(wakuMessages[index])
}
this.#fetchingMessages = false
},
})
// callback
// more not found
if (
_oldestClock &&
this.messages.length &&
_oldestClock >= this.messages[0].clock
) {
callback([])
this.#previousFetchedStartTime = startTime
// more chat messages not found
if (
previousOldestMessage &&
this.#oldestFetchedMessage &&
// same message
previousOldestMessage.messageId === this.#oldestFetchedMessage.messageId
) {
return
}
callback(this.messages)
const messages = this.emitMessages()
return messages
}
public emitMessages = (messages: ChatMessage[]) => {
// fixme!: don't emit on backfill
this.messageCallbacks.forEach(callback => callback([...messages]))
public emitMessages = () => {
if (this.#fetchingMessages) {
return
}
if (!this.#messages.size) {
return
}
const messages = this.getMessages()
this.messageCallbacks.forEach(callback => callback(messages))
return messages
}
public handleChange = (description: CommunityChat) => {
@ -184,140 +242,182 @@ export class Chat {
this.emitChange(description)
}
public handleNewMessage = (message: ChatMessage) => {
let messageIndex = this.messages.length
while (messageIndex > 0) {
const _message = this.messages[messageIndex - 1]
if (_message.clock <= message.clock) {
break
}
messageIndex--
public handleNewMessage = (newMessage: ChatMessage, timestamp?: Date) => {
// fetching in progress
if (this.#fetchingMessages) {
this.#oldestFetchedMessage = this.getOldestFetchedMessage(
this.#oldestFetchedMessage,
newMessage.messageId,
timestamp
)
}
let responseToMessageIndex = this.messages.length
while (--responseToMessageIndex >= 0) {
const _message = this.messages[responseToMessageIndex]
if (_message.messageId === message.responseTo) {
break
// delete event received first
const deletedEvent = this.#deleteEvents.get(newMessage.messageId)
if (deletedEvent) {
if (this.isAuthor(newMessage, deletedEvent.signer)) {
return
}
// delete unathorized event from stash
this.#deleteEvents.delete(newMessage.messageId)
}
if (responseToMessageIndex >= 0) {
message.responseToMessage = this.messages[responseToMessageIndex]
// message already received
const message = this.#messages.get(newMessage.messageId)
if (message) {
return
}
// action events received prior
const editTextEvent = this.#editTextEvents.get(newMessage.messageId)
if (editTextEvent) {
if (this.isAuthor(newMessage, editTextEvent.signer)) {
newMessage.text = editTextEvent.text
newMessage.edittedClock = editTextEvent.clock
}
// finally, delete event from stash whether it was authorized or not
this.#editTextEvents.delete(newMessage.messageId)
}
const pinEvent = this.#pinEvents.get(newMessage.messageId)
if (pinEvent) {
newMessage.pinned = pinEvent.pinned
newMessage.pinnedClock = pinEvent.clock
this.#pinEvents.delete(newMessage.messageId)
}
const reactEvent = this.#reactEvents.get(newMessage.messageId)
if (reactEvent) {
newMessage.reactions = reactEvent.reactions
this.#reactEvents.delete(newMessage.messageId)
}
// state
this.messages.splice(messageIndex, 0, message)
this.#messages.set(newMessage.messageId, newMessage)
// callback
this.emitMessages(this.messages)
this.emitMessages()
}
public handleEditedMessage = (
messageId: string,
text: string,
clock: bigint,
signerPublicKey: string
) => {
let messageIndex = this.messages.length
while (--messageIndex >= 0) {
const _message = this.messages[messageIndex]
const message = this.#messages.get(messageId)
if (_message.messageId === messageId) {
break
}
}
if (message && this.isAuthor(message, signerPublicKey)) {
message.text = text
this.emitMessages()
// original not found
if (messageIndex < 0) {
return
}
if (!this.isAuthor(this.messages[messageIndex], signerPublicKey)) {
return
const editTextEvent = this.#editTextEvents.get(messageId)
if (!editTextEvent || editTextEvent.clock < clock) {
this.#editTextEvents.set(messageId, {
clock,
signer: signerPublicKey,
text,
})
}
this.messages[messageIndex] = {
...this.messages[messageIndex],
text,
}
// callback
this.emitMessages(this.messages)
}
public handleDeletedMessage = (
messageId: string,
clock: bigint,
signerPublicKey: string
) => {
let messageIndex = this.messages.length
while (--messageIndex >= 0) {
const _message = this.messages[messageIndex]
const message = this.#messages.get(messageId)
if (message && this.isAuthor(message, signerPublicKey)) {
this.#messages.delete(messageId)
this.#deleteEvents.set(messageId, { clock, signer: signerPublicKey })
this.emitMessages()
if (_message.messageId === messageId) {
break
}
}
if (messageIndex < 0) {
return
}
if (!this.isAuthor(this.messages[messageIndex], signerPublicKey)) {
return
const deleteEvent = this.#deleteEvents.get(messageId)
if (!deleteEvent || deleteEvent.clock > clock) {
this.#deleteEvents.set(messageId, { clock, signer: signerPublicKey })
}
this.messages.splice(messageIndex, 1)
this.emitMessages(this.messages)
}
public handlePinnedMessage = (messageId: string, pinned?: boolean) => {
let messageIndex = this.messages.length
while (--messageIndex >= 0) {
const _message = this.messages[messageIndex]
public handlePinnedMessage = (
messageId: string,
clock: bigint,
pinned?: boolean
) => {
const message = this.#messages.get(messageId)
if (message) {
message.pinned = Boolean(pinned)
message.pinnedClock = clock
if (_message.messageId === messageId) {
break
}
}
this.emitMessages()
if (messageIndex < 0) {
return
}
this.messages[messageIndex].pinned = Boolean(pinned)
this.emitMessages(this.messages)
const pinEvent = this.#pinEvents.get(messageId)
if (!pinEvent || pinEvent.clock < clock) {
this.#pinEvents.set(messageId, {
clock,
pinned: Boolean(pinned),
})
}
}
public handleEmojiReaction = (
messageId: string,
reaction: EmojiReaction,
publicKey: string
clock: bigint,
signerPublicKey: string
) => {
let messageIndex = this.messages.length
while (--messageIndex >= 0) {
const _message = this.messages[messageIndex]
const message = this.#messages.get(messageId)
if (message) {
const reactions = getReactions(
reaction,
message.reactions,
signerPublicKey
)
message.reactions = reactions
if (_message.messageId === messageId) {
break
}
}
this.emitMessages()
if (messageIndex < 0) {
return
}
this.messages[messageIndex].reactions = getReactions(
reaction,
this.messages[messageIndex].reactions,
publicKey
)
const reactEvent = this.#reactEvents.get(messageId)
if (!reactEvent) {
const reactions = getReactions(
reaction,
{
THUMBS_UP: new Set<string>(),
THUMBS_DOWN: new Set<string>(),
LOVE: new Set<string>(),
LAUGH: new Set<string>(),
SAD: new Set<string>(),
ANGRY: new Set<string>(),
},
signerPublicKey
)
this.emitMessages(this.messages)
this.#reactEvents.set(messageId, { clock, reactions })
} else {
const reactions = getReactions(
reaction,
reactEvent.reactions,
signerPublicKey
)
this.#reactEvents.set(messageId, { clock, reactions })
}
}
public sendTextMessage = async (text: string, responseTo?: string) => {
@ -400,25 +500,13 @@ export class Chat {
throw new Error('Text message cannot be edited without a created account')
}
let messageIndex = this.messages.length
while (--messageIndex >= 0) {
const _message = this.messages[messageIndex]
const message = this.#messages.get(messageId)
if (_message.messageId === messageId) {
break
}
if (!message) {
throw new Error('Message not found')
}
if (messageIndex < 0) {
throw new Error('Text message was not found')
}
if (
!this.isAuthor(
this.messages[messageIndex],
`0x${this.client.account.publicKey}`
)
) {
if (!this.isAuthor(message, `0x${this.client.account.publicKey}`)) {
throw new Error('Text message can only be edited by its author')
}
@ -450,25 +538,13 @@ export class Chat {
)
}
let messageIndex = this.messages.length
while (--messageIndex >= 0) {
const _message = this.messages[messageIndex]
const message = this.#messages.get(messageId)
if (_message.messageId === messageId) {
break
}
if (!message) {
throw new Error('Message not found')
}
if (messageIndex < 0) {
throw new Error('Text message was not found')
}
if (
!this.isAuthor(
this.messages[messageIndex],
`0x${this.client.account.publicKey}`
)
) {
if (!this.isAuthor(message, `0x${this.client.account.publicKey}`)) {
throw new Error('Text message can only be deleted by its author')
}
@ -496,14 +572,14 @@ export class Chat {
throw new Error('Account not initialized')
}
const message = this.getMessage(messageId)
const message = this.#messages.get(messageId)
if (!message) {
throw new Error('Message not found')
}
const retracted = message.reactions[reaction].has(
this.client.account.publicKey
`0x${this.client.account.publicKey}`
)
const payload = EmojiReaction.encode({
@ -530,4 +606,33 @@ export class Chat {
): boolean => {
return message.signer === signerPublicKey
}
private getOldestFetchedMessage(
oldestMessage: FetchedMessage | undefined,
messageId: string,
messageTimestamp?: Date
): FetchedMessage {
let message: FetchedMessage
if (!oldestMessage) {
message = {
messageId: messageId,
timestamp: messageTimestamp,
}
} else if (
messageTimestamp &&
oldestMessage.timestamp &&
// is older
messageTimestamp < oldestMessage.timestamp
) {
message = {
messageId: messageId,
timestamp: messageTimestamp,
}
} else {
message = oldestMessage
}
return message
}
}

View File

@ -88,12 +88,12 @@ export class Community {
}
public fetch = async () => {
// most recent page first
await this.client.waku.store.queryHistory([this.contentTopic], {
// oldest message first
callback: wakuMessages => {
let index = wakuMessages.length
// most recent page first
// most recent message first
while (--index >= 0) {
this.client.handleWakuMessage(wakuMessages[index])

View File

@ -61,6 +61,7 @@ export function handleWakuMessage(
decodedProtocol?.publicMessage ?? wakuMessage.payload,
signerPublicKeyBytes
)
const messageTimestamp = wakuMessage.timestamp
const signerPublicKey = `0x${bytesToHex(signerPublicKeyBytes)}`
// already handled
@ -112,7 +113,7 @@ export function handleWakuMessage(
})
// handle
chat.handleNewMessage(chatMessage)
chat.handleNewMessage(chatMessage, messageTimestamp)
break
}
@ -145,6 +146,7 @@ export function handleWakuMessage(
chat.handleEditedMessage(
messageId,
decodedPayload.text,
decodedPayload.clock,
signerPublicKey
)
@ -176,7 +178,11 @@ export function handleWakuMessage(
return
}
chat.handleDeletedMessage(messageId, signerPublicKey)
chat.handleDeletedMessage(
messageId,
decodedPayload.clock,
signerPublicKey
)
break
}
@ -206,7 +212,11 @@ export function handleWakuMessage(
return
}
chat.handlePinnedMessage(messageId, decodedPayload.pinned)
chat.handlePinnedMessage(
messageId,
decodedPayload.clock,
decodedPayload.pinned
)
break
}
@ -236,7 +246,12 @@ export function handleWakuMessage(
return
}
chat.handleEmojiReaction(messageId, decodedPayload, signerPublicKey)
chat.handleEmojiReaction(
messageId,
decodedPayload,
decodedPayload.clock,
signerPublicKey
)
break
}

View File

@ -1,5 +1,7 @@
import React, { useState } from 'react'
import { useMatch } from 'react-router-dom'
import { UserProfileDialog } from '~/src/components/user-profile-dialog'
import { useChatContext } from '~/src/contexts/chat-context'
import { BellIcon } from '~/src/icons/bell-icon'
@ -56,27 +58,22 @@ interface Props {
export const ChatMessage = (props: Props) => {
const { client, account } = useProtocol()
const { params } = useMatch(':id')!
const chatId = params.id!
const { message } = props
const mention = false
const pinned = false
const {
messageId,
chatId,
contentType,
clock,
reactions,
sender,
responseTo,
} = message
const { messageId, contentType, clock, reactions, signer, responseTo } =
message
// TODO: remove usage of 0x prefix
const owner = '0x' + account?.publicKey === sender
const chat = client.community.getChatById(chatId)
const owner = '0x' + account?.publicKey === signer
const chat = client.community.getChat(chatId)
const member = client.community.getMember(sender) ?? {}
const member = client.community.getMember(signer) ?? {}
const [editing, setEditing] = useState(false)
const [reacting, setReacting] = useState(false)

View File

@ -29,8 +29,9 @@ export const MessageReply = (props: Props) => {
)
}
const { contentType, text, sender } = message
const { username } = client.community.getMember(sender)
const { contentType, text, signer } = message
const { username } = client.community.getMember(signer)
return (
<Wrapper>