mirror of
https://github.com/status-im/status-console-client.git
synced 2025-02-23 16:18:23 +00:00
simplify protocol message struct (#26)
This commit is contained in:
parent
6758e393f4
commit
f022b7b068
28
chat.go
28
chat.go
@ -16,6 +16,10 @@ import (
|
||||
"github.com/status-im/status-console-client/protocol/v1"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultRequestOptionsFrom = 60 * 60 * 24
|
||||
)
|
||||
|
||||
// ChatViewController manages chat view.
|
||||
type ChatViewController struct {
|
||||
*ViewController
|
||||
@ -134,19 +138,22 @@ func (c *ChatViewController) Select(contact client.Contact) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *ChatViewController) RequestOptions(older bool) protocol.RequestOptions {
|
||||
// RequestOptions returns the RequestOptions for the next request call.
|
||||
// Newest param when true means that we are interested in the most recent messages.
|
||||
func (c *ChatViewController) RequestOptions(newest bool) protocol.RequestOptions {
|
||||
params := protocol.DefaultRequestOptions()
|
||||
|
||||
if older && c.firstRequest != (protocol.RequestOptions{}) {
|
||||
params.From = c.firstRequest.From - 60*60*24
|
||||
params.To = c.firstRequest.From
|
||||
} else if c.lastRequest != (protocol.RequestOptions{}) {
|
||||
params.From = c.lastRequest.To
|
||||
if newest && c.lastRequest != (protocol.RequestOptions{}) {
|
||||
params.From = c.lastRequest.From
|
||||
} else if c.firstRequest != (protocol.RequestOptions{}) {
|
||||
params.From = c.firstRequest.From - defaultRequestOptionsFrom
|
||||
params.To = c.firstRequest.To
|
||||
}
|
||||
|
||||
return params
|
||||
}
|
||||
|
||||
// RequestMessages sends a request fro historical messages.
|
||||
func (c *ChatViewController) RequestMessages(params protocol.RequestOptions) error {
|
||||
chat := c.messenger.Chat(c.contact)
|
||||
if chat == nil {
|
||||
@ -169,6 +176,7 @@ func (c *ChatViewController) updateRequests(params protocol.RequestOptions) {
|
||||
}
|
||||
}
|
||||
|
||||
// Send sends a payload as a message.
|
||||
func (c *ChatViewController) Send(data []byte) error {
|
||||
chat := c.messenger.Chat(c.contact)
|
||||
if chat == nil {
|
||||
@ -202,10 +210,10 @@ func (c *ChatViewController) writeMessage(message *protocol.Message) error {
|
||||
|
||||
line := formatMessageLine(
|
||||
pubKey,
|
||||
message.Hash,
|
||||
message.Decoded.Clock,
|
||||
time.Unix(message.Decoded.Timestamp/1000, 0),
|
||||
message.Decoded.Text,
|
||||
message.ID,
|
||||
int64(message.Clock),
|
||||
message.Timestamp.Time(),
|
||||
message.Text,
|
||||
)
|
||||
|
||||
println := fmt.Fprintln
|
||||
|
@ -73,6 +73,6 @@ func TestSendMessage(t *testing.T) {
|
||||
require.Equal(t, protocol.ContentTypeTextPlain, statusMessage.ContentT)
|
||||
require.Equal(t, protocol.MessageTypePublicGroup, statusMessage.MessageT)
|
||||
require.Equal(t,
|
||||
protocol.StatusMessageContent{ChatID: chatName, Text: string(payload)},
|
||||
protocol.Content{ChatID: chatName, Text: string(payload)},
|
||||
statusMessage.Content)
|
||||
}
|
||||
|
2
main.go
2
main.go
@ -297,7 +297,7 @@ func main() {
|
||||
Key: gocui.KeyHome,
|
||||
Mod: gocui.ModNone,
|
||||
Handler: func(g *gocui.Gui, v *gocui.View) error {
|
||||
params := chat.RequestOptions(true)
|
||||
params := chat.RequestOptions(false)
|
||||
|
||||
if err := notifications.Debug("Messages request", fmt.Sprintf("%v", params)); err != nil {
|
||||
return err
|
||||
|
@ -138,11 +138,9 @@ func (a *WhisperClientAdapter) subscribeMessages(
|
||||
log.Printf("failed to get a signature: %v", err)
|
||||
break
|
||||
}
|
||||
m.SigPubKey = sigPubKey
|
||||
|
||||
in <- &protocol.Message{
|
||||
Decoded: m,
|
||||
SigPubKey: sigPubKey,
|
||||
}
|
||||
in <- &m
|
||||
case err := <-shhSub.Err():
|
||||
sub.Cancel(err)
|
||||
return
|
||||
|
@ -191,7 +191,7 @@ func (a *WhisperServiceAdapter) handleMessages(received []*whisper.ReceivedMessa
|
||||
}
|
||||
|
||||
sort.Slice(messages, func(i, j int) bool {
|
||||
return messages[i].Decoded.Clock < messages[j].Decoded.Clock
|
||||
return messages[i].Clock < messages[j].Clock
|
||||
})
|
||||
|
||||
return messages
|
||||
@ -220,12 +220,10 @@ func (a *WhisperServiceAdapter) decodeMessage(message *whisper.ReceivedMessage)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
decoded.ID = hash
|
||||
decoded.SigPubKey = publicKey
|
||||
|
||||
return &protocol.Message{
|
||||
Decoded: decoded,
|
||||
Hash: hash,
|
||||
SigPubKey: publicKey,
|
||||
}, nil
|
||||
return &decoded, nil
|
||||
}
|
||||
|
||||
// Send sends a new message using the Whisper service.
|
||||
|
@ -41,6 +41,10 @@ type Chat struct {
|
||||
}
|
||||
|
||||
// NewChat returns a new Chat instance.
|
||||
// Instances should not be reused after
|
||||
// leaving but instead a new instance
|
||||
// should be created and replace the
|
||||
// previous one.
|
||||
func NewChat(proto protocol.Protocol, identity *ecdsa.PrivateKey, contact Contact, db *Database) *Chat {
|
||||
c := Chat{
|
||||
proto: proto,
|
||||
@ -75,6 +79,13 @@ func (c *Chat) Events() <-chan interface{} {
|
||||
return c.events
|
||||
}
|
||||
|
||||
// Done informs when the Chat finished processing messages.
|
||||
func (c *Chat) Done() <-chan struct{} {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
return c.cancel
|
||||
}
|
||||
|
||||
// Err returns a cached error.
|
||||
func (c *Chat) Err() error {
|
||||
c.RLock()
|
||||
@ -92,22 +103,24 @@ func (c *Chat) Messages() []*protocol.Message {
|
||||
// HasMessage returns true if a given message is already cached.
|
||||
func (c *Chat) HasMessage(m *protocol.Message) bool {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
return c.hasMessage(m)
|
||||
_, ok := c.hasMessageWithHash(m)
|
||||
c.RUnlock()
|
||||
return ok
|
||||
}
|
||||
|
||||
func (c *Chat) hasMessage(m *protocol.Message) bool {
|
||||
hash := messageHashStr(m)
|
||||
func (c *Chat) hasMessageWithHash(m *protocol.Message) (string, bool) {
|
||||
hash := hex.EncodeToString(m.ID)
|
||||
_, ok := c.messagesByHash[hash]
|
||||
return ok
|
||||
return hash, ok
|
||||
}
|
||||
|
||||
// Subscribe reads messages from the network.
|
||||
//
|
||||
// TODO: consider removing getting data from this method.
|
||||
// Instead, getting data should be a separate call.
|
||||
func (c *Chat) Subscribe(params protocol.RequestOptions) (err error) {
|
||||
func (c *Chat) Subscribe(params protocol.RequestOptions) error {
|
||||
c.RLock()
|
||||
cancel := c.cancel
|
||||
sub := c.sub
|
||||
c.RUnlock()
|
||||
|
||||
@ -131,7 +144,7 @@ func (c *Chat) Subscribe(params protocol.RequestOptions) (err error) {
|
||||
c.sub = sub
|
||||
c.Unlock()
|
||||
|
||||
go c.readLoop(messages, sub, c.cancel)
|
||||
go c.readLoop(messages, sub, cancel)
|
||||
|
||||
return c.load(params)
|
||||
}
|
||||
@ -141,8 +154,8 @@ func (c *Chat) load(options protocol.RequestOptions) error {
|
||||
// Get already cached messages from the database.
|
||||
cachedMessages, err := c.db.Messages(
|
||||
c.contact,
|
||||
options.From,
|
||||
options.To,
|
||||
options.FromAsTime(),
|
||||
options.ToAsTime(),
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "db failed to get messages")
|
||||
@ -193,7 +206,7 @@ func (c *Chat) Send(data []byte) error {
|
||||
default:
|
||||
}
|
||||
|
||||
var message protocol.StatusMessage
|
||||
var message protocol.Message
|
||||
|
||||
switch c.contact.Type {
|
||||
case ContactPublicChat:
|
||||
@ -210,7 +223,7 @@ func (c *Chat) Send(data []byte) error {
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
c.updateLastClock(message.Clock)
|
||||
c.updateLastClock(int64(message.Clock))
|
||||
c.Unlock()
|
||||
|
||||
opts, err := createSendOptions(c.contact)
|
||||
@ -225,11 +238,9 @@ func (c *Chat) Send(data []byte) error {
|
||||
log.Printf("[Chat::Send] sent a private message")
|
||||
|
||||
// TODO: this should be created by c.proto
|
||||
c.ownMessages <- &protocol.Message{
|
||||
Decoded: message,
|
||||
SigPubKey: &c.identity.PublicKey,
|
||||
Hash: hash,
|
||||
}
|
||||
message.SigPubKey = &c.identity.PublicKey
|
||||
message.ID = hash
|
||||
c.ownMessages <- &message
|
||||
}
|
||||
|
||||
return err
|
||||
@ -251,7 +262,6 @@ func (c *Chat) readLoop(messages <-chan *protocol.Message, sub *protocol.Subscri
|
||||
c.Unlock()
|
||||
|
||||
close(cancel)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@ -305,18 +315,16 @@ func (c *Chat) handleMessages(messages ...*protocol.Message) (rearranged bool) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
for _, message := range messages {
|
||||
c.updateLastClock(message.Decoded.Clock)
|
||||
for _, m := range messages {
|
||||
c.updateLastClock(int64(m.Clock))
|
||||
|
||||
hash := messageHashStr(message)
|
||||
|
||||
// TODO: remove from here
|
||||
if _, ok := c.messagesByHash[hash]; ok {
|
||||
hash, exists := c.hasMessageWithHash(m)
|
||||
if exists {
|
||||
continue
|
||||
}
|
||||
|
||||
c.messagesByHash[hash] = message
|
||||
c.messages = append(c.messages, message)
|
||||
c.messagesByHash[hash] = m
|
||||
c.messages = append(c.messages, m)
|
||||
|
||||
sorted := sort.SliceIsSorted(c.messages, c.lessFn)
|
||||
log.Printf("[Chat::handleMessages] sorted = %t", sorted)
|
||||
@ -330,7 +338,7 @@ func (c *Chat) handleMessages(messages ...*protocol.Message) (rearranged bool) {
|
||||
}
|
||||
|
||||
func (c *Chat) lessFn(i, j int) bool {
|
||||
return c.messages[i].Decoded.Clock < c.messages[j].Decoded.Clock
|
||||
return c.messages[i].Clock < c.messages[j].Clock
|
||||
}
|
||||
|
||||
func (c *Chat) onMessagesRearrange() {
|
||||
@ -358,7 +366,3 @@ func (c *Chat) updateLastClock(clock int64) {
|
||||
c.lastClock = clock
|
||||
}
|
||||
}
|
||||
|
||||
func messageHashStr(m *protocol.Message) string {
|
||||
return hex.EncodeToString(m.Hash)
|
||||
}
|
||||
|
@ -15,6 +15,10 @@ const (
|
||||
testPubKey = "0x047d036c25b97a377df74ca4f1780369b1f5475cb58b95d8683cce7f7cfd832271072c18ebf75d09b1c04ae066efcf46b10e14bda83fc220b39ae3dece38f91993"
|
||||
)
|
||||
|
||||
var (
|
||||
timeZero = time.Unix(0, 0)
|
||||
)
|
||||
|
||||
type message struct {
|
||||
chat string
|
||||
dest *ecdsa.PublicKey
|
||||
@ -93,7 +97,7 @@ func TestSendPrivateMessage(t *testing.T) {
|
||||
require.Len(t, chat.Messages(), 1)
|
||||
|
||||
// the message should be also saved in the database
|
||||
result, err := db.Messages(contact, 0, time.Now().Unix())
|
||||
result, err := db.Messages(contact, timeZero, time.Now())
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result, 1)
|
||||
|
||||
@ -116,16 +120,15 @@ func TestHandleMessageFromProtocol(t *testing.T) {
|
||||
err = chat.Subscribe(params)
|
||||
require.NoError(t, err)
|
||||
|
||||
now := time.Now().Unix()
|
||||
now := time.Now()
|
||||
ts := protocol.TimestampInMsFromTime(now)
|
||||
message := &protocol.Message{
|
||||
Decoded: protocol.StatusMessage{
|
||||
Text: "some",
|
||||
ContentT: protocol.ContentTypeTextPlain,
|
||||
MessageT: protocol.MessageTypePublicGroup,
|
||||
Timestamp: now * 1000,
|
||||
Clock: now * 1000,
|
||||
},
|
||||
Hash: []byte{0x01, 0x02, 0x03},
|
||||
ID: []byte{0x01, 0x02, 0x03},
|
||||
Text: "some",
|
||||
ContentT: protocol.ContentTypeTextPlain,
|
||||
MessageT: protocol.MessageTypePublicGroup,
|
||||
Timestamp: ts,
|
||||
Clock: int64(ts),
|
||||
}
|
||||
proto.input <- message
|
||||
|
||||
@ -135,12 +138,12 @@ func TestHandleMessageFromProtocol(t *testing.T) {
|
||||
require.True(t, chat.HasMessage(message))
|
||||
|
||||
// the message should be also saved in the database
|
||||
result, err := db.Messages(contact, 0, now)
|
||||
result, err := db.Messages(contact, timeZero, now)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result, 1)
|
||||
|
||||
// clock should be updated
|
||||
require.Equal(t, now*1000, chat.lastClock)
|
||||
require.Equal(t, int64(ts), chat.lastClock)
|
||||
}
|
||||
|
||||
func waitForEventTypeMessage(t *testing.T, chat *Chat) {
|
||||
|
@ -8,18 +8,16 @@ import (
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
)
|
||||
|
||||
type ContactType int
|
||||
|
||||
// Types of contacts.
|
||||
const (
|
||||
ContactPublicChat ContactType = iota + 1
|
||||
ContactPublicChat int = iota + 1
|
||||
ContactPrivateChat
|
||||
)
|
||||
|
||||
// Contact is a single contact which has a type and name.
|
||||
type Contact struct {
|
||||
Name string
|
||||
Type ContactType
|
||||
Type int
|
||||
PublicKey *ecdsa.PublicKey
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"encoding/gob"
|
||||
"io"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/crypto/secp256k1"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
@ -51,15 +52,16 @@ func NewDatabase(path string) (*Database, error) {
|
||||
return &Database{db: db}, nil
|
||||
}
|
||||
|
||||
// Close closes the database.
|
||||
func (d *Database) Close() error {
|
||||
return d.db.Close()
|
||||
}
|
||||
|
||||
// Messages returns all messages for a given contact
|
||||
// and between from and to timestamps.
|
||||
func (d *Database) Messages(c Contact, from, to int64) (result []*protocol.Message, err error) {
|
||||
func (d *Database) Messages(c Contact, from, to time.Time) (result []*protocol.Message, err error) {
|
||||
start := d.keyFromContact(c, from, nil)
|
||||
limit := d.keyFromContact(c, to+1, nil) // because iter is right-exclusive
|
||||
limit := d.keyFromContact(c, to.Add(time.Second), nil) // add 1s because iter is right-exclusive
|
||||
|
||||
iter := d.db.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
|
||||
defer iter.Release()
|
||||
@ -91,15 +93,12 @@ func (d *Database) SaveMessages(c Contact, messages []*protocol.Message) error {
|
||||
|
||||
batch := new(leveldb.Batch)
|
||||
for _, m := range messages {
|
||||
// TODO(adam): incoming Timestamp is in ms
|
||||
key := d.keyFromContact(c, m.Decoded.Timestamp/1000, m.Hash)
|
||||
|
||||
if err := enc.Encode(m); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
key := d.keyFromContact(c, m.Timestamp.Time(), m.ID)
|
||||
data := buf.Bytes()
|
||||
|
||||
// Data from the buffer needs to be copied to another slice
|
||||
// because a slice returned from Buffer.Bytes() is valid
|
||||
// only until another write.
|
||||
@ -161,11 +160,11 @@ func (d *Database) prefixFromContact(c Contact) []byte {
|
||||
return h.Sum(nil)
|
||||
}
|
||||
|
||||
func (d *Database) keyFromContact(c Contact, t int64, hash []byte) []byte {
|
||||
func (d *Database) keyFromContact(c Contact, t time.Time, hash []byte) []byte {
|
||||
var key [keyFromContactLength]byte
|
||||
|
||||
copy(key[:], d.prefixFromContact(c))
|
||||
binary.BigEndian.PutUint64(key[contactPrefixLength:], uint64(t))
|
||||
binary.BigEndian.PutUint64(key[contactPrefixLength:], uint64(t.Unix()))
|
||||
|
||||
if hash != nil {
|
||||
copy(key[contactPrefixLength+timeLength:], hash)
|
||||
|
@ -75,6 +75,9 @@ LOOP:
|
||||
case ev := <-chat.Events():
|
||||
log.Printf("[Messenger::Join] received an event: %+v", ev)
|
||||
m.events <- ev
|
||||
case <-chat.Done():
|
||||
log.Printf("[Messenger::Join] chat was left")
|
||||
break LOOP
|
||||
case <-cancel:
|
||||
break LOOP
|
||||
}
|
||||
|
@ -74,7 +74,7 @@ func (api *PublicAPI) Messages(ctx context.Context, params MessagesParams) (*rpc
|
||||
for {
|
||||
select {
|
||||
case m := <-messages:
|
||||
if err := notifier.Notify(rpcSub.ID, m.Decoded); err != nil {
|
||||
if err := notifier.Notify(rpcSub.ID, m); err != nil {
|
||||
log.Printf("failed to notify %s about new message", rpcSub.ID)
|
||||
}
|
||||
case <-sub.Done():
|
||||
|
@ -3,6 +3,7 @@ package gethservice
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -48,7 +49,7 @@ func TestPublicAPISend(t *testing.T) {
|
||||
Return(result, nil)
|
||||
|
||||
var hash hexutil.Bytes
|
||||
err = client.Call(&hash, "protos_send", hexutil.Encode(data), params)
|
||||
err = client.Call(&hash, createRPCMethod("send"), hexutil.Encode(data), params)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, result, hash)
|
||||
}
|
||||
@ -87,7 +88,7 @@ func TestPublicAPIRequest(t *testing.T) {
|
||||
Return(nil)
|
||||
|
||||
// nil skips the result... because there is no result
|
||||
err = client.Call(nil, "protos_request", params)
|
||||
err = client.Call(nil, createRPCMethod("request"), params)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
@ -100,7 +101,7 @@ func TestPublicAPIMessages(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
defer func() { go discardStop(aNode) }() // Stop() is slow so do it in a goroutine
|
||||
|
||||
messages := make(chan protocol.StatusMessage)
|
||||
messages := make(chan protocol.Message)
|
||||
params := MessagesParams{
|
||||
ChatParams: ChatParams{
|
||||
PubChatName: "test-chat",
|
||||
@ -120,7 +121,7 @@ func TestPublicAPIMessages(t *testing.T) {
|
||||
Return(protocol.NewSubscription(), nil)
|
||||
|
||||
// The first argument is a name of the method to use for subscription.
|
||||
_, err = client.Subscribe(context.Background(), "protos", messages, "messages", params)
|
||||
_, err = client.Subscribe(context.Background(), ServiceProtosAPIName, messages, "messages", params)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
@ -142,7 +143,7 @@ func createAndStartNode(privateKey *ecdsa.PrivateKey) (*node.StatusNode, *Servic
|
||||
}
|
||||
|
||||
return n, service, n.Start(
|
||||
¶ms.NodeConfig{APIModules: "protos"},
|
||||
¶ms.NodeConfig{APIModules: ServiceProtosAPIName},
|
||||
services...,
|
||||
)
|
||||
}
|
||||
@ -165,6 +166,10 @@ func setupRPCClient(proto protocol.Protocol) (*rpc.Client, *node.StatusNode, err
|
||||
return client, n, err
|
||||
}
|
||||
|
||||
func createRPCMethod(name string) string {
|
||||
return fmt.Sprintf("%s_%s", ServiceProtosAPIName, name)
|
||||
}
|
||||
|
||||
type keysGetter struct {
|
||||
privateKey *ecdsa.PrivateKey
|
||||
}
|
||||
|
@ -12,6 +12,12 @@ import (
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
)
|
||||
|
||||
const (
|
||||
// ServiceProtosAPIName is a name of the API namespace
|
||||
// with the protocol specific methods.
|
||||
ServiceProtosAPIName = "protos"
|
||||
)
|
||||
|
||||
var _ gethnode.Service = (*Service)(nil)
|
||||
|
||||
// KeysGetter is an interface that specifies what kind of keys
|
||||
@ -51,7 +57,7 @@ func (s *Service) Protocols() []p2p.Protocol {
|
||||
func (s *Service) APIs() []rpc.API {
|
||||
return []rpc.API{
|
||||
{
|
||||
Namespace: "protos",
|
||||
Namespace: ServiceProtosAPIName,
|
||||
Version: "1.0",
|
||||
Service: &PublicAPI{service: s},
|
||||
Public: true,
|
||||
|
@ -2,16 +2,16 @@ package protocol
|
||||
|
||||
import "time"
|
||||
|
||||
// CalcMessageClock calculates a new clock value for StatusMessage.
|
||||
// CalcMessageClock calculates a new clock value for Message.
|
||||
// It is used to properly sort messages and accomodate the fact
|
||||
// that time might be different on each device.
|
||||
func CalcMessageClock(lastObservedValue, timeInMs int64) int64 {
|
||||
func CalcMessageClock(lastObservedValue int64, timeInMs TimestampInMs) int64 {
|
||||
clock := lastObservedValue
|
||||
if clock < timeInMs {
|
||||
if clock < int64(timeInMs) {
|
||||
// Added time should be larger than time skew tollerance for a message.
|
||||
// Here, we use 5 minutes which is much larger
|
||||
// than accepted message time skew by Whisper.
|
||||
clock = timeInMs + int64(5*time.Minute/time.Millisecond)
|
||||
clock = int64(timeInMs) + int64(5*time.Minute/time.Millisecond)
|
||||
} else {
|
||||
clock++
|
||||
}
|
||||
|
@ -9,7 +9,7 @@ import (
|
||||
)
|
||||
|
||||
// NewMessageDecoder returns a new Transit decoder
|
||||
// that can deserialize StatusMessage structs.
|
||||
// that can deserialize Message structs.
|
||||
// More about Transit: https://github.com/cognitect/transit-format
|
||||
func NewMessageDecoder(r io.Reader) *transit.Decoder {
|
||||
decoder := transit.NewDecoder(r)
|
||||
@ -29,7 +29,7 @@ func statusMessageHandler(d transit.Decoder, value interface{}) (interface{}, er
|
||||
return nil, errors.New("tagged value does not contain values")
|
||||
}
|
||||
|
||||
sm := StatusMessage{}
|
||||
sm := Message{}
|
||||
for idx, v := range values {
|
||||
var ok bool
|
||||
|
||||
@ -47,7 +47,11 @@ func statusMessageHandler(d transit.Decoder, value interface{}) (interface{}, er
|
||||
case 3:
|
||||
sm.Clock, ok = v.(int64)
|
||||
case 4:
|
||||
sm.Timestamp, ok = v.(int64)
|
||||
var timestamp int64
|
||||
timestamp, ok = v.(int64)
|
||||
if ok {
|
||||
sm.Timestamp = TimestampInMs(timestamp)
|
||||
}
|
||||
case 5:
|
||||
var content map[interface{}]interface{}
|
||||
content, ok = v.(map[interface{}]interface{})
|
||||
|
@ -8,27 +8,27 @@ import (
|
||||
)
|
||||
|
||||
// NewMessageEncoder returns a new Transit encoder
|
||||
// that can encode StatusMessage values.
|
||||
// that can encode Message values.
|
||||
// More about Transit: https://github.com/cognitect/transit-format
|
||||
func NewMessageEncoder(w io.Writer) *transit.Encoder {
|
||||
encoder := transit.NewEncoder(w, false)
|
||||
encoder.AddHandler(statusMessageType, defaultStatusMessageValueEncoder)
|
||||
encoder.AddHandler(messageType, defaultMessageValueEncoder)
|
||||
return encoder
|
||||
}
|
||||
|
||||
var (
|
||||
statusMessageType = reflect.TypeOf(StatusMessage{})
|
||||
defaultStatusMessageValueEncoder = &statusMessageValueEncoder{}
|
||||
messageType = reflect.TypeOf(Message{})
|
||||
defaultMessageValueEncoder = &messageValueEncoder{}
|
||||
)
|
||||
|
||||
type statusMessageValueEncoder struct{}
|
||||
type messageValueEncoder struct{}
|
||||
|
||||
func (statusMessageValueEncoder) IsStringable(reflect.Value) bool {
|
||||
func (messageValueEncoder) IsStringable(reflect.Value) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (statusMessageValueEncoder) Encode(e transit.Encoder, value reflect.Value, asString bool) error {
|
||||
message := value.Interface().(StatusMessage)
|
||||
func (messageValueEncoder) Encode(e transit.Encoder, value reflect.Value, asString bool) error {
|
||||
message := value.Interface().(Message)
|
||||
taggedValue := transit.TaggedValue{
|
||||
Tag: statusMessageTag,
|
||||
Value: []interface{}{
|
||||
|
@ -2,6 +2,7 @@ package protocol
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
@ -24,50 +25,69 @@ var (
|
||||
ErrInvalidDecodedValue = errors.New("invalid decoded value type")
|
||||
)
|
||||
|
||||
// StatusMessageContent contains the chat ID and the actual text of a message.
|
||||
type StatusMessageContent struct {
|
||||
// Content contains the chat ID and the actual text of a message.
|
||||
type Content struct {
|
||||
ChatID string `json:"chat_id"`
|
||||
Text string `json:"text"`
|
||||
}
|
||||
|
||||
// StatusMessage contains all message details.
|
||||
type StatusMessage struct {
|
||||
Text string `json:"text"` // TODO: why is this duplicated?
|
||||
ContentT string `json:"content_type"`
|
||||
MessageT string `json:"message_type"`
|
||||
Clock int64 `json:"clock"` // in milliseconds; see CalcMessageClock for more details
|
||||
Timestamp int64 `json:"timestamp"` // in milliseconds
|
||||
Content StatusMessageContent `json:"content"`
|
||||
// TimestampInMs is a timestamp in milliseconds.
|
||||
type TimestampInMs int64
|
||||
|
||||
// Time returns a time.Time instance.
|
||||
func (t TimestampInMs) Time() time.Time {
|
||||
ts := int64(t)
|
||||
seconds := ts / 1000
|
||||
return time.Unix(seconds, (ts%1000)*int64(time.Millisecond))
|
||||
}
|
||||
|
||||
// CreateTextStatusMessage creates a StatusMessage.
|
||||
func CreateTextStatusMessage(data []byte, lastClock int64, chatID, messageType string) StatusMessage {
|
||||
// TimestampInMsFromTime returns a TimestampInMs from a time.Time instance.
|
||||
func TimestampInMsFromTime(t time.Time) TimestampInMs {
|
||||
return TimestampInMs(t.UnixNano() / int64(time.Millisecond))
|
||||
}
|
||||
|
||||
// Message contains all message details.
|
||||
type Message struct {
|
||||
Text string `json:"text"` // TODO: why is this duplicated?
|
||||
ContentT string `json:"content_type"`
|
||||
MessageT string `json:"message_type"`
|
||||
Clock int64 `json:"clock"` // lamport timestamp; see CalcMessageClock for more details
|
||||
Timestamp TimestampInMs `json:"timestamp"`
|
||||
Content Content `json:"content"`
|
||||
|
||||
// not protocol defined fields
|
||||
ID []byte `json:"id"`
|
||||
SigPubKey *ecdsa.PublicKey `json:"-"`
|
||||
}
|
||||
|
||||
// createTextMessage creates a Message.
|
||||
func createTextMessage(data []byte, lastClock int64, chatID, messageType string) Message {
|
||||
text := strings.TrimSpace(string(data))
|
||||
ts := time.Now().Unix() * 1000
|
||||
ts := TimestampInMsFromTime(time.Now())
|
||||
clock := CalcMessageClock(lastClock, ts)
|
||||
|
||||
return StatusMessage{
|
||||
return Message{
|
||||
Text: text,
|
||||
ContentT: ContentTypeTextPlain,
|
||||
MessageT: messageType,
|
||||
Clock: clock,
|
||||
Timestamp: ts,
|
||||
Content: StatusMessageContent{ChatID: chatID, Text: text},
|
||||
Content: Content{ChatID: chatID, Text: text},
|
||||
}
|
||||
}
|
||||
|
||||
// CreatePublicTextMessage creates a public text StatusMessage.
|
||||
func CreatePublicTextMessage(data []byte, lastClock int64, chatID string) StatusMessage {
|
||||
return CreateTextStatusMessage(data, lastClock, chatID, MessageTypePublicGroup)
|
||||
// CreatePublicTextMessage creates a public text Message.
|
||||
func CreatePublicTextMessage(data []byte, lastClock int64, chatID string) Message {
|
||||
return createTextMessage(data, lastClock, chatID, MessageTypePublicGroup)
|
||||
}
|
||||
|
||||
// CreatePrivateTextMessage creates a public text StatusMessage.
|
||||
func CreatePrivateTextMessage(data []byte, lastClock int64, chatID string) StatusMessage {
|
||||
return CreateTextStatusMessage(data, lastClock, chatID, MessageTypePrivate)
|
||||
// CreatePrivateTextMessage creates a public text Message.
|
||||
func CreatePrivateTextMessage(data []byte, lastClock int64, chatID string) Message {
|
||||
return createTextMessage(data, lastClock, chatID, MessageTypePrivate)
|
||||
}
|
||||
|
||||
// DecodeMessage decodes a raw payload to StatusMessage struct.
|
||||
func DecodeMessage(data []byte) (message StatusMessage, err error) {
|
||||
// DecodeMessage decodes a raw payload to Message struct.
|
||||
func DecodeMessage(data []byte) (message Message, err error) {
|
||||
buf := bytes.NewBuffer(data)
|
||||
decoder := NewMessageDecoder(buf)
|
||||
value, err := decoder.Decode()
|
||||
@ -75,15 +95,15 @@ func DecodeMessage(data []byte) (message StatusMessage, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
message, ok := value.(StatusMessage)
|
||||
message, ok := value.(Message)
|
||||
if !ok {
|
||||
return message, ErrInvalidDecodedValue
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// EncodeMessage encodes a StatusMessage using Transit serialization.
|
||||
func EncodeMessage(value StatusMessage) ([]byte, error) {
|
||||
// EncodeMessage encodes a Message using Transit serialization.
|
||||
func EncodeMessage(value Message) ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
encoder := NewMessageEncoder(&buf)
|
||||
if err := encoder.Encode(value); err != nil {
|
||||
|
@ -2,19 +2,20 @@ package protocol
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var (
|
||||
testMessageBytes = []byte(`["~#c4",["abc123","text/plain","~:public-group-user-message",154593077368201,1545930773682,["^ ","~:chat-id","testing-adamb","~:text","abc123"]]]`)
|
||||
testMessageStruct = StatusMessage{
|
||||
testMessageStruct = Message{
|
||||
Text: "abc123",
|
||||
ContentT: "text/plain",
|
||||
MessageT: "public-group-user-message",
|
||||
Clock: 154593077368201,
|
||||
Timestamp: 1545930773682,
|
||||
Content: StatusMessageContent{"testing-adamb", "abc123"},
|
||||
Content: Content{"testing-adamb", "abc123"},
|
||||
}
|
||||
)
|
||||
|
||||
@ -46,3 +47,10 @@ func TestEncodeMessage(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, testMessageStruct, val)
|
||||
}
|
||||
|
||||
func TestTimestampInMs(t *testing.T) {
|
||||
ts := TimestampInMs(1555274502548) // random timestamp in milliseconds
|
||||
tt := ts.Time()
|
||||
require.Equal(t, tt.UnixNano(), 1555274502548*int64(time.Millisecond))
|
||||
require.Equal(t, ts, TimestampInMsFromTime(tt))
|
||||
}
|
||||
|
@ -21,15 +21,6 @@ type Protocol interface {
|
||||
Request(ctx context.Context, params RequestOptions) error
|
||||
}
|
||||
|
||||
// Message contains a decoded message payload
|
||||
// and some additional fields that we learnt
|
||||
// about the message.
|
||||
type Message struct {
|
||||
Decoded StatusMessage `json:"message"`
|
||||
SigPubKey *ecdsa.PublicKey `json:"-"`
|
||||
Hash []byte `json:"hash"`
|
||||
}
|
||||
|
||||
// ChatOptions are chat specific options, usually related to the recipient/destination.
|
||||
type ChatOptions struct {
|
||||
ChatName string // for public chats
|
||||
@ -41,8 +32,18 @@ type ChatOptions struct {
|
||||
type RequestOptions struct {
|
||||
ChatOptions
|
||||
Limit int
|
||||
From int64
|
||||
To int64
|
||||
From int64 // in seconds
|
||||
To int64 // in seconds
|
||||
}
|
||||
|
||||
// FromAsTime converts int64 (timestamp in seconds) to time.Time.
|
||||
func (o RequestOptions) FromAsTime() time.Time {
|
||||
return time.Unix(o.From, 0)
|
||||
}
|
||||
|
||||
// ToAsTime converts int64 (timestamp in seconds) to time.Time.
|
||||
func (o RequestOptions) ToAsTime() time.Time {
|
||||
return time.Unix(o.To, 0)
|
||||
}
|
||||
|
||||
// Validate verifies that the given request options are valid.
|
||||
|
Loading…
x
Reference in New Issue
Block a user