mirror of
https://github.com/waku-org/js-waku.git
synced 2025-01-27 12:45:21 +00:00
Use one object as we are increasing the number of parameters
This commit is contained in:
parent
d1ef76f7f9
commit
4b6fe84392
@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
- `callback` argument to `WakuStore.queryHistory()`, called as messages are retrieved
|
||||
; Messages are retrieved using pagination, and it may take some time to retrieve all messages,
|
||||
with the `callback` function, messages are processed as soon as they are received.
|
||||
|
||||
### Changed
|
||||
- Testing: Upgrade nim-waku node to v0.3.
|
||||
|
||||
|
@ -76,9 +76,10 @@ export default async function startChat(): Promise<void> {
|
||||
console.log(
|
||||
`Retrieving archived messages from ${peerId.toB58String()}`
|
||||
);
|
||||
const messages = await waku.store.queryHistory(peerId, [
|
||||
ChatContentTopic,
|
||||
]);
|
||||
const messages = await waku.store.queryHistory({
|
||||
peerId,
|
||||
contentTopics: [ChatContentTopic],
|
||||
});
|
||||
messages?.map((msg) => {
|
||||
if (msg.payload) {
|
||||
const chatMsg = ChatMessage.decode(msg.payload);
|
||||
|
@ -68,9 +68,10 @@ export default function App() {
|
||||
if (protocols.includes(StoreCodec)) {
|
||||
console.log(`${peerId.toB58String()}: retrieving archived messages}`);
|
||||
try {
|
||||
const response = await waku.store.queryHistory(peerId, [
|
||||
ChatContentTopic,
|
||||
]);
|
||||
const response = await waku.store.queryHistory({
|
||||
peerId,
|
||||
contentTopics: [ChatContentTopic],
|
||||
});
|
||||
console.log(`${peerId.toB58String()}: messages retrieved:`, response);
|
||||
if (response) {
|
||||
const messages = response
|
||||
|
@ -2,31 +2,34 @@ import { Reader } from 'protobufjs/minimal';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
import * as proto from '../../proto/waku/v2/store';
|
||||
import { DefaultContentTopic } from '../waku_message';
|
||||
import { DefaultPubsubTopic } from '../waku_relay';
|
||||
|
||||
export interface Options {
|
||||
contentTopics: string[];
|
||||
cursor?: proto.Index;
|
||||
pubsubTopic: string;
|
||||
}
|
||||
|
||||
export class HistoryRPC {
|
||||
public constructor(public proto: proto.HistoryRPC) {}
|
||||
|
||||
static createQuery(
|
||||
contentTopics: string[] = [DefaultContentTopic],
|
||||
cursor?: proto.Index,
|
||||
pubsubTopic: string = DefaultPubsubTopic
|
||||
): HistoryRPC {
|
||||
/**
|
||||
* Create History Query.
|
||||
*/
|
||||
static createQuery(options: Options): HistoryRPC {
|
||||
const pagingInfo = {
|
||||
pageSize: 10,
|
||||
cursor,
|
||||
cursor: options.cursor,
|
||||
direction: proto.PagingInfo_Direction.DIRECTION_FORWARD,
|
||||
};
|
||||
|
||||
const contentFilters = contentTopics.map((contentTopic) => {
|
||||
const contentFilters = options.contentTopics.map((contentTopic) => {
|
||||
return { contentTopic };
|
||||
});
|
||||
|
||||
return new HistoryRPC({
|
||||
requestId: uuid(),
|
||||
query: {
|
||||
pubsubTopic,
|
||||
pubsubTopic: options.pubsubTopic,
|
||||
contentFilters,
|
||||
pagingInfo,
|
||||
startTime: undefined,
|
||||
|
@ -3,7 +3,7 @@ import TCP from 'libp2p-tcp';
|
||||
|
||||
import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils';
|
||||
import { Waku } from '../waku';
|
||||
import { WakuMessage } from '../waku_message';
|
||||
import { DefaultContentTopic, WakuMessage } from '../waku_message';
|
||||
|
||||
describe('Waku Store', () => {
|
||||
let waku: Waku;
|
||||
@ -39,7 +39,10 @@ describe('Waku Store', () => {
|
||||
|
||||
const nimPeerId = await nimWaku.getPeerId();
|
||||
|
||||
const messages = await waku.store.queryHistory(nimPeerId);
|
||||
const messages = await waku.store.queryHistory({
|
||||
peerId: nimPeerId,
|
||||
contentTopics: [],
|
||||
});
|
||||
|
||||
expect(messages?.length).eq(2);
|
||||
const result = messages?.findIndex((msg) => {
|
||||
@ -73,7 +76,10 @@ describe('Waku Store', () => {
|
||||
|
||||
const nimPeerId = await nimWaku.getPeerId();
|
||||
|
||||
const messages = await waku.store.queryHistory(nimPeerId);
|
||||
const messages = await waku.store.queryHistory({
|
||||
peerId: nimPeerId,
|
||||
contentTopics: [DefaultContentTopic],
|
||||
});
|
||||
|
||||
expect(messages?.length).eq(15);
|
||||
for (let index = 0; index < 2; index++) {
|
||||
|
@ -5,11 +5,18 @@ import Libp2p from 'libp2p';
|
||||
import PeerId from 'peer-id';
|
||||
|
||||
import { WakuMessage } from '../waku_message';
|
||||
import { DefaultPubsubTopic } from '../waku_relay';
|
||||
|
||||
import { HistoryRPC } from './history_rpc';
|
||||
|
||||
export const StoreCodec = '/vac/waku/store/2.0.0-beta3';
|
||||
|
||||
export interface Options {
|
||||
peerId: PeerId;
|
||||
contentTopics: string[];
|
||||
pubsubTopic?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements the [Waku v2 Store protocol](https://rfc.vac.dev/spec/13/).
|
||||
*/
|
||||
@ -19,19 +26,23 @@ export class WakuStore {
|
||||
/**
|
||||
* Query given peer using Waku Store.
|
||||
*
|
||||
* @param peerId The peer to query.
|
||||
* @param contentTopics The content topics to retrieve, leave empty to
|
||||
* @param options
|
||||
* @param options.peerId The peer to query.
|
||||
* @param options.contentTopics The content topics to retrieve, leave empty to
|
||||
* retrieve all messages.
|
||||
* @param pubsubTopic The pubsub topic to retrieve. Currently, all waku nodes
|
||||
* @param options.pubsubTopic The pubsub topic to retrieve. Currently, all waku nodes
|
||||
* use the same pubsub topic. This is reserved for future applications.
|
||||
* @throws If not able to reach the peer to query.
|
||||
*/
|
||||
async queryHistory(
|
||||
peerId: PeerId,
|
||||
contentTopics?: string[],
|
||||
pubsubTopic?: string
|
||||
): Promise<WakuMessage[] | null> {
|
||||
const peer = this.libp2p.peerStore.get(peerId);
|
||||
async queryHistory(options: Options): Promise<WakuMessage[] | null> {
|
||||
const opts = Object.assign(
|
||||
{
|
||||
pubsubTopic: DefaultPubsubTopic,
|
||||
},
|
||||
options
|
||||
);
|
||||
|
||||
const peer = this.libp2p.peerStore.get(opts.peerId);
|
||||
if (!peer) throw 'Peer is unknown';
|
||||
if (!peer.protocols.includes(StoreCodec))
|
||||
throw 'Peer does not register waku store protocol';
|
||||
@ -44,11 +55,11 @@ export class WakuStore {
|
||||
try {
|
||||
const { stream } = await connection.newStream(StoreCodec);
|
||||
try {
|
||||
const historyRpcQuery = HistoryRPC.createQuery(
|
||||
contentTopics,
|
||||
const historyRpcQuery = HistoryRPC.createQuery({
|
||||
contentTopics: opts.contentTopics,
|
||||
cursor,
|
||||
pubsubTopic
|
||||
);
|
||||
pubsubTopic: opts.pubsubTopic,
|
||||
});
|
||||
const res = await pipe(
|
||||
[historyRpcQuery.encode()],
|
||||
lp.encode(),
|
||||
|
Loading…
x
Reference in New Issue
Block a user