Skip to content

Commit

Permalink
fixed to pass single SearchPage to consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
d-hrs committed Jan 17, 2024
1 parent de40057 commit 003f153
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 33 deletions.
23 changes: 10 additions & 13 deletions src/main/java/org/embulk/input/google_ads/GoogleAdsInputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,16 @@ public TaskReport run(TaskSource taskSource,
try (PageBuilder pageBuilder = getPageBuilder(schema, output)) {
Map<String, String> params = new HashMap<>();
reporter.search(
searchPages -> {
searchPages.forEach(page -> {
for (GoogleAdsRow row : page.getValues()) {
Map<String, String> result = new HashMap<>();
reporter.flattenResource(null, row.getAllFields(), result);
schema.visitColumns(new GoogleAdsColumnVisitor(new GoogleAdsAccessor(task, result), pageBuilder, task));
pageBuilder.addRecord();
}
pageBuilder.flush();
}
);
},
params
searchPage -> {
for (GoogleAdsRow row : searchPage.getValues()) {
Map<String, String> result = new HashMap<>();
reporter.flattenResource(null, row.getAllFields(), result);
schema.visitColumns(new GoogleAdsColumnVisitor(new GoogleAdsAccessor(task, result), pageBuilder, task));
pageBuilder.addRecord();
}
pageBuilder.flush();
},
params
);
pageBuilder.finish();
}
Expand Down
31 changes: 11 additions & 20 deletions src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,19 @@ private Iterable<GoogleAdsServiceClient.SearchPage> search(Map<String, String> p
return response.iteratePages();
}

public void search(Consumer<Iterable<GoogleAdsServiceClient.SearchPage>> consumer, Map<String, String> params) {
Iterable<GoogleAdsServiceClient.SearchPage> pages = search(params);
consumer.accept(pages);
public void search(Consumer<GoogleAdsServiceClient.SearchPage> consumer, Map<String, String> params) {
GoogleAdsServiceClient.SearchPage lastPage = null;
for(GoogleAdsServiceClient.SearchPage page: search(params)) {
consumer.accept(page);
lastPage = page;
}

if (task.getResourceType().equals("change_event")) {
// reset iterator
pages.iterator();
GoogleAdsRow lastRow = fetchLastRow(pages);
if (lastPage == null) return ;
GoogleAdsRow lastRow = null;
for (GoogleAdsRow row: lastPage.getValues()) {
lastRow = row;
}
if (lastRow == null) return ;

Map<String, String> nextParams = new HashMap<>();
Expand All @@ -82,20 +87,6 @@ public void search(Consumer<Iterable<GoogleAdsServiceClient.SearchPage>> consume
}
}

private GoogleAdsRow fetchLastRow(Iterable<GoogleAdsServiceClient.SearchPage> pages) {
GoogleAdsServiceClient.SearchPage lastPage = null;
for (GoogleAdsServiceClient.SearchPage searchPage : pages) {
lastPage = searchPage;
}
if (lastPage == null) return null;

GoogleAdsRow lastRow = null;
for (GoogleAdsRow row: lastPage.getValues()) {
lastRow = row;
}
return lastRow;
}

public void flattenResource(String resourceName, Map<Descriptors.FieldDescriptor, Object> fields, Map<String, String> result)
{
for (Descriptors.FieldDescriptor key : fields.keySet()) {
Expand Down

0 comments on commit 003f153

Please sign in to comment.