🎯 Задача (Go, продвинутая конкуррентность; версия: Go 1.21+)
Сделай универсальную функцию обработки массива с контролем параллельности, сохранением исходного порядка результатов и мгновенной отменой при первой ошибке.
Требования:- Сигнатура:
Process[T any, R any](ctx context.Context, in []T, worker func(context.Context, T) (R, error), parallelism int) ([]R, error)
- Параллельная обработка не более parallelism задач одновременно.
- Результаты возвращаются в том же порядке, что и входной срез, даже если отдельные задачи завершаются вразнобой.
- При первой ошибке:
- немедленно отменить все ещё выполняющиеся задачи,
- вернуть первую ошибку,
- не оставить «утекших» горутин.
- Учитывать ctx.Done() и корректно завершаться по таймауту/отмене.
- Без внешних зависимостей; только стандартная библиотека.
Подсказка:- Используй context.WithCancelCause для распространения причины отмены.
- Организуй пул рабочих через буферизованный канал с задачами.
- Результаты складывай по индексу, чтобы сохранить порядок.
- Для потокозащищённой фиксации первой ошибки используй sync.Once.
Ниже — эталонная реализация и пример использования.
Код (Go 1.21+):
package mainimport ( "context" "errors" "fmt" "math/rand" "sync" "time")type job[T any] struct { i int val T}func Process[T any, R any]( parent context.Context, in []T, worker func(context.Context, T) (R, error), parallelism int,) ([]R, error) { if parallelism <= 0 { return nil, errors.New("parallelism must be > 0") } ctx, cancel := context.WithCancelCause(parent) defer cancel(nil) jobs := make(chan job[T], parallelism) // лёгкая обратная давление out := make([]R, len(in)) var wg sync.WaitGroup var once sync.Once var firstErr error // Рабочие workerFn := func() { defer wg.Done() for { select { case <-ctx.Done(): return case j, ok := <-jobs: if !ok { return } res, err := worker(ctx, j.val) if err != nil { once.Do(func() { firstErr = err cancel(err) // прерываем остальных }) return } // Сохраняем порядок out[j.i] = res } } } // Старт пула wg.Add(parallelism) for k := 0; k < parallelism; k++ { go workerFn() } // Диспетчер задачsendLoop: for i, v := range in { select { case <-ctx.Done(): break sendLoop case jobs <- job[T]{i: i, val: v}: } } close(jobs) // Ждём завершения wg.Wait() // Если была отмена по ошибке — вернём её if firstErr != nil { return nil, firstErr } // Если отменил родительский контекст — вернём его причину if err := context.Cause(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { // это cause из cancel(err), уже обработали выше } else if err := context.Cause(parent); err != nil { return nil, err } return out, nil}// Демонстрация: умножаем числа с случайной задержкой; каждое третье число — ошибка.// Видно, что вывод упорядочен по входу, а отмена срабатывает на первой ошибке.func main() { rand.New(rand.NewSource(time.Now().UnixNano())) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() input := []int{1, 2, 3, 4, 5, 6, 7} parallelism := 3 worker := func(ctx context.Context, x int) (int, error) { // эмуляция непредсказуемого времени работы time.Sleep(time.Duration(rand.Intn(120)) * time.Millisecond) if x%7 == 0 { // попробуй поменять условие на (x%3==0), чтобы увидеть раннюю отмену return 0, fmt.Errorf("bad luck on %d", x) } select { case <-ctx.Done(): return 0, ctx.Err() default: return x * x, nil } } res, err := Process[int, int](ctx, input, worker, parallelism) if err != nil { fmt.Println("error:", err) return } fmt.Println("results:", res)}
Как проверить:- go run . — запусти несколько раз, чтобы увидеть разные порядки завершения, но стабильный порядок результатов.
- Поменяй порог ошибки в worker (например, x%3==0), чтобы убедиться, что отмена срабатывает мгновенно и горутины не висят.
- Проверь на гонки: go run -race .