2022-12-16 15:26:01 -07:00
|
|
|
// Package stream provides a concurrent, ordered stream implementation.
|
2022-12-15 18:56:25 -07:00
|
|
|
package stream
|
2022-12-15 20:29:34 -07:00
|
|
|
|
|
|
|
|
import (
|
2022-12-16 13:11:41 -07:00
|
|
|
"sync"
|
|
|
|
|
|
2023-01-02 13:55:35 -07:00
|
|
|
"github.com/sourcegraph/conc"
|
2023-01-08 11:45:24 -08:00
|
|
|
"github.com/sourcegraph/conc/panics"
|
2023-01-02 13:55:35 -07:00
|
|
|
"github.com/sourcegraph/conc/pool"
|
2022-12-15 20:29:34 -07:00
|
|
|
)
|
|
|
|
|
|
2023-01-09 10:31:55 +03:00
|
|
|
// New creates a new Stream with default settings.
|
2022-12-16 09:40:59 -07:00
|
|
|
func New() *Stream {
|
|
|
|
|
return &Stream{
|
|
|
|
|
pool: *pool.New(),
|
2022-12-15 20:29:34 -07:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2022-12-16 17:27:58 -07:00
|
|
|
// Stream is used to execute a stream of tasks concurrently while maintaining
|
2022-12-16 15:26:01 -07:00
|
|
|
// the order of the results.
|
|
|
|
|
//
|
2023-01-10 22:42:42 +03:00
|
|
|
// To use a stream, you submit some number of `Task`s, each of which
|
2022-12-16 15:26:01 -07:00
|
|
|
// return a callback. Each task will be executed concurrently in the stream's
|
2022-12-16 17:27:58 -07:00
|
|
|
// associated Pool, and the callbacks will be executed sequentially in the
|
2022-12-16 15:26:01 -07:00
|
|
|
// order the tasks were submitted.
|
2022-12-16 17:27:58 -07:00
|
|
|
//
|
|
|
|
|
// Once all your tasks have been submitted, Wait() must be called to clean up
|
|
|
|
|
// running goroutines and propagate any panics.
|
|
|
|
|
//
|
|
|
|
|
// In the case of panic during execution of a task or a callback, all other
|
|
|
|
|
// tasks and callbacks will still execute. The panic will be propagated to the
|
|
|
|
|
// caller when Wait() is called.
|
2023-01-02 12:18:43 -07:00
|
|
|
//
|
|
|
|
|
// A Stream is efficient, but not zero cost. It should not be used for very
|
|
|
|
|
// short tasks. Startup and teardown adds an overhead of a couple of
|
|
|
|
|
// microseconds, and the overhead for each task is roughly 500ns. It should be
|
|
|
|
|
// good enough for any task that requires a network call.
|
2022-12-15 20:29:34 -07:00
|
|
|
type Stream struct {
|
|
|
|
|
pool pool.Pool
|
|
|
|
|
callbackerHandle conc.WaitGroup
|
|
|
|
|
queue chan callbackCh
|
2022-12-16 09:40:59 -07:00
|
|
|
|
|
|
|
|
initOnce sync.Once
|
2022-12-15 20:29:34 -07:00
|
|
|
}
|
|
|
|
|
|
2023-01-09 10:33:53 +03:00
|
|
|
type (
	// Task is a unit of work submitted to a Stream with Go. Submitted tasks
	// are executed concurrently; each returns a Callback that is called after
	// the task has completed.
	Task func() Callback

	// Callback is the function returned by a Task. Callbacks are called one
	// at a time, in the same order that their tasks were submitted. A nil
	// Callback is allowed and is simply skipped.
	Callback func()
)
|
|
|
|
|
|
|
|
|
|
// Go schedules a task to be run in the stream's pool. All submitted tasks
|
|
|
|
|
// will be executed concurrently in worker goroutines. Then, the callbacks
|
|
|
|
|
// returned by the tasks will be executed in the order that the tasks were
|
|
|
|
|
// submitted. All callbacks will be executed by the same goroutine, so no
|
2023-01-15 12:42:57 -07:00
|
|
|
// synchronization is necessary between callbacks. If all goroutines in the
|
|
|
|
|
// stream's pool are busy, a call to Go() will block until the task can be
|
|
|
|
|
// started.
|
2023-01-09 10:33:53 +03:00
|
|
|
func (s *Stream) Go(f Task) {
|
2023-01-02 11:18:42 -07:00
|
|
|
s.init()
|
2022-12-16 09:40:59 -07:00
|
|
|
|
2023-01-08 01:18:54 +03:00
|
|
|
// Get a channel from the cache.
|
2023-01-02 11:18:42 -07:00
|
|
|
ch := getCh()
|
2022-12-16 15:26:01 -07:00
|
|
|
|
2023-01-08 01:18:54 +03:00
|
|
|
// Queue the channel for the callbacker.
|
2022-12-15 20:29:34 -07:00
|
|
|
s.queue <- ch
|
2022-12-16 15:26:01 -07:00
|
|
|
|
2023-01-08 01:18:54 +03:00
|
|
|
// Submit the task for execution.
|
2022-12-16 09:40:59 -07:00
|
|
|
s.pool.Go(func() {
|
2022-12-16 17:27:58 -07:00
|
|
|
defer func() {
|
|
|
|
|
// In the case of a panic from f, we don't want the callbacker to
|
|
|
|
|
// starve waiting for a callback from this channel, so give it an
|
|
|
|
|
// empty callback.
|
|
|
|
|
if r := recover(); r != nil {
|
|
|
|
|
ch <- func() {}
|
|
|
|
|
panic(r)
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
2023-01-08 01:18:54 +03:00
|
|
|
// Run the task, sending its callback down this task's channel.
|
2022-12-16 17:27:58 -07:00
|
|
|
callback := f()
|
|
|
|
|
ch <- callback
|
2022-12-15 20:29:34 -07:00
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
2022-12-16 15:26:01 -07:00
|
|
|
// Wait signals to the stream that all tasks have been submitted. Wait will
|
|
|
|
|
// not return until all tasks and callbacks have been run.
|
2022-12-15 20:29:34 -07:00
|
|
|
func (s *Stream) Wait() {
|
2023-01-02 11:18:42 -07:00
|
|
|
s.init()
|
|
|
|
|
|
2022-12-16 17:27:58 -07:00
|
|
|
// Defer the callbacker cleanup so that it occurs even in the case
|
2023-01-08 01:18:54 +03:00
|
|
|
// that one of the tasks panics and is propagated up by s.pool.Wait().
|
2022-12-16 17:27:58 -07:00
|
|
|
defer func() {
|
2023-01-02 11:18:42 -07:00
|
|
|
close(s.queue)
|
2022-12-16 17:27:58 -07:00
|
|
|
s.callbackerHandle.Wait()
|
|
|
|
|
}()
|
2022-12-16 09:40:59 -07:00
|
|
|
|
2023-01-08 01:18:54 +03:00
|
|
|
// Wait for all the workers to exit.
|
2022-12-15 20:29:34 -07:00
|
|
|
s.pool.Wait()
|
2022-12-16 17:27:58 -07:00
|
|
|
}
|
2022-12-16 15:26:01 -07:00
|
|
|
|
2022-12-16 17:27:58 -07:00
|
|
|
func (s *Stream) WithMaxGoroutines(n int) *Stream {
|
|
|
|
|
s.pool.WithMaxGoroutines(n)
|
|
|
|
|
return s
|
2022-12-15 20:29:34 -07:00
|
|
|
}
|
|
|
|
|
|
2022-12-16 09:40:59 -07:00
|
|
|
func (s *Stream) init() {
|
2023-01-02 11:18:42 -07:00
|
|
|
s.initOnce.Do(func() {
|
|
|
|
|
s.queue = make(chan callbackCh, s.pool.MaxGoroutines()+1)
|
2022-12-16 09:40:59 -07:00
|
|
|
|
2023-01-08 01:18:54 +03:00
|
|
|
// Start the callbacker.
|
2023-01-02 11:18:42 -07:00
|
|
|
s.callbackerHandle.Go(s.callbacker)
|
|
|
|
|
})
|
2022-12-16 09:40:59 -07:00
|
|
|
}
|
|
|
|
|
|
2022-12-16 17:27:58 -07:00
|
|
|
// callbacker is responsible for calling the returned callbacks in the order
|
|
|
|
|
// they were submitted. There is only a single instance of callbacker running.
|
2022-12-15 20:29:34 -07:00
|
|
|
func (s *Stream) callbacker() {
|
2023-01-08 11:45:24 -08:00
|
|
|
var panicCatcher panics.Catcher
|
2022-12-24 23:07:09 -07:00
|
|
|
defer panicCatcher.Repanic()
|
2022-12-16 17:27:58 -07:00
|
|
|
|
2022-12-16 15:26:01 -07:00
|
|
|
// For every scheduled task, read that tasks channel from the queue.
|
2022-12-15 20:29:34 -07:00
|
|
|
for callbackCh := range s.queue {
|
2023-01-08 01:18:54 +03:00
|
|
|
// Wait for the task to complete and get its callback from the channel.
|
2022-12-15 20:29:34 -07:00
|
|
|
callback := <-callbackCh
|
2022-12-16 15:26:01 -07:00
|
|
|
|
2023-01-08 01:18:54 +03:00
|
|
|
// Execute the callback (with panic protection).
|
2023-08-03 10:37:53 +08:00
|
|
|
if callback != nil {
|
|
|
|
|
panicCatcher.Try(callback)
|
|
|
|
|
}
|
2022-12-16 15:26:01 -07:00
|
|
|
|
2023-01-08 01:18:54 +03:00
|
|
|
// Return the channel to the pool of unused channels.
|
2023-01-02 11:18:42 -07:00
|
|
|
putCh(callbackCh)
|
2022-12-15 20:29:34 -07:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// callbackCh carries one task's callback from the worker that ran the task to
// the callbacker. It has room for exactly one value, so the worker's single
// send never waits for the callbacker.
type callbackCh chan func()

// newCallbackCh allocates a fresh, empty callbackCh for callbackChPool.
func newCallbackCh() any {
	return make(callbackCh, 1)
}

// callbackChPool recycles callbackCh values so that each call to Go does not
// have to allocate a new channel.
var callbackChPool = sync.Pool{New: newCallbackCh}

// getCh takes a callbackCh from the cache, allocating one if the cache is
// empty.
func getCh() callbackCh {
	return callbackChPool.Get().(callbackCh)
}

// putCh returns ch to the cache. The caller must have drained ch first so the
// next user receives an empty channel.
func putCh(ch callbackCh) {
	callbackChPool.Put(ch)
}
|