Merge pull request #1014 from waku-org/danisharora/add-ephemeral-support

This commit is contained in:
fryorcraken.eth 2022-11-16 11:06:42 +11:00 committed by GitHub
commit efe5b326a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 222 additions and 8 deletions

View File

@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Added
- Waku Message `ephemeral` field to mark messages as do-not-store.
## @waku/core [0.0.5](https://github.com/waku-org/js-waku/compare/@waku/core@0.0.4...@waku/core@0.0.5) (2022-11-11) ## @waku/core [0.0.5](https://github.com/waku-org/js-waku/compare/@waku/core@0.0.4...@waku/core@0.0.5) (2022-11-11)
### Changed ### Changed

View File

@ -20,5 +20,6 @@ describe("to proto message", () => {
expect(keys).to.contain("version"); expect(keys).to.contain("version");
expect(keys).to.contain("timestamp"); expect(keys).to.contain("timestamp");
expect(keys).to.contain("rateLimitProof"); expect(keys).to.contain("rateLimitProof");
expect(keys).to.contain("ephemeral");
}); });
}); });

View File

@ -8,6 +8,7 @@ const EmptyMessage: ProtoMessage = {
version: undefined, version: undefined,
timestamp: undefined, timestamp: undefined,
rateLimitProof: undefined, rateLimitProof: undefined,
ephemeral: undefined,
}; };
export function toProtoMessage(wire: WakuMessageProto): ProtoMessage { export function toProtoMessage(wire: WakuMessageProto): ProtoMessage {

View File

@ -1,14 +1,15 @@
import type { Decoder, Message, ProtoMessage } from "@waku/interfaces"; import type { DecodedMessage, Decoder, ProtoMessage } from "@waku/interfaces";
import debug from "debug"; import debug from "debug";
import * as proto from "../../proto/topic_only_message"; import * as proto from "../../proto/topic_only_message";
const log = debug("waku:message:topic-only"); const log = debug("waku:message:topic-only");
export class TopicOnlyMessage implements Message { export class TopicOnlyMessage implements DecodedMessage {
public payload: undefined; public payload: undefined;
public rateLimitProof: undefined; public rateLimitProof: undefined;
public timestamp: undefined; public timestamp: undefined;
public ephemeral: undefined;
constructor(private proto: proto.TopicOnlyMessage) {} constructor(private proto: proto.TopicOnlyMessage) {}
@ -29,6 +30,7 @@ export class TopicOnlyDecoder implements Decoder<TopicOnlyMessage> {
rateLimitProof: undefined, rateLimitProof: undefined,
timestamp: undefined, timestamp: undefined,
version: undefined, version: undefined,
ephemeral: undefined,
}); });
} }

View File

@ -17,6 +17,25 @@ describe("Waku Message version 0", function () {
expect(result.contentTopic).to.eq(TestContentTopic); expect(result.contentTopic).to.eq(TestContentTopic);
expect(result.version).to.eq(0); expect(result.version).to.eq(0);
expect(result.ephemeral).to.be.false;
expect(result.payload).to.deep.eq(payload);
expect(result.timestamp).to.not.be.undefined;
})
);
});
it("Ephemeral", async function () {
await fc.assert(
fc.asyncProperty(fc.uint8Array({ minLength: 1 }), async (payload) => {
const encoder = new EncoderV0(TestContentTopic, true);
const bytes = await encoder.toWire({ payload });
const decoder = new DecoderV0(TestContentTopic);
const protoResult = await decoder.fromWireToProtoObj(bytes);
const result = (await decoder.fromProtoObj(protoResult!)) as MessageV0;
expect(result.contentTopic).to.eq(TestContentTopic);
expect(result.version).to.eq(0);
expect(result.ephemeral).to.be.true;
expect(result.payload).to.deep.eq(payload); expect(result.payload).to.deep.eq(payload);
expect(result.timestamp).to.not.be.undefined; expect(result.timestamp).to.not.be.undefined;
}) })

View File

@ -26,6 +26,10 @@ export class MessageV0 implements DecodedMessage {
return; return;
} }
get ephemeral(): boolean {
return Boolean(this.proto.ephemeral);
}
get payload(): Uint8Array | undefined { get payload(): Uint8Array | undefined {
return this._rawPayload; return this._rawPayload;
} }
@ -68,7 +72,7 @@ export class MessageV0 implements DecodedMessage {
} }
export class EncoderV0 implements Encoder { export class EncoderV0 implements Encoder {
constructor(public contentTopic: string) {} constructor(public contentTopic: string, public ephemeral: boolean = false) {}
async toWire(message: Partial<Message>): Promise<Uint8Array> { async toWire(message: Partial<Message>): Promise<Uint8Array> {
return proto.WakuMessage.encode(await this.toProtoObj(message)); return proto.WakuMessage.encode(await this.toProtoObj(message));
@ -83,12 +87,13 @@ export class EncoderV0 implements Encoder {
contentTopic: this.contentTopic, contentTopic: this.contentTopic,
timestamp: BigInt(timestamp.valueOf()) * OneMillion, timestamp: BigInt(timestamp.valueOf()) * OneMillion,
rateLimitProof: message.rateLimitProof, rateLimitProof: message.rateLimitProof,
ephemeral: this.ephemeral,
}; };
} }
} }
export class DecoderV0 implements Decoder<MessageV0> { export class DecoderV0 implements Decoder<MessageV0> {
constructor(public contentTopic: string) {} constructor(public contentTopic: string, public ephemeral: boolean = false) {}
fromWireToProtoObj(bytes: Uint8Array): Promise<ProtoMessage | undefined> { fromWireToProtoObj(bytes: Uint8Array): Promise<ProtoMessage | undefined> {
const protoMessage = proto.WakuMessage.decode(bytes); const protoMessage = proto.WakuMessage.decode(bytes);
@ -99,6 +104,7 @@ export class DecoderV0 implements Decoder<MessageV0> {
version: protoMessage.version ?? undefined, version: protoMessage.version ?? undefined,
timestamp: protoMessage.timestamp ?? undefined, timestamp: protoMessage.timestamp ?? undefined,
rateLimitProof: protoMessage.rateLimitProof ?? undefined, rateLimitProof: protoMessage.rateLimitProof ?? undefined,
ephemeral: protoMessage.ephemeral ?? false,
}); });
} }

View File

@ -501,6 +501,7 @@ export interface WakuMessage {
timestampDeprecated?: number; timestampDeprecated?: number;
timestamp?: bigint; timestamp?: bigint;
rateLimitProof?: RateLimitProof; rateLimitProof?: RateLimitProof;
ephemeral?: boolean;
} }
export namespace WakuMessage { export namespace WakuMessage {
@ -544,6 +545,11 @@ export namespace WakuMessage {
RateLimitProof.codec().encode(obj.rateLimitProof, writer); RateLimitProof.codec().encode(obj.rateLimitProof, writer);
} }
if (obj.ephemeral != null) {
writer.uint32(248);
writer.bool(obj.ephemeral);
}
if (opts.lengthDelimited !== false) { if (opts.lengthDelimited !== false) {
writer.ldelim(); writer.ldelim();
} }
@ -578,6 +584,9 @@ export namespace WakuMessage {
reader.uint32() reader.uint32()
); );
break; break;
case 31:
obj.ephemeral = reader.bool();
break;
default: default:
reader.skipType(tag & 7); reader.skipType(tag & 7);
break; break;

View File

@ -425,6 +425,7 @@ export interface WakuMessage {
timestampDeprecated?: number; timestampDeprecated?: number;
timestamp?: bigint; timestamp?: bigint;
rateLimitProof?: RateLimitProof; rateLimitProof?: RateLimitProof;
ephemeral?: boolean;
} }
export namespace WakuMessage { export namespace WakuMessage {
@ -468,6 +469,11 @@ export namespace WakuMessage {
RateLimitProof.codec().encode(obj.rateLimitProof, writer); RateLimitProof.codec().encode(obj.rateLimitProof, writer);
} }
if (obj.ephemeral != null) {
writer.uint32(248);
writer.bool(obj.ephemeral);
}
if (opts.lengthDelimited !== false) { if (opts.lengthDelimited !== false) {
writer.ldelim(); writer.ldelim();
} }
@ -502,6 +508,9 @@ export namespace WakuMessage {
reader.uint32() reader.uint32()
); );
break; break;
case 31:
obj.ephemeral = reader.bool();
break;
default: default:
reader.skipType(tag & 7); reader.skipType(tag & 7);
break; break;

View File

@ -17,5 +17,6 @@ message WakuMessage {
optional double timestamp_deprecated = 4; optional double timestamp_deprecated = 4;
optional sint64 timestamp = 10; optional sint64 timestamp = 10;
optional RateLimitProof rate_limit_proof = 21; optional RateLimitProof rate_limit_proof = 21;
optional bool ephemeral = 31;
} }

View File

@ -203,6 +203,7 @@ export interface WakuMessage {
timestampDeprecated?: number; timestampDeprecated?: number;
timestamp?: bigint; timestamp?: bigint;
rateLimitProof?: RateLimitProof; rateLimitProof?: RateLimitProof;
ephemeral?: boolean;
} }
export namespace WakuMessage { export namespace WakuMessage {
@ -246,6 +247,11 @@ export namespace WakuMessage {
RateLimitProof.codec().encode(obj.rateLimitProof, writer); RateLimitProof.codec().encode(obj.rateLimitProof, writer);
} }
if (obj.ephemeral != null) {
writer.uint32(248);
writer.bool(obj.ephemeral);
}
if (opts.lengthDelimited !== false) { if (opts.lengthDelimited !== false) {
writer.ldelim(); writer.ldelim();
} }
@ -280,6 +286,9 @@ export namespace WakuMessage {
reader.uint32() reader.uint32()
); );
break; break;
case 31:
obj.ephemeral = reader.bool();
break;
default: default:
reader.skipType(tag & 7); reader.skipType(tag & 7);
break; break;

View File

@ -743,6 +743,7 @@ export interface WakuMessage {
timestampDeprecated?: number; timestampDeprecated?: number;
timestamp?: bigint; timestamp?: bigint;
rateLimitProof?: RateLimitProof; rateLimitProof?: RateLimitProof;
ephemeral?: boolean;
} }
export namespace WakuMessage { export namespace WakuMessage {
@ -786,6 +787,11 @@ export namespace WakuMessage {
RateLimitProof.codec().encode(obj.rateLimitProof, writer); RateLimitProof.codec().encode(obj.rateLimitProof, writer);
} }
if (obj.ephemeral != null) {
writer.uint32(248);
writer.bool(obj.ephemeral);
}
if (opts.lengthDelimited !== false) { if (opts.lengthDelimited !== false) {
writer.ldelim(); writer.ldelim();
} }
@ -820,6 +826,9 @@ export namespace WakuMessage {
reader.uint32() reader.uint32()
); );
break; break;
case 31:
obj.ephemeral = reader.bool();
break;
default: default:
reader.skipType(tag & 7); reader.skipType(tag & 7);
break; break;

View File

@ -165,6 +165,7 @@ export interface ProtoMessage {
version: number | undefined; version: number | undefined;
timestamp: bigint | undefined; timestamp: bigint | undefined;
rateLimitProof: RateLimitProof | undefined; rateLimitProof: RateLimitProof | undefined;
ephemeral: boolean | undefined;
} }
/** /**
@ -178,6 +179,7 @@ export interface Message {
export interface Encoder { export interface Encoder {
contentTopic: string; contentTopic: string;
ephemeral: boolean;
toWire: (message: Message) => Promise<Uint8Array | undefined>; toWire: (message: Message) => Promise<Uint8Array | undefined>;
toProtoObj: (message: Message) => Promise<ProtoMessage | undefined>; toProtoObj: (message: Message) => Promise<ProtoMessage | undefined>;
} }
@ -187,6 +189,7 @@ export interface DecodedMessage {
contentTopic: string | undefined; contentTopic: string | undefined;
timestamp: Date | undefined; timestamp: Date | undefined;
rateLimitProof: RateLimitProof | undefined; rateLimitProof: RateLimitProof | undefined;
ephemeral: boolean | undefined;
} }
export interface Decoder<T extends DecodedMessage> { export interface Decoder<T extends DecodedMessage> {

View File

@ -66,7 +66,8 @@ export class AsymEncoder implements Encoder {
constructor( constructor(
public contentTopic: string, public contentTopic: string,
private publicKey: Uint8Array, private publicKey: Uint8Array,
private sigPrivKey?: Uint8Array private sigPrivKey?: Uint8Array,
public ephemeral: boolean = false
) {} ) {}
async toWire(message: Partial<Message>): Promise<Uint8Array | undefined> { async toWire(message: Partial<Message>): Promise<Uint8Array | undefined> {
@ -94,6 +95,7 @@ export class AsymEncoder implements Encoder {
contentTopic: this.contentTopic, contentTopic: this.contentTopic,
timestamp: BigInt(timestamp.valueOf()) * OneMillion, timestamp: BigInt(timestamp.valueOf()) * OneMillion,
rateLimitProof: message.rateLimitProof, rateLimitProof: message.rateLimitProof,
ephemeral: this.ephemeral,
}; };
} }
} }
@ -102,7 +104,8 @@ export class SymEncoder implements Encoder {
constructor( constructor(
public contentTopic: string, public contentTopic: string,
private symKey: Uint8Array, private symKey: Uint8Array,
private sigPrivKey?: Uint8Array private sigPrivKey?: Uint8Array,
public ephemeral: boolean = false
) {} ) {}
async toWire(message: Partial<Message>): Promise<Uint8Array | undefined> { async toWire(message: Partial<Message>): Promise<Uint8Array | undefined> {
@ -129,6 +132,7 @@ export class SymEncoder implements Encoder {
contentTopic: this.contentTopic, contentTopic: this.contentTopic,
timestamp: BigInt(timestamp.valueOf()) * OneMillion, timestamp: BigInt(timestamp.valueOf()) * OneMillion,
rateLimitProof: message.rateLimitProof, rateLimitProof: message.rateLimitProof,
ephemeral: this.ephemeral,
}; };
} }
} }

View File

@ -3,7 +3,7 @@ import { PageDirection } from "@waku/core";
import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer"; import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer";
import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0"; import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0";
import { createFullNode } from "@waku/create"; import { createFullNode } from "@waku/create";
import type { Message, WakuFull } from "@waku/interfaces"; import { DecodedMessage, Message, WakuFull } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces";
import { import {
AsymDecoder, AsymDecoder,
@ -343,7 +343,7 @@ describe("Waku Store", () => {
await waitForRemotePeer(waku2, [Protocols.Store]); await waitForRemotePeer(waku2, [Protocols.Store]);
const messages: Message[] = []; const messages: DecodedMessage[] = [];
log("Retrieve messages from store"); log("Retrieve messages from store");
for await (const msgPromises of waku2.store.queryGenerator([ for await (const msgPromises of waku2.store.queryGenerator([
@ -369,6 +369,143 @@ describe("Waku Store", () => {
!!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e));
}); });
it.skip("Ephemeral support", async function () {
this.timeout(15_000);
const asymText = "This message is encrypted for me using asymmetric";
const asymTopic = "/test/1/asymmetric/proto";
const symText =
"This message is encrypted for me using symmetric encryption";
const symTopic = "/test/1/symmetric/proto";
const clearText = "This is a clear text message for everyone to read";
const storeReadableText = "This message is readable by the store";
const storeUnreadableText = "This message is not readable by the store";
const timestamp = new Date();
const asymMsg = { payload: utf8ToBytes(asymText), timestamp };
const symMsg = {
payload: utf8ToBytes(symText),
timestamp: new Date(timestamp.valueOf() + 1),
};
const clearMsg = {
payload: utf8ToBytes(clearText),
timestamp: new Date(timestamp.valueOf() + 2),
};
const storeReadableMsg = {
payload: utf8ToBytes(storeReadableText),
};
const storeUnreadableMsg = {
payload: utf8ToBytes(storeUnreadableText),
};
const privateKey = generatePrivateKey();
const symKey = generateSymmetricKey();
const publicKey = getPublicKey(privateKey);
const storeWithAsymEncoder = new AsymEncoder(
asymTopic,
publicKey,
undefined,
false
);
const storeWithSymEncoder = new SymEncoder(
symTopic,
symKey,
undefined,
false
);
const dontStoreWithAsymEncoder = new AsymEncoder(
asymTopic,
publicKey,
undefined,
true
);
const dontStoreWithSymEncoder = new SymEncoder(
symTopic,
symKey,
undefined,
true
);
const storeEncoder = new EncoderV0(TestContentTopic, false);
const storeUnreadableEncoder = new EncoderV0(TestContentTopic, true);
const asymDecoder = new AsymDecoder(asymTopic, privateKey);
const symDecoder = new SymDecoder(symTopic, symKey);
const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([
createFullNode({
staticNoiseKey: NOISE_KEY_1,
}).then((waku) => waku.start().then(() => waku)),
createFullNode({
staticNoiseKey: NOISE_KEY_2,
}).then((waku) => waku.start().then(() => waku)),
nwaku.getMultiaddrWithId(),
]);
log("Waku nodes created");
await Promise.all([
waku1.dial(nimWakuMultiaddr),
waku2.dial(nimWakuMultiaddr),
]);
log("Waku nodes connected to nwaku");
await waitForRemotePeer(waku1, [Protocols.LightPush]);
log("Sending messages using light push");
await Promise.all([
waku1.lightPush.push(storeWithAsymEncoder, asymMsg),
waku1.lightPush.push(storeWithSymEncoder, symMsg),
waku1.lightPush.push(dontStoreWithAsymEncoder, asymMsg),
waku1.lightPush.push(dontStoreWithSymEncoder, symMsg),
waku1.lightPush.push(TestEncoder, clearMsg),
waku1.lightPush.push(storeEncoder, storeReadableMsg),
waku1.lightPush.push(storeUnreadableEncoder, storeUnreadableMsg),
]);
await waitForRemotePeer(waku2, [Protocols.Store]);
const messages: DecodedMessage[] = [];
log("Retrieve messages from store");
for await (const msgPromises of waku2.store.queryGenerator([
asymDecoder,
symDecoder,
TestDecoder,
])) {
for (const promise of msgPromises) {
const msg = await promise;
if (msg) {
messages.push(msg);
}
}
}
// Messages are ordered from oldest to latest within a page (1 page query)
expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText);
expect(bytesToUtf8(messages[1].payload!)).to.eq(symText);
expect(bytesToUtf8(messages[2].payload!)).to.eq(clearText);
expect(bytesToUtf8(messages[3].payload!)).to.eq(storeReadableText);
expect(messages?.length).eq(4);
// check for ephemeral
expect(messages[0].ephemeral).to.be.false;
expect(messages[1].ephemeral).to.be.false;
expect(messages[2].ephemeral).to.be.false;
expect(messages[3].ephemeral).to.be.false;
!!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e));
!!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e));
});
it("Ordered callback, using start and end time", async function () { it("Ordered callback, using start and end time", async function () {
this.timeout(20000); this.timeout(20000);