fix: filter v2

This commit is contained in:
danisharora099 2023-05-03 13:46:46 +05:30
parent 040a63095a
commit ba7bc95b81
No known key found for this signature in database
GPG Key ID: FBD2BF500037F135
1 changed files with 20 additions and 12 deletions

View File

@ -32,6 +32,8 @@ import {
const log = debug("waku:filter:v2"); const log = debug("waku:filter:v2");
type PeerIdStr = string;
type Subscription<T extends IDecodedMessage> = { type Subscription<T extends IDecodedMessage> = {
decoders: IDecoder<T>[]; decoders: IDecoder<T>[];
callback: Callback<T>; callback: Callback<T>;
@ -52,7 +54,7 @@ const FilterV2Codecs = {
*/ */
class FilterV2 extends BaseProtocol implements IFilterV2 { class FilterV2 extends BaseProtocol implements IFilterV2 {
options: ProtocolCreateOptions; options: ProtocolCreateOptions;
private subscriptions: Map<PeerId, unknown[]>; private subscriptions: Map<PeerIdStr, unknown[]>;
public pubSubTopic: string; public pubSubTopic: string;
constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) {
@ -134,7 +136,12 @@ class FilterV2 extends BaseProtocol implements IFilterV2 {
decoders: decodersArray, decoders: decodersArray,
pubSubTopic: this.pubSubTopic, pubSubTopic: this.pubSubTopic,
}; };
this.subscriptions.get(peer.id)?.push(subscription) ?? [subscription]; this.subscriptions.set(
peer.id.toString(),
[...(this.subscriptions.get(peer.id.toString()) ?? []), subscription] ?? [
subscription,
]
);
return { return {
ping: async () => { ping: async () => {
@ -173,11 +180,14 @@ class FilterV2 extends BaseProtocol implements IFilterV2 {
} }
const subs = this.subscriptions.set( const subs = this.subscriptions.set(
peer.id, peer.id.toString(),
removeItemFromArray(this.subscriptions.get(peer.id) ?? [], subscription) removeItemFromArray(
this.subscriptions.get(peer.id.toString()) ?? [],
subscription
)
); );
if (!subs.get(peer.id)?.length) { if (!subs.get(peer.id.toString())?.length) {
this.subscriptions.delete(peer.id); this.subscriptions.delete(peer.id.toString());
} }
} }
@ -246,7 +256,7 @@ class FilterV2 extends BaseProtocol implements IFilterV2 {
public getActiveSubscriptions(): ActiveSubscriptions { public getActiveSubscriptions(): ActiveSubscriptions {
const map: ActiveSubscriptions = new Map(); const map: ActiveSubscriptions = new Map();
const subscriptions = this.subscriptions as Map< const subscriptions = this.subscriptions as Map<
PeerId, PeerIdStr,
Subscription<IDecodedMessage>[] Subscription<IDecodedMessage>[]
>; >;
@ -276,15 +286,13 @@ class FilterV2 extends BaseProtocol implements IFilterV2 {
} }
const subs = this.subscriptions as Map< const subs = this.subscriptions as Map<
PeerId, PeerIdStr,
Subscription<IDecodedMessage>[] Subscription<IDecodedMessage>[]
>; >;
const subscription = subs const subscription = subs
.get(streamData.connection.remotePeer) .get(streamData.connection.remotePeer.toString())
?.find((s) => { ?.find((s) => s.pubSubTopic === pubsubTopic);
s.pubSubTopic === pubsubTopic;
});
if (!subscription) { if (!subscription) {
log(`No subscription locally registered for topic ${pubsubTopic}`); log(`No subscription locally registered for topic ${pubsubTopic}`);