status-go/services/shhext/service.go

463 lines
14 KiB
Go
Raw Normal View History

package shhext
import (
"context"
"crypto/ecdsa"
"fmt"
2019-07-17 22:25:42 +00:00
"github.com/status-im/status-go/logutils"
2019-07-01 09:39:51 +00:00
"os"
"path/filepath"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rpc"
2019-07-01 09:39:51 +00:00
"github.com/status-im/status-go/db"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/services/shhext/dedup"
Mail peer store and connection manager (#1295) This change implements connection manager that monitors 3 types of events: 1. update of the selected mail servers 2. disconnect from a mail server 3. errors for requesting mail history When selected mail servers provided we will try to connect with as many as possible, and later disconnect the surplus. For example if we want to connect with one mail server and 3 were selected, we try to connect with all (3), and later disconnect with 2. It will to establish connection with live mail server faster. If mail server disconnects we will choose any other mail server from the list of selected. Unless we have only one mail server. In such case we don't have any other choice and we will leave things as is. If request for history was expired we will disconnect such peer and try to find another one. We will follow same rules as described above. We will have two components that will rely on this logic: 1. requesting history If target peer is provided we will use that peer, otherwise we will request history from any selected mail server that is connected at the time of request. 2. confirmation from selected mail server Confirmation from any selected mail server will bee used to send a feedback that envelope was sent. I will add several extensions, but probably in separate PRs: 1. prioritize connection with mail server that was used before reboot 2. disconnect from mail servers if history request wasn't expired but failed. 3. wait some time in RequestsMessage RPC to establish connection with any mail server Currently this feature is hidden, as certain changes will be necessary in status-react. partially implements: https://github.com/status-im/status-go/issues/1285
2018-12-05 13:57:05 +00:00
"github.com/status-im/status-go/services/shhext/mailservers"
2019-07-01 09:39:51 +00:00
"github.com/status-im/status-go/signal"
2019-07-17 22:25:42 +00:00
protocol "github.com/status-im/status-protocol-go"
protocolwhisper "github.com/status-im/status-protocol-go/transport/whisper"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/syndtr/goleveldb/leveldb"
"go.uber.org/zap"
2019-07-01 09:39:51 +00:00
"golang.org/x/crypto/sha3"
)
const (
Mail peer store and connection manager (#1295) This change implements connection manager that monitors 3 types of events: 1. update of the selected mail servers 2. disconnect from a mail server 3. errors for requesting mail history When selected mail servers provided we will try to connect with as many as possible, and later disconnect the surplus. For example if we want to connect with one mail server and 3 were selected, we try to connect with all (3), and later disconnect with 2. It will to establish connection with live mail server faster. If mail server disconnects we will choose any other mail server from the list of selected. Unless we have only one mail server. In such case we don't have any other choice and we will leave things as is. If request for history was expired we will disconnect such peer and try to find another one. We will follow same rules as described above. We will have two components that will rely on this logic: 1. requesting history If target peer is provided we will use that peer, otherwise we will request history from any selected mail server that is connected at the time of request. 2. confirmation from selected mail server Confirmation from any selected mail server will bee used to send a feedback that envelope was sent. I will add several extensions, but probably in separate PRs: 1. prioritize connection with mail server that was used before reboot 2. disconnect from mail servers if history request wasn't expired but failed. 3. wait some time in RequestsMessage RPC to establish connection with any mail server Currently this feature is hidden, as certain changes will be necessary in status-react. partially implements: https://github.com/status-im/status-go/issues/1285
2018-12-05 13:57:05 +00:00
// defaultConnectionsTarget used in Service.Start if configured connection target is 0.
defaultConnectionsTarget = 1
// defaultTimeoutWaitAdded is a timeout to use to establish initial connections.
defaultTimeoutWaitAdded = 5 * time.Second
)
// EnvelopeEventsHandler used for two different event types.
type EnvelopeEventsHandler interface {
EnvelopeSent([][]byte)
EnvelopeExpired([][]byte, error)
MailServerRequestCompleted(common.Hash, common.Hash, []byte, error)
MailServerRequestExpired(common.Hash)
}
// Service is a service that provides some additional Whisper API.
type Service struct {
2019-07-17 22:25:42 +00:00
messenger *protocol.Messenger
cancelMessenger chan struct{}
storage db.TransactionalStorage
w *whisper.Whisper
config params.ShhextConfig
mailMonitor *MailRequestMonitor
requestsRegistry *RequestsRegistry
historyUpdates *HistoryUpdateReactor
server *p2p.Server
nodeID *ecdsa.PrivateKey
deduplicator *dedup.Deduplicator
peerStore *mailservers.PeerStore
cache *mailservers.Cache
connManager *mailservers.ConnectionManager
lastUsedMonitor *mailservers.LastUsedConnectionMonitor
}
// Make sure that Service implements node.Service interface.
var _ node.Service = (*Service)(nil)
2019-05-23 08:47:20 +00:00
// New returns a new Service.
func New(w *whisper.Whisper, handler EnvelopeEventsHandler, ldb *leveldb.DB, config params.ShhextConfig) *Service {
cache := mailservers.NewCache(ldb)
ps := mailservers.NewPeerStore(cache)
delay := defaultRequestsDelay
if config.RequestsDelay != 0 {
delay = config.RequestsDelay
}
requestsRegistry := NewRequestsRegistry(delay)
historyUpdates := NewHistoryUpdateReactor()
mailMonitor := &MailRequestMonitor{
w: w,
handler: handler,
cache: map[common.Hash]EnvelopeState{},
requestsRegistry: requestsRegistry,
}
return &Service{
storage: db.NewLevelDBStorage(ldb),
w: w,
config: config,
mailMonitor: mailMonitor,
requestsRegistry: requestsRegistry,
historyUpdates: historyUpdates,
deduplicator: dedup.NewDeduplicator(w, ldb),
peerStore: ps,
cache: cache,
}
}
2019-07-01 09:39:51 +00:00
func (s *Service) InitProtocolWithPassword(address string, password string) error {
digest := sha3.Sum256([]byte(password))
encKey := fmt.Sprintf("%x", digest)
return s.initProtocol(address, encKey, password)
}
// InitProtocolWithEncyptionKey creates an instance of ProtocolService given an address and encryption key.
func (s *Service) InitProtocolWithEncyptionKey(address string, encKey string) error {
return s.initProtocol(address, encKey, "")
}
2019-07-30 06:14:13 +00:00
func (s *Service) initProtocol(address, encKey, password string) error { // nolint: gocyclo
2019-07-01 09:39:51 +00:00
if !s.config.PFSEnabled {
return nil
}
dataDir := filepath.Clean(s.config.BackupDisabledDataDir)
if err := os.MkdirAll(dataDir, os.ModePerm); err != nil {
return err
}
v0Path := filepath.Join(dataDir, fmt.Sprintf("%x.db", address))
v1Path := filepath.Join(dataDir, fmt.Sprintf("%s.db", s.config.InstallationID))
v2Path := filepath.Join(dataDir, fmt.Sprintf("%s.v2.db", s.config.InstallationID))
v3Path := filepath.Join(dataDir, fmt.Sprintf("%s.v3.db", s.config.InstallationID))
v4Path := filepath.Join(dataDir, fmt.Sprintf("%s.v4.db", s.config.InstallationID))
if password != "" {
2019-07-17 22:25:42 +00:00
if err := migrateDBFile(v0Path, v1Path, "ON", password); err != nil {
2019-07-01 09:39:51 +00:00
return err
}
2019-07-17 22:25:42 +00:00
if err := migrateDBFile(v1Path, v2Path, password, encKey); err != nil {
2019-07-01 09:39:51 +00:00
// Remove db file as created with a blank password and never used,
// and there's no need to rekey in this case
os.Remove(v1Path)
os.Remove(v2Path)
}
}
2019-07-17 22:25:42 +00:00
if err := migrateDBKeyKdfIterations(v2Path, v3Path, encKey); err != nil {
2019-07-01 09:39:51 +00:00
os.Remove(v2Path)
os.Remove(v3Path)
}
// Fix IOS not encrypting database
2019-07-17 22:25:42 +00:00
if err := encryptDatabase(v3Path, v4Path, encKey); err != nil {
2019-07-01 09:39:51 +00:00
os.Remove(v3Path)
os.Remove(v4Path)
}
// Desktop was passing a network dependent directory, which meant that
// if running on testnet it would not access the right db. This copies
// the db from mainnet to the root location.
networkDependentPath := filepath.Join(dataDir, "ethereum", "mainnet_rpc", fmt.Sprintf("%s.v4.db", s.config.InstallationID))
if _, err := os.Stat(networkDependentPath); err == nil {
if err := os.Rename(networkDependentPath, v4Path); err != nil {
return err
}
} else if !os.IsNotExist(err) {
return err
}
2019-07-30 06:14:13 +00:00
// In one of the versions, we split the database file into multiple ones.
// Later, we discovered that it really hurts the performance so we consolidated
// it again but in a better way keeping migrations in separate packages.
2019-07-17 22:25:42 +00:00
sessionsDatabasePath := filepath.Join(dataDir, fmt.Sprintf("%s.sessions.v4.sql", s.config.InstallationID))
2019-07-30 06:14:13 +00:00
sessionsStat, sessionsStatErr := os.Stat(sessionsDatabasePath)
v4PathStat, v4PathStatErr := os.Stat(v4Path)
if sessionsStatErr == nil && os.IsNotExist(v4PathStatErr) {
// This is a clear situation where we have the sessions.v4.sql file and v4Path does not exist.
// In the previous migration, we removed v4Path when it is successfully copied into the sessions sql file.
if err := os.Rename(sessionsDatabasePath, v4Path); err != nil {
return err
}
} else if sessionsStatErr == nil && v4PathStatErr == nil {
// Both files exist so probably the migration to split databases failed.
if sessionsStat.ModTime().After(v4PathStat.ModTime()) {
// Sessions sql file is newer.
if err := os.Rename(sessionsDatabasePath, v4Path); err != nil {
return err
}
2019-07-17 22:25:42 +00:00
}
}
// Create a custom zap.Logger which will forward logs from status-protocol-go to status-go logger.
zapLogger, err := logutils.NewZapLoggerWithAdapter(logutils.Logger())
2019-07-01 09:39:51 +00:00
if err != nil {
return err
}
envelopesMonitorConfig := &protocolwhisper.EnvelopesMonitorConfig{
MaxAttempts: s.config.MaxMessageDeliveryAttempts,
MailserverConfirmationsEnabled: s.config.MailServerConfirmations,
IsMailserver: func(peer enode.ID) bool {
return s.peerStore.Exist(peer)
},
EnvelopeEventsHandler: EnvelopeSignalHandler{},
Logger: zapLogger,
}
options := buildMessengerOptions(s.config, v4Path, encKey, envelopesMonitorConfig, zapLogger)
selectedKeyID := s.w.SelectedKeyPairID()
identity, err := s.w.GetPrivateKey(selectedKeyID)
2019-07-17 22:25:42 +00:00
if err != nil {
return err
2019-07-01 09:39:51 +00:00
}
2019-07-17 22:25:42 +00:00
messenger, err := protocol.NewMessenger(
identity,
s.w,
s.config.InstallationID,
options...,
2019-07-17 22:25:42 +00:00
)
if err != nil {
return err
2019-07-01 09:39:51 +00:00
}
2019-07-17 22:25:42 +00:00
s.messenger = messenger
// Start a loop that retrieves all messages and propagates them to status-react.
s.cancelMessenger = make(chan struct{})
go s.retrieveMessagesLoop(time.Second, s.cancelMessenger)
2019-07-01 09:39:51 +00:00
2019-07-17 22:25:42 +00:00
return nil
}
func (s *Service) retrieveMessagesLoop(tick time.Duration, cancel <-chan struct{}) {
ticker := time.NewTicker(tick)
defer ticker.Stop()
for {
select {
case <-ticker.C:
chatWithMessages, err := s.messenger.RetrieveRawAll()
2019-07-05 12:45:47 +00:00
if err != nil {
2019-07-17 22:25:42 +00:00
log.Error("failed to retrieve raw messages", "err", err)
2019-07-05 12:45:47 +00:00
continue
}
var messageIDs []string
for _, messages := range chatWithMessages {
for _, message := range messages {
messageIDs = append(messageIDs, message.ID.String())
}
}
existingMessages, err := s.messenger.MessagesExist(messageIDs)
if err != nil {
log.Error("failed to check existing messages", "err", err)
continue
}
2019-07-05 12:45:47 +00:00
2019-07-17 22:25:42 +00:00
var signalMessages []*signal.Messages
2019-07-17 22:25:42 +00:00
for chat, messages := range chatWithMessages {
var dedupMessages []*dedup.DeduplicateMessage
// Filter out already saved messages
for _, message := range messages {
if !existingMessages[message.ID.String()] {
dedupMessage := &dedup.DeduplicateMessage{
Metadata: dedup.Metadata{
MessageID: message.ID,
EncryptionID: message.Hash,
},
Message: message.TransportMessage,
}
dedupMessage.Message.Payload = message.DecryptedPayload
dedupMessages = append(dedupMessages, dedupMessage)
}
}
dedupMessages = s.deduplicator.Deduplicate(dedupMessages)
if len(dedupMessages) != 0 {
signalMessage := &signal.Messages{
Chat: chat,
Error: nil, // TODO: what is it needed for?
Messages: dedupMessages,
}
signalMessages = append(signalMessages, signalMessage)
2019-07-17 22:25:42 +00:00
}
}
2019-07-01 09:39:51 +00:00
2019-07-17 22:25:42 +00:00
log.Debug("retrieve messages loop", "messages", len(signalMessages))
if len(signalMessages) == 0 {
continue
2019-07-01 09:39:51 +00:00
}
2019-07-17 22:25:42 +00:00
PublisherSignalHandler{}.NewMessages(signalMessages)
case <-cancel:
return
2019-07-01 09:39:51 +00:00
}
}
2019-07-17 22:25:42 +00:00
}
2019-07-05 12:45:47 +00:00
2019-07-17 22:25:42 +00:00
func (s *Service) ConfirmMessagesProcessed(messageIDs [][]byte) error {
return s.messenger.ConfirmMessagesProcessed(messageIDs)
}
func (s *Service) EnableInstallation(installationID string) error {
return s.messenger.EnableInstallation(installationID)
}
// DisableInstallation disables an installation for multi-device sync.
func (s *Service) DisableInstallation(installationID string) error {
return s.messenger.DisableInstallation(installationID)
2019-07-01 09:39:51 +00:00
}
// UpdateMailservers updates information about selected mail servers.
func (s *Service) UpdateMailservers(nodes []*enode.Node) error {
if err := s.peerStore.Update(nodes); err != nil {
return err
}
Mail peer store and connection manager (#1295) This change implements connection manager that monitors 3 types of events: 1. update of the selected mail servers 2. disconnect from a mail server 3. errors for requesting mail history When selected mail servers provided we will try to connect with as many as possible, and later disconnect the surplus. For example if we want to connect with one mail server and 3 were selected, we try to connect with all (3), and later disconnect with 2. It will to establish connection with live mail server faster. If mail server disconnects we will choose any other mail server from the list of selected. Unless we have only one mail server. In such case we don't have any other choice and we will leave things as is. If request for history was expired we will disconnect such peer and try to find another one. We will follow same rules as described above. We will have two components that will rely on this logic: 1. requesting history If target peer is provided we will use that peer, otherwise we will request history from any selected mail server that is connected at the time of request. 2. confirmation from selected mail server Confirmation from any selected mail server will bee used to send a feedback that envelope was sent. I will add several extensions, but probably in separate PRs: 1. prioritize connection with mail server that was used before reboot 2. disconnect from mail servers if history request wasn't expired but failed. 3. wait some time in RequestsMessage RPC to establish connection with any mail server Currently this feature is hidden, as certain changes will be necessary in status-react. partially implements: https://github.com/status-im/status-go/issues/1285
2018-12-05 13:57:05 +00:00
if s.connManager != nil {
s.connManager.Notify(nodes)
}
return nil
}
// Protocols returns a new protocols list. In this case, there are none.
func (s *Service) Protocols() []p2p.Protocol {
return []p2p.Protocol{}
}
// APIs returns a list of new APIs.
func (s *Service) APIs() []rpc.API {
apis := []rpc.API{
{
Namespace: "shhext",
Version: "1.0",
Service: NewPublicAPI(s),
Public: true,
},
}
return apis
}
// Start is run when a service is started.
// It does nothing in this case but is required by `node.Service` interface.
func (s *Service) Start(server *p2p.Server) error {
Mail peer store and connection manager (#1295) This change implements connection manager that monitors 3 types of events: 1. update of the selected mail servers 2. disconnect from a mail server 3. errors for requesting mail history When selected mail servers provided we will try to connect with as many as possible, and later disconnect the surplus. For example if we want to connect with one mail server and 3 were selected, we try to connect with all (3), and later disconnect with 2. It will to establish connection with live mail server faster. If mail server disconnects we will choose any other mail server from the list of selected. Unless we have only one mail server. In such case we don't have any other choice and we will leave things as is. If request for history was expired we will disconnect such peer and try to find another one. We will follow same rules as described above. We will have two components that will rely on this logic: 1. requesting history If target peer is provided we will use that peer, otherwise we will request history from any selected mail server that is connected at the time of request. 2. confirmation from selected mail server Confirmation from any selected mail server will bee used to send a feedback that envelope was sent. I will add several extensions, but probably in separate PRs: 1. prioritize connection with mail server that was used before reboot 2. disconnect from mail servers if history request wasn't expired but failed. 3. wait some time in RequestsMessage RPC to establish connection with any mail server Currently this feature is hidden, as certain changes will be necessary in status-react. partially implements: https://github.com/status-im/status-go/issues/1285
2018-12-05 13:57:05 +00:00
if s.config.EnableConnectionManager {
connectionsTarget := s.config.ConnectionTarget
if connectionsTarget == 0 {
connectionsTarget = defaultConnectionsTarget
}
2019-01-21 14:00:10 +00:00
maxFailures := s.config.MaxServerFailures
// if not defined change server on first expired event
if maxFailures == 0 {
maxFailures = 1
}
s.connManager = mailservers.NewConnectionManager(server, s.w, connectionsTarget, maxFailures, defaultTimeoutWaitAdded)
Mail peer store and connection manager (#1295) This change implements connection manager that monitors 3 types of events: 1. update of the selected mail servers 2. disconnect from a mail server 3. errors for requesting mail history When selected mail servers provided we will try to connect with as many as possible, and later disconnect the surplus. For example if we want to connect with one mail server and 3 were selected, we try to connect with all (3), and later disconnect with 2. It will to establish connection with live mail server faster. If mail server disconnects we will choose any other mail server from the list of selected. Unless we have only one mail server. In such case we don't have any other choice and we will leave things as is. If request for history was expired we will disconnect such peer and try to find another one. We will follow same rules as described above. We will have two components that will rely on this logic: 1. requesting history If target peer is provided we will use that peer, otherwise we will request history from any selected mail server that is connected at the time of request. 2. confirmation from selected mail server Confirmation from any selected mail server will bee used to send a feedback that envelope was sent. I will add several extensions, but probably in separate PRs: 1. prioritize connection with mail server that was used before reboot 2. disconnect from mail servers if history request wasn't expired but failed. 3. wait some time in RequestsMessage RPC to establish connection with any mail server Currently this feature is hidden, as certain changes will be necessary in status-react. partially implements: https://github.com/status-im/status-go/issues/1285
2018-12-05 13:57:05 +00:00
s.connManager.Start()
if err := mailservers.EnsureUsedRecordsAddedFirst(s.peerStore, s.connManager); err != nil {
return err
}
}
if s.config.EnableLastUsedMonitor {
s.lastUsedMonitor = mailservers.NewLastUsedConnectionMonitor(s.peerStore, s.cache, s.w)
s.lastUsedMonitor.Start()
Mail peer store and connection manager (#1295) This change implements connection manager that monitors 3 types of events: 1. update of the selected mail servers 2. disconnect from a mail server 3. errors for requesting mail history When selected mail servers provided we will try to connect with as many as possible, and later disconnect the surplus. For example if we want to connect with one mail server and 3 were selected, we try to connect with all (3), and later disconnect with 2. It will to establish connection with live mail server faster. If mail server disconnects we will choose any other mail server from the list of selected. Unless we have only one mail server. In such case we don't have any other choice and we will leave things as is. If request for history was expired we will disconnect such peer and try to find another one. We will follow same rules as described above. We will have two components that will rely on this logic: 1. requesting history If target peer is provided we will use that peer, otherwise we will request history from any selected mail server that is connected at the time of request. 2. confirmation from selected mail server Confirmation from any selected mail server will bee used to send a feedback that envelope was sent. I will add several extensions, but probably in separate PRs: 1. prioritize connection with mail server that was used before reboot 2. disconnect from mail servers if history request wasn't expired but failed. 3. wait some time in RequestsMessage RPC to establish connection with any mail server Currently this feature is hidden, as certain changes will be necessary in status-react. partially implements: https://github.com/status-im/status-go/issues/1285
2018-12-05 13:57:05 +00:00
}
s.mailMonitor.Start()
s.nodeID = server.PrivateKey
Mail peer store and connection manager (#1295) This change implements connection manager that monitors 3 types of events: 1. update of the selected mail servers 2. disconnect from a mail server 3. errors for requesting mail history When selected mail servers provided we will try to connect with as many as possible, and later disconnect the surplus. For example if we want to connect with one mail server and 3 were selected, we try to connect with all (3), and later disconnect with 2. It will to establish connection with live mail server faster. If mail server disconnects we will choose any other mail server from the list of selected. Unless we have only one mail server. In such case we don't have any other choice and we will leave things as is. If request for history was expired we will disconnect such peer and try to find another one. We will follow same rules as described above. We will have two components that will rely on this logic: 1. requesting history If target peer is provided we will use that peer, otherwise we will request history from any selected mail server that is connected at the time of request. 2. confirmation from selected mail server Confirmation from any selected mail server will bee used to send a feedback that envelope was sent. I will add several extensions, but probably in separate PRs: 1. prioritize connection with mail server that was used before reboot 2. disconnect from mail servers if history request wasn't expired but failed. 3. wait some time in RequestsMessage RPC to establish connection with any mail server Currently this feature is hidden, as certain changes will be necessary in status-react. partially implements: https://github.com/status-im/status-go/issues/1285
2018-12-05 13:57:05 +00:00
s.server = server
2019-07-05 12:45:47 +00:00
return nil
2019-06-03 14:29:14 +00:00
}
// Stop is run when a service is stopped.
func (s *Service) Stop() error {
2019-06-03 14:29:14 +00:00
log.Info("Stopping shhext service")
Mail peer store and connection manager (#1295) This change implements connection manager that monitors 3 types of events: 1. update of the selected mail servers 2. disconnect from a mail server 3. errors for requesting mail history When selected mail servers provided we will try to connect with as many as possible, and later disconnect the surplus. For example if we want to connect with one mail server and 3 were selected, we try to connect with all (3), and later disconnect with 2. It will to establish connection with live mail server faster. If mail server disconnects we will choose any other mail server from the list of selected. Unless we have only one mail server. In such case we don't have any other choice and we will leave things as is. If request for history was expired we will disconnect such peer and try to find another one. We will follow same rules as described above. We will have two components that will rely on this logic: 1. requesting history If target peer is provided we will use that peer, otherwise we will request history from any selected mail server that is connected at the time of request. 2. confirmation from selected mail server Confirmation from any selected mail server will bee used to send a feedback that envelope was sent. I will add several extensions, but probably in separate PRs: 1. prioritize connection with mail server that was used before reboot 2. disconnect from mail servers if history request wasn't expired but failed. 3. wait some time in RequestsMessage RPC to establish connection with any mail server Currently this feature is hidden, as certain changes will be necessary in status-react. partially implements: https://github.com/status-im/status-go/issues/1285
2018-12-05 13:57:05 +00:00
if s.config.EnableConnectionManager {
s.connManager.Stop()
}
if s.config.EnableLastUsedMonitor {
s.lastUsedMonitor.Stop()
}
s.requestsRegistry.Clear()
s.mailMonitor.Stop()
2019-07-17 22:25:42 +00:00
if s.cancelMessenger != nil {
select {
case <-s.cancelMessenger:
// channel already closed
default:
close(s.cancelMessenger)
s.cancelMessenger = nil
2019-05-23 07:54:28 +00:00
}
}
2019-07-17 22:25:42 +00:00
if s.messenger != nil {
if err := s.messenger.Shutdown(); err != nil {
return err
}
}
return nil
}
func (s *Service) syncMessages(ctx context.Context, mailServerID []byte, r whisper.SyncMailRequest) (resp whisper.SyncEventResponse, err error) {
err = s.w.SyncMessages(mailServerID, r)
if err != nil {
return
}
// Wait for the response which is received asynchronously as a p2p packet.
// This packet handler will send an event which contains the response payload.
events := make(chan whisper.EnvelopeEvent, 1024)
sub := s.w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
// Add explicit timeout context, otherwise the request
// can hang indefinitely if not specified by the sender.
// Sender is usually through netcat or some bash tool
// so it's not really possible to specify the timeout.
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
for {
select {
case event := <-events:
if event.Event != whisper.EventMailServerSyncFinished {
continue
}
log.Info("received EventMailServerSyncFinished event", "data", event.Data)
var ok bool
resp, ok = event.Data.(whisper.SyncEventResponse)
if !ok {
err = fmt.Errorf("did not understand the response event data")
return
}
return
case <-timeoutCtx.Done():
err = timeoutCtx.Err()
return
}
}
}
2019-07-17 22:25:42 +00:00
func buildMessengerOptions(config params.ShhextConfig, dbPath, dbKey string, envelopesMonitorConfig *protocolwhisper.EnvelopesMonitorConfig, logger *zap.Logger) []protocol.Option {
options := []protocol.Option{
protocol.WithCustomLogger(logger),
2019-07-30 06:14:13 +00:00
protocol.WithDatabaseConfig(dbPath, dbKey),
protocol.WithEnvelopesMonitorConfig(envelopesMonitorConfig),
}
if !config.DisableGenericDiscoveryTopic {
options = append(options, protocol.WithGenericDiscoveryTopicSupport())
}
if config.DataSyncEnabled {
options = append(options, protocol.WithDatasync())
}
if config.SendV1Messages {
options = append(options, protocol.WithSendV1Messages())
}
return options
2019-07-17 22:25:42 +00:00
}