split history fetching

This commit is contained in:
Felicio Mununga 2022-06-06 15:33:56 +02:00
parent e9a00f559e
commit fa95a375a3
No known key found for this signature in database
GPG Key ID: 0EB8D75C775AB6F1
4 changed files with 202 additions and 130 deletions

View File

@ -9,7 +9,6 @@
// proactively change
import { bytesToHex } from 'ethereum-cryptography/utils'
import { Waku, waku_message } from 'js-waku'
import chunk from 'lodash/chunk'
import difference from 'lodash/difference'
import sortBy from 'lodash/sortBy'
import uniqBy from 'lodash/uniqBy'
@ -21,6 +20,7 @@ import { EmojiReaction } from '../protos/emoji-reaction'
import { PinMessage } from '../protos/pin-message'
import { ProtocolMessage } from '../protos/protocol-message'
import { Account } from './account'
import { fetchChannelMessages } from './client/community/fetch-channel-messages'
// import { ChatIdentity } from './wire/chat_identity'
import { idToContentTopic } from './contentTopic'
import { createSymKeyFromPassword } from './encryption'
@ -76,11 +76,12 @@ class Client {
bootstrap: {
default: false,
peers: [
// '/dns4/node-01.gc-us-central1-a.wakuv2.test.statusim.net/tcp/443/wss/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS',
'/dns4/node-01.do-ams3.wakuv2.test.statusim.net/tcp/8000/wss/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ',
'/dns4/node-01.gc-us-central1-a.wakuv2.test.statusim.net/tcp/443/wss/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS',
// '/dns4/node-01.do-ams3.wakuv2.test.statusim.net/tcp/8000/wss/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ',
// '/dns4/node-01.do-ams3.status.test.statusim.net/tcp/30303/p2p/16Uiu2HAkukebeXjTQ9QDBeNDWuGfbaSg79wkkhK4vPocLgR6QFDf'
],
},
libp2p: { config: { pubsub: { enabled: true, emitSelf: true } } },
})
await waku.waitForRemotePeer()
this.waku = waku
@ -108,23 +109,29 @@ class Client {
class Community {
private client: Client
private waku: Waku
private communityPublicKey: string
public communityPublicKey: string
private communityContentTopic!: string
private communityDecryptionKey!: Uint8Array
public communityMetadata!: CommunityType
private communityCallback: ((community: CommunityType) => void) | undefined
private channelMessages: Partial<{ [key: string]: MessageType[] }> = {}
public channelMessages: Partial<{ [key: string]: MessageType[] }> = {}
private channelCallbacks: {
[key: string]: (channel: ChannelType) => void
} = {}
private channelMessagesCallbacks: {
[key: string]: (messages: MessageType[]) => void
} = {}
private communityCallback: ((community: CommunityType) => void) | undefined
public fetchChannelMessages
constructor(client: Client, waku: Waku, publicKey: string) {
this.client = client
this.waku = waku
this.communityPublicKey = publicKey
this.fetchChannelMessages = fetchChannelMessages(
this,
this.waku.store.queryHistory.bind(this.waku.store)
)
}
public async start() {
@ -527,130 +534,6 @@ class Community {
return this.channelMessages[channelId] ?? []
}
public async fetchChannelMessages(
channelId: string,
callback: (messages: MessageType[], isDone: boolean) => boolean,
options: { start: Date; end: Date; chunk: number /*total: number*/ }
) {
const id = `${this.communityPublicKey}${channelId}`
const channelContentTopic = await idToContentTopic(id)
const symKey = await createSymKeyFromPassword(id)
const messages: MessageType[] = []
let shouldStop = false
await this.waku.store.queryHistory([channelContentTopic], {
// timeFilter: {
// startTime: options.start,
// endTime: options.end,
// },
// todo!: increase after testing
pageSize: 100,
decryptionKeys: [symKey],
callback: wakuMessages => {
for (const wakuMessage of wakuMessages.reverse()) {
if (!wakuMessage.payload) {
continue
}
if (!wakuMessage.signaturePublicKey) {
continue
}
const decodedProtocol = ProtocolMessage.decode(wakuMessage.payload)
if (!decodedProtocol) {
continue
}
const decodedMetadata = ApplicationMetadataMessage.decode(
decodedProtocol.publicMessage
)
if (!decodedMetadata.payload) {
continue
}
const messageId = payloadToId(
decodedProtocol.publicMessage,
wakuMessage.signaturePublicKey
)
const decodedPayload = ChatMessage.decode(decodedMetadata.payload)
messages.push({
...decodedPayload,
messageId: messageId,
pinned: false,
reactions: {
'thumbs-up': {
count: 0,
me: false,
},
'thumbs-down': {
count: 0,
me: false,
},
heart: {
count: 0,
me: false,
},
smile: {
count: 0,
me: false,
},
sad: {
count: 0,
me: false,
},
angry: {
count: 0,
me: false,
},
},
})
}
for (const _chunk of chunk(messages, options.chunk)) {
if (_chunk.length === options.chunk) {
const sortedMessages = sortBy(messages.splice(0, options.chunk), [
'timestamp',
])
const _messages = [
...sortedMessages,
...(this.channelMessages[channelId] ?? []),
]
this.channelMessages[channelId] = _messages
shouldStop = callback(_messages, false)
if (shouldStop) {
return true
}
}
}
},
})
if (messages.length && !shouldStop) {
const _messages = [
...messages.splice(0),
...(this.channelMessages[channelId] ?? []),
]
const sortedMessages = sortBy(_messages, ['timestamp'])
this.channelMessages[channelId] = sortedMessages
callback(sortedMessages, true)
return
}
// do?: always return at least last state
// callback(this.channelMessages[channelId] ?? [], true)
return
}
public onCommunityUpdate(callback: (community: CommunityType) => void) {
this.communityCallback = callback

View File

@ -0,0 +1,51 @@
import { idToContentTopic } from '../../contentTopic'
import { createSymKeyFromPassword } from '../../encryption'
import { fetchMessages } from './fetch-messages'
import type { Community, MessageType } from '../../client'
import type { Waku } from 'js-waku'
export const fetchChannelMessages =
(community: Community, queryHistory: Waku['store']['queryHistory']) =>
async (
channelId: string,
callback: (messages: MessageType[]) => void,
// { start, end = new Date() }: { start: Date; end?: Date }
options: { start: Date; end?: Date }
) => {
const id = `${community.communityPublicKey}${channelId}`
const channelContentTopic = idToContentTopic(id)
const symKey = await createSymKeyFromPassword(id)
const startTime = options.start
let endTime = options.end || new Date()
const storedMessages = community.channelMessages[channelId] ?? []
if (storedMessages.length) {
const oldestMessageTime = new Date(Number(storedMessages[0].timestamp))
if (oldestMessageTime <= options.start) {
callback(storedMessages)
return
}
if (endTime >= oldestMessageTime) {
endTime = oldestMessageTime
}
}
const messages = await fetchMessages(
queryHistory,
{ startTime, endTime, symKey, channelContentTopic },
storedMessages,
callback
)
if (messages.length) {
community.channelMessages[channelId] = [...messages, ...storedMessages]
}
return
}

View File

@ -0,0 +1,66 @@
import { PageDirection } from 'js-waku'
import { handleMessage } from './handle-message'
import type { MessageType } from '../../client'
import type { Waku } from 'js-waku'
const CHUNK_SIZE = 2
const PAGE_SIZE = 100
export async function fetchMessages(
queryHistory: Waku['store']['queryHistory'],
options: {
symKey: Uint8Array
channelContentTopic: string
startTime: Date
endTime: Date
},
storedMessages: MessageType[],
callback: (messages: MessageType[]) => void
) {
const remainingFetchedMessages: MessageType[] = []
let fetchedMessages: MessageType[] = []
await queryHistory([options.channelContentTopic], {
timeFilter: {
startTime: options.startTime,
endTime: options.endTime,
},
pageSize: PAGE_SIZE,
// most recent page first
pageDirection: PageDirection.BACKWARD,
decryptionKeys: [options.symKey],
callback: wakuMessages => {
// most recent message first
for (const wakuMessage of wakuMessages.reverse()) {
const message = handleMessage(wakuMessage)
if (message) {
remainingFetchedMessages.push(message)
}
}
while (remainingFetchedMessages.length > CHUNK_SIZE) {
// reverse
const _chunk = remainingFetchedMessages.splice(0, CHUNK_SIZE).reverse()
const _messages = [..._chunk, ...fetchedMessages, ...storedMessages]
callback(_messages)
fetchedMessages = [..._chunk, ...fetchedMessages]
}
},
})
if (remainingFetchedMessages.length) {
const _chunk = remainingFetchedMessages.splice(0)
const _messages = [..._chunk, ...fetchedMessages, ...storedMessages]
callback(_messages)
fetchedMessages = [..._chunk, ...fetchedMessages]
}
return fetchedMessages
}

View File

@ -0,0 +1,72 @@
import { ApplicationMetadataMessage } from '../../../protos/application-metadata-message'
import { ChatMessage } from '../../../protos/chat-message'
import { ProtocolMessage } from '../../../protos/protocol-message'
import { payloadToId } from '../../utils/payload-to-id'
import type { MessageType } from '../../client'
import type { WakuMessage } from 'js-waku'
export function handleMessage(
wakuMessage: WakuMessage
): MessageType | undefined {
if (!wakuMessage.payload) {
return
}
if (!wakuMessage.signaturePublicKey) {
return
}
const decodedProtocol = ProtocolMessage.decode(wakuMessage.payload)
if (!decodedProtocol) {
return
}
const decodedMetadata = ApplicationMetadataMessage.decode(
decodedProtocol.publicMessage
)
if (!decodedMetadata.payload) {
return
}
const decodedPayload = ChatMessage.decode(decodedMetadata.payload)
const messageId = payloadToId(
decodedProtocol.publicMessage,
wakuMessage.signaturePublicKey
)
const message = {
...decodedPayload,
messageId: messageId,
pinned: false,
reactions: {
'thumbs-up': {
count: 0,
me: false,
},
'thumbs-down': {
count: 0,
me: false,
},
heart: {
count: 0,
me: false,
},
smile: {
count: 0,
me: false,
},
sad: {
count: 0,
me: false,
},
angry: {
count: 0,
me: false,
},
},
}
return message
}