fix: start go-waku on Start() instead of New()

This commit is contained in:
Richard Ramos 2023-04-14 12:08:06 -04:00 committed by RichΛrd
parent 4ef5eba6b5
commit 63d9a9b3d5
1 changed files with 86 additions and 80 deletions

View File

@ -90,6 +90,7 @@ type settings struct {
DiscoveryLimit int // Indicates the number of nodes to discover DiscoveryLimit int // Indicates the number of nodes to discover
Nameserver string // Optional nameserver to use for dns discovery Nameserver string // Optional nameserver to use for dns discovery
EnableDiscV5 bool // Indicates whether discv5 is enabled or not EnableDiscV5 bool // Indicates whether discv5 is enabled or not
Options []node.WakuNodeOption
} }
// Waku represents a dark communication interface through the Ethereum // Waku represents a dark communication interface through the Ethereum
@ -120,6 +121,7 @@ type Waku struct {
quit chan struct{} // Channel used for graceful exit quit chan struct{} // Channel used for graceful exit
wg sync.WaitGroup wg sync.WaitGroup
cfg *Config
settings settings // Holds configuration settings that can be dynamically changed settings settings // Holds configuration settings that can be dynamically changed
settingsMu sync.RWMutex // Mutex to sync the settings access settingsMu sync.RWMutex // Mutex to sync the settings access
@ -128,6 +130,7 @@ type Waku struct {
storeMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids storeMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids
storeMsgIDsMu sync.RWMutex storeMsgIDsMu sync.RWMutex
connStatusChan chan node.ConnStatus
connStatusSubscriptions map[string]*types.ConnStatusSubscription connStatusSubscriptions map[string]*types.ConnStatusSubscription
connStatusMu sync.Mutex connStatusMu sync.Mutex
@ -189,12 +192,14 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
waku := &Waku{ waku := &Waku{
appDB: appDB, appDB: appDB,
cfg: cfg,
privateKeys: make(map[string]*ecdsa.PrivateKey), privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte), symKeys: make(map[string][]byte),
envelopes: make(map[gethcommon.Hash]*common.ReceivedMessage), envelopes: make(map[gethcommon.Hash]*common.ReceivedMessage),
expirations: make(map[uint32]mapset.Set), expirations: make(map[uint32]mapset.Set),
msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit), msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit),
sendQueue: make(chan *protocol.Envelope, 1000), sendQueue: make(chan *protocol.Envelope, 1000),
connStatusChan: make(chan node.ConnStatus, 100),
connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription), connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription),
quit: make(chan struct{}), quit: make(chan struct{}),
connectionChanged: make(chan struct{}), connectionChanged: make(chan struct{}),
@ -244,10 +249,6 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
return nil, fmt.Errorf("failed to setup the network interface: %v", err) return nil, fmt.Errorf("failed to setup the network interface: %v", err)
} }
ctx := context.Background()
connStatusChan := make(chan node.ConnStatus, 100)
if cfg.KeepAliveInterval == 0 { if cfg.KeepAliveInterval == 0 {
cfg.KeepAliveInterval = DefaultConfig.KeepAliveInterval cfg.KeepAliveInterval = DefaultConfig.KeepAliveInterval
} }
@ -265,14 +266,14 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
node.WithLibP2POptions(libp2pOpts...), node.WithLibP2POptions(libp2pOpts...),
node.WithPrivateKey(privateKey), node.WithPrivateKey(privateKey),
node.WithHostAddress(hostAddr), node.WithHostAddress(hostAddr),
node.WithConnectionStatusChannel(connStatusChan), node.WithConnectionStatusChannel(waku.connStatusChan),
node.WithKeepAlive(time.Duration(cfg.KeepAliveInterval) * time.Second), node.WithKeepAlive(time.Duration(cfg.KeepAliveInterval) * time.Second),
node.WithDiscoverParams(cfg.DiscoveryLimit), node.WithDiscoverParams(cfg.DiscoveryLimit),
node.WithLogger(logger), node.WithLogger(logger),
} }
if cfg.EnableDiscV5 { if cfg.EnableDiscV5 {
bootnodes, err := waku.getDiscV5BootstrapNodes(ctx, cfg.DiscV5BootstrapNodes) bootnodes, err := waku.getDiscV5BootstrapNodes(context.Background(), cfg.DiscV5BootstrapNodes)
if err != nil { if err != nil {
logger.Error("failed to get bootstrap nodes", zap.Error(err)) logger.Error("failed to get bootstrap nodes", zap.Error(err))
return nil, err return nil, err
@ -306,80 +307,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
opts = append(opts, node.WithMessageProvider(dbStore)) opts = append(opts, node.WithMessageProvider(dbStore))
} }
if waku.node, err = node.New(opts...); err != nil { waku.settings.Options = opts
return nil, fmt.Errorf("failed to create a go-waku node: %v", err)
}
idService, err := identify.NewIDService(waku.node.Host())
if err != nil {
return nil, err
}
waku.identifyService = idService
if err = waku.node.Start(ctx); err != nil {
return nil, fmt.Errorf("failed to start go-waku node: %v", err)
}
if err = waku.addWakuV2Peers(ctx, cfg); err != nil {
return nil, fmt.Errorf("failed to add wakuv2 peers: %v", err)
}
if cfg.EnableDiscV5 {
err := waku.node.DiscV5().Start(ctx)
if err != nil {
return nil, err
}
}
waku.wg.Add(4)
go func() {
defer waku.wg.Done()
isConnected := false
for {
select {
case <-waku.quit:
return
case c := <-connStatusChan:
waku.connStatusMu.Lock()
latestConnStatus := formatConnStatus(waku.node, c)
for k, subs := range waku.connStatusSubscriptions {
if subs.Active() {
subs.C <- latestConnStatus
} else {
delete(waku.connStatusSubscriptions, k)
}
}
waku.connStatusMu.Unlock()
signal.SendPeerStats(latestConnStatus)
if cfg.EnableDiscV5 {
// Restarting DiscV5
if !latestConnStatus.IsOnline && isConnected {
waku.logger.Debug("Restarting DiscV5: offline and is connected")
isConnected = false
waku.node.DiscV5().Stop()
} else if latestConnStatus.IsOnline && !isConnected {
waku.logger.Debug("Restarting DiscV5: online and is not connected")
isConnected = true
if !waku.node.DiscV5().IsStarted() {
err := waku.node.DiscV5().Start(ctx)
if err != nil {
waku.logger.Error("Could not start DiscV5", zap.Error(err))
}
}
}
}
}
}
}()
go waku.telemetryBandwidthStats(cfg.TelemetryServerURL)
go waku.runFilterMsgLoop()
go waku.runRelayMsgLoop()
go waku.runPeerExchangeLoop()
waku.logger.Info("setup the go-waku node successfully") waku.logger.Info("setup the go-waku node successfully")
return waku, nil return waku, nil
@ -1192,6 +1120,84 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, topics []common.TopicT
// Start implements node.Service, starting the background data propagation thread // Start implements node.Service, starting the background data propagation thread
// of the Waku protocol. // of the Waku protocol.
func (w *Waku) Start() error { func (w *Waku) Start() error {
var err error
if w.node, err = node.New(w.settings.Options...); err != nil {
return fmt.Errorf("failed to create a go-waku node: %v", err)
}
idService, err := identify.NewIDService(w.node.Host())
if err != nil {
return err
}
w.identifyService = idService
ctx := context.Background()
if err = w.node.Start(ctx); err != nil {
return fmt.Errorf("failed to start go-waku node: %v", err)
}
if err = w.addWakuV2Peers(ctx, w.cfg); err != nil {
return fmt.Errorf("failed to add wakuv2 peers: %v", err)
}
if w.cfg.EnableDiscV5 {
err := w.node.DiscV5().Start(ctx)
if err != nil {
return err
}
}
w.wg.Add(4)
go func() {
defer w.wg.Done()
isConnected := false
for {
select {
case <-w.quit:
return
case c := <-w.connStatusChan:
w.connStatusMu.Lock()
latestConnStatus := formatConnStatus(w.node, c)
for k, subs := range w.connStatusSubscriptions {
if subs.Active() {
subs.C <- latestConnStatus
} else {
delete(w.connStatusSubscriptions, k)
}
}
w.connStatusMu.Unlock()
signal.SendPeerStats(latestConnStatus)
if w.cfg.EnableDiscV5 {
// Restarting DiscV5
if !latestConnStatus.IsOnline && isConnected {
w.logger.Debug("Restarting DiscV5: offline and is connected")
isConnected = false
w.node.DiscV5().Stop()
} else if latestConnStatus.IsOnline && !isConnected {
w.logger.Debug("Restarting DiscV5: online and is not connected")
isConnected = true
if !w.node.DiscV5().IsStarted() {
err := w.node.DiscV5().Start(ctx)
if err != nil {
w.logger.Error("Could not start DiscV5", zap.Error(err))
}
}
}
}
}
}
}()
go w.telemetryBandwidthStats(w.cfg.TelemetryServerURL)
go w.runFilterMsgLoop()
go w.runRelayMsgLoop()
go w.runPeerExchangeLoop()
numCPU := runtime.NumCPU() numCPU := runtime.NumCPU()
for i := 0; i < numCPU; i++ { for i := 0; i < numCPU; i++ {
go w.processQueue() go w.processQueue()