Skip to content

Commit

Permalink
Merge pull request #96 from treasure-data/support_activity_type_filter
Browse files Browse the repository at this point in the history
Support activity type filter
  • Loading branch information
xuantuan58 authored Dec 3, 2019
2 parents 6477111 + e0bc352 commit 326d349
Show file tree
Hide file tree
Showing 12 changed files with 223 additions and 38 deletions.
5 changes: 4 additions & 1 deletion src/main/java/org/embulk/input/marketo/MarketoService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.databind.node.ObjectNode;
import org.embulk.input.marketo.model.MarketoField;
import org.embulk.input.marketo.model.MarketoResponse;

import java.io.File;
import java.util.Date;
Expand All @@ -16,7 +17,7 @@ public interface MarketoService

File extractLead(Date startTime, Date endTime, List<String> extractedFields, String filterField, int pollingTimeIntervalSecond, int bulkJobTimeoutSecond);

File extractAllActivity(Date startTime, Date endTime, int pollingTimeIntervalSecond, int bulkJobTimeoutSecond);
File extractAllActivity(List<Integer> activityTypeIds, Date startTime, Date endTime, int pollingTimeIntervalSecond, int bulkJobTimeoutSecond);

Iterable<ObjectNode> getAllListLead(List<String> extractFields);

Expand All @@ -33,4 +34,6 @@ public interface MarketoService
Iterable<ObjectNode> getCustomObject(String customObjectAPIName, String customObjectFilterType, String customObjectFields, Integer fromValue, Integer toValue);

List<MarketoField> describeCustomObject(String customObjectAPIName);

Iterable<ObjectNode> getActivityTypes();
}
10 changes: 8 additions & 2 deletions src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ private long saveExtractedFile(InputStream extractResult, File tempFile) throws
}

@Override
public File extractAllActivity(Date startTime, Date endTime, int pollingTimeIntervalSecond, int bulkJobTimeoutSecond)
public File extractAllActivity(List<Integer> activityTypeIds, Date startTime, Date endTime, int pollingTimeIntervalSecond, int bulkJobTimeoutSecond)
{
final String exportID = marketoRestClient.createActivityExtract(startTime, endTime);
final String exportID = marketoRestClient.createActivityExtract(activityTypeIds, startTime, endTime);
marketoRestClient.startActitvityBulkExtract(exportID);
try {
marketoRestClient.waitActitvityExportJobComplete(exportID, pollingTimeIntervalSecond, bulkJobTimeoutSecond);
Expand Down Expand Up @@ -236,4 +236,10 @@ public Iterable<ObjectNode> getCustomObject(String customObjectAPIName, String c
{
return marketoRestClient.getCustomObject(customObjectAPIName, customObjectFilterType, customObjectFields, fromValue, toValue);
}

@Override
public Iterable<ObjectNode> getActivityTypes()
{
return marketoRestClient.getActivityTypes();
}
}
Original file line number Diff line number Diff line change
@@ -1,45 +1,151 @@
package org.embulk.input.marketo.delegate;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import org.apache.commons.lang3.StringUtils;
import org.embulk.base.restclient.ServiceResponseMapper;
import org.embulk.base.restclient.jackson.JacksonServiceResponseMapper;
import org.embulk.base.restclient.record.ValueLocator;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigException;
import org.embulk.input.marketo.MarketoService;
import org.embulk.input.marketo.MarketoServiceImpl;
import org.embulk.input.marketo.MarketoUtils;
import org.embulk.spi.DataException;
import org.embulk.spi.Exec;
import org.embulk.input.marketo.rest.MarketoRestClient;
import org.embulk.spi.type.Types;
import org.joda.time.DateTime;
import org.slf4j.Logger;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
* Created by tai.khuu on 9/18/17.
*/
public class ActivityBulkExtractInputPlugin extends MarketoBaseBulkExtractInputPlugin<ActivityBulkExtractInputPlugin.PluginTask>
{
private static final Logger LOGGER = Exec.getLogger(ActivityBulkExtractInputPlugin.class);
public static final String INCREMENTAL_COLUMN = "activityDate";
public static final String UID_COLUMN = "marketoGUID";

public interface PluginTask extends MarketoBaseBulkExtractInputPlugin.PluginTask {}
public interface PluginTask extends MarketoBaseBulkExtractInputPlugin.PluginTask
{
@Config("activity_type_ids")
@ConfigDefault("[]")
List<String> getActivityTypeIds();

@ConfigDefault("[]")
List<Integer> getActTypeIds();

void setActTypeIds(List<Integer> activityIds);
}

@Override
public void validateInputTask(PluginTask task)
{
task.setIncrementalColumn(Optional.of(INCREMENTAL_COLUMN));
task.setUidColumn(Optional.of(UID_COLUMN));
if (!task.getActivityTypeIds().isEmpty()) {
List<Integer> activityIds = checkValidActivityTypeIds(task);

// check input with values from server
try (MarketoRestClient restClient = createMarketoRestClient(task)) {
MarketoService marketoService = new MarketoServiceImpl(restClient);
Iterable<ObjectNode> nodes = marketoService.getActivityTypes();
if (nodes != null) {
checkValidActivityTypeIds(nodes, activityIds);
}
// ignorable if unable to get activity type ids. If thing gone wrong, the bulk extract will throw errors
}

task.setActTypeIds(activityIds);
}
else {
task.setActTypeIds(new ArrayList<Integer>());
}
super.validateInputTask(task);
}

/**
* Check if user input activity_type_ids valid
* @param task
* @return values transformed to array of Integer
*/
private List<Integer> checkValidActivityTypeIds(PluginTask task)
{
List<String> invalidIds = new ArrayList<>();
for (String id : task.getActivityTypeIds()) {
if (StringUtils.isBlank(id) || !StringUtils.isNumeric(StringUtils.trimToEmpty(id))) {
invalidIds.add(id);
}
}

if (!invalidIds.isEmpty()) {
throw new ConfigException(MessageFormat.format("Invalid activity type id: [{0}]", StringUtils.join(invalidIds, ", ")));
}

// transform and set
List<Integer> activityIds = new ArrayList<>();
for (String id : task.getActivityTypeIds()) {
activityIds.add(Integer.valueOf(StringUtils.trimToEmpty(id)));
}

return activityIds;
}

@VisibleForTesting
protected void checkValidActivityTypeIds(Iterable<ObjectNode> nodes, List<Integer> activityIds)
{
Iterator<ObjectNode> it = nodes.iterator();

List<Integer> inputIds = new ArrayList<>(activityIds);

while (it.hasNext()) {
ObjectNode node = it.next();
int id = node.get("id").asInt(0);
if (id > 0) {
inputIds.remove(Integer.valueOf(id));
}
}

if (!inputIds.isEmpty()) {
throw new ConfigException(MessageFormat.format("Invalid activity type ids: [{0}], Available activity types: \n{1}",
StringUtils.join(inputIds, ", "),
buildActivityIdNameInfo(nodes)));
}

}

private String buildActivityIdNameInfo(Iterable<ObjectNode> nodes)
{
Iterator<ObjectNode> it = nodes.iterator();
StringBuilder messageBuilder = new StringBuilder();
while (it.hasNext()) {
ObjectNode node = it.next();
int id = node.get("id").asInt(0);
String name = node.get("name").asText("");
if (id > 0) {
messageBuilder.append("- activity id: ");
messageBuilder.append(String.valueOf(id));
messageBuilder.append(", name: ");
messageBuilder.append(name);
messageBuilder.append("\n");
}
}

return messageBuilder.toString();
}

@Override
protected InputStream getExtractedStream(MarketoService service, PluginTask task, DateTime fromDate, DateTime toDate)
{
try {
return new FileInputStream(service.extractAllActivity(fromDate.toDate(), toDate.toDate(), task.getPollingIntervalSecond(), task.getBulkJobTimeoutSecond()));
return new FileInputStream(service.extractAllActivity(task.getActTypeIds(), fromDate.toDate(), toDate.toDate(), task.getPollingIntervalSecond(), task.getBulkJobTimeoutSecond()));
}
catch (FileNotFoundException e) {
throw new RuntimeException("Exception when trying to extract activity", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package org.embulk.input.marketo.model;

import org.embulk.input.marketo.model.filter.MarketoFilter;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -16,7 +14,7 @@ public class MarketoBulkExtractRequest

private Map<String, String> columnHeaderNames;

private Map<String, MarketoFilter> filter = new HashMap<>();
private Map<String, Object> filter = new HashMap<>();

public List<String> getFields()
{
Expand Down Expand Up @@ -48,12 +46,12 @@ public void setColumnHeaderNames(Map<String, String> columnHeaderNames)
this.columnHeaderNames = columnHeaderNames;
}

public Map<String, MarketoFilter> getFilter()
public Map<String, Object> getFilter()
{
return filter;
}

public void setFilter(Map<String, MarketoFilter> filter)
public void setFilter(Map<String, Object> filter)
{
this.filter = filter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/**
* Created by tai.khuu on 8/27/17.
*/
public class DateRangeFilter implements MarketoFilter
public class DateRangeFilter
{
private String startAt;

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ public enum MarketoRESTEndpoint
GET_CAMPAIGN("/rest/v1/campaigns.json"),
GET_PROGRAMS_BY_TAG("/rest/asset/v1/program/byTag.json"),
GET_CUSTOM_OBJECT("/rest/v1/customobjects/${api_name}.json"),
GET_CUSTOM_OBJECT_DESCRIBE("/rest/v1/customobjects/${api_name}/describe.json");

GET_CUSTOM_OBJECT_DESCRIBE("/rest/v1/customobjects/${api_name}/describe.json"),
GET_ACTIVITY_TYPES("/rest/v1/activities/types.json");
private String endpoint;

MarketoRESTEndpoint(String endpoint)
Expand Down
16 changes: 12 additions & 4 deletions src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.embulk.input.marketo.model.MarketoField;
import org.embulk.input.marketo.model.MarketoResponse;
import org.embulk.input.marketo.model.filter.DateRangeFilter;
import org.embulk.input.marketo.model.filter.MarketoFilter;
import org.embulk.spi.DataException;
import org.embulk.spi.Exec;
import org.embulk.spi.type.Type;
Expand All @@ -38,6 +37,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Created by tai.khuu on 8/22/17.
Expand Down Expand Up @@ -187,7 +187,7 @@ private MarketoBulkExtractRequest getMarketoBulkExtractRequest(Date startTime, D
marketoBulkExtractRequest.setFields(extractFields);
}
marketoBulkExtractRequest.setFormat("CSV");
Map<String, MarketoFilter> filterMap = new HashMap<>();
Map<String, Object> filterMap = new HashMap<>();
DateRangeFilter dateRangeFilter = new DateRangeFilter();
dateRangeFilter.setStartAt(timeFormat.format(startTime));
dateRangeFilter.setEndAt(timeFormat.format(endTime));
Expand All @@ -196,9 +196,12 @@ private MarketoBulkExtractRequest getMarketoBulkExtractRequest(Date startTime, D
return marketoBulkExtractRequest;
}

public String createActivityExtract(Date startTime, Date endTime)
public String createActivityExtract(List<Integer> activityTypeIds, Date startTime, Date endTime)
{
MarketoBulkExtractRequest marketoBulkExtractRequest = getMarketoBulkExtractRequest(startTime, endTime, null, "createdAt");
if (activityTypeIds != null && !activityTypeIds.isEmpty()) {
marketoBulkExtractRequest.getFilter().put("activityTypeIds", activityTypeIds);
}
return sendCreateBulkExtractRequest(marketoBulkExtractRequest, MarketoRESTEndpoint.CREATE_ACTIVITY_EXTRACT);
}

Expand Down Expand Up @@ -442,7 +445,7 @@ public Iterable<ObjectNode> getProgramsByDateRange(Date earliestUpdatedAt, Date
// put filter params if exist.
if (filterType != null) {
multimap.put("filterType", filterType);
multimap.put("filterValues", String.join(",", filterValues));
multimap.put("filterValues", StringUtils.join(filterValues, ","));
}
return getRecordWithOffsetPagination(endPoint + MarketoRESTEndpoint.GET_PROGRAMS.getEndpoint(), multimap, ObjectNode.class);
}
Expand Down Expand Up @@ -534,4 +537,9 @@ public Iterable<ObjectNode> getCustomObject(String customObjectAPIName, String c
{
return getCustomObjectRecordWithPagination(endPoint + MarketoRESTEndpoint.GET_CUSTOM_OBJECT.getEndpoint(new ImmutableMap.Builder().put("api_name", customObjectAPIName).build()), customObjectFilterType, customObjectFields, fromValue, toValue, ObjectNode.class);
}

public Iterable<ObjectNode> getActivityTypes()
{
return getRecordWithOffsetPagination(endPoint + MarketoRESTEndpoint.GET_ACTIVITY_TYPES.getEndpoint(), new ImmutableListMultimap.Builder<String, String>().put(MAX_RETURN, DEFAULT_MAX_RETURN).build(), ObjectNode.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.Iterator;
import java.util.List;

import static org.mockito.ArgumentMatchers.any;

/**
* Created by tai.khuu on 10/9/17.
*/
Expand All @@ -45,12 +47,12 @@ public void extractLead() throws Exception
{
Date startDate = new Date(1507223374000L);
Date endDate = new Date(1507655374000L);
List<String> extractedFields = Arrays.asList("field1", "field2");
List<String> extractedFields = Arrays.asList("field1", "fActivityBulkExtractInputPluginTest.java:78ield2");
String filerField = "field1";
String exportId = "exportId";
Mockito.when(mockMarketoRestClient.createLeadBulkExtract(Mockito.eq(startDate), Mockito.eq(endDate), Mockito.eq(extractedFields), Mockito.eq(filerField))).thenReturn(exportId);
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream("Test File Content".getBytes());
Mockito.when(mockMarketoRestClient.getLeadBulkExtractResult(Mockito.eq(exportId), Mockito.any(BulkExtractRangeHeader.class))).thenReturn(byteArrayInputStream);
Mockito.when(mockMarketoRestClient.getLeadBulkExtractResult(Mockito.eq(exportId), any(BulkExtractRangeHeader.class))).thenReturn(byteArrayInputStream);
File file = marketoService.extractLead(startDate, endDate, extractedFields, filerField, 1, 3);
Assert.assertEquals("Test File Content", new String(ByteStreams.toByteArray(new FileInputStream(file))));
Mockito.verify(mockMarketoRestClient, Mockito.times(1)).startLeadBulkExtract(Mockito.eq(exportId));
Expand All @@ -62,11 +64,12 @@ public void extractAllActivity() throws Exception
{
Date startDate = new Date(1507223374000L);
Date endDate = new Date(1507655374000L);
List<Integer> activityTypeIds = new ArrayList<>();
String exportId = "exportId";
Mockito.when(mockMarketoRestClient.createActivityExtract(Mockito.eq(startDate), Mockito.eq(endDate))).thenReturn(exportId);
Mockito.when(mockMarketoRestClient.createActivityExtract(any(List.class), Mockito.eq(startDate), Mockito.eq(endDate))).thenReturn(exportId);
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream("Test File Content".getBytes());
Mockito.when(mockMarketoRestClient.getActivitiesBulkExtractResult(Mockito.eq(exportId), Mockito.any(BulkExtractRangeHeader.class))).thenReturn(byteArrayInputStream);
File file = marketoService.extractAllActivity(startDate, endDate, 1, 3);
Mockito.when(mockMarketoRestClient.getActivitiesBulkExtractResult(Mockito.eq(exportId), any(BulkExtractRangeHeader.class))).thenReturn(byteArrayInputStream);
File file = marketoService.extractAllActivity(activityTypeIds, startDate, endDate, 1, 3);
Assert.assertEquals("Test File Content", new String(ByteStreams.toByteArray(new FileInputStream(file))));
Mockito.verify(mockMarketoRestClient, Mockito.times(1)).startActitvityBulkExtract(Mockito.eq(exportId));
Mockito.verify(mockMarketoRestClient, Mockito.times(1)).waitActitvityExportJobComplete(Mockito.eq(exportId), Mockito.eq(1), Mockito.eq(3));
Expand Down
Loading

0 comments on commit 326d349

Please sign in to comment.