Merge pull request #9 from tevino/refactor

Refactor
This commit is contained in:
Tevin 2019-02-12 16:40:31 +08:00 committed by GitHub
commit 32a4a549c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 715 additions and 439 deletions

View File

@ -1,40 +1,46 @@
# TCP Checker :heartbeat:
[![Go Report Card](https://goreportcard.com/badge/github.com/tevino/tcp-shaker)](https://goreportcard.com/report/github.com/tevino/tcp-shaker)
[![GoDoc](https://godoc.org/github.com/tevino/tcp-shaker?status.svg)](https://godoc.org/github.com/tevino/tcp-shaker)
Performing TCP handshake without ACK, useful for health checking.
This package is used to perform TCP handshake without ACK, which is useful for TCP health checking.
HAProxy does this exactly the same, which is:
- SYN
- SYN-ACK
- RST
1. SYN
2. SYN-ACK
3. RST
## Why do I have to do this?
Usually when you establish a TCP connection(e.g. `net.Dial`), these are the first three packets (TCP three-way handshake):
## Why do I have to do this
- Client -> Server: SYN
- Server -> Client: SYN-ACK
- Client -> Server: ACK
In most cases when you establish a TCP connection(e.g. via `net.Dial`), these are the first three packets between the client and server([TCP three-way handshake][tcp-handshake]):
1. Client -> Server: SYN
2. Server -> Client: SYN-ACK
3. Client -> Server: ACK
**This package tries to avoid the last ACK when doing handshakes.**
By sending the last ACK, the connection is considered established.
However as for TCP health checking the last ACK may not necessary.
However, as for TCP health checking the server could be considered alive right after it sends back SYN-ACK,
The Server could be considered alive after it sends back SYN-ACK.
that renders the last ACK unnecessary or even harmful in some cases.
### Benefits
By avoiding the last ACK
### Benefits of avoiding the last ACK:
1. Fewer packets, better efficiency
2. The health checking is less obvious
The second one is essential, because it bothers server less.
The second one is essential because it bothers the server less.
Usually this means the server will not notice the health checking traffic at all, **thus the act of health checking will not be
considered as some misbehaviour of client.**
This means the application level server will not notice the health checking traffic at all, **thus the act of health checking will not be
considered as some misbehavior of client.**
## Requirements
## Requirements:
- Linux 2.4 or newer
There is a **fake implementation** for **non-Linux** platform which is equivalent to:
@ -44,30 +50,40 @@ conn.Close()
```
## Usage
```go
import "github.com/tevino/tcp-shaker"
c := tcp.NewChecker(true)
if err := c.InitChecker(); err != nil {
log.Fatal("Checker init failed:", err)
c := NewChecker()
ctx, stopChecker := context.WithCancel(context.Background())
defer stopChecker()
go func() {
if err := c.CheckingLoop(ctx); err != nil {
fmt.Println("checking loop stopped due to fatal error: ", err)
}
}()
<-c.WaitReady()
timeout := time.Second * 1
err := c.CheckAddr("google.com:80", timeout)
switch err {
case tcp.ErrTimeout:
case ErrTimeout:
fmt.Println("Connect to Google timed out")
case nil:
fmt.Println("Connect to Google succeeded")
default:
if e, ok := err.(*tcp.ErrConnect); ok {
fmt.Println("Connect to Google failed:", e)
} else {
fmt.Println("Error occurred while connecting:", err)
}
fmt.Println("Error occurred while connecting: ", err)
}
```
## TODO:
## TODO
- [ ] IPv6 support (Test environment needed, PRs are welcome)
## Special thanks to contributors
- @lujjjh
[tcp-handshake]: https://en.wikipedia.org/wiki/Handshaking#TCP_three-way_handshake

View File

@ -1,7 +1,9 @@
package main
import (
"context"
"flag"
"fmt"
"log"
"net"
"os"
@ -10,7 +12,7 @@ import (
"sync/atomic"
"time"
"github.com/tevino/tcp-shaker"
tcp "github.com/tevino/tcp-shaker"
)
// Counter is an atomic counter for multiple metrics.
@ -60,6 +62,7 @@ type Config struct {
Timeout time.Duration
Requests int
Concurrency int
Verbose bool
}
func parseConfig() *Config {
@ -70,6 +73,7 @@ func parseConfig() *Config {
flag.StringVar(&conf.Addr, "a", "google.com:80", "TCP address to test")
flag.IntVar(&conf.Requests, "n", 1, "Number of requests to perform")
flag.IntVar(&conf.Concurrency, "c", 1, "Number of checks to perform simultaneously")
flag.BoolVar(&conf.Verbose, "v", false, "Print more logs e.g. error detail")
// Parse flags
flag.Parse()
if _, err := net.ResolveTCPAddr("tcp", conf.Addr); err != nil {
@ -101,7 +105,7 @@ func NewConcurrentChecker(conf *Config) *ConcurrentChecker {
return &ConcurrentChecker{
conf: conf,
counter: NewCounter(CRequest, CSucceed, CErrConnect, CErrTimeout, CErrOther),
checker: tcp.NewChecker(true),
checker: tcp.NewChecker(),
queue: make(chan bool),
closed: make(chan bool),
}
@ -114,20 +118,29 @@ func (cc *ConcurrentChecker) Count(i int) uint64 {
}
// Launch initialize the checker.
func (cc *ConcurrentChecker) Launch() error {
if err := cc.checker.InitChecker(); err != nil {
return err
}
// Launch starts the checking loop and the workers, waits until the checker is
// ready, then queues conf.Requests checks for the workers.
//
// The checking loop runs until ctx is canceled. A non-nil error from the loop
// is fatal and terminates the process; a nil return (the normal result of
// canceling ctx) is ignored so that a clean shutdown does not abort the program.
//
// Launch itself always returns nil; the error result is kept for callers.
func (cc *ConcurrentChecker) Launch(ctx context.Context) error {
	go func() {
		// CheckingLoop returns nil when ctx is canceled; only a real
		// failure should bring the process down.
		if err := cc.checker.CheckingLoop(ctx); err != nil {
			log.Fatal("Error during checking loop: ", err)
		}
	}()

	for i := 0; i < cc.conf.Concurrency; i++ {
		go cc.worker()
	}
	cc.wg.Add(cc.conf.Requests)

	if cc.conf.Verbose {
		fmt.Println("Waiting for checker to be ready")
	}
	<-cc.checker.WaitReady()

	// Feed the queue asynchronously so Launch does not block on the workers.
	go func() {
		for i := 0; i < cc.conf.Requests; i++ {
			cc.queue <- true
		}
	}()
	return nil
}
func (cc *ConcurrentChecker) doCheck() {
@ -139,6 +152,9 @@ func (cc *ConcurrentChecker) doCheck() {
case nil:
cc.counter.Inc(CSucceed)
default:
if cc.conf.Verbose {
fmt.Println(err)
}
if _, ok := err.(*tcp.ErrConnect); ok {
cc.counter.Inc(CErrConnect)
} else {
@ -196,7 +212,8 @@ Concurrency: %d`, conf.Addr, conf.Timeout, conf.Requests, conf.Concurrency)
go setupSignal(exit)
startedAt := time.Now()
if err := checker.Launch(); err != nil {
var ctx, cancel = context.WithCancel(context.Background())
if err := checker.Launch(ctx); err != nil {
log.Fatal("Initializing failed: ", err)
}
select {
@ -205,6 +222,10 @@ Concurrency: %d`, conf.Addr, conf.Timeout, conf.Requests, conf.Concurrency)
}
duration := time.Now().Sub(startedAt)
if conf.Verbose {
log.Println("Canceling checking loop")
}
cancel()
log.Println("")
log.Printf("Finished %d/%d checks in %s\n", checker.Count(CRequest), conf.Requests, duration)

82
check_linux_test.go Normal file
View File

@ -0,0 +1,82 @@
package tcp
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"
)
// TestCheckerReadyOK checks that a new Checker starts out not ready and that
// its ready channel fires within one second of starting the checking loop.
func TestCheckerReadyOK(t *testing.T) {
	t.Parallel()

	checker := NewChecker()
	assert(t, !checker.IsReady())

	loopCtx, stopLoop := context.WithCancel(context.Background())
	defer stopLoop()
	go checker.CheckingLoop(loopCtx)

	expired := time.After(time.Second)
	ready := checker.WaitReady()
	select {
	case <-ready:
		// became ready in time
	case <-expired:
		t.FailNow()
	}
}
// TestStopNStartChecker verifies that a Checker whose loop has been stopped
// can be started again and still performs checks correctly.
//
// The result of each CheckingLoop run is sent back over a channel and examined
// on the test goroutine: t.Fatal/FailNow must not be called from other
// goroutines, and doing so would also skip the send and deadlock the test.
func TestStopNStartChecker(t *testing.T) {
	t.Parallel()

	// Create checker
	c := NewChecker()

	// loopResult carries the return value of each CheckingLoop run.
	loopResult := make(chan error)
	runLoop := func(ctx context.Context) {
		loopResult <- c.CheckingLoop(ctx)
	}

	// Start the checker, then stop it right away.
	ctx, cancel := context.WithCancel(context.Background())
	go runLoop(ctx)
	cancel()
	if err := <-loopResult; err != nil {
		t.Fatalf("first checking loop returned an error: %v", err)
	}

	// Start the checker again.
	ctx, cancel = context.WithCancel(context.Background())
	go runLoop(ctx)
	defer func() {
		cancel()
		// Error (not Fatal): this runs in a deferred call, possibly while
		// the test is already unwinding from an earlier failure.
		if err := <-loopResult; err != nil {
			t.Errorf("second checking loop returned an error: %v", err)
		}
	}()

	// Ensure the check works
	_testChecker(t, c)
}
// _testChecker runs the standard set of checks against c: a refused
// connection, a live local HTTP server, and a non-routable address that is
// expected to time out. It must be called from the test goroutine.
func _testChecker(t *testing.T, c *Checker) {
	// Fail loudly instead of continuing with a checker that never became ready.
	select {
	case <-c.WaitReady():
	case <-time.After(time.Second):
		t.Fatal("checker was not ready within one second")
	}

	timeout := time.Second * 2

	// Check dead server
	err := c.CheckAddr("127.0.0.1:1", timeout)
	_, ok := err.(*ErrConnect)
	assert(t, ok)

	// Launch a server for test; close it on exit so the listener and its
	// goroutines do not outlive the test.
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
	defer ts.Close()

	// Check alive server
	err = c.CheckAddr(ts.Listener.Addr().String(), timeout)
	assert(t, err == nil)

	// Check non-routable address, thus timeout
	err = c.CheckAddr("10.0.0.0:1", timeout)
	assert(t, err == ErrTimeout)
}

View File

@ -1,47 +0,0 @@
// +build !linux
package tcp
import (
"net"
"time"
)
// Checker is a fake implementation.
type Checker struct {
zeroLinger bool
}
// NewChecker creates a Checker with linger set to zero or not.
func NewChecker(zeroLinger bool) *Checker {
return &Checker{zeroLinger: zeroLinger}
}
// InitChecker is unnecessary on this platform.
func (s *Checker) InitChecker() error { return nil }
// CheckAddr performs a TCP check with given TCP address and timeout.
// NOTE: zeroLinger is ignored on non-POSIX operating systems because
// net.TCPConn.SetLinger is only implemented in src/net/sockopt_posix.go.
func (s *Checker) CheckAddr(addr string, timeout time.Duration, zeroLinger ...bool) error {
conn, err := net.DialTimeout("tcp", addr, timeout)
if conn != nil {
if (len(zeroLinger) > 0 && zeroLinger[0]) || s.zeroLinger {
// Simply ignore the error since this is a fake implementation.
conn.(*net.TCPConn).SetLinger(0)
}
conn.Close()
}
if opErr, ok := err.(*net.OpError); ok {
if opErr.Timeout() {
return ErrTimeout
}
}
return err
}
// Ready is always true on this platform.
func (s *Checker) Ready() bool { return true }
// Close is unnecessary on this platform.
func (s *Checker) Close() error { return nil }

View File

@ -1,74 +1,138 @@
// +build linux
// Package tcp is used to perform TCP handshake without ACK,
// useful for health checking, HAProxy does this exactly the same.
// Which is SYN, SYN-ACK, RST.
//
// Why do I have to do this?
// Usually when you establish a TCP connection(e.g. net.Dial), these
// are the first three packets (TCP three-way handshake):
//
// SYN: Client -> Server
// SYN-ACK: Server -> Client
// ACK: Client -> Server
//
// This package tries to avoid the last ACK when doing handshakes.
//
// By sending the last ACK, the connection is considered established.
// However as for TCP health checking the last ACK may not necessary.
// The Server could be considered alive after it sends back SYN-ACK.
//
// Benefits of avoiding the last ACK:
//
// 1. Less packets better efficiency
//
// 2. The health checking is less obvious
//
// The second one is essential, because it bothers server less.
// Usually this means the server will not notice the health checking
// traffic at all, thus the act of health checking will not be
// considered as some misbehaviour of client.
//
// Checker's methods may be called by multiple goroutines simultaneously.
package tcp
import (
"os"
"runtime"
"context"
"sync"
"sync/atomic"
"syscall"
"time"
)
const maxEpollEvents = 32
"github.com/pkg/errors"
)
// Checker contains an epoll instance for TCP handshake checking
type Checker struct {
sync.RWMutex
epollFd int
pipePool
pollerLock sync.Mutex
_pollerFd int32
zeroLinger bool
fdResultPipes sync.Map
isReady chan struct{}
}
// NewChecker creates a Checker with linger set to zero or not
func NewChecker(zeroLinger bool) *Checker {
return &Checker{zeroLinger: zeroLinger}
// NewChecker creates a Checker with linger set to zero.
func NewChecker() *Checker {
return NewCheckerZeroLinger(true)
}
// InitChecker creates inner epoll instance, call this before anything else
func (s *Checker) InitChecker() error {
var err error
s.Lock()
defer s.Unlock()
// Check if we already initialized
if s.epollFd > 0 {
return nil
// NewCheckerZeroLinger creates a Checker with zeroLinger set to given value.
func NewCheckerZeroLinger(zeroLinger bool) *Checker {
return &Checker{
pipePool: newPipePool(),
_pollerFd: -1,
zeroLinger: zeroLinger,
isReady: make(chan struct{}),
}
// Create epoll instance
s.epollFd, err = syscall.EpollCreate1(0)
}
// CheckingLoop must be called before anything else.
// NOTE: this function blocks until ctx got canceled.
func (c *Checker) CheckingLoop(ctx context.Context) error {
pollerFd, err := c.createPoller()
if err != nil {
return os.NewSyscallError("epoll_create1", err)
return errors.Wrap(err, "error creating poller")
}
defer c.closePoller()
c.setReady()
defer c.resetReady()
return c.pollingLoop(ctx, pollerFd)
}
// createPoller creates a new poller and records its descriptor on the Checker
// while holding pollerLock. It returns ErrCheckerAlreadyStarted if a positive
// descriptor is already recorded, and -1 together with the error on any failure.
func (c *Checker) createPoller() (int, error) {
	c.pollerLock.Lock()
	defer c.pollerLock.Unlock()

	if current := c.pollerFD(); current > 0 {
		// a poller is already recorded
		return -1, ErrCheckerAlreadyStarted
	}

	newFd, createErr := createPoller()
	if createErr != nil {
		return -1, createErr
	}

	c.setPollerFD(newFd)
	return newFd, nil
}
// closePoller closes the recorded poller descriptor (when it is positive) and
// resets the stored value to -1, all under pollerLock. The error from close,
// if any, is returned.
func (c *Checker) closePoller() error {
	c.pollerLock.Lock()
	defer c.pollerLock.Unlock()

	var closeErr error
	if fd := c.pollerFD(); fd > 0 {
		closeErr = syscall.Close(fd)
	}
	c.setPollerFD(-1)
	return closeErr
}
// setReady signals readiness by closing the isReady channel, which releases
// every receiver waiting on it.
func (c *Checker) setReady() {
close(c.isReady)
}
// resetReady replaces isReady with a fresh, open channel so that readiness can
// be signalled again by a later setReady.
// NOTE(review): the field is assigned here without a lock or atomic, while
// WaitReady reads the same field — confirm the two cannot run concurrently.
func (c *Checker) resetReady() {
c.isReady = make(chan struct{})
}
const pollerTimeout = time.Second
// pollingLoop repeatedly polls pollerFd (waiting up to pollerTimeout each
// round) and dispatches the resulting events. It returns nil once ctx is done
// and a wrapped error as soon as polling fails.
func (c *Checker) pollingLoop(ctx context.Context, pollerFd int) error {
	// ctx.Err() becomes non-nil exactly when ctx.Done() is closed.
	for ctx.Err() == nil {
		evts, pollErr := pollEvents(pollerFd, pollerTimeout)
		if pollErr != nil {
			// fatal error
			return errors.Wrap(pollErr, "error during polling loop")
		}
		c.handlePollerEvents(evts)
	}
	return nil
}
// handlePollerEvents forwards the Err of each event to the result pipe
// registered for that event's fd. The registration is removed as part of the
// lookup, so each registered pipe receives at most one value per registration.
func (c *Checker) handlePollerEvents(evts []event) {
for _, e := range evts {
if pipe, exists := c.popErrPipe(e.Fd); exists {
// NOTE(review): this send blocks if the pipe is already full — confirm
// pipes are always buffered and empty when registered.
pipe <- e.Err
}
// error pipe not found
// in this case, e.Fd should have been handled in the previous event.
}
}
// popErrPipe looks up the result pipe stored for fd and removes the entry.
// The boolean reports whether an entry existed; the channel is nil when there
// was no entry or the stored value was not a chan error.
func (c *Checker) popErrPipe(fd int) (chan error, bool) {
	stored, found := c.fdResultPipes.Load(fd)
	if !found {
		return nil, false
	}
	c.fdResultPipes.Delete(fd)

	if stored == nil {
		return nil, true
	}
	return stored.(chan error), true
}
// pollerFD atomically loads the stored poller file descriptor.
func (c *Checker) pollerFD() int {
return int(atomic.LoadInt32(&c._pollerFd))
}
// setPollerFD atomically stores fd as the poller file descriptor; the value is
// narrowed to int32 for storage.
func (c *Checker) setPollerFD(fd int) {
atomic.StoreInt32(&c._pollerFd, int32(fd))
}
// CheckAddr performs a TCP check with given TCP address and timeout
@ -77,183 +141,88 @@ func (s *Checker) InitChecker() error {
// zeroLinger is an optional parameter indicating if linger should be set to zero
// for this particular connection
// Note: timeout includes domain resolving
func (s *Checker) CheckAddr(addr string, timeout time.Duration, zeroLinger ...bool) (err error) {
func (c *Checker) CheckAddr(addr string, timeout time.Duration) (err error) {
return c.CheckAddrZeroLinger(addr, timeout, c.zeroLinger)
}
func (c *Checker) CheckAddrZeroLinger(addr string, timeout time.Duration, zeroLinger bool) error {
// Set deadline
deadline := time.Now().Add(timeout)
// Parse address
var rAddr syscall.Sockaddr
if rAddr, err = parseSockAddr(addr); err != nil {
rAddr, err := parseSockAddr(addr)
if err != nil {
return err
}
// Create socket with options set
var fd int
if fd, err = s.createSocket(zeroLinger...); err != nil {
return
}
defer func() {
// Socket should be closed anyway
cErr := syscall.Close(fd)
// Error from close should be returned if no other error happened
if err == nil {
err = cErr
}
}()
var retry bool
// Connect to the address
for {
retry, err = s.doConnect(fd, rAddr)
if !retry {
break
}
time.Sleep(time.Millisecond) // TODO: Better idea?
// Check if the deadline was hit
if reached(deadline) {
return ErrTimeout
}
}
fd, err := createSocketZeroLinger(zeroLinger)
if err != nil {
return &ErrConnect{err}
}
// Check if the deadline was hit
if reached(deadline) {
return ErrTimeout
}
// Register to epoll for later error checking
if err = s.registerFd(fd); err != nil {
return
}
// Check for connect error
var succeed bool
var timeoutMS = int(timeout.Nanoseconds() / 1000000)
for {
succeed, err = s.waitForConnected(fd, timeoutMS)
// Check if the deadline was hit
if reached(deadline) {
return ErrTimeout
}
if succeed || err != nil {
break
}
}
return
}
// Ready returns a bool indicates whether the Checker is ready for use
func (s *Checker) Ready() bool {
s.RLock()
defer s.RUnlock()
return s.epollFd > 0
}
// Close closes the inner epoll fd
// InitChecker needs to be called before reuse of the closed Checker
func (s *Checker) Close() error {
s.Lock()
defer s.Unlock()
if s.epollFd > 0 {
err := syscall.Close(s.epollFd)
s.epollFd = 0
return err
}
// Socket should be closed anyway
defer syscall.Close(fd)
// Connect to the address
if success, cErr := connect(fd, rAddr); cErr != nil {
// If there was an error, return it.
return &ErrConnect{cErr}
} else if success {
// If the connect was successful, we are done.
return nil
}
// Otherwise wait for the result of connect.
return c.waitConnectResult(fd, deadline.Sub(time.Now()))
}
// EpollFd returns the inner fd of epoll instance
// NOTE: Use this only when you really know what you are doing
func (s *Checker) EpollFd() int {
s.RLock()
defer s.RUnlock()
return s.epollFd
}
func (c *Checker) waitConnectResult(fd int, timeout time.Duration) error {
// get a pipe of connect result
resultPipe := c.getPipe()
defer func() {
c.deregisterResultPipe(fd)
c.putBackPipe(resultPipe)
}()
// createSocket creats a socket with necessary options set
func (s *Checker) createSocket(zeroLinger ...bool) (fd int, err error) {
// Create socket
fd, err = createSocket()
// Set necessary options
if err == nil {
err = setSockOpts(fd)
}
// Set linger if zeroLinger or s.zeroLinger is on
if err == nil {
if (len(zeroLinger) > 0 && zeroLinger[0]) || s.zeroLinger {
err = setZeroLinger(fd)
}
}
return
}
// registerFd registers given fd to epoll with EPOLLOUT
func (s *Checker) registerFd(fd int) error {
var event syscall.EpollEvent
event.Events = syscall.EPOLLOUT
event.Fd = int32(fd)
s.RLock()
defer s.RUnlock()
if err := syscall.EpollCtl(s.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
return os.NewSyscallError("epoll_ctl", err)
}
return nil
}
// waitForConnected waits for epoll event of given fd with given timeout
// The boolean returned indicates whether the previous connect is successful
func (s *Checker) waitForConnected(fd int, timeoutMS int) (bool, error) {
var events [maxEpollEvents]syscall.EpollEvent
s.RLock()
epollFd := s.epollFd
if epollFd <= 0 {
return false, ErrNotInitialized
}
s.RUnlock()
nevents, err := syscall.EpollWait(epollFd, events[:], timeoutMS)
if err != nil {
return false, os.NewSyscallError("epoll_wait", err)
// this must be done before registerEvents
c.registerResultPipe(fd, resultPipe)
// Register to epoll for later error checking
if err := registerEvents(c.pollerFD(), fd); err != nil {
return err
}
for ev := 0; ev < nevents; ev++ {
if int(events[ev].Fd) == fd {
errCode, err := syscall.GetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_ERROR)
if err != nil {
return false, os.NewSyscallError("getsockopt", err)
}
if errCode != 0 {
return false, newErrConnect(errCode)
}
return true, nil
}
}
return false, nil
// Wait for connect result
return c.waitPipeTimeout(resultPipe, timeout)
}
// doConnect calls the connect syscall with error handled.
// NOTE: return value: needRetry, error
func (s *Checker) doConnect(fd int, addr syscall.Sockaddr) (bool, error) {
switch err := syscall.Connect(fd, addr); err {
case syscall.EINPROGRESS, syscall.EALREADY, syscall.EINTR:
// retry
return true, err
case nil, syscall.EISCONN:
// already connected
case syscall.EINVAL:
// On Solaris we can see EINVAL if the socket has
// already been accepted and closed by the server.
// Treat this as a successful connection--writes to
// the socket will see EOF. For details and a test
// case in C see https://golang.org/issue/6828.
if runtime.GOOS == "solaris" {
return false, nil
}
fallthrough
default:
return false, err
}
return false, nil
// deregisterResultPipe removes the result pipe entry stored for fd, if any.
func (c *Checker) deregisterResultPipe(fd int) {
c.fdResultPipes.Delete(fd)
}
// reached tests if the given deadline was hit
func reached(deadline time.Time) bool {
return !deadline.IsZero() && deadline.Before(time.Now())
// registerResultPipe stores pipe as the result pipe for fd, replacing any
// entry already stored under the same fd.
func (c *Checker) registerResultPipe(fd int, pipe chan error) {
// NOTE: the pipe should have been put back if c.fdResultPipes[fd] exists.
c.fdResultPipes.Store(fd, pipe)
}
// waitPipeTimeout waits for a value on pipe and returns it, or returns
// ErrTimeout if nothing arrives within timeout.
//
// An explicit timer is used instead of time.After so it can be stopped as soon
// as a result arrives; otherwise every check that finishes early would leave
// its timer alive until the full timeout elapses.
func (c *Checker) waitPipeTimeout(pipe chan error, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case ret := <-pipe:
		return ret
	case <-timer.C:
		return ErrTimeout
	}
}
// WaitReady returns a chan which is closed when the Checker is ready for use.
// The channel returned is the one currently stored on the Checker; resetReady
// installs a new one, so a channel obtained earlier may differ from a later one.
func (c *Checker) WaitReady() <-chan struct{} {
return c.isReady
}
// IsReady reports whether the Checker is ready for use, i.e. whether a
// positive poller descriptor is currently recorded.
func (c *Checker) IsReady() bool {
return c.pollerFD() > 0
}
// PollerFd returns the inner fd of poller instance.
// NOTE: Use this only when you really know what you are doing.
func (c *Checker) PollerFd() int {
return c.pollerFD()
}

View File

@ -1,125 +0,0 @@
package tcp
import (
"fmt"
"log"
"net/http"
"net/http/httptest"
"runtime"
"sync"
"testing"
"time"
)
// assert calls t.Fatal if the result is false
func assert(t *testing.T, result bool) {
if !result {
_, fileName, line, _ := runtime.Caller(1)
t.Fatalf("Test failed: %s:%d", fileName, line)
}
}
func ExampleChecker() {
s := NewChecker(true)
if err := s.InitChecker(); err != nil {
log.Fatal("Checker init failed:", err)
}
timeout := time.Second * 1
err := s.CheckAddr("google.com:80", timeout)
switch err {
case ErrTimeout:
fmt.Println("Connect to Google timed out")
case nil:
fmt.Println("Connect to Google succeeded")
default:
fmt.Println("Error occurred while connecting:", err)
}
}
func TestCheckAddr(t *testing.T) {
var err error
// Create checker
s := NewChecker(true)
if err := s.InitChecker(); err != nil {
t.Fatal("Checker init failed:", err)
}
timeout := time.Second * 2
// Check dead server
err = s.CheckAddr("127.0.0.1:1", timeout)
_, ok := err.(*ErrConnect)
assert(t, ok)
// Launch a server for test
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
// Check alive server
err = s.CheckAddr(ts.Listener.Addr().String(), timeout)
assert(t, err == nil)
ts.Close()
// Check non-routable address, thus timeout
err = s.CheckAddr("10.0.0.0:1", timeout)
assert(t, err == ErrTimeout)
}
func TestClose(t *testing.T) {
var err error
// Create checker
s := NewChecker(true)
assert(t, !s.Ready())
if err := s.InitChecker(); err != nil {
t.Fatal("Checker init failed:", err)
}
assert(t, s.Ready())
// Close the checker
err = s.Close()
assert(t, err == nil)
assert(t, !s.Ready())
// Init the checker again
err = s.InitChecker()
assert(t, err == nil)
assert(t, s.Ready())
timeout := time.Second * 2
// Check dead server
err = s.CheckAddr("127.0.0.1:1", timeout)
_, ok := err.(*ErrConnect)
assert(t, ok)
// Launch a server for test
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
// Check alive server
err = s.CheckAddr(ts.Listener.Addr().String(), timeout)
assert(t, err == nil)
ts.Close()
// Check non-routable address, thus timeout
err = s.CheckAddr("10.0.0.0:1", timeout)
assert(t, err == ErrTimeout)
}
func TestCheckAddrConcurrently(t *testing.T) {
// Create checker
s := NewChecker(true)
if err := s.InitChecker(); err != nil {
t.Fatal("Checker init failed:", err)
}
var wg sync.WaitGroup
tasks := make(chan bool, 10)
worker := func() {
for range tasks {
if err := s.CheckAddr("10.0.0.0:1", time.Second); err == nil {
t.Fatal("Concurrent testing failed")
}
wg.Done()
}
}
for i := 0; i < 10; i++ {
go worker()
}
for i := 0; i < 1000; i++ {
tasks <- true
wg.Add(1)
}
wg.Wait()
close(tasks)
}

69
checker_nonlinux.go Normal file
View File

@ -0,0 +1,69 @@
// +build !linux
package tcp
import (
"context"
"net"
"time"
)
// Checker is the fallback implementation for non-Linux platforms. It performs
// an ordinary dial-and-close instead of a handshake without the final ACK.
type Checker struct {
	zeroLinger bool          // default linger behaviour used by CheckAddr
	isReady    chan struct{} // closed at construction: this Checker is always ready
}

// NewChecker creates a Checker with linger set to zero.
func NewChecker() *Checker {
	const zeroLinger = true
	return NewCheckerZeroLinger(zeroLinger)
}

// NewCheckerZeroLinger creates a Checker with zeroLinger set to given value.
// The ready channel is closed right away, so WaitReady never blocks.
func NewCheckerZeroLinger(zeroLinger bool) *Checker {
	c := &Checker{
		zeroLinger: zeroLinger,
		isReady:    make(chan struct{}),
	}
	close(c.isReady)
	return c
}

// CheckingLoop does no work on this platform; it only blocks until ctx is
// done and then returns nil.
func (c *Checker) CheckingLoop(ctx context.Context) error {
	done := ctx.Done()
	<-done
	return nil
}

// CheckAddr performs a TCP check with given TCP address and timeout, using
// the Checker's default zeroLinger setting.
// NOTE: zeroLinger is ignored on non-POSIX operating systems because
// net.TCPConn.SetLinger is only implemented in src/net/sockopt_posix.go.
func (c *Checker) CheckAddr(addr string, timeout time.Duration) error {
	useZeroLinger := c.zeroLinger
	return c.CheckAddrZeroLinger(addr, timeout, useZeroLinger)
}

// CheckAddrZeroLinger is CheckAddr with an explicit zeroLinger parameter.
// It dials addr, closes the connection immediately, and returns ErrTimeout
// when the dial failed with a *net.OpError that reports a timeout; any other
// dial error is returned unchanged.
func (c *Checker) CheckAddrZeroLinger(addr string, timeout time.Duration, zeroLinger bool) error {
	conn, dialErr := net.DialTimeout("tcp", addr, timeout)
	if conn != nil {
		if zeroLinger {
			// Simply ignore the error since this is a fake implementation.
			tcpConn := conn.(*net.TCPConn)
			tcpConn.SetLinger(0)
		}
		conn.Close()
	}

	if opErr, isOpErr := dialErr.(*net.OpError); isOpErr && opErr.Timeout() {
		return ErrTimeout
	}
	return dialErr
}

// IsReady is always true on this platform.
func (c *Checker) IsReady() bool {
	return true
}

// WaitReady returns a closed chan on this platform.
func (c *Checker) WaitReady() <-chan struct{} {
	ready := c.isReady
	return ready
}

// Close is unnecessary on this platform and always returns nil.
func (c *Checker) Close() error {
	return nil
}

101
checker_test.go Normal file
View File

@ -0,0 +1,101 @@
package tcp
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"runtime"
"sync"
"testing"
"time"
)
// assert fails the test immediately (via t.Fatalf) when result is false,
// reporting the file and line of the caller.
//
// It is marked as a test helper so the testing package also attributes the
// failure to the calling line rather than to this function. Like t.Fatalf, it
// must only be called from the goroutine running the test.
func assert(t *testing.T, result bool) {
	t.Helper()
	if !result {
		_, fileName, line, _ := runtime.Caller(1)
		t.Fatalf("Test failed: %s:%d", fileName, line)
	}
}
// ExampleChecker shows the intended life cycle: start the checking loop in
// the background, wait for readiness, then perform a check with a timeout.
func ExampleChecker() {
	checker := NewChecker()

	loopCtx, stopChecker := context.WithCancel(context.Background())
	defer stopChecker()
	go func() {
		loopErr := checker.CheckingLoop(loopCtx)
		if loopErr != nil {
			fmt.Println("checking loop stopped due to fatal error: ", loopErr)
		}
	}()

	<-checker.WaitReady()

	const timeout = time.Second
	err := checker.CheckAddr("google.com:80", timeout)
	switch {
	case err == ErrTimeout:
		fmt.Println("Connect to Google timed out")
	case err == nil:
		fmt.Println("Connect to Google succeeded")
	default:
		fmt.Println("Error occurred while connecting: ", err)
	}
}
// TestCheckAddr exercises CheckAddr against a refused port, a live local HTTP
// server, and a non-routable address that is expected to time out.
func TestCheckAddr(t *testing.T) {
	t.Parallel()

	// Create and start the checker
	checker := NewChecker()
	loopCtx, stopLoop := context.WithCancel(context.Background())
	defer stopLoop()
	go checker.CheckingLoop(loopCtx)
	<-checker.WaitReady()

	const timeout = 2 * time.Second

	// Check dead server
	deadErr := checker.CheckAddr("127.0.0.1:1", timeout)
	if runtime.GOOS != "linux" {
		assert(t, deadErr != nil)
	} else {
		_, isConnectErr := deadErr.(*ErrConnect)
		assert(t, isConnectErr)
	}

	// Launch a server for test
	noop := func(w http.ResponseWriter, r *http.Request) {}
	server := httptest.NewServer(http.HandlerFunc(noop))

	// Check alive server
	aliveErr := checker.CheckAddr(server.Listener.Addr().String(), timeout)
	assert(t, aliveErr == nil)
	server.Close()

	// Check non-routable address, thus timeout
	timeoutErr := checker.CheckAddr("10.0.0.0:1", timeout)
	assert(t, timeoutErr == ErrTimeout)
}
// TestCheckAddrConcurrently runs 1000 checks against a non-routable address
// from separate goroutines and expects every one of them to fail.
func TestCheckAddrConcurrently(t *testing.T) {
	// Create checker
	c := NewChecker()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go c.CheckingLoop(ctx)
	// Wait for the loop to be ready so the checks exercise a started checker.
	<-c.WaitReady()

	var wg sync.WaitGroup
	check := func() {
		// Always release the WaitGroup, even when the check is reported as failed.
		defer wg.Done()
		if err := c.CheckAddr("10.0.0.0:1", time.Millisecond*50); err == nil {
			// t.Error, not t.Fatal: FailNow must only be called from the
			// goroutine running the test function.
			t.Error("Concurrent testing failed")
		}
	}

	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go check()
	}
	wg.Wait()
}

34
doc.go Normal file
View File

@ -0,0 +1,34 @@
/*
Package tcp is used to perform TCP handshake without ACK, which is useful for TCP health checking.
HAProxy does this exactly the same, which is:
1. SYN
2. SYN-ACK
3. RST
Why do I have to do this
In most cases when you establish a TCP connection(e.g. via net.Dial), these are the first three packets between the client and server(TCP three-way handshake):
1. Client -> Server: SYN
2. Server -> Client: SYN-ACK
3. Client -> Server: ACK
This package tries to avoid the last ACK when doing handshakes.
By sending the last ACK, the connection is considered established.
However, as for TCP health checking the server could be considered alive right after it sends back SYN-ACK, that renders the last ACK unnecessary or even harmful in some cases.
Benefits
By avoiding the last ACK
1. Fewer packets, better efficiency
2. The health checking is less obvious
The second one is essential because it bothers the server less.
This means the application level server will not notice the health checking traffic at all, thus the act of health checking will not be considered as some misbehavior of client.
*/
package tcp

6
err.go
View File

@ -8,9 +8,6 @@ import (
// ErrTimeout indicates I/O timeout
var ErrTimeout = &timeoutError{}
// ErrNotInitialized occurs while the Checker is not initialized
var ErrNotInitialized = errors.New("not initialized")
type timeoutError struct{}
func (e *timeoutError) Error() string { return "I/O timeout" }
@ -27,3 +24,6 @@ type ErrConnect struct {
func newErrConnect(errCode int) *ErrConnect {
return &ErrConnect{syscall.Errno(errCode)}
}
// ErrCheckerAlreadyStarted indicates there is another instance of CheckingLoop running.
var ErrCheckerAlreadyStarted = errors.New("Checker was already started")

6
event.go Normal file
View File

@ -0,0 +1,6 @@
package tcp
// event is the result reported by the poller for a single file descriptor.
type event struct {
// Fd is the file descriptor the event belongs to.
Fd int
// Err is the error associated with Fd; nil when no error was detected.
Err error
}

31
pipe_pool.go Normal file
View File

@ -0,0 +1,31 @@
package tcp
import "sync"
// pipePool hands out reusable error channels ("pipes") with a buffer of one,
// backed by a sync.Pool.
type pipePool struct {
	pool sync.Pool
}

// newPipePool returns a pipePool whose pool creates chan error values with
// capacity 1 on demand.
func newPipePool() pipePool {
	makePipe := func() interface{} {
		return make(chan error, 1)
	}
	return pipePool{pool: sync.Pool{New: makePipe}}
}

// getPipe takes a pipe from the pool, creating one if the pool is empty.
func (p *pipePool) getPipe() chan error {
	item := p.pool.Get()
	return item.(chan error)
}

// putBackPipe drains at most one pending value from pipe and returns it to
// the pool.
func (p *pipePool) putBackPipe(pipe chan error) {
	p.cleanPipe(pipe)
	p.pool.Put(pipe)
}

// cleanPipe removes at most one buffered value from pipe without blocking.
func (p *pipePool) cleanPipe(pipe chan error) {
	select {
	case <-pipe:
		// discarded a stale value
	default:
		// nothing buffered
	}
}

53
socket.go Normal file
View File

@ -0,0 +1,53 @@
package tcp
import (
"net"
"runtime"
"syscall"
)
// parseSockAddr resolves given addr ("host:port") to an IPv4 syscall.Sockaddr.
//
// Resolution errors are returned as-is. An address that resolves to a
// non-IPv4 IP (e.g. an IPv6 literal) is rejected with a *net.AddrError rather
// than being silently turned into 0.0.0.0, since only AF_INET sockaddrs are
// produced here. An address without a host part keeps the zero address.
func parseSockAddr(addr string) (syscall.Sockaddr, error) {
	tAddr, err := net.ResolveTCPAddr("tcp", addr)
	if err != nil {
		return nil, err
	}

	var addr4 [4]byte
	if len(tAddr.IP) > 0 {
		ip4 := tAddr.IP.To4()
		if ip4 == nil {
			// To4 returns nil for anything that is not an IPv4 address.
			return nil, &net.AddrError{Err: "non-IPv4 address", Addr: addr}
		}
		copy(addr4[:], ip4)
	}
	return &syscall.SockaddrInet4{Port: tAddr.Port, Addr: addr4}, nil
}
// connect issues the connect syscall on fd and classifies the outcome:
//
//   - (true, nil): the socket is connected (no error, or EISCONN); on Solaris
//     EINVAL is also treated as connected.
//   - (false, nil): the connection is still being established (EALREADY,
//     EINPROGRESS, EINTR).
//   - (false, err): any other error, reported unchanged.
func connect(fd int, addr syscall.Sockaddr) (success bool, err error) {
	serr := syscall.Connect(fd, addr)
	switch {
	case serr == nil, serr == syscall.EISCONN:
		// The specified socket is already connected.
		return true, nil
	case serr == syscall.EALREADY, serr == syscall.EINPROGRESS, serr == syscall.EINTR:
		// Connection could not be made immediately but asynchronously.
		return false, nil
	case serr == syscall.EINVAL && runtime.GOOS == "solaris":
		// On Solaris we can see EINVAL if the socket has
		// already been accepted and closed by the server.
		// Treat this as a successful connection--writes to
		// the socket will see EOF. For details and a test
		// case in C see https://golang.org/issue/6828.
		return true, nil
	}
	// Connect error (including EINVAL elsewhere): must be reported.
	return false, serr
}

View File

@ -1,34 +1,50 @@
// +build linux
package tcp
import (
"net"
"fmt"
"os"
"syscall"
"time"
)
// parseSockAddr resolves given addr to syscall.Sockaddr
func parseSockAddr(addr string) (syscall.Sockaddr, error) {
tAddr, err := net.ResolveTCPAddr("tcp", addr)
const maxEpollEvents = 32
// createSocketZeroLinger creates a non-blocking socket and, when zeroLinger is
// true, sets SO_LINGER with a zero timeout on it.
//
// On success it returns the socket's fd, which the caller must close. If
// setting the linger option fails, the socket is closed here and -1 is
// returned with the error, so a failed call never leaks a descriptor.
func createSocketZeroLinger(zeroLinger bool) (fd int, err error) {
	// Create socket
	fd, err = _createNonBlockingSocket()
	if err != nil {
		return fd, err
	}
	if !zeroLinger {
		return fd, nil
	}
	if err = _setZeroLinger(fd); err != nil {
		// Callers only see the error and do not close fd themselves.
		syscall.Close(fd)
		return -1, err
	}
	return fd, nil
}
// createNonBlockingSocket creates a non-blocking socket with necessary options all set.
func _createNonBlockingSocket() (int, error) {
// Create socket
fd, err := _createSocket()
if err != nil {
return nil, err
return 0, err
}
var addr4 [4]byte
if tAddr.IP != nil {
copy(addr4[:], tAddr.IP.To4()) // copy last 4 bytes of slice to array
// Set necessary options
err = _setSockOpts(fd)
if err != nil {
syscall.Close(fd)
}
return &syscall.SockaddrInet4{Port: tAddr.Port, Addr: addr4}, nil
return fd, err
}
// createSocket creates a socket with CloseOnExec set
func createSocket() (int, error) {
// _createSocket creates an IPv4 stream socket with close-on-exec set.
// The close-on-exec flag is only applied to a successfully created socket; on
// failure the error from socket() is returned untouched.
func _createSocket() (int, error) {
	fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
	if err != nil {
		return fd, err
	}
	syscall.CloseOnExec(fd)
	return fd, nil
}
// setSockOpts sets SOCK_NONBLOCK and TCP_QUICKACK for given fd
func setSockOpts(fd int) error {
func _setSockOpts(fd int) error {
err := syscall.SetNonblock(fd, true)
if err != nil {
return err
@ -39,6 +55,56 @@ func setSockOpts(fd int) error {
var zeroLinger = syscall.Linger{Onoff: 1, Linger: 0}
// setLinger sets SO_Linger with 0 timeout to given fd
func setZeroLinger(fd int) error {
func _setZeroLinger(fd int) error {
return syscall.SetsockoptLinger(fd, syscall.SOL_SOCKET, syscall.SO_LINGER, &zeroLinger)
}
// createPoller creates an epoll instance with close-on-exec set and returns
// its fd. A failure is wrapped in an os.SyscallError named "epoll_create1".
func createPoller() (fd int, err error) {
	epfd, sysErr := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
	if sysErr != nil {
		return epfd, os.NewSyscallError("epoll_create1", sysErr)
	}
	return epfd, nil
}
// EPOLLET is the edge-triggered epoll flag, spelled as a positive untyped
// constant.
// NOTE(review): presumably defined locally because syscall.EPOLLET does not
// fit the uint32 Events field on this platform — verify.
const EPOLLET = 1 << 31

// registerEvents adds fd to the poller pollerFd, edge-triggered, for both
// readable and writable events. A failure is returned as an os.SyscallError
// whose name records both descriptors.
func registerEvents(pollerFd int, fd int) error {
	ev := syscall.EpollEvent{
		Events: syscall.EPOLLOUT | syscall.EPOLLIN | EPOLLET,
		Fd:     int32(fd),
	}
	ctlErr := syscall.EpollCtl(pollerFd, syscall.EPOLL_CTL_ADD, fd, &ev)
	if ctlErr == nil {
		return nil
	}
	call := fmt.Sprintf("epoll_ctl(%d, ADD, %d, ...)", pollerFd, fd)
	return os.NewSyscallError(call, ctlErr)
}
// pollEvents waits up to timeout (truncated to whole milliseconds) for events
// on pollerFd and returns one event per ready descriptor.
//
// For every ready descriptor SO_ERROR is read: a failing getsockopt is stored
// as an os.SyscallError, and a non-zero socket error code is stored as a
// connect error (taking precedence). An interrupted wait (EINTR) yields
// (nil, nil); any other wait failure is returned as an os.SyscallError.
func pollEvents(pollerFd int, timeout time.Duration) ([]event, error) {
	var raw [maxEpollEvents]syscall.EpollEvent
	waitMS := int(timeout / time.Millisecond)

	n, waitErr := syscall.EpollWait(pollerFd, raw[:], waitMS)
	switch {
	case waitErr == syscall.EINTR:
		return nil, nil
	case waitErr != nil:
		return nil, os.NewSyscallError("epoll_wait", waitErr)
	}

	results := make([]event, n)
	for i := range results {
		sockFd := int(raw[i].Fd)
		results[i].Fd = sockFd

		errCode, optErr := syscall.GetsockoptInt(sockFd, syscall.SOL_SOCKET, syscall.SO_ERROR)
		if optErr != nil {
			results[i].Err = os.NewSyscallError("getsockopt", optErr)
		}
		if errCode != 0 {
			results[i].Err = newErrConnect(errCode)
		}
	}
	return results, nil
}