diff --git a/src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java b/src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java index a03ac70..bbc0e25 100644 --- a/src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java +++ b/src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.node.JsonNodeType; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.ads.googleads.lib.GoogleAdsClient; +import com.google.ads.googleads.v13.services.GoogleAdsRow; import com.google.ads.googleads.v13.services.GoogleAdsServiceClient; import com.google.ads.googleads.v13.services.SearchGoogleAdsRequest; import com.google.auth.oauth2.UserCredentials; @@ -54,12 +55,38 @@ private UserCredentials buildCredential(PluginTask task) public Iterable getReportPage() { - String query = buildQuery(task); - logger.info(query); - SearchGoogleAdsRequest request = buildRequest(task, query); - GoogleAdsServiceClient googleAdsService = client.getVersion13().createGoogleAdsServiceClient(); - GoogleAdsServiceClient.SearchPagedResponse response = googleAdsService.search(request); - return response.iteratePages(); + List pages = new ArrayList(); + + String startDateTime = null; + do { + String query = buildQuery(task, startDateTime); + logger.info(query); + SearchGoogleAdsRequest request = buildRequest(task, query); + GoogleAdsServiceClient googleAdsService = client.getVersion13().createGoogleAdsServiceClient(); + GoogleAdsServiceClient.SearchPagedResponse response = googleAdsService.search(request); + + if (response.getPage().getResponse().getResultsCount() == 0) { + return pages; + } + + response.iteratePages().iterator().forEachRemaining(pages::add); + + if (task.getResourceType().equals("change_event")) { + GoogleAdsServiceClient.SearchPage lastPage = pages.get(pages.size() - 1); + GoogleAdsRow lastRow = null; + for(GoogleAdsRow row : lastPage.getValues()) { + lastRow = row; + } + + if (lastRow == null) { + break; + } else { + startDateTime = lastRow.getChangeEvent().getChangeDateTime(); + } + } + } while (startDateTime != null && !startDateTime.isEmpty()); + + return pages; } public void flattenResource(String resourceName, Map fields, Map result) @@ -203,7 +230,7 @@ public SearchGoogleAdsRequest buildRequest(PluginTask task, String query) .build(); } - public String buildQuery(PluginTask task) + public String buildQuery(PluginTask task, String startDateTime) { StringBuilder sb = new StringBuilder(); @@ -213,17 +240,22 @@ public String buildQuery(PluginTask task) sb.append(" FROM "); sb.append(task.getResourceType()); - List whereClause = buildWhereClauseConditions(task); + List whereClause = buildWhereClauseConditions(task, startDateTime); if (!whereClause.isEmpty()) { sb.append(" WHERE "); sb.append(String.join(" AND ", whereClause)); } + if (task.getLimit().isPresent()) { + sb.append(" LIMIT "); + sb.append(task.getLimit().get()); + } + return sb.toString(); } @VisibleForTesting - public List buildWhereClauseConditions(PluginTask task) + public List buildWhereClauseConditions(PluginTask task, String startDateTime) { List whereConditions = new ArrayList() { @@ -231,11 +263,15 @@ public List buildWhereClauseConditions(PluginTask task) if (task.getDateRange().isPresent()) { StringBuilder dateSb = new StringBuilder(); - dateSb.append("segments.date BETWEEN '"); - dateSb.append(task.getDateRange().get().getStartDate()); - dateSb.append("' AND '"); - dateSb.append(task.getDateRange().get().getEndDate()); - dateSb.append("'"); + if (task.getResourceType().equals("change_event")) { + dateSb.append(buildWhereClauseConditionsForChangeEvent(startDateTime)); + } else { + dateSb.append("segments.date BETWEEN '"); + dateSb.append(task.getDateRange().get().getStartDate()); + dateSb.append("' AND '"); + dateSb.append(task.getDateRange().get().getEndDate()); + dateSb.append("'"); + } whereConditions.add(dateSb.toString()); } @@ -257,4 +293,25 @@ public void connect() } this.client = builder.build(); } + + private String buildWhereClauseConditionsForChangeEvent(String startDateTime) + { + StringBuilder dateSb = new StringBuilder(); + dateSb.append("change_event.change_date_time "); + if (startDateTime == null) { + dateSb.append(" >= '"); + dateSb.append(task.getDateRange().get().getStartDate()); + } else { + dateSb.append(" > '"); + dateSb.append(startDateTime); + } + dateSb.append("' AND "); + dateSb.append("change_event.change_date_time "); + dateSb.append(" <= '"); + dateSb.append(task.getDateRange().get().getEndDate()); + dateSb.append("'"); + dateSb.append(" ORDER BY change_event.change_date_time ASC"); + + return dateSb.toString(); + } } diff --git a/src/main/java/org/embulk/input/google_ads/PluginTask.java b/src/main/java/org/embulk/input/google_ads/PluginTask.java index f5b1b28..ced63a8 100644 --- a/src/main/java/org/embulk/input/google_ads/PluginTask.java +++ b/src/main/java/org/embulk/input/google_ads/PluginTask.java @@ -43,6 +43,10 @@ public interface PluginTask extends Task @ConfigDefault("null") Optional getDateRange(); + @Config("limit") + @ConfigDefault("null") + Optional getLimit(); + @Config("_use_micro") @ConfigDefault("true") boolean getUseMicro();