// fathom/pkg/aggregator/aggregator.go

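// Package aggregator turns unprocessed pageviews into daily aggregated
// statistics per site, page and referrer.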
package aggregator

import (
	"net/url"

	"github.com/usefathom/fathom/pkg/datastore"
	"github.com/usefathom/fathom/pkg/models"

	log "github.com/sirupsen/logrus"
)

type aggregator struct {
	database datastore.Datastore
}

// New returns a new aggregator instance with the database dependency injected.
func New(db datastore.Datastore) *aggregator {
	return &aggregator{
		database: db,
	}
}
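
// A minimal usage sketch (obtaining the datastore and scheduling Run on an
// interval are assumed to be wired up elsewhere in the application):
//
//	agg := aggregator.New(db)
//	processed := agg.Run()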

// Run processes the pageviews that are ready to be processed and adds them to the daily aggregations.
func (agg *aggregator) Run() int {
	// Get unprocessed pageviews
	pageviews, err := agg.database.GetProcessablePageviews()
	if err != nil && err != datastore.ErrNoResults {
		log.Error(err)
		return 0
	}

	// Do we have anything to process?
	n := len(pageviews)
	if n == 0 {
		return 0
	}

	results := agg.Process(pageviews)

	// update stats
	for _, site := range results.Sites {
		err = agg.database.UpdateSiteStats(site)
		if err != nil {
			log.Error(err)
		}
	}

	for _, pageStats := range results.Pages {
		err = agg.database.UpdatePageStats(pageStats)
		if err != nil {
			log.Error(err)
		}
	}

	for _, referrerStats := range results.Referrers {
		err = agg.database.UpdateReferrerStats(referrerStats)
		if err != nil {
			log.Error(err)
		}
	}

	// finally, remove pageviews that we just processed
	err = agg.database.DeletePageviews(pageviews)
	if err != nil {
		log.Error(err)
	}

	return n
}

// Process processes the given pageviews and returns the (aggregated) results per metric per day.
func (agg *aggregator) Process(pageviews []*models.Pageview) *results {
	log.Debugf("processing %d pageviews", len(pageviews))

	results := newResults()

	for _, p := range pageviews {
		site, err := agg.getSiteStats(results, p.Timestamp)
		if err != nil {
			log.Error(err)
			continue
		}
		site.HandlePageview(p)

		pageStats, err := agg.getPageStats(results, p.Timestamp, p.Hostname, p.Pathname)
		if err != nil {
			log.Error(err)
			continue
		}
		pageStats.HandlePageview(p)

		// referrer stats
		if p.Referrer != "" {
			hostname, pathname, err := parseUrlParts(p.Referrer)
			if err != nil {
				log.Error(err)
				continue
			}

			referrerStats, err := agg.getReferrerStats(results, p.Timestamp, hostname, pathname)
			if err != nil {
				log.Error(err)
				continue
			}
			referrerStats.HandlePageview(p)
		}
	}

	return results
}
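
// parseUrlParts splits a URL string into its scheme + host part
// (e.g. "https://example.com") and its path part.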
func parseUrlParts(s string) (string, string, error) {
	u, err := url.Parse(s)
	if err != nil {
		return "", "", err
	}

	return u.Scheme + "://" + u.Host, u.Path, nil
}