package aggregator

import (
	"net/url"

	"github.com/usefathom/fathom/pkg/datastore"
	"github.com/usefathom/fathom/pkg/models"

	log "github.com/sirupsen/logrus"
)

type Aggregator struct {
	database datastore.Datastore
}

type results struct {
	Sites     map[string]*models.SiteStats
	Pages     map[string]*models.PageStats
	Referrers map[string]*models.ReferrerStats
}

// New returns a new aggregator instance with the database dependency injected.
func New(db datastore.Datastore) *Aggregator {
	return &Aggregator{
		database: db,
	}
}
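
// A minimal usage sketch, not part of the aggregation logic itself: assuming a
// value db that satisfies datastore.Datastore has been created elsewhere (e.g.
// during application start-up), the aggregator is constructed and run like so:
//
//	agg := aggregator.New(db)
//	processed := agg.Run()
//	log.Infof("aggregated %d pageviews", processed)
//
// Run is typically invoked repeatedly, e.g. from a ticker or cron-like loop.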

// Run processes all pageviews that are ready to be processed, adds them to the
// daily aggregated statistics and returns the number of pageviews it processed.
func (agg *Aggregator) Run() int {
	// get unprocessed pageviews
	pageviews, err := agg.database.GetProcessablePageviews()
	if err != nil && err != datastore.ErrNoResults {
		log.Error(err)
		return 0
	}

	// do we have anything to process?
	n := len(pageviews)
	if n == 0 {
		return 0
	}

	results := &results{
		Sites:     map[string]*models.SiteStats{},
		Pages:     map[string]*models.PageStats{},
		Referrers: map[string]*models.ReferrerStats{},
	}

	log.Debugf("processing %d pageviews", len(pageviews))

	sites, err := agg.database.GetSites()
	if err != nil {
		log.Error(err)
		return 0
	}

	// create a map of public tracking IDs => site ID
	trackingIDMap := make(map[string]int64, len(sites)+1)
	for _, s := range sites {
		trackingIDMap[s.TrackingID] = s.ID
	}

	// if no explicit site ID was given in the tracking request, default to the site with ID 1
	trackingIDMap[""] = 1

	// add each pageview to the various statistics we gather
	for _, p := range pageviews {
		// discard the pageview if its site tracking ID is unknown
		siteID, ok := trackingIDMap[p.SiteTrackingID]
		if !ok {
			log.Debugf("discarding pageview because of unrecognized site tracking ID %s", p.SiteTrackingID)
			continue
		}

		// get existing site stats so we can add this pageview to them
		site, err := agg.getSiteStats(results, siteID, p.Timestamp)
		if err != nil {
			log.Error(err)
			continue
		}
		site.HandlePageview(p)

		// same for the page stats of this hostname & pathname
		pageStats, err := agg.getPageStats(results, siteID, p.Timestamp, p.Hostname, p.Pathname)
		if err != nil {
			log.Error(err)
			continue
		}
		pageStats.HandlePageview(p)

		// referrer stats (only for pageviews that came in with a referrer)
		if p.Referrer != "" {
			hostname, pathname, err := parseUrlParts(p.Referrer)
			if err != nil {
				log.Error(err)
				continue
			}

			referrerStats, err := agg.getReferrerStats(results, siteID, p.Timestamp, hostname, pathname)
			if err != nil {
				log.Error(err)
				continue
			}
			referrerStats.HandlePageview(p)
		}
	}

	// persist the updated stats to the datastore
	for _, site := range results.Sites {
		if err := agg.database.SaveSiteStats(site); err != nil {
			log.Error(err)
		}
	}

	for _, pageStats := range results.Pages {
		if err := agg.database.SavePageStats(pageStats); err != nil {
			log.Error(err)
		}
	}

	for _, referrerStats := range results.Referrers {
		if err := agg.database.SaveReferrerStats(referrerStats); err != nil {
			log.Error(err)
		}
	}

	// finally, remove the pageviews that we just processed
	if err := agg.database.DeletePageviews(pageviews); err != nil {
		log.Error(err)
	}

	return n
}
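
// parseUrlParts splits a URL string into a "scheme://host" part and a path part.
// For example (illustrative input), "https://example.com/some/page?ref=x" yields
// "https://example.com" and "/some/page"; the query string and fragment are dropped.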
func parseUrlParts(s string) (string, string, error) {
	u, err := url.Parse(s)
	if err != nil {
		return "", "", err
	}

	return u.Scheme + "://" + u.Host, u.Path, nil
}