From df48d06e8954948fe1fc0e1d86150cadd25612b8 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Fri, 30 Apr 2021 15:25:58 +1000 Subject: [PATCH] Use store waku to get older messages Chronological order is preserved and dupes are removed. --- web-chat/src/App.tsx | 61 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 58 insertions(+), 3 deletions(-) diff --git a/web-chat/src/App.tsx b/web-chat/src/App.tsx index 8e50ee014d..b6949b28a9 100644 --- a/web-chat/src/App.tsx +++ b/web-chat/src/App.tsx @@ -5,6 +5,7 @@ import './App.css'; import { ChatMessage } from 'waku-chat/chat_message'; import { WakuMessage } from 'waku/waku_message'; import { RelayDefaultTopic } from 'waku/waku_relay'; +import { StoreCodec } from 'waku/waku_store'; import handleCommand from './command'; import Room from './Room'; import Waku from 'waku/waku'; @@ -23,7 +24,29 @@ export default function App() { const handleNewMessages = (event: { data: Uint8Array }) => { const chatMsg = decodeWakuMessage(event.data); if (chatMsg) { - copyAndReplace([chatMsg], stateMessages, setMessages); + copyAppendReplace([chatMsg], stateMessages, setMessages); + } + }; + + const handleProtocolChange = async ( + waku: Waku, + { peerId, protocols }: { peerId: PeerId; protocols: string[] } + ) => { + if (protocols.includes(StoreCodec)) { + console.log( + `Retrieving archived messages from ${peerId.toB58String()}` + ); + const response = await waku.store.queryHistory(peerId, [ + ChatContentTopic, + ]); + + if (response) { + const messages = response + .map((wakuMsg) => wakuMsg.payload) + .filter((payload) => !!payload) + .map((payload) => ChatMessage.decode(payload as Uint8Array)); + copyMergeUniqueReplace(messages, stateMessages, setMessages); + } } }; @@ -34,12 +57,21 @@ export default function App() { } else { stateWaku.libp2p.pubsub.on(RelayDefaultTopic, handleNewMessages); + stateWaku.libp2p.peerStore.once( + 'change:protocols', + handleProtocolChange.bind({}, stateWaku) + ); + // To clean up listener when component unmounts return () => { stateWaku?.libp2p.pubsub.removeListener( RelayDefaultTopic, handleNewMessages ); + stateWaku?.libp2p.peerStore.removeListener( + 'change:protocols', + handleProtocolChange.bind({}, stateWaku) + ); }; } }, [stateWaku, stateMessages]); @@ -63,7 +95,7 @@ export default function App() { const commandMessages = response.map((msg) => { return new ChatMessage(new Date(), command, msg); }); - copyAndReplace(commandMessages, stateMessages, setMessages); + copyAppendReplace(commandMessages, stateMessages, setMessages); }} /> @@ -104,7 +136,7 @@ function decodeWakuMessage(data: Uint8Array): null | ChatMessage { return ChatMessage.decode(wakuMsg.payload); } -function copyAndReplace( +function copyAppendReplace( newValues: Array, currentValues: Array, setter: (val: Array) => void @@ -112,3 +144,26 @@ function copyAndReplace( const copy = currentValues.slice(); setter(copy.concat(newValues)); } + +function copyMergeUniqueReplace( + newValues: ChatMessage[], + currentValues: ChatMessage[], + setter: (val: ChatMessage[]) => void +) { + const copy = currentValues.slice(); + newValues.forEach((msg) => { + if (!copy.find(isEqual.bind({}, msg))) { + copy.push(msg); + } + }); + copy.sort((a, b) => a.timestamp.valueOf() - b.timestamp.valueOf()); + setter(copy); +} + +function isEqual(lhs: ChatMessage, rhs: ChatMessage): boolean { + return ( + lhs.nick === rhs.nick && + lhs.message === rhs.message && + lhs.timestamp.toString() === rhs.timestamp.toString() + ); +}