diff --git a/packages/core/src/lib/light_push/light_push.ts b/packages/core/src/lib/light_push/light_push.ts index eb3b517eeb..6206a9c89f 100644 --- a/packages/core/src/lib/light_push/light_push.ts +++ b/packages/core/src/lib/light_push/light_push.ts @@ -55,11 +55,11 @@ export class LightPushCore { }; } - const { rpc, error: prepError } = await ProtocolHandler.preparePushMessage( - encoder, - message, - protocol - ); + const { + rpc, + error: prepError, + message: protoMessage + } = await ProtocolHandler.preparePushMessage(encoder, message, protocol); if (prepError) { return { @@ -117,7 +117,21 @@ export class LightPushCore { }; } - return ProtocolHandler.handleResponse(bytes, protocol, peerId); + const processedResponse = ProtocolHandler.handleResponse( + bytes, + protocol, + peerId + ); + + if (processedResponse.success) { + return { + success: processedResponse.success, + failure: null, + message: protoMessage + }; + } + + return processedResponse; } private async getProtocol( diff --git a/packages/core/src/lib/light_push/protocol_handler.ts b/packages/core/src/lib/light_push/protocol_handler.ts index 429664f32d..0cf2835a96 100644 --- a/packages/core/src/lib/light_push/protocol_handler.ts +++ b/packages/core/src/lib/light_push/protocol_handler.ts @@ -1,5 +1,10 @@ import type { PeerId } from "@libp2p/interface"; -import type { IEncoder, IMessage, LightPushCoreResult } from "@waku/interfaces"; +import type { + IEncoder, + IMessage, + IProtoMessage, + LightPushCoreResult +} from "@waku/interfaces"; import { LightPushError, LightPushStatusCode } from "@waku/interfaces"; import { PushResponse, WakuMessage } from "@waku/proto"; import { isMessageSizeUnderCap, Logger } from "@waku/utils"; @@ -15,8 +20,8 @@ type VersionedPushRpc = | ({ version: "v3" } & PushRpc); type PreparePushMessageResult = - | { rpc: VersionedPushRpc; error: null } - | { rpc: null; error: LightPushError }; + | { rpc: VersionedPushRpc; error: null; message?: IProtoMessage } + | { rpc: null; error: LightPushError; message?: IProtoMessage }; const log = new Logger("light-push:protocol-handler"); @@ -47,13 +52,15 @@ export class ProtocolHandler { log.info("Creating v3 RPC message"); return { rpc: ProtocolHandler.createV3Rpc(protoMessage, encoder.pubsubTopic), - error: null + error: null, + message: protoMessage }; } log.info("Creating v2 RPC message"); return { rpc: ProtocolHandler.createV2Rpc(protoMessage, encoder.pubsubTopic), + message: protoMessage, error: null }; } catch (err) { diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 0fb60c182f..66e2ca09db 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -5,7 +5,7 @@ import type { DiscoveryOptions, PeerCache } from "./discovery.js"; import type { FilterProtocolOptions } from "./filter.js"; import type { CreateLibp2pOptions } from "./libp2p.js"; import type { LightPushProtocolOptions } from "./light_push.js"; -import type { IDecodedMessage } from "./message.js"; +import type { IDecodedMessage, IProtoMessage } from "./message.js"; import type { ThisAndThat, ThisOrThat } from "./misc.js"; import { NetworkConfig } from "./sharding.js"; import type { StoreProtocolOptions } from "./store.js"; @@ -195,7 +195,13 @@ export type LightPushCoreResult = ThisOrThat< PeerId, "failure", LightPushFailure ->; +> & { + /** + * The proto object of the message. + * Present only if the message was successfully pushed to the network. + */ + message?: IProtoMessage; +}; export type FilterCoreResult = ThisOrThat< "success", @@ -209,7 +215,13 @@ export type LightPushSDKResult = ThisAndThat< PeerId[], "failures", LightPushFailure[] ->; +> & { + /** + * The proto objects of the messages. + * Present only if the messages were successfully pushed to the network. + */ + messages?: IProtoMessage[]; +}; export type FilterSDKResult = ThisAndThat< "successes", diff --git a/packages/interfaces/src/sender.ts b/packages/interfaces/src/sender.ts index da4fc5f003..8cfee2ebfc 100644 --- a/packages/interfaces/src/sender.ts +++ b/packages/interfaces/src/sender.ts @@ -20,6 +20,12 @@ export type ISendOptions = { * @default false */ useLegacy?: boolean; + + /** + * Amount of peers to send message to. + * Overrides `numPeersToUse` in {@link @waku/interfaces!CreateNodeOptions}. + */ + numPeersToUse?: number; }; export interface ISender { diff --git a/packages/sdk/src/light_push/light_push.ts b/packages/sdk/src/light_push/light_push.ts index 669c77e38c..05a94aaf91 100644 --- a/packages/sdk/src/light_push/light_push.ts +++ b/packages/sdk/src/light_push/light_push.ts @@ -4,6 +4,7 @@ import { type IEncoder, ILightPush, type IMessage, + IProtoMessage, type ISendOptions, type Libp2p, LightPushCoreResult, @@ -82,10 +83,11 @@ export class LightPush implements ILightPush { log.info("send: attempting to send a message to pubsubTopic:", pubsubTopic); - const peerIds = await this.peerManager.getPeers({ + let peerIds = await this.peerManager.getPeers({ protocol: options.useLegacy ? "light-push-v2" : Protocols.LightPush, pubsubTopic: encoder.pubsubTopic }); + peerIds = peerIds.slice(0, options.numPeersToUse); const coreResults = peerIds?.length > 0 @@ -93,12 +95,15 @@ export class LightPush implements ILightPush { peerIds.map((peerId) => this.protocol .send(encoder, message, peerId, options.useLegacy) - .catch((_e) => ({ - success: null, - failure: { - error: LightPushError.GENERIC_FAIL - } - })) + .catch( + (_e) => + ({ + success: null, + failure: { + error: LightPushError.GENERIC_FAIL + } + }) as LightPushCoreResult + ) ) ) : []; @@ -110,7 +115,10 @@ export class LightPush implements ILightPush { .map((v) => v.success) as PeerId[], failures: coreResults .filter((v) => v.failure) - .map((v) => v.failure) as LightPushFailure[] + .map((v) => v.failure) as LightPushFailure[], + messages: coreResults + .filter((v) => v.message) + .map((v) => v.message) as IProtoMessage[] } : { successes: [], diff --git a/packages/sdk/src/messaging/ack_manager.ts b/packages/sdk/src/messaging/ack_manager.ts index 21d856073f..8551cbc4b3 100644 --- a/packages/sdk/src/messaging/ack_manager.ts +++ b/packages/sdk/src/messaging/ack_manager.ts @@ -24,6 +24,8 @@ export class AckManager implements IAckManager { private readonly storeAckManager: StoreAckManager; private readonly networkConfig: NetworkConfig; + private readonly subscribedContentTopics: Set = new Set(); + public constructor(params: AckManagerConstructorParams) { this.messageStore = params.messageStore; this.networkConfig = params.networkConfig; @@ -44,9 +46,15 @@ export class AckManager implements IAckManager { public async stop(): Promise { await this.filterAckManager.stop(); this.storeAckManager.stop(); + this.subscribedContentTopics.clear(); } public async subscribe(contentTopic: string): Promise { + if (this.subscribedContentTopics.has(contentTopic)) { + return true; + } + + this.subscribedContentTopics.add(contentTopic); const decoder = createDecoder( contentTopic, createRoutingInfo(this.networkConfig, { @@ -55,9 +63,11 @@ export class AckManager implements IAckManager { ); return ( - (await this.filterAckManager.subscribe(decoder)) || - (await this.storeAckManager.subscribe(decoder)) - ); + await Promise.all([ + this.filterAckManager.subscribe(decoder), + this.storeAckManager.subscribe(decoder) + ]) + ).some((success) => success); } } @@ -118,7 +128,7 @@ class StoreAckManager { this.interval = setInterval(() => { void this.query(); - }, 1000); + }, 5000); } public stop(): void { diff --git a/packages/sdk/src/messaging/message_store.ts b/packages/sdk/src/messaging/message_store.ts index 5e9da4f2b6..7996beff08 100644 --- a/packages/sdk/src/messaging/message_store.ts +++ b/packages/sdk/src/messaging/message_store.ts @@ -32,15 +32,15 @@ export class MessageStore { private readonly resendIntervalMs: number; public constructor(options: MessageStoreOptions = {}) { - this.resendIntervalMs = options.resendIntervalMs ?? 2000; + this.resendIntervalMs = options.resendIntervalMs ?? 5000; } public has(hashStr: string): boolean { - return this.messages.has(hashStr); + return this.messages.has(hashStr) || this.pendingMessages.has(hashStr); } public add(message: IDecodedMessage, options: AddMessageOptions = {}): void { - if (!this.messages.has(message.hashStr)) { + if (!this.has(message.hashStr)) { this.messages.set(message.hashStr, { filterAck: options.filterAck ?? false, storeAck: options.storeAck ?? false, @@ -59,10 +59,7 @@ export class MessageStore { this.replacePendingWithMessage(hashStr); } - public async markSent( - requestId: RequestId, - sentMessage: IDecodedMessage - ): Promise { + public markSent(requestId: RequestId, sentMessage: IDecodedMessage): void { const entry = this.pendingRequests.get(requestId); if (!entry || !entry.messageRequest) { @@ -71,6 +68,8 @@ export class MessageStore { entry.lastSentAt = Number(sentMessage.timestamp); this.pendingMessages.set(sentMessage.hashStr, requestId); + + this.replacePendingWithMessage(sentMessage.hashStr); } public async queue(message: WakuLikeMessage): Promise { @@ -102,13 +101,11 @@ export class MessageStore { continue; } - const notSent = !entry.lastSentAt; - const notAcknowledged = - entry.lastSentAt && - Date.now() - entry.lastSentAt >= this.resendIntervalMs && - !isAcknowledged; + const sentAt = entry.lastSentAt || entry.createdAt; + const notTooRecent = Date.now() - sentAt >= this.resendIntervalMs; + const notAcknowledged = !isAcknowledged; - if (notSent || notAcknowledged) { + if (notTooRecent && notAcknowledged) { res.push({ requestId, message: entry.messageRequest @@ -154,12 +151,22 @@ export class MessageStore { return; } - const entry = this.pendingRequests.get(requestId); + let entry = this.pendingRequests.get(requestId); if (!entry) { return; } + // merge with message entry if possible + // this can happen if message we sent got received before we could add it to the message store + const messageEntry = this.messages.get(hashStr); + entry = { + ...entry, + ...messageEntry, + filterAck: messageEntry?.filterAck ?? entry.filterAck, + storeAck: messageEntry?.storeAck ?? entry.storeAck + }; + this.pendingRequests.delete(requestId); this.pendingMessages.delete(hashStr); diff --git a/packages/sdk/src/messaging/sender.ts b/packages/sdk/src/messaging/sender.ts index ea7d150658..35bfd38e94 100644 --- a/packages/sdk/src/messaging/sender.ts +++ b/packages/sdk/src/messaging/sender.ts @@ -1,10 +1,5 @@ import { createDecoder, createEncoder } from "@waku/core"; -import { - IDecodedMessage, - ILightPush, - IProtoMessage, - NetworkConfig -} from "@waku/interfaces"; +import { ILightPush, NetworkConfig } from "@waku/interfaces"; import { createRoutingInfo } from "@waku/utils"; import { AckManager } from "./ack_manager.js"; @@ -24,6 +19,8 @@ export class Sender { private readonly ackManager: AckManager; private readonly networkConfig: NetworkConfig; + private readonly processingRequests: Set = new Set(); + private sendInterval: ReturnType | null = null; public constructor(params: SenderConstructorParams) { @@ -48,37 +45,7 @@ export class Sender { const requestId = await this.messageStore.queue(wakuLikeMessage); await this.ackManager.subscribe(wakuLikeMessage.contentTopic); - - const encoder = createEncoder({ - contentTopic: wakuLikeMessage.contentTopic, - routingInfo: createRoutingInfo(this.networkConfig, { - contentTopic: wakuLikeMessage.contentTopic - }), - ephemeral: wakuLikeMessage.ephemeral - }); - - const decoder = createDecoder( - wakuLikeMessage.contentTopic, - createRoutingInfo(this.networkConfig, { - contentTopic: wakuLikeMessage.contentTopic - }) - ); - - const response = await this.lightPush.send(encoder, { - payload: wakuLikeMessage.payload - }); // todo: add to light push return of proto message or decoded message - - if (response.successes.length > 0) { - const protoObj = await encoder.toProtoObj({ - payload: wakuLikeMessage.payload - }); - const decodedMessage = await decoder.fromProtoObj( - decoder.pubsubTopic, - protoObj as IProtoMessage - ); - - await this.messageStore.markSent(requestId, decodedMessage!); - } + await this.sendMessage(requestId, wakuLikeMessage); return requestId; } @@ -87,6 +54,21 @@ export class Sender { const pendingRequests = this.messageStore.getMessagesToSend(); for (const { requestId, message } of pendingRequests) { + await this.sendMessage(requestId, message); + } + } + + private async sendMessage( + requestId: RequestId, + message: WakuLikeMessage + ): Promise { + try { + if (this.processingRequests.has(requestId)) { + return; + } + + this.processingRequests.add(requestId); + const encoder = createEncoder({ contentTopic: message.contentTopic, routingInfo: createRoutingInfo(this.networkConfig, { @@ -102,24 +84,31 @@ export class Sender { }) ); - const response = await this.lightPush.send(encoder, { - payload: message.payload - }); - - if (response.successes.length > 0) { - const protoObj = await encoder.toProtoObj({ + const response = await this.lightPush.send( + encoder, + { payload: message.payload - }); + }, + { + // force no retry as we have retry implemented in the sender + autoRetry: false, + // send to only one peer as we will retry on failure and need to ensure only one message is in the network + numPeersToUse: 1 + } + ); + + if (response?.messages && response.messages.length > 0) { const decodedMessage = await decoder.fromProtoObj( decoder.pubsubTopic, - protoObj as IProtoMessage + response.messages[0] ); - await this.messageStore.markSent( - requestId, - decodedMessage as IDecodedMessage - ); + this.messageStore.markSent(requestId, decodedMessage!); + } else { + // do nothing on failure, will retry } + } finally { + this.processingRequests.delete(requestId); } } }