Data generator with multiple file output #105

Open
noproblem666 opened this issue Sep 29, 2016 · 7 comments

@noproblem666
Contributor

We have a data generator for a KMeans benchmark and want to use it with the PEEL framework.
The generator runs as a Flink job and produces two files, points and centers. We want to save these files in <hdfs-root-directory>/kmeans using the GeneratedDataSet class and then read them from the KMeans Flink job.

My question is: how can we configure PEEL to create the kmeans directory in HDFS and then copy the files into it? Our current configuration, shown below, does not work.

    <!--************************************************************************
    * Data Generators
    *************************************************************************-->

    <bean id="datagen.kmeans" class="org.peelframework.flink.beans.job.FlinkJob">
        <constructor-arg name="runner" ref="flink-1.0.3"/>
        <constructor-arg name="command">
            <value><![CDATA[
              -v -c org.apache.flink.examples.java.clustering.util.KMeansDataGenerator \
              ${app.path.datagens}/KMeans.jar \
              --points ${datagen.points} \
              --k ${datagen.k} \
              --output ${system.hadoop-2.path.input}/kmeans
            ]]>
            </value>
        </constructor-arg>
    </bean>

    <!--************************************************************************
    * Data Sets
    *************************************************************************-->

    <bean id="dataset.kmeans.generated" class="org.peelframework.core.beans.data.GeneratedDataSet">
        <constructor-arg name="src" ref="datagen.kmeans"/>
        <constructor-arg name="dst" value="${system.hadoop-2.path.input}/kmeans"/>
        <constructor-arg name="fs" ref="hdfs-2.7.1"/>
    </bean>

Our data generator is used in the same way as the WordGenerator, except that it produces two files instead of one.

Do you have an idea how we could solve this problem with PEEL, or do we have to adjust our data generator?

Thanks!

@aalexandrov aalexandrov self-assigned this Sep 29, 2016
@aalexandrov
Member

aalexandrov commented Sep 29, 2016

I think this should work, and at runtime the generator will run only once.

I suggest trying the latest SNAPSHOT from master, as it fixes some issues related to the setup / teardown logic of systems and dataset materialization.

<!--************************************************************************
* Data Generators
*************************************************************************-->

<bean id="datagen.kmeans" class="org.peelframework.flink.beans.job.FlinkJob">
    <constructor-arg name="runner" ref="flink-1.0.3"/>
    <constructor-arg name="command">
        <value><![CDATA[
          -v -c org.apache.flink.examples.java.clustering.util.KMeansDataGenerator \
          ${app.path.datagens}/KMeans.jar \
          --points ${datagen.points} \
          --k ${datagen.k} \
          --output ${system.hadoop-2.path.input}/kmeans
        ]]>
        </value>
    </constructor-arg>
</bean>

<!--************************************************************************
* Data Sets
*************************************************************************-->

<bean id="dataset.kmeans.points.generated" class="org.peelframework.core.beans.data.GeneratedDataSet">
    <constructor-arg name="src" ref="datagen.kmeans"/>
    <constructor-arg name="dst" value="${system.hadoop-2.path.input}/kmeans/points.csv"/>
    <constructor-arg name="fs" ref="hdfs-2.7.1"/>
</bean>


<bean id="dataset.kmeans.means.generated" class="org.peelframework.core.beans.data.GeneratedDataSet">
    <constructor-arg name="src" ref="datagen.kmeans"/>
    <constructor-arg name="dst" value="${system.hadoop-2.path.input}/kmeans/means.csv"/>
    <constructor-arg name="fs" ref="hdfs-2.7.1"/>
</bean>

@noproblem666
Contributor Author

Thanks for your fast reply!
Unfortunately, this does not work. I got the following exception:

Configuration problem: Bean name 'dataset.kmeans.generated' is already used in this <beans> element 

It seems that the bean cannot be used twice.

@aalexandrov
Member

Then duplicate the bean definition as well (using the same command and runner values).

@noproblem666
Contributor Author

noproblem666 commented Sep 29, 2016

Sorry, my fault! I forgot to change the bean id for each GeneratedDataSet.
Now, the experiment starts but the job does not finish successfully.
This is the experiment configuration for the data generation:

    <!--************************************************************************
    * Data Generators
    *************************************************************************-->

    <bean id="datagen.kmeans" class="org.peelframework.flink.beans.job.FlinkJob">
        <constructor-arg name="runner" ref="flink-1.0.3"/>
        <constructor-arg name="command">
            <value><![CDATA[
              -v -c org.apache.flink.examples.java.clustering.util.KMeansDataGenerator \
              ${app.path.apps}/KMeans.jar \
              --points ${datagen.points} \
              --k ${datagen.k} \
              --output ${system.hadoop-2.path.input}/kmeans
            ]]>
            </value>
        </constructor-arg>
    </bean>

    <!--************************************************************************
    * Data Sets
    *************************************************************************-->

    <bean id="dataset.kmeans.points.generated" class="org.peelframework.core.beans.data.GeneratedDataSet">
        <constructor-arg name="src" ref="datagen.kmeans"/>
        <constructor-arg name="dst" value="${system.hadoop-2.path.input}/kmeans/points"/>
        <constructor-arg name="fs" ref="hdfs-2.7.1"/>
    </bean>

    <bean id="dataset.kmeans.centers.generated" class="org.peelframework.core.beans.data.GeneratedDataSet">
        <constructor-arg name="src" ref="datagen.kmeans"/>
        <constructor-arg name="dst" value="${system.hadoop-2.path.input}/kmeans/centers"/>
        <constructor-arg name="fs" ref="hdfs-2.7.1"/>
    </bean>

This is the error message from stdout:

16-09-29 16:36:28 [INFO] Running Job with command
              -v -c org.apache.flink.examples.java.clustering.util.KMeansDataGenerator  \
              /usr/local/share/peel/peel-wordcount/apps/KMeans.jar                                          \
              --points 10000                                                \
              --k 10                                                            \
              --output hdfs://loadgen112:8020//tmp/input/kmeans


16-09-29 16:36:30 [WARN] Job threw an unexpected exception: Data generation job did not finish successfully
16-09-29 16:36:32 [ERROR] Exception for experiment kmeans.flink in suite kmeans.default: Could not generate data for target 'hdfs://loadgen112:8020//tmp/input/kmeans/points'.
java.lang.RuntimeException: Data generation job did not finish successfully
    at org.peelframework.core.beans.system.Job.execute(Job.scala:68)
    at org.peelframework.core.beans.data.GeneratedDataSet.materialize(GeneratedDataSet.scala:42)
    at org.peelframework.core.cli.command.experiment.Run$$anonfun$run$3$$anonfun$apply$1$$anonfun$apply$6.apply(Run.scala:119)
    at org.peelframework.core.cli.command.experiment.Run$$anonfun$run$3$$anonfun$apply$1$$anonfun$apply$6.apply(Run.scala:117)
    at scala.collection.immutable.Set$Set2.foreach(Set.scala:94)
    at org.peelframework.core.cli.command.experiment.Run$$anonfun$run$3$$anonfun$apply$1.apply(Run.scala:117)
    at org.peelframework.core.cli.command.experiment.Run$$anonfun$run$3$$anonfun$apply$1.apply(Run.scala:94)
    at scala.Option.foreach(Option.scala:236)
    at org.peelframework.core.cli.command.experiment.Run$$anonfun$run$3.apply(Run.scala:94)
    at org.peelframework.core.cli.command.experiment.Run$$anonfun$run$3.apply(Run.scala:94)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at org.peelframework.core.cli.command.experiment.Run.run(Run.scala:94)
    at org.peelframework.core.cli.Peel$.main(Peel.scala:112)
    at org.peelframework.core.cli.Peel.main(Peel.scala)

And this is from the log run.err:

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: java.io.FileNotFoundException: hdfs:/loadgen112:8020/tmp/input/kmeans/points (No such file or directory)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
    at java.io.FileWriter.<init>(FileWriter.java:90)
    at org.apache.flink.examples.java.clustering.util.KMeansDataGenerator.main(KMeansDataGenerator.java:100)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    ... 6 more

Do you have an idea what could be wrong?

Thanks a lot!
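
A note on the "Caused by" line in run.err: the exception is raised from java.io.FileWriter, which writes to the local filesystem only, so the --output value appears to be treated as a plain local path rather than as an HDFS URI. The minimal sketch below (the class and method names are hypothetical and not taken from the generator) shows how java.io.File on a Unix-like system collapses the doubled slashes. That would explain why the reported path reads hdfs:/loadgen112:8020/... instead of hdfs://loadgen112:8020//...

```java
import java.io.File;

// Hypothetical demo class; not part of PEEL or the Flink examples.
class LocalPathDemo {

    // Returns the path string that java.io.File keeps after normalization.
    // java.io.File has no notion of URI schemes such as hdfs://.
    static String asLocalPath(String output) {
        return new File(output).getPath();
    }

    public static void main(String[] args) {
        // The --output value from the log above, plus the points file name.
        String target = "hdfs://loadgen112:8020//tmp/input/kmeans/points";
        // On a Unix-like system, repeated slashes are collapsed into one.
        System.out.println(asLocalPath(target));
    }
}
```

If that reading is right, the generator would have to write through an HDFS-aware API, or write to a local directory that is copied to HDFS afterwards, regardless of how the dataset beans are configured.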

@aalexandrov
Member

Try

--output hdfs:///loadgen112:8020//tmp/input/kmeans

(with an extra / at the beginning).
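
For reference, here is a small sketch of how java.net.URI splits the two spellings. The class and method names are hypothetical, and whether Flink or Hadoop resolve these paths exactly like java.net.URI is an assumption that is not verified here. With two slashes, loadgen112:8020 is parsed as the authority (host and port). With three slashes, the authority is empty and the host name becomes part of the path.

```java
import java.net.URI;

// Hypothetical helper for inspecting an --output value.
class OutputUriParts {

    // Formats scheme, host, port and path as seen by java.net.URI.
    static String describe(String output) {
        URI uri = URI.create(output);
        return "scheme=" + uri.getScheme()
                + " host=" + uri.getHost()
                + " port=" + uri.getPort()
                + " path=" + uri.getPath();
    }

    public static void main(String[] args) {
        // Two slashes: the value printed in the PEEL log.
        System.out.println(describe("hdfs://loadgen112:8020//tmp/input/kmeans"));
        // Three slashes: the variant suggested above.
        System.out.println(describe("hdfs:///loadgen112:8020//tmp/input/kmeans"));
    }
}
```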

@aalexandrov
Member

aalexandrov commented Sep 29, 2016

Actually, can you show me the Java / Scala code that parses the --output $PATH value?

@noproblem666
Contributor Author

Sorry for the delay!

We use the KMeans benchmark and the KMeans data generator from the "official" Flink examples on GitHub:

https://github.com/apache/flink/blob/d7b59d761601baba6765bb4fc407bcd9fd6a9387/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
