change client

- split client and community
- split some community methods
- add message fetching
- make observeChannelMessages async
This commit is contained in:
Felicio Mununga 2022-06-08 11:37:53 +02:00
parent cc4765d602
commit 857d283307
No known key found for this signature in database
GPG Key ID: 0EB8D75C775AB6F1
17 changed files with 814 additions and 748 deletions

View File

@ -36,4 +36,3 @@ export class Account {
return concatBytes(signature, new Uint8Array([recoverId])) return concatBytes(signature, new Uint8Array([recoverId]))
} }
} }
}

View File

@ -1,58 +1,25 @@
// todo: replies // todo: try protocol layer; then application data layer
// todo: replies; normalize messages (e.g. replies) prior returning
// todo: tests
// todo?: use clock for sorting
// todo: handle diff waku messages on diff topics
// todo: handle disconnections; no messages after sleep; libp2p;
// todo: identities/members? // todo: identities/members?
// todo: validate sig // todo: validate sig
// todo: observer contact updates // todo: observer contact updates
// todo: observer channels // todo: observer channels
// todo?: rename channels to chats
// todo: change relative import paths
// todo?: multiple communityCallback
// todo?: call onChannel* separately
// denormalized import { Waku } from 'js-waku'
// before calling callback; response to message id
// proactively change
import { bytesToHex } from 'ethereum-cryptography/utils'
import { Waku, waku_message } from 'js-waku'
import difference from 'lodash/difference'
import sortBy from 'lodash/sortBy'
import uniqBy from 'lodash/uniqBy'
import { ApplicationMetadataMessage } from '../protos/application-metadata-message'
// import { ChatIdentity } from '../protos/chat-identity'
import { ChatMessage, DeleteMessage, EditMessage } from '../protos/chat-message'
import { EmojiReaction } from '../protos/emoji-reaction'
import { PinMessage } from '../protos/pin-message'
import { ProtocolMessage } from '../protos/protocol-message'
import { Account } from './account' import { Account } from './account'
import { fetchChannelMessages } from './client/community/fetch-channel-messages' import { Community } from './client/community/community'
// import { ChatIdentity } from './wire/chat_identity'
import { idToContentTopic } from './contentTopic'
import { createSymKeyFromPassword } from './encryption'
import { payloadToId } from './utils/payload-to-id'
import { recoverPublicKeyFromMetadata } from './utils/recover-public-key-from-metadata'
import { CommunityDescription } from './wire/community_description'
import type { WakuMessage } from 'js-waku'
// todo: rename to chat
type CommunityType = CommunityDescription['proto']
type ChannelType = any
export type MessageType = ChatMessage & {
messageId: string
pinned: boolean
reactions: Reactions
}
type Reaction =
| 'heart'
| 'thumbs-up'
| 'thumbs-down'
| 'smile'
| 'sad'
| 'angry'
type Reactions = {
[key in Reaction]: {
count: number
me: boolean
}
}
export interface ClientOptions { export interface ClientOptions {
publicKey: string publicKey: string
@ -60,35 +27,37 @@ export interface ClientOptions {
} }
class Client { class Client {
private waku!: Waku private waku: Waku
private communityPublicKey: string
public account?: Account public account?: Account
// fixme public community: Community
public community!: Community
constructor(options: ClientOptions) { constructor(waku: Waku, options: ClientOptions) {
this.communityPublicKey = options.publicKey // Waku
this.waku = waku
// Community
this.community = new Community(this, waku, options.publicKey)
} }
public async start() { static async start(options: ClientOptions) {
// Waku
const waku = await Waku.create({ const waku = await Waku.create({
bootstrap: { bootstrap: {
default: false, default: false,
peers: [ peers: [
'/dns4/node-01.gc-us-central1-a.wakuv2.test.statusim.net/tcp/443/wss/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS', '/dns4/node-01.gc-us-central1-a.wakuv2.test.statusim.net/tcp/443/wss/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS',
// '/dns4/node-01.do-ams3.wakuv2.test.statusim.net/tcp/8000/wss/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ',
// '/dns4/node-01.do-ams3.status.test.statusim.net/tcp/30303/p2p/16Uiu2HAkukebeXjTQ9QDBeNDWuGfbaSg79wkkhK4vPocLgR6QFDf'
], ],
}, },
libp2p: { config: { pubsub: { enabled: true, emitSelf: true } } },
}) })
await waku.waitForRemotePeer() await waku.waitForRemotePeer()
this.waku = waku
const community = new Community(this, waku, this.communityPublicKey) // Client
await community.start() const client = new Client(waku, options)
this.community = community
// Community
await client.community.start()
return client
} }
public async stop() { public async stop() {
@ -97,6 +66,7 @@ class Client {
public createAccount = (): Account => { public createAccount = (): Account => {
this.account = new Account() this.account = new Account()
return this.account return this.account
} }
@ -106,482 +76,10 @@ class Client {
// } // }
} }
class Community {
private client: Client
private waku: Waku
public communityPublicKey: string
private communityContentTopic!: string
private communityDecryptionKey!: Uint8Array
public communityMetadata!: CommunityType
public channelMessages: Partial<{ [key: string]: MessageType[] }> = {}
private channelCallbacks: {
[key: string]: (channel: ChannelType) => void
} = {}
private channelMessagesCallbacks: {
[key: string]: (messages: MessageType[]) => void
} = {}
private communityCallback: ((community: CommunityType) => void) | undefined
public fetchChannelMessages
constructor(client: Client, waku: Waku, publicKey: string) {
this.client = client
this.waku = waku
this.communityPublicKey = publicKey
this.fetchChannelMessages = fetchChannelMessages(
this,
this.waku.store.queryHistory.bind(this.waku.store)
)
}
public async start() {
// Community
this.communityContentTopic = idToContentTopic(this.communityPublicKey)
this.communityDecryptionKey = await createSymKeyFromPassword(
this.communityPublicKey
)
this.waku.store.addDecryptionKey(this.communityDecryptionKey)
await this.fetchCommunity()
// handle community not connected
await this.observeCommunity()
// Channel messages
await this.observeChannelMessages(Object.keys(this.communityMetadata.chats))
}
private async observeCommunity() {
this.waku.relay.addDecryptionKey(this.communityDecryptionKey)
this.waku.relay.addObserver(
message => {
if (!message.payload) {
return
}
const decodedMetadata = ApplicationMetadataMessage.decode(
message.payload
)
if (!decodedMetadata.payload) {
return
}
const decodedPayload = CommunityDescription.decode(
decodedMetadata.payload
)
if (!decodedPayload.identity) {
return
}
const removedChats = difference(
Object.keys(this.communityMetadata.chats),
Object.keys(decodedPayload.proto.chats)
)
const addedChats = difference(
Object.keys(decodedPayload.proto.chats),
Object.keys(this.communityMetadata.chats)
)
if (removedChats.length) {
this.unobserveChannelMessages(removedChats)
}
if (addedChats.length) {
this.observeChannelMessages(addedChats)
}
this.communityMetadata = decodedPayload.proto
this.communityCallback?.(decodedPayload.proto)
},
[this.communityContentTopic]
)
}
private async observeChannelMessages(chats: string[]) {
const contentTopics: string[] = []
for (const chatId of chats) {
const id = `${this.communityPublicKey}${chatId}`
const channelContentTopic = idToContentTopic(id)
const symKey = await createSymKeyFromPassword(id)
contentTopics.push(channelContentTopic)
// todo: request waku feature to be passed as param
// TODO?: use contentTopics as array instead of separate observer for each chat
this.waku.relay.addDecryptionKey(symKey, {
method: waku_message.DecryptionMethod.Symmetric,
contentTopics: [channelContentTopic],
})
}
// todo?: delete Waku observers
// todo?: check if waku propagates errors
// todo!: request Waku feature to accept decryption keys as a param
this.waku.relay.addObserver(this.handleMessage, contentTopics)
}
private unobserveChannelMessages(chatIds: string[]) {
const contentTopics = chatIds.map(chatId => {
const id = `${this.communityPublicKey}${chatId}`
const channelContentTopic = idToContentTopic(id)
return channelContentTopic
})
this.waku.relay.deleteObserver(this.handleMessage, contentTopics)
}
private handleMessage = (wakuMessage: WakuMessage) => {
if (!wakuMessage.payload) {
return
}
const decodedProtocol = ProtocolMessage.decode(wakuMessage.payload)
if (!decodedProtocol) {
return
}
const decodedMetadata = ApplicationMetadataMessage.decode(
decodedProtocol.publicMessage
// message.payload
)
if (!decodedMetadata.payload) {
return
}
try {
const pk = recoverPublicKeyFromMetadata(decodedMetadata)
console.log('pk', pk)
} catch (err) {
console.error(err)
}
let shouldUpdate = false
let _decodedPayload:
| ChatMessage
| EditMessage
| DeleteMessage
| PinMessage
| EmojiReaction
| undefined
switch (decodedMetadata.type) {
case ApplicationMetadataMessage.Type.TYPE_CHAT_MESSAGE: {
if (!wakuMessage.signaturePublicKey) {
break
}
const messageId = payloadToId(
decodedProtocol.publicMessage,
wakuMessage.signaturePublicKey
)
const decodedPayload = ChatMessage.decode(decodedMetadata.payload)
// todo: explain
// if (!decodedMetadata.identity) {
// break
// }
// const decodedIdentity = ChatIdentity.decode(decodedProtocol.bundles[0].identity)
// todo: handle already received messages
// TODO?: ignore messages which are messageType !== COMMUNITY_CHAT
const channelId = decodedPayload.chatId.slice(68)
if (!this.channelMessages[channelId]) {
this.channelMessages[channelId] = []
}
const channelMessage: MessageType = {
...decodedPayload,
// ...decodedIdentity,
messageId,
pinned: false,
reactions: {
'thumbs-up': {
count: 0,
me: false,
},
'thumbs-down': {
count: 0,
me: false,
},
heart: {
count: 0,
me: false,
},
smile: {
count: 0,
me: false,
},
sad: {
count: 0,
me: false,
},
angry: {
count: 0,
me: false,
},
},
}
this.channelMessages[channelId].push(channelMessage)
shouldUpdate = true
_decodedPayload = decodedPayload
break
}
case ApplicationMetadataMessage.Type.TYPE_EDIT_MESSAGE: {
if (!wakuMessage.signaturePublicKey) {
break
}
const decodedPayload = EditMessage.decode(decodedMetadata.payload)
const channelId = decodedPayload.chatId.slice(68)
const messageId = decodedPayload.messageId
const msgs = this.channelMessages[channelId].map(message => {
if (message.messageId === messageId) {
shouldUpdate = true
return {
...message,
// fixme?: other fields that user can edit
text: decodedPayload.text,
}
}
return message
})
this.channelMessages[channelId] = msgs
_decodedPayload = decodedPayload
break
}
case ApplicationMetadataMessage.Type.TYPE_DELETE_MESSAGE: {
const decodedPayload = DeleteMessage.decode(decodedMetadata.payload)
const channelId = decodedPayload.chatId.slice(68)
const messageId = decodedPayload.messageId
const msgs = this.channelMessages[channelId].filter(message => {
if (message.messageId === messageId) {
shouldUpdate = true
return false
}
return true
})
this.channelMessages[channelId] = msgs
_decodedPayload = decodedPayload
break
}
case ApplicationMetadataMessage.Type.TYPE_PIN_MESSAGE: {
const decodedPayload = PinMessage.decode(decodedMetadata.payload)
const channelId = decodedPayload.chatId.slice(68)
const messageId = decodedPayload.messageId
const message = this.channelMessages[channelId].find(
message => message.messageId === messageId
)
if (message) {
message.pinned = Boolean(decodedPayload.pinned)
shouldUpdate = true
_decodedPayload = decodedPayload
}
break
}
case ApplicationMetadataMessage.Type.TYPE_EMOJI_REACTION: {
if (!wakuMessage.signaturePublicKey) {
break
}
const decodedPayload = EmojiReaction.decode(decodedMetadata.payload)
const channelId = decodedPayload.chatId.slice(68)
const messageId = decodedPayload.messageId
const message = this.channelMessages[channelId].find(
message => message.messageId === messageId
)
if (message) {
const isMe =
this.client.account?.publicKey ===
`0x${bytesToHex(wakuMessage.signaturePublicKey)}`
// TODO?: not needed anymore
message.reactions ??= {
'thumbs-up': {
count: 0,
me: false,
},
'thumbs-down': {
count: 0,
me: false,
},
heart: {
count: 0,
me: false,
},
smile: {
count: 0,
me: false,
},
sad: {
count: 0,
me: false,
},
angry: {
count: 0,
me: false,
},
}
// fixme?: mutates
setReactions(message.reactions, decodedPayload, isMe)
shouldUpdate = true
_decodedPayload = decodedPayload
}
break
}
default:
break
}
if (shouldUpdate && _decodedPayload) {
const channelId = _decodedPayload.chatId.slice(68)
const messages = this.channelMessages[channelId] ?? []
const sortedMessages = sortBy(messages, ['timestamp'])
// todo: do not use
const uniqueChannelMessages = uniqBy(sortedMessages, 'messageId')
this.channelMessages[channelId] = uniqueChannelMessages
this.channelMessagesCallbacks[channelId]?.(
this.channelMessages[channelId]
)
}
}
public async fetchCommunity() {
let community: CommunityType | undefined
await this.waku.store.queryHistory([this.communityContentTopic], {
decryptionKeys: [this.communityDecryptionKey],
callback: messages => {
for (const message of messages.reverse()) {
if (!message.payload) {
return
}
const decodedMetadata = ApplicationMetadataMessage.decode(
message.payload
)
if (!decodedMetadata.payload) {
return
}
const decodedPayload = CommunityDescription.decode(
decodedMetadata.payload
)
// todo: explain
if (!decodedPayload.identity) {
return
}
community = decodedPayload.proto
this.communityMetadata = decodedPayload.proto
return true
}
},
})
return community
}
public getMessages(channelId: string): MessageType[] {
return this.channelMessages[channelId] ?? []
}
public onCommunityUpdate(callback: (community: CommunityType) => void) {
this.communityCallback = callback
return () => {
this.communityCallback = undefined
}
}
public onChannelUpdate(
channelId: string,
callback: (channel: ChannelType) => void
) {
this.channelCallbacks[channelId] = callback
return () => {
delete this.channelCallbacks[channelId]
}
}
public onChannelMessageUpdate(
channelId: string,
callback: (messages: MessageType[]) => void
) {
this.channelMessagesCallbacks[channelId] = callback
return () => {
delete this.channelMessagesCallbacks[channelId]
}
}
}
// fixme: type
const REACTION_MAP: Record<EmojiReaction.Type, string> = {
[EmojiReaction.Type.LOVE]: 'heart',
[EmojiReaction.Type.THUMBS_UP]: 'thumbs-up',
[EmojiReaction.Type.THUMBS_DOWN]: 'thumbs-down',
[EmojiReaction.Type.LAUGH]: 'smile',
[EmojiReaction.Type.SAD]: 'sad',
[EmojiReaction.Type.ANGRY]: 'angry',
[EmojiReaction.Type.UNKNOWN_EMOJI_REACTION_TYPE]: 'unknown',
}
function setReactions(
reactions: Reactions,
reaction: EmojiReaction,
isMe: boolean
) {
const type = REACTION_MAP[reaction.type]
const isRetracted = reaction.retracted
if (!reactions[type]) {
reactions[type] = {
count: 1,
me: isMe,
}
return
}
reactions[type].count += isRetracted ? -1 : 1
if (isMe) {
reactions[type].me = isRetracted ? false : true
}
}
// todo export community metadata type
export type { Client, Community, CommunityType }
export async function createClient(options: ClientOptions): Promise<Client> { export async function createClient(options: ClientOptions): Promise<Client> {
const client = new Client(options) const client = await Client.start(options)
// TODO?: add start
return client return client
} }
export type { Client }

View File

@ -0,0 +1,246 @@
import { waku_message } from 'js-waku'
import difference from 'lodash/difference'
import { idToContentTopic } from '../../contentTopic'
import { createSymKeyFromPassword } from '../../encryption'
import { createChannelContentTopics } from './create-channel-content-topics'
import { fetchChannelChatMessages } from './fetch-channel-chat-messages'
import { handleChannelChatMessage } from './handle-channel-chat-message'
import { handleCommunity } from './handle-community'
import type { ChatMessage } from '../../../protos/chat-message'
import type { Client } from '../../client'
import type { CommunityDescription } from '../../wire/community_description'
import type { Reactions } from './get-reactions'
import type { Waku, WakuMessage } from 'js-waku'
export type CommunityMetadataType = CommunityDescription['proto']
export type MessageType = ChatMessage & {
messageId: string
pinned: boolean
reactions: Reactions
channelId: string
}
export class Community {
private client: Client
private waku: Waku
public communityPublicKey: string
// fixme!
private communityContentTopic!: string
private communityDecryptionKey!: Uint8Array
public communityMetadata!: CommunityMetadataType
public channelMessages: Partial<{ [key: string]: MessageType[] }> = {}
private channelMessagesCallbacks: {
[key: string]: (messages: MessageType[]) => void
} = {}
private communityCallback:
| ((community: CommunityMetadataType) => void)
| undefined
constructor(client: Client, waku: Waku, publicKey: string) {
this.client = client
this.waku = waku
this.communityPublicKey = publicKey
}
public async start() {
this.communityContentTopic = idToContentTopic(this.communityPublicKey)
this.communityDecryptionKey = await createSymKeyFromPassword(
this.communityPublicKey
)
// Waku
this.waku.store.addDecryptionKey(this.communityDecryptionKey)
// Community
const communityMetadata = await this.fetchCommunity()
if (!communityMetadata) {
throw new Error('Failed to intiliaze Community')
}
this.communityMetadata = communityMetadata
await this.observeCommunity()
// Channels
await this.observeChannelMessages(Object.keys(this.communityMetadata.chats))
}
public fetchCommunity = async () => {
let communityMetadata: CommunityMetadataType | undefined
let shouldStop = false
await this.waku.store.queryHistory([this.communityContentTopic], {
decryptionKeys: [this.communityDecryptionKey],
callback: wakuMessages => {
for (const wakuMessage of wakuMessages.reverse()) {
const message = handleCommunity(wakuMessage)
if (!message) {
return shouldStop
}
communityMetadata = message
shouldStop = true
return shouldStop
}
},
})
return communityMetadata
}
// todo?: rename and implement as "fetch history" (e.g. emojis, however, would arrive first and not match)
public createFetchChannelMessages = async (
channelId: string,
callback: (messages: MessageType[]) => void
) => {
const id = `${this.communityPublicKey}${channelId}`
const channelContentTopic = idToContentTopic(id)
// todo: keep in state instead and replace the factory
const symKey = await createSymKeyFromPassword(id)
return async (options: { start: Date; end?: Date }) => {
const messages = await fetchChannelChatMessages(
this.waku,
symKey,
channelContentTopic,
this.channelMessages[channelId] ?? [],
options,
callback
)
if (!messages.length) {
return
}
// state
this.channelMessages[channelId] = messages
return
}
}
private observeCommunity = () => {
this.waku.relay.addDecryptionKey(this.communityDecryptionKey)
this.waku.relay.addObserver(this.handleCommunity, [
this.communityContentTopic,
])
}
private observeChannelMessages = async (chatsIds: string[]) => {
const symKeyPromises = chatsIds.map((chatId: string) => {
return new Promise<string>(resolve => {
const id = `${this.communityPublicKey}${chatId}`
const channelContentTopic = idToContentTopic(id)
createSymKeyFromPassword(id).then(symKey => {
// todo: request waku feature to be passed as param
this.waku.relay.addDecryptionKey(symKey, {
method: waku_message.DecryptionMethod.Symmetric,
contentTopics: [channelContentTopic],
})
resolve(channelContentTopic)
})
})
})
const contentTopics = await Promise.all(symKeyPromises)
this.waku.relay.addObserver(this.handleMessage, contentTopics)
}
private unobserveChannelMessages = (chatIds: string[]) => {
const contentTopics = createChannelContentTopics(
chatIds,
this.communityPublicKey
)
this.waku.relay.deleteObserver(this.handleMessage, contentTopics)
}
private handleCommunity = (wakuMessage: WakuMessage) => {
const communityMetadata = handleCommunity(wakuMessage)
if (!communityMetadata) {
return
}
// Channels
const removedChats = difference(
Object.keys(this.communityMetadata.chats),
Object.keys(communityMetadata.chats)
)
const addedChats = difference(
Object.keys(communityMetadata.chats),
Object.keys(communityMetadata.chats)
)
if (removedChats.length) {
this.unobserveChannelMessages(removedChats)
}
if (addedChats.length) {
this.observeChannelMessages(addedChats)
}
// Community
this.communityCallback?.(communityMetadata)
}
private handleMessage = (wakuMessage: WakuMessage) => {
const messages = handleChannelChatMessage(
wakuMessage,
this.channelMessages,
this.client.account?.publicKey
)
if (!messages.length) {
return
}
// state
const channelId = messages[0].channelId
// todo: don't use; insert in-place
// const sortedMessages = sortBy(messages, ['timestamp'])
// todo: don't use; check prior insert
// const uniqueChannelMessages = uniqBy(sortedMessages, 'messageId')
// todo?: remove undefined left after deletion
this.channelMessages[channelId] = messages
// callback
// todo!: review use of !
this.channelMessagesCallbacks[channelId]?.(this.channelMessages[channelId]!)
}
public getMessages(channelId: string): MessageType[] {
return this.channelMessages[channelId] ?? []
}
public onCommunityUpdate = (
callback: (community: CommunityMetadataType) => void
) => {
this.communityCallback = callback
return () => {
this.communityCallback = undefined
}
}
public onChannelMessageUpdate = (
channelId: string,
callback: (messages: MessageType[]) => void
) => {
this.channelMessagesCallbacks[channelId] = callback
return () => {
delete this.channelMessagesCallbacks[channelId]
}
}
}

View File

@ -0,0 +1,15 @@
import { idToContentTopic } from '../../contentTopic'
export function createChannelContentTopics(
channelIds: string[],
communityPublicKey: string
) {
const channelTopics = channelIds.map(channelId => {
const id = `${communityPublicKey}${channelId}`
const channelContentTopic = idToContentTopic(id)
return channelContentTopic
})
return channelTopics
}

View File

@ -2,11 +2,13 @@ import { ApplicationMetadataMessage } from '../../../protos/application-metadata
import { ChatMessage } from '../../../protos/chat-message' import { ChatMessage } from '../../../protos/chat-message'
import { ProtocolMessage } from '../../../protos/protocol-message' import { ProtocolMessage } from '../../../protos/protocol-message'
import { payloadToId } from '../../utils/payload-to-id' import { payloadToId } from '../../utils/payload-to-id'
import { getChannelId } from './get-channel-id'
import { mapChatMessage } from './map-chat-message'
import type { MessageType } from '../../client' import type { MessageType } from './community'
import type { WakuMessage } from 'js-waku' import type { WakuMessage } from 'js-waku'
export function handleMessage( export function handleChannelChatMessage(
wakuMessage: WakuMessage wakuMessage: WakuMessage
): MessageType | undefined { ): MessageType | undefined {
if (!wakuMessage.payload) { if (!wakuMessage.payload) {
@ -29,44 +31,22 @@ export function handleMessage(
return return
} }
// todo?: process other types of messages
if (
decodedMetadata.type !== ApplicationMetadataMessage.Type.TYPE_CHAT_MESSAGE
) {
return
}
const decodedPayload = ChatMessage.decode(decodedMetadata.payload) const decodedPayload = ChatMessage.decode(decodedMetadata.payload)
const messageId = payloadToId( const messageId = payloadToId(
decodedProtocol.publicMessage, decodedProtocol.publicMessage,
wakuMessage.signaturePublicKey wakuMessage.signaturePublicKey
) )
const channelId = getChannelId(decodedPayload.chatId)
const message = { const message = mapChatMessage(decodedPayload, { messageId, channelId })
...decodedPayload,
messageId: messageId,
pinned: false,
reactions: {
'thumbs-up': {
count: 0,
me: false,
},
'thumbs-down': {
count: 0,
me: false,
},
heart: {
count: 0,
me: false,
},
smile: {
count: 0,
me: false,
},
sad: {
count: 0,
me: false,
},
angry: {
count: 0,
me: false,
},
},
}
return message return message
} }

View File

@ -0,0 +1,111 @@
import { PageDirection } from 'js-waku'
import { handleChannelChatMessage } from './delete_handle-channel-chat-message'
import type { MessageType } from './community'
import type { Waku } from 'js-waku'
const CHUNK_SIZE = 2
const PAGE_SIZE = 2
export const fetchChannelChatMessages = async (
waku: Waku,
symKey: Uint8Array,
contentTopic: string,
storedMessages: MessageType[],
options: { start: Date; end?: Date },
callback: (messages: MessageType[]) => void
): Promise<MessageType[]> => {
let result: MessageType[] = []
const startTime = options.start
let endTime = options.end || new Date()
if (storedMessages.length) {
const oldestMessageTime = new Date(Number(storedMessages[0].timestamp))
if (oldestMessageTime <= options.start) {
callback(storedMessages)
return result
}
if (endTime >= oldestMessageTime) {
endTime = oldestMessageTime
}
}
const fetchedMessages = await fetchMessages(
waku,
symKey,
contentTopic,
storedMessages,
{ startTime, endTime },
callback
)
if (!fetchedMessages.length) {
return result
}
result = [...fetchedMessages, ...storedMessages]
return result
}
export async function fetchMessages(
waku: Waku,
symKey: Uint8Array,
contentTopic: string,
storedMessages: MessageType[],
options: {
startTime: Date
endTime: Date
},
callback: (messages: MessageType[]) => void
) {
const remainingFetchedMessages: MessageType[] = []
let fetchedMessages: MessageType[] = []
await waku.store.queryHistory([contentTopic], {
timeFilter: {
startTime: options.startTime,
endTime: options.endTime,
},
pageSize: PAGE_SIZE,
// most recent page first
pageDirection: PageDirection.BACKWARD,
decryptionKeys: [symKey],
callback: wakuMessages => {
// most recent message first
for (const wakuMessage of wakuMessages.reverse()) {
const message = handleChannelChatMessage(wakuMessage)
if (message) {
remainingFetchedMessages.push(message)
}
}
while (remainingFetchedMessages.length >= CHUNK_SIZE) {
// reverse
const _chunk = remainingFetchedMessages.splice(0, CHUNK_SIZE).reverse()
const _messages = [..._chunk, ...fetchedMessages, ...storedMessages]
callback(_messages)
fetchedMessages = [..._chunk, ...fetchedMessages]
}
},
})
if (remainingFetchedMessages.length) {
const _chunk = remainingFetchedMessages.splice(0)
const _messages = [..._chunk, ...fetchedMessages, ...storedMessages]
callback(_messages)
fetchedMessages = [..._chunk, ...fetchedMessages]
}
return fetchedMessages
}

View File

@ -1,51 +0,0 @@
import { idToContentTopic } from '../../contentTopic'
import { createSymKeyFromPassword } from '../../encryption'
import { fetchMessages } from './fetch-messages'
import type { Community, MessageType } from '../../client'
import type { Waku } from 'js-waku'
export const fetchChannelMessages =
(community: Community, queryHistory: Waku['store']['queryHistory']) =>
async (
channelId: string,
callback: (messages: MessageType[]) => void,
// { start, end = new Date() }: { start: Date; end?: Date }
options: { start: Date; end?: Date }
) => {
const id = `${community.communityPublicKey}${channelId}`
const channelContentTopic = idToContentTopic(id)
const symKey = await createSymKeyFromPassword(id)
const startTime = options.start
let endTime = options.end || new Date()
const storedMessages = community.channelMessages[channelId] ?? []
if (storedMessages.length) {
const oldestMessageTime = new Date(Number(storedMessages[0].timestamp))
if (oldestMessageTime <= options.start) {
callback(storedMessages)
return
}
if (endTime >= oldestMessageTime) {
endTime = oldestMessageTime
}
}
const messages = await fetchMessages(
queryHistory,
{ startTime, endTime, symKey, channelContentTopic },
storedMessages,
callback
)
if (messages.length) {
community.channelMessages[channelId] = [...messages, ...storedMessages]
}
return
}

View File

@ -1,66 +0,0 @@
import { PageDirection } from 'js-waku'
import { handleMessage } from './handle-message'
import type { MessageType } from '../../client'
import type { Waku } from 'js-waku'
const CHUNK_SIZE = 2
const PAGE_SIZE = 100
export async function fetchMessages(
queryHistory: Waku['store']['queryHistory'],
options: {
symKey: Uint8Array
channelContentTopic: string
startTime: Date
endTime: Date
},
storedMessages: MessageType[],
callback: (messages: MessageType[]) => void
) {
const remainingFetchedMessages: MessageType[] = []
let fetchedMessages: MessageType[] = []
await queryHistory([options.channelContentTopic], {
timeFilter: {
startTime: options.startTime,
endTime: options.endTime,
},
pageSize: PAGE_SIZE,
// most recent page first
pageDirection: PageDirection.BACKWARD,
decryptionKeys: [options.symKey],
callback: wakuMessages => {
// most recent message first
for (const wakuMessage of wakuMessages.reverse()) {
const message = handleMessage(wakuMessage)
if (message) {
remainingFetchedMessages.push(message)
}
}
while (remainingFetchedMessages.length > CHUNK_SIZE) {
// reverse
const _chunk = remainingFetchedMessages.splice(0, CHUNK_SIZE).reverse()
const _messages = [..._chunk, ...fetchedMessages, ...storedMessages]
callback(_messages)
fetchedMessages = [..._chunk, ...fetchedMessages]
}
},
})
if (remainingFetchedMessages.length) {
const _chunk = remainingFetchedMessages.splice(0)
const _messages = [..._chunk, ...fetchedMessages, ...storedMessages]
callback(_messages)
fetchedMessages = [..._chunk, ...fetchedMessages]
}
return fetchedMessages
}

View File

@ -0,0 +1,3 @@
export function getChannelId(chatId: string) {
return chatId.slice(68)
}

View File

@ -0,0 +1,56 @@
import { EmojiReaction } from '../../../protos/emoji-reaction'
type Reaction =
| 'heart'
| 'thumbs-up'
| 'thumbs-down'
| 'smile'
| 'sad'
| 'angry'
export type Reactions = {
[key in Reaction]: {
count: number
me: boolean
}
}
const REACTION_MAP: Record<EmojiReaction.Type, string> = {
[EmojiReaction.Type.LOVE]: 'heart',
[EmojiReaction.Type.THUMBS_UP]: 'thumbs-up',
[EmojiReaction.Type.THUMBS_DOWN]: 'thumbs-down',
[EmojiReaction.Type.LAUGH]: 'smile',
[EmojiReaction.Type.SAD]: 'sad',
[EmojiReaction.Type.ANGRY]: 'angry',
[EmojiReaction.Type.UNKNOWN_EMOJI_REACTION_TYPE]: 'unknown',
}
export function getReactions(
reaction: EmojiReaction,
reactions: Reactions,
isMe: boolean
) {
// fixme!: type
const type = REACTION_MAP[reaction.type] as Reaction
const isRetracted = reaction.retracted
const _reaction = {
count: reactions[type].count,
me: reactions[type].me,
}
if (isRetracted && _reaction.count !== 0) {
_reaction.count -= 1
} else {
_reaction.count += 1
}
if (isMe) {
_reaction.me = isRetracted ? false : true
}
return {
...reactions,
[type]: _reaction,
}
}

View File

@ -0,0 +1,241 @@
// todo: merge with handle-channel-chat-message.ts
// todo?: rename to handle-message
import { bytesToHex } from 'ethereum-cryptography/utils'
import { ApplicationMetadataMessage } from '../../../protos/application-metadata-message'
import {
ChatMessage,
DeleteMessage,
EditMessage,
} from '../../../protos/chat-message'
import { EmojiReaction } from '../../../protos/emoji-reaction'
import { PinMessage } from '../../../protos/pin-message'
import { ProtocolMessage } from '../../../protos/protocol-message'
import { payloadToId } from '../../utils/payload-to-id'
import { getChannelId } from './get-channel-id'
import { getReactions } from './get-reactions'
import { mapChatMessage } from './map-chat-message'
import type { MessageType } from './community'
import type { WakuMessage } from 'js-waku'
export function handleChannelChatMessage(
wakuMessage: WakuMessage,
messages: Partial<{ [key: string]: MessageType[] }>,
accountPublicKey?: string
): MessageType[] {
let result: MessageType[] = []
if (!wakuMessage.payload) {
return result
}
// todo: explain
if (!wakuMessage.signaturePublicKey) {
return result
}
const decodedProtocol = ProtocolMessage.decode(wakuMessage.payload)
if (!decodedProtocol) {
return result
}
const decodedMetadata = ApplicationMetadataMessage.decode(
decodedProtocol.publicMessage
)
if (!decodedMetadata.payload) {
return result
}
// todo?:
// if (!decodedMetadata.identity) {
// break
// }
// TODO?: ignore messages which are messageType !== COMMUNITY_CHAT
switch (decodedMetadata.type) {
case ApplicationMetadataMessage.Type.TYPE_CHAT_MESSAGE: {
const decodedPayload = ChatMessage.decode(decodedMetadata.payload)
const messageId = payloadToId(
decodedProtocol.publicMessage,
wakuMessage.signaturePublicKey
)
const channelId = getChannelId(decodedPayload.chatId)
const _messages = messages[channelId] || []
// already received
if (_messages.find(message => message.messageId === messageId)) {
break
}
const message = mapChatMessage(decodedPayload, { messageId, channelId })
// findIndexLeft
// const index = _messages.findIndex(({ timestamp }) => {
// new Date(Number(timestamp)) > new Date(Number(message.timestamp))
// })
// findIndexRight
let index = _messages.length
while (index >= 0) {
const _message = _messages[index - 1]
if (
new Date(Number(_message.timestamp)) <=
new Date(Number(message.timestamp))
) {
break
}
index--
}
_messages.splice(index, 0, message)
result = _messages
break
}
case ApplicationMetadataMessage.Type.TYPE_EDIT_MESSAGE: {
const decodedPayload = EditMessage.decode(decodedMetadata.payload)
const messageId = decodedPayload.messageId
const channelId = getChannelId(decodedPayload.chatId)
const _messages = messages[channelId] || []
// findIndexLeft
// const index = _messages.findIndex(message => message.messageId === messageId)
// findIndexRight
let index = _messages.length
while (--index >= 0) {
const _message = _messages[index]
if (_message.messageId === messageId) {
break
}
}
// original not found
if (index < 0) {
break
}
const _message = _messages[index]
// todo?: use mapChatMessage
const message = {
..._message,
// fixme?: other fields that user can edit
text: decodedPayload.text,
}
_messages[index] = message
result = _messages
break
}
case ApplicationMetadataMessage.Type.TYPE_DELETE_MESSAGE: {
const decodedPayload = DeleteMessage.decode(decodedMetadata.payload)
const messageId = decodedPayload.messageId
const channelId = getChannelId(decodedPayload.chatId)
const _messages = messages[channelId] || []
let index = _messages.length
while (--index >= 0) {
const _message = _messages[index]
if (_message.messageId === messageId) {
break
}
}
// original not found
if (index < 0) {
break
}
// todo?: use delete; set to null
_messages.splice(index, 1)
result = _messages
break
}
case ApplicationMetadataMessage.Type.TYPE_PIN_MESSAGE: {
const decodedPayload = PinMessage.decode(decodedMetadata.payload)
const messageId = decodedPayload.messageId
const channelId = getChannelId(decodedPayload.chatId)
const _messages = messages[channelId] || []
let index = _messages.length
while (--index >= 0) {
const _message = _messages[index]
if (_message.messageId === messageId) {
break
}
}
// original not found
if (index < 0) {
break
}
_messages[index].pinned = Boolean(decodedPayload.pinned)
result = _messages
break
}
case ApplicationMetadataMessage.Type.TYPE_EMOJI_REACTION: {
const decodedPayload = EmojiReaction.decode(decodedMetadata.payload)
const messageId = decodedPayload.messageId
const channelId = getChannelId(decodedPayload.chatId)
const _messages = messages[channelId] || []
let index = _messages.length
while (--index >= 0) {
const _message = _messages[index]
if (_message.messageId === messageId) {
break
}
}
// original not found
if (index < 0) {
break
}
const _message = _messages[index]
const isMe =
accountPublicKey === `0x${bytesToHex(wakuMessage.signaturePublicKey)}`
// fixme!
_messages[index].reactions = getReactions(
decodedPayload,
_message.reactions,
isMe
)
result = _messages
break
}
default:
break
}
return result
}

View File

@ -0,0 +1,26 @@
import { ApplicationMetadataMessage } from '../../../protos/application-metadata-message'
import { CommunityDescription } from '../../wire/community_description'
import type { CommunityMetadataType } from './community'
import type { WakuMessage } from 'js-waku'
export function handleCommunity(
wakuMessage: WakuMessage
): CommunityMetadataType | undefined {
if (!wakuMessage.payload) {
return
}
const decodedMetadata = ApplicationMetadataMessage.decode(wakuMessage.payload)
if (!decodedMetadata.payload) {
return
}
const decodedPayload = CommunityDescription.decode(decodedMetadata.payload)
// todo!: explain
if (!decodedPayload.identity) {
return
}
return decodedPayload.proto
}

View File

@ -0,0 +1,50 @@
import type { ChatMessage } from '../../../protos/chat-message'
import type { MessageType } from './community'
// import type { Reactions } from './set-reactions'
export function mapChatMessage(
decodedMessage: ChatMessage,
props: {
messageId: string
channelId: string
// pinned: boolean
// reactions: Reactions
}
): MessageType {
const { messageId, channelId } = props
const message = {
...decodedMessage,
messageId,
channelId,
pinned: false,
reactions: {
'thumbs-up': {
count: 0,
me: false,
},
'thumbs-down': {
count: 0,
me: false,
},
heart: {
count: 0,
me: false,
},
smile: {
count: 0,
me: false,
},
sad: {
count: 0,
me: false,
},
angry: {
count: 0,
me: false,
},
},
}
return message
}

View File

@ -8,30 +8,29 @@ const CHANNEL_ID =
// '00d3f525-a0cf-4c40-832d-543ec9f8188b' // #messages // '00d3f525-a0cf-4c40-832d-543ec9f8188b' // #messages
'30804ea7-bd66-4d5d-91eb-b2dcfe2515b3' // #test-messages '30804ea7-bd66-4d5d-91eb-b2dcfe2515b3' // #test-messages
// 0x029f196bbfef4fa6a5eb81dd802133a63498325445ca1af1d154b1bb4542955133 c8b6df78-96be-4658-8bde-b51b2a09c599
;(async () => { ;(async () => {
const client = await createClient({ publicKey: COMMUNITY_PUBLIC_KEY }) const client = await createClient({ publicKey: COMMUNITY_PUBLIC_KEY })
await client.start()
await client.createAccount() await client.createAccount()
const community = client.community.communityMetadata const communityMetadata = client.community.fetchCommunity()
client.community.fetchChannelMessages( const fetchChannelMessages =
CHANNEL_ID, await client.community.createFetchChannelMessages(CHANNEL_ID, messages =>
(messages, isDone) => {
console.log(messages) console.log(messages)
return false
},
{ start: new Date('2022-01-01'), end: new Date(), chunk: 3 }
) )
// client.community.onCommunityUpdate(community => console.log(community)) await fetchChannelMessages({ start: new Date('2022-06-08T08:00:00.000Z') })
// client.community.onChannelUpdate(CHANNEL_ID, channel => console.log(channel)) await fetchChannelMessages({ start: new Date('2022-06-08T08:45:00.000Z') })
// client.community.onChannelMessageUpdate(CHANNEL_ID, messages => await fetchChannelMessages({ start: new Date('2022-01-01T08:00:00.000Z') })
// console.log(messages) await fetchChannelMessages({ start: new Date('2021-01-01T08:00:00.000Z') }) // 2021
// )
client.community.onCommunityUpdate(community => console.log(community))
client.community.onChannelMessageUpdate(CHANNEL_ID, messages =>
console.log(messages)
)
debugger
// await client.stop() // await client.stop()
})() })()

View File

@ -1,3 +1,4 @@
// todo: delete
import { createClient } from '../src/client' import { createClient } from '../src/client'
const COMMUNITY_PUBLIC_KEY = const COMMUNITY_PUBLIC_KEY =
@ -9,8 +10,6 @@ const CHANNEL_ID = '00d3f525-a0cf-4c40-832d-543ec9f8188b' // messages
;(async () => { ;(async () => {
const client = await createClient({ publicKey: COMMUNITY_PUBLIC_KEY }) const client = await createClient({ publicKey: COMMUNITY_PUBLIC_KEY })
await client.start()
// client.community.onCommunityUpdate(() => console.log("community:update")) // client.community.onCommunityUpdate(() => console.log("community:update"))
// client.community.onChannelUpdate(() => console.log("channel:update")) // client.community.onChannelUpdate(() => console.log("channel:update"))
client.community.onChannelMessageUpdate(CHANNEL_ID, () => client.community.onChannelMessageUpdate(CHANNEL_ID, () =>

View File

@ -1,44 +1,4 @@
// import { createClient } from '../src/client'
// const COMMUNITY_PUBLIC_KEY =
// '0x029f196bbfef4fa6a5eb81dd802133a63498325445ca1af1d154b1bb4542955133' // Boring community
// // '0x0243611cc13cc4e4390180fe8fd35234ab0fe2a7ba8d32e8ae5dd23b60ac7ec177'
// // '0x02e7102c85ed78e5be30124f8f52014b1135f972c383f55f83ec8ff50436cd1260'
// const CHANNEL_ID =
// // '00d3f525-a0cf-4c40-832d-543ec9f8188b' // #messages
// '30804ea7-bd66-4d5d-91eb-b2dcfe2515b3' // #test-messages
// ;(async () => {
// const client = await createClient({ publicKey: COMMUNITY_PUBLIC_KEY })
// await client.start()
// await client.createAccount()
// const community = client.community.communityMetadata
// client.community.fetchChannelMessages(
// CHANNEL_ID,
// (messages, isDone) => {
// console.log(messages)
// return false
// },
// { start: new Date('2022-01-01'), end: new Date(), chunk: 3 }
// )
// // client.community.onCommunityUpdate(community => console.log(community))
// // client.community.onChannelUpdate(CHANNEL_ID, channel => console.log(channel))
// // client.community.onChannelMessageUpdate(CHANNEL_ID, messages =>
// // console.log(messages)
// // )
// // await client.stop()
// })()
// export type {} from './'
export type { Client, ClientOptions, Community, MessageType } from './client'
export type { Account } from './account' export type { Account } from './account'
export type { Client, ClientOptions } from './client'
export { createClient } from './client' export { createClient } from './client'
export type { Community, MessageType } from './client/community/community'
// import { Community } from './client'
// type h = Community['communityMetadata']['members'][0]