feature/listening (#2)

* basic listening loop

* added doc block

* added node interface
This commit is contained in:
Dean Eigenmann 2019-08-05 22:02:38 +02:00 committed by GitHub
parent 93625382b6
commit 2fb2649a28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 26 additions and 2 deletions

View File

@ -8,8 +8,8 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"github.com/vacp2p/dasy/client/internal"
"github.com/vacp2p/dasy/protobuf"
mvds "github.com/vacp2p/mvds/node"
mvdsproto "github.com/vacp2p/mvds/protobuf"
"github.com/vacp2p/mvds/state"
"github.com/vacp2p/mvds/store"
@ -23,7 +23,7 @@ type Peer state.PeerID
// Client is the actual daisy client.
type Client struct {
node mvds.Node
node internal.DataSyncNode
store store.MessageStore // @todo we probably need a different message store, not sure tho
identity *ecdsa.PrivateKey
@ -61,6 +61,16 @@ func (c *Client) Post(chat Chat, body []byte) error {
return c.send(chat, protobuf.Message_POST, body)
}
// Listen listens for newly received messages and handles them appropriately.
func (c *Client) Listen() {
sub := make(chan mvdsproto.Message)
c.node.Subscribe(sub)
for {
go c.onReceive(<- sub)
}
}
func (c *Client) send(chat Chat, t protobuf.Message_MessageType, body []byte) error {
lastMessage := c.lastMessages[chat]
msg := &protobuf.Message{
@ -89,6 +99,8 @@ func (c *Client) send(chat Chat, t protobuf.Message_MessageType, body []byte) er
return nil
}
// onReceive handles lower level message receiving logic, such as requesting all previous message dependencies that we
// may not have, as well as unmarshalling and storing the message.
func (c *Client) onReceive(message mvdsproto.Message) {
var msg protobuf.Message
err := proto.Unmarshal(message.Body, &msg)

12
client/internal/node.go Normal file
View File

@ -0,0 +1,12 @@
package internal
import (
"github.com/vacp2p/mvds/protobuf"
"github.com/vacp2p/mvds/state"
)
type DataSyncNode interface {
AppendMessage(groupID state.GroupID, data []byte) (state.MessageID, error)
Subscribe(sub chan <-protobuf.Message)
RequestMessage(group state.GroupID, id state.MessageID) error
}