From 92afd0d47e4e98a5f19c056678726605c1a7665e Mon Sep 17 00:00:00 2001 From: Victor Farazdagi Date: Wed, 12 Apr 2017 21:26:54 +0300 Subject: [PATCH] vendor/whisper, statusd: push notifications implemented, closes #135 --- cmd/statusd/wnodecmd.go | 56 +- geth/node.go | 40 +- geth/node_manager.go | 1 + geth/params/config.go | 93 ++- geth/params/defaults.go | 3 + geth/params/testdata/config.mainnet.json | 10 +- geth/params/testdata/config.testnet.json | 10 +- static/html/shh-push-notifications.html | 392 ++++++++++++ .../whisper/notifications/discovery.go | 148 +++++ .../whisper/notifications/provider.go | 59 ++ .../whisper/notifications/server.go | 581 ++++++++++++++++++ .../whisper/notifications/utils.go | 72 +++ .../go-ethereum/whisper/whisperv5/api.go | 5 + .../go-ethereum/whisper/whisperv5/doc.go | 14 + .../go-ethereum/whisper/whisperv5/whisper.go | 21 +- 15 files changed, 1479 insertions(+), 26 deletions(-) create mode 100644 static/html/shh-push-notifications.html create mode 100644 vendor/github.com/ethereum/go-ethereum/whisper/notifications/discovery.go create mode 100644 vendor/github.com/ethereum/go-ethereum/whisper/notifications/provider.go create mode 100644 vendor/github.com/ethereum/go-ethereum/whisper/notifications/server.go create mode 100644 vendor/github.com/ethereum/go-ethereum/whisper/notifications/utils.go diff --git a/cmd/statusd/wnodecmd.go b/cmd/statusd/wnodecmd.go index 77871a2de..66dbe5524 100644 --- a/cmd/statusd/wnodecmd.go +++ b/cmd/statusd/wnodecmd.go @@ -31,9 +31,13 @@ var ( Name: "mailserver", Usage: "Delivers expired messages on demand", } - WhisperPassword = cli.StringFlag{ + WhisperIdentityFile = cli.StringFlag{ + Name: "identity", + Usage: "Protocol identity file (private key used for asymetric encryption)", + } + WhisperPasswordFile = cli.StringFlag{ Name: "password", - Usage: "Password, will be used for topic keys, as Mail & Notification Server password", + Usage: "Password file (password is used for symmetric encryption)", } WhisperPortFlag = cli.IntFlag{ Name: "port", @@ -54,6 +58,10 @@ var ( Name: "injectaccounts", Usage: "Whether test account should be injected or not (default: true)", } + FirebaseAuthorizationKey = cli.StringFlag{ + Name: "firebaseauth", + Usage: "FCM Authorization Key used for sending Push Notifications", + } ) var ( @@ -67,11 +75,13 @@ var ( WhisperNotificationServerNodeFlag, WhisperForwarderNodeFlag, WhisperMailserverNodeFlag, - WhisperPassword, + WhisperIdentityFile, + WhisperPasswordFile, WhisperPoWFlag, WhisperPortFlag, WhisperTTLFlag, WhisperInjectTestAccounts, + FirebaseAuthorizationKey, }, } ) @@ -129,24 +139,52 @@ func makeWhisperNodeConfig(ctx *cli.Context) (*params.NodeConfig, error) { whisperConfig := nodeConfig.WhisperConfig whisperConfig.Enabled = true + whisperConfig.IdentityFile = ctx.String(WhisperIdentityFile.Name) + whisperConfig.PasswordFile = ctx.String(WhisperPasswordFile.Name) whisperConfig.EchoMode = ctx.BoolT(WhisperEchoModeFlag.Name) whisperConfig.BootstrapNode = ctx.BoolT(WhisperBootstrapNodeFlag.Name) whisperConfig.ForwarderNode = ctx.Bool(WhisperForwarderNodeFlag.Name) whisperConfig.NotificationServerNode = ctx.Bool(WhisperNotificationServerNodeFlag.Name) whisperConfig.MailServerNode = ctx.Bool(WhisperMailserverNodeFlag.Name) - whisperConfig.MailServerPassword = ctx.String(WhisperPassword.Name) - whisperConfig.NotificationServerPassword = ctx.String(WhisperPassword.Name) // the same for both mail and notification servers - whisperConfig.Port = ctx.Int(WhisperPortFlag.Name) whisperConfig.TTL = ctx.Int(WhisperTTLFlag.Name) whisperConfig.MinimumPoW = ctx.Float64(WhisperPoWFlag.Name) - if whisperConfig.MailServerNode && len(whisperConfig.MailServerPassword) == 0 { + if whisperConfig.MailServerNode && len(whisperConfig.PasswordFile) == 0 { return nil, errors.New("mail server requires --password to be specified") } - if whisperConfig.NotificationServerNode && len(whisperConfig.NotificationServerPassword) == 0 { - return nil, errors.New("notification server requires --password to be specified") + if whisperConfig.NotificationServerNode && len(whisperConfig.IdentityFile) == 0 { + return nil, errors.New("notification server requires either --identity file to be specified") + } + + if len(whisperConfig.PasswordFile) > 0 { // make sure that we can load password file + if whisperConfig.PasswordFile, err = filepath.Abs(whisperConfig.PasswordFile); err != nil { + return nil, err + } + if _, err := whisperConfig.ReadPasswordFile(); err != nil { + return nil, err + } + } + + if len(whisperConfig.IdentityFile) > 0 { // make sure that we can load identity file + if whisperConfig.IdentityFile, err = filepath.Abs(whisperConfig.IdentityFile); err != nil { + return nil, err + } + if _, err := whisperConfig.ReadIdentityFile(); err != nil { + return nil, err + } + } + + firebaseConfig := whisperConfig.FirebaseConfig + firebaseConfig.AuthorizationKeyFile = ctx.String(FirebaseAuthorizationKey.Name) + if len(firebaseConfig.AuthorizationKeyFile) > 0 { // make sure authorization key can be loaded + if firebaseConfig.AuthorizationKeyFile, err = filepath.Abs(firebaseConfig.AuthorizationKeyFile); err != nil { + return nil, err + } + if _, err := firebaseConfig.ReadAuthorizationKeyFile(); err != nil { + return nil, err + } } return nodeConfig, nil diff --git a/geth/node.go b/geth/node.go index ea42e4e64..8986d50c5 100644 --- a/geth/node.go +++ b/geth/node.go @@ -24,6 +24,8 @@ import ( "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discv5" "github.com/ethereum/go-ethereum/p2p/nat" + "github.com/ethereum/go-ethereum/whisper/mailserver" + "github.com/ethereum/go-ethereum/whisper/notifications" whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" "github.com/status-im/status-go/geth/params" ) @@ -114,6 +116,17 @@ func MakeNode(config *params.NodeConfig) *Node { stackConfig.P2P.PrivateKey = pk } + if len(config.NodeKeyFile) > 0 { + log.Info("Loading private key file", "file", config.NodeKeyFile) + pk, err := crypto.LoadECDSA(config.NodeKeyFile) + if err != nil { + log.Info("Failed loading private key file", "file", config.NodeKeyFile, "err", err) + } + + // override node's private key + stackConfig.P2P.PrivateKey = pk + } + stack, err := node.New(stackConfig) if err != nil { Fatalf(ErrNodeMakeFailure) @@ -175,7 +188,30 @@ func activateShhService(stack *node.Node, config *params.NodeConfig) error { return nil } serviceConstructor := func(*node.ServiceContext) (node.Service, error) { - return whisper.New(), nil + whisperConfig := config.WhisperConfig + whisperService := whisper.New() + + // enable mail service + if whisperConfig.MailServerNode { + password, err := whisperConfig.ReadPasswordFile() + if err != nil { + return nil, err + } + + var mailServer mailserver.WMailServer + whisperService.RegisterServer(&mailServer) + mailServer.Init(whisperService, whisperConfig.DataDir, string(password), whisperConfig.MinimumPoW) + } + + // enable notification service + if whisperConfig.NotificationServerNode { + var notificationServer notifications.NotificationServer + whisperService.RegisterNotificationServer(¬ificationServer) + + notificationServer.Init(whisperService, whisperConfig) + } + + return whisperService, nil } if err := stack.Register(serviceConstructor); err != nil { return err @@ -286,7 +322,7 @@ func Fatalf(reason interface{}, args ...interface{}) { // HaltOnPanic recovers from panic, logs issue, sends upward notification, and exits func HaltOnPanic() { if r := recover(); r != nil { - err := fmt.Errorf("%v: %v", ErrNodeStartFailure, r) + err := fmt.Errorf("%v: %v", ErrNodeRunFailure, r) // send signal up to native app SendSignal(SignalEnvelope{ diff --git a/geth/node_manager.go b/geth/node_manager.go index 6f7940415..a4ee343c0 100644 --- a/geth/node_manager.go +++ b/geth/node_manager.go @@ -53,6 +53,7 @@ var ( ErrInvalidJailedRequestQueue = errors.New("jailed request queue is not properly initialized") ErrNodeMakeFailure = errors.New("error creating p2p node") ErrNodeStartFailure = errors.New("error starting p2p node") + ErrNodeRunFailure = errors.New("error running p2p node") ErrInvalidNodeAPI = errors.New("no node API connected") ErrAccountKeyStoreMissing = errors.New("account key store is not set") ) diff --git a/geth/params/config.go b/geth/params/config.go index 3ea4db746..b492dc3ab 100644 --- a/geth/params/config.go +++ b/geth/params/config.go @@ -1,6 +1,8 @@ package params import ( + "bytes" + "crypto/ecdsa" "encoding/json" "errors" "fmt" @@ -10,6 +12,7 @@ import ( "strings" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" ) @@ -26,8 +29,11 @@ func init() { } var ( - ErrMissingDataDir = errors.New("missing required 'DataDir' parameter") - ErrMissingNetworkId = errors.New("missing required 'NetworkId' parameter") + ErrMissingDataDir = errors.New("missing required 'DataDir' parameter") + ErrMissingNetworkId = errors.New("missing required 'NetworkId' parameter") + ErrEmptyPasswordFile = errors.New("password file cannot be empty") + ErrEmptyIdentityFile = errors.New("identity file cannot be empty") + ErrEmptyAuthorizationKeyFile = errors.New("authorization key file cannot be empty") ) // LightEthConfig holds LES-related configuration @@ -43,11 +49,26 @@ type LightEthConfig struct { DatabaseCache int } +type FirebaseConfig struct { + // AuthorizationKeyFile file path that contains FCM authorization key + AuthorizationKeyFile string + + // NotificationTriggerURL URL used to send push notification requests to + NotificationTriggerURL string +} + // WhisperConfig holds SHH-related configuration type WhisperConfig struct { // Enabled flag specifies whether protocol is enabled Enabled bool + // IdentityFile path to private key, that will be loaded as identity into Whisper + IdentityFile string + + // PasswordFile path to password file, for non-interactive password entry + // (if no account file selected, then this password is used for symmetric encryption) + PasswordFile string + // EchoMode if mode is on, prints some arguments for diagnostics EchoMode bool @@ -60,15 +81,9 @@ type WhisperConfig struct { // MailServerNode is mode when node is capable of delivering expired messages on demand MailServerNode bool - // MailServerPassword is password for MailServer's symmetric key - MailServerPassword string - // NotificationServerNode is mode when node is capable of sending Push (and probably other kinds) Notifications NotificationServerNode bool - // NotificationServerPassword is password for NotificationServer's symmetric key (used in discovery) - NotificationServerPassword string - // DataDir is the file system folder Whisper should use for any data storage needs. DataDir string @@ -80,6 +95,9 @@ type WhisperConfig struct { // TTL time to live for messages, in seconds TTL int + + // FirebaseConfig extra configuration for Firebase Cloud Messaging + FirebaseConfig *FirebaseConfig `json:"FirebaseConfig,"` } // SwarmConfig holds Swarm-related configuration @@ -200,6 +218,9 @@ func NewNodeConfig(dataDir string, networkId uint64) (*NodeConfig, error) { Port: WhisperPort, MinimumPoW: WhisperMinimumPoW, TTL: WhisperTTL, + FirebaseConfig: &FirebaseConfig{ + NotificationTriggerURL: FirebaseNotificationTriggerURL, + }, }, SwarmConfig: &SwarmConfig{}, } @@ -330,3 +351,59 @@ func (c *SwarmConfig) String() string { data, _ := json.MarshalIndent(c, "", " ") return string(data) } + +// ReadPasswordFile reads and returns content of the password file +func (c *WhisperConfig) ReadPasswordFile() ([]byte, error) { + if len(c.PasswordFile) <= 0 { + return nil, ErrEmptyPasswordFile + } + + password, err := ioutil.ReadFile(c.PasswordFile) + if err != nil { + return nil, err + } + password = bytes.TrimRight(password, "\n") + + if len(password) == 0 { + return nil, ErrEmptyPasswordFile + } + + return password, nil +} + +// ReadIdentityFile reads and loads identity private key +func (c *WhisperConfig) ReadIdentityFile() (*ecdsa.PrivateKey, error) { + if len(c.IdentityFile) <= 0 { + return nil, ErrEmptyIdentityFile + } + + identity, err := crypto.LoadECDSA(c.IdentityFile) + if err != nil { + return nil, err + } + + if identity == nil { + return nil, ErrEmptyIdentityFile + } + + return identity, nil +} + +// ReadAuthorizationKeyFile reads and loads FCM authorization key +func (c *FirebaseConfig) ReadAuthorizationKeyFile() ([]byte, error) { + if len(c.AuthorizationKeyFile) <= 0 { + return nil, ErrEmptyAuthorizationKeyFile + } + + key, err := ioutil.ReadFile(c.AuthorizationKeyFile) + if err != nil { + return nil, err + } + key = bytes.TrimRight(key, "\n") + + if key == nil { + return nil, ErrEmptyAuthorizationKeyFile + } + + return key, nil +} diff --git a/geth/params/defaults.go b/geth/params/defaults.go index 625691a55..4e05880ca 100644 --- a/geth/params/defaults.go +++ b/geth/params/defaults.go @@ -66,6 +66,9 @@ const ( // WhisperTTL is time to live for messages, in seconds WhisperTTL = 120 + // FirebaseNotificationTriggerURL is URL where FCM notification requests are sent to + FirebaseNotificationTriggerURL = "https://fcm.googleapis.com/fcm/send" + // MainNetworkId is id of the main network MainNetworkId = 1 diff --git a/geth/params/testdata/config.mainnet.json b/geth/params/testdata/config.mainnet.json index 35719c65c..de15468c9 100755 --- a/geth/params/testdata/config.mainnet.json +++ b/geth/params/testdata/config.mainnet.json @@ -28,17 +28,21 @@ }, "WhisperConfig": { "Enabled": true, + "IdentityFile": "", + "PasswordFile": "", "EchoMode": false, "BootstrapNode": false, "ForwarderNode": false, "MailServerNode": false, - "MailServerPassword": "", "NotificationServerNode": false, - "NotificationServerPassword": "", "DataDir": "$TMPDIR/wnode", "Port": 30379, "MinimumPoW": 0.001, - "TTL": 120 + "TTL": 120, + "FirebaseConfig": { + "AuthorizationKeyFile": "", + "NotificationTriggerURL": "https://fcm.googleapis.com/fcm/send" + } }, "SwarmConfig": { "Enabled": false diff --git a/geth/params/testdata/config.testnet.json b/geth/params/testdata/config.testnet.json index ca5a04002..7a53a17d3 100755 --- a/geth/params/testdata/config.testnet.json +++ b/geth/params/testdata/config.testnet.json @@ -28,17 +28,21 @@ }, "WhisperConfig": { "Enabled": true, + "IdentityFile": "", + "PasswordFile": "", "EchoMode": false, "BootstrapNode": false, "ForwarderNode": false, "MailServerNode": false, - "MailServerPassword": "", "NotificationServerNode": false, - "NotificationServerPassword": "", "DataDir": "$TMPDIR/wnode", "Port": 30379, "MinimumPoW": 0.001, - "TTL": 120 + "TTL": 120, + "FirebaseConfig": { + "AuthorizationKeyFile": "", + "NotificationTriggerURL": "https://fcm.googleapis.com/fcm/send" + } }, "SwarmConfig": { "Enabled": false diff --git a/static/html/shh-push-notifications.html b/static/html/shh-push-notifications.html new file mode 100644 index 000000000..4ec96f565 --- /dev/null +++ b/static/html/shh-push-notifications.html @@ -0,0 +1,392 @@ + + + Whisper Notification Server Test + + + + + + + + + + + + + + + +
+
+ +
+
+ + + + + diff --git a/vendor/github.com/ethereum/go-ethereum/whisper/notifications/discovery.go b/vendor/github.com/ethereum/go-ethereum/whisper/notifications/discovery.go new file mode 100644 index 000000000..e826a94aa --- /dev/null +++ b/vendor/github.com/ethereum/go-ethereum/whisper/notifications/discovery.go @@ -0,0 +1,148 @@ +package notifications + +import ( + "encoding/hex" + "encoding/json" + "errors" + "fmt" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" +) + +const ( + topicDiscoverServer = "DISCOVER_NOTIFICATION_SERVER" + topicProposeServer = "PROPOSE_NOTIFICATION_SERVER" + topicServerAccepted = "ACCEPT_NOTIFICATION_SERVER" + topicAckClientSubscription = "ACK_NOTIFICATION_SERVER_SUBSCRIPTION" +) + +// discoveryService abstract notification server discovery protocol +type discoveryService struct { + server *NotificationServer + + discoverFilterID string + serverAcceptedFilterID string +} + +// messageProcessingFn is a callback used to process incoming client requests +type messageProcessingFn func(*whisper.ReceivedMessage) error + +func NewDiscoveryService(notificationServer *NotificationServer) *discoveryService { + return &discoveryService{ + server: notificationServer, + } +} + +// Start installs necessary filters to watch for incoming discovery requests, +// then in separate routine starts watcher loop +func (s *discoveryService) Start() error { + var err error + + // notification server discovery requests + s.discoverFilterID, err = s.server.installKeyFilter(topicDiscoverServer, s.server.protocolKey) + if err != nil { + return fmt.Errorf("failed installing filter: %v", err) + } + go s.server.requestProcessorLoop(s.discoverFilterID, topicDiscoverServer, s.processDiscoveryRequest) + + // notification server accept/select requests + s.serverAcceptedFilterID, err = s.server.installKeyFilter(topicServerAccepted, s.server.protocolKey) + if err != nil { + return fmt.Errorf("failed installing filter: %v", err) + } + go s.server.requestProcessorLoop(s.serverAcceptedFilterID, topicServerAccepted, s.processServerAcceptedRequest) + + log.Info("notification server discovery service started") + return nil +} + +// Stop stops all discovery processing loops +func (s *discoveryService) Stop() error { + s.server.whisper.Unsubscribe(s.discoverFilterID) + s.server.whisper.Unsubscribe(s.serverAcceptedFilterID) + + log.Info("notification server discovery service stopped") + return nil +} + +// processDiscoveryRequest processes incoming client requests of type: +// when client tries to discover suitable notification server +func (s *discoveryService) processDiscoveryRequest(msg *whisper.ReceivedMessage) error { + // offer this node as notification server + msgParams := whisper.MessageParams{ + Src: s.server.protocolKey, + Dst: msg.Src, + Topic: MakeTopic([]byte(topicProposeServer)), + Payload: []byte(`{"server": "0x` + s.server.nodeID + `"}`), + TTL: uint32(s.server.config.TTL), + PoW: s.server.config.MinimumPoW, + WorkTime: 5, + } + response := whisper.NewSentMessage(&msgParams) + env, err := response.Wrap(&msgParams) + if err != nil { + return fmt.Errorf("failed to wrap server proposal message: %v", err) + } + + if err := s.server.whisper.Send(env); err != nil { + return fmt.Errorf("failed to send server proposal message: %v", err) + } + + log.Info(fmt.Sprintf("server proposal sent (server: %v, dst: %v, topic: %x)", + s.server.nodeID, common.ToHex(crypto.FromECDSAPub(msgParams.Dst)), msgParams.Topic)) + return nil +} + +// processServerAcceptedRequest processes incoming client requests of type: +// when client is ready to select the given node as its notification server +func (s *discoveryService) processServerAcceptedRequest(msg *whisper.ReceivedMessage) error { + var parsedMessage struct { + ServerID string `json:"server"` + } + if err := json.Unmarshal(msg.Payload, &parsedMessage); err != nil { + return err + } + + if msg.Src == nil { + return errors.New("message 'from' field is required") + } + + // make sure that only requests made to the current node are processed + if parsedMessage.ServerID != `0x`+s.server.nodeID { + return nil + } + + // register client + sessionKey, err := s.server.RegisterClientSession(&ClientSession{ + ClientKey: hex.EncodeToString(crypto.FromECDSAPub(msg.Src)), + }) + if err != nil { + return err + } + + // confirm that client has been successfully subscribed + msgParams := whisper.MessageParams{ + Src: s.server.protocolKey, + Dst: msg.Src, + Topic: MakeTopic([]byte(topicAckClientSubscription)), + Payload: []byte(`{"server": "0x` + s.server.nodeID + `", "key": "0x` + hex.EncodeToString(sessionKey) + `"}`), + TTL: uint32(s.server.config.TTL), + PoW: s.server.config.MinimumPoW, + WorkTime: 5, + } + response := whisper.NewSentMessage(&msgParams) + env, err := response.Wrap(&msgParams) + if err != nil { + return fmt.Errorf("failed to wrap server proposal message: %v", err) + } + + if err := s.server.whisper.Send(env); err != nil { + return fmt.Errorf("failed to send server proposal message: %v", err) + } + + log.Info(fmt.Sprintf("server confirms client subscription (dst: %v, topic: %x)", msgParams.Dst, msgParams.Topic)) + return nil +} diff --git a/vendor/github.com/ethereum/go-ethereum/whisper/notifications/provider.go b/vendor/github.com/ethereum/go-ethereum/whisper/notifications/provider.go new file mode 100644 index 000000000..032463a6e --- /dev/null +++ b/vendor/github.com/ethereum/go-ethereum/whisper/notifications/provider.go @@ -0,0 +1,59 @@ +package notifications + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "strings" + + "github.com/ethereum/go-ethereum/log" + "github.com/status-im/status-go/geth/params" +) + +// NotificationDeliveryProvider handles the notification delivery +type NotificationDeliveryProvider interface { + Send(id string, payload string) error +} + +// FirebaseProvider represents FCM provider +type FirebaseProvider struct { + AuthorizationKey string + NotificationTriggerURL string +} + +// NewFirebaseProvider creates new FCM provider +func NewFirebaseProvider(config *params.FirebaseConfig) *FirebaseProvider { + authorizationKey, _ := config.ReadAuthorizationKeyFile() + return &FirebaseProvider{ + NotificationTriggerURL: config.NotificationTriggerURL, + AuthorizationKey: string(authorizationKey), + } +} + +// Send triggers sending of Push Notification to a given device id +func (p *FirebaseProvider) Send(id string, payload string) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic: %v", r) + } + }() + + jsonRequest := strings.Replace(payload, "{{ ID }}", id, 3) + req, err := http.NewRequest("POST", p.NotificationTriggerURL, bytes.NewBuffer([]byte(jsonRequest))) + req.Header.Set("Authorization", "key="+p.AuthorizationKey) + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + log.Debug("FCM response", "status", resp.Status, "header", resp.Header) + body, _ := ioutil.ReadAll(resp.Body) + log.Debug("FCM response body", "body", string(body)) + + return nil +} diff --git a/vendor/github.com/ethereum/go-ethereum/whisper/notifications/server.go b/vendor/github.com/ethereum/go-ethereum/whisper/notifications/server.go new file mode 100644 index 000000000..218620427 --- /dev/null +++ b/vendor/github.com/ethereum/go-ethereum/whisper/notifications/server.go @@ -0,0 +1,581 @@ +package notifications + +import ( + "errors" + "fmt" + "sync" + "time" + + "crypto/ecdsa" + "encoding/hex" + "encoding/json" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" + whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" + "github.com/status-im/status-go/geth/params" +) + +const ( + topicSendNotification = "SEND_NOTIFICATION" + topicNewChatSession = "NEW_CHAT_SESSION" + topicAckNewChatSession = "ACK_NEW_CHAT_SESSION" + topicNewDeviceRegistration = "NEW_DEVICE_REGISTRATION" + topicAckDeviceRegistration = "ACK_DEVICE_REGISTRATION" + topicCheckClientSession = "CHECK_CLIENT_SESSION" + topicConfirmClientSession = "CONFIRM_CLIENT_SESSION" + topicDropClientSession = "DROP_CLIENT_SESSION" +) + +var ( + ErrServiceInitError = errors.New("notification service has not been properly initialized") +) + +// NotificationServer service capable of handling Push Notifications +type NotificationServer struct { + whisper *whisper.Whisper + config *params.WhisperConfig + + nodeID string // proposed server will feature this ID + discovery *discoveryService // discovery service handles client/server negotiation, when server is selected + protocolKey *ecdsa.PrivateKey // private key of service, used to encode handshake communication + + clientSessions map[string]*ClientSession + clientSessionsMu sync.RWMutex + + chatSessions map[string]*ChatSession + chatSessionsMu sync.RWMutex + + deviceSubscriptions map[string]*DeviceSubscription + deviceSubscriptionsMu sync.RWMutex + + firebaseProvider NotificationDeliveryProvider + + quit chan struct{} +} + +// ClientSession abstracts notification client, which expects notifications whenever +// some envelope can be decoded with session key (key hash is compared for optimization) +type ClientSession struct { + ClientKey string // public key uniquely identifying a client + SessionKey []byte // actual symkey used for client - server communication + SessionKeyHash common.Hash // The Keccak256Hash of the symmetric key, which is shared between server/client + SessionKeyInput []byte // raw symkey used as input for actual SessionKey +} + +// ChatSession abstracts chat session, which some previously registered client can create. +// ChatSession is used by client for sharing common secret, allowing others to register +// themselves and eventually to trigger notifications. +type ChatSession struct { + ParentKey string // public key uniquely identifying a client session used to create a chat session + ChatKey string // ID that uniquely identifies a chat session + SessionKey []byte // actual symkey used for client - server communication + SessionKeyHash common.Hash // The Keccak256Hash of the symmetric key, which is shared between server/client +} + +// DeviceSubscription stores enough information about a device (or group of devices), +// so that Notification Server can trigger notification on that device(s) +type DeviceSubscription struct { + DeviceID string // ID that will be used as destination + ChatSessionKeyHash common.Hash // The Keccak256Hash of the symmetric key, which is shared between server/client + PubKey *ecdsa.PublicKey // public key of subscriber (to filter out when notification is triggered) +} + +// Init used for service initialization, making sure it is safe to call Start() +func (s *NotificationServer) Init(whisperService *whisper.Whisper, whisperConfig *params.WhisperConfig) { + s.whisper = whisperService + s.config = whisperConfig + + s.discovery = NewDiscoveryService(s) + s.clientSessions = make(map[string]*ClientSession) + s.chatSessions = make(map[string]*ChatSession) + s.deviceSubscriptions = make(map[string]*DeviceSubscription) + s.quit = make(chan struct{}) + + // setup providers (FCM only, for now) + s.firebaseProvider = NewFirebaseProvider(whisperConfig.FirebaseConfig) +} + +// Start begins notification loop, in a separate go routine +func (s *NotificationServer) Start(stack *p2p.Server) error { + if s.whisper == nil { + return ErrServiceInitError + } + + // configure nodeID + if stack != nil { + if nodeInfo := stack.NodeInfo(); nodeInfo != nil { + s.nodeID = nodeInfo.ID + } + } + + // configure keys + identity, err := s.config.ReadIdentityFile() + if err != nil { + return err + } + s.whisper.AddKeyPair(identity) + s.protocolKey = identity + log.Info("protocol pubkey", "key", common.ToHex(crypto.FromECDSAPub(&s.protocolKey.PublicKey))) + + // start discovery protocol + s.discovery.Start() + + // client session status requests + clientSessionStatusFilterID, err := s.installKeyFilter(topicCheckClientSession, s.protocolKey) + if err != nil { + return fmt.Errorf("failed installing filter: %v", err) + } + go s.requestProcessorLoop(clientSessionStatusFilterID, topicDiscoverServer, s.processClientSessionStatusRequest) + + // client session remove requests + dropClientSessionFilterID, err := s.installKeyFilter(topicDropClientSession, s.protocolKey) + if err != nil { + return fmt.Errorf("failed installing filter: %v", err) + } + go s.requestProcessorLoop(dropClientSessionFilterID, topicDropClientSession, s.processDropClientSessionRequest) + + log.Info("Whisper Notification Server started") + return nil +} + +// Stop handles stopping the running notification loop, and all related resources +func (s *NotificationServer) Stop() error { + close(s.quit) + + if s.whisper == nil { + return ErrServiceInitError + } + + if s.discovery != nil { + s.discovery.Stop() + } + + log.Info("Whisper Notification Server stopped") + return nil +} + +// RegisterClientSession forms a cryptographic link between server and client. +// It does so by sharing a session SymKey and installing filter listening for messages +// encrypted with that key. So, both server and client have a secure way to communicate. +func (s *NotificationServer) RegisterClientSession(session *ClientSession) (sessionKey []byte, err error) { + s.clientSessionsMu.Lock() + defer s.clientSessionsMu.Unlock() + + // generate random symmetric session key + keyName := fmt.Sprintf("%s-%s", "ntfy-client", crypto.Keccak256Hash([]byte(session.ClientKey)).Hex()) + sessionKey, sessionKeyDerived, err := s.makeSessionKey(keyName) + if err != nil { + return nil, err + } + + // populate session key hash (will be used to match decrypted message to a given client id) + session.SessionKeyInput = sessionKey + session.SessionKeyHash = crypto.Keccak256Hash(sessionKeyDerived) + session.SessionKey = sessionKeyDerived + + // append to list of known clients + // so that it is trivial to go key hash -> client session info + id := session.SessionKeyHash.Hex() + s.clientSessions[id] = session + + // setup filter, which will get all incoming messages, that are encrypted with SymKey + filterID, err := s.installTopicFilter(topicNewChatSession, sessionKeyDerived) + if err != nil { + return nil, fmt.Errorf("failed installing filter: %v", err) + } + go s.requestProcessorLoop(filterID, topicNewChatSession, s.processNewChatSessionRequest) + return +} + +// RegisterChatSession forms a cryptographic link between server and client. +// This link is meant to be shared with other clients, so that they can use +// the shared SymKey to trigger notifications for devices attached to a given +// chat session. +func (s *NotificationServer) RegisterChatSession(session *ChatSession) (sessionKey []byte, err error) { + s.chatSessionsMu.Lock() + defer s.chatSessionsMu.Unlock() + + // generate random symmetric session key + keyName := fmt.Sprintf("%s-%s", "ntfy-chat", crypto.Keccak256Hash([]byte(session.ParentKey+session.ChatKey)).Hex()) + sessionKey, sessionKeyDerived, err := s.makeSessionKey(keyName) + if err != nil { + return nil, err + } + + // populate session key hash (will be used to match decrypted message to a given client id) + session.SessionKeyHash = crypto.Keccak256Hash(sessionKeyDerived) + session.SessionKey = sessionKeyDerived + + // append to list of known clients + // so that it is trivial to go key hash -> client session info + id := session.SessionKeyHash.Hex() + s.chatSessions[id] = session + + // setup filter, to process incoming device registration requests + filterID1, err := s.installTopicFilter(topicNewDeviceRegistration, sessionKeyDerived) + if err != nil { + return nil, fmt.Errorf("failed installing filter: %v", err) + } + go s.requestProcessorLoop(filterID1, topicNewDeviceRegistration, s.processNewDeviceRegistrationRequest) + + // setup filter, to process incoming notification trigger requests + filterID2, err := s.installTopicFilter(topicSendNotification, sessionKeyDerived) + if err != nil { + return nil, fmt.Errorf("failed installing filter: %v", err) + } + go s.requestProcessorLoop(filterID2, topicSendNotification, s.processSendNotificationRequest) + + return +} + +// RegisterDeviceSubscription persists device id, so that it can be used to trigger notifications. +func (s *NotificationServer) RegisterDeviceSubscription(subscription *DeviceSubscription) error { + s.deviceSubscriptionsMu.Lock() + defer s.deviceSubscriptionsMu.Unlock() + + // if one passes the same id again, we will just overwrite + id := fmt.Sprintf("%s-%s", "ntfy-device", + crypto.Keccak256Hash([]byte(subscription.ChatSessionKeyHash.Hex()+subscription.DeviceID)).Hex()) + s.deviceSubscriptions[id] = subscription + + log.Info("device registered", "device", subscription.DeviceID) + return nil +} + +// DropClientSession uninstalls session +func (s *NotificationServer) DropClientSession(id string) { + dropChatSessions := func(parentKey string) { + s.chatSessionsMu.Lock() + defer s.chatSessionsMu.Unlock() + + for key, chatSession := range s.chatSessions { + if chatSession.ParentKey == parentKey { + delete(s.chatSessions, key) + log.Info("drop chat session", "key", key) + } + } + } + + dropDeviceSubscriptions := func(parentKey string) { + s.deviceSubscriptionsMu.Lock() + defer s.deviceSubscriptionsMu.Unlock() + + for key, subscription := range s.deviceSubscriptions { + if hex.EncodeToString(crypto.FromECDSAPub(subscription.PubKey)) == parentKey { + delete(s.deviceSubscriptions, key) + log.Info("drop device subscription", "key", key) + } + } + } + + s.clientSessionsMu.Lock() + if session, ok := s.clientSessions[id]; ok { + delete(s.clientSessions, id) + log.Info("server drops client session", "id", id) + s.clientSessionsMu.Unlock() + + dropDeviceSubscriptions(session.ClientKey) + dropChatSessions(session.ClientKey) + } +} + +// processNewChatSessionRequest processes incoming client requests of type: +// client has a session key, and ready to create a new chat session (which is +// a bag of subscribed devices, basically) +func (s *NotificationServer) processNewChatSessionRequest(msg *whisper.ReceivedMessage) error { + s.clientSessionsMu.RLock() + defer s.clientSessionsMu.RUnlock() + + var parsedMessage struct { + ChatID string `json:"chat"` + } + if err := json.Unmarshal(msg.Payload, &parsedMessage); err != nil { + return err + } + + if msg.Src == nil { + return errors.New("message 'from' field is required") + } + + clientSession, ok := s.clientSessions[msg.SymKeyHash.Hex()] + if !ok { + return errors.New("client session not found") + } + + // register chat session + parentKey := hex.EncodeToString(crypto.FromECDSAPub(msg.Src)) + sessionKey, err := s.RegisterChatSession(&ChatSession{ + ParentKey: parentKey, + ChatKey: parsedMessage.ChatID, + }) + if err != nil { + return err + } + + // confirm that chat has been successfully created + msgParams := whisper.MessageParams{ + Dst: msg.Src, + KeySym: clientSession.SessionKey, + Topic: MakeTopic([]byte(topicAckNewChatSession)), + Payload: []byte(`{"server": "0x` + s.nodeID + `", "key": "0x` + hex.EncodeToString(sessionKey) + `"}`), + TTL: uint32(s.config.TTL), + PoW: s.config.MinimumPoW, + WorkTime: 5, + } + response := whisper.NewSentMessage(&msgParams) + env, err := response.Wrap(&msgParams) + if err != nil { + return fmt.Errorf("failed to wrap server response message: %v", err) + } + + if err := s.whisper.Send(env); err != nil { + return fmt.Errorf("failed to send server response message: %v", err) + } + + log.Info("server confirms chat creation", "dst", + common.ToHex(crypto.FromECDSAPub(msgParams.Dst)), "topic", msgParams.Topic.String()) + return nil +} + +// processNewDeviceRegistrationRequest processes incoming client requests of type: +// client has a session key, creates chat, and obtains chat SymKey (to be shared with +// others). Then using that chat SymKey client registers it's device ID with server. +func (s *NotificationServer) processNewDeviceRegistrationRequest(msg *whisper.ReceivedMessage) error { + s.chatSessionsMu.RLock() + defer s.chatSessionsMu.RUnlock() + + var parsedMessage struct { + DeviceID string `json:"device"` + } + if err := json.Unmarshal(msg.Payload, &parsedMessage); err != nil { + return err + } + + if msg.Src == nil { + return errors.New("message 'from' field is required") + } + + chatSession, ok := s.chatSessions[msg.SymKeyHash.Hex()] + if !ok { + return errors.New("chat session not found") + } + + if len(parsedMessage.DeviceID) <= 0 { + return errors.New("'device' cannot be empty") + } + + // register chat session + err := s.RegisterDeviceSubscription(&DeviceSubscription{ + DeviceID: parsedMessage.DeviceID, + ChatSessionKeyHash: chatSession.SessionKeyHash, + PubKey: msg.Src, + }) + if err != nil { + return err + } + + // confirm that client has been successfully subscribed + msgParams := whisper.MessageParams{ + Dst: msg.Src, + KeySym: chatSession.SessionKey, + Topic: MakeTopic([]byte(topicAckDeviceRegistration)), + Payload: []byte(`{"server": "0x` + s.nodeID + `"}`), + TTL: uint32(s.config.TTL), + PoW: s.config.MinimumPoW, + WorkTime: 5, + } + response := whisper.NewSentMessage(&msgParams) + env, err := response.Wrap(&msgParams) + if err != nil { + return fmt.Errorf("failed to wrap server response message: %v", err) + } + + if err := s.whisper.Send(env); err != nil { + return fmt.Errorf("failed to send server response message: %v", err) + } + + log.Info("server confirms device registration", "dst", + common.ToHex(crypto.FromECDSAPub(msgParams.Dst)), "topic", msgParams.Topic.String()) + return nil +} + +// processSendNotificationRequest processes incoming client requests of type: +// when client has session key, and ready to use it to send notifications +func (s *NotificationServer) processSendNotificationRequest(msg *whisper.ReceivedMessage) error { + s.deviceSubscriptionsMu.RLock() + defer s.deviceSubscriptionsMu.RUnlock() + + for _, subscriber := range s.deviceSubscriptions { + if subscriber.ChatSessionKeyHash == msg.SymKeyHash { + if whisper.IsPubKeyEqual(msg.Src, subscriber.PubKey) { + continue // no need to notify ourselves + } + + if s.firebaseProvider != nil { + err := s.firebaseProvider.Send(subscriber.DeviceID, string(msg.Payload)) + if err != nil { + log.Info("cannot send notification", "error", err) + } + } + } + } + + return nil +} + +// processClientSessionStatusRequest processes incoming client requests when: +// client wants to learn whether it is already registered on some of the servers +func (s *NotificationServer) processClientSessionStatusRequest(msg *whisper.ReceivedMessage) error { + s.clientSessionsMu.RLock() + defer s.clientSessionsMu.RUnlock() + + if msg.Src == nil { + return errors.New("message 'from' field is required") + } + + var sessionKey []byte + pubKey := hex.EncodeToString(crypto.FromECDSAPub(msg.Src)) + for _, clientSession := range s.clientSessions { + if clientSession.ClientKey == pubKey { + sessionKey = clientSession.SessionKeyInput + break + } + } + + // session is not found + if sessionKey == nil { + return nil + } + + // let client know that we have session for a given public key + msgParams := whisper.MessageParams{ + Src: s.protocolKey, + Dst: msg.Src, + Topic: MakeTopic([]byte(topicConfirmClientSession)), + Payload: []byte(`{"server": "0x` + s.nodeID + `", "key": "0x` + hex.EncodeToString(sessionKey) + `"}`), + TTL: uint32(s.config.TTL), + PoW: s.config.MinimumPoW, + WorkTime: 5, + } + response := whisper.NewSentMessage(&msgParams) + env, err := response.Wrap(&msgParams) + if err != nil { + return fmt.Errorf("failed to wrap server response message: %v", err) + } + + if err := s.whisper.Send(env); err != nil { + return fmt.Errorf("failed to send server response message: %v", err) + } + + log.Info("server confirms client session", "dst", + common.ToHex(crypto.FromECDSAPub(msgParams.Dst)), "topic", msgParams.Topic.String()) + return nil +} + +// processDropClientSessionRequest processes incoming client requests when: +// client wants to drop its sessions with notification servers (if they exist) +func (s *NotificationServer) processDropClientSessionRequest(msg *whisper.ReceivedMessage) error { + if msg.Src == nil { + return errors.New("message 'from' field is required") + } + + s.clientSessionsMu.RLock() + pubKey := hex.EncodeToString(crypto.FromECDSAPub(msg.Src)) + for _, clientSession := range s.clientSessions { + if clientSession.ClientKey == pubKey { + s.clientSessionsMu.RUnlock() + s.DropClientSession(clientSession.SessionKeyHash.Hex()) + break + } + } + return nil +} + +// installTopicFilter installs Whisper filter using symmetric key +func (s *NotificationServer) installTopicFilter(topicName string, topicKey []byte) (filterID string, err error) { + topic := MakeTopicAsBytes([]byte(topicName)) + filter := whisper.Filter{ + KeySym: topicKey, + Topics: [][]byte{topic}, + AllowP2P: true, + } + filterID, err = s.whisper.Subscribe(&filter) + if err != nil { + return "", fmt.Errorf("failed installing filter: %v", err) + } + + log.Debug(fmt.Sprintf("installed topic filter %v for topic %x (%s)", filterID, topic, topicName)) + return +} + +// installKeyFilter installs Whisper filter using asymmetric key +func (s *NotificationServer) installKeyFilter(topicName string, key *ecdsa.PrivateKey) (filterID string, err error) { + topic := MakeTopicAsBytes([]byte(topicName)) + filter := whisper.Filter{ + KeyAsym: key, + Topics: [][]byte{topic}, + AllowP2P: true, + } + filterID, err = s.whisper.Subscribe(&filter) + if err != nil { + return "", fmt.Errorf("failed installing filter: %v", err) + } + + log.Info(fmt.Sprintf("installed key filter %v for topic %x (%s)", filterID, topic, topicName)) + return +} + +// requestProcessorLoop processes incoming client requests, by listening to a given filter, +// and executing process function on each incoming message +func (s *NotificationServer) requestProcessorLoop(filterID string, topicWatched string, fn messageProcessingFn) { + log.Debug(fmt.Sprintf("request processor started: %s", topicWatched)) + + filter := s.whisper.GetFilter(filterID) + if filter == nil { + log.Warn(fmt.Sprintf("filter is not installed: %s (for topic '%s')", filterID, topicWatched)) + return + } + + ticker := time.NewTicker(time.Millisecond * 50) + + for { + select { + case <-ticker.C: + messages := filter.Retrieve() + for _, msg := range messages { + if err := fn(msg); err != nil { + log.Warn("failed processing incoming request", "error", err) + } + } + case <-s.quit: + log.Debug("request processor stopped", "topic", topicWatched) + return + } + } +} + +// makeSessionKey generates and saves random SymKey, allowing to establish secure +// channel between server and client +func (s *NotificationServer) makeSessionKey(keyName string) (sessionKey, sessionKeyDerived []byte, err error) { + // wipe out previous occurrence of symmetric key + s.whisper.DeleteSymKey(keyName) + + sessionKey, err = makeSessionKey() + if err != nil { + return nil, nil, err + } + + keyName, err = s.whisper.AddSymKey(keyName, sessionKey) + if err != nil { + return nil, nil, err + } + + sessionKeyDerived, err = s.whisper.GetSymKey(keyName) + if err != nil { + return nil, nil, err + } + + return +} diff --git a/vendor/github.com/ethereum/go-ethereum/whisper/notifications/utils.go b/vendor/github.com/ethereum/go-ethereum/whisper/notifications/utils.go new file mode 100644 index 000000000..7dbd4f916 --- /dev/null +++ b/vendor/github.com/ethereum/go-ethereum/whisper/notifications/utils.go @@ -0,0 +1,72 @@ +package notifications + +import ( + "crypto/sha512" + "errors" + + crand "crypto/rand" + whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" + "golang.org/x/crypto/pbkdf2" +) + +// makeSessionKey returns pseudo-random symmetric key, which is used as +// session key between notification client and server +func makeSessionKey() ([]byte, error) { + // generate random key + const keyLen = 32 + const size = keyLen * 2 + buf := make([]byte, size) + _, err := crand.Read(buf) + if err != nil { + return nil, err + } else if !validateSymmetricKey(buf) { + return nil, errors.New("error in GenerateSymKey: crypto/rand failed to generate random data") + } + + key := buf[:keyLen] + salt := buf[keyLen:] + derived, err := whisper.DeriveOneTimeKey(key, salt, whisper.EnvelopeVersion) + if err != nil { + return nil, err + } else if !validateSymmetricKey(derived) { + return nil, errors.New("failed to derive valid key") + } + + return derived, nil +} + +// validateSymmetricKey returns false if the key contains all zeros +func validateSymmetricKey(k []byte) bool { + return len(k) > 0 && !containsOnlyZeros(k) +} + +// containsOnlyZeros checks if data is empty or not +func containsOnlyZeros(data []byte) bool { + for _, b := range data { + if b != 0 { + return false + } + } + return true +} + +// MakeTopic returns Whisper topic *as bytes array* by generating cryptographic key from the provided password +func MakeTopicAsBytes(password []byte) ([]byte) { + topic := make([]byte, int(whisper.TopicLength)) + x := pbkdf2.Key(password, password, 8196, 128, sha512.New) + for i := 0; i < len(x); i++ { + topic[i%whisper.TopicLength] ^= x[i] + } + + return topic +} + +// MakeTopic returns Whisper topic by generating cryptographic key from the provided password +func MakeTopic(password []byte) (topic whisper.TopicType) { + x := pbkdf2.Key(password, password, 8196, 128, sha512.New) + for i := 0; i < len(x); i++ { + topic[i%whisper.TopicLength] ^= x[i] + } + + return +} diff --git a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv5/api.go b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv5/api.go index dda388a41..a1f24fdb8 100644 --- a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv5/api.go +++ b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv5/api.go @@ -274,6 +274,11 @@ func (api *PublicWhisperAPI) Subscribe(args WhisperFilterArgs) (string, error) { return api.whisper.Subscribe(&filter) } +// UninstallFilter is alias for Unsubscribe +func (api *PublicWhisperAPI) UninstallFilter(id string) { + api.Unsubscribe(id) +} + // Unsubscribe disables and removes an existing filter. func (api *PublicWhisperAPI) Unsubscribe(id string) { api.whisper.Unsubscribe(id) diff --git a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv5/doc.go b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv5/doc.go index 1346c5411..777074ea8 100644 --- a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv5/doc.go +++ b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv5/doc.go @@ -32,6 +32,8 @@ package whisperv5 import ( "fmt" "time" + + "github.com/ethereum/go-ethereum/p2p" ) const ( @@ -87,3 +89,15 @@ type MailServer interface { Archive(env *Envelope) DeliverMail(whisperPeer *Peer, request *Envelope) } + +// NotificationServer represents a notification server, +// capable of screening incoming envelopes for special +// topics, and once located, subscribe client nodes as +// recipients to notifications (push notifications atm) +type NotificationServer interface { + // Start initializes notification sending loop + Start(server *p2p.Server) error + + // Stop stops notification sending loop, releasing related resources + Stop() error +} diff --git a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv5/whisper.go b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv5/whisper.go index 0f5b497d5..b63197ad8 100644 --- a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv5/whisper.go +++ b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv5/whisper.go @@ -71,7 +71,8 @@ type Whisper struct { stats Statistics // Statistics of whisper node - mailServer MailServer // MailServer interface + mailServer MailServer // MailServer interface + notificationServer NotificationServer } // New creates a Whisper client ready to communicate through the Ethereum P2P network. @@ -119,6 +120,11 @@ func (w *Whisper) RegisterServer(server MailServer) { w.mailServer = server } +// RegisterNotificationServer registers notification server with Whisper +func (w *Whisper) RegisterNotificationServer(server NotificationServer) { + w.notificationServer = server +} + // Protocols returns the whisper sub-protocols ran by this particular client. func (w *Whisper) Protocols() []p2p.Protocol { return []p2p.Protocol{w.protocol} @@ -492,6 +498,12 @@ func (w *Whisper) Start(stack *p2p.Server) error { go w.processQueue() } + if w.notificationServer != nil { + if err := w.notificationServer.Start(stack); err != nil { + return err + } + } + return nil } @@ -499,6 +511,13 @@ func (w *Whisper) Start(stack *p2p.Server) error { // of the Whisper protocol. func (w *Whisper) Stop() error { close(w.quit) + + if w.notificationServer != nil { + if err := w.notificationServer.Stop(); err != nil { + return err + } + } + log.Info("whisper stopped") return nil }