diff --git a/libtorrent/include/libtorrent/aux_/session_impl.hpp b/libtorrent/include/libtorrent/aux_/session_impl.hpp index bff8e3387..df39fabb0 100644 --- a/libtorrent/include/libtorrent/aux_/session_impl.hpp +++ b/libtorrent/include/libtorrent/aux_/session_impl.hpp @@ -178,9 +178,7 @@ namespace libtorrent #endif friend struct checker_impl; friend class invariant_access; - typedef std::map - , boost::intrusive_ptr > - connection_map; + typedef std::set > connection_map; typedef std::map > torrent_map; session_impl( @@ -190,7 +188,8 @@ namespace libtorrent ~session_impl(); #ifndef TORRENT_DISABLE_EXTENSIONS - void add_extension(boost::function(torrent*, void*)> ext); + void add_extension(boost::function( + torrent*, void*)> ext); #endif void operator()(); @@ -214,7 +213,7 @@ namespace libtorrent peer_id const& get_peer_id() const { return m_peer_id; } void close_connection(boost::intrusive_ptr const& p); - void connection_failed(boost::shared_ptr const& s + void connection_failed(boost::intrusive_ptr const& p , tcp::endpoint const& a, char const* message); void set_settings(session_settings const& s); @@ -342,8 +341,8 @@ namespace libtorrent for (connection_map::const_iterator i = m_connections.begin() , end(m_connections.end()); i != end; ++i) { - send_buffer_capacity += i->second->send_buffer_capacity(); - used_send_buffer += i->second->send_buffer_size(); + send_buffer_capacity += (*i)->send_buffer_capacity(); + used_send_buffer += (*i)->send_buffer_size(); } TORRENT_ASSERT(send_buffer_capacity >= used_send_buffer); m_buffer_usage_logger << log_time() << " send_buffer_size: " << send_buffer_capacity << std::endl; @@ -375,12 +374,6 @@ namespace libtorrent // buffers from. boost::pool<> m_send_buffers; - // this is where all active sockets are stored. - // the selector can sleep while there's no activity on - // them - io_service m_io_service; - asio::strand m_strand; - // the file pool that all storages in this session's // torrents uses. It sets a limit on the number of // open files by this session. @@ -392,9 +385,17 @@ namespace libtorrent // handles disk io requests asynchronously // peers have pointers into the disk buffer // pool, and must be destructed before this - // object. + // object. The disk thread relies on the file + // pool object, and must be destructed before + // m_files. disk_io_thread m_disk_thread; + // this is where all active sockets are stored. + // the selector can sleep while there's no activity on + // them + io_service m_io_service; + asio::strand m_strand; + // this is a list of half-open tcp connections // (only outgoing connections) // this has to be one of the last @@ -646,7 +647,7 @@ namespace libtorrent void debug_log(const std::string& line) { - (*m_ses.m_logger) << line << "\n"; + (*m_ses.m_logger) << time_now_string() << " " << line << "\n"; } session_impl& m_ses; }; diff --git a/libtorrent/include/libtorrent/bandwidth_manager.hpp b/libtorrent/include/libtorrent/bandwidth_manager.hpp index 83df5b371..ef132543f 100644 --- a/libtorrent/include/libtorrent/bandwidth_manager.hpp +++ b/libtorrent/include/libtorrent/bandwidth_manager.hpp @@ -181,6 +181,7 @@ struct bandwidth_manager , m_current_quota(0) , m_channel(channel) , m_in_hand_out_bandwidth(false) + , m_abort(false) {} void throttle(int limit) throw() @@ -196,6 +197,12 @@ struct bandwidth_manager return m_limit; } + void close() + { + m_abort = true; + m_history_timer.cancel(); + } + // non prioritized means that, if there's a line for bandwidth, // others will cut in front of the non-prioritized peers. // this is used by web seeds @@ -275,6 +282,8 @@ private: // active that will be invoked, no need to set one up if (m_history.size() > 1) return; + if (m_abort) return; + m_history_timer.expires_at(e.expires_at); m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1)); #ifndef NDEBUG @@ -308,7 +317,7 @@ private: } // now, wait for the next chunk to expire - if (!m_history.empty()) + if (!m_history.empty() && !m_abort) { m_history_timer.expires_at(m_history.back().expires_at); m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1)); @@ -487,6 +496,7 @@ private: // to prevent recursive invocations to interfere bool m_in_hand_out_bandwidth; + bool m_abort; }; } diff --git a/libtorrent/include/libtorrent/bencode.hpp b/libtorrent/include/libtorrent/bencode.hpp index 66da191ab..1d3f1ea2b 100755 --- a/libtorrent/include/libtorrent/bencode.hpp +++ b/libtorrent/include/libtorrent/bencode.hpp @@ -135,26 +135,38 @@ namespace libtorrent } template - std::string read_until(InIt& in, InIt end, char end_token) + std::string read_until(InIt& in, InIt end, char end_token, bool& err) { - if (in == end) throw invalid_encoding(); std::string ret; + if (in == end) + { + err = true; + return ret; + } while (*in != end_token) { ret += *in; ++in; - if (in == end) throw invalid_encoding(); + if (in == end) + { + err = true; + return ret; + } } return ret; } template - void read_string(InIt& in, InIt end, int len, std::string& str) + void read_string(InIt& in, InIt end, int len, std::string& str, bool& err) { TORRENT_ASSERT(len >= 0); for (int i = 0; i < len; ++i) { - if (in == end) throw invalid_encoding(); + if (in == end) + { + err = true; + return; + } str += *in; ++in; } @@ -202,9 +214,13 @@ namespace libtorrent } template - void bdecode_recursive(InIt& in, InIt end, entry& ret) + void bdecode_recursive(InIt& in, InIt end, entry& ret, bool& err) { - if (in == end) throw invalid_encoding(); + if (in == end) + { + err = true; + return; + } switch (*in) { @@ -213,7 +229,8 @@ namespace libtorrent case 'i': { ++in; // 'i' - std::string val = read_until(in, end, 'e'); + std::string val = read_until(in, end, 'e', err); + if (err) return; TORRENT_ASSERT(*in == 'e'); ++in; // 'e' ret = entry(entry::int_t); @@ -230,8 +247,13 @@ namespace libtorrent { ret.list().push_back(entry()); entry& e = ret.list().back(); - bdecode_recursive(in, end, e); - if (in == end) throw invalid_encoding(); + bdecode_recursive(in, end, e, err); + if (err) return; + if (in == end) + { + err = true; + return; + } } TORRENT_ASSERT(*in == 'e'); ++in; // 'e' @@ -246,10 +268,16 @@ namespace libtorrent while (*in != 'e') { entry key; - bdecode_recursive(in, end, key); + bdecode_recursive(in, end, key, err); + if (err) return; entry& e = ret[key.string()]; - bdecode_recursive(in, end, e); - if (in == end) throw invalid_encoding(); + bdecode_recursive(in, end, e, err); + if (err) return; + if (in == end) + { + err = true; + return; + } } TORRENT_ASSERT(*in == 'e'); ++in; // 'e' @@ -260,16 +288,19 @@ namespace libtorrent default: if (isdigit((unsigned char)*in)) { - std::string len_s = read_until(in, end, ':'); + std::string len_s = read_until(in, end, ':', err); + if (err) return; TORRENT_ASSERT(*in == ':'); ++in; // ':' int len = std::atoi(len_s.c_str()); ret = entry(entry::string_t); - read_string(in, end, len, ret.string()); + read_string(in, end, len, ret.string(), err); + if (err) return; } else { - throw invalid_encoding(); + err = true; + return; } } } @@ -284,16 +315,18 @@ namespace libtorrent template entry bdecode(InIt start, InIt end) { - try - { - entry e; - detail::bdecode_recursive(start, end, e); - return e; - } - catch(type_error&) + entry e; + bool err = false; + detail::bdecode_recursive(start, end, e, err); + if (err) { +#ifdef BOOST_NO_EXCEPTIONS + return entry(); +#else throw invalid_encoding(); +#endif } + return e; } } diff --git a/libtorrent/include/libtorrent/connection_queue.hpp b/libtorrent/include/libtorrent/connection_queue.hpp index b3b7cde86..c229ec217 100644 --- a/libtorrent/include/libtorrent/connection_queue.hpp +++ b/libtorrent/include/libtorrent/connection_queue.hpp @@ -56,6 +56,7 @@ public: void done(int ticket); void limit(int limit); int limit() const; + void close(); #ifndef NDEBUG diff --git a/libtorrent/include/libtorrent/disk_io_thread.hpp b/libtorrent/include/libtorrent/disk_io_thread.hpp index bd6d5e1ba..b93ea8b75 100644 --- a/libtorrent/include/libtorrent/disk_io_thread.hpp +++ b/libtorrent/include/libtorrent/disk_io_thread.hpp @@ -106,6 +106,10 @@ namespace libtorrent , boost::function const& f = boost::function()); +#ifndef NDEBUG + disk_io_job find_job(boost::intrusive_ptr s + , int action, int piece) const; +#endif // keep track of the number of bytes in the job queue // at any given time. i.e. the sum of all buffer_size. // this is used to slow down the download global download @@ -120,7 +124,7 @@ namespace libtorrent private: - boost::mutex m_mutex; + mutable boost::mutex m_mutex; boost::condition m_signal; bool m_abort; std::deque m_jobs; @@ -131,6 +135,7 @@ namespace libtorrent #ifndef NDEBUG int m_block_size; + disk_io_job m_current; #endif #ifdef TORRENT_DISK_STATS diff --git a/libtorrent/include/libtorrent/entry.hpp b/libtorrent/include/libtorrent/entry.hpp index 7fd6c8c53..1c25cc7c7 100755 --- a/libtorrent/include/libtorrent/entry.hpp +++ b/libtorrent/include/libtorrent/entry.hpp @@ -161,8 +161,10 @@ namespace libtorrent // is a dictionary, otherwise they will throw entry& operator[](char const* key); entry& operator[](std::string const& key); +#ifndef BOOST_NO_EXCEPTIONS const entry& operator[](char const* key) const; const entry& operator[](std::string const& key) const; +#endif entry* find_key(char const* key); entry const* find_key(char const* key) const; @@ -221,55 +223,80 @@ namespace libtorrent copy(e); } + inline entry::integer_type& entry::integer() { if (m_type == undefined_t) construct(int_t); +#ifndef BOOST_NO_EXCEPTIONS if (m_type != int_t) throw type_error("invalid type requested from entry"); +#endif + TORRENT_ASSERT(m_type == int_t); return *reinterpret_cast(data); } inline entry::integer_type const& entry::integer() const { +#ifndef BOOST_NO_EXCEPTIONS if (m_type != int_t) throw type_error("invalid type requested from entry"); +#endif + TORRENT_ASSERT(m_type == int_t); return *reinterpret_cast(data); } inline entry::string_type& entry::string() { if (m_type == undefined_t) construct(string_t); +#ifndef BOOST_NO_EXCEPTIONS if (m_type != string_t) throw type_error("invalid type requested from entry"); +#endif + TORRENT_ASSERT(m_type == string_t); return *reinterpret_cast(data); } inline entry::string_type const& entry::string() const { +#ifndef BOOST_NO_EXCEPTIONS if (m_type != string_t) throw type_error("invalid type requested from entry"); +#endif + TORRENT_ASSERT(m_type == string_t); return *reinterpret_cast(data); } inline entry::list_type& entry::list() { if (m_type == undefined_t) construct(list_t); +#ifndef BOOST_NO_EXCEPTIONS if (m_type != list_t) throw type_error("invalid type requested from entry"); +#endif + TORRENT_ASSERT(m_type == list_t); return *reinterpret_cast(data); } inline entry::list_type const& entry::list() const { +#ifndef BOOST_NO_EXCEPTIONS if (m_type != list_t) throw type_error("invalid type requested from entry"); +#endif + TORRENT_ASSERT(m_type == list_t); return *reinterpret_cast(data); } inline entry::dictionary_type& entry::dict() { if (m_type == undefined_t) construct(dictionary_t); +#ifndef BOOST_NO_EXCEPTIONS if (m_type != dictionary_t) throw type_error("invalid type requested from entry"); +#endif + TORRENT_ASSERT(m_type == dictionary_t); return *reinterpret_cast(data); } inline entry::dictionary_type const& entry::dict() const { +#ifndef BOOST_NO_EXCEPTIONS if (m_type != dictionary_t) throw type_error("invalid type requested from entry"); +#endif + TORRENT_ASSERT(m_type == dictionary_t); return *reinterpret_cast(data); } diff --git a/libtorrent/include/libtorrent/http_tracker_connection.hpp b/libtorrent/include/libtorrent/http_tracker_connection.hpp index 70be3054a..c0057dfa1 100755 --- a/libtorrent/include/libtorrent/http_tracker_connection.hpp +++ b/libtorrent/include/libtorrent/http_tracker_connection.hpp @@ -130,6 +130,8 @@ namespace libtorrent , proxy_settings const& ps , std::string const& password = ""); + void close(); + private: boost::intrusive_ptr self() @@ -159,7 +161,7 @@ namespace libtorrent asio::strand& m_strand; tcp::resolver m_name_lookup; int m_port; - boost::shared_ptr m_socket; + socket_type m_socket; int m_recv_pos; std::vector m_buffer; std::string m_send_buffer; diff --git a/libtorrent/include/libtorrent/instantiate_connection.hpp b/libtorrent/include/libtorrent/instantiate_connection.hpp index 49cb1fe18..71282f993 100644 --- a/libtorrent/include/libtorrent/instantiate_connection.hpp +++ b/libtorrent/include/libtorrent/instantiate_connection.hpp @@ -41,8 +41,8 @@ namespace libtorrent { struct proxy_settings; - boost::shared_ptr instantiate_connection( - asio::io_service& ios, proxy_settings const& ps); + bool instantiate_connection(asio::io_service& ios + , proxy_settings const& ps, socket_type& s); } #endif diff --git a/libtorrent/include/libtorrent/intrusive_ptr_base.hpp b/libtorrent/include/libtorrent/intrusive_ptr_base.hpp index c7fbe46ad..5cccdf827 100644 --- a/libtorrent/include/libtorrent/intrusive_ptr_base.hpp +++ b/libtorrent/include/libtorrent/intrusive_ptr_base.hpp @@ -34,6 +34,7 @@ POSSIBILITY OF SUCH DAMAGE. #define TORRENT_INTRUSIVE_PTR_BASE #include +#include #include "libtorrent/config.hpp" #include "libtorrent/assert.hpp" @@ -45,7 +46,8 @@ namespace libtorrent intrusive_ptr_base(intrusive_ptr_base const&) : m_refs(0) {} - intrusive_ptr_base& operator=(intrusive_ptr_base const& rhs) {} + intrusive_ptr_base& operator=(intrusive_ptr_base const& rhs) + { return *this; } friend void intrusive_ptr_add_ref(intrusive_ptr_base const* s) { @@ -59,7 +61,7 @@ namespace libtorrent TORRENT_ASSERT(s->m_refs > 0); TORRENT_ASSERT(s != 0); if (--s->m_refs == 0) - delete static_cast(s); + boost::checked_delete(static_cast(s)); } boost::intrusive_ptr self() diff --git a/libtorrent/include/libtorrent/peer_connection.hpp b/libtorrent/include/libtorrent/peer_connection.hpp index e1581affe..805b38d9d 100755 --- a/libtorrent/include/libtorrent/peer_connection.hpp +++ b/libtorrent/include/libtorrent/peer_connection.hpp @@ -367,8 +367,12 @@ namespace libtorrent return boost::optional(); } - void send_buffer(char const* begin, int size); - buffer::interval allocate_send_buffer(int size); + // these functions are virtual to let bt_peer_connection hook into them + // and encrypt the content + virtual void send_buffer(char const* begin, int size); + virtual buffer::interval allocate_send_buffer(int size); + virtual void setup_send(); + template void append_send_buffer(char* buffer, int size, Destructor const& destructor) { @@ -378,7 +382,6 @@ namespace libtorrent m_ses.log_buffer_usage(); #endif } - void setup_send(); #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES void set_country(char const* c) @@ -466,9 +469,6 @@ namespace libtorrent // the peer belongs to. aux::session_impl& m_ses; - boost::intrusive_ptr self() - { return boost::intrusive_ptr(this); } - // called from the main loop when this connection has any // work to do. void on_send_data(asio::error_code const& error diff --git a/libtorrent/include/libtorrent/piece_picker.hpp b/libtorrent/include/libtorrent/piece_picker.hpp index ec6fc5bd3..ef69c3334 100755 --- a/libtorrent/include/libtorrent/piece_picker.hpp +++ b/libtorrent/include/libtorrent/piece_picker.hpp @@ -357,11 +357,12 @@ namespace libtorrent // the different priority levels switch (piece_priority) { + case 1: return prio; case 2: return prio - 1; case 3: return (std::max)(prio / 2, 1); case 4: return (std::max)(prio / 2 - 1, 1); - case 5: - case 6: return (std::min)(prio / 2 - 1, 2); + case 5: return (std::max)(prio / 3, 1); + case 6: return (std::max)(prio / 3 - 1, 1); case 7: return 1; } return prio; diff --git a/libtorrent/include/libtorrent/policy.hpp b/libtorrent/include/libtorrent/policy.hpp index c38bb426c..95397b49e 100755 --- a/libtorrent/include/libtorrent/policy.hpp +++ b/libtorrent/include/libtorrent/policy.hpp @@ -85,6 +85,7 @@ namespace libtorrent // the tracker, pex, lsd or dht. policy::peer* peer_from_tracker(const tcp::endpoint& remote, const peer_id& pid , int source, char flags); + void update_peer_port(int port, policy::peer* p, int src); // called when an incoming connection is accepted void new_connection(peer_connection& c); @@ -219,6 +220,8 @@ namespace libtorrent typedef std::multimap::const_iterator const_iterator; iterator begin_peer() { return m_peers.begin(); } iterator end_peer() { return m_peers.end(); } + const_iterator begin_peer() const { return m_peers.begin(); } + const_iterator end_peer() const { return m_peers.end(); } bool connect_one_peer(); bool disconnect_one_peer(); diff --git a/libtorrent/include/libtorrent/proxy_base.hpp b/libtorrent/include/libtorrent/proxy_base.hpp index 021802dd0..f2a955958 100644 --- a/libtorrent/include/libtorrent/proxy_base.hpp +++ b/libtorrent/include/libtorrent/proxy_base.hpp @@ -104,10 +104,9 @@ public: m_sock.bind(endpoint); } - template - void bind(endpoint_type const& endpoint, Error_Handler const& error_handler) + void bind(endpoint_type const& endpoint, asio::error_code& ec) { - m_sock.bind(endpoint, error_handler); + m_sock.bind(endpoint, ec); } void open(protocol_type const& p) @@ -115,10 +114,9 @@ public: m_sock.open(p); } - template - void open(protocol_type const& p, Error_Handler const& error_handler) + void open(protocol_type const& p, asio::error_code& ec) { - m_sock.open(p, error_handler); + m_sock.open(p, ec); } void close() @@ -127,10 +125,9 @@ public: m_sock.close(); } - template - void close(Error_Handler const& error_handler) + void close(asio::error_code& ec) { - m_sock.close(error_handler); + m_sock.close(ec); } endpoint_type remote_endpoint() @@ -138,8 +135,7 @@ public: return m_remote_endpoint; } - template - endpoint_type remote_endpoint(Error_Handler const& error_handler) + endpoint_type remote_endpoint(asio::error_code& ec) { return m_remote_endpoint; } @@ -149,10 +145,9 @@ public: return m_sock.local_endpoint(); } - template - endpoint_type local_endpoint(Error_Handler const& error_handler) + endpoint_type local_endpoint(asio::error_code& ec) { - return m_sock.local_endpoint(error_handler); + return m_sock.local_endpoint(ec); } asio::io_service& io_service() @@ -168,7 +163,6 @@ public: protected: stream_socket m_sock; - // the socks5 proxy std::string m_hostname; int m_port; diff --git a/libtorrent/include/libtorrent/session_settings.hpp b/libtorrent/include/libtorrent/session_settings.hpp index a792e296a..7b08ec11e 100644 --- a/libtorrent/include/libtorrent/session_settings.hpp +++ b/libtorrent/include/libtorrent/session_settings.hpp @@ -87,7 +87,7 @@ namespace libtorrent , tracker_receive_timeout(20) , stop_tracker_timeout(5) , tracker_maximum_response_length(1024*1024) - , piece_timeout(120) + , piece_timeout(10) , request_queue_time(3.f) , max_allowed_in_request_queue(250) , max_out_request_queue(200) @@ -111,6 +111,7 @@ namespace libtorrent , initial_picker_threshold(4) , allowed_fast_set_size(10) , max_outstanding_disk_bytes_per_connection(64 * 1024) + , handshake_timeout(10) #ifndef TORRENT_DISABLE_DHT , use_dht_as_fallback(true) #endif @@ -270,6 +271,11 @@ namespace libtorrent // to not completely disrupt normal downloads. int max_outstanding_disk_bytes_per_connection; + // the number of seconds to wait for a handshake + // response from a peer. If no response is received + // within this time, the peer is disconnected. + int handshake_timeout; + #ifndef TORRENT_DISABLE_DHT // while this is true, the dht will note be used unless the // tracker is online diff --git a/libtorrent/include/libtorrent/socks4_stream.hpp b/libtorrent/include/libtorrent/socks4_stream.hpp index 6110dbe4d..9530f9d57 100644 --- a/libtorrent/include/libtorrent/socks4_stream.hpp +++ b/libtorrent/include/libtorrent/socks4_stream.hpp @@ -89,3 +89,4 @@ private: } #endif + diff --git a/libtorrent/include/libtorrent/socks5_stream.hpp b/libtorrent/include/libtorrent/socks5_stream.hpp index 8bfc74c59..622557cd2 100644 --- a/libtorrent/include/libtorrent/socks5_stream.hpp +++ b/libtorrent/include/libtorrent/socks5_stream.hpp @@ -101,3 +101,4 @@ private: } #endif + diff --git a/libtorrent/include/libtorrent/storage.hpp b/libtorrent/include/libtorrent/storage.hpp index 68a81c75b..a3f97b589 100755 --- a/libtorrent/include/libtorrent/storage.hpp +++ b/libtorrent/include/libtorrent/storage.hpp @@ -57,6 +57,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/peer_request.hpp" #include "libtorrent/hasher.hpp" #include "libtorrent/config.hpp" +#include "libtorrent/buffer.hpp" namespace libtorrent { @@ -344,8 +345,8 @@ namespace libtorrent // used to move pieces while expanding // the storage from compact allocation // to full allocation - std::vector m_scratch_buffer; - std::vector m_scratch_buffer2; + buffer m_scratch_buffer; + buffer m_scratch_buffer2; // the piece that is in the scratch buffer int m_scratch_piece; diff --git a/libtorrent/include/libtorrent/torrent.hpp b/libtorrent/include/libtorrent/torrent.hpp index 60140f2c2..5ee5ddb03 100755 --- a/libtorrent/include/libtorrent/torrent.hpp +++ b/libtorrent/include/libtorrent/torrent.hpp @@ -128,6 +128,8 @@ namespace libtorrent #ifndef TORRENT_DISABLE_EXTENSIONS void add_extension(boost::shared_ptr); + void add_extension(boost::function(torrent*, void*)> const& ext + , void* userdata); #endif // this is called when the torrent has metadata. @@ -171,7 +173,7 @@ namespace libtorrent boost::tuples::tuple bytes_done() const; size_type quantized_bytes_done() const; - void ip_filter_updated() { m_policy->ip_filter_updated(); } + void ip_filter_updated() { m_policy.ip_filter_updated(); } void pause(); void resume(); @@ -204,7 +206,7 @@ namespace libtorrent tcp::endpoint const& get_interface() const { return m_net_interface; } void connect_to_url_seed(std::string const& url); - peer_connection* connect_to_peer(policy::peer* peerinfo); + bool connect_to_peer(policy::peer* peerinfo) throw(); void set_ratio(float ratio) { TORRENT_ASSERT(ratio >= 0.0f); m_ratio = ratio; } @@ -276,31 +278,12 @@ namespace libtorrent bool want_more_peers() const; bool try_connect_peer(); - peer_connection* connection_for(tcp::endpoint const& a) - { - peer_iterator i = m_connections.find(a); - if (i == m_connections.end()) return 0; - return i->second; - } - -#ifndef NDEBUG - void connection_for(address const& a, std::vector& pc) - { - for (peer_iterator i = m_connections.begin() - , end(m_connections.end()); i != end; ++i) - { - if (i->first.address() == a) pc.push_back(i->second); - } - return; - } -#endif - // the number of peers that belong to this torrent int num_peers() const { return (int)m_connections.size(); } int num_seeds() const; - typedef std::map::iterator peer_iterator; - typedef std::map::const_iterator const_peer_iterator; + typedef std::set::iterator peer_iterator; + typedef std::set::const_iterator const_peer_iterator; const_peer_iterator begin() const { return m_connections.begin(); } const_peer_iterator end() const { return m_connections.end(); } @@ -424,6 +407,7 @@ namespace libtorrent } int block_size() const { TORRENT_ASSERT(m_block_size > 0); return m_block_size; } + peer_request to_req(piece_block const& p); // this will tell all peers that we just got his piece // and also let the piece picker know that we have this piece @@ -494,11 +478,7 @@ namespace libtorrent { return m_picker.get() != 0; } - policy& get_policy() - { - TORRENT_ASSERT(m_policy); - return *m_policy; - } + policy& get_policy() { return m_policy; } piece_manager& filesystem(); torrent_info const& torrent_file() const { return *m_torrent_file; } @@ -549,7 +529,7 @@ namespace libtorrent // to the checker thread for initial checking // of the storage. void set_metadata(entry const&); - + private: void on_files_deleted(int ret, disk_io_job const& j); @@ -632,7 +612,7 @@ namespace libtorrent #ifndef NDEBUG public: #endif - std::map m_connections; + std::set m_connections; #ifndef NDEBUG private: #endif @@ -688,8 +668,6 @@ namespace libtorrent // ----------------------------- - boost::shared_ptr m_policy; - // a back reference to the session // this torrent belongs to. aux::session_impl& m_ses; @@ -801,6 +779,8 @@ namespace libtorrent // total_done - m_initial_done <= total_payload_download size_type m_initial_done; #endif + + policy m_policy; }; inline ptime torrent::next_announce() const diff --git a/libtorrent/include/libtorrent/torrent_handle.hpp b/libtorrent/include/libtorrent/torrent_handle.hpp index 8ae2f7f41..7ddb218a6 100755 --- a/libtorrent/include/libtorrent/torrent_handle.hpp +++ b/libtorrent/include/libtorrent/torrent_handle.hpp @@ -64,6 +64,8 @@ namespace libtorrent struct checker_impl; } + struct torrent_plugin; + struct TORRENT_EXPORT duplicate_torrent: std::exception { virtual const char* what() const throw() @@ -279,6 +281,11 @@ namespace libtorrent void remove_url_seed(std::string const& url) const; std::set url_seeds() const; +#ifndef TORRENT_DISABLE_EXTENSIONS + void add_extension(boost::function(torrent*, void*)> const& ext + , void* userdata = 0); +#endif + bool has_metadata() const; const torrent_info& get_torrent_info() const; bool is_valid() const; diff --git a/libtorrent/include/libtorrent/torrent_info.hpp b/libtorrent/include/libtorrent/torrent_info.hpp index 89744d0af..16eebf234 100755 --- a/libtorrent/include/libtorrent/torrent_info.hpp +++ b/libtorrent/include/libtorrent/torrent_info.hpp @@ -240,8 +240,8 @@ namespace libtorrent void parse_info_section(entry const& e); - entry extra(char const* key) const - { return m_extra_info[key]; } + entry const* extra(char const* key) const + { return m_extra_info.find_key(key); } // frees parts of the metadata that isn't // used by seeds diff --git a/libtorrent/include/libtorrent/tracker_manager.hpp b/libtorrent/include/libtorrent/tracker_manager.hpp index 07c377a0f..fdc3f6bbf 100755 --- a/libtorrent/include/libtorrent/tracker_manager.hpp +++ b/libtorrent/include/libtorrent/tracker_manager.hpp @@ -184,6 +184,7 @@ namespace libtorrent typedef boost::mutex mutex_t; mutable mutex_t m_mutex; + bool m_abort; }; struct TORRENT_EXPORT tracker_connection @@ -202,7 +203,7 @@ namespace libtorrent void fail(int code, char const* msg); void fail_timeout(); - void close(); + virtual void close(); address const& bind_interface() const { return m_bind_interface; } protected: diff --git a/libtorrent/include/libtorrent/udp_tracker_connection.hpp b/libtorrent/include/libtorrent/udp_tracker_connection.hpp index c0e6853b9..4fba505a4 100755 --- a/libtorrent/include/libtorrent/udp_tracker_connection.hpp +++ b/libtorrent/include/libtorrent/udp_tracker_connection.hpp @@ -74,6 +74,8 @@ namespace libtorrent , boost::weak_ptr c , session_settings const& stn); + void close(); + private: enum action_t @@ -105,7 +107,7 @@ namespace libtorrent asio::strand& m_strand; udp::resolver m_name_lookup; - boost::shared_ptr m_socket; + datagram_socket m_socket; udp::endpoint m_target; udp::endpoint m_sender; diff --git a/libtorrent/include/libtorrent/upnp.hpp b/libtorrent/include/libtorrent/upnp.hpp index 2c819df5f..0b799d1e9 100644 --- a/libtorrent/include/libtorrent/upnp.hpp +++ b/libtorrent/include/libtorrent/upnp.hpp @@ -148,7 +148,18 @@ private: { mapping[0].protocol = 0; mapping[1].protocol = 1; +#ifndef NDEBUG + magic = 1337; +#endif } + +#ifndef NDEBUG + ~rootdevice() + { + TORRENT_ASSERT(magic == 1337); + magic = 0; + } +#endif // the interface url, through which the list of // supported interfaces are fetched @@ -174,8 +185,12 @@ private: mutable boost::shared_ptr upnp_connection; +#ifndef NDEBUG + int magic; +#endif void close() const { + TORRENT_ASSERT(magic == 1337); if (!upnp_connection) return; upnp_connection->close(); upnp_connection.reset(); diff --git a/libtorrent/include/libtorrent/variant_stream.hpp b/libtorrent/include/libtorrent/variant_stream.hpp index 4f45f5e01..bbe3d964d 100644 --- a/libtorrent/include/libtorrent/variant_stream.hpp +++ b/libtorrent/include/libtorrent/variant_stream.hpp @@ -48,8 +48,8 @@ namespace aux template struct io_control_visitor_ec: boost::static_visitor<> { - io_control_visitor_ec(IO_Control_Command& ioc, asio::error_code& ec) - : ioc(ioc), ec(ec) {} + io_control_visitor_ec(IO_Control_Command& ioc, asio::error_code& ec_) + : ioc(ioc), ec(ec_) {} template void operator()(T* p) const @@ -108,30 +108,27 @@ namespace aux // -------------- bind ----------- - template - struct bind_visitor + template + struct bind_visitor_ec : boost::static_visitor<> { - bind_visitor(EndpointType const& ep, Error_Handler const& error_handler) + bind_visitor_ec(EndpointType const& ep, asio::error_code& ec_) : endpoint(ep) - , error_handler(error_handler) + , ec(ec_) {} template void operator()(T* p) const - { - p->bind(endpoint, error_handler); - } + { p->bind(endpoint, ec); } - void operator()(boost::blank) const - {} + void operator()(boost::blank) const {} EndpointType const& endpoint; - Error_Handler const& error_handler; + asio::error_code& ec; }; template - struct bind_visitor + struct bind_visitor : boost::static_visitor<> { bind_visitor(EndpointType const& ep) @@ -140,42 +137,36 @@ namespace aux template void operator()(T* p) const - { - p->bind(endpoint); - } + { p->bind(endpoint); } - void operator()(boost::blank) const - {} + void operator()(boost::blank) const {} EndpointType const& endpoint; }; // -------------- open ----------- - template - struct open_visitor + template + struct open_visitor_ec : boost::static_visitor<> { - open_visitor(Protocol const& p, Error_Handler const& error_handler) + open_visitor_ec(Protocol const& p, asio::error_code& ec_) : proto(p) - , error_handler(error_handler) + , ec(ec_) {} template void operator()(T* p) const - { - p->open(proto, error_handler); - } + { p->open(proto, ec); } - void operator()(boost::blank) const - {} + void operator()(boost::blank) const {} Protocol const& proto; - Error_Handler const& error_handler; + asio::error_code& ec; }; template - struct open_visitor + struct open_visitor : boost::static_visitor<> { open_visitor(Protocol const& p) @@ -184,50 +175,39 @@ namespace aux template void operator()(T* p) const - { - p->open(proto); - } + { p->open(proto); } - void operator()(boost::blank) const - {} + void operator()(boost::blank) const {} Protocol const& proto; }; // -------------- close ----------- - template + struct close_visitor_ec + : boost::static_visitor<> + { + close_visitor_ec(asio::error_code& ec_) + : ec(ec_) + {} + + template + void operator()(T* p) const + { p->close(ec); } + + void operator()(boost::blank) const {} + + asio::error_code& ec; + }; + struct close_visitor : boost::static_visitor<> { - close_visitor(Error_Handler const& error_handler) - : error_handler(error_handler) - {} - template void operator()(T* p) const - { - p->close(error_handler); - } + { p->close(); } - void operator()(boost::blank) const - {} - - Error_Handler const& error_handler; - }; - - template <> - struct close_visitor - : boost::static_visitor<> - { - template - void operator()(T* p) const - { - p->close(); - } - - void operator()(boost::blank) const - {} + void operator()(boost::blank) const {} }; // -------------- remote_endpoint ----------- @@ -242,14 +222,10 @@ namespace aux template EndpointType operator()(T* p) const - { - return p->remote_endpoint(error_code); - } + { return p->remote_endpoint(error_code); } EndpointType operator()(boost::blank) const - { - return EndpointType(); - } + { return EndpointType(); } asio::error_code& error_code; }; @@ -260,14 +236,10 @@ namespace aux { template EndpointType operator()(T* p) const - { - return p->remote_endpoint(); - } + { return p->remote_endpoint(); } EndpointType operator()(boost::blank) const - { - return EndpointType(); - } + { return EndpointType(); } }; // -------------- local_endpoint ----------- @@ -357,9 +329,9 @@ namespace aux struct read_some_visitor_ec : boost::static_visitor { - read_some_visitor_ec(Mutable_Buffers const& buffers, asio::error_code& ec) + read_some_visitor_ec(Mutable_Buffers const& buffers, asio::error_code& ec_) : buffers(buffers) - , ec(ec) + , ec(ec_) {} template @@ -399,18 +371,17 @@ namespace aux // -------------- in_avail ----------- - template - struct in_avail_visitor + struct in_avail_visitor_ec : boost::static_visitor { - in_avail_visitor(Error_Handler const& error_handler) - : error_handler(error_handler) + in_avail_visitor_ec(asio::error_code& ec_) + : ec(ec_) {} template std::size_t operator()(T* p) const { - return p->in_avail(error_handler); + return p->in_avail(ec); } std::size_t operator()(boost::blank) const @@ -418,11 +389,10 @@ namespace aux return 0; } - Error_Handler const& error_handler; + asio::error_code& ec; }; - template <> - struct in_avail_visitor + struct in_avail_visitor : boost::static_visitor { template @@ -501,15 +471,12 @@ public: typedef typename S0::endpoint_type endpoint_type; typedef typename S0::protocol_type protocol_type; - explicit variant_stream(asio::io_service& io_service) - : m_io_service(io_service) - , m_variant(boost::blank()) - {} + explicit variant_stream() : m_variant(boost::blank()) {} template - void instantiate() + void instantiate(asio::io_service& ios) { - std::auto_ptr owned(new S(m_io_service)); + std::auto_ptr owned(new S(ios)); boost::apply_visitor(aux::delete_visitor(), m_variant); m_variant = owned.get(); owned.release(); @@ -605,12 +572,11 @@ public: boost::apply_visitor(aux::bind_visitor(endpoint), m_variant); } - template - void bind(endpoint_type const& endpoint, Error_Handler const& error_handler) + void bind(endpoint_type const& endpoint, asio::error_code& ec) { TORRENT_ASSERT(instantiated()); boost::apply_visitor( - aux::bind_visitor(endpoint, error_handler), m_variant + aux::bind_visitor_ec(endpoint, ec), m_variant ); } @@ -620,42 +586,39 @@ public: boost::apply_visitor(aux::open_visitor(p), m_variant); } - template - void open(protocol_type const& p, Error_Handler const& error_handler) + void open(protocol_type const& p, asio::error_code& ec) { TORRENT_ASSERT(instantiated()); boost::apply_visitor( - aux::open_visitor(p, error_handler), m_variant + aux::open_visitor_ec(p, ec), m_variant ); } void close() { - TORRENT_ASSERT(instantiated()); - boost::apply_visitor(aux::close_visitor<>(), m_variant); + if (!instantiated()) return; + boost::apply_visitor(aux::close_visitor(), m_variant); } - template - void close(Error_Handler const& error_handler) + void close(asio::error_code& ec) { - TORRENT_ASSERT(instantiated()); + if (!instantiated()) return; boost::apply_visitor( - aux::close_visitor(error_handler), m_variant + aux::close_visitor_ec(ec), m_variant ); } std::size_t in_avail() { TORRENT_ASSERT(instantiated()); - return boost::apply_visitor(aux::in_avail_visitor<>(), m_variant); + return boost::apply_visitor(aux::in_avail_visitor(), m_variant); } - template - std::size_t in_avail(Error_Handler const& error_handler) + std::size_t in_avail(asio::error_code& ec) { TORRENT_ASSERT(instantiated()); return boost::apply_visitor( - aux::in_avail_visitor(error_handler), m_variant + aux::in_avail_visitor_ec(ec), m_variant ); } @@ -704,7 +667,6 @@ public: } private: - asio::io_service& m_io_service; variant_type m_variant; }; diff --git a/libtorrent/src/broadcast_socket.cpp b/libtorrent/src/broadcast_socket.cpp index 4c2e9397c..a9d27eff4 100644 --- a/libtorrent/src/broadcast_socket.cpp +++ b/libtorrent/src/broadcast_socket.cpp @@ -190,6 +190,8 @@ namespace libtorrent void broadcast_socket::close() { + m_on_receive.clear(); + for (std::list::iterator i = m_sockets.begin() , end(m_sockets.end()); i != end; ++i) { diff --git a/libtorrent/src/bt_peer_connection.cpp b/libtorrent/src/bt_peer_connection.cpp index 0559aff95..d7b3226ec 100755 --- a/libtorrent/src/bt_peer_connection.cpp +++ b/libtorrent/src/bt_peer_connection.cpp @@ -582,9 +582,8 @@ namespace libtorrent { TORRENT_ASSERT(buf); TORRENT_ASSERT(size > 0); - TORRENT_ASSERT(!m_rc4_encrypted || m_encrypted); - if (m_rc4_encrypted) + if (m_encrypted && m_rc4_encrypted) m_RC4_handler->encrypt(buf, size); peer_connection::send_buffer(buf, size); @@ -592,9 +591,7 @@ namespace libtorrent buffer::interval bt_peer_connection::allocate_send_buffer(int size) { - TORRENT_ASSERT(!m_rc4_encrypted || m_encrypted); - - if (m_rc4_encrypted) + if (m_encrypted && m_rc4_encrypted) { TORRENT_ASSERT(m_enc_send_buffer.left() == 0); m_enc_send_buffer = peer_connection::allocate_send_buffer(size); @@ -609,9 +606,7 @@ namespace libtorrent void bt_peer_connection::setup_send() { - TORRENT_ASSERT(!m_rc4_encrypted || m_encrypted); - - if (m_rc4_encrypted && m_enc_send_buffer.left()) + if (m_encrypted && m_rc4_encrypted && m_enc_send_buffer.left()) { TORRENT_ASSERT(m_enc_send_buffer.begin); TORRENT_ASSERT(m_enc_send_buffer.end); @@ -1208,11 +1203,11 @@ namespace libtorrent // there is supposed to be a remote listen port if (entry* listen_port = root.find_key("p")) { - if (listen_port->type() == entry::int_t) + if (listen_port->type() == entry::int_t + && peer_info_struct() != 0) { - tcp::endpoint adr(remote().address() - , (unsigned short)listen_port->integer()); - t->get_policy().peer_from_tracker(adr, pid(), peer_info::incoming, 0); + t->get_policy().update_peer_port(listen_port->integer() + , peer_info_struct(), peer_info::incoming); } } // there should be a version too diff --git a/libtorrent/src/connection_queue.cpp b/libtorrent/src/connection_queue.cpp index 1caeb99fc..a48456ed5 100644 --- a/libtorrent/src/connection_queue.cpp +++ b/libtorrent/src/connection_queue.cpp @@ -86,6 +86,11 @@ namespace libtorrent try_connect(); } + void connection_queue::close() + { + m_timer.cancel(); + } + void connection_queue::limit(int limit) { m_half_open_limit = limit; } @@ -111,8 +116,14 @@ namespace libtorrent { INVARIANT_CHECK; - if (!free_slots() || m_queue.empty()) + if (!free_slots()) return; + + if (m_queue.empty()) + { + m_timer.cancel(); + return; + } std::list::iterator i = std::find_if(m_queue.begin() , m_queue.end(), boost::bind(&entry::connecting, _1) == false); diff --git a/libtorrent/src/disk_io_thread.cpp b/libtorrent/src/disk_io_thread.cpp index 22ee12179..a9446f664 100644 --- a/libtorrent/src/disk_io_thread.cpp +++ b/libtorrent/src/disk_io_thread.cpp @@ -70,6 +70,31 @@ namespace libtorrent m_disk_io_thread.join(); } +#ifndef NDEBUG + disk_io_job disk_io_thread::find_job(boost::intrusive_ptr s + , int action, int piece) const + { + boost::mutex::scoped_lock l(m_mutex); + for (std::deque::const_iterator i = m_jobs.begin(); + i != m_jobs.end(); ++i) + { + if (i->storage != s) + continue; + if ((i->action == action || action == -1) && i->piece == piece) + return *i; + } + if ((m_current.action == action || action == -1) + && m_current.piece == piece) + return m_current; + + disk_io_job ret; + ret.action = (disk_io_job::action_t)-1; + ret.piece = -1; + return ret; + } + +#endif + // aborts read operations void disk_io_thread::stop(boost::intrusive_ptr s) { @@ -205,12 +230,19 @@ namespace libtorrent m_log << log_time() << " idle" << std::endl; #endif boost::mutex::scoped_lock l(m_mutex); +#ifndef NDEBUG + m_current.action = (disk_io_job::action_t)-1; + m_current.piece = -1; +#endif while (m_jobs.empty() && !m_abort) m_signal.wait(l); if (m_abort && m_jobs.empty()) return; boost::function handler; handler.swap(m_jobs.front().callback); +#ifndef NDEBUG + m_current = m_jobs.front(); +#endif disk_io_job j = m_jobs.front(); m_jobs.pop_front(); m_queue_buffer_size -= j.buffer_size; @@ -309,6 +341,11 @@ namespace libtorrent // else std::cerr << "DISK THREAD: invoking callback" << std::endl; try { if (handler) handler(ret, j); } catch (std::exception&) {} + +#ifndef NDEBUG + m_current.storage = 0; + m_current.callback.clear(); +#endif if (j.buffer && free_buffer) { diff --git a/libtorrent/src/entry.cpp b/libtorrent/src/entry.cpp index 50c6967cc..88800713c 100755 --- a/libtorrent/src/entry.cpp +++ b/libtorrent/src/entry.cpp @@ -126,6 +126,7 @@ namespace libtorrent return &i->second; } +#ifndef BOOST_NO_EXCEPTIONS const entry& entry::operator[](char const* key) const { dictionary_type::const_iterator i = dict().find(key); @@ -138,6 +139,7 @@ namespace libtorrent { return (*this)[key.c_str()]; } +#endif entry::entry(dictionary_type const& v) { diff --git a/libtorrent/src/http_tracker_connection.cpp b/libtorrent/src/http_tracker_connection.cpp index de1783b58..ed9823b83 100755 --- a/libtorrent/src/http_tracker_connection.cpp +++ b/libtorrent/src/http_tracker_connection.cpp @@ -489,20 +489,39 @@ namespace libtorrent , boost::lexical_cast(m_port)); m_name_lookup.async_resolve(q, m_strand.wrap( boost::bind(&http_tracker_connection::name_lookup, self(), _1, _2))); - set_timeout(m_settings.tracker_completion_timeout + set_timeout(req.event == tracker_request::stopped + ? m_settings.stop_tracker_timeout + : m_settings.tracker_completion_timeout , m_settings.tracker_receive_timeout); } void http_tracker_connection::on_timeout() { m_timed_out = true; - m_socket.reset(); + m_socket.close(); m_name_lookup.cancel(); if (m_connection_ticket > -1) m_cc.done(m_connection_ticket); m_connection_ticket = -1; fail_timeout(); } + void http_tracker_connection::close() + { + asio::error_code ec; + m_socket.close(ec); + m_name_lookup.cancel(); + if (m_connection_ticket > -1) m_cc.done(m_connection_ticket); + m_connection_ticket = -1; + m_timed_out = true; + tracker_connection::close(); +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + boost::shared_ptr cb = requester(); + std::stringstream msg; + msg << "http_tracker_connection::close() " << m_man.num_requests(); + if (cb) cb->debug_log(msg.str()); +#endif + } + void http_tracker_connection::name_lookup(asio::error_code const& error , tcp::resolver::iterator i) try { @@ -550,18 +569,20 @@ namespace libtorrent } if (cb) cb->m_tracker_address = target_address; - m_socket = instantiate_connection(m_name_lookup.io_service(), m_proxy); + bool ret = instantiate_connection(m_strand.io_service(), m_proxy, m_socket); + + TORRENT_ASSERT(ret); if (m_proxy.type == proxy_settings::http || m_proxy.type == proxy_settings::http_pw) { // the tracker connection will talk immediately to // the proxy, without requiring CONNECT support - m_socket->get().set_no_connect(true); + m_socket.get().set_no_connect(true); } - m_socket->open(target_address.protocol()); - m_socket->bind(tcp::endpoint(bind_interface(), 0)); + m_socket.open(target_address.protocol()); + m_socket.bind(tcp::endpoint(bind_interface(), 0)); m_cc.enqueue(bind(&http_tracker_connection::connect, self(), _1, target_address) , bind(&http_tracker_connection::on_timeout, self()) , seconds(m_settings.tracker_receive_timeout)); @@ -574,7 +595,7 @@ namespace libtorrent void http_tracker_connection::connect(int ticket, tcp::endpoint target_address) { m_connection_ticket = ticket; - m_socket->async_connect(target_address, bind(&http_tracker_connection::connected, self(), _1)); + m_socket.async_connect(target_address, bind(&http_tracker_connection::connected, self(), _1)); } void http_tracker_connection::connected(asio::error_code const& error) try @@ -595,7 +616,7 @@ namespace libtorrent #endif restart_read_timeout(); - async_write(*m_socket, asio::buffer(m_send_buffer.c_str() + async_write(m_socket, asio::buffer(m_send_buffer.c_str() , m_send_buffer.size()), bind(&http_tracker_connection::sent , self(), _1)); } @@ -620,7 +641,7 @@ namespace libtorrent #endif restart_read_timeout(); TORRENT_ASSERT(m_buffer.size() - m_recv_pos > 0); - m_socket->async_read_some(asio::buffer(&m_buffer[m_recv_pos] + m_socket.async_read_some(asio::buffer(&m_buffer[m_recv_pos] , m_buffer.size() - m_recv_pos), bind(&http_tracker_connection::receive , self(), _1, _2)); } @@ -701,7 +722,7 @@ namespace libtorrent } TORRENT_ASSERT(m_buffer.size() - m_recv_pos > 0); - m_socket->async_read_some(asio::buffer(&m_buffer[m_recv_pos] + m_socket.async_read_some(asio::buffer(&m_buffer[m_recv_pos] , m_buffer.size() - m_recv_pos), bind(&http_tracker_connection::receive , self(), _1, _2)); } @@ -757,7 +778,6 @@ namespace libtorrent if (m_parser.status_code() != 200) { fail(m_parser.status_code(), m_parser.message().c_str()); - close(); return; } @@ -819,6 +839,7 @@ namespace libtorrent TORRENT_ASSERT(false); } #endif + close(); } peer_entry http_tracker_connection::extract_peer_info(const entry& info) diff --git a/libtorrent/src/instantiate_connection.cpp b/libtorrent/src/instantiate_connection.cpp index 43b70f40d..f9c997fb1 100644 --- a/libtorrent/src/instantiate_connection.cpp +++ b/libtorrent/src/instantiate_connection.cpp @@ -42,42 +42,40 @@ POSSIBILITY OF SUCH DAMAGE. namespace libtorrent { - boost::shared_ptr instantiate_connection( - asio::io_service& ios, proxy_settings const& ps) + bool instantiate_connection(asio::io_service& ios + , proxy_settings const& ps, socket_type& s) { - boost::shared_ptr s(new socket_type(ios)); - if (ps.type == proxy_settings::none) { - s->instantiate(); + s.instantiate(ios); } else if (ps.type == proxy_settings::http || ps.type == proxy_settings::http_pw) { - s->instantiate(); - s->get().set_proxy(ps.hostname, ps.port); + s.instantiate(ios); + s.get().set_proxy(ps.hostname, ps.port); if (ps.type == proxy_settings::socks5_pw) - s->get().set_username(ps.username, ps.password); + s.get().set_username(ps.username, ps.password); } else if (ps.type == proxy_settings::socks5 || ps.type == proxy_settings::socks5_pw) { - s->instantiate(); - s->get().set_proxy(ps.hostname, ps.port); + s.instantiate(ios); + s.get().set_proxy(ps.hostname, ps.port); if (ps.type == proxy_settings::socks5_pw) - s->get().set_username(ps.username, ps.password); + s.get().set_username(ps.username, ps.password); } else if (ps.type == proxy_settings::socks4) { - s->instantiate(); - s->get().set_proxy(ps.hostname, ps.port); - s->get().set_username(ps.username); + s.instantiate(ios); + s.get().set_proxy(ps.hostname, ps.port); + s.get().set_username(ps.username); } else { - throw std::runtime_error("unsupported proxy type"); + return false; } - return s; + return true; } } diff --git a/libtorrent/src/lsd.cpp b/libtorrent/src/lsd.cpp index 44d7b19d4..6e1dcb7b3 100644 --- a/libtorrent/src/lsd.cpp +++ b/libtorrent/src/lsd.cpp @@ -184,5 +184,6 @@ void lsd::on_announce(udp::endpoint const& from, char* buffer void lsd::close() { m_socket.close(); + m_broadcast_timer.cancel(); } diff --git a/libtorrent/src/metadata_transfer.cpp b/libtorrent/src/metadata_transfer.cpp index e02a2d758..50dc57ec7 100644 --- a/libtorrent/src/metadata_transfer.cpp +++ b/libtorrent/src/metadata_transfer.cpp @@ -504,11 +504,10 @@ namespace libtorrent { namespace // extension and that has metadata int peers = 0; #ifndef TORRENT_DISABLE_EXTENSIONS - typedef std::map conn_map; - for (conn_map::iterator i = m_torrent.begin() + for (torrent::peer_iterator i = m_torrent.begin() , end(m_torrent.end()); i != end; ++i) { - bt_peer_connection* c = dynamic_cast(i->second); + bt_peer_connection* c = dynamic_cast(*i); if (c == 0) continue; metadata_peer_plugin* p = c->supports_extension(); diff --git a/libtorrent/src/natpmp.cpp b/libtorrent/src/natpmp.cpp index 42cf89e37..38319d18f 100644 --- a/libtorrent/src/natpmp.cpp +++ b/libtorrent/src/natpmp.cpp @@ -382,6 +382,8 @@ void natpmp::try_next_mapping(int i) void natpmp::close() { + asio::error_code ec; + m_socket.close(ec); if (m_disabled) return; for (int i = 0; i < num_mappings; ++i) { @@ -390,5 +392,7 @@ void natpmp::close() m_mappings[i].external_port = 0; refresh_mapping(i); } + m_refresh_timer.cancel(); + m_send_timer.cancel(); } diff --git a/libtorrent/src/peer_connection.cpp b/libtorrent/src/peer_connection.cpp index 625b8eacd..9bd089234 100755 --- a/libtorrent/src/peer_connection.cpp +++ b/libtorrent/src/peer_connection.cpp @@ -390,7 +390,6 @@ namespace libtorrent TORRENT_ASSERT(m_peer_info->connection == 0); boost::shared_ptr t = m_torrent.lock(); - if (t) TORRENT_ASSERT(t->connection_for(remote()) != this); #endif } @@ -1373,21 +1372,28 @@ namespace libtorrent { session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); + INVARIANT_CHECK; + m_outstanding_writing_bytes -= p.length; TORRENT_ASSERT(m_outstanding_writing_bytes >= 0); -#ifdef TORRENT_VERBOSE_LOGGING - (*m_logger) << " *** on_disk_write_complete() " << p.length << "\n"; +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + (*m_ses.m_logger) << time_now_string() << " *** DISK_WRITE_COMPLETE [ p: " + << p.piece << " o: " << p.start << " ]\n"; #endif // in case the outstanding bytes just dropped down // to allow to receive more data setup_receive(); + piece_block block_finished(p.piece, p.start / t->block_size()); + if (ret == -1 || !t) { + if (t->has_picker()) t->picker().abort_download(block_finished); + if (!t) { - m_ses.connection_failed(m_socket, remote(), j.str.c_str()); + m_ses.connection_failed(self(), remote(), j.str.c_str()); return; } @@ -1406,7 +1412,6 @@ namespace libtorrent TORRENT_ASSERT(p.piece == j.piece); TORRENT_ASSERT(p.start == j.offset); - piece_block block_finished(p.piece, p.start / t->block_size()); picker.mark_as_finished(block_finished, peer_info_struct()); if (t->alerts().should_post(alert::debug)) { @@ -1414,13 +1419,6 @@ namespace libtorrent block_finished.block_index, block_finished.piece_index, "block finished")); } - if (!t->is_seed() && !m_torrent.expired()) - { - // this is a free function defined in policy.cpp - request_a_block(*t, *this); - send_block_requests(); - } - #ifndef NDEBUG try { @@ -1444,6 +1442,14 @@ namespace libtorrent TORRENT_ASSERT(false); } #endif + + if (!t->is_seed() && !m_torrent.expired()) + { + // this is a free function defined in policy.cpp + request_a_block(*t, *this); + send_block_requests(); + } + } // ----------------------------- @@ -1918,7 +1924,8 @@ namespace libtorrent "s: " << r.start << " | " "l: " << r.length << " | " "ds: " << statistics().download_rate() << " B/s | " - "qs: " << m_desired_queue_size << " ]\n"; + "qs: " << m_desired_queue_size << " " + "blk: " << (m_request_large_blocks?"large":"single") << " ]\n"; #endif } m_last_piece = time_now(); @@ -1936,7 +1943,7 @@ namespace libtorrent (*m_ses.m_logger) << "CONNECTION TIMED OUT: " << m_remote.address().to_string() << "\n"; #endif - m_ses.connection_failed(m_socket, m_remote, "timed out"); + m_ses.connection_failed(self(), m_remote, "timed out"); } void peer_connection::disconnect() @@ -2201,18 +2208,19 @@ namespace libtorrent piece_picker& picker = t->picker(); while (!m_download_queue.empty()) { - picker.abort_download(m_download_queue.back()); + piece_block const& r = m_download_queue.back(); + picker.abort_download(r); + write_cancel(t->to_req(r)); m_download_queue.pop_back(); } while (!m_request_queue.empty()) { - picker.abort_download(m_request_queue.back()); + piece_block const& r = m_request_queue.back(); + picker.abort_download(r); + write_cancel(t->to_req(r)); m_request_queue.pop_back(); } - // TODO: If we have a limited number of upload - // slots, choke this peer - m_assume_fifo = true; request_a_block(*t, *this); @@ -2281,7 +2289,7 @@ namespace libtorrent #ifdef TORRENT_VERBOSE_LOGGING (*m_logger) << "**ERROR**: " << e.what() << "\n"; #endif - m_ses.connection_failed(m_socket, remote(), e.what()); + m_ses.connection_failed(self(), remote(), e.what()); } } @@ -2331,7 +2339,7 @@ namespace libtorrent boost::shared_ptr t = m_torrent.lock(); if (!t) { - m_ses.connection_failed(m_socket, remote(), j.str.c_str()); + m_ses.connection_failed(self(), remote(), j.str.c_str()); return; } @@ -2667,7 +2675,7 @@ namespace libtorrent boost::shared_ptr t = m_torrent.lock(); if (!t) { - m_ses.connection_failed(m_socket, remote(), e.what()); + m_ses.connection_failed(self(), remote(), e.what()); return; } @@ -2682,14 +2690,14 @@ namespace libtorrent catch (std::exception& e) { session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - m_ses.connection_failed(m_socket, remote(), e.what()); + m_ses.connection_failed(self(), remote(), e.what()); } catch (...) { // all exceptions should derive from std::exception TORRENT_ASSERT(false); session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - m_ses.connection_failed(m_socket, remote(), "connection failed for unknown reason"); + m_ses.connection_failed(self(), remote(), "connection failed for unknown reason"); } bool peer_connection::can_write() const @@ -2770,7 +2778,7 @@ namespace libtorrent (*m_ses.m_logger) << "CONNECTION FAILED: " << m_remote.address().to_string() << ": " << e.message() << "\n"; #endif - m_ses.connection_failed(m_socket, m_remote, e.message().c_str()); + m_ses.connection_failed(self(), m_remote, e.message().c_str()); return; } @@ -2790,14 +2798,14 @@ namespace libtorrent catch (std::exception& ex) { session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - m_ses.connection_failed(m_socket, remote(), ex.what()); + m_ses.connection_failed(self(), remote(), ex.what()); } catch (...) { // all exceptions should derive from std::exception TORRENT_ASSERT(false); session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - m_ses.connection_failed(m_socket, remote(), "connection failed for unkown reason"); + m_ses.connection_failed(self(), remote(), "connection failed for unkown reason"); } // -------------------------- @@ -2847,14 +2855,14 @@ namespace libtorrent catch (std::exception& e) { session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - m_ses.connection_failed(m_socket, remote(), e.what()); + m_ses.connection_failed(self(), remote(), e.what()); } catch (...) { // all exceptions should derive from std::exception TORRENT_ASSERT(false); session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); - m_ses.connection_failed(m_socket, remote(), "connection failed for unknown reason"); + m_ses.connection_failed(self(), remote(), "connection failed for unknown reason"); } @@ -2871,26 +2879,47 @@ namespace libtorrent } boost::shared_ptr t = m_torrent.lock(); - if (!t) + if (!t) return; + + if (m_peer_info) { - typedef session_impl::torrent_map torrent_map; - torrent_map& m = m_ses.m_torrents; - for (torrent_map::iterator i = m.begin(), end(m.end()); i != end; ++i) + policy::const_iterator i; + for (i = t->get_policy().begin_peer(); + i != t->get_policy().end_peer(); ++i) { - torrent& t = *i->second; - TORRENT_ASSERT(t.connection_for(m_remote) != this); + if (&i->second == m_peer_info) break; } - return; + TORRENT_ASSERT(i != t->get_policy().end_peer()); } - - TORRENT_ASSERT(t->connection_for(remote()) != 0 || m_in_constructor); - - if (!m_in_constructor && t->connection_for(remote()) != this - && !m_ses.settings().allow_multiple_connections_per_ip) + if (t->has_picker() && !t->is_aborted()) { - TORRENT_ASSERT(false); - } + // make sure that pieces that have completed the download + // of all their blocks are in the disk io thread's queue + // to be checked. + const std::vector& dl_queue + = t->picker().get_download_queue(); + for (std::vector::const_iterator i = + dl_queue.begin(); i != dl_queue.end(); ++i) + { + const int blocks_per_piece = t->picker().blocks_in_piece(i->index); + bool complete = true; + for (int j = 0; j < blocks_per_piece; ++j) + { + if (i->info[j].state == piece_picker::block_info::state_finished) + continue; + complete = false; + break; + } + if (complete) + { + disk_io_job ret = m_ses.m_disk_thread.find_job( + &t->filesystem(), -1, i->index); + TORRENT_ASSERT(ret.action == disk_io_job::hash || ret.action == disk_io_job::write); + TORRENT_ASSERT(ret.piece == i->index); + } + } + } // expensive when using checked iterators /* if (t->valid_metadata()) @@ -2953,11 +2982,24 @@ namespace libtorrent // time, it is considered to have timed out time_duration d; d = now - m_last_receive; - if (d > seconds(m_timeout)) return true; + if (d > seconds(m_timeout)) + { +#ifdef TORRENT_VERBOSE_LOGGING + (*m_logger) << time_now_string() << " *** LAST ACTIVITY [ " + << total_seconds(d) << " seconds ago ] ***\n"; +#endif + return true; + } - // if it takes more than 5 seconds to receive - // handshake, disconnect - if (in_handshake() && d > seconds(5)) return true; + // do not stall waiting for a handshake + if (in_handshake() && d > seconds(m_ses.settings().handshake_timeout)) + { +#ifdef TORRENT_VERBOSE_LOGGING + (*m_logger) << time_now_string() << " *** NO HANDSHAKE [ waited " + << total_seconds(d) << " seconds ] ***\n"; +#endif + return true; + } // disconnect peers that we unchoked, but // they didn't send a request within 20 seconds. @@ -2968,7 +3010,14 @@ namespace libtorrent && !m_choked && m_peer_interested && t && t->is_finished() - && d > seconds(20)) return true; + && d > seconds(20)) + { +#ifdef TORRENT_VERBOSE_LOGGING + (*m_logger) << time_now_string() << " *** NO REQUEST [ t: " + << total_seconds(d) << " ] ***\n"; +#endif + return true; + } // TODO: as long as we have less than 95% of the // global (or local) connection limit, connections should @@ -2984,11 +3033,21 @@ namespace libtorrent time_duration time_limit = seconds( m_ses.settings().inactivity_timeout); + // don't bother disconnect peers we haven't been intersted + // in (and that hasn't been interested in us) for a while + // unless we have used up all our connection slots if (!m_interesting && !m_peer_interested && d1 > time_limit - && d2 > time_limit) + && d2 > time_limit + && (m_ses.num_connections() >= m_ses.max_connections() + || (t && t->num_peers() >= t->max_connections()))) { +#ifdef TORRENT_VERBOSE_LOGGING + (*m_logger) << time_now_string() << " *** MUTUAL NO INTEREST [ " + "t1: " << total_seconds(d1) << " | " + "t2: " << total_seconds(d2) << " ] ***\n"; +#endif return true; } @@ -3031,10 +3090,9 @@ namespace libtorrent if (m_writing) return; #ifdef TORRENT_VERBOSE_LOGGING - using namespace boost::posix_time; (*m_logger) << time_now_string() << " ==> KEEPALIVE\n"; #endif - + m_last_sent = time_now(); write_keepalive(); } diff --git a/libtorrent/src/piece_picker.cpp b/libtorrent/src/piece_picker.cpp index ebfe07637..831bd0986 100755 --- a/libtorrent/src/piece_picker.cpp +++ b/libtorrent/src/piece_picker.cpp @@ -962,16 +962,16 @@ namespace libtorrent int priority = p.priority(m_sequenced_download_threshold); TORRENT_ASSERT(priority < int(m_piece_info.size())); - TORRENT_ASSERT(p.downloading == 1); - TORRENT_ASSERT(!p.have()); - - std::vector::iterator i - = std::find_if(m_downloads.begin() - , m_downloads.end() - , has_index(index)); - TORRENT_ASSERT(i != m_downloads.end()); - erase_download_piece(i); - p.downloading = 0; + if (p.downloading) + { + std::vector::iterator i + = std::find_if(m_downloads.begin() + , m_downloads.end() + , has_index(index)); + TORRENT_ASSERT(i != m_downloads.end()); + erase_download_piece(i); + p.downloading = 0; + } TORRENT_ASSERT(std::find_if(m_downloads.begin(), m_downloads.end() , has_index(index)) == m_downloads.end()); diff --git a/libtorrent/src/policy.cpp b/libtorrent/src/policy.cpp index 4de01d055..f203eaa56 100755 --- a/libtorrent/src/policy.cpp +++ b/libtorrent/src/policy.cpp @@ -82,13 +82,13 @@ namespace // want to trade it's surplus uploads for downloads itself // (and we should not consider it free). If the share diff is // negative, there's no free download to get from this peer. - size_type diff = i->second->share_diff(); + size_type diff = (*i)->share_diff(); TORRENT_ASSERT(diff < (std::numeric_limits::max)()); - if (i->second->is_peer_interested() || diff <= 0) + if ((*i)->is_peer_interested() || diff <= 0) continue; TORRENT_ASSERT(diff > 0); - i->second->add_free_upload(-diff); + (*i)->add_free_upload(-diff); accumulator += diff; TORRENT_ASSERT(accumulator > 0); } @@ -109,10 +109,10 @@ namespace size_type total_diff = 0; for (torrent::peer_iterator i = start; i != end; ++i) { - size_type d = i->second->share_diff(); + size_type d = (*i)->share_diff(); TORRENT_ASSERT(d < (std::numeric_limits::max)()); total_diff += d; - if (!i->second->is_peer_interested() || i->second->share_diff() >= 0) continue; + if (!(*i)->is_peer_interested() || (*i)->share_diff() >= 0) continue; ++num_peers; } @@ -130,7 +130,7 @@ namespace for (torrent::peer_iterator i = start; i != end; ++i) { - peer_connection* p = i->second; + peer_connection* p = *i; if (!p->is_peer_interested() || p->share_diff() >= 0) continue; p->add_free_upload(upload_share); free_upload -= upload_share; @@ -904,7 +904,7 @@ namespace libtorrent { TORRENT_ASSERT(!c.is_local()); - INVARIANT_CHECK; +// INVARIANT_CHECK; // if the connection comes from the tracker, // it's probably just a NAT-check. Ignore the @@ -932,10 +932,11 @@ namespace libtorrent if (m_torrent->settings().allow_multiple_connections_per_ip) { - i = std::find_if( - m_peers.begin() - , m_peers.end() - , match_peer_connection(c)); + tcp::endpoint remote = c.remote(); + std::pair range = m_peers.equal_range(remote.address()); + i = std::find_if(range.first, range.second, match_peer_endpoint(remote)); + + if (i == range.second) i = m_peers.end(); } else { @@ -977,8 +978,6 @@ namespace libtorrent i = m_peers.insert(std::make_pair(c.remote().address(), p)); } - TORRENT_ASSERT(m_torrent->connection_for(c.remote()) == &c); - c.set_peer_info(&i->second); TORRENT_ASSERT(i->second.connection == 0); c.add_stat(i->second.prev_amount_download, i->second.prev_amount_upload); @@ -991,7 +990,38 @@ namespace libtorrent // m_last_optimistic_disconnect = time_now(); } - policy::peer* policy::peer_from_tracker(const tcp::endpoint& remote, const peer_id& pid + void policy::update_peer_port(int port, policy::peer* p, int src) + { + TORRENT_ASSERT(p != 0); + if (p->ip.port() == port) return; + + if (m_torrent->settings().allow_multiple_connections_per_ip) + { + tcp::endpoint remote(p->ip.address(), port); + std::pair range = m_peers.equal_range(remote.address()); + iterator i = std::find_if(range.first, range.second + , match_peer_endpoint(remote)); + if (i != m_peers.end()) + { + policy::peer& pp = i->second; + if (pp.connection) + { + throw protocol_error("duplicate connection"); + } + if (m_torrent->has_picker()) + m_torrent->picker().clear_peer(&i->second); + m_peers.erase(i); + } + } + else + { + TORRENT_ASSERT(m_peers.count(p->ip.address()) == 1); + } + p->ip.port(port); + p->source |= src; + } + + policy::peer* policy::peer_from_tracker(tcp::endpoint const& remote, peer_id const& pid , int src, char flags) { // too expensive @@ -1022,9 +1052,7 @@ namespace libtorrent { std::pair range = m_peers.equal_range(remote.address()); i = std::find_if(range.first, range.second, match_peer_endpoint(remote)); - - if (i == range.second) - i = std::find_if(m_peers.begin(), m_peers.end(), match_peer_id(pid)); + if (i == range.second) i = m_peers.end(); } else { @@ -1066,9 +1094,6 @@ namespace libtorrent { i->second.type = peer::connectable; - // in case we got the ip from a remote connection, port is - // not known, so save it. Client may also have changed port - // for some reason. i->second.ip = remote; i->second.source |= src; @@ -1281,10 +1306,7 @@ namespace libtorrent try { - p->second.connected = time_now(); - p->second.connection = m_torrent->connect_to_peer(&p->second); - TORRENT_ASSERT(p->second.connection == m_torrent->connection_for(p->second.ip)); - if (p->second.connection == 0) + if (!m_torrent->connect_to_peer(&p->second)) { ++p->second.failcount; return false; @@ -1396,6 +1418,7 @@ namespace libtorrent void policy::check_invariant() const { if (m_torrent->is_aborted()) return; + int connected_peers = 0; int total_connections = 0; @@ -1414,20 +1437,14 @@ namespace libtorrent { TORRENT_ASSERT(unique_test.count(p.ip) == 0); unique_test.insert(p.ip); + TORRENT_ASSERT(i->first == p.ip.address()); +// TORRENT_ASSERT(p.connection == 0 || p.ip == p.connection->remote()); } ++total_connections; if (!p.connection) { continue; } - if (!m_torrent->settings().allow_multiple_connections_per_ip) - { - std::vector conns; - m_torrent->connection_for(p.ip.address(), conns); - TORRENT_ASSERT(std::find_if(conns.begin(), conns.end() - , boost::bind(std::equal_to(), _1, p.connection)) - != conns.end()); - } if (p.optimistically_unchoked) { TORRENT_ASSERT(p.connection); @@ -1444,10 +1461,10 @@ namespace libtorrent for (torrent::const_peer_iterator i = m_torrent->begin(); i != m_torrent->end(); ++i) { - if (i->second->is_disconnecting()) continue; + if ((*i)->is_disconnecting()) continue; // ignore web_peer_connections since they are not managed // by the policy class - if (dynamic_cast(i->second)) continue; + if (dynamic_cast(*i)) continue; ++num_torrent_peers; } diff --git a/libtorrent/src/session_impl.cpp b/libtorrent/src/session_impl.cpp index 63c039010..b3eba0bf7 100755 --- a/libtorrent/src/session_impl.cpp +++ b/libtorrent/src/session_impl.cpp @@ -89,6 +89,10 @@ namespace } openssl_global_destructor; } +#endif +#ifdef _WIN32 +// for ERROR_SEM_TIMEOUT +#include #endif using boost::shared_ptr; @@ -543,8 +547,8 @@ namespace detail , fingerprint const& cl_fprint , char const* listen_interface) : m_send_buffers(send_buffer_size) - , m_strand(m_io_service) , m_files(40) + , m_strand(m_io_service) , m_half_open(m_io_service) , m_download_channel(m_io_service, peer_connection::download_channel) , m_upload_channel(m_io_service, peer_connection::upload_channel) @@ -671,6 +675,17 @@ namespace detail if (m_dht) m_dht->stop(); #endif m_timer.cancel(); + + // close the listen sockets + for (std::list::iterator i = m_listen_sockets.begin() + , end(m_listen_sockets.end()); i != end; ++i) + { + i->sock->close(); + } + +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + (*m_logger) << time_now_string() << " aborting all torrents\n"; +#endif // abort all torrents for (torrent_map::iterator i = m_torrents.begin() , end(m_torrents.end()); i != end; ++i) @@ -678,7 +693,68 @@ namespace detail i->second->abort(); } - m_io_service.stop(); +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + (*m_logger) << time_now_string() << " aborting all tracker requests\n"; +#endif + m_tracker_manager.abort_all_requests(); + +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + (*m_logger) << time_now_string() << " sending event=stopped to trackers\n"; + int counter = 0; +#endif + for (torrent_map::iterator i = m_torrents.begin(); + i != m_torrents.end(); ++i) + { + torrent& t = *i->second; + + if ((!t.is_paused() || t.should_request()) + && !t.trackers().empty()) + { +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + ++counter; +#endif + tracker_request req = t.generate_tracker_request(); + TORRENT_ASSERT(req.event == tracker_request::stopped); + req.listen_port = 0; + if (!m_listen_sockets.empty()) + req.listen_port = m_listen_sockets.front().external_port; + req.key = m_key; + std::string login = i->second->tracker_login(); +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + boost::shared_ptr tl(new tracker_logger(*this)); + m_tracker_loggers.push_back(tl); + m_tracker_manager.queue_request(m_strand, m_half_open, req, login + , m_listen_interface.address(), tl); +#else + m_tracker_manager.queue_request(m_strand, m_half_open, req, login + , m_listen_interface.address()); +#endif + } + } +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + (*m_logger) << time_now_string() << " sent " << counter << " tracker stop requests\n"; +#endif + +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + (*m_logger) << time_now_string() << " aborting all connections (" << m_connections.size() << ")\n"; +#endif + // abort all connections + while (!m_connections.empty()) + { +#ifndef NDEBUG + int conn = m_connections.size(); +#endif + (*m_connections.begin())->disconnect(); + TORRENT_ASSERT(conn == int(m_connections.size()) + 1); + } + +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + (*m_logger) << time_now_string() << " shutting down connection queue\n"; +#endif + m_half_open.close(); + + m_download_channel.close(); + m_upload_channel.close(); mutex::scoped_lock l2(m_checker_impl.m_mutex); // abort the checker thread @@ -898,8 +974,8 @@ namespace detail void session_impl::async_accept(boost::shared_ptr const& listener) { - shared_ptr c(new socket_type(m_io_service)); - c->instantiate(); + shared_ptr c(new socket_type); + c->instantiate(m_io_service); listener->async_accept(c->get() , bind(&session_impl::on_incoming_connection, this, c , boost::weak_ptr(listener), _1)); @@ -924,6 +1000,15 @@ namespace detail std::string msg = "error accepting connection on '" + boost::lexical_cast(ep) + "' " + e.message(); (*m_logger) << msg << "\n"; +#endif +#ifdef _WIN32 + // Windows sometimes generates this error. It seems to be + // non-fatal and we have to do another async_accept. + if (e.value() == ERROR_SEM_TIMEOUT) + { + async_accept(listener); + return; + } #endif if (m_alerts.should_post(alert::fatal)) { @@ -964,6 +1049,17 @@ namespace detail return; } + // don't allow more connections than the max setting + if (num_connections() > max_connections()) + { +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + (*m_logger) << "number of connections limit exceeded (conns: " + << num_connections() << ", limit: " << max_connections() + << "), connection rejected\n"; +#endif + return; + } + // check if we have any active torrents // if we don't reject the connection if (m_torrents.empty()) return; @@ -986,7 +1082,7 @@ namespace detail c->m_in_constructor = false; #endif - m_connections.insert(std::make_pair(s, c)); + m_connections.insert(c); } catch (std::exception& exc) { @@ -995,7 +1091,7 @@ namespace detail #endif }; - void session_impl::connection_failed(boost::shared_ptr const& s + void session_impl::connection_failed(boost::intrusive_ptr const& peer , tcp::endpoint const& a, char const* message) #ifndef NDEBUG try @@ -1006,7 +1102,7 @@ namespace detail // too expensive // INVARIANT_CHECK; - connection_map::iterator p = m_connections.find(s); + connection_map::iterator p = m_connections.find(peer); // the connection may have been disconnected in the receive or send phase if (p == m_connections.end()) return; @@ -1015,15 +1111,15 @@ namespace detail m_alerts.post_alert( peer_error_alert( a - , p->second->pid() + , (*p)->pid() , message)); } #if defined(TORRENT_VERBOSE_LOGGING) - (*p->second->m_logger) << "*** CONNECTION FAILED " << message << "\n"; + (*(*p)->m_logger) << "*** CONNECTION FAILED " << message << "\n"; #endif - p->second->set_failed(); - p->second->disconnect(); + (*p)->set_failed(); + (*p)->disconnect(); } #ifndef NDEBUG catch (...) @@ -1040,10 +1136,10 @@ namespace detail // INVARIANT_CHECK; TORRENT_ASSERT(p->is_disconnecting()); - connection_map::iterator i = m_connections.find(p->get_socket()); + connection_map::iterator i = m_connections.find(p); if (i != m_connections.end()) { - if (!i->second->is_choked()) --m_num_unchoked; + if (!(*i)->is_choked()) --m_num_unchoked; m_connections.erase(i); } } @@ -1102,7 +1198,7 @@ namespace detail for (connection_map::iterator i = m_connections.begin() , end(m_connections.end()); i != end; ++i) { - if (i->second->is_connecting()) + if ((*i)->is_connecting()) ++num_half_open; else ++num_complete_connections; @@ -1190,7 +1286,7 @@ namespace detail ++i; // if this socket has timed out // close it. - peer_connection& c = *j->second; + peer_connection& c = *j->get(); if (c.has_timed_out()) { if (m_alerts.should_post(alert::debug)) @@ -1257,7 +1353,7 @@ namespace detail for (connection_map::iterator i = m_connections.begin() , end(m_connections.end()); i != end; ++i) { - peer_connection* p = i->second.get(); + peer_connection* p = i->get(); torrent* t = p->associated_torrent().lock().get(); if (!p->peer_info_struct() || t == 0 @@ -1267,7 +1363,7 @@ namespace detail || (p->share_diff() < -free_upload_amount && !t->is_seed())) { - if (!i->second->is_choked() && t) + if (!(*i)->is_choked() && t) { policy::peer* pi = p->peer_info_struct(); if (pi && pi->optimistically_unchoked) @@ -1276,11 +1372,11 @@ namespace detail // force a new optimistic unchoke m_optimistic_unchoke_time_scaler = 0; } - t->choke_peer(*i->second); + t->choke_peer(*(*i)); } continue; } - peers.push_back(i->second.get()); + peers.push_back(i->get()); } // sort the peers that are eligible for unchoke by download rate and secondary @@ -1352,7 +1448,7 @@ namespace detail for (connection_map::iterator i = m_connections.begin() , end(m_connections.end()); i != end; ++i) { - peer_connection* p = i->second.get(); + peer_connection* p = i->get(); TORRENT_ASSERT(p); policy::peer* pi = p->peer_info_struct(); if (!pi) continue; @@ -1383,21 +1479,21 @@ namespace detail { if (current_optimistic_unchoke != m_connections.end()) { - torrent* t = current_optimistic_unchoke->second->associated_torrent().lock().get(); + torrent* t = (*current_optimistic_unchoke)->associated_torrent().lock().get(); TORRENT_ASSERT(t); - current_optimistic_unchoke->second->peer_info_struct()->optimistically_unchoked = false; - t->choke_peer(*current_optimistic_unchoke->second); + (*current_optimistic_unchoke)->peer_info_struct()->optimistically_unchoked = false; + t->choke_peer(*current_optimistic_unchoke->get()); } else { ++m_num_unchoked; } - torrent* t = optimistic_unchoke_candidate->second->associated_torrent().lock().get(); + torrent* t = (*optimistic_unchoke_candidate)->associated_torrent().lock().get(); TORRENT_ASSERT(t); - bool ret = t->unchoke_peer(*optimistic_unchoke_candidate->second); + bool ret = t->unchoke_peer(*optimistic_unchoke_candidate->get()); TORRENT_ASSERT(ret); - optimistic_unchoke_candidate->second->peer_info_struct()->optimistically_unchoked = true; + (*optimistic_unchoke_candidate)->peer_info_struct()->optimistically_unchoked = true; } } } @@ -1460,96 +1556,11 @@ namespace detail } while (!m_abort); - deadline_timer tracker_timer(m_io_service); - // this will remove the port mappings - if (m_natpmp.get()) - m_natpmp->close(); - if (m_upnp.get()) - m_upnp->close(); - #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) (*m_logger) << time_now_string() << " locking mutex\n"; #endif session_impl::mutex_t::scoped_lock l(m_mutex); -#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) - (*m_logger) << time_now_string() << " aborting all tracker requests\n"; -#endif - m_tracker_manager.abort_all_requests(); -#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) - (*m_logger) << time_now_string() << " sending stopped to all torrent's trackers\n"; -#endif - for (std::map >::iterator i = - m_torrents.begin(); i != m_torrents.end(); ++i) - { - i->second->abort(); - // generate a tracker request in case the torrent is not paused - // (in which case it's not currently announced with the tracker) - // or if the torrent itself thinks we should request. Do not build - // a request in case the torrent doesn't have any trackers - if ((!i->second->is_paused() || i->second->should_request()) - && !i->second->trackers().empty()) - { - tracker_request req = i->second->generate_tracker_request(); - TORRENT_ASSERT(!m_listen_sockets.empty()); - req.listen_port = 0; - if (!m_listen_sockets.empty()) - req.listen_port = m_listen_sockets.front().external_port; - req.key = m_key; - std::string login = i->second->tracker_login(); -#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) - boost::shared_ptr tl(new tracker_logger(*this)); - m_tracker_loggers.push_back(tl); - m_tracker_manager.queue_request(m_strand, m_half_open, req, login - , m_listen_interface.address(), tl); -#else - m_tracker_manager.queue_request(m_strand, m_half_open, req, login - , m_listen_interface.address()); -#endif - } - } - - // close the listen sockets - m_listen_sockets.clear(); - - ptime start(time_now()); - l.unlock(); - -#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) - (*m_logger) << time_now_string() << " waiting for trackers to respond (" - << m_settings.stop_tracker_timeout << " seconds timeout)\n"; -#endif - - while (time_now() - start < seconds( - m_settings.stop_tracker_timeout) - && !m_tracker_manager.empty()) - { -#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) - (*m_logger) << time_now_string() << " " << m_tracker_manager.num_requests() - << " tracker requests pending\n"; -#endif - tracker_timer.expires_from_now(milliseconds(100)); - tracker_timer.async_wait(m_strand.wrap( - bind(&io_service::stop, &m_io_service))); - - m_io_service.reset(); - m_io_service.run(); - } - -#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) - (*m_logger) << time_now_string() << " tracker shutdown complete, locking mutex\n"; -#endif - - l.lock(); - TORRENT_ASSERT(m_abort); - m_abort = true; - -#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) - (*m_logger) << time_now_string() << " cleaning up connections\n"; -#endif - while (!m_connections.empty()) - m_connections.begin()->second->disconnect(); - #ifndef NDEBUG for (torrent_map::iterator i = m_torrents.begin(); i != m_torrents.end(); ++i) @@ -2077,8 +2088,8 @@ namespace detail entry session_impl::dht_state() const { - TORRENT_ASSERT(m_dht); mutex_t::scoped_lock l(m_mutex); + if (!m_dht) return entry(); return m_dht->state(); } @@ -2114,16 +2125,10 @@ namespace detail session_impl::~session_impl() { - abort(); - #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) (*m_logger) << time_now_string() << "\n\n *** shutting down session *** \n\n"; #endif - // lock the main thread and abort it - mutex_t::scoped_lock l(m_mutex); - m_abort = true; - m_io_service.stop(); - l.unlock(); + abort(); #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) (*m_logger) << time_now_string() << " waiting for main thread\n"; @@ -2364,19 +2369,20 @@ namespace detail for (connection_map::const_iterator i = m_connections.begin(); i != m_connections.end(); ++i) { - TORRENT_ASSERT(i->second); - boost::shared_ptr t = i->second->associated_torrent().lock(); + TORRENT_ASSERT(*i); + boost::shared_ptr t = (*i)->associated_torrent().lock(); - if (!i->second->is_choked()) ++unchokes; - if (i->second->peer_info_struct() - && i->second->peer_info_struct()->optimistically_unchoked) + peer_connection* p = i->get(); + if (!p->is_choked()) ++unchokes; + if (p->peer_info_struct() + && p->peer_info_struct()->optimistically_unchoked) { ++num_optimistic; - TORRENT_ASSERT(!i->second->is_choked()); + TORRENT_ASSERT(!p->is_choked()); } - if (t && i->second->peer_info_struct()) + if (t && p->peer_info_struct()) { - TORRENT_ASSERT(t->get_policy().has_connection(boost::get_pointer(i->second))); + TORRENT_ASSERT(t->get_policy().has_connection(p)); } } TORRENT_ASSERT(num_optimistic == 0 || num_optimistic == 1); diff --git a/libtorrent/src/storage.cpp b/libtorrent/src/storage.cpp index 6671e38e9..0468684f3 100755 --- a/libtorrent/src/storage.cpp +++ b/libtorrent/src/storage.cpp @@ -99,6 +99,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include "libtorrent/utf8.hpp" +#include "libtorrent/buffer.hpp" namespace libtorrent { @@ -386,7 +387,7 @@ namespace libtorrent file_pool& m_files; // temporary storage for moving pieces - std::vector m_scratch_buffer; + buffer m_scratch_buffer; }; sha1_hash storage::hash_for_slot(int slot, partial_hash& ph, int piece_size) @@ -468,14 +469,14 @@ namespace libtorrent void storage::release_files() { m_files.release(this); - std::vector().swap(m_scratch_buffer); + buffer().swap(m_scratch_buffer); } void storage::delete_files() { // make sure we don't have the files open m_files.release(this); - std::vector().swap(m_scratch_buffer); + buffer().swap(m_scratch_buffer); // delete the files from disk std::set directories; @@ -485,11 +486,12 @@ namespace libtorrent { std::string p = (m_save_path / i->path).string(); fs::path bp = i->path.branch_path(); - std::pair ret = directories.insert(bp.string()); + std::pair ret; + ret.second = true; while (ret.second && !bp.empty()) { + std::pair ret = directories.insert((m_save_path / bp).string()); bp = bp.branch_path(); - std::pair ret = directories.insert(bp.string()); } std::remove(p.c_str()); } @@ -498,9 +500,6 @@ namespace libtorrent // subdirectories first std::for_each(directories.rbegin(), directories.rend() , bind((int(*)(char const*))&std::remove, bind(&std::string::c_str, _1))); - - std::string p = (m_save_path / m_info->name()).string(); - std::remove(p.c_str()); } void storage::write_resume_data(entry& rd) const @@ -977,6 +976,7 @@ namespace libtorrent , m_storage_mode(storage_mode_sparse) , m_info(ti) , m_save_path(complete(save_path)) + , m_state(state_none) , m_current_slot(0) , m_out_of_place(false) , m_scratch_piece(-1) @@ -1624,8 +1624,8 @@ namespace libtorrent if (m_current_slot == m_info->num_pieces()) { m_state = state_create_files; - std::vector().swap(m_scratch_buffer); - std::vector().swap(m_scratch_buffer2); + buffer().swap(m_scratch_buffer); + buffer().swap(m_scratch_buffer2); if (m_storage_mode != storage_mode_compact) { std::vector().swap(m_piece_to_slot); diff --git a/libtorrent/src/torrent.cpp b/libtorrent/src/torrent.cpp index 840e488ba..84f30aa2d 100755 --- a/libtorrent/src/torrent.cpp +++ b/libtorrent/src/torrent.cpp @@ -116,11 +116,11 @@ namespace , tor(t) { TORRENT_ASSERT(t != 0); } - bool operator()(const session_impl::connection_map::value_type& c) const + bool operator()(session_impl::connection_map::value_type const& c) const { - tcp::endpoint sender = c.first->remote_endpoint(); + tcp::endpoint const& sender = c->remote(); if (sender.address() != ip.address()) return false; - if (tor != c.second->associated_torrent().lock().get()) return false; + if (tor != c->associated_torrent().lock().get()) return false; return true; } @@ -132,9 +132,9 @@ namespace { peer_by_id(const peer_id& i): pid(i) {} - bool operator()(const std::pair& p) const + bool operator()(session_impl::connection_map::value_type const& p) const { - if (p.second->pid() != pid) return false; + if (p->pid() != pid) return false; // have a special case for all zeros. We can have any number // of peers with that pid, since it's used to indicate no pid. if (std::count(pid.begin(), pid.end(), 0) == 20) return false; @@ -178,7 +178,6 @@ namespace libtorrent #ifndef TORRENT_DISABLE_DHT , m_last_dht_announce(time_now() - minutes(15)) #endif - , m_policy() , m_ses(ses) , m_checker(checker) , m_picker(0) @@ -203,8 +202,8 @@ namespace libtorrent , m_max_uploads((std::numeric_limits::max)()) , m_num_uploads(0) , m_max_connections((std::numeric_limits::max)()) + , m_policy(this) { - m_policy.reset(new policy(this)); } torrent::torrent( @@ -239,7 +238,6 @@ namespace libtorrent #ifndef TORRENT_DISABLE_DHT , m_last_dht_announce(time_now() - minutes(15)) #endif - , m_policy() , m_ses(ses) , m_checker(checker) , m_picker(0) @@ -263,6 +261,7 @@ namespace libtorrent , m_max_uploads((std::numeric_limits::max)()) , m_num_uploads(0) , m_max_connections((std::numeric_limits::max)()) + , m_policy(this) { INVARIANT_CHECK; @@ -273,8 +272,6 @@ namespace libtorrent m_trackers.push_back(announce_entry(tracker_url)); m_torrent_file->add_tracker(tracker_url); } - - m_policy.reset(new policy(this)); } void torrent::start() @@ -317,7 +314,7 @@ namespace libtorrent for (peer_iterator i = m_connections.begin(); i != m_connections.end(); ++i) { - (*i->second->m_logger) << "*** DESTRUCTING TORRENT\n"; + (*(*i)->m_logger) << "*** DESTRUCTING TORRENT\n"; } #endif @@ -326,6 +323,21 @@ namespace libtorrent disconnect_all(); } + peer_request torrent::to_req(piece_block const& p) + { + int block_offset = p.block_index * m_block_size; + int block_size = (std::min)((int)torrent_file().piece_size( + p.piece_index) - block_offset, m_block_size); + TORRENT_ASSERT(block_size > 0); + TORRENT_ASSERT(block_size <= m_block_size); + + peer_request r; + r.piece = p.piece_index; + r.start = block_offset; + r.length = block_size; + return r; + } + std::string torrent::name() const { if (valid_metadata()) return m_torrent_file->name(); @@ -334,10 +346,34 @@ namespace libtorrent } #ifndef TORRENT_DISABLE_EXTENSIONS + void torrent::add_extension(boost::shared_ptr ext) { m_extensions.push_back(ext); } + + void torrent::add_extension(boost::function(torrent*, void*)> const& ext + , void* userdata) + { + boost::shared_ptr tp(ext(this, userdata)); + if (!tp) return; + + add_extension(tp); + + for (peer_iterator i = m_connections.begin(); + i != m_connections.end(); ++i) + { + peer_connection* p = *i; + boost::shared_ptr pp(tp->new_connection(p)); + if (pp) p->add_extension(pp); + } + + // if files are checked for this torrent, call the extension + // to let it initialize itself + if (m_connections_initialized) + tp->on_files_checked(); + } + #endif // this may not be called from a constructor because of the call to @@ -551,7 +587,7 @@ namespace libtorrent continue; } - m_policy->peer_from_tracker(a, i->pid, peer_info::tracker, 0); + m_policy.peer_from_tracker(a, i->pid, peer_info::tracker, 0); } catch (std::exception&) { @@ -599,7 +635,7 @@ namespace libtorrent return; } - m_policy->peer_from_tracker(*host, pid, peer_info::tracker, 0); + m_policy.peer_from_tracker(*host, pid, peer_info::tracker, 0); } catch (std::exception&) {}; @@ -729,7 +765,7 @@ namespace libtorrent std::map downloading_piece; for (const_peer_iterator i = begin(); i != end(); ++i) { - peer_connection* pc = i->second; + peer_connection* pc = *i; boost::optional p = pc->downloading_piece_progress(); if (p) @@ -819,6 +855,11 @@ namespace libtorrent { session_impl::mutex_t::scoped_lock l(m_ses.m_mutex); +#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) + (*m_ses.m_logger) << time_now_string() << " *** PIECE_FINISHED [ p: " + << index << " chk: " << (passed_hash_check?"passed":"failed") << " ]\n"; +#endif + bool was_seed = is_seed(); bool was_finished = m_picker->num_filtered() + num_pieces() == torrent_file().num_pieces(); @@ -865,7 +906,7 @@ namespace libtorrent { #endif - m_policy->piece_finished(index, passed_hash_check); + m_policy.piece_finished(index, passed_hash_check); #ifndef NDEBUG } @@ -905,7 +946,7 @@ namespace libtorrent // resets the download queue. So, we cannot do the // invariant check here since it assumes: // (total_done == m_torrent_file->total_size()) => is_seed() -// INVARIANT_CHECK; + INVARIANT_CHECK; TORRENT_ASSERT(m_storage); TORRENT_ASSERT(m_storage->refcount() > 0); @@ -944,17 +985,6 @@ namespace libtorrent { policy::peer* p = static_cast(*i); if (p == 0) continue; -#ifndef NDEBUG - if (!settings().allow_multiple_connections_per_ip) - { - std::vector conns; - connection_for(p->ip.address(), conns); - TORRENT_ASSERT(p->connection == 0 - || std::find_if(conns.begin(), conns.end() - , boost::bind(std::equal_to(), _1, p->connection)) - != conns.end()); - } -#endif if (p->connection) p->connection->received_invalid_data(index); // either, we have received too many failed hashes @@ -1019,7 +1049,7 @@ namespace libtorrent for (peer_iterator i = m_connections.begin(); i != m_connections.end(); ++i) { - (*i->second->m_logger) << "*** ABORTING TORRENT\n"; + (*(*i)->m_logger) << "*** ABORTING TORRENT\n"; } #endif @@ -1088,7 +1118,7 @@ namespace libtorrent m_picker->we_have(index); for (peer_iterator i = m_connections.begin(); i != m_connections.end(); ++i) - try { i->second->announce_piece(index); } catch (std::exception&) {} + try { (*i)->announce_piece(index); } catch (std::exception&) {} for (std::set::iterator i = peers.begin() , end(peers.end()); i != end; ++i) @@ -1259,7 +1289,7 @@ namespace libtorrent void torrent::update_peer_interest() { for (peer_iterator i = begin(); i != end(); ++i) - i->second->update_interest(); + (*i)->update_interest(); } void torrent::filter_piece(int index, bool filter) @@ -1441,7 +1471,7 @@ namespace libtorrent for (peer_iterator i = m_connections.begin() , end(m_connections.end()); i != end; ++i) { - i->second->cancel_request(block); + (*i)->cancel_request(block); } } @@ -1451,7 +1481,7 @@ namespace libtorrent TORRENT_ASSERT(p != 0); - peer_iterator i = m_connections.find(p->remote()); + peer_iterator i = m_connections.find(p); if (i == m_connections.end()) { TORRENT_ASSERT(false); @@ -1489,8 +1519,9 @@ namespace libtorrent if (!p->is_choked()) --m_num_uploads; - m_policy->connection_closed(*p); + m_policy.connection_closed(*p); p->set_peer_info(0); + TORRENT_ASSERT(i != m_connections.end()); m_connections.erase(i); } catch (std::exception& e) @@ -1506,7 +1537,7 @@ namespace libtorrent INVARIANT_CHECK; #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) - (*m_ses.m_logger) << time_now_string() << " resolving: " << url << "\n"; + (*m_ses.m_logger) << time_now_string() << " resolving web seed: " << url << "\n"; #endif m_resolving_web_seeds.insert(url); @@ -1643,16 +1674,11 @@ namespace libtorrent return; } - peer_iterator conn = m_connections.find(a); - if (conn != m_connections.end()) - { - if (dynamic_cast(conn->second) == 0 - || conn->second->is_disconnecting()) conn->second->disconnect(); - else return; - } + boost::shared_ptr s(new socket_type); + + bool ret = instantiate_connection(m_ses.m_io_service, m_ses.web_seed_proxy(), *s); + TORRENT_ASSERT(ret); - boost::shared_ptr s - = instantiate_connection(m_ses.m_io_service, m_ses.web_seed_proxy()); if (m_ses.web_seed_proxy().type == proxy_settings::http || m_ses.web_seed_proxy().type == proxy_settings::http_pw) { @@ -1679,10 +1705,8 @@ namespace libtorrent try { // add the newly connected peer to this torrent's peer list - TORRENT_ASSERT(m_connections.find(a) == m_connections.end()); - m_connections.insert( - std::make_pair(a, boost::get_pointer(c))); - m_ses.m_connections.insert(std::make_pair(s, c)); + m_connections.insert(boost::get_pointer(c)); + m_ses.m_connections.insert(c); m_ses.m_half_open.enqueue( bind(&peer_connection::connect, c, _1) @@ -1698,7 +1722,7 @@ namespace libtorrent // TODO: post an error alert! // std::map::iterator i = m_connections.find(a); // if (i != m_connections.end()) m_connections.erase(i); - m_ses.connection_failed(s, a, e.what()); + m_ses.connection_failed(c, a, e.what()); c->disconnect(); } } @@ -1843,19 +1867,19 @@ namespace libtorrent } #endif - peer_connection* torrent::connect_to_peer(policy::peer* peerinfo) + bool torrent::connect_to_peer(policy::peer* peerinfo) throw() { INVARIANT_CHECK; TORRENT_ASSERT(peerinfo); TORRENT_ASSERT(peerinfo->connection == 0); + peerinfo->connected = time_now(); #ifndef NDEBUG // this asserts that we don't have duplicates in the policy's peer list - peer_iterator i_ = m_connections.find(peerinfo->ip); + peer_iterator i_ = std::find_if(m_connections.begin(), m_connections.end() + , bind(&peer_connection::remote, _1) == peerinfo->ip); TORRENT_ASSERT(i_ == m_connections.end() - || i_->second->is_disconnecting() - || dynamic_cast(i_->second) == 0 - || m_ses.settings().allow_multiple_connections_per_ip); + || dynamic_cast(*i_) == 0); #endif TORRENT_ASSERT(want_more_peers()); @@ -1864,8 +1888,11 @@ namespace libtorrent tcp::endpoint const& a(peerinfo->ip); TORRENT_ASSERT((m_ses.m_ip_filter.access(a.address()) & ip_filter::blocked) == 0); - boost::shared_ptr s - = instantiate_connection(m_ses.m_io_service, m_ses.peer_proxy()); + boost::shared_ptr s(new socket_type); + + bool ret = instantiate_connection(m_ses.m_io_service, m_ses.peer_proxy(), *s); + TORRENT_ASSERT(ret); + boost::intrusive_ptr c(new bt_peer_connection( m_ses, shared_from_this(), s, a, peerinfo)); @@ -1873,23 +1900,20 @@ namespace libtorrent c->m_in_constructor = false; #endif -#ifndef TORRENT_DISABLE_EXTENSIONS - for (extension_list_t::iterator i = m_extensions.begin() - , end(m_extensions.end()); i != end; ++i) - { - boost::shared_ptr pp((*i)->new_connection(c.get())); - if (pp) c->add_extension(pp); - } -#endif - try { - TORRENT_ASSERT(m_connections.find(a) == m_connections.end()); +#ifndef TORRENT_DISABLE_EXTENSIONS + for (extension_list_t::iterator i = m_extensions.begin() + , end(m_extensions.end()); i != end; ++i) + { + boost::shared_ptr pp((*i)->new_connection(c.get())); + if (pp) c->add_extension(pp); + } +#endif // add the newly connected peer to this torrent's peer list - m_connections.insert( - std::make_pair(a, boost::get_pointer(c))); - m_ses.m_connections.insert(std::make_pair(s, c)); + m_connections.insert(boost::get_pointer(c)); + m_ses.m_connections.insert(c); int timeout = settings().peer_connect_timeout; if (peerinfo) timeout += 3 * peerinfo->failcount; @@ -1901,16 +1925,15 @@ namespace libtorrent } catch (std::exception& e) { - TORRENT_ASSERT(false); - // TODO: post an error alert! - std::map::iterator i = m_connections.find(a); + std::set::iterator i + = m_connections.find(boost::get_pointer(c)); if (i != m_connections.end()) m_connections.erase(i); - m_ses.connection_failed(s, a, e.what()); + m_ses.connection_failed(c, a, e.what()); c->disconnect(); - throw; + return false; } - if (c->is_disconnecting()) throw protocol_error("failed to connect"); - return c.get(); + peerinfo->connection = c.get(); + return true; } void torrent::set_metadata(entry const& metadata) @@ -1954,25 +1977,7 @@ namespace libtorrent TORRENT_ASSERT(p != 0); TORRENT_ASSERT(!p->is_local()); - std::map::iterator c - = m_connections.find(p->remote()); - if (c != m_connections.end()) - { - TORRENT_ASSERT(p != c->second); - // we already have a peer_connection to this ip. - // It may currently be waiting for completing a - // connection attempt that might fail. So, - // prioritize this current connection since - // it has already succeeded. - if (!c->second->is_connecting()) - { - throw protocol_error("already connected to peer"); - } - c->second->disconnect(); - } - - if (m_ses.m_connections.find(p->get_socket()) - == m_ses.m_connections.end()) + if (m_ses.m_connections.find(p) == m_ses.m_connections.end()) { throw protocol_error("peer is not properly constructed"); } @@ -1982,9 +1987,8 @@ namespace libtorrent throw protocol_error("session is closing"); } - TORRENT_ASSERT(m_connections.find(p->remote()) == m_connections.end()); - peer_iterator ci = m_connections.insert( - std::make_pair(p->remote(), p)).first; + TORRENT_ASSERT(m_connections.find(p) == m_connections.end()); + peer_iterator ci = m_connections.insert(p).first; try { // if new_connection throws, we have to remove the @@ -1998,9 +2002,9 @@ namespace libtorrent if (pp) p->add_extension(pp); } #endif - TORRENT_ASSERT(connection_for(p->remote()) == p); - TORRENT_ASSERT(ci->second == p); - m_policy->new_connection(*ci->second); + TORRENT_ASSERT(m_connections.find(p) == ci); + TORRENT_ASSERT(*ci == p); + m_policy.new_connection(**ci); } catch (std::exception& e) { @@ -2010,7 +2014,7 @@ namespace libtorrent TORRENT_ASSERT(p->remote() == p->get_socket()->remote_endpoint()); #ifndef NDEBUG - m_policy->check_invariant(); + m_policy.check_invariant(); #endif } @@ -2029,19 +2033,19 @@ namespace libtorrent while (!m_connections.empty()) { - peer_connection& p = *m_connections.begin()->second; - TORRENT_ASSERT(p.associated_torrent().lock().get() == this); + peer_connection* p = *m_connections.begin(); + TORRENT_ASSERT(p->associated_torrent().lock().get() == this); #if defined(TORRENT_VERBOSE_LOGGING) if (m_abort) - (*p.m_logger) << "*** CLOSING CONNECTION 'aborting'\n"; + (*p->m_logger) << "*** CLOSING CONNECTION 'aborting'\n"; else - (*p.m_logger) << "*** CLOSING CONNECTION 'pausing'\n"; + (*p->m_logger) << "*** CLOSING CONNECTION 'pausing'\n"; #endif #ifndef NDEBUG std::size_t size = m_connections.size(); #endif - p.disconnect(); + p->disconnect(); TORRENT_ASSERT(m_connections.size() <= size); } } @@ -2112,7 +2116,8 @@ namespace libtorrent expire_bandwidth(channel, blk - amount); } - // called when torrent is finished (all interested pieces downloaded) + // called when torrent is finished (all interesting + // pieces have been downloaded) void torrent::finished() { INVARIANT_CHECK; @@ -2131,13 +2136,14 @@ namespace libtorrent for (peer_iterator i = m_connections.begin(); i != m_connections.end(); ++i) { - TORRENT_ASSERT(i->second->associated_torrent().lock().get() == this); - if (i->second->is_seed()) + peer_connection* p = *i; + TORRENT_ASSERT(p->associated_torrent().lock().get() == this); + if (p->is_seed()) { #if defined(TORRENT_VERBOSE_LOGGING) - (*i->second->m_logger) << "*** SEED, CLOSING CONNECTION\n"; + (*p->m_logger) << "*** SEED, CLOSING CONNECTION\n"; #endif - seeds.push_back(i->second); + seeds.push_back(p); } } std::for_each(seeds.begin(), seeds.end() @@ -2338,23 +2344,21 @@ namespace libtorrent m_connections_initialized = true; // all peer connections have to initialize themselves now that the metadata // is available - typedef std::map conn_map; - for (conn_map::iterator i = m_connections.begin() + for (torrent::peer_iterator i = m_connections.begin() , end(m_connections.end()); i != end;) { try { - i->second->on_metadata(); - i->second->init(); + (*i)->on_metadata(); + (*i)->init(); ++i; } catch (std::exception& e) { // the connection failed, close it - conn_map::iterator j = i; + torrent::peer_iterator j = i; ++j; - m_ses.connection_failed(i->second->get_socket() - , i->first, e.what()); + m_ses.connection_failed(*i, (*i)->remote(), e.what()); i = j; } } @@ -2425,7 +2429,7 @@ namespace libtorrent std::map num_requests; for (const_peer_iterator i = begin(); i != end(); ++i) { - peer_connection const& p = *i->second; + peer_connection const& p = *(*i); for (std::deque::const_iterator i = p.request_queue().begin() , end(p.request_queue().end()); i != end; ++i) ++num_requests[*i]; @@ -2457,12 +2461,12 @@ namespace libtorrent TORRENT_ASSERT(m_abort || m_have_pieces.empty()); } -/* for (policy::const_iterator i = m_policy->begin_peer() - , end(m_policy->end_peer()); i != end; ++i) + for (policy::const_iterator i = m_policy.begin_peer() + , end(m_policy.end_peer()); i != end; ++i) { - TORRENT_ASSERT(i->connection == const_cast(this)->connection_for(i->ip)); + TORRENT_ASSERT(i->second.ip.address() == i->first); } -*/ + size_type total_done = quantized_bytes_done(); if (m_torrent_file->is_valid()) { @@ -2476,6 +2480,36 @@ namespace libtorrent TORRENT_ASSERT(total_done == 0); } + if (m_picker && !m_abort) + { + // make sure that pieces that have completed the download + // of all their blocks are in the disk io thread's queue + // to be checked. + const std::vector& dl_queue + = m_picker->get_download_queue(); + for (std::vector::const_iterator i = + dl_queue.begin(); i != dl_queue.end(); ++i) + { + const int blocks_per_piece = m_picker->blocks_in_piece(i->index); + + bool complete = true; + for (int j = 0; j < blocks_per_piece; ++j) + { + if (i->info[j].state == piece_picker::block_info::state_finished) + continue; + complete = false; + break; + } + if (complete) + { + disk_io_job ret = m_ses.m_disk_thread.find_job( + m_owning_storage, -1, i->index); + TORRENT_ASSERT(ret.action == disk_io_job::hash || ret.action == disk_io_job::write); + TORRENT_ASSERT(ret.piece == i->index); + } + } + } + // This check is very expensive. TORRENT_ASSERT(m_num_pieces == std::count(m_have_pieces.begin(), m_have_pieces.end(), true)); @@ -2515,17 +2549,19 @@ namespace libtorrent void torrent::set_peer_upload_limit(tcp::endpoint ip, int limit) { TORRENT_ASSERT(limit >= -1); - peer_connection* p = connection_for(ip); - if (p == 0) return; - p->set_upload_limit(limit); + peer_iterator i = std::find_if(m_connections.begin(), m_connections.end() + , bind(&peer_connection::remote, _1) == ip); + if (i == m_connections.end()) return; + (*i)->set_upload_limit(limit); } void torrent::set_peer_download_limit(tcp::endpoint ip, int limit) { TORRENT_ASSERT(limit >= -1); - peer_connection* p = connection_for(ip); - if (p == 0) return; - p->set_download_limit(limit); + peer_iterator i = std::find_if(m_connections.begin(), m_connections.end() + , bind(&peer_connection::remote, _1) == ip); + if (i == m_connections.end()) return; + (*i)->set_download_limit(limit); } void torrent::set_upload_limit(int limit) @@ -2564,7 +2600,7 @@ namespace libtorrent for (peer_iterator i = m_connections.begin(); i != m_connections.end(); ++i) { - (*i->second->m_logger) << "*** DELETING FILES IN TORRENT\n"; + (*(*i)->m_logger) << "*** DELETING FILES IN TORRENT\n"; } #endif @@ -2599,7 +2635,7 @@ namespace libtorrent for (peer_iterator i = m_connections.begin(); i != m_connections.end(); ++i) { - (*i->second->m_logger) << "*** PAUSING TORRENT\n"; + (*(*i)->m_logger) << "*** PAUSING TORRENT\n"; } #endif @@ -2673,7 +2709,7 @@ namespace libtorrent i != m_connections.end(); ++i) { web_peer_connection* p - = dynamic_cast(i->second); + = dynamic_cast(*i); if (!p) continue; web_seeds.insert(p->url()); } @@ -2696,7 +2732,7 @@ namespace libtorrent for (peer_iterator i = m_connections.begin(); i != m_connections.end();) { - peer_connection* p = i->second; + peer_connection* p = *i; ++i; m_stat += p->statistics(); // updates the peer connection's ul/dl bandwidth @@ -2721,19 +2757,19 @@ namespace libtorrent if (m_time_scaler <= 0) { m_time_scaler = 10; - m_policy->pulse(); + m_policy.pulse(); } } bool torrent::try_connect_peer() { TORRENT_ASSERT(want_more_peers()); - return m_policy->connect_one_peer(); + return m_policy.connect_one_peer(); } void torrent::async_verify_piece(int piece_index, boost::function const& f) { - INVARIANT_CHECK; +// INVARIANT_CHECK; TORRENT_ASSERT(m_storage); TORRENT_ASSERT(m_storage->refcount() > 0); @@ -2743,6 +2779,9 @@ namespace libtorrent m_storage->async_hash(piece_index, bind(&torrent::on_piece_verified , shared_from_this(), _1, _2, f)); +#ifndef NDEBUG + check_invariant(); +#endif } void torrent::on_piece_verified(int ret, disk_io_job const& j @@ -2809,8 +2848,7 @@ namespace libtorrent torrent_status st; st.num_peers = (int)std::count_if(m_connections.begin(), m_connections.end(), - !boost::bind(&peer_connection::is_connecting - , boost::bind(&std::map::value_type::second, _1))); + !boost::bind(&peer_connection::is_connecting, _1)); st.storage_mode = m_storage_mode; @@ -2937,9 +2975,7 @@ namespace libtorrent INVARIANT_CHECK; return (int)std::count_if(m_connections.begin(), m_connections.end() - , boost::bind(&peer_connection::is_seed - , boost::bind(&std::map::value_type::second, _1))); + , boost::bind(&peer_connection::is_seed, _1)); } void torrent::tracker_request_timed_out( @@ -2995,7 +3031,7 @@ namespace libtorrent #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) void torrent::debug_log(const std::string& line) { - (*m_ses.m_logger) << line << "\n"; + (*m_ses.m_logger) << time_now_string() << " " << line << "\n"; } #endif diff --git a/libtorrent/src/torrent_handle.cpp b/libtorrent/src/torrent_handle.cpp index 9418d50e8..b19e05bb4 100755 --- a/libtorrent/src/torrent_handle.cpp +++ b/libtorrent/src/torrent_handle.cpp @@ -247,6 +247,20 @@ namespace libtorrent find_torrent(m_ses, m_chk, m_info_hash)->move_storage(save_path); } + void torrent_handle::add_extension( + boost::function(torrent*, void*)> const& ext + , void* userdata) + { + INVARIANT_CHECK; + + if (m_ses == 0) throw_invalid_handle(); + TORRENT_ASSERT(m_chk); + + session_impl::mutex_t::scoped_lock l1(m_ses->m_mutex); + mutex::scoped_lock l2(m_chk->m_mutex); + return find_torrent(m_ses, m_chk, m_info_hash)->add_extension(ext, userdata); + } + bool torrent_handle::has_metadata() const { INVARIANT_CHECK; @@ -908,7 +922,7 @@ namespace libtorrent for (torrent::const_peer_iterator i = t->begin(); i != t->end(); ++i) { - peer_connection* peer = i->second; + peer_connection* peer = *i; // incoming peers that haven't finished the handshake should // not be included in this list diff --git a/libtorrent/src/tracker_manager.cpp b/libtorrent/src/tracker_manager.cpp index 0118e5802..82c5cc948 100755 --- a/libtorrent/src/tracker_manager.cpp +++ b/libtorrent/src/tracker_manager.cpp @@ -296,18 +296,20 @@ namespace libtorrent , m_timeout(str.io_service()) , m_completion_timeout(0) , m_read_timeout(0) + , m_abort(false) {} void timeout_handler::set_timeout(int completion_timeout, int read_timeout) { m_completion_timeout = completion_timeout; m_read_timeout = read_timeout; - m_start_time = time_now(); - m_read_time = time_now(); + m_start_time = m_read_time = time_now(); - m_timeout.expires_at((std::min)( - m_read_time + seconds(m_read_timeout) - , m_start_time + seconds(m_completion_timeout))); + if (m_abort) return; + + int timeout = (std::min)( + m_read_timeout, (std::min)(m_completion_timeout, m_read_timeout)); + m_timeout.expires_at(m_read_time + seconds(timeout)); m_timeout.async_wait(m_strand.wrap(bind( &timeout_handler::timeout_callback, self(), _1))); } @@ -319,6 +321,7 @@ namespace libtorrent void timeout_handler::cancel() { + m_abort = true; m_completion_timeout = 0; m_timeout.cancel(); } @@ -341,9 +344,11 @@ namespace libtorrent return; } - m_timeout.expires_at((std::min)( - m_read_time + seconds(m_read_timeout) - , m_start_time + seconds(m_completion_timeout))); + if (m_abort) return; + + int timeout = (std::min)( + m_read_timeout, (std::min)(m_completion_timeout, m_read_timeout)); + m_timeout.expires_at(m_read_time + seconds(timeout)); m_timeout.async_wait(m_strand.wrap( bind(&timeout_handler::timeout_callback, self(), _1))); } @@ -567,12 +572,24 @@ namespace libtorrent m_abort = true; tracker_connections_t keep_connections; - for (tracker_connections_t::const_iterator i = - m_connections.begin(); i != m_connections.end(); ++i) + while (!m_connections.empty()) { - tracker_request const& req = (*i)->tracker_req(); + boost::intrusive_ptr& c = m_connections.back(); + if (!c) + { + m_connections.pop_back(); + continue; + } + tracker_request const& req = c->tracker_req(); if (req.event == tracker_request::stopped) - keep_connections.push_back(*i); + { + keep_connections.push_back(c); + m_connections.pop_back(); + continue; + } + // close will remove the entry from m_connections + // so no need to pop + c->close(); } std::swap(m_connections, keep_connections); diff --git a/libtorrent/src/udp_tracker_connection.cpp b/libtorrent/src/udp_tracker_connection.cpp index 1ba50b3c6..6d76988d3 100755 --- a/libtorrent/src/udp_tracker_connection.cpp +++ b/libtorrent/src/udp_tracker_connection.cpp @@ -86,6 +86,7 @@ namespace libtorrent , m_man(man) , m_strand(str) , m_name_lookup(m_strand.io_service()) + , m_socket(m_strand.io_service()) , m_transaction_id(0) , m_connection_id(0) , m_settings(stn) @@ -95,7 +96,9 @@ namespace libtorrent m_name_lookup.async_resolve(q , m_strand.wrap(boost::bind( &udp_tracker_connection::name_lookup, self(), _1, _2))); - set_timeout(m_settings.tracker_completion_timeout + set_timeout(req.event == tracker_request::stopped + ? m_settings.stop_tracker_timeout + : m_settings.tracker_completion_timeout , m_settings.tracker_receive_timeout); } @@ -103,7 +106,7 @@ namespace libtorrent , udp::resolver::iterator i) try { if (error == asio::error::operation_aborted) return; - if (!m_socket) return; // the operation was aborted + if (!m_socket.is_open()) return; // the operation was aborted if (error || i == udp::resolver::iterator()) { fail(-1, error.message().c_str()); @@ -143,10 +146,9 @@ namespace libtorrent if (cb) cb->m_tracker_address = tcp::endpoint(target_address.address(), target_address.port()); m_target = target_address; - m_socket.reset(new datagram_socket(m_name_lookup.io_service())); - m_socket->open(target_address.protocol()); - m_socket->bind(udp::endpoint(bind_interface(), 0)); - m_socket->connect(target_address); + m_socket.open(target_address.protocol()); + m_socket.bind(udp::endpoint(bind_interface(), 0)); + m_socket.connect(target_address); send_udp_connect(); } catch (std::exception& e) @@ -156,11 +158,20 @@ namespace libtorrent void udp_tracker_connection::on_timeout() { - m_socket.reset(); + asio::error_code ec; + m_socket.close(ec); m_name_lookup.cancel(); fail_timeout(); } + void udp_tracker_connection::close() + { + asio::error_code ec; + m_socket.close(ec); + m_name_lookup.cancel(); + tracker_connection::close(); + } + void udp_tracker_connection::send_udp_connect() { #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) @@ -171,7 +182,7 @@ namespace libtorrent + lexical_cast(tracker_req().info_hash) + "]"); } #endif - if (!m_socket) return; // the operation was aborted + if (!m_socket.is_open()) return; // the operation was aborted char send_buf[16]; char* ptr = send_buf; @@ -187,10 +198,10 @@ namespace libtorrent // transaction_id detail::write_int32(m_transaction_id, ptr); - m_socket->send(asio::buffer((void*)send_buf, 16), 0); + m_socket.send(asio::buffer((void*)send_buf, 16), 0); ++m_attempts; m_buffer.resize(udp_buffer_size); - m_socket->async_receive_from(asio::buffer(m_buffer), m_sender + m_socket.async_receive_from(asio::buffer(m_buffer), m_sender , boost::bind(&udp_tracker_connection::connect_response, self(), _1, _2)); } @@ -198,7 +209,7 @@ namespace libtorrent , std::size_t bytes_transferred) try { if (error == asio::error::operation_aborted) return; - if (!m_socket) return; // the operation was aborted + if (!m_socket.is_open()) return; // the operation was aborted if (error) { fail(-1, error.message().c_str()); @@ -208,7 +219,7 @@ namespace libtorrent if (m_target != m_sender) { // this packet was not received from the tracker - m_socket->async_receive_from(asio::buffer(m_buffer), m_sender + m_socket.async_receive_from(asio::buffer(m_buffer), m_sender , boost::bind(&udp_tracker_connection::connect_response, self(), _1, _2)); return; } @@ -284,7 +295,7 @@ namespace libtorrent if (m_transaction_id == 0) m_transaction_id = rand() ^ (rand() << 16); - if (!m_socket) return; // the operation was aborted + if (!m_socket.is_open()) return; // the operation was aborted std::vector buf; std::back_insert_iterator > out(buf); @@ -332,10 +343,10 @@ namespace libtorrent } #endif - m_socket->send(asio::buffer(buf), 0); + m_socket.send(asio::buffer(buf), 0); ++m_attempts; - m_socket->async_receive_from(asio::buffer(m_buffer), m_sender + m_socket.async_receive_from(asio::buffer(m_buffer), m_sender , bind(&udp_tracker_connection::announce_response, self(), _1, _2)); } @@ -344,7 +355,7 @@ namespace libtorrent if (m_transaction_id == 0) m_transaction_id = rand() ^ (rand() << 16); - if (!m_socket) return; // the operation was aborted + if (!m_socket.is_open()) return; // the operation was aborted std::vector buf; std::back_insert_iterator > out(buf); @@ -358,10 +369,10 @@ namespace libtorrent // info_hash std::copy(tracker_req().info_hash.begin(), tracker_req().info_hash.end(), out); - m_socket->send(asio::buffer(&buf[0], buf.size()), 0); + m_socket.send(asio::buffer(&buf[0], buf.size()), 0); ++m_attempts; - m_socket->async_receive_from(asio::buffer(m_buffer), m_sender + m_socket.async_receive_from(asio::buffer(m_buffer), m_sender , bind(&udp_tracker_connection::scrape_response, self(), _1, _2)); } @@ -369,7 +380,7 @@ namespace libtorrent , std::size_t bytes_transferred) try { if (error == asio::error::operation_aborted) return; - if (!m_socket) return; // the operation was aborted + if (!m_socket.is_open()) return; // the operation was aborted if (error) { fail(-1, error.message().c_str()); @@ -379,7 +390,7 @@ namespace libtorrent if (m_target != m_sender) { // this packet was not received from the tracker - m_socket->async_receive_from(asio::buffer(m_buffer), m_sender + m_socket.async_receive_from(asio::buffer(m_buffer), m_sender , bind(&udp_tracker_connection::connect_response, self(), _1, _2)); return; } @@ -468,6 +479,7 @@ namespace libtorrent , complete, incomplete); m_man.remove_request(this); + close(); return; } catch (std::exception& e) @@ -479,7 +491,7 @@ namespace libtorrent , std::size_t bytes_transferred) try { if (error == asio::error::operation_aborted) return; - if (!m_socket) return; // the operation was aborted + if (!m_socket.is_open()) return; // the operation was aborted if (error) { fail(-1, error.message().c_str()); @@ -489,7 +501,7 @@ namespace libtorrent if (m_target != m_sender) { // this packet was not received from the tracker - m_socket->async_receive_from(asio::buffer(m_buffer), m_sender + m_socket.async_receive_from(asio::buffer(m_buffer), m_sender , bind(&udp_tracker_connection::connect_response, self(), _1, _2)); return; } @@ -543,6 +555,7 @@ namespace libtorrent if (!cb) { m_man.remove_request(this); + close(); return; } @@ -551,6 +564,7 @@ namespace libtorrent , complete, incomplete); m_man.remove_request(this); + close(); } catch (std::exception& e) { diff --git a/libtorrent/src/upnp.cpp b/libtorrent/src/upnp.cpp index 4bdc20987..116eb1dfe 100644 --- a/libtorrent/src/upnp.cpp +++ b/libtorrent/src/upnp.cpp @@ -148,6 +148,7 @@ void upnp::set_mappings(int tcp, int udp) , end(m_devices.end()); i != end; ++i) { rootdevice& d = const_cast(*i); + TORRENT_ASSERT(d.magic == 1337); if (d.mapping[0].local_port != m_tcp_local_port) { if (d.mapping[0].external_port == 0) @@ -200,8 +201,13 @@ try // we don't have a WANIP or WANPPP url for this device, // ask for it rootdevice& d = const_cast(*i); + TORRENT_ASSERT(d.magic == 1337); try { +#ifdef TORRENT_UPNP_LOGGING + m_log << time_now_string() + << " ==> connecting to " << d.url << std::endl; +#endif d.upnp_connection.reset(new http_connection(m_io_service , m_cc, m_strand.wrap(bind(&upnp::on_upnp_xml, self(), _1, _2 , boost::ref(d))))); @@ -270,7 +276,7 @@ try { #ifdef TORRENT_UPNP_LOGGING m_log << time_now_string() - << " <== Rootdevice responded with incorrect HTTP packet. Ignoring device (" << e.what() << ")" << std::endl; + << " <== (" << from << ") Rootdevice responded with incorrect HTTP packet. Ignoring device (" << e.what() << ")" << std::endl; #endif return; } @@ -280,11 +286,11 @@ try #ifdef TORRENT_UPNP_LOGGING if (p.method().empty()) m_log << time_now_string() - << " <== Device responded with HTTP status: " << p.status_code() + << " <== (" << from << ") Device responded with HTTP status: " << p.status_code() << ". Ignoring device" << std::endl; else m_log << time_now_string() - << " <== Device with HTTP method: " << p.method() + << " <== (" << from << ") Device with HTTP method: " << p.method() << ". Ignoring device" << std::endl; #endif return; @@ -294,7 +300,7 @@ try { #ifdef TORRENT_UPNP_LOGGING m_log << time_now_string() - << " <== Rootdevice responded with incomplete HTTP " + << " <== (" << from << ") Rootdevice responded with incomplete HTTP " "packet. Ignoring device" << std::endl; #endif return; @@ -305,7 +311,7 @@ try { #ifdef TORRENT_UPNP_LOGGING m_log << time_now_string() - << " <== Rootdevice response is missing a location header. " + << " <== (" << from << ") Rootdevice response is missing a location header. " "Ignoring device" << std::endl; #endif return; @@ -332,7 +338,7 @@ try { #ifdef TORRENT_UPNP_LOGGING m_log << time_now_string() - << " <== Rootdevice uses unsupported protocol: '" << protocol + << " <== (" << from << ") Rootdevice uses unsupported protocol: '" << protocol << "'. Ignoring device" << std::endl; #endif return; @@ -342,16 +348,27 @@ try { #ifdef TORRENT_UPNP_LOGGING m_log << time_now_string() - << " <== Rootdevice responded with a url with port 0. " + << " <== (" << from << ") Rootdevice responded with a url with port 0. " "Ignoring device" << std::endl; #endif return; } #ifdef TORRENT_UPNP_LOGGING m_log << time_now_string() - << " <== Found rootdevice: " << d.url << std::endl; + << " <== (" << from << ") Found rootdevice: " << d.url + << " total: " << m_devices.size() << std::endl; #endif + if (m_devices.size() >= 50) + { +#ifdef TORRENT_UPNP_LOGGING + m_log << time_now_string() + << " <== (" << from << ") Too many devices (" << m_devices.size() << "), " + "ignoring: " << d.url << std::endl; +#endif + return; + } + if (m_tcp_local_port != 0) { d.mapping[0].need_update = true; @@ -390,8 +407,13 @@ try // we don't have a WANIP or WANPPP url for this device, // ask for it rootdevice& d = const_cast(*i); + TORRENT_ASSERT(d.magic == 1337); try { +#ifdef TORRENT_UPNP_LOGGING + m_log << time_now_string() + << " ==> connecting to " << d.url << std::endl; +#endif d.upnp_connection.reset(new http_connection(m_io_service , m_cc, m_strand.wrap(bind(&upnp::on_upnp_xml, self(), _1, _2 , boost::ref(d))))); @@ -420,6 +442,7 @@ catch (std::exception&) void upnp::post(upnp::rootdevice const& d, std::string const& soap , std::string const& soap_action) { + TORRENT_ASSERT(d.magic == 1337); std::stringstream header; header << "POST " << d.control_url << " HTTP/1.1\r\n" @@ -439,6 +462,7 @@ void upnp::post(upnp::rootdevice const& d, std::string const& soap void upnp::create_port_mapping(http_connection& c, rootdevice& d, int i) { + TORRENT_ASSERT(d.magic == 1337); std::string soap_action = "AddPortMapping"; std::stringstream soap; @@ -463,6 +487,7 @@ void upnp::create_port_mapping(http_connection& c, rootdevice& d, int i) void upnp::map_port(rootdevice& d, int i) { + TORRENT_ASSERT(d.magic == 1337); if (d.upnp_connection) return; if (!d.mapping[i].need_update) @@ -479,6 +504,10 @@ void upnp::map_port(rootdevice& d, int i) TORRENT_ASSERT(!d.upnp_connection); TORRENT_ASSERT(d.service_namespace); +#ifdef TORRENT_UPNP_LOGGING + m_log << time_now_string() + << " ==> connecting to " << d.hostname << std::endl; +#endif d.upnp_connection.reset(new http_connection(m_io_service , m_cc, m_strand.wrap(bind(&upnp::on_upnp_map_response, self(), _1, _2 , boost::ref(d), i)), true @@ -490,6 +519,7 @@ void upnp::map_port(rootdevice& d, int i) void upnp::delete_port_mapping(rootdevice& d, int i) { + TORRENT_ASSERT(d.magic == 1337); std::stringstream soap; std::string soap_action = "DeletePortMapping"; @@ -510,23 +540,24 @@ void upnp::delete_port_mapping(rootdevice& d, int i) // requires the mutex to be locked void upnp::unmap_port(rootdevice& d, int i) { - if (d.mapping[i].external_port == 0) + TORRENT_ASSERT(d.magic == 1337); + if (d.mapping[i].external_port == 0 + || d.disabled) { if (i < num_mappings - 1) { unmap_port(d, i + 1); } - else - { - m_devices.erase(d); - } return; } +#ifdef TORRENT_UPNP_LOGGING + m_log << time_now_string() + << " ==> connecting to " << d.hostname << std::endl; +#endif d.upnp_connection.reset(new http_connection(m_io_service , m_cc, m_strand.wrap(bind(&upnp::on_upnp_unmap_response, self(), _1, _2 , boost::ref(d), i)), true , bind(&upnp::delete_port_mapping, self(), boost::ref(d), i))); - d.upnp_connection->start(d.hostname, boost::lexical_cast(d.port) , seconds(10)); } @@ -591,6 +622,7 @@ namespace void upnp::on_upnp_xml(asio::error_code const& e , libtorrent::http_parser const& p, rootdevice& d) try { + TORRENT_ASSERT(d.magic == 1337); if (d.upnp_connection) { d.upnp_connection->close(); @@ -601,8 +633,10 @@ void upnp::on_upnp_xml(asio::error_code const& e { #ifdef TORRENT_UPNP_LOGGING m_log << time_now_string() - << " <== error while fetching control url: " << e.message() << std::endl; + << " <== (" << d.url << ") error while fetching control url: " + << e.message() << std::endl; #endif + d.disabled = true; return; } @@ -610,8 +644,9 @@ void upnp::on_upnp_xml(asio::error_code const& e { #ifdef TORRENT_UPNP_LOGGING m_log << time_now_string() - << " <== error while fetching control url: incomplete http message" << std::endl; + << " <== (" << d.url << ") error while fetching control url: incomplete http message" << std::endl; #endif + d.disabled = true; return; } @@ -619,8 +654,9 @@ void upnp::on_upnp_xml(asio::error_code const& e { #ifdef TORRENT_UPNP_LOGGING m_log << time_now_string() - << " <== error while fetching control url: " << p.message() << std::endl; + << " <== (" << d.url << ") error while fetching control url: " << p.message() << std::endl; #endif + d.disabled = true; return; } @@ -647,15 +683,17 @@ void upnp::on_upnp_xml(asio::error_code const& e { #ifdef TORRENT_UPNP_LOGGING m_log << time_now_string() - << " <== Rootdevice response, did not find a port mapping interface" << std::endl; + << " <== (" << d.url << ") Rootdevice response, did not find " + "a port mapping interface" << std::endl; #endif + d.disabled = true; return; } } #ifdef TORRENT_UPNP_LOGGING m_log << time_now_string() - << " <== Rootdevice response, found control URL: " << s.control_url + << " <== (" << d.url << ") Rootdevice response, found control URL: " << s.control_url << " namespace: " << d.service_namespace << std::endl; #endif @@ -732,6 +770,7 @@ namespace void upnp::on_upnp_map_response(asio::error_code const& e , libtorrent::http_parser const& p, rootdevice& d, int mapping) try { + TORRENT_ASSERT(d.magic == 1337); if (d.upnp_connection) { d.upnp_connection->close(); @@ -744,7 +783,7 @@ void upnp::on_upnp_map_response(asio::error_code const& e m_log << time_now_string() << " <== error while adding portmap: " << e.message() << std::endl; #endif - m_devices.erase(d); + d.disabled = true; return; } @@ -773,7 +812,7 @@ void upnp::on_upnp_map_response(asio::error_code const& e m_log << time_now_string() << " <== error while adding portmap: incomplete http message" << std::endl; #endif - m_devices.erase(d); + d.disabled = true; return; } @@ -877,6 +916,7 @@ catch (std::exception&) void upnp::on_upnp_unmap_response(asio::error_code const& e , libtorrent::http_parser const& p, rootdevice& d, int mapping) try { + TORRENT_ASSERT(d.magic == 1337); if (d.upnp_connection) { d.upnp_connection->close(); @@ -906,7 +946,7 @@ void upnp::on_upnp_unmap_response(asio::error_code const& e m_log << time_now_string() << " <== error while deleting portmap: " << p.message() << std::endl; #endif - m_devices.erase(d); + d.disabled = true; return; } @@ -922,10 +962,6 @@ void upnp::on_upnp_unmap_response(asio::error_code const& e unmap_port(d, mapping + 1); return; } - - // the main thread is likely to be waiting for - // all the unmap operations to complete - m_devices.erase(d); } catch (std::exception&) { @@ -943,6 +979,7 @@ void upnp::on_expire(asio::error_code const& e) try , end(m_devices.end()); i != end; ++i) { rootdevice& d = const_cast(*i); + TORRENT_ASSERT(d.magic == 1337); for (int m = 0; m < num_mappings; ++m) { if (d.mapping[m].expires != max_time()) @@ -984,15 +1021,11 @@ void upnp::close() } for (std::set::iterator i = m_devices.begin() - , end(m_devices.end()); i != end;) + , end(m_devices.end()); i != end; ++i) { rootdevice& d = const_cast(*i); - if (d.control_url.empty()) - { - m_devices.erase(i++); - continue; - } - ++i; + TORRENT_ASSERT(d.magic == 1337); + if (d.control_url.empty()) continue; unmap_port(d, 0); } } diff --git a/libtorrent/src/ut_pex.cpp b/libtorrent/src/ut_pex.cpp index 071407005..d1248637c 100644 --- a/libtorrent/src/ut_pex.cpp +++ b/libtorrent/src/ut_pex.cpp @@ -72,7 +72,7 @@ namespace libtorrent { namespace struct ut_pex_plugin: torrent_plugin { - ut_pex_plugin(torrent& t): m_torrent(t), m_1_minute(0) {} + ut_pex_plugin(torrent& t): m_torrent(t), m_1_minute(55) {} virtual boost::shared_ptr new_connection(peer_connection* pc); @@ -113,18 +113,20 @@ namespace libtorrent { namespace for (torrent::peer_iterator i = m_torrent.begin() , end(m_torrent.end()); i != end; ++i) { - if (!send_peer(*i->second)) continue; + peer_connection* peer = *i; + if (!send_peer(*peer)) continue; - m_old_peers.insert(i->first); + tcp::endpoint const& remote = peer->remote(); + m_old_peers.insert(remote); - std::set::iterator di = dropped.find(i->first); + std::set::iterator di = dropped.find(remote); if (di == dropped.end()) { // don't write too big of a package if (num_added >= max_peer_entries) break; // only send proper bittorrent peers - bt_peer_connection* p = dynamic_cast(i->second); + bt_peer_connection* p = dynamic_cast(peer); if (!p) continue; // no supported flags to set yet @@ -135,14 +137,14 @@ namespace libtorrent { namespace flags |= p->supports_encryption() ? 1 : 0; #endif // i->first was added since the last time - if (i->first.address().is_v4()) + if (remote.address().is_v4()) { - detail::write_endpoint(i->first, pla_out); + detail::write_endpoint(remote, pla_out); detail::write_uint8(flags, plf_out); } else { - detail::write_endpoint(i->first, pla6_out); + detail::write_endpoint(remote, pla6_out); detail::write_uint8(flags, plf6_out); } ++num_added; @@ -156,7 +158,7 @@ namespace libtorrent { namespace } for (std::set::const_iterator i = dropped.begin() - , end(dropped.end());i != end; ++i) + , end(dropped.end()); i != end; ++i) { if (i->address().is_v4()) detail::write_endpoint(*i, pld_out); @@ -183,7 +185,7 @@ namespace libtorrent { namespace : m_torrent(t) , m_pc(pc) , m_tp(tp) - , m_1_minute(0) + , m_1_minute(55) , m_message_index(0) , m_first_time(true) {} @@ -327,13 +329,14 @@ namespace libtorrent { namespace for (torrent::peer_iterator i = m_torrent.begin() , end(m_torrent.end()); i != end; ++i) { - if (!send_peer(*i->second)) continue; + peer_connection* peer = *i; + if (!send_peer(*peer)) continue; // don't write too big of a package if (num_added >= max_peer_entries) break; // only send proper bittorrent peers - bt_peer_connection* p = dynamic_cast(i->second); + bt_peer_connection* p = dynamic_cast(peer); if (!p) continue; // no supported flags to set yet @@ -343,15 +346,16 @@ namespace libtorrent { namespace #ifndef TORRENT_DISABLE_ENCRYPTION flags |= p->supports_encryption() ? 1 : 0; #endif + tcp::endpoint const& remote = peer->remote(); // i->first was added since the last time - if (i->first.address().is_v4()) + if (remote.address().is_v4()) { - detail::write_endpoint(i->first, pla_out); + detail::write_endpoint(remote, pla_out); detail::write_uint8(flags, plf_out); } else { - detail::write_endpoint(i->first, pla6_out); + detail::write_endpoint(remote, pla6_out); detail::write_uint8(flags, plf6_out); } ++num_added;