mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-02 13:53:12 +00:00
feat: move Peer to PeerId (#2246)
* feat: move Peer to PeerId * up tests * update tests
This commit is contained in:
parent
2a7f4b6e86
commit
fc93fae873
@ -1,5 +1,5 @@
|
||||
import type { Libp2p } from "@libp2p/interface";
|
||||
import type { Peer, Stream } from "@libp2p/interface";
|
||||
import type { PeerId, Stream } from "@libp2p/interface";
|
||||
import type {
|
||||
IBaseProtocolCore,
|
||||
Libp2pComponents,
|
||||
@ -38,7 +38,7 @@ export class BaseProtocol implements IBaseProtocolCore {
|
||||
);
|
||||
}
|
||||
|
||||
protected async getStream(peer: Peer): Promise<Stream> {
|
||||
return this.streamManager.getStream(peer);
|
||||
protected async getStream(peerId: PeerId): Promise<Stream> {
|
||||
return this.streamManager.getStream(peerId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import type { Peer, Stream } from "@libp2p/interface";
|
||||
import type { PeerId, Stream } from "@libp2p/interface";
|
||||
import type { IncomingStreamData } from "@libp2p/interface-internal";
|
||||
import {
|
||||
type ContentTopic,
|
||||
@ -53,10 +53,10 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
|
||||
|
||||
public async subscribe(
|
||||
pubsubTopic: PubsubTopic,
|
||||
peer: Peer,
|
||||
peerId: PeerId,
|
||||
contentTopics: ContentTopic[]
|
||||
): Promise<CoreProtocolResult> {
|
||||
const stream = await this.getStream(peer);
|
||||
const stream = await this.getStream(peerId);
|
||||
|
||||
const request = FilterSubscribeRpc.createSubscribeRequest(
|
||||
pubsubTopic,
|
||||
@ -78,7 +78,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
|
||||
success: null,
|
||||
failure: {
|
||||
error: ProtocolError.GENERIC_FAIL,
|
||||
peerId: peer.id
|
||||
peerId: peerId
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -93,7 +93,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
|
||||
return {
|
||||
failure: {
|
||||
error: ProtocolError.REMOTE_PEER_REJECTED,
|
||||
peerId: peer.id
|
||||
peerId: peerId
|
||||
},
|
||||
success: null
|
||||
};
|
||||
@ -101,28 +101,28 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
|
||||
|
||||
return {
|
||||
failure: null,
|
||||
success: peer.id
|
||||
success: peerId
|
||||
};
|
||||
}
|
||||
|
||||
public async unsubscribe(
|
||||
pubsubTopic: PubsubTopic,
|
||||
peer: Peer,
|
||||
peerId: PeerId,
|
||||
contentTopics: ContentTopic[]
|
||||
): Promise<CoreProtocolResult> {
|
||||
let stream: Stream | undefined;
|
||||
try {
|
||||
stream = await this.getStream(peer);
|
||||
stream = await this.getStream(peerId);
|
||||
} catch (error) {
|
||||
log.error(
|
||||
`Failed to get a stream for remote peer${peer.id.toString()}`,
|
||||
`Failed to get a stream for remote peer${peerId.toString()}`,
|
||||
error
|
||||
);
|
||||
return {
|
||||
success: null,
|
||||
failure: {
|
||||
error: ProtocolError.NO_STREAM_AVAILABLE,
|
||||
peerId: peer.id
|
||||
peerId: peerId
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -140,22 +140,22 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
|
||||
success: null,
|
||||
failure: {
|
||||
error: ProtocolError.GENERIC_FAIL,
|
||||
peerId: peer.id
|
||||
peerId: peerId
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
success: peer.id,
|
||||
success: peerId,
|
||||
failure: null
|
||||
};
|
||||
}
|
||||
|
||||
public async unsubscribeAll(
|
||||
pubsubTopic: PubsubTopic,
|
||||
peer: Peer
|
||||
peerId: PeerId
|
||||
): Promise<CoreProtocolResult> {
|
||||
const stream = await this.getStream(peer);
|
||||
const stream = await this.getStream(peerId);
|
||||
|
||||
const request = FilterSubscribeRpc.createUnsubscribeAllRequest(pubsubTopic);
|
||||
|
||||
@ -171,7 +171,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
|
||||
return {
|
||||
failure: {
|
||||
error: ProtocolError.NO_RESPONSE,
|
||||
peerId: peer.id
|
||||
peerId: peerId
|
||||
},
|
||||
success: null
|
||||
};
|
||||
@ -187,7 +187,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
|
||||
return {
|
||||
failure: {
|
||||
error: ProtocolError.REMOTE_PEER_REJECTED,
|
||||
peerId: peer.id
|
||||
peerId: peerId
|
||||
},
|
||||
success: null
|
||||
};
|
||||
@ -195,24 +195,24 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
|
||||
|
||||
return {
|
||||
failure: null,
|
||||
success: peer.id
|
||||
success: peerId
|
||||
};
|
||||
}
|
||||
|
||||
public async ping(peer: Peer): Promise<CoreProtocolResult> {
|
||||
public async ping(peerId: PeerId): Promise<CoreProtocolResult> {
|
||||
let stream: Stream | undefined;
|
||||
try {
|
||||
stream = await this.getStream(peer);
|
||||
stream = await this.getStream(peerId);
|
||||
} catch (error) {
|
||||
log.error(
|
||||
`Failed to get a stream for remote peer${peer.id.toString()}`,
|
||||
`Failed to get a stream for remote peer${peerId.toString()}`,
|
||||
error
|
||||
);
|
||||
return {
|
||||
success: null,
|
||||
failure: {
|
||||
error: ProtocolError.NO_STREAM_AVAILABLE,
|
||||
peerId: peer.id
|
||||
peerId: peerId
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -234,7 +234,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
|
||||
success: null,
|
||||
failure: {
|
||||
error: ProtocolError.GENERIC_FAIL,
|
||||
peerId: peer.id
|
||||
peerId: peerId
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -244,7 +244,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
|
||||
success: null,
|
||||
failure: {
|
||||
error: ProtocolError.NO_RESPONSE,
|
||||
peerId: peer.id
|
||||
peerId: peerId
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -260,12 +260,12 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
|
||||
success: null,
|
||||
failure: {
|
||||
error: ProtocolError.REMOTE_PEER_REJECTED,
|
||||
peerId: peer.id
|
||||
peerId: peerId
|
||||
}
|
||||
};
|
||||
}
|
||||
return {
|
||||
success: peer.id,
|
||||
success: peerId,
|
||||
failure: null
|
||||
};
|
||||
}
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import type { Peer, Stream } from "@libp2p/interface";
|
||||
import type { PeerId, Stream } from "@libp2p/interface";
|
||||
import {
|
||||
type CoreProtocolResult,
|
||||
type IBaseProtocolCore,
|
||||
@ -76,11 +76,10 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(weboko): use peer.id as parameter instead
|
||||
public async send(
|
||||
encoder: IEncoder,
|
||||
message: IMessage,
|
||||
peer: Peer
|
||||
peerId: PeerId
|
||||
): Promise<CoreProtocolResult> {
|
||||
const { query, error: preparationError } = await this.preparePushMessage(
|
||||
encoder,
|
||||
@ -92,21 +91,21 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
|
||||
success: null,
|
||||
failure: {
|
||||
error: preparationError,
|
||||
peerId: peer.id
|
||||
peerId
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
let stream: Stream;
|
||||
try {
|
||||
stream = await this.getStream(peer);
|
||||
stream = await this.getStream(peerId);
|
||||
} catch (error) {
|
||||
log.error("Failed to get stream", error);
|
||||
return {
|
||||
success: null,
|
||||
failure: {
|
||||
error: ProtocolError.NO_STREAM_AVAILABLE,
|
||||
peerId: peer.id
|
||||
peerId: peerId
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -126,7 +125,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
|
||||
success: null,
|
||||
failure: {
|
||||
error: ProtocolError.GENERIC_FAIL,
|
||||
peerId: peer.id
|
||||
peerId: peerId
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -145,7 +144,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
|
||||
success: null,
|
||||
failure: {
|
||||
error: ProtocolError.DECODE_FAILED,
|
||||
peerId: peer.id
|
||||
peerId: peerId
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -156,7 +155,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
|
||||
success: null,
|
||||
failure: {
|
||||
error: ProtocolError.NO_RESPONSE,
|
||||
peerId: peer.id
|
||||
peerId: peerId
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -168,7 +167,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
|
||||
success: null,
|
||||
failure: {
|
||||
error: rlnErrorCase,
|
||||
peerId: peer.id
|
||||
peerId: peerId
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -179,11 +178,11 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
|
||||
success: null,
|
||||
failure: {
|
||||
error: ProtocolError.REMOTE_PEER_REJECTED,
|
||||
peerId: peer.id
|
||||
peerId: peerId
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
return { success: peer.id, failure: null };
|
||||
return { success: peerId, failure: null };
|
||||
}
|
||||
}
|
||||
|
||||
@ -55,7 +55,7 @@ class Metadata extends BaseProtocol implements IMetadata {
|
||||
|
||||
let stream;
|
||||
try {
|
||||
stream = await this.getStream(peer);
|
||||
stream = await this.getStream(peerId);
|
||||
} catch (error) {
|
||||
log.error("Failed to get stream", error);
|
||||
return {
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import type { Peer } from "@libp2p/interface";
|
||||
import type { PeerId } from "@libp2p/interface";
|
||||
import {
|
||||
IDecodedMessage,
|
||||
IDecoder,
|
||||
@ -38,7 +38,7 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
|
||||
public async *queryPerPage<T extends IDecodedMessage>(
|
||||
queryOpts: QueryRequestParams,
|
||||
decoders: Map<string, IDecoder<T>>,
|
||||
peer: Peer
|
||||
peerId: PeerId
|
||||
): AsyncGenerator<Promise<T | undefined>[]> {
|
||||
if (
|
||||
queryOpts.contentTopics.toString() !==
|
||||
@ -58,7 +58,7 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
|
||||
|
||||
let stream;
|
||||
try {
|
||||
stream = await this.getStream(peer);
|
||||
stream = await this.getStream(peerId);
|
||||
} catch (e) {
|
||||
log.error("Failed to get stream", e);
|
||||
break;
|
||||
|
||||
@ -36,7 +36,7 @@ describe("StreamManager", () => {
|
||||
|
||||
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];
|
||||
|
||||
const stream = await streamManager.getStream(mockPeer);
|
||||
const stream = await streamManager.getStream(mockPeer.id);
|
||||
|
||||
expect(stream).not.to.be.undefined;
|
||||
expect(stream?.id).to.be.eq("1");
|
||||
@ -48,7 +48,7 @@ describe("StreamManager", () => {
|
||||
|
||||
let error: Error | undefined;
|
||||
try {
|
||||
await streamManager.getStream(mockPeer);
|
||||
await streamManager.getStream(mockPeer.id);
|
||||
} catch (e) {
|
||||
error = e as Error;
|
||||
}
|
||||
@ -76,7 +76,7 @@ describe("StreamManager", () => {
|
||||
con1.newStream = newStreamSpy;
|
||||
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];
|
||||
|
||||
const stream = await streamManager.getStream(mockPeer);
|
||||
const stream = await streamManager.getStream(mockPeer.id);
|
||||
|
||||
expect(stream).not.to.be.undefined;
|
||||
expect(stream?.id).to.be.eq("2");
|
||||
@ -102,8 +102,8 @@ describe("StreamManager", () => {
|
||||
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];
|
||||
|
||||
const [stream1, stream2] = await Promise.all([
|
||||
streamManager.getStream(mockPeer),
|
||||
streamManager.getStream(mockPeer)
|
||||
streamManager.getStream(mockPeer.id),
|
||||
streamManager.getStream(mockPeer.id)
|
||||
]);
|
||||
|
||||
const expected = ["1", "2"].toString();
|
||||
|
||||
@ -21,39 +21,38 @@ export class StreamManager {
|
||||
this.addEventListener("peer:update", this.handlePeerUpdateStreamPool);
|
||||
}
|
||||
|
||||
public async getStream(peer: Peer): Promise<Stream> {
|
||||
const peerId = peer.id.toString();
|
||||
|
||||
const scheduledStream = this.streamPool.get(peerId);
|
||||
public async getStream(peerId: PeerId): Promise<Stream> {
|
||||
const peerIdStr = peerId.toString();
|
||||
const scheduledStream = this.streamPool.get(peerIdStr);
|
||||
|
||||
if (scheduledStream) {
|
||||
this.streamPool.delete(peerId);
|
||||
this.streamPool.delete(peerIdStr);
|
||||
await scheduledStream;
|
||||
}
|
||||
|
||||
let stream = this.getOpenStreamForCodec(peer.id);
|
||||
let stream = this.getOpenStreamForCodec(peerId);
|
||||
|
||||
if (stream) {
|
||||
this.log.info(
|
||||
`Found existing stream peerId=${peer.id.toString()} multicodec=${this.multicodec}`
|
||||
`Found existing stream peerId=${peerIdStr} multicodec=${this.multicodec}`
|
||||
);
|
||||
this.lockStream(peer.id.toString(), stream);
|
||||
this.lockStream(peerIdStr, stream);
|
||||
return stream;
|
||||
}
|
||||
|
||||
stream = await this.createStream(peer);
|
||||
this.lockStream(peer.id.toString(), stream);
|
||||
stream = await this.createStream(peerId);
|
||||
this.lockStream(peerIdStr, stream);
|
||||
|
||||
return stream;
|
||||
}
|
||||
|
||||
private async createStream(peer: Peer, retries = 0): Promise<Stream> {
|
||||
const connections = this.getConnections(peer.id);
|
||||
private async createStream(peerId: PeerId, retries = 0): Promise<Stream> {
|
||||
const connections = this.getConnections(peerId);
|
||||
const connection = selectOpenConnection(connections);
|
||||
|
||||
if (!connection) {
|
||||
throw new Error(
|
||||
`Failed to get a connection to the peer peerId=${peer.id.toString()} multicodec=${this.multicodec}`
|
||||
`Failed to get a connection to the peer peerId=${peerId.toString()} multicodec=${this.multicodec}`
|
||||
);
|
||||
}
|
||||
|
||||
@ -63,11 +62,11 @@ export class StreamManager {
|
||||
for (let i = 0; i < retries + 1; i++) {
|
||||
try {
|
||||
this.log.info(
|
||||
`Attempting to create a stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}`
|
||||
`Attempting to create a stream for peerId=${peerId.toString()} multicodec=${this.multicodec}`
|
||||
);
|
||||
stream = await connection.newStream(this.multicodec);
|
||||
this.log.info(
|
||||
`Created stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}`
|
||||
`Created stream for peerId=${peerId.toString()} multicodec=${this.multicodec}`
|
||||
);
|
||||
break;
|
||||
} catch (error) {
|
||||
@ -77,8 +76,7 @@ export class StreamManager {
|
||||
|
||||
if (!stream) {
|
||||
throw new Error(
|
||||
`Failed to create a new stream for ${peer.id.toString()} -- ` +
|
||||
lastError
|
||||
`Failed to create a new stream for ${peerId.toString()} -- ` + lastError
|
||||
);
|
||||
}
|
||||
|
||||
@ -97,7 +95,7 @@ export class StreamManager {
|
||||
|
||||
try {
|
||||
this.ongoingCreation.add(peerId);
|
||||
await this.createStream(peer);
|
||||
await this.createStream(peer.id);
|
||||
} catch (error) {
|
||||
this.log.error(`Failed to createStreamWithLock:`, error);
|
||||
} finally {
|
||||
|
||||
@ -57,7 +57,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
|
||||
|
||||
let stream;
|
||||
try {
|
||||
stream = await this.getStream(peer);
|
||||
stream = await this.getStream(peerId);
|
||||
} catch (err) {
|
||||
log.error("Failed to get stream", err);
|
||||
return {
|
||||
|
||||
@ -157,8 +157,8 @@ class Filter implements IFilter {
|
||||
|
||||
ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics);
|
||||
|
||||
const peers = await this.peerManager.getPeers();
|
||||
if (peers.length === 0) {
|
||||
const peerIds = await this.peerManager.getPeers();
|
||||
if (peerIds.length === 0) {
|
||||
return {
|
||||
error: ProtocolError.NO_PEER_AVAILABLE,
|
||||
subscription: null
|
||||
@ -166,8 +166,8 @@ class Filter implements IFilter {
|
||||
}
|
||||
|
||||
log.info(
|
||||
`Creating filter subscription with ${peers.length} peers: `,
|
||||
peers.map((peer) => peer.id.toString())
|
||||
`Creating filter subscription with ${peerIds.length} peers: `,
|
||||
peerIds.map((id) => id.toString())
|
||||
);
|
||||
|
||||
const subscription =
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import type { EventHandler, Peer, PeerId } from "@libp2p/interface";
|
||||
import type { EventHandler, PeerId } from "@libp2p/interface";
|
||||
import { FilterCore } from "@waku/core";
|
||||
import type {
|
||||
FilterProtocolOptions,
|
||||
@ -31,7 +31,7 @@ export class SubscriptionMonitor {
|
||||
* Cached peers that are in use by subscription.
|
||||
* Needed to understand if they disconnect later or not.
|
||||
*/
|
||||
public peers: Peer[] = [];
|
||||
public peerIds: PeerId[] = [];
|
||||
|
||||
private isStarted: boolean = false;
|
||||
|
||||
@ -108,12 +108,12 @@ export class SubscriptionMonitor {
|
||||
* Method to get peers that are used by particular subscription or, if initially called, peers that can be used by subscription.
|
||||
* @returns array of peers
|
||||
*/
|
||||
public async getPeers(): Promise<Peer[]> {
|
||||
public async getPeers(): Promise<PeerId[]> {
|
||||
if (!this.isStarted) {
|
||||
this.peers = await this.peerManager.getPeers();
|
||||
this.peerIds = await this.peerManager.getPeers();
|
||||
}
|
||||
|
||||
return this.peers;
|
||||
return this.peerIds;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -168,7 +168,7 @@ export class SubscriptionMonitor {
|
||||
return;
|
||||
}
|
||||
|
||||
await Promise.all(this.peers.map((peer) => this.ping(peer, true)));
|
||||
await Promise.all(this.peerIds.map((id) => this.ping(id, true)));
|
||||
this.startKeepAlive();
|
||||
}
|
||||
|
||||
@ -178,7 +178,7 @@ export class SubscriptionMonitor {
|
||||
}
|
||||
|
||||
this.keepAliveIntervalId = setInterval(() => {
|
||||
void this.peers.map((peer) => this.ping(peer));
|
||||
void this.peerIds.map((id) => this.ping(id));
|
||||
}, this.config.keepAliveIntervalMs) as unknown as number;
|
||||
}
|
||||
|
||||
@ -216,40 +216,40 @@ export class SubscriptionMonitor {
|
||||
// this method keeps track of new connections and will trigger subscribe request if needed
|
||||
private async onPeerConnected(_event: CustomEvent<PeerId>): Promise<void> {
|
||||
// TODO(weboko): use config.numOfUsedPeers instead of this.peers
|
||||
const hasSomePeers = this.peers.length > 0;
|
||||
const hasSomePeers = this.peerIds.length > 0;
|
||||
if (hasSomePeers) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.peers = await this.peerManager.getPeers();
|
||||
await Promise.all(this.peers.map((peer) => this.subscribe(peer)));
|
||||
this.peerIds = await this.peerManager.getPeers();
|
||||
await Promise.all(this.peerIds.map((id) => this.subscribe(id)));
|
||||
}
|
||||
|
||||
// this method keeps track of disconnects and will trigger subscribe request if needed
|
||||
private async onPeerDisconnected(event: CustomEvent<PeerId>): Promise<void> {
|
||||
const hasNotBeenUsed = !this.peers.find((p) => p.id.equals(event.detail));
|
||||
const hasNotBeenUsed = !this.peerIds.find((id) => id.equals(event.detail));
|
||||
if (hasNotBeenUsed) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.peers = await this.peerManager.getPeers();
|
||||
this.peerIds = await this.peerManager.getPeers();
|
||||
|
||||
// we trigger subscribe for peer that was used before
|
||||
// it will expectedly fail and we will initiate addition of a new peer
|
||||
await Promise.all(this.peers.map((peer) => this.subscribe(peer)));
|
||||
await Promise.all(this.peerIds.map((id) => this.subscribe(id)));
|
||||
}
|
||||
|
||||
private async subscribe(_peer: Peer | undefined): Promise<void> {
|
||||
let peer: Peer | undefined = _peer;
|
||||
private async subscribe(_peerId: PeerId | undefined): Promise<void> {
|
||||
let peerId: PeerId | undefined = _peerId;
|
||||
|
||||
for (let i = 0; i < MAX_SUBSCRIBE_ATTEMPTS; i++) {
|
||||
if (!peer) {
|
||||
if (!peerId) {
|
||||
return;
|
||||
}
|
||||
|
||||
const response = await this.filter.subscribe(
|
||||
this.pubsubTopic,
|
||||
peer,
|
||||
peerId,
|
||||
Array.from(this.activeSubscriptions.keys())
|
||||
);
|
||||
|
||||
@ -257,19 +257,19 @@ export class SubscriptionMonitor {
|
||||
return;
|
||||
}
|
||||
|
||||
peer = await this.peerManager.requestRenew(peer.id);
|
||||
peerId = await this.peerManager.requestRenew(peerId);
|
||||
}
|
||||
}
|
||||
|
||||
private async ping(
|
||||
peer: Peer,
|
||||
peerId: PeerId,
|
||||
renewOnFirstFail: boolean = false
|
||||
): Promise<void> {
|
||||
const peerIdStr = peer.id.toString();
|
||||
const response = await this.filter.ping(peer);
|
||||
const peerIdStr = peerId.toString();
|
||||
const response = await this.filter.ping(peerId);
|
||||
|
||||
if (response.failure && renewOnFirstFail) {
|
||||
const newPeer = await this.peerManager.requestRenew(peer.id);
|
||||
const newPeer = await this.peerManager.requestRenew(peerId);
|
||||
await this.subscribe(newPeer);
|
||||
return;
|
||||
}
|
||||
@ -286,7 +286,7 @@ export class SubscriptionMonitor {
|
||||
const madeAttempts = this.pingFailedAttempts.get(peerIdStr) || 0;
|
||||
|
||||
if (madeAttempts >= this.config.pingsBeforePeerRenewed) {
|
||||
const newPeer = await this.peerManager.requestRenew(peer.id);
|
||||
const newPeer = await this.peerManager.requestRenew(peerId);
|
||||
await this.subscribe(newPeer);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import { Peer } from "@libp2p/interface";
|
||||
import { Peer, PeerId } from "@libp2p/interface";
|
||||
import {
|
||||
ConnectionManager,
|
||||
createEncoder,
|
||||
@ -59,8 +59,8 @@ describe("LightPush SDK", () => {
|
||||
|
||||
lightPush = mockLightPush({ libp2p, numPeersToUse: 2 });
|
||||
let sendSpy = sinon.spy(
|
||||
(_encoder: any, _message: any, peer: Peer) =>
|
||||
({ success: peer.id }) as any
|
||||
(_encoder: any, _message: any, peerId: PeerId) =>
|
||||
({ success: peerId }) as any
|
||||
);
|
||||
lightPush.protocol.send = sendSpy;
|
||||
|
||||
@ -74,8 +74,8 @@ describe("LightPush SDK", () => {
|
||||
// check if setting another value works
|
||||
lightPush = mockLightPush({ libp2p, numPeersToUse: 3 });
|
||||
sendSpy = sinon.spy(
|
||||
(_encoder: any, _message: any, peer: Peer) =>
|
||||
({ success: peer.id }) as any
|
||||
(_encoder: any, _message: any, peerId: PeerId) =>
|
||||
({ success: peerId }) as any
|
||||
);
|
||||
lightPush.protocol.send = sendSpy;
|
||||
|
||||
@ -91,9 +91,9 @@ describe("LightPush SDK", () => {
|
||||
});
|
||||
|
||||
lightPush = mockLightPush({ libp2p });
|
||||
let sendSpy = sinon.spy((_encoder: any, _message: any, peer: Peer) => {
|
||||
if (peer.id.toString() === "1") {
|
||||
return { success: peer.id };
|
||||
let sendSpy = sinon.spy((_encoder: any, _message: any, peerId: PeerId) => {
|
||||
if (peerId.toString() === "1") {
|
||||
return { success: peerId };
|
||||
}
|
||||
|
||||
return { failure: { error: "problem" } };
|
||||
@ -108,7 +108,7 @@ describe("LightPush SDK", () => {
|
||||
{ autoRetry: true }
|
||||
);
|
||||
|
||||
expect(attemptRetriesSpy.calledOnce).to.be.true;
|
||||
expect(attemptRetriesSpy.callCount).to.be.eq(1);
|
||||
expect(result.successes?.length).to.be.eq(1);
|
||||
expect(result.failures?.length).to.be.eq(1);
|
||||
|
||||
@ -162,7 +162,6 @@ function mockLightPush(options: MockLightPushOptions): LightPush {
|
||||
getPeers: () =>
|
||||
options.libp2p
|
||||
.getPeers()
|
||||
.map((id) => mockPeer(id.toString()))
|
||||
.slice(0, options.numPeersToUse || options.libp2p.getPeers().length)
|
||||
} as unknown as PeerManager,
|
||||
options.libp2p
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import type { Peer, PeerId } from "@libp2p/interface";
|
||||
import type { PeerId } from "@libp2p/interface";
|
||||
import { ConnectionManager, getHealthManager, LightPushCore } from "@waku/core";
|
||||
import {
|
||||
type CoreProtocolResult,
|
||||
@ -23,7 +23,7 @@ const DEFAULT_SEND_OPTIONS: ISenderOptions = {
|
||||
maxAttempts: DEFAULT_MAX_ATTEMPTS
|
||||
};
|
||||
|
||||
type RetryCallback = (peer: Peer) => Promise<CoreProtocolResult>;
|
||||
type RetryCallback = (peerId: PeerId) => Promise<CoreProtocolResult>;
|
||||
|
||||
export class LightPush implements ILightPush {
|
||||
public readonly protocol: LightPushCore;
|
||||
@ -59,8 +59,8 @@ export class LightPush implements ILightPush {
|
||||
};
|
||||
}
|
||||
|
||||
const peers = await this.peerManager.getPeers();
|
||||
if (peers.length === 0) {
|
||||
const peerIds = await this.peerManager.getPeers();
|
||||
if (peerIds.length === 0) {
|
||||
return {
|
||||
successes,
|
||||
failures: [
|
||||
@ -72,7 +72,7 @@ export class LightPush implements ILightPush {
|
||||
}
|
||||
|
||||
const results = await Promise.allSettled(
|
||||
peers.map((peer) => this.protocol.send(encoder, message, peer))
|
||||
peerIds.map((id) => this.protocol.send(encoder, message, id))
|
||||
);
|
||||
|
||||
for (const result of results) {
|
||||
@ -94,7 +94,7 @@ export class LightPush implements ILightPush {
|
||||
|
||||
if (options?.autoRetry) {
|
||||
void this.attemptRetries(
|
||||
(peer: Peer) => this.protocol.send(encoder, message, peer),
|
||||
(id: PeerId) => this.protocol.send(encoder, message, id),
|
||||
options.maxAttempts
|
||||
);
|
||||
}
|
||||
@ -117,23 +117,23 @@ export class LightPush implements ILightPush {
|
||||
maxAttempts?: number
|
||||
): Promise<void> {
|
||||
maxAttempts = maxAttempts || DEFAULT_MAX_ATTEMPTS;
|
||||
const connectedPeers = await this.peerManager.getPeers();
|
||||
const peerIds = await this.peerManager.getPeers();
|
||||
|
||||
if (connectedPeers.length === 0) {
|
||||
if (peerIds.length === 0) {
|
||||
log.warn("Cannot retry with no connected peers.");
|
||||
return;
|
||||
}
|
||||
|
||||
for (let i = 0; i < maxAttempts; i++) {
|
||||
const peer = connectedPeers[i % connectedPeers.length]; // always present as we checked for the length already
|
||||
const response = await fn(peer);
|
||||
const id = peerIds[i % peerIds.length]; // always present as we checked for the length already
|
||||
const response = await fn(id);
|
||||
|
||||
if (response.success) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.info(
|
||||
`Attempted retry for peer:${peer.id} failed with:${response?.failure?.error}`
|
||||
`Attempted retry for peer:${id} failed with:${response?.failure?.error}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ -48,9 +48,9 @@ describe("PeerManager", () => {
|
||||
];
|
||||
sinon.stub(libp2p, "getConnections").returns(connections);
|
||||
|
||||
const peer = await peerManager.requestRenew("1");
|
||||
expect(peer).to.not.be.undefined;
|
||||
expect(peer?.id).to.not.equal("1");
|
||||
const peerId = await peerManager.requestRenew("1");
|
||||
expect(peerId).to.not.be.undefined;
|
||||
expect(peerId).to.not.equal("1");
|
||||
});
|
||||
|
||||
it("should handle connection events", () => {
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import { Connection, Peer, PeerId } from "@libp2p/interface";
|
||||
import { Connection, PeerId } from "@libp2p/interface";
|
||||
import { Libp2p } from "@waku/interfaces";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
@ -37,15 +37,13 @@ export class PeerManager {
|
||||
this.stopConnectionListener();
|
||||
}
|
||||
|
||||
public async getPeers(): Promise<Peer[]> {
|
||||
return Promise.all(
|
||||
this.getLockedConnections().map((c) => this.mapConnectionToPeer(c))
|
||||
);
|
||||
public async getPeers(): Promise<PeerId[]> {
|
||||
return Promise.all(this.getLockedConnections().map((c) => c.remotePeer));
|
||||
}
|
||||
|
||||
public async requestRenew(
|
||||
peerId: PeerId | string
|
||||
): Promise<Peer | undefined> {
|
||||
): Promise<PeerId | undefined> {
|
||||
const lockedConnections = this.getLockedConnections();
|
||||
const neededPeers = this.numPeersToUse - lockedConnections.length;
|
||||
|
||||
@ -58,12 +56,12 @@ export class PeerManager {
|
||||
.filter((c) => !c.remotePeer.equals(peerId))
|
||||
.slice(0, neededPeers)
|
||||
.map((c) => this.lockConnection(c))
|
||||
.map((c) => this.mapConnectionToPeer(c))
|
||||
.map((c) => c.remotePeer)
|
||||
);
|
||||
|
||||
const newPeer = result[0];
|
||||
const newPeerId = result[0];
|
||||
|
||||
if (!newPeer) {
|
||||
if (!newPeerId) {
|
||||
log.warn(
|
||||
`requestRenew: Couldn't renew peer ${peerId.toString()} - no peers.`
|
||||
);
|
||||
@ -71,10 +69,10 @@ export class PeerManager {
|
||||
}
|
||||
|
||||
log.info(
|
||||
`requestRenew: Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}`
|
||||
`requestRenew: Renewed peer ${peerId.toString()} to ${newPeerId.toString()}`
|
||||
);
|
||||
|
||||
return newPeer;
|
||||
return newPeerId;
|
||||
}
|
||||
|
||||
private startConnectionListener(): void {
|
||||
@ -133,9 +131,4 @@ export class PeerManager {
|
||||
private isConnectionLocked(c: Connection): boolean {
|
||||
return c.tags.includes(CONNECTION_LOCK_TAG);
|
||||
}
|
||||
|
||||
private async mapConnectionToPeer(c: Connection): Promise<Peer> {
|
||||
const peerId = c.remotePeer;
|
||||
return this.libp2p.peerStore.get(peerId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import type { Peer } from "@libp2p/interface";
|
||||
import type { PeerId } from "@libp2p/interface";
|
||||
import { ConnectionManager, StoreCore } from "@waku/core";
|
||||
import {
|
||||
IDecodedMessage,
|
||||
@ -223,26 +223,29 @@ export class Store implements IStore {
|
||||
};
|
||||
}
|
||||
|
||||
private async getPeerToUse(): Promise<Peer | undefined> {
|
||||
let peer: Peer | undefined;
|
||||
private async getPeerToUse(): Promise<PeerId | undefined> {
|
||||
let peerId: PeerId | undefined;
|
||||
|
||||
if (this.options?.peer) {
|
||||
const connectedPeers = await this.connectionManager.getConnectedPeers();
|
||||
|
||||
peer = connectedPeers.find((p) => p.id.toString() === this.options?.peer);
|
||||
const peer = connectedPeers.find(
|
||||
(p) => p.id.toString() === this.options?.peer
|
||||
);
|
||||
peerId = peer?.id;
|
||||
|
||||
if (!peer) {
|
||||
if (!peerId) {
|
||||
log.warn(
|
||||
`Passed node to use for Store not found: ${this.options.peer}. Attempting to use random peers.`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const peers = await this.peerManager.getPeers();
|
||||
const peerIds = await this.peerManager.getPeers();
|
||||
|
||||
if (peers.length > 0) {
|
||||
if (peerIds.length > 0) {
|
||||
// TODO(weboko): implement smart way of getting a peer https://github.com/waku-org/js-waku/issues/2243
|
||||
return peers[Math.floor(Math.random() * peers.length)];
|
||||
return peerIds[Math.floor(Math.random() * peerIds.length)];
|
||||
}
|
||||
|
||||
log.error("No peers available to use.");
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user