diff --git a/protocol/adapters/peer_utils.go b/protocol/adapters/peer_utils.go new file mode 100644 index 0000000..5668be6 --- /dev/null +++ b/protocol/adapters/peer_utils.go @@ -0,0 +1,61 @@ +package adapters + +import ( + "errors" + "time" + + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" +) + +var ( + // ErrNoRunningNode node is not running. + ErrNoRunningNode = errors.New("there is no running node") + // ErrEmptyPeerURL provided peer URL is empty + ErrEmptyPeerURL = errors.New("empty peer url") +) + +// waitForPeer waits for a peer to be added +func waitForPeer(p *p2p.Server, u string, e p2p.PeerEventType, t time.Duration, subscribed chan struct{}) error { + if p == nil { + return ErrNoRunningNode + } + if u == "" { + return ErrEmptyPeerURL + } + parsedPeer, err := enode.ParseV4(u) + if err != nil { + return err + } + + ch := make(chan *p2p.PeerEvent) + subscription := p.SubscribeEvents(ch) + defer subscription.Unsubscribe() + close(subscribed) + + for { + select { + case ev := <-ch: + if ev.Type == e && ev.Peer == parsedPeer.ID() { + return nil + } + case err := <-subscription.Err(): + if err != nil { + return err + } + case <-time.After(t): + return errors.New("wait for peer: timeout") + } + } +} + +// waitForPeerAsync waits for a peer to be added asynchronously +func waitForPeerAsync(p *p2p.Server, u string, e p2p.PeerEventType, t time.Duration) <-chan error { + subscribed := make(chan struct{}) + errCh := make(chan error) + go func() { + errCh <- waitForPeer(p, u, e, t, subscribed) + }() + <-subscribed + return errCh +} diff --git a/protocol/adapters/whisper_client.go b/protocol/adapters/whisper_client.go index eb0887c..cc0aef1 100644 --- a/protocol/adapters/whisper_client.go +++ b/protocol/adapters/whisper_client.go @@ -8,11 +8,13 @@ import ( "sync" "time" + "github.com/status-im/status-console-client/protocol/v1" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rpc" - "github.com/status-im/status-console-client/protocol/v1" - "github.com/status-im/status-go/services/shhext" + "github.com/status-im/whisper/shhclient" + whisper "github.com/status-im/whisper/whisperv6" ) @@ -20,9 +22,10 @@ import ( // which implements Chat interface. It requires an RPC client // which can use various transports like HTTP, IPC or in-proc. type WhisperClientAdapter struct { - rpcClient *rpc.Client - shhClient *shhclient.Client - mailServerEnodes []string + rpcClient *rpc.Client + shhClient *shhclient.Client + mailServerEnodes []string + selectedMailServerEnode string mu sync.RWMutex passSymKeyCache map[string]string @@ -48,24 +51,16 @@ func (a *WhisperClientAdapter) Subscribe( in chan<- *protocol.Message, options protocol.SubscribeOptions, ) (*protocol.Subscription, error) { + if err := options.Validate(); err != nil { + return nil, err + } + criteria := whisper.Criteria{ MinPow: 0, // TODO: set it to proper value AllowP2P: true, // messages from mail server are direct p2p messages } - if options.IsPublic() { - symKeyID, err := a.getOrAddSymKey(ctx, options.ChatName) - if err != nil { - return nil, err - } - criteria.SymKeyID = symKeyID - - topic, err := PublicChatTopic(options.ChatName) - if err != nil { - return nil, err - } - criteria.Topics = append(criteria.Topics, topic) - } else { + if options.Identity != nil { identityID, err := a.shhClient.AddPrivateKey(ctx, crypto.FromECDSA(options.Identity)) if err != nil { return nil, err @@ -79,6 +74,20 @@ func (a *WhisperClientAdapter) Subscribe( criteria.Topics = append(criteria.Topics, topic) } + if options.ChatName != "" { + symKeyID, err := a.getOrAddSymKey(ctx, options.ChatName) + if err != nil { + return nil, err + } + criteria.SymKeyID = symKeyID + + topic, err := PublicChatTopic(options.ChatName) + if err != nil { + return nil, err + } + criteria.Topics = append(criteria.Topics, topic) + } + return a.subscribeMessages(ctx, criteria, in) } @@ -137,58 +146,72 @@ func (a *WhisperClientAdapter) Send( data []byte, options protocol.SendOptions, ) ([]byte, error) { + if err := options.Validate(); err != nil { + return nil, err + } + identityID, err := a.shhClient.AddPrivateKey(ctx, crypto.FromECDSA(options.Identity)) if err != nil { return nil, err } - newMessage := whisper.NewMessage{ - TTL: 60, - Payload: data, - PowTarget: 2.0, - PowTime: 5, - Sig: identityID, + message, err := a.createNewMessage(data, identityID, options) + if err != nil { + return nil, err } - if options.IsPublic() { - symKeyID, err := a.getOrAddSymKey(ctx, options.ChatName) - if err != nil { - return nil, err - } - newMessage.SymKeyID = symKeyID - - topic, err := PublicChatTopic(options.ChatName) - if err != nil { - return nil, err - } - newMessage.Topic = topic - } else { - newMessage.PublicKey = crypto.FromECDSAPub(options.Recipient) - - topic, err := PrivateChatTopic() - if err != nil { - return nil, err - } - newMessage.Topic = topic - } - - hash, err := a.shhClient.Post(ctx, newMessage) + hash, err := a.shhClient.Post(ctx, message) if err != nil { return nil, err } return hex.DecodeString(hash) } +func (a *WhisperClientAdapter) createNewMessage(data []byte, sigKey string, options protocol.SendOptions) (whisper.NewMessage, error) { + message := createWhisperNewMessage(data, sigKey) + + if options.Recipient != nil { + message.PublicKey = crypto.FromECDSAPub(options.Recipient) + + topic, err := PrivateChatTopic() + if err != nil { + return message, err + } + message.Topic = topic + } + + if options.ChatName != "" { + ctx := context.Background() + symKeyID, err := a.getOrAddSymKey(ctx, options.ChatName) + if err != nil { + return message, err + } + message.SymKeyID = symKeyID + + topic, err := PublicChatTopic(options.ChatName) + if err != nil { + return message, err + } + message.Topic = topic + } + + return message, nil +} + // Request sends a request to MailServer for historic messages. func (a *WhisperClientAdapter) Request(ctx context.Context, params protocol.RequestOptions) error { - enode, err := a.addMailServer(ctx) + enode, err := a.selectAndAddMailServer(ctx) if err != nil { return err } return a.requestMessages(ctx, enode, params) } -func (a *WhisperClientAdapter) addMailServer(ctx context.Context) (string, error) { +func (a *WhisperClientAdapter) selectAndAddMailServer(ctx context.Context) (string, error) { + if a.selectedMailServerEnode != "" { + return a.selectedMailServerEnode, nil + } + enode := randomItem(a.mailServerEnodes) if err := a.rpcClient.CallContext(ctx, nil, "admin_addPeer", enode); err != nil { @@ -198,8 +221,6 @@ func (a *WhisperClientAdapter) addMailServer(ctx context.Context) (string, error // Adding peer is asynchronous operation so we need to retry a few times. retries := 0 for { - <-time.After(time.Second) - err := a.shhClient.MarkTrustedPeer(ctx, enode) if ctx.Err() == context.Canceled { log.Printf("requesting public messages canceled") @@ -210,50 +231,55 @@ func (a *WhisperClientAdapter) addMailServer(ctx context.Context) (string, error } if retries < 3 { retries++ + <-time.After(time.Second) } else { return "", fmt.Errorf("failed to mark peer as trusted: %v", err) } } + a.selectedMailServerEnode = enode + return enode, nil } func (a *WhisperClientAdapter) requestMessages(ctx context.Context, enode string, params protocol.RequestOptions) error { log.Printf("requesting messages from node %s", enode) - req, err := a.createMessagesRequest(enode, params) + arg, err := a.createMessagesRequest(enode, params) if err != nil { return err } - return a.rpcClient.CallContext(ctx, nil, "shhext_requestMessages", req) + return a.rpcClient.CallContext(ctx, nil, "shhext_requestMessages", arg) } func (a *WhisperClientAdapter) createMessagesRequest( enode string, params protocol.RequestOptions, -) (req shhext.MessagesRequest, err error) { +) (req shhextRequestMessagesParam, err error) { mailSymKeyID, err := a.getOrAddSymKey(context.Background(), MailServerPassword) if err != nil { return req, err } - req = shhext.MessagesRequest{ + req = shhextRequestMessagesParam{ MailServerPeer: enode, - From: uint32(params.From), // TODO: change to int in status-go - To: uint32(params.To), // TODO: change to int in status-go - Limit: uint32(params.Limit), // TODO: change to int in status-go + From: params.From, + To: params.To, + Limit: params.Limit, SymKeyID: mailSymKeyID, } - if params.IsPublic() { - topic, err := PublicChatTopic(params.ChatName) + if params.Recipient != nil { + topic, err := PrivateChatTopic() if err != nil { return req, err } req.Topics = append(req.Topics, topic) - } else { - topic, err := PrivateChatTopic() + } + + if params.ChatName != "" { + topic, err := PublicChatTopic(params.ChatName) if err != nil { return req, err } diff --git a/protocol/adapters/whisper_service.go b/protocol/adapters/whisper_service.go index f822bc5..9b0584b 100644 --- a/protocol/adapters/whisper_service.go +++ b/protocol/adapters/whisper_service.go @@ -8,12 +8,13 @@ import ( "sort" "time" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/p2p" - "github.com/status-im/status-console-client/protocol/v1" "github.com/status-im/status-go/node" "github.com/status-im/status-go/services/shhext" - "github.com/status-im/status-go/t/helpers" + + "github.com/status-im/status-console-client/protocol/v1" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p" whisper "github.com/status-im/whisper/whisperv6" ) @@ -98,7 +99,17 @@ func (a *WhisperServiceAdapter) createFilter(opts protocol.SubscribeOptions) (*w AllowP2P: true, } - if opts.IsPublic() { + if opts.Identity != nil { + filter.KeyAsym = opts.Identity + + topic, err := PrivateChatTopic() + if err != nil { + return nil, err + } + filter.Topics = append(filter.Topics, topic[:]) + } + + if opts.ChatName != "" { symKeyID, err := a.shh.AddSymKeyFromPassword(opts.ChatName) if err != nil { return nil, err @@ -107,21 +118,12 @@ func (a *WhisperServiceAdapter) createFilter(opts protocol.SubscribeOptions) (*w if err != nil { return nil, err } + filter.KeySym = symKey + topic, err := PublicChatTopic(opts.ChatName) if err != nil { return nil, err } - - filter.KeySym = symKey - filter.Topics = append(filter.Topics, topic[:]) - } else { - filter.KeyAsym = opts.Identity - - topic, err := PrivateChatTopic() - if err != nil { - return nil, err - } - filter.Topics = append(filter.Topics, topic[:]) } @@ -157,7 +159,17 @@ func (a *WhisperServiceAdapter) createNewMessage(data []byte, options protocol.S message = createWhisperNewMessage(data, keyID) - if options.IsPublic() { + if options.Recipient != nil { + message.PublicKey = crypto.FromECDSAPub(options.Recipient) + + topic, err := PrivateChatTopic() + if err != nil { + return message, err + } + message.Topic = topic + } + + if options.ChatName != "" { symKeyID, err := a.shh.AddSymKeyFromPassword(options.ChatName) if err != nil { return message, err @@ -169,14 +181,6 @@ func (a *WhisperServiceAdapter) createNewMessage(data []byte, options protocol.S return message, err } message.Topic = topic - } else { - message.PublicKey = crypto.FromECDSAPub(options.Recipient) - - topic, err := PrivateChatTopic() - if err != nil { - return message, err - } - message.Topic = topic } return @@ -199,7 +203,11 @@ func (a *WhisperServiceAdapter) Request(ctx context.Context, options protocol.Re return err } + now := time.Now() _, err = a.requestMessages(ctx, req, true) + + log.Printf("[WhisperServiceAdapter::Request] took %s", time.Since(now)) + return err } @@ -210,12 +218,15 @@ func (a *WhisperServiceAdapter) selectAndAddMailServer() (string, error) { config := a.node.Config() enode := randomItem(config.ClusterConfig.TrustedMailServers) - errCh := helpers.WaitForPeerAsync( + errCh := waitForPeerAsync( a.node.GethNode().Server(), enode, p2p.PeerEventTypeAdd, time.Second*5, ) + + log.Printf("[WhisperServiceAdapter::selectAndAddMailServer] randomly selected %s node", enode) + if err := a.node.AddPeer(enode); err != nil { return "", err } @@ -246,6 +257,9 @@ func (a *WhisperServiceAdapter) requestMessages(ctx context.Context, req shhext. if err != nil { return } + + log.Printf("[WhisperServiceAdapter::requestMessages] response = %+v, err = %v", resp, err) + if resp.Error != nil { err = resp.Error return @@ -276,14 +290,16 @@ func (a *WhisperServiceAdapter) createMessagesRequest( SymKeyID: mailSymKeyID, } - if params.IsPublic() { - topic, err := PublicChatTopic(params.ChatName) + if params.Recipient != nil { + topic, err := PrivateChatTopic() if err != nil { return req, err } req.Topics = append(req.Topics, topic) - } else { - topic, err := PrivateChatTopic() + } + + if params.ChatName != "" { + topic, err := PublicChatTopic(params.ChatName) if err != nil { return req, err } diff --git a/protocol/adapters/whisper_utils.go b/protocol/adapters/whisper_utils.go index 09e35a3..c1e1593 100644 --- a/protocol/adapters/whisper_utils.go +++ b/protocol/adapters/whisper_utils.go @@ -6,9 +6,14 @@ import ( whisper "github.com/status-im/whisper/whisperv6" ) -func randomItem(items []string) string { - l := len(items) - return items[rand.Intn(l)] +// shhextRequestMessagesParam is used to remove dependency on shhext module. +type shhextRequestMessagesParam struct { + MailServerPeer string `json:"mailServerPeer"` + From int64 `json:"from"` + To int64 `json:"to"` + Limit int `json:"limit"` + SymKeyID string `json:"symKeyID"` + Topics []whisper.TopicType `json:"topics"` } func createWhisperNewMessage(data []byte, sigKey string) whisper.NewMessage { @@ -20,3 +25,8 @@ func createWhisperNewMessage(data []byte, sigKey string) whisper.NewMessage { Sig: sigKey, } } + +func randomItem(items []string) string { + l := len(items) + return items[rand.Intn(l)] +} diff --git a/protocol/client/chat.go b/protocol/client/chat.go index 70deaea..46fc318 100644 --- a/protocol/client/chat.go +++ b/protocol/client/chat.go @@ -4,11 +4,10 @@ import ( "context" "crypto/ecdsa" "encoding/hex" + "fmt" "log" "sort" - "strings" "sync" - "time" "github.com/pkg/errors" "github.com/status-im/status-console-client/protocol/v1" @@ -36,7 +35,8 @@ type Chat struct { lastClock int64 ownMessages chan *protocol.Message // my private messages channel - // TODO: make it a ring buffer + // TODO: make it a ring buffer. It will require loading newer messages + // from the cache as well. messages []*protocol.Message // all messages ordered by Clock messagesByHash map[string]*protocol.Message // quick access to messages by hash } @@ -116,11 +116,9 @@ func (c *Chat) Subscribe(params protocol.RequestOptions) (err error) { return errors.New("already subscribed") } - opts := protocol.SubscribeOptions{} - if c.contact.Type == ContactPublicChat { - opts.ChatName = c.contact.Name - } else { - opts.Identity = c.identity + opts, err := extendSubscribeOptions(protocol.SubscribeOptions{}, c) + if err != nil { + return errors.Wrap(err, "failed to subscribe") } messages := make(chan *protocol.Message) @@ -140,12 +138,12 @@ func (c *Chat) Subscribe(params protocol.RequestOptions) (err error) { } // Load loads messages from the database cache and the network. -func (c *Chat) load(params protocol.RequestOptions) error { +func (c *Chat) load(options protocol.RequestOptions) error { // Get already cached messages from the database. cachedMessages, err := c.db.Messages( c.contact, - params.From, - params.To, + options.From, + options.To, ) if err != nil { return errors.Wrap(err, "db failed to get messages") @@ -160,21 +158,19 @@ func (c *Chat) load(params protocol.RequestOptions) error { }() // Request historic messages from the network. - if err := c.request(params); err != nil { + if err := c.request(options); err != nil { return errors.Wrap(err, "failed to request for messages") } return nil } -func (c *Chat) request(params protocol.RequestOptions) error { - if c.contact.Type == ContactPublicChat { - params.ChatName = c.contact.Name - } else { - params.Recipient = c.contact.PublicKey +func (c *Chat) request(options protocol.RequestOptions) error { + opts, err := extendRequestOptions(options, c) + if err != nil { + return err } - - return c.proto.Request(context.Background(), params) + return c.proto.Request(context.Background(), opts) } // Request historic messages. @@ -189,7 +185,7 @@ func (c *Chat) Send(data []byte) error { // This is needed to prevent sending messages // if the chat is already left/canceled // as a it can't be guaranteed that processing - // looop goroutines are still running. + // loop goroutines are still running. select { case _, ok := <-c.cancel: if !ok { @@ -198,40 +194,31 @@ func (c *Chat) Send(data []byte) error { default: } - var messageType string + var message protocol.StatusMessage - text := strings.TrimSpace(string(data)) - ts := time.Now().Unix() * 1000 - clock := protocol.CalcMessageClock(c.lastClock, ts) - opts := protocol.SendOptions{ - Identity: c.identity, + switch c.contact.Type { + case ContactPublicChat: + message = protocol.CreatePublicTextMessage(data, c.lastClock, c.contact.Name) + case ContactPrivateChat: + message = protocol.CreatePrivateTextMessage(data, c.lastClock, c.contact.Name) + default: + return fmt.Errorf("failed to send message: unsupported contact type") } - if c.contact.Type == ContactPublicChat { - opts.ChatName = c.contact.Name - messageType = protocol.MessageTypePublicGroupUserMessage - } else { - opts.Recipient = c.contact.PublicKey - messageType = protocol.MessageTypePrivateUserMessage - } - - message := protocol.StatusMessage{ - Text: text, - ContentT: protocol.ContentTypeTextPlain, - MessageT: messageType, - Clock: clock, - Timestamp: ts, - Content: protocol.StatusMessageContent{ChatID: c.contact.Name, Text: text}, - } encodedMessage, err := protocol.EncodeMessage(message) if err != nil { return errors.Wrap(err, "failed to encode message") } c.Lock() - c.updateLastClock(clock) + c.updateLastClock(message.Clock) c.Unlock() + opts, err := extendSendOptions(protocol.SendOptions{Identity: c.identity}, c) + if err != nil { + return errors.Wrap(err, "failed to prepare send options") + } + hash, err := c.proto.Send(context.Background(), encodedMessage, opts) // Own messages need to be pushed manually to the pipeline. diff --git a/protocol/client/chat_options.go b/protocol/client/chat_options.go new file mode 100644 index 0000000..29b8132 --- /dev/null +++ b/protocol/client/chat_options.go @@ -0,0 +1,47 @@ +package client + +import ( + "fmt" + + "github.com/status-im/status-console-client/protocol/v1" +) + +var ( + errUnsupportedContactType = fmt.Errorf("unsupported contact type") +) + +func extendSubscribeOptions(opts protocol.SubscribeOptions, c *Chat) (protocol.SubscribeOptions, error) { + switch c.contact.Type { + case ContactPublicChat: + opts.ChatName = c.contact.Name + case ContactPrivateChat: + opts.Identity = c.identity + default: + return opts, errUnsupportedContactType + } + return opts, nil +} + +func extendRequestOptions(opts protocol.RequestOptions, c *Chat) (protocol.RequestOptions, error) { + switch c.contact.Type { + case ContactPublicChat: + opts.ChatName = c.contact.Name + case ContactPrivateChat: + opts.Recipient = c.contact.PublicKey + default: + return opts, errUnsupportedContactType + } + return opts, nil +} + +func extendSendOptions(opts protocol.SendOptions, c *Chat) (protocol.SendOptions, error) { + switch c.contact.Type { + case ContactPublicChat: + opts.ChatName = c.contact.Name + case ContactPrivateChat: + opts.Recipient = c.contact.PublicKey + default: + return opts, errUnsupportedContactType + } + return opts, nil +} diff --git a/protocol/client/chat_test.go b/protocol/client/chat_test.go index 5c70b95..787d868 100644 --- a/protocol/client/chat_test.go +++ b/protocol/client/chat_test.go @@ -55,7 +55,7 @@ func (m *ChatMock) Request(ctx context.Context, params protocol.RequestOptions) func TestSubscribe(t *testing.T) { proto := ChatMock{} - contact := Contact{} + contact := Contact{Name: "test", Type: ContactPublicChat} db, err := NewDatabase("") require.NoError(t, err) diff --git a/protocol/client/messenger.go b/protocol/client/messenger.go index ea58dda..ccd292d 100644 --- a/protocol/client/messenger.go +++ b/protocol/client/messenger.go @@ -23,7 +23,7 @@ type Messenger struct { events chan interface{} } -// NewMessanger returns a new Messanger. +// NewMessenger returns a new Messanger. func NewMessenger(proto protocol.Chat, identity *ecdsa.PrivateKey, db *Database) *Messenger { return &Messenger{ proto: proto, @@ -50,15 +50,12 @@ func (m *Messenger) Chat(c Contact) *Chat { // Join creates a new chat and creates a subscription. func (m *Messenger) Join(contact Contact, params protocol.RequestOptions) error { - m.RLock() - _, found := m.chats[contact] - m.RUnlock() - - if found { - return nil + chat := m.Chat(contact) + if chat != nil { + return chat.load(params) } - chat := NewChat(m.proto, m.identity, contact, m.db) + chat = NewChat(m.proto, m.identity, contact, m.db) cancel := make(chan struct{}) m.Lock() diff --git a/protocol/v1/chat.go b/protocol/v1/chat.go index 795beae..9e4b8b1 100644 --- a/protocol/v1/chat.go +++ b/protocol/v1/chat.go @@ -55,9 +55,6 @@ func (o RequestOptions) Validate() error { return nil } -// IsPublic returns true if RequestOptions are for a public chat. -func (o RequestOptions) IsPublic() bool { return o.ChatName != "" } - // DefaultRequestOptions returns default options returning messages // from the last 24 hours. func DefaultRequestOptions() RequestOptions { @@ -85,9 +82,6 @@ func (o SubscribeOptions) Validate() error { return nil } -// IsPublic returns true if SubscribeOptions are for a public chat. -func (o SubscribeOptions) IsPublic() bool { return o.ChatName != "" } - // SendOptions are options for Chat.Send. type SendOptions struct { Identity *ecdsa.PrivateKey @@ -109,6 +103,3 @@ func (o SendOptions) Validate() error { } return nil } - -// IsPublic returns true if SendOptions are for a public chat. -func (o SendOptions) IsPublic() bool { return o.ChatName != "" } diff --git a/protocol/v1/message.go b/protocol/v1/message.go index 58a9b57..100613d 100644 --- a/protocol/v1/message.go +++ b/protocol/v1/message.go @@ -3,6 +3,8 @@ package protocol import ( "bytes" "errors" + "strings" + "time" ) const ( @@ -38,6 +40,32 @@ type StatusMessage struct { Content StatusMessageContent } +// CreateTextStatusMessage creates a StatusMessage. +func CreateTextStatusMessage(data []byte, lastClock int64, chatID, messageType string) StatusMessage { + text := strings.TrimSpace(string(data)) + ts := time.Now().Unix() * 1000 + clock := CalcMessageClock(lastClock, ts) + + return StatusMessage{ + Text: text, + ContentT: ContentTypeTextPlain, + MessageT: messageType, + Clock: clock, + Timestamp: ts, + Content: StatusMessageContent{ChatID: chatID, Text: text}, + } +} + +// CreatePublicTextMessage creates a public text StatusMessage. +func CreatePublicTextMessage(data []byte, lastClock int64, chatID string) StatusMessage { + return CreateTextStatusMessage(data, lastClock, chatID, MessageTypePublicGroupUserMessage) +} + +// CreatePrivateTextMessage creates a public text StatusMessage. +func CreatePrivateTextMessage(data []byte, lastClock int64, chatID string) StatusMessage { + return CreateTextStatusMessage(data, lastClock, chatID, MessageTypePrivateUserMessage) +} + // DecodeMessage decodes a raw payload to StatusMessage struct. func DecodeMessage(data []byte) (message StatusMessage, err error) { buf := bytes.NewBuffer(data)