initial changes

This commit is contained in:
Gabriel mermelstein 2025-01-10 14:58:43 +01:00
parent d517abc07b
commit 3873229d7c
No known key found for this signature in database
GPG Key ID: 82B8134785FEAE0D
4 changed files with 73 additions and 17 deletions

View File

@ -3,7 +3,6 @@ package common
import ( import (
"encoding/json" "encoding/json"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/pb"
) )
@ -13,13 +12,13 @@ import (
type Envelope interface { type Envelope interface {
Message() *pb.WakuMessage Message() *pb.WakuMessage
PubsubTopic() string PubsubTopic() string
Hash() pb.MessageHash Hash() MessageHash
} }
type envelopeImpl struct { type envelopeImpl struct {
msg *pb.WakuMessage msg *pb.WakuMessage
topic string topic string
hash pb.MessageHash hash MessageHash
} }
type tmpWakuMessageJson struct { type tmpWakuMessageJson struct {
@ -35,7 +34,7 @@ type tmpWakuMessageJson struct {
type tmpEnvelopeStruct struct { type tmpEnvelopeStruct struct {
WakuMessage tmpWakuMessageJson `json:"wakuMessage"` WakuMessage tmpWakuMessageJson `json:"wakuMessage"`
PubsubTopic string `json:"pubsubTopic"` PubsubTopic string `json:"pubsubTopic"`
MessageHash string `json:"messageHash"` MessageHash MessageHash `json:"messageHash"`
} }
// NewEnvelope creates a new Envelope from a json string generated in nwaku // NewEnvelope creates a new Envelope from a json string generated in nwaku
@ -46,7 +45,6 @@ func NewEnvelope(jsonEventStr string) (Envelope, error) {
return nil, err return nil, err
} }
hash, err := hexutil.Decode(tmpEnvelopeStruct.MessageHash)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -62,7 +60,7 @@ func NewEnvelope(jsonEventStr string) (Envelope, error) {
RateLimitProof: tmpEnvelopeStruct.WakuMessage.RateLimitProof, RateLimitProof: tmpEnvelopeStruct.WakuMessage.RateLimitProof,
}, },
topic: tmpEnvelopeStruct.PubsubTopic, topic: tmpEnvelopeStruct.PubsubTopic,
hash: pb.ToMessageHash(hash), hash: tmpEnvelopeStruct.MessageHash,
}, nil }, nil
} }
@ -74,6 +72,6 @@ func (e *envelopeImpl) PubsubTopic() string {
return e.topic return e.topic
} }
func (e *envelopeImpl) Hash() pb.MessageHash { func (e *envelopeImpl) Hash() MessageHash {
return e.hash return e.hash
} }

View File

@ -0,0 +1,38 @@
package common
import (
"encoding/hex"
"fmt"
)
// MessageHash represents an unique identifier for a message within a pubsub topic
type MessageHash string
func ToMessageHash(val string) (MessageHash, error) {
if len(val) == 0 {
return "", fmt.Errorf("empty string not allowed")
}
if len(val) < 2 || val[:2] != "0x" {
return "", fmt.Errorf("string must start with 0x")
}
// Remove "0x" prefix for hex decoding
hexStr := val[2:]
// Verify the remaining string is valid hex
_, err := hex.DecodeString(hexStr)
if err != nil {
return "", fmt.Errorf("invalid hex string: %v", err)
}
return MessageHash(val), nil
}
func (h MessageHash) String() string {
return string(h)
}
func (h MessageHash) Bytes() ([]byte, error) {
return hex.DecodeString(string(h))
}

22
waku/common/store.go Normal file
View File

@ -0,0 +1,22 @@
package common
type StoreQueryRequest struct {
RequestId string `json:"requestId,omitempty"`
IncludeData bool `json:"includeData,omitempty"`
PubsubTopic *string `json:"pubsubTopic,omitempty"`
ContentTopics []string `json:"contentTopics,omitempty"`
TimeStart *int64 `json:"timeStart,omitempty"`
TimeEnd *int64 `json:"timeEnd,omitempty"`
MessageHashes []MessageHash `json:"messageHashes,omitempty"`
PaginationCursor []MessageHash `json:"paginationCursor,omitempty"`
PaginationForward bool `json:"paginationForward,omitempty"`
PaginationLimit *uint64 `json:"paginationLimit,omitempty"`
}
type StoreQueryResponse struct {
RequestId string `json:"request_id,omitempty"`
StatusCode *uint32 `json:"status_code,omitempty"`
StatusDesc *string `json:"status_desc,omitempty"`
Messages []*Envelope `json:"messages,omitempty"`
PaginationCursor []MessageHash `json:"pagination_cursor,omitempty"`
}

View File

@ -323,7 +323,6 @@ import (
"time" "time"
"unsafe" "unsafe"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
@ -331,7 +330,6 @@ import (
libp2pproto "github.com/libp2p/go-libp2p/core/protocol" libp2pproto "github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/pb"
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/waku-go-bindings/waku/common" "github.com/waku-org/waku-go-bindings/waku/common"
"go.uber.org/zap" "go.uber.org/zap"
@ -827,7 +825,7 @@ func (n *WakuNode) Version() (string, error) {
return "", errors.New(errMsg) return "", errors.New(errMsg)
} }
func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) { func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *common.StoreQueryRequest, peerInfo peer.AddrInfo) (*common.StoreQueryResponse, error) {
timeoutMs := getContextTimeoutMilliseconds(ctx) timeoutMs := getContextTimeoutMilliseconds(ctx)
b, err := json.Marshal(storeRequest) b, err := json.Marshal(storeRequest)
@ -856,7 +854,7 @@ func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQu
if C.getRet(resp) == C.RET_OK { if C.getRet(resp) == C.RET_OK {
jsonResponseStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) jsonResponseStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
storeQueryResponse := &storepb.StoreQueryResponse{} storeQueryResponse := &common.StoreQueryResponse{}
err = json.Unmarshal([]byte(jsonResponseStr), storeQueryResponse) err = json.Unmarshal([]byte(jsonResponseStr), storeQueryResponse)
if err != nil { if err != nil {
return nil, err return nil, err
@ -868,12 +866,12 @@ func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQu
return nil, errors.New(errMsg) return nil, errors.New(errMsg)
} }
func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (common.MessageHash, error) {
timeoutMs := getContextTimeoutMilliseconds(ctx) timeoutMs := getContextTimeoutMilliseconds(ctx)
jsonMsg, err := json.Marshal(message) jsonMsg, err := json.Marshal(message)
if err != nil { if err != nil {
return pb.MessageHash{}, err return common.MessageHash(""), err
} }
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
@ -890,14 +888,14 @@ func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pu
wg.Wait() wg.Wait()
if C.getRet(resp) == C.RET_OK { if C.getRet(resp) == C.RET_OK {
msgHash := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) msgHash := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
msgHashBytes, err := hexutil.Decode(msgHash) parsedMsgHash, err := common.ToMessageHash(msgHash)
if err != nil { if err != nil {
return pb.MessageHash{}, err return common.MessageHash(""), err
} }
return pb.ToMessageHash(msgHashBytes), nil return parsedMsgHash, nil
} }
errMsg := "WakuRelayPublish: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) errMsg := "WakuRelayPublish: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return pb.MessageHash{}, errors.New(errMsg) return common.MessageHash(""), errors.New(errMsg)
} }
func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) { func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) {