test updates for blocklist - tarka

This commit is contained in:
Marcos Pinto 2007-06-24 10:32:51 +00:00
parent 61f000fae9
commit 08cd36de7f
6 changed files with 75 additions and 2 deletions

View File

@ -71,27 +71,43 @@ class BlocklistImport:
if fetch: if fetch:
print "Fetching",self.config.get('url') print "Fetching",self.config.get('url')
self.gtkprog.start_download() self.gtkprog.start_download()
try:
filename, headers = urllib.urlretrieve(self.config.get('url'), filename, headers = urllib.urlretrieve(self.config.get('url'),
filename=self.blockfile, filename=self.blockfile,
reporthook=self._download_update) reporthook=self._download_update)
except IOError, (errno, strerr):
err = ui.GTKError("Couldn't download URL: %s"%strerr)
self.gtkprog.stop()
return
self.gtkprog.start_import() self.gtkprog.start_import()
self.core.reset_ip_filter() self.core.reset_ip_filter()
ltype = self.config.get('listtype') ltype = self.config.get('listtype')
print "importing with",ltype print "importing with",ltype
reader = readers[ltype][1](self.blockfile)
try:
reader = readers[ltype][1](self.blockfile)
except IOError, (errno, strerr):
err = ui.GTKError("Couldn't open blocklist file: %s"%strerr)
self.gtkprog.stop()
return
print "Starting import"
ips = reader.next() ips = reader.next()
curr = 0 curr = 0
while ips and not self.cancelled: while ips and not self.cancelled:
self.core.add_range_to_ip_filter(*ips) self.core.add_range_to_ip_filter(*ips)
ips = reader.next() ips = reader.next()
curr += 1 curr += 1
if curr % 100 == 0:
self.gtkprog.import_prog(text="Imported %s IPs"%curr)
else:
self.gtkprog.import_prog() self.gtkprog.import_prog()
reader.close() reader.close()
self.gtkprog.end_import() self.gtkprog.end_import()
print "Import complete"
self.gtkprog.stop() self.gtkprog.stop()

Binary file not shown.

View File

@ -0,0 +1,4 @@
General Electric Company:3.0.0.0-3.255.255.255
s0-0.ciscoseattle.bbnplanet.net:4.0.25.146-4.0.25.148

View File

@ -0,0 +1,4 @@
Bogon,:0.0.0.0-3.255.255.255
s0-0.c:4.0.25.146-4.0.25.148

View File

@ -0,0 +1,20 @@
import unittest
from text import TextReader, GZMuleReader
class ImportTests(unittest.TestCase):
def testpgtext(self):
tr = TextReader("pg.txt")
ips = tr.next()
self.assertEqual("3.0.0.0", ips[0])
self.assertEqual("3.255.255.255", ips[1])
def testMule(self):
mr = GZMuleReader("nipfilter.dat.gz")
ips = mr.next()
self.assertEqual("0.0.0.0", ips[0])
self.assertEqual("3.255.255.255", ips[1])
if __name__ == '__main__':
unittest.main()

View File

@ -114,7 +114,9 @@ class GTKProgress(gtk.Dialog):
self.progress.set_pulse_step(0.0075) self.progress.set_pulse_step(0.0075)
self.update() self.update()
def import_prog(self): def import_prog(self, text=None):
if text:
self.progress.set_text(text)
self.progress.pulse() self.progress.pulse()
self.update() self.update()
@ -142,3 +144,30 @@ class GTKProgress(gtk.Dialog):
def update(self): def update(self):
while gtk.events_pending(): while gtk.events_pending():
not gtk.main_iteration(block=True) not gtk.main_iteration(block=True)
class GTKError(gtk.Dialog):
def __init__(self, message):
gtk.Dialog.__init__(self, title="Error",
flags=gtk.DIALOG_MODAL,
buttons=(gtk.STOCK_OK, gtk.RESPONSE_ACCEPT))
# Setup
self.set_border_width(12)
self.vbox.set_spacing(6)
# List source
label = gtk.Label()
label.set_text(message)
self.vbox.pack_start(label)
self.connect('response', self.ok)
self.connect('close', self.cancel)
self.show_all()
def ok(self, dialog, response):
self.hide_all()
def cancel(self, dialog):
self.hide_all()