Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions aggregate_tables.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
library(dplyr)
library(DBI)
library(rjson)
library(parallel)

source('s_dplyr.R')
source('aggregate_tables/indicator_functions.R')
Expand All @@ -17,6 +18,11 @@ get_db_connection <- function(system_config_path='config_system.json') {
return(db)
}

get_cores <- function(system_config_path='config_system.json') {
config <- fromJSON(file=system_config_path)[['data_platform']]
if ('parallel' %in% names(config)) {return(config[['parallel']][['cores']])} else {return (1)}
}

drop_tables <- function(file) {
config <- fromJSON(file=file)

Expand All @@ -32,26 +38,38 @@ write_tables <- function(file, debug) {
db <- get_db_connection()
for (table.info in config) {
print(paste('Computing indicators for ', table.info$table, 'indicator table.'))
df <- compute_indicators(table.info, db, debug)
df <- compute_indicators(table.info, debug)
print(paste('Writing ', table.info$table, 'indicator table.'))
dbRemoveTable(db$con, name=table.info$table)
copy_to(db, df=df, name=table.info$table, temporary=FALSE)
}
}

compute_indicators <- function(info, db, debug) {
compute_indicators <- function(info, debug) {
debug <- as.logical(debug)
if (debug == T) {limit = 5000} else {limit = -1}
dfs <- lapply(info$components, function(component) {
cores <- get_cores()
cl <- makeCluster(cores, outfile='/tmp/aggregation.log', type="FORK")
clusterExport(cl,varlist=c('info','limit'),envir=environment())
clusterExport(cl,varlist=c('get_data_source','s_group_by','aggregate','get_db_connection','fromJSON'))

dfs <- parLapply(cl,info$components, function(component) {
print(paste('Getting data source ', component$table))
source('s_dplyr.R')
source('aggregate_tables/indicator_functions.R')
source('data_sources.R')
db <- get_db_connection()
source.data <- get_data_source(db, component$table, limit)
group.by.str <- paste(info$by, collapse=', ')
print(paste('Grouping and aggregating', component$table))
df <- source.data %.% s_group_by(group.by.str) %.% aggregate(component$columns)
print(paste('Returning aggregated data from ', component$table))
return(df)
})

print('merging...')
merged <- Reduce(function(...) merge(..., all.x=TRUE, all.y=TRUE, by=info$by), dfs)
stopCluster(cl)
return(merged)
}

Expand Down
8 changes: 4 additions & 4 deletions aggregate_tables.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
"active_month_percent":"active_month_percent",
"active_days":"active_days",
"active_day_percent":"active_day_percent",
"nforms":"nforms",
"median_visit_duration":"median_visit_duration",
"median_visits_per_day":"median_visits_per_day",
"median_visits_per_month":"median_visits_per_month",
Expand Down Expand Up @@ -49,7 +48,8 @@
{
"table":"device_type",
"columns":{
"summary_device_type":"summary_device_type"
"summary_device_type":"summary_device_type",
"nforms": "nforms"
}
},
{
Expand Down Expand Up @@ -79,7 +79,6 @@
"nvisits":"nvisits",
"active_days":"active_days",
"active_day_percent":"active_day_percent",
"nforms":"nforms",
"median_visit_duration":"median_visit_duration",
"median_visits_per_day":"median_visits_per_day",
"morning":"morning",
Expand Down Expand Up @@ -107,7 +106,8 @@
{
"table":"device_type",
"columns":{
"summary_device_type":"summary_device_type"
"summary_device_type":"summary_device_type",
"nforms": "nforms"
}
},
{
Expand Down
18 changes: 9 additions & 9 deletions aggregate_tables/indicator_functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ library(zoo)
date_first_visit <- function(x) min(x$visit_date, na.rm=TRUE)
date_last_visit <- function(x) max(x$visit_date, na.rm=TRUE)
active_days <- function(x) length(unique(x$visit_date))
active_day_percent <- function(x) (active_days(x) / days_in_month(date_first_visit(x)))*100
active_day_percent <- function(x) (active_days(x) / days_in_month(date_first_visit(x)))*100 # post-processing
nvisits <- function(x) NROW(x)
nforms <- function(x) sum(x$total_forms, na.rm=TRUE)
median_visit_duration <- function(x) round(as.numeric(median((x$time_end - x$time_start)/ 60, na.rm=TRUE)), digits = 1)
time_using_cc <- function(x) sum(x$form_duration, na.rm = T)
median_visits_per_day <- function(x) median(as.numeric(table(x$visit_date)), na.rm=TRUE)
Expand All @@ -20,7 +19,7 @@ travel_batch_percent <- function(x) {
travel_vis <- nvisits_travel(x)
batch_vis <- nvisits_travel_batch(x)
if (travel_vis==0) return (0) else return ((batch_vis/travel_vis )*100)
}
} # post-processing

# Proportion of visits by time of day
morning <- function(x) mean(x$visit_time == 'morning')*100
Expand All @@ -40,20 +39,20 @@ numeric_index <- function (x) {

total_months <- length(seq(from=start_month, to=this_month, by='month'))
return (total_months)
}
} # post-processing

## The next indicators are only applicable for the lifetime table.
days_on_cc <- function(x) as.numeric(date_last_visit(x) - date_first_visit(x)) + 1
days_visit_last <- function(x) as.numeric(Sys.Date() - date_last_visit(x))
active_user <- function(x) ifelse(days_visit_last(x) <= 30, 1, 0)
days_on_cc <- function(x) as.numeric(date_last_visit(x) - date_first_visit(x)) + 1 # post-processing
days_visit_last <- function(x) as.numeric(Sys.Date() - date_last_visit(x)) # post-processing
active_user <- function(x) ifelse(days_visit_last(x) <= 30, 1, 0) # post-processing
calendar_month_on_cc <- function(x) {
first.month <- as.yearmon(date_first_visit(x))
last.month <- as.yearmon(date_last_visit(x))
nmonths <- 12 * as.numeric(last.month - first.month) + 1
return(nmonths)
}
} # post-processing
active_months <- function(x) length(unique(as.yearmon(x$visit_date)))
active_month_percent <- function(x) active_months(x) / calendar_month_on_cc(x)
active_month_percent <- function(x) active_months(x) / calendar_month_on_cc(x) # post-processing
median_visits_per_month <- function(x) median(as.numeric(table(as.yearmon(x$visit_date))), na.rm=TRUE)

# INTERACTION TABLE INDICATORS:
Expand Down Expand Up @@ -85,6 +84,7 @@ summary_device_type <- function (x) {
return ('Multi')
}
}
nforms <- function (x) NROW(x)

# DEVICE LOG TABLE INDICATORS:
total_logs <-function(x) sum(x$num_logs)
Expand Down
98 changes: 98 additions & 0 deletions analysis_scripts/mchen/blog_5/data_prep_no_gap.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
library(plyr)
library(dplyr)
library(zoo)

# load data
data = tbl_df(read.csv("blog.csv", stringsAsFactors=FALSE))
data$calendar_month = as.Date(data$calendar_month, "%m/%d/%y")

# subset columns of interest
all = select(data, domain_numeric, user_pk, active_days, ncases_touched, calendar_month)
all = filter(all, calendar_month >= as.Date("2010-01-01")) %>%
filter(., calendar_month <= as.Date("2014-12-01")) # reformat date

# flagship project in India (pre-rebalancing)
crs = filter(all, domain_numeric == 40)

# total active months per user
m = all %>%
group_by(domain_numeric, user_pk) %>% # duplicated user_id exists across domains
summarise(nmonths = n_distinct(calendar_month))

# excluding first N months of data (depends on reporting needs)
excdMonth = function(data, n) {
data = data %>%
group_by(domain_numeric, user_pk) %>%
filter(., row_number() > n)
return(data)
}


# Rebalancing data: no domain contribute more than 10% to the data
# total active months per domain
totalMonths = function(x) {
y = tbl_df(as.data.frame(table(x$domain_numeric)))
y = arrange(y, desc(Freq))
names(y) = c("domain_numeric", "months")
y$pct = y$months/sum(y$months)
return(y)
}
# min months to be dropped (or added) on each domain to get to 10% contribution
drop = function(x, N) {
x$bal = (x$months - sum(x$months)*N)/(1-N)
return(x)
}

rebalanceData = function(data, N) {
ov_month_data = totalMonths(data)
K = drop(ov_month_data, N)
dms = filter(K, bal > 0)
keepVal = dms$months - ceiling(dms$bal)
rebal_dm = as.numeric(as.character(dms$domain_numeric))
sdata = filter(data, domain_numeric %in% rebal_dm)
ndata = filter(data, !(domain_numeric %in% rebal_dm))
sp = split(sdata, sdata$domain_numeric)
for (i in seq_len(length(sp))) {
sp[[i]] = sp[[i]][sample(1:nrow(sp[[i]]), keepVal[i], FALSE),]
}
out = tbl_df(do.call(rbind, sp))
rebalanced_data = tbl_df(rbind(out, ndata))
return(rebalanced_data)
}

# subset 1: all data rebalanced
rbl_all = rebalanceData(all, 0.10)
p1 = excdMonth(rbl_all, 3)

# subset 2: all data excluding first 6 months
p2 = excdMonth(all, 6)
# reblanaced
rbl_p2 = rebalanceData(p2, 0.10)

# subset 3: all data (unbalanced) including only users that have at least 18 months of usage
# rebalanced
u = select(filter(m, nmonths >= 18), user_pk)
qdata = filter(all, user_pk %in% u$user_pk) # exclude users active for < 18 months

# indexing each user-month
qdata = arrange(qdata, domain_numeric, user_pk, calendar_month)
qdata = qdata %>%
group_by(domain_numeric, user_pk) %>%
mutate(user_month_index = seq(n()))

# Bin user months to custom quarters
qdata$user_quarter_index = round_any(qdata$user_month_index, 3, ceiling)/3
# Rebalance data from quarterly data (only for Q1-Q6)
qdata_sub = filter(qdata, user_quarter_index <= 6)
qdata_split = split(qdata_sub, qdata_sub$user_quarter_index, drop=TRUE)
rbl_qdata_split = lapply(qdata_split, function(x) rebalanceData(x, 0.10))
rbl_qdata = do.call(rbind, rbl_qdata_split)

# subset 4: 12 most active domains in terms of total user-months
bb = m %>%
group_by(domain_numeric) %>%
summarise(um = sum(nmonths)) %>%
arrange(., desc(um)) %>%
top_n(12)

bb_data = filter(rbl_all, domain_numeric %in% bb$domain_numeric)
78 changes: 78 additions & 0 deletions analysis_scripts/mchen/blog_5/data_prep_with_gap.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
###################################
# ADD GAP MONTHS BACK TO THE GAME #
###################################

library(Hmisc) # monthDays() to get the exact number of days in a specific month

# for each user, generate a vector of months according to the distance variable
expandMonths = function(data) {
months = seq(from = data$first_active_month, to = data$last_active_month,
by = "months") # length.out is adding extra month for some reason
noGap = data.frame(rep(data$user_pk,length(months)), months)
return(noGap)
}

# for each user, get distance between first and last active month
gapBack = function(data){
data = arrange(data, domain_numeric, user_pk, calendar_month)
uf = tbl_df(ddply(data, .(domain_numeric, user_pk), function(x)head(x,n=1)))
ul = tbl_df(ddply(data, .(domain_numeric, user_pk), function(x)tail(x,n=1)))
colnames(uf)[5] = c("first_active_month")
colnames(ul)[5] = c("last_active_month")
uf = select(uf, domain_numeric, user_pk, first_active_month)
ul = select(ul, domain_numeric, user_pk, last_active_month)
ufl = inner_join(uf, ul)
ufl$distance = 12*as.numeric(as.yearmon(ufl$last_active_month) -
as.yearmon(ufl$first_active_month)) + 1
expanded = tbl_df(ddply(ufl, .(domain_numeric, user_pk), function(x) expandMonths(x)))
expanded = select(expanded, domain_numeric, user_pk, months)
names(expanded) = c("domain_numeric", "user_pk", "calendar_month")
return(expanded)
}

# join the expanded data frame with all monthly data
all_expanded = gapBack(all)
all_expanded = left_join(expanded, all)
all_expanded[is.na(all_expanded)] <- 0

# for all data excluding 3 months, add back gap months
p1$percent_active_days = p1$active_days/monthDays(p1$calendar_month)
p1_expanded = gapBack(p1)
p1_expanded = left_join(p1_expanded, p1)
p1_expanded[is.na(p1_expanded)] <- 0

# for all data excluding 6 months, add back gap months
p2$percent_active_days = p2$active_days/monthDays(p2$calendar_month)
p2_expanded = gapBack(p2)
p2_expanded = left_join(p2_expanded, p2)
p2_expanded[is.na(p2_expanded)] <- 0

# for 12 big projects, add back gap months
bb_data$percent_active_days = bb_data$active_days/monthDays(bb_data$calendar_month)
bb_list = split(bb_data, bb_data$domain_numeric)
bb_data_expanded = gapBack(bb_data)
bb_data_expanded = left_join(bb_data_expanded, bb_data)
bb_data_expanded[is.na(bb_data_expanded)] <- 0

# for quarterly data, add back gap months
# adding back gap months means user_quarter_index needs to be recomputed
qdata$percent_active_days = qdata$active_days/monthDays(qdata$calendar_month)
qdata_expanded = gapBack(qdata)
qdata_expanded = left_join(qdata_expanded, qdata)
qdata_expanded[is.na(qdata_expanded)] <- 0

# recompute quarterly index after adding gap months
# reindex user_month_index
qdata_expanded =
qdata_expanded %>%
group_by(domain_numeric, user_pk) %>%
mutate(umi = seq_len(n()))
qdata_expanded$user_month_index <- NULL

# Bin user months to custom quarters
bins = c(1+(3*(0:ceiling(max(qdata_expanded$umi)/3))))
labs = paste("Month ", 3*seq(length(bins)-1)-2, "-", 3*seq(length(bins)-1), sep = "")

qdata_expanded = qdata_expanded %>%
group_by(domain_numeric, user_pk) %>%
mutate(uqi = cut(umi, breaks = bins, labels = labs, right = FALSE))
Loading