Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

decrease memory utilization #50

Merged
merged 3 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions src/main/java/org/embulk/input/google_ads/GoogleAdsInputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,26 @@ public TaskReport run(TaskSource taskSource,
PageOutput output)
{
PluginTask task = taskSource.loadTask(PluginTask.class);
Map<String, String> result;

GoogleAdsReporter reporter = new GoogleAdsReporter(task);
reporter.connect();
try {
try (PageBuilder pageBuilder = getPageBuilder(schema, output)) {
for (GoogleAdsServiceClient.SearchPage page : reporter.getReportPage()) {
for (GoogleAdsRow row : page.getValues()) {
result = new HashMap<String, String>()
{
};
reporter.flattenResource(null, row.getAllFields(), result);
schema.visitColumns(new GoogleAdsColumnVisitor(new GoogleAdsAccessor(task, result), pageBuilder, task));
pageBuilder.addRecord();
}
pageBuilder.flush();
}
Map<String, String> params = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE:
Since params is empty, the initial value of start_datetime is null.

reporter.search(
searchPages -> {
searchPages.forEach(page -> {
for (GoogleAdsRow row : page.getValues()) {
Map<String, String> result = new HashMap<>();
reporter.flattenResource(null, row.getAllFields(), result);
schema.visitColumns(new GoogleAdsColumnVisitor(new GoogleAdsAccessor(task, result), pageBuilder, task));
pageBuilder.addRecord();
}
pageBuilder.flush();
}
);
},
params
);
pageBuilder.finish();
}
} catch (Exception e) {
Expand Down
76 changes: 42 additions & 34 deletions src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -38,6 +40,8 @@ public class GoogleAdsReporter
private GoogleAdsClient client;
private ObjectMapper mapper = new ObjectMapper();

private Iterable<GoogleAdsServiceClient.SearchPage> searchResult;

public GoogleAdsReporter(PluginTask task)
{
this.task = task;
Expand All @@ -53,40 +57,44 @@ private UserCredentials buildCredential(PluginTask task)
.build();
}

public Iterable<GoogleAdsServiceClient.SearchPage> getReportPage()
{
List<GoogleAdsServiceClient.SearchPage> pages = new ArrayList<GoogleAdsServiceClient.SearchPage>();

String startDateTime = null;
do {
String query = buildQuery(task, startDateTime);
logger.info(query);
SearchGoogleAdsRequest request = buildRequest(task, query);
GoogleAdsServiceClient googleAdsService = client.getVersion14().createGoogleAdsServiceClient();
GoogleAdsServiceClient.SearchPagedResponse response = googleAdsService.search(request);

if (response.getPage().getResponse().getResultsCount() == 0) {
return pages;
}
private Iterable<GoogleAdsServiceClient.SearchPage> search(Map<String, String> params) {
String query = buildQuery(task, params);
logger.info(query);
SearchGoogleAdsRequest request = buildRequest(task, query);
GoogleAdsServiceClient googleAdsService = client.getVersion14().createGoogleAdsServiceClient();
GoogleAdsServiceClient.SearchPagedResponse response = googleAdsService.search(request);
return response.iteratePages();
}

response.iteratePages().iterator().forEachRemaining(pages::add);
public void search(Consumer<Iterable<GoogleAdsServiceClient.SearchPage>> consumer, Map<String, String> params) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nits: Although Iterator is called twice to get the last row, it is called once when the search method params change to per page.

public void search(Consumer<GoogleAdsServiceClient.SearchPage> consumer, Map<String, String> params) {
     Iterable<GoogleAdsServiceClient.SearchPage> pages = search(params);
    for (GoogleAdsServiceClient.SearchPage page : pages) {
        consumer.accept(page);
        lastPage = page;
    }

    if (!task.getResourceType().equals("change_event") || lastPage == null) {
        return;
     }

     ...

Iterable<GoogleAdsServiceClient.SearchPage> pages = search(params);
consumer.accept(pages);

if (task.getResourceType().equals("change_event")) {
GoogleAdsServiceClient.SearchPage lastPage = pages.get(pages.size() - 1);
GoogleAdsRow lastRow = null;
for(GoogleAdsRow row : lastPage.getValues()) {
lastRow = row;
}
if (task.getResourceType().equals("change_event")) {
// reset iterator
pages.iterator();
GoogleAdsRow lastRow = fetchLastRow(pages);
if (lastRow == null) return ;

if (lastRow == null) {
break;
} else {
startDateTime = lastRow.getChangeEvent().getChangeDateTime();
}
}
} while (startDateTime != null && !startDateTime.isEmpty());
lastRow.getChangeEvent().getChangeDateTime();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code seems unused.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed: de40057

Map<String, String> nextParams = new HashMap<>();
nextParams.put("start_datetime", lastRow.getChangeEvent().getChangeDateTime());
search(consumer, nextParams);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a risk that pages may not be released.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed: 003f153

I fixed to pass single SearchPage to consumer, and pages is removed.

}
}

return pages;
private GoogleAdsRow fetchLastRow(Iterable<GoogleAdsServiceClient.SearchPage> pages) {
GoogleAdsServiceClient.SearchPage lastPage = null;
for (GoogleAdsServiceClient.SearchPage searchPage : pages) {
lastPage = searchPage;
}
if (lastPage == null) return null;

GoogleAdsRow lastRow = null;
for (GoogleAdsRow row: lastPage.getValues()) {
lastRow = row;
}
return lastRow;
}

public void flattenResource(String resourceName, Map<Descriptors.FieldDescriptor, Object> fields, Map<String, String> result)
Expand Down Expand Up @@ -230,7 +238,7 @@ public SearchGoogleAdsRequest buildRequest(PluginTask task, String query)
.build();
}

public String buildQuery(PluginTask task, String startDateTime)
public String buildQuery(PluginTask task, Map<String, String> params)
{
StringBuilder sb = new StringBuilder();

Expand All @@ -240,7 +248,7 @@ public String buildQuery(PluginTask task, String startDateTime)
sb.append(" FROM ");
sb.append(task.getResourceType());

List<String> whereClause = buildWhereClauseConditions(task, startDateTime);
List<String> whereClause = buildWhereClauseConditions(task, params);
if (!whereClause.isEmpty()) {
sb.append(" WHERE ");
sb.append(String.join(" AND ", whereClause));
Expand All @@ -255,7 +263,7 @@ public String buildQuery(PluginTask task, String startDateTime)
}

@VisibleForTesting
public List<String> buildWhereClauseConditions(PluginTask task, String startDateTime)
public List<String> buildWhereClauseConditions(PluginTask task, Map<String, String> params)
{
List<String> whereConditions = new ArrayList<String>()
{
Expand All @@ -264,7 +272,7 @@ public List<String> buildWhereClauseConditions(PluginTask task, String startDate
if (task.getDateRange().isPresent()) {
StringBuilder dateSb = new StringBuilder();
if (task.getResourceType().equals("change_event")) {
dateSb.append(buildWhereClauseConditionsForChangeEvent(startDateTime));
dateSb.append(buildWhereClauseConditionsForChangeEvent(params.get("start_datetime")));
} else {
dateSb.append("segments.date BETWEEN '");
dateSb.append(task.getDateRange().get().getStartDate());
Expand Down