moving helper functions to tools.py

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2023-03-01 22:34:16 +01:00
parent 68fdaf3572
commit 66a9d66dc6
No known key found for this signature in database
GPG Key ID: 0FE274EE8C95166E
2 changed files with 54 additions and 54 deletions

View File

@ -1,7 +1,9 @@
#!/bin/python3
import logging
import sys
import random
from bitarray.util import zeros
class CustomFormatter():
"""This class defines the terminal output formatting."""
@ -28,3 +30,53 @@ class CustomFormatter():
formatter = logging.Formatter(log_fmt)
return formatter.format(record)
def shuffled(lis, shuffle=True):
''' Generator yielding list in shuffled order
'''
# based on https://stackoverflow.com/a/60342323
if shuffle:
for index in random.sample(range(len(lis)), len(lis)):
yield lis[index]
else:
for v in lis:
yield v
def shuffledDict(d, shuffle=True):
''' Generator yielding dictionary in shuffled order
Shuffle, except if not (optional parameter useful for experiment setup)
'''
if shuffle:
lis = list(d.items())
for index in random.sample(range(len(d)), len(d)):
yield lis[index]
else:
for kv in d.items():
yield kv
def sampleLine(line, limit):
""" sample up to 'limit' bits from a bitarray
Since this is quite expensive, we use a number of heuristics to get it fast.
"""
if limit == sys.maxsize :
return line
else:
w = line.count(1)
if limit >= w :
return line
else:
l = len(line)
r = zeros(l)
if w < l/10 or limit > l/2 :
indices = [ i for i in range(l) if line[i] ]
sample = random.sample(indices, limit)
for i in sample:
r[i] = 1
return r
else:
while limit:
i = random.randrange(0, l)
if line[i] and not r[i]:
r[i] = 1
limit -= 1
return r

View File

@ -3,64 +3,12 @@
import random
import collections
import logging
import sys
from DAS.block import *
from bitarray import bitarray
from DAS.tools import shuffled, shuffledDict
from bitarray.util import zeros
from collections import deque
from itertools import chain
def shuffled(lis, shuffle=True):
''' Generator yielding list in shuffled order
'''
# based on https://stackoverflow.com/a/60342323
if shuffle:
for index in random.sample(range(len(lis)), len(lis)):
yield lis[index]
else:
for v in lis:
yield v
def shuffledDict(d, shuffle=True):
''' Generator yielding dictionary in shuffled order
Shuffle, except if not (optional parameter useful for experiment setup)
'''
if shuffle:
lis = list(d.items())
for index in random.sample(range(len(d)), len(d)):
yield lis[index]
else:
for kv in d.items():
yield kv
def sampleLine(line, limit):
""" sample up to 'limit' bits from a bitarray
Since this is quite expensive, we use a number of heuristics to get it fast.
"""
if limit == sys.maxsize :
return line
else:
w = line.count(1)
if limit >= w :
return line
else:
l = len(line)
r = zeros(l)
if w < l/10 or limit > l/2 :
indices = [ i for i in range(l) if line[i] ]
sample = random.sample(indices, limit)
for i in sample:
r[i] = 1
return r
else:
while limit:
i = random.randrange(0, l)
if line[i] and not r[i]:
r[i] = 1
limit -= 1
return r
class Neighbor:
"""This class implements a node neighbor to monitor sent and received data."""