IvanBelyakoff cb9f2dd7d2
feat(wallet): refactor transfers to allow multiple transfer fetching strategies (#3432)

On mobile devices we might not want to load the full transfer history, to
avoid high data consumption or to save some space. On desktop we will load
all transfers in the background.

- Moved the current transfer-fetching implementation to
OnDemandFetchStrategy
- Implemented QueuedAtomicGroup to cache outgoing commands above a
concurrency threshold (see the illustrative usage sketch appended at the
end of the listing below)
- Renamed some types and methods for clarity:
  - LastKnownBlock was a misused name
  - Block renamed to BlockDAO to clarify what it does and to avoid
confusion with the geth Block type
- Cleanup: removed unused parameters and type members
- Changed tests to use an in-memory DB for speedup
- Added more logging and comments

Updates #10246
2023-05-08 08:02:00 +02:00

package async

import (
	"context"
	"sync"
	"time"
)

// Command is a unit of work that is run with a context and reports an error.
type Command func(context.Context) error

// Commander provides a Command to run.
type Commander interface {
	Command() Command
}

// FiniteCommand terminates when error is nil.
type FiniteCommand struct {
	Interval time.Duration
	Runable  func(context.Context) error
}

func (c FiniteCommand) Run(ctx context.Context) error {
	// Try once immediately; a nil error means we are done.
	err := c.Runable(ctx)
	if err == nil {
		return nil
	}

	// Otherwise retry on every tick until Runable succeeds or the context is canceled.
	ticker := time.NewTicker(c.Interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			err := c.Runable(ctx)
			if err == nil {
				return nil
			}
		}
	}
}

// InfiniteCommand runs until the context is closed.
type InfiniteCommand struct {
	Interval time.Duration
	Runable  func(context.Context) error
}

func (c InfiniteCommand) Run(ctx context.Context) error {
	_ = c.Runable(ctx)

	ticker := time.NewTicker(c.Interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
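
// Illustrative sketch, not part of the original file: a FiniteCommand that
// retries a flaky operation every 100 ms until it reports success. The
// function name, attempt counter and placeholder error are hypothetical and
// exist only to show the retry behaviour.
func exampleFiniteCommand() error {
	attempts := 0
	cmd := FiniteCommand{
		Interval: 100 * time.Millisecond,
		Runable: func(ctx context.Context) error {
			attempts++
			if attempts < 3 {
				return context.DeadlineExceeded // pretend the operation failed
			}
			return nil // a nil error stops the command
		},
	}
	return cmd.Run(context.Background())
}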

// NewGroup creates a Group whose commands share a context derived from parent.
func NewGroup(parent context.Context) *Group {
	ctx, cancel := context.WithCancel(parent)
	return &Group{
		ctx:    ctx,
		cancel: cancel,
	}
}

// Group runs commands concurrently until they return or the group is stopped.
type Group struct {
	ctx    context.Context
	cancel func()
	wg     sync.WaitGroup
}

// Add spawns cmd in its own goroutine tracked by the group.
func (g *Group) Add(cmd Command) {
	g.wg.Add(1)
	go func() {
		_ = cmd(g.ctx)
		g.wg.Done()
	}()
}

// Stop cancels the group's context, signalling all commands to return.
func (g *Group) Stop() {
	g.cancel()
}

// Wait blocks until all added commands have returned.
func (g *Group) Wait() {
	g.wg.Wait()
}

// WaitAsync returns a channel that is closed once all commands have returned.
func (g *Group) WaitAsync() <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		g.Wait()
		close(ch)
	}()
	return ch
}
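
// Illustrative sketch, not part of the original file: a Group runs two
// commands in parallel, Stop cancels their shared context and Wait blocks
// until both have returned. The function name is hypothetical.
func exampleGroup(parent context.Context) {
	group := NewGroup(parent)
	group.Add(func(ctx context.Context) error {
		<-ctx.Done() // runs until the group is stopped
		return ctx.Err()
	})
	group.Add(func(ctx context.Context) error {
		return nil // returns immediately
	})
	group.Stop()
	group.Wait()
}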

// NewAtomicGroup creates an AtomicGroup with a context derived from parent.
func NewAtomicGroup(parent context.Context) *AtomicGroup {
	ctx, cancel := context.WithCancel(parent)
	ag := &AtomicGroup{ctx: ctx, cancel: cancel}
	ag.done = ag.onFinish
	return ag
}

// AtomicGroup terminates as soon as the first command terminates with an error.
type AtomicGroup struct {
	ctx    context.Context
	cancel func()
	done   func()
	wg     sync.WaitGroup
	mu     sync.Mutex
	error  error
}

// Add spawns cmd in a goroutine and records the first error it reports.
func (d *AtomicGroup) Add(cmd Command) {
	d.wg.Add(1)
	go func() {
		defer d.done()
		err := cmd(d.ctx)
		d.mu.Lock()
		defer d.mu.Unlock()
		if err != nil {
			// Do not overwrite the original error with the context errors
			// returned by the other commands after cancellation.
			if d.error != nil {
				return
			}
			d.error = err
			d.cancel()
			return
		}
	}()
}

// Wait blocks until all added commands have finished.
func (d *AtomicGroup) Wait() {
	d.wg.Wait()
	if d.Error() == nil {
		d.mu.Lock()
		defer d.mu.Unlock()
		d.cancel()
	}
}

// WaitAsync returns a channel that is closed once all commands have finished.
func (d *AtomicGroup) WaitAsync() <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		d.Wait()
		close(ch)
	}()
	return ch
}

// Error returns the first error reported by any of the commands. Should be
// called after Wait.
func (d *AtomicGroup) Error() error {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.error
}

// Stop cancels the group's context, signalling all commands to return.
func (d *AtomicGroup) Stop() {
	d.cancel()
}

func (d *AtomicGroup) onFinish() {
	d.wg.Done()
}
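
// Illustrative sketch, not part of the original file: two commands run in an
// AtomicGroup; once the first one returns an error the shared context is
// canceled, so the second command unblocks and the group reports the first
// error from Error after Wait. The function name and placeholder error are
// hypothetical.
func exampleAtomicGroup(parent context.Context) error {
	group := NewAtomicGroup(parent)
	group.Add(func(ctx context.Context) error {
		return context.DeadlineExceeded // placeholder standing in for a real failure
	})
	group.Add(func(ctx context.Context) error {
		<-ctx.Done() // unblocked once the group cancels its context
		return ctx.Err()
	})
	group.Wait()
	return group.Error()
}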

// NewQueuedAtomicGroup creates a QueuedAtomicGroup that runs at most limit
// commands concurrently; a limit of 0 means no limit.
func NewQueuedAtomicGroup(parent context.Context, limit uint32) *QueuedAtomicGroup {
	qag := &QueuedAtomicGroup{NewAtomicGroup(parent), limit, 0, []Command{}, sync.Mutex{}}
	baseDoneFunc := qag.done // save the original done function
	qag.AtomicGroup.done = func() {
		baseDoneFunc()
		qag.onFinish()
	}
	return qag
}

// QueuedAtomicGroup is an AtomicGroup that caches commands added above the
// concurrency limit and runs them as earlier commands finish.
type QueuedAtomicGroup struct {
	*AtomicGroup
	limit       uint32
	count       uint32
	pendingCmds []Command
	mu          sync.Mutex
}

// Add runs cmd immediately if fewer than limit commands are running,
// otherwise it queues cmd until a running command finishes.
func (d *QueuedAtomicGroup) Add(cmd Command) {
	d.mu.Lock()
	if d.limit > 0 && d.count >= d.limit {
		d.pendingCmds = append(d.pendingCmds, cmd)
		d.mu.Unlock()
		return
	}
	d.mu.Unlock()
	d.run(cmd)
}

func (d *QueuedAtomicGroup) run(cmd Command) {
	d.mu.Lock()
	d.count++
	d.mu.Unlock()
	d.AtomicGroup.Add(cmd)
}

// onFinish is called when a command finishes; it starts the next pending
// command once the number of running commands drops below the limit.
func (d *QueuedAtomicGroup) onFinish() {
	d.mu.Lock()
	d.count--
	if d.count < d.limit && len(d.pendingCmds) > 0 {
		cmd := d.pendingCmds[0]
		d.pendingCmds = d.pendingCmds[1:]
		d.mu.Unlock()
		d.run(cmd)
		return
	}
	d.mu.Unlock()
}
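
// Illustrative sketch, not part of the original file, referenced from the
// commit message above: a QueuedAtomicGroup with a limit of 2 runs at most
// two commands at once; the third Add is cached in pendingCmds and only
// starts once one of the running commands finishes. The function name and
// timings are hypothetical.
func exampleQueuedAtomicGroup(parent context.Context) error {
	group := NewQueuedAtomicGroup(parent, 2)
	work := func(ctx context.Context) error {
		select {
		case <-time.After(50 * time.Millisecond): // pretend to do some work
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	group.Add(work) // starts immediately
	group.Add(work) // starts immediately
	group.Add(work) // cached until one of the first two finishes
	group.Wait()
	return group.Error()
}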