From dbe152b8e5b6a4b509f50f055eadc57eb8f2ae71 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 14 Nov 2022 10:21:16 -0400 Subject: [PATCH] feat: find by criteria --- waku/v2/protocol/store/waku_store.go | 39 +++++++++++ .../store/waku_store_protocol_test.go | 69 +++++++++++++++++++ 2 files changed, 108 insertions(+) diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 15a8a811..11f5433a 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -169,9 +169,12 @@ type WakuStore struct { swap *swap.WakuSwap } +type criteriaFN = func(msg *pb.WakuMessage) (bool, error) + type Store interface { Start(ctx context.Context) Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) + Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*pb.WakuMessage, error) Next(ctx context.Context, r *Result) (*Result, error) Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error) MessageChannel() chan *protocol.Envelope @@ -499,6 +502,42 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR return result, nil } +// Find the first message that matches a criteria. criteriaCB is a function that will be invoked for each message and returns true if the message matches the criteria +func (store *WakuStore) Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*pb.WakuMessage, error) { + if cb == nil { + return nil, errors.New("callback can't be null") + } + + result, err := store.Query(ctx, query, opts...) + if err != nil { + return nil, err + } + + for { + for _, m := range result.Messages { + found, err := cb(m) + if err != nil { + return nil, err + } + + if found { + return m, nil + } + } + + if result.IsComplete() { + break + } + + result, err = store.Next(ctx, result) + if err != nil { + return nil, err + } + } + + return nil, nil +} + // Next is used with to retrieve the next page of rows from a query response. // If no more records are found, the result will not contain any messages. // This function is useful for iterating over results without having to manually diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 70b4509e..ed297784 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -125,3 +125,72 @@ func TestWakuStoreProtocolNext(t *testing.T) { require.NoError(t, err) require.Len(t, response.Messages, 0) } + +func TestWakuStoreProtocolFind(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + db := MemoryDB(t) + + s1 := NewWakuStore(host1, nil, db, utils.Logger()) + s1.Start(ctx) + defer s1.Stop() + + topic1 := "1" + pubsubTopic1 := "topic1" + + msg1 := tests.CreateWakuMessage(topic1, 1) + msg2 := tests.CreateWakuMessage(topic1, 2) + msg3 := tests.CreateWakuMessage(topic1, 3) + msg4 := tests.CreateWakuMessage(topic1, 4) + msg5 := tests.CreateWakuMessage(topic1, 5) + msg6 := tests.CreateWakuMessage(topic1, 6) + msg7 := tests.CreateWakuMessage("hello", 7) + msg8 := tests.CreateWakuMessage(topic1, 8) + msg9 := tests.CreateWakuMessage(topic1, 9) + + s1.MsgC <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1) + s1.MsgC <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1) + s1.MsgC <- protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1) + s1.MsgC <- protocol.NewEnvelope(msg4, utils.GetUnixEpoch(), pubsubTopic1) + s1.MsgC <- protocol.NewEnvelope(msg5, utils.GetUnixEpoch(), pubsubTopic1) + s1.MsgC <- protocol.NewEnvelope(msg6, utils.GetUnixEpoch(), pubsubTopic1) + s1.MsgC <- protocol.NewEnvelope(msg7, utils.GetUnixEpoch(), pubsubTopic1) + s1.MsgC <- protocol.NewEnvelope(msg8, utils.GetUnixEpoch(), pubsubTopic1) + s1.MsgC <- protocol.NewEnvelope(msg9, utils.GetUnixEpoch(), pubsubTopic1) + + host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) + err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta4)) + require.NoError(t, err) + + s2 := NewWakuStore(host2, nil, db, utils.Logger()) + s2.Start(ctx) + defer s2.Stop() + + q := Query{ + Topic: pubsubTopic1, + } + + fn := func(msg *pb.WakuMessage) (bool, error) { + return msg.ContentTopic == "hello", nil + } + + foundMsg, err := s2.Find(ctx, q, fn, WithAutomaticPeerSelection(), WithAutomaticRequestId(), WithPaging(true, 2)) + require.NoError(t, err) + require.NotNil(t, foundMsg) + require.Equal(t, "hello", foundMsg.ContentTopic) + + fn2 := func(msg *pb.WakuMessage) (bool, error) { + return msg.ContentTopic == "bye", nil + } + + foundMsg, err = s2.Find(ctx, q, fn2, WithAutomaticPeerSelection(), WithAutomaticRequestId(), WithPaging(true, 2)) + require.NoError(t, err) + require.Nil(t, foundMsg) +}