Merge pull request #158 from status-im/138-web-chat-archived-messages

This commit is contained in:
Franck Royer 2021-05-18 13:20:25 +10:00 committed by GitHub
commit fb99d6025d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 147 additions and 67 deletions

View File

@ -7,8 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Added
- `callback` argument to `WakuStore.queryHistory()`, called as messages are retrieved
; Messages are retrieved using pagination, and it may take some time to retrieve all messages,
with the `callback` function, messages are processed as soon as they are received.
### Changed ### Changed
- Testing: Upgrade nim-waku node to v0.3. - Testing: Upgrade nim-waku node to v0.3.
- **Breaking**: Modify `WakuStore.queryHistory()` to accept one `Object` instead of multiple individual arguments.
## [0.3.0] - 2021-05-15 ## [0.3.0] - 2021-05-15

View File

@ -76,9 +76,10 @@ export default async function startChat(): Promise<void> {
console.log( console.log(
`Retrieving archived messages from ${peerId.toB58String()}` `Retrieving archived messages from ${peerId.toB58String()}`
); );
const messages = await waku.store.queryHistory(peerId, [ const messages = await waku.store.queryHistory({
ChatContentTopic, peerId,
]); contentTopics: [ChatContentTopic],
});
messages?.map((msg) => { messages?.map((msg) => {
if (msg.payload) { if (msg.payload) {
const chatMsg = ChatMessage.decode(msg.payload); const chatMsg = ChatMessage.decode(msg.payload);

View File

@ -45,6 +45,29 @@ const themes = {
export const ChatContentTopic = 'dingpu'; export const ChatContentTopic = 'dingpu';
async function retrieveStoreMessages(
waku: Waku,
peerId: PeerId,
setArchivedMessages: (value: ChatMessage[]) => void
): Promise<number> {
const callback = (wakuMessages: WakuMessage[]): void => {
const messages = wakuMessages
.map((wakuMsg) => wakuMsg.payload)
.filter((payload) => !!payload)
.map((payload) => ChatMessage.decode(payload as Uint8Array));
setArchivedMessages(messages);
};
const res = await waku.store.queryHistory({
peerId,
contentTopics: [ChatContentTopic],
pageSize: 5,
callback,
});
return res ? res.length : 0;
}
export default function App() { export default function App() {
let [newMessages, setNewMessages] = useState<ChatMessage[]>([]); let [newMessages, setNewMessages] = useState<ChatMessage[]>([]);
let [archivedMessages, setArchivedMessages] = useState<ChatMessage[]>([]); let [archivedMessages, setArchivedMessages] = useState<ChatMessage[]>([]);
@ -61,6 +84,7 @@ export default function App() {
} }
}; };
// TODO: Split this
const handleProtocolChange = async ( const handleProtocolChange = async (
waku: Waku, waku: Waku,
{ peerId, protocols }: { peerId: PeerId; protocols: string[] } { peerId, protocols }: { peerId: PeerId; protocols: string[] }
@ -68,17 +92,12 @@ export default function App() {
if (protocols.includes(StoreCodec)) { if (protocols.includes(StoreCodec)) {
console.log(`${peerId.toB58String()}: retrieving archived messages}`); console.log(`${peerId.toB58String()}: retrieving archived messages}`);
try { try {
const response = await waku.store.queryHistory(peerId, [ const length = await retrieveStoreMessages(
ChatContentTopic, waku,
]); peerId,
console.log(`${peerId.toB58String()}: messages retrieved:`, response); setArchivedMessages
if (response) { );
const messages = response console.log(`${peerId.toB58String()}: messages retrieved:`, length);
.map((wakuMsg) => wakuMsg.payload)
.filter((payload) => !!payload)
.map((payload) => ChatMessage.decode(payload as Uint8Array));
setArchivedMessages(messages);
}
} catch (e) { } catch (e) {
console.log( console.log(
`${peerId.toB58String()}: error encountered when retrieving archived messages`, `${peerId.toB58String()}: error encountered when retrieving archived messages`,

View File

@ -14,10 +14,11 @@ interface Props {
export default function ChatList(props: Props) { export default function ChatList(props: Props) {
const [messages, setMessages] = useState<ChatMessage[]>([]); const [messages, setMessages] = useState<ChatMessage[]>([]);
const [groupedMessages, setGroupedMessages] = useState<ChatMessage[][]>([]);
let updatedMessages; let updatedMessages;
if (IsThereNewMessages(props.newMessages, messages)) { if (IsThereNewMessages(props.newMessages, messages)) {
updatedMessages = messages.slice().concat(props.newMessages); updatedMessages = messages.concat(props.newMessages);
if (IsThereNewMessages(props.archivedMessages, updatedMessages)) { if (IsThereNewMessages(props.archivedMessages, updatedMessages)) {
updatedMessages = copyMergeUniqueReplace( updatedMessages = copyMergeUniqueReplace(
props.archivedMessages, props.archivedMessages,
@ -34,32 +35,31 @@ export default function ChatList(props: Props) {
} }
if (updatedMessages) { if (updatedMessages) {
setGroupedMessages(groupMessagesBySender(updatedMessages));
setMessages(updatedMessages); setMessages(updatedMessages);
} }
const messagesGroupedBySender = groupMessagesBySender(messages).map( const renderedGroupedMessages = groupedMessages.map((currentMessageGroup) => (
(currentMessageGroup) => ( <MessageGroup onlyFirstWithMeta>
<MessageGroup onlyFirstWithMeta> {currentMessageGroup.map((currentMessage) => (
{currentMessageGroup.map((currentMessage) => ( <Message
<Message key={
key={ currentMessage.timestamp.valueOf() +
currentMessage.timestamp.valueOf() + currentMessage.nick +
currentMessage.nick + currentMessage.payloadAsUtf8
currentMessage.payloadAsUtf8 }
} authorName={currentMessage.nick}
authorName={currentMessage.nick} date={formatDisplayDate(currentMessage)}
date={formatDisplayDate(currentMessage)} >
> <MessageText>{currentMessage.payloadAsUtf8}</MessageText>
<MessageText>{currentMessage.payloadAsUtf8}</MessageText> </Message>
</Message> ))}
))} </MessageGroup>
</MessageGroup> ));
)
);
return ( return (
<MessageList active containScrollInSubtree> <MessageList active containScrollInSubtree>
{messagesGroupedBySender} {renderedGroupedMessages}
<AlwaysScrollToBottom newMessages={props.newMessages} /> <AlwaysScrollToBottom newMessages={props.newMessages} />
</MessageList> </MessageList>
); );

View File

@ -2,31 +2,42 @@ import { Reader } from 'protobufjs/minimal';
import { v4 as uuid } from 'uuid'; import { v4 as uuid } from 'uuid';
import * as proto from '../../proto/waku/v2/store'; import * as proto from '../../proto/waku/v2/store';
import { DefaultContentTopic } from '../waku_message';
import { DefaultPubsubTopic } from '../waku_relay'; export enum Direction {
BACKWARD = 'backward',
FORWARD = 'forward',
}
export interface Options {
contentTopics: string[];
cursor?: proto.Index;
pubsubTopic: string;
direction: Direction;
pageSize: number;
}
export class HistoryRPC { export class HistoryRPC {
public constructor(public proto: proto.HistoryRPC) {} public constructor(public proto: proto.HistoryRPC) {}
static createQuery( /**
contentTopics: string[] = [DefaultContentTopic], * Create History Query.
cursor?: proto.Index, */
pubsubTopic: string = DefaultPubsubTopic static createQuery(options: Options): HistoryRPC {
): HistoryRPC { const direction = directionToProto(options.direction);
const pagingInfo = { const pagingInfo = {
pageSize: 10, pageSize: options.pageSize,
cursor, cursor: options.cursor,
direction: proto.PagingInfo_Direction.DIRECTION_FORWARD, direction,
}; };
const contentFilters = contentTopics.map((contentTopic) => { const contentFilters = options.contentTopics.map((contentTopic) => {
return { contentTopic }; return { contentTopic };
}); });
return new HistoryRPC({ return new HistoryRPC({
requestId: uuid(), requestId: uuid(),
query: { query: {
pubsubTopic, pubsubTopic: options.pubsubTopic,
contentFilters, contentFilters,
pagingInfo, pagingInfo,
startTime: undefined, startTime: undefined,
@ -53,3 +64,14 @@ export class HistoryRPC {
return this.proto.response; return this.proto.response;
} }
} }
function directionToProto(direction: Direction): proto.PagingInfo_Direction {
switch (direction) {
case Direction.BACKWARD:
return proto.PagingInfo_Direction.DIRECTION_BACKWARD_UNSPECIFIED;
case Direction.FORWARD:
return proto.PagingInfo_Direction.DIRECTION_FORWARD;
default:
return proto.PagingInfo_Direction.DIRECTION_BACKWARD_UNSPECIFIED;
}
}

View File

@ -3,7 +3,9 @@ import TCP from 'libp2p-tcp';
import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils'; import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils';
import { Waku } from '../waku'; import { Waku } from '../waku';
import { WakuMessage } from '../waku_message'; import { DefaultContentTopic, WakuMessage } from '../waku_message';
import { Direction } from './history_rpc';
describe('Waku Store', () => { describe('Waku Store', () => {
let waku: Waku; let waku: Waku;
@ -39,7 +41,10 @@ describe('Waku Store', () => {
const nimPeerId = await nimWaku.getPeerId(); const nimPeerId = await nimWaku.getPeerId();
const messages = await waku.store.queryHistory(nimPeerId); const messages = await waku.store.queryHistory({
peerId: nimPeerId,
contentTopics: [],
});
expect(messages?.length).eq(2); expect(messages?.length).eq(2);
const result = messages?.findIndex((msg) => { const result = messages?.findIndex((msg) => {
@ -73,7 +78,11 @@ describe('Waku Store', () => {
const nimPeerId = await nimWaku.getPeerId(); const nimPeerId = await nimWaku.getPeerId();
const messages = await waku.store.queryHistory(nimPeerId); const messages = await waku.store.queryHistory({
peerId: nimPeerId,
contentTopics: [DefaultContentTopic],
direction: Direction.FORWARD,
});
expect(messages?.length).eq(15); expect(messages?.length).eq(15);
for (let index = 0; index < 2; index++) { for (let index = 0; index < 2; index++) {

View File

@ -5,11 +5,21 @@ import Libp2p from 'libp2p';
import PeerId from 'peer-id'; import PeerId from 'peer-id';
import { WakuMessage } from '../waku_message'; import { WakuMessage } from '../waku_message';
import { DefaultPubsubTopic } from '../waku_relay';
import { HistoryRPC } from './history_rpc'; import { Direction, HistoryRPC } from './history_rpc';
export const StoreCodec = '/vac/waku/store/2.0.0-beta3'; export const StoreCodec = '/vac/waku/store/2.0.0-beta3';
export interface Options {
peerId: PeerId;
contentTopics: string[];
pubsubTopic?: string;
direction?: Direction;
pageSize?: number;
callback?: (messages: WakuMessage[]) => void;
}
/** /**
* Implements the [Waku v2 Store protocol](https://rfc.vac.dev/spec/13/). * Implements the [Waku v2 Store protocol](https://rfc.vac.dev/spec/13/).
*/ */
@ -19,19 +29,26 @@ export class WakuStore {
/** /**
* Query given peer using Waku Store. * Query given peer using Waku Store.
* *
* @param peerId The peer to query. * @param options
* @param contentTopics The content topics to retrieve, leave empty to * @param options.peerId The peer to query.
* @param options.contentTopics The content topics to retrieve, leave empty to
* retrieve all messages. * retrieve all messages.
* @param pubsubTopic The pubsub topic to retrieve. Currently, all waku nodes * @param options.pubsubTopic The pubsub topic to retrieve. Currently, all waku nodes
* use the same pubsub topic. This is reserved for future applications. * use the same pubsub topic. This is reserved for future applications.
* @param options.callback Callback called on page of stored messages as they are retrieved
* @throws If not able to reach the peer to query. * @throws If not able to reach the peer to query.
*/ */
async queryHistory( async queryHistory(options: Options): Promise<WakuMessage[] | null> {
peerId: PeerId, const opts = Object.assign(
contentTopics?: string[], {
pubsubTopic?: string pubsubTopic: DefaultPubsubTopic,
): Promise<WakuMessage[] | null> { direction: Direction.BACKWARD,
const peer = this.libp2p.peerStore.get(peerId); pageSize: 10,
},
options
);
const peer = this.libp2p.peerStore.get(opts.peerId);
if (!peer) throw 'Peer is unknown'; if (!peer) throw 'Peer is unknown';
if (!peer.protocols.includes(StoreCodec)) if (!peer.protocols.includes(StoreCodec))
throw 'Peer does not register waku store protocol'; throw 'Peer does not register waku store protocol';
@ -44,11 +61,8 @@ export class WakuStore {
try { try {
const { stream } = await connection.newStream(StoreCodec); const { stream } = await connection.newStream(StoreCodec);
try { try {
const historyRpcQuery = HistoryRPC.createQuery( const queryOpts = Object.assign(opts, { cursor });
contentTopics, const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
cursor,
pubsubTopic
);
const res = await pipe( const res = await pipe(
[historyRpcQuery.encode()], [historyRpcQuery.encode()],
lp.encode(), lp.encode(),
@ -71,8 +85,17 @@ export class WakuStore {
return messages; return messages;
} }
response.messages.map((protoMsg) => { const pageMessages = response.messages.map((protoMsg) => {
messages.push(new WakuMessage(protoMsg)); return new WakuMessage(protoMsg);
});
if (opts.callback) {
// TODO: Test the callback feature
opts.callback(pageMessages);
}
pageMessages.forEach((wakuMessage) => {
messages.push(wakuMessage);
}); });
const responsePageSize = response.pagingInfo?.pageSize; const responsePageSize = response.pagingInfo?.pageSize;