Stop download of stickers on canceling context (#2699)
This commit is contained in:
parent
c3b0582cc9
commit
869942c05e
40
ipfs/ipfs.go
40
ipfs/ipfs.go
|
@ -1,6 +1,7 @@
|
||||||
package ipfs
|
package ipfs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -32,6 +33,8 @@ type taskRequest struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type Downloader struct {
|
type Downloader struct {
|
||||||
|
ctx context.Context
|
||||||
|
cancel func()
|
||||||
ipfsDir string
|
ipfsDir string
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
rateLimiterChan chan taskRequest
|
rateLimiterChan chan taskRequest
|
||||||
|
@ -47,7 +50,11 @@ func NewDownloader(rootDir string) *Downloader {
|
||||||
panic("could not create IPFSDir")
|
panic("could not create IPFSDir")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.TODO())
|
||||||
|
|
||||||
d := &Downloader{
|
d := &Downloader{
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
ipfsDir: ipfsDir,
|
ipfsDir: ipfsDir,
|
||||||
rateLimiterChan: make(chan taskRequest, maxRequestsPerSecond),
|
rateLimiterChan: make(chan taskRequest, maxRequestsPerSecond),
|
||||||
inputTaskChan: make(chan taskRequest, 1000),
|
inputTaskChan: make(chan taskRequest, 1000),
|
||||||
|
@ -68,6 +75,8 @@ func NewDownloader(rootDir string) *Downloader {
|
||||||
func (d *Downloader) Stop() {
|
func (d *Downloader) Stop() {
|
||||||
close(d.quit)
|
close(d.quit)
|
||||||
|
|
||||||
|
d.cancel()
|
||||||
|
|
||||||
d.wg.Wait()
|
d.wg.Wait()
|
||||||
|
|
||||||
close(d.inputTaskChan)
|
close(d.inputTaskChan)
|
||||||
|
@ -75,16 +84,11 @@ func (d *Downloader) Stop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Downloader) worker() {
|
func (d *Downloader) worker() {
|
||||||
for {
|
for request := range d.rateLimiterChan {
|
||||||
select {
|
resp, err := d.download(request.cid, request.download)
|
||||||
case <-d.quit:
|
request.doneChan <- taskResponse{
|
||||||
return
|
err: err,
|
||||||
case request := <-d.rateLimiterChan:
|
response: resp,
|
||||||
resp, err := d.download(request.cid, request.download)
|
|
||||||
request.doneChan <- taskResponse{
|
|
||||||
err: err,
|
|
||||||
response: resp,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -94,17 +98,13 @@ func (d *Downloader) taskDispatcher() {
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
<-ticker.C
|
||||||
case <-d.quit:
|
request, ok := <-d.inputTaskChan
|
||||||
|
if !ok {
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
|
||||||
request, ok := <-d.inputTaskChan
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
d.rateLimiterChan <- request
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
d.rateLimiterChan <- request
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -203,6 +203,8 @@ func (d *Downloader) download(cid string, download bool) ([]byte, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
req = req.WithContext(d.ctx)
|
||||||
|
|
||||||
resp, err := d.client.Do(req)
|
resp, err := d.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
Loading…
Reference in New Issue