Implement Events (formerly Signals) which are emitted from the daemon to interested clients.

This commit is contained in:
Andrew Resch 2009-02-03 04:13:41 +00:00
parent aa89c653f9
commit a24738a9ff
23 changed files with 437 additions and 446 deletions

View File

@ -40,7 +40,7 @@ from deluge.log import LOG as log
class AlertManager(component.Component):
def __init__(self, session):
log.debug("AlertManager initialized..")
component.Component.__init__(self, "AlertManager", interval=50)
component.Component.__init__(self, "AlertManager", interval=0.05)
self.session = session
self.session.set_alert_mask(

View File

@ -44,6 +44,7 @@ except ImportError:
import deluge.configmanager
import deluge.common
import deluge.component as component
from deluge.event import *
from deluge.core.torrentmanager import TorrentManager
from deluge.core.pluginmanager import PluginManager
from deluge.core.alertmanager import AlertManager
@ -212,7 +213,7 @@ class Core(component.Component):
return 1
if VersionSplit(self.new_release) > VersionSplit(deluge.common.get_version()):
self.signalmanager.emit("new_version_available", self.new_release)
component.get("RPCServer").emit_event(NewVersionAvailableEvent(self.new_release))
return self.new_release
return False
@ -240,7 +241,7 @@ class Core(component.Component):
except Exception, e:
log.error("There was an error adding the torrent file %s", filename)
log.exception(e)
else:
# Run the plugin hooks for 'post_torrent_add'
self.pluginmanager.run_post_torrent_add(torrent_id)
@ -364,7 +365,7 @@ class Core(component.Component):
def resume_all_torrents(self):
"""Resume all torrents in the session"""
self.session.resume()
self.signalmanager.emit("torrent_all_resumed")
component.get("RPCServer").emit_event(SessionResumedEvent())
@export()
def resume_torrent(self, torrent_ids):
@ -674,7 +675,7 @@ class Core(component.Component):
try:
# If the queue method returns True, then we should emit a signal
if self.torrentmanager.queue_top(torrent_id):
self.signalmanager.emit("torrent_queue_changed")
component.get("RPCServer").emit_event(TorrentQueueChangedEvent())
except KeyError:
log.warning("torrent_id: %s does not exist in the queue", torrent_id)
@ -687,7 +688,7 @@ class Core(component.Component):
try:
# If the queue method returns True, then we should emit a signal
if self.torrentmanager.queue_up(torrent_id):
self.signalmanager.emit("torrent_queue_changed")
component.get("RPCServer").emit_event(TorrentQueueChangedEvent())
except KeyError:
log.warning("torrent_id: %s does not exist in the queue", torrent_id)
@ -700,7 +701,7 @@ class Core(component.Component):
try:
# If the queue method returns True, then we should emit a signal
if self.torrentmanager.queue_down(torrent_id):
self.signalmanager.emit("torrent_queue_changed")
component.get("RPCServer").emit_event(TorrentQueueChangedEvent())
except KeyError:
log.warning("torrent_id: %s does not exist in the queue", torrent_id)
@ -711,7 +712,7 @@ class Core(component.Component):
try:
# If the queue method returns True, then we should emit a signal
if self.torrentmanager.queue_bottom(torrent_id):
self.signalmanager.emit("torrent_queue_changed")
component.get("RPCServer").emit_event(TorrentQueueChangedEvent())
except KeyError:
log.warning("torrent_id: %s does not exist in the queue", torrent_id)

View File

@ -35,6 +35,7 @@ except ImportError:
if not (lt.version_major == 0 and lt.version_minor == 14):
raise ImportError("This version of Deluge requires libtorrent 0.14!")
from deluge.event import *
import deluge.configmanager
import deluge.common
import deluge.component as component
@ -214,7 +215,7 @@ class PreferencesManager(component.Component):
# Config set functions
def _on_config_value_change(self, key, value):
self.signals.emit("config_value_changed", key, value)
component.get("RPCServer").emit_event(ConfigValueChangedEvent(key, value))
def _on_set_torrentfiles_location(self, key, value):
if self.config["copy_torrent_file"]:

View File

@ -137,7 +137,7 @@ class DelugeRPCProtocol(Protocol):
s += ", ".join([key + "=" + str(value) for key, value in call[3].items()])
s += ")"
log.debug("RPCRequest: %s", s)
#log.debug("RPCRequest: %s", s)
reactor.callLater(0, self._dispatch, *call)
def sendData(self, data):
@ -184,6 +184,20 @@ class DelugeRPCProtocol(Protocol):
:param kwargs: dict, the keyword-arguments to pass to `:param:method`
"""
def sendError():
"""
Sends an error response with the contents of the exception that was raised.
"""
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
self.sendData((
RPC_ERROR,
request_id,
(exceptionType.__name__,
exceptionValue.message,
"".join(traceback.format_tb(exceptionTraceback)))
))
if method == "daemon.login":
# This is a special case and used in the initial connection process
# We need to authenticate the user here
@ -191,8 +205,9 @@ class DelugeRPCProtocol(Protocol):
ret = component.get("AuthManager").authorize(*args, **kwargs)
if ret:
self.factory.authorized_sessions[self.transport.sessionno] = ret
self.factory.session_protocols[self.transport.sessionno] = self
except Exception, e:
# Send error packet here
sendError()
log.exception(e)
else:
self.sendData((RPC_RESPONSE, request_id, (ret)))
@ -200,6 +215,20 @@ class DelugeRPCProtocol(Protocol):
self.transport.loseConnection()
finally:
return
elif method == "daemon.set_event_interest":
# This special case is to allow clients to set which events they are
# interested in receiving.
# We are expecting a sequence from the client.
try:
if self.transport.sessionno not in self.factory.interested_events:
self.factory.interested_events[self.transport.sessionno] = []
self.factory.interested_events[self.transport.sessionno].extend(args[0])
except Exception, e:
sendError()
else:
self.sendData((RPC_RESPONSE, request_id, (True)))
finally:
return
if method in self.factory.methods:
try:
@ -209,19 +238,9 @@ class DelugeRPCProtocol(Protocol):
# This session is not allowed to call this method
log.debug("Session %s is trying to call a method it is not authorized to call!", self.transport.sessionno)
raise NotAuthorizedError("Auth level too low: %s < %s" % (auth_level, method_auth_requirement))
ret = self.factory.methods[method](*args, **kwargs)
except Exception, e:
# Send an error packet here
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
self.sendData((
RPC_ERROR,
request_id,
(exceptionType.__name__,
exceptionValue.message,
"".join(traceback.format_tb(exceptionTraceback)))
))
sendError()
# Don't bother printing out DelugeErrors, because they are just for the client
if not isinstance(e, DelugeError):
log.exception("Exception calling RPC request: %s", e)
@ -249,6 +268,10 @@ class RPCServer(component.Component):
self.factory.methods = {}
# Holds the session_ids and auth levels
self.factory.authorized_sessions = {}
# Holds the protocol objects with the session_id as key
self.factory.session_protocols = {}
# Holds the interested event list for the sessions
self.factory.interested_events = {}
if not listen:
return
@ -320,6 +343,22 @@ class RPCServer(component.Component):
"""
return self.factory.methods.keys()
def emit_event(self, event):
"""
Emits the event to interested clients.
:param event: DelugeEvent
"""
log.debug("intevents: %s", self.factory.interested_events)
# Find sessions interested in this event
for session_id, interest in self.factory.interested_events.iteritems():
if event.name in interest:
log.debug("Emit Event: %s %s", event.name, event.args)
# This session is interested so send a RPC_SIGNAL
self.factory.session_protocols[session_id].sendData(
(RPC_SIGNAL, event.name, event.args)
)
def __generate_ssl_keys(self):
"""
This method generates a new SSL key/cert.

View File

@ -99,7 +99,7 @@ class SignalManager(component.Component):
return
client = self.clients[uri]
try:
client.emit_signal(signal, *data)
client.emit_event_signal(signal, *data)
except (socket.error, Exception), e:
log.warning("Unable to emit signal to client %s: %s (%d)", client, e, count)
if count < 30:

View File

@ -40,8 +40,7 @@ import deluge.common
import deluge.component as component
from deluge.configmanager import ConfigManager
from deluge.log import LOG as log
import deluge.xmlrpclib
from deluge.event import *
TORRENT_STATE = deluge.common.TORRENT_STATE
@ -677,7 +676,7 @@ class Torrent:
# show it as 'Paused'. We need to emit a torrent_paused signal because
# the torrent_paused alert from libtorrent will not be generated.
self.update_state()
self.signals.emit("torrent_paused", self.torrent_id)
component.get("RPCServer").emit_event(TorrentStateChangedEvent(self.torrent_id, "Paused"))
else:
try:
self.handle.pause()
@ -706,7 +705,8 @@ class Torrent:
ratio = self.config["stop_seed_ratio"]
if self.get_ratio() >= ratio:
self.signals.emit("torrent_resume_at_stop_ratio")
#XXX: This should just be returned in the RPC Response, no event
#self.signals.emit_event("torrent_resume_at_stop_ratio")
return
if self.options["auto_managed"]:

View File

@ -42,6 +42,7 @@ except ImportError:
raise ImportError("This version of Deluge requires libtorrent 0.14!")
from deluge.event import *
import deluge.common
import deluge.component as component
from deluge.configmanager import ConfigManager
@ -405,7 +406,7 @@ class TorrentManager(component.Component):
self.save_state()
# Emit the torrent_added signal
self.signals.emit("torrent_added", torrent.torrent_id)
component.get("RPCServer").emit_event(TorrentAddedEvent(torrent.torrent_id))
return torrent.torrent_id
@ -452,7 +453,7 @@ class TorrentManager(component.Component):
self.save_state()
# Emit the signal to the clients
self.signals.emit("torrent_removed", torrent_id)
component.get("RPCServer").emit_event(TorrentRemovedEvent(torrent_id))
return True
@ -637,7 +638,7 @@ class TorrentManager(component.Component):
torrent.is_finished = True
torrent.update_state()
torrent.save_resume_data()
component.get("SignalManager").emit("torrent_finished", torrent_id)
component.get("RPCServer").emit_event(TorrentFinishedEvent(torrent_id))
def on_alert_torrent_paused(self, alert):
log.debug("on_alert_torrent_paused")
@ -645,7 +646,7 @@ class TorrentManager(component.Component):
torrent_id = str(alert.handle.info_hash())
# Set the torrent state
self.torrents[torrent_id].update_state()
component.get("SignalManager").emit("torrent_paused", torrent_id)
component.get("RPCServer").emit_event(TorrentStateChangedEvent(torrent_id, "Paused"))
# Write the fastresume file
self.torrents[torrent_id].save_resume_data()
@ -662,6 +663,10 @@ class TorrentManager(component.Component):
def on_alert_tracker_reply(self, alert):
log.debug("on_alert_tracker_reply: %s", alert.message())
if not alert.handle.is_valid():
# Handle is no longer valid, probably a removed torrent
return
# Get the torrent_id
torrent_id = str(alert.handle.info_hash())
# Set the tracker status for the torrent
@ -725,15 +730,17 @@ class TorrentManager(component.Component):
def on_alert_torrent_resumed(self, alert):
log.debug("on_alert_torrent_resumed")
torrent = self.torrents[str(alert.handle.info_hash())]
torrent_id = str(alert.handle.info_hash())
torrent = self.torrents[torrent_id]
torrent.is_finished = torrent.handle.is_seed()
torrent.update_state()
component.get("RPCServer").emit_event(TorrentResumedEvent(torrent_id))
def on_alert_state_changed(self, alert):
log.debug("on_alert_state_changed")
torrent_id = str(alert.handle.info_hash())
self.torrents[torrent_id].update_state()
component.get("SignalManager").emit("torrent_state_changed", torrent_id)
component.get("RPCServer").emit_event(TorrentStateChangedEvent(torrent_id, self.torrents[torrent_id].state))
def on_alert_save_resume_data(self, alert):
log.debug("on_alert_save_resume_data")
@ -759,7 +766,7 @@ class TorrentManager(component.Component):
folder_rename = True
if len(wait_on_folder[2]) == 1:
# This is the last alert we were waiting for, time to send signal
component.get("SignalManager").emit("torrent_folder_renamed", torrent_id, wait_on_folder[0], wait_on_folder[1])
component.get("RPCServer").emit_event(TorrentFolderRenamedEvent(torrent_id, wait_on_folder[0], wait_on_folder[1]))
del torrent.waiting_on_folder_rename[i]
break
# This isn't the last file to be renamed in this folder, so just
@ -768,7 +775,7 @@ class TorrentManager(component.Component):
if not folder_rename:
# This is just a regular file rename so send the signal
component.get("SignalManager").emit("torrent_file_renamed", torrent_id, alert.index, alert.name)
component.get("RPCServer").emit_event(TorrentFileRenamedEvent(torrent_id, alert.index, alert.name))
def on_alert_metadata_received(self, alert):
log.debug("on_alert_metadata_received")

168
deluge/event.py Normal file
View File

@ -0,0 +1,168 @@
#
# event.py
#
# Copyright (C) 2009 Andrew Resch <andrewresch@gmail.com>
#
# Deluge is free software.
#
# You may redistribute it and/or modify it under the terms of the
# GNU General Public License, as published by the Free Software
# Foundation; either version 3 of the License, or (at your option)
# any later version.
#
# deluge is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with deluge. If not, write to:
# The Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor
# Boston, MA 02110-1301, USA.
#
"""
Event module.
This module describes the types of events that can be generated by the daemon
and subsequently emitted to the clients.
"""
class DelugeEvent(object):
"""
The base class for all events.
:prop name: this is the name of the class which is in-turn the event name
:prop args: a list of the attribute values
"""
def _get_name(self):
return self.__class__.__name__
def _get_args(self):
return self.__dict__.values()
name = property(fget=_get_name)
args = property(fget=_get_args)
class TorrentAddedEvent(DelugeEvent):
"""
Emitted when a new torrent is successfully added to the session.
"""
def __init__(self, torrent_id):
"""
:param torrent_id: str, the torrent_id of the torrent that was added
"""
self.torrent_id = torrent_id
class TorrentRemovedEvent(DelugeEvent):
"""
Emitted when a torrent has been removed from the session.
"""
def __init__(self, torrent_id):
"""
:param torrent_id: str, the torrent_id
"""
self.torrent_id = torrent_id
class TorrentStateChangedEvent(DelugeEvent):
"""
Emitted when a torrent changes state.
"""
def __init__(self, torrent_id, state):
"""
:param torrent_id: str, the torrent_id
:param state: str, the new state
"""
self.torrent_id = torrent_id
self.state = state
class TorrentQueueChangedEvent(DelugeEvent):
"""
Emitted when the queue order has changed.
"""
pass
class TorrentFolderRenamedEvent(DelugeEvent):
"""
Emitted when a folder within a torrent has been renamed.
"""
def __init__(self, torrent_id, old, new):
"""
:param torrent_id: str, the torrent_id
:param old: str, the old folder name
:param new: str, the new folder name
"""
self.torrent_id = torrent_id
self.old = old
self.new = new
class TorrentFileRenamedEvent(DelugeEvent):
"""
Emitted when a file within a torrent has been renamed.
"""
def __init__(self, torrent_id, index, name):
"""
:param torrent_id: str, the torrent_id
:param index: int, the index of the file
:param name: str, the new filename
"""
self.torrent_id = torrent_id
self.index = index
self.name = name
class TorrentFinishedEvent(DelugeEvent):
"""
Emitted when a torrent finishes downloading.
"""
def __init__(self, torrent_id):
"""
:param torrent_id: str, the torrent_id
"""
self.torrent_id = torrent_id
class TorrentResumedEvent(DelugeEvent):
"""
Emitted when a torrent resumes from a paused state.
"""
def __init__(self, torrent_id):
"""
:param torrent_id: str, the torrent_id
"""
self.torrent_id = torrent_id
class NewVersionAvailableEvent(DelugeEvent):
"""
Emitted when a more recent version of Deluge is available.
"""
def __init__(self, new_release):
"""
:param new_release: str, the new version that is available
"""
self.new_release = new_release
class SessionPausedEvent(DelugeEvent):
"""
Emitted when the session has been paused.
"""
pass
class SessionResumedEvent(DelugeEvent):
"""
Emitted when the session has been resumed.
"""
pass
class ConfigValueChangedEvent(DelugeEvent):
"""
Emitted when a config value changes in the Core.
"""
def __init__(self, key, value):
"""
:param key: str, the key that changed
:param value: the new value of the `:param:key`
"""
self.key = key
self.value = value

View File

@ -28,6 +28,7 @@ import deluge.rencode as rencode
import zlib
import deluge.common
import deluge.component as component
from deluge.log import LOG as log
if deluge.common.windows_check():
@ -95,9 +96,9 @@ class DelugeRPCRequest(object):
return (self.request_id, self.method, self.args, self.kwargs)
class DelugeRPCProtocol(Protocol):
__rpc_requests = {}
__buffer = None
def connectionMade(self):
self.__rpc_requests = {}
self.__buffer = None
# Set the protocol in the daemon so it can send data
self.factory.daemon.protocol = self
# Get the address of the daemon that we've connected to
@ -144,12 +145,12 @@ class DelugeRPCProtocol(Protocol):
message_type = request[0]
if message_type == RPC_SIGNAL:
signal = request[1]
event = request[1]
# A RPCSignal was received from the daemon so run any handlers
# associated with it.
if signal in self.factory.daemon.signal_handlers:
for handler in self.factory.daemon.signal_handlers[signal]:
handler(*request[2])
if event in self.factory.event_handlers:
for handler in self.factory.event_handlers[event]:
reactor.callLater(0, handler, *request[2])
return
request_id = request[1]
@ -181,15 +182,16 @@ class DelugeRPCProtocol(Protocol):
# response to this request. We use the extra information when printing
# out the error for debugging purposes.
self.__rpc_requests[request.request_id] = request
log.debug("Sending RPCRequest %s: %s", request.request_id, request)
#log.debug("Sending RPCRequest %s: %s", request.request_id, request)
# Send the request in a tuple because multiple requests can be sent at once
self.transport.write(zlib.compress(rencode.dumps((request.format_message(),))))
class DelugeRPCClientFactory(ClientFactory):
protocol = DelugeRPCProtocol
def __init__(self, daemon):
def __init__(self, daemon, event_handlers):
self.daemon = daemon
self.event_handlers = event_handlers
def startedConnecting(self, connector):
log.info("Connecting to daemon at %s:%s..", connector.host, connector.port)
@ -206,37 +208,16 @@ class DelugeRPCClientFactory(ClientFactory):
self.daemon.connected = False
if self.daemon.disconnect_deferred:
self.daemon.disconnect_deferred.callback(reason.value)
self.daemon.generate_event("disconnected")
if self.daemon.disconnect_callback:
self.daemon.disconnect_callback()
class DaemonProxy(object):
__event_handlers = {
"connected": [],
"disconnected": []
}
def register_event_handler(self, event, handler):
"""
Registers a handler that will be called when an event happens.
:params event: str, the event to handle
:params handler: func, the handler function, f()
:raises KeyError: if event is not valid
"""
self.__event_handlers[event].append(handler)
def generate_event(self, event):
"""
Calls the event handlers for `:param:event`.
:param event: str, the event to generate
"""
for handler in self.__event_handlers[event]:
handler()
pass
class DaemonSSLProxy(DaemonProxy):
def __init__(self):
self.__factory = DelugeRPCClientFactory(self)
def __init__(self, event_handlers={}):
self.__factory = DelugeRPCClientFactory(self, event_handlers)
self.__request_counter = 0
self.__deferred = {}
@ -251,6 +232,7 @@ class DaemonSSLProxy(DaemonProxy):
self.connected = False
self.disconnect_deferred = None
self.disconnect_callback = None
def connect(self, host, port, username, password):
"""
@ -268,11 +250,12 @@ class DaemonSSLProxy(DaemonProxy):
self.port = port
self.__connector = reactor.connectSSL(self.host, self.port, self.__factory, ssl.ClientContextFactory())
self.connect_deferred = defer.Deferred()
self.login_deferred = defer.Deferred()
# Upon connect we do a 'daemon.login' RPC
self.connect_deferred.addCallback(self.__on_connect, username, password)
self.connect_deferred.addErrback(self.__on_connect_fail)
self.login_deferred = defer.Deferred()
# XXX: Add the login stuff..
return self.login_deferred
def disconnect(self):
@ -324,31 +307,34 @@ class DaemonSSLProxy(DaemonProxy):
"""
return self.__deferred.pop(request_id)
def register_signal_handler(self, signal, handler):
def register_event_handler(self, event, handler):
"""
Registers a handler function to be called when `:param:signal` is received
Registers a handler function to be called when `:param:event` is received
from the daemon.
:param signal: str, the name of the signal to handle
:param handler: function, the function to be called when `:param:signal`
:param event: str, the name of the event to handle
:param handler: function, the function to be called when `:param:event`
is emitted from the daemon
"""
if signal not in self.signal_handlers:
self.signal_handlers[signal] = []
if event not in self.factory.event_handlers:
# This is a new event to handle, so we need to tell the daemon
# that we're interested in receiving this type of event
self.event_handlers[event] = []
self.call("daemon.set_event_interest", [event])
self.signal_handlers[signal].append(handler)
self.factory.event_handlers[event].append(handler)
def deregister_signal_handler(self, signal, handler):
def deregister_event_handler(self, event, handler):
"""
Deregisters a signal handler.
Deregisters a event handler.
:param signal: str, the name of the signal
:param event: str, the name of the event
:param handler: function, the function registered
"""
if signal in self.signal_handlers and handler in self.signal_handlers[signal]:
self.signal_handlers[signal].remove(handler)
if event in self.event_handlers and handler in self.event_handlers[event]:
self.event_handlers[event].remove(handler)
def __rpcError(self, error_data):
"""
@ -379,16 +365,26 @@ class DaemonSSLProxy(DaemonProxy):
self.__login_deferred.addErrback(self.__on_login_fail)
def __on_connect_fail(self, reason):
log.debug("connect_fail: %s", reason)
self.login_deferred.callback(False)
def __on_login(self, result, username):
self.username = username
# We need to tell the daemon what events we're interested in receiving
if self.__factory.event_handlers:
self.call("daemon.set_event_interest", self.__factory.event_handlers.keys())
self.login_deferred.callback(result)
self.generate_event("connected")
def __on_login_fail(self, result):
self.login_deferred.callback(False)
def set_disconnect_callback(self, cb):
"""
Set a function to be called when the connection to the daemon is lost
for any reason.
"""
self.disconnect_callback = cb
class DaemonClassicProxy(DaemonProxy):
def __init__(self):
import deluge.core.daemon
@ -399,8 +395,6 @@ class DaemonClassicProxy(DaemonProxy):
self.port = 58846
self.user = "localclient"
def disconnect(self):
self.__daemon = None
@ -451,12 +445,11 @@ class Client(object):
"""
__event_handlers = {
"connected": [],
"disconnected": []
}
def __init__(self):
self._daemon_proxy = None
self.disconnect_callback = None
def connect(self, host="127.0.0.1", port=58846, username="", password=""):
"""
@ -470,9 +463,8 @@ class Client(object):
:returns: a Deferred object that will be called once the connection
has been established or fails
"""
self._daemon_proxy = DaemonSSLProxy()
self._daemon_proxy.register_event_handler("connected", self._on_connect)
self._daemon_proxy.register_event_handler("disconnected", self._on_disconnect)
self._daemon_proxy = DaemonSSLProxy(self.__event_handlers)
self._daemon_proxy.set_disconnect_callback(self.__on_disconnect)
d = self._daemon_proxy.connect(host, port, username, password)
return d
@ -544,24 +536,30 @@ class Client(object):
def register_event_handler(self, event, handler):
"""
Registers a handler that will be called when an event happens.
Registers a handler that will be called when an event is received from the daemon.
:params event: str, the event to handle
:params handler: func, the handler function, f()
:raises KeyError: if event is not valid
:params handler: func, the handler function, f(args)
"""
if event not in self.__event_handlers:
self.__event_handlers[event] = []
self.__event_handlers[event].append(handler)
# We need to replicate this in the daemon proxy
if self._daemon_proxy:
self._daemon_proxy.register_event_handler(event, handler)
def __generate_event(self, event):
def deregister_event_handler(self, event, handler):
"""
Calls the event handlers for `:param:event`.
Deregisters a event handler.
:param event: str, the name of the event
:param handler: function, the function registered
:param event: str, the event to generate
"""
for handler in self.__event_handlers[event]:
handler()
if event in self.__event_handlers and handler in self.__event_handlers[event]:
self.__event_handlers[event].remove(handler)
if self._daemon_proxy:
self._daemon_proxy.register_event_handler(event, handler)
def force_call(self, block=False):
# no-op for now.. we'll see if we need this in the future
@ -570,11 +568,16 @@ class Client(object):
def __getattr__(self, method):
return DottedObject(self._daemon_proxy, method)
def _on_connect(self):
self.__generate_event("connected")
def set_disconnect_callback(self, cb):
"""
Set a function to be called whenever the client is disconnected from
the daemon for any reason.
"""
self.disconnect_callback = cb
def _on_disconnect(self):
self.__generate_event("disconnected")
def __on_disconnect(self):
if self.disconnect_callback:
self.disconnect_callback()
# This is the object clients will use
client = Client()

View File

@ -363,6 +363,8 @@ class ConnectionManager(component.Component):
if self.gtkui_config["autoconnect"]:
self.gtkui_config["autoconnect_host_id"] = host_id
component.start()
def on_button_connect_clicked(self, widget=None):
model, row = self.hostlist.get_selection().get_selected()
status = model[row][HOSTLIST_COL_STATUS]

View File

@ -32,8 +32,7 @@ class CoreConfig(component.Component):
log.debug("CoreConfig init..")
component.Component.__init__(self, "CoreConfig", ["Signals"])
self.config = {}
component.get("Signals").connect_to_signal("config_value_changed",
self._on_config_value_changed)
client.register_event_handler("ConfigValueChangedEvent", self.on_configvaluechanged_event)
def start(self):
client.core.get_config().addCallback(self._on_get_config)
@ -50,5 +49,5 @@ class CoreConfig(component.Component):
def _on_get_config(self, config):
self.config = config
def _on_config_value_changed(self, key, value):
def on_configvaluechanged_event(self, key, value):
self.config[key] = value

View File

@ -190,10 +190,10 @@ class FilesTab(Tab):
"on_menuitem_expand_all_activate": self._on_menuitem_expand_all_activate
})
# Connect to the 'torrent_file_renamed' signal
component.get("Signals").connect_to_signal("torrent_file_renamed", self._on_torrent_file_renamed_signal)
component.get("Signals").connect_to_signal("torrent_folder_renamed", self._on_torrent_folder_renamed_signal)
component.get("Signals").connect_to_signal("torrent_removed", self._on_torrent_removed_signal)
# Connect to various events from the daemon
client.register_event_handler("TorrentFileRenamedEvent", self._on_torrentfilerenamed_event)
client.register_event_handler("TorrentFolderRenamedEvent", self._on_torrentfolderrenamed_event)
client.register_event_handler("TorrentRemovedEvent", self._on_torrentremoved_event)
# Attempt to load state
self.load_state()
@ -544,7 +544,7 @@ class FilesTab(Tab):
def _on_filename_editing_canceled(self, renderer):
self._editing_index = None
def _on_torrent_file_renamed_signal(self, torrent_id, index, name):
def _on_torrentfilerenamed_event(self, torrent_id, index, name):
log.debug("index: %s name: %s", index, name)
old_name = self.files_list[torrent_id][index]["path"]
self.files_list[torrent_id][index]["path"] = name
@ -690,7 +690,7 @@ class FilesTab(Tab):
self.treestore.remove(itr)
itr = parent
def _on_torrent_folder_renamed_signal(self, torrent_id, old_folder, new_folder):
def _on_torrentfolderrenamed_event(self, torrent_id, old_folder, new_folder):
log.debug("on_torrent_folder_renamed_signal")
log.debug("old_folder: %s new_folder: %s", old_folder, new_folder)
@ -736,7 +736,7 @@ class FilesTab(Tab):
# and if so, we delete it
self.remove_childless_folders(old_folder_iter_parent)
def _on_torrent_removed_signal(self, torrent_id):
def _on_torrentremoved_event(self, torrent_id):
if torrent_id in self.files_list:
del self.files_list[torrent_id]

View File

@ -49,7 +49,6 @@ from preferences import Preferences
from systemtray import SystemTray
from statusbar import StatusBar
from connectionmanager import ConnectionManager
from signals import Signals
from pluginmanager import PluginManager
from ipcinterface import IPCInterface
@ -128,7 +127,6 @@ class GtkUI:
except Exception, e:
log.error("Unable to initialize gettext/locale!")
log.exception(e)
# Setup signals
try:
import gnome.ui
@ -183,11 +181,7 @@ class GtkUI:
self.ipcinterface = IPCInterface(args)
# We make sure that the UI components start once we get a core URI
client.register_event_handler("connected", self._on_new_core)
client.register_event_handler("disconnected", self._on_no_core)
# Start the signal receiver
self.signal_receiver = Signals()
client.set_disconnect_callback(self.__on_disconnect)
# Initialize various components of the gtkui
self.mainwindow = MainWindow()
@ -250,11 +244,11 @@ class GtkUI:
if self.config["classic_mode"]:
client.start_classic_mode()
self._on_new_core()
return
def _on_new_core(self):
component.start()
def _on_no_core(self):
def __on_disconnect(self):
"""
Called when disconnected from the daemon. We basically just stop all
the components here.
"""
component.stop()

View File

@ -47,10 +47,11 @@ class IPCInterface(component.Component):
if win32api.GetLastError() == winerror.ERROR_ALREADY_EXISTS:
# We already have a running session, send a XMLRPC to the existing session
config = ConfigManager("gtkui.conf")
uri = "http://localhost:" + str(config["signal_port"])
import deluge.xmlrpclib as xmlrpclib
rpc = xmlrpclib.ServerProxy(uri, allow_none=True)
rpc.emit_signal("args_from_external", args)
# XXX: Need new IPC method
# uri = "http://localhost:" + str(config["signal_port"])
# import deluge.xmlrpclib as xmlrpclib
# rpc = xmlrpclib.ServerProxy(uri, allow_none=True)
# rpc.emit_signal("args_from_external", args)
sys.exit(0)
else:
process_args(args)

View File

@ -80,6 +80,9 @@ class MainWindow(component.Component):
log.debug("Showing window")
self.show()
client.register_event_handler("NewVersionAvailableEvent", self.on_newversionavailable_event)
client.register_event_handler("TorrentFinishedEvent", self.on_torrentfinished_event)
def show(self):
try:
component.resume("TorrentView")
@ -215,3 +218,12 @@ class MainWindow(component.Component):
self.update()
else:
self.window.set_title("Deluge")
def on_newversionavailable_event(self, new_version):
if self.config["show_new_releases"]:
from deluge.ui.gtkui.new_release_dialog import NewReleaseDialog
NewReleaseDialog().show(new_version)
def on_torrentfinished_event(self, torrent_id):
from deluge.ui.gtkui.notification import Notification
Notification().notify(torrent_id)

View File

@ -164,6 +164,11 @@ class MenuBar(component.Component):
self.window.main_glade.get_widget("separatormenuitem").hide()
self.window.main_glade.get_widget("menuitem_connectionmanager").hide()
client.register_event_handler("TorrentStateChangedEvent", self.on_torrentstatechanged_event)
client.register_event_handler("TorrentResumedEvent", self.on_torrentresumed_event)
client.register_event_handler("SessionPausedEvent", self.on_sessionpaused_event)
client.register_event_handler("SessionResumedEvent", self.on_sessionresumed_event)
def start(self):
for widget in self.change_sensitivity:
self.window.main_glade.get_widget(widget).set_sensitive(True)
@ -217,6 +222,18 @@ class MenuBar(component.Component):
return sep
### Callbacks ###
def on_torrentstatechanged_event(self, torrent_id, state):
if state == "Paused":
self.update_menu()
def on_torrentresumed_event(self, torrent_id):
self.update_menu()
def on_sessionpaused_event(self):
self.update_menu()
def on_sessionresumed_event(self):
self.update_menu()
## File Menu ##
def on_menuitem_addtorrent_activate(self, data=None):

View File

@ -32,7 +32,7 @@ from deluge.log import LOG as log
class PluginManager(deluge.pluginmanagerbase.PluginManagerBase,
component.Component):
def __init__(self):
component.Component.__init__(self, "PluginManager", depend=["Signals"])
component.Component.__init__(self, "PluginManager")
self.config = ConfigManager("gtkui.conf")
deluge.pluginmanagerbase.PluginManagerBase.__init__(
self, "gtkui.conf", "deluge.plugin.gtkui")

View File

@ -1,158 +0,0 @@
#
# signals.py
#
# Copyright (C) 2007, 2008 Andrew Resch <andrewresch@gmail.com>
#
# Deluge is free software.
#
# You may redistribute it and/or modify it under the terms of the
# GNU General Public License, as published by the Free Software
# Foundation; either version 3 of the License, or (at your option)
# any later version.
#
# deluge is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with deluge. If not, write to:
# The Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor
# Boston, MA 02110-1301, USA.
#
import gtk
import deluge.component as component
from deluge.ui.client import client
from deluge.ui.signalreceiver import SignalReceiver
from deluge.configmanager import ConfigManager
from deluge.log import LOG as log
class Signals(component.Component):
def __init__(self):
component.Component.__init__(self, "Signals")
# self.receiver = SignalReceiver()
self.config = ConfigManager("gtkui.conf")
#self.config["signal_port"] = self.receiver.get_port()
self.config.save()
def start(self):
return
self.receiver.set_remote(not client.is_localhost())
self.receiver.run()
self.receiver.connect_to_signal("torrent_added",
self.torrent_added_signal)
self.receiver.connect_to_signal("torrent_removed",
self.torrent_removed_signal)
self.receiver.connect_to_signal("torrent_paused", self.torrent_paused)
self.receiver.connect_to_signal("torrent_resumed",
self.torrent_resumed)
self.receiver.connect_to_signal("torrent_all_paused",
self.torrent_all_paused)
self.receiver.connect_to_signal("torrent_all_resumed",
self.torrent_all_resumed)
self.receiver.connect_to_signal("config_value_changed",
self.config_value_changed)
self.receiver.connect_to_signal("torrent_queue_changed",
self.torrent_queue_changed)
self.receiver.connect_to_signal("torrent_resume_at_stop_ratio",
self.torrent_resume_at_stop_ratio)
self.receiver.connect_to_signal("new_version_available",
self.new_version_available)
self.receiver.connect_to_signal("args_from_external",
self.args_from_external)
self.receiver.connect_to_signal("torrent_state_changed",
self.torrent_state_changed)
self.receiver.connect_to_signal("torrent_finished",
self.torrent_finished)
def stop(self):
return
try:
self.receiver.shutdown()
except:
pass
def connect_to_signal(self, signal, callback):
"""Connects a callback to a signal"""
#self.receiver.connect_to_signal(signal, callback)
pass
def torrent_finished(self, torrent_id):
log.debug("torrent_finished signal received..")
log.debug("torrent id: %s", torrent_id)
from deluge.ui.gtkui.notification import Notification
Notification().notify(torrent_id)
def torrent_added_signal(self, torrent_id):
log.debug("torrent_added signal received..")
log.debug("torrent id: %s", torrent_id)
# Add the torrent to the treeview
component.get("TorrentView").add_row(torrent_id)
component.get("TorrentView").mark_dirty(torrent_id)
def torrent_removed_signal(self, torrent_id):
log.debug("torrent_remove signal received..")
log.debug("torrent id: %s", torrent_id)
# Remove the torrent from the treeview
component.get("TorrentView").remove_row(torrent_id)
def torrent_paused(self, torrent_id):
log.debug("torrent_paused signal received..")
component.get("TorrentView").mark_dirty(torrent_id)
component.get("TorrentView").update()
component.get("ToolBar").update_buttons()
component.get("MenuBar").update_menu()
def torrent_resumed(self, torrent_id):
log.debug("torrent_resumed signal received..")
component.get("TorrentView").mark_dirty(torrent_id)
component.get("TorrentView").update()
component.get("ToolBar").update_buttons()
component.get("MenuBar").update_menu()
def torrent_all_paused(self):
log.debug("torrent_all_paused signal received..")
component.get("TorrentView").mark_dirty()
component.get("TorrentView").update()
component.get("ToolBar").update_buttons()
component.get("MenuBar").update_menu()
def torrent_all_resumed(self):
log.debug("torrent_all_resumed signal received..")
component.get("TorrentView").mark_dirty()
component.get("TorrentView").update()
component.get("ToolBar").update_buttons()
component.get("MenuBar").update_menu()
def config_value_changed(self, key, value):
log.debug("config_value_changed signal received..")
component.get("StatusBar").config_value_changed(key, value)
def torrent_queue_changed(self):
log.debug("torrent_queue_changed signal received..")
component.get("TorrentView").mark_dirty()
component.get("TorrentView").update()
def torrent_resume_at_stop_ratio(self):
log.debug("torrent_resume_at_stop_ratio")
component.get("StatusBar").display_warning(
text=_("Torrent is past stop ratio."))
def new_version_available(self, value):
log.debug("new_version_available: %s", value)
if self.config["show_new_releases"]:
from deluge.ui.gtkui.new_release_dialog import NewReleaseDialog
NewReleaseDialog().show(value)
def args_from_external(self, value):
log.debug("args_from_external: %s", value)
import ipcinterface
ipcinterface.process_args(value)
def torrent_state_changed(self, value):
log.debug("torrent_state_changed: %s", value)

View File

@ -134,6 +134,8 @@ class StatusBar(component.Component):
# Hide if necessary
self.visible(self.config["show_statusbar"])
client.register_event_handler("ConfigValueChangedEvent", self.on_configvaluechanged_event)
def start(self):
# Add in images and labels
self.remove_item(self.not_connected_item)
@ -262,7 +264,7 @@ class StatusBar(component.Component):
# Only request health status while False
client.core.get_health().addCallback(self._on_get_health)
def config_value_changed(self, key, value):
def on_configvaluechanged_event(self, key, value):
"""This is called when we received a config_value_changed signal from
the core."""

View File

@ -112,7 +112,7 @@ class SystemTray(component.Component):
self.tray_glade.get_widget("menuitem_quitdaemon").hide()
self.tray_glade.get_widget("separatormenuitem4").hide()
component.get("Signals").connect_to_signal("config_value_changed", self.config_value_changed)
client.register_event_handler("ConfigValueChangedEvent", self.config_value_changed)
if not client.connected():
# Hide menu widgets because we're not connected to a host.
for widget in self.hide_widget_list:

View File

@ -29,6 +29,7 @@ import gtk, gtk.glade
import gobject
import deluge.component as component
from deluge.ui.client import client
from deluge.log import LOG as log
from deluge.common import TORRENT_STATE
from deluge.configmanager import ConfigManager
@ -67,6 +68,10 @@ class ToolBar(component.Component):
# Hide if necessary
self.visible(self.config["show_toolbar"])
client.register_event_handler("TorrentStateChangedEvent", self.on_torrentstatechanged_event)
client.register_event_handler("TorrentResumedEvent", self.on_torrentresumed_event)
client.register_event_handler("SessionPausedEvent", self.on_sessionpaused_event)
client.register_event_handler("SessionResumedEvent", self.on_sessionresumed_event)
def start(self):
for widget in self.change_sensitivity:
@ -128,6 +133,19 @@ class ToolBar(component.Component):
self.toolbar.remove(widget)
### Callbacks ###
def on_torrentstatechanged_event(self, torrent_id, state):
if state == "Paused":
self.update_buttons()
def on_torrentresumed_event(self, torrent_id):
self.update_buttons()
def on_sessionpaused_event(self):
self.update_buttons()
def on_sessionresumed_event(self):
self.update_buttons()
def on_toolbutton_add_clicked(self, data):
log.debug("on_toolbutton_add_clicked")
# Use the menubar's callback

View File

@ -193,6 +193,13 @@ class TorrentView(listview.ListView, component.Component):
self.treeview.connect("drag-drop", self.on_drag_drop)
client.register_event_handler("TorrentStateChangedEvent", self.on_torrentstatechanged_event)
client.register_event_handler("TorrentAddedEvent", self.on_torrentadded_event)
client.register_event_handler("TorrentRemovedEvent", self.on_torrentremoved_event)
client.register_event_handler("SessionPausedEvent", self.on_sessionpaused_event)
client.register_event_handler("SessionResumedEvent", self.on_sessionresumed_event)
client.register_event_handler("TorrentQueueChangedEvent", self.on_torrentqueuechanged_event)
def start(self):
"""Start the torrentview"""
# We need to get the core session state to know which torrents are in
@ -424,3 +431,32 @@ class TorrentView(listview.ListView, component.Component):
def on_drag_drop(self, widget, drag_context, x, y, timestamp):
widget.stop_emission("drag-drop")
def on_torrentadded_event(self, torrent_id):
self.add_row(torrent_id)
self.mark_dirty(torrent_id)
def on_torrentremoved_event(self, torrent_id):
self.remove_row(torrent_id)
def on_torrentstatechanged_event(self, torrent_id, state):
# Update the torrents state
for row in self.liststore:
if not torrent_id == row[self.columns["torrent_id"].column_indices[0]]:
continue
row[self.get_column_index("Progress")[1]] = state
self.mark_dirty(torrent_id)
def on_sessionpaused_event(self):
self.mark_dirty()
self.update()
def on_sessionresumed_event(self):
self.mark_dirty()
self.update()
def on_torrentqueuechanged_event(self):
self.mark_dirty()
self.update()

View File

@ -1,151 +0,0 @@
#
# signalreceiver.py
#
# Copyright (C) 2007, 2008 Andrew Resch <andrewresch@gmail.com>
#
# Deluge is free software.
#
# You may redistribute it and/or modify it under the terms of the
# GNU General Public License, as published by the Free Software
# Foundation; either version 3 of the License, or (at your option)
# any later version.
#
# deluge is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with deluge. If not, write to:
# The Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor
# Boston, MA 02110-1301, USA.
#
import sys
import socket
import random
import gobject
from deluge.ui.client import client
import deluge.SimpleXMLRPCServer as SimpleXMLRPCServer
from SocketServer import ThreadingMixIn
import deluge.xmlrpclib as xmlrpclib
import threading
import socket
from deluge.log import LOG as log
class SignalReceiver(ThreadingMixIn, SimpleXMLRPCServer.SimpleXMLRPCServer):
def __init__(self):
log.debug("SignalReceiver init..")
# Set to true so that the receiver thread will exit
self.signals = {}
self.emitted_signals = []
self.remote = False
self.start_server()
def start_server(self, port=None):
# Setup the xmlrpc server
host = "127.0.0.1"
if self.remote:
host = ""
server_ready = False
while not server_ready:
if port:
_port = port
else:
_port = random.randint(40000, 65535)
try:
SimpleXMLRPCServer.SimpleXMLRPCServer.__init__(
self, (host, _port), logRequests=False, allow_none=True)
except socket.error, e:
log.debug("Trying again with another port: %s", e)
except:
log.error("Could not start SignalReceiver XMLRPC server: %s", e)
sys.exit(0)
else:
self.port = _port
server_ready = True
# Register the emit_signal function
self.register_function(self.emit_signal)
def shutdown(self):
"""Shutdowns receiver thread"""
log.debug("Shutting down signalreceiver")
self._shutdown = True
# De-register with the daemon so it doesn't try to send us more signals
try:
client.deregister_client()
client.force_call()
except Exception, e:
log.debug("Unable to deregister client from server: %s", e)
self.socket.shutdown(socket.SHUT_RDWR)
log.debug("Joining listening thread..")
self.listening_thread.join(1.0)
return
def set_remote(self, remote):
self.remote = remote
self.start_server(self.port)
def run(self):
"""This gets called when we start the thread"""
# Register the signal receiver with the core
self._shutdown = False
client.register_client(str(self.port))
self.listening_thread = threading.Thread(target=self.handle_thread)
gobject.timeout_add(50, self.handle_signals)
try:
self.listening_thread.start()
except Exception, e:
log.debug("Thread: %s", e)
def handle_thread(self):
try:
while not self._shutdown:
self.handle_request()
self._shutdown = False
except Exception, e:
log.debug("handle_thread: %s", e)
def get_port(self):
"""Get the port that the SignalReceiver is listening on"""
return self.port
def emit_signal(self, signal, *data):
"""Exported method used by the core to emit a signal to the client"""
self.emitted_signals.append((signal, data))
return
def handle_signals(self):
for signal, data in self.emitted_signals:
try:
for callback in self.signals[signal]:
gobject.idle_add(callback, *data)
except Exception, e:
log.warning("Unable to call callback for signal %s: %s", signal, e)
self.emitted_signals = []
return True
def connect_to_signal(self, signal, callback):
"""Connect to a signal"""
try:
if callback not in self.signals[signal]:
self.signals[signal].append(callback)
except KeyError:
self.signals[signal] = [callback]