chore: fix nwaku interop test failures (#1792)

* chore: investigate interop test failures

* try some fixes

* remove only

* remove comment

* use getConnections for waku node
This commit is contained in:
Sasha 2024-01-18 11:05:35 +01:00 committed by GitHub
parent a859243b76
commit ea4b30752d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 54 additions and 31 deletions

View File

@ -309,6 +309,10 @@ class Filter extends BaseProtocol implements IReceiver {
}) })
)[0]; )[0];
if (!peer) {
throw new Error("No peer found to initiate subscription.");
}
const subscription = const subscription =
this.getActiveSubscription(pubsubTopic, peer.id.toString()) ?? this.getActiveSubscription(pubsubTopic, peer.id.toString()) ??
this.setActiveSubscription( this.setActiveSubscription(

View File

@ -386,10 +386,14 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
this.beforeEach(async function () { this.beforeEach(async function () {
this.timeout(15000); this.timeout(15000);
[nwaku, waku] = await runNodes(this, [ [nwaku, waku] = await runNodes(
customPubsubTopic1, this,
customPubsubTopic2 [customPubsubTopic1, customPubsubTopic2],
]); {
clusterId: 3,
shards: [1, 2]
}
);
subscription = await waku.filter.createSubscription(customPubsubTopic1); subscription = await waku.filter.createSubscription(customPubsubTopic1);
messageCollector = new MessageCollector(); messageCollector = new MessageCollector();
}); });

View File

@ -79,12 +79,22 @@ export async function runNodes(
log.error("jswaku node failed to start:", error); log.error("jswaku node failed to start:", error);
} }
if (waku) { if (!waku) {
throw new Error("Failed to initialize waku");
}
await waku.dial(await nwaku.getMultiaddrWithId()); await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
await nwaku.ensureSubscriptions(pubsubTopics); await nwaku.ensureSubscriptions(pubsubTopics);
return [nwaku, waku];
} else { const wakuConnections = waku.libp2p.getConnections();
throw new Error("Failed to initialize waku"); const nwakuPeers = await nwaku.peers();
if (wakuConnections.length < 1 || nwakuPeers.length < 1) {
throw new Error(
`Expected at least 1 peer in each node. Got waku connections: ${wakuConnections.length} and nwaku: ${nwakuPeers.length}`
);
} }
return [nwaku, waku];
} }

View File

@ -342,13 +342,22 @@ describe("Waku Store (named sharding), custom pubsub topic", function () {
nwaku = new ServiceNode(makeLogFileName(this)); nwaku = new ServiceNode(makeLogFileName(this));
await nwaku.start({ await nwaku.start({
store: true, store: true,
pubsubTopic: [customShardedPubsubTopic1, customShardedPubsubTopic2], relay: true,
relay: true pubsubTopic: [customShardedPubsubTopic1, customShardedPubsubTopic2]
}); });
await nwaku.ensureSubscriptions([ await nwaku.ensureSubscriptions([
customShardedPubsubTopic1, customShardedPubsubTopic1,
customShardedPubsubTopic2 customShardedPubsubTopic2
]); ]);
waku = await startAndConnectLightNode(
nwaku,
[customShardedPubsubTopic1, customShardedPubsubTopic2],
{
clusterId: 3,
shards: [1, 2]
}
);
}); });
afterEach(async function () { afterEach(async function () {
@ -363,10 +372,7 @@ describe("Waku Store (named sharding), custom pubsub topic", function () {
customContentTopic1, customContentTopic1,
customShardedPubsubTopic1 customShardedPubsubTopic1
); );
waku = await startAndConnectLightNode(nwaku, [
customShardedPubsubTopic1,
customShardedPubsubTopic2
]);
const messages = await processQueriedMessages( const messages = await processQueriedMessages(
waku, waku,
[customDecoder1], [customDecoder1],
@ -397,11 +403,6 @@ describe("Waku Store (named sharding), custom pubsub topic", function () {
customShardedPubsubTopic2 customShardedPubsubTopic2
); );
waku = await startAndConnectLightNode(nwaku, [
customShardedPubsubTopic1,
customShardedPubsubTopic2
]);
const customMessages = await processQueriedMessages( const customMessages = await processQueriedMessages(
waku, waku,
[customDecoder1], [customDecoder1],
@ -451,13 +452,6 @@ describe("Waku Store (named sharding), custom pubsub topic", function () {
customShardedPubsubTopic2 customShardedPubsubTopic2
); );
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
pubsubTopics: [customShardedPubsubTopic1, customShardedPubsubTopic2]
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.dial(await nwaku2.getMultiaddrWithId()); await waku.dial(await nwaku2.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]); await waitForRemotePeer(waku, [Protocols.Store]);

View File

@ -107,16 +107,27 @@ export async function startAndConnectLightNode(
shardInfo?: ShardingParams shardInfo?: ShardingParams
): Promise<LightNode> { ): Promise<LightNode> {
const waku = await createLightNode({ const waku = await createLightNode({
pubsubTopics: shardInfo ? undefined : pubsubTopics,
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
...((pubsubTopics.length !== 1 || ...((pubsubTopics.length !== 1 ||
pubsubTopics[0] !== DefaultPubsubTopic) && { pubsubTopics[0] !== DefaultPubsubTopic) && {
shardInfo: shardInfo shardInfo: shardInfo
}), })
pubsubTopics: shardInfo ? undefined : pubsubTopics,
staticNoiseKey: NOISE_KEY_1
}); });
await waku.start(); await waku.start();
await waku.dial(await instance.getMultiaddrWithId()); await waku.dial(await instance.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]); await waitForRemotePeer(waku, [Protocols.Store]);
const wakuConnections = waku.libp2p.getConnections();
const nwakuPeers = await instance.peers();
if (wakuConnections.length < 1 || nwakuPeers.length < 1) {
throw new Error(
`Expected at least 1 peer in each node. Got waku connections: ${wakuConnections.length} and nwaku: ${nwakuPeers.length}`
);
}
log.info("Waku node created"); log.info("Waku node created");
return waku; return waku;
} }