diff --git a/src/lib/waku_filter/filter_rpc.ts b/src/lib/waku_filter/filter_rpc.ts index db38b2cb7e..493501e5ad 100644 --- a/src/lib/waku_filter/filter_rpc.ts +++ b/src/lib/waku_filter/filter_rpc.ts @@ -51,7 +51,7 @@ export class FilterRPC { return this.proto.push; } - get requestId(): string { + get requestId(): string | undefined { return this.proto.requestId; } } diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index 77076be9ca..a2f12933d5 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -73,6 +73,12 @@ export class WakuFilter { true ); + const requestId = request.requestId; + if (!requestId) + throw new Error( + "Internal error: createRequest expected to set `requestId`" + ); + const peer = await this.getPeer(opts?.peerId); const stream = await this.newStream(peer); @@ -90,11 +96,11 @@ export class WakuFilter { throw e; } - this.addCallback(request.requestId, callback); + this.addCallback(requestId, callback); return async () => { - await this.unsubscribe(topic, contentFilters, request.requestId, peer); - this.removeCallback(request.requestId); + await this.unsubscribe(topic, contentFilters, requestId, peer); + this.removeCallback(requestId); }; } @@ -107,7 +113,7 @@ export class WakuFilter { async (source: AsyncIterable) => { for await (const bytes of source) { const res = FilterRPC.decode(bytes.slice()); - if (res.push?.messages?.length) { + if (res.requestId && res.push?.messages?.length) { await this.pushMessages(res.requestId, res.push.messages); } } diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 36f5e15b8f..e2e462a94f 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -219,7 +219,7 @@ export class WakuStore { const response = reply.response as protoV2Beta4.HistoryResponse; if (response.error) { - throw "History response contains an Error" + response.error; + throw "History response contains an Error: " + response.error; } if (!response.messages || !response.messages.length) { diff --git a/src/proto/filter.proto b/src/proto/filter.proto index 369d369560..df9144023d 100644 --- a/src/proto/filter.proto +++ b/src/proto/filter.proto @@ -3,12 +3,12 @@ syntax = "proto3"; import "message.proto"; message FilterRequest { - bool subscribe = 1; - string topic = 2; + optional bool subscribe = 1; + optional string topic = 2; repeated ContentFilter content_filters = 3; message ContentFilter { - string content_topic = 1; + optional string content_topic = 1; } } @@ -17,7 +17,7 @@ message MessagePush { } message FilterRPC { - string request_id = 1; + optional string request_id = 1; optional FilterRequest request = 2; optional MessagePush push = 3; } diff --git a/src/proto/filter.ts b/src/proto/filter.ts index ea8911ac25..afd99896f2 100644 --- a/src/proto/filter.ts +++ b/src/proto/filter.ts @@ -15,20 +15,20 @@ import { import type { Codec } from "protons-runtime"; export interface FilterRequest { - subscribe: boolean; - topic: string; + subscribe?: boolean; + topic?: string; contentFilters: FilterRequest.ContentFilter[]; } export namespace FilterRequest { export interface ContentFilter { - contentTopic: string; + contentTopic?: string; } export namespace ContentFilter { export const codec = (): Codec => { return message({ - 1: { name: "contentTopic", codec: string }, + 1: { name: "contentTopic", codec: string, optional: true }, }); }; @@ -43,8 +43,8 @@ export namespace FilterRequest { export const codec = (): Codec => { return message({ - 1: { name: "subscribe", codec: bool }, - 2: { name: "topic", codec: string }, + 1: { name: "subscribe", codec: bool, optional: true }, + 2: { name: "topic", codec: string, optional: true }, 3: { name: "contentFilters", codec: FilterRequest.ContentFilter.codec(), @@ -83,7 +83,7 @@ export namespace MessagePush { } export interface FilterRPC { - requestId: string; + requestId?: string; request?: FilterRequest; push?: MessagePush; } @@ -91,7 +91,7 @@ export interface FilterRPC { export namespace FilterRPC { export const codec = (): Codec => { return message({ - 1: { name: "requestId", codec: string }, + 1: { name: "requestId", codec: string, optional: true }, 2: { name: "request", codec: FilterRequest.codec(), optional: true }, 3: { name: "push", codec: MessagePush.codec(), optional: true }, }); diff --git a/src/proto/light_push.proto b/src/proto/light_push.proto index 7d52944317..d1954b6f6b 100644 --- a/src/proto/light_push.proto +++ b/src/proto/light_push.proto @@ -3,17 +3,17 @@ syntax = "proto3"; import "message.proto"; message PushRequest { - string pub_sub_topic = 1; - WakuMessage message = 2; + optional string pub_sub_topic = 1; + optional WakuMessage message = 2; } message PushResponse { - bool is_success = 1; - string info = 2; + optional bool is_success = 1; + optional string info = 2; } message PushRPC { - string request_id = 1; + optional string request_id = 1; optional PushRequest request = 2; optional PushResponse response = 3; } diff --git a/src/proto/light_push.ts b/src/proto/light_push.ts index 2081fd2f98..6b418a3210 100644 --- a/src/proto/light_push.ts +++ b/src/proto/light_push.ts @@ -15,15 +15,15 @@ import { import type { Codec } from "protons-runtime"; export interface PushRequest { - pubSubTopic: string; - message: WakuMessage; + pubSubTopic?: string; + message?: WakuMessage; } export namespace PushRequest { export const codec = (): Codec => { return message({ - 1: { name: "pubSubTopic", codec: string }, - 2: { name: "message", codec: WakuMessage.codec() }, + 1: { name: "pubSubTopic", codec: string, optional: true }, + 2: { name: "message", codec: WakuMessage.codec(), optional: true }, }); }; @@ -37,15 +37,15 @@ export namespace PushRequest { } export interface PushResponse { - isSuccess: boolean; - info: string; + isSuccess?: boolean; + info?: string; } export namespace PushResponse { export const codec = (): Codec => { return message({ - 1: { name: "isSuccess", codec: bool }, - 2: { name: "info", codec: string }, + 1: { name: "isSuccess", codec: bool, optional: true }, + 2: { name: "info", codec: string, optional: true }, }); }; @@ -59,7 +59,7 @@ export namespace PushResponse { } export interface PushRPC { - requestId: string; + requestId?: string; request?: PushRequest; response?: PushResponse; } @@ -67,7 +67,7 @@ export interface PushRPC { export namespace PushRPC { export const codec = (): Codec => { return message({ - 1: { name: "requestId", codec: string }, + 1: { name: "requestId", codec: string, optional: true }, 2: { name: "request", codec: PushRequest.codec(), optional: true }, 3: { name: "response", codec: PushResponse.codec(), optional: true }, }); diff --git a/src/proto/store_v2beta3.proto b/src/proto/store_v2beta3.proto index 2ac4139754..8bd89ed1a2 100644 --- a/src/proto/store_v2beta3.proto +++ b/src/proto/store_v2beta3.proto @@ -3,23 +3,23 @@ syntax = "proto3"; import "message.proto"; message Index { - bytes digest = 1; - double received_time = 2; - double sender_time = 3; + optional bytes digest = 1; + optional double received_time = 2; + optional double sender_time = 3; } message PagingInfo { - uint64 page_size = 1; - Index cursor = 2; + optional uint64 page_size = 1; + optional Index cursor = 2; enum Direction { DIRECTION_BACKWARD_UNSPECIFIED = 0; DIRECTION_FORWARD = 1; } - Direction direction = 3; + optional Direction direction = 3; } message ContentFilter { - string content_topic = 1; + optional string content_topic = 1; } message HistoryQuery { @@ -32,16 +32,16 @@ message HistoryQuery { message HistoryResponse { repeated WakuMessage messages = 2; - PagingInfo paging_info = 3; + optional PagingInfo paging_info = 3; enum Error { ERROR_NONE_UNSPECIFIED = 0; ERROR_INVALID_CURSOR = 1; } - Error error = 4; + optional Error error = 4; } message HistoryRPC { - string request_id = 1; + optional string request_id = 1; optional HistoryQuery query = 2; optional HistoryResponse response = 3; } diff --git a/src/proto/store_v2beta3.ts b/src/proto/store_v2beta3.ts index f92bde640a..800de7f5c3 100644 --- a/src/proto/store_v2beta3.ts +++ b/src/proto/store_v2beta3.ts @@ -16,17 +16,17 @@ import { import type { Codec } from "protons-runtime"; export interface Index { - digest: Uint8Array; - receivedTime: number; - senderTime: number; + digest?: Uint8Array; + receivedTime?: number; + senderTime?: number; } export namespace Index { export const codec = (): Codec => { return message({ - 1: { name: "digest", codec: bytes }, - 2: { name: "receivedTime", codec: double }, - 3: { name: "senderTime", codec: double }, + 1: { name: "digest", codec: bytes, optional: true }, + 2: { name: "receivedTime", codec: double, optional: true }, + 3: { name: "senderTime", codec: double, optional: true }, }); }; @@ -40,9 +40,9 @@ export namespace Index { } export interface PagingInfo { - pageSize: bigint; - cursor: Index; - direction: PagingInfo.Direction; + pageSize?: bigint; + cursor?: Index; + direction?: PagingInfo.Direction; } export namespace PagingInfo { @@ -64,9 +64,13 @@ export namespace PagingInfo { export const codec = (): Codec => { return message({ - 1: { name: "pageSize", codec: uint64 }, - 2: { name: "cursor", codec: Index.codec() }, - 3: { name: "direction", codec: PagingInfo.Direction.codec() }, + 1: { name: "pageSize", codec: uint64, optional: true }, + 2: { name: "cursor", codec: Index.codec(), optional: true }, + 3: { + name: "direction", + codec: PagingInfo.Direction.codec(), + optional: true, + }, }); }; @@ -80,13 +84,13 @@ export namespace PagingInfo { } export interface ContentFilter { - contentTopic: string; + contentTopic?: string; } export namespace ContentFilter { export const codec = (): Codec => { return message({ - 1: { name: "contentTopic", codec: string }, + 1: { name: "contentTopic", codec: string, optional: true }, }); }; @@ -133,8 +137,8 @@ export namespace HistoryQuery { export interface HistoryResponse { messages: WakuMessage[]; - pagingInfo: PagingInfo; - error: HistoryResponse.Error; + pagingInfo?: PagingInfo; + error?: HistoryResponse.Error; } export namespace HistoryResponse { @@ -157,8 +161,12 @@ export namespace HistoryResponse { export const codec = (): Codec => { return message({ 2: { name: "messages", codec: WakuMessage.codec(), repeats: true }, - 3: { name: "pagingInfo", codec: PagingInfo.codec() }, - 4: { name: "error", codec: HistoryResponse.Error.codec() }, + 3: { name: "pagingInfo", codec: PagingInfo.codec(), optional: true }, + 4: { + name: "error", + codec: HistoryResponse.Error.codec(), + optional: true, + }, }); }; @@ -172,7 +180,7 @@ export namespace HistoryResponse { } export interface HistoryRPC { - requestId: string; + requestId?: string; query?: HistoryQuery; response?: HistoryResponse; } @@ -180,7 +188,7 @@ export interface HistoryRPC { export namespace HistoryRPC { export const codec = (): Codec => { return message({ - 1: { name: "requestId", codec: string }, + 1: { name: "requestId", codec: string, optional: true }, 2: { name: "query", codec: HistoryQuery.codec(), optional: true }, 3: { name: "response", codec: HistoryResponse.codec(), optional: true }, }); diff --git a/src/proto/store_v2beta4.proto b/src/proto/store_v2beta4.proto index 3166dc588b..337c21200d 100644 --- a/src/proto/store_v2beta4.proto +++ b/src/proto/store_v2beta4.proto @@ -3,24 +3,24 @@ syntax = "proto3"; import "message.proto"; message Index { - bytes digest = 1; - sint64 received_time = 2; - sint64 sender_time = 3; - string pubsub_topic = 4; + optional bytes digest = 1; + optional sint64 received_time = 2; + optional sint64 sender_time = 3; + optional string pubsub_topic = 4; } message PagingInfo { - uint64 page_size = 1; - Index cursor = 2; + optional uint64 page_size = 1; + optional Index cursor = 2; enum Direction { DIRECTION_BACKWARD_UNSPECIFIED = 0; DIRECTION_FORWARD = 1; } - Direction direction = 3; + optional Direction direction = 3; } message ContentFilter { - string content_topic = 1; + optional string content_topic = 1; } message HistoryQuery { @@ -33,16 +33,16 @@ message HistoryQuery { message HistoryResponse { repeated WakuMessage messages = 2; - PagingInfo paging_info = 3; + optional PagingInfo paging_info = 3; enum Error { ERROR_NONE_UNSPECIFIED = 0; ERROR_INVALID_CURSOR = 1; } - Error error = 4; + optional Error error = 4; } message HistoryRPC { - string request_id = 1; + optional string request_id = 1; optional HistoryQuery query = 2; optional HistoryResponse response = 3; } diff --git a/src/proto/store_v2beta4.ts b/src/proto/store_v2beta4.ts index 7a5adedd22..256da55a43 100644 --- a/src/proto/store_v2beta4.ts +++ b/src/proto/store_v2beta4.ts @@ -16,19 +16,19 @@ import { import type { Codec } from "protons-runtime"; export interface Index { - digest: Uint8Array; - receivedTime: bigint; - senderTime: bigint; - pubsubTopic: string; + digest?: Uint8Array; + receivedTime?: bigint; + senderTime?: bigint; + pubsubTopic?: string; } export namespace Index { export const codec = (): Codec => { return message({ - 1: { name: "digest", codec: bytes }, - 2: { name: "receivedTime", codec: sint64 }, - 3: { name: "senderTime", codec: sint64 }, - 4: { name: "pubsubTopic", codec: string }, + 1: { name: "digest", codec: bytes, optional: true }, + 2: { name: "receivedTime", codec: sint64, optional: true }, + 3: { name: "senderTime", codec: sint64, optional: true }, + 4: { name: "pubsubTopic", codec: string, optional: true }, }); }; @@ -42,9 +42,9 @@ export namespace Index { } export interface PagingInfo { - pageSize: bigint; - cursor: Index; - direction: PagingInfo.Direction; + pageSize?: bigint; + cursor?: Index; + direction?: PagingInfo.Direction; } export namespace PagingInfo { @@ -66,9 +66,13 @@ export namespace PagingInfo { export const codec = (): Codec => { return message({ - 1: { name: "pageSize", codec: uint64 }, - 2: { name: "cursor", codec: Index.codec() }, - 3: { name: "direction", codec: PagingInfo.Direction.codec() }, + 1: { name: "pageSize", codec: uint64, optional: true }, + 2: { name: "cursor", codec: Index.codec(), optional: true }, + 3: { + name: "direction", + codec: PagingInfo.Direction.codec(), + optional: true, + }, }); }; @@ -82,13 +86,13 @@ export namespace PagingInfo { } export interface ContentFilter { - contentTopic: string; + contentTopic?: string; } export namespace ContentFilter { export const codec = (): Codec => { return message({ - 1: { name: "contentTopic", codec: string }, + 1: { name: "contentTopic", codec: string, optional: true }, }); }; @@ -135,8 +139,8 @@ export namespace HistoryQuery { export interface HistoryResponse { messages: WakuMessage[]; - pagingInfo: PagingInfo; - error: HistoryResponse.Error; + pagingInfo?: PagingInfo; + error?: HistoryResponse.Error; } export namespace HistoryResponse { @@ -159,8 +163,12 @@ export namespace HistoryResponse { export const codec = (): Codec => { return message({ 2: { name: "messages", codec: WakuMessage.codec(), repeats: true }, - 3: { name: "pagingInfo", codec: PagingInfo.codec() }, - 4: { name: "error", codec: HistoryResponse.Error.codec() }, + 3: { name: "pagingInfo", codec: PagingInfo.codec(), optional: true }, + 4: { + name: "error", + codec: HistoryResponse.Error.codec(), + optional: true, + }, }); }; @@ -174,7 +182,7 @@ export namespace HistoryResponse { } export interface HistoryRPC { - requestId: string; + requestId?: string; query?: HistoryQuery; response?: HistoryResponse; } @@ -182,7 +190,7 @@ export interface HistoryRPC { export namespace HistoryRPC { export const codec = (): Codec => { return message({ - 1: { name: "requestId", codec: string }, + 1: { name: "requestId", codec: string, optional: true }, 2: { name: "query", codec: HistoryQuery.codec(), optional: true }, 3: { name: "response", codec: HistoryResponse.codec(), optional: true }, });