2
0
mirror of synced 2025-02-24 06:38:14 +00:00

Hunting for goroutine leaks

Start implementing Client.Stop().
This commit is contained in:
Matt Joiner 2014-03-18 22:39:33 +11:00
parent 2e81f914e7
commit 998fbeb0c6
3 changed files with 68 additions and 10 deletions

View File

@ -88,6 +88,7 @@ type Request struct {
type Connection struct { type Connection struct {
Socket net.Conn Socket net.Conn
Closed bool
post chan encoding.BinaryMarshaler post chan encoding.BinaryMarshaler
write chan []byte write chan []byte
@ -106,8 +107,12 @@ type Connection struct {
} }
func (c *Connection) Close() { func (c *Connection) Close() {
if c.Closed {
return
}
c.Socket.Close() c.Socket.Close()
close(c.post) close(c.post)
c.Closed = true
} }
func (c *Connection) PeerHasPiece(index peer_protocol.Integer) bool { func (c *Connection) PeerHasPiece(index peer_protocol.Integer) bool {
@ -417,6 +422,7 @@ type Client struct {
sync.Mutex sync.Mutex
mu *sync.Mutex mu *sync.Mutex
event sync.Cond event sync.Cond
quit chan struct{}
halfOpen int halfOpen int
torrents map[InfoHash]*Torrent torrents map[InfoHash]*Torrent
@ -487,14 +493,30 @@ func (c *Client) Start() {
if err != nil { if err != nil {
panic("error generating peer id") panic("error generating peer id")
} }
c.quit = make(chan struct{})
if c.Listener != nil { if c.Listener != nil {
go c.acceptConnections() go c.acceptConnections()
} }
} }
func (me *Client) Stop() {
close(me.quit)
me.event.Broadcast()
for _, t := range me.torrents {
for _, c := range t.Conns {
c.Close()
}
}
}
func (cl *Client) acceptConnections() { func (cl *Client) acceptConnections() {
for { for {
conn, err := cl.Listener.Accept() conn, err := cl.Listener.Accept()
select {
case <-cl.quit:
return
default:
}
if err != nil { if err != nil {
log.Print(err) log.Print(err)
return return
@ -958,9 +980,6 @@ func (me *Client) WaitAll() {
me.mu.Unlock() me.mu.Unlock()
} }
func (me *Client) Stop() {
}
func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) { func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) {
requestHeatMap := torrent.requestHeat() requestHeatMap := torrent.requestHeat()
addRequest := func(req Request) (again bool) { addRequest := func(req Request) (again bool) {

View File

@ -49,6 +49,8 @@ func (tfs *torrentFS) UnsubscribeData(ch chan torrent.DataSpec) {
close(ch) close(ch)
} }
var _ fusefs.NodeForgetter = rootNode{}
type rootNode struct { type rootNode struct {
fs *torrentFS fs *torrentFS
} }
@ -242,19 +244,18 @@ func (rootNode) Attr() fuse.Attr {
} }
} }
func (rootNode) Forget() {
}
func (tfs *torrentFS) Root() (fusefs.Node, fuse.Error) { func (tfs *torrentFS) Root() (fusefs.Node, fuse.Error) {
return rootNode{tfs}, nil return rootNode{tfs}, nil
} }
func MountAndServe(dir string, cl *torrent.Client) error { func New(cl *torrent.Client) *torrentFS {
conn, err := fuse.Mount(dir)
if err != nil {
return err
}
fs := &torrentFS{ fs := &torrentFS{
Client: cl, Client: cl,
DataSubs: make(map[chan torrent.DataSpec]struct{}), DataSubs: make(map[chan torrent.DataSpec]struct{}),
} }
go fs.publishData() go fs.publishData()
return fusefs.Serve(conn, fs) return fs
} }

View File

@ -1,6 +1,8 @@
package torrentfs package torrentfs
import ( import (
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
"bitbucket.org/anacrolix/go.torrent" "bitbucket.org/anacrolix/go.torrent"
"bytes" "bytes"
metainfo "github.com/nsf/libtorgo/torrent" metainfo "github.com/nsf/libtorgo/torrent"
@ -9,6 +11,7 @@ import (
"net" "net"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"testing" "testing"
) )
@ -34,6 +37,8 @@ func TestTCPAddrString(t *testing.T) {
} }
} }
const dummyFileContents = "hello, world\n"
func createDummyTorrentData(dirName string) string { func createDummyTorrentData(dirName string) string {
f, _ := os.Create(filepath.Join(dirName, "greeting")) f, _ := os.Create(filepath.Join(dirName, "greeting"))
f.WriteString("hello, world\n") f.WriteString("hello, world\n")
@ -53,6 +58,13 @@ func createMetaInfo(name string, w io.Writer) {
} }
func TestDownloadOnDemand(t *testing.T) { func TestDownloadOnDemand(t *testing.T) {
priorNumGoroutines := runtime.NumGoroutine()
defer func() {
n := runtime.NumGoroutine()
if n != priorNumGoroutines {
t.Fatalf("expected %d goroutines, but %d are running", priorNumGoroutines, n)
}
}()
dir, err := ioutil.TempDir("", "torrentfs") dir, err := ioutil.TempDir("", "torrentfs")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -79,13 +91,16 @@ func TestDownloadOnDemand(t *testing.T) {
return conn return conn
}(), }(),
} }
defer seeder.Listener.Close()
seeder.Start() seeder.Start()
defer seeder.Stop()
seeder.AddTorrent(metaInfo) seeder.AddTorrent(metaInfo)
leecher := torrent.Client{ leecher := torrent.Client{
DataDir: filepath.Join(dir, "download"), DataDir: filepath.Join(dir, "download"),
DataReady: make(chan torrent.DataSpec), DataReady: make(chan torrent.DataSpec),
} }
leecher.Start() leecher.Start()
defer leecher.Stop()
leecher.AddTorrent(metaInfo) leecher.AddTorrent(metaInfo)
leecher.AddPeers(torrent.BytesInfoHash(metaInfo.InfoHash), []torrent.Peer{func() torrent.Peer { leecher.AddPeers(torrent.BytesInfoHash(metaInfo.InfoHash), []torrent.Peer{func() torrent.Peer {
tcpAddr := seeder.Listener.Addr().(*net.TCPAddr) tcpAddr := seeder.Listener.Addr().(*net.TCPAddr)
@ -96,8 +111,31 @@ func TestDownloadOnDemand(t *testing.T) {
}()}) }()})
mountDir := filepath.Join(dir, "mnt") mountDir := filepath.Join(dir, "mnt")
os.Mkdir(mountDir, 0777) os.Mkdir(mountDir, 0777)
err = MountAndServe(mountDir, &leecher) fs := New(&leecher)
fuseConn, err := fuse.Mount(mountDir)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
go func() {
if err := fusefs.Serve(fuseConn, fs); err != nil {
t.Fatal(err)
}
if err := fuseConn.Close(); err != nil {
t.Fatal(err)
}
}()
<-fuseConn.Ready
if fuseConn.MountError != nil {
t.Fatal(fuseConn.MountError)
}
content, err := ioutil.ReadFile(filepath.Join(mountDir, "greeting"))
if err != nil {
t.Fatal(err)
}
if err := fuse.Unmount(mountDir); err != nil {
t.Fatal(err)
}
if string(content) != dummyFileContents {
t.FailNow()
}
} }