diff --git a/mailserver/mailserver.go b/mailserver/mailserver.go index 092148b49..db988b064 100644 --- a/mailserver/mailserver.go +++ b/mailserver/mailserver.go @@ -294,9 +294,6 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) ) requestProcessTimer.UpdateSince(start) - // bundles channel can be closed now because the processing finished. - close(bundles) - // Wait for the goroutine to finish the work. It may return an error. if err := <-errCh; err != nil { processRequestErrorsCounter.Inc(1) @@ -368,9 +365,6 @@ func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailReque ) requestProcessTimer.UpdateSince(start) - // bundles channel can be closed now because the processing finished. - close(bundles) - // Wait for the goroutine to finish the work. It may return an error. if err := <-errCh; err != nil { _ = s.w.SendSyncResponse( @@ -444,17 +438,35 @@ func (s *WMailServer) createIterator(lower, upper uint32, cursor []byte) iterato // processRequestInBundles processes envelopes using an iterator and passes them // to the output channel in bundles. func (s *WMailServer) processRequestInBundles( - iter iterator.Iterator, bloom []byte, limit int, output chan<- []*whisper.Envelope, + iter iterator.Iterator, + bloom []byte, + limit int, + output chan<- []*whisper.Envelope, ) ([]byte, common.Hash) { var ( bundle []*whisper.Envelope bundleSize uint32 + batches [][]*whisper.Envelope processedEnvelopes int processedEnvelopesSize int64 nextCursor []byte lastEnvelopeHash common.Hash ) + requestID := fmt.Sprintf("%d", time.Now().UnixNano()) + + log.Debug("[mailserver:processRequestInBundles] processing request", + "requestID", + requestID, + "limit", + limit) + + // We iterate over the envelopes. + // We collect envelopes in batches. + // If there still room and we haven't reached the limit + // append and continue. + // Otherwise publish what you have so far, reset the bundle to the + // current envelope, and leave if we hit the limit for iter.Prev() { var envelope whisper.Envelope @@ -468,50 +480,68 @@ func (s *WMailServer) processRequestInBundles( continue } - newSize := bundleSize + whisper.EnvelopeHeaderLength + uint32(len(envelope.Data)) - limitReached := limit != noLimits && (processedEnvelopes+len(bundle)) == limit + lastEnvelopeHash = envelope.Hash() + processedEnvelopes++ + envelopeSize := whisper.EnvelopeHeaderLength + uint32(len(envelope.Data)) + limitReached := limit != noLimits && processedEnvelopes == limit + newSize := bundleSize + envelopeSize + + // If we still have some room for messages, add and continue if !limitReached && newSize < s.w.MaxMessageSize() { bundle = append(bundle, &envelope) bundleSize = newSize - lastEnvelopeHash = envelope.Hash() continue } - output <- bundle - bundle = bundle[:0] - bundleSize = 0 - - processedEnvelopes += len(bundle) - processedEnvelopesSize += int64(bundleSize) - - if limitReached { - // When the limit is reached, the current retrieved envelope - // is not included in the response. - // The nextCursor is a key used as a limit in a range and - // is not included in the range, hence, we need to get - // the previous iterator key. - // Because we iterate backwards, we use Next(). - iter.Next() - nextCursor = iter.Key() - break - } else { - // Reset bundle information and add the last read envelope - // which did not make in the last batch. - bundle = []*whisper.Envelope{&envelope} - bundleSize = whisper.EnvelopeHeaderLength + uint32(len(envelope.Data)) + // Publish if anything is in the bundle (there should always be + // something unless limit = 1) + if len(bundle) != 0 { + batches = append(batches, bundle) + processedEnvelopesSize += int64(bundleSize) } - lastEnvelopeHash = envelope.Hash() + // Reset the bundle with the current envelope + bundle = []*whisper.Envelope{&envelope} + bundleSize = envelopeSize + + // Leave if we reached the limit + if limitReached { + nextCursor = iter.Key() + break + } } - // There might be some outstanding elements in the bundle. - if len(bundle) > 0 && bundleSize > 0 { - output <- bundle + if len(bundle) > 0 { + batches = append(batches, bundle) + processedEnvelopesSize += int64(bundleSize) + } + + log.Debug("[mailserver:processRequestInBundles] publishing envelopes", + "requestID", + requestID, + "batchesCount", + len(batches), + "envelopeCount", + processedEnvelopes, + "processedEnvelopesSize", + processedEnvelopesSize, + "cursor", + nextCursor, + ) + + // Publish + for _, batch := range batches { + output <- batch } sentEnvelopesMeter.Mark(int64(processedEnvelopes)) sentEnvelopesSizeMeter.Mark(processedEnvelopesSize) + log.Debug("[mailserver:processRequestInBundles] envelopes published", + "requestID", + requestID) + close(output) + return nextCursor, lastEnvelopeHash } diff --git a/mailserver/mailserver_test.go b/mailserver/mailserver_test.go index f90e83a9d..b9bb90297 100644 --- a/mailserver/mailserver_test.go +++ b/mailserver/mailserver_test.go @@ -652,7 +652,6 @@ func processRequestAndCollectHashes( }() cursor, lastHash := server.processRequestInBundles(iter, bloom, limit, bundles) - close(bundles) <-done