Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync main #123

Merged
merged 18 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions contributor-docs/code-contributions.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ After authenticated, install the plugin into your local repository:
mvn clean install -pl plugins/templates-maven-plugin -am
```

WARNING: After any changes to the plugin itself, those changes may be cached
and prevent any future changes from being observed. Please reissue:

```shell
mvn clean install -pl plugins/templates-maven-plugin -am
```

### Staging (Deploying) Templates

To stage a Template, it is necessary to upload the images to Artifact
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.bigquery.TableId;
import com.google.cloud.teleport.metadata.DirectRunnerTest;
import com.google.cloud.teleport.metadata.MultiTemplateIntegrationTest;
import com.google.cloud.teleport.metadata.SkipRunnerV2Test;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.Template.TemplateType;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
Expand Down Expand Up @@ -134,6 +135,7 @@ protected void starting(Description description) {
protected GcsResourceManager artifactClient;

private boolean usingDirectRunner;
private boolean skipRunnerV2;
protected PipelineLauncher pipelineLauncher;
protected boolean skipBaseCleanup;

Expand All @@ -153,6 +155,7 @@ public void setUpBase() throws ExecutionException {
if (category != null) {
usingDirectRunner =
Arrays.asList(category.value()).contains(DirectRunnerTest.class) || usingDirectRunner;
skipRunnerV2 = Arrays.asList(category.value()).contains(SkipRunnerV2Test.class);
}
} catch (NoSuchMethodException e) {
// ignore error
Expand Down Expand Up @@ -492,9 +495,10 @@ protected LaunchInfo launchTemplate(
// Property allows testing with Runner v2 / Unified Worker
String unifiedWorkerHarnessContainerImage =
System.getProperty("unifiedWorkerHarnessContainerImage");
if (System.getProperty("unifiedWorker") != null || unifiedWorkerHarnessContainerImage != null) {
if (!skipRunnerV2
&& (System.getProperty("unifiedWorker") != null
|| unifiedWorkerHarnessContainerImage != null)) {
appendExperiment(options, "use_runner_v2");

if (System.getProperty("sdkContainerImage") != null) {
options.addParameter("sdkContainerImage", System.getProperty("sdkContainerImage"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
Expand All @@ -55,6 +56,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.it.common.ResourceManager;
import org.apache.beam.it.common.utils.ExceptionUtils;
Expand Down Expand Up @@ -370,6 +372,64 @@ public synchronized void write(Iterable<Mutation> tableRecords) throws IllegalSt
}
}

/**
* Writes a collection of mutations into one or more tables inside a ReadWriteTransaction. This
* method requires {@link SpannerResourceManager#executeDdlStatement(String)} to be called
* beforehand.
*
* @param mutations A collection of mutation objects.
*/
public void writeInTransaction(Iterable<Mutation> mutations) {
checkIsUsable();
checkHasInstanceAndDatabase();

LOG.info("Sending {} mutations to {}.{}", Iterables.size(mutations), instanceId, databaseId);
DatabaseClient databaseClient =
spanner.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId));
databaseClient
.readWriteTransaction()
.run(
(TransactionCallable<Void>)
transaction -> {
transaction.buffer(mutations);
return null;
});
LOG.info("Successfully sent mutations to {}.{}", instanceId, databaseId);
}

/**
* Executes a list of DML statements. This method requires {@link
* SpannerResourceManager#executeDdlStatement(String)} to be called beforehand.
*
* @param statements The DML statements.
* @throws IllegalStateException if method is called after resources have been cleaned up.
*/
public synchronized void executeDMLStatements(List<String> statements)
throws IllegalStateException {
checkIsUsable();
checkHasInstanceAndDatabase();

LOG.info("Executing DML statements on database {}.", statements, databaseId);
List<Statement> statementsList =
statements.stream().map(s -> Statement.of(s)).collect(Collectors.toList());
try {
DatabaseClient databaseClient =
spanner.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId));
databaseClient
.readWriteTransaction()
.run(
(TransactionCallable<Void>)
transaction -> {
transaction.batchUpdate(statementsList);
return null;
});
LOG.debug(
"Successfully executed DML statements '{}' on database {}.", statements, databaseId);
} catch (Exception e) {
throw new SpannerResourceManagerException("Failed to execute statement.", e);
}
}

/**
* Runs the specified query.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Struct;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
Expand Down Expand Up @@ -104,6 +105,73 @@ public void testResourceManagerE2E() {
Map.of("RowId", 2, "FirstName", "Jane", "LastName", "Doe", "Company", "Alphabet")));
}

@Test
public void testResourceManagerWriteInTransactionAndExecuteDML() {
// Arrange
spannerResourceManager.executeDdlStatement(
"CREATE TABLE "
+ TABLE_ID
+ " ("
+ "RowId INT64 NOT NULL,"
+ "FirstName STRING(1024),"
+ "LastName STRING(1024),"
+ "Company STRING(1024)"
+ ") PRIMARY KEY (RowId)");

List<Mutation> mutations =
List.of(
Mutation.newInsertBuilder(TABLE_ID)
.set("RowId")
.to(1)
.set("FirstName")
.to("John")
.set("LastName")
.to("Doe")
.set("Company")
.to("Google")
.build(),
Mutation.newInsertBuilder(TABLE_ID)
.set("RowId")
.to(2)
.set("FirstName")
.to("Jane")
.set("LastName")
.to("Doe")
.set("Company")
.to("Alphabet")
.build());

List<String> statements =
Arrays.asList(
"INSERT INTO "
+ TABLE_ID
+ " (RowId, FirstName, LastName, Company) values (3, 'Tester', 'Doe', 'Youtube')",
"INSERT INTO "
+ TABLE_ID
+ " (RowId, FirstName, LastName, Company) values (4, 'Jacob', 'Doe', 'DeepMind')");

// Act
spannerResourceManager.writeInTransaction(mutations);
spannerResourceManager.executeDMLStatements(statements);
long rowCount = spannerResourceManager.getRowCount(TABLE_ID);

List<Struct> fetchRecords =
spannerResourceManager.readTableRecords(
TABLE_ID, List.of("RowId", "FirstName", "LastName", "Company"));

// Assert
assertThat(rowCount).isEqualTo(4);
assertThat(fetchRecords).hasSize(4);
assertThatStructs(fetchRecords)
.hasRecordsUnorderedCaseInsensitiveColumns(
List.of(
Map.of("RowId", 1, "FirstName", "John", "LastName", "Doe", "Company", "Google"),
Map.of("RowId", 2, "FirstName", "Jane", "LastName", "Doe", "Company", "Alphabet"),
Map.of("RowId", 3, "FirstName", "Tester", "LastName", "Doe", "Company", "Youtube"),
Map.of(
"RowId", 4, "FirstName", "Jacob", "LastName", "Doe", "Company", "DeepMind")));
}

@After
public void tearDown() {
ResourceManagerUtils.cleanResources(spannerResourceManager);
Expand Down
Loading
Loading