mirror of https://github.com/waku-org/js-waku.git
Merge pull request #1001 from waku-org/feat/split-message-interface
This commit is contained in:
commit
21f249b73b
|
@ -4,6 +4,7 @@ import type { Peer } from "@libp2p/interface-peer-store";
|
||||||
import type { IncomingStreamData } from "@libp2p/interface-registrar";
|
import type { IncomingStreamData } from "@libp2p/interface-registrar";
|
||||||
import type {
|
import type {
|
||||||
Callback,
|
Callback,
|
||||||
|
DecodedMessage,
|
||||||
Decoder,
|
Decoder,
|
||||||
Filter,
|
Filter,
|
||||||
Message,
|
Message,
|
||||||
|
@ -77,7 +78,7 @@ export class WakuFilter implements Filter {
|
||||||
* @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to.
|
* @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to.
|
||||||
* @returns Unsubscribe function that can be used to end the subscription.
|
* @returns Unsubscribe function that can be used to end the subscription.
|
||||||
*/
|
*/
|
||||||
async subscribe<T extends Message>(
|
async subscribe<T extends DecodedMessage>(
|
||||||
decoders: Decoder<T>[],
|
decoders: Decoder<T>[],
|
||||||
callback: Callback<T>,
|
callback: Callback<T>,
|
||||||
opts?: ProtocolOptions
|
opts?: ProtocolOptions
|
||||||
|
@ -211,7 +212,7 @@ export class WakuFilter implements Filter {
|
||||||
this.subscriptions.delete(requestId);
|
this.subscriptions.delete(requestId);
|
||||||
}
|
}
|
||||||
|
|
||||||
private addDecoders<T extends Message>(
|
private addDecoders<T extends DecodedMessage>(
|
||||||
decoders: Map<string, Array<Decoder<T>>>
|
decoders: Map<string, Array<Decoder<T>>>
|
||||||
): void {
|
): void {
|
||||||
decoders.forEach((decoders, contentTopic) => {
|
decoders.forEach((decoders, contentTopic) => {
|
||||||
|
@ -224,7 +225,7 @@ export class WakuFilter implements Filter {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private deleteDecoders<T extends Message>(
|
private deleteDecoders<T extends DecodedMessage>(
|
||||||
decoders: Map<string, Array<Decoder<T>>>
|
decoders: Map<string, Array<Decoder<T>>>
|
||||||
): void {
|
): void {
|
||||||
decoders.forEach((decoders, contentTopic) => {
|
decoders.forEach((decoders, contentTopic) => {
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import type {
|
import type {
|
||||||
|
DecodedMessage,
|
||||||
Decoder,
|
Decoder,
|
||||||
Encoder,
|
Encoder,
|
||||||
Message,
|
Message,
|
||||||
|
@ -15,7 +16,7 @@ const OneMillion = BigInt(1_000_000);
|
||||||
export const Version = 0;
|
export const Version = 0;
|
||||||
export { proto };
|
export { proto };
|
||||||
|
|
||||||
export class MessageV0 implements Message {
|
export class MessageV0 implements DecodedMessage {
|
||||||
constructor(protected proto: proto.WakuMessage) {}
|
constructor(protected proto: proto.WakuMessage) {}
|
||||||
|
|
||||||
get _rawPayload(): Uint8Array | undefined {
|
get _rawPayload(): Uint8Array | undefined {
|
||||||
|
@ -79,7 +80,7 @@ export class EncoderV0 implements Encoder {
|
||||||
return {
|
return {
|
||||||
payload: message.payload,
|
payload: message.payload,
|
||||||
version: Version,
|
version: Version,
|
||||||
contentTopic: message.contentTopic ?? this.contentTopic,
|
contentTopic: this.contentTopic,
|
||||||
timestamp: BigInt(timestamp.valueOf()) * OneMillion,
|
timestamp: BigInt(timestamp.valueOf()) * OneMillion,
|
||||||
rateLimitProof: message.rateLimitProof,
|
rateLimitProof: message.rateLimitProof,
|
||||||
};
|
};
|
||||||
|
|
|
@ -16,6 +16,7 @@ import type {
|
||||||
Relay,
|
Relay,
|
||||||
SendResult,
|
SendResult,
|
||||||
} from "@waku/interfaces";
|
} from "@waku/interfaces";
|
||||||
|
import { DecodedMessage } from "@waku/interfaces";
|
||||||
import debug from "debug";
|
import debug from "debug";
|
||||||
|
|
||||||
import { DefaultPubSubTopic } from "../constants";
|
import { DefaultPubSubTopic } from "../constants";
|
||||||
|
@ -26,7 +27,7 @@ import * as constants from "./constants";
|
||||||
|
|
||||||
const log = debug("waku:relay");
|
const log = debug("waku:relay");
|
||||||
|
|
||||||
export type Observer<T extends Message> = {
|
export type Observer<T extends DecodedMessage> = {
|
||||||
decoder: Decoder<T>;
|
decoder: Decoder<T>;
|
||||||
callback: Callback<T>;
|
callback: Callback<T>;
|
||||||
};
|
};
|
||||||
|
@ -56,7 +57,7 @@ export type CreateOptions = {
|
||||||
*/
|
*/
|
||||||
export class WakuRelay extends GossipSub implements Relay {
|
export class WakuRelay extends GossipSub implements Relay {
|
||||||
pubSubTopic: string;
|
pubSubTopic: string;
|
||||||
defaultDecoder: Decoder<Message>;
|
defaultDecoder: Decoder<DecodedMessage>;
|
||||||
public static multicodec: string = constants.RelayCodecs[0];
|
public static multicodec: string = constants.RelayCodecs[0];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -114,7 +115,7 @@ export class WakuRelay extends GossipSub implements Relay {
|
||||||
*
|
*
|
||||||
* @returns Function to delete the observer
|
* @returns Function to delete the observer
|
||||||
*/
|
*/
|
||||||
addObserver<T extends Message>(
|
addObserver<T extends DecodedMessage>(
|
||||||
decoder: Decoder<T>,
|
decoder: Decoder<T>,
|
||||||
callback: Callback<T>
|
callback: Callback<T>
|
||||||
): () => void {
|
): () => void {
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import type { Connection } from "@libp2p/interface-connection";
|
import type { Connection } from "@libp2p/interface-connection";
|
||||||
import type { PeerId } from "@libp2p/interface-peer-id";
|
import type { PeerId } from "@libp2p/interface-peer-id";
|
||||||
import { Peer } from "@libp2p/interface-peer-store";
|
import { Peer } from "@libp2p/interface-peer-store";
|
||||||
import { Decoder, Message } from "@waku/interfaces";
|
import { DecodedMessage, Decoder } from "@waku/interfaces";
|
||||||
import debug from "debug";
|
import debug from "debug";
|
||||||
import all from "it-all";
|
import all from "it-all";
|
||||||
import * as lp from "it-length-prefixed";
|
import * as lp from "it-length-prefixed";
|
||||||
|
@ -106,7 +106,7 @@ export class WakuStore {
|
||||||
* or if an error is encountered when processing the reply,
|
* or if an error is encountered when processing the reply,
|
||||||
* or if two decoders with the same content topic are passed.
|
* or if two decoders with the same content topic are passed.
|
||||||
*/
|
*/
|
||||||
async queryOrderedCallback<T extends Message>(
|
async queryOrderedCallback<T extends DecodedMessage>(
|
||||||
decoders: Decoder<T>[],
|
decoders: Decoder<T>[],
|
||||||
callback: (message: T) => Promise<void | boolean> | boolean | void,
|
callback: (message: T) => Promise<void | boolean> | boolean | void,
|
||||||
options?: QueryOptions
|
options?: QueryOptions
|
||||||
|
@ -155,7 +155,7 @@ export class WakuStore {
|
||||||
* or if an error is encountered when processing the reply,
|
* or if an error is encountered when processing the reply,
|
||||||
* or if two decoders with the same content topic are passed.
|
* or if two decoders with the same content topic are passed.
|
||||||
*/
|
*/
|
||||||
async queryCallbackOnPromise<T extends Message>(
|
async queryCallbackOnPromise<T extends DecodedMessage>(
|
||||||
decoders: Decoder<T>[],
|
decoders: Decoder<T>[],
|
||||||
callback: (
|
callback: (
|
||||||
message: Promise<T | undefined>
|
message: Promise<T | undefined>
|
||||||
|
@ -193,7 +193,7 @@ export class WakuStore {
|
||||||
* or if an error is encountered when processing the reply,
|
* or if an error is encountered when processing the reply,
|
||||||
* or if two decoders with the same content topic are passed.
|
* or if two decoders with the same content topic are passed.
|
||||||
*/
|
*/
|
||||||
async *queryGenerator<T extends Message>(
|
async *queryGenerator<T extends DecodedMessage>(
|
||||||
decoders: Decoder<T>[],
|
decoders: Decoder<T>[],
|
||||||
options?: QueryOptions
|
options?: QueryOptions
|
||||||
): AsyncGenerator<Promise<T | undefined>[]> {
|
): AsyncGenerator<Promise<T | undefined>[]> {
|
||||||
|
@ -266,7 +266,7 @@ export class WakuStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function* paginate<T extends Message>(
|
async function* paginate<T extends DecodedMessage>(
|
||||||
connection: Connection,
|
connection: Connection,
|
||||||
protocol: string,
|
protocol: string,
|
||||||
queryOpts: Params,
|
queryOpts: Params,
|
||||||
|
|
|
@ -28,7 +28,7 @@ export type ProtocolOptions = {
|
||||||
export type Callback<T extends Message> = (msg: T) => void | Promise<void>;
|
export type Callback<T extends Message> = (msg: T) => void | Promise<void>;
|
||||||
|
|
||||||
export interface Filter extends PointToPointProtocol {
|
export interface Filter extends PointToPointProtocol {
|
||||||
subscribe: <T extends Message>(
|
subscribe: <T extends DecodedMessage>(
|
||||||
decoders: Decoder<T>[],
|
decoders: Decoder<T>[],
|
||||||
callback: Callback<T>,
|
callback: Callback<T>,
|
||||||
opts?: ProtocolOptions
|
opts?: ProtocolOptions
|
||||||
|
@ -38,7 +38,7 @@ export interface Filter extends PointToPointProtocol {
|
||||||
export interface LightPush extends PointToPointProtocol {
|
export interface LightPush extends PointToPointProtocol {
|
||||||
push: (
|
push: (
|
||||||
encoder: Encoder,
|
encoder: Encoder,
|
||||||
message: Partial<Message>,
|
message: Message,
|
||||||
opts?: ProtocolOptions
|
opts?: ProtocolOptions
|
||||||
) => Promise<SendResult>;
|
) => Promise<SendResult>;
|
||||||
}
|
}
|
||||||
|
@ -76,27 +76,27 @@ export type StoreQueryOptions = {
|
||||||
} & ProtocolOptions;
|
} & ProtocolOptions;
|
||||||
|
|
||||||
export interface Store extends PointToPointProtocol {
|
export interface Store extends PointToPointProtocol {
|
||||||
queryOrderedCallback: <T extends Message>(
|
queryOrderedCallback: <T extends DecodedMessage>(
|
||||||
decoders: Decoder<T>[],
|
decoders: Decoder<T>[],
|
||||||
callback: (message: T) => Promise<void | boolean> | boolean | void,
|
callback: (message: T) => Promise<void | boolean> | boolean | void,
|
||||||
options?: StoreQueryOptions
|
options?: StoreQueryOptions
|
||||||
) => Promise<void>;
|
) => Promise<void>;
|
||||||
queryCallbackOnPromise: <T extends Message>(
|
queryCallbackOnPromise: <T extends DecodedMessage>(
|
||||||
decoders: Decoder<T>[],
|
decoders: Decoder<T>[],
|
||||||
callback: (
|
callback: (
|
||||||
message: Promise<T | undefined>
|
message: Promise<T | undefined>
|
||||||
) => Promise<void | boolean> | boolean | void,
|
) => Promise<void | boolean> | boolean | void,
|
||||||
options?: StoreQueryOptions
|
options?: StoreQueryOptions
|
||||||
) => Promise<void>;
|
) => Promise<void>;
|
||||||
queryGenerator: <T extends Message>(
|
queryGenerator: <T extends DecodedMessage>(
|
||||||
decoders: Decoder<T>[],
|
decoders: Decoder<T>[],
|
||||||
options?: StoreQueryOptions
|
options?: StoreQueryOptions
|
||||||
) => AsyncGenerator<Promise<T | undefined>[]>;
|
) => AsyncGenerator<Promise<T | undefined>[]>;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface Relay extends GossipSub {
|
export interface Relay extends GossipSub {
|
||||||
send: (encoder: Encoder, message: Partial<Message>) => Promise<SendResult>;
|
send: (encoder: Encoder, message: Message) => Promise<SendResult>;
|
||||||
addObserver: <T extends Message>(
|
addObserver: <T extends DecodedMessage>(
|
||||||
decoder: Decoder<T>,
|
decoder: Decoder<T>,
|
||||||
callback: Callback<T>
|
callback: Callback<T>
|
||||||
) => () => void;
|
) => () => void;
|
||||||
|
@ -155,6 +155,10 @@ export interface RateLimitProof {
|
||||||
rlnIdentifier: Uint8Array;
|
rlnIdentifier: Uint8Array;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interface matching the protobuf library.
|
||||||
|
* Field types matches the protobuf type over the wire
|
||||||
|
*/
|
||||||
export interface ProtoMessage {
|
export interface ProtoMessage {
|
||||||
payload: Uint8Array | undefined;
|
payload: Uint8Array | undefined;
|
||||||
contentTopic: string | undefined;
|
contentTopic: string | undefined;
|
||||||
|
@ -163,20 +167,29 @@ export interface ProtoMessage {
|
||||||
rateLimitProof: RateLimitProof | undefined;
|
rateLimitProof: RateLimitProof | undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interface for messages to encode and send.
|
||||||
|
*/
|
||||||
export interface Message {
|
export interface Message {
|
||||||
|
payload?: Uint8Array;
|
||||||
|
timestamp?: Date;
|
||||||
|
rateLimitProof?: RateLimitProof;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface Encoder {
|
||||||
|
contentTopic: string;
|
||||||
|
toWire: (message: Message) => Promise<Uint8Array | undefined>;
|
||||||
|
toProtoObj: (message: Message) => Promise<ProtoMessage | undefined>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface DecodedMessage {
|
||||||
payload: Uint8Array | undefined;
|
payload: Uint8Array | undefined;
|
||||||
contentTopic: string | undefined;
|
contentTopic: string | undefined;
|
||||||
timestamp: Date | undefined;
|
timestamp: Date | undefined;
|
||||||
rateLimitProof: RateLimitProof | undefined;
|
rateLimitProof: RateLimitProof | undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface Encoder {
|
export interface Decoder<T extends DecodedMessage> {
|
||||||
contentTopic: string;
|
|
||||||
toWire: (message: Partial<Message>) => Promise<Uint8Array | undefined>;
|
|
||||||
toProtoObj: (message: Partial<Message>) => Promise<ProtoMessage | undefined>;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface Decoder<T extends Message> {
|
|
||||||
contentTopic: string;
|
contentTopic: string;
|
||||||
fromWireToProtoObj: (bytes: Uint8Array) => Promise<ProtoMessage | undefined>;
|
fromWireToProtoObj: (bytes: Uint8Array) => Promise<ProtoMessage | undefined>;
|
||||||
fromProtoObj: (proto: ProtoMessage) => Promise<T | undefined>;
|
fromProtoObj: (proto: ProtoMessage) => Promise<T | undefined>;
|
||||||
|
|
|
@ -5,7 +5,13 @@ import {
|
||||||
MessageV0,
|
MessageV0,
|
||||||
proto,
|
proto,
|
||||||
} from "@waku/core/lib/waku_message/version_0";
|
} from "@waku/core/lib/waku_message/version_0";
|
||||||
import type { Decoder, Encoder, Message, ProtoMessage } from "@waku/interfaces";
|
import type {
|
||||||
|
DecodedMessage,
|
||||||
|
Decoder,
|
||||||
|
Encoder,
|
||||||
|
Message,
|
||||||
|
ProtoMessage,
|
||||||
|
} from "@waku/interfaces";
|
||||||
import debug from "debug";
|
import debug from "debug";
|
||||||
|
|
||||||
import { Symmetric } from "./constants.js";
|
import { Symmetric } from "./constants.js";
|
||||||
|
@ -38,7 +44,7 @@ export type Signature = {
|
||||||
publicKey: Uint8Array | undefined;
|
publicKey: Uint8Array | undefined;
|
||||||
};
|
};
|
||||||
|
|
||||||
export class MessageV1 extends MessageV0 implements Message {
|
export class MessageV1 extends MessageV0 implements DecodedMessage {
|
||||||
private readonly _decodedPayload: Uint8Array;
|
private readonly _decodedPayload: Uint8Array;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
|
|
|
@ -46,7 +46,7 @@
|
||||||
"check:prettier": "prettier . --list-different",
|
"check:prettier": "prettier . --list-different",
|
||||||
"check:lint": "eslint src tests --ext .ts",
|
"check:lint": "eslint src tests --ext .ts",
|
||||||
"check:spelling": "cspell \"{README.md,{tests,src}/**/*.ts}\"",
|
"check:spelling": "cspell \"{README.md,{tests,src}/**/*.ts}\"",
|
||||||
"check:tsc": "tsc -p tsconfig.json",
|
"check:tsc": "tsc -p tsconfig.dev.json",
|
||||||
"test": "run-s test:*",
|
"test": "run-s test:*",
|
||||||
"test:node": "TS_NODE_PROJECT=./tsconfig.json mocha",
|
"test:node": "TS_NODE_PROJECT=./tsconfig.json mocha",
|
||||||
"reset-hard": "git clean -dfx -e .idea && git reset --hard && npm i && npm run build"
|
"reset-hard": "git clean -dfx -e .idea && git reset --hard && npm i && npm run build"
|
||||||
|
|
|
@ -2,7 +2,7 @@ import { bytesToUtf8, utf8ToBytes } from "@waku/byte-utils";
|
||||||
import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer";
|
import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer";
|
||||||
import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0";
|
import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0";
|
||||||
import { createFullNode } from "@waku/create";
|
import { createFullNode } from "@waku/create";
|
||||||
import type { Message, WakuFull } from "@waku/interfaces";
|
import type { DecodedMessage, WakuFull } from "@waku/interfaces";
|
||||||
import { Protocols } from "@waku/interfaces";
|
import { Protocols } from "@waku/interfaces";
|
||||||
import { expect } from "chai";
|
import { expect } from "chai";
|
||||||
import debug from "debug";
|
import debug from "debug";
|
||||||
|
@ -47,7 +47,7 @@ describe("Waku Filter", () => {
|
||||||
const messageText = "Filtering works!";
|
const messageText = "Filtering works!";
|
||||||
const message = { payload: utf8ToBytes(messageText) };
|
const message = { payload: utf8ToBytes(messageText) };
|
||||||
|
|
||||||
const callback = (msg: Message): void => {
|
const callback = (msg: DecodedMessage): void => {
|
||||||
log("Got a message");
|
log("Got a message");
|
||||||
messageCount++;
|
messageCount++;
|
||||||
expect(msg.contentTopic).to.eq(TestContentTopic);
|
expect(msg.contentTopic).to.eq(TestContentTopic);
|
||||||
|
@ -71,7 +71,7 @@ describe("Waku Filter", () => {
|
||||||
this.timeout(10000);
|
this.timeout(10000);
|
||||||
|
|
||||||
let messageCount = 0;
|
let messageCount = 0;
|
||||||
const callback = (msg: Message): void => {
|
const callback = (msg: DecodedMessage): void => {
|
||||||
messageCount++;
|
messageCount++;
|
||||||
expect(msg.contentTopic).to.eq(TestContentTopic);
|
expect(msg.contentTopic).to.eq(TestContentTopic);
|
||||||
};
|
};
|
||||||
|
|
|
@ -8,7 +8,7 @@ import {
|
||||||
MessageV0,
|
MessageV0,
|
||||||
} from "@waku/core/lib/waku_message/version_0";
|
} from "@waku/core/lib/waku_message/version_0";
|
||||||
import { createPrivacyNode } from "@waku/create";
|
import { createPrivacyNode } from "@waku/create";
|
||||||
import type { Message, WakuPrivacy } from "@waku/interfaces";
|
import type { DecodedMessage, WakuPrivacy } from "@waku/interfaces";
|
||||||
import { Protocols } from "@waku/interfaces";
|
import { Protocols } from "@waku/interfaces";
|
||||||
import {
|
import {
|
||||||
AsymDecoder,
|
AsymDecoder,
|
||||||
|
@ -118,9 +118,11 @@ describe("Waku Relay [node only]", () => {
|
||||||
timestamp: messageTimestamp,
|
timestamp: messageTimestamp,
|
||||||
};
|
};
|
||||||
|
|
||||||
const receivedMsgPromise: Promise<Message> = new Promise((resolve) => {
|
const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
|
||||||
waku2.relay.addObserver(TestDecoder, resolve);
|
(resolve) => {
|
||||||
});
|
waku2.relay.addObserver(TestDecoder, resolve);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
await waku1.relay.send(TestEncoder, message);
|
await waku1.relay.send(TestEncoder, message);
|
||||||
|
|
||||||
|
@ -148,12 +150,12 @@ describe("Waku Relay [node only]", () => {
|
||||||
const fooDecoder = new DecoderV0(fooContentTopic);
|
const fooDecoder = new DecoderV0(fooContentTopic);
|
||||||
const barDecoder = new DecoderV0(barContentTopic);
|
const barDecoder = new DecoderV0(barContentTopic);
|
||||||
|
|
||||||
const fooMessages: Message[] = [];
|
const fooMessages: DecodedMessage[] = [];
|
||||||
waku2.relay.addObserver(fooDecoder, (msg) => {
|
waku2.relay.addObserver(fooDecoder, (msg) => {
|
||||||
fooMessages.push(msg);
|
fooMessages.push(msg);
|
||||||
});
|
});
|
||||||
|
|
||||||
const barMessages: Message[] = [];
|
const barMessages: DecodedMessage[] = [];
|
||||||
waku2.relay.addObserver(barDecoder, (msg) => {
|
waku2.relay.addObserver(barDecoder, (msg) => {
|
||||||
barMessages.push(msg);
|
barMessages.push(msg);
|
||||||
});
|
});
|
||||||
|
@ -197,7 +199,7 @@ describe("Waku Relay [node only]", () => {
|
||||||
const asymDecoder = new AsymDecoder(asymTopic, privateKey);
|
const asymDecoder = new AsymDecoder(asymTopic, privateKey);
|
||||||
const symDecoder = new SymDecoder(symTopic, symKey);
|
const symDecoder = new SymDecoder(symTopic, symKey);
|
||||||
|
|
||||||
const msgs: Message[] = [];
|
const msgs: DecodedMessage[] = [];
|
||||||
waku2.relay.addObserver(asymDecoder, (wakuMsg) => {
|
waku2.relay.addObserver(asymDecoder, (wakuMsg) => {
|
||||||
msgs.push(wakuMsg);
|
msgs.push(wakuMsg);
|
||||||
});
|
});
|
||||||
|
@ -228,7 +230,7 @@ describe("Waku Relay [node only]", () => {
|
||||||
const contentTopic = "added-then-deleted-observer";
|
const contentTopic = "added-then-deleted-observer";
|
||||||
|
|
||||||
// The promise **fails** if we receive a message on this observer.
|
// The promise **fails** if we receive a message on this observer.
|
||||||
const receivedMsgPromise: Promise<Message> = new Promise(
|
const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
|
||||||
(resolve, reject) => {
|
(resolve, reject) => {
|
||||||
const deleteObserver = waku2.relay.addObserver(
|
const deleteObserver = waku2.relay.addObserver(
|
||||||
new DecoderV0(contentTopic),
|
new DecoderV0(contentTopic),
|
||||||
|
@ -304,7 +306,7 @@ describe("Waku Relay [node only]", () => {
|
||||||
|
|
||||||
const messageText = "Communicating using a custom pubsub topic";
|
const messageText = "Communicating using a custom pubsub topic";
|
||||||
|
|
||||||
const waku2ReceivedMsgPromise: Promise<Message> = new Promise(
|
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
|
||||||
(resolve) => {
|
(resolve) => {
|
||||||
waku2.relay.addObserver(TestDecoder, resolve);
|
waku2.relay.addObserver(TestDecoder, resolve);
|
||||||
}
|
}
|
||||||
|
@ -312,7 +314,7 @@ describe("Waku Relay [node only]", () => {
|
||||||
|
|
||||||
// The promise **fails** if we receive a message on the default
|
// The promise **fails** if we receive a message on the default
|
||||||
// pubsub topic.
|
// pubsub topic.
|
||||||
const waku3NoMsgPromise: Promise<Message> = new Promise(
|
const waku3NoMsgPromise: Promise<DecodedMessage> = new Promise(
|
||||||
(resolve, reject) => {
|
(resolve, reject) => {
|
||||||
waku3.relay.addObserver(TestDecoder, reject);
|
waku3.relay.addObserver(TestDecoder, reject);
|
||||||
setTimeout(resolve, 1000);
|
setTimeout(resolve, 1000);
|
||||||
|
@ -466,7 +468,7 @@ describe("Waku Relay [node only]", () => {
|
||||||
const msgStr = "Hello there!";
|
const msgStr = "Hello there!";
|
||||||
const message = { payload: utf8ToBytes(msgStr) };
|
const message = { payload: utf8ToBytes(msgStr) };
|
||||||
|
|
||||||
const waku2ReceivedMsgPromise: Promise<Message> = new Promise(
|
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
|
||||||
(resolve) => {
|
(resolve) => {
|
||||||
waku2.relay.addObserver(TestDecoder, resolve);
|
waku2.relay.addObserver(TestDecoder, resolve);
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,12 @@ import { bytesToUtf8, utf8ToBytes } from "@waku/byte-utils";
|
||||||
import { PeerDiscoveryStaticPeers } from "@waku/core/lib/peer_discovery_static_list";
|
import { PeerDiscoveryStaticPeers } from "@waku/core/lib/peer_discovery_static_list";
|
||||||
import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer";
|
import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer";
|
||||||
import { createLightNode, createPrivacyNode } from "@waku/create";
|
import { createLightNode, createPrivacyNode } from "@waku/create";
|
||||||
import type { Message, Waku, WakuLight, WakuPrivacy } from "@waku/interfaces";
|
import type {
|
||||||
|
DecodedMessage,
|
||||||
|
Waku,
|
||||||
|
WakuLight,
|
||||||
|
WakuPrivacy,
|
||||||
|
} from "@waku/interfaces";
|
||||||
import { Protocols } from "@waku/interfaces";
|
import { Protocols } from "@waku/interfaces";
|
||||||
import {
|
import {
|
||||||
generateSymmetricKey,
|
generateSymmetricKey,
|
||||||
|
@ -175,9 +180,11 @@ describe("Decryption Keys", () => {
|
||||||
timestamp: messageTimestamp,
|
timestamp: messageTimestamp,
|
||||||
};
|
};
|
||||||
|
|
||||||
const receivedMsgPromise: Promise<Message> = new Promise((resolve) => {
|
const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
|
||||||
waku2.relay.addObserver(decoder, resolve);
|
(resolve) => {
|
||||||
});
|
waku2.relay.addObserver(decoder, resolve);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
await waku1.relay.send(encoder, message);
|
await waku1.relay.send(encoder, message);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue