fast-extension support from libtorrent

This commit is contained in:
Marcos Pinto 2007-08-14 17:59:02 +00:00
parent 1800dbad86
commit b26575e920
11 changed files with 647 additions and 81 deletions

View File

@ -122,8 +122,16 @@ namespace libtorrent
msg_request,
msg_piece,
msg_cancel,
// DHT extension
msg_dht_port,
// extension protocol message
// FAST extension
msg_suggest_piece = 0xd,
msg_have_all,
msg_have_none,
msg_reject_request,
msg_allowed_fast,
// extension protocol message
msg_extended = 20,
num_supported_messages
@ -174,8 +182,17 @@ namespace libtorrent
void on_request(int received);
void on_piece(int received);
void on_cancel(int received);
// DHT extension
void on_dht_port(int received);
// FAST extension
void on_suggest_piece(int received);
void on_have_all(int received);
void on_have_none(int received);
void on_reject_request(int received);
void on_allowed_fast(int received);
void on_extended(int received);
void on_extended_handshake();
@ -201,7 +218,16 @@ namespace libtorrent
void write_metadata(std::pair<int, int> req);
void write_metadata_request(std::pair<int, int> req);
void write_keepalive();
// DHT extension
void write_dht_port(int listen_port);
// FAST extension
void write_have_all();
void write_have_none();
void write_reject_request(peer_request const&);
void write_allow_fast(int piece);
void on_connected();
void on_metadata();
@ -325,6 +351,7 @@ namespace libtorrent
bool m_supports_extensions;
#endif
bool m_supports_dht_port;
bool m_supports_fast;
#ifndef TORRENT_DISABLE_ENCRYPTION
// this is set to true after the encryption method has been

View File

@ -131,6 +131,15 @@ namespace libtorrent
virtual bool on_bitfield(std::vector<bool> const& bitfield)
{ return false; }
virtual bool on_have_all()
{ return false; }
virtual bool on_have_none()
{ return false; }
virtual bool on_allowed_fast(int index)
{ return false; }
virtual bool on_request(peer_request const& req)
{ return false; }

View File

@ -131,6 +131,8 @@ namespace libtorrent
enum peer_speed_t { slow, medium, fast };
peer_speed_t peer_speed();
void send_allowed_set();
#ifndef TORRENT_DISABLE_EXTENSIONS
void add_extension(boost::shared_ptr<peer_plugin>);
#endif
@ -186,9 +188,9 @@ namespace libtorrent
void set_pid(const peer_id& pid) { m_peer_id = pid; }
bool has_piece(int i) const;
const std::deque<piece_block>& download_queue() const;
const std::deque<piece_block>& request_queue() const;
const std::deque<peer_request>& upload_queue() const;
std::deque<piece_block> const& download_queue() const;
std::deque<piece_block> const& request_queue() const;
std::deque<peer_request> const& upload_queue() const;
bool is_interesting() const { return m_interesting; }
bool is_choked() const { return m_choked; }
@ -217,6 +219,7 @@ namespace libtorrent
tcp::endpoint const& remote() const { return m_remote; }
std::vector<bool> const& get_bitfield() const;
std::vector<int> const& allowed_fast();
void timed_out();
// this will cause this peer_connection to be disconnected.
@ -294,8 +297,14 @@ namespace libtorrent
void incoming_piece(peer_request const& p, char const* data);
void incoming_piece_fragment();
void incoming_cancel(peer_request const& r);
void incoming_dht_port(int listen_port);
void incoming_reject_request(peer_request const& r);
void incoming_have_all();
void incoming_have_none();
void incoming_allowed_fast(int index);
// the following functions appends messages
// to the send buffer
void send_choke();
@ -373,6 +382,9 @@ namespace libtorrent
virtual void write_keepalive() = 0;
virtual void write_piece(peer_request const& r, char const* buffer) = 0;
virtual void write_reject_request(peer_request const& r) = 0;
virtual void write_allow_fast(int piece) = 0;
virtual void on_connected() = 0;
virtual void on_tick() {}
@ -529,7 +541,7 @@ namespace libtorrent
// set to the torrent it belongs to.
boost::weak_ptr<torrent> m_torrent;
// is true if it was we that connected to the peer
// and false if we got an incomming connection
// and false if we got an incoming connection
// could be considered: true = local, false = remote
bool m_active;
@ -563,6 +575,10 @@ namespace libtorrent
// the pieces the other end have
std::vector<bool> m_have_piece;
// this is set to true when a have_all
// message is received. This information
// is used to fill the bitmask in init()
bool m_have_all;
// the number of pieces this peer
// has. Must be the same as
@ -695,6 +711,14 @@ namespace libtorrent
// was last updated
ptime m_remote_dl_update;
// the pieces we will send to the peer
// if requested (regardless of choke state)
std::set<int> m_accept_fast;
// the pieces the peer will send us if
// requested (regardless of choke state)
std::vector<int> m_allowed_fast;
// the number of bytes send to the disk-io
// thread that hasn't yet been completely written.
int m_outstanding_writing_bytes;

View File

@ -197,6 +197,19 @@ namespace libtorrent
, void* peer, piece_state_t speed
, bool rarest_first) const;
// picks blocks from each of the pieces in the piece_list
// vector that is also in the piece bitmask. The blocks
// are added to interesting_blocks, and busy blocks are
// added to backup_blocks. num blocks is the number of
// blocks to be picked.
int add_interesting_blocks(const std::vector<int>& piece_list
, const std::vector<bool>& pieces
, std::vector<piece_block>& interesting_blocks
, std::vector<piece_block>& backup_blocks
, int num_blocks, bool prefer_whole_pieces
, void* peer, piece_state_t speed
, bool ignore_downloading_pieces) const;
// clears the peer pointer in all downloading pieces with this
// peer pointer
void clear_peer(void* peer);
@ -358,14 +371,6 @@ namespace libtorrent
void move(int vec_index, int elem_index);
void sort_piece(std::vector<downloading_piece>::iterator dp);
int add_interesting_blocks(const std::vector<int>& piece_list
, const std::vector<bool>& pieces
, std::vector<piece_block>& interesting_blocks
, std::vector<piece_block>& backup_blocks
, int num_blocks, bool prefer_whole_pieces
, void* peer, piece_state_t speed
, bool ignore_downloading_pieces) const;
downloading_piece& add_download_piece();
void erase_download_piece(std::vector<downloading_piece>::iterator i);

View File

@ -53,6 +53,7 @@ POSSIBILITY OF SUCH DAMAGE.
#pragma warning(pop)
#endif
#include "libtorrent/config.hpp"
#include "libtorrent/torrent_handle.hpp"
#include "libtorrent/entry.hpp"
#include "libtorrent/alert.hpp"

View File

@ -108,6 +108,7 @@ namespace libtorrent
, unchoke_interval(20)
, num_want(200)
, initial_picker_threshold(4)
, allowed_fast_set_size(10)
, max_outstanding_disk_bytes_per_connection(64 * 1024)
#ifndef TORRENT_DISABLE_DHT
, use_dht_as_fallback(true)
@ -252,6 +253,10 @@ namespace libtorrent
// random pieces instead of rarest first.
int initial_picker_threshold;
// the number of allowed pieces to send to peers
// that supports the fast extensions
int allowed_fast_set_size;
// the maximum number of bytes a connection may have
// pending in the disk write queue before its download
// rate is being throttled. This prevents fast downloads

View File

@ -126,6 +126,8 @@ namespace libtorrent
void write_piece(peer_request const& r, char const* buffer) { assert(false); }
void write_keepalive() {}
void on_connected();
void write_reject_request(peer_request const&) {}
void write_allow_fast(int) {}
#ifndef NDEBUG
void check_invariant() const;

View File

@ -75,7 +75,14 @@ namespace libtorrent
&bt_peer_connection::on_piece,
&bt_peer_connection::on_cancel,
&bt_peer_connection::on_dht_port,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0,
// FAST extension messages
&bt_peer_connection::on_suggest_piece,
&bt_peer_connection::on_have_all,
&bt_peer_connection::on_have_none,
&bt_peer_connection::on_reject_request,
&bt_peer_connection::on_allowed_fast,
0, 0,
&bt_peer_connection::on_extended
};
@ -93,6 +100,7 @@ namespace libtorrent
, m_supports_extensions(false)
#endif
, m_supports_dht_port(false)
, m_supports_fast(false)
#ifndef TORRENT_DISABLE_ENCRYPTION
, m_encrypted(false)
, m_rc4_encrypted(false)
@ -124,6 +132,7 @@ namespace libtorrent
, m_supports_extensions(false)
#endif
, m_supports_dht_port(false)
, m_supports_fast(false)
#ifndef TORRENT_DISABLE_ENCRYPTION
, m_encrypted(false)
, m_rc4_encrypted(false)
@ -226,6 +235,10 @@ namespace libtorrent
boost::shared_ptr<torrent> t = associated_torrent().lock();
assert(t);
write_bitfield(t->pieces());
#ifndef TORRENT_DISABLE_DHT
if (m_supports_dht_port && m_ses.m_dht)
write_dht_port(m_ses.get_dht_settings().service_port);
#endif
}
void bt_peer_connection::write_dht_port(int listen_port)
@ -246,6 +259,75 @@ namespace libtorrent
setup_send();
}
void bt_peer_connection::write_have_all()
{
INVARIANT_CHECK;
assert(m_sent_handshake && !m_sent_bitfield);
#ifndef NDEBUG
m_sent_bitfield = true;
#endif
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string()
<< " ==> HAVE_ALL\n";
#endif
char buf[] = {0,0,0,1, msg_have_all};
send_buffer(buf, buf + sizeof(buf));
}
void bt_peer_connection::write_have_none()
{
INVARIANT_CHECK;
assert(m_sent_handshake && !m_sent_bitfield);
#ifndef NDEBUG
m_sent_bitfield = true;
#endif
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string()
<< " ==> HAVE_NONE\n";
#endif
char buf[] = {0,0,0,1, msg_have_none};
send_buffer(buf, buf + sizeof(buf));
}
void bt_peer_connection::write_reject_request(peer_request const& r)
{
INVARIANT_CHECK;
assert(m_sent_handshake && m_sent_bitfield);
assert(associated_torrent().lock()->valid_metadata());
char buf[] = {0,0,0,13, msg_reject_request};
buffer::interval i = allocate_send_buffer(17);
std::copy(buf, buf + 5, i.begin);
i.begin += 5;
// index
detail::write_int32(r.piece, i.begin);
// begin
detail::write_int32(r.start, i.begin);
// length
detail::write_int32(r.length, i.begin);
assert(i.begin == i.end);
setup_send();
}
void bt_peer_connection::write_allow_fast(int piece)
{
INVARIANT_CHECK;
assert(m_sent_handshake && m_sent_bitfield);
assert(associated_torrent().lock()->valid_metadata());
char buf[] = {0,0,0,5, msg_allowed_fast, 0, 0, 0, 0};
char* ptr = buf + 5;
detail::write_int32(piece, ptr);
send_buffer(buf, buf + sizeof(buf));
}
void bt_peer_connection::get_specific_peer_info(peer_info& p) const
{
assert(!associated_torrent().expired());
@ -636,6 +718,9 @@ namespace libtorrent
*(i.begin + 5) = 0x10;
#endif
// we support FAST extension
*(i.begin + 7) = 0x04;
i.begin += 8;
// info hash
@ -721,6 +806,20 @@ namespace libtorrent
if (!packet_finished()) return;
incoming_choke();
if (!m_supports_fast)
{
boost::shared_ptr<torrent> t = associated_torrent().lock();
assert(t);
while (!request_queue().empty())
{
piece_block const& b = request_queue().front();
peer_request r;
r.piece = b.piece_index;
r.start = b.block_index * t->block_size();
r.length = t->block_size();
incoming_reject_request(r);
}
}
}
// -----------------------------
@ -939,6 +1038,9 @@ namespace libtorrent
{
INVARIANT_CHECK;
if (!m_supports_dht_port)
throw protocol_error("got 'dht_port' message from peer that doesn't support it");
assert(received > 0);
if (packet_size() != 3)
throw protocol_error("'dht_port' message size != 3");
@ -953,6 +1055,74 @@ namespace libtorrent
incoming_dht_port(listen_port);
}
void bt_peer_connection::on_suggest_piece(int received)
{
INVARIANT_CHECK;
if (!m_supports_fast)
throw protocol_error("got 'suggest_piece' without FAST extension support");
// just ignore for now
return;
}
void bt_peer_connection::on_have_all(int received)
{
INVARIANT_CHECK;
if (!m_supports_fast)
throw protocol_error("got 'have_all' without FAST extension support");
m_statistics.received_bytes(0, received);
incoming_have_all();
}
void bt_peer_connection::on_have_none(int received)
{
INVARIANT_CHECK;
if (!m_supports_fast)
throw protocol_error("got 'have_none' without FAST extension support");
m_statistics.received_bytes(0, received);
incoming_have_none();
}
void bt_peer_connection::on_reject_request(int received)
{
INVARIANT_CHECK;
if (!m_supports_fast)
throw protocol_error("got 'reject_request' without FAST extension support");
m_statistics.received_bytes(0, received);
if (!packet_finished()) return;
buffer::const_interval recv_buffer = receive_buffer();
peer_request r;
const char* ptr = recv_buffer.begin + 1;
r.piece = detail::read_int32(ptr);
r.start = detail::read_int32(ptr);
r.length = detail::read_int32(ptr);
incoming_reject_request(r);
}
void bt_peer_connection::on_allowed_fast(int received)
{
INVARIANT_CHECK;
if (!m_supports_fast)
throw protocol_error("got 'allowed_fast' without FAST extension support");
m_statistics.received_bytes(0, received);
if (!packet_finished()) return;
buffer::const_interval recv_buffer = receive_buffer();
const char* ptr = recv_buffer.begin + 1;
int index = detail::read_int32(ptr);
incoming_allowed_fast(index);
}
// -----------------------------
// --------- EXTENDED ----------
// -----------------------------
@ -1175,6 +1345,22 @@ namespace libtorrent
assert(m_sent_handshake && !m_sent_bitfield);
assert(t->valid_metadata());
// in this case, have_all or have_none should be sent instead
assert(!m_supports_fast || !t->is_seed() || t->num_pieces() != 0);
if (m_supports_fast && t->is_seed())
{
write_have_all();
send_allowed_set();
return;
}
else if (m_supports_fast && t->num_pieces() == 0)
{
write_have_none();
send_allowed_set();
return;
}
int num_pieces = bitfield.size();
int lazy_pieces[50];
int num_lazy_pieces = 0;
@ -1251,6 +1437,9 @@ namespace libtorrent
#endif
}
}
if (m_supports_fast)
send_allowed_set();
}
#ifndef TORRENT_DISABLE_EXTENSIONS
@ -2016,6 +2205,9 @@ namespace libtorrent
if (recv_buffer[7] & 0x01)
m_supports_dht_port = true;
if (recv_buffer[7] & 0x04)
m_supports_fast = true;
// ok, now we have got enough of the handshake. Is this connection
// attached to a torrent?
if (!t)
@ -2049,10 +2241,10 @@ namespace libtorrent
assert(t);
// if this is a local connection, we have already
// send the handshake
// sent the handshake
if (!is_local()) write_handshake();
if (t->valid_metadata())
write_bitfield(t->pieces());
// if (t->valid_metadata())
// write_bitfield(t->pieces());
assert(t->get_policy().has_connection(this));
@ -2125,11 +2317,6 @@ namespace libtorrent
throw protocol_error("closing connection to ourself");
}
#ifndef TORRENT_DISABLE_DHT
if (m_supports_dht_port && m_ses.m_dht)
write_dht_port(m_ses.get_dht_settings().service_port);
#endif
m_client_version = identify_client(pid);
boost::optional<fingerprint> f = client_fingerprint(pid);
if (f && std::equal(f->name, f->name + 2, "BC"))
@ -2181,6 +2368,14 @@ namespace libtorrent
m_state = read_packet_size;
reset_recv_buffer(4);
if (t->valid_metadata())
{
write_bitfield(t->pieces());
#ifndef TORRENT_DISABLE_DHT
if (m_supports_dht_port && m_ses.m_dht)
write_dht_port(m_ses.get_dht_settings().service_port);
#endif
}
assert(!packet_finished());
return;

View File

@ -93,6 +93,7 @@ namespace libtorrent
, m_choked(true)
, m_failed(false)
, m_ignore_bandwidth_limits(false)
, m_have_all(false)
, m_num_pieces(0)
, m_desired_queue_size(2)
, m_free_upload(0)
@ -168,6 +169,7 @@ namespace libtorrent
, m_choked(true)
, m_failed(false)
, m_ignore_bandwidth_limits(false)
, m_have_all(false)
, m_num_pieces(0)
, m_desired_queue_size(2)
, m_free_upload(0)
@ -251,6 +253,67 @@ namespace libtorrent
}
#endif
void peer_connection::send_allowed_set()
{
INVARIANT_CHECK;
boost::shared_ptr<torrent> t = m_torrent.lock();
assert(t);
int num_allowed_pieces = m_ses.settings().allowed_fast_set_size;
int num_pieces = t->torrent_file().num_pieces();
if (num_allowed_pieces >= num_pieces)
{
for (int i = 0; i < num_pieces; ++i)
{
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string()
<< " ==> ALLOWED_FAST [ " << i << " ]\n";
#endif
write_allow_fast(i);
m_accept_fast.insert(i);
}
return;
}
std::string x;
address const& addr = m_remote.address();
if (addr.is_v4())
{
address_v4::bytes_type bytes = addr.to_v4().to_bytes();
x.assign((char*)&bytes[0], bytes.size());
}
else
{
address_v6::bytes_type bytes = addr.to_v6().to_bytes();
x.assign((char*)&bytes[0], bytes.size());
}
x.append((char*)&t->torrent_file().info_hash()[0], 20);
sha1_hash hash = hasher(&x[0], x.size()).final();
for (;;)
{
char* p = (char*)&hash[0];
for (int i = 0; i < 5; ++i)
{
int piece = detail::read_uint32(p) % num_pieces;
if (m_accept_fast.find(piece) == m_accept_fast.end())
{
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string()
<< " ==> ALLOWED_FAST [ " << piece << " ]\n";
#endif
write_allow_fast(piece);
m_accept_fast.insert(piece);
if (int(m_accept_fast.size()) >= num_allowed_pieces
|| int(m_accept_fast.size()) == num_pieces) return;
}
}
hash = hasher((char*)&hash[0], 20).final();
}
}
void peer_connection::init()
{
INVARIANT_CHECK;
@ -260,7 +323,7 @@ namespace libtorrent
assert(t->valid_metadata());
assert(t->ready_for_connections());
m_have_piece.resize(t->torrent_file().num_pieces(), false);
m_have_piece.resize(t->torrent_file().num_pieces(), m_have_all);
// now that we have a piece_picker,
// update it with this peers pieces
@ -588,6 +651,74 @@ namespace libtorrent
m_request_queue.clear();
}
bool match_request(peer_request const& r, piece_block const& b, int block_size)
{
if (b.piece_index != r.piece) return false;
if (b.block_index != r.start / block_size) return false;
if (r.start % block_size != 0) return false;
return true;
}
// -----------------------------
// -------- REJECT PIECE -------
// -----------------------------
void peer_connection::incoming_reject_request(peer_request const& r)
{
INVARIANT_CHECK;
boost::shared_ptr<torrent> t = m_torrent.lock();
assert(t);
std::deque<piece_block>::iterator i = std::find_if(
m_download_queue.begin(), m_download_queue.end()
, bind(match_request, boost::cref(r), _1, t->block_size()));
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string()
<< " <== REJECT_PIECE [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
#endif
piece_block b(-1, 0);
if (i != m_download_queue.end())
{
b = *i;
m_download_queue.erase(i);
}
else
{
i = std::find_if(m_request_queue.begin(), m_request_queue.end()
, bind(match_request, boost::cref(r), _1, t->block_size()));
if (i != m_request_queue.end())
{
b = *i;
m_request_queue.erase(i);
}
}
if (b.piece_index != -1 && !t->is_seed())
{
piece_picker& p = t->picker();
p.abort_download(b);
}
#ifdef TORRENT_VERBOSE_LOGGING
else
{
(*m_logger) << time_now_string()
<< " *** PIECE NOT IN REQUEST QUEUE\n";
}
#endif
if (m_request_queue.empty())
{
if (m_download_queue.size() < 2)
{
request_a_block(*t, *this);
}
send_block_requests();
}
}
// -----------------------------
// ---------- UNCHOKE ----------
// -----------------------------
@ -798,11 +929,12 @@ namespace libtorrent
{
m_have_piece = bitfield;
m_num_pieces = std::count(bitfield.begin(), bitfield.end(), true);
if (m_peer_info) m_peer_info->seed = true;
if (m_peer_info) m_peer_info->seed = (m_num_pieces == int(bitfield.size()));
return;
}
assert(t->valid_metadata());
int num_pieces = std::count(bitfield.begin(), bitfield.end(), true);
if (num_pieces == int(m_have_piece.size()))
{
@ -906,6 +1038,7 @@ namespace libtorrent
"t: " << (int)t->torrent_file().piece_size(r.piece) << " | "
"n: " << t->torrent_file().num_pieces() << " ]\n";
#endif
write_reject_request(r);
return;
}
@ -925,6 +1058,7 @@ namespace libtorrent
"t: " << (int)t->torrent_file().piece_size(r.piece) << " | "
"n: " << t->torrent_file().num_pieces() << " ]\n";
#endif
write_reject_request(r);
return;
}
@ -947,11 +1081,19 @@ namespace libtorrent
#endif
// if we have choked the client
// ignore the request
if (m_choked)
return;
m_requests.push_back(r);
fill_send_buffer();
if (m_choked && m_accept_fast.find(r.piece) == m_accept_fast.end())
{
write_reject_request(r);
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string()
<< " *** REJECTING REQUEST [ peer choked and piece not in allowed fast set ]\n";
#endif
}
else
{
m_requests.push_back(r);
fill_send_buffer();
}
}
else
{
@ -968,6 +1110,7 @@ namespace libtorrent
"block_limit: " << t->block_size() << " ]\n";
#endif
write_reject_request(r);
++m_num_invalid_requests;
if (t->alerts().should_post(alert::debug))
@ -977,7 +1120,7 @@ namespace libtorrent
, t->get_handle()
, m_remote
, m_peer_id
, "peer sent an illegal piece request, ignoring"));
, "peer sent an illegal piece request"));
}
}
}
@ -1131,11 +1274,8 @@ namespace libtorrent
"request queue ***\n";
#endif
t->received_redundant_data(p.length);
if (!has_peer_choked())
{
request_a_block(*t, *this);
send_block_requests();
}
request_a_block(*t, *this);
send_block_requests();
return;
}
@ -1144,11 +1284,8 @@ namespace libtorrent
{
t->received_redundant_data(p.length);
if (!has_peer_choked())
{
request_a_block(*t, *this);
send_block_requests();
}
request_a_block(*t, *this);
send_block_requests();
return;
}
@ -1205,15 +1342,11 @@ namespace libtorrent
block_finished.block_index, block_finished.piece_index, "block finished"));
}
if (!has_peer_choked() && !t->is_seed() && !m_torrent.expired())
if (!t->is_seed() && !m_torrent.expired())
{
// this is a free function defined in policy.cpp
request_a_block(*t, *this);
try
{
send_block_requests();
}
catch (std::exception const&) {}
send_block_requests();
}
#ifndef NDEBUG
@ -1295,6 +1428,149 @@ namespace libtorrent
#endif
}
// -----------------------------
// --------- HAVE ALL ----------
// -----------------------------
void peer_connection::incoming_have_all()
{
INVARIANT_CHECK;
boost::shared_ptr<torrent> t = m_torrent.lock();
assert(t);
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string() << " <== HAVE_ALL\n";
#endif
#ifndef TORRENT_DISABLE_EXTENSIONS
for (extension_list_t::iterator i = m_extensions.begin()
, end(m_extensions.end()); i != end; ++i)
{
if ((*i)->on_have_all()) return;
}
#endif
m_have_all = true;
if (m_peer_info) m_peer_info->seed = true;
// if we don't have metadata yet
// just remember the bitmask
// don't update the piecepicker
// (since it doesn't exist yet)
if (!t->ready_for_connections())
{
// TODO: this might need something more
// so that once we have the metadata
// we can construct a full bitfield
return;
}
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << " *** THIS IS A SEED ***\n";
#endif
// if we're a seed too, disconnect
if (t->is_seed())
throw protocol_error("seed to seed connection redundant, disconnecting");
assert(!m_have_piece.empty());
std::fill(m_have_piece.begin(), m_have_piece.end(), true);
t->peer_has_all();
if (!t->is_finished())
t->get_policy().peer_is_interesting(*this);
}
// -----------------------------
// --------- HAVE NONE ---------
// -----------------------------
void peer_connection::incoming_have_none()
{
INVARIANT_CHECK;
boost::shared_ptr<torrent> t = m_torrent.lock();
assert(t);
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string() << " <== HAVE_NONE\n";
#endif
#ifndef TORRENT_DISABLE_EXTENSIONS
for (extension_list_t::iterator i = m_extensions.begin()
, end(m_extensions.end()); i != end; ++i)
{
if ((*i)->on_have_none()) return;
}
#endif
if (m_peer_info) m_peer_info->seed = false;
assert(!m_have_piece.empty() || !t->ready_for_connections());
}
// -----------------------------
// ------- ALLOWED FAST --------
// -----------------------------
void peer_connection::incoming_allowed_fast(int index)
{
INVARIANT_CHECK;
boost::shared_ptr<torrent> t = m_torrent.lock();
assert(t);
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string() << " <== ALLOWED_FAST [ " << index << " ]\n";
#endif
#ifndef TORRENT_DISABLE_EXTENSIONS
for (extension_list_t::iterator i = m_extensions.begin()
, end(m_extensions.end()); i != end; ++i)
{
if ((*i)->on_allowed_fast(index)) return;
}
#endif
// if we already have the piece, we can
// ignore this message
if (t->valid_metadata()
&& t->have_piece(index))
return;
m_allowed_fast.push_back(index);
// if the peer has the piece and we want
// to download it, request it
if (m_have_piece.size() > index
&& m_have_piece[index]
&& t->has_picker()
&& t->picker().piece_priority(index) > 0)
{
t->get_policy().peer_is_interesting(*this);
}
}
std::vector<int> const& peer_connection::allowed_fast()
{
INVARIANT_CHECK;
boost::shared_ptr<torrent> t = m_torrent.lock();
assert(t);
for (std::vector<int>::iterator i = m_allowed_fast.begin()
, end(m_allowed_fast.end()); i != end; ++i)
{
if (!t->have_piece(*i)) continue;
*i = m_allowed_fast.back();
m_allowed_fast.pop_back();
}
// TODO: sort the allowed fast set in priority order
return m_allowed_fast;
}
void peer_connection::add_request(piece_block const& block)
{
INVARIANT_CHECK;
@ -1470,13 +1746,9 @@ namespace libtorrent
{
INVARIANT_CHECK;
if (has_peer_choked()) return;
boost::shared_ptr<torrent> t = m_torrent.lock();
assert(t);
assert(!has_peer_choked());
if ((int)m_download_queue.size() >= m_desired_queue_size) return;
while (!m_request_queue.empty()
@ -1854,11 +2126,8 @@ namespace libtorrent
m_assume_fifo = true;
if (!has_peer_choked())
{
request_a_block(*t, *this);
send_block_requests();
}
request_a_block(*t, *this);
send_block_requests();
}
}

View File

@ -1419,6 +1419,7 @@ namespace libtorrent
if (p.downloading == 0)
{
int prio = p.priority(m_sequenced_download_threshold);
assert(prio > 0);
p.downloading = 1;
move(prio, p.index);

View File

@ -187,17 +187,18 @@ namespace libtorrent
// have only one piece that we don't have, and it's the
// same piece for both peers. Then they might get into an
// infinite loop, fighting to request the same blocks.
void request_a_block(
torrent& t
, peer_connection& c)
void request_a_block(torrent& t, peer_connection& c)
{
assert(!t.is_seed());
assert(!c.has_peer_choked());
assert(t.valid_metadata());
assert(c.peer_info_struct() != 0 || !dynamic_cast<bt_peer_connection*>(&c));
int num_requests = c.desired_queue_size()
- (int)c.download_queue().size()
- (int)c.request_queue().size();
#ifdef TORRENT_VERBOSE_LOGGING
(*c.m_logger) << time_now_string() << " PIECE_PICKER [ req: " << num_requests << " ]\n";
#endif
assert(c.desired_queue_size() > 0);
// if our request queue is already full, we
// don't have to make any new requests yet
@ -231,18 +232,6 @@ namespace libtorrent
else if (speed == peer_connection::medium) state = piece_picker::medium;
else state = piece_picker::slow;
// picks the interesting pieces from this peer
// the integer is the number of pieces that
// should be guaranteed to be available for download
// (if num_requests is too big, too many pieces are
// picked and cpu-time is wasted)
// the last argument is if we should prefer whole pieces
// for this peer. If we're downloading one piece in 20 seconds
// then use this mode.
p.pick_pieces(c.get_bitfield(), interesting_pieces
, num_requests, prefer_whole_pieces, c.peer_info_struct()
, state, rarest_first);
// this vector is filled with the interesting pieces
// that some other peer is currently downloading
// we should then compare this peer's download speed
@ -251,6 +240,40 @@ namespace libtorrent
std::vector<piece_block> busy_pieces;
busy_pieces.reserve(num_requests);
if (c.has_peer_choked())
{
// if we are choked we can only pick pieces from the
// allowed fast set. The allowed fast set is sorted
// in ascending priority order
std::vector<int> const& allowed_fast = c.allowed_fast();
p.add_interesting_blocks(allowed_fast, c.get_bitfield()
, interesting_pieces, busy_pieces, num_requests
, prefer_whole_pieces, c.peer_info_struct(), state
, false);
interesting_pieces.insert(interesting_pieces.end()
, busy_pieces.begin(), busy_pieces.end());
busy_pieces.clear();
}
else
{
// picks the interesting pieces from this peer
// the integer is the number of pieces that
// should be guaranteed to be available for download
// (if num_requests is too big, too many pieces are
// picked and cpu-time is wasted)
// the last argument is if we should prefer whole pieces
// for this peer. If we're downloading one piece in 20 seconds
// then use this mode.
p.pick_pieces(c.get_bitfield(), interesting_pieces
, num_requests, prefer_whole_pieces, c.peer_info_struct()
, state, rarest_first);
busy_pieces.reserve(10);
}
#ifdef TORRENT_VERBOSE_LOGGING
(*c.m_logger) << time_now_string() << " PIECE_PICKER [ picked: " << interesting_pieces.size() << " ]\n";
#endif
for (std::vector<piece_block>::iterator i = interesting_pieces.begin();
i != interesting_pieces.end(); ++i)
{
@ -277,13 +300,13 @@ namespace libtorrent
num_requests--;
}
// in this case, we could not find any blocks
// that was free. If we couldn't find any busy
// blocks as well, we cannot download anything
// more from this peer.
if (busy_pieces.empty() || num_requests == 0)
{
// in this case, we could not find any blocks
// that was free. If we couldn't find any busy
// blocks as well, we cannot download anything
// more from this peer.
c.send_block_requests();
return;
}
@ -970,7 +993,7 @@ namespace libtorrent
INVARIANT_CHECK;
// just ignore the obviously invalid entries
if(remote.address() == address() || remote.port() == 0)
if (remote.address() == address() || remote.port() == 0)
return;
aux::session_impl& ses = m_torrent->session();
@ -1056,7 +1079,10 @@ namespace libtorrent
if (i->failcount > 0 && src != peer_info::dht)
--i->failcount;
if (flags & 0x02) i->seed = true;
// if we're connected to this peer
// we already know if it's a seed or not
// so we don't have to trust this source
if ((flags & 0x02) && !i->connection) i->seed = true;
if (i->connection)
{
@ -1327,7 +1353,9 @@ namespace libtorrent
INVARIANT_CHECK;
c.send_interested();
if (c.has_peer_choked()) return;
if (c.has_peer_choked()
&& c.allowed_fast().empty())
return;
request_a_block(*m_torrent, c);
}