feat: make pubsub topic optional

This commit is contained in:
Arseniy Klempner 2025-09-05 15:10:08 -07:00
parent 4c079158b5
commit bc901369ac
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
5 changed files with 25 additions and 21 deletions

View File

@ -65,7 +65,21 @@ The server exposes the following HTTP endpoints:
### Examples
#### Send a Message
#### Send a Message (Auto-sharding)
```bash
curl -X POST http://localhost:8080/lightpush/v3/message \
-H "Content-Type: application/json" \
-d '{
"pubsubTopic": "",
"message": {
"contentTopic": "/test/1/example/proto",
"payload": "SGVsbG8gV2FrdQ==",
"version": 1
}
}'
```
#### Send a Message (Explicit pubsub topic)
```bash
curl -X POST http://localhost:8080/lightpush/v3/message \
-H "Content-Type: application/json" \

View File

@ -42,12 +42,13 @@ router.get("/waku/v1/peer-info", createEndpointHandler({
router.post("/lightpush/v3/message", createEndpointHandler({
methodName: "pushMessageV3",
validateInput: (body: any): [string, string] => {
validateInput: (body: any): [string, string, string] => {
const validatedRequest = validators.requireLightpushV3(body);
return [
validatedRequest.message.contentTopic,
validatedRequest.message.payload,
validatedRequest.pubsubTopic,
];
},
handleError: errorHandlers.lightpushError,

View File

@ -145,8 +145,6 @@ async function startServer(port: number = 3000): Promise<void> {
}
} catch (error: any) {
console.error("Error starting server:", error);
// Don't exit the process, just log the error
// The server might still be partially functional
}
}

View File

@ -118,8 +118,8 @@ export function createEndpointHandler<TInput = any, TOutput = any>(
export const validators = {
requireLightpushV3: (body: any): LightpushV3Request => {
if (!body.pubsubTopic || typeof body.pubsubTopic !== "string") {
throw new Error("pubsubTopic is required and must be a string");
if (body.pubsubTopic !== undefined && typeof body.pubsubTopic !== "string") {
throw new Error("pubsubTopic must be a string if provided");
}
if (!body.message || typeof body.message !== "object") {
throw new Error("message is required and must be an object");
@ -135,7 +135,7 @@ export const validators = {
}
return {
pubsubTopic: body.pubsubTopic,
pubsubTopic: body.pubsubTopic || "",
message: {
payload: body.message.payload,
contentTopic: body.message.contentTopic,

View File

@ -64,7 +64,6 @@ export class WakuHeadless {
enrBootstrap: string | null;
constructor(networkConfig?: Partial<NetworkConfig>, lightpushNode?: string, enrBootstrap?: string) {
this.waku = null as unknown as LightNode;
// Use provided config or defaults
this.networkConfig = this.buildNetworkConfig(networkConfig);
this.lightpushNode = lightpushNode || null;
this.enrBootstrap = enrBootstrap || null;
@ -107,7 +106,6 @@ export class WakuHeadless {
private buildNetworkConfig(providedConfig?: Partial<NetworkConfig>): NetworkConfig {
const clusterId = providedConfig?.clusterId ?? 1;
// Check if static sharding is requested through environment or config
const staticShards = (providedConfig as any)?.shards;
if (staticShards && Array.isArray(staticShards) && staticShards.length > 0) {
return {
@ -116,7 +114,6 @@ export class WakuHeadless {
} as NetworkConfig;
}
// Default to auto-sharding
const numShardsInCluster = (providedConfig as any)?.numShardsInCluster ?? 8;
return {
clusterId,
@ -133,11 +130,8 @@ export class WakuHeadless {
throw new Error("Waku node not started");
}
// Ensure payload is properly formatted
let processedPayload: Uint8Array;
// If it's a string, try to decode as base64 first
try {
// Use TextDecoder to decode base64 (browser-compatible)
const binaryString = atob(payload);
const bytes = new Uint8Array(binaryString.length);
for (let i = 0; i < binaryString.length; i++) {
@ -145,7 +139,6 @@ export class WakuHeadless {
}
processedPayload = bytes;
} catch (e) {
// If base64 decoding fails, encode as UTF-8
processedPayload = new TextEncoder().encode(payload);
}
@ -163,7 +156,6 @@ export class WakuHeadless {
timestamp: new Date(),
});
// Convert to serializable format for cross-context communication
const serializableResult = makeSerializable(result);
return serializableResult;
@ -178,16 +170,14 @@ export class WakuHeadless {
async pushMessageV3(
contentTopic: string,
payload: string,
pubsubTopic: string,
): Promise<SerializableSDKProtocolResult> {
if (!this.waku) {
throw new Error("Waku node not started");
}
// Ensure payload is properly formatted
let processedPayload: Uint8Array;
// If it's a string, try to decode as base64 first
try {
// Use TextDecoder to decode base64 (browser-compatible)
const binaryString = atob(payload);
const bytes = new Uint8Array(binaryString.length);
for (let i = 0; i < binaryString.length; i++) {
@ -195,7 +185,6 @@ export class WakuHeadless {
}
processedPayload = bytes;
} catch (e) {
// If base64 decoding fails, encode as UTF-8
processedPayload = new TextEncoder().encode(payload);
}
@ -207,6 +196,10 @@ export class WakuHeadless {
const encoder = this.waku.createEncoder({ contentTopic });
if (pubsubTopic && pubsubTopic !== encoder.pubsubTopic) {
console.warn(`Explicit pubsubTopic ${pubsubTopic} provided, but auto-sharding determined ${encoder.pubsubTopic}. Using auto-sharding.`);
}
let result;
if (this.lightpushNode) {
@ -234,7 +227,6 @@ export class WakuHeadless {
});
}
// Convert to serializable format for cross-context communication
const serializableResult = makeSerializable(result);
return serializableResult;
@ -260,7 +252,6 @@ export class WakuHeadless {
await this.waku.waitForPeers(protocols, timeoutMs);
const elapsed = Date.now() - startTime;
// Log connected peers
const peers = this.waku.libp2p.getPeers();
return {