mirror of
https://github.com/status-im/js-waku.git
synced 2025-02-23 18:38:11 +00:00
chore: remove usage of any
This commit is contained in:
parent
6c37ee5f19
commit
60e866f00d
@ -29,6 +29,12 @@ export const FilterCodec = "/vac/waku/filter/2.0.0-beta1";
|
|||||||
const log = debug("waku:filter");
|
const log = debug("waku:filter");
|
||||||
|
|
||||||
export type UnsubscribeFunction = () => Promise<void>;
|
export type UnsubscribeFunction = () => Promise<void>;
|
||||||
|
export type RequestID = string;
|
||||||
|
|
||||||
|
type Subscription<T extends IDecodedMessage> = {
|
||||||
|
decoders: IDecoder<T>[];
|
||||||
|
callback: Callback<T>;
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implements client side of the [Waku v2 Filter protocol](https://rfc.vac.dev/spec/12/).
|
* Implements client side of the [Waku v2 Filter protocol](https://rfc.vac.dev/spec/12/).
|
||||||
@ -39,17 +45,12 @@ export type UnsubscribeFunction = () => Promise<void>;
|
|||||||
*/
|
*/
|
||||||
class Filter extends BaseProtocol implements IFilter {
|
class Filter extends BaseProtocol implements IFilter {
|
||||||
options: ProtocolCreateOptions;
|
options: ProtocolCreateOptions;
|
||||||
private subscriptions: Map<string, Callback<any>>;
|
private subscriptions: Map<RequestID, unknown>;
|
||||||
private decoders: Map<
|
|
||||||
string, // content topic
|
|
||||||
Set<IDecoder<any>>
|
|
||||||
>;
|
|
||||||
|
|
||||||
constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) {
|
constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) {
|
||||||
super(FilterCodec, libp2p);
|
super(FilterCodec, libp2p);
|
||||||
this.options = options ?? {};
|
this.options = options ?? {};
|
||||||
this.subscriptions = new Map();
|
this.subscriptions = new Map();
|
||||||
this.decoders = new Map();
|
|
||||||
this.libp2p
|
this.libp2p
|
||||||
.handle(this.multicodec, this.onRequest.bind(this))
|
.handle(this.multicodec, this.onRequest.bind(this))
|
||||||
.catch((e) => log("Failed to register filter protocol", e));
|
.catch((e) => log("Failed to register filter protocol", e));
|
||||||
@ -68,8 +69,7 @@ class Filter extends BaseProtocol implements IFilter {
|
|||||||
): Promise<UnsubscribeFunction> {
|
): Promise<UnsubscribeFunction> {
|
||||||
const { pubSubTopic = DefaultPubSubTopic } = this.options;
|
const { pubSubTopic = DefaultPubSubTopic } = this.options;
|
||||||
|
|
||||||
const groupedDecoders = groupByContentTopic(decoders);
|
const contentTopics = Array.from(groupByContentTopic(decoders).keys());
|
||||||
const contentTopics = Array.from(groupedDecoders.keys());
|
|
||||||
|
|
||||||
const contentFilters = contentTopics.map((contentTopic) => ({
|
const contentFilters = contentTopics.map((contentTopic) => ({
|
||||||
contentTopic,
|
contentTopic,
|
||||||
@ -108,13 +108,11 @@ class Filter extends BaseProtocol implements IFilter {
|
|||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.addDecoders(groupedDecoders);
|
this.subscriptions.set(requestId, { callback, decoders });
|
||||||
this.addCallback(requestId, callback);
|
|
||||||
|
|
||||||
return async () => {
|
return async () => {
|
||||||
await this.unsubscribe(pubSubTopic, contentFilters, requestId, peer);
|
await this.unsubscribe(pubSubTopic, contentFilters, requestId, peer);
|
||||||
this.deleteDecoders(groupedDecoders);
|
this.subscriptions.delete(requestId);
|
||||||
this.deleteCallback(requestId);
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -141,13 +139,21 @@ class Filter extends BaseProtocol implements IFilter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async pushMessages(
|
private async pushMessages<T extends IDecodedMessage>(
|
||||||
requestId: string,
|
requestId: string,
|
||||||
messages: WakuMessageProto[]
|
messages: WakuMessageProto[]
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const callback = this.subscriptions.get(requestId);
|
const subscription = this.subscriptions.get(requestId) as
|
||||||
if (!callback) {
|
| Subscription<T>
|
||||||
log(`No callback registered for request ID ${requestId}`);
|
| undefined;
|
||||||
|
if (!subscription) {
|
||||||
|
log(`No subscription locally registered for request ID ${requestId}`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const { decoders, callback } = subscription;
|
||||||
|
|
||||||
|
if (!decoders || !decoders.length) {
|
||||||
|
log(`No decoder registered for request ID ${requestId}`);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -158,17 +164,11 @@ class Filter extends BaseProtocol implements IFilter {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const decoders = this.decoders.get(contentTopic);
|
|
||||||
if (!decoders) {
|
|
||||||
log("No decoder for", contentTopic);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let didDecodeMsg = false;
|
let didDecodeMsg = false;
|
||||||
// We don't want to wait for decoding failure, just attempt to decode
|
// We don't want to wait for decoding failure, just attempt to decode
|
||||||
// all messages and do the call back on the one that works
|
// all messages and do the call back on the one that works
|
||||||
// noinspection ES6MissingAwait
|
// noinspection ES6MissingAwait
|
||||||
decoders.forEach(async (dec) => {
|
decoders.forEach(async (dec: IDecoder<T>) => {
|
||||||
if (didDecodeMsg) return;
|
if (didDecodeMsg) return;
|
||||||
const decoded = await dec.fromProtoObj(toProtoMessage(protoMessage));
|
const decoded = await dec.fromProtoObj(toProtoMessage(protoMessage));
|
||||||
if (!decoded) {
|
if (!decoded) {
|
||||||
@ -183,40 +183,6 @@ class Filter extends BaseProtocol implements IFilter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private addCallback(requestId: string, callback: Callback<any>): void {
|
|
||||||
this.subscriptions.set(requestId, callback);
|
|
||||||
}
|
|
||||||
|
|
||||||
private deleteCallback(requestId: string): void {
|
|
||||||
this.subscriptions.delete(requestId);
|
|
||||||
}
|
|
||||||
|
|
||||||
private addDecoders<T extends IDecodedMessage>(
|
|
||||||
decoders: Map<string, Array<IDecoder<T>>>
|
|
||||||
): void {
|
|
||||||
decoders.forEach((decoders, contentTopic) => {
|
|
||||||
const currDecs = this.decoders.get(contentTopic);
|
|
||||||
if (!currDecs) {
|
|
||||||
this.decoders.set(contentTopic, new Set(decoders));
|
|
||||||
} else {
|
|
||||||
this.decoders.set(contentTopic, new Set([...currDecs, ...decoders]));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private deleteDecoders<T extends IDecodedMessage>(
|
|
||||||
decoders: Map<string, Array<IDecoder<T>>>
|
|
||||||
): void {
|
|
||||||
decoders.forEach((decoders, contentTopic) => {
|
|
||||||
const currDecs = this.decoders.get(contentTopic);
|
|
||||||
if (currDecs) {
|
|
||||||
decoders.forEach((dec) => {
|
|
||||||
currDecs.delete(dec);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private async unsubscribe(
|
private async unsubscribe(
|
||||||
topic: string,
|
topic: string,
|
||||||
contentFilters: ContentFilter[],
|
contentFilters: ContentFilter[],
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
export function groupByContentTopic<T extends { contentTopic: string }>(
|
export function groupByContentTopic<T extends { contentTopic: string }>(
|
||||||
values: T[]
|
values: readonly T[]
|
||||||
): Map<string, Array<T>> {
|
): Map<string, Array<T>> {
|
||||||
const groupedDecoders = new Map();
|
const groupedDecoders = new Map();
|
||||||
values.forEach((value) => {
|
values.forEach((value) => {
|
||||||
|
@ -32,6 +32,7 @@ export type Observer<T extends IDecodedMessage> = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts;
|
export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts;
|
||||||
|
export type ContentTopic = string;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implements the [Waku v2 Relay protocol](https://rfc.vac.dev/spec/11/).
|
* Implements the [Waku v2 Relay protocol](https://rfc.vac.dev/spec/11/).
|
||||||
@ -40,7 +41,7 @@ export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts;
|
|||||||
* @implements {require('libp2p-interfaces/src/pubsub')}
|
* @implements {require('libp2p-interfaces/src/pubsub')}
|
||||||
*/
|
*/
|
||||||
class Relay extends GossipSub implements IRelay {
|
class Relay extends GossipSub implements IRelay {
|
||||||
private pubSubTopic: string;
|
private readonly pubSubTopic: string;
|
||||||
defaultDecoder: IDecoder<IDecodedMessage>;
|
defaultDecoder: IDecoder<IDecodedMessage>;
|
||||||
public static multicodec: string = constants.RelayCodecs[0];
|
public static multicodec: string = constants.RelayCodecs[0];
|
||||||
|
|
||||||
@ -48,7 +49,7 @@ class Relay extends GossipSub implements IRelay {
|
|||||||
* observers called when receiving new message.
|
* observers called when receiving new message.
|
||||||
* Observers under key `""` are always called.
|
* Observers under key `""` are always called.
|
||||||
*/
|
*/
|
||||||
public observers: Map<string, Set<Observer<any>>>;
|
public observers: Map<ContentTopic, Set<unknown>>;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
components: GossipSubComponents,
|
components: GossipSubComponents,
|
||||||
@ -119,6 +120,38 @@ class Relay extends GossipSub implements IRelay {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async processIncomingMessage<T extends IDecodedMessage>(
|
||||||
|
bytes: Uint8Array
|
||||||
|
): Promise<void> {
|
||||||
|
const topicOnlyMsg = await this.defaultDecoder.fromWireToProtoObj(bytes);
|
||||||
|
if (!topicOnlyMsg || !topicOnlyMsg.contentTopic) {
|
||||||
|
log("Message does not have a content topic, skipping");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const observers = this.observers.get(topicOnlyMsg.contentTopic) as Set<
|
||||||
|
Observer<T>
|
||||||
|
>;
|
||||||
|
if (!observers) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
await Promise.all(
|
||||||
|
Array.from(observers).map(async ({ decoder, callback }) => {
|
||||||
|
const protoMsg = await decoder.fromWireToProtoObj(bytes);
|
||||||
|
if (!protoMsg) {
|
||||||
|
log("Internal error: message previously decoded failed on 2nd pass.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const msg = await decoder.fromProtoObj(protoMsg);
|
||||||
|
if (msg) {
|
||||||
|
callback(msg);
|
||||||
|
} else {
|
||||||
|
log("Failed to decode messages on", topicOnlyMsg.contentTopic);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Subscribe to a pubsub topic and start emitting Waku messages to observers.
|
* Subscribe to a pubsub topic and start emitting Waku messages to observers.
|
||||||
*
|
*
|
||||||
@ -131,36 +164,8 @@ class Relay extends GossipSub implements IRelay {
|
|||||||
if (event.detail.msg.topic !== pubSubTopic) return;
|
if (event.detail.msg.topic !== pubSubTopic) return;
|
||||||
log(`Message received on ${pubSubTopic}`);
|
log(`Message received on ${pubSubTopic}`);
|
||||||
|
|
||||||
const topicOnlyMsg = await this.defaultDecoder.fromWireToProtoObj(
|
this.processIncomingMessage(event.detail.msg.data).catch((e) =>
|
||||||
event.detail.msg.data
|
log("Failed to process incoming message", e)
|
||||||
);
|
|
||||||
if (!topicOnlyMsg || !topicOnlyMsg.contentTopic) {
|
|
||||||
log("Message does not have a content topic, skipping");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const observers = this.observers.get(topicOnlyMsg.contentTopic);
|
|
||||||
if (!observers) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
await Promise.all(
|
|
||||||
Array.from(observers).map(async ({ decoder, callback }) => {
|
|
||||||
const protoMsg = await decoder.fromWireToProtoObj(
|
|
||||||
event.detail.msg.data
|
|
||||||
);
|
|
||||||
if (!protoMsg) {
|
|
||||||
log(
|
|
||||||
"Internal error: message previously decoded failed on 2nd pass."
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
const msg = await decoder.fromProtoObj(protoMsg);
|
|
||||||
if (msg) {
|
|
||||||
callback(msg);
|
|
||||||
} else {
|
|
||||||
log("Failed to decode messages on", topicOnlyMsg.contentTopic);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user