From 319f44a0b1185807c1f1423a63449744649dde25 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Wed, 4 Aug 2021 12:35:47 +1000 Subject: [PATCH] `WakuStore.queryHistory` throws when encountering an error Instead of returning a `null` value. --- CHANGELOG.md | 1 + examples/web-chat/src/App.tsx | 17 ++-- src/lib/waku_store/index.ts | 162 +++++++++++++++------------------- 3 files changed, 83 insertions(+), 97 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 19df7fd74f..8969317afa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - **Breaking**: The `WakuMessage` APIs have been changed to move `contentTopic` out of the optional parameters. - **Breaking**: Move `contentTopics` out the `WakuStore.queryHistory`'s optional parameters. +- **Breaking**: `WakuStore.queryHistory` throws when encountering an error instead of returning a `null` value. ### Removed - Examples (web-chat): Remove broken `/fleet` command. diff --git a/examples/web-chat/src/App.tsx b/examples/web-chat/src/App.tsx index 728471a103..7ae79dd814 100644 --- a/examples/web-chat/src/App.tsx +++ b/examples/web-chat/src/App.tsx @@ -62,13 +62,18 @@ async function retrieveStoreMessages( setArchivedMessages(messages); }; - const res = await waku.store.queryHistory([ChatContentTopic], { - pageSize: 5, - direction: Direction.FORWARD, - callback, - }); + try { + const res = await waku.store.queryHistory([ChatContentTopic], { + pageSize: 5, + direction: Direction.FORWARD, + callback, + }); - return res ? res.length : 0; + return res.length; + } catch { + console.log('Failed to retrieve messages'); + return 0; + } } export default function App() { diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 534bf4b9e5..c862ba6354 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -67,12 +67,12 @@ export class WakuStore { * @param options.decryptionKeys Keys that will be used to decrypt messages. * It can be Asymmetric Private Keys and Symmetric Keys in the same array, all keys will be tried with both * methods. - * @throws If not able to reach the peer to query. + * @throws If not able to reach the peer to query or error when processing the reply. */ async queryHistory( contentTopics: string[], options?: QueryOptions - ): Promise { + ): Promise { const opts = Object.assign( { pubsubTopic: this.pubsubTopic, @@ -100,98 +100,78 @@ export class WakuStore { const messages: WakuMessage[] = []; let cursor = undefined; while (true) { - try { - const { stream } = await connection.newStream(StoreCodec); - try { - const queryOpts = Object.assign(opts, { cursor }); - const historyRpcQuery = HistoryRPC.createQuery(queryOpts); - const res = await pipe( - [historyRpcQuery.encode()], - lp.encode(), - stream, - lp.decode(), - concat + const { stream } = await connection.newStream(StoreCodec); + const queryOpts = Object.assign(opts, { cursor }); + const historyRpcQuery = HistoryRPC.createQuery(queryOpts); + const res = await pipe( + [historyRpcQuery.encode()], + lp.encode(), + stream, + lp.decode(), + concat + ); + const reply = HistoryRPC.decode(res.slice()); + + const response = reply.response; + if (!response) { + throw 'History response misses response field'; + } + + if ( + response.error && + response.error === HistoryResponse_Error.ERROR_INVALID_CURSOR + ) { + throw 'History response contains an Error: INVALID CURSOR'; + } + + if (!response.messages || !response.messages.length) { + // No messages left (or stored) + console.log('No messages present in HistoryRPC response'); + return messages; + } + + dbg( + `${response.messages.length} messages retrieved for pubsub topic ${opts.pubsubTopic}` + ); + + const pageMessages: WakuMessage[] = []; + await Promise.all( + response.messages.map(async (protoMsg) => { + const msg = await WakuMessage.decodeProto( + protoMsg, + opts.decryptionKeys ); - try { - const reply = HistoryRPC.decode(res.slice()); - const response = reply.response; - if (!response) { - console.log('No response in HistoryRPC'); - return null; - } - - if ( - response.error && - response.error === HistoryResponse_Error.ERROR_INVALID_CURSOR - ) { - console.log('Error in response: INVALID CURSOR'); - return null; - } - - if (!response.messages || !response.messages.length) { - // No messages left (or stored) - console.log('No messages present in HistoryRPC response'); - return messages; - } - - dbg( - `${response.messages.length} messages retrieved for pubsub topic ${opts.pubsubTopic}` - ); - - const pageMessages: WakuMessage[] = []; - await Promise.all( - response.messages.map(async (protoMsg) => { - const msg = await WakuMessage.decodeProto( - protoMsg, - opts.decryptionKeys - ); - - if (msg) { - messages.push(msg); - pageMessages.push(msg); - } - }) - ); - - if (opts.callback) { - // TODO: Test the callback feature - // TODO: Change callback to take individual messages - opts.callback(pageMessages); - } - - const responsePageSize = response.pagingInfo?.pageSize; - const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize; - if ( - responsePageSize && - queryPageSize && - responsePageSize < queryPageSize - ) { - // Response page size smaller than query, meaning this is the last page - return messages; - } - - cursor = response.pagingInfo?.cursor; - if (cursor === undefined) { - // If the server does not return cursor then there is an issue, - // Need to abort or we end up in an infinite loop - console.log('No cursor returned by peer.'); - return messages; - } - } catch (err) { - console.log('Failed to decode store reply', err); - return null; + if (msg) { + messages.push(msg); + pageMessages.push(msg); } - } catch (err) { - console.log('Failed to send waku store query', err); - return null; - } - } catch (err) { - console.log( - 'Failed to negotiate waku store protocol stream with peer', - err - ); - return null; + }) + ); + + if (opts.callback) { + // TODO: Test the callback feature + // TODO: Change callback to take individual messages + opts.callback(pageMessages); + } + + const responsePageSize = response.pagingInfo?.pageSize; + const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize; + if ( + responsePageSize && + queryPageSize && + responsePageSize < queryPageSize + ) { + // Response page size smaller than query, meaning this is the last page + return messages; + } + + cursor = response.pagingInfo?.cursor; + if (cursor === undefined) { + // If the server does not return cursor then there is an issue, + // Need to abort or we end up in an infinite loop + console.log('No cursor returned by peer.'); + return messages; } } }