use relay

This commit is contained in:
Sasha 2023-11-07 00:50:47 +01:00
parent 156ee1d517
commit bfd9ed1454
No known key found for this signature in database
3 changed files with 51 additions and 78 deletions

View File

@ -15,7 +15,6 @@
"next": "13.5.6", "next": "13.5.6",
"react": "^18", "react": "^18",
"react-dom": "^18", "react-dom": "^18",
"uuid": "^9.0.1",
"zustand": "^4.4.4" "zustand": "^4.4.4"
}, },
"devDependencies": { "devDependencies": {

View File

@ -28,16 +28,16 @@ export const useWaku = () => {
setMessages((prev) => [...prev, ...parsedMessaged]); setMessages((prev) => [...prev, ...parsedMessaged]);
}; };
waku.filter.addEventListener(CONTENT_TOPIC, messageListener); waku.relay.addEventListener(CONTENT_TOPIC, messageListener);
return () => { return () => {
waku.filter.removeEventListener(CONTENT_TOPIC, messageListener); waku.relay.removeEventListener(CONTENT_TOPIC, messageListener);
}; };
}, [setMessages]); }, [setMessages]);
const onSend = React.useCallback( const onSend = React.useCallback(
async (nick: string, text: string) => { async (nick: string, text: string) => {
await waku.lightPush.send({ await waku.relay.send({
version: 0, version: 0,
timestamp: Date.now(), timestamp: Date.now(),
contentTopic: CONTENT_TOPIC, contentTopic: CONTENT_TOPIC,

View File

@ -1,71 +1,70 @@
import { v4 as uuid } from "uuid"; import { PUBSUB_TOPIC } from "@/constants";
import {
PUBSUB_TOPIC,
} from "@/constants";
import { http } from "@/utils/http"; import { http } from "@/utils/http";
export type Message = { export type Message = {
payload: string, payload: string;
contentTopic: string, contentTopic: string;
version: number, version: number;
timestamp: number timestamp: number;
}; };
type EventListener = (event: CustomEvent) => void; type EventListener = (event: CustomEvent) => void;
const SECOND = 1000; const SECOND = 1000;
const LOCAL_NODE = "http://127.0.0.1:8645/"; const LOCAL_NODE = "http://127.0.0.1:8645/";
const FILTER_URL = "/filter/v2/"; const RELAY = "/relay/v1";
const LIGHT_PUSH = "/lightpush/v1/";
class Filter { const buildURL = (endpoint: string) => `${LOCAL_NODE}${endpoint}`;
private readonly internalEmitter = new EventTarget();
class Relay {
private readonly subscriptionsEmitter = new EventTarget(); private readonly subscriptionsEmitter = new EventTarget();
private contentTopicToRequestID: Map<string, string> = new Map();
private contentTopicListeners: Map<string, number> = new Map(); private contentTopicListeners: Map<string, number> = new Map();
// only one content topic subscriptions is possible now // only one content topic subscriptions is possible now
private subscriptionRoutine: undefined | number; private subscriptionRoutine: undefined | number;
constructor() { constructor() {}
this.internalEmitter.addEventListener("subscribed", this.handleSubscribed.bind(this));
this.internalEmitter.addEventListener("unsubscribed", this.handleUnsubscribed.bind(this)); public addEventListener(contentTopic: string, fn: EventListener) {
this.handleSubscribed(contentTopic);
return this.subscriptionsEmitter.addEventListener(contentTopic, fn as any);
} }
private async handleSubscribed(_e: Event) {
const event = _e as CustomEvent;
const contentTopic = event.detail;
const numberOfListeners = this.contentTopicListeners.get(contentTopic);
// if nwaku node already subscribed to this content topic public removeEventListener(contentTopic: string, fn: EventListener) {
if (numberOfListeners) { this.handleUnsubscribed(contentTopic);
this.contentTopicListeners.set(contentTopic, numberOfListeners + 1); return this.subscriptionsEmitter.removeEventListener(
return; contentTopic,
} fn as any
);
}
const requestId = uuid(); private async handleSubscribed(contentTopic: string) {
await http.post(`${LOCAL_NODE}/${FILTER_URL}/subscriptions`, { const numberOfListeners = this.contentTopicListeners.get(contentTopic);
requestId,
contentFilters: [contentTopic], // if nwaku node already subscribed to this content topic
pubsubTopic: PUBSUB_TOPIC if (numberOfListeners) {
}); this.contentTopicListeners.set(contentTopic, numberOfListeners + 1);
return;
}
try {
await http.post(buildURL(`${RELAY}/subscriptions`), [PUBSUB_TOPIC]);
this.subscriptionRoutine = window.setInterval(async () => { this.subscriptionRoutine = window.setInterval(async () => {
await this.fetchMessages(); await this.fetchMessages();
}, SECOND); }, SECOND);
this.contentTopicToRequestID.set(contentTopic, requestId);
this.contentTopicListeners.set(contentTopic, 1); this.contentTopicListeners.set(contentTopic, 1);
} catch (error) {
console.error(`Failed to subscribe node ${contentTopic}:`, error);
}
} }
private async handleUnsubscribed(_e: Event) { private async handleUnsubscribed(contentTopic: string) {
const event = _e as CustomEvent;
const contentTopic = event.detail;
const requestId = this.contentTopicToRequestID.get(contentTopic);
const numberOfListeners = this.contentTopicListeners.get(contentTopic); const numberOfListeners = this.contentTopicListeners.get(contentTopic);
if (!numberOfListeners || !requestId) { if (!numberOfListeners) {
return; return;
} }
@ -74,15 +73,14 @@ class Filter {
return; return;
} }
await http.delete(`${LOCAL_NODE}/${FILTER_URL}/subscriptions`, { try {
requestId, await http.delete(buildURL(`${RELAY}/subscriptions`), [PUBSUB_TOPIC]);
contentFilters: [contentTopic], } catch (error) {
pubsubTopic: PUBSUB_TOPIC console.error(`Failed to unsubscribe node from ${contentTopic}:`, error);
}); }
clearInterval(this.subscriptionRoutine); clearInterval(this.subscriptionRoutine);
this.contentTopicListeners.delete(contentTopic); this.contentTopicListeners.delete(contentTopic);
this.contentTopicToRequestID.delete(contentTopic);
} }
private async fetchMessages(): Promise<void> { private async fetchMessages(): Promise<void> {
@ -92,7 +90,9 @@ class Filter {
return; return;
} }
const response = await http.get(`${LOCAL_NODE}/${FILTER_URL}/${encodeURIComponent(contentTopic)}`); const response = await http.get(
buildURL(`${RELAY}/messages/${encodeURIComponent(PUBSUB_TOPIC)}`)
);
const body: Message[] = await response.json(); const body: Message[] = await response.json();
if (!body || !body.length) { if (!body || !body.length) {
@ -104,37 +104,11 @@ class Filter {
); );
} }
public addEventListener(contentTopic: string, fn: EventListener) {
this.emitSubscribedEvent(contentTopic);
return this.subscriptionsEmitter.addEventListener(contentTopic, fn as any);
}
public removeEventListener(contentTopic: string, fn: EventListener) {
this.emitUnsubscribedEvent(contentTopic);
return this.subscriptionsEmitter.removeEventListener(contentTopic, fn as any);
}
private emitSubscribedEvent(contentTopic: string) {
this.internalEmitter.dispatchEvent(new CustomEvent("subscribed", { detail: contentTopic }));
}
private emitUnsubscribedEvent(contentTopic: string) {
this.internalEmitter.dispatchEvent(new CustomEvent("unsubscribed", { detail: contentTopic }));
}
}
class LightPush {
constructor() {}
public async send(message: Message): Promise<void> { public async send(message: Message): Promise<void> {
await http.post(`${LOCAL_NODE}/${LIGHT_PUSH}/message`, { await http.post(buildURL(`${RELAY}/messages/${encodeURIComponent(PUBSUB_TOPIC)}`), message);
pubsubTopic: PUBSUB_TOPIC,
message,
});
} }
} }
export const waku = { export const waku = {
filter: new Filter(), relay: new Relay(),
lightPush: new LightPush(), };
};