From ad9abe601c279aade9fd8d8b77eb289cc75a7a3b Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 12 Apr 2021 13:59:09 -0400 Subject: [PATCH] fix: numeric identifiers and index out of bounds issue on store --- waku/v2/node/wakunode2.go | 4 ++++ waku/v2/protocol/waku_store.proto | 12 +++++------ waku/v2/protocol/waku_store/waku_store.go | 25 ++++++++++++----------- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index fecdb323..8d427e7a 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -119,6 +119,10 @@ func (w *WakuNode) Stop() { } w.subscriptions = nil + + if w.store != nil { + w.store.Stop() + } } func (w *WakuNode) Host() host.Host { diff --git a/waku/v2/protocol/waku_store.proto b/waku/v2/protocol/waku_store.proto index 3fe260af..6b8648c9 100644 --- a/waku/v2/protocol/waku_store.proto +++ b/waku/v2/protocol/waku_store.proto @@ -20,15 +20,15 @@ message PagingInfo { } message HistoryQuery { - repeated string topics = 1; - PagingInfo pagingInfo = 2; // used for pagination - double startTime = 3; - double endTime = 4; + repeated string topics = 2; + PagingInfo pagingInfo = 3; // used for pagination + double startTime = 4; + double endTime = 5; } message HistoryResponse { - repeated WakuMessage messages = 1; - PagingInfo pagingInfo = 2; // used for pagination + repeated WakuMessage messages = 2; + PagingInfo pagingInfo = 3; // used for pagination } message HistoryRPC { diff --git a/waku/v2/protocol/waku_store/waku_store.go b/waku/v2/protocol/waku_store/waku_store.go index 0626395d..a211b9a5 100644 --- a/waku/v2/protocol/waku_store/waku_store.go +++ b/waku/v2/protocol/waku_store/waku_store.go @@ -88,28 +88,24 @@ func paginateWithIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (r switch dir { case protocol.PagingInfo_FORWARD: // forward pagination remainingMessages := len(msgList) - foundIndex - 1 - // the number of queried messages cannot exceed the MaxPageSize and the total remaining messages i.e., msgList.len-foundIndex - retrievedPageSize = minOf(int(pageSize), MaxPageSize, remainingMessages) if initQuery { + remainingMessages = remainingMessages + 1 foundIndex = foundIndex - 1 } + // the number of queried messages cannot exceed the MaxPageSize and the total remaining messages i.e., msgList.len-foundIndex + retrievedPageSize = minOf(int(pageSize), MaxPageSize, remainingMessages) s = foundIndex + 1 // non inclusive e = foundIndex + retrievedPageSize - if e < 0 { - e = 0 - } newCursor = msgList[e].index // the new cursor points to the end of the page case protocol.PagingInfo_BACKWARD: // backward pagination remainingMessages := foundIndex - // the number of queried messages cannot exceed the MaxPageSize and the total remaining messages i.e., foundIndex-0 - retrievedPageSize = minOf(int(pageSize), MaxPageSize, remainingMessages) if initQuery { + remainingMessages = remainingMessages + 1 foundIndex = foundIndex + 1 } + // the number of queried messages cannot exceed the MaxPageSize and the total remaining messages i.e., foundIndex-0 + retrievedPageSize = minOf(int(pageSize), MaxPageSize, remainingMessages) s = foundIndex - retrievedPageSize - if s >= len(msgList) { - s = len(msgList) - 1 - } e = foundIndex - 1 newCursor = msgList[s].index // the new cursor points to the begining of the page } @@ -159,7 +155,8 @@ func (w *WakuStore) FindMessages(query *protocol.HistoryQuery) *protocol.History type MessageProvider interface { GetAll() ([]*protocol.WakuMessage, error) - Put(message *protocol.WakuMessage) error + Put(cursor *protocol.Index, message *protocol.WakuMessage) error + Stop() } type IndexedWakuMessage struct { @@ -212,6 +209,10 @@ func (store *WakuStore) Start() { go store.storeIncomingMessages() } +func (store *WakuStore) Stop() { + store.msgProvider.Stop() +} + func (store *WakuStore) storeIncomingMessages() { for envelope := range store.msg { index, err := computeIndex(envelope.Message()) @@ -228,7 +229,7 @@ func (store *WakuStore) storeIncomingMessages() { continue } - err = store.msgProvider.Put(envelope.Message()) // Should the index be stored? + err = store.msgProvider.Put(index, envelope.Message()) // Should the index be stored? if err != nil { log.Error("could not store message", err) continue