6ad047d56f
This commit fixes a bug in the mvds library where nextEpoch would be incorrectly added to the retry time, resulting in messages not being retried, or being retried much less frequently the longer the app was running. It also updates the retry timing to back off exponentially at multiples of 30 seconds.

// Package node contains node logic.
package node

// @todo this is a very rough implementation that needs cleanup

import (
	"context"
	"database/sql"
	"encoding/hex"
	"fmt"
	"sync/atomic"
	"time"

	"go.uber.org/zap"

	"github.com/vacp2p/mvds/peers"
	"github.com/vacp2p/mvds/protobuf"
	"github.com/vacp2p/mvds/state"
	"github.com/vacp2p/mvds/store"
	"github.com/vacp2p/mvds/transport"
)

// Mode represents the synchronization mode.
type Mode int

const (
	INTERACTIVE Mode = iota
	BATCH
)

// CalculateNextEpoch is a function used to calculate the next `SendEpoch` for a given message.
type CalculateNextEpoch func(count uint64, epoch int64) int64

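// exponentialNextEpoch is an illustrative sketch added for this writeup and is
// not part of the upstream file: the commit message above describes retries
// backing off exponentially at multiples of 30 seconds, and a
// CalculateNextEpoch of that shape could look roughly like this, assuming one
// epoch per tick. The exact function used by status-go may differ.
func exponentialNextEpoch(count uint64, epoch int64) int64 {
	if count > 20 {
		count = 20 // cap the shift so the multiplier cannot overflow int64
	}
	// Back off exponentially in multiples of 30 epochs after the current one.
	return epoch + 30*(int64(1)<<count)
}
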
// Node represents an MVDS node; it runs all the logic like sending and receiving protocol messages.
type Node struct {
	// This needs to be declared first: https://github.com/golang/go/issues/9959
	epoch  int64
	ctx    context.Context
	cancel context.CancelFunc

	store     store.MessageStore
	transport transport.Transport

	syncState state.SyncState

	peers peers.Persistence

	payloads payloads

	nextEpoch CalculateNextEpoch

	ID state.PeerID

	epochPersistence *epochSQLitePersistence
	mode             Mode

	subscription chan protobuf.Message

	logger *zap.Logger
}

// NewPersistentNode returns a new node that persists its messages, peers, sync state and epoch in the given SQL database.
func NewPersistentNode(
	db *sql.DB,
	st transport.Transport,
	id state.PeerID,
	mode Mode,
	nextEpoch CalculateNextEpoch,
	logger *zap.Logger,
) (*Node, error) {
	ctx, cancel := context.WithCancel(context.Background())
	if logger == nil {
		logger = zap.NewNop()
	}

	node := Node{
		ID:               id,
		ctx:              ctx,
		cancel:           cancel,
		store:            store.NewPersistentMessageStore(db),
		transport:        st,
		peers:            peers.NewSQLitePersistence(db),
		syncState:        state.NewPersistentSyncState(db),
		payloads:         newPayloads(),
		epochPersistence: newEpochSQLitePersistence(db),
		nextEpoch:        nextEpoch,
		logger:           logger.With(zap.Namespace("mvds")),
		mode:             mode,
	}

	currentEpoch, err := node.epochPersistence.Get(id)
	if err != nil {
		return nil, err
	}
	node.epoch = currentEpoch

	return &node, nil
}

// NewEphemeralNode returns a new node whose state is kept in memory rather than persisted.
func NewEphemeralNode(
	id state.PeerID,
	t transport.Transport,
	nextEpoch CalculateNextEpoch,
	currentEpoch int64,
	mode Mode,
	logger *zap.Logger,
) *Node {
	ctx, cancel := context.WithCancel(context.Background())
	if logger == nil {
		logger = zap.NewNop()
	}

	return &Node{
		ID:        id,
		ctx:       ctx,
		cancel:    cancel,
		store:     store.NewDummyStore(),
		transport: t,
		syncState: state.NewSyncState(),
		peers:     peers.NewMemoryPersistence(),
		payloads:  newPayloads(),
		nextEpoch: nextEpoch,
		epoch:     currentEpoch,
		logger:    logger.With(zap.Namespace("mvds")),
		mode:      mode,
	}
}

// NewNode returns a new node.
func NewNode(
	ms store.MessageStore,
	st transport.Transport,
	ss state.SyncState,
	nextEpoch CalculateNextEpoch,
	currentEpoch int64,
	id state.PeerID,
	mode Mode,
	pp peers.Persistence,
	logger *zap.Logger,
) *Node {
	ctx, cancel := context.WithCancel(context.Background())
	if logger == nil {
		logger = zap.NewNop()
	}

	return &Node{
		ctx:       ctx,
		cancel:    cancel,
		store:     ms,
		transport: st,
		syncState: ss,
		peers:     pp,
		payloads:  newPayloads(),
		nextEpoch: nextEpoch,
		ID:        id,
		epoch:     currentEpoch,
		logger:    logger.With(zap.Namespace("mvds")),
		mode:      mode,
	}
}

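// exampleStartNode is an illustrative sketch, not part of the upstream file:
// it shows how a caller might wire up an ephemeral node using one of the
// constructors above. The transport and peer ID are assumed to be supplied by
// the caller, and exponentialNextEpoch is the sample backoff defined earlier.
func exampleStartNode(t transport.Transport, id state.PeerID) *Node {
	n := NewEphemeralNode(id, t, exponentialNextEpoch, 0, INTERACTIVE, nil)
	n.Start(time.Second) // advance one epoch per second
	return n
}
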
// CurrentEpoch returns the node's current epoch.
func (n *Node) CurrentEpoch() int64 {
	return atomic.LoadInt64(&n.epoch)
}

// Start listens for new messages received by the node and sends out those required every epoch.
func (n *Node) Start(duration time.Duration) {
	go func() {
		for {
			select {
			case <-n.ctx.Done():
				n.logger.Info("Watch stopped")
				return
			default:
				p := n.transport.Watch()
				go n.onPayload(p.Sender, p.Payload)
			}
		}
	}()

	go func() {
		for {
			select {
			case <-n.ctx.Done():
				n.logger.Info("Epoch processing stopped")
				return
			default:
				n.logger.Debug("Epoch processing", zap.String("node", hex.EncodeToString(n.ID[:4])), zap.Int64("epoch", n.epoch))
				time.Sleep(duration)
				err := n.sendMessages()
				if err != nil {
					n.logger.Error("Error sending messages.", zap.Error(err))
				}
				atomic.AddInt64(&n.epoch, 1)
				// When a persistent node is used, the epoch needs to be saved.
				if n.epochPersistence != nil {
					if err := n.epochPersistence.Set(n.ID, n.epoch); err != nil {
						n.logger.Error("Failed to persist epoch", zap.Error(err))
					}
				}
			}
		}
	}()
}

// Stop message reading and epoch processing
func (n *Node) Stop() {
	n.logger.Info("Stopping node")
	n.Unsubscribe()
	n.cancel()
}

// Subscribe subscribes to incoming messages.
func (n *Node) Subscribe() chan protobuf.Message {
	n.subscription = make(chan protobuf.Message)
	return n.subscription
}

// Unsubscribe closes the listening channels
func (n *Node) Unsubscribe() {
	if n.subscription != nil {
		close(n.subscription)
	}
	n.subscription = nil
}

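// exampleConsumeMessages is an illustrative sketch, not part of the upstream
// file: it shows how Subscribe is typically consumed. The channel is closed by
// Unsubscribe (and therefore by Stop), which ends the range loop.
func exampleConsumeMessages(n *Node, handle func(protobuf.Message)) {
	go func() {
		for m := range n.Subscribe() {
			handle(m) // invoked for every message accepted by onMessage
		}
	}()
}
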
// AppendMessage sends a message to a given group.
func (n *Node) AppendMessage(groupID state.GroupID, data []byte) (state.MessageID, error) {
	m := protobuf.Message{
		GroupId:   groupID[:],
		Timestamp: time.Now().Unix(),
		Body:      data,
	}

	id := m.ID()

	peers, err := n.peers.GetByGroupID(groupID)
	if err != nil {
		return state.MessageID{}, fmt.Errorf("trying to send to unknown group %x", groupID[:4])
	}

	err = n.store.Add(&m)
	if err != nil {
		return state.MessageID{}, err
	}

	for _, p := range peers {
		t := state.OFFER
		if n.mode == BATCH {
			t = state.MESSAGE
		}

		n.insertSyncState(&groupID, id, p, t)
	}

	n.logger.Debug("Sending message",
		zap.String("node", hex.EncodeToString(n.ID[:4])),
		zap.String("groupID", hex.EncodeToString(groupID[:4])),
		zap.String("id", hex.EncodeToString(id[:4])))
	// @todo think about a way to insta trigger send messages when send was selected, we don't wanna wait for ticks here

	return id, nil
}

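// exampleSendToGroup is an illustrative sketch, not part of the upstream file:
// AppendMessage only schedules records for peers already registered in the
// group, so the peer is added first. The group and peer IDs are assumed to
// come from the caller.
func exampleSendToGroup(n *Node, group state.GroupID, peer state.PeerID, body []byte) (state.MessageID, error) {
	if err := n.AddPeer(group, peer); err != nil {
		return state.MessageID{}, err
	}
	// In INTERACTIVE mode this schedules an OFFER for the peer; in BATCH mode
	// the full MESSAGE is scheduled directly (see AppendMessage above).
	return n.AppendMessage(group, body)
}
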
// RequestMessage adds a REQUEST record to the next payload for a given message ID.
func (n *Node) RequestMessage(group state.GroupID, id state.MessageID) error {
	peers, err := n.peers.GetByGroupID(group)
	if err != nil {
		return fmt.Errorf("trying to request from an unknown group %x", group[:4])
	}

	for _, p := range peers {
		exist, err := n.IsPeerInGroup(group, p)
		if err != nil {
			return err
		}

		if exist {
			continue
		}

		n.insertSyncState(&group, id, p, state.REQUEST)
	}

	return nil
}

// AddPeer adds a peer to a specific group making it a recipient of messages.
func (n *Node) AddPeer(group state.GroupID, id state.PeerID) error {
	return n.peers.Add(group, id)
}

// IsPeerInGroup checks whether a peer is in the specified group.
func (n *Node) IsPeerInGroup(g state.GroupID, p state.PeerID) (bool, error) {
	return n.peers.Exists(g, p)
}

func (n *Node) sendMessages() error {
	err := n.syncState.Map(n.epoch, func(s state.State) state.State {
		m := s.MessageID
		p := s.PeerID
		switch s.Type {
		case state.OFFER:
			n.payloads.AddOffers(p, m[:])
		case state.REQUEST:
			n.payloads.AddRequests(p, m[:])
			n.logger.Debug("sending REQUEST",
				zap.String("from", hex.EncodeToString(n.ID[:4])),
				zap.String("to", hex.EncodeToString(p[:4])),
				zap.String("messageID", hex.EncodeToString(m[:4])),
			)

		case state.MESSAGE:
			g := *s.GroupID
			// TODO: Handle errors
			exist, err := n.IsPeerInGroup(g, p)
			if err != nil {
				return s
			}

			if !exist {
				return s
			}

			msg, err := n.store.Get(m)
			if err != nil {
				n.logger.Error("Failed to retrieve message",
					zap.String("messageID", hex.EncodeToString(m[:4])),
					zap.Error(err),
				)

				return s
			}

			n.payloads.AddMessages(p, msg)
			n.logger.Debug("sending MESSAGE",
				zap.String("groupID", hex.EncodeToString(g[:4])),
				zap.String("from", hex.EncodeToString(n.ID[:4])),
				zap.String("to", hex.EncodeToString(p[:4])),
				zap.String("messageID", hex.EncodeToString(m[:4])),
			)
		}

		return n.updateSendEpoch(s)
	})

	if err != nil {
		n.logger.Error("error while mapping sync state", zap.Error(err))
		return err
	}

	return n.payloads.MapAndClear(func(peer state.PeerID, payload protobuf.Payload) error {
		err := n.transport.Send(n.ID, peer, payload)
		if err != nil {
			n.logger.Error("error sending message", zap.Error(err))
			return err
		}
		return nil
	})
}

func (n *Node) onPayload(sender state.PeerID, payload protobuf.Payload) {
	// Acks, Requests and Offers are all arrays of bytes because protobuf doesn't allow type aliases; otherwise arrays of MessageIDs would be nicer.
	if err := n.onAck(sender, payload.Acks); err != nil {
		n.logger.Error("error processing acks", zap.Error(err))
	}
	if err := n.onRequest(sender, payload.Requests); err != nil {
		n.logger.Error("error processing requests", zap.Error(err))
	}
	if err := n.onOffer(sender, payload.Offers); err != nil {
		n.logger.Error("error processing offers", zap.Error(err))
	}
	messageIds := n.onMessages(sender, payload.Messages)
	n.payloads.AddAcks(sender, messageIds)
}

func (n *Node) onOffer(sender state.PeerID, offers [][]byte) error {
	for _, raw := range offers {
		id := toMessageID(raw)
		n.logger.Debug("OFFER received",
			zap.String("from", hex.EncodeToString(sender[:4])),
			zap.String("to", hex.EncodeToString(n.ID[:4])),
			zap.String("messageID", hex.EncodeToString(id[:4])),
		)

		exist, err := n.store.Has(id)
		// @todo maybe ack?
		if err != nil {
			return err
		}

		if exist {
			continue
		}

		n.insertSyncState(nil, id, sender, state.REQUEST)
	}
	return nil
}

func (n *Node) onRequest(sender state.PeerID, requests [][]byte) error {
	for _, raw := range requests {
		id := toMessageID(raw)
		n.logger.Debug("REQUEST received",
			zap.String("from", hex.EncodeToString(sender[:4])),
			zap.String("to", hex.EncodeToString(n.ID[:4])),
			zap.String("messageID", hex.EncodeToString(id[:4])),
		)

		message, err := n.store.Get(id)
		if err != nil {
			return err
		}

		if message == nil {
			n.logger.Error("message does not exist", zap.String("messageID", hex.EncodeToString(id[:4])))
			continue
		}

		groupID := toGroupID(message.GroupId)

		exist, err := n.IsPeerInGroup(groupID, sender)
		if err != nil {
			return err
		}

		if !exist {
			n.logger.Error("peer is not in group",
				zap.String("groupID", hex.EncodeToString(groupID[:4])),
				zap.String("peer", hex.EncodeToString(sender[:4])),
			)
			continue
		}

		n.insertSyncState(&groupID, id, sender, state.MESSAGE)
	}

	return nil
}

func (n *Node) onAck(sender state.PeerID, acks [][]byte) error {
	for _, ack := range acks {
		id := toMessageID(ack)

		err := n.syncState.Remove(id, sender)
		if err != nil {
			n.logger.Error("Error while removing sync state.", zap.Error(err))
			return err
		}

		n.logger.Debug("ACK received",
			zap.String("from", hex.EncodeToString(sender[:4])),
			zap.String("to", hex.EncodeToString(n.ID[:4])),
			zap.String("messageID", hex.EncodeToString(id[:4])),
		)
	}
	return nil
}

func (n *Node) onMessages(sender state.PeerID, messages []*protobuf.Message) [][]byte {
	a := make([][]byte, 0)

	for _, m := range messages {
		groupID := toGroupID(m.GroupId)
		err := n.onMessage(sender, *m)
		if err != nil {
			n.logger.Error("Error processing message", zap.Error(err))
			continue
		}

		id := m.ID()
		n.logger.Debug("sending ACK",
			zap.String("groupID", hex.EncodeToString(groupID[:4])),
			zap.String("from", hex.EncodeToString(n.ID[:4])),
			zap.String("to", hex.EncodeToString(sender[:4])),
			zap.String("messageID", hex.EncodeToString(id[:4])),
		)

		a = append(a, id[:])
	}

	return a
}

func (n *Node) onMessage(sender state.PeerID, msg protobuf.Message) error {
	id := msg.ID()
	groupID := toGroupID(msg.GroupId)
	n.logger.Debug("MESSAGE received",
		zap.String("from", hex.EncodeToString(sender[:4])),
		zap.String("to", hex.EncodeToString(n.ID[:4])),
		zap.String("messageID", hex.EncodeToString(id[:4])),
	)

	err := n.syncState.Remove(id, sender)
	if err != nil && err != state.ErrStateNotFound {
		return err
	}

	err = n.store.Add(&msg)
	if err != nil {
		return err
		// @todo process, should this function ever even have an error?
	}

	peers, err := n.peers.GetByGroupID(groupID)
	if err != nil {
		return err
	}

	for _, peer := range peers {
		if peer == sender {
			continue
		}

		n.insertSyncState(&groupID, id, peer, state.OFFER)
	}

	if n.subscription != nil {
		n.subscription <- msg
	}

	return nil
}

func (n *Node) insertSyncState(groupID *state.GroupID, messageID state.MessageID, peerID state.PeerID, t state.RecordType) {
	s := state.State{
		GroupID:   groupID,
		MessageID: messageID,
		PeerID:    peerID,
		Type:      t,
		SendEpoch: n.epoch + 1,
	}

	err := n.syncState.Add(s)
	if err != nil {
		fields := []zap.Field{
			zap.Error(err),
			zap.String("messageID", hex.EncodeToString(messageID[:4])),
			zap.String("peerID", hex.EncodeToString(peerID[:4])),
		}
		// groupID may be nil (e.g. for REQUEST records created from offers),
		// so only log it when present to avoid a nil pointer dereference.
		if groupID != nil {
			fields = append(fields, zap.String("groupID", hex.EncodeToString(groupID[:4])))
		}
		n.logger.Error("error setting sync states", fields...)
	}
}

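// updateSendEpoch bumps the record's send counter and recomputes its SendEpoch.
// Note (added for clarity, not in the original source): nextEpoch is expected
// to return the absolute epoch at which the record should next be sent; per
// the commit message above, its result must not be added on top of the
// previous retry time, or retries drift further apart the longer the node runs.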
func (n *Node) updateSendEpoch(s state.State) state.State {
	s.SendCount++
	s.SendEpoch = n.nextEpoch(s.SendCount, n.epoch)
	return s
}

func toMessageID(b []byte) state.MessageID {
	var id state.MessageID
	copy(id[:], b)
	return id
}

func toGroupID(b []byte) state.GroupID {
	var id state.GroupID
	copy(id[:], b)
	return id
}