mirror of
https://github.com/status-im/status-go.git
synced 2025-01-18 18:55:47 +00:00
f4cd8d27b5
This function returns only the new messages from the filter and never returns the same message for the same user twice.
171 lines
4.1 KiB
Go
171 lines
4.1 KiB
Go
package shhext
|
|
|
|
import (
|
|
"crypto/ecdsa"
|
|
"sync"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/node"
|
|
"github.com/ethereum/go-ethereum/p2p"
|
|
"github.com/ethereum/go-ethereum/rpc"
|
|
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
|
|
"github.com/status-im/status-go/services/shhext/dedup"
|
|
)
|
|
|
|
// EnvelopeState describes the state of an envelope in the local tracker.
type EnvelopeState int

// Possible states of a tracked envelope.
const (
	// EnvelopePosted is set when the envelope was added to a local whisper queue.
	EnvelopePosted EnvelopeState = iota
	// EnvelopeSent is set when the envelope is sent to at least one peer.
	EnvelopeSent
)
|
|
|
|
// EnvelopeEventsHandler used for two different event types.
|
|
type EnvelopeEventsHandler interface {
|
|
EnvelopeSent(common.Hash)
|
|
EnvelopeExpired(common.Hash)
|
|
}
|
|
|
|
// Service is a service that provides some additional Whisper API.
|
|
type Service struct {
|
|
w *whisper.Whisper
|
|
tracker *tracker
|
|
nodeID *ecdsa.PrivateKey
|
|
Deduplicator *dedup.Deduplicator
|
|
}
|
|
|
|
// Make sure that Service implements node.Service interface.
|
|
var _ node.Service = (*Service)(nil)
|
|
|
|
// New returns a new Service.
|
|
func New(w *whisper.Whisper, handler EnvelopeEventsHandler) *Service {
|
|
track := &tracker{
|
|
w: w,
|
|
handler: handler,
|
|
cache: map[common.Hash]EnvelopeState{},
|
|
}
|
|
return &Service{
|
|
w: w,
|
|
tracker: track,
|
|
Deduplicator: dedup.NewDeduplicator(w),
|
|
}
|
|
}
|
|
|
|
// Protocols returns a new protocols list. In this case, there are none.
|
|
func (s *Service) Protocols() []p2p.Protocol {
|
|
return []p2p.Protocol{}
|
|
}
|
|
|
|
// APIs returns a list of new APIs.
|
|
func (s *Service) APIs() []rpc.API {
|
|
return []rpc.API{
|
|
{
|
|
Namespace: "shhext",
|
|
Version: "1.0",
|
|
Service: NewPublicAPI(s),
|
|
Public: true,
|
|
},
|
|
}
|
|
}
|
|
|
|
// Start is run when a service is started.
|
|
// It does nothing in this case but is required by `node.Service` interface.
|
|
func (s *Service) Start(server *p2p.Server) error {
|
|
s.tracker.Start()
|
|
s.nodeID = server.PrivateKey
|
|
return nil
|
|
}
|
|
|
|
// Stop is run when a service is stopped.
|
|
// It does nothing in this case but is required by `node.Service` interface.
|
|
func (s *Service) Stop() error {
|
|
s.tracker.Stop()
|
|
return nil
|
|
}
|
|
|
|
// tracker responsible for processing events for envelopes that we are interested in
|
|
// and calling specified handler.
|
|
type tracker struct {
|
|
w *whisper.Whisper
|
|
handler EnvelopeEventsHandler
|
|
|
|
mu sync.Mutex
|
|
cache map[common.Hash]EnvelopeState
|
|
|
|
wg sync.WaitGroup
|
|
quit chan struct{}
|
|
}
|
|
|
|
// Start processing events.
|
|
func (t *tracker) Start() {
|
|
t.quit = make(chan struct{})
|
|
t.wg.Add(1)
|
|
go func() {
|
|
t.handleEnvelopeEvents()
|
|
t.wg.Done()
|
|
}()
|
|
}
|
|
|
|
// Stop process events.
|
|
func (t *tracker) Stop() {
|
|
close(t.quit)
|
|
t.wg.Wait()
|
|
}
|
|
|
|
// Add hash to a tracker.
|
|
func (t *tracker) Add(hash common.Hash) {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
t.cache[hash] = EnvelopePosted
|
|
}
|
|
|
|
// handleEnvelopeEvents processes whisper envelope events
|
|
func (t *tracker) handleEnvelopeEvents() {
|
|
events := make(chan whisper.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper
|
|
sub := t.w.SubscribeEnvelopeEvents(events)
|
|
defer sub.Unsubscribe()
|
|
for {
|
|
select {
|
|
case <-t.quit:
|
|
return
|
|
case event := <-events:
|
|
t.handleEvent(event)
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleEvent based on type of the event either triggers
|
|
// confirmation handler or removes hash from tracker
|
|
func (t *tracker) handleEvent(event whisper.EnvelopeEvent) {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
switch event.Event {
|
|
case whisper.EventEnvelopeSent:
|
|
state, ok := t.cache[event.Hash]
|
|
// if we didn't send a message using extension - skip it
|
|
// if message was already confirmed - skip it
|
|
if !ok || state == EnvelopeSent {
|
|
return
|
|
}
|
|
log.Debug("envelope is sent", "hash", event.Hash, "peer", event.Peer)
|
|
t.cache[event.Hash] = EnvelopeSent
|
|
if t.handler != nil {
|
|
t.handler.EnvelopeSent(event.Hash)
|
|
}
|
|
case whisper.EventEnvelopeExpired:
|
|
if state, ok := t.cache[event.Hash]; ok {
|
|
log.Debug("envelope expired", "hash", event.Hash, "state", state)
|
|
delete(t.cache, event.Hash)
|
|
if state == EnvelopeSent {
|
|
return
|
|
}
|
|
if t.handler != nil {
|
|
t.handler.EnvelopeExpired(event.Hash)
|
|
}
|
|
}
|
|
}
|
|
}
|