mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-02 13:53:12 +00:00
feat: make peer manager aware of codec and shard, fix retry manager and shut down subscriptions (#2425)
* implement new peer manager, use in lightPush, improve retry manager and fix retry bug * fix unsubscribe issue * remove not needed usage of pubsub, use peer manager in store sdk * chore: remove deprecated filter implementation * update tests * update next filter for new peer manager * skip IReceiver test, remove unused utility * remove comment * fix typo * remove old connection based peer manager * update types, export, and edge case for light push * add retry manager tests * add new peer manager tests * refactor tests * use peer manager events in filter and check for pubsub topic as well * update test names * address comments * unskip Filter e2e test * address more comments, remove duplication * skip CI test * update after merge * move to peer:idenfity and peer:disconnect events, improve mapping in filter subscriptions * update tests * add logs and change peer manager time lock to 10s
This commit is contained in:
parent
981248eedd
commit
058f2ff620
@ -22,7 +22,7 @@ import {
|
||||
FilterSubscribeRpc
|
||||
} from "./filter_rpc.js";
|
||||
|
||||
const log = new Logger("filter:v2");
|
||||
const log = new Logger("filter-core");
|
||||
|
||||
export const FilterCodecs = {
|
||||
SUBSCRIBE: "/vac/waku/filter-subscribe/2.0.0-beta1",
|
||||
@ -104,6 +104,10 @@ export class FilterCore {
|
||||
lp.decode,
|
||||
async (source) => await all(source)
|
||||
);
|
||||
|
||||
if (!res?.length) {
|
||||
throw Error("Received no response from subscription request.");
|
||||
}
|
||||
} catch (error) {
|
||||
log.error("Failed to send subscribe request", error);
|
||||
return {
|
||||
|
||||
@ -3,7 +3,6 @@ import {
|
||||
IDecodedMessage,
|
||||
IDecoder,
|
||||
Libp2p,
|
||||
PubsubTopic,
|
||||
QueryRequestParams
|
||||
} from "@waku/interfaces";
|
||||
import { Logger } from "@waku/utils";
|
||||
@ -31,10 +30,7 @@ export class StoreCore {
|
||||
|
||||
public readonly multicodec = StoreCodec;
|
||||
|
||||
public constructor(
|
||||
public readonly pubsubTopics: PubsubTopic[],
|
||||
libp2p: Libp2p
|
||||
) {
|
||||
public constructor(libp2p: Libp2p) {
|
||||
this.streamManager = new StreamManager(StoreCodec, libp2p.components);
|
||||
}
|
||||
|
||||
|
||||
@ -166,7 +166,11 @@ function mockConnectionManager(): ConnectionManager {
|
||||
|
||||
function mockPeerManager(): PeerManager {
|
||||
return {
|
||||
getPeers: sinon.stub().returns([])
|
||||
getPeers: sinon.stub().returns([]),
|
||||
events: {
|
||||
addEventListener: sinon.stub(),
|
||||
removeEventListener: sinon.stub()
|
||||
}
|
||||
} as unknown as PeerManager;
|
||||
}
|
||||
|
||||
|
||||
@ -4,8 +4,7 @@ import type {
|
||||
FilterProtocolOptions,
|
||||
IDecodedMessage,
|
||||
IDecoder,
|
||||
IFilter,
|
||||
Libp2p
|
||||
IFilter
|
||||
} from "@waku/interfaces";
|
||||
import { WakuMessage } from "@waku/proto";
|
||||
import { Logger } from "@waku/utils";
|
||||
@ -15,12 +14,11 @@ import { PeerManager } from "../peer_manager/index.js";
|
||||
import { Subscription } from "./subscription.js";
|
||||
import { FilterConstructorParams } from "./types.js";
|
||||
|
||||
const log = new Logger("sdk:next-filter");
|
||||
const log = new Logger("sdk:filter");
|
||||
|
||||
type PubsubTopic = string;
|
||||
|
||||
export class Filter implements IFilter {
|
||||
private readonly libp2p: Libp2p;
|
||||
private readonly protocol: FilterCore;
|
||||
private readonly peerManager: PeerManager;
|
||||
private readonly connectionManager: ConnectionManager;
|
||||
@ -36,7 +34,6 @@ export class Filter implements IFilter {
|
||||
...params.options
|
||||
};
|
||||
|
||||
this.libp2p = params.libp2p;
|
||||
this.peerManager = params.peerManager;
|
||||
this.connectionManager = params.connectionManager;
|
||||
|
||||
@ -85,7 +82,6 @@ export class Filter implements IFilter {
|
||||
if (!subscription) {
|
||||
subscription = new Subscription({
|
||||
pubsubTopic: singlePubsubTopic,
|
||||
libp2p: this.libp2p,
|
||||
protocol: this.protocol,
|
||||
config: this.config,
|
||||
peerManager: this.peerManager
|
||||
|
||||
@ -1,10 +1,8 @@
|
||||
import type { PeerId } from "@libp2p/interface";
|
||||
import { FilterCore } from "@waku/core";
|
||||
import type {
|
||||
FilterProtocolOptions,
|
||||
IDecodedMessage,
|
||||
IDecoder,
|
||||
Libp2p
|
||||
IDecoder
|
||||
} from "@waku/interfaces";
|
||||
import { WakuMessage } from "@waku/proto";
|
||||
import { expect } from "chai";
|
||||
@ -18,7 +16,6 @@ const PUBSUB_TOPIC = "/waku/2/rs/1/4";
|
||||
const CONTENT_TOPIC = "/test/1/waku-filter/utf8";
|
||||
|
||||
describe("Filter Subscription", () => {
|
||||
let libp2p: Libp2p;
|
||||
let filterCore: FilterCore;
|
||||
let peerManager: PeerManager;
|
||||
let subscription: Subscription;
|
||||
@ -26,7 +23,6 @@ describe("Filter Subscription", () => {
|
||||
let config: FilterProtocolOptions;
|
||||
|
||||
beforeEach(() => {
|
||||
libp2p = mockLibp2p();
|
||||
filterCore = mockFilterCore();
|
||||
peerManager = mockPeerManager();
|
||||
config = {
|
||||
@ -37,7 +33,6 @@ describe("Filter Subscription", () => {
|
||||
|
||||
subscription = new Subscription({
|
||||
pubsubTopic: PUBSUB_TOPIC,
|
||||
libp2p,
|
||||
protocol: filterCore,
|
||||
config,
|
||||
peerManager
|
||||
@ -193,23 +188,6 @@ describe("Filter Subscription", () => {
|
||||
});
|
||||
});
|
||||
|
||||
function mockLibp2p(): Libp2p {
|
||||
return {
|
||||
addEventListener: sinon.stub(),
|
||||
removeEventListener: sinon.stub(),
|
||||
handle: sinon.stub().resolves(),
|
||||
components: {
|
||||
events: {
|
||||
addEventListener: sinon.stub(),
|
||||
removeEventListener: sinon.stub()
|
||||
},
|
||||
connectionManager: {
|
||||
getConnections: sinon.stub().returns([])
|
||||
}
|
||||
}
|
||||
} as unknown as Libp2p;
|
||||
}
|
||||
|
||||
function mockFilterCore(): FilterCore {
|
||||
return {
|
||||
subscribe: sinon.stub().resolves(true),
|
||||
@ -220,20 +198,19 @@ function mockFilterCore(): FilterCore {
|
||||
|
||||
function mockPeerManager(): PeerManager {
|
||||
return {
|
||||
getPeers: sinon.stub().returns([mockPeerId("peer1"), mockPeerId("peer2")])
|
||||
getPeers: sinon.stub().resolves([]),
|
||||
renewPeer: sinon.stub().resolves(),
|
||||
events: {
|
||||
addEventListener: sinon.stub(),
|
||||
removeEventListener: sinon.stub()
|
||||
}
|
||||
} as unknown as PeerManager;
|
||||
}
|
||||
|
||||
function mockPeerId(id: string): PeerId {
|
||||
return {
|
||||
toString: () => id
|
||||
} as unknown as PeerId;
|
||||
}
|
||||
|
||||
function mockDecoder(): IDecoder<IDecodedMessage> {
|
||||
return {
|
||||
pubsubTopic: PUBSUB_TOPIC,
|
||||
contentTopic: CONTENT_TOPIC,
|
||||
fromProtoObj: sinon.stub().resolves(undefined)
|
||||
fromProtoObj: sinon.stub().resolves({ payload: new Uint8Array() })
|
||||
} as unknown as IDecoder<IDecodedMessage>;
|
||||
}
|
||||
|
||||
@ -10,12 +10,13 @@ import type {
|
||||
IDecodedMessage,
|
||||
IDecoder,
|
||||
IProtoMessage,
|
||||
Libp2p
|
||||
PeerIdStr
|
||||
} from "@waku/interfaces";
|
||||
import { Protocols } from "@waku/interfaces";
|
||||
import { WakuMessage } from "@waku/proto";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import { PeerManager } from "../peer_manager/index.js";
|
||||
import { PeerManager, PeerManagerEventNames } from "../peer_manager/index.js";
|
||||
|
||||
import { SubscriptionEvents, SubscriptionParams } from "./types.js";
|
||||
import { TTLSet } from "./utils.js";
|
||||
@ -31,8 +32,9 @@ type AttemptUnsubscribeParams = {
|
||||
useNewContentTopics: boolean;
|
||||
};
|
||||
|
||||
type Libp2pEventHandler = (e: CustomEvent<PeerId>) => void;
|
||||
|
||||
export class Subscription {
|
||||
private readonly libp2p: Libp2p;
|
||||
private readonly pubsubTopic: string;
|
||||
private readonly protocol: FilterCore;
|
||||
private readonly peerManager: PeerManager;
|
||||
@ -42,8 +44,9 @@ export class Subscription {
|
||||
private isStarted: boolean = false;
|
||||
private inProgress: boolean = false;
|
||||
|
||||
private peers = new Set<PeerId>();
|
||||
private peerFailures = new Map<PeerId, number>();
|
||||
// Map and Set cannot reliably use PeerId type as a key
|
||||
private peers = new Map<PeerIdStr, PeerId>();
|
||||
private peerFailures = new Map<PeerIdStr, number>();
|
||||
|
||||
private readonly receivedMessages = new TTLSet<string>(60_000);
|
||||
|
||||
@ -72,7 +75,6 @@ export class Subscription {
|
||||
this.config = params.config;
|
||||
this.pubsubTopic = params.pubsubTopic;
|
||||
|
||||
this.libp2p = params.libp2p;
|
||||
this.protocol = params.protocol;
|
||||
this.peerManager = params.peerManager;
|
||||
|
||||
@ -311,13 +313,13 @@ export class Subscription {
|
||||
|
||||
if (response.success) {
|
||||
log.info(`Ping successful for peer: ${peer.toString()}`);
|
||||
this.peerFailures.set(peer, 0);
|
||||
this.peerFailures.set(peer.toString(), 0);
|
||||
return;
|
||||
}
|
||||
|
||||
let failures = this.peerFailures.get(peer) || 0;
|
||||
let failures = this.peerFailures.get(peer.toString()) || 0;
|
||||
failures += 1;
|
||||
this.peerFailures.set(peer, failures);
|
||||
this.peerFailures.set(peer.toString(), failures);
|
||||
|
||||
log.warn(
|
||||
`Ping failed for peer: ${peer.toString()}, failures: ${failures}/${this.config.pingsBeforePeerRenewed}`
|
||||
@ -339,8 +341,8 @@ export class Subscription {
|
||||
|
||||
await Promise.all(
|
||||
peersToReplace.map((p) => {
|
||||
this.peers.delete(p as PeerId);
|
||||
this.peerFailures.delete(p as PeerId);
|
||||
this.peers.delete(p?.toString() as PeerIdStr);
|
||||
this.peerFailures.delete(p?.toString() as PeerIdStr);
|
||||
return this.requestUnsubscribe(p as PeerId, this.contentTopics);
|
||||
})
|
||||
);
|
||||
@ -360,13 +362,13 @@ export class Subscription {
|
||||
}
|
||||
|
||||
private setupEventListeners(): void {
|
||||
this.libp2p.addEventListener(
|
||||
"peer:connect",
|
||||
(e) => void this.onPeerConnected(e)
|
||||
this.peerManager.events.addEventListener(
|
||||
PeerManagerEventNames.Connect,
|
||||
this.onPeerConnected as Libp2pEventHandler
|
||||
);
|
||||
this.libp2p.addEventListener(
|
||||
"peer:disconnect",
|
||||
(e) => void this.onPeerDisconnected(e)
|
||||
this.peerManager.events.addEventListener(
|
||||
PeerManagerEventNames.Disconnect,
|
||||
this.onPeerDisconnected as Libp2pEventHandler
|
||||
);
|
||||
}
|
||||
|
||||
@ -395,41 +397,65 @@ export class Subscription {
|
||||
}
|
||||
|
||||
private disposeEventListeners(): void {
|
||||
this.libp2p.removeEventListener("peer:connect", this.onPeerConnected);
|
||||
this.libp2p.removeEventListener("peer:disconnect", this.onPeerDisconnected);
|
||||
this.peerManager.events.removeEventListener(
|
||||
PeerManagerEventNames.Connect,
|
||||
this.onPeerConnected as Libp2pEventHandler
|
||||
);
|
||||
this.peerManager.events.removeEventListener(
|
||||
PeerManagerEventNames.Disconnect,
|
||||
this.onPeerDisconnected as Libp2pEventHandler
|
||||
);
|
||||
}
|
||||
|
||||
private onPeerConnected(event: CustomEvent<PeerId>): void {
|
||||
log.info(`Peer connected: ${event.detail.toString()}`);
|
||||
private async onPeerConnected(event: CustomEvent<PeerId>): Promise<void> {
|
||||
const id = event.detail?.toString();
|
||||
log.info(`Peer connected: ${id}`);
|
||||
|
||||
// skip the peer we already subscribe to
|
||||
if (this.peers.has(event.detail)) {
|
||||
log.info(`Peer ${event.detail.toString()} already subscribed, skipping`);
|
||||
const usablePeer = await this.peerManager.isPeerOnPubsub(
|
||||
event.detail,
|
||||
this.pubsubTopic
|
||||
);
|
||||
|
||||
if (!usablePeer) {
|
||||
log.info(`Peer ${id} doesn't support pubsubTopic:${this.pubsubTopic}`);
|
||||
return;
|
||||
}
|
||||
|
||||
void this.attemptSubscribe({
|
||||
// skip the peer we already subscribe to
|
||||
if (this.peers.has(id)) {
|
||||
log.info(`Peer ${id} already subscribed, skipping`);
|
||||
return;
|
||||
}
|
||||
|
||||
await this.attemptSubscribe({
|
||||
useNewContentTopics: false,
|
||||
useOnlyNewPeers: true
|
||||
});
|
||||
}
|
||||
|
||||
private onPeerDisconnected(event: CustomEvent<PeerId>): void {
|
||||
log.info(`Peer disconnected: ${event.detail.toString()}`);
|
||||
private async onPeerDisconnected(event: CustomEvent<PeerId>): Promise<void> {
|
||||
const id = event.detail?.toString();
|
||||
log.info(`Peer disconnected: ${id}`);
|
||||
|
||||
// ignore as the peer is not the one that is in use
|
||||
if (!this.peers.has(event.detail)) {
|
||||
log.info(
|
||||
`Disconnected peer ${event.detail.toString()} not in use, ignoring`
|
||||
);
|
||||
const usablePeer = await this.peerManager.isPeerOnPubsub(
|
||||
event.detail,
|
||||
this.pubsubTopic
|
||||
);
|
||||
|
||||
if (!usablePeer) {
|
||||
log.info(`Peer ${id} doesn't support pubsubTopic:${this.pubsubTopic}`);
|
||||
return;
|
||||
}
|
||||
|
||||
log.info(
|
||||
`Active peer ${event.detail.toString()} disconnected, removing from peers list`
|
||||
);
|
||||
// ignore as the peer is not the one that is in use
|
||||
if (!this.peers.has(id)) {
|
||||
log.info(`Disconnected peer ${id} not in use, ignoring`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.peers.delete(event.detail);
|
||||
log.info(`Active peer ${id} disconnected, removing from peers list`);
|
||||
|
||||
this.peers.delete(id);
|
||||
void this.attemptSubscribe({
|
||||
useNewContentTopics: false,
|
||||
useOnlyNewPeers: true
|
||||
@ -454,18 +480,24 @@ export class Subscription {
|
||||
return false;
|
||||
}
|
||||
|
||||
const prevPeers = new Set(this.peers);
|
||||
const peersToAdd = this.peerManager.getPeers();
|
||||
const prevPeers = new Set<PeerIdStr>(this.peers.keys());
|
||||
const peersToAdd = await this.peerManager.getPeers({
|
||||
protocol: Protocols.Filter,
|
||||
pubsubTopic: this.pubsubTopic
|
||||
});
|
||||
|
||||
for (const peer of peersToAdd) {
|
||||
if (this.peers.size >= this.config.numPeersToUse) {
|
||||
break;
|
||||
}
|
||||
|
||||
this.peers.add(peer);
|
||||
this.peers.set(peer.toString(), peer);
|
||||
}
|
||||
|
||||
const peersToUse = useOnlyNewPeers
|
||||
? Array.from(this.peers.values()).filter((p) => !prevPeers.has(p))
|
||||
? Array.from(this.peers.values()).filter(
|
||||
(p) => !prevPeers.has(p.toString())
|
||||
)
|
||||
: Array.from(this.peers.values());
|
||||
|
||||
log.info(
|
||||
|
||||
@ -17,7 +17,6 @@ export type SubscriptionEvents = {
|
||||
};
|
||||
|
||||
export type SubscriptionParams = {
|
||||
libp2p: Libp2p;
|
||||
pubsubTopic: string;
|
||||
protocol: FilterCore;
|
||||
config: FilterProtocolOptions;
|
||||
|
||||
@ -10,6 +10,7 @@ import {
|
||||
type Libp2p,
|
||||
LightPushProtocolOptions,
|
||||
ProtocolError,
|
||||
Protocols,
|
||||
SDKProtocolResult
|
||||
} from "@waku/interfaces";
|
||||
import { Logger } from "@waku/utils";
|
||||
@ -95,47 +96,50 @@ export class LightPush implements ILightPush {
|
||||
};
|
||||
}
|
||||
|
||||
const peerIds = this.peerManager
|
||||
.getPeers()
|
||||
.slice(0, this.config.numPeersToUse);
|
||||
const peerIds = await this.peerManager.getPeers({
|
||||
protocol: Protocols.LightPush,
|
||||
pubsubTopic: encoder.pubsubTopic
|
||||
});
|
||||
|
||||
if (peerIds.length === 0) {
|
||||
return {
|
||||
successes: [],
|
||||
failures: [
|
||||
{
|
||||
error: ProtocolError.NO_PEER_AVAILABLE
|
||||
}
|
||||
]
|
||||
};
|
||||
}
|
||||
const coreResults: CoreProtocolResult[] =
|
||||
peerIds?.length > 0
|
||||
? await Promise.all(
|
||||
peerIds.map((peerId) =>
|
||||
this.protocol.send(encoder, message, peerId).catch((_e) => ({
|
||||
success: null,
|
||||
failure: {
|
||||
error: ProtocolError.GENERIC_FAIL
|
||||
}
|
||||
}))
|
||||
)
|
||||
)
|
||||
: [];
|
||||
|
||||
const coreResults: CoreProtocolResult[] = await Promise.all(
|
||||
peerIds.map((peerId) =>
|
||||
this.protocol.send(encoder, message, peerId).catch((_e) => ({
|
||||
success: null,
|
||||
failure: {
|
||||
error: ProtocolError.GENERIC_FAIL
|
||||
}
|
||||
}))
|
||||
)
|
||||
);
|
||||
|
||||
const results: SDKProtocolResult = {
|
||||
successes: coreResults
|
||||
.filter((v) => v.success)
|
||||
.map((v) => v.success) as PeerId[],
|
||||
failures: coreResults
|
||||
.filter((v) => v.failure)
|
||||
.map((v) => v.failure) as Failure[]
|
||||
};
|
||||
const results: SDKProtocolResult = coreResults.length
|
||||
? {
|
||||
successes: coreResults
|
||||
.filter((v) => v.success)
|
||||
.map((v) => v.success) as PeerId[],
|
||||
failures: coreResults
|
||||
.filter((v) => v.failure)
|
||||
.map((v) => v.failure) as Failure[]
|
||||
}
|
||||
: {
|
||||
successes: [],
|
||||
failures: [
|
||||
{
|
||||
error: ProtocolError.NO_PEER_AVAILABLE
|
||||
}
|
||||
]
|
||||
};
|
||||
|
||||
if (options.autoRetry && results.successes.length === 0) {
|
||||
const sendCallback = (peerId: PeerId): Promise<CoreProtocolResult> =>
|
||||
this.protocol.send(encoder, message, peerId);
|
||||
this.retryManager.push(
|
||||
sendCallback.bind(this),
|
||||
options.maxAttempts || DEFAULT_MAX_ATTEMPTS
|
||||
options.maxAttempts || DEFAULT_MAX_ATTEMPTS,
|
||||
encoder.pubsubTopic
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@ -1,5 +1,9 @@
|
||||
import type { PeerId } from "@libp2p/interface";
|
||||
import { type CoreProtocolResult, ProtocolError } from "@waku/interfaces";
|
||||
import {
|
||||
type CoreProtocolResult,
|
||||
ProtocolError,
|
||||
Protocols
|
||||
} from "@waku/interfaces";
|
||||
import { expect } from "chai";
|
||||
import sinon from "sinon";
|
||||
|
||||
@ -19,7 +23,7 @@ describe("RetryManager", () => {
|
||||
mockPeerId = { toString: () => "test-peer-id" } as PeerId;
|
||||
peerManager = {
|
||||
getPeers: () => [mockPeerId],
|
||||
requestRenew: sinon.spy(),
|
||||
renewPeer: sinon.spy(),
|
||||
start: sinon.spy(),
|
||||
stop: sinon.spy()
|
||||
} as unknown as PeerManager;
|
||||
@ -55,16 +59,51 @@ describe("RetryManager", () => {
|
||||
})
|
||||
);
|
||||
|
||||
retryManager.push(successCallback, 3);
|
||||
retryManager.push(successCallback, 3, "test-topic");
|
||||
retryManager.start();
|
||||
|
||||
clock.tick(1000);
|
||||
await clock.tickAsync(200);
|
||||
retryManager.stop();
|
||||
|
||||
expect(successCallback.calledOnce, "called").to.be.true;
|
||||
expect(successCallback.calledWith(mockPeerId), "called with peer").to.be
|
||||
.true;
|
||||
});
|
||||
|
||||
it("should requeue task if no peer is available", async () => {
|
||||
(peerManager as any).getPeers = () => [];
|
||||
const callback = sinon.spy();
|
||||
|
||||
retryManager.push(callback, 2, "test-topic");
|
||||
retryManager.start();
|
||||
|
||||
const queue = (retryManager as any)["queue"] as ScheduledTask[];
|
||||
expect(queue.length).to.equal(1);
|
||||
|
||||
await clock.tickAsync(200);
|
||||
retryManager.stop();
|
||||
|
||||
expect(callback.called).to.be.false;
|
||||
expect(queue.length).to.equal(1);
|
||||
expect(queue[0].maxAttempts).to.equal(1);
|
||||
});
|
||||
|
||||
it("should not requeue if maxAttempts is exhausted and no peer is available", async () => {
|
||||
(peerManager as any).getPeers = () => [];
|
||||
const callback = sinon.spy();
|
||||
|
||||
retryManager.push(callback, 1, "test-topic");
|
||||
retryManager.start();
|
||||
const queue = (retryManager as any)["queue"] as ScheduledTask[];
|
||||
expect(queue.length).to.equal(1);
|
||||
|
||||
await clock.tickAsync(500);
|
||||
retryManager.stop();
|
||||
|
||||
expect(callback.called).to.be.false;
|
||||
expect(queue.length).to.equal(0);
|
||||
});
|
||||
|
||||
it("should retry failed tasks", async () => {
|
||||
const failingCallback = sinon.spy(
|
||||
async (): Promise<CoreProtocolResult> => ({
|
||||
@ -75,7 +114,11 @@ describe("RetryManager", () => {
|
||||
|
||||
const queue = (retryManager as any)["queue"] as ScheduledTask[];
|
||||
|
||||
const task = { callback: failingCallback, maxAttempts: 2 };
|
||||
const task = {
|
||||
callback: failingCallback,
|
||||
maxAttempts: 2,
|
||||
pubsubTopic: "test-topic"
|
||||
};
|
||||
await (retryManager as any)["taskExecutor"](task);
|
||||
|
||||
expect(failingCallback.calledOnce, "executed callback").to.be.true;
|
||||
@ -92,12 +135,17 @@ describe("RetryManager", () => {
|
||||
|
||||
await (retryManager as any)["taskExecutor"]({
|
||||
callback: errorCallback,
|
||||
maxAttempts: 1
|
||||
maxAttempts: 1,
|
||||
pubsubTopic: "test-topic"
|
||||
});
|
||||
|
||||
expect((peerManager.requestRenew as sinon.SinonSpy).calledOnce).to.be.true;
|
||||
expect((peerManager.requestRenew as sinon.SinonSpy).calledWith(mockPeerId))
|
||||
.to.be.true;
|
||||
expect((peerManager.renewPeer as sinon.SinonSpy).calledOnce).to.be.true;
|
||||
expect(
|
||||
(peerManager.renewPeer as sinon.SinonSpy).calledWith(mockPeerId, {
|
||||
protocol: Protocols.LightPush,
|
||||
pubsubTopic: "test-topic"
|
||||
})
|
||||
).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle task timeouts", async () => {
|
||||
@ -106,24 +154,64 @@ describe("RetryManager", () => {
|
||||
return { success: mockPeerId, failure: null };
|
||||
});
|
||||
|
||||
const task = { callback: slowCallback, maxAttempts: 1 };
|
||||
const task = {
|
||||
callback: slowCallback,
|
||||
maxAttempts: 1,
|
||||
pubsubTopic: "test-topic"
|
||||
};
|
||||
const executionPromise = (retryManager as any)["taskExecutor"](task);
|
||||
|
||||
clock.tick(11000);
|
||||
await clock.tickAsync(11000);
|
||||
await executionPromise;
|
||||
|
||||
expect(slowCallback.calledOnce).to.be.true;
|
||||
});
|
||||
|
||||
it("should respect max attempts limit", async () => {
|
||||
it("should not execute task if max attempts is 0", async () => {
|
||||
const failingCallback = sinon.spy(async (): Promise<CoreProtocolResult> => {
|
||||
throw new Error("test error" as any);
|
||||
});
|
||||
|
||||
const task = { callback: failingCallback, maxAttempts: 0 };
|
||||
const task = {
|
||||
callback: failingCallback,
|
||||
maxAttempts: 0,
|
||||
pubsubTopic: "test-topic"
|
||||
};
|
||||
await (retryManager as any)["taskExecutor"](task);
|
||||
|
||||
expect(failingCallback.calledOnce).to.be.true;
|
||||
expect(task.maxAttempts).to.equal(0);
|
||||
expect(failingCallback.called).to.be.false;
|
||||
});
|
||||
|
||||
it("should not retry if at least one success", async () => {
|
||||
let called = 0;
|
||||
(peerManager as any).getPeers = () => [mockPeerId];
|
||||
const successCallback = sinon.stub().callsFake(() => {
|
||||
called++;
|
||||
if (called === 1) retryManager.stop();
|
||||
return Promise.resolve({ success: mockPeerId, failure: null });
|
||||
});
|
||||
retryManager.push(successCallback, 2, "test-topic");
|
||||
retryManager.start();
|
||||
await clock.tickAsync(500);
|
||||
expect(called).to.equal(1);
|
||||
});
|
||||
|
||||
it("should retry if all attempts fail", async () => {
|
||||
let called = 0;
|
||||
(peerManager as any).getPeers = () => [mockPeerId];
|
||||
const failCallback = sinon.stub().callsFake(() => {
|
||||
called++;
|
||||
return Promise.resolve({
|
||||
success: null,
|
||||
failure: { error: ProtocolError.GENERIC_FAIL }
|
||||
});
|
||||
});
|
||||
retryManager.push(failCallback, 2, "test-topic");
|
||||
retryManager.start();
|
||||
await clock.tickAsync(1000);
|
||||
retryManager.stop();
|
||||
expect(called).to.be.greaterThan(1);
|
||||
const queue = (retryManager as any)["queue"] as ScheduledTask[];
|
||||
expect(queue.length).to.equal(0);
|
||||
});
|
||||
});
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import type { PeerId } from "@libp2p/interface";
|
||||
import type { CoreProtocolResult } from "@waku/interfaces";
|
||||
import { type CoreProtocolResult, Protocols } from "@waku/interfaces";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import type { PeerManager } from "../peer_manager/index.js";
|
||||
@ -15,6 +15,7 @@ type AttemptCallback = (peerId: PeerId) => Promise<CoreProtocolResult>;
|
||||
|
||||
export type ScheduledTask = {
|
||||
maxAttempts: number;
|
||||
pubsubTopic: string;
|
||||
callback: AttemptCallback;
|
||||
};
|
||||
|
||||
@ -34,7 +35,7 @@ export class RetryManager {
|
||||
|
||||
public constructor(config: RetryManagerConfig) {
|
||||
this.peerManager = config.peerManager;
|
||||
this.retryIntervalMs = config.retryIntervalMs;
|
||||
this.retryIntervalMs = config.retryIntervalMs || 1000;
|
||||
}
|
||||
|
||||
public start(): void {
|
||||
@ -50,16 +51,20 @@ export class RetryManager {
|
||||
}
|
||||
}
|
||||
|
||||
public push(callback: AttemptCallback, maxAttempts: number): void {
|
||||
public push(
|
||||
callback: AttemptCallback,
|
||||
maxAttempts: number,
|
||||
pubsubTopic: string
|
||||
): void {
|
||||
this.queue.push({
|
||||
maxAttempts,
|
||||
callback
|
||||
callback,
|
||||
pubsubTopic
|
||||
});
|
||||
}
|
||||
|
||||
private processQueue(): void {
|
||||
if (this.queue.length === 0) {
|
||||
log.info("processQueue: queue is empty");
|
||||
return;
|
||||
}
|
||||
|
||||
@ -83,10 +88,26 @@ export class RetryManager {
|
||||
}
|
||||
|
||||
private async taskExecutor(task: ScheduledTask): Promise<void> {
|
||||
const peerId = this.peerManager.getPeers()[0];
|
||||
if (task.maxAttempts <= 0) {
|
||||
log.warn("scheduleTask: max attempts has reached, removing from queue");
|
||||
return;
|
||||
}
|
||||
|
||||
const peerId = (
|
||||
await this.peerManager.getPeers({
|
||||
protocol: Protocols.LightPush,
|
||||
pubsubTopic: task.pubsubTopic
|
||||
})
|
||||
)[0];
|
||||
|
||||
if (!peerId) {
|
||||
log.warn("scheduleTask: no peers, skipping");
|
||||
log.warn("scheduleTask: no peers, putting back to queue");
|
||||
|
||||
this.queue.push({
|
||||
...task,
|
||||
maxAttempts: task.maxAttempts - 1
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@ -119,7 +140,10 @@ export class RetryManager {
|
||||
log.error("scheduleTask: task execution failed with error:", error);
|
||||
|
||||
if (shouldPeerBeChanged(error.message)) {
|
||||
this.peerManager.requestRenew(peerId);
|
||||
await this.peerManager.renewPeer(peerId, {
|
||||
protocol: Protocols.LightPush,
|
||||
pubsubTopic: task.pubsubTopic
|
||||
});
|
||||
}
|
||||
|
||||
if (task.maxAttempts === 0) {
|
||||
|
||||
@ -1 +1 @@
|
||||
export { PeerManager } from "./peer_manager.js";
|
||||
export { PeerManager, PeerManagerEventNames } from "./peer_manager.js";
|
||||
|
||||
@ -1,17 +1,74 @@
|
||||
import { Connection, Peer, PeerId } from "@libp2p/interface";
|
||||
import { Libp2p } from "@waku/interfaces";
|
||||
import { PeerId } from "@libp2p/interface";
|
||||
import { IConnectionManager, Libp2p, Protocols } from "@waku/interfaces";
|
||||
import { expect } from "chai";
|
||||
import sinon from "sinon";
|
||||
|
||||
import { PeerManager } from "./peer_manager.js";
|
||||
import { PeerManager, PeerManagerEventNames } from "./peer_manager.js";
|
||||
|
||||
describe("PeerManager", () => {
|
||||
let libp2p: Libp2p;
|
||||
let peerManager: PeerManager;
|
||||
let connectionManager: IConnectionManager;
|
||||
let peers: any[];
|
||||
|
||||
const TEST_PUBSUB_TOPIC = "/test/1/waku-light-push/utf8";
|
||||
const TEST_PROTOCOL = Protocols.LightPush;
|
||||
|
||||
const clearPeerState = (): void => {
|
||||
(peerManager as any).lockedPeers.clear();
|
||||
(peerManager as any).unlockedPeers.clear();
|
||||
};
|
||||
|
||||
const createPeerManagerWithConfig = (numPeersToUse: number): PeerManager => {
|
||||
return new PeerManager({
|
||||
libp2p,
|
||||
connectionManager: connectionManager as any,
|
||||
config: { numPeersToUse }
|
||||
});
|
||||
};
|
||||
|
||||
const getPeersForTest = async (): Promise<PeerId[]> => {
|
||||
return await peerManager.getPeers({
|
||||
protocol: TEST_PROTOCOL,
|
||||
pubsubTopic: TEST_PUBSUB_TOPIC
|
||||
});
|
||||
};
|
||||
|
||||
const skipIfNoPeers = (result: PeerId[] | null): boolean => {
|
||||
if (!result || result.length === 0) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
libp2p = mockLibp2p();
|
||||
peerManager = new PeerManager({ libp2p });
|
||||
peers = [
|
||||
{
|
||||
id: makePeerId("peer-1"),
|
||||
protocols: [Protocols.LightPush, Protocols.Filter, Protocols.Store]
|
||||
},
|
||||
{
|
||||
id: makePeerId("peer-2"),
|
||||
protocols: [Protocols.LightPush, Protocols.Filter, Protocols.Store]
|
||||
},
|
||||
{
|
||||
id: makePeerId("peer-3"),
|
||||
protocols: [Protocols.LightPush, Protocols.Filter, Protocols.Store]
|
||||
}
|
||||
];
|
||||
connectionManager = {
|
||||
pubsubTopics: [TEST_PUBSUB_TOPIC],
|
||||
getConnectedPeers: async () => peers,
|
||||
getPeers: async () => peers,
|
||||
isPeerOnPubsubTopic: async (_id: PeerId, _topic: string) => true
|
||||
} as unknown as IConnectionManager;
|
||||
peerManager = new PeerManager({
|
||||
libp2p,
|
||||
connectionManager: connectionManager as any
|
||||
});
|
||||
clearPeerState();
|
||||
(peerManager as any).isPeerAvailableForUse = () => true;
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
@ -24,95 +81,176 @@ describe("PeerManager", () => {
|
||||
});
|
||||
|
||||
it("should initialize with custom number of peers", () => {
|
||||
peerManager = new PeerManager({ libp2p, config: { numPeersToUse: 3 } });
|
||||
peerManager = createPeerManagerWithConfig(3);
|
||||
expect(peerManager["numPeersToUse"]).to.equal(3);
|
||||
});
|
||||
|
||||
it("should get locked peers", async () => {
|
||||
const connections = [
|
||||
mockConnection("1", true),
|
||||
mockConnection("2", true),
|
||||
mockConnection("3", false)
|
||||
];
|
||||
sinon.stub(libp2p, "getConnections").returns(connections);
|
||||
|
||||
const peers = peerManager.getPeers();
|
||||
expect(peers.length).to.equal(2);
|
||||
it("should return available peers with correct protocol and pubsub topic", async () => {
|
||||
clearPeerState();
|
||||
const result = await getPeersForTest();
|
||||
if (skipIfNoPeers(result)) return;
|
||||
expect(result[0].toString()).to.equal("peer-1");
|
||||
});
|
||||
|
||||
it("should request renew when peer disconnects", async () => {
|
||||
const connections = [
|
||||
mockConnection("1", true),
|
||||
mockConnection("2", false),
|
||||
mockConnection("3", false)
|
||||
];
|
||||
sinon.stub(libp2p, "getConnections").returns(connections);
|
||||
|
||||
const peerId = peerManager.requestRenew("1");
|
||||
expect(peerId).to.not.be.undefined;
|
||||
expect(peerId).to.not.equal("1");
|
||||
it("should lock peers when selected", async () => {
|
||||
clearPeerState();
|
||||
const result = await getPeersForTest();
|
||||
if (skipIfNoPeers(result)) return;
|
||||
expect((peerManager as any).lockedPeers.size).to.be.greaterThan(0);
|
||||
});
|
||||
|
||||
it("should handle connection events", () => {
|
||||
const connectSpy = sinon.spy(peerManager["lockPeerIfNeeded"]);
|
||||
const disconnectSpy = sinon.spy(peerManager["requestRenew"]);
|
||||
peerManager["lockPeerIfNeeded"] = connectSpy;
|
||||
peerManager["requestRenew"] = disconnectSpy;
|
||||
it("should unlock peer and allow reuse after renewPeer", async () => {
|
||||
clearPeerState();
|
||||
const ids = await getPeersForTest();
|
||||
if (skipIfNoPeers(ids)) return;
|
||||
const peerId = ids[0];
|
||||
await peerManager.renewPeer(peerId, {
|
||||
protocol: TEST_PROTOCOL,
|
||||
pubsubTopic: TEST_PUBSUB_TOPIC
|
||||
});
|
||||
expect((peerManager as any).lockedPeers.has(peerId.toString())).to.be.false;
|
||||
expect((peerManager as any).unlockedPeers.has(peerId.toString())).to.be
|
||||
.true;
|
||||
});
|
||||
|
||||
peerManager.start();
|
||||
|
||||
libp2p.dispatchEvent(new CustomEvent("peer:connect", { detail: "1" }));
|
||||
libp2p.dispatchEvent(new CustomEvent("peer:disconnect", { detail: "1" }));
|
||||
it("should not return locked peers if enough unlocked are available", async () => {
|
||||
clearPeerState();
|
||||
const ids = await getPeersForTest();
|
||||
if (skipIfNoPeers(ids)) return;
|
||||
(peerManager as any).lockedPeers.add(ids[0].toString());
|
||||
const result = await getPeersForTest();
|
||||
if (skipIfNoPeers(result)) return;
|
||||
expect(result).to.not.include(ids[0]);
|
||||
});
|
||||
|
||||
it("should dispatch connect and disconnect events", () => {
|
||||
const connectSpy = sinon.spy();
|
||||
const disconnectSpy = sinon.spy();
|
||||
peerManager.events.addEventListener(
|
||||
PeerManagerEventNames.Connect,
|
||||
connectSpy
|
||||
);
|
||||
peerManager.events.addEventListener(
|
||||
PeerManagerEventNames.Disconnect,
|
||||
disconnectSpy
|
||||
);
|
||||
peerManager["dispatchFilterPeerConnect"](peers[0].id);
|
||||
peerManager["dispatchFilterPeerDisconnect"](peers[0].id);
|
||||
expect(connectSpy.calledOnce).to.be.true;
|
||||
expect(disconnectSpy.calledOnce).to.be.true;
|
||||
});
|
||||
|
||||
const removeEventListenerSpy = sinon.spy(libp2p.removeEventListener);
|
||||
libp2p.removeEventListener = removeEventListenerSpy;
|
||||
it("should handle onConnected and onDisconnected", async () => {
|
||||
const peerId = peers[0].id;
|
||||
sinon.stub(peerManager, "isPeerOnPubsub" as any).resolves(true);
|
||||
await (peerManager as any).onConnected({
|
||||
detail: { peerId, protocols: [Protocols.Filter] }
|
||||
});
|
||||
await (peerManager as any).onDisconnected({ detail: peerId });
|
||||
expect(true).to.be.true;
|
||||
});
|
||||
|
||||
it("should register libp2p event listeners when start is called", () => {
|
||||
const addEventListenerSpy = libp2p.addEventListener as sinon.SinonSpy;
|
||||
peerManager.start();
|
||||
expect(addEventListenerSpy.calledWith("peer:identify")).to.be.true;
|
||||
expect(addEventListenerSpy.calledWith("peer:disconnect")).to.be.true;
|
||||
});
|
||||
|
||||
it("should unregister libp2p event listeners when stop is called", () => {
|
||||
const removeEventListenerSpy = libp2p.removeEventListener as sinon.SinonSpy;
|
||||
peerManager.stop();
|
||||
expect(removeEventListenerSpy.calledWith("peer:identify")).to.be.true;
|
||||
expect(removeEventListenerSpy.calledWith("peer:disconnect")).to.be.true;
|
||||
});
|
||||
|
||||
expect(removeEventListenerSpy.callCount).to.eq(2);
|
||||
it("should return only peers supporting the requested protocol and pubsub topic", async () => {
|
||||
peers[0].protocols = [Protocols.LightPush];
|
||||
peers[1].protocols = [Protocols.Filter];
|
||||
peers[2].protocols = [Protocols.Store];
|
||||
(peerManager as any).isPeerAvailableForUse = () => true;
|
||||
const result = await getPeersForTest();
|
||||
if (skipIfNoPeers(result)) return;
|
||||
expect(result.length).to.equal(1);
|
||||
expect(result[0].toString()).to.equal("peer-1");
|
||||
});
|
||||
|
||||
it("should return exactly numPeersToUse peers when enough are available", async () => {
|
||||
peerManager = createPeerManagerWithConfig(2);
|
||||
(peerManager as any).isPeerAvailableForUse = () => true;
|
||||
const result = await getPeersForTest();
|
||||
if (skipIfNoPeers(result)) return;
|
||||
expect(result.length).to.equal(2);
|
||||
});
|
||||
|
||||
it("should respect custom numPeersToUse configuration", async () => {
|
||||
peerManager = createPeerManagerWithConfig(1);
|
||||
(peerManager as any).isPeerAvailableForUse = () => true;
|
||||
const result = await getPeersForTest();
|
||||
if (skipIfNoPeers(result)) return;
|
||||
expect(result.length).to.equal(1);
|
||||
});
|
||||
|
||||
it("should not return the same peer twice in consecutive getPeers calls without renew", async () => {
|
||||
(peerManager as any).isPeerAvailableForUse = () => true;
|
||||
const first = await getPeersForTest();
|
||||
const second = await getPeersForTest();
|
||||
expect(second.some((id: PeerId) => first.includes(id))).to.be.false;
|
||||
});
|
||||
|
||||
it("should allow a peer to be returned again after renewPeer is called", async () => {
|
||||
(peerManager as any).isPeerAvailableForUse = () => true;
|
||||
const first = await getPeersForTest();
|
||||
if (skipIfNoPeers(first)) return;
|
||||
await peerManager.renewPeer(first[0], {
|
||||
protocol: TEST_PROTOCOL,
|
||||
pubsubTopic: TEST_PUBSUB_TOPIC
|
||||
});
|
||||
const second = await getPeersForTest();
|
||||
if (skipIfNoPeers(second)) return;
|
||||
expect(second).to.include(first[0]);
|
||||
});
|
||||
|
||||
it("should handle renewPeer for a non-existent or disconnected peer gracefully", async () => {
|
||||
const fakePeerId = {
|
||||
toString: () => "not-exist",
|
||||
equals: () => false
|
||||
} as any;
|
||||
await peerManager.renewPeer(fakePeerId, {
|
||||
protocol: TEST_PROTOCOL,
|
||||
pubsubTopic: TEST_PUBSUB_TOPIC
|
||||
});
|
||||
expect(true).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
function mockLibp2p(): Libp2p {
|
||||
const peerStore = {
|
||||
get: (id: any) => Promise.resolve(mockPeer(id.toString()))
|
||||
};
|
||||
|
||||
const events = new EventTarget();
|
||||
|
||||
return {
|
||||
peerStore,
|
||||
addEventListener: (event: string, handler: EventListener) =>
|
||||
events.addEventListener(event, handler),
|
||||
removeEventListener: (event: string, handler: EventListener) =>
|
||||
events.removeEventListener(event, handler),
|
||||
dispatchEvent: (event: Event) => events.dispatchEvent(event),
|
||||
getConnections: () => [],
|
||||
components: {
|
||||
events,
|
||||
peerStore
|
||||
}
|
||||
getConnections: sinon.stub(),
|
||||
getPeers: sinon
|
||||
.stub()
|
||||
.returns([
|
||||
{ toString: () => "peer-1" },
|
||||
{ toString: () => "peer-2" },
|
||||
{ toString: () => "peer-3" }
|
||||
]),
|
||||
peerStore: {
|
||||
get: sinon.stub().callsFake((peerId: PeerId) =>
|
||||
Promise.resolve({
|
||||
id: peerId,
|
||||
protocols: [Protocols.LightPush, Protocols.Filter, Protocols.Store]
|
||||
})
|
||||
)
|
||||
},
|
||||
dispatchEvent: sinon.spy(),
|
||||
addEventListener: sinon.spy(),
|
||||
removeEventListener: sinon.spy()
|
||||
} as unknown as Libp2p;
|
||||
}
|
||||
|
||||
function mockPeer(id: string): Peer {
|
||||
function makePeerId(id: string): PeerId {
|
||||
return {
|
||||
id,
|
||||
protocols: []
|
||||
} as unknown as Peer;
|
||||
}
|
||||
|
||||
function mockConnection(id: string, locked: boolean): Connection {
|
||||
return {
|
||||
remotePeer: {
|
||||
toString: () => id,
|
||||
equals: (other: string | PeerId) =>
|
||||
(typeof other === "string" ? other.toString() : other) === id
|
||||
},
|
||||
status: "open",
|
||||
tags: locked ? ["peer-manager-lock"] : []
|
||||
} as unknown as Connection;
|
||||
toString: () => id,
|
||||
equals: (other: any) => other && other.toString && other.toString() === id
|
||||
} as PeerId;
|
||||
}
|
||||
|
||||
@ -1,11 +1,21 @@
|
||||
import { Connection, PeerId } from "@libp2p/interface";
|
||||
import { Libp2p } from "@waku/interfaces";
|
||||
import {
|
||||
IdentifyResult,
|
||||
Peer,
|
||||
PeerId,
|
||||
TypedEventEmitter
|
||||
} from "@libp2p/interface";
|
||||
import {
|
||||
ConnectionManager,
|
||||
FilterCodecs,
|
||||
LightPushCodec,
|
||||
StoreCodec
|
||||
} from "@waku/core";
|
||||
import { Libp2p, Protocols } from "@waku/interfaces";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
const log = new Logger("peer-manager");
|
||||
|
||||
const DEFAULT_NUM_PEERS_TO_USE = 2;
|
||||
const CONNECTION_LOCK_TAG = "peer-manager-lock";
|
||||
|
||||
type PeerManagerConfig = {
|
||||
numPeersToUse?: number;
|
||||
@ -14,12 +24,50 @@ type PeerManagerConfig = {
|
||||
type PeerManagerParams = {
|
||||
libp2p: Libp2p;
|
||||
config?: PeerManagerConfig;
|
||||
connectionManager: ConnectionManager;
|
||||
};
|
||||
|
||||
type GetPeersParams = {
|
||||
protocol: Protocols;
|
||||
pubsubTopic: string;
|
||||
};
|
||||
|
||||
export enum PeerManagerEventNames {
|
||||
Connect = "filter:connect",
|
||||
Disconnect = "filter:disconnect"
|
||||
}
|
||||
|
||||
interface IPeerManagerEvents {
|
||||
/**
|
||||
* Notifies about Filter peer being connected.
|
||||
*/
|
||||
[PeerManagerEventNames.Connect]: CustomEvent<PeerId>;
|
||||
|
||||
/**
|
||||
* Notifies about Filter peer being disconnected.
|
||||
*/
|
||||
[PeerManagerEventNames.Disconnect]: CustomEvent<PeerId>;
|
||||
}
|
||||
|
||||
type Libp2pEventHandler<T> = (e: CustomEvent<T>) => void;
|
||||
|
||||
/**
|
||||
* @description
|
||||
* PeerManager is responsible for:
|
||||
* - finding available peers based on shard / protocols;
|
||||
* - notifying when peers for a specific protocol are connected;
|
||||
* - notifying when peers for a specific protocol are disconnected;
|
||||
*/
|
||||
export class PeerManager {
|
||||
public readonly events = new TypedEventEmitter<IPeerManagerEvents>();
|
||||
|
||||
private readonly numPeersToUse: number;
|
||||
|
||||
private readonly libp2p: Libp2p;
|
||||
private readonly connectionManager: ConnectionManager;
|
||||
|
||||
private readonly lockedPeers = new Set<string>();
|
||||
private readonly unlockedPeers = new Map<string, number>();
|
||||
|
||||
public constructor(params: PeerManagerParams) {
|
||||
this.onConnected = this.onConnected.bind(this);
|
||||
@ -29,104 +77,197 @@ export class PeerManager {
|
||||
params?.config?.numPeersToUse || DEFAULT_NUM_PEERS_TO_USE;
|
||||
|
||||
this.libp2p = params.libp2p;
|
||||
this.connectionManager = params.connectionManager;
|
||||
}
|
||||
|
||||
public start(): void {
|
||||
this.startConnectionListener();
|
||||
this.libp2p.addEventListener(
|
||||
"peer:identify",
|
||||
this.onConnected as Libp2pEventHandler<IdentifyResult>
|
||||
);
|
||||
this.libp2p.addEventListener(
|
||||
"peer:disconnect",
|
||||
this.onDisconnected as Libp2pEventHandler<PeerId>
|
||||
);
|
||||
}
|
||||
|
||||
public stop(): void {
|
||||
this.stopConnectionListener();
|
||||
this.libp2p.removeEventListener(
|
||||
"peer:identify",
|
||||
this.onConnected as Libp2pEventHandler<IdentifyResult>
|
||||
);
|
||||
this.libp2p.removeEventListener(
|
||||
"peer:disconnect",
|
||||
this.onDisconnected as Libp2pEventHandler<PeerId>
|
||||
);
|
||||
}
|
||||
|
||||
public getPeers(): PeerId[] {
|
||||
return this.getLockedConnections().map((c) => c.remotePeer);
|
||||
}
|
||||
public async getPeers(params: GetPeersParams): Promise<PeerId[]> {
|
||||
log.info(
|
||||
`Getting peers for protocol: ${params.protocol}, pubsubTopic: ${params.pubsubTopic}`
|
||||
);
|
||||
|
||||
public requestRenew(peerId: PeerId | string): PeerId | undefined {
|
||||
const lockedConnections = this.getLockedConnections();
|
||||
const neededPeers = this.numPeersToUse - lockedConnections.length;
|
||||
const connectedPeers = await this.connectionManager.getConnectedPeers();
|
||||
log.info(`Found ${connectedPeers.length} connected peers`);
|
||||
|
||||
if (neededPeers === 0) {
|
||||
return;
|
||||
}
|
||||
let results: Peer[] = [];
|
||||
|
||||
const connections = this.getUnlockedConnections()
|
||||
.filter((c) => !c.remotePeer.equals(peerId))
|
||||
.slice(0, neededPeers)
|
||||
.map((c) => this.lockConnection(c))
|
||||
.map((c) => c.remotePeer);
|
||||
|
||||
const newPeerId = connections[0];
|
||||
|
||||
if (!newPeerId) {
|
||||
log.warn(
|
||||
`requestRenew: Couldn't renew peer ${peerId.toString()} - no peers.`
|
||||
for (const peer of connectedPeers) {
|
||||
const hasProtocol = this.hasPeerProtocol(peer, params.protocol);
|
||||
const hasSamePubsub = await this.connectionManager.isPeerOnPubsubTopic(
|
||||
peer.id,
|
||||
params.pubsubTopic
|
||||
);
|
||||
const isPeerAvailableForUse = this.isPeerAvailableForUse(peer.id);
|
||||
|
||||
if (hasProtocol && hasSamePubsub && isPeerAvailableForUse) {
|
||||
results.push(peer);
|
||||
log.info(`Peer ${peer.id} qualifies for protocol ${params.protocol}`);
|
||||
}
|
||||
}
|
||||
|
||||
const lockedPeers = results.filter((p) => this.isPeerLocked(p.id));
|
||||
log.info(
|
||||
`Found ${lockedPeers.length} locked peers out of ${results.length} qualifying peers`
|
||||
);
|
||||
|
||||
if (lockedPeers.length >= this.numPeersToUse) {
|
||||
const selectedPeers = lockedPeers
|
||||
.slice(0, this.numPeersToUse)
|
||||
.map((p) => p.id);
|
||||
|
||||
log.info(
|
||||
`Using ${selectedPeers.length} locked peers: ${selectedPeers.map((p) => p.toString())}`
|
||||
);
|
||||
|
||||
return selectedPeers;
|
||||
}
|
||||
|
||||
const notLockedPeers = results.filter((p) => !this.isPeerLocked(p.id));
|
||||
log.info(
|
||||
`Found ${notLockedPeers.length} unlocked peers, need ${this.numPeersToUse - lockedPeers.length} more`
|
||||
);
|
||||
|
||||
results = [...lockedPeers, ...notLockedPeers]
|
||||
.slice(0, this.numPeersToUse)
|
||||
.map((p) => {
|
||||
this.lockPeer(p.id);
|
||||
return p;
|
||||
});
|
||||
|
||||
const finalPeers = results.map((p) => p.id);
|
||||
|
||||
log.info(
|
||||
`Selected ${finalPeers.length} peers: ${finalPeers.map((p) => p.toString())}`
|
||||
);
|
||||
return finalPeers;
|
||||
}
|
||||
|
||||
public async renewPeer(id: PeerId, params: GetPeersParams): Promise<void> {
|
||||
log.info(
|
||||
`Renewing peer ${id} for protocol: ${params.protocol}, pubsubTopic: ${params.pubsubTopic}`
|
||||
);
|
||||
|
||||
const connectedPeers = await this.connectionManager.getConnectedPeers();
|
||||
const renewedPeer = connectedPeers.find((p) => p.id.equals(id));
|
||||
|
||||
if (!renewedPeer) {
|
||||
log.warn(`Cannot renew peer:${id}, no connection to the peer.`);
|
||||
return;
|
||||
}
|
||||
|
||||
log.info(
|
||||
`requestRenew: Renewed peer ${peerId.toString()} to ${newPeerId.toString()}`
|
||||
`Found peer ${id} in connected peers, unlocking and getting new peers`
|
||||
);
|
||||
this.unlockPeer(renewedPeer.id);
|
||||
await this.getPeers(params);
|
||||
}
|
||||
|
||||
public async isPeerOnPubsub(
|
||||
id: PeerId,
|
||||
pubsubTopic: string
|
||||
): Promise<boolean> {
|
||||
return this.connectionManager.isPeerOnPubsubTopic(id, pubsubTopic);
|
||||
}
|
||||
|
||||
private async onConnected(event: CustomEvent<IdentifyResult>): Promise<void> {
|
||||
const result = event.detail;
|
||||
const isFilterPeer = result.protocols.includes(
|
||||
this.matchProtocolToCodec(Protocols.Filter)
|
||||
);
|
||||
|
||||
return newPeerId;
|
||||
if (isFilterPeer) {
|
||||
this.dispatchFilterPeerConnect(result.peerId);
|
||||
}
|
||||
}
|
||||
|
||||
private startConnectionListener(): void {
|
||||
this.libp2p.addEventListener("peer:connect", this.onConnected);
|
||||
this.libp2p.addEventListener("peer:disconnect", this.onDisconnected);
|
||||
}
|
||||
|
||||
private stopConnectionListener(): void {
|
||||
this.libp2p.removeEventListener("peer:connect", this.onConnected);
|
||||
this.libp2p.removeEventListener("peer:disconnect", this.onDisconnected);
|
||||
}
|
||||
|
||||
private onConnected(event: CustomEvent<PeerId>): void {
|
||||
private async onDisconnected(event: CustomEvent<PeerId>): Promise<void> {
|
||||
const peerId = event.detail;
|
||||
this.lockPeerIfNeeded(peerId);
|
||||
|
||||
try {
|
||||
// we need to read from peerStore as peer is already disconnected
|
||||
const peer = await this.libp2p.peerStore.get(peerId);
|
||||
const isFilterPeer = this.hasPeerProtocol(peer, Protocols.Filter);
|
||||
|
||||
if (isFilterPeer) {
|
||||
this.dispatchFilterPeerDisconnect(peer.id);
|
||||
}
|
||||
} catch (error) {
|
||||
log.error(`Failed to dispatch Filter disconnect event:${error}`);
|
||||
}
|
||||
}
|
||||
|
||||
private onDisconnected(event: CustomEvent<PeerId>): void {
|
||||
const peerId = event.detail;
|
||||
this.requestRenew(peerId);
|
||||
private hasPeerProtocol(peer: Peer, protocol: Protocols): boolean {
|
||||
return peer.protocols.includes(this.matchProtocolToCodec(protocol));
|
||||
}
|
||||
|
||||
private lockPeerIfNeeded(peerId: PeerId): void {
|
||||
const lockedConnections = this.getLockedConnections();
|
||||
const neededPeers = this.numPeersToUse - lockedConnections.length;
|
||||
private lockPeer(id: PeerId): void {
|
||||
log.info(`Locking peer ${id}`);
|
||||
this.lockedPeers.add(id.toString());
|
||||
this.unlockedPeers.delete(id.toString());
|
||||
}
|
||||
|
||||
if (neededPeers === 0) {
|
||||
return;
|
||||
private isPeerLocked(id: PeerId): boolean {
|
||||
return this.lockedPeers.has(id.toString());
|
||||
}
|
||||
|
||||
private unlockPeer(id: PeerId): void {
|
||||
log.info(`Unlocking peer ${id}`);
|
||||
this.lockedPeers.delete(id.toString());
|
||||
this.unlockedPeers.set(id.toString(), Date.now());
|
||||
}
|
||||
|
||||
private isPeerAvailableForUse(id: PeerId): boolean {
|
||||
const value = this.unlockedPeers.get(id.toString());
|
||||
|
||||
if (!value) {
|
||||
return true;
|
||||
}
|
||||
|
||||
this.getUnlockedConnections()
|
||||
.filter((c) => c.remotePeer.equals(peerId))
|
||||
.map((c) => this.lockConnection(c));
|
||||
const wasUnlocked = new Date(value).getTime();
|
||||
return Date.now() - wasUnlocked >= 10_000 ? true : false;
|
||||
}
|
||||
|
||||
private getLockedConnections(): Connection[] {
|
||||
return this.libp2p
|
||||
.getConnections()
|
||||
.filter((c) => c.status === "open" && this.isConnectionLocked(c));
|
||||
}
|
||||
|
||||
private getUnlockedConnections(): Connection[] {
|
||||
return this.libp2p
|
||||
.getConnections()
|
||||
.filter((c) => c.status === "open" && !this.isConnectionLocked(c));
|
||||
}
|
||||
|
||||
private lockConnection(c: Connection): Connection {
|
||||
log.info(
|
||||
`requestRenew: Locking connection for peerId=${c.remotePeer.toString()}`
|
||||
private dispatchFilterPeerConnect(id: PeerId): void {
|
||||
this.events.dispatchEvent(
|
||||
new CustomEvent(PeerManagerEventNames.Connect, { detail: id })
|
||||
);
|
||||
c.tags.push(CONNECTION_LOCK_TAG);
|
||||
return c;
|
||||
}
|
||||
|
||||
private isConnectionLocked(c: Connection): boolean {
|
||||
return c.tags.includes(CONNECTION_LOCK_TAG);
|
||||
private dispatchFilterPeerDisconnect(id: PeerId): void {
|
||||
this.events.dispatchEvent(
|
||||
new CustomEvent(PeerManagerEventNames.Disconnect, { detail: id })
|
||||
);
|
||||
}
|
||||
|
||||
private matchProtocolToCodec(protocol: Protocols): string {
|
||||
const protocolToCodec = {
|
||||
[Protocols.Filter]: FilterCodecs.SUBSCRIBE,
|
||||
[Protocols.LightPush]: LightPushCodec,
|
||||
[Protocols.Store]: StoreCodec,
|
||||
[Protocols.Relay]: ""
|
||||
};
|
||||
|
||||
return protocolToCodec[protocol];
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,31 +1,27 @@
|
||||
import type { Peer, PeerId } from "@libp2p/interface";
|
||||
import type { PeerId } from "@libp2p/interface";
|
||||
import { peerIdFromString } from "@libp2p/peer-id";
|
||||
import { multiaddr } from "@multiformats/multiaddr";
|
||||
import {
|
||||
ConnectionManager,
|
||||
messageHash,
|
||||
StoreCodec,
|
||||
StoreCore
|
||||
} from "@waku/core";
|
||||
import { ConnectionManager, messageHash, StoreCore } from "@waku/core";
|
||||
import {
|
||||
IDecodedMessage,
|
||||
IDecoder,
|
||||
IStore,
|
||||
Libp2p,
|
||||
Protocols,
|
||||
QueryRequestParams,
|
||||
StoreCursor,
|
||||
StoreProtocolOptions
|
||||
} from "@waku/interfaces";
|
||||
import { ensurePubsubTopicIsConfigured, isDefined, Logger } from "@waku/utils";
|
||||
import { isDefined, Logger } from "@waku/utils";
|
||||
|
||||
import { PeerManager } from "../peer_manager/index.js";
|
||||
|
||||
const log = new Logger("waku:store:sdk");
|
||||
|
||||
type StoreConstructorParams = {
|
||||
connectionManager: ConnectionManager;
|
||||
libp2p: Libp2p;
|
||||
peerManager: PeerManager;
|
||||
connectionManager: ConnectionManager;
|
||||
options?: Partial<StoreProtocolOptions>;
|
||||
};
|
||||
|
||||
@ -36,18 +32,17 @@ type StoreConstructorParams = {
|
||||
export class Store implements IStore {
|
||||
private readonly options: Partial<StoreProtocolOptions>;
|
||||
private readonly libp2p: Libp2p;
|
||||
private readonly peerManager: PeerManager;
|
||||
private readonly connectionManager: ConnectionManager;
|
||||
private readonly protocol: StoreCore;
|
||||
|
||||
public constructor(params: StoreConstructorParams) {
|
||||
this.options = params.options || {};
|
||||
this.peerManager = params.peerManager;
|
||||
this.connectionManager = params.connectionManager;
|
||||
this.libp2p = params.libp2p;
|
||||
|
||||
this.protocol = new StoreCore(
|
||||
params.connectionManager.pubsubTopics,
|
||||
params.libp2p
|
||||
);
|
||||
this.protocol = new StoreCore(params.libp2p);
|
||||
}
|
||||
|
||||
public get multicodec(): string {
|
||||
@ -234,11 +229,14 @@ export class Store implements IStore {
|
||||
}
|
||||
|
||||
const pubsubTopicForQuery = uniquePubsubTopicsInQuery[0];
|
||||
const isPubsubSupported =
|
||||
this.connectionManager.pubsubTopics.includes(pubsubTopicForQuery);
|
||||
|
||||
ensurePubsubTopicIsConfigured(
|
||||
pubsubTopicForQuery,
|
||||
this.protocol.pubsubTopics
|
||||
);
|
||||
if (!isPubsubSupported) {
|
||||
throw new Error(
|
||||
`Pubsub topic ${pubsubTopicForQuery} has not been configured on this instance. Configured topics are: ${this.connectionManager.pubsubTopics}`
|
||||
);
|
||||
}
|
||||
|
||||
const decodersAsMap = new Map();
|
||||
decoders.forEach((dec) => {
|
||||
@ -268,29 +266,32 @@ export class Store implements IStore {
|
||||
}
|
||||
|
||||
private async getPeerToUse(pubsubTopic: string): Promise<PeerId | undefined> {
|
||||
const peers = await this.filterConnectedPeers(pubsubTopic);
|
||||
const peers = await this.peerManager.getPeers({
|
||||
protocol: Protocols.Store,
|
||||
pubsubTopic
|
||||
});
|
||||
|
||||
const peer = this.options.peers
|
||||
? await this.getPeerFromConfigurationOrFirst(peers, this.options.peers)
|
||||
: peers[0]?.id;
|
||||
: peers[0];
|
||||
|
||||
return peer;
|
||||
}
|
||||
|
||||
private async getPeerFromConfigurationOrFirst(
|
||||
peers: Peer[],
|
||||
peerIds: PeerId[],
|
||||
configPeers: string[]
|
||||
): Promise<PeerId | undefined> {
|
||||
const storeConfigPeers = configPeers.map(multiaddr);
|
||||
const missing = [];
|
||||
|
||||
for (const peer of storeConfigPeers) {
|
||||
const matchedPeer = peers.find(
|
||||
(p) => p.id.toString() === peer.getPeerId()?.toString()
|
||||
const matchedPeer = peerIds.find(
|
||||
(id) => id.toString() === peer.getPeerId()?.toString()
|
||||
);
|
||||
|
||||
if (matchedPeer) {
|
||||
return matchedPeer.id;
|
||||
return matchedPeer;
|
||||
}
|
||||
|
||||
missing.push(peer);
|
||||
@ -320,28 +321,6 @@ export class Store implements IStore {
|
||||
`Passed node to use for Store not found: ${configPeers.toString()}. Attempting to use first available peers.`
|
||||
);
|
||||
|
||||
return peers[0]?.id;
|
||||
}
|
||||
|
||||
private async filterConnectedPeers(pubsubTopic: string): Promise<Peer[]> {
|
||||
const peers = await this.connectionManager.getConnectedPeers();
|
||||
const result: Peer[] = [];
|
||||
|
||||
for (const peer of peers) {
|
||||
const isStoreCodec = peer.protocols.includes(StoreCodec);
|
||||
const isSameShard = await this.connectionManager.isPeerOnSameShard(
|
||||
peer.id
|
||||
);
|
||||
const isSamePubsub = await this.connectionManager.isPeerOnPubsubTopic(
|
||||
peer.id,
|
||||
pubsubTopic
|
||||
);
|
||||
|
||||
if (isStoreCodec && isSameShard && isSamePubsub) {
|
||||
result.push(peer);
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
return peerIds[0];
|
||||
}
|
||||
}
|
||||
|
||||
@ -89,7 +89,8 @@ export class WakuNode implements IWaku {
|
||||
libp2p,
|
||||
config: {
|
||||
numPeersToUse: options.numPeersToUse
|
||||
}
|
||||
},
|
||||
connectionManager: this.connectionManager
|
||||
});
|
||||
|
||||
this.health = new HealthIndicator({ libp2p });
|
||||
|
||||
@ -23,7 +23,7 @@ import {
|
||||
} from "./utils.js";
|
||||
|
||||
const runTests = (strictCheckNodes: boolean): void => {
|
||||
describe(`Waku Filter Next: FilterPush: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () {
|
||||
describe(`Waku Filter: FilterPush: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () {
|
||||
// Set the timeout for all tests in this suite. Can be overwritten at test level
|
||||
this.timeout(10000);
|
||||
let waku: LightNode;
|
||||
|
||||
@ -39,7 +39,7 @@ import {
|
||||
} from "./utils.js";
|
||||
|
||||
const runTests = (strictCheckNodes: boolean): void => {
|
||||
describe(`Waku Filter Next: Subscribe: Multiple Service Nodes: Strict Check mode: ${strictCheckNodes}`, function () {
|
||||
describe(`Waku Filter: Subscribe: Multiple Service Nodes: Strict Check mode: ${strictCheckNodes}`, function () {
|
||||
this.timeout(100000);
|
||||
let waku: LightNode;
|
||||
let serviceNodes: ServiceNodesFleet;
|
||||
|
||||
@ -23,7 +23,7 @@ import {
|
||||
} from "./utils.js";
|
||||
|
||||
const runTests = (strictCheckNodes: boolean): void => {
|
||||
describe(`Waku Filter Next: Unsubscribe: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () {
|
||||
describe(`Waku Filter: Unsubscribe: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () {
|
||||
// Set the timeout for all tests in this suite. Can be overwritten at test level
|
||||
this.timeout(10000);
|
||||
let waku: LightNode;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user