-
Notifications
You must be signed in to change notification settings - Fork 336
feat(iter): Added context accepting variants of Map & ForEach #114
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 15 commits
60f3996
a22d62e
57ae9ca
722e19d
e9a88bf
c6581d3
4a15de5
3ae45b6
11cbc39
1d04c68
d71cc05
2f570b7
bba9127
825d1df
296e00b
9018777
08c699d
56a8612
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,11 @@ | ||
package iter | ||
|
||
import ( | ||
"context" | ||
"runtime" | ||
"sync/atomic" | ||
|
||
"github.com/sourcegraph/conc" | ||
"github.com/sourcegraph/conc/pool" | ||
) | ||
|
||
// defaultMaxGoroutines returns the default maximum number of | ||
|
@@ -57,29 +58,69 @@ func ForEachIdx[T any](input []T, f func(int, *T)) { Iterator[T]{}.ForEachIdx(in | |
// ForEachIdx is the same as ForEach except it also provides the | ||
// index of the element to the callback. | ||
func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) { | ||
_ = iter.ForEachIdxCtx(context.Background(), input, func(_ context.Context, idx int, input *T) error { | ||
f(idx, input) | ||
return nil | ||
}) | ||
} | ||
|
||
// ForEachCtx is the same as ForEach except it also accepts a context | ||
// that it uses to manages the execution of tasks. | ||
// The context is cancelled on task failure and the first error is returned. | ||
func ForEachCtx[T any](octx context.Context, input []T, f func(context.Context, *T) error) error { | ||
return Iterator[T]{}.ForEachCtx(octx, input, f) | ||
} | ||
|
||
// ForEachCtx is the same as ForEach except it also accepts a context | ||
// that it uses to manages the execution of tasks. | ||
// The context is cancelled on task failure and the first error is returned. | ||
func (iter Iterator[T]) ForEachCtx(octx context.Context, input []T, f func(context.Context, *T) error) error { | ||
return iter.ForEachIdxCtx(octx, input, func(ctx context.Context, _ int, input *T) error { | ||
return f(ctx, input) | ||
}) | ||
} | ||
|
||
// ForEachIdxCtx is the same as ForEachIdx except it also accepts a context | ||
// that it uses to manages the execution of tasks. | ||
// The context is cancelled on task failure and the first error is returned. | ||
func ForEachIdxCtx[T any](octx context.Context, input []T, f func(context.Context, int, *T) error) error { | ||
return Iterator[T]{}.ForEachIdxCtx(octx, input, f) | ||
} | ||
|
||
// ForEachIdxCtx is the same as ForEachIdx except it also accepts a context | ||
// that it uses to manages the execution of tasks. | ||
// The context is cancelled on task failure and the first error is returned. | ||
func (iter Iterator[T]) ForEachIdxCtx(octx context.Context, input []T, f func(context.Context, int, *T) error) error { | ||
if iter.MaxGoroutines == 0 { | ||
// iter is a value receiver and is hence safe to mutate | ||
iter.MaxGoroutines = defaultMaxGoroutines() | ||
} | ||
|
||
numInput := len(input) | ||
if iter.MaxGoroutines > numInput { | ||
if iter.MaxGoroutines > numInput && numInput > 0 { | ||
// No more concurrent tasks than the number of input items. | ||
iter.MaxGoroutines = numInput | ||
} | ||
|
||
var idx atomic.Int64 | ||
// Create the task outside the loop to avoid extra closure allocations. | ||
task := func() { | ||
task := func(ctx context.Context) error { | ||
i := int(idx.Add(1) - 1) | ||
for ; i < numInput; i = int(idx.Add(1) - 1) { | ||
f(i, &input[i]) | ||
for ; i < numInput && ctx.Err() == nil; i = int(idx.Add(1) - 1) { | ||
if err := f(ctx, i, &input[i]); err != nil { | ||
return err | ||
} | ||
} | ||
return ctx.Err() // nil if the context was never cancelled | ||
} | ||
|
||
var wg conc.WaitGroup | ||
runner := pool.New(). | ||
WithContext(octx). | ||
WithCancelOnError(). | ||
WithFirstError(). | ||
WithMaxGoroutines(iter.MaxGoroutines) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would prefer to not add a dependency on I think it would be good to reconcile this PR with the patterns in #104. In particular, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The abstractions built into Additionally, as a further argument for using I could add the *Err variants as extensions to what I have here that just return an error but don't require a context. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Related: #118
You might be right. The simplicity of using a Let me noodle on the design a bit and get back to ya. I'll probably open a draft that unifies this with #104 and the |
||
for i := 0; i < iter.MaxGoroutines; i++ { | ||
wg.Go(task) | ||
runner.Go(task) | ||
} | ||
wg.Wait() | ||
return runner.Wait() | ||
} |
Uh oh!
There was an error while loading. Please reload this page.