mirror of https://github.com/waku-org/js-waku.git
feat!: add Waku Message ephemeral support
This commit is contained in:
parent
07e3a1ad69
commit
59992832fe
|
@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
|
||||
- Waku Message `ephemeral` field to mark messages as do-not-store.
|
||||
|
||||
## @waku/core [0.0.5](https://github.com/waku-org/js-waku/compare/@waku/core@0.0.4...@waku/core@0.0.5) (2022-11-11)
|
||||
|
||||
### Changed
|
||||
|
|
|
@ -20,5 +20,6 @@ describe("to proto message", () => {
|
|||
expect(keys).to.contain("version");
|
||||
expect(keys).to.contain("timestamp");
|
||||
expect(keys).to.contain("rateLimitProof");
|
||||
expect(keys).to.contain("ephemeral");
|
||||
});
|
||||
});
|
||||
|
|
|
@ -8,6 +8,7 @@ const EmptyMessage: ProtoMessage = {
|
|||
version: undefined,
|
||||
timestamp: undefined,
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: undefined,
|
||||
};
|
||||
|
||||
export function toProtoMessage(wire: WakuMessageProto): ProtoMessage {
|
||||
|
|
|
@ -1,14 +1,15 @@
|
|||
import type { Decoder, Message, ProtoMessage } from "@waku/interfaces";
|
||||
import type { DecodedMessage, Decoder, ProtoMessage } from "@waku/interfaces";
|
||||
import debug from "debug";
|
||||
|
||||
import * as proto from "../../proto/topic_only_message";
|
||||
|
||||
const log = debug("waku:message:topic-only");
|
||||
|
||||
export class TopicOnlyMessage implements Message {
|
||||
export class TopicOnlyMessage implements DecodedMessage {
|
||||
public payload: undefined;
|
||||
public rateLimitProof: undefined;
|
||||
public timestamp: undefined;
|
||||
public ephemeral: undefined;
|
||||
|
||||
constructor(private proto: proto.TopicOnlyMessage) {}
|
||||
|
||||
|
@ -29,6 +30,7 @@ export class TopicOnlyDecoder implements Decoder<TopicOnlyMessage> {
|
|||
rateLimitProof: undefined,
|
||||
timestamp: undefined,
|
||||
version: undefined,
|
||||
ephemeral: undefined,
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,25 @@ describe("Waku Message version 0", function () {
|
|||
|
||||
expect(result.contentTopic).to.eq(TestContentTopic);
|
||||
expect(result.version).to.eq(0);
|
||||
expect(result.ephemeral).to.be.false;
|
||||
expect(result.payload).to.deep.eq(payload);
|
||||
expect(result.timestamp).to.not.be.undefined;
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("Ephemeral", async function () {
|
||||
await fc.assert(
|
||||
fc.asyncProperty(fc.uint8Array({ minLength: 1 }), async (payload) => {
|
||||
const encoder = new EncoderV0(TestContentTopic, true);
|
||||
const bytes = await encoder.toWire({ payload });
|
||||
const decoder = new DecoderV0(TestContentTopic);
|
||||
const protoResult = await decoder.fromWireToProtoObj(bytes);
|
||||
const result = (await decoder.fromProtoObj(protoResult!)) as MessageV0;
|
||||
|
||||
expect(result.contentTopic).to.eq(TestContentTopic);
|
||||
expect(result.version).to.eq(0);
|
||||
expect(result.ephemeral).to.be.true;
|
||||
expect(result.payload).to.deep.eq(payload);
|
||||
expect(result.timestamp).to.not.be.undefined;
|
||||
})
|
||||
|
|
|
@ -26,6 +26,10 @@ export class MessageV0 implements DecodedMessage {
|
|||
return;
|
||||
}
|
||||
|
||||
get ephemeral(): boolean {
|
||||
return Boolean(this.proto.ephemeral);
|
||||
}
|
||||
|
||||
get payload(): Uint8Array | undefined {
|
||||
return this._rawPayload;
|
||||
}
|
||||
|
@ -68,7 +72,7 @@ export class MessageV0 implements DecodedMessage {
|
|||
}
|
||||
|
||||
export class EncoderV0 implements Encoder {
|
||||
constructor(public contentTopic: string) {}
|
||||
constructor(public contentTopic: string, public ephemeral: boolean = false) {}
|
||||
|
||||
async toWire(message: Partial<Message>): Promise<Uint8Array> {
|
||||
return proto.WakuMessage.encode(await this.toProtoObj(message));
|
||||
|
@ -83,12 +87,13 @@ export class EncoderV0 implements Encoder {
|
|||
contentTopic: this.contentTopic,
|
||||
timestamp: BigInt(timestamp.valueOf()) * OneMillion,
|
||||
rateLimitProof: message.rateLimitProof,
|
||||
ephemeral: this.ephemeral,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
export class DecoderV0 implements Decoder<MessageV0> {
|
||||
constructor(public contentTopic: string) {}
|
||||
constructor(public contentTopic: string, public ephemeral: boolean = false) {}
|
||||
|
||||
fromWireToProtoObj(bytes: Uint8Array): Promise<ProtoMessage | undefined> {
|
||||
const protoMessage = proto.WakuMessage.decode(bytes);
|
||||
|
@ -99,6 +104,7 @@ export class DecoderV0 implements Decoder<MessageV0> {
|
|||
version: protoMessage.version ?? undefined,
|
||||
timestamp: protoMessage.timestamp ?? undefined,
|
||||
rateLimitProof: protoMessage.rateLimitProof ?? undefined,
|
||||
ephemeral: protoMessage.ephemeral ?? false,
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -501,6 +501,7 @@ export interface WakuMessage {
|
|||
timestampDeprecated?: number;
|
||||
timestamp?: bigint;
|
||||
rateLimitProof?: RateLimitProof;
|
||||
ephemeral?: boolean;
|
||||
}
|
||||
|
||||
export namespace WakuMessage {
|
||||
|
@ -544,6 +545,11 @@ export namespace WakuMessage {
|
|||
RateLimitProof.codec().encode(obj.rateLimitProof, writer);
|
||||
}
|
||||
|
||||
if (obj.ephemeral != null) {
|
||||
writer.uint32(248);
|
||||
writer.bool(obj.ephemeral);
|
||||
}
|
||||
|
||||
if (opts.lengthDelimited !== false) {
|
||||
writer.ldelim();
|
||||
}
|
||||
|
@ -578,6 +584,9 @@ export namespace WakuMessage {
|
|||
reader.uint32()
|
||||
);
|
||||
break;
|
||||
case 31:
|
||||
obj.ephemeral = reader.bool();
|
||||
break;
|
||||
default:
|
||||
reader.skipType(tag & 7);
|
||||
break;
|
||||
|
|
|
@ -425,6 +425,7 @@ export interface WakuMessage {
|
|||
timestampDeprecated?: number;
|
||||
timestamp?: bigint;
|
||||
rateLimitProof?: RateLimitProof;
|
||||
ephemeral?: boolean;
|
||||
}
|
||||
|
||||
export namespace WakuMessage {
|
||||
|
@ -468,6 +469,11 @@ export namespace WakuMessage {
|
|||
RateLimitProof.codec().encode(obj.rateLimitProof, writer);
|
||||
}
|
||||
|
||||
if (obj.ephemeral != null) {
|
||||
writer.uint32(248);
|
||||
writer.bool(obj.ephemeral);
|
||||
}
|
||||
|
||||
if (opts.lengthDelimited !== false) {
|
||||
writer.ldelim();
|
||||
}
|
||||
|
@ -502,6 +508,9 @@ export namespace WakuMessage {
|
|||
reader.uint32()
|
||||
);
|
||||
break;
|
||||
case 31:
|
||||
obj.ephemeral = reader.bool();
|
||||
break;
|
||||
default:
|
||||
reader.skipType(tag & 7);
|
||||
break;
|
||||
|
|
|
@ -17,5 +17,6 @@ message WakuMessage {
|
|||
optional double timestamp_deprecated = 4;
|
||||
optional sint64 timestamp = 10;
|
||||
optional RateLimitProof rate_limit_proof = 21;
|
||||
optional bool ephemeral = 31;
|
||||
}
|
||||
|
||||
|
|
|
@ -203,6 +203,7 @@ export interface WakuMessage {
|
|||
timestampDeprecated?: number;
|
||||
timestamp?: bigint;
|
||||
rateLimitProof?: RateLimitProof;
|
||||
ephemeral?: boolean;
|
||||
}
|
||||
|
||||
export namespace WakuMessage {
|
||||
|
@ -246,6 +247,11 @@ export namespace WakuMessage {
|
|||
RateLimitProof.codec().encode(obj.rateLimitProof, writer);
|
||||
}
|
||||
|
||||
if (obj.ephemeral != null) {
|
||||
writer.uint32(248);
|
||||
writer.bool(obj.ephemeral);
|
||||
}
|
||||
|
||||
if (opts.lengthDelimited !== false) {
|
||||
writer.ldelim();
|
||||
}
|
||||
|
@ -280,6 +286,9 @@ export namespace WakuMessage {
|
|||
reader.uint32()
|
||||
);
|
||||
break;
|
||||
case 31:
|
||||
obj.ephemeral = reader.bool();
|
||||
break;
|
||||
default:
|
||||
reader.skipType(tag & 7);
|
||||
break;
|
||||
|
|
|
@ -743,6 +743,7 @@ export interface WakuMessage {
|
|||
timestampDeprecated?: number;
|
||||
timestamp?: bigint;
|
||||
rateLimitProof?: RateLimitProof;
|
||||
ephemeral?: boolean;
|
||||
}
|
||||
|
||||
export namespace WakuMessage {
|
||||
|
@ -786,6 +787,11 @@ export namespace WakuMessage {
|
|||
RateLimitProof.codec().encode(obj.rateLimitProof, writer);
|
||||
}
|
||||
|
||||
if (obj.ephemeral != null) {
|
||||
writer.uint32(248);
|
||||
writer.bool(obj.ephemeral);
|
||||
}
|
||||
|
||||
if (opts.lengthDelimited !== false) {
|
||||
writer.ldelim();
|
||||
}
|
||||
|
@ -820,6 +826,9 @@ export namespace WakuMessage {
|
|||
reader.uint32()
|
||||
);
|
||||
break;
|
||||
case 31:
|
||||
obj.ephemeral = reader.bool();
|
||||
break;
|
||||
default:
|
||||
reader.skipType(tag & 7);
|
||||
break;
|
||||
|
|
|
@ -165,6 +165,7 @@ export interface ProtoMessage {
|
|||
version: number | undefined;
|
||||
timestamp: bigint | undefined;
|
||||
rateLimitProof: RateLimitProof | undefined;
|
||||
ephemeral: boolean | undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -178,6 +179,7 @@ export interface Message {
|
|||
|
||||
export interface Encoder {
|
||||
contentTopic: string;
|
||||
ephemeral: boolean;
|
||||
toWire: (message: Message) => Promise<Uint8Array | undefined>;
|
||||
toProtoObj: (message: Message) => Promise<ProtoMessage | undefined>;
|
||||
}
|
||||
|
@ -187,6 +189,7 @@ export interface DecodedMessage {
|
|||
contentTopic: string | undefined;
|
||||
timestamp: Date | undefined;
|
||||
rateLimitProof: RateLimitProof | undefined;
|
||||
ephemeral: boolean | undefined;
|
||||
}
|
||||
|
||||
export interface Decoder<T extends DecodedMessage> {
|
||||
|
|
|
@ -66,7 +66,8 @@ export class AsymEncoder implements Encoder {
|
|||
constructor(
|
||||
public contentTopic: string,
|
||||
private publicKey: Uint8Array,
|
||||
private sigPrivKey?: Uint8Array
|
||||
private sigPrivKey?: Uint8Array,
|
||||
public ephemeral: boolean = false
|
||||
) {}
|
||||
|
||||
async toWire(message: Partial<Message>): Promise<Uint8Array | undefined> {
|
||||
|
@ -94,6 +95,7 @@ export class AsymEncoder implements Encoder {
|
|||
contentTopic: this.contentTopic,
|
||||
timestamp: BigInt(timestamp.valueOf()) * OneMillion,
|
||||
rateLimitProof: message.rateLimitProof,
|
||||
ephemeral: this.ephemeral,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -102,7 +104,8 @@ export class SymEncoder implements Encoder {
|
|||
constructor(
|
||||
public contentTopic: string,
|
||||
private symKey: Uint8Array,
|
||||
private sigPrivKey?: Uint8Array
|
||||
private sigPrivKey?: Uint8Array,
|
||||
public ephemeral: boolean = false
|
||||
) {}
|
||||
|
||||
async toWire(message: Partial<Message>): Promise<Uint8Array | undefined> {
|
||||
|
@ -129,6 +132,7 @@ export class SymEncoder implements Encoder {
|
|||
contentTopic: this.contentTopic,
|
||||
timestamp: BigInt(timestamp.valueOf()) * OneMillion,
|
||||
rateLimitProof: message.rateLimitProof,
|
||||
ephemeral: this.ephemeral,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ import { PageDirection } from "@waku/core";
|
|||
import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer";
|
||||
import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0";
|
||||
import { createFullNode } from "@waku/create";
|
||||
import type { Message, WakuFull } from "@waku/interfaces";
|
||||
import { DecodedMessage, Message, WakuFull } from "@waku/interfaces";
|
||||
import { Protocols } from "@waku/interfaces";
|
||||
import {
|
||||
AsymDecoder,
|
||||
|
@ -343,7 +343,7 @@ describe("Waku Store", () => {
|
|||
|
||||
await waitForRemotePeer(waku2, [Protocols.Store]);
|
||||
|
||||
const messages: Message[] = [];
|
||||
const messages: DecodedMessage[] = [];
|
||||
log("Retrieve messages from store");
|
||||
|
||||
for await (const msgPromises of waku2.store.queryGenerator([
|
||||
|
@ -369,6 +369,143 @@ describe("Waku Store", () => {
|
|||
!!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e));
|
||||
});
|
||||
|
||||
it.skip("Ephemeral support", async function () {
|
||||
this.timeout(15_000);
|
||||
|
||||
const asymText = "This message is encrypted for me using asymmetric";
|
||||
const asymTopic = "/test/1/asymmetric/proto";
|
||||
|
||||
const symText =
|
||||
"This message is encrypted for me using symmetric encryption";
|
||||
const symTopic = "/test/1/symmetric/proto";
|
||||
|
||||
const clearText = "This is a clear text message for everyone to read";
|
||||
|
||||
const storeReadableText = "This message is readable by the store";
|
||||
const storeUnreadableText = "This message is not readable by the store";
|
||||
|
||||
const timestamp = new Date();
|
||||
|
||||
const asymMsg = { payload: utf8ToBytes(asymText), timestamp };
|
||||
const symMsg = {
|
||||
payload: utf8ToBytes(symText),
|
||||
timestamp: new Date(timestamp.valueOf() + 1),
|
||||
};
|
||||
const clearMsg = {
|
||||
payload: utf8ToBytes(clearText),
|
||||
timestamp: new Date(timestamp.valueOf() + 2),
|
||||
};
|
||||
|
||||
const storeReadableMsg = {
|
||||
payload: utf8ToBytes(storeReadableText),
|
||||
};
|
||||
const storeUnreadableMsg = {
|
||||
payload: utf8ToBytes(storeUnreadableText),
|
||||
};
|
||||
|
||||
const privateKey = generatePrivateKey();
|
||||
const symKey = generateSymmetricKey();
|
||||
const publicKey = getPublicKey(privateKey);
|
||||
|
||||
const storeWithAsymEncoder = new AsymEncoder(
|
||||
asymTopic,
|
||||
publicKey,
|
||||
undefined,
|
||||
false
|
||||
);
|
||||
const storeWithSymEncoder = new SymEncoder(
|
||||
symTopic,
|
||||
symKey,
|
||||
undefined,
|
||||
false
|
||||
);
|
||||
|
||||
const dontStoreWithAsymEncoder = new AsymEncoder(
|
||||
asymTopic,
|
||||
publicKey,
|
||||
undefined,
|
||||
true
|
||||
);
|
||||
const dontStoreWithSymEncoder = new SymEncoder(
|
||||
symTopic,
|
||||
symKey,
|
||||
undefined,
|
||||
true
|
||||
);
|
||||
|
||||
const storeEncoder = new EncoderV0(TestContentTopic, false);
|
||||
const storeUnreadableEncoder = new EncoderV0(TestContentTopic, true);
|
||||
|
||||
const asymDecoder = new AsymDecoder(asymTopic, privateKey);
|
||||
const symDecoder = new SymDecoder(symTopic, symKey);
|
||||
|
||||
const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([
|
||||
createFullNode({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
}).then((waku) => waku.start().then(() => waku)),
|
||||
createFullNode({
|
||||
staticNoiseKey: NOISE_KEY_2,
|
||||
}).then((waku) => waku.start().then(() => waku)),
|
||||
nwaku.getMultiaddrWithId(),
|
||||
]);
|
||||
|
||||
log("Waku nodes created");
|
||||
|
||||
await Promise.all([
|
||||
waku1.dial(nimWakuMultiaddr),
|
||||
waku2.dial(nimWakuMultiaddr),
|
||||
]);
|
||||
|
||||
log("Waku nodes connected to nwaku");
|
||||
|
||||
await waitForRemotePeer(waku1, [Protocols.LightPush]);
|
||||
|
||||
log("Sending messages using light push");
|
||||
await Promise.all([
|
||||
waku1.lightPush.push(storeWithAsymEncoder, asymMsg),
|
||||
waku1.lightPush.push(storeWithSymEncoder, symMsg),
|
||||
waku1.lightPush.push(dontStoreWithAsymEncoder, asymMsg),
|
||||
waku1.lightPush.push(dontStoreWithSymEncoder, symMsg),
|
||||
waku1.lightPush.push(TestEncoder, clearMsg),
|
||||
waku1.lightPush.push(storeEncoder, storeReadableMsg),
|
||||
waku1.lightPush.push(storeUnreadableEncoder, storeUnreadableMsg),
|
||||
]);
|
||||
|
||||
await waitForRemotePeer(waku2, [Protocols.Store]);
|
||||
|
||||
const messages: DecodedMessage[] = [];
|
||||
log("Retrieve messages from store");
|
||||
|
||||
for await (const msgPromises of waku2.store.queryGenerator([
|
||||
asymDecoder,
|
||||
symDecoder,
|
||||
TestDecoder,
|
||||
])) {
|
||||
for (const promise of msgPromises) {
|
||||
const msg = await promise;
|
||||
if (msg) {
|
||||
messages.push(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Messages are ordered from oldest to latest within a page (1 page query)
|
||||
expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText);
|
||||
expect(bytesToUtf8(messages[1].payload!)).to.eq(symText);
|
||||
expect(bytesToUtf8(messages[2].payload!)).to.eq(clearText);
|
||||
expect(bytesToUtf8(messages[3].payload!)).to.eq(storeReadableText);
|
||||
expect(messages?.length).eq(4);
|
||||
|
||||
// check for ephemeral
|
||||
expect(messages[0].ephemeral).to.be.false;
|
||||
expect(messages[1].ephemeral).to.be.false;
|
||||
expect(messages[2].ephemeral).to.be.false;
|
||||
expect(messages[3].ephemeral).to.be.false;
|
||||
|
||||
!!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e));
|
||||
!!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e));
|
||||
});
|
||||
|
||||
it("Ordered callback, using start and end time", async function () {
|
||||
this.timeout(20000);
|
||||
|
||||
|
|
Loading…
Reference in New Issue