563 lines
13 KiB
Go
563 lines
13 KiB
Go
// Package node contains node logic.
|
|
package node
|
|
|
|
// @todo this is a very rough implementation that needs cleanup
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/vacp2p/mvds/peers"
|
|
"github.com/vacp2p/mvds/protobuf"
|
|
"github.com/vacp2p/mvds/state"
|
|
"github.com/vacp2p/mvds/store"
|
|
"github.com/vacp2p/mvds/transport"
|
|
)
|
|
|
|
// Mode represents the synchronization mode.
|
|
type Mode int
|
|
|
|
const (
|
|
INTERACTIVE Mode = iota
|
|
BATCH
|
|
)
|
|
|
|
// CalculateNextEpoch is a function used to calculate the next `SendEpoch` for a given message.
|
|
type CalculateNextEpoch func(count uint64, epoch int64) int64
|
|
|
|
// Node represents an MVDS node, it runs all the logic like sending and receiving protocol messages.
|
|
type Node struct {
|
|
// This needs to be declared first: https://github.com/golang/go/issues/9959
|
|
epoch int64
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
|
|
store store.MessageStore
|
|
transport transport.Transport
|
|
|
|
syncState state.SyncState
|
|
|
|
peers peers.Persistence
|
|
|
|
payloads payloads
|
|
|
|
nextEpoch CalculateNextEpoch
|
|
|
|
ID state.PeerID
|
|
|
|
epochPersistence *epochSQLitePersistence
|
|
mode Mode
|
|
|
|
subscription chan protobuf.Message
|
|
|
|
logger *zap.Logger
|
|
}
|
|
|
|
func NewPersistentNode(
|
|
db *sql.DB,
|
|
st transport.Transport,
|
|
id state.PeerID,
|
|
mode Mode,
|
|
nextEpoch CalculateNextEpoch,
|
|
logger *zap.Logger,
|
|
) (*Node, error) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
if logger == nil {
|
|
logger = zap.NewNop()
|
|
}
|
|
|
|
node := Node{
|
|
ID: id,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
store: store.NewPersistentMessageStore(db),
|
|
transport: st,
|
|
peers: peers.NewSQLitePersistence(db),
|
|
syncState: state.NewPersistentSyncState(db),
|
|
payloads: newPayloads(),
|
|
epochPersistence: newEpochSQLitePersistence(db),
|
|
nextEpoch: nextEpoch,
|
|
logger: logger.With(zap.Namespace("mvds")),
|
|
mode: mode,
|
|
}
|
|
if currentEpoch, err := node.epochPersistence.Get(id); err != nil {
|
|
return nil, err
|
|
} else {
|
|
node.epoch = currentEpoch
|
|
}
|
|
return &node, nil
|
|
}
|
|
|
|
func NewEphemeralNode(
|
|
id state.PeerID,
|
|
t transport.Transport,
|
|
nextEpoch CalculateNextEpoch,
|
|
currentEpoch int64,
|
|
mode Mode,
|
|
logger *zap.Logger,
|
|
) *Node {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
if logger == nil {
|
|
logger = zap.NewNop()
|
|
}
|
|
|
|
return &Node{
|
|
ID: id,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
store: store.NewDummyStore(),
|
|
transport: t,
|
|
syncState: state.NewSyncState(),
|
|
peers: peers.NewMemoryPersistence(),
|
|
payloads: newPayloads(),
|
|
nextEpoch: nextEpoch,
|
|
epoch: currentEpoch,
|
|
logger: logger.With(zap.Namespace("mvds")),
|
|
mode: mode,
|
|
}
|
|
}
|
|
|
|
// NewNode returns a new node.
|
|
func NewNode(
|
|
ms store.MessageStore,
|
|
st transport.Transport,
|
|
ss state.SyncState,
|
|
nextEpoch CalculateNextEpoch,
|
|
currentEpoch int64,
|
|
id state.PeerID,
|
|
mode Mode,
|
|
pp peers.Persistence,
|
|
logger *zap.Logger,
|
|
) *Node {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
if logger == nil {
|
|
logger = zap.NewNop()
|
|
}
|
|
|
|
return &Node{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
store: ms,
|
|
transport: st,
|
|
syncState: ss,
|
|
peers: pp,
|
|
payloads: newPayloads(),
|
|
nextEpoch: nextEpoch,
|
|
ID: id,
|
|
epoch: currentEpoch,
|
|
logger: logger.With(zap.Namespace("mvds")),
|
|
mode: mode,
|
|
}
|
|
}
|
|
|
|
func (n *Node) CurrentEpoch() int64 {
|
|
return atomic.LoadInt64(&n.epoch)
|
|
}
|
|
|
|
// Start listens for new messages received by the node and sends out those required every epoch.
|
|
func (n *Node) Start(duration time.Duration) {
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-n.ctx.Done():
|
|
n.logger.Info("Watch stopped")
|
|
return
|
|
default:
|
|
p := n.transport.Watch()
|
|
go n.onPayload(p.Sender, p.Payload)
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-n.ctx.Done():
|
|
n.logger.Info("Epoch processing stopped")
|
|
return
|
|
default:
|
|
n.logger.Debug("Epoch processing", zap.String("node", hex.EncodeToString(n.ID[:4])), zap.Int64("epoch", n.epoch))
|
|
time.Sleep(duration)
|
|
err := n.sendMessages()
|
|
if err != nil {
|
|
n.logger.Error("Error sending messages.", zap.Error(err))
|
|
}
|
|
atomic.AddInt64(&n.epoch, 1)
|
|
// When a persistent node is used, the epoch needs to be saved.
|
|
if n.epochPersistence != nil {
|
|
if err := n.epochPersistence.Set(n.ID, n.epoch); err != nil {
|
|
n.logger.Error("Failed to persisten epoch", zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Stop message reading and epoch processing
|
|
func (n *Node) Stop() {
|
|
n.logger.Info("Stopping node")
|
|
n.Unsubscribe()
|
|
n.cancel()
|
|
}
|
|
|
|
// Subscribe subscribes to incoming messages.
|
|
func (n *Node) Subscribe() chan protobuf.Message {
|
|
n.subscription = make(chan protobuf.Message)
|
|
return n.subscription
|
|
}
|
|
|
|
// Unsubscribe closes the listening channels
|
|
func (n *Node) Unsubscribe() {
|
|
if n.subscription != nil {
|
|
close(n.subscription)
|
|
}
|
|
n.subscription = nil
|
|
}
|
|
|
|
// AppendMessage sends a message to a given group.
|
|
func (n *Node) AppendMessage(groupID state.GroupID, data []byte) (state.MessageID, error) {
|
|
m := protobuf.Message{
|
|
GroupId: groupID[:],
|
|
Timestamp: time.Now().Unix(),
|
|
Body: data,
|
|
}
|
|
|
|
id := m.ID()
|
|
|
|
peers, err := n.peers.GetByGroupID(groupID)
|
|
if err != nil {
|
|
return state.MessageID{}, fmt.Errorf("trying to send to unknown group %x", groupID[:4])
|
|
}
|
|
|
|
err = n.store.Add(&m)
|
|
if err != nil {
|
|
return state.MessageID{}, err
|
|
}
|
|
|
|
for _, p := range peers {
|
|
t := state.OFFER
|
|
if n.mode == BATCH {
|
|
t = state.MESSAGE
|
|
}
|
|
|
|
n.insertSyncState(&groupID, id, p, t)
|
|
}
|
|
|
|
n.logger.Debug("Sending message",
|
|
zap.String("node", hex.EncodeToString(n.ID[:4])),
|
|
zap.String("groupID", hex.EncodeToString(groupID[:4])),
|
|
zap.String("id", hex.EncodeToString(id[:4])))
|
|
// @todo think about a way to insta trigger send messages when send was selected, we don't wanna wait for ticks here
|
|
|
|
return id, nil
|
|
}
|
|
|
|
// RequestMessage adds a REQUEST record to the next payload for a given message ID.
|
|
func (n *Node) RequestMessage(group state.GroupID, id state.MessageID) error {
|
|
peers, err := n.peers.GetByGroupID(group)
|
|
if err != nil {
|
|
return fmt.Errorf("trying to request from an unknown group %x", group[:4])
|
|
}
|
|
|
|
for _, p := range peers {
|
|
exist, err := n.IsPeerInGroup(group, p)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if exist {
|
|
continue
|
|
}
|
|
|
|
n.insertSyncState(&group, id, p, state.REQUEST)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// AddPeer adds a peer to a specific group making it a recipient of messages.
|
|
func (n *Node) AddPeer(group state.GroupID, id state.PeerID) error {
|
|
return n.peers.Add(group, id)
|
|
}
|
|
|
|
// IsPeerInGroup checks whether a peer is in the specified group.
|
|
func (n *Node) IsPeerInGroup(g state.GroupID, p state.PeerID) (bool, error) {
|
|
return n.peers.Exists(g, p)
|
|
}
|
|
|
|
func (n *Node) sendMessages() error {
|
|
err := n.syncState.Map(n.epoch, func(s state.State) state.State {
|
|
m := s.MessageID
|
|
p := s.PeerID
|
|
switch s.Type {
|
|
case state.OFFER:
|
|
n.payloads.AddOffers(p, m[:])
|
|
case state.REQUEST:
|
|
n.payloads.AddRequests(p, m[:])
|
|
n.logger.Debug("sending REQUEST",
|
|
zap.String("from", hex.EncodeToString(n.ID[:4])),
|
|
zap.String("to", hex.EncodeToString(p[:4])),
|
|
zap.String("messageID", hex.EncodeToString(m[:4])),
|
|
)
|
|
|
|
case state.MESSAGE:
|
|
g := *s.GroupID
|
|
// TODO: Handle errors
|
|
exist, err := n.IsPeerInGroup(g, p)
|
|
if err != nil {
|
|
return s
|
|
}
|
|
|
|
if !exist {
|
|
return s
|
|
}
|
|
|
|
msg, err := n.store.Get(m)
|
|
if err != nil {
|
|
n.logger.Error("Failed to retreive message",
|
|
zap.String("messageID", hex.EncodeToString(m[:4])),
|
|
zap.Error(err),
|
|
)
|
|
|
|
return s
|
|
}
|
|
|
|
n.payloads.AddMessages(p, msg)
|
|
n.logger.Debug("sending MESSAGE",
|
|
zap.String("groupID", hex.EncodeToString(g[:4])),
|
|
zap.String("from", hex.EncodeToString(n.ID[:4])),
|
|
zap.String("to", hex.EncodeToString(p[:4])),
|
|
zap.String("messageID", hex.EncodeToString(m[:4])),
|
|
)
|
|
|
|
}
|
|
|
|
return n.updateSendEpoch(s)
|
|
})
|
|
|
|
if err != nil {
|
|
n.logger.Error("error while mapping sync state", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
return n.payloads.MapAndClear(func(peer state.PeerID, payload protobuf.Payload) error {
|
|
err := n.transport.Send(n.ID, peer, payload)
|
|
if err != nil {
|
|
n.logger.Error("error sending message", zap.Error(err))
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
|
|
}
|
|
|
|
func (n *Node) onPayload(sender state.PeerID, payload protobuf.Payload) {
|
|
// Acks, Requests and Offers are all arrays of bytes as protobuf doesn't allow type aliases otherwise arrays of messageIDs would be nicer.
|
|
if err := n.onAck(sender, payload.Acks); err != nil {
|
|
n.logger.Error("error processing acks", zap.Error(err))
|
|
}
|
|
if err := n.onRequest(sender, payload.Requests); err != nil {
|
|
n.logger.Error("error processing requests", zap.Error(err))
|
|
}
|
|
if err := n.onOffer(sender, payload.Offers); err != nil {
|
|
n.logger.Error("error processing offers", zap.Error(err))
|
|
}
|
|
messageIds := n.onMessages(sender, payload.Messages)
|
|
n.payloads.AddAcks(sender, messageIds)
|
|
}
|
|
|
|
func (n *Node) onOffer(sender state.PeerID, offers [][]byte) error {
|
|
for _, raw := range offers {
|
|
id := toMessageID(raw)
|
|
n.logger.Debug("OFFER received",
|
|
zap.String("from", hex.EncodeToString(sender[:4])),
|
|
zap.String("to", hex.EncodeToString(n.ID[:4])),
|
|
zap.String("messageID", hex.EncodeToString(id[:4])),
|
|
)
|
|
|
|
exist, err := n.store.Has(id)
|
|
// @todo maybe ack?
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if exist {
|
|
continue
|
|
}
|
|
|
|
n.insertSyncState(nil, id, sender, state.REQUEST)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (n *Node) onRequest(sender state.PeerID, requests [][]byte) error {
|
|
for _, raw := range requests {
|
|
id := toMessageID(raw)
|
|
n.logger.Debug("REQUEST received",
|
|
zap.String("from", hex.EncodeToString(sender[:4])),
|
|
zap.String("to", hex.EncodeToString(n.ID[:4])),
|
|
zap.String("messageID", hex.EncodeToString(id[:4])),
|
|
)
|
|
|
|
message, err := n.store.Get(id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if message == nil {
|
|
n.logger.Error("message does not exist", zap.String("messageID", hex.EncodeToString(id[:4])))
|
|
continue
|
|
}
|
|
|
|
groupID := toGroupID(message.GroupId)
|
|
|
|
exist, err := n.IsPeerInGroup(groupID, sender)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !exist {
|
|
n.logger.Error("peer is not in group",
|
|
zap.String("groupID", hex.EncodeToString(groupID[:4])),
|
|
zap.String("peer", hex.EncodeToString(sender[:4])),
|
|
)
|
|
continue
|
|
}
|
|
|
|
n.insertSyncState(&groupID, id, sender, state.MESSAGE)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (n *Node) onAck(sender state.PeerID, acks [][]byte) error {
|
|
for _, ack := range acks {
|
|
id := toMessageID(ack)
|
|
|
|
err := n.syncState.Remove(id, sender)
|
|
if err != nil {
|
|
n.logger.Error("Error while removing sync state.", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
n.logger.Debug("ACK received",
|
|
zap.String("from", hex.EncodeToString(sender[:4])),
|
|
zap.String("to", hex.EncodeToString(n.ID[:4])),
|
|
zap.String("messageID", hex.EncodeToString(id[:4])),
|
|
)
|
|
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (n *Node) onMessages(sender state.PeerID, messages []*protobuf.Message) [][]byte {
|
|
a := make([][]byte, 0)
|
|
|
|
for _, m := range messages {
|
|
groupID := toGroupID(m.GroupId)
|
|
err := n.onMessage(sender, *m)
|
|
if err != nil {
|
|
n.logger.Error("Error processing message", zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
id := m.ID()
|
|
n.logger.Debug("sending ACK",
|
|
zap.String("groupID", hex.EncodeToString(groupID[:4])),
|
|
zap.String("from", hex.EncodeToString(n.ID[:4])),
|
|
zap.String("", hex.EncodeToString(sender[:4])),
|
|
zap.String("messageID", hex.EncodeToString(id[:4])),
|
|
)
|
|
|
|
a = append(a, id[:])
|
|
}
|
|
|
|
return a
|
|
}
|
|
|
|
func (n *Node) onMessage(sender state.PeerID, msg protobuf.Message) error {
|
|
id := msg.ID()
|
|
groupID := toGroupID(msg.GroupId)
|
|
n.logger.Debug("MESSAGE received",
|
|
zap.String("from", hex.EncodeToString(sender[:4])),
|
|
zap.String("to", hex.EncodeToString(n.ID[:4])),
|
|
zap.String("messageID", hex.EncodeToString(id[:4])),
|
|
)
|
|
|
|
err := n.syncState.Remove(id, sender)
|
|
if err != nil && err != state.ErrStateNotFound {
|
|
return err
|
|
}
|
|
|
|
err = n.store.Add(&msg)
|
|
if err != nil {
|
|
return err
|
|
// @todo process, should this function ever even have an error?
|
|
}
|
|
|
|
peers, err := n.peers.GetByGroupID(groupID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, peer := range peers {
|
|
if peer == sender {
|
|
continue
|
|
}
|
|
|
|
n.insertSyncState(&groupID, id, peer, state.OFFER)
|
|
}
|
|
|
|
if n.subscription != nil {
|
|
n.subscription <- msg
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (n *Node) insertSyncState(groupID *state.GroupID, messageID state.MessageID, peerID state.PeerID, t state.RecordType) {
|
|
s := state.State{
|
|
GroupID: groupID,
|
|
MessageID: messageID,
|
|
PeerID: peerID,
|
|
Type: t,
|
|
SendEpoch: n.epoch + 1,
|
|
}
|
|
|
|
err := n.syncState.Add(s)
|
|
if err != nil {
|
|
n.logger.Error("error setting sync states",
|
|
zap.Error(err),
|
|
zap.String("groupID", hex.EncodeToString(groupID[:4])),
|
|
zap.String("messageID", hex.EncodeToString(messageID[:4])),
|
|
zap.String("peerID", hex.EncodeToString(peerID[:4])),
|
|
)
|
|
|
|
}
|
|
}
|
|
|
|
func (n *Node) updateSendEpoch(s state.State) state.State {
|
|
s.SendCount += 1
|
|
s.SendEpoch += n.nextEpoch(s.SendCount, n.epoch)
|
|
return s
|
|
}
|
|
|
|
func toMessageID(b []byte) state.MessageID {
|
|
var id state.MessageID
|
|
copy(id[:], b)
|
|
return id
|
|
}
|
|
|
|
func toGroupID(b []byte) state.GroupID {
|
|
var id state.GroupID
|
|
copy(id[:], b)
|
|
return id
|
|
}
|