Change publishing envelopes behaviour (#1371)

This commit is contained in:
Andrea Maria Piana 2019-02-08 16:39:24 +01:00 committed by GitHub
parent 4f3f5ee574
commit da3964ba31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 37 deletions

View File

@ -294,9 +294,6 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
) )
requestProcessTimer.UpdateSince(start) requestProcessTimer.UpdateSince(start)
// bundles channel can be closed now because the processing finished.
close(bundles)
// Wait for the goroutine to finish the work. It may return an error. // Wait for the goroutine to finish the work. It may return an error.
if err := <-errCh; err != nil { if err := <-errCh; err != nil {
processRequestErrorsCounter.Inc(1) processRequestErrorsCounter.Inc(1)
@ -368,9 +365,6 @@ func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailReque
) )
requestProcessTimer.UpdateSince(start) requestProcessTimer.UpdateSince(start)
// bundles channel can be closed now because the processing finished.
close(bundles)
// Wait for the goroutine to finish the work. It may return an error. // Wait for the goroutine to finish the work. It may return an error.
if err := <-errCh; err != nil { if err := <-errCh; err != nil {
_ = s.w.SendSyncResponse( _ = s.w.SendSyncResponse(
@ -444,17 +438,35 @@ func (s *WMailServer) createIterator(lower, upper uint32, cursor []byte) iterato
// processRequestInBundles processes envelopes using an iterator and passes them // processRequestInBundles processes envelopes using an iterator and passes them
// to the output channel in bundles. // to the output channel in bundles.
func (s *WMailServer) processRequestInBundles( func (s *WMailServer) processRequestInBundles(
iter iterator.Iterator, bloom []byte, limit int, output chan<- []*whisper.Envelope, iter iterator.Iterator,
bloom []byte,
limit int,
output chan<- []*whisper.Envelope,
) ([]byte, common.Hash) { ) ([]byte, common.Hash) {
var ( var (
bundle []*whisper.Envelope bundle []*whisper.Envelope
bundleSize uint32 bundleSize uint32
batches [][]*whisper.Envelope
processedEnvelopes int processedEnvelopes int
processedEnvelopesSize int64 processedEnvelopesSize int64
nextCursor []byte nextCursor []byte
lastEnvelopeHash common.Hash lastEnvelopeHash common.Hash
) )
requestID := fmt.Sprintf("%d", time.Now().UnixNano())
log.Debug("[mailserver:processRequestInBundles] processing request",
"requestID",
requestID,
"limit",
limit)
// We iterate over the envelopes.
// We collect envelopes in batches.
// If there still room and we haven't reached the limit
// append and continue.
// Otherwise publish what you have so far, reset the bundle to the
// current envelope, and leave if we hit the limit
for iter.Prev() { for iter.Prev() {
var envelope whisper.Envelope var envelope whisper.Envelope
@ -468,50 +480,68 @@ func (s *WMailServer) processRequestInBundles(
continue continue
} }
newSize := bundleSize + whisper.EnvelopeHeaderLength + uint32(len(envelope.Data)) lastEnvelopeHash = envelope.Hash()
limitReached := limit != noLimits && (processedEnvelopes+len(bundle)) == limit processedEnvelopes++
envelopeSize := whisper.EnvelopeHeaderLength + uint32(len(envelope.Data))
limitReached := limit != noLimits && processedEnvelopes == limit
newSize := bundleSize + envelopeSize
// If we still have some room for messages, add and continue
if !limitReached && newSize < s.w.MaxMessageSize() { if !limitReached && newSize < s.w.MaxMessageSize() {
bundle = append(bundle, &envelope) bundle = append(bundle, &envelope)
bundleSize = newSize bundleSize = newSize
lastEnvelopeHash = envelope.Hash()
continue continue
} }
output <- bundle // Publish if anything is in the bundle (there should always be
bundle = bundle[:0] // something unless limit = 1)
bundleSize = 0 if len(bundle) != 0 {
batches = append(batches, bundle)
processedEnvelopes += len(bundle) processedEnvelopesSize += int64(bundleSize)
processedEnvelopesSize += int64(bundleSize)
if limitReached {
// When the limit is reached, the current retrieved envelope
// is not included in the response.
// The nextCursor is a key used as a limit in a range and
// is not included in the range, hence, we need to get
// the previous iterator key.
// Because we iterate backwards, we use Next().
iter.Next()
nextCursor = iter.Key()
break
} else {
// Reset bundle information and add the last read envelope
// which did not make in the last batch.
bundle = []*whisper.Envelope{&envelope}
bundleSize = whisper.EnvelopeHeaderLength + uint32(len(envelope.Data))
} }
lastEnvelopeHash = envelope.Hash() // Reset the bundle with the current envelope
bundle = []*whisper.Envelope{&envelope}
bundleSize = envelopeSize
// Leave if we reached the limit
if limitReached {
nextCursor = iter.Key()
break
}
} }
// There might be some outstanding elements in the bundle. if len(bundle) > 0 {
if len(bundle) > 0 && bundleSize > 0 { batches = append(batches, bundle)
output <- bundle processedEnvelopesSize += int64(bundleSize)
}
log.Debug("[mailserver:processRequestInBundles] publishing envelopes",
"requestID",
requestID,
"batchesCount",
len(batches),
"envelopeCount",
processedEnvelopes,
"processedEnvelopesSize",
processedEnvelopesSize,
"cursor",
nextCursor,
)
// Publish
for _, batch := range batches {
output <- batch
} }
sentEnvelopesMeter.Mark(int64(processedEnvelopes)) sentEnvelopesMeter.Mark(int64(processedEnvelopes))
sentEnvelopesSizeMeter.Mark(processedEnvelopesSize) sentEnvelopesSizeMeter.Mark(processedEnvelopesSize)
log.Debug("[mailserver:processRequestInBundles] envelopes published",
"requestID",
requestID)
close(output)
return nextCursor, lastEnvelopeHash return nextCursor, lastEnvelopeHash
} }

View File

@ -652,7 +652,6 @@ func processRequestAndCollectHashes(
}() }()
cursor, lastHash := server.processRequestInBundles(iter, bloom, limit, bundles) cursor, lastHash := server.processRequestInBundles(iter, bloom, limit, bundles)
close(bundles)
<-done <-done