feat(rln-relay): isReady

This commit is contained in:
Richard Ramos 2023-09-11 17:34:56 -04:00 committed by richΛrd
parent 7beaa3f029
commit 55bc21c604
10 changed files with 138 additions and 14 deletions

View File

@ -0,0 +1,48 @@
package rest
import (
"context"
"errors"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/waku-org/go-waku/waku/v2/node"
)
type HealthService struct {
node *node.WakuNode
mux *chi.Mux
}
const routeHealth = "/health"
func NewHealthService(node *node.WakuNode, m *chi.Mux) *HealthService {
h := &HealthService{
node: node,
mux: m,
}
m.Get(routeHealth, h.getHealth)
return h
}
type HealthResponse string
func (d *HealthService) getHealth(w http.ResponseWriter, r *http.Request) {
isReady, err := d.node.RLNRelay().IsReady(r.Context())
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
writeResponse(w, HealthResponse("Health check timed out"), http.StatusInternalServerError)
} else {
writeResponse(w, HealthResponse(err.Error()), http.StatusInternalServerError)
}
return
}
if isReady {
writeResponse(w, HealthResponse("Node is healthy"), http.StatusOK)
} else {
writeResponse(w, HealthResponse("Node is not ready"), http.StatusInternalServerError)
}
}

View File

@ -0,0 +1,41 @@
openapi: 3.0.3
info:
title: Waku V2 node REST API
version: 1.0.0
contact:
name: VAC Team
url: https://forum.vac.dev/
tags:
- name: health
description: Healt check REST API for WakuV2 node
paths:
/health:
get:
summary: Get node health status
description: Retrieve readiness of a Waku v2 node.
operationId: healthcheck
tags:
- health
responses:
'200':
description: Waku v2 node is up and running.
content:
text/plain:
schema:
type: string
example: Node is healty
'500':
description: Internal server error
content:
text/plain:
schema:
type: string
'503':
description: Node not initialized or having issues
content:
text/plain:
schema:
type: string
example: Node is not initialized

View File

@ -34,7 +34,7 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool
} }
_ = NewDebugService(node, mux) _ = NewDebugService(node, mux)
_ = NewHealthService(node, mux)
_ = NewStoreService(node, mux) _ = NewStoreService(node, mux)
listenAddr := fmt.Sprintf("%s:%d", address, port) listenAddr := fmt.Sprintf("%s:%d", address, port)

View File

@ -74,6 +74,7 @@ type RLNRelay interface {
Validator(spamHandler SpamHandler) func(ctx context.Context, message *pb.WakuMessage, topic string) bool Validator(spamHandler SpamHandler) func(ctx context.Context, message *pb.WakuMessage, topic string) bool
Start(ctx context.Context) error Start(ctx context.Context) error
Stop() error Stop() error
IsReady(ctx context.Context) (bool, error)
} }
type WakuNode struct { type WakuNode struct {

View File

@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
"sync"
"time" "time"
"github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/accounts/abi/bind"
@ -36,7 +37,8 @@ type DynamicGroupManager struct {
identityCredential *rln.IdentityCredential identityCredential *rln.IdentityCredential
membershipIndex rln.MembershipIndex membershipIndex rln.MembershipIndex
lastBlockProcessed uint64 lastBlockProcessedMutex sync.RWMutex
lastBlockProcessed uint64
appKeystore *keystore.AppKeystore appKeystore *keystore.AppKeystore
keystorePassword string keystorePassword string
@ -44,6 +46,9 @@ type DynamicGroupManager struct {
} }
func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered) error { func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered) error {
gm.lastBlockProcessedMutex.Lock()
defer gm.lastBlockProcessedMutex.Unlock()
toRemoveTable := om.New() toRemoveTable := om.New()
toInsertTable := om.New() toInsertTable := om.New()
@ -82,8 +87,6 @@ func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered)
return err return err
} }
gm.metrics.RecordRegisteredMembership(toInsertTable.Len() - toRemoveTable.Len())
gm.lastBlockProcessed = lastBlockProcessed gm.lastBlockProcessed = lastBlockProcessed
err = gm.SetMetadata(RLNMetadata{ err = gm.SetMetadata(RLNMetadata{
LastProcessedBlock: gm.lastBlockProcessed, LastProcessedBlock: gm.lastBlockProcessed,
@ -95,7 +98,7 @@ func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered)
// this is not a fatal error, hence we don't raise an exception // this is not a fatal error, hence we don't raise an exception
gm.log.Warn("failed to persist rln metadata", zap.Error(err)) gm.log.Warn("failed to persist rln metadata", zap.Error(err))
} else { } else {
gm.log.Debug("rln metadata persisted", zap.Uint64("lastProcessedBlock", gm.lastBlockProcessed), zap.Uint64("chainID", gm.web3Config.ChainID.Uint64()), logging.HexBytes("contractAddress", gm.web3Config.RegistryContract.Address.Bytes())) gm.log.Debug("rln metadata persisted", zap.Uint64("lastBlockProcessed", gm.lastBlockProcessed), zap.Uint64("chainID", gm.web3Config.ChainID.Uint64()), logging.HexBytes("contractAddress", gm.web3Config.RegistryContract.Address.Bytes()))
} }
return nil return nil
@ -228,6 +231,8 @@ func (gm *DynamicGroupManager) InsertMembers(toInsert *om.OrderedMap) error {
} }
gm.metrics.RecordMembershipInsertionDuration(time.Since(start)) gm.metrics.RecordMembershipInsertionDuration(time.Since(start))
gm.metrics.RecordRegisteredMembership(startIndex + uint(len(idCommitments)))
_, err = gm.rootTracker.UpdateLatestRoot(pair.Key.(uint64)) _, err = gm.rootTracker.UpdateLatestRoot(pair.Key.(uint64))
if err != nil { if err != nil {
return err return err
@ -279,3 +284,25 @@ func (gm *DynamicGroupManager) Stop() error {
return nil return nil
} }
func (gm *DynamicGroupManager) IsReady(ctx context.Context) (bool, error) {
latestBlockNumber, err := gm.latestBlockNumber(ctx)
if err != nil {
return false, fmt.Errorf("could not retrieve latest block: %w", err)
}
gm.lastBlockProcessedMutex.RLock()
allBlocksProcessed := gm.lastBlockProcessed >= latestBlockNumber
gm.lastBlockProcessedMutex.RUnlock()
if !allBlocksProcessed {
return false, nil
}
syncProgress, err := gm.web3Config.ETHClient.SyncProgress(ctx)
if err != nil {
return false, fmt.Errorf("could not retrieve sync state: %w", err)
}
return syncProgress == nil, nil // syncProgress only has a value while node is syncing
}

View File

@ -7,8 +7,8 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
var numberRegisteredMemberships = prometheus.NewCounter( var numberRegisteredMemberships = prometheus.NewGauge(
prometheus.CounterOpts{ prometheus.GaugeOpts{
Name: "waku_rln_number_registered_memberships", Name: "waku_rln_number_registered_memberships",
Help: "number of registered and active rln memberships", Help: "number of registered and active rln memberships",
}) })
@ -33,7 +33,7 @@ var collectors = []prometheus.Collector{
// Metrics exposes the functions required to update prometheus metrics for lightpush protocol // Metrics exposes the functions required to update prometheus metrics for lightpush protocol
type Metrics interface { type Metrics interface {
RecordRegisteredMembership(num int) RecordRegisteredMembership(num uint)
RecordMembershipInsertionDuration(duration time.Duration) RecordMembershipInsertionDuration(duration time.Duration)
RecordMembershipCredentialsImportDuration(duration time.Duration) RecordMembershipCredentialsImportDuration(duration time.Duration)
} }
@ -60,10 +60,6 @@ func (m *metricsImpl) RecordMembershipCredentialsImportDuration(duration time.Du
} }
// RecordRegisteredMembership records the number of registered memberships // RecordRegisteredMembership records the number of registered memberships
func (m *metricsImpl) RecordRegisteredMembership(num int) { func (m *metricsImpl) RecordRegisteredMembership(num uint) {
if num < 0 { numberRegisteredMemberships.Set(float64(num))
return
}
numberRegisteredMemberships.Add(float64(num))
} }

View File

@ -11,6 +11,7 @@ type GroupManager interface {
IdentityCredentials() (rln.IdentityCredential, error) IdentityCredentials() (rln.IdentityCredential, error)
MembershipIndex() rln.MembershipIndex MembershipIndex() rln.MembershipIndex
Stop() error Stop() error
IsReady(ctx context.Context) (bool, error)
} }
type Details struct { type Details struct {

View File

@ -95,3 +95,7 @@ func (gm *StaticGroupManager) Stop() error {
// Do nothing // Do nothing
return nil return nil
} }
func (gm *StaticGroupManager) IsReady(ctx context.Context) (bool, error) {
return true, nil
}

View File

@ -293,3 +293,8 @@ func (rlnRelay *WakuRLNRelay) IdentityCredential() (rln.IdentityCredential, erro
func (rlnRelay *WakuRLNRelay) MembershipIndex() uint { func (rlnRelay *WakuRLNRelay) MembershipIndex() uint {
return rlnRelay.GroupManager.MembershipIndex() return rlnRelay.GroupManager.MembershipIndex()
} }
// IsReady returns true if the RLN Relay protocol is ready to relay messages
func (rlnRelay *WakuRLNRelay) IsReady(ctx context.Context) (bool, error) {
return rlnRelay.GroupManager.IsReady(ctx)
}

View File

@ -31,6 +31,7 @@ type RLNContract struct {
// EthClient is an interface for the ethclient.Client, so that we can pass mock client for testing // EthClient is an interface for the ethclient.Client, so that we can pass mock client for testing
type EthClient interface { type EthClient interface {
bind.ContractBackend bind.ContractBackend
SyncProgress(ctx context.Context) (*ethereum.SyncProgress, error)
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)