Merge pull request #975 from waku-org/proof-attached

This commit is contained in:
fryorcraken.eth 2022-09-30 13:12:51 +10:00 committed by GitHub
commit 29436eafdc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 139 additions and 55 deletions

View File

@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `RateLimitProof` field in Waku Message protobuf for RLN. - `RateLimitProof` field in Waku Message protobuf for RLN.
### Changed
- `Message` interface changed to ensure implementations do not omit fields.
- `Decoder` and `Encoder` interfaces change to better express what the function members do.
## [0.29.0] - 2022-09-21 ## [0.29.0] - 2022-09-21
### Changed ### Changed

View File

@ -62,30 +62,30 @@ export interface RateLimitProof {
} }
export interface ProtoMessage { export interface ProtoMessage {
payload?: Uint8Array; payload: Uint8Array | undefined;
contentTopic?: string; contentTopic: string | undefined;
version?: number; version: number | undefined;
timestamp?: bigint; timestamp: bigint | undefined;
rateLimitProof?: RateLimitProof; rateLimitProof: RateLimitProof | undefined;
} }
export interface Message { export interface Message {
payload?: Uint8Array; payload: Uint8Array | undefined;
contentTopic?: string; contentTopic: string | undefined;
timestamp?: Date; timestamp: Date | undefined;
rateLimitProof?: RateLimitProof; rateLimitProof: RateLimitProof | undefined;
} }
export interface Encoder { export interface Encoder {
contentTopic: string; contentTopic: string;
encode: (message: Message) => Promise<Uint8Array | undefined>; toWire: (message: Partial<Message>) => Promise<Uint8Array | undefined>;
encodeProto: (message: Message) => Promise<ProtoMessage | undefined>; toProtoObj: (message: Partial<Message>) => Promise<ProtoMessage | undefined>;
} }
export interface Decoder<T extends Message> { export interface Decoder<T extends Message> {
contentTopic: string; contentTopic: string;
decodeProto: (bytes: Uint8Array) => Promise<ProtoMessage | undefined>; fromWireToProtoObj: (bytes: Uint8Array) => Promise<ProtoMessage | undefined>;
decode: (proto: ProtoMessage) => Promise<T | undefined>; fromProtoObj: (proto: ProtoMessage) => Promise<T | undefined>;
} }
export interface SendResult { export interface SendResult {

View File

@ -0,0 +1,24 @@
import { expect } from "chai";
import { WakuMessage as WakuMessageProto } from "../proto/message";
import { toProtoMessage } from "./to_proto_message";
describe("to proto message", () => {
it("Fields are not dropped", () => {
const wire: WakuMessageProto = {
contentTopic: "foo",
};
const protoMessage = toProtoMessage(wire);
expect(protoMessage.contentTopic).to.eq("foo");
const keys = Object.keys(protoMessage);
expect(keys).to.contain("payload");
expect(keys).to.contain("contentTopic");
expect(keys).to.contain("version");
expect(keys).to.contain("timestamp");
expect(keys).to.contain("rateLimitProof");
});
});

View File

@ -0,0 +1,15 @@
import { WakuMessage as WakuMessageProto } from "../proto/message";
import { ProtoMessage } from "./interfaces";
const EmptyMessage: ProtoMessage = {
payload: undefined,
contentTopic: undefined,
version: undefined,
timestamp: undefined,
rateLimitProof: undefined,
};
export function toProtoMessage(wire: WakuMessageProto): ProtoMessage {
return { ...EmptyMessage, ...wire };
}

View File

@ -18,6 +18,7 @@ import {
selectPeerForProtocol, selectPeerForProtocol,
selectRandomPeer, selectRandomPeer,
} from "../select_peer"; } from "../select_peer";
import { toProtoMessage } from "../to_proto_message";
import { ContentFilter, FilterRPC } from "./filter_rpc"; import { ContentFilter, FilterRPC } from "./filter_rpc";
export { ContentFilter }; export { ContentFilter };
@ -198,7 +199,7 @@ export class WakuFilter {
// noinspection ES6MissingAwait // noinspection ES6MissingAwait
decoders.forEach(async (dec) => { decoders.forEach(async (dec) => {
if (msg) return; if (msg) return;
const decoded = await dec.decode(protoMessage); const decoded = await dec.fromProtoObj(toProtoMessage(protoMessage));
if (!decoded) { if (!decoded) {
log("Not able to decode message"); log("Not able to decode message");
return; return;

View File

@ -53,7 +53,7 @@ export class WakuLightPush {
async push( async push(
encoder: Encoder, encoder: Encoder,
message: Message, message: Partial<Message>,
opts?: PushOptions opts?: PushOptions
): Promise<SendResult> { ): Promise<SendResult> {
const pubSubTopic = opts?.pubSubTopic ? opts.pubSubTopic : this.pubSubTopic; const pubSubTopic = opts?.pubSubTopic ? opts.pubSubTopic : this.pubSubTopic;
@ -79,7 +79,7 @@ export class WakuLightPush {
const recipients: PeerId[] = []; const recipients: PeerId[] = [];
try { try {
const protoMessage = await encoder.encodeProto(message); const protoMessage = await encoder.toProtoObj(message);
if (!protoMessage) { if (!protoMessage) {
log("Failed to encode to protoMessage, aborting push"); log("Failed to encode to protoMessage, aborting push");
return { recipients }; return { recipients };

View File

@ -6,6 +6,10 @@ import type { Decoder, Message, ProtoMessage } from "../interfaces";
const log = debug("waku:message:topic-only"); const log = debug("waku:message:topic-only");
export class TopicOnlyMessage implements Message { export class TopicOnlyMessage implements Message {
public payload: undefined;
public rateLimitProof: undefined;
public timestamp: undefined;
constructor(private proto: proto.TopicOnlyMessage) {} constructor(private proto: proto.TopicOnlyMessage) {}
get contentTopic(): string { get contentTopic(): string {
@ -16,13 +20,21 @@ export class TopicOnlyMessage implements Message {
export class TopicOnlyDecoder implements Decoder<TopicOnlyMessage> { export class TopicOnlyDecoder implements Decoder<TopicOnlyMessage> {
public contentTopic = ""; public contentTopic = "";
decodeProto(bytes: Uint8Array): Promise<ProtoMessage | undefined> { fromWireToProtoObj(bytes: Uint8Array): Promise<ProtoMessage | undefined> {
const protoMessage = proto.TopicOnlyMessage.decode(bytes); const protoMessage = proto.TopicOnlyMessage.decode(bytes);
log("Message decoded", protoMessage); log("Message decoded", protoMessage);
return Promise.resolve(protoMessage); return Promise.resolve({
contentTopic: protoMessage.contentTopic,
payload: undefined,
rateLimitProof: undefined,
timestamp: undefined,
version: undefined,
});
} }
async decode(proto: ProtoMessage): Promise<TopicOnlyMessage | undefined> { async fromProtoObj(
proto: ProtoMessage
): Promise<TopicOnlyMessage | undefined> {
return new TopicOnlyMessage(proto); return new TopicOnlyMessage(proto);
} }
} }

View File

@ -10,10 +10,10 @@ describe("Waku Message version 0", function () {
await fc.assert( await fc.assert(
fc.asyncProperty(fc.uint8Array({ minLength: 1 }), async (payload) => { fc.asyncProperty(fc.uint8Array({ minLength: 1 }), async (payload) => {
const encoder = new EncoderV0(TestContentTopic); const encoder = new EncoderV0(TestContentTopic);
const bytes = await encoder.encode({ payload }); const bytes = await encoder.toWire({ payload });
const decoder = new DecoderV0(TestContentTopic); const decoder = new DecoderV0(TestContentTopic);
const protoResult = await decoder.decodeProto(bytes); const protoResult = await decoder.fromWireToProtoObj(bytes);
const result = (await decoder.decode(protoResult!)) as MessageV0; const result = (await decoder.fromProtoObj(protoResult!)) as MessageV0;
expect(result.contentTopic).to.eq(TestContentTopic); expect(result.contentTopic).to.eq(TestContentTopic);
expect(result.version).to.eq(0); expect(result.version).to.eq(0);

View File

@ -1,7 +1,7 @@
import debug from "debug"; import debug from "debug";
import * as proto from "../../proto/message"; import * as proto from "../../proto/message";
import { Decoder, Message, ProtoMessage } from "../interfaces"; import { Decoder, Message, ProtoMessage, RateLimitProof } from "../interfaces";
import { Encoder } from "../interfaces"; import { Encoder } from "../interfaces";
const log = debug("waku:message:version-0"); const log = debug("waku:message:version-0");
@ -54,16 +54,20 @@ export class MessageV0 implements Message {
// https://github.com/status-im/js-waku/issues/921 // https://github.com/status-im/js-waku/issues/921
return this.proto.version ?? 0; return this.proto.version ?? 0;
} }
get rateLimitProof(): RateLimitProof | undefined {
return this.proto.rateLimitProof;
}
} }
export class EncoderV0 implements Encoder { export class EncoderV0 implements Encoder {
constructor(public contentTopic: string) {} constructor(public contentTopic: string) {}
async encode(message: Message): Promise<Uint8Array> { async toWire(message: Partial<Message>): Promise<Uint8Array> {
return proto.WakuMessage.encode(await this.encodeProto(message)); return proto.WakuMessage.encode(await this.toProtoObj(message));
} }
async encodeProto(message: Message): Promise<ProtoMessage> { async toProtoObj(message: Partial<Message>): Promise<ProtoMessage> {
const timestamp = message.timestamp ?? new Date(); const timestamp = message.timestamp ?? new Date();
return { return {
@ -71,6 +75,7 @@ export class EncoderV0 implements Encoder {
version: Version, version: Version,
contentTopic: message.contentTopic ?? this.contentTopic, contentTopic: message.contentTopic ?? this.contentTopic,
timestamp: BigInt(timestamp.valueOf()) * OneMillion, timestamp: BigInt(timestamp.valueOf()) * OneMillion,
rateLimitProof: message.rateLimitProof,
}; };
} }
} }
@ -78,13 +83,19 @@ export class EncoderV0 implements Encoder {
export class DecoderV0 implements Decoder<MessageV0> { export class DecoderV0 implements Decoder<MessageV0> {
constructor(public contentTopic: string) {} constructor(public contentTopic: string) {}
decodeProto(bytes: Uint8Array): Promise<ProtoMessage | undefined> { fromWireToProtoObj(bytes: Uint8Array): Promise<ProtoMessage | undefined> {
const protoMessage = proto.WakuMessage.decode(bytes); const protoMessage = proto.WakuMessage.decode(bytes);
log("Message decoded", protoMessage); log("Message decoded", protoMessage);
return Promise.resolve(protoMessage); return Promise.resolve({
payload: protoMessage.payload ?? undefined,
contentTopic: protoMessage.contentTopic ?? undefined,
version: protoMessage.version ?? undefined,
timestamp: protoMessage.timestamp ?? undefined,
rateLimitProof: protoMessage.rateLimitProof ?? undefined,
});
} }
async decode(proto: ProtoMessage): Promise<MessageV0 | undefined> { async fromProtoObj(proto: ProtoMessage): Promise<MessageV0 | undefined> {
// https://github.com/status-im/js-waku/issues/921 // https://github.com/status-im/js-waku/issues/921
if (proto.version === undefined) { if (proto.version === undefined) {
proto.version = 0; proto.version = 0;

View File

@ -28,12 +28,12 @@ describe("Waku Message version 1", function () {
const publicKey = getPublicKey(privateKey); const publicKey = getPublicKey(privateKey);
const encoder = new AsymEncoder(TestContentTopic, publicKey); const encoder = new AsymEncoder(TestContentTopic, publicKey);
const bytes = await encoder.encode({ payload }); const bytes = await encoder.toWire({ payload });
const decoder = new AsymDecoder(TestContentTopic, privateKey); const decoder = new AsymDecoder(TestContentTopic, privateKey);
const protoResult = await decoder.decodeProto(bytes!); const protoResult = await decoder.fromWireToProtoObj(bytes!);
if (!protoResult) throw "Failed to proto decode"; if (!protoResult) throw "Failed to proto decode";
const result = await decoder.decode(protoResult); const result = await decoder.fromProtoObj(protoResult);
if (!result) throw "Failed to decode"; if (!result) throw "Failed to decode";
expect(result.contentTopic).to.equal(TestContentTopic); expect(result.contentTopic).to.equal(TestContentTopic);
@ -63,12 +63,12 @@ describe("Waku Message version 1", function () {
bobPublicKey, bobPublicKey,
alicePrivateKey alicePrivateKey
); );
const bytes = await encoder.encode({ payload }); const bytes = await encoder.toWire({ payload });
const decoder = new AsymDecoder(TestContentTopic, bobPrivateKey); const decoder = new AsymDecoder(TestContentTopic, bobPrivateKey);
const protoResult = await decoder.decodeProto(bytes!); const protoResult = await decoder.fromWireToProtoObj(bytes!);
if (!protoResult) throw "Failed to proto decode"; if (!protoResult) throw "Failed to proto decode";
const result = await decoder.decode(protoResult); const result = await decoder.fromProtoObj(protoResult);
if (!result) throw "Failed to decode"; if (!result) throw "Failed to decode";
expect(result.contentTopic).to.equal(TestContentTopic); expect(result.contentTopic).to.equal(TestContentTopic);
@ -88,12 +88,12 @@ describe("Waku Message version 1", function () {
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }), fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (payload, symKey) => { async (payload, symKey) => {
const encoder = new SymEncoder(TestContentTopic, symKey); const encoder = new SymEncoder(TestContentTopic, symKey);
const bytes = await encoder.encode({ payload }); const bytes = await encoder.toWire({ payload });
const decoder = new SymDecoder(TestContentTopic, symKey); const decoder = new SymDecoder(TestContentTopic, symKey);
const protoResult = await decoder.decodeProto(bytes!); const protoResult = await decoder.fromWireToProtoObj(bytes!);
if (!protoResult) throw "Failed to proto decode"; if (!protoResult) throw "Failed to proto decode";
const result = await decoder.decode(protoResult); const result = await decoder.fromProtoObj(protoResult);
if (!result) throw "Failed to decode"; if (!result) throw "Failed to decode";
expect(result.contentTopic).to.equal(TestContentTopic); expect(result.contentTopic).to.equal(TestContentTopic);
@ -116,12 +116,12 @@ describe("Waku Message version 1", function () {
const sigPubKey = getPublicKey(sigPrivKey); const sigPubKey = getPublicKey(sigPrivKey);
const encoder = new SymEncoder(TestContentTopic, symKey, sigPrivKey); const encoder = new SymEncoder(TestContentTopic, symKey, sigPrivKey);
const bytes = await encoder.encode({ payload }); const bytes = await encoder.toWire({ payload });
const decoder = new SymDecoder(TestContentTopic, symKey); const decoder = new SymDecoder(TestContentTopic, symKey);
const protoResult = await decoder.decodeProto(bytes!); const protoResult = await decoder.fromWireToProtoObj(bytes!);
if (!protoResult) throw "Failed to proto decode"; if (!protoResult) throw "Failed to proto decode";
const result = await decoder.decode(protoResult); const result = await decoder.fromProtoObj(protoResult);
if (!result) throw "Failed to decode"; if (!result) throw "Failed to decode";
expect(result.contentTopic).to.equal(TestContentTopic); expect(result.contentTopic).to.equal(TestContentTopic);

View File

@ -52,14 +52,16 @@ export class AsymEncoder implements Encoder {
private sigPrivKey?: Uint8Array private sigPrivKey?: Uint8Array
) {} ) {}
async encode(message: Message): Promise<Uint8Array | undefined> { async toWire(message: Partial<Message>): Promise<Uint8Array | undefined> {
const protoMessage = await this.encodeProto(message); const protoMessage = await this.toProtoObj(message);
if (!protoMessage) return; if (!protoMessage) return;
return proto.WakuMessage.encode(protoMessage); return proto.WakuMessage.encode(protoMessage);
} }
async encodeProto(message: Message): Promise<ProtoMessage | undefined> { async toProtoObj(
message: Partial<Message>
): Promise<ProtoMessage | undefined> {
const timestamp = message.timestamp ?? new Date(); const timestamp = message.timestamp ?? new Date();
if (!message.payload) { if (!message.payload) {
log("No payload to encrypt, skipping: ", message); log("No payload to encrypt, skipping: ", message);
@ -74,6 +76,7 @@ export class AsymEncoder implements Encoder {
version: Version, version: Version,
contentTopic: this.contentTopic, contentTopic: this.contentTopic,
timestamp: BigInt(timestamp.valueOf()) * OneMillion, timestamp: BigInt(timestamp.valueOf()) * OneMillion,
rateLimitProof: message.rateLimitProof,
}; };
} }
} }
@ -85,14 +88,16 @@ export class SymEncoder implements Encoder {
private sigPrivKey?: Uint8Array private sigPrivKey?: Uint8Array
) {} ) {}
async encode(message: Message): Promise<Uint8Array | undefined> { async toWire(message: Partial<Message>): Promise<Uint8Array | undefined> {
const protoMessage = await this.encodeProto(message); const protoMessage = await this.toProtoObj(message);
if (!protoMessage) return; if (!protoMessage) return;
return proto.WakuMessage.encode(protoMessage); return proto.WakuMessage.encode(protoMessage);
} }
async encodeProto(message: Message): Promise<ProtoMessage | undefined> { async toProtoObj(
message: Partial<Message>
): Promise<ProtoMessage | undefined> {
const timestamp = message.timestamp ?? new Date(); const timestamp = message.timestamp ?? new Date();
if (!message.payload) { if (!message.payload) {
log("No payload to encrypt, skipping: ", message); log("No payload to encrypt, skipping: ", message);
@ -106,6 +111,7 @@ export class SymEncoder implements Encoder {
version: Version, version: Version,
contentTopic: this.contentTopic, contentTopic: this.contentTopic,
timestamp: BigInt(timestamp.valueOf()) * OneMillion, timestamp: BigInt(timestamp.valueOf()) * OneMillion,
rateLimitProof: message.rateLimitProof,
}; };
} }
} }
@ -115,7 +121,9 @@ export class AsymDecoder extends DecoderV0 implements Decoder<MessageV1> {
super(contentTopic); super(contentTopic);
} }
async decode(protoMessage: ProtoMessage): Promise<MessageV1 | undefined> { async fromProtoObj(
protoMessage: ProtoMessage
): Promise<MessageV1 | undefined> {
const cipherPayload = protoMessage.payload; const cipherPayload = protoMessage.payload;
if (protoMessage.version !== Version) { if (protoMessage.version !== Version) {
@ -171,7 +179,9 @@ export class SymDecoder extends DecoderV0 implements Decoder<MessageV1> {
super(contentTopic); super(contentTopic);
} }
async decode(protoMessage: ProtoMessage): Promise<MessageV1 | undefined> { async fromProtoObj(
protoMessage: ProtoMessage
): Promise<MessageV1 | undefined> {
const cipherPayload = protoMessage.payload; const cipherPayload = protoMessage.payload;
if (protoMessage.version !== Version) { if (protoMessage.version !== Version) {

View File

@ -92,8 +92,11 @@ export class WakuRelay extends GossipSub {
/** /**
* Send Waku message. * Send Waku message.
*/ */
public async send(encoder: Encoder, message: Message): Promise<SendResult> { public async send(
const msg = await encoder.encode(message); encoder: Encoder,
message: Partial<Message>
): Promise<SendResult> {
const msg = await encoder.toWire(message);
if (!msg) { if (!msg) {
log("Failed to encode message, aborting publish"); log("Failed to encode message, aborting publish");
return { recipients: [] }; return { recipients: [] };
@ -136,7 +139,7 @@ export class WakuRelay extends GossipSub {
if (event.detail.msg.topic !== pubSubTopic) return; if (event.detail.msg.topic !== pubSubTopic) return;
log(`Message received on ${pubSubTopic}`); log(`Message received on ${pubSubTopic}`);
const topicOnlyMsg = await this.defaultDecoder.decodeProto( const topicOnlyMsg = await this.defaultDecoder.fromWireToProtoObj(
event.detail.msg.data event.detail.msg.data
); );
if (!topicOnlyMsg || !topicOnlyMsg.contentTopic) { if (!topicOnlyMsg || !topicOnlyMsg.contentTopic) {
@ -150,14 +153,16 @@ export class WakuRelay extends GossipSub {
} }
await Promise.all( await Promise.all(
Array.from(observers).map(async ({ decoder, callback }) => { Array.from(observers).map(async ({ decoder, callback }) => {
const protoMsg = await decoder.decodeProto(event.detail.msg.data); const protoMsg = await decoder.fromWireToProtoObj(
event.detail.msg.data
);
if (!protoMsg) { if (!protoMsg) {
log( log(
"Internal error: message previously decoded failed on 2nd pass." "Internal error: message previously decoded failed on 2nd pass."
); );
return; return;
} }
const msg = await decoder.decode(protoMsg); const msg = await decoder.fromProtoObj(protoMsg);
if (msg) { if (msg) {
callback(msg); callback(msg);
} else { } else {

View File

@ -14,6 +14,7 @@ import { DefaultPubSubTopic, StoreCodecs } from "../constants";
import { Decoder, Message } from "../interfaces"; import { Decoder, Message } from "../interfaces";
import { selectConnection } from "../select_connection"; import { selectConnection } from "../select_connection";
import { getPeersForProtocol, selectPeerForProtocol } from "../select_peer"; import { getPeersForProtocol, selectPeerForProtocol } from "../select_peer";
import { toProtoMessage } from "../to_proto_message";
import { HistoryRPC, PageDirection, Params } from "./history_rpc"; import { HistoryRPC, PageDirection, Params } from "./history_rpc";
@ -341,7 +342,7 @@ async function* paginate<T extends Message>(
if (typeof contentTopic !== "undefined") { if (typeof contentTopic !== "undefined") {
const decoder = decoders.get(contentTopic); const decoder = decoders.get(contentTopic);
if (decoder) { if (decoder) {
return decoder.decode(protoMsg); return decoder.fromProtoObj(toProtoMessage(protoMsg));
} }
} }
return Promise.resolve(undefined); return Promise.resolve(undefined);