From e91b53b4773727da47bd34c62c21b733a6d6d83d Mon Sep 17 00:00:00 2001 From: d-hrs Date: Fri, 12 Jan 2024 21:48:20 +0900 Subject: [PATCH] tmp --- .../google_ads/GoogleAdsInputPlugin.java | 39 ++++++--- .../input/google_ads/GoogleAdsReporter.java | 80 ++++++++++--------- 2 files changed, 71 insertions(+), 48 deletions(-) diff --git a/src/main/java/org/embulk/input/google_ads/GoogleAdsInputPlugin.java b/src/main/java/org/embulk/input/google_ads/GoogleAdsInputPlugin.java index 53f038a..ddf6008 100644 --- a/src/main/java/org/embulk/input/google_ads/GoogleAdsInputPlugin.java +++ b/src/main/java/org/embulk/input/google_ads/GoogleAdsInputPlugin.java @@ -69,18 +69,37 @@ public TaskReport run(TaskSource taskSource, reporter.connect(); try { try (PageBuilder pageBuilder = getPageBuilder(schema, output)) { - for (GoogleAdsServiceClient.SearchPage page : reporter.getReportPage()) { - for (GoogleAdsRow row : page.getValues()) { - result = new HashMap() - { - }; - reporter.flattenResource(null, row.getAllFields(), result); - schema.visitColumns(new GoogleAdsColumnVisitor(new GoogleAdsAccessor(task, result), pageBuilder, task)); - pageBuilder.addRecord(); - } + Map params = new HashMap<>(); + reporter.search(params); + while(reporter.fetchResult( + pages -> { + pages.forEach(page -> { + for (GoogleAdsRow row : page.getValues()) { + Map ret = new HashMap<>(); + reporter.flattenResource(null, row.getAllFields(), ret); + schema.visitColumns(new GoogleAdsColumnVisitor(new GoogleAdsAccessor(task, ret), pageBuilder, task)); + pageBuilder.addRecord(); + } + }); + return null; + } + )){ pageBuilder.flush(); - } + }; pageBuilder.finish(); + + // for (GoogleAdsServiceClient.SearchPage page : reporter.getReportPage(params)) { + // for (GoogleAdsRow row : page.getValues()) { + // result = new HashMap() + // { + // }; + // reporter.flattenResource(null, row.getAllFields(), result); + // schema.visitColumns(new GoogleAdsColumnVisitor(new GoogleAdsAccessor(task, result), pageBuilder, task)); + // pageBuilder.addRecord(); + // } + // pageBuilder.flush(); + // } + // pageBuilder.finish(); } } catch (Exception e) { logger.error(e.getMessage()); diff --git a/src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java b/src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java index bee630c..ebbbb07 100644 --- a/src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java +++ b/src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java @@ -22,10 +22,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -38,6 +36,8 @@ public class GoogleAdsReporter private GoogleAdsClient client; private ObjectMapper mapper = new ObjectMapper(); + private Iterable searchResult; + public GoogleAdsReporter(PluginTask task) { this.task = task; @@ -53,40 +53,44 @@ private UserCredentials buildCredential(PluginTask task) .build(); } - public Iterable getReportPage() - { - List pages = new ArrayList(); - - String startDateTime = null; - do { - String query = buildQuery(task, startDateTime); - logger.info(query); - SearchGoogleAdsRequest request = buildRequest(task, query); - GoogleAdsServiceClient googleAdsService = client.getVersion14().createGoogleAdsServiceClient(); - GoogleAdsServiceClient.SearchPagedResponse response = googleAdsService.search(request); - - if (response.getPage().getResponse().getResultsCount() == 0) { - return pages; - } + public void search(Map params) { + String query = buildQuery(task, params); + logger.info(query); + SearchGoogleAdsRequest request = buildRequest(task, query); + GoogleAdsServiceClient googleAdsService = client.getVersion14().createGoogleAdsServiceClient(); + GoogleAdsServiceClient.SearchPagedResponse response = googleAdsService.search(request); + this.searchResult = response.iteratePages(); + } - response.iteratePages().iterator().forEachRemaining(pages::add); + public boolean fetchResult(Function, Void> func) { + logger.info("fetchResult"); + func.apply(this.searchResult); + Iterator< GoogleAdsServiceClient.SearchPage> pageItr = this.searchResult.iterator(); - if (task.getResourceType().equals("change_event")) { - GoogleAdsServiceClient.SearchPage lastPage = pages.get(pages.size() - 1); - GoogleAdsRow lastRow = null; - for(GoogleAdsRow row : lastPage.getValues()) { - lastRow = row; - } + if (!task.getResourceType().equals("change_event")) return false; - if (lastRow == null) { - break; - } else { - startDateTime = lastRow.getChangeEvent().getChangeDateTime(); - } - } - } while (startDateTime != null && !startDateTime.isEmpty()); + // NOTE: reset iterator + this.searchResult.iterator(); - return pages; + GoogleAdsServiceClient.SearchPage lastPage = null; + while(pageItr.hasNext()) { + lastPage = pageItr.next(); + } + if (lastPage == null) return false; + + Iterator rowItr = lastPage.getValues().iterator(); + GoogleAdsRow lastRow = null; + while(rowItr.hasNext()) { + lastRow = rowItr.next(); + } + if (lastRow == null) return false; + lastRow.getChangeEvent().getChangeDateTime(); + + this.searchResult = null; + Map params = new HashMap<>(); + params.put("start_datetime", lastRow.getChangeEvent().getChangeDateTime()); + search(params); + return true; } public void flattenResource(String resourceName, Map fields, Map result) @@ -230,7 +234,7 @@ public SearchGoogleAdsRequest buildRequest(PluginTask task, String query) .build(); } - public String buildQuery(PluginTask task, String startDateTime) + public String buildQuery(PluginTask task, Map params) { StringBuilder sb = new StringBuilder(); @@ -240,7 +244,7 @@ public String buildQuery(PluginTask task, String startDateTime) sb.append(" FROM "); sb.append(task.getResourceType()); - List whereClause = buildWhereClauseConditions(task, startDateTime); + List whereClause = buildWhereClauseConditions(task, params); if (!whereClause.isEmpty()) { sb.append(" WHERE "); sb.append(String.join(" AND ", whereClause)); @@ -255,7 +259,7 @@ public String buildQuery(PluginTask task, String startDateTime) } @VisibleForTesting - public List buildWhereClauseConditions(PluginTask task, String startDateTime) + public List buildWhereClauseConditions(PluginTask task, Map params) { List whereConditions = new ArrayList() { @@ -264,7 +268,7 @@ public List buildWhereClauseConditions(PluginTask task, String startDate if (task.getDateRange().isPresent()) { StringBuilder dateSb = new StringBuilder(); if (task.getResourceType().equals("change_event")) { - dateSb.append(buildWhereClauseConditionsForChangeEvent(startDateTime)); + dateSb.append(buildWhereClauseConditionsForChangeEvent(params.get("start_datetime"))); } else { dateSb.append("segments.date BETWEEN '"); dateSb.append(task.getDateRange().get().getStartDate());