diff --git a/chat.go b/chat.go index a7473de..080c972 100644 --- a/chat.go +++ b/chat.go @@ -1,6 +1,7 @@ package main import ( + "context" "crypto/ecdsa" "encoding/hex" "errors" @@ -12,7 +13,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/fatih/color" "github.com/jroimartin/gocui" - whisper "github.com/status-im/whisper/whisperv6" "github.com/status-im/status-term-client/protocol/v1" ) @@ -27,43 +27,12 @@ var ( ErrUnsupportedContactType = errors.New("unsupported contact type") ) -// ReceivedMessage contains a raw Whisper message and decoded payload. -type ReceivedMessage struct { - Decoded protocol.StatusMessage - Received *whisper.ReceivedMessage -} - -// RequestMessagesParams is a list of params sent while requesting historic messages. -type RequestMessagesParams struct { - Limit int - From int64 - To int64 -} - -// MessagesSubscription is a subscription that retrieves messages. -type MessagesSubscription interface { - Messages() ([]*ReceivedMessage, error) - Unsubscribe() error -} - -// PublicChat provides an interface to interact with public chats. -type PublicChat interface { - SubscribePublicChat(name string) (MessagesSubscription, error) - SendPublicMessage(chatName string, data []byte, identity Identity) (string, error) - RequestPublicMessages(chatName string, params RequestMessagesParams) error -} - -// Chat provides an interface to interact with any chat. -type Chat interface { - PublicChat -} - // ChatViewController manages chat view. type ChatViewController struct { *ViewController - identity Identity - node Chat + identity *ecdsa.PrivateKey + chat protocol.Chat currentContact Contact lastClockValue int64 @@ -74,11 +43,11 @@ type ChatViewController struct { } // NewChatViewController returns a new chat view controller. -func NewChatViewController(vc *ViewController, id Identity, node Chat) (*ChatViewController, error) { +func NewChatViewController(vc *ViewController, id Identity, chat protocol.Chat) (*ChatViewController, error) { return &ChatViewController{ ViewController: vc, identity: id, - node: node, + chat: chat, sentMessages: make(map[string]struct{}), }, nil } @@ -91,13 +60,20 @@ func (c *ChatViewController) Select(contact Contact) error { c.currentContact = contact var ( - sub MessagesSubscription + sub *protocol.Subscription err error ) + messages := make(chan *protocol.ReceivedMessage) + switch contact.Type { case ContactPublicChat: - sub, err = c.node.SubscribePublicChat(contact.Name) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + sub, err = c.chat.SubscribePublicChat(ctx, contact.Name, messages) + if err != nil { + err = fmt.Errorf("failed to subscribe to public chat: %v", err) + } default: err = ErrUnsupportedContactType } @@ -122,71 +98,73 @@ func (c *ChatViewController) Select(contact Contact) error { c.cancel = make(chan struct{}) c.done = make(chan struct{}) - go c.readMessagesLoop(sub, c.cancel, c.done) + go c.readMessagesLoop(sub, messages, c.cancel, c.done) // Request some previous messages from the current chat // to provide some context for the user. // TODO: handle pagination // TODO: RequestPublicMessages should return only after receiving a response. - params := RequestMessagesParams{ + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + params := protocol.RequestMessagesParams{ Limit: 100, } - if err := c.node.RequestPublicMessages(c.currentContact.Name, params); err != nil { + if err := c.chat.RequestPublicMessages(ctx, c.currentContact.Name, params); err != nil { return fmt.Errorf("failed to request messages: %v", err) } return nil } // TODO: change done channel to err channel. Err channel should be handled by a goroutine. -func (c *ChatViewController) readMessagesLoop(sub MessagesSubscription, cancel <-chan struct{}, done chan struct{}) { +func (c *ChatViewController) readMessagesLoop( + sub *protocol.Subscription, + messages <-chan *protocol.ReceivedMessage, + cancel <-chan struct{}, + done chan struct{}, +) { // TODO: check calls order. `close(done)` should be the last one. - defer func() { _ = sub.Unsubscribe() }() + defer sub.Unsubscribe() defer close(done) - t := time.NewTimer(ReadMessagesTimeout) - defer t.Stop() - for { select { - case <-t.C: + case m := <-messages: + c.updateLastClockValue(m) + c.g.Update(func(*gocui.Gui) error { - messages, err := sub.Messages() + err := c.printMessage(m) if err != nil { - return fmt.Errorf("failed to get messages: %v", err) + err = fmt.Errorf("failed to print a message because of %v", err) } - log.Printf("received %d messages", len(messages)) - - c.updateLastClockValue(messages) - - return c.printMessages(messages) + return err }) - - t.Reset(ReadMessagesTimeout) + case <-sub.Done(): + if err := sub.Err(); err != nil { + log.Fatalf("protocol subscription errored: %v", err) + } case <-cancel: return } } } -func (c *ChatViewController) printMessages(messages []*ReceivedMessage) error { +func (c *ChatViewController) printMessage(message *protocol.ReceivedMessage) error { myPubKey := c.identity.PublicKey + pubKey := message.SigPubKey - for _, message := range messages { - pubKey := message.Received.SigToPubKey() - line := formatMessageLine( - pubKey, - time.Unix(message.Decoded.Timestamp/1000, 0), - message.Decoded.Text, - ) + line := formatMessageLine( + pubKey, + time.Unix(message.Decoded.Timestamp/1000, 0), + message.Decoded.Text, + ) - println := fmt.Fprintln - if pubKey.X.Cmp(myPubKey.X) == 0 && pubKey.Y.Cmp(myPubKey.Y) == 0 { - println = color.New(color.FgGreen).Fprintln - } + println := fmt.Fprintln + if pubKey.X.Cmp(myPubKey.X) == 0 && pubKey.Y.Cmp(myPubKey.Y) == 0 { + println = color.New(color.FgGreen).Fprintln + } - if _, err := println(c.ViewController, line); err != nil { - return err - } + if _, err := println(c.ViewController, line); err != nil { + return err } return nil @@ -205,14 +183,7 @@ func formatMessageLine(id *ecdsa.PublicKey, t time.Time, text string) string { ) } -func (c *ChatViewController) updateLastClockValue(messages []*ReceivedMessage) { - size := len(messages) - if size == 0 { - return - } - - m := messages[size-1] - +func (c *ChatViewController) updateLastClockValue(m *protocol.ReceivedMessage) { if m.Decoded.Clock > c.lastClockValue { c.lastClockValue = m.Decoded.Clock } @@ -244,7 +215,9 @@ func (c *ChatViewController) SendMessage(content []byte) (string, error) { switch c.currentContact.Type { case ContactPublicChat: - return c.node.SendPublicMessage(c.currentContact.Name, data, c.identity) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + return c.chat.SendPublicMessage(ctx, c.currentContact.Name, data, c.identity) default: return "", ErrUnsupportedContactType } diff --git a/go.mod b/go.mod index e5159da..9a55713 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/golang-migrate/migrate v3.5.4+incompatible // indirect github.com/golang/mock v1.2.0 // indirect github.com/golang/protobuf v1.2.0 // indirect + github.com/google/pprof v0.0.0-20190109223431-e84dfd68c163 // indirect github.com/google/uuid v1.1.0 // indirect github.com/gorilla/websocket v1.4.0 // indirect github.com/gxed/GoEndian v0.0.0-20160916112711-0f5c6873267e // indirect @@ -27,6 +28,7 @@ require ( github.com/gxed/hashland v0.0.0-20180221191214-d9f6b97f8db2 // indirect github.com/hashicorp/golang-lru v0.5.0 // indirect github.com/huin/goupnp v1.0.0 // indirect + github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6 // indirect github.com/ipfs/go-cid v0.9.0 // indirect github.com/ipfs/go-datastore v3.2.0+incompatible // indirect github.com/ipfs/go-ipfs-util v1.2.8 // indirect diff --git a/go.sum b/go.sum index 0c638a6..d031f65 100644 --- a/go.sum +++ b/go.sum @@ -86,6 +86,8 @@ github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8l github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-github v15.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v0.0.0-20170111101155-53e6ce116135/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= +github.com/google/pprof v0.0.0-20190109223431-e84dfd68c163 h1:beB+Da4k9B1zmgag78k3k1Bx4L/fdWr5FwNa0f8RxmY= +github.com/google/pprof v0.0.0-20190109223431-e84dfd68c163/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.0 h1:Jf4mxPC/ziBnoPIdpQdPJ9OeiomAUHLvxmPRSPH9m4s= github.com/google/uuid v1.1.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -109,6 +111,8 @@ github.com/huin/goupnp v0.0.0-20180415215157-1395d1447324/go.mod h1:MZ2ZmwcBpvOo github.com/huin/goupnp v1.0.0 h1:wg75sLpL6DZqwHQN6E1Cfk6mtfzS45z8OV+ic+DtHRo= github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc= github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o= +github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6 h1:UDMh68UUwekSh5iP2OMhRRZJiiBccgV7axzUG8vi56c= +github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ipfs/go-cid v0.9.0 h1:EdO7meRzk9MpAo8DbOmPDU3Yh2BQ4ABc0xN2wgEtREA= github.com/ipfs/go-cid v0.9.0/go.mod h1:DEZAg7ik3SR8PY77P+hNaWtHtBirqeEgHbfmePL8WJA= github.com/ipfs/go-datastore v3.2.0+incompatible h1:d9fANkqO9u1kgx6FSlZb8eZPDzD2uthVikkJAI7CUII= diff --git a/main.go b/main.go index 6356128..d17fdd1 100644 --- a/main.go +++ b/main.go @@ -11,10 +11,14 @@ import ( "path/filepath" "strings" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/crypto" "github.com/jroimartin/gocui" "github.com/peterbourgon/ff" + "github.com/status-im/status-go/node" "github.com/status-im/status-go/params" + "github.com/status-im/status-term-client/protocol/v1" ) func init() { @@ -28,11 +32,15 @@ func main() { // flags acting like commands createKeyPair = fs.Bool("create-key-pair", false, "creates and prints a key pair instead of running") - // runtime flags + // flags for in-proc node dataDir = fs.String("data-dir", filepath.Join(os.TempDir(), "status-term-client"), "data directory for Ethereum node") fleet = fs.String("fleet", params.FleetBeta, fmt.Sprintf("Status nodes cluster to connect to: %s", []string{params.FleetBeta, params.FleetStaging})) configFile = fs.String("node-config", "", "a JSON file with node config") - keyHex = fs.String("keyhex", "", "pass a private key in hex") + + // flags for external node + providerURI = fs.String("provider", "", "an URI pointing at a provider") + + keyHex = fs.String("keyhex", "", "pass a private key in hex") ) ff.Parse(fs, os.Args[1:]) @@ -58,22 +66,51 @@ func main() { exitErr(errors.New("private key is required")) } + // initialize chat + var chatAdapter protocol.Chat + + if *providerURI != "" { + rpc, err := rpc.Dial(*providerURI) + if err != nil { + exitErr(err) + } + + // TODO: provide Mail Servers in a different way. + nodeConfig, err := generateStatusNodeConfig(*dataDir, *fleet, *configFile) + if err != nil { + exitErr(err) + } + + chatAdapter = protocol.NewWhisperClientAdapter(rpc, nodeConfig.ClusterConfig.TrustedMailServers) + } else { + nodeConfig, err := generateStatusNodeConfig(*dataDir, *fleet, *configFile) + if err != nil { + exitErr(err) + } + + statusNode := node.New() + if err := statusNode.Start(nodeConfig); err != nil { + exitErr(err) + } + + shhService, err := statusNode.WhisperService() + if err != nil { + exitErr(err) + } + + chatAdapter = protocol.NewWhisperServiceAdapter(statusNode, shhService) + } + g, err := gocui.NewGui(gocui.Output256) if err != nil { exitErr(err) } defer g.Close() - // start Status node - node := NewNode() - if err := node.Start(*dataDir, *fleet, *configFile); err != nil { - exitErr(err) - } - // prepare views vm := NewViewManager(nil, g) - chat, err := NewChatViewController(&ViewController{vm, g, ViewChat}, privateKey, node) + chat, err := NewChatViewController(&ViewController{vm, g, ViewChat}, privateKey, chatAdapter) if err != nil { exitErr(err) } diff --git a/node.go b/node.go deleted file mode 100644 index 3e73942..0000000 --- a/node.go +++ /dev/null @@ -1,262 +0,0 @@ -package main - -import ( - "context" - "crypto/ecdsa" - "errors" - "fmt" - "log" - stdlog "log" - "math/rand" - "os" - "sort" - "time" - - "github.com/status-im/status-term-client/protocol/v1" - - "github.com/ethereum/go-ethereum/p2p" - "github.com/status-im/status-go/logutils" - "github.com/status-im/status-go/node" - "github.com/status-im/status-go/params" - "github.com/status-im/status-go/services/shhext" - "github.com/status-im/status-go/t/helpers" - whisper "github.com/status-im/whisper/whisperv6" -) - -func init() { - if err := logutils.OverrideRootLog(true, "DEBUG", "", false); err != nil { - stdlog.Fatalf("failed to override root log: %v\n", err) - } -} - -// WhisperSubscription encapsulates a Whisper filter. -type WhisperSubscription struct { - shh *whisper.Whisper - filterID string -} - -// NewWhisperSubscription returns a new WhisperSubscription. -func NewWhisperSubscription(shh *whisper.Whisper, filterID string) *WhisperSubscription { - return &WhisperSubscription{shh, filterID} -} - -// Messages retrieves a list of messages for a given filter. -func (s WhisperSubscription) Messages() ([]*ReceivedMessage, error) { - f := s.shh.GetFilter(s.filterID) - if f == nil { - return nil, errors.New("filter does not exist") - } - - items := f.Retrieve() - result := make([]*ReceivedMessage, len(items)) - - for i, item := range items { - decoded, err := protocol.DecodeMessage(item.Payload) - if err != nil { - log.Printf("failed to decode message: %s", item.Payload) - continue - } - - result[i] = &ReceivedMessage{ - Received: item, - Decoded: decoded, - } - } - - sort.Slice(result, func(i, j int) bool { - return result[i].Decoded.Clock < result[j].Decoded.Clock - }) - - return result, nil -} - -// Unsubscribe removes the subscription. -func (s WhisperSubscription) Unsubscribe() error { - return s.shh.Unsubscribe(s.filterID) -} - -// Node is an adapter between Chat interface and Status node. -type Node struct { - node *node.StatusNode -} - -// NewNode returns a new node. -func NewNode() *Node { - return &Node{node: node.New()} -} - -// Start starts the node. -// TODO: params should be taken from the config argument. -func (n *Node) Start(dataDir, fleet, configFile string) error { - if err := os.MkdirAll(dataDir, os.ModeDir|0755); err != nil { - return fmt.Errorf("failed to create a data dir: %v", err) - } - - var configFiles []string - if configFile != "" { - configFiles = append(configFiles, configFile) - } - - config, err := params.NewNodeConfigWithDefaultsAndFiles( - dataDir, - params.MainNetworkID, - []params.Option{params.WithFleet(fleet)}, - configFiles, - ) - if err != nil { - return fmt.Errorf("failed to create a config: %v", err) - } - return n.node.Start(config) -} - -// AddKeyPair adds a key pair to the Whisper service. -func (n *Node) AddKeyPair(key *ecdsa.PrivateKey) (string, error) { - shh, err := n.node.WhisperService() - if err != nil { - return "", err - } - return shh.AddKeyPair(key) -} - -// SubscribePublicChat subscribes to a public chat using the Whisper service. -func (n *Node) SubscribePublicChat(name string) (sub MessagesSubscription, err error) { - shh, err := n.node.WhisperService() - if err != nil { - return - } - - // TODO: add cache - symKeyID, err := shh.AddSymKeyFromPassword(name) - if err != nil { - return - } - symKey, err := shh.GetSymKey(symKeyID) - if err != nil { - return - } - - // TODO: add cache - topic, err := protocol.PublicChatTopic(name) - if err != nil { - return - } - - filterID, err := shh.Subscribe(&whisper.Filter{ - KeySym: symKey, - Topics: [][]byte{topic[:]}, - PoW: 0, - AllowP2P: true, - }) - if err != nil { - return - } - - return NewWhisperSubscription(shh, filterID), nil -} - -// SendPublicMessage sends a new message using the Whisper service. -func (n *Node) SendPublicMessage(name string, data []byte, identity Identity) (string, error) { - whisperService, err := n.node.WhisperService() - if err != nil { - return "", err - } - - // TODO: add cache - keyID, err := whisperService.AddKeyPair(identity) - if err != nil { - return "", err - } - - // TODO: add cache - symKeyID, err := whisperService.AddSymKeyFromPassword(name) - if err != nil { - return "", err - } - - // TODO: add cache - topic, err := protocol.PublicChatTopic(name) - if err != nil { - return "", err - } - - shh := whisper.NewPublicWhisperAPI(whisperService) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - hash, err := shh.Post(ctx, whisper.NewMessage{ - SymKeyID: symKeyID, - TTL: 60, - Topic: topic, - Payload: data, - PowTarget: 2.0, - PowTime: 5, - Sig: keyID, - }) - if err == nil { - stdlog.Printf("sent a message with hash %s", hash.String()) - } - - return hash.String(), err -} - -// RequestPublicMessages requests messages from mail servers. -func (n *Node) RequestPublicMessages(chatName string, params RequestMessagesParams) error { - // TODO: add cache - topic, err := protocol.PublicChatTopic(chatName) - if err != nil { - return err - } - - shhService, err := n.node.WhisperService() - if err != nil { - return err - } - - shhextService, err := n.node.ShhExtService() - if err != nil { - return err - } - shhextAPI := shhext.NewPublicAPI(shhextService) - - config := n.node.Config() - mailServerEnode := randomItem(config.ClusterConfig.TrustedMailServers) - errCh := helpers.WaitForPeerAsync( - n.node.GethNode().Server(), - mailServerEnode, - p2p.PeerEventTypeAdd, - time.Second*5, - ) - if err := n.node.AddPeer(mailServerEnode); err != nil { - return err - } - if err := <-errCh; err != nil { - return err - } - - mailServerSymKeyID, err := shhService.AddSymKeyFromPassword(protocol.MailServerPassword) - if err != nil { - return err - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - hash, err := shhextAPI.RequestMessages(ctx, shhext.MessagesRequest{ - MailServerPeer: mailServerEnode, - From: uint32(params.From), // TODO: change to int in status-go - To: uint32(params.To), // TODO: change to int in status-go - Limit: uint32(params.Limit), // TODO: change to int in status-go - Topics: []whisper.TopicType{topic}, - SymKeyID: mailServerSymKeyID, - }) - if err == nil { - stdlog.Printf("send a request for messages: %s", hash.String()) - } - - // TODO: wait for the request to finish before returning - return err -} - -func randomItem(items []string) string { - l := len(items) - return items[rand.Intn(l)] -} diff --git a/node_config.go b/node_config.go new file mode 100644 index 0000000..5a03594 --- /dev/null +++ b/node_config.go @@ -0,0 +1,34 @@ +package main + +import ( + "fmt" + stdlog "log" + "os" + + "github.com/status-im/status-go/logutils" + "github.com/status-im/status-go/params" +) + +func init() { + if err := logutils.OverrideRootLog(true, "DEBUG", "", false); err != nil { + stdlog.Fatalf("failed to override root log: %v\n", err) + } +} + +func generateStatusNodeConfig(dataDir, fleet, configFile string) (*params.NodeConfig, error) { + if err := os.MkdirAll(dataDir, os.ModeDir|0755); err != nil { + return nil, fmt.Errorf("failed to create a data dir: %v", err) + } + + var configFiles []string + if configFile != "" { + configFiles = append(configFiles, configFile) + } + + return params.NewNodeConfigWithDefaultsAndFiles( + dataDir, + params.MainNetworkID, + []params.Option{params.WithFleet(fleet)}, + configFiles, + ) +} diff --git a/protocol/v1/chat.go b/protocol/v1/chat.go new file mode 100644 index 0000000..0f5955e --- /dev/null +++ b/protocol/v1/chat.go @@ -0,0 +1,53 @@ +package protocol + +import ( + "context" + "crypto/ecdsa" +) + +// Chat provides an interface to interact with any chat. +type Chat interface { + PublicChat +} + +// PublicChat provides an interface to interact with public chats. +type PublicChat interface { + SubscribePublicChat( + ctx context.Context, + name string, + in chan<- *ReceivedMessage, + ) (*Subscription, error) + + // SendPublicMessages sends a message to a public chat. + // Identity is required as the protocol requires + // all messages to be signed. + SendPublicMessage( + ctx context.Context, + chatName string, + data []byte, + identity *ecdsa.PrivateKey, + ) (string, error) + + // TODO: RequestMessagesParams is Whisper specific. + RequestPublicMessages( + ctx context.Context, + chatName string, + params RequestMessagesParams, + ) error +} + +// ReceivedMessage contains a decoded message payload +// and some additional fields that we learnt +// about the message. +type ReceivedMessage struct { + Decoded StatusMessage + SigPubKey *ecdsa.PublicKey +} + +// RequestMessagesParams is a list of params required +// to get historic messages. +type RequestMessagesParams struct { + Limit int + From int64 + To int64 +} diff --git a/protocol/v1/subscription.go b/protocol/v1/subscription.go new file mode 100644 index 0000000..ee486fd --- /dev/null +++ b/protocol/v1/subscription.go @@ -0,0 +1,36 @@ +package protocol + +import "sync" + +type Subscription struct { + sync.RWMutex + + err error + done chan struct{} +} + +func NewSubscription() *Subscription { + return &Subscription{ + done: make(chan struct{}), + } +} + +func (s *Subscription) cancel(err error) { + s.Lock() + s.err = err + s.Unlock() +} + +func (s *Subscription) Unsubscribe() { + close(s.done) +} + +func (s *Subscription) Err() error { + s.RLock() + defer s.RUnlock() + return s.err +} + +func (s *Subscription) Done() <-chan struct{} { + return s.done +} diff --git a/protocol/v1/whisper_client.go b/protocol/v1/whisper_client.go new file mode 100644 index 0000000..486c2cf --- /dev/null +++ b/protocol/v1/whisper_client.go @@ -0,0 +1,207 @@ +package protocol + +import ( + "context" + "crypto/ecdsa" + "fmt" + "log" + "sync" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/rpc" + "github.com/status-im/status-go/services/shhext" + "github.com/status-im/whisper/shhclient" + whisper "github.com/status-im/whisper/whisperv6" +) + +// WhisperClientAdapter is an adapter for Whisper client +// which implements Chat interface. It requires an RPC client +// which can use various transports like HTTP, IPC or in-proc. +type WhisperClientAdapter struct { + rpcClient *rpc.Client + shhClient *shhclient.Client + mailServerEnodes []string + + mu sync.RWMutex + passSymKeyCache map[string]string +} + +// WhisperClientAdapter must implement Chat interface. +var _ Chat = (*WhisperClientAdapter)(nil) + +// NewWhisperClientAdapter returns a new WhisperClientAdapter. +func NewWhisperClientAdapter(c *rpc.Client, mailServers []string) *WhisperClientAdapter { + return &WhisperClientAdapter{ + rpcClient: c, + shhClient: shhclient.NewClient(c), + mailServerEnodes: mailServers, + passSymKeyCache: make(map[string]string), + } +} + +// SubscribePublicChat subscribes to a public channel. +// in channel is used to receive messages. +// errCh is used to forward any errors that may occur +// during the subscription. +func (a *WhisperClientAdapter) SubscribePublicChat(ctx context.Context, name string, in chan<- *ReceivedMessage) (*Subscription, error) { + symKeyID, err := a.getOrAddSymKey(ctx, name) + if err != nil { + return nil, err + } + + topic, err := PublicChatTopic(name) + if err != nil { + return nil, err + } + + messages := make(chan *whisper.Message) + criteria := whisper.Criteria{ + SymKeyID: symKeyID, + MinPow: 0, // TODO: set it to proper value + Topics: []whisper.TopicType{topic}, + AllowP2P: true, // messages from mail server are direct p2p messages + } + shhSub, err := a.shhClient.SubscribeMessages(ctx, criteria, messages) + if err != nil { + return nil, err + } + + sub := NewSubscription() + + go func() { + defer shhSub.Unsubscribe() + + for { + select { + case raw := <-messages: + m, err := DecodeMessage(raw.Payload) + if err != nil { + log.Printf("failed to decode message: %v", err) + break + } + + sigPubKey, err := crypto.UnmarshalPubkey(raw.Sig) + if err != nil { + log.Printf("failed to get a signature: %v", err) + break + } + + in <- &ReceivedMessage{ + Decoded: m, + SigPubKey: sigPubKey, + } + case err := <-shhSub.Err(): + sub.cancel(err) + return + case <-sub.Done(): + return + } + } + }() + + return sub, nil +} + +// SendPublicMessage sends a new message to a public chat. +// Identity is required to sign a message as only signed messages +// are accepted and displayed. +func (a *WhisperClientAdapter) SendPublicMessage(ctx context.Context, name string, data []byte, identity *ecdsa.PrivateKey) (string, error) { + identityID, err := a.shhClient.AddPrivateKey(ctx, crypto.FromECDSA(identity)) + if err != nil { + return "", err + } + + symKeyID, err := a.getOrAddSymKey(ctx, name) + if err != nil { + return "", err + } + + topic, err := PublicChatTopic(name) + if err != nil { + return "", err + } + + return a.shhClient.Post(ctx, whisper.NewMessage{ + SymKeyID: symKeyID, + TTL: 60, + Topic: topic, + Payload: data, + PowTarget: 2.0, + PowTime: 5, + Sig: identityID, + }) +} + +// RequestPublicMessages sends a request to MailServer for historic messages. +func (a *WhisperClientAdapter) RequestPublicMessages(ctx context.Context, name string, params RequestMessagesParams) error { + enode := randomItem(a.mailServerEnodes) + + log.Printf("using %s as a mail server", enode) + + if err := a.rpcClient.CallContext(ctx, nil, "admin_addPeer", enode); err != nil { + return err + } + + // Adding peer is asynchronous operation so we need to retry a few times. + retries := 0 + for { + <-time.After(time.Second) + + err := a.shhClient.MarkTrustedPeer(ctx, enode) + if ctx.Err() == context.Canceled { + log.Printf("requesting public messages canceled") + return err + } + if err == nil { + break + } + if retries < 3 { + retries++ + } else { + return fmt.Errorf("failed to mark peer as trusted: %v", err) + } + } + + mailServerSymKeyID, err := a.getOrAddSymKey(ctx, MailServerPassword) + if err != nil { + return err + } + + topic, err := PublicChatTopic(name) + if err != nil { + return err + } + + req := shhext.MessagesRequest{ + MailServerPeer: enode, + SymKeyID: mailServerSymKeyID, + From: uint32(params.From), // TODO: change to int in status-go + To: uint32(params.To), // TODO: change to int in status-go + Limit: uint32(params.Limit), // TODO: change to int in status-go + Topics: []whisper.TopicType{topic}, + } + + return a.rpcClient.CallContext(ctx, nil, "shhext_requestMessages", req) +} + +func (a *WhisperClientAdapter) getOrAddSymKey(ctx context.Context, pass string) (string, error) { + a.mu.RLock() + symKeyID, ok := a.passSymKeyCache[pass] + a.mu.RUnlock() + + if ok { + return symKeyID, nil + } + + symKeyID, err := a.shhClient.GenerateSymmetricKeyFromPassword(ctx, pass) + if err != nil { + return "", err + } + + a.mu.Lock() + a.passSymKeyCache[pass] = symKeyID + a.mu.Unlock() + + return symKeyID, nil +} diff --git a/protocol/v1/whisper_service.go b/protocol/v1/whisper_service.go new file mode 100644 index 0000000..ab403d4 --- /dev/null +++ b/protocol/v1/whisper_service.go @@ -0,0 +1,232 @@ +package protocol + +import ( + "context" + "crypto/ecdsa" + "errors" + "log" + "sort" + "time" + + "github.com/ethereum/go-ethereum/p2p" + "github.com/status-im/status-go/node" + "github.com/status-im/status-go/services/shhext" + "github.com/status-im/status-go/t/helpers" + + whisper "github.com/status-im/whisper/whisperv6" +) + +// WhisperServiceAdapter is an adapter for Whisper service +// the implements Chat interface. +type WhisperServiceAdapter struct { + node *node.StatusNode + shh *whisper.Whisper +} + +// WhisperServiceAdapter must implement Chat interface. +var _ Chat = (*WhisperServiceAdapter)(nil) + +// NewWhisperServiceAdapter returns a new WhisperServiceAdapter. +func NewWhisperServiceAdapter(node *node.StatusNode, shh *whisper.Whisper) *WhisperServiceAdapter { + return &WhisperServiceAdapter{ + node: node, + shh: shh, + } +} + +// SubscribePublicChat subscribes to a public chat using the Whisper service. +func (a *WhisperServiceAdapter) SubscribePublicChat(ctx context.Context, name string, in chan<- *ReceivedMessage) (*Subscription, error) { + // TODO: add cache + symKeyID, err := a.shh.AddSymKeyFromPassword(name) + if err != nil { + return nil, err + } + symKey, err := a.shh.GetSymKey(symKeyID) + if err != nil { + return nil, err + } + + // TODO: add cache + topic, err := PublicChatTopic(name) + if err != nil { + return nil, err + } + + filterID, err := a.shh.Subscribe(&whisper.Filter{ + KeySym: symKey, + Topics: [][]byte{topic[:]}, + PoW: 0, + AllowP2P: true, + }) + if err != nil { + return nil, err + } + + subMessages := newWhisperSubscription(a.shh, filterID) + sub := NewSubscription() + + go func() { + defer subMessages.Unsubscribe() + + t := time.NewTicker(time.Second) + defer t.Stop() + + for { + select { + case <-t.C: + messages, err := subMessages.Messages() + if err != nil { + sub.cancel(err) + return + } + + sort.Slice(messages, func(i, j int) bool { + return messages[i].Decoded.Clock < messages[j].Decoded.Clock + }) + + for _, m := range messages { + in <- m + } + case <-sub.Done(): + return + } + } + }() + + return sub, nil +} + +// SendPublicMessage sends a new message using the Whisper service. +func (a *WhisperServiceAdapter) SendPublicMessage( + ctx context.Context, name string, data []byte, identity *ecdsa.PrivateKey, +) (string, error) { + // TODO: add cache + keyID, err := a.shh.AddKeyPair(identity) + if err != nil { + return "", err + } + + // TODO: add cache + symKeyID, err := a.shh.AddSymKeyFromPassword(name) + if err != nil { + return "", err + } + + // TODO: add cache + topic, err := PublicChatTopic(name) + if err != nil { + return "", err + } + + // Only public Whisper API implements logic to send messages. + shhAPI := whisper.NewPublicWhisperAPI(a.shh) + hash, err := shhAPI.Post(ctx, whisper.NewMessage{ + SymKeyID: symKeyID, + TTL: 60, + Topic: topic, + Payload: data, + PowTarget: 2.0, + PowTime: 5, + Sig: keyID, + }) + + return hash.String(), err +} + +// RequestPublicMessages requests messages from mail servers. +func (a *WhisperServiceAdapter) RequestPublicMessages( + ctx context.Context, name string, params RequestMessagesParams, +) error { + // TODO: add cache + topic, err := PublicChatTopic(name) + if err != nil { + return err + } + + shhextService, err := a.node.ShhExtService() + if err != nil { + return err + } + shhextAPI := shhext.NewPublicAPI(shhextService) + + // TODO: remove from here. MailServerEnode must be provided in the params. + config := a.node.Config() + mailServerEnode := randomItem(config.ClusterConfig.TrustedMailServers) + errCh := helpers.WaitForPeerAsync( + a.node.GethNode().Server(), + mailServerEnode, + p2p.PeerEventTypeAdd, + time.Second*5, + ) + if err := a.node.AddPeer(mailServerEnode); err != nil { + return err + } + if err := <-errCh; err != nil { + return err + } + + mailServerSymKeyID, err := a.shh.AddSymKeyFromPassword(MailServerPassword) + if err != nil { + return err + } + + _, err = shhextAPI.RequestMessages(ctx, shhext.MessagesRequest{ + MailServerPeer: mailServerEnode, + From: uint32(params.From), // TODO: change to int in status-go + To: uint32(params.To), // TODO: change to int in status-go + Limit: uint32(params.Limit), // TODO: change to int in status-go + Topics: []whisper.TopicType{topic}, + SymKeyID: mailServerSymKeyID, + }) + + // TODO: wait for the request to finish before returning + return err +} + +// whisperSubscription encapsulates a Whisper filter. +type whisperSubscription struct { + shh *whisper.Whisper + filterID string +} + +// newWhisperSubscription returns a new whisperSubscription. +func newWhisperSubscription(shh *whisper.Whisper, filterID string) *whisperSubscription { + return &whisperSubscription{shh, filterID} +} + +// Messages retrieves a list of messages for a given filter. +func (s whisperSubscription) Messages() ([]*ReceivedMessage, error) { + f := s.shh.GetFilter(s.filterID) + if f == nil { + return nil, errors.New("filter does not exist") + } + + items := f.Retrieve() + result := make([]*ReceivedMessage, 0, len(items)) + + for _, item := range items { + log.Printf("retrieve a message: %#v", item) + + decoded, err := DecodeMessage(item.Payload) + if err != nil { + log.Printf("failed to decode message: %v", err) + continue + } + + result = append(result, &ReceivedMessage{ + Decoded: decoded, + SigPubKey: item.SigToPubKey(), + }) + } + + sort.Slice(result, func(i, j int) bool { + return result[i].Decoded.Clock < result[j].Decoded.Clock + }) + + return result, nil +} + +// Unsubscribe removes the subscription. +func (s whisperSubscription) Unsubscribe() error { + return s.shh.Unsubscribe(s.filterID) +} diff --git a/protocol/v1/whisper_utils.go b/protocol/v1/whisper_utils.go new file mode 100644 index 0000000..5c64236 --- /dev/null +++ b/protocol/v1/whisper_utils.go @@ -0,0 +1,8 @@ +package protocol + +import "math/rand" + +func randomItem(items []string) string { + l := len(items) + return items[rand.Intn(l)] +}