diff --git a/README.md b/README.md index 58645d6..89c7293 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,25 @@ used to compare the number of Status messages across Status nodes to understand the potential discrepancies and odd behaviour of messages being inserted in the past in a given pubsub topic. +# Querying the data + +The following tables are available in the `telemetry` database at https://superset.bi.status.im/superset/sqllab/: + +### `missingMessages` +This table contains the list of missing messages and the storenode in which each message was identified as missing. The `msgStatus` column indicates the possible scenarios in which a message can be considered missing: +- `does_not_exist`. The message is not stored in the storenode's db +- `unknown`. It was not possible to retrieve information about this message. The storenode was likely unavailable when the query to verify the message status was made. + +### `storeNodeUnavailable` +This table records the timestamps at which a specific storenode was not available. This information should be used along with data from https://kibana.infra.status.im/ and https://grafana.infra.status.im/ to determine the reason why the storenode was not available. 
+ +# Browsing the logs + +You can browse the logs in Kibana at https://kibana.infra.status.im/goto/0be1cef0-1782-11ef-8a2b-91a1a792bd9c + +If the link is not available, use the following parameters: +- `logsource`: `node-01.he-eu-hel1.tmetry.misc` +- `program`: `docker/telemetry-counter` # Development diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go index 8a43113..8b0d496 100644 --- a/cmd/storemsgcounter/execute.go +++ b/cmd/storemsgcounter/execute.go @@ -20,6 +20,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/storenode-messages/internal/logging" + "github.com/waku-org/storenode-messages/internal/metrics" "github.com/waku-org/storenode-messages/internal/persistence" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -50,6 +51,12 @@ func Execute(ctx context.Context, options Options) error { logger := logging.Logger() + var metricsServer *metrics.Server + if options.EnableMetrics { + metricsServer = metrics.NewMetricsServer(options.MetricsAddress, options.MetricsPort, logger) + go metricsServer.Start() + } + var db *sql.DB var migrationFn func(*sql.DB, *zap.Logger) error db, migrationFn, err := persistence.ParseURL(options.DatabaseURL, logger) @@ -370,7 +377,7 @@ func verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, m peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID) - queryLogger := logger.With(zap.Stringer("storenode", peerID), zap.Stringers("hashes", messageHashes)) + queryLogger := logger.With(zap.Stringer("storenode", peerID)) queryLbl: for i := 0; i < maxAttempts; i++ { diff --git a/cmd/storemsgcounter/flags.go b/cmd/storemsgcounter/flags.go index dfeb663..39e83e6 100644 --- a/cmd/storemsgcounter/flags.go +++ b/cmd/storemsgcounter/flags.go @@ -102,4 +102,27 @@ var cliFlags = []cli.Flag{ Destination: &options.LogOutput, EnvVars: []string{"STORE_MSG_CTR_LOG_OUTPUT"}, }), + altsrc.NewBoolFlag(&cli.BoolFlag{ + Name: 
"metrics-server", + Aliases: []string{"metrics"}, + Usage: "Enable the metrics server", + Destination: &options.EnableMetrics, + EnvVars: []string{"STORE_MSG_CTR_METRICS_SERVER"}, + }), + altsrc.NewStringFlag(&cli.StringFlag{ + Name: "metrics-server-address", + Aliases: []string{"metrics-address"}, + Value: "127.0.0.1", + Usage: "Listening address of the metrics server", + Destination: &options.MetricsAddress, + EnvVars: []string{"STORE_MSG_CTR_METRICS_SERVER_ADDRESS"}, + }), + altsrc.NewIntFlag(&cli.IntFlag{ + Name: "metrics-server-port", + Aliases: []string{"metrics-port"}, + Value: 8008, + Usage: "Listening HTTP port of the metrics server", + Destination: &options.MetricsPort, + EnvVars: []string{"STORE_MSG_CTR_METRICS_SERVER_PORT"}, + }), } diff --git a/cmd/storemsgcounter/options.go b/cmd/storemsgcounter/options.go index 25e0555..f7a63a8 100644 --- a/cmd/storemsgcounter/options.go +++ b/cmd/storemsgcounter/options.go @@ -20,4 +20,7 @@ type Options struct { StoreNodes []multiaddr.Multiaddr DNSDiscoveryNameserver string DNSDiscoveryURLs cli.StringSlice + EnableMetrics bool + MetricsAddress string + MetricsPort int } diff --git a/go.mod b/go.mod index edad55b..ba94a51 100644 --- a/go.mod +++ b/go.mod @@ -15,8 +15,10 @@ require ( github.com/libp2p/go-libp2p v0.35.0 github.com/mattn/go-sqlite3 v1.14.17 github.com/multiformats/go-multiaddr v0.12.4 + github.com/prometheus/client_golang v1.19.1 github.com/urfave/cli/v2 v2.27.2 github.com/waku-org/go-waku v0.8.1-0.20240605190333-d2d2f5672ebd + go.opencensus.io v0.24.0 go.uber.org/zap v1.27.0 google.golang.org/protobuf v1.34.1 ) @@ -50,6 +52,7 @@ require ( github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof 
v0.0.0-20240207164012-fb44976bdcd5 // indirect @@ -120,7 +123,6 @@ require ( github.com/pion/webrtc/v3 v3.2.40 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect diff --git a/go.sum b/go.sum index 7585d1f..d79bd4a 100644 --- a/go.sum +++ b/go.sum @@ -115,6 +115,7 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/cilium/ebpf v0.2.0/go.mod h1:To2CFviqOWL/M0gIMsvSMlqe7em/l1ALkX1PyjrX2Qs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudflare/cloudflare-go v0.14.0/go.mod h1:EnwdgGMaFOruiPZRFSgn+TsQ3hQ7C/YWzIGLeu5c304= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/consensys/bavard v0.1.8-0.20210406032232-f3452dc9b572/go.mod h1:Bpd0/3mZuaj6Sj+PqrmIquiOKy397AKGThQPaGzNXAQ= @@ -177,7 +178,9 @@ github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaB github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/4= github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= 
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/ethereum/go-ethereum v1.9.5/go.mod h1:PwpWDrCLZrV+tfrhqqF6kPknbISMHaJv9Ln3kPCZLwY= github.com/ethereum/go-ethereum v1.10.16/go.mod h1:Anj6cxczl+AHy63o4X9O8yWNHuN5wMpfb8MAnHkWn7Y= @@ -246,6 +249,9 @@ github.com/golang/geo v0.0.0-20190916061304-5b978397cfec/go.mod h1:QZ0nwyI2jOfgR github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -258,6 +264,7 @@ github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:x github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf 
v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= @@ -277,7 +284,9 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= @@ -295,6 +304,7 @@ github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 h1:E/LAvt58di64hlYjx7AsNS6C/ysHWYo+2qPCZKTQhRo= github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.5/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= @@ -837,6 +847,8 @@ go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod 
h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= +go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= @@ -961,6 +973,7 @@ golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= @@ -1193,6 +1206,7 @@ google.golang.org/genproto v0.0.0-20191115194625-c23dd37a84c9/go.mod h1:n3cpQtvx google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20200108215221-bd8f9a0ef82f/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/grpc v1.14.0/go.mod 
h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= @@ -1200,13 +1214,19 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= 
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go
new file mode 100644
index 0000000..2f20843
--- /dev/null
+++ b/internal/metrics/metrics.go
@@ -0,0 +1,68 @@
+package metrics
+
+import (
+	"github.com/libp2p/go-libp2p/p2p/metricshelper"
+	"github.com/prometheus/client_golang/prometheus"
+	"go.uber.org/zap"
+)
+
+// missingMessages counts the messages identified as missing, labelled by the
+// storenode they are missing from and the status describing why.
+var missingMessages = prometheus.NewCounterVec(
+	prometheus.CounterOpts{
+		Name: "missing_messages",
+		Help: "The messages identified as missing and the reason why they're missing",
+	},
+	[]string{"storenode", "status"},
+)
+
+// storenodeUnavailable counts, per storenode, how often it was recorded as unavailable.
+var storenodeUnavailable = prometheus.NewCounterVec(
+	prometheus.CounterOpts{
+		Name: "storenode_unavailable",
+		Help: "Number of times a storenode was found to be unavailable",
+	},
+	[]string{"storenode"},
+)
+
+// collectors lists every collector that NewMetrics registers.
+var collectors = []prometheus.Collector{
+	missingMessages,
+	storenodeUnavailable,
+}
+
+// Metrics exposes the functions required to update the prometheus metrics
+// about missing messages and unavailable storenodes.
+type Metrics interface {
+	// RecordMissingMessage increments missing_messages for the given storenode and status.
+	RecordMissingMessage(storenode string, status string)
+	// RecordStorenodeUnavailable increments storenode_unavailable for the given storenode.
+	RecordStorenodeUnavailable(storenode string)
+}
+
+// metricsImpl is the Metrics implementation backed by the package-level collectors.
+type metricsImpl struct {
+	log *zap.Logger
+	reg prometheus.Registerer
+}
+
+// NewMetrics registers the package collectors with reg and returns a Metrics
+// that updates them.
+func NewMetrics(reg prometheus.Registerer, logger *zap.Logger) Metrics {
+	metricshelper.RegisterCollectors(reg, collectors...)
+	return &metricsImpl{
+		log: logger,
+		reg: reg,
+	}
+}
+
+// RecordMissingMessage increments the missing_messages counter. The increment
+// is done inline: it is cheap, so spawning a goroutine per call is not needed.
+func (m *metricsImpl) RecordMissingMessage(storenode string, status string) {
+	missingMessages.WithLabelValues(storenode, status).Inc()
+}
+
+// RecordStorenodeUnavailable increments the storenode_unavailable counter.
+func (m *metricsImpl) RecordStorenodeUnavailable(storenode string) {
+	storenodeUnavailable.WithLabelValues(storenode).Inc()
+}
diff --git a/internal/metrics/server.go b/internal/metrics/server.go
new file mode 100644
index 0000000..3934235
--- /dev/null
+++ b/internal/metrics/server.go
@@ -0,0 +1,68 @@
+package metrics
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net/http"
+
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"go.opencensus.io/plugin/ochttp"
+	"go.uber.org/zap"
+)
+
+// Server runs and controls an HTTP server that exposes the prometheus metrics
+// (/metrics) and a health check endpoint (/health).
+type Server struct {
+	server *http.Server
+	log    *zap.Logger
+}
+
+// NewMetricsServer creates a prometheus server on a particular interface and port.
+// The server is only configured here; call Start to begin listening.
+func NewMetricsServer(address string, port int, log *zap.Logger) *Server {
+	p := Server{
+		log: log.Named("metrics"),
+	}
+
+	p.log.Info("starting server", zap.String("address", address), zap.Int("port", port))
+	mux := http.NewServeMux()
+	mux.Handle("/metrics", promhttp.Handler())
+
+	// Healthcheck
+	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
+		fmt.Fprint(w, "OK")
+	})
+
+	h := &ochttp.Handler{Handler: mux}
+
+	p.server = &http.Server{
+		Addr:    fmt.Sprintf("%s:%d", address, port),
+		Handler: h,
+	}
+
+	return &p
+}
+
+// Start runs the HTTP server and blocks until it stops. Run it in its own
+// goroutine to serve in the background.
+func (p *Server) Start() {
+	err := p.server.ListenAndServe()
+	if err != nil && !errors.Is(err, http.ErrServerClosed) {
+		p.log.Error("server stopped unexpectedly", zap.Error(err))
+		return
+	}
+
+	p.log.Info("server stopped")
+}
+
+// Stop shuts down the prometheus server
+func (p *Server) Stop(ctx context.Context) error {
+	err := p.server.Shutdown(ctx)
+	if err != nil {
+		p.log.Error("stopping server", zap.Error(err))
+		return err
+	}
+
+	return nil
+}
diff --git a/internal/persistence/database.go b/internal/persistence/database.go index eeafc70..0d60edb 100644 --- a/internal/persistence/database.go +++ b/internal/persistence/database.go @@ -7,9 +7,11 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" + "github.com/waku-org/storenode-messages/internal/metrics" "go.uber.org/zap" ) @@ -18,6 +20,7 @@ type DBStore struct { db *sql.DB migrationFn func(db *sql.DB, logger *zap.Logger) error retentionPolicy time.Duration + metrics metrics.Metrics timesource timesource.Timesource log *zap.Logger @@ -101,6 +104,8 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) { optList := DefaultOptions() optList = append(optList, options...) 
+	result.metrics = metrics.NewMetrics(prometheus.DefaultRegisterer, log)
+
 	for _, opt := range optList {
 		err := opt(result)
 		if err != nil {
@@ -261,10 +266,14 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash,
 	now := time.Now().UnixNano()
 
 	for _, s := range storenodes {
-		_, err := stmt.Exec(uuid, clusterID, topic, msgHash.String(), timestamp, utils.EncapsulatePeerID(s.ID, s.Addrs[0])[0].String(), status, now)
+		storeAddr := utils.EncapsulatePeerID(s.ID, s.Addrs[0])[0].String()
+
+		_, err := stmt.Exec(uuid, clusterID, topic, msgHash.String(), timestamp, storeAddr, status, now)
 		if err != nil {
 			return err
 		}
+
+		d.metrics.RecordMissingMessage(storeAddr, status)
 	}
 
 	return nil
@@ -277,11 +286,16 @@ func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.AddrInf
 	}
 	defer stmt.Close()
 
+	// Encode the storenode address once: the same string is stored in the DB row and used as the metric label.
+	storeAddr := utils.EncapsulatePeerID(storenode.ID, storenode.Addrs[0])[0].String()
+
 	now := time.Now().UnixNano()
-	_, err = stmt.Exec(uuid, utils.EncapsulatePeerID(storenode.ID, storenode.Addrs[0])[0].String(), now)
+	_, err = stmt.Exec(uuid, storeAddr, now)
 	if err != nil {
 		return err
 	}
 
+	d.metrics.RecordStorenodeUnavailable(storeAddr)
+
 	return nil
 }