From bfd9ed1454ef5dbf90152384e9cfcab2ff75ffb3 Mon Sep 17 00:00:00 2001 From: Sasha Date: Tue, 7 Nov 2023 00:50:47 +0100 Subject: [PATCH] use relay --- package.json | 1 - src/hooks/useWaku.ts | 6 +-- src/services/waku.ts | 122 +++++++++++++++++-------------------------- 3 files changed, 51 insertions(+), 78 deletions(-) diff --git a/package.json b/package.json index 4f88de3..03aea4f 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,6 @@ "next": "13.5.6", "react": "^18", "react-dom": "^18", - "uuid": "^9.0.1", "zustand": "^4.4.4" }, "devDependencies": { diff --git a/src/hooks/useWaku.ts b/src/hooks/useWaku.ts index 6bf2300..5b67a19 100644 --- a/src/hooks/useWaku.ts +++ b/src/hooks/useWaku.ts @@ -28,16 +28,16 @@ export const useWaku = () => { setMessages((prev) => [...prev, ...parsedMessaged]); }; - waku.filter.addEventListener(CONTENT_TOPIC, messageListener); + waku.relay.addEventListener(CONTENT_TOPIC, messageListener); return () => { - waku.filter.removeEventListener(CONTENT_TOPIC, messageListener); + waku.relay.removeEventListener(CONTENT_TOPIC, messageListener); }; }, [setMessages]); const onSend = React.useCallback( async (nick: string, text: string) => { - await waku.lightPush.send({ + await waku.relay.send({ version: 0, timestamp: Date.now(), contentTopic: CONTENT_TOPIC, diff --git a/src/services/waku.ts b/src/services/waku.ts index 7a122b6..cafb01a 100644 --- a/src/services/waku.ts +++ b/src/services/waku.ts @@ -1,71 +1,70 @@ -import { v4 as uuid } from "uuid"; -import { - PUBSUB_TOPIC, -} from "@/constants"; +import { PUBSUB_TOPIC } from "@/constants"; import { http } from "@/utils/http"; export type Message = { - payload: string, - contentTopic: string, - version: number, - timestamp: number + payload: string; + contentTopic: string; + version: number; + timestamp: number; }; type EventListener = (event: CustomEvent) => void; const SECOND = 1000; const LOCAL_NODE = "http://127.0.0.1:8645/"; -const FILTER_URL = "/filter/v2/"; -const LIGHT_PUSH = "/lightpush/v1/"; +const RELAY = "/relay/v1"; -class Filter { - private readonly internalEmitter = new EventTarget(); +const buildURL = (endpoint: string) => `${LOCAL_NODE}${endpoint}`; + +class Relay { private readonly subscriptionsEmitter = new EventTarget(); - private contentTopicToRequestID: Map = new Map(); private contentTopicListeners: Map = new Map(); // only one content topic subscriptions is possible now private subscriptionRoutine: undefined | number; - constructor() { - this.internalEmitter.addEventListener("subscribed", this.handleSubscribed.bind(this)); - this.internalEmitter.addEventListener("unsubscribed", this.handleUnsubscribed.bind(this)); + constructor() {} + + public addEventListener(contentTopic: string, fn: EventListener) { + this.handleSubscribed(contentTopic); + return this.subscriptionsEmitter.addEventListener(contentTopic, fn as any); } - - private async handleSubscribed(_e: Event) { - const event = _e as CustomEvent; - const contentTopic = event.detail; - const numberOfListeners = this.contentTopicListeners.get(contentTopic); - // if nwaku node already subscribed to this content topic - if (numberOfListeners) { - this.contentTopicListeners.set(contentTopic, numberOfListeners + 1); - return; - } + public removeEventListener(contentTopic: string, fn: EventListener) { + this.handleUnsubscribed(contentTopic); + return this.subscriptionsEmitter.removeEventListener( + contentTopic, + fn as any + ); + } - const requestId = uuid(); - await http.post(`${LOCAL_NODE}/${FILTER_URL}/subscriptions`, { - requestId, - contentFilters: [contentTopic], - pubsubTopic: PUBSUB_TOPIC - }); + private async handleSubscribed(contentTopic: string) { + const numberOfListeners = this.contentTopicListeners.get(contentTopic); + + // if nwaku node already subscribed to this content topic + if (numberOfListeners) { + this.contentTopicListeners.set(contentTopic, numberOfListeners + 1); + return; + } + + try { + await http.post(buildURL(`${RELAY}/subscriptions`), [PUBSUB_TOPIC]); this.subscriptionRoutine = window.setInterval(async () => { await this.fetchMessages(); }, SECOND); - this.contentTopicToRequestID.set(contentTopic, requestId); this.contentTopicListeners.set(contentTopic, 1); + } catch (error) { + console.error(`Failed to subscribe node ${contentTopic}:`, error); + } } - private async handleUnsubscribed(_e: Event) { - const event = _e as CustomEvent; - const contentTopic = event.detail; - const requestId = this.contentTopicToRequestID.get(contentTopic); + private async handleUnsubscribed(contentTopic: string) { const numberOfListeners = this.contentTopicListeners.get(contentTopic); - if (!numberOfListeners || !requestId) { + if (!numberOfListeners) { return; } @@ -74,15 +73,14 @@ class Filter { return; } - await http.delete(`${LOCAL_NODE}/${FILTER_URL}/subscriptions`, { - requestId, - contentFilters: [contentTopic], - pubsubTopic: PUBSUB_TOPIC - }); + try { + await http.delete(buildURL(`${RELAY}/subscriptions`), [PUBSUB_TOPIC]); + } catch (error) { + console.error(`Failed to unsubscribe node from ${contentTopic}:`, error); + } clearInterval(this.subscriptionRoutine); this.contentTopicListeners.delete(contentTopic); - this.contentTopicToRequestID.delete(contentTopic); } private async fetchMessages(): Promise { @@ -92,7 +90,9 @@ class Filter { return; } - const response = await http.get(`${LOCAL_NODE}/${FILTER_URL}/${encodeURIComponent(contentTopic)}`); + const response = await http.get( + buildURL(`${RELAY}/messages/${encodeURIComponent(PUBSUB_TOPIC)}`) + ); const body: Message[] = await response.json(); if (!body || !body.length) { @@ -104,37 +104,11 @@ class Filter { ); } - public addEventListener(contentTopic: string, fn: EventListener) { - this.emitSubscribedEvent(contentTopic); - return this.subscriptionsEmitter.addEventListener(contentTopic, fn as any); - } - - public removeEventListener(contentTopic: string, fn: EventListener) { - this.emitUnsubscribedEvent(contentTopic); - return this.subscriptionsEmitter.removeEventListener(contentTopic, fn as any); - } - - private emitSubscribedEvent(contentTopic: string) { - this.internalEmitter.dispatchEvent(new CustomEvent("subscribed", { detail: contentTopic })); - } - - private emitUnsubscribedEvent(contentTopic: string) { - this.internalEmitter.dispatchEvent(new CustomEvent("unsubscribed", { detail: contentTopic })); - } -} - -class LightPush { - constructor() {} - public async send(message: Message): Promise { - await http.post(`${LOCAL_NODE}/${LIGHT_PUSH}/message`, { - pubsubTopic: PUBSUB_TOPIC, - message, - }); + await http.post(buildURL(`${RELAY}/messages/${encodeURIComponent(PUBSUB_TOPIC)}`), message); } } export const waku = { - filter: new Filter(), - lightPush: new LightPush(), -}; \ No newline at end of file + relay: new Relay(), +};