Merge pull request #1203 from waku-org/chore/remove-any

This commit is contained in:
fryorcraken.eth 2023-02-28 12:08:48 +11:00 committed by GitHub
commit 3c7c5d290c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 64 additions and 94 deletions

View File

@ -6,7 +6,6 @@ import type {
IDecodedMessage,
IDecoder,
IFilter,
IMessage,
ProtocolCreateOptions,
ProtocolOptions,
} from "@waku/interfaces";
@ -30,6 +29,12 @@ export const FilterCodec = "/vac/waku/filter/2.0.0-beta1";
const log = debug("waku:filter");
export type UnsubscribeFunction = () => Promise<void>;
export type RequestID = string;
type Subscription<T extends IDecodedMessage> = {
decoders: IDecoder<T>[];
callback: Callback<T>;
};
/**
* Implements client side of the [Waku v2 Filter protocol](https://rfc.vac.dev/spec/12/).
@ -40,17 +45,12 @@ export type UnsubscribeFunction = () => Promise<void>;
*/
class Filter extends BaseProtocol implements IFilter {
options: ProtocolCreateOptions;
private subscriptions: Map<string, Callback<any>>;
private decoders: Map<
string, // content topic
Set<IDecoder<any>>
>;
private subscriptions: Map<RequestID, unknown>;
constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodec, libp2p);
this.options = options ?? {};
this.subscriptions = new Map();
this.decoders = new Map();
this.libp2p
.handle(this.multicodec, this.onRequest.bind(this))
.catch((e) => log("Failed to register filter protocol", e));
@ -69,8 +69,7 @@ class Filter extends BaseProtocol implements IFilter {
): Promise<UnsubscribeFunction> {
const { pubSubTopic = DefaultPubSubTopic } = this.options;
const groupedDecoders = groupByContentTopic(decoders);
const contentTopics = Array.from(groupedDecoders.keys());
const contentTopics = Array.from(groupByContentTopic(decoders).keys());
const contentFilters = contentTopics.map((contentTopic) => ({
contentTopic,
@ -109,13 +108,11 @@ class Filter extends BaseProtocol implements IFilter {
throw e;
}
this.addDecoders(groupedDecoders);
this.addCallback(requestId, callback);
this.subscriptions.set(requestId, { callback, decoders });
return async () => {
await this.unsubscribe(pubSubTopic, contentFilters, requestId, peer);
this.deleteDecoders(groupedDecoders);
this.deleteCallback(requestId);
this.subscriptions.delete(requestId);
};
}
@ -142,13 +139,21 @@ class Filter extends BaseProtocol implements IFilter {
}
}
private async pushMessages(
private async pushMessages<T extends IDecodedMessage>(
requestId: string,
messages: WakuMessageProto[]
): Promise<void> {
const callback = this.subscriptions.get(requestId);
if (!callback) {
log(`No callback registered for request ID ${requestId}`);
const subscription = this.subscriptions.get(requestId) as
| Subscription<T>
| undefined;
if (!subscription) {
log(`No subscription locally registered for request ID ${requestId}`);
return;
}
const { decoders, callback } = subscription;
if (!decoders || !decoders.length) {
log(`No decoder registered for request ID ${requestId}`);
return;
}
@ -159,18 +164,12 @@ class Filter extends BaseProtocol implements IFilter {
return;
}
const decoders = this.decoders.get(contentTopic);
if (!decoders) {
log("No decoder for", contentTopic);
return;
}
let msg: IMessage | undefined;
let didDecodeMsg = false;
// We don't want to wait for decoding failure, just attempt to decode
// all messages and do the call back on the one that works
// noinspection ES6MissingAwait
decoders.forEach(async (dec) => {
if (msg) return;
decoders.forEach(async (dec: IDecoder<T>) => {
if (didDecodeMsg) return;
const decoded = await dec.fromProtoObj(toProtoMessage(protoMessage));
if (!decoded) {
log("Not able to decode message");
@ -178,46 +177,12 @@ class Filter extends BaseProtocol implements IFilter {
}
// This is just to prevent more decoding attempt
// TODO: Could be better if we were to abort promises
msg = decoded;
didDecodeMsg = Boolean(decoded);
await callback(decoded);
});
}
}
private addCallback(requestId: string, callback: Callback<any>): void {
this.subscriptions.set(requestId, callback);
}
private deleteCallback(requestId: string): void {
this.subscriptions.delete(requestId);
}
private addDecoders<T extends IDecodedMessage>(
decoders: Map<string, Array<IDecoder<T>>>
): void {
decoders.forEach((decoders, contentTopic) => {
const currDecs = this.decoders.get(contentTopic);
if (!currDecs) {
this.decoders.set(contentTopic, new Set(decoders));
} else {
this.decoders.set(contentTopic, new Set([...currDecs, ...decoders]));
}
});
}
private deleteDecoders<T extends IDecodedMessage>(
decoders: Map<string, Array<IDecoder<T>>>
): void {
decoders.forEach((decoders, contentTopic) => {
const currDecs = this.decoders.get(contentTopic);
if (currDecs) {
decoders.forEach((dec) => {
currDecs.delete(dec);
});
}
});
}
private async unsubscribe(
topic: string,
contentFilters: ContentFilter[],

View File

@ -1,5 +1,5 @@
export function groupByContentTopic<T extends { contentTopic: string }>(
values: T[]
values: readonly T[]
): Map<string, Array<T>> {
const groupedDecoders = new Map();
values.forEach((value) => {

View File

@ -32,6 +32,7 @@ export type Observer<T extends IDecodedMessage> = {
};
export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts;
export type ContentTopic = string;
/**
* Implements the [Waku v2 Relay protocol](https://rfc.vac.dev/spec/11/).
@ -40,7 +41,7 @@ export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts;
* @implements {require('libp2p-interfaces/src/pubsub')}
*/
class Relay extends GossipSub implements IRelay {
private pubSubTopic: string;
private readonly pubSubTopic: string;
defaultDecoder: IDecoder<IDecodedMessage>;
public static multicodec: string = constants.RelayCodecs[0];
@ -48,7 +49,7 @@ class Relay extends GossipSub implements IRelay {
* observers called when receiving new message.
* Observers under key `""` are always called.
*/
public observers: Map<string, Set<Observer<any>>>;
public observers: Map<ContentTopic, Set<unknown>>;
constructor(
components: GossipSubComponents,
@ -119,6 +120,38 @@ class Relay extends GossipSub implements IRelay {
};
}
private async processIncomingMessage<T extends IDecodedMessage>(
bytes: Uint8Array
): Promise<void> {
const topicOnlyMsg = await this.defaultDecoder.fromWireToProtoObj(bytes);
if (!topicOnlyMsg || !topicOnlyMsg.contentTopic) {
log("Message does not have a content topic, skipping");
return;
}
const observers = this.observers.get(topicOnlyMsg.contentTopic) as Set<
Observer<T>
>;
if (!observers) {
return;
}
await Promise.all(
Array.from(observers).map(async ({ decoder, callback }) => {
const protoMsg = await decoder.fromWireToProtoObj(bytes);
if (!protoMsg) {
log("Internal error: message previously decoded failed on 2nd pass.");
return;
}
const msg = await decoder.fromProtoObj(protoMsg);
if (msg) {
callback(msg);
} else {
log("Failed to decode messages on", topicOnlyMsg.contentTopic);
}
})
);
}
/**
* Subscribe to a pubsub topic and start emitting Waku messages to observers.
*
@ -131,36 +164,8 @@ class Relay extends GossipSub implements IRelay {
if (event.detail.msg.topic !== pubSubTopic) return;
log(`Message received on ${pubSubTopic}`);
const topicOnlyMsg = await this.defaultDecoder.fromWireToProtoObj(
event.detail.msg.data
);
if (!topicOnlyMsg || !topicOnlyMsg.contentTopic) {
log("Message does not have a content topic, skipping");
return;
}
const observers = this.observers.get(topicOnlyMsg.contentTopic);
if (!observers) {
return;
}
await Promise.all(
Array.from(observers).map(async ({ decoder, callback }) => {
const protoMsg = await decoder.fromWireToProtoObj(
event.detail.msg.data
);
if (!protoMsg) {
log(
"Internal error: message previously decoded failed on 2nd pass."
);
return;
}
const msg = await decoder.fromProtoObj(protoMsg);
if (msg) {
callback(msg);
} else {
log("Failed to decode messages on", topicOnlyMsg.contentTopic);
}
})
this.processIncomingMessage(event.detail.msg.data).catch((e) =>
log("Failed to process incoming message", e)
);
}
);