From e1f10d2099dc6a1eda910a4a6678ee4d87dee481 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 14 Apr 2021 22:17:53 -0400 Subject: [PATCH] Add options and start/end time to waku_store --- waku/v2/protocol/waku_store/waku_store.go | 141 +++++++++++++++++++--- 1 file changed, 127 insertions(+), 14 deletions(-) diff --git a/waku/v2/protocol/waku_store/waku_store.go b/waku/v2/protocol/waku_store/waku_store.go index 83af6d3d..f2a2bc6f 100644 --- a/waku/v2/protocol/waku_store/waku_store.go +++ b/waku/v2/protocol/waku_store/waku_store.go @@ -33,6 +33,11 @@ const MaxPageSize = 100 // Maximum number of waku messages in each page const ConnectionTimeout = 10 * time.Second const DefaultContentTopic = "/waku/2/default-content/proto" +var ( + ErrNoPeersAvailable = errors.New("no suitable remote peers") + ErrInvalidId = errors.New("invalid request id") +) + func minOf(vars ...int) int { min := vars[0] @@ -144,6 +149,14 @@ func (w *WakuStore) FindMessages(query *protocol.HistoryQuery) *protocol.History // data holds IndexedWakuMessage whose topics match the query var data []IndexedWakuMessage for _, indexedMsg := range w.messages { + // temporal filtering + // check whether the history query contains a time filter + if query.StartTime != 0 && query.EndTime != 0 { + if indexedMsg.msg.Timestamp < query.StartTime || indexedMsg.msg.Timestamp > query.EndTime { + continue + } + } + if contains(query.Topics, indexedMsg.msg.ContentTopic) { data = append(data, indexedMsg) } @@ -165,7 +178,7 @@ type IndexedWakuMessage struct { } type WakuStore struct { - msg chan *common.Envelope + MsgC chan *common.Envelope messages []IndexedWakuMessage messagesMutex sync.Mutex @@ -174,9 +187,9 @@ type WakuStore struct { ctx context.Context } -func NewWakuStore(ctx context.Context, h host.Host, msg chan *common.Envelope, p MessageProvider) *WakuStore { +func NewWakuStore(ctx context.Context, h host.Host, p MessageProvider) *WakuStore { wakuStore := new(WakuStore) - wakuStore.msg = msg + wakuStore.MsgC = make(chan *common.Envelope) wakuStore.msgProvider = p wakuStore.h = h wakuStore.ctx = ctx @@ -210,7 +223,7 @@ func (store *WakuStore) Start() { } func (store *WakuStore) storeIncomingMessages() { - for envelope := range store.msg { + for envelope := range store.MsgC { index, err := computeIndex(envelope.Message()) if err != nil { log.Error("could not calculate message index", err) @@ -364,7 +377,7 @@ var brHmacDrbgPool = sync.Pool{New: func() interface{} { return hmacdrbg.NewHmacDrbg(256, seed, nil) }} -func GenerateRequestId() string { +func GenerateRequestId() []byte { rng := brHmacDrbgPool.Get().(*hmacdrbg.HmacDrbg) defer brHmacDrbgPool.Put(rng) @@ -386,19 +399,119 @@ func GenerateRequestId() string { log.Error("could not generate random request id") } } - return hex.EncodeToString(randData) + return randData } -func (store *WakuStore) Query(q *protocol.HistoryQuery) (*protocol.HistoryResponse, error) { - peer := store.selectPeer() - if peer == nil { - return nil, errors.New("no suitable remote peers") +type HistoryRequestParameters struct { + selectedPeer *peer.ID + requestId []byte + timeout *time.Duration + ctx context.Context + cancelFunc context.CancelFunc + + cursor *protocol.Index + pageSize uint64 + asc bool + + s *WakuStore +} + +type HistoryRequestOption func(*HistoryRequestParameters) + +func WithPeer(p string) HistoryRequestOption { + return func(params *HistoryRequestParameters) { + pid := peer.ID(p) + params.selectedPeer = &pid + } +} + +func WithAutomaticPeerSelection() HistoryRequestOption { + return func(params *HistoryRequestParameters) { + params.selectedPeer = params.s.selectPeer() + } +} + +func WithRequestId(requestId []byte) HistoryRequestOption { + return func(params *HistoryRequestParameters) { + params.requestId = requestId + } +} + +func WithAutomaticRequestId() HistoryRequestOption { + return func(params *HistoryRequestParameters) { + params.requestId = GenerateRequestId() + } +} + +func WithTimeout(t time.Duration) HistoryRequestOption { + return func(params *HistoryRequestParameters) { + params.timeout = &t + params.ctx, params.cancelFunc = context.WithTimeout(params.s.ctx, t) + } +} + +func WithCursor(c *protocol.Index) HistoryRequestOption { + return func(params *HistoryRequestParameters) { + params.cursor = c + } +} + +func WithPaging(asc bool, pageSize uint64) HistoryRequestOption { + return func(params *HistoryRequestParameters) { + params.asc = asc + params.pageSize = pageSize + } +} + +func DefaultOptions() []HistoryRequestOption { + return []HistoryRequestOption{ + WithAutomaticRequestId(), + WithAutomaticPeerSelection(), + WithTimeout(ConnectionTimeout), + WithPaging(true, 0), + } +} + +func (store *WakuStore) Query(q *protocol.HistoryQuery, opts ...HistoryRequestOption) (*protocol.HistoryResponse, error) { + params := new(HistoryRequestParameters) + params.s = store + for _, opt := range opts { + opt(params) } - ctx, cancel := context.WithTimeout(store.ctx, ConnectionTimeout) - defer cancel() + if params.selectedPeer == nil { + return nil, ErrNoPeersAvailable + } - connOpt, err := store.h.NewStream(ctx, *peer, WakuStoreProtocolId) + if len(params.requestId) == 0 { + return nil, ErrInvalidId + } + + // Setting default timeout if none is specified + if params.timeout == nil { + timeoutF := WithTimeout(ConnectionTimeout) + timeoutF(params) + } + + if *params.timeout == 0 { + params.ctx = store.ctx + } else { + defer params.cancelFunc() + } + + if params.cursor != nil { + q.PagingInfo.Cursor = params.cursor + } + + if params.asc { + q.PagingInfo.Direction = protocol.PagingInfo_FORWARD + } else { + q.PagingInfo.Direction = protocol.PagingInfo_BACKWARD + } + + q.PagingInfo.PageSize = params.pageSize + + connOpt, err := store.h.NewStream(params.ctx, *params.selectedPeer, WakuStoreProtocolId) if err != nil { log.Info("failed to connect to remote peer", err) return nil, err @@ -407,7 +520,7 @@ func (store *WakuStore) Query(q *protocol.HistoryQuery) (*protocol.HistoryRespon defer connOpt.Close() defer connOpt.Reset() - historyRequest := &protocol.HistoryRPC{Query: q, RequestId: GenerateRequestId()} + historyRequest := &protocol.HistoryRPC{Query: q, RequestId: hex.EncodeToString(params.requestId)} writer := protoio.NewDelimitedWriter(connOpt) reader := protoio.NewDelimitedReader(connOpt, 64*1024)