diff --git a/Makefile b/Makefile index c147ab47..58d74210 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: all build lint +.PHONY: all build lint test all: build @@ -7,4 +7,7 @@ build: lint: @echo "lint" - @golangci-lint --exclude=SA1019 run ./... --deadline=5m \ No newline at end of file + @golangci-lint --exclude=SA1019 run ./... --deadline=5m +test: + go test -v -failfast ./... + diff --git a/examples/chat2/.gitignore b/examples/chat2/.gitignore new file mode 100644 index 00000000..933fe673 --- /dev/null +++ b/examples/chat2/.gitignore @@ -0,0 +1 @@ +chat2 diff --git a/go.mod b/go.mod index e4684924..05fd8914 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/viper v1.7.1 github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a + github.com/stretchr/testify v1.6.1 go.opencensus.io v0.23.0 gopkg.in/ini.v1 v1.62.0 // indirect ) diff --git a/go.sum b/go.sum index b70b6c99..139b208b 100644 --- a/go.sum +++ b/go.sum @@ -123,6 +123,7 @@ github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d h1:bE1UyBQ5aE6 github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d/go.mod h1:HAe1wsCrwH2uFnFaCC2vlcyEohnxs8KeShAFqGIHvmM= github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davidlazar/go-crypto v0.0.0-20170701192655-dcfb0a7ac018 h1:6xT9KW8zLC5IlbaIF5Q7JNieBoACT7iW0YTxQHR0in0= github.com/davidlazar/go-crypto v0.0.0-20170701192655-dcfb0a7ac018/go.mod h1:rQYf4tfk5sSwFsnDg3qYaBxSjsD9S8+59vW0dKUgme4= @@ -735,6 +736,7 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= @@ -859,6 +861,7 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= @@ -1261,6 +1264,7 @@ gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/tests/connection_test.go b/tests/connection_test.go new file mode 100644 index 00000000..d6d3a3d7 --- /dev/null +++ b/tests/connection_test.go @@ -0,0 +1,81 @@ +package tests + +import ( + "context" + "crypto/rand" + "encoding/hex" + "net" + "testing" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/stretchr/testify/require" + + "github.com/status-im/go-waku/waku/v2/node" + "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/utils" +) + +func TestBasicSendingReceiving(t *testing.T) { + hostAddr, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") + require.NoError(t, err) + + key, err := randomHex(32) + require.NoError(t, err) + + prvKey, err := crypto.HexToECDSA(key) + require.NoError(t, err) + + ctx := context.Background() + + wakuNode, err := node.New(ctx, + node.WithPrivateKey(prvKey), + node.WithHostAddress([]net.Addr{hostAddr}), + node.WithWakuRelay(), + ) + require.NoError(t, err) + require.NotNil(t, wakuNode) + + require.NoError(t, write(ctx, wakuNode, "test")) + + sub, err := wakuNode.Subscribe(nil) + require.NoError(t, err) + + value := <-sub.C + payload, err := node.DecodePayload(value.Message(), &node.KeyInfo{Kind: node.None}) + require.NoError(t, err) + + require.Contains(t, string(payload.Data), "test") +} + +func randomHex(n int) (string, error) { + bytes := make([]byte, n) + if _, err := rand.Read(bytes); err != nil { + return "", err + } + return hex.EncodeToString(bytes), nil +} + +func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) error { + var contentTopic string = "test" + var version uint32 = 0 + var timestamp float64 = utils.GetUnixEpoch() + + p := new(node.Payload) + p.Data = []byte(wakuNode.ID() + ": " + msgContent) + p.Key = &node.KeyInfo{Kind: node.None} + + payload, err := p.Encode(version) + if err != nil { + return err + } + + msg := &pb.WakuMessage{ + Payload: payload, + Version: version, + ContentTopic: contentTopic, + Timestamp: timestamp, + } + + _, err = wakuNode.Publish(ctx, msg, nil) + return err +} diff --git a/tests/init.go b/tests/init.go new file mode 100644 index 00000000..ca8701d2 --- /dev/null +++ b/tests/init.go @@ -0,0 +1 @@ +package tests diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 3572cbe7..aea088c3 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -376,7 +376,9 @@ func (w *WakuNode) startStore() { peerChan := make(chan *event.EvtPeerConnectednessChanged) w.opts.store.Start(w.ctx, w.host, peerChan) w.peerListeners = append(w.peerListeners, peerChan) - w.opts.store.Resume(string(relay.GetTopic(nil)), nil) + if _, err := w.opts.store.Resume(string(relay.GetTopic(nil)), nil); err != nil { + log.Error("failed to resume", err) + } } func (w *WakuNode) AddStorePeer(address string) (*peer.ID, error) { diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index 95c6481a..c9be11b6 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -59,12 +59,6 @@ const WakuFilterCodec = "/vac/waku/filter/2.0.0-beta1" const WakuFilterProtocolId = libp2pProtocol.ID(WakuFilterCodec) -// Error types (metric label values) -const ( - dialFailure = "dial_failure" - decodeRpcFailure = "decode_rpc_failure" -) - func (filters *Filters) Notify(msg *pb.WakuMessage, requestId string) { for key, filter := range *filters { envelope := protocol.NewEnvelope(msg, filter.Topic) @@ -252,7 +246,9 @@ func (wf *WakuFilter) FilterListener() { } for m := range wf.MsgC { - handle(m) + if err := handle(m); err != nil { + log.Error("failed to handle message", err) + } } } @@ -273,6 +269,10 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) ( filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request} log.Info("Sending filterRPC: ", filterRPC) err = writer.WriteMsg(filterRPC) + if err != nil { + log.Error("failed to write message", err) + return "", err + } return string(id), nil } else { // @TODO more sophisticated error handling here @@ -300,6 +300,9 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, request pb.FilterRequest) writer := protoio.NewDelimitedWriter(conn) filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request} err = writer.WriteMsg(filterRPC) + if err != nil { + log.Error("failed to write message", err) + } //return some(id) } else { // @TODO more sophisticated error handling here diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 83d1719a..ebbfe4b2 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -177,7 +177,12 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o } defer connOpt.Close() - defer connOpt.Reset() + defer func() { + err := connOpt.Reset() + if err != nil { + log.Error("failed to reset connection", err) + } + }() pushRequestRPC := &pb.PushRPC{RequestId: hex.EncodeToString(params.requestId), Query: req} diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index bf60ae1d..c27e815c 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -260,7 +260,10 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host, peerChan chan *e storedMessages, err := store.msgProvider.GetAll() if err != nil { log.Error("could not load DBProvider messages", err) - stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_load_failure")}, metrics.Errors.M(1)) + err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_load_failure")}, metrics.Errors.M(1)) + if err != nil { + log.Error("failed to record with tags") + } return } @@ -272,7 +275,9 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host, peerChan chan *e store.storeMessageWithIndex(storedMessage.PubsubTopic, idx, storedMessage.Message) - stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "stored")}, metrics.StoreMessages.M(int64(len(store.messages)))) + if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "stored")}, metrics.StoreMessages.M(int64(len(store.messages)))); err != nil { + log.Error("failed to record with tags") + } } go store.peerListener() @@ -312,11 +317,16 @@ func (store *WakuStore) storeMessage(pubSubTopic string, msg *pb.WakuMessage) { if err != nil { log.Error("could not store message", err) - stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_failure")}, metrics.Errors.M(1)) + if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_failure")}, metrics.Errors.M(1)); err != nil { + log.Error("failed to record with tags", err) + } return } - stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "stored")}, metrics.StoreMessages.M(int64(len(store.messages)))) + if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "stored")}, metrics.StoreMessages.M(int64(len(store.messages)))); err != nil { + log.Error("failed to record with tags", err) + } + } func (store *WakuStore) storeIncomingMessages(ctx context.Context) { @@ -336,7 +346,9 @@ func (store *WakuStore) onRequest(s network.Stream) { err := reader.ReadMsg(historyRPCRequest) if err != nil { log.Error("error reading request", err) - stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "decodeRPCFailure")}, metrics.Errors.M(1)) + if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "decodeRPCFailure")}, metrics.Errors.M(1)); err != nil { + log.Error("failed to record with tags", err) + } return } @@ -497,11 +509,15 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec err = reader.ReadMsg(historyResponseRPC) if err != nil { log.Error("could not read response", err) - stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "decodeRPCFailure")}, metrics.Errors.M(1)) + if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "decodeRPCFailure")}, metrics.Errors.M(1)); err != nil { + log.Error("failed to record with tags") + } return nil, err } - stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "retrieved")}, metrics.StoreMessages.M(int64(len(store.messages)))) + if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "retrieved")}, metrics.StoreMessages.M(int64(len(store.messages)))); err != nil { + log.Error("failed to record with tags", err) + } return historyResponseRPC.Response, nil }