feat: offline state recovery for Filter subscription (#2049)

* up

* fix window reference

* add tests

* up

* add e2e renew test

* address comments

* remove unused

* add test

* try

* remove only

* up test

* up

* remove only

* add tmp logs, use before/after hooks

* up

* fix check

* remove only

* fix test

* up
This commit is contained in:
Sasha 2024-08-28 18:00:18 +02:00 committed by GitHub
parent 71384dfdfd
commit eadb85ab83
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 305 additions and 97 deletions

View File

@ -38,32 +38,15 @@ export class ConnectionManager
private currentActiveParallelDialCount = 0; private currentActiveParallelDialCount = 0;
private pendingPeerDialQueue: Array<PeerId> = []; private pendingPeerDialQueue: Array<PeerId> = [];
private online: boolean = false;
private isP2PNetworkConnected: boolean = false;
public isConnected(): boolean { public isConnected(): boolean {
return this.online; if (globalThis?.navigator && !globalThis?.navigator?.onLine) {
} return false;
private toggleOnline(): void {
if (!this.online) {
this.online = true;
this.dispatchEvent(
new CustomEvent<boolean>(EConnectionStateEvents.CONNECTION_STATUS, {
detail: this.online
})
);
} }
}
private toggleOffline(): void { return this.isP2PNetworkConnected;
if (this.online && this.libp2p.getConnections().length == 0) {
this.online = false;
this.dispatchEvent(
new CustomEvent<boolean>(EConnectionStateEvents.CONNECTION_STATUS, {
detail: this.online
})
);
}
} }
public static create( public static create(
@ -103,6 +86,7 @@ export class ConnectionManager
"peer:discovery", "peer:discovery",
this.onEventHandlers["peer:discovery"] this.onEventHandlers["peer:discovery"]
); );
this.stopNetworkStatusListener();
} }
public async dropConnection(peerId: PeerId): Promise<void> { public async dropConnection(peerId: PeerId): Promise<void> {
@ -193,7 +177,7 @@ export class ConnectionManager
options: keepAliveOptions options: keepAliveOptions
}); });
this.run() this.startEventListeners()
.then(() => log.info(`Connection Manager is now running`)) .then(() => log.info(`Connection Manager is now running`))
.catch((error) => .catch((error) =>
log.error(`Unexpected error while running service`, error) log.error(`Unexpected error while running service`, error)
@ -225,11 +209,12 @@ export class ConnectionManager
} }
} }
private async run(): Promise<void> { private async startEventListeners(): Promise<void> {
// start event listeners
this.startPeerDiscoveryListener(); this.startPeerDiscoveryListener();
this.startPeerConnectionListener(); this.startPeerConnectionListener();
this.startPeerDisconnectionListener(); this.startPeerDisconnectionListener();
this.startNetworkStatusListener();
} }
private async dialPeer(peerId: PeerId): Promise<void> { private async dialPeer(peerId: PeerId): Promise<void> {
@ -428,14 +413,18 @@ export class ConnectionManager
) )
); );
} }
this.toggleOnline();
this.setP2PNetworkConnected();
})(); })();
}, },
"peer:disconnect": (evt: CustomEvent<PeerId>): void => { "peer:disconnect": (evt: CustomEvent<PeerId>): void => {
void (async () => { void (async () => {
this.keepAliveManager.stop(evt.detail); this.keepAliveManager.stop(evt.detail);
this.toggleOffline(); this.setP2PNetworkDisconnected();
})(); })();
},
"browser:network": (): void => {
this.dispatchWakuConnectionEvent();
} }
}; };
@ -572,4 +561,59 @@ export class ConnectionManager
if (!shardInfoBytes) return undefined; if (!shardInfoBytes) return undefined;
return decodeRelayShard(shardInfoBytes); return decodeRelayShard(shardInfoBytes);
} }
private startNetworkStatusListener(): void {
try {
globalThis.addEventListener(
"online",
this.onEventHandlers["browser:network"]
);
globalThis.addEventListener(
"offline",
this.onEventHandlers["browser:network"]
);
} catch (err) {
log.error(`Failed to start network listener: ${err}`);
}
}
private stopNetworkStatusListener(): void {
try {
globalThis.removeEventListener(
"online",
this.onEventHandlers["browser:network"]
);
globalThis.removeEventListener(
"offline",
this.onEventHandlers["browser:network"]
);
} catch (err) {
log.error(`Failed to stop network listener: ${err}`);
}
}
private setP2PNetworkConnected(): void {
if (!this.isP2PNetworkConnected) {
this.isP2PNetworkConnected = true;
this.dispatchWakuConnectionEvent();
}
}
private setP2PNetworkDisconnected(): void {
if (
this.isP2PNetworkConnected &&
this.libp2p.getConnections().length === 0
) {
this.isP2PNetworkConnected = false;
this.dispatchWakuConnectionEvent();
}
}
private dispatchWakuConnectionEvent(): void {
this.dispatchEvent(
new CustomEvent<boolean>(EConnectionStateEvents.CONNECTION_STATUS, {
detail: this.isConnected()
})
);
}
} }

View File

@ -6,6 +6,7 @@ import {
type ContentTopic, type ContentTopic,
type CoreProtocolResult, type CoreProtocolResult,
type CreateSubscriptionResult, type CreateSubscriptionResult,
EConnectionStateEvents,
type IAsyncIterator, type IAsyncIterator,
type IDecodedMessage, type IDecodedMessage,
type IDecoder, type IDecoder,
@ -65,20 +66,22 @@ export class SubscriptionManager implements ISubscriptionSDK {
private missedMessagesByPeer: Map<string, number> = new Map(); private missedMessagesByPeer: Map<string, number> = new Map();
private maxPingFailures: number = DEFAULT_MAX_PINGS; private maxPingFailures: number = DEFAULT_MAX_PINGS;
private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
private subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS;
private subscriptionCallbacks: Map< private subscriptionCallbacks: Map<
ContentTopic, ContentTopic,
SubscriptionCallback<IDecodedMessage> SubscriptionCallback<IDecodedMessage>
>; > = new Map();
public constructor( public constructor(
private readonly pubsubTopic: PubsubTopic, private readonly pubsubTopic: PubsubTopic,
private protocol: FilterCore, private readonly protocol: FilterCore,
private getPeers: () => Peer[], private readonly connectionManager: ConnectionManager,
private readonly getPeers: () => Peer[],
private readonly renewPeer: (peerToDisconnect: PeerId) => Promise<Peer> private readonly renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>
) { ) {
this.pubsubTopic = pubsubTopic; this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();
const allPeerIdStr = this.getPeers().map((p) => p.id.toString()); const allPeerIdStr = this.getPeers().map((p) => p.id.toString());
this.receivedMessagesHashes = { this.receivedMessagesHashes = {
all: new Set(), all: new Set(),
@ -89,10 +92,6 @@ export class SubscriptionManager implements ISubscriptionSDK {
allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0));
} }
public get messageHashes(): string[] {
return [...this.receivedMessagesHashes.all];
}
private addHash(hash: string, peerIdStr?: string): void { private addHash(hash: string, peerIdStr?: string): void {
this.receivedMessagesHashes.all.add(hash); this.receivedMessagesHashes.all.add(hash);
@ -155,9 +154,8 @@ export class SubscriptionManager implements ISubscriptionSDK {
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback); this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
}); });
if (options.keepAlive) { this.subscribeOptions = options;
this.startKeepAlivePings(options); this.startSubscriptionsMaintenance(options);
}
return finalResult; return finalResult;
} }
@ -183,9 +181,7 @@ export class SubscriptionManager implements ISubscriptionSDK {
const finalResult = this.handleResult(results, "unsubscribe"); const finalResult = this.handleResult(results, "unsubscribe");
if (this.subscriptionCallbacks.size === 0) { if (this.subscriptionCallbacks.size === 0) {
if (this.keepAliveTimer) { this.stopSubscriptionsMaintenance();
this.stopKeepAlivePings();
}
} }
return finalResult; return finalResult;
@ -211,9 +207,7 @@ export class SubscriptionManager implements ISubscriptionSDK {
const finalResult = this.handleResult(results, "unsubscribeAll"); const finalResult = this.handleResult(results, "unsubscribeAll");
if (this.keepAliveTimer) { this.stopSubscriptionsMaintenance();
this.stopKeepAlivePings();
}
return finalResult; return finalResult;
} }
@ -378,8 +372,19 @@ export class SubscriptionManager implements ISubscriptionSDK {
} }
} }
private startKeepAlivePings(options: SubscribeOptions): void { private startSubscriptionsMaintenance(options: SubscribeOptions): void {
const { keepAlive } = options; if (options?.keepAlive) {
this.startKeepAlivePings(options.keepAlive);
}
this.startConnectionListener();
}
private stopSubscriptionsMaintenance(): void {
this.stopKeepAlivePings();
this.stopConnectionListener();
}
private startKeepAlivePings(interval: number): void {
if (this.keepAliveTimer) { if (this.keepAliveTimer) {
log.info("Recurring pings already set up."); log.info("Recurring pings already set up.");
return; return;
@ -389,7 +394,7 @@ export class SubscriptionManager implements ISubscriptionSDK {
void this.ping().catch((error) => { void this.ping().catch((error) => {
log.error("Error in keep-alive ping cycle:", error); log.error("Error in keep-alive ping cycle:", error);
}); });
}, keepAlive) as unknown as number; }, interval) as unknown as number;
} }
private stopKeepAlivePings(): void { private stopKeepAlivePings(): void {
@ -403,6 +408,48 @@ export class SubscriptionManager implements ISubscriptionSDK {
this.keepAliveTimer = null; this.keepAliveTimer = null;
} }
private startConnectionListener(): void {
this.connectionManager.addEventListener(
EConnectionStateEvents.CONNECTION_STATUS,
this.connectionListener.bind(this) as (v: CustomEvent<boolean>) => void
);
}
private stopConnectionListener(): void {
this.connectionManager.removeEventListener(
EConnectionStateEvents.CONNECTION_STATUS,
this.connectionListener.bind(this) as (v: CustomEvent<boolean>) => void
);
}
private async connectionListener({
detail: isConnected
}: CustomEvent<boolean>): Promise<void> {
if (!isConnected) {
this.stopKeepAlivePings();
return;
}
try {
const result = await this.ping();
const renewPeerPromises = result.failures.map(
async (v): Promise<void> => {
if (v.peerId) {
await this.renewAndSubscribePeer(v.peerId);
}
}
);
await Promise.all(renewPeerPromises);
} catch (err) {
log.error(`networkStateListener failed to recover: ${err}`);
}
this.startKeepAlivePings(
this.subscribeOptions?.keepAlive || DEFAULT_SUBSCRIBE_OPTIONS.keepAlive
);
}
private incrementMissedMessageCount(peerIdStr: string): void { private incrementMissedMessageCount(peerIdStr: string): void {
const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0;
this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); this.missedMessagesByPeer.set(peerIdStr, currentCount + 1);
@ -416,6 +463,7 @@ export class SubscriptionManager implements ISubscriptionSDK {
class FilterSDK extends BaseProtocolSDK implements IFilterSDK { class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
public readonly protocol: FilterCore; public readonly protocol: FilterCore;
private readonly _connectionManager: ConnectionManager;
private activeSubscriptions = new Map<string, SubscriptionManager>(); private activeSubscriptions = new Map<string, SubscriptionManager>();
@ -445,8 +493,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
); );
this.protocol = this.core as FilterCore; this.protocol = this.core as FilterCore;
this._connectionManager = connectionManager;
this.activeSubscriptions = new Map();
} }
/** /**
@ -576,6 +623,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
new SubscriptionManager( new SubscriptionManager(
pubsubTopic, pubsubTopic,
this.protocol, this.protocol,
this._connectionManager,
() => this.connectedPeers, () => this.connectedPeers,
this.renewPeer.bind(this) this.renewPeer.bind(this)
) )

View File

@ -1,5 +1,5 @@
import type { PeerId, PeerInfo } from "@libp2p/interface"; import type { PeerId, PeerInfo } from "@libp2p/interface";
import { CustomEvent } from "@libp2p/interface"; import { CustomEvent, TypedEventEmitter } from "@libp2p/interface";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { import {
EConnectionStateEvents, EConnectionStateEvents,
@ -151,8 +151,34 @@ describe("Events", function () {
}); });
}); });
describe("peer:disconnect", () => { describe(EConnectionStateEvents.CONNECTION_STATUS, function () {
it("should emit `waku:offline` event when all peers disconnect", async function () { let navigatorMock: any;
this.beforeEach(() => {
navigatorMock = { onLine: true };
globalThis.navigator = navigatorMock;
const eventEmmitter = new TypedEventEmitter();
globalThis.addEventListener =
eventEmmitter.addEventListener.bind(eventEmmitter);
globalThis.removeEventListener =
eventEmmitter.removeEventListener.bind(eventEmmitter);
globalThis.dispatchEvent =
eventEmmitter.dispatchEvent.bind(eventEmmitter);
});
this.afterEach(() => {
// @ts-expect-error: resetting set value
globalThis.navigator = undefined;
// @ts-expect-error: resetting set value
globalThis.addEventListener = undefined;
// @ts-expect-error: resetting set value
globalThis.removeEventListener = undefined;
// @ts-expect-error: resetting set value
globalThis.dispatchEvent = undefined;
});
it(`should emit events and trasition isConnected state when has peers or no peers`, async function () {
const peerIdPx = await createSecp256k1PeerId(); const peerIdPx = await createSecp256k1PeerId();
const peerIdPx2 = await createSecp256k1PeerId(); const peerIdPx2 = await createSecp256k1PeerId();
@ -174,17 +200,8 @@ describe("Events", function () {
} }
}); });
waku.libp2p.dispatchEvent(
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx })
);
waku.libp2p.dispatchEvent(
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx2 })
);
await delay(100);
let eventCount = 0; let eventCount = 0;
const connectionStatus = new Promise<boolean>((resolve) => { const connectedStatus = new Promise<boolean>((resolve) => {
waku.connectionManager.addEventListener( waku.connectionManager.addEventListener(
EConnectionStateEvents.CONNECTION_STATUS, EConnectionStateEvents.CONNECTION_STATUS,
({ detail: status }) => { ({ detail: status }) => {
@ -194,40 +211,6 @@ describe("Events", function () {
); );
}); });
expect(waku.isConnected()).to.be.true;
waku.libp2p.dispatchEvent(
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx })
);
waku.libp2p.dispatchEvent(
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx2 })
);
expect(await connectionStatus).to.eq(false);
expect(eventCount).to.be.eq(1);
});
it("isConnected should return false after all peers disconnect", async function () {
const peerIdPx = await createSecp256k1PeerId();
const peerIdPx2 = await createSecp256k1PeerId();
await waku.libp2p.peerStore.save(peerIdPx, {
tags: {
[Tags.PEER_EXCHANGE]: {
value: 50,
ttl: 1200000
}
}
});
await waku.libp2p.peerStore.save(peerIdPx2, {
tags: {
[Tags.PEER_EXCHANGE]: {
value: 50,
ttl: 1200000
}
}
});
waku.libp2p.dispatchEvent( waku.libp2p.dispatchEvent(
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx }) new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx })
); );
@ -238,6 +221,17 @@ describe("Events", function () {
await delay(100); await delay(100);
expect(waku.isConnected()).to.be.true; expect(waku.isConnected()).to.be.true;
expect(await connectedStatus).to.eq(true);
expect(eventCount).to.be.eq(1);
const disconnectedStatus = new Promise<boolean>((resolve) => {
waku.connectionManager.addEventListener(
EConnectionStateEvents.CONNECTION_STATUS,
({ detail: status }) => {
resolve(status);
}
);
});
waku.libp2p.dispatchEvent( waku.libp2p.dispatchEvent(
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx }) new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx })
@ -247,6 +241,81 @@ describe("Events", function () {
); );
expect(waku.isConnected()).to.be.false; expect(waku.isConnected()).to.be.false;
expect(await disconnectedStatus).to.eq(false);
expect(eventCount).to.be.eq(2);
});
it("should be online or offline if network state changed", async function () {
// have to recreate js-waku for it to pick up new globalThis
waku = await createLightNode();
const peerIdPx = await createSecp256k1PeerId();
await waku.libp2p.peerStore.save(peerIdPx, {
tags: {
[Tags.PEER_EXCHANGE]: {
value: 50,
ttl: 1200000
}
}
});
let eventCount = 0;
const connectedStatus = new Promise<boolean>((resolve) => {
waku.connectionManager.addEventListener(
EConnectionStateEvents.CONNECTION_STATUS,
({ detail: status }) => {
eventCount++;
resolve(status);
}
);
});
waku.libp2p.dispatchEvent(
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx })
);
await delay(100);
expect(waku.isConnected()).to.be.true;
expect(await connectedStatus).to.eq(true);
expect(eventCount).to.be.eq(1);
const disconnectedStatus = new Promise<boolean>((resolve) => {
waku.connectionManager.addEventListener(
EConnectionStateEvents.CONNECTION_STATUS,
({ detail: status }) => {
resolve(status);
}
);
});
navigatorMock.onLine = false;
globalThis.dispatchEvent(new CustomEvent("offline"));
await delay(100);
expect(waku.isConnected()).to.be.false;
expect(await disconnectedStatus).to.eq(false);
expect(eventCount).to.be.eq(2);
const connectionRecoveredStatus = new Promise<boolean>((resolve) => {
waku.connectionManager.addEventListener(
EConnectionStateEvents.CONNECTION_STATUS,
({ detail: status }) => {
resolve(status);
}
);
});
navigatorMock.onLine = true;
globalThis.dispatchEvent(new CustomEvent("online"));
await delay(100);
expect(waku.isConnected()).to.be.true;
expect(await connectionRecoveredStatus).to.eq(true);
expect(eventCount).to.be.eq(3);
}); });
}); });
}); });

View File

@ -18,7 +18,8 @@ import {
runMultipleNodes, runMultipleNodes,
ServiceNodesFleet, ServiceNodesFleet,
teardownNodesWithRedundancy, teardownNodesWithRedundancy,
TEST_STRING TEST_STRING,
waitForConnections
} from "../../src/index.js"; } from "../../src/index.js";
import { import {
@ -485,6 +486,52 @@ const runTests = (strictCheckNodes: boolean): void => {
expectedPubsubTopic: TestPubsubTopic expectedPubsubTopic: TestPubsubTopic
}); });
}); });
it("Renews subscription after lossing a connection", async function () {
// setup check
expect(waku.libp2p.getConnections()).has.length(2);
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
});
await serviceNodes.confirmMessageLength(1);
// check renew logic
const nwakuPeers = await Promise.all(
serviceNodes.nodes.map((v) => v.getMultiaddrWithId())
);
await Promise.all(nwakuPeers.map((v) => waku.libp2p.hangUp(v)));
expect(waku.libp2p.getConnections().length).eq(0);
await Promise.all(nwakuPeers.map((v) => waku.libp2p.dial(v)));
await waitForConnections(nwakuPeers.length, waku);
const testText = "second try";
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(testText)
});
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: testText,
expectedContentTopic: TestContentTopic
});
});
}); });
}; };