From 998fbeb0c68848744fb04ff09f829f5192e7b2b9 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 18 Mar 2014 22:39:33 +1100 Subject: [PATCH] Hunting for goroutine leaks Start implementing Client.Stop(). --- client.go | 25 ++++++++++++++++++++++--- fs/torrentfs.go | 13 +++++++------ fs/torrentfs_test.go | 40 +++++++++++++++++++++++++++++++++++++++- 3 files changed, 68 insertions(+), 10 deletions(-) diff --git a/client.go b/client.go index 83d0d6a0..077a5ac1 100644 --- a/client.go +++ b/client.go @@ -88,6 +88,7 @@ type Request struct { type Connection struct { Socket net.Conn + Closed bool post chan encoding.BinaryMarshaler write chan []byte @@ -106,8 +107,12 @@ type Connection struct { } func (c *Connection) Close() { + if c.Closed { + return + } c.Socket.Close() close(c.post) + c.Closed = true } func (c *Connection) PeerHasPiece(index peer_protocol.Integer) bool { @@ -417,6 +422,7 @@ type Client struct { sync.Mutex mu *sync.Mutex event sync.Cond + quit chan struct{} halfOpen int torrents map[InfoHash]*Torrent @@ -487,14 +493,30 @@ func (c *Client) Start() { if err != nil { panic("error generating peer id") } + c.quit = make(chan struct{}) if c.Listener != nil { go c.acceptConnections() } } +func (me *Client) Stop() { + close(me.quit) + me.event.Broadcast() + for _, t := range me.torrents { + for _, c := range t.Conns { + c.Close() + } + } +} + func (cl *Client) acceptConnections() { for { conn, err := cl.Listener.Accept() + select { + case <-cl.quit: + return + default: + } if err != nil { log.Print(err) return @@ -958,9 +980,6 @@ func (me *Client) WaitAll() { me.mu.Unlock() } -func (me *Client) Stop() { -} - func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) { requestHeatMap := torrent.requestHeat() addRequest := func(req Request) (again bool) { diff --git a/fs/torrentfs.go b/fs/torrentfs.go index b6fcbe51..0d8ae09e 100644 --- a/fs/torrentfs.go +++ b/fs/torrentfs.go @@ -49,6 +49,8 @@ func (tfs *torrentFS) UnsubscribeData(ch chan torrent.DataSpec) { close(ch) } +var _ fusefs.NodeForgetter = rootNode{} + type rootNode struct { fs *torrentFS } @@ -242,19 +244,18 @@ func (rootNode) Attr() fuse.Attr { } } +func (rootNode) Forget() { +} + func (tfs *torrentFS) Root() (fusefs.Node, fuse.Error) { return rootNode{tfs}, nil } -func MountAndServe(dir string, cl *torrent.Client) error { - conn, err := fuse.Mount(dir) - if err != nil { - return err - } +func New(cl *torrent.Client) *torrentFS { fs := &torrentFS{ Client: cl, DataSubs: make(map[chan torrent.DataSpec]struct{}), } go fs.publishData() - return fusefs.Serve(conn, fs) + return fs } diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go index 962d6e26..a1905c26 100644 --- a/fs/torrentfs_test.go +++ b/fs/torrentfs_test.go @@ -1,6 +1,8 @@ package torrentfs import ( + "bazil.org/fuse" + fusefs "bazil.org/fuse/fs" "bitbucket.org/anacrolix/go.torrent" "bytes" metainfo "github.com/nsf/libtorgo/torrent" @@ -9,6 +11,7 @@ import ( "net" "os" "path/filepath" + "runtime" "testing" ) @@ -34,6 +37,8 @@ func TestTCPAddrString(t *testing.T) { } } +const dummyFileContents = "hello, world\n" + func createDummyTorrentData(dirName string) string { f, _ := os.Create(filepath.Join(dirName, "greeting")) f.WriteString("hello, world\n") @@ -53,6 +58,13 @@ func createMetaInfo(name string, w io.Writer) { } func TestDownloadOnDemand(t *testing.T) { + priorNumGoroutines := runtime.NumGoroutine() + defer func() { + n := runtime.NumGoroutine() + if n != priorNumGoroutines { + t.Fatalf("expected %d goroutines, but %d are running", priorNumGoroutines, n) + } + }() dir, err := ioutil.TempDir("", "torrentfs") if err != nil { t.Fatal(err) @@ -79,13 +91,16 @@ func TestDownloadOnDemand(t *testing.T) { return conn }(), } + defer seeder.Listener.Close() seeder.Start() + defer seeder.Stop() seeder.AddTorrent(metaInfo) leecher := torrent.Client{ DataDir: filepath.Join(dir, "download"), DataReady: make(chan torrent.DataSpec), } leecher.Start() + defer leecher.Stop() leecher.AddTorrent(metaInfo) leecher.AddPeers(torrent.BytesInfoHash(metaInfo.InfoHash), []torrent.Peer{func() torrent.Peer { tcpAddr := seeder.Listener.Addr().(*net.TCPAddr) @@ -96,8 +111,31 @@ func TestDownloadOnDemand(t *testing.T) { }()}) mountDir := filepath.Join(dir, "mnt") os.Mkdir(mountDir, 0777) - err = MountAndServe(mountDir, &leecher) + fs := New(&leecher) + fuseConn, err := fuse.Mount(mountDir) if err != nil { t.Fatal(err) } + go func() { + if err := fusefs.Serve(fuseConn, fs); err != nil { + t.Fatal(err) + } + if err := fuseConn.Close(); err != nil { + t.Fatal(err) + } + }() + <-fuseConn.Ready + if fuseConn.MountError != nil { + t.Fatal(fuseConn.MountError) + } + content, err := ioutil.ReadFile(filepath.Join(mountDir, "greeting")) + if err != nil { + t.Fatal(err) + } + if err := fuse.Unmount(mountDir); err != nil { + t.Fatal(err) + } + if string(content) != dummyFileContents { + t.FailNow() + } }