From 3d4640307b11d5540fe31ef29e20e62b59792804 Mon Sep 17 00:00:00 2001
From: Ben Sherman <bentshermann@gmail.com>
Date: Thu, 10 Aug 2023 15:44:34 -0500
Subject: [PATCH 1/2] Make dump operator chainable

Signed-off-by: Ben Sherman <bentshermann@gmail.com>
---
 docs/operator.md                              | 12 +++++++++++
 .../nextflow/extension/ChannelEx.groovy       | 17 ----------------
 .../groovy/nextflow/extension/DumpOp.groovy   | 11 ++--------
 .../nextflow/extension/OperatorImpl.groovy    | 20 +++++++++++++++++++
 4 files changed, 34 insertions(+), 26 deletions(-)

diff --git a/docs/operator.md b/docs/operator.md
index 636d82cbe7..ed673f0966 100644
--- a/docs/operator.md
+++ b/docs/operator.md
@@ -651,6 +651,18 @@ Channel
 ```
 :::
 
+:::{versionadded} 23.09.0-edge
+The `dump` operator can be chained like any other operator:
+
+```groovy
+Channel.of('foo', 'bar', 'baz')
+    | dump(tag: 'words')
+    | map { it[0] }
+    | unique
+    | dump(tag: 'first_letters')
+```
+:::
+
 ## filter
 
 *Returns: queue channel*
diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy
index 499124ce44..435668584c 100644
--- a/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy
@@ -54,23 +54,6 @@ class ChannelEx {
 //        }
 //    }
 
-    static DataflowWriteChannel dump(final DataflowWriteChannel source, Closure closure = null) {
-        dump(source, Collections.emptyMap(), closure)
-    }
-
-    static DataflowWriteChannel dump(final DataflowWriteChannel source, Map opts, Closure closure = null) {
-        def op = new DumpOp(opts, closure)
-        if( op.isEnabled() ) {
-            op.setSource(source)
-            def target = op.apply()
-            NodeMarker.addOperatorNode('dump', source, target)
-            return target
-        }
-        else {
-            return source
-        }
-    }
-
     /**
      * Creates a channel emitting the entries in the collection to which is applied
      *
diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/DumpOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/DumpOp.groovy
index 6e499c048b..4d6ce5e89b 100644
--- a/modules/nextflow/src/main/groovy/nextflow/extension/DumpOp.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/extension/DumpOp.groovy
@@ -51,15 +51,14 @@ class DumpOp {
 
     DumpOp(Map opts, Closure<String> renderer) {
         checkParams('dump', opts, PARAMS_DUMP)
-        this.source = source
         this.tag = opts.tag
         this.pretty = opts.pretty ?: false
         this.renderer = renderer
         this.dumpNames = session.getDumpChannels()
     }
 
-    DumpOp setSource( DataflowWriteChannel source ) {
-        this.source = CH.getReadChannel(source)
+    DumpOp setSource( DataflowReadChannel source ) {
+        this.source = source
         return this
     }
 
@@ -79,12 +78,6 @@ class DumpOp {
 
     DataflowWriteChannel apply() {
 
-        if( !isEnabled() ) {
-            if( source instanceof DataflowWriteChannel )
-                return (DataflowWriteChannel)source
-            throw new IllegalArgumentException("Illegal dump operator source channel")
-        }
-
         final target = CH.createBy(source)
         final events = new HashMap(2)
         events.onNext = {
diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy
index 12c6abbe28..3a9ff41614 100644
--- a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy
@@ -60,6 +60,26 @@ class OperatorImpl {
 
     private static Session getSession() { Global.getSession() as Session }
 
+    DataflowWriteChannel dump(final DataflowReadChannel source, Closure closure = null) {
+        dump(source, Collections.emptyMap(), closure)
+    }
+
+    DataflowWriteChannel dump(final DataflowReadChannel source, Map opts, Closure closure = null) {
+        def op = new DumpOp(opts, closure)
+        if( op.isEnabled() ) {
+            op.setSource(source)
+            return op.apply()
+        }
+        else {
+            def target = CH.createBy(source)
+            DataflowHelper.subscribeImpl(source, [
+                onNext: { target.bind(it) },
+                onComplete: { CH.close0(target) }
+            ])
+            return target
+        }
+    }
+
     /**
      * Subscribe *onNext* event
      *

From 488c22d26495e3add68dd7db252b6db76b12f76a Mon Sep 17 00:00:00 2001
From: Ben Sherman <bentshermann@gmail.com>
Date: Fri, 11 Aug 2023 20:47:26 -0500
Subject: [PATCH 2/2] Add unit test

Signed-off-by: Ben Sherman <bentshermann@gmail.com>
---
 .../nextflow/extension/DumpOp2Test.groovy     | 54 +++++++++++++++++++
 1 file changed, 54 insertions(+)
 create mode 100644 modules/nextflow/src/test/groovy/nextflow/extension/DumpOp2Test.groovy

diff --git a/modules/nextflow/src/test/groovy/nextflow/extension/DumpOp2Test.groovy b/modules/nextflow/src/test/groovy/nextflow/extension/DumpOp2Test.groovy
new file mode 100644
index 0000000000..f073cface2
--- /dev/null
+++ b/modules/nextflow/src/test/groovy/nextflow/extension/DumpOp2Test.groovy
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2013-2023, Seqera Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nextflow.extension
+
+import nextflow.Channel
+import nextflow.Session
+import test.Dsl2Spec
+import test.OutputCapture
+
+/**
+ *
+ * @author Ben Sherman <bentshermann@gmail.com>
+ */
+class DumpOp2Test extends Dsl2Spec {
+
+    @org.junit.Rule
+    OutputCapture capture = new OutputCapture()
+
+    def 'should support pipe' () {
+
+        given:
+        new Session(dumpChannels: ['*'])
+
+        when:
+        def result = dsl_eval(/
+            Channel.of(1, 2, 3) | dump
+        /)
+        def stdout = capture.toString()
+        then:
+        result.val == 1
+        result.val == 2
+        result.val == 3
+        result.val == Channel.STOP
+        stdout.contains('[DUMP] 1')
+        stdout.contains('[DUMP] 2')
+        stdout.contains('[DUMP] 3')
+
+    }
+
+}