Default limit & upper bound (#1411)
We allow the user not to specify an upper bound, in which case it will default to Time.Now() + 60 seconds, to accomodate for messages with higher timestamp. Limit is also defaulted to 2000 if not set. We removed as well the 24 hours limitation as now all the requests will come with a limit.
This commit is contained in:
parent
5bce7e793f
commit
b574e3e53f
|
@ -40,7 +40,10 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
maxQueryRange = 24 * time.Hour
|
maxQueryRange = 24 * time.Hour
|
||||||
noLimits = 0
|
defaultLimit = 2000
|
||||||
|
// When we default the upper limit, we want to extend the range a bit
|
||||||
|
// to accommodate for envelopes with slightly higher timestamp, in seconds
|
||||||
|
whisperTTLSafeThreshold = 60
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -265,6 +268,10 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
|
||||||
lower, upper, bloom, limit, cursor, err = s.validateRequest(peer.ID(), request)
|
lower, upper, bloom, limit, cursor, err = s.validateRequest(peer.ID(), request)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if limit == 0 {
|
||||||
|
limit = defaultLimit
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
requestValidationErrorsCounter.Inc(1)
|
requestValidationErrorsCounter.Inc(1)
|
||||||
log.Error("[mailserver:DeliverMail] request failed validaton",
|
log.Error("[mailserver:DeliverMail] request failed validaton",
|
||||||
|
@ -535,7 +542,7 @@ func (s *WMailServer) processRequestInBundles(
|
||||||
lastEnvelopeHash = envelope.Hash()
|
lastEnvelopeHash = envelope.Hash()
|
||||||
processedEnvelopes++
|
processedEnvelopes++
|
||||||
envelopeSize := whisper.EnvelopeHeaderLength + uint32(len(envelope.Data))
|
envelopeSize := whisper.EnvelopeHeaderLength + uint32(len(envelope.Data))
|
||||||
limitReached := limit != noLimits && processedEnvelopes == limit
|
limitReached := processedEnvelopes == limit
|
||||||
newSize := bundleSize + envelopeSize
|
newSize := bundleSize + envelopeSize
|
||||||
|
|
||||||
// If we still have some room for messages, add and continue
|
// If we still have some room for messages, add and continue
|
||||||
|
@ -676,18 +683,15 @@ func (s *WMailServer) decodeRequest(peerID []byte, request *whisper.Envelope) (M
|
||||||
return payload, fmt.Errorf("failed to decode data: %v", err)
|
return payload, fmt.Errorf("failed to decode data: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if payload.Upper == 0 {
|
||||||
|
payload.Upper = uint32(time.Now().Unix() + whisperTTLSafeThreshold)
|
||||||
|
}
|
||||||
|
|
||||||
if payload.Upper < payload.Lower {
|
if payload.Upper < payload.Lower {
|
||||||
log.Error("Query range is invalid: lower > upper", "lower", payload.Lower, "upper", payload.Upper)
|
log.Error("Query range is invalid: lower > upper", "lower", payload.Lower, "upper", payload.Upper)
|
||||||
return payload, errors.New("query range is invalid: lower > upper")
|
return payload, errors.New("query range is invalid: lower > upper")
|
||||||
}
|
}
|
||||||
|
|
||||||
lowerTime := time.Unix(int64(payload.Lower), 0)
|
|
||||||
upperTime := time.Unix(int64(payload.Upper), 0)
|
|
||||||
if upperTime.Sub(lowerTime) > maxQueryRange {
|
|
||||||
log.Warn("Query range too long", "peerID", peerIDBytesString(peerID), "length", upperTime.Sub(lowerTime), "max", maxQueryRange)
|
|
||||||
return payload, fmt.Errorf("query range must be shorted than %d", maxQueryRange)
|
|
||||||
}
|
|
||||||
|
|
||||||
return payload, nil
|
return payload, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -467,6 +467,32 @@ func (s *MailserverSuite) TestDecodeRequest() {
|
||||||
s.Equal(payload, decodedPayload)
|
s.Equal(payload, decodedPayload)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *MailserverSuite) TestDecodeRequestNoUpper() {
|
||||||
|
s.setupServer(s.server)
|
||||||
|
defer s.server.Close()
|
||||||
|
|
||||||
|
payload := MessagesRequestPayload{
|
||||||
|
Lower: 50,
|
||||||
|
Bloom: []byte{0x01},
|
||||||
|
Limit: 10,
|
||||||
|
Cursor: []byte{},
|
||||||
|
Batch: true,
|
||||||
|
}
|
||||||
|
data, err := rlp.EncodeToBytes(payload)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
id, err := s.shh.NewKeyPair()
|
||||||
|
s.Require().NoError(err)
|
||||||
|
srcKey, err := s.shh.GetPrivateKey(id)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
env := s.createEnvelope(whisper.TopicType{0x01}, data, srcKey)
|
||||||
|
|
||||||
|
decodedPayload, err := s.server.decodeRequest(nil, env)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
s.NotEqual(0, decodedPayload.Upper)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *MailserverSuite) TestProcessRequestDeadlockHandling() {
|
func (s *MailserverSuite) TestProcessRequestDeadlockHandling() {
|
||||||
s.setupServer(s.server)
|
s.setupServer(s.server)
|
||||||
defer s.server.Close()
|
defer s.server.Close()
|
||||||
|
|
Loading…
Reference in New Issue