fathom/pkg/aggregator/aggregator.go

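// Package aggregator processes unprocessed pageviews into daily aggregated
// site, page and referrer statistics.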
package aggregator

import (
	"errors"
	"net/url"
	"strings"
	"time"

	"github.com/usefathom/fathom/pkg/datastore"
	"github.com/usefathom/fathom/pkg/models"

	log "github.com/sirupsen/logrus"
)
// Aggregator combines raw pageviews into daily aggregated statistics.
type Aggregator struct {
	database datastore.Datastore
}

// Report summarizes a single aggregation run.
type Report struct {
	Processed int           // number of pageviews that were aggregated
	PoolEmpty bool          // whether the pool of unprocessed pageviews is now empty
	Duration  time.Duration // how long the run took
}

type results struct {
	Sites     map[string]*models.SiteStats
	Pages     map[string]*models.PageStats
	Referrers map[string]*models.ReferrerStats
}
// New returns a new aggregator instance with the database dependency injected.
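//
// A minimal usage sketch (hypothetical caller code; how db is constructed
// is an assumption, not part of this file):
//
//	agg := aggregator.New(db) // db satisfies datastore.Datastore
//	report := agg.Run()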
func New(db datastore.Datastore) *Aggregator {
	return &Aggregator{
		database: db,
	}
}
// Run aggregates all pageviews that are ready for processing into the daily
// site, page and referrer statistics.
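//
// A caller that wants to drain the whole pool of unprocessed pageviews can
// keep calling Run until it reports an empty pool (a hypothetical sketch,
// not code from this package):
//
//	for {
//		if report := agg.Run(); report.PoolEmpty {
//			break
//		}
//	}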
func (agg *Aggregator) Run() Report {
	startTime := time.Now()

	// get unprocessed pageviews
	limit := 10000
	pageviews, err := agg.database.GetProcessablePageviews(limit)
	emptyReport := Report{
		Processed: 0,
		PoolEmpty: true,
	}
	if err != nil && err != datastore.ErrNoResults {
		log.Error(err)
		return emptyReport
	}

	// do we have anything to process?
	n := len(pageviews)
	if n == 0 {
		return emptyReport
	}
	results := &results{
		Sites:     map[string]*models.SiteStats{},
		Pages:     map[string]*models.PageStats{},
		Referrers: map[string]*models.ReferrerStats{},
	}

	sites, err := agg.database.GetSites()
	if err != nil {
		log.Error(err)
		return emptyReport
	}

	// create map of public tracking IDs => site ID
	trackingIDMap := make(map[string]int64, len(sites)+1)
	for _, s := range sites {
		trackingIDMap[s.TrackingID] = s.ID
	}

	// if no explicit site ID was given in the tracking request, default to the site with ID 1
	trackingIDMap[""] = 1

	// set up the referrer spam blacklist
	blacklist, err := newBlacklist()
	if err != nil {
		log.Error(err)
		return emptyReport
	}
	// add each pageview to the various statistics we gather
	for _, p := range pageviews {
		// discard pageview if site tracking ID is unknown
		siteID, ok := trackingIDMap[p.SiteTrackingID]
		if !ok {
			log.Debugf("Skipping pageview because of unrecognized site tracking ID %s", p.SiteTrackingID)
			continue
		}

		// start with the referrer, because we may want to skip this pageview altogether if it is referrer spam
		if p.Referrer != "" {
			ref, err := parseReferrer(p.Referrer)
			if err != nil {
				log.Debugf("Skipping pageview from referrer %s because of malformed referrer URL", p.Referrer)
				continue
			}

			// ignore pageviews from blacklisted referrers
			// we use Hostname() here to discard port numbers
			if blacklist.Has(ref.Hostname()) {
				log.Debugf("Skipping pageview from referrer %s because of blacklist", p.Referrer)
				continue
			}

			hostname := ref.Scheme + "://" + ref.Host
			referrerStats, err := agg.getReferrerStats(results, siteID, p.Timestamp, hostname, ref.Path)
			if err != nil {
				log.Error(err)
				continue
			}
			referrerStats.HandlePageview(p)
		}

		// get existing site stats so we can add this pageview to it
		site, err := agg.getSiteStats(results, siteID, p.Timestamp)
		if err != nil {
			log.Error(err)
			continue
		}
		site.HandlePageview(p)

		pageStats, err := agg.getPageStats(results, siteID, p.Timestamp, p.Hostname, p.Pathname)
		if err != nil {
			log.Error(err)
			continue
		}
		pageStats.HandlePageview(p)
	}
	// persist the updated stats
	for _, site := range results.Sites {
		if err := agg.database.SaveSiteStats(site); err != nil {
			log.Error(err)
		}
	}
	for _, pageStats := range results.Pages {
		if err := agg.database.SavePageStats(pageStats); err != nil {
			log.Error(err)
		}
	}
	for _, referrerStats := range results.Referrers {
		if err := agg.database.SaveReferrerStats(referrerStats); err != nil {
			log.Error(err)
		}
	}

	// finally, remove the pageviews that we just processed
	if err := agg.database.DeletePageviews(pageviews); err != nil {
		log.Error(err)
	}

	endTime := time.Now()
	dur := endTime.Sub(startTime)
	report := Report{
		Processed: n,
		PoolEmpty: n < limit, // fewer rows than the limit means the pool is drained
		Duration:  dur,
	}
	log.Debugf("processed %d pageviews. took: %s, pool empty: %v", report.Processed, report.Duration, report.PoolEmpty)
	return report
}
// parseReferrer parses the referrer string and normalizes it.
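//
// For example (an illustrative URL, derived from the rules below), the referrer
//
//	https://example.com/blog/post/amp/?utm_source=news&ref=1
//
// is normalized to
//
//	https://example.com/blog/post/?ref=1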
func parseReferrer(r string) (*url.URL, error) {
	u, err := url.Parse(r)
	if err != nil {
		return nil, err
	}

	// always require a hostname
	if u.Host == "" {
		return nil, errors.New("malformed URL, empty host")
	}

	// remove AMP & UTM vars
	if u.RawQuery != "" {
		q := u.Query()
		keys := []string{"amp", "utm_campaign", "utm_medium", "utm_source"}
		for _, k := range keys {
			q.Del(k)
		}
		u.RawQuery = q.Encode()
	}

	// remove amp/ suffix (but keep trailing slash)
	if strings.HasSuffix(u.Path, "/amp/") {
		u.Path = u.Path[0 : len(u.Path)-4]
	}

	// re-parse our normalized string into a new URL struct
	return url.Parse(u.String())
}