From 6cad59cab1f7c3cbd6a2f4bb358712c471c57ffb Mon Sep 17 00:00:00 2001 From: Felicio Mununga Date: Mon, 6 Jun 2022 15:33:56 +0200 Subject: [PATCH] split history fetching --- packages/status-js/src/client.ts | 143 ++---------------- .../community/fetch-channel-messages.ts | 51 +++++++ .../src/client/community/fetch-messages.ts | 66 ++++++++ .../src/client/community/handle-message.ts | 72 +++++++++ 4 files changed, 202 insertions(+), 130 deletions(-) create mode 100644 packages/status-js/src/client/community/fetch-channel-messages.ts create mode 100644 packages/status-js/src/client/community/fetch-messages.ts create mode 100644 packages/status-js/src/client/community/handle-message.ts diff --git a/packages/status-js/src/client.ts b/packages/status-js/src/client.ts index 4071272e..8df49f0c 100644 --- a/packages/status-js/src/client.ts +++ b/packages/status-js/src/client.ts @@ -9,7 +9,6 @@ // proactively change import { bytesToHex } from 'ethereum-cryptography/utils' import { Waku, waku_message } from 'js-waku' -import chunk from 'lodash/chunk' import difference from 'lodash/difference' import sortBy from 'lodash/sortBy' import uniqBy from 'lodash/uniqBy' @@ -21,6 +20,7 @@ import { EmojiReaction } from '../protos/emoji-reaction' import { PinMessage } from '../protos/pin-message' import { ProtocolMessage } from '../protos/protocol-message' import { Account } from './account' +import { fetchChannelMessages } from './client/community/fetch-channel-messages' // import { ChatIdentity } from './wire/chat_identity' import { idToContentTopic } from './contentTopic' import { createSymKeyFromPassword } from './encryption' @@ -76,11 +76,12 @@ class Client { bootstrap: { default: false, peers: [ - // '/dns4/node-01.gc-us-central1-a.wakuv2.test.statusim.net/tcp/443/wss/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS', - '/dns4/node-01.do-ams3.wakuv2.test.statusim.net/tcp/8000/wss/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ', + '/dns4/node-01.gc-us-central1-a.wakuv2.test.statusim.net/tcp/443/wss/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS', + // '/dns4/node-01.do-ams3.wakuv2.test.statusim.net/tcp/8000/wss/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ', // '/dns4/node-01.do-ams3.status.test.statusim.net/tcp/30303/p2p/16Uiu2HAkukebeXjTQ9QDBeNDWuGfbaSg79wkkhK4vPocLgR6QFDf' ], }, + libp2p: { config: { pubsub: { enabled: true, emitSelf: true } } }, }) await waku.waitForRemotePeer() this.waku = waku @@ -108,23 +109,29 @@ class Client { class Community { private client: Client private waku: Waku - private communityPublicKey: string + public communityPublicKey: string private communityContentTopic!: string private communityDecryptionKey!: Uint8Array public communityMetadata!: CommunityType - private communityCallback: ((community: CommunityType) => void) | undefined - private channelMessages: Partial<{ [key: string]: MessageType[] }> = {} + public channelMessages: Partial<{ [key: string]: MessageType[] }> = {} private channelCallbacks: { [key: string]: (channel: ChannelType) => void } = {} private channelMessagesCallbacks: { [key: string]: (messages: MessageType[]) => void } = {} + private communityCallback: ((community: CommunityType) => void) | undefined + public fetchChannelMessages constructor(client: Client, waku: Waku, publicKey: string) { this.client = client this.waku = waku this.communityPublicKey = publicKey + + this.fetchChannelMessages = fetchChannelMessages( + this, + this.waku.store.queryHistory.bind(this.waku.store) + ) } public async start() { @@ -527,130 +534,6 @@ class Community { return this.channelMessages[channelId] ?? [] } - public async fetchChannelMessages( - channelId: string, - callback: (messages: MessageType[], isDone: boolean) => boolean, - options: { start: Date; end: Date; chunk: number /*total: number*/ } - ) { - const id = `${this.communityPublicKey}${channelId}` - const channelContentTopic = await idToContentTopic(id) - const symKey = await createSymKeyFromPassword(id) - - const messages: MessageType[] = [] - let shouldStop = false - - await this.waku.store.queryHistory([channelContentTopic], { - // timeFilter: { - // startTime: options.start, - // endTime: options.end, - // }, - // todo!: increase after testing - pageSize: 100, - decryptionKeys: [symKey], - callback: wakuMessages => { - for (const wakuMessage of wakuMessages.reverse()) { - if (!wakuMessage.payload) { - continue - } - - if (!wakuMessage.signaturePublicKey) { - continue - } - - const decodedProtocol = ProtocolMessage.decode(wakuMessage.payload) - if (!decodedProtocol) { - continue - } - - const decodedMetadata = ApplicationMetadataMessage.decode( - decodedProtocol.publicMessage - ) - if (!decodedMetadata.payload) { - continue - } - - const messageId = payloadToId( - decodedProtocol.publicMessage, - wakuMessage.signaturePublicKey - ) - - const decodedPayload = ChatMessage.decode(decodedMetadata.payload) - - messages.push({ - ...decodedPayload, - messageId: messageId, - pinned: false, - reactions: { - 'thumbs-up': { - count: 0, - me: false, - }, - 'thumbs-down': { - count: 0, - me: false, - }, - heart: { - count: 0, - me: false, - }, - smile: { - count: 0, - me: false, - }, - sad: { - count: 0, - me: false, - }, - angry: { - count: 0, - me: false, - }, - }, - }) - } - - for (const _chunk of chunk(messages, options.chunk)) { - if (_chunk.length === options.chunk) { - const sortedMessages = sortBy(messages.splice(0, options.chunk), [ - 'timestamp', - ]) - const _messages = [ - ...sortedMessages, - ...(this.channelMessages[channelId] ?? []), - ] - - this.channelMessages[channelId] = _messages - - shouldStop = callback(_messages, false) - - if (shouldStop) { - return true - } - } - } - }, - }) - - if (messages.length && !shouldStop) { - const _messages = [ - ...messages.splice(0), - ...(this.channelMessages[channelId] ?? []), - ] - const sortedMessages = sortBy(_messages, ['timestamp']) - - this.channelMessages[channelId] = sortedMessages - - callback(sortedMessages, true) - - return - } - - // do?: always return at least last state - // callback(this.channelMessages[channelId] ?? [], true) - - return - } - public onCommunityUpdate(callback: (community: CommunityType) => void) { this.communityCallback = callback diff --git a/packages/status-js/src/client/community/fetch-channel-messages.ts b/packages/status-js/src/client/community/fetch-channel-messages.ts new file mode 100644 index 00000000..3ee5a497 --- /dev/null +++ b/packages/status-js/src/client/community/fetch-channel-messages.ts @@ -0,0 +1,51 @@ +import { idToContentTopic } from '../../contentTopic' +import { createSymKeyFromPassword } from '../../encryption' +import { fetchMessages } from './fetch-messages' + +import type { Community, MessageType } from '../../client' +import type { Waku } from 'js-waku' + +export const fetchChannelMessages = + (community: Community, queryHistory: Waku['store']['queryHistory']) => + async ( + channelId: string, + callback: (messages: MessageType[]) => void, + // { start, end = new Date() }: { start: Date; end?: Date } + options: { start: Date; end?: Date } + ) => { + const id = `${community.communityPublicKey}${channelId}` + + const channelContentTopic = idToContentTopic(id) + const symKey = await createSymKeyFromPassword(id) + + const startTime = options.start + let endTime = options.end || new Date() + + const storedMessages = community.channelMessages[channelId] ?? [] + if (storedMessages.length) { + const oldestMessageTime = new Date(Number(storedMessages[0].timestamp)) + + if (oldestMessageTime <= options.start) { + callback(storedMessages) + + return + } + + if (endTime >= oldestMessageTime) { + endTime = oldestMessageTime + } + } + + const messages = await fetchMessages( + queryHistory, + { startTime, endTime, symKey, channelContentTopic }, + storedMessages, + callback + ) + + if (messages.length) { + community.channelMessages[channelId] = [...messages, ...storedMessages] + } + + return + } diff --git a/packages/status-js/src/client/community/fetch-messages.ts b/packages/status-js/src/client/community/fetch-messages.ts new file mode 100644 index 00000000..581d5d34 --- /dev/null +++ b/packages/status-js/src/client/community/fetch-messages.ts @@ -0,0 +1,66 @@ +import { PageDirection } from 'js-waku' + +import { handleMessage } from './handle-message' + +import type { MessageType } from '../../client' +import type { Waku } from 'js-waku' + +const CHUNK_SIZE = 2 +const PAGE_SIZE = 100 + +export async function fetchMessages( + queryHistory: Waku['store']['queryHistory'], + options: { + symKey: Uint8Array + channelContentTopic: string + startTime: Date + endTime: Date + }, + storedMessages: MessageType[], + callback: (messages: MessageType[]) => void +) { + const remainingFetchedMessages: MessageType[] = [] + let fetchedMessages: MessageType[] = [] + + await queryHistory([options.channelContentTopic], { + timeFilter: { + startTime: options.startTime, + endTime: options.endTime, + }, + pageSize: PAGE_SIZE, + // most recent page first + pageDirection: PageDirection.BACKWARD, + decryptionKeys: [options.symKey], + callback: wakuMessages => { + // most recent message first + for (const wakuMessage of wakuMessages.reverse()) { + const message = handleMessage(wakuMessage) + + if (message) { + remainingFetchedMessages.push(message) + } + } + + while (remainingFetchedMessages.length > CHUNK_SIZE) { + // reverse + const _chunk = remainingFetchedMessages.splice(0, CHUNK_SIZE).reverse() + const _messages = [..._chunk, ...fetchedMessages, ...storedMessages] + + callback(_messages) + + fetchedMessages = [..._chunk, ...fetchedMessages] + } + }, + }) + + if (remainingFetchedMessages.length) { + const _chunk = remainingFetchedMessages.splice(0) + const _messages = [..._chunk, ...fetchedMessages, ...storedMessages] + + callback(_messages) + + fetchedMessages = [..._chunk, ...fetchedMessages] + } + + return fetchedMessages +} diff --git a/packages/status-js/src/client/community/handle-message.ts b/packages/status-js/src/client/community/handle-message.ts new file mode 100644 index 00000000..9c746eba --- /dev/null +++ b/packages/status-js/src/client/community/handle-message.ts @@ -0,0 +1,72 @@ +import { ApplicationMetadataMessage } from '../../../protos/application-metadata-message' +import { ChatMessage } from '../../../protos/chat-message' +import { ProtocolMessage } from '../../../protos/protocol-message' +import { payloadToId } from '../../utils/payload-to-id' + +import type { MessageType } from '../../client' +import type { WakuMessage } from 'js-waku' + +export function handleMessage( + wakuMessage: WakuMessage +): MessageType | undefined { + if (!wakuMessage.payload) { + return + } + + if (!wakuMessage.signaturePublicKey) { + return + } + + const decodedProtocol = ProtocolMessage.decode(wakuMessage.payload) + if (!decodedProtocol) { + return + } + + const decodedMetadata = ApplicationMetadataMessage.decode( + decodedProtocol.publicMessage + ) + if (!decodedMetadata.payload) { + return + } + + const decodedPayload = ChatMessage.decode(decodedMetadata.payload) + + const messageId = payloadToId( + decodedProtocol.publicMessage, + wakuMessage.signaturePublicKey + ) + + const message = { + ...decodedPayload, + messageId: messageId, + pinned: false, + reactions: { + 'thumbs-up': { + count: 0, + me: false, + }, + 'thumbs-down': { + count: 0, + me: false, + }, + heart: { + count: 0, + me: false, + }, + smile: { + count: 0, + me: false, + }, + sad: { + count: 0, + me: false, + }, + angry: { + count: 0, + me: false, + }, + }, + } + + return message +}