Commit a8d3da9

Merge branch 'dev-rpackage' into 'master'

v0.0.0.9003

See merge request eoc_foundation_wip/analysis-pipelines!9

2 parents: c86d3b6 + 18a16a5

53 files changed: +1612 −268 lines

.gitignore

Lines changed: 1 addition & 1 deletion

@@ -5,7 +5,7 @@
 metastore_db/
 .DS_Store
 vignettes/metastore_db/
-vignettes/pipeline.RDS
+vignettes/*.RDS
 vignettes/*.out
 vignettes/*.R
 vignettes/*.html

DESCRIPTION

Lines changed: 8 additions & 5 deletions

@@ -1,16 +1,18 @@
 Package: analysisPipelines
 Type: Package
 Title: Compose interoperable analysis pipelines, and put them into production
-Version: 0.0.0.9002
+Version: 0.0.0.9003
 Authors@R: c(
-    person("Naren","Srinivasan", email = "Naren.Srinivasan@mu-sigma.com", role = c("cre","aut")),
+    person("Naren","Srinivasan", email = "Naren.Srinivasan@mu-sigma.com", role = c("aut")),
+    person("Zubin Dowlaty","", email = "Zubin.Dowlaty@mu-sigma.com", role = c("ctb")),
     person("Sanjay","", email = "Sanjay@mu-sigma.com", role = c("ctb")),
     person("Neeratyoy","Mallik", email = "Neeratyoy.Mallik@mu-sigma.com", role = c("ctb")),
-    person("Anoop S","", email = "Anoop.S@mu-sigma.com", role = c("ctb"))
+    person("Anoop S","", email = "Anoop.S@mu-sigma.com", role = c("ctb")),
+    person("Mu Sigma, Inc.", email = "ird.experiencelab@mu-sigma.com", role = c("cre"))
     )
-Description: The package aims at enabling data scientists to compose pipelines of analysis which consist of data manipulation, exploratory analysis & reporting, as well as modeling steps. It also aims to enable data scientists to use tools of their choice through an R interface, and compose interoperable pipelines between R, Spark, and Python.
+Description: The package aims at enabling data scientists to compose pipelines of analysis which consist of data manipulation, exploratory analysis & reporting, as well as modeling steps. It also aims to enable data scientists to use tools of their choice through an R interface, and compose interoperable pipelines between R, Spark, and Python. Credits to Mu Sigma for supporting the development of the package.
 Depends: R (>= 3.4.0), tibble, magrittr, data.table, pipeR, devtools
-Imports: ggplot2, dplyr, futile.logger, RCurl
+Imports: ggplot2, dplyr, futile.logger, RCurl, proto
 Suggests: plotly, knitr, rmarkdown, SparkR, parallel, visNetwork, rjson, DT, shiny
 Remotes: github::cran/SparkR
 Encoding: UTF-8

@@ -23,6 +25,7 @@ Collate:
     'analysisPipelines_package.R'
     'core-functions.R'
     'core-functions-batch.R'
+    'core-functions-meta-pipelines.R'
     'core-streaming-functions.R'
     'r-batch-eda-utilities.R'
     'spark-structured-streaming-utilities.R'

NAMESPACE

Lines changed: 8 additions & 0 deletions

@@ -3,28 +3,35 @@
 export(AnalysisPipeline)
 export(BaseAnalysisPipeline)
 export(CheckColumnType)
+export(MetaAnalysisPipeline)
 export(StreamingAnalysisPipeline)
 export(assessEngineSetUp)
 export(bivarPlots)
 export(castKafkaStreamAsString)
 export(convertKafkaValueFromJson)
 export(correlationMatPlot)
+export(createPipelineInstance)
+export(exportAsMetaPipeline)
 export(generateReport)
 export(genericPipelineException)
 export(getDatatype)
 export(getInput)
 export(getLoggerDetails)
 export(getOutputById)
 export(getPipeline)
+export(getPipelinePrototype)
 export(getRegistry)
 export(ignoreCols)
+export(loadMetaPipeline)
 export(loadPipeline)
 export(loadPredefinedFunctionRegistry)
+export(loadRegistry)
 export(multiVarOutlierPlot)
 export(outlierPlot)
 export(prepExecution)
 export(registerFunction)
 export(savePipeline)
+export(saveRegistry)
 export(setInput)
 export(setLoggerDetails)
 export(sparkRSessionCreateIfNotPresent)

@@ -33,6 +40,7 @@ export(updateObject)
 export(visualizePipeline)
 exportClasses(AnalysisPipeline)
 exportClasses(BaseAnalysisPipeline)
+exportClasses(MetaAnalysisPipeline)
 exportClasses(StreamingAnalysisPipeline)
 exportMethods(checkSchemaMatch)
 exportMethods(generateOutput)
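
Taken together, the new exports outline a meta-pipeline workflow: a pipeline's structure can be exported without its data, inspected through a prototype, and re-instantiated, and function registries can now be saved and reloaded. A minimal usage sketch, assuming signatures along these lines (only the export names appear in this diff; the arguments, the iris input, and the file paths are illustrative assumptions):

library(analysisPipelines)

# Assumed starting point: a pipeline object built on some data
pipelineObj <- AnalysisPipeline(input = iris)

# Export the pipeline's structure, without data, as a meta-pipeline
metaPipelineObj <- pipelineObj %>>% exportAsMetaPipeline()

# Inspect the parameter prototype, then instantiate a concrete pipeline
pipelineProto <- getPipelinePrototype(metaPipelineObj)
newPipelineObj <- createPipelineInstance(metaPipelineObj, pipelineProto)

# Meta-pipelines and registries can be persisted and restored
savePipeline(metaPipelineObj, path = "metaPipeline.RDS")   # hypothetical path
metaPipelineObj <- loadMetaPipeline(path = "metaPipeline.RDS")
saveRegistry(path = "registry.RDS")                        # hypothetical path
loadRegistry(path = "registry.RDS")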

R/core-functions-batch.R

Lines changed: 32 additions & 17 deletions

@@ -54,14 +54,6 @@ setMethod(
     ## Calling the parent constructor
     .Object <- methods::callNextMethod(.Object, ...)

-    # for(rowNo in 1:nrow(batchPredefFunctions)){
-    #   .Object %>>% registerFunction(functionName = batchPredefFunctions[['functionName']][[rowNo]],
-    #                                 heading = batchPredefFunctions[['heading']][[rowNo]],
-    #                                 # batchPredefFunctions[['outAsIn']][[rowNo]],
-    #                                 engine = batchPredefFunctions[['engine']][[rowNo]],
-    #                                 exceptionFunction = batchPredefFunctions[['exceptionHandlingFunction']][[rowNo]],
-    #                                 userDefined = F, loadPipeline = F ) -> .Object
-    # }
     return(.Object)

   },error = function(e){
@@ -242,6 +234,7 @@ checkSchema <- function(dfOld, dfNew){
   startPipelineExecution <- Sys.time()
   futile.logger::flog.info("|| Pipeline Execution STARTED ||" , name='logger.execution')

+  maxEngineName <- "r"
   outputCache <- .getCache()

   topOrder <- object@pipelineExecutor$topologicalOrdering
@@ -255,9 +248,10 @@ checkSchema <- function(dfOld, dfNew){
   engineCount %>>% dplyr::filter(numOp == max(numOp)) -> maxEngine


-  maxEngineName <- "r"
   if(nrow(maxEngine) == 1){
     maxEngineName <- maxEngine$engine
+  }else{
+    maxEngineName <- maxEngine$engine[1]
   }

   inputToExecute <- object@input
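
The revised default-engine selection now handles ties explicitly: when more than one engine accounts for the maximum number of operations, the first one is chosen deterministically instead of assigning a multi-element vector to maxEngineName. A standalone illustration of the tie case, with made-up engine counts (not code from the package):

library(pipeR)

# Hypothetical per-engine operation counts for one pipeline
engineCount <- data.frame(engine = c("r", "spark"), numOp = c(3, 3),
                          stringsAsFactors = FALSE)

# Both engines tie for max(numOp), so the filter keeps both rows
engineCount %>>% dplyr::filter(numOp == max(numOp)) -> maxEngine
nrow(maxEngine)  # 2

# The new else branch picks the first engine rather than a length-2 vector
if(nrow(maxEngine) == 1){
  maxEngineName <- maxEngine$engine
}else{
  maxEngineName <- maxEngine$engine[1]
}
maxEngineName  # "r"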
@@ -400,17 +394,21 @@ checkSchema <- function(dfOld, dfNew){
   # Set parameters

   params <- unlist(funcDetails$parameters, recursive = F)
-  dep <- unlist(funcDetails$dependencies, recursive = F)
+  dep <- unique(unlist(funcDetails$dependencies, recursive = F))
   depTerms <- paste0("f", dep)

   params <- lapply(params, function(p, depTerms, outputCache){
     if(class(p) == "formula"){
-      formulaTerm <- attr(terms(p), "term.label")
-      if(length(formulaTerm) == 1 && formulaTerm %in% depTerms){
-
-        ## Formula of previous function in pipeline
-        actualParamObjectName <- paste0(formulaTerm, ".out")
-        p <- get(actualParamObjectName, envir = outputCache)
+      isDepParam <- analysisPipelines:::isDependencyParam(p)
+      if(isDepParam){
+        formulaTerm <- analysisPipelines:::getTerm(p)
+        argName <- analysisPipelines:::getResponse(p)
+        if(formulaTerm %in% depTerms){
+
+          ## Formula of previous function in pipeline
+          actualParamObjectName <- paste0(formulaTerm, ".out")
+          p <- get(actualParamObjectName, envir = outputCache)
+        }
       }
     }

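The rewritten parameter resolution delegates formula parsing to internal helpers (isDependencyParam, getTerm, getResponse) instead of reading terms() attributes inline. The idea: a formula parameter such as y ~ f1 marks argument y as depending on the output of pipeline step 1, which is then fetched from the output cache. A standalone sketch of that resolution, using stand-in helpers whose behavior is an assumption (the package's internal versions are not shown in this diff):

# Stand-in helpers; assumed to mirror the internals referenced above
getTerm     <- function(f) all.vars(f)[2]   # dependency term, e.g. "f1"
getResponse <- function(f) all.vars(f)[1]   # argument name, e.g. "y"

p <- y ~ f1                                 # "bind argument 'y' to step 1's output"

outputCache <- new.env()
assign("f1.out", head(iris), envir = outputCache)   # cached output of step 1

# Resolution as in the hunk: look up "<term>.out" in the cache
depTerms <- paste0("f", 1)
formulaTerm <- getTerm(p)
argName <- getResponse(p)                   # extracted as in the hunk
if(formulaTerm %in% depTerms){
  p <- get(paste0(formulaTerm, ".out"), envir = outputCache)
}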
@@ -421,10 +419,27 @@ checkSchema <- function(dfOld, dfNew){
   #Call

   #Assign as named parameters
+  #Get names of params
+  # paramNames <- lapply(params, function(p){
+  #   return(names(p))
+  # }) %>>% unlist
+  # params <-lapply(params, function(p){
+  #   names(p) <- NULL
+  #   return(p)
+  # })
+  # names(params) <- paramNames
   args <- params
   if(funcDetails$isDataFunction){
-    args <- append(list(inputToExecute), params)
+    formals(funcDetails$operation) %>>% as.list %>>% names %>>% dplyr::first() -> firstArgName
+    firstArg <- list(inputToExecute)
+    names(firstArg) <- firstArgName
+    args <- append(firstArg, params)
   }
+  # }else{
+  #   firstParam <- params[1]
+  #   names(firstParam) <- "object"
+  #   args <- append(firstParam, params[-1])
+  # }
   output <- tryCatch({do.call(what = funcDetails$operation,
                               args = args)},
                      error = function(e){
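
The change above stops relying on positional matching for a data function's first argument: the pipeline input is now passed under the function's actual first formal name, so do.call() binds it by name. A standalone sketch of the same technique (the operation here is made up, not a registered function from the package):

library(pipeR)

# A hypothetical registered data function
operation <- function(data, n) head(data, n)

# Discover the first formal argument's name, as the hunk does
formals(operation) %>>% as.list %>>% names %>>% dplyr::first() -> firstArgName

firstArg <- list(iris)
names(firstArg) <- firstArgName           # becomes list(data = iris)
args <- append(firstArg, list(n = 3))

do.call(what = operation, args = args)    # same as operation(data = iris, n = 3)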
