checkpoint

This commit is contained in:
Gabriel mermelstein 2025-01-15 18:11:52 +01:00
parent 1222d12907
commit fc57ed9cd9
No known key found for this signature in database
GPG Key ID: 82B8134785FEAE0D
3 changed files with 34 additions and 22 deletions

View File

@ -1,22 +1,28 @@
package common package common
type StoreQueryRequest struct { type StoreQueryRequest struct {
RequestId string `json:"requestId,omitempty"` RequestId string `json:"request_id"`
IncludeData bool `json:"includeData,omitempty"` IncludeData bool `json:"include_data"`
PubsubTopic *string `json:"pubsubTopic,omitempty"` PubsubTopic *string `json:"pubsub_topic,omitempty"`
ContentTopics []string `json:"contentTopics,omitempty"` ContentTopics []string `json:"content_topics,omitempty"`
TimeStart *int64 `json:"timeStart,omitempty"` TimeStart *int64 `json:"time_start,omitempty"`
TimeEnd *int64 `json:"timeEnd,omitempty"` TimeEnd *int64 `json:"time_end,omitempty"`
MessageHashes []MessageHash `json:"messageHashes,omitempty"` MessageHashes []MessageHash `json:"message_hashes,omitempty"`
PaginationCursor []MessageHash `json:"paginationCursor,omitempty"` PaginationCursor MessageHash `json:"pagination_cursor,omitempty"`
PaginationForward bool `json:"paginationForward,omitempty"` PaginationForward bool `json:"pagination_forward"`
PaginationLimit *uint64 `json:"paginationLimit,omitempty"` PaginationLimit *uint64 `json:"pagination_limit,omitempty"`
}
type storeMessageResponse struct {
WakuMessage tmpWakuMessageJson `json:"message"`
PubsubTopic string `json:"pubsubTopic"`
MessageHash MessageHash `json:"messageHash"`
} }
type StoreQueryResponse struct { type StoreQueryResponse struct {
RequestId string `json:"request_id,omitempty"` RequestId string `json:"requestId,omitempty"`
StatusCode *uint32 `json:"status_code,omitempty"` StatusCode *uint32 `json:"statusCode,omitempty"`
StatusDesc *string `json:"status_desc,omitempty"` StatusDesc string `json:"statusDesc,omitempty"`
Messages []*Envelope `json:"messages,omitempty"` Messages []storeMessageResponse `json:"messages,omitempty"`
PaginationCursor []MessageHash `json:"pagination_cursor,omitempty"` PaginationCursor MessageHash `json:"paginationCursor,omitempty"`
} }

View File

@ -345,6 +345,7 @@ type WakuConfig struct {
Nodekey string `json:"nodekey,omitempty"` Nodekey string `json:"nodekey,omitempty"`
Relay bool `json:"relay,omitempty"` Relay bool `json:"relay,omitempty"`
Store bool `json:"store,omitempty"` Store bool `json:"store,omitempty"`
LegacyStore bool `json:"legacyStore"`
Storenode string `json:"storenode,omitempty"` Storenode string `json:"storenode,omitempty"`
StoreMessageRetentionPolicy string `json:"storeMessageRetentionPolicy,omitempty"` StoreMessageRetentionPolicy string `json:"storeMessageRetentionPolicy,omitempty"`
StoreMessageDbUrl string `json:"storeMessageDbUrl,omitempty"` StoreMessageDbUrl string `json:"storeMessageDbUrl,omitempty"`
@ -828,11 +829,13 @@ func (n *WakuNode) Version() (string, error) {
func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *common.StoreQueryRequest, peerInfo peer.AddrInfo) (*common.StoreQueryResponse, error) { func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *common.StoreQueryRequest, peerInfo peer.AddrInfo) (*common.StoreQueryResponse, error) {
timeoutMs := getContextTimeoutMilliseconds(ctx) timeoutMs := getContextTimeoutMilliseconds(ctx)
fmt.Println("---------- StoreQuery 1 ---------")
b, err := json.Marshal(storeRequest) b, err := json.Marshal(storeRequest)
if err != nil { if err != nil {
return nil, err return nil, err
} }
fmt.Println("---------- StoreQuery 2 ---------")
addrs := make([]string, len(peerInfo.Addrs)) addrs := make([]string, len(peerInfo.Addrs))
for i, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) { for i, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) {
addrs[i] = addr.String() addrs[i] = addr.String()
@ -854,12 +857,12 @@ func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *common.StoreQue
if C.getRet(resp) == C.RET_OK { if C.getRet(resp) == C.RET_OK {
jsonResponseStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) jsonResponseStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
storeQueryResponse := &common.StoreQueryResponse{} storeQueryResponse := common.StoreQueryResponse{}
err = json.Unmarshal([]byte(jsonResponseStr), storeQueryResponse) err = json.Unmarshal([]byte(jsonResponseStr), &storeQueryResponse)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return storeQueryResponse, nil return &storeQueryResponse, nil
} }
errMsg := "error WakuStoreQuery: " + errMsg := "error WakuStoreQuery: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))

View File

@ -459,7 +459,7 @@ func TestRelay(t *testing.T) {
Payload: []byte{1, 2, 3, 4, 5, 6}, Payload: []byte{1, 2, 3, 4, 5, 6},
ContentTopic: "test-content-topic", ContentTopic: "test-content-topic",
Version: proto.Uint32(0), Version: proto.Uint32(0),
Timestamp: proto.Int64(time.Now().Unix()), Timestamp: proto.Int64(time.Now().UnixNano()),
} }
// send message // send message
pubsubTopic := FormatWakuRelayTopic(senderNodeWakuConfig.ClusterID, senderNodeWakuConfig.Shards[0]) pubsubTopic := FormatWakuRelayTopic(senderNodeWakuConfig.ClusterID, senderNodeWakuConfig.Shards[0])
@ -650,6 +650,7 @@ func TestHash(t *testing.T) {
Shards: []uint16{64}, Shards: []uint16{64},
Discv5UdpPort: 9070, Discv5UdpPort: 9070,
TcpPort: 60070, TcpPort: 60070,
LegacyStore: false,
} }
fmt.Println("------------ creating node 1") fmt.Println("------------ creating node 1")
@ -669,6 +670,7 @@ func TestHash(t *testing.T) {
Shards: []uint16{64}, Shards: []uint16{64},
Discv5UdpPort: 9071, Discv5UdpPort: 9071,
TcpPort: 60071, TcpPort: 60071,
LegacyStore: false,
} }
receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode")) receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode"))
require.NoError(t, err) require.NoError(t, err)
@ -696,7 +698,7 @@ func TestHash(t *testing.T) {
Payload: []byte{1, 2, 3, 4, 5, 6}, Payload: []byte{1, 2, 3, 4, 5, 6},
ContentTopic: "test-content-topic", ContentTopic: "test-content-topic",
Version: proto.Uint32(0), Version: proto.Uint32(0),
Timestamp: proto.Int64(time.Now().Unix()), Timestamp: proto.Int64(time.Now().UnixNano()),
} }
// send message // send message
pubsubTopic := FormatWakuRelayTopic(senderNodeWakuConfig.ClusterID, senderNodeWakuConfig.Shards[0]) pubsubTopic := FormatWakuRelayTopic(senderNodeWakuConfig.ClusterID, senderNodeWakuConfig.Shards[0])
@ -719,7 +721,7 @@ func TestHash(t *testing.T) {
t.Fatal("Timeout: No message received within 10 seconds") t.Fatal("Timeout: No message received within 10 seconds")
} }
// No send store query // Now send store query
ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout) ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout)
defer cancel3() defer cancel3()
@ -728,12 +730,13 @@ func TestHash(t *testing.T) {
ContentTopics: []string{"test-content-topic"}, ContentTopics: []string{"test-content-topic"},
} }
fmt.Println("------------ storeNode multiaddr: ", receiverMultiaddr[0].String())
storeNodeAddrInfo, err := peer.AddrInfoFromString(receiverMultiaddr[0].String()) storeNodeAddrInfo, err := peer.AddrInfoFromString(receiverMultiaddr[0].String())
require.NoError(t, err) require.NoError(t, err)
res, err := senderNode.StoreQuery(ctx3, &storeReq, *storeNodeAddrInfo) res, err := senderNode.StoreQuery(ctx3, &storeReq, *storeNodeAddrInfo)
require.NoError(t, err) require.NoError(t, err)
fmt.Println("----------- res: ", res) fmt.Printf("%+v\n", res)
// Stop nodes // Stop nodes
require.NoError(t, senderNode.Stop()) require.NoError(t, senderNode.Stop())