implement working version of sendind

This commit is contained in:
Sasha 2025-10-07 00:00:27 +02:00
parent 7f98bb183d
commit da11478193
No known key found for this signature in database
8 changed files with 141 additions and 88 deletions

View File

@ -55,11 +55,11 @@ export class LightPushCore {
};
}
const { rpc, error: prepError } = await ProtocolHandler.preparePushMessage(
encoder,
message,
protocol
);
const {
rpc,
error: prepError,
message: protoMessage
} = await ProtocolHandler.preparePushMessage(encoder, message, protocol);
if (prepError) {
return {
@ -117,7 +117,21 @@ export class LightPushCore {
};
}
return ProtocolHandler.handleResponse(bytes, protocol, peerId);
const processedResponse = ProtocolHandler.handleResponse(
bytes,
protocol,
peerId
);
if (processedResponse.success) {
return {
success: processedResponse.success,
failure: null,
message: protoMessage
};
}
return processedResponse;
}
private async getProtocol(

View File

@ -1,5 +1,10 @@
import type { PeerId } from "@libp2p/interface";
import type { IEncoder, IMessage, LightPushCoreResult } from "@waku/interfaces";
import type {
IEncoder,
IMessage,
IProtoMessage,
LightPushCoreResult
} from "@waku/interfaces";
import { LightPushError, LightPushStatusCode } from "@waku/interfaces";
import { PushResponse, WakuMessage } from "@waku/proto";
import { isMessageSizeUnderCap, Logger } from "@waku/utils";
@ -15,8 +20,8 @@ type VersionedPushRpc =
| ({ version: "v3" } & PushRpc);
type PreparePushMessageResult =
| { rpc: VersionedPushRpc; error: null }
| { rpc: null; error: LightPushError };
| { rpc: VersionedPushRpc; error: null; message?: IProtoMessage }
| { rpc: null; error: LightPushError; message?: IProtoMessage };
const log = new Logger("light-push:protocol-handler");
@ -47,13 +52,15 @@ export class ProtocolHandler {
log.info("Creating v3 RPC message");
return {
rpc: ProtocolHandler.createV3Rpc(protoMessage, encoder.pubsubTopic),
error: null
error: null,
message: protoMessage
};
}
log.info("Creating v2 RPC message");
return {
rpc: ProtocolHandler.createV2Rpc(protoMessage, encoder.pubsubTopic),
message: protoMessage,
error: null
};
} catch (err) {

View File

@ -5,7 +5,7 @@ import type { DiscoveryOptions, PeerCache } from "./discovery.js";
import type { FilterProtocolOptions } from "./filter.js";
import type { CreateLibp2pOptions } from "./libp2p.js";
import type { LightPushProtocolOptions } from "./light_push.js";
import type { IDecodedMessage } from "./message.js";
import type { IDecodedMessage, IProtoMessage } from "./message.js";
import type { ThisAndThat, ThisOrThat } from "./misc.js";
import { NetworkConfig } from "./sharding.js";
import type { StoreProtocolOptions } from "./store.js";
@ -195,7 +195,13 @@ export type LightPushCoreResult = ThisOrThat<
PeerId,
"failure",
LightPushFailure
>;
> & {
/**
* The proto object of the message.
* Present only if the message was successfully pushed to the network.
*/
message?: IProtoMessage;
};
export type FilterCoreResult = ThisOrThat<
"success",
@ -209,7 +215,13 @@ export type LightPushSDKResult = ThisAndThat<
PeerId[],
"failures",
LightPushFailure[]
>;
> & {
/**
* The proto objects of the messages.
* Present only if the messages were successfully pushed to the network.
*/
messages?: IProtoMessage[];
};
export type FilterSDKResult = ThisAndThat<
"successes",

View File

@ -20,6 +20,12 @@ export type ISendOptions = {
* @default false
*/
useLegacy?: boolean;
/**
* Amount of peers to send message to.
* Overrides `numPeersToUse` in {@link @waku/interfaces!CreateNodeOptions}.
*/
numPeersToUse?: number;
};
export interface ISender {

View File

@ -4,6 +4,7 @@ import {
type IEncoder,
ILightPush,
type IMessage,
IProtoMessage,
type ISendOptions,
type Libp2p,
LightPushCoreResult,
@ -82,10 +83,11 @@ export class LightPush implements ILightPush {
log.info("send: attempting to send a message to pubsubTopic:", pubsubTopic);
const peerIds = await this.peerManager.getPeers({
let peerIds = await this.peerManager.getPeers({
protocol: options.useLegacy ? "light-push-v2" : Protocols.LightPush,
pubsubTopic: encoder.pubsubTopic
});
peerIds = peerIds.slice(0, options.numPeersToUse);
const coreResults =
peerIds?.length > 0
@ -93,12 +95,15 @@ export class LightPush implements ILightPush {
peerIds.map((peerId) =>
this.protocol
.send(encoder, message, peerId, options.useLegacy)
.catch((_e) => ({
success: null,
failure: {
error: LightPushError.GENERIC_FAIL
}
}))
.catch(
(_e) =>
({
success: null,
failure: {
error: LightPushError.GENERIC_FAIL
}
}) as LightPushCoreResult
)
)
)
: [];
@ -110,7 +115,10 @@ export class LightPush implements ILightPush {
.map((v) => v.success) as PeerId[],
failures: coreResults
.filter((v) => v.failure)
.map((v) => v.failure) as LightPushFailure[]
.map((v) => v.failure) as LightPushFailure[],
messages: coreResults
.filter((v) => v.message)
.map((v) => v.message) as IProtoMessage[]
}
: {
successes: [],

View File

@ -24,6 +24,8 @@ export class AckManager implements IAckManager {
private readonly storeAckManager: StoreAckManager;
private readonly networkConfig: NetworkConfig;
private readonly subscribedContentTopics: Set<string> = new Set();
public constructor(params: AckManagerConstructorParams) {
this.messageStore = params.messageStore;
this.networkConfig = params.networkConfig;
@ -44,9 +46,15 @@ export class AckManager implements IAckManager {
public async stop(): Promise<void> {
await this.filterAckManager.stop();
this.storeAckManager.stop();
this.subscribedContentTopics.clear();
}
public async subscribe(contentTopic: string): Promise<boolean> {
if (this.subscribedContentTopics.has(contentTopic)) {
return true;
}
this.subscribedContentTopics.add(contentTopic);
const decoder = createDecoder(
contentTopic,
createRoutingInfo(this.networkConfig, {
@ -55,9 +63,11 @@ export class AckManager implements IAckManager {
);
return (
(await this.filterAckManager.subscribe(decoder)) ||
(await this.storeAckManager.subscribe(decoder))
);
await Promise.all([
this.filterAckManager.subscribe(decoder),
this.storeAckManager.subscribe(decoder)
])
).some((success) => success);
}
}
@ -118,7 +128,7 @@ class StoreAckManager {
this.interval = setInterval(() => {
void this.query();
}, 1000);
}, 5000);
}
public stop(): void {

View File

@ -32,15 +32,15 @@ export class MessageStore {
private readonly resendIntervalMs: number;
public constructor(options: MessageStoreOptions = {}) {
this.resendIntervalMs = options.resendIntervalMs ?? 2000;
this.resendIntervalMs = options.resendIntervalMs ?? 5000;
}
public has(hashStr: string): boolean {
return this.messages.has(hashStr);
return this.messages.has(hashStr) || this.pendingMessages.has(hashStr);
}
public add(message: IDecodedMessage, options: AddMessageOptions = {}): void {
if (!this.messages.has(message.hashStr)) {
if (!this.has(message.hashStr)) {
this.messages.set(message.hashStr, {
filterAck: options.filterAck ?? false,
storeAck: options.storeAck ?? false,
@ -59,10 +59,7 @@ export class MessageStore {
this.replacePendingWithMessage(hashStr);
}
public async markSent(
requestId: RequestId,
sentMessage: IDecodedMessage
): Promise<void> {
public markSent(requestId: RequestId, sentMessage: IDecodedMessage): void {
const entry = this.pendingRequests.get(requestId);
if (!entry || !entry.messageRequest) {
@ -71,6 +68,8 @@ export class MessageStore {
entry.lastSentAt = Number(sentMessage.timestamp);
this.pendingMessages.set(sentMessage.hashStr, requestId);
this.replacePendingWithMessage(sentMessage.hashStr);
}
public async queue(message: WakuLikeMessage): Promise<RequestId> {
@ -102,13 +101,11 @@ export class MessageStore {
continue;
}
const notSent = !entry.lastSentAt;
const notAcknowledged =
entry.lastSentAt &&
Date.now() - entry.lastSentAt >= this.resendIntervalMs &&
!isAcknowledged;
const sentAt = entry.lastSentAt || entry.createdAt;
const notTooRecent = Date.now() - sentAt >= this.resendIntervalMs;
const notAcknowledged = !isAcknowledged;
if (notSent || notAcknowledged) {
if (notTooRecent && notAcknowledged) {
res.push({
requestId,
message: entry.messageRequest
@ -154,12 +151,22 @@ export class MessageStore {
return;
}
const entry = this.pendingRequests.get(requestId);
let entry = this.pendingRequests.get(requestId);
if (!entry) {
return;
}
// merge with message entry if possible
// this can happen if message we sent got received before we could add it to the message store
const messageEntry = this.messages.get(hashStr);
entry = {
...entry,
...messageEntry,
filterAck: messageEntry?.filterAck ?? entry.filterAck,
storeAck: messageEntry?.storeAck ?? entry.storeAck
};
this.pendingRequests.delete(requestId);
this.pendingMessages.delete(hashStr);

View File

@ -1,10 +1,5 @@
import { createDecoder, createEncoder } from "@waku/core";
import {
IDecodedMessage,
ILightPush,
IProtoMessage,
NetworkConfig
} from "@waku/interfaces";
import { ILightPush, NetworkConfig } from "@waku/interfaces";
import { createRoutingInfo } from "@waku/utils";
import { AckManager } from "./ack_manager.js";
@ -24,6 +19,8 @@ export class Sender {
private readonly ackManager: AckManager;
private readonly networkConfig: NetworkConfig;
private readonly processingRequests: Set<RequestId> = new Set();
private sendInterval: ReturnType<typeof setInterval> | null = null;
public constructor(params: SenderConstructorParams) {
@ -48,37 +45,7 @@ export class Sender {
const requestId = await this.messageStore.queue(wakuLikeMessage);
await this.ackManager.subscribe(wakuLikeMessage.contentTopic);
const encoder = createEncoder({
contentTopic: wakuLikeMessage.contentTopic,
routingInfo: createRoutingInfo(this.networkConfig, {
contentTopic: wakuLikeMessage.contentTopic
}),
ephemeral: wakuLikeMessage.ephemeral
});
const decoder = createDecoder(
wakuLikeMessage.contentTopic,
createRoutingInfo(this.networkConfig, {
contentTopic: wakuLikeMessage.contentTopic
})
);
const response = await this.lightPush.send(encoder, {
payload: wakuLikeMessage.payload
}); // todo: add to light push return of proto message or decoded message
if (response.successes.length > 0) {
const protoObj = await encoder.toProtoObj({
payload: wakuLikeMessage.payload
});
const decodedMessage = await decoder.fromProtoObj(
decoder.pubsubTopic,
protoObj as IProtoMessage
);
await this.messageStore.markSent(requestId, decodedMessage!);
}
await this.sendMessage(requestId, wakuLikeMessage);
return requestId;
}
@ -87,6 +54,21 @@ export class Sender {
const pendingRequests = this.messageStore.getMessagesToSend();
for (const { requestId, message } of pendingRequests) {
await this.sendMessage(requestId, message);
}
}
private async sendMessage(
requestId: RequestId,
message: WakuLikeMessage
): Promise<void> {
try {
if (this.processingRequests.has(requestId)) {
return;
}
this.processingRequests.add(requestId);
const encoder = createEncoder({
contentTopic: message.contentTopic,
routingInfo: createRoutingInfo(this.networkConfig, {
@ -102,24 +84,31 @@ export class Sender {
})
);
const response = await this.lightPush.send(encoder, {
payload: message.payload
});
if (response.successes.length > 0) {
const protoObj = await encoder.toProtoObj({
const response = await this.lightPush.send(
encoder,
{
payload: message.payload
});
},
{
// force no retry as we have retry implemented in the sender
autoRetry: false,
// send to only one peer as we will retry on failure and need to ensure only one message is in the network
numPeersToUse: 1
}
);
if (response?.messages && response.messages.length > 0) {
const decodedMessage = await decoder.fromProtoObj(
decoder.pubsubTopic,
protoObj as IProtoMessage
response.messages[0]
);
await this.messageStore.markSent(
requestId,
decodedMessage as IDecodedMessage
);
this.messageStore.markSent(requestId, decodedMessage!);
} else {
// do nothing on failure, will retry
}
} finally {
this.processingRequests.delete(requestId);
}
}
}