lt sync 1638

This commit is contained in:
Marcos Pinto 2007-10-03 22:29:42 +00:00
parent 27360c6cf8
commit 1a6afef6b8
36 changed files with 943 additions and 521 deletions

View File

@ -115,6 +115,7 @@ namespace libtorrent
std::vector<piece_picker::downloading_piece> unfinished_pieces;
std::vector<piece_picker::block_info> block_info;
std::vector<tcp::endpoint> peers;
std::vector<tcp::endpoint> banned_peers;
entry resume_data;
// this is true if this torrent is being processed (checked)
@ -168,6 +169,10 @@ namespace libtorrent
// thread started to run the main downloader loop
struct session_impl: boost::noncopyable
{
// the size of each allocation that is chained in the send buffer
enum { send_buffer_size = 200 };
#ifndef NDEBUG
friend class ::libtorrent::peer_connection;
#endif
@ -329,6 +334,24 @@ namespace libtorrent
{ return m_dht_proxy; }
#endif
#ifdef TORRENT_STATS
void log_buffer_usage()
{
int send_buffer_capacity = 0;
int used_send_buffer = 0;
for (connection_map::const_iterator i = m_connections.begin()
, end(m_connections.end()); i != end; ++i)
{
send_buffer_capacity += i->second->send_buffer_capacity();
used_send_buffer += i->second->send_buffer_size();
}
assert(send_buffer_capacity >= used_send_buffer);
m_buffer_usage_logger << log_time() << " send_buffer_size: " << send_buffer_capacity << std::endl;
m_buffer_usage_logger << log_time() << " used_send_buffer: " << used_send_buffer << std::endl;
m_buffer_usage_logger << log_time() << " send_buffer_utilization: "
<< (used_send_buffer * 100.f / send_buffer_capacity) << std::endl;
}
#endif
void start_lsd();
void start_natpmp();
void start_upnp();
@ -339,11 +362,25 @@ namespace libtorrent
// handles delayed alerts
alert_manager m_alerts;
std::pair<char*, int> allocate_buffer(int size);
void free_buffer(char* buf, int size);
void free_disk_buffer(char* buf);
// private:
void on_lsd_peer(tcp::endpoint peer, sha1_hash const& ih);
// handles disk io requests asynchronously
// peers have pointers into the disk buffer
// pool, and must be destructed before this
// object.
disk_io_thread m_disk_thread;
// this pool is used to allocate and recycle send
// buffers from.
boost::pool<> m_send_buffers;
// this is where all active sockets are stored.
// the selector can sleep while there's no activity on
// them
@ -358,9 +395,6 @@ namespace libtorrent
// when they are destructed.
file_pool m_files;
// handles disk io requests asynchronously
disk_io_thread m_disk_thread;
// this is a list of half-open tcp connections
// (only outgoing connections)
// this has to be one of the last
@ -507,9 +541,9 @@ namespace libtorrent
pe_settings m_pe_settings;
#endif
boost::shared_ptr<natpmp> m_natpmp;
boost::shared_ptr<upnp> m_upnp;
boost::shared_ptr<lsd> m_lsd;
boost::intrusive_ptr<natpmp> m_natpmp;
boost::intrusive_ptr<upnp> m_upnp;
boost::intrusive_ptr<lsd> m_lsd;
// the timer used to fire the second_tick
deadline_timer m_timer;
@ -526,6 +560,10 @@ namespace libtorrent
// logger used to write bandwidth usage statistics
std::ofstream m_stats_logger;
int m_second_counter;
// used to log send buffer usage statistics
std::ofstream m_buffer_usage_logger;
// the number of send buffers that are allocated
int m_buffer_allocations;
#endif
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
boost::shared_ptr<logger> create_log(std::string const& name

View File

@ -46,7 +46,7 @@ namespace libtorrent
bool is_multicast(address const& addr);
bool is_any(address const& addr);
address_v4 guess_local_address(asio::io_service&);
address guess_local_address(asio::io_service&);
typedef boost::function<void(udp::endpoint const& from
, char* buffer, int size)> receive_handler_t;

View File

@ -208,7 +208,7 @@ namespace libtorrent
void write_cancel(peer_request const& r);
void write_bitfield(std::vector<bool> const& bitfield);
void write_have(int index);
void write_piece(peer_request const& r, char const* buffer);
void write_piece(peer_request const& r, char* buffer);
void write_handshake();
#ifndef TORRENT_DISABLE_EXTENSIONS
void write_extensions();
@ -270,8 +270,17 @@ namespace libtorrent
// these functions encrypt the send buffer if m_rc4_encrypted
// is true, otherwise it passes the call to the
// peer_connection functions of the same names
void send_buffer(char* begin, char* end);
void send_buffer(char* buf, int size);
buffer::interval allocate_send_buffer(int size);
template <class Destructor>
void append_send_buffer(char* buffer, int size, Destructor const& destructor)
{
#ifndef TORRENT_DISABLE_ENCRYPTION
if (m_rc4_encrypted)
m_RC4_handler->encrypt(buffer, size);
#endif
peer_connection::append_send_buffer(buffer, size, destructor);
}
void setup_send();
// Returns offset at which bytestream (src, src + src_size)

View File

@ -0,0 +1,192 @@
/*
Copyright (c) 2007, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TORRENT_CHAINED_BUFFER_HPP_INCLUDED
#define TORRENT_CHAINED_BUFFER_HPP_INCLUDED
#include <boost/function.hpp>
#include <asio/buffer.hpp>
#include <list>
#include <cstring>
namespace libtorrent
{
struct chained_buffer
{
chained_buffer(): m_bytes(0), m_capacity(0) {}
struct buffer_t
{
boost::function<void(char*)> free; // destructs the buffer
char* buf; // the first byte of the buffer
int size; // the total size of the buffer
char* start; // the first byte to send/receive in the buffer
int used_size; // this is the number of bytes to send/receive
};
bool empty() const { return m_bytes == 0; }
int size() const { return m_bytes; }
int capacity() const { return m_capacity; }
void pop_front(int bytes_to_pop)
{
assert(bytes_to_pop <= m_bytes);
while (bytes_to_pop > 0 && !m_vec.empty())
{
buffer_t& b = m_vec.front();
if (b.used_size > bytes_to_pop)
{
b.start += bytes_to_pop;
b.used_size -= bytes_to_pop;
m_bytes -= bytes_to_pop;
assert(m_bytes <= m_capacity);
assert(m_bytes >= 0);
assert(m_capacity >= 0);
break;
}
b.free(b.buf);
m_bytes -= b.used_size;
m_capacity -= b.size;
bytes_to_pop -= b.used_size;
assert(m_bytes >= 0);
assert(m_capacity >= 0);
assert(m_bytes <= m_capacity);
m_vec.pop_front();
}
}
template <class D>
void append_buffer(char* buffer, int size, int used_size, D const& destructor)
{
assert(size >= used_size);
buffer_t b;
b.buf = buffer;
b.size = size;
b.start = buffer;
b.used_size = used_size;
b.free = destructor;
m_vec.push_back(b);
m_bytes += used_size;
m_capacity += size;
assert(m_bytes <= m_capacity);
}
// returns the number of bytes available at the
// end of the last chained buffer.
int space_in_last_buffer()
{
if (m_vec.empty()) return 0;
buffer_t& b = m_vec.back();
return b.size - b.used_size - (b.start - b.buf);
}
// tries to copy the given buffer to the end of the
// last chained buffer. If there's not enough room
// it returns false
bool append(char const* buf, int size)
{
char* insert = allocate_appendix(size);
if (insert == 0) return false;
std::memcpy(insert, buf, size);
return true;
}
// tries to allocate memory from the end
// of the last buffer. If there isn't
// enough room, returns 0
char* allocate_appendix(int size)
{
if (m_vec.empty()) return 0;
buffer_t& b = m_vec.back();
char* insert = b.start + b.used_size;
if (insert + size > b.buf + b.size) return 0;
b.used_size += size;
m_bytes += size;
assert(m_bytes <= m_capacity);
return insert;
}
std::list<asio::const_buffer> const& build_iovec(int to_send)
{
m_tmp_vec.clear();
for (std::list<buffer_t>::iterator i = m_vec.begin()
, end(m_vec.end()); to_send > 0 && i != end; ++i)
{
if (i->used_size > to_send)
{
assert(to_send > 0);
m_tmp_vec.push_back(asio::const_buffer(i->start, to_send));
break;
}
assert(i->used_size > 0);
m_tmp_vec.push_back(asio::const_buffer(i->start, i->used_size));
to_send -= i->used_size;
}
return m_tmp_vec;
}
~chained_buffer()
{
for (std::list<buffer_t>::iterator i = m_vec.begin()
, end(m_vec.end()); i != end; ++i)
{
i->free(i->buf);
}
}
private:
// this is the list of all the buffers we want to
// send
std::list<buffer_t> m_vec;
// this is the number of bytes in the send buf.
// this will always be equal to the sum of the
// size of all buffers in vec
int m_bytes;
// the total size of all buffers in the chain
// including unused space
int m_capacity;
// this is the vector of buffers used when
// invoking the async write call
std::list<asio::const_buffer> m_tmp_vec;
};
}
#endif

View File

@ -60,10 +60,17 @@ namespace libtorrent
{
logger(fs::path const& filename, int instance, bool append = true)
{
fs::path dir(fs::complete("libtorrent_logs" + boost::lexical_cast<std::string>(instance)));
if (!fs::exists(dir)) fs::create_directories(dir);
m_file.open((dir / filename).string().c_str(), std::ios_base::out | (append ? std::ios_base::app : std::ios_base::out));
*this << "\n\n\n*** starting log ***\n";
try
{
fs::path dir(fs::complete("libtorrent_logs" + boost::lexical_cast<std::string>(instance)));
if (!fs::exists(dir)) fs::create_directories(dir);
m_file.open((dir / filename).string().c_str(), std::ios_base::out | (append ? std::ios_base::app : std::ios_base::out));
*this << "\n\n\n*** starting log ***\n";
}
catch (std::exception& e)
{
std::cerr << "failed to create log '" << filename.string() << "': " << e.what() << std::endl;
}
}
template <class T>

View File

@ -94,6 +94,11 @@ namespace libtorrent
disk_io_thread(int block_size = 16 * 1024);
~disk_io_thread();
#ifdef TORRENT_STATS
int disk_allocations() const
{ return m_allocations; }
#endif
// aborts read operations
void stop(boost::intrusive_ptr<piece_manager> s);
void add_job(disk_io_job const& j
@ -110,6 +115,7 @@ namespace libtorrent
void operator()();
char* allocate_buffer();
void free_buffer(char* buf);
private:
@ -129,6 +135,9 @@ namespace libtorrent
#ifdef TORRENT_DISK_STATS
std::ofstream m_log;
#endif
#ifdef TORRENT_STATS
int m_allocations;
#endif
// thread for performing blocking disk io operations
boost::thread m_disk_io_thread;

View File

@ -38,6 +38,7 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent
{
std::vector<address> enum_net_interfaces(asio::io_service& ios, asio::error_code& ec);
address router_for_interface(address const interface, asio::error_code& ec);
}
#endif

View File

@ -69,13 +69,20 @@ namespace libtorrent
{
public:
http_parser();
template <class T>
T header(char const* key) const;
std::string const& header(char const* key) const
{
static std::string empty;
std::map<std::string, std::string>::const_iterator i
= m_header.find(key);
if (i == m_header.end()) return empty;
return i->second;
}
std::string const& protocol() const { return m_protocol; }
int status_code() const { return m_status_code; }
std::string const& method() const { return m_method; }
std::string const& path() const { return m_path; }
std::string message() const { return m_server_message; }
std::string const& message() const { return m_server_message; }
buffer::const_interval get_body() const;
bool header_finished() const { return m_state == read_body; }
bool finished() const { return m_finished; }
@ -103,15 +110,6 @@ namespace libtorrent
bool m_finished;
};
template <class T>
T http_parser::header(char const* key) const
{
std::map<std::string, std::string>::const_iterator i
= m_header.find(key);
if (i == m_header.end()) return T();
return boost::lexical_cast<T>(i->second);
}
class TORRENT_EXPORT http_tracker_connection
: public tracker_connection
{

View File

@ -60,6 +60,12 @@ namespace libtorrent
delete static_cast<T const*>(s);
}
boost::intrusive_ptr<T> self()
{ return boost::intrusive_ptr<T>((T*)this); }
boost::intrusive_ptr<const T> self() const
{ return boost::intrusive_ptr<const T>((T const*)this); }
int refcount() const { return m_refs; }
intrusive_ptr_base(): m_refs(0) {}

View File

@ -36,6 +36,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/socket.hpp"
#include "libtorrent/peer_id.hpp"
#include "libtorrent/broadcast_socket.hpp"
#include "libtorrent/intrusive_ptr_base.hpp"
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
@ -52,7 +53,7 @@ namespace libtorrent
typedef boost::function<void(tcp::endpoint, sha1_hash)> peer_callback_t;
class lsd : boost::noncopyable
class lsd : public intrusive_ptr_base<lsd>
{
public:
lsd(io_service& ios, address const& listen_interface

View File

@ -34,6 +34,7 @@ POSSIBILITY OF SUCH DAMAGE.
#define TORRENT_NATPMP_HPP
#include "libtorrent/socket.hpp"
#include "libtorrent/intrusive_ptr_base.hpp"
#include <boost/function.hpp>
@ -49,7 +50,7 @@ namespace libtorrent
// std::string: error message
typedef boost::function<void(int, int, std::string const&)> portmap_callback_t;
class natpmp
class natpmp : public intrusive_ptr_base<natpmp>
{
public:
natpmp(io_service& ios, address const& listen_interface, portmap_callback_t const& cb);

View File

@ -51,6 +51,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <boost/array.hpp>
#include <boost/optional.hpp>
#include <boost/cstdint.hpp>
#include <boost/pool/pool.hpp>
#ifdef _MSC_VER
#pragma warning(pop)
@ -73,6 +74,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/socket_type.hpp"
#include "libtorrent/intrusive_ptr_base.hpp"
#include "libtorrent/assert.hpp"
#include "libtorrent/chained_buffer.hpp"
namespace libtorrent
{
@ -356,14 +358,23 @@ namespace libtorrent
virtual boost::optional<piece_block_progress>
downloading_piece_progress() const
{
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << "downloading_piece_progress() dispatched to the base class!\n";
#endif
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << "downloading_piece_progress() dispatched to the base class!\n";
#endif
return boost::optional<piece_block_progress>();
}
void send_buffer(char const* begin, char const* end);
void send_buffer(char const* begin, int size);
buffer::interval allocate_send_buffer(int size);
template <class Destructor>
void append_send_buffer(char* buffer, int size, Destructor const& destructor)
{
m_send_buffer.append_buffer(buffer, size, size, destructor);
#ifdef TORRENT_STATS
m_ses.m_buffer_usage_logger << log_time() << " append_send_buffer: " << size << std::endl;
m_ses.log_buffer_usage();
#endif
}
void setup_send();
#ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
@ -376,6 +387,12 @@ namespace libtorrent
bool has_country() const { return m_country[0] != 0; }
#endif
int send_buffer_size() const
{ return m_send_buffer.size(); }
int send_buffer_capacity() const
{ return m_send_buffer.capacity(); }
protected:
virtual void get_specific_peer_info(peer_info& p) const = 0;
@ -388,7 +405,7 @@ namespace libtorrent
virtual void write_cancel(peer_request const& r) = 0;
virtual void write_have(int index) = 0;
virtual void write_keepalive() = 0;
virtual void write_piece(peer_request const& r, char const* buffer) = 0;
virtual void write_piece(peer_request const& r, char* buffer) = 0;
virtual void write_reject_request(peer_request const& r) = 0;
virtual void write_allow_fast(int piece) = 0;
@ -401,13 +418,6 @@ namespace libtorrent
virtual void on_sent(asio::error_code const& error
, std::size_t bytes_transferred) = 0;
int send_buffer_size() const
{
return (int)m_send_buffer[0].size()
+ (int)m_send_buffer[1].size()
- m_write_pos;
}
#ifndef TORRENT_DISABLE_ENCRYPTION
buffer::interval wr_recv_buffer()
{
@ -512,31 +522,13 @@ namespace libtorrent
int m_recv_pos;
buffer m_recv_buffer;
// this is the buffer where data that is
// to be sent is stored until it gets
// consumed by send(). Since asio requires
// the memory buffer that is given to async.
// operations to remain valid until the operation
// finishes, there has to be two buffers. While
// waiting for a async_write operation on one
// buffer, the other is used to write data to
// be queued up.
buffer m_send_buffer[2];
// the current send buffer is the one to write to.
// (m_current_send_buffer + 1) % 2 is the
// buffer we're currently waiting for.
int m_current_send_buffer;
chained_buffer m_send_buffer;
// the number of bytes we are currently reading
// from disk, that will be added to the send
// buffer as soon as they complete
int m_reading_bytes;
// if the sending buffer doesn't finish in one send
// operation, this is the position within that buffer
// where the next operation should continue
int m_write_pos;
// timeouts
ptime m_last_receive;
ptime m_last_sent;

View File

@ -80,9 +80,10 @@ namespace libtorrent
// for peer choking management
void pulse();
struct peer;
// this is called once for every peer we get from
// the tracker, pex, lsd or dht.
void peer_from_tracker(const tcp::endpoint& remote, const peer_id& pid
policy::peer* peer_from_tracker(const tcp::endpoint& remote, const peer_id& pid
, int source, char flags);
// called when an incoming connection is accepted
@ -212,8 +213,8 @@ namespace libtorrent
int num_peers() const { return m_peers.size(); }
typedef std::list<peer>::iterator iterator;
typedef std::list<peer>::const_iterator const_iterator;
typedef std::multimap<address, peer>::iterator iterator;
typedef std::multimap<address, peer>::const_iterator const_iterator;
iterator begin_peer() { return m_peers.begin(); }
iterator end_peer() { return m_peers.end(); }
@ -237,7 +238,7 @@ namespace libtorrent
iterator find_disconnect_candidate();
iterator find_connect_candidate();
std::list<peer> m_peers;
std::multimap<address, peer> m_peers;
torrent* m_torrent;

View File

@ -184,6 +184,9 @@ namespace libtorrent
std::pair<bool, float> check_files(std::vector<bool>& pieces
, int& num_pieces, boost::recursive_mutex& mutex);
// frees a buffer that was returned from a read operation
void free_buffer(char* buf);
void write_resume_data(entry& rd) const;
bool verify_resume_data(entry& rd, std::string& error);

View File

@ -49,6 +49,8 @@ namespace libtorrent
std::strftime(str, 200, "%b %d %X", timeinfo);
return str;
}
std::string log_time();
}
#if (!defined (__MACH__) && !defined (_WIN32) && (!defined(_POSIX_MONOTONIC_CLOCK) \
@ -389,5 +391,6 @@ namespace libtorrent
#endif
#endif
#endif

View File

@ -551,6 +551,7 @@ namespace libtorrent
private:
void on_files_released(int ret, disk_io_job const& j);
void on_torrent_paused(int ret, disk_io_job const& j);
void on_storage_moved(int ret, disk_io_job const& j);
void on_piece_verified(int ret, disk_io_job const& j

View File

@ -37,6 +37,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/broadcast_socket.hpp"
#include "libtorrent/http_connection.hpp"
#include "libtorrent/connection_queue.hpp"
#include "libtorrent/intrusive_ptr_base.hpp"
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
@ -62,7 +63,7 @@ namespace libtorrent
// std::string: error message
typedef boost::function<void(int, int, std::string const&)> portmap_callback_t;
class upnp : boost::noncopyable
class upnp : public intrusive_ptr_base<upnp>
{
public:
upnp(io_service& ios, connection_queue& cc

View File

@ -122,7 +122,7 @@ namespace libtorrent
void write_request(peer_request const& r);
void write_cancel(peer_request const& r) {}
void write_have(int index) {}
void write_piece(peer_request const& r, char const* buffer) { assert(false); }
void write_piece(peer_request const& r, char* buffer) { assert(false); }
void write_keepalive() {}
void on_connected();
void write_reject_request(peer_request const&) {}

View File

@ -51,23 +51,6 @@ namespace libtorrent
|| (ip & 0xffff0000) == 0xc0a80000);
}
address_v4 guess_local_address(asio::io_service& ios)
{
// make a best guess of the interface we're using and its IP
udp::resolver r(ios);
udp::resolver::iterator i = r.resolve(udp::resolver::query(asio::ip::host_name(), "0"));
for (;i != udp::resolver_iterator(); ++i)
{
address const& a = i->endpoint().address();
// ignore non-IPv4 addresses
if (!a.is_v4()) break;
// ignore the loopback
if (a.to_v4() == address_v4::loopback()) continue;
}
if (i == udp::resolver_iterator()) return address_v4::any();
return i->endpoint().address().to_v4();
}
bool is_loopback(address const& addr)
{
if (addr.is_v4())
@ -92,6 +75,30 @@ namespace libtorrent
return addr.to_v6() == address_v6::any();
}
address guess_local_address(asio::io_service& ios)
{
// make a best guess of the interface we're using and its IP
asio::error_code ec;
std::vector<address> const& interfaces = enum_net_interfaces(ios, ec);
address ret = address_v4::any();
for (std::vector<address>::const_iterator i = interfaces.begin()
, end(interfaces.end()); i != end; ++i)
{
address const& a = *i;
if (is_loopback(a)
|| is_multicast(a)
|| is_any(a)) continue;
// prefer a v4 address, but return a v6 if
// there are no v4
if (a.is_v4()) return a;
if (ret != address_v4::any())
ret = a;
}
return ret;
}
broadcast_socket::broadcast_socket(asio::io_service& ios
, udp::endpoint const& multicast_endpoint
, receive_handler_t const& handler

View File

@ -251,12 +251,10 @@ namespace libtorrent
(*m_logger) << time_now_string()
<< " ==> DHT_PORT [ " << listen_port << " ]\n";
#endif
buffer::interval packet = allocate_send_buffer(7);
detail::write_uint32(3, packet.begin);
detail::write_uint8(msg_dht_port, packet.begin);
detail::write_uint16(listen_port, packet.begin);
assert(packet.begin == packet.end);
setup_send();
char msg[] = {0,0,0,3, msg_dht_port, 0, 0};
char* ptr = msg + 5;
detail::write_uint16(listen_port, ptr);
send_buffer(msg, sizeof(msg));
}
void bt_peer_connection::write_have_all()
@ -270,8 +268,8 @@ namespace libtorrent
(*m_logger) << time_now_string()
<< " ==> HAVE_ALL\n";
#endif
char buf[] = {0,0,0,1, msg_have_all};
send_buffer(buf, buf + sizeof(buf));
char msg[] = {0,0,0,1, msg_have_all};
send_buffer(msg, sizeof(msg));
}
void bt_peer_connection::write_have_none()
@ -285,8 +283,8 @@ namespace libtorrent
(*m_logger) << time_now_string()
<< " ==> HAVE_NONE\n";
#endif
char buf[] = {0,0,0,1, msg_have_none};
send_buffer(buf, buf + sizeof(buf));
char msg[] = {0,0,0,1, msg_have_none};
send_buffer(msg, sizeof(msg));
}
void bt_peer_connection::write_reject_request(peer_request const& r)
@ -296,22 +294,12 @@ namespace libtorrent
assert(m_sent_handshake && m_sent_bitfield);
assert(associated_torrent().lock()->valid_metadata());
char buf[] = {0,0,0,13, msg_reject_request};
buffer::interval i = allocate_send_buffer(17);
std::copy(buf, buf + 5, i.begin);
i.begin += 5;
// index
detail::write_int32(r.piece, i.begin);
// begin
detail::write_int32(r.start, i.begin);
// length
detail::write_int32(r.length, i.begin);
assert(i.begin == i.end);
setup_send();
char msg[] = {0,0,0,13, msg_reject_request,0,0,0,0, 0,0,0,0, 0,0,0,0};
char* ptr = msg + 5;
detail::write_int32(r.piece, ptr); // index
detail::write_int32(r.start, ptr); // begin
detail::write_int32(r.length, ptr); // length
send_buffer(msg, sizeof(msg));
}
void bt_peer_connection::write_allow_fast(int piece)
@ -321,11 +309,10 @@ namespace libtorrent
assert(m_sent_handshake && m_sent_bitfield);
assert(associated_torrent().lock()->valid_metadata());
char buf[] = {0,0,0,5, msg_allowed_fast, 0, 0, 0, 0};
char* ptr = buf + 5;
char msg[] = {0,0,0,5, msg_allowed_fast, 0, 0, 0, 0};
char* ptr = msg + 5;
detail::write_int32(piece, ptr);
send_buffer(buf, buf + sizeof(buf));
send_buffer(msg, sizeof(msg));
}
void bt_peer_connection::get_specific_peer_info(peer_info& p) const
@ -556,8 +543,8 @@ namespace libtorrent
assert(secret);
hasher h;
const char keyA[] = "keyA";
const char keyB[] = "keyB";
static const char keyA[] = "keyA";
static const char keyB[] = "keyB";
// encryption rc4 longkeys
// outgoing connection : hash ('keyA',S,SKEY)
@ -587,17 +574,16 @@ namespace libtorrent
#endif
}
void bt_peer_connection::send_buffer(char* begin, char* end)
void bt_peer_connection::send_buffer(char* buf, int size)
{
assert (begin);
assert (end);
assert (end > begin);
assert (!m_rc4_encrypted || m_encrypted);
assert(buf);
assert(size > 0);
assert(!m_rc4_encrypted || m_encrypted);
if (m_rc4_encrypted)
m_RC4_handler->encrypt(begin, end - begin);
m_RC4_handler->encrypt(buf, size);
peer_connection::send_buffer(begin, end);
peer_connection::send_buffer(buf, size);
}
buffer::interval bt_peer_connection::allocate_send_buffer(int size)
@ -606,6 +592,7 @@ namespace libtorrent
if (m_rc4_encrypted)
{
assert(m_enc_send_buffer.left() == 0);
m_enc_send_buffer = peer_connection::allocate_send_buffer(size);
return m_enc_send_buffer;
}
@ -620,24 +607,24 @@ namespace libtorrent
{
assert(!m_rc4_encrypted || m_encrypted);
if (m_rc4_encrypted)
if (m_rc4_encrypted && m_enc_send_buffer.left())
{
assert (m_enc_send_buffer.begin);
assert (m_enc_send_buffer.end);
assert (m_enc_send_buffer.left() > 0);
assert(m_enc_send_buffer.begin);
assert(m_enc_send_buffer.end);
m_RC4_handler->encrypt (m_enc_send_buffer.begin, m_enc_send_buffer.left());
m_RC4_handler->encrypt(m_enc_send_buffer.begin, m_enc_send_buffer.left());
m_enc_send_buffer.end = m_enc_send_buffer.begin;
}
peer_connection::setup_send();
}
int bt_peer_connection::get_syncoffset(char const* src, int src_size,
char const* target, int target_size) const
char const* target, int target_size) const
{
assert (target_size >= src_size);
assert (src_size > 0);
assert (src);
assert (target);
assert(target_size >= src_size);
assert(src_size > 0);
assert(src);
assert(target);
int traverse_limit = target_size - src_size;
@ -1288,8 +1275,8 @@ namespace libtorrent
assert(m_sent_handshake && m_sent_bitfield);
char buf[] = {0,0,0,0};
send_buffer(buf, buf + sizeof(buf));
char msg[] = {0,0,0,0};
send_buffer(msg, sizeof(msg));
}
void bt_peer_connection::write_cancel(peer_request const& r)
@ -1299,22 +1286,12 @@ namespace libtorrent
assert(m_sent_handshake && m_sent_bitfield);
assert(associated_torrent().lock()->valid_metadata());
char buf[] = {0,0,0,13, msg_cancel};
buffer::interval i = allocate_send_buffer(17);
std::copy(buf, buf + 5, i.begin);
i.begin += 5;
// index
detail::write_int32(r.piece, i.begin);
// begin
detail::write_int32(r.start, i.begin);
// length
detail::write_int32(r.length, i.begin);
assert(i.begin == i.end);
setup_send();
char msg[17] = {0,0,0,13, msg_cancel};
char* ptr = msg + 5;
detail::write_int32(r.piece, ptr); // index
detail::write_int32(r.start, ptr); // begin
detail::write_int32(r.length, ptr); // length
send_buffer(msg, sizeof(msg));
}
void bt_peer_connection::write_request(peer_request const& r)
@ -1324,22 +1301,13 @@ namespace libtorrent
assert(m_sent_handshake && m_sent_bitfield);
assert(associated_torrent().lock()->valid_metadata());
char buf[] = {0,0,0,13, msg_request};
char msg[17] = {0,0,0,13, msg_request};
char* ptr = msg + 5;
buffer::interval i = allocate_send_buffer(17);
std::copy(buf, buf + 5, i.begin);
i.begin += 5;
// index
detail::write_int32(r.piece, i.begin);
// begin
detail::write_int32(r.start, i.begin);
// length
detail::write_int32(r.length, i.begin);
assert(i.begin == i.end);
setup_send();
detail::write_int32(r.piece, ptr); // index
detail::write_int32(r.start, ptr); // begin
detail::write_int32(r.length, ptr); // length
send_buffer(msg, sizeof(msg));
}
void bt_peer_connection::write_bitfield(std::vector<bool> const& bitfield)
@ -1526,7 +1494,7 @@ namespace libtorrent
if (is_choked()) return;
char msg[] = {0,0,0,1,msg_choke};
send_buffer(msg, msg + sizeof(msg));
send_buffer(msg, sizeof(msg));
}
void bt_peer_connection::write_unchoke()
@ -1536,7 +1504,7 @@ namespace libtorrent
assert(m_sent_handshake && m_sent_bitfield);
char msg[] = {0,0,0,1,msg_unchoke};
send_buffer(msg, msg + sizeof(msg));
send_buffer(msg, sizeof(msg));
}
void bt_peer_connection::write_interested()
@ -1546,7 +1514,7 @@ namespace libtorrent
assert(m_sent_handshake && m_sent_bitfield);
char msg[] = {0,0,0,1,msg_interested};
send_buffer(msg, msg + sizeof(msg));
send_buffer(msg, sizeof(msg));
}
void bt_peer_connection::write_not_interested()
@ -1556,7 +1524,7 @@ namespace libtorrent
assert(m_sent_handshake && m_sent_bitfield);
char msg[] = {0,0,0,1,msg_not_interested};
send_buffer(msg, msg + sizeof(msg));
send_buffer(msg, sizeof(msg));
}
void bt_peer_connection::write_have(int index)
@ -1567,34 +1535,39 @@ namespace libtorrent
assert(index < associated_torrent().lock()->torrent_file().num_pieces());
assert(m_sent_handshake && m_sent_bitfield);
const int packet_size = 9;
char msg[packet_size] = {0,0,0,5,msg_have};
char msg[] = {0,0,0,5,msg_have,0,0,0,0};
char* ptr = msg + 5;
detail::write_int32(index, ptr);
send_buffer(msg, msg + packet_size);
send_buffer(msg, sizeof(msg));
}
void bt_peer_connection::write_piece(peer_request const& r, char const* buffer)
void bt_peer_connection::write_piece(peer_request const& r, char* buffer)
{
INVARIANT_CHECK;
assert(m_sent_handshake && m_sent_bitfield);
const int packet_size = 4 + 5 + 4 + r.length;
boost::shared_ptr<torrent> t = associated_torrent().lock();
assert(t);
buffer::interval i = allocate_send_buffer(packet_size);
detail::write_int32(packet_size-4, i.begin);
detail::write_uint8(msg_piece, i.begin);
detail::write_int32(r.piece, i.begin);
detail::write_int32(r.start, i.begin);
char msg[4 + 1 + 4 + 4];
char* ptr = msg;
assert(r.length <= 16 * 1024);
detail::write_int32(r.length + 1 + 4 + 4, ptr);
detail::write_uint8(msg_piece, ptr);
detail::write_int32(r.piece, ptr);
detail::write_int32(r.start, ptr);
send_buffer(msg, sizeof(msg));
append_send_buffer(buffer, r.length
, boost::bind(&session_impl::free_disk_buffer
, boost::ref(m_ses), _1));
/*
buffer::interval i = allocate_send_buffer(r.length);
std::memcpy(i.begin, buffer, r.length);
assert(i.begin + r.length == i.end);
t->filesystem().free_buffer(buffer);
*/
m_payloads.push_back(range(send_buffer_size() - r.length, r.length));
setup_send();
}
@ -1607,13 +1580,13 @@ namespace libtorrent
: m_id(id), m_pc(pc)
{ assert(pc); }
bool operator()(policy::peer const& p) const
bool operator()(std::pair<const address, policy::peer> const& p) const
{
return p.connection != m_pc
&& p.connection
&& p.connection->pid() == m_id
&& !p.connection->pid().is_all_zeros()
&& p.ip.address() == m_pc->remote().address();
return p.second.connection != m_pc
&& p.second.connection
&& p.second.connection->pid() == m_id
&& !p.second.connection->pid().is_all_zeros()
&& p.second.ip.address() == m_pc->remote().address();
}
peer_id const& m_id;
@ -2308,7 +2281,7 @@ namespace libtorrent
, match_peer_id(pid, this));
if (i != p.end_peer())
{
assert(i->connection->pid() == pid);
assert(i->second.connection->pid() == pid);
// we found another connection with the same peer-id
// which connection should be closed in order to be
// sure that the other end closes the same connection?
@ -2318,7 +2291,7 @@ namespace libtorrent
// if not, we should close the outgoing one.
if (pid < m_ses.get_peer_id() && is_local())
{
i->connection->disconnect();
i->second.connection->disconnect();
}
else
{

View File

@ -37,18 +37,6 @@ POSSIBILITY OF SUCH DAMAGE.
#ifdef TORRENT_DISK_STATS
#include "libtorrent/time.hpp"
#include <boost/lexical_cast.hpp>
namespace
{
std::string log_time()
{
using namespace libtorrent;
static ptime start = time_now();
return boost::lexical_cast<std::string>(
total_milliseconds(time_now() - start));
}
}
#endif
@ -64,7 +52,9 @@ namespace libtorrent
#endif
, m_disk_io_thread(boost::ref(*this))
{
#ifdef TORRENT_STATS
m_allocations = 0;
#endif
#ifdef TORRENT_DISK_STATS
m_log.open("disk_io_thread.log", std::ios::trunc);
#endif
@ -188,9 +178,21 @@ namespace libtorrent
char* disk_io_thread::allocate_buffer()
{
boost::mutex::scoped_lock l(m_mutex);
#ifdef TORRENT_STATS
++m_allocations;
#endif
return (char*)m_pool.ordered_malloc();
}
void disk_io_thread::free_buffer(char* buf)
{
boost::mutex::scoped_lock l(m_mutex);
#ifdef TORRENT_STATS
--m_allocations;
#endif
m_pool.ordered_free(buf);
}
void disk_io_thread::operator()()
{
for (;;)
@ -225,10 +227,14 @@ namespace libtorrent
#ifdef TORRENT_DISK_STATS
m_log << log_time() << " read " << j.buffer_size << std::endl;
#endif
free_buffer = false;
if (j.buffer == 0)
{
l.lock();
j.buffer = (char*)m_pool.ordered_malloc();
#ifdef TORRENT_STATS
++m_allocations;
#endif
l.unlock();
assert(j.buffer_size <= m_block_size);
if (j.buffer == 0)
@ -238,10 +244,6 @@ namespace libtorrent
break;
}
}
else
{
free_buffer = false;
}
ret = j.storage->read_impl(j.buffer, j.piece, j.offset
, j.buffer_size);
@ -301,6 +303,9 @@ namespace libtorrent
{
l.lock();
m_pool.ordered_free(j.buffer);
#ifdef TORRENT_STATS
--m_allocations;
#endif
}
}
}

View File

@ -34,6 +34,12 @@ POSSIBILITY OF SUCH DAMAGE.
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <net/if.h>
#elif defined WIN32
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <windows.h>
#include <iphlpapi.h>
#endif
#include "libtorrent/enum_net.hpp"
@ -136,6 +142,77 @@ namespace libtorrent
return ret;
}
address router_for_interface(address const interface, asio::error_code& ec)
{
#ifdef WIN32
// Load Iphlpapi library
HMODULE iphlp = LoadLibraryA("Iphlpapi.dll");
if (!iphlp)
{
ec = asio::error::fault;
return address_v4::any();
}
// Get GetAdaptersInfo() pointer
typedef DWORD (WINAPI *GetAdaptersInfo_t)(PIP_ADAPTER_INFO, PULONG);
GetAdaptersInfo_t GetAdaptersInfo = (GetAdaptersInfo_t)GetProcAddress(iphlp, "GetAdaptersInfo");
if (!GetAdaptersInfo)
{
FreeLibrary(iphlp);
ec = asio::error::fault;
return address_v4::any();
}
PIP_ADAPTER_INFO adapter_info = 0;
ULONG out_buf_size = 0;
if (GetAdaptersInfo(adapter_info, &out_buf_size) != ERROR_BUFFER_OVERFLOW)
{
FreeLibrary(iphlp);
ec = asio::error::fault;
return address_v4::any();
}
adapter_info = (IP_ADAPTER_INFO*)malloc(out_buf_size);
if (!adapter_info)
{
FreeLibrary(iphlp);
ec = asio::error::fault;
return address_v4::any();
}
address ret;
if (GetAdaptersInfo(adapter_info, &out_buf_size) == NO_ERROR)
{
PIP_ADAPTER_INFO adapter = adapter_info;
while (adapter != 0)
{
if (interface == address::from_string(adapter->IpAddressList.IpAddress.String, ec))
{
ret = address::from_string(adapter->GatewayList.IpAddress.String, ec);
break;
}
adapter = adapter->Next;
}
}
// Free memory
free(adapter_info);
FreeLibrary(iphlp);
return ret;
#else
// TODO: temporary implementation
if (!interface.is_v4())
{
ec = asio::error::fault;
return address_v4::any();
}
return address_v4((interface.to_v4().to_ulong() & 0xffffff00) | 1);
#endif
}
}

View File

@ -235,7 +235,7 @@ void http_connection::on_read(asio::error_code const& e
m_called = true;
char const* data = 0;
std::size_t size = 0;
if (m_bottled)
if (m_bottled && m_parser.header_finished())
{
data = m_parser.get_body().begin;
size = m_parser.get_body().left();
@ -263,7 +263,7 @@ void http_connection::on_read(asio::error_code const& e
if (code >= 300 && code < 400)
{
// attempt a redirect
std::string url = m_parser.header<std::string>("location");
std::string const& url = m_parser.header("location");
if (url.empty())
{
// missing location header

View File

@ -679,7 +679,7 @@ namespace libtorrent
if (m_parser.header_finished())
{
int cl = m_parser.header<int>("content-length");
int cl = atoi(m_parser.header("content-length").c_str());
if (cl > m_settings.tracker_maximum_response_length)
{
fail(-1, "content-length is greater than maximum response length");
@ -718,7 +718,7 @@ namespace libtorrent
return;
}
std::string location = m_parser.header<std::string>("location");
std::string location = m_parser.header("location");
boost::shared_ptr<request_callback> cb = requester();
@ -763,7 +763,7 @@ namespace libtorrent
buffer::const_interval buf(&m_buffer[0] + m_parser.body_start(), &m_buffer[0] + m_recv_pos);
std::string content_encoding = m_parser.header<std::string>("content-encoding");
std::string content_encoding = m_parser.header("content-encoding");
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
if (cb) cb->debug_log("content-encoding: \"" + content_encoding + "\"");

View File

@ -83,7 +83,7 @@ void intrusive_ptr_release(observer const* o)
{
boost::pool<>& p = o->pool_allocator;
o->~observer();
p.ordered_free(const_cast<observer*>(o));
p.free(const_cast<observer*>(o));
}
}

View File

@ -49,8 +49,8 @@ using namespace libtorrent;
namespace libtorrent
{
// defined in upnp.cpp
address_v4 guess_local_address(asio::io_service&);
// defined in broadcast_socket.cpp
address guess_local_address(asio::io_service&);
}
lsd::lsd(io_service& ios, address const& listen_interface
@ -58,7 +58,7 @@ lsd::lsd(io_service& ios, address const& listen_interface
: m_callback(cb)
, m_retry_count(0)
, m_socket(ios, udp::endpoint(address_v4::from_string("239.192.152.143"), 6771)
, bind(&lsd::on_announce, this, _1, _2, _3))
, bind(&lsd::on_announce, self(), _1, _2, _3))
, m_broadcast_timer(ios)
, m_disabled(false)
{
@ -96,7 +96,7 @@ void lsd::announce(sha1_hash const& ih, int listen_port)
#endif
m_broadcast_timer.expires_from_now(milliseconds(250 * m_retry_count));
m_broadcast_timer.async_wait(bind(&lsd::resend_announce, this, _1, msg));
m_broadcast_timer.async_wait(bind(&lsd::resend_announce, self(), _1, msg));
}
void lsd::resend_announce(asio::error_code const& e, std::string msg) try
@ -111,7 +111,7 @@ void lsd::resend_announce(asio::error_code const& e, std::string msg) try
return;
m_broadcast_timer.expires_from_now(milliseconds(250 * m_retry_count));
m_broadcast_timer.async_wait(bind(&lsd::resend_announce, this, _1, msg));
m_broadcast_timer.async_wait(bind(&lsd::resend_announce, self(), _1, msg));
}
catch (std::exception&)
{}
@ -121,48 +121,53 @@ void lsd::on_announce(udp::endpoint const& from, char* buffer
{
using namespace libtorrent::detail;
char* p = buffer;
char* end = buffer + bytes_transferred;
char* line = std::find(p, end, '\n');
for (char* i = p; i < line; ++i) *i = std::tolower(*i);
#if defined(TORRENT_LOGGING) || defined(TORRENT_VERBOSE_LOGGING)
m_log << time_now_string()
<< " <== announce: " << std::string(p, line) << std::endl;
#endif
if (line == end || (line - p >= 9 && std::memcmp("bt-search", p, 9)))
http_parser p;
p.incoming(buffer::const_interval(buffer, buffer + bytes_transferred));
if (!p.header_finished())
{
#if defined(TORRENT_LOGGING) || defined(TORRENT_VERBOSE_LOGGING)
m_log << time_now_string()
<< " *** assumed 'bt-search', ignoring" << std::endl;
m_log << time_now_string()
<< " <== announce: incomplete HTTP message\n";
#endif
return;
}
p = line + 1;
int port = 0;
sha1_hash ih(0);
while (p != end)
if (p.method() != "bt-search")
{
line = std::find(p, end, '\n');
if (line == end) break;
*line = 0;
for (char* i = p; i < line; ++i) *i = std::tolower(*i);
if (line - p >= 5 && memcmp(p, "port:", 5) == 0)
{
p += 5;
while (*p == ' ') ++p;
port = atoi(p);
}
else if (line - p >= 9 && memcmp(p, "infohash:", 9) == 0)
{
p += 9;
while (*p == ' ') ++p;
if (line - p > 40) p[40] = 0;
try { ih = boost::lexical_cast<sha1_hash>(p); }
catch (std::exception&) {}
}
p = line + 1;
#if defined(TORRENT_LOGGING) || defined(TORRENT_VERBOSE_LOGGING)
m_log << time_now_string()
<< " <== announce: invalid HTTP method: " << p.method() << std::endl;
#endif
return;
}
std::string const& port_str = p.header("port");
if (port_str.empty())
{
#if defined(TORRENT_LOGGING) || defined(TORRENT_VERBOSE_LOGGING)
m_log << time_now_string()
<< " <== announce: invalid BT-SEARCH, missing port" << std::endl;
#endif
return;
}
std::string const& ih_str = p.header("infohash");
if (ih_str.empty())
{
#if defined(TORRENT_LOGGING) || defined(TORRENT_VERBOSE_LOGGING)
m_log << time_now_string()
<< " <== announce: invalid BT-SEARCH, missing infohash" << std::endl;
#endif
return;
}
sha1_hash ih(0);
std::istringstream ih_sstr(ih_str);
ih_sstr >> ih;
int port = atoi(port_str.c_str());
if (!ih.is_all_zeros() && port != 0)
{
#if defined(TORRENT_LOGGING) || defined(TORRENT_VERBOSE_LOGGING)

View File

@ -38,6 +38,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/natpmp.hpp"
#include "libtorrent/io.hpp"
#include "libtorrent/assert.hpp"
#include "libtorrent/enum_net.hpp"
using boost::bind;
using namespace libtorrent;
@ -48,7 +49,7 @@ namespace libtorrent
{
// defined in upnp.cpp
bool is_local(address const& a);
address_v4 guess_local_address(asio::io_service&);
address guess_local_address(asio::io_service&);
}
natpmp::natpmp(io_service& ios, address const& listen_interface, portmap_callback_t const& cb)
@ -71,10 +72,10 @@ natpmp::natpmp(io_service& ios, address const& listen_interface, portmap_callbac
void natpmp::rebind(address const& listen_interface) try
{
address_v4 local = address_v4::any();
if (listen_interface.is_v4() && listen_interface != address_v4::any())
address local = address_v4::any();
if (listen_interface != address_v4::any())
{
local = listen_interface.to_v4();
local = listen_interface;
}
else
{
@ -101,14 +102,12 @@ void natpmp::rebind(address const& listen_interface) try
m_disabled = false;
// assume the router is located on the local
// network as x.x.x.1
udp::endpoint nat_endpoint(
address_v4((local.to_ulong() & 0xffffff00) | 1), 5351);
asio::error_code ec;
udp::endpoint nat_endpoint(router_for_interface(local, ec), 5351);
if (ec)
throw std::runtime_error("cannot retrieve router address");
if (nat_endpoint == m_nat_endpoint) return;
// TODO: find a better way to figure out the router IP
m_nat_endpoint = nat_endpoint;
#if defined(TORRENT_LOGGING) || defined(TORRENT_VERBOSE_LOGGING)
@ -161,7 +160,7 @@ void natpmp::update_mapping(int i, int port)
m_retry_count = 0;
send_map_request(i);
m_socket.async_receive_from(asio::buffer(&m_response_buffer, 16)
, m_remote, bind(&natpmp::on_reply, this, _1, _2));
, m_remote, bind(&natpmp::on_reply, self(), _1, _2));
}
}
@ -194,7 +193,7 @@ void natpmp::send_map_request(int i) try
// linear back-off instead of exponential
++m_retry_count;
m_send_timer.expires_from_now(milliseconds(250 * m_retry_count));
m_send_timer.async_wait(bind(&natpmp::resend_request, this, i, _1));
m_send_timer.async_wait(bind(&natpmp::resend_request, self(), i, _1));
}
catch (std::exception& e)
{
@ -227,7 +226,7 @@ void natpmp::on_reply(asio::error_code const& e
if (m_remote != m_nat_endpoint)
{
m_socket.async_receive_from(asio::buffer(&m_response_buffer, 16)
, m_remote, bind(&natpmp::on_reply, this, _1, _2));
, m_remote, bind(&natpmp::on_reply, self(), _1, _2));
return;
}
@ -346,7 +345,7 @@ void natpmp::update_expiration_timer()
if (min_index >= 0)
{
m_refresh_timer.expires_from_now(min_expire - now);
m_refresh_timer.async_wait(bind(&natpmp::mapping_expired, this, _1, min_index));
m_refresh_timer.async_wait(bind(&natpmp::mapping_expired, self(), _1, min_index));
}
}
@ -369,7 +368,7 @@ void natpmp::refresh_mapping(int i)
m_retry_count = 0;
send_map_request(i);
m_socket.async_receive_from(asio::buffer(&m_response_buffer, 16)
, m_remote, bind(&natpmp::on_reply, this, _1, _2));
, m_remote, bind(&natpmp::on_reply, self(), _1, _2));
}
}

View File

@ -81,9 +81,7 @@ namespace libtorrent
, m_last_unchoke(min_time())
, m_packet_size(0)
, m_recv_pos(0)
, m_current_send_buffer(0)
, m_reading_bytes(0)
, m_write_pos(0)
, m_last_receive(time_now())
, m_last_sent(time_now())
, m_socket(s)
@ -161,9 +159,7 @@ namespace libtorrent
, m_last_unchoke(min_time())
, m_packet_size(0)
, m_recv_pos(0)
, m_current_send_buffer(0)
, m_reading_bytes(0)
, m_write_pos(0)
, m_last_receive(time_now())
, m_last_sent(time_now())
, m_socket(s)
@ -734,6 +730,15 @@ namespace libtorrent
<< " *** PIECE NOT IN REQUEST QUEUE\n";
}
#endif
if (has_peer_choked())
{
// if we're choked and we got a rejection of
// a piece in the allowed fast set, remove it
// from the allow fast set.
std::vector<int>::iterator i = std::find(
m_allowed_fast.begin(), m_allowed_fast.end(), r.piece);
if (i != m_allowed_fast.end()) m_allowed_fast.erase(i);
}
if (m_request_queue.empty())
{
if (m_download_queue.size() < 2)
@ -1601,6 +1606,15 @@ namespace libtorrent
&& t->have_piece(index))
return;
if (index < 0 || index >= int(m_have_piece.size()))
{
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << time_now_string() << " <== INVALID_ALLOWED_FAST [ " << index << " | s: "
<< int(m_have_piece.size()) << " ]\n";
#endif
return;
}
m_allowed_fast.push_back(index);
// if the peer has the piece and we want
@ -2082,8 +2096,7 @@ namespace libtorrent
p.remote_dl_rate = 0;
}
p.send_buffer_size = int(m_send_buffer[0].capacity()
+ m_send_buffer[1].capacity());
p.send_buffer_size = m_send_buffer.capacity();
}
void peer_connection::cut_receive_buffer(int size, int packet_size)
@ -2386,8 +2399,7 @@ namespace libtorrent
shared_ptr<torrent> t = m_torrent.lock();
if (m_bandwidth_limit[upload_channel].quota_left() == 0
&& (!m_send_buffer[m_current_send_buffer].empty()
|| !m_send_buffer[(m_current_send_buffer + 1) & 1].empty())
&& !m_send_buffer.empty()
&& !m_connecting
&& t
&& !m_ignore_bandwidth_limits)
@ -2415,32 +2427,21 @@ namespace libtorrent
assert(!m_writing);
int sending_buffer = (m_current_send_buffer + 1) & 1;
if (m_send_buffer[sending_buffer].empty())
{
// this means we have to swap buffer, because there's no
// previous buffer we're still waiting for.
std::swap(m_current_send_buffer, sending_buffer);
m_write_pos = 0;
}
// send the actual buffer
if (!m_send_buffer[sending_buffer].empty())
if (!m_send_buffer.empty())
{
int amount_to_send = (int)m_send_buffer[sending_buffer].size() - m_write_pos;
int amount_to_send = m_send_buffer.size();
int quota_left = m_bandwidth_limit[upload_channel].quota_left();
if (!m_ignore_bandwidth_limits && amount_to_send > quota_left)
amount_to_send = quota_left;
assert(amount_to_send > 0);
assert(m_write_pos < (int)m_send_buffer[sending_buffer].size());
#ifdef TORRENT_VERBOSE_LOGGING
(*m_logger) << "async_write " << amount_to_send << " bytes\n";
#endif
m_socket->async_write_some(asio::buffer(
&m_send_buffer[sending_buffer][m_write_pos], amount_to_send)
, bind(&peer_connection::on_send_data, self(), _1, _2));
std::list<asio::const_buffer> const& vec = m_send_buffer.build_iovec(amount_to_send);
m_socket->async_write_some(vec, bind(&peer_connection::on_send_data, self(), _1, _2));
m_writing = true;
}
@ -2511,10 +2512,32 @@ namespace libtorrent
m_recv_buffer.resize(m_packet_size);
}
void peer_connection::send_buffer(char const* begin, char const* end)
void peer_connection::send_buffer(char const* buf, int size)
{
buffer& buf = m_send_buffer[m_current_send_buffer];
buf.insert(buf.end(), begin, end);
int free_space = m_send_buffer.space_in_last_buffer();
if (free_space > size) free_space = size;
if (free_space > 0)
{
m_send_buffer.append(buf, free_space);
size -= free_space;
buf += free_space;
#ifdef TORRENT_STATS
m_ses.m_buffer_usage_logger << log_time() << " send_buffer: "
<< free_space << std::endl;
m_ses.log_buffer_usage();
#endif
}
if (size <= 0) return;
std::pair<char*, int> buffer = m_ses.allocate_buffer(size);
assert(buffer.second >= size);
std::memcpy(buffer.first, buf, size);
m_send_buffer.append_buffer(buffer.first, buffer.second, size
, bind(&session_impl::free_buffer, boost::ref(m_ses), _1, buffer.second));
#ifdef TORRENT_STATS
m_ses.m_buffer_usage_logger << log_time() << " send_buffer_alloc: " << size << std::endl;
m_ses.log_buffer_usage();
#endif
setup_send();
}
@ -2522,10 +2545,29 @@ namespace libtorrent
// return value is destructed
buffer::interval peer_connection::allocate_send_buffer(int size)
{
buffer& buf = m_send_buffer[m_current_send_buffer];
buf.resize(buf.size() + size);
buffer::interval ret(&buf[0] + buf.size() - size, &buf[0] + buf.size());
return ret;
char* insert = m_send_buffer.allocate_appendix(size);
if (insert == 0)
{
std::pair<char*, int> buffer = m_ses.allocate_buffer(size);
assert(buffer.second >= size);
m_send_buffer.append_buffer(buffer.first, buffer.second, size
, bind(&session_impl::free_buffer, boost::ref(m_ses), _1, buffer.second));
buffer::interval ret(buffer.first, buffer.first + size);
#ifdef TORRENT_STATS
m_ses.m_buffer_usage_logger << log_time() << " allocate_buffer_alloc: " << size << std::endl;
m_ses.log_buffer_usage();
#endif
return ret;
}
else
{
#ifdef TORRENT_STATS
m_ses.m_buffer_usage_logger << log_time() << " allocate_buffer: " << size << std::endl;
m_ses.log_buffer_usage();
#endif
buffer::interval ret(insert, insert + size);
return ret;
}
}
template<class T>
@ -2647,8 +2689,7 @@ namespace libtorrent
// if we have requests or pending data to be sent or announcements to be made
// we want to send data
return (!m_send_buffer[m_current_send_buffer].empty()
|| !m_send_buffer[(m_current_send_buffer + 1) & 1].empty())
return !m_send_buffer.empty()
&& (m_bandwidth_limit[upload_channel].quota_left() > 0
|| m_ignore_bandwidth_limits)
&& !m_connecting;
@ -2763,6 +2804,9 @@ namespace libtorrent
INVARIANT_CHECK;
assert(m_writing);
m_send_buffer.pop_front(bytes_transferred);
m_writing = false;
if (!m_ignore_bandwidth_limits)
@ -2772,9 +2816,6 @@ namespace libtorrent
(*m_logger) << "wrote " << bytes_transferred << " bytes\n";
#endif
m_write_pos += bytes_transferred;
if (error)
{
#ifdef TORRENT_VERBOSE_LOGGING
@ -2787,34 +2828,11 @@ namespace libtorrent
assert(!m_connecting);
assert(bytes_transferred > 0);
int sending_buffer = (m_current_send_buffer + 1) & 1;
assert(int(m_send_buffer[sending_buffer].size()) >= m_write_pos);
if (int(m_send_buffer[sending_buffer].size()) == m_write_pos)
{
m_send_buffer[sending_buffer].clear();
m_write_pos = 0;
}
m_last_sent = time_now();
on_sent(error, bytes_transferred);
fill_send_buffer();
if (m_choked)
{
for (int i = 0; i < 2; ++i)
{
if (int(m_send_buffer[i].size()) < 64
&& int(m_send_buffer[i].capacity()) > 128)
{
buffer tmp(m_send_buffer[i]);
tmp.swap(m_send_buffer[i]);
assert(m_send_buffer[i].capacity() == m_send_buffer[i].size());
}
}
}
setup_send();
}
catch (std::exception& e)
@ -2876,8 +2894,6 @@ namespace libtorrent
}
}
*/
assert(m_write_pos <= int(m_send_buffer[
(m_current_send_buffer + 1) & 1].size()));
// extremely expensive invariant check
/*

View File

@ -138,26 +138,14 @@ namespace
return free_upload;
}
struct match_peer_address
{
match_peer_address(address const& addr)
: m_addr(addr)
{}
bool operator()(policy::peer const& p) const
{ return p.ip.address() == m_addr; }
address const& m_addr;
};
struct match_peer_endpoint
{
match_peer_endpoint(tcp::endpoint const& ep)
: m_ep(ep)
{}
bool operator()(policy::peer const& p) const
{ return p.ip == m_ep; }
bool operator()(std::pair<const address, policy::peer> const& p) const
{ return p.second.ip == m_ep; }
tcp::endpoint const& m_ep;
};
@ -168,8 +156,8 @@ namespace
: m_id(id_)
{}
bool operator()(policy::peer const& p) const
{ return p.connection && p.connection->pid() == m_id; }
bool operator()(std::pair<const address, policy::peer> const& p) const
{ return p.second.connection && p.second.connection->pid() == m_id; }
peer_id const& m_id;
};
@ -180,11 +168,11 @@ namespace
: m_conn(c)
{}
bool operator()(policy::peer const& p) const
bool operator()(std::pair<const address, policy::peer> const& p) const
{
return p.connection == &m_conn
|| (p.ip == m_conn.remote()
&& p.type == policy::peer::connectable);
return p.second.connection == &m_conn
|| (p.second.ip == m_conn.remote()
&& p.second.type == policy::peer::connectable);
}
peer_connection const& m_conn;
@ -362,35 +350,35 @@ namespace libtorrent
piece_picker* p = 0;
if (m_torrent->has_picker())
p = &m_torrent->picker();
for (std::list<peer>::iterator i = m_peers.begin()
for (iterator i = m_peers.begin()
, end(m_peers.end()); i != end;)
{
if ((ses.m_ip_filter.access(i->ip.address()) & ip_filter::blocked) == 0)
if ((ses.m_ip_filter.access(i->second.ip.address()) & ip_filter::blocked) == 0)
{
++i;
continue;
}
if (i->connection)
if (i->second.connection)
{
i->connection->disconnect();
i->second.connection->disconnect();
if (ses.m_alerts.should_post(alert::info))
{
ses.m_alerts.post_alert(peer_blocked_alert(i->ip.address()
ses.m_alerts.post_alert(peer_blocked_alert(i->second.ip.address()
, "disconnected blocked peer"));
}
assert(i->connection == 0
|| i->connection->peer_info_struct() == 0);
assert(i->second.connection == 0
|| i->second.connection->peer_info_struct() == 0);
}
else
{
if (ses.m_alerts.should_post(alert::info))
{
ses.m_alerts.post_alert(peer_blocked_alert(i->ip.address()
ses.m_alerts.post_alert(peer_blocked_alert(i->second.ip.address()
, "blocked peer removed from peer list"));
}
}
if (p) p->clear_peer(&(*i));
if (p) p->clear_peer(&i->second);
m_peers.erase(i++);
}
}
@ -489,7 +477,7 @@ namespace libtorrent
for (iterator i = m_peers.begin();
i != m_peers.end(); ++i)
{
peer_connection* c = i->connection;
peer_connection* c = i->second.connection;
if (c == 0) continue;
if (c->is_disconnecting()) continue;
@ -497,13 +485,13 @@ namespace libtorrent
// isn't interesting
if (disconnect_peer != m_peers.end()
&& c->is_interesting()
&& !disconnect_peer->connection->is_interesting())
&& !disconnect_peer->second.connection->is_interesting())
continue;
double transferred_amount
= (double)c->statistics().total_payload_download();
time_duration connected_time = now - i->connected;
time_duration connected_time = now - i->second.connected;
double connected_time_in_seconds = total_seconds(connected_time);
@ -513,7 +501,7 @@ namespace libtorrent
// prefer to disconnect uninteresting peers, and secondly slow peers
if (transfer_rate <= slowest_transfer_rate
|| (disconnect_peer != m_peers.end()
&& disconnect_peer->connection->is_interesting()
&& disconnect_peer->second.connection->is_interesting()
&& !c->is_interesting()))
{
slowest_transfer_rate = transfer_rate;
@ -540,21 +528,21 @@ namespace libtorrent
for (iterator i = m_peers.begin(); i != m_peers.end(); ++i)
{
if (i->connection) continue;
if (i->banned) continue;
if (i->type == peer::not_connectable) continue;
if (i->seed && finished) continue;
if (i->failcount >= max_failcount) continue;
if (now - i->connected < seconds(i->failcount * min_reconnect_time))
if (i->second.connection) continue;
if (i->second.banned) continue;
if (i->second.type == peer::not_connectable) continue;
if (i->second.seed && finished) continue;
if (i->second.failcount >= max_failcount) continue;
if (now - i->second.connected < seconds(i->second.failcount * min_reconnect_time))
continue;
if (ses.m_port_filter.access(i->ip.port()) & port_filter::blocked)
if (ses.m_port_filter.access(i->second.ip.port()) & port_filter::blocked)
continue;
assert(i->connected <= now);
assert(i->second.connected <= now);
if (i->connected <= min_connect_time)
if (i->second.connected <= min_connect_time)
{
min_connect_time = i->connected;
min_connect_time = i->second.connected;
candidate = i;
}
}
@ -685,11 +673,14 @@ namespace libtorrent
for (iterator i = m_peers.begin(); i != m_peers.end();)
{
// this timeout has to be customizable!
if (i->connection == 0
&& i->connected != min_time()
&& now - i->connected > minutes(120))
// don't remove banned peers, they should
// remain banned
if (i->second.connection == 0
&& i->second.connected != min_time()
&& !i->second.banned
&& now - i->second.connected > minutes(120))
{
if (p) p->clear_peer(&(*i));
if (p) p->clear_peer(&i->second);
m_peers.erase(i++);
}
else
@ -899,12 +890,12 @@ namespace libtorrent
for (const_iterator i = m_peers.begin();
i != m_peers.end(); ++i)
{
if (!i->connection
|| i->connection->is_connecting()
|| i->connection->is_disconnecting()
|| !i->connection->is_peer_interested())
if (!i->second.connection
|| i->second.connection->is_connecting()
|| i->second.connection->is_disconnecting()
|| !i->second.connection->is_peer_interested())
continue;
if (i->connection->is_choked()) ++ret;
if (i->second.connection->is_choked()) ++ret;
}
return ret;
}
@ -948,23 +939,20 @@ namespace libtorrent
}
else
{
i = std::find_if(
m_peers.begin()
, m_peers.end()
, match_peer_address(c.remote().address()));
i = m_peers.find(c.remote().address());
}
if (i != m_peers.end())
{
if (i->banned)
if (i->second.banned)
throw protocol_error("ip address banned, closing");
if (i->connection != 0)
if (i->second.connection != 0)
{
assert(i->connection != &c);
assert(i->second.connection != &c);
// the new connection is a local (outgoing) connection
// or the current one is already connected
if (!i->connection->is_connecting() || c.is_local())
if (!i->second.connection->is_connecting() || c.is_local())
{
throw protocol_error("duplicate connection, closing");
}
@ -975,10 +963,7 @@ namespace libtorrent
" is connecting and this connection is incoming. closing existing "
"connection in favour of this one");
#endif
i->connection->disconnect();
#ifndef NDEBUG
check_invariant();
#endif
i->second.connection->disconnect();
}
}
}
@ -989,27 +974,23 @@ namespace libtorrent
assert(c.remote() == c.get_socket()->remote_endpoint());
peer p(c.remote(), peer::not_connectable, 0);
m_peers.push_back(p);
i = boost::prior(m_peers.end());
#ifndef NDEBUG
check_invariant();
#endif
i = m_peers.insert(std::make_pair(c.remote().address(), p));
}
assert(m_torrent->connection_for(c.remote()) == &c);
c.set_peer_info(&*i);
assert(i->connection == 0);
c.add_stat(i->prev_amount_download, i->prev_amount_upload);
i->prev_amount_download = 0;
i->prev_amount_upload = 0;
i->connection = &c;
assert(i->connection);
i->connected = time_now();
c.set_peer_info(&i->second);
assert(i->second.connection == 0);
c.add_stat(i->second.prev_amount_download, i->second.prev_amount_upload);
i->second.prev_amount_download = 0;
i->second.prev_amount_upload = 0;
i->second.connection = &c;
assert(i->second.connection);
i->second.connected = time_now();
// m_last_optimistic_disconnect = time_now();
}
void policy::peer_from_tracker(const tcp::endpoint& remote, const peer_id& pid
policy::peer* policy::peer_from_tracker(const tcp::endpoint& remote, const peer_id& pid
, int src, char flags)
{
// too expensive
@ -1017,7 +998,7 @@ namespace libtorrent
// just ignore the obviously invalid entries
if (remote.address() == address() || remote.port() == 0)
return;
return 0;
aux::session_impl& ses = m_torrent->session();
@ -1029,7 +1010,7 @@ namespace libtorrent
ses.m_alerts.post_alert(peer_blocked_alert(remote.address()
, "outgoing port blocked, peer not added to peer list"));
}
return;
return 0;
}
try
@ -1038,23 +1019,15 @@ namespace libtorrent
if (m_torrent->settings().allow_multiple_connections_per_ip)
{
i = std::find_if(
m_peers.begin()
, m_peers.end()
, match_peer_endpoint(remote));
std::pair<iterator, iterator> range = m_peers.equal_range(remote.address());
i = std::find_if(range.first, range.second, match_peer_endpoint(remote));
if (i == m_peers.end())
i = std::find_if(
m_peers.begin()
, m_peers.end()
, match_peer_id(pid));
if (i == range.second)
i = std::find_if(m_peers.begin(), m_peers.end(), match_peer_id(pid));
}
else
{
i = std::find_if(
m_peers.begin()
, m_peers.end()
, match_peer_address(remote.address()));
i = m_peers.find(remote.address());
}
if (i == m_peers.end())
@ -1067,20 +1040,17 @@ namespace libtorrent
ses.m_alerts.post_alert(peer_blocked_alert(remote.address()
, "blocked peer not added to peer list"));
}
return;
return 0;
}
// we don't have any info about this peer.
// add a new entry
peer p(remote, peer::connectable, src);
m_peers.push_back(p);
// the iterator is invalid
// because of the push_back()
i = boost::prior(m_peers.end());
i = m_peers.insert(std::make_pair(remote.address(), p));
#ifndef TORRENT_DISABLE_ENCRYPTION
if (flags & 0x01) p.pe_support = true;
#endif
if (flags & 0x02) i->seed = true;
if (flags & 0x02) i->second.seed = true;
// try to send a DHT ping to this peer
// as well, to figure out if it supports
@ -1093,42 +1063,42 @@ namespace libtorrent
}
else
{
i->type = peer::connectable;
i->second.type = peer::connectable;
// in case we got the ip from a remote connection, port is
// not known, so save it. Client may also have changed port
// for some reason.
i->ip = remote;
i->source |= src;
i->second.ip = remote;
i->second.source |= src;
// if this peer has failed before, decrease the
// counter to allow it another try, since somebody
// else is appearantly able to connect to it
// if it comes from the DHT it might be stale though
if (i->failcount > 0 && src != peer_info::dht)
--i->failcount;
if (i->second.failcount > 0 && src != peer_info::dht)
--i->second.failcount;
// if we're connected to this peer
// we already know if it's a seed or not
// so we don't have to trust this source
if ((flags & 0x02) && !i->connection) i->seed = true;
if ((flags & 0x02) && !i->second.connection) i->second.seed = true;
if (i->connection)
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
if (i->second.connection)
{
// this means we're already connected
// to this peer. don't connect to
// it again.
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
m_torrent->debug_log("already connected to peer: " + remote.address().to_string() + ":"
+ boost::lexical_cast<std::string>(remote.port()) + " "
+ boost::lexical_cast<std::string>(i->connection->pid()));
#endif
+ boost::lexical_cast<std::string>(i->second.connection->pid()));
assert(i->connection->associated_torrent().lock().get() == m_torrent);
return;
assert(i->second.connection->associated_torrent().lock().get() == m_torrent);
}
#endif
}
return &i->second;
}
catch(std::exception& e)
{
@ -1138,6 +1108,7 @@ namespace libtorrent
peer_error_alert(remote, pid, e.what()));
}
}
return 0;
}
// this is called when we are choked by a peer
@ -1159,12 +1130,12 @@ namespace libtorrent
for (iterator i = m_peers.begin();
i != m_peers.end(); ++i)
{
if (i->connection == 0) continue;
if (i->second.connection == 0) continue;
// if we're not interested, we will not become interested
if (!i->connection->is_interesting()) continue;
if (!i->connection->has_piece(index)) continue;
if (!i->second.connection->is_interesting()) continue;
if (!i->second.connection->has_piece(index)) continue;
i->connection->update_interest();
i->second.connection->update_interest();
}
}
}
@ -1187,8 +1158,8 @@ namespace libtorrent
// INVARIANT_CHECK;
assert(std::find_if(m_peers.begin(), m_peers.end()
, boost::bind<bool>(std::equal_to<peer_connection*>(), bind(&peer::connection, _1)
, &c)) != m_peers.end());
, boost::bind<bool>(std::equal_to<peer_connection*>(), bind(&peer::connection
, bind(&iterator::value_type::second, _1)), &c)) != m_peers.end());
// if the peer is choked and we have upload slots left,
// then unchoke it. Another condition that has to be met
@ -1303,23 +1274,23 @@ namespace libtorrent
iterator p = find_connect_candidate();
if (p == m_peers.end()) return false;
assert(!p->banned);
assert(!p->connection);
assert(p->type == peer::connectable);
assert(!p->second.banned);
assert(!p->second.connection);
assert(p->second.type == peer::connectable);
try
{
p->connected = time_now();
p->connection = m_torrent->connect_to_peer(&*p);
assert(p->connection == m_torrent->connection_for(p->ip));
if (p->connection == 0)
p->second.connected = time_now();
p->second.connection = m_torrent->connect_to_peer(&p->second);
assert(p->second.connection == m_torrent->connection_for(p->second.ip));
if (p->second.connection == 0)
{
++p->failcount;
++p->second.failcount;
return false;
}
p->connection->add_stat(p->prev_amount_download, p->prev_amount_upload);
p->prev_amount_download = 0;
p->prev_amount_upload = 0;
p->second.connection->add_stat(p->second.prev_amount_download, p->second.prev_amount_upload);
p->second.prev_amount_download = 0;
p->second.prev_amount_upload = 0;
return true;
}
catch (std::exception& e)
@ -1329,7 +1300,7 @@ namespace libtorrent
<< e.what() << "'\n";
#endif
std::cerr << e.what() << std::endl;
++p->failcount;
++p->second.failcount;
return false;
}
}
@ -1340,10 +1311,10 @@ namespace libtorrent
if (p == m_peers.end())
return false;
#if defined(TORRENT_VERBOSE_LOGGING)
(*p->connection->m_logger) << "*** CLOSING CONNECTION 'too many connections'\n";
(*p->second.connection->m_logger) << "*** CLOSING CONNECTION 'too many connections'\n";
#endif
p->connection->disconnect();
p->second.connection->disconnect();
return true;
}
@ -1425,21 +1396,23 @@ namespace libtorrent
int total_connections = 0;
int nonempty_connections = 0;
std::set<address> unique_test;
std::set<tcp::endpoint> unique_test2;
std::set<tcp::endpoint> unique_test;
for (const_iterator i = m_peers.begin();
i != m_peers.end(); ++i)
{
peer const& p = *i;
peer const& p = i->second;
if (!m_torrent->settings().allow_multiple_connections_per_ip)
assert(unique_test.find(p.ip.address()) == unique_test.end());
assert(unique_test2.find(p.ip) == unique_test2.end());
unique_test.insert(p.ip.address());
unique_test2.insert(p.ip);
{
assert(m_peers.count(p.ip.address()) == 1);
}
else
{
assert(unique_test.count(p.ip) == 0);
unique_test.insert(p.ip);
}
++total_connections;
if (!p.connection)
{
// assert(m_torrent->connection_for(p.ip) == 0);
continue;
}
if (!m_torrent->settings().allow_multiple_connections_per_ip)
@ -1493,10 +1466,8 @@ namespace libtorrent
{
policy::peer* p = static_cast<policy::peer*>(*i);
if (p == 0) continue;
std::list<peer>::const_iterator k = m_peers.begin();
for (; k != m_peers.end(); ++k)
if (&(*k) == p) break;
assert(k != m_peers.end());
assert(std::find_if(m_peers.begin(), m_peers.end()
, match_peer_connection(*p->connection)) != m_peers.end());
}
}

View File

@ -83,6 +83,14 @@ using libtorrent::aux::session_impl;
namespace libtorrent
{
std::string log_time()
{
static const ptime start = time_now();
char ret[200];
std::sprintf(ret, "%d", total_milliseconds(time_now() - start));
return ret;
}
namespace aux
{
filesystem_init::filesystem_init()

View File

@ -205,6 +205,15 @@ namespace detail
// lock the session to add the new torrent
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
mutex::scoped_lock l2(m_mutex);
if (m_torrents.empty() || m_torrents.front() != t)
{
// this means the torrent was removed right after it was
// added. Abort the checking.
t.reset();
continue;
}
// clear the resume data now that it has been used
// (the fast resume data is now parsed and stored in t)
t->resume_data = entry();
@ -214,6 +223,7 @@ namespace detail
{
INVARIANT_CHECK;
assert(!m_torrents.empty());
assert(m_torrents.front() == t);
t->torrent_ptr->files_checked(t->unfinished_pieces);
@ -244,6 +254,14 @@ namespace detail
t->torrent_ptr->get_policy().peer_from_tracker(*i, id
, peer_info::resume_data, 0);
}
for (std::vector<tcp::endpoint>::const_iterator i = t->banned_peers.begin();
i != t->banned_peers.end(); ++i)
{
policy::peer* p = t->torrent_ptr->get_policy().peer_from_tracker(*i, id
, peer_info::resume_data, 0);
if (p) p->banned = true;
}
}
else
{
@ -507,7 +525,8 @@ namespace detail
std::pair<int, int> listen_port_range
, fingerprint const& cl_fprint
, char const* listen_interface)
: m_strand(m_io_service)
: m_send_buffers(send_buffer_size)
, m_strand(m_io_service)
, m_files(40)
, m_half_open(m_io_service)
, m_download_channel(m_io_service, peer_connection::download_channel)
@ -546,7 +565,7 @@ namespace detail
#endif
#ifdef TORRENT_STATS
m_stats_logger.open("session_stats.log");
m_stats_logger.open("session_stats.log", std::ios::trunc);
m_stats_logger <<
"1. second\n"
"2. upload rate\n"
@ -555,7 +574,9 @@ namespace detail
"5. seeding torrents\n"
"6. peers\n"
"7. connecting peers\n"
"8. disk block buffers\n"
"\n";
m_buffer_usage_logger.open("buffer_stats.log", std::ios::trunc);
m_second_counter = 0;
#endif
@ -999,7 +1020,8 @@ namespace detail
{
session_impl::mutex_t::scoped_lock l(m_mutex);
INVARIANT_CHECK;
// too expensive
// INVARIANT_CHECK;
if (e)
{
@ -1031,7 +1053,7 @@ namespace detail
else
++downloading_torrents;
}
int num_connections = 0;
int num_complete_connections = 0;
int num_half_open = 0;
for (connection_map::iterator i = m_connections.begin()
, end(m_connections.end()); i != end; ++i)
@ -1039,7 +1061,7 @@ namespace detail
if (i->second->is_connecting())
++num_half_open;
else
++num_connections;
++num_complete_connections;
}
m_stats_logger
@ -1048,8 +1070,9 @@ namespace detail
<< m_stat.download_rate() << "\t"
<< downloading_torrents << "\t"
<< seeding_torrents << "\t"
<< num_connections << "\t"
<< num_complete_connections << "\t"
<< num_half_open << "\t"
<< m_disk_thread.disk_allocations() << "\t"
<< std::endl;
#endif
@ -1755,7 +1778,6 @@ namespace detail
assert(m_torrents.find(i_hash) == m_torrents.end());
return;
}
l.unlock();
if (h.m_chk)
{
@ -2195,9 +2217,9 @@ namespace detail
INVARIANT_CHECK;
m_lsd.reset(new lsd(m_io_service
m_lsd = new lsd(m_io_service
, m_listen_interface.address()
, bind(&session_impl::on_lsd_peer, this, _1, _2)));
, bind(&session_impl::on_lsd_peer, this, _1, _2));
}
void session_impl::start_natpmp()
@ -2206,10 +2228,10 @@ namespace detail
INVARIANT_CHECK;
m_natpmp.reset(new natpmp(m_io_service
m_natpmp = new natpmp(m_io_service
, m_listen_interface.address()
, bind(&session_impl::on_port_mapping
, this, _1, _2, _3)));
, this, _1, _2, _3));
m_natpmp->set_mappings(m_listen_interface.port(),
#ifndef TORRENT_DISABLE_DHT
@ -2224,11 +2246,11 @@ namespace detail
INVARIANT_CHECK;
m_upnp.reset(new upnp(m_io_service, m_half_open
m_upnp = new upnp(m_io_service, m_half_open
, m_listen_interface.address()
, m_settings.user_agent
, bind(&session_impl::on_port_mapping
, this, _1, _2, _3)));
, this, _1, _2, _3));
m_upnp->set_mappings(m_listen_interface.port(),
#ifndef TORRENT_DISABLE_DHT
@ -2240,7 +2262,7 @@ namespace detail
void session_impl::stop_lsd()
{
mutex_t::scoped_lock l(m_mutex);
m_lsd.reset();
m_lsd = 0;
}
void session_impl::stop_natpmp()
@ -2248,7 +2270,7 @@ namespace detail
mutex_t::scoped_lock l(m_mutex);
if (m_natpmp.get())
m_natpmp->close();
m_natpmp.reset();
m_natpmp = 0;
}
void session_impl::stop_upnp()
@ -2256,9 +2278,38 @@ namespace detail
mutex_t::scoped_lock l(m_mutex);
if (m_upnp.get())
m_upnp->close();
m_upnp.reset();
m_upnp = 0;
}
void session_impl::free_disk_buffer(char* buf)
{
m_disk_thread.free_buffer(buf);
}
std::pair<char*, int> session_impl::allocate_buffer(int size)
{
int num_buffers = (size + send_buffer_size - 1) / send_buffer_size;
#ifdef TORRENT_STATS
m_buffer_allocations += num_buffers;
m_buffer_usage_logger << log_time() << " protocol_buffer: "
<< (m_buffer_allocations * send_buffer_size) << std::endl;
#endif
return std::make_pair((char*)m_send_buffers.ordered_malloc(num_buffers)
, num_buffers * send_buffer_size);
}
void session_impl::free_buffer(char* buf, int size)
{
assert(size % send_buffer_size == 0);
int num_buffers = size / send_buffer_size;
#ifdef TORRENT_STATS
m_buffer_allocations -= num_buffers;
assert(m_buffer_allocations >= 0);
m_buffer_usage_logger << log_time() << " protocol_buffer: "
<< (m_buffer_allocations * send_buffer_size) << std::endl;
#endif
m_send_buffers.ordered_free(buf, num_buffers);
}
#ifndef NDEBUG
void session_impl::check_invariant() const
@ -2328,9 +2379,9 @@ namespace detail
// the peers
if (rd.find_key("peers"))
if (entry* peers_entry = rd.find_key("peers"))
{
entry::list_type& peer_list = rd["peers"].list();
entry::list_type& peer_list = peers_entry->list();
std::vector<tcp::endpoint> tmp_peers;
tmp_peers.reserve(peer_list.size());
@ -2346,6 +2397,24 @@ namespace detail
peers.swap(tmp_peers);
}
if (entry* banned_peers_entry = rd.find_key("banned_peers"))
{
entry::list_type& peer_list = banned_peers_entry->list();
std::vector<tcp::endpoint> tmp_peers;
tmp_peers.reserve(peer_list.size());
for (entry::list_type::iterator i = peer_list.begin();
i != peer_list.end(); ++i)
{
tcp::endpoint a(
address::from_string((*i)["ip"].string())
, (unsigned short)(*i)["port"].integer());
tmp_peers.push_back(a);
}
banned_peers.swap(tmp_peers);
}
// read piece map
const entry::list_type& slots = rd["slots"].list();
if ((int)slots.size() > info.num_pieces())

View File

@ -1065,6 +1065,11 @@ namespace libtorrent
return m_storage->verify_resume_data(rd, error);
}
void piece_manager::free_buffer(char* buf)
{
m_io_thread.free_buffer(buf);
}
void piece_manager::async_release_files(
boost::function<void(int, disk_io_job const&)> const& handler)
{
@ -1243,7 +1248,7 @@ namespace libtorrent
, block_size);
crc.update(&buf[0], block_size);
}
if (bi[num_blocks - 1].state == piece_picker::block_info::state_finished)
if (num_blocks > 0 && bi[num_blocks - 1].state == piece_picker::block_info::state_finished)
{
m_storage->read(
&buf[0]

View File

@ -775,6 +775,8 @@ namespace libtorrent
if (total_done >= m_torrent_file->total_size())
{
// Thist happens when a piece has been downloaded completely
// but not yet verified against the hash
std::copy(m_have_pieces.begin(), m_have_pieces.end()
, std::ostream_iterator<bool>(std::cerr, " "));
std::cerr << std::endl;
@ -1022,11 +1024,26 @@ namespace libtorrent
#endif
disconnect_all();
if (m_owning_storage.get()) m_storage->async_release_files();
if (m_owning_storage.get())
m_storage->async_release_files(
bind(&torrent::on_files_released, shared_from_this(), _1, _2));
m_owning_storage = 0;
}
void torrent::on_files_released(int ret, disk_io_job const& j)
{
/*
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
if (alerts().should_post(alert::warning))
{
alerts().post_alert(torrent_paused_alert(get_handle(), "torrent paused"));
}
*/
}
void torrent::on_torrent_paused(int ret, disk_io_job const& j)
{
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
@ -1464,9 +1481,6 @@ namespace libtorrent
m_policy->connection_closed(*p);
p->set_peer_info(0);
m_connections.erase(i);
#ifndef NDEBUG
m_policy->check_invariant();
#endif
}
catch (std::exception& e)
{
@ -1731,7 +1745,7 @@ namespace libtorrent
m_resolving_country = false;
// must be ordered in increasing order
country_entry country_map[] =
static const country_entry country_map[] =
{
{ 4, "AF"}, { 8, "AL"}, { 10, "AQ"}, { 12, "DZ"}, { 16, "AS"}
, { 20, "AD"}, { 24, "AO"}, { 28, "AG"}, { 31, "AZ"}, { 32, "AR"}
@ -1801,7 +1815,7 @@ namespace libtorrent
// look up the country code in the map
const int size = sizeof(country_map)/sizeof(country_map[0]);
country_entry tmp = {country, ""};
country_entry* i =
country_entry const* i =
std::lower_bound(country_map, country_map + size, tmp
, bind(&country_entry::code, _1) < bind(&country_entry::code, _2));
if (i == country_map + size
@ -2118,7 +2132,9 @@ namespace libtorrent
, bind(&peer_connection::disconnect, _1));
assert(m_storage);
m_storage->async_release_files();
// we need to keep the object alive during this operation
m_storage->async_release_files(
bind(&torrent::on_files_released, shared_from_this(), _1, _2));
}
// called when torrent is complete (all pieces downloaded)
@ -2381,6 +2397,8 @@ namespace libtorrent
#ifndef NDEBUG
void torrent::check_invariant() const
{
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
int num_uploads = 0;
std::map<piece_block, int> num_requests;
for (const_peer_iterator i = begin(); i != end(); ++i)
@ -2550,10 +2568,8 @@ namespace libtorrent
if (m_owning_storage.get())
{
assert(m_storage);
// TOOD: add a callback which posts
// an alert for the client to sync. with
m_storage->async_release_files(
bind(&torrent::on_files_released, shared_from_this(), _1, _2));
bind(&torrent::on_torrent_paused, shared_from_this(), _1, _2));
}
}

View File

@ -363,7 +363,7 @@ namespace libtorrent
aux::piece_checker_data* d = m_chk->find_torrent(m_info_hash);
if (d != 0)
{
torrent_status st;
torrent_status st = d->torrent_ptr->status();
if (d->processing)
{
@ -737,14 +737,23 @@ namespace libtorrent
}
// write local peers
ret["peers"] = entry::list_type();
entry::list_type& peer_list = ret["peers"].list();
entry::list_type& banned_peer_list = ret["banned_peers"].list();
policy& pol = t->get_policy();
for (policy::iterator i = pol.begin_peer()
, end(pol.end_peer()); i != end; ++i)
{
if (i->second.banned)
{
tcp::endpoint ip = i->second.ip;
entry peer(entry::dictionary_t);
peer["ip"] = ip.address().to_string();
peer["port"] = ip.port();
banned_peer_list.push_back(peer);
continue;
}
// we cannot save remote connection
// since we don't know their listen port
// unless they gave us their listen port
@ -752,10 +761,9 @@ namespace libtorrent
// so, if the peer is not connectable (i.e. we
// don't know its listen port) or if it has
// been banned, don't save it.
if (i->type == policy::peer::not_connectable
|| i->banned) continue;
if (i->second.type == policy::peer::not_connectable) continue;
tcp::endpoint ip = i->ip;
tcp::endpoint ip = i->second.ip;
entry peer(entry::dictionary_t);
peer["ip"] = ip.address().to_string();
peer["port"] = ip.port();

View File

@ -56,7 +56,7 @@ using namespace libtorrent;
namespace libtorrent
{
bool is_local(address const& a);
address_v4 guess_local_address(asio::io_service&);
address guess_local_address(asio::io_service&);
}
upnp::upnp(io_service& ios, connection_queue& cc
@ -70,7 +70,7 @@ upnp::upnp(io_service& ios, connection_queue& cc
, m_io_service(ios)
, m_strand(ios)
, m_socket(ios, udp::endpoint(address_v4::from_string("239.255.255.250"), 1900)
, m_strand.wrap(bind(&upnp::on_reply, this, _1, _2, _3)), false)
, m_strand.wrap(bind(&upnp::on_reply, self(), _1, _2, _3)), false)
, m_broadcast_timer(ios)
, m_refresh_timer(ios)
, m_disabled(false)
@ -119,7 +119,7 @@ void upnp::discover_device() try
++m_retry_count;
m_broadcast_timer.expires_from_now(milliseconds(250 * m_retry_count));
m_broadcast_timer.async_wait(m_strand.wrap(bind(&upnp::resend_request
, this, _1)));
, self(), _1)));
#ifdef TORRENT_UPNP_LOGGING
m_log << time_now_string()
@ -203,7 +203,7 @@ try
try
{
d.upnp_connection.reset(new http_connection(m_io_service
, m_cc, m_strand.wrap(bind(&upnp::on_upnp_xml, this, _1, _2
, m_cc, m_strand.wrap(bind(&upnp::on_upnp_xml, self(), _1, _2
, boost::ref(d)))));
d.upnp_connection->get(d.url);
}
@ -300,7 +300,7 @@ try
return;
}
std::string url = p.header<std::string>("location");
std::string url = p.header("location");
if (url.empty())
{
#ifdef TORRENT_UPNP_LOGGING
@ -393,7 +393,7 @@ try
try
{
d.upnp_connection.reset(new http_connection(m_io_service
, m_cc, m_strand.wrap(bind(&upnp::on_upnp_xml, this, _1, _2
, m_cc, m_strand.wrap(bind(&upnp::on_upnp_xml, self(), _1, _2
, boost::ref(d)))));
d.upnp_connection->get(d.url);
}
@ -480,9 +480,9 @@ void upnp::map_port(rootdevice& d, int i)
assert(d.service_namespace);
d.upnp_connection.reset(new http_connection(m_io_service
, m_cc, m_strand.wrap(bind(&upnp::on_upnp_map_response, this, _1, _2
, m_cc, m_strand.wrap(bind(&upnp::on_upnp_map_response, self(), _1, _2
, boost::ref(d), i)), true
, bind(&upnp::create_port_mapping, this, _1, boost::ref(d), i)));
, bind(&upnp::create_port_mapping, self(), _1, boost::ref(d), i)));
d.upnp_connection->start(d.hostname, boost::lexical_cast<std::string>(d.port)
, seconds(10));
@ -523,9 +523,9 @@ void upnp::unmap_port(rootdevice& d, int i)
return;
}
d.upnp_connection.reset(new http_connection(m_io_service
, m_cc, m_strand.wrap(bind(&upnp::on_upnp_unmap_response, this, _1, _2
, m_cc, m_strand.wrap(bind(&upnp::on_upnp_unmap_response, self(), _1, _2
, boost::ref(d), i)), true
, bind(&upnp::delete_port_mapping, this, boost::ref(d), i)));
, bind(&upnp::delete_port_mapping, self(), boost::ref(d), i)));
d.upnp_connection->start(d.hostname, boost::lexical_cast<std::string>(d.port)
, seconds(10));
@ -851,7 +851,7 @@ void upnp::on_upnp_map_response(asio::error_code const& e
|| next_expire > d.mapping[mapping].expires)
{
m_refresh_timer.expires_at(d.mapping[mapping].expires);
m_refresh_timer.async_wait(m_strand.wrap(bind(&upnp::on_expire, this, _1)));
m_refresh_timer.async_wait(m_strand.wrap(bind(&upnp::on_expire, self(), _1)));
}
}
else
@ -962,7 +962,7 @@ void upnp::on_expire(asio::error_code const& e) try
if (next_expire != max_time())
{
m_refresh_timer.expires_at(next_expire);
m_refresh_timer.async_wait(m_strand.wrap(bind(&upnp::on_expire, this, _1)));
m_refresh_timer.async_wait(m_strand.wrap(bind(&upnp::on_expire, self(), _1)));
}
}
catch (std::exception&)

View File

@ -297,7 +297,7 @@ namespace libtorrent
(*m_logger) << request << "\n";
#endif
send_buffer(request.c_str(), request.c_str() + request.size());
send_buffer(request.c_str(), request.size());
}
// --------------------------
@ -387,7 +387,7 @@ namespace libtorrent
{
// this means we got a redirection request
// look for the location header
std::string location = m_parser.header<std::string>("location");
std::string location = m_parser.header("location");
if (location.empty())
{
@ -423,7 +423,7 @@ namespace libtorrent
throw std::runtime_error("redirecting to " + location);
}
std::string server_version = m_parser.header<std::string>("server");
std::string const& server_version = m_parser.header("server");
if (!server_version.empty())
{
m_server_string = "URL seed @ ";
@ -445,7 +445,7 @@ namespace libtorrent
size_type range_end;
if (m_parser.status_code() == 206)
{
std::stringstream range_str(m_parser.header<std::string>("content-range"));
std::stringstream range_str(m_parser.header("content-range"));
char dummy;
std::string bytes;
range_str >> bytes >> range_start >> dummy >> range_end;
@ -461,7 +461,7 @@ namespace libtorrent
else
{
range_start = 0;
range_end = m_parser.header<size_type>("content-length");
range_end = atol(m_parser.header("content-length").c_str());
if (range_end == -1)
{
// we should not try this server again.