WakuStore.queryHistory throws when encountering an error

Instead of returning a `null` value.
This commit is contained in:
Franck Royer 2021-08-04 12:35:47 +10:00
parent b422c9a10b
commit 319f44a0b1
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
3 changed files with 83 additions and 97 deletions

View File

@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed ### Changed
- **Breaking**: The `WakuMessage` APIs have been changed to move `contentTopic` out of the optional parameters. - **Breaking**: The `WakuMessage` APIs have been changed to move `contentTopic` out of the optional parameters.
- **Breaking**: Move `contentTopics` out the `WakuStore.queryHistory`'s optional parameters. - **Breaking**: Move `contentTopics` out the `WakuStore.queryHistory`'s optional parameters.
- **Breaking**: `WakuStore.queryHistory` throws when encountering an error instead of returning a `null` value.
### Removed ### Removed
- Examples (web-chat): Remove broken `/fleet` command. - Examples (web-chat): Remove broken `/fleet` command.

View File

@ -62,13 +62,18 @@ async function retrieveStoreMessages(
setArchivedMessages(messages); setArchivedMessages(messages);
}; };
const res = await waku.store.queryHistory([ChatContentTopic], { try {
pageSize: 5, const res = await waku.store.queryHistory([ChatContentTopic], {
direction: Direction.FORWARD, pageSize: 5,
callback, direction: Direction.FORWARD,
}); callback,
});
return res ? res.length : 0; return res.length;
} catch {
console.log('Failed to retrieve messages');
return 0;
}
} }
export default function App() { export default function App() {

View File

@ -67,12 +67,12 @@ export class WakuStore {
* @param options.decryptionKeys Keys that will be used to decrypt messages. * @param options.decryptionKeys Keys that will be used to decrypt messages.
* It can be Asymmetric Private Keys and Symmetric Keys in the same array, all keys will be tried with both * It can be Asymmetric Private Keys and Symmetric Keys in the same array, all keys will be tried with both
* methods. * methods.
* @throws If not able to reach the peer to query. * @throws If not able to reach the peer to query or error when processing the reply.
*/ */
async queryHistory( async queryHistory(
contentTopics: string[], contentTopics: string[],
options?: QueryOptions options?: QueryOptions
): Promise<WakuMessage[] | null> { ): Promise<WakuMessage[]> {
const opts = Object.assign( const opts = Object.assign(
{ {
pubsubTopic: this.pubsubTopic, pubsubTopic: this.pubsubTopic,
@ -100,98 +100,78 @@ export class WakuStore {
const messages: WakuMessage[] = []; const messages: WakuMessage[] = [];
let cursor = undefined; let cursor = undefined;
while (true) { while (true) {
try { const { stream } = await connection.newStream(StoreCodec);
const { stream } = await connection.newStream(StoreCodec); const queryOpts = Object.assign(opts, { cursor });
try { const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
const queryOpts = Object.assign(opts, { cursor }); const res = await pipe(
const historyRpcQuery = HistoryRPC.createQuery(queryOpts); [historyRpcQuery.encode()],
const res = await pipe( lp.encode(),
[historyRpcQuery.encode()], stream,
lp.encode(), lp.decode(),
stream, concat
lp.decode(), );
concat const reply = HistoryRPC.decode(res.slice());
const response = reply.response;
if (!response) {
throw 'History response misses response field';
}
if (
response.error &&
response.error === HistoryResponse_Error.ERROR_INVALID_CURSOR
) {
throw 'History response contains an Error: INVALID CURSOR';
}
if (!response.messages || !response.messages.length) {
// No messages left (or stored)
console.log('No messages present in HistoryRPC response');
return messages;
}
dbg(
`${response.messages.length} messages retrieved for pubsub topic ${opts.pubsubTopic}`
);
const pageMessages: WakuMessage[] = [];
await Promise.all(
response.messages.map(async (protoMsg) => {
const msg = await WakuMessage.decodeProto(
protoMsg,
opts.decryptionKeys
); );
try {
const reply = HistoryRPC.decode(res.slice());
const response = reply.response; if (msg) {
if (!response) { messages.push(msg);
console.log('No response in HistoryRPC'); pageMessages.push(msg);
return null;
}
if (
response.error &&
response.error === HistoryResponse_Error.ERROR_INVALID_CURSOR
) {
console.log('Error in response: INVALID CURSOR');
return null;
}
if (!response.messages || !response.messages.length) {
// No messages left (or stored)
console.log('No messages present in HistoryRPC response');
return messages;
}
dbg(
`${response.messages.length} messages retrieved for pubsub topic ${opts.pubsubTopic}`
);
const pageMessages: WakuMessage[] = [];
await Promise.all(
response.messages.map(async (protoMsg) => {
const msg = await WakuMessage.decodeProto(
protoMsg,
opts.decryptionKeys
);
if (msg) {
messages.push(msg);
pageMessages.push(msg);
}
})
);
if (opts.callback) {
// TODO: Test the callback feature
// TODO: Change callback to take individual messages
opts.callback(pageMessages);
}
const responsePageSize = response.pagingInfo?.pageSize;
const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize;
if (
responsePageSize &&
queryPageSize &&
responsePageSize < queryPageSize
) {
// Response page size smaller than query, meaning this is the last page
return messages;
}
cursor = response.pagingInfo?.cursor;
if (cursor === undefined) {
// If the server does not return cursor then there is an issue,
// Need to abort or we end up in an infinite loop
console.log('No cursor returned by peer.');
return messages;
}
} catch (err) {
console.log('Failed to decode store reply', err);
return null;
} }
} catch (err) { })
console.log('Failed to send waku store query', err); );
return null;
} if (opts.callback) {
} catch (err) { // TODO: Test the callback feature
console.log( // TODO: Change callback to take individual messages
'Failed to negotiate waku store protocol stream with peer', opts.callback(pageMessages);
err }
);
return null; const responsePageSize = response.pagingInfo?.pageSize;
const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize;
if (
responsePageSize &&
queryPageSize &&
responsePageSize < queryPageSize
) {
// Response page size smaller than query, meaning this is the last page
return messages;
}
cursor = response.pagingInfo?.cursor;
if (cursor === undefined) {
// If the server does not return cursor then there is an issue,
// Need to abort or we end up in an infinite loop
console.log('No cursor returned by peer.');
return messages;
} }
} }
} }