pass logger to mvds

This commit is contained in:
Andrea Maria Piana 2019-08-28 10:54:10 +02:00
parent 1646cd6771
commit 822d18916e
No known key found for this signature in database
GPG Key ID: AA6CCA6DE0E06424
6 changed files with 106 additions and 33 deletions

View File

@ -119,6 +119,7 @@ func (s *AdaptersSuite) SetupTest() {
datasyncpeer.PublicKeyToPeerID(s.privateKey.PublicKey), datasyncpeer.PublicKeyToPeerID(s.privateKey.PublicKey),
datasyncnode.BATCH, datasyncnode.BATCH,
datasync.CalculateSendTime, datasync.CalculateSendTime,
logger,
) )
s.Require().NoError(err) s.Require().NoError(err)

4
go.mod
View File

@ -20,9 +20,7 @@ require (
github.com/status-im/migrate/v4 v4.0.0-20190821140204-a9d340ec8fb76af4afda06acf01740d45d2661ed github.com/status-im/migrate/v4 v4.0.0-20190821140204-a9d340ec8fb76af4afda06acf01740d45d2661ed
github.com/status-im/whisper v1.4.14 github.com/status-im/whisper v1.4.14
github.com/stretchr/testify v1.3.1-0.20190712000136-221dbe5ed467 github.com/stretchr/testify v1.3.1-0.20190712000136-221dbe5ed467
github.com/vacp2p/mvds v0.0.0-20190705123435-a8dc37599bf99958b0ea19431c9bb7a47bd7fa8f github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0 go.uber.org/zap v1.10.0
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4
golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect

4
go.sum
View File

@ -252,8 +252,8 @@ github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2K
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/uber/jaeger-client-go v0.0.0-20180607151842-f7e0d4744fa6/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-client-go v0.0.0-20180607151842-f7e0d4744fa6/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v0.0.0-20180615202729-a51202d6f4a7/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/uber/jaeger-lib v0.0.0-20180615202729-a51202d6f4a7/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/vacp2p/mvds v0.0.0-20190705123435-a8dc37599bf99958b0ea19431c9bb7a47bd7fa8f h1:iBJcVbX4RaPtufXJ/PJtCF1jYPzlmyEF+imcQq5pTDs= github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c h1:O7gT6vNipoBxFe19iWtDyUjIcIbkJ5MYhMXLS+RRCFs=
github.com/vacp2p/mvds v0.0.0-20190705123435-a8dc37599bf99958b0ea19431c9bb7a47bd7fa8f/go.mod h1:Xc3UWtA230lUzZoXStJHQd/BkqJK5BAZyXBsiR6XrXM= github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c/go.mod h1:pIqr2Hg4cIkTJniGPCp4ptong2jxgxx6uToVoY94+II=
github.com/xanzy/go-gitlab v0.15.0/go.mod h1:8zdQa/ri1dfn8eS3Ir1SyfvOKlw7WBJ8DVThkpGiXrs= github.com/xanzy/go-gitlab v0.15.0/go.mod h1:8zdQa/ri1dfn8eS3Ir1SyfvOKlw7WBJ8DVThkpGiXrs=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=

View File

@ -273,6 +273,7 @@ func NewMessenger(
datasyncpeer.PublicKeyToPeerID(identity.PublicKey), datasyncpeer.PublicKeyToPeerID(identity.PublicKey),
datasyncnode.BATCH, datasyncnode.BATCH,
datasync.CalculateSendTime, datasync.CalculateSendTime,
logger,
) )
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to create a persistent datasync node") return nil, errors.Wrap(err, "failed to create a persistent datasync node")

View File

@ -6,11 +6,13 @@ package node
import ( import (
"context" "context"
"database/sql" "database/sql"
"encoding/hex"
"fmt" "fmt"
"log"
"sync/atomic" "sync/atomic"
"time" "time"
"go.uber.org/zap"
"github.com/vacp2p/mvds/peers" "github.com/vacp2p/mvds/peers"
"github.com/vacp2p/mvds/protobuf" "github.com/vacp2p/mvds/protobuf"
"github.com/vacp2p/mvds/state" "github.com/vacp2p/mvds/state"
@ -31,6 +33,8 @@ type CalculateNextEpoch func(count uint64, epoch int64) int64
// Node represents an MVDS node, it runs all the logic like sending and receiving protocol messages. // Node represents an MVDS node, it runs all the logic like sending and receiving protocol messages.
type Node struct { type Node struct {
// This needs to be declared first: https://github.com/golang/go/issues/9959
epoch int64
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@ -48,10 +52,11 @@ type Node struct {
ID state.PeerID ID state.PeerID
epochPersistence *epochSQLitePersistence epochPersistence *epochSQLitePersistence
epoch int64
mode Mode mode Mode
subscription chan protobuf.Message subscription chan protobuf.Message
logger *zap.Logger
} }
func NewPersistentNode( func NewPersistentNode(
@ -60,8 +65,13 @@ func NewPersistentNode(
id state.PeerID, id state.PeerID,
mode Mode, mode Mode,
nextEpoch CalculateNextEpoch, nextEpoch CalculateNextEpoch,
logger *zap.Logger,
) (*Node, error) { ) (*Node, error) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
if logger == nil {
logger = zap.NewNop()
}
node := Node{ node := Node{
ID: id, ID: id,
ctx: ctx, ctx: ctx,
@ -73,6 +83,7 @@ func NewPersistentNode(
payloads: newPayloads(), payloads: newPayloads(),
epochPersistence: newEpochSQLitePersistence(db), epochPersistence: newEpochSQLitePersistence(db),
nextEpoch: nextEpoch, nextEpoch: nextEpoch,
logger: logger.With(zap.Namespace("mvds")),
mode: mode, mode: mode,
} }
if currentEpoch, err := node.epochPersistence.Get(id); err != nil { if currentEpoch, err := node.epochPersistence.Get(id); err != nil {
@ -89,8 +100,13 @@ func NewEphemeralNode(
nextEpoch CalculateNextEpoch, nextEpoch CalculateNextEpoch,
currentEpoch int64, currentEpoch int64,
mode Mode, mode Mode,
logger *zap.Logger,
) *Node { ) *Node {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
if logger == nil {
logger = zap.NewNop()
}
return &Node{ return &Node{
ID: id, ID: id,
ctx: ctx, ctx: ctx,
@ -102,6 +118,7 @@ func NewEphemeralNode(
payloads: newPayloads(), payloads: newPayloads(),
nextEpoch: nextEpoch, nextEpoch: nextEpoch,
epoch: currentEpoch, epoch: currentEpoch,
logger: logger.With(zap.Namespace("mvds")),
mode: mode, mode: mode,
} }
} }
@ -116,8 +133,13 @@ func NewNode(
id state.PeerID, id state.PeerID,
mode Mode, mode Mode,
pp peers.Persistence, pp peers.Persistence,
logger *zap.Logger,
) *Node { ) *Node {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
if logger == nil {
logger = zap.NewNop()
}
return &Node{ return &Node{
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
@ -129,6 +151,7 @@ func NewNode(
nextEpoch: nextEpoch, nextEpoch: nextEpoch,
ID: id, ID: id,
epoch: currentEpoch, epoch: currentEpoch,
logger: logger.With(zap.Namespace("mvds")),
mode: mode, mode: mode,
} }
} }
@ -143,7 +166,7 @@ func (n *Node) Start(duration time.Duration) {
for { for {
select { select {
case <-n.ctx.Done(): case <-n.ctx.Done():
log.Print("Watch stopped") n.logger.Info("Watch stopped")
return return
default: default:
p := n.transport.Watch() p := n.transport.Watch()
@ -156,20 +179,20 @@ func (n *Node) Start(duration time.Duration) {
for { for {
select { select {
case <-n.ctx.Done(): case <-n.ctx.Done():
log.Print("Epoch processing stopped") n.logger.Info("Epoch processing stopped")
return return
default: default:
log.Printf("Node: %x Epoch: %d", n.ID[:4], n.epoch) n.logger.Debug("Epoch processing", zap.String("node", hex.EncodeToString(n.ID[:4])), zap.Int64("epoch", n.epoch))
time.Sleep(duration) time.Sleep(duration)
err := n.sendMessages() err := n.sendMessages()
if err != nil { if err != nil {
log.Printf("Error sending messages: %+v\n", err) n.logger.Error("Error sending messages.", zap.Error(err))
} }
atomic.AddInt64(&n.epoch, 1) atomic.AddInt64(&n.epoch, 1)
// When a persistent node is used, the epoch needs to be saved. // When a persistent node is used, the epoch needs to be saved.
if n.epochPersistence != nil { if n.epochPersistence != nil {
if err := n.epochPersistence.Set(n.ID, n.epoch); err != nil { if err := n.epochPersistence.Set(n.ID, n.epoch); err != nil {
log.Printf("Failed to persisten epoch: %v", err) n.logger.Error("Failed to persisten epoch", zap.Error(err))
} }
} }
} }
@ -179,7 +202,7 @@ func (n *Node) Start(duration time.Duration) {
// Stop message reading and epoch processing // Stop message reading and epoch processing
func (n *Node) Stop() { func (n *Node) Stop() {
log.Print("Stopping node") n.logger.Info("Stopping node")
n.Unsubscribe() n.Unsubscribe()
n.cancel() n.cancel()
} }
@ -227,7 +250,10 @@ func (n *Node) AppendMessage(groupID state.GroupID, data []byte) (state.MessageI
n.insertSyncState(&groupID, id, p, t) n.insertSyncState(&groupID, id, p, t)
} }
log.Printf("[%x] node %x sending %x\n", groupID[:4], n.ID[:4], id[:4]) n.logger.Debug("Sending message",
zap.String("node", hex.EncodeToString(n.ID[:4])),
zap.String("groupID", hex.EncodeToString(groupID[:4])),
zap.String("id", hex.EncodeToString(id[:4])))
// @todo think about a way to insta trigger send messages when send was selected, we don't wanna wait for ticks here // @todo think about a way to insta trigger send messages when send was selected, we don't wanna wait for ticks here
return id, nil return id, nil
@ -275,7 +301,12 @@ func (n *Node) sendMessages() error {
n.payloads.AddOffers(p, m[:]) n.payloads.AddOffers(p, m[:])
case state.REQUEST: case state.REQUEST:
n.payloads.AddRequests(p, m[:]) n.payloads.AddRequests(p, m[:])
log.Printf("sending REQUEST (%x -> %x): %x\n", n.ID[:4], p[:4], m[:4]) n.logger.Debug("sending REQUEST",
zap.String("from", hex.EncodeToString(n.ID[:4])),
zap.String("to", hex.EncodeToString(p[:4])),
zap.String("messageID", hex.EncodeToString(m[:4])),
)
case state.MESSAGE: case state.MESSAGE:
g := *s.GroupID g := *s.GroupID
// TODO: Handle errors // TODO: Handle errors
@ -290,26 +321,36 @@ func (n *Node) sendMessages() error {
msg, err := n.store.Get(m) msg, err := n.store.Get(m)
if err != nil { if err != nil {
log.Printf("failed to retreive message %x %s", m[:4], err.Error()) n.logger.Error("Failed to retreive message",
zap.String("messageID", hex.EncodeToString(m[:4])),
zap.Error(err),
)
return s return s
} }
n.payloads.AddMessages(p, msg) n.payloads.AddMessages(p, msg)
log.Printf("[%x] sending MESSAGE (%x -> %x): %x\n", g[:4], n.ID[:4], p[:4], m[:4]) n.logger.Debug("sending MESSAGE",
zap.String("groupID", hex.EncodeToString(g[:4])),
zap.String("from", hex.EncodeToString(n.ID[:4])),
zap.String("to", hex.EncodeToString(p[:4])),
zap.String("messageID", hex.EncodeToString(m[:4])),
)
} }
return n.updateSendEpoch(s) return n.updateSendEpoch(s)
}) })
if err != nil { if err != nil {
log.Printf("error while mapping sync state: %s", err.Error()) n.logger.Error("error while mapping sync state", zap.Error(err))
return err return err
} }
return n.payloads.MapAndClear(func(peer state.PeerID, payload protobuf.Payload) error { return n.payloads.MapAndClear(func(peer state.PeerID, payload protobuf.Payload) error {
err := n.transport.Send(n.ID, peer, payload) err := n.transport.Send(n.ID, peer, payload)
if err != nil { if err != nil {
log.Printf("error sending message: %s", err.Error()) n.logger.Error("error sending message", zap.Error(err))
return err return err
} }
return nil return nil
@ -320,13 +361,13 @@ func (n *Node) sendMessages() error {
func (n *Node) onPayload(sender state.PeerID, payload protobuf.Payload) { func (n *Node) onPayload(sender state.PeerID, payload protobuf.Payload) {
// Acks, Requests and Offers are all arrays of bytes as protobuf doesn't allow type aliases otherwise arrays of messageIDs would be nicer. // Acks, Requests and Offers are all arrays of bytes as protobuf doesn't allow type aliases otherwise arrays of messageIDs would be nicer.
if err := n.onAck(sender, payload.Acks); err != nil { if err := n.onAck(sender, payload.Acks); err != nil {
log.Printf("error processing acks: %s", err.Error()) n.logger.Error("error processing acks", zap.Error(err))
} }
if err := n.onRequest(sender, payload.Requests); err != nil { if err := n.onRequest(sender, payload.Requests); err != nil {
log.Printf("error processing requests: %s", err.Error()) n.logger.Error("error processing requests", zap.Error(err))
} }
if err := n.onOffer(sender, payload.Offers); err != nil { if err := n.onOffer(sender, payload.Offers); err != nil {
log.Printf("error processing offers: %s", err.Error()) n.logger.Error("error processing offers", zap.Error(err))
} }
messageIds := n.onMessages(sender, payload.Messages) messageIds := n.onMessages(sender, payload.Messages)
n.payloads.AddAcks(sender, messageIds) n.payloads.AddAcks(sender, messageIds)
@ -335,7 +376,11 @@ func (n *Node) onPayload(sender state.PeerID, payload protobuf.Payload) {
func (n *Node) onOffer(sender state.PeerID, offers [][]byte) error { func (n *Node) onOffer(sender state.PeerID, offers [][]byte) error {
for _, raw := range offers { for _, raw := range offers {
id := toMessageID(raw) id := toMessageID(raw)
log.Printf("OFFER (%x -> %x): %x received.\n", sender[:4], n.ID[:4], id[:4]) n.logger.Debug("OFFER received",
zap.String("from", hex.EncodeToString(sender[:4])),
zap.String("to", hex.EncodeToString(n.ID[:4])),
zap.String("messageID", hex.EncodeToString(id[:4])),
)
exist, err := n.store.Has(id) exist, err := n.store.Has(id)
// @todo maybe ack? // @todo maybe ack?
@ -355,7 +400,11 @@ func (n *Node) onOffer(sender state.PeerID, offers [][]byte) error {
func (n *Node) onRequest(sender state.PeerID, requests [][]byte) error { func (n *Node) onRequest(sender state.PeerID, requests [][]byte) error {
for _, raw := range requests { for _, raw := range requests {
id := toMessageID(raw) id := toMessageID(raw)
log.Printf("REQUEST (%x -> %x): %x received.\n", sender[:4], n.ID[:4], id[:4]) n.logger.Debug("REQUEST received",
zap.String("from", hex.EncodeToString(sender[:4])),
zap.String("to", hex.EncodeToString(n.ID[:4])),
zap.String("messageID", hex.EncodeToString(id[:4])),
)
message, err := n.store.Get(id) message, err := n.store.Get(id)
if err != nil { if err != nil {
@ -363,7 +412,7 @@ func (n *Node) onRequest(sender state.PeerID, requests [][]byte) error {
} }
if message == nil { if message == nil {
log.Printf("message %x does not exist", id[:4]) n.logger.Error("message does not exist", zap.String("messageID", hex.EncodeToString(id[:4])))
continue continue
} }
@ -375,7 +424,10 @@ func (n *Node) onRequest(sender state.PeerID, requests [][]byte) error {
} }
if !exist { if !exist {
log.Printf("[%x] peer %x is not in group", groupID, sender[:4]) n.logger.Error("peer is not in group",
zap.String("groupID", hex.EncodeToString(groupID[:4])),
zap.String("peer", hex.EncodeToString(sender[:4])),
)
continue continue
} }
@ -391,11 +443,16 @@ func (n *Node) onAck(sender state.PeerID, acks [][]byte) error {
err := n.syncState.Remove(id, sender) err := n.syncState.Remove(id, sender)
if err != nil { if err != nil {
log.Printf("error while removing sync state %s", err.Error()) n.logger.Error("Error while removing sync state.", zap.Error(err))
return err return err
} }
log.Printf("ACK (%x -> %x): %x received.\n", sender[:4], n.ID[:4], id[:4]) n.logger.Debug("ACK received",
zap.String("from", hex.EncodeToString(sender[:4])),
zap.String("to", hex.EncodeToString(n.ID[:4])),
zap.String("messageID", hex.EncodeToString(id[:4])),
)
} }
return nil return nil
} }
@ -407,12 +464,18 @@ func (n *Node) onMessages(sender state.PeerID, messages []*protobuf.Message) [][
groupID := toGroupID(m.GroupId) groupID := toGroupID(m.GroupId)
err := n.onMessage(sender, *m) err := n.onMessage(sender, *m)
if err != nil { if err != nil {
log.Printf("Error processing messsage: %+v\n", err) n.logger.Error("Error processing message", zap.Error(err))
continue continue
} }
id := m.ID() id := m.ID()
log.Printf("[%x] sending ACK (%x -> %x): %x\n", groupID[:4], n.ID[:4], sender[:4], id[:4]) n.logger.Debug("sending ACK",
zap.String("groupID", hex.EncodeToString(groupID[:4])),
zap.String("from", hex.EncodeToString(n.ID[:4])),
zap.String("", hex.EncodeToString(sender[:4])),
zap.String("messageID", hex.EncodeToString(id[:4])),
)
a = append(a, id[:]) a = append(a, id[:])
} }
@ -422,7 +485,11 @@ func (n *Node) onMessages(sender state.PeerID, messages []*protobuf.Message) [][
func (n *Node) onMessage(sender state.PeerID, msg protobuf.Message) error { func (n *Node) onMessage(sender state.PeerID, msg protobuf.Message) error {
id := msg.ID() id := msg.ID()
groupID := toGroupID(msg.GroupId) groupID := toGroupID(msg.GroupId)
log.Printf("MESSAGE (%x -> %x): %x received.\n", sender[:4], n.ID[:4], id[:4]) n.logger.Debug("MESSAGE received",
zap.String("from", hex.EncodeToString(sender[:4])),
zap.String("to", hex.EncodeToString(n.ID[:4])),
zap.String("messageID", hex.EncodeToString(id[:4])),
)
err := n.syncState.Remove(id, sender) err := n.syncState.Remove(id, sender)
if err != nil { if err != nil {
@ -466,7 +533,13 @@ func (n *Node) insertSyncState(groupID *state.GroupID, messageID state.MessageID
err := n.syncState.Add(s) err := n.syncState.Add(s)
if err != nil { if err != nil {
log.Printf("error (%s) setting sync state group: %x id: %x peer: %x", err.Error(), groupID, messageID, peerID) n.logger.Error("error setting sync states",
zap.Error(err),
zap.String("groupID", hex.EncodeToString(groupID[:4])),
zap.String("messageID", hex.EncodeToString(messageID[:4])),
zap.String("peerID", hex.EncodeToString(peerID[:4])),
)
} }
} }

2
vendor/modules.txt vendored
View File

@ -92,7 +92,7 @@ github.com/syndtr/goleveldb/leveldb/filter
github.com/syndtr/goleveldb/leveldb/journal github.com/syndtr/goleveldb/leveldb/journal
github.com/syndtr/goleveldb/leveldb/memdb github.com/syndtr/goleveldb/leveldb/memdb
github.com/syndtr/goleveldb/leveldb/table github.com/syndtr/goleveldb/leveldb/table
# github.com/vacp2p/mvds v0.0.0-20190705123435-a8dc37599bf99958b0ea19431c9bb7a47bd7fa8f # github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c
github.com/vacp2p/mvds/node github.com/vacp2p/mvds/node
github.com/vacp2p/mvds/protobuf github.com/vacp2p/mvds/protobuf
github.com/vacp2p/mvds/state github.com/vacp2p/mvds/state