Refactorings: whisper transport removed (#2200)

This commit is contained in:
Volodymyr Kozieiev 2021-04-22 16:36:18 +03:00 committed by GitHub
parent fd49876a47
commit 9241903edb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 994 additions and 2527 deletions

View File

@ -1 +1 @@
0.76.5
0.76.6

1
go.mod
View File

@ -54,6 +54,7 @@ require (
github.com/pborman/uuid v1.2.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.5.0
github.com/prometheus/common v0.9.1
github.com/russolsen/ohyeah v0.0.0-20160324131710-f4938c005315 // indirect
github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect
github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a

3
go.sum
View File

@ -24,8 +24,10 @@ github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMx
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/allegro/bigcache v0.0.0-20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/allegro/bigcache v1.2.0 h1:qDaE0QoF29wKBb3+pXFrJFy1ihe5OT9OiXhg1t85SxM=
@ -854,6 +856,7 @@ google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiq
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@ -47,7 +47,7 @@ type MessageProcessor struct {
identity *ecdsa.PrivateKey
datasync *datasync.DataSync
protocol *encryption.Protocol
transport transport.Transport
transport *transport.Transport
logger *zap.Logger
persistence *RawMessagesPersistence
@ -71,7 +71,7 @@ func NewMessageProcessor(
identity *ecdsa.PrivateKey,
database *sql.DB,
enc *encryption.Protocol,
transport transport.Transport,
transport *transport.Transport,
logger *zap.Logger,
features FeatureFlags,
) (*MessageProcessor, error) {

View File

@ -6,6 +6,10 @@ import (
"path/filepath"
"testing"
transport2 "github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/waku"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/suite"
@ -19,9 +23,7 @@ import (
"github.com/status-im/status-go/protocol/encryption"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/sqlite"
transport "github.com/status-im/status-go/protocol/transport/whisper"
v1protocol "github.com/status-im/status-go/protocol/v1"
"github.com/status-im/status-go/whisper"
)
func TestMessageProcessorSuite(t *testing.T) {
@ -67,13 +69,13 @@ func (s *MessageProcessorSuite) SetupTest() {
s.logger,
)
whisperConfig := whisper.DefaultConfig
whisperConfig.MinimumAcceptedPOW = 0
shh := whisper.New(&whisperConfig)
wakuConfig := waku.DefaultConfig
wakuConfig.MinimumAcceptedPoW = 0
shh := waku.New(&wakuConfig, s.logger)
s.Require().NoError(shh.Start(nil))
whisperTransport, err := transport.NewTransport(
gethbridge.NewGethWhisperWrapper(shh),
whisperTransport, err := transport2.NewTransport(
gethbridge.NewGethWakuWrapper(shh),
identity,
database,
nil,

View File

@ -5,6 +5,8 @@ import (
"encoding/hex"
"fmt"
"github.com/status-im/status-go/protocol/transport"
"github.com/pkg/errors"
"github.com/status-im/status-go/eth-node/crypto"
@ -16,7 +18,6 @@ import (
"github.com/status-im/status-go/protocol/encryption/multidevice"
"github.com/status-im/status-go/protocol/ens"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/transport"
v1protocol "github.com/status-im/status-go/protocol/v1"
"go.uber.org/zap"
@ -34,13 +35,13 @@ type MessageHandler struct {
identity *ecdsa.PrivateKey
persistence *sqlitePersistence
settings *accounts.Database
transport transport.Transport
transport *transport.Transport
ensVerifier *ens.Verifier
communitiesManager *communities.Manager
logger *zap.Logger
}
func newMessageHandler(identity *ecdsa.PrivateKey, logger *zap.Logger, persistence *sqlitePersistence, communitiesManager *communities.Manager, transport transport.Transport, ensVerifier *ens.Verifier, settings *accounts.Database) *MessageHandler {
func newMessageHandler(identity *ecdsa.PrivateKey, logger *zap.Logger, persistence *sqlitePersistence, communitiesManager *communities.Manager, transport *transport.Transport, ensVerifier *ens.Verifier, settings *accounts.Database) *MessageHandler {
return &MessageHandler{
identity: identity,
persistence: persistence,

View File

@ -42,12 +42,11 @@ import (
"github.com/status-im/status-go/protocol/pushnotificationserver"
"github.com/status-im/status-go/protocol/sqlite"
"github.com/status-im/status-go/protocol/transport"
wakutransp "github.com/status-im/status-go/protocol/transport/waku"
shhtransp "github.com/status-im/status-go/protocol/transport/whisper"
v1protocol "github.com/status-im/status-go/protocol/v1"
"github.com/status-im/status-go/services/mailservers"
)
//todo: kozieiev: get rid of wakutransp word
type chatContext string
const (
@ -79,7 +78,7 @@ type Messenger struct {
config *config
identity *ecdsa.PrivateKey
persistence *sqlitePersistence
transport transport.Transport
transport *transport.Transport
encryptor *encryption.Protocol
processor *common.MessageProcessor
handler *MessageHandler
@ -204,26 +203,12 @@ func NewMessenger(
}
// Initialize transport layer.
var transp transport.Transport
if shh, err := node.GetWhisper(nil); err == nil && shh != nil {
transp, err = shhtransp.NewTransport(
shh,
identity,
database,
nil,
c.envelopesMonitorConfig,
logger,
)
if err != nil {
return nil, errors.Wrap(err, "failed to create Transport")
}
} else {
logger.Info("failed to find Whisper service; trying Waku", zap.Error(err))
waku, err := node.GetWaku(nil)
if err != nil || waku == nil {
return nil, errors.Wrap(err, "failed to find Whisper and Waku services")
}
transp, err = wakutransp.NewTransport(
transp, err := transport.NewTransport(
waku,
identity,
database,
@ -234,7 +219,6 @@ func NewMessenger(
if err != nil {
return nil, errors.Wrap(err, "failed to create Transport")
}
}
// Initialize encryption layer.
encryptionProtocol := encryption.New(

View File

@ -9,8 +9,7 @@ import (
appmigrations "github.com/status-im/status-go/protocol/migrations"
push_notification_client_migrations "github.com/status-im/status-go/protocol/pushnotificationclient/migrations"
push_notification_server_migrations "github.com/status-im/status-go/protocol/pushnotificationserver/migrations"
wakumigrations "github.com/status-im/status-go/protocol/transport/waku/migrations"
whispermigrations "github.com/status-im/status-go/protocol/transport/whisper/migrations"
wakumigrations "github.com/status-im/status-go/protocol/transport/migrations"
)
type getter func(string) ([]byte, error)
@ -21,10 +20,6 @@ type migrationsWithGetter struct {
}
var defaultMigrations = []migrationsWithGetter{
{
Names: whispermigrations.AssetNames(),
Getter: whispermigrations.Asset,
},
{
Names: wakumigrations.AssetNames(),
Getter: wakumigrations.Asset,

View File

@ -1,11 +1,27 @@
package transport
import (
"context"
"errors"
"sync"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/types"
)
// EnvelopeState in local tracker
type EnvelopeState int
const (
// NotRegistered returned if asked hash wasn't registered in the tracker.
NotRegistered EnvelopeState = -1
// EnvelopePosted is set when envelope was added to a local waku queue.
EnvelopePosted EnvelopeState = iota + 1
// EnvelopeSent is set when envelope is sent to at least one peer.
EnvelopeSent
)
type EnvelopesMonitorConfig struct {
EnvelopeEventsHandler EnvelopeEventsHandler
MaxAttempts int
@ -21,3 +37,281 @@ type EnvelopeEventsHandler interface {
MailServerRequestCompleted(types.Hash, types.Hash, []byte, error)
MailServerRequestExpired(types.Hash)
}
// NewEnvelopesMonitor returns a pointer to an instance of the EnvelopesMonitor.
func NewEnvelopesMonitor(w types.Waku, config EnvelopesMonitorConfig) *EnvelopesMonitor {
logger := config.Logger
if logger == nil {
logger = zap.NewNop()
}
var api types.PublicWakuAPI
if w != nil {
api = w.PublicWakuAPI()
}
return &EnvelopesMonitor{
w: w,
api: api,
handler: config.EnvelopeEventsHandler,
mailServerConfirmation: config.MailserverConfirmationsEnabled,
maxAttempts: config.MaxAttempts,
isMailserver: config.IsMailserver,
logger: logger.With(zap.Namespace("EnvelopesMonitor")),
// key is envelope hash (event.Hash)
envelopes: map[types.Hash]EnvelopeState{},
messages: map[types.Hash]*types.NewMessage{},
attempts: map[types.Hash]int{},
identifiers: make(map[types.Hash][][]byte),
// key is hash of the batch (event.Batch)
batches: map[types.Hash]map[types.Hash]struct{}{},
}
}
// EnvelopesMonitor is responsible for monitoring waku envelopes state.
type EnvelopesMonitor struct {
w types.Waku
api types.PublicWakuAPI
handler EnvelopeEventsHandler
mailServerConfirmation bool
maxAttempts int
mu sync.Mutex
envelopes map[types.Hash]EnvelopeState
batches map[types.Hash]map[types.Hash]struct{}
messages map[types.Hash]*types.NewMessage
attempts map[types.Hash]int
identifiers map[types.Hash][][]byte
wg sync.WaitGroup
quit chan struct{}
isMailserver func(peer types.EnodeID) bool
logger *zap.Logger
}
// Start processing events.
func (m *EnvelopesMonitor) Start() {
m.quit = make(chan struct{})
m.wg.Add(1)
go func() {
m.handleEnvelopeEvents()
m.wg.Done()
}()
}
// Stop process events.
func (m *EnvelopesMonitor) Stop() {
close(m.quit)
m.wg.Wait()
}
// Add hash to a tracker.
func (m *EnvelopesMonitor) Add(identifiers [][]byte, envelopeHash types.Hash, message types.NewMessage) {
m.mu.Lock()
defer m.mu.Unlock()
m.envelopes[envelopeHash] = EnvelopePosted
m.identifiers[envelopeHash] = identifiers
m.messages[envelopeHash] = &message
m.attempts[envelopeHash] = 1
}
func (m *EnvelopesMonitor) GetState(hash types.Hash) EnvelopeState {
m.mu.Lock()
defer m.mu.Unlock()
state, exist := m.envelopes[hash]
if !exist {
return NotRegistered
}
return state
}
// handleEnvelopeEvents processes waku envelope events
func (m *EnvelopesMonitor) handleEnvelopeEvents() {
events := make(chan types.EnvelopeEvent, 100) // must be buffered to prevent blocking waku
sub := m.w.SubscribeEnvelopeEvents(events)
defer func() {
close(events)
sub.Unsubscribe()
}()
for {
select {
case <-m.quit:
return
case event := <-events:
m.handleEvent(event)
}
}
}
// handleEvent based on type of the event either triggers
// confirmation handler or removes hash from tracker
func (m *EnvelopesMonitor) handleEvent(event types.EnvelopeEvent) {
handlers := map[types.EventType]func(types.EnvelopeEvent){
types.EventEnvelopeSent: m.handleEventEnvelopeSent,
types.EventEnvelopeExpired: m.handleEventEnvelopeExpired,
types.EventBatchAcknowledged: m.handleAcknowledgedBatch,
types.EventEnvelopeReceived: m.handleEventEnvelopeReceived,
}
if handler, ok := handlers[event.Event]; ok {
handler(event)
}
}
func (m *EnvelopesMonitor) handleEventEnvelopeSent(event types.EnvelopeEvent) {
if m.mailServerConfirmation {
if !m.isMailserver(event.Peer) {
return
}
}
m.mu.Lock()
defer m.mu.Unlock()
state, ok := m.envelopes[event.Hash]
// if we didn't send a message using extension - skip it
// if message was already confirmed - skip it
if !ok || state == EnvelopeSent {
return
}
m.logger.Debug("envelope is sent", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String()))
if event.Batch != (types.Hash{}) {
if _, ok := m.batches[event.Batch]; !ok {
m.batches[event.Batch] = map[types.Hash]struct{}{}
}
m.batches[event.Batch][event.Hash] = struct{}{}
m.logger.Debug("waiting for a confirmation", zap.String("batch", event.Batch.String()))
} else {
m.envelopes[event.Hash] = EnvelopeSent
if m.handler != nil {
m.handler.EnvelopeSent(m.identifiers[event.Hash])
}
}
}
func (m *EnvelopesMonitor) handleAcknowledgedBatch(event types.EnvelopeEvent) {
if m.mailServerConfirmation {
if !m.isMailserver(event.Peer) {
return
}
}
m.mu.Lock()
defer m.mu.Unlock()
envelopes, ok := m.batches[event.Batch]
if !ok {
m.logger.Debug("batch is not found", zap.String("batch", event.Batch.String()))
}
m.logger.Debug("received a confirmation", zap.String("batch", event.Batch.String()), zap.String("peer", event.Peer.String()))
envelopeErrors, ok := event.Data.([]types.EnvelopeError)
if event.Data != nil && !ok {
m.logger.Error("received unexpected data in the the confirmation event", zap.Any("data", event.Data))
}
failedEnvelopes := map[types.Hash]struct{}{}
for i := range envelopeErrors {
envelopeError := envelopeErrors[i]
_, exist := m.envelopes[envelopeError.Hash]
if exist {
m.logger.Warn("envelope that was posted by us is discarded", zap.String("hash", envelopeError.Hash.String()), zap.String("peer", event.Peer.String()), zap.String("error", envelopeError.Description))
var err error
switch envelopeError.Code {
case types.EnvelopeTimeNotSynced:
err = errors.New("envelope wasn't delivered due to time sync issues")
}
m.handleEnvelopeFailure(envelopeError.Hash, err)
}
failedEnvelopes[envelopeError.Hash] = struct{}{}
}
for hash := range envelopes {
if _, exist := failedEnvelopes[hash]; exist {
continue
}
state, ok := m.envelopes[hash]
if !ok || state == EnvelopeSent {
continue
}
m.envelopes[hash] = EnvelopeSent
if m.handler != nil {
m.handler.EnvelopeSent(m.identifiers[hash])
}
}
delete(m.batches, event.Batch)
}
func (m *EnvelopesMonitor) handleEventEnvelopeExpired(event types.EnvelopeEvent) {
m.mu.Lock()
defer m.mu.Unlock()
m.handleEnvelopeFailure(event.Hash, errors.New("envelope expired due to connectivity issues"))
}
// handleEnvelopeFailure is a common code path for processing envelopes failures. not thread safe, lock
// must be used on a higher level.
func (m *EnvelopesMonitor) handleEnvelopeFailure(hash types.Hash, err error) {
if state, ok := m.envelopes[hash]; ok {
message, exist := m.messages[hash]
if !exist {
m.logger.Error("message was deleted erroneously", zap.String("envelope hash", hash.String()))
}
attempt := m.attempts[hash]
identifiers := m.identifiers[hash]
m.clearMessageState(hash)
if state == EnvelopeSent {
return
}
if attempt < m.maxAttempts {
m.logger.Debug("retrying to send a message", zap.String("hash", hash.String()), zap.Int("attempt", attempt+1))
hex, err := m.api.Post(context.TODO(), *message)
if err != nil {
m.logger.Error("failed to retry sending message", zap.String("hash", hash.String()), zap.Int("attempt", attempt+1), zap.Error(err))
if m.handler != nil {
m.handler.EnvelopeExpired(identifiers, err)
}
}
envelopeID := types.BytesToHash(hex)
m.envelopes[envelopeID] = EnvelopePosted
m.messages[envelopeID] = message
m.attempts[envelopeID] = attempt + 1
m.identifiers[envelopeID] = identifiers
} else {
m.logger.Debug("envelope expired", zap.String("hash", hash.String()))
if m.handler != nil {
m.handler.EnvelopeExpired(identifiers, err)
}
}
}
}
func (m *EnvelopesMonitor) handleEventEnvelopeReceived(event types.EnvelopeEvent) {
if m.mailServerConfirmation {
if !m.isMailserver(event.Peer) {
return
}
}
m.mu.Lock()
defer m.mu.Unlock()
state, ok := m.envelopes[event.Hash]
if !ok || state != EnvelopePosted {
return
}
m.logger.Debug("expected envelope received", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String()))
m.envelopes[event.Hash] = EnvelopeSent
if m.handler != nil {
m.handler.EnvelopeSent(m.identifiers[event.Hash])
}
}
// clearMessageState removes all message and envelope state.
// not thread-safe, should be protected on a higher level.
func (m *EnvelopesMonitor) clearMessageState(envelopeID types.Hash) {
delete(m.envelopes, envelopeID)
delete(m.messages, envelopeID)
delete(m.attempts, envelopeID)
delete(m.identifiers, envelopeID)
}

View File

@ -1,4 +1,4 @@
package waku
package transport
import (
"testing"
@ -11,7 +11,6 @@ import (
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport"
)
var (
@ -32,7 +31,7 @@ func TestEnvelopesMonitorSuite(t *testing.T) {
func (s *EnvelopesMonitorSuite) SetupTest() {
s.monitor = NewEnvelopesMonitor(
nil,
transport.EnvelopesMonitorConfig{
EnvelopesMonitorConfig{
EnvelopeEventsHandler: nil,
MaxAttempts: 0,
MailserverConfirmationsEnabled: false,

View File

@ -71,59 +71,59 @@ func NewFiltersManager(persistence KeysPersistence, service FiltersService, priv
}, nil
}
func (s *FiltersManager) Init(
func (f *FiltersManager) Init(
chatIDs []string,
publicKeys []*ecdsa.PublicKey,
) ([]*Filter, error) {
// Load our contact code.
_, err := s.LoadContactCode(&s.privateKey.PublicKey)
_, err := f.LoadContactCode(&f.privateKey.PublicKey)
if err != nil {
return nil, errors.Wrap(err, "failed to load contact code")
}
// Load partitioned topic.
_, err = s.loadMyPartitioned()
_, err = f.loadMyPartitioned()
if err != nil {
return nil, err
}
// Add discovery topic.
_, err = s.LoadDiscovery()
_, err = f.LoadDiscovery()
if err != nil {
return nil, err
}
// Add public, one-to-one and negotiated filters.
for _, chatID := range chatIDs {
_, err := s.LoadPublic(chatID)
_, err := f.LoadPublic(chatID)
if err != nil {
return nil, err
}
}
for _, publicKey := range publicKeys {
_, err := s.LoadContactCode(publicKey)
_, err := f.LoadContactCode(publicKey)
if err != nil {
return nil, err
}
}
s.mutex.Lock()
defer s.mutex.Unlock()
f.mutex.Lock()
defer f.mutex.Unlock()
var allFilters []*Filter
for _, f := range s.filters {
for _, f := range f.filters {
allFilters = append(allFilters, f)
}
return allFilters, nil
}
func (s *FiltersManager) InitPublicFilters(chatIDs []string) ([]*Filter, error) {
func (f *FiltersManager) InitPublicFilters(chatIDs []string) ([]*Filter, error) {
var filters []*Filter
// Add public, one-to-one and negotiated filters.
for _, chatID := range chatIDs {
f, err := s.LoadPublic(chatID)
f, err := f.LoadPublic(chatID)
if err != nil {
return nil, err
}
@ -132,15 +132,15 @@ func (s *FiltersManager) InitPublicFilters(chatIDs []string) ([]*Filter, error)
return filters, nil
}
func (s *FiltersManager) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*Filter, error) {
func (f *FiltersManager) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*Filter, error) {
var filters []*Filter
s.mutex.Lock()
defer s.mutex.Unlock()
f.mutex.Lock()
defer f.mutex.Unlock()
for _, pk := range pks {
identityStr := PublicKeyToStr(&pk.PublicKey)
rawFilter, err := s.addAsymmetric(identityStr, pk, true)
rawFilter, err := f.addAsymmetric(identityStr, pk, true)
if err != nil {
return nil, err
@ -155,7 +155,7 @@ func (s *FiltersManager) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*Filte
OneToOne: true,
}
s.filters[filterID] = filter
f.filters[filterID] = filter
filters = append(filters, filter)
}
@ -163,7 +163,7 @@ func (s *FiltersManager) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*Filte
}
// DEPRECATED
func (s *FiltersManager) InitWithFilters(filters []*Filter) ([]*Filter, error) {
func (f *FiltersManager) InitWithFilters(filters []*Filter) ([]*Filter, error) {
var (
chatIDs []string
publicKeys []*ecdsa.PublicKey
@ -181,43 +181,43 @@ func (s *FiltersManager) InitWithFilters(filters []*Filter) ([]*Filter, error) {
}
}
return s.Init(chatIDs, publicKeys)
return f.Init(chatIDs, publicKeys)
}
func (s *FiltersManager) Reset() error {
func (f *FiltersManager) Reset() error {
var filters []*Filter
s.mutex.Lock()
for _, f := range s.filters {
f.mutex.Lock()
for _, f := range f.filters {
filters = append(filters, f)
}
s.mutex.Unlock()
f.mutex.Unlock()
return s.Remove(filters...)
return f.Remove(filters...)
}
func (s *FiltersManager) Filters() (result []*Filter) {
s.mutex.Lock()
defer s.mutex.Unlock()
func (f *FiltersManager) Filters() (result []*Filter) {
f.mutex.Lock()
defer f.mutex.Unlock()
for _, f := range s.filters {
for _, f := range f.filters {
result = append(result, f)
}
return
}
func (s *FiltersManager) Filter(chatID string) *Filter {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.filters[chatID]
func (f *FiltersManager) Filter(chatID string) *Filter {
f.mutex.Lock()
defer f.mutex.Unlock()
return f.filters[chatID]
}
// FilterByFilterID returns a Filter with a given Whisper filter ID.
func (s *FiltersManager) FilterByFilterID(filterID string) *Filter {
s.mutex.Lock()
defer s.mutex.Unlock()
for _, f := range s.filters {
func (f *FiltersManager) FilterByFilterID(filterID string) *Filter {
f.mutex.Lock()
defer f.mutex.Unlock()
for _, f := range f.filters {
if f.FilterID == filterID {
return f
}
@ -226,20 +226,19 @@ func (s *FiltersManager) FilterByFilterID(filterID string) *Filter {
}
// FilterByChatID returns a Filter for given chat id
func (s *FiltersManager) FilterByChatID(chatID string) *Filter {
s.mutex.Lock()
defer s.mutex.Unlock()
func (f *FiltersManager) FilterByChatID(chatID string) *Filter {
f.mutex.Lock()
defer f.mutex.Unlock()
return s.filters[chatID]
return f.filters[chatID]
}
func (s *FiltersManager) FiltersByPublicKey(publicKey *ecdsa.PublicKey) (result []*Filter) {
s.mutex.Lock()
defer s.mutex.Unlock()
func (f *FiltersManager) FiltersByPublicKey(publicKey *ecdsa.PublicKey) (result []*Filter) {
f.mutex.Lock()
defer f.mutex.Unlock()
identityStr := PublicKeyToStr(publicKey)
for _, f := range s.filters {
for _, f := range f.filters {
if f.Identity == identityStr {
result = append(result, f)
}
@ -249,27 +248,27 @@ func (s *FiltersManager) FiltersByPublicKey(publicKey *ecdsa.PublicKey) (result
}
// Remove remove all the filters associated with a chat/identity
func (s *FiltersManager) Remove(filters ...*Filter) error {
s.mutex.Lock()
defer s.mutex.Unlock()
func (f *FiltersManager) Remove(filters ...*Filter) error {
f.mutex.Lock()
defer f.mutex.Unlock()
for _, f := range filters {
if err := s.service.Unsubscribe(f.FilterID); err != nil {
for _, filter := range filters {
if err := f.service.Unsubscribe(filter.FilterID); err != nil {
return err
}
if f.SymKeyID != "" {
s.service.DeleteSymKey(f.SymKeyID)
if filter.SymKeyID != "" {
f.service.DeleteSymKey(filter.SymKeyID)
}
delete(s.filters, f.ChatID)
delete(f.filters, filter.ChatID)
}
return nil
}
// Remove remove all the filters associated with a chat/identity
func (s *FiltersManager) RemoveNoListenFilters() error {
s.mutex.Lock()
defer s.mutex.Unlock()
func (f *FiltersManager) RemoveNoListenFilters() error {
f.mutex.Lock()
defer f.mutex.Unlock()
var filterIDs []string
var filters []*Filter
@ -279,31 +278,31 @@ func (s *FiltersManager) RemoveNoListenFilters() error {
filters = append(filters, f)
}
}
if err := s.service.UnsubscribeMany(filterIDs); err != nil {
if err := f.service.UnsubscribeMany(filterIDs); err != nil {
return err
}
for _, filter := range filters {
if filter.SymKeyID != "" {
s.service.DeleteSymKey(filter.SymKeyID)
f.service.DeleteSymKey(filter.SymKeyID)
}
delete(s.filters, filter.ChatID)
delete(f.filters, filter.ChatID)
}
return nil
}
// Remove remove all the filters associated with a chat/identity
func (s *FiltersManager) RemoveFilterByChatID(chatID string) (*Filter, error) {
s.mutex.Lock()
filter, ok := s.filters[chatID]
s.mutex.Unlock()
func (f *FiltersManager) RemoveFilterByChatID(chatID string) (*Filter, error) {
f.mutex.Lock()
filter, ok := f.filters[chatID]
f.mutex.Unlock()
if !ok {
return nil, nil
}
err := s.Remove(filter)
err := f.Remove(filter)
if err != nil {
return nil, err
}
@ -312,28 +311,28 @@ func (s *FiltersManager) RemoveFilterByChatID(chatID string) (*Filter, error) {
}
// LoadPartitioned creates a filter for a partitioned topic.
func (s *FiltersManager) LoadPartitioned(publicKey *ecdsa.PublicKey, identity *ecdsa.PrivateKey, listen bool) (*Filter, error) {
return s.loadPartitioned(publicKey, identity, listen, false)
func (f *FiltersManager) LoadPartitioned(publicKey *ecdsa.PublicKey, identity *ecdsa.PrivateKey, listen bool) (*Filter, error) {
return f.loadPartitioned(publicKey, identity, listen, false)
}
// LoadEphemeral creates a filter for a partitioned/personal topic.
func (s *FiltersManager) LoadEphemeral(publicKey *ecdsa.PublicKey, identity *ecdsa.PrivateKey, listen bool) (*Filter, error) {
return s.loadPartitioned(publicKey, identity, listen, true)
func (f *FiltersManager) LoadEphemeral(publicKey *ecdsa.PublicKey, identity *ecdsa.PrivateKey, listen bool) (*Filter, error) {
return f.loadPartitioned(publicKey, identity, listen, true)
}
// LoadPersonal creates a filter for a personal topic.
func (s *FiltersManager) LoadPersonal(publicKey *ecdsa.PublicKey, identity *ecdsa.PrivateKey, listen bool) (*Filter, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
func (f *FiltersManager) LoadPersonal(publicKey *ecdsa.PublicKey, identity *ecdsa.PrivateKey, listen bool) (*Filter, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
chatID := PersonalDiscoveryTopic(publicKey)
if _, ok := s.filters[chatID]; ok {
return s.filters[chatID], nil
if _, ok := f.filters[chatID]; ok {
return f.filters[chatID], nil
}
// We set up a filter so we can publish,
// but we discard envelopes if listen is false.
filter, err := s.addAsymmetric(chatID, identity, listen)
filter, err := f.addAsymmetric(chatID, identity, listen)
if err != nil {
return nil, err
}
@ -347,28 +346,28 @@ func (s *FiltersManager) LoadPersonal(publicKey *ecdsa.PublicKey, identity *ecds
OneToOne: true,
}
s.filters[chatID] = chat
f.filters[chatID] = chat
return chat, nil
}
func (s *FiltersManager) loadMyPartitioned() (*Filter, error) {
return s.loadPartitioned(&s.privateKey.PublicKey, s.privateKey, true, false)
func (f *FiltersManager) loadMyPartitioned() (*Filter, error) {
return f.loadPartitioned(&f.privateKey.PublicKey, f.privateKey, true, false)
}
func (s *FiltersManager) loadPartitioned(publicKey *ecdsa.PublicKey, identity *ecdsa.PrivateKey, listen, ephemeral bool) (*Filter, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
func (f *FiltersManager) loadPartitioned(publicKey *ecdsa.PublicKey, identity *ecdsa.PrivateKey, listen, ephemeral bool) (*Filter, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
chatID := PartitionedTopic(publicKey)
if _, ok := s.filters[chatID]; ok {
return s.filters[chatID], nil
if _, ok := f.filters[chatID]; ok {
return f.filters[chatID], nil
}
// We set up a filter so we can publish,
// but we discard envelopes if listen is false.
filter, err := s.addAsymmetric(chatID, identity, listen)
filter, err := f.addAsymmetric(chatID, identity, listen)
if err != nil {
return nil, err
}
@ -383,24 +382,24 @@ func (s *FiltersManager) loadPartitioned(publicKey *ecdsa.PublicKey, identity *e
OneToOne: true,
}
s.filters[chatID] = chat
f.filters[chatID] = chat
return chat, nil
}
// LoadNegotiated loads a negotiated secret as a filter.
func (s *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
func (f *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
chatID := NegotiatedTopic(secret.PublicKey)
if _, ok := s.filters[chatID]; ok {
return s.filters[chatID], nil
if _, ok := f.filters[chatID]; ok {
return f.filters[chatID], nil
}
keyString := hex.EncodeToString(secret.Key)
filter, err := s.addSymmetric(keyString)
filter, err := f.addSymmetric(keyString)
if err != nil {
return nil, err
}
@ -416,25 +415,25 @@ func (s *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter,
OneToOne: true,
}
s.filters[chat.ChatID] = chat
f.filters[chat.ChatID] = chat
return chat, nil
}
// LoadDiscovery adds 1 discovery filter
// for the personal discovery topic.
func (s *FiltersManager) LoadDiscovery() ([]*Filter, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
func (f *FiltersManager) LoadDiscovery() ([]*Filter, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
personalDiscoveryTopic := PersonalDiscoveryTopic(&s.privateKey.PublicKey)
personalDiscoveryTopic := PersonalDiscoveryTopic(&f.privateKey.PublicKey)
// Check if filters are already loaded.
var result []*Filter
expectedTopicCount := 1
if chat, ok := s.filters[personalDiscoveryTopic]; ok {
if chat, ok := f.filters[personalDiscoveryTopic]; ok {
result = append(result, chat)
}
@ -442,7 +441,7 @@ func (s *FiltersManager) LoadDiscovery() ([]*Filter, error) {
return result, nil
}
identityStr := PublicKeyToStr(&s.privateKey.PublicKey)
identityStr := PublicKeyToStr(&f.privateKey.PublicKey)
// Load personal discovery
personalDiscoveryChat := &Filter{
@ -453,7 +452,7 @@ func (s *FiltersManager) LoadDiscovery() ([]*Filter, error) {
OneToOne: true,
}
discoveryResponse, err := s.addAsymmetric(personalDiscoveryChat.ChatID, s.privateKey, true)
discoveryResponse, err := f.addAsymmetric(personalDiscoveryChat.ChatID, f.privateKey, true)
if err != nil {
return nil, err
}
@ -461,21 +460,21 @@ func (s *FiltersManager) LoadDiscovery() ([]*Filter, error) {
personalDiscoveryChat.Topic = discoveryResponse.Topic
personalDiscoveryChat.FilterID = discoveryResponse.FilterID
s.filters[personalDiscoveryChat.ChatID] = personalDiscoveryChat
f.filters[personalDiscoveryChat.ChatID] = personalDiscoveryChat
return []*Filter{personalDiscoveryChat}, nil
}
// LoadPublic adds a filter for a public chat.
func (s *FiltersManager) LoadPublic(chatID string) (*Filter, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
func (f *FiltersManager) LoadPublic(chatID string) (*Filter, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
if chat, ok := s.filters[chatID]; ok {
if chat, ok := f.filters[chatID]; ok {
return chat, nil
}
filterAndTopic, err := s.addSymmetric(chatID)
filterAndTopic, err := f.addSymmetric(chatID)
if err != nil {
return nil, err
}
@ -489,23 +488,23 @@ func (s *FiltersManager) LoadPublic(chatID string) (*Filter, error) {
OneToOne: false,
}
s.filters[chatID] = chat
f.filters[chatID] = chat
return chat, nil
}
// LoadContactCode creates a filter for the advertise topic for a given public key.
func (s *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
chatID := ContactCodeTopic(pubKey)
if _, ok := s.filters[chatID]; ok {
return s.filters[chatID], nil
if _, ok := f.filters[chatID]; ok {
return f.filters[chatID], nil
}
contactCodeFilter, err := s.addSymmetric(chatID)
contactCodeFilter, err := f.addSymmetric(chatID)
if err != nil {
return nil, err
}
@ -519,41 +518,41 @@ func (s *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
Listen: true,
}
s.filters[chatID] = chat
f.filters[chatID] = chat
return chat, nil
}
// addSymmetric adds a symmetric key filter
func (s *FiltersManager) addSymmetric(chatID string) (*RawFilter, error) {
func (f *FiltersManager) addSymmetric(chatID string) (*RawFilter, error) {
var symKeyID string
var err error
topic := ToTopic(chatID)
topics := [][]byte{topic}
symKey, ok := s.keys[chatID]
symKey, ok := f.keys[chatID]
if ok {
symKeyID, err = s.service.AddSymKeyDirect(symKey)
symKeyID, err = f.service.AddSymKeyDirect(symKey)
if err != nil {
return nil, err
}
} else {
symKeyID, err = s.service.AddSymKeyFromPassword(chatID)
symKeyID, err = f.service.AddSymKeyFromPassword(chatID)
if err != nil {
return nil, err
}
if symKey, err = s.service.GetSymKey(symKeyID); err != nil {
if symKey, err = f.service.GetSymKey(symKeyID); err != nil {
return nil, err
}
s.keys[chatID] = symKey
f.keys[chatID] = symKey
err = s.persistence.Add(chatID, symKey)
err = f.persistence.Add(chatID, symKey)
if err != nil {
return nil, err
}
}
id, err := s.service.Subscribe(&types.SubscriptionOptions{
id, err := f.service.Subscribe(&types.SubscriptionOptions{
SymKeyID: symKeyID,
PoW: minPow,
Topics: topics,
@ -571,7 +570,7 @@ func (s *FiltersManager) addSymmetric(chatID string) (*RawFilter, error) {
// addAsymmetricFilter adds a filter with our private key
// and set minPow according to the listen parameter.
func (s *FiltersManager) addAsymmetric(chatID string, identity *ecdsa.PrivateKey, listen bool) (*RawFilter, error) {
func (f *FiltersManager) addAsymmetric(chatID string, identity *ecdsa.PrivateKey, listen bool) (*RawFilter, error) {
var (
err error
pow = 1.0 // use PoW high enough to discard all messages for the filter
@ -584,12 +583,12 @@ func (s *FiltersManager) addAsymmetric(chatID string, identity *ecdsa.PrivateKey
topic := ToTopic(chatID)
topics := [][]byte{topic}
privateKeyID, err := s.service.AddKeyPair(identity)
privateKeyID, err := f.service.AddKeyPair(identity)
if err != nil {
return nil, err
}
id, err := s.service.Subscribe(&types.SubscriptionOptions{
id, err := f.service.Subscribe(&types.SubscriptionOptions{
PrivateKeyID: privateKeyID,
PoW: pow,
Topics: topics,
@ -601,9 +600,9 @@ func (s *FiltersManager) addAsymmetric(chatID string, identity *ecdsa.PrivateKey
}
// GetNegotiated returns a negotiated chat given an identity
func (s *FiltersManager) GetNegotiated(identity *ecdsa.PublicKey) *Filter {
s.mutex.Lock()
defer s.mutex.Unlock()
func (f *FiltersManager) GetNegotiated(identity *ecdsa.PublicKey) *Filter {
f.mutex.Lock()
defer f.mutex.Unlock()
return s.filters[NegotiatedTopic(identity)]
return f.filters[NegotiatedTopic(identity)]
}

View File

@ -1,4 +1,4 @@
package waku
package transport
import (
"database/sql"

View File

@ -1,66 +1,552 @@
package transport
import (
"bytes"
"context"
"crypto/ecdsa"
"database/sql"
"encoding/hex"
"math/big"
"sync"
"time"
"github.com/google/uuid"
"github.com/pkg/errors"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
)
type Transport interface {
Stop() error
var (
// ErrNoMailservers returned if there is no configured mailservers that can be used.
ErrNoMailservers = errors.New("no configured mailservers")
)
JoinPrivate(publicKey *ecdsa.PublicKey) (*Filter, error)
LeavePrivate(publicKey *ecdsa.PublicKey) error
JoinGroup(publicKeys []*ecdsa.PublicKey) ([]*Filter, error)
LeaveGroup(publicKeys []*ecdsa.PublicKey) error
JoinPublic(chatID string) (*Filter, error)
LeavePublic(chatID string) error
GetCurrentTime() uint64
MaxMessageSize() uint32
type transportKeysManager struct {
waku types.Waku
SendPublic(ctx context.Context, newMessage *types.NewMessage, chatName string) ([]byte, error)
SendPrivateWithSharedSecret(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey, secret []byte) ([]byte, error)
SendPrivateWithPartitioned(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error)
SendPrivateOnPersonalTopic(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error)
SendCommunityMessage(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error)
SendMessagesRequest(
// Identity of the current user.
privateKey *ecdsa.PrivateKey
passToSymKeyMutex sync.RWMutex
passToSymKeyCache map[string]string
}
func (m *transportKeysManager) AddOrGetKeyPair(priv *ecdsa.PrivateKey) (string, error) {
// caching is handled in waku
return m.waku.AddKeyPair(priv)
}
func (m *transportKeysManager) AddOrGetSymKeyFromPassword(password string) (string, error) {
m.passToSymKeyMutex.Lock()
defer m.passToSymKeyMutex.Unlock()
if val, ok := m.passToSymKeyCache[password]; ok {
return val, nil
}
id, err := m.waku.AddSymKeyFromPassword(password)
if err != nil {
return id, err
}
m.passToSymKeyCache[password] = id
return id, nil
}
func (m *transportKeysManager) RawSymKey(id string) ([]byte, error) {
return m.waku.GetSymKey(id)
}
type Option func(*Transport) error
// Transport is a transport based on Whisper service.
type Transport struct {
waku types.Waku
api types.PublicWakuAPI // only PublicWakuAPI implements logic to send messages
keysManager *transportKeysManager
filters *FiltersManager
logger *zap.Logger
cache *ProcessedMessageIDsCache
mailservers []string
envelopesMonitor *EnvelopesMonitor
quit chan struct{}
}
// NewTransport returns a new Transport.
// TODO: leaving a chat should verify that for a given public key
// there are no other chats. It may happen that we leave a private chat
// but still have a public chat for a given public key.
func NewTransport(
waku types.Waku,
privateKey *ecdsa.PrivateKey,
db *sql.DB,
mailservers []string,
envelopesMonitorConfig *EnvelopesMonitorConfig,
logger *zap.Logger,
opts ...Option,
) (*Transport, error) {
filtersManager, err := NewFiltersManager(newSQLitePersistence(db), waku, privateKey, logger)
if err != nil {
return nil, err
}
var envelopesMonitor *EnvelopesMonitor
if envelopesMonitorConfig != nil {
envelopesMonitor = NewEnvelopesMonitor(waku, *envelopesMonitorConfig)
envelopesMonitor.Start()
}
var api types.PublicWhisperAPI
if waku != nil {
api = waku.PublicWakuAPI()
}
t := &Transport{
waku: waku,
api: api,
cache: NewProcessedMessageIDsCache(db),
envelopesMonitor: envelopesMonitor,
quit: make(chan struct{}),
keysManager: &transportKeysManager{
waku: waku,
privateKey: privateKey,
passToSymKeyCache: make(map[string]string),
},
filters: filtersManager,
mailservers: mailservers,
logger: logger.With(zap.Namespace("Transport")),
}
for _, opt := range opts {
if err := opt(t); err != nil {
return nil, err
}
}
t.cleanFiltersLoop()
return t, nil
}
func (t *Transport) InitFilters(chatIDs []string, publicKeys []*ecdsa.PublicKey) ([]*Filter, error) {
return t.filters.Init(chatIDs, publicKeys)
}
func (t *Transport) InitPublicFilters(chatIDs []string) ([]*Filter, error) {
return t.filters.InitPublicFilters(chatIDs)
}
func (t *Transport) Filters() []*Filter {
return t.filters.Filters()
}
func (t *Transport) FilterByChatID(chatID string) *Filter {
return t.filters.FilterByChatID(chatID)
}
func (t *Transport) LoadFilters(filters []*Filter) ([]*Filter, error) {
return t.filters.InitWithFilters(filters)
}
func (t *Transport) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*Filter, error) {
return t.filters.InitCommunityFilters(pks)
}
func (t *Transport) RemoveFilters(filters []*Filter) error {
return t.filters.Remove(filters...)
}
func (t *Transport) RemoveFilterByChatID(chatID string) (*Filter, error) {
return t.filters.RemoveFilterByChatID(chatID)
}
func (t *Transport) ResetFilters() error {
return t.filters.Reset()
}
func (t *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Filter, error) {
filter, err := t.filters.LoadNegotiated(secret)
if err != nil {
return nil, err
}
return filter, nil
}
func (t *Transport) JoinPublic(chatID string) (*Filter, error) {
return t.filters.LoadPublic(chatID)
}
func (t *Transport) LeavePublic(chatID string) error {
chat := t.filters.Filter(chatID)
if chat != nil {
return nil
}
return t.filters.Remove(chat)
}
func (t *Transport) JoinPrivate(publicKey *ecdsa.PublicKey) (*Filter, error) {
return t.filters.LoadContactCode(publicKey)
}
func (t *Transport) LeavePrivate(publicKey *ecdsa.PublicKey) error {
filters := t.filters.FiltersByPublicKey(publicKey)
return t.filters.Remove(filters...)
}
func (t *Transport) JoinGroup(publicKeys []*ecdsa.PublicKey) ([]*Filter, error) {
var filters []*Filter
for _, pk := range publicKeys {
f, err := t.filters.LoadContactCode(pk)
if err != nil {
return nil, err
}
filters = append(filters, f)
}
return filters, nil
}
func (t *Transport) LeaveGroup(publicKeys []*ecdsa.PublicKey) error {
for _, publicKey := range publicKeys {
filters := t.filters.FiltersByPublicKey(publicKey)
if err := t.filters.Remove(filters...); err != nil {
return err
}
}
return nil
}
func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) {
result := make(map[Filter][]*types.Message)
allFilters := t.filters.Filters()
for _, filter := range allFilters {
// Don't pull from filters we don't listen to
if !filter.Listen {
continue
}
msgs, err := t.api.GetFilterMessages(filter.FilterID)
if err != nil {
t.logger.Warn("failed to fetch messages", zap.Error(err))
continue
}
if len(msgs) == 0 {
continue
}
ids := make([]string, len(msgs))
for i := range msgs {
id := types.EncodeHex(msgs[i].Hash)
ids[i] = id
}
hits, err := t.cache.Hits(ids)
if err != nil {
t.logger.Error("failed to check messages exists", zap.Error(err))
return nil, err
}
for i := range msgs {
// Exclude anything that is a cache hit
if !hits[types.EncodeHex(msgs[i].Hash)] {
result[*filter] = append(result[*filter], msgs[i])
}
}
}
return result, nil
}
// SendPublic sends a new message using the Whisper service.
// For public filters, chat name is used as an ID as well as
// a topic.
func (t *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage, chatName string) ([]byte, error) {
if err := t.addSig(newMessage); err != nil {
return nil, err
}
filter, err := t.filters.LoadPublic(chatName)
if err != nil {
return nil, err
}
newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.Topic
return t.api.Post(ctx, *newMessage)
}
func (t *Transport) SendPrivateWithSharedSecret(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey, secret []byte) ([]byte, error) {
if err := t.addSig(newMessage); err != nil {
return nil, err
}
filter, err := t.filters.LoadNegotiated(types.NegotiatedSecret{
PublicKey: publicKey,
Key: secret,
})
if err != nil {
return nil, err
}
newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.Topic
newMessage.PublicKey = nil
return t.api.Post(ctx, *newMessage)
}
func (t *Transport) SendPrivateWithPartitioned(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) {
if err := t.addSig(newMessage); err != nil {
return nil, err
}
filter, err := t.filters.LoadPartitioned(publicKey, t.keysManager.privateKey, false)
if err != nil {
return nil, err
}
newMessage.Topic = filter.Topic
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
return t.api.Post(ctx, *newMessage)
}
func (t *Transport) SendPrivateOnPersonalTopic(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) {
if err := t.addSig(newMessage); err != nil {
return nil, err
}
filter, err := t.filters.LoadPersonal(publicKey, t.keysManager.privateKey, false)
if err != nil {
return nil, err
}
newMessage.Topic = filter.Topic
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
return t.api.Post(ctx, *newMessage)
}
func (t *Transport) LoadKeyFilters(key *ecdsa.PrivateKey) (*Filter, error) {
return t.filters.LoadEphemeral(&key.PublicKey, key, true)
}
func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) {
if err := t.addSig(newMessage); err != nil {
return nil, err
}
// We load the filter to make sure we can post on it
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:])
if err != nil {
return nil, err
}
newMessage.Topic = filter.Topic
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
t.logger.Debug("SENDING message", zap.Binary("topic", filter.Topic[:]))
return t.api.Post(ctx, *newMessage)
}
func (t *Transport) cleanFilters() error {
return t.filters.RemoveNoListenFilters()
}
func (t *Transport) addSig(newMessage *types.NewMessage) error {
sigID, err := t.keysManager.AddOrGetKeyPair(t.keysManager.privateKey)
if err != nil {
return err
}
newMessage.SigID = sigID
return nil
}
func (t *Transport) Track(identifiers [][]byte, hash []byte, newMessage *types.NewMessage) {
if t.envelopesMonitor != nil {
t.envelopesMonitor.Add(identifiers, types.BytesToHash(hash), *newMessage)
}
}
// GetCurrentTime returns the current unix timestamp in milliseconds
func (t *Transport) GetCurrentTime() uint64 {
return uint64(t.waku.GetCurrentTime().UnixNano() / int64(time.Millisecond))
}
func (t *Transport) MaxMessageSize() uint32 {
return t.waku.MaxMessageSize()
}
func (t *Transport) Stop() error {
close(t.quit)
if t.envelopesMonitor != nil {
t.envelopesMonitor.Stop()
}
return nil
}
// cleanFiltersLoop cleans up the topic we create for the only purpose
// of sending messages.
// Whenever we send a message we also need to listen to that particular topic
// but in case of asymettric topics, we are not interested in listening to them.
// We therefore periodically clean them up so we don't receive unnecessary data.
func (t *Transport) cleanFiltersLoop() {
ticker := time.NewTicker(5 * time.Minute)
go func() {
for {
select {
case <-t.quit:
ticker.Stop()
return
case <-ticker.C:
err := t.cleanFilters()
if err != nil {
t.logger.Error("failed to clean up topics", zap.Error(err))
}
}
}
}()
}
func (t *Transport) sendMessagesRequestForTopics(
ctx context.Context,
peerID []byte,
from, to uint32,
previousCursor []byte,
topics []types.TopicType,
waitForResponse bool,
) (cursor []byte, err error) {
r := createMessagesRequest(from, to, previousCursor, topics)
r.SetDefaults(t.waku.GetCurrentTime())
events := make(chan types.EnvelopeEvent, 10)
sub := t.waku.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
err = t.waku.SendMessagesRequest(peerID, r)
if err != nil {
return
}
if !waitForResponse {
return
}
resp, err := t.waitForRequestCompleted(ctx, r.ID, events)
if err == nil && resp != nil && resp.Error != nil {
err = resp.Error
} else if err == nil && resp != nil {
cursor = resp.Cursor
}
return
}
// RequestHistoricMessages requests historic messages for all registered filters.
func (t *Transport) SendMessagesRequest(
ctx context.Context,
peerID []byte,
from, to uint32,
previousCursor []byte,
waitForResponse bool,
) (cursor []byte, err error)
) (cursor []byte, err error) {
SendMessagesRequestForFilter(
topics := make([]types.TopicType, len(t.Filters()))
for _, f := range t.Filters() {
topics = append(topics, f.Topic)
}
return t.sendMessagesRequestForTopics(ctx, peerID, from, to, previousCursor, topics, waitForResponse)
}
func (t *Transport) SendMessagesRequestForFilter(
ctx context.Context,
peerID []byte,
from, to uint32,
previousCursor []byte,
filter *Filter,
waitForResponse bool,
) (cursor []byte, err error)
FilterByChatID(string) *Filter
) (cursor []byte, err error) {
Track(identifiers [][]byte, hash []byte, newMessage *types.NewMessage)
topics := make([]types.TopicType, len(t.Filters()))
topics = append(topics, filter.Topic)
InitFilters(chatIDs []string, publicKeys []*ecdsa.PublicKey) ([]*Filter, error)
InitPublicFilters(chatIDs []string) ([]*Filter, error)
InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*Filter, error)
LoadFilters(filters []*Filter) ([]*Filter, error)
RemoveFilters(filters []*Filter) error
RemoveFilterByChatID(string) (*Filter, error)
ResetFilters() error
Filters() []*Filter
LoadKeyFilters(*ecdsa.PrivateKey) (*Filter, error)
ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Filter, error)
RetrieveRawAll() (map[Filter][]*types.Message, error)
return t.sendMessagesRequestForTopics(ctx, peerID, from, to, previousCursor, topics, waitForResponse)
}
ConfirmMessagesProcessed([]string, uint64) error
CleanMessagesProcessed(uint64) error
func createMessagesRequest(from, to uint32, cursor []byte, topics []types.TopicType) types.MessagesRequest {
aUUID := uuid.New()
// uuid is 16 bytes, converted to hex it's 32 bytes as expected by types.MessagesRequest
id := []byte(hex.EncodeToString(aUUID[:]))
return types.MessagesRequest{
ID: id,
From: from,
To: to,
Limit: 100,
Cursor: cursor,
Bloom: topicsToBloom(topics...),
}
}
SetEnvelopeEventsHandler(handler EnvelopeEventsHandler) error
func topicsToBloom(topics ...types.TopicType) []byte {
i := new(big.Int)
for _, topic := range topics {
bloom := types.TopicToBloom(topic)
i.Or(i, new(big.Int).SetBytes(bloom[:]))
}
combined := make([]byte, types.BloomFilterSize)
data := i.Bytes()
copy(combined[types.BloomFilterSize-len(data):], data[:])
return combined
}
func (t *Transport) waitForRequestCompleted(ctx context.Context, requestID []byte, events chan types.EnvelopeEvent) (*types.MailServerResponse, error) {
for {
select {
case ev := <-events:
if !bytes.Equal(ev.Hash.Bytes(), requestID) {
continue
}
if ev.Event != types.EventMailServerRequestCompleted {
continue
}
data, ok := ev.Data.(*types.MailServerResponse)
if ok {
return data, nil
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
// ConfirmMessagesProcessed marks the messages as processed in the cache so
// they won't be passed to the next layer anymore
func (t *Transport) ConfirmMessagesProcessed(ids []string, timestamp uint64) error {
return t.cache.Add(ids, timestamp)
}
// CleanMessagesProcessed clears the messages that are older than timestamp
func (t *Transport) CleanMessagesProcessed(timestamp uint64) error {
return t.cache.Clean(timestamp)
}
func (t *Transport) SetEnvelopeEventsHandler(handler EnvelopeEventsHandler) error {
if t.envelopesMonitor == nil {
return errors.New("Current transport has no envelopes monitor")
}
t.envelopesMonitor.handler = handler
return nil
}
func PubkeyToHex(key *ecdsa.PublicKey) string {

View File

@ -1,4 +1,4 @@
package waku
package transport
import (
"io/ioutil"
@ -12,7 +12,7 @@ import (
"github.com/status-im/status-go/protocol/tt"
)
func TestNewWakuServiceTransport(t *testing.T) {
func TestNewTransport(t *testing.T) {
dbPath, err := ioutil.TempFile("", "transport.sql")
require.NoError(t, err)
defer os.Remove(dbPath.Name())

View File

@ -1,310 +0,0 @@
package waku
import (
"context"
"errors"
"sync"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport"
)
// EnvelopeState in local tracker
type EnvelopeState int
const (
// NotRegistered returned if asked hash wasn't registered in the tracker.
NotRegistered EnvelopeState = -1
// EnvelopePosted is set when envelope was added to a local waku queue.
EnvelopePosted EnvelopeState = iota
// EnvelopeSent is set when envelope is sent to at least one peer.
EnvelopeSent
)
// EnvelopeEventsHandler used for two different event types.
type EnvelopeEventsHandler interface {
EnvelopeSent([][]byte)
EnvelopeExpired([][]byte, error)
MailServerRequestCompleted(types.Hash, types.Hash, []byte, error)
MailServerRequestExpired(types.Hash)
}
// NewEnvelopesMonitor returns a pointer to an instance of the EnvelopesMonitor.
func NewEnvelopesMonitor(w types.Waku, config transport.EnvelopesMonitorConfig) *EnvelopesMonitor {
logger := config.Logger
if logger == nil {
logger = zap.NewNop()
}
var api types.PublicWakuAPI
if w != nil {
api = w.PublicWakuAPI()
}
return &EnvelopesMonitor{
w: w,
api: api,
handler: config.EnvelopeEventsHandler,
mailServerConfirmation: config.MailserverConfirmationsEnabled,
maxAttempts: config.MaxAttempts,
isMailserver: config.IsMailserver,
logger: logger.With(zap.Namespace("EnvelopesMonitor")),
// key is envelope hash (event.Hash)
envelopes: map[types.Hash]EnvelopeState{},
messages: map[types.Hash]*types.NewMessage{},
attempts: map[types.Hash]int{},
identifiers: make(map[types.Hash][][]byte),
// key is hash of the batch (event.Batch)
batches: map[types.Hash]map[types.Hash]struct{}{},
}
}
// EnvelopesMonitor is responsible for monitoring waku envelopes state.
type EnvelopesMonitor struct {
w types.Waku
api types.PublicWakuAPI
handler EnvelopeEventsHandler
mailServerConfirmation bool
maxAttempts int
mu sync.Mutex
envelopes map[types.Hash]EnvelopeState
batches map[types.Hash]map[types.Hash]struct{}
messages map[types.Hash]*types.NewMessage
attempts map[types.Hash]int
identifiers map[types.Hash][][]byte
wg sync.WaitGroup
quit chan struct{}
isMailserver func(peer types.EnodeID) bool
logger *zap.Logger
}
// Start processing events.
func (m *EnvelopesMonitor) Start() {
m.quit = make(chan struct{})
m.wg.Add(1)
go func() {
m.handleEnvelopeEvents()
m.wg.Done()
}()
}
// Stop process events.
func (m *EnvelopesMonitor) Stop() {
close(m.quit)
m.wg.Wait()
}
// Add hash to a tracker.
func (m *EnvelopesMonitor) Add(identifiers [][]byte, envelopeHash types.Hash, message types.NewMessage) {
m.mu.Lock()
defer m.mu.Unlock()
m.envelopes[envelopeHash] = EnvelopePosted
m.identifiers[envelopeHash] = identifiers
m.messages[envelopeHash] = &message
m.attempts[envelopeHash] = 1
}
func (m *EnvelopesMonitor) GetState(hash types.Hash) EnvelopeState {
m.mu.Lock()
defer m.mu.Unlock()
state, exist := m.envelopes[hash]
if !exist {
return NotRegistered
}
return state
}
// handleEnvelopeEvents processes waku envelope events
func (m *EnvelopesMonitor) handleEnvelopeEvents() {
events := make(chan types.EnvelopeEvent, 100) // must be buffered to prevent blocking waku
sub := m.w.SubscribeEnvelopeEvents(events)
defer func() {
close(events)
sub.Unsubscribe()
}()
for {
select {
case <-m.quit:
return
case event := <-events:
m.handleEvent(event)
}
}
}
// handleEvent based on type of the event either triggers
// confirmation handler or removes hash from tracker
func (m *EnvelopesMonitor) handleEvent(event types.EnvelopeEvent) {
handlers := map[types.EventType]func(types.EnvelopeEvent){
types.EventEnvelopeSent: m.handleEventEnvelopeSent,
types.EventEnvelopeExpired: m.handleEventEnvelopeExpired,
types.EventBatchAcknowledged: m.handleAcknowledgedBatch,
types.EventEnvelopeReceived: m.handleEventEnvelopeReceived,
}
if handler, ok := handlers[event.Event]; ok {
handler(event)
}
}
func (m *EnvelopesMonitor) handleEventEnvelopeSent(event types.EnvelopeEvent) {
if m.mailServerConfirmation {
if !m.isMailserver(event.Peer) {
return
}
}
m.mu.Lock()
defer m.mu.Unlock()
state, ok := m.envelopes[event.Hash]
// if we didn't send a message using extension - skip it
// if message was already confirmed - skip it
if !ok || state == EnvelopeSent {
return
}
m.logger.Debug("envelope is sent", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String()))
if event.Batch != (types.Hash{}) {
if _, ok := m.batches[event.Batch]; !ok {
m.batches[event.Batch] = map[types.Hash]struct{}{}
}
m.batches[event.Batch][event.Hash] = struct{}{}
m.logger.Debug("waiting for a confirmation", zap.String("batch", event.Batch.String()))
} else {
m.envelopes[event.Hash] = EnvelopeSent
if m.handler != nil {
m.handler.EnvelopeSent(m.identifiers[event.Hash])
}
}
}
func (m *EnvelopesMonitor) handleAcknowledgedBatch(event types.EnvelopeEvent) {
if m.mailServerConfirmation {
if !m.isMailserver(event.Peer) {
return
}
}
m.mu.Lock()
defer m.mu.Unlock()
envelopes, ok := m.batches[event.Batch]
if !ok {
m.logger.Debug("batch is not found", zap.String("batch", event.Batch.String()))
}
m.logger.Debug("received a confirmation", zap.String("batch", event.Batch.String()), zap.String("peer", event.Peer.String()))
envelopeErrors, ok := event.Data.([]types.EnvelopeError)
if event.Data != nil && !ok {
m.logger.Error("received unexpected data in the the confirmation event", zap.Any("data", event.Data))
}
failedEnvelopes := map[types.Hash]struct{}{}
for i := range envelopeErrors {
envelopeError := envelopeErrors[i]
_, exist := m.envelopes[envelopeError.Hash]
if exist {
m.logger.Warn("envelope that was posted by us is discarded", zap.String("hash", envelopeError.Hash.String()), zap.String("peer", event.Peer.String()), zap.String("error", envelopeError.Description))
var err error
switch envelopeError.Code {
case types.EnvelopeTimeNotSynced:
err = errors.New("envelope wasn't delivered due to time sync issues")
}
m.handleEnvelopeFailure(envelopeError.Hash, err)
}
failedEnvelopes[envelopeError.Hash] = struct{}{}
}
for hash := range envelopes {
if _, exist := failedEnvelopes[hash]; exist {
continue
}
state, ok := m.envelopes[hash]
if !ok || state == EnvelopeSent {
continue
}
m.envelopes[hash] = EnvelopeSent
if m.handler != nil {
m.handler.EnvelopeSent(m.identifiers[hash])
}
}
delete(m.batches, event.Batch)
}
func (m *EnvelopesMonitor) handleEventEnvelopeExpired(event types.EnvelopeEvent) {
m.mu.Lock()
defer m.mu.Unlock()
m.handleEnvelopeFailure(event.Hash, errors.New("envelope expired due to connectivity issues"))
}
// handleEnvelopeFailure is a common code path for processing envelopes failures. not thread safe, lock
// must be used on a higher level.
func (m *EnvelopesMonitor) handleEnvelopeFailure(hash types.Hash, err error) {
if state, ok := m.envelopes[hash]; ok {
message, exist := m.messages[hash]
if !exist {
m.logger.Error("message was deleted erroneously", zap.String("envelope hash", hash.String()))
}
attempt := m.attempts[hash]
identifiers := m.identifiers[hash]
m.clearMessageState(hash)
if state == EnvelopeSent {
return
}
if attempt < m.maxAttempts {
m.logger.Debug("retrying to send a message", zap.String("hash", hash.String()), zap.Int("attempt", attempt+1))
hex, err := m.api.Post(context.TODO(), *message)
if err != nil {
m.logger.Error("failed to retry sending message", zap.String("hash", hash.String()), zap.Int("attempt", attempt+1), zap.Error(err))
if m.handler != nil {
m.handler.EnvelopeExpired(identifiers, err)
}
}
envelopeID := types.BytesToHash(hex)
m.envelopes[envelopeID] = EnvelopePosted
m.messages[envelopeID] = message
m.attempts[envelopeID] = attempt + 1
m.identifiers[envelopeID] = identifiers
} else {
m.logger.Debug("envelope expired", zap.String("hash", hash.String()))
if m.handler != nil {
m.handler.EnvelopeExpired(identifiers, err)
}
}
}
}
func (m *EnvelopesMonitor) handleEventEnvelopeReceived(event types.EnvelopeEvent) {
if m.mailServerConfirmation {
if !m.isMailserver(event.Peer) {
return
}
}
m.mu.Lock()
defer m.mu.Unlock()
state, ok := m.envelopes[event.Hash]
if !ok || state != EnvelopePosted {
return
}
m.logger.Debug("expected envelope received", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String()))
m.envelopes[event.Hash] = EnvelopeSent
if m.handler != nil {
m.handler.EnvelopeSent(m.identifiers[event.Hash])
}
}
// clearMessageState removes all message and envelope state.
// not thread-safe, should be protected on a higher level.
func (m *EnvelopesMonitor) clearMessageState(envelopeID types.Hash) {
delete(m.envelopes, envelopeID)
delete(m.messages, envelopeID)
delete(m.attempts, envelopeID)
delete(m.identifiers, envelopeID)
}

View File

@ -1,38 +0,0 @@
package waku
import (
"encoding/hex"
"math/big"
"github.com/google/uuid"
"github.com/status-im/status-go/eth-node/types"
)
func createMessagesRequest(from, to uint32, cursor []byte, topics []types.TopicType) types.MessagesRequest {
aUUID := uuid.New()
// uuid is 16 bytes, converted to hex it's 32 bytes as expected by types.MessagesRequest
id := []byte(hex.EncodeToString(aUUID[:]))
return types.MessagesRequest{
ID: id,
From: from,
To: to,
Limit: 100,
Cursor: cursor,
Bloom: topicsToBloom(topics...),
}
}
func topicsToBloom(topics ...types.TopicType) []byte {
i := new(big.Int)
for _, topic := range topics {
bloom := types.TopicToBloom(topic)
i.Or(i, new(big.Int).SetBytes(bloom[:]))
}
combined := make([]byte, types.BloomFilterSize)
data := i.Bytes()
copy(combined[types.BloomFilterSize-len(data):], data[:])
return combined
}

View File

@ -1,13 +0,0 @@
package waku
import (
"github.com/status-im/status-go/eth-node/types"
)
type RequestOptions struct {
Topics []types.TopicType
Password string
Limit int
From int64 // in seconds
To int64 // in seconds
}

View File

@ -1,520 +0,0 @@
package waku
import (
"bytes"
"context"
"crypto/ecdsa"
"database/sql"
"sync"
"time"
"github.com/pkg/errors"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport"
)
var (
// ErrNoMailservers returned if there is no configured mailservers that can be used.
ErrNoMailservers = errors.New("no configured mailservers")
)
type wakuServiceKeysManager struct {
waku types.Waku
// Identity of the current user.
privateKey *ecdsa.PrivateKey
passToSymKeyMutex sync.RWMutex
passToSymKeyCache map[string]string
}
func (m *wakuServiceKeysManager) AddOrGetKeyPair(priv *ecdsa.PrivateKey) (string, error) {
// caching is handled in waku
return m.waku.AddKeyPair(priv)
}
func (m *wakuServiceKeysManager) AddOrGetSymKeyFromPassword(password string) (string, error) {
m.passToSymKeyMutex.Lock()
defer m.passToSymKeyMutex.Unlock()
if val, ok := m.passToSymKeyCache[password]; ok {
return val, nil
}
id, err := m.waku.AddSymKeyFromPassword(password)
if err != nil {
return id, err
}
m.passToSymKeyCache[password] = id
return id, nil
}
func (m *wakuServiceKeysManager) RawSymKey(id string) ([]byte, error) {
return m.waku.GetSymKey(id)
}
type Option func(*Transport) error
// Transport is a transport based on Whisper service.
type Transport struct {
waku types.Waku
api types.PublicWakuAPI // only PublicWakuAPI implements logic to send messages
keysManager *wakuServiceKeysManager
filters *transport.FiltersManager
logger *zap.Logger
cache *transport.ProcessedMessageIDsCache
mailservers []string
envelopesMonitor *EnvelopesMonitor
quit chan struct{}
}
// NewTransport returns a new Transport.
// TODO: leaving a chat should verify that for a given public key
// there are no other chats. It may happen that we leave a private chat
// but still have a public chat for a given public key.
func NewTransport(
waku types.Waku,
privateKey *ecdsa.PrivateKey,
db *sql.DB,
mailservers []string,
envelopesMonitorConfig *transport.EnvelopesMonitorConfig,
logger *zap.Logger,
opts ...Option,
) (*Transport, error) {
filtersManager, err := transport.NewFiltersManager(newSQLitePersistence(db), waku, privateKey, logger)
if err != nil {
return nil, err
}
var envelopesMonitor *EnvelopesMonitor
if envelopesMonitorConfig != nil {
envelopesMonitor = NewEnvelopesMonitor(waku, *envelopesMonitorConfig)
envelopesMonitor.Start()
}
var api types.PublicWhisperAPI
if waku != nil {
api = waku.PublicWakuAPI()
}
t := &Transport{
waku: waku,
api: api,
cache: transport.NewProcessedMessageIDsCache(db),
envelopesMonitor: envelopesMonitor,
quit: make(chan struct{}),
keysManager: &wakuServiceKeysManager{
waku: waku,
privateKey: privateKey,
passToSymKeyCache: make(map[string]string),
},
filters: filtersManager,
mailservers: mailservers,
logger: logger.With(zap.Namespace("Transport")),
}
for _, opt := range opts {
if err := opt(t); err != nil {
return nil, err
}
}
t.cleanFiltersLoop()
return t, nil
}
func (a *Transport) InitFilters(chatIDs []string, publicKeys []*ecdsa.PublicKey) ([]*transport.Filter, error) {
return a.filters.Init(chatIDs, publicKeys)
}
func (a *Transport) InitPublicFilters(chatIDs []string) ([]*transport.Filter, error) {
return a.filters.InitPublicFilters(chatIDs)
}
func (a *Transport) Filters() []*transport.Filter {
return a.filters.Filters()
}
func (a *Transport) FilterByChatID(chatID string) *transport.Filter {
return a.filters.FilterByChatID(chatID)
}
func (a *Transport) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) {
return a.filters.InitWithFilters(filters)
}
func (a *Transport) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*transport.Filter, error) {
return a.filters.InitCommunityFilters(pks)
}
func (a *Transport) RemoveFilters(filters []*transport.Filter) error {
return a.filters.Remove(filters...)
}
func (a *Transport) RemoveFilterByChatID(chatID string) (*transport.Filter, error) {
return a.filters.RemoveFilterByChatID(chatID)
}
func (a *Transport) ResetFilters() error {
return a.filters.Reset()
}
func (a *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*transport.Filter, error) {
filter, err := a.filters.LoadNegotiated(secret)
if err != nil {
return nil, err
}
return filter, nil
}
func (a *Transport) JoinPublic(chatID string) (*transport.Filter, error) {
return a.filters.LoadPublic(chatID)
}
func (a *Transport) LeavePublic(chatID string) error {
chat := a.filters.Filter(chatID)
if chat != nil {
return nil
}
return a.filters.Remove(chat)
}
func (a *Transport) JoinPrivate(publicKey *ecdsa.PublicKey) (*transport.Filter, error) {
return a.filters.LoadContactCode(publicKey)
}
func (a *Transport) LeavePrivate(publicKey *ecdsa.PublicKey) error {
filters := a.filters.FiltersByPublicKey(publicKey)
return a.filters.Remove(filters...)
}
func (a *Transport) JoinGroup(publicKeys []*ecdsa.PublicKey) ([]*transport.Filter, error) {
var filters []*transport.Filter
for _, pk := range publicKeys {
f, err := a.filters.LoadContactCode(pk)
if err != nil {
return nil, err
}
filters = append(filters, f)
}
return filters, nil
}
func (a *Transport) LeaveGroup(publicKeys []*ecdsa.PublicKey) error {
for _, publicKey := range publicKeys {
filters := a.filters.FiltersByPublicKey(publicKey)
if err := a.filters.Remove(filters...); err != nil {
return err
}
}
return nil
}
func (a *Transport) RetrieveRawAll() (map[transport.Filter][]*types.Message, error) {
result := make(map[transport.Filter][]*types.Message)
allFilters := a.filters.Filters()
for _, filter := range allFilters {
// Don't pull from filters we don't listen to
if !filter.Listen {
continue
}
msgs, err := a.api.GetFilterMessages(filter.FilterID)
if err != nil {
a.logger.Warn("failed to fetch messages", zap.Error(err))
continue
}
if len(msgs) == 0 {
continue
}
ids := make([]string, len(msgs))
for i := range msgs {
id := types.EncodeHex(msgs[i].Hash)
ids[i] = id
}
hits, err := a.cache.Hits(ids)
if err != nil {
a.logger.Error("failed to check messages exists", zap.Error(err))
return nil, err
}
for i := range msgs {
// Exclude anything that is a cache hit
if !hits[types.EncodeHex(msgs[i].Hash)] {
result[*filter] = append(result[*filter], msgs[i])
}
}
}
return result, nil
}
// SendPublic sends a new message using the Whisper service.
// For public filters, chat name is used as an ID as well as
// a topic.
func (a *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage, chatName string) ([]byte, error) {
if err := a.addSig(newMessage); err != nil {
return nil, err
}
filter, err := a.filters.LoadPublic(chatName)
if err != nil {
return nil, err
}
newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.Topic
return a.api.Post(ctx, *newMessage)
}
func (a *Transport) SendPrivateWithSharedSecret(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey, secret []byte) ([]byte, error) {
if err := a.addSig(newMessage); err != nil {
return nil, err
}
filter, err := a.filters.LoadNegotiated(types.NegotiatedSecret{
PublicKey: publicKey,
Key: secret,
})
if err != nil {
return nil, err
}
newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.Topic
newMessage.PublicKey = nil
return a.api.Post(ctx, *newMessage)
}
func (a *Transport) SendPrivateWithPartitioned(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) {
if err := a.addSig(newMessage); err != nil {
return nil, err
}
filter, err := a.filters.LoadPartitioned(publicKey, a.keysManager.privateKey, false)
if err != nil {
return nil, err
}
newMessage.Topic = filter.Topic
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
return a.api.Post(ctx, *newMessage)
}
func (a *Transport) SendPrivateOnPersonalTopic(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) {
if err := a.addSig(newMessage); err != nil {
return nil, err
}
filter, err := a.filters.LoadPersonal(publicKey, a.keysManager.privateKey, false)
if err != nil {
return nil, err
}
newMessage.Topic = filter.Topic
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
return a.api.Post(ctx, *newMessage)
}
func (a *Transport) LoadKeyFilters(key *ecdsa.PrivateKey) (*transport.Filter, error) {
return a.filters.LoadEphemeral(&key.PublicKey, key, true)
}
func (a *Transport) SendCommunityMessage(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) {
if err := a.addSig(newMessage); err != nil {
return nil, err
}
// We load the filter to make sure we can post on it
filter, err := a.filters.LoadPublic(transport.PubkeyToHex(publicKey)[2:])
if err != nil {
return nil, err
}
newMessage.Topic = filter.Topic
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
a.logger.Debug("SENDING message", zap.Binary("topic", filter.Topic[:]))
return a.api.Post(ctx, *newMessage)
}
func (a *Transport) cleanFilters() error {
return a.filters.RemoveNoListenFilters()
}
func (a *Transport) addSig(newMessage *types.NewMessage) error {
sigID, err := a.keysManager.AddOrGetKeyPair(a.keysManager.privateKey)
if err != nil {
return err
}
newMessage.SigID = sigID
return nil
}
func (a *Transport) Track(identifiers [][]byte, hash []byte, newMessage *types.NewMessage) {
if a.envelopesMonitor != nil {
a.envelopesMonitor.Add(identifiers, types.BytesToHash(hash), *newMessage)
}
}
// GetCurrentTime returns the current unix timestamp in milliseconds
func (a *Transport) GetCurrentTime() uint64 {
return uint64(a.waku.GetCurrentTime().UnixNano() / int64(time.Millisecond))
}
func (a *Transport) MaxMessageSize() uint32 {
return a.waku.MaxMessageSize()
}
func (a *Transport) Stop() error {
close(a.quit)
if a.envelopesMonitor != nil {
a.envelopesMonitor.Stop()
}
return nil
}
// cleanFiltersLoop cleans up the topic we create for the only purpose
// of sending messages.
// Whenever we send a message we also need to listen to that particular topic
// but in case of asymettric topics, we are not interested in listening to them.
// We therefore periodically clean them up so we don't receive unnecessary data.
func (a *Transport) cleanFiltersLoop() {
ticker := time.NewTicker(5 * time.Minute)
go func() {
for {
select {
case <-a.quit:
ticker.Stop()
return
case <-ticker.C:
err := a.cleanFilters()
if err != nil {
a.logger.Error("failed to clean up topics", zap.Error(err))
}
}
}
}()
}
func (a *Transport) sendMessagesRequestForTopics(
ctx context.Context,
peerID []byte,
from, to uint32,
previousCursor []byte,
topics []types.TopicType,
waitForResponse bool,
) (cursor []byte, err error) {
r := createMessagesRequest(from, to, previousCursor, topics)
r.SetDefaults(a.waku.GetCurrentTime())
events := make(chan types.EnvelopeEvent, 10)
sub := a.waku.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
err = a.waku.SendMessagesRequest(peerID, r)
if err != nil {
return
}
if !waitForResponse {
return
}
resp, err := a.waitForRequestCompleted(ctx, r.ID, events)
if err == nil && resp != nil && resp.Error != nil {
err = resp.Error
} else if err == nil && resp != nil {
cursor = resp.Cursor
}
return
}
// RequestHistoricMessages requests historic messages for all registered filters.
func (a *Transport) SendMessagesRequest(
ctx context.Context,
peerID []byte,
from, to uint32,
previousCursor []byte,
waitForResponse bool,
) (cursor []byte, err error) {
topics := make([]types.TopicType, len(a.Filters()))
for _, f := range a.Filters() {
topics = append(topics, f.Topic)
}
return a.sendMessagesRequestForTopics(ctx, peerID, from, to, previousCursor, topics, waitForResponse)
}
func (a *Transport) SendMessagesRequestForFilter(
ctx context.Context,
peerID []byte,
from, to uint32,
previousCursor []byte,
filter *transport.Filter,
waitForResponse bool,
) (cursor []byte, err error) {
topics := make([]types.TopicType, len(a.Filters()))
topics = append(topics, filter.Topic)
return a.sendMessagesRequestForTopics(ctx, peerID, from, to, previousCursor, topics, waitForResponse)
}
func (a *Transport) waitForRequestCompleted(ctx context.Context, requestID []byte, events chan types.EnvelopeEvent) (*types.MailServerResponse, error) {
for {
select {
case ev := <-events:
if !bytes.Equal(ev.Hash.Bytes(), requestID) {
continue
}
if ev.Event != types.EventMailServerRequestCompleted {
continue
}
data, ok := ev.Data.(*types.MailServerResponse)
if ok {
return data, nil
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
// ConfirmMessagesProcessed marks the messages as processed in the cache so
// they won't be passed to the next layer anymore
func (a *Transport) ConfirmMessagesProcessed(ids []string, timestamp uint64) error {
return a.cache.Add(ids, timestamp)
}
// CleanMessagesProcessed clears the messages that are older than timestamp
func (a *Transport) CleanMessagesProcessed(timestamp uint64) error {
return a.cache.Clean(timestamp)
}
func (a *Transport) SetEnvelopeEventsHandler(handler transport.EnvelopeEventsHandler) error {
if a.envelopesMonitor == nil {
return errors.New("Current transport has no envelopes monitor")
}
a.envelopesMonitor.handler = handler
return nil
}

View File

@ -1,311 +0,0 @@
package whisper
import (
"context"
"errors"
"sync"
"github.com/status-im/status-go/protocol/transport"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/types"
)
// EnvelopeState in local tracker
type EnvelopeState int
const (
// NotRegistered returned if asked hash wasn't registered in the tracker.
NotRegistered EnvelopeState = -1
// EnvelopePosted is set when envelope was added to a local whisper queue.
EnvelopePosted EnvelopeState = iota
// EnvelopeSent is set when envelope is sent to at least one peer.
EnvelopeSent
)
// EnvelopeEventsHandler used for two different event types.
type EnvelopeEventsHandler interface {
EnvelopeSent([][]byte)
EnvelopeExpired([][]byte, error)
MailServerRequestCompleted(types.Hash, types.Hash, []byte, error)
MailServerRequestExpired(types.Hash)
}
// NewEnvelopesMonitor returns a pointer to an instance of the EnvelopesMonitor.
func NewEnvelopesMonitor(w types.Whisper, config transport.EnvelopesMonitorConfig) *EnvelopesMonitor {
logger := config.Logger
if logger == nil {
logger = zap.NewNop()
}
var whisperAPI types.PublicWhisperAPI
if w != nil {
whisperAPI = w.PublicWhisperAPI()
}
return &EnvelopesMonitor{
w: w,
whisperAPI: whisperAPI,
handler: config.EnvelopeEventsHandler,
mailServerConfirmation: config.MailserverConfirmationsEnabled,
maxAttempts: config.MaxAttempts,
isMailserver: config.IsMailserver,
logger: logger.With(zap.Namespace("EnvelopesMonitor")),
// key is envelope hash (event.Hash)
envelopes: map[types.Hash]EnvelopeState{},
messages: map[types.Hash]*types.NewMessage{},
attempts: map[types.Hash]int{},
identifiers: make(map[types.Hash][][]byte),
// key is hash of the batch (event.Batch)
batches: map[types.Hash]map[types.Hash]struct{}{},
}
}
// EnvelopesMonitor is responsible for monitoring whisper envelopes state.
type EnvelopesMonitor struct {
w types.Whisper
whisperAPI types.PublicWhisperAPI
handler EnvelopeEventsHandler
mailServerConfirmation bool
maxAttempts int
mu sync.Mutex
envelopes map[types.Hash]EnvelopeState
batches map[types.Hash]map[types.Hash]struct{}
messages map[types.Hash]*types.NewMessage
attempts map[types.Hash]int
identifiers map[types.Hash][][]byte
wg sync.WaitGroup
quit chan struct{}
isMailserver func(peer types.EnodeID) bool
logger *zap.Logger
}
// Start processing events.
func (m *EnvelopesMonitor) Start() {
m.quit = make(chan struct{})
m.wg.Add(1)
go func() {
m.handleEnvelopeEvents()
m.wg.Done()
}()
}
// Stop process events.
func (m *EnvelopesMonitor) Stop() {
close(m.quit)
m.wg.Wait()
}
// Add hash to a tracker.
func (m *EnvelopesMonitor) Add(identifiers [][]byte, envelopeHash types.Hash, message types.NewMessage) {
m.mu.Lock()
defer m.mu.Unlock()
m.envelopes[envelopeHash] = EnvelopePosted
m.identifiers[envelopeHash] = identifiers
m.messages[envelopeHash] = &message
m.attempts[envelopeHash] = 1
}
func (m *EnvelopesMonitor) GetState(hash types.Hash) EnvelopeState {
m.mu.Lock()
defer m.mu.Unlock()
state, exist := m.envelopes[hash]
if !exist {
return NotRegistered
}
return state
}
// handleEnvelopeEvents processes whisper envelope events
func (m *EnvelopesMonitor) handleEnvelopeEvents() {
events := make(chan types.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper
sub := m.w.SubscribeEnvelopeEvents(events)
defer func() {
close(events)
sub.Unsubscribe()
}()
for {
select {
case <-m.quit:
return
case event := <-events:
m.handleEvent(event)
}
}
}
// handleEvent based on type of the event either triggers
// confirmation handler or removes hash from tracker
func (m *EnvelopesMonitor) handleEvent(event types.EnvelopeEvent) {
handlers := map[types.EventType]func(types.EnvelopeEvent){
types.EventEnvelopeSent: m.handleEventEnvelopeSent,
types.EventEnvelopeExpired: m.handleEventEnvelopeExpired,
types.EventBatchAcknowledged: m.handleAcknowledgedBatch,
types.EventEnvelopeReceived: m.handleEventEnvelopeReceived,
}
if handler, ok := handlers[event.Event]; ok {
handler(event)
}
}
func (m *EnvelopesMonitor) handleEventEnvelopeSent(event types.EnvelopeEvent) {
if m.mailServerConfirmation {
if !m.isMailserver(event.Peer) {
return
}
}
m.mu.Lock()
defer m.mu.Unlock()
state, ok := m.envelopes[event.Hash]
// if we didn't send a message using extension - skip it
// if message was already confirmed - skip it
if !ok || state == EnvelopeSent {
return
}
m.logger.Debug("envelope is sent", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String()))
if event.Batch != (types.Hash{}) {
if _, ok := m.batches[event.Batch]; !ok {
m.batches[event.Batch] = map[types.Hash]struct{}{}
}
m.batches[event.Batch][event.Hash] = struct{}{}
m.logger.Debug("waiting for a confirmation", zap.String("batch", event.Batch.String()))
} else {
m.envelopes[event.Hash] = EnvelopeSent
if m.handler != nil {
m.handler.EnvelopeSent(m.identifiers[event.Hash])
}
}
}
func (m *EnvelopesMonitor) handleAcknowledgedBatch(event types.EnvelopeEvent) {
if m.mailServerConfirmation {
if !m.isMailserver(event.Peer) {
return
}
}
m.mu.Lock()
defer m.mu.Unlock()
envelopes, ok := m.batches[event.Batch]
if !ok {
m.logger.Debug("batch is not found", zap.String("batch", event.Batch.String()))
}
m.logger.Debug("received a confirmation", zap.String("batch", event.Batch.String()), zap.String("peer", event.Peer.String()))
envelopeErrors, ok := event.Data.([]types.EnvelopeError)
if event.Data != nil && !ok {
m.logger.Error("received unexpected data in the the confirmation event", zap.Any("data", event.Data))
}
failedEnvelopes := map[types.Hash]struct{}{}
for i := range envelopeErrors {
envelopeError := envelopeErrors[i]
_, exist := m.envelopes[envelopeError.Hash]
if exist {
m.logger.Warn("envelope that was posted by us is discarded", zap.String("hash", envelopeError.Hash.String()), zap.String("peer", event.Peer.String()), zap.String("error", envelopeError.Description))
var err error
switch envelopeError.Code {
case types.EnvelopeTimeNotSynced:
err = errors.New("envelope wasn't delivered due to time sync issues")
}
m.handleEnvelopeFailure(envelopeError.Hash, err)
}
failedEnvelopes[envelopeError.Hash] = struct{}{}
}
for hash := range envelopes {
if _, exist := failedEnvelopes[hash]; exist {
continue
}
state, ok := m.envelopes[hash]
if !ok || state == EnvelopeSent {
continue
}
m.envelopes[hash] = EnvelopeSent
if m.handler != nil {
m.handler.EnvelopeSent(m.identifiers[hash])
}
}
delete(m.batches, event.Batch)
}
func (m *EnvelopesMonitor) handleEventEnvelopeExpired(event types.EnvelopeEvent) {
m.mu.Lock()
defer m.mu.Unlock()
m.handleEnvelopeFailure(event.Hash, errors.New("envelope expired due to connectivity issues"))
}
// handleEnvelopeFailure is a common code path for processing envelopes failures. not thread safe, lock
// must be used on a higher level.
func (m *EnvelopesMonitor) handleEnvelopeFailure(hash types.Hash, err error) {
if state, ok := m.envelopes[hash]; ok {
message, exist := m.messages[hash]
if !exist {
m.logger.Error("message was deleted erroneously", zap.String("envelope hash", hash.String()))
}
attempt := m.attempts[hash]
identifiers := m.identifiers[hash]
m.clearMessageState(hash)
if state == EnvelopeSent {
return
}
if attempt < m.maxAttempts {
m.logger.Debug("retrying to send a message", zap.String("hash", hash.String()), zap.Int("attempt", attempt+1))
hex, err := m.whisperAPI.Post(context.TODO(), *message)
if err != nil {
m.logger.Error("failed to retry sending message", zap.String("hash", hash.String()), zap.Int("attempt", attempt+1), zap.Error(err))
if m.handler != nil {
m.handler.EnvelopeExpired(identifiers, err)
}
}
envelopeID := types.BytesToHash(hex)
m.envelopes[envelopeID] = EnvelopePosted
m.messages[envelopeID] = message
m.attempts[envelopeID] = attempt + 1
m.identifiers[envelopeID] = identifiers
} else {
m.logger.Debug("envelope expired", zap.String("hash", hash.String()))
if m.handler != nil {
m.handler.EnvelopeExpired(identifiers, err)
}
}
}
}
func (m *EnvelopesMonitor) handleEventEnvelopeReceived(event types.EnvelopeEvent) {
if m.mailServerConfirmation {
if !m.isMailserver(event.Peer) {
return
}
}
m.mu.Lock()
defer m.mu.Unlock()
state, ok := m.envelopes[event.Hash]
if !ok || state != EnvelopePosted {
return
}
m.logger.Debug("expected envelope received", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String()))
m.envelopes[event.Hash] = EnvelopeSent
if m.handler != nil {
m.handler.EnvelopeSent(m.identifiers[event.Hash])
}
}
// clearMessageState removes all message and envelope state.
// not thread-safe, should be protected on a higher level.
func (m *EnvelopesMonitor) clearMessageState(envelopeID types.Hash) {
delete(m.envelopes, envelopeID)
delete(m.messages, envelopeID)
delete(m.attempts, envelopeID)
delete(m.identifiers, envelopeID)
}

View File

@ -1,121 +0,0 @@
package whisper
import (
"testing"
"go.uber.org/zap"
"github.com/stretchr/testify/suite"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport"
)
var (
testHash = types.Hash{0x01}
testIDs = [][]byte{[]byte("id")}
)
type EnvelopesMonitorSuite struct {
suite.Suite
monitor *EnvelopesMonitor
}
func TestEnvelopesMonitorSuite(t *testing.T) {
suite.Run(t, new(EnvelopesMonitorSuite))
}
func (s *EnvelopesMonitorSuite) SetupTest() {
s.monitor = NewEnvelopesMonitor(
nil,
transport.EnvelopesMonitorConfig{
EnvelopeEventsHandler: nil,
MaxAttempts: 0,
MailserverConfirmationsEnabled: false,
IsMailserver: func(types.EnodeID) bool { return false },
Logger: zap.NewNop(),
},
)
}
func (s *EnvelopesMonitorSuite) TestConfirmed() {
s.monitor.Add(testIDs, testHash, types.NewMessage{})
s.Contains(s.monitor.envelopes, testHash)
s.Equal(EnvelopePosted, s.monitor.envelopes[testHash])
s.monitor.handleEvent(types.EnvelopeEvent{
Event: types.EventEnvelopeSent,
Hash: testHash,
})
s.Contains(s.monitor.envelopes, testHash)
s.Equal(EnvelopeSent, s.monitor.envelopes[testHash])
}
func (s *EnvelopesMonitorSuite) TestConfirmedWithAcknowledge() {
testBatch := types.Hash{1}
pkey, err := crypto.GenerateKey()
s.Require().NoError(err)
node := enode.NewV4(&pkey.PublicKey, nil, 0, 0)
s.monitor.Add(testIDs, testHash, types.NewMessage{})
s.Contains(s.monitor.envelopes, testHash)
s.Equal(EnvelopePosted, s.monitor.envelopes[testHash])
s.monitor.handleEvent(types.EnvelopeEvent{
Event: types.EventEnvelopeSent,
Hash: testHash,
Batch: testBatch,
})
s.Equal(EnvelopePosted, s.monitor.envelopes[testHash])
s.monitor.handleEvent(types.EnvelopeEvent{
Event: types.EventBatchAcknowledged,
Batch: testBatch,
Peer: types.EnodeID(node.ID()),
})
s.Contains(s.monitor.envelopes, testHash)
s.Equal(EnvelopeSent, s.monitor.envelopes[testHash])
}
func (s *EnvelopesMonitorSuite) TestIgnored() {
s.monitor.handleEvent(types.EnvelopeEvent{
Event: types.EventEnvelopeSent,
Hash: testHash,
})
s.NotContains(s.monitor.envelopes, testHash)
}
func (s *EnvelopesMonitorSuite) TestRemoved() {
s.monitor.Add(testIDs, testHash, types.NewMessage{})
s.Contains(s.monitor.envelopes, testHash)
s.monitor.handleEvent(types.EnvelopeEvent{
Event: types.EventEnvelopeExpired,
Hash: testHash,
})
s.NotContains(s.monitor.envelopes, testHash)
}
func (s *EnvelopesMonitorSuite) TestIgnoreNotFromMailserver() {
// enables filter in the tracker to drop confirmations from non-mailserver peers
s.monitor.mailServerConfirmation = true
s.monitor.Add(testIDs, testHash, types.NewMessage{})
s.monitor.handleEvent(types.EnvelopeEvent{
Event: types.EventEnvelopeSent,
Hash: testHash,
Peer: types.EnodeID{1}, // could be empty, doesn't impact test behaviour
})
s.Require().Equal(EnvelopePosted, s.monitor.GetState(testHash))
}
func (s *EnvelopesMonitorSuite) TestReceived() {
s.monitor.isMailserver = func(peer types.EnodeID) bool {
return true
}
s.monitor.Add(testIDs, testHash, types.NewMessage{})
s.Contains(s.monitor.envelopes, testHash)
s.monitor.handleEvent(types.EnvelopeEvent{
Event: types.EventEnvelopeReceived,
Hash: testHash,
})
s.Require().Equal(EnvelopeSent, s.monitor.GetState(testHash))
}

View File

@ -1,38 +0,0 @@
package whisper
import (
"encoding/hex"
"math/big"
"github.com/google/uuid"
"github.com/status-im/status-go/eth-node/types"
)
func createMessagesRequest(from, to uint32, cursor []byte, topics []types.TopicType) types.MessagesRequest {
aUUID := uuid.New()
// uuid is 16 bytes, converted to hex it's 32 bytes as expected by types.MessagesRequest
id := []byte(hex.EncodeToString(aUUID[:]))
return types.MessagesRequest{
ID: id,
From: from,
To: to,
Limit: 100,
Cursor: cursor,
Bloom: topicsToBloom(topics...),
}
}
func topicsToBloom(topics ...types.TopicType) []byte {
i := new(big.Int)
for _, topic := range topics {
bloom := types.TopicToBloom(topic)
i.Or(i, new(big.Int).SetBytes(bloom[:]))
}
combined := make([]byte, types.BloomFilterSize)
data := i.Bytes()
copy(combined[types.BloomFilterSize-len(data):], data[:])
return combined
}

View File

@ -1,319 +0,0 @@
// Code generated by go-bindata. DO NOT EDIT.
// sources:
// 1561059285_add_whisper_keys.down.sql (25B)
// 1561059285_add_whisper_keys.up.sql (112B)
// doc.go (373B)
package sqlite
import (
"bytes"
"compress/gzip"
"crypto/sha256"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"time"
)
func bindataRead(data []byte, name string) ([]byte, error) {
gz, err := gzip.NewReader(bytes.NewBuffer(data))
if err != nil {
return nil, fmt.Errorf("read %q: %v", name, err)
}
var buf bytes.Buffer
_, err = io.Copy(&buf, gz)
clErr := gz.Close()
if err != nil {
return nil, fmt.Errorf("read %q: %v", name, err)
}
if clErr != nil {
return nil, err
}
return buf.Bytes(), nil
}
type asset struct {
bytes []byte
info os.FileInfo
digest [sha256.Size]byte
}
type bindataFileInfo struct {
name string
size int64
mode os.FileMode
modTime time.Time
}
func (fi bindataFileInfo) Name() string {
return fi.name
}
func (fi bindataFileInfo) Size() int64 {
return fi.size
}
func (fi bindataFileInfo) Mode() os.FileMode {
return fi.mode
}
func (fi bindataFileInfo) ModTime() time.Time {
return fi.modTime
}
func (fi bindataFileInfo) IsDir() bool {
return false
}
func (fi bindataFileInfo) Sys() interface{} {
return nil
}
var __1561059285_add_whisper_keysDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\x08\x71\x74\xf2\x71\x55\x28\xcf\xc8\x2c\x2e\x48\x2d\x8a\xcf\x4e\xad\x2c\xb6\xe6\x02\x04\x00\x00\xff\xff\x42\x93\x8e\x79\x19\x00\x00\x00")
func _1561059285_add_whisper_keysDownSqlBytes() ([]byte, error) {
return bindataRead(
__1561059285_add_whisper_keysDownSql,
"1561059285_add_whisper_keys.down.sql",
)
}
func _1561059285_add_whisper_keysDownSql() (*asset, error) {
bytes, err := _1561059285_add_whisper_keysDownSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1561059285_add_whisper_keys.down.sql", size: 25, mode: os.FileMode(0644), modTime: time.Unix(1610115164, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xb9, 0x31, 0x3f, 0xce, 0xfa, 0x44, 0x36, 0x1b, 0xb0, 0xec, 0x5d, 0xb, 0x90, 0xb, 0x21, 0x4f, 0xd5, 0xe5, 0x50, 0xed, 0xc7, 0x43, 0xdf, 0x83, 0xb4, 0x3a, 0xc1, 0x55, 0x2e, 0x53, 0x7c, 0x67}}
return a, nil
}
var __1561059285_add_whisper_keysUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x04\xc0\xb1\x0a\xc2\x40\x0c\x06\xe0\xfd\x9e\xe2\x1f\x15\x7c\x03\xa7\xde\x19\x35\x18\x13\x09\x29\xb5\x53\x11\x3d\x68\xe9\x22\x56\x90\xbe\xbd\x5f\x71\x6a\x82\x10\x4d\x16\xc2\x6f\x9c\x96\x77\xfd\x0c\x73\x5d\x17\x6c\x12\xf0\x1c\x1f\xdf\x61\x7a\x21\xe8\x1e\xb8\x39\x5f\x1b\xef\x71\xa1\x1e\xa6\x28\xa6\x47\xe1\x12\xe0\x93\x9a\xd3\x2e\x01\x73\x5d\x91\xc5\x32\xd4\x02\xda\x8a\xa4\x2d\x3a\x8e\xb3\xb5\x01\xb7\x8e\x0f\xfb\xf4\x0f\x00\x00\xff\xff\x6e\x23\x28\x7d\x70\x00\x00\x00")
func _1561059285_add_whisper_keysUpSqlBytes() ([]byte, error) {
return bindataRead(
__1561059285_add_whisper_keysUpSql,
"1561059285_add_whisper_keys.up.sql",
)
}
func _1561059285_add_whisper_keysUpSql() (*asset, error) {
bytes, err := _1561059285_add_whisper_keysUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1561059285_add_whisper_keys.up.sql", size: 112, mode: os.FileMode(0644), modTime: time.Unix(1610115164, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x25, 0x41, 0xc, 0x92, 0xdd, 0x9e, 0xff, 0x5d, 0xd0, 0x93, 0xe4, 0x24, 0x50, 0x29, 0xcf, 0xc6, 0xf7, 0x49, 0x3c, 0x73, 0xd9, 0x8c, 0xfa, 0xf2, 0xcf, 0xf6, 0x6f, 0xbc, 0x31, 0xe6, 0xf7, 0xe2}}
return a, nil
}
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x84\x8f\x3d\x72\xeb\x30\x0c\x84\x7b\x9d\x62\xc7\x8d\x9b\x27\xb2\x79\x55\xba\x94\xe9\x73\x01\x98\x5a\x91\x18\x4b\xa4\x42\xc0\x7f\xb7\xcf\xc8\xe3\xc2\x5d\xda\x1d\x7c\x1f\x76\x63\xc4\x77\x51\xc3\xac\x0b\xa1\x86\xca\x44\x33\xe9\x0f\x9c\x98\xe4\x62\xc4\x21\xab\x97\xcb\x29\xa4\xb6\x46\x73\xf1\x8b\x8d\xba\xc6\x55\x73\x17\x67\xbc\xfe\x3f\x0c\x31\x22\x49\x3d\x3a\x8a\xd4\x69\xe1\xd3\x65\x30\x97\xee\x5a\x33\x6e\xea\x05\x82\xad\x73\xd6\x7b\xc0\xa7\x63\xa1\x98\xc3\x8b\xf8\xd1\xe0\x85\x48\x62\xdc\x35\x73\xeb\xc8\x6d\x3c\x69\x9d\xc4\x25\xec\xd1\xd7\xfc\x96\xec\x0d\x93\x2c\x0b\x27\xcc\xbd\xad\x4f\xd6\x64\x25\x26\xed\x4c\xde\xfa\xe3\x1f\xc4\x8c\x8e\x2a\x2b\x6d\xe7\x8b\x5c\x89\xda\x5e\xef\x21\x75\xfa\x7b\x11\x6e\xad\x9f\x0d\x62\xe0\x7d\x63\x72\x4e\x61\x18\x36\x49\x67\xc9\x84\xfd\x2c\xea\x1c\x86\x18\x73\xfb\xc8\xac\xdc\xa9\xf7\x8e\xe3\x76\xce\xaf\x2b\x8c\x0d\x21\xbc\xd4\xda\xaa\x85\xdc\x10\x86\xdf\x00\x00\x00\xff\xff\x21\xa5\x75\x05\x75\x01\x00\x00")
func docGoBytes() ([]byte, error) {
return bindataRead(
_docGo,
"doc.go",
)
}
func docGo() (*asset, error) {
bytes, err := docGoBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "doc.go", size: 373, mode: os.FileMode(0644), modTime: time.Unix(1610115164, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x23, 0x6a, 0xc1, 0xce, 0x94, 0xf6, 0xef, 0xf1, 0x97, 0x95, 0xb, 0x35, 0xaf, 0x5f, 0xe7, 0x5f, 0xac, 0x6e, 0xb8, 0xab, 0xba, 0xb5, 0x35, 0x97, 0x22, 0x36, 0x11, 0xce, 0x44, 0xfc, 0xfa, 0xac}}
return a, nil
}
// Asset loads and returns the asset for the given name.
// It returns an error if the asset could not be found or
// could not be loaded.
func Asset(name string) ([]byte, error) {
canonicalName := strings.Replace(name, "\\", "/", -1)
if f, ok := _bindata[canonicalName]; ok {
a, err := f()
if err != nil {
return nil, fmt.Errorf("Asset %s can't read by error: %v", name, err)
}
return a.bytes, nil
}
return nil, fmt.Errorf("Asset %s not found", name)
}
// AssetString returns the asset contents as a string (instead of a []byte).
func AssetString(name string) (string, error) {
data, err := Asset(name)
return string(data), err
}
// MustAsset is like Asset but panics when Asset would return an error.
// It simplifies safe initialization of global variables.
func MustAsset(name string) []byte {
a, err := Asset(name)
if err != nil {
panic("asset: Asset(" + name + "): " + err.Error())
}
return a
}
// MustAssetString is like AssetString but panics when Asset would return an
// error. It simplifies safe initialization of global variables.
func MustAssetString(name string) string {
return string(MustAsset(name))
}
// AssetInfo loads and returns the asset info for the given name.
// It returns an error if the asset could not be found or
// could not be loaded.
func AssetInfo(name string) (os.FileInfo, error) {
canonicalName := strings.Replace(name, "\\", "/", -1)
if f, ok := _bindata[canonicalName]; ok {
a, err := f()
if err != nil {
return nil, fmt.Errorf("AssetInfo %s can't read by error: %v", name, err)
}
return a.info, nil
}
return nil, fmt.Errorf("AssetInfo %s not found", name)
}
// AssetDigest returns the digest of the file with the given name. It returns an
// error if the asset could not be found or the digest could not be loaded.
func AssetDigest(name string) ([sha256.Size]byte, error) {
canonicalName := strings.Replace(name, "\\", "/", -1)
if f, ok := _bindata[canonicalName]; ok {
a, err := f()
if err != nil {
return [sha256.Size]byte{}, fmt.Errorf("AssetDigest %s can't read by error: %v", name, err)
}
return a.digest, nil
}
return [sha256.Size]byte{}, fmt.Errorf("AssetDigest %s not found", name)
}
// Digests returns a map of all known files and their checksums.
func Digests() (map[string][sha256.Size]byte, error) {
mp := make(map[string][sha256.Size]byte, len(_bindata))
for name := range _bindata {
a, err := _bindata[name]()
if err != nil {
return nil, err
}
mp[name] = a.digest
}
return mp, nil
}
// AssetNames returns the names of the assets.
func AssetNames() []string {
names := make([]string, 0, len(_bindata))
for name := range _bindata {
names = append(names, name)
}
return names
}
// _bindata is a table, holding each asset generator, mapped to its name.
var _bindata = map[string]func() (*asset, error){
"1561059285_add_whisper_keys.down.sql": _1561059285_add_whisper_keysDownSql,
"1561059285_add_whisper_keys.up.sql": _1561059285_add_whisper_keysUpSql,
"doc.go": docGo,
}
// AssetDir returns the file names below a certain
// directory embedded in the file by go-bindata.
// For example if you run go-bindata on data/... and data contains the
// following hierarchy:
// data/
// foo.txt
// img/
// a.png
// b.png
// then AssetDir("data") would return []string{"foo.txt", "img"},
// AssetDir("data/img") would return []string{"a.png", "b.png"},
// AssetDir("foo.txt") and AssetDir("notexist") would return an error, and
// AssetDir("") will return []string{"data"}.
func AssetDir(name string) ([]string, error) {
node := _bintree
if len(name) != 0 {
canonicalName := strings.Replace(name, "\\", "/", -1)
pathList := strings.Split(canonicalName, "/")
for _, p := range pathList {
node = node.Children[p]
if node == nil {
return nil, fmt.Errorf("Asset %s not found", name)
}
}
}
if node.Func != nil {
return nil, fmt.Errorf("Asset %s not found", name)
}
rv := make([]string, 0, len(node.Children))
for childName := range node.Children {
rv = append(rv, childName)
}
return rv, nil
}
type bintree struct {
Func func() (*asset, error)
Children map[string]*bintree
}
var _bintree = &bintree{nil, map[string]*bintree{
"1561059285_add_whisper_keys.down.sql": &bintree{_1561059285_add_whisper_keysDownSql, map[string]*bintree{}},
"1561059285_add_whisper_keys.up.sql": &bintree{_1561059285_add_whisper_keysUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
}}
// RestoreAsset restores an asset under the given directory.
func RestoreAsset(dir, name string) error {
data, err := Asset(name)
if err != nil {
return err
}
info, err := AssetInfo(name)
if err != nil {
return err
}
err = os.MkdirAll(_filePath(dir, filepath.Dir(name)), os.FileMode(0755))
if err != nil {
return err
}
err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode())
if err != nil {
return err
}
return os.Chtimes(_filePath(dir, name), info.ModTime(), info.ModTime())
}
// RestoreAssets restores an asset under the given directory recursively.
func RestoreAssets(dir, name string) error {
children, err := AssetDir(name)
// File
if err != nil {
return RestoreAsset(dir, name)
}
// Dir
for _, child := range children {
err = RestoreAssets(dir, filepath.Join(name, child))
if err != nil {
return err
}
}
return nil
}
func _filePath(dir, name string) string {
canonicalName := strings.Replace(name, "\\", "/", -1)
return filepath.Join(append([]string{dir}, strings.Split(canonicalName, "/")...)...)
}

View File

@ -1,4 +0,0 @@
CREATE TABLE whisper_keys (
chat_id TEXT PRIMARY KEY ON CONFLICT IGNORE,
key BLOB NOT NULL
) WITHOUT ROWID;

View File

@ -1,9 +0,0 @@
// This file is necessary because "github.com/status-im/migrate/v4"
// can't handle files starting with a prefix. At least that's the case
// for go-bindata.
// If go-bindata is called from the same directory, asset names
// have no prefix and "github.com/status-im/migrate/v4" works as expected.
package sqlite
//go:generate go-bindata -pkg sqlite -o ../migrations.go .

View File

@ -1,58 +0,0 @@
package whisper
import (
"database/sql"
)
type sqlitePersistence struct {
db *sql.DB
}
func newSQLitePersistence(db *sql.DB) *sqlitePersistence {
return &sqlitePersistence{db: db}
}
func (s *sqlitePersistence) Add(chatID string, key []byte) error {
statement := "INSERT INTO whisper_keys(chat_id, key) VALUES(?, ?)"
stmt, err := s.db.Prepare(statement)
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(chatID, key)
return err
}
func (s *sqlitePersistence) All() (map[string][]byte, error) {
keys := make(map[string][]byte)
statement := "SELECT chat_id, key FROM whisper_keys"
stmt, err := s.db.Prepare(statement)
if err != nil {
return nil, err
}
defer stmt.Close()
rows, err := stmt.Query()
if err != nil && err != sql.ErrNoRows {
return nil, err
}
defer rows.Close()
for rows.Next() {
var (
chatID string
key []byte
)
err := rows.Scan(&chatID, &key)
if err != nil {
return nil, err
}
keys[chatID] = key
}
return keys, nil
}

View File

@ -1,13 +0,0 @@
package whisper
import (
"github.com/status-im/status-go/eth-node/types"
)
type RequestOptions struct {
Topics []types.TopicType
Password string
Limit int
From int64 // in seconds
To int64 // in seconds
}

View File

@ -1,514 +0,0 @@
package whisper
import (
"bytes"
"context"
"crypto/ecdsa"
"database/sql"
"sync"
"time"
"github.com/pkg/errors"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport"
)
var (
// ErrNoMailservers returned if there is no configured mailservers that can be used.
ErrNoMailservers = errors.New("no configured mailservers")
)
type whisperServiceKeysManager struct {
shh types.Whisper
// Identity of the current user.
privateKey *ecdsa.PrivateKey
passToSymKeyMutex sync.RWMutex
passToSymKeyCache map[string]string
}
func (m *whisperServiceKeysManager) AddOrGetKeyPair(priv *ecdsa.PrivateKey) (string, error) {
// caching is handled in Whisper
return m.shh.AddKeyPair(priv)
}
func (m *whisperServiceKeysManager) AddOrGetSymKeyFromPassword(password string) (string, error) {
m.passToSymKeyMutex.Lock()
defer m.passToSymKeyMutex.Unlock()
if val, ok := m.passToSymKeyCache[password]; ok {
return val, nil
}
id, err := m.shh.AddSymKeyFromPassword(password)
if err != nil {
return id, err
}
m.passToSymKeyCache[password] = id
return id, nil
}
func (m *whisperServiceKeysManager) RawSymKey(id string) ([]byte, error) {
return m.shh.GetSymKey(id)
}
type Option func(*Transport) error
// Transport is a transport based on Whisper service.
type Transport struct {
shh types.Whisper
shhAPI types.PublicWhisperAPI // only PublicWhisperAPI implements logic to send messages
keysManager *whisperServiceKeysManager
filters *transport.FiltersManager
logger *zap.Logger
mailservers []string
envelopesMonitor *EnvelopesMonitor
}
// NewTransport returns a new Transport.
// TODO: leaving a chat should verify that for a given public key
// there are no other chats. It may happen that we leave a private chat
// but still have a public chat for a given public key.
func NewTransport(
shh types.Whisper,
privateKey *ecdsa.PrivateKey,
db *sql.DB,
mailservers []string,
envelopesMonitorConfig *transport.EnvelopesMonitorConfig,
logger *zap.Logger,
opts ...Option,
) (*Transport, error) {
filtersManager, err := transport.NewFiltersManager(newSQLitePersistence(db), shh, privateKey, logger)
if err != nil {
return nil, err
}
var envelopesMonitor *EnvelopesMonitor
if envelopesMonitorConfig != nil {
envelopesMonitor = NewEnvelopesMonitor(shh, *envelopesMonitorConfig)
envelopesMonitor.Start()
}
var shhAPI types.PublicWhisperAPI
if shh != nil {
shhAPI = shh.PublicWhisperAPI()
}
t := &Transport{
shh: shh,
shhAPI: shhAPI,
envelopesMonitor: envelopesMonitor,
keysManager: &whisperServiceKeysManager{
shh: shh,
privateKey: privateKey,
passToSymKeyCache: make(map[string]string),
},
filters: filtersManager,
mailservers: mailservers,
logger: logger.With(zap.Namespace("Transport")),
}
for _, opt := range opts {
if err := opt(t); err != nil {
return nil, err
}
}
return t, nil
}
func (a *Transport) InitFilters(chatIDs []string, publicKeys []*ecdsa.PublicKey) ([]*transport.Filter, error) {
return a.filters.Init(chatIDs, publicKeys)
}
func (a *Transport) InitPublicFilters(chatIDs []string) ([]*transport.Filter, error) {
return a.filters.InitPublicFilters(chatIDs)
}
func (a *Transport) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*transport.Filter, error) {
return a.filters.InitCommunityFilters(pks)
}
func (a *Transport) Filters() []*transport.Filter {
return a.filters.Filters()
}
func (a *Transport) FilterByChatID(chatID string) *transport.Filter {
return a.filters.FilterByChatID(chatID)
}
func (a *Transport) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) {
return a.filters.InitWithFilters(filters)
}
func (a *Transport) SendCommunityMessage(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) {
if err := a.addSig(newMessage); err != nil {
return nil, err
}
// We load the filter to make sure we can post on it
filter, err := a.filters.LoadPublic(transport.PubkeyToHex(publicKey))
if err != nil {
return nil, err
}
newMessage.Topic = filter.Topic
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
return a.shhAPI.Post(ctx, *newMessage)
}
func (a *Transport) RemoveFilters(filters []*transport.Filter) error {
return a.filters.Remove(filters...)
}
func (a *Transport) RemoveFilterByChatID(chatID string) (*transport.Filter, error) {
return a.filters.RemoveFilterByChatID(chatID)
}
func (a *Transport) ResetFilters() error {
return a.filters.Reset()
}
func (a *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*transport.Filter, error) {
filter, err := a.filters.LoadNegotiated(secret)
if err != nil {
return nil, err
}
return filter, nil
}
func (a *Transport) JoinPublic(chatID string) (*transport.Filter, error) {
return a.filters.LoadPublic(chatID)
}
func (a *Transport) LeavePublic(chatID string) error {
chat := a.filters.Filter(chatID)
if chat != nil {
return nil
}
return a.filters.Remove(chat)
}
func (a *Transport) JoinPrivate(publicKey *ecdsa.PublicKey) (*transport.Filter, error) {
return a.filters.LoadContactCode(publicKey)
}
func (a *Transport) LeavePrivate(publicKey *ecdsa.PublicKey) error {
filters := a.filters.FiltersByPublicKey(publicKey)
return a.filters.Remove(filters...)
}
func (a *Transport) JoinGroup(publicKeys []*ecdsa.PublicKey) ([]*transport.Filter, error) {
var filters []*transport.Filter
for _, pk := range publicKeys {
f, err := a.filters.LoadContactCode(pk)
if err != nil {
return nil, err
}
filters = append(filters, f)
}
return filters, nil
}
func (a *Transport) LeaveGroup(publicKeys []*ecdsa.PublicKey) error {
for _, publicKey := range publicKeys {
filters := a.filters.FiltersByPublicKey(publicKey)
if err := a.filters.Remove(filters...); err != nil {
return err
}
}
return nil
}
type Message struct {
Message *types.Message
Public bool
}
func (a *Transport) RetrieveAllMessages() ([]Message, error) {
var messages []Message
for _, filter := range a.filters.Filters() {
filterMsgs, err := a.shhAPI.GetFilterMessages(filter.FilterID)
if err != nil {
return nil, err
}
for _, m := range filterMsgs {
messages = append(messages, Message{
Message: m,
Public: filter.IsPublic(),
})
}
}
return messages, nil
}
func (a *Transport) RetrievePublicMessages(chatID string) ([]*types.Message, error) {
filter, err := a.filters.LoadPublic(chatID)
if err != nil {
return nil, err
}
return a.shhAPI.GetFilterMessages(filter.FilterID)
}
func (a *Transport) RetrievePrivateMessages(publicKey *ecdsa.PublicKey) ([]*types.Message, error) {
chats := a.filters.FiltersByPublicKey(publicKey)
discoveryChats, err := a.filters.Init(nil, nil)
if err != nil {
return nil, err
}
var result []*types.Message
for _, chat := range append(chats, discoveryChats...) {
filterMsgs, err := a.shhAPI.GetFilterMessages(chat.FilterID)
if err != nil {
return nil, err
}
result = append(result, filterMsgs...)
}
return result, nil
}
func (a *Transport) RetrieveRawAll() (map[transport.Filter][]*types.Message, error) {
result := make(map[transport.Filter][]*types.Message)
allFilters := a.filters.Filters()
for _, filter := range allFilters {
msgs, err := a.shhAPI.GetFilterMessages(filter.FilterID)
if err != nil {
continue
}
result[*filter] = append(result[*filter], msgs...)
}
return result, nil
}
// SendPublic sends a new message using the Whisper service.
// For public filters, chat name is used as an ID as well as
// a topic.
func (a *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage, chatName string) ([]byte, error) {
if err := a.addSig(newMessage); err != nil {
return nil, err
}
filter, err := a.filters.LoadPublic(chatName)
if err != nil {
return nil, err
}
newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.Topic
return a.shhAPI.Post(ctx, *newMessage)
}
func (a *Transport) SendPrivateWithSharedSecret(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey, secret []byte) ([]byte, error) {
if err := a.addSig(newMessage); err != nil {
return nil, err
}
filter, err := a.filters.LoadNegotiated(types.NegotiatedSecret{
PublicKey: publicKey,
Key: secret,
})
if err != nil {
return nil, err
}
newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.Topic
newMessage.PublicKey = nil
return a.shhAPI.Post(ctx, *newMessage)
}
func (a *Transport) SendPrivateWithPartitioned(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) {
if err := a.addSig(newMessage); err != nil {
return nil, err
}
filter, err := a.filters.LoadPartitioned(publicKey, a.keysManager.privateKey, false)
if err != nil {
return nil, err
}
newMessage.Topic = filter.Topic
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
return a.shhAPI.Post(ctx, *newMessage)
}
func (a *Transport) SendPrivateOnPersonalTopic(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) {
if err := a.addSig(newMessage); err != nil {
return nil, err
}
filter, err := a.filters.LoadPersonal(publicKey, a.keysManager.privateKey, false)
if err != nil {
return nil, err
}
newMessage.Topic = filter.Topic
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
return a.shhAPI.Post(ctx, *newMessage)
}
func (a *Transport) SendPrivateOnDiscovery(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) {
if err := a.addSig(newMessage); err != nil {
return nil, err
}
// There is no need to load any chat
// because listening on the discovery topic
// is done automatically.
// TODO: change this anyway, it should be explicit
// and idempotent.
newMessage.Topic = types.BytesToTopic(transport.ToTopic(transport.DiscoveryTopic()))
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
return a.shhAPI.Post(ctx, *newMessage)
}
func (a *Transport) addSig(newMessage *types.NewMessage) error {
sigID, err := a.keysManager.AddOrGetKeyPair(a.keysManager.privateKey)
if err != nil {
return err
}
newMessage.SigID = sigID
return nil
}
func (a *Transport) Track(identifiers [][]byte, hash []byte, newMessage *types.NewMessage) {
if a.envelopesMonitor != nil {
a.envelopesMonitor.Add(identifiers, types.BytesToHash(hash), *newMessage)
}
}
// GetCurrentTime returns the current unix timestamp in milliseconds
func (a *Transport) GetCurrentTime() uint64 {
return uint64(a.shh.GetCurrentTime().UnixNano() / int64(time.Millisecond))
}
func (a *Transport) MaxMessageSize() uint32 {
return a.shh.MaxMessageSize()
}
func (a *Transport) Stop() error {
if a.envelopesMonitor != nil {
a.envelopesMonitor.Stop()
}
return nil
}
// RequestHistoricMessages requests historic messages for all registered filters.
func (a *Transport) SendMessagesRequest(
ctx context.Context,
peerID []byte,
from, to uint32,
previousCursor []byte,
waitForResponse bool,
) (cursor []byte, err error) {
topics := make([]types.TopicType, len(a.Filters()))
for _, f := range a.Filters() {
topics = append(topics, f.Topic)
}
r := createMessagesRequest(from, to, previousCursor, topics)
r.SetDefaults(a.shh.GetCurrentTime())
events := make(chan types.EnvelopeEvent, 10)
sub := a.shh.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
err = a.shh.SendMessagesRequest(peerID, r)
if err != nil {
return
}
if !waitForResponse {
return
}
resp, err := a.waitForRequestCompleted(ctx, r.ID, events)
if err == nil && resp != nil && resp.Error != nil {
err = resp.Error
} else if err == nil && resp != nil {
cursor = resp.Cursor
}
return
}
//TODO: kozieiev: fix
func (a *Transport) SendMessagesRequestForFilter(
ctx context.Context,
peerID []byte,
from, to uint32,
previousCursor []byte,
filter *transport.Filter,
waitForResponse bool,
) (cursor []byte, err error) {
return nil, nil
}
func (a *Transport) LoadKeyFilters(key *ecdsa.PrivateKey) (*transport.Filter, error) {
return a.filters.LoadPartitioned(&key.PublicKey, key, true)
}
func (a *Transport) waitForRequestCompleted(ctx context.Context, requestID []byte, events chan types.EnvelopeEvent) (*types.MailServerResponse, error) {
for {
select {
case ev := <-events:
a.logger.Debug(
"waiting for request completed and received an event",
zap.Binary("requestID", requestID),
zap.Any("event", ev),
)
if !bytes.Equal(ev.Hash.Bytes(), requestID) {
continue
}
if ev.Event != types.EventMailServerRequestCompleted {
continue
}
data, ok := ev.Data.(*types.MailServerResponse)
if ok {
return data, nil
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
// NOTE: currently not used as whisper is not maintained anymore
func (a *Transport) ConfirmMessagesProcessed(ids []string, timestamp uint64) error {
return nil
}
// NOTE: currently not used as whisper is not maintained anymore
func (a *Transport) CleanMessagesProcessed(timestamp uint64) error {
return nil
}
func (a *Transport) SetEnvelopeEventsHandler(handler transport.EnvelopeEventsHandler) error {
if a.envelopesMonitor == nil {
return errors.New("Current transport has no envelopes monitor")
}
a.envelopesMonitor.handler = handler
return nil
}

View File

@ -1,28 +0,0 @@
package whisper
import (
"io/ioutil"
"os"
"testing"
"github.com/status-im/status-go/protocol/sqlite"
"github.com/stretchr/testify/require"
"github.com/status-im/status-go/protocol/tt"
)
func TestNewWhisperServiceTransport(t *testing.T) {
dbPath, err := ioutil.TempFile("", "transport.sql")
require.NoError(t, err)
defer os.Remove(dbPath.Name())
db, err := sqlite.Open(dbPath.Name(), "some-key")
require.NoError(t, err)
logger := tt.MustCreateTestLogger()
require.NoError(t, err)
defer func() { _ = logger.Sync() }()
_, err = NewTransport(nil, nil, db, nil, nil, logger)
require.NoError(t, err)
}