2019-10-14 16:10:48 +02:00

563 lines
13 KiB
Go

// Package node contains node logic.
package node
// @todo this is a very rough implementation that needs cleanup
import (
"context"
"database/sql"
"encoding/hex"
"fmt"
"sync/atomic"
"time"
"go.uber.org/zap"
"github.com/vacp2p/mvds/peers"
"github.com/vacp2p/mvds/protobuf"
"github.com/vacp2p/mvds/state"
"github.com/vacp2p/mvds/store"
"github.com/vacp2p/mvds/transport"
)
// Mode represents the synchronization mode.
//
// The mode decides which kind of sync-state record AppendMessage queues for
// each peer of the target group.
type Mode int
const (
// INTERACTIVE is the zero value of Mode. In this mode AppendMessage queues an
// OFFER record for every peer of the group.
INTERACTIVE Mode = iota
// BATCH makes AppendMessage queue a MESSAGE record for every peer of the
// group instead of an OFFER record.
BATCH
)
// CalculateNextEpoch is a function used to calculate the next `SendEpoch` for a given message.
//
// The node calls it with the record's send count (already incremented for the
// send that is being prepared) and the node's current epoch. The returned
// value is added to the record's existing SendEpoch.
type CalculateNextEpoch func(count uint64, epoch int64) int64
// Node represents an MVDS node, it runs all the logic like sending and receiving protocol messages.
type Node struct {
// epoch is the node's current epoch. It is read and incremented through
// sync/atomic, which is presumably why it has to stay the first field
// (64-bit alignment): https://github.com/golang/go/issues/9959
epoch int64
// ctx is checked by the goroutines launched in Start; cancel is called by Stop.
ctx context.Context
cancel context.CancelFunc
// store holds the messages themselves (Add / Get / Has).
store store.MessageStore
// transport is used to receive packets (Watch) and to send payloads (Send).
transport transport.Transport
// syncState holds one record per (message, peer) pair that still needs work.
syncState state.SyncState
// peers answers group membership questions (GetByGroupID / Exists / Add).
peers peers.Persistence
// payloads accumulates offers, requests, messages and acks per peer until
// sendMessages flushes them through the transport.
payloads payloads
// nextEpoch computes the increment applied to a record's SendEpoch after a send.
nextEpoch CalculateNextEpoch
// ID is this node's peer ID; it is passed as the sender to transport.Send.
ID state.PeerID
// epochPersistence is only set by NewPersistentNode. When non-nil, Start
// stores the epoch after every increment.
epochPersistence *epochSQLitePersistence
// mode selects the record type queued by AppendMessage.
mode Mode
// subscription is the channel handed out by Subscribe; onMessage forwards
// every stored message to it. It is nil while nobody is subscribed.
subscription chan protobuf.Message
logger *zap.Logger
}
// NewPersistentNode returns a node whose message store, peer list, sync state
// and epoch are all backed by db.
//
// The node starts at the epoch returned by the epoch persistence for id. If
// that lookup fails, the error is returned and no node is created. A nil
// logger is replaced by a no-op logger.
func NewPersistentNode(
	db *sql.DB,
	st transport.Transport,
	id state.PeerID,
	mode Mode,
	nextEpoch CalculateNextEpoch,
	logger *zap.Logger,
) (*Node, error) {
	ctx, cancel := context.WithCancel(context.Background())
	if logger == nil {
		logger = zap.NewNop()
	}

	node := Node{
		ID:               id,
		ctx:              ctx,
		cancel:           cancel,
		store:            store.NewPersistentMessageStore(db),
		transport:        st,
		peers:            peers.NewSQLitePersistence(db),
		syncState:        state.NewPersistentSyncState(db),
		payloads:         newPayloads(),
		epochPersistence: newEpochSQLitePersistence(db),
		nextEpoch:        nextEpoch,
		logger:           logger.With(zap.Namespace("mvds")),
		mode:             mode,
	}

	currentEpoch, err := node.epochPersistence.Get(id)
	if err != nil {
		// The node is discarded, so nobody else could ever call cancel:
		// release the context here instead of leaking it.
		cancel()
		return nil, err
	}
	node.epoch = currentEpoch

	return &node, nil
}
// NewEphemeralNode returns a node wired to store.NewDummyStore,
// state.NewSyncState and peers.NewMemoryPersistence, starting at currentEpoch.
//
// No epoch persistence is configured for such a node. A nil logger is
// replaced by a no-op logger.
func NewEphemeralNode(
	id state.PeerID,
	t transport.Transport,
	nextEpoch CalculateNextEpoch,
	currentEpoch int64,
	mode Mode,
	logger *zap.Logger,
) *Node {
	ctx, cancel := context.WithCancel(context.Background())

	log := logger
	if log == nil {
		log = zap.NewNop()
	}

	n := new(Node)
	n.ID = id
	n.ctx = ctx
	n.cancel = cancel
	n.store = store.NewDummyStore()
	n.transport = t
	n.syncState = state.NewSyncState()
	n.peers = peers.NewMemoryPersistence()
	n.payloads = newPayloads()
	n.nextEpoch = nextEpoch
	n.epoch = currentEpoch
	n.logger = log.With(zap.Namespace("mvds"))
	n.mode = mode

	return n
}
// NewNode returns a node assembled from caller-supplied components: message
// store, transport, sync state and peer persistence. The node starts at
// currentEpoch and has no epoch persistence. A nil logger is replaced by a
// no-op logger.
func NewNode(
	ms store.MessageStore,
	st transport.Transport,
	ss state.SyncState,
	nextEpoch CalculateNextEpoch,
	currentEpoch int64,
	id state.PeerID,
	mode Mode,
	pp peers.Persistence,
	logger *zap.Logger,
) *Node {
	ctx, cancel := context.WithCancel(context.Background())

	log := logger
	if log == nil {
		log = zap.NewNop()
	}

	n := new(Node)
	n.ctx = ctx
	n.cancel = cancel
	n.store = ms
	n.transport = st
	n.syncState = ss
	n.peers = pp
	n.payloads = newPayloads()
	n.nextEpoch = nextEpoch
	n.ID = id
	n.epoch = currentEpoch
	n.logger = log.With(zap.Namespace("mvds"))
	n.mode = mode

	return n
}
// CurrentEpoch returns the node's epoch, read atomically.
func (n *Node) CurrentEpoch() int64 {
	current := atomic.LoadInt64(&n.epoch)
	return current
}
// Start listens for new messages received by the node and sends out those required every epoch.
//
// It launches two goroutines and returns immediately:
//
//   - the watch loop takes packets from the transport and hands each one to
//     onPayload in its own goroutine;
//   - the epoch loop waits for duration, sends the pending payloads,
//     increments the epoch and, when an epoch persistence is configured,
//     stores the new epoch.
//
// Both loops exit once the node's context is cancelled.
func (n *Node) Start(duration time.Duration) {
	go func() {
		for {
			select {
			case <-n.ctx.Done():
				n.logger.Info("Watch stopped")
				return
			default:
				// NOTE(review): Watch takes no context, so cancellation is
				// presumably only noticed once it returns a packet.
				p := n.transport.Watch()
				go n.onPayload(p.Sender, p.Payload)
			}
		}
	}()

	go func() {
		for {
			select {
			case <-n.ctx.Done():
				n.logger.Info("Epoch processing stopped")
				return
			default:
			}

			n.logger.Debug("Epoch processing",
				zap.String("node", hex.EncodeToString(n.ID[:4])),
				zap.Int64("epoch", atomic.LoadInt64(&n.epoch)),
			)

			// Wait for the epoch to elapse, but wake up right away on
			// cancellation so that no further send, epoch increment or
			// persistence write happens after the node has been stopped.
			timer := time.NewTimer(duration)
			select {
			case <-n.ctx.Done():
				timer.Stop()
				n.logger.Info("Epoch processing stopped")
				return
			case <-timer.C:
			}

			if err := n.sendMessages(); err != nil {
				n.logger.Error("Error sending messages.", zap.Error(err))
			}

			// Use the value returned by the atomic increment rather than
			// re-reading the field with a plain load.
			epoch := atomic.AddInt64(&n.epoch, 1)

			// When a persistent node is used, the epoch needs to be saved.
			if n.epochPersistence != nil {
				if err := n.epochPersistence.Set(n.ID, epoch); err != nil {
					n.logger.Error("Failed to persisten epoch", zap.Error(err))
				}
			}
		}
	}()
}
// Stop shuts the node down: it logs the request, closes the subscription
// channel (if any) and finally cancels the node's context, which is what the
// goroutines launched by Start watch for.
func (n *Node) Stop() {
	// Deferred so the context is cancelled last, after the subscription
	// channel has been closed.
	defer n.cancel()

	n.logger.Info("Stopping node")
	n.Unsubscribe()
}
// Subscribe creates a fresh unbuffered channel, installs it as the node's
// subscription and returns it. A previously installed channel is replaced
// without being closed.
func (n *Node) Subscribe() chan protobuf.Message {
	ch := make(chan protobuf.Message)
	n.subscription = ch
	return ch
}
// Unsubscribe closes the current subscription channel, if there is one, and
// clears it. Calling it without an active subscription is a no-op.
func (n *Node) Unsubscribe() {
	if n.subscription == nil {
		return
	}

	close(n.subscription)
	n.subscription = nil
}
// AppendMessage stores a new message for groupID and queues it for delivery.
//
// The message is built from data with the current Unix time as timestamp. For
// every peer of the group a sync-state record is inserted: a MESSAGE record in
// BATCH mode, an OFFER record otherwise. The ID of the new message is
// returned. An error is returned when the group's peers cannot be looked up or
// the message cannot be stored.
func (n *Node) AppendMessage(groupID state.GroupID, data []byte) (state.MessageID, error) {
	msg := protobuf.Message{
		GroupId:   groupID[:],
		Timestamp: time.Now().Unix(),
		Body:      data,
	}
	msgID := msg.ID()

	recipients, err := n.peers.GetByGroupID(groupID)
	if err != nil {
		return state.MessageID{}, fmt.Errorf("trying to send to unknown group %x", groupID[:4])
	}

	if err := n.store.Add(&msg); err != nil {
		return state.MessageID{}, err
	}

	// The record type depends only on the node's mode, so pick it once.
	recordType := state.OFFER
	if n.mode == BATCH {
		recordType = state.MESSAGE
	}
	for _, recipient := range recipients {
		n.insertSyncState(&groupID, msgID, recipient, recordType)
	}

	n.logger.Debug("Sending message",
		zap.String("node", hex.EncodeToString(n.ID[:4])),
		zap.String("groupID", hex.EncodeToString(groupID[:4])),
		zap.String("id", hex.EncodeToString(msgID[:4])))

	// @todo think about a way to insta trigger send messages when send was selected, we don't wanna wait for ticks here
	return msgID, nil
}
// RequestMessage adds a REQUEST record to the next payload for a given message ID.
//
// A REQUEST record is queued for every peer of group. It returns an error
// when the group's peers cannot be looked up or a membership check fails.
func (n *Node) RequestMessage(group state.GroupID, id state.MessageID) error {
	groupPeers, err := n.peers.GetByGroupID(group)
	if err != nil {
		return fmt.Errorf("trying to request from an unknown group %x", group[:4])
	}

	for _, p := range groupPeers {
		exist, err := n.IsPeerInGroup(group, p)
		if err != nil {
			return err
		}

		// Only request from peers that are members of the group. The check
		// used to be inverted, which skipped every peer returned by
		// GetByGroupID and turned this method into a no-op.
		if !exist {
			continue
		}

		n.insertSyncState(&group, id, p, state.REQUEST)
	}

	return nil
}
// AddPeer registers peer id as a member of group in the peer persistence and
// returns whatever error that operation reports.
func (n *Node) AddPeer(group state.GroupID, id state.PeerID) error {
	err := n.peers.Add(group, id)
	return err
}
// IsPeerInGroup reports whether peer p is a member of group g according to
// the peer persistence.
func (n *Node) IsPeerInGroup(g state.GroupID, p state.PeerID) (bool, error) {
	member, err := n.peers.Exists(g, p)
	return member, err
}
// sendMessages turns the sync-state records handed out by syncState.Map for
// the current epoch into per-peer payloads and sends them.
//
// OFFER and REQUEST records add the message ID to the peer's payload. MESSAGE
// records add the stored message itself, provided the peer is a member of the
// record's group and the message can be loaded. A record that was added to a
// payload gets its send count and send epoch updated; a MESSAGE record that
// could not be processed is returned unchanged.
//
// It returns the error reported by syncState.Map or by payloads.MapAndClear.
func (n *Node) sendMessages() error {
	err := n.syncState.Map(n.epoch, func(s state.State) state.State {
		m := s.MessageID
		p := s.PeerID

		switch s.Type {
		case state.OFFER:
			n.payloads.AddOffers(p, m[:])
		case state.REQUEST:
			n.payloads.AddRequests(p, m[:])
			n.logger.Debug("sending REQUEST",
				zap.String("from", hex.EncodeToString(n.ID[:4])),
				zap.String("to", hex.EncodeToString(p[:4])),
				zap.String("messageID", hex.EncodeToString(m[:4])),
			)
		case state.MESSAGE:
			g := *s.GroupID

			exist, err := n.IsPeerInGroup(g, p)
			if err != nil {
				// Map's callback cannot return an error, so report it here
				// instead of dropping it; the record is left untouched.
				n.logger.Error("error checking group membership",
					zap.String("groupID", hex.EncodeToString(g[:4])),
					zap.String("peer", hex.EncodeToString(p[:4])),
					zap.Error(err),
				)
				return s
			}
			if !exist {
				return s
			}

			msg, err := n.store.Get(m)
			if err != nil {
				n.logger.Error("Failed to retreive message",
					zap.String("messageID", hex.EncodeToString(m[:4])),
					zap.Error(err),
				)
				return s
			}

			n.payloads.AddMessages(p, msg)
			n.logger.Debug("sending MESSAGE",
				zap.String("groupID", hex.EncodeToString(g[:4])),
				zap.String("from", hex.EncodeToString(n.ID[:4])),
				zap.String("to", hex.EncodeToString(p[:4])),
				zap.String("messageID", hex.EncodeToString(m[:4])),
			)
		}

		return n.updateSendEpoch(s)
	})
	if err != nil {
		n.logger.Error("error while mapping sync state", zap.Error(err))
		return err
	}

	return n.payloads.MapAndClear(func(peer state.PeerID, payload protobuf.Payload) error {
		err := n.transport.Send(n.ID, peer, payload)
		if err != nil {
			n.logger.Error("error sending message", zap.Error(err))
			return err
		}
		return nil
	})
}
// onPayload processes one payload received from sender.
//
// Acks, requests and offers are handled in that order; a failure in one of
// them is logged and does not prevent the following steps. The messages are
// handled last and the IDs returned by onMessages are queued as acks for the
// sender.
func (n *Node) onPayload(sender state.PeerID, payload protobuf.Payload) {
	// Acks, Requests and Offers are all arrays of bytes as protobuf doesn't allow type aliases otherwise arrays of messageIDs would be nicer.
	steps := []struct {
		failure string
		run     func() error
	}{
		{"error processing acks", func() error { return n.onAck(sender, payload.Acks) }},
		{"error processing requests", func() error { return n.onRequest(sender, payload.Requests) }},
		{"error processing offers", func() error { return n.onOffer(sender, payload.Offers) }},
	}
	for _, step := range steps {
		if err := step.run(); err != nil {
			n.logger.Error(step.failure, zap.Error(err))
		}
	}

	acked := n.onMessages(sender, payload.Messages)
	n.payloads.AddAcks(sender, acked)
}
// onOffer handles the offers received from sender: for every offered message
// that is not yet in the store, a REQUEST record addressed to sender is
// queued (without a group ID). Processing stops at the first store error,
// which is returned.
func (n *Node) onOffer(sender state.PeerID, offers [][]byte) error {
	for i := range offers {
		msgID := toMessageID(offers[i])
		n.logger.Debug("OFFER received",
			zap.String("from", hex.EncodeToString(sender[:4])),
			zap.String("to", hex.EncodeToString(n.ID[:4])),
			zap.String("messageID", hex.EncodeToString(msgID[:4])),
		)

		known, err := n.store.Has(msgID)
		// @todo maybe ack?
		if err != nil {
			return err
		}

		if !known {
			n.insertSyncState(nil, msgID, sender, state.REQUEST)
		}
	}

	return nil
}
// onRequest handles the requests received from sender: for every requested
// message that is in the store and whose group contains sender, a MESSAGE
// record addressed to sender is queued. Unknown messages and non-member
// senders are logged and skipped. Processing stops at the first store or
// membership error, which is returned.
func (n *Node) onRequest(sender state.PeerID, requests [][]byte) error {
	for i := range requests {
		msgID := toMessageID(requests[i])
		n.logger.Debug("REQUEST received",
			zap.String("from", hex.EncodeToString(sender[:4])),
			zap.String("to", hex.EncodeToString(n.ID[:4])),
			zap.String("messageID", hex.EncodeToString(msgID[:4])),
		)

		stored, err := n.store.Get(msgID)
		if err != nil {
			return err
		}
		if stored == nil {
			n.logger.Error("message does not exist", zap.String("messageID", hex.EncodeToString(msgID[:4])))
			continue
		}

		gid := toGroupID(stored.GroupId)
		member, err := n.IsPeerInGroup(gid, sender)
		switch {
		case err != nil:
			return err
		case !member:
			n.logger.Error("peer is not in group",
				zap.String("groupID", hex.EncodeToString(gid[:4])),
				zap.String("peer", hex.EncodeToString(sender[:4])),
			)
			continue
		}

		n.insertSyncState(&gid, msgID, sender, state.MESSAGE)
	}

	return nil
}
// onAck handles the acks received from sender by removing the sync-state
// record of each acknowledged message for that peer.
//
// An ack for which no record exists (state.ErrStateNotFound) is tolerated, the
// same way onMessage tolerates it, so that a duplicate or late ack does not
// abort the rest of the batch. Any other removal error is logged and
// returned, which stops processing.
func (n *Node) onAck(sender state.PeerID, acks [][]byte) error {
	for _, ack := range acks {
		id := toMessageID(ack)

		err := n.syncState.Remove(id, sender)
		if err != nil && err != state.ErrStateNotFound {
			n.logger.Error("Error while removing sync state.", zap.Error(err))
			return err
		}

		n.logger.Debug("ACK received",
			zap.String("from", hex.EncodeToString(sender[:4])),
			zap.String("to", hex.EncodeToString(n.ID[:4])),
			zap.String("messageID", hex.EncodeToString(id[:4])),
		)
	}

	return nil
}
// onMessages handles the messages received from sender and returns the IDs of
// those that were processed successfully, i.e. the ones to acknowledge.
//
// A message whose processing fails is logged and left out of the result. Nil
// entries are skipped. The returned slice is never nil.
func (n *Node) onMessages(sender state.PeerID, messages []*protobuf.Message) [][]byte {
	acks := make([][]byte, 0, len(messages))

	for _, m := range messages {
		// Guard against nil entries: dereferencing one below would panic.
		if m == nil {
			continue
		}

		if err := n.onMessage(sender, *m); err != nil {
			n.logger.Error("Error processing message", zap.Error(err))
			continue
		}

		id := m.ID()
		groupID := toGroupID(m.GroupId)
		n.logger.Debug("sending ACK",
			zap.String("groupID", hex.EncodeToString(groupID[:4])),
			zap.String("from", hex.EncodeToString(n.ID[:4])),
			// The key used to be empty; "to" matches the other log entries.
			zap.String("to", hex.EncodeToString(sender[:4])),
			zap.String("messageID", hex.EncodeToString(id[:4])),
		)

		acks = append(acks, id[:])
	}

	return acks
}
// onMessage handles a single message received from sender.
//
// It removes the sync-state record for (message, sender), tolerating
// state.ErrStateNotFound, stores the message, queues an OFFER record for every
// other peer of the message's group and finally forwards the message to the
// subscription channel when there is one.
//
// It returns an error when the record cannot be removed, the message cannot
// be stored or the group's peers cannot be looked up.
func (n *Node) onMessage(sender state.PeerID, msg protobuf.Message) error {
	id := msg.ID()
	groupID := toGroupID(msg.GroupId)

	n.logger.Debug("MESSAGE received",
		zap.String("from", hex.EncodeToString(sender[:4])),
		zap.String("to", hex.EncodeToString(n.ID[:4])),
		zap.String("messageID", hex.EncodeToString(id[:4])),
	)

	err := n.syncState.Remove(id, sender)
	if err != nil && err != state.ErrStateNotFound {
		return err
	}

	// @todo process, should this function ever even have an error?
	if err := n.store.Add(&msg); err != nil {
		return err
	}

	groupPeers, err := n.peers.GetByGroupID(groupID)
	if err != nil {
		return err
	}

	for _, peer := range groupPeers {
		if peer == sender {
			continue
		}
		n.insertSyncState(&groupID, id, peer, state.OFFER)
	}

	// Take a local copy so the nil check and the send use the same channel,
	// and give up when the node is stopped: otherwise this goroutine would
	// block forever if nobody is reading from the subscription any more.
	if sub := n.subscription; sub != nil {
		select {
		case sub <- msg:
		case <-n.ctx.Done():
		}
	}

	return nil
}
// insertSyncState adds a sync-state record of type t for (messageID, peerID),
// scheduled for the epoch following the current one.
//
// groupID may be nil (onOffer passes nil). A failure to add the record is
// logged and not returned.
func (n *Node) insertSyncState(groupID *state.GroupID, messageID state.MessageID, peerID state.PeerID, t state.RecordType) {
	s := state.State{
		GroupID:   groupID,
		MessageID: messageID,
		PeerID:    peerID,
		Type:      t,
		// This method runs on goroutines other than the one incrementing the
		// epoch, so the counter has to be read atomically.
		SendEpoch: atomic.LoadInt64(&n.epoch) + 1,
	}

	err := n.syncState.Add(s)
	if err == nil {
		return
	}

	fields := []zap.Field{zap.Error(err)}
	// Slicing a nil *GroupID would panic, so only log the group when present.
	if groupID != nil {
		fields = append(fields, zap.String("groupID", hex.EncodeToString(groupID[:4])))
	}
	fields = append(fields,
		zap.String("messageID", hex.EncodeToString(messageID[:4])),
		zap.String("peerID", hex.EncodeToString(peerID[:4])),
	)
	n.logger.Error("error setting sync states", fields...)
}
// updateSendEpoch returns a copy of s with its send count incremented by one
// and its send epoch advanced by the value nextEpoch returns for the new send
// count and the node's current epoch.
func (n *Node) updateSendEpoch(s state.State) state.State {
	s.SendCount++
	delta := n.nextEpoch(s.SendCount, n.epoch)
	s.SendEpoch = s.SendEpoch + delta
	return s
}
// toMessageID copies b into a state.MessageID. Shorter input leaves the
// remaining bytes zero; longer input is truncated.
func toMessageID(b []byte) (id state.MessageID) {
	copy(id[:], b)
	return id
}
// toGroupID copies b into a state.GroupID. Shorter input leaves the remaining
// bytes zero; longer input is truncated.
func toGroupID(b []byte) (id state.GroupID) {
	copy(id[:], b)
	return id
}