diff --git a/client/client.go b/client/client.go index 79831ef..18f8e09 100644 --- a/client/client.go +++ b/client/client.go @@ -8,8 +8,8 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/golang/protobuf/proto" "github.com/pkg/errors" + "github.com/vacp2p/dasy/client/internal" "github.com/vacp2p/dasy/protobuf" - mvds "github.com/vacp2p/mvds/node" mvdsproto "github.com/vacp2p/mvds/protobuf" "github.com/vacp2p/mvds/state" "github.com/vacp2p/mvds/store" @@ -23,7 +23,7 @@ type Peer state.PeerID // Client is the actual daisy client. type Client struct { - node mvds.Node + node internal.DataSyncNode store store.MessageStore // @todo we probably need a different message store, not sure tho identity *ecdsa.PrivateKey @@ -61,6 +61,16 @@ func (c *Client) Post(chat Chat, body []byte) error { return c.send(chat, protobuf.Message_POST, body) } +// Listen listens for newly received messages and handles them appropriately. +func (c *Client) Listen() { + sub := make(chan mvdsproto.Message) + c.node.Subscribe(sub) + + for { + go c.onReceive(<- sub) + } +} + func (c *Client) send(chat Chat, t protobuf.Message_MessageType, body []byte) error { lastMessage := c.lastMessages[chat] msg := &protobuf.Message{ @@ -89,6 +99,8 @@ func (c *Client) send(chat Chat, t protobuf.Message_MessageType, body []byte) er return nil } +// onReceive handles lower level message receiving logic, such as requesting all previous message dependencies that we +// may not have, as well as unmarshalling and storing the message. func (c *Client) onReceive(message mvdsproto.Message) { var msg protobuf.Message err := proto.Unmarshal(message.Body, &msg) diff --git a/client/internal/node.go b/client/internal/node.go new file mode 100644 index 0000000..2f75c96 --- /dev/null +++ b/client/internal/node.go @@ -0,0 +1,12 @@ +package internal + +import ( + "github.com/vacp2p/mvds/protobuf" + "github.com/vacp2p/mvds/state" +) + +type DataSyncNode interface { + AppendMessage(groupID state.GroupID, data []byte) (state.MessageID, error) + Subscribe(sub chan <-protobuf.Message) + RequestMessage(group state.GroupID, id state.MessageID) error +}