Skip to content

Commit

Permalink
More consistent naming
Browse files Browse the repository at this point in the history
  • Loading branch information
destel committed Oct 1, 2024
1 parent bbda889 commit 1ba4461
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 15 deletions.
6 changes: 3 additions & 3 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
// Batch take a stream of items and returns a stream of batches based on a maximum size and a timeout.
//
// A batch is emitted when one of the following conditions is met:
// - The batch reaches the size of n items
// - The batch reaches the maximum size
// - The time since the first item was added to the batch exceeds the timeout
// - The input stream is closed
//
Expand All @@ -19,9 +19,9 @@ import (
// This is a non-blocking ordered function that processes items sequentially.
//
// See the package documentation for more information on non-blocking ordered functions and error handling.
func Batch[A any](in <-chan Try[A], n int, timeout time.Duration) <-chan Try[[]A] {
func Batch[A any](in <-chan Try[A], size int, timeout time.Duration) <-chan Try[[]A] {
values, errs := ToChans(in)
batches := core.Batch(values, n, timeout)
batches := core.Batch(values, size, timeout)
return FromChans(batches, errs)
}

Expand Down
12 changes: 6 additions & 6 deletions internal/core/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
// A batch is emitted when it reaches the maximum size, the timeout expires, or the input channel closes.
// This function never emits empty batches. The timeout countdown starts when the first item is added to a new batch.
// To emit batches only when full, set the timeout to -1. Zero timeout is not supported and will panic.
func Batch[A any](in <-chan A, n int, timeout time.Duration) <-chan []A {
func Batch[A any](in <-chan A, size int, timeout time.Duration) <-chan []A {
if in == nil {
return nil
}
Expand All @@ -27,9 +27,9 @@ func Batch[A any](in <-chan A, n int, timeout time.Duration) <-chan []A {
var batch []A
for a := range in {
batch = append(batch, a)
if len(batch) >= n {
if len(batch) >= size {
out <- batch
batch = make([]A, 0, n)
batch = make([]A, 0, size)
}
}
if len(batch) > 0 {
Expand All @@ -40,14 +40,14 @@ func Batch[A any](in <-chan A, n int, timeout time.Duration) <-chan []A {
default:
// finite timeout
go func() {
batch := make([]A, 0, n)
batch := make([]A, 0, size)
t := time.NewTicker(1 * time.Hour)
t.Stop()

flush := func() {
if len(batch) > 0 {
out <- batch
batch = make([]A, 0, n)
batch = make([]A, 0, size)
}

t.Stop()
Expand Down Expand Up @@ -81,7 +81,7 @@ func Batch[A any](in <-chan A, n int, timeout time.Duration) <-chan []A {
t.Reset(timeout)
}

if len(batch) >= n {
if len(batch) >= size {
// batch is full
flush()
}
Expand Down
6 changes: 3 additions & 3 deletions internal/core/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ func DrainNB[A any](in <-chan A) {
go Drain(in)
}

func Buffer[A any](in <-chan A, n int) <-chan A {
// we use n-1 since 1 additional item is held on the stack (x variable)
out := make(chan A, n-1)
func Buffer[A any](in <-chan A, size int) <-chan A {
// we use size-1 since 1 additional item is held on the stack (x variable)
out := make(chan A, size-1)

go func() {
defer close(out)
Expand Down
6 changes: 3 additions & 3 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ func DrainNB[A any](in <-chan A) {
// Buffer takes a channel of items and returns a buffered channel of exact same items in the same order.
// This can be useful for preventing write operations on the input channel from blocking, especially if subsequent stages
// in the processing pipeline are slow.
// Buffering allows up to n items to be held in memory before back pressure is applied to the upstream producer.
// Buffering allows up to size items to be held in memory before back pressure is applied to the upstream producer.
//
// Typical usage of Buffer might look like this:
//
// users := getUsers(ctx, companyID)
// users = rill.Buffer(users, 100)
// // Now work with the users channel as usual.
// // Up to 100 users can be buffered if subsequent stages of the pipeline are slow.
func Buffer[A any](in <-chan A, n int) <-chan A {
return core.Buffer(in, n)
func Buffer[A any](in <-chan A, size int) <-chan A {
return core.Buffer(in, size)
}

0 comments on commit 1ba4461

Please sign in to comment.