extract adapters from protocol package (#13)

This commit is contained in:
Adam Babik 2019-03-18 09:53:39 +01:00 committed by GitHub
parent 97964d0e7a
commit 6ef0686b10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 53 additions and 43 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/jroimartin/gocui"
"github.com/peterbourgon/ff"
"github.com/pkg/errors"
"github.com/status-im/status-console-client/protocol/adapters"
"github.com/status-im/status-console-client/protocol/client"
"github.com/status-im/status-console-client/protocol/v1"
"github.com/status-im/status-go/node"
@ -88,7 +89,7 @@ func main() {
exitErr(errors.Wrap(err, "failed to generate node config"))
}
chatAdapter = protocol.NewWhisperClientAdapter(rpc, nodeConfig.ClusterConfig.TrustedMailServers)
chatAdapter = adapters.NewWhisperClientAdapter(rpc, nodeConfig.ClusterConfig.TrustedMailServers)
} else {
// collect mail server request signals
signalsForwarder := newSignalForwarder()
@ -115,7 +116,7 @@ func main() {
exitErr(errors.Wrap(err, "failed to get Whisper service"))
}
chatAdapter = protocol.NewWhisperServiceAdapter(statusNode, shhService)
chatAdapter = adapters.NewWhisperServiceAdapter(statusNode, shhService)
}
var err error

View File

@ -1,4 +1,4 @@
package protocol
package adapters
// MailServerPassword is a password that is required
// to request messages from a Status mail server.

View File

@ -1,4 +1,4 @@
package protocol
package adapters
import (
"context"
@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-console-client/protocol/v1"
"github.com/status-im/status-go/services/shhext"
"github.com/status-im/whisper/shhclient"
whisper "github.com/status-im/whisper/whisperv6"
@ -28,7 +29,7 @@ type WhisperClientAdapter struct {
}
// WhisperClientAdapter must implement Chat interface.
var _ Chat = (*WhisperClientAdapter)(nil)
var _ protocol.Chat = (*WhisperClientAdapter)(nil)
// NewWhisperClientAdapter returns a new WhisperClientAdapter.
func NewWhisperClientAdapter(c *rpc.Client, mailServers []string) *WhisperClientAdapter {
@ -40,13 +41,13 @@ func NewWhisperClientAdapter(c *rpc.Client, mailServers []string) *WhisperClient
}
}
// SubscribePublicChat subscribes to a public channel.
// Subscribe subscribes to a public channel.
// in channel is used to receive messages.
func (a *WhisperClientAdapter) Subscribe(
ctx context.Context,
in chan<- *Message,
options SubscribeOptions,
) (*Subscription, error) {
in chan<- *protocol.Message,
options protocol.SubscribeOptions,
) (*protocol.Subscription, error) {
criteria := whisper.Criteria{
MinPow: 0, // TODO: set it to proper value
AllowP2P: true, // messages from mail server are direct p2p messages
@ -84,15 +85,15 @@ func (a *WhisperClientAdapter) Subscribe(
func (a *WhisperClientAdapter) subscribeMessages(
ctx context.Context,
crit whisper.Criteria,
in chan<- *Message,
) (*Subscription, error) {
in chan<- *protocol.Message,
) (*protocol.Subscription, error) {
messages := make(chan *whisper.Message)
shhSub, err := a.shhClient.SubscribeMessages(ctx, crit, messages)
if err != nil {
return nil, err
}
sub := NewSubscription()
sub := protocol.NewSubscription()
go func() {
defer shhSub.Unsubscribe()
@ -100,7 +101,7 @@ func (a *WhisperClientAdapter) subscribeMessages(
for {
select {
case raw := <-messages:
m, err := DecodeMessage(raw.Payload)
m, err := protocol.DecodeMessage(raw.Payload)
if err != nil {
log.Printf("failed to decode message: %v", err)
break
@ -112,12 +113,12 @@ func (a *WhisperClientAdapter) subscribeMessages(
break
}
in <- &Message{
in <- &protocol.Message{
Decoded: m,
SigPubKey: sigPubKey,
}
case err := <-shhSub.Err():
sub.cancel(err)
sub.Cancel(err)
return
case <-sub.Done():
return
@ -128,13 +129,13 @@ func (a *WhisperClientAdapter) subscribeMessages(
return sub, nil
}
// SendPublicMessage sends a new message to a public chat.
// Send sends a new message to a public chat.
// Identity is required to sign a message as only signed messages
// are accepted and displayed.
func (a *WhisperClientAdapter) Send(
ctx context.Context,
data []byte,
options SendOptions,
options protocol.SendOptions,
) ([]byte, error) {
identityID, err := a.shhClient.AddPrivateKey(ctx, crypto.FromECDSA(options.Identity))
if err != nil {
@ -179,7 +180,7 @@ func (a *WhisperClientAdapter) Send(
}
// Request sends a request to MailServer for historic messages.
func (a *WhisperClientAdapter) Request(ctx context.Context, params RequestOptions) error {
func (a *WhisperClientAdapter) Request(ctx context.Context, params protocol.RequestOptions) error {
enode, err := a.addMailServer(ctx)
if err != nil {
return err
@ -217,7 +218,7 @@ func (a *WhisperClientAdapter) addMailServer(ctx context.Context) (string, error
return enode, nil
}
func (a *WhisperClientAdapter) requestMessages(ctx context.Context, enode string, params RequestOptions) error {
func (a *WhisperClientAdapter) requestMessages(ctx context.Context, enode string, params protocol.RequestOptions) error {
log.Printf("requesting messages from node %s", enode)
req, err := a.createMessagesRequest(enode, params)
@ -230,7 +231,7 @@ func (a *WhisperClientAdapter) requestMessages(ctx context.Context, enode string
func (a *WhisperClientAdapter) createMessagesRequest(
enode string,
params RequestOptions,
params protocol.RequestOptions,
) (req shhext.MessagesRequest, err error) {
mailSymKeyID, err := a.getOrAddSymKey(context.Background(), MailServerPassword)
if err != nil {

View File

@ -1,4 +1,4 @@
package protocol
package adapters
import (
"context"
@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/status-im/status-console-client/protocol/v1"
"github.com/status-im/status-go/node"
"github.com/status-im/status-go/services/shhext"
"github.com/status-im/status-go/t/helpers"
@ -27,7 +28,7 @@ type WhisperServiceAdapter struct {
}
// WhisperServiceAdapter must implement Chat interface.
var _ Chat = (*WhisperServiceAdapter)(nil)
var _ protocol.Chat = (*WhisperServiceAdapter)(nil)
// NewWhisperServiceAdapter returns a new WhisperServiceAdapter.
func NewWhisperServiceAdapter(node *node.StatusNode, shh *whisper.Whisper) *WhisperServiceAdapter {
@ -37,12 +38,12 @@ func NewWhisperServiceAdapter(node *node.StatusNode, shh *whisper.Whisper) *Whis
}
}
// SubscribePublicChat subscribes to a public chat using the Whisper service.
// Subscribe subscribes to a public chat using the Whisper service.
func (a *WhisperServiceAdapter) Subscribe(
ctx context.Context,
in chan<- *Message,
options SubscribeOptions,
) (*Subscription, error) {
in chan<- *protocol.Message,
options protocol.SubscribeOptions,
) (*protocol.Subscription, error) {
if err := options.Validate(); err != nil {
return nil, err
}
@ -58,7 +59,7 @@ func (a *WhisperServiceAdapter) Subscribe(
}
subWhisper := newWhisperSubscription(a.shh, filterID)
sub := NewSubscription()
sub := protocol.NewSubscription()
go func() {
defer subWhisper.Unsubscribe() // nolint: errcheck
@ -71,7 +72,7 @@ func (a *WhisperServiceAdapter) Subscribe(
case <-t.C:
messages, err := subWhisper.Messages()
if err != nil {
sub.cancel(err)
sub.Cancel(err)
return
}
@ -91,7 +92,7 @@ func (a *WhisperServiceAdapter) Subscribe(
return sub, nil
}
func (a *WhisperServiceAdapter) createFilter(opts SubscribeOptions) (*whisper.Filter, error) {
func (a *WhisperServiceAdapter) createFilter(opts protocol.SubscribeOptions) (*whisper.Filter, error) {
filter := whisper.Filter{
PoW: 0,
AllowP2P: true,
@ -131,7 +132,7 @@ func (a *WhisperServiceAdapter) createFilter(opts SubscribeOptions) (*whisper.Fi
func (a *WhisperServiceAdapter) Send(
ctx context.Context,
data []byte,
options SendOptions,
options protocol.SendOptions,
) ([]byte, error) {
if err := options.Validate(); err != nil {
return nil, err
@ -147,7 +148,7 @@ func (a *WhisperServiceAdapter) Send(
return shhAPI.Post(ctx, newMessage)
}
func (a *WhisperServiceAdapter) createNewMessage(data []byte, options SendOptions) (message whisper.NewMessage, err error) {
func (a *WhisperServiceAdapter) createNewMessage(data []byte, options protocol.SendOptions) (message whisper.NewMessage, err error) {
// TODO: add cache
keyID, err := a.shh.AddKeyPair(options.Identity)
if err != nil {
@ -182,7 +183,7 @@ func (a *WhisperServiceAdapter) createNewMessage(data []byte, options SendOption
}
// Request requests messages from mail servers.
func (a *WhisperServiceAdapter) Request(ctx context.Context, options RequestOptions) error {
func (a *WhisperServiceAdapter) Request(ctx context.Context, options protocol.RequestOptions) error {
if err := options.Validate(); err != nil {
return err
}
@ -229,7 +230,7 @@ func (a *WhisperServiceAdapter) selectAndAddMailServer() (string, error) {
return enode, err
}
func (a *WhisperServiceAdapter) requestMessages(ctx context.Context, enode string, params RequestOptions) (resp shhext.MessagesResponse, err error) {
func (a *WhisperServiceAdapter) requestMessages(ctx context.Context, enode string, params protocol.RequestOptions) (resp shhext.MessagesResponse, err error) {
shhextService, err := a.node.ShhExtService()
if err != nil {
return
@ -250,7 +251,7 @@ func (a *WhisperServiceAdapter) requestMessages(ctx context.Context, enode strin
func (a *WhisperServiceAdapter) createMessagesRequest(
enode string,
params RequestOptions,
params protocol.RequestOptions,
) (req shhext.MessagesRequest, err error) {
mailSymKeyID, err := a.shh.AddSymKeyFromPassword(MailServerPassword)
if err != nil {
@ -294,23 +295,23 @@ func newWhisperSubscription(shh *whisper.Whisper, filterID string) *whisperSubsc
}
// Messages retrieves a list of messages for a given filter.
func (s whisperSubscription) Messages() ([]*Message, error) {
func (s whisperSubscription) Messages() ([]*protocol.Message, error) {
f := s.shh.GetFilter(s.filterID)
if f == nil {
return nil, errors.New("filter does not exist")
}
items := f.Retrieve()
result := make([]*Message, 0, len(items))
result := make([]*protocol.Message, 0, len(items))
for _, item := range items {
decoded, err := DecodeMessage(item.Payload)
decoded, err := protocol.DecodeMessage(item.Payload)
if err != nil {
log.Printf("failed to decode message: %v", err)
continue
}
result = append(result, &Message{
result = append(result, &protocol.Message{
Decoded: decoded,
Hash: item.EnvelopeHash.Bytes(),
SigPubKey: item.SigToPubKey(),

View File

@ -1,4 +1,4 @@
package protocol
package adapters
import (
whisper "github.com/status-im/whisper/whisperv6"

View File

@ -1,4 +1,4 @@
package protocol
package adapters
import (
"math/rand"

View File

@ -1,6 +1,8 @@
package protocol
import "sync"
import (
"sync"
)
type Subscription struct {
sync.RWMutex
@ -15,12 +17,17 @@ func NewSubscription() *Subscription {
}
}
func (s *Subscription) cancel(err error) {
func (s *Subscription) Cancel(err error) {
s.Lock()
defer s.Unlock()
if s.done == nil {
return
}
close(s.done)
s.done = nil
s.err = err
s.Unlock()
}
func (s *Subscription) Unsubscribe() {