expose more logs and metrics in DeliverMail (#1398)

This commit is contained in:
Adam Babik 2019-02-27 15:30:08 +01:00 committed by GitHub
parent ccce8b9a80
commit 0c76505b95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 64 additions and 31 deletions

View File

@ -20,6 +20,7 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
"math/rand"
"sync" "sync"
"time" "time"
@ -213,7 +214,8 @@ func (s *WMailServer) Archive(env *whisper.Envelope) {
func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) { func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) {
defer recoverLevelDBPanics("DeliverMail") defer recoverLevelDBPanics("DeliverMail")
log.Info("[mailserver:DeliverMail] delivering mail", "peerID", peerIDString(peer)) startMethod := time.Now()
defer deliverMailTimer.UpdateSince(startMethod)
requestsMeter.Mark(1) requestsMeter.Mark(1)
@ -222,9 +224,19 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
log.Error("[mailserver:DeliverMail] peer is nil") log.Error("[mailserver:DeliverMail] peer is nil")
return return
} }
requestID := request.Hash().String()
peerID := peerIDString(peer)
log.Info("[mailserver:DeliverMail] delivering mail",
"peerID", peerID,
"requestID", requestID)
if s.exceedsPeerRequests(peer.ID()) { if s.exceedsPeerRequests(peer.ID()) {
requestErrorsCounter.Inc(1) requestErrorsCounter.Inc(1)
log.Error("[mailserver:DeliverMail] peer exceeded the limit", "peerID", peerIDString(peer)) log.Error("[mailserver:DeliverMail] peer exceeded the limit",
"peerID", peerID,
"requestID", requestID)
s.trySendHistoricMessageErrorResponse(peer, request, fmt.Errorf("rate limit exceeded")) s.trySendHistoricMessageErrorResponse(peer, request, fmt.Errorf("rate limit exceeded"))
return return
} }
@ -246,18 +258,25 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
limit = payload.Limit limit = payload.Limit
batch = payload.Batch batch = payload.Batch
} else { } else {
log.Debug("[mailserver:DeliverMail] failed to decode request", "err", err, "peerID", peerIDString(peer)) log.Debug("[mailserver:DeliverMail] failed to decode request",
"err", err,
"peerID", peerID,
"requestID", requestID)
lower, upper, bloom, limit, cursor, err = s.validateRequest(peer.ID(), request) lower, upper, bloom, limit, cursor, err = s.validateRequest(peer.ID(), request)
} }
if err != nil { if err != nil {
requestValidationErrorsCounter.Inc(1) requestValidationErrorsCounter.Inc(1)
log.Error("[mailserver:DeliverMail] request failed validaton", "peerID", peerIDString(peer)) log.Error("[mailserver:DeliverMail] request failed validaton",
"peerID", peerID,
"requestID", requestID)
s.trySendHistoricMessageErrorResponse(peer, request, err) s.trySendHistoricMessageErrorResponse(peer, request, err)
return return
} }
log.Debug("[mailserver:DeliverMail] processing request", log.Info("[mailserver:DeliverMail] processing request",
"peerID", peerID,
"requestID", requestID,
"lower", lower, "lower", lower,
"upper", upper, "upper", upper,
"bloom", bloom, "bloom", bloom,
@ -288,7 +307,10 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
counter++ counter++
} }
close(errCh) close(errCh)
log.Info("[mailserver:DeliverMail] finished sending bundles", "counter", counter) log.Info("[mailserver:DeliverMail] finished sending bundles",
"peerID", peerID,
"requestID", requestID,
"counter", counter)
}() }()
start := time.Now() start := time.Now()
@ -297,6 +319,7 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
bloom, bloom,
int(limit), int(limit),
processRequestTimeout, processRequestTimeout,
requestID,
bundles, bundles,
cancelProcessing, cancelProcessing,
) )
@ -305,7 +328,10 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
// Wait for the goroutine to finish the work. It may return an error. // Wait for the goroutine to finish the work. It may return an error.
if err := <-errCh; err != nil { if err := <-errCh; err != nil {
processRequestErrorsCounter.Inc(1) processRequestErrorsCounter.Inc(1)
log.Error("[mailserver:DeliverMail] error while processing", "err", err, "peerID", peerIDString(peer)) log.Error("[mailserver:DeliverMail] error while processing",
"err", err,
"peerID", peerID,
"requestID", requestID)
s.trySendHistoricMessageErrorResponse(peer, request, err) s.trySendHistoricMessageErrorResponse(peer, request, err)
return return
} }
@ -313,16 +339,26 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
// Processing of the request could be finished earlier due to iterator error. // Processing of the request could be finished earlier due to iterator error.
if err := iter.Error(); err != nil { if err := iter.Error(); err != nil {
processRequestErrorsCounter.Inc(1) processRequestErrorsCounter.Inc(1)
log.Error("[mailserver:DeliverMail] iterator failed", "err", err, "peerID", peerIDString(peer)) log.Error("[mailserver:DeliverMail] iterator failed",
"err", err,
"peerID", peerID,
"requestID", requestID)
s.trySendHistoricMessageErrorResponse(peer, request, err) s.trySendHistoricMessageErrorResponse(peer, request, err)
return return
} }
log.Debug("[mailserver:DeliverMail] sending historic message response", "last", lastEnvelopeHash, "next", nextPageCursor) log.Info("[mailserver:DeliverMail] sending historic message response",
"peerID", peerID,
"requestID", requestID,
"last", lastEnvelopeHash,
"next", nextPageCursor)
if err := s.sendHistoricMessageResponse(peer, request, lastEnvelopeHash, nextPageCursor); err != nil { if err := s.sendHistoricMessageResponse(peer, request, lastEnvelopeHash, nextPageCursor); err != nil {
historicResponseErrorsCounter.Inc(1) historicResponseErrorsCounter.Inc(1)
log.Error("[mailserver:DeliverMail] error sending historic message response", "err", err, "peerID", peerIDString(peer)) log.Error("[mailserver:DeliverMail] error sending historic message response",
"err", err,
"peerID", peerID,
"requestID", requestID)
// we still want to try to report error even it it is a p2p error and it is unlikely // we still want to try to report error even it it is a p2p error and it is unlikely
s.trySendHistoricMessageErrorResponse(peer, request, err) s.trySendHistoricMessageErrorResponse(peer, request, err)
} }
@ -334,6 +370,8 @@ func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailReque
defer recoverLevelDBPanics("SyncMail") defer recoverLevelDBPanics("SyncMail")
requestID := fmt.Sprintf("%d-%d", time.Now().UnixNano(), rand.Intn(1000))
syncRequestsMeter.Mark(1) syncRequestsMeter.Mark(1)
// Check rate limiting for a requesting peer. // Check rate limiting for a requesting peer.
@ -372,6 +410,7 @@ func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailReque
request.Bloom, request.Bloom,
int(request.Limit), int(request.Limit),
processRequestTimeout, processRequestTimeout,
requestID,
bundles, bundles,
cancelProcessing, cancelProcessing,
) )
@ -454,6 +493,7 @@ func (s *WMailServer) processRequestInBundles(
bloom []byte, bloom []byte,
limit int, limit int,
timeout time.Duration, timeout time.Duration,
requestID string,
output chan<- []*whisper.Envelope, output chan<- []*whisper.Envelope,
cancel <-chan struct{}, cancel <-chan struct{},
) ([]byte, common.Hash) { ) ([]byte, common.Hash) {
@ -467,13 +507,9 @@ func (s *WMailServer) processRequestInBundles(
lastEnvelopeHash common.Hash lastEnvelopeHash common.Hash
) )
requestID := fmt.Sprintf("%d", time.Now().UnixNano())
log.Info("[mailserver:processRequestInBundles] processing request", log.Info("[mailserver:processRequestInBundles] processing request",
"requestID", "requestID", requestID,
requestID, "limit", limit)
"limit",
limit)
// We iterate over the envelopes. // We iterate over the envelopes.
// We collect envelopes in batches. // We collect envelopes in batches.
@ -486,7 +522,9 @@ func (s *WMailServer) processRequestInBundles(
decodeErr := rlp.DecodeBytes(iter.Value(), &envelope) decodeErr := rlp.DecodeBytes(iter.Value(), &envelope)
if decodeErr != nil { if decodeErr != nil {
log.Error("failed to decode RLP", "err", decodeErr) log.Error("[mailserver:processRequestInBundles] failed to decode RLP",
"err", decodeErr,
"requestID", requestID)
continue continue
} }
@ -531,17 +569,11 @@ func (s *WMailServer) processRequestInBundles(
} }
log.Info("[mailserver:processRequestInBundles] publishing envelopes", log.Info("[mailserver:processRequestInBundles] publishing envelopes",
"requestID", "requestID", requestID,
requestID, "batchesCount", len(batches),
"batchesCount", "envelopeCount", processedEnvelopes,
len(batches), "processedEnvelopesSize", processedEnvelopesSize,
"envelopeCount", "cursor", nextCursor)
processedEnvelopes,
"processedEnvelopesSize",
processedEnvelopesSize,
"cursor",
nextCursor,
)
// Publish // Publish
for _, batch := range batches { for _, batch := range batches {

View File

@ -512,7 +512,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() {
processFinished := make(chan struct{}) processFinished := make(chan struct{})
go func() { go func() {
s.server.processRequestInBundles(iter, bloom, int(limit), timeout, bundles, done) s.server.processRequestInBundles(iter, bloom, int(limit), timeout, "req-01", bundles, done)
close(processFinished) close(processFinished)
}() }()
go close(done) go close(done)
@ -536,7 +536,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() {
processFinished := make(chan struct{}) processFinished := make(chan struct{})
go func() { go func() {
s.server.processRequestInBundles(iter, bloom, int(limit), time.Second, bundles, done) s.server.processRequestInBundles(iter, bloom, int(limit), time.Second, "req-01", bundles, done)
close(processFinished) close(processFinished)
}() }()
@ -765,7 +765,7 @@ func processRequestAndCollectHashes(
close(done) close(done)
}() }()
cursor, lastHash := server.processRequestInBundles(iter, bloom, limit, time.Minute, bundles, done) cursor, lastHash := server.processRequestInBundles(iter, bloom, limit, time.Minute, "req-01", bundles, done)
<-done <-done

View File

@ -19,4 +19,5 @@ var (
processRequestErrorsCounter = metrics.NewRegisteredCounter("mailserver/processRequestErrors", nil) processRequestErrorsCounter = metrics.NewRegisteredCounter("mailserver/processRequestErrors", nil)
historicResponseErrorsCounter = metrics.NewRegisteredCounter("mailserver/historicResponseErrors", nil) historicResponseErrorsCounter = metrics.NewRegisteredCounter("mailserver/historicResponseErrors", nil)
syncRequestsMeter = metrics.NewRegisteredMeter("mailserver/syncRequests", nil) syncRequestsMeter = metrics.NewRegisteredMeter("mailserver/syncRequests", nil)
deliverMailTimer = metrics.NewRegisteredTimer("mailserver/deliverMailTime", nil)
) )