fix: numeric identifiers and index out of bounds issue on store

This commit is contained in:
Richard Ramos 2021-04-12 13:59:09 -04:00
parent d85c857a74
commit ad9abe601c
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
3 changed files with 23 additions and 18 deletions

View File

@ -119,6 +119,10 @@ func (w *WakuNode) Stop() {
} }
w.subscriptions = nil w.subscriptions = nil
if w.store != nil {
w.store.Stop()
}
} }
func (w *WakuNode) Host() host.Host { func (w *WakuNode) Host() host.Host {

View File

@ -20,15 +20,15 @@ message PagingInfo {
} }
message HistoryQuery { message HistoryQuery {
repeated string topics = 1; repeated string topics = 2;
PagingInfo pagingInfo = 2; // used for pagination PagingInfo pagingInfo = 3; // used for pagination
double startTime = 3; double startTime = 4;
double endTime = 4; double endTime = 5;
} }
message HistoryResponse { message HistoryResponse {
repeated WakuMessage messages = 1; repeated WakuMessage messages = 2;
PagingInfo pagingInfo = 2; // used for pagination PagingInfo pagingInfo = 3; // used for pagination
} }
message HistoryRPC { message HistoryRPC {

View File

@ -88,28 +88,24 @@ func paginateWithIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (r
switch dir { switch dir {
case protocol.PagingInfo_FORWARD: // forward pagination case protocol.PagingInfo_FORWARD: // forward pagination
remainingMessages := len(msgList) - foundIndex - 1 remainingMessages := len(msgList) - foundIndex - 1
// the number of queried messages cannot exceed the MaxPageSize and the total remaining messages i.e., msgList.len-foundIndex
retrievedPageSize = minOf(int(pageSize), MaxPageSize, remainingMessages)
if initQuery { if initQuery {
remainingMessages = remainingMessages + 1
foundIndex = foundIndex - 1 foundIndex = foundIndex - 1
} }
// the number of queried messages cannot exceed the MaxPageSize and the total remaining messages i.e., msgList.len-foundIndex
retrievedPageSize = minOf(int(pageSize), MaxPageSize, remainingMessages)
s = foundIndex + 1 // non inclusive s = foundIndex + 1 // non inclusive
e = foundIndex + retrievedPageSize e = foundIndex + retrievedPageSize
if e < 0 {
e = 0
}
newCursor = msgList[e].index // the new cursor points to the end of the page newCursor = msgList[e].index // the new cursor points to the end of the page
case protocol.PagingInfo_BACKWARD: // backward pagination case protocol.PagingInfo_BACKWARD: // backward pagination
remainingMessages := foundIndex remainingMessages := foundIndex
// the number of queried messages cannot exceed the MaxPageSize and the total remaining messages i.e., foundIndex-0
retrievedPageSize = minOf(int(pageSize), MaxPageSize, remainingMessages)
if initQuery { if initQuery {
remainingMessages = remainingMessages + 1
foundIndex = foundIndex + 1 foundIndex = foundIndex + 1
} }
// the number of queried messages cannot exceed the MaxPageSize and the total remaining messages i.e., foundIndex-0
retrievedPageSize = minOf(int(pageSize), MaxPageSize, remainingMessages)
s = foundIndex - retrievedPageSize s = foundIndex - retrievedPageSize
if s >= len(msgList) {
s = len(msgList) - 1
}
e = foundIndex - 1 e = foundIndex - 1
newCursor = msgList[s].index // the new cursor points to the begining of the page newCursor = msgList[s].index // the new cursor points to the begining of the page
} }
@ -159,7 +155,8 @@ func (w *WakuStore) FindMessages(query *protocol.HistoryQuery) *protocol.History
type MessageProvider interface { type MessageProvider interface {
GetAll() ([]*protocol.WakuMessage, error) GetAll() ([]*protocol.WakuMessage, error)
Put(message *protocol.WakuMessage) error Put(cursor *protocol.Index, message *protocol.WakuMessage) error
Stop()
} }
type IndexedWakuMessage struct { type IndexedWakuMessage struct {
@ -212,6 +209,10 @@ func (store *WakuStore) Start() {
go store.storeIncomingMessages() go store.storeIncomingMessages()
} }
func (store *WakuStore) Stop() {
store.msgProvider.Stop()
}
func (store *WakuStore) storeIncomingMessages() { func (store *WakuStore) storeIncomingMessages() {
for envelope := range store.msg { for envelope := range store.msg {
index, err := computeIndex(envelope.Message()) index, err := computeIndex(envelope.Message())
@ -228,7 +229,7 @@ func (store *WakuStore) storeIncomingMessages() {
continue continue
} }
err = store.msgProvider.Put(envelope.Message()) // Should the index be stored? err = store.msgProvider.Put(index, envelope.Message()) // Should the index be stored?
if err != nil { if err != nil {
log.Error("could not store message", err) log.Error("could not store message", err)
continue continue