Merge pull request #1852 from waku-org/feat/messages-subscription-rest

feat: switches to REST for calling nwaku messages/subscription endpoints
This commit is contained in:
Arseniy Klempner 2024-02-15 14:58:08 -08:00 committed by GitHub
commit aabd907f6a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 99 additions and 34 deletions

View File

@ -3,7 +3,7 @@ import fs from "fs";
import { Logger } from "@waku/utils";
import Docker from "dockerode";
import { Args } from "../types.js";
import { Args, Ports } from "../types.js";
const log = new Logger("test:docker");
@ -87,12 +87,12 @@ export default class Dockerode {
}
async startContainer(
ports: number[],
ports: Ports,
args: Args,
logPath: string,
wakuServiceNodeParams?: string
): Promise<Docker.Container> {
const [rpcPort, tcpPort, websocketPort, discv5UdpPort] = ports;
const { rpcPort, restPort, tcpPort, websocketPort, discv5UdpPort } = ports;
await this.confirmImageExistsOrPull();
@ -109,6 +109,7 @@ export default class Dockerode {
HostConfig: {
AutoRemove: true,
PortBindings: {
[`${restPort}/tcp`]: [{ HostPort: restPort.toString() }],
[`${rpcPort}/tcp`]: [{ HostPort: rpcPort.toString() }],
[`${tcpPort}/tcp`]: [{ HostPort: tcpPort.toString() }],
[`${websocketPort}/tcp`]: [{ HostPort: websocketPort.toString() }],
@ -118,6 +119,7 @@ export default class Dockerode {
}
},
ExposedPorts: {
[`${restPort}/tcp`]: {},
[`${rpcPort}/tcp`]: {},
[`${tcpPort}/tcp`]: {},
[`${websocketPort}/tcp`]: {},

View File

@ -13,7 +13,8 @@ import {
KeyPair,
LogLevel,
MessageRpcQuery,
MessageRpcResponse
MessageRpcResponse,
Ports
} from "../types.js";
import { existsAsync, mkdirAsync, openAsync } from "../utils/async_fs.js";
import { delay } from "../utils/delay.js";
@ -49,6 +50,7 @@ export class ServiceNode {
private websocketPort?: number;
private readonly logPath: string;
private rpcPort?: number;
private restPort?: number;
/**
* Convert a [[WakuMessage]] to a [[WakuRelayMessage]]. The latter is used
@ -116,10 +118,16 @@ export class ServiceNode {
// we also randomize the first port that portfinder will try
const startPort = Math.floor(Math.random() * (65535 - 1025) + 1025);
const ports: number[] = await new Promise((resolve, reject) => {
portfinder.getPorts(4, { port: startPort }, (err, ports) => {
const ports: Ports = await new Promise((resolve, reject) => {
portfinder.getPorts(5, { port: startPort }, (err, ports) => {
if (err) reject(err);
resolve(ports);
resolve({
rpcPort: ports[0],
tcpPort: ports[1],
websocketPort: ports[2],
restPort: ports[3],
discv5UdpPort: ports[4]
});
});
});
@ -127,7 +135,9 @@ export class ServiceNode {
args.logLevel = LogLevel.Debug;
}
const [rpcPort, tcpPort, websocketPort, discv5UdpPort] = ports;
const { rpcPort, tcpPort, websocketPort, restPort, discv5UdpPort } =
ports;
this.restPort = restPort;
this.rpcPort = rpcPort;
this.websocketPort = websocketPort;
@ -138,13 +148,15 @@ export class ServiceNode {
Object.assign(
mergedArgs,
{
rest: true,
restPort,
rpcPort,
tcpPort,
websocketPort,
...(args?.peerExchange && { discv5UdpPort }),
...(isGoWaku && { minRelayPeersToPublish: 0, legacyFilter })
},
{ rpcAddress: "0.0.0.0" },
{ rpcAddress: "0.0.0.0", restAddress: "0.0.0.0" },
_args
);
@ -210,11 +222,27 @@ export class ServiceNode {
async ensureSubscriptions(
pubsubTopics: string[] = [DefaultPubsubTopic]
): Promise<boolean> {
this.checkProcess();
return this.restCall<boolean>(
"/relay/v1/subscriptions",
"POST",
pubsubTopics,
async (response) => response.status === 200
);
}
return this.rpcCall<boolean>("post_waku_v2_relay_v1_subscriptions", [
pubsubTopics
]);
async messages(
pubsubTopic: string = DefaultPubsubTopic
): Promise<MessageRpcResponse[]> {
pubsubTopic = encodeURIComponent(pubsubTopic);
return this.restCall<MessageRpcResponse[]>(
`/relay/v1/messages/${pubsubTopic}`,
"GET",
null,
async (response) => {
const data = await response.json();
return data?.length ? data : [];
}
);
}
async ensureSubscriptionsAutosharding(
@ -222,9 +250,12 @@ export class ServiceNode {
): Promise<boolean> {
this.checkProcess();
return this.rpcCall<boolean>("post_waku_v2_relay_v1_auto_subscriptions", [
contentTopics
]);
return this.restCall<boolean>(
"/relay/v1/subscriptions",
"POST",
contentTopics,
async (response) => response.status === 200
);
}
async sendMessage(
@ -255,30 +286,21 @@ export class ServiceNode {
]);
}
async messages(
pubsubTopic: string = DefaultPubsubTopic
): Promise<MessageRpcResponse[]> {
this.checkProcess();
const msgs = await this.rpcCall<MessageRpcResponse[]>(
"get_waku_v2_relay_v1_messages",
[pubsubTopic]
);
return msgs.filter(isDefined);
}
async messagesAutosharding(
contentTopic: string
): Promise<MessageRpcResponse[]> {
this.checkProcess();
const msgs = await this.rpcCall<MessageRpcResponse[]>(
"get_waku_v2_relay_v1_auto_messages",
[contentTopic]
contentTopic = encodeURIComponent(contentTopic);
return this.restCall<MessageRpcResponse[]>(
`/relay/v1/auto/messages/${contentTopic}`,
"GET",
null,
async (response) => {
const data = await response.json();
return data?.length ? data.filter(isDefined) : [];
}
);
return msgs.filter(isDefined);
}
async getAsymmetricKeyPair(): Promise<KeyPair> {
@ -411,6 +433,10 @@ export class ServiceNode {
return `http://127.0.0.1:${this.rpcPort}/`;
}
get httpUrl(): string {
return `http://127.0.0.1:${this.restPort}`;
}
async rpcCall<T>(
method: string,
params: Array<string | number | unknown>
@ -442,6 +468,31 @@ export class ServiceNode {
);
}
async restCall<T>(
endpoint: string,
method: "GET" | "POST",
body: any = null,
processResponse: (response: Response) => Promise<T>
): Promise<T> {
this.checkProcess();
try {
log.info("Making a REST Call: ", endpoint, body);
const options: RequestInit = {
method,
headers: new Headers({ "Content-Type": "application/json" })
};
if (body) options.body = JSON.stringify(body);
const response = await fetch(`${this.httpUrl}${endpoint}`, options);
log.info(`Received REST Response: `, response.status);
return await processResponse(response);
} catch (error) {
log.error(`${this.httpUrl} failed with error:`, error);
throw error;
}
}
private checkProcess(): void {
if (!this.docker?.container) {
throw `${this.type} container hasn't started`;
@ -454,6 +505,7 @@ export function defaultArgs(): Args {
listenAddress: "0.0.0.0",
rpc: true,
relay: false,
rest: true,
rpcAdmin: true,
websocketSupport: true,
logLevel: LogLevel.Trace

View File

@ -3,6 +3,7 @@ export interface Args {
nat?: "none";
listenAddress?: string;
relay?: boolean;
rest?: boolean;
rpc?: boolean;
rpcAdmin?: boolean;
nodekey?: string;
@ -18,6 +19,7 @@ export interface Args {
rpcPrivate?: boolean;
websocketSupport?: boolean;
tcpPort?: number;
restPort?: number;
rpcPort?: number;
websocketPort?: number;
discv5BootstrapNode?: string;
@ -27,6 +29,14 @@ export interface Args {
clusterId?: number;
}
export interface Ports {
rpcPort: number;
tcpPort: number;
websocketPort: number;
restPort: number;
discv5UdpPort: number;
}
export enum LogLevel {
Error = "ERROR",
Info = "INFO",

View File

@ -14,6 +14,7 @@ describe("nwaku", () => {
"--listen-address=0.0.0.0",
"--rpc=true",
"--relay=false",
"--rest=true",
"--rpc-admin=true",
"--websocket-support=true",
"--log-level=TRACE",