improve timeouts for x-check-mailserver
This commit is contained in:
parent
b0cbb6effc
commit
3d890ae63a
6
Makefile
6
Makefile
|
@ -14,11 +14,11 @@ image:
|
||||||
|
|
||||||
build: bin/pubchats bin/bench-mailserver bin/x-check-mailserver
|
build: bin/pubchats bin/bench-mailserver bin/x-check-mailserver
|
||||||
|
|
||||||
bin/pubchats:
|
pubchats:
|
||||||
go build -o ./bin/pubchats ./cmd/pubchats
|
go build -o ./bin/pubchats ./cmd/pubchats
|
||||||
bin/bench-mailserver:
|
bench-mailserver:
|
||||||
go build -o ./bin/bench-mailserver ./cmd/bench-mailserver
|
go build -o ./bin/bench-mailserver ./cmd/bench-mailserver
|
||||||
bin/x-check-mailserver:
|
x-check-mailserver:
|
||||||
go build -o ./bin/x-check-mailserver ./cmd/x-check-mailserver
|
go build -o ./bin/x-check-mailserver ./cmd/x-check-mailserver
|
||||||
|
|
||||||
clean:
|
clean:
|
||||||
|
|
|
@ -93,7 +93,7 @@ func main() {
|
||||||
work := NewWorkUnit(enode, &nodeConfig)
|
work := NewWorkUnit(enode, &nodeConfig)
|
||||||
go func(work *WorkUnit) {
|
go func(work *WorkUnit) {
|
||||||
if err := work.Execute(workConfig, mailSignalsForwarder); err != nil {
|
if err := work.Execute(workConfig, mailSignalsForwarder); err != nil {
|
||||||
log.Crit("failed to execute work", "err", err)
|
log.Crit("failed to execute work", "err", err, "enode", work.MailServerEnode)
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}(work)
|
}(work)
|
||||||
|
|
|
@ -96,7 +96,7 @@ func (u *WorkUnit) Execute(config WorkUnitConfig, mailSignals *signalForwarder)
|
||||||
To: config.To,
|
To: config.To,
|
||||||
Limit: 1000,
|
Limit: 1000,
|
||||||
Topics: topics,
|
Topics: topics,
|
||||||
Timeout: 30,
|
Timeout: 15,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to request %s for messages: %v", u.MailServerEnode, err)
|
return fmt.Errorf("failed to request %s for messages: %v", u.MailServerEnode, err)
|
||||||
|
@ -107,6 +107,11 @@ func (u *WorkUnit) Execute(config WorkUnitConfig, mailSignals *signalForwarder)
|
||||||
signals, cancelSignalsFilter := mailSignals.Filter([]byte(reqID))
|
signals, cancelSignalsFilter := mailSignals.Filter([]byte(reqID))
|
||||||
defer cancelSignalsFilter()
|
defer cancelSignalsFilter()
|
||||||
|
|
||||||
|
timeout := time.NewTimer(16 * time.Second) // greater than request messages timeout
|
||||||
|
defer timeout.Stop()
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
var lastEnvelopeID []byte
|
var lastEnvelopeID []byte
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
@ -114,14 +119,14 @@ func (u *WorkUnit) Execute(config WorkUnitConfig, mailSignals *signalForwarder)
|
||||||
case m := <-messages:
|
case m := <-messages:
|
||||||
log.Debug("received a message", "hash", hex.EncodeToString(m.Hash))
|
log.Debug("received a message", "hash", hex.EncodeToString(m.Hash))
|
||||||
u.Messages = append(u.Messages, m)
|
u.Messages = append(u.Messages, m)
|
||||||
case <-time.After(time.Second * 5):
|
case <-timeout.C:
|
||||||
// As we can not predict when messages finish to come in,
|
// As we can not predict when messages finish to come in,
|
||||||
// we timeout after 5 seconds of silence.
|
// we timeout after some time.
|
||||||
// If lastEnvelopeID is found amoung received messages,
|
// If lastEnvelopeID is found amoung received messages,
|
||||||
// it's a successful request. Otherwise, an error is returned.
|
// it's a successful request. Otherwise, an error is returned.
|
||||||
for i, m := range u.Messages {
|
for i, m := range u.Messages {
|
||||||
if bytes.Equal(lastEnvelopeID, m.Hash) {
|
if bytes.Equal(lastEnvelopeID, m.Hash) {
|
||||||
log.Debug("received lastEnvelopeID",
|
log.Info("received lastEnvelopeID",
|
||||||
"hash", hex.EncodeToString(lastEnvelopeID),
|
"hash", hex.EncodeToString(lastEnvelopeID),
|
||||||
"index", i,
|
"index", i,
|
||||||
"messagesCount", len(u.Messages))
|
"messagesCount", len(u.Messages))
|
||||||
|
@ -135,6 +140,15 @@ func (u *WorkUnit) Execute(config WorkUnitConfig, mailSignals *signalForwarder)
|
||||||
switch s.Type {
|
switch s.Type {
|
||||||
case signal.EventMailServerRequestCompleted:
|
case signal.EventMailServerRequestCompleted:
|
||||||
lastEnvelopeID = s.LastEnvelopeID
|
lastEnvelopeID = s.LastEnvelopeID
|
||||||
|
|
||||||
|
log.Info("received EventMailServerRequestCompleted", "latency", time.Since(start), "enode", u.MailServerEnode)
|
||||||
|
|
||||||
|
// After receiving a request complete event,
|
||||||
|
// we reduce the timeout to 5 seconds.
|
||||||
|
if !timeout.Stop() {
|
||||||
|
<-timeout.C
|
||||||
|
}
|
||||||
|
timeout.Reset(time.Second * 5)
|
||||||
case signal.EventMailServerRequestExpired:
|
case signal.EventMailServerRequestExpired:
|
||||||
return fmt.Errorf("request for messages expired")
|
return fmt.Errorf("request for messages expired")
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue