mirror of https://github.com/waku-org/js-waku.git
Merge branch 'master' into feat/specific-node
This commit is contained in:
commit
5674b0e86b
File diff suppressed because it is too large
Load Diff
|
@ -115,7 +115,7 @@ export class ReceiverReliabilityMonitor {
|
|||
return;
|
||||
}
|
||||
|
||||
const timeout = window.setTimeout(
|
||||
const timeout = setTimeout(
|
||||
(async () => {
|
||||
const receivedAnyMessage = this.verifiedPeers.has(peerIdStr);
|
||||
const receivedTestMessage = this.receivedMessagesFormPeer.has(
|
||||
|
@ -136,7 +136,7 @@ export class ReceiverReliabilityMonitor {
|
|||
await this.renewAndSubscribePeer(peerId);
|
||||
}) as () => void,
|
||||
MESSAGE_VERIFICATION_DELAY
|
||||
);
|
||||
) as unknown as number;
|
||||
|
||||
this.scheduledVerification.set(peerIdStr, timeout);
|
||||
}
|
||||
|
|
|
@ -121,8 +121,8 @@ describe("waitForRemotePeer", () => {
|
|||
});
|
||||
|
||||
it("should check connected peers if present and suitable", async () => {
|
||||
const addEventListenerSpy = sinon.spy(eventTarget.addEventListener);
|
||||
eventTarget.addEventListener = addEventListenerSpy;
|
||||
const removeEventListenerSpy = sinon.spy(eventTarget.removeEventListener);
|
||||
eventTarget.removeEventListener = removeEventListenerSpy;
|
||||
|
||||
const wakuNode = mockWakuNode({
|
||||
isStarted: true,
|
||||
|
@ -144,7 +144,7 @@ describe("waitForRemotePeer", () => {
|
|||
}
|
||||
|
||||
expect(err).to.be.undefined;
|
||||
expect(addEventListenerSpy.notCalled).to.be.true;
|
||||
expect(removeEventListenerSpy.notCalled).to.be.true;
|
||||
});
|
||||
|
||||
it("should wait for LightPush peer to be connected", async () => {
|
||||
|
|
|
@ -40,57 +40,76 @@ export async function waitForRemotePeer(
|
|||
throw Error("Waku node is not started");
|
||||
}
|
||||
|
||||
if (connections.length > 0 && !protocols.includes(Protocols.Relay)) {
|
||||
const success = await waitForMetadata(waku, protocols);
|
||||
|
||||
if (success) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const promises = [];
|
||||
|
||||
if (protocols.includes(Protocols.Relay)) {
|
||||
if (!waku.relay) {
|
||||
for (const protocol of protocols) {
|
||||
switch (protocol) {
|
||||
case Protocols.Relay:
|
||||
if (!waku.relay)
|
||||
throw Error("Cannot wait for Relay peer: protocol not mounted");
|
||||
}
|
||||
promises.push(waku.relay.waitForPeers());
|
||||
}
|
||||
|
||||
if (protocols.includes(Protocols.Store)) {
|
||||
if (!waku.store) {
|
||||
throw Error("Cannot wait for Store peer: protocol not mounted");
|
||||
}
|
||||
promises.push(waitForConnectedPeer(StoreCodec, waku.libp2p));
|
||||
}
|
||||
|
||||
if (protocols.includes(Protocols.LightPush)) {
|
||||
if (!waku.lightPush) {
|
||||
break;
|
||||
case Protocols.LightPush:
|
||||
if (!waku.lightPush)
|
||||
throw Error("Cannot wait for LightPush peer: protocol not mounted");
|
||||
break;
|
||||
case Protocols.Store:
|
||||
if (!waku.store)
|
||||
throw Error("Cannot wait for Store peer: protocol not mounted");
|
||||
break;
|
||||
case Protocols.Filter:
|
||||
if (!waku.filter)
|
||||
throw Error("Cannot wait for Filter peer: protocol not mounted");
|
||||
break;
|
||||
}
|
||||
promises.push(waitForConnectedPeer(LightPushCodec, waku.libp2p));
|
||||
}
|
||||
|
||||
if (protocols.includes(Protocols.Filter)) {
|
||||
if (!waku.filter) {
|
||||
throw new Error("Cannot wait for Filter peer: protocol not mounted");
|
||||
}
|
||||
promises.push(waitForConnectedPeer(FilterCodecs.SUBSCRIBE, waku.libp2p));
|
||||
const promises = [waitForProtocols(waku, protocols)];
|
||||
|
||||
if (connections.length > 0 && !protocols.includes(Protocols.Relay)) {
|
||||
promises.push(
|
||||
waitForMetadata(waku, protocols) as unknown as Promise<any[]>
|
||||
);
|
||||
}
|
||||
|
||||
if (timeoutMs) {
|
||||
await rejectOnTimeout(
|
||||
Promise.all(promises),
|
||||
Promise.any(promises),
|
||||
timeoutMs,
|
||||
"Timed out waiting for a remote peer."
|
||||
);
|
||||
} else {
|
||||
await Promise.all(promises);
|
||||
await Promise.any(promises);
|
||||
}
|
||||
}
|
||||
|
||||
type EventListener = (_: CustomEvent<IdentifyResult>) => void;
|
||||
|
||||
/**
|
||||
* Waits for required peers to be connected.
|
||||
*/
|
||||
async function waitForProtocols(
|
||||
waku: IWaku,
|
||||
protocols: Protocols[]
|
||||
): Promise<any[]> {
|
||||
const promises = [];
|
||||
|
||||
if (waku.relay && protocols.includes(Protocols.Relay)) {
|
||||
promises.push(waku.relay.waitForPeers());
|
||||
}
|
||||
|
||||
if (waku.store && protocols.includes(Protocols.Store)) {
|
||||
promises.push(waitForConnectedPeer(StoreCodec, waku.libp2p));
|
||||
}
|
||||
|
||||
if (waku.lightPush && protocols.includes(Protocols.LightPush)) {
|
||||
promises.push(waitForConnectedPeer(LightPushCodec, waku.libp2p));
|
||||
}
|
||||
|
||||
if (waku.filter && protocols.includes(Protocols.Filter)) {
|
||||
promises.push(waitForConnectedPeer(FilterCodecs.SUBSCRIBE, waku.libp2p));
|
||||
}
|
||||
|
||||
return Promise.all(promises);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a peer with the given protocol to be connected.
|
||||
* If sharding is enabled on the node, it will also wait for the peer to be confirmed by the metadata service.
|
||||
|
@ -135,12 +154,12 @@ async function waitForConnectedPeer(
|
|||
}
|
||||
|
||||
/**
|
||||
* Waits for the metadata from the remote peer.
|
||||
* Checks existing connections for needed metadata.
|
||||
*/
|
||||
async function waitForMetadata(
|
||||
waku: IWaku,
|
||||
protocols: Protocols[]
|
||||
): Promise<boolean> {
|
||||
): Promise<void> {
|
||||
const connectedPeers = waku.libp2p.getPeers();
|
||||
const metadataService = waku.libp2p.services.metadata;
|
||||
const enabledCodes = mapProtocolsToCodecs(protocols);
|
||||
|
@ -149,7 +168,7 @@ async function waitForMetadata(
|
|||
log.info(
|
||||
`Skipping waitForMetadata due to missing connections:${connectedPeers.length} or metadataService:${!!metadataService}`
|
||||
);
|
||||
return false;
|
||||
return;
|
||||
}
|
||||
|
||||
for (const peerId of connectedPeers) {
|
||||
|
@ -173,7 +192,7 @@ async function waitForMetadata(
|
|||
);
|
||||
|
||||
if (confirmedAllCodecs) {
|
||||
return true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -187,8 +206,6 @@ async function waitForMetadata(
|
|||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
const awaitTimeout = (ms: number, rejectReason: string): Promise<void> =>
|
||||
|
|
Loading…
Reference in New Issue