From fcd6ee80ec6db71f1796b2977a47dfc30c722a02 Mon Sep 17 00:00:00 2001 From: d-hrs Date: Fri, 12 Jan 2024 21:48:20 +0900 Subject: [PATCH] tmp --- .../google_ads/GoogleAdsInputPlugin.java | 25 ++++--- .../input/google_ads/GoogleAdsReporter.java | 75 ++++++++++--------- 2 files changed, 56 insertions(+), 44 deletions(-) diff --git a/src/main/java/org/embulk/input/google_ads/GoogleAdsInputPlugin.java b/src/main/java/org/embulk/input/google_ads/GoogleAdsInputPlugin.java index 53f038a..08d0be3 100644 --- a/src/main/java/org/embulk/input/google_ads/GoogleAdsInputPlugin.java +++ b/src/main/java/org/embulk/input/google_ads/GoogleAdsInputPlugin.java @@ -69,17 +69,22 @@ public TaskReport run(TaskSource taskSource, reporter.connect(); try { try (PageBuilder pageBuilder = getPageBuilder(schema, output)) { - for (GoogleAdsServiceClient.SearchPage page : reporter.getReportPage()) { - for (GoogleAdsRow row : page.getValues()) { - result = new HashMap() - { - }; - reporter.flattenResource(null, row.getAllFields(), result); - schema.visitColumns(new GoogleAdsColumnVisitor(new GoogleAdsAccessor(task, result), pageBuilder, task)); - pageBuilder.addRecord(); - } + Map params = new HashMap<>(); + reporter.search(params); + while(reporter.fetchResult( + pages -> { + pages.forEach(page -> { + for (GoogleAdsRow row : page.getValues()) { + Map ret = new HashMap<>(); + reporter.flattenResource(null, row.getAllFields(), ret); + schema.visitColumns(new GoogleAdsColumnVisitor(new GoogleAdsAccessor(task, ret), pageBuilder, task)); + pageBuilder.addRecord(); + } + }); + } + )){ pageBuilder.flush(); - } + }; pageBuilder.finish(); } } catch (Exception e) { diff --git a/src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java b/src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java index bee630c..219567b 100644 --- a/src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java +++ b/src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java @@ -26,6 +26,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.HashMap; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -38,6 +41,8 @@ public class GoogleAdsReporter private GoogleAdsClient client; private ObjectMapper mapper = new ObjectMapper(); + private Iterable searchResult; + public GoogleAdsReporter(PluginTask task) { this.task = task; @@ -53,40 +58,42 @@ private UserCredentials buildCredential(PluginTask task) .build(); } - public Iterable getReportPage() - { - List pages = new ArrayList(); - - String startDateTime = null; - do { - String query = buildQuery(task, startDateTime); - logger.info(query); - SearchGoogleAdsRequest request = buildRequest(task, query); - GoogleAdsServiceClient googleAdsService = client.getVersion14().createGoogleAdsServiceClient(); - GoogleAdsServiceClient.SearchPagedResponse response = googleAdsService.search(request); - - if (response.getPage().getResponse().getResultsCount() == 0) { - return pages; - } + public void search(Map params) { + String query = buildQuery(task, params); + logger.info(query); + SearchGoogleAdsRequest request = buildRequest(task, query); + GoogleAdsServiceClient googleAdsService = client.getVersion14().createGoogleAdsServiceClient(); + GoogleAdsServiceClient.SearchPagedResponse response = googleAdsService.search(request); + this.searchResult = response.iteratePages(); + } - response.iteratePages().iterator().forEachRemaining(pages::add); + public boolean fetchResult(Consumer> consumer) { + consumer.accept(this.searchResult); - if (task.getResourceType().equals("change_event")) { - GoogleAdsServiceClient.SearchPage lastPage = pages.get(pages.size() - 1); - GoogleAdsRow lastRow = null; - for(GoogleAdsRow row : lastPage.getValues()) { - lastRow = row; - } + if (!task.getResourceType().equals("change_event")) return false; - if (lastRow == null) { - break; - } else { - startDateTime = lastRow.getChangeEvent().getChangeDateTime(); - } - } - } while (startDateTime != null && !startDateTime.isEmpty()); + // NOTE: reset iterator + this.searchResult.iterator(); + + GoogleAdsServiceClient.SearchPage lastPage = null; + for (GoogleAdsServiceClient.SearchPage searchPage : this.searchResult) { + lastPage = searchPage; + } + if (lastPage == null) return false; + + GoogleAdsRow lastRow = null; + for (GoogleAdsRow row: lastPage.getValues()) { + lastRow = row; + } + if (lastRow == null) return false; + + lastRow.getChangeEvent().getChangeDateTime(); - return pages; + this.searchResult = null; + Map params = new HashMap<>(); + params.put("start_datetime", lastRow.getChangeEvent().getChangeDateTime()); + search(params); + return true; } public void flattenResource(String resourceName, Map fields, Map result) @@ -230,7 +237,7 @@ public SearchGoogleAdsRequest buildRequest(PluginTask task, String query) .build(); } - public String buildQuery(PluginTask task, String startDateTime) + public String buildQuery(PluginTask task, Map params) { StringBuilder sb = new StringBuilder(); @@ -240,7 +247,7 @@ public String buildQuery(PluginTask task, String startDateTime) sb.append(" FROM "); sb.append(task.getResourceType()); - List whereClause = buildWhereClauseConditions(task, startDateTime); + List whereClause = buildWhereClauseConditions(task, params); if (!whereClause.isEmpty()) { sb.append(" WHERE "); sb.append(String.join(" AND ", whereClause)); @@ -255,7 +262,7 @@ public String buildQuery(PluginTask task, String startDateTime) } @VisibleForTesting - public List buildWhereClauseConditions(PluginTask task, String startDateTime) + public List buildWhereClauseConditions(PluginTask task, Map params) { List whereConditions = new ArrayList() { @@ -264,7 +271,7 @@ public List buildWhereClauseConditions(PluginTask task, String startDate if (task.getDateRange().isPresent()) { StringBuilder dateSb = new StringBuilder(); if (task.getResourceType().equals("change_event")) { - dateSb.append(buildWhereClauseConditionsForChangeEvent(startDateTime)); + dateSb.append(buildWhereClauseConditionsForChangeEvent(params.get("start_datetime"))); } else { dateSb.append("segments.date BETWEEN '"); dateSb.append(task.getDateRange().get().getStartDate());