refactor: dynamic rln

This commit is contained in:
Richard Ramos 2023-04-04 17:02:12 -04:00 committed by RichΛrd
parent 6747603a73
commit 04c90657cd
4 changed files with 221 additions and 225 deletions

View File

@ -6,9 +6,9 @@ package node
import (
"bytes"
"context"
"encoding/hex"
"errors"
"github.com/waku-org/go-waku/waku/v2/protocol/rln"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/static"
r "github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
@ -39,33 +39,61 @@ func (w *WakuNode) mountRlnRelay(ctx context.Context) error {
return errors.New("relay protocol does not support the configured pubsub topic")
}
var err error
var groupManager rln.GroupManager
if !w.opts.rlnRelayDynamic {
w.log.Info("setting up waku-rln-relay in off-chain mode")
// set up rln relay inputs
groupKeys, idCredential, err := static.Setup(w.opts.rlnRelayMemIndex)
if err != nil {
return err
}
// rlnrelay in off-chain mode with a static group of user
groupManager, err := static.NewStaticGroupManager(groupKeys, idCredential, w.opts.rlnRelayMemIndex, w.log)
groupManager, err = static.NewStaticGroupManager(groupKeys, idCredential, w.opts.rlnRelayMemIndex, w.log)
if err != nil {
return err
}
} else {
w.log.Info("setting up waku-rln-relay in on-chain mode")
rlnRelay, err := rln.New(w.Relay(), groupManager, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.timesource, w.log)
// check if the peer has provided its rln credentials
var memKeyPair *r.IdentityCredential
if w.opts.rlnRelayIDCommitment != nil && w.opts.rlnRelayIDKey != nil {
memKeyPair = &r.IdentityCredential{
IDCommitment: *w.opts.rlnRelayIDCommitment,
IDSecretHash: *w.opts.rlnRelayIDKey,
}
}
groupManager, err = dynamic.NewDynamicGroupManager(
w.opts.rlnETHClientAddress,
w.opts.rlnETHPrivateKey,
w.opts.rlnMembershipContractAddress,
memKeyPair,
w.opts.rlnRelayMemIndex,
w.opts.rlnRegistrationHandler,
w.log,
)
if err != nil {
return err
}
}
err = rlnRelay.Start(ctx)
if err != nil {
return err
}
rlnRelay, err := rln.New(w.Relay(), groupManager, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.timesource, w.log)
if err != nil {
return err
}
w.rlnRelay = rlnRelay
err = rlnRelay.Start(ctx)
if err != nil {
return err
}
w.rlnRelay = rlnRelay
if !w.opts.rlnRelayDynamic {
// check the correct construction of the tree by comparing the calculated root against the expected root
// no error should happen as it is already captured in the unit tests
root, err := rlnRelay.RLN.GetMerkleRoot()
@ -81,26 +109,6 @@ func (w *WakuNode) mountRlnRelay(ctx context.Context) error {
if !bytes.Equal(expectedRoot[:], root[:]) {
return errors.New("root mismatch: something went wrong not in Merkle tree construction")
}
w.log.Debug("the calculated root", zap.String("root", hex.EncodeToString(root[:])))
} else {
w.log.Info("setting up waku-rln-relay in on-chain mode")
/*// check if the peer has provided its rln credentials
var memKeyPair *r.IdentityCredential
if w.opts.rlnRelayIDCommitment != nil && w.opts.rlnRelayIDKey != nil {
memKeyPair = &r.IdentityCredential{
IDCommitment: *w.opts.rlnRelayIDCommitment,
IDSecretHash: *w.opts.rlnRelayIDKey,
}
}
// mount the rln relay protocol in the on-chain/dynamic mode
var err error
w.rlnRelay, err = rln.RlnRelayDynamic(ctx, w.Relay(), w.opts.rlnETHClientAddress, w.opts.rlnETHPrivateKey, w.opts.rlnMembershipContractAddress, memKeyPair, w.opts.rlnRelayMemIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.opts.rlnRegistrationHandler, w.timesource, w.log)
if err != nil {
return err
}*/
}
w.log.Info("mounted waku RLN relay", zap.String("pubsubTopic", w.opts.rlnRelayPubsubTopic), zap.String("contentTopic", w.opts.rlnRelayContentTopic))

View File

@ -0,0 +1,158 @@
package dynamic
import (
"context"
"crypto/ecdsa"
"errors"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager"
"github.com/waku-org/go-zerokit-rln/rln"
r "github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
)
type DynamicGroupManager struct {
rln *rln.RLN
log *zap.Logger
cancel context.CancelFunc
wg sync.WaitGroup
identityCredential *rln.IdentityCredential
membershipIndex *rln.MembershipIndex
membershipContractAddress common.Address
ethClientAddress string
ethClient *ethclient.Client
// ethAccountPrivateKey is required for signing transactions
// TODO may need to erase this ethAccountPrivateKey when is not used
// TODO may need to make ethAccountPrivateKey mandatory
ethAccountPrivateKey *ecdsa.PrivateKey
registrationHandler RegistrationHandler
lastIndexLoaded int64
rootTracker *group_manager.MerkleRootTracker
}
type RegistrationHandler = func(tx *types.Transaction)
func NewDynamicGroupManager(
ethClientAddr string,
ethAccountPrivateKey *ecdsa.PrivateKey,
memContractAddr common.Address,
identityCredential *rln.IdentityCredential,
index rln.MembershipIndex,
registrationHandler RegistrationHandler,
log *zap.Logger,
) (*DynamicGroupManager, error) {
return &DynamicGroupManager{
identityCredential: identityCredential,
membershipIndex: &index,
membershipContractAddress: memContractAddr,
ethClientAddress: ethClientAddr,
ethAccountPrivateKey: ethAccountPrivateKey,
registrationHandler: registrationHandler,
lastIndexLoaded: -1,
}, nil
}
func (gm *DynamicGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN, rootTracker *group_manager.MerkleRootTracker) error {
if gm.cancel != nil {
return errors.New("already started")
}
ctx, cancel := context.WithCancel(ctx)
gm.cancel = cancel
gm.log.Info("mounting rln-relay in on-chain/dynamic mode")
gm.rln = rlnInstance
gm.rootTracker = rootTracker
err := rootTracker.Sync()
if err != nil {
return err
}
// prepare rln membership key pair
if gm.identityCredential == nil && gm.ethAccountPrivateKey != nil {
gm.log.Debug("no rln-relay key is provided, generating one")
identityCredential, err := rlnInstance.MembershipKeyGen()
if err != nil {
return err
}
gm.identityCredential = identityCredential
// register the rln-relay peer to the membership contract
membershipIndex, err := gm.Register(ctx)
if err != nil {
return err
}
gm.membershipIndex = membershipIndex
gm.log.Info("registered peer into the membership contract")
}
handler := func(pubkey r.IDCommitment, index r.MembershipIndex) error {
return gm.InsertMember(pubkey)
}
errChan := make(chan error)
gm.wg.Add(1)
go gm.HandleGroupUpdates(ctx, handler, errChan)
err = <-errChan
if err != nil {
return err
}
return nil
}
func (gm *DynamicGroupManager) InsertMember(pubkey rln.IDCommitment) error {
gm.log.Debug("a new key is added", zap.Binary("pubkey", pubkey[:]))
// assuming all the members arrive in order
err := gm.rln.InsertMember(pubkey)
if err != nil {
gm.log.Error("inserting member into merkletree", zap.Error(err))
return err
}
err = gm.rootTracker.Sync()
if err != nil {
return err
}
return nil
}
func (gm *DynamicGroupManager) IdentityCredentials() (rln.IdentityCredential, error) {
if gm.identityCredential == nil {
return rln.IdentityCredential{}, errors.New("identity credential has not been setup")
}
return *gm.identityCredential, nil
}
func (gm *DynamicGroupManager) MembershipIndex() (rln.MembershipIndex, error) {
if gm.membershipIndex == nil {
return 0, errors.New("membership index has not been setup")
}
return *gm.membershipIndex, nil
}
func (gm *DynamicGroupManager) Stop() {
if gm.cancel == nil {
return
}
gm.cancel()
gm.wg.Wait()
}

View File

@ -1,4 +1,4 @@
package rln
package dynamic
import (
"context"
@ -99,15 +99,15 @@ func register(ctx context.Context, idComm r.IDCommitment, ethAccountPrivateKey *
// Register registers the public key of the rlnPeer which is rlnPeer.membershipKeyPair.publicKey
// into the membership contract whose address is in rlnPeer.membershipContractAddress
func (rln *WakuRLNRelay) Register(ctx context.Context) (*r.MembershipIndex, error) {
pk := rln.membershipKeyPair.IDCommitment
return register(ctx, pk, rln.ethAccountPrivateKey, rln.ethClientAddress, rln.membershipContractAddress, rln.registrationHandler, rln.log)
func (gm *DynamicGroupManager) Register(ctx context.Context) (*r.MembershipIndex, error) {
pk := gm.identityCredential.IDCommitment
return register(ctx, pk, gm.ethAccountPrivateKey, gm.ethClientAddress, gm.membershipContractAddress, gm.registrationHandler, gm.log)
}
// the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface
type RegistrationEventHandler = func(pubkey r.IDCommitment, index r.MembershipIndex) error
func (rln *WakuRLNRelay) processLogs(evt *contracts.RLNMemberRegistered, handler RegistrationEventHandler) error {
func (gm *DynamicGroupManager) processLogs(evt *contracts.RLNMemberRegistered, handler RegistrationEventHandler) error {
if evt == nil {
return nil
}
@ -115,41 +115,43 @@ func (rln *WakuRLNRelay) processLogs(evt *contracts.RLNMemberRegistered, handler
var pubkey r.IDCommitment = r.Bytes32(evt.Pubkey.Bytes())
index := evt.Index.Int64()
if index <= rln.lastIndexLoaded {
if index <= gm.lastIndexLoaded {
return nil
}
rln.lastIndexLoaded = index
gm.lastIndexLoaded = index
return handler(pubkey, r.MembershipIndex(uint(evt.Index.Int64())))
}
// HandleGroupUpdates mounts the supplied handler for the registration events emitting from the membership contract
// It connects to the eth client, subscribes to the `MemberRegistered` event emitted from the `MembershipContract`
// and collects all the events, for every received event, it calls the `handler`
func (rln *WakuRLNRelay) HandleGroupUpdates(handler RegistrationEventHandler) error {
backend, err := ethclient.Dial(rln.ethClientAddress)
func (gm *DynamicGroupManager) HandleGroupUpdates(ctx context.Context, handler RegistrationEventHandler) error {
defer gm.wg.Done()
backend, err := ethclient.Dial(gm.ethClientAddress)
if err != nil {
return err
}
rln.ethClient = backend
gm.ethClient = backend
rlnContract, err := contracts.NewRLN(rln.membershipContractAddress, backend)
rlnContract, err := contracts.NewRLN(gm.membershipContractAddress, backend)
if err != nil {
return err
}
err = rln.loadOldEvents(rlnContract, handler)
err = gm.loadOldEvents(ctx, rlnContract, handler)
if err != nil {
return err
}
errCh := make(chan error)
go rln.watchNewEvents(rlnContract, handler, rln.log, errCh)
go gm.watchNewEvents(ctx, rlnContract, handler, gm.log, errCh)
return <-errCh
}
func (rln *WakuRLNRelay) loadOldEvents(rlnContract *contracts.RLN, handler RegistrationEventHandler) error {
logIterator, err := rlnContract.FilterMemberRegistered(&bind.FilterOpts{Start: 0, End: nil, Context: rln.ctx})
func (gm *DynamicGroupManager) loadOldEvents(ctx context.Context, rlnContract *contracts.RLN, handler RegistrationEventHandler) error {
logIterator, err := rlnContract.FilterMemberRegistered(&bind.FilterOpts{Start: 0, End: nil, Context: ctx})
if err != nil {
return err
}
@ -162,7 +164,7 @@ func (rln *WakuRLNRelay) loadOldEvents(rlnContract *contracts.RLN, handler Regis
return logIterator.Error()
}
err = rln.processLogs(logIterator.Event, handler)
err = gm.processLogs(logIterator.Event, handler)
if err != nil {
return err
}
@ -170,13 +172,13 @@ func (rln *WakuRLNRelay) loadOldEvents(rlnContract *contracts.RLN, handler Regis
return nil
}
func (rln *WakuRLNRelay) watchNewEvents(rlnContract *contracts.RLN, handler RegistrationEventHandler, log *zap.Logger, errCh chan<- error) {
func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, rlnContract *contracts.RLN, handler RegistrationEventHandler, log *zap.Logger, errCh chan<- error) {
// Watch for new events
logSink := make(chan *contracts.RLNMemberRegistered)
firstErr := true
subs := event.Resubscribe(2*time.Second, func(ctx context.Context) (event.Subscription, error) {
subs, err := rlnContract.WatchMemberRegistered(&bind.WatchOpts{Context: rln.ctx, Start: nil}, logSink)
subs, err := rlnContract.WatchMemberRegistered(&bind.WatchOpts{Context: ctx, Start: nil}, logSink)
if err != nil {
if err == rpc.ErrNotificationsUnsupported {
err = errors.New("notifications not supported. The node must support websockets")
@ -184,7 +186,7 @@ func (rln *WakuRLNRelay) watchNewEvents(rlnContract *contracts.RLN, handler Regi
if firstErr {
errCh <- err
}
rln.log.Error("subscribing to rln events", zap.Error(err))
gm.log.Error("subscribing to rln events", zap.Error(err))
}
firstErr = false
close(errCh)
@ -197,15 +199,15 @@ func (rln *WakuRLNRelay) watchNewEvents(rlnContract *contracts.RLN, handler Regi
for {
select {
case evt := <-logSink:
err := rln.processLogs(evt, handler)
err := gm.processLogs(evt, handler)
if err != nil {
rln.log.Error("processing rln log", zap.Error(err))
gm.log.Error("processing rln log", zap.Error(err))
}
case <-rln.ctx.Done():
case <-ctx.Done():
return
case err := <-subs.Err():
if err != nil {
rln.log.Error("watching new events", zap.Error(err))
gm.log.Error("watching new events", zap.Error(err))
}
return
}

View File

@ -1,172 +0,0 @@
package rln
import (
"context"
"crypto/ecdsa"
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
r "github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
)
func RlnRelayStatic(
ctx context.Context,
relay *relay.WakuRelay,
group []r.IDCommitment,
memKeyPair r.IdentityCredential,
memIndex r.MembershipIndex,
pubsubTopic string,
contentTopic string,
spamHandler SpamHandler,
timesource timesource.Timesource,
log *zap.Logger,
) (*WakuRLNRelay, error) {
log = log.Named("rln-static")
log.Info("mounting rln-relay in off-chain/static mode")
// check the peer's index and the inclusion of user's identity commitment in the group
if memKeyPair.IDCommitment != group[int(memIndex)] {
return nil, errors.New("peer's IDCommitment does not match commitment in group")
}
rlnInstance, err := r.NewRLN()
if err != nil {
return nil, err
}
// create the WakuRLNRelay
rlnPeer := &WakuRLNRelay{
ctx: ctx,
membershipKeyPair: &memKeyPair,
membershipIndex: memIndex,
RLN: rlnInstance,
pubsubTopic: pubsubTopic,
contentTopic: contentTopic,
log: log,
timesource: timesource,
nullifierLog: make(map[r.Nullifier][]r.ProofMetadata),
}
root, err := rlnPeer.RLN.GetMerkleRoot()
if err != nil {
return nil, err
}
rlnPeer.validMerkleRoots = append(rlnPeer.validMerkleRoots, root)
// add members to the Merkle tree
for _, member := range group {
if err := rlnPeer.insertMember(member); err != nil {
return nil, err
}
}
// adds a topic validator for the supplied pubsub topic at the relay protocol
// messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped
// the topic validator checks for the correct non-spamming proof of the message
err = rlnPeer.addValidator(relay, pubsubTopic, contentTopic, spamHandler)
if err != nil {
return nil, err
}
log.Info("rln relay topic validator mounted", zap.String("pubsubTopic", pubsubTopic), zap.String("contentTopic", contentTopic))
return rlnPeer, nil
}
func RlnRelayDynamic(
ctx context.Context,
relay *relay.WakuRelay,
ethClientAddr string,
ethAccountPrivateKey *ecdsa.PrivateKey,
memContractAddr common.Address,
memKeyPair *r.IdentityCredential,
memIndex r.MembershipIndex,
pubsubTopic string,
contentTopic string,
spamHandler SpamHandler,
registrationHandler RegistrationHandler,
timesource timesource.Timesource,
log *zap.Logger,
) (*WakuRLNRelay, error) {
log = log.Named("rln-dynamic")
log.Info("mounting rln-relay in onchain/dynamic mode")
rlnInstance, err := r.NewRLN()
if err != nil {
return nil, err
}
// create the WakuRLNRelay
rlnPeer := &WakuRLNRelay{
ctx: ctx,
membershipIndex: memIndex,
membershipContractAddress: memContractAddr,
ethClientAddress: ethClientAddr,
ethAccountPrivateKey: ethAccountPrivateKey,
RLN: rlnInstance,
pubsubTopic: pubsubTopic,
contentTopic: contentTopic,
log: log,
timesource: timesource,
nullifierLog: make(map[r.Nullifier][]r.ProofMetadata),
registrationHandler: registrationHandler,
lastIndexLoaded: -1,
}
root, err := rlnPeer.RLN.GetMerkleRoot()
if err != nil {
return nil, err
}
rlnPeer.validMerkleRoots = append(rlnPeer.validMerkleRoots, root)
// prepare rln membership key pair
if memKeyPair == nil && ethAccountPrivateKey != nil {
log.Debug("no rln-relay key is provided, generating one")
memKeyPair, err = rlnInstance.MembershipKeyGen()
if err != nil {
return nil, err
}
rlnPeer.membershipKeyPair = memKeyPair
// register the rln-relay peer to the membership contract
membershipIndex, err := rlnPeer.Register(ctx)
if err != nil {
return nil, err
}
rlnPeer.membershipIndex = *membershipIndex
log.Info("registered peer into the membership contract")
} else if memKeyPair != nil {
rlnPeer.membershipKeyPair = memKeyPair
}
handler := func(pubkey r.IDCommitment, index r.MembershipIndex) error {
return rlnPeer.insertMember(pubkey)
}
if err = rlnPeer.HandleGroupUpdates(handler); err != nil {
return nil, err
}
// adds a topic validator for the supplied pubsub topic at the relay protocol
// messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped
// the topic validator checks for the correct non-spamming proof of the message
err = rlnPeer.addValidator(relay, pubsubTopic, contentTopic, spamHandler)
if err != nil {
return nil, err
}
log.Info("rln relay topic validator mounted", zap.String("pubsubTopic", pubsubTopic), zap.String("contentTopic", contentTopic))
return rlnPeer, nil
}