dht: Timeout goroutines waiting for transaction responses
This commit is contained in:
parent
b467f15bae
commit
99bec9ac58
27
dht/dht.go
27
dht/dht.go
@ -78,9 +78,16 @@ type transaction struct {
|
||||
t string
|
||||
Response chan Msg
|
||||
onResponse func(Msg)
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func (t *transaction) timeout() {
|
||||
close(t.Response)
|
||||
close(t.done)
|
||||
}
|
||||
|
||||
func (t *transaction) handleResponse(m Msg) {
|
||||
close(t.done)
|
||||
if t.onResponse != nil {
|
||||
t.onResponse(m)
|
||||
}
|
||||
@ -337,6 +344,23 @@ func (s *Server) IDString() string {
|
||||
return s.ID
|
||||
}
|
||||
|
||||
func (s *Server) timeoutTransaction(t *transaction) {
|
||||
select {
|
||||
case <-t.done:
|
||||
return
|
||||
case <-time.After(time.Minute):
|
||||
}
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
select {
|
||||
case <-t.done:
|
||||
return
|
||||
default:
|
||||
}
|
||||
t.timeout()
|
||||
s.removeTransaction(t)
|
||||
}
|
||||
|
||||
func (s *Server) query(node *net.UDPAddr, q string, a map[string]string) (t *transaction, err error) {
|
||||
tid := s.nextTransactionID()
|
||||
if a == nil {
|
||||
@ -357,12 +381,15 @@ func (s *Server) query(node *net.UDPAddr, q string, a map[string]string) (t *tra
|
||||
remoteAddr: node,
|
||||
t: tid,
|
||||
Response: make(chan Msg, 1),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
s.addTransaction(t)
|
||||
err = s.writeToNode(b, node)
|
||||
if err != nil {
|
||||
s.removeTransaction(t)
|
||||
return
|
||||
}
|
||||
go s.timeoutTransaction(t)
|
||||
return
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user