status-go/services/ext/mailrequests.go
frank 38308d48f2
feat_: log on panic (#5849)
* feat_: log error and stacktrace when panic in goroutine

* test_: add test TestSafeGo

* chore_: rename logAndCall to call

* chore_: rename SafeGo to Go

* chore_: make lint-fix

* chore_: use t.Cleanup

* chore_: Revert "chore_: use t.Cleanup"

This reverts commit 4eb420d179cc0e208e84c13cb941e6b3d1ed9819.

* chore_: Revert "chore_: make lint-fix"

This reverts commit fcc995f157e671a4229b47419c3a0e4004b5fdab.

* chore_: Revert "chore_: rename SafeGo to Go"

This reverts commit a6d73d6df583f313032d79aac62f66328039cb55.

* chore_: Revert "chore_: rename logAndCall to call"

This reverts commit 8fbe993bedb9fbba67349a44f151e2dd5e3bc4cc.

* chore_: Revert "test_: add test TestSafeGo"

This reverts commit a1fa91839f3960398980c6bf456e6462ec944819.

* chore_: Revert "feat_: log error and stacktrace when panic in goroutine"

This reverts commit f612dd828fa2ce410d0e806fe773ecbe3e86a68a.

* feat_: log error and stacktrace when panic in goroutine

* chore_: make lint-fix

* chore_: rename logAndCall to call

* chore_: renaming LogOnPanic

* chore_: update rest goroutine function calls

* chore_: make lint-fix
2024-09-27 06:37:32 +08:00

139 lines
3.6 KiB
Go

package ext
import (
"sync"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/common"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/services/ext/mailservers"
)
// EnvelopeState is the state of a mailserver request in the local tracker.
type EnvelopeState int

const (
	// NotRegistered is returned when the requested hash is not registered in the tracker.
	NotRegistered EnvelopeState = -1
	// MailServerRequestSent is set when the p2p request has been sent to the mailserver.
	//
	// The expression must be spelled out: a bare identifier in a const block
	// implicitly repeats the previous expression, which would make this
	// constant equal to NotRegistered (-1) and the two states indistinguishable.
	MailServerRequestSent EnvelopeState = iota // iota is 1 on the second const spec
)
// MailRequestMonitor tracks history requests sent to mailservers and reacts
// to the envelope events that report their progress.
type MailRequestMonitor struct {
	// eventSub is the source the monitor subscribes to for envelope events.
	eventSub mailservers.EnvelopeEventSubscriber
	// handler is notified when a tracked request completes or expires; it may be nil.
	handler EnvelopeEventsHandler

	// mu guards cache.
	mu sync.Mutex
	// cache holds the state of every request hash currently being tracked.
	cache map[types.Hash]EnvelopeState

	// requestsRegistry has a request's hash unregistered from it once a
	// completed or expired event arrives for that hash.
	requestsRegistry *RequestsRegistry

	// wg lets Stop wait for the event-processing goroutine launched by Start.
	wg sync.WaitGroup
	// quit is created by Start and closed by Stop to end event processing.
	quit chan struct{}
}
// NewMailRequestMonitor builds a monitor wired to the given event subscriber,
// events handler and requests registry, with an empty state cache.
// The quit channel is not created here; Start allocates it.
func NewMailRequestMonitor(eventSub mailservers.EnvelopeEventSubscriber, h EnvelopeEventsHandler, reg *RequestsRegistry) *MailRequestMonitor {
	monitor := &MailRequestMonitor{
		eventSub:         eventSub,
		handler:          h,
		requestsRegistry: reg,
	}
	monitor.cache = make(map[types.Hash]EnvelopeState)
	return monitor
}
// Start begins processing envelope events in a background goroutine.
//
// It allocates a fresh quit channel and registers the goroutine with the
// wait group so that Stop can signal it and wait for it to finish.
func (m *MailRequestMonitor) Start() {
	m.quit = make(chan struct{})
	m.wg.Add(1)
	go func() {
		// Deferred so the wait group is released however the goroutine ends;
		// otherwise an abnormal exit from handleEnvelopeEvents would leave
		// Stop blocked in Wait forever. Deferred calls run last-in-first-out,
		// so LogOnPanic still runs before Done.
		defer m.wg.Done()
		defer common.LogOnPanic()
		m.handleEnvelopeEvents()
	}()
}
// Stop ends event processing and waits for the background goroutine to exit.
//
// Calling Stop before Start, or again after a previous Stop, is a no-op:
// closing a nil or an already-closed channel would panic. Stop does not
// synchronize access to the quit channel, so it must not be called
// concurrently with Start or with another Stop.
func (m *MailRequestMonitor) Stop() {
	if m.quit == nil {
		return
	}
	close(m.quit)
	m.wg.Wait()
	// The goroutine has exited, so nothing reads quit anymore; clearing it
	// makes a repeated Stop harmless until Start allocates a new channel.
	m.quit = nil
}
// GetState reports the tracked state for hash, or NotRegistered when the
// hash is not present in the cache. The lookup is done under the monitor's mutex.
func (m *MailRequestMonitor) GetState(hash types.Hash) EnvelopeState {
	m.mu.Lock()
	defer m.mu.Unlock()

	if current, found := m.cache[hash]; found {
		return current
	}
	return NotRegistered
}
// handleEnvelopeEvents subscribes to envelope events and dispatches each one
// to handleEvent until the quit channel is closed. The subscription is
// released when the loop exits.
func (m *MailRequestMonitor) handleEnvelopeEvents() {
	// The channel must be buffered to prevent blocking whisper.
	incoming := make(chan types.EnvelopeEvent, 100)
	subscription := m.eventSub.SubscribeEnvelopeEvents(incoming)
	defer subscription.Unsubscribe()

	for {
		select {
		case ev := <-incoming:
			m.handleEvent(ev)
		case <-m.quit:
			return
		}
	}
}
// handleEvent routes an envelope event to the handler matching its type:
// request sent, request completed, or request expired. Events of any other
// type are ignored.
func (m *MailRequestMonitor) handleEvent(event types.EnvelopeEvent) {
	switch event.Event {
	case types.EventMailServerRequestSent:
		m.handleRequestSent(event)
	case types.EventMailServerRequestCompleted:
		m.handleEventMailServerRequestCompleted(event)
	case types.EventMailServerRequestExpired:
		m.handleEventMailServerRequestExpired(event)
	}
}
// handleRequestSent records, under the mutex, that the request identified by
// the event's hash has been sent.
func (m *MailRequestMonitor) handleRequestSent(event types.EnvelopeEvent) {
	m.mu.Lock()
	defer m.mu.Unlock()

	hash := event.Hash
	m.cache[hash] = MailServerRequestSent
}
// handleEventMailServerRequestCompleted handles a "request completed" event.
//
// The hash is always unregistered from the requests registry. If the hash is
// tracked in the sent state, it is removed from the cache and, when a handler
// is set and the event carries a *types.MailServerResponse, the handler is
// notified with the response details. The mutex is held for the whole call,
// including the handler callback.
func (m *MailRequestMonitor) handleEventMailServerRequestCompleted(event types.EnvelopeEvent) {
	m.mu.Lock()
	defer m.mu.Unlock()

	hash := event.Hash
	m.requestsRegistry.Unregister(hash)

	if tracked, found := m.cache[hash]; !found || tracked != MailServerRequestSent {
		return
	}
	log.Debug("mailserver response received", "hash", hash)
	delete(m.cache, hash)

	if m.handler == nil {
		return
	}
	response, isResponse := event.Data.(*types.MailServerResponse)
	if !isResponse {
		return
	}
	m.handler.MailServerRequestCompleted(hash, response.LastEnvelopeHash, response.Cursor, response.Error)
}
// handleEventMailServerRequestExpired handles a "request expired" event.
//
// The hash is always unregistered from the requests registry. If the hash is
// tracked in the sent state, it is removed from the cache and the handler,
// when set, is told that the request expired. The mutex is held for the whole
// call, including the handler callback.
func (m *MailRequestMonitor) handleEventMailServerRequestExpired(event types.EnvelopeEvent) {
	m.mu.Lock()
	defer m.mu.Unlock()

	hash := event.Hash
	m.requestsRegistry.Unregister(hash)

	if tracked, found := m.cache[hash]; !found || tracked != MailServerRequestSent {
		return
	}
	log.Debug("mailserver response expired", "hash", hash)
	delete(m.cache, hash)

	if m.handler == nil {
		return
	}
	m.handler.MailServerRequestExpired(hash)
}