mirror of
https://github.com/status-im/fathom.git
synced 2025-03-01 03:20:27 +00:00
Reuse the buffer (to avoid per-batch allocations) and separate inserts from updates early on in the collect handler.
This commit is contained in:
parent
c09fda89a3
commit
064b5cb038
@ -2,18 +2,172 @@ package api
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/mssola/user_agent"
|
||||
"github.com/usefathom/fathom/pkg/aggregator"
|
||||
"github.com/usefathom/fathom/pkg/datastore"
|
||||
"github.com/usefathom/fathom/pkg/models"
|
||||
)
|
||||
|
||||
type Collector struct {
|
||||
Store datastore.Datastore
|
||||
Pageviews chan *models.Pageview
|
||||
|
||||
// buffer vars
|
||||
updates []*models.Pageview
|
||||
inserts []*models.Pageview
|
||||
sizeu int
|
||||
sizei int
|
||||
}
|
||||
|
||||
func NewCollector(store datastore.Datastore) *Collector {
|
||||
bufferCap := 512 // persist every 512 pageviews
|
||||
bufferTimeout := 600 * time.Millisecond // or every 600ms
|
||||
|
||||
c := &Collector{
|
||||
Store: store,
|
||||
Pageviews: make(chan *models.Pageview),
|
||||
updates: make([]*models.Pageview, bufferCap),
|
||||
inserts: make([]*models.Pageview, bufferCap),
|
||||
sizeu: 0,
|
||||
sizei: 0,
|
||||
}
|
||||
go c.aggregate()
|
||||
go c.worker(bufferCap, bufferTimeout)
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *Collector) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if !shouldCollect(r) {
|
||||
return
|
||||
}
|
||||
|
||||
q := r.URL.Query()
|
||||
now := time.Now()
|
||||
|
||||
// get pageview details
|
||||
pageview := &models.Pageview{
|
||||
ID: q.Get("id"),
|
||||
Hostname: parseHostname(q.Get("h")),
|
||||
Pathname: parsePathname(q.Get("p")),
|
||||
IsNewVisitor: q.Get("nv") == "1",
|
||||
IsNewSession: q.Get("ns") == "1",
|
||||
IsUnique: q.Get("u") == "1",
|
||||
IsBounce: q.Get("b") != "0",
|
||||
Referrer: parseReferrer(q.Get("r")),
|
||||
Duration: 0,
|
||||
Timestamp: now,
|
||||
}
|
||||
|
||||
// push pageview onto channel to be inserted (in batch) later
|
||||
c.Pageviews <- pageview
|
||||
|
||||
// indicate that we're not tracking user data, see https://github.com/usefathom/fathom/issues/65
|
||||
w.Header().Set("Tk", "N")
|
||||
|
||||
// headers to prevent caching
|
||||
w.Header().Set("Content-Type", "image/gif")
|
||||
w.Header().Set("Expires", "Mon, 01 Jan 1990 00:00:00 GMT")
|
||||
w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
|
||||
w.Header().Set("Pragma", "no-cache")
|
||||
|
||||
// response, 1x1 px transparent GIF
|
||||
w.WriteHeader(http.StatusOK)
|
||||
b, _ := base64.StdEncoding.DecodeString("R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7")
|
||||
w.Write(b)
|
||||
|
||||
// find previous pageview by same visitor
|
||||
previousPageviewID := q.Get("pid")
|
||||
if !pageview.IsNewSession && previousPageviewID != "" {
|
||||
previousPageview, err := c.Store.GetPageview(previousPageviewID)
|
||||
if err != nil && err != datastore.ErrNoResults {
|
||||
log.Errorf("error getting previous pageview: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
// if we have a recent pageview that is less than 30 minutes old
|
||||
if previousPageview != nil && previousPageview.Timestamp.After(now.Add(-30*time.Minute)) {
|
||||
previousPageview.Duration = (now.Unix() - previousPageview.Timestamp.Unix())
|
||||
previousPageview.IsBounce = false
|
||||
|
||||
// push onto channel to be updated (in batch) later
|
||||
c.Pageviews <- previousPageview
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Collector) aggregate() {
|
||||
agg := aggregator.New(c.Store)
|
||||
timeout := 1 * time.Minute
|
||||
|
||||
agg.Run()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-time.After(timeout):
|
||||
agg.Run()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Collector) worker(cap int, timeout time.Duration) {
|
||||
var size int
|
||||
|
||||
for {
|
||||
select {
|
||||
// persist pageviews in buffer when buffer at capacity
|
||||
case p := <-c.Pageviews:
|
||||
size = c.buffer(p)
|
||||
if size >= cap {
|
||||
c.persist()
|
||||
}
|
||||
|
||||
// or after timeout passed
|
||||
case <-time.After(timeout):
|
||||
c.persist()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Collector) buffer(p *models.Pageview) int {
|
||||
// a bounce is always an insert
|
||||
if p.IsBounce {
|
||||
c.inserts[c.sizei] = p
|
||||
c.sizei++
|
||||
} else {
|
||||
c.updates[c.sizeu] = p
|
||||
c.sizeu++
|
||||
}
|
||||
|
||||
return (c.sizeu + c.sizei)
|
||||
}
|
||||
|
||||
func (c *Collector) persist() {
|
||||
if (c.sizeu + c.sizei) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("persisting %d pageviews (%d inserts, %d updates)", (c.sizeu + c.sizei), c.sizei, c.sizeu)
|
||||
|
||||
if err := c.Store.InsertPageviews(c.inserts[0:c.sizei]); err != nil {
|
||||
log.Errorf("error inserting pageviews: %s", err)
|
||||
}
|
||||
|
||||
if err := c.Store.UpdatePageviews(c.updates[0:c.sizeu]); err != nil {
|
||||
log.Errorf("error updating pageviews: %s", err)
|
||||
}
|
||||
|
||||
// reset buffer
|
||||
c.sizei = 0
|
||||
c.sizeu = 0
|
||||
}
|
||||
|
||||
func shouldCollect(r *http.Request) bool {
|
||||
// abort if DNT header is set to "1" (these should have been filtered client-side already)
|
||||
if r.Header.Get("DNT") == "1" {
|
||||
@ -69,134 +223,3 @@ func parseHostname(r string) string {
|
||||
|
||||
return u.Scheme + "://" + u.Host
|
||||
}
|
||||
|
||||
func (api *API) NewCollectHandler() http.Handler {
|
||||
pageviews := make(chan *models.Pageview, 10)
|
||||
go aggregate(api.database)
|
||||
go collect(api.database, pageviews)
|
||||
|
||||
return HandlerFunc(func(w http.ResponseWriter, r *http.Request) error {
|
||||
if !shouldCollect(r) {
|
||||
return nil
|
||||
}
|
||||
|
||||
q := r.URL.Query()
|
||||
now := time.Now()
|
||||
|
||||
// get pageview details
|
||||
pageview := &models.Pageview{
|
||||
ID: q.Get("id"),
|
||||
Hostname: parseHostname(q.Get("h")),
|
||||
Pathname: parsePathname(q.Get("p")),
|
||||
IsNewVisitor: q.Get("nv") == "1",
|
||||
IsNewSession: q.Get("ns") == "1",
|
||||
IsUnique: q.Get("u") == "1",
|
||||
IsBounce: q.Get("b") != "0",
|
||||
Referrer: parseReferrer(q.Get("r")),
|
||||
Duration: 0,
|
||||
Timestamp: now,
|
||||
}
|
||||
|
||||
// find previous pageview by same visitor
|
||||
previousPageviewID := q.Get("pid")
|
||||
if !pageview.IsNewSession && previousPageviewID != "" {
|
||||
previousPageview, err := api.database.GetPageview(previousPageviewID)
|
||||
if err != nil && err != datastore.ErrNoResults {
|
||||
return err
|
||||
}
|
||||
|
||||
// if we have a recent pageview that is less than 30 minutes old
|
||||
if previousPageview != nil && previousPageview.Timestamp.After(now.Add(-30*time.Minute)) {
|
||||
previousPageview.Duration = (now.Unix() - previousPageview.Timestamp.Unix())
|
||||
previousPageview.IsBounce = false
|
||||
|
||||
// push onto channel to be updated (in batch) later
|
||||
pageviews <- previousPageview
|
||||
}
|
||||
}
|
||||
|
||||
// push pageview onto channel to be inserted (in batch) later
|
||||
pageviews <- pageview
|
||||
|
||||
// indicate that we're not tracking user data, see https://github.com/usefathom/fathom/issues/65
|
||||
w.Header().Set("Tk", "N")
|
||||
|
||||
// headers to prevent caching
|
||||
w.Header().Set("Content-Type", "image/gif")
|
||||
w.Header().Set("Expires", "Mon, 01 Jan 1990 00:00:00 GMT")
|
||||
w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
|
||||
w.Header().Set("Pragma", "no-cache")
|
||||
|
||||
// response
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
||||
// 1x1 px transparent GIF
|
||||
b, _ := base64.StdEncoding.DecodeString("R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7")
|
||||
w.Write(b)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// runs the aggregate func every minute
|
||||
func aggregate(db datastore.Datastore) {
|
||||
agg := aggregator.New(db)
|
||||
agg.Run()
|
||||
|
||||
timeout := 1 * time.Minute
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-time.After(timeout):
|
||||
agg.Run()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func collect(db datastore.Datastore, pageviews chan *models.Pageview) {
|
||||
var size = 800
|
||||
var timeout = 600 * time.Millisecond
|
||||
var buffer = make([]*models.Pageview, 0)
|
||||
|
||||
for {
|
||||
select {
|
||||
case pageview := <-pageviews:
|
||||
buffer = append(buffer, pageview)
|
||||
if len(buffer) >= size {
|
||||
persist(db, buffer)
|
||||
buffer = make([]*models.Pageview, 0)
|
||||
}
|
||||
case <-time.After(timeout):
|
||||
if len(buffer) > 0 {
|
||||
persist(db, buffer)
|
||||
buffer = make([]*models.Pageview, 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func persist(db datastore.Datastore, pageviews []*models.Pageview) {
|
||||
n := len(pageviews)
|
||||
updates := make([]*models.Pageview, 0, n)
|
||||
inserts := make([]*models.Pageview, 0, n)
|
||||
|
||||
for i := range pageviews {
|
||||
if !pageviews[i].IsBounce {
|
||||
updates = append(updates, pageviews[i])
|
||||
} else {
|
||||
inserts = append(inserts, pageviews[i])
|
||||
}
|
||||
}
|
||||
|
||||
log.Debugf("persisting %d pageviews (%d inserts, %d updates)", len(pageviews), len(inserts), len(updates))
|
||||
|
||||
var err error
|
||||
err = db.InsertPageviews(inserts)
|
||||
if err != nil {
|
||||
log.Errorf("error inserting pageviews: %s", err)
|
||||
}
|
||||
|
||||
err = db.UpdatePageviews(updates)
|
||||
if err != nil {
|
||||
log.Errorf("error updating pageviews: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -1,15 +1,16 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gobuffalo/packr"
|
||||
"github.com/gorilla/mux"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func (api *API) Routes() *mux.Router {
|
||||
// register routes
|
||||
r := mux.NewRouter()
|
||||
r.Handle("/collect", api.NewCollectHandler()).Methods(http.MethodGet)
|
||||
r.Handle("/collect", NewCollector(api.database)).Methods(http.MethodGet)
|
||||
r.Handle("/api/session", HandlerFunc(api.LoginHandler)).Methods(http.MethodPost)
|
||||
r.Handle("/api/session", HandlerFunc(api.LogoutHandler)).Methods(http.MethodDelete)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user