feat: switches to REST for calling nwaku messages/subscription endpoints

This commit modifies functions in ServiceNode to use the REST API
instead of JSON RPC when reading messages for a pubsub topic or
content topic, and when ensuring a nwaku node is subscribed to a
pubsub topic. Also modifies default Docker params to enable the
rest API and provide a port.
This commit is contained in:
Arseniy Klempner 2024-02-14 18:04:51 -08:00
parent a787e306b9
commit 739f2bc2c9
No known key found for this signature in database
GPG Key ID: 59967D458EFBF01B
4 changed files with 99 additions and 34 deletions

View File

@ -3,7 +3,7 @@ import fs from "fs";
import { Logger } from "@waku/utils"; import { Logger } from "@waku/utils";
import Docker from "dockerode"; import Docker from "dockerode";
import { Args } from "../types.js"; import { Args, Ports } from "../types.js";
const log = new Logger("test:docker"); const log = new Logger("test:docker");
@ -87,12 +87,12 @@ export default class Dockerode {
} }
async startContainer( async startContainer(
ports: number[], ports: Ports,
args: Args, args: Args,
logPath: string, logPath: string,
wakuServiceNodeParams?: string wakuServiceNodeParams?: string
): Promise<Docker.Container> { ): Promise<Docker.Container> {
const [rpcPort, tcpPort, websocketPort, discv5UdpPort] = ports; const { rpcPort, restPort, tcpPort, websocketPort, discv5UdpPort } = ports;
await this.confirmImageExistsOrPull(); await this.confirmImageExistsOrPull();
@ -109,6 +109,7 @@ export default class Dockerode {
HostConfig: { HostConfig: {
AutoRemove: true, AutoRemove: true,
PortBindings: { PortBindings: {
[`${restPort}/tcp`]: [{ HostPort: restPort.toString() }],
[`${rpcPort}/tcp`]: [{ HostPort: rpcPort.toString() }], [`${rpcPort}/tcp`]: [{ HostPort: rpcPort.toString() }],
[`${tcpPort}/tcp`]: [{ HostPort: tcpPort.toString() }], [`${tcpPort}/tcp`]: [{ HostPort: tcpPort.toString() }],
[`${websocketPort}/tcp`]: [{ HostPort: websocketPort.toString() }], [`${websocketPort}/tcp`]: [{ HostPort: websocketPort.toString() }],
@ -118,6 +119,7 @@ export default class Dockerode {
} }
}, },
ExposedPorts: { ExposedPorts: {
[`${restPort}/tcp`]: {},
[`${rpcPort}/tcp`]: {}, [`${rpcPort}/tcp`]: {},
[`${tcpPort}/tcp`]: {}, [`${tcpPort}/tcp`]: {},
[`${websocketPort}/tcp`]: {}, [`${websocketPort}/tcp`]: {},

View File

@ -13,7 +13,8 @@ import {
KeyPair, KeyPair,
LogLevel, LogLevel,
MessageRpcQuery, MessageRpcQuery,
MessageRpcResponse MessageRpcResponse,
Ports
} from "../types.js"; } from "../types.js";
import { existsAsync, mkdirAsync, openAsync } from "../utils/async_fs.js"; import { existsAsync, mkdirAsync, openAsync } from "../utils/async_fs.js";
import { delay } from "../utils/delay.js"; import { delay } from "../utils/delay.js";
@ -49,6 +50,7 @@ export class ServiceNode {
private websocketPort?: number; private websocketPort?: number;
private readonly logPath: string; private readonly logPath: string;
private rpcPort?: number; private rpcPort?: number;
private restPort?: number;
/** /**
* Convert a [[WakuMessage]] to a [[WakuRelayMessage]]. The latter is used * Convert a [[WakuMessage]] to a [[WakuRelayMessage]]. The latter is used
@ -116,10 +118,16 @@ export class ServiceNode {
// we also randomize the first port that portfinder will try // we also randomize the first port that portfinder will try
const startPort = Math.floor(Math.random() * (65535 - 1025) + 1025); const startPort = Math.floor(Math.random() * (65535 - 1025) + 1025);
const ports: number[] = await new Promise((resolve, reject) => { const ports: Ports = await new Promise((resolve, reject) => {
portfinder.getPorts(4, { port: startPort }, (err, ports) => { portfinder.getPorts(5, { port: startPort }, (err, ports) => {
if (err) reject(err); if (err) reject(err);
resolve(ports); resolve({
rpcPort: ports[0],
tcpPort: ports[1],
websocketPort: ports[2],
restPort: ports[3],
discv5UdpPort: ports[4]
});
}); });
}); });
@ -127,7 +135,9 @@ export class ServiceNode {
args.logLevel = LogLevel.Debug; args.logLevel = LogLevel.Debug;
} }
const [rpcPort, tcpPort, websocketPort, discv5UdpPort] = ports; const { rpcPort, tcpPort, websocketPort, restPort, discv5UdpPort } =
ports;
this.restPort = restPort;
this.rpcPort = rpcPort; this.rpcPort = rpcPort;
this.websocketPort = websocketPort; this.websocketPort = websocketPort;
@ -138,13 +148,15 @@ export class ServiceNode {
Object.assign( Object.assign(
mergedArgs, mergedArgs,
{ {
rest: true,
restPort,
rpcPort, rpcPort,
tcpPort, tcpPort,
websocketPort, websocketPort,
...(args?.peerExchange && { discv5UdpPort }), ...(args?.peerExchange && { discv5UdpPort }),
...(isGoWaku && { minRelayPeersToPublish: 0, legacyFilter }) ...(isGoWaku && { minRelayPeersToPublish: 0, legacyFilter })
}, },
{ rpcAddress: "0.0.0.0" }, { rpcAddress: "0.0.0.0", restAddress: "0.0.0.0" },
_args _args
); );
@ -210,11 +222,27 @@ export class ServiceNode {
async ensureSubscriptions( async ensureSubscriptions(
pubsubTopics: string[] = [DefaultPubsubTopic] pubsubTopics: string[] = [DefaultPubsubTopic]
): Promise<boolean> { ): Promise<boolean> {
this.checkProcess(); return this.restCall<boolean>(
"/relay/v1/subscriptions",
"POST",
pubsubTopics,
async (response) => response.status === 200
);
}
return this.rpcCall<boolean>("post_waku_v2_relay_v1_subscriptions", [ async messages(
pubsubTopics pubsubTopic: string = DefaultPubsubTopic
]); ): Promise<MessageRpcResponse[]> {
pubsubTopic = encodeURIComponent(pubsubTopic);
return this.restCall<MessageRpcResponse[]>(
`/relay/v1/messages/${pubsubTopic}`,
"GET",
null,
async (response) => {
const data = await response.json();
return data?.length ? data : [];
}
);
} }
async ensureSubscriptionsAutosharding( async ensureSubscriptionsAutosharding(
@ -222,9 +250,12 @@ export class ServiceNode {
): Promise<boolean> { ): Promise<boolean> {
this.checkProcess(); this.checkProcess();
return this.rpcCall<boolean>("post_waku_v2_relay_v1_auto_subscriptions", [ return this.restCall<boolean>(
contentTopics "/relay/v1/subscriptions",
]); "POST",
contentTopics,
async (response) => response.status === 200
);
} }
async sendMessage( async sendMessage(
@ -255,30 +286,21 @@ export class ServiceNode {
]); ]);
} }
async messages(
pubsubTopic: string = DefaultPubsubTopic
): Promise<MessageRpcResponse[]> {
this.checkProcess();
const msgs = await this.rpcCall<MessageRpcResponse[]>(
"get_waku_v2_relay_v1_messages",
[pubsubTopic]
);
return msgs.filter(isDefined);
}
async messagesAutosharding( async messagesAutosharding(
contentTopic: string contentTopic: string
): Promise<MessageRpcResponse[]> { ): Promise<MessageRpcResponse[]> {
this.checkProcess(); this.checkProcess();
const msgs = await this.rpcCall<MessageRpcResponse[]>( contentTopic = encodeURIComponent(contentTopic);
"get_waku_v2_relay_v1_auto_messages", return this.restCall<MessageRpcResponse[]>(
[contentTopic] `/relay/v1/auto/messages/${contentTopic}`,
"GET",
null,
async (response) => {
const data = await response.json();
return data?.length ? data.filter(isDefined) : [];
}
); );
return msgs.filter(isDefined);
} }
async getAsymmetricKeyPair(): Promise<KeyPair> { async getAsymmetricKeyPair(): Promise<KeyPair> {
@ -411,6 +433,10 @@ export class ServiceNode {
return `http://127.0.0.1:${this.rpcPort}/`; return `http://127.0.0.1:${this.rpcPort}/`;
} }
get httpUrl(): string {
return `http://127.0.0.1:${this.restPort}`;
}
async rpcCall<T>( async rpcCall<T>(
method: string, method: string,
params: Array<string | number | unknown> params: Array<string | number | unknown>
@ -442,6 +468,31 @@ export class ServiceNode {
); );
} }
async restCall<T>(
endpoint: string,
method: "GET" | "POST",
body: any = null,
processResponse: (response: Response) => Promise<T>
): Promise<T> {
this.checkProcess();
try {
log.info("Making a REST Call: ", endpoint, body);
const options: RequestInit = {
method,
headers: new Headers({ "Content-Type": "application/json" })
};
if (body) options.body = JSON.stringify(body);
const response = await fetch(`${this.httpUrl}${endpoint}`, options);
log.info(`Received REST Response: `, response.status);
return await processResponse(response);
} catch (error) {
log.error(`${this.httpUrl} failed with error:`, error);
throw error;
}
}
private checkProcess(): void { private checkProcess(): void {
if (!this.docker?.container) { if (!this.docker?.container) {
throw `${this.type} container hasn't started`; throw `${this.type} container hasn't started`;
@ -454,6 +505,7 @@ export function defaultArgs(): Args {
listenAddress: "0.0.0.0", listenAddress: "0.0.0.0",
rpc: true, rpc: true,
relay: false, relay: false,
rest: true,
rpcAdmin: true, rpcAdmin: true,
websocketSupport: true, websocketSupport: true,
logLevel: LogLevel.Trace logLevel: LogLevel.Trace

View File

@ -3,6 +3,7 @@ export interface Args {
nat?: "none"; nat?: "none";
listenAddress?: string; listenAddress?: string;
relay?: boolean; relay?: boolean;
rest?: boolean;
rpc?: boolean; rpc?: boolean;
rpcAdmin?: boolean; rpcAdmin?: boolean;
nodekey?: string; nodekey?: string;
@ -18,6 +19,7 @@ export interface Args {
rpcPrivate?: boolean; rpcPrivate?: boolean;
websocketSupport?: boolean; websocketSupport?: boolean;
tcpPort?: number; tcpPort?: number;
restPort?: number;
rpcPort?: number; rpcPort?: number;
websocketPort?: number; websocketPort?: number;
discv5BootstrapNode?: string; discv5BootstrapNode?: string;
@ -27,6 +29,14 @@ export interface Args {
clusterId?: number; clusterId?: number;
} }
export interface Ports {
rpcPort: number;
tcpPort: number;
websocketPort: number;
restPort: number;
discv5UdpPort: number;
}
export enum LogLevel { export enum LogLevel {
Error = "ERROR", Error = "ERROR",
Info = "INFO", Info = "INFO",

View File

@ -14,6 +14,7 @@ describe("nwaku", () => {
"--listen-address=0.0.0.0", "--listen-address=0.0.0.0",
"--rpc=true", "--rpc=true",
"--relay=false", "--relay=false",
"--rest=true",
"--rpc-admin=true", "--rpc-admin=true",
"--websocket-support=true", "--websocket-support=true",
"--log-level=TRACE", "--log-level=TRACE",