mirror of
https://github.com/codex-storage/deluge.git
synced 2025-01-17 23:01:01 +00:00
271 lines
7.7 KiB
Python
271 lines
7.7 KiB
Python
"""
|
|
HTTP Utilities
|
|
(from web.py)
|
|
"""
|
|
|
|
__all__ = [
|
|
"expires", "lastmodified",
|
|
"prefixurl", "modified",
|
|
"redirect", "found", "seeother", "tempredirect",
|
|
"write",
|
|
"changequery", "url",
|
|
"background", "backgrounder",
|
|
"Reloader", "reloader", "profiler",
|
|
]
|
|
|
|
import sys, os, threading, urllib, urlparse
|
|
try: import datetime
|
|
except ImportError: pass
|
|
import net, utils, webapi as web
|
|
|
|
def prefixurl(base=''):
|
|
"""
|
|
Sorry, this function is really difficult to explain.
|
|
Maybe some other time.
|
|
"""
|
|
url = web.ctx.path.lstrip('/')
|
|
for i in xrange(url.count('/')):
|
|
base += '../'
|
|
if not base:
|
|
base = './'
|
|
return base
|
|
|
|
def expires(delta):
|
|
"""
|
|
Outputs an `Expires` header for `delta` from now.
|
|
`delta` is a `timedelta` object or a number of seconds.
|
|
"""
|
|
if isinstance(delta, (int, long)):
|
|
delta = datetime.timedelta(seconds=delta)
|
|
date_obj = datetime.datetime.utcnow() + delta
|
|
web.header('Expires', net.httpdate(date_obj))
|
|
|
|
def lastmodified(date_obj):
|
|
"""Outputs a `Last-Modified` header for `datetime`."""
|
|
web.header('Last-Modified', net.httpdate(date_obj))
|
|
|
|
def modified(date=None, etag=None):
|
|
n = web.ctx.env.get('HTTP_IF_NONE_MATCH')
|
|
m = net.parsehttpdate(web.ctx.env.get('HTTP_IF_MODIFIED_SINCE', '').split(';')[0])
|
|
validate = False
|
|
if etag:
|
|
raise NotImplementedError, "no etag support yet"
|
|
# should really be a warning
|
|
if date and m:
|
|
# we subtract a second because
|
|
# HTTP dates don't have sub-second precision
|
|
if date-datetime.timedelta(seconds=1) <= m:
|
|
validate = True
|
|
|
|
if validate: web.ctx.status = '304 Not Modified'
|
|
return not validate
|
|
|
|
"""
|
|
By default, these all return simple error messages that send very short messages
|
|
(like "bad request") to the user. They can and should be overridden
|
|
to return nicer ones.
|
|
"""
|
|
def redirect(url, status='301 Moved Permanently'):
|
|
"""
|
|
Returns a `status` redirect to the new URL.
|
|
`url` is joined with the base URL so that things like
|
|
`redirect("about") will work properly.
|
|
"""
|
|
newloc = urlparse.urljoin(web.ctx.path, url)
|
|
|
|
# if newloc is relative then make it absolute
|
|
#mvoncken:Disabled because we don't want to redirect to localhost!
|
|
#if newloc.startswith('/'):
|
|
# newloc = web.ctx.home + newloc
|
|
|
|
web.ctx.status = status
|
|
web.ctx.output = ''
|
|
web.header('Content-Type', 'text/html')
|
|
web.header('Location', newloc)
|
|
# seems to add a three-second delay for some reason:
|
|
# web.output('<a href="'+ newloc + '">moved permanently</a>')
|
|
|
|
def found(url):
|
|
"""A `302 Found` redirect."""
|
|
return redirect(url, '302 Found')
|
|
|
|
def seeother(url):
|
|
"""A `303 See Other` redirect."""
|
|
return redirect(url, '303 See Other')
|
|
|
|
def tempredirect(url):
|
|
"""A `307 Temporary Redirect` redirect."""
|
|
return redirect(url, '307 Temporary Redirect')
|
|
|
|
def write(cgi_response):
|
|
"""
|
|
Converts a standard CGI-style string response into `header` and
|
|
`output` calls.
|
|
"""
|
|
cgi_response = str(cgi_response)
|
|
cgi_response.replace('\r\n', '\n')
|
|
head, body = cgi_response.split('\n\n', 1)
|
|
lines = head.split('\n')
|
|
|
|
for line in lines:
|
|
if line.isspace():
|
|
continue
|
|
hdr, value = line.split(":", 1)
|
|
value = value.strip()
|
|
if hdr.lower() == "status":
|
|
web.ctx.status = value
|
|
else:
|
|
web.header(hdr, value)
|
|
|
|
web.output(body)
|
|
|
|
def urlencode(query):
|
|
"""
|
|
Same as urllib.urlencode, but supports unicode strings.
|
|
|
|
>>> urlencode({'text':'foo bar'})
|
|
'text=foo+bar'
|
|
"""
|
|
query = dict([(k, utils.utf8(v)) for k, v in query.items()])
|
|
return urllib.urlencode(query)
|
|
|
|
def changequery(query=None, **kw):
|
|
"""
|
|
Imagine you're at `/foo?a=1&b=2`. Then `changequery(a=3)` will return
|
|
`/foo?a=3&b=2` -- the same URL but with the arguments you requested
|
|
changed.
|
|
"""
|
|
if query is None:
|
|
query = web.input(_method='get')
|
|
for k, v in kw.iteritems():
|
|
if v is None:
|
|
query.pop(k, None)
|
|
else:
|
|
query[k] = v
|
|
out = web.ctx.path
|
|
if query:
|
|
out += '?' + urlencode(query)
|
|
return out
|
|
|
|
def url(path=None, **kw):
|
|
"""
|
|
Makes url by concatinating web.ctx.homepath and path and the
|
|
query string created using the arguments.
|
|
"""
|
|
if path is None:
|
|
path = web.ctx.path
|
|
if path.startswith("/"):
|
|
out = web.ctx.homepath + path
|
|
else:
|
|
out = path
|
|
|
|
if kw:
|
|
out += '?' + urlencode(kw)
|
|
|
|
return out
|
|
|
|
def background(func):
|
|
"""A function decorator to run a long-running function as a background thread."""
|
|
def internal(*a, **kw):
|
|
web.data() # cache it
|
|
|
|
tmpctx = web._context[threading.currentThread()]
|
|
web._context[threading.currentThread()] = utils.storage(web.ctx.copy())
|
|
|
|
def newfunc():
|
|
web._context[threading.currentThread()] = tmpctx
|
|
func(*a, **kw)
|
|
myctx = web._context[threading.currentThread()]
|
|
for k in myctx.keys():
|
|
if k not in ['status', 'headers', 'output']:
|
|
try: del myctx[k]
|
|
except KeyError: pass
|
|
|
|
t = threading.Thread(target=newfunc)
|
|
background.threaddb[id(t)] = t
|
|
t.start()
|
|
web.ctx.headers = []
|
|
return seeother(changequery(_t=id(t)))
|
|
return internal
|
|
background.threaddb = {}
|
|
|
|
def backgrounder(func):
|
|
def internal(*a, **kw):
|
|
i = web.input(_method='get')
|
|
if '_t' in i:
|
|
try:
|
|
t = background.threaddb[int(i._t)]
|
|
except KeyError:
|
|
return web.notfound()
|
|
web._context[threading.currentThread()] = web._context[t]
|
|
return
|
|
else:
|
|
return func(*a, **kw)
|
|
return internal
|
|
|
|
class Reloader:
|
|
"""
|
|
Before every request, checks to see if any loaded modules have changed on
|
|
disk and, if so, reloads them.
|
|
"""
|
|
def __init__(self, func):
|
|
self.func = func
|
|
self.mtimes = {}
|
|
# cheetah:
|
|
# b = _compiletemplate.bases
|
|
# _compiletemplate = globals()['__compiletemplate']
|
|
# _compiletemplate.bases = b
|
|
|
|
web.loadhooks['reloader'] = self.check
|
|
# todo:
|
|
# - replace relrcheck with a loadhook
|
|
#if reloader in middleware:
|
|
# relr = reloader(None)
|
|
# relrcheck = relr.check
|
|
# middleware.remove(reloader)
|
|
#else:
|
|
# relr = None
|
|
# relrcheck = lambda: None
|
|
# if relr:
|
|
# relr.func = wsgifunc
|
|
# return wsgifunc
|
|
#
|
|
|
|
|
|
def check(self):
|
|
for mod in sys.modules.values():
|
|
try:
|
|
mtime = os.stat(mod.__file__).st_mtime
|
|
except (AttributeError, OSError, IOError):
|
|
continue
|
|
if mod.__file__.endswith('.pyc') and \
|
|
os.path.exists(mod.__file__[:-1]):
|
|
mtime = max(os.stat(mod.__file__[:-1]).st_mtime, mtime)
|
|
if mod not in self.mtimes:
|
|
self.mtimes[mod] = mtime
|
|
elif self.mtimes[mod] < mtime:
|
|
try:
|
|
reload(mod)
|
|
self.mtimes[mod] = mtime
|
|
except ImportError:
|
|
pass
|
|
return True
|
|
|
|
def __call__(self, e, o):
|
|
self.check()
|
|
return self.func(e, o)
|
|
|
|
reloader = Reloader
|
|
|
|
def profiler(app):
|
|
"""Outputs basic profiling information at the bottom of each response."""
|
|
from utils import profile
|
|
def profile_internal(e, o):
|
|
out, result = profile(app)(e, o)
|
|
return out + ['<pre>' + net.websafe(result) + '</pre>']
|
|
return profile_internal
|
|
|
|
if __name__ == "__main__":
|
|
import doctest
|
|
doctest.testmod()
|