126: Ensure that listeners are added only when waku is initialised r=D4nte a=D4nte

this removes the `Reached max listeners` error.

Co-authored-by: Franck Royer <franck@status.im>
This commit is contained in:
bors[bot] 2021-05-05 04:42:40 +00:00 committed by GitHub
commit a43e9987ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 151 additions and 67 deletions

View File

@ -2,7 +2,8 @@ import { multiaddr } from 'multiaddr';
import PeerId from 'peer-id'; import PeerId from 'peer-id';
import { useEffect, useState } from 'react'; import { useEffect, useState } from 'react';
import './App.css'; import './App.css';
import { ChatMessage } from 'waku/chat_message'; import { ChatMessage } from './ChatMessage';
import { ChatMessage as WakuChatMessage } from 'waku/chat_message';
import { WakuMessage } from 'waku/waku_message'; import { WakuMessage } from 'waku/waku_message';
import { RelayDefaultTopic } from 'waku/waku_relay'; import { RelayDefaultTopic } from 'waku/waku_relay';
import { StoreCodec } from 'waku/waku_store'; import { StoreCodec } from 'waku/waku_store';
@ -45,15 +46,16 @@ const themes = {
export const ChatContentTopic = 'dingpu'; export const ChatContentTopic = 'dingpu';
export default function App() { export default function App() {
let [stateMessages, setMessages] = useState<ChatMessage[]>([]); let [newMessages, setNewMessages] = useState<ChatMessage[]>([]);
let [archivedMessages, setArchivedMessages] = useState<ChatMessage[]>([]);
let [stateWaku, setWaku] = useState<Waku | undefined>(undefined); let [stateWaku, setWaku] = useState<Waku | undefined>(undefined);
let [nick, setNick] = useState<string>(generate()); let [nick, setNick] = useState<string>(generate());
useEffect(() => { useEffect(() => {
const handleNewMessages = (event: { data: Uint8Array }) => { const handleRelayMessage = (event: { data: Uint8Array }) => {
const chatMsg = decodeWakuMessage(event.data); const chatMsg = decodeWakuMessage(event.data);
if (chatMsg) { if (chatMsg) {
copyAppendReplace([chatMsg], stateMessages, setMessages); setNewMessages([chatMsg]);
} }
}; };
@ -73,8 +75,11 @@ export default function App() {
const messages = response const messages = response
.map((wakuMsg) => wakuMsg.payload) .map((wakuMsg) => wakuMsg.payload)
.filter((payload) => !!payload) .filter((payload) => !!payload)
.map((payload) => ChatMessage.decode(payload as Uint8Array)); .map((payload) => WakuChatMessage.decode(payload as Uint8Array))
copyMergeUniqueReplace(messages, stateMessages, setMessages); .map((wakuChatMessage) =>
ChatMessage.fromWakuChatMessage(wakuChatMessage)
);
setArchivedMessages(messages);
} }
} }
}; };
@ -84,7 +89,7 @@ export default function App() {
.then(() => console.log('Waku init done')) .then(() => console.log('Waku init done'))
.catch((e) => console.log('Waku init failed ', e)); .catch((e) => console.log('Waku init failed ', e));
} else { } else {
stateWaku.libp2p.pubsub.on(RelayDefaultTopic, handleNewMessages); stateWaku.libp2p.pubsub.on(RelayDefaultTopic, handleRelayMessage);
stateWaku.libp2p.peerStore.on( stateWaku.libp2p.peerStore.on(
'change:protocols', 'change:protocols',
@ -95,7 +100,7 @@ export default function App() {
return () => { return () => {
stateWaku?.libp2p.pubsub.removeListener( stateWaku?.libp2p.pubsub.removeListener(
RelayDefaultTopic, RelayDefaultTopic,
handleNewMessages handleRelayMessage
); );
stateWaku?.libp2p.peerStore.removeListener( stateWaku?.libp2p.peerStore.removeListener(
'change:protocols', 'change:protocols',
@ -103,7 +108,7 @@ export default function App() {
); );
}; };
} }
}, [stateWaku, stateMessages]); }, [stateWaku]);
return ( return (
<div <div
@ -114,7 +119,8 @@ export default function App() {
<ThemeProvider theme={themes}> <ThemeProvider theme={themes}>
<Room <Room
nick={nick} nick={nick}
lines={stateMessages} newMessages={newMessages}
archivedMessages={archivedMessages}
commandHandler={(input: string) => { commandHandler={(input: string) => {
const { command, response } = handleCommand( const { command, response } = handleCommand(
input, input,
@ -122,9 +128,9 @@ export default function App() {
setNick setNick
); );
const commandMessages = response.map((msg) => { const commandMessages = response.map((msg) => {
return new ChatMessage(new Date(), command, msg); return new ChatMessage(new Date(), new Date(), command, msg);
}); });
copyAppendReplace(commandMessages, stateMessages, setMessages); setNewMessages(commandMessages);
}} }}
/> />
</ThemeProvider> </ThemeProvider>
@ -162,37 +168,7 @@ function decodeWakuMessage(data: Uint8Array): null | ChatMessage {
if (!wakuMsg.payload) { if (!wakuMsg.payload) {
return null; return null;
} }
return ChatMessage.decode(wakuMsg.payload); return ChatMessage.fromWakuChatMessage(
} WakuChatMessage.decode(wakuMsg.payload)
function copyAppendReplace<T>(
newValues: Array<T>,
currentValues: Array<T>,
setter: (val: Array<T>) => void
) {
const copy = currentValues.slice();
setter(copy.concat(newValues));
}
function copyMergeUniqueReplace(
newValues: ChatMessage[],
currentValues: ChatMessage[],
setter: (val: ChatMessage[]) => void
) {
const copy = currentValues.slice();
newValues.forEach((msg) => {
if (!copy.find(isEqual.bind({}, msg))) {
copy.push(msg);
}
});
copy.sort((a, b) => a.timestamp.valueOf() - b.timestamp.valueOf());
setter(copy);
}
function isEqual(lhs: ChatMessage, rhs: ChatMessage): boolean {
return (
lhs.nick === rhs.nick &&
lhs.message === rhs.message &&
lhs.timestamp.toString() === rhs.timestamp.toString()
); );
} }

View File

@ -1,5 +1,5 @@
import { useEffect, useRef } from 'react'; import { useEffect, useRef, useState } from 'react';
import { ChatMessage } from 'waku/chat_message'; import { ChatMessage } from './ChatMessage';
import { import {
Message, Message,
MessageText, MessageText,
@ -8,19 +8,45 @@ import {
} from '@livechat/ui-kit'; } from '@livechat/ui-kit';
interface Props { interface Props {
messages: ChatMessage[]; archivedMessages: ChatMessage[];
newMessages: ChatMessage[];
} }
export default function ChatList(props: Props) { export default function ChatList(props: Props) {
const messages = props.messages; const [messages, setMessages] = useState<ChatMessage[]>([]);
let updatedMessages;
const messagesGroupedBySender = groupMessagesBySender(props.messages).map( if (IsThereNewMessages(props.newMessages, messages)) {
updatedMessages = messages.slice().concat(props.newMessages);
if (IsThereNewMessages(props.archivedMessages, updatedMessages)) {
updatedMessages = copyMergeUniqueReplace(
props.archivedMessages,
updatedMessages
);
}
} else {
if (IsThereNewMessages(props.archivedMessages, messages)) {
updatedMessages = copyMergeUniqueReplace(
props.archivedMessages,
messages
);
}
}
if (updatedMessages) {
setMessages(updatedMessages);
}
const messagesGroupedBySender = groupMessagesBySender(messages).map(
(currentMessageGroup) => ( (currentMessageGroup) => (
<MessageGroup onlyFirstWithMeta> <MessageGroup onlyFirstWithMeta>
{currentMessageGroup.map((currentMessage) => ( {currentMessageGroup.map((currentMessage) => (
<Message <Message
// We assume that the same user is not sending two messages in the same second key={
key={currentMessage.timestamp.toString() + currentMessage.nick} currentMessage.receivedTimestampMs.valueOf() +
currentMessage.nick +
currentMessage.message
}
authorName={currentMessage.nick} authorName={currentMessage.nick}
date={formatDisplayDate(currentMessage)} date={formatDisplayDate(currentMessage)}
> >
@ -34,7 +60,7 @@ export default function ChatList(props: Props) {
return ( return (
<MessageList active containScrollInSubtree> <MessageList active containScrollInSubtree>
{messagesGroupedBySender} {messagesGroupedBySender}
<AlwaysScrollToBottom messages={messages} /> <AlwaysScrollToBottom newMessages={props.newMessages} />
</MessageList> </MessageList>
); );
} }
@ -58,7 +84,7 @@ function groupMessagesBySender(messageArray: ChatMessage[]): ChatMessage[][] {
} }
function formatDisplayDate(message: ChatMessage): string { function formatDisplayDate(message: ChatMessage): string {
return message.timestamp.toLocaleString([], { return message.sentTimestamp.toLocaleString([], {
month: 'short', month: 'short',
day: 'numeric', day: 'numeric',
hour: 'numeric', hour: 'numeric',
@ -67,14 +93,48 @@ function formatDisplayDate(message: ChatMessage): string {
}); });
} }
const AlwaysScrollToBottom = (props: Props) => { const AlwaysScrollToBottom = (props: { newMessages: ChatMessage[] }) => {
const elementRef = useRef<HTMLDivElement>(); const elementRef = useRef<HTMLDivElement>();
useEffect(() => { useEffect(() => {
// @ts-ignore // @ts-ignore
elementRef.current.scrollIntoView(); elementRef.current.scrollIntoView();
}, [props.messages]); }, [props.newMessages]);
// @ts-ignore // @ts-ignore
return <div ref={elementRef} />; return <div ref={elementRef} />;
}; };
function IsThereNewMessages(
newValues: ChatMessage[],
currentValues: ChatMessage[]
): boolean {
if (newValues.length === 0) return false;
if (currentValues.length === 0) return true;
return !newValues.find((newMsg) =>
currentValues.find(isEqual.bind({}, newMsg))
);
}
function copyMergeUniqueReplace(
newValues: ChatMessage[],
currentValues: ChatMessage[]
) {
const copy = currentValues.slice();
newValues.forEach((msg) => {
if (!copy.find(isEqual.bind({}, msg))) {
copy.push(msg);
}
});
copy.sort((a, b) => a.sentTimestamp.valueOf() - b.sentTimestamp.valueOf());
return copy;
}
function isEqual(lhs: ChatMessage, rhs: ChatMessage): boolean {
return (
lhs.nick === rhs.nick &&
lhs.message === rhs.message &&
lhs.sentTimestamp.toString() === rhs.sentTimestamp.toString()
);
}

View File

@ -0,0 +1,19 @@
import { ChatMessage as WakuChatMessage } from 'waku/chat_message';
export class ChatMessage {
constructor(
public receivedTimestampMs: Date,
public sentTimestamp: Date,
public nick: string,
public message: string
) {}
static fromWakuChatMessage(wakuChatMessage: WakuChatMessage): ChatMessage {
return new ChatMessage(
new Date(),
wakuChatMessage.timestamp,
wakuChatMessage.nick,
wakuChatMessage.message
);
}
}

View File

@ -10,8 +10,7 @@ import {
} from '@livechat/ui-kit'; } from '@livechat/ui-kit';
interface Props { interface Props {
messageHandler: (msg: string) => void; sendMessage: ((msg: string) => Promise<void>) | undefined;
sendMessage: (() => Promise<void>) | undefined;
} }
export default function MessageInput(props: Props) { export default function MessageInput(props: Props) {
@ -20,14 +19,13 @@ export default function MessageInput(props: Props) {
const sendMessage = async () => { const sendMessage = async () => {
if (props.sendMessage) { if (props.sendMessage) {
await props.sendMessage(); await props.sendMessage(inputText);
setInputText(''); setInputText('');
} }
}; };
const messageHandler = (event: ChangeEvent<HTMLInputElement>) => { const messageHandler = (event: ChangeEvent<HTMLInputElement>) => {
setInputText(event.target.value); setInputText(event.target.value);
props.messageHandler(event.target.value);
}; };
const keyPressHandler = async (event: KeyboardEvent<HTMLInputElement>) => { const keyPressHandler = async (event: KeyboardEvent<HTMLInputElement>) => {

View File

@ -1,5 +1,5 @@
import { useState } from 'react'; import { ChatMessage } from './ChatMessage';
import { ChatMessage } from 'waku/chat_message'; import { ChatMessage as WakuChatMessage } from 'waku/chat_message';
import { WakuMessage } from 'waku/waku_message'; import { WakuMessage } from 'waku/waku_message';
import { ChatContentTopic } from './App'; import { ChatContentTopic } from './App';
import ChatList from './ChatList'; import ChatList from './ChatList';
@ -8,13 +8,13 @@ import { useWaku } from './WakuContext';
import { TitleBar } from '@livechat/ui-kit'; import { TitleBar } from '@livechat/ui-kit';
interface Props { interface Props {
lines: ChatMessage[]; newMessages: ChatMessage[];
archivedMessages: ChatMessage[];
commandHandler: (cmd: string) => void; commandHandler: (cmd: string) => void;
nick: string; nick: string;
} }
export default function Room(props: Props) { export default function Room(props: Props) {
let [messageToSend, setMessageToSend] = useState<string>('');
const { waku } = useWaku(); const { waku } = useWaku();
return ( return (
@ -23,12 +23,14 @@ export default function Room(props: Props) {
style={{ height: '98vh', display: 'flex', flexDirection: 'column' }} style={{ height: '98vh', display: 'flex', flexDirection: 'column' }}
> >
<TitleBar title="Waku v2 chat app" /> <TitleBar title="Waku v2 chat app" />
<ChatList messages={props.lines} /> <ChatList
newMessages={props.newMessages}
archivedMessages={props.archivedMessages}
/>
<MessageInput <MessageInput
messageHandler={setMessageToSend}
sendMessage={ sendMessage={
waku waku
? async () => { ? async (messageToSend) => {
return handleMessage( return handleMessage(
messageToSend, messageToSend,
props.nick, props.nick,
@ -52,7 +54,7 @@ async function handleMessage(
if (message.startsWith('/')) { if (message.startsWith('/')) {
commandHandler(message); commandHandler(message);
} else { } else {
const chatMessage = new ChatMessage(new Date(), nick, message); const chatMessage = new WakuChatMessage(new Date(), nick, message);
const wakuMsg = WakuMessage.fromBytes( const wakuMsg = WakuMessage.fromBytes(
chatMessage.encode(), chatMessage.encode(),
ChatContentTopic ChatContentTopic

View File

@ -78,6 +78,32 @@ function peers(waku: Waku | undefined): string[] {
return response; return response;
} }
function connections(waku: Waku | undefined): string[] {
if (!waku) {
return ['Waku node is starting'];
}
let response: string[] = [];
waku.libp2p.connections.forEach(
(
connections: import('libp2p-interfaces/src/connection/connection')[],
peerId
) => {
response.push(peerId + ':');
let strConnections = ' connections: [';
connections.forEach((connection) => {
strConnections += JSON.stringify(connection.stat);
strConnections += '; ' + JSON.stringify(connection.streams);
});
strConnections += ']';
response.push(strConnections);
}
);
if (response.length === 0) {
response.push('Not connected to any peer.');
}
return response;
}
export default function handleCommand( export default function handleCommand(
input: string, input: string,
waku: Waku | undefined, waku: Waku | undefined,
@ -102,6 +128,9 @@ export default function handleCommand(
case '/peers': case '/peers':
peers(waku).map((str) => response.push(str)); peers(waku).map((str) => response.push(str));
break; break;
case '/connections':
connections(waku).map((str) => response.push(str));
break;
default: default:
response.push(`Unknown Command '${command}'`); response.push(`Unknown Command '${command}'`);
} }