diff --git a/mailserver/mailserver.go b/mailserver/mailserver.go index 0497fa590..d87057024 100644 --- a/mailserver/mailserver.go +++ b/mailserver/mailserver.go @@ -20,6 +20,7 @@ import ( "encoding/binary" "errors" "fmt" + "math/rand" "sync" "time" @@ -213,7 +214,8 @@ func (s *WMailServer) Archive(env *whisper.Envelope) { func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) { defer recoverLevelDBPanics("DeliverMail") - log.Info("[mailserver:DeliverMail] delivering mail", "peerID", peerIDString(peer)) + startMethod := time.Now() + defer deliverMailTimer.UpdateSince(startMethod) requestsMeter.Mark(1) @@ -222,9 +224,19 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) log.Error("[mailserver:DeliverMail] peer is nil") return } + + requestID := request.Hash().String() + peerID := peerIDString(peer) + + log.Info("[mailserver:DeliverMail] delivering mail", + "peerID", peerID, + "requestID", requestID) + if s.exceedsPeerRequests(peer.ID()) { requestErrorsCounter.Inc(1) - log.Error("[mailserver:DeliverMail] peer exceeded the limit", "peerID", peerIDString(peer)) + log.Error("[mailserver:DeliverMail] peer exceeded the limit", + "peerID", peerID, + "requestID", requestID) s.trySendHistoricMessageErrorResponse(peer, request, fmt.Errorf("rate limit exceeded")) return } @@ -246,18 +258,25 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) limit = payload.Limit batch = payload.Batch } else { - log.Debug("[mailserver:DeliverMail] failed to decode request", "err", err, "peerID", peerIDString(peer)) + log.Debug("[mailserver:DeliverMail] failed to decode request", + "err", err, + "peerID", peerID, + "requestID", requestID) lower, upper, bloom, limit, cursor, err = s.validateRequest(peer.ID(), request) } if err != nil { requestValidationErrorsCounter.Inc(1) - log.Error("[mailserver:DeliverMail] request failed validaton", "peerID", peerIDString(peer)) + log.Error("[mailserver:DeliverMail] request failed validaton", + "peerID", peerID, + "requestID", requestID) s.trySendHistoricMessageErrorResponse(peer, request, err) return } - log.Debug("[mailserver:DeliverMail] processing request", + log.Info("[mailserver:DeliverMail] processing request", + "peerID", peerID, + "requestID", requestID, "lower", lower, "upper", upper, "bloom", bloom, @@ -288,7 +307,10 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) counter++ } close(errCh) - log.Info("[mailserver:DeliverMail] finished sending bundles", "counter", counter) + log.Info("[mailserver:DeliverMail] finished sending bundles", + "peerID", peerID, + "requestID", requestID, + "counter", counter) }() start := time.Now() @@ -297,6 +319,7 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) bloom, int(limit), processRequestTimeout, + requestID, bundles, cancelProcessing, ) @@ -305,7 +328,10 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) // Wait for the goroutine to finish the work. It may return an error. if err := <-errCh; err != nil { processRequestErrorsCounter.Inc(1) - log.Error("[mailserver:DeliverMail] error while processing", "err", err, "peerID", peerIDString(peer)) + log.Error("[mailserver:DeliverMail] error while processing", + "err", err, + "peerID", peerID, + "requestID", requestID) s.trySendHistoricMessageErrorResponse(peer, request, err) return } @@ -313,16 +339,26 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) // Processing of the request could be finished earlier due to iterator error. if err := iter.Error(); err != nil { processRequestErrorsCounter.Inc(1) - log.Error("[mailserver:DeliverMail] iterator failed", "err", err, "peerID", peerIDString(peer)) + log.Error("[mailserver:DeliverMail] iterator failed", + "err", err, + "peerID", peerID, + "requestID", requestID) s.trySendHistoricMessageErrorResponse(peer, request, err) return } - log.Debug("[mailserver:DeliverMail] sending historic message response", "last", lastEnvelopeHash, "next", nextPageCursor) + log.Info("[mailserver:DeliverMail] sending historic message response", + "peerID", peerID, + "requestID", requestID, + "last", lastEnvelopeHash, + "next", nextPageCursor) if err := s.sendHistoricMessageResponse(peer, request, lastEnvelopeHash, nextPageCursor); err != nil { historicResponseErrorsCounter.Inc(1) - log.Error("[mailserver:DeliverMail] error sending historic message response", "err", err, "peerID", peerIDString(peer)) + log.Error("[mailserver:DeliverMail] error sending historic message response", + "err", err, + "peerID", peerID, + "requestID", requestID) // we still want to try to report error even it it is a p2p error and it is unlikely s.trySendHistoricMessageErrorResponse(peer, request, err) } @@ -334,6 +370,8 @@ func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailReque defer recoverLevelDBPanics("SyncMail") + requestID := fmt.Sprintf("%d-%d", time.Now().UnixNano(), rand.Intn(1000)) + syncRequestsMeter.Mark(1) // Check rate limiting for a requesting peer. @@ -372,6 +410,7 @@ func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailReque request.Bloom, int(request.Limit), processRequestTimeout, + requestID, bundles, cancelProcessing, ) @@ -454,6 +493,7 @@ func (s *WMailServer) processRequestInBundles( bloom []byte, limit int, timeout time.Duration, + requestID string, output chan<- []*whisper.Envelope, cancel <-chan struct{}, ) ([]byte, common.Hash) { @@ -467,13 +507,9 @@ func (s *WMailServer) processRequestInBundles( lastEnvelopeHash common.Hash ) - requestID := fmt.Sprintf("%d", time.Now().UnixNano()) - log.Info("[mailserver:processRequestInBundles] processing request", - "requestID", - requestID, - "limit", - limit) + "requestID", requestID, + "limit", limit) // We iterate over the envelopes. // We collect envelopes in batches. @@ -486,7 +522,9 @@ func (s *WMailServer) processRequestInBundles( decodeErr := rlp.DecodeBytes(iter.Value(), &envelope) if decodeErr != nil { - log.Error("failed to decode RLP", "err", decodeErr) + log.Error("[mailserver:processRequestInBundles] failed to decode RLP", + "err", decodeErr, + "requestID", requestID) continue } @@ -531,17 +569,11 @@ func (s *WMailServer) processRequestInBundles( } log.Info("[mailserver:processRequestInBundles] publishing envelopes", - "requestID", - requestID, - "batchesCount", - len(batches), - "envelopeCount", - processedEnvelopes, - "processedEnvelopesSize", - processedEnvelopesSize, - "cursor", - nextCursor, - ) + "requestID", requestID, + "batchesCount", len(batches), + "envelopeCount", processedEnvelopes, + "processedEnvelopesSize", processedEnvelopesSize, + "cursor", nextCursor) // Publish for _, batch := range batches { diff --git a/mailserver/mailserver_test.go b/mailserver/mailserver_test.go index fee5e4a11..1eb0520aa 100644 --- a/mailserver/mailserver_test.go +++ b/mailserver/mailserver_test.go @@ -512,7 +512,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() { processFinished := make(chan struct{}) go func() { - s.server.processRequestInBundles(iter, bloom, int(limit), timeout, bundles, done) + s.server.processRequestInBundles(iter, bloom, int(limit), timeout, "req-01", bundles, done) close(processFinished) }() go close(done) @@ -536,7 +536,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() { processFinished := make(chan struct{}) go func() { - s.server.processRequestInBundles(iter, bloom, int(limit), time.Second, bundles, done) + s.server.processRequestInBundles(iter, bloom, int(limit), time.Second, "req-01", bundles, done) close(processFinished) }() @@ -765,7 +765,7 @@ func processRequestAndCollectHashes( close(done) }() - cursor, lastHash := server.processRequestInBundles(iter, bloom, limit, time.Minute, bundles, done) + cursor, lastHash := server.processRequestInBundles(iter, bloom, limit, time.Minute, "req-01", bundles, done) <-done diff --git a/mailserver/metrics.go b/mailserver/metrics.go index 49b2b8a0f..1b3b6aaa9 100644 --- a/mailserver/metrics.go +++ b/mailserver/metrics.go @@ -19,4 +19,5 @@ var ( processRequestErrorsCounter = metrics.NewRegisteredCounter("mailserver/processRequestErrors", nil) historicResponseErrorsCounter = metrics.NewRegisteredCounter("mailserver/historicResponseErrors", nil) syncRequestsMeter = metrics.NewRegisteredMeter("mailserver/syncRequests", nil) + deliverMailTimer = metrics.NewRegisteredTimer("mailserver/deliverMailTime", nil) )