align WhisperClient with WhisperService (#17)

This commit is contained in:
Adam Babik 2019-03-28 15:03:37 +01:00 committed by GitHub
parent 3ada0cd593
commit b80668730a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 319 additions and 156 deletions

View File

@ -0,0 +1,61 @@
package adapters
import (
"errors"
"time"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
)
var (
// ErrNoRunningNode node is not running.
ErrNoRunningNode = errors.New("there is no running node")
// ErrEmptyPeerURL provided peer URL is empty
ErrEmptyPeerURL = errors.New("empty peer url")
)
// waitForPeer waits for a peer to be added
func waitForPeer(p *p2p.Server, u string, e p2p.PeerEventType, t time.Duration, subscribed chan struct{}) error {
if p == nil {
return ErrNoRunningNode
}
if u == "" {
return ErrEmptyPeerURL
}
parsedPeer, err := enode.ParseV4(u)
if err != nil {
return err
}
ch := make(chan *p2p.PeerEvent)
subscription := p.SubscribeEvents(ch)
defer subscription.Unsubscribe()
close(subscribed)
for {
select {
case ev := <-ch:
if ev.Type == e && ev.Peer == parsedPeer.ID() {
return nil
}
case err := <-subscription.Err():
if err != nil {
return err
}
case <-time.After(t):
return errors.New("wait for peer: timeout")
}
}
}
// waitForPeerAsync waits for a peer to be added asynchronously
func waitForPeerAsync(p *p2p.Server, u string, e p2p.PeerEventType, t time.Duration) <-chan error {
subscribed := make(chan struct{})
errCh := make(chan error)
go func() {
errCh <- waitForPeer(p, u, e, t, subscribed)
}()
<-subscribed
return errCh
}

View File

@ -8,11 +8,13 @@ import (
"sync"
"time"
"github.com/status-im/status-console-client/protocol/v1"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-console-client/protocol/v1"
"github.com/status-im/status-go/services/shhext"
"github.com/status-im/whisper/shhclient"
whisper "github.com/status-im/whisper/whisperv6"
)
@ -23,6 +25,7 @@ type WhisperClientAdapter struct {
rpcClient *rpc.Client
shhClient *shhclient.Client
mailServerEnodes []string
selectedMailServerEnode string
mu sync.RWMutex
passSymKeyCache map[string]string
@ -48,24 +51,16 @@ func (a *WhisperClientAdapter) Subscribe(
in chan<- *protocol.Message,
options protocol.SubscribeOptions,
) (*protocol.Subscription, error) {
if err := options.Validate(); err != nil {
return nil, err
}
criteria := whisper.Criteria{
MinPow: 0, // TODO: set it to proper value
AllowP2P: true, // messages from mail server are direct p2p messages
}
if options.IsPublic() {
symKeyID, err := a.getOrAddSymKey(ctx, options.ChatName)
if err != nil {
return nil, err
}
criteria.SymKeyID = symKeyID
topic, err := PublicChatTopic(options.ChatName)
if err != nil {
return nil, err
}
criteria.Topics = append(criteria.Topics, topic)
} else {
if options.Identity != nil {
identityID, err := a.shhClient.AddPrivateKey(ctx, crypto.FromECDSA(options.Identity))
if err != nil {
return nil, err
@ -79,6 +74,20 @@ func (a *WhisperClientAdapter) Subscribe(
criteria.Topics = append(criteria.Topics, topic)
}
if options.ChatName != "" {
symKeyID, err := a.getOrAddSymKey(ctx, options.ChatName)
if err != nil {
return nil, err
}
criteria.SymKeyID = symKeyID
topic, err := PublicChatTopic(options.ChatName)
if err != nil {
return nil, err
}
criteria.Topics = append(criteria.Topics, topic)
}
return a.subscribeMessages(ctx, criteria, in)
}
@ -137,58 +146,72 @@ func (a *WhisperClientAdapter) Send(
data []byte,
options protocol.SendOptions,
) ([]byte, error) {
if err := options.Validate(); err != nil {
return nil, err
}
identityID, err := a.shhClient.AddPrivateKey(ctx, crypto.FromECDSA(options.Identity))
if err != nil {
return nil, err
}
newMessage := whisper.NewMessage{
TTL: 60,
Payload: data,
PowTarget: 2.0,
PowTime: 5,
Sig: identityID,
}
if options.IsPublic() {
symKeyID, err := a.getOrAddSymKey(ctx, options.ChatName)
message, err := a.createNewMessage(data, identityID, options)
if err != nil {
return nil, err
}
newMessage.SymKeyID = symKeyID
topic, err := PublicChatTopic(options.ChatName)
if err != nil {
return nil, err
}
newMessage.Topic = topic
} else {
newMessage.PublicKey = crypto.FromECDSAPub(options.Recipient)
topic, err := PrivateChatTopic()
if err != nil {
return nil, err
}
newMessage.Topic = topic
}
hash, err := a.shhClient.Post(ctx, newMessage)
hash, err := a.shhClient.Post(ctx, message)
if err != nil {
return nil, err
}
return hex.DecodeString(hash)
}
func (a *WhisperClientAdapter) createNewMessage(data []byte, sigKey string, options protocol.SendOptions) (whisper.NewMessage, error) {
message := createWhisperNewMessage(data, sigKey)
if options.Recipient != nil {
message.PublicKey = crypto.FromECDSAPub(options.Recipient)
topic, err := PrivateChatTopic()
if err != nil {
return message, err
}
message.Topic = topic
}
if options.ChatName != "" {
ctx := context.Background()
symKeyID, err := a.getOrAddSymKey(ctx, options.ChatName)
if err != nil {
return message, err
}
message.SymKeyID = symKeyID
topic, err := PublicChatTopic(options.ChatName)
if err != nil {
return message, err
}
message.Topic = topic
}
return message, nil
}
// Request sends a request to MailServer for historic messages.
func (a *WhisperClientAdapter) Request(ctx context.Context, params protocol.RequestOptions) error {
enode, err := a.addMailServer(ctx)
enode, err := a.selectAndAddMailServer(ctx)
if err != nil {
return err
}
return a.requestMessages(ctx, enode, params)
}
func (a *WhisperClientAdapter) addMailServer(ctx context.Context) (string, error) {
func (a *WhisperClientAdapter) selectAndAddMailServer(ctx context.Context) (string, error) {
if a.selectedMailServerEnode != "" {
return a.selectedMailServerEnode, nil
}
enode := randomItem(a.mailServerEnodes)
if err := a.rpcClient.CallContext(ctx, nil, "admin_addPeer", enode); err != nil {
@ -198,8 +221,6 @@ func (a *WhisperClientAdapter) addMailServer(ctx context.Context) (string, error
// Adding peer is asynchronous operation so we need to retry a few times.
retries := 0
for {
<-time.After(time.Second)
err := a.shhClient.MarkTrustedPeer(ctx, enode)
if ctx.Err() == context.Canceled {
log.Printf("requesting public messages canceled")
@ -210,50 +231,55 @@ func (a *WhisperClientAdapter) addMailServer(ctx context.Context) (string, error
}
if retries < 3 {
retries++
<-time.After(time.Second)
} else {
return "", fmt.Errorf("failed to mark peer as trusted: %v", err)
}
}
a.selectedMailServerEnode = enode
return enode, nil
}
func (a *WhisperClientAdapter) requestMessages(ctx context.Context, enode string, params protocol.RequestOptions) error {
log.Printf("requesting messages from node %s", enode)
req, err := a.createMessagesRequest(enode, params)
arg, err := a.createMessagesRequest(enode, params)
if err != nil {
return err
}
return a.rpcClient.CallContext(ctx, nil, "shhext_requestMessages", req)
return a.rpcClient.CallContext(ctx, nil, "shhext_requestMessages", arg)
}
func (a *WhisperClientAdapter) createMessagesRequest(
enode string,
params protocol.RequestOptions,
) (req shhext.MessagesRequest, err error) {
) (req shhextRequestMessagesParam, err error) {
mailSymKeyID, err := a.getOrAddSymKey(context.Background(), MailServerPassword)
if err != nil {
return req, err
}
req = shhext.MessagesRequest{
req = shhextRequestMessagesParam{
MailServerPeer: enode,
From: uint32(params.From), // TODO: change to int in status-go
To: uint32(params.To), // TODO: change to int in status-go
Limit: uint32(params.Limit), // TODO: change to int in status-go
From: params.From,
To: params.To,
Limit: params.Limit,
SymKeyID: mailSymKeyID,
}
if params.IsPublic() {
topic, err := PublicChatTopic(params.ChatName)
if params.Recipient != nil {
topic, err := PrivateChatTopic()
if err != nil {
return req, err
}
req.Topics = append(req.Topics, topic)
} else {
topic, err := PrivateChatTopic()
}
if params.ChatName != "" {
topic, err := PublicChatTopic(params.ChatName)
if err != nil {
return req, err
}

View File

@ -8,12 +8,13 @@ import (
"sort"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/status-im/status-console-client/protocol/v1"
"github.com/status-im/status-go/node"
"github.com/status-im/status-go/services/shhext"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-console-client/protocol/v1"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
whisper "github.com/status-im/whisper/whisperv6"
)
@ -98,7 +99,17 @@ func (a *WhisperServiceAdapter) createFilter(opts protocol.SubscribeOptions) (*w
AllowP2P: true,
}
if opts.IsPublic() {
if opts.Identity != nil {
filter.KeyAsym = opts.Identity
topic, err := PrivateChatTopic()
if err != nil {
return nil, err
}
filter.Topics = append(filter.Topics, topic[:])
}
if opts.ChatName != "" {
symKeyID, err := a.shh.AddSymKeyFromPassword(opts.ChatName)
if err != nil {
return nil, err
@ -107,21 +118,12 @@ func (a *WhisperServiceAdapter) createFilter(opts protocol.SubscribeOptions) (*w
if err != nil {
return nil, err
}
filter.KeySym = symKey
topic, err := PublicChatTopic(opts.ChatName)
if err != nil {
return nil, err
}
filter.KeySym = symKey
filter.Topics = append(filter.Topics, topic[:])
} else {
filter.KeyAsym = opts.Identity
topic, err := PrivateChatTopic()
if err != nil {
return nil, err
}
filter.Topics = append(filter.Topics, topic[:])
}
@ -157,7 +159,17 @@ func (a *WhisperServiceAdapter) createNewMessage(data []byte, options protocol.S
message = createWhisperNewMessage(data, keyID)
if options.IsPublic() {
if options.Recipient != nil {
message.PublicKey = crypto.FromECDSAPub(options.Recipient)
topic, err := PrivateChatTopic()
if err != nil {
return message, err
}
message.Topic = topic
}
if options.ChatName != "" {
symKeyID, err := a.shh.AddSymKeyFromPassword(options.ChatName)
if err != nil {
return message, err
@ -169,14 +181,6 @@ func (a *WhisperServiceAdapter) createNewMessage(data []byte, options protocol.S
return message, err
}
message.Topic = topic
} else {
message.PublicKey = crypto.FromECDSAPub(options.Recipient)
topic, err := PrivateChatTopic()
if err != nil {
return message, err
}
message.Topic = topic
}
return
@ -199,7 +203,11 @@ func (a *WhisperServiceAdapter) Request(ctx context.Context, options protocol.Re
return err
}
now := time.Now()
_, err = a.requestMessages(ctx, req, true)
log.Printf("[WhisperServiceAdapter::Request] took %s", time.Since(now))
return err
}
@ -210,12 +218,15 @@ func (a *WhisperServiceAdapter) selectAndAddMailServer() (string, error) {
config := a.node.Config()
enode := randomItem(config.ClusterConfig.TrustedMailServers)
errCh := helpers.WaitForPeerAsync(
errCh := waitForPeerAsync(
a.node.GethNode().Server(),
enode,
p2p.PeerEventTypeAdd,
time.Second*5,
)
log.Printf("[WhisperServiceAdapter::selectAndAddMailServer] randomly selected %s node", enode)
if err := a.node.AddPeer(enode); err != nil {
return "", err
}
@ -246,6 +257,9 @@ func (a *WhisperServiceAdapter) requestMessages(ctx context.Context, req shhext.
if err != nil {
return
}
log.Printf("[WhisperServiceAdapter::requestMessages] response = %+v, err = %v", resp, err)
if resp.Error != nil {
err = resp.Error
return
@ -276,14 +290,16 @@ func (a *WhisperServiceAdapter) createMessagesRequest(
SymKeyID: mailSymKeyID,
}
if params.IsPublic() {
topic, err := PublicChatTopic(params.ChatName)
if params.Recipient != nil {
topic, err := PrivateChatTopic()
if err != nil {
return req, err
}
req.Topics = append(req.Topics, topic)
} else {
topic, err := PrivateChatTopic()
}
if params.ChatName != "" {
topic, err := PublicChatTopic(params.ChatName)
if err != nil {
return req, err
}

View File

@ -6,9 +6,14 @@ import (
whisper "github.com/status-im/whisper/whisperv6"
)
func randomItem(items []string) string {
l := len(items)
return items[rand.Intn(l)]
// shhextRequestMessagesParam is used to remove dependency on shhext module.
type shhextRequestMessagesParam struct {
MailServerPeer string `json:"mailServerPeer"`
From int64 `json:"from"`
To int64 `json:"to"`
Limit int `json:"limit"`
SymKeyID string `json:"symKeyID"`
Topics []whisper.TopicType `json:"topics"`
}
func createWhisperNewMessage(data []byte, sigKey string) whisper.NewMessage {
@ -20,3 +25,8 @@ func createWhisperNewMessage(data []byte, sigKey string) whisper.NewMessage {
Sig: sigKey,
}
}
func randomItem(items []string) string {
l := len(items)
return items[rand.Intn(l)]
}

View File

@ -4,11 +4,10 @@ import (
"context"
"crypto/ecdsa"
"encoding/hex"
"fmt"
"log"
"sort"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"github.com/status-im/status-console-client/protocol/v1"
@ -36,7 +35,8 @@ type Chat struct {
lastClock int64
ownMessages chan *protocol.Message // my private messages channel
// TODO: make it a ring buffer
// TODO: make it a ring buffer. It will require loading newer messages
// from the cache as well.
messages []*protocol.Message // all messages ordered by Clock
messagesByHash map[string]*protocol.Message // quick access to messages by hash
}
@ -116,11 +116,9 @@ func (c *Chat) Subscribe(params protocol.RequestOptions) (err error) {
return errors.New("already subscribed")
}
opts := protocol.SubscribeOptions{}
if c.contact.Type == ContactPublicChat {
opts.ChatName = c.contact.Name
} else {
opts.Identity = c.identity
opts, err := extendSubscribeOptions(protocol.SubscribeOptions{}, c)
if err != nil {
return errors.Wrap(err, "failed to subscribe")
}
messages := make(chan *protocol.Message)
@ -140,12 +138,12 @@ func (c *Chat) Subscribe(params protocol.RequestOptions) (err error) {
}
// Load loads messages from the database cache and the network.
func (c *Chat) load(params protocol.RequestOptions) error {
func (c *Chat) load(options protocol.RequestOptions) error {
// Get already cached messages from the database.
cachedMessages, err := c.db.Messages(
c.contact,
params.From,
params.To,
options.From,
options.To,
)
if err != nil {
return errors.Wrap(err, "db failed to get messages")
@ -160,21 +158,19 @@ func (c *Chat) load(params protocol.RequestOptions) error {
}()
// Request historic messages from the network.
if err := c.request(params); err != nil {
if err := c.request(options); err != nil {
return errors.Wrap(err, "failed to request for messages")
}
return nil
}
func (c *Chat) request(params protocol.RequestOptions) error {
if c.contact.Type == ContactPublicChat {
params.ChatName = c.contact.Name
} else {
params.Recipient = c.contact.PublicKey
func (c *Chat) request(options protocol.RequestOptions) error {
opts, err := extendRequestOptions(options, c)
if err != nil {
return err
}
return c.proto.Request(context.Background(), params)
return c.proto.Request(context.Background(), opts)
}
// Request historic messages.
@ -189,7 +185,7 @@ func (c *Chat) Send(data []byte) error {
// This is needed to prevent sending messages
// if the chat is already left/canceled
// as a it can't be guaranteed that processing
// looop goroutines are still running.
// loop goroutines are still running.
select {
case _, ok := <-c.cancel:
if !ok {
@ -198,40 +194,31 @@ func (c *Chat) Send(data []byte) error {
default:
}
var messageType string
var message protocol.StatusMessage
text := strings.TrimSpace(string(data))
ts := time.Now().Unix() * 1000
clock := protocol.CalcMessageClock(c.lastClock, ts)
opts := protocol.SendOptions{
Identity: c.identity,
switch c.contact.Type {
case ContactPublicChat:
message = protocol.CreatePublicTextMessage(data, c.lastClock, c.contact.Name)
case ContactPrivateChat:
message = protocol.CreatePrivateTextMessage(data, c.lastClock, c.contact.Name)
default:
return fmt.Errorf("failed to send message: unsupported contact type")
}
if c.contact.Type == ContactPublicChat {
opts.ChatName = c.contact.Name
messageType = protocol.MessageTypePublicGroupUserMessage
} else {
opts.Recipient = c.contact.PublicKey
messageType = protocol.MessageTypePrivateUserMessage
}
message := protocol.StatusMessage{
Text: text,
ContentT: protocol.ContentTypeTextPlain,
MessageT: messageType,
Clock: clock,
Timestamp: ts,
Content: protocol.StatusMessageContent{ChatID: c.contact.Name, Text: text},
}
encodedMessage, err := protocol.EncodeMessage(message)
if err != nil {
return errors.Wrap(err, "failed to encode message")
}
c.Lock()
c.updateLastClock(clock)
c.updateLastClock(message.Clock)
c.Unlock()
opts, err := extendSendOptions(protocol.SendOptions{Identity: c.identity}, c)
if err != nil {
return errors.Wrap(err, "failed to prepare send options")
}
hash, err := c.proto.Send(context.Background(), encodedMessage, opts)
// Own messages need to be pushed manually to the pipeline.

View File

@ -0,0 +1,47 @@
package client
import (
"fmt"
"github.com/status-im/status-console-client/protocol/v1"
)
var (
errUnsupportedContactType = fmt.Errorf("unsupported contact type")
)
func extendSubscribeOptions(opts protocol.SubscribeOptions, c *Chat) (protocol.SubscribeOptions, error) {
switch c.contact.Type {
case ContactPublicChat:
opts.ChatName = c.contact.Name
case ContactPrivateChat:
opts.Identity = c.identity
default:
return opts, errUnsupportedContactType
}
return opts, nil
}
func extendRequestOptions(opts protocol.RequestOptions, c *Chat) (protocol.RequestOptions, error) {
switch c.contact.Type {
case ContactPublicChat:
opts.ChatName = c.contact.Name
case ContactPrivateChat:
opts.Recipient = c.contact.PublicKey
default:
return opts, errUnsupportedContactType
}
return opts, nil
}
func extendSendOptions(opts protocol.SendOptions, c *Chat) (protocol.SendOptions, error) {
switch c.contact.Type {
case ContactPublicChat:
opts.ChatName = c.contact.Name
case ContactPrivateChat:
opts.Recipient = c.contact.PublicKey
default:
return opts, errUnsupportedContactType
}
return opts, nil
}

View File

@ -55,7 +55,7 @@ func (m *ChatMock) Request(ctx context.Context, params protocol.RequestOptions)
func TestSubscribe(t *testing.T) {
proto := ChatMock{}
contact := Contact{}
contact := Contact{Name: "test", Type: ContactPublicChat}
db, err := NewDatabase("")
require.NoError(t, err)

View File

@ -23,7 +23,7 @@ type Messenger struct {
events chan interface{}
}
// NewMessanger returns a new Messanger.
// NewMessenger returns a new Messanger.
func NewMessenger(proto protocol.Chat, identity *ecdsa.PrivateKey, db *Database) *Messenger {
return &Messenger{
proto: proto,
@ -50,15 +50,12 @@ func (m *Messenger) Chat(c Contact) *Chat {
// Join creates a new chat and creates a subscription.
func (m *Messenger) Join(contact Contact, params protocol.RequestOptions) error {
m.RLock()
_, found := m.chats[contact]
m.RUnlock()
if found {
return nil
chat := m.Chat(contact)
if chat != nil {
return chat.load(params)
}
chat := NewChat(m.proto, m.identity, contact, m.db)
chat = NewChat(m.proto, m.identity, contact, m.db)
cancel := make(chan struct{})
m.Lock()

View File

@ -55,9 +55,6 @@ func (o RequestOptions) Validate() error {
return nil
}
// IsPublic returns true if RequestOptions are for a public chat.
func (o RequestOptions) IsPublic() bool { return o.ChatName != "" }
// DefaultRequestOptions returns default options returning messages
// from the last 24 hours.
func DefaultRequestOptions() RequestOptions {
@ -85,9 +82,6 @@ func (o SubscribeOptions) Validate() error {
return nil
}
// IsPublic returns true if SubscribeOptions are for a public chat.
func (o SubscribeOptions) IsPublic() bool { return o.ChatName != "" }
// SendOptions are options for Chat.Send.
type SendOptions struct {
Identity *ecdsa.PrivateKey
@ -109,6 +103,3 @@ func (o SendOptions) Validate() error {
}
return nil
}
// IsPublic returns true if SendOptions are for a public chat.
func (o SendOptions) IsPublic() bool { return o.ChatName != "" }

View File

@ -3,6 +3,8 @@ package protocol
import (
"bytes"
"errors"
"strings"
"time"
)
const (
@ -38,6 +40,32 @@ type StatusMessage struct {
Content StatusMessageContent
}
// CreateTextStatusMessage creates a StatusMessage.
func CreateTextStatusMessage(data []byte, lastClock int64, chatID, messageType string) StatusMessage {
text := strings.TrimSpace(string(data))
ts := time.Now().Unix() * 1000
clock := CalcMessageClock(lastClock, ts)
return StatusMessage{
Text: text,
ContentT: ContentTypeTextPlain,
MessageT: messageType,
Clock: clock,
Timestamp: ts,
Content: StatusMessageContent{ChatID: chatID, Text: text},
}
}
// CreatePublicTextMessage creates a public text StatusMessage.
func CreatePublicTextMessage(data []byte, lastClock int64, chatID string) StatusMessage {
return CreateTextStatusMessage(data, lastClock, chatID, MessageTypePublicGroupUserMessage)
}
// CreatePrivateTextMessage creates a public text StatusMessage.
func CreatePrivateTextMessage(data []byte, lastClock int64, chatID string) StatusMessage {
return CreateTextStatusMessage(data, lastClock, chatID, MessageTypePrivateUserMessage)
}
// DecodeMessage decodes a raw payload to StatusMessage struct.
func DecodeMessage(data []byte) (message StatusMessage, err error) {
buf := bytes.NewBuffer(data)