feat: find by criteria

This commit is contained in:
Richard Ramos 2022-11-14 10:21:16 -04:00 committed by RichΛrd
parent c8fc0404d1
commit dbe152b8e5
2 changed files with 108 additions and 0 deletions

View File

@ -169,9 +169,12 @@ type WakuStore struct {
swap *swap.WakuSwap swap *swap.WakuSwap
} }
type criteriaFN = func(msg *pb.WakuMessage) (bool, error)
type Store interface { type Store interface {
Start(ctx context.Context) Start(ctx context.Context)
Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error)
Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*pb.WakuMessage, error)
Next(ctx context.Context, r *Result) (*Result, error) Next(ctx context.Context, r *Result) (*Result, error)
Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error) Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error)
MessageChannel() chan *protocol.Envelope MessageChannel() chan *protocol.Envelope
@ -499,6 +502,42 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
return result, nil return result, nil
} }
// Find the first message that matches a criteria. criteriaCB is a function that will be invoked for each message and returns true if the message matches the criteria
func (store *WakuStore) Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*pb.WakuMessage, error) {
if cb == nil {
return nil, errors.New("callback can't be null")
}
result, err := store.Query(ctx, query, opts...)
if err != nil {
return nil, err
}
for {
for _, m := range result.Messages {
found, err := cb(m)
if err != nil {
return nil, err
}
if found {
return m, nil
}
}
if result.IsComplete() {
break
}
result, err = store.Next(ctx, result)
if err != nil {
return nil, err
}
}
return nil, nil
}
// Next is used with to retrieve the next page of rows from a query response. // Next is used with to retrieve the next page of rows from a query response.
// If no more records are found, the result will not contain any messages. // If no more records are found, the result will not contain any messages.
// This function is useful for iterating over results without having to manually // This function is useful for iterating over results without having to manually

View File

@ -125,3 +125,72 @@ func TestWakuStoreProtocolNext(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Len(t, response.Messages, 0) require.Len(t, response.Messages, 0)
} }
func TestWakuStoreProtocolFind(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
db := MemoryDB(t)
s1 := NewWakuStore(host1, nil, db, utils.Logger())
s1.Start(ctx)
defer s1.Stop()
topic1 := "1"
pubsubTopic1 := "topic1"
msg1 := tests.CreateWakuMessage(topic1, 1)
msg2 := tests.CreateWakuMessage(topic1, 2)
msg3 := tests.CreateWakuMessage(topic1, 3)
msg4 := tests.CreateWakuMessage(topic1, 4)
msg5 := tests.CreateWakuMessage(topic1, 5)
msg6 := tests.CreateWakuMessage(topic1, 6)
msg7 := tests.CreateWakuMessage("hello", 7)
msg8 := tests.CreateWakuMessage(topic1, 8)
msg9 := tests.CreateWakuMessage(topic1, 9)
s1.MsgC <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg4, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg5, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg6, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg7, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg8, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg9, utils.GetUnixEpoch(), pubsubTopic1)
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta4))
require.NoError(t, err)
s2 := NewWakuStore(host2, nil, db, utils.Logger())
s2.Start(ctx)
defer s2.Stop()
q := Query{
Topic: pubsubTopic1,
}
fn := func(msg *pb.WakuMessage) (bool, error) {
return msg.ContentTopic == "hello", nil
}
foundMsg, err := s2.Find(ctx, q, fn, WithAutomaticPeerSelection(), WithAutomaticRequestId(), WithPaging(true, 2))
require.NoError(t, err)
require.NotNil(t, foundMsg)
require.Equal(t, "hello", foundMsg.ContentTopic)
fn2 := func(msg *pb.WakuMessage) (bool, error) {
return msg.ContentTopic == "bye", nil
}
foundMsg, err = s2.Find(ctx, q, fn2, WithAutomaticPeerSelection(), WithAutomaticRequestId(), WithPaging(true, 2))
require.NoError(t, err)
require.Nil(t, foundMsg)
}