From 9241903edb0a00cb0b25c789959a5d411955ff3b Mon Sep 17 00:00:00 2001 From: Volodymyr Kozieiev Date: Thu, 22 Apr 2021 16:36:18 +0300 Subject: [PATCH] Refactorings: whisper transport removed (#2200) --- VERSION | 2 +- go.mod | 1 + go.sum | 3 + protocol/common/message_processor.go | 4 +- protocol/common/message_processor_test.go | 16 +- protocol/message_handler.go | 7 +- protocol/messenger.go | 50 +- protocol/sqlite/migrations.go | 7 +- protocol/transport/envelopes_monitor.go | 294 +++++++++ ...opes_test.go => envelopes_monitor_test.go} | 5 +- protocol/transport/filters_manager.go | 251 ++++---- .../{waku => }/migrations/migrations.go | 0 .../sqlite/1561059284_add_waku_keys.down.sql | 0 .../sqlite/1561059284_add_waku_keys.up.sql | 0 .../{waku => }/migrations/sqlite/doc.go | 0 protocol/transport/{waku => }/persistence.go | 2 +- protocol/transport/transport.go | 578 ++++++++++++++++-- ...waku_service_test.go => transport_test.go} | 4 +- protocol/transport/waku/envelopes.go | 310 ---------- protocol/transport/waku/mailserver.go | 38 -- protocol/transport/waku/waku.go | 13 - protocol/transport/waku/waku_service.go | 520 ---------------- protocol/transport/whisper/envelopes.go | 311 ---------- protocol/transport/whisper/envelopes_test.go | 121 ---- protocol/transport/whisper/mailserver.go | 38 -- .../whisper/migrations/migrations.go | 319 ---------- .../1561059285_add_whisper_keys.down.sql | 1 - .../sqlite/1561059285_add_whisper_keys.up.sql | 4 - .../whisper/migrations/sqlite/doc.go | 9 - protocol/transport/whisper/persistence.go | 58 -- protocol/transport/whisper/whisper.go | 13 - protocol/transport/whisper/whisper_service.go | 514 ---------------- .../transport/whisper/whisper_service_test.go | 28 - 33 files changed, 994 insertions(+), 2527 deletions(-) rename protocol/transport/{waku/envelopes_test.go => envelopes_monitor_test.go} (97%) rename protocol/transport/{waku => }/migrations/migrations.go (100%) rename protocol/transport/{waku => }/migrations/sqlite/1561059284_add_waku_keys.down.sql (100%) rename protocol/transport/{waku => }/migrations/sqlite/1561059284_add_waku_keys.up.sql (100%) rename protocol/transport/{waku => }/migrations/sqlite/doc.go (100%) rename protocol/transport/{waku => }/persistence.go (98%) rename protocol/transport/{waku/waku_service_test.go => transport_test.go} (89%) delete mode 100644 protocol/transport/waku/envelopes.go delete mode 100644 protocol/transport/waku/mailserver.go delete mode 100644 protocol/transport/waku/waku.go delete mode 100644 protocol/transport/waku/waku_service.go delete mode 100644 protocol/transport/whisper/envelopes.go delete mode 100644 protocol/transport/whisper/envelopes_test.go delete mode 100644 protocol/transport/whisper/mailserver.go delete mode 100644 protocol/transport/whisper/migrations/migrations.go delete mode 100644 protocol/transport/whisper/migrations/sqlite/1561059285_add_whisper_keys.down.sql delete mode 100644 protocol/transport/whisper/migrations/sqlite/1561059285_add_whisper_keys.up.sql delete mode 100644 protocol/transport/whisper/migrations/sqlite/doc.go delete mode 100644 protocol/transport/whisper/persistence.go delete mode 100644 protocol/transport/whisper/whisper.go delete mode 100644 protocol/transport/whisper/whisper_service.go delete mode 100644 protocol/transport/whisper/whisper_service_test.go diff --git a/VERSION b/VERSION index f2b9ee573..b3631d672 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.76.5 +0.76.6 diff --git a/go.mod b/go.mod index 52540ebed..660ea0903 100644 --- a/go.mod +++ b/go.mod @@ -54,6 +54,7 @@ require ( github.com/pborman/uuid v1.2.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.5.0 + github.com/prometheus/common v0.9.1 github.com/russolsen/ohyeah v0.0.0-20160324131710-f4938c005315 // indirect github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a diff --git a/go.sum b/go.sum index 4e08c75c6..d07ce4cae 100644 --- a/go.sum +++ b/go.sum @@ -24,8 +24,10 @@ github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMx github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/allegro/bigcache v0.0.0-20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/allegro/bigcache v1.2.0 h1:qDaE0QoF29wKBb3+pXFrJFy1ihe5OT9OiXhg1t85SxM= @@ -854,6 +856,7 @@ google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiq google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/protocol/common/message_processor.go b/protocol/common/message_processor.go index f66ce9e4e..a3516d0c1 100644 --- a/protocol/common/message_processor.go +++ b/protocol/common/message_processor.go @@ -47,7 +47,7 @@ type MessageProcessor struct { identity *ecdsa.PrivateKey datasync *datasync.DataSync protocol *encryption.Protocol - transport transport.Transport + transport *transport.Transport logger *zap.Logger persistence *RawMessagesPersistence @@ -71,7 +71,7 @@ func NewMessageProcessor( identity *ecdsa.PrivateKey, database *sql.DB, enc *encryption.Protocol, - transport transport.Transport, + transport *transport.Transport, logger *zap.Logger, features FeatureFlags, ) (*MessageProcessor, error) { diff --git a/protocol/common/message_processor_test.go b/protocol/common/message_processor_test.go index baa22a1a7..b17092156 100644 --- a/protocol/common/message_processor_test.go +++ b/protocol/common/message_processor_test.go @@ -6,6 +6,10 @@ import ( "path/filepath" "testing" + transport2 "github.com/status-im/status-go/protocol/transport" + + "github.com/status-im/status-go/waku" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/suite" @@ -19,9 +23,7 @@ import ( "github.com/status-im/status-go/protocol/encryption" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/sqlite" - transport "github.com/status-im/status-go/protocol/transport/whisper" v1protocol "github.com/status-im/status-go/protocol/v1" - "github.com/status-im/status-go/whisper" ) func TestMessageProcessorSuite(t *testing.T) { @@ -67,13 +69,13 @@ func (s *MessageProcessorSuite) SetupTest() { s.logger, ) - whisperConfig := whisper.DefaultConfig - whisperConfig.MinimumAcceptedPOW = 0 - shh := whisper.New(&whisperConfig) + wakuConfig := waku.DefaultConfig + wakuConfig.MinimumAcceptedPoW = 0 + shh := waku.New(&wakuConfig, s.logger) s.Require().NoError(shh.Start(nil)) - whisperTransport, err := transport.NewTransport( - gethbridge.NewGethWhisperWrapper(shh), + whisperTransport, err := transport2.NewTransport( + gethbridge.NewGethWakuWrapper(shh), identity, database, nil, diff --git a/protocol/message_handler.go b/protocol/message_handler.go index 076516a54..4de4ef218 100644 --- a/protocol/message_handler.go +++ b/protocol/message_handler.go @@ -5,6 +5,8 @@ import ( "encoding/hex" "fmt" + "github.com/status-im/status-go/protocol/transport" + "github.com/pkg/errors" "github.com/status-im/status-go/eth-node/crypto" @@ -16,7 +18,6 @@ import ( "github.com/status-im/status-go/protocol/encryption/multidevice" "github.com/status-im/status-go/protocol/ens" "github.com/status-im/status-go/protocol/protobuf" - "github.com/status-im/status-go/protocol/transport" v1protocol "github.com/status-im/status-go/protocol/v1" "go.uber.org/zap" @@ -34,13 +35,13 @@ type MessageHandler struct { identity *ecdsa.PrivateKey persistence *sqlitePersistence settings *accounts.Database - transport transport.Transport + transport *transport.Transport ensVerifier *ens.Verifier communitiesManager *communities.Manager logger *zap.Logger } -func newMessageHandler(identity *ecdsa.PrivateKey, logger *zap.Logger, persistence *sqlitePersistence, communitiesManager *communities.Manager, transport transport.Transport, ensVerifier *ens.Verifier, settings *accounts.Database) *MessageHandler { +func newMessageHandler(identity *ecdsa.PrivateKey, logger *zap.Logger, persistence *sqlitePersistence, communitiesManager *communities.Manager, transport *transport.Transport, ensVerifier *ens.Verifier, settings *accounts.Database) *MessageHandler { return &MessageHandler{ identity: identity, persistence: persistence, diff --git a/protocol/messenger.go b/protocol/messenger.go index 54962c140..02ced641f 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -42,12 +42,11 @@ import ( "github.com/status-im/status-go/protocol/pushnotificationserver" "github.com/status-im/status-go/protocol/sqlite" "github.com/status-im/status-go/protocol/transport" - wakutransp "github.com/status-im/status-go/protocol/transport/waku" - shhtransp "github.com/status-im/status-go/protocol/transport/whisper" v1protocol "github.com/status-im/status-go/protocol/v1" "github.com/status-im/status-go/services/mailservers" ) +//todo: kozieiev: get rid of wakutransp word type chatContext string const ( @@ -79,7 +78,7 @@ type Messenger struct { config *config identity *ecdsa.PrivateKey persistence *sqlitePersistence - transport transport.Transport + transport *transport.Transport encryptor *encryption.Protocol processor *common.MessageProcessor handler *MessageHandler @@ -204,36 +203,21 @@ func NewMessenger( } // Initialize transport layer. - var transp transport.Transport - if shh, err := node.GetWhisper(nil); err == nil && shh != nil { - transp, err = shhtransp.NewTransport( - shh, - identity, - database, - nil, - c.envelopesMonitorConfig, - logger, - ) - if err != nil { - return nil, errors.Wrap(err, "failed to create Transport") - } - } else { - logger.Info("failed to find Whisper service; trying Waku", zap.Error(err)) - waku, err := node.GetWaku(nil) - if err != nil || waku == nil { - return nil, errors.Wrap(err, "failed to find Whisper and Waku services") - } - transp, err = wakutransp.NewTransport( - waku, - identity, - database, - nil, - c.envelopesMonitorConfig, - logger, - ) - if err != nil { - return nil, errors.Wrap(err, "failed to create Transport") - } + logger.Info("failed to find Whisper service; trying Waku", zap.Error(err)) + waku, err := node.GetWaku(nil) + if err != nil || waku == nil { + return nil, errors.Wrap(err, "failed to find Whisper and Waku services") + } + transp, err := transport.NewTransport( + waku, + identity, + database, + nil, + c.envelopesMonitorConfig, + logger, + ) + if err != nil { + return nil, errors.Wrap(err, "failed to create Transport") } // Initialize encryption layer. diff --git a/protocol/sqlite/migrations.go b/protocol/sqlite/migrations.go index 5c9eb035d..e4d735676 100644 --- a/protocol/sqlite/migrations.go +++ b/protocol/sqlite/migrations.go @@ -9,8 +9,7 @@ import ( appmigrations "github.com/status-im/status-go/protocol/migrations" push_notification_client_migrations "github.com/status-im/status-go/protocol/pushnotificationclient/migrations" push_notification_server_migrations "github.com/status-im/status-go/protocol/pushnotificationserver/migrations" - wakumigrations "github.com/status-im/status-go/protocol/transport/waku/migrations" - whispermigrations "github.com/status-im/status-go/protocol/transport/whisper/migrations" + wakumigrations "github.com/status-im/status-go/protocol/transport/migrations" ) type getter func(string) ([]byte, error) @@ -21,10 +20,6 @@ type migrationsWithGetter struct { } var defaultMigrations = []migrationsWithGetter{ - { - Names: whispermigrations.AssetNames(), - Getter: whispermigrations.Asset, - }, { Names: wakumigrations.AssetNames(), Getter: wakumigrations.Asset, diff --git a/protocol/transport/envelopes_monitor.go b/protocol/transport/envelopes_monitor.go index be1490565..0851c7c8b 100644 --- a/protocol/transport/envelopes_monitor.go +++ b/protocol/transport/envelopes_monitor.go @@ -1,11 +1,27 @@ package transport import ( + "context" + "errors" + "sync" + "go.uber.org/zap" "github.com/status-im/status-go/eth-node/types" ) +// EnvelopeState in local tracker +type EnvelopeState int + +const ( + // NotRegistered returned if asked hash wasn't registered in the tracker. + NotRegistered EnvelopeState = -1 + // EnvelopePosted is set when envelope was added to a local waku queue. + EnvelopePosted EnvelopeState = iota + 1 + // EnvelopeSent is set when envelope is sent to at least one peer. + EnvelopeSent +) + type EnvelopesMonitorConfig struct { EnvelopeEventsHandler EnvelopeEventsHandler MaxAttempts int @@ -21,3 +37,281 @@ type EnvelopeEventsHandler interface { MailServerRequestCompleted(types.Hash, types.Hash, []byte, error) MailServerRequestExpired(types.Hash) } + +// NewEnvelopesMonitor returns a pointer to an instance of the EnvelopesMonitor. +func NewEnvelopesMonitor(w types.Waku, config EnvelopesMonitorConfig) *EnvelopesMonitor { + logger := config.Logger + + if logger == nil { + logger = zap.NewNop() + } + + var api types.PublicWakuAPI + if w != nil { + api = w.PublicWakuAPI() + } + + return &EnvelopesMonitor{ + w: w, + api: api, + handler: config.EnvelopeEventsHandler, + mailServerConfirmation: config.MailserverConfirmationsEnabled, + maxAttempts: config.MaxAttempts, + isMailserver: config.IsMailserver, + logger: logger.With(zap.Namespace("EnvelopesMonitor")), + + // key is envelope hash (event.Hash) + envelopes: map[types.Hash]EnvelopeState{}, + messages: map[types.Hash]*types.NewMessage{}, + attempts: map[types.Hash]int{}, + identifiers: make(map[types.Hash][][]byte), + + // key is hash of the batch (event.Batch) + batches: map[types.Hash]map[types.Hash]struct{}{}, + } +} + +// EnvelopesMonitor is responsible for monitoring waku envelopes state. +type EnvelopesMonitor struct { + w types.Waku + api types.PublicWakuAPI + handler EnvelopeEventsHandler + mailServerConfirmation bool + maxAttempts int + + mu sync.Mutex + envelopes map[types.Hash]EnvelopeState + batches map[types.Hash]map[types.Hash]struct{} + + messages map[types.Hash]*types.NewMessage + attempts map[types.Hash]int + identifiers map[types.Hash][][]byte + + wg sync.WaitGroup + quit chan struct{} + isMailserver func(peer types.EnodeID) bool + + logger *zap.Logger +} + +// Start processing events. +func (m *EnvelopesMonitor) Start() { + m.quit = make(chan struct{}) + m.wg.Add(1) + go func() { + m.handleEnvelopeEvents() + m.wg.Done() + }() +} + +// Stop process events. +func (m *EnvelopesMonitor) Stop() { + close(m.quit) + m.wg.Wait() +} + +// Add hash to a tracker. +func (m *EnvelopesMonitor) Add(identifiers [][]byte, envelopeHash types.Hash, message types.NewMessage) { + m.mu.Lock() + defer m.mu.Unlock() + m.envelopes[envelopeHash] = EnvelopePosted + m.identifiers[envelopeHash] = identifiers + m.messages[envelopeHash] = &message + m.attempts[envelopeHash] = 1 +} + +func (m *EnvelopesMonitor) GetState(hash types.Hash) EnvelopeState { + m.mu.Lock() + defer m.mu.Unlock() + state, exist := m.envelopes[hash] + if !exist { + return NotRegistered + } + return state +} + +// handleEnvelopeEvents processes waku envelope events +func (m *EnvelopesMonitor) handleEnvelopeEvents() { + events := make(chan types.EnvelopeEvent, 100) // must be buffered to prevent blocking waku + sub := m.w.SubscribeEnvelopeEvents(events) + defer func() { + close(events) + sub.Unsubscribe() + }() + for { + select { + case <-m.quit: + return + case event := <-events: + m.handleEvent(event) + } + } +} + +// handleEvent based on type of the event either triggers +// confirmation handler or removes hash from tracker +func (m *EnvelopesMonitor) handleEvent(event types.EnvelopeEvent) { + handlers := map[types.EventType]func(types.EnvelopeEvent){ + types.EventEnvelopeSent: m.handleEventEnvelopeSent, + types.EventEnvelopeExpired: m.handleEventEnvelopeExpired, + types.EventBatchAcknowledged: m.handleAcknowledgedBatch, + types.EventEnvelopeReceived: m.handleEventEnvelopeReceived, + } + if handler, ok := handlers[event.Event]; ok { + handler(event) + } +} + +func (m *EnvelopesMonitor) handleEventEnvelopeSent(event types.EnvelopeEvent) { + if m.mailServerConfirmation { + if !m.isMailserver(event.Peer) { + return + } + } + + m.mu.Lock() + defer m.mu.Unlock() + + state, ok := m.envelopes[event.Hash] + // if we didn't send a message using extension - skip it + // if message was already confirmed - skip it + if !ok || state == EnvelopeSent { + return + } + m.logger.Debug("envelope is sent", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String())) + if event.Batch != (types.Hash{}) { + if _, ok := m.batches[event.Batch]; !ok { + m.batches[event.Batch] = map[types.Hash]struct{}{} + } + m.batches[event.Batch][event.Hash] = struct{}{} + m.logger.Debug("waiting for a confirmation", zap.String("batch", event.Batch.String())) + } else { + m.envelopes[event.Hash] = EnvelopeSent + if m.handler != nil { + m.handler.EnvelopeSent(m.identifiers[event.Hash]) + } + } +} + +func (m *EnvelopesMonitor) handleAcknowledgedBatch(event types.EnvelopeEvent) { + if m.mailServerConfirmation { + if !m.isMailserver(event.Peer) { + return + } + } + + m.mu.Lock() + defer m.mu.Unlock() + + envelopes, ok := m.batches[event.Batch] + if !ok { + m.logger.Debug("batch is not found", zap.String("batch", event.Batch.String())) + } + m.logger.Debug("received a confirmation", zap.String("batch", event.Batch.String()), zap.String("peer", event.Peer.String())) + envelopeErrors, ok := event.Data.([]types.EnvelopeError) + if event.Data != nil && !ok { + m.logger.Error("received unexpected data in the the confirmation event", zap.Any("data", event.Data)) + } + failedEnvelopes := map[types.Hash]struct{}{} + for i := range envelopeErrors { + envelopeError := envelopeErrors[i] + _, exist := m.envelopes[envelopeError.Hash] + if exist { + m.logger.Warn("envelope that was posted by us is discarded", zap.String("hash", envelopeError.Hash.String()), zap.String("peer", event.Peer.String()), zap.String("error", envelopeError.Description)) + var err error + switch envelopeError.Code { + case types.EnvelopeTimeNotSynced: + err = errors.New("envelope wasn't delivered due to time sync issues") + } + m.handleEnvelopeFailure(envelopeError.Hash, err) + } + failedEnvelopes[envelopeError.Hash] = struct{}{} + } + + for hash := range envelopes { + if _, exist := failedEnvelopes[hash]; exist { + continue + } + state, ok := m.envelopes[hash] + if !ok || state == EnvelopeSent { + continue + } + m.envelopes[hash] = EnvelopeSent + if m.handler != nil { + m.handler.EnvelopeSent(m.identifiers[hash]) + } + } + delete(m.batches, event.Batch) +} + +func (m *EnvelopesMonitor) handleEventEnvelopeExpired(event types.EnvelopeEvent) { + m.mu.Lock() + defer m.mu.Unlock() + m.handleEnvelopeFailure(event.Hash, errors.New("envelope expired due to connectivity issues")) +} + +// handleEnvelopeFailure is a common code path for processing envelopes failures. not thread safe, lock +// must be used on a higher level. +func (m *EnvelopesMonitor) handleEnvelopeFailure(hash types.Hash, err error) { + if state, ok := m.envelopes[hash]; ok { + message, exist := m.messages[hash] + if !exist { + m.logger.Error("message was deleted erroneously", zap.String("envelope hash", hash.String())) + } + attempt := m.attempts[hash] + identifiers := m.identifiers[hash] + m.clearMessageState(hash) + if state == EnvelopeSent { + return + } + if attempt < m.maxAttempts { + m.logger.Debug("retrying to send a message", zap.String("hash", hash.String()), zap.Int("attempt", attempt+1)) + hex, err := m.api.Post(context.TODO(), *message) + if err != nil { + m.logger.Error("failed to retry sending message", zap.String("hash", hash.String()), zap.Int("attempt", attempt+1), zap.Error(err)) + if m.handler != nil { + m.handler.EnvelopeExpired(identifiers, err) + } + + } + envelopeID := types.BytesToHash(hex) + m.envelopes[envelopeID] = EnvelopePosted + m.messages[envelopeID] = message + m.attempts[envelopeID] = attempt + 1 + m.identifiers[envelopeID] = identifiers + } else { + m.logger.Debug("envelope expired", zap.String("hash", hash.String())) + if m.handler != nil { + m.handler.EnvelopeExpired(identifiers, err) + } + } + } +} + +func (m *EnvelopesMonitor) handleEventEnvelopeReceived(event types.EnvelopeEvent) { + if m.mailServerConfirmation { + if !m.isMailserver(event.Peer) { + return + } + } + m.mu.Lock() + defer m.mu.Unlock() + state, ok := m.envelopes[event.Hash] + if !ok || state != EnvelopePosted { + return + } + m.logger.Debug("expected envelope received", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String())) + m.envelopes[event.Hash] = EnvelopeSent + if m.handler != nil { + m.handler.EnvelopeSent(m.identifiers[event.Hash]) + } +} + +// clearMessageState removes all message and envelope state. +// not thread-safe, should be protected on a higher level. +func (m *EnvelopesMonitor) clearMessageState(envelopeID types.Hash) { + delete(m.envelopes, envelopeID) + delete(m.messages, envelopeID) + delete(m.attempts, envelopeID) + delete(m.identifiers, envelopeID) +} diff --git a/protocol/transport/waku/envelopes_test.go b/protocol/transport/envelopes_monitor_test.go similarity index 97% rename from protocol/transport/waku/envelopes_test.go rename to protocol/transport/envelopes_monitor_test.go index c07eaf308..c93ede598 100644 --- a/protocol/transport/waku/envelopes_test.go +++ b/protocol/transport/envelopes_monitor_test.go @@ -1,4 +1,4 @@ -package waku +package transport import ( "testing" @@ -11,7 +11,6 @@ import ( "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" - "github.com/status-im/status-go/protocol/transport" ) var ( @@ -32,7 +31,7 @@ func TestEnvelopesMonitorSuite(t *testing.T) { func (s *EnvelopesMonitorSuite) SetupTest() { s.monitor = NewEnvelopesMonitor( nil, - transport.EnvelopesMonitorConfig{ + EnvelopesMonitorConfig{ EnvelopeEventsHandler: nil, MaxAttempts: 0, MailserverConfirmationsEnabled: false, diff --git a/protocol/transport/filters_manager.go b/protocol/transport/filters_manager.go index 9d2c8da52..ca3ff6c00 100644 --- a/protocol/transport/filters_manager.go +++ b/protocol/transport/filters_manager.go @@ -71,59 +71,59 @@ func NewFiltersManager(persistence KeysPersistence, service FiltersService, priv }, nil } -func (s *FiltersManager) Init( +func (f *FiltersManager) Init( chatIDs []string, publicKeys []*ecdsa.PublicKey, ) ([]*Filter, error) { // Load our contact code. - _, err := s.LoadContactCode(&s.privateKey.PublicKey) + _, err := f.LoadContactCode(&f.privateKey.PublicKey) if err != nil { return nil, errors.Wrap(err, "failed to load contact code") } // Load partitioned topic. - _, err = s.loadMyPartitioned() + _, err = f.loadMyPartitioned() if err != nil { return nil, err } // Add discovery topic. - _, err = s.LoadDiscovery() + _, err = f.LoadDiscovery() if err != nil { return nil, err } // Add public, one-to-one and negotiated filters. for _, chatID := range chatIDs { - _, err := s.LoadPublic(chatID) + _, err := f.LoadPublic(chatID) if err != nil { return nil, err } } for _, publicKey := range publicKeys { - _, err := s.LoadContactCode(publicKey) + _, err := f.LoadContactCode(publicKey) if err != nil { return nil, err } } - s.mutex.Lock() - defer s.mutex.Unlock() + f.mutex.Lock() + defer f.mutex.Unlock() var allFilters []*Filter - for _, f := range s.filters { + for _, f := range f.filters { allFilters = append(allFilters, f) } return allFilters, nil } -func (s *FiltersManager) InitPublicFilters(chatIDs []string) ([]*Filter, error) { +func (f *FiltersManager) InitPublicFilters(chatIDs []string) ([]*Filter, error) { var filters []*Filter // Add public, one-to-one and negotiated filters. for _, chatID := range chatIDs { - f, err := s.LoadPublic(chatID) + f, err := f.LoadPublic(chatID) if err != nil { return nil, err } @@ -132,15 +132,15 @@ func (s *FiltersManager) InitPublicFilters(chatIDs []string) ([]*Filter, error) return filters, nil } -func (s *FiltersManager) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*Filter, error) { +func (f *FiltersManager) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*Filter, error) { var filters []*Filter - s.mutex.Lock() - defer s.mutex.Unlock() + f.mutex.Lock() + defer f.mutex.Unlock() for _, pk := range pks { identityStr := PublicKeyToStr(&pk.PublicKey) - rawFilter, err := s.addAsymmetric(identityStr, pk, true) + rawFilter, err := f.addAsymmetric(identityStr, pk, true) if err != nil { return nil, err @@ -155,7 +155,7 @@ func (s *FiltersManager) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*Filte OneToOne: true, } - s.filters[filterID] = filter + f.filters[filterID] = filter filters = append(filters, filter) } @@ -163,7 +163,7 @@ func (s *FiltersManager) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*Filte } // DEPRECATED -func (s *FiltersManager) InitWithFilters(filters []*Filter) ([]*Filter, error) { +func (f *FiltersManager) InitWithFilters(filters []*Filter) ([]*Filter, error) { var ( chatIDs []string publicKeys []*ecdsa.PublicKey @@ -181,43 +181,43 @@ func (s *FiltersManager) InitWithFilters(filters []*Filter) ([]*Filter, error) { } } - return s.Init(chatIDs, publicKeys) + return f.Init(chatIDs, publicKeys) } -func (s *FiltersManager) Reset() error { +func (f *FiltersManager) Reset() error { var filters []*Filter - s.mutex.Lock() - for _, f := range s.filters { + f.mutex.Lock() + for _, f := range f.filters { filters = append(filters, f) } - s.mutex.Unlock() + f.mutex.Unlock() - return s.Remove(filters...) + return f.Remove(filters...) } -func (s *FiltersManager) Filters() (result []*Filter) { - s.mutex.Lock() - defer s.mutex.Unlock() +func (f *FiltersManager) Filters() (result []*Filter) { + f.mutex.Lock() + defer f.mutex.Unlock() - for _, f := range s.filters { + for _, f := range f.filters { result = append(result, f) } return } -func (s *FiltersManager) Filter(chatID string) *Filter { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.filters[chatID] +func (f *FiltersManager) Filter(chatID string) *Filter { + f.mutex.Lock() + defer f.mutex.Unlock() + return f.filters[chatID] } // FilterByFilterID returns a Filter with a given Whisper filter ID. -func (s *FiltersManager) FilterByFilterID(filterID string) *Filter { - s.mutex.Lock() - defer s.mutex.Unlock() - for _, f := range s.filters { +func (f *FiltersManager) FilterByFilterID(filterID string) *Filter { + f.mutex.Lock() + defer f.mutex.Unlock() + for _, f := range f.filters { if f.FilterID == filterID { return f } @@ -226,20 +226,19 @@ func (s *FiltersManager) FilterByFilterID(filterID string) *Filter { } // FilterByChatID returns a Filter for given chat id -func (s *FiltersManager) FilterByChatID(chatID string) *Filter { - s.mutex.Lock() - defer s.mutex.Unlock() +func (f *FiltersManager) FilterByChatID(chatID string) *Filter { + f.mutex.Lock() + defer f.mutex.Unlock() - return s.filters[chatID] + return f.filters[chatID] } -func (s *FiltersManager) FiltersByPublicKey(publicKey *ecdsa.PublicKey) (result []*Filter) { - s.mutex.Lock() - defer s.mutex.Unlock() - +func (f *FiltersManager) FiltersByPublicKey(publicKey *ecdsa.PublicKey) (result []*Filter) { + f.mutex.Lock() + defer f.mutex.Unlock() identityStr := PublicKeyToStr(publicKey) - for _, f := range s.filters { + for _, f := range f.filters { if f.Identity == identityStr { result = append(result, f) } @@ -249,27 +248,27 @@ func (s *FiltersManager) FiltersByPublicKey(publicKey *ecdsa.PublicKey) (result } // Remove remove all the filters associated with a chat/identity -func (s *FiltersManager) Remove(filters ...*Filter) error { - s.mutex.Lock() - defer s.mutex.Unlock() +func (f *FiltersManager) Remove(filters ...*Filter) error { + f.mutex.Lock() + defer f.mutex.Unlock() - for _, f := range filters { - if err := s.service.Unsubscribe(f.FilterID); err != nil { + for _, filter := range filters { + if err := f.service.Unsubscribe(filter.FilterID); err != nil { return err } - if f.SymKeyID != "" { - s.service.DeleteSymKey(f.SymKeyID) + if filter.SymKeyID != "" { + f.service.DeleteSymKey(filter.SymKeyID) } - delete(s.filters, f.ChatID) + delete(f.filters, filter.ChatID) } return nil } // Remove remove all the filters associated with a chat/identity -func (s *FiltersManager) RemoveNoListenFilters() error { - s.mutex.Lock() - defer s.mutex.Unlock() +func (f *FiltersManager) RemoveNoListenFilters() error { + f.mutex.Lock() + defer f.mutex.Unlock() var filterIDs []string var filters []*Filter @@ -279,31 +278,31 @@ func (s *FiltersManager) RemoveNoListenFilters() error { filters = append(filters, f) } } - if err := s.service.UnsubscribeMany(filterIDs); err != nil { + if err := f.service.UnsubscribeMany(filterIDs); err != nil { return err } for _, filter := range filters { if filter.SymKeyID != "" { - s.service.DeleteSymKey(filter.SymKeyID) + f.service.DeleteSymKey(filter.SymKeyID) } - delete(s.filters, filter.ChatID) + delete(f.filters, filter.ChatID) } return nil } // Remove remove all the filters associated with a chat/identity -func (s *FiltersManager) RemoveFilterByChatID(chatID string) (*Filter, error) { - s.mutex.Lock() - filter, ok := s.filters[chatID] - s.mutex.Unlock() +func (f *FiltersManager) RemoveFilterByChatID(chatID string) (*Filter, error) { + f.mutex.Lock() + filter, ok := f.filters[chatID] + f.mutex.Unlock() if !ok { return nil, nil } - err := s.Remove(filter) + err := f.Remove(filter) if err != nil { return nil, err } @@ -312,28 +311,28 @@ func (s *FiltersManager) RemoveFilterByChatID(chatID string) (*Filter, error) { } // LoadPartitioned creates a filter for a partitioned topic. -func (s *FiltersManager) LoadPartitioned(publicKey *ecdsa.PublicKey, identity *ecdsa.PrivateKey, listen bool) (*Filter, error) { - return s.loadPartitioned(publicKey, identity, listen, false) +func (f *FiltersManager) LoadPartitioned(publicKey *ecdsa.PublicKey, identity *ecdsa.PrivateKey, listen bool) (*Filter, error) { + return f.loadPartitioned(publicKey, identity, listen, false) } // LoadEphemeral creates a filter for a partitioned/personal topic. -func (s *FiltersManager) LoadEphemeral(publicKey *ecdsa.PublicKey, identity *ecdsa.PrivateKey, listen bool) (*Filter, error) { - return s.loadPartitioned(publicKey, identity, listen, true) +func (f *FiltersManager) LoadEphemeral(publicKey *ecdsa.PublicKey, identity *ecdsa.PrivateKey, listen bool) (*Filter, error) { + return f.loadPartitioned(publicKey, identity, listen, true) } // LoadPersonal creates a filter for a personal topic. -func (s *FiltersManager) LoadPersonal(publicKey *ecdsa.PublicKey, identity *ecdsa.PrivateKey, listen bool) (*Filter, error) { - s.mutex.Lock() - defer s.mutex.Unlock() +func (f *FiltersManager) LoadPersonal(publicKey *ecdsa.PublicKey, identity *ecdsa.PrivateKey, listen bool) (*Filter, error) { + f.mutex.Lock() + defer f.mutex.Unlock() chatID := PersonalDiscoveryTopic(publicKey) - if _, ok := s.filters[chatID]; ok { - return s.filters[chatID], nil + if _, ok := f.filters[chatID]; ok { + return f.filters[chatID], nil } // We set up a filter so we can publish, // but we discard envelopes if listen is false. - filter, err := s.addAsymmetric(chatID, identity, listen) + filter, err := f.addAsymmetric(chatID, identity, listen) if err != nil { return nil, err } @@ -347,28 +346,28 @@ func (s *FiltersManager) LoadPersonal(publicKey *ecdsa.PublicKey, identity *ecds OneToOne: true, } - s.filters[chatID] = chat + f.filters[chatID] = chat return chat, nil } -func (s *FiltersManager) loadMyPartitioned() (*Filter, error) { - return s.loadPartitioned(&s.privateKey.PublicKey, s.privateKey, true, false) +func (f *FiltersManager) loadMyPartitioned() (*Filter, error) { + return f.loadPartitioned(&f.privateKey.PublicKey, f.privateKey, true, false) } -func (s *FiltersManager) loadPartitioned(publicKey *ecdsa.PublicKey, identity *ecdsa.PrivateKey, listen, ephemeral bool) (*Filter, error) { - s.mutex.Lock() - defer s.mutex.Unlock() +func (f *FiltersManager) loadPartitioned(publicKey *ecdsa.PublicKey, identity *ecdsa.PrivateKey, listen, ephemeral bool) (*Filter, error) { + f.mutex.Lock() + defer f.mutex.Unlock() chatID := PartitionedTopic(publicKey) - if _, ok := s.filters[chatID]; ok { - return s.filters[chatID], nil + if _, ok := f.filters[chatID]; ok { + return f.filters[chatID], nil } // We set up a filter so we can publish, // but we discard envelopes if listen is false. - filter, err := s.addAsymmetric(chatID, identity, listen) + filter, err := f.addAsymmetric(chatID, identity, listen) if err != nil { return nil, err } @@ -383,24 +382,24 @@ func (s *FiltersManager) loadPartitioned(publicKey *ecdsa.PublicKey, identity *e OneToOne: true, } - s.filters[chatID] = chat + f.filters[chatID] = chat return chat, nil } // LoadNegotiated loads a negotiated secret as a filter. -func (s *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter, error) { - s.mutex.Lock() - defer s.mutex.Unlock() +func (f *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter, error) { + f.mutex.Lock() + defer f.mutex.Unlock() chatID := NegotiatedTopic(secret.PublicKey) - if _, ok := s.filters[chatID]; ok { - return s.filters[chatID], nil + if _, ok := f.filters[chatID]; ok { + return f.filters[chatID], nil } keyString := hex.EncodeToString(secret.Key) - filter, err := s.addSymmetric(keyString) + filter, err := f.addSymmetric(keyString) if err != nil { return nil, err } @@ -416,25 +415,25 @@ func (s *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter, OneToOne: true, } - s.filters[chat.ChatID] = chat + f.filters[chat.ChatID] = chat return chat, nil } // LoadDiscovery adds 1 discovery filter // for the personal discovery topic. -func (s *FiltersManager) LoadDiscovery() ([]*Filter, error) { - s.mutex.Lock() - defer s.mutex.Unlock() +func (f *FiltersManager) LoadDiscovery() ([]*Filter, error) { + f.mutex.Lock() + defer f.mutex.Unlock() - personalDiscoveryTopic := PersonalDiscoveryTopic(&s.privateKey.PublicKey) + personalDiscoveryTopic := PersonalDiscoveryTopic(&f.privateKey.PublicKey) // Check if filters are already loaded. var result []*Filter expectedTopicCount := 1 - if chat, ok := s.filters[personalDiscoveryTopic]; ok { + if chat, ok := f.filters[personalDiscoveryTopic]; ok { result = append(result, chat) } @@ -442,7 +441,7 @@ func (s *FiltersManager) LoadDiscovery() ([]*Filter, error) { return result, nil } - identityStr := PublicKeyToStr(&s.privateKey.PublicKey) + identityStr := PublicKeyToStr(&f.privateKey.PublicKey) // Load personal discovery personalDiscoveryChat := &Filter{ @@ -453,7 +452,7 @@ func (s *FiltersManager) LoadDiscovery() ([]*Filter, error) { OneToOne: true, } - discoveryResponse, err := s.addAsymmetric(personalDiscoveryChat.ChatID, s.privateKey, true) + discoveryResponse, err := f.addAsymmetric(personalDiscoveryChat.ChatID, f.privateKey, true) if err != nil { return nil, err } @@ -461,21 +460,21 @@ func (s *FiltersManager) LoadDiscovery() ([]*Filter, error) { personalDiscoveryChat.Topic = discoveryResponse.Topic personalDiscoveryChat.FilterID = discoveryResponse.FilterID - s.filters[personalDiscoveryChat.ChatID] = personalDiscoveryChat + f.filters[personalDiscoveryChat.ChatID] = personalDiscoveryChat return []*Filter{personalDiscoveryChat}, nil } // LoadPublic adds a filter for a public chat. -func (s *FiltersManager) LoadPublic(chatID string) (*Filter, error) { - s.mutex.Lock() - defer s.mutex.Unlock() +func (f *FiltersManager) LoadPublic(chatID string) (*Filter, error) { + f.mutex.Lock() + defer f.mutex.Unlock() - if chat, ok := s.filters[chatID]; ok { + if chat, ok := f.filters[chatID]; ok { return chat, nil } - filterAndTopic, err := s.addSymmetric(chatID) + filterAndTopic, err := f.addSymmetric(chatID) if err != nil { return nil, err } @@ -489,23 +488,23 @@ func (s *FiltersManager) LoadPublic(chatID string) (*Filter, error) { OneToOne: false, } - s.filters[chatID] = chat + f.filters[chatID] = chat return chat, nil } // LoadContactCode creates a filter for the advertise topic for a given public key. -func (s *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, error) { - s.mutex.Lock() - defer s.mutex.Unlock() +func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, error) { + f.mutex.Lock() + defer f.mutex.Unlock() chatID := ContactCodeTopic(pubKey) - if _, ok := s.filters[chatID]; ok { - return s.filters[chatID], nil + if _, ok := f.filters[chatID]; ok { + return f.filters[chatID], nil } - contactCodeFilter, err := s.addSymmetric(chatID) + contactCodeFilter, err := f.addSymmetric(chatID) if err != nil { return nil, err } @@ -519,41 +518,41 @@ func (s *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro Listen: true, } - s.filters[chatID] = chat + f.filters[chatID] = chat return chat, nil } // addSymmetric adds a symmetric key filter -func (s *FiltersManager) addSymmetric(chatID string) (*RawFilter, error) { +func (f *FiltersManager) addSymmetric(chatID string) (*RawFilter, error) { var symKeyID string var err error topic := ToTopic(chatID) topics := [][]byte{topic} - symKey, ok := s.keys[chatID] + symKey, ok := f.keys[chatID] if ok { - symKeyID, err = s.service.AddSymKeyDirect(symKey) + symKeyID, err = f.service.AddSymKeyDirect(symKey) if err != nil { return nil, err } } else { - symKeyID, err = s.service.AddSymKeyFromPassword(chatID) + symKeyID, err = f.service.AddSymKeyFromPassword(chatID) if err != nil { return nil, err } - if symKey, err = s.service.GetSymKey(symKeyID); err != nil { + if symKey, err = f.service.GetSymKey(symKeyID); err != nil { return nil, err } - s.keys[chatID] = symKey + f.keys[chatID] = symKey - err = s.persistence.Add(chatID, symKey) + err = f.persistence.Add(chatID, symKey) if err != nil { return nil, err } } - id, err := s.service.Subscribe(&types.SubscriptionOptions{ + id, err := f.service.Subscribe(&types.SubscriptionOptions{ SymKeyID: symKeyID, PoW: minPow, Topics: topics, @@ -571,7 +570,7 @@ func (s *FiltersManager) addSymmetric(chatID string) (*RawFilter, error) { // addAsymmetricFilter adds a filter with our private key // and set minPow according to the listen parameter. -func (s *FiltersManager) addAsymmetric(chatID string, identity *ecdsa.PrivateKey, listen bool) (*RawFilter, error) { +func (f *FiltersManager) addAsymmetric(chatID string, identity *ecdsa.PrivateKey, listen bool) (*RawFilter, error) { var ( err error pow = 1.0 // use PoW high enough to discard all messages for the filter @@ -584,12 +583,12 @@ func (s *FiltersManager) addAsymmetric(chatID string, identity *ecdsa.PrivateKey topic := ToTopic(chatID) topics := [][]byte{topic} - privateKeyID, err := s.service.AddKeyPair(identity) + privateKeyID, err := f.service.AddKeyPair(identity) if err != nil { return nil, err } - id, err := s.service.Subscribe(&types.SubscriptionOptions{ + id, err := f.service.Subscribe(&types.SubscriptionOptions{ PrivateKeyID: privateKeyID, PoW: pow, Topics: topics, @@ -601,9 +600,9 @@ func (s *FiltersManager) addAsymmetric(chatID string, identity *ecdsa.PrivateKey } // GetNegotiated returns a negotiated chat given an identity -func (s *FiltersManager) GetNegotiated(identity *ecdsa.PublicKey) *Filter { - s.mutex.Lock() - defer s.mutex.Unlock() +func (f *FiltersManager) GetNegotiated(identity *ecdsa.PublicKey) *Filter { + f.mutex.Lock() + defer f.mutex.Unlock() - return s.filters[NegotiatedTopic(identity)] + return f.filters[NegotiatedTopic(identity)] } diff --git a/protocol/transport/waku/migrations/migrations.go b/protocol/transport/migrations/migrations.go similarity index 100% rename from protocol/transport/waku/migrations/migrations.go rename to protocol/transport/migrations/migrations.go diff --git a/protocol/transport/waku/migrations/sqlite/1561059284_add_waku_keys.down.sql b/protocol/transport/migrations/sqlite/1561059284_add_waku_keys.down.sql similarity index 100% rename from protocol/transport/waku/migrations/sqlite/1561059284_add_waku_keys.down.sql rename to protocol/transport/migrations/sqlite/1561059284_add_waku_keys.down.sql diff --git a/protocol/transport/waku/migrations/sqlite/1561059284_add_waku_keys.up.sql b/protocol/transport/migrations/sqlite/1561059284_add_waku_keys.up.sql similarity index 100% rename from protocol/transport/waku/migrations/sqlite/1561059284_add_waku_keys.up.sql rename to protocol/transport/migrations/sqlite/1561059284_add_waku_keys.up.sql diff --git a/protocol/transport/waku/migrations/sqlite/doc.go b/protocol/transport/migrations/sqlite/doc.go similarity index 100% rename from protocol/transport/waku/migrations/sqlite/doc.go rename to protocol/transport/migrations/sqlite/doc.go diff --git a/protocol/transport/waku/persistence.go b/protocol/transport/persistence.go similarity index 98% rename from protocol/transport/waku/persistence.go rename to protocol/transport/persistence.go index 0e0d865b9..8721a0043 100644 --- a/protocol/transport/waku/persistence.go +++ b/protocol/transport/persistence.go @@ -1,4 +1,4 @@ -package waku +package transport import ( "database/sql" diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 6df05fa56..110ffe567 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -1,66 +1,552 @@ package transport import ( + "bytes" "context" "crypto/ecdsa" + "database/sql" + "encoding/hex" + "math/big" + "sync" + "time" + + "github.com/google/uuid" + "github.com/pkg/errors" + "go.uber.org/zap" "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" ) -type Transport interface { - Stop() error +var ( + // ErrNoMailservers returned if there is no configured mailservers that can be used. + ErrNoMailservers = errors.New("no configured mailservers") +) - JoinPrivate(publicKey *ecdsa.PublicKey) (*Filter, error) - LeavePrivate(publicKey *ecdsa.PublicKey) error - JoinGroup(publicKeys []*ecdsa.PublicKey) ([]*Filter, error) - LeaveGroup(publicKeys []*ecdsa.PublicKey) error - JoinPublic(chatID string) (*Filter, error) - LeavePublic(chatID string) error - GetCurrentTime() uint64 - MaxMessageSize() uint32 +type transportKeysManager struct { + waku types.Waku - SendPublic(ctx context.Context, newMessage *types.NewMessage, chatName string) ([]byte, error) - SendPrivateWithSharedSecret(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey, secret []byte) ([]byte, error) - SendPrivateWithPartitioned(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) - SendPrivateOnPersonalTopic(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) - SendCommunityMessage(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) - SendMessagesRequest( - ctx context.Context, - peerID []byte, - from, to uint32, - previousCursor []byte, - waitForResponse bool, - ) (cursor []byte, err error) + // Identity of the current user. + privateKey *ecdsa.PrivateKey - SendMessagesRequestForFilter( - ctx context.Context, - peerID []byte, - from, to uint32, - previousCursor []byte, - filter *Filter, - waitForResponse bool, - ) (cursor []byte, err error) - FilterByChatID(string) *Filter + passToSymKeyMutex sync.RWMutex + passToSymKeyCache map[string]string +} - Track(identifiers [][]byte, hash []byte, newMessage *types.NewMessage) +func (m *transportKeysManager) AddOrGetKeyPair(priv *ecdsa.PrivateKey) (string, error) { + // caching is handled in waku + return m.waku.AddKeyPair(priv) +} - InitFilters(chatIDs []string, publicKeys []*ecdsa.PublicKey) ([]*Filter, error) - InitPublicFilters(chatIDs []string) ([]*Filter, error) - InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*Filter, error) - LoadFilters(filters []*Filter) ([]*Filter, error) - RemoveFilters(filters []*Filter) error - RemoveFilterByChatID(string) (*Filter, error) - ResetFilters() error - Filters() []*Filter - LoadKeyFilters(*ecdsa.PrivateKey) (*Filter, error) - ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Filter, error) - RetrieveRawAll() (map[Filter][]*types.Message, error) +func (m *transportKeysManager) AddOrGetSymKeyFromPassword(password string) (string, error) { + m.passToSymKeyMutex.Lock() + defer m.passToSymKeyMutex.Unlock() - ConfirmMessagesProcessed([]string, uint64) error - CleanMessagesProcessed(uint64) error + if val, ok := m.passToSymKeyCache[password]; ok { + return val, nil + } - SetEnvelopeEventsHandler(handler EnvelopeEventsHandler) error + id, err := m.waku.AddSymKeyFromPassword(password) + if err != nil { + return id, err + } + + m.passToSymKeyCache[password] = id + + return id, nil +} + +func (m *transportKeysManager) RawSymKey(id string) ([]byte, error) { + return m.waku.GetSymKey(id) +} + +type Option func(*Transport) error + +// Transport is a transport based on Whisper service. +type Transport struct { + waku types.Waku + api types.PublicWakuAPI // only PublicWakuAPI implements logic to send messages + keysManager *transportKeysManager + filters *FiltersManager + logger *zap.Logger + cache *ProcessedMessageIDsCache + + mailservers []string + envelopesMonitor *EnvelopesMonitor + quit chan struct{} +} + +// NewTransport returns a new Transport. +// TODO: leaving a chat should verify that for a given public key +// there are no other chats. It may happen that we leave a private chat +// but still have a public chat for a given public key. +func NewTransport( + waku types.Waku, + privateKey *ecdsa.PrivateKey, + db *sql.DB, + mailservers []string, + envelopesMonitorConfig *EnvelopesMonitorConfig, + logger *zap.Logger, + opts ...Option, +) (*Transport, error) { + filtersManager, err := NewFiltersManager(newSQLitePersistence(db), waku, privateKey, logger) + if err != nil { + return nil, err + } + + var envelopesMonitor *EnvelopesMonitor + if envelopesMonitorConfig != nil { + envelopesMonitor = NewEnvelopesMonitor(waku, *envelopesMonitorConfig) + envelopesMonitor.Start() + } + + var api types.PublicWhisperAPI + if waku != nil { + api = waku.PublicWakuAPI() + } + t := &Transport{ + waku: waku, + api: api, + cache: NewProcessedMessageIDsCache(db), + envelopesMonitor: envelopesMonitor, + quit: make(chan struct{}), + keysManager: &transportKeysManager{ + waku: waku, + privateKey: privateKey, + passToSymKeyCache: make(map[string]string), + }, + filters: filtersManager, + mailservers: mailservers, + logger: logger.With(zap.Namespace("Transport")), + } + + for _, opt := range opts { + if err := opt(t); err != nil { + return nil, err + } + } + + t.cleanFiltersLoop() + + return t, nil +} + +func (t *Transport) InitFilters(chatIDs []string, publicKeys []*ecdsa.PublicKey) ([]*Filter, error) { + return t.filters.Init(chatIDs, publicKeys) +} + +func (t *Transport) InitPublicFilters(chatIDs []string) ([]*Filter, error) { + return t.filters.InitPublicFilters(chatIDs) +} + +func (t *Transport) Filters() []*Filter { + return t.filters.Filters() +} + +func (t *Transport) FilterByChatID(chatID string) *Filter { + return t.filters.FilterByChatID(chatID) +} + +func (t *Transport) LoadFilters(filters []*Filter) ([]*Filter, error) { + return t.filters.InitWithFilters(filters) +} + +func (t *Transport) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*Filter, error) { + return t.filters.InitCommunityFilters(pks) +} + +func (t *Transport) RemoveFilters(filters []*Filter) error { + return t.filters.Remove(filters...) +} + +func (t *Transport) RemoveFilterByChatID(chatID string) (*Filter, error) { + return t.filters.RemoveFilterByChatID(chatID) +} + +func (t *Transport) ResetFilters() error { + return t.filters.Reset() +} + +func (t *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Filter, error) { + filter, err := t.filters.LoadNegotiated(secret) + if err != nil { + return nil, err + } + return filter, nil +} + +func (t *Transport) JoinPublic(chatID string) (*Filter, error) { + return t.filters.LoadPublic(chatID) +} + +func (t *Transport) LeavePublic(chatID string) error { + chat := t.filters.Filter(chatID) + if chat != nil { + return nil + } + return t.filters.Remove(chat) +} + +func (t *Transport) JoinPrivate(publicKey *ecdsa.PublicKey) (*Filter, error) { + return t.filters.LoadContactCode(publicKey) +} + +func (t *Transport) LeavePrivate(publicKey *ecdsa.PublicKey) error { + filters := t.filters.FiltersByPublicKey(publicKey) + return t.filters.Remove(filters...) +} + +func (t *Transport) JoinGroup(publicKeys []*ecdsa.PublicKey) ([]*Filter, error) { + var filters []*Filter + for _, pk := range publicKeys { + f, err := t.filters.LoadContactCode(pk) + if err != nil { + return nil, err + } + filters = append(filters, f) + + } + return filters, nil +} + +func (t *Transport) LeaveGroup(publicKeys []*ecdsa.PublicKey) error { + for _, publicKey := range publicKeys { + filters := t.filters.FiltersByPublicKey(publicKey) + if err := t.filters.Remove(filters...); err != nil { + return err + } + } + return nil +} + +func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) { + result := make(map[Filter][]*types.Message) + + allFilters := t.filters.Filters() + for _, filter := range allFilters { + // Don't pull from filters we don't listen to + if !filter.Listen { + continue + } + msgs, err := t.api.GetFilterMessages(filter.FilterID) + if err != nil { + t.logger.Warn("failed to fetch messages", zap.Error(err)) + continue + } + if len(msgs) == 0 { + continue + } + + ids := make([]string, len(msgs)) + for i := range msgs { + id := types.EncodeHex(msgs[i].Hash) + ids[i] = id + } + + hits, err := t.cache.Hits(ids) + if err != nil { + t.logger.Error("failed to check messages exists", zap.Error(err)) + return nil, err + } + + for i := range msgs { + // Exclude anything that is a cache hit + if !hits[types.EncodeHex(msgs[i].Hash)] { + result[*filter] = append(result[*filter], msgs[i]) + } + } + + } + + return result, nil +} + +// SendPublic sends a new message using the Whisper service. +// For public filters, chat name is used as an ID as well as +// a topic. +func (t *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage, chatName string) ([]byte, error) { + if err := t.addSig(newMessage); err != nil { + return nil, err + } + + filter, err := t.filters.LoadPublic(chatName) + if err != nil { + return nil, err + } + + newMessage.SymKeyID = filter.SymKeyID + newMessage.Topic = filter.Topic + + return t.api.Post(ctx, *newMessage) +} + +func (t *Transport) SendPrivateWithSharedSecret(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey, secret []byte) ([]byte, error) { + if err := t.addSig(newMessage); err != nil { + return nil, err + } + + filter, err := t.filters.LoadNegotiated(types.NegotiatedSecret{ + PublicKey: publicKey, + Key: secret, + }) + if err != nil { + return nil, err + } + + newMessage.SymKeyID = filter.SymKeyID + newMessage.Topic = filter.Topic + newMessage.PublicKey = nil + + return t.api.Post(ctx, *newMessage) +} + +func (t *Transport) SendPrivateWithPartitioned(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) { + if err := t.addSig(newMessage); err != nil { + return nil, err + } + + filter, err := t.filters.LoadPartitioned(publicKey, t.keysManager.privateKey, false) + if err != nil { + return nil, err + } + + newMessage.Topic = filter.Topic + newMessage.PublicKey = crypto.FromECDSAPub(publicKey) + + return t.api.Post(ctx, *newMessage) +} + +func (t *Transport) SendPrivateOnPersonalTopic(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) { + if err := t.addSig(newMessage); err != nil { + return nil, err + } + + filter, err := t.filters.LoadPersonal(publicKey, t.keysManager.privateKey, false) + if err != nil { + return nil, err + } + + newMessage.Topic = filter.Topic + newMessage.PublicKey = crypto.FromECDSAPub(publicKey) + + return t.api.Post(ctx, *newMessage) +} + +func (t *Transport) LoadKeyFilters(key *ecdsa.PrivateKey) (*Filter, error) { + return t.filters.LoadEphemeral(&key.PublicKey, key, true) +} + +func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) { + if err := t.addSig(newMessage); err != nil { + return nil, err + } + + // We load the filter to make sure we can post on it + filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:]) + if err != nil { + return nil, err + } + + newMessage.Topic = filter.Topic + newMessage.PublicKey = crypto.FromECDSAPub(publicKey) + + t.logger.Debug("SENDING message", zap.Binary("topic", filter.Topic[:])) + + return t.api.Post(ctx, *newMessage) +} + +func (t *Transport) cleanFilters() error { + return t.filters.RemoveNoListenFilters() +} + +func (t *Transport) addSig(newMessage *types.NewMessage) error { + sigID, err := t.keysManager.AddOrGetKeyPair(t.keysManager.privateKey) + if err != nil { + return err + } + newMessage.SigID = sigID + return nil +} + +func (t *Transport) Track(identifiers [][]byte, hash []byte, newMessage *types.NewMessage) { + if t.envelopesMonitor != nil { + t.envelopesMonitor.Add(identifiers, types.BytesToHash(hash), *newMessage) + } +} + +// GetCurrentTime returns the current unix timestamp in milliseconds +func (t *Transport) GetCurrentTime() uint64 { + return uint64(t.waku.GetCurrentTime().UnixNano() / int64(time.Millisecond)) +} + +func (t *Transport) MaxMessageSize() uint32 { + return t.waku.MaxMessageSize() +} + +func (t *Transport) Stop() error { + close(t.quit) + if t.envelopesMonitor != nil { + t.envelopesMonitor.Stop() + } + return nil +} + +// cleanFiltersLoop cleans up the topic we create for the only purpose +// of sending messages. +// Whenever we send a message we also need to listen to that particular topic +// but in case of asymettric topics, we are not interested in listening to them. +// We therefore periodically clean them up so we don't receive unnecessary data. + +func (t *Transport) cleanFiltersLoop() { + + ticker := time.NewTicker(5 * time.Minute) + go func() { + for { + select { + case <-t.quit: + ticker.Stop() + return + case <-ticker.C: + err := t.cleanFilters() + if err != nil { + t.logger.Error("failed to clean up topics", zap.Error(err)) + } + } + } + }() +} + +func (t *Transport) sendMessagesRequestForTopics( + ctx context.Context, + peerID []byte, + from, to uint32, + previousCursor []byte, + topics []types.TopicType, + waitForResponse bool, +) (cursor []byte, err error) { + + r := createMessagesRequest(from, to, previousCursor, topics) + r.SetDefaults(t.waku.GetCurrentTime()) + + events := make(chan types.EnvelopeEvent, 10) + sub := t.waku.SubscribeEnvelopeEvents(events) + defer sub.Unsubscribe() + + err = t.waku.SendMessagesRequest(peerID, r) + if err != nil { + return + } + + if !waitForResponse { + return + } + + resp, err := t.waitForRequestCompleted(ctx, r.ID, events) + if err == nil && resp != nil && resp.Error != nil { + err = resp.Error + } else if err == nil && resp != nil { + cursor = resp.Cursor + } + return +} + +// RequestHistoricMessages requests historic messages for all registered filters. +func (t *Transport) SendMessagesRequest( + ctx context.Context, + peerID []byte, + from, to uint32, + previousCursor []byte, + waitForResponse bool, +) (cursor []byte, err error) { + + topics := make([]types.TopicType, len(t.Filters())) + for _, f := range t.Filters() { + topics = append(topics, f.Topic) + } + + return t.sendMessagesRequestForTopics(ctx, peerID, from, to, previousCursor, topics, waitForResponse) +} + +func (t *Transport) SendMessagesRequestForFilter( + ctx context.Context, + peerID []byte, + from, to uint32, + previousCursor []byte, + filter *Filter, + waitForResponse bool, +) (cursor []byte, err error) { + + topics := make([]types.TopicType, len(t.Filters())) + topics = append(topics, filter.Topic) + + return t.sendMessagesRequestForTopics(ctx, peerID, from, to, previousCursor, topics, waitForResponse) +} + +func createMessagesRequest(from, to uint32, cursor []byte, topics []types.TopicType) types.MessagesRequest { + aUUID := uuid.New() + // uuid is 16 bytes, converted to hex it's 32 bytes as expected by types.MessagesRequest + id := []byte(hex.EncodeToString(aUUID[:])) + return types.MessagesRequest{ + ID: id, + From: from, + To: to, + Limit: 100, + Cursor: cursor, + Bloom: topicsToBloom(topics...), + } +} + +func topicsToBloom(topics ...types.TopicType) []byte { + i := new(big.Int) + for _, topic := range topics { + bloom := types.TopicToBloom(topic) + i.Or(i, new(big.Int).SetBytes(bloom[:])) + } + + combined := make([]byte, types.BloomFilterSize) + data := i.Bytes() + copy(combined[types.BloomFilterSize-len(data):], data[:]) + + return combined +} + +func (t *Transport) waitForRequestCompleted(ctx context.Context, requestID []byte, events chan types.EnvelopeEvent) (*types.MailServerResponse, error) { + for { + select { + case ev := <-events: + if !bytes.Equal(ev.Hash.Bytes(), requestID) { + continue + } + if ev.Event != types.EventMailServerRequestCompleted { + continue + } + data, ok := ev.Data.(*types.MailServerResponse) + if ok { + return data, nil + } + case <-ctx.Done(): + return nil, ctx.Err() + } + } +} + +// ConfirmMessagesProcessed marks the messages as processed in the cache so +// they won't be passed to the next layer anymore +func (t *Transport) ConfirmMessagesProcessed(ids []string, timestamp uint64) error { + return t.cache.Add(ids, timestamp) +} + +// CleanMessagesProcessed clears the messages that are older than timestamp +func (t *Transport) CleanMessagesProcessed(timestamp uint64) error { + return t.cache.Clean(timestamp) +} + +func (t *Transport) SetEnvelopeEventsHandler(handler EnvelopeEventsHandler) error { + if t.envelopesMonitor == nil { + return errors.New("Current transport has no envelopes monitor") + } + t.envelopesMonitor.handler = handler + return nil } func PubkeyToHex(key *ecdsa.PublicKey) string { diff --git a/protocol/transport/waku/waku_service_test.go b/protocol/transport/transport_test.go similarity index 89% rename from protocol/transport/waku/waku_service_test.go rename to protocol/transport/transport_test.go index d11121998..5a1da2449 100644 --- a/protocol/transport/waku/waku_service_test.go +++ b/protocol/transport/transport_test.go @@ -1,4 +1,4 @@ -package waku +package transport import ( "io/ioutil" @@ -12,7 +12,7 @@ import ( "github.com/status-im/status-go/protocol/tt" ) -func TestNewWakuServiceTransport(t *testing.T) { +func TestNewTransport(t *testing.T) { dbPath, err := ioutil.TempFile("", "transport.sql") require.NoError(t, err) defer os.Remove(dbPath.Name()) diff --git a/protocol/transport/waku/envelopes.go b/protocol/transport/waku/envelopes.go deleted file mode 100644 index 0d991475f..000000000 --- a/protocol/transport/waku/envelopes.go +++ /dev/null @@ -1,310 +0,0 @@ -package waku - -import ( - "context" - "errors" - "sync" - - "go.uber.org/zap" - - "github.com/status-im/status-go/eth-node/types" - "github.com/status-im/status-go/protocol/transport" -) - -// EnvelopeState in local tracker -type EnvelopeState int - -const ( - // NotRegistered returned if asked hash wasn't registered in the tracker. - NotRegistered EnvelopeState = -1 - // EnvelopePosted is set when envelope was added to a local waku queue. - EnvelopePosted EnvelopeState = iota - // EnvelopeSent is set when envelope is sent to at least one peer. - EnvelopeSent -) - -// EnvelopeEventsHandler used for two different event types. -type EnvelopeEventsHandler interface { - EnvelopeSent([][]byte) - EnvelopeExpired([][]byte, error) - MailServerRequestCompleted(types.Hash, types.Hash, []byte, error) - MailServerRequestExpired(types.Hash) -} - -// NewEnvelopesMonitor returns a pointer to an instance of the EnvelopesMonitor. -func NewEnvelopesMonitor(w types.Waku, config transport.EnvelopesMonitorConfig) *EnvelopesMonitor { - logger := config.Logger - - if logger == nil { - logger = zap.NewNop() - } - - var api types.PublicWakuAPI - if w != nil { - api = w.PublicWakuAPI() - } - - return &EnvelopesMonitor{ - w: w, - api: api, - handler: config.EnvelopeEventsHandler, - mailServerConfirmation: config.MailserverConfirmationsEnabled, - maxAttempts: config.MaxAttempts, - isMailserver: config.IsMailserver, - logger: logger.With(zap.Namespace("EnvelopesMonitor")), - - // key is envelope hash (event.Hash) - envelopes: map[types.Hash]EnvelopeState{}, - messages: map[types.Hash]*types.NewMessage{}, - attempts: map[types.Hash]int{}, - identifiers: make(map[types.Hash][][]byte), - - // key is hash of the batch (event.Batch) - batches: map[types.Hash]map[types.Hash]struct{}{}, - } -} - -// EnvelopesMonitor is responsible for monitoring waku envelopes state. -type EnvelopesMonitor struct { - w types.Waku - api types.PublicWakuAPI - handler EnvelopeEventsHandler - mailServerConfirmation bool - maxAttempts int - - mu sync.Mutex - envelopes map[types.Hash]EnvelopeState - batches map[types.Hash]map[types.Hash]struct{} - - messages map[types.Hash]*types.NewMessage - attempts map[types.Hash]int - identifiers map[types.Hash][][]byte - - wg sync.WaitGroup - quit chan struct{} - isMailserver func(peer types.EnodeID) bool - - logger *zap.Logger -} - -// Start processing events. -func (m *EnvelopesMonitor) Start() { - m.quit = make(chan struct{}) - m.wg.Add(1) - go func() { - m.handleEnvelopeEvents() - m.wg.Done() - }() -} - -// Stop process events. -func (m *EnvelopesMonitor) Stop() { - close(m.quit) - m.wg.Wait() -} - -// Add hash to a tracker. -func (m *EnvelopesMonitor) Add(identifiers [][]byte, envelopeHash types.Hash, message types.NewMessage) { - m.mu.Lock() - defer m.mu.Unlock() - m.envelopes[envelopeHash] = EnvelopePosted - m.identifiers[envelopeHash] = identifiers - m.messages[envelopeHash] = &message - m.attempts[envelopeHash] = 1 -} - -func (m *EnvelopesMonitor) GetState(hash types.Hash) EnvelopeState { - m.mu.Lock() - defer m.mu.Unlock() - state, exist := m.envelopes[hash] - if !exist { - return NotRegistered - } - return state -} - -// handleEnvelopeEvents processes waku envelope events -func (m *EnvelopesMonitor) handleEnvelopeEvents() { - events := make(chan types.EnvelopeEvent, 100) // must be buffered to prevent blocking waku - sub := m.w.SubscribeEnvelopeEvents(events) - defer func() { - close(events) - sub.Unsubscribe() - }() - for { - select { - case <-m.quit: - return - case event := <-events: - m.handleEvent(event) - } - } -} - -// handleEvent based on type of the event either triggers -// confirmation handler or removes hash from tracker -func (m *EnvelopesMonitor) handleEvent(event types.EnvelopeEvent) { - handlers := map[types.EventType]func(types.EnvelopeEvent){ - types.EventEnvelopeSent: m.handleEventEnvelopeSent, - types.EventEnvelopeExpired: m.handleEventEnvelopeExpired, - types.EventBatchAcknowledged: m.handleAcknowledgedBatch, - types.EventEnvelopeReceived: m.handleEventEnvelopeReceived, - } - if handler, ok := handlers[event.Event]; ok { - handler(event) - } -} - -func (m *EnvelopesMonitor) handleEventEnvelopeSent(event types.EnvelopeEvent) { - if m.mailServerConfirmation { - if !m.isMailserver(event.Peer) { - return - } - } - - m.mu.Lock() - defer m.mu.Unlock() - - state, ok := m.envelopes[event.Hash] - // if we didn't send a message using extension - skip it - // if message was already confirmed - skip it - if !ok || state == EnvelopeSent { - return - } - m.logger.Debug("envelope is sent", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String())) - if event.Batch != (types.Hash{}) { - if _, ok := m.batches[event.Batch]; !ok { - m.batches[event.Batch] = map[types.Hash]struct{}{} - } - m.batches[event.Batch][event.Hash] = struct{}{} - m.logger.Debug("waiting for a confirmation", zap.String("batch", event.Batch.String())) - } else { - m.envelopes[event.Hash] = EnvelopeSent - if m.handler != nil { - m.handler.EnvelopeSent(m.identifiers[event.Hash]) - } - } -} - -func (m *EnvelopesMonitor) handleAcknowledgedBatch(event types.EnvelopeEvent) { - if m.mailServerConfirmation { - if !m.isMailserver(event.Peer) { - return - } - } - - m.mu.Lock() - defer m.mu.Unlock() - - envelopes, ok := m.batches[event.Batch] - if !ok { - m.logger.Debug("batch is not found", zap.String("batch", event.Batch.String())) - } - m.logger.Debug("received a confirmation", zap.String("batch", event.Batch.String()), zap.String("peer", event.Peer.String())) - envelopeErrors, ok := event.Data.([]types.EnvelopeError) - if event.Data != nil && !ok { - m.logger.Error("received unexpected data in the the confirmation event", zap.Any("data", event.Data)) - } - failedEnvelopes := map[types.Hash]struct{}{} - for i := range envelopeErrors { - envelopeError := envelopeErrors[i] - _, exist := m.envelopes[envelopeError.Hash] - if exist { - m.logger.Warn("envelope that was posted by us is discarded", zap.String("hash", envelopeError.Hash.String()), zap.String("peer", event.Peer.String()), zap.String("error", envelopeError.Description)) - var err error - switch envelopeError.Code { - case types.EnvelopeTimeNotSynced: - err = errors.New("envelope wasn't delivered due to time sync issues") - } - m.handleEnvelopeFailure(envelopeError.Hash, err) - } - failedEnvelopes[envelopeError.Hash] = struct{}{} - } - - for hash := range envelopes { - if _, exist := failedEnvelopes[hash]; exist { - continue - } - state, ok := m.envelopes[hash] - if !ok || state == EnvelopeSent { - continue - } - m.envelopes[hash] = EnvelopeSent - if m.handler != nil { - m.handler.EnvelopeSent(m.identifiers[hash]) - } - } - delete(m.batches, event.Batch) -} - -func (m *EnvelopesMonitor) handleEventEnvelopeExpired(event types.EnvelopeEvent) { - m.mu.Lock() - defer m.mu.Unlock() - m.handleEnvelopeFailure(event.Hash, errors.New("envelope expired due to connectivity issues")) -} - -// handleEnvelopeFailure is a common code path for processing envelopes failures. not thread safe, lock -// must be used on a higher level. -func (m *EnvelopesMonitor) handleEnvelopeFailure(hash types.Hash, err error) { - if state, ok := m.envelopes[hash]; ok { - message, exist := m.messages[hash] - if !exist { - m.logger.Error("message was deleted erroneously", zap.String("envelope hash", hash.String())) - } - attempt := m.attempts[hash] - identifiers := m.identifiers[hash] - m.clearMessageState(hash) - if state == EnvelopeSent { - return - } - if attempt < m.maxAttempts { - m.logger.Debug("retrying to send a message", zap.String("hash", hash.String()), zap.Int("attempt", attempt+1)) - hex, err := m.api.Post(context.TODO(), *message) - if err != nil { - m.logger.Error("failed to retry sending message", zap.String("hash", hash.String()), zap.Int("attempt", attempt+1), zap.Error(err)) - if m.handler != nil { - m.handler.EnvelopeExpired(identifiers, err) - } - - } - envelopeID := types.BytesToHash(hex) - m.envelopes[envelopeID] = EnvelopePosted - m.messages[envelopeID] = message - m.attempts[envelopeID] = attempt + 1 - m.identifiers[envelopeID] = identifiers - } else { - m.logger.Debug("envelope expired", zap.String("hash", hash.String())) - if m.handler != nil { - m.handler.EnvelopeExpired(identifiers, err) - } - } - } -} - -func (m *EnvelopesMonitor) handleEventEnvelopeReceived(event types.EnvelopeEvent) { - if m.mailServerConfirmation { - if !m.isMailserver(event.Peer) { - return - } - } - m.mu.Lock() - defer m.mu.Unlock() - state, ok := m.envelopes[event.Hash] - if !ok || state != EnvelopePosted { - return - } - m.logger.Debug("expected envelope received", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String())) - m.envelopes[event.Hash] = EnvelopeSent - if m.handler != nil { - m.handler.EnvelopeSent(m.identifiers[event.Hash]) - } -} - -// clearMessageState removes all message and envelope state. -// not thread-safe, should be protected on a higher level. -func (m *EnvelopesMonitor) clearMessageState(envelopeID types.Hash) { - delete(m.envelopes, envelopeID) - delete(m.messages, envelopeID) - delete(m.attempts, envelopeID) - delete(m.identifiers, envelopeID) -} diff --git a/protocol/transport/waku/mailserver.go b/protocol/transport/waku/mailserver.go deleted file mode 100644 index b4419d10f..000000000 --- a/protocol/transport/waku/mailserver.go +++ /dev/null @@ -1,38 +0,0 @@ -package waku - -import ( - "encoding/hex" - "math/big" - - "github.com/google/uuid" - - "github.com/status-im/status-go/eth-node/types" -) - -func createMessagesRequest(from, to uint32, cursor []byte, topics []types.TopicType) types.MessagesRequest { - aUUID := uuid.New() - // uuid is 16 bytes, converted to hex it's 32 bytes as expected by types.MessagesRequest - id := []byte(hex.EncodeToString(aUUID[:])) - return types.MessagesRequest{ - ID: id, - From: from, - To: to, - Limit: 100, - Cursor: cursor, - Bloom: topicsToBloom(topics...), - } -} - -func topicsToBloom(topics ...types.TopicType) []byte { - i := new(big.Int) - for _, topic := range topics { - bloom := types.TopicToBloom(topic) - i.Or(i, new(big.Int).SetBytes(bloom[:])) - } - - combined := make([]byte, types.BloomFilterSize) - data := i.Bytes() - copy(combined[types.BloomFilterSize-len(data):], data[:]) - - return combined -} diff --git a/protocol/transport/waku/waku.go b/protocol/transport/waku/waku.go deleted file mode 100644 index 3a0eaa4ea..000000000 --- a/protocol/transport/waku/waku.go +++ /dev/null @@ -1,13 +0,0 @@ -package waku - -import ( - "github.com/status-im/status-go/eth-node/types" -) - -type RequestOptions struct { - Topics []types.TopicType - Password string - Limit int - From int64 // in seconds - To int64 // in seconds -} diff --git a/protocol/transport/waku/waku_service.go b/protocol/transport/waku/waku_service.go deleted file mode 100644 index 4aece734b..000000000 --- a/protocol/transport/waku/waku_service.go +++ /dev/null @@ -1,520 +0,0 @@ -package waku - -import ( - "bytes" - "context" - "crypto/ecdsa" - "database/sql" - "sync" - "time" - - "github.com/pkg/errors" - "go.uber.org/zap" - - "github.com/status-im/status-go/eth-node/crypto" - "github.com/status-im/status-go/eth-node/types" - "github.com/status-im/status-go/protocol/transport" -) - -var ( - // ErrNoMailservers returned if there is no configured mailservers that can be used. - ErrNoMailservers = errors.New("no configured mailservers") -) - -type wakuServiceKeysManager struct { - waku types.Waku - - // Identity of the current user. - privateKey *ecdsa.PrivateKey - - passToSymKeyMutex sync.RWMutex - passToSymKeyCache map[string]string -} - -func (m *wakuServiceKeysManager) AddOrGetKeyPair(priv *ecdsa.PrivateKey) (string, error) { - // caching is handled in waku - return m.waku.AddKeyPair(priv) -} - -func (m *wakuServiceKeysManager) AddOrGetSymKeyFromPassword(password string) (string, error) { - m.passToSymKeyMutex.Lock() - defer m.passToSymKeyMutex.Unlock() - - if val, ok := m.passToSymKeyCache[password]; ok { - return val, nil - } - - id, err := m.waku.AddSymKeyFromPassword(password) - if err != nil { - return id, err - } - - m.passToSymKeyCache[password] = id - - return id, nil -} - -func (m *wakuServiceKeysManager) RawSymKey(id string) ([]byte, error) { - return m.waku.GetSymKey(id) -} - -type Option func(*Transport) error - -// Transport is a transport based on Whisper service. -type Transport struct { - waku types.Waku - api types.PublicWakuAPI // only PublicWakuAPI implements logic to send messages - keysManager *wakuServiceKeysManager - filters *transport.FiltersManager - logger *zap.Logger - cache *transport.ProcessedMessageIDsCache - - mailservers []string - envelopesMonitor *EnvelopesMonitor - quit chan struct{} -} - -// NewTransport returns a new Transport. -// TODO: leaving a chat should verify that for a given public key -// there are no other chats. It may happen that we leave a private chat -// but still have a public chat for a given public key. -func NewTransport( - waku types.Waku, - privateKey *ecdsa.PrivateKey, - db *sql.DB, - mailservers []string, - envelopesMonitorConfig *transport.EnvelopesMonitorConfig, - logger *zap.Logger, - opts ...Option, -) (*Transport, error) { - filtersManager, err := transport.NewFiltersManager(newSQLitePersistence(db), waku, privateKey, logger) - if err != nil { - return nil, err - } - - var envelopesMonitor *EnvelopesMonitor - if envelopesMonitorConfig != nil { - envelopesMonitor = NewEnvelopesMonitor(waku, *envelopesMonitorConfig) - envelopesMonitor.Start() - } - - var api types.PublicWhisperAPI - if waku != nil { - api = waku.PublicWakuAPI() - } - t := &Transport{ - waku: waku, - api: api, - cache: transport.NewProcessedMessageIDsCache(db), - envelopesMonitor: envelopesMonitor, - quit: make(chan struct{}), - keysManager: &wakuServiceKeysManager{ - waku: waku, - privateKey: privateKey, - passToSymKeyCache: make(map[string]string), - }, - filters: filtersManager, - mailservers: mailservers, - logger: logger.With(zap.Namespace("Transport")), - } - - for _, opt := range opts { - if err := opt(t); err != nil { - return nil, err - } - } - - t.cleanFiltersLoop() - - return t, nil -} - -func (a *Transport) InitFilters(chatIDs []string, publicKeys []*ecdsa.PublicKey) ([]*transport.Filter, error) { - return a.filters.Init(chatIDs, publicKeys) -} - -func (a *Transport) InitPublicFilters(chatIDs []string) ([]*transport.Filter, error) { - return a.filters.InitPublicFilters(chatIDs) -} - -func (a *Transport) Filters() []*transport.Filter { - return a.filters.Filters() -} - -func (a *Transport) FilterByChatID(chatID string) *transport.Filter { - return a.filters.FilterByChatID(chatID) -} - -func (a *Transport) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) { - return a.filters.InitWithFilters(filters) -} - -func (a *Transport) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*transport.Filter, error) { - return a.filters.InitCommunityFilters(pks) -} - -func (a *Transport) RemoveFilters(filters []*transport.Filter) error { - return a.filters.Remove(filters...) -} - -func (a *Transport) RemoveFilterByChatID(chatID string) (*transport.Filter, error) { - return a.filters.RemoveFilterByChatID(chatID) -} - -func (a *Transport) ResetFilters() error { - return a.filters.Reset() -} - -func (a *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*transport.Filter, error) { - filter, err := a.filters.LoadNegotiated(secret) - if err != nil { - return nil, err - } - return filter, nil -} - -func (a *Transport) JoinPublic(chatID string) (*transport.Filter, error) { - return a.filters.LoadPublic(chatID) -} - -func (a *Transport) LeavePublic(chatID string) error { - chat := a.filters.Filter(chatID) - if chat != nil { - return nil - } - return a.filters.Remove(chat) -} - -func (a *Transport) JoinPrivate(publicKey *ecdsa.PublicKey) (*transport.Filter, error) { - return a.filters.LoadContactCode(publicKey) -} - -func (a *Transport) LeavePrivate(publicKey *ecdsa.PublicKey) error { - filters := a.filters.FiltersByPublicKey(publicKey) - return a.filters.Remove(filters...) -} - -func (a *Transport) JoinGroup(publicKeys []*ecdsa.PublicKey) ([]*transport.Filter, error) { - var filters []*transport.Filter - for _, pk := range publicKeys { - f, err := a.filters.LoadContactCode(pk) - if err != nil { - return nil, err - } - filters = append(filters, f) - - } - return filters, nil -} - -func (a *Transport) LeaveGroup(publicKeys []*ecdsa.PublicKey) error { - for _, publicKey := range publicKeys { - filters := a.filters.FiltersByPublicKey(publicKey) - if err := a.filters.Remove(filters...); err != nil { - return err - } - } - return nil -} - -func (a *Transport) RetrieveRawAll() (map[transport.Filter][]*types.Message, error) { - result := make(map[transport.Filter][]*types.Message) - - allFilters := a.filters.Filters() - for _, filter := range allFilters { - // Don't pull from filters we don't listen to - if !filter.Listen { - continue - } - msgs, err := a.api.GetFilterMessages(filter.FilterID) - if err != nil { - a.logger.Warn("failed to fetch messages", zap.Error(err)) - continue - } - if len(msgs) == 0 { - continue - } - - ids := make([]string, len(msgs)) - for i := range msgs { - id := types.EncodeHex(msgs[i].Hash) - ids[i] = id - } - - hits, err := a.cache.Hits(ids) - if err != nil { - a.logger.Error("failed to check messages exists", zap.Error(err)) - return nil, err - } - - for i := range msgs { - // Exclude anything that is a cache hit - if !hits[types.EncodeHex(msgs[i].Hash)] { - result[*filter] = append(result[*filter], msgs[i]) - } - } - - } - - return result, nil -} - -// SendPublic sends a new message using the Whisper service. -// For public filters, chat name is used as an ID as well as -// a topic. -func (a *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage, chatName string) ([]byte, error) { - if err := a.addSig(newMessage); err != nil { - return nil, err - } - - filter, err := a.filters.LoadPublic(chatName) - if err != nil { - return nil, err - } - - newMessage.SymKeyID = filter.SymKeyID - newMessage.Topic = filter.Topic - - return a.api.Post(ctx, *newMessage) -} - -func (a *Transport) SendPrivateWithSharedSecret(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey, secret []byte) ([]byte, error) { - if err := a.addSig(newMessage); err != nil { - return nil, err - } - - filter, err := a.filters.LoadNegotiated(types.NegotiatedSecret{ - PublicKey: publicKey, - Key: secret, - }) - if err != nil { - return nil, err - } - - newMessage.SymKeyID = filter.SymKeyID - newMessage.Topic = filter.Topic - newMessage.PublicKey = nil - - return a.api.Post(ctx, *newMessage) -} - -func (a *Transport) SendPrivateWithPartitioned(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) { - if err := a.addSig(newMessage); err != nil { - return nil, err - } - - filter, err := a.filters.LoadPartitioned(publicKey, a.keysManager.privateKey, false) - if err != nil { - return nil, err - } - - newMessage.Topic = filter.Topic - newMessage.PublicKey = crypto.FromECDSAPub(publicKey) - - return a.api.Post(ctx, *newMessage) -} - -func (a *Transport) SendPrivateOnPersonalTopic(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) { - if err := a.addSig(newMessage); err != nil { - return nil, err - } - - filter, err := a.filters.LoadPersonal(publicKey, a.keysManager.privateKey, false) - if err != nil { - return nil, err - } - - newMessage.Topic = filter.Topic - newMessage.PublicKey = crypto.FromECDSAPub(publicKey) - - return a.api.Post(ctx, *newMessage) -} - -func (a *Transport) LoadKeyFilters(key *ecdsa.PrivateKey) (*transport.Filter, error) { - return a.filters.LoadEphemeral(&key.PublicKey, key, true) -} - -func (a *Transport) SendCommunityMessage(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) { - if err := a.addSig(newMessage); err != nil { - return nil, err - } - - // We load the filter to make sure we can post on it - filter, err := a.filters.LoadPublic(transport.PubkeyToHex(publicKey)[2:]) - if err != nil { - return nil, err - } - - newMessage.Topic = filter.Topic - newMessage.PublicKey = crypto.FromECDSAPub(publicKey) - - a.logger.Debug("SENDING message", zap.Binary("topic", filter.Topic[:])) - - return a.api.Post(ctx, *newMessage) -} - -func (a *Transport) cleanFilters() error { - return a.filters.RemoveNoListenFilters() -} - -func (a *Transport) addSig(newMessage *types.NewMessage) error { - sigID, err := a.keysManager.AddOrGetKeyPair(a.keysManager.privateKey) - if err != nil { - return err - } - newMessage.SigID = sigID - return nil -} - -func (a *Transport) Track(identifiers [][]byte, hash []byte, newMessage *types.NewMessage) { - if a.envelopesMonitor != nil { - a.envelopesMonitor.Add(identifiers, types.BytesToHash(hash), *newMessage) - } -} - -// GetCurrentTime returns the current unix timestamp in milliseconds -func (a *Transport) GetCurrentTime() uint64 { - return uint64(a.waku.GetCurrentTime().UnixNano() / int64(time.Millisecond)) -} - -func (a *Transport) MaxMessageSize() uint32 { - return a.waku.MaxMessageSize() -} - -func (a *Transport) Stop() error { - close(a.quit) - if a.envelopesMonitor != nil { - a.envelopesMonitor.Stop() - } - return nil -} - -// cleanFiltersLoop cleans up the topic we create for the only purpose -// of sending messages. -// Whenever we send a message we also need to listen to that particular topic -// but in case of asymettric topics, we are not interested in listening to them. -// We therefore periodically clean them up so we don't receive unnecessary data. - -func (a *Transport) cleanFiltersLoop() { - - ticker := time.NewTicker(5 * time.Minute) - go func() { - for { - select { - case <-a.quit: - ticker.Stop() - return - case <-ticker.C: - err := a.cleanFilters() - if err != nil { - a.logger.Error("failed to clean up topics", zap.Error(err)) - } - } - } - }() -} - -func (a *Transport) sendMessagesRequestForTopics( - ctx context.Context, - peerID []byte, - from, to uint32, - previousCursor []byte, - topics []types.TopicType, - waitForResponse bool, -) (cursor []byte, err error) { - - r := createMessagesRequest(from, to, previousCursor, topics) - r.SetDefaults(a.waku.GetCurrentTime()) - - events := make(chan types.EnvelopeEvent, 10) - sub := a.waku.SubscribeEnvelopeEvents(events) - defer sub.Unsubscribe() - - err = a.waku.SendMessagesRequest(peerID, r) - if err != nil { - return - } - - if !waitForResponse { - return - } - - resp, err := a.waitForRequestCompleted(ctx, r.ID, events) - if err == nil && resp != nil && resp.Error != nil { - err = resp.Error - } else if err == nil && resp != nil { - cursor = resp.Cursor - } - return -} - -// RequestHistoricMessages requests historic messages for all registered filters. -func (a *Transport) SendMessagesRequest( - ctx context.Context, - peerID []byte, - from, to uint32, - previousCursor []byte, - waitForResponse bool, -) (cursor []byte, err error) { - - topics := make([]types.TopicType, len(a.Filters())) - for _, f := range a.Filters() { - topics = append(topics, f.Topic) - } - - return a.sendMessagesRequestForTopics(ctx, peerID, from, to, previousCursor, topics, waitForResponse) -} - -func (a *Transport) SendMessagesRequestForFilter( - ctx context.Context, - peerID []byte, - from, to uint32, - previousCursor []byte, - filter *transport.Filter, - waitForResponse bool, -) (cursor []byte, err error) { - - topics := make([]types.TopicType, len(a.Filters())) - topics = append(topics, filter.Topic) - - return a.sendMessagesRequestForTopics(ctx, peerID, from, to, previousCursor, topics, waitForResponse) -} - -func (a *Transport) waitForRequestCompleted(ctx context.Context, requestID []byte, events chan types.EnvelopeEvent) (*types.MailServerResponse, error) { - for { - select { - case ev := <-events: - if !bytes.Equal(ev.Hash.Bytes(), requestID) { - continue - } - if ev.Event != types.EventMailServerRequestCompleted { - continue - } - data, ok := ev.Data.(*types.MailServerResponse) - if ok { - return data, nil - } - case <-ctx.Done(): - return nil, ctx.Err() - } - } -} - -// ConfirmMessagesProcessed marks the messages as processed in the cache so -// they won't be passed to the next layer anymore -func (a *Transport) ConfirmMessagesProcessed(ids []string, timestamp uint64) error { - return a.cache.Add(ids, timestamp) -} - -// CleanMessagesProcessed clears the messages that are older than timestamp -func (a *Transport) CleanMessagesProcessed(timestamp uint64) error { - return a.cache.Clean(timestamp) -} - -func (a *Transport) SetEnvelopeEventsHandler(handler transport.EnvelopeEventsHandler) error { - if a.envelopesMonitor == nil { - return errors.New("Current transport has no envelopes monitor") - } - a.envelopesMonitor.handler = handler - return nil -} diff --git a/protocol/transport/whisper/envelopes.go b/protocol/transport/whisper/envelopes.go deleted file mode 100644 index 1efadd74a..000000000 --- a/protocol/transport/whisper/envelopes.go +++ /dev/null @@ -1,311 +0,0 @@ -package whisper - -import ( - "context" - "errors" - "sync" - - "github.com/status-im/status-go/protocol/transport" - - "go.uber.org/zap" - - "github.com/status-im/status-go/eth-node/types" -) - -// EnvelopeState in local tracker -type EnvelopeState int - -const ( - // NotRegistered returned if asked hash wasn't registered in the tracker. - NotRegistered EnvelopeState = -1 - // EnvelopePosted is set when envelope was added to a local whisper queue. - EnvelopePosted EnvelopeState = iota - // EnvelopeSent is set when envelope is sent to at least one peer. - EnvelopeSent -) - -// EnvelopeEventsHandler used for two different event types. -type EnvelopeEventsHandler interface { - EnvelopeSent([][]byte) - EnvelopeExpired([][]byte, error) - MailServerRequestCompleted(types.Hash, types.Hash, []byte, error) - MailServerRequestExpired(types.Hash) -} - -// NewEnvelopesMonitor returns a pointer to an instance of the EnvelopesMonitor. -func NewEnvelopesMonitor(w types.Whisper, config transport.EnvelopesMonitorConfig) *EnvelopesMonitor { - logger := config.Logger - - if logger == nil { - logger = zap.NewNop() - } - - var whisperAPI types.PublicWhisperAPI - if w != nil { - whisperAPI = w.PublicWhisperAPI() - } - - return &EnvelopesMonitor{ - w: w, - whisperAPI: whisperAPI, - handler: config.EnvelopeEventsHandler, - mailServerConfirmation: config.MailserverConfirmationsEnabled, - maxAttempts: config.MaxAttempts, - isMailserver: config.IsMailserver, - logger: logger.With(zap.Namespace("EnvelopesMonitor")), - - // key is envelope hash (event.Hash) - envelopes: map[types.Hash]EnvelopeState{}, - messages: map[types.Hash]*types.NewMessage{}, - attempts: map[types.Hash]int{}, - identifiers: make(map[types.Hash][][]byte), - - // key is hash of the batch (event.Batch) - batches: map[types.Hash]map[types.Hash]struct{}{}, - } -} - -// EnvelopesMonitor is responsible for monitoring whisper envelopes state. -type EnvelopesMonitor struct { - w types.Whisper - whisperAPI types.PublicWhisperAPI - handler EnvelopeEventsHandler - mailServerConfirmation bool - maxAttempts int - - mu sync.Mutex - envelopes map[types.Hash]EnvelopeState - batches map[types.Hash]map[types.Hash]struct{} - - messages map[types.Hash]*types.NewMessage - attempts map[types.Hash]int - identifiers map[types.Hash][][]byte - - wg sync.WaitGroup - quit chan struct{} - isMailserver func(peer types.EnodeID) bool - - logger *zap.Logger -} - -// Start processing events. -func (m *EnvelopesMonitor) Start() { - m.quit = make(chan struct{}) - m.wg.Add(1) - go func() { - m.handleEnvelopeEvents() - m.wg.Done() - }() -} - -// Stop process events. -func (m *EnvelopesMonitor) Stop() { - close(m.quit) - m.wg.Wait() -} - -// Add hash to a tracker. -func (m *EnvelopesMonitor) Add(identifiers [][]byte, envelopeHash types.Hash, message types.NewMessage) { - m.mu.Lock() - defer m.mu.Unlock() - m.envelopes[envelopeHash] = EnvelopePosted - m.identifiers[envelopeHash] = identifiers - m.messages[envelopeHash] = &message - m.attempts[envelopeHash] = 1 -} - -func (m *EnvelopesMonitor) GetState(hash types.Hash) EnvelopeState { - m.mu.Lock() - defer m.mu.Unlock() - state, exist := m.envelopes[hash] - if !exist { - return NotRegistered - } - return state -} - -// handleEnvelopeEvents processes whisper envelope events -func (m *EnvelopesMonitor) handleEnvelopeEvents() { - events := make(chan types.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper - sub := m.w.SubscribeEnvelopeEvents(events) - defer func() { - close(events) - sub.Unsubscribe() - }() - for { - select { - case <-m.quit: - return - case event := <-events: - m.handleEvent(event) - } - } -} - -// handleEvent based on type of the event either triggers -// confirmation handler or removes hash from tracker -func (m *EnvelopesMonitor) handleEvent(event types.EnvelopeEvent) { - handlers := map[types.EventType]func(types.EnvelopeEvent){ - types.EventEnvelopeSent: m.handleEventEnvelopeSent, - types.EventEnvelopeExpired: m.handleEventEnvelopeExpired, - types.EventBatchAcknowledged: m.handleAcknowledgedBatch, - types.EventEnvelopeReceived: m.handleEventEnvelopeReceived, - } - if handler, ok := handlers[event.Event]; ok { - handler(event) - } -} - -func (m *EnvelopesMonitor) handleEventEnvelopeSent(event types.EnvelopeEvent) { - if m.mailServerConfirmation { - if !m.isMailserver(event.Peer) { - return - } - } - - m.mu.Lock() - defer m.mu.Unlock() - - state, ok := m.envelopes[event.Hash] - // if we didn't send a message using extension - skip it - // if message was already confirmed - skip it - if !ok || state == EnvelopeSent { - return - } - m.logger.Debug("envelope is sent", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String())) - if event.Batch != (types.Hash{}) { - if _, ok := m.batches[event.Batch]; !ok { - m.batches[event.Batch] = map[types.Hash]struct{}{} - } - m.batches[event.Batch][event.Hash] = struct{}{} - m.logger.Debug("waiting for a confirmation", zap.String("batch", event.Batch.String())) - } else { - m.envelopes[event.Hash] = EnvelopeSent - if m.handler != nil { - m.handler.EnvelopeSent(m.identifiers[event.Hash]) - } - } -} - -func (m *EnvelopesMonitor) handleAcknowledgedBatch(event types.EnvelopeEvent) { - if m.mailServerConfirmation { - if !m.isMailserver(event.Peer) { - return - } - } - - m.mu.Lock() - defer m.mu.Unlock() - - envelopes, ok := m.batches[event.Batch] - if !ok { - m.logger.Debug("batch is not found", zap.String("batch", event.Batch.String())) - } - m.logger.Debug("received a confirmation", zap.String("batch", event.Batch.String()), zap.String("peer", event.Peer.String())) - envelopeErrors, ok := event.Data.([]types.EnvelopeError) - if event.Data != nil && !ok { - m.logger.Error("received unexpected data in the the confirmation event", zap.Any("data", event.Data)) - } - failedEnvelopes := map[types.Hash]struct{}{} - for i := range envelopeErrors { - envelopeError := envelopeErrors[i] - _, exist := m.envelopes[envelopeError.Hash] - if exist { - m.logger.Warn("envelope that was posted by us is discarded", zap.String("hash", envelopeError.Hash.String()), zap.String("peer", event.Peer.String()), zap.String("error", envelopeError.Description)) - var err error - switch envelopeError.Code { - case types.EnvelopeTimeNotSynced: - err = errors.New("envelope wasn't delivered due to time sync issues") - } - m.handleEnvelopeFailure(envelopeError.Hash, err) - } - failedEnvelopes[envelopeError.Hash] = struct{}{} - } - - for hash := range envelopes { - if _, exist := failedEnvelopes[hash]; exist { - continue - } - state, ok := m.envelopes[hash] - if !ok || state == EnvelopeSent { - continue - } - m.envelopes[hash] = EnvelopeSent - if m.handler != nil { - m.handler.EnvelopeSent(m.identifiers[hash]) - } - } - delete(m.batches, event.Batch) -} - -func (m *EnvelopesMonitor) handleEventEnvelopeExpired(event types.EnvelopeEvent) { - m.mu.Lock() - defer m.mu.Unlock() - m.handleEnvelopeFailure(event.Hash, errors.New("envelope expired due to connectivity issues")) -} - -// handleEnvelopeFailure is a common code path for processing envelopes failures. not thread safe, lock -// must be used on a higher level. -func (m *EnvelopesMonitor) handleEnvelopeFailure(hash types.Hash, err error) { - if state, ok := m.envelopes[hash]; ok { - message, exist := m.messages[hash] - if !exist { - m.logger.Error("message was deleted erroneously", zap.String("envelope hash", hash.String())) - } - attempt := m.attempts[hash] - identifiers := m.identifiers[hash] - m.clearMessageState(hash) - if state == EnvelopeSent { - return - } - if attempt < m.maxAttempts { - m.logger.Debug("retrying to send a message", zap.String("hash", hash.String()), zap.Int("attempt", attempt+1)) - hex, err := m.whisperAPI.Post(context.TODO(), *message) - if err != nil { - m.logger.Error("failed to retry sending message", zap.String("hash", hash.String()), zap.Int("attempt", attempt+1), zap.Error(err)) - if m.handler != nil { - m.handler.EnvelopeExpired(identifiers, err) - } - - } - envelopeID := types.BytesToHash(hex) - m.envelopes[envelopeID] = EnvelopePosted - m.messages[envelopeID] = message - m.attempts[envelopeID] = attempt + 1 - m.identifiers[envelopeID] = identifiers - } else { - m.logger.Debug("envelope expired", zap.String("hash", hash.String())) - if m.handler != nil { - m.handler.EnvelopeExpired(identifiers, err) - } - } - } -} - -func (m *EnvelopesMonitor) handleEventEnvelopeReceived(event types.EnvelopeEvent) { - if m.mailServerConfirmation { - if !m.isMailserver(event.Peer) { - return - } - } - m.mu.Lock() - defer m.mu.Unlock() - state, ok := m.envelopes[event.Hash] - if !ok || state != EnvelopePosted { - return - } - m.logger.Debug("expected envelope received", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String())) - m.envelopes[event.Hash] = EnvelopeSent - if m.handler != nil { - m.handler.EnvelopeSent(m.identifiers[event.Hash]) - } -} - -// clearMessageState removes all message and envelope state. -// not thread-safe, should be protected on a higher level. -func (m *EnvelopesMonitor) clearMessageState(envelopeID types.Hash) { - delete(m.envelopes, envelopeID) - delete(m.messages, envelopeID) - delete(m.attempts, envelopeID) - delete(m.identifiers, envelopeID) -} diff --git a/protocol/transport/whisper/envelopes_test.go b/protocol/transport/whisper/envelopes_test.go deleted file mode 100644 index aaae6b11e..000000000 --- a/protocol/transport/whisper/envelopes_test.go +++ /dev/null @@ -1,121 +0,0 @@ -package whisper - -import ( - "testing" - - "go.uber.org/zap" - - "github.com/stretchr/testify/suite" - - "github.com/ethereum/go-ethereum/p2p/enode" - - "github.com/status-im/status-go/eth-node/crypto" - "github.com/status-im/status-go/eth-node/types" - "github.com/status-im/status-go/protocol/transport" -) - -var ( - testHash = types.Hash{0x01} - testIDs = [][]byte{[]byte("id")} -) - -type EnvelopesMonitorSuite struct { - suite.Suite - - monitor *EnvelopesMonitor -} - -func TestEnvelopesMonitorSuite(t *testing.T) { - suite.Run(t, new(EnvelopesMonitorSuite)) -} - -func (s *EnvelopesMonitorSuite) SetupTest() { - s.monitor = NewEnvelopesMonitor( - nil, - transport.EnvelopesMonitorConfig{ - EnvelopeEventsHandler: nil, - MaxAttempts: 0, - MailserverConfirmationsEnabled: false, - IsMailserver: func(types.EnodeID) bool { return false }, - Logger: zap.NewNop(), - }, - ) -} - -func (s *EnvelopesMonitorSuite) TestConfirmed() { - s.monitor.Add(testIDs, testHash, types.NewMessage{}) - s.Contains(s.monitor.envelopes, testHash) - s.Equal(EnvelopePosted, s.monitor.envelopes[testHash]) - s.monitor.handleEvent(types.EnvelopeEvent{ - Event: types.EventEnvelopeSent, - Hash: testHash, - }) - s.Contains(s.monitor.envelopes, testHash) - s.Equal(EnvelopeSent, s.monitor.envelopes[testHash]) -} - -func (s *EnvelopesMonitorSuite) TestConfirmedWithAcknowledge() { - testBatch := types.Hash{1} - pkey, err := crypto.GenerateKey() - s.Require().NoError(err) - node := enode.NewV4(&pkey.PublicKey, nil, 0, 0) - s.monitor.Add(testIDs, testHash, types.NewMessage{}) - s.Contains(s.monitor.envelopes, testHash) - s.Equal(EnvelopePosted, s.monitor.envelopes[testHash]) - s.monitor.handleEvent(types.EnvelopeEvent{ - Event: types.EventEnvelopeSent, - Hash: testHash, - Batch: testBatch, - }) - s.Equal(EnvelopePosted, s.monitor.envelopes[testHash]) - s.monitor.handleEvent(types.EnvelopeEvent{ - Event: types.EventBatchAcknowledged, - Batch: testBatch, - Peer: types.EnodeID(node.ID()), - }) - s.Contains(s.monitor.envelopes, testHash) - s.Equal(EnvelopeSent, s.monitor.envelopes[testHash]) -} - -func (s *EnvelopesMonitorSuite) TestIgnored() { - s.monitor.handleEvent(types.EnvelopeEvent{ - Event: types.EventEnvelopeSent, - Hash: testHash, - }) - s.NotContains(s.monitor.envelopes, testHash) -} - -func (s *EnvelopesMonitorSuite) TestRemoved() { - s.monitor.Add(testIDs, testHash, types.NewMessage{}) - s.Contains(s.monitor.envelopes, testHash) - s.monitor.handleEvent(types.EnvelopeEvent{ - Event: types.EventEnvelopeExpired, - Hash: testHash, - }) - s.NotContains(s.monitor.envelopes, testHash) -} - -func (s *EnvelopesMonitorSuite) TestIgnoreNotFromMailserver() { - // enables filter in the tracker to drop confirmations from non-mailserver peers - s.monitor.mailServerConfirmation = true - s.monitor.Add(testIDs, testHash, types.NewMessage{}) - s.monitor.handleEvent(types.EnvelopeEvent{ - Event: types.EventEnvelopeSent, - Hash: testHash, - Peer: types.EnodeID{1}, // could be empty, doesn't impact test behaviour - }) - s.Require().Equal(EnvelopePosted, s.monitor.GetState(testHash)) -} - -func (s *EnvelopesMonitorSuite) TestReceived() { - s.monitor.isMailserver = func(peer types.EnodeID) bool { - return true - } - s.monitor.Add(testIDs, testHash, types.NewMessage{}) - s.Contains(s.monitor.envelopes, testHash) - s.monitor.handleEvent(types.EnvelopeEvent{ - Event: types.EventEnvelopeReceived, - Hash: testHash, - }) - s.Require().Equal(EnvelopeSent, s.monitor.GetState(testHash)) -} diff --git a/protocol/transport/whisper/mailserver.go b/protocol/transport/whisper/mailserver.go deleted file mode 100644 index d630b69ab..000000000 --- a/protocol/transport/whisper/mailserver.go +++ /dev/null @@ -1,38 +0,0 @@ -package whisper - -import ( - "encoding/hex" - "math/big" - - "github.com/google/uuid" - - "github.com/status-im/status-go/eth-node/types" -) - -func createMessagesRequest(from, to uint32, cursor []byte, topics []types.TopicType) types.MessagesRequest { - aUUID := uuid.New() - // uuid is 16 bytes, converted to hex it's 32 bytes as expected by types.MessagesRequest - id := []byte(hex.EncodeToString(aUUID[:])) - return types.MessagesRequest{ - ID: id, - From: from, - To: to, - Limit: 100, - Cursor: cursor, - Bloom: topicsToBloom(topics...), - } -} - -func topicsToBloom(topics ...types.TopicType) []byte { - i := new(big.Int) - for _, topic := range topics { - bloom := types.TopicToBloom(topic) - i.Or(i, new(big.Int).SetBytes(bloom[:])) - } - - combined := make([]byte, types.BloomFilterSize) - data := i.Bytes() - copy(combined[types.BloomFilterSize-len(data):], data[:]) - - return combined -} diff --git a/protocol/transport/whisper/migrations/migrations.go b/protocol/transport/whisper/migrations/migrations.go deleted file mode 100644 index ff60d2d6b..000000000 --- a/protocol/transport/whisper/migrations/migrations.go +++ /dev/null @@ -1,319 +0,0 @@ -// Code generated by go-bindata. DO NOT EDIT. -// sources: -// 1561059285_add_whisper_keys.down.sql (25B) -// 1561059285_add_whisper_keys.up.sql (112B) -// doc.go (373B) - -package sqlite - -import ( - "bytes" - "compress/gzip" - "crypto/sha256" - "fmt" - "io" - "io/ioutil" - "os" - "path/filepath" - "strings" - "time" -) - -func bindataRead(data []byte, name string) ([]byte, error) { - gz, err := gzip.NewReader(bytes.NewBuffer(data)) - if err != nil { - return nil, fmt.Errorf("read %q: %v", name, err) - } - - var buf bytes.Buffer - _, err = io.Copy(&buf, gz) - clErr := gz.Close() - - if err != nil { - return nil, fmt.Errorf("read %q: %v", name, err) - } - if clErr != nil { - return nil, err - } - - return buf.Bytes(), nil -} - -type asset struct { - bytes []byte - info os.FileInfo - digest [sha256.Size]byte -} - -type bindataFileInfo struct { - name string - size int64 - mode os.FileMode - modTime time.Time -} - -func (fi bindataFileInfo) Name() string { - return fi.name -} -func (fi bindataFileInfo) Size() int64 { - return fi.size -} -func (fi bindataFileInfo) Mode() os.FileMode { - return fi.mode -} -func (fi bindataFileInfo) ModTime() time.Time { - return fi.modTime -} -func (fi bindataFileInfo) IsDir() bool { - return false -} -func (fi bindataFileInfo) Sys() interface{} { - return nil -} - -var __1561059285_add_whisper_keysDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\x08\x71\x74\xf2\x71\x55\x28\xcf\xc8\x2c\x2e\x48\x2d\x8a\xcf\x4e\xad\x2c\xb6\xe6\x02\x04\x00\x00\xff\xff\x42\x93\x8e\x79\x19\x00\x00\x00") - -func _1561059285_add_whisper_keysDownSqlBytes() ([]byte, error) { - return bindataRead( - __1561059285_add_whisper_keysDownSql, - "1561059285_add_whisper_keys.down.sql", - ) -} - -func _1561059285_add_whisper_keysDownSql() (*asset, error) { - bytes, err := _1561059285_add_whisper_keysDownSqlBytes() - if err != nil { - return nil, err - } - - info := bindataFileInfo{name: "1561059285_add_whisper_keys.down.sql", size: 25, mode: os.FileMode(0644), modTime: time.Unix(1610115164, 0)} - a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xb9, 0x31, 0x3f, 0xce, 0xfa, 0x44, 0x36, 0x1b, 0xb0, 0xec, 0x5d, 0xb, 0x90, 0xb, 0x21, 0x4f, 0xd5, 0xe5, 0x50, 0xed, 0xc7, 0x43, 0xdf, 0x83, 0xb4, 0x3a, 0xc1, 0x55, 0x2e, 0x53, 0x7c, 0x67}} - return a, nil -} - -var __1561059285_add_whisper_keysUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x04\xc0\xb1\x0a\xc2\x40\x0c\x06\xe0\xfd\x9e\xe2\x1f\x15\x7c\x03\xa7\xde\x19\x35\x18\x13\x09\x29\xb5\x53\x11\x3d\x68\xe9\x22\x56\x90\xbe\xbd\x5f\x71\x6a\x82\x10\x4d\x16\xc2\x6f\x9c\x96\x77\xfd\x0c\x73\x5d\x17\x6c\x12\xf0\x1c\x1f\xdf\x61\x7a\x21\xe8\x1e\xb8\x39\x5f\x1b\xef\x71\xa1\x1e\xa6\x28\xa6\x47\xe1\x12\xe0\x93\x9a\xd3\x2e\x01\x73\x5d\x91\xc5\x32\xd4\x02\xda\x8a\xa4\x2d\x3a\x8e\xb3\xb5\x01\xb7\x8e\x0f\xfb\xf4\x0f\x00\x00\xff\xff\x6e\x23\x28\x7d\x70\x00\x00\x00") - -func _1561059285_add_whisper_keysUpSqlBytes() ([]byte, error) { - return bindataRead( - __1561059285_add_whisper_keysUpSql, - "1561059285_add_whisper_keys.up.sql", - ) -} - -func _1561059285_add_whisper_keysUpSql() (*asset, error) { - bytes, err := _1561059285_add_whisper_keysUpSqlBytes() - if err != nil { - return nil, err - } - - info := bindataFileInfo{name: "1561059285_add_whisper_keys.up.sql", size: 112, mode: os.FileMode(0644), modTime: time.Unix(1610115164, 0)} - a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x25, 0x41, 0xc, 0x92, 0xdd, 0x9e, 0xff, 0x5d, 0xd0, 0x93, 0xe4, 0x24, 0x50, 0x29, 0xcf, 0xc6, 0xf7, 0x49, 0x3c, 0x73, 0xd9, 0x8c, 0xfa, 0xf2, 0xcf, 0xf6, 0x6f, 0xbc, 0x31, 0xe6, 0xf7, 0xe2}} - return a, nil -} - -var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x84\x8f\x3d\x72\xeb\x30\x0c\x84\x7b\x9d\x62\xc7\x8d\x9b\x27\xb2\x79\x55\xba\x94\xe9\x73\x01\x98\x5a\x91\x18\x4b\xa4\x42\xc0\x7f\xb7\xcf\xc8\xe3\xc2\x5d\xda\x1d\x7c\x1f\x76\x63\xc4\x77\x51\xc3\xac\x0b\xa1\x86\xca\x44\x33\xe9\x0f\x9c\x98\xe4\x62\xc4\x21\xab\x97\xcb\x29\xa4\xb6\x46\x73\xf1\x8b\x8d\xba\xc6\x55\x73\x17\x67\xbc\xfe\x3f\x0c\x31\x22\x49\x3d\x3a\x8a\xd4\x69\xe1\xd3\x65\x30\x97\xee\x5a\x33\x6e\xea\x05\x82\xad\x73\xd6\x7b\xc0\xa7\x63\xa1\x98\xc3\x8b\xf8\xd1\xe0\x85\x48\x62\xdc\x35\x73\xeb\xc8\x6d\x3c\x69\x9d\xc4\x25\xec\xd1\xd7\xfc\x96\xec\x0d\x93\x2c\x0b\x27\xcc\xbd\xad\x4f\xd6\x64\x25\x26\xed\x4c\xde\xfa\xe3\x1f\xc4\x8c\x8e\x2a\x2b\x6d\xe7\x8b\x5c\x89\xda\x5e\xef\x21\x75\xfa\x7b\x11\x6e\xad\x9f\x0d\x62\xe0\x7d\x63\x72\x4e\x61\x18\x36\x49\x67\xc9\x84\xfd\x2c\xea\x1c\x86\x18\x73\xfb\xc8\xac\xdc\xa9\xf7\x8e\xe3\x76\xce\xaf\x2b\x8c\x0d\x21\xbc\xd4\xda\xaa\x85\xdc\x10\x86\xdf\x00\x00\x00\xff\xff\x21\xa5\x75\x05\x75\x01\x00\x00") - -func docGoBytes() ([]byte, error) { - return bindataRead( - _docGo, - "doc.go", - ) -} - -func docGo() (*asset, error) { - bytes, err := docGoBytes() - if err != nil { - return nil, err - } - - info := bindataFileInfo{name: "doc.go", size: 373, mode: os.FileMode(0644), modTime: time.Unix(1610115164, 0)} - a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x23, 0x6a, 0xc1, 0xce, 0x94, 0xf6, 0xef, 0xf1, 0x97, 0x95, 0xb, 0x35, 0xaf, 0x5f, 0xe7, 0x5f, 0xac, 0x6e, 0xb8, 0xab, 0xba, 0xb5, 0x35, 0x97, 0x22, 0x36, 0x11, 0xce, 0x44, 0xfc, 0xfa, 0xac}} - return a, nil -} - -// Asset loads and returns the asset for the given name. -// It returns an error if the asset could not be found or -// could not be loaded. -func Asset(name string) ([]byte, error) { - canonicalName := strings.Replace(name, "\\", "/", -1) - if f, ok := _bindata[canonicalName]; ok { - a, err := f() - if err != nil { - return nil, fmt.Errorf("Asset %s can't read by error: %v", name, err) - } - return a.bytes, nil - } - return nil, fmt.Errorf("Asset %s not found", name) -} - -// AssetString returns the asset contents as a string (instead of a []byte). -func AssetString(name string) (string, error) { - data, err := Asset(name) - return string(data), err -} - -// MustAsset is like Asset but panics when Asset would return an error. -// It simplifies safe initialization of global variables. -func MustAsset(name string) []byte { - a, err := Asset(name) - if err != nil { - panic("asset: Asset(" + name + "): " + err.Error()) - } - - return a -} - -// MustAssetString is like AssetString but panics when Asset would return an -// error. It simplifies safe initialization of global variables. -func MustAssetString(name string) string { - return string(MustAsset(name)) -} - -// AssetInfo loads and returns the asset info for the given name. -// It returns an error if the asset could not be found or -// could not be loaded. -func AssetInfo(name string) (os.FileInfo, error) { - canonicalName := strings.Replace(name, "\\", "/", -1) - if f, ok := _bindata[canonicalName]; ok { - a, err := f() - if err != nil { - return nil, fmt.Errorf("AssetInfo %s can't read by error: %v", name, err) - } - return a.info, nil - } - return nil, fmt.Errorf("AssetInfo %s not found", name) -} - -// AssetDigest returns the digest of the file with the given name. It returns an -// error if the asset could not be found or the digest could not be loaded. -func AssetDigest(name string) ([sha256.Size]byte, error) { - canonicalName := strings.Replace(name, "\\", "/", -1) - if f, ok := _bindata[canonicalName]; ok { - a, err := f() - if err != nil { - return [sha256.Size]byte{}, fmt.Errorf("AssetDigest %s can't read by error: %v", name, err) - } - return a.digest, nil - } - return [sha256.Size]byte{}, fmt.Errorf("AssetDigest %s not found", name) -} - -// Digests returns a map of all known files and their checksums. -func Digests() (map[string][sha256.Size]byte, error) { - mp := make(map[string][sha256.Size]byte, len(_bindata)) - for name := range _bindata { - a, err := _bindata[name]() - if err != nil { - return nil, err - } - mp[name] = a.digest - } - return mp, nil -} - -// AssetNames returns the names of the assets. -func AssetNames() []string { - names := make([]string, 0, len(_bindata)) - for name := range _bindata { - names = append(names, name) - } - return names -} - -// _bindata is a table, holding each asset generator, mapped to its name. -var _bindata = map[string]func() (*asset, error){ - "1561059285_add_whisper_keys.down.sql": _1561059285_add_whisper_keysDownSql, - - "1561059285_add_whisper_keys.up.sql": _1561059285_add_whisper_keysUpSql, - - "doc.go": docGo, -} - -// AssetDir returns the file names below a certain -// directory embedded in the file by go-bindata. -// For example if you run go-bindata on data/... and data contains the -// following hierarchy: -// data/ -// foo.txt -// img/ -// a.png -// b.png -// then AssetDir("data") would return []string{"foo.txt", "img"}, -// AssetDir("data/img") would return []string{"a.png", "b.png"}, -// AssetDir("foo.txt") and AssetDir("notexist") would return an error, and -// AssetDir("") will return []string{"data"}. -func AssetDir(name string) ([]string, error) { - node := _bintree - if len(name) != 0 { - canonicalName := strings.Replace(name, "\\", "/", -1) - pathList := strings.Split(canonicalName, "/") - for _, p := range pathList { - node = node.Children[p] - if node == nil { - return nil, fmt.Errorf("Asset %s not found", name) - } - } - } - if node.Func != nil { - return nil, fmt.Errorf("Asset %s not found", name) - } - rv := make([]string, 0, len(node.Children)) - for childName := range node.Children { - rv = append(rv, childName) - } - return rv, nil -} - -type bintree struct { - Func func() (*asset, error) - Children map[string]*bintree -} - -var _bintree = &bintree{nil, map[string]*bintree{ - "1561059285_add_whisper_keys.down.sql": &bintree{_1561059285_add_whisper_keysDownSql, map[string]*bintree{}}, - "1561059285_add_whisper_keys.up.sql": &bintree{_1561059285_add_whisper_keysUpSql, map[string]*bintree{}}, - "doc.go": &bintree{docGo, map[string]*bintree{}}, -}} - -// RestoreAsset restores an asset under the given directory. -func RestoreAsset(dir, name string) error { - data, err := Asset(name) - if err != nil { - return err - } - info, err := AssetInfo(name) - if err != nil { - return err - } - err = os.MkdirAll(_filePath(dir, filepath.Dir(name)), os.FileMode(0755)) - if err != nil { - return err - } - err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode()) - if err != nil { - return err - } - return os.Chtimes(_filePath(dir, name), info.ModTime(), info.ModTime()) -} - -// RestoreAssets restores an asset under the given directory recursively. -func RestoreAssets(dir, name string) error { - children, err := AssetDir(name) - // File - if err != nil { - return RestoreAsset(dir, name) - } - // Dir - for _, child := range children { - err = RestoreAssets(dir, filepath.Join(name, child)) - if err != nil { - return err - } - } - return nil -} - -func _filePath(dir, name string) string { - canonicalName := strings.Replace(name, "\\", "/", -1) - return filepath.Join(append([]string{dir}, strings.Split(canonicalName, "/")...)...) -} diff --git a/protocol/transport/whisper/migrations/sqlite/1561059285_add_whisper_keys.down.sql b/protocol/transport/whisper/migrations/sqlite/1561059285_add_whisper_keys.down.sql deleted file mode 100644 index f0761da00..000000000 --- a/protocol/transport/whisper/migrations/sqlite/1561059285_add_whisper_keys.down.sql +++ /dev/null @@ -1 +0,0 @@ -DROP TABLE whisper_keys; diff --git a/protocol/transport/whisper/migrations/sqlite/1561059285_add_whisper_keys.up.sql b/protocol/transport/whisper/migrations/sqlite/1561059285_add_whisper_keys.up.sql deleted file mode 100644 index 844e77282..000000000 --- a/protocol/transport/whisper/migrations/sqlite/1561059285_add_whisper_keys.up.sql +++ /dev/null @@ -1,4 +0,0 @@ -CREATE TABLE whisper_keys ( - chat_id TEXT PRIMARY KEY ON CONFLICT IGNORE, - key BLOB NOT NULL -) WITHOUT ROWID; diff --git a/protocol/transport/whisper/migrations/sqlite/doc.go b/protocol/transport/whisper/migrations/sqlite/doc.go deleted file mode 100644 index c3e0f561d..000000000 --- a/protocol/transport/whisper/migrations/sqlite/doc.go +++ /dev/null @@ -1,9 +0,0 @@ -// This file is necessary because "github.com/status-im/migrate/v4" -// can't handle files starting with a prefix. At least that's the case -// for go-bindata. -// If go-bindata is called from the same directory, asset names -// have no prefix and "github.com/status-im/migrate/v4" works as expected. - -package sqlite - -//go:generate go-bindata -pkg sqlite -o ../migrations.go . diff --git a/protocol/transport/whisper/persistence.go b/protocol/transport/whisper/persistence.go deleted file mode 100644 index fe9062de9..000000000 --- a/protocol/transport/whisper/persistence.go +++ /dev/null @@ -1,58 +0,0 @@ -package whisper - -import ( - "database/sql" -) - -type sqlitePersistence struct { - db *sql.DB -} - -func newSQLitePersistence(db *sql.DB) *sqlitePersistence { - return &sqlitePersistence{db: db} -} - -func (s *sqlitePersistence) Add(chatID string, key []byte) error { - statement := "INSERT INTO whisper_keys(chat_id, key) VALUES(?, ?)" - stmt, err := s.db.Prepare(statement) - if err != nil { - return err - } - defer stmt.Close() - - _, err = stmt.Exec(chatID, key) - return err -} - -func (s *sqlitePersistence) All() (map[string][]byte, error) { - keys := make(map[string][]byte) - - statement := "SELECT chat_id, key FROM whisper_keys" - - stmt, err := s.db.Prepare(statement) - if err != nil { - return nil, err - } - defer stmt.Close() - - rows, err := stmt.Query() - if err != nil && err != sql.ErrNoRows { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var ( - chatID string - key []byte - ) - - err := rows.Scan(&chatID, &key) - if err != nil { - return nil, err - } - keys[chatID] = key - } - - return keys, nil -} diff --git a/protocol/transport/whisper/whisper.go b/protocol/transport/whisper/whisper.go deleted file mode 100644 index ef4970900..000000000 --- a/protocol/transport/whisper/whisper.go +++ /dev/null @@ -1,13 +0,0 @@ -package whisper - -import ( - "github.com/status-im/status-go/eth-node/types" -) - -type RequestOptions struct { - Topics []types.TopicType - Password string - Limit int - From int64 // in seconds - To int64 // in seconds -} diff --git a/protocol/transport/whisper/whisper_service.go b/protocol/transport/whisper/whisper_service.go deleted file mode 100644 index 5319470a9..000000000 --- a/protocol/transport/whisper/whisper_service.go +++ /dev/null @@ -1,514 +0,0 @@ -package whisper - -import ( - "bytes" - "context" - "crypto/ecdsa" - "database/sql" - "sync" - "time" - - "github.com/pkg/errors" - "go.uber.org/zap" - - "github.com/status-im/status-go/eth-node/crypto" - "github.com/status-im/status-go/eth-node/types" - "github.com/status-im/status-go/protocol/transport" -) - -var ( - // ErrNoMailservers returned if there is no configured mailservers that can be used. - ErrNoMailservers = errors.New("no configured mailservers") -) - -type whisperServiceKeysManager struct { - shh types.Whisper - - // Identity of the current user. - privateKey *ecdsa.PrivateKey - - passToSymKeyMutex sync.RWMutex - passToSymKeyCache map[string]string -} - -func (m *whisperServiceKeysManager) AddOrGetKeyPair(priv *ecdsa.PrivateKey) (string, error) { - // caching is handled in Whisper - return m.shh.AddKeyPair(priv) -} - -func (m *whisperServiceKeysManager) AddOrGetSymKeyFromPassword(password string) (string, error) { - m.passToSymKeyMutex.Lock() - defer m.passToSymKeyMutex.Unlock() - - if val, ok := m.passToSymKeyCache[password]; ok { - return val, nil - } - - id, err := m.shh.AddSymKeyFromPassword(password) - if err != nil { - return id, err - } - - m.passToSymKeyCache[password] = id - - return id, nil -} - -func (m *whisperServiceKeysManager) RawSymKey(id string) ([]byte, error) { - return m.shh.GetSymKey(id) -} - -type Option func(*Transport) error - -// Transport is a transport based on Whisper service. -type Transport struct { - shh types.Whisper - shhAPI types.PublicWhisperAPI // only PublicWhisperAPI implements logic to send messages - keysManager *whisperServiceKeysManager - filters *transport.FiltersManager - logger *zap.Logger - - mailservers []string - envelopesMonitor *EnvelopesMonitor -} - -// NewTransport returns a new Transport. -// TODO: leaving a chat should verify that for a given public key -// there are no other chats. It may happen that we leave a private chat -// but still have a public chat for a given public key. -func NewTransport( - shh types.Whisper, - privateKey *ecdsa.PrivateKey, - db *sql.DB, - mailservers []string, - envelopesMonitorConfig *transport.EnvelopesMonitorConfig, - logger *zap.Logger, - opts ...Option, -) (*Transport, error) { - filtersManager, err := transport.NewFiltersManager(newSQLitePersistence(db), shh, privateKey, logger) - if err != nil { - return nil, err - } - - var envelopesMonitor *EnvelopesMonitor - if envelopesMonitorConfig != nil { - envelopesMonitor = NewEnvelopesMonitor(shh, *envelopesMonitorConfig) - envelopesMonitor.Start() - } - - var shhAPI types.PublicWhisperAPI - if shh != nil { - shhAPI = shh.PublicWhisperAPI() - } - t := &Transport{ - shh: shh, - shhAPI: shhAPI, - envelopesMonitor: envelopesMonitor, - keysManager: &whisperServiceKeysManager{ - shh: shh, - privateKey: privateKey, - passToSymKeyCache: make(map[string]string), - }, - filters: filtersManager, - mailservers: mailservers, - logger: logger.With(zap.Namespace("Transport")), - } - - for _, opt := range opts { - if err := opt(t); err != nil { - return nil, err - } - } - - return t, nil -} - -func (a *Transport) InitFilters(chatIDs []string, publicKeys []*ecdsa.PublicKey) ([]*transport.Filter, error) { - return a.filters.Init(chatIDs, publicKeys) -} - -func (a *Transport) InitPublicFilters(chatIDs []string) ([]*transport.Filter, error) { - return a.filters.InitPublicFilters(chatIDs) -} - -func (a *Transport) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*transport.Filter, error) { - return a.filters.InitCommunityFilters(pks) -} - -func (a *Transport) Filters() []*transport.Filter { - return a.filters.Filters() -} - -func (a *Transport) FilterByChatID(chatID string) *transport.Filter { - return a.filters.FilterByChatID(chatID) -} - -func (a *Transport) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) { - return a.filters.InitWithFilters(filters) -} - -func (a *Transport) SendCommunityMessage(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) { - if err := a.addSig(newMessage); err != nil { - return nil, err - } - - // We load the filter to make sure we can post on it - filter, err := a.filters.LoadPublic(transport.PubkeyToHex(publicKey)) - if err != nil { - return nil, err - } - - newMessage.Topic = filter.Topic - newMessage.PublicKey = crypto.FromECDSAPub(publicKey) - - return a.shhAPI.Post(ctx, *newMessage) -} - -func (a *Transport) RemoveFilters(filters []*transport.Filter) error { - return a.filters.Remove(filters...) -} - -func (a *Transport) RemoveFilterByChatID(chatID string) (*transport.Filter, error) { - return a.filters.RemoveFilterByChatID(chatID) -} - -func (a *Transport) ResetFilters() error { - return a.filters.Reset() -} - -func (a *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*transport.Filter, error) { - filter, err := a.filters.LoadNegotiated(secret) - if err != nil { - return nil, err - } - return filter, nil -} - -func (a *Transport) JoinPublic(chatID string) (*transport.Filter, error) { - return a.filters.LoadPublic(chatID) -} - -func (a *Transport) LeavePublic(chatID string) error { - chat := a.filters.Filter(chatID) - if chat != nil { - return nil - } - return a.filters.Remove(chat) -} - -func (a *Transport) JoinPrivate(publicKey *ecdsa.PublicKey) (*transport.Filter, error) { - return a.filters.LoadContactCode(publicKey) -} - -func (a *Transport) LeavePrivate(publicKey *ecdsa.PublicKey) error { - filters := a.filters.FiltersByPublicKey(publicKey) - return a.filters.Remove(filters...) -} - -func (a *Transport) JoinGroup(publicKeys []*ecdsa.PublicKey) ([]*transport.Filter, error) { - var filters []*transport.Filter - for _, pk := range publicKeys { - f, err := a.filters.LoadContactCode(pk) - if err != nil { - return nil, err - } - filters = append(filters, f) - } - return filters, nil -} - -func (a *Transport) LeaveGroup(publicKeys []*ecdsa.PublicKey) error { - for _, publicKey := range publicKeys { - filters := a.filters.FiltersByPublicKey(publicKey) - if err := a.filters.Remove(filters...); err != nil { - return err - } - } - return nil -} - -type Message struct { - Message *types.Message - Public bool -} - -func (a *Transport) RetrieveAllMessages() ([]Message, error) { - var messages []Message - - for _, filter := range a.filters.Filters() { - filterMsgs, err := a.shhAPI.GetFilterMessages(filter.FilterID) - if err != nil { - return nil, err - } - - for _, m := range filterMsgs { - messages = append(messages, Message{ - Message: m, - Public: filter.IsPublic(), - }) - } - } - - return messages, nil -} - -func (a *Transport) RetrievePublicMessages(chatID string) ([]*types.Message, error) { - filter, err := a.filters.LoadPublic(chatID) - if err != nil { - return nil, err - } - - return a.shhAPI.GetFilterMessages(filter.FilterID) -} - -func (a *Transport) RetrievePrivateMessages(publicKey *ecdsa.PublicKey) ([]*types.Message, error) { - chats := a.filters.FiltersByPublicKey(publicKey) - discoveryChats, err := a.filters.Init(nil, nil) - if err != nil { - return nil, err - } - - var result []*types.Message - - for _, chat := range append(chats, discoveryChats...) { - filterMsgs, err := a.shhAPI.GetFilterMessages(chat.FilterID) - if err != nil { - return nil, err - } - - result = append(result, filterMsgs...) - } - - return result, nil -} - -func (a *Transport) RetrieveRawAll() (map[transport.Filter][]*types.Message, error) { - result := make(map[transport.Filter][]*types.Message) - - allFilters := a.filters.Filters() - for _, filter := range allFilters { - msgs, err := a.shhAPI.GetFilterMessages(filter.FilterID) - if err != nil { - continue - } - result[*filter] = append(result[*filter], msgs...) - } - - return result, nil -} - -// SendPublic sends a new message using the Whisper service. -// For public filters, chat name is used as an ID as well as -// a topic. -func (a *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage, chatName string) ([]byte, error) { - if err := a.addSig(newMessage); err != nil { - return nil, err - } - - filter, err := a.filters.LoadPublic(chatName) - if err != nil { - return nil, err - } - - newMessage.SymKeyID = filter.SymKeyID - newMessage.Topic = filter.Topic - - return a.shhAPI.Post(ctx, *newMessage) -} - -func (a *Transport) SendPrivateWithSharedSecret(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey, secret []byte) ([]byte, error) { - if err := a.addSig(newMessage); err != nil { - return nil, err - } - - filter, err := a.filters.LoadNegotiated(types.NegotiatedSecret{ - PublicKey: publicKey, - Key: secret, - }) - if err != nil { - return nil, err - } - - newMessage.SymKeyID = filter.SymKeyID - newMessage.Topic = filter.Topic - newMessage.PublicKey = nil - - return a.shhAPI.Post(ctx, *newMessage) -} - -func (a *Transport) SendPrivateWithPartitioned(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) { - if err := a.addSig(newMessage); err != nil { - return nil, err - } - - filter, err := a.filters.LoadPartitioned(publicKey, a.keysManager.privateKey, false) - if err != nil { - return nil, err - } - - newMessage.Topic = filter.Topic - newMessage.PublicKey = crypto.FromECDSAPub(publicKey) - - return a.shhAPI.Post(ctx, *newMessage) -} - -func (a *Transport) SendPrivateOnPersonalTopic(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) { - if err := a.addSig(newMessage); err != nil { - return nil, err - } - - filter, err := a.filters.LoadPersonal(publicKey, a.keysManager.privateKey, false) - if err != nil { - return nil, err - } - - newMessage.Topic = filter.Topic - newMessage.PublicKey = crypto.FromECDSAPub(publicKey) - - return a.shhAPI.Post(ctx, *newMessage) -} - -func (a *Transport) SendPrivateOnDiscovery(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) { - if err := a.addSig(newMessage); err != nil { - return nil, err - } - - // There is no need to load any chat - // because listening on the discovery topic - // is done automatically. - // TODO: change this anyway, it should be explicit - // and idempotent. - - newMessage.Topic = types.BytesToTopic(transport.ToTopic(transport.DiscoveryTopic())) - newMessage.PublicKey = crypto.FromECDSAPub(publicKey) - - return a.shhAPI.Post(ctx, *newMessage) -} - -func (a *Transport) addSig(newMessage *types.NewMessage) error { - sigID, err := a.keysManager.AddOrGetKeyPair(a.keysManager.privateKey) - if err != nil { - return err - } - newMessage.SigID = sigID - return nil -} - -func (a *Transport) Track(identifiers [][]byte, hash []byte, newMessage *types.NewMessage) { - if a.envelopesMonitor != nil { - a.envelopesMonitor.Add(identifiers, types.BytesToHash(hash), *newMessage) - } -} - -// GetCurrentTime returns the current unix timestamp in milliseconds -func (a *Transport) GetCurrentTime() uint64 { - return uint64(a.shh.GetCurrentTime().UnixNano() / int64(time.Millisecond)) -} - -func (a *Transport) MaxMessageSize() uint32 { - return a.shh.MaxMessageSize() -} - -func (a *Transport) Stop() error { - if a.envelopesMonitor != nil { - a.envelopesMonitor.Stop() - } - return nil -} - -// RequestHistoricMessages requests historic messages for all registered filters. -func (a *Transport) SendMessagesRequest( - ctx context.Context, - peerID []byte, - from, to uint32, - previousCursor []byte, - waitForResponse bool, -) (cursor []byte, err error) { - topics := make([]types.TopicType, len(a.Filters())) - for _, f := range a.Filters() { - topics = append(topics, f.Topic) - } - - r := createMessagesRequest(from, to, previousCursor, topics) - r.SetDefaults(a.shh.GetCurrentTime()) - - events := make(chan types.EnvelopeEvent, 10) - sub := a.shh.SubscribeEnvelopeEvents(events) - defer sub.Unsubscribe() - - err = a.shh.SendMessagesRequest(peerID, r) - if err != nil { - return - } - - if !waitForResponse { - return - } - - resp, err := a.waitForRequestCompleted(ctx, r.ID, events) - if err == nil && resp != nil && resp.Error != nil { - err = resp.Error - } else if err == nil && resp != nil { - cursor = resp.Cursor - } - return -} - -//TODO: kozieiev: fix -func (a *Transport) SendMessagesRequestForFilter( - ctx context.Context, - peerID []byte, - from, to uint32, - previousCursor []byte, - filter *transport.Filter, - waitForResponse bool, -) (cursor []byte, err error) { - return nil, nil -} - -func (a *Transport) LoadKeyFilters(key *ecdsa.PrivateKey) (*transport.Filter, error) { - return a.filters.LoadPartitioned(&key.PublicKey, key, true) -} - -func (a *Transport) waitForRequestCompleted(ctx context.Context, requestID []byte, events chan types.EnvelopeEvent) (*types.MailServerResponse, error) { - for { - select { - case ev := <-events: - a.logger.Debug( - "waiting for request completed and received an event", - zap.Binary("requestID", requestID), - zap.Any("event", ev), - ) - if !bytes.Equal(ev.Hash.Bytes(), requestID) { - continue - } - if ev.Event != types.EventMailServerRequestCompleted { - continue - } - data, ok := ev.Data.(*types.MailServerResponse) - if ok { - return data, nil - } - case <-ctx.Done(): - return nil, ctx.Err() - } - } -} - -// NOTE: currently not used as whisper is not maintained anymore -func (a *Transport) ConfirmMessagesProcessed(ids []string, timestamp uint64) error { - return nil -} - -// NOTE: currently not used as whisper is not maintained anymore -func (a *Transport) CleanMessagesProcessed(timestamp uint64) error { - return nil -} - -func (a *Transport) SetEnvelopeEventsHandler(handler transport.EnvelopeEventsHandler) error { - if a.envelopesMonitor == nil { - return errors.New("Current transport has no envelopes monitor") - } - a.envelopesMonitor.handler = handler - return nil -} diff --git a/protocol/transport/whisper/whisper_service_test.go b/protocol/transport/whisper/whisper_service_test.go deleted file mode 100644 index 4a9fc05c6..000000000 --- a/protocol/transport/whisper/whisper_service_test.go +++ /dev/null @@ -1,28 +0,0 @@ -package whisper - -import ( - "io/ioutil" - "os" - "testing" - - "github.com/status-im/status-go/protocol/sqlite" - - "github.com/stretchr/testify/require" - - "github.com/status-im/status-go/protocol/tt" -) - -func TestNewWhisperServiceTransport(t *testing.T) { - dbPath, err := ioutil.TempFile("", "transport.sql") - require.NoError(t, err) - defer os.Remove(dbPath.Name()) - db, err := sqlite.Open(dbPath.Name(), "some-key") - require.NoError(t, err) - - logger := tt.MustCreateTestLogger() - require.NoError(t, err) - defer func() { _ = logger.Sync() }() - - _, err = NewTransport(nil, nil, db, nil, nil, logger) - require.NoError(t, err) -}