mirror of
https://github.com/status-im/fathom.git
synced 2025-03-01 11:30:28 +00:00
implement results type container
This commit is contained in:
parent
e5fcb0a34e
commit
69fc3e7aa1
@ -1,4 +1,4 @@
|
|||||||
package counter
|
package aggregator
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
@ -9,39 +9,61 @@ import (
|
|||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Aggregate() error {
|
func Run() {
|
||||||
// Get unprocessed pageviews
|
// Get unprocessed pageviews
|
||||||
pageviews, err := datastore.GetProcessablePageviews()
|
pageviews, err := datastore.GetProcessablePageviews()
|
||||||
if err != nil && err != datastore.ErrNoResults {
|
if err != nil && err != datastore.ErrNoResults {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
return err
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do we have anything to process?
|
// Do we have anything to process?
|
||||||
if len(pageviews) == 0 {
|
if len(pageviews) == 0 {
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
sites := map[string]*models.SiteStats{}
|
results := Process(pageviews)
|
||||||
pages := map[string]*models.PageStats{}
|
|
||||||
referrers := map[string]*models.ReferrerStats{}
|
// update stats
|
||||||
|
for _, site := range results.Sites {
|
||||||
|
err = datastore.UpdateSiteStats(site)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, pageStats := range results.Pages {
|
||||||
|
err = datastore.UpdatePageStats(pageStats)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, referrerStats := range results.Referrers {
|
||||||
|
err = datastore.UpdateReferrerStats(referrerStats)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// finally, remove pageviews that we just processed
|
||||||
|
err = datastore.DeletePageviews(pageviews)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Process(pageviews []*models.Pageview) *Results {
|
||||||
|
log.Debugf("processing %d pageviews", len(pageviews))
|
||||||
|
results := NewResults()
|
||||||
|
|
||||||
for _, p := range pageviews {
|
for _, p := range pageviews {
|
||||||
date := p.Timestamp.Format("2006-01-02")
|
site, err := results.GetSiteStats(p.Timestamp)
|
||||||
|
if err != nil {
|
||||||
var site *models.SiteStats
|
log.Error(err)
|
||||||
var ok bool
|
continue
|
||||||
if site, ok = sites[date]; !ok {
|
|
||||||
site, err = getSiteStats(p.Timestamp)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
continue // TODO: Pageview should not be deleted if this happens
|
|
||||||
}
|
|
||||||
|
|
||||||
sites[date] = site
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// site stats
|
|
||||||
site.Pageviews += 1
|
site.Pageviews += 1
|
||||||
|
|
||||||
// TODO: Weight isn't right here because we need the number of pageview with a known time of page, not all pageviews
|
// TODO: Weight isn't right here because we need the number of pageview with a known time of page, not all pageviews
|
||||||
@ -63,15 +85,10 @@ func Aggregate() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// page stats
|
pageStats, err := results.GetPageStats(p.Timestamp, p.Hostname, p.Pathname)
|
||||||
var pageStats *models.PageStats
|
if err != nil {
|
||||||
if pageStats, ok = pages[date+p.Pathname]; !ok {
|
log.Error(err)
|
||||||
pageStats, err = getPageStats(p.Timestamp, p.Hostname, p.Pathname)
|
continue
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
pages[date+p.Hostname+p.Pathname] = pageStats
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pageStats.Pageviews += 1
|
pageStats.Pageviews += 1
|
||||||
@ -95,15 +112,10 @@ func Aggregate() error {
|
|||||||
|
|
||||||
// referrer stats
|
// referrer stats
|
||||||
if p.Referrer != "" {
|
if p.Referrer != "" {
|
||||||
var referrerStats *models.ReferrerStats
|
referrerStats, err := results.GetReferrerStats(p.Timestamp, p.Referrer)
|
||||||
var ok bool
|
if err != nil {
|
||||||
if referrerStats, ok = referrers[date+p.Referrer]; !ok {
|
log.Error(err)
|
||||||
referrerStats, err = getReferrerStats(p.Timestamp, p.Referrer)
|
continue
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
referrers[date+p.Referrer] = referrerStats
|
|
||||||
}
|
}
|
||||||
|
|
||||||
referrerStats.Pageviews += 1
|
referrerStats.Pageviews += 1
|
||||||
@ -126,41 +138,11 @@ func Aggregate() error {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// update stats
|
return results
|
||||||
for _, site := range sites {
|
|
||||||
err = datastore.UpdateSiteStats(site)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, pageStats := range pages {
|
|
||||||
err = datastore.UpdatePageStats(pageStats)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, referrerStats := range referrers {
|
|
||||||
err = datastore.UpdateReferrerStats(referrerStats)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// finally, remove pageviews that we just processed
|
|
||||||
err = datastore.DeletePageviews(pageviews)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getSiteStats(date time.Time) (*models.SiteStats, error) {
|
func getSiteStats(t time.Time) (*models.SiteStats, error) {
|
||||||
stats, err := datastore.GetSiteStats(date)
|
stats, err := datastore.GetSiteStats(t)
|
||||||
if err != nil && err != datastore.ErrNoResults {
|
if err != nil && err != datastore.ErrNoResults {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -170,7 +152,7 @@ func getSiteStats(date time.Time) (*models.SiteStats, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
stats = &models.SiteStats{
|
stats = &models.SiteStats{
|
||||||
Date: date,
|
Date: t,
|
||||||
}
|
}
|
||||||
err = datastore.InsertSiteStats(stats)
|
err = datastore.InsertSiteStats(stats)
|
||||||
return stats, err
|
return stats, err
|
9
pkg/aggregator/aggregator_test.go
Normal file
9
pkg/aggregator/aggregator_test.go
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
package aggregator
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestProcess(t *testing.T) {
|
||||||
|
|
||||||
|
}
|
70
pkg/aggregator/result.go
Normal file
70
pkg/aggregator/result.go
Normal file
@ -0,0 +1,70 @@
|
|||||||
|
package aggregator
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/usefathom/fathom/pkg/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Results struct {
|
||||||
|
Sites map[string]*models.SiteStats
|
||||||
|
Pages map[string]*models.PageStats
|
||||||
|
Referrers map[string]*models.ReferrerStats
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewResults() *Results {
|
||||||
|
return &Results{
|
||||||
|
Sites: map[string]*models.SiteStats{},
|
||||||
|
Pages: map[string]*models.PageStats{},
|
||||||
|
Referrers: map[string]*models.ReferrerStats{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Results) GetSiteStats(t time.Time) (*models.SiteStats, error) {
|
||||||
|
var stats *models.SiteStats
|
||||||
|
var ok bool
|
||||||
|
var err error
|
||||||
|
|
||||||
|
date := t.Format("2006-01-02")
|
||||||
|
if stats, ok = r.Sites[date]; !ok {
|
||||||
|
stats, err = getSiteStats(t)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r.Sites[date] = stats
|
||||||
|
}
|
||||||
|
return stats, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Results) GetPageStats(t time.Time, hostname string, pathname string) (*models.PageStats, error) {
|
||||||
|
var stats *models.PageStats
|
||||||
|
var ok bool
|
||||||
|
var err error
|
||||||
|
|
||||||
|
date := t.Format("2006-01-02")
|
||||||
|
if stats, ok = r.Pages[date+hostname+pathname]; !ok {
|
||||||
|
stats, err = getPageStats(t, hostname, pathname)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r.Pages[date+hostname+pathname] = stats
|
||||||
|
}
|
||||||
|
return stats, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Results) GetReferrerStats(t time.Time, referrer string) (*models.ReferrerStats, error) {
|
||||||
|
var stats *models.ReferrerStats
|
||||||
|
var ok bool
|
||||||
|
var err error
|
||||||
|
|
||||||
|
date := t.Format("2006-01-02")
|
||||||
|
if stats, ok = r.Referrers[date+referrer]; !ok {
|
||||||
|
stats, err = getReferrerStats(t, referrer)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r.Referrers[date+referrer] = stats
|
||||||
|
}
|
||||||
|
|
||||||
|
return stats, nil
|
||||||
|
}
|
@ -8,7 +8,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/mssola/user_agent"
|
"github.com/mssola/user_agent"
|
||||||
"github.com/usefathom/fathom/pkg/counter"
|
"github.com/usefathom/fathom/pkg/aggregator"
|
||||||
"github.com/usefathom/fathom/pkg/datastore"
|
"github.com/usefathom/fathom/pkg/datastore"
|
||||||
"github.com/usefathom/fathom/pkg/models"
|
"github.com/usefathom/fathom/pkg/models"
|
||||||
)
|
)
|
||||||
@ -98,13 +98,13 @@ func NewCollectHandler() http.Handler {
|
|||||||
|
|
||||||
// runs the aggregate func every minute
|
// runs the aggregate func every minute
|
||||||
func aggregate() {
|
func aggregate() {
|
||||||
counter.Aggregate()
|
aggregator.Run()
|
||||||
timeout := 1 * time.Minute
|
timeout := 1 * time.Minute
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-time.After(timeout):
|
case <-time.After(timeout):
|
||||||
counter.Aggregate()
|
aggregator.Run()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user