Fix deadlock in mailserver DeliverMail method (#1388)

This commit is contained in:
Adam Babik 2019-02-22 09:55:37 +01:00 committed by GitHub
parent f2a400dc44
commit 38aafe01db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 167 additions and 26 deletions

View File

@ -51,6 +51,7 @@ const (
timestampLength = 4 timestampLength = 4
requestLimitLength = 4 requestLimitLength = 4
requestTimeRangeLength = timestampLength * 2 requestTimeRangeLength = timestampLength * 2
processRequestTimeout = time.Minute
) )
// dbImpl is an interface introduced to be able to test some unexpected // dbImpl is an interface introduced to be able to test some unexpected
@ -212,18 +213,18 @@ func (s *WMailServer) Archive(env *whisper.Envelope) {
func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) { func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) {
defer recoverLevelDBPanics("DeliverMail") defer recoverLevelDBPanics("DeliverMail")
log.Info("Delivering mail", "peerID", peerIDString(peer)) log.Info("[mailserver:DeliverMail] delivering mail", "peerID", peerIDString(peer))
requestsMeter.Mark(1) requestsMeter.Mark(1)
if peer == nil { if peer == nil {
requestErrorsCounter.Inc(1) requestErrorsCounter.Inc(1)
log.Error("Whisper peer is nil") log.Error("[mailserver:DeliverMail] peer is nil")
return return
} }
if s.exceedsPeerRequests(peer.ID()) { if s.exceedsPeerRequests(peer.ID()) {
requestErrorsCounter.Inc(1) requestErrorsCounter.Inc(1)
log.Error("Peer exceeded request per seconds limit", "peerID", peerIDString(peer)) log.Error("[mailserver:DeliverMail] peer exceeded the limit", "peerID", peerIDString(peer))
s.trySendHistoricMessageErrorResponse(peer, request, fmt.Errorf("rate limit exceeded")) s.trySendHistoricMessageErrorResponse(peer, request, fmt.Errorf("rate limit exceeded"))
return return
} }
@ -245,18 +246,18 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
limit = payload.Limit limit = payload.Limit
batch = payload.Batch batch = payload.Batch
} else { } else {
log.Debug("Failed to decode request", "err", err, "peerID", peerIDString(peer)) log.Debug("[mailserver:DeliverMail] failed to decode request", "err", err, "peerID", peerIDString(peer))
lower, upper, bloom, limit, cursor, err = s.validateRequest(peer.ID(), request) lower, upper, bloom, limit, cursor, err = s.validateRequest(peer.ID(), request)
} }
if err != nil { if err != nil {
requestValidationErrorsCounter.Inc(1) requestValidationErrorsCounter.Inc(1)
log.Error("Mailserver request failed validaton", "peerID", peerIDString(peer)) log.Error("[mailserver:DeliverMail] request failed validaton", "peerID", peerIDString(peer))
s.trySendHistoricMessageErrorResponse(peer, request, err) s.trySendHistoricMessageErrorResponse(peer, request, err)
return return
} }
log.Debug("Processing request", log.Debug("[mailserver:DeliverMail] processing request",
"lower", lower, "lower", lower,
"upper", upper, "upper", upper,
"bloom", bloom, "bloom", bloom,
@ -274,15 +275,20 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
bundles := make(chan []*whisper.Envelope, 5) bundles := make(chan []*whisper.Envelope, 5)
errCh := make(chan error) errCh := make(chan error)
cancelProcessing := make(chan struct{})
go func() { go func() {
counter := 0
for bundle := range bundles { for bundle := range bundles {
if err := s.sendEnvelopes(peer, bundle, batch); err != nil { if err := s.sendEnvelopes(peer, bundle, batch); err != nil {
close(cancelProcessing)
errCh <- err errCh <- err
break break
} }
counter++
} }
close(errCh) close(errCh)
log.Info("[mailserver:DeliverMail] finished sending bundles", "counter", counter)
}() }()
start := time.Now() start := time.Now()
@ -290,14 +296,16 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
iter, iter,
bloom, bloom,
int(limit), int(limit),
processRequestTimeout,
bundles, bundles,
cancelProcessing,
) )
requestProcessTimer.UpdateSince(start) requestProcessTimer.UpdateSince(start)
// Wait for the goroutine to finish the work. It may return an error. // Wait for the goroutine to finish the work. It may return an error.
if err := <-errCh; err != nil { if err := <-errCh; err != nil {
processRequestErrorsCounter.Inc(1) processRequestErrorsCounter.Inc(1)
log.Error("Error while processing mail server request", "err", err, "peerID", peerIDString(peer)) log.Error("[mailserver:DeliverMail] error while processing", "err", err, "peerID", peerIDString(peer))
s.trySendHistoricMessageErrorResponse(peer, request, err) s.trySendHistoricMessageErrorResponse(peer, request, err)
return return
} }
@ -305,16 +313,16 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
// Processing of the request could be finished earlier due to iterator error. // Processing of the request could be finished earlier due to iterator error.
if err := iter.Error(); err != nil { if err := iter.Error(); err != nil {
processRequestErrorsCounter.Inc(1) processRequestErrorsCounter.Inc(1)
log.Error("Error while processing mail server request", "err", err, "peerID", peerIDString(peer)) log.Error("[mailserver:DeliverMail] iterator failed", "err", err, "peerID", peerIDString(peer))
s.trySendHistoricMessageErrorResponse(peer, request, err) s.trySendHistoricMessageErrorResponse(peer, request, err)
return return
} }
log.Debug("Sending historic message response", "last", lastEnvelopeHash, "next", nextPageCursor) log.Debug("[mailserver:DeliverMail] sending historic message response", "last", lastEnvelopeHash, "next", nextPageCursor)
if err := s.sendHistoricMessageResponse(peer, request, lastEnvelopeHash, nextPageCursor); err != nil { if err := s.sendHistoricMessageResponse(peer, request, lastEnvelopeHash, nextPageCursor); err != nil {
historicResponseErrorsCounter.Inc(1) historicResponseErrorsCounter.Inc(1)
log.Error("Error sending historic message response", "err", err, "peerID", peerIDString(peer)) log.Error("[mailserver:DeliverMail] error sending historic message response", "err", err, "peerID", peerIDString(peer))
// we still want to try to report error even it it is a p2p error and it is unlikely // we still want to try to report error even it it is a p2p error and it is unlikely
s.trySendHistoricMessageErrorResponse(peer, request, err) s.trySendHistoricMessageErrorResponse(peer, request, err)
} }
@ -344,11 +352,13 @@ func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailReque
bundles := make(chan []*whisper.Envelope, 5) bundles := make(chan []*whisper.Envelope, 5)
errCh := make(chan error) errCh := make(chan error)
cancelProcessing := make(chan struct{})
go func() { go func() {
for bundle := range bundles { for bundle := range bundles {
resp := whisper.SyncResponse{Envelopes: bundle} resp := whisper.SyncResponse{Envelopes: bundle}
if err := s.w.SendSyncResponse(peer, resp); err != nil { if err := s.w.SendSyncResponse(peer, resp); err != nil {
close(cancelProcessing)
errCh <- fmt.Errorf("failed to send sync response: %v", err) errCh <- fmt.Errorf("failed to send sync response: %v", err)
break break
} }
@ -361,7 +371,9 @@ func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailReque
iter, iter,
request.Bloom, request.Bloom,
int(request.Limit), int(request.Limit),
processRequestTimeout,
bundles, bundles,
cancelProcessing,
) )
requestProcessTimer.UpdateSince(start) requestProcessTimer.UpdateSince(start)
@ -441,7 +453,9 @@ func (s *WMailServer) processRequestInBundles(
iter iterator.Iterator, iter iterator.Iterator,
bloom []byte, bloom []byte,
limit int, limit int,
timeout time.Duration,
output chan<- []*whisper.Envelope, output chan<- []*whisper.Envelope,
cancel <-chan struct{},
) ([]byte, common.Hash) { ) ([]byte, common.Hash) {
var ( var (
bundle []*whisper.Envelope bundle []*whisper.Envelope
@ -455,7 +469,7 @@ func (s *WMailServer) processRequestInBundles(
requestID := fmt.Sprintf("%d", time.Now().UnixNano()) requestID := fmt.Sprintf("%d", time.Now().UnixNano())
log.Debug("[mailserver:processRequestInBundles] processing request", log.Info("[mailserver:processRequestInBundles] processing request",
"requestID", "requestID",
requestID, requestID,
"limit", "limit",
@ -516,7 +530,7 @@ func (s *WMailServer) processRequestInBundles(
processedEnvelopesSize += int64(bundleSize) processedEnvelopesSize += int64(bundleSize)
} }
log.Debug("[mailserver:processRequestInBundles] publishing envelopes", log.Info("[mailserver:processRequestInBundles] publishing envelopes",
"requestID", "requestID",
requestID, requestID,
"batchesCount", "batchesCount",
@ -531,15 +545,28 @@ func (s *WMailServer) processRequestInBundles(
// Publish // Publish
for _, batch := range batches { for _, batch := range batches {
output <- batch select {
case output <- batch:
// It might happen that during producing the batches,
// the connection with the peer goes down and
// the consumer of `output` channel exits prematurely.
// In such a case, we should stop pushing batches and exit.
case <-cancel:
log.Info("[mailserver:processRequestInBundles] failed to push all batches",
"requestID", requestID)
break
case <-time.After(timeout):
log.Error("[mailserver:processRequestInBundles] timed out pushing a batch",
"requestID", requestID)
break
}
} }
sentEnvelopesMeter.Mark(int64(processedEnvelopes)) sentEnvelopesMeter.Mark(int64(processedEnvelopes))
sentEnvelopesSizeMeter.Mark(processedEnvelopesSize) sentEnvelopesSizeMeter.Mark(processedEnvelopesSize)
log.Debug("[mailserver:processRequestInBundles] envelopes published", log.Info("[mailserver:processRequestInBundles] envelopes published",
"requestID", "requestID", requestID)
requestID)
close(output) close(output)
return nextCursor, lastEnvelopeHash return nextCursor, lastEnvelopeHash

View File

@ -28,6 +28,7 @@ import (
"time" "time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
@ -318,16 +319,13 @@ func (s *MailserverSuite) TestRequestPaginationLimit() {
reverseSentHashes = append([]common.Hash{env.Hash()}, reverseSentHashes...) reverseSentHashes = append([]common.Hash{env.Hash()}, reverseSentHashes...)
} }
params := s.defaultServerParams(sentEnvelopes[0]) reqLimit := uint32(6)
params.low = uint32(now.Add(time.Duration(-count) * time.Second).Unix()) peerID, request, err := s.prepareRequest(sentEnvelopes, reqLimit)
params.upp = uint32(now.Unix()) s.NoError(err)
params.limit = 6 lower, upper, bloom, limit, cursor, err := s.server.validateRequest(peerID, request)
request := s.createRequest(params) s.NoError(err)
src := crypto.FromECDSAPub(&params.key.PublicKey)
lower, upper, bloom, limit, cursor, err := s.server.validateRequest(src, request)
s.True(err == nil)
s.Nil(cursor) s.Nil(cursor)
s.Equal(params.limit, limit) s.Equal(reqLimit, limit)
receivedHashes, cursor, _ = processRequestAndCollectHashes( receivedHashes, cursor, _ = processRequestAndCollectHashes(
s.server, lower, upper, cursor, bloom, int(limit), s.server, lower, upper, cursor, bloom, int(limit),
@ -469,6 +467,102 @@ func (s *MailserverSuite) TestDecodeRequest() {
s.Equal(payload, decodedPayload) s.Equal(payload, decodedPayload)
} }
func (s *MailserverSuite) TestProcessRequestDeadlockHandling() {
s.setupServer(s.server)
defer s.server.Close()
var archievedEnvelopes []*whisper.Envelope
now := time.Now()
count := uint32(10)
// Archieve some envelopes.
for i := count; i > 0; i-- {
sentTime := now.Add(time.Duration(-i) * time.Second)
env, err := generateEnvelope(sentTime)
s.NoError(err)
s.server.Archive(env)
archievedEnvelopes = append(archievedEnvelopes, env)
}
// Prepare a request.
peerID, request, err := s.prepareRequest(archievedEnvelopes, 5)
s.NoError(err)
lower, upper, bloom, limit, cursor, err := s.server.validateRequest(peerID, request)
s.NoError(err)
testCases := []struct {
Name string
Timeout time.Duration
Verify func(
iterator.Iterator,
time.Duration, // processRequestInBundles timeout
chan []*whisper.Envelope,
)
}{
{
Name: "finish processing using `done` channel",
Timeout: time.Second * 5,
Verify: func(
iter iterator.Iterator,
timeout time.Duration,
bundles chan []*whisper.Envelope,
) {
done := make(chan struct{})
processFinished := make(chan struct{})
go func() {
s.server.processRequestInBundles(iter, bloom, int(limit), timeout, bundles, done)
close(processFinished)
}()
go close(done)
select {
case <-processFinished:
case <-time.After(time.Second):
s.FailNow("waiting for processing finish timed out")
}
},
},
{
Name: "finish processing due to timeout",
Timeout: time.Second,
Verify: func(
iter iterator.Iterator,
timeout time.Duration,
bundles chan []*whisper.Envelope,
) {
done := make(chan struct{}) // won't be closed because we test timeout of `processRequestInBundles()`
processFinished := make(chan struct{})
go func() {
s.server.processRequestInBundles(iter, bloom, int(limit), time.Second, bundles, done)
close(processFinished)
}()
select {
case <-processFinished:
case <-time.After(time.Second * 5):
s.FailNow("waiting for processing finish timed out")
}
},
},
}
for _, tc := range testCases {
s.T().Run(tc.Name, func(t *testing.T) {
iter := s.server.createIterator(lower, upper, cursor)
defer iter.Release()
// Nothing reads from this unbuffered channel which simulates a situation
// when a connection between a peer and mail server was dropped.
bundles := make(chan []*whisper.Envelope)
tc.Verify(iter, tc.Timeout, bundles)
})
}
}
func (s *MailserverSuite) messageExists(envelope *whisper.Envelope, low, upp uint32, bloom []byte, limit uint32) bool { func (s *MailserverSuite) messageExists(envelope *whisper.Envelope, low, upp uint32, bloom []byte, limit uint32) bool {
receivedHashes, _, _ := processRequestAndCollectHashes( receivedHashes, _, _ := processRequestAndCollectHashes(
s.server, low, upp, nil, bloom, int(limit), s.server, low, upp, nil, bloom, int(limit),
@ -538,6 +632,26 @@ func (s *MailserverSuite) setupServer(server *WMailServer) {
} }
} }
func (s *MailserverSuite) prepareRequest(envelopes []*whisper.Envelope, limit uint32) (
[]byte, *whisper.Envelope, error,
) {
if len(envelopes) == 0 {
return nil, nil, errors.New("envelopes is empty")
}
now := time.Now()
params := s.defaultServerParams(envelopes[0])
params.low = uint32(now.Add(time.Duration(-len(envelopes)) * time.Second).Unix())
params.upp = uint32(now.Unix())
params.limit = limit
request := s.createRequest(params)
peerID := crypto.FromECDSAPub(&params.key.PublicKey)
return peerID, request, nil
}
func (s *MailserverSuite) defaultServerParams(env *whisper.Envelope) *ServerTestParams { func (s *MailserverSuite) defaultServerParams(env *whisper.Envelope) *ServerTestParams {
id, err := s.shh.NewKeyPair() id, err := s.shh.NewKeyPair()
if err != nil { if err != nil {
@ -651,7 +765,7 @@ func processRequestAndCollectHashes(
close(done) close(done)
}() }()
cursor, lastHash := server.processRequestInBundles(iter, bloom, limit, bundles) cursor, lastHash := server.processRequestInBundles(iter, bloom, limit, time.Minute, bundles, done)
<-done <-done