x-check-mailserver: fix a case when a channel has no messages (#11)
This commit is contained in:
parent
eb85bd494a
commit
3481054c74
|
@ -89,6 +89,8 @@ func (u *WorkUnit) Execute(config WorkUnitConfig, mailSignals *signalForwarder)
|
||||||
messageSubErrs = append(messageSubErrs, sub.Err())
|
messageSubErrs = append(messageSubErrs, sub.Err())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: sshext.MessagesRequest expects time.Duration but multiplies it by time.Second
|
||||||
|
reqTimeout := time.Duration(15)
|
||||||
reqID, err := u.shhextAPI.RequestMessages(nil, shhext.MessagesRequest{
|
reqID, err := u.shhextAPI.RequestMessages(nil, shhext.MessagesRequest{
|
||||||
MailServerPeer: u.MailServerEnode,
|
MailServerPeer: u.MailServerEnode,
|
||||||
SymKeyID: mailServerSymKeyID,
|
SymKeyID: mailServerSymKeyID,
|
||||||
|
@ -96,7 +98,7 @@ func (u *WorkUnit) Execute(config WorkUnitConfig, mailSignals *signalForwarder)
|
||||||
To: config.To,
|
To: config.To,
|
||||||
Limit: 1000,
|
Limit: 1000,
|
||||||
Topics: topics,
|
Topics: topics,
|
||||||
Timeout: 15,
|
Timeout: reqTimeout,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to request %s for messages: %v", u.MailServerEnode, err)
|
return fmt.Errorf("failed to request %s for messages: %v", u.MailServerEnode, err)
|
||||||
|
@ -107,9 +109,6 @@ func (u *WorkUnit) Execute(config WorkUnitConfig, mailSignals *signalForwarder)
|
||||||
signals, cancelSignalsFilter := mailSignals.Filter([]byte(reqID))
|
signals, cancelSignalsFilter := mailSignals.Filter([]byte(reqID))
|
||||||
defer cancelSignalsFilter()
|
defer cancelSignalsFilter()
|
||||||
|
|
||||||
timeout := time.NewTimer(16 * time.Second) // greater than request messages timeout
|
|
||||||
defer timeout.Stop()
|
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
var lastEnvelopeID []byte
|
var lastEnvelopeID []byte
|
||||||
|
@ -119,7 +118,7 @@ func (u *WorkUnit) Execute(config WorkUnitConfig, mailSignals *signalForwarder)
|
||||||
case m := <-messages:
|
case m := <-messages:
|
||||||
log.Debug("received a message", "hash", hex.EncodeToString(m.Hash))
|
log.Debug("received a message", "hash", hex.EncodeToString(m.Hash))
|
||||||
u.Messages = append(u.Messages, m)
|
u.Messages = append(u.Messages, m)
|
||||||
case <-timeout.C:
|
case <-time.After(time.Duration(reqTimeout) * time.Second):
|
||||||
// As we can not predict when messages finish to come in,
|
// As we can not predict when messages finish to come in,
|
||||||
// we timeout after some time.
|
// we timeout after some time.
|
||||||
// If lastEnvelopeID is found amoung received messages,
|
// If lastEnvelopeID is found amoung received messages,
|
||||||
|
@ -141,14 +140,12 @@ func (u *WorkUnit) Execute(config WorkUnitConfig, mailSignals *signalForwarder)
|
||||||
case signal.EventMailServerRequestCompleted:
|
case signal.EventMailServerRequestCompleted:
|
||||||
lastEnvelopeID = s.LastEnvelopeID
|
lastEnvelopeID = s.LastEnvelopeID
|
||||||
|
|
||||||
log.Info("received EventMailServerRequestCompleted", "latency", time.Since(start), "enode", u.MailServerEnode)
|
log.Info("received EventMailServerRequestCompleted", "latency", time.Since(start), "enode", u.MailServerEnode, "lastEnvelopeID", lastEnvelopeID)
|
||||||
|
|
||||||
// After receiving a request complete event,
|
if allZeros(lastEnvelopeID) {
|
||||||
// we reduce the timeout to 5 seconds.
|
log.Info("lastEnvelopeID is empty so return early")
|
||||||
if !timeout.Stop() {
|
return u.stopNode()
|
||||||
<-timeout.C
|
|
||||||
}
|
}
|
||||||
timeout.Reset(time.Second * 5)
|
|
||||||
case signal.EventMailServerRequestExpired:
|
case signal.EventMailServerRequestExpired:
|
||||||
return fmt.Errorf("request for messages expired")
|
return fmt.Errorf("request for messages expired")
|
||||||
}
|
}
|
||||||
|
@ -193,3 +190,13 @@ func (u *WorkUnit) addPeer() error {
|
||||||
5*time.Second,
|
5*time.Second,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func allZeros(b []byte) bool {
|
||||||
|
zero := byte(0)
|
||||||
|
for _, n := range b {
|
||||||
|
if n != zero {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue