Merge pull request #946 from waku-org/rpc-type

This commit is contained in:
fryorcraken.eth 2022-09-11 08:25:12 +10:00 committed by GitHub
commit bbd035f248
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 104 additions and 60 deletions

View File

@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `addPeerToAddressBook` is now async. - `addPeerToAddressBook` is now async.
- API Docs moved to https://js.waku.org/ - API Docs moved to https://js.waku.org/
- test: fix typing for nwaku JSON RPC responses.
## [0.26.0] - 2022-09-08 ## [0.26.0] - 2022-09-08

View File

@ -1,10 +1,16 @@
import { expect } from "chai"; import { expect } from "chai";
import debug from "debug"; import debug from "debug";
import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../../test_utils"; import {
makeLogFileName,
MessageRpcResponse,
NOISE_KEY_1,
Nwaku,
} from "../../test_utils";
import { delay } from "../../test_utils/delay"; import { delay } from "../../test_utils/delay";
import { createFullNode } from "../create_waku"; import { createFullNode } from "../create_waku";
import type { WakuFull } from "../interfaces"; import type { WakuFull } from "../interfaces";
import { bytesToUtf8 } from "../utils";
import { waitForRemotePeer } from "../wait_for_remote_peer"; import { waitForRemotePeer } from "../wait_for_remote_peer";
import { Protocols } from "../waku"; import { Protocols } from "../waku";
import { WakuMessage } from "../waku_message"; import { WakuMessage } from "../waku_message";
@ -44,7 +50,7 @@ describe("Waku Light Push [node only]", () => {
const pushResponse = await waku.lightPush.push(message); const pushResponse = await waku.lightPush.push(message);
expect(pushResponse?.isSuccess).to.be.true; expect(pushResponse?.isSuccess).to.be.true;
let msgs: WakuMessage[] = []; let msgs: MessageRpcResponse[] = [];
while (msgs.length === 0) { while (msgs.length === 0) {
await delay(200); await delay(200);
@ -53,7 +59,7 @@ describe("Waku Light Push [node only]", () => {
expect(msgs[0].contentTopic).to.equal(message.contentTopic); expect(msgs[0].contentTopic).to.equal(message.contentTopic);
expect(msgs[0].version).to.equal(message.version); expect(msgs[0].version).to.equal(message.version);
expect(msgs[0].payloadAsUtf8).to.equal(messageText); expect(bytesToUtf8(new Uint8Array(msgs[0].payload))).to.equal(messageText);
}); });
it("Push on custom pubsub topic", async function () { it("Push on custom pubsub topic", async function () {
@ -88,7 +94,7 @@ describe("Waku Light Push [node only]", () => {
log("Ack received", pushResponse); log("Ack received", pushResponse);
expect(pushResponse?.isSuccess).to.be.true; expect(pushResponse?.isSuccess).to.be.true;
let msgs: WakuMessage[] = []; let msgs: MessageRpcResponse[] = [];
log("Waiting for message to show in nwaku"); log("Waiting for message to show in nwaku");
while (msgs.length === 0) { while (msgs.length === 0) {
@ -98,6 +104,6 @@ describe("Waku Light Push [node only]", () => {
expect(msgs[0].contentTopic).to.equal(message.contentTopic); expect(msgs[0].contentTopic).to.equal(message.contentTopic);
expect(msgs[0].version).to.equal(message.version); expect(msgs[0].version).to.equal(message.version);
expect(msgs[0].payloadAsUtf8).to.equal(messageText); expect(bytesToUtf8(new Uint8Array(msgs[0].payload))!).to.equal(messageText);
}); });
}); });

View File

@ -3,9 +3,10 @@ import debug from "debug";
import { import {
makeLogFileName, makeLogFileName,
MessageRpcQuery,
MessageRpcResponseHex,
NOISE_KEY_1, NOISE_KEY_1,
Nwaku, Nwaku,
WakuRelayMessage,
} from "../../test_utils"; } from "../../test_utils";
import { delay } from "../../test_utils/delay"; import { delay } from "../../test_utils/delay";
import { createPrivacyNode } from "../create_waku"; import { createPrivacyNode } from "../create_waku";
@ -61,7 +62,7 @@ describe("Waku Message [node only]", function () {
this.timeout(5000); this.timeout(5000);
const messageText = "Here is an encrypted message."; const messageText = "Here is an encrypted message.";
const message: WakuRelayMessage = { const message: MessageRpcQuery = {
contentTopic: TestContentTopic, contentTopic: TestContentTopic,
payload: bytesToHex(utf8ToBytes(messageText)), payload: bytesToHex(utf8ToBytes(messageText)),
}; };
@ -111,7 +112,7 @@ describe("Waku Message [node only]", function () {
log("Send message over relay"); log("Send message over relay");
await waku.relay.send(message); await waku.relay.send(message);
let msgs: WakuRelayMessage[] = []; let msgs: MessageRpcResponseHex[] = [];
while (msgs.length === 0) { while (msgs.length === 0) {
log("Wait for message to be seen by nwaku"); log("Wait for message to be seen by nwaku");
@ -128,7 +129,7 @@ describe("Waku Message [node only]", function () {
this.timeout(5000); this.timeout(5000);
const messageText = "Here is a message encrypted in a symmetric manner."; const messageText = "Here is a message encrypted in a symmetric manner.";
const message: WakuRelayMessage = { const message: MessageRpcQuery = {
contentTopic: TestContentTopic, contentTopic: TestContentTopic,
payload: bytesToHex(utf8ToBytes(messageText)), payload: bytesToHex(utf8ToBytes(messageText)),
}; };
@ -175,7 +176,7 @@ describe("Waku Message [node only]", function () {
log("Sending message over relay"); log("Sending message over relay");
await waku.relay.send(message); await waku.relay.send(message);
let msgs: WakuRelayMessage[] = []; let msgs: MessageRpcResponseHex[] = [];
while (msgs.length === 0) { while (msgs.length === 0) {
await delay(200); await delay(200);

View File

@ -4,6 +4,7 @@ import debug from "debug";
import { import {
makeLogFileName, makeLogFileName,
MessageRpcResponse,
NOISE_KEY_1, NOISE_KEY_1,
NOISE_KEY_2, NOISE_KEY_2,
NOISE_KEY_3, NOISE_KEY_3,
@ -18,6 +19,7 @@ import {
getPublicKey, getPublicKey,
} from "../crypto"; } from "../crypto";
import type { WakuPrivacy } from "../interfaces"; import type { WakuPrivacy } from "../interfaces";
import { bytesToUtf8, utf8ToBytes } from "../utils";
import { waitForRemotePeer } from "../wait_for_remote_peer"; import { waitForRemotePeer } from "../wait_for_remote_peer";
import { Protocols } from "../waku"; import { Protocols } from "../waku";
import { DecryptionMethod, WakuMessage } from "../waku_message"; import { DecryptionMethod, WakuMessage } from "../waku_message";
@ -385,7 +387,7 @@ describe("Waku Relay [node only]", () => {
await delay(1000); await delay(1000);
await waku.relay.send(message); await waku.relay.send(message);
let msgs: WakuMessage[] = []; let msgs: MessageRpcResponse[] = [];
while (msgs.length === 0) { while (msgs.length === 0) {
console.log("Waiting for messages"); console.log("Waiting for messages");
@ -395,17 +397,19 @@ describe("Waku Relay [node only]", () => {
expect(msgs[0].contentTopic).to.equal(message.contentTopic); expect(msgs[0].contentTopic).to.equal(message.contentTopic);
expect(msgs[0].version).to.equal(message.version); expect(msgs[0].version).to.equal(message.version);
expect(msgs[0].payloadAsUtf8).to.equal(messageText); expect(bytesToUtf8(new Uint8Array(msgs[0].payload))).to.equal(
messageText
);
}); });
it("Nwaku publishes", async function () { it("Nwaku publishes", async function () {
await delay(200); await delay(200);
const messageText = "Here is another message."; const messageText = "Here is another message.";
const message = await WakuMessage.fromUtf8String( const message = {
messageText, payload: utf8ToBytes(messageText),
TestContentTopic contentTopic: TestContentTopic,
); };
const receivedMsgPromise: Promise<WakuMessage> = new Promise( const receivedMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => { (resolve) => {
@ -413,12 +417,12 @@ describe("Waku Relay [node only]", () => {
} }
); );
await nwaku.sendMessage(Nwaku.toWakuRelayMessage(message)); await nwaku.sendMessage(Nwaku.toMessageRpcQuery(message));
const receivedMsg = await receivedMsgPromise; const receivedMsg = await receivedMsgPromise;
expect(receivedMsg.contentTopic).to.eq(message.contentTopic); expect(receivedMsg.contentTopic).to.eq(message.contentTopic);
expect(receivedMsg.version).to.eq(message.version); expect(receivedMsg.version).to.eq(0);
expect(receivedMsg.payloadAsUtf8).to.eq(messageText); expect(receivedMsg.payloadAsUtf8).to.eq(messageText);
}); });

View File

@ -14,6 +14,7 @@ import {
getPublicKey, getPublicKey,
} from "../crypto"; } from "../crypto";
import type { WakuFull } from "../interfaces"; import type { WakuFull } from "../interfaces";
import { utf8ToBytes } from "../utils";
import { waitForRemotePeer } from "../wait_for_remote_peer"; import { waitForRemotePeer } from "../wait_for_remote_peer";
import { Protocols } from "../waku"; import { Protocols } from "../waku";
import { DecryptionMethod, WakuMessage } from "../waku_message"; import { DecryptionMethod, WakuMessage } from "../waku_message";
@ -42,9 +43,10 @@ describe("Waku Store", () => {
for (let i = 0; i < 2; i++) { for (let i = 0; i < 2; i++) {
expect( expect(
await nwaku.sendMessage( await nwaku.sendMessage(
Nwaku.toWakuRelayMessage( Nwaku.toMessageRpcQuery({
await WakuMessage.fromUtf8String(`Message ${i}`, TestContentTopic) payload: utf8ToBytes(`Message ${i}`),
) contentTopic: TestContentTopic,
})
) )
).to.be.true; ).to.be.true;
} }
@ -75,9 +77,10 @@ describe("Waku Store", () => {
for (let i = 0; i < totalMsgs; i++) { for (let i = 0; i < totalMsgs; i++) {
expect( expect(
await nwaku.sendMessage( await nwaku.sendMessage(
Nwaku.toWakuRelayMessage( Nwaku.toMessageRpcQuery({
await WakuMessage.fromUtf8String(`Message ${i}`, TestContentTopic) payload: utf8ToBytes(`Message ${i}`),
) contentTopic: TestContentTopic,
})
) )
).to.be.true; ).to.be.true;
} }
@ -115,9 +118,10 @@ describe("Waku Store", () => {
for (let i = 0; i < availMsgs; i++) { for (let i = 0; i < availMsgs; i++) {
expect( expect(
await nwaku.sendMessage( await nwaku.sendMessage(
Nwaku.toWakuRelayMessage( Nwaku.toMessageRpcQuery({
await WakuMessage.fromUtf8String(`Message ${i}`, TestContentTopic) payload: utf8ToBytes(`Message ${i}`),
) contentTopic: TestContentTopic,
})
) )
).to.be.true; ).to.be.true;
} }
@ -152,9 +156,10 @@ describe("Waku Store", () => {
for (let i = 0; i < 15; i++) { for (let i = 0; i < 15; i++) {
expect( expect(
await nwaku.sendMessage( await nwaku.sendMessage(
Nwaku.toWakuRelayMessage( Nwaku.toMessageRpcQuery({
await WakuMessage.fromUtf8String(`Message ${i}`, TestContentTopic) payload: utf8ToBytes(`Message ${i}`),
) contentTopic: TestContentTopic,
})
) )
).to.be.true; ).to.be.true;
} }
@ -194,9 +199,10 @@ describe("Waku Store", () => {
for (let i = 0; i < 2; i++) { for (let i = 0; i < 2; i++) {
expect( expect(
await nwaku.sendMessage( await nwaku.sendMessage(
Nwaku.toWakuRelayMessage( Nwaku.toMessageRpcQuery({
await WakuMessage.fromUtf8String(`Message ${i}`, TestContentTopic) payload: utf8ToBytes(`Message ${i}`),
), contentTopic: TestContentTopic,
}),
customPubSubTopic customPubSubTopic
) )
).to.be.true; ).to.be.true;
@ -449,12 +455,12 @@ describe("Waku Store", () => {
for (let i = 0; i < 2; i++) { for (let i = 0; i < 2; i++) {
expect( expect(
await nwaku.sendMessage( await nwaku.sendMessage(
Nwaku.toWakuRelayMessage( Nwaku.toMessageRpcQuery({
await WakuMessage.fromUtf8String(`Message ${i}`, TestContentTopic, { payload: utf8ToBytes(`Message ${i}`),
contentTopic: TestContentTopic,
timestamp: messageTimestamps[i], timestamp: messageTimestamps[i],
}) })
) )
)
).to.be.true; ).to.be.true;
} }

View File

@ -14,8 +14,6 @@ import portfinder from "portfinder";
import { DefaultPubSubTopic } from "../lib/constants"; import { DefaultPubSubTopic } from "../lib/constants";
import { bytesToHex, hexToBytes } from "../lib/utils"; import { bytesToHex, hexToBytes } from "../lib/utils";
import { WakuMessage } from "../lib/waku_message";
import * as proto from "../proto/message";
import { existsAsync, mkdirAsync, openAsync } from "./async_fs"; import { existsAsync, mkdirAsync, openAsync } from "./async_fs";
import { delay } from "./delay"; import { delay } from "./delay";
@ -34,6 +32,14 @@ const NODE_READY_LOG_LINE = "Node setup complete";
const LOG_DIR = "./log"; const LOG_DIR = "./log";
const OneMillion = BigInt(1_000_000);
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
BigInt.prototype.toJSON = function toJSON() {
return Number(this);
};
export interface Args { export interface Args {
staticnode?: string; staticnode?: string;
nat?: "none"; nat?: "none";
@ -71,10 +77,24 @@ export interface KeyPair {
publicKey: string; publicKey: string;
} }
export interface WakuRelayMessage { export interface MessageRpcQuery {
payload: string; // Hex encoded data string without `0x` prefix. payload: string; // Hex encoded data string without `0x` prefix.
contentTopic?: string; contentTopic?: string;
timestamp?: number; // Unix epoch time in nanoseconds as a 64-bits integer value. timestamp?: bigint; // Unix epoch time in nanoseconds as a 64-bits integer value.
}
export interface MessageRpcResponse {
payload: number[];
contentTopic?: string;
version?: number;
timestamp?: bigint; // Unix epoch time in nanoseconds as a 64-bits integer value.
}
export interface MessageRpcResponseHex {
payload: string;
contentTopic?: string;
version?: number;
timestamp?: bigint; // Unix epoch time in nanoseconds as a 64-bits integer value.
} }
export class Nwaku { export class Nwaku {
@ -89,14 +109,18 @@ export class Nwaku {
* Convert a [[WakuMessage]] to a [[WakuRelayMessage]]. The latter is used * Convert a [[WakuMessage]] to a [[WakuRelayMessage]]. The latter is used
* by the nwaku JSON-RPC API. * by the nwaku JSON-RPC API.
*/ */
static toWakuRelayMessage(message: WakuMessage): WakuRelayMessage { static toMessageRpcQuery(message: {
payload: Uint8Array;
contentTopic: string;
timestamp?: Date;
}): MessageRpcQuery {
if (!message.payload) { if (!message.payload) {
throw "Attempting to convert empty message"; throw "Attempting to convert empty message";
} }
let timestamp; let timestamp;
if (message.proto.timestamp) { if (message.timestamp) {
timestamp = Number.parseInt(message.proto.timestamp.toString(10)); timestamp = BigInt(message.timestamp.valueOf()) * OneMillion;
} }
return { return {
@ -216,11 +240,15 @@ export class Nwaku {
} }
async sendMessage( async sendMessage(
message: WakuRelayMessage, message: MessageRpcQuery,
pubSubTopic: string = DefaultPubSubTopic pubSubTopic: string = DefaultPubSubTopic
): Promise<boolean> { ): Promise<boolean> {
this.checkProcess(); this.checkProcess();
if (typeof message.timestamp === "undefined") {
message.timestamp = BigInt(new Date().valueOf()) * OneMillion;
}
return this.rpcCall<boolean>("post_waku_v2_relay_v1_message", [ return this.rpcCall<boolean>("post_waku_v2_relay_v1_message", [
pubSubTopic, pubSubTopic,
message, message,
@ -229,22 +257,20 @@ export class Nwaku {
async messages( async messages(
pubsubTopic: string = DefaultPubSubTopic pubsubTopic: string = DefaultPubSubTopic
): Promise<WakuMessage[]> { ): Promise<MessageRpcResponse[]> {
this.checkProcess(); this.checkProcess();
const isDefined = (msg: WakuMessage | undefined): msg is WakuMessage => { const isDefined = (
msg: MessageRpcResponse | undefined
): msg is MessageRpcResponse => {
return !!msg; return !!msg;
}; };
const protoMsgs = await this.rpcCall<proto.WakuMessage[]>( const msgs = await this.rpcCall<MessageRpcResponse[]>(
"get_waku_v2_relay_v1_messages", "get_waku_v2_relay_v1_messages",
[pubsubTopic] [pubsubTopic]
); );
const msgs = await Promise.all(
protoMsgs.map(async (protoMsg) => await WakuMessage.decodeProto(protoMsg))
);
return msgs.filter(isDefined); return msgs.filter(isDefined);
} }
@ -267,7 +293,7 @@ export class Nwaku {
} }
async postAsymmetricMessage( async postAsymmetricMessage(
message: WakuRelayMessage, message: MessageRpcQuery,
publicKey: Uint8Array, publicKey: Uint8Array,
pubSubTopic?: string pubSubTopic?: string
): Promise<boolean> { ): Promise<boolean> {
@ -287,10 +313,10 @@ export class Nwaku {
async getAsymmetricMessages( async getAsymmetricMessages(
privateKey: Uint8Array, privateKey: Uint8Array,
pubSubTopic?: string pubSubTopic?: string
): Promise<WakuRelayMessage[]> { ): Promise<MessageRpcResponseHex[]> {
this.checkProcess(); this.checkProcess();
return await this.rpcCall<WakuRelayMessage[]>( return await this.rpcCall<MessageRpcResponseHex[]>(
"get_waku_v2_private_v1_asymmetric_messages", "get_waku_v2_private_v1_asymmetric_messages",
[ [
pubSubTopic ? pubSubTopic : DefaultPubSubTopic, pubSubTopic ? pubSubTopic : DefaultPubSubTopic,
@ -309,7 +335,7 @@ export class Nwaku {
} }
async postSymmetricMessage( async postSymmetricMessage(
message: WakuRelayMessage, message: MessageRpcQuery,
symKey: Uint8Array, symKey: Uint8Array,
pubSubTopic?: string pubSubTopic?: string
): Promise<boolean> { ): Promise<boolean> {
@ -329,10 +355,10 @@ export class Nwaku {
async getSymmetricMessages( async getSymmetricMessages(
symKey: Uint8Array, symKey: Uint8Array,
pubSubTopic?: string pubSubTopic?: string
): Promise<WakuRelayMessage[]> { ): Promise<MessageRpcResponseHex[]> {
this.checkProcess(); this.checkProcess();
return await this.rpcCall<WakuRelayMessage[]>( return await this.rpcCall<MessageRpcResponseHex[]>(
"get_waku_v2_private_v1_symmetric_messages", "get_waku_v2_private_v1_symmetric_messages",
[ [
pubSubTopic ? pubSubTopic : DefaultPubSubTopic, pubSubTopic ? pubSubTopic : DefaultPubSubTopic,
@ -387,7 +413,7 @@ export class Nwaku {
headers: new Headers({ "Content-Type": "application/json" }), headers: new Headers({ "Content-Type": "application/json" }),
}); });
const json = await res.json(); const json = await res.json();
log(`RPC Response: `, res, json); log(`RPC Response: `, res, JSON.stringify(json));
return json.result; return json.result;
} }