diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index e08cedb274..a234709c63 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -109,7 +109,11 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS uninstall: make(chan *subscription), } - go m.eventLoop() + eventLoopInit := sync.WaitGroup{} + eventLoopInit.Add(1) + + go m.eventLoop(&eventLoopInit) + eventLoopInit.Wait() // Wait until eventLoop is properly initialized return m } @@ -393,23 +397,33 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. } // eventLoop (un)installs filters and processes mux events. -func (es *EventSystem) eventLoop() { +func (es *EventSystem) eventLoop(initWaitGroup *sync.WaitGroup) { var ( - index = make(filterIndex) - sub = es.mux.Subscribe(core.PendingLogsEvent{}) + sub *event.TypeMuxSubscription + txSub, rmLogsSub, logsSub, chainEvSub event.Subscription + + index = make(filterIndex) + txCh = make(chan core.TxPreEvent, txChanSize) + rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize) + logsCh = make(chan []*types.Log, logsChanSize) + chainEvCh = make(chan core.ChainEvent, chainEvChanSize) + ) + + // To guarantee that `initWaitGroup.Done()` is called even if any of the subscriptions + // will panic. Otherwise, it will deadlock `NewEventSystem`. + func() { + defer initWaitGroup.Done() + + sub = es.mux.Subscribe(core.PendingLogsEvent{}) // Subscribe TxPreEvent form txpool - txCh = make(chan core.TxPreEvent, txChanSize) txSub = es.backend.SubscribeTxPreEvent(txCh) // Subscribe RemovedLogsEvent - rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize) rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh) // Subscribe []*types.Log - logsCh = make(chan []*types.Log, logsChanSize) logsSub = es.backend.SubscribeLogsEvent(logsCh) // Subscribe ChainEvent - chainEvCh = make(chan core.ChainEvent, chainEvChanSize) chainEvSub = es.backend.SubscribeChainEvent(chainEvCh) - ) + }() // Unsubscribe all events defer sub.Unsubscribe()