From d2ecffc50092b7950ab9220a55601f5dd7d0777f Mon Sep 17 00:00:00 2001
From: "naren.s"
Date: Thu, 3 Jan 2019 12:29:28 +0530
Subject: [PATCH 1/2] Minor corrections in vignettes

---
 ...s_for_working_Apache_Spark_Structured_Streaming.Rmd | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/vignettes/Streaming_pipelines_for_working_Apache_Spark_Structured_Streaming.Rmd b/vignettes/Streaming_pipelines_for_working_Apache_Spark_Structured_Streaming.Rmd
index 6ae6efd..ff916b3 100644
--- a/vignettes/Streaming_pipelines_for_working_Apache_Spark_Structured_Streaming.Rmd
+++ b/vignettes/Streaming_pipelines_for_working_Apache_Spark_Structured_Streaming.Rmd
@@ -51,9 +51,9 @@ knitr::opts_chunk$set(
 library(analysisPipelines)
 library(SparkR)
 
-## Define these variables as per the configuration of your machine. This is just an example.
+## Define these variables as per the configuration of your machine. The below example is just illustrative.
 
-sparkHome <- "/Users/naren/softwares/spark-2.3.1-bin-hadoop2.7/"
+sparkHome <- "/path/to/spark/directory/"
 sparkMaster <- "local[1]"
 sparkPackages <- c("org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1")
 # Set spark home variable if not present
@@ -81,10 +81,10 @@ This example illustrates usage of pipelines for a streaming application. In this
 Read streaming data from Kafka.
 
 ```{r}
-## Define these variables as per the configuration of your machine. This is just an example.
+## Define these variables as per the configuration of your machine. The below example is just illustrative.
 
-kafkaBootstrapServers <- "172.25.0.144:9092,172.25.0.98:9092,172.25.0.137:9092"
-consumerTopic <- "netlogo"
+kafkaBootstrapServers <- "192.168.0.256:9092,192.168.0.257:9092,192.168.0.258:9092"
+consumerTopic <- "topic1"
 streamObj <- read.stream(source = "kafka", kafka.bootstrap.servers = kafkaBootstrapServers, subscribe = consumerTopic, startingOffsets="earliest")
 printSchema(streamObj)
 ```
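The hunks above replace machine-specific values with placeholders. For context, here is a minimal sketch of how the corrected setup and Kafka read fit together when run on their own; the Spark path, broker addresses, and topic name are illustrative assumptions, and the session is created directly through SparkR's `sparkR.session()` rather than through any helper the vignette itself may wrap around it.

```r
library(SparkR)

## Placeholder configuration -- adjust for your own machine (illustrative values,
## assuming a local Spark 2.3.x build with the matching Kafka SQL connector)
sparkHome     <- "/path/to/spark/directory/"
sparkMaster   <- "local[1]"
sparkPackages <- c("org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1")

## Set SPARK_HOME only if it is not already present in the environment
if (nchar(Sys.getenv("SPARK_HOME")) == 0) {
  Sys.setenv(SPARK_HOME = sparkHome)
}

## Start (or connect to) a Spark session with the Kafka connector on the classpath
sparkR.session(master = sparkMaster, sparkHome = sparkHome,
               sparkPackages = sparkPackages)

## Placeholder brokers and topic -- replace with real cluster details
kafkaBootstrapServers <- "broker1:9092,broker2:9092"
consumerTopic         <- "topic1"

## Subscribe to the topic as a streaming SparkDataFrame and inspect its schema
streamObj <- read.stream(source = "kafka",
                         kafka.bootstrap.servers = kafkaBootstrapServers,
                         subscribe = consumerTopic,
                         startingOffsets = "earliest")
printSchema(streamObj)
```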
From a09e945080bc2b601196d6784c5fc81de23ce868 Mon Sep 17 00:00:00 2001
From: "naren.s"
Date: Thu, 3 Jan 2019 12:53:08 +0530
Subject: [PATCH 2/2] More minor vignette corrections

---
 vignettes/Interoperable_Pipelines.Rmd                          | 2 +-
 ...ipelines_for_working_Apache_Spark_Structured_Streaming.Rmd | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/vignettes/Interoperable_Pipelines.Rmd b/vignettes/Interoperable_Pipelines.Rmd
index 44c0313..3bfbee3 100644
--- a/vignettes/Interoperable_Pipelines.Rmd
+++ b/vignettes/Interoperable_Pipelines.Rmd
@@ -231,7 +231,7 @@ opWithFilter %>>% getOutputById(2)
 
 Finally, we show a case, where sequential filtering steps are performed in Spark, before visualizing in R, and running a decision tree model in Python.
 
-Note, that in this case, we register `getTargetForPyClassifcation` and `getTargetForPyClassification` as *non-data* functions. In this particular pipeline, there is no main *path* as such, as the pipeline branches into 2 paths - one in R and the other in Python. In such cases, using `outAsIn` or the `dataFunction` parameter with formula semantics is just a **question of convenience**. If the first argument of a *non-data* function is of a data frame class in R, Python (Pandas) or Spark, the package automatically performs type conversions when environments are switched (R -> Spark, Spark -> Python, and so on).
+Note, that in this case, `getTargetForPyClassifcation` and `getTargetForPyClassification` have been registered as *data* functions. Type conversions between R, Spark and Python for data functions are performed automatically by the package.
 
 ```{r}
 pipelineObj %>>% filterData_spark(condition = "Species == 'setosa' or Species == 'virginica'") %>>%
diff --git a/vignettes/Streaming_pipelines_for_working_Apache_Spark_Structured_Streaming.Rmd b/vignettes/Streaming_pipelines_for_working_Apache_Spark_Structured_Streaming.Rmd
index ff916b3..294c819 100644
--- a/vignettes/Streaming_pipelines_for_working_Apache_Spark_Structured_Streaming.Rmd
+++ b/vignettes/Streaming_pipelines_for_working_Apache_Spark_Structured_Streaming.Rmd
@@ -95,7 +95,7 @@ Users can define their own functions and use it as a part of the pipeline. These
 
 ```{r}
 
-# Function to convert datatype json struct to colums
+# Function to convert datatype json struct to columns
 convertStructToDf <- function(streamObj) {
   streamObj <- SparkR::select(streamObj,list(getField(streamObj$`jsontostructs(value)`,"bannerId"),
                                              getField(streamObj$`jsontostructs(value)`,"mobile"),
@@ -131,7 +131,7 @@ castDfColumns <- function(streamObj) {
   return (streamObj)
 }
 
-# Function to convert datatype json struct to colums
+# Function to convert datatype json struct to columns
 convertDfToKafkaKeyValuePairs <- function (streamObj, kafkaKey) {
   streamObj <- SparkR::toJSON(streamObj)
   streamObj$key <- kafkaKey
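The comment fixes above sit inside the vignette's user-defined streaming functions. As a self-contained illustration of the pattern those functions follow, flattening the struct produced by parsing the Kafka `value` as JSON might look like the sketch below; the helper name and struct field names are taken from the hunk above purely for illustration.

```r
library(SparkR)

# Hypothetical helper mirroring convertStructToDf: pull named fields out of the
# `jsontostructs(value)` struct column and give the resulting columns flat names
flattenJsonStruct <- function(streamObj) {
  streamObj <- SparkR::select(streamObj,
                              list(getField(streamObj$`jsontostructs(value)`, "bannerId"),
                                   getField(streamObj$`jsontostructs(value)`, "mobile")))
  colnames(streamObj) <- c("bannerId", "mobile")
  return(streamObj)
}
```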