Dmitry eeca435064 Add rendezvous implementation for discovery interface
Update vendor

Integrate rendezvous into status node

Add a test with failover using rendezvous

Use multiple servers in client

Use discovery V5 by default and test that node can be started with rendezvous discovet

Fix linter

Update rendezvous client to one with instrumented stream

Address feedback

Fix test with updated topic limits

Apply several suggestions

Change log to debug for request errors because we continue execution

Remove web3js after rebase

Update rendezvous package
2018-07-25 15:10:57 +03:00

203 lines
3.7 KiB
Go

// +build linux
package singlepoll
import (
"context"
"errors"
"golang.org/x/sys/unix"
"sync"
"github.com/gxed/eventfd"
logging "github.com/ipfs/go-log"
)
var (
ErrUnsupportedMode error = errors.New("only 'w' and 'r' modes are supported on this arch")
)
var (
initOnce sync.Once
workChan chan interface{}
log logging.EventLogger = logging.Logger("reuseport-poll")
)
type addPoll struct {
fd int
events uint32
ctx context.Context
wakeUp chan<- error
}
type ctxDone struct {
fd int
}
func PollPark(reqctx context.Context, fd int, mode string) error {
initOnce.Do(func() {
workChan = make(chan interface{}, 128)
go worker()
})
events := uint32(0)
for _, c := range mode {
switch c {
case 'w':
events |= unix.EPOLLOUT
case 'r':
events |= unix.EPOLLIN
default:
return ErrUnsupportedMode
}
}
wakeUp := make(chan error)
workChan <- addPoll{
fd: fd,
events: events,
ctx: reqctx,
wakeUp: wakeUp,
}
return <-wakeUp
}
func criticalError(msg string, err error) {
log.Errorf("%s: %s.", msg, err.Error())
log.Errorf("This is critical error, please report it at https://github.com/libp2p/go-reuseport/issues/new")
log.Errorf("Bailing out. You are on your own. Good luck.")
for {
select {
case <-backgroundctx.Done():
return
case unit := <-workChan:
switch u := unit.(type) {
case addPoll:
u.wakeUp <- err
default:
}
}
}
}
func worker() {
epfd, err := unix.EpollCreate1(0)
if err != nil {
criticalError("EpollCreate1(0) failed", err)
}
evfd, err := eventfd.New()
if err != nil {
criticalError("eventfd.New() failed", err)
}
pool := make(map[int]addPoll)
{
event := unix.EpollEvent{
Events: unix.EPOLLIN,
Fd: int32(evfd.Fd()),
}
unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, evfd.Fd(), &event)
}
go poller(epfd, evfd)
remove := func(fd int) *addPoll {
unit, ok := pool[fd]
if ok {
unix.EpollCtl(epfd, unix.EPOLL_CTL_DEL, unit.fd, nil)
delete(pool, fd)
close(unit.wakeUp)
}
return &unit
}
for {
select {
case <-backgroundctx.Done():
evfd.WriteEvents(1)
return
case unit := <-workChan:
switch u := unit.(type) {
case addPoll:
event := unix.EpollEvent{
Events: u.events | unix.EPOLLONESHOT,
Fd: int32(u.fd),
}
// Make copies for *I* before we add it to Epoll group
wrapWakeUp := make(chan error)
wakeUp := u.wakeUp
u.wakeUp = wrapWakeUp
if _, ok := pool[u.fd]; ok {
panic("duplicate fd") // safe guard against bad close calls
}
pool[u.fd] = u
err := unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, u.fd, &event)
if err != nil {
delete(pool, u.fd)
u.wakeUp <- err
}
// *I*
reqCtx := u.ctx
fd := u.fd
go func() {
select {
case err := <-wrapWakeUp:
wakeUp <- err
case <-reqCtx.Done():
workChan <- ctxDone{
fd: fd,
}
<-wrapWakeUp
wakeUp <- reqCtx.Err()
}
}()
case []unix.EpollEvent:
for _, event := range u {
remove(int(event.Fd))
}
case ctxDone:
remove(u.fd)
}
}
}
}
func poller(epfd int, evfd *eventfd.EventFD) {
for {
// do not reuse the array as we will be passing it over channel
// 128 is quite arbitrary
// to small and number of EpollWait calls would increase
// to big and GC overhead increases
events := make([]unix.EpollEvent, 128)
n, err := unix.EpollWait(epfd, events, -1)
switch err {
case nil:
// everything is great
case unix.EINTR:
// ignore
continue
default:
// log
log.Errorf("EpollWait returned error: %s. Continuing.", err.Error())
continue
}
for i := 0; i < n; i++ {
if int(events[i].Fd) == evfd.Fd() {
unix.Close(epfd)
evfd.Close()
return
}
}
workChan <- events[:n]
}
}