diff --git a/.github/workflows/gem-push.yml b/.github/workflows/gem-push.yml new file mode 100644 index 0000000..f7bb584 --- /dev/null +++ b/.github/workflows/gem-push.yml @@ -0,0 +1,27 @@ +name: Ruby Gem + +on: + workflow_dispatch: + push: + tags: + - '*' + +jobs: + build: + name: Build + Publish + runs-on: ubuntu-latest + permissions: + packages: write + contents: read + steps: + - uses: actions/checkout@v2 + - name: Set up Ruby 2.7 + uses: ruby/setup-ruby@v1 + with: + ruby-version: 2.7 + - name: push gem + uses: trocco-io/push-gem-to-gpr-action@v1 + with: + language: java + gem-path: "./build/gems/*.gem" + github-token: "${{ secrets.GITHUB_TOKEN }}" diff --git a/.gitignore b/.gitignore index 96bb2a0..1c3a4c4 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,5 @@ build/ /.metadata/ .classpath .project - +bin/ +example.yml diff --git a/README.md b/README.md index fabd51f..6af5110 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,9 @@ embulk-input-marketo is the gem preparing Embulk input plugins for [Marketo](htt - Campaign(campaign) - Assets Programs (program) - Program Members (program_members) +- List (list) +- Activity Type (activity_type) +- Assets Folders (folder) This plugin uses Marketo REST API. @@ -199,6 +202,50 @@ Get Members by Program Ids or All Program. |---------------------|----------|---------------|-----------------------------------------------------------------------------------------------------------------------| | **program_ids** | false | null | Import Members by specified Program_ID (comma-separated). If not specified will import all Members by all Program IDs | +### List + +List extract all list data from Marketo + +`target: list` + +Schema type: Static schema + +Incremental support: no + +Range ingestion: no + +### Activity Type + +Activity Type extract all activity type data from Marketo + +`target: activity_type` + +Schema type: Static schema + +Incremental support: no + +Range ingestion: no + +### Assets folders + +Get child folders from within a specified root folder or all folders if no root folder is specified. + +`target: folder` + +Configuration: + +| name | required | default value | description | +|---------------------|----------|---------------|-----------------------------------------------------------------------------------------------------------------------| +| **root_id** | false | null | Parent folder id | +| **root_type** | false | folder | Parent folder type, supported values `folder`, `program` | +| **max_depth** | false | 2 | Maximum folder depth to traverse | +| **workspace** | false | null | Name of the workspace | + +Schema type: Static schema + +Incremental support: no + +Range ingestion: no ## Example diff --git a/build.gradle b/build.gradle index 85930f6..ea6ee48 100644 --- a/build.gradle +++ b/build.gradle @@ -5,6 +5,7 @@ plugins { id "jacoco" id "signing" id "org.embulk.embulk-plugins" version "0.5.5" + id "com.palantir.git-version" version "0.12.3" } repositories { @@ -13,7 +14,20 @@ repositories { group = "com.treasuredata.embulk.plugins" description = "Loads records from Marketo." -version = "0.6.26" +version = { + def baseVersion = "0.6.26" + def troccoVersion = "0.1.1" + def tag = "${baseVersion}-trocco-${troccoVersion}" + def vd = versionDetails() + if (vd.lastTag != "${tag}") { + logger.warn "lastTag '${vd.lastTag}' is not '${tag}'" + } + if (vd.commitDistance == 0 && vd.lastTag ==~ /^[0-9]+\.[0-9]+\.[0-9]+([.-][.a-zA-Z0-9-]+)?/) { + vd.lastTag + } else { + "0.0.0.${vd.gitHash}" + } +}() sourceCompatibility = 1.8 targetCompatibility = 1.8 @@ -60,6 +74,11 @@ dependencies { implementation "com.sun.xml.bind:jaxb-impl:2.2.11" implementation "javax.activation:activation:1.1.1" + compile 'com.google.guava:guava:18.0' + compile "com.google.code.findbugs:annotations:3.0.1" + compile 'org.apache.commons:commons-lang3:3.4' + compile 'org.apache.commons:commons-csv:1.8' + testImplementation "junit:junit:4.+" testImplementation "org.embulk:embulk-core:$embulkVersion:tests" testImplementation "org.embulk:embulk-junit4:$embulkVersion" diff --git a/gradle/dependency-locks/compileClasspath.lockfile b/gradle/dependency-locks/compileClasspath.lockfile index faf071a..21d3a22 100644 --- a/gradle/dependency-locks/compileClasspath.lockfile +++ b/gradle/dependency-locks/compileClasspath.lockfile @@ -17,6 +17,7 @@ javax.xml.bind:jaxb-api:2.2.11 net.jcip:jcip-annotations:1.0 org.apache.bval:bval-core:0.5 org.apache.bval:bval-jsr303:0.5 +org.apache.commons:commons-csv:1.8 org.apache.commons:commons-lang3:3.12.0 org.eclipse.jetty:jetty-client:9.4.51.v20230217 org.eclipse.jetty:jetty-http:9.4.51.v20230217 diff --git a/gradle/dependency-locks/runtimeClasspath.lockfile b/gradle/dependency-locks/runtimeClasspath.lockfile index 56198e7..09a9595 100644 --- a/gradle/dependency-locks/runtimeClasspath.lockfile +++ b/gradle/dependency-locks/runtimeClasspath.lockfile @@ -17,6 +17,7 @@ javax.xml.bind:jaxb-api:2.2.11 net.jcip:jcip-annotations:1.0 org.apache.bval:bval-core:0.5 org.apache.bval:bval-jsr303:0.5 +org.apache.commons:commons-csv:1.8 org.apache.commons:commons-lang3:3.12.0 org.eclipse.jetty:jetty-client:9.4.51.v20230217 org.eclipse.jetty:jetty-http:9.4.51.v20230217 diff --git a/src/main/java/org/embulk/input/marketo/CsvTokenizer.java b/src/main/java/org/embulk/input/marketo/CsvTokenizer.java index fac5a20..72d8bd4 100644 --- a/src/main/java/org/embulk/input/marketo/CsvTokenizer.java +++ b/src/main/java/org/embulk/input/marketo/CsvTokenizer.java @@ -3,6 +3,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.base.Preconditions; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; import org.embulk.config.ConfigException; import org.embulk.spi.DataException; import org.embulk.util.config.Config; @@ -10,11 +12,19 @@ import org.embulk.util.config.Task; import org.embulk.util.text.LineDecoder; import org.embulk.util.text.Newline; +import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Reader; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Calendar; import java.util.Deque; import java.util.List; import java.util.Optional; @@ -24,6 +34,8 @@ */ public class CsvTokenizer { + private static final Logger LOGGER = LoggerFactory.getLogger(CsvTokenizer.class); + enum RecordState { NOT_END, END, @@ -130,6 +142,58 @@ public CsvTokenizer(String delimiter, char quote, char escape, String newline, b this.nullStringOrNull = nullStringOrNull; } + private Reader inputStream; + + public CsvTokenizer(Reader inputStream) + { + this.inputStream = inputStream; + this.delimiterChar = 0; // Unused + this.delimiterFollowingString = null; // Unused + this.quote = 0; // Unused + this.escape = 0; // Unused + this.newline = null; // Unused; + this.trimIfNotQuoted = false; // Unused + this.maxQuotedSizeLimit = 0; // Unused + this.commentLineMarker = null; // Unused + this.input = null; // Unused + this.nullStringOrNull = null; // Unused + } + + public CSVParser csvParse() + { + try { + String path = String.format("tmp_%d.csv", Calendar.getInstance().getTimeInMillis()); + File file = new File(path); + FileWriter filewriter = new FileWriter(file); + + LOGGER.info("create tmp file: " + path); + + BufferedReader b = new BufferedReader(inputStream); + String line = b.readLine(); + int count = 0; + while (true) { + filewriter.write(line); + line = b.readLine(); + if (line == null) { + break; + } + filewriter.write("\r\n"); + count += 1; + if (count % 10000 == 0) { + LOGGER.info("import record count: " + count); + } + } + filewriter.close(); + inputStream.close(); + + CSVParser csvParser = CSVParser.parse(file, StandardCharsets.UTF_8, CSVFormat.DEFAULT.withFirstRecordAsHeader()); + return csvParser; + } + catch (IOException e) { + throw new InvalidValueException(e.getMessage()); + } + } + public long getCurrentLineNumber() { return lineNumber; @@ -434,7 +498,7 @@ else if (isDelimiterFollowingFrom(linePos)) { else if (isSpace(c)) { // column has trailing spaces and quoted. TODO should this be rejected? } else { - throw new InvalidValueException(String.format("Unexpected extra character '%c' after a value quoted by '%c'", c, quote)); + columnState = ColumnState.QUOTED_VALUE; } break; diff --git a/src/main/java/org/embulk/input/marketo/MarketoInputPluginDelegate.java b/src/main/java/org/embulk/input/marketo/MarketoInputPluginDelegate.java index d967e93..ae4c8cf 100644 --- a/src/main/java/org/embulk/input/marketo/MarketoInputPluginDelegate.java +++ b/src/main/java/org/embulk/input/marketo/MarketoInputPluginDelegate.java @@ -7,11 +7,14 @@ import org.embulk.base.restclient.RestClientInputPluginDelegate; import org.embulk.config.ConfigException; import org.embulk.input.marketo.delegate.ActivityBulkExtractInputPlugin; +import org.embulk.input.marketo.delegate.ActivityTypeInputPlugin; import org.embulk.input.marketo.delegate.CampaignInputPlugin; import org.embulk.input.marketo.delegate.CustomObjectInputPlugin; +import org.embulk.input.marketo.delegate.FolderInputPlugin; import org.embulk.input.marketo.delegate.LeadBulkExtractInputPlugin; import org.embulk.input.marketo.delegate.LeadWithListInputPlugin; import org.embulk.input.marketo.delegate.LeadWithProgramInputPlugin; +import org.embulk.input.marketo.delegate.ListInputPlugin; import org.embulk.input.marketo.delegate.ProgramInputPlugin; import org.embulk.input.marketo.delegate.ProgramMembersBulkExtractInputPlugin; import org.embulk.input.marketo.rest.MarketoRestClient; @@ -33,7 +36,10 @@ public interface PluginTask ProgramInputPlugin.PluginTask, MarketoRestClient.PluginTask, CustomObjectInputPlugin.PluginTask, - ProgramMembersBulkExtractInputPlugin.PluginTask + ProgramMembersBulkExtractInputPlugin.PluginTask, + ListInputPlugin.PluginTask, + ActivityTypeInputPlugin.PluginTask, + FolderInputPlugin.PluginTask { @Config("target") Target getTarget(); @@ -79,7 +85,10 @@ public enum Target ALL_LEAD_WITH_PROGRAM_ID(new LeadWithProgramInputPlugin()), PROGRAM(new ProgramInputPlugin()), CUSTOM_OBJECT(new CustomObjectInputPlugin()), - PROGRAM_MEMBERS(new ProgramMembersBulkExtractInputPlugin()); + PROGRAM_MEMBERS(new ProgramMembersBulkExtractInputPlugin()), + LIST(new ListInputPlugin()), + ACTIVITY_TYPE(new ActivityTypeInputPlugin()), + FOLDER(new FolderInputPlugin()); private final RestClientInputPluginDelegate restClientInputPluginDelegate; diff --git a/src/main/java/org/embulk/input/marketo/MarketoService.java b/src/main/java/org/embulk/input/marketo/MarketoService.java index 1164f9f..5e016d0 100644 --- a/src/main/java/org/embulk/input/marketo/MarketoService.java +++ b/src/main/java/org/embulk/input/marketo/MarketoService.java @@ -1,11 +1,13 @@ package org.embulk.input.marketo; import com.fasterxml.jackson.databind.node.ObjectNode; +import org.embulk.input.marketo.delegate.FolderInputPlugin.RootType; import org.embulk.input.marketo.model.MarketoField; import java.io.File; import java.util.Date; import java.util.List; +import java.util.Optional; import java.util.Set; /** @@ -48,4 +50,6 @@ public interface MarketoService ObjectNode describeProgramMembers(); File extractProgramMembers(String exportID); + + Iterable getFolders(Optional rootId, RootType rootType, Integer maxDepth, Optional workspace); } diff --git a/src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java b/src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java index 0638d25..1141b57 100644 --- a/src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java +++ b/src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java @@ -1,10 +1,12 @@ package org.embulk.input.marketo; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.CaseFormat; import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; +import org.embulk.input.marketo.delegate.FolderInputPlugin.RootType; import org.embulk.input.marketo.model.BulkExtractRangeHeader; import org.embulk.input.marketo.model.MarketoField; import org.embulk.input.marketo.rest.MarketoRestClient; @@ -21,6 +23,7 @@ import java.io.OutputStream; import java.util.Date; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -275,4 +278,12 @@ public InputStream apply(BulkExtractRangeHeader bulkExtractRangeHeader) } }); } + + @Override + public Iterable getFolders(Optional rootId, RootType rootType, Integer maxDepth, Optional workspace) + { + String type = CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, rootType.name()); + Optional root = rootId.isPresent() ? Optional.of(String.format("{\"id\": %d, \"type\": \"%s\"}", rootId.get(), type)) : Optional.empty(); + return marketoRestClient.getFolders(root, maxDepth, workspace); + } } diff --git a/src/main/java/org/embulk/input/marketo/MarketoUtils.java b/src/main/java/org/embulk/input/marketo/MarketoUtils.java index bd5169d..1cb04a7 100644 --- a/src/main/java/org/embulk/input/marketo/MarketoUtils.java +++ b/src/main/java/org/embulk/input/marketo/MarketoUtils.java @@ -1,5 +1,7 @@ package org.embulk.input.marketo; +import com.fasterxml.jackson.core.SerializableString; +import com.fasterxml.jackson.core.io.CharacterEscapes; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Function; @@ -16,6 +18,7 @@ import org.embulk.spi.ColumnVisitor; import org.embulk.spi.PageBuilder; import org.embulk.spi.Schema; +import org.embulk.spi.time.Timestamp; import org.embulk.util.json.JsonParser; import org.embulk.util.retryhelper.RetryExecutor; import org.embulk.util.retryhelper.RetryGiveupException; @@ -31,6 +34,7 @@ import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.Optional; import java.util.Set; import static org.embulk.input.marketo.MarketoInputPlugin.CONFIG_MAPPER_FACTORY; @@ -42,7 +46,7 @@ public class MarketoUtils { public static final String MARKETO_DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S%z"; public static final String MARKETO_DATE_FORMAT = "%Y-%m-%d"; - public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static final ObjectMapper OBJECT_MAPPER = getObjectMapper(); public static final Function TRANSFORM_OBJECT_TO_JACKSON_SERVICE_RECORD_FUNCTION = new Function() { @Nullable @@ -63,6 +67,35 @@ private MarketoUtils() { } + private static ObjectMapper getObjectMapper() + { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.getFactory().setCharacterEscapes(new CharacterEscapes() { + + private final int[] escapeCodes; + + { + escapeCodes = standardAsciiEscapesForJSON(); + escapeCodes['\''] = CharacterEscapes.ESCAPE_STANDARD; + escapeCodes['/'] = CharacterEscapes.ESCAPE_STANDARD; + escapeCodes['\n'] = CharacterEscapes.ESCAPE_STANDARD; + } + + @Override + public int[] getEscapeCodesForAscii() + { + return escapeCodes; + } + + @Override + public SerializableString getEscapeSequence(int ch) + { + return null; + } + }); + return new ObjectMapper(); + } + public static ServiceResponseMapper buildDynamicResponseMapper(String prefix, List columns) { JacksonServiceResponseMapper.Builder builder = JacksonServiceResponseMapper.builder(); @@ -94,7 +127,7 @@ public static List getFieldNameFromMarketoFields(List colu public static String buildColumnName(String prefix, String columnName) { - return prefix + "_" + columnName; + return prefix.isEmpty() ? columnName : prefix + "_" + columnName; } public static List sliceRange(OffsetDateTime fromDate, OffsetDateTime toDate, int rangeSize) @@ -112,13 +145,19 @@ public static List sliceRange(OffsetDateTime fromDate, OffsetDateTime return ranges; } - public static String getIdentityEndPoint(String accountId) + public static String getIdentityEndPoint(String accountId, Optional endpoint) { + if (endpoint.isPresent()) { + return endpoint.get() + "/identity"; + } return "https://" + accountId.trim() + ".mktorest.com/identity"; } - public static String getEndPoint(String accountID) + public static String getEndPoint(String accountID, Optional endpoint) { + if (endpoint.isPresent()) { + return endpoint.get(); + } return "https://" + accountID.trim() + ".mktorest.com"; } @@ -250,13 +289,13 @@ public void doubleColumn(Column column) @Override public void stringColumn(Column column) { - pageBuilder.setString(column, column.getName() + "_" + rowNum); + pageBuilder.setString(column, column.getName().endsWith("Id") || column.getName().equals("id") ? Integer.toString(rowNum) : column.getName() + "_" + rowNum); } @Override public void timestampColumn(Column column) { - pageBuilder.setTimestamp(column, Instant.ofEpochMilli(System.currentTimeMillis())); + pageBuilder.setTimestamp(column, Timestamp.ofInstant(Instant.ofEpochMilli(System.currentTimeMillis()))); } @Override diff --git a/src/main/java/org/embulk/input/marketo/bulk_extract/AllStringJacksonServiceRecord.java b/src/main/java/org/embulk/input/marketo/bulk_extract/AllStringJacksonServiceRecord.java index 2f69e52..c6938ca 100644 --- a/src/main/java/org/embulk/input/marketo/bulk_extract/AllStringJacksonServiceRecord.java +++ b/src/main/java/org/embulk/input/marketo/bulk_extract/AllStringJacksonServiceRecord.java @@ -7,11 +7,15 @@ import org.embulk.util.json.JsonParser; import org.embulk.util.timestamp.TimestampFormatter; import org.msgpack.value.Value; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Instant; public class AllStringJacksonServiceRecord extends JacksonServiceRecord { + private static final Logger LOGGER = LoggerFactory.getLogger(AllStringJacksonServiceRecord.class); + public AllStringJacksonServiceRecord(ObjectNode record) { super(record); @@ -50,19 +54,37 @@ public boolean booleanValue() @Override public double doubleValue() { - return Double.parseDouble(textValue); + try { + return Double.parseDouble(textValue); + } + catch (Exception e) { + LOGGER.info("skipped to parse Double: " + textValue); + return Double.NaN; + } } @Override public Value jsonValue(JsonParser jsonParser) { - return jsonParser.parse(textValue); + try { + return jsonParser.parse(textValue); + } + catch (Exception e) { + LOGGER.info("skipped to parse JSON: " + textValue); + return jsonParser.parse("{}"); + } } @Override public long longValue() { - return Long.parseLong(textValue); + try { + return Long.parseLong(textValue); + } + catch (Exception e) { + LOGGER.info("skipped to parse Long: " + textValue); + return Long.MIN_VALUE; + } } @Override @@ -74,7 +96,13 @@ public String stringValue() @Override public Instant timestampValue(TimestampFormatter timestampFormatter) { - return timestampFormatter.parse(textValue); + try { + return timestampFormatter.parse(textValue); + } + catch (Exception e) { + LOGGER.info("skipped to parse Timestamp: " + textValue); + return null; + } } } } diff --git a/src/main/java/org/embulk/input/marketo/delegate/ActivityTypeInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/ActivityTypeInputPlugin.java new file mode 100644 index 0000000..1208790 --- /dev/null +++ b/src/main/java/org/embulk/input/marketo/delegate/ActivityTypeInputPlugin.java @@ -0,0 +1,42 @@ +package org.embulk.input.marketo.delegate; + +import com.google.common.collect.FluentIterable; +import org.embulk.base.restclient.ServiceResponseMapper; +import org.embulk.base.restclient.jackson.JacksonServiceResponseMapper; +import org.embulk.base.restclient.record.ServiceRecord; +import org.embulk.base.restclient.record.ValueLocator; +import org.embulk.input.marketo.MarketoService; +import org.embulk.input.marketo.MarketoUtils; +import org.embulk.spi.type.Types; + +import java.util.Iterator; + +public class ActivityTypeInputPlugin extends MarketoBaseInputPluginDelegate +{ + public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask + { + } + + public ActivityTypeInputPlugin() + { + } + + @Override + protected Iterator getServiceRecords(MarketoService marketoService, PluginTask task) + { + return FluentIterable.from(marketoService.getActivityTypes()).transform(MarketoUtils.TRANSFORM_OBJECT_TO_JACKSON_SERVICE_RECORD_FUNCTION).iterator(); + } + + @Override + public ServiceResponseMapper buildServiceResponseMapper(PluginTask task) + { + JacksonServiceResponseMapper.Builder builder = JacksonServiceResponseMapper.builder(); + builder.add("id", Types.LONG) + .add("name", Types.STRING) + .add("description", Types.STRING) + .add("primaryAttribute", Types.JSON) + .add("attributes", Types.JSON) + .add("apiName", Types.STRING); + return builder.build(); + } +} diff --git a/src/main/java/org/embulk/input/marketo/delegate/FolderInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/FolderInputPlugin.java new file mode 100644 index 0000000..4c30c13 --- /dev/null +++ b/src/main/java/org/embulk/input/marketo/delegate/FolderInputPlugin.java @@ -0,0 +1,85 @@ +package org.embulk.input.marketo.delegate; + +import com.fasterxml.jackson.annotation.JsonCreator; +import org.embulk.base.restclient.ServiceResponseMapper; +import org.embulk.base.restclient.jackson.JacksonServiceResponseMapper; +import org.embulk.base.restclient.record.ServiceRecord; +import org.embulk.base.restclient.record.ValueLocator; +import org.embulk.input.marketo.MarketoService; +import org.embulk.input.marketo.MarketoUtils; +import org.embulk.spi.type.Types; +import org.embulk.util.config.Config; +import org.embulk.util.config.ConfigDefault; + +import java.util.Iterator; +import java.util.Optional; +import java.util.stream.StreamSupport; + +public class FolderInputPlugin extends MarketoBaseInputPluginDelegate +{ + public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask + { + @Config("root_id") + @ConfigDefault("null") + Optional getRootId(); + + @Config("root_type") + @ConfigDefault("\"folder\"") + RootType getRootType(); + + @Config("max_depth") + @ConfigDefault("2") + Integer getMaxDepth(); + + @Config("workspace") + @ConfigDefault("null") + Optional getWorkspace(); + } + + public FolderInputPlugin() + { + } + + @Override + protected Iterator getServiceRecords(MarketoService marketoService, PluginTask task) + { + return StreamSupport.stream(marketoService.getFolders( + task.getRootId(), + task.getRootType(), + task.getMaxDepth(), + task.getWorkspace() + ).spliterator(), false).map(MarketoUtils.TRANSFORM_OBJECT_TO_JACKSON_SERVICE_RECORD_FUNCTION::apply).iterator(); + } + + @Override + public ServiceResponseMapper buildServiceResponseMapper(PluginTask task) + { + JacksonServiceResponseMapper.Builder builder = JacksonServiceResponseMapper.builder(); + builder.add("id", Types.LONG) + .add("name", Types.STRING) + .add("description", Types.STRING) + .add("createdAt", Types.TIMESTAMP, MarketoUtils.MARKETO_DATE_TIME_FORMAT) + .add("updatedAt", Types.TIMESTAMP, MarketoUtils.MARKETO_DATE_TIME_FORMAT) + .add("url", Types.STRING) + .add("folderId", Types.JSON) + .add("folderType", Types.STRING) + .add("parent", Types.JSON) + .add("path", Types.STRING) + .add("isArchive", Types.BOOLEAN) + .add("isSystem", Types.BOOLEAN) + .add("accessZoneId", Types.LONG) + .add("workspace", Types.STRING); + return builder.build(); + } + + public enum RootType { + FOLDER, + PROGRAM; + + @JsonCreator + public static RootType of(String value) + { + return RootType.valueOf(value.toUpperCase()); + } + } +} diff --git a/src/main/java/org/embulk/input/marketo/delegate/ListInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/ListInputPlugin.java new file mode 100644 index 0000000..176a0ac --- /dev/null +++ b/src/main/java/org/embulk/input/marketo/delegate/ListInputPlugin.java @@ -0,0 +1,43 @@ +package org.embulk.input.marketo.delegate; + +import com.google.common.collect.FluentIterable; +import org.embulk.base.restclient.ServiceResponseMapper; +import org.embulk.base.restclient.jackson.JacksonServiceResponseMapper; +import org.embulk.base.restclient.record.ServiceRecord; +import org.embulk.base.restclient.record.ValueLocator; +import org.embulk.input.marketo.MarketoService; +import org.embulk.input.marketo.MarketoUtils; +import org.embulk.spi.type.Types; + +import java.util.Iterator; + +public class ListInputPlugin extends MarketoBaseInputPluginDelegate +{ + public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask + { + } + + public ListInputPlugin() + { + } + + @Override + protected Iterator getServiceRecords(MarketoService marketoService, PluginTask task) + { + return FluentIterable.from(marketoService.getLists()).transform(MarketoUtils.TRANSFORM_OBJECT_TO_JACKSON_SERVICE_RECORD_FUNCTION).iterator(); + } + + @Override + public ServiceResponseMapper buildServiceResponseMapper(PluginTask task) + { + JacksonServiceResponseMapper.Builder builder = JacksonServiceResponseMapper.builder(); + builder.add("id", Types.LONG) + .add("name", Types.STRING) + .add("description", Types.STRING) + .add("programName", Types.STRING) + .add("workspaceName", Types.STRING) + .add("createdAt", Types.TIMESTAMP, MarketoUtils.MARKETO_DATE_TIME_FORMAT) + .add("updatedAt", Types.TIMESTAMP, MarketoUtils.MARKETO_DATE_TIME_FORMAT); + return builder.build(); + } +} diff --git a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java index 889a370..8a514b1 100644 --- a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java +++ b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java @@ -1,8 +1,9 @@ package org.embulk.input.marketo.delegate; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Function; -import com.google.common.collect.Iterators; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.embulk.base.restclient.record.RecordImporter; import org.embulk.base.restclient.record.ServiceRecord; import org.embulk.config.ConfigDiff; @@ -13,18 +14,25 @@ import org.embulk.input.marketo.MarketoServiceImpl; import org.embulk.input.marketo.MarketoUtils; import org.embulk.input.marketo.bulk_extract.AllStringJacksonServiceRecord; -import org.embulk.input.marketo.bulk_extract.CsvRecordIterator; import org.embulk.input.marketo.rest.MarketoRestClient; import org.embulk.spi.Exec; import org.embulk.spi.PageBuilder; import org.embulk.spi.Schema; import org.embulk.util.config.Config; import org.embulk.util.config.ConfigDefault; +import org.embulk.util.file.FileInputInputStream; import org.embulk.util.file.InputStreamFileInput; import org.embulk.util.text.LineDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.IOException; import java.io.InputStream; -import java.nio.charset.StandardCharsets; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CodingErrorAction; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.time.OffsetDateTime; @@ -33,7 +41,6 @@ import java.util.Date; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; @@ -46,6 +53,8 @@ public abstract class MarketoBaseBulkExtractInputPlugin> csvRecords = Iterators.concat(Iterators.transform(decoderIterator, - (Function>>) input -> new CsvRecordIterator(input, task))); - //Keep the preview code here when we can enable real preview - if (Exec.isPreview()) { - csvRecords = Iterators.limit(csvRecords, PREVIEW_RECORD_LIMIT); - } int imported = 0; - while (csvRecords.hasNext()) { - Map csvRecord = csvRecords.next(); - ObjectNode objectNode = MarketoUtils.OBJECT_MAPPER.valueToTree(csvRecord); - recordImporter.importRecord(new AllStringJacksonServiceRecord(objectNode), pageBuilder); + while (decoderIterator.hasNext()) { + try { + CSVParser csvParser = getCsvParser(decoderIterator); + System.gc(); + while (csvParser.iterator().hasNext()) { + CSVRecord csvRecord = csvParser.iterator().next(); + ObjectNode objectNode = MarketoUtils.OBJECT_MAPPER.valueToTree(csvRecord.toMap()); + recordImporter.importRecord(new AllStringJacksonServiceRecord(objectNode), pageBuilder); + if (csvParser.getRecordNumber() % 10000 == 0) { + LOGGER.info("transfer record count: " + csvParser.getRecordNumber()); + } + } + LOGGER.info("transfer record count: " + csvParser.getRecordNumber()); + csvParser.close(); + } + catch (CsvTokenizer.InvalidValueException | IllegalArgumentException | IOException ex) { + LOGGER.warn("skipped csv line: " + ExceptionUtils.getStackTrace(ex)); + } imported = imported + 1; } return taskReport; @@ -163,6 +180,13 @@ public TaskReport ingestServiceData(final T task, RecordImporter recordImporter, } } + private CSVParser getCsvParser(LineDecoderIterator decoderIterator) throws IOException + { + Reader inputStream = decoderIterator.next(); + CsvTokenizer csvTokenizer = new CsvTokenizer(inputStream); + return csvTokenizer.csvParse(); + } + private LineDecoderIterator getLineDecoderIterator(T task) { final OffsetDateTime fromDate = OffsetDateTime.ofInstant(task.getFromDate().toInstant(), ZoneOffset.UTC); @@ -182,7 +206,7 @@ protected final Iterator getServiceRecords(MarketoService marketo protected abstract InputStream getExtractedStream(MarketoService service, T task, OffsetDateTime fromDate, OffsetDateTime toDate); - private final class LineDecoderIterator implements Iterator, AutoCloseable + private final class LineDecoderIterator implements Iterator, AutoCloseable { private LineDecoder currentLineDecoder; @@ -218,13 +242,20 @@ public boolean hasNext() } @Override - public LineDecoder next() + public Reader next() { if (hasNext()) { MarketoUtils.DateRange next = dateRangeIterator.next(); InputStream extractedStream = getExtractedStream(marketoService, task, next.fromDate, next.toDate); - currentLineDecoder = LineDecoder.of(new InputStreamFileInput(Exec.getBufferAllocator(), extractedStream), StandardCharsets.UTF_8, null); - return currentLineDecoder; + InputStreamFileInput in = new InputStreamFileInput(Exec.getBufferAllocator(), extractedStream); + FileInputInputStream fileInputInputStream = new FileInputInputStream(in); + + CharsetDecoder decoder = task.getCharset().newDecoder().onMalformedInput(CodingErrorAction.REPLACE) + .onUnmappableCharacter(CodingErrorAction.REPLACE); + BufferedReader b = new BufferedReader(new InputStreamReader(fileInputInputStream, decoder)); + fileInputInputStream.nextFile(); + + return b; } throw new NoSuchElementException(); } diff --git a/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java index bdac277..cb6f98b 100644 --- a/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java +++ b/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java @@ -55,6 +55,7 @@ public class ProgramMembersBulkExtractInputPlugin extends MarketoBaseInputPluginDelegate { private final Logger logger = LoggerFactory.getLogger(getClass()); + private final Object pageBuilderLock = new Object(); public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask, CsvTokenizer.PluginTask { @@ -214,7 +215,10 @@ private Future createFutureTask(PluginTask task, RecordImporter recordImporte while (csvRecords.hasNext()) { Map csvRecord = csvRecords.next(); ObjectNode objectNode = MarketoUtils.OBJECT_MAPPER.valueToTree(csvRecord); - recordImporter.importRecord(new AllStringJacksonServiceRecord(objectNode), pageBuilder); + // MEMO: pageBuilderがスレッドアンセーフなために排他制御を利用する + synchronized (pageBuilderLock) { + recordImporter.importRecord(new AllStringJacksonServiceRecord(objectNode), pageBuilder); + } imported = imported + 1; } diff --git a/src/main/java/org/embulk/input/marketo/rest/MarketoBaseRestClient.java b/src/main/java/org/embulk/input/marketo/rest/MarketoBaseRestClient.java index 1932c37..3f966f3 100644 --- a/src/main/java/org/embulk/input/marketo/rest/MarketoBaseRestClient.java +++ b/src/main/java/org/embulk/input/marketo/rest/MarketoBaseRestClient.java @@ -55,6 +55,8 @@ public class MarketoBaseRestClient implements AutoCloseable private String clientSecret; + private String accountId; + private String accessToken; private int marketoLimitIntervalMillis; @@ -70,6 +72,7 @@ public class MarketoBaseRestClient implements AutoCloseable MarketoBaseRestClient(String identityEndPoint, String clientId, String clientSecret, + String accountId, Optional partnerApiKey, int marketoLimitIntervalMillis, long readTimeoutMillis, @@ -78,6 +81,7 @@ public class MarketoBaseRestClient implements AutoCloseable this.identityEndPoint = identityEndPoint; this.clientId = clientId; this.clientSecret = clientSecret; + this.accountId = accountId; this.readTimeoutMillis = readTimeoutMillis; this.retryHelper = retryHelper; this.marketoLimitIntervalMillis = marketoLimitIntervalMillis; @@ -237,7 +241,7 @@ public void requestOnce(HttpClient client, Response.Listener responseListener) } } } - LOGGER.info("CALLING {} -> {} - params: {}", method, target, params); + LOGGER.info("CALLING Account ID: {} {} -> {} - params: {}", accountId, method, target, params); if (contentProvider != null) { request.content(contentProvider); } diff --git a/src/main/java/org/embulk/input/marketo/rest/MarketoRESTEndpoint.java b/src/main/java/org/embulk/input/marketo/rest/MarketoRESTEndpoint.java index 02048e4..6cebe20 100644 --- a/src/main/java/org/embulk/input/marketo/rest/MarketoRESTEndpoint.java +++ b/src/main/java/org/embulk/input/marketo/rest/MarketoRESTEndpoint.java @@ -32,7 +32,8 @@ public enum MarketoRESTEndpoint CREATE_PROGRAM_MEMBERS_EXPORT_JOB("/bulk/v1/program/members/export/create.json"), START_PROGRAM_MEMBERS_EXPORT_JOB("/bulk/v1/program/members/export/${export_id}/enqueue.json"), GET_PROGRAM_MEMBERS_EXPORT_STATUS("/bulk/v1/program/members/export/${export_id}/status.json"), - GET_PROGRAM_MEMBERS_EXPORT_RESULT("/bulk/v1/program/members/export/${export_id}/file.json"); + GET_PROGRAM_MEMBERS_EXPORT_RESULT("/bulk/v1/program/members/export/${export_id}/file.json"), + GET_FOLDERS("/rest/asset/v1/folders.json"); private final String endpoint; MarketoRESTEndpoint(String endpoint) diff --git a/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java b/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java index 4961022..4db44a4 100644 --- a/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java +++ b/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java @@ -101,6 +101,10 @@ public interface PluginTask extends Task @Config("account_id") String getAccountId(); + @Config("endpoint") + @ConfigDefault("null") + Optional getInputEndpoint(); + @Config("client_secret") String getClientSecret(); @@ -144,10 +148,11 @@ public interface PluginTask extends Task public MarketoRestClient(PluginTask task) { - this(MarketoUtils.getEndPoint(task.getAccountId()), - MarketoUtils.getIdentityEndPoint(task.getAccountId()), + this(MarketoUtils.getEndPoint(task.getAccountId(), task.getInputEndpoint()), + MarketoUtils.getIdentityEndPoint(task.getAccountId(), task.getInputEndpoint()), task.getClientId(), task.getClientSecret(), + task.getAccountId(), task.getPartnerApiKey(), task.getBatchSize(), task.getMaxReturn(), @@ -163,6 +168,7 @@ public MarketoRestClient(String endPoint, String identityEndPoint, String clientId, String clientSecret, + String accountId, Optional partnerApiKey, Integer batchSize, Integer maxReturn, @@ -170,7 +176,7 @@ public MarketoRestClient(String endPoint, int marketoLimitIntervalMilis, Jetty94RetryHelper retryHelper) { - super(identityEndPoint, clientId, clientSecret, partnerApiKey, marketoLimitIntervalMilis, readTimeoutMilis, retryHelper); + super(identityEndPoint, clientId, clientSecret, accountId, partnerApiKey, marketoLimitIntervalMilis, readTimeoutMilis, retryHelper); this.endPoint = endPoint; this.batchSize = batchSize; this.maxReturn = maxReturn; @@ -622,4 +628,19 @@ public InputStream getProgramMemberBulkExtractResult(String exportId, BulkExtrac { return getBulkExtractResult(MarketoRESTEndpoint.GET_PROGRAM_MEMBERS_EXPORT_RESULT, exportId, bulkExtractRangeHeader); } + + public RecordPagingIterable getFolders(Optional root, int maxDepth, Optional workspace) + { + ImmutableListMultimap.Builder builder = new ImmutableListMultimap + .Builder() + .put("maxDepth", String.valueOf(maxDepth)) + .put(MAX_RETURN, DEFAULT_MAX_RETURN); + if (root.isPresent()) { + builder.put("root", root.get()); + } + if (workspace.isPresent()) { + builder.put("workSpace", workspace.get()); + } + return getRecordWithOffsetPagination(endPoint + MarketoRESTEndpoint.GET_FOLDERS.getEndpoint(), builder.build(), ObjectNode.class); + } } diff --git a/src/test/java/org/embulk/input/marketo/MarketoUtilsTest.java b/src/test/java/org/embulk/input/marketo/MarketoUtilsTest.java index 75b0cd9..439438a 100644 --- a/src/test/java/org/embulk/input/marketo/MarketoUtilsTest.java +++ b/src/test/java/org/embulk/input/marketo/MarketoUtilsTest.java @@ -13,6 +13,7 @@ import java.time.ZoneOffset; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -60,20 +61,30 @@ public void buildColumnName() { String columnName = MarketoUtils.buildColumnName("prefix", "columnName"); assertEquals("prefix_columnName", columnName); + String noPrefixColumn = MarketoUtils.buildColumnName("", "columnName"); + assertEquals("columnName", noPrefixColumn); } @Test public void getIdentityEndPoint() { - String identityEndPoint = MarketoUtils.getIdentityEndPoint("accountId"); + Optional endpoint = Optional.empty(); + String identityEndPoint = MarketoUtils.getIdentityEndPoint("accountId", endpoint); assertEquals("https://accountId.mktorest.com/identity", identityEndPoint); + Optional endpoint2 = Optional.of("endpoint"); + String identityEndPointUsingEndpoint = MarketoUtils.getIdentityEndPoint("accountId", endpoint2); + assertEquals("endpoint/identity", identityEndPointUsingEndpoint); } @Test public void getEndPoint() { - String endPoint = MarketoUtils.getEndPoint("accountId"); + Optional endpoint = Optional.empty(); + String endPoint = MarketoUtils.getEndPoint("accountId", endpoint); assertEquals("https://accountId.mktorest.com", endPoint); + Optional endpoint2 = Optional.of("endpoint"); + String endPointUsingEndpoint = MarketoUtils.getEndPoint("accountId", endpoint2); + assertEquals("endpoint", endPointUsingEndpoint); } @Test diff --git a/src/test/java/org/embulk/input/marketo/delegate/ActivityTypeInputPluginTest.java b/src/test/java/org/embulk/input/marketo/delegate/ActivityTypeInputPluginTest.java new file mode 100644 index 0000000..f9c5b92 --- /dev/null +++ b/src/test/java/org/embulk/input/marketo/delegate/ActivityTypeInputPluginTest.java @@ -0,0 +1,71 @@ +package org.embulk.input.marketo.delegate; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.embulk.EmbulkTestRuntime; +import org.embulk.base.restclient.ServiceResponseMapper; +import org.embulk.base.restclient.record.RecordImporter; +import org.embulk.base.restclient.record.ValueLocator; +import org.embulk.config.ConfigLoader; +import org.embulk.config.ConfigSource; +import org.embulk.input.marketo.rest.MarketoRestClient; +import org.embulk.input.marketo.rest.RecordPagingIterable; +import org.embulk.spi.PageBuilder; +import org.embulk.spi.Schema; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.List; + +import static org.embulk.input.marketo.MarketoUtilsTest.CONFIG_MAPPER; +import static org.junit.Assert.assertArrayEquals; + +public class ActivityTypeInputPluginTest +{ + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Rule + public EmbulkTestRuntime embulkTestRuntime = new EmbulkTestRuntime(); + + private ConfigSource configSource; + + private ActivityTypeInputPlugin activityTypeInputPlugin; + + private MarketoRestClient mockMarketoRestClient; + + @Before + public void setUp() throws Exception + { + activityTypeInputPlugin = Mockito.spy(new ActivityTypeInputPlugin()); + ConfigLoader configLoader = embulkTestRuntime.getInjector().getInstance(ConfigLoader.class); + configSource = configLoader.fromYaml(this.getClass().getResourceAsStream("/config/rest_config.yaml")); + mockMarketoRestClient = Mockito.mock(MarketoRestClient.class); + Mockito.doReturn(mockMarketoRestClient).when(activityTypeInputPlugin).createMarketoRestClient(Mockito.any(ActivityTypeInputPlugin.PluginTask.class)); + } + + @Test + public void testRun() throws IOException + { + RecordPagingIterable mockRecordPagingIterable = Mockito.mock(RecordPagingIterable.class); + JavaType javaType = OBJECT_MAPPER.getTypeFactory().constructParametrizedType(List.class, List.class, ObjectNode.class); + List objectNodeList = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("/fixtures/activity_type_response_full.json"), javaType); + Mockito.when(mockRecordPagingIterable.iterator()).thenReturn(objectNodeList.iterator()); + Mockito.when(mockMarketoRestClient.getActivityTypes()).thenReturn(mockRecordPagingIterable); + ActivityTypeInputPlugin.PluginTask task = CONFIG_MAPPER.map(configSource, ActivityTypeInputPlugin.PluginTask.class); + ServiceResponseMapper mapper = activityTypeInputPlugin.buildServiceResponseMapper(task); + RecordImporter recordImporter = mapper.createRecordImporter(); + PageBuilder mockPageBuilder = Mockito.mock(PageBuilder.class); + activityTypeInputPlugin.ingestServiceData(task, recordImporter, 1, mockPageBuilder); + Mockito.verify(mockMarketoRestClient, Mockito.times(1)).getActivityTypes(); + Schema embulkSchema = mapper.getEmbulkSchema(); + ArgumentCaptor longArgumentCaptor = ArgumentCaptor.forClass(Long.class); + Mockito.verify(mockPageBuilder, Mockito.times(5)).setLong(Mockito.eq(embulkSchema.lookupColumn("id")), longArgumentCaptor.capture()); + List allValues = longArgumentCaptor.getAllValues(); + assertArrayEquals(new Long[]{1L, 2L, 3L, 4L, 5L}, allValues.toArray()); + } +} diff --git a/src/test/java/org/embulk/input/marketo/delegate/FolderInputPluginTest.java b/src/test/java/org/embulk/input/marketo/delegate/FolderInputPluginTest.java new file mode 100644 index 0000000..35060ea --- /dev/null +++ b/src/test/java/org/embulk/input/marketo/delegate/FolderInputPluginTest.java @@ -0,0 +1,85 @@ +package org.embulk.input.marketo.delegate; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.embulk.EmbulkTestRuntime; +import org.embulk.base.restclient.ServiceResponseMapper; +import org.embulk.base.restclient.record.RecordImporter; +import org.embulk.base.restclient.record.ValueLocator; +import org.embulk.config.ConfigLoader; +import org.embulk.config.ConfigSource; +import org.embulk.input.marketo.delegate.FolderInputPlugin.PluginTask; +import org.embulk.input.marketo.rest.MarketoRestClient; +import org.embulk.input.marketo.rest.RecordPagingIterable; +import org.embulk.spi.PageBuilder; +import org.embulk.spi.Schema; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +import static org.embulk.input.marketo.MarketoUtilsTest.CONFIG_MAPPER; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class FolderInputPluginTest +{ + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Rule + public EmbulkTestRuntime runtime = new EmbulkTestRuntime(); + + private ConfigSource configSource; + + private FolderInputPlugin mockPlugin; + + private MarketoRestClient mockRestClient; + + @Before + public void setUp() throws Exception + { + mockPlugin = spy(new FolderInputPlugin()); + ConfigLoader configLoader = runtime.getInjector().getInstance(ConfigLoader.class); + configSource = configLoader.fromYaml(this.getClass().getResourceAsStream("/config/rest_config.yaml")); + mockRestClient = mock(MarketoRestClient.class); + doReturn(mockRestClient).when(mockPlugin).createMarketoRestClient(any(PluginTask.class)); + } + + @Test + @SuppressWarnings("unchecked") + public void testRun() throws IOException + { + RecordPagingIterable mockRecordPagingIterable = mock(RecordPagingIterable.class); + JavaType javaType = OBJECT_MAPPER.getTypeFactory().constructParametrizedType(List.class, List.class, ObjectNode.class); + List objectNodeList = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("/fixtures/folder_response_full.json"), javaType); + when(mockRecordPagingIterable.spliterator()).thenReturn(objectNodeList.spliterator()); + when(mockRestClient.getFolders(Optional.empty(), 2, Optional.empty())).thenReturn(mockRecordPagingIterable); + + PluginTask task = CONFIG_MAPPER.map(configSource, PluginTask.class); + ServiceResponseMapper mapper = mockPlugin.buildServiceResponseMapper(task); + RecordImporter recordImporter = mapper.createRecordImporter(); + PageBuilder mockPageBuilder = mock(PageBuilder.class); + mockPlugin.ingestServiceData(task, recordImporter, 1, mockPageBuilder); + + verify(mockRestClient, times(1)).getFolders(Optional.empty(), 2, Optional.empty()); + Schema embulkSchema = mapper.getEmbulkSchema(); + assertEquals(embulkSchema.size(), 14); + ArgumentCaptor longArgumentCaptor = ArgumentCaptor.forClass(Long.class); + verify(mockPageBuilder, times(3)).setLong(eq(embulkSchema.lookupColumn("id")), longArgumentCaptor.capture()); + List allValues = longArgumentCaptor.getAllValues(); + assertArrayEquals(new Long[]{1001L, 1002L, 2001L}, allValues.toArray()); + } +} diff --git a/src/test/java/org/embulk/input/marketo/delegate/ListInputPluginTest.java b/src/test/java/org/embulk/input/marketo/delegate/ListInputPluginTest.java new file mode 100644 index 0000000..3b81f22 --- /dev/null +++ b/src/test/java/org/embulk/input/marketo/delegate/ListInputPluginTest.java @@ -0,0 +1,71 @@ +package org.embulk.input.marketo.delegate; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.embulk.EmbulkTestRuntime; +import org.embulk.base.restclient.ServiceResponseMapper; +import org.embulk.base.restclient.record.RecordImporter; +import org.embulk.base.restclient.record.ValueLocator; +import org.embulk.config.ConfigLoader; +import org.embulk.config.ConfigSource; +import org.embulk.input.marketo.rest.MarketoRestClient; +import org.embulk.input.marketo.rest.RecordPagingIterable; +import org.embulk.spi.PageBuilder; +import org.embulk.spi.Schema; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.List; + +import static org.embulk.input.marketo.MarketoUtilsTest.CONFIG_MAPPER; +import static org.junit.Assert.assertArrayEquals; + +public class ListInputPluginTest +{ + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Rule + public EmbulkTestRuntime embulkTestRuntime = new EmbulkTestRuntime(); + + private ConfigSource configSource; + + private ListInputPlugin listInputPlugin; + + private MarketoRestClient mockMarketoRestClient; + + @Before + public void setUp() throws Exception + { + listInputPlugin = Mockito.spy(new ListInputPlugin()); + ConfigLoader configLoader = embulkTestRuntime.getInjector().getInstance(ConfigLoader.class); + configSource = configLoader.fromYaml(this.getClass().getResourceAsStream("/config/rest_config.yaml")); + mockMarketoRestClient = Mockito.mock(MarketoRestClient.class); + Mockito.doReturn(mockMarketoRestClient).when(listInputPlugin).createMarketoRestClient(Mockito.any(ListInputPlugin.PluginTask.class)); + } + + @Test + public void testRun() throws IOException + { + RecordPagingIterable mockRecordPagingIterable = Mockito.mock(RecordPagingIterable.class); + JavaType javaType = OBJECT_MAPPER.getTypeFactory().constructParametrizedType(List.class, List.class, ObjectNode.class); + List objectNodeList = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("/fixtures/list_reponse_full.json"), javaType); + Mockito.when(mockRecordPagingIterable.iterator()).thenReturn(objectNodeList.iterator()); + Mockito.when(mockMarketoRestClient.getLists()).thenReturn(mockRecordPagingIterable); + ListInputPlugin.PluginTask task = CONFIG_MAPPER.map(configSource, ListInputPlugin.PluginTask.class); + ServiceResponseMapper mapper = listInputPlugin.buildServiceResponseMapper(task); + RecordImporter recordImporter = mapper.createRecordImporter(); + PageBuilder mockPageBuilder = Mockito.mock(PageBuilder.class); + listInputPlugin.ingestServiceData(task, recordImporter, 1, mockPageBuilder); + Mockito.verify(mockMarketoRestClient, Mockito.times(1)).getLists(); + Schema embulkSchema = mapper.getEmbulkSchema(); + ArgumentCaptor longArgumentCaptor = ArgumentCaptor.forClass(Long.class); + Mockito.verify(mockPageBuilder, Mockito.times(24)).setLong(Mockito.eq(embulkSchema.lookupColumn("id")), longArgumentCaptor.capture()); + List allValues = longArgumentCaptor.getAllValues(); + assertArrayEquals(new Long[]{1007L, 1009L, 1010L, 1011L, 1012L, 1052L, 1063L, 1066L, 1067L, 1072L, 1073L, 1075L, 1076L, 1077L, 1078L, 1079L, 1080L, 1081L, 1082L, 1083L, 1084L, 1085L, 1086L, 1087L}, allValues.toArray()); + } +} diff --git a/src/test/java/org/embulk/input/marketo/rest/MarketoBaseRestClientTest.java b/src/test/java/org/embulk/input/marketo/rest/MarketoBaseRestClientTest.java index ffda0d0..a636bb3 100644 --- a/src/test/java/org/embulk/input/marketo/rest/MarketoBaseRestClientTest.java +++ b/src/test/java/org/embulk/input/marketo/rest/MarketoBaseRestClientTest.java @@ -59,7 +59,7 @@ public class MarketoBaseRestClientTest public void prepare() { mockJetty94 = Mockito.mock(Jetty94RetryHelper.class); - marketoBaseRestClient = new MarketoBaseRestClient("identityEndPoint", "clientId", "clientSecret", Optional.empty(), MARKETO_LIMIT_INTERVAL_MILIS, 60000, mockJetty94); + marketoBaseRestClient = new MarketoBaseRestClient("identityEndPoint", "clientId", "clientSecret", "accountId", Optional.empty(), MARKETO_LIMIT_INTERVAL_MILIS, 60000, mockJetty94); } @Test @@ -122,6 +122,7 @@ public void testGetAccessTokenRequestShouldHavePartnerId() MarketoBaseRestClient restClient = Mockito.spy(new MarketoBaseRestClient("identityEndPoint", "clientId", "clientSecret", + "accountId", Optional.of(partnerId), MARKETO_LIMIT_INTERVAL_MILIS, 60000, @@ -180,7 +181,7 @@ public void testGetAccessTokenThrowHttpResponseException() throws Exception Mockito.doThrow(exception).when(request).send(Mockito.any(Response.Listener.class)); Jetty94RetryHelper retryHelper = new Jetty94RetryHelper(1, 1, 1, clientCreator); - final MarketoBaseRestClient restClient = new MarketoBaseRestClient("identityEndPoint", "clientId", "clientSecret", Optional.empty(), MARKETO_LIMIT_INTERVAL_MILIS, 1000, retryHelper); + final MarketoBaseRestClient restClient = new MarketoBaseRestClient("identityEndPoint", "clientId", "clientSecret", "accountId", Optional.empty(), MARKETO_LIMIT_INTERVAL_MILIS, 1000, retryHelper); // calling method should wrap the HttpResponseException by ConfigException Assert.assertThrows(ConfigException.class, restClient::getAccessToken); @@ -217,7 +218,7 @@ private MarketoBaseRestClient doRequestWithWrapper(HttpMethod method) throws Exc Mockito.doThrow(exception).when(request).send(Mockito.any(Response.Listener.class)); Jetty94RetryHelper retryHelper = new Jetty94RetryHelper(1, 1, 1, clientCreator); - final MarketoBaseRestClient restClient = Mockito.spy(new MarketoBaseRestClient("identityEndPoint", "clientId", "clientSecret", Optional.empty(), MARKETO_LIMIT_INTERVAL_MILIS, 1000, retryHelper)); + final MarketoBaseRestClient restClient = Mockito.spy(new MarketoBaseRestClient("identityEndPoint", "clientId", "clientSecret", "accountId", Optional.empty(), MARKETO_LIMIT_INTERVAL_MILIS, 1000, retryHelper)); Mockito.doReturn("test_access_token").when(restClient).getAccessToken(); return restClient; diff --git a/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java b/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java index fef17cc..59f6e24 100644 --- a/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java +++ b/src/test/java/org/embulk/input/marketo/rest/MarketoRestClientTest.java @@ -40,6 +40,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import static org.embulk.input.marketo.MarketoInputPlugin.CONFIG_MAPPER_FACTORY; @@ -66,7 +67,9 @@ public class MarketoRestClientTest private static final String TEST_CLIENT_ID = "test_client_id"; - private static final String END_POINT = MarketoUtils.getEndPoint(TEST_ACCOUNT_ID); + private static final Optional TEST_ENDPOINT = Optional.empty(); + + private static final String END_POINT = MarketoUtils.getEndPoint(TEST_ACCOUNT_ID, TEST_ENDPOINT); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); @@ -654,4 +657,30 @@ public void testGetListsByIds() throws IOException FormContentProvider form = formCaptor.getValue(); Assert.assertEquals("nextPageToken=GWP55GLCVCZLPE6SS7OCG5IEXQ%3D%3D%3D%3D%3D%3D&id=123%2C456&batchSize=300", fromContentProviderToString(form)); } + + @Test + public void getFolders() throws Exception + { + ArrayNode listPages = (ArrayNode) OBJECT_MAPPER.readTree(new String(ByteStreams.toByteArray(this.getClass().getResourceAsStream("/fixtures/folder_response.json")))).get("responses"); + MarketoResponse page1 = OBJECT_MAPPER.readValue(listPages.get(0).toString(), RESPONSE_TYPE); + MarketoResponse page2 = OBJECT_MAPPER.readValue(listPages.get(1).toString(), RESPONSE_TYPE); + doReturn(page1).doReturn(page2).when(marketoRestClient).doGet(eq(END_POINT + MarketoRESTEndpoint.GET_FOLDERS.getEndpoint()), isNull(), any(Multimap.class), any(MarketoResponseJettyEntityReader.class)); + RecordPagingIterable lists = marketoRestClient.getFolders(Optional.empty(), 2, Optional.empty()); + Iterator iterator = lists.iterator(); + ObjectNode folder1 = iterator.next(); + ObjectNode folder2 = iterator.next(); + ObjectNode folder3 = iterator.next(); + Assert.assertFalse(iterator.hasNext()); + Assert.assertEquals("folder_test_1_name", folder1.get("name").asText()); + Assert.assertEquals("folder_test_2_name", folder2.get("name").asText()); + Assert.assertEquals("program_test_1_name", folder3.get("name").asText()); + ArgumentCaptor immutableListMultimapArgumentCaptor = ArgumentCaptor.forClass(ImmutableListMultimap.class); + verify(marketoRestClient, times(2)).doGet(eq(END_POINT + MarketoRESTEndpoint.GET_FOLDERS.getEndpoint()), isNull(), immutableListMultimapArgumentCaptor.capture(), any(MarketoResponseJettyEntityReader.class)); + List params = immutableListMultimapArgumentCaptor.getAllValues(); + ImmutableListMultimap params1 = params.get(0); + Assert.assertEquals("0", params1.get("offset").get(0)); + Assert.assertEquals("2", params1.get("maxReturn").get(0)); + ImmutableListMultimap params2 = params.get(1); + Assert.assertEquals("2", params2.get("offset").get(0)); + } } diff --git a/src/test/resources/fixtures/activity_type_response_full.json b/src/test/resources/fixtures/activity_type_response_full.json new file mode 100644 index 0000000..4a7ec97 --- /dev/null +++ b/src/test/resources/fixtures/activity_type_response_full.json @@ -0,0 +1,98 @@ +[ + { + "id": 1, + "name": "bb", + "description": "sample", + "primaryAttribute": { + "name": "exampleID", + "dataType": "integer" + }, + "attributes": [ + { + "name": "sample_1", + "dataType": "string" + }, + { + "name": "sample_2", + "dataType": "boolean" + } + ] + }, + { + "id": 2, + "name": "TD Output Test aa", + "description": "sample_aa", + "primaryAttribute": { + "name": "exampleID", + "dataType": "integer" + }, + "attributes": [ + { + "name": "sample_aa_1", + "dataType": "string" + }, + { + "name": "sample_aa_2", + "dataType": "boolean" + } + ] + }, + { + "id": 3, + "name": "Bill_progream a", + "description": "sample", + "primaryAttribute": { + "name": "exampleID", + "dataType": "integer" + }, + "attributes": [ + { + "name": "sample_a_1", + "dataType": "string" + }, + { + "name": "sample_a_1", + "dataType": "boolean" + } + ], + "apiName": "sample_name_c" + }, + { + "id": 4, + "name": "Bill_progream b", + "description": "sample_b", + "primaryAttribute": { + "name": "exampleID", + "dataType": "integer" + }, + "attributes": [ + { + "name": "sample_b_1", + "dataType": "string" + }, + { + "name": "sample_b_2", + "dataType": "boolean" + } + ] + }, + { + "id": 5, + "name": "Bill_progream c", + "description": "sample_c", + "primaryAttribute": { + "name": "exampleID", + "dataType": "integer" + }, + "attributes": [ + { + "name": "sample_c_1", + "dataType": "string" + }, + { + "name": "sample_c_2", + "dataType": "boolean" + } + ] + } +] \ No newline at end of file diff --git a/src/test/resources/fixtures/folder_response.json b/src/test/resources/fixtures/folder_response.json new file mode 100644 index 0000000..5af76b8 --- /dev/null +++ b/src/test/resources/fixtures/folder_response.json @@ -0,0 +1,86 @@ +{ + "responses": [ + { + "success": true, + "errors": [], + "requestId": "15d3a#181dca1287a", + "warnings": [], + "result": [ + { + "name": "folder_test_1_name", + "description": "", + "createdAt": "2022-05-30T10:01:40Z+0000", + "updatedAt": "2022-05-30T10:01:40Z+0000", + "url": "https://app-sjdemo1.marketo.com/#MF1001A1", + "folderId": { + "id": 1001, + "type": "Folder" + }, + "folderType": "Marketing Folder", + "parent": { + "id": 1000, + "type": "Folder" + }, + "path": "/Marketing Activities/Default/folder_test_1_name", + "isArchive": false, + "isSystem": false, + "accessZoneId": 1011, + "workspace": "Default", + "id": 1001 + }, + { + "name": "folder_test_2_name", + "description": "", + "createdAt": "2022-07-08T06:56:01Z+0000", + "updatedAt": "2022-07-08T06:56:01Z+0000", + "url": "https://app-sjdemo1.marketo.com/#MF1002A1", + "folderId": { + "id": 1002, + "type": "Folder" + }, + "folderType": "Marketing Folder", + "parent": { + "id": 1001, + "type": "Folder" + }, + "path": "/Marketing Activities/Default/folder_test_1_name/folder_test_2_name", + "isArchive": false, + "isSystem": false, + "accessZoneId": 1011, + "workspace": "Default", + "id": 1002 + } + ] + }, + { + "success": true, + "errors": [], + "requestId": "15d3a#181dca1287a", + "warnings": [], + "result": [ + { + "name": "program_test_1_name", + "description": "", + "createdAt": "2022-06-01T01:58:52Z+0000", + "updatedAt": "2022-06-01T01:58:52Z+0000", + "url": "https://app-sjdemo1.marketo.com/#PG2001A1", + "folderId": { + "id": 2001, + "type": "Program" + }, + "folderType": "Marketing Program", + "parent": { + "id": 1001, + "type": "Folder" + }, + "path": "/Marketing Activities/Default/folder_test_1_name/program_test_1_name", + "isArchive": false, + "isSystem": false, + "accessZoneId": 1011, + "workspace": "Default", + "id": 2001 + } + ] + } + ] +} diff --git a/src/test/resources/fixtures/folder_response_full.json b/src/test/resources/fixtures/folder_response_full.json new file mode 100644 index 0000000..34a5ac1 --- /dev/null +++ b/src/test/resources/fixtures/folder_response_full.json @@ -0,0 +1,68 @@ +[ + { + "name": "folder_test_1_name", + "description": "", + "createdAt": "2022-05-30T10:01:40Z+0000", + "updatedAt": "2022-05-30T10:01:40Z+0000", + "url": "https://app-sjdemo1.marketo.com/#MF1001A1", + "folderId": { + "id": 1001, + "type": "Folder" + }, + "folderType": "Marketing Folder", + "parent": { + "id": 1000, + "type": "Folder" + }, + "path": "/Marketing Activities/Default/folder_test_1_name", + "isArchive": false, + "isSystem": false, + "accessZoneId": 1011, + "workspace": "Default", + "id": 1001 + }, + { + "name": "folder_test_2_name", + "description": "", + "createdAt": "2022-07-08T06:56:01Z+0000", + "updatedAt": "2022-07-08T06:56:01Z+0000", + "url": "https://app-sjdemo1.marketo.com/#MF1002A1", + "folderId": { + "id": 1002, + "type": "Folder" + }, + "folderType": "Marketing Folder", + "parent": { + "id": 1001, + "type": "Folder" + }, + "path": "/Marketing Activities/Default/folder_test_1_name/folder_test_2_name", + "isArchive": false, + "isSystem": false, + "accessZoneId": 1011, + "workspace": "Default", + "id": 1002 + }, + { + "name": "program_test_1_name", + "description": "", + "createdAt": "2022-06-01T01:58:52Z+0000", + "updatedAt": "2022-06-01T01:58:52Z+0000", + "url": "https://app-sjdemo1.marketo.com/#PG2001A1", + "folderId": { + "id": 2001, + "type": "Program" + }, + "folderType": "Marketing Program", + "parent": { + "id": 1001, + "type": "Folder" + }, + "path": "/Marketing Activities/Default/folder_test_1_name/program_test_1_name", + "isArchive": false, + "isSystem": false, + "accessZoneId": 1011, + "workspace": "Default", + "id": 2001 + } +]