address review

This commit is contained in:
Sasha 2025-10-08 23:40:49 +02:00
parent de70791e41
commit e39c6c94bf
No known key found for this signature in database
5 changed files with 30 additions and 15 deletions

View File

@ -87,7 +87,7 @@ export class LightPush implements ILightPush {
protocol: options.useLegacy ? "light-push-v2" : Protocols.LightPush,
pubsubTopic: encoder.pubsubTopic
});
peerIds = peerIds.slice(0, options.numPeersToUse);
peerIds = peerIds?.slice(0, options.numPeersToUse);
const coreResults =
peerIds?.length > 0

View File

@ -18,6 +18,9 @@ type AckManagerConstructorParams = {
networkConfig: NetworkConfig;
};
const DEFAULT_QUERY_INTERVAL = 5000;
const QUERY_TIME_WINDOW_MS = 60 * 60 * 1000;
export class AckManager implements IAckManager {
private readonly messageStore: MessageStore;
private readonly filterAckManager: FilterAckManager;
@ -25,6 +28,7 @@ export class AckManager implements IAckManager {
private readonly networkConfig: NetworkConfig;
private readonly subscribedContentTopics: Set<string> = new Set();
private readonly subscribingAttempts: Set<string> = new Set();
public constructor(params: AckManagerConstructorParams) {
this.messageStore = params.messageStore;
@ -50,11 +54,15 @@ export class AckManager implements IAckManager {
}
public async subscribe(contentTopic: string): Promise<boolean> {
if (this.subscribedContentTopics.has(contentTopic)) {
if (
this.subscribedContentTopics.has(contentTopic) ||
this.subscribingAttempts.has(contentTopic)
) {
return true;
}
this.subscribedContentTopics.add(contentTopic);
this.subscribingAttempts.add(contentTopic);
const decoder = createDecoder(
contentTopic,
createRoutingInfo(this.networkConfig, {
@ -62,12 +70,14 @@ export class AckManager implements IAckManager {
})
);
return (
await Promise.all([
this.filterAckManager.subscribe(decoder),
this.storeAckManager.subscribe(decoder)
])
).some((success) => success);
const promises = await Promise.all([
this.filterAckManager.subscribe(decoder),
this.storeAckManager.subscribe(decoder)
]);
this.subscribedContentTopics.add(contentTopic);
this.subscribingAttempts.delete(contentTopic);
return promises.some((success) => success);
}
}
@ -128,7 +138,7 @@ class StoreAckManager {
this.interval = setInterval(() => {
void this.query();
}, 5000);
}, DEFAULT_QUERY_INTERVAL);
}
public stop(): void {
@ -157,7 +167,7 @@ class StoreAckManager {
this.messageStore.markStoreAck(message.hashStr);
},
{
timeStart: new Date(Date.now() - 60 * 60 * 1000),
timeStart: new Date(Date.now() - QUERY_TIME_WINDOW_MS),
timeEnd: new Date()
}
);

View File

@ -61,13 +61,13 @@ describe("Sender", () => {
expect(setIntervalSpy.calledWith(sinon.match.func, 1000)).to.be.true;
});
it("should create multiple intervals when called multiple times", () => {
it("should not create multiple intervals when called multiple times", () => {
const setIntervalSpy = sinon.spy(global, "setInterval");
sender.start();
sender.start();
expect(setIntervalSpy.calledTwice).to.be.true;
expect(setIntervalSpy.calledOnce).to.be.true;
});
});

View File

@ -17,6 +17,8 @@ type SenderConstructorParams = {
networkConfig: NetworkConfig;
};
const DEFAULT_SEND_INTERVAL = 1000;
export class Sender {
private readonly messageStore: MessageStore;
private readonly lightPush: ILightPush;
@ -35,7 +37,10 @@ export class Sender {
}
public start(): void {
this.sendInterval = setInterval(() => void this.backgroundSend(), 1000);
this.sendInterval = setInterval(
() => void this.backgroundSend(),
DEFAULT_SEND_INTERVAL
);
}
public stop(): void {

View File

@ -67,7 +67,7 @@ export class WakuNode implements IWaku {
private readonly connectionManager: ConnectionManager;
private readonly peerManager: PeerManager;
private readonly healthIndicator: HealthIndicator;
private readonly messaging: Messaging | null = null;
private messaging: Messaging | null = null;
public constructor(
options: CreateNodeOptions,