From 003f1532a8441f0521a3e989b4c92cec1219927c Mon Sep 17 00:00:00 2001 From: d-hrs Date: Wed, 17 Jan 2024 16:55:58 +0900 Subject: [PATCH] fixed to pass single SearchPage to consumer --- .../google_ads/GoogleAdsInputPlugin.java | 23 ++++++-------- .../input/google_ads/GoogleAdsReporter.java | 31 +++++++------------ 2 files changed, 21 insertions(+), 33 deletions(-) diff --git a/src/main/java/org/embulk/input/google_ads/GoogleAdsInputPlugin.java b/src/main/java/org/embulk/input/google_ads/GoogleAdsInputPlugin.java index bf1df0c..49ba180 100644 --- a/src/main/java/org/embulk/input/google_ads/GoogleAdsInputPlugin.java +++ b/src/main/java/org/embulk/input/google_ads/GoogleAdsInputPlugin.java @@ -69,19 +69,16 @@ public TaskReport run(TaskSource taskSource, try (PageBuilder pageBuilder = getPageBuilder(schema, output)) { Map params = new HashMap<>(); reporter.search( - searchPages -> { - searchPages.forEach(page -> { - for (GoogleAdsRow row : page.getValues()) { - Map result = new HashMap<>(); - reporter.flattenResource(null, row.getAllFields(), result); - schema.visitColumns(new GoogleAdsColumnVisitor(new GoogleAdsAccessor(task, result), pageBuilder, task)); - pageBuilder.addRecord(); - } - pageBuilder.flush(); - } - ); - }, - params + searchPage -> { + for (GoogleAdsRow row : searchPage.getValues()) { + Map result = new HashMap<>(); + reporter.flattenResource(null, row.getAllFields(), result); + schema.visitColumns(new GoogleAdsColumnVisitor(new GoogleAdsAccessor(task, result), pageBuilder, task)); + pageBuilder.addRecord(); + } + pageBuilder.flush(); + }, + params ); pageBuilder.finish(); } diff --git a/src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java b/src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java index f1bdfd2..55f6464 100644 --- a/src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java +++ b/src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java @@ -66,14 +66,19 @@ private Iterable search(Map p return response.iteratePages(); } - public void search(Consumer> consumer, Map params) { - Iterable pages = search(params); - consumer.accept(pages); + public void search(Consumer consumer, Map params) { + GoogleAdsServiceClient.SearchPage lastPage = null; + for(GoogleAdsServiceClient.SearchPage page: search(params)) { + consumer.accept(page); + lastPage = page; + } if (task.getResourceType().equals("change_event")) { - // reset iterator - pages.iterator(); - GoogleAdsRow lastRow = fetchLastRow(pages); + if (lastPage == null) return ; + GoogleAdsRow lastRow = null; + for (GoogleAdsRow row: lastPage.getValues()) { + lastRow = row; + } if (lastRow == null) return ; Map nextParams = new HashMap<>(); @@ -82,20 +87,6 @@ public void search(Consumer> consume } } - private GoogleAdsRow fetchLastRow(Iterable pages) { - GoogleAdsServiceClient.SearchPage lastPage = null; - for (GoogleAdsServiceClient.SearchPage searchPage : pages) { - lastPage = searchPage; - } - if (lastPage == null) return null; - - GoogleAdsRow lastRow = null; - for (GoogleAdsRow row: lastPage.getValues()) { - lastRow = row; - } - return lastRow; - } - public void flattenResource(String resourceName, Map fields, Map result) { for (Descriptors.FieldDescriptor key : fields.keySet()) {