From c1bcba756c8a781c058b6f3f05defa88c425588b Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 22 Mar 2021 12:45:13 -0400 Subject: [PATCH] add logs --- cmd/root.go | 33 ++++++++-- go.mod | 2 + go.sum | 2 + waku.go | 11 +++- waku/v2/node/wakunode2.go | 78 +++++++++++++---------- waku/v2/protocol/waku_message.pb.go | 24 +++++-- waku/v2/protocol/waku_message.proto | 3 +- waku/v2/protocol/waku_relay.go | 13 +++- waku/v2/protocol/waku_store/waku_store.go | 76 ++++++++++++++-------- 9 files changed, 163 insertions(+), 79 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index d810978c..28a48734 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -15,6 +15,7 @@ import ( "github.com/spf13/viper" "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol" + store "github.com/status-im/go-waku/waku/v2/protocol/waku_store" ethNodeCrypto "github.com/status-im/status-go/eth-node/crypto" ) @@ -27,7 +28,8 @@ func randomHex(n int) (string, error) { } func write(wakuNode *node.WakuNode, msgContent string) { - var contentTopic uint32 = 1 + + var contentTopic uint32 = 1735289188 var version uint32 = 0 payload, err := node.Encode([]byte(wakuNode.ID()+" says "+msgContent), &node.KeyInfo{Kind: node.None}, 0) @@ -43,7 +45,7 @@ func write(wakuNode *node.WakuNode, msgContent string) { func writeLoop(wakuNode *node.WakuNode) { for { time.Sleep(2 * time.Second) - write(wakuNode, "Hey!") + write(wakuNode, fmt.Sprint("Hey - ", time.Now().Unix())) } } @@ -64,6 +66,27 @@ func readLoop(wakuNode *node.WakuNode) { } } +type DBStore struct { + store.MessageProvider +} + +func (dbStore *DBStore) Put(message *protocol.WakuMessage) error { + fmt.Println("TODO: Implement MessageProvider.Put") + return nil +} + +func (dbStore *DBStore) GetAll() ([]*protocol.WakuMessage, error) { + fmt.Println("TODO: Implement MessageProvider.GetAll. Returning a sample message") + exampleMessage := new(protocol.WakuMessage) + var contentTopic uint32 = 1 + var version uint32 = 0 + exampleMessage.ContentTopic = &contentTopic + exampleMessage.Payload = []byte("Hello!") + exampleMessage.Version = &version + + return []*protocol.WakuMessage{exampleMessage}, nil +} + var rootCmd = &cobra.Command{ Use: "waku", Short: "Start a waku node", @@ -110,7 +133,7 @@ var rootCmd = &cobra.Command{ } if store { - wakuNode.MountStore() + wakuNode.MountStore(new(DBStore)) } if startStore { @@ -177,9 +200,7 @@ var rootCmd = &cobra.Command{ fmt.Println("\n\n\nReceived signal, shutting down...") // shut the node down - if err := wakuNode.Stop(); err != nil { - panic(err) - } + wakuNode.Stop() }, } diff --git a/go.mod b/go.mod index 01bf254a..d362784b 100644 --- a/go.mod +++ b/go.mod @@ -9,8 +9,10 @@ require ( github.com/ethereum/go-ethereum v1.10.1 github.com/gogo/protobuf v1.3.1 github.com/golang/protobuf v1.4.3 + github.com/ipfs/go-log v1.0.4 github.com/ipfs/go-log/v2 v2.1.1 github.com/libp2p/go-libp2p v0.13.0 + github.com/libp2p/go-libp2p-connmgr v0.2.4 github.com/libp2p/go-libp2p-core v0.8.5 github.com/libp2p/go-libp2p-peerstore v0.2.6 github.com/libp2p/go-libp2p-pubsub v0.4.1 diff --git a/go.sum b/go.sum index c422e045..2b2cd458 100644 --- a/go.sum +++ b/go.sum @@ -67,6 +67,7 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/aws/aws-sdk-go v1.25.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/benbjohnson/clock v1.0.2/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= +github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg= github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -434,6 +435,7 @@ github.com/libp2p/go-libp2p-circuit v0.1.4/go.mod h1:CY67BrEjKNDhdTk8UgBX1Y/H5c3 github.com/libp2p/go-libp2p-circuit v0.2.1/go.mod h1:BXPwYDN5A8z4OEY9sOfr2DUQMLQvKt/6oku45YUmjIo= github.com/libp2p/go-libp2p-circuit v0.4.0 h1:eqQ3sEYkGTtybWgr6JLqJY6QLtPWRErvFjFDfAOO1wc= github.com/libp2p/go-libp2p-circuit v0.4.0/go.mod h1:t/ktoFIUzM6uLQ+o1G6NuBl2ANhBKN9Bc8jRIk31MoA= +github.com/libp2p/go-libp2p-connmgr v0.2.4 h1:TMS0vc0TCBomtQJyWr7fYxcVYYhx+q/2gF++G5Jkl/w= github.com/libp2p/go-libp2p-connmgr v0.2.4/go.mod h1:YV0b/RIm8NGPnnNWM7hG9Q38OeQiQfKhHCCs1++ufn0= github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco= github.com/libp2p/go-libp2p-core v0.0.4/go.mod h1:jyuCQP356gzfCFtRKyvAbNkyeuxb7OlyhWZ3nls5d2I= diff --git a/waku.go b/waku.go index 260a8328..792aa64f 100644 --- a/waku.go +++ b/waku.go @@ -1,7 +1,16 @@ package main -import "github.com/status-im/go-waku/cmd" +import ( + logging "github.com/ipfs/go-log" + "github.com/status-im/go-waku/cmd" +) func main() { + lvl, err := logging.LevelFromString("info") + if err != nil { + panic(err) + } + logging.SetAllLoggers(lvl) + cmd.Execute() } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 633b6d4b..14223dc4 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -5,24 +5,26 @@ import ( "crypto/ecdsa" "errors" "fmt" - "log" "net" "sync" "time" proto "github.com/golang/protobuf/proto" + logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p" + connmgr "github.com/libp2p/go-libp2p-connmgr" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr-net" - "github.com/status-im/go-waku/waku/v2/protocol" store "github.com/status-im/go-waku/waku/v2/protocol/waku_store" ) +var log = logging.Logger("wakunode") + // Default clientId const clientId string = "Go Waku v2 node" @@ -52,28 +54,25 @@ type Subscription struct { } type WakuNode struct { - // peerManager *PeerManager host host.Host pubsub *pubsub.PubSub store *store.WakuStore - topics map[Topic]*pubsub.Topic - topicsLock sync.Mutex + topics map[Topic]*pubsub.Topic + topicsMutex sync.Mutex - // peerInfo peer.AddrInfo - // TODO Revisit messages field indexing as well as if this should be Message or WakuMessage - // messages []MessagePair + subscriptions []*Subscription + subscriptionsMutex sync.Mutex - subscriptions protocol.MessageNotificationSubscriptions - ctx context.Context - cancel context.CancelFunc - privKey crypto.PrivKey + ctx context.Context + cancel context.CancelFunc + privKey crypto.PrivKey } -func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr net.Addr, extAddr net.Addr) (*WakuNode, error) { +func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr net.Addr, extAddr net.Addr, opts ...libp2p.Option) (*WakuNode, error) { // Creates a Waku Node. if hostAddr == nil { - return nil, errors.New("Host address cannot be null") + return nil, errors.New("host address cannot be null") } var multiAddresses []ma.Multiaddr @@ -93,14 +92,14 @@ func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr net.Addr, extA nodeKey := crypto.PrivKey((*crypto.Secp256k1PrivateKey)(privKey)) - opts := []libp2p.Option{ + opts = append(opts, libp2p.ListenAddrs(multiAddresses...), libp2p.Identity(nodeKey), - libp2p.DefaultTransports, // - libp2p.NATPortMap(), // Attempt to open ports using uPNP for NATed hosts. - //libp2p.DisableRelay(), // TODO: is this needed? - //libp2p.EnableNATService(), // TODO: is this needed? - } + libp2p.DefaultTransports, + libp2p.NATPortMap(), // Attempt to open ports using uPNP for NATed hosts. + libp2p.EnableNATService(), // TODO: is this needed?) + libp2p.ConnectionManager(connmgr.NewConnManager(200, 300, 0)), // ? + ) ctx, cancel := context.WithCancel(ctx) _ = cancel @@ -111,7 +110,6 @@ func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr net.Addr, extA } w := new(WakuNode) - //w.peerManager = NewPeerManager(host) w.pubsub = nil w.host = host w.cancel = cancel @@ -122,16 +120,22 @@ func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr net.Addr, extA hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", host.ID().Pretty())) for _, addr := range host.Addrs() { fullAddr := addr.Encapsulate(hostInfo) - log.Printf("Listening on %s\n", fullAddr) + log.Info("Listening on", fullAddr) } return w, nil } -func (w *WakuNode) Stop() error { +func (w *WakuNode) Stop() { + w.subscriptionsMutex.Lock() + defer w.subscriptionsMutex.Unlock() defer w.cancel() - // TODO: Is it necessary to stop WakuRelaySubRouter? - return nil + + for _, sub := range w.subscriptions { + sub.Unsubscribe() + } + + w.subscriptions = nil } func (w *WakuNode) Host() host.Host { @@ -163,14 +167,12 @@ func (w *WakuNode) MountRelay() error { return nil } -func (w *WakuNode) MountStore() error { +func (w *WakuNode) MountStore(s store.MessageProvider) error { sub, err := w.Subscribe(nil) if err != nil { return err } - - // TODO: kill subscription on close - w.store = store.NewWakuStore(w.ctx, w.host, sub.C, new(store.MessageProvider)) // TODO: pass store + w.store = store.NewWakuStore(w.ctx, w.host, sub.C, s) return nil } @@ -263,14 +265,14 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { case <-nextMsgTicker.C: msg, err := sub.Next(ctx) if err != nil { - fmt.Println("Error receiving message", err) + log.Error("error receiving message", err) close(subscription.quit) return } wakuMessage := &protocol.WakuMessage{} if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { - fmt.Println("Error decoding WakuMessage: ", err) // TODO: use log lib + log.Error("could not decode message", err) // TODO: use log lib return } @@ -279,6 +281,11 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { } }(node.ctx, sub) + node.subscriptionsMutex.Lock() + defer node.subscriptionsMutex.Unlock() + + node.subscriptions = append(node.subscriptions, subscription) + return subscription, nil } @@ -298,8 +305,8 @@ func (subs *Subscription) IsClosed() bool { } func (node *WakuNode) upsertTopic(topic *Topic) (*pubsub.Topic, error) { - defer node.topicsLock.Unlock() - node.topicsLock.Lock() + defer node.topicsMutex.Unlock() + node.topicsMutex.Lock() var t Topic = DefaultWakuTopic if topic != nil { @@ -328,10 +335,11 @@ func (node *WakuNode) Publish(message *protocol.WakuMessage, topic *Topic) error } if message == nil { - return errors.New("Message can't be null") + return errors.New("message can't be null") } pubSubTopic, err := node.upsertTopic(topic) + if err != nil { return err } @@ -342,6 +350,7 @@ func (node *WakuNode) Publish(message *protocol.WakuMessage, topic *Topic) error } err = pubSubTopic.Publish(node.ctx, out) + if err != nil { return err } @@ -350,7 +359,6 @@ func (node *WakuNode) Publish(message *protocol.WakuMessage, topic *Topic) error } func (w *WakuNode) DialPeer(address string) error { - p, err := ma.NewMultiaddr(address) if err != nil { return err diff --git a/waku/v2/protocol/waku_message.pb.go b/waku/v2/protocol/waku_message.pb.go index 44617458..7b966641 100644 --- a/waku/v2/protocol/waku_message.pb.go +++ b/waku/v2/protocol/waku_message.pb.go @@ -26,8 +26,9 @@ type WakuMessage struct { unknownFields protoimpl.UnknownFields Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3,oneof" json:"payload,omitempty"` - ContentTopic *uint32 `protobuf:"fixed32,2,opt,name=contentTopic,proto3,oneof" json:"contentTopic,omitempty"` + ContentTopic *uint32 `protobuf:"varint,2,opt,name=contentTopic,proto3,oneof" json:"contentTopic,omitempty"` Version *uint32 `protobuf:"varint,3,opt,name=version,proto3,oneof" json:"version,omitempty"` + Proof []byte `protobuf:"bytes,4,opt,name=proof,proto3,oneof" json:"proof,omitempty"` } func (x *WakuMessage) Reset() { @@ -83,22 +84,31 @@ func (x *WakuMessage) GetVersion() uint32 { return 0 } +func (x *WakuMessage) GetProof() []byte { + if x != nil { + return x.Proof + } + return nil +} + var File_waku_message_proto protoreflect.FileDescriptor var file_waku_message_proto_rawDesc = []byte{ 0x0a, 0x12, 0x77, 0x61, 0x6b, 0x75, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x22, 0x9d, + 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x22, 0xc2, 0x01, 0x0a, 0x0b, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1d, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x88, 0x01, 0x01, 0x12, 0x27, 0x0a, 0x0c, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x07, 0x48, 0x01, 0x52, 0x0c, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, + 0x01, 0x28, 0x0d, 0x48, 0x01, 0x52, 0x0c, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x88, 0x01, 0x01, 0x12, 0x1d, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x48, 0x02, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, - 0x6f, 0x6e, 0x88, 0x01, 0x01, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, - 0x64, 0x42, 0x0f, 0x0a, 0x0d, 0x5f, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, - 0x69, 0x63, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x6e, 0x88, 0x01, 0x01, 0x12, 0x19, 0x0a, 0x05, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x0c, 0x48, 0x03, 0x52, 0x05, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x88, 0x01, 0x01, + 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x42, 0x0f, 0x0a, 0x0d, + 0x5f, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x0a, 0x0a, + 0x08, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x08, 0x0a, 0x06, 0x5f, 0x70, 0x72, + 0x6f, 0x6f, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/waku/v2/protocol/waku_message.proto b/waku/v2/protocol/waku_message.proto index 1487ea0c..157561b6 100644 --- a/waku/v2/protocol/waku_message.proto +++ b/waku/v2/protocol/waku_message.proto @@ -4,6 +4,7 @@ package protocol; message WakuMessage { optional bytes payload = 1; - optional fixed32 contentTopic = 2; + optional uint32 contentTopic = 2; optional uint32 version = 3; + optional bytes proof = 4; } \ No newline at end of file diff --git a/waku/v2/protocol/waku_relay.go b/waku/v2/protocol/waku_relay.go index cd0898f6..1a137e3d 100644 --- a/waku/v2/protocol/waku_relay.go +++ b/waku/v2/protocol/waku_relay.go @@ -6,10 +6,12 @@ package protocol import ( "context" + "crypto/sha256" "github.com/libp2p/go-libp2p-core/host" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" pubsub "github.com/libp2p/go-libp2p-pubsub" + pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" ) const WakuRelayProtocol = libp2pProtocol.ID("/vac/waku/relay/2.0.0-beta2") @@ -18,11 +20,16 @@ type WakuRelay struct { p *pubsub.PubSub } +// Once https://github.com/status-im/nim-waku/issues/420 is fixed, implement a custom messageIdFn +func msgIdFn(pmsg *pubsub_pb.Message) string { + hash := sha256.Sum256(pmsg.Data) + return string(hash[:]) +} + func NewWakuRelay(ctx context.Context, h host.Host, opts ...pubsub.Option) (*pubsub.PubSub, error) { - // Once https://github.com/status-im/nim-waku/issues/420 is fixed, implement a custom messageIdFn - //opts = append(opts, pubsub.WithNoAuthor()) - //opts = append(opts, pubsub.WithMessageIdFn(messageIdFn)) opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) + opts = append(opts, pubsub.WithNoAuthor()) + opts = append(opts, pubsub.WithMessageIdFn(msgIdFn)) gossipSub, err := pubsub.NewGossipSub(ctx, h, []libp2pProtocol.ID{WakuRelayProtocol}, opts...) diff --git a/waku/v2/protocol/waku_store/waku_store.go b/waku/v2/protocol/waku_store/waku_store.go index 804b8caf..f6b2ced3 100644 --- a/waku/v2/protocol/waku_store/waku_store.go +++ b/waku/v2/protocol/waku_store/waku_store.go @@ -7,23 +7,27 @@ import ( "crypto/sha256" "encoding/hex" "errors" + "fmt" "io/ioutil" - "log" "sort" "sync" "time" "github.com/cruxic/go-hmac-drbg/hmacdrbg" + logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" + ma "github.com/multiformats/go-multiaddr" "github.com/status-im/go-waku/waku/v2/protocol" "google.golang.org/protobuf/proto" ) +var log = logging.Logger("wakustore") + const WakuStoreProtocolId = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta2") const MaxPageSize = 100 // Maximum number of waku messages in each page const ConnectionTimeout = 10 * time.Second @@ -148,6 +152,7 @@ func (w *WakuStore) FindMessages(query *protocol.HistoryQuery) *protocol.History type MessageProvider interface { GetAll() ([]*protocol.WakuMessage, error) + Put(message *protocol.WakuMessage) error } type IndexedWakuMessage struct { @@ -156,14 +161,16 @@ type IndexedWakuMessage struct { } type WakuStore struct { - msg chan *protocol.WakuMessage - messages []IndexedWakuMessage - msgProvider *MessageProvider + msg chan *protocol.WakuMessage + messages []IndexedWakuMessage + messagesMutex sync.Mutex + + msgProvider MessageProvider h host.Host ctx context.Context } -func NewWakuStore(ctx context.Context, h host.Host, msg chan *protocol.WakuMessage, p *MessageProvider) *WakuStore { +func NewWakuStore(ctx context.Context, h host.Host, msg chan *protocol.WakuMessage, p MessageProvider) *WakuStore { wakuStore := new(WakuStore) wakuStore.msg = msg wakuStore.msgProvider = p @@ -175,30 +182,47 @@ func NewWakuStore(ctx context.Context, h host.Host, msg chan *protocol.WakuMessa func (store *WakuStore) Start() { store.h.SetStreamHandler(WakuStoreProtocolId, store.onRequest) - go store.processMessages() - // TODO: Load all messages - // proc onData(timestamp: uint64, msg: WakuMessage) = - // ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex())) - // let res = ws.store.getAll(onData) + messages, err := store.msgProvider.GetAll() + if err != nil { + log.Error("could not load DBProvider messages") + return + } + + for _, msg := range messages { + idx, err := computeIndex(msg) + if err != nil { + log.Error("could not calculate message index", err) + continue + } + store.messages = append(store.messages, IndexedWakuMessage{msg: msg, index: idx}) + } + + go store.storeIncomingMessages() } -func (store *WakuStore) processMessages() { +func (store *WakuStore) storeIncomingMessages() { for message := range store.msg { index, err := computeIndex(message) if err != nil { - log.Println(err) + log.Error("could not calculate message index", err) continue } + store.messagesMutex.Lock() store.messages = append(store.messages, IndexedWakuMessage{msg: message, index: index}) + store.messagesMutex.Unlock() if store.msgProvider == nil { continue } - // let res = store.msgProvider.put(index, msg) // TODO: store in the DB + err = store.msgProvider.Put(message) // Should the index be stored? + if err != nil { + log.Error("could not store message", err) + continue + } } } @@ -212,17 +236,17 @@ func (store *WakuStore) onRequest(s network.Stream) { if err != nil { s.Reset() - log.Println(err) + log.Error("error reading request", err) return } proto.Unmarshal(buf, historyRPCRequest) if err != nil { - log.Println(err) + log.Error("error decoding request", err) return } - log.Printf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer()) + log.Info(fmt.Sprintf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) historyResponseRPC := &protocol.HistoryRPC{} historyResponseRPC.RequestId = historyRPCRequest.RequestId @@ -230,16 +254,16 @@ func (store *WakuStore) onRequest(s network.Stream) { message, err := proto.Marshal(historyResponseRPC) if err != nil { - log.Println(err) + log.Error("error encoding response", err) return } _, err = s.Write(message) if err != nil { - log.Println(err) + log.Error("error writing response", err) s.Reset() } else { - log.Printf("%s: Response sent to %s", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String()) + log.Info(fmt.Sprintf("%s: Response sent to %s", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String())) } } @@ -319,7 +343,7 @@ func (store *WakuStore) selectPeer() *peer.ID { for _, peer := range store.h.Peerstore().Peers() { protocols, err := store.h.Peerstore().SupportsProtocols(peer, string(WakuStoreProtocolId)) if err != nil { - log.Println("error obtaining the protocols supported by peers", err) + log.Error("error obtaining the protocols supported by peers", err) return nil } @@ -364,7 +388,7 @@ func GenerateRequestId() string { } if !rng.Generate(randData) { - log.Fatal("could not generate random request id") + log.Error("could not generate random request id") } } return hex.EncodeToString(randData) @@ -381,7 +405,7 @@ func (store *WakuStore) Query(q *protocol.HistoryQuery) (*protocol.HistoryRespon connOpt, err := store.h.NewStream(ctx, *peer, WakuStoreProtocolId) if err != nil { - log.Println("failed to connect to remote peer", err) + log.Info("failed to connect to remote peer", err) return nil, err } @@ -389,7 +413,7 @@ func (store *WakuStore) Query(q *protocol.HistoryQuery) (*protocol.HistoryRespon message, err := proto.Marshal(historyRequest) if err != nil { - log.Println(err) + log.Error("could not encode request", err) return nil, err } @@ -398,20 +422,20 @@ func (store *WakuStore) Query(q *protocol.HistoryQuery) (*protocol.HistoryRespon _, err = connOpt.Write(message) if err != nil { - log.Println(err) + log.Error("could not write request", err) return nil, err } buf, err := ioutil.ReadAll(connOpt) if err != nil { - log.Println("failed to read response", err) + log.Error("could not read response", err) return nil, err } historyResponseRPC := &protocol.HistoryRPC{} proto.Unmarshal(buf, historyResponseRPC) if err != nil { - log.Println("failed to decode response", err) + log.Error("could not decode response", err) return nil, err }