forked from apache/arrow
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
apacheGH-44361: [C#][Integration] Include .NET in Flight integration …
…tests (apache#44377) ### Rationale for this change See apache#44361. This allows testing compatibility of the .NET Flight implementation with other Flight implementations. ### What changes are included in this PR? * Adds a new `Apache.Arrow.Flight.IntegrationTest` project that can run in server or client mode for Flight integration tests. * Includes the integration tests that send then retrieve data defined in JSON files, but doesn't add any of the named scenarios * Configures archery to include C# in the Flight integration tests, but skip all the named scenarios * Also skips tests that use dictionary data due to apache#38045, and the empty data test due to apache#44363 ### Are these changes tested? These changes are tests. ### Are there any user-facing changes? No * GitHub Issue: apache#44361 Authored-by: Adam Reeve <adreeve@gmail.com> Signed-off-by: Curt Hagenlocher <curt@hagenlocher.org>
- Loading branch information
Showing
14 changed files
with
502 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
18 changes: 18 additions & 0 deletions
18
csharp/test/Apache.Arrow.Flight.IntegrationTest/Apache.Arrow.Flight.IntegrationTest.csproj
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
<?xml version="1.0" encoding="utf-8"?> | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<OutputType>Exe</OutputType> | ||
<TargetFramework>net8.0</TargetFramework> | ||
<RootNamespace>Apache.Arrow.Flight.IntegrationTest</RootNamespace> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" /> | ||
<PackageReference Include="System.Text.Json" Version="8.0.5" /> | ||
<ProjectReference Include="..\..\src\Apache.Arrow.Flight\Apache.Arrow.Flight.csproj" /> | ||
<ProjectReference Include="..\Apache.Arrow.Flight.TestWeb\Apache.Arrow.Flight.TestWeb.csproj" /> | ||
<ProjectReference Include="..\Apache.Arrow.IntegrationTest\Apache.Arrow.IntegrationTest.csproj" /> | ||
</ItemGroup> | ||
|
||
</Project> |
51 changes: 51 additions & 0 deletions
51
csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one or more | ||
// contributor license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright ownership. | ||
// The ASF licenses this file to You under the Apache License, Version 2.0 | ||
// (the "License"); you may not use this file except in compliance with | ||
// the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
using System; | ||
using System.IO; | ||
using System.Threading.Tasks; | ||
|
||
namespace Apache.Arrow.Flight.IntegrationTest; | ||
|
||
public class FlightClientCommand | ||
{ | ||
private readonly int _port; | ||
private readonly string _scenario; | ||
private readonly FileInfo _jsonFileInfo; | ||
|
||
public FlightClientCommand(int port, string scenario, FileInfo jsonFileInfo) | ||
{ | ||
_port = port; | ||
_scenario = scenario; | ||
_jsonFileInfo = jsonFileInfo; | ||
} | ||
|
||
public async Task Execute() | ||
{ | ||
if (!string.IsNullOrEmpty(_scenario)) | ||
{ | ||
// No named scenarios are currently implemented | ||
throw new Exception($"Scenario '{_scenario}' is not supported."); | ||
} | ||
|
||
if (!(_jsonFileInfo?.Exists ?? false)) | ||
{ | ||
throw new Exception($"Invalid JSON file path '{_jsonFileInfo?.FullName}'"); | ||
} | ||
|
||
var scenario = new JsonTestScenario(_port, _jsonFileInfo); | ||
await scenario.RunClient().ConfigureAwait(false); | ||
} | ||
} |
68 changes: 68 additions & 0 deletions
68
csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one or more | ||
// contributor license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright ownership. | ||
// The ASF licenses this file to You under the Apache License, Version 2.0 | ||
// (the "License"); you may not use this file except in compliance with | ||
// the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
using System; | ||
using System.Net; | ||
using System.Threading.Tasks; | ||
using Apache.Arrow.Flight.TestWeb; | ||
using Microsoft.AspNetCore.Hosting; | ||
using Microsoft.AspNetCore.Hosting.Server; | ||
using Microsoft.AspNetCore.Hosting.Server.Features; | ||
using Microsoft.AspNetCore.Server.Kestrel.Core; | ||
using Microsoft.Extensions.DependencyInjection; | ||
using Microsoft.Extensions.Hosting; | ||
|
||
namespace Apache.Arrow.Flight.IntegrationTest; | ||
|
||
public class FlightServerCommand | ||
{ | ||
private readonly string _scenario; | ||
|
||
public FlightServerCommand(string scenario) | ||
{ | ||
_scenario = scenario; | ||
} | ||
|
||
public async Task Execute() | ||
{ | ||
if (!string.IsNullOrEmpty(_scenario)) | ||
{ | ||
// No named scenarios are currently implemented | ||
throw new Exception($"Scenario '{_scenario}' is not supported."); | ||
} | ||
|
||
var host = Host.CreateDefaultBuilder() | ||
.ConfigureWebHostDefaults(webBuilder => | ||
{ | ||
webBuilder | ||
.ConfigureKestrel(options => | ||
{ | ||
options.Listen(IPEndPoint.Parse("127.0.0.1:0"), l => l.Protocols = HttpProtocols.Http2); | ||
}) | ||
.UseStartup<Startup>(); | ||
}) | ||
.Build(); | ||
|
||
await host.StartAsync().ConfigureAwait(false); | ||
|
||
var addresses = host.Services.GetService<IServer>().Features.Get<IServerAddressesFeature>().Addresses; | ||
foreach (var address in addresses) | ||
{ | ||
Console.WriteLine($"Server listening on {address}"); | ||
} | ||
|
||
await host.WaitForShutdownAsync().ConfigureAwait(false); | ||
} | ||
} |
34 changes: 34 additions & 0 deletions
34
csharp/test/Apache.Arrow.Flight.IntegrationTest/GrpcResolver.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one or more | ||
// contributor license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright ownership. | ||
// The ASF licenses this file to You under the Apache License, Version 2.0 | ||
// (the "License"); you may not use this file except in compliance with | ||
// the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
using Grpc.Net.Client.Balancer; | ||
|
||
namespace Apache.Arrow.Flight.IntegrationTest; | ||
|
||
/// <summary> | ||
/// The Grpc.Net.Client library doesn't know how to handle the "grpc+tcp" scheme used by Arrow Flight. | ||
/// This ResolverFactory passes these through to the standard Static Resolver used for the http scheme. | ||
/// </summary> | ||
public class GrpcTcpResolverFactory : ResolverFactory | ||
{ | ||
public override string Name => "grpc+tcp"; | ||
|
||
public override Resolver Create(ResolverOptions options) | ||
{ | ||
return new StaticResolverFactory( | ||
uri => new[] { new BalancerAddress(options.Address.Host, options.Address.Port) }) | ||
.Create(options); | ||
} | ||
} |
167 changes: 167 additions & 0 deletions
167
csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one or more | ||
// contributor license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright ownership. | ||
// The ASF licenses this file to You under the Apache License, Version 2.0 | ||
// (the "License"); you may not use this file except in compliance with | ||
// the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
using System; | ||
using System.IO; | ||
using System.Linq; | ||
using System.Threading.Tasks; | ||
using Apache.Arrow.Flight.Client; | ||
using Apache.Arrow.IntegrationTest; | ||
using Apache.Arrow.Tests; | ||
using Apache.Arrow.Types; | ||
using Google.Protobuf; | ||
using Grpc.Net.Client; | ||
using Grpc.Core; | ||
using Grpc.Net.Client.Balancer; | ||
using Microsoft.Extensions.DependencyInjection; | ||
|
||
namespace Apache.Arrow.Flight.IntegrationTest; | ||
|
||
/// <summary> | ||
/// A test scenario defined using a JSON data file | ||
/// </summary> | ||
internal class JsonTestScenario | ||
{ | ||
private readonly int _serverPort; | ||
private readonly FileInfo _jsonFile; | ||
private readonly ServiceProvider _serviceProvider; | ||
|
||
public JsonTestScenario(int serverPort, FileInfo jsonFile) | ||
{ | ||
_serverPort = serverPort; | ||
_jsonFile = jsonFile; | ||
|
||
var services = new ServiceCollection(); | ||
services.AddSingleton<ResolverFactory>(new GrpcTcpResolverFactory()); | ||
_serviceProvider = services.BuildServiceProvider(); | ||
} | ||
|
||
public async Task RunClient() | ||
{ | ||
var address = $"grpc+tcp://localhost:{_serverPort}"; | ||
using var channel = GrpcChannel.ForAddress( | ||
address, | ||
new GrpcChannelOptions | ||
{ | ||
ServiceProvider = _serviceProvider, | ||
Credentials = ChannelCredentials.Insecure | ||
}); | ||
var client = new FlightClient(channel); | ||
|
||
var descriptor = FlightDescriptor.CreatePathDescriptor(_jsonFile.FullName); | ||
|
||
var jsonFile = await JsonFile.ParseAsync(_jsonFile).ConfigureAwait(false); | ||
var schema = jsonFile.GetSchemaAndDictionaries(out Func<DictionaryType, IArrowArray> dictionaries); | ||
var batches = jsonFile.Batches.Select(batch => batch.ToArrow(schema, dictionaries)).ToArray(); | ||
|
||
// 1. Put the data to the server. | ||
await UploadBatches(client, descriptor, batches).ConfigureAwait(false); | ||
|
||
// 2. Get the ticket for the data. | ||
var info = await client.GetInfo(descriptor).ConfigureAwait(false); | ||
if (info.Endpoints.Count == 0) | ||
{ | ||
throw new Exception("No endpoints received"); | ||
} | ||
|
||
// 3. Stream data from the server, comparing individual batches. | ||
foreach (var endpoint in info.Endpoints) | ||
{ | ||
var locations = endpoint.Locations.ToArray(); | ||
if (locations.Length == 0) | ||
{ | ||
// Can read with existing client | ||
await ConsumeFlightLocation(client, endpoint.Ticket, batches).ConfigureAwait(false); | ||
} | ||
else | ||
{ | ||
foreach (var location in locations) | ||
{ | ||
using var readChannel = GrpcChannel.ForAddress( | ||
location.Uri, | ||
new GrpcChannelOptions | ||
{ | ||
ServiceProvider = _serviceProvider, | ||
Credentials = ChannelCredentials.Insecure | ||
}); | ||
var readClient = new FlightClient(readChannel); | ||
await ConsumeFlightLocation(readClient, endpoint.Ticket, batches).ConfigureAwait(false); | ||
} | ||
} | ||
} | ||
} | ||
|
||
private static async Task UploadBatches(FlightClient client, FlightDescriptor descriptor, RecordBatch[] batches) | ||
{ | ||
using var putCall = client.StartPut(descriptor); | ||
using var writer = putCall.RequestStream; | ||
|
||
try | ||
{ | ||
var counter = 0; | ||
foreach (var batch in batches) | ||
{ | ||
var metadata = $"{counter}"; | ||
|
||
await writer.WriteAsync(batch, ByteString.CopyFromUtf8(metadata)).ConfigureAwait(false); | ||
|
||
// Verify server has acknowledged the write request | ||
await putCall.ResponseStream.MoveNext().ConfigureAwait(false); | ||
var responseString = putCall.ResponseStream.Current.ApplicationMetadata.ToStringUtf8(); | ||
|
||
if (responseString != metadata) | ||
{ | ||
throw new Exception($"Response metadata '{responseString}' does not match expected metadata '{metadata}'"); | ||
} | ||
|
||
counter++; | ||
} | ||
} | ||
finally | ||
{ | ||
await writer.CompleteAsync().ConfigureAwait(false); | ||
} | ||
|
||
// Drain the response stream to ensure the server has stored the data | ||
var hasMore = await putCall.ResponseStream.MoveNext().ConfigureAwait(false); | ||
if (hasMore) | ||
{ | ||
throw new Exception("Expected to have reached the end of the response stream"); | ||
} | ||
} | ||
|
||
private static async Task ConsumeFlightLocation(FlightClient client, FlightTicket ticket, RecordBatch[] batches) | ||
{ | ||
using var readStream = client.GetStream(ticket); | ||
var counter = 0; | ||
foreach (var originalBatch in batches) | ||
{ | ||
if (!await readStream.ResponseStream.MoveNext().ConfigureAwait(false)) | ||
{ | ||
throw new Exception($"Expected {batches.Length} batches but received {counter}"); | ||
} | ||
|
||
var batch = readStream.ResponseStream.Current; | ||
ArrowReaderVerifier.CompareBatches(originalBatch, batch, strictCompare: false); | ||
|
||
counter++; | ||
} | ||
|
||
if (await readStream.ResponseStream.MoveNext().ConfigureAwait(false)) | ||
{ | ||
throw new Exception($"Expected to reach the end of the response stream after {batches.Length} batches"); | ||
} | ||
} | ||
} |
Oops, something went wrong.