From 8f520c717b0f36bdee7b729b48a697b2dfb8df71 Mon Sep 17 00:00:00 2001 From: Marcos Pinto Date: Fri, 6 Jul 2007 18:11:01 +0000 Subject: [PATCH] made the piece picker handle multi-request blocks better. fixes problem that might occur at the end of a torrent where the piece picker could take over blocks back and forth --- .../include/libtorrent/piece_picker.hpp | 9 +- libtorrent/include/libtorrent/policy.hpp | 9 +- libtorrent/include/libtorrent/torrent.hpp | 2 + .../include/libtorrent/torrent_handle.hpp | 2 +- libtorrent/src/peer_connection.cpp | 79 +++------- libtorrent/src/piece_picker.cpp | 48 +++++- libtorrent/src/policy.cpp | 149 +++--------------- libtorrent/src/torrent.cpp | 9 ++ libtorrent/src/torrent_handle.cpp | 2 +- 9 files changed, 109 insertions(+), 200 deletions(-) diff --git a/libtorrent/include/libtorrent/piece_picker.hpp b/libtorrent/include/libtorrent/piece_picker.hpp index 636f7e530..f6c3117b9 100755 --- a/libtorrent/include/libtorrent/piece_picker.hpp +++ b/libtorrent/include/libtorrent/piece_picker.hpp @@ -89,13 +89,15 @@ namespace libtorrent struct block_info { - block_info(): peer(0), num_downloads(0), state(state_none) {} + block_info(): peer(0), num_peers(0), state(state_none) {} // the peer this block was requested or // downloaded from. This is a pointer to // a policy::peer object void* peer; - // the number of times this block has been downloaded - unsigned num_downloads:14; + // the number of peers that has this block in their + // download or request queues + unsigned num_peers:14; + // the state of this block enum { state_none, state_requested, state_writing, state_finished }; unsigned state:2; }; @@ -210,6 +212,7 @@ namespace libtorrent , piece_state_t s); void mark_as_writing(piece_block block, void* peer); void mark_as_finished(piece_block block, void* peer); + int num_peers(piece_block block) const; // if a piece had a hash-failure, it must be restored and // made available for redownloading diff --git a/libtorrent/include/libtorrent/policy.hpp b/libtorrent/include/libtorrent/policy.hpp index fffc3bfa2..e087ccb75 100755 --- a/libtorrent/include/libtorrent/policy.hpp +++ b/libtorrent/include/libtorrent/policy.hpp @@ -68,10 +68,7 @@ namespace libtorrent free_upload_amount = 4 * 16 * 1024 }; - void request_a_block( - torrent& t - , peer_connection& c - , std::vector ignore = std::vector()); + void request_a_block(torrent& t, peer_connection& c); class TORRENT_EXPORT policy { @@ -121,9 +118,9 @@ namespace libtorrent struct peer { - enum connection_type { not_connectable,connectable }; + enum connection_type { not_connectable, connectable }; - peer(const tcp::endpoint& ip, connection_type t, int src); + peer(tcp::endpoint const& ip, connection_type t, int src); size_type total_download() const; size_type total_upload() const; diff --git a/libtorrent/include/libtorrent/torrent.hpp b/libtorrent/include/libtorrent/torrent.hpp index 90a9c2c57..8938d50af 100755 --- a/libtorrent/include/libtorrent/torrent.hpp +++ b/libtorrent/include/libtorrent/torrent.hpp @@ -262,6 +262,8 @@ namespace libtorrent // decreased in the piece_picker void remove_peer(peer_connection* p); + void cancel_block(piece_block block); + bool want_more_peers() const; bool try_connect_peer(); diff --git a/libtorrent/include/libtorrent/torrent_handle.hpp b/libtorrent/include/libtorrent/torrent_handle.hpp index b5e6bdc17..6fa8d1082 100755 --- a/libtorrent/include/libtorrent/torrent_handle.hpp +++ b/libtorrent/include/libtorrent/torrent_handle.hpp @@ -211,7 +211,7 @@ namespace libtorrent tcp::endpoint peer; unsigned state:2; - unsigned num_downloads:14; + unsigned num_peers:14; }; struct TORRENT_EXPORT partial_piece_info diff --git a/libtorrent/src/peer_connection.cpp b/libtorrent/src/peer_connection.cpp index 739378c85..cdad6a7b9 100755 --- a/libtorrent/src/peer_connection.cpp +++ b/libtorrent/src/peer_connection.cpp @@ -1081,10 +1081,6 @@ namespace libtorrent , m_download_queue.end() , block_finished); - // if there's another peer that needs to do another - // piece request, this will point to it - peer_connection* request_peer = 0; - if (b != m_download_queue.end()) { if (m_assume_fifo) @@ -1113,48 +1109,32 @@ namespace libtorrent { m_download_queue.erase(b); } + + t->cancel_block(block_finished); } else { -/* // cancel the block from the - // peer that has taken over it. - boost::optional peer - = t->picker().get_downloader(block_finished); - if (peer && t->picker().is_requested(block_finished)) + if (t->alerts().should_post(alert::debug)) { - peer_connection* pc = t->connection_for(*peer); - if (pc && pc != this) - { - pc->cancel_request(block_finished); - request_peer = pc; - } + t->alerts().post_alert( + peer_error_alert( + m_remote + , m_peer_id + , "got a block that was not in the request queue")); } - else -*/ { - if (t->alerts().should_post(alert::debug)) - { - t->alerts().post_alert( - peer_error_alert( - m_remote - , m_peer_id - , "got a block that was not requested")); - } #ifdef TORRENT_VERBOSE_LOGGING - (*m_logger) << " *** The block we just got was not in the " - "request queue ***\n"; + (*m_logger) << " *** The block we just got was not in the " + "request queue ***\n"; #endif - t->received_redundant_data(p.length); - if (!has_peer_choked()) - { - request_a_block(*t, *this); - send_block_requests(); - } - return; + t->received_redundant_data(p.length); + if (!has_peer_choked()) + { + request_a_block(*t, *this); + send_block_requests(); } + return; } - assert(picker.is_requested(block_finished)); - // if the block we got is already finished, then ignore it if (picker.is_downloaded(block_finished)) { @@ -1165,25 +1145,12 @@ namespace libtorrent request_a_block(*t, *this); send_block_requests(); } - - if (request_peer && !request_peer->has_peer_choked() && !t->is_seed()) - { - request_a_block(*t, *request_peer); - request_peer->send_block_requests(); - } return; } fs.async_write(p, data, bind(&peer_connection::on_disk_write_complete , self(), _1, _2, p, t)); picker.mark_as_writing(block_finished, peer_info_struct()); - - if (request_peer && !request_peer->has_peer_choked() && !t->is_seed()) - { - request_a_block(*t, *request_peer); - request_peer->send_block_requests(); - } - } void peer_connection::on_disk_write_complete(int ret, disk_io_job const& j @@ -1319,7 +1286,7 @@ namespace libtorrent assert(block.piece_index < t->torrent_file().num_pieces()); assert(block.block_index >= 0); assert(block.block_index < t->torrent_file().piece_size(block.piece_index)); - assert(!t->picker().is_requested(block)); + assert(!t->picker().is_requested(block) || (t->picker().num_peers(block) > 0)); piece_picker::piece_state_t state; peer_speed_t speed = peer_speed(); @@ -1345,17 +1312,22 @@ namespace libtorrent assert(block.piece_index < t->torrent_file().num_pieces()); assert(block.block_index >= 0); assert(block.block_index < t->torrent_file().piece_size(block.piece_index)); - assert(t->picker().is_requested(block)); - t->picker().abort_download(block); + // if all the peers that requested this block has been + // cancelled, then just ignore the cancel. + if (!t->picker().is_requested(block)) return; std::deque::iterator it = std::find(m_download_queue.begin(), m_download_queue.end(), block); if (it == m_download_queue.end()) { it = std::find(m_request_queue.begin(), m_request_queue.end(), block); - assert(it != m_request_queue.end()); + // when a multi block is received, it is cancelled + // from all peers, so if this one hasn't requested + // the block, just ignore to cancel it. if (it == m_request_queue.end()) return; + + t->picker().abort_download(block); m_request_queue.erase(it); // since we found it in the request queue, it means it hasn't been // sent yet, so we don't have to send a cancel. @@ -1364,6 +1336,7 @@ namespace libtorrent else { m_download_queue.erase(it); + t->picker().abort_download(block); } int block_offset = block.block_index * t->block_size(); diff --git a/libtorrent/src/piece_picker.cpp b/libtorrent/src/piece_picker.cpp index 82d2ccbdb..bd99be6e6 100755 --- a/libtorrent/src/piece_picker.cpp +++ b/libtorrent/src/piece_picker.cpp @@ -209,7 +209,7 @@ namespace libtorrent ret.info = &m_block_info[block_index]; for (int i = 0; i < m_blocks_per_piece; ++i) { - ret.info[i].num_downloads = 0; + ret.info[i].num_peers = 0; ret.info[i].state = block_info::state_none; ret.info[i].peer = 0; } @@ -1087,8 +1087,8 @@ namespace libtorrent && info.peer != peer) { exclusive = false; - if (info.peer == 0 - || static_cast(info.peer)->connection == 0) + if (info.state == piece_picker::block_info::state_requested + && info.peer != 0) { exclusive_active = false; return boost::make_tuple(exclusive, exclusive_active); @@ -1322,6 +1322,7 @@ namespace libtorrent block_info& info = dp.info[block.block_index]; info.state = block_info::state_requested; info.peer = peer; + info.num_peers = 1; ++dp.requested; } else @@ -1330,14 +1331,38 @@ namespace libtorrent = std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index)); assert(i != m_downloads.end()); block_info& info = i->info[block.block_index]; - assert(info.state == block_info::state_none); + assert(info.state == block_info::state_none + || (info.state == block_info::state_requested + && (info.num_peers > 0))); info.peer = peer; - info.state = block_info::state_requested; - ++i->requested; + if (info.state != block_info::state_requested) + { + info.state = block_info::state_requested; + ++i->requested; + } + ++info.num_peers; if (i->state == none) i->state = state; } } + int piece_picker::num_peers(piece_block block) const + { + assert(block.piece_index >= 0); + assert(block.block_index >= 0); + assert(block.piece_index < (int)m_piece_map.size()); + assert(block.block_index < blocks_in_piece(block.piece_index)); + + piece_pos const& p = m_piece_map[block.piece_index]; + if (!p.downloading) return 0; + + std::vector::const_iterator i + = std::find_if(m_downloads.begin(), m_downloads.end(), has_index(block.piece_index)); + assert(i != m_downloads.end()); + + block_info const& info = i->info[block.block_index]; + return info.num_peers; + } + void piece_picker::get_availability(std::vector& avail) const { TORRENT_PIECE_PICKER_INVARIANT_CHECK; @@ -1393,6 +1418,8 @@ namespace libtorrent assert (info.state != block_info::state_writing); ++i->writing; info.state = block_info::state_writing; + if (info.num_peers > 0) --info.num_peers; + assert(info.num_peers >= 0); if (i->requested == 0) { @@ -1509,6 +1536,11 @@ namespace libtorrent , m_downloads.end(), has_index(block.piece_index)); assert(i != m_downloads.end()); + block_info& info = i->info[block.block_index]; + --info.num_peers; + assert(info.num_peers >= 0); + if (info.num_peers > 0) return; + if (i->info[block.block_index].state == block_info::state_finished || i->info[block.block_index].state == block_info::state_writing) { @@ -1519,11 +1551,11 @@ namespace libtorrent assert(i->info[block.block_index].state == block_info::state_requested); // clear this block as being downloaded - i->info[block.block_index].state = block_info::state_none; + info.state = block_info::state_none; --i->requested; // clear the downloader of this block - i->info[block.block_index].peer = 0; + info.peer = 0; // if there are no other blocks in this piece // that's being downloaded, remove it from the list diff --git a/libtorrent/src/policy.cpp b/libtorrent/src/policy.cpp index 917694721..9835738d7 100755 --- a/libtorrent/src/policy.cpp +++ b/libtorrent/src/policy.cpp @@ -189,8 +189,7 @@ namespace libtorrent // infinite loop, fighting to request the same blocks. void request_a_block( torrent& t - , peer_connection& c - , std::vector ignore) + , peer_connection& c) { assert(!t.is_seed()); assert(!c.has_peer_choked()); @@ -254,6 +253,13 @@ namespace libtorrent { if (p.is_requested(*i)) { + // don't request pieces we already have in our request queue + const std::deque& dq = c.download_queue(); + const std::deque& rq = c.request_queue(); + if (std::find(dq.begin(), dq.end(), *i) != dq.end() + || std::find(rq.begin(), rq.end(), *i) != rq.end()) + continue; + busy_pieces.push_back(*i); continue; } @@ -265,139 +271,26 @@ namespace libtorrent num_requests--; } - c.send_block_requests(); - // in this case, we could not find any blocks // that was free. If we couldn't find any busy // blocks as well, we cannot download anything // more from this peer. - if (busy_pieces.empty()) return; - - // first look for blocks that are just queued - // and not actually sent to us yet - // (then we can cancel those and request them - // from this peer instead) - - while (num_requests > 0) + if (busy_pieces.empty()) { - peer_connection* peer = 0; - - const int initial_queue_size = (int)c.download_queue().size() - + (int)c.request_queue().size(); - - // This peer's weight will be the minimum, to prevent - // cancelling requests from a faster peer. - float min_weight = initial_queue_size == 0 - ? std::numeric_limits::max() - : c.statistics().download_payload_rate() / initial_queue_size; - - // find the peer with the lowest download - // speed that also has a piece that this - // peer could send us - for (torrent::peer_iterator i = t.begin(); - i != t.end(); ++i) - { - // don't try to take over blocks from ourself - if (i->second == &c) - continue; - - // ignore all peers in the ignore list - if (std::find(ignore.begin(), ignore.end(), i->second) != ignore.end()) - continue; - - const std::deque& download_queue = i->second->download_queue(); - const std::deque& request_queue = i->second->request_queue(); - const int queue_size = (int)i->second->download_queue().size() - + (int)i->second->request_queue().size(); - - bool in_request_queue = std::find_first_of( - busy_pieces.begin() - , busy_pieces.end() - , request_queue.begin() - , request_queue.end()) != busy_pieces.end(); - - bool in_download_queue = std::find_first_of( - busy_pieces.begin() - , busy_pieces.end() - , download_queue.begin() - , download_queue.end()) != busy_pieces.end(); - - // if the block is in the request queue rather than the download queue - // (i.e. the request message hasn't been sent yet) lower the weight in - // order to prioritize it. Taking over a block in the request queue is - // free in terms of redundant download. A block that already has been - // requested is likely to be in transit already, and would in that case - // mean redundant data to receive. - const float weight = (queue_size == 0) - ? std::numeric_limits::max() - : i->second->statistics().download_payload_rate() / queue_size - * in_request_queue ? .1f : 1.f; - - // if the peer's (i) weight is less than the lowest we've found so - // far (weight == priority) and it has blocks in its request- - // or download queue that we could request from this peer (c), - // replace the currently lowest ranking peer. - if (weight < min_weight && (in_request_queue || in_download_queue)) - { - peer = i->second; - min_weight = weight; - } - } - - if (peer == 0) - { - // we probably couldn't request the block because - // we are ignoring some peers - break; - } - - // find a suitable block to take over from this peer - - std::deque::const_reverse_iterator common_block = - std::find_first_of( - peer->request_queue().rbegin() - , peer->request_queue().rend() - , busy_pieces.begin() - , busy_pieces.end()); - - if (common_block == peer->request_queue().rend()) - { - common_block = std::find_first_of( - peer->download_queue().rbegin() - , peer->download_queue().rend() - , busy_pieces.begin() - , busy_pieces.end()); - assert(common_block != peer->download_queue().rend()); - } - - piece_block block = *common_block; - - // the one we interrupted may need to request a new piece. - // make sure it doesn't take over a block from the peer - // that just took over its block (that would cause an - // infinite recursion) - peer->cancel_request(block); - c.add_request(block); - ignore.push_back(&c); - if (!peer->has_peer_choked() && !t.is_seed()) - { - request_a_block(t, *peer, ignore); - peer->send_block_requests(); - } - - num_requests--; - - const int queue_size = (int)c.download_queue().size() - + (int)c.request_queue().size(); - const float weight = queue_size == 0 - ? std::numeric_limits::max() - : c.statistics().download_payload_rate() / queue_size; - - // this peer doesn't have a faster connection than the - // slowest peer. Don't take over any blocks - if (weight <= min_weight) break; + c.send_block_requests(); + return; } + + std::random_shuffle(busy_pieces.begin(), busy_pieces.end()); + + // find the block with the fewest requests to it + std::vector::iterator i = std::min_element( + busy_pieces.begin(), busy_pieces.end() + , bind(&piece_picker::num_peers, boost::cref(p), _1) < + bind(&piece_picker::num_peers, boost::cref(p), _2)); + + c.add_request(*i); c.send_block_requests(); } diff --git a/libtorrent/src/torrent.cpp b/libtorrent/src/torrent.cpp index 788c45c57..6ed6dd65a 100755 --- a/libtorrent/src/torrent.cpp +++ b/libtorrent/src/torrent.cpp @@ -1362,6 +1362,15 @@ namespace libtorrent return req; } + void torrent::cancel_block(piece_block block) + { + for (peer_iterator i = m_connections.begin() + , end(m_connections.end()); i != end; ++i) + { + i->second->cancel_request(block); + } + } + void torrent::remove_peer(peer_connection* p) try { INVARIANT_CHECK; diff --git a/libtorrent/src/torrent_handle.cpp b/libtorrent/src/torrent_handle.cpp index 4ebe4e904..00877aa16 100755 --- a/libtorrent/src/torrent_handle.cpp +++ b/libtorrent/src/torrent_handle.cpp @@ -784,7 +784,7 @@ namespace libtorrent pi.blocks[j].peer = p->ip; } - pi.blocks[j].num_downloads = i->info[j].num_downloads; + pi.blocks[j].num_peers = i->info[j].num_peers; pi.blocks[j].state = i->info[j].state; } pi.piece_index = i->index;