fix: remove window reference and improve waitForRemotePeer (#2194)

* fix: remove window reference and improve waitForRemotePeer

* remove only

* up lock

* debug: try * as version

* return version

* up lock
This commit is contained in:
Sasha 2024-11-08 12:34:31 +07:00 committed by GitHub
parent 7c0ce7b2ec
commit 88e33a90fd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 1375 additions and 1177 deletions

2439
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -115,7 +115,7 @@ export class ReceiverReliabilityMonitor {
return; return;
} }
const timeout = window.setTimeout( const timeout = setTimeout(
(async () => { (async () => {
const receivedAnyMessage = this.verifiedPeers.has(peerIdStr); const receivedAnyMessage = this.verifiedPeers.has(peerIdStr);
const receivedTestMessage = this.receivedMessagesFormPeer.has( const receivedTestMessage = this.receivedMessagesFormPeer.has(
@ -136,7 +136,7 @@ export class ReceiverReliabilityMonitor {
await this.renewAndSubscribePeer(peerId); await this.renewAndSubscribePeer(peerId);
}) as () => void, }) as () => void,
MESSAGE_VERIFICATION_DELAY MESSAGE_VERIFICATION_DELAY
); ) as unknown as number;
this.scheduledVerification.set(peerIdStr, timeout); this.scheduledVerification.set(peerIdStr, timeout);
} }

View File

@ -121,8 +121,8 @@ describe("waitForRemotePeer", () => {
}); });
it("should check connected peers if present and suitable", async () => { it("should check connected peers if present and suitable", async () => {
const addEventListenerSpy = sinon.spy(eventTarget.addEventListener); const removeEventListenerSpy = sinon.spy(eventTarget.removeEventListener);
eventTarget.addEventListener = addEventListenerSpy; eventTarget.removeEventListener = removeEventListenerSpy;
const wakuNode = mockWakuNode({ const wakuNode = mockWakuNode({
isStarted: true, isStarted: true,
@ -144,7 +144,7 @@ describe("waitForRemotePeer", () => {
} }
expect(err).to.be.undefined; expect(err).to.be.undefined;
expect(addEventListenerSpy.notCalled).to.be.true; expect(removeEventListenerSpy.notCalled).to.be.true;
}); });
it("should wait for LightPush peer to be connected", async () => { it("should wait for LightPush peer to be connected", async () => {

View File

@ -40,57 +40,76 @@ export async function waitForRemotePeer(
throw Error("Waku node is not started"); throw Error("Waku node is not started");
} }
if (connections.length > 0 && !protocols.includes(Protocols.Relay)) { for (const protocol of protocols) {
const success = await waitForMetadata(waku, protocols); switch (protocol) {
case Protocols.Relay:
if (success) { if (!waku.relay)
return;
}
}
const promises = [];
if (protocols.includes(Protocols.Relay)) {
if (!waku.relay) {
throw Error("Cannot wait for Relay peer: protocol not mounted"); throw Error("Cannot wait for Relay peer: protocol not mounted");
} break;
promises.push(waku.relay.waitForPeers()); case Protocols.LightPush:
} if (!waku.lightPush)
if (protocols.includes(Protocols.Store)) {
if (!waku.store) {
throw Error("Cannot wait for Store peer: protocol not mounted");
}
promises.push(waitForConnectedPeer(StoreCodec, waku.libp2p));
}
if (protocols.includes(Protocols.LightPush)) {
if (!waku.lightPush) {
throw Error("Cannot wait for LightPush peer: protocol not mounted"); throw Error("Cannot wait for LightPush peer: protocol not mounted");
break;
case Protocols.Store:
if (!waku.store)
throw Error("Cannot wait for Store peer: protocol not mounted");
break;
case Protocols.Filter:
if (!waku.filter)
throw Error("Cannot wait for Filter peer: protocol not mounted");
break;
} }
promises.push(waitForConnectedPeer(LightPushCodec, waku.libp2p));
} }
if (protocols.includes(Protocols.Filter)) { const promises = [waitForProtocols(waku, protocols)];
if (!waku.filter) {
throw new Error("Cannot wait for Filter peer: protocol not mounted"); if (connections.length > 0 && !protocols.includes(Protocols.Relay)) {
} promises.push(
promises.push(waitForConnectedPeer(FilterCodecs.SUBSCRIBE, waku.libp2p)); waitForMetadata(waku, protocols) as unknown as Promise<any[]>
);
} }
if (timeoutMs) { if (timeoutMs) {
await rejectOnTimeout( await rejectOnTimeout(
Promise.all(promises), Promise.any(promises),
timeoutMs, timeoutMs,
"Timed out waiting for a remote peer." "Timed out waiting for a remote peer."
); );
} else { } else {
await Promise.all(promises); await Promise.any(promises);
} }
} }
type EventListener = (_: CustomEvent<IdentifyResult>) => void; type EventListener = (_: CustomEvent<IdentifyResult>) => void;
/**
* Waits for required peers to be connected.
*/
async function waitForProtocols(
waku: IWaku,
protocols: Protocols[]
): Promise<any[]> {
const promises = [];
if (waku.relay && protocols.includes(Protocols.Relay)) {
promises.push(waku.relay.waitForPeers());
}
if (waku.store && protocols.includes(Protocols.Store)) {
promises.push(waitForConnectedPeer(StoreCodec, waku.libp2p));
}
if (waku.lightPush && protocols.includes(Protocols.LightPush)) {
promises.push(waitForConnectedPeer(LightPushCodec, waku.libp2p));
}
if (waku.filter && protocols.includes(Protocols.Filter)) {
promises.push(waitForConnectedPeer(FilterCodecs.SUBSCRIBE, waku.libp2p));
}
return Promise.all(promises);
}
/** /**
* Wait for a peer with the given protocol to be connected. * Wait for a peer with the given protocol to be connected.
* If sharding is enabled on the node, it will also wait for the peer to be confirmed by the metadata service. * If sharding is enabled on the node, it will also wait for the peer to be confirmed by the metadata service.
@ -135,12 +154,12 @@ async function waitForConnectedPeer(
} }
/** /**
* Waits for the metadata from the remote peer. * Checks existing connections for needed metadata.
*/ */
async function waitForMetadata( async function waitForMetadata(
waku: IWaku, waku: IWaku,
protocols: Protocols[] protocols: Protocols[]
): Promise<boolean> { ): Promise<void> {
const connectedPeers = waku.libp2p.getPeers(); const connectedPeers = waku.libp2p.getPeers();
const metadataService = waku.libp2p.services.metadata; const metadataService = waku.libp2p.services.metadata;
const enabledCodes = mapProtocolsToCodecs(protocols); const enabledCodes = mapProtocolsToCodecs(protocols);
@ -149,7 +168,7 @@ async function waitForMetadata(
log.info( log.info(
`Skipping waitForMetadata due to missing connections:${connectedPeers.length} or metadataService:${!!metadataService}` `Skipping waitForMetadata due to missing connections:${connectedPeers.length} or metadataService:${!!metadataService}`
); );
return false; return;
} }
for (const peerId of connectedPeers) { for (const peerId of connectedPeers) {
@ -173,7 +192,7 @@ async function waitForMetadata(
); );
if (confirmedAllCodecs) { if (confirmedAllCodecs) {
return true; return;
} }
} }
} }
@ -187,8 +206,6 @@ async function waitForMetadata(
continue; continue;
} }
} }
return false;
} }
const awaitTimeout = (ms: number, rejectReason: string): Promise<void> => const awaitTimeout = (ms: number, rejectReason: string): Promise<void> =>