feat: shard retrieval for store and store peers selection (#2417)

* feat: implement shard retrieval for store and improve set store peers usage

* remove log

* remove only, improve condition

* implement smarter way to retrieve peers

* up tests

* update mock

* address nits, add target to eslint, revert to es2022
This commit is contained in:
Sasha 2025-06-23 10:01:54 +02:00 committed by GitHub
parent fcc6496fef
commit f55db3eb4b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 150 additions and 72 deletions

View File

@ -4,7 +4,6 @@ import {
type Peer,
type PeerId,
type PeerInfo,
type PeerStore,
type Stream,
TypedEventEmitter
} from "@libp2p/interface";
@ -574,12 +573,9 @@ export class ConnectionManager
return false;
}
const isSameShard = await this.isPeerTopicConfigured(peerId);
const isSameShard = await this.isPeerOnSameShard(peerId);
if (!isSameShard) {
const shardInfo = await this.getPeerShardInfo(
peerId,
this.libp2p.peerStore
);
const shardInfo = await this.getPeerShardInfo(peerId);
log.warn(
`Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${
@ -666,28 +662,40 @@ export class ConnectionManager
}
}
private async isPeerTopicConfigured(peerId: PeerId): Promise<boolean> {
const shardInfo = await this.getPeerShardInfo(
peerId,
this.libp2p.peerStore
);
public async isPeerOnSameShard(peerId: PeerId): Promise<boolean> {
const shardInfo = await this.getPeerShardInfo(peerId);
// If there's no shard information, simply return true
if (!shardInfo) return true;
if (!shardInfo) {
return true;
}
const pubsubTopics = shardInfoToPubsubTopics(shardInfo);
const isTopicConfigured = pubsubTopics.some((topic) =>
this.pubsubTopics.includes(topic)
);
return isTopicConfigured;
}
private async getPeerShardInfo(
public async isPeerOnPubsubTopic(
peerId: PeerId,
peerStore: PeerStore
pubsubTopic: string
): Promise<boolean> {
const shardInfo = await this.getPeerShardInfo(peerId);
if (!shardInfo) {
return true;
}
const pubsubTopics = shardInfoToPubsubTopics(shardInfo);
return pubsubTopics.some((t) => t === pubsubTopic);
}
private async getPeerShardInfo(
peerId: PeerId
): Promise<ShardInfo | undefined> {
const peer = await peerStore.get(peerId);
const peer = await this.libp2p.peerStore.get(peerId);
const shardInfoBytes = peer.metadata.get("shardInfo");
if (!shardInfoBytes) return undefined;
return decodeRelayShard(shardInfoBytes);

View File

@ -99,5 +99,9 @@ export type IStore = {
};
export type StoreProtocolOptions = {
peer: string;
/**
* List of Multi-addresses of peers to be prioritized for Store protocol queries.
* @default []
*/
peers: string[];
};

View File

@ -106,8 +106,13 @@ export async function createLibp2pAndUpdateOptions(
peerDiscovery.push(...getPeerDiscoveries(options.discovery));
}
if (options?.bootstrapPeers) {
peerDiscovery.push(bootstrap({ list: options.bootstrapPeers }));
const bootstrapPeers = [
...(options.bootstrapPeers || []),
...(options.store?.peers || [])
];
if (bootstrapPeers.length) {
peerDiscovery.push(bootstrap({ list: bootstrapPeers }));
}
libp2pOptions.peerDiscovery = peerDiscovery;

View File

@ -1,5 +1,12 @@
import type { PeerId } from "@libp2p/interface";
import { ConnectionManager, messageHash, StoreCore } from "@waku/core";
import type { Peer, PeerId } from "@libp2p/interface";
import { peerIdFromString } from "@libp2p/peer-id";
import { multiaddr } from "@multiformats/multiaddr";
import {
ConnectionManager,
messageHash,
StoreCodec,
StoreCore
} from "@waku/core";
import {
IDecodedMessage,
IDecoder,
@ -28,14 +35,14 @@ type StoreConstructorParams = {
*/
export class Store implements IStore {
private readonly options: Partial<StoreProtocolOptions>;
private readonly peerManager: PeerManager;
private readonly libp2p: Libp2p;
private readonly connectionManager: ConnectionManager;
private readonly protocol: StoreCore;
public constructor(params: StoreConstructorParams) {
this.options = params.options || {};
this.peerManager = params.peerManager;
this.connectionManager = params.connectionManager;
this.libp2p = params.libp2p;
this.protocol = new StoreCore(
params.connectionManager.pubsubTopics,
@ -93,7 +100,7 @@ export class Store implements IStore {
...options
};
const peer = await this.getPeerToUse();
const peer = await this.getPeerToUse(pubsubTopic);
if (!peer) {
log.error("No peers available to query");
@ -260,32 +267,81 @@ export class Store implements IStore {
};
}
private async getPeerToUse(): Promise<PeerId | undefined> {
let peerId: PeerId | undefined;
private async getPeerToUse(pubsubTopic: string): Promise<PeerId | undefined> {
const peers = await this.filterConnectedPeers(pubsubTopic);
if (this.options?.peer) {
const connectedPeers = await this.connectionManager.getConnectedPeers();
const peer = this.options.peers
? await this.getPeerFromConfigurationOrFirst(peers, this.options.peers)
: peers[0]?.id;
const peer = connectedPeers.find(
(p) => p.id.toString() === this.options?.peer
return peer;
}
private async getPeerFromConfigurationOrFirst(
peers: Peer[],
configPeers: string[]
): Promise<PeerId | undefined> {
const storeConfigPeers = configPeers.map(multiaddr);
const missing = [];
for (const peer of storeConfigPeers) {
const matchedPeer = peers.find(
(p) => p.id.toString() === peer.getPeerId()?.toString()
);
peerId = peer?.id;
if (!peerId) {
if (matchedPeer) {
return matchedPeer.id;
}
missing.push(peer);
}
while (missing.length) {
const toDial = missing.pop();
if (!toDial) {
return;
}
try {
const conn = await this.libp2p.dial(toDial);
if (conn) {
return peerIdFromString(toDial.getPeerId() as string);
}
} catch (e) {
log.warn(
`Passed node to use for Store not found: ${this.options.peer}. Attempting to use random peers.`
`Failed to dial peer from options.peers list for Store protocol. Peer:${toDial.getPeerId()}, error:${e}`
);
}
}
const peerIds = this.peerManager.getPeers();
log.warn(
`Passed node to use for Store not found: ${configPeers.toString()}. Attempting to use first available peers.`
);
if (peerIds.length > 0) {
// TODO(weboko): implement smart way of getting a peer https://github.com/waku-org/js-waku/issues/2243
return peerIds[Math.floor(Math.random() * peerIds.length)];
return peers[0]?.id;
}
private async filterConnectedPeers(pubsubTopic: string): Promise<Peer[]> {
const peers = await this.connectionManager.getConnectedPeers();
const result: Peer[] = [];
for (const peer of peers) {
const isStoreCodec = peer.protocols.includes(StoreCodec);
const isSameShard = await this.connectionManager.isPeerOnSameShard(
peer.id
);
const isSamePubsub = await this.connectionManager.isPeerOnPubsubTopic(
peer.id,
pubsubTopic
);
if (isStoreCodec && isSameShard && isSamePubsub) {
result.push(peer);
}
}
log.error("No peers available to use.");
return;
return result;
}
}

View File

@ -1,11 +1,6 @@
import type { Peer, PeerId, Stream } from "@libp2p/interface";
import { MultiaddrInput } from "@multiformats/multiaddr";
import {
ConnectionManager,
createDecoder,
createEncoder,
StoreCodec
} from "@waku/core";
import { ConnectionManager, createDecoder, createEncoder } from "@waku/core";
import type {
CreateDecoderParams,
CreateEncoderParams,
@ -103,21 +98,11 @@ export class WakuNode implements IWaku {
this.health = new HealthIndicator({ libp2p });
if (protocolsEnabled.store) {
if (options.store?.peer) {
this.connectionManager
.rawDialPeerWithProtocols(options.store.peer, [StoreCodec])
.catch((e) => {
log.error("Failed to dial store peer", e);
});
}
this.store = new Store({
libp2p,
connectionManager: this.connectionManager,
peerManager: this.peerManager,
options: {
peer: options.store?.peer
}
options: options?.store
});
}

View File

@ -17,21 +17,21 @@ describe("Dials", function () {
let dialPeerStub: SinonStub;
let getConnectionsStub: SinonStub;
let getTagNamesForPeerStub: SinonStub;
let isPeerTopicConfigured: SinonStub;
let isPeerOnSameShard: SinonStub;
let waku: LightNode;
beforeEachCustom(this, async () => {
waku = await createLightNode();
isPeerTopicConfigured = sinon.stub(
isPeerOnSameShard = sinon.stub(
waku.connectionManager as any,
"isPeerTopicConfigured"
"isPeerOnSameShard"
);
isPeerTopicConfigured.resolves(true);
isPeerOnSameShard.resolves(true);
});
afterEachCustom(this, async () => {
await tearDownNodes([], waku);
isPeerTopicConfigured.restore();
isPeerOnSameShard.restore();
sinon.restore();
});

View File

@ -20,7 +20,7 @@ describe("multiaddr: dialing", function () {
let waku: IWaku;
let nwaku: ServiceNode;
let dialPeerSpy: SinonSpy;
let isPeerTopicConfigured: SinonStub;
let isPeerOnSameShard: SinonStub;
afterEachCustom(this, async () => {
await tearDownNodes(nwaku, waku);
@ -63,11 +63,11 @@ describe("multiaddr: dialing", function () {
peerId = await nwaku.getPeerId();
multiaddr = await nwaku.getMultiaddrWithId();
isPeerTopicConfigured = Sinon.stub(
isPeerOnSameShard = Sinon.stub(
waku.connectionManager as any,
"isPeerTopicConfigured"
"isPeerOnSameShard"
);
isPeerTopicConfigured.resolves(true);
isPeerOnSameShard.resolves(true);
dialPeerSpy = Sinon.spy(waku.connectionManager as any, "dialPeer");
});

View File

@ -304,13 +304,10 @@ describe("Waku Store, general", function () {
for await (const msg of query) {
if (msg) {
messages.push(msg as DecodedMessage);
console.log(bytesToUtf8(msg.payload!));
}
}
}
console.log(messages.length);
// Messages are ordered from oldest to latest within a page (1 page query)
expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText);
expect(bytesToUtf8(messages[1].payload!)).to.eq(symText);

View File

@ -105,15 +105,25 @@ describe("Waku Store, custom pubsub topic", function () {
it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () {
this.timeout(10000);
await tearDownNodes([nwaku], []);
// make sure each nwaku node operates on dedicated shard only
nwaku = new ServiceNode(makeLogFileName(this) + "1");
await nwaku.start({
store: true,
clusterId: TestShardInfo.clusterId,
shard: [TestShardInfo.shards[0]],
relay: true
});
// Set up and start a new nwaku node with Default Pubsubtopic
nwaku2 = new ServiceNode(makeLogFileName(this) + "2");
await nwaku2.start({
store: true,
clusterId: TestShardInfo.clusterId,
shard: TestShardInfo.shards,
shard: [TestShardInfo.shards[1]],
relay: true
});
await nwaku2.ensureSubscriptions([TestDecoder2.pubsubTopic]);
const totalMsgs = 10;
await sendMessages(
@ -129,6 +139,7 @@ describe("Waku Store, custom pubsub topic", function () {
TestDecoder2.pubsubTopic
);
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.dial(await nwaku2.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Store]);
@ -366,6 +377,17 @@ describe("Waku Store (named sharding), custom pubsub topic", function () {
it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () {
this.timeout(10000);
await tearDownNodes([nwaku], []);
// make sure each nwaku node operates on dedicated shard only
nwaku = new ServiceNode(makeLogFileName(this) + "1");
await nwaku.start({
store: true,
clusterId: TestShardInfo.clusterId,
shard: [TestShardInfo.shards[0]],
relay: true
});
// Set up and start a new nwaku node with Default Pubsubtopic
nwaku2 = new ServiceNode(makeLogFileName(this) + "2");
await nwaku2.start({
@ -390,6 +412,7 @@ describe("Waku Store (named sharding), custom pubsub topic", function () {
TestDecoder2.pubsubTopic
);
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.dial(await nwaku2.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Store]);

View File

@ -1,7 +1,7 @@
{
"compilerOptions": {
"incremental": true,
"target": "ES2023",
"target": "ES2022",
"moduleResolution": "Bundler",
"module": "esnext",
"declaration": true,
@ -38,7 +38,7 @@
// "experimentalDecorators": true /* Enables experimental support for ES7 decorators. */,
// "emitDecoratorMetadata": true /* Enables experimental support for emitting type metadata for decorators. */,
"lib": ["es2023", "dom"],
"lib": ["es2022", "dom"],
"types": ["node", "mocha"],
"typeRoots": ["node_modules/@types"]
},