diff --git a/whisper/notifications/discovery.go b/whisper/notifications/discovery.go --- a/whisper/notifications/discovery.go 1970-01-01 01:00:00.000000000 +0100 +++ b/whisper/notifications/discovery.go 2017-09-19 17:03:51.000000000 +0200 @@ -0,0 +1,154 @@ +package notifications + +import ( + "encoding/hex" + "encoding/json" + "errors" + "fmt" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" +) + +const ( + topicDiscoverServer = "DISCOVER_NOTIFICATION_SERVER" + topicProposeServer = "PROPOSE_NOTIFICATION_SERVER" + topicServerAccepted = "ACCEPT_NOTIFICATION_SERVER" + topicAckClientSubscription = "ACK_NOTIFICATION_SERVER_SUBSCRIPTION" +) + +// discoveryService abstract notification server discovery protocol +type discoveryService struct { + server *NotificationServer + + discoverFilterID string + serverAcceptedFilterID string +} + +// messageProcessingFn is a callback used to process incoming client requests +type messageProcessingFn func(*whisper.ReceivedMessage) error + +func NewDiscoveryService(notificationServer *NotificationServer) *discoveryService { + return &discoveryService{ + server: notificationServer, + } +} + +// Start installs necessary filters to watch for incoming discovery requests, +// then in separate routine starts watcher loop +func (s *discoveryService) Start() error { + var err error + + // notification server discovery requests + s.discoverFilterID, err = s.server.installKeyFilter(topicDiscoverServer, s.server.protocolKey) + if err != nil { + return fmt.Errorf("failed installing filter: %v", err) + } + go s.server.requestProcessorLoop(s.discoverFilterID, topicDiscoverServer, s.processDiscoveryRequest) + + // notification server accept/select requests + s.serverAcceptedFilterID, err = s.server.installKeyFilter(topicServerAccepted, s.server.protocolKey) + if err != nil { + return fmt.Errorf("failed installing filter: %v", err) + } + go s.server.requestProcessorLoop(s.serverAcceptedFilterID, topicServerAccepted, s.processServerAcceptedRequest) + + log.Info("notification server discovery service started") + return nil +} + +// Stop stops all discovery processing loops +func (s *discoveryService) Stop() error { + s.server.whisper.Unsubscribe(s.discoverFilterID) + s.server.whisper.Unsubscribe(s.serverAcceptedFilterID) + + log.Info("notification server discovery service stopped") + return nil +} + +// processDiscoveryRequest processes incoming client requests of type: +// when client tries to discover suitable notification server +func (s *discoveryService) processDiscoveryRequest(msg *whisper.ReceivedMessage) error { + // offer this node as notification server + msgParams := whisper.MessageParams{ + Src: s.server.protocolKey, + Dst: msg.Src, + Topic: MakeTopic([]byte(topicProposeServer)), + Payload: []byte(`{"server": "0x` + s.server.nodeID + `"}`), + TTL: uint32(s.server.config.TTL), + PoW: s.server.config.MinimumPoW, + WorkTime: 5, + } + response, err := whisper.NewSentMessage(&msgParams) + if err != nil { + return fmt.Errorf("failed to create proposal message: %v", err) + } + env, err := response.Wrap(&msgParams) + if err != nil { + return fmt.Errorf("failed to wrap server proposal message: %v", err) + } + + if err := s.server.whisper.Send(env); err != nil { + return fmt.Errorf("failed to send server proposal message: %v", err) + } + + log.Info(fmt.Sprintf("server proposal sent (server: %v, dst: %v, topic: %x)", + s.server.nodeID, common.ToHex(crypto.FromECDSAPub(msgParams.Dst)), msgParams.Topic)) + return nil +} + +// processServerAcceptedRequest processes incoming client requests of type: +// when client is ready to select the given node as its notification server +func (s *discoveryService) processServerAcceptedRequest(msg *whisper.ReceivedMessage) error { + var parsedMessage struct { + ServerID string `json:"server"` + } + if err := json.Unmarshal(msg.Payload, &parsedMessage); err != nil { + return err + } + + if msg.Src == nil { + return errors.New("message 'from' field is required") + } + + // make sure that only requests made to the current node are processed + if parsedMessage.ServerID != `0x`+s.server.nodeID { + return nil + } + + // register client + sessionKey, err := s.server.RegisterClientSession(&ClientSession{ + ClientKey: hex.EncodeToString(crypto.FromECDSAPub(msg.Src)), + }) + if err != nil { + return err + } + + // confirm that client has been successfully subscribed + msgParams := whisper.MessageParams{ + Src: s.server.protocolKey, + Dst: msg.Src, + Topic: MakeTopic([]byte(topicAckClientSubscription)), + Payload: []byte(`{"server": "0x` + s.server.nodeID + `", "key": "0x` + hex.EncodeToString(sessionKey) + `"}`), + TTL: uint32(s.server.config.TTL), + PoW: s.server.config.MinimumPoW, + WorkTime: 5, + } + response, err := whisper.NewSentMessage(&msgParams) + if err != nil { + return fmt.Errorf("failed to create server proposal message: %v", err) + } + env, err := response.Wrap(&msgParams) + if err != nil { + return fmt.Errorf("failed to wrap server proposal message: %v", err) + } + + if err := s.server.whisper.Send(env); err != nil { + return fmt.Errorf("failed to send server proposal message: %v", err) + } + + log.Info(fmt.Sprintf("server confirms client subscription (dst: %v, topic: %x)", msgParams.Dst, msgParams.Topic)) + return nil +} diff --git a/whisper/notifications/provider.go b/whisper/notifications/provider.go --- a/whisper/notifications/provider.go 1970-01-01 01:00:00.000000000 +0100 +++ b/whisper/notifications/provider.go 2017-09-19 17:03:51.000000000 +0200 @@ -0,0 +1,59 @@ +package notifications + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "strings" + + "github.com/ethereum/go-ethereum/log" + "github.com/status-im/status-go/geth/params" +) + +// NotificationDeliveryProvider handles the notification delivery +type NotificationDeliveryProvider interface { + Send(id string, payload string) error +} + +// FirebaseProvider represents FCM provider +type FirebaseProvider struct { + AuthorizationKey string + NotificationTriggerURL string +} + +// NewFirebaseProvider creates new FCM provider +func NewFirebaseProvider(config *params.FirebaseConfig) *FirebaseProvider { + authorizationKey, _ := config.ReadAuthorizationKeyFile() + return &FirebaseProvider{ + NotificationTriggerURL: config.NotificationTriggerURL, + AuthorizationKey: string(authorizationKey), + } +} + +// Send triggers sending of Push Notification to a given device id +func (p *FirebaseProvider) Send(id string, payload string) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic: %v", r) + } + }() + + jsonRequest := strings.Replace(payload, "{{ ID }}", id, 3) + req, err := http.NewRequest("POST", p.NotificationTriggerURL, bytes.NewBuffer([]byte(jsonRequest))) + req.Header.Set("Authorization", "key="+p.AuthorizationKey) + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + log.Debug("FCM response", "status", resp.Status, "header", resp.Header) + body, _ := ioutil.ReadAll(resp.Body) + log.Debug("FCM response body", "body", string(body)) + + return nil +} diff --git a/whisper/notifications/server.go b/whisper/notifications/server.go --- a/whisper/notifications/server.go 1970-01-01 01:00:00.000000000 +0100 +++ b/whisper/notifications/server.go 2017-09-19 17:03:51.000000000 +0200 @@ -0,0 +1,590 @@ +package notifications + +import ( + "errors" + "fmt" + "sync" + "time" + + "crypto/ecdsa" + "encoding/hex" + "encoding/json" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" + whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" + "github.com/status-im/status-go/geth/params" +) + +const ( + topicSendNotification = "SEND_NOTIFICATION" + topicNewChatSession = "NEW_CHAT_SESSION" + topicAckNewChatSession = "ACK_NEW_CHAT_SESSION" + topicNewDeviceRegistration = "NEW_DEVICE_REGISTRATION" + topicAckDeviceRegistration = "ACK_DEVICE_REGISTRATION" + topicCheckClientSession = "CHECK_CLIENT_SESSION" + topicConfirmClientSession = "CONFIRM_CLIENT_SESSION" + topicDropClientSession = "DROP_CLIENT_SESSION" +) + +var ( + ErrServiceInitError = errors.New("notification service has not been properly initialized") +) + +// NotificationServer service capable of handling Push Notifications +type NotificationServer struct { + whisper *whisper.Whisper + config *params.WhisperConfig + + nodeID string // proposed server will feature this ID + discovery *discoveryService // discovery service handles client/server negotiation, when server is selected + protocolKey *ecdsa.PrivateKey // private key of service, used to encode handshake communication + + clientSessions map[string]*ClientSession + clientSessionsMu sync.RWMutex + + chatSessions map[string]*ChatSession + chatSessionsMu sync.RWMutex + + deviceSubscriptions map[string]*DeviceSubscription + deviceSubscriptionsMu sync.RWMutex + + firebaseProvider NotificationDeliveryProvider + + quit chan struct{} +} + +// ClientSession abstracts notification client, which expects notifications whenever +// some envelope can be decoded with session key (key hash is compared for optimization) +type ClientSession struct { + ClientKey string // public key uniquely identifying a client + SessionKey []byte // actual symkey used for client - server communication + SessionKeyHash common.Hash // The Keccak256Hash of the symmetric key, which is shared between server/client + SessionKeyInput []byte // raw symkey used as input for actual SessionKey +} + +// ChatSession abstracts chat session, which some previously registered client can create. +// ChatSession is used by client for sharing common secret, allowing others to register +// themselves and eventually to trigger notifications. +type ChatSession struct { + ParentKey string // public key uniquely identifying a client session used to create a chat session + ChatKey string // ID that uniquely identifies a chat session + SessionKey []byte // actual symkey used for client - server communication + SessionKeyHash common.Hash // The Keccak256Hash of the symmetric key, which is shared between server/client +} + +// DeviceSubscription stores enough information about a device (or group of devices), +// so that Notification Server can trigger notification on that device(s) +type DeviceSubscription struct { + DeviceID string // ID that will be used as destination + ChatSessionKeyHash common.Hash // The Keccak256Hash of the symmetric key, which is shared between server/client + PubKey *ecdsa.PublicKey // public key of subscriber (to filter out when notification is triggered) +} + +// Init used for service initialization, making sure it is safe to call Start() +func (s *NotificationServer) Init(whisperService *whisper.Whisper, whisperConfig *params.WhisperConfig) { + s.whisper = whisperService + s.config = whisperConfig + + s.discovery = NewDiscoveryService(s) + s.clientSessions = make(map[string]*ClientSession) + s.chatSessions = make(map[string]*ChatSession) + s.deviceSubscriptions = make(map[string]*DeviceSubscription) + s.quit = make(chan struct{}) + + // setup providers (FCM only, for now) + s.firebaseProvider = NewFirebaseProvider(whisperConfig.FirebaseConfig) +} + +// Start begins notification loop, in a separate go routine +func (s *NotificationServer) Start(stack *p2p.Server) error { + if s.whisper == nil { + return ErrServiceInitError + } + + // configure nodeID + if stack != nil { + if nodeInfo := stack.NodeInfo(); nodeInfo != nil { + s.nodeID = nodeInfo.ID + } + } + + // configure keys + identity, err := s.config.ReadIdentityFile() + if err != nil { + return err + } + s.whisper.AddKeyPair(identity) + s.protocolKey = identity + log.Info("protocol pubkey", "key", common.ToHex(crypto.FromECDSAPub(&s.protocolKey.PublicKey))) + + // start discovery protocol + s.discovery.Start() + + // client session status requests + clientSessionStatusFilterID, err := s.installKeyFilter(topicCheckClientSession, s.protocolKey) + if err != nil { + return fmt.Errorf("failed installing filter: %v", err) + } + go s.requestProcessorLoop(clientSessionStatusFilterID, topicDiscoverServer, s.processClientSessionStatusRequest) + + // client session remove requests + dropClientSessionFilterID, err := s.installKeyFilter(topicDropClientSession, s.protocolKey) + if err != nil { + return fmt.Errorf("failed installing filter: %v", err) + } + go s.requestProcessorLoop(dropClientSessionFilterID, topicDropClientSession, s.processDropClientSessionRequest) + + log.Info("Whisper Notification Server started") + return nil +} + +// Stop handles stopping the running notification loop, and all related resources +func (s *NotificationServer) Stop() error { + close(s.quit) + + if s.whisper == nil { + return ErrServiceInitError + } + + if s.discovery != nil { + s.discovery.Stop() + } + + log.Info("Whisper Notification Server stopped") + return nil +} + +// RegisterClientSession forms a cryptographic link between server and client. +// It does so by sharing a session SymKey and installing filter listening for messages +// encrypted with that key. So, both server and client have a secure way to communicate. +func (s *NotificationServer) RegisterClientSession(session *ClientSession) (sessionKey []byte, err error) { + s.clientSessionsMu.Lock() + defer s.clientSessionsMu.Unlock() + + // generate random symmetric session key + keyName := fmt.Sprintf("%s-%s", "ntfy-client", crypto.Keccak256Hash([]byte(session.ClientKey)).Hex()) + sessionKey, sessionKeyDerived, err := s.makeSessionKey(keyName) + if err != nil { + return nil, err + } + + // populate session key hash (will be used to match decrypted message to a given client id) + session.SessionKeyInput = sessionKey + session.SessionKeyHash = crypto.Keccak256Hash(sessionKeyDerived) + session.SessionKey = sessionKeyDerived + + // append to list of known clients + // so that it is trivial to go key hash -> client session info + id := session.SessionKeyHash.Hex() + s.clientSessions[id] = session + + // setup filter, which will get all incoming messages, that are encrypted with SymKey + filterID, err := s.installTopicFilter(topicNewChatSession, sessionKeyDerived) + if err != nil { + return nil, fmt.Errorf("failed installing filter: %v", err) + } + go s.requestProcessorLoop(filterID, topicNewChatSession, s.processNewChatSessionRequest) + return +} + +// RegisterChatSession forms a cryptographic link between server and client. +// This link is meant to be shared with other clients, so that they can use +// the shared SymKey to trigger notifications for devices attached to a given +// chat session. +func (s *NotificationServer) RegisterChatSession(session *ChatSession) (sessionKey []byte, err error) { + s.chatSessionsMu.Lock() + defer s.chatSessionsMu.Unlock() + + // generate random symmetric session key + keyName := fmt.Sprintf("%s-%s", "ntfy-chat", crypto.Keccak256Hash([]byte(session.ParentKey+session.ChatKey)).Hex()) + sessionKey, sessionKeyDerived, err := s.makeSessionKey(keyName) + if err != nil { + return nil, err + } + + // populate session key hash (will be used to match decrypted message to a given client id) + session.SessionKeyHash = crypto.Keccak256Hash(sessionKeyDerived) + session.SessionKey = sessionKeyDerived + + // append to list of known clients + // so that it is trivial to go key hash -> client session info + id := session.SessionKeyHash.Hex() + s.chatSessions[id] = session + + // setup filter, to process incoming device registration requests + filterID1, err := s.installTopicFilter(topicNewDeviceRegistration, sessionKeyDerived) + if err != nil { + return nil, fmt.Errorf("failed installing filter: %v", err) + } + go s.requestProcessorLoop(filterID1, topicNewDeviceRegistration, s.processNewDeviceRegistrationRequest) + + // setup filter, to process incoming notification trigger requests + filterID2, err := s.installTopicFilter(topicSendNotification, sessionKeyDerived) + if err != nil { + return nil, fmt.Errorf("failed installing filter: %v", err) + } + go s.requestProcessorLoop(filterID2, topicSendNotification, s.processSendNotificationRequest) + + return +} + +// RegisterDeviceSubscription persists device id, so that it can be used to trigger notifications. +func (s *NotificationServer) RegisterDeviceSubscription(subscription *DeviceSubscription) error { + s.deviceSubscriptionsMu.Lock() + defer s.deviceSubscriptionsMu.Unlock() + + // if one passes the same id again, we will just overwrite + id := fmt.Sprintf("%s-%s", "ntfy-device", + crypto.Keccak256Hash([]byte(subscription.ChatSessionKeyHash.Hex()+subscription.DeviceID)).Hex()) + s.deviceSubscriptions[id] = subscription + + log.Info("device registered", "device", subscription.DeviceID) + return nil +} + +// DropClientSession uninstalls session +func (s *NotificationServer) DropClientSession(id string) { + dropChatSessions := func(parentKey string) { + s.chatSessionsMu.Lock() + defer s.chatSessionsMu.Unlock() + + for key, chatSession := range s.chatSessions { + if chatSession.ParentKey == parentKey { + delete(s.chatSessions, key) + log.Info("drop chat session", "key", key) + } + } + } + + dropDeviceSubscriptions := func(parentKey string) { + s.deviceSubscriptionsMu.Lock() + defer s.deviceSubscriptionsMu.Unlock() + + for key, subscription := range s.deviceSubscriptions { + if hex.EncodeToString(crypto.FromECDSAPub(subscription.PubKey)) == parentKey { + delete(s.deviceSubscriptions, key) + log.Info("drop device subscription", "key", key) + } + } + } + + s.clientSessionsMu.Lock() + if session, ok := s.clientSessions[id]; ok { + delete(s.clientSessions, id) + log.Info("server drops client session", "id", id) + s.clientSessionsMu.Unlock() + + dropDeviceSubscriptions(session.ClientKey) + dropChatSessions(session.ClientKey) + } +} + +// processNewChatSessionRequest processes incoming client requests of type: +// client has a session key, and ready to create a new chat session (which is +// a bag of subscribed devices, basically) +func (s *NotificationServer) processNewChatSessionRequest(msg *whisper.ReceivedMessage) error { + s.clientSessionsMu.RLock() + defer s.clientSessionsMu.RUnlock() + + var parsedMessage struct { + ChatID string `json:"chat"` + } + if err := json.Unmarshal(msg.Payload, &parsedMessage); err != nil { + return err + } + + if msg.Src == nil { + return errors.New("message 'from' field is required") + } + + clientSession, ok := s.clientSessions[msg.SymKeyHash.Hex()] + if !ok { + return errors.New("client session not found") + } + + // register chat session + parentKey := hex.EncodeToString(crypto.FromECDSAPub(msg.Src)) + sessionKey, err := s.RegisterChatSession(&ChatSession{ + ParentKey: parentKey, + ChatKey: parsedMessage.ChatID, + }) + if err != nil { + return err + } + + // confirm that chat has been successfully created + msgParams := whisper.MessageParams{ + Dst: msg.Src, + KeySym: clientSession.SessionKey, + Topic: MakeTopic([]byte(topicAckNewChatSession)), + Payload: []byte(`{"server": "0x` + s.nodeID + `", "key": "0x` + hex.EncodeToString(sessionKey) + `"}`), + TTL: uint32(s.config.TTL), + PoW: s.config.MinimumPoW, + WorkTime: 5, + } + response, err := whisper.NewSentMessage(&msgParams) + if err != nil { + return fmt.Errorf("failed to create server response message: %v", err) + } + env, err := response.Wrap(&msgParams) + if err != nil { + return fmt.Errorf("failed to wrap server response message: %v", err) + } + + if err := s.whisper.Send(env); err != nil { + return fmt.Errorf("failed to send server response message: %v", err) + } + + log.Info("server confirms chat creation", "dst", + common.ToHex(crypto.FromECDSAPub(msgParams.Dst)), "topic", msgParams.Topic.String()) + return nil +} + +// processNewDeviceRegistrationRequest processes incoming client requests of type: +// client has a session key, creates chat, and obtains chat SymKey (to be shared with +// others). Then using that chat SymKey client registers it's device ID with server. +func (s *NotificationServer) processNewDeviceRegistrationRequest(msg *whisper.ReceivedMessage) error { + s.chatSessionsMu.RLock() + defer s.chatSessionsMu.RUnlock() + + var parsedMessage struct { + DeviceID string `json:"device"` + } + if err := json.Unmarshal(msg.Payload, &parsedMessage); err != nil { + return err + } + + if msg.Src == nil { + return errors.New("message 'from' field is required") + } + + chatSession, ok := s.chatSessions[msg.SymKeyHash.Hex()] + if !ok { + return errors.New("chat session not found") + } + + if len(parsedMessage.DeviceID) <= 0 { + return errors.New("'device' cannot be empty") + } + + // register chat session + err := s.RegisterDeviceSubscription(&DeviceSubscription{ + DeviceID: parsedMessage.DeviceID, + ChatSessionKeyHash: chatSession.SessionKeyHash, + PubKey: msg.Src, + }) + if err != nil { + return err + } + + // confirm that client has been successfully subscribed + msgParams := whisper.MessageParams{ + Dst: msg.Src, + KeySym: chatSession.SessionKey, + Topic: MakeTopic([]byte(topicAckDeviceRegistration)), + Payload: []byte(`{"server": "0x` + s.nodeID + `"}`), + TTL: uint32(s.config.TTL), + PoW: s.config.MinimumPoW, + WorkTime: 5, + } + response, err := whisper.NewSentMessage(&msgParams) + if err != nil { + return fmt.Errorf("failed to create server response message: %v", err) + } + env, err := response.Wrap(&msgParams) + if err != nil { + return fmt.Errorf("failed to wrap server response message: %v", err) + } + + if err := s.whisper.Send(env); err != nil { + return fmt.Errorf("failed to send server response message: %v", err) + } + + log.Info("server confirms device registration", "dst", + common.ToHex(crypto.FromECDSAPub(msgParams.Dst)), "topic", msgParams.Topic.String()) + return nil +} + +// processSendNotificationRequest processes incoming client requests of type: +// when client has session key, and ready to use it to send notifications +func (s *NotificationServer) processSendNotificationRequest(msg *whisper.ReceivedMessage) error { + s.deviceSubscriptionsMu.RLock() + defer s.deviceSubscriptionsMu.RUnlock() + + for _, subscriber := range s.deviceSubscriptions { + if subscriber.ChatSessionKeyHash == msg.SymKeyHash { + if whisper.IsPubKeyEqual(msg.Src, subscriber.PubKey) { + continue // no need to notify ourselves + } + + if s.firebaseProvider != nil { + err := s.firebaseProvider.Send(subscriber.DeviceID, string(msg.Payload)) + if err != nil { + log.Info("cannot send notification", "error", err) + } + } + } + } + + return nil +} + +// processClientSessionStatusRequest processes incoming client requests when: +// client wants to learn whether it is already registered on some of the servers +func (s *NotificationServer) processClientSessionStatusRequest(msg *whisper.ReceivedMessage) error { + s.clientSessionsMu.RLock() + defer s.clientSessionsMu.RUnlock() + + if msg.Src == nil { + return errors.New("message 'from' field is required") + } + + var sessionKey []byte + pubKey := hex.EncodeToString(crypto.FromECDSAPub(msg.Src)) + for _, clientSession := range s.clientSessions { + if clientSession.ClientKey == pubKey { + sessionKey = clientSession.SessionKeyInput + break + } + } + + // session is not found + if sessionKey == nil { + return nil + } + + // let client know that we have session for a given public key + msgParams := whisper.MessageParams{ + Src: s.protocolKey, + Dst: msg.Src, + Topic: MakeTopic([]byte(topicConfirmClientSession)), + Payload: []byte(`{"server": "0x` + s.nodeID + `", "key": "0x` + hex.EncodeToString(sessionKey) + `"}`), + TTL: uint32(s.config.TTL), + PoW: s.config.MinimumPoW, + WorkTime: 5, + } + response, err := whisper.NewSentMessage(&msgParams) + if err != nil { + return fmt.Errorf("failed to create server response message: %v", err) + } + env, err := response.Wrap(&msgParams) + if err != nil { + return fmt.Errorf("failed to wrap server response message: %v", err) + } + + if err := s.whisper.Send(env); err != nil { + return fmt.Errorf("failed to send server response message: %v", err) + } + + log.Info("server confirms client session", "dst", + common.ToHex(crypto.FromECDSAPub(msgParams.Dst)), "topic", msgParams.Topic.String()) + return nil +} + +// processDropClientSessionRequest processes incoming client requests when: +// client wants to drop its sessions with notification servers (if they exist) +func (s *NotificationServer) processDropClientSessionRequest(msg *whisper.ReceivedMessage) error { + if msg.Src == nil { + return errors.New("message 'from' field is required") + } + + s.clientSessionsMu.RLock() + pubKey := hex.EncodeToString(crypto.FromECDSAPub(msg.Src)) + for _, clientSession := range s.clientSessions { + if clientSession.ClientKey == pubKey { + s.clientSessionsMu.RUnlock() + s.DropClientSession(clientSession.SessionKeyHash.Hex()) + break + } + } + return nil +} + +// installTopicFilter installs Whisper filter using symmetric key +func (s *NotificationServer) installTopicFilter(topicName string, topicKey []byte) (filterID string, err error) { + topic := MakeTopicAsBytes([]byte(topicName)) + filter := whisper.Filter{ + KeySym: topicKey, + Topics: [][]byte{topic}, + AllowP2P: true, + } + filterID, err = s.whisper.Subscribe(&filter) + if err != nil { + return "", fmt.Errorf("failed installing filter: %v", err) + } + + log.Debug(fmt.Sprintf("installed topic filter %v for topic %x (%s)", filterID, topic, topicName)) + return +} + +// installKeyFilter installs Whisper filter using asymmetric key +func (s *NotificationServer) installKeyFilter(topicName string, key *ecdsa.PrivateKey) (filterID string, err error) { + topic := MakeTopicAsBytes([]byte(topicName)) + filter := whisper.Filter{ + KeyAsym: key, + Topics: [][]byte{topic}, + AllowP2P: true, + } + filterID, err = s.whisper.Subscribe(&filter) + if err != nil { + return "", fmt.Errorf("failed installing filter: %v", err) + } + + log.Info(fmt.Sprintf("installed key filter %v for topic %x (%s)", filterID, topic, topicName)) + return +} + +// requestProcessorLoop processes incoming client requests, by listening to a given filter, +// and executing process function on each incoming message +func (s *NotificationServer) requestProcessorLoop(filterID string, topicWatched string, fn messageProcessingFn) { + log.Debug(fmt.Sprintf("request processor started: %s", topicWatched)) + + filter := s.whisper.GetFilter(filterID) + if filter == nil { + log.Warn(fmt.Sprintf("filter is not installed: %s (for topic '%s')", filterID, topicWatched)) + return + } + + ticker := time.NewTicker(time.Millisecond * 50) + + for { + select { + case <-ticker.C: + messages := filter.Retrieve() + for _, msg := range messages { + if err := fn(msg); err != nil { + log.Warn("failed processing incoming request", "error", err) + } + } + case <-s.quit: + log.Debug("request processor stopped", "topic", topicWatched) + return + } + } +} + +// makeSessionKey generates and saves random SymKey, allowing to establish secure +// channel between server and client +func (s *NotificationServer) makeSessionKey(keyName string) (sessionKey, sessionKeyDerived []byte, err error) { + // wipe out previous occurrence of symmetric key + s.whisper.DeleteSymKey(keyName) + + sessionKey, err = makeSessionKey() + if err != nil { + return nil, nil, err + } + + keyName, err = s.whisper.AddSymKey(keyName, sessionKey) + if err != nil { + return nil, nil, err + } + + sessionKeyDerived, err = s.whisper.GetSymKey(keyName) + if err != nil { + return nil, nil, err + } + + return +} diff --git a/whisper/notifications/utils.go b/whisper/notifications/utils.go --- a/whisper/notifications/utils.go 1970-01-01 01:00:00.000000000 +0100 +++ b/whisper/notifications/utils.go 2017-09-19 17:03:51.000000000 +0200 @@ -0,0 +1,84 @@ +package notifications + +import ( + "crypto/sha512" + "errors" + "crypto/sha256" + + crand "crypto/rand" + whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" + "golang.org/x/crypto/pbkdf2" +) + +// makeSessionKey returns pseudo-random symmetric key, which is used as +// session key between notification client and server +func makeSessionKey() ([]byte, error) { + // generate random key + const keyLen = 32 + buf := make([]byte, keyLen) + _, err := crand.Read(buf) + if err != nil { + return nil, err + } else if !validateSymmetricKey(buf) { + return nil, errors.New("error in GenerateSymKey: crypto/rand failed to generate random data") + } + + key := buf[:keyLen] + derived, err := deriveKeyMaterial(key, whisper.EnvelopeVersion) + if err != nil { + return nil, err + } else if !validateSymmetricKey(derived) { + return nil, errors.New("failed to derive valid key") + } + + return derived, nil +} + +// validateSymmetricKey returns false if the key contains all zeros +func validateSymmetricKey(k []byte) bool { + return len(k) > 0 && !containsOnlyZeros(k) +} + +// containsOnlyZeros checks if data is empty or not +func containsOnlyZeros(data []byte) bool { + for _, b := range data { + if b != 0 { + return false + } + } + return true +} + +// deriveKeyMaterial derives symmetric key material from the key or password./~~~ +// pbkdf2 is used for security, in case people use password instead of randomly generated keys. +func deriveKeyMaterial(key []byte, version uint64) (derivedKey []byte, err error) { + if version == 0 { + // kdf should run no less than 0.1 seconds on average compute, + // because it's a once in a session experience + derivedKey := pbkdf2.Key(key, nil, 65356, 32, sha256.New) + return derivedKey, nil + } else { + return nil, errors.New("unknown version") + } +} + +// MakeTopic returns Whisper topic *as bytes array* by generating cryptographic key from the provided password +func MakeTopicAsBytes(password []byte) ([]byte) { + topic := make([]byte, int(whisper.TopicLength)) + x := pbkdf2.Key(password, password, 8196, 128, sha512.New) + for i := 0; i < len(x); i++ { + topic[i%whisper.TopicLength] ^= x[i] + } + + return topic +} + +// MakeTopic returns Whisper topic by generating cryptographic key from the provided password +func MakeTopic(password []byte) (topic whisper.TopicType) { + x := pbkdf2.Key(password, password, 8196, 128, sha512.New) + for i := 0; i < len(x); i++ { + topic[i%whisper.TopicLength] ^= x[i] + } + + return +} diff --git a/whisper/whisperv2/whisper.go b/whisper/whisperv2/whisper.go --- a/whisper/whisperv2/whisper.go 2017-12-04 14:11:36.000000000 +0100 +++ b/whisper/whisperv2/whisper.go 2017-12-03 19:32:31.000000000 +0100 @@ -134,6 +134,13 @@ return key } +// AddIdentity adds identity into the known identities list (for message decryption). +func (self *Whisper) AddIdentity(key *ecdsa.PrivateKey) { + self.keysMu.Lock() + self.keys[string(crypto.FromECDSAPub(&key.PublicKey))] = key + self.keysMu.Unlock() +} + // HasIdentity checks if the the whisper node is configured with the private key // of the specified public pair. func (self *Whisper) HasIdentity(key *ecdsa.PublicKey) bool { diff --git a/whisper/whisperv5/api.go b/whisper/whisperv5/api.go --- a/whisper/whisperv5/api.go 2017-12-04 14:13:07.000000000 +0100 +++ b/whisper/whisperv5/api.go 2017-12-04 17:40:41.000000000 +0100 @@ -313,6 +313,16 @@ return true, api.w.Send(env) } +// UninstallFilter is alias for Unsubscribe +func (api *PublicWhisperAPI) UninstallFilter(id string) { + api.w.Unsubscribe(id) +} + +// Unsubscribe disables and removes an existing filter. +func (api *PublicWhisperAPI) Unsubscribe(id string) { + api.w.Unsubscribe(id) +} + //go:generate gencodec -type Criteria -field-override criteriaOverride -out gen_criteria_json.go // Criteria holds various filter options for inbound messages. diff --git a/whisper/whisperv5/doc.go b/whisper/whisperv5/doc.go --- a/whisper/whisperv5/doc.go 2017-12-03 18:18:01.000000000 +0100 +++ b/whisper/whisperv5/doc.go 2017-12-04 17:40:41.000000000 +0100 @@ -32,6 +32,8 @@ import ( "fmt" "time" + + "github.com/ethereum/go-ethereum/p2p" ) const ( @@ -57,7 +59,7 @@ MaxMessageSize = uint32(10 * 1024 * 1024) // maximum accepted size of a message. DefaultMaxMessageSize = uint32(1024 * 1024) - DefaultMinimumPoW = 0.2 + DefaultMinimumPoW = 0.001 padSizeLimit = 256 // just an arbitrary number, could be changed without breaking the protocol (must not exceed 2^24) messageQueueLimit = 1024 @@ -85,3 +87,15 @@ Archive(env *Envelope) DeliverMail(whisperPeer *Peer, request *Envelope) } + +// NotificationServer represents a notification server, +// capable of screening incoming envelopes for special +// topics, and once located, subscribe client nodes as +// recipients to notifications (push notifications atm) +type NotificationServer interface { + // Start initializes notification sending loop + Start(server *p2p.Server) error + + // Stop stops notification sending loop, releasing related resources + Stop() error +} diff --git a/whisper/whisperv5/whisper.go b/whisper/whisperv5/whisper.go --- a/whisper/whisperv5/whisper.go 2017-12-03 19:03:21.000000000 +0100 +++ b/whisper/whisperv5/whisper.go 2017-12-04 17:40:41.000000000 +0100 @@ -77,7 +77,8 @@ statsMu sync.Mutex // guard stats stats Statistics // Statistics of whisper node - mailServer MailServer // MailServer interface + mailServer MailServer // MailServer interface + notificationServer NotificationServer } // New creates a Whisper client ready to communicate through the Ethereum P2P network. @@ -156,6 +157,11 @@ w.mailServer = server } +// RegisterNotificationServer registers notification server with Whisper +func (w *Whisper) RegisterNotificationServer(server NotificationServer) { + w.notificationServer = server +} + // Protocols returns the whisper sub-protocols ran by this particular client. func (w *Whisper) Protocols() []p2p.Protocol { return []p2p.Protocol{w.protocol} @@ -250,9 +256,9 @@ return "", fmt.Errorf("failed to generate valid key") } - id, err := GenerateRandomID() + id, err := toDeterministicID(common.ToHex(crypto.FromECDSAPub(&key.PublicKey)), keyIdSize) if err != nil { - return "", fmt.Errorf("failed to generate ID: %s", err) + return "", err } w.keyMu.Lock() @@ -265,45 +271,94 @@ return id, nil } -// DeleteKeyPair deletes the specified key if it exists. -func (w *Whisper) DeleteKeyPair(key string) bool { +// AddIdentity adds cryptographic identity into the known +// identities list (for message decryption). +func (w *Whisper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) { + id, err := makeDeterministicID(common.ToHex(crypto.FromECDSAPub(&key.PublicKey)), keyIdSize) + if err != nil { + return "", err + } + if w.HasKeyPair(id) { + return id, nil // no need to re-inject + } + w.keyMu.Lock() defer w.keyMu.Unlock() - if w.privateKeys[key] != nil { - delete(w.privateKeys, key) - return true - } - return false + w.privateKeys[id] = key + log.Info("Whisper identity added", "id", id, "pubkey", common.ToHex(crypto.FromECDSAPub(&key.PublicKey))) + + return id, nil } -// AddKeyPair imports a asymmetric private key and returns it identifier. -func (w *Whisper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) { - id, err := GenerateRandomID() +// SelectKeyPair adds cryptographic identity, and makes sure +// that it is the only private key known to the node. +func (w *Whisper) SelectKeyPair(key *ecdsa.PrivateKey) error { + id, err := makeDeterministicID(common.ToHex(crypto.FromECDSAPub(&key.PublicKey)), keyIdSize) if err != nil { - return "", fmt.Errorf("failed to generate ID: %s", err) + return err } w.keyMu.Lock() + defer w.keyMu.Unlock() + + w.privateKeys = make(map[string]*ecdsa.PrivateKey) // reset key store w.privateKeys[id] = key - w.keyMu.Unlock() - return id, nil + log.Info("Whisper identity selected", "id", id, "key", common.ToHex(crypto.FromECDSAPub(&key.PublicKey))) + return nil +} + +// DeleteKeyPairs removes all cryptographic identities known to the node +func (w *Whisper) DeleteKeyPairs() error { + w.keyMu.Lock() + defer w.keyMu.Unlock() + + w.privateKeys = make(map[string]*ecdsa.PrivateKey) + + return nil +} + +// DeleteKeyPair deletes the specified key if it exists. +func (w *Whisper) DeleteKeyPair(id string) bool { + deterministicID, err := toDeterministicID(id, keyIdSize) + if err != nil { + return false + } + + w.keyMu.Lock() + defer w.keyMu.Unlock() + + if w.privateKeys[deterministicID] != nil { + delete(w.privateKeys, deterministicID) + return true + } + return false } // HasKeyPair checks if the the whisper node is configured with the private key // of the specified public pair. func (w *Whisper) HasKeyPair(id string) bool { + deterministicID, err := toDeterministicID(id, keyIdSize) + if err != nil { + return false + } + w.keyMu.RLock() defer w.keyMu.RUnlock() - return w.privateKeys[id] != nil + return w.privateKeys[deterministicID] != nil } // GetPrivateKey retrieves the private key of the specified identity. func (w *Whisper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) { + deterministicID, err := toDeterministicID(id, keyIdSize) + if err != nil { + return nil, err + } + w.keyMu.RLock() defer w.keyMu.RUnlock() - key := w.privateKeys[id] + key := w.privateKeys[deterministicID] if key == nil { return nil, fmt.Errorf("invalid id") } @@ -336,6 +391,23 @@ return id, nil } +// AddSymKey stores the key with a given id. +func (w *Whisper) AddSymKey(id string, key []byte) (string, error) { + deterministicID, err := toDeterministicID(id, keyIdSize) + if err != nil { + return "", err + } + + w.keyMu.Lock() + defer w.keyMu.Unlock() + + if w.symKeys[deterministicID] != nil { + return "", fmt.Errorf("key already exists: %v", id) + } + w.symKeys[deterministicID] = key + return deterministicID, nil +} + // AddSymKeyDirect stores the key, and returns its id. func (w *Whisper) AddSymKeyDirect(key []byte) (string, error) { if len(key) != aesKeyLength { @@ -447,7 +519,7 @@ // Start implements node.Service, starting the background data propagation thread // of the Whisper protocol. -func (w *Whisper) Start(*p2p.Server) error { +func (w *Whisper) Start(stack *p2p.Server) error { log.Info("started whisper v." + ProtocolVersionStr) go w.update() @@ -456,6 +528,12 @@ go w.processQueue() } + if w.notificationServer != nil { + if err := w.notificationServer.Start(stack); err != nil { + return err + } + } + return nil } @@ -463,6 +541,13 @@ // of the Whisper protocol. func (w *Whisper) Stop() error { close(w.quit) + + if w.notificationServer != nil { + if err := w.notificationServer.Stop(); err != nil { + return err + } + } + log.Info("whisper stopped") return nil } @@ -856,3 +941,30 @@ id = common.Bytes2Hex(buf) return id, err } + +// makeDeterministicID generates a deterministic ID, based on a given input +func makeDeterministicID(input string, keyLen int) (id string, err error) { + buf := pbkdf2.Key([]byte(input), nil, 4096, keyLen, sha256.New) + if !validateSymmetricKey(buf) { + return "", fmt.Errorf("error in GenerateDeterministicID: failed to generate key") + } + id = common.Bytes2Hex(buf) + return id, err +} + +// toDeterministicID reviews incoming id, and transforms it to format +// expected internally be private key store. Originally, public keys +// were used as keys, now random keys are being used. And in order to +// make it easier to consume, we now allow both random IDs and public +// keys to be passed. +func toDeterministicID(id string, expectedLen int) (string, error) { + if len(id) != (expectedLen * 2) { // we received hex key, so number of chars in id is doubled + var err error + id, err = makeDeterministicID(id, expectedLen) + if err != nil { + return "", err + } + } + + return id, nil +}