Add initial connection tracking

This commit is contained in:
Matt Joiner 2018-11-21 17:02:22 +11:00
parent 41a9344a58
commit f4e30656d6
3 changed files with 33 additions and 2 deletions

View File

@ -21,6 +21,7 @@ import (
"github.com/anacrolix/log" "github.com/anacrolix/log"
"github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/bitmap" "github.com/anacrolix/missinggo/bitmap"
"github.com/anacrolix/missinggo/conntrack"
"github.com/anacrolix/missinggo/perf" "github.com/anacrolix/missinggo/perf"
"github.com/anacrolix/missinggo/pproffd" "github.com/anacrolix/missinggo/pproffd"
"github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/missinggo/pubsub"
@ -292,6 +293,7 @@ func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
return cl.config.PublicIp4 return cl.config.PublicIp4
}(), }(),
StartingNodes: cl.config.DhtStartingNodes, StartingNodes: cl.config.DhtStartingNodes,
ConnectionTracking: cl.config.ConnTracker,
} }
s, err = dht.NewServer(&cfg) s, err = dht.NewServer(&cfg)
if err == nil { if err == nil {
@ -538,6 +540,9 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) dialResult {
if peerNetworkEnabled(network, cl.config) { if peerNetworkEnabled(network, cl.config) {
left++ left++
go func() { go func() {
cte := cl.config.ConnTracker.Wait(
conntrack.Entry{network, s.Addr().String(), addr},
"dial torrent client")
c, err := s.dial(ctx, addr) c, err := s.dial(ctx, addr)
// This is a bit optimistic, but it looks non-trivial to thread // This is a bit optimistic, but it looks non-trivial to thread
// this through the proxy code. Set it now in case we close the // this through the proxy code. Set it now in case we close the
@ -546,7 +551,17 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) dialResult {
tc.SetLinger(0) tc.SetLinger(0)
} }
countDialResult(err) countDialResult(err)
resCh <- dialResult{c, network} dr := dialResult{c, network}
if c == nil {
cte.Done()
} else {
dr.Conn = closeWrapper{c, func() error {
err := c.Close()
cte.Done()
return err
}}
}
resCh <- dr
}() }()
} }
return true return true

12
closewrapper.go Normal file
View File

@ -0,0 +1,12 @@
package torrent
import "net"
type closeWrapper struct {
net.Conn
closer func() error
}
func (me closeWrapper) Close() error {
return me.closer()
}

View File

@ -8,6 +8,7 @@ import (
"github.com/anacrolix/dht" "github.com/anacrolix/dht"
"github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/conntrack"
"github.com/anacrolix/missinggo/expect" "github.com/anacrolix/missinggo/expect"
"github.com/anacrolix/torrent/iplist" "github.com/anacrolix/torrent/iplist"
"github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/storage"
@ -121,6 +122,8 @@ type ClientConfig struct {
// Don't add connections that have the same peer ID as an existing // Don't add connections that have the same peer ID as an existing
// connection for a given Torrent. // connection for a given Torrent.
dropDuplicatePeerIds bool dropDuplicatePeerIds bool
ConnTracker *conntrack.Instance
} }
func (cfg *ClientConfig) SetListenAddr(addr string) *ClientConfig { func (cfg *ClientConfig) SetListenAddr(addr string) *ClientConfig {
@ -147,6 +150,7 @@ func NewDefaultClientConfig() *ClientConfig {
ListenHost: func(string) string { return "" }, ListenHost: func(string) string { return "" },
UploadRateLimiter: unlimited, UploadRateLimiter: unlimited,
DownloadRateLimiter: unlimited, DownloadRateLimiter: unlimited,
ConnTracker: conntrack.NewInstance(),
} }
} }