Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support pipe for dump operator #4176

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/operator.md
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,18 @@ Available options:
`tag`
: Associate the channel with a tag that can be specified with the `-dump-channels` option to select which channels to dump.

:::{versionadded} 23.09.0-edge
The `dump` operator can be chained like any other operator:

```groovy
Channel.of('foo', 'bar', 'baz')
| dump(tag: 'words')
| map { it[0] }
| unique
| dump(tag: 'first_letters')
```
:::

## filter

*Returns: queue channel*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,6 @@ class ChannelEx {
// }
// }

static DataflowWriteChannel dump(final DataflowWriteChannel source, Closure closure = null) {
dump(source, Collections.emptyMap(), closure)
}

static DataflowWriteChannel dump(final DataflowWriteChannel source, Map opts, Closure closure = null) {
def op = new DumpOp(opts, closure)
if( op.isEnabled() ) {
op.setSource(source)
def target = op.apply()
NodeMarker.addOperatorNode('dump', source, target)
return target
}
else {
return source
}
}

/**
* Creates a channel emitting the entries in the collection to which is applied
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,14 @@ class DumpOp {

DumpOp(Map opts, Closure<String> renderer) {
checkParams('dump', opts, PARAMS_DUMP)
this.source = source
this.tag = opts.tag
this.pretty = opts.pretty ?: false
this.renderer = renderer
this.dumpNames = session.getDumpChannels()
}

DumpOp setSource( DataflowWriteChannel source ) {
this.source = CH.getReadChannel(source)
DumpOp setSource( DataflowReadChannel source ) {
this.source = source
return this
}

Expand All @@ -79,12 +78,6 @@ class DumpOp {

DataflowWriteChannel apply() {

if( !isEnabled() ) {
if( source instanceof DataflowWriteChannel )
return (DataflowWriteChannel)source
throw new IllegalArgumentException("Illegal dump operator source channel")
}

final target = CH.createBy(source)
final events = new HashMap(2)
events.onNext = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,26 @@ class OperatorImpl {

private static Session getSession() { Global.getSession() as Session }

DataflowWriteChannel dump(final DataflowReadChannel source, Closure closure = null) {
dump(source, Collections.emptyMap(), closure)
}

DataflowWriteChannel dump(final DataflowReadChannel source, Map opts, Closure closure = null) {
def op = new DumpOp(opts, closure)
if( op.isEnabled() ) {
op.setSource(source)
return op.apply()
}
else {
def target = CH.createBy(source)
DataflowHelper.subscribeImpl(source, [
onNext: { target.bind(it) },
onComplete: { CH.close0(target) }
])
return target
}
}

/**
* Subscribe *onNext* event
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.extension

import nextflow.Channel
import nextflow.Session
import test.Dsl2Spec
import test.OutputCapture

/**
*
* @author Ben Sherman <bentshermann@gmail.com>
*/
class DumpOp2Test extends Dsl2Spec {

@org.junit.Rule
OutputCapture capture = new OutputCapture()

def 'should support pipe' () {

given:
new Session(dumpChannels: ['*'])

when:
def result = dsl_eval(/
Channel.of(1, 2, 3) | dump
/)
def stdout = capture.toString()
then:
result.val == 1
result.val == 2
result.val == 3
result.val == Channel.STOP
stdout.contains('[DUMP] 1')
stdout.contains('[DUMP] 2')
stdout.contains('[DUMP] 3')

}

}
Loading