Arseniy Klempner 16253026c6
feat: implement lp-v3 error codes with backwards compatibility (#2501)
* feat: implement LightPush v3 protocol support

Add comprehensive LightPush v3 protocol implementation with:

Core Features:
- LightPush v3 protocol codec and multicodec detection
- Status code-based error handling and validation
- Protocol version inference and compatibility layers
- Enhanced error types with detailed failure information

Protocol Support:
- Automatic v3/v2 protocol negotiation and fallback
- Status code mapping to LightPush error types
- Protocol version tracking in SDK results
- Mixed protocol environment support

Testing Infrastructure:
- Comprehensive v3 error code handling tests
- Mock functions for v3/v2 response scenarios
- Protocol version detection and validation tests
- Backward compatibility verification

Implementation Details:
- Clean separation between v2 and v3 response handling
- Type-safe status code validation with isSuccess helper
- Enhanced failure reporting with protocol version context
- Proper error propagation through SDK layers

This implementation maintains full backward compatibility with v2
while providing enhanced functionality for v3 protocol features.

* feat: handle both light push protocols

* fix: unsubscribe test

* feat: consolidate lpv2/v3 types

* feat(tests): bump nwaku to 0.36.0

* fix: remove extraneous exports

* fix: add delay to tests

* fix: remove protocol result types

* feat: consolidate light push codec branching

* fix: revert nwaku image

* fix: remove multicodec

* fix: remove protocolversion

* feat: simplify v2/v3 branching logic to use two stream managers

* fix: remove unused utils

* fix: remove comments

* fix: revert store test

* fix: cleanup lightpush sdk

* fix: remove unused util

* fix: remove unused exports

* fix: rename file from public to protocol_handler

* fix: use proper type for sdk result

* fix: update return types in filter

* fix: rebase against latest master

* fix: use both lightpush codecs when waiting for peer

* fix: handle both lp codecs

* fix: remove unused code

* feat: use array for multicodec fields

* fix: add timestamp if missing in v3 rpc

* fix: resolve on either lp codec when waiting for peer

* fix: remove unused util

* fix: remove unnecessary abstraction

* feat: accept nwaku docker image as arg, test lp backwards compat

* fix: revert filter error

* feat: add legacy flag to enable lightpushv2 only

* Revert "feat: accept nwaku docker image as arg, test lp backwards compat"

This reverts commit 857e12cbc73305e5c51abd057665bd34708b2737.

* fix: remove unused test

* feat: improve lp3 (#2597)

* improve light push core

* move back to singualar multicodec property, enable array prop only for light push

* implement v2/v3 interop e2e test, re-add useLegacy flag, ensure e2e runs for v2 and v3

* fix v2 v3 condition

* generate message package earlier

* add log, fix condition

---------

Co-authored-by: Sasha <118575614+weboko@users.noreply.github.com>
Co-authored-by: Sasha <oleksandr@status.im>
2025-09-05 00:52:37 +02:00

336 lines
11 KiB
TypeScript

import { PeerId } from "@libp2p/interface";
import {
ClusterId,
CONNECTION_LOCKED_TAG,
IConnectionManager,
Libp2p,
Protocols,
ShardId
} from "@waku/interfaces";
import { expect } from "chai";
import sinon from "sinon";
import { PeerManager, PeerManagerEventNames } from "./peer_manager.js";
describe("PeerManager", () => {
let libp2p: Libp2p;
let peerManager: PeerManager;
let connectionManager: IConnectionManager;
let peers: any[];
let mockConnections: any[];
const TEST_PUBSUB_TOPIC = "/test/1/waku-light-push/utf8";
const TEST_PROTOCOL = Protocols.LightPush;
const clearPeerState = (): void => {
(peerManager as any).lockedPeers.clear();
(peerManager as any).unlockedPeers.clear();
};
const createPeerManagerWithConfig = (numPeersToUse: number): PeerManager => {
return new PeerManager({
libp2p,
connectionManager: connectionManager as any,
config: { numPeersToUse }
});
};
const getPeersForTest = async (): Promise<PeerId[]> => {
return await peerManager.getPeers({
protocol: TEST_PROTOCOL,
pubsubTopic: TEST_PUBSUB_TOPIC
});
};
const skipIfNoPeers = (result: PeerId[] | null): boolean => {
return !result || result.length === 0;
};
beforeEach(() => {
peers = [
{
id: makePeerId("peer-1"),
protocols: [Protocols.LightPush, Protocols.Filter, Protocols.Store]
},
{
id: makePeerId("peer-2"),
protocols: [Protocols.LightPush, Protocols.Filter, Protocols.Store]
},
{
id: makePeerId("peer-3"),
protocols: [Protocols.LightPush, Protocols.Filter, Protocols.Store]
}
];
mockConnections = [
{
remotePeer: makePeerId("peer-1"),
tags: [] as string[]
},
{
remotePeer: makePeerId("peer-2"),
tags: [] as string[]
},
{
remotePeer: makePeerId("peer-3"),
tags: [] as string[]
}
];
libp2p = mockLibp2p(mockConnections);
connectionManager = {
pubsubTopics: [TEST_PUBSUB_TOPIC],
getConnectedPeers: async () => peers,
getPeers: async () => peers,
isPeerOnShard: async (
_id: PeerId,
_clusterId: ClusterId,
_shardId: ShardId
) => true,
isPeerOnTopic: async (_id: PeerId, _topic: string) => true,
hasShardInfo: async (_id: PeerId) => true
} as unknown as IConnectionManager;
peerManager = new PeerManager({
libp2p,
connectionManager: connectionManager as any
});
clearPeerState();
(peerManager as any).isPeerAvailableForUse = () => true;
});
afterEach(() => {
peerManager.stop();
sinon.restore();
});
it("should initialize with default number of peers", () => {
expect(peerManager["numPeersToUse"]).to.equal(2);
});
it("should initialize with custom number of peers", () => {
peerManager = createPeerManagerWithConfig(3);
expect(peerManager["numPeersToUse"]).to.equal(3);
});
it("should return available peers with correct protocol and pubsub topic", async () => {
clearPeerState();
const result = await getPeersForTest();
if (skipIfNoPeers(result)) return;
expect(result[0].toString()).to.equal("peer-1");
});
it("should lock peers when selected", async () => {
clearPeerState();
const result = await getPeersForTest();
if (skipIfNoPeers(result)) return;
expect((peerManager as any).lockedPeers.size).to.be.greaterThan(0);
});
it("should unlock peer and allow reuse after renewPeer", async () => {
clearPeerState();
const ids = await getPeersForTest();
if (skipIfNoPeers(ids)) return;
const peerId = ids[0];
await peerManager.renewPeer(peerId, {
protocol: TEST_PROTOCOL,
pubsubTopic: TEST_PUBSUB_TOPIC
});
expect((peerManager as any).lockedPeers.has(peerId.toString())).to.be.false;
expect((peerManager as any).unlockedPeers.has(peerId.toString())).to.be
.true;
});
it("should not return locked peers if enough unlocked are available", async () => {
clearPeerState();
const ids = await getPeersForTest();
if (skipIfNoPeers(ids)) return;
(peerManager as any).lockedPeers.add(ids[0].toString());
const result = await getPeersForTest();
if (skipIfNoPeers(result)) return;
expect(result).to.not.include(ids[0]);
});
it("should dispatch connect and disconnect events", () => {
const filterConnectSpy = sinon.spy();
const storeConnectSpy = sinon.spy();
const filterDisconnectSpy = sinon.spy();
peerManager.events.addEventListener(
PeerManagerEventNames.FilterConnect,
filterConnectSpy
);
peerManager.events.addEventListener(
PeerManagerEventNames.StoreConnect,
storeConnectSpy
);
peerManager.events.addEventListener(
PeerManagerEventNames.FilterDisconnect,
filterDisconnectSpy
);
peerManager["dispatchFilterPeerConnect"](peers[0].id);
peerManager["dispatchStorePeerConnect"](peers[0].id);
peerManager["dispatchFilterPeerDisconnect"](peers[0].id);
expect(filterConnectSpy.calledOnce).to.be.true;
expect(storeConnectSpy.calledOnce).to.be.true;
expect(filterDisconnectSpy.calledOnce).to.be.true;
});
it("should handle onConnected and onDisconnected", async () => {
const peerId = peers[0].id;
sinon.stub(peerManager, "isPeerOnPubsub" as any).resolves(true);
await (peerManager as any).onConnected({
detail: { peerId, protocols: [Protocols.Filter] }
});
await (peerManager as any).onDisconnected({ detail: peerId });
expect(true).to.be.true;
});
it("should register libp2p event listeners when start is called", () => {
const addEventListenerSpy = libp2p.addEventListener as sinon.SinonSpy;
peerManager.start();
expect(addEventListenerSpy.calledWith("peer:identify")).to.be.true;
expect(addEventListenerSpy.calledWith("peer:disconnect")).to.be.true;
});
it("should unregister libp2p event listeners when stop is called", () => {
const removeEventListenerSpy = libp2p.removeEventListener as sinon.SinonSpy;
peerManager.stop();
expect(removeEventListenerSpy.calledWith("peer:identify")).to.be.true;
expect(removeEventListenerSpy.calledWith("peer:disconnect")).to.be.true;
});
it("should return only peers supporting the requested protocol and pubsub topic", async () => {
peers[0].protocols = [Protocols.LightPush];
peers[1].protocols = [Protocols.Filter];
peers[2].protocols = [Protocols.Store];
(peerManager as any).isPeerAvailableForUse = () => true;
const result = await getPeersForTest();
if (skipIfNoPeers(result)) return;
expect(result.length).to.equal(1);
expect(result[0].toString()).to.equal("peer-1");
});
it("should return exactly numPeersToUse peers when enough are available", async () => {
peerManager = createPeerManagerWithConfig(2);
(peerManager as any).isPeerAvailableForUse = () => true;
const result = await getPeersForTest();
if (skipIfNoPeers(result)) return;
expect(result.length).to.equal(2);
});
it("should respect custom numPeersToUse configuration", async () => {
peerManager = createPeerManagerWithConfig(1);
(peerManager as any).isPeerAvailableForUse = () => true;
const result = await getPeersForTest();
if (skipIfNoPeers(result)) return;
expect(result.length).to.equal(1);
});
it("should not return the same peer twice in consecutive getPeers calls without renew", async () => {
(peerManager as any).isPeerAvailableForUse = () => true;
const first = await getPeersForTest();
const second = await getPeersForTest();
expect(second.some((id: PeerId) => first.includes(id))).to.be.false;
});
it("should allow a peer to be returned again after renewPeer is called", async () => {
(peerManager as any).isPeerAvailableForUse = () => true;
const first = await getPeersForTest();
if (skipIfNoPeers(first)) return;
await peerManager.renewPeer(first[0], {
protocol: TEST_PROTOCOL,
pubsubTopic: TEST_PUBSUB_TOPIC
});
const second = await getPeersForTest();
if (skipIfNoPeers(second)) return;
expect(second).to.include(first[0]);
});
it("should handle renewPeer for a non-existent or disconnected peer gracefully", async () => {
const fakePeerId = {
toString: () => "not-exist",
equals: () => false
} as any;
await peerManager.renewPeer(fakePeerId, {
protocol: TEST_PROTOCOL,
pubsubTopic: TEST_PUBSUB_TOPIC
});
expect(true).to.be.true;
});
it("should add CONNECTION_LOCKED_TAG to peer connections when locking", async () => {
clearPeerState();
const result = await getPeersForTest();
if (skipIfNoPeers(result)) return;
const peerId = result[0];
const connection = mockConnections.find((c) => c.remotePeer.equals(peerId));
expect(connection).to.exist;
expect(connection.tags).to.include(CONNECTION_LOCKED_TAG);
});
it("should remove CONNECTION_LOCKED_TAG from peer connections when unlocking", async () => {
clearPeerState();
const result = await getPeersForTest();
if (skipIfNoPeers(result)) return;
const peerId = result[0];
await peerManager.renewPeer(peerId, {
protocol: TEST_PROTOCOL,
pubsubTopic: TEST_PUBSUB_TOPIC
});
const connection = mockConnections.find((c) => c.remotePeer.equals(peerId));
expect(connection).to.exist;
expect(connection.tags).to.not.include(CONNECTION_LOCKED_TAG);
});
it("should not modify tags of connections for different peers", async () => {
clearPeerState();
const result = await getPeersForTest();
if (skipIfNoPeers(result)) return;
const lockedPeerId = result[0];
const otherPeerId = peers.find((p) => !p.id.equals(lockedPeerId))?.id;
if (!otherPeerId) return;
const otherConnection = mockConnections.find((c) =>
c.remotePeer.equals(otherPeerId)
);
expect(otherConnection).to.exist;
expect(otherConnection.tags).to.not.include(CONNECTION_LOCKED_TAG);
});
});
function mockLibp2p(connections: any[]): Libp2p {
return {
getConnections: sinon.stub().returns(connections),
getPeers: sinon
.stub()
.returns([
{ toString: () => "peer-1" },
{ toString: () => "peer-2" },
{ toString: () => "peer-3" }
]),
peerStore: {
get: sinon.stub().callsFake((peerId: PeerId) =>
Promise.resolve({
id: peerId,
protocols: [Protocols.LightPush, Protocols.Filter, Protocols.Store]
})
)
},
dispatchEvent: sinon.spy(),
addEventListener: sinon.spy(),
removeEventListener: sinon.spy()
} as unknown as Libp2p;
}
function makePeerId(id: string): PeerId {
return {
toString: () => id,
equals: (other: any) => other && other.toString && other.toString() === id
} as PeerId;
}