From c44b40319e9f0e628858cd821f760141da6dc9ad Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 30 Mar 2021 10:13:33 -0400 Subject: [PATCH] create envelope struct with hash and size of message --- cmd/root.go | 9 ++++--- waku/common/envelope.go | 29 +++++++++++++++++++++++ waku/v2/node/wakunode2.go | 27 ++++++++++----------- waku/v2/protocol/waku_store/waku_store.go | 14 +++++------ 4 files changed, 52 insertions(+), 27 deletions(-) create mode 100644 waku/common/envelope.go diff --git a/cmd/root.go b/cmd/root.go index 6634fb84..74f437e4 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -29,7 +29,7 @@ func randomHex(n int) (string, error) { func write(wakuNode *node.WakuNode, msgContent string) { - var contentTopic uint32 = 1735289188 + var contentTopic uint32 = 1 var version uint32 = 0 payload, err := node.Encode([]byte(wakuNode.ID()+" says "+msgContent), &node.KeyInfo{Kind: node.None}, 0) @@ -57,7 +57,7 @@ func readLoop(wakuNode *node.WakuNode) { } for value := range sub.C { - payload, err := node.DecodePayload(value, &node.KeyInfo{Kind: node.None}) + payload, err := node.DecodePayload(value.Message(), &node.KeyInfo{Kind: node.None}) if err != nil { fmt.Println(err) return @@ -107,8 +107,7 @@ var rootCmd = &cobra.Command{ listen, _ := cmd.Flags().GetBool("listen") say, _ := cmd.Flags().GetString("say") - hostAddr, _ := net.ResolveTCPAddr("tcp", fmt.Sprint("127.0.0.1:", port)) - extAddr, _ := net.ResolveTCPAddr("tcp", fmt.Sprint("0.0.0.0:", port)) + hostAddr, _ := net.ResolveTCPAddr("tcp", fmt.Sprint("0.0.0.0:", port)) if key == "" { var err error @@ -122,7 +121,7 @@ var rootCmd = &cobra.Command{ prvKey, err := crypto.HexToECDSA(key) ctx := context.Background() - wakuNode, err := node.New(ctx, prvKey, hostAddr, extAddr) + wakuNode, err := node.New(ctx, prvKey, []net.Addr{hostAddr}) if err != nil { fmt.Print(err) return diff --git a/waku/common/envelope.go b/waku/common/envelope.go new file mode 100644 index 00000000..6b623644 --- /dev/null +++ b/waku/common/envelope.go @@ -0,0 +1,29 @@ +package common + +import "github.com/status-im/go-waku/waku/v2/protocol" + +type Envelope struct { + msg *protocol.WakuMessage + size int + hash [32]byte +} + +func NewEnvelope(msg *protocol.WakuMessage, size int, hash [32]byte) *Envelope { + return &Envelope{ + msg: msg, + size: size, + hash: hash, + } +} + +func (e *Envelope) Message() *protocol.WakuMessage { + return e.msg +} + +func (e *Envelope) Hash() [32]byte { + return e.hash +} + +func (e *Envelope) Size() int { + return e.size +} diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 6a6ba903..0c60752a 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -3,6 +3,7 @@ package node import ( "context" "crypto/ecdsa" + "crypto/sha256" "errors" "fmt" "net" @@ -18,6 +19,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr-net" + common "github.com/status-im/go-waku/waku/common" "github.com/status-im/go-waku/waku/v2/protocol" store "github.com/status-im/go-waku/waku/v2/protocol/waku_store" wakurelay "github.com/status-im/go-wakurelay-pubsub" @@ -46,7 +48,7 @@ type MessagePair struct { } type Subscription struct { - C chan *protocol.WakuMessage + C chan *common.Envelope closed bool mutex sync.Mutex pubSubscription *wakurelay.Subscription @@ -69,25 +71,19 @@ type WakuNode struct { privKey crypto.PrivKey } -func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr net.Addr, extAddr net.Addr, opts ...libp2p.Option) (*WakuNode, error) { +func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr []net.Addr, opts ...libp2p.Option) (*WakuNode, error) { // Creates a Waku Node. if hostAddr == nil { return nil, errors.New("host address cannot be null") } var multiAddresses []ma.Multiaddr - hostAddrMA, err := manet.FromNetAddr(hostAddr) - if err != nil { - return nil, err - } - multiAddresses = append(multiAddresses, hostAddrMA) - - if extAddr != nil { - extAddrMA, err := manet.FromNetAddr(extAddr) + for _, addr := range hostAddr { + hostAddrMA, err := manet.FromNetAddr(addr) if err != nil { return nil, err } - multiAddresses = append(multiAddresses, extAddrMA) + multiAddresses = append(multiAddresses, hostAddrMA) } nodeKey := crypto.PrivKey((*crypto.Secp256k1PrivateKey)(privKey)) @@ -154,8 +150,8 @@ func (w *WakuNode) SetPubSub(pubSub *wakurelay.PubSub) { w.pubsub = pubSub } -func (w *WakuNode) MountRelay() error { - ps, err := wakurelay.NewWakuRelaySub(w.ctx, w.host) +func (w *WakuNode) MountRelay(opts ...wakurelay.Option) error { + ps, err := wakurelay.NewWakuRelaySub(w.ctx, w.host, opts...) if err != nil { return err } @@ -247,7 +243,7 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { subscription := new(Subscription) subscription.closed = false subscription.pubSubscription = sub - subscription.C = make(chan *protocol.WakuMessage) + subscription.C = make(chan *common.Envelope) subscription.quit = make(chan struct{}) go func(ctx context.Context, sub *wakurelay.Subscription) { @@ -276,7 +272,8 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { return } - subscription.C <- wakuMessage + envelope := common.NewEnvelope(wakuMessage, len(msg.Data), sha256.Sum256(msg.Data)) + subscription.C <- envelope } } }(node.ctx, sub) diff --git a/waku/v2/protocol/waku_store/waku_store.go b/waku/v2/protocol/waku_store/waku_store.go index f6b2ced3..d0c5f6be 100644 --- a/waku/v2/protocol/waku_store/waku_store.go +++ b/waku/v2/protocol/waku_store/waku_store.go @@ -22,6 +22,7 @@ import ( libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" ma "github.com/multiformats/go-multiaddr" + "github.com/status-im/go-waku/waku/common" "github.com/status-im/go-waku/waku/v2/protocol" "google.golang.org/protobuf/proto" ) @@ -161,7 +162,7 @@ type IndexedWakuMessage struct { } type WakuStore struct { - msg chan *protocol.WakuMessage + msg chan *common.Envelope messages []IndexedWakuMessage messagesMutex sync.Mutex @@ -170,7 +171,7 @@ type WakuStore struct { ctx context.Context } -func NewWakuStore(ctx context.Context, h host.Host, msg chan *protocol.WakuMessage, p MessageProvider) *WakuStore { +func NewWakuStore(ctx context.Context, h host.Host, msg chan *common.Envelope, p MessageProvider) *WakuStore { wakuStore := new(WakuStore) wakuStore.msg = msg wakuStore.msgProvider = p @@ -202,23 +203,22 @@ func (store *WakuStore) Start() { } func (store *WakuStore) storeIncomingMessages() { - for message := range store.msg { - - index, err := computeIndex(message) + for envelope := range store.msg { + index, err := computeIndex(envelope.Message()) if err != nil { log.Error("could not calculate message index", err) continue } store.messagesMutex.Lock() - store.messages = append(store.messages, IndexedWakuMessage{msg: message, index: index}) + store.messages = append(store.messages, IndexedWakuMessage{msg: envelope.Message(), index: index}) store.messagesMutex.Unlock() if store.msgProvider == nil { continue } - err = store.msgProvider.Put(message) // Should the index be stored? + err = store.msgProvider.Put(envelope.Message()) // Should the index be stored? if err != nil { log.Error("could not store message", err) continue