revert last

This commit is contained in:
Marcos Pinto 2007-12-22 08:20:17 +00:00
parent b803d39d63
commit 23d13a2cfd
30 changed files with 25 additions and 6776 deletions

View File

@ -97,7 +97,7 @@ class plugin_WebUi(object):
if status[0] == 0: if status[0] == 0:
os.kill(int(status[1].split()[0]), 9) os.kill(int(status[1].split()[0]), 9)
time.sleep(1) #safe time to wait for kill to finish. time.sleep(1) #safe time to wait for kill to finish.
self.config_file = deluge.common.CONFIG_DIR + "/webui.conf" self.config_file = os.path.join(deluge.common.CONFIG_DIR, "webui.conf")
self.config = deluge.pref.Preferences(self.config_file, False) self.config = deluge.pref.Preferences(self.config_file, False)
try: try:
self.config.load() self.config.load()
@ -162,7 +162,7 @@ class plugin_WebUi(object):
else: else:
print 'Start Webui(in process)..' print 'Start Webui(in process)..'
server_bin = os.path.dirname(__file__) + '/run_webserver' server_bin = os.path.join(os.path.dirname(__file__), 'run_webserver')
self.proc = Popen((server_bin,'env=0.5')) self.proc = Popen((server_bin,'env=0.5'))
def kill_server(self): def kill_server(self):

View File

@ -11,9 +11,9 @@ http://www.xfree86.org/3.3.6/COPYRIGHT2.html#5
__all__ = ["debugerror", "djangoerror"] __all__ = ["debugerror", "djangoerror"]
import sys, urlparse, pprint import sys, urlparse, pprint
from lib.webpy022.net import websafe from webpy022.net import websafe
from lib.webpy022.template import Template from webpy022.template import Template
import lib.webpy022.webapi as web import webpy022.webapi as web
import webserver_common as ws import webserver_common as ws
from traceback import format_tb from traceback import format_tb

View File

@ -34,15 +34,13 @@
import webserver_common as ws import webserver_common as ws
from webserver_framework import * from webserver_framework import *
import lib.webpy022 as web import webpy022 as web
from lib.webpy022.http import seeother, url from webpy022.http import seeother, url
import base64 import base64
from operator import attrgetter from operator import attrgetter
import os import os
from json_api import json_api
#routing: #routing:
urls = ( urls = (
"/login", "login", "/login", "login",
@ -66,7 +64,6 @@ urls = (
"/logout", "logout", "/logout", "logout",
#remote-api: #remote-api:
"/remote/torrent/add(.*)", "remote_torrent_add", "/remote/torrent/add(.*)", "remote_torrent_add",
"/json/(.*)","json_api",
#static: #static:
"/static/(.*)", "static", "/static/(.*)", "static",
"/template/static/(.*)", "template_static", "/template/static/(.*)", "template_static",

View File

@ -1,132 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# webserver_framework.py
#
# Copyright (C) Martijn Voncken 2007 <mvoncken@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, write to:
# The Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor
# Boston, MA 02110-1301, USA.
#
# In addition, as a special exception, the copyright holders give
# permission to link the code of portions of this program with the OpenSSL
# library.
# You must obey the GNU General Public License in all respects for all of
# the code used other than OpenSSL. If you modify file(s) with this
# exception, you may extend this exception to your version of the file(s),
# but you are not obligated to do so. If you do not wish to do so, delete
# this exception statement from your version. If you delete this exception
# statement from all source files in the program, then also delete it here.
"""
json api.
only used for XUL and/or external scripts
it would be possible not to incluse the python-json dependency.
"""
import deluge.ui.client as proxy
from new import instancemethod
from inspect import getargspec
from webserver_framework import remote,ws,get_torrent_status,log
proxy = ws.proxy
def to_json(obj):
from lib.pythonize import pythonize
obj = pythonize(obj)
try:
import json
return json.write(obj)
except ImportError:
raise ImportError("""Install python-json using your package-manager
http://sourceforge.net/projects/json-py/""")
class json_api:
"""
eperimental json api
generic proxy for all methods onm self.
"""
illegal_methods = ['shutdown', 'socket', 'xmlrpclib','pickle','os',
'is_localhost','CoreProxy','connect_on_new_core', 'connect_on_no_core',
'connected','deluge','GET','POST']
def __init__(self):
self._add_proxy_methods()
@remote
def GET(self,name):
if name.startswith('_'):
raise AttributeError('_ methods are illegal!')
if name in self.illegal_methods:
raise AttributeError('Illegal method , I smell a rat!')
if not(hasattr(self,name)):
raise AttributeError('No such Method')
method = getattr(self,name)
kwargs = {}
result = method(**kwargs)
return to_json(result)
POST = GET
def list_methods(self):
"""
list all json methods
returns a dict of {methodname:{args:[list of kwargs],doc:'string'},..}
"""
methods = [getattr(self,m) for m in dir(self)
if not m.startswith('_')
and (not m in self.illegal_methods)
and callable(getattr(self,m))
]
return dict([(f.__name__,
{'args':getargspec(f)[0],'doc':(f.__doc__ or '').strip()})
for f in methods])
def _add_proxy_methods(self):
methods = [getattr(proxy,m) for m in dir(proxy)
if not m.startswith('_')
and (not m in self.illegal_methods)
and callable(getattr(proxy,m))
]
for m in methods:
setattr(self,m.__name__,m)
#extra's:
def list_torrents(self):
return [get_torrent_status(torrent_id)
for torrent_id in ws.proxy.get_session_state()]
get_torrent_status = get_torrent_status
if __name__ == '__main__':
from pprint import pprint
#proxy.set_core_uri('http://localhost:58846') #How to configure this?
j = json_api()
if True:
print 'list-methods:'
methods = j.list_methods()
names = methods.keys()
names.sort()
for name in names:
m = methods[name]
print "%s(%s)\n %s\n" % (name , m['args'] , m['doc'])
#j.GET('list_torrents')
j.POST('list_torrents')

File diff suppressed because it is too large Load Diff

View File

@ -1,38 +0,0 @@
"""
some dbus to python type conversions
-decorator for interface
-wrapper class for proxy
"""
def pythonize(var):
"""translates dbus types back to basic python types."""
if isinstance(var, list):
return [pythonize(value) for value in var]
if isinstance(var, tuple):
return tuple([pythonize(value) for value in var])
if isinstance(var, dict):
return dict(
[(pythonize(key), pythonize(value)) for key, value in var.iteritems()]
)
for klass in [unicode, str, bool, int, float, long]:
if isinstance(var,klass):
return klass(var)
return var
def pythonize_call(func):
def deco(*args,**kwargs):
return pythonize(func(*args, **kwargs))
return deco
def pythonize_interface(func):
def deco(*args, **kwargs):
args = pythonize(args)
kwargs = pythonize(kwargs)
return func(*args, **kwargs)
return deco
class PythonizeProxy(object):
def __init__(self,proxy):
self.proxy = proxy
def __getattr__(self, key):
return pythonize_call(getattr(self.proxy, key))

View File

@ -1,8 +0,0 @@
This folder may only contain general purpose utilities/files/tools.
They should be usable outside of deluge.
Disclaimer:
Some may have been adapted to work better with deluge.
But they will not other import parts of deluge or Webui.

View File

@ -1,136 +0,0 @@
#!/usr/bin/env python
#(c) Martijn Voncken, mvoncken@gmail.com
#Same Licence as web.py 0.22 ->Public Domain
#
"""
static fileserving for web.py
without the need for wsgi wrapper magic.
"""
import webpy022 as web
from webpy022.http import seeother, url
import posixpath
import urlparse
import urllib
import mimetypes
import os
import datetime
import cgi
from StringIO import StringIO
mimetypes.init() # try to read system mime.types
class static_handler:
"""
mostly c&p from SimpleHttpServer
serves relative from start location
"""
base_dir = './'
extensions_map = mimetypes.types_map
def get_base_dir(self):
#override this if you have a config that changes the base dir at runtime
#deluge on windows :(
return self.base_dir
def GET(self, path):
path = self.translate_path(path)
if os.path.isdir(path):
if not path.endswith('/'):
path += "/"
return self.list_directory(path)
ctype = self.guess_type(path)
try:
f = open(path, 'rb')
except IOError:
raise Exception('file not found:%s' % path)
#web.header("404", "File not found")
#return
web.header("Content-type", ctype)
fs = os.fstat(f.fileno())
web.header("Content-Length", str(fs[6]))
web.lastmodified(datetime.datetime.fromtimestamp(fs.st_mtime))
print f.read()
def translate_path(self, path):
"""Translate a /-separated PATH to the local filename syntax.
Components that mean special things to the local file system
(e.g. drive or directory names) are ignored. (XXX They should
probably be diagnosed.)
"""
# abandon query parameters
path = urlparse.urlparse(path)[2]
path = posixpath.normpath(urllib.unquote(path))
words = path.split('/')
words = filter(None, words)
path = self.get_base_dir()
for word in words:
drive, word = os.path.splitdrive(word)
head, word = os.path.split(word)
if word in (os.curdir, os.pardir): continue
path = os.path.join(path, word)
return path
def guess_type(self, path):
base, ext = posixpath.splitext(path)
if ext in self.extensions_map:
return self.extensions_map[ext]
ext = ext.lower()
if ext in self.extensions_map:
return self.extensions_map[ext]
else:
return 'application/octet-stream'
def list_directory(self, path):
"""Helper to produce a directory listing (absent index.html).
Return value is either a file object, or None (indicating an
error). In either case, the headers are sent, making the
interface the same as for send_head().
#TODO ->use web.py +template!
"""
try:
list = os.listdir(path)
except os.error:
web.header('404', "No permission to list directory")
return None
list.sort(key=lambda a: a.lower())
f = StringIO()
displaypath = cgi.escape(urllib.unquote(path))
f.write("<title>Directory listing for %s</title>\n" % displaypath)
f.write("<h2>Directory listing for %s</h2>\n" % displaypath)
f.write("<hr>\n<ul>\n")
for name in list:
fullname = os.path.join(path, name)
displayname = linkname = name
# Append / for directories or @ for symbolic links
if os.path.isdir(fullname):
displayname = name + "/"
linkname = name + "/"
if os.path.islink(fullname):
displayname = name + "@"
# Note: a link to a directory displays with @ and links with /
f.write('<li><a href="%s">%s</a>\n'
% (urllib.quote(linkname), cgi.escape(displayname)))
f.write("</ul>\n<hr>\n")
length = f.tell()
f.seek(0)
web.header("Content-type", "text/html")
web.header("Content-Length", str(length))
print f.read()
if __name__ == '__main__':
#example:
class usr_static(static_handler):
base_dir = os.path.expanduser('~')
urls = ('/relative/(.*)','static_handler',
'/(.*)','usr_static')
web.run(urls,globals())

View File

@ -1 +0,0 @@
http://webpy.org/

View File

@ -1,62 +0,0 @@
#!/usr/bin/env python
from __future__ import generators
"""web.py: makes web apps (http://webpy.org)"""
__version__ = "0.22"
__revision__ = "$Rev: 183 $"
__author__ = "Aaron Swartz <me@aaronsw.com>"
__license__ = "public domain"
__contributors__ = "see http://webpy.org/changes"
# todo:
# - some sort of accounts system
import utils, db, net, wsgi, http, webapi, request, httpserver, debugerror
import template, form
from utils import *
from db import *
from net import *
from wsgi import *
from http import *
from webapi import *
from request import *
from httpserver import *
from debugerror import *
try:
import cheetah
from cheetah import *
except ImportError:
pass
def main():
import doctest
doctest.testmod(utils)
doctest.testmod(db)
doctest.testmod(net)
doctest.testmod(wsgi)
doctest.testmod(http)
doctest.testmod(webapi)
doctest.testmod(request)
try:
doctest.testmod(cheetah)
except NameError:
pass
template.test()
import sys
urls = ('/web.py', 'source')
class source:
def GET(self):
header('Content-Type', 'text/python')
print open(sys.argv[0]).read()
if listget(sys.argv, 1) != 'test':
run(urls, locals())
if __name__ == "__main__": main()

View File

@ -1,5 +0,0 @@
1:Commented out some code to enable a relative redirect.
This is not according to HTTP/1.1 Spec
But many deluge users will want to route the webui through firewalls/routers or use apache redirects.
2:Disabled logging in the builtin http-server.

View File

@ -1,98 +0,0 @@
"""
Cheetah API
(from web.py)
"""
__all__ = ["render"]
import re, urlparse, pprint, traceback, sys
from Cheetah.Compiler import Compiler
from Cheetah.Filters import Filter
from utils import re_compile, memoize, dictadd
from net import htmlquote, websafe
from webapi import ctx, header, output, input, cookies, loadhooks
def upvars(level=2):
"""Guido van Rossum sez: don't use this function."""
return dictadd(
sys._getframe(level).f_globals,
sys._getframe(level).f_locals)
r_include = re_compile(r'(?!\\)#include \"(.*?)\"($|#)', re.M)
def __compiletemplate(template, base=None, isString=False):
if isString:
text = template
else:
text = open('templates/'+template).read()
# implement #include at compile-time
def do_include(match):
text = open('templates/'+match.groups()[0]).read()
return text
while r_include.findall(text):
text = r_include.sub(do_include, text)
execspace = _compiletemplate.bases.copy()
tmpl_compiler = Compiler(source=text, mainClassName='GenTemplate')
tmpl_compiler.addImportedVarNames(execspace.keys())
exec str(tmpl_compiler) in execspace
if base:
_compiletemplate.bases[base] = execspace['GenTemplate']
return execspace['GenTemplate']
_compiletemplate = memoize(__compiletemplate)
_compiletemplate.bases = {}
def render(template, terms=None, asTemplate=False, base=None,
isString=False):
"""
Renders a template, caching where it can.
`template` is the name of a file containing the a template in
the `templates/` folder, unless `isString`, in which case it's the
template itself.
`terms` is a dictionary used to fill the template. If it's None, then
the caller's local variables are used instead, plus context, if it's not
already set, is set to `context`.
If asTemplate is False, it `output`s the template directly. Otherwise,
it returns the template object.
If the template is a potential base template (that is, something other templates)
can extend, then base should be a string with the name of the template. The
template will be cached and made available for future calls to `render`.
Requires [Cheetah](http://cheetahtemplate.org/).
"""
# terms=['var1', 'var2'] means grab those variables
if isinstance(terms, list):
new = {}
old = upvars()
for k in terms:
new[k] = old[k]
terms = new
# default: grab all locals
elif terms is None:
terms = {'context': ctx, 'ctx':ctx}
terms.update(sys._getframe(1).f_locals)
# terms=d means use d as the searchList
if not isinstance(terms, tuple):
terms = (terms,)
if 'headers' in ctx and not isString and template.endswith('.html'):
header('Content-Type','text/html; charset=utf-8', unique=True)
if loadhooks.has_key('reloader'):
compiled_tmpl = __compiletemplate(template, base=base, isString=isString)
else:
compiled_tmpl = _compiletemplate(template, base=base, isString=isString)
compiled_tmpl = compiled_tmpl(searchList=terms, filter=WebSafe)
if asTemplate:
return compiled_tmpl
else:
return output(str(compiled_tmpl))
class WebSafe(Filter):
def filter(self, val, **keywords):
return websafe(val)

View File

@ -1,703 +0,0 @@
"""
Database API
(part of web.py)
"""
# todo:
# - test with sqlite
# - a store function?
__all__ = [
"UnknownParamstyle", "UnknownDB",
"sqllist", "sqlors", "aparam", "reparam",
"SQLQuery", "sqlquote",
"SQLLiteral", "sqlliteral",
"connect",
"TransactionError", "transaction", "transact", "commit", "rollback",
"query",
"select", "insert", "update", "delete"
]
import time
try: import datetime
except ImportError: datetime = None
from utils import storage, iters, iterbetter
import webapi as web
try:
from DBUtils import PooledDB
web.config._hasPooling = True
except ImportError:
web.config._hasPooling = False
class _ItplError(ValueError):
def __init__(self, text, pos):
ValueError.__init__(self)
self.text = text
self.pos = pos
def __str__(self):
return "unfinished expression in %s at char %d" % (
repr(self.text), self.pos)
def _interpolate(format):
"""
Takes a format string and returns a list of 2-tuples of the form
(boolean, string) where boolean says whether string should be evaled
or not.
from <http://lfw.org/python/Itpl.py> (public domain, Ka-Ping Yee)
"""
from tokenize import tokenprog
def matchorfail(text, pos):
match = tokenprog.match(text, pos)
if match is None:
raise _ItplError(text, pos)
return match, match.end()
namechars = "abcdefghijklmnopqrstuvwxyz" \
"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_";
chunks = []
pos = 0
while 1:
dollar = format.find("$", pos)
if dollar < 0:
break
nextchar = format[dollar + 1]
if nextchar == "{":
chunks.append((0, format[pos:dollar]))
pos, level = dollar + 2, 1
while level:
match, pos = matchorfail(format, pos)
tstart, tend = match.regs[3]
token = format[tstart:tend]
if token == "{":
level = level + 1
elif token == "}":
level = level - 1
chunks.append((1, format[dollar + 2:pos - 1]))
elif nextchar in namechars:
chunks.append((0, format[pos:dollar]))
match, pos = matchorfail(format, dollar + 1)
while pos < len(format):
if format[pos] == "." and \
pos + 1 < len(format) and format[pos + 1] in namechars:
match, pos = matchorfail(format, pos + 1)
elif format[pos] in "([":
pos, level = pos + 1, 1
while level:
match, pos = matchorfail(format, pos)
tstart, tend = match.regs[3]
token = format[tstart:tend]
if token[0] in "([":
level = level + 1
elif token[0] in ")]":
level = level - 1
else:
break
chunks.append((1, format[dollar + 1:pos]))
else:
chunks.append((0, format[pos:dollar + 1]))
pos = dollar + 1 + (nextchar == "$")
if pos < len(format):
chunks.append((0, format[pos:]))
return chunks
class UnknownParamstyle(Exception):
"""
raised for unsupported db paramstyles
(currently supported: qmark, numeric, format, pyformat)
"""
pass
def aparam():
"""
Returns the appropriate string to be used to interpolate
a value with the current `web.ctx.db_module` or simply %s
if there isn't one.
>>> aparam()
'%s'
"""
if hasattr(web.ctx, 'db_module'):
style = web.ctx.db_module.paramstyle
else:
style = 'pyformat'
if style == 'qmark':
return '?'
elif style == 'numeric':
return ':1'
elif style in ['format', 'pyformat']:
return '%s'
raise UnknownParamstyle, style
def reparam(string_, dictionary):
"""
Takes a string and a dictionary and interpolates the string
using values from the dictionary. Returns an `SQLQuery` for the result.
>>> reparam("s = $s", dict(s=True))
<sql: "s = 't'">
"""
vals = []
result = []
for live, chunk in _interpolate(string_):
if live:
result.append(aparam())
vals.append(eval(chunk, dictionary))
else: result.append(chunk)
return SQLQuery(''.join(result), vals)
def sqlify(obj):
"""
converts `obj` to its proper SQL version
>>> sqlify(None)
'NULL'
>>> sqlify(True)
"'t'"
>>> sqlify(3)
'3'
"""
# because `1 == True and hash(1) == hash(True)`
# we have to do this the hard way...
if obj is None:
return 'NULL'
elif obj is True:
return "'t'"
elif obj is False:
return "'f'"
elif datetime and isinstance(obj, datetime.datetime):
return repr(obj.isoformat())
else:
return repr(obj)
class SQLQuery:
"""
You can pass this sort of thing as a clause in any db function.
Otherwise, you can pass a dictionary to the keyword argument `vars`
and the function will call reparam for you.
"""
# tested in sqlquote's docstring
def __init__(self, s='', v=()):
self.s, self.v = str(s), tuple(v)
def __getitem__(self, key): # for backwards-compatibility
return [self.s, self.v][key]
def __add__(self, other):
if isinstance(other, str):
self.s += other
elif isinstance(other, SQLQuery):
self.s += other.s
self.v += other.v
return self
def __radd__(self, other):
if isinstance(other, str):
self.s = other + self.s
return self
else:
return NotImplemented
def __str__(self):
try:
return self.s % tuple([sqlify(x) for x in self.v])
except (ValueError, TypeError):
return self.s
def __repr__(self):
return '<sql: %s>' % repr(str(self))
class SQLLiteral:
"""
Protects a string from `sqlquote`.
>>> insert('foo', time=SQLLiteral('NOW()'), _test=True)
<sql: 'INSERT INTO foo (time) VALUES (NOW())'>
"""
def __init__(self, v):
self.v = v
def __repr__(self):
return self.v
sqlliteral = SQLLiteral
def sqlquote(a):
"""
Ensures `a` is quoted properly for use in a SQL query.
>>> 'WHERE x = ' + sqlquote(True) + ' AND y = ' + sqlquote(3)
<sql: "WHERE x = 't' AND y = 3">
"""
return SQLQuery(aparam(), (a,))
class UnknownDB(Exception):
"""raised for unsupported dbms"""
pass
def connect(dbn, **keywords):
"""
Connects to the specified database.
`dbn` currently must be "postgres", "mysql", or "sqlite".
If DBUtils is installed, connection pooling will be used.
"""
if dbn == "postgres":
try:
import psycopg2 as db
except ImportError:
try:
import psycopg as db
except ImportError:
import pgdb as db
if 'pw' in keywords:
keywords['password'] = keywords['pw']
del keywords['pw']
keywords['database'] = keywords['db']
del keywords['db']
elif dbn == "mysql":
import MySQLdb as db
if 'pw' in keywords:
keywords['passwd'] = keywords['pw']
del keywords['pw']
db.paramstyle = 'pyformat' # it's both, like psycopg
elif dbn == "sqlite":
try:
import sqlite3 as db
db.paramstyle = 'qmark'
except ImportError:
try:
from pysqlite2 import dbapi2 as db
db.paramstyle = 'qmark'
except ImportError:
import sqlite as db
web.config._hasPooling = False
keywords['database'] = keywords['db']
del keywords['db']
elif dbn == "firebird":
import kinterbasdb as db
if 'pw' in keywords:
keywords['passwd'] = keywords['pw']
del keywords['pw']
keywords['database'] = keywords['db']
del keywords['db']
else:
raise UnknownDB, dbn
web.ctx.db_name = dbn
web.ctx.db_module = db
web.ctx.db_transaction = 0
web.ctx.db = keywords
def _PooledDB(db, keywords):
# In DBUtils 0.9.3, `dbapi` argument is renamed as `creator`
# see Bug#122112
if PooledDB.__version__.split('.') < '0.9.3'.split('.'):
return PooledDB.PooledDB(dbapi=db, **keywords)
else:
return PooledDB.PooledDB(creator=db, **keywords)
def db_cursor():
if isinstance(web.ctx.db, dict):
keywords = web.ctx.db
if web.config._hasPooling:
if 'db' not in globals():
globals()['db'] = _PooledDB(db, keywords)
web.ctx.db = globals()['db'].connection()
else:
web.ctx.db = db.connect(**keywords)
return web.ctx.db.cursor()
web.ctx.db_cursor = db_cursor
web.ctx.dbq_count = 0
def db_execute(cur, sql_query, dorollback=True):
"""executes an sql query"""
web.ctx.dbq_count += 1
try:
a = time.time()
out = cur.execute(sql_query.s, sql_query.v)
b = time.time()
except:
if web.config.get('db_printing'):
print >> web.debug, 'ERR:', str(sql_query)
if dorollback: rollback(care=False)
raise
if web.config.get('db_printing'):
print >> web.debug, '%s (%s): %s' % (round(b-a, 2), web.ctx.dbq_count, str(sql_query))
return out
web.ctx.db_execute = db_execute
return web.ctx.db
class TransactionError(Exception): pass
class transaction:
"""
A context that can be used in conjunction with "with" statements
to implement SQL transactions. Starts a transaction on enter,
rolls it back if there's an error; otherwise it commits it at the
end.
"""
def __enter__(self):
transact()
def __exit__(self, exctype, excvalue, traceback):
if exctype is not None:
rollback()
else:
commit()
def transact():
"""Start a transaction."""
if not web.ctx.db_transaction:
# commit everything up to now, so we don't rollback it later
if hasattr(web.ctx.db, 'commit'):
web.ctx.db.commit()
else:
db_cursor = web.ctx.db_cursor()
web.ctx.db_execute(db_cursor,
SQLQuery("SAVEPOINT webpy_sp_%s" % web.ctx.db_transaction))
web.ctx.db_transaction += 1
def commit():
"""Commits a transaction."""
web.ctx.db_transaction -= 1
if web.ctx.db_transaction < 0:
raise TransactionError, "not in a transaction"
if not web.ctx.db_transaction:
if hasattr(web.ctx.db, 'commit'):
web.ctx.db.commit()
else:
db_cursor = web.ctx.db_cursor()
web.ctx.db_execute(db_cursor,
SQLQuery("RELEASE SAVEPOINT webpy_sp_%s" % web.ctx.db_transaction))
def rollback(care=True):
"""Rolls back a transaction."""
web.ctx.db_transaction -= 1
if web.ctx.db_transaction < 0:
web.db_transaction = 0
if care:
raise TransactionError, "not in a transaction"
else:
return
if not web.ctx.db_transaction:
if hasattr(web.ctx.db, 'rollback'):
web.ctx.db.rollback()
else:
db_cursor = web.ctx.db_cursor()
web.ctx.db_execute(db_cursor,
SQLQuery("ROLLBACK TO SAVEPOINT webpy_sp_%s" % web.ctx.db_transaction),
dorollback=False)
def query(sql_query, vars=None, processed=False, _test=False):
"""
Execute SQL query `sql_query` using dictionary `vars` to interpolate it.
If `processed=True`, `vars` is a `reparam`-style list to use
instead of interpolating.
>>> query("SELECT * FROM foo", _test=True)
<sql: 'SELECT * FROM foo'>
>>> query("SELECT * FROM foo WHERE x = $x", vars=dict(x='f'), _test=True)
<sql: "SELECT * FROM foo WHERE x = 'f'">
>>> query("SELECT * FROM foo WHERE x = " + sqlquote('f'), _test=True)
<sql: "SELECT * FROM foo WHERE x = 'f'">
"""
if vars is None: vars = {}
if not processed and not isinstance(sql_query, SQLQuery):
sql_query = reparam(sql_query, vars)
if _test: return sql_query
db_cursor = web.ctx.db_cursor()
web.ctx.db_execute(db_cursor, sql_query)
if db_cursor.description:
names = [x[0] for x in db_cursor.description]
def iterwrapper():
row = db_cursor.fetchone()
while row:
yield storage(dict(zip(names, row)))
row = db_cursor.fetchone()
out = iterbetter(iterwrapper())
if web.ctx.db_name != "sqlite":
out.__len__ = lambda: int(db_cursor.rowcount)
out.list = lambda: [storage(dict(zip(names, x))) \
for x in db_cursor.fetchall()]
else:
out = db_cursor.rowcount
if not web.ctx.db_transaction: web.ctx.db.commit()
return out
def sqllist(lst):
"""
Converts the arguments for use in something like a WHERE clause.
>>> sqllist(['a', 'b'])
'a, b'
>>> sqllist('a')
'a'
"""
if isinstance(lst, str):
return lst
else:
return ', '.join(lst)
def sqlors(left, lst):
"""
`left is a SQL clause like `tablename.arg = `
and `lst` is a list of values. Returns a reparam-style
pair featuring the SQL that ORs together the clause
for each item in the lst.
>>> sqlors('foo = ', [])
<sql: '2+2=5'>
>>> sqlors('foo = ', [1])
<sql: 'foo = 1'>
>>> sqlors('foo = ', 1)
<sql: 'foo = 1'>
>>> sqlors('foo = ', [1,2,3])
<sql: '(foo = 1 OR foo = 2 OR foo = 3)'>
"""
if isinstance(lst, iters):
lst = list(lst)
ln = len(lst)
if ln == 0:
return SQLQuery("2+2=5", [])
if ln == 1:
lst = lst[0]
if isinstance(lst, iters):
return SQLQuery('(' + left +
(' OR ' + left).join([aparam() for param in lst]) + ")", lst)
else:
return SQLQuery(left + aparam(), [lst])
def sqlwhere(dictionary, grouping=' AND '):
"""
Converts a `dictionary` to an SQL WHERE clause `SQLQuery`.
>>> sqlwhere({'cust_id': 2, 'order_id':3})
<sql: 'order_id = 3 AND cust_id = 2'>
>>> sqlwhere({'cust_id': 2, 'order_id':3}, grouping=', ')
<sql: 'order_id = 3, cust_id = 2'>
"""
return SQLQuery(grouping.join([
'%s = %s' % (k, aparam()) for k in dictionary.keys()
]), dictionary.values())
def select(tables, vars=None, what='*', where=None, order=None, group=None,
limit=None, offset=None, _test=False):
"""
Selects `what` from `tables` with clauses `where`, `order`,
`group`, `limit`, and `offset`. Uses vars to interpolate.
Otherwise, each clause can be a SQLQuery.
>>> select('foo', _test=True)
<sql: 'SELECT * FROM foo'>
>>> select(['foo', 'bar'], where="foo.bar_id = bar.id", limit=5, _test=True)
<sql: 'SELECT * FROM foo, bar WHERE foo.bar_id = bar.id LIMIT 5'>
"""
if vars is None: vars = {}
qout = ""
def gen_clause(sql, val):
if isinstance(val, (int, long)):
if sql == 'WHERE':
nout = 'id = ' + sqlquote(val)
else:
nout = SQLQuery(val)
elif isinstance(val, (list, tuple)) and len(val) == 2:
nout = SQLQuery(val[0], val[1]) # backwards-compatibility
elif isinstance(val, SQLQuery):
nout = val
elif val:
nout = reparam(val, vars)
else:
return ""
out = ""
if qout: out += " "
out += sql + " " + nout
return out
if web.ctx.get('db_name') == "firebird":
for (sql, val) in (
('FIRST', limit),
('SKIP', offset)
):
qout += gen_clause(sql, val)
if qout:
SELECT = 'SELECT ' + qout
else:
SELECT = 'SELECT'
qout = ""
sql_clauses = (
(SELECT, what),
('FROM', sqllist(tables)),
('WHERE', where),
('GROUP BY', group),
('ORDER BY', order)
)
else:
sql_clauses = (
('SELECT', what),
('FROM', sqllist(tables)),
('WHERE', where),
('GROUP BY', group),
('ORDER BY', order),
('LIMIT', limit),
('OFFSET', offset)
)
for (sql, val) in sql_clauses:
qout += gen_clause(sql, val)
if _test: return qout
return query(qout, processed=True)
def insert(tablename, seqname=None, _test=False, **values):
"""
Inserts `values` into `tablename`. Returns current sequence ID.
Set `seqname` to the ID if it's not the default, or to `False`
if there isn't one.
>>> insert('foo', joe='bob', a=2, _test=True)
<sql: "INSERT INTO foo (a, joe) VALUES (2, 'bob')">
"""
if values:
sql_query = SQLQuery("INSERT INTO %s (%s) VALUES (%s)" % (
tablename,
", ".join(values.keys()),
', '.join([aparam() for x in values])
), values.values())
else:
sql_query = SQLQuery("INSERT INTO %s DEFAULT VALUES" % tablename)
if _test: return sql_query
db_cursor = web.ctx.db_cursor()
if seqname is False:
pass
elif web.ctx.db_name == "postgres":
if seqname is None:
seqname = tablename + "_id_seq"
sql_query += "; SELECT currval('%s')" % seqname
elif web.ctx.db_name == "mysql":
web.ctx.db_execute(db_cursor, sql_query)
sql_query = SQLQuery("SELECT last_insert_id()")
elif web.ctx.db_name == "sqlite":
web.ctx.db_execute(db_cursor, sql_query)
# not really the same...
sql_query = SQLQuery("SELECT last_insert_rowid()")
web.ctx.db_execute(db_cursor, sql_query)
try:
out = db_cursor.fetchone()[0]
except Exception:
out = None
if not web.ctx.db_transaction: web.ctx.db.commit()
return out
def update(tables, where, vars=None, _test=False, **values):
"""
Update `tables` with clause `where` (interpolated using `vars`)
and setting `values`.
>>> joe = 'Joseph'
>>> update('foo', where='name = $joe', name='bob', age=5,
... vars=locals(), _test=True)
<sql: "UPDATE foo SET age = 5, name = 'bob' WHERE name = 'Joseph'">
"""
if vars is None: vars = {}
if isinstance(where, (int, long)):
where = "id = " + sqlquote(where)
elif isinstance(where, (list, tuple)) and len(where) == 2:
where = SQLQuery(where[0], where[1])
elif isinstance(where, SQLQuery):
pass
else:
where = reparam(where, vars)
query = (
"UPDATE " + sqllist(tables) +
" SET " + sqlwhere(values, ', ') +
" WHERE " + where)
if _test: return query
db_cursor = web.ctx.db_cursor()
web.ctx.db_execute(db_cursor, query)
if not web.ctx.db_transaction: web.ctx.db.commit()
return db_cursor.rowcount
def delete(table, where=None, using=None, vars=None, _test=False):
"""
Deletes from `table` with clauses `where` and `using`.
>>> name = 'Joe'
>>> delete('foo', where='name = $name', vars=locals(), _test=True)
<sql: "DELETE FROM foo WHERE name = 'Joe'">
"""
if vars is None: vars = {}
if isinstance(where, (int, long)):
where = "id = " + sqlquote(where)
elif isinstance(where, (list, tuple)) and len(where) == 2:
where = SQLQuery(where[0], where[1])
elif isinstance(where, SQLQuery):
pass
elif where is None:
pass
else:
where = reparam(where, vars)
q = 'DELETE FROM ' + table
if where:
q += ' WHERE ' + where
if using and web.ctx.get('db_name') != "firebird":
q += ' USING ' + sqllist(using)
if _test: return q
db_cursor = web.ctx.db_cursor()
web.ctx.db_execute(db_cursor, q)
if not web.ctx.db_transaction: web.ctx.db.commit()
return db_cursor.rowcount
if __name__ == "__main__":
import doctest
doctest.testmod()

View File

@ -1,316 +0,0 @@
"""
pretty debug errors
(part of web.py)
adapted from Django <djangoproject.com>
Copyright (c) 2005, the Lawrence Journal-World
Used under the modified BSD license:
http://www.xfree86.org/3.3.6/COPYRIGHT2.html#5
"""
__all__ = ["debugerror", "djangoerror"]
import sys, urlparse, pprint
from net import websafe
from template import Template
import webapi as web
import os, os.path
whereami = os.path.join(os.getcwd(), __file__)
whereami = os.path.sep.join(whereami.split(os.path.sep)[:-1])
djangoerror_t = """\
$def with (exception_type, exception_value, frames)
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html lang="en">
<head>
<meta http-equiv="content-type" content="text/html; charset=utf-8" />
<meta name="robots" content="NONE,NOARCHIVE" />
<title>$exception_type at $ctx.path</title>
<style type="text/css">
html * { padding:0; margin:0; }
body * { padding:10px 20px; }
body * * { padding:0; }
body { font:small sans-serif; }
body>div { border-bottom:1px solid #ddd; }
h1 { font-weight:normal; }
h2 { margin-bottom:.8em; }
h2 span { font-size:80%; color:#666; font-weight:normal; }
h3 { margin:1em 0 .5em 0; }
h4 { margin:0 0 .5em 0; font-weight: normal; }
table {
border:1px solid #ccc; border-collapse: collapse; background:white; }
tbody td, tbody th { vertical-align:top; padding:2px 3px; }
thead th {
padding:1px 6px 1px 3px; background:#fefefe; text-align:left;
font-weight:normal; font-size:11px; border:1px solid #ddd; }
tbody th { text-align:right; color:#666; padding-right:.5em; }
table.vars { margin:5px 0 2px 40px; }
table.vars td, table.req td { font-family:monospace; }
table td.code { width:100%;}
table td.code div { overflow:hidden; }
table.source th { color:#666; }
table.source td {
font-family:monospace; white-space:pre; border-bottom:1px solid #eee; }
ul.traceback { list-style-type:none; }
ul.traceback li.frame { margin-bottom:1em; }
div.context { margin: 10px 0; }
div.context ol {
padding-left:30px; margin:0 10px; list-style-position: inside; }
div.context ol li {
font-family:monospace; white-space:pre; color:#666; cursor:pointer; }
div.context ol.context-line li { color:black; background-color:#ccc; }
div.context ol.context-line li span { float: right; }
div.commands { margin-left: 40px; }
div.commands a { color:black; text-decoration:none; }
#summary { background: #ffc; }
#summary h2 { font-weight: normal; color: #666; }
#explanation { background:#eee; }
#template, #template-not-exist { background:#f6f6f6; }
#template-not-exist ul { margin: 0 0 0 20px; }
#traceback { background:#eee; }
#requestinfo { background:#f6f6f6; padding-left:120px; }
#summary table { border:none; background:transparent; }
#requestinfo h2, #requestinfo h3 { position:relative; margin-left:-100px; }
#requestinfo h3 { margin-bottom:-1em; }
.error { background: #ffc; }
.specific { color:#cc3300; font-weight:bold; }
</style>
<script type="text/javascript">
//<!--
function getElementsByClassName(oElm, strTagName, strClassName){
// Written by Jonathan Snook, http://www.snook.ca/jon;
// Add-ons by Robert Nyman, http://www.robertnyman.com
var arrElements = (strTagName == "*" && document.all)? document.all :
oElm.getElementsByTagName(strTagName);
var arrReturnElements = new Array();
strClassName = strClassName.replace(/\-/g, "\\-");
var oRegExp = new RegExp("(^|\\s)" + strClassName + "(\\s|$)");
var oElement;
for(var i=0; i<arrElements.length; i++){
oElement = arrElements[i];
if(oRegExp.test(oElement.className)){
arrReturnElements.push(oElement);
}
}
return (arrReturnElements)
}
function hideAll(elems) {
for (var e = 0; e < elems.length; e++) {
elems[e].style.display = 'none';
}
}
window.onload = function() {
hideAll(getElementsByClassName(document, 'table', 'vars'));
hideAll(getElementsByClassName(document, 'ol', 'pre-context'));
hideAll(getElementsByClassName(document, 'ol', 'post-context'));
}
function toggle() {
for (var i = 0; i < arguments.length; i++) {
var e = document.getElementById(arguments[i]);
if (e) {
e.style.display = e.style.display == 'none' ? 'block' : 'none';
}
}
return false;
}
function varToggle(link, id) {
toggle('v' + id);
var s = link.getElementsByTagName('span')[0];
var uarr = String.fromCharCode(0x25b6);
var darr = String.fromCharCode(0x25bc);
s.innerHTML = s.innerHTML == uarr ? darr : uarr;
return false;
}
//-->
</script>
</head>
<body>
<div id="summary">
<h1>$exception_type at $ctx.path</h1>
<h2>$exception_value</h2>
<table><tr>
<th>Python</th>
<td>$frames[0].filename in $frames[0].function, line $frames[0].lineno</td>
</tr><tr>
<th>Web</th>
<td>$ctx.method $ctx.home$ctx.path</td>
</tr></table>
</div>
<div id="traceback">
<h2>Traceback <span>(innermost first)</span></h2>
<ul class="traceback">
$for frame in frames:
<li class="frame">
<code>$frame.filename</code> in <code>$frame.function</code>
$if frame.context_line:
<div class="context" id="c$frame.id">
$if frame.pre_context:
<ol start="$frame.pre_context_lineno" class="pre-context" id="pre$frame.id">
$for line in frame.pre_context:
<li onclick="toggle('pre$frame.id', 'post$frame.id')">$line</li>
</ol>
<ol start="$frame.lineno" class="context-line"><li onclick="toggle('pre$frame.id', 'post$frame.id')">$frame.context_line <span>...</span></li></ol>
$if frame.post_context:
<ol start='${frame.lineno + 1}' class="post-context" id="post$frame.id">
$for line in frame.post_context:
<li onclick="toggle('pre$frame.id', 'post$frame.id')">$line</li>
</ol>
</div>
$if frame.vars:
<div class="commands">
<a href='#' onclick="return varToggle(this, '$frame.id')"><span>&#x25b6;</span> Local vars</a>
$# $inspect.formatargvalues(*inspect.getargvalues(frame['tb'].tb_frame))
</div>
$:dicttable(frame.vars, kls='vars', id=('v' + str(frame.id)))
</li>
</ul>
</div>
<div id="requestinfo">
$if ctx.output or ctx.headers:
<h2>Response so far</h2>
<h3>HEADERS</h3>
<p class="req"><code>
$for kv in ctx.headers:
$kv[0]: $kv[1]<br />
$else:
[no headers]
</code></p>
<h3>BODY</h3>
<p class="req" style="padding-bottom: 2em"><code>
$ctx.output
</code></p>
<h2>Request information</h2>
<h3>INPUT</h3>
$:dicttable(web.input())
<h3 id="cookie-info">COOKIES</h3>
$:dicttable(web.cookies())
<h3 id="meta-info">META</h3>
$ newctx = []
$# ) and (k not in ['env', 'output', 'headers', 'environ', 'status', 'db_execute']):
$for k, v in ctx.iteritems():
$if not k.startswith('_') and (k in x):
$newctx.append(kv)
$:dicttable(dict(newctx))
<h3 id="meta-info">ENVIRONMENT</h3>
$:dicttable(ctx.env)
</div>
<div id="explanation">
<p>
You're seeing this error because you have <code>web.internalerror</code>
set to <code>web.debugerror</code>. Change that if you want a different one.
</p>
</div>
</body>
</html>
"""
dicttable_t = r"""$def with (d, kls='req', id=None)
$if d:
<table class="$kls"\
$if id: id="$id"\
><thead><tr><th>Variable</th><th>Value</th></tr></thead>
<tbody>
$ temp = d.items()
$temp.sort()
$for kv in temp:
<tr><td>$kv[0]</td><td class="code"><div>$prettify(kv[1])</div></td></tr>
</tbody>
</table>
$else:
<p>No data.</p>
"""
dicttable_r = Template(dicttable_t, filter=websafe)
djangoerror_r = Template(djangoerror_t, filter=websafe)
def djangoerror():
def _get_lines_from_file(filename, lineno, context_lines):
"""
Returns context_lines before and after lineno from file.
Returns (pre_context_lineno, pre_context, context_line, post_context).
"""
try:
source = open(filename).readlines()
lower_bound = max(0, lineno - context_lines)
upper_bound = lineno + context_lines
pre_context = \
[line.strip('\n') for line in source[lower_bound:lineno]]
context_line = source[lineno].strip('\n')
post_context = \
[line.strip('\n') for line in source[lineno + 1:upper_bound]]
return lower_bound, pre_context, context_line, post_context
except (OSError, IOError):
return None, [], None, []
exception_type, exception_value, tback = sys.exc_info()
frames = []
while tback is not None:
filename = tback.tb_frame.f_code.co_filename
function = tback.tb_frame.f_code.co_name
lineno = tback.tb_lineno - 1
pre_context_lineno, pre_context, context_line, post_context = \
_get_lines_from_file(filename, lineno, 7)
frames.append(web.storage({
'tback': tback,
'filename': filename,
'function': function,
'lineno': lineno,
'vars': tback.tb_frame.f_locals,
'id': id(tback),
'pre_context': pre_context,
'context_line': context_line,
'post_context': post_context,
'pre_context_lineno': pre_context_lineno,
}))
tback = tback.tb_next
frames.reverse()
urljoin = urlparse.urljoin
def prettify(x):
try:
out = pprint.pformat(x)
except Exception, e:
out = '[could not display: <' + e.__class__.__name__ + \
': '+str(e)+'>]'
return out
dt = dicttable_r
dt.globals = {'prettify': prettify}
t = djangoerror_r
t.globals = {'ctx': web.ctx, 'web':web, 'dicttable':dt, 'dict':dict, 'str':str}
return t(exception_type, exception_value, frames)
def debugerror():
"""
A replacement for `internalerror` that presents a nice page with lots
of debug information for the programmer.
(Based on the beautiful 500 page from [Django](http://djangoproject.com/),
designed by [Wilson Miner](http://wilsonminer.com/).)
"""
web.ctx.headers = [('Content-Type', 'text/html')]
web.ctx.output = djangoerror()
if __name__ == "__main__":
urls = (
'/', 'index'
)
class index:
def GET(self):
thisdoesnotexist
web.internalerror = web.debugerror
web.run(urls)

View File

@ -1,215 +0,0 @@
"""
HTML forms
(part of web.py)
"""
import copy, re
import webapi as web
import utils, net
def attrget(obj, attr, value=None):
if hasattr(obj, 'has_key') and obj.has_key(attr): return obj[attr]
if hasattr(obj, attr): return getattr(obj, attr)
return value
class Form:
def __init__(self, *inputs, **kw):
self.inputs = inputs
self.valid = True
self.note = None
self.validators = kw.pop('validators', [])
def __call__(self, x=None):
o = copy.deepcopy(self)
if x: o.validates(x)
return o
def render(self):
out = ''
out += self.rendernote(self.note)
out += '<table>\n'
for i in self.inputs:
out += ' <tr><th><label for="%s">%s</label></th>' % (i.id, i.description)
out += "<td>"+i.pre+i.render()+i.post+"</td>"
out += '<td id="note_%s">%s</td></tr>\n' % (i.id, self.rendernote(i.note))
out += "</table>"
return out
def rendernote(self, note):
if note: return '<strong class="wrong">%s</strong>' % note
else: return ""
def validates(self, source=None, _validate=True, **kw):
source = source or kw or web.input()
out = True
for i in self.inputs:
v = attrget(source, i.name)
if _validate:
out = i.validate(v) and out
else:
i.value = v
if _validate:
out = out and self._validate(source)
self.valid = out
return out
def _validate(self, value):
self.value = value
for v in self.validators:
if not v.valid(value):
self.note = v.msg
return False
return True
def fill(self, source=None, **kw):
return self.validates(source, _validate=False, **kw)
def __getitem__(self, i):
for x in self.inputs:
if x.name == i: return x
raise KeyError, i
def _get_d(self): #@@ should really be form.attr, no?
return utils.storage([(i.name, i.value) for i in self.inputs])
d = property(_get_d)
class Input(object):
def __init__(self, name, *validators, **attrs):
self.description = attrs.pop('description', name)
self.value = attrs.pop('value', None)
self.pre = attrs.pop('pre', "")
self.post = attrs.pop('post', "")
self.id = attrs.setdefault('id', name)
if 'class_' in attrs:
attrs['class'] = attrs['class_']
del attrs['class_']
self.name, self.validators, self.attrs, self.note = name, validators, attrs, None
def validate(self, value):
self.value = value
for v in self.validators:
if not v.valid(value):
self.note = v.msg
return False
return True
def render(self): raise NotImplementedError
def addatts(self):
str = ""
for (n, v) in self.attrs.items():
str += ' %s="%s"' % (n, net.websafe(v))
return str
#@@ quoting
class Textbox(Input):
def render(self):
x = '<input type="text" name="%s"' % net.websafe(self.name)
if self.value: x += ' value="%s"' % net.websafe(self.value)
x += self.addatts()
x += ' />'
return x
class Password(Input):
def render(self):
x = '<input type="password" name="%s"' % net.websafe(self.name)
if self.value: x += ' value="%s"' % net.websafe(self.value)
x += self.addatts()
x += ' />'
return x
class Textarea(Input):
def render(self):
x = '<textarea name="%s"' % net.websafe(self.name)
x += self.addatts()
x += '>'
if self.value is not None: x += net.websafe(self.value)
x += '</textarea>'
return x
class Dropdown(Input):
def __init__(self, name, args, *validators, **attrs):
self.args = args
super(Dropdown, self).__init__(name, *validators, **attrs)
def render(self):
x = '<select name="%s"%s>\n' % (net.websafe(self.name), self.addatts())
for arg in self.args:
if type(arg) == tuple:
value, desc= arg
else:
value, desc = arg, arg
if self.value == value: select_p = ' selected="selected"'
else: select_p = ''
x += ' <option %s value="%s">%s</option>\n' % (select_p, net.websafe(value), net.websafe(desc))
x += '</select>\n'
return x
class Radio(Input):
def __init__(self, name, args, *validators, **attrs):
self.args = args
super(Radio, self).__init__(name, *validators, **attrs)
def render(self):
x = '<span>'
for arg in self.args:
if self.value == arg: select_p = ' checked="checked"'
else: select_p = ''
x += '<input type="radio" name="%s" value="%s"%s%s /> %s ' % (net.websafe(self.name), net.websafe(arg), select_p, self.addatts(), net.websafe(arg))
return x+'</span>'
class Checkbox(Input):
def render(self):
x = '<input name="%s" type="checkbox"' % net.websafe(self.name)
if self.value: x += ' checked="checked"'
x += self.addatts()
x += ' />'
return x
class Button(Input):
def __init__(self, name, *validators, **attrs):
super(Button, self).__init__(name, *validators, **attrs)
self.description = ""
def render(self):
safename = net.websafe(self.name)
x = '<button name="%s"%s>%s</button>' % (safename, self.addatts(), safename)
return x
class Hidden(Input):
def __init__(self, name, *validators, **attrs):
super(Hidden, self).__init__(name, *validators, **attrs)
# it doesnt make sence for a hidden field to have description
self.description = ""
def render(self):
x = '<input type="hidden" name="%s"' % net.websafe(self.name)
if self.value: x += ' value="%s"' % net.websafe(self.value)
x += ' />'
return x
class File(Input):
def render(self):
x = '<input type="file" name="%s"' % net.websafe(self.name)
x += self.addatts()
x += ' />'
return x
class Validator:
def __deepcopy__(self, memo): return copy.copy(self)
def __init__(self, msg, test, jstest=None): utils.autoassign(self, locals())
def valid(self, value):
try: return self.test(value)
except: return False
notnull = Validator("Required", bool)
class regexp(Validator):
def __init__(self, rexp, msg):
self.rexp = re.compile(rexp)
self.msg = msg
def valid(self, value):
return bool(self.rexp.match(value))

View File

@ -1,270 +0,0 @@
"""
HTTP Utilities
(from web.py)
"""
__all__ = [
"expires", "lastmodified",
"prefixurl", "modified",
"redirect", "found", "seeother", "tempredirect",
"write",
"changequery", "url",
"background", "backgrounder",
"Reloader", "reloader", "profiler",
]
import sys, os, threading, urllib, urlparse
try: import datetime
except ImportError: pass
import net, utils, webapi as web
def prefixurl(base=''):
"""
Sorry, this function is really difficult to explain.
Maybe some other time.
"""
url = web.ctx.path.lstrip('/')
for i in xrange(url.count('/')):
base += '../'
if not base:
base = './'
return base
def expires(delta):
"""
Outputs an `Expires` header for `delta` from now.
`delta` is a `timedelta` object or a number of seconds.
"""
if isinstance(delta, (int, long)):
delta = datetime.timedelta(seconds=delta)
date_obj = datetime.datetime.utcnow() + delta
web.header('Expires', net.httpdate(date_obj))
def lastmodified(date_obj):
"""Outputs a `Last-Modified` header for `datetime`."""
web.header('Last-Modified', net.httpdate(date_obj))
def modified(date=None, etag=None):
n = web.ctx.env.get('HTTP_IF_NONE_MATCH')
m = net.parsehttpdate(web.ctx.env.get('HTTP_IF_MODIFIED_SINCE', '').split(';')[0])
validate = False
if etag:
raise NotImplementedError, "no etag support yet"
# should really be a warning
if date and m:
# we subtract a second because
# HTTP dates don't have sub-second precision
if date-datetime.timedelta(seconds=1) <= m:
validate = True
if validate: web.ctx.status = '304 Not Modified'
return not validate
"""
By default, these all return simple error messages that send very short messages
(like "bad request") to the user. They can and should be overridden
to return nicer ones.
"""
def redirect(url, status='301 Moved Permanently'):
"""
Returns a `status` redirect to the new URL.
`url` is joined with the base URL so that things like
`redirect("about") will work properly.
"""
newloc = urlparse.urljoin(web.ctx.path, url)
# if newloc is relative then make it absolute
#mvoncken:Disabled because we don't want to redirect to localhost!
#if newloc.startswith('/'):
# newloc = web.ctx.home + newloc
web.ctx.status = status
web.ctx.output = ''
web.header('Content-Type', 'text/html')
web.header('Location', newloc)
# seems to add a three-second delay for some reason:
# web.output('<a href="'+ newloc + '">moved permanently</a>')
def found(url):
"""A `302 Found` redirect."""
return redirect(url, '302 Found')
def seeother(url):
"""A `303 See Other` redirect."""
return redirect(url, '303 See Other')
def tempredirect(url):
"""A `307 Temporary Redirect` redirect."""
return redirect(url, '307 Temporary Redirect')
def write(cgi_response):
"""
Converts a standard CGI-style string response into `header` and
`output` calls.
"""
cgi_response = str(cgi_response)
cgi_response.replace('\r\n', '\n')
head, body = cgi_response.split('\n\n', 1)
lines = head.split('\n')
for line in lines:
if line.isspace():
continue
hdr, value = line.split(":", 1)
value = value.strip()
if hdr.lower() == "status":
web.ctx.status = value
else:
web.header(hdr, value)
web.output(body)
def urlencode(query):
"""
Same as urllib.urlencode, but supports unicode strings.
>>> urlencode({'text':'foo bar'})
'text=foo+bar'
"""
query = dict([(k, utils.utf8(v)) for k, v in query.items()])
return urllib.urlencode(query)
def changequery(query=None, **kw):
"""
Imagine you're at `/foo?a=1&b=2`. Then `changequery(a=3)` will return
`/foo?a=3&b=2` -- the same URL but with the arguments you requested
changed.
"""
if query is None:
query = web.input(_method='get')
for k, v in kw.iteritems():
if v is None:
query.pop(k, None)
else:
query[k] = v
out = web.ctx.path
if query:
out += '?' + urlencode(query)
return out
def url(path=None, **kw):
"""
Makes url by concatinating web.ctx.homepath and path and the
query string created using the arguments.
"""
if path is None:
path = web.ctx.path
if path.startswith("/"):
out = web.ctx.homepath + path
else:
out = path
if kw:
out += '?' + urlencode(kw)
return out
def background(func):
"""A function decorator to run a long-running function as a background thread."""
def internal(*a, **kw):
web.data() # cache it
tmpctx = web._context[threading.currentThread()]
web._context[threading.currentThread()] = utils.storage(web.ctx.copy())
def newfunc():
web._context[threading.currentThread()] = tmpctx
func(*a, **kw)
myctx = web._context[threading.currentThread()]
for k in myctx.keys():
if k not in ['status', 'headers', 'output']:
try: del myctx[k]
except KeyError: pass
t = threading.Thread(target=newfunc)
background.threaddb[id(t)] = t
t.start()
web.ctx.headers = []
return seeother(changequery(_t=id(t)))
return internal
background.threaddb = {}
def backgrounder(func):
def internal(*a, **kw):
i = web.input(_method='get')
if '_t' in i:
try:
t = background.threaddb[int(i._t)]
except KeyError:
return web.notfound()
web._context[threading.currentThread()] = web._context[t]
return
else:
return func(*a, **kw)
return internal
class Reloader:
"""
Before every request, checks to see if any loaded modules have changed on
disk and, if so, reloads them.
"""
def __init__(self, func):
self.func = func
self.mtimes = {}
# cheetah:
# b = _compiletemplate.bases
# _compiletemplate = globals()['__compiletemplate']
# _compiletemplate.bases = b
web.loadhooks['reloader'] = self.check
# todo:
# - replace relrcheck with a loadhook
#if reloader in middleware:
# relr = reloader(None)
# relrcheck = relr.check
# middleware.remove(reloader)
#else:
# relr = None
# relrcheck = lambda: None
# if relr:
# relr.func = wsgifunc
# return wsgifunc
#
def check(self):
for mod in sys.modules.values():
try:
mtime = os.stat(mod.__file__).st_mtime
except (AttributeError, OSError, IOError):
continue
if mod.__file__.endswith('.pyc') and \
os.path.exists(mod.__file__[:-1]):
mtime = max(os.stat(mod.__file__[:-1]).st_mtime, mtime)
if mod not in self.mtimes:
self.mtimes[mod] = mtime
elif self.mtimes[mod] < mtime:
try:
reload(mod)
self.mtimes[mod] = mtime
except ImportError:
pass
return True
def __call__(self, e, o):
self.check()
return self.func(e, o)
reloader = Reloader
def profiler(app):
"""Outputs basic profiling information at the bottom of each response."""
from utils import profile
def profile_internal(e, o):
out, result = profile(app)(e, o)
return out + ['<pre>' + net.websafe(result) + '</pre>']
return profile_internal
if __name__ == "__main__":
import doctest
doctest.testmod()

View File

@ -1,227 +0,0 @@
__all__ = ["runsimple"]
import sys, os
import webapi as web
import net
def runbasic(func, server_address=("0.0.0.0", 8080)):
"""
Runs a simple HTTP server hosting WSGI app `func`. The directory `static/`
is hosted statically.
Based on [WsgiServer][ws] from [Colin Stewart][cs].
[ws]: http://www.owlfish.com/software/wsgiutils/documentation/wsgi-server-api.html
[cs]: http://www.owlfish.com/
"""
# Copyright (c) 2004 Colin Stewart (http://www.owlfish.com/)
# Modified somewhat for simplicity
# Used under the modified BSD license:
# http://www.xfree86.org/3.3.6/COPYRIGHT2.html#5
import SimpleHTTPServer, SocketServer, BaseHTTPServer, urlparse
import socket, errno
import traceback
class WSGIHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
def run_wsgi_app(self):
protocol, host, path, parameters, query, fragment = \
urlparse.urlparse('http://dummyhost%s' % self.path)
# we only use path, query
env = {'wsgi.version': (1, 0)
,'wsgi.url_scheme': 'http'
,'wsgi.input': self.rfile
,'wsgi.errors': sys.stderr
,'wsgi.multithread': 1
,'wsgi.multiprocess': 0
,'wsgi.run_once': 0
,'REQUEST_METHOD': self.command
,'REQUEST_URI': self.path
,'PATH_INFO': path
,'QUERY_STRING': query
,'CONTENT_TYPE': self.headers.get('Content-Type', '')
,'CONTENT_LENGTH': self.headers.get('Content-Length', '')
,'REMOTE_ADDR': self.client_address[0]
,'SERVER_NAME': self.server.server_address[0]
,'SERVER_PORT': str(self.server.server_address[1])
,'SERVER_PROTOCOL': self.request_version
}
for http_header, http_value in self.headers.items():
env ['HTTP_%s' % http_header.replace('-', '_').upper()] = \
http_value
# Setup the state
self.wsgi_sent_headers = 0
self.wsgi_headers = []
try:
# We have there environment, now invoke the application
result = self.server.app(env, self.wsgi_start_response)
try:
try:
for data in result:
if data:
self.wsgi_write_data(data)
finally:
if hasattr(result, 'close'):
result.close()
except socket.error, socket_err:
# Catch common network errors and suppress them
if (socket_err.args[0] in \
(errno.ECONNABORTED, errno.EPIPE)):
return
except socket.timeout, socket_timeout:
return
except:
print >> web.debug, traceback.format_exc(),
if (not self.wsgi_sent_headers):
# We must write out something!
self.wsgi_write_data(" ")
return
do_POST = run_wsgi_app
do_PUT = run_wsgi_app
do_DELETE = run_wsgi_app
def do_GET(self):
if self.path.startswith('/static/'):
SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
else:
self.run_wsgi_app()
def wsgi_start_response(self, response_status, response_headers,
exc_info=None):
if (self.wsgi_sent_headers):
raise Exception \
("Headers already sent and start_response called again!")
# Should really take a copy to avoid changes in the application....
self.wsgi_headers = (response_status, response_headers)
return self.wsgi_write_data
def wsgi_write_data(self, data):
if (not self.wsgi_sent_headers):
status, headers = self.wsgi_headers
# Need to send header prior to data
status_code = status[:status.find(' ')]
status_msg = status[status.find(' ') + 1:]
self.send_response(int(status_code), status_msg)
for header, value in headers:
self.send_header(header, value)
self.end_headers()
self.wsgi_sent_headers = 1
# Send the data
self.wfile.write(data)
class WSGIServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
def __init__(self, func, server_address):
BaseHTTPServer.HTTPServer.__init__(self,
server_address,
WSGIHandler)
self.app = func
self.serverShuttingDown = 0
print "http://%s:%d/" % server_address
WSGIServer(func, server_address).serve_forever()
def runsimple(func, server_address=("0.0.0.0", 8080)):
"""
Runs [CherryPy][cp] WSGI server hosting WSGI app `func`.
The directory `static/` is hosted statically.
[cp]: http://www.cherrypy.org
"""
from wsgiserver import CherryPyWSGIServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
from BaseHTTPServer import BaseHTTPRequestHandler
class StaticApp(SimpleHTTPRequestHandler):
"""WSGI application for serving static files."""
def __init__(self, environ, start_response):
self.headers = []
self.environ = environ
self.start_response = start_response
def send_response(self, status, msg=""):
self.status = str(status) + " " + msg
def send_header(self, name, value):
self.headers.append((name, value))
def end_headers(self):
pass
def log_message(*a): pass
def __iter__(self):
environ = self.environ
self.path = environ.get('PATH_INFO', '')
self.client_address = environ.get('REMOTE_ADDR','-'), \
environ.get('REMOTE_PORT','-')
self.command = environ.get('REQUEST_METHOD', '-')
from cStringIO import StringIO
self.wfile = StringIO() # for capturing error
f = self.send_head()
self.start_response(self.status, self.headers)
if f:
block_size = 16 * 1024
while True:
buf = f.read(block_size)
if not buf:
break
yield buf
f.close()
else:
value = self.wfile.getvalue()
yield value
class WSGIWrapper(BaseHTTPRequestHandler):
"""WSGI wrapper for logging the status and serving static files."""
def __init__(self, app):
self.app = app
self.format = '%s - - [%s] "%s %s %s" - %s'
def __call__(self, environ, start_response):
def xstart_response(status, response_headers, *args):
write = start_response(status, response_headers, *args)
self.log(status, environ)
return write
path = environ.get('PATH_INFO', '')
if path.startswith('/static/'):
return StaticApp(environ, xstart_response)
else:
return self.app(environ, xstart_response)
def log(self, status, environ):
#mvoncken,no logging..
return
outfile = environ.get('wsgi.errors', web.debug)
req = environ.get('PATH_INFO', '_')
protocol = environ.get('ACTUAL_SERVER_PROTOCOL', '-')
method = environ.get('REQUEST_METHOD', '-')
host = "%s:%s" % (environ.get('REMOTE_ADDR','-'),
environ.get('REMOTE_PORT','-'))
#@@ It is really bad to extend from
#@@ BaseHTTPRequestHandler just for this method
time = self.log_date_time_string()
print >> outfile, self.format % (host, time, protocol,
method, req, status)
func = WSGIWrapper(func)
server = CherryPyWSGIServer(server_address, func, server_name="localhost")
print "http://%s:%d/" % server_address
try:
server.start()
except KeyboardInterrupt:
server.stop()

View File

@ -1,155 +0,0 @@
"""
Network Utilities
(from web.py)
"""
__all__ = [
"validipaddr", "validipport", "validip", "validaddr",
"urlquote",
"httpdate", "parsehttpdate",
"htmlquote", "websafe",
]
import urllib, time
try: import datetime
except ImportError: pass
def validipaddr(address):
"""returns True if `address` is a valid IPv4 address"""
try:
octets = address.split('.')
assert len(octets) == 4
for x in octets:
assert 0 <= int(x) <= 255
except (AssertionError, ValueError):
return False
return True
def validipport(port):
"""returns True if `port` is a valid IPv4 port"""
try:
assert 0 <= int(port) <= 65535
except (AssertionError, ValueError):
return False
return True
def validip(ip, defaultaddr="0.0.0.0", defaultport=8080):
"""returns `(ip_address, port)` from string `ip_addr_port`"""
addr = defaultaddr
port = defaultport
ip = ip.split(":", 1)
if len(ip) == 1:
if not ip[0]:
pass
elif validipaddr(ip[0]):
addr = ip[0]
elif validipport(ip[0]):
port = int(ip[0])
else:
raise ValueError, ':'.join(ip) + ' is not a valid IP address/port'
elif len(ip) == 2:
addr, port = ip
if not validipaddr(addr) and validipport(port):
raise ValueError, ':'.join(ip) + ' is not a valid IP address/port'
port = int(port)
else:
raise ValueError, ':'.join(ip) + ' is not a valid IP address/port'
return (addr, port)
def validaddr(string_):
"""
returns either (ip_address, port) or "/path/to/socket" from string_
>>> validaddr('/path/to/socket')
'/path/to/socket'
>>> validaddr('8000')
('0.0.0.0', 8000)
>>> validaddr('127.0.0.1')
('127.0.0.1', 8080)
>>> validaddr('127.0.0.1:8000')
('127.0.0.1', 8000)
>>> validaddr('fff')
Traceback (most recent call last):
...
ValueError: fff is not a valid IP address/port
"""
if '/' in string_:
return string_
else:
return validip(string_)
def urlquote(val):
"""
Quotes a string for use in a URL.
>>> urlquote('://?f=1&j=1')
'%3A//%3Ff%3D1%26j%3D1'
>>> urlquote(None)
''
>>> urlquote(u'\u203d')
'%E2%80%BD'
"""
if val is None: return ''
if not isinstance(val, unicode): val = str(val)
else: val = val.encode('utf-8')
return urllib.quote(val)
def httpdate(date_obj):
"""
Formats a datetime object for use in HTTP headers.
>>> import datetime
>>> httpdate(datetime.datetime(1970, 1, 1, 1, 1, 1))
'Thu, 01 Jan 1970 01:01:01 GMT'
"""
return date_obj.strftime("%a, %d %b %Y %H:%M:%S GMT")
def parsehttpdate(string_):
"""
Parses an HTTP date into a datetime object.
>>> parsehttpdate('Thu, 01 Jan 1970 01:01:01 GMT')
datetime.datetime(1970, 1, 1, 1, 1, 1)
"""
try:
t = time.strptime(string_, "%a, %d %b %Y %H:%M:%S %Z")
except ValueError:
return None
return datetime.datetime(*t[:6])
def htmlquote(text):
"""
Encodes `text` for raw use in HTML.
>>> htmlquote("<'&\\">")
'&lt;&#39;&amp;&quot;&gt;'
"""
text = text.replace("&", "&amp;") # Must be done first!
text = text.replace("<", "&lt;")
text = text.replace(">", "&gt;")
text = text.replace("'", "&#39;")
text = text.replace('"', "&quot;")
return text
def websafe(val):
"""
Converts `val` so that it's safe for use in UTF-8 HTML.
>>> websafe("<'&\\">")
'&lt;&#39;&amp;&quot;&gt;'
>>> websafe(None)
''
>>> websafe(u'\u203d')
'\\xe2\\x80\\xbd'
"""
if val is None:
return ''
if isinstance(val, unicode):
val = val.encode('utf-8')
val = str(val)
return htmlquote(val)
if __name__ == "__main__":
import doctest
doctest.testmod()

View File

@ -1,153 +0,0 @@
"""
Request Delegation
(from web.py)
"""
__all__ = ["handle", "nomethod", "autodelegate", "webpyfunc", "run"]
import sys, re, types, os.path, urllib
import http, wsgi, utils, webapi
import webapi as web
def handle(mapping, fvars=None):
"""
Call the appropriate function based on the url to function mapping in `mapping`.
If no module for the function is specified, look up the function in `fvars`. If
`fvars` is empty, using the caller's context.
`mapping` should be a tuple of paired regular expressions with function name
substitutions. `handle` will import modules as necessary.
"""
for url, ofno in utils.group(mapping, 2):
if isinstance(ofno, tuple):
ofn, fna = ofno[0], list(ofno[1:])
else:
ofn, fna = ofno, []
fn, result = utils.re_subm('^' + url + '$', ofn, web.ctx.path)
if result: # it's a match
if fn.split(' ', 1)[0] == "redirect":
url = fn.split(' ', 1)[1]
if web.ctx.method == "GET":
x = web.ctx.env.get('QUERY_STRING', '')
if x:
url += '?' + x
return http.redirect(url)
elif '.' in fn:
x = fn.split('.')
mod, cls = '.'.join(x[:-1]), x[-1]
mod = __import__(mod, globals(), locals(), [""])
cls = getattr(mod, cls)
else:
cls = fn
mod = fvars
if isinstance(mod, types.ModuleType):
mod = vars(mod)
try:
cls = mod[cls]
except KeyError:
return web.notfound()
meth = web.ctx.method
if meth == "HEAD":
if not hasattr(cls, meth):
meth = "GET"
if not hasattr(cls, meth):
return nomethod(cls)
tocall = getattr(cls(), meth)
args = list(result.groups())
for d in re.findall(r'\\(\d+)', ofn):
args.pop(int(d) - 1)
return tocall(*([x and urllib.unquote(x) for x in args] + fna))
return web.notfound()
def nomethod(cls):
"""Returns a `405 Method Not Allowed` error for `cls`."""
web.ctx.status = '405 Method Not Allowed'
web.header('Content-Type', 'text/html')
web.header('Allow', \
', '.join([method for method in \
['GET', 'HEAD', 'POST', 'PUT', 'DELETE'] \
if hasattr(cls, method)]))
# commented out for the same reason redirect is
# return output('method not allowed')
def autodelegate(prefix=''):
"""
Returns a method that takes one argument and calls the method named prefix+arg,
calling `notfound()` if there isn't one. Example:
urls = ('/prefs/(.*)', 'prefs')
class prefs:
GET = autodelegate('GET_')
def GET_password(self): pass
def GET_privacy(self): pass
`GET_password` would get called for `/prefs/password` while `GET_privacy` for
`GET_privacy` gets called for `/prefs/privacy`.
If a user visits `/prefs/password/change` then `GET_password(self, '/change')`
is called.
"""
def internal(self, arg):
if '/' in arg:
first, rest = arg.split('/', 1)
func = prefix + first
args = ['/' + rest]
else:
func = prefix + arg
args = []
if hasattr(self, func):
try:
return getattr(self, func)(*args)
except TypeError:
return web.notfound()
else:
return web.notfound()
return internal
def webpyfunc(inp, fvars, autoreload=False):
"""If `inp` is a url mapping, returns a function that calls handle."""
if not hasattr(inp, '__call__'):
if autoreload:
def modname():
"""find name of the module name from fvars."""
file, name = fvars['__file__'], fvars['__name__']
if name == '__main__':
# Since the __main__ module can't be reloaded, the module has
# to be imported using its file name.
name = os.path.splitext(os.path.basename(file))[0]
return name
mod = __import__(modname(), None, None, [""])
#@@probably should replace this with some inspect magic
name = utils.dictfind(fvars, inp)
func = lambda: handle(getattr(mod, name), mod)
else:
func = lambda: handle(inp, fvars)
else:
func = inp
return func
def run(inp, fvars, *middleware):
"""
Starts handling requests. If called in a CGI or FastCGI context, it will follow
that protocol. If called from the command line, it will start an HTTP
server on the port named in the first command line argument, or, if there
is no argument, on port 8080.
`input` is a callable, then it's called with no arguments.
Otherwise, it's a `mapping` object to be passed to `handle(...)`.
**Caveat:** So that `reloader` will work correctly, input has to be a variable,
it can't be a tuple passed in directly.
`middleware` is a list of WSGI middleware which is applied to the resulting WSGI
function.
"""
autoreload = http.reloader in middleware
return wsgi.runwsgi(webapi.wsgifunc(webpyfunc(inp, fvars, autoreload), *middleware))

View File

@ -1,878 +0,0 @@
"""
simple, elegant templating
(part of web.py)
"""
import re, glob, os, os.path
from types import FunctionType as function
from utils import storage, group, utf8
from net import websafe
# differences from python:
# - for: has an optional else: that gets called if the loop never runs
# differences to add:
# - you can use the expression inside if, while blocks
# - special for loop attributes, like django?
# - you can check to see if a variable is defined (perhaps w/ get func?)
# all these are probably good ideas for python...
# todo:
# inline tuple
# relax constraints on spacing
# continue, break, etc.
# tracebacks
global_globals = {'None':None, 'False':False, 'True': True}
MAX_ITERS = 100000
WHAT = 0
ARGS = 4
KWARGS = 6
NAME = 2
BODY = 4
CLAUSE = 2
ELIF = 6
ELSE = 8
IN = 6
NAME = 2
EXPR = 4
FILTER = 4
THING = 2
ATTR = 4
ITEM = 4
NEGATE = 4
X = 2
OP = 4
Y = 6
LINENO = -1
# http://docs.python.org/ref/identifiers.html
r_var = '[a-zA-Z_][a-zA-Z0-9_]*'
class ParseError(Exception): pass
class Parser:
def __init__(self, text, name=""):
self.t = text
self.p = 0
self._lock = [False]
self.name = name
def lock(self):
self._lock[-1] = True
def curline(self):
return self.t[:self.p].count('\n')+1
def csome(self):
return repr(self.t[self.p:self.p+5]+'...')
def Error(self, x, y=None):
if y is None: y = self.csome()
raise ParseError, "%s: expected %s, got %s (line %s)" % (self.name, x, y, self.curline())
def q(self, f):
def internal(*a, **kw):
checkp = self.p
self._lock.append(False)
try:
q = f(*a, **kw)
except ParseError:
if self._lock[-1]:
raise
self.p = checkp
self._lock.pop()
return False
self._lock.pop()
return q or True
return internal
def tokr(self, t):
text = self.c(len(t))
if text != t:
self.Error(repr(t), repr(text))
return t
def ltokr(self, *l):
for x in l:
o = self.tokq(x)
if o: return o
self.Error('one of '+repr(l))
def rer(self, r):
x = re.match(r, self.t[self.p:]) #@@re_compile
if not x:
self.Error('r'+repr(r))
return self.tokr(x.group())
def endr(self):
if self.p != len(self.t):
self.Error('EOF')
def c(self, n=1):
out = self.t[self.p:self.p+n]
if out == '' and n != 0:
self.Error('character', 'EOF')
self.p += n
return out
def lookbehind(self, t):
return self.t[self.p-len(t):self.p] == t
def __getattr__(self, a):
if a.endswith('q'):
return self.q(getattr(self, a[:-1]+'r'))
raise AttributeError, a
class TemplateParser(Parser):
def __init__(self, *a, **kw):
Parser.__init__(self, *a, **kw)
self.curws = ''
self.curind = ''
def o(self, *a):
return a+('lineno', self.curline())
def go(self):
# maybe try to do some traceback parsing/hacking
return self.gor()
def gor(self):
header = self.defwithq()
results = self.lines(start=True)
self.endr()
return header, results
def ws(self):
n = 0
while self.tokq(" "): n += 1
return " " * n
def defwithr(self):
self.tokr('$def with ')
self.lock()
self.tokr('(')
args = []
kw = []
x = self.req(r_var)
while x:
if self.tokq('='):
v = self.exprr()
kw.append((x, v))
else:
args.append(x)
x = self.tokq(', ') and self.req(r_var)
self.tokr(')\n')
return self.o('defwith', 'null', None, 'args', args, 'kwargs', kw)
def literalr(self):
o = (
self.req('"[^"]*"') or #@@ no support for escapes
self.req("'[^']*'")
)
if o is False:
o = self.req('\-?[0-9]+(\.[0-9]*)?')
if o is not False:
if '.' in o: o = float(o)
else: o = int(o)
if o is False: self.Error('literal')
return self.o('literal', 'thing', o)
def listr(self):
self.tokr('[')
self.lock()
x = []
if not self.tokq(']'):
while True:
t = self.exprr()
x.append(t)
if not self.tokq(', '): break
self.tokr(']')
return self.o('list', 'thing', x)
def dictr(self):
self.tokr('{')
self.lock()
x = {}
if not self.tokq('}'):
while True:
k = self.exprr()
self.tokr(': ')
v = self.exprr()
x[k] = v
if not self.tokq(', '): break
self.tokr('}')
return self.o('dict', 'thing', x)
def parenr(self):
self.tokr('(')
self.lock()
o = self.exprr() # todo: allow list
self.tokr(')')
return self.o('paren', 'thing', o)
def atomr(self):
"""returns var, literal, paren, dict, or list"""
o = (
self.varq() or
self.parenq() or
self.dictq() or
self.listq() or
self.literalq()
)
if o is False: self.Error('atom')
return o
def primaryr(self):
"""returns getattr, call, or getitem"""
n = self.atomr()
while 1:
if self.tokq('.'):
v = self.req(r_var)
if not v:
self.p -= 1 # get rid of the '.'
break
else:
n = self.o('getattr', 'thing', n, 'attr', v)
elif self.tokq('('):
args = []
kw = []
while 1:
# need to see if we're doing a keyword argument
checkp = self.p
k = self.req(r_var)
if k and self.tokq('='): # yup
v = self.exprr()
kw.append((k, v))
else:
self.p = checkp
x = self.exprq()
if x: # at least it's something
args.append(x)
else:
break
if not self.tokq(', '): break
self.tokr(')')
n = self.o('call', 'thing', n, 'args', args, 'kwargs', kw)
elif self.tokq('['):
v = self.exprr()
self.tokr(']')
n = self.o('getitem', 'thing', n, 'item', v)
else:
break
return n
def exprr(self):
negate = self.tokq('not ')
x = self.primaryr()
if self.tokq(' '):
operator = self.ltokr('not in', 'in', 'is not', 'is', '==', '!=', '>=', '<=', '<', '>', 'and', 'or', '*', '+', '-', '/', '%')
self.tokr(' ')
y = self.exprr()
x = self.o('test', 'x', x, 'op', operator, 'y', y)
return self.o('expr', 'thing', x, 'negate', negate)
def varr(self):
return self.o('var', 'name', self.rer(r_var))
def liner(self):
out = []
o = self.curws
while 1:
c = self.c()
self.lock()
if c == '\n':
self.p -= 1
break
if c == '$':
if self.lookbehind('\\$'):
o = o[:-1] + c
else:
filter = not bool(self.tokq(':'))
if self.tokq('{'):
out.append(o)
out.append(self.o('itpl', 'name', self.exprr(), 'filter', filter))
self.tokr('}')
o = ''
else:
g = self.primaryq()
if g:
out.append(o)
out.append(self.o('itpl', 'name', g, 'filter', filter))
o = ''
else:
o += c
else:
o += c
self.tokr('\n')
if not self.lookbehind('\\\n'):
o += '\n'
else:
o = o[:-1]
out.append(o)
return self.o('line', 'thing', out)
def varsetr(self):
self.tokr('$var ')
self.lock()
what = self.rer(r_var)
self.tokr(':')
body = self.lines()
return self.o('varset', 'name', what, 'body', body)
def ifr(self):
self.tokr("$if ")
self.lock()
expr = self.exprr()
self.tokr(":")
ifc = self.lines()
elifs = []
while self.tokq(self.curws + self.curind + '$elif '):
v = self.exprr()
self.tokr(':')
c = self.lines()
elifs.append(self.o('elif', 'clause', v, 'body', c))
if self.tokq(self.curws + self.curind + "$else:"):
elsec = self.lines()
else:
elsec = None
return self.o('if', 'clause', expr, 'then', ifc, 'elif', elifs, 'else', elsec)
def forr(self):
self.tokr("$for ")
self.lock()
v = self.setabler()
self.tokr(" in ")
g = self.exprr()
self.tokr(":")
l = self.lines()
if self.tokq(self.curws + self.curind + '$else:'):
elsec = self.lines()
else:
elsec = None
return self.o('for', 'name', v, 'body', l, 'in', g, 'else', elsec)
def whiler(self):
self.tokr('$while ')
self.lock()
v = self.exprr()
self.tokr(":")
l = self.lines()
if self.tokq(self.curws + self.curind + '$else:'):
elsec = self.lines()
else:
elsec = None
return self.o('while', 'clause', v, 'body', l, 'null', None, 'else', elsec)
def assignr(self):
self.tokr('$ ')
assign = self.rer(r_var) # NOTE: setable
self.tokr(' = ')
expr = self.exprr()
self.tokr('\n')
return self.o('assign', 'name', assign, 'expr', expr)
def commentr(self):
self.tokr('$#')
self.lock()
while self.c() != '\n': pass
return self.o('comment')
def setabler(self):
out = [self.varr()] #@@ not quite right
while self.tokq(', '):
out.append(self.varr())
return out
def lines(self, start=False):
"""
This function gets called from two places:
1. at the start, where it's matching the document itself
2. after any command, where it matches one line or an indented block
"""
o = []
if not start: # try to match just one line
singleline = self.tokq(' ') and self.lineq()
if singleline:
return [singleline]
else:
self.rer(' *') #@@slurp space?
self.tokr('\n')
oldind = self.curind
self.curind += ' '
while 1:
oldws = self.curws
t = self.tokq(oldws + self.curind)
if not t: break
self.curws += self.ws()
x = t and (
self.varsetq() or
self.ifq() or
self.forq() or
self.whileq() or
self.assignq() or
self.commentq() or
self.lineq())
self.curws = oldws
if not x:
break
elif x[WHAT] == 'comment':
pass
else:
o.append(x)
if not start: self.curind = oldind
return o
class Stowage(storage):
def __str__(self): return self.get('_str')
#@@ edits in place
def __add__(self, other):
if isinstance(other, (unicode, str)):
self._str += other
return self
else:
raise TypeError, 'cannot add'
def __radd__(self, other):
if isinstance(other, (unicode, str)):
self._str = other + self._str
return self
else:
raise TypeError, 'cannot add'
class WTF(AssertionError): pass
class SecurityError(Exception):
"""The template seems to be trying to do something naughty."""
pass
Required = object()
class Template:
globals = {}
content_types = {
'.html' : 'text/html; charset=utf-8',
'.txt' : 'text/plain',
}
def __init__(self, text, filter=None, filename=""):
self.filter = filter
self.filename = filename
# universal newlines:
text = text.replace('\r\n', '\n').replace('\r', '\n').expandtabs()
if not text.endswith('\n'): text += '\n'
header, tree = TemplateParser(text, filename).go()
self.tree = tree
if header:
self.h_defwith(header)
else:
self.args, self.kwargs = (), {}
def __call__(self, *a, **kw):
d = self.globals.copy()
d.update(self._parseargs(a, kw))
f = Fill(self.tree, d=d)
if self.filter: f.filter = self.filter
import webapi as web
if 'headers' in web.ctx and self.filename:
content_type = self.find_content_type()
if content_type:
web.header('Content-Type', content_type, unique=True)
return f.go()
def find_content_type(self):
for ext, content_type in self.content_types.iteritems():
if self.filename.endswith(ext):
return content_type
def _parseargs(self, inargs, inkwargs):
# difference from Python:
# no error on setting a keyword arg twice
d = {}
for arg in self.args:
d[arg] = Required
for kw, val in self.kwargs:
d[kw] = val
for n, val in enumerate(inargs):
if n < len(self.args):
d[self.args[n]] = val
elif n < len(self.args)+len(self.kwargs):
kw = self.kwargs[n - len(self.args)][0]
d[kw] = val
for kw, val in inkwargs.iteritems():
d[kw] = val
unset = []
for k, v in d.iteritems():
if v is Required:
unset.append(k)
if unset:
raise TypeError, 'values for %s are required' % unset
return d
def h_defwith(self, header):
assert header[WHAT] == 'defwith'
f = Fill(self.tree, d={})
self.args = header[ARGS]
self.kwargs = []
for var, valexpr in header[KWARGS]:
self.kwargs.append((var, f.h(valexpr)))
def __repr__(self):
return "<Template: %s>" % self.filename
class Handle:
def __init__(self, parsetree, **kw):
self._funccache = {}
self.parsetree = parsetree
for (k, v) in kw.iteritems(): setattr(self, k, v)
def h(self, item):
return getattr(self, 'h_' + item[WHAT])(item)
class Fill(Handle):
builtins = global_globals
def filter(self, text):
if text is None: return ''
else: return utf8(text)
# often replaced with stuff like net.websafe
def h_literal(self, i):
item = i[THING]
if isinstance(item, (unicode, str)) and item[0] in ['"', "'"]:
item = item[1:-1]
elif isinstance(item, (float, int)):
pass
return item
def h_list(self, i):
x = i[THING]
out = []
for item in x:
out.append(self.h(item))
return out
def h_dict(self, i):
x = i[THING]
out = {}
for k, v in x.iteritems():
out[self.h(k)] = self.h(v)
return out
def h_paren(self, i):
item = i[THING]
if isinstance(item, list):
raise NotImplementedError, 'tuples'
return self.h(item)
def h_getattr(self, i):
thing, attr = i[THING], i[ATTR]
thing = self.h(thing)
if attr.startswith('_') or attr.startswith('func_') or attr.startswith('im_'):
raise SecurityError, 'tried to get ' + attr
try:
if thing in self.builtins:
raise SecurityError, 'tried to getattr on ' + repr(thing)
except TypeError:
pass # raised when testing an unhashable object
try:
return getattr(thing, attr)
except AttributeError:
if isinstance(thing, list) and attr == 'join':
return lambda s: s.join(thing)
else:
raise
def h_call(self, i):
call = self.h(i[THING])
args = [self.h(x) for x in i[ARGS]]
kw = dict([(x, self.h(y)) for (x, y) in i[KWARGS]])
return call(*args, **kw)
def h_getitem(self, i):
thing, item = i[THING], i[ITEM]
thing = self.h(thing)
item = self.h(item)
return thing[item]
def h_expr(self, i):
item = self.h(i[THING])
if i[NEGATE]:
item = not item
return item
def h_test(self, item):
ox, op, oy = item[X], item[OP], item[Y]
# for short-circuiting to work, we can't eval these here
e = self.h
if op == 'is':
return e(ox) is e(oy)
elif op == 'is not':
return e(ox) is not e(oy)
elif op == 'in':
return e(ox) in e(oy)
elif op == 'not in':
return e(ox) not in e(oy)
elif op == '==':
return e(ox) == e(oy)
elif op == '!=':
return e(ox) != e(oy)
elif op == '>':
return e(ox) > e(oy)
elif op == '<':
return e(ox) < e(oy)
elif op == '<=':
return e(ox) <= e(oy)
elif op == '>=':
return e(ox) >= e(oy)
elif op == 'and':
return e(ox) and e(oy)
elif op == 'or':
return e(ox) or e(oy)
elif op == '+':
return e(ox) + e(oy)
elif op == '-':
return e(ox) - e(oy)
elif op == '*':
return e(ox) * e(oy)
elif op == '/':
return e(ox) / e(oy)
elif op == '%':
return e(ox) % e(oy)
else:
raise WTF, 'op ' + op
def h_var(self, i):
v = i[NAME]
if v in self.d:
return self.d[v]
elif v in self.builtins:
return self.builtins[v]
elif v == 'self':
return self.output
else:
raise NameError, 'could not find %s (line %s)' % (repr(i[NAME]), i[LINENO])
def h_line(self, i):
out = []
for x in i[THING]:
#@@ what if x is unicode
if isinstance(x, str):
out.append(x)
elif x[WHAT] == 'itpl':
o = self.h(x[NAME])
if x[FILTER]:
o = self.filter(o)
else:
o = (o is not None and utf8(o)) or ""
out.append(o)
else:
raise WTF, x
return ''.join(out)
def h_varset(self, i):
self.output[i[NAME]] = ''.join(self.h_lines(i[BODY]))
return ''
def h_if(self, i):
expr = self.h(i[CLAUSE])
if expr:
do = i[BODY]
else:
for e in i[ELIF]:
expr = self.h(e[CLAUSE])
if expr:
do = e[BODY]
break
else:
do = i[ELSE]
return ''.join(self.h_lines(do))
def h_for(self, i):
out = []
assert i[IN][WHAT] == 'expr'
invar = self.h(i[IN])
forvar = i[NAME]
if invar:
for nv in invar:
if len(forvar) == 1:
fv = forvar[0]
assert fv[WHAT] == 'var'
self.d[fv[NAME]] = nv # same (lack of) scoping as Python
else:
for x, y in zip(forvar, nv):
assert x[WHAT] == 'var'
self.d[x[NAME]] = y
out.extend(self.h_lines(i[BODY]))
else:
if i[ELSE]:
out.extend(self.h_lines(i[ELSE]))
return ''.join(out)
def h_while(self, i):
out = []
expr = self.h(i[CLAUSE])
if not expr:
return ''.join(self.h_lines(i[ELSE]))
c = 0
while expr:
c += 1
if c >= MAX_ITERS:
raise RuntimeError, 'too many while-loop iterations (line %s)' % i[LINENO]
out.extend(self.h_lines(i[BODY]))
expr = self.h(i[CLAUSE])
return ''.join(out)
def h_assign(self, i):
self.d[i[NAME]] = self.h(i[EXPR])
return ''
def h_comment(self, i): pass
def h_lines(self, lines):
if lines is None: return []
return map(self.h, lines)
def go(self):
self.output = Stowage()
self.output._str = ''.join(map(self.h, self.parsetree))
if self.output.keys() == ['_str']:
self.output = self.output['_str']
return self.output
class render:
def __init__(self, loc='templates/', cache=True):
self.loc = loc
if cache:
self.cache = {}
else:
self.cache = False
def _do(self, name, filter=None):
if self.cache is False or name not in self.cache:
tmplpath = os.path.join(self.loc, name)
p = [f for f in glob.glob(tmplpath + '.*') if not f.endswith('~')] # skip backup files
if not p and os.path.isdir(tmplpath):
return render(tmplpath, cache=self.cache)
elif not p:
raise AttributeError, 'no template named ' + name
p = p[0]
c = Template(open(p).read(), filename=p)
if self.cache is not False: self.cache[name] = (p, c)
if self.cache is not False: p, c = self.cache[name]
if p.endswith('.html') or p.endswith('.xml'):
if not filter: c.filter = websafe
return c
def __getattr__(self, p):
return self._do(p)
def frender(fn, *a, **kw):
return Template(open(fn).read(), *a, **kw)
def test():
import sys
verbose = '-v' in sys.argv
def assertEqual(a, b):
if a == b:
if verbose:
sys.stderr.write('.')
sys.stderr.flush()
else:
assert a == b, "\nexpected: %s\ngot: %s" % (repr(b), repr(a))
from utils import storage, group
class t:
def __init__(self, text):
self.text = text
def __call__(self, *a, **kw):
return TestResult(self.text, Template(self.text)(*a, **kw))
class TestResult:
def __init__(self, source, value):
self.source = source
self.value = value
def __eq__(self, other):
if self.value == other:
if verbose:
sys.stderr.write('.')
else:
print >> sys.stderr, 'FAIL:', repr(self.source), 'expected', repr(other), ', got', repr(self.value)
sys.stderr.flush()
t('1')() == '1\n'
t('$def with ()\n1')() == '1\n'
t('$def with (a)\n$a')(1) == '1\n'
t('$def with (a=0)\n$a')(1) == '1\n'
t('$def with (a=0)\n$a')(a=1) == '1\n'
t('$if 1: 1')() == '1\n'
t('$if 1:\n 1')() == '1\n'
t('$if 0: 0\n$elif 1: 1')() == '1\n'
t('$if 0: 0\n$elif None: 0\n$else: 1')() == '1\n'
t('$if (0 < 1) and (1 < 2): 1')() == '1\n'
t('$for x in [1, 2, 3]: $x')() == '1\n2\n3\n'
t('$for x in []: 0\n$else: 1')() == '1\n'
t('$def with (a)\n$while a and a.pop(): 1')([1, 2, 3]) == '1\n1\n1\n'
t('$while 0: 0\n$else: 1')() == '1\n'
t('$ a = 1\n$a')() == '1\n'
t('$# 0')() == ''
t('$def with (d)\n$for k, v in d.iteritems(): $k')({1: 1}) == '1\n'
t('$def with (a)\n$(a)')(1) == '1\n'
t('$def with (a)\n$a')(1) == '1\n'
t('$def with (a)\n$a.b')(storage(b=1)) == '1\n'
t('$def with (a)\n$a[0]')([1]) == '1\n'
t('${0 or 1}')() == '1\n'
t('$ a = [1]\n$a[0]')() == '1\n'
t('$ a = {1: 1}\n$a.keys()[0]')() == '1\n'
t('$ a = []\n$if not a: 1')() == '1\n'
t('$ a = {}\n$if not a: 1')() == '1\n'
t('$ a = -1\n$a')() == '-1\n'
t('$ a = "1"\n$a')() == '1\n'
t('$if 1 is 1: 1')() == '1\n'
t('$if not 0: 1')() == '1\n'
t('$if 1:\n $if 1: 1')() == '1\n'
t('$ a = 1\n$a')() == '1\n'
t('$ a = 1.\n$a')() == '1.0\n'
t('$({1: 1}.keys()[0])')() == '1\n'
t('$for x in [1, 2, 3]:\n\t$x')() == ' 1\n 2\n 3\n'
t('$def with (a)\n$:a')(1) == '1\n'
t('$def with (a)\n$a')(u'\u203d') == '\xe2\x80\xbd\n'
t(u'$def with (f)\n$:f("x")')(lambda x: x) == 'x\n'
j = Template("$var foo: bar")()
assertEqual(str(j), '')
assertEqual(j.foo, 'bar\n')
if verbose: sys.stderr.write('\n')
if __name__ == "__main__":
test()

View File

@ -1,796 +0,0 @@
"""
General Utilities
(part of web.py)
"""
__all__ = [
"Storage", "storage", "storify",
"iters",
"rstrips", "lstrips", "strips", "utf8",
"TimeoutError", "timelimit",
"Memoize", "memoize",
"re_compile", "re_subm",
"group",
"IterBetter", "iterbetter",
"dictreverse", "dictfind", "dictfindall", "dictincr", "dictadd",
"listget", "intget", "datestr",
"numify", "denumify", "dateify",
"CaptureStdout", "capturestdout", "Profile", "profile",
"tryall",
"ThreadedDict",
"autoassign",
"to36",
"safemarkdown"
]
import re, sys, time, threading
try: import datetime
except ImportError: pass
class Storage(dict):
"""
A Storage object is like a dictionary except `obj.foo` can be used
in addition to `obj['foo']`.
>>> o = storage(a=1)
>>> o.a
1
>>> o['a']
1
>>> o.a = 2
>>> o['a']
2
>>> del o.a
>>> o.a
Traceback (most recent call last):
...
AttributeError: 'a'
"""
def __getattr__(self, key):
try:
return self[key]
except KeyError, k:
raise AttributeError, k
def __setattr__(self, key, value):
self[key] = value
def __delattr__(self, key):
try:
del self[key]
except KeyError, k:
raise AttributeError, k
def __repr__(self):
return '<Storage ' + dict.__repr__(self) + '>'
storage = Storage
def storify(mapping, *requireds, **defaults):
"""
Creates a `storage` object from dictionary `mapping`, raising `KeyError` if
d doesn't have all of the keys in `requireds` and using the default
values for keys found in `defaults`.
For example, `storify({'a':1, 'c':3}, b=2, c=0)` will return the equivalent of
`storage({'a':1, 'b':2, 'c':3})`.
If a `storify` value is a list (e.g. multiple values in a form submission),
`storify` returns the last element of the list, unless the key appears in
`defaults` as a list. Thus:
>>> storify({'a':[1, 2]}).a
2
>>> storify({'a':[1, 2]}, a=[]).a
[1, 2]
>>> storify({'a':1}, a=[]).a
[1]
>>> storify({}, a=[]).a
[]
Similarly, if the value has a `value` attribute, `storify will return _its_
value, unless the key appears in `defaults` as a dictionary.
>>> storify({'a':storage(value=1)}).a
1
>>> storify({'a':storage(value=1)}, a={}).a
<Storage {'value': 1}>
>>> storify({}, a={}).a
{}
"""
def getvalue(x):
if hasattr(x, 'value'):
return x.value
else:
return x
stor = Storage()
for key in requireds + tuple(mapping.keys()):
value = mapping[key]
if isinstance(value, list):
if isinstance(defaults.get(key), list):
value = [getvalue(x) for x in value]
else:
value = value[-1]
if not isinstance(defaults.get(key), dict):
value = getvalue(value)
if isinstance(defaults.get(key), list) and not isinstance(value, list):
value = [value]
setattr(stor, key, value)
for (key, value) in defaults.iteritems():
result = value
if hasattr(stor, key):
result = stor[key]
if value == () and not isinstance(result, tuple):
result = (result,)
setattr(stor, key, result)
return stor
iters = [list, tuple]
import __builtin__
if hasattr(__builtin__, 'set'):
iters.append(set)
try:
from sets import Set
iters.append(Set)
except ImportError:
pass
class _hack(tuple): pass
iters = _hack(iters)
iters.__doc__ = """
A list of iterable items (like lists, but not strings). Includes whichever
of lists, tuples, sets, and Sets are available in this version of Python.
"""
def _strips(direction, text, remove):
if direction == 'l':
if text.startswith(remove):
return text[len(remove):]
elif direction == 'r':
if text.endswith(remove):
return text[:-len(remove)]
else:
raise ValueError, "Direction needs to be r or l."
return text
def rstrips(text, remove):
"""
removes the string `remove` from the right of `text`
>>> rstrips("foobar", "bar")
'foo'
"""
return _strips('r', text, remove)
def lstrips(text, remove):
"""
removes the string `remove` from the left of `text`
>>> lstrips("foobar", "foo")
'bar'
"""
return _strips('l', text, remove)
def strips(text, remove):
"""removes the string `remove` from the both sides of `text`
>>> strips("foobarfoo", "foo")
'bar'
"""
return rstrips(lstrips(text, remove), remove)
def utf8(text):
"""Encodes text in utf-8.
>> utf8(u'\u1234') # doctest doesn't seem to like utf-8
'\xe1\x88\xb4'
>>> utf8('hello')
'hello'
>>> utf8(42)
'42'
"""
if isinstance(text, unicode):
return text.encode('utf-8')
elif isinstance(text, str):
return text
else:
return str(text)
class TimeoutError(Exception): pass
def timelimit(timeout):
"""
A decorator to limit a function to `timeout` seconds, raising `TimeoutError`
if it takes longer.
>>> import time
>>> def meaningoflife():
... time.sleep(.2)
... return 42
>>>
>>> timelimit(.1)(meaningoflife)()
Traceback (most recent call last):
...
TimeoutError: took too long
>>> timelimit(1)(meaningoflife)()
42
_Caveat:_ The function isn't stopped after `timeout` seconds but continues
executing in a separate thread. (There seems to be no way to kill a thread.)
inspired by <http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/473878>
"""
def _1(function):
def _2(*args, **kw):
class Dispatch(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.result = None
self.error = None
self.setDaemon(True)
self.start()
def run(self):
try:
self.result = function(*args, **kw)
except:
self.error = sys.exc_info()
c = Dispatch()
c.join(timeout)
if c.isAlive():
raise TimeoutError, 'took too long'
if c.error:
raise c.error[0], c.error[1]
return c.result
return _2
return _1
class Memoize:
"""
'Memoizes' a function, caching its return values for each input.
>>> import time
>>> def meaningoflife():
... time.sleep(.2)
... return 42
>>> fastlife = memoize(meaningoflife)
>>> meaningoflife()
42
>>> timelimit(.1)(meaningoflife)()
Traceback (most recent call last):
...
TimeoutError: took too long
>>> fastlife()
42
>>> timelimit(.1)(fastlife)()
42
"""
def __init__(self, func):
self.func = func
self.cache = {}
def __call__(self, *args, **keywords):
key = (args, tuple(keywords.items()))
if key not in self.cache:
self.cache[key] = self.func(*args, **keywords)
return self.cache[key]
memoize = Memoize
re_compile = memoize(re.compile) #@@ threadsafe?
re_compile.__doc__ = """
A memoized version of re.compile.
"""
class _re_subm_proxy:
def __init__(self):
self.match = None
def __call__(self, match):
self.match = match
return ''
def re_subm(pat, repl, string):
"""
Like re.sub, but returns the replacement _and_ the match object.
>>> t, m = re_subm('g(oo+)fball', r'f\\1lish', 'goooooofball')
>>> t
'foooooolish'
>>> m.groups()
('oooooo',)
"""
compiled_pat = re_compile(pat)
proxy = _re_subm_proxy()
compiled_pat.sub(proxy.__call__, string)
return compiled_pat.sub(repl, string), proxy.match
def group(seq, size):
"""
Returns an iterator over a series of lists of length size from iterable.
>>> list(group([1,2,3,4], 2))
[[1, 2], [3, 4]]
"""
if not hasattr(seq, 'next'):
seq = iter(seq)
while True:
yield [seq.next() for i in xrange(size)]
class IterBetter:
"""
Returns an object that can be used as an iterator
but can also be used via __getitem__ (although it
cannot go backwards -- that is, you cannot request
`iterbetter[0]` after requesting `iterbetter[1]`).
>>> import itertools
>>> c = iterbetter(itertools.count())
>>> c[1]
1
>>> c[5]
5
>>> c[3]
Traceback (most recent call last):
...
IndexError: already passed 3
"""
def __init__(self, iterator):
self.i, self.c = iterator, 0
def __iter__(self):
while 1:
yield self.i.next()
self.c += 1
def __getitem__(self, i):
#todo: slices
if i < self.c:
raise IndexError, "already passed "+str(i)
try:
while i > self.c:
self.i.next()
self.c += 1
# now self.c == i
self.c += 1
return self.i.next()
except StopIteration:
raise IndexError, str(i)
iterbetter = IterBetter
def dictreverse(mapping):
"""
>>> dictreverse({1: 2, 3: 4})
{2: 1, 4: 3}
"""
return dict([(value, key) for (key, value) in mapping.iteritems()])
def dictfind(dictionary, element):
"""
Returns a key whose value in `dictionary` is `element`
or, if none exists, None.
>>> d = {1:2, 3:4}
>>> dictfind(d, 4)
3
>>> dictfind(d, 5)
"""
for (key, value) in dictionary.iteritems():
if element is value:
return key
def dictfindall(dictionary, element):
"""
Returns the keys whose values in `dictionary` are `element`
or, if none exists, [].
>>> d = {1:4, 3:4}
>>> dictfindall(d, 4)
[1, 3]
>>> dictfindall(d, 5)
[]
"""
res = []
for (key, value) in dictionary.iteritems():
if element is value:
res.append(key)
return res
def dictincr(dictionary, element):
"""
Increments `element` in `dictionary`,
setting it to one if it doesn't exist.
>>> d = {1:2, 3:4}
>>> dictincr(d, 1)
3
>>> d[1]
3
>>> dictincr(d, 5)
1
>>> d[5]
1
"""
dictionary.setdefault(element, 0)
dictionary[element] += 1
return dictionary[element]
def dictadd(*dicts):
"""
Returns a dictionary consisting of the keys in the argument dictionaries.
If they share a key, the value from the last argument is used.
>>> dictadd({1: 0, 2: 0}, {2: 1, 3: 1})
{1: 0, 2: 1, 3: 1}
"""
result = {}
for dct in dicts:
result.update(dct)
return result
def listget(lst, ind, default=None):
"""
Returns `lst[ind]` if it exists, `default` otherwise.
>>> listget(['a'], 0)
'a'
>>> listget(['a'], 1)
>>> listget(['a'], 1, 'b')
'b'
"""
if len(lst)-1 < ind:
return default
return lst[ind]
def intget(integer, default=None):
"""
Returns `integer` as an int or `default` if it can't.
>>> intget('3')
3
>>> intget('3a')
>>> intget('3a', 0)
0
"""
try:
return int(integer)
except (TypeError, ValueError):
return default
def datestr(then, now=None):
"""
Converts a (UTC) datetime object to a nice string representation.
>>> from datetime import datetime, timedelta
>>> d = datetime(1970, 5, 1)
>>> datestr(d, now=d)
'0 microseconds ago'
>>> for t, v in {
... timedelta(microseconds=1): '1 microsecond ago',
... timedelta(microseconds=2): '2 microseconds ago',
... -timedelta(microseconds=1): '1 microsecond from now',
... -timedelta(microseconds=2): '2 microseconds from now',
... timedelta(microseconds=2000): '2 milliseconds ago',
... timedelta(seconds=2): '2 seconds ago',
... timedelta(seconds=2*60): '2 minutes ago',
... timedelta(seconds=2*60*60): '2 hours ago',
... timedelta(days=2): '2 days ago',
... }.iteritems():
... assert datestr(d, now=d+t) == v
>>> datestr(datetime(1970, 1, 1), now=d)
'January 1'
>>> datestr(datetime(1969, 1, 1), now=d)
'January 1, 1969'
>>> datestr(datetime(1970, 6, 1), now=d)
'June 1, 1970'
"""
def agohence(n, what, divisor=None):
if divisor: n = n // divisor
out = str(abs(n)) + ' ' + what # '2 day'
if abs(n) != 1: out += 's' # '2 days'
out += ' ' # '2 days '
if n < 0:
out += 'from now'
else:
out += 'ago'
return out # '2 days ago'
oneday = 24 * 60 * 60
if not now: now = datetime.datetime.utcnow()
if type(now).__name__ == "DateTime":
now = datetime.datetime.fromtimestamp(now)
if type(then).__name__ == "DateTime":
then = datetime.datetime.fromtimestamp(then)
delta = now - then
deltaseconds = int(delta.days * oneday + delta.seconds + delta.microseconds * 1e-06)
deltadays = abs(deltaseconds) // oneday
if deltaseconds < 0: deltadays *= -1 # fix for oddity of floor
if deltadays:
if abs(deltadays) < 4:
return agohence(deltadays, 'day')
out = then.strftime('%B %e') # e.g. 'June 13'
if then.year != now.year or deltadays < 0:
out += ', %s' % then.year
return out
if int(deltaseconds):
if abs(deltaseconds) > (60 * 60):
return agohence(deltaseconds, 'hour', 60 * 60)
elif abs(deltaseconds) > 60:
return agohence(deltaseconds, 'minute', 60)
else:
return agohence(deltaseconds, 'second')
deltamicroseconds = delta.microseconds
if delta.days: deltamicroseconds = int(delta.microseconds - 1e6) # datetime oddity
if abs(deltamicroseconds) > 1000:
return agohence(deltamicroseconds, 'millisecond', 1000)
return agohence(deltamicroseconds, 'microsecond')
def numify(string):
"""
Removes all non-digit characters from `string`.
>>> numify('800-555-1212')
'8005551212'
>>> numify('800.555.1212')
'8005551212'
"""
return ''.join([c for c in str(string) if c.isdigit()])
def denumify(string, pattern):
"""
Formats `string` according to `pattern`, where the letter X gets replaced
by characters from `string`.
>>> denumify("8005551212", "(XXX) XXX-XXXX")
'(800) 555-1212'
"""
out = []
for c in pattern:
if c == "X":
out.append(string[0])
string = string[1:]
else:
out.append(c)
return ''.join(out)
def dateify(datestring):
"""
Formats a numified `datestring` properly.
"""
return denumify(datestring, "XXXX-XX-XX XX:XX:XX")
class CaptureStdout:
"""
Captures everything `func` prints to stdout and returns it instead.
>>> def idiot():
... print "foo"
>>> capturestdout(idiot)()
'foo\\n'
**WARNING:** Not threadsafe!
"""
def __init__(self, func):
self.func = func
def __call__(self, *args, **keywords):
from cStringIO import StringIO
# Not threadsafe!
out = StringIO()
oldstdout = sys.stdout
sys.stdout = out
try:
self.func(*args, **keywords)
finally:
sys.stdout = oldstdout
return out.getvalue()
capturestdout = CaptureStdout
class Profile:
"""
Profiles `func` and returns a tuple containing its output
and a string with human-readable profiling information.
>>> import time
>>> out, inf = profile(time.sleep)(.001)
>>> out
>>> inf[:10].strip()
'took 0.0'
"""
def __init__(self, func):
self.func = func
def __call__(self, *args): ##, **kw): kw unused
import hotshot, hotshot.stats, tempfile ##, time already imported
temp = tempfile.NamedTemporaryFile()
prof = hotshot.Profile(temp.name)
stime = time.time()
result = prof.runcall(self.func, *args)
stime = time.time() - stime
prof.close()
stats = hotshot.stats.load(temp.name)
stats.strip_dirs()
stats.sort_stats('time', 'calls')
x = '\n\ntook '+ str(stime) + ' seconds\n'
x += capturestdout(stats.print_stats)(40)
x += capturestdout(stats.print_callers)()
return result, x
profile = Profile
import traceback
# hack for compatibility with Python 2.3:
if not hasattr(traceback, 'format_exc'):
from cStringIO import StringIO
def format_exc(limit=None):
strbuf = StringIO()
traceback.print_exc(limit, strbuf)
return strbuf.getvalue()
traceback.format_exc = format_exc
def tryall(context, prefix=None):
"""
Tries a series of functions and prints their results.
`context` is a dictionary mapping names to values;
the value will only be tried if it's callable.
>>> tryall(dict(j=lambda: True))
j: True
----------------------------------------
results:
True: 1
For example, you might have a file `test/stuff.py`
with a series of functions testing various things in it.
At the bottom, have a line:
if __name__ == "__main__": tryall(globals())
Then you can run `python test/stuff.py` and get the results of
all the tests.
"""
context = context.copy() # vars() would update
results = {}
for (key, value) in context.iteritems():
if not hasattr(value, '__call__'):
continue
if prefix and not key.startswith(prefix):
continue
print key + ':',
try:
r = value()
dictincr(results, r)
print r
except:
print 'ERROR'
dictincr(results, 'ERROR')
print ' ' + '\n '.join(traceback.format_exc().split('\n'))
print '-'*40
print 'results:'
for (key, value) in results.iteritems():
print ' '*2, str(key)+':', value
class ThreadedDict:
"""
Takes a dictionary that maps threads to objects.
When a thread tries to get or set an attribute or item
of the threadeddict, it passes it on to the object
for that thread in dictionary.
"""
def __init__(self, dictionary):
self.__dict__['_ThreadedDict__d'] = dictionary
def __getattr__(self, attr):
return getattr(self.__d[threading.currentThread()], attr)
def __getitem__(self, item):
return self.__d[threading.currentThread()][item]
def __setattr__(self, attr, value):
if attr == '__doc__':
self.__dict__[attr] = value
else:
return setattr(self.__d[threading.currentThread()], attr, value)
def __delattr__(self, item):
try:
del self.__d[threading.currentThread()][item]
except KeyError, k:
raise AttributeError, k
def __delitem__(self, item):
del self.__d[threading.currentThread()][item]
def __setitem__(self, item, value):
self.__d[threading.currentThread()][item] = value
def __hash__(self):
return hash(self.__d[threading.currentThread()])
threadeddict = ThreadedDict
def autoassign(self, locals):
"""
Automatically assigns local variables to `self`.
>>> self = storage()
>>> autoassign(self, dict(a=1, b=2))
>>> self
<Storage {'a': 1, 'b': 2}>
Generally used in `__init__` methods, as in:
def __init__(self, foo, bar, baz=1): autoassign(self, locals())
"""
for (key, value) in locals.iteritems():
if key == 'self':
continue
setattr(self, key, value)
def to36(q):
"""
Converts an integer to base 36 (a useful scheme for human-sayable IDs).
>>> to36(35)
'z'
>>> to36(119292)
'2k1o'
>>> int(to36(939387374), 36)
939387374
>>> to36(0)
'0'
>>> to36(-393)
Traceback (most recent call last):
...
ValueError: must supply a positive integer
"""
if q < 0: raise ValueError, "must supply a positive integer"
letters = "0123456789abcdefghijklmnopqrstuvwxyz"
converted = []
while q != 0:
q, r = divmod(q, 36)
converted.insert(0, letters[r])
return "".join(converted) or '0'
r_url = re_compile('(?<!\()(http://(\S+))')
def safemarkdown(text):
"""
Converts text to HTML following the rules of Markdown, but blocking any
outside HTML input, so that only the things supported by Markdown
can be used. Also converts raw URLs to links.
(requires [markdown.py](http://webpy.org/markdown.py))
"""
from markdown import markdown
if text:
text = text.replace('<', '&lt;')
# TODO: automatically get page title?
text = r_url.sub(r'<\1>', text)
text = markdown(text)
return text
if __name__ == "__main__":
import doctest
doctest.testmod()

View File

@ -1,369 +0,0 @@
"""
Web API (wrapper around WSGI)
(from web.py)
"""
__all__ = [
"config",
"badrequest", "notfound", "gone", "internalerror",
"header", "output", "flush", "debug",
"input", "data",
"setcookie", "cookies",
"ctx",
"loadhooks", "load", "unloadhooks", "unload", "_loadhooks",
"wsgifunc"
]
import sys, os, cgi, threading, Cookie, pprint, traceback
try: import itertools
except ImportError: pass
from utils import storage, storify, threadeddict, dictadd, intget, lstrips, utf8
config = storage()
config.__doc__ = """
A configuration object for various aspects of web.py.
`db_parameters`
: A dictionary containing the parameters to be passed to `connect`
when `load()` is called.
`db_printing`
: Set to `True` if you would like SQL queries and timings to be
printed to the debug output.
"""
def badrequest():
"""Return a `400 Bad Request` error."""
ctx.status = '400 Bad Request'
header('Content-Type', 'text/html')
return output('bad request')
def notfound():
"""Returns a `404 Not Found` error."""
ctx.status = '404 Not Found'
header('Content-Type', 'text/html')
return output('not found')
def gone():
"""Returns a `410 Gone` error."""
ctx.status = '410 Gone'
header('Content-Type', 'text/html')
return output("gone")
def internalerror():
"""Returns a `500 Internal Server` error."""
ctx.status = "500 Internal Server Error"
ctx.headers = [('Content-Type', 'text/html')]
ctx.output = "internal server error"
def header(hdr, value, unique=False):
"""
Adds the header `hdr: value` with the response.
If `unique` is True and a header with that name already exists,
it doesn't add a new one.
"""
hdr, value = utf8(hdr), utf8(value)
# protection against HTTP response splitting attack
if '\n' in hdr or '\r' in hdr or '\n' in value or '\r' in value:
raise ValueError, 'invalid characters in header'
if unique is True:
for h, v in ctx.headers:
if h.lower() == hdr.lower(): return
ctx.headers.append((hdr, value))
def output(string_):
"""Appends `string_` to the response."""
if isinstance(string_, unicode): string_ = string_.encode('utf8')
if ctx.get('flush'):
ctx._write(string_)
else:
ctx.output += str(string_)
def flush():
ctx.flush = True
return flush
def input(*requireds, **defaults):
"""
Returns a `storage` object with the GET and POST arguments.
See `storify` for how `requireds` and `defaults` work.
"""
from cStringIO import StringIO
def dictify(fs): return dict([(k, fs[k]) for k in fs.keys()])
_method = defaults.pop('_method', 'both')
e = ctx.env.copy()
a = b = {}
if _method.lower() in ['both', 'post']:
if e['REQUEST_METHOD'] == 'POST':
a = cgi.FieldStorage(fp = StringIO(data()), environ=e,
keep_blank_values=1)
a = dictify(a)
if _method.lower() in ['both', 'get']:
e['REQUEST_METHOD'] = 'GET'
b = dictify(cgi.FieldStorage(environ=e, keep_blank_values=1))
out = dictadd(b, a)
try:
return storify(out, *requireds, **defaults)
except KeyError:
badrequest()
raise StopIteration
def data():
"""Returns the data sent with the request."""
if 'data' not in ctx:
cl = intget(ctx.env.get('CONTENT_LENGTH'), 0)
ctx.data = ctx.env['wsgi.input'].read(cl)
return ctx.data
def setcookie(name, value, expires="", domain=None):
"""Sets a cookie."""
if expires < 0:
expires = -1000000000
kargs = {'expires': expires, 'path':'/'}
if domain:
kargs['domain'] = domain
# @@ should we limit cookies to a different path?
cookie = Cookie.SimpleCookie()
cookie[name] = value
for key, val in kargs.iteritems():
cookie[name][key] = val
header('Set-Cookie', cookie.items()[0][1].OutputString())
def cookies(*requireds, **defaults):
"""
Returns a `storage` object with all the cookies in it.
See `storify` for how `requireds` and `defaults` work.
"""
cookie = Cookie.SimpleCookie()
cookie.load(ctx.env.get('HTTP_COOKIE', ''))
try:
return storify(cookie, *requireds, **defaults)
except KeyError:
badrequest()
raise StopIteration
def debug(*args):
"""
Prints a prettyprinted version of `args` to stderr.
"""
try:
out = ctx.environ['wsgi.errors']
except:
out = sys.stderr
for arg in args:
print >> out, pprint.pformat(arg)
return ''
def _debugwrite(x):
try:
out = ctx.environ['wsgi.errors']
except:
out = sys.stderr
out.write(x)
debug.write = _debugwrite
class _outputter:
"""Wraps `sys.stdout` so that print statements go into the response."""
def __init__(self, file): self.file = file
def write(self, string_):
if hasattr(ctx, 'output'):
return output(string_)
else:
self.file.write(string_)
def __getattr__(self, attr): return getattr(self.file, attr)
def __getitem__(self, item): return self.file[item]
def _capturedstdout():
sysstd = sys.stdout
while hasattr(sysstd, 'file'):
if isinstance(sys.stdout, _outputter): return True
sysstd = sysstd.file
if isinstance(sys.stdout, _outputter): return True
return False
if not _capturedstdout():
sys.stdout = _outputter(sys.stdout)
_context = {threading.currentThread(): storage()}
ctx = context = threadeddict(_context)
ctx.__doc__ = """
A `storage` object containing various information about the request:
`environ` (aka `env`)
: A dictionary containing the standard WSGI environment variables.
`host`
: The domain (`Host` header) requested by the user.
`home`
: The base path for the application.
`ip`
: The IP address of the requester.
`method`
: The HTTP method used.
`path`
: The path request.
`query`
: If there are no query arguments, the empty string. Otherwise, a `?` followed
by the query string.
`fullpath`
: The full path requested, including query arguments (`== path + query`).
### Response Data
`status` (default: "200 OK")
: The status code to be used in the response.
`headers`
: A list of 2-tuples to be used in the response.
`output`
: A string to be used as the response.
"""
loadhooks = {}
_loadhooks = {}
def load():
"""
Loads a new context for the thread.
You can ask for a function to be run at loadtime by
adding it to the dictionary `loadhooks`.
"""
_context[threading.currentThread()] = storage()
ctx.status = '200 OK'
ctx.headers = []
if config.get('db_parameters'):
import db
db.connect(**config.db_parameters)
for x in loadhooks.values(): x()
def _load(env):
load()
ctx.output = ''
ctx.environ = ctx.env = env
ctx.host = env.get('HTTP_HOST')
ctx.homedomain = 'http://' + env.get('HTTP_HOST', '[unknown]')
ctx.homepath = os.environ.get('REAL_SCRIPT_NAME', env.get('SCRIPT_NAME', ''))
ctx.home = ctx.homedomain + ctx.homepath
ctx.ip = env.get('REMOTE_ADDR')
ctx.method = env.get('REQUEST_METHOD')
ctx.path = env.get('PATH_INFO')
# http://trac.lighttpd.net/trac/ticket/406 requires:
if env.get('SERVER_SOFTWARE', '').startswith('lighttpd/'):
ctx.path = lstrips(env.get('REQUEST_URI').split('?')[0],
os.environ.get('REAL_SCRIPT_NAME', env.get('SCRIPT_NAME', '')))
if env.get('QUERY_STRING'):
ctx.query = '?' + env.get('QUERY_STRING', '')
else:
ctx.query = ''
ctx.fullpath = ctx.path + ctx.query
for x in _loadhooks.values(): x()
unloadhooks = {}
def unload():
"""
Unloads the context for the thread.
You can ask for a function to be run at loadtime by
adding it ot the dictionary `unloadhooks`.
"""
for x in unloadhooks.values(): x()
# ensures db cursors and such are GCed promptly
del _context[threading.currentThread()]
def _unload():
unload()
def wsgifunc(func, *middleware):
"""Returns a WSGI-compatible function from a webpy-function."""
middleware = list(middleware)
def wsgifunc(env, start_resp):
_load(env)
try:
result = func()
except StopIteration:
result = None
except:
print >> debug, traceback.format_exc()
result = internalerror()
is_generator = result and hasattr(result, 'next')
if is_generator:
# wsgi requires the headers first
# so we need to do an iteration
# and save the result for later
try:
firstchunk = result.next()
except StopIteration:
firstchunk = ''
status, headers, output = ctx.status, ctx.headers, ctx.output
ctx._write = start_resp(status, headers)
# and now, the fun:
def cleanup():
# we insert this little generator
# at the end of our itertools.chain
# so that it unloads the request
# when everything else is done
yield '' # force it to be a generator
_unload()
# result is the output of calling the webpy function
# it could be a generator...
if is_generator:
if firstchunk is flush:
# oh, it's just our special flush mode
# ctx._write is set up, so just continue execution
try:
result.next()
except StopIteration:
pass
_unload()
return []
else:
return itertools.chain([firstchunk], result, cleanup())
# ... but it's usually just None
#
# output is the stuff in ctx.output
# it's usually a string...
if isinstance(output, str): #@@ other stringlikes?
_unload()
return [output]
# it could be a generator...
elif hasattr(output, 'next'):
return itertools.chain(output, cleanup())
else:
_unload()
raise Exception, "Invalid ctx.output"
for mw_func in middleware:
wsgifunc = mw_func(wsgifunc)
return wsgifunc

View File

@ -1,54 +0,0 @@
"""
WSGI Utilities
(from web.py)
"""
import os, sys
import http
import webapi as web
from utils import listget
from net import validaddr, validip
import httpserver
def runfcgi(func, addr=('localhost', 8000)):
"""Runs a WSGI function as a FastCGI server."""
import flup.server.fcgi as flups
return flups.WSGIServer(func, multiplexed=True, bindAddress=addr).run()
def runscgi(func, addr=('localhost', 4000)):
"""Runs a WSGI function as an SCGI server."""
import flup.server.scgi as flups
return flups.WSGIServer(func, bindAddress=addr).run()
def runwsgi(func):
"""
Runs a WSGI-compatible `func` using FCGI, SCGI, or a simple web server,
as appropriate based on context and `sys.argv`.
"""
if os.environ.has_key('SERVER_SOFTWARE'): # cgi
os.environ['FCGI_FORCE_CGI'] = 'Y'
if (os.environ.has_key('PHP_FCGI_CHILDREN') #lighttpd fastcgi
or os.environ.has_key('SERVER_SOFTWARE')):
return runfcgi(func, None)
if 'fcgi' in sys.argv or 'fastcgi' in sys.argv:
args = sys.argv[1:]
if 'fastcgi' in args: args.remove('fastcgi')
elif 'fcgi' in args: args.remove('fcgi')
if args:
return runfcgi(func, validaddr(args[0]))
else:
return runfcgi(func, None)
if 'scgi' in sys.argv:
args = sys.argv[1:]
args.remove('scgi')
if args:
return runscgi(func, validaddr(args[0]))
else:
return runscgi(func)
return httpserver.runsimple(func, validip(listget(sys.argv, 1, '')))

File diff suppressed because it is too large Load Diff

View File

@ -47,7 +47,7 @@ $:render.header(_('Torrent list'))
<div id="tableContainer" class="tableContainer"> <div id="tableContainer" class="tableContainer">
<table class="torrent_list" border=0 cellspacing=0 cellpadding=2 id="torrent_list"> <table class="torrent_list" border=1 id="torrent_list">
<thead class="fixedHeader"> <thead class="fixedHeader">
<tr> <tr>
$:(sort_head('calc_state_str', 'S')) $:(sort_head('calc_state_str', 'S'))
@ -67,10 +67,9 @@ $:render.header(_('Torrent list'))
</tr> </tr>
</thead> </thead>
<tbody class="scrollContent"> <tbody class="scrollContent">
$altrow(True)
$#4-space indentation is mandatory for for-loops in templetor! $#4-space indentation is mandatory for for-loops in templetor!
$for torrent in torrent_list: $for torrent in torrent_list:
<tr class="$altrow()" onclick="on_click_row(event, '$torrent.id')" id="torrent_$torrent.id"> <tr class="torrent_table" onclick="on_click_row(event, '$torrent.id')" id="torrent_$torrent.id">
<td> <td>
<form action="/torrent/$torrent.action/$torrent.id" method="POST" <form action="/torrent/$torrent.action/$torrent.id" method="POST"
class="pause_resume"> class="pause_resume">

View File

@ -78,19 +78,6 @@ tr.torrent_table:hover {
background-color:#68a; background-color:#68a;
} }
tr.altrow0:hover {
background-color:#68a;
}
tr.altrow1:hover {
background-color:#68a;
}
tr.altrow1{
background-color: #37506f;
}
tr.torrent_table_selected { tr.torrent_table_selected {
background-color:#900; background-color:#900;
} }
@ -98,9 +85,6 @@ tr.torrent_table_selected {
th.torrent_table:hover { th.torrent_table:hover {
background-color:#68a; background-color:#68a;
} }
th.torrent_table {
background-color: #37506f;
}
img.button { img.button {
margin-bottom:0px; margin-bottom:0px;

View File

@ -45,9 +45,6 @@ function on_click_row_js(e, id) {
function select_row(id){ function select_row(id){
var row = get_row(id); var row = get_row(id);
if (row) { if (row) {
if (!(row.default_class_name)) {
row.default_class_name = row.className;
}
row.className = 'torrent_table_selected'; row.className = 'torrent_table_selected';
state.selected_rows[state.selected_rows.length] = id; state.selected_rows[state.selected_rows.length] = id;
setCookie('selected_rows',state.selected_rows); setCookie('selected_rows',state.selected_rows);
@ -57,7 +54,7 @@ function select_row(id){
function deselect_row(id){ function deselect_row(id){
var row = get_row(id); var row = get_row(id);
if (row) { if (row) {
row.className = row.default_class_name row.className = 'torrent_table'
/*remove from state.selected_rows*/ /*remove from state.selected_rows*/
var idx = state.selected_rows.indexOf(id); var idx = state.selected_rows.indexOf(id);
state.selected_rows.splice(idx,1); state.selected_rows.splice(idx,1);

View File

@ -31,8 +31,8 @@
""" """
initializes config,render and proxy. initializes config,render and proxy.
All hacks go here, so this is a really ugly source-file.. contains all hacks to support running in process0.5 ,run inside-gtk0.5 and
Support running in process0.5 ,run inside-gtk0.5 and run in process0.6 run in process0.6
""" """
import os import os
@ -41,7 +41,7 @@ import random
import pickle import pickle
import sys import sys
import base64 import base64
from lib.webpy022 import template from webpy022 import template
random.seed() random.seed()
webui_path = os.path.dirname(__file__) webui_path = os.path.dirname(__file__)
@ -163,8 +163,7 @@ def init_gtk_05():
def init_logger(): def init_logger():
#only for 0.5.. #only for 0.5..
import logging import logging
logging.basicConfig(level=logging.DEBUG, logging.basicConfig(level=logging.DEBUG,format="[%(levelname)-8s] %(module)s:%(lineno)d %(message)s")
format="[%(levelname)s] %(message)s")
globals()['log'] = logging globals()['log'] = logging

View File

@ -40,13 +40,13 @@ Todo's before stable:
-clear finished? -clear finished?
-torrent files. -torrent files.
""" """
import lib.webpy022 as web import webpy022 as web
from lib.webpy022.webapi import cookies, setcookie as w_setcookie from webpy022.webapi import cookies, setcookie as w_setcookie
from lib.webpy022.http import seeother, url from webpy022.http import seeother, url
from lib.webpy022 import template,changequery as self_url from webpy022 import template,changequery as self_url
from lib.webpy022.utils import Storage from webpy022.utils import Storage
from lib.static_handler import static_handler from static_handler import static_handler
from deluge.common import fsize,fspeed from deluge.common import fsize,fspeed
@ -144,8 +144,7 @@ def check_session(func):
return func if session is valid, else redirect to login page. return func if session is valid, else redirect to login page.
""" """
def deco(self, name = None): def deco(self, name = None):
log.debug('%s.%s(name=%s)' % (self.__class__.__name__, func.__name__, log.debug('%s.%s(name=%s)' % (self.__class__.__name__,func.__name__,name))
name))
vars = web.input(redir_after_login = None) vars = web.input(redir_after_login = None)
ck = cookies() ck = cookies()
if ck.has_key("session_id") and ck["session_id"] in ws.SESSIONS: if ck.has_key("session_id") and ck["session_id"] in ws.SESSIONS:
@ -283,7 +282,6 @@ def filter_torrent_state(torrent_list,filter_name):
,'queued':lambda t: (t.paused and not t.user_paused) ,'queued':lambda t: (t.paused and not t.user_paused)
,'paused':lambda t: (t.user_paused) ,'paused':lambda t: (t.user_paused)
,'seeding':lambda t:(t.is_seed and not t.paused ) ,'seeding':lambda t:(t.is_seed and not t.paused )
,'active':lambda t: (t.download_rate > 0 or t.upload_rate > 0)
} }
filter_func = filters[filter_name] filter_func = filters[filter_name]
return [t for t in torrent_list if filter_func(t)] return [t for t in torrent_list if filter_func(t)]
@ -302,8 +300,7 @@ def category_tabs(torrent_list):
(_('Downloading'),'downloading') , (_('Downloading'),'downloading') ,
(_('Queued'),'queued') , (_('Queued'),'queued') ,
(_('Paused'),'paused') , (_('Paused'),'paused') ,
(_('Seeding'),'seeding'), (_('Seeding'),'seeding')
(_('Active'),'active')
]: ]:
title += ' (%s)' % ( title += ' (%s)' % (
len(filter_torrent_state(torrent_list, filter_name)), ) len(filter_torrent_state(torrent_list, filter_name)), )
@ -352,17 +349,6 @@ def template_part_stats():
def get_config(var): def get_config(var):
return ws.config.get(var) return ws.config.get(var)
irow = 0
def altrow(reset = False):
global irow
if reset:
irow = 1
return
irow +=1
irow = irow % 2
return "altrow%s" % irow
template.Template.globals.update({ template.Template.globals.update({
'sort_head': template_sort_head, 'sort_head': template_sort_head,
'part_stats':template_part_stats, 'part_stats':template_part_stats,
@ -371,7 +357,6 @@ template.Template.globals.update({
'_': _ , #gettext/translations '_': _ , #gettext/translations
'str': str, #because % in templetor is broken. 'str': str, #because % in templetor is broken.
'sorted': sorted, 'sorted': sorted,
'altrow':altrow,
'get_config': get_config, 'get_config': get_config,
'self_url': self_url, 'self_url': self_url,
'fspeed': common.fspeed, 'fspeed': common.fspeed,
@ -385,9 +370,9 @@ template.Template.globals.update({
#/template-defs #/template-defs
def create_webserver(urls, methods): def create_webserver(urls, methods):
from lib.webpy022.request import webpyfunc from webpy022.request import webpyfunc
from lib.webpy022 import webapi from webpy022 import webapi
from lib.gtk_cherrypy_wsgiserver import CherryPyWSGIServer from gtk_cherrypy_wsgiserver import CherryPyWSGIServer
import os import os
func = webapi.wsgifunc(webpyfunc(urls, methods, False)) func = webapi.wsgifunc(webpyfunc(urls, methods, False))