Enable pubsub topic filter in history queries

Resolves #78
This commit is contained in:
Franck Royer 2021-04-29 16:04:34 +10:00
parent fbfad9390c
commit 1016e85f70
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
5 changed files with 46 additions and 17 deletions

@ -1 +1 @@
Subproject commit fb2ea06a4fdfaee91aa081be2592eb79349ed6c2 Subproject commit 967db6d6102646549bc2184b6fd26968ca764de7

View File

@ -24,10 +24,11 @@ message ContentFilter {
} }
message HistoryQuery { message HistoryQuery {
repeated ContentFilter content_filters = 2; optional string pubsub_topic = 2;
optional PagingInfo paging_info = 3; repeated ContentFilter content_filters = 3;
optional double start_time = 4; optional PagingInfo paging_info = 4;
optional double end_time = 5; optional double start_time = 5;
optional double end_time = 6;
} }
message HistoryResponse { message HistoryResponse {

View File

@ -3,13 +3,15 @@ import { v4 as uuid } from 'uuid';
import * as proto from '../../proto/waku/v2/store'; import * as proto from '../../proto/waku/v2/store';
import { DEFAULT_CONTENT_TOPIC } from '../waku_message'; import { DEFAULT_CONTENT_TOPIC } from '../waku_message';
import { RelayDefaultTopic } from '../waku_relay';
export class HistoryRPC { export class HistoryRPC {
public constructor(public proto: proto.HistoryRPC) {} public constructor(public proto: proto.HistoryRPC) {}
static createQuery( static createQuery(
contentTopics: string[] = [DEFAULT_CONTENT_TOPIC], contentTopics: string[] = [DEFAULT_CONTENT_TOPIC],
cursor?: proto.Index cursor?: proto.Index,
pubsubTopic: string = RelayDefaultTopic
): HistoryRPC { ): HistoryRPC {
const pagingInfo = { const pagingInfo = {
pageSize: 10, pageSize: 10,
@ -24,6 +26,7 @@ export class HistoryRPC {
return new HistoryRPC({ return new HistoryRPC({
requestId: uuid(), requestId: uuid(),
query: { query: {
pubsubTopic,
contentFilters, contentFilters,
pagingInfo, pagingInfo,
startTime: undefined, startTime: undefined,

View File

@ -8,7 +8,7 @@ import { WakuMessage } from '../waku_message';
import { HistoryRPC } from './history_rpc'; import { HistoryRPC } from './history_rpc';
export const StoreCodec = '/vac/waku/store/2.0.0-beta1'; export const StoreCodec = '/vac/waku/store/2.0.0-beta3';
export class WakuStore { export class WakuStore {
constructor(public libp2p: Libp2p) {} constructor(public libp2p: Libp2p) {}
@ -16,12 +16,14 @@ export class WakuStore {
/** /**
* Retrieve history from given peer * Retrieve history from given peer
* @param peerId * @param peerId
* @param topics * @param contentTopics
* @param pubsubTopic
* @throws if not able to reach peer * @throws if not able to reach peer
*/ */
async queryHistory( async queryHistory(
peerId: PeerId, peerId: PeerId,
topics?: string[] contentTopics?: string[],
pubsubTopic?: string
): Promise<WakuMessage[] | null> { ): Promise<WakuMessage[] | null> {
const peer = this.libp2p.peerStore.get(peerId); const peer = this.libp2p.peerStore.get(peerId);
if (!peer) throw 'Peer is unknown'; if (!peer) throw 'Peer is unknown';
@ -36,7 +38,11 @@ export class WakuStore {
try { try {
const { stream } = await connection.newStream(StoreCodec); const { stream } = await connection.newStream(StoreCodec);
try { try {
const historyRpcQuery = HistoryRPC.createQuery(topics, cursor); const historyRpcQuery = HistoryRPC.createQuery(
contentTopics,
cursor,
pubsubTopic
);
const res = await pipe( const res = await pipe(
[historyRpcQuery.encode()], [historyRpcQuery.encode()],
lp.encode(), lp.encode(),

View File

@ -57,6 +57,7 @@ export interface ContentFilter {
} }
export interface HistoryQuery { export interface HistoryQuery {
pubsubTopic?: string | undefined;
contentFilters: ContentFilter[]; contentFilters: ContentFilter[];
pagingInfo?: PagingInfo | undefined; pagingInfo?: PagingInfo | undefined;
startTime?: number | undefined; startTime?: number | undefined;
@ -310,17 +311,20 @@ export const HistoryQuery = {
message: HistoryQuery, message: HistoryQuery,
writer: _m0.Writer = _m0.Writer.create() writer: _m0.Writer = _m0.Writer.create()
): _m0.Writer { ): _m0.Writer {
if (message.pubsubTopic !== undefined) {
writer.uint32(18).string(message.pubsubTopic);
}
for (const v of message.contentFilters) { for (const v of message.contentFilters) {
ContentFilter.encode(v!, writer.uint32(18).fork()).ldelim(); ContentFilter.encode(v!, writer.uint32(26).fork()).ldelim();
} }
if (message.pagingInfo !== undefined) { if (message.pagingInfo !== undefined) {
PagingInfo.encode(message.pagingInfo, writer.uint32(26).fork()).ldelim(); PagingInfo.encode(message.pagingInfo, writer.uint32(34).fork()).ldelim();
} }
if (message.startTime !== undefined) { if (message.startTime !== undefined) {
writer.uint32(33).double(message.startTime); writer.uint32(41).double(message.startTime);
} }
if (message.endTime !== undefined) { if (message.endTime !== undefined) {
writer.uint32(41).double(message.endTime); writer.uint32(49).double(message.endTime);
} }
return writer; return writer;
}, },
@ -334,17 +338,20 @@ export const HistoryQuery = {
const tag = reader.uint32(); const tag = reader.uint32();
switch (tag >>> 3) { switch (tag >>> 3) {
case 2: case 2:
message.pubsubTopic = reader.string();
break;
case 3:
message.contentFilters.push( message.contentFilters.push(
ContentFilter.decode(reader, reader.uint32()) ContentFilter.decode(reader, reader.uint32())
); );
break; break;
case 3: case 4:
message.pagingInfo = PagingInfo.decode(reader, reader.uint32()); message.pagingInfo = PagingInfo.decode(reader, reader.uint32());
break; break;
case 4: case 5:
message.startTime = reader.double(); message.startTime = reader.double();
break; break;
case 5: case 6:
message.endTime = reader.double(); message.endTime = reader.double();
break; break;
default: default:
@ -358,6 +365,11 @@ export const HistoryQuery = {
fromJSON(object: any): HistoryQuery { fromJSON(object: any): HistoryQuery {
const message = { ...baseHistoryQuery } as HistoryQuery; const message = { ...baseHistoryQuery } as HistoryQuery;
message.contentFilters = []; message.contentFilters = [];
if (object.pubsubTopic !== undefined && object.pubsubTopic !== null) {
message.pubsubTopic = String(object.pubsubTopic);
} else {
message.pubsubTopic = undefined;
}
if (object.contentFilters !== undefined && object.contentFilters !== null) { if (object.contentFilters !== undefined && object.contentFilters !== null) {
for (const e of object.contentFilters) { for (const e of object.contentFilters) {
message.contentFilters.push(ContentFilter.fromJSON(e)); message.contentFilters.push(ContentFilter.fromJSON(e));
@ -383,6 +395,8 @@ export const HistoryQuery = {
toJSON(message: HistoryQuery): unknown { toJSON(message: HistoryQuery): unknown {
const obj: any = {}; const obj: any = {};
message.pubsubTopic !== undefined &&
(obj.pubsubTopic = message.pubsubTopic);
if (message.contentFilters) { if (message.contentFilters) {
obj.contentFilters = message.contentFilters.map((e) => obj.contentFilters = message.contentFilters.map((e) =>
e ? ContentFilter.toJSON(e) : undefined e ? ContentFilter.toJSON(e) : undefined
@ -402,6 +416,11 @@ export const HistoryQuery = {
fromPartial(object: DeepPartial<HistoryQuery>): HistoryQuery { fromPartial(object: DeepPartial<HistoryQuery>): HistoryQuery {
const message = { ...baseHistoryQuery } as HistoryQuery; const message = { ...baseHistoryQuery } as HistoryQuery;
message.contentFilters = []; message.contentFilters = [];
if (object.pubsubTopic !== undefined && object.pubsubTopic !== null) {
message.pubsubTopic = object.pubsubTopic;
} else {
message.pubsubTopic = undefined;
}
if (object.contentFilters !== undefined && object.contentFilters !== null) { if (object.contentFilters !== undefined && object.contentFilters !== null) {
for (const e of object.contentFilters) { for (const e of object.contentFilters) {
message.contentFilters.push(ContentFilter.fromPartial(e)); message.contentFilters.push(ContentFilter.fromPartial(e));