dasy/client/client.go

146 lines
3.6 KiB
Go

// Package client contains the implementation of a `dasy` client.
package client
// @todo add ordering by default
import (
"crypto/ecdsa"
"log"
"sync"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"github.com/vacp2p/dasy/client/internal"
"github.com/vacp2p/dasy/crypto"
"github.com/vacp2p/dasy/event"
"github.com/vacp2p/dasy/protobuf"
mvdsproto "github.com/vacp2p/mvds/protobuf"
"github.com/vacp2p/mvds/state"
)
// Chat is the ID for a specific chat.
type Chat state.GroupID
// Peer is the ID for a specific peer.
type Peer state.PeerID
// Client is the actual dasy client.
type Client struct {
sync.Mutex
id Peer // @todo think of turning dataSyncNode ID into a func
node internal.DataSyncNode
//store store.MessageStore
identity *ecdsa.PrivateKey
feeds map[protobuf.Message_MessageType]*event.Feed
}
// Invite invites a peer to a chat.
func (c *Client) Invite(chat Chat, peer Peer) {
}
// Join joins a chat.
func (c *Client) Join(chat Chat) (state.MessageID, error) {
return c.send(chat, protobuf.Message_JOIN, c.id[:])
}
// Leave leaves a chat.
func (c *Client) Leave(chat Chat) (state.MessageID, error) {
return c.send(chat, protobuf.Message_LEAVE, c.id[:])
}
// Kick kicks peer from a chat.
func (c *Client) Kick(chat Chat, peer Peer) (state.MessageID, error) {
return c.send(chat, protobuf.Message_KICK, peer[:])
}
// Ack acknowledges `Join`, `Leave` and `Kick` messages.
func (c *Client) Ack(chat Chat, messageID state.MessageID) (state.MessageID, error) {
// @todo We may not need this as we can rely on the acks of data sync
return c.send(chat, protobuf.Message_ACK, messageID[:])
}
// Post sends a message to a chat.
func (c *Client) Post(chat Chat, body []byte) (state.MessageID, error) {
return c.send(chat, protobuf.Message_POST, body)
}
// Feed is a subscription feed for the specified message type.
func (c *Client) Feed(msg protobuf.Message_MessageType) *event.Feed {
c.Lock()
defer c.Unlock()
if c.feeds[msg] == nil {
c.feeds[msg] = new(event.Feed)
}
return c.feeds[msg]
}
// Listen listens for newly received messages and handles them appropriately.
func (c *Client) Listen() {
sub := c.node.Subscribe()
for {
go c.onReceive(<-sub)
}
}
func (c *Client) send(chat Chat, t protobuf.Message_MessageType, body []byte) (state.MessageID, error) {
c.Lock()
defer c.Unlock()
msg := &protobuf.Message{
MessageType: protobuf.Message_MessageType(t),
Body: body,
}
err := crypto.Sign(c.identity, msg)
if err != nil {
return state.MessageID{}, errors.Wrap(err, "failed to sign message")
}
buf, err := proto.Marshal(msg)
if err != nil {
return state.MessageID{}, errors.Wrap(err, "failed to marshall message")
}
id, err := c.node.AppendMessage(state.GroupID(chat), buf)
if err != nil {
return state.MessageID{}, errors.Wrap(err, "failed to append message")
}
return id, nil
}
// onReceive handles lower level message receiving logic
func (c *Client) onReceive(message mvdsproto.Message) {
var msg protobuf.Message
err := proto.Unmarshal(message.Body, &msg)
if err != nil {
log.Printf("error while unmarshalling message: %s", err.Error())
return
}
pubkey, err := ethcrypto.SigToPub(msg.ID(), msg.Signature)
if err != nil {
log.Printf("error while recovering pubkey: %s", err.Error())
// @todo
return
}
payload := event.Payload{
Body: msg.Body, // @todo this might need to be unmarshalled depending on the message type like invite?
Signature: msg.Signature, // @todo recover from signature
Sender: crypto.PublicKeyToPeerID(*pubkey),
Timestamp: message.Timestamp,
}
go c.Feed(msg.MessageType).Send(payload)
}