Refactor and use whisper adapters (#1)

This commit is contained in:
Adam Babik 2019-01-22 09:39:23 +01:00 committed by GitHub
parent dfb5e8375e
commit e744368566
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 675 additions and 351 deletions

133
chat.go
View File

@ -1,6 +1,7 @@
package main
import (
"context"
"crypto/ecdsa"
"encoding/hex"
"errors"
@ -12,7 +13,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/fatih/color"
"github.com/jroimartin/gocui"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/status-im/status-term-client/protocol/v1"
)
@ -27,43 +27,12 @@ var (
ErrUnsupportedContactType = errors.New("unsupported contact type")
)
// ReceivedMessage contains a raw Whisper message and decoded payload.
type ReceivedMessage struct {
Decoded protocol.StatusMessage
Received *whisper.ReceivedMessage
}
// RequestMessagesParams is a list of params sent while requesting historic messages.
type RequestMessagesParams struct {
Limit int
From int64
To int64
}
// MessagesSubscription is a subscription that retrieves messages.
type MessagesSubscription interface {
Messages() ([]*ReceivedMessage, error)
Unsubscribe() error
}
// PublicChat provides an interface to interact with public chats.
type PublicChat interface {
SubscribePublicChat(name string) (MessagesSubscription, error)
SendPublicMessage(chatName string, data []byte, identity Identity) (string, error)
RequestPublicMessages(chatName string, params RequestMessagesParams) error
}
// Chat provides an interface to interact with any chat.
type Chat interface {
PublicChat
}
// ChatViewController manages chat view.
type ChatViewController struct {
*ViewController
identity Identity
node Chat
identity *ecdsa.PrivateKey
chat protocol.Chat
currentContact Contact
lastClockValue int64
@ -74,11 +43,11 @@ type ChatViewController struct {
}
// NewChatViewController returns a new chat view controller.
func NewChatViewController(vc *ViewController, id Identity, node Chat) (*ChatViewController, error) {
func NewChatViewController(vc *ViewController, id Identity, chat protocol.Chat) (*ChatViewController, error) {
return &ChatViewController{
ViewController: vc,
identity: id,
node: node,
chat: chat,
sentMessages: make(map[string]struct{}),
}, nil
}
@ -91,13 +60,20 @@ func (c *ChatViewController) Select(contact Contact) error {
c.currentContact = contact
var (
sub MessagesSubscription
sub *protocol.Subscription
err error
)
messages := make(chan *protocol.ReceivedMessage)
switch contact.Type {
case ContactPublicChat:
sub, err = c.node.SubscribePublicChat(contact.Name)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
sub, err = c.chat.SubscribePublicChat(ctx, contact.Name, messages)
if err != nil {
err = fmt.Errorf("failed to subscribe to public chat: %v", err)
}
default:
err = ErrUnsupportedContactType
}
@ -122,71 +98,73 @@ func (c *ChatViewController) Select(contact Contact) error {
c.cancel = make(chan struct{})
c.done = make(chan struct{})
go c.readMessagesLoop(sub, c.cancel, c.done)
go c.readMessagesLoop(sub, messages, c.cancel, c.done)
// Request some previous messages from the current chat
// to provide some context for the user.
// TODO: handle pagination
// TODO: RequestPublicMessages should return only after receiving a response.
params := RequestMessagesParams{
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
params := protocol.RequestMessagesParams{
Limit: 100,
}
if err := c.node.RequestPublicMessages(c.currentContact.Name, params); err != nil {
if err := c.chat.RequestPublicMessages(ctx, c.currentContact.Name, params); err != nil {
return fmt.Errorf("failed to request messages: %v", err)
}
return nil
}
// TODO: change done channel to err channel. Err channel should be handled by a goroutine.
func (c *ChatViewController) readMessagesLoop(sub MessagesSubscription, cancel <-chan struct{}, done chan struct{}) {
func (c *ChatViewController) readMessagesLoop(
sub *protocol.Subscription,
messages <-chan *protocol.ReceivedMessage,
cancel <-chan struct{},
done chan struct{},
) {
// TODO: check calls order. `close(done)` should be the last one.
defer func() { _ = sub.Unsubscribe() }()
defer sub.Unsubscribe()
defer close(done)
t := time.NewTimer(ReadMessagesTimeout)
defer t.Stop()
for {
select {
case <-t.C:
case m := <-messages:
c.updateLastClockValue(m)
c.g.Update(func(*gocui.Gui) error {
messages, err := sub.Messages()
err := c.printMessage(m)
if err != nil {
return fmt.Errorf("failed to get messages: %v", err)
err = fmt.Errorf("failed to print a message because of %v", err)
}
log.Printf("received %d messages", len(messages))
c.updateLastClockValue(messages)
return c.printMessages(messages)
return err
})
t.Reset(ReadMessagesTimeout)
case <-sub.Done():
if err := sub.Err(); err != nil {
log.Fatalf("protocol subscription errored: %v", err)
}
case <-cancel:
return
}
}
}
func (c *ChatViewController) printMessages(messages []*ReceivedMessage) error {
func (c *ChatViewController) printMessage(message *protocol.ReceivedMessage) error {
myPubKey := c.identity.PublicKey
pubKey := message.SigPubKey
for _, message := range messages {
pubKey := message.Received.SigToPubKey()
line := formatMessageLine(
pubKey,
time.Unix(message.Decoded.Timestamp/1000, 0),
message.Decoded.Text,
)
line := formatMessageLine(
pubKey,
time.Unix(message.Decoded.Timestamp/1000, 0),
message.Decoded.Text,
)
println := fmt.Fprintln
if pubKey.X.Cmp(myPubKey.X) == 0 && pubKey.Y.Cmp(myPubKey.Y) == 0 {
println = color.New(color.FgGreen).Fprintln
}
println := fmt.Fprintln
if pubKey.X.Cmp(myPubKey.X) == 0 && pubKey.Y.Cmp(myPubKey.Y) == 0 {
println = color.New(color.FgGreen).Fprintln
}
if _, err := println(c.ViewController, line); err != nil {
return err
}
if _, err := println(c.ViewController, line); err != nil {
return err
}
return nil
@ -205,14 +183,7 @@ func formatMessageLine(id *ecdsa.PublicKey, t time.Time, text string) string {
)
}
func (c *ChatViewController) updateLastClockValue(messages []*ReceivedMessage) {
size := len(messages)
if size == 0 {
return
}
m := messages[size-1]
func (c *ChatViewController) updateLastClockValue(m *protocol.ReceivedMessage) {
if m.Decoded.Clock > c.lastClockValue {
c.lastClockValue = m.Decoded.Clock
}
@ -244,7 +215,9 @@ func (c *ChatViewController) SendMessage(content []byte) (string, error) {
switch c.currentContact.Type {
case ContactPublicChat:
return c.node.SendPublicMessage(c.currentContact.Name, data, c.identity)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
return c.chat.SendPublicMessage(ctx, c.currentContact.Name, data, c.identity)
default:
return "", ErrUnsupportedContactType
}

2
go.mod
View File

@ -20,6 +20,7 @@ require (
github.com/golang-migrate/migrate v3.5.4+incompatible // indirect
github.com/golang/mock v1.2.0 // indirect
github.com/golang/protobuf v1.2.0 // indirect
github.com/google/pprof v0.0.0-20190109223431-e84dfd68c163 // indirect
github.com/google/uuid v1.1.0 // indirect
github.com/gorilla/websocket v1.4.0 // indirect
github.com/gxed/GoEndian v0.0.0-20160916112711-0f5c6873267e // indirect
@ -27,6 +28,7 @@ require (
github.com/gxed/hashland v0.0.0-20180221191214-d9f6b97f8db2 // indirect
github.com/hashicorp/golang-lru v0.5.0 // indirect
github.com/huin/goupnp v1.0.0 // indirect
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6 // indirect
github.com/ipfs/go-cid v0.9.0 // indirect
github.com/ipfs/go-datastore v3.2.0+incompatible // indirect
github.com/ipfs/go-ipfs-util v1.2.8 // indirect

4
go.sum
View File

@ -86,6 +86,8 @@ github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8l
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-github v15.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
github.com/google/go-querystring v0.0.0-20170111101155-53e6ce116135/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/pprof v0.0.0-20190109223431-e84dfd68c163 h1:beB+Da4k9B1zmgag78k3k1Bx4L/fdWr5FwNa0f8RxmY=
github.com/google/pprof v0.0.0-20190109223431-e84dfd68c163/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.0 h1:Jf4mxPC/ziBnoPIdpQdPJ9OeiomAUHLvxmPRSPH9m4s=
github.com/google/uuid v1.1.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@ -109,6 +111,8 @@ github.com/huin/goupnp v0.0.0-20180415215157-1395d1447324/go.mod h1:MZ2ZmwcBpvOo
github.com/huin/goupnp v1.0.0 h1:wg75sLpL6DZqwHQN6E1Cfk6mtfzS45z8OV+ic+DtHRo=
github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc=
github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6 h1:UDMh68UUwekSh5iP2OMhRRZJiiBccgV7axzUG8vi56c=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ipfs/go-cid v0.9.0 h1:EdO7meRzk9MpAo8DbOmPDU3Yh2BQ4ABc0xN2wgEtREA=
github.com/ipfs/go-cid v0.9.0/go.mod h1:DEZAg7ik3SR8PY77P+hNaWtHtBirqeEgHbfmePL8WJA=
github.com/ipfs/go-datastore v3.2.0+incompatible h1:d9fANkqO9u1kgx6FSlZb8eZPDzD2uthVikkJAI7CUII=

55
main.go
View File

@ -11,10 +11,14 @@ import (
"path/filepath"
"strings"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/crypto"
"github.com/jroimartin/gocui"
"github.com/peterbourgon/ff"
"github.com/status-im/status-go/node"
"github.com/status-im/status-go/params"
"github.com/status-im/status-term-client/protocol/v1"
)
func init() {
@ -28,11 +32,15 @@ func main() {
// flags acting like commands
createKeyPair = fs.Bool("create-key-pair", false, "creates and prints a key pair instead of running")
// runtime flags
// flags for in-proc node
dataDir = fs.String("data-dir", filepath.Join(os.TempDir(), "status-term-client"), "data directory for Ethereum node")
fleet = fs.String("fleet", params.FleetBeta, fmt.Sprintf("Status nodes cluster to connect to: %s", []string{params.FleetBeta, params.FleetStaging}))
configFile = fs.String("node-config", "", "a JSON file with node config")
keyHex = fs.String("keyhex", "", "pass a private key in hex")
// flags for external node
providerURI = fs.String("provider", "", "an URI pointing at a provider")
keyHex = fs.String("keyhex", "", "pass a private key in hex")
)
ff.Parse(fs, os.Args[1:])
@ -58,22 +66,51 @@ func main() {
exitErr(errors.New("private key is required"))
}
// initialize chat
var chatAdapter protocol.Chat
if *providerURI != "" {
rpc, err := rpc.Dial(*providerURI)
if err != nil {
exitErr(err)
}
// TODO: provide Mail Servers in a different way.
nodeConfig, err := generateStatusNodeConfig(*dataDir, *fleet, *configFile)
if err != nil {
exitErr(err)
}
chatAdapter = protocol.NewWhisperClientAdapter(rpc, nodeConfig.ClusterConfig.TrustedMailServers)
} else {
nodeConfig, err := generateStatusNodeConfig(*dataDir, *fleet, *configFile)
if err != nil {
exitErr(err)
}
statusNode := node.New()
if err := statusNode.Start(nodeConfig); err != nil {
exitErr(err)
}
shhService, err := statusNode.WhisperService()
if err != nil {
exitErr(err)
}
chatAdapter = protocol.NewWhisperServiceAdapter(statusNode, shhService)
}
g, err := gocui.NewGui(gocui.Output256)
if err != nil {
exitErr(err)
}
defer g.Close()
// start Status node
node := NewNode()
if err := node.Start(*dataDir, *fleet, *configFile); err != nil {
exitErr(err)
}
// prepare views
vm := NewViewManager(nil, g)
chat, err := NewChatViewController(&ViewController{vm, g, ViewChat}, privateKey, node)
chat, err := NewChatViewController(&ViewController{vm, g, ViewChat}, privateKey, chatAdapter)
if err != nil {
exitErr(err)
}

262
node.go
View File

@ -1,262 +0,0 @@
package main
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"log"
stdlog "log"
"math/rand"
"os"
"sort"
"time"
"github.com/status-im/status-term-client/protocol/v1"
"github.com/ethereum/go-ethereum/p2p"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/node"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/services/shhext"
"github.com/status-im/status-go/t/helpers"
whisper "github.com/status-im/whisper/whisperv6"
)
func init() {
if err := logutils.OverrideRootLog(true, "DEBUG", "", false); err != nil {
stdlog.Fatalf("failed to override root log: %v\n", err)
}
}
// WhisperSubscription encapsulates a Whisper filter.
type WhisperSubscription struct {
shh *whisper.Whisper
filterID string
}
// NewWhisperSubscription returns a new WhisperSubscription.
func NewWhisperSubscription(shh *whisper.Whisper, filterID string) *WhisperSubscription {
return &WhisperSubscription{shh, filterID}
}
// Messages retrieves a list of messages for a given filter.
func (s WhisperSubscription) Messages() ([]*ReceivedMessage, error) {
f := s.shh.GetFilter(s.filterID)
if f == nil {
return nil, errors.New("filter does not exist")
}
items := f.Retrieve()
result := make([]*ReceivedMessage, len(items))
for i, item := range items {
decoded, err := protocol.DecodeMessage(item.Payload)
if err != nil {
log.Printf("failed to decode message: %s", item.Payload)
continue
}
result[i] = &ReceivedMessage{
Received: item,
Decoded: decoded,
}
}
sort.Slice(result, func(i, j int) bool {
return result[i].Decoded.Clock < result[j].Decoded.Clock
})
return result, nil
}
// Unsubscribe removes the subscription.
func (s WhisperSubscription) Unsubscribe() error {
return s.shh.Unsubscribe(s.filterID)
}
// Node is an adapter between Chat interface and Status node.
type Node struct {
node *node.StatusNode
}
// NewNode returns a new node.
func NewNode() *Node {
return &Node{node: node.New()}
}
// Start starts the node.
// TODO: params should be taken from the config argument.
func (n *Node) Start(dataDir, fleet, configFile string) error {
if err := os.MkdirAll(dataDir, os.ModeDir|0755); err != nil {
return fmt.Errorf("failed to create a data dir: %v", err)
}
var configFiles []string
if configFile != "" {
configFiles = append(configFiles, configFile)
}
config, err := params.NewNodeConfigWithDefaultsAndFiles(
dataDir,
params.MainNetworkID,
[]params.Option{params.WithFleet(fleet)},
configFiles,
)
if err != nil {
return fmt.Errorf("failed to create a config: %v", err)
}
return n.node.Start(config)
}
// AddKeyPair adds a key pair to the Whisper service.
func (n *Node) AddKeyPair(key *ecdsa.PrivateKey) (string, error) {
shh, err := n.node.WhisperService()
if err != nil {
return "", err
}
return shh.AddKeyPair(key)
}
// SubscribePublicChat subscribes to a public chat using the Whisper service.
func (n *Node) SubscribePublicChat(name string) (sub MessagesSubscription, err error) {
shh, err := n.node.WhisperService()
if err != nil {
return
}
// TODO: add cache
symKeyID, err := shh.AddSymKeyFromPassword(name)
if err != nil {
return
}
symKey, err := shh.GetSymKey(symKeyID)
if err != nil {
return
}
// TODO: add cache
topic, err := protocol.PublicChatTopic(name)
if err != nil {
return
}
filterID, err := shh.Subscribe(&whisper.Filter{
KeySym: symKey,
Topics: [][]byte{topic[:]},
PoW: 0,
AllowP2P: true,
})
if err != nil {
return
}
return NewWhisperSubscription(shh, filterID), nil
}
// SendPublicMessage sends a new message using the Whisper service.
func (n *Node) SendPublicMessage(name string, data []byte, identity Identity) (string, error) {
whisperService, err := n.node.WhisperService()
if err != nil {
return "", err
}
// TODO: add cache
keyID, err := whisperService.AddKeyPair(identity)
if err != nil {
return "", err
}
// TODO: add cache
symKeyID, err := whisperService.AddSymKeyFromPassword(name)
if err != nil {
return "", err
}
// TODO: add cache
topic, err := protocol.PublicChatTopic(name)
if err != nil {
return "", err
}
shh := whisper.NewPublicWhisperAPI(whisperService)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
hash, err := shh.Post(ctx, whisper.NewMessage{
SymKeyID: symKeyID,
TTL: 60,
Topic: topic,
Payload: data,
PowTarget: 2.0,
PowTime: 5,
Sig: keyID,
})
if err == nil {
stdlog.Printf("sent a message with hash %s", hash.String())
}
return hash.String(), err
}
// RequestPublicMessages requests messages from mail servers.
func (n *Node) RequestPublicMessages(chatName string, params RequestMessagesParams) error {
// TODO: add cache
topic, err := protocol.PublicChatTopic(chatName)
if err != nil {
return err
}
shhService, err := n.node.WhisperService()
if err != nil {
return err
}
shhextService, err := n.node.ShhExtService()
if err != nil {
return err
}
shhextAPI := shhext.NewPublicAPI(shhextService)
config := n.node.Config()
mailServerEnode := randomItem(config.ClusterConfig.TrustedMailServers)
errCh := helpers.WaitForPeerAsync(
n.node.GethNode().Server(),
mailServerEnode,
p2p.PeerEventTypeAdd,
time.Second*5,
)
if err := n.node.AddPeer(mailServerEnode); err != nil {
return err
}
if err := <-errCh; err != nil {
return err
}
mailServerSymKeyID, err := shhService.AddSymKeyFromPassword(protocol.MailServerPassword)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
hash, err := shhextAPI.RequestMessages(ctx, shhext.MessagesRequest{
MailServerPeer: mailServerEnode,
From: uint32(params.From), // TODO: change to int in status-go
To: uint32(params.To), // TODO: change to int in status-go
Limit: uint32(params.Limit), // TODO: change to int in status-go
Topics: []whisper.TopicType{topic},
SymKeyID: mailServerSymKeyID,
})
if err == nil {
stdlog.Printf("send a request for messages: %s", hash.String())
}
// TODO: wait for the request to finish before returning
return err
}
func randomItem(items []string) string {
l := len(items)
return items[rand.Intn(l)]
}

34
node_config.go Normal file
View File

@ -0,0 +1,34 @@
package main
import (
"fmt"
stdlog "log"
"os"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/params"
)
func init() {
if err := logutils.OverrideRootLog(true, "DEBUG", "", false); err != nil {
stdlog.Fatalf("failed to override root log: %v\n", err)
}
}
func generateStatusNodeConfig(dataDir, fleet, configFile string) (*params.NodeConfig, error) {
if err := os.MkdirAll(dataDir, os.ModeDir|0755); err != nil {
return nil, fmt.Errorf("failed to create a data dir: %v", err)
}
var configFiles []string
if configFile != "" {
configFiles = append(configFiles, configFile)
}
return params.NewNodeConfigWithDefaultsAndFiles(
dataDir,
params.MainNetworkID,
[]params.Option{params.WithFleet(fleet)},
configFiles,
)
}

53
protocol/v1/chat.go Normal file
View File

@ -0,0 +1,53 @@
package protocol
import (
"context"
"crypto/ecdsa"
)
// Chat provides an interface to interact with any chat.
type Chat interface {
PublicChat
}
// PublicChat provides an interface to interact with public chats.
type PublicChat interface {
SubscribePublicChat(
ctx context.Context,
name string,
in chan<- *ReceivedMessage,
) (*Subscription, error)
// SendPublicMessages sends a message to a public chat.
// Identity is required as the protocol requires
// all messages to be signed.
SendPublicMessage(
ctx context.Context,
chatName string,
data []byte,
identity *ecdsa.PrivateKey,
) (string, error)
// TODO: RequestMessagesParams is Whisper specific.
RequestPublicMessages(
ctx context.Context,
chatName string,
params RequestMessagesParams,
) error
}
// ReceivedMessage contains a decoded message payload
// and some additional fields that we learnt
// about the message.
type ReceivedMessage struct {
Decoded StatusMessage
SigPubKey *ecdsa.PublicKey
}
// RequestMessagesParams is a list of params required
// to get historic messages.
type RequestMessagesParams struct {
Limit int
From int64
To int64
}

View File

@ -0,0 +1,36 @@
package protocol
import "sync"
type Subscription struct {
sync.RWMutex
err error
done chan struct{}
}
func NewSubscription() *Subscription {
return &Subscription{
done: make(chan struct{}),
}
}
func (s *Subscription) cancel(err error) {
s.Lock()
s.err = err
s.Unlock()
}
func (s *Subscription) Unsubscribe() {
close(s.done)
}
func (s *Subscription) Err() error {
s.RLock()
defer s.RUnlock()
return s.err
}
func (s *Subscription) Done() <-chan struct{} {
return s.done
}

View File

@ -0,0 +1,207 @@
package protocol
import (
"context"
"crypto/ecdsa"
"fmt"
"log"
"sync"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/services/shhext"
"github.com/status-im/whisper/shhclient"
whisper "github.com/status-im/whisper/whisperv6"
)
// WhisperClientAdapter is an adapter for Whisper client
// which implements Chat interface. It requires an RPC client
// which can use various transports like HTTP, IPC or in-proc.
type WhisperClientAdapter struct {
rpcClient *rpc.Client
shhClient *shhclient.Client
mailServerEnodes []string
mu sync.RWMutex
passSymKeyCache map[string]string
}
// WhisperClientAdapter must implement Chat interface.
var _ Chat = (*WhisperClientAdapter)(nil)
// NewWhisperClientAdapter returns a new WhisperClientAdapter.
func NewWhisperClientAdapter(c *rpc.Client, mailServers []string) *WhisperClientAdapter {
return &WhisperClientAdapter{
rpcClient: c,
shhClient: shhclient.NewClient(c),
mailServerEnodes: mailServers,
passSymKeyCache: make(map[string]string),
}
}
// SubscribePublicChat subscribes to a public channel.
// in channel is used to receive messages.
// errCh is used to forward any errors that may occur
// during the subscription.
func (a *WhisperClientAdapter) SubscribePublicChat(ctx context.Context, name string, in chan<- *ReceivedMessage) (*Subscription, error) {
symKeyID, err := a.getOrAddSymKey(ctx, name)
if err != nil {
return nil, err
}
topic, err := PublicChatTopic(name)
if err != nil {
return nil, err
}
messages := make(chan *whisper.Message)
criteria := whisper.Criteria{
SymKeyID: symKeyID,
MinPow: 0, // TODO: set it to proper value
Topics: []whisper.TopicType{topic},
AllowP2P: true, // messages from mail server are direct p2p messages
}
shhSub, err := a.shhClient.SubscribeMessages(ctx, criteria, messages)
if err != nil {
return nil, err
}
sub := NewSubscription()
go func() {
defer shhSub.Unsubscribe()
for {
select {
case raw := <-messages:
m, err := DecodeMessage(raw.Payload)
if err != nil {
log.Printf("failed to decode message: %v", err)
break
}
sigPubKey, err := crypto.UnmarshalPubkey(raw.Sig)
if err != nil {
log.Printf("failed to get a signature: %v", err)
break
}
in <- &ReceivedMessage{
Decoded: m,
SigPubKey: sigPubKey,
}
case err := <-shhSub.Err():
sub.cancel(err)
return
case <-sub.Done():
return
}
}
}()
return sub, nil
}
// SendPublicMessage sends a new message to a public chat.
// Identity is required to sign a message as only signed messages
// are accepted and displayed.
func (a *WhisperClientAdapter) SendPublicMessage(ctx context.Context, name string, data []byte, identity *ecdsa.PrivateKey) (string, error) {
identityID, err := a.shhClient.AddPrivateKey(ctx, crypto.FromECDSA(identity))
if err != nil {
return "", err
}
symKeyID, err := a.getOrAddSymKey(ctx, name)
if err != nil {
return "", err
}
topic, err := PublicChatTopic(name)
if err != nil {
return "", err
}
return a.shhClient.Post(ctx, whisper.NewMessage{
SymKeyID: symKeyID,
TTL: 60,
Topic: topic,
Payload: data,
PowTarget: 2.0,
PowTime: 5,
Sig: identityID,
})
}
// RequestPublicMessages sends a request to MailServer for historic messages.
func (a *WhisperClientAdapter) RequestPublicMessages(ctx context.Context, name string, params RequestMessagesParams) error {
enode := randomItem(a.mailServerEnodes)
log.Printf("using %s as a mail server", enode)
if err := a.rpcClient.CallContext(ctx, nil, "admin_addPeer", enode); err != nil {
return err
}
// Adding peer is asynchronous operation so we need to retry a few times.
retries := 0
for {
<-time.After(time.Second)
err := a.shhClient.MarkTrustedPeer(ctx, enode)
if ctx.Err() == context.Canceled {
log.Printf("requesting public messages canceled")
return err
}
if err == nil {
break
}
if retries < 3 {
retries++
} else {
return fmt.Errorf("failed to mark peer as trusted: %v", err)
}
}
mailServerSymKeyID, err := a.getOrAddSymKey(ctx, MailServerPassword)
if err != nil {
return err
}
topic, err := PublicChatTopic(name)
if err != nil {
return err
}
req := shhext.MessagesRequest{
MailServerPeer: enode,
SymKeyID: mailServerSymKeyID,
From: uint32(params.From), // TODO: change to int in status-go
To: uint32(params.To), // TODO: change to int in status-go
Limit: uint32(params.Limit), // TODO: change to int in status-go
Topics: []whisper.TopicType{topic},
}
return a.rpcClient.CallContext(ctx, nil, "shhext_requestMessages", req)
}
func (a *WhisperClientAdapter) getOrAddSymKey(ctx context.Context, pass string) (string, error) {
a.mu.RLock()
symKeyID, ok := a.passSymKeyCache[pass]
a.mu.RUnlock()
if ok {
return symKeyID, nil
}
symKeyID, err := a.shhClient.GenerateSymmetricKeyFromPassword(ctx, pass)
if err != nil {
return "", err
}
a.mu.Lock()
a.passSymKeyCache[pass] = symKeyID
a.mu.Unlock()
return symKeyID, nil
}

View File

@ -0,0 +1,232 @@
package protocol
import (
"context"
"crypto/ecdsa"
"errors"
"log"
"sort"
"time"
"github.com/ethereum/go-ethereum/p2p"
"github.com/status-im/status-go/node"
"github.com/status-im/status-go/services/shhext"
"github.com/status-im/status-go/t/helpers"
whisper "github.com/status-im/whisper/whisperv6"
)
// WhisperServiceAdapter is an adapter for Whisper service
// the implements Chat interface.
type WhisperServiceAdapter struct {
node *node.StatusNode
shh *whisper.Whisper
}
// WhisperServiceAdapter must implement Chat interface.
var _ Chat = (*WhisperServiceAdapter)(nil)
// NewWhisperServiceAdapter returns a new WhisperServiceAdapter.
func NewWhisperServiceAdapter(node *node.StatusNode, shh *whisper.Whisper) *WhisperServiceAdapter {
return &WhisperServiceAdapter{
node: node,
shh: shh,
}
}
// SubscribePublicChat subscribes to a public chat using the Whisper service.
func (a *WhisperServiceAdapter) SubscribePublicChat(ctx context.Context, name string, in chan<- *ReceivedMessage) (*Subscription, error) {
// TODO: add cache
symKeyID, err := a.shh.AddSymKeyFromPassword(name)
if err != nil {
return nil, err
}
symKey, err := a.shh.GetSymKey(symKeyID)
if err != nil {
return nil, err
}
// TODO: add cache
topic, err := PublicChatTopic(name)
if err != nil {
return nil, err
}
filterID, err := a.shh.Subscribe(&whisper.Filter{
KeySym: symKey,
Topics: [][]byte{topic[:]},
PoW: 0,
AllowP2P: true,
})
if err != nil {
return nil, err
}
subMessages := newWhisperSubscription(a.shh, filterID)
sub := NewSubscription()
go func() {
defer subMessages.Unsubscribe()
t := time.NewTicker(time.Second)
defer t.Stop()
for {
select {
case <-t.C:
messages, err := subMessages.Messages()
if err != nil {
sub.cancel(err)
return
}
sort.Slice(messages, func(i, j int) bool {
return messages[i].Decoded.Clock < messages[j].Decoded.Clock
})
for _, m := range messages {
in <- m
}
case <-sub.Done():
return
}
}
}()
return sub, nil
}
// SendPublicMessage sends a new message using the Whisper service.
func (a *WhisperServiceAdapter) SendPublicMessage(
ctx context.Context, name string, data []byte, identity *ecdsa.PrivateKey,
) (string, error) {
// TODO: add cache
keyID, err := a.shh.AddKeyPair(identity)
if err != nil {
return "", err
}
// TODO: add cache
symKeyID, err := a.shh.AddSymKeyFromPassword(name)
if err != nil {
return "", err
}
// TODO: add cache
topic, err := PublicChatTopic(name)
if err != nil {
return "", err
}
// Only public Whisper API implements logic to send messages.
shhAPI := whisper.NewPublicWhisperAPI(a.shh)
hash, err := shhAPI.Post(ctx, whisper.NewMessage{
SymKeyID: symKeyID,
TTL: 60,
Topic: topic,
Payload: data,
PowTarget: 2.0,
PowTime: 5,
Sig: keyID,
})
return hash.String(), err
}
// RequestPublicMessages requests messages from mail servers.
func (a *WhisperServiceAdapter) RequestPublicMessages(
ctx context.Context, name string, params RequestMessagesParams,
) error {
// TODO: add cache
topic, err := PublicChatTopic(name)
if err != nil {
return err
}
shhextService, err := a.node.ShhExtService()
if err != nil {
return err
}
shhextAPI := shhext.NewPublicAPI(shhextService)
// TODO: remove from here. MailServerEnode must be provided in the params.
config := a.node.Config()
mailServerEnode := randomItem(config.ClusterConfig.TrustedMailServers)
errCh := helpers.WaitForPeerAsync(
a.node.GethNode().Server(),
mailServerEnode,
p2p.PeerEventTypeAdd,
time.Second*5,
)
if err := a.node.AddPeer(mailServerEnode); err != nil {
return err
}
if err := <-errCh; err != nil {
return err
}
mailServerSymKeyID, err := a.shh.AddSymKeyFromPassword(MailServerPassword)
if err != nil {
return err
}
_, err = shhextAPI.RequestMessages(ctx, shhext.MessagesRequest{
MailServerPeer: mailServerEnode,
From: uint32(params.From), // TODO: change to int in status-go
To: uint32(params.To), // TODO: change to int in status-go
Limit: uint32(params.Limit), // TODO: change to int in status-go
Topics: []whisper.TopicType{topic},
SymKeyID: mailServerSymKeyID,
})
// TODO: wait for the request to finish before returning
return err
}
// whisperSubscription encapsulates a Whisper filter.
type whisperSubscription struct {
shh *whisper.Whisper
filterID string
}
// newWhisperSubscription returns a new whisperSubscription.
func newWhisperSubscription(shh *whisper.Whisper, filterID string) *whisperSubscription {
return &whisperSubscription{shh, filterID}
}
// Messages retrieves a list of messages for a given filter.
func (s whisperSubscription) Messages() ([]*ReceivedMessage, error) {
f := s.shh.GetFilter(s.filterID)
if f == nil {
return nil, errors.New("filter does not exist")
}
items := f.Retrieve()
result := make([]*ReceivedMessage, 0, len(items))
for _, item := range items {
log.Printf("retrieve a message: %#v", item)
decoded, err := DecodeMessage(item.Payload)
if err != nil {
log.Printf("failed to decode message: %v", err)
continue
}
result = append(result, &ReceivedMessage{
Decoded: decoded,
SigPubKey: item.SigToPubKey(),
})
}
sort.Slice(result, func(i, j int) bool {
return result[i].Decoded.Clock < result[j].Decoded.Clock
})
return result, nil
}
// Unsubscribe removes the subscription.
func (s whisperSubscription) Unsubscribe() error {
return s.shh.Unsubscribe(s.filterID)
}

View File

@ -0,0 +1,8 @@
package protocol
import "math/rand"
func randomItem(items []string) string {
l := len(items)
return items[rand.Intn(l)]
}