WebUi Plugin - rev40

Zach Tibbitts 2007-09-28 01:47:12 +00:00
plugins/WebUi/LICENSE
Version 2, June 1991
Copyright (C) 1989, 1991 Free Software Foundation, Inc.
59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
The licenses for most software are designed to take away your
freedom to share and change it. By contrast, the GNU General Public
License is intended to guarantee your freedom to share and change free
software--to make sure the software is free for all its users. This
General Public License applies to most of the Free Software
Foundation's software and to any other program whose authors commit to
using it. (Some other Free Software Foundation software is covered by
the GNU Library General Public License instead.) You can apply it to
your programs, too.
When we speak of free software, we are referring to freedom, not
price. Our General Public Licenses are designed to make sure that you
have the freedom to distribute copies of free software (and charge for
this service if you wish), that you receive source code or can get it
if you want it, that you can change the software or use pieces of it
in new free programs; and that you know you can do these things.
To protect your rights, we need to make restrictions that forbid
anyone to deny you these rights or to ask you to surrender the rights.
These restrictions translate to certain responsibilities for you if you
distribute copies of the software, or if you modify it.
For example, if you distribute copies of such a program, whether
gratis or for a fee, you must give the recipients all the rights that
you have. You must make sure that they, too, receive or can get the
source code. And you must show them these terms so they know their
We protect your rights with two steps: (1) copyright the software, and
(2) offer you this license which gives you legal permission to copy,
distribute and/or modify the software.
Also, for each author's protection and ours, we want to make certain
that everyone understands that there is no warranty for this free
software. If the software is modified by someone else and passed on, we
want its recipients to know that what they have is not the original, so
that any problems introduced by others will not reflect on the original
authors' reputations.
Finally, any free program is threatened constantly by software
patents. We wish to avoid the danger that redistributors of a free
program will individually obtain patent licenses, in effect making the
program proprietary. To prevent this, we have made it clear that any
patent must be licensed for everyone's free use or not licensed at all.
The precise terms and conditions for copying, distribution and
modification follow.
0. This License applies to any program or other work which contains
a notice placed by the copyright holder saying it may be distributed
under the terms of this General Public License. The "Program", below,
refers to any such program or work, and a "work based on the Program"
means either the Program or any derivative work under copyright law:
that is to say, a work containing the Program or a portion of it,
either verbatim or with modifications and/or translated into another
language. (Hereinafter, translation is included without limitation in
the term "modification".) Each licensee is addressed as "you".
Activities other than copying, distribution and modification are not
covered by this License; they are outside its scope. The act of
running the Program is not restricted, and the output from the Program
is covered only if its contents constitute a work based on the
Program (independent of having been made by running the Program).
Whether that is true depends on what the Program does.
1. You may copy and distribute verbatim copies of the Program's
source code as you receive it, in any medium, provided that you
conspicuously and appropriately publish on each copy an appropriate
copyright notice and disclaimer of warranty; keep intact all the
notices that refer to this License and to the absence of any warranty;
and give any other recipients of the Program a copy of this License
along with the Program.
You may charge a fee for the physical act of transferring a copy, and
you may at your option offer warranty protection in exchange for a fee.
2. You may modify your copy or copies of the Program or any portion
of it, thus forming a work based on the Program, and copy and
distribute such modifications or work under the terms of Section 1
above, provided that you also meet all of these conditions:
a) You must cause the modified files to carry prominent notices
stating that you changed the files and the date of any change.
b) You must cause any work that you distribute or publish, that in
whole or in part contains or is derived from the Program or any
part thereof, to be licensed as a whole at no charge to all third
parties under the terms of this License.
c) If the modified program normally reads commands interactively
when run, you must cause it, when started running for such
interactive use in the most ordinary way, to print or display an
announcement including an appropriate copyright notice and a
notice that there is no warranty (or else, saying that you provide
a warranty) and that users may redistribute the program under
these conditions, and telling the user how to view a copy of this
License. (Exception: if the Program itself is interactive but
does not normally print such an announcement, your work based on
the Program is not required to print an announcement.)
These requirements apply to the modified work as a whole. If
identifiable sections of that work are not derived from the Program,
and can be reasonably considered independent and separate works in
themselves, then this License, and its terms, do not apply to those
sections when you distribute them as separate works. But when you
distribute the same sections as part of a whole which is a work based
on the Program, the distribution of the whole must be on the terms of
this License, whose permissions for other licensees extend to the
entire whole, and thus to each and every part regardless of who wrote it.
Thus, it is not the intent of this section to claim rights or contest
your rights to work written entirely by you; rather, the intent is to
exercise the right to control the distribution of derivative or
collective works based on the Program.
In addition, mere aggregation of another work not based on the Program
with the Program (or with a work based on the Program) on a volume of
a storage or distribution medium does not bring the other work under
the scope of this License.
3. You may copy and distribute the Program (or a work based on it,
under Section 2) in object code or executable form under the terms of
Sections 1 and 2 above provided that you also do one of the following:
a) Accompany it with the complete corresponding machine-readable
source code, which must be distributed under the terms of Sections
1 and 2 above on a medium customarily used for software interchange; or,
b) Accompany it with a written offer, valid for at least three
years, to give any third party, for a charge no more than your
cost of physically performing source distribution, a complete
machine-readable copy of the corresponding source code, to be
distributed under the terms of Sections 1 and 2 above on a medium
customarily used for software interchange; or,
c) Accompany it with the information you received as to the offer
to distribute corresponding source code. (This alternative is
allowed only for noncommercial distribution and only if you
received the program in object code or executable form with such
an offer, in accord with Subsection b above.)
The source code for a work means the preferred form of the work for
making modifications to it. For an executable work, complete source
code means all the source code for all modules it contains, plus any
associated interface definition files, plus the scripts used to
control compilation and installation of the executable. However, as a
special exception, the source code distributed need not include
anything that is normally distributed (in either source or binary
form) with the major components (compiler, kernel, and so on) of the
operating system on which the executable runs, unless that component
itself accompanies the executable.
If distribution of executable or object code is made by offering
access to copy from a designated place, then offering equivalent
access to copy the source code from the same place counts as
distribution of the source code, even though third parties are not
compelled to copy the source along with the object code.
4. You may not copy, modify, sublicense, or distribute the Program
except as expressly provided under this License. Any attempt
otherwise to copy, modify, sublicense or distribute the Program is
void, and will automatically terminate your rights under this License.
However, parties who have received copies, or rights, from you under
this License will not have their licenses terminated so long as such
parties remain in full compliance.
5. You are not required to accept this License, since you have not
signed it. However, nothing else grants you permission to modify or
distribute the Program or its derivative works. These actions are
prohibited by law if you do not accept this License. Therefore, by
modifying or distributing the Program (or any work based on the
Program), you indicate your acceptance of this License to do so, and
all its terms and conditions for copying, distributing or modifying
the Program or works based on it.
6. Each time you redistribute the Program (or any work based on the
Program), the recipient automatically receives a license from the
original licensor to copy, distribute or modify the Program subject to
these terms and conditions. You may not impose any further
restrictions on the recipients' exercise of the rights granted herein.
You are not responsible for enforcing compliance by third parties to
this License.
7. If, as a consequence of a court judgment or allegation of patent
infringement or for any other reason (not limited to patent issues),
conditions are imposed on you (whether by court order, agreement or
otherwise) that contradict the conditions of this License, they do not
excuse you from the conditions of this License. If you cannot
distribute so as to satisfy simultaneously your obligations under this
License and any other pertinent obligations, then as a consequence you
may not distribute the Program at all. For example, if a patent
license would not permit royalty-free redistribution of the Program by
all those who receive copies directly or indirectly through you, then
the only way you could satisfy both it and this License would be to
refrain entirely from distribution of the Program.
If any portion of this section is held invalid or unenforceable under
any particular circumstance, the balance of the section is intended to
apply and the section as a whole is intended to apply in other
It is not the purpose of this section to induce you to infringe any
patents or other property right claims or to contest validity of any
such claims; this section has the sole purpose of protecting the
integrity of the free software distribution system, which is
implemented by public license practices. Many people have made
generous contributions to the wide range of software distributed
through that system in reliance on consistent application of that
system; it is up to the author/donor to decide if he or she is willing
to distribute software through any other system and a licensee cannot
impose that choice.
This section is intended to make thoroughly clear what is believed to
be a consequence of the rest of this License.
8. If the distribution and/or use of the Program is restricted in
certain countries either by patents or by copyrighted interfaces, the
original copyright holder who places the Program under this License
may add an explicit geographical distribution limitation excluding
those countries, so that distribution is permitted only in or among
countries not thus excluded. In such case, this License incorporates
the limitation as if written in the body of this License.
9. The Free Software Foundation may publish revised and/or new versions
of the General Public License from time to time. Such new versions will
be similar in spirit to the present version, but may differ in detail to
address new problems or concerns.
Each version is given a distinguishing version number. If the Program
specifies a version number of this License which applies to it and "any
later version", you have the option of following the terms and conditions
either of that version or of any later version published by the Free
Software Foundation. If the Program does not specify a version number of
this License, you may choose any version ever published by the Free Software
10. If you wish to incorporate parts of the Program into other free
programs whose distribution conditions are different, write to the author
to ask for permission. For software which is copyrighted by the Free
Software Foundation, write to the Free Software Foundation; we sometimes
make exceptions for this. Our decision will be guided by the two goals
of preserving the free status of all derivatives of our free software and
of promoting the sharing and reuse of software generally.
How to Apply These Terms to Your New Programs
If you develop a new program, and you want it to be of the greatest
possible use to the public, the best way to achieve this is to make it
free software which everyone can redistribute and change under these terms.
To do so, attach the following notices to the program. It is safest
to attach them to the start of each source file to most effectively
convey the exclusion of warranty; and each file should have at least
the "copyright" line and a pointer to where the full notice is found.
<one line to give the program's name and a brief idea of what it does.>
Copyright (C) <year> <name of author>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
Also add information on how to contact you by electronic and paper mail.
If the program is interactive, make it output a short notice like this
when it starts in an interactive mode:
Gnomovision version 69, Copyright (C) year name of author
Gnomovision comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
This is free software, and you are welcome to redistribute it
under certain conditions; type `show c' for details.
The hypothetical commands `show w' and `show c' should show the appropriate
parts of the General Public License. Of course, the commands you use may
be called something other than `show w' and `show c'; they could even be
mouse-clicks or menu items--whatever suits your program.
You should also get your employer (if you work as a programmer) or your
school, if any, to sign a "copyright disclaimer" for the program, if
necessary. Here is a sample; alter the names:
Yoyodyne, Inc., hereby disclaims all copyright interest in the program
`Gnomovision' (which makes passes at compilers) written by James Hacker.
<signature of Ty Coon>, 1 April 1989
Ty Coon, President of Vice
This General Public License does not permit incorporating your program into
proprietary programs. If your program is a subroutine library, you may
consider it more useful to permit linking proprietary applications with the
library. If this is what you want to do, use the GNU Library General
Public License instead of this License.
In addition, as a special exception, the copyright holders give
permission to link the code of portions of this program with the OpenSSL
You must obey the GNU General Public License in all respects for all of
the code used other than OpenSSL. If you modify file(s) with this
exception, you may extend this exception to your version of the file(s),
but you are not obligated to do so. If you do not wish to do so, delete
this exception statement from your version. If you delete this exception
statement from all source files in the program, then also delete it here.

plugins/WebUi/__init__.py
@ -0,0 +1,197 @@
# -*- coding: utf-8 -*-
# Dbus Ipc for experimental web interface
# dbus_interface.py
# Copyright (C) Martijn Voncken 2007 <mvoncken@gmail.com>
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, write to:
# The Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor
# Boston, MA 02110-1301, USA.
# In addition, as a special exception, the copyright holders give
# permission to link the code of portions of this program with the OpenSSL
# library.
# You must obey the GNU General Public License in all respects for all of
# the code used other than OpenSSL. If you modify file(s) with this
# exception, you may extend this exception to your version of the file(s),
# but you are not obligated to do so. If you do not wish to do so, delete
# this exception statement from your version. If you delete this exception
plugin_name = "Web User interface"
plugin_author = "Martijn Voncken"
plugin_version = "rev."
plugin_description = """A Web based User Interface (and dbus-ipc)
beta test version, disclaimer, etc..
import deluge.common, deluge.pref
from dbus_interface import DbusManager
import gtk
import os
from subprocess import Popen
plugin_version += open(os.path.join(os.path.dirname(__file__),'revno')).read()
def deluge_init(deluge_path):
global path
path = deluge_path
def enable(core, interface):
global path
return plugin_WebUi(path, core, interface)
class plugin_WebUi:
def __init__(self, path, deluge_core, deluge_interface):
self.path = path
self.core = deluge_core
self.interface = deluge_interface
self.proc = None
self.config_file = deluge.common.CONFIG_DIR + "/webui.conf"
self.config = deluge.pref.Preferences(self.config_file, False)
except IOError:
# File does not exist
if not self.config.get('port'): #ugly way to detect new config file.
#set default values:
self.config.set("port", 8112)
self.config.set("user", "deluge")
self.config.set("pwd", "deluge")
#future->use deluge-core setting for download_dir (if it is set)
self.config.set("download_dir", os.path.expanduser("~/"))
self.config.set("torrent_dir", os.path.expanduser("~/"))
self.config.set("auto_refresh", False)
self.config.set("auto_refresh_secs", 4)
self.config.set("template", "deluge")
self.dbusManager = DbusManager(deluge_core, deluge_interface
, self.config, self.config_file)
def unload(self):
print 'WebUI:unload..'
def update(self):
## This will be only called if your plugin is configurable
def configure(self,parent_dialog):
d = ConfigDialog(self.config,self)
if d.run() == gtk.RESPONSE_OK:
def start_server(self):
print 'start Webui..'
path = os.path.dirname(__file__)
server_bin = path + '/run_webserver'
port = str(self.config.get('port'))
self.proc = Popen((server_bin, port),cwd=path)
def kill_server(self):
if self.proc:
print "webserver: kill %i"%self.proc.pid
os.system("kill %i"%self.proc.pid)
self.proc = None
class ConfigDialog(gtk.Dialog):
sorry, can't get used to gui builders.
from what I read glade is better, but i dont want to invest time in them.
def __init__(self, config, plugin):
self.config = config
self.plugin = plugin
self.vb = gtk.VBox()
self.set_title(_("WebUi Config"))
template_path = os.path.join(os.path.dirname(__file__), 'templates')
self.templates = [dirname for dirname
in os.listdir(template_path)
if os.path.isdir(os.path.join(template_path, dirname))]
self.port = self.add_widget(_('Port Number'), gtk.SpinButton())
self.user = self.add_widget(_('User'), gtk.Entry())
self.pwd = self.add_widget(_('Password'), gtk.Entry())
self.template = self.add_widget(_('Template'), gtk.combo_box_new_text())
self.download_dir = self.add_widget(_('Download Directory'),
gtk.FileChooserButton(_('Download Directory')))
self.torrent_dir = self.add_widget(_('Torrent Directory'),
gtk.FileChooserButton(_('Torrent Directory')))
self.port.set_range(80, 65536)
self.port.set_increments(1, 10)
for item in self.templates:
if not self.config.get("template") in self.templates:
self.vbox.pack_start(self.vb, True, True, 0)
self.add_buttons(gtk.STOCK_CANCEL, gtk.RESPONSE_CANCEL
def add_widget(self,label,w=None):
hb = gtk.HBox()
lbl = gtk.Label(label)
hb.pack_start(lbl,False,False, 0)
hb.pack_start(w,True,True, 0)
self.vb.pack_start(hb,False,False, 0)
return w
self.add_buttons(dgtk.STOCK_CLOSE, dgtk.RESPONSE_CLOSE)
def save_config(self):
print 'save config'
self.config.set("user", self.user.get_text())
self.config.set("pwd", self.pwd.get_text())
self.config.set("port", int(self.port.get_value()))
self.config.set("template", self.template.get_active_text())
self.config.set("torrent_dir", self.torrent_dir.get_filename())
self.plugin.start_server() #restarts server

@ -0,0 +1,238 @@
# -*- coding: utf-8 -*-
# Dbus Ipc for experimental web interface
# dbus_interface.py
# Copyright (C) Martijn Voncken 2007 <mvoncken@gmail.com>
# Contains copy and pasted code from other parts of deluge,see deluge AUTHORS
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, write to:
# The Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor
# Boston, MA 02110-1301, USA.
# In addition, as a special exception, the copyright holders give
# permission to link the code of portions of this program with the OpenSSL
# library.
# You must obey the GNU General Public License in all respects for all of
# the code used other than OpenSSL. If you modify file(s) with this
# exception, you may extend this exception to your version of the file(s),
# but you are not obligated to do so. If you do not wish to do so, delete
# this exception statement from your version. If you delete this exception
# statement from all source files in the program, then also delete it here.
import os
import gtk
import dbus
import deluge.common as common
from dbus_pythonize import pythonize
import base64
import random
class DbusManager(dbus.service.Object):
def __init__(self, core, interface,config,config_file):
self.core = core
self.interface = interface
self.config = config
self.config_file = config_file
self.bus = dbus.SessionBus()
bus_name = dbus.service.BusName(dbus_interface,bus=self.bus)
dbus.service.Object.__init__(self, bus_name,dbus_service)
#todo : add: get_interface_version in=i,get_config_value in=s out=s
def get_torrent_state(self):
"""Returns a list of torrent_ids in the session.
same as 0.6, but returns type "as" instead of a pickle
torrent_list = [str(key) for key in self.core.unique_IDs]
return torrent_list
def get_torrent_status(self, torrent_id, keys):
"""return torrent metadata of a single torrent as a dict
0.6 returns a pickle, this returns a dbus-type.
+added some more values to the dict
torrent_id = int(torrent_id)
# Convert the array of strings to a python list of strings
nkeys = [str(key) for key in keys]
state = self.core.get_torrent_state(torrent_id)
torrent = self.core.unique_IDs[torrent_id]
status = {
"name": state["name"],
"total_size": state["total_size"],
"num_pieces": state["num_pieces"],
#"state": int(status.state), #?
"paused": self.core.is_user_paused(torrent_id),
"progress": int(state["progress"] * 100),
"next_announce": state["next_announce"],
"total_payload_upload": state["total_payload_upload"],
"download_payload_rate": state["download_rate"],
"upload_payload_rate": state["upload_rate"],
"num_peers": state["num_peers"],
"num_seeds": state["num_seeds"],
"total_wanted": state["total_wanted"],
"eta": common.estimate_eta(state),
"ratio": self.interface.manager.calc_ratio(torrent_id,state),
#non 0.6 values follow here:
"message": self.interface.get_message_from_state(state),
"tracker_status": state.get("tracker_status","?"),
"uploaded_memory": torrent.uploaded_memory,
#more non 0.6 values
for key in ["total_seeds", "total_peers","is_seed", "total_done",
"total_download", "total_upload", "download_rate",
"upload_rate", "num_files", "piece_length", "distributed_copies"
status[key] = state[key]
#print 'all_keys:',sorted(status.keys())
status_subset = {}
for key in keys:
if key in status:
status_subset[key] = status[key]
print 'mbus error,no key named:', key
return status_subset
def pause_torrent(self, torrent_id):
"""same as 0.6 interface"""
torrent_id = int(torrent_id)
in_signature="s", out_signature="")
def resume_torrent(self, torrent_id):
"""same as 0.6 interface"""
torrent_id = int(torrent_id)
in_signature="sbb", out_signature="")
def remove_torrent(self, torrent_id, data_also, torrent_also):
"""remove a torrent,and optionally data and torrent
additions compared to 0.6 interface: (data_also, torrent_also)
torrent_id = int(torrent_id)
self.core.remove_torrent(torrent_id, bool(data_also)
,bool( torrent_also))
#this should not be needed:
in_signature="s", out_signature="b")
def add_torrent_url(self, url):
"""not available in deluge 0.6 interface"""
filename = fetch_url(url)
return True
in_signature="ss", out_signature="b")
def add_torrent_filecontent(self, name, filecontent_b64):
"""not available in deluge 0.6 interface"""
#name = fillename without directory
name = name.replace('\\','/')
name = 'deluge_' + str(random.random()) + '_' + name.split('/')[-1]
filename = os.path.join(self.config.get("torrent_dir"),name)
filecontent = base64.b64decode(filecontent_b64)
f = open(filename,"wb") #no with statement, that's py 2.5+
print 'write:',filename
return True
def get_webui_config(self,key):
return data from wevbui config.
not in 0.6
retval = self.config.get(str(key))
print 'get webui config:', str(key), retval
if retval == None:
retval = False #dbus does not accept None :(
return retval
def set_webui_config(self, key, value):
return data from wevbui config.
not in 0.6
print 'set webui config:', str(key), pythonize(value)
self.config.set(str(key), pythonize(value))
def _add_torrent(self, filename):
#dbus types break pickle, again.....
filename = unicode(filename)
target = self.config.get("download_dir")
torrent_id = self.core.add_torrent(filename, target,
#update gtk-ui This should not be needed!!
#finally is 2.5 only!
return True
def fetch_url(url):
import urllib
filename, headers = urllib.urlretrieve(url)
except IOError:
raise Exception( "Network error while trying to fetch torrent from %s"
% url)
if (filename.endswith(".torrent") or
return filename
raise Exception("URL doesn't appear to be a valid torrent file:%s"
% url)
return None

@ -0,0 +1,38 @@
some dbus to python type conversions
-decorator for interface
-wrapper class for proxy
def pythonize(var):
"""translates dbus types back to basic python types."""
if isinstance(var, list):
return [pythonize(value) for value in var]
if isinstance(var, tuple):
return tuple([pythonize(value) for value in var])
if isinstance(var, dict):
return dict(
[(pythonize(key), pythonize(value)) for key, value in var.iteritems()]
for klass in [unicode, str, bool, int, float, long]:
if isinstance(var,klass):
return klass(var)
return var
def pythonize_call(func):
def deco(*args,**kwargs):
return pythonize(func(*args, **kwargs))
return deco
def pythonize_interface(func):
def deco(*args, **kwargs):
args = pythonize(args)
kwargs = pythonize(kwargs)
return func(*args, **kwargs)
return deco
class PythonizeProxy(object):
def __init__(self,proxy):
self.proxy = proxy
def __getattr__(self, key):
return pythonize_call(getattr(self.proxy, key))

@ -0,0 +1,391 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# deluge_webserver.py
# Copyright (C) Martijn Voncken 2007 <mvoncken@gmail.com>
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, write to:
# The Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor
# Boston, MA 02110-1301, USA.
# In addition, as a special exception, the copyright holders give
# permission to link the code of portions of this program with the OpenSSL
# library.
# You must obey the GNU General Public License in all respects for all of
# the code used other than OpenSSL. If you modify file(s) with this
# exception, you may extend this exception to your version of the file(s),
# but you are not obligated to do so. If you do not wish to do so, delete
# this exception statement from your version. If you delete this exception
# statement from all source files in the program, then also delete it here.
Todo's before beta:
-alternating rows?
-__init__:unload plugin is broken!
-__init__:kill->restart is not waiting for kill to be finished.
-redir is broken.
-set prio
-clear finished?
-torrent files.
import webpy022 as web
from webpy022.webapi import cookies, setcookie
from webpy022.http import seeother, url
from webpy022.utils import Storage
from webpy022 import template
import dbus
import gettext, os, platform, locale
import random
import base64
from operator import attrgetter
from deluge import common
from deluge.common import INSTALL_PREFIX
APP = 'deluge'
DIR = os.path.join(INSTALL_PREFIX, 'share', 'locale')
if platform.system() != "Windows":
locale.setlocale(locale.LC_MESSAGES, '')
locale.bindtextdomain(APP, DIR)
locale.setlocale(locale.LC_ALL, '')
gettext.bindtextdomain(APP, DIR)
gettext.install(APP, DIR)
bus = dbus.SessionBus()
proxy = bus.get_object("org.deluge_torrent.dbusplugin"
, "/org/deluge_torrent/DelugeDbusPlugin")
web.webapi.internalerror = web.debugerror
def do_redirect():
"""for redirects after a POST"""
vars = web.input(redir = None)
ck = cookies()
if vars.redir:
elif ("order" in ck and "sort" in ck):
seeother(url("/index", sort=ck['sort'], order=ck['order']))
def deluge_page_noauth(func):
add http headers
print result of func
def deco(self, name=None):
web.header("Content-Type", "text/html; charset=utf-8")
web.header("Cache-Control", "no-cache, must-revalidate")
res = func(self, name)
print res
return deco
def check_session(func):
a decorator
return func if session is valid, else redirect to login page.
def deco(self, name):
ck = cookies()
if ck.has_key("session_id") and ck["session_id"] in SESSIONS:
return func(self, name) #ok, continue..
seeother("/login") #do not continue, and redirect to login page
return deco
def deluge_page(func):
return check_session(deluge_page_noauth(func))
def auto_refreshed(func):
"decorator:adds a refresh header"
def deco(self, name):
if proxy.get_webui_config('auto_refresh'):
web.header("Refresh", "%i ; url=%s" %
return func(self, name)
return deco
def error_page(error):
web.header("Content-Type", "text/html; charset=utf-8")
web.header("Cache-Control", "no-cache, must-revalidate")
print render.error(error)
torrent_keys = ['distributed_copies', 'download_payload_rate',
'download_rate', 'eta', 'is_seed', 'message', 'name', 'next_announce',
'num_files', 'num_peers', 'num_pieces', 'num_seeds', 'paused',
'piece_length','progress', 'ratio', 'total_done', 'total_download',
'total_payload_download', 'total_payload_upload', 'total_peers',
'total_seeds', 'total_size', 'total_upload', 'total_wanted',
'tracker_status', 'upload_payload_rate', 'upload_rate',
def get_torrent_status(torrent_id):
helper method.
enhance proxy.get_torrent_status with some extra data
status = proxy.get_torrent_status(torrent_id,torrent_keys)
status["id"] = torrent_id
#for naming the status-images
status["calc_state_str"] = "downloading"
if status["paused"]:
status["calc_state_str"] = "inactive"
elif status["is_seed"]:
status["calc_state_str"] = "seeding"
#action for torrent_pause
if status["calc_state_str"] == "inactive":
status["action"] = "start"
status["action"] = "stop"
#add some pre-calculated values
"calc_total_downloaded" : (common.fsize(status["total_done"])
+ " (" + common.fsize(status["total_download"]) + ")"),
"calc_total_uploaded": (common.fsize(status['uploaded_memory']
+ status["total_payload_upload"]) + " ("
+ common.fsize(status["total_upload"]) + ")"),
return Storage(status) #Storage for easy templating.
def template_crop(text, end):
if len(text) > end:
return text[0:end - 3] + '...'
return text
def sort_head(id,name):
#got tired of doing these complex things inside templetor..
vars = web.input(sort=None, order=None)
active_up = False
active_down = False
order = 'down'
if vars.sort == id:
if vars.order == 'down':
order = 'up'
active_down = True
active_up = True
return render.sort_column_head(id, name, order, active_up, active_down)
render = template.render('templates/%s/' % proxy.get_webui_config('template'))
template.Template.globals['crop'] = template_crop
template.Template.globals['fspeed'] = common.fspeed
template.Template.globals['fsize'] = common.fsize
template.Template.globals['sorted'] = sorted
template.Template.globals['_'] = _ #gettext/translations
template.Template.globals['deluge_web_version'] = ('rev.'
+ open(os.path.join(os.path.dirname(__file__),'revno')).read())
template.Template.globals['render'] = render #for easy resuse of templates
template.Template.globals['sort_head'] = sort_head
template.Template.globals['get_config'] = proxy.get_webui_config
template.Template.globals['self_url'] = web.changequery
urls = (
"/login(.*)", "login",
"/index(.*)", "index",
"/torrent/info/(.*)", "torrent_info",
"/torrent/pause(.*)", "torrent_pause",
"/torrent/add(.*)", "torrent_add",
"/torrent/delete/(.*)", "torrent_delete",
"/pause_all(.*)", "pause_all",
"/resume_all(.*)", "resume_all",
"/refresh/set(.*)", "refresh_set",
"/refresh/(.*)", "refresh",
"/home(.*)", "home",
"/", "login",
"", "login"
class login:
def GET(self, name):
return render.login()
def POST(self, name):
vars = web.input(var = None, pwd = None)
if (vars.user == proxy.get_webui_config('user')
and vars.pwd == proxy.get_webui_config('pwd')):
#start new session
session_id = str(random.random())
SESSIONS[session_id] = {"user":vars.user}
setcookie("session_id", session_id)
error_page(_("Username or Password is invalid."))
class home:
def GET(self, name):
class index:
"page containing the torrent list."
def GET(self, name):
vars = web.input(sort=None, order=None)
status_rows = [get_torrent_status(torrent_id)
for torrent_id in proxy.get_torrent_state()]
if vars.sort:
if vars.order == 'up':
status_rows = reversed(status_rows)
setcookie("order", vars.order)
setcookie("sort", vars.sort)
return render.index(status_rows)
class torrent_info:
"torrent details"
def GET(self, torrent_id):
return render.torrent_info(get_torrent_status(torrent_id))
class torrent_pause:
"start/stop a torrent"
def POST(self, name):
vars = web.input(stop = None, start = None, redir = None)
if vars.stop:
elif vars.start:
class torrent_add:
def GET(self, name):
return render.torrent_add()
def POST(self, name):
vars = web.input(url = None, torrent = {})
if vars.url and vars.torrent.filename:
error_page(_("Choose an url or a torrent, not both."))
if vars.url:
elif vars.torrent.filename:
data = vars.torrent.file.read()
data_b64 = base64.b64encode(data)
#b64 because of strange bug-reports related to binary data
error_page(_("no data."))
class torrent_delete:
def GET(self, torrent_id):
return render.torrent_delete(get_torrent_status(torrent_id))
def POST(self, name):
torrent_id = name
vars = web.input(data_also = None, torrent_also = None)
data_also = bool(vars.data_also)
torrent_also = bool(vars.torrent_also)
proxy.remove_torrent(torrent_id, data_also, torrent_also)
class pause_all:
def POST(self, name):
for torrent_id in proxy.get_torrent_state():
class resume_all:
def POST(self, name):
for torrent_id in proxy.get_torrent_state():
class refresh:
def POST(self, name):
auto_refresh = {'off':False, 'on':True}[name]
proxy.set_webui_config('auto_refresh', auto_refresh)
class refresh_set:
def GET(self, name):
return render.refresh_form()
def POST(self, name):
vars = web.input(refresh = 0)
refresh = int(vars.refresh)
if refresh > 0:
proxy.set_webui_config('refresh', refresh)
proxy.set_webui_config('auto_refresh', True)
error_page(_('refresh must be > 0'))
if __name__ == "__main__":
web.run(urls, globals())

@ -0,0 +1

View File

@ -0,0 +1,3 @@
#!/usr/bin/env python
from deluge_webserver import *
web.run(urls, globals())

@ -0,0 +1,36 @@
/* ----------------------------------------------------------- Theme Name: Simple Theme URI: http://deluge-torrent.org Description: Deluge Theme Version: 1.0 ----------------------------------------------------------- */ BODY { background: #304663 url(images/simple_bg.jpg) repeat-x; font-family: trebuchet ms; font-size: 10pt; margin: 0; } /* GENERIC STYLES */ a img {border: 0px} hr {color: #627082; margin: 15px 0 15px 0;} /* STRUCTURE */ #page { min-width: 800px; margin-left: auto; margin-right: auto; } #main_content { background:url(images/simple_line.jpg) repeat-x; } #simple_logo { background:url(images/simple_logo.jpg) no-repeat; } #main { padding-top: 20px; padding-left: 20px; color: #fff; } #main form table { border: #2a425c 1px solid; } #main form table tr { border: 0px; } #main form table tr th { background: #1f3044; font-size: 16px; border: 0px;
white-space: nowrap; } #main form table tr td{ border: 0px; color: #fff; font-size: 12px; white-space: nowrap; } #main form table tr th a { color: #8fa6c3; font-size: 16px; white-space: nowrap; } #main form table tr th a, a:active, a:visited { color: #8fa6c3; text-decoration: none; } #main form table tr th a:hover {color: #fff; text-decoration: underline;} #main form table tr td a { color: #fff; font-size: 12px; white-space: nowrap; } #main form table tr td a, a:active, a:visited { color: #fff; text-decoration: none;} #main form table tr td a:hover {color: #fff; text-decoration: underline;} #main a { color: #fff; font-size: 12px; } #main a, a:active, a:visited { color: #fff; text-decoration: none;} #main a:hover {color: #fff; text-decoration: underline;} .info { text-align: right; padding: 0 50px 0 0; color: #8fa6c3; font-size: 16px; letter-spacing: 4px; font-weight: bold; } .title { color: #dce4ee; font-size: 32px; padding: 10px 50px 0 0; text-align: right; } .title a, a:active, a:visited { color: #dce4ee; text-decoration: none;} .title a:hover {color: #fff; text-decoration: underline;} #button { border:1px solid #23344b; background: #99acc3; color: #000; font-family:verdana, arial, helvetica, sans-serif; font-size:10px; margin-top:5px; } INPUT{ border:1px solid #23344b; background: #99acc3; color: #000; } TEXTAREA{ border:1px solid #23344b; background: #99acc3; width:480px; } .footertext a { color: #c0c0c0; text-decoration:none;} .footertext a:visited { color: #c0c0c0; text-decoration:none;} .footertext a:active { color: #c0c0c0; text-decoration:none;} .footertext a:hover {color: #fff; text-decoration: underline;} .footertext { text-align: center; padding: 60px 0 0 0; font-size: 8pt; left: -100px; font-family: trebuchet MS; color: #fff; position: relative; } .clearfix:after { content: "."; display: block; height: 0; clear: both; visibility: hidden; } div.progress_bar{ background-color:#4573a5; /*color:blue;*/ -moz-border-radius:5px; /*ff only setting*/ }
div.progress_bar_outer { /*used in table-view*/
td.progress_bar { white-space: nowrap; } td.info_label { font-weight: bold; } td { font-size: 10pt; color: #d1dae5; white-space: nowrap; } tr { font-size: 10pt; color: #d1dae5; }
div.panel {
background-color: #37506f;
-moz-border-radius:10px; /*ff-only!*/
/*New styles:*/
div.deluge_button {
form.deluge_button {
button.deluge_button {
background-color: #37506f;
border:1px solid #23344b;
background: #99acc3;
color: #000;
/* Hides from IE-mac \*/ * html .clearfix {height: 1%;} .clearfix {display: block;} /* End hide from IE-mac */

View File

@ -0,0 +1,5 @@
-first layout taken from deluge website
improved by:

View File

@ -0,0 +1,6 @
$def with (error_msg)
<pre class="error">

View File

@ -0,0 +1,6 @@

View File

@ -0,0 +1,25 @@
$def with (title)
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/1999/REC-html401-19991224/loose.dtd">
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<link rel="icon" href="/static/images/deluge_icon.gif" type="image/gif" />
<link rel="shortcut icon" href="/static/images/deluge_icon.gif" type="image/gif" />
<link rel="stylesheet" type="text/css" href="/static/simple_site_style.css" />
<div id="page">
<a href='/home'>
<div id="simple_logo">
<div class="title">Deluge</div>
<div class="info">$title</div>
<div id="main_content">
<div id="main">

View File

@ -0,0 +1,58 @@
$def with (torrent_list)
$:render.header(_('Torrent list'))
<form action="/torrent/pause" method="POST">
<table class="torrent_list" border=1>
$:(sort_head('calc_state_str', 'S'))
$:(sort_head('id', '#'))
$:(sort_head('name', _('Name')))
$:(sort_head('total_size', _('Size')))
$:(sort_head('progress', _('Progress')))
$:(sort_head('num_seeds', _('Seeders')))
$:(sort_head('num_peers', _('Peers')))
$:(sort_head('download_rate', _('Download')))
$:(sort_head('upload_rate', _('Upload')))
$:(sort_head('eta', _('Eta')))
$:(sort_head('distributed_copies', _('Ava')))
$:(sort_head('ratio', _('Ratio')))
$#4-space indentation is mandatory for for-loops in templetor!
$for torrent in torrent_list:
<td><input type="image"
name="$torrent.action" value="$torrent.id">
<td style="width:100px; overflow:hidden;white-space: nowrap">
<a href="/torrent/info/$torrent.id">$(crop(torrent.name, 40))</a></td>
<td class="progress_bar">
<div class="progress_bar_outer">
<div class="progress_bar" style="width:$(torrent.progress)%">
<td>$torrent.num_seeds ($torrent.total_seeds)</td>
<td>$torrent.num_peers ($torrent.total_peers)</td>
<td>$("%.3f" % torrent.distributed_copies)</td>
<td>$("%.3f" % torrent.ratio)</td\>
<div class="panel">
$:render.part_button('GET', '/torrent/add', _('Add torrent'), 'tango/list-add.png')
$:render.part_button('POST', '/pause_all', _('Pause all'), 'tango/media-playback-pause.png')
$:render.part_button('POST', '/resume_all', _('Resume all'), 'tango/media-playback-start.png')

View File

@ -0,0 +1,21 @
<div class="panel">
<form method="POST" id="loginform" action='/login'>
<div id="loginpanel">
<div class="form_row">
<span class="form_label">$_('User')</span>
<input type="text" name="user" id="user" class="form_input">
<div class="form_row">
<span class="form_label">$_('Pass')</span>
<input type="password" name="pwd" id="pwd" class="form_input">
<div class="form_row">
<span class="form_label"></span>
<input type="submit" name="submit"
id="submit" value="Submit" class="form_input">

View File

@ -0,0 +1,13 @@
$def with (method, url, title, image='')
<div class="deluge_button">
<form method="$method" action="$url" class="deluge_button">
<button type="submit" class="deluge_button">
<input type="hidden" name="redir" value="$self_url()">
$if image:
<image src="/static/images/$image" class="button"/>
<!--no image-->

View File

@ -0,0 +1,13 @
<div class="panel">
<div id='refresh'>
$_('Auto refresh:')
$if get_config('auto_refresh'):
($(get_config('refresh')) $_('seconds')) &nbsp;
$:render.part_button('GET', '/refresh/set', _('Set'), 'tango/preferences-system.png')
$:render.part_button('POST', '/refresh/off', _('Disable'), 'tango/process-stop.png')
$_('Off') &nbsp;
$:render.part_button('POST', '/refresh/on', _('Enable'), 'tango/view-refresh.png')

View File

@ -0,0 +1,10 @@
$:render.header(_('Set Timeout'))
<div class="panel">
<form action="/refresh/set" method="POST">
$_('Refresh page every:')
<input type="text" name="refresh" value="$get_config('refresh')" size="3">
<input type="submit" value="$_('Set')">

View File

@ -0,0 +1,12 @@
$def with (column_id, column_name, order, active_up, active_down)
<a href="/index?sort=$column_id&order=$order">
$if active_up:
<img src="/static/images/tango/up.png" />
$if active_down:
<img src="/static/images/tango/down.png" />

View File

@ -0,0 +1,21 @
$:render.header(_("Add Torrent"))
<div class="panel">
<form method="POST" action="/torrent/add" ENCTYPE="multipart/form-data">
<div id="torrent_add">
<div class="form_row">
<span class="form_label">$_('Url')</span>
<input type="text" name="url" class="form_input" size=60>
<div class="form_row">
<span class="form_label">$_('Upload torrent')</span>
<input type="file" name="torrent" class="form_input" size=50>
<div class="form_row">
<span class="form_label"></span>
<input type="submit" name="submit"
value="$_('Submit')" class="form_input">

View File

@ -0,0 +1,26 @@
$def with (torrent)
$:render.header(_("Remove %s ") % torrent.name)
<div class="panel">
<form method="POST" action='/torrent/delete/$torrent.id'>
<div id="del_torrent">
$(_("Remove %s?") % torrent.name)
<div class="form_row2">
<span class="form_label2">
<input type="checkbox" name="torrent_also" class="form_input" checked
value="1">$_('Delete .torrent file')</span>
<div class="form_row2">
<span class="form_label2">
<input type="checkbox" name="data_also" class="form_input"
value="1">$_('Delete downloaded files.')</span>
<div class="form_row2">
<span class="form_label2"></span>
<input type="submit" name="submit"
value="$_('Remove')" class="form_input">

View File

@ -0,0 +1,118 @@
$def with (torrent)
$:(render.header(torrent.message + '/' + torrent.name))
<div class="panel">
<table width="100%"><tr>
<td colspan=3 style="background-color:#999;-moz-border-radius:5px;">
<div class="progress_bar" style="width:$torrent.progress%;
$torrent.progress %</div>
</tr><td width=30%%>
<tr><td class="info_label">$_('Downloaded'):</td>
<td class="info_value">$torrent.calc_total_downloaded</td></tr>
<tr><td class="info_label">$_('Uploaded'):</td>
<td class="info_value">$torrent.calc_total_uploaded</td>
<tr><td class="info_label">$_('Seeders'):</td>
<td class="info_value">$torrent.num_seeds ($torrent.total_seeds )</td></tr>
<tr><td class="info_label">$_('Share Ratio'):</td>
<td class="info_value">$("%.3f" % torrent.ratio)</td></tr>
<tr><td class="info_label">$_('Pieces'):</td>
<td class="info_value">$torrent.num_pieces x $fsize(torrent.piece_length) </td>
</td><td width=30%%>
<tr><td class="info_label">$_('Speed'):</td><td class="info_value">
<tr><td class="info_label">$_('Speed'):</td>
<td class="info_value">$fspeed(torrent.upload_rate)</td></tr>
<tr><td class="info_label">$_('Peers'):</td>
<td class="info_value">$torrent.num_peers ($torrent.total_peers )</td></tr>
<tr><td class="info_label">$_('ETA'):</td>
<td class="info_value">$torrent.eta </td></tr>
<tr><td class="info_label">$_('Availability'):</td>
<td class="info_value">$("%.3f" % torrent.distributed_copies)</td></td></tr>
</td><td width=30%%>
<tr><td class="info_label">$_('Total Size'):</td>
<td class="info_value">$fspeed(torrent.total_size)</td></tr>
<tr><td class="info_label">$_('# Of Files'):</td>
<td class="info_value">$torrent.num_files</td></tr>
<tr><td class="info_label">$_('Tracker'):</td>
<td class="info_value">$(crop(torrent.tracker, 30))</td></tr>
<tr><td class="info_label">$_('Tracker Status'):</td>
<td class="info_value">$torrent.tracker_status </td></tr>
<tr><td class="info_label">$_('Next Announce'):</td>
<td class="info_value">$torrent.next_announce </td></tr>
<form action="/torrent/pause?redir=/torrent/info/$torrent.id" method="POST"
<input type="image" src="/static/images/$(torrent.calc_state_str)16.png"
name="$torrent.action" value="$torrent.id">
<div class="deluge_button">
<form method="GET" action="/torrent/delete/$torrent.id" class="deluge_button">
<button type="submit" class="deluge_button">
<image src="/static/images/tango/user-trash.png" /> $_('Delete')
[<a onclick="javascript:toggle_dump()">$_('Debug:Data Dump')</a>]
<pre style="background-color:white;color:black;display:none" id="data_dump">
$for key in sorted(torrent):
$key : $torrent[key]
<script language="javascript">
function toggle_dump(){
el = document.getElementById("data_dump");
if (el.style.display == "block"){
el.style.display = "none";
el.style.display = "block";

@ -0,0 +1,27 @@
Just copy and rename an existing template.
-The settings panel will see all directory's in this folder ,and let you choose your new template.
-Clicking Ok in the settings panel will restart the webserver and reload your template.
Please configure your editor to use 4-space indents instead of tabs.
Or use scite and my config: http://mvoncken.sohosted.com/deluge/SciTEUser.properties.txt
template language: http://webpy.org/templetor
Exposed methods and variables (c&p from deluge_webserver):
template.Template.globals['crop'] = template_crop
template.Template.globals['fspeed'] = common.fspeed
template.Template.globals['fsize'] = common.fsize
template.Template.globals['sorted'] = sorted
template.Template.globals['_'] = _ #gettext/translations
template.Template.globals['deluge_web_version'] = '0.3alfa'
template.Template.globals['render'] = render #for easy resuse of templates
template.Template.globals['sort_head'] = sort_head
template.Template.globals['get_config'] = proxy.get_webui_config
I will update this file if there is interest in making templates.

@ -0,0 +1,5 @@
revision-id: mvoncken@gmail.com-20070923193628-omqar5yaxatdmfu0
date: 2007-09-23 21:36:28 +0200
build-date: 2007-09-23 21:54:09 +0200
revno: 30
branch-nick: WebUi

@ -0,0 +1

@ -0,0 +1,62 @@
#!/usr/bin/env python
from __future__ import generators
"""web.py: makes web apps (http://webpy.org)"""
__version__ = "0.22"
__revision__ = "$Rev: 183 $"
__author__ = "Aaron Swartz <me@aaronsw.com>"
__license__ = "public domain"
__contributors__ = "see http://webpy.org/changes"
# todo:
# - some sort of accounts system
import utils, db, net, wsgi, http, webapi, request, httpserver, debugerror
import template, form
from utils import *
from db import *
from net import *
from wsgi import *
from http import *
from webapi import *
from request import *
from httpserver import *
from debugerror import *
import cheetah
from cheetah import *
except ImportError:
def main():
import doctest
except NameError:
import sys
urls = ('/web.py', 'source')
class source:
def GET(self):
header('Content-Type', 'text/python')
print open(sys.argv[0]).read()
if listget(sys.argv, 1) != 'test':
run(urls, locals())
if __name__ == "__main__": main()

@ -0,0 +1,6 @@
Commented out some code to enable a relative redirect.
This is not according to HTTP/1.1 Spec
But many deluge users will want to route the webui through firewalls/routers or use apache redirects.

@ -0,0 +1,98 @@
Cheetah API
(from web.py)
__all__ = ["render"]
import re, urlparse, pprint, traceback, sys
from Cheetah.Compiler import Compiler
from Cheetah.Filters import Filter
from utils import re_compile, memoize, dictadd
from net import htmlquote, websafe
from webapi import ctx, header, output, input, cookies, loadhooks
def upvars(level=2):
"""Guido van Rossum sez: don't use this function."""
return dictadd(
r_include = re_compile(r'(?!\\)#include \"(.*?)\"($|#)', re.M)
def __compiletemplate(template, base=None, isString=False):
if isString:
text = template
text = open('templates/'+template).read()
# implement #include at compile-time
def do_include(match):
text = open('templates/'+match.groups()[0]).read()
return text
while r_include.findall(text):
text = r_include.sub(do_include, text)
execspace = _compiletemplate.bases.copy()
tmpl_compiler = Compiler(source=text, mainClassName='GenTemplate')
exec str(tmpl_compiler) in execspace
if base:
_compiletemplate.bases[base] = execspace['GenTemplate']
return execspace['GenTemplate']
_compiletemplate = memoize(__compiletemplate)
_compiletemplate.bases = {}
def render(template, terms=None, asTemplate=False, base=None,
Renders a template, caching where it can.
`template` is the name of a file containing the a template in
the `templates/` folder, unless `isString`, in which case it's the
template itself.
`terms` is a dictionary used to fill the template. If it's None, then
the caller's local variables are used instead, plus context, if it's not
already set, is set to `context`.
If asTemplate is False, it `output`s the template directly. Otherwise,
it returns the template object.
If the template is a potential base template (that is, something other templates)
can extend, then base should be a string with the name of the template. The
template will be cached and made available for future calls to `render`.
Requires [Cheetah](http://cheetahtemplate.org/).
# terms=['var1', 'var2'] means grab those variables
if isinstance(terms, list):
new = {}
old = upvars()
for k in terms:
new[k] = old[k]
terms = new
# default: grab all locals
elif terms is None:
terms = {'context': ctx, 'ctx':ctx}
# terms=d means use d as the searchList
if not isinstance(terms, tuple):
terms = (terms,)
if 'headers' in ctx and not isString and template.endswith('.html'):
header('Content-Type','text/html; charset=utf-8', unique=True)
if loadhooks.has_key('reloader'):
compiled_tmpl = __compiletemplate(template, base=base, isString=isString)
compiled_tmpl = _compiletemplate(template, base=base, isString=isString)
compiled_tmpl = compiled_tmpl(searchList=terms, filter=WebSafe)
if asTemplate:
return compiled_tmpl
return output(str(compiled_tmpl))
class WebSafe(Filter):
def filter(self, val, **keywords):
return websafe(val)

@ -0,0 +1,703 @@
Database API
(part of web.py)
# todo:
# - test with sqlite
# - a store function?
__all__ = [
"UnknownParamstyle", "UnknownDB",
"sqllist", "sqlors", "aparam", "reparam",
"SQLQuery", "sqlquote",
"SQLLiteral", "sqlliteral",
"TransactionError", "transaction", "transact", "commit", "rollback",
"select", "insert", "update", "delete"
import time
try: import datetime
except ImportError: datetime = None
from utils import storage, iters, iterbetter
import webapi as web
from DBUtils import PooledDB
web.config._hasPooling = True
except ImportError:
web.config._hasPooling = False
class _ItplError(ValueError):
def __init__(self, text, pos):
self.text = text
self.pos = pos
def __str__(self):
return "unfinished expression in %s at char %d" % (
repr(self.text), self.pos)
def _interpolate(format):
Takes a format string and returns a list of 2-tuples of the form
(boolean, string) where boolean says whether string should be evaled
or not.
from <http://lfw.org/python/Itpl.py> (public domain, Ka-Ping Yee)
from tokenize import tokenprog
def matchorfail(text, pos):
match = tokenprog.match(text, pos)
if match is None:
raise _ItplError(text, pos)
return match, match.end()
namechars = "abcdefghijklmnopqrstuvwxyz" \
chunks = []
pos = 0
while 1:
dollar = format.find("$", pos)
if dollar < 0:
nextchar = format[dollar + 1]
if nextchar == "{":
chunks.append((0, format[pos:dollar]))
pos, level = dollar + 2, 1
while level:
match, pos = matchorfail(format, pos)
tstart, tend = match.regs[3]
token = format[tstart:tend]
if token == "{":
level = level + 1
elif token == "}":
level = level - 1
chunks.append((1, format[dollar + 2:pos - 1]))
elif nextchar in namechars:
chunks.append((0, format[pos:dollar]))
match, pos = matchorfail(format, dollar + 1)
while pos < len(format):
if format[pos] == "." and \
pos + 1 < len(format) and format[pos + 1] in namechars:
match, pos = matchorfail(format, pos + 1)
elif format[pos] in "([":
pos, level = pos + 1, 1
while level:
match, pos = matchorfail(format, pos)
tstart, tend = match.regs[3]
token = format[tstart:tend]
if token[0] in "([":
level = level + 1
elif token[0] in ")]":
level = level - 1
chunks.append((1, format[dollar + 1:pos]))
chunks.append((0, format[pos:dollar + 1]))
pos = dollar + 1 + (nextchar == "$")
if pos < len(format):
chunks.append((0, format[pos:]))
return chunks
class UnknownParamstyle(Exception):
raised for unsupported db paramstyles
(currently supported: qmark, numeric, format, pyformat)
def aparam():
Returns the appropriate string to be used to interpolate
a value with the current `web.ctx.db_module` or simply %s
if there isn't one.
>>> aparam()
if hasattr(web.ctx, 'db_module'):
style = web.ctx.db_module.paramstyle
style = 'pyformat'
if style == 'qmark':
return '?'
elif style == 'numeric':
return ':1'
elif style in ['format', 'pyformat']:
return '%s'
raise UnknownParamstyle, style
def reparam(string_, dictionary):
Takes a string and a dictionary and interpolates the string
using values from the dictionary. Returns an `SQLQuery` for the result.
>>> reparam("s = $s", dict(s=True))
<sql: "s = 't'">
vals = []
result = []
for live, chunk in _interpolate(string_):
if live:
vals.append(eval(chunk, dictionary))
else: result.append(chunk)
return SQLQuery(''.join(result), vals)
def sqlify(obj):
converts `obj` to its proper SQL version
>>> sqlify(None)
>>> sqlify(True)
>>> sqlify(3)
# because `1 == True and hash(1) == hash(True)`
# we have to do this the hard way...
if obj is None:
return 'NULL'
elif obj is True:
return "'t'"
elif obj is False:
return "'f'"
elif datetime and isinstance(obj, datetime.datetime):
return repr(obj.isoformat())
return repr(obj)
class SQLQuery:
You can pass this sort of thing as a clause in any db function.
Otherwise, you can pass a dictionary to the keyword argument `vars`
and the function will call reparam for you.
# tested in sqlquote's docstring
def __init__(self, s='', v=()):
self.s, self.v = str(s), tuple(v)
def __getitem__(self, key): # for backwards-compatibility
return [self.s, self.v][key]
def __add__(self, other):
if isinstance(other, str):
self.s += other
elif isinstance(other, SQLQuery):
self.s += other.s
self.v += other.v
return self
def __radd__(self, other):
if isinstance(other, str):
self.s = other + self.s
return self
return NotImplemented
def __str__(self):
return self.s % tuple([sqlify(x) for x in self.v])
except (ValueError, TypeError):
return self.s
def __repr__(self):
return '<sql: %s>' % repr(str(self))
class SQLLiteral:
Protects a string from `sqlquote`.
>>> insert('foo', time=SQLLiteral('NOW()'), _test=True)
<sql: 'INSERT INTO foo (time) VALUES (NOW())'>
def __init__(self, v):
self.v = v
def __repr__(self):
return self.v
sqlliteral = SQLLiteral
def sqlquote(a):
Ensures `a` is quoted properly for use in a SQL query.
>>> 'WHERE x = ' + sqlquote(True) + ' AND y = ' + sqlquote(3)
<sql: "WHERE x = 't' AND y = 3">
return SQLQuery(aparam(), (a,))
class UnknownDB(Exception):
"""raised for unsupported dbms"""
def connect(dbn, **keywords):
Connects to the specified database.
`dbn` currently must be "postgres", "mysql", or "sqlite".
If DBUtils is installed, connection pooling will be used.
if dbn == "postgres":
import psycopg2 as db
except ImportError:
import psycopg as db
except ImportError:
import pgdb as db
if 'pw' in keywords:
keywords['password'] = keywords['pw']
del keywords['pw']
keywords['database'] = keywords['db']
del keywords['db']
elif dbn == "mysql":
import MySQLdb as db
if 'pw' in keywords:
keywords['passwd'] = keywords['pw']
del keywords['pw']
db.paramstyle = 'pyformat' # it's both, like psycopg
elif dbn == "sqlite":
import sqlite3 as db
db.paramstyle = 'qmark'
except ImportError:
from pysqlite2 import dbapi2 as db
db.paramstyle = 'qmark'
except ImportError:
import sqlite as db
web.config._hasPooling = False
keywords['database'] = keywords['db']
del keywords['db']
elif dbn == "firebird":
import kinterbasdb as db
if 'pw' in keywords:
keywords['passwd'] = keywords['pw']
del keywords['pw']
keywords['database'] = keywords['db']
del keywords['db']
raise UnknownDB, dbn
web.ctx.db_name = dbn
web.ctx.db_module = db
web.ctx.db_transaction = 0
web.ctx.db = keywords
def _PooledDB(db, keywords):
# In DBUtils 0.9.3, `dbapi` argument is renamed as `creator`
# see Bug#122112
if PooledDB.__version__.split('.') < '0.9.3'.split('.'):
return PooledDB.PooledDB(dbapi=db, **keywords)
return PooledDB.PooledDB(creator=db, **keywords)
def db_cursor():
if isinstance(web.ctx.db, dict):
keywords = web.ctx.db
if web.config._hasPooling:
if 'db' not in globals():
globals()['db'] = _PooledDB(db, keywords)
web.ctx.db = globals()['db'].connection()
web.ctx.db = db.connect(**keywords)
return web.ctx.db.cursor()
web.ctx.db_cursor = db_cursor
web.ctx.dbq_count = 0
def db_execute(cur, sql_query, dorollback=True):
"""executes an sql query"""
web.ctx.dbq_count += 1
a = time.time()
out = cur.execute(sql_query.s, sql_query.v)
b = time.time()
if web.config.get('db_printing'):
print >> web.debug, 'ERR:', str(sql_query)
if dorollback: rollback(care=False)
if web.config.get('db_printing'):
print >> web.debug, '%s (%s): %s' % (round(b-a, 2), web.ctx.dbq_count, str(sql_query))
return out
web.ctx.db_execute = db_execute
return web.ctx.db
class TransactionError(Exception): pass
class transaction:
A context that can be used in conjunction with "with" statements
to implement SQL transactions. Starts a transaction on enter,
rolls it back if there's an error; otherwise it commits it at the
def __enter__(self):
def __exit__(self, exctype, excvalue, traceback):
if exctype is not None:
def transact():
"""Start a transaction."""
if not web.ctx.db_transaction:
# commit everything up to now, so we don't rollback it later
if hasattr(web.ctx.db, 'commit'):
db_cursor = web.ctx.db_cursor()
SQLQuery("SAVEPOINT webpy_sp_%s" % web.ctx.db_transaction))
web.ctx.db_transaction += 1
def commit():
"""Commits a transaction."""
web.ctx.db_transaction -= 1
if web.ctx.db_transaction < 0:
raise TransactionError, "not in a transaction"
if not web.ctx.db_transaction:
if hasattr(web.ctx.db, 'commit'):
db_cursor = web.ctx.db_cursor()
SQLQuery("RELEASE SAVEPOINT webpy_sp_%s" % web.ctx.db_transaction))
def rollback(care=True):
"""Rolls back a transaction."""
web.ctx.db_transaction -= 1
if web.ctx.db_transaction < 0:
web.db_transaction = 0
if care:
raise TransactionError, "not in a transaction"
if not web.ctx.db_transaction:
if hasattr(web.ctx.db, 'rollback'):
db_cursor = web.ctx.db_cursor()
SQLQuery("ROLLBACK TO SAVEPOINT webpy_sp_%s" % web.ctx.db_transaction),
def query(sql_query, vars=None, processed=False, _test=False):
Execute SQL query `sql_query` using dictionary `vars` to interpolate it.
If `processed=True`, `vars` is a `reparam`-style list to use
instead of interpolating.
>>> query("SELECT * FROM foo", _test=True)
<sql: 'SELECT * FROM foo'>
>>> query("SELECT * FROM foo WHERE x = $x", vars=dict(x='f'), _test=True)
<sql: "SELECT * FROM foo WHERE x = 'f'">
>>> query("SELECT * FROM foo WHERE x = " + sqlquote('f'), _test=True)
<sql: "SELECT * FROM foo WHERE x = 'f'">
if vars is None: vars = {}
if not processed and not isinstance(sql_query, SQLQuery):
sql_query = reparam(sql_query, vars)
if _test: return sql_query
db_cursor = web.ctx.db_cursor()
web.ctx.db_execute(db_cursor, sql_query)
if db_cursor.description:
names = [x[0] for x in db_cursor.description]
def iterwrapper():
row = db_cursor.fetchone()
while row:
yield storage(dict(zip(names, row)))
row = db_cursor.fetchone()
out = iterbetter(iterwrapper())
if web.ctx.db_name != "sqlite":
out.__len__ = lambda: int(db_cursor.rowcount)
out.list = lambda: [storage(dict(zip(names, x))) \
for x in db_cursor.fetchall()]
out = db_cursor.rowcount
if not web.ctx.db_transaction: web.ctx.db.commit()
return out
def sqllist(lst):
Converts the arguments for use in something like a WHERE clause.
>>> sqllist(['a', 'b'])
'a, b'
>>> sqllist('a')
if isinstance(lst, str):
return lst
return ', '.join(lst)
def sqlors(left, lst):
`left is a SQL clause like `tablename.arg = `
and `lst` is a list of values. Returns a reparam-style
pair featuring the SQL that ORs together the clause
for each item in the lst.
>>> sqlors('foo = ', [])
<sql: '2+2=5'>
>>> sqlors('foo = ', [1])
<sql: 'foo = 1'>
>>> sqlors('foo = ', 1)
<sql: 'foo = 1'>
>>> sqlors('foo = ', [1,2,3])
<sql: '(foo = 1 OR foo = 2 OR foo = 3)'>
if isinstance(lst, iters):
lst = list(lst)
ln = len(lst)
if ln == 0:
return SQLQuery("2+2=5", [])
if ln == 1:
lst = lst[0]
if isinstance(lst, iters):
return SQLQuery('(' + left +
(' OR ' + left).join([aparam() for param in lst]) + ")", lst)
return SQLQuery(left + aparam(), [lst])
def sqlwhere(dictionary, grouping=' AND '):
Converts a `dictionary` to an SQL WHERE clause `SQLQuery`.
>>> sqlwhere({'cust_id': 2, 'order_id':3})
<sql: 'order_id = 3 AND cust_id = 2'>
>>> sqlwhere({'cust_id': 2, 'order_id':3}, grouping=', ')
<sql: 'order_id = 3, cust_id = 2'>
return SQLQuery(grouping.join([
'%s = %s' % (k, aparam()) for k in dictionary.keys()
]), dictionary.values())
def select(tables, vars=None, what='*', where=None, order=None, group=None,
limit=None, offset=None, _test=False):
Selects `what` from `tables` with clauses `where`, `order`,
`group`, `limit`, and `offset`. Uses vars to interpolate.
Otherwise, each clause can be a SQLQuery.
>>> select('foo', _test=True)
<sql: 'SELECT * FROM foo'>
>>> select(['foo', 'bar'], where="foo.bar_id = bar.id", limit=5, _test=True)
<sql: 'SELECT * FROM foo, bar WHERE foo.bar_id = bar.id LIMIT 5'>
if vars is None: vars = {}
qout = ""
def gen_clause(sql, val):
if isinstance(val, (int, long)):
if sql == 'WHERE':
nout = 'id = ' + sqlquote(val)
nout = SQLQuery(val)
elif isinstance(val, (list, tuple)) and len(val) == 2:
nout = SQLQuery(val[0], val[1]) # backwards-compatibility
elif isinstance(val, SQLQuery):
nout = val
elif val:
nout = reparam(val, vars)
return ""
out = ""
if qout: out += " "
out += sql + " " + nout
return out
if web.ctx.get('db_name') == "firebird":
for (sql, val) in (
('FIRST', limit),
('SKIP', offset)
qout += gen_clause(sql, val)
if qout:
SELECT = 'SELECT ' + qout
qout = ""
sql_clauses = (
(SELECT, what),
('FROM', sqllist(tables)),
('WHERE', where),
('GROUP BY', group),
('ORDER BY', order)
sql_clauses = (
('SELECT', what),
('FROM', sqllist(tables)),
('WHERE', where),
('GROUP BY', group),
('ORDER BY', order),
('LIMIT', limit),
('OFFSET', offset)
for (sql, val) in sql_clauses:
qout += gen_clause(sql, val)
if _test: return qout
return query(qout, processed=True)
def insert(tablename, seqname=None, _test=False, **values):
Inserts `values` into `tablename`. Returns current sequence ID.
Set `seqname` to the ID if it's not the default, or to `False`
if there isn't one.
>>> insert('foo', joe='bob', a=2, _test=True)
<sql: "INSERT INTO foo (a, joe) VALUES (2, 'bob')">
if values:
sql_query = SQLQuery("INSERT INTO %s (%s) VALUES (%s)" % (
", ".join(values.keys()),
', '.join([aparam() for x in values])
), values.values())
sql_query = SQLQuery("INSERT INTO %s DEFAULT VALUES" % tablename)
if _test: return sql_query
db_cursor = web.ctx.db_cursor()
if seqname is False:
elif web.ctx.db_name == "postgres":
if seqname is None:
seqname = tablename + "_id_seq"
sql_query += "; SELECT currval('%s')" % seqname
elif web.ctx.db_name == "mysql":
web.ctx.db_execute(db_cursor, sql_query)
sql_query = SQLQuery("SELECT last_insert_id()")
elif web.ctx.db_name == "sqlite":
web.ctx.db_execute(db_cursor, sql_query)
# not really the same...
sql_query = SQLQuery("SELECT last_insert_rowid()")
web.ctx.db_execute(db_cursor, sql_query)
out = db_cursor.fetchone()[0]
except Exception:
out = None
if not web.ctx.db_transaction: web.ctx.db.commit()
return out
def update(tables, where, vars=None, _test=False, **values):
Update `tables` with clause `where` (interpolated using `vars`)
and setting `values`.
>>> joe = 'Joseph'
>>> update('foo', where='name = $joe', name='bob', age=5,
... vars=locals(), _test=True)
<sql: "UPDATE foo SET age = 5, name = 'bob' WHERE name = 'Joseph'">
if vars is None: vars = {}
if isinstance(where, (int, long)):
where = "id = " + sqlquote(where)
elif isinstance(where, (list, tuple)) and len(where) == 2:
where = SQLQuery(where[0], where[1])
elif isinstance(where, SQLQuery):
where = reparam(where, vars)
query = (
"UPDATE " + sqllist(tables) +
" SET " + sqlwhere(values, ', ') +
" WHERE " + where)
if _test: return query
db_cursor = web.ctx.db_cursor()
web.ctx.db_execute(db_cursor, query)
if not web.ctx.db_transaction: web.ctx.db.commit()
return db_cursor.rowcount
def delete(table, where=None, using=None, vars=None, _test=False):
Deletes from `table` with clauses `where` and `using`.
>>> name = 'Joe'
>>> delete('foo', where='name = $name', vars=locals(), _test=True)
<sql: "DELETE FROM foo WHERE name = 'Joe'">
if vars is None: vars = {}
if isinstance(where, (int, long)):
where = "id = " + sqlquote(where)
elif isinstance(where, (list, tuple)) and len(where) == 2:
where = SQLQuery(where[0], where[1])
elif isinstance(where, SQLQuery):
elif where is None:
where = reparam(where, vars)
q = 'DELETE FROM ' + table
if where:
q += ' WHERE ' + where
if using and web.ctx.get('db_name') != "firebird":
q += ' USING ' + sqllist(using)
if _test: return q
db_cursor = web.ctx.db_cursor()
web.ctx.db_execute(db_cursor, q)
if not web.ctx.db_transaction: web.ctx.db.commit()
return db_cursor.rowcount
if __name__ == "__main__":
import doctest

@ -0,0 +1,316 @@
pretty debug errors
(part of web.py)
adapted from Django <djangoproject.com>
Copyright (c) 2005, the Lawrence Journal-World
Used under the modified BSD license:
__all__ = ["debugerror", "djangoerror"]
import sys, urlparse, pprint
from net import websafe
from template import Template
import webapi as web
import os, os.path
whereami = os.path.join(os.getcwd(), __file__)
whereami = os.path.sep.join(whereami.split(os.path.sep)[:-1])
djangoerror_t = """\
$def with (exception_type, exception_value, frames)
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html lang="en">
<meta http-equiv="content-type" content="text/html; charset=utf-8" />
<meta name="robots" content="NONE,NOARCHIVE" />
<title>$exception_type at $ctx.path</title>
<style type="text/css">
html * { padding:0; margin:0; }
body * { padding:10px 20px; }
body * * { padding:0; }
body { font:small sans-serif; }
body>div { border-bottom:1px solid #ddd; }
h1 { font-weight:normal; }
h2 { margin-bottom:.8em; }
h2 span { font-size:80%; color:#666; font-weight:normal; }
h3 { margin:1em 0 .5em 0; }
h4 { margin:0 0 .5em 0; font-weight: normal; }
table {
border:1px solid #ccc; border-collapse: collapse; background:white; }
tbody td, tbody th { vertical-align:top; padding:2px 3px; }
thead th {
padding:1px 6px 1px 3px; background:#fefefe; text-align:left;
font-weight:normal; font-size:11px; border:1px solid #ddd; }
tbody th { text-align:right; color:#666; padding-right:.5em; }
table.vars { margin:5px 0 2px 40px; }
table.vars td, table.req td { font-family:monospace; }
table td.code { width:100%;}
table td.code div { overflow:hidden; }
table.source th { color:#666; }
table.source td {
font-family:monospace; white-space:pre; border-bottom:1px solid #eee; }
ul.traceback { list-style-type:none; }
ul.traceback li.frame { margin-bottom:1em; }
div.context { margin: 10px 0; }
div.context ol {
padding-left:30px; margin:0 10px; list-style-position: inside; }
div.context ol li {
font-family:monospace; white-space:pre; color:#666; cursor:pointer; }
div.context ol.context-line li { color:black; background-color:#ccc; }
div.context ol.context-line li span { float: right; }
div.commands { margin-left: 40px; }
div.commands a { color:black; text-decoration:none; }
#summary { background: #ffc; }
#summary h2 { font-weight: normal; color: #666; }
#explanation { background:#eee; }
#template, #template-not-exist { background:#f6f6f6; }
#template-not-exist ul { margin: 0 0 0 20px; }
#traceback { background:#eee; }
#requestinfo { background:#f6f6f6; padding-left:120px; }
#summary table { border:none; background:transparent; }
#requestinfo h2, #requestinfo h3 { position:relative; margin-left:-100px; }
#requestinfo h3 { margin-bottom:-1em; }
.error { background: #ffc; }
.specific { color:#cc3300; font-weight:bold; }
<script type="text/javascript">
function getElementsByClassName(oElm, strTagName, strClassName){
// Written by Jonathan Snook, http://www.snook.ca/jon;
// Add-ons by Robert Nyman, http://www.robertnyman.com
var arrElements = (strTagName == "*" && document.all)? document.all :
var arrReturnElements = new Array();
strClassName = strClassName.replace(/\-/g, "\\-");
var oRegExp = new RegExp("(^|\\s)" + strClassName + "(\\s|$)");
var oElement;
for(var i=0; i<arrElements.length; i++){
oElement = arrElements[i];
return (arrReturnElements)
function hideAll(elems) {
for (var e = 0; e < elems.length; e++) {
elems[e].style.display = 'none';
window.onload = function() {
hideAll(getElementsByClassName(document, 'table', 'vars'));
hideAll(getElementsByClassName(document, 'ol', 'pre-context'));
hideAll(getElementsByClassName(document, 'ol', 'post-context'));
function toggle() {
for (var i = 0; i < arguments.length; i++) {
var e = document.getElementById(arguments[i]);
if (e) {
e.style.display = e.style.display == 'none' ? 'block' : 'none';
return false;
function varToggle(link, id) {
toggle('v' + id);
var s = link.getElementsByTagName('span')[0];
var uarr = String.fromCharCode(0x25b6);
var darr = String.fromCharCode(0x25bc);
s.innerHTML = s.innerHTML == uarr ? darr : uarr;
return false;
<div id="summary">
<h1>$exception_type at $ctx.path</h1>
<td>$frames[0].filename in $frames[0].function, line $frames[0].lineno</td>
<td>$ctx.method $ctx.home$ctx.path</td>
<div id="traceback">
<h2>Traceback <span>(innermost first)</span></h2>
<ul class="traceback">
$for frame in frames:
<li class="frame">
<code>$frame.filename</code> in <code>$frame.function</code>
$if frame.context_line:
<div class="context" id="c$frame.id">
$if frame.pre_context:
<ol start="$frame.pre_context_lineno" class="pre-context" id="pre$frame.id">
$for line in frame.pre_context:
<li onclick="toggle('pre$frame.id', 'post$frame.id')">$line</li>
<ol start="$frame.lineno" class="context-line"><li onclick="toggle('pre$frame.id', 'post$frame.id')">$frame.context_line <span>...</span></li></ol>
$if frame.post_context:
<ol start='${frame.lineno + 1}' class="post-context" id="post$frame.id">
$for line in frame.post_context:
<li onclick="toggle('pre$frame.id', 'post$frame.id')">$line</li>
$if frame.vars:
<div class="commands">
<a href='#' onclick="return varToggle(this, '$frame.id')"><span>&#x25b6;</span> Local vars</a>
$# $inspect.formatargvalues(*inspect.getargvalues(frame['tb'].tb_frame))
$:dicttable(frame.vars, kls='vars', id=('v' + str(frame.id)))
<div id="requestinfo">
$if ctx.output or ctx.headers:
<h2>Response so far</h2>
<p class="req"><code>
$for kv in ctx.headers:
$kv[0]: $kv[1]<br />
[no headers]
<p class="req" style="padding-bottom: 2em"><code>
<h2>Request information</h2>
<h3 id="cookie-info">COOKIES</h3>
<h3 id="meta-info">META</h3>
$ newctx = []
$# ) and (k not in ['env', 'output', 'headers', 'environ', 'status', 'db_execute']):
$for k, v in ctx.iteritems():
$if not k.startswith('_') and (k in x):
<h3 id="meta-info">ENVIRONMENT</h3>
<div id="explanation">
You're seeing this error because you have <code>web.internalerror</code>
set to <code>web.debugerror</code>. Change that if you want a different one.
dicttable_t = r"""$def with (d, kls='req', id=None)
$if d:
<table class="$kls"\
$if id: id="$id"\
$ temp = d.items()
$for kv in temp:
<tr><td>$kv[0]</td><td class="code"><div>$prettify(kv[1])</div></td></tr>
<p>No data.</p>
dicttable_r = Template(dicttable_t, filter=websafe)
djangoerror_r = Template(djangoerror_t, filter=websafe)
def djangoerror():
def _get_lines_from_file(filename, lineno, context_lines):
Returns context_lines before and after lineno from file.
Returns (pre_context_lineno, pre_context, context_line, post_context).
source = open(filename).readlines()
lower_bound = max(0, lineno - context_lines)
upper_bound = lineno + context_lines
pre_context = \
[line.strip('\n') for line in source[lower_bound:lineno]]
context_line = source[lineno].strip('\n')
post_context = \
[line.strip('\n') for line in source[lineno + 1:upper_bound]]
return lower_bound, pre_context, context_line, post_context
except (OSError, IOError):
return None, [], None, []
exception_type, exception_value, tback = sys.exc_info()
frames = []
while tback is not None:
filename = tback.tb_frame.f_code.co_filename
function = tback.tb_frame.f_code.co_name
lineno = tback.tb_lineno - 1
pre_context_lineno, pre_context, context_line, post_context = \
_get_lines_from_file(filename, lineno, 7)
'tback': tback,
'filename': filename,
'function': function,
'lineno': lineno,
'vars': tback.tb_frame.f_locals,
'id': id(tback),
'pre_context': pre_context,
'context_line': context_line,
'post_context': post_context,
'pre_context_lineno': pre_context_lineno,
tback = tback.tb_next
urljoin = urlparse.urljoin
def prettify(x):
out = pprint.pformat(x)
except Exception, e:
out = '[could not display: <' + e.__class__.__name__ + \
': '+str(e)+'>]'
return out
dt = dicttable_r
dt.globals = {'prettify': prettify}
t = djangoerror_r
t.globals = {'ctx': web.ctx, 'web':web, 'dicttable':dt, 'dict':dict, 'str':str}
return t(exception_type, exception_value, frames)
def debugerror():
A replacement for `internalerror` that presents a nice page with lots
of debug information for the programmer.
(Based on the beautiful 500 page from [Django](http://djangoproject.com/),
designed by [Wilson Miner](http://wilsonminer.com/).)
web.ctx.headers = [('Content-Type', 'text/html')]
web.ctx.output = djangoerror()
if __name__ == "__main__":
urls = (
'/', 'index'
class index:
def GET(self):
web.internalerror = web.debugerror

@ -0,0 +1,215 @@
HTML forms
(part of web.py)
import copy, re
import webapi as web
import utils, net
def attrget(obj, attr, value=None):
if hasattr(obj, 'has_key') and obj.has_key(attr): return obj[attr]
if hasattr(obj, attr): return getattr(obj, attr)
return value
class Form:
def __init__(self, *inputs, **kw):
self.inputs = inputs
self.valid = True
self.note = None
self.validators = kw.pop('validators', [])
def __call__(self, x=None):
o = copy.deepcopy(self)
if x: o.validates(x)
return o
def render(self):
out = ''
out += self.rendernote(self.note)
out += '<table>\n'
for i in self.inputs:
out += ' <tr><th><label for="%s">%s</label></th>' % (i.id, i.description)
out += "<td>"+i.pre+i.render()+i.post+"</td>"
out += '<td id="note_%s">%s</td></tr>\n' % (i.id, self.rendernote(i.note))
out += "</table>"
return out
def rendernote(self, note):
if note: return '<strong class="wrong">%s</strong>' % note
else: return ""
def validates(self, source=None, _validate=True, **kw):
source = source or kw or web.input()
out = True
for i in self.inputs:
v = attrget(source, i.name)
if _validate:
out = i.validate(v) and out
i.value = v
if _validate:
out = out and self._validate(source)
self.valid = out
return out
def _validate(self, value):
self.value = value
for v in self.validators:
if not v.valid(value):
self.note = v.msg
return False
return True
def fill(self, source=None, **kw):
return self.validates(source, _validate=False, **kw)
def __getitem__(self, i):
for x in self.inputs:
if x.name == i: return x
raise KeyError, i
def _get_d(self): #@@ should really be form.attr, no?
return utils.storage([(i.name, i.value) for i in self.inputs])
d = property(_get_d)
class Input(object):
def __init__(self, name, *validators, **attrs):
self.description = attrs.pop('description', name)
self.value = attrs.pop('value', None)
self.pre = attrs.pop('pre', "")
self.post = attrs.pop('post', "")
self.id = attrs.setdefault('id', name)
if 'class_' in attrs:
attrs['class'] = attrs['class_']
del attrs['class_']
self.name, self.validators, self.attrs, self.note = name, validators, attrs, None
def validate(self, value):
self.value = value
for v in self.validators:
if not v.valid(value):
self.note = v.msg
return False
return True
def render(self): raise NotImplementedError
def addatts(self):
str = ""
for (n, v) in self.attrs.items():
str += ' %s="%s"' % (n, net.websafe(v))
return str
#@@ quoting
class Textbox(Input):
def render(self):
x = '<input type="text" name="%s"' % net.websafe(self.name)
if self.value: x += ' value="%s"' % net.websafe(self.value)
x += self.addatts()
x += ' />'
return x
class Password(Input):
def render(self):
x = '<input type="password" name="%s"' % net.websafe(self.name)
if self.value: x += ' value="%s"' % net.websafe(self.value)
x += self.addatts()
x += ' />'
return x
class Textarea(Input):
def render(self):
x = '<textarea name="%s"' % net.websafe(self.name)
x += self.addatts()
x += '>'
if self.value is not None: x += net.websafe(self.value)
x += '</textarea>'
return x
class Dropdown(Input):
def __init__(self, name, args, *validators, **attrs):
self.args = args
super(Dropdown, self).__init__(name, *validators, **attrs)
def render(self):
x = '<select name="%s"%s>\n' % (net.websafe(self.name), self.addatts())
for arg in self.args:
if type(arg) == tuple:
value, desc= arg
value, desc = arg, arg
if self.value == value: select_p = ' selected="selected"'
else: select_p = ''
x += ' <option %s value="%s">%s</option>\n' % (select_p, net.websafe(value), net.websafe(desc))
x += '</select>\n'
return x
class Radio(Input):
def __init__(self, name, args, *validators, **attrs):
self.args = args
super(Radio, self).__init__(name, *validators, **attrs)
def render(self):
x = '<span>'
for arg in self.args:
if self.value == arg: select_p = ' checked="checked"'
else: select_p = ''
x += '<input type="radio" name="%s" value="%s"%s%s /> %s ' % (net.websafe(self.name), net.websafe(arg), select_p, self.addatts(), net.websafe(arg))
return x+'</span>'
class Checkbox(Input):
def render(self):
x = '<input name="%s" type="checkbox"' % net.websafe(self.name)
if self.value: x += ' checked="checked"'
x += self.addatts()
x += ' />'
return x
class Button(Input):
def __init__(self, name, *validators, **attrs):
super(Button, self).__init__(name, *validators, **attrs)
self.description = ""
def render(self):
safename = net.websafe(self.name)
x = '<button name="%s"%s>%s</button>' % (safename, self.addatts(), safename)
return x
class Hidden(Input):
def __init__(self, name, *validators, **attrs):
super(Hidden, self).__init__(name, *validators, **attrs)
# it doesnt make sence for a hidden field to have description
self.description = ""
def render(self):
x = '<input type="hidden" name="%s"' % net.websafe(self.name)
if self.value: x += ' value="%s"' % net.websafe(self.value)
x += ' />'
return x
class File(Input):
def render(self):
x = '<input type="file" name="%s"' % net.websafe(self.name)
x += self.addatts()
x += ' />'
return x
class Validator:
def __deepcopy__(self, memo): return copy.copy(self)
def __init__(self, msg, test, jstest=None): utils.autoassign(self, locals())
def valid(self, value):
try: return self.test(value)
except: return False
notnull = Validator("Required", bool)
class regexp(Validator):
def __init__(self, rexp, msg):
self.rexp = re.compile(rexp)
self.msg = msg
def valid(self, value):
return bool(self.rexp.match(value))

HTTP Utilities
(from web.py)
__all__ = [
"expires", "lastmodified",
"prefixurl", "modified",
"redirect", "found", "seeother", "tempredirect",
"changequery", "url",
"background", "backgrounder",
"Reloader", "reloader", "profiler",
import sys, os, threading, urllib, urlparse
try: import datetime
except ImportError: pass
import net, utils, webapi as web
def prefixurl(base=''):
Sorry, this function is really difficult to explain.
Maybe some other time.
url = web.ctx.path.lstrip('/')
for i in xrange(url.count('/')):
base += '../'
if not base:
base = './'
return base
def expires(delta):
Outputs an `Expires` header for `delta` from now.
`delta` is a `timedelta` object or a number of seconds.
if isinstance(delta, (int, long)):
delta = datetime.timedelta(seconds=delta)
date_obj = datetime.datetime.utcnow() + delta
web.header('Expires', net.httpdate(date_obj))
def lastmodified(date_obj):
"""Outputs a `Last-Modified` header for `datetime`."""
web.header('Last-Modified', net.httpdate(date_obj))
def modified(date=None, etag=None):
n = web.ctx.env.get('HTTP_IF_NONE_MATCH')
m = net.parsehttpdate(web.ctx.env.get('HTTP_IF_MODIFIED_SINCE', '').split(';')[0])
validate = False
if etag:
raise NotImplementedError, "no etag support yet"
# should really be a warning
if date and m:
# we subtract a second because
# HTTP dates don't have sub-second precision
if date-datetime.timedelta(seconds=1) <= m:
validate = True
if validate: web.ctx.status = '304 Not Modified'
return not validate
By default, these all return simple error messages that send very short messages
(like "bad request") to the user. They can and should be overridden
to return nicer ones.
def redirect(url, status='301 Moved Permanently'):
Returns a `status` redirect to the new URL.
`url` is joined with the base URL so that things like
`redirect("about") will work properly.
newloc = urlparse.urljoin(web.ctx.path, url)
# if newloc is relative then make it absolute
#mvoncken:Disabled because we don't want to redirect to localhost!
#if newloc.startswith('/'):
# newloc = web.ctx.home + newloc
web.ctx.status = status
web.ctx.output = ''
web.header('Content-Type', 'text/html')
web.header('Location', newloc)
# seems to add a three-second delay for some reason:
# web.output('<a href="'+ newloc + '">moved permanently</a>')
def found(url):
"""A `302 Found` redirect."""
return redirect(url, '302 Found')
def seeother(url):
"""A `303 See Other` redirect."""
return redirect(url, '303 See Other')
def tempredirect(url):
"""A `307 Temporary Redirect` redirect."""
return redirect(url, '307 Temporary Redirect')
def write(cgi_response):
Converts a standard CGI-style string response into `header` and
`output` calls.
cgi_response = str(cgi_response)
cgi_response.replace('\r\n', '\n')
head, body = cgi_response.split('\n\n', 1)
lines = head.split('\n')
for line in lines:
if line.isspace():
hdr, value = line.split(":", 1)
value = value.strip()
if hdr.lower() == "status":
web.ctx.status = value
web.header(hdr, value)
def urlencode(query):
Same as urllib.urlencode, but supports unicode strings.
>>> urlencode({'text':'foo bar'})
query = dict([(k, utils.utf8(v)) for k, v in query.items()])
return urllib.urlencode(query)
def changequery(query=None, **kw):
Imagine you're at `/foo?a=1&b=2`. Then `changequery(a=3)` will return
`/foo?a=3&b=2` -- the same URL but with the arguments you requested
if query is None:
query = web.input(_method='get')
for k, v in kw.iteritems():
if v is None:
query.pop(k, None)
query[k] = v
out = web.ctx.path
if query:
out += '?' + urlencode(query)
return out
def url(path=None, **kw):
Makes url by concatinating web.ctx.homepath and path and the
query string created using the arguments.
if path is None:
path = web.ctx.path
if path.startswith("/"):
out = web.ctx.homepath + path
out = path
if kw:
out += '?' + urlencode(kw)
return out
def background(func):
"""A function decorator to run a long-running function as a background thread."""
def internal(*a, **kw):
web.data() # cache it
tmpctx = web._context[threading.currentThread()]
web._context[threading.currentThread()] = utils.storage(web.ctx.copy())
def newfunc():
web._context[threading.currentThread()] = tmpctx
func(*a, **kw)
myctx = web._context[threading.currentThread()]
for k in myctx.keys():
if k not in ['status', 'headers', 'output']:
try: del myctx[k]
except KeyError: pass
t = threading.Thread(target=newfunc)
background.threaddb[id(t)] = t
web.ctx.headers = []
return seeother(changequery(_t=id(t)))
return internal
background.threaddb = {}
def backgrounder(func):
def internal(*a, **kw):
i = web.input(_method='get')
if '_t' in i:
t = background.threaddb[int(i._t)]
except KeyError:
return web.notfound()
web._context[threading.currentThread()] = web._context[t]
return func(*a, **kw)
return internal
class Reloader:
Before every request, checks to see if any loaded modules have changed on
disk and, if so, reloads them.
def __init__(self, func):
self.func = func
self.mtimes = {}
# cheetah:
# b = _compiletemplate.bases
# _compiletemplate = globals()['__compiletemplate']
# _compiletemplate.bases = b
web.loadhooks['reloader'] = self.check
# todo:
# - replace relrcheck with a loadhook
#if reloader in middleware:
# relr = reloader(None)
# relrcheck = relr.check
# middleware.remove(reloader)
# relr = None
# relrcheck = lambda: None
# if relr:
# relr.func = wsgifunc
# return wsgifunc
def check(self):
for mod in sys.modules.values():
mtime = os.stat(mod.__file__).st_mtime
except (AttributeError, OSError, IOError):
if mod.__file__.endswith('.pyc') and \
mtime = max(os.stat(mod.__file__[:-1]).st_mtime, mtime)
if mod not in self.mtimes:
self.mtimes[mod] = mtime
elif self.mtimes[mod] < mtime:
self.mtimes[mod] = mtime
except ImportError:
return True
def __call__(self, e, o):
return self.func(e, o)
reloader = Reloader
def profiler(app):
"""Outputs basic profiling information at the bottom of each response."""
from utils import profile
def profile_internal(e, o):
out, result = profile(app)(e, o)
return out + ['<pre>' + net.websafe(result) + '</pre>']
return profile_internal
if __name__ == "__main__":
import doctest

__all__ = ["runsimple"]
import sys, os
import webapi as web
import net
def runbasic(func, server_address=("", 8080)):
Runs a simple HTTP server hosting WSGI app `func`. The directory `static/`
is hosted statically.
Based on [WsgiServer][ws] from [Colin Stewart][cs].
[ws]: http://www.owlfish.com/software/wsgiutils/documentation/wsgi-server-api.html
[cs]: http://www.owlfish.com/
# Copyright (c) 2004 Colin Stewart (http://www.owlfish.com/)
# Modified somewhat for simplicity
# Used under the modified BSD license:
# http://www.xfree86.org/3.3.6/COPYRIGHT2.html#5
import SimpleHTTPServer, SocketServer, BaseHTTPServer, urlparse
import socket, errno
import traceback
class WSGIHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
def run_wsgi_app(self):
protocol, host, path, parameters, query, fragment = \
urlparse.urlparse('http://dummyhost%s' % self.path)
# we only use path, query
env = {'wsgi.version': (1, 0)
,'wsgi.url_scheme': 'http'
,'wsgi.input': self.rfile
,'wsgi.errors': sys.stderr
,'wsgi.multithread': 1
,'wsgi.multiprocess': 0
,'wsgi.run_once': 0
,'REQUEST_METHOD': self.command
,'REQUEST_URI': self.path
,'PATH_INFO': path
,'QUERY_STRING': query
,'CONTENT_TYPE': self.headers.get('Content-Type', '')
,'CONTENT_LENGTH': self.headers.get('Content-Length', '')
,'REMOTE_ADDR': self.client_address[0]
,'SERVER_NAME': self.server.server_address[0]
,'SERVER_PORT': str(self.server.server_address[1])
,'SERVER_PROTOCOL': self.request_version
for http_header, http_value in self.headers.items():
env ['HTTP_%s' % http_header.replace('-', '_').upper()] = \
# Setup the state
self.wsgi_sent_headers = 0
self.wsgi_headers = []
# We have there environment, now invoke the application
result = self.server.app(env, self.wsgi_start_response)
for data in result:
if data:
if hasattr(result, 'close'):
except socket.error, socket_err:
# Catch common network errors and suppress them
if (socket_err.args[0] in \
(errno.ECONNABORTED, errno.EPIPE)):
except socket.timeout, socket_timeout:
print >> web.debug, traceback.format_exc(),
if (not self.wsgi_sent_headers):
# We must write out something!
self.wsgi_write_data(" ")
do_POST = run_wsgi_app
do_PUT = run_wsgi_app
do_DELETE = run_wsgi_app
def do_GET(self):
if self.path.startswith('/static/'):
def wsgi_start_response(self, response_status, response_headers,
if (self.wsgi_sent_headers):
raise Exception \
("Headers already sent and start_response called again!")
# Should really take a copy to avoid changes in the application....
self.wsgi_headers = (response_status, response_headers)
return self.wsgi_write_data
def wsgi_write_data(self, data):
if (not self.wsgi_sent_headers):
status, headers = self.wsgi_headers
# Need to send header prior to data
status_code = status[:status.find(' ')]
status_msg = status[status.find(' ') + 1:]
self.send_response(int(status_code), status_msg)
for header, value in headers:
self.send_header(header, value)
self.wsgi_sent_headers = 1
# Send the data
class WSGIServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
def __init__(self, func, server_address):
self.app = func
self.serverShuttingDown = 0
print "http://%s:%d/" % server_address
WSGIServer(func, server_address).serve_forever()
def runsimple(func, server_address=("", 8080)):
Runs [CherryPy][cp] WSGI server hosting WSGI app `func`.
The directory `static/` is hosted statically.
[cp]: http://www.cherrypy.org
from wsgiserver import CherryPyWSGIServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
from BaseHTTPServer import BaseHTTPRequestHandler
class StaticApp(SimpleHTTPRequestHandler):
"""WSGI application for serving static files."""
def __init__(self, environ, start_response):
self.headers = []
self.environ = environ
self.start_response = start_response
def send_response(self, status, msg=""):
self.status = str(status) + " " + msg
def send_header(self, name, value):
self.headers.append((name, value))
def end_headers(self):
def log_message(*a): pass
def __iter__(self):
environ = self.environ
self.path = environ.get('PATH_INFO', '')
self.client_address = environ.get('REMOTE_ADDR','-'), \
self.command = environ.get('REQUEST_METHOD', '-')
from cStringIO import StringIO
self.wfile = StringIO() # for capturing error
f = self.send_head()
self.start_response(self.status, self.headers)
if f:
block_size = 16 * 1024
while True:
buf = f.read(block_size)
if not buf:
yield buf
value = self.wfile.getvalue()
yield value
class WSGIWrapper(BaseHTTPRequestHandler):
"""WSGI wrapper for logging the status and serving static files."""
def __init__(self, app):
self.app = app
self.format = '%s - - [%s] "%s %s %s" - %s'
def __call__(self, environ, start_response):
def xstart_response(status, response_headers, *args):
write = start_response(status, response_headers, *args)
self.log(status, environ)
return write
path = environ.get('PATH_INFO', '')
if path.startswith('/static/'):
return StaticApp(environ, xstart_response)
return self.app(environ, xstart_response)
def log(self, status, environ):
outfile = environ.get('wsgi.errors', web.debug)
req = environ.get('PATH_INFO', '_')
protocol = environ.get('ACTUAL_SERVER_PROTOCOL', '-')
method = environ.get('REQUEST_METHOD', '-')
host = "%s:%s" % (environ.get('REMOTE_ADDR','-'),
#@@ It is really bad to extend from
#@@ BaseHTTPRequestHandler just for this method
time = self.log_date_time_string()
print >> outfile, self.format % (host, time, protocol,
method, req, status)
func = WSGIWrapper(func)
server = CherryPyWSGIServer(server_address, func, server_name="localhost")
print "http://%s:%d/" % server_address
except KeyboardInterrupt:

Network Utilities
(from web.py)
__all__ = [
"validipaddr", "validipport", "validip", "validaddr",
"httpdate", "parsehttpdate",
"htmlquote", "websafe",
import urllib, time
try: import datetime
except ImportError: pass
def validipaddr(address):
"""returns True if `address` is a valid IPv4 address"""
octets = address.split('.')
assert len(octets) == 4
for x in octets:
assert 0 <= int(x) <= 255
except (AssertionError, ValueError):
return False
return True
def validipport(port):
"""returns True if `port` is a valid IPv4 port"""
assert 0 <= int(port) <= 65535
except (AssertionError, ValueError):
return False
return True
def validip(ip, defaultaddr="", defaultport=8080):
"""returns `(ip_address, port)` from string `ip_addr_port`"""
addr = defaultaddr
port = defaultport
ip = ip.split(":", 1)
if len(ip) == 1:
if not ip[0]:
elif validipaddr(ip[0]):
addr = ip[0]
elif validipport(ip[0]):
port = int(ip[0])
raise ValueError, ':'.join(ip) + ' is not a valid IP address/port'
elif len(ip) == 2:
addr, port = ip
if not validipaddr(addr) and validipport(port):
raise ValueError, ':'.join(ip) + ' is not a valid IP address/port'
port = int(port)
raise ValueError, ':'.join(ip) + ' is not a valid IP address/port'
return (addr, port)
def validaddr(string_):
returns either (ip_address, port) or "/path/to/socket" from string_
>>> validaddr('/path/to/socket')
>>> validaddr('8000')
('', 8000)
>>> validaddr('')
('', 8080)
>>> validaddr('')
('', 8000)
>>> validaddr('fff')
Traceback (most recent call last):
ValueError: fff is not a valid IP address/port
if '/' in string_:
return string_
return validip(string_)
def urlquote(val):
Quotes a string for use in a URL.
>>> urlquote('://?f=1&j=1')
>>> urlquote(None)
>>> urlquote(u'\u203d')
if val is None: return ''
if not isinstance(val, unicode): val = str(val)
else: val = val.encode('utf-8')
return urllib.quote(val)
def httpdate(date_obj):
Formats a datetime object for use in HTTP headers.
>>> import datetime
>>> httpdate(datetime.datetime(1970, 1, 1, 1, 1, 1))
'Thu, 01 Jan 1970 01:01:01 GMT'
return date_obj.strftime("%a, %d %b %Y %H:%M:%S GMT")
def parsehttpdate(string_):
Parses an HTTP date into a datetime object.
>>> parsehttpdate('Thu, 01 Jan 1970 01:01:01 GMT')
datetime.datetime(1970, 1, 1, 1, 1, 1)
t = time.strptime(string_, "%a, %d %b %Y %H:%M:%S %Z")
except ValueError:
return None
return datetime.datetime(*t[:6])
def htmlquote(text):
Encodes `text` for raw use in HTML.
>>> htmlquote("<'&\\">")
text = text.replace("&", "&amp;") # Must be done first!
text = text.replace("<", "&lt;")
text = text.replace(">", "&gt;")
text = text.replace("'", "&#39;")
text = text.replace('"', "&quot;")
return text
def websafe(val):
Converts `val` so that it's safe for use in UTF-8 HTML.
>>> websafe("<'&\\">")
>>> websafe(None)
>>> websafe(u'\u203d')
if val is None:
return ''
if isinstance(val, unicode):
val = val.encode('utf-8')
val = str(val)
return htmlquote(val)
if __name__ == "__main__":
import doctest

Request Delegation
(from web.py)
__all__ = ["handle", "nomethod", "autodelegate", "webpyfunc", "run"]
import sys, re, types, os.path, urllib
import http, wsgi, utils, webapi
import webapi as web
def handle(mapping, fvars=None):
Call the appropriate function based on the url to function mapping in `mapping`.
If no module for the function is specified, look up the function in `fvars`. If
`fvars` is empty, using the caller's context.
`mapping` should be a tuple of paired regular expressions with function name
substitutions. `handle` will import modules as necessary.
for url, ofno in utils.group(mapping, 2):
if isinstance(ofno, tuple):
ofn, fna = ofno[0], list(ofno[1:])
ofn, fna = ofno, []
fn, result = utils.re_subm('^' + url + '$', ofn, web.ctx.path)
if result: # it's a match
if fn.split(' ', 1)[0] == "redirect":
url = fn.split(' ', 1)[1]
if web.ctx.method == "GET":
x = web.ctx.env.get('QUERY_STRING', '')
if x:
url += '?' + x
return http.redirect(url)
elif '.' in fn:
x = fn.split('.')
mod, cls = '.'.join(x[:-1]), x[-1]
mod = __import__(mod, globals(), locals(), [""])
cls = getattr(mod, cls)
cls = fn
mod = fvars
if isinstance(mod, types.ModuleType):
mod = vars(mod)
cls = mod[cls]
except KeyError:
return web.notfound()
meth = web.ctx.method
if meth == "HEAD":
if not hasattr(cls, meth):
meth = "GET"
if not hasattr(cls, meth):
return nomethod(cls)
tocall = getattr(cls(), meth)
args = list(result.groups())
for d in re.findall(r'\\(\d+)', ofn):
args.pop(int(d) - 1)
return tocall(*([x and urllib.unquote(x) for x in args] + fna))
return web.notfound()
def nomethod(cls):
"""Returns a `405 Method Not Allowed` error for `cls`."""
web.ctx.status = '405 Method Not Allowed'
web.header('Content-Type', 'text/html')
web.header('Allow', \
', '.join([method for method in \
['GET', 'HEAD', 'POST', 'PUT', 'DELETE'] \
if hasattr(cls, method)]))
# commented out for the same reason redirect is
# return output('method not allowed')
def autodelegate(prefix=''):
Returns a method that takes one argument and calls the method named prefix+arg,
calling `notfound()` if there isn't one. Example:
urls = ('/prefs/(.*)', 'prefs')
class prefs:
GET = autodelegate('GET_')
def GET_password(self): pass
def GET_privacy(self): pass
`GET_password` would get called for `/prefs/password` while `GET_privacy` for
`GET_privacy` gets called for `/prefs/privacy`.
If a user visits `/prefs/password/change` then `GET_password(self, '/change')`
is called.
def internal(self, arg):
if '/' in arg:
first, rest = arg.split('/', 1)
func = prefix + first
args = ['/' + rest]
func = prefix + arg
args = []
if hasattr(self, func):
return getattr(self, func)(*args)
except TypeError:
return web.notfound()
return web.notfound()
return internal
def webpyfunc(inp, fvars, autoreload=False):
"""If `inp` is a url mapping, returns a function that calls handle."""
if not hasattr(inp, '__call__'):
if autoreload:
def modname():
"""find name of the module name from fvars."""
file, name = fvars['__file__'], fvars['__name__']
if name == '__main__':
# Since the __main__ module can't be reloaded, the module has
# to be imported using its file name.
name = os.path.splitext(os.path.basename(file))[0]
return name
mod = __import__(modname(), None, None, [""])
#@@probably should replace this with some inspect magic
name = utils.dictfind(fvars, inp)
func = lambda: handle(getattr(mod, name), mod)
func = lambda: handle(inp, fvars)
func = inp
return func
def run(inp, fvars, *middleware):
Starts handling requests. If called in a CGI or FastCGI context, it will follow
that protocol. If called from the command line, it will start an HTTP
server on the port named in the first command line argument, or, if there
is no argument, on port 8080.
`input` is a callable, then it's called with no arguments.
Otherwise, it's a `mapping` object to be passed to `handle(...)`.
**Caveat:** So that `reloader` will work correctly, input has to be a variable,
it can't be a tuple passed in directly.
`middleware` is a list of WSGI middleware which is applied to the resulting WSGI
autoreload = http.reloader in middleware
return wsgi.runwsgi(webapi.wsgifunc(webpyfunc(inp, fvars, autoreload), *middleware))

simple, elegant templating
(part of web.py)
import re, glob, os, os.path
from types import FunctionType as function
from utils import storage, group, utf8
from net import websafe
# differences from python:
# - for: has an optional else: that gets called if the loop never runs
# differences to add:
# - you can use the expression inside if, while blocks
# - special for loop attributes, like django?
# - you can check to see if a variable is defined (perhaps w/ get func?)
# all these are probably good ideas for python...
# todo:
# inline tuple
# relax constraints on spacing
# continue, break, etc.
# tracebacks
global_globals = {'None':None, 'False':False, 'True': True}
MAX_ITERS = 100000
WHAT = 0
ARGS = 4
NAME = 2
BODY = 4
ELIF = 6
ELSE = 8
IN = 6
NAME = 2
EXPR = 4
ATTR = 4
ITEM = 4
X = 2
OP = 4
Y = 6
# http://docs.python.org/ref/identifiers.html
r_var = '[a-zA-Z_][a-zA-Z0-9_]*'
class ParseError(Exception): pass
class Parser:
def __init__(self, text, name=""):
self.t = text
self.p = 0
self._lock = [False]
self.name = name
def lock(self):
self._lock[-1] = True
def curline(self):
return self.t[:self.p].count('\n')+1
def csome(self):
return repr(self.t[self.p:self.p+5]+'...')
def Error(self, x, y=None):
if y is None: y = self.csome()
raise ParseError, "%s: expected %s, got %s (line %s)" % (self.name, x, y, self.curline())
def q(self, f):
def internal(*a, **kw):
checkp = self.p
q = f(*a, **kw)
except ParseError:
if self._lock[-1]:
self.p = checkp
return False
return q or True
return internal
def tokr(self, t):
text = self.c(len(t))
if text != t:
self.Error(repr(t), repr(text))
return t
def ltokr(self, *l):
for x in l:
o = self.tokq(x)
if o: return o
self.Error('one of '+repr(l))
def rer(self, r):
x = re.match(r, self.t[self.p:]) #@@re_compile
if not x:
return self.tokr(x.group())
def endr(self):
if self.p != len(self.t):
def c(self, n=1):
out = self.t[self.p:self.p+n]
if out == '' and n != 0:
self.Error('character', 'EOF')
self.p += n
return out
def lookbehind(self, t):
return self.t[self.p-len(t):self.p] == t
def __getattr__(self, a):
if a.endswith('q'):
return self.q(getattr(self, a[:-1]+'r'))
raise AttributeError, a
class TemplateParser(Parser):
def __init__(self, *a, **kw):
Parser.__init__(self, *a, **kw)
self.curws = ''
self.curind = ''
def o(self, *a):
return a+('lineno', self.curline())
def go(self):
# maybe try to do some traceback parsing/hacking
return self.gor()
def gor(self):
header = self.defwithq()
results = self.lines(start=True)
return header, results
def ws(self):
n = 0
while self.tokq(" "): n += 1
return " " * n
def defwithr(self):
self.tokr('$def with ')
args = []
kw = []
x = self.req(r_var)
while x:
if self.tokq('='):
v = self.exprr()
kw.append((x, v))
x = self.tokq(', ') and self.req(r_var)
return self.o('defwith', 'null', None, 'args', args, 'kwargs', kw)
def literalr(self):
o = (
self.req('"[^"]*"') or #@@ no support for escapes
if o is False:
o = self.req('\-?[0-9]+(\.[0-9]*)?')
if o is not False:
if '.' in o: o = float(o)
else: o = int(o)
if o is False: self.Error('literal')
return self.o('literal', 'thing', o)
def listr(self):
x = []
if not self.tokq(']'):
while True:
t = self.exprr()
if not self.tokq(', '): break
return self.o('list', 'thing', x)
def dictr(self):
x = {}
if not self.tokq('}'):
while True:
k = self.exprr()
self.tokr(': ')
v = self.exprr()
x[k] = v
if not self.tokq(', '): break
return self.o('dict', 'thing', x)
def parenr(self):
o = self.exprr() # todo: allow list
return self.o('paren', 'thing', o)
def atomr(self):
"""returns var, literal, paren, dict, or list"""
o = (
self.varq() or
self.parenq() or
self.dictq() or
self.listq() or
if o is False: self.Error('atom')
return o
def primaryr(self):
"""returns getattr, call, or getitem"""
n = self.atomr()
while 1:
if self.tokq('.'):
v = self.req(r_var)
if not v:
self.p -= 1 # get rid of the '.'
n = self.o('getattr', 'thing', n, 'attr', v)
elif self.tokq('('):
args = []
kw = []
while 1:
# need to see if we're doing a keyword argument
checkp = self.p
k = self.req(r_var)
if k and self.tokq('='): # yup
v = self.exprr()
kw.append((k, v))
self.p = checkp
x = self.exprq()
if x: # at least it's something
if not self.tokq(', '): break
n = self.o('call', 'thing', n, 'args', args, 'kwargs', kw)
elif self.tokq('['):
v = self.exprr()
n = self.o('getitem', 'thing', n, 'item', v)
return n
def exprr(self):
negate = self.tokq('not ')
x = self.primaryr()
if self.tokq(' '):
operator = self.ltokr('not in', 'in', 'is not', 'is', '==', '!=', '>=', '<=', '<', '>', 'and', 'or', '*', '+', '-', '/', '%')
self.tokr(' ')
y = self.exprr()
x = self.o('test', 'x', x, 'op', operator, 'y', y)
return self.o('expr', 'thing', x, 'negate', negate)
def varr(self):
return self.o('var', 'name', self.rer(r_var))
def liner(self):
out = []
o = self.curws
while 1:
c = self.c()
if c == '\n':
self.p -= 1
if c == '$':
if self.lookbehind('\\$'):
o = o[:-1] + c
filter = not bool(self.tokq(':'))
if self.tokq('{'):
out.append(self.o('itpl', 'name', self.exprr(), 'filter', filter))
o = ''
g = self.primaryq()
if g:
out.append(self.o('itpl', 'name', g, 'filter', filter))
o = ''
o += c
o += c
if not self.lookbehind('\\\n'):
o += '\n'
o = o[:-1]
return self.o('line', 'thing', out)
def varsetr(self):
self.tokr('$var ')
what = self.rer(r_var)
body = self.lines()
return self.o('varset', 'name', what, 'body', body)
def ifr(self):
self.tokr("$if ")
expr = self.exprr()
ifc = self.lines()
elifs = []
while self.tokq(self.curws + self.curind + '$elif '):
v = self.exprr()
c = self.lines()
elifs.append(self.o('elif', 'clause', v, 'body', c))
if self.tokq(self.curws + self.curind + "$else:"):
elsec = self.lines()
elsec = None
return self.o('if', 'clause', expr, 'then', ifc, 'elif', elifs, 'else', elsec)
def forr(self):
self.tokr("$for ")
v = self.setabler()
self.tokr(" in ")
g = self.exprr()
l = self.lines()
if self.tokq(self.curws + self.curind + '$else:'):
elsec = self.lines()
elsec = None
return self.o('for', 'name', v, 'body', l, 'in', g, 'else', elsec)
def whiler(self):
self.tokr('$while ')
v = self.exprr()
l = self.lines()
if self.tokq(self.curws + self.curind + '$else:'):
elsec = self.lines()
elsec = None
return self.o('while', 'clause', v, 'body', l, 'null', None, 'else', elsec)
def assignr(self):
self.tokr('$ ')
assign = self.rer(r_var) # NOTE: setable
self.tokr(' = ')
expr = self.exprr()
return self.o('assign', 'name', assign, 'expr', expr)
def commentr(self):
while self.c() != '\n': pass
return self.o('comment')
def setabler(self):
out = [self.varr()] #@@ not quite right
while self.tokq(', '):
return out
def lines(self, start=False):
This function gets called from two places:
1. at the start, where it's matching the document itself
2. after any command, where it matches one line or an indented block
o = []
if not start: # try to match just one line
singleline = self.tokq(' ') and self.lineq()
if singleline:
return [singleline]
self.rer(' *') #@@slurp space?
oldind = self.curind
self.curind += ' '
while 1:
oldws = self.curws
t = self.tokq(oldws + self.curind)
if not t: break
self.curws += self.ws()
x = t and (
self.varsetq() or
self.ifq() or
self.forq() or
self.whileq() or
self.assignq() or
self.commentq() or
self.curws = oldws
if not x:
elif x[WHAT] == 'comment':
if not start: self.curind = oldind
return o
class Stowage(storage):
def __str__(self): return self.get('_str')
#@@ edits in place
def __add__(self, other):
if isinstance(other, (unicode, str)):
self._str += other
return self
raise TypeError, 'cannot add'
def __radd__(self, other):
if isinstance(other, (unicode, str)):
self._str = other + self._str
return self
raise TypeError, 'cannot add'
class WTF(AssertionError): pass
class SecurityError(Exception):
"""The template seems to be trying to do something naughty."""
Required = object()
class Template:
globals = {}
content_types = {
'.html' : 'text/html; charset=utf-8',
'.txt' : 'text/plain',
def __init__(self, text, filter=None, filename=""):
self.filter = filter
self.filename = filename
# universal newlines:
text = text.replace('\r\n', '\n').replace('\r', '\n').expandtabs()
if not text.endswith('\n'): text += '\n'
header, tree = TemplateParser(text, filename).go()
self.tree = tree
if header:
self.args, self.kwargs = (), {}
def __call__(self, *a, **kw):
d = self.globals.copy()
d.update(self._parseargs(a, kw))
f = Fill(self.tree, d=d)
if self.filter: f.filter = self.filter
import webapi as web
if 'headers' in web.ctx and self.filename:
content_type = self.find_content_type()
if content_type:
web.header('Content-Type', content_type, unique=True)
return f.go()
def find_content_type(self):
for ext, content_type in self.content_types.iteritems():
if self.filename.endswith(ext):
return content_type
def _parseargs(self, inargs, inkwargs):
# difference from Python:
# no error on setting a keyword arg twice
d = {}
for arg in self.args:
d[arg] = Required
for kw, val in self.kwargs:
d[kw] = val
for n, val in enumerate(inargs):
if n < len(self.args):
d[self.args[n]] = val
elif n < len(self.args)+len(self.kwargs):
kw = self.kwargs[n - len(self.args)][0]
d[kw] = val
for kw, val in inkwargs.iteritems():
d[kw] = val
unset = []
for k, v in d.iteritems():
if v is Required:
if unset:
raise TypeError, 'values for %s are required' % unset
return d
def h_defwith(self, header):
assert header[WHAT] == 'defwith'
f = Fill(self.tree, d={})
self.args = header[ARGS]
self.kwargs = []
for var, valexpr in header[KWARGS]:
self.kwargs.append((var, f.h(valexpr)))
def __repr__(self):
return "<Template: %s>" % self.filename
class Handle:
def __init__(self, parsetree, **kw):
self._funccache = {}
self.parsetree = parsetree
for (k, v) in kw.iteritems(): setattr(self, k, v)
def h(self, item):
return getattr(self, 'h_' + item[WHAT])(item)
class Fill(Handle):
builtins = global_globals
def filter(self, text):
if text is None: return ''
else: return utf8(text)
# often replaced with stuff like net.websafe
def h_literal(self, i):
item = i[THING]
if isinstance(item, (unicode, str)) and item[0] in ['"', "'"]:
item = item[1:-1]
elif isinstance(item, (float, int)):
return item
def h_list(self, i):
x = i[THING]
out = []
for item in x:
return out
def h_dict(self, i):
x = i[THING]
out = {}
for k, v in x.iteritems():
out[self.h(k)] = self.h(v)
return out
def h_paren(self, i):
item = i[THING]
if isinstance(item, list):
raise NotImplementedError, 'tuples'
return self.h(item)
def h_getattr(self, i):
thing, attr = i[THING], i[ATTR]
thing = self.h(thing)
if attr.startswith('_') or attr.startswith('func_') or attr.startswith('im_'):
raise SecurityError, 'tried to get ' + attr
if thing in self.builtins:
raise SecurityError, 'tried to getattr on ' + repr(thing)
except TypeError:
pass # raised when testing an unhashable object
return getattr(thing, attr)
except AttributeError:
if isinstance(thing, list) and attr == 'join':
return lambda s: s.join(thing)
def h_call(self, i):
call = self.h(i[THING])
args = [self.h(x) for x in i[ARGS]]
kw = dict([(x, self.h(y)) for (x, y) in i[KWARGS]])
return call(*args, **kw)
def h_getitem(self, i):
thing, item = i[THING], i[ITEM]
thing = self.h(thing)
item = self.h(item)
return thing[item]
def h_expr(self, i):
item = self.h(i[THING])
if i[NEGATE]:
item = not item
return item
def h_test(self, item):
ox, op, oy = item[X], item[OP], item[Y]
# for short-circuiting to work, we can't eval these here
e = self.h
if op == 'is':
return e(ox) is e(oy)
elif op == 'is not':
return e(ox) is not e(oy)
elif op == 'in':
return e(ox) in e(oy)
elif op == 'not in':
return e(ox) not in e(oy)
elif op == '==':
return e(ox) == e(oy)
elif op == '!=':
return e(ox) != e(oy)
elif op == '>':
return e(ox) > e(oy)
elif op == '<':
return e(ox) < e(oy)
elif op == '<=':
return e(ox) <= e(oy)
elif op == '>=':
return e(ox) >= e(oy)
elif op == 'and':
return e(ox) and e(oy)
elif op == 'or':
return e(ox) or e(oy)
elif op == '+':
return e(ox) + e(oy)
elif op == '-':
return e(ox) - e(oy)
elif op == '*':
return e(ox) * e(oy)
elif op == '/':
return e(ox) / e(oy)
elif op == '%':
return e(ox) % e(oy)
raise WTF, 'op ' + op
def h_var(self, i):
v = i[NAME]
if v in self.d:
return self.d[v]
elif v in self.builtins:
return self.builtins[v]
elif v == 'self':
return self.output
raise NameError, 'could not find %s (line %s)' % (repr(i[NAME]), i[LINENO])
def h_line(self, i):
out = []
for x in i[THING]:
#@@ what if x is unicode
if isinstance(x, str):
elif x[WHAT] == 'itpl':
o = self.h(x[NAME])
if x[FILTER]:
o = self.filter(o)
o = (o is not None and utf8(o)) or ""
raise WTF, x
return ''.join(out)
def h_varset(self, i):
self.output[i[NAME]] = ''.join(self.h_lines(i[BODY]))
return ''
def h_if(self, i):
expr = self.h(i[CLAUSE])
if expr:
do = i[BODY]
for e in i[ELIF]:
expr = self.h(e[CLAUSE])
if expr:
do = e[BODY]
do = i[ELSE]
return ''.join(self.h_lines(do))
def h_for(self, i):
out = []
assert i[IN][WHAT] == 'expr'
invar = self.h(i[IN])
forvar = i[NAME]
if invar:
for nv in invar:
if len(forvar) == 1:
fv = forvar[0]
assert fv[WHAT] == 'var'
self.d[fv[NAME]] = nv # same (lack of) scoping as Python
for x, y in zip(forvar, nv):
assert x[WHAT] == 'var'
self.d[x[NAME]] = y
if i[ELSE]:
return ''.join(out)
def h_while(self, i):
out = []
expr = self.h(i[CLAUSE])
if not expr:
return ''.join(self.h_lines(i[ELSE]))
c = 0
while expr:
c += 1
if c >= MAX_ITERS:
raise RuntimeError, 'too many while-loop iterations (line %s)' % i[LINENO]
expr = self.h(i[CLAUSE])
return ''.join(out)
def h_assign(self, i):
self.d[i[NAME]] = self.h(i[EXPR])
return ''
def h_comment(self, i): pass
def h_lines(self, lines):
if lines is None: return []
return map(self.h, lines)
def go(self):
self.output = Stowage()
self.output._str = ''.join(map(self.h, self.parsetree))
if self.output.keys() == ['_str']:
self.output = self.output['_str']
return self.output
class render:
def __init__(self, loc='templates/', cache=True):
self.loc = loc
if cache:
self.cache = {}
self.cache = False
def _do(self, name, filter=None):
if self.cache is False or name not in self.cache:
tmplpath = os.path.join(self.loc, name)
p = [f for f in glob.glob(tmplpath + '.*') if not f.endswith('~')] # skip backup files
if not p and os.path.isdir(tmplpath):
return render(tmplpath, cache=self.cache)
elif not p:
raise AttributeError, 'no template named ' + name
p = p[0]
c = Template(open(p).read(), filename=p)
if self.cache is not False: self.cache[name] = (p, c)
if self.cache is not False: p, c = self.cache[name]
if p.endswith('.html') or p.endswith('.xml'):
if not filter: c.filter = websafe
return c
def __getattr__(self, p):
return self._do(p)
def frender(fn, *a, **kw):
return Template(open(fn).read(), *a, **kw)
def test():
import sys
verbose = '-v' in sys.argv
def assertEqual(a, b):
if a == b:
if verbose:
assert a == b, "\nexpected: %s\ngot: %s" % (repr(b), repr(a))
from utils import storage, group
class t:
def __init__(self, text):
self.text = text
def __call__(self, *a, **kw):
return TestResult(self.text, Template(self.text)(*a, **kw))
class TestResult:
def __init__(self, source, value):
self.source = source
self.value = value
def __eq__(self, other):
if self.value == other:
if verbose:
print >> sys.stderr, 'FAIL:', repr(self.source), 'expected', repr(other), ', got', repr(self.value)
t('1')() == '1\n'
t('$def with ()\n1')() == '1\n'
t('$def with (a)\n$a')(1) == '1\n'
t('$def with (a=0)\n$a')(1) == '1\n'
t('$def with (a=0)\n$a')(a=1) == '1\n'
t('$if 1: 1')() == '1\n'
t('$if 1:\n 1')() == '1\n'
t('$if 0: 0\n$elif 1: 1')() == '1\n'
t('$if 0: 0\n$elif None: 0\n$else: 1')() == '1\n'
t('$if (0 < 1) and (1 < 2): 1')() == '1\n'
t('$for x in [1, 2, 3]: $x')() == '1\n2\n3\n'
t('$for x in []: 0\n$else: 1')() == '1\n'
t('$def with (a)\n$while a and a.pop(): 1')([1, 2, 3]) == '1\n1\n1\n'
t('$while 0: 0\n$else: 1')() == '1\n'
t('$ a = 1\n$a')() == '1\n'
t('$# 0')() == ''
t('$def with (d)\n$for k, v in d.iteritems(): $k')({1: 1}) == '1\n'
t('$def with (a)\n$(a)')(1) == '1\n'
t('$def with (a)\n$a')(1) == '1\n'
t('$def with (a)\n$a.b')(storage(b=1)) == '1\n'
t('$def with (a)\n$a[0]')([1]) == '1\n'
t('${0 or 1}')() == '1\n'
t('$ a = [1]\n$a[0]')() == '1\n'
t('$ a = {1: 1}\n$a.keys()[0]')() == '1\n'
t('$ a = []\n$if not a: 1')() == '1\n'
t('$ a = {}\n$if not a: 1')() == '1\n'
t('$ a = -1\n$a')() == '-1\n'
t('$ a = "1"\n$a')() == '1\n'
t('$if 1 is 1: 1')() == '1\n'
t('$if not 0: 1')() == '1\n'
t('$if 1:\n $if 1: 1')() == '1\n'
t('$ a = 1\n$a')() == '1\n'
t('$ a = 1.\n$a')() == '1.0\n'
t('$({1: 1}.keys()[0])')() == '1\n'
t('$for x in [1, 2, 3]:\n\t$x')() == ' 1\n 2\n 3\n'
t('$def with (a)\n$:a')(1) == '1\n'
t('$def with (a)\n$a')(u'\u203d') == '\xe2\x80\xbd\n'
t(u'$def with (f)\n$:f("x")')(lambda x: x) == 'x\n'
j = Template("$var foo: bar")()
assertEqual(str(j), '')
assertEqual(j.foo, 'bar\n')
if verbose: sys.stderr.write('\n')
if __name__ == "__main__":

General Utilities
(part of web.py)
__all__ = [
"Storage", "storage", "storify",
"rstrips", "lstrips", "strips", "utf8",
"TimeoutError", "timelimit",
"Memoize", "memoize",
"re_compile", "re_subm",
"IterBetter", "iterbetter",
"dictreverse", "dictfind", "dictfindall", "dictincr", "dictadd",
"listget", "intget", "datestr",
"numify", "denumify", "dateify",
"CaptureStdout", "capturestdout", "Profile", "profile",
import re, sys, time, threading
try: import datetime
except ImportError: pass
class Storage(dict):
A Storage object is like a dictionary except `obj.foo` can be used
in addition to `obj['foo']`.
>>> o = storage(a=1)
>>> o.a
>>> o['a']
>>> o.a = 2
>>> o['a']
>>> del o.a
>>> o.a
Traceback (most recent call last):
AttributeError: 'a'
def __getattr__(self, key):
return self[key]
except KeyError, k:
raise AttributeError, k
def __setattr__(self, key, value):
self[key] = value
def __delattr__(self, key):
del self[key]
except KeyError, k:
raise AttributeError, k
def __repr__(self):
return '<Storage ' + dict.__repr__(self) + '>'
storage = Storage
def storify(mapping, *requireds, **defaults):
Creates a `storage` object from dictionary `mapping`, raising `KeyError` if
d doesn't have all of the keys in `requireds` and using the default
values for keys found in `defaults`.
For example, `storify({'a':1, 'c':3}, b=2, c=0)` will return the equivalent of
`storage({'a':1, 'b':2, 'c':3})`.
If a `storify` value is a list (e.g. multiple values in a form submission),
`storify` returns the last element of the list, unless the key appears in
`defaults` as a list. Thus:
>>> storify({'a':[1, 2]}).a
>>> storify({'a':[1, 2]}, a=[]).a
[1, 2]
>>> storify({'a':1}, a=[]).a
>>> storify({}, a=[]).a
Similarly, if the value has a `value` attribute, `storify will return _its_
value, unless the key appears in `defaults` as a dictionary.
>>> storify({'a':storage(value=1)}).a
>>> storify({'a':storage(value=1)}, a={}).a
<Storage {'value': 1}>
>>> storify({}, a={}).a
def getvalue(x):
if hasattr(x, 'value'):
return x.value
return x
stor = Storage()
for key in requireds + tuple(mapping.keys()):
value = mapping[key]
if isinstance(value, list):
if isinstance(defaults.get(key), list):
value = [getvalue(x) for x in value]
value = value[-1]
if not isinstance(defaults.get(key), dict):
value = getvalue(value)
if isinstance(defaults.get(key), list) and not isinstance(value, list):
value = [value]
setattr(stor, key, value)
for (key, value) in defaults.iteritems():
result = value
if hasattr(stor, key):
result = stor[key]
if value == () and not isinstance(result, tuple):
result = (result,)
setattr(stor, key, result)
return stor
iters = [list, tuple]
import __builtin__
if hasattr(__builtin__, 'set'):
from sets import Set
except ImportError:
class _hack(tuple): pass
iters = _hack(iters)
iters.__doc__ = """
A list of iterable items (like lists, but not strings). Includes whichever
of lists, tuples, sets, and Sets are available in this version of Python.
def _strips(direction, text, remove):
if direction == 'l':
if text.startswith(remove):
return text[len(remove):]
elif direction == 'r':
if text.endswith(remove):
return text[:-len(remove)]
raise ValueError, "Direction needs to be r or l."
return text
def rstrips(text, remove):
removes the string `remove` from the right of `text`
>>> rstrips("foobar", "bar")
return _strips('r', text, remove)
def lstrips(text, remove):
removes the string `remove` from the left of `text`
>>> lstrips("foobar", "foo")
return _strips('l', text, remove)
def strips(text, remove):
"""removes the string `remove` from the both sides of `text`
>>> strips("foobarfoo", "foo")
return rstrips(lstrips(text, remove), remove)
def utf8(text):
"""Encodes text in utf-8.
>> utf8(u'\u1234') # doctest doesn't seem to like utf-8
>>> utf8('hello')
>>> utf8(42)
if isinstance(text, unicode):
return text.encode('utf-8')
elif isinstance(text, str):
return text
return str(text)
class TimeoutError(Exception): pass
def timelimit(timeout):
A decorator to limit a function to `timeout` seconds, raising `TimeoutError`
if it takes longer.
>>> import time
>>> def meaningoflife():
... time.sleep(.2)
... return 42
>>> timelimit(.1)(meaningoflife)()
Traceback (most recent call last):
TimeoutError: took too long
>>> timelimit(1)(meaningoflife)()
_Caveat:_ The function isn't stopped after `timeout` seconds but continues
executing in a separate thread. (There seems to be no way to kill a thread.)
inspired by <http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/473878>
def _1(function):
def _2(*args, **kw):
class Dispatch(threading.Thread):
def __init__(self):
self.result = None
self.error = None
def run(self):
self.result = function(*args, **kw)
self.error = sys.exc_info()
c = Dispatch()
if c.isAlive():
raise TimeoutError, 'took too long'
if c.error:
raise c.error[0], c.error[1]
return c.result
return _2
return _1
class Memoize:
'Memoizes' a function, caching its return values for each input.
>>> import time
>>> def meaningoflife():
... time.sleep(.2)
... return 42
>>> fastlife = memoize(meaningoflife)
>>> meaningoflife()
>>> timelimit(.1)(meaningoflife)()
Traceback (most recent call last):
TimeoutError: took too long
>>> fastlife()
>>> timelimit(.1)(fastlife)()
def __init__(self, func):
self.func = func
self.cache = {}
def __call__(self, *args, **keywords):
key = (args, tuple(keywords.items()))
if key not in self.cache:
self.cache[key] = self.func(*args, **keywords)
return self.cache[key]
memoize = Memoize
re_compile = memoize(re.compile) #@@ threadsafe?
re_compile.__doc__ = """
A memoized version of re.compile.
class _re_subm_proxy:
def __init__(self):
self.match = None
def __call__(self, match):
self.match = match
return ''
def re_subm(pat, repl, string):
Like re.sub, but returns the replacement _and_ the match object.
>>> t, m = re_subm('g(oo+)fball', r'f\\1lish', 'goooooofball')
>>> t
>>> m.groups()
compiled_pat = re_compile(pat)
proxy = _re_subm_proxy()
compiled_pat.sub(proxy.__call__, string)
return compiled_pat.sub(repl, string), proxy.match
def group(seq, size):
Returns an iterator over a series of lists of length size from iterable.
>>> list(group([1,2,3,4], 2))
[[1, 2], [3, 4]]
if not hasattr(seq, 'next'):
seq = iter(seq)
while True:
yield [seq.next() for i in xrange(size)]
class IterBetter:
Returns an object that can be used as an iterator
but can also be used via __getitem__ (although it
cannot go backwards -- that is, you cannot request
`iterbetter[0]` after requesting `iterbetter[1]`).
>>> import itertools
>>> c = iterbetter(itertools.count())
>>> c[1]
>>> c[5]
>>> c[3]
Traceback (most recent call last):
IndexError: already passed 3
def __init__(self, iterator):
self.i, self.c = iterator, 0
def __iter__(self):
while 1:
yield self.i.next()
self.c += 1
def __getitem__(self, i):
#todo: slices
if i < self.c:
raise IndexError, "already passed "+str(i)
while i > self.c:
self.c += 1
# now self.c == i
self.c += 1
return self.i.next()
except StopIteration:
raise IndexError, str(i)
iterbetter = IterBetter
def dictreverse(mapping):
>>> dictreverse({1: 2, 3: 4})
{2: 1, 4: 3}
return dict([(value, key) for (key, value) in mapping.iteritems()])
def dictfind(dictionary, element):
Returns a key whose value in `dictionary` is `element`
or, if none exists, None.
>>> d = {1:2, 3:4}
>>> dictfind(d, 4)
>>> dictfind(d, 5)
for (key, value) in dictionary.iteritems():
if element is value:
return key
def dictfindall(dictionary, element):
Returns the keys whose values in `dictionary` are `element`
or, if none exists, [].
>>> d = {1:4, 3:4}
>>> dictfindall(d, 4)
[1, 3]
>>> dictfindall(d, 5)
res = []
for (key, value) in dictionary.iteritems():
if element is value:
return res
def dictincr(dictionary, element):
Increments `element` in `dictionary`,
setting it to one if it doesn't exist.
>>> d = {1:2, 3:4}
>>> dictincr(d, 1)
>>> d[1]
>>> dictincr(d, 5)
>>> d[5]
dictionary.setdefault(element, 0)
dictionary[element] += 1
return dictionary[element]
def dictadd(*dicts):
Returns a dictionary consisting of the keys in the argument dictionaries.
If they share a key, the value from the last argument is used.
>>> dictadd({1: 0, 2: 0}, {2: 1, 3: 1})
{1: 0, 2: 1, 3: 1}
result = {}
for dct in dicts:
return result
def listget(lst, ind, default=None):
Returns `lst[ind]` if it exists, `default` otherwise.
>>> listget(['a'], 0)
>>> listget(['a'], 1)
>>> listget(['a'], 1, 'b')
if len(lst)-1 < ind:
return default
return lst[ind]
def intget(integer, default=None):
Returns `integer` as an int or `default` if it can't.
>>> intget('3')
>>> intget('3a')
>>> intget('3a', 0)
return int(integer)
except (TypeError, ValueError):
return default
def datestr(then, now=None):
Converts a (UTC) datetime object to a nice string representation.
>>> from datetime import datetime, timedelta
>>> d = datetime(1970, 5, 1)
>>> datestr(d, now=d)
'0 microseconds ago'
>>> for t, v in {
... timedelta(microseconds=1): '1 microsecond ago',
... timedelta(microseconds=2): '2 microseconds ago',
... -timedelta(microseconds=1): '1 microsecond from now',
... -timedelta(microseconds=2): '2 microseconds from now',
... timedelta(microseconds=2000): '2 milliseconds ago',
... timedelta(seconds=2): '2 seconds ago',
... timedelta(seconds=2*60): '2 minutes ago',
... timedelta(seconds=2*60*60): '2 hours ago',
... timedelta(days=2): '2 days ago',
... }.iteritems():
... assert datestr(d, now=d+t) == v
>>> datestr(datetime(1970, 1, 1), now=d)
'January 1'
>>> datestr(datetime(1969, 1, 1), now=d)
'January 1, 1969'
>>> datestr(datetime(1970, 6, 1), now=d)
'June 1, 1970'
def agohence(n, what, divisor=None):
if divisor: n = n // divisor
out = str(abs(n)) + ' ' + what # '2 day'
if abs(n) != 1: out += 's' # '2 days'
out += ' ' # '2 days '
if n < 0:
out += 'from now'
out += 'ago'
return out # '2 days ago'
oneday = 24 * 60 * 60
if not now: now = datetime.datetime.utcnow()
if type(now).__name__ == "DateTime":
now = datetime.datetime.fromtimestamp(now)
if type(then).__name__ == "DateTime":
then = datetime.datetime.fromtimestamp(then)
delta = now - then
deltaseconds = int(delta.days * oneday + delta.seconds + delta.microseconds * 1e-06)
deltadays = abs(deltaseconds) // oneday
if deltaseconds < 0: deltadays *= -1 # fix for oddity of floor
if deltadays:
if abs(deltadays) < 4:
return agohence(deltadays, 'day')
out = then.strftime('%B %e') # e.g. 'June 13'
if then.year != now.year or deltadays < 0:
out += ', %s' % then.year
return out
if int(deltaseconds):
if abs(deltaseconds) > (60 * 60):
return agohence(deltaseconds, 'hour', 60 * 60)
elif abs(deltaseconds) > 60:
return agohence(deltaseconds, 'minute', 60)
return agohence(deltaseconds, 'second')
deltamicroseconds = delta.microseconds
if delta.days: deltamicroseconds = int(delta.microseconds - 1e6) # datetime oddity
if abs(deltamicroseconds) > 1000:
return agohence(deltamicroseconds, 'millisecond', 1000)
return agohence(deltamicroseconds, 'microsecond')
def numify(string):
Removes all non-digit characters from `string`.
>>> numify('800-555-1212')
>>> numify('800.555.1212')
return ''.join([c for c in str(string) if c.isdigit()])
def denumify(string, pattern):
Formats `string` according to `pattern`, where the letter X gets replaced
by characters from `string`.
>>> denumify("8005551212", "(XXX) XXX-XXXX")
'(800) 555-1212'
out = []
for c in pattern:
if c == "X":
string = string[1:]
return ''.join(out)
def dateify(datestring):
Formats a numified `datestring` properly.
return denumify(datestring, "XXXX-XX-XX XX:XX:XX")
class CaptureStdout:
Captures everything `func` prints to stdout and returns it instead.
>>> def idiot():
... print "foo"
>>> capturestdout(idiot)()
**WARNING:** Not threadsafe!
def __init__(self, func):
self.func = func
def __call__(self, *args, **keywords):
from cStringIO import StringIO
# Not threadsafe!
out = StringIO()
oldstdout = sys.stdout
sys.stdout = out
self.func(*args, **keywords)
sys.stdout = oldstdout
return out.getvalue()
capturestdout = CaptureStdout
class Profile:
Profiles `func` and returns a tuple containing its output
and a string with human-readable profiling information.
>>> import time
>>> out, inf = profile(time.sleep)(.001)
>>> out
>>> inf[:10].strip()
'took 0.0'
def __init__(self, func):
self.func = func
def __call__(self, *args): ##, **kw): kw unused
import hotshot, hotshot.stats, tempfile ##, time already imported
temp = tempfile.NamedTemporaryFile()
prof = hotshot.Profile(temp.name)
stime = time.time()
result = prof.runcall(self.func, *args)
stime = time.time() - stime
stats = hotshot.stats.load(temp.name)
stats.sort_stats('time', 'calls')
x = '\n\ntook '+ str(stime) + ' seconds\n'
x += capturestdout(stats.print_stats)(40)
x += capturestdout(stats.print_callers)()
return result, x
profile = Profile
import traceback
# hack for compatibility with Python 2.3:
if not hasattr(traceback, 'format_exc'):
from cStringIO import StringIO
def format_exc(limit=None):
strbuf = StringIO()
traceback.print_exc(limit, strbuf)
return strbuf.getvalue()
traceback.format_exc = format_exc
def tryall(context, prefix=None):
Tries a series of functions and prints their results.
`context` is a dictionary mapping names to values;
the value will only be tried if it's callable.
>>> tryall(dict(j=lambda: True))
j: True
True: 1
For example, you might have a file `test/stuff.py`
with a series of functions testing various things in it.
At the bottom, have a line:
if __name__ == "__main__": tryall(globals())
Then you can run `python test/stuff.py` and get the results of
all the tests.
context = context.copy() # vars() would update
results = {}
for (key, value) in context.iteritems():
if not hasattr(value, '__call__'):
if prefix and not key.startswith(prefix):
print key + ':',
r = value()
dictincr(results, r)
print r
print 'ERROR'
dictincr(results, 'ERROR')
print ' ' + '\n '.join(traceback.format_exc().split('\n'))
print '-'*40
print 'results:'
for (key, value) in results.iteritems():
print ' '*2, str(key)+':', value
class ThreadedDict:
Takes a dictionary that maps threads to objects.
When a thread tries to get or set an attribute or item
of the threadeddict, it passes it on to the object
for that thread in dictionary.
def __init__(self, dictionary):
self.__dict__['_ThreadedDict__d'] = dictionary
def __getattr__(self, attr):
return getattr(self.__d[threading.currentThread()], attr)
def __getitem__(self, item):
return self.__d[threading.currentThread()][item]
def __setattr__(self, attr, value):
if attr == '__doc__':
self.__dict__[attr] = value
return setattr(self.__d[threading.currentThread()], attr, value)
def __delattr__(self, item):
del self.__d[threading.currentThread()][item]
except KeyError, k:
raise AttributeError, k
def __delitem__(self, item):
del self.__d[threading.currentThread()][item]
def __setitem__(self, item, value):
self.__d[threading.currentThread()][item] = value
def __hash__(self):
return hash(self.__d[threading.currentThread()])
threadeddict = ThreadedDict
def autoassign(self, locals):
Automatically assigns local variables to `self`.
>>> self = storage()
>>> autoassign(self, dict(a=1, b=2))
>>> self
<Storage {'a': 1, 'b': 2}>
Generally used in `__init__` methods, as in:
def __init__(self, foo, bar, baz=1): autoassign(self, locals())
for (key, value) in locals.iteritems():
if key == 'self':
setattr(self, key, value)
def to36(q):
Converts an integer to base 36 (a useful scheme for human-sayable IDs).
>>> to36(35)
>>> to36(119292)
>>> int(to36(939387374), 36)
>>> to36(0)
>>> to36(-393)
Traceback (most recent call last):
ValueError: must supply a positive integer
if q < 0: raise ValueError, "must supply a positive integer"
letters = "0123456789abcdefghijklmnopqrstuvwxyz"
converted = []
while q != 0:
q, r = divmod(q, 36)
converted.insert(0, letters[r])
return "".join(converted) or '0'
r_url = re_compile('(?<!\()(http://(\S+))')
def safemarkdown(text):
Converts text to HTML following the rules of Markdown, but blocking any
outside HTML input, so that only the things supported by Markdown
can be used. Also converts raw URLs to links.
(requires [markdown.py](http://webpy.org/markdown.py))
from markdown import markdown
if text:
text = text.replace('<', '&lt;')
# TODO: automatically get page title?
text = r_url.sub(r'<\1>', text)
text = markdown(text)
return text
if __name__ == "__main__":
import doctest

Web API (wrapper around WSGI)
(from web.py)
__all__ = [
"badrequest", "notfound", "gone", "internalerror",
"header", "output", "flush", "debug",
"input", "data",
"setcookie", "cookies",
"loadhooks", "load", "unloadhooks", "unload", "_loadhooks",
import sys, os, cgi, threading, Cookie, pprint, traceback
try: import itertools
except ImportError: pass
from utils import storage, storify, threadeddict, dictadd, intget, lstrips, utf8
config = storage()
config.__doc__ = """
A configuration object for various aspects of web.py.
: A dictionary containing the parameters to be passed to `connect`
when `load()` is called.
: Set to `True` if you would like SQL queries and timings to be
printed to the debug output.
def badrequest():
"""Return a `400 Bad Request` error."""
ctx.status = '400 Bad Request'
header('Content-Type', 'text/html')
return output('bad request')
def notfound():
"""Returns a `404 Not Found` error."""
ctx.status = '404 Not Found'
header('Content-Type', 'text/html')
return output('not found')
def gone():
"""Returns a `410 Gone` error."""
ctx.status = '410 Gone'
header('Content-Type', 'text/html')
return output("gone")
def internalerror():
"""Returns a `500 Internal Server` error."""
ctx.status = "500 Internal Server Error"
ctx.headers = [('Content-Type', 'text/html')]
ctx.output = "internal server error"
def header(hdr, value, unique=False):
Adds the header `hdr: value` with the response.
If `unique` is True and a header with that name already exists,
it doesn't add a new one.
hdr, value = utf8(hdr), utf8(value)
# protection against HTTP response splitting attack
if '\n' in hdr or '\r' in hdr or '\n' in value or '\r' in value:
raise ValueError, 'invalid characters in header'
if unique is True:
for h, v in ctx.headers:
if h.lower() == hdr.lower(): return
ctx.headers.append((hdr, value))
def output(string_):
"""Appends `string_` to the response."""
if isinstance(string_, unicode): string_ = string_.encode('utf8')
if ctx.get('flush'):
ctx.output += str(string_)
def flush():
ctx.flush = True
return flush
def input(*requireds, **defaults):
Returns a `storage` object with the GET and POST arguments.
See `storify` for how `requireds` and `defaults` work.
from cStringIO import StringIO
def dictify(fs): return dict([(k, fs[k]) for k in fs.keys()])
_method = defaults.pop('_method', 'both')
e = ctx.env.copy()
a = b = {}
if _method.lower() in ['both', 'post']:
a = cgi.FieldStorage(fp = StringIO(data()), environ=e,
a = dictify(a)
if _method.lower() in ['both', 'get']:
b = dictify(cgi.FieldStorage(environ=e, keep_blank_values=1))
out = dictadd(b, a)
return storify(out, *requireds, **defaults)
except KeyError:
raise StopIteration
def data():
"""Returns the data sent with the request."""
if 'data' not in ctx:
cl = intget(ctx.env.get('CONTENT_LENGTH'), 0)
ctx.data = ctx.env['wsgi.input'].read(cl)
return ctx.data
def setcookie(name, value, expires="", domain=None):
"""Sets a cookie."""
if expires < 0:
expires = -1000000000
kargs = {'expires': expires, 'path':'/'}
if domain:
kargs['domain'] = domain
# @@ should we limit cookies to a different path?
cookie = Cookie.SimpleCookie()
cookie[name] = value
for key, val in kargs.iteritems():
cookie[name][key] = val
header('Set-Cookie', cookie.items()[0][1].OutputString())
def cookies(*requireds, **defaults):
Returns a `storage` object with all the cookies in it.
See `storify` for how `requireds` and `defaults` work.
cookie = Cookie.SimpleCookie()
cookie.load(ctx.env.get('HTTP_COOKIE', ''))
return storify(cookie, *requireds, **defaults)
except KeyError:
raise StopIteration
def debug(*args):
Prints a prettyprinted version of `args` to stderr.
out = ctx.environ['wsgi.errors']
out = sys.stderr
for arg in args:
print >> out, pprint.pformat(arg)
return ''
def _debugwrite(x):
out = ctx.environ['wsgi.errors']
out = sys.stderr
debug.write = _debugwrite
class _outputter:
"""Wraps `sys.stdout` so that print statements go into the response."""
def __init__(self, file): self.file = file
def write(self, string_):
if hasattr(ctx, 'output'):
return output(string_)
def __getattr__(self, attr): return getattr(self.file, attr)
def __getitem__(self, item): return self.file[item]
def _capturedstdout():
sysstd = sys.stdout
while hasattr(sysstd, 'file'):
if isinstance(sys.stdout, _outputter): return True
sysstd = sysstd.file
if isinstance(sys.stdout, _outputter): return True
return False
if not _capturedstdout():
sys.stdout = _outputter(sys.stdout)
_context = {threading.currentThread(): storage()}
ctx = context = threadeddict(_context)
ctx.__doc__ = """
A `storage` object containing various information about the request:
`environ` (aka `env`)
: A dictionary containing the standard WSGI environment variables.
: The domain (`Host` header) requested by the user.
: The base path for the application.
: The IP address of the requester.
: The HTTP method used.
: The path request.
: If there are no query arguments, the empty string. Otherwise, a `?` followed
by the query string.
: The full path requested, including query arguments (`== path + query`).
### Response Data
`status` (default: "200 OK")
: The status code to be used in the response.
: A list of 2-tuples to be used in the response.
: A string to be used as the response.
loadhooks = {}
_loadhooks = {}
def load():
Loads a new context for the thread.
You can ask for a function to be run at loadtime by
adding it to the dictionary `loadhooks`.
_context[threading.currentThread()] = storage()
ctx.status = '200 OK'
ctx.headers = []
if config.get('db_parameters'):
import db
for x in loadhooks.values(): x()
def _load(env):
ctx.output = ''
ctx.environ = ctx.env = env
ctx.host = env.get('HTTP_HOST')
ctx.homedomain = 'http://' + env.get('HTTP_HOST', '[unknown]')
ctx.homepath = os.environ.get('REAL_SCRIPT_NAME', env.get('SCRIPT_NAME', ''))
ctx.home = ctx.homedomain + ctx.homepath
ctx.ip = env.get('REMOTE_ADDR')
ctx.method = env.get('REQUEST_METHOD')
ctx.path = env.get('PATH_INFO')
# http://trac.lighttpd.net/trac/ticket/406 requires:
if env.get('SERVER_SOFTWARE', '').startswith('lighttpd/'):
ctx.path = lstrips(env.get('REQUEST_URI').split('?')[0],
os.environ.get('REAL_SCRIPT_NAME', env.get('SCRIPT_NAME', '')))
if env.get('QUERY_STRING'):
ctx.query = '?' + env.get('QUERY_STRING', '')
ctx.query = ''
ctx.fullpath = ctx.path + ctx.query
for x in _loadhooks.values(): x()
unloadhooks = {}
def unload():
Unloads the context for the thread.
You can ask for a function to be run at loadtime by
adding it ot the dictionary `unloadhooks`.
for x in unloadhooks.values(): x()
# ensures db cursors and such are GCed promptly
del _context[threading.currentThread()]
def _unload():
def wsgifunc(func, *middleware):
"""Returns a WSGI-compatible function from a webpy-function."""
middleware = list(middleware)
def wsgifunc(env, start_resp):
result = func()
except StopIteration:
result = None
print >> debug, traceback.format_exc()
result = internalerror()
is_generator = result and hasattr(result, 'next')
if is_generator:
# wsgi requires the headers first
# so we need to do an iteration
# and save the result for later
firstchunk = result.next()
except StopIteration:
firstchunk = ''
status, headers, output = ctx.status, ctx.headers, ctx.output
ctx._write = start_resp(status, headers)
# and now, the fun:
def cleanup():
# we insert this little generator
# at the end of our itertools.chain
# so that it unloads the request
# when everything else is done
yield '' # force it to be a generator
# result is the output of calling the webpy function
# it could be a generator...
if is_generator:
if firstchunk is flush:
# oh, it's just our special flush mode
# ctx._write is set up, so just continue execution
except StopIteration:
return []
return itertools.chain([firstchunk], result, cleanup())
# ... but it's usually just None
# output is the stuff in ctx.output
# it's usually a string...
if isinstance(output, str): #@@ other stringlikes?
return [output]
# it could be a generator...
elif hasattr(output, 'next'):
return itertools.chain(output, cleanup())
raise Exception, "Invalid ctx.output"
for mw_func in middleware:
wsgifunc = mw_func(wsgifunc)
return wsgifunc

WSGI Utilities
(from web.py)
import os, sys
import http
import webapi as web
from utils import listget
from net import validaddr, validip
import httpserver
def runfcgi(func, addr=('localhost', 8000)):
"""Runs a WSGI function as a FastCGI server."""
import flup.server.fcgi as flups
return flups.WSGIServer(func, multiplexed=True, bindAddress=addr).run()
def runscgi(func, addr=('localhost', 4000)):
"""Runs a WSGI function as an SCGI server."""
import flup.server.scgi as flups
return flups.WSGIServer(func, bindAddress=addr).run()
def runwsgi(func):
Runs a WSGI-compatible `func` using FCGI, SCGI, or a simple web server,
as appropriate based on context and `sys.argv`.
if os.environ.has_key('SERVER_SOFTWARE'): # cgi
os.environ['FCGI_FORCE_CGI'] = 'Y'
if (os.environ.has_key('PHP_FCGI_CHILDREN') #lighttpd fastcgi
or os.environ.has_key('SERVER_SOFTWARE')):
return runfcgi(func, None)
if 'fcgi' in sys.argv or 'fastcgi' in sys.argv:
args = sys.argv[1:]
if 'fastcgi' in args: args.remove('fastcgi')
elif 'fcgi' in args: args.remove('fcgi')
if args:
return runfcgi(func, validaddr(args[0]))
return runfcgi(func, None)
if 'scgi' in sys.argv:
args = sys.argv[1:]
if args:
return runscgi(func, validaddr(args[0]))
return runscgi(func)
return httpserver.runsimple(func, validip(listget(sys.argv, 1, '')))

