feat: enable custom Message type on Decoder

This enables the type passed on the callback functions to match the
decoder's so the consumer can access implementation specific fields.
This commit is contained in:
fryorcraken.eth 2022-09-19 16:06:03 +10:00
parent 9ff602da7e
commit 52005f8963
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
7 changed files with 36 additions and 26 deletions

View File

@ -70,8 +70,8 @@ export interface Encoder {
encodeProto: (message: Message) => Promise<ProtoMessage | undefined>;
}
export interface Decoder {
export interface Decoder<T extends Message> {
contentTopic: string;
decodeProto: (bytes: Uint8Array) => Promise<ProtoMessage | undefined>;
decode: (proto: ProtoMessage) => Promise<Message | undefined>;
decode: (proto: ProtoMessage) => Promise<T | undefined>;
}

View File

@ -63,9 +63,9 @@ export type UnsubscribeFunction = () => Promise<void>;
export class WakuFilter {
pubSubTopic: string;
private subscriptions: Map<string, FilterCallback>;
public decoders: Map<
private decoders: Map<
string, // content topic
Set<Decoder>
Set<Decoder<any>>
>;
constructor(public libp2p: Libp2p, options?: CreateOptions) {
@ -83,8 +83,8 @@ export class WakuFilter {
* @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to.
* @returns Unsubscribe function that can be used to end the subscription.
*/
async subscribe(
decoders: Decoder[],
async subscribe<T extends Message>(
decoders: Decoder<T>[],
callback: FilterCallback,
opts?: FilterSubscriptionOpts
): Promise<UnsubscribeFunction> {
@ -217,7 +217,9 @@ export class WakuFilter {
this.subscriptions.delete(requestId);
}
private addDecoders(decoders: Map<string, Array<Decoder>>): void {
private addDecoders<T extends Message>(
decoders: Map<string, Array<Decoder<T>>>
): void {
decoders.forEach((decoders, contentTopic) => {
const currDecs = this.decoders.get(contentTopic);
if (!currDecs) {
@ -228,7 +230,9 @@ export class WakuFilter {
});
}
private deleteDecoders(decoders: Map<string, Array<Decoder>>): void {
private deleteDecoders<T extends Message>(
decoders: Map<string, Array<Decoder<T>>>
): void {
decoders.forEach((decoders, contentTopic) => {
const currDecs = this.decoders.get(contentTopic);
if (currDecs) {

View File

@ -75,7 +75,7 @@ export class EncoderV0 implements Encoder {
}
}
export class DecoderV0 implements Decoder {
export class DecoderV0 implements Decoder<MessageV0> {
constructor(public contentTopic: string) {}
decodeProto(bytes: Uint8Array): Promise<ProtoMessage | undefined> {
@ -84,7 +84,7 @@ export class DecoderV0 implements Decoder {
return Promise.resolve(protoMessage);
}
async decode(proto: ProtoMessage): Promise<Message | undefined> {
async decode(proto: ProtoMessage): Promise<MessageV0 | undefined> {
// https://github.com/status-im/js-waku/issues/921
if (proto.version === undefined) {
proto.version = 0;

View File

@ -110,7 +110,7 @@ export class SymEncoder implements Encoder {
}
}
export class AsymDecoder extends DecoderV0 implements Decoder {
export class AsymDecoder extends DecoderV0 implements Decoder<MessageV1> {
constructor(contentTopic: string, private privateKey: Uint8Array) {
super(contentTopic);
}
@ -166,7 +166,7 @@ export class AsymDecoder extends DecoderV0 implements Decoder {
}
}
export class SymDecoder extends DecoderV0 implements Decoder {
export class SymDecoder extends DecoderV0 implements Decoder<MessageV1> {
constructor(contentTopic: string, private symKey: Uint8Array) {
super(contentTopic);
}

View File

@ -381,9 +381,7 @@ describe("Waku Relay [node only]", () => {
const messageText = "Here is another message.";
const receivedMsgPromise: Promise<MessageV0> = new Promise((resolve) => {
waku.relay.addObserver(TestDecoder, (msg) =>
resolve(msg as unknown as MessageV0)
);
waku.relay.addObserver<MessageV0>(TestDecoder, (msg) => resolve(msg));
});
await nwaku.sendMessage(

View File

@ -20,7 +20,12 @@ import * as constants from "./constants";
const log = debug("waku:relay");
export type Callback = (msg: Message) => void;
export type Callback<T extends Message> = (msg: T) => void;
export type Observer<T extends Message> = {
decoder: Decoder<T>;
callback: Callback<T>;
};
export type CreateOptions = {
/**
@ -53,7 +58,7 @@ export class WakuRelay extends GossipSub {
* observers called when receiving new message.
* Observers under key `""` are always called.
*/
public observers: Map<string, Set<{ decoder: Decoder; callback: Callback }>>;
public observers: Map<string, Set<Observer<any>>>;
constructor(options?: Partial<CreateOptions>) {
options = Object.assign(options ?? {}, {
@ -101,7 +106,10 @@ export class WakuRelay extends GossipSub {
*
* @returns Function to delete the observer
*/
addObserver(decoder: Decoder, callback: Callback): () => void {
addObserver<T extends Message>(
decoder: Decoder<T>,
callback: Callback<T>
): () => void {
const observer = {
decoder,
callback,

View File

@ -103,8 +103,8 @@ export class WakuStore {
* @throws If not able to reach a Waku Store peer to query
* or if an error is encountered when processing the reply.
*/
async queryOrderedCallback(
decoder: Decoder,
async queryOrderedCallback<T extends Message>(
decoder: Decoder<T>,
callback: (message: Message) => Promise<void | boolean> | boolean | void,
options?: QueryOptions
): Promise<void> {
@ -151,8 +151,8 @@ export class WakuStore {
* @throws If not able to reach a Waku Store peer to query
* or if an error is encountered when processing the reply.
*/
async queryCallbackOnPromise(
decoder: Decoder,
async queryCallbackOnPromise<T extends Message>(
decoder: Decoder<T>,
callback: (
message: Promise<Message | undefined>
) => Promise<void | boolean> | boolean | void,
@ -188,8 +188,8 @@ export class WakuStore {
* @throws If not able to reach a Waku Store peer to query
* or if an error is encountered when processing the reply.
*/
async *queryGenerator(
decoder: Decoder,
async *queryGenerator<T extends Message>(
decoder: Decoder<T>,
options?: QueryOptions
): AsyncGenerator<Promise<Message | undefined>[]> {
let startTime, endTime;
@ -256,11 +256,11 @@ export class WakuStore {
}
}
async function* paginate(
async function* paginate<T extends Message>(
connection: Connection,
protocol: string,
queryOpts: Params,
decoder: Decoder
decoder: Decoder<T>
): AsyncGenerator<Promise<Message | undefined>[]> {
let cursor = undefined;
while (true) {