From 6ef0686b102dabf2b9cd1a0d2e87dd78af540890 Mon Sep 17 00:00:00 2001 From: Adam Babik Date: Mon, 18 Mar 2019 09:53:39 +0100 Subject: [PATCH] extract adapters from protocol package (#13) --- main.go | 5 ++- protocol/{v1 => adapters}/mailserver.go | 2 +- protocol/{v1 => adapters}/whisper_client.go | 35 +++++++++--------- protocol/{v1 => adapters}/whisper_service.go | 37 ++++++++++--------- .../topic.go => adapters/whisper_topic.go} | 2 +- protocol/{v1 => adapters}/whisper_utils.go | 2 +- protocol/v1/subscription.go | 13 +++++-- 7 files changed, 53 insertions(+), 43 deletions(-) rename protocol/{v1 => adapters}/mailserver.go (90%) rename protocol/{v1 => adapters}/whisper_client.go (90%) rename protocol/{v1 => adapters}/whisper_service.go (87%) rename protocol/{v1/topic.go => adapters/whisper_topic.go} (97%) rename protocol/{v1 => adapters}/whisper_utils.go (95%) diff --git a/main.go b/main.go index 926b347..47f41d6 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ import ( "github.com/jroimartin/gocui" "github.com/peterbourgon/ff" "github.com/pkg/errors" + "github.com/status-im/status-console-client/protocol/adapters" "github.com/status-im/status-console-client/protocol/client" "github.com/status-im/status-console-client/protocol/v1" "github.com/status-im/status-go/node" @@ -88,7 +89,7 @@ func main() { exitErr(errors.Wrap(err, "failed to generate node config")) } - chatAdapter = protocol.NewWhisperClientAdapter(rpc, nodeConfig.ClusterConfig.TrustedMailServers) + chatAdapter = adapters.NewWhisperClientAdapter(rpc, nodeConfig.ClusterConfig.TrustedMailServers) } else { // collect mail server request signals signalsForwarder := newSignalForwarder() @@ -115,7 +116,7 @@ func main() { exitErr(errors.Wrap(err, "failed to get Whisper service")) } - chatAdapter = protocol.NewWhisperServiceAdapter(statusNode, shhService) + chatAdapter = adapters.NewWhisperServiceAdapter(statusNode, shhService) } var err error diff --git a/protocol/v1/mailserver.go b/protocol/adapters/mailserver.go similarity index 90% rename from protocol/v1/mailserver.go rename to protocol/adapters/mailserver.go index eecef9c..bc71a5f 100644 --- a/protocol/v1/mailserver.go +++ b/protocol/adapters/mailserver.go @@ -1,4 +1,4 @@ -package protocol +package adapters // MailServerPassword is a password that is required // to request messages from a Status mail server. diff --git a/protocol/v1/whisper_client.go b/protocol/adapters/whisper_client.go similarity index 90% rename from protocol/v1/whisper_client.go rename to protocol/adapters/whisper_client.go index 04adc92..eb0887c 100644 --- a/protocol/v1/whisper_client.go +++ b/protocol/adapters/whisper_client.go @@ -1,4 +1,4 @@ -package protocol +package adapters import ( "context" @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rpc" + "github.com/status-im/status-console-client/protocol/v1" "github.com/status-im/status-go/services/shhext" "github.com/status-im/whisper/shhclient" whisper "github.com/status-im/whisper/whisperv6" @@ -28,7 +29,7 @@ type WhisperClientAdapter struct { } // WhisperClientAdapter must implement Chat interface. -var _ Chat = (*WhisperClientAdapter)(nil) +var _ protocol.Chat = (*WhisperClientAdapter)(nil) // NewWhisperClientAdapter returns a new WhisperClientAdapter. func NewWhisperClientAdapter(c *rpc.Client, mailServers []string) *WhisperClientAdapter { @@ -40,13 +41,13 @@ func NewWhisperClientAdapter(c *rpc.Client, mailServers []string) *WhisperClient } } -// SubscribePublicChat subscribes to a public channel. +// Subscribe subscribes to a public channel. // in channel is used to receive messages. func (a *WhisperClientAdapter) Subscribe( ctx context.Context, - in chan<- *Message, - options SubscribeOptions, -) (*Subscription, error) { + in chan<- *protocol.Message, + options protocol.SubscribeOptions, +) (*protocol.Subscription, error) { criteria := whisper.Criteria{ MinPow: 0, // TODO: set it to proper value AllowP2P: true, // messages from mail server are direct p2p messages @@ -84,15 +85,15 @@ func (a *WhisperClientAdapter) Subscribe( func (a *WhisperClientAdapter) subscribeMessages( ctx context.Context, crit whisper.Criteria, - in chan<- *Message, -) (*Subscription, error) { + in chan<- *protocol.Message, +) (*protocol.Subscription, error) { messages := make(chan *whisper.Message) shhSub, err := a.shhClient.SubscribeMessages(ctx, crit, messages) if err != nil { return nil, err } - sub := NewSubscription() + sub := protocol.NewSubscription() go func() { defer shhSub.Unsubscribe() @@ -100,7 +101,7 @@ func (a *WhisperClientAdapter) subscribeMessages( for { select { case raw := <-messages: - m, err := DecodeMessage(raw.Payload) + m, err := protocol.DecodeMessage(raw.Payload) if err != nil { log.Printf("failed to decode message: %v", err) break @@ -112,12 +113,12 @@ func (a *WhisperClientAdapter) subscribeMessages( break } - in <- &Message{ + in <- &protocol.Message{ Decoded: m, SigPubKey: sigPubKey, } case err := <-shhSub.Err(): - sub.cancel(err) + sub.Cancel(err) return case <-sub.Done(): return @@ -128,13 +129,13 @@ func (a *WhisperClientAdapter) subscribeMessages( return sub, nil } -// SendPublicMessage sends a new message to a public chat. +// Send sends a new message to a public chat. // Identity is required to sign a message as only signed messages // are accepted and displayed. func (a *WhisperClientAdapter) Send( ctx context.Context, data []byte, - options SendOptions, + options protocol.SendOptions, ) ([]byte, error) { identityID, err := a.shhClient.AddPrivateKey(ctx, crypto.FromECDSA(options.Identity)) if err != nil { @@ -179,7 +180,7 @@ func (a *WhisperClientAdapter) Send( } // Request sends a request to MailServer for historic messages. -func (a *WhisperClientAdapter) Request(ctx context.Context, params RequestOptions) error { +func (a *WhisperClientAdapter) Request(ctx context.Context, params protocol.RequestOptions) error { enode, err := a.addMailServer(ctx) if err != nil { return err @@ -217,7 +218,7 @@ func (a *WhisperClientAdapter) addMailServer(ctx context.Context) (string, error return enode, nil } -func (a *WhisperClientAdapter) requestMessages(ctx context.Context, enode string, params RequestOptions) error { +func (a *WhisperClientAdapter) requestMessages(ctx context.Context, enode string, params protocol.RequestOptions) error { log.Printf("requesting messages from node %s", enode) req, err := a.createMessagesRequest(enode, params) @@ -230,7 +231,7 @@ func (a *WhisperClientAdapter) requestMessages(ctx context.Context, enode string func (a *WhisperClientAdapter) createMessagesRequest( enode string, - params RequestOptions, + params protocol.RequestOptions, ) (req shhext.MessagesRequest, err error) { mailSymKeyID, err := a.getOrAddSymKey(context.Background(), MailServerPassword) if err != nil { diff --git a/protocol/v1/whisper_service.go b/protocol/adapters/whisper_service.go similarity index 87% rename from protocol/v1/whisper_service.go rename to protocol/adapters/whisper_service.go index 501e39f..4e58c3c 100644 --- a/protocol/v1/whisper_service.go +++ b/protocol/adapters/whisper_service.go @@ -1,4 +1,4 @@ -package protocol +package adapters import ( "context" @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p" + "github.com/status-im/status-console-client/protocol/v1" "github.com/status-im/status-go/node" "github.com/status-im/status-go/services/shhext" "github.com/status-im/status-go/t/helpers" @@ -27,7 +28,7 @@ type WhisperServiceAdapter struct { } // WhisperServiceAdapter must implement Chat interface. -var _ Chat = (*WhisperServiceAdapter)(nil) +var _ protocol.Chat = (*WhisperServiceAdapter)(nil) // NewWhisperServiceAdapter returns a new WhisperServiceAdapter. func NewWhisperServiceAdapter(node *node.StatusNode, shh *whisper.Whisper) *WhisperServiceAdapter { @@ -37,12 +38,12 @@ func NewWhisperServiceAdapter(node *node.StatusNode, shh *whisper.Whisper) *Whis } } -// SubscribePublicChat subscribes to a public chat using the Whisper service. +// Subscribe subscribes to a public chat using the Whisper service. func (a *WhisperServiceAdapter) Subscribe( ctx context.Context, - in chan<- *Message, - options SubscribeOptions, -) (*Subscription, error) { + in chan<- *protocol.Message, + options protocol.SubscribeOptions, +) (*protocol.Subscription, error) { if err := options.Validate(); err != nil { return nil, err } @@ -58,7 +59,7 @@ func (a *WhisperServiceAdapter) Subscribe( } subWhisper := newWhisperSubscription(a.shh, filterID) - sub := NewSubscription() + sub := protocol.NewSubscription() go func() { defer subWhisper.Unsubscribe() // nolint: errcheck @@ -71,7 +72,7 @@ func (a *WhisperServiceAdapter) Subscribe( case <-t.C: messages, err := subWhisper.Messages() if err != nil { - sub.cancel(err) + sub.Cancel(err) return } @@ -91,7 +92,7 @@ func (a *WhisperServiceAdapter) Subscribe( return sub, nil } -func (a *WhisperServiceAdapter) createFilter(opts SubscribeOptions) (*whisper.Filter, error) { +func (a *WhisperServiceAdapter) createFilter(opts protocol.SubscribeOptions) (*whisper.Filter, error) { filter := whisper.Filter{ PoW: 0, AllowP2P: true, @@ -131,7 +132,7 @@ func (a *WhisperServiceAdapter) createFilter(opts SubscribeOptions) (*whisper.Fi func (a *WhisperServiceAdapter) Send( ctx context.Context, data []byte, - options SendOptions, + options protocol.SendOptions, ) ([]byte, error) { if err := options.Validate(); err != nil { return nil, err @@ -147,7 +148,7 @@ func (a *WhisperServiceAdapter) Send( return shhAPI.Post(ctx, newMessage) } -func (a *WhisperServiceAdapter) createNewMessage(data []byte, options SendOptions) (message whisper.NewMessage, err error) { +func (a *WhisperServiceAdapter) createNewMessage(data []byte, options protocol.SendOptions) (message whisper.NewMessage, err error) { // TODO: add cache keyID, err := a.shh.AddKeyPair(options.Identity) if err != nil { @@ -182,7 +183,7 @@ func (a *WhisperServiceAdapter) createNewMessage(data []byte, options SendOption } // Request requests messages from mail servers. -func (a *WhisperServiceAdapter) Request(ctx context.Context, options RequestOptions) error { +func (a *WhisperServiceAdapter) Request(ctx context.Context, options protocol.RequestOptions) error { if err := options.Validate(); err != nil { return err } @@ -229,7 +230,7 @@ func (a *WhisperServiceAdapter) selectAndAddMailServer() (string, error) { return enode, err } -func (a *WhisperServiceAdapter) requestMessages(ctx context.Context, enode string, params RequestOptions) (resp shhext.MessagesResponse, err error) { +func (a *WhisperServiceAdapter) requestMessages(ctx context.Context, enode string, params protocol.RequestOptions) (resp shhext.MessagesResponse, err error) { shhextService, err := a.node.ShhExtService() if err != nil { return @@ -250,7 +251,7 @@ func (a *WhisperServiceAdapter) requestMessages(ctx context.Context, enode strin func (a *WhisperServiceAdapter) createMessagesRequest( enode string, - params RequestOptions, + params protocol.RequestOptions, ) (req shhext.MessagesRequest, err error) { mailSymKeyID, err := a.shh.AddSymKeyFromPassword(MailServerPassword) if err != nil { @@ -294,23 +295,23 @@ func newWhisperSubscription(shh *whisper.Whisper, filterID string) *whisperSubsc } // Messages retrieves a list of messages for a given filter. -func (s whisperSubscription) Messages() ([]*Message, error) { +func (s whisperSubscription) Messages() ([]*protocol.Message, error) { f := s.shh.GetFilter(s.filterID) if f == nil { return nil, errors.New("filter does not exist") } items := f.Retrieve() - result := make([]*Message, 0, len(items)) + result := make([]*protocol.Message, 0, len(items)) for _, item := range items { - decoded, err := DecodeMessage(item.Payload) + decoded, err := protocol.DecodeMessage(item.Payload) if err != nil { log.Printf("failed to decode message: %v", err) continue } - result = append(result, &Message{ + result = append(result, &protocol.Message{ Decoded: decoded, Hash: item.EnvelopeHash.Bytes(), SigPubKey: item.SigToPubKey(), diff --git a/protocol/v1/topic.go b/protocol/adapters/whisper_topic.go similarity index 97% rename from protocol/v1/topic.go rename to protocol/adapters/whisper_topic.go index 7d86ce6..d04e58c 100644 --- a/protocol/v1/topic.go +++ b/protocol/adapters/whisper_topic.go @@ -1,4 +1,4 @@ -package protocol +package adapters import ( whisper "github.com/status-im/whisper/whisperv6" diff --git a/protocol/v1/whisper_utils.go b/protocol/adapters/whisper_utils.go similarity index 95% rename from protocol/v1/whisper_utils.go rename to protocol/adapters/whisper_utils.go index 592a67e..09e35a3 100644 --- a/protocol/v1/whisper_utils.go +++ b/protocol/adapters/whisper_utils.go @@ -1,4 +1,4 @@ -package protocol +package adapters import ( "math/rand" diff --git a/protocol/v1/subscription.go b/protocol/v1/subscription.go index 891554e..c8620b9 100644 --- a/protocol/v1/subscription.go +++ b/protocol/v1/subscription.go @@ -1,6 +1,8 @@ package protocol -import "sync" +import ( + "sync" +) type Subscription struct { sync.RWMutex @@ -15,12 +17,17 @@ func NewSubscription() *Subscription { } } -func (s *Subscription) cancel(err error) { +func (s *Subscription) Cancel(err error) { s.Lock() + defer s.Unlock() + + if s.done == nil { + return + } + close(s.done) s.done = nil s.err = err - s.Unlock() } func (s *Subscription) Unsubscribe() {