diff --git a/docs/operator.md b/docs/operator.md index 4e7f415010..1e9b498ad3 100644 --- a/docs/operator.md +++ b/docs/operator.md @@ -536,6 +536,18 @@ Available options: `tag` : Associate the channel with a tag that can be specified with the `-dump-channels` option to select which channels to dump. +:::{versionadded} 23.09.0-edge +The `dump` operator can be chained like any other operator: + +```groovy +Channel.of('foo', 'bar', 'baz') + | dump(tag: 'words') + | map { it[0] } + | unique + | dump(tag: 'first_letters') +``` +::: + ## filter *Returns: queue channel* diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy index ba68ef0390..783086b9bd 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy @@ -54,23 +54,6 @@ class ChannelEx { // } // } - static DataflowWriteChannel dump(final DataflowWriteChannel source, Closure closure = null) { - dump(source, Collections.emptyMap(), closure) - } - - static DataflowWriteChannel dump(final DataflowWriteChannel source, Map opts, Closure closure = null) { - def op = new DumpOp(opts, closure) - if( op.isEnabled() ) { - op.setSource(source) - def target = op.apply() - NodeMarker.addOperatorNode('dump', source, target) - return target - } - else { - return source - } - } - /** * Creates a channel emitting the entries in the collection to which is applied * diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/DumpOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/DumpOp.groovy index 2aae01dbab..a7d4afd673 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/DumpOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/DumpOp.groovy @@ -51,15 +51,14 @@ class DumpOp { DumpOp(Map opts, Closure renderer) { checkParams('dump', opts, PARAMS_DUMP) - this.source = source this.tag = opts.tag this.pretty = opts.pretty ?: false this.renderer = renderer this.dumpNames = session.getDumpChannels() } - DumpOp setSource( DataflowWriteChannel source ) { - this.source = CH.getReadChannel(source) + DumpOp setSource( DataflowReadChannel source ) { + this.source = source return this } @@ -79,12 +78,6 @@ class DumpOp { DataflowWriteChannel apply() { - if( !isEnabled() ) { - if( source instanceof DataflowWriteChannel ) - return (DataflowWriteChannel)source - throw new IllegalArgumentException("Illegal dump operator source channel") - } - final target = CH.createBy(source) final events = new HashMap(2) events.onNext = { diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy index 63970a8c55..d31743d0f3 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy @@ -61,6 +61,26 @@ class OperatorImpl { private static Session getSession() { Global.getSession() as Session } + DataflowWriteChannel dump(final DataflowReadChannel source, Closure closure = null) { + dump(source, Collections.emptyMap(), closure) + } + + DataflowWriteChannel dump(final DataflowReadChannel source, Map opts, Closure closure = null) { + def op = new DumpOp(opts, closure) + if( op.isEnabled() ) { + op.setSource(source) + return op.apply() + } + else { + def target = CH.createBy(source) + DataflowHelper.subscribeImpl(source, [ + onNext: { target.bind(it) }, + onComplete: { CH.close0(target) } + ]) + return target + } + } + /** * Subscribe *onNext* event * diff --git a/modules/nextflow/src/test/groovy/nextflow/extension/DumpOp2Test.groovy b/modules/nextflow/src/test/groovy/nextflow/extension/DumpOp2Test.groovy new file mode 100644 index 0000000000..f073cface2 --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/extension/DumpOp2Test.groovy @@ -0,0 +1,54 @@ +/* + * Copyright 2013-2023, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.extension + +import nextflow.Channel +import nextflow.Session +import test.Dsl2Spec +import test.OutputCapture + +/** + * + * @author Ben Sherman + */ +class DumpOp2Test extends Dsl2Spec { + + @org.junit.Rule + OutputCapture capture = new OutputCapture() + + def 'should support pipe' () { + + given: + new Session(dumpChannels: ['*']) + + when: + def result = dsl_eval(/ + Channel.of(1, 2, 3) | dump + /) + def stdout = capture.toString() + then: + result.val == 1 + result.val == 2 + result.val == 3 + result.val == Channel.STOP + stdout.contains('[DUMP] 1') + stdout.contains('[DUMP] 2') + stdout.contains('[DUMP] 3') + + } + +}