SIGN IN SIGN UP
sourcegraph / conc UNCLAIMED

Better structured concurrency for go

0 0 1 Go
2022-12-16 15:26:01 -07:00
// Package stream provides a concurrent, ordered stream implementation.
2022-12-15 18:56:25 -07:00
package stream
2022-12-15 20:29:34 -07:00
import (
2022-12-16 13:11:41 -07:00
"sync"
2023-01-02 13:55:35 -07:00
"github.com/sourcegraph/conc"
"github.com/sourcegraph/conc/panics"
2023-01-02 13:55:35 -07:00
"github.com/sourcegraph/conc/pool"
2022-12-15 20:29:34 -07:00
)
2023-01-09 10:31:55 +03:00
// New creates a new Stream with default settings.
2022-12-16 09:40:59 -07:00
func New() *Stream {
return &Stream{
pool: *pool.New(),
2022-12-15 20:29:34 -07:00
}
}
2022-12-16 17:27:58 -07:00
// Stream is used to execute a stream of tasks concurrently while maintaining
2022-12-16 15:26:01 -07:00
// the order of the results.
//
// To use a stream, you submit some number of `Task`s, each of which
2022-12-16 15:26:01 -07:00
// return a callback. Each task will be executed concurrently in the stream's
2022-12-16 17:27:58 -07:00
// associated Pool, and the callbacks will be executed sequentially in the
2022-12-16 15:26:01 -07:00
// order the tasks were submitted.
2022-12-16 17:27:58 -07:00
//
// Once all your tasks have been submitted, Wait() must be called to clean up
// running goroutines and propagate any panics.
//
// In the case of panic during execution of a task or a callback, all other
// tasks and callbacks will still execute. The panic will be propagated to the
// caller when Wait() is called.
2023-01-02 12:18:43 -07:00
//
// A Stream is efficient, but not zero cost. It should not be used for very
// short tasks. Startup and teardown adds an overhead of a couple of
// microseconds, and the overhead for each task is roughly 500ns. It should be
// good enough for any task that requires a network call.
2022-12-15 20:29:34 -07:00
type Stream struct {
pool pool.Pool
callbackerHandle conc.WaitGroup
queue chan callbackCh
2022-12-16 09:40:59 -07:00
initOnce sync.Once
2022-12-15 20:29:34 -07:00
}
2023-01-09 10:33:53 +03:00
// Task is a task that is submitted to the stream. Submitted tasks will
2022-12-16 15:26:01 -07:00
// be executed concurrently. It returns a callback that will be called after
// the task has completed.
2023-01-09 10:33:53 +03:00
type Task func() Callback
2022-12-16 15:26:01 -07:00
// Callback is a function that is returned by a Task. Callbacks are
// called in the same order that tasks are submitted.
type Callback func()
// Go schedules a task to be run in the stream's pool. All submitted tasks
// will be executed concurrently in worker goroutines. Then, the callbacks
// returned by the tasks will be executed in the order that the tasks were
// submitted. All callbacks will be executed by the same goroutine, so no
// synchronization is necessary between callbacks. If all goroutines in the
// stream's pool are busy, a call to Go() will block until the task can be
// started.
2023-01-09 10:33:53 +03:00
func (s *Stream) Go(f Task) {
2023-01-02 11:18:42 -07:00
s.init()
2022-12-16 09:40:59 -07:00
// Get a channel from the cache.
2023-01-02 11:18:42 -07:00
ch := getCh()
2022-12-16 15:26:01 -07:00
// Queue the channel for the callbacker.
2022-12-15 20:29:34 -07:00
s.queue <- ch
2022-12-16 15:26:01 -07:00
// Submit the task for execution.
2022-12-16 09:40:59 -07:00
s.pool.Go(func() {
2022-12-16 17:27:58 -07:00
defer func() {
// In the case of a panic from f, we don't want the callbacker to
// starve waiting for a callback from this channel, so give it an
// empty callback.
if r := recover(); r != nil {
ch <- func() {}
panic(r)
}
}()
// Run the task, sending its callback down this task's channel.
2022-12-16 17:27:58 -07:00
callback := f()
ch <- callback
2022-12-15 20:29:34 -07:00
})
}
2022-12-16 15:26:01 -07:00
// Wait signals to the stream that all tasks have been submitted. Wait will
// not return until all tasks and callbacks have been run.
2022-12-15 20:29:34 -07:00
func (s *Stream) Wait() {
2023-01-02 11:18:42 -07:00
s.init()
2022-12-16 17:27:58 -07:00
// Defer the callbacker cleanup so that it occurs even in the case
// that one of the tasks panics and is propagated up by s.pool.Wait().
2022-12-16 17:27:58 -07:00
defer func() {
2023-01-02 11:18:42 -07:00
close(s.queue)
2022-12-16 17:27:58 -07:00
s.callbackerHandle.Wait()
}()
2022-12-16 09:40:59 -07:00
// Wait for all the workers to exit.
2022-12-15 20:29:34 -07:00
s.pool.Wait()
2022-12-16 17:27:58 -07:00
}
2022-12-16 15:26:01 -07:00
2022-12-16 17:27:58 -07:00
func (s *Stream) WithMaxGoroutines(n int) *Stream {
s.pool.WithMaxGoroutines(n)
return s
2022-12-15 20:29:34 -07:00
}
2022-12-16 09:40:59 -07:00
func (s *Stream) init() {
2023-01-02 11:18:42 -07:00
s.initOnce.Do(func() {
s.queue = make(chan callbackCh, s.pool.MaxGoroutines()+1)
2022-12-16 09:40:59 -07:00
// Start the callbacker.
2023-01-02 11:18:42 -07:00
s.callbackerHandle.Go(s.callbacker)
})
2022-12-16 09:40:59 -07:00
}
2022-12-16 17:27:58 -07:00
// callbacker is responsible for calling the returned callbacks in the order
// they were submitted. There is only a single instance of callbacker running.
2022-12-15 20:29:34 -07:00
func (s *Stream) callbacker() {
var panicCatcher panics.Catcher
2022-12-24 23:07:09 -07:00
defer panicCatcher.Repanic()
2022-12-16 17:27:58 -07:00
2022-12-16 15:26:01 -07:00
// For every scheduled task, read that tasks channel from the queue.
2022-12-15 20:29:34 -07:00
for callbackCh := range s.queue {
// Wait for the task to complete and get its callback from the channel.
2022-12-15 20:29:34 -07:00
callback := <-callbackCh
2022-12-16 15:26:01 -07:00
// Execute the callback (with panic protection).
if callback != nil {
panicCatcher.Try(callback)
}
2022-12-16 15:26:01 -07:00
// Return the channel to the pool of unused channels.
2023-01-02 11:18:42 -07:00
putCh(callbackCh)
2022-12-15 20:29:34 -07:00
}
}
// callbackCh is the channel on which a single task hands its callback to the
// callbacker.
type callbackCh chan func()
2023-01-02 11:19:05 -07:00
// callbackChPool caches callback channels so they can be reused across
// tasks. Fresh channels are created with a buffer of one.
var callbackChPool = sync.Pool{
	New: func() any {
		const bufSize = 1
		return make(callbackCh, bufSize)
	},
}

// getCh takes a callback channel from the cache, creating one if the cache
// is empty.
func getCh() callbackCh {
	cached := callbackChPool.Get()
	return cached.(callbackCh)
}

// putCh returns ch to the cache for reuse.
func putCh(ch callbackCh) {
	callbackChPool.Put(ch)
}