initial checkin of new console ui

pretty alpha code, but it works and gives an idea of the direction the ui might go
This commit is contained in:
Nick 2011-01-26 22:18:18 +01:00
parent b30499c6ac
commit 2f6283ea39
8 changed files with 1405 additions and 3 deletions

View File

@ -61,7 +61,12 @@ schemes = {
"info": ("white", "black", "bold"),
"error": ("red", "black", "bold"),
"success": ("green", "black", "bold"),
"event": ("magenta", "black", "bold")
"event": ("magenta", "black", "bold"),
"selected": ("black", "white", "bold"),
"marked": ("white","blue","bold"),
"selectedmarked": ("blue","white","bold"),
"header": ("green","black","bold"),
"filterstatus": ("green", "blue", "bold")
}
# Colors for various torrent states

View File

@ -213,7 +213,8 @@ class ConsoleUI(component.Component):
# We want to do an interactive session, so start up the curses screen and
# pass it the function that handles commands
colors.init_colors()
self.screen = screen.Screen(stdscr, self.do_command, self.tab_completer, self.encoding)
from modes.alltorrents import AllTorrents
self.screen = AllTorrents(stdscr, self.coreconfig, self.encoding)
self.statusbars = StatusBars()
self.eventlog = EventLog()
@ -271,7 +272,8 @@ class ConsoleUI(component.Component):
"""
if self.interactive:
self.screen.add_line(line, not self.batch_write)
#self.screen.add_line(line, not self.batch_write)
pass
else:
print(colors.strip_colors(line))

View File

View File

@ -0,0 +1,71 @@
#
# add_util.py
#
# Copyright (C) 2011 Nick Lanham <nick@afternight.org>
#
# Modified function from commands/add.py:
# Copyright (C) 2008-2009 Ido Abramovich <ido.deluge@gmail.com>
# Copyright (C) 2009 Andrew Resch <andrewresch@gmail.com>
#
# Deluge is free software.
#
# You may redistribute it and/or modify it under the terms of the
# GNU General Public License, as published by the Free Software
# Foundation; either version 3 of the License, or (at your option)
# any later version.
#
# deluge is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with deluge. If not, write to:
# The Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor
# Boston, MA 02110-1301, USA.
#
# In addition, as a special exception, the copyright holders give
# permission to link the code of portions of this program with the OpenSSL
# library.
# You must obey the GNU General Public License in all respects for all of
# the code used other than OpenSSL. If you modify file(s) with this
# exception, you may extend this exception to your version of the file(s),
# but you are not obligated to do so. If you do not wish to do so, delete
# this exception statement from your version. If you delete this exception
# statement from all source files in the program, then also delete it here.
#
#
from twisted.internet import defer
from deluge.ui.client import client
import deluge.component as component
from optparse import make_option
import os
import base64
def add_torrent(t_file, options, success_cb, fail_cb):
t_options = {}
if options["path"]:
t_options["download_location"] = os.path.expanduser(options["path"])
# Keep a list of deferreds to make a DeferredList
if not os.path.exists(t_file):
fail_cb("{!error!}%s doesn't exist!" % t_file)
return
if not os.path.isfile(t_file):
fail_cb("{!error!}%s is a directory!" % t_file)
return
filename = os.path.split(t_file)[-1]
filedump = base64.encodestring(open(t_file).read())
def on_success(result):
success_cb("{!success!}Torrent added!")
def on_fail(result):
fail_cb("{!error!}Torrent was not added! %s" % result)
client.core.add_torrent_file(filename, filedump, t_options).addCallback(on_success).addErrback(on_fail)

View File

@ -0,0 +1,602 @@
# -*- coding: utf-8 -*-
#
# alltorrens.py
#
# Copyright (C) 2011 Nick Lanham <nick@afternight.org>
#
# Deluge is free software.
#
# You may redistribute it and/or modify it under the terms of the
# GNU General Public License, as published by the Free Software
# Foundation; either version 3 of the License, or (at your option)
# any later version.
#
# deluge is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with deluge. If not, write to:
# The Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor
# Boston, MA 02110-1301, USA.
#
# In addition, as a special exception, the copyright holders give
# permission to link the code of portions of this program with the OpenSSL
# library.
# You must obey the GNU General Public License in all respects for all of
# the code used other than OpenSSL. If you modify file(s) with this
# exception, you may extend this exception to your version of the file(s),
# but you are not obligated to do so. If you do not wish to do so, delete
# this exception statement from your version. If you delete this exception
# statement from all source files in the program, then also delete it here.
#
#
import deluge.component as component
from basemode import BaseMode
import deluge.common
from deluge.ui.client import client
from collections import deque
from deluge.ui.sessionproxy import SessionProxy
from popup import Popup,SelectablePopup,MessagePopup
from add_util import add_torrent
from input_popup import InputPopup
try:
import curses
except ImportError:
pass
import logging
log = logging.getLogger(__name__)
# Big help string that gets displayed when the user hits 'h'
HELP_STR = \
"""This screen shows an overview of the current torrents Deluge is managing.
The currently selected torrent is indicated by having a white background.
You can change the selected torrent using the up/down arrows or the
PgUp/Pg keys.
Operations can be performed on multiple torrents by marking them and
then hitting Enter. See below for the keys used to mark torrents.
You can scroll a popup window that doesn't fit its content (like
this one) using the up/down arrows.
All popup windows can be closed/canceled by hitting the Esc key
(you might need to wait a second for an Esc to register)
The actions you can perform and the keys to perform them are as follows:
'a' - Add a torrent
'h' - Show this help
'f' - Show only torrents in a certain state
(Will open a popup where you can select the state you want to see)
'i' - Show more detailed information about the current selected torrent
'Q' - quit
'm' - Mark a torrent
'M' - Mark all torrents between currently selected torrent
and last marked torrent
'c' - Un-mark all torrents
Enter - Show torrent actions popup. Here you can do things like
pause/resume, remove, recheck and so one. These actions
apply to all currently marked torrents. The currently
selected torrent is automatically marked when you press enter.
'q'/Esc - Close a popup
"""
HELP_LINES = HELP_STR.split('\n')
class ACTION:
PAUSE=0
RESUME=1
REANNOUNCE=2
EDIT_TRACKERS=3
RECHECK=4
REMOVE=5
class FILTER:
ALL=0
ACTIVE=1
DOWNLOADING=2
SEEDING=3
PAUSED=4
CHECKING=5
ERROR=6
QUEUED=7
class StateUpdater(component.Component):
def __init__(self, cb, sf,tcb):
component.Component.__init__(self, "AllTorrentsStateUpdater", 1, depend=["SessionProxy"])
self._status_cb = cb
self._status_fields = sf
self.status_dict = {}
self._torrent_cb = tcb
self._torrent_to_update = None
def set_torrent_to_update(self, tid, keys):
self._torrent_to_update = tid
self._torrent_keys = keys
def start(self):
component.get("SessionProxy").get_torrents_status(self.status_dict, self._status_fields).addCallback(self._on_torrents_status,False)
def update(self):
component.get("SessionProxy").get_torrents_status(self.status_dict, self._status_fields).addCallback(self._on_torrents_status,True)
if self._torrent_to_update:
component.get("SessionProxy").get_torrent_status(self._torrent_to_update, self._torrent_keys).addCallback(self._torrent_cb)
def _on_torrents_status(self, state, refresh):
self._status_cb(state,refresh)
class AllTorrents(BaseMode, component.Component):
def __init__(self, stdscr, coreconfig, encoding=None):
self.curstate = None
self.cursel = 1
self.curoff = 1
self.column_string = ""
self.popup = None
self.messages = deque()
self.marked = []
self.last_mark = -1
self._sorted_ids = None
self._go_top = False
self._curr_filter = None
self.coreconfig = coreconfig
BaseMode.__init__(self, stdscr, encoding)
curses.curs_set(0)
self.stdscr.notimeout(0)
self.sessionproxy = SessionProxy()
self._status_fields = ["queue","name","total_wanted","state","progress","num_seeds","total_seeds",
"num_peers","total_peers","download_payload_rate", "upload_payload_rate"]
self.updater = StateUpdater(self.set_state,self._status_fields,self._on_torrent_status)
self.column_names = ["#", "Name","Size","State","Progress","Seeders","Peers","Down Speed","Up Speed"]
self._update_columns()
self._info_fields = [
("Name",None,("name",)),
("State", None, ("state",)),
("Down Speed", self._format_speed, ("download_payload_rate",)),
("Up Speed", self._format_speed, ("upload_payload_rate",)),
("Progress", self._format_progress, ("progress",)),
("ETA", deluge.common.ftime, ("eta",)),
("Path", None, ("save_path",)),
("Downloaded",deluge.common.fsize,("all_time_download",)),
("Uploaded", deluge.common.fsize,("total_uploaded",)),
("Share Ratio", lambda x:x < 0 and "" or "%.3f"%x, ("ratio",)),
("Seeders",self._format_seeds_peers,("num_seeds","total_seeds")),
("Peers",self._format_seeds_peers,("num_peers","total_peers")),
("Active Time",deluge.common.ftime,("active_time",)),
("Seeding Time",deluge.common.ftime,("seeding_time",)),
("Date Added",deluge.common.fdate,("time_added",)),
("Availability", lambda x:x < 0 and "" or "%.3f"%x, ("distributed_copies",)),
("Pieces", self._format_pieces, ("num_pieces","piece_length")),
]
self._status_keys = ["name","state","download_payload_rate","upload_payload_rate",
"progress","eta","all_time_download","total_uploaded", "ratio",
"num_seeds","total_seeds","num_peers","total_peers", "active_time",
"seeding_time","time_added","distributed_copies", "num_pieces",
"piece_length","save_path"]
def _update_columns(self):
self.column_widths = [5,-1,15,13,10,10,10,15,15]
req = sum(filter(lambda x:x >= 0,self.column_widths))
if (req > self.cols): # can't satisfy requests, just spread out evenly
cw = int(self.cols/len(self.column_names))
for i in range(0,len(self.column_widths)):
self.column_widths[i] = cw
else:
rem = self.cols - req
var_cols = len(filter(lambda x: x < 0,self.column_widths))
vw = int(rem/var_cols)
for i in range(0, len(self.column_widths)):
if (self.column_widths[i] < 0):
self.column_widths[i] = vw
self.column_string = "{!header!}%s"%("".join(["%s%s"%(self.column_names[i]," "*(self.column_widths[i]-len(self.column_names[i]))) for i in range(0,len(self.column_names))]))
def _trim_string(self, string, w):
return "%s... "%(string[0:w-4])
def _format_column(self, col, lim):
size = len(col)
if (size >= lim - 1):
return self._trim_string(col,lim)
else:
return "%s%s"%(col," "*(lim-size))
def _format_row(self, row):
return "".join([self._format_column(row[i],self.column_widths[i]) for i in range(0,len(row))])
def set_state(self, state, refresh):
self.curstate = state
self.numtorrents = len(state)
if refresh:
self.refresh()
def _scroll_up(self, by):
self.cursel = max(self.cursel - by,1)
if ((self.cursel - 1) < self.curoff):
self.curoff = max(self.cursel - 1,1)
def _scroll_down(self, by):
self.cursel = min(self.cursel + by,self.numtorrents)
if ((self.curoff + self.rows - 5) < self.cursel):
self.curoff = self.cursel - self.rows + 5
def _current_torrent_id(self):
if self._sorted_ids:
return self._sorted_ids[self.cursel-1]
else:
return None
def _selected_torrent_ids(self):
ret = []
for i in self.marked:
ret.append(self._sorted_ids[i-1])
return ret
def _on_torrent_status(self, state):
if (self.popup):
self.popup.clear()
name = state["name"]
off = int((self.cols/4)-(len(name)/2))
self.popup.set_title(name)
for i,f in enumerate(self._info_fields):
if f[1] != None:
args = []
try:
for key in f[2]:
args.append(state[key])
except:
log.debug("Could not get info field: %s",e)
continue
info = f[1](*args)
else:
info = state[f[2][0]]
self.popup.add_line("{!info!}%s: {!input!}%s"%(f[0],info))
self.refresh()
else:
self.updater.set_torrent_to_update(None,None)
def on_resize(self, *args):
BaseMode.on_resize_norefresh(self, *args)
self._update_columns()
if self.popup:
self.popup.handle_resize()
self.refresh()
def _queue_sort(self, v1, v2):
if v1 == v2:
return 0
if v2 < 0:
return -1
if v1 < 0:
return 1
if v1 > v2:
return 1
if v2 > v1:
return -1
def _sort_torrents(self, state):
"sorts by queue #"
return sorted(state,cmp=self._queue_sort,key=lambda s:state.get(s)["queue"])
def _format_speed(self, speed):
if (speed > 0):
return deluge.common.fspeed(speed)
else:
return "-"
def _format_queue(self, qnum):
if (qnum >= 0):
return "%d"%(qnum+1)
else:
return ""
def _format_seeds_peers(self, num, total):
return "%d (%d)"%(num,total)
def _format_pieces(self, num, size):
return "%d (%s)"%(num,deluge.common.fsize(size))
def _format_progress(self, perc):
return "%.2f%%"%perc
def _action_error(self, error):
rerr = error.value
self.report_message("An Error Occurred","%s got error %s: %s"%(rerr.method,rerr.exception_type,rerr.exception_msg))
self.refresh()
def _torrent_action(self, idx, data):
ids = self._selected_torrent_ids()
if ids:
if data==ACTION.PAUSE:
log.debug("Pausing torrents: %s",ids)
client.core.pause_torrent(ids).addErrback(self._action_error)
elif data==ACTION.RESUME:
log.debug("Resuming torrents: %s", ids)
client.core.resume_torrent(ids).addErrback(self._action_error)
elif data==ACTION.REMOVE:
log.error("Can't remove just yet")
elif data==ACTION.RECHECK:
log.debug("Rechecking torrents: %s", ids)
client.core.force_recheck(ids).addErrback(self._action_error)
elif data==ACTION.REANNOUNCE:
log.debug("Reannouncing torrents: %s",ids)
client.core.force_reannounce(ids).addErrback(self._action_error)
if len(ids) == 1:
self.marked = []
self.last_mark = -1
def _show_torrent_actions_popup(self):
#cid = self._current_torrent_id()
if len(self.marked):
self.popup = SelectablePopup(self,"Torrent Actions",self._torrent_action)
self.popup.add_line("Pause",data=ACTION.PAUSE)
self.popup.add_line("Resume",data=ACTION.RESUME)
self.popup.add_divider()
self.popup.add_line("Update Tracker",data=ACTION.REANNOUNCE)
self.popup.add_divider()
self.popup.add_line("Remove Torrent",data=ACTION.REMOVE)
self.popup.add_line("Force Recheck",data=ACTION.RECHECK)
def _torrent_filter(self, idx, data):
if data==FILTER.ALL:
self.updater.status_dict = {}
self._curr_filter = None
elif data==FILTER.ACTIVE:
self.updater.status_dict = {"state":"Active"}
self._curr_filter = "Active"
elif data==FILTER.DOWNLOADING:
self.updater.status_dict = {"state":"Downloading"}
self._curr_filter = "Downloading"
elif data==FILTER.SEEDING:
self.updater.status_dict = {"state":"Seeding"}
self._curr_filter = "Seeding"
elif data==FILTER.PAUSED:
self.updater.status_dict = {"state":"Paused"}
self._curr_filter = "Paused"
elif data==FILTER.CHECKING:
self.updater.status_dict = {"state":"Checking"}
self._curr_filter = "Checking"
elif data==FILTER.ERROR:
self.updater.status_dict = {"state":"Error"}
self._curr_filter = "Error"
elif data==FILTER.QUEUED:
self.updater.status_dict = {"state":"Queued"}
self._curr_filter = "Queued"
self._go_top = True
def _show_torrent_filter_popup(self):
self.popup = SelectablePopup(self,"Filter Torrents",self._torrent_filter)
self.popup.add_line("All",data=FILTER.ALL)
self.popup.add_line("Active",data=FILTER.ACTIVE)
self.popup.add_line("Downloading",data=FILTER.DOWNLOADING,foreground="green")
self.popup.add_line("Seeding",data=FILTER.SEEDING,foreground="blue")
self.popup.add_line("Paused",data=FILTER.PAUSED)
self.popup.add_line("Error",data=FILTER.ERROR,foreground="red")
self.popup.add_line("Checking",data=FILTER.CHECKING,foreground="cyan")
self.popup.add_line("Queued",data=FILTER.QUEUED,foreground="yellow")
def _do_add(self, result):
log.debug("Doing adding %s (dl to %s)",result["file"],result["path"])
def suc_cb(msg):
self.report_message("Torrent Added",msg)
def fail_cb(msg):
self.report_message("Failed To Add Torrent",msg)
add_torrent(result["file"],result,suc_cb,fail_cb)
def _show_torrent_add_popup(self):
dl = ""
try:
dl = self.coreconfig["download_location"]
except KeyError:
pass
self.popup = InputPopup(self,"Add Torrent (Esc to cancel)",close_cb=self._do_add)
self.popup.add_text_input("Enter path to torrent file:","file")
self.popup.add_text_input("Enter save path:","path",dl)
def report_message(self,title,message):
self.messages.append((title,message))
def refresh(self):
# Something has requested we scroll to the top of the list
if self._go_top:
self.cursel = 1
self.curoff = 1
self._go_top = False
# show a message popup if there's anything queued
if self.popup == None and self.messages:
title,msg = self.messages.popleft()
self.popup = MessagePopup(self,title,msg)
self.stdscr.clear()
# Update the status bars
if self._curr_filter == None:
self.add_string(0,self.topbar)
else:
self.add_string(0,"%s {!filterstatus!}Current filter: %s"%(self.topbar,self._curr_filter))
self.add_string(1,self.column_string)
self.add_string(self.rows - 1, self.bottombar)
# add all the torrents
if self.curstate == {}:
msg = "No torrents match filter".center(self.cols)
self.add_string(3, "{!info!}%s"%msg)
elif self.curstate != None:
tidx = 1
currow = 2
self._sorted_ids = self._sort_torrents(self.curstate)
for torrent_id in self._sorted_ids:
if (tidx < self.curoff):
tidx += 1
continue
ts = self.curstate[torrent_id]
s = self._format_row([self._format_queue(ts["queue"]),
ts["name"],
"%s"%deluge.common.fsize(ts["total_wanted"]),
ts["state"],
self._format_progress(ts["progress"]),
self._format_seeds_peers(ts["num_seeds"],ts["total_seeds"]),
self._format_seeds_peers(ts["num_peers"],ts["total_peers"]),
self._format_speed(ts["download_payload_rate"]),
self._format_speed(ts["upload_payload_rate"])
])
# default style
fg = "white"
bg = "black"
attr = None
if tidx in self.marked:
bg = "blue"
attr = "bold"
if tidx == self.cursel:
bg = "white"
attr = "bold"
if tidx in self.marked:
fg = "blue"
else:
fg = "black"
if ts["state"] == "Downloading":
fg = "green"
elif ts["state"] == "Seeding":
fg = "blue"
elif ts["state"] == "Error":
fg = "red"
elif ts["state"] == "Queued":
fg = "yellow"
elif ts["state"] == "Checking":
fg = "cyan"
if attr:
colorstr = "{!%s,%s,%s!}"%(fg,bg,attr)
else:
colorstr = "{!%s,%s!}"%(fg,bg)
self.add_string(currow,"%s%s"%(colorstr,s))
tidx += 1
currow += 1
if (currow > (self.rows - 2)):
break
else:
self.add_string(1, "Waiting for torrents from core...")
self.stdscr.redrawwin()
self.stdscr.noutrefresh()
if self.popup:
self.popup.refresh()
curses.doupdate()
def _mark_unmark(self,idx):
if idx in self.marked:
self.marked.remove(idx)
self.last_mark = -1
else:
self.marked.append(idx)
self.last_mark = idx
def _doRead(self):
# Read the character
c = self.stdscr.getch()
if self.popup:
if self.popup.handle_read(c):
self.popup = None
self.refresh()
return
if c > 31 and c < 256:
if chr(c) == 'Q':
from twisted.internet import reactor
if client.connected():
def on_disconnect(result):
reactor.stop()
client.disconnect().addCallback(on_disconnect)
else:
reactor.stop()
return
if self.curstate==None or self.popup:
return
#log.error("pressed key: %d\n",c)
#if c == 27: # handle escape
# log.error("CANCEL")
# Navigate the torrent list
if c == curses.KEY_UP:
self._scroll_up(1)
elif c == curses.KEY_PPAGE:
self._scroll_up(int(self.rows/2))
elif c == curses.KEY_DOWN:
self._scroll_down(1)
elif c == curses.KEY_NPAGE:
self._scroll_down(int(self.rows/2))
# Enter Key
elif c == curses.KEY_ENTER or c == 10:
self.marked.append(self.cursel)
self.last_mark = self.cursel
self._show_torrent_actions_popup()
else:
if c > 31 and c < 256:
if chr(c) == 'i':
cid = self._current_torrent_id()
if cid:
self.popup = Popup(self,"Info",close_cb=lambda:self.updater.set_torrent_to_update(None,None))
self.popup.add_line("Getting torrent info...")
self.updater.set_torrent_to_update(cid,self._status_keys)
elif chr(c) == 'm':
self._mark_unmark(self.cursel)
elif chr(c) == 'M':
if self.last_mark >= 0:
self.marked.extend(range(self.last_mark,self.cursel+1))
else:
self._mark_unmark(self.cursel)
elif chr(c) == 'c':
self.marked = []
self.last_mark = -1
elif chr(c) == 'a':
self._show_torrent_add_popup()
elif chr(c) == 'f':
self._show_torrent_filter_popup()
elif chr(c) == 'h':
self.popup = Popup(self,"Help")
for l in HELP_LINES:
self.popup.add_line(l)
self.refresh()

View File

@ -0,0 +1,224 @@
#
# basemode.py
#
# Copyright (C) 2011 Nick Lanham <nick@afternight.org>
#
# Most code in this file taken from screen.py:
# Copyright (C) 2009 Andrew Resch <andrewresch@gmail.com>
#
# Deluge is free software.
#
# You may redistribute it and/or modify it under the terms of the
# GNU General Public License, as published by the Free Software
# Foundation; either version 3 of the License, or (at your option)
# any later version.
#
# deluge is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with deluge. If not, write to:
# The Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor
# Boston, MA 02110-1301, USA.
#
# In addition, as a special exception, the copyright holders give
# permission to link the code of portions of this program with the OpenSSL
# library.
# You must obey the GNU General Public License in all respects for all of
# the code used other than OpenSSL. If you modify file(s) with this
# exception, you may extend this exception to your version of the file(s),
# but you are not obligated to do so. If you do not wish to do so, delete
# this exception statement from your version. If you delete this exception
# statement from all source files in the program, then also delete it here.
#
#
import sys
import logging
try:
import curses
except ImportError:
pass
import deluge.ui.console.colors as colors
try:
import signal
from fcntl import ioctl
import termios
import struct
except:
pass
from twisted.internet import reactor
log = logging.getLogger(__name__)
class CursesStdIO(object):
"""fake fd to be registered as a reader with the twisted reactor.
Curses classes needing input should extend this"""
def fileno(self):
""" We want to select on FD 0 """
return 0
def doRead(self):
"""called when input is ready"""
pass
def logPrefix(self): return 'CursesClient'
class BaseMode(CursesStdIO):
def __init__(self, stdscr, encoding=None):
"""
A mode that provides a curses screen designed to run as a reader in a twisted reactor.
This mode doesn't do much, just shows status bars and "Base Mode" on the screen
Modes should subclass this and provide overrides for:
_doRead(self) - Handle user input
refresh(self) - draw the mode to the screen
add_string(self, row, string) - add a string of text to be displayed.
see method for detailed info
The init method of a subclass *must* call BaseMode.__init__
Useful fields after calling BaseMode.__init__:
self.stdscr - the curses screen
self.rows - # of rows on the curses screen
self.cols - # of cols on the curses screen
self.topbar - top statusbar
self.bottombar - bottom statusbar
"""
log.debug("BaseMode init!")
self.stdscr = stdscr
# Make the input calls non-blocking
self.stdscr.nodelay(1)
# Strings for the 2 status bars
self.topbar = ""
self.bottombar = ""
# Keep track of the screen size
self.rows, self.cols = self.stdscr.getmaxyx()
try:
signal.signal(signal.SIGWINCH, self.on_resize)
except Exception, e:
log.debug("Unable to catch SIGWINCH signal!")
if not encoding:
self.encoding = sys.getdefaultencoding()
else:
self.encoding = encoding
colors.init_colors()
# Do a refresh right away to draw the screen
self.refresh()
def on_resize_norefresh(self, *args):
log.debug("on_resize_from_signal")
# Get the new rows and cols value
self.rows, self.cols = struct.unpack("hhhh", ioctl(0, termios.TIOCGWINSZ ,"\000"*8))[0:2]
curses.resizeterm(self.rows, self.cols)
def on_resize(self, *args):
self.on_resize_norefresh(args)
self.refresh()
def connectionLost(self, reason):
self.close()
def add_string(self, row, string, scr=None, col = 0, pad=True, trim=True):
"""
Adds a string to the desired `:param:row`.
:param row: int, the row number to write the string
:param string: string, the string of text to add
:param scr: curses.window, optional window to add string to instead of self.stdscr
:param col: int, optional starting column offset
:param pad: bool, optional bool if the string should be padded out to the width of the screen
:param trim: bool, optional bool if the string should be trimmed if it is too wide for the screen
The text can be formatted with color using the following format:
"{!fg, bg, attributes, ...!}"
See: http://docs.python.org/library/curses.html#constants for attributes.
Alternatively, it can use some built-in scheme for coloring.
See colors.py for built-in schemes.
"{!scheme!}"
Examples:
"{!blue, black, bold!}My Text is {!white, black!}cool"
"{!info!}I am some info text!"
"{!error!}Uh oh!"
"""
if scr:
screen = scr
else:
screen = self.stdscr
try:
parsed = colors.parse_color_string(string, self.encoding)
except colors.BadColorString, e:
log.error("Cannot add bad color string %s: %s", string, e)
return
for index, (color, s) in enumerate(parsed):
if index + 1 == len(parsed) and pad:
# This is the last string so lets append some " " to it
s += " " * (self.cols - (col + len(s)) - 1)
if trim:
y,x = screen.getmaxyx()
if (col+len(s)) > x:
s = "%s..."%s[0:x-4-col]
screen.addstr(row, col, s, color)
col += len(s)
def refresh(self):
"""
Refreshes the screen.
Updates the lines based on the`:attr:lines` based on the `:attr:display_lines_offset`
attribute and the status bars.
"""
self.stdscr.clear()
# Update the status bars
self.add_string(0, self.topbar)
self.add_string(self.rows - 1, self.bottombar)
self.add_string(1,"{!info!}Base Mode (or subclass hasn't overridden refresh)")
self.stdscr.redrawwin()
self.stdscr.refresh()
def doRead(self):
"""
Called when there is data to be read, ie, input from the keyboard.
"""
# We wrap this function to catch exceptions and shutdown the mainloop
try:
self._doRead()
except Exception, e:
log.exception(e)
reactor.stop()
def _doRead(self):
# Read the character
c = self.stdscr.getch()
self.stdscr.refresh()
def close(self):
"""
Clean up the curses stuff on exit.
"""
curses.nocbreak()
self.stdscr.keypad(0)
curses.echo()
curses.endwin()

View File

@ -0,0 +1,244 @@
#
# input_popup.py
#
# Copyright (C) 2011 Nick Lanham <nick@afternight.org>
#
# Complete function from commands/add.py:
# Copyright (C) 2008-2009 Ido Abramovich <ido.deluge@gmail.com>
# Copyright (C) 2009 Andrew Resch <andrewresch@gmail.com>
#
# Deluge is free software.
#
# You may redistribute it and/or modify it under the terms of the
# GNU General Public License, as published by the Free Software
# Foundation; either version 3 of the License, or (at your option)
# any later version.
#
# deluge is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with deluge. If not, write to:
# The Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor
# Boston, MA 02110-1301, USA.
#
# In addition, as a special exception, the copyright holders give
# permission to link the code of portions of this program with the OpenSSL
# library.
# You must obey the GNU General Public License in all respects for all of
# the code used other than OpenSSL. If you modify file(s) with this
# exception, you may extend this exception to your version of the file(s),
# but you are not obligated to do so. If you do not wish to do so, delete
# this exception statement from your version. If you delete this exception
# statement from all source files in the program, then also delete it here.
#
#
try:
import curses
except ImportError:
pass
import logging,os.path
from popup import Popup
log = logging.getLogger(__name__)
def complete(line):
line = os.path.abspath(os.path.expanduser(line))
ret = []
if os.path.exists(line):
# This is a correct path, check to see if it's a directory
if os.path.isdir(line):
# Directory, so we need to show contents of directory
#ret.extend(os.listdir(line))
for f in os.listdir(line):
# Skip hidden
if f.startswith("."):
continue
f = os.path.join(line, f)
if os.path.isdir(f):
f += "/"
ret.append(f)
else:
# This is a file, but we could be looking for another file that
# shares a common prefix.
for f in os.listdir(os.path.dirname(line)):
if f.startswith(os.path.split(line)[1]):
ret.append(os.path.join( os.path.dirname(line), f))
else:
# This path does not exist, so lets do a listdir on it's parent
# and find any matches.
ret = []
if os.path.isdir(os.path.dirname(line)):
for f in os.listdir(os.path.dirname(line)):
if f.startswith(os.path.split(line)[1]):
p = os.path.join(os.path.dirname(line), f)
if os.path.isdir(p):
p += "/"
ret.append(p)
return ret
class InputPopup(Popup):
def __init__(self,parent_mode,title,width_req=-1,height_req=-1,close_cb=None):
Popup.__init__(self,parent_mode,title,width_req,height_req,close_cb)
self.input_list = []
self.input_states = {}
self.current_input = 0
self.tab_count = 0
self.opts = None
self.opt_off = 0
curses.curs_set(2)
def add_text_input(self, message, name, value="", complete=True):
"""
Add a text input field to the popup.
:param message: string to display above the input field
:param name: name of the field, for the return callback
:param value: initial value of the field
:param complete: should completion be run when tab is hit and this field is active
"""
self.input_list.append(name)
self.input_states[name] = [message,value,0,complete]
def _refresh_lines(self):
crow = 1
for i,ipt in enumerate(self.input_list):
msg,txt,curs,comp = self.input_states[ipt]
if i == self.current_input:
curs_row = crow+1
curs_col = curs
if self.opts:
self.parent.add_string(crow+2,self.opts[self.opt_off:],self.screen,1,False,True)
self.parent.add_string(crow,msg,self.screen,1,False,True)
self.parent.add_string(crow+1,"{!selected!}%s"%txt.ljust(self.width-2),self.screen,1,False,False)
crow += 3
self.screen.move(curs_row,curs_col+1)
def _get_current_input_value(self):
return self.input_states[self.input_list[self.current_input]][1]
def _set_current_input_value(self, val):
self.input_states[self.input_list[self.current_input]][1] = val
def _get_current_cursor(self):
return self.input_states[self.input_list[self.current_input]][2]
def _move_current_cursor(self, amt):
self.input_states[self.input_list[self.current_input]][2] += amt
def _set_current_cursor(self, pos):
self.input_states[self.input_list[self.current_input]][2] = pos
# most of the cursor,input stuff here taken from ui/console/screen.py
def handle_read(self, c):
if c == curses.KEY_UP:
self.current_input = max(0,self.current_input-1)
elif c == curses.KEY_DOWN:
self.current_input = min(len(self.input_list)-1,self.current_input+1)
elif c == curses.KEY_ENTER or c == 10:
if self._close_cb:
vals = {}
for ipt in self.input_list:
vals[ipt] = self.input_states[ipt][1]
curses.curs_set(0)
self._close_cb(vals)
return True # close the popup
elif c == 9:
# Keep track of tab hit count to know when it's double-hit
self.tab_count += 1
if self.tab_count > 1:
second_hit = True
self.tab_count = 0
else:
second_hit = False
# We only call the tab completer function if we're at the end of
# the input string on the cursor is on a space
cur_input = self._get_current_input_value()
cur_cursor = self._get_current_cursor()
if cur_cursor == len(cur_input) or cur_input[cur_cursor] == " ":
if self.opts:
prev = self.opt_off
self.opt_off += self.width-3
# now find previous double space, best guess at a split point
# in future could keep opts unjoined to get this really right
self.opt_off = self.opts.rfind(" ",0,self.opt_off)+2
if second_hit and self.opt_off == prev: # double tap and we're at the end
self.opt_off = 0
else:
opts = complete(cur_input)
if len(opts) == 1: # only one option, just complete it
self._set_current_input_value(opts[0])
self._set_current_cursor(len(opts[0]))
self.tab_count = 0
elif len(opts) > 1 and second_hit: # display multiple options on second tab hit
self.opts = " ".join(opts)
elif c == 27: # close on esc, no action
return True
# Cursor movement
elif c == curses.KEY_LEFT:
if self._get_current_cursor():
self._move_current_cursor(-1)
elif c == curses.KEY_RIGHT:
if self._get_current_cursor() < len(self._get_current_input_value()):
self._move_current_cursor(1)
elif c == curses.KEY_HOME:
self._set_current_cursor(0)
elif c == curses.KEY_END:
self._set_current_cursor(len(self._get_current_input_value()))
if c != 9:
self.opts = None
self.opt_off = 0
self.tab_count = 0
# Delete a character in the input string based on cursor position
if c == curses.KEY_BACKSPACE or c == 127:
cur_input = self._get_current_input_value()
cur_cursor = self._get_current_cursor()
if cur_input and cur_cursor > 0:
self._set_current_input_value(cur_input[:cur_cursor - 1] + cur_input[cur_cursor:])
self._move_current_cursor(-1)
elif c == curses.KEY_DC:
cur_input = self._get_current_input_value()
cur_cursor = self._get_current_cursor()
if cur_input and cur_cursor < len(cur_input):
self._set_current_input_value(cur_input[:cur_cursor] + cur_input[cur_cursor+1:])
elif c > 31 and c < 256:
cur_input = self._get_current_input_value()
cur_cursor = self._get_current_cursor()
# Emulate getwch
stroke = chr(c)
uchar = ""
while not uchar:
try:
uchar = stroke.decode(self.parent.encoding)
except UnicodeDecodeError:
c = self.parent.stdscr.getch()
stroke += chr(c)
if uchar:
if cur_cursor == len(cur_input):
self._set_current_input_value(cur_input+uchar)
else:
# Insert into string
self._set_current_input_value(cur_input[:cur_cursor] + uchar + cur_input[cur_cursor:])
# Move the cursor forward
self._move_current_cursor(1)
self.refresh()
return False

View File

@ -0,0 +1,254 @@
# -*- coding: utf-8 -*-
#
# popup.py
#
# Copyright (C) 2011 Nick Lanham <nick@afternight.org>
#
# Deluge is free software.
#
# You may redistribute it and/or modify it under the terms of the
# GNU General Public License, as published by the Free Software
# Foundation; either version 3 of the License, or (at your option)
# any later version.
#
# deluge is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with deluge. If not, write to:
# The Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor
# Boston, MA 02110-1301, USA.
#
# In addition, as a special exception, the copyright holders give
# permission to link the code of portions of this program with the OpenSSL
# library.
# You must obey the GNU General Public License in all respects for all of
# the code used other than OpenSSL. If you modify file(s) with this
# exception, you may extend this exception to your version of the file(s),
# but you are not obligated to do so. If you do not wish to do so, delete
# this exception statement from your version. If you delete this exception
# statement from all source files in the program, then also delete it here.
#
#
try:
import curses
import signal
except ImportError:
pass
import logging
log = logging.getLogger(__name__)
class Popup:
def __init__(self,parent_mode,title,width_req=-1,height_req=-1,close_cb=None):
"""
Init a new popup. The default constructor will handle sizing and borders and the like.
NB: The parent mode is responsible for calling refresh on any popups it wants to show.
This should be called as the last thing in the parents refresh method.
The parent *must* also call _doRead on the popup instead of/in addition to
running its own _doRead code if it wants to have the popup handle user input.
:param parent_mode: must be a basemode (or subclass) which the popup will be drawn over
:parem title: string, the title of the popup window
Popups have two methods that must be implemented:
refresh(self) - draw the popup window to screen. this default mode simply draws a bordered window
with the supplied title to the screen
add_string(self, row, string) - add string at row. handles triming/ignoring if the string won't fit in the popup
_doRead(self) - handle user input to the popup.
"""
self.parent = parent_mode
if (height_req <= 0):
height_req = int(self.parent.rows/2)
if (width_req <= 0):
width_req = int(self.parent.cols/2)
by = (self.parent.rows/2)-(height_req/2)
bx = (self.parent.cols/2)-(width_req/2)
self.screen = curses.newwin(height_req,width_req,by,bx)
self.title = title
self._close_cb = close_cb
self.height,self.width = self.screen.getmaxyx()
self._divider = None
self._lineoff = 0
self._lines = []
def _refresh_lines(self):
crow = 1
for row,line in enumerate(self._lines):
if (crow >= self.height-1):
break
if (row < self._lineoff):
continue
self.parent.add_string(crow,line,self.screen,1,False,True)
crow+=1
def handle_resize(self):
log.debug("Resizing popup window (actually, just creating a new one)")
self.screen = curses.newwin((self.parent.rows/2),(self.parent.cols/2),(self.parent.rows/4),(self.parent.cols/4))
self.height,self.width = self.screen.getmaxyx()
def refresh(self):
self.screen.clear()
self.screen.border(0,0,0,0)
toff = max(1,int((self.parent.cols/4)-(len(self.title)/2)))
self.parent.add_string(0,"{!white,black,bold!}%s"%self.title,self.screen,toff,False,True)
self._refresh_lines()
self.screen.redrawwin()
self.screen.noutrefresh()
def clear(self):
self._lines = []
def handle_read(self, c):
if c == curses.KEY_UP:
self._lineoff = max(0,self._lineoff -1)
elif c == curses.KEY_DOWN:
if len(self._lines)-self._lineoff > (self.height-2):
self._lineoff += 1
elif c == curses.KEY_ENTER or c == 10 or c == 27: # close on enter/esc
if self._close_cb:
self._close_cb()
return True # close the popup
if c > 31 and c < 256 and chr(c) == 'q':
if self._close_cb:
self._close_cb()
return True # close the popup
self.refresh()
return False
def set_title(self, title):
self.title = title
def add_line(self, string):
self._lines.append(string)
def add_divider(self):
if not self._divider:
self._divider = "-"*(self.width-2)
self._lines.append(self._divider)
class SelectablePopup(Popup):
"""
A popup which will let the user select from some of the lines that
are added.
"""
def __init__(self,parent_mode,title,selection_callback,*args):
Popup.__init__(self,parent_mode,title)
self._selection_callback = selection_callback
self._selection_args = args
self._selectable_lines = []
self._select_data = []
self._line_foregrounds = []
self._selected = -1
def add_line(self, string, selectable=True, data=None, foreground=None):
Popup.add_line(self,string)
self._line_foregrounds.append(foreground)
if selectable:
self._selectable_lines.append(len(self._lines)-1)
self._select_data.append(data)
if self._selected < 0:
self._selected = (len(self._lines)-1)
def _refresh_lines(self):
crow = 1
for row,line in enumerate(self._lines):
if (crow >= self.height-1):
break
if (row < self._lineoff):
continue
fg = self._line_foregrounds[row]
if row == self._selected:
if fg == None: fg = "black"
colorstr = "{!%s,white,bold!}"%fg
else:
if fg == None: fg = "white"
colorstr = "{!%s,black!}"%fg
self.parent.add_string(crow,"- %s%s"%(colorstr,line),self.screen,1,False,True)
crow+=1
def add_divider(self,color="white"):
if not self._divider:
self._divider = "-"*(self.width-6)+" -"
self._lines.append(self._divider)
self._line_foregrounds.append(color)
def handle_read(self, c):
if c == curses.KEY_UP:
#self._lineoff = max(0,self._lineoff -1)
if (self._selected != self._selectable_lines[0] and
len(self._selectable_lines) > 1):
idx = self._selectable_lines.index(self._selected)
self._selected = self._selectable_lines[idx-1]
elif c == curses.KEY_DOWN:
#if len(self._lines)-self._lineoff > (self.height-2):
# self._lineoff += 1
idx = self._selectable_lines.index(self._selected)
if (idx < len(self._selectable_lines)-1):
self._selected = self._selectable_lines[idx+1]
elif c == 27: # close on esc, no action
return True
elif c == curses.KEY_ENTER or c == 10:
idx = self._selectable_lines.index(self._selected)
self._selection_callback(idx,self._select_data[idx],*self._selection_args)
return True
if c > 31 and c < 256 and chr(c) == 'q':
return True # close the popup
self.refresh()
return False
class MessagePopup(Popup):
"""
Popup that just displays a message
"""
import re
_strip_re = re.compile("\{!.*?!\}")
_min_height = 3
def __init__(self, parent_mode, title, message):
self.message = message
self.width= int(parent_mode.cols/2)
lns = self._split_message(self.message)
height = max(len(lns),self._min_height)
Popup.__init__(self,parent_mode,title,height_req=(height+2))
lft = height - len(lns)
if lft:
for i in range(0,int(lft/2)):
lns.insert(0,"")
self._lines = lns
def _split_message(self,message):
ret = []
wl = (self.width-2)
for i in range(0,len(self.message),wl):
l = self.message[i:i+wl]
lp = (wl-len(self._strip_re.sub('',l)))/2
ret.append("%s%s"%(lp*" ",l))
return ret
def handle_resize(self):
Popup.handle_resize(self)
self.clear()
self._lines = self._split_message(self.message)