From 55bc21c604b08a7a83458fa29b87361347ca3559 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 11 Sep 2023 17:34:56 -0400 Subject: [PATCH] feat(rln-relay): isReady --- cmd/waku/server/rest/health.go | 48 +++++++++++++++++++ cmd/waku/server/rest/health_api.yaml | 41 ++++++++++++++++ cmd/waku/server/rest/waku_rest.go | 2 +- waku/v2/node/wakunode2.go | 1 + .../rln/group_manager/dynamic/dynamic.go | 35 ++++++++++++-- .../rln/group_manager/dynamic/metrics.go | 14 ++---- .../rln/group_manager/group_manager.go | 1 + .../rln/group_manager/static/static.go | 4 ++ waku/v2/protocol/rln/waku_rln_relay.go | 5 ++ waku/v2/protocol/rln/web3/web3.go | 1 + 10 files changed, 138 insertions(+), 14 deletions(-) create mode 100644 cmd/waku/server/rest/health.go create mode 100644 cmd/waku/server/rest/health_api.yaml diff --git a/cmd/waku/server/rest/health.go b/cmd/waku/server/rest/health.go new file mode 100644 index 00000000..6fb18685 --- /dev/null +++ b/cmd/waku/server/rest/health.go @@ -0,0 +1,48 @@ +package rest + +import ( + "context" + "errors" + "net/http" + + "github.com/go-chi/chi/v5" + "github.com/waku-org/go-waku/waku/v2/node" +) + +type HealthService struct { + node *node.WakuNode + mux *chi.Mux +} + +const routeHealth = "/health" + +func NewHealthService(node *node.WakuNode, m *chi.Mux) *HealthService { + h := &HealthService{ + node: node, + mux: m, + } + + m.Get(routeHealth, h.getHealth) + + return h +} + +type HealthResponse string + +func (d *HealthService) getHealth(w http.ResponseWriter, r *http.Request) { + isReady, err := d.node.RLNRelay().IsReady(r.Context()) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + writeResponse(w, HealthResponse("Health check timed out"), http.StatusInternalServerError) + } else { + writeResponse(w, HealthResponse(err.Error()), http.StatusInternalServerError) + } + return + } + + if isReady { + writeResponse(w, HealthResponse("Node is healthy"), http.StatusOK) + } else { + writeResponse(w, HealthResponse("Node is not ready"), http.StatusInternalServerError) + } +} diff --git a/cmd/waku/server/rest/health_api.yaml b/cmd/waku/server/rest/health_api.yaml new file mode 100644 index 00000000..d1528c77 --- /dev/null +++ b/cmd/waku/server/rest/health_api.yaml @@ -0,0 +1,41 @@ +openapi: 3.0.3 +info: + title: Waku V2 node REST API + version: 1.0.0 + contact: + name: VAC Team + url: https://forum.vac.dev/ + +tags: + - name: health + description: Healt check REST API for WakuV2 node + +paths: + /health: + get: + summary: Get node health status + description: Retrieve readiness of a Waku v2 node. + operationId: healthcheck + tags: + - health + responses: + '200': + description: Waku v2 node is up and running. + content: + text/plain: + schema: + type: string + example: Node is healty + '500': + description: Internal server error + content: + text/plain: + schema: + type: string + '503': + description: Node not initialized or having issues + content: + text/plain: + schema: + type: string + example: Node is not initialized \ No newline at end of file diff --git a/cmd/waku/server/rest/waku_rest.go b/cmd/waku/server/rest/waku_rest.go index 19f02cd0..ec032590 100644 --- a/cmd/waku/server/rest/waku_rest.go +++ b/cmd/waku/server/rest/waku_rest.go @@ -34,7 +34,7 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool } _ = NewDebugService(node, mux) - + _ = NewHealthService(node, mux) _ = NewStoreService(node, mux) listenAddr := fmt.Sprintf("%s:%d", address, port) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index d49043c5..afe9d1d9 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -74,6 +74,7 @@ type RLNRelay interface { Validator(spamHandler SpamHandler) func(ctx context.Context, message *pb.WakuMessage, topic string) bool Start(ctx context.Context) error Stop() error + IsReady(ctx context.Context) (bool, error) } type WakuNode struct { diff --git a/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go b/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go index a9ff2745..fdecd309 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math/big" + "sync" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -36,7 +37,8 @@ type DynamicGroupManager struct { identityCredential *rln.IdentityCredential membershipIndex rln.MembershipIndex - lastBlockProcessed uint64 + lastBlockProcessedMutex sync.RWMutex + lastBlockProcessed uint64 appKeystore *keystore.AppKeystore keystorePassword string @@ -44,6 +46,9 @@ type DynamicGroupManager struct { } func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered) error { + gm.lastBlockProcessedMutex.Lock() + defer gm.lastBlockProcessedMutex.Unlock() + toRemoveTable := om.New() toInsertTable := om.New() @@ -82,8 +87,6 @@ func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered) return err } - gm.metrics.RecordRegisteredMembership(toInsertTable.Len() - toRemoveTable.Len()) - gm.lastBlockProcessed = lastBlockProcessed err = gm.SetMetadata(RLNMetadata{ LastProcessedBlock: gm.lastBlockProcessed, @@ -95,7 +98,7 @@ func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered) // this is not a fatal error, hence we don't raise an exception gm.log.Warn("failed to persist rln metadata", zap.Error(err)) } else { - gm.log.Debug("rln metadata persisted", zap.Uint64("lastProcessedBlock", gm.lastBlockProcessed), zap.Uint64("chainID", gm.web3Config.ChainID.Uint64()), logging.HexBytes("contractAddress", gm.web3Config.RegistryContract.Address.Bytes())) + gm.log.Debug("rln metadata persisted", zap.Uint64("lastBlockProcessed", gm.lastBlockProcessed), zap.Uint64("chainID", gm.web3Config.ChainID.Uint64()), logging.HexBytes("contractAddress", gm.web3Config.RegistryContract.Address.Bytes())) } return nil @@ -228,6 +231,8 @@ func (gm *DynamicGroupManager) InsertMembers(toInsert *om.OrderedMap) error { } gm.metrics.RecordMembershipInsertionDuration(time.Since(start)) + gm.metrics.RecordRegisteredMembership(startIndex + uint(len(idCommitments))) + _, err = gm.rootTracker.UpdateLatestRoot(pair.Key.(uint64)) if err != nil { return err @@ -279,3 +284,25 @@ func (gm *DynamicGroupManager) Stop() error { return nil } + +func (gm *DynamicGroupManager) IsReady(ctx context.Context) (bool, error) { + latestBlockNumber, err := gm.latestBlockNumber(ctx) + if err != nil { + return false, fmt.Errorf("could not retrieve latest block: %w", err) + } + + gm.lastBlockProcessedMutex.RLock() + allBlocksProcessed := gm.lastBlockProcessed >= latestBlockNumber + gm.lastBlockProcessedMutex.RUnlock() + + if !allBlocksProcessed { + return false, nil + } + + syncProgress, err := gm.web3Config.ETHClient.SyncProgress(ctx) + if err != nil { + return false, fmt.Errorf("could not retrieve sync state: %w", err) + } + + return syncProgress == nil, nil // syncProgress only has a value while node is syncing +} diff --git a/waku/v2/protocol/rln/group_manager/dynamic/metrics.go b/waku/v2/protocol/rln/group_manager/dynamic/metrics.go index a11fc201..a345dc0b 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/metrics.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/metrics.go @@ -7,8 +7,8 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -var numberRegisteredMemberships = prometheus.NewCounter( - prometheus.CounterOpts{ +var numberRegisteredMemberships = prometheus.NewGauge( + prometheus.GaugeOpts{ Name: "waku_rln_number_registered_memberships", Help: "number of registered and active rln memberships", }) @@ -33,7 +33,7 @@ var collectors = []prometheus.Collector{ // Metrics exposes the functions required to update prometheus metrics for lightpush protocol type Metrics interface { - RecordRegisteredMembership(num int) + RecordRegisteredMembership(num uint) RecordMembershipInsertionDuration(duration time.Duration) RecordMembershipCredentialsImportDuration(duration time.Duration) } @@ -60,10 +60,6 @@ func (m *metricsImpl) RecordMembershipCredentialsImportDuration(duration time.Du } // RecordRegisteredMembership records the number of registered memberships -func (m *metricsImpl) RecordRegisteredMembership(num int) { - if num < 0 { - return - } - - numberRegisteredMemberships.Add(float64(num)) +func (m *metricsImpl) RecordRegisteredMembership(num uint) { + numberRegisteredMemberships.Set(float64(num)) } diff --git a/waku/v2/protocol/rln/group_manager/group_manager.go b/waku/v2/protocol/rln/group_manager/group_manager.go index 3dbec886..32792482 100644 --- a/waku/v2/protocol/rln/group_manager/group_manager.go +++ b/waku/v2/protocol/rln/group_manager/group_manager.go @@ -11,6 +11,7 @@ type GroupManager interface { IdentityCredentials() (rln.IdentityCredential, error) MembershipIndex() rln.MembershipIndex Stop() error + IsReady(ctx context.Context) (bool, error) } type Details struct { diff --git a/waku/v2/protocol/rln/group_manager/static/static.go b/waku/v2/protocol/rln/group_manager/static/static.go index 251bf1b4..e4c5889a 100644 --- a/waku/v2/protocol/rln/group_manager/static/static.go +++ b/waku/v2/protocol/rln/group_manager/static/static.go @@ -95,3 +95,7 @@ func (gm *StaticGroupManager) Stop() error { // Do nothing return nil } + +func (gm *StaticGroupManager) IsReady(ctx context.Context) (bool, error) { + return true, nil +} diff --git a/waku/v2/protocol/rln/waku_rln_relay.go b/waku/v2/protocol/rln/waku_rln_relay.go index d3c00eb4..3e0248ef 100644 --- a/waku/v2/protocol/rln/waku_rln_relay.go +++ b/waku/v2/protocol/rln/waku_rln_relay.go @@ -293,3 +293,8 @@ func (rlnRelay *WakuRLNRelay) IdentityCredential() (rln.IdentityCredential, erro func (rlnRelay *WakuRLNRelay) MembershipIndex() uint { return rlnRelay.GroupManager.MembershipIndex() } + +// IsReady returns true if the RLN Relay protocol is ready to relay messages +func (rlnRelay *WakuRLNRelay) IsReady(ctx context.Context) (bool, error) { + return rlnRelay.GroupManager.IsReady(ctx) +} diff --git a/waku/v2/protocol/rln/web3/web3.go b/waku/v2/protocol/rln/web3/web3.go index cdd162a0..59c2234b 100644 --- a/waku/v2/protocol/rln/web3/web3.go +++ b/waku/v2/protocol/rln/web3/web3.go @@ -31,6 +31,7 @@ type RLNContract struct { // EthClient is an interface for the ethclient.Client, so that we can pass mock client for testing type EthClient interface { bind.ContractBackend + SyncProgress(ctx context.Context) (*ethereum.SyncProgress, error) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)