Add options and start/end time to waku_store

This commit is contained in:
Richard Ramos 2021-04-14 22:17:53 -04:00
parent 547be3c951
commit e1f10d2099
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
1 changed files with 127 additions and 14 deletions

View File

@ -33,6 +33,11 @@ const MaxPageSize = 100 // Maximum number of waku messages in each page
const ConnectionTimeout = 10 * time.Second
const DefaultContentTopic = "/waku/2/default-content/proto"
var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
ErrInvalidId = errors.New("invalid request id")
)
func minOf(vars ...int) int {
min := vars[0]
@ -144,6 +149,14 @@ func (w *WakuStore) FindMessages(query *protocol.HistoryQuery) *protocol.History
// data holds IndexedWakuMessage whose topics match the query
var data []IndexedWakuMessage
for _, indexedMsg := range w.messages {
// temporal filtering
// check whether the history query contains a time filter
if query.StartTime != 0 && query.EndTime != 0 {
if indexedMsg.msg.Timestamp < query.StartTime || indexedMsg.msg.Timestamp > query.EndTime {
continue
}
}
if contains(query.Topics, indexedMsg.msg.ContentTopic) {
data = append(data, indexedMsg)
}
@ -165,7 +178,7 @@ type IndexedWakuMessage struct {
}
type WakuStore struct {
msg chan *common.Envelope
MsgC chan *common.Envelope
messages []IndexedWakuMessage
messagesMutex sync.Mutex
@ -174,9 +187,9 @@ type WakuStore struct {
ctx context.Context
}
func NewWakuStore(ctx context.Context, h host.Host, msg chan *common.Envelope, p MessageProvider) *WakuStore {
func NewWakuStore(ctx context.Context, h host.Host, p MessageProvider) *WakuStore {
wakuStore := new(WakuStore)
wakuStore.msg = msg
wakuStore.MsgC = make(chan *common.Envelope)
wakuStore.msgProvider = p
wakuStore.h = h
wakuStore.ctx = ctx
@ -210,7 +223,7 @@ func (store *WakuStore) Start() {
}
func (store *WakuStore) storeIncomingMessages() {
for envelope := range store.msg {
for envelope := range store.MsgC {
index, err := computeIndex(envelope.Message())
if err != nil {
log.Error("could not calculate message index", err)
@ -364,7 +377,7 @@ var brHmacDrbgPool = sync.Pool{New: func() interface{} {
return hmacdrbg.NewHmacDrbg(256, seed, nil)
}}
func GenerateRequestId() string {
func GenerateRequestId() []byte {
rng := brHmacDrbgPool.Get().(*hmacdrbg.HmacDrbg)
defer brHmacDrbgPool.Put(rng)
@ -386,19 +399,119 @@ func GenerateRequestId() string {
log.Error("could not generate random request id")
}
}
return hex.EncodeToString(randData)
return randData
}
func (store *WakuStore) Query(q *protocol.HistoryQuery) (*protocol.HistoryResponse, error) {
peer := store.selectPeer()
if peer == nil {
return nil, errors.New("no suitable remote peers")
type HistoryRequestParameters struct {
selectedPeer *peer.ID
requestId []byte
timeout *time.Duration
ctx context.Context
cancelFunc context.CancelFunc
cursor *protocol.Index
pageSize uint64
asc bool
s *WakuStore
}
type HistoryRequestOption func(*HistoryRequestParameters)
func WithPeer(p string) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
pid := peer.ID(p)
params.selectedPeer = &pid
}
}
func WithAutomaticPeerSelection() HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.selectedPeer = params.s.selectPeer()
}
}
func WithRequestId(requestId []byte) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.requestId = requestId
}
}
func WithAutomaticRequestId() HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.requestId = GenerateRequestId()
}
}
func WithTimeout(t time.Duration) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.timeout = &t
params.ctx, params.cancelFunc = context.WithTimeout(params.s.ctx, t)
}
}
func WithCursor(c *protocol.Index) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.cursor = c
}
}
func WithPaging(asc bool, pageSize uint64) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.asc = asc
params.pageSize = pageSize
}
}
func DefaultOptions() []HistoryRequestOption {
return []HistoryRequestOption{
WithAutomaticRequestId(),
WithAutomaticPeerSelection(),
WithTimeout(ConnectionTimeout),
WithPaging(true, 0),
}
}
func (store *WakuStore) Query(q *protocol.HistoryQuery, opts ...HistoryRequestOption) (*protocol.HistoryResponse, error) {
params := new(HistoryRequestParameters)
params.s = store
for _, opt := range opts {
opt(params)
}
ctx, cancel := context.WithTimeout(store.ctx, ConnectionTimeout)
defer cancel()
if params.selectedPeer == nil {
return nil, ErrNoPeersAvailable
}
connOpt, err := store.h.NewStream(ctx, *peer, WakuStoreProtocolId)
if len(params.requestId) == 0 {
return nil, ErrInvalidId
}
// Setting default timeout if none is specified
if params.timeout == nil {
timeoutF := WithTimeout(ConnectionTimeout)
timeoutF(params)
}
if *params.timeout == 0 {
params.ctx = store.ctx
} else {
defer params.cancelFunc()
}
if params.cursor != nil {
q.PagingInfo.Cursor = params.cursor
}
if params.asc {
q.PagingInfo.Direction = protocol.PagingInfo_FORWARD
} else {
q.PagingInfo.Direction = protocol.PagingInfo_BACKWARD
}
q.PagingInfo.PageSize = params.pageSize
connOpt, err := store.h.NewStream(params.ctx, *params.selectedPeer, WakuStoreProtocolId)
if err != nil {
log.Info("failed to connect to remote peer", err)
return nil, err
@ -407,7 +520,7 @@ func (store *WakuStore) Query(q *protocol.HistoryQuery) (*protocol.HistoryRespon
defer connOpt.Close()
defer connOpt.Reset()
historyRequest := &protocol.HistoryRPC{Query: q, RequestId: GenerateRequestId()}
historyRequest := &protocol.HistoryRPC{Query: q, RequestId: hex.EncodeToString(params.requestId)}
writer := protoio.NewDelimitedWriter(connOpt)
reader := protoio.NewDelimitedReader(connOpt, 64*1024)