feat!: split outgoing and incoming message interface

While the data structure are similar, they serve different purposes.
Having the same type has show to confuse API consumers

Resolves #979
This commit is contained in:
fryorcraken.eth 2022-11-04 14:01:30 +11:00
parent de6415f4f1
commit 8aa9b43f61
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
9 changed files with 78 additions and 47 deletions

View File

@ -4,6 +4,7 @@ import type { Peer } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar"; import type { IncomingStreamData } from "@libp2p/interface-registrar";
import type { import type {
Callback, Callback,
DecodedMessage,
Decoder, Decoder,
Filter, Filter,
Message, Message,
@ -77,7 +78,7 @@ export class WakuFilter implements Filter {
* @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to. * @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to.
* @returns Unsubscribe function that can be used to end the subscription. * @returns Unsubscribe function that can be used to end the subscription.
*/ */
async subscribe<T extends Message>( async subscribe<T extends DecodedMessage>(
decoders: Decoder<T>[], decoders: Decoder<T>[],
callback: Callback<T>, callback: Callback<T>,
opts?: ProtocolOptions opts?: ProtocolOptions
@ -211,7 +212,7 @@ export class WakuFilter implements Filter {
this.subscriptions.delete(requestId); this.subscriptions.delete(requestId);
} }
private addDecoders<T extends Message>( private addDecoders<T extends DecodedMessage>(
decoders: Map<string, Array<Decoder<T>>> decoders: Map<string, Array<Decoder<T>>>
): void { ): void {
decoders.forEach((decoders, contentTopic) => { decoders.forEach((decoders, contentTopic) => {
@ -224,7 +225,7 @@ export class WakuFilter implements Filter {
}); });
} }
private deleteDecoders<T extends Message>( private deleteDecoders<T extends DecodedMessage>(
decoders: Map<string, Array<Decoder<T>>> decoders: Map<string, Array<Decoder<T>>>
): void { ): void {
decoders.forEach((decoders, contentTopic) => { decoders.forEach((decoders, contentTopic) => {

View File

@ -1,4 +1,5 @@
import type { import type {
DecodedMessage,
Decoder, Decoder,
Encoder, Encoder,
Message, Message,
@ -15,7 +16,7 @@ const OneMillion = BigInt(1_000_000);
export const Version = 0; export const Version = 0;
export { proto }; export { proto };
export class MessageV0 implements Message { export class MessageV0 implements DecodedMessage {
constructor(protected proto: proto.WakuMessage) {} constructor(protected proto: proto.WakuMessage) {}
get _rawPayload(): Uint8Array | undefined { get _rawPayload(): Uint8Array | undefined {
@ -79,7 +80,7 @@ export class EncoderV0 implements Encoder {
return { return {
payload: message.payload, payload: message.payload,
version: Version, version: Version,
contentTopic: message.contentTopic ?? this.contentTopic, contentTopic: this.contentTopic,
timestamp: BigInt(timestamp.valueOf()) * OneMillion, timestamp: BigInt(timestamp.valueOf()) * OneMillion,
rateLimitProof: message.rateLimitProof, rateLimitProof: message.rateLimitProof,
}; };

View File

@ -16,6 +16,7 @@ import type {
Relay, Relay,
SendResult, SendResult,
} from "@waku/interfaces"; } from "@waku/interfaces";
import { DecodedMessage } from "@waku/interfaces";
import debug from "debug"; import debug from "debug";
import { DefaultPubSubTopic } from "../constants"; import { DefaultPubSubTopic } from "../constants";
@ -26,7 +27,7 @@ import * as constants from "./constants";
const log = debug("waku:relay"); const log = debug("waku:relay");
export type Observer<T extends Message> = { export type Observer<T extends DecodedMessage> = {
decoder: Decoder<T>; decoder: Decoder<T>;
callback: Callback<T>; callback: Callback<T>;
}; };
@ -56,7 +57,7 @@ export type CreateOptions = {
*/ */
export class WakuRelay extends GossipSub implements Relay { export class WakuRelay extends GossipSub implements Relay {
pubSubTopic: string; pubSubTopic: string;
defaultDecoder: Decoder<Message>; defaultDecoder: Decoder<DecodedMessage>;
public static multicodec: string = constants.RelayCodecs[0]; public static multicodec: string = constants.RelayCodecs[0];
/** /**
@ -114,7 +115,7 @@ export class WakuRelay extends GossipSub implements Relay {
* *
* @returns Function to delete the observer * @returns Function to delete the observer
*/ */
addObserver<T extends Message>( addObserver<T extends DecodedMessage>(
decoder: Decoder<T>, decoder: Decoder<T>,
callback: Callback<T> callback: Callback<T>
): () => void { ): () => void {

View File

@ -1,7 +1,7 @@
import type { Connection } from "@libp2p/interface-connection"; import type { Connection } from "@libp2p/interface-connection";
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import { Peer } from "@libp2p/interface-peer-store"; import { Peer } from "@libp2p/interface-peer-store";
import { Decoder, Message } from "@waku/interfaces"; import { DecodedMessage, Decoder } from "@waku/interfaces";
import debug from "debug"; import debug from "debug";
import all from "it-all"; import all from "it-all";
import * as lp from "it-length-prefixed"; import * as lp from "it-length-prefixed";
@ -106,7 +106,7 @@ export class WakuStore {
* or if an error is encountered when processing the reply, * or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed. * or if two decoders with the same content topic are passed.
*/ */
async queryOrderedCallback<T extends Message>( async queryOrderedCallback<T extends DecodedMessage>(
decoders: Decoder<T>[], decoders: Decoder<T>[],
callback: (message: T) => Promise<void | boolean> | boolean | void, callback: (message: T) => Promise<void | boolean> | boolean | void,
options?: QueryOptions options?: QueryOptions
@ -155,7 +155,7 @@ export class WakuStore {
* or if an error is encountered when processing the reply, * or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed. * or if two decoders with the same content topic are passed.
*/ */
async queryCallbackOnPromise<T extends Message>( async queryCallbackOnPromise<T extends DecodedMessage>(
decoders: Decoder<T>[], decoders: Decoder<T>[],
callback: ( callback: (
message: Promise<T | undefined> message: Promise<T | undefined>
@ -193,7 +193,7 @@ export class WakuStore {
* or if an error is encountered when processing the reply, * or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed. * or if two decoders with the same content topic are passed.
*/ */
async *queryGenerator<T extends Message>( async *queryGenerator<T extends DecodedMessage>(
decoders: Decoder<T>[], decoders: Decoder<T>[],
options?: QueryOptions options?: QueryOptions
): AsyncGenerator<Promise<T | undefined>[]> { ): AsyncGenerator<Promise<T | undefined>[]> {
@ -266,7 +266,7 @@ export class WakuStore {
} }
} }
async function* paginate<T extends Message>( async function* paginate<T extends DecodedMessage>(
connection: Connection, connection: Connection,
protocol: string, protocol: string,
queryOpts: Params, queryOpts: Params,

View File

@ -28,7 +28,7 @@ export type ProtocolOptions = {
export type Callback<T extends Message> = (msg: T) => void | Promise<void>; export type Callback<T extends Message> = (msg: T) => void | Promise<void>;
export interface Filter extends PointToPointProtocol { export interface Filter extends PointToPointProtocol {
subscribe: <T extends Message>( subscribe: <T extends DecodedMessage>(
decoders: Decoder<T>[], decoders: Decoder<T>[],
callback: Callback<T>, callback: Callback<T>,
opts?: ProtocolOptions opts?: ProtocolOptions
@ -38,7 +38,7 @@ export interface Filter extends PointToPointProtocol {
export interface LightPush extends PointToPointProtocol { export interface LightPush extends PointToPointProtocol {
push: ( push: (
encoder: Encoder, encoder: Encoder,
message: Partial<Message>, message: Message,
opts?: ProtocolOptions opts?: ProtocolOptions
) => Promise<SendResult>; ) => Promise<SendResult>;
} }
@ -76,27 +76,27 @@ export type StoreQueryOptions = {
} & ProtocolOptions; } & ProtocolOptions;
export interface Store extends PointToPointProtocol { export interface Store extends PointToPointProtocol {
queryOrderedCallback: <T extends Message>( queryOrderedCallback: <T extends DecodedMessage>(
decoders: Decoder<T>[], decoders: Decoder<T>[],
callback: (message: T) => Promise<void | boolean> | boolean | void, callback: (message: T) => Promise<void | boolean> | boolean | void,
options?: StoreQueryOptions options?: StoreQueryOptions
) => Promise<void>; ) => Promise<void>;
queryCallbackOnPromise: <T extends Message>( queryCallbackOnPromise: <T extends DecodedMessage>(
decoders: Decoder<T>[], decoders: Decoder<T>[],
callback: ( callback: (
message: Promise<T | undefined> message: Promise<T | undefined>
) => Promise<void | boolean> | boolean | void, ) => Promise<void | boolean> | boolean | void,
options?: StoreQueryOptions options?: StoreQueryOptions
) => Promise<void>; ) => Promise<void>;
queryGenerator: <T extends Message>( queryGenerator: <T extends DecodedMessage>(
decoders: Decoder<T>[], decoders: Decoder<T>[],
options?: StoreQueryOptions options?: StoreQueryOptions
) => AsyncGenerator<Promise<T | undefined>[]>; ) => AsyncGenerator<Promise<T | undefined>[]>;
} }
export interface Relay extends GossipSub { export interface Relay extends GossipSub {
send: (encoder: Encoder, message: Partial<Message>) => Promise<SendResult>; send: (encoder: Encoder, message: Message) => Promise<SendResult>;
addObserver: <T extends Message>( addObserver: <T extends DecodedMessage>(
decoder: Decoder<T>, decoder: Decoder<T>,
callback: Callback<T> callback: Callback<T>
) => () => void; ) => () => void;
@ -155,6 +155,10 @@ export interface RateLimitProof {
rlnIdentifier: Uint8Array; rlnIdentifier: Uint8Array;
} }
/**
* Interface matching the protobuf library.
* Field types matches the protobuf type over the wire
*/
export interface ProtoMessage { export interface ProtoMessage {
payload: Uint8Array | undefined; payload: Uint8Array | undefined;
contentTopic: string | undefined; contentTopic: string | undefined;
@ -163,20 +167,29 @@ export interface ProtoMessage {
rateLimitProof: RateLimitProof | undefined; rateLimitProof: RateLimitProof | undefined;
} }
/**
* Interface for messages to encode and send.
*/
export interface Message { export interface Message {
payload?: Uint8Array;
timestamp?: Date;
rateLimitProof?: RateLimitProof;
}
export interface Encoder {
contentTopic: string;
toWire: (message: Message) => Promise<Uint8Array | undefined>;
toProtoObj: (message: Message) => Promise<ProtoMessage | undefined>;
}
export interface DecodedMessage {
payload: Uint8Array | undefined; payload: Uint8Array | undefined;
contentTopic: string | undefined; contentTopic: string | undefined;
timestamp: Date | undefined; timestamp: Date | undefined;
rateLimitProof: RateLimitProof | undefined; rateLimitProof: RateLimitProof | undefined;
} }
export interface Encoder { export interface Decoder<T extends DecodedMessage> {
contentTopic: string;
toWire: (message: Partial<Message>) => Promise<Uint8Array | undefined>;
toProtoObj: (message: Partial<Message>) => Promise<ProtoMessage | undefined>;
}
export interface Decoder<T extends Message> {
contentTopic: string; contentTopic: string;
fromWireToProtoObj: (bytes: Uint8Array) => Promise<ProtoMessage | undefined>; fromWireToProtoObj: (bytes: Uint8Array) => Promise<ProtoMessage | undefined>;
fromProtoObj: (proto: ProtoMessage) => Promise<T | undefined>; fromProtoObj: (proto: ProtoMessage) => Promise<T | undefined>;

View File

@ -5,7 +5,13 @@ import {
MessageV0, MessageV0,
proto, proto,
} from "@waku/core/lib/waku_message/version_0"; } from "@waku/core/lib/waku_message/version_0";
import type { Decoder, Encoder, Message, ProtoMessage } from "@waku/interfaces"; import type {
DecodedMessage,
Decoder,
Encoder,
Message,
ProtoMessage,
} from "@waku/interfaces";
import debug from "debug"; import debug from "debug";
import { Symmetric } from "./constants.js"; import { Symmetric } from "./constants.js";
@ -38,7 +44,7 @@ export type Signature = {
publicKey: Uint8Array | undefined; publicKey: Uint8Array | undefined;
}; };
export class MessageV1 extends MessageV0 implements Message { export class MessageV1 extends MessageV0 implements DecodedMessage {
private readonly _decodedPayload: Uint8Array; private readonly _decodedPayload: Uint8Array;
constructor( constructor(

View File

@ -2,7 +2,7 @@ import { bytesToUtf8, utf8ToBytes } from "@waku/byte-utils";
import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer"; import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer";
import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0"; import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0";
import { createFullNode } from "@waku/create"; import { createFullNode } from "@waku/create";
import type { Message, WakuFull } from "@waku/interfaces"; import type { DecodedMessage, WakuFull } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces";
import { expect } from "chai"; import { expect } from "chai";
import debug from "debug"; import debug from "debug";
@ -47,7 +47,7 @@ describe("Waku Filter", () => {
const messageText = "Filtering works!"; const messageText = "Filtering works!";
const message = { payload: utf8ToBytes(messageText) }; const message = { payload: utf8ToBytes(messageText) };
const callback = (msg: Message): void => { const callback = (msg: DecodedMessage): void => {
log("Got a message"); log("Got a message");
messageCount++; messageCount++;
expect(msg.contentTopic).to.eq(TestContentTopic); expect(msg.contentTopic).to.eq(TestContentTopic);
@ -71,7 +71,7 @@ describe("Waku Filter", () => {
this.timeout(10000); this.timeout(10000);
let messageCount = 0; let messageCount = 0;
const callback = (msg: Message): void => { const callback = (msg: DecodedMessage): void => {
messageCount++; messageCount++;
expect(msg.contentTopic).to.eq(TestContentTopic); expect(msg.contentTopic).to.eq(TestContentTopic);
}; };

View File

@ -8,7 +8,7 @@ import {
MessageV0, MessageV0,
} from "@waku/core/lib/waku_message/version_0"; } from "@waku/core/lib/waku_message/version_0";
import { createPrivacyNode } from "@waku/create"; import { createPrivacyNode } from "@waku/create";
import type { Message, WakuPrivacy } from "@waku/interfaces"; import type { DecodedMessage, WakuPrivacy } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces";
import { import {
AsymDecoder, AsymDecoder,
@ -118,9 +118,11 @@ describe("Waku Relay [node only]", () => {
timestamp: messageTimestamp, timestamp: messageTimestamp,
}; };
const receivedMsgPromise: Promise<Message> = new Promise((resolve) => { const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
waku2.relay.addObserver(TestDecoder, resolve); (resolve) => {
}); waku2.relay.addObserver(TestDecoder, resolve);
}
);
await waku1.relay.send(TestEncoder, message); await waku1.relay.send(TestEncoder, message);
@ -148,12 +150,12 @@ describe("Waku Relay [node only]", () => {
const fooDecoder = new DecoderV0(fooContentTopic); const fooDecoder = new DecoderV0(fooContentTopic);
const barDecoder = new DecoderV0(barContentTopic); const barDecoder = new DecoderV0(barContentTopic);
const fooMessages: Message[] = []; const fooMessages: DecodedMessage[] = [];
waku2.relay.addObserver(fooDecoder, (msg) => { waku2.relay.addObserver(fooDecoder, (msg) => {
fooMessages.push(msg); fooMessages.push(msg);
}); });
const barMessages: Message[] = []; const barMessages: DecodedMessage[] = [];
waku2.relay.addObserver(barDecoder, (msg) => { waku2.relay.addObserver(barDecoder, (msg) => {
barMessages.push(msg); barMessages.push(msg);
}); });
@ -197,7 +199,7 @@ describe("Waku Relay [node only]", () => {
const asymDecoder = new AsymDecoder(asymTopic, privateKey); const asymDecoder = new AsymDecoder(asymTopic, privateKey);
const symDecoder = new SymDecoder(symTopic, symKey); const symDecoder = new SymDecoder(symTopic, symKey);
const msgs: Message[] = []; const msgs: DecodedMessage[] = [];
waku2.relay.addObserver(asymDecoder, (wakuMsg) => { waku2.relay.addObserver(asymDecoder, (wakuMsg) => {
msgs.push(wakuMsg); msgs.push(wakuMsg);
}); });
@ -228,7 +230,7 @@ describe("Waku Relay [node only]", () => {
const contentTopic = "added-then-deleted-observer"; const contentTopic = "added-then-deleted-observer";
// The promise **fails** if we receive a message on this observer. // The promise **fails** if we receive a message on this observer.
const receivedMsgPromise: Promise<Message> = new Promise( const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve, reject) => { (resolve, reject) => {
const deleteObserver = waku2.relay.addObserver( const deleteObserver = waku2.relay.addObserver(
new DecoderV0(contentTopic), new DecoderV0(contentTopic),
@ -304,7 +306,7 @@ describe("Waku Relay [node only]", () => {
const messageText = "Communicating using a custom pubsub topic"; const messageText = "Communicating using a custom pubsub topic";
const waku2ReceivedMsgPromise: Promise<Message> = new Promise( const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => { (resolve) => {
waku2.relay.addObserver(TestDecoder, resolve); waku2.relay.addObserver(TestDecoder, resolve);
} }
@ -312,7 +314,7 @@ describe("Waku Relay [node only]", () => {
// The promise **fails** if we receive a message on the default // The promise **fails** if we receive a message on the default
// pubsub topic. // pubsub topic.
const waku3NoMsgPromise: Promise<Message> = new Promise( const waku3NoMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve, reject) => { (resolve, reject) => {
waku3.relay.addObserver(TestDecoder, reject); waku3.relay.addObserver(TestDecoder, reject);
setTimeout(resolve, 1000); setTimeout(resolve, 1000);
@ -466,7 +468,7 @@ describe("Waku Relay [node only]", () => {
const msgStr = "Hello there!"; const msgStr = "Hello there!";
const message = { payload: utf8ToBytes(msgStr) }; const message = { payload: utf8ToBytes(msgStr) };
const waku2ReceivedMsgPromise: Promise<Message> = new Promise( const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => { (resolve) => {
waku2.relay.addObserver(TestDecoder, resolve); waku2.relay.addObserver(TestDecoder, resolve);
} }

View File

@ -3,7 +3,12 @@ import { bytesToUtf8, utf8ToBytes } from "@waku/byte-utils";
import { PeerDiscoveryStaticPeers } from "@waku/core/lib/peer_discovery_static_list"; import { PeerDiscoveryStaticPeers } from "@waku/core/lib/peer_discovery_static_list";
import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer"; import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer";
import { createLightNode, createPrivacyNode } from "@waku/create"; import { createLightNode, createPrivacyNode } from "@waku/create";
import type { Message, Waku, WakuLight, WakuPrivacy } from "@waku/interfaces"; import type {
DecodedMessage,
Waku,
WakuLight,
WakuPrivacy,
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces";
import { import {
generateSymmetricKey, generateSymmetricKey,
@ -175,9 +180,11 @@ describe("Decryption Keys", () => {
timestamp: messageTimestamp, timestamp: messageTimestamp,
}; };
const receivedMsgPromise: Promise<Message> = new Promise((resolve) => { const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
waku2.relay.addObserver(decoder, resolve); (resolve) => {
}); waku2.relay.addObserver(decoder, resolve);
}
);
await waku1.relay.send(encoder, message); await waku1.relay.send(encoder, message);