2019-06-05 18:50:59 +02:00

194 lines
4.6 KiB
Go

package client
import (
"context"
"crypto/ecdsa"
"log"
"sync"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/pkg/errors"
"github.com/status-im/status-console-client/protocol/v1"
)
// StreamHandler handles messages generated by Stream.
type StreamHandler func(*protocol.Message) error
// StreamStoreHandlerForContact handles a stream of messages for a known contact.
func StreamStoreHandlerForContact(db Database, c Contact) StreamHandler {
return func(m *protocol.Message) error {
return streamStoreHandlerForContact(db, c, m)
}
}
func streamStoreHandlerForContact(db Database, c Contact, m *protocol.Message) error {
_, err := db.SaveMessages(c, []*protocol.Message{m})
if err == ErrMsgAlreadyExist {
return err
} else if err != nil {
return errors.Wrap(err, "can't add message")
}
return nil
}
// StreamStoreHandlerMultiplexed handles a stream of messages
// where a sender of a message (Contact) is figured out
// from the message itself based on a public key used
// to sign a message.
func StreamStoreHandlerMultiplexed(db Database) StreamHandler {
return func(m *protocol.Message) error {
return streamStoreHandlerMultiplexed(db, m)
}
}
func streamStoreHandlerMultiplexed(db Database, m *protocol.Message) error {
publicKey := m.SigPubKey
if publicKey == nil {
return errors.New("message is unsigned")
}
contact := Contact{
Type: ContactPublicKey,
State: ContactNew,
Name: pubkeyToHex(publicKey), // TODO(dshulyak) replace with 3-word funny name
PublicKey: publicKey,
Topic: DefaultPrivateTopic(),
}
exists, err := db.PublicContactExist(contact)
if err != nil {
return errors.Wrap(err, "error verifying if a contact exist")
}
if !exists {
err := db.SaveContacts([]Contact{contact})
if err != nil {
return errors.Wrap(err, "can't save a new contact")
}
} else {
// TODO: replace with db.ContactByPublicKey()
contacts, err := db.Contacts()
if err != nil {
return errors.Wrap(err, "error getting contacts")
}
for _, c := range contacts {
if c.PublicKey == nil {
continue
}
// TODO: extract
if publicKey.X.Cmp(c.PublicKey.X) == 0 && publicKey.Y.Cmp(c.PublicKey.Y) == 0 {
contact = c
break
}
}
}
// TODO: discard message from blocked contact (State != ContactBlocked)
_, err = db.SaveMessages(contact, []*protocol.Message{m})
if err == ErrMsgAlreadyExist {
return err
} else if err != nil {
return errors.Wrap(err, "can't add a message")
}
return nil
}
// Stream converts messages subscription model to a stream of messages.
type Stream struct {
proto protocol.Protocol
handler StreamHandler
mu sync.Mutex
cancel func() // context cancel
wg sync.WaitGroup // wait for goroutines to finish
}
// NewStream creates a new stream instance with protocol and handler.
func NewStream(proto protocol.Protocol, handler StreamHandler) *Stream {
return &Stream{
proto: proto,
handler: handler,
}
}
// Start subscribes to the protocol and starts converting messages to a stream.
func (s *Stream) Start(aCtx context.Context, options protocol.SubscribeOptions) (err error) {
if s.cancel != nil {
return errors.New("already started")
}
ctx, cancel := context.WithCancel(aCtx)
s.mu.Lock()
s.cancel = cancel
s.mu.Unlock()
defer func() {
if err != nil {
s.mu.Lock()
s.cancel = nil
s.mu.Unlock()
}
}()
// Use buffered channel in a case when handler is slow.
msgs := make(chan *protocol.Message, 100)
sub, err := s.proto.Subscribe(ctx, msgs, options)
if err != nil {
return err
}
s.wg.Add(1)
go func() {
s.processLoop(ctx, msgs)
// clean up after the process loop finished
sub.Unsubscribe()
s.wg.Done()
}()
return nil
}
func (s *Stream) processLoop(ctx context.Context, messages <-chan *protocol.Message) {
for {
select {
case msg := <-messages:
err := s.handler(msg)
if err != nil {
s.processHandlerErr(err, msg)
continue
}
case <-ctx.Done():
return
}
}
}
// TODO(adam): back-propagate this error
func (s *Stream) processHandlerErr(err error, msg *protocol.Message) {
if err == ErrMsgAlreadyExist {
log.Printf("[Stream::Start] message with ID %x already exist", msg.ID)
} else if err != nil {
log.Printf("[Stream::Start] failed to save message with ID %x: %v", msg.ID, err)
}
}
// Stop stops the current subscription to the protocol.
func (s *Stream) Stop() {
if s.cancel == nil {
return
}
s.cancel()
s.mu.Lock()
s.cancel = nil
s.mu.Unlock()
s.wg.Wait()
}
func pubkeyToHex(key *ecdsa.PublicKey) string {
buf := crypto.FromECDSAPub(key)
return hexutil.Encode(buf)
}