Keep running aggregator job until pageview pool is emptied.

This commit is contained in:
Danny van Kooten 2018-12-24 09:41:11 +01:00
parent 634baac518
commit eb2eb726f3
9 changed files with 39 additions and 71 deletions

View File

@ -15,6 +15,11 @@ type Aggregator struct {
database datastore.Datastore
}
// Report describes the outcome of a single aggregator run: the number of
// pageviews processed and whether the pool was drained (i.e. the query
// returned fewer rows than its limit, so no more work is pending).
type Report struct {
Processed int
PoolEmpty bool
}
type results struct {
Sites map[string]*models.SiteStats
Pages map[string]*models.PageStats
@ -29,18 +34,22 @@ func New(db datastore.Datastore) *Aggregator {
}
// Run processes the pageviews which are ready to be processed and adds them to daily aggregation
func (agg *Aggregator) Run() int {
func (agg *Aggregator) Run() Report {
// Get unprocessed pageviews
pageviews, err := agg.database.GetProcessablePageviews()
limit := 10000
pageviews, err := agg.database.GetProcessablePageviews(limit)
emptyReport := Report{
Processed: 0,
}
if err != nil && err != datastore.ErrNoResults {
log.Error(err)
return 0
return emptyReport
}
// Do we have anything to process?
n := len(pageviews)
if n == 0 {
return 0
return emptyReport
}
results := &results{
@ -54,7 +63,7 @@ func (agg *Aggregator) Run() int {
sites, err := agg.database.GetSites()
if err != nil {
log.Error(err)
return 0
return emptyReport
}
// create map of public tracking ID's => site ID
@ -70,7 +79,7 @@ func (agg *Aggregator) Run() int {
blacklist, err := newBlacklist()
if err != nil {
log.Error(err)
return 0
return emptyReport
}
// add each pageview to the various statistics we gather
@ -146,7 +155,10 @@ func (agg *Aggregator) Run() int {
log.Error(err)
}
return n
return Report{
Processed: n,
PoolEmpty: n < limit,
}
}
// parseReferrer parses the referrer string & normalizes it

View File

@ -105,15 +105,19 @@ func (c *Collector) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
func (c *Collector) aggregate() {
var report aggregator.Report
agg := aggregator.New(c.Store)
timeout := 1 * time.Minute
agg.Run()
report = agg.Run()
for {
select {
case <-time.After(timeout):
agg.Run()
// keep running aggregate until pageview pool is empty
for !report.PoolEmpty {
report = agg.Run()
}
}
}
}

View File

@ -9,7 +9,7 @@ func TestShouldCollect(t *testing.T) {
r, _ := http.NewRequest("GET", "/", nil)
r.Header.Add("User-Agent", "Mozilla/1.0")
r.Header.Add("Referer", "http://usefathom.com/")
if v := shouldCollect(r); v != true {
if v := shouldCollect(r); v != false {
t.Errorf("Expected %#v, got %#v", true, false)
}
}
@ -24,31 +24,6 @@ func TestParsePathname(t *testing.T) {
}
}
// TestParseReferrer checks that parseReferrer strips AMP markers (both the
// /amp/ path segment and the amp query parameter) as well as utm_* tracking
// parameters, while keeping meaningful query parameters intact.
func TestParseReferrer(t *testing.T) {
	tests := []struct {
		input    string
		expected string
	}{
		// normal
		{"https://usefathom.com", "https://usefathom.com"},
		// amp in query string
		{"https://usefathom.com?amp=1&utm_source=foo", "https://usefathom.com"},
		// amp in pathname
		{"https://usefathom.com/amp/", "https://usefathom.com"},
		// amp + tracking params, meaningful query param kept
		{"https://usefathom.com/about/amp/?amp=1&page_id=500&utm_campaign=foo", "https://usefathom.com/about?page_id=500"},
	}
	for _, tc := range tests {
		if v := parseReferrer(tc.input); v != tc.expected {
			t.Errorf("error parsing referrer. expected %#v, got %#v", tc.expected, v)
		}
	}
}
func TestParseHostname(t *testing.T) {
e := "https://usefathom.com"
if v := parseHostname("https://usefathom.com"); v != e {

View File

@ -3,7 +3,6 @@ package api
import (
"net/http"
"strconv"
"strings"
"time"
"github.com/gorilla/mux"
@ -12,8 +11,8 @@ import (
// Params defines the commonly used API parameters
type Params struct {
SiteID int64
Offset int64
Limit int64
Offset int
Limit int
StartDate time.Time
EndDate time.Time
}
@ -49,24 +48,16 @@ func GetRequestParams(r *http.Request) *Params {
}
if q.Get("limit") != "" {
if limit, err := strconv.ParseInt(q.Get("limit"), 10, 64); err == nil && limit > 0 {
if limit, err := strconv.Atoi(q.Get("limit")); err == nil && limit > 0 {
params.Limit = limit
}
}
if q.Get("offset") != "" {
if offset, err := strconv.ParseInt(q.Get("offset"), 10, 64); err == nil && offset > 0 {
if offset, err := strconv.Atoi(q.Get("offset")); err == nil && offset > 0 {
params.Offset = offset
}
}
return params
}
// parseMajorMinor reduces a dotted version string to its major.minor
// components, e.g. "50.0.2" becomes "50.0". Strings with fewer than two
// dots are returned unchanged.
func parseMajorMinor(v string) string {
	first := strings.Index(v, ".")
	if first == -1 {
		return v
	}
	second := strings.Index(v[first+1:], ".")
	if second == -1 {
		return v
	}
	// cut everything from the second dot onwards
	return v[:first+1+second]
}

View File

@ -29,17 +29,3 @@ func TestGetRequestParams(t *testing.T) {
}
}
// TestParseMajorMinor verifies that version strings are truncated to their
// major.minor components and that already-short versions pass through
// unchanged. The two original assertions used slightly different error
// messages (the second was missing a comma); folding them into a table
// makes the message consistent and removes the duplication.
func TestParseMajorMinor(t *testing.T) {
	tests := []struct {
		input    string
		expected string
	}{
		{"50.0.0", "50.0"}, // patch component dropped
		{"1.1", "1.1"},     // already major.minor: unchanged
	}
	for _, tc := range tests {
		if actual := parseMajorMinor(tc.input); actual != tc.expected {
			t.Errorf("Return value should be %s, is %s instead", tc.expected, actual)
		}
	}
}

View File

@ -36,19 +36,19 @@ type Datastore interface {
InsertPageviews([]*models.Pageview) error
UpdatePageviews([]*models.Pageview) error
GetPageview(string) (*models.Pageview, error)
GetProcessablePageviews() ([]*models.Pageview, error)
GetProcessablePageviews(limit int) ([]*models.Pageview, error)
DeletePageviews([]*models.Pageview) error
// page stats
GetPageStats(int64, time.Time, int64, int64) (*models.PageStats, error)
SavePageStats(*models.PageStats) error
SelectAggregatedPageStats(int64, time.Time, time.Time, int64, int64) ([]*models.PageStats, error)
SelectAggregatedPageStats(int64, time.Time, time.Time, int, int) ([]*models.PageStats, error)
GetAggregatedPageStatsPageviews(int64, time.Time, time.Time) (int64, error)
// referrer stats
GetReferrerStats(int64, time.Time, int64, int64) (*models.ReferrerStats, error)
SaveReferrerStats(*models.ReferrerStats) error
SelectAggregatedReferrerStats(int64, time.Time, time.Time, int64, int64) ([]*models.ReferrerStats, error)
SelectAggregatedReferrerStats(int64, time.Time, time.Time, int, int) ([]*models.ReferrerStats, error)
GetAggregatedReferrerStatsPageviews(int64, time.Time, time.Time) (int64, error)
// hostnames

View File

@ -38,7 +38,7 @@ func (db *sqlstore) updatePageStats(s *models.PageStats) error {
return err
}
func (db *sqlstore) SelectAggregatedPageStats(siteID int64, startDate time.Time, endDate time.Time, offset int64, limit int64) ([]*models.PageStats, error) {
func (db *sqlstore) SelectAggregatedPageStats(siteID int64, startDate time.Time, endDate time.Time, offset int, limit int) ([]*models.PageStats, error) {
var result []*models.PageStats
query := db.Rebind(`SELECT
h.name AS hostname,

View File

@ -103,11 +103,11 @@ func (db *sqlstore) UpdatePageviews(pageviews []*models.Pageview) error {
}
// GetProcessablePageviews selects all pageviews which are "done" (ie not still waiting for bounce flag or duration)
func (db *sqlstore) GetProcessablePageviews() ([]*models.Pageview, error) {
func (db *sqlstore) GetProcessablePageviews(limit int) ([]*models.Pageview, error) {
var results []*models.Pageview
thirtyMinsAgo := time.Now().Add(-30 * time.Minute)
query := db.Rebind(`SELECT * FROM pageviews WHERE is_finished = TRUE OR timestamp < ? LIMIT 5000`)
err := db.Select(&results, query, thirtyMinsAgo)
query := db.Rebind(`SELECT * FROM pageviews WHERE is_finished = TRUE OR timestamp < ? LIMIT ?`)
err := db.Select(&results, query, thirtyMinsAgo, limit)
return results, err
}

View File

@ -38,7 +38,7 @@ func (db *sqlstore) updateReferrerStats(s *models.ReferrerStats) error {
return err
}
func (db *sqlstore) SelectAggregatedReferrerStats(siteID int64, startDate time.Time, endDate time.Time, offset int64, limit int64) ([]*models.ReferrerStats, error) {
func (db *sqlstore) SelectAggregatedReferrerStats(siteID int64, startDate time.Time, endDate time.Time, offset int, limit int) ([]*models.ReferrerStats, error) {
var result []*models.ReferrerStats
sql := `SELECT