From b26575e92024db377783a2435139b2a1605dc483 Mon Sep 17 00:00:00 2001 From: Marcos Pinto Date: Tue, 14 Aug 2007 17:59:02 +0000 Subject: [PATCH] fast-extension support from libtorrent --- .../include/libtorrent/bt_peer_connection.hpp | 29 +- libtorrent/include/libtorrent/extensions.hpp | 9 + .../include/libtorrent/peer_connection.hpp | 32 +- .../include/libtorrent/piece_picker.hpp | 21 +- libtorrent/include/libtorrent/session.hpp | 1 + .../include/libtorrent/session_settings.hpp | 5 + .../libtorrent/web_peer_connection.hpp | 2 + libtorrent/src/bt_peer_connection.cpp | 213 ++++++++++- libtorrent/src/peer_connection.cpp | 337 ++++++++++++++++-- libtorrent/src/piece_picker.cpp | 1 + libtorrent/src/policy.cpp | 78 ++-- 11 files changed, 647 insertions(+), 81 deletions(-) diff --git a/libtorrent/include/libtorrent/bt_peer_connection.hpp b/libtorrent/include/libtorrent/bt_peer_connection.hpp index beec94979..4f13b2fcf 100755 --- a/libtorrent/include/libtorrent/bt_peer_connection.hpp +++ b/libtorrent/include/libtorrent/bt_peer_connection.hpp @@ -122,8 +122,16 @@ namespace libtorrent msg_request, msg_piece, msg_cancel, + // DHT extension msg_dht_port, - // extension protocol message + // FAST extension + msg_suggest_piece = 0xd, + msg_have_all, + msg_have_none, + msg_reject_request, + msg_allowed_fast, + + // extension protocol message msg_extended = 20, num_supported_messages @@ -174,8 +182,17 @@ namespace libtorrent void on_request(int received); void on_piece(int received); void on_cancel(int received); + + // DHT extension void on_dht_port(int received); + // FAST extension + void on_suggest_piece(int received); + void on_have_all(int received); + void on_have_none(int received); + void on_reject_request(int received); + void on_allowed_fast(int received); + void on_extended(int received); void on_extended_handshake(); @@ -201,7 +218,16 @@ namespace libtorrent void write_metadata(std::pair req); void write_metadata_request(std::pair req); void write_keepalive(); + + // DHT extension void write_dht_port(int listen_port); + + // FAST extension + void write_have_all(); + void write_have_none(); + void write_reject_request(peer_request const&); + void write_allow_fast(int piece); + void on_connected(); void on_metadata(); @@ -325,6 +351,7 @@ namespace libtorrent bool m_supports_extensions; #endif bool m_supports_dht_port; + bool m_supports_fast; #ifndef TORRENT_DISABLE_ENCRYPTION // this is set to true after the encryption method has been diff --git a/libtorrent/include/libtorrent/extensions.hpp b/libtorrent/include/libtorrent/extensions.hpp index 5f8172649..44fff9c36 100644 --- a/libtorrent/include/libtorrent/extensions.hpp +++ b/libtorrent/include/libtorrent/extensions.hpp @@ -131,6 +131,15 @@ namespace libtorrent virtual bool on_bitfield(std::vector const& bitfield) { return false; } + virtual bool on_have_all() + { return false; } + + virtual bool on_have_none() + { return false; } + + virtual bool on_allowed_fast(int index) + { return false; } + virtual bool on_request(peer_request const& req) { return false; } diff --git a/libtorrent/include/libtorrent/peer_connection.hpp b/libtorrent/include/libtorrent/peer_connection.hpp index 31bcde94a..976e03794 100755 --- a/libtorrent/include/libtorrent/peer_connection.hpp +++ b/libtorrent/include/libtorrent/peer_connection.hpp @@ -131,6 +131,8 @@ namespace libtorrent enum peer_speed_t { slow, medium, fast }; peer_speed_t peer_speed(); + void send_allowed_set(); + #ifndef TORRENT_DISABLE_EXTENSIONS void add_extension(boost::shared_ptr); #endif @@ -186,9 +188,9 @@ namespace libtorrent void set_pid(const peer_id& pid) { m_peer_id = pid; } bool has_piece(int i) const; - const std::deque& download_queue() const; - const std::deque& request_queue() const; - const std::deque& upload_queue() const; + std::deque const& download_queue() const; + std::deque const& request_queue() const; + std::deque const& upload_queue() const; bool is_interesting() const { return m_interesting; } bool is_choked() const { return m_choked; } @@ -217,6 +219,7 @@ namespace libtorrent tcp::endpoint const& remote() const { return m_remote; } std::vector const& get_bitfield() const; + std::vector const& allowed_fast(); void timed_out(); // this will cause this peer_connection to be disconnected. @@ -294,7 +297,13 @@ namespace libtorrent void incoming_piece(peer_request const& p, char const* data); void incoming_piece_fragment(); void incoming_cancel(peer_request const& r); + void incoming_dht_port(int listen_port); + + void incoming_reject_request(peer_request const& r); + void incoming_have_all(); + void incoming_have_none(); + void incoming_allowed_fast(int index); // the following functions appends messages // to the send buffer @@ -373,6 +382,9 @@ namespace libtorrent virtual void write_keepalive() = 0; virtual void write_piece(peer_request const& r, char const* buffer) = 0; + virtual void write_reject_request(peer_request const& r) = 0; + virtual void write_allow_fast(int piece) = 0; + virtual void on_connected() = 0; virtual void on_tick() {} @@ -529,7 +541,7 @@ namespace libtorrent // set to the torrent it belongs to. boost::weak_ptr m_torrent; // is true if it was we that connected to the peer - // and false if we got an incomming connection + // and false if we got an incoming connection // could be considered: true = local, false = remote bool m_active; @@ -563,6 +575,10 @@ namespace libtorrent // the pieces the other end have std::vector m_have_piece; + // this is set to true when a have_all + // message is received. This information + // is used to fill the bitmask in init() + bool m_have_all; // the number of pieces this peer // has. Must be the same as @@ -695,6 +711,14 @@ namespace libtorrent // was last updated ptime m_remote_dl_update; + // the pieces we will send to the peer + // if requested (regardless of choke state) + std::set m_accept_fast; + + // the pieces the peer will send us if + // requested (regardless of choke state) + std::vector m_allowed_fast; + // the number of bytes send to the disk-io // thread that hasn't yet been completely written. int m_outstanding_writing_bytes; diff --git a/libtorrent/include/libtorrent/piece_picker.hpp b/libtorrent/include/libtorrent/piece_picker.hpp index 54df003ef..8e396bdff 100755 --- a/libtorrent/include/libtorrent/piece_picker.hpp +++ b/libtorrent/include/libtorrent/piece_picker.hpp @@ -197,6 +197,19 @@ namespace libtorrent , void* peer, piece_state_t speed , bool rarest_first) const; + // picks blocks from each of the pieces in the piece_list + // vector that is also in the piece bitmask. The blocks + // are added to interesting_blocks, and busy blocks are + // added to backup_blocks. num blocks is the number of + // blocks to be picked. + int add_interesting_blocks(const std::vector& piece_list + , const std::vector& pieces + , std::vector& interesting_blocks + , std::vector& backup_blocks + , int num_blocks, bool prefer_whole_pieces + , void* peer, piece_state_t speed + , bool ignore_downloading_pieces) const; + // clears the peer pointer in all downloading pieces with this // peer pointer void clear_peer(void* peer); @@ -358,14 +371,6 @@ namespace libtorrent void move(int vec_index, int elem_index); void sort_piece(std::vector::iterator dp); - int add_interesting_blocks(const std::vector& piece_list - , const std::vector& pieces - , std::vector& interesting_blocks - , std::vector& backup_blocks - , int num_blocks, bool prefer_whole_pieces - , void* peer, piece_state_t speed - , bool ignore_downloading_pieces) const; - downloading_piece& add_download_piece(); void erase_download_piece(std::vector::iterator i); diff --git a/libtorrent/include/libtorrent/session.hpp b/libtorrent/include/libtorrent/session.hpp index 38206f32c..f721b3293 100755 --- a/libtorrent/include/libtorrent/session.hpp +++ b/libtorrent/include/libtorrent/session.hpp @@ -53,6 +53,7 @@ POSSIBILITY OF SUCH DAMAGE. #pragma warning(pop) #endif +#include "libtorrent/config.hpp" #include "libtorrent/torrent_handle.hpp" #include "libtorrent/entry.hpp" #include "libtorrent/alert.hpp" diff --git a/libtorrent/include/libtorrent/session_settings.hpp b/libtorrent/include/libtorrent/session_settings.hpp index ebc30eae3..e5bb9879a 100644 --- a/libtorrent/include/libtorrent/session_settings.hpp +++ b/libtorrent/include/libtorrent/session_settings.hpp @@ -108,6 +108,7 @@ namespace libtorrent , unchoke_interval(20) , num_want(200) , initial_picker_threshold(4) + , allowed_fast_set_size(10) , max_outstanding_disk_bytes_per_connection(64 * 1024) #ifndef TORRENT_DISABLE_DHT , use_dht_as_fallback(true) @@ -252,6 +253,10 @@ namespace libtorrent // random pieces instead of rarest first. int initial_picker_threshold; + // the number of allowed pieces to send to peers + // that supports the fast extensions + int allowed_fast_set_size; + // the maximum number of bytes a connection may have // pending in the disk write queue before its download // rate is being throttled. This prevents fast downloads diff --git a/libtorrent/include/libtorrent/web_peer_connection.hpp b/libtorrent/include/libtorrent/web_peer_connection.hpp index ba7450c0a..b3ca73c4d 100755 --- a/libtorrent/include/libtorrent/web_peer_connection.hpp +++ b/libtorrent/include/libtorrent/web_peer_connection.hpp @@ -126,6 +126,8 @@ namespace libtorrent void write_piece(peer_request const& r, char const* buffer) { assert(false); } void write_keepalive() {} void on_connected(); + void write_reject_request(peer_request const&) {} + void write_allow_fast(int) {} #ifndef NDEBUG void check_invariant() const; diff --git a/libtorrent/src/bt_peer_connection.cpp b/libtorrent/src/bt_peer_connection.cpp index 5b63f26cd..07ebf5431 100755 --- a/libtorrent/src/bt_peer_connection.cpp +++ b/libtorrent/src/bt_peer_connection.cpp @@ -75,7 +75,14 @@ namespace libtorrent &bt_peer_connection::on_piece, &bt_peer_connection::on_cancel, &bt_peer_connection::on_dht_port, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, + // FAST extension messages + &bt_peer_connection::on_suggest_piece, + &bt_peer_connection::on_have_all, + &bt_peer_connection::on_have_none, + &bt_peer_connection::on_reject_request, + &bt_peer_connection::on_allowed_fast, + 0, 0, &bt_peer_connection::on_extended }; @@ -93,6 +100,7 @@ namespace libtorrent , m_supports_extensions(false) #endif , m_supports_dht_port(false) + , m_supports_fast(false) #ifndef TORRENT_DISABLE_ENCRYPTION , m_encrypted(false) , m_rc4_encrypted(false) @@ -124,6 +132,7 @@ namespace libtorrent , m_supports_extensions(false) #endif , m_supports_dht_port(false) + , m_supports_fast(false) #ifndef TORRENT_DISABLE_ENCRYPTION , m_encrypted(false) , m_rc4_encrypted(false) @@ -226,6 +235,10 @@ namespace libtorrent boost::shared_ptr t = associated_torrent().lock(); assert(t); write_bitfield(t->pieces()); +#ifndef TORRENT_DISABLE_DHT + if (m_supports_dht_port && m_ses.m_dht) + write_dht_port(m_ses.get_dht_settings().service_port); +#endif } void bt_peer_connection::write_dht_port(int listen_port) @@ -246,6 +259,75 @@ namespace libtorrent setup_send(); } + void bt_peer_connection::write_have_all() + { + INVARIANT_CHECK; + assert(m_sent_handshake && !m_sent_bitfield); +#ifndef NDEBUG + m_sent_bitfield = true; +#endif +#ifdef TORRENT_VERBOSE_LOGGING + (*m_logger) << time_now_string() + << " ==> HAVE_ALL\n"; +#endif + char buf[] = {0,0,0,1, msg_have_all}; + send_buffer(buf, buf + sizeof(buf)); + } + + void bt_peer_connection::write_have_none() + { + INVARIANT_CHECK; + assert(m_sent_handshake && !m_sent_bitfield); +#ifndef NDEBUG + m_sent_bitfield = true; +#endif +#ifdef TORRENT_VERBOSE_LOGGING + (*m_logger) << time_now_string() + << " ==> HAVE_NONE\n"; +#endif + char buf[] = {0,0,0,1, msg_have_none}; + send_buffer(buf, buf + sizeof(buf)); + } + + void bt_peer_connection::write_reject_request(peer_request const& r) + { + INVARIANT_CHECK; + + assert(m_sent_handshake && m_sent_bitfield); + assert(associated_torrent().lock()->valid_metadata()); + + char buf[] = {0,0,0,13, msg_reject_request}; + + buffer::interval i = allocate_send_buffer(17); + + std::copy(buf, buf + 5, i.begin); + i.begin += 5; + + // index + detail::write_int32(r.piece, i.begin); + // begin + detail::write_int32(r.start, i.begin); + // length + detail::write_int32(r.length, i.begin); + assert(i.begin == i.end); + + setup_send(); + } + + void bt_peer_connection::write_allow_fast(int piece) + { + INVARIANT_CHECK; + + assert(m_sent_handshake && m_sent_bitfield); + assert(associated_torrent().lock()->valid_metadata()); + + char buf[] = {0,0,0,5, msg_allowed_fast, 0, 0, 0, 0}; + + char* ptr = buf + 5; + detail::write_int32(piece, ptr); + send_buffer(buf, buf + sizeof(buf)); + } + void bt_peer_connection::get_specific_peer_info(peer_info& p) const { assert(!associated_torrent().expired()); @@ -636,6 +718,9 @@ namespace libtorrent *(i.begin + 5) = 0x10; #endif + // we support FAST extension + *(i.begin + 7) = 0x04; + i.begin += 8; // info hash @@ -721,6 +806,20 @@ namespace libtorrent if (!packet_finished()) return; incoming_choke(); + if (!m_supports_fast) + { + boost::shared_ptr t = associated_torrent().lock(); + assert(t); + while (!request_queue().empty()) + { + piece_block const& b = request_queue().front(); + peer_request r; + r.piece = b.piece_index; + r.start = b.block_index * t->block_size(); + r.length = t->block_size(); + incoming_reject_request(r); + } + } } // ----------------------------- @@ -939,6 +1038,9 @@ namespace libtorrent { INVARIANT_CHECK; + if (!m_supports_dht_port) + throw protocol_error("got 'dht_port' message from peer that doesn't support it"); + assert(received > 0); if (packet_size() != 3) throw protocol_error("'dht_port' message size != 3"); @@ -953,6 +1055,74 @@ namespace libtorrent incoming_dht_port(listen_port); } + void bt_peer_connection::on_suggest_piece(int received) + { + INVARIANT_CHECK; + + if (!m_supports_fast) + throw protocol_error("got 'suggest_piece' without FAST extension support"); + + // just ignore for now + return; + } + + void bt_peer_connection::on_have_all(int received) + { + INVARIANT_CHECK; + + if (!m_supports_fast) + throw protocol_error("got 'have_all' without FAST extension support"); + m_statistics.received_bytes(0, received); + incoming_have_all(); + } + + void bt_peer_connection::on_have_none(int received) + { + INVARIANT_CHECK; + + if (!m_supports_fast) + throw protocol_error("got 'have_none' without FAST extension support"); + m_statistics.received_bytes(0, received); + incoming_have_none(); + } + + void bt_peer_connection::on_reject_request(int received) + { + INVARIANT_CHECK; + + if (!m_supports_fast) + throw protocol_error("got 'reject_request' without FAST extension support"); + + m_statistics.received_bytes(0, received); + if (!packet_finished()) return; + + buffer::const_interval recv_buffer = receive_buffer(); + + peer_request r; + const char* ptr = recv_buffer.begin + 1; + r.piece = detail::read_int32(ptr); + r.start = detail::read_int32(ptr); + r.length = detail::read_int32(ptr); + + incoming_reject_request(r); + } + + void bt_peer_connection::on_allowed_fast(int received) + { + INVARIANT_CHECK; + + if (!m_supports_fast) + throw protocol_error("got 'allowed_fast' without FAST extension support"); + + m_statistics.received_bytes(0, received); + if (!packet_finished()) return; + buffer::const_interval recv_buffer = receive_buffer(); + const char* ptr = recv_buffer.begin + 1; + int index = detail::read_int32(ptr); + + incoming_allowed_fast(index); + } + // ----------------------------- // --------- EXTENDED ---------- // ----------------------------- @@ -1175,6 +1345,22 @@ namespace libtorrent assert(m_sent_handshake && !m_sent_bitfield); assert(t->valid_metadata()); + // in this case, have_all or have_none should be sent instead + assert(!m_supports_fast || !t->is_seed() || t->num_pieces() != 0); + + if (m_supports_fast && t->is_seed()) + { + write_have_all(); + send_allowed_set(); + return; + } + else if (m_supports_fast && t->num_pieces() == 0) + { + write_have_none(); + send_allowed_set(); + return; + } + int num_pieces = bitfield.size(); int lazy_pieces[50]; int num_lazy_pieces = 0; @@ -1251,6 +1437,9 @@ namespace libtorrent #endif } } + + if (m_supports_fast) + send_allowed_set(); } #ifndef TORRENT_DISABLE_EXTENSIONS @@ -2016,6 +2205,9 @@ namespace libtorrent if (recv_buffer[7] & 0x01) m_supports_dht_port = true; + if (recv_buffer[7] & 0x04) + m_supports_fast = true; + // ok, now we have got enough of the handshake. Is this connection // attached to a torrent? if (!t) @@ -2049,10 +2241,10 @@ namespace libtorrent assert(t); // if this is a local connection, we have already - // send the handshake + // sent the handshake if (!is_local()) write_handshake(); - if (t->valid_metadata()) - write_bitfield(t->pieces()); +// if (t->valid_metadata()) +// write_bitfield(t->pieces()); assert(t->get_policy().has_connection(this)); @@ -2125,11 +2317,6 @@ namespace libtorrent throw protocol_error("closing connection to ourself"); } -#ifndef TORRENT_DISABLE_DHT - if (m_supports_dht_port && m_ses.m_dht) - write_dht_port(m_ses.get_dht_settings().service_port); -#endif - m_client_version = identify_client(pid); boost::optional f = client_fingerprint(pid); if (f && std::equal(f->name, f->name + 2, "BC")) @@ -2181,6 +2368,14 @@ namespace libtorrent m_state = read_packet_size; reset_recv_buffer(4); + if (t->valid_metadata()) + { + write_bitfield(t->pieces()); +#ifndef TORRENT_DISABLE_DHT + if (m_supports_dht_port && m_ses.m_dht) + write_dht_port(m_ses.get_dht_settings().service_port); +#endif + } assert(!packet_finished()); return; diff --git a/libtorrent/src/peer_connection.cpp b/libtorrent/src/peer_connection.cpp index b73e32896..86d4e1500 100755 --- a/libtorrent/src/peer_connection.cpp +++ b/libtorrent/src/peer_connection.cpp @@ -93,6 +93,7 @@ namespace libtorrent , m_choked(true) , m_failed(false) , m_ignore_bandwidth_limits(false) + , m_have_all(false) , m_num_pieces(0) , m_desired_queue_size(2) , m_free_upload(0) @@ -168,6 +169,7 @@ namespace libtorrent , m_choked(true) , m_failed(false) , m_ignore_bandwidth_limits(false) + , m_have_all(false) , m_num_pieces(0) , m_desired_queue_size(2) , m_free_upload(0) @@ -251,6 +253,67 @@ namespace libtorrent } #endif + void peer_connection::send_allowed_set() + { + INVARIANT_CHECK; + + boost::shared_ptr t = m_torrent.lock(); + assert(t); + + int num_allowed_pieces = m_ses.settings().allowed_fast_set_size; + int num_pieces = t->torrent_file().num_pieces(); + + if (num_allowed_pieces >= num_pieces) + { + for (int i = 0; i < num_pieces; ++i) + { +#ifdef TORRENT_VERBOSE_LOGGING + (*m_logger) << time_now_string() + << " ==> ALLOWED_FAST [ " << i << " ]\n"; +#endif + write_allow_fast(i); + m_accept_fast.insert(i); + } + return; + } + + std::string x; + address const& addr = m_remote.address(); + if (addr.is_v4()) + { + address_v4::bytes_type bytes = addr.to_v4().to_bytes(); + x.assign((char*)&bytes[0], bytes.size()); + } + else + { + address_v6::bytes_type bytes = addr.to_v6().to_bytes(); + x.assign((char*)&bytes[0], bytes.size()); + } + x.append((char*)&t->torrent_file().info_hash()[0], 20); + + sha1_hash hash = hasher(&x[0], x.size()).final(); + for (;;) + { + char* p = (char*)&hash[0]; + for (int i = 0; i < 5; ++i) + { + int piece = detail::read_uint32(p) % num_pieces; + if (m_accept_fast.find(piece) == m_accept_fast.end()) + { +#ifdef TORRENT_VERBOSE_LOGGING + (*m_logger) << time_now_string() + << " ==> ALLOWED_FAST [ " << piece << " ]\n"; +#endif + write_allow_fast(piece); + m_accept_fast.insert(piece); + if (int(m_accept_fast.size()) >= num_allowed_pieces + || int(m_accept_fast.size()) == num_pieces) return; + } + } + hash = hasher((char*)&hash[0], 20).final(); + } + } + void peer_connection::init() { INVARIANT_CHECK; @@ -260,7 +323,7 @@ namespace libtorrent assert(t->valid_metadata()); assert(t->ready_for_connections()); - m_have_piece.resize(t->torrent_file().num_pieces(), false); + m_have_piece.resize(t->torrent_file().num_pieces(), m_have_all); // now that we have a piece_picker, // update it with this peers pieces @@ -588,6 +651,74 @@ namespace libtorrent m_request_queue.clear(); } + bool match_request(peer_request const& r, piece_block const& b, int block_size) + { + if (b.piece_index != r.piece) return false; + if (b.block_index != r.start / block_size) return false; + if (r.start % block_size != 0) return false; + return true; + } + + // ----------------------------- + // -------- REJECT PIECE ------- + // ----------------------------- + + void peer_connection::incoming_reject_request(peer_request const& r) + { + INVARIANT_CHECK; + + boost::shared_ptr t = m_torrent.lock(); + assert(t); + + std::deque::iterator i = std::find_if( + m_download_queue.begin(), m_download_queue.end() + , bind(match_request, boost::cref(r), _1, t->block_size())); + +#ifdef TORRENT_VERBOSE_LOGGING + (*m_logger) << time_now_string() + << " <== REJECT_PIECE [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n"; +#endif + + piece_block b(-1, 0); + if (i != m_download_queue.end()) + { + b = *i; + m_download_queue.erase(i); + } + else + { + i = std::find_if(m_request_queue.begin(), m_request_queue.end() + , bind(match_request, boost::cref(r), _1, t->block_size())); + + if (i != m_request_queue.end()) + { + b = *i; + m_request_queue.erase(i); + } + } + + if (b.piece_index != -1 && !t->is_seed()) + { + piece_picker& p = t->picker(); + p.abort_download(b); + } +#ifdef TORRENT_VERBOSE_LOGGING + else + { + (*m_logger) << time_now_string() + << " *** PIECE NOT IN REQUEST QUEUE\n"; + } +#endif + if (m_request_queue.empty()) + { + if (m_download_queue.size() < 2) + { + request_a_block(*t, *this); + } + send_block_requests(); + } + } + // ----------------------------- // ---------- UNCHOKE ---------- // ----------------------------- @@ -798,11 +929,12 @@ namespace libtorrent { m_have_piece = bitfield; m_num_pieces = std::count(bitfield.begin(), bitfield.end(), true); - - if (m_peer_info) m_peer_info->seed = true; + if (m_peer_info) m_peer_info->seed = (m_num_pieces == int(bitfield.size())); return; } + assert(t->valid_metadata()); + int num_pieces = std::count(bitfield.begin(), bitfield.end(), true); if (num_pieces == int(m_have_piece.size())) { @@ -906,6 +1038,7 @@ namespace libtorrent "t: " << (int)t->torrent_file().piece_size(r.piece) << " | " "n: " << t->torrent_file().num_pieces() << " ]\n"; #endif + write_reject_request(r); return; } @@ -925,6 +1058,7 @@ namespace libtorrent "t: " << (int)t->torrent_file().piece_size(r.piece) << " | " "n: " << t->torrent_file().num_pieces() << " ]\n"; #endif + write_reject_request(r); return; } @@ -947,11 +1081,19 @@ namespace libtorrent #endif // if we have choked the client // ignore the request - if (m_choked) - return; - - m_requests.push_back(r); - fill_send_buffer(); + if (m_choked && m_accept_fast.find(r.piece) == m_accept_fast.end()) + { + write_reject_request(r); +#ifdef TORRENT_VERBOSE_LOGGING + (*m_logger) << time_now_string() + << " *** REJECTING REQUEST [ peer choked and piece not in allowed fast set ]\n"; +#endif + } + else + { + m_requests.push_back(r); + fill_send_buffer(); + } } else { @@ -968,6 +1110,7 @@ namespace libtorrent "block_limit: " << t->block_size() << " ]\n"; #endif + write_reject_request(r); ++m_num_invalid_requests; if (t->alerts().should_post(alert::debug)) @@ -977,7 +1120,7 @@ namespace libtorrent , t->get_handle() , m_remote , m_peer_id - , "peer sent an illegal piece request, ignoring")); + , "peer sent an illegal piece request")); } } } @@ -1131,11 +1274,8 @@ namespace libtorrent "request queue ***\n"; #endif t->received_redundant_data(p.length); - if (!has_peer_choked()) - { - request_a_block(*t, *this); - send_block_requests(); - } + request_a_block(*t, *this); + send_block_requests(); return; } @@ -1144,11 +1284,8 @@ namespace libtorrent { t->received_redundant_data(p.length); - if (!has_peer_choked()) - { - request_a_block(*t, *this); - send_block_requests(); - } + request_a_block(*t, *this); + send_block_requests(); return; } @@ -1205,15 +1342,11 @@ namespace libtorrent block_finished.block_index, block_finished.piece_index, "block finished")); } - if (!has_peer_choked() && !t->is_seed() && !m_torrent.expired()) + if (!t->is_seed() && !m_torrent.expired()) { // this is a free function defined in policy.cpp request_a_block(*t, *this); - try - { - send_block_requests(); - } - catch (std::exception const&) {} + send_block_requests(); } #ifndef NDEBUG @@ -1295,6 +1428,149 @@ namespace libtorrent #endif } + // ----------------------------- + // --------- HAVE ALL ---------- + // ----------------------------- + + void peer_connection::incoming_have_all() + { + INVARIANT_CHECK; + + boost::shared_ptr t = m_torrent.lock(); + assert(t); + +#ifdef TORRENT_VERBOSE_LOGGING + (*m_logger) << time_now_string() << " <== HAVE_ALL\n"; +#endif + +#ifndef TORRENT_DISABLE_EXTENSIONS + for (extension_list_t::iterator i = m_extensions.begin() + , end(m_extensions.end()); i != end; ++i) + { + if ((*i)->on_have_all()) return; + } +#endif + + m_have_all = true; + + if (m_peer_info) m_peer_info->seed = true; + + // if we don't have metadata yet + // just remember the bitmask + // don't update the piecepicker + // (since it doesn't exist yet) + if (!t->ready_for_connections()) + { + // TODO: this might need something more + // so that once we have the metadata + // we can construct a full bitfield + return; + } + +#ifdef TORRENT_VERBOSE_LOGGING + (*m_logger) << " *** THIS IS A SEED ***\n"; +#endif + + // if we're a seed too, disconnect + if (t->is_seed()) + throw protocol_error("seed to seed connection redundant, disconnecting"); + + assert(!m_have_piece.empty()); + std::fill(m_have_piece.begin(), m_have_piece.end(), true); + + t->peer_has_all(); + if (!t->is_finished()) + t->get_policy().peer_is_interesting(*this); + } + + // ----------------------------- + // --------- HAVE NONE --------- + // ----------------------------- + + void peer_connection::incoming_have_none() + { + INVARIANT_CHECK; + + boost::shared_ptr t = m_torrent.lock(); + assert(t); + +#ifdef TORRENT_VERBOSE_LOGGING + (*m_logger) << time_now_string() << " <== HAVE_NONE\n"; +#endif + +#ifndef TORRENT_DISABLE_EXTENSIONS + for (extension_list_t::iterator i = m_extensions.begin() + , end(m_extensions.end()); i != end; ++i) + { + if ((*i)->on_have_none()) return; + } +#endif + + if (m_peer_info) m_peer_info->seed = false; + assert(!m_have_piece.empty() || !t->ready_for_connections()); + } + + // ----------------------------- + // ------- ALLOWED FAST -------- + // ----------------------------- + + void peer_connection::incoming_allowed_fast(int index) + { + INVARIANT_CHECK; + + boost::shared_ptr t = m_torrent.lock(); + assert(t); + +#ifdef TORRENT_VERBOSE_LOGGING + (*m_logger) << time_now_string() << " <== ALLOWED_FAST [ " << index << " ]\n"; +#endif + +#ifndef TORRENT_DISABLE_EXTENSIONS + for (extension_list_t::iterator i = m_extensions.begin() + , end(m_extensions.end()); i != end; ++i) + { + if ((*i)->on_allowed_fast(index)) return; + } +#endif + + // if we already have the piece, we can + // ignore this message + if (t->valid_metadata() + && t->have_piece(index)) + return; + + m_allowed_fast.push_back(index); + + // if the peer has the piece and we want + // to download it, request it + if (m_have_piece.size() > index + && m_have_piece[index] + && t->has_picker() + && t->picker().piece_priority(index) > 0) + { + t->get_policy().peer_is_interesting(*this); + } + } + + std::vector const& peer_connection::allowed_fast() + { + INVARIANT_CHECK; + + boost::shared_ptr t = m_torrent.lock(); + assert(t); + + for (std::vector::iterator i = m_allowed_fast.begin() + , end(m_allowed_fast.end()); i != end; ++i) + { + if (!t->have_piece(*i)) continue; + *i = m_allowed_fast.back(); + m_allowed_fast.pop_back(); + } + + // TODO: sort the allowed fast set in priority order + return m_allowed_fast; + } + void peer_connection::add_request(piece_block const& block) { INVARIANT_CHECK; @@ -1470,13 +1746,9 @@ namespace libtorrent { INVARIANT_CHECK; - if (has_peer_choked()) return; - boost::shared_ptr t = m_torrent.lock(); assert(t); - assert(!has_peer_choked()); - if ((int)m_download_queue.size() >= m_desired_queue_size) return; while (!m_request_queue.empty() @@ -1854,11 +2126,8 @@ namespace libtorrent m_assume_fifo = true; - if (!has_peer_choked()) - { - request_a_block(*t, *this); - send_block_requests(); - } + request_a_block(*t, *this); + send_block_requests(); } } diff --git a/libtorrent/src/piece_picker.cpp b/libtorrent/src/piece_picker.cpp index ddc2c2f5a..7f44d600c 100755 --- a/libtorrent/src/piece_picker.cpp +++ b/libtorrent/src/piece_picker.cpp @@ -1419,6 +1419,7 @@ namespace libtorrent if (p.downloading == 0) { int prio = p.priority(m_sequenced_download_threshold); + assert(prio > 0); p.downloading = 1; move(prio, p.index); diff --git a/libtorrent/src/policy.cpp b/libtorrent/src/policy.cpp index 572f48d35..e5d625dff 100755 --- a/libtorrent/src/policy.cpp +++ b/libtorrent/src/policy.cpp @@ -187,17 +187,18 @@ namespace libtorrent // have only one piece that we don't have, and it's the // same piece for both peers. Then they might get into an // infinite loop, fighting to request the same blocks. - void request_a_block( - torrent& t - , peer_connection& c) + void request_a_block(torrent& t, peer_connection& c) { assert(!t.is_seed()); - assert(!c.has_peer_choked()); + assert(t.valid_metadata()); assert(c.peer_info_struct() != 0 || !dynamic_cast(&c)); int num_requests = c.desired_queue_size() - (int)c.download_queue().size() - (int)c.request_queue().size(); +#ifdef TORRENT_VERBOSE_LOGGING + (*c.m_logger) << time_now_string() << " PIECE_PICKER [ req: " << num_requests << " ]\n"; +#endif assert(c.desired_queue_size() > 0); // if our request queue is already full, we // don't have to make any new requests yet @@ -231,18 +232,6 @@ namespace libtorrent else if (speed == peer_connection::medium) state = piece_picker::medium; else state = piece_picker::slow; - // picks the interesting pieces from this peer - // the integer is the number of pieces that - // should be guaranteed to be available for download - // (if num_requests is too big, too many pieces are - // picked and cpu-time is wasted) - // the last argument is if we should prefer whole pieces - // for this peer. If we're downloading one piece in 20 seconds - // then use this mode. - p.pick_pieces(c.get_bitfield(), interesting_pieces - , num_requests, prefer_whole_pieces, c.peer_info_struct() - , state, rarest_first); - // this vector is filled with the interesting pieces // that some other peer is currently downloading // we should then compare this peer's download speed @@ -251,6 +240,40 @@ namespace libtorrent std::vector busy_pieces; busy_pieces.reserve(num_requests); + if (c.has_peer_choked()) + { + // if we are choked we can only pick pieces from the + // allowed fast set. The allowed fast set is sorted + // in ascending priority order + std::vector const& allowed_fast = c.allowed_fast(); + + p.add_interesting_blocks(allowed_fast, c.get_bitfield() + , interesting_pieces, busy_pieces, num_requests + , prefer_whole_pieces, c.peer_info_struct(), state + , false); + interesting_pieces.insert(interesting_pieces.end() + , busy_pieces.begin(), busy_pieces.end()); + busy_pieces.clear(); + } + else + { + // picks the interesting pieces from this peer + // the integer is the number of pieces that + // should be guaranteed to be available for download + // (if num_requests is too big, too many pieces are + // picked and cpu-time is wasted) + // the last argument is if we should prefer whole pieces + // for this peer. If we're downloading one piece in 20 seconds + // then use this mode. + p.pick_pieces(c.get_bitfield(), interesting_pieces + , num_requests, prefer_whole_pieces, c.peer_info_struct() + , state, rarest_first); + busy_pieces.reserve(10); + } + +#ifdef TORRENT_VERBOSE_LOGGING + (*c.m_logger) << time_now_string() << " PIECE_PICKER [ picked: " << interesting_pieces.size() << " ]\n"; +#endif for (std::vector::iterator i = interesting_pieces.begin(); i != interesting_pieces.end(); ++i) { @@ -277,13 +300,13 @@ namespace libtorrent num_requests--; } - // in this case, we could not find any blocks - // that was free. If we couldn't find any busy - // blocks as well, we cannot download anything - // more from this peer. - if (busy_pieces.empty() || num_requests == 0) { + // in this case, we could not find any blocks + // that was free. If we couldn't find any busy + // blocks as well, we cannot download anything + // more from this peer. + c.send_block_requests(); return; } @@ -970,7 +993,7 @@ namespace libtorrent INVARIANT_CHECK; // just ignore the obviously invalid entries - if(remote.address() == address() || remote.port() == 0) + if (remote.address() == address() || remote.port() == 0) return; aux::session_impl& ses = m_torrent->session(); @@ -1056,7 +1079,10 @@ namespace libtorrent if (i->failcount > 0 && src != peer_info::dht) --i->failcount; - if (flags & 0x02) i->seed = true; + // if we're connected to this peer + // we already know if it's a seed or not + // so we don't have to trust this source + if ((flags & 0x02) && !i->connection) i->seed = true; if (i->connection) { @@ -1327,7 +1353,9 @@ namespace libtorrent INVARIANT_CHECK; c.send_interested(); - if (c.has_peer_choked()) return; + if (c.has_peer_choked() + && c.allowed_fast().empty()) + return; request_a_block(*m_torrent, c); } @@ -1353,7 +1381,7 @@ namespace libtorrent int total_connections = 0; int nonempty_connections = 0; - + std::set
unique_test; for (const_iterator i = m_peers.begin(); i != m_peers.end(); ++i)