Concurrency limiting goroutine pool without upper limit on queue length. Extends github.com/gammazero/workerpool

matthewoestreich/workerpoolxt

workerpoolxt

Worker pool library that extends https://github.com/gammazero/workerpool

go get github.com/matthewoestreich/workerpoolxt

pkg.go.dev


Synopsis

We wrap the func(s) that get passed to workerpool, which we call "jobs". Job results will be returned to you after all jobs have completed.

You have the ability to give each job a name. You can access job results, job runtime duration, or any job errors within job results. In gammazero/workerpool this is not possible - you do not have the ability to get any sort of result or error data from a "job".
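To make the idea of "wrapping" jobs concrete, here is a minimal, standard-library-only sketch of collecting a name, result, error, and duration per job while limiting concurrency. It is not workerpoolxt's actual implementation; the names Result, Job, runAll, and errBoom are assumptions made up for this illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errBoom is a hypothetical error used only for this illustration.
var errBoom = errors.New("boom")

// Result is a hypothetical record of one job's outcome.
type Result struct {
	Name     string
	Data     any
	Error    error
	Duration time.Duration
}

// Job pairs a name with a function that returns a value or an error.
type Job struct {
	Name     string
	Function func() (any, error)
}

// runAll runs jobs on at most numWorkers goroutines at a time and
// returns one Result per job, in the same order as the input slice.
func runAll(numWorkers int, jobs []Job) []Result {
	results := make([]Result, len(jobs))
	sem := make(chan struct{}, numWorkers) // counting semaphore limits concurrency
	var wg sync.WaitGroup
	for i, job := range jobs {
		wg.Add(1)
		go func(i int, job Job) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a worker slot
			defer func() { <-sem }() // release it when the job finishes
			start := time.Now()
			data, err := job.Function()
			// Each goroutine writes to its own index, so no lock is needed.
			results[i] = Result{Name: job.Name, Data: data, Error: err, Duration: time.Since(start)}
		}(i, job)
	}
	wg.Wait() // block until every job has recorded its result
	return results
}

func main() {
	results := runAll(2, []Job{
		{Name: "ok", Function: func() (any, error) { return 42, nil }},
		{Name: "fails", Function: func() (any, error) { return nil, errBoom }},
	})
	for _, r := range results {
		fmt.Printf("%s: data=%v err=%v\n", r.Name, r.Data, r.Error)
	}
}
```

The sketch starts one goroutine per job and gates them with a semaphore, which is simpler than a real worker pool with a queue but shows the same result-collecting pattern.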

You still retain access to all gammazero/workerpool members. See Native Member Access for more.

Generics

If you want to use this package with type-safe generics, please see here.

Import

import wpxt "github.com/matthewoestreich/workerpoolxt"

Examples

Hello, world!

Playground

numWorkers := 5
pool := wpxt.New(numWorkers)

// Or, if you have an existing |*workerpool.WorkerPool| instance, use this instead:
// pool := wpxt.WithWorkerPool(existingWorkerPool)

helloWorldJob := &wpxt.Job{
  Name: "Hello world job",
  // Function signature must be |func() (any, error)|
  Function: func() (any, error) {
    var pretendError error // nil here; stands in for an error from your own code
    if pretendError != nil {
      // To demonstrate how you would return an error
      return nil, pretendError
    }
    return "Hello, world!", nil
  },
}

// Submit job
pool.SubmitXT(helloWorldJob)

// Block main thread until all jobs are done.
results := pool.StopWaitXT()

// Results are also accessible via
sameResults := pool.Results()

// Grab first result
r := results[0]

// Print it to console
fmt.Printf(
  "Name: %s\nData: %v\nDuration: %v\nError?: %v", 
  r.Name, r.Data, r.Duration, r.Error,
)
/*
Name: Hello world job
Data: Hello, world!
Duration: 3.139µs
Error?: <nil>
*/

Using Context

Works with timeouts, cancelled contexts, and so on.

The point is: you have full control over every job.

&wpxt.Job{
  Name: "Job using context",
  Function: func() (any, error) {
    timeout := 10 * time.Second
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    // |LongRunningTaskWithContext| stands in for your own context-aware function
    result, err := LongRunningTaskWithContext(ctx)
    if err != nil {
      return nil, err
    }
    return result, nil
  },
}
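The job above relies on the task itself honoring the context. As a rough sketch of what that could look like, the standard-library-only example below simulates a slow task that returns early when its context expires. The names longRunningTaskWithContext, jobWithTimeout, shortDuration, and longDuration are assumptions for this illustration, not part of workerpoolxt.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Hypothetical durations used only for this illustration.
const (
	shortDuration = 20 * time.Millisecond
	longDuration  = 2 * time.Second
)

// longRunningTaskWithContext simulates work that takes |work| to finish.
// It returns early with ctx.Err() if the context is done first.
func longRunningTaskWithContext(ctx context.Context, work time.Duration) (string, error) {
	select {
	case <-time.After(work):
		return "done", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// jobWithTimeout builds a function with the |func() (any, error)| shape
// that runs the simulated task under the given timeout.
func jobWithTimeout(timeout, work time.Duration) func() (any, error) {
	return func() (any, error) {
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()
		result, err := longRunningTaskWithContext(ctx, work)
		if err != nil {
			return nil, err
		}
		return result, nil
	}
}

func main() {
	// The timeout is shorter than the work, so the job reports an error.
	_, err := jobWithTimeout(shortDuration, longDuration)()
	fmt.Println(err) // context deadline exceeded
}
```

A function built this way can be assigned to a job's Function field, since it has the required signature.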

Using Retry

You can use something like backoff for this (just an example).

The point is: you have full control over every job.

&wpxt.Job{
  Name: "Job using retry",
  Function: func() (any, error) {
    work := func() (string, error) {
      // |doWork| stands in for your own fallible operation
      if err := doWork(); err != nil {
        return "", err
      }
      return "IT SUCCEEDED", nil
    }
    expBackoff := backoff.WithBackOff(backoff.NewExponentialBackOff())
    // |backoff.Retry| needs a context; pass one that fits your job
    result, err := backoff.Retry(context.Background(), work, expBackoff)
    if err != nil {
      return nil, err
    }
    return result, nil
  },
}
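If you would rather not add a dependency, a retry loop with exponential backoff can also be written by hand inside the job. The sketch below uses only the standard library. The names retry, flakyOp, retryJob, and errTransient are assumptions for this illustration and do not come from workerpoolxt or backoff.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errTransient is a hypothetical error standing in for a retryable failure.
var errTransient = errors.New("transient failure")

// retry calls op until it succeeds, maxAttempts (assumed >= 1) is reached,
// or ctx is done. The delay doubles after each failed attempt.
func retry(ctx context.Context, maxAttempts int, initialDelay time.Duration, op func() (string, error)) (string, error) {
	delay := initialDelay
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		result, err := op()
		if err == nil {
			return result, nil
		}
		lastErr = err
		if attempt == maxAttempts {
			break // no point in sleeping after the final attempt
		}
		select {
		case <-time.After(delay):
			delay *= 2
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
	return "", fmt.Errorf("gave up after %d attempts: %w", maxAttempts, lastErr)
}

// flakyOp returns an operation that fails its first |failures| calls.
func flakyOp(failures int) func() (string, error) {
	calls := 0
	return func() (string, error) {
		calls++
		if calls <= failures {
			return "", errTransient
		}
		return "IT SUCCEEDED", nil
	}
}

// retryJob runs a flaky operation under retry with a 1ms initial delay.
func retryJob(failures, maxAttempts int) (string, error) {
	return retry(context.Background(), maxAttempts, time.Millisecond, flakyOp(failures))
}

func main() {
	result, err := retryJob(2, 5)
	fmt.Println(result, err) // IT SUCCEEDED <nil>
}
```

Wrapping the call to retry in a |func() (any, error)| gives you a job whose result or final error ends up in the job results like any other.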

Native Member Access

Using native gammazero/workerpool members with workerpoolxt.

StopWait

If you would rather call the native .StopWait() instead of .StopWaitXT() (which returns the results directly), you can still get the job results by calling .Results() afterwards.

wp := wpxt.New(4)

//
// Submitting a bunch of jobs here
//

wp.StopWait()

allResults := wp.Results()
// Do something with |allResults|
