add get_present_days() and use it to skip indices
Signed-off-by: Jakub Sokołowski <jakub@status.im>
This commit is contained in:
parent
7d025dc40d
commit
0642e52d26
18
README.md
18
README.md
|
@ -8,9 +8,27 @@ The script queries an ElasticSearch endpoint for `logstash-*` indices and aggreg
|
||||||
|
|
||||||
This data is pushed to a PostgreSQL database in the following format:
|
This data is pushed to a PostgreSQL database in the following format:
|
||||||
```
|
```
|
||||||
|
peers=> \d peers;
|
||||||
|
Table "public.peers"
|
||||||
|
┌────────┬───────────────────────┬───────────┬──────────┬─────────┐
|
||||||
|
│ Column │ Type │ Collation │ Nullable │ Default │
|
||||||
|
├────────┼───────────────────────┼───────────┼──────────┼─────────┤
|
||||||
|
│ date │ date │ │ │ │
|
||||||
|
│ peer │ character varying(64) │ │ │ │
|
||||||
|
│ count │ integer │ │ │ │
|
||||||
|
└────────┴───────────────────────┴───────────┴──────────┴─────────┘
|
||||||
```
|
```
|
||||||
|
|
||||||
# Example
|
# Example
|
||||||
|
|
||||||
```
|
```
|
||||||
|
peers=> select * from peers limit 3;
|
||||||
|
┌────────────┬──────────────────────────────────────────────────────────────────┬───────┐
|
||||||
|
│ date │ peer │ count │
|
||||||
|
├────────────┼──────────────────────────────────────────────────────────────────┼───────┤
|
||||||
|
│ 2020-06-01 │ a18d4417b1d2fbddd7f9474250f703ba20472be5e1131bc09e35e9b18c1a5bf7 │ 1300 │
|
||||||
|
│ 2020-06-01 │ 7dba96249159cef53fbb5ec010c2d7799fec7dcaf8b1d9754559ce9fbd463328 │ 652 │
|
||||||
|
│ 2020-06-01 │ 3a13adfa4799f9505c83fab18d49a47f6de09344db3d96e18678c5d3c92f717e │ 632 │
|
||||||
|
└────────────┴──────────────────────────────────────────────────────────────────┴───────┘
|
||||||
|
(3 rows)
|
||||||
```
|
```
|
||||||
|
|
16
main.py
16
main.py
|
@ -54,14 +54,22 @@ def main():
|
||||||
opts.db_port
|
opts.db_port
|
||||||
)
|
)
|
||||||
|
|
||||||
data = []
|
days = psg.get_present_days()
|
||||||
|
present_indices = ['logstash-{}'.format(d.replace('-', '.')) for d in days]
|
||||||
|
|
||||||
|
peers = []
|
||||||
for index in esq.get_indices(opts.index_pattern):
|
for index in esq.get_indices(opts.index_pattern):
|
||||||
|
if index in present_indices:
|
||||||
|
continue
|
||||||
print('Index: {}'.format(index))
|
print('Index: {}'.format(index))
|
||||||
data.extend(esq.get_peers(index, opts.field, opts.max_size))
|
peers.extend(esq.get_peers(index, opts.field, opts.max_size))
|
||||||
|
|
||||||
rval = psg.get_most_recent_day()
|
if len(peers) == 0:
|
||||||
|
print('Nothing to insert into database.')
|
||||||
|
exit(0)
|
||||||
|
|
||||||
|
rval = psg.inject_peers(peers)
|
||||||
print(rval)
|
print(rval)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
main()
|
main()
|
||||||
|
|
20
postgres.py
20
postgres.py
|
@ -1,4 +1,5 @@
|
||||||
import psycopg2
|
import psycopg2
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
class PGDatabase:
|
class PGDatabase:
|
||||||
_SCHEMA = """
|
_SCHEMA = """
|
||||||
|
@ -20,6 +21,21 @@ class PGDatabase:
|
||||||
self.c.execute(self._SCHEMA)
|
self.c.execute(self._SCHEMA)
|
||||||
self.db.commit()
|
self.db.commit()
|
||||||
|
|
||||||
def get_most_recent_day(self):
|
def get_last_day(self):
|
||||||
rval = self.c.execute('SELECT date FROM peers ORDER BY date LIMIT 1;')
|
self.c.execute('SELECT date FROM peers ORDER BY date DESC LIMIT 1;')
|
||||||
|
return self.c.fetchone()
|
||||||
|
|
||||||
|
def get_present_days(self):
|
||||||
|
self.c.execute('SELECT DISTINCT date FROM peers;')
|
||||||
|
return [d[0].strftime('%Y-%m-%d') for d in self.c.fetchall()]
|
||||||
|
|
||||||
|
def inject_peers(self, peers):
|
||||||
|
args = ','.join(
|
||||||
|
self.c.mogrify('(%s,%s,%s)', peer.to_tuple()).decode('utf-8')
|
||||||
|
for peer in peers
|
||||||
|
)
|
||||||
|
rval = self.c.execute(
|
||||||
|
'INSERT INTO peers(date, peer, count) VALUES {}'.format(args)
|
||||||
|
)
|
||||||
|
self.db.commit()
|
||||||
return rval
|
return rval
|
||||||
|
|
19
query.py
19
query.py
|
@ -9,6 +9,15 @@ def remove_prefix(text, prefix):
|
||||||
def hash_string(text):
|
def hash_string(text):
|
||||||
return hashlib.sha256(text.encode('utf-8')).hexdigest()
|
return hashlib.sha256(text.encode('utf-8')).hexdigest()
|
||||||
|
|
||||||
|
class Peer:
|
||||||
|
|
||||||
|
def __init__(self, date, peer, count):
|
||||||
|
self.date = date
|
||||||
|
self.peer = peer
|
||||||
|
self.count = count
|
||||||
|
|
||||||
|
def to_tuple(self):
|
||||||
|
return (self.date, self.peer, self.count)
|
||||||
|
|
||||||
class ESQueryPeers():
|
class ESQueryPeers():
|
||||||
def __init__(self, host='localhost', port=9200, timeout=1200):
|
def __init__(self, host='localhost', port=9200, timeout=1200):
|
||||||
|
@ -40,10 +49,10 @@ class ESQueryPeers():
|
||||||
# Collect results as list of dicts
|
# Collect results as list of dicts
|
||||||
rval = []
|
rval = []
|
||||||
for bucket in aggs['peers']['buckets']:
|
for bucket in aggs['peers']['buckets']:
|
||||||
rval.append({
|
rval.append(Peer(
|
||||||
'Date': remove_prefix(index, 'logstash-'),
|
date = remove_prefix(index, 'logstash-'),
|
||||||
'Peer': hash_string(bucket['key']),
|
peer = hash_string(bucket['key']),
|
||||||
'Count': bucket['doc_count'],
|
count = bucket['doc_count']
|
||||||
})
|
))
|
||||||
|
|
||||||
return rval
|
return rval
|
||||||
|
|
Loading…
Reference in New Issue