Remove conntrack, expose Torrent.AnnounceToDht, ClientConfig.PeriodicallyAnnounceTorrentsToDht

This commit is contained in:
Matt Joiner 2021-06-18 11:05:23 +10:00
parent ebd19af795
commit af1ca91e04
3 changed files with 39 additions and 51 deletions

View File

@ -22,7 +22,6 @@ import (
"github.com/anacrolix/missinggo/slices" "github.com/anacrolix/missinggo/slices"
"github.com/anacrolix/missinggo/v2" "github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/conntrack"
"github.com/anacrolix/missinggo/v2/pproffd" "github.com/anacrolix/missinggo/v2/pproffd"
"github.com/anacrolix/sync" "github.com/anacrolix/sync"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
@ -257,7 +256,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
if !cfg.NoDHT { if !cfg.NoDHT {
for _, s := range sockets { for _, s := range sockets {
if pc, ok := s.(net.PacketConn); ok { if pc, ok := s.(net.PacketConn); ok {
ds, err := cl.newAnacrolixDhtServer(pc) ds, err := cl.NewAnacrolixDhtServer(pc)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -363,7 +362,8 @@ func (cl *Client) listenNetworks() (ns []network) {
return return
} }
func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) { // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
cfg := dht.ServerConfig{ cfg := dht.ServerConfig{
IPBlocklist: cl.ipBlockList, IPBlocklist: cl.ipBlockList,
Conn: conn, Conn: conn,
@ -375,7 +375,6 @@ func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err
return cl.config.PublicIp4 return cl.config.PublicIp4
}(), }(),
StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()), StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
ConnectionTracking: cl.config.ConnTracker,
OnQuery: cl.config.DHTOnQuery, OnQuery: cl.config.DHTOnQuery,
Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())), Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
} }
@ -639,21 +638,6 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
} }
func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn { func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
network := s.LocalAddr().Network()
cte := cl.config.ConnTracker.Wait(
ctx,
conntrack.Entry{network, s.LocalAddr().String(), addr},
"dial torrent client",
0,
)
// Try to avoid committing to a dial if the context is complete as it's difficult to determine
// which dial errors allow us to forget the connection tracking entry handle.
if ctx.Err() != nil {
if cte != nil {
cte.Forget()
}
return nil
}
c, err := s.Dial(ctx, addr) c, err := s.Dial(ctx, addr)
// This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
// it now in case we close the connection forthwith. // it now in case we close the connection forthwith.
@ -661,19 +645,7 @@ func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net
tc.SetLinger(0) tc.SetLinger(0)
} }
countDialResult(err) countDialResult(err)
if c == nil { return c
if err != nil && forgettableDialError(err) {
cte.Forget()
} else {
cte.Done()
}
return nil
}
return closeWrapper{c, func() error {
err := c.Close()
cte.Done()
return err
}}
} }
func forgettableDialError(err error) bool { func forgettableDialError(err error) bool {
@ -1180,7 +1152,9 @@ func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStor
t = cl.newTorrent(infoHash, specStorage) t = cl.newTorrent(infoHash, specStorage)
cl.eachDhtServer(func(s DhtServer) { cl.eachDhtServer(func(s DhtServer) {
if cl.config.PeriodicallyAnnounceTorrentsToDht {
go t.dhtAnnouncer(s) go t.dhtAnnouncer(s)
}
}) })
cl.torrents[infoHash] = t cl.torrents[infoHash] = t
cl.clearAcceptLimits() cl.clearAcceptLimits()

View File

@ -11,7 +11,6 @@ import (
"github.com/anacrolix/log" "github.com/anacrolix/log"
"github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/expect" "github.com/anacrolix/missinggo/expect"
"github.com/anacrolix/missinggo/v2/conntrack"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"github.com/anacrolix/torrent/iplist" "github.com/anacrolix/torrent/iplist"
@ -39,6 +38,7 @@ type ClientConfig struct {
DhtStartingNodes func(network string) dht.StartingNodesGetter DhtStartingNodes func(network string) dht.StartingNodesGetter
// Called for each anacrolix/dht Server created for the Client. // Called for each anacrolix/dht Server created for the Client.
ConfigureAnacrolixDhtServer func(*dht.ServerConfig) ConfigureAnacrolixDhtServer func(*dht.ServerConfig)
PeriodicallyAnnounceTorrentsToDht bool
// Never send chunks to peers. // Never send chunks to peers.
NoUpload bool `long:"no-upload"` NoUpload bool `long:"no-upload"`
@ -134,8 +134,6 @@ type ClientConfig struct {
// we don't intend to obtain all of a torrent's data. // we don't intend to obtain all of a torrent's data.
DropMutuallyCompletePeers bool DropMutuallyCompletePeers bool
ConnTracker *conntrack.Instance
// OnQuery hook func // OnQuery hook func
DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool) DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool)
@ -172,10 +170,10 @@ func NewDefaultClientConfig() *ClientConfig {
DhtStartingNodes: func(network string) dht.StartingNodesGetter { DhtStartingNodes: func(network string) dht.StartingNodesGetter {
return func() ([]dht.Addr, error) { return dht.GlobalBootstrapAddrs(network) } return func() ([]dht.Addr, error) { return dht.GlobalBootstrapAddrs(network) }
}, },
PeriodicallyAnnounceTorrentsToDht: true,
ListenHost: func(string) string { return "" }, ListenHost: func(string) string { return "" },
UploadRateLimiter: unlimited, UploadRateLimiter: unlimited,
DownloadRateLimiter: unlimited, DownloadRateLimiter: unlimited,
ConnTracker: conntrack.NewInstance(),
DisableAcceptRateLimiting: true, DisableAcceptRateLimiting: true,
DropMutuallyCompletePeers: true, DropMutuallyCompletePeers: true,
HeaderObfuscationPolicy: HeaderObfuscationPolicy{ HeaderObfuscationPolicy: HeaderObfuscationPolicy{

View File

@ -1643,17 +1643,33 @@ func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
} }
} }
func (t *Torrent) announceToDht(impliedPort bool, s DhtServer) error { // Announce using the provided DHT server. Peers are consumed automatically. done is closed when the
ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), impliedPort) // announce ends. stop will force the announce to end.
func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(), err error) {
ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), true)
if err != nil {
return
}
_done := make(chan struct{})
done = _done
stop = ps.Close
go func() {
t.consumeDhtAnnouncePeers(ps.Peers())
close(_done)
}()
return
}
func (t *Torrent) announceToDht(s DhtServer) error {
_, stop, err := t.AnnounceToDht(s)
if err != nil { if err != nil {
return err return err
} }
go t.consumeDhtAnnouncePeers(ps.Peers())
select { select {
case <-t.closed.LockedChan(t.cl.locker()): case <-t.closed.LockedChan(t.cl.locker()):
case <-time.After(5 * time.Minute): case <-time.After(5 * time.Minute):
} }
ps.Close() stop()
return nil return nil
} }
@ -1681,7 +1697,7 @@ func (t *Torrent) dhtAnnouncer(s DhtServer) {
t.numDHTAnnounces++ t.numDHTAnnounces++
cl.unlock() cl.unlock()
defer cl.lock() defer cl.lock()
err := t.announceToDht(true, s) err := t.announceToDht(s)
if err != nil { if err != nil {
t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err) t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err)
} }