create envelope struct with hash and size of message

This commit is contained in:
Richard Ramos 2021-03-30 10:13:33 -04:00
parent bc32532401
commit c44b40319e
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
4 changed files with 52 additions and 27 deletions

View File

@ -29,7 +29,7 @@ func randomHex(n int) (string, error) {
func write(wakuNode *node.WakuNode, msgContent string) { func write(wakuNode *node.WakuNode, msgContent string) {
var contentTopic uint32 = 1735289188 var contentTopic uint32 = 1
var version uint32 = 0 var version uint32 = 0
payload, err := node.Encode([]byte(wakuNode.ID()+" says "+msgContent), &node.KeyInfo{Kind: node.None}, 0) payload, err := node.Encode([]byte(wakuNode.ID()+" says "+msgContent), &node.KeyInfo{Kind: node.None}, 0)
@ -57,7 +57,7 @@ func readLoop(wakuNode *node.WakuNode) {
} }
for value := range sub.C { for value := range sub.C {
payload, err := node.DecodePayload(value, &node.KeyInfo{Kind: node.None}) payload, err := node.DecodePayload(value.Message(), &node.KeyInfo{Kind: node.None})
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
return return
@ -107,8 +107,7 @@ var rootCmd = &cobra.Command{
listen, _ := cmd.Flags().GetBool("listen") listen, _ := cmd.Flags().GetBool("listen")
say, _ := cmd.Flags().GetString("say") say, _ := cmd.Flags().GetString("say")
hostAddr, _ := net.ResolveTCPAddr("tcp", fmt.Sprint("127.0.0.1:", port)) hostAddr, _ := net.ResolveTCPAddr("tcp", fmt.Sprint("0.0.0.0:", port))
extAddr, _ := net.ResolveTCPAddr("tcp", fmt.Sprint("0.0.0.0:", port))
if key == "" { if key == "" {
var err error var err error
@ -122,7 +121,7 @@ var rootCmd = &cobra.Command{
prvKey, err := crypto.HexToECDSA(key) prvKey, err := crypto.HexToECDSA(key)
ctx := context.Background() ctx := context.Background()
wakuNode, err := node.New(ctx, prvKey, hostAddr, extAddr) wakuNode, err := node.New(ctx, prvKey, []net.Addr{hostAddr})
if err != nil { if err != nil {
fmt.Print(err) fmt.Print(err)
return return

29
waku/common/envelope.go Normal file
View File

@ -0,0 +1,29 @@
package common
import "github.com/status-im/go-waku/waku/v2/protocol"
type Envelope struct {
msg *protocol.WakuMessage
size int
hash [32]byte
}
func NewEnvelope(msg *protocol.WakuMessage, size int, hash [32]byte) *Envelope {
return &Envelope{
msg: msg,
size: size,
hash: hash,
}
}
func (e *Envelope) Message() *protocol.WakuMessage {
return e.msg
}
func (e *Envelope) Hash() [32]byte {
return e.hash
}
func (e *Envelope) Size() int {
return e.size
}

View File

@ -3,6 +3,7 @@ package node
import ( import (
"context" "context"
"crypto/ecdsa" "crypto/ecdsa"
"crypto/sha256"
"errors" "errors"
"fmt" "fmt"
"net" "net"
@ -18,6 +19,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net" manet "github.com/multiformats/go-multiaddr-net"
common "github.com/status-im/go-waku/waku/common"
"github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol"
store "github.com/status-im/go-waku/waku/v2/protocol/waku_store" store "github.com/status-im/go-waku/waku/v2/protocol/waku_store"
wakurelay "github.com/status-im/go-wakurelay-pubsub" wakurelay "github.com/status-im/go-wakurelay-pubsub"
@ -46,7 +48,7 @@ type MessagePair struct {
} }
type Subscription struct { type Subscription struct {
C chan *protocol.WakuMessage C chan *common.Envelope
closed bool closed bool
mutex sync.Mutex mutex sync.Mutex
pubSubscription *wakurelay.Subscription pubSubscription *wakurelay.Subscription
@ -69,25 +71,19 @@ type WakuNode struct {
privKey crypto.PrivKey privKey crypto.PrivKey
} }
func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr net.Addr, extAddr net.Addr, opts ...libp2p.Option) (*WakuNode, error) { func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr []net.Addr, opts ...libp2p.Option) (*WakuNode, error) {
// Creates a Waku Node. // Creates a Waku Node.
if hostAddr == nil { if hostAddr == nil {
return nil, errors.New("host address cannot be null") return nil, errors.New("host address cannot be null")
} }
var multiAddresses []ma.Multiaddr var multiAddresses []ma.Multiaddr
hostAddrMA, err := manet.FromNetAddr(hostAddr) for _, addr := range hostAddr {
hostAddrMA, err := manet.FromNetAddr(addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
multiAddresses = append(multiAddresses, hostAddrMA) multiAddresses = append(multiAddresses, hostAddrMA)
if extAddr != nil {
extAddrMA, err := manet.FromNetAddr(extAddr)
if err != nil {
return nil, err
}
multiAddresses = append(multiAddresses, extAddrMA)
} }
nodeKey := crypto.PrivKey((*crypto.Secp256k1PrivateKey)(privKey)) nodeKey := crypto.PrivKey((*crypto.Secp256k1PrivateKey)(privKey))
@ -154,8 +150,8 @@ func (w *WakuNode) SetPubSub(pubSub *wakurelay.PubSub) {
w.pubsub = pubSub w.pubsub = pubSub
} }
func (w *WakuNode) MountRelay() error { func (w *WakuNode) MountRelay(opts ...wakurelay.Option) error {
ps, err := wakurelay.NewWakuRelaySub(w.ctx, w.host) ps, err := wakurelay.NewWakuRelaySub(w.ctx, w.host, opts...)
if err != nil { if err != nil {
return err return err
} }
@ -247,7 +243,7 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
subscription := new(Subscription) subscription := new(Subscription)
subscription.closed = false subscription.closed = false
subscription.pubSubscription = sub subscription.pubSubscription = sub
subscription.C = make(chan *protocol.WakuMessage) subscription.C = make(chan *common.Envelope)
subscription.quit = make(chan struct{}) subscription.quit = make(chan struct{})
go func(ctx context.Context, sub *wakurelay.Subscription) { go func(ctx context.Context, sub *wakurelay.Subscription) {
@ -276,7 +272,8 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
return return
} }
subscription.C <- wakuMessage envelope := common.NewEnvelope(wakuMessage, len(msg.Data), sha256.Sum256(msg.Data))
subscription.C <- envelope
} }
} }
}(node.ctx, sub) }(node.ctx, sub)

View File

@ -22,6 +22,7 @@ import (
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/waku/common"
"github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
@ -161,7 +162,7 @@ type IndexedWakuMessage struct {
} }
type WakuStore struct { type WakuStore struct {
msg chan *protocol.WakuMessage msg chan *common.Envelope
messages []IndexedWakuMessage messages []IndexedWakuMessage
messagesMutex sync.Mutex messagesMutex sync.Mutex
@ -170,7 +171,7 @@ type WakuStore struct {
ctx context.Context ctx context.Context
} }
func NewWakuStore(ctx context.Context, h host.Host, msg chan *protocol.WakuMessage, p MessageProvider) *WakuStore { func NewWakuStore(ctx context.Context, h host.Host, msg chan *common.Envelope, p MessageProvider) *WakuStore {
wakuStore := new(WakuStore) wakuStore := new(WakuStore)
wakuStore.msg = msg wakuStore.msg = msg
wakuStore.msgProvider = p wakuStore.msgProvider = p
@ -202,23 +203,22 @@ func (store *WakuStore) Start() {
} }
func (store *WakuStore) storeIncomingMessages() { func (store *WakuStore) storeIncomingMessages() {
for message := range store.msg { for envelope := range store.msg {
index, err := computeIndex(envelope.Message())
index, err := computeIndex(message)
if err != nil { if err != nil {
log.Error("could not calculate message index", err) log.Error("could not calculate message index", err)
continue continue
} }
store.messagesMutex.Lock() store.messagesMutex.Lock()
store.messages = append(store.messages, IndexedWakuMessage{msg: message, index: index}) store.messages = append(store.messages, IndexedWakuMessage{msg: envelope.Message(), index: index})
store.messagesMutex.Unlock() store.messagesMutex.Unlock()
if store.msgProvider == nil { if store.msgProvider == nil {
continue continue
} }
err = store.msgProvider.Put(message) // Should the index be stored? err = store.msgProvider.Put(envelope.Message()) // Should the index be stored?
if err != nil { if err != nil {
log.Error("could not store message", err) log.Error("could not store message", err)
continue continue