Send and receive message

This commit is contained in:
Franck Royer 2021-09-22 15:05:36 +10:00
parent 4af46a8320
commit 9c2f497c9e
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
8 changed files with 127 additions and 18 deletions

Binary file not shown.

View File

@ -23,6 +23,7 @@
"proto:build": "buf generate" "proto:build": "buf generate"
}, },
"devDependencies": { "devDependencies": {
"@types/chai": "^4.2.22",
"@types/mocha": "^9.0.0", "@types/mocha": "^9.0.0",
"@typescript-eslint/eslint-plugin": "^4.31.1", "@typescript-eslint/eslint-plugin": "^4.31.1",
"@typescript-eslint/parser": "^4.31.1", "@typescript-eslint/parser": "^4.31.1",

View File

@ -14,19 +14,21 @@ export class Chat {
return chatIdToContentTopic(this.id); return chatIdToContentTopic(this.id);
} }
private createMessage(text: string): ChatMessage { public createMessage(text: string): ChatMessage {
const { timestamp, clock } = this.nextClockAndTimestamp(); const { timestamp, clock } = this._nextClockAndTimestamp();
return ChatMessage.createMessage(clock, timestamp, text, this.id); const message = ChatMessage.createMessage(clock, timestamp, text, this.id);
this._updateFromMessage(message);
return message;
} }
public handleNewMessage(message: ChatMessage) { public handleNewMessage(message: ChatMessage) {
this.updateFromMessage(message); this._updateFromMessage(message);
// TODO: emit message
} }
private nextClockAndTimestamp(): { clock: number; timestamp: number } { private _nextClockAndTimestamp(): { clock: number; timestamp: number } {
let clock = this.lastClockValue; let clock = this.lastClockValue;
const timestamp = Date.now(); const timestamp = Date.now();
@ -39,7 +41,7 @@ export class Chat {
return { clock, timestamp }; return { clock, timestamp };
} }
private updateFromMessage(message: ChatMessage): void { private _updateFromMessage(message: ChatMessage): void {
if (!this.lastMessage || this.lastMessage.clock <= message.clock) { if (!this.lastMessage || this.lastMessage.clock <= message.clock) {
this.lastMessage = message; this.lastMessage = message;
} }

View File

@ -46,7 +46,15 @@ export class ChatMessage {
return new ChatMessage(protoBuf); return new ChatMessage(protoBuf);
} }
encode(): Uint8Array {
return proto.ChatMessage.encode(this.proto).finish();
}
public get clock() { public get clock() {
return this.proto.clock; return this.proto.clock;
} }
get text(): string | undefined {
return this.proto.text;
}
} }

View File

@ -1,7 +1,6 @@
// eslint-disable-next-line @typescript-eslint/ban-ts-comment import { expect } from "chai";
// @ts-ignore: No types available
import TCP from "libp2p-tcp";
import { ChatMessage } from "./chat_message";
import { Messenger } from "./messenger"; import { Messenger } from "./messenger";
const testChatId = "test-chat-id"; const testChatId = "test-chat-id";
@ -38,9 +37,25 @@ describe("Messenger", () => {
]); ]);
}); });
it("Sends message in public chat", function () { it("Sends & Receive message in public chat", async function () {
messenger1.joinChat(testChatId); messenger1.joinChat(testChatId);
messenger2.joinChat(testChatId); messenger2.joinChat(testChatId);
const text = "This is a message.";
const receivedMessagePromise: Promise<ChatMessage> = new Promise(
(resolve) => {
messenger2.addObserver((message) => {
resolve(message);
}, testChatId);
}
);
messenger1.sendMessage(text, testChatId);
const receivedMessage = await receivedMessagePromise;
expect(receivedMessage?.text).to.eq(text);
}); });
afterEach(async function () { afterEach(async function () {

View File

@ -6,21 +6,28 @@ import { ChatMessage } from "./chat_message";
export class Messenger { export class Messenger {
waku: Waku; waku: Waku;
chatsById: Map<string, Chat>; chatsById: Map<string, Chat>;
observers: {
[chatId: string]: Set<(chatMessage: ChatMessage) => void>;
};
private constructor(waku: Waku) { private constructor(waku: Waku) {
this.waku = waku; this.waku = waku;
this.chatsById = new Map(); this.chatsById = new Map();
this.observers = {};
} }
public static async create(wakuOptions?: WakuCreateOptions) { public static async create(wakuOptions?: WakuCreateOptions) {
const _wakuOptions = Object.assign({ bootstrap: true }, wakuOptions); const _wakuOptions = Object.assign({ bootstrap: true }, wakuOptions);
const waku = await Waku.create(_wakuOptions); const waku = await Waku.create(_wakuOptions);
const messenger = new Messenger(waku); return new Messenger(waku);
return messenger;
} }
/**
* Joins a public chat.
*
* Use `addListener` to get messages received on this chat.
*/
public joinChat(chatId: string) { public joinChat(chatId: string) {
if (this.chatsById.has(chatId)) throw "Chat already joined"; if (this.chatsById.has(chatId)) throw "Chat already joined";
@ -33,6 +40,12 @@ export class Messenger {
const chatMessage = ChatMessage.decode(wakuMessage.payload); const chatMessage = ChatMessage.decode(wakuMessage.payload);
chat.handleNewMessage(chatMessage); chat.handleNewMessage(chatMessage);
if (this.observers[chatId]) {
this.observers[chatId].forEach((observer) => {
observer(chatMessage);
});
}
}, },
[chat.contentTopic] [chat.contentTopic]
); );
@ -40,6 +53,64 @@ export class Messenger {
this.chatsById.set(chatId, chat); this.chatsById.set(chatId, chat);
} }
/**
* Sends a message on the given chat Id.
*
* @param text
* @param chatId
*/
public async sendMessage(text: string, chatId: string): Promise<void> {
const chat = this.chatsById.get(chatId);
if (!chat) throw `Chat not joined: ${chatId}`;
const message = chat.createMessage(text);
const wakuMessage = await WakuMessage.fromBytes(
message.encode(),
chat.contentTopic
);
await this.waku.relay.send(wakuMessage);
}
/**
* Add an observer of new messages received on the chat.
*
* @throws string If the chat has not been joined first using [joinChat].
*/
public addObserver(
observer: (chatMessage: ChatMessage) => void,
chatId: string
) {
// Not sure this is the best design here. Maybe `addObserver` and `joinChat` should be merged.
if (!this.chatsById.has(chatId))
throw "Cannot add observer on a chat that is not joined.";
if (!this.observers[chatId]) {
this.observers[chatId] = new Set();
}
this.observers[chatId].add(observer);
}
/**
* Add an observer of new messages received on the chat.
*
* @throws string If the chat has not been joined first using [joinChat].
*/
deleteObserver(
observer: (chatMessage: ChatMessage) => void,
chatId: string
): void {
if (this.observers[chatId]) {
this.observers[chatId].delete(observer);
}
}
/**
* Stops the messenger.
*/
public async stop(): Promise<void> { public async stop(): Promise<void> {
await this.waku.stop(); await this.waku.stop();
} }

View File

@ -38,7 +38,11 @@
"skipLibCheck": true, "skipLibCheck": true,
"lib": ["es6", "dom"], "lib": ["es6", "dom"],
"typeRoots": ["./node_modules/@types", "./src/types", "../../node_modules/@types"] "typeRoots": [
"./node_modules/@types",
"./src/types",
"../../node_modules/@types"
]
}, },
"include": ["src"], "include": ["src"],
"exclude": ["node_modules/**"], "exclude": ["node_modules/**"],

View File

@ -448,6 +448,13 @@ __metadata:
languageName: node languageName: node
linkType: hard linkType: hard
"@types/chai@npm:^4.2.22":
version: 4.2.22
resolution: "@types/chai@npm:4.2.22"
checksum: dca66a263b25c26112c0a8c6df20316412fa54b557443a108836c07cee961aa56cc5b1763273f69eb450c83ca9f28069ff78b617bffc01806cdd83afc1c20c2a
languageName: node
linkType: hard
"@types/debug@npm:^4.1.5": "@types/debug@npm:^4.1.5":
version: 4.1.7 version: 4.1.7
resolution: "@types/debug@npm:4.1.7" resolution: "@types/debug@npm:4.1.7"
@ -5531,6 +5538,7 @@ fsevents@~2.3.2:
version: 0.0.0-use.local version: 0.0.0-use.local
resolution: "status-communities@workspace:packages/status-communities" resolution: "status-communities@workspace:packages/status-communities"
dependencies: dependencies:
"@types/chai": ^4.2.22
"@types/mocha": ^9.0.0 "@types/mocha": ^9.0.0
"@typescript-eslint/eslint-plugin": ^4.31.1 "@typescript-eslint/eslint-plugin": ^4.31.1
"@typescript-eslint/parser": ^4.31.1 "@typescript-eslint/parser": ^4.31.1