Simplify async iterator usage

The iterator will return only once there is a first item available.
Hence, there is no need to add an observer on `change:protocols` event.
This commit is contained in:
Franck Royer 2022-05-17 18:23:50 +10:00
parent 4b43db6daa
commit d9940f4d9b
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
2 changed files with 66 additions and 56 deletions

View File

@ -1,5 +1,4 @@
import { expect } from "chai"; import { expect } from "chai";
import debug from "debug";
import PeerId from "peer-id"; import PeerId from "peer-id";
import { import {
@ -8,13 +7,12 @@ import {
NOISE_KEY_2, NOISE_KEY_2,
Nwaku, Nwaku,
} from "../test_utils/"; } from "../test_utils/";
import { delay } from "../test_utils/delay";
import { Protocols, Waku } from "./waku"; import { Protocols, Waku } from "./waku";
import { WakuMessage } from "./waku_message"; import { WakuMessage } from "./waku_message";
import { generateSymmetricKey } from "./waku_message/version_1"; import { generateSymmetricKey } from "./waku_message/version_1";
const dbg = debug("waku:test");
const TestContentTopic = "/test/1/waku/utf8"; const TestContentTopic = "/test/1/waku/utf8";
describe("Waku Dial [node only]", function () { describe("Waku Dial [node only]", function () {
@ -175,21 +173,18 @@ describe("Wait for remote peer / get peers", function () {
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
}); });
it("Relay", async function () { it("Relay - dialed first", async function () {
this.timeout(20_000); this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this)); nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start(); await nwaku.start();
const multiAddrWithId = await nwaku.getMultiaddrWithId(); const multiAddrWithId = await nwaku.getMultiaddrWithId();
dbg("Create");
waku = await Waku.create({ waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1, staticNoiseKey: NOISE_KEY_1,
}); });
dbg("Dial");
await waku.dial(multiAddrWithId); await waku.dial(multiAddrWithId);
dbg("waitForRemotePeer"); await delay(1000);
await waku.waitForRemotePeer([Protocols.Relay]); await waku.waitForRemotePeer([Protocols.Relay]);
dbg("Done, get peers");
const peers = waku.relay.getPeers(); const peers = waku.relay.getPeers();
const nimPeerId = multiAddrWithId.getPeerId(); const nimPeerId = multiAddrWithId.getPeerId();
@ -197,7 +192,29 @@ describe("Wait for remote peer / get peers", function () {
expect(peers.has(nimPeerId as string)).to.be.true; expect(peers.has(nimPeerId as string)).to.be.true;
}); });
it("Store", async function () { it("Relay - dialed after", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start();
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
});
const waitPromise = waku.waitForRemotePeer([Protocols.Relay]);
await delay(1000);
await waku.dial(multiAddrWithId);
await waitPromise;
const peers = waku.relay.getPeers();
const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined;
expect(peers.has(nimPeerId as string)).to.be.true;
});
it("Store - dialed first", async function () {
this.timeout(20_000); this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this)); nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ persistMessages: true }); await nwaku.start({ persistMessages: true });
@ -207,6 +224,7 @@ describe("Wait for remote peer / get peers", function () {
staticNoiseKey: NOISE_KEY_1, staticNoiseKey: NOISE_KEY_1,
}); });
await waku.dial(multiAddrWithId); await waku.dial(multiAddrWithId);
await delay(1000);
await waku.waitForRemotePeer([Protocols.Store]); await waku.waitForRemotePeer([Protocols.Store]);
const peers = []; const peers = [];
@ -220,6 +238,31 @@ describe("Wait for remote peer / get peers", function () {
expect(peers.includes(nimPeerId as string)).to.be.true; expect(peers.includes(nimPeerId as string)).to.be.true;
}); });
it("Store - dialed after", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ persistMessages: true });
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
});
const waitPromise = waku.waitForRemotePeer([Protocols.Store]);
await delay(1000);
await waku.dial(multiAddrWithId);
await waitPromise;
const peers = [];
for await (const peer of waku.store.peers) {
peers.push(peer.id.toB58String());
}
const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined;
expect(peers.includes(nimPeerId as string)).to.be.true;
});
it("LightPush", async function () { it("LightPush", async function () {
this.timeout(20_000); this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this)); nwaku = new Nwaku(makeLogFileName(this));

View File

@ -353,56 +353,23 @@ export class Waku {
} }
if (desiredProtocols.includes(Protocols.Store)) { if (desiredProtocols.includes(Protocols.Store)) {
let storePeerFound = false; const storePromise = (async (): Promise<void> => {
for await (const peer of this.store.peers) {
for await (const _peer of this.store.peers) { dbg("Store peer found", peer.id.toB58String());
storePeerFound = true; break;
break; }
} })();
promises.push(storePromise);
if (!storePeerFound) {
// No peer available for this protocol, waiting to connect to one.
const promise = new Promise<void>((resolve) => {
this.libp2p.peerStore.on(
"change:protocols",
({ protocols: connectedPeerProtocols }) => {
for (const codec of Object.values(StoreCodecs)) {
if (connectedPeerProtocols.includes(codec)) {
dbg("Resolving for", codec, connectedPeerProtocols);
resolve();
}
}
}
);
});
promises.push(promise);
}
} }
if (desiredProtocols.includes(Protocols.LightPush)) { if (desiredProtocols.includes(Protocols.LightPush)) {
let lightPushPeerFound = false; const lightPushPromise = (async (): Promise<void> => {
for await (const peer of this.lightPush.peers) {
for await (const _peer of this.lightPush.peers) { dbg("Light Push peer found", peer.id.toB58String());
lightPushPeerFound = true; break;
break; }
} })();
promises.push(lightPushPromise);
if (!lightPushPeerFound) {
// No peer available for this protocol, waiting to connect to one.
const promise = new Promise<void>((resolve) => {
this.libp2p.peerStore.on(
"change:protocols",
({ protocols: connectedPeerProtocols }) => {
if (connectedPeerProtocols.includes(LightPushCodec)) {
dbg("Resolving for", LightPushCodec, connectedPeerProtocols);
resolve();
}
}
);
});
promises.push(promise);
}
} }
await Promise.all(promises); await Promise.all(promises);