From 3d890ae63ae4444074316e8e182cb67779b5fe03 Mon Sep 17 00:00:00 2001 From: Adam Babik Date: Fri, 7 Dec 2018 10:43:07 +0100 Subject: [PATCH] improve timeouts for x-check-mailserver --- Makefile | 6 +++--- cmd/x-check-mailserver/main.go | 2 +- cmd/x-check-mailserver/work.go | 22 ++++++++++++++++++---- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/Makefile b/Makefile index a9310b0..fd3ac6d 100644 --- a/Makefile +++ b/Makefile @@ -14,11 +14,11 @@ image: build: bin/pubchats bin/bench-mailserver bin/x-check-mailserver -bin/pubchats: +pubchats: go build -o ./bin/pubchats ./cmd/pubchats -bin/bench-mailserver: +bench-mailserver: go build -o ./bin/bench-mailserver ./cmd/bench-mailserver -bin/x-check-mailserver: +x-check-mailserver: go build -o ./bin/x-check-mailserver ./cmd/x-check-mailserver clean: diff --git a/cmd/x-check-mailserver/main.go b/cmd/x-check-mailserver/main.go index 7573e85..ca2e9a9 100644 --- a/cmd/x-check-mailserver/main.go +++ b/cmd/x-check-mailserver/main.go @@ -93,7 +93,7 @@ func main() { work := NewWorkUnit(enode, &nodeConfig) go func(work *WorkUnit) { if err := work.Execute(workConfig, mailSignalsForwarder); err != nil { - log.Crit("failed to execute work", "err", err) + log.Crit("failed to execute work", "err", err, "enode", work.MailServerEnode) } wg.Done() }(work) diff --git a/cmd/x-check-mailserver/work.go b/cmd/x-check-mailserver/work.go index 825d225..7518ce3 100644 --- a/cmd/x-check-mailserver/work.go +++ b/cmd/x-check-mailserver/work.go @@ -96,7 +96,7 @@ func (u *WorkUnit) Execute(config WorkUnitConfig, mailSignals *signalForwarder) To: config.To, Limit: 1000, Topics: topics, - Timeout: 30, + Timeout: 15, }) if err != nil { return fmt.Errorf("failed to request %s for messages: %v", u.MailServerEnode, err) @@ -107,6 +107,11 @@ func (u *WorkUnit) Execute(config WorkUnitConfig, mailSignals *signalForwarder) signals, cancelSignalsFilter := mailSignals.Filter([]byte(reqID)) defer cancelSignalsFilter() + timeout := time.NewTimer(16 * time.Second) // greater than request messages timeout + defer timeout.Stop() + + start := time.Now() + var lastEnvelopeID []byte for { @@ -114,14 +119,14 @@ func (u *WorkUnit) Execute(config WorkUnitConfig, mailSignals *signalForwarder) case m := <-messages: log.Debug("received a message", "hash", hex.EncodeToString(m.Hash)) u.Messages = append(u.Messages, m) - case <-time.After(time.Second * 5): + case <-timeout.C: // As we can not predict when messages finish to come in, - // we timeout after 5 seconds of silence. + // we timeout after some time. // If lastEnvelopeID is found amoung received messages, // it's a successful request. Otherwise, an error is returned. for i, m := range u.Messages { if bytes.Equal(lastEnvelopeID, m.Hash) { - log.Debug("received lastEnvelopeID", + log.Info("received lastEnvelopeID", "hash", hex.EncodeToString(lastEnvelopeID), "index", i, "messagesCount", len(u.Messages)) @@ -135,6 +140,15 @@ func (u *WorkUnit) Execute(config WorkUnitConfig, mailSignals *signalForwarder) switch s.Type { case signal.EventMailServerRequestCompleted: lastEnvelopeID = s.LastEnvelopeID + + log.Info("received EventMailServerRequestCompleted", "latency", time.Since(start), "enode", u.MailServerEnode) + + // After receiving a request complete event, + // we reduce the timeout to 5 seconds. + if !timeout.Stop() { + <-timeout.C + } + timeout.Reset(time.Second * 5) case signal.EventMailServerRequestExpired: return fmt.Errorf("request for messages expired") }