Skip to content

Commit e91b53b

Browse files
committed
tmp
1 parent 1e37a05 commit e91b53b

File tree

2 files changed

+71
-48
lines changed

2 files changed

+71
-48
lines changed

src/main/java/org/embulk/input/google_ads/GoogleAdsInputPlugin.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,37 @@ public TaskReport run(TaskSource taskSource,
6969
reporter.connect();
7070
try {
7171
try (PageBuilder pageBuilder = getPageBuilder(schema, output)) {
72-
for (GoogleAdsServiceClient.SearchPage page : reporter.getReportPage()) {
73-
for (GoogleAdsRow row : page.getValues()) {
74-
result = new HashMap<String, String>()
75-
{
76-
};
77-
reporter.flattenResource(null, row.getAllFields(), result);
78-
schema.visitColumns(new GoogleAdsColumnVisitor(new GoogleAdsAccessor(task, result), pageBuilder, task));
79-
pageBuilder.addRecord();
80-
}
72+
Map<String, String> params = new HashMap<>();
73+
reporter.search(params);
74+
while(reporter.fetchResult(
75+
pages -> {
76+
pages.forEach(page -> {
77+
for (GoogleAdsRow row : page.getValues()) {
78+
Map<String, String> ret = new HashMap<>();
79+
reporter.flattenResource(null, row.getAllFields(), ret);
80+
schema.visitColumns(new GoogleAdsColumnVisitor(new GoogleAdsAccessor(task, ret), pageBuilder, task));
81+
pageBuilder.addRecord();
82+
}
83+
});
84+
return null;
85+
}
86+
)){
8187
pageBuilder.flush();
82-
}
88+
};
8389
pageBuilder.finish();
90+
91+
// for (GoogleAdsServiceClient.SearchPage page : reporter.getReportPage(params)) {
92+
// for (GoogleAdsRow row : page.getValues()) {
93+
// result = new HashMap<String, String>()
94+
// {
95+
// };
96+
// reporter.flattenResource(null, row.getAllFields(), result);
97+
// schema.visitColumns(new GoogleAdsColumnVisitor(new GoogleAdsAccessor(task, result), pageBuilder, task));
98+
// pageBuilder.addRecord();
99+
// }
100+
// pageBuilder.flush();
101+
// }
102+
// pageBuilder.finish();
84103
}
85104
} catch (Exception e) {
86105
logger.error(e.getMessage());

src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java

Lines changed: 42 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,8 @@
2222
import org.slf4j.Logger;
2323
import org.slf4j.LoggerFactory;
2424

25-
import java.util.ArrayList;
26-
import java.util.Iterator;
27-
import java.util.List;
28-
import java.util.Map;
25+
import java.util.*;
26+
import java.util.function.Function;
2927
import java.util.stream.Collectors;
3028
import java.util.stream.Stream;
3129

@@ -38,6 +36,8 @@ public class GoogleAdsReporter
3836
private GoogleAdsClient client;
3937
private ObjectMapper mapper = new ObjectMapper();
4038

39+
private Iterable<GoogleAdsServiceClient.SearchPage> searchResult;
40+
4141
public GoogleAdsReporter(PluginTask task)
4242
{
4343
this.task = task;
@@ -53,40 +53,44 @@ private UserCredentials buildCredential(PluginTask task)
5353
.build();
5454
}
5555

56-
public Iterable<GoogleAdsServiceClient.SearchPage> getReportPage()
57-
{
58-
List<GoogleAdsServiceClient.SearchPage> pages = new ArrayList<GoogleAdsServiceClient.SearchPage>();
59-
60-
String startDateTime = null;
61-
do {
62-
String query = buildQuery(task, startDateTime);
63-
logger.info(query);
64-
SearchGoogleAdsRequest request = buildRequest(task, query);
65-
GoogleAdsServiceClient googleAdsService = client.getVersion14().createGoogleAdsServiceClient();
66-
GoogleAdsServiceClient.SearchPagedResponse response = googleAdsService.search(request);
67-
68-
if (response.getPage().getResponse().getResultsCount() == 0) {
69-
return pages;
70-
}
56+
public void search(Map<String, String> params) {
57+
String query = buildQuery(task, params);
58+
logger.info(query);
59+
SearchGoogleAdsRequest request = buildRequest(task, query);
60+
GoogleAdsServiceClient googleAdsService = client.getVersion14().createGoogleAdsServiceClient();
61+
GoogleAdsServiceClient.SearchPagedResponse response = googleAdsService.search(request);
62+
this.searchResult = response.iteratePages();
63+
}
7164

72-
response.iteratePages().iterator().forEachRemaining(pages::add);
65+
public boolean fetchResult(Function<Iterable<GoogleAdsServiceClient.SearchPage>, Void> func) {
66+
logger.info("fetchResult");
67+
func.apply(this.searchResult);
68+
Iterator< GoogleAdsServiceClient.SearchPage> pageItr = this.searchResult.iterator();
7369

74-
if (task.getResourceType().equals("change_event")) {
75-
GoogleAdsServiceClient.SearchPage lastPage = pages.get(pages.size() - 1);
76-
GoogleAdsRow lastRow = null;
77-
for(GoogleAdsRow row : lastPage.getValues()) {
78-
lastRow = row;
79-
}
70+
if (!task.getResourceType().equals("change_event")) return false;
8071

81-
if (lastRow == null) {
82-
break;
83-
} else {
84-
startDateTime = lastRow.getChangeEvent().getChangeDateTime();
85-
}
86-
}
87-
} while (startDateTime != null && !startDateTime.isEmpty());
72+
// NOTE: reset iterator
73+
this.searchResult.iterator();
8874

89-
return pages;
75+
GoogleAdsServiceClient.SearchPage lastPage = null;
76+
while(pageItr.hasNext()) {
77+
lastPage = pageItr.next();
78+
}
79+
if (lastPage == null) return false;
80+
81+
Iterator<GoogleAdsRow> rowItr = lastPage.getValues().iterator();
82+
GoogleAdsRow lastRow = null;
83+
while(rowItr.hasNext()) {
84+
lastRow = rowItr.next();
85+
}
86+
if (lastRow == null) return false;
87+
lastRow.getChangeEvent().getChangeDateTime();
88+
89+
this.searchResult = null;
90+
Map<String, String> params = new HashMap<>();
91+
params.put("start_datetime", lastRow.getChangeEvent().getChangeDateTime());
92+
search(params);
93+
return true;
9094
}
9195

9296
public void flattenResource(String resourceName, Map<Descriptors.FieldDescriptor, Object> fields, Map<String, String> result)
@@ -230,7 +234,7 @@ public SearchGoogleAdsRequest buildRequest(PluginTask task, String query)
230234
.build();
231235
}
232236

233-
public String buildQuery(PluginTask task, String startDateTime)
237+
public String buildQuery(PluginTask task, Map<String, String> params)
234238
{
235239
StringBuilder sb = new StringBuilder();
236240

@@ -240,7 +244,7 @@ public String buildQuery(PluginTask task, String startDateTime)
240244
sb.append(" FROM ");
241245
sb.append(task.getResourceType());
242246

243-
List<String> whereClause = buildWhereClauseConditions(task, startDateTime);
247+
List<String> whereClause = buildWhereClauseConditions(task, params);
244248
if (!whereClause.isEmpty()) {
245249
sb.append(" WHERE ");
246250
sb.append(String.join(" AND ", whereClause));
@@ -255,7 +259,7 @@ public String buildQuery(PluginTask task, String startDateTime)
255259
}
256260

257261
@VisibleForTesting
258-
public List<String> buildWhereClauseConditions(PluginTask task, String startDateTime)
262+
public List<String> buildWhereClauseConditions(PluginTask task, Map<String, String> params)
259263
{
260264
List<String> whereConditions = new ArrayList<String>()
261265
{
@@ -264,7 +268,7 @@ public List<String> buildWhereClauseConditions(PluginTask task, String startDate
264268
if (task.getDateRange().isPresent()) {
265269
StringBuilder dateSb = new StringBuilder();
266270
if (task.getResourceType().equals("change_event")) {
267-
dateSb.append(buildWhereClauseConditionsForChangeEvent(startDateTime));
271+
dateSb.append(buildWhereClauseConditionsForChangeEvent(params.get("start_datetime")));
268272
} else {
269273
dateSb.append("segments.date BETWEEN '");
270274
dateSb.append(task.getDateRange().get().getStartDate());

0 commit comments

Comments
 (0)