diff --git a/Anthropic.sln b/Anthropic.sln index 15013d25..ac27bfc7 100644 --- a/Anthropic.sln +++ b/Anthropic.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 18 -VisualStudioVersion = 18.0.11205.157 d18.0 +VisualStudioVersion = 18.0.11205.157 MinimumVisualStudioVersion = 10.0.40219.1 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Anthropic", "src\Anthropic\Anthropic.csproj", "{5816A0C1-3BA1-454E-8D08-85B23DEF309D}" EndProject @@ -15,15 +15,23 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Anthropic.Bedrock", "src\An EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{93E58BA5-CEFE-447E-AC0C-F2C5BC4C411D}" ProjectSection(SolutionItems) = preProject - CHANGELOG.md - .csharpierignore - .editorconfig - .gitignore - LICENSE - README.md - SECURITY.md + CHANGELOG.md = CHANGELOG.md + .csharpierignore = .csharpierignore + .editorconfig = .editorconfig + .gitignore = .gitignore + LICENSE = LICENSE + README.md = README.md + SECURITY.md = SECURITY.md EndProjectSection EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Anthropic.Vertex", "src\Anthropic.Vertex\Anthropic.Vertex.csproj", "{A316C280-3880-4674-9B0C-2F6FE77B8B49}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "examples", "examples", "{B36A84DF-456D-A817-6EDD-3EC3E7F6E11F}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "StreamingAggregationBetaExample", "StreamingAggregationBetaExample", "{6D5DEC39-3673-615E-54FA-C74C69400686}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StreamingAggregationExample", "examples\StreamingAggregationBetaExample\StreamingAggregationExample.csproj", "{0E525CB1-B565-4ABE-B477-48C53F9BB258}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -82,6 +90,30 @@ Global {72FC9906-07F4-4911-8D6B-F9814974BB37}.Release|x64.Build.0 = Release|Any CPU {72FC9906-07F4-4911-8D6B-F9814974BB37}.Release|x86.ActiveCfg = Release|Any CPU {72FC9906-07F4-4911-8D6B-F9814974BB37}.Release|x86.Build.0 = Release|Any CPU + {A316C280-3880-4674-9B0C-2F6FE77B8B49}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A316C280-3880-4674-9B0C-2F6FE77B8B49}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A316C280-3880-4674-9B0C-2F6FE77B8B49}.Debug|x64.ActiveCfg = Debug|Any CPU + {A316C280-3880-4674-9B0C-2F6FE77B8B49}.Debug|x64.Build.0 = Debug|Any CPU + {A316C280-3880-4674-9B0C-2F6FE77B8B49}.Debug|x86.ActiveCfg = Debug|Any CPU + {A316C280-3880-4674-9B0C-2F6FE77B8B49}.Debug|x86.Build.0 = Debug|Any CPU + {A316C280-3880-4674-9B0C-2F6FE77B8B49}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A316C280-3880-4674-9B0C-2F6FE77B8B49}.Release|Any CPU.Build.0 = Release|Any CPU + {A316C280-3880-4674-9B0C-2F6FE77B8B49}.Release|x64.ActiveCfg = Release|Any CPU + {A316C280-3880-4674-9B0C-2F6FE77B8B49}.Release|x64.Build.0 = Release|Any CPU + {A316C280-3880-4674-9B0C-2F6FE77B8B49}.Release|x86.ActiveCfg = Release|Any CPU + {A316C280-3880-4674-9B0C-2F6FE77B8B49}.Release|x86.Build.0 = Release|Any CPU + {0E525CB1-B565-4ABE-B477-48C53F9BB258}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0E525CB1-B565-4ABE-B477-48C53F9BB258}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0E525CB1-B565-4ABE-B477-48C53F9BB258}.Debug|x64.ActiveCfg = Debug|Any CPU + {0E525CB1-B565-4ABE-B477-48C53F9BB258}.Debug|x64.Build.0 = Debug|Any CPU + {0E525CB1-B565-4ABE-B477-48C53F9BB258}.Debug|x86.ActiveCfg = Debug|Any CPU + {0E525CB1-B565-4ABE-B477-48C53F9BB258}.Debug|x86.Build.0 = Debug|Any CPU + {0E525CB1-B565-4ABE-B477-48C53F9BB258}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0E525CB1-B565-4ABE-B477-48C53F9BB258}.Release|Any CPU.Build.0 = Release|Any CPU + {0E525CB1-B565-4ABE-B477-48C53F9BB258}.Release|x64.ActiveCfg = Release|Any CPU + {0E525CB1-B565-4ABE-B477-48C53F9BB258}.Release|x64.Build.0 = Release|Any CPU + {0E525CB1-B565-4ABE-B477-48C53F9BB258}.Release|x86.ActiveCfg = Release|Any CPU + {0E525CB1-B565-4ABE-B477-48C53F9BB258}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -91,5 +123,8 @@ Global {0732C8A6-7313-4C33-AE2E-FFAA82EFB481} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B} {DD0E539D-6D5F-45EB-A807-01BE0A443604} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B} {72FC9906-07F4-4911-8D6B-F9814974BB37} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B} + {A316C280-3880-4674-9B0C-2F6FE77B8B49} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B} + {6D5DEC39-3673-615E-54FA-C74C69400686} = {B36A84DF-456D-A817-6EDD-3EC3E7F6E11F} + {0E525CB1-B565-4ABE-B477-48C53F9BB258} = {6D5DEC39-3673-615E-54FA-C74C69400686} EndGlobalSection EndGlobal diff --git a/README.md b/README.md index 86f89968..0e9c8f83 100644 --- a/README.md +++ b/README.md @@ -123,6 +123,13 @@ To send a request to the Anthropic API, build an instance of some `Params` class For example, `client.Messages.Create` should be called with an instance of `MessageCreateParams`, and it will return an instance of `Task`. +> [!IMPORTANT] +> We highly encourage you to use [streaming](#streaming) for longer running requests. + +We do not recommend setting a large `MaxTokens` value without using streaming. Some networks may drop idle connections after a certain period of time, which can cause the request to fail or [timeout](#timeouts) without receiving a response from Anthropic. We periodically ping the API to keep the connection alive and reduce the impact of these networks. + +The SDK throws an error if a non-streaming request is expected to take longer than 10 minutes. Using a [streaming method](#streaming) or [overriding the timeout](#timeouts) at the client or request level disables the error. + ## Streaming The SDK defines methods that return response "chunk" streams, where each chunk can be individually processed as soon as it arrives instead of waiting on the full response. Streaming methods generally correspond to [SSE](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) or [JSONL](https://jsonlines.org) responses. @@ -155,6 +162,43 @@ await foreach (var message in client.Messages.CreateStreaming(parameters)) } ``` +### Aggregators + +Both the [Messages](src/Anthropic/Models/Messages/Message.cs) and [BetaMessages](src/Anthropic/Models/Beta/Messages/BetaMessage.cs) streaming endpoints have built-in aggregators that can produce the same object as its non-streaming counterparts. + +It is possible to either only get the full result object via the `.Aggregate()` extension on the `IAsyncEnumerable` returned by the `CreateStreaming` method or insert an external aggregator into a LINQ tree: + +```csharp +IAsyncEnumerable responseUpdates = client.Messages.CreateStreaming( + parameters +); + +// This produces a single object based on the streaming output. +var message = await responseUpdates.Aggregate().ConfigureAwait(false); + +// You can also add an aggregator as part of your LINQ chain to get realtime streaming and aggregation + +var aggregator = new MessageContentAggregator(); +await foreach (RawMessageStreamEvent rawEvent in responseUpdates.CollectAsync(aggregator)) +{ + // Do something with the stream events + if (rawEvent.TryPickContentBlockDelta(out var delta)) + { + if (delta.Delta.TryPickThinking(out var thinkingDelta)) + { + Console.Write(thinkingDelta.Thinking); + } + else if (delta.Delta.TryPickText(out var textDelta)) + { + Console.Write(textDelta.Text); + } + } +} + +// And then get the full aggregated message. +var fullMessage = await aggregator.Message(); +``` + ## `IChatClient` The SDK provides an implementation of the `IChatClient` interface from the `Microsoft.Extensions.AI.Abstractions` library. diff --git a/examples/Anthropic.Examples.sln b/examples/Anthropic.Examples.sln index 933d318b..8be1c01b 100644 --- a/examples/Anthropic.Examples.sln +++ b/examples/Anthropic.Examples.sln @@ -13,10 +13,13 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MessagesStreamingExample", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ChatClientExample", "ChatClientExample\ChatClientExample.csproj", "{AA6ED2E6-A693-4FD8-AF20-1D339FBFBF96}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StreamingAggregationExample", "StreamingAggregationExample\StreamingAggregationExample.csproj", "{8E03AF57-D948-47B7-9038-D6F1468B6BCC}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MessagesExample.Foundry", "MessagesExample.Foundry\MessagesExample.Foundry.csproj", "{5AD47EFB-4DE0-4F91-A6CB-C1DE6EFD5BD2}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MessagesExample.Bedrock", "MessagesExample.Bedrock\MessagesExample.Bedrock.csproj", "{7AD57F3D-FF5A-472B-ABEF-FC381D5CCC0C}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MessagesExample.Vertex", "MessagesExample.Vertex\MessagesExample.Vertex.csproj", "{E2B0C44D-3B9C-4ACD-BA53-2E27F4111004}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -87,6 +90,18 @@ Global {AA6ED2E6-A693-4FD8-AF20-1D339FBFBF96}.Release|x64.Build.0 = Release|Any CPU {AA6ED2E6-A693-4FD8-AF20-1D339FBFBF96}.Release|x86.ActiveCfg = Release|Any CPU {AA6ED2E6-A693-4FD8-AF20-1D339FBFBF96}.Release|x86.Build.0 = Release|Any CPU + {8E03AF57-D948-47B7-9038-D6F1468B6BCC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8E03AF57-D948-47B7-9038-D6F1468B6BCC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8E03AF57-D948-47B7-9038-D6F1468B6BCC}.Debug|x64.ActiveCfg = Debug|Any CPU + {8E03AF57-D948-47B7-9038-D6F1468B6BCC}.Debug|x64.Build.0 = Debug|Any CPU + {8E03AF57-D948-47B7-9038-D6F1468B6BCC}.Debug|x86.ActiveCfg = Debug|Any CPU + {8E03AF57-D948-47B7-9038-D6F1468B6BCC}.Debug|x86.Build.0 = Debug|Any CPU + {8E03AF57-D948-47B7-9038-D6F1468B6BCC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8E03AF57-D948-47B7-9038-D6F1468B6BCC}.Release|Any CPU.Build.0 = Release|Any CPU + {8E03AF57-D948-47B7-9038-D6F1468B6BCC}.Release|x64.ActiveCfg = Release|Any CPU + {8E03AF57-D948-47B7-9038-D6F1468B6BCC}.Release|x64.Build.0 = Release|Any CPU + {8E03AF57-D948-47B7-9038-D6F1468B6BCC}.Release|x86.ActiveCfg = Release|Any CPU + {8E03AF57-D948-47B7-9038-D6F1468B6BCC}.Release|x86.Build.0 = Release|Any CPU {5AD47EFB-4DE0-4F91-A6CB-C1DE6EFD5BD2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {5AD47EFB-4DE0-4F91-A6CB-C1DE6EFD5BD2}.Debug|Any CPU.Build.0 = Debug|Any CPU {5AD47EFB-4DE0-4F91-A6CB-C1DE6EFD5BD2}.Debug|x64.ActiveCfg = Debug|Any CPU @@ -111,6 +126,18 @@ Global {7AD57F3D-FF5A-472B-ABEF-FC381D5CCC0C}.Release|x64.Build.0 = Release|Any CPU {7AD57F3D-FF5A-472B-ABEF-FC381D5CCC0C}.Release|x86.ActiveCfg = Release|Any CPU {7AD57F3D-FF5A-472B-ABEF-FC381D5CCC0C}.Release|x86.Build.0 = Release|Any CPU + {E2B0C44D-3B9C-4ACD-BA53-2E27F4111004}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E2B0C44D-3B9C-4ACD-BA53-2E27F4111004}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E2B0C44D-3B9C-4ACD-BA53-2E27F4111004}.Debug|x64.ActiveCfg = Debug|Any CPU + {E2B0C44D-3B9C-4ACD-BA53-2E27F4111004}.Debug|x64.Build.0 = Debug|Any CPU + {E2B0C44D-3B9C-4ACD-BA53-2E27F4111004}.Debug|x86.ActiveCfg = Debug|Any CPU + {E2B0C44D-3B9C-4ACD-BA53-2E27F4111004}.Debug|x86.Build.0 = Debug|Any CPU + {E2B0C44D-3B9C-4ACD-BA53-2E27F4111004}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E2B0C44D-3B9C-4ACD-BA53-2E27F4111004}.Release|Any CPU.Build.0 = Release|Any CPU + {E2B0C44D-3B9C-4ACD-BA53-2E27F4111004}.Release|x64.ActiveCfg = Release|Any CPU + {E2B0C44D-3B9C-4ACD-BA53-2E27F4111004}.Release|x64.Build.0 = Release|Any CPU + {E2B0C44D-3B9C-4ACD-BA53-2E27F4111004}.Release|x86.ActiveCfg = Release|Any CPU + {E2B0C44D-3B9C-4ACD-BA53-2E27F4111004}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/examples/MessagesExample.Bedrock/Program.cs b/examples/MessagesExample.Bedrock/Program.cs index 2908304b..527c0b20 100644 --- a/examples/MessagesExample.Bedrock/Program.cs +++ b/examples/MessagesExample.Bedrock/Program.cs @@ -32,8 +32,8 @@ await AnthropicBedrockCredentialsHelper.FromEnv().ConfigureAwait(false) var message = string.Join( "", response - .Content.Where(message => message.Value is TextBlock) - .Select(message => message.Value as TextBlock) + .Content.Select(message => message.Value) + .OfType() .Select((textBlock) => textBlock.Text) ); diff --git a/examples/MessagesExample.Vertex/MessagesExample.Vertex.csproj b/examples/MessagesExample.Vertex/MessagesExample.Vertex.csproj new file mode 100644 index 00000000..7aa8b946 --- /dev/null +++ b/examples/MessagesExample.Vertex/MessagesExample.Vertex.csproj @@ -0,0 +1,14 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + diff --git a/examples/MessagesExample.Vertex/Program.cs b/examples/MessagesExample.Vertex/Program.cs new file mode 100644 index 00000000..fc7435bf --- /dev/null +++ b/examples/MessagesExample.Vertex/Program.cs @@ -0,0 +1,51 @@ +using Anthropic.Models.Messages; +using Anthropic.Vertex; +using Google.Apis.Auth.OAuth2; + +// The google vertex client needs a Project ID, use the ID from the google cloud dashboard. +// The region parameter is optional. + +// By default the Vertex Credential provider tries to load system wide credentials generated via the "gcloud" tool. +// For application wide credentials we recommend using service accounts instead and providing your own GoogleCredentials. Example: +/* +var client = new AnthropicVertexClient(new AnthropicVertexCredentials(null, "YourProjectId", GoogleCredential.FromJson( +""" +{ + ServiceAccount JSON +} +""").CreateScoped("https://www.googleapis.com/auth/cloud-platform"))); +*/ + +// or you can load the credentials from your system after you set it with the necessary environment variables by calling +// var client = new AnthropicVertexClient(await DefaultAnthropicVertexCredentials.FromEnvAsync()); + +// The main variables you can set are below. There are more options available; consult the method's documentation for more info. +// +// ANTHROPIC_VERTEX_PROJECT_ID=your_project_id +// CLOUD_ML_REGION=region_name +// VERTEX_ACCESS_TOKEN=vertex_access_token +// + +var client = new AnthropicVertexClient(new AnthropicVertexCredentials(null, "YourProjectId")); + +MessageCreateParams parameters = new() +{ + MaxTokens = 2048, + Messages = + [ + new() { Content = "Tell me a story about building the best SDK!", Role = Role.User }, + ], + Model = "claude-3-7-sonnet@20250219", +}; + +var response = await client.Messages.Create(parameters); + +var message = string.Join( + "", + response + .Content.Select(message => message.Value) + .OfType() + .Select((textBlock) => textBlock.Text) +); + +Console.WriteLine(message); diff --git a/examples/StreamingAggregationBetaExample/Program.cs b/examples/StreamingAggregationBetaExample/Program.cs new file mode 100644 index 00000000..667e8209 --- /dev/null +++ b/examples/StreamingAggregationBetaExample/Program.cs @@ -0,0 +1,48 @@ +using Anthropic; +using Anthropic.Helpers; +using Anthropic.Models.Beta.Messages; +using Anthropic.Services.Beta.Messages; +using Messages = Anthropic.Models.Messages; + +// Configured using the ANTHROPIC_API_KEY, ANTHROPIC_AUTH_TOKEN and ANTHROPIC_BASE_URL environment variables +AnthropicClient client = new(); + +var responseUpdates = client.Beta.Messages.CreateStreaming( + new() + { + MaxTokens = 2048, + Messages = + [ + new() { Content = "Tell me a story about building the best SDK!", Role = Role.User }, + ], + Model = Messages::Model.Claude4Sonnet20250514, + } +); + +// some streaming endpoints have built-in aggregators that create logically aggregated objects. +// these represent the full stream as a single object. +var message = await responseUpdates.Aggregate().ConfigureAwait(false); +Console.WriteLine(message); + +// you can also add an aggregator as part of your LINQ chain to get real-time streaming and aggregation + +var aggregator = new BetaMessageContentAggregator(); +await foreach (BetaRawMessageStreamEvent rawEvent in responseUpdates.CollectAsync(aggregator)) +{ + // do something with the stream events + if (rawEvent.TryPickContentBlockDelta(out var delta)) + { + if (delta.Delta.TryPickThinking(out var thinkingDelta)) + { + Console.Write(thinkingDelta.Thinking); + } + else if (delta.Delta.TryPickText(out var textDelta)) + { + Console.Write(textDelta.Text); + } + } +} + +// and then get the full aggregated message +var message2 = aggregator.Message(); +Console.WriteLine(message2); diff --git a/examples/StreamingAggregationBetaExample/StreamingAggregationExample.csproj b/examples/StreamingAggregationBetaExample/StreamingAggregationExample.csproj new file mode 100644 index 00000000..37dbf337 --- /dev/null +++ b/examples/StreamingAggregationBetaExample/StreamingAggregationExample.csproj @@ -0,0 +1,11 @@ + + + + + + Exe + net8.0 + enable + enable + + diff --git a/examples/StreamingAggregationExample/Program.cs b/examples/StreamingAggregationExample/Program.cs new file mode 100644 index 00000000..03aa82e7 --- /dev/null +++ b/examples/StreamingAggregationExample/Program.cs @@ -0,0 +1,50 @@ +using Anthropic; +using Anthropic.Helpers; +using Anthropic.Models.Messages; +using Anthropic.Services.Messages; + +// Configured using the ANTHROPIC_API_KEY, ANTHROPIC_AUTH_TOKEN and ANTHROPIC_BASE_URL environment variables +AnthropicClient client = new(); + +MessageCreateParams parameters = new() +{ + MaxTokens = 2048, + Messages = + [ + new() { Content = "Tell me a story about building the best SDK!", Role = Role.User }, + ], + Model = Model.Claude4Sonnet20250514, + Thinking = new ThinkingConfigEnabled() { BudgetTokens = 1024 }, +}; + +IAsyncEnumerable responseUpdates = client.Messages.CreateStreaming( + parameters +); + +// some streaming endpoints have built-in aggregators that create logically aggregated objects. +// these represent the full stream as a single object. +var message = await responseUpdates.Aggregate().ConfigureAwait(false); +Console.WriteLine(message); + +// you can also add an aggregator as part of your LINQ chain to get real-time streaming and aggregation + +var aggregator = new MessageContentAggregator(); +await foreach (RawMessageStreamEvent rawEvent in responseUpdates.CollectAsync(aggregator)) +{ + // do something with the stream events + if (rawEvent.TryPickContentBlockDelta(out var delta)) + { + if (delta.Delta.TryPickThinking(out var thinkingDelta)) + { + Console.Write(thinkingDelta.Thinking); + } + else if (delta.Delta.TryPickText(out var textDelta)) + { + Console.Write(textDelta.Text); + } + } +} + +// and then get the full aggregated message +var message2 = aggregator.Message(); +Console.WriteLine(message2); diff --git a/examples/StreamingAggregationExample/StreamingAggregationExample.csproj b/examples/StreamingAggregationExample/StreamingAggregationExample.csproj new file mode 100644 index 00000000..37dbf337 --- /dev/null +++ b/examples/StreamingAggregationExample/StreamingAggregationExample.csproj @@ -0,0 +1,11 @@ + + + + + + Exe + net8.0 + enable + enable + + diff --git a/src/Anthropic.Bedrock/AnthropicBedrockClient.cs b/src/Anthropic.Bedrock/AnthropicBedrockClient.cs index 2dfbc200..77b42d51 100644 --- a/src/Anthropic.Bedrock/AnthropicBedrockClient.cs +++ b/src/Anthropic.Bedrock/AnthropicBedrockClient.cs @@ -124,7 +124,11 @@ [.. betaVersions.Select(v => JsonValue.Create(v))] bodyContent["anthropic_version"] = JsonValue.Create(AnthropicVersion); - var modelValue = bodyContent["model"]!; + var modelValue = + bodyContent["model"] + ?? throw new AnthropicInvalidDataException( + "Expected to find property model in request json but found none." + ); bodyContent.Root.AsObject().Remove("model"); var parsedStreamValue = ((bool?)bodyContent["stream"]?.AsValue()) ?? false; bodyContent.Root.AsObject().Remove("stream"); diff --git a/src/Anthropic.Tests/Anthropic.Tests.csproj b/src/Anthropic.Tests/Anthropic.Tests.csproj index b5c6a908..73f63fb5 100644 --- a/src/Anthropic.Tests/Anthropic.Tests.csproj +++ b/src/Anthropic.Tests/Anthropic.Tests.csproj @@ -14,6 +14,7 @@ + @@ -22,7 +23,7 @@ - + diff --git a/src/Anthropic.Tests/AnthropicClientBetaExtensionsTests.cs b/src/Anthropic.Tests/AnthropicClientBetaExtensionsTests.cs index 83671810..df71cabc 100644 --- a/src/Anthropic.Tests/AnthropicClientBetaExtensionsTests.cs +++ b/src/Anthropic.Tests/AnthropicClientBetaExtensionsTests.cs @@ -2493,4 +2493,172 @@ public async Task GetResponseAsync_MeaiUserAgentHeader_PresentAlongsideDefaultHe "Default AnthropicClient user-agent header should be present" ); } + + [Fact] + public async Task GetResponseAsync_WithNullableUnionType_TransformsToSimpleType() + { + // The C# MCP SDK generates schemas with "type": ["integer", "null"], "default": null + // for optional nullable parameters. This test verifies the transformation removes + // the null from the union type and the default: null for non-required properties. + VerbatimHttpHandler handler = new( + expectedRequest: """ + { + "max_tokens": 1024, + "model": "claude-haiku-4-5", + "messages": [{ + "role": "user", + "content": [{ + "type": "text", + "text": "Call tool with optional param" + }] + }], + "tools": [{ + "name": "tool_with_optional", + "description": "A tool with an optional nullable parameter", + "input_schema": { + "type": "object", + "properties": { + "required_param": { + "type": "string" + }, + "optional_number": { + "type": "integer" + } + }, + "required": ["required_param"] + } + }] + } + """, + actualResponse: """ + { + "id": "msg_nullable_01", + "type": "message", + "role": "assistant", + "model": "claude-haiku-4-5", + "content": [{ + "type": "text", + "text": "Tool ready" + }], + "stop_reason": "end_turn", + "usage": { + "input_tokens": 30, + "output_tokens": 5 + } + } + """ + ); + + IChatClient chatClient = CreateChatClient(handler, "claude-haiku-4-5"); + + // Create a function with an optional nullable parameter - the schema will have + // "type": ["integer", "null"], "default": null which should be transformed + var functionWithOptional = AIFunctionFactory.Create( + (string required_param, int? optional_number = null) => "result", + new AIFunctionFactoryOptions + { + Name = "tool_with_optional", + Description = "A tool with an optional nullable parameter", + } + ); + + ChatOptions options = new() { Tools = [functionWithOptional] }; + + ChatResponse response = await chatClient.GetResponseAsync( + "Call tool with optional param", + options, + TestContext.Current.CancellationToken + ); + Assert.NotNull(response); + } + + [Fact] + public async Task GetResponseAsync_WithRequiredNullableUnionType_PreservesUnionType() + { + // When a property IS in the required array but has a nullable union type, + // we should NOT transform it - let the API fail with a meaningful error + // rather than silently misrepresenting the schema. + // Using BetaTool.AsAITool() bypasses the schema transformation, so this test + // verifies the raw tool passes through unchanged. + VerbatimHttpHandler handler = new( + expectedRequest: """ + { + "max_tokens": 1024, + "model": "claude-haiku-4-5", + "messages": [{ + "role": "user", + "content": [{ + "type": "text", + "text": "Call tool with required nullable" + }] + }], + "tools": [{ + "name": "tool_with_required_nullable", + "description": "A tool with a required nullable parameter", + "input_schema": { + "nullable_number": { + "type": ["integer", "null"], + "description": "A required but nullable number" + }, + "type": "object", + "required": ["nullable_number"] + } + }] + } + """, + actualResponse: """ + { + "id": "msg_required_nullable_01", + "type": "message", + "role": "assistant", + "model": "claude-haiku-4-5", + "content": [{ + "type": "text", + "text": "Tool ready" + }], + "stop_reason": "end_turn", + "usage": { + "input_tokens": 30, + "output_tokens": 5 + } + } + """ + ); + + IChatClient chatClient = CreateChatClient(handler, "claude-haiku-4-5"); + + // Create a BetaTool with a required nullable parameter using raw schema + // This simulates a schema that came from elsewhere (not C# MCP SDK pattern) +#pragma warning disable CA1861 // Prefer 'static readonly' fields over constant array arguments - test method only runs once + BetaToolUnion rawTool = new BetaTool + { + Name = "tool_with_required_nullable", + Description = "A tool with a required nullable parameter", + InputSchema = new InputSchema( + new Dictionary + { + ["nullable_number"] = JsonSerializer.SerializeToElement( + new + { + type = new[] { "integer", "null" }, + description = "A required but nullable number", + } + ), + } + ) + { + Required = ["nullable_number"], + }, + }; +#pragma warning restore CA1861 + + ChatOptions options = new() { Tools = [rawTool.AsAITool()] }; + + ChatResponse response = await chatClient.GetResponseAsync( + "Call tool with required nullable", + options, + TestContext.Current.CancellationToken + ); + Assert.NotNull(response); + } } diff --git a/src/Anthropic.Tests/AnthropicClientExtensionsTestsBase.cs b/src/Anthropic.Tests/AnthropicClientExtensionsTestsBase.cs index b645e1f6..2b396840 100644 --- a/src/Anthropic.Tests/AnthropicClientExtensionsTestsBase.cs +++ b/src/Anthropic.Tests/AnthropicClientExtensionsTestsBase.cs @@ -4613,6 +4613,232 @@ public async Task GetResponseAsync_WithFunctionResultContent_UriContent_PDF() Assert.NotNull(response); } + [Fact] + public void WithCacheControl_SetsAdditionalProperty() + { + var content = new TextContent("Hello, world!"); + + content.WithCacheControl(Anthropic.Models.Messages.Ttl.Ttl5m); + + Assert.NotNull(content.AdditionalProperties); + var cacheControl = content.GetCacheControl(); + Assert.NotNull(cacheControl); + Assert.True(cacheControl.Ttl == Anthropic.Models.Messages.Ttl.Ttl5m); + } + + [Fact] + public void WithCacheControl_CacheControlEphemeral_SetsAdditionalProperty() + { + var content = new TextContent("Hello, world!"); + var cacheControl = new Anthropic.Models.Messages.CacheControlEphemeral + { + Ttl = Anthropic.Models.Messages.Ttl.Ttl1h, + }; + + content.WithCacheControl(cacheControl); + + var retrieved = content.GetCacheControl(); + Assert.NotNull(retrieved); + Assert.True(retrieved.Ttl == Anthropic.Models.Messages.Ttl.Ttl1h); + } + + [Fact] + public void WithCacheControl_Null_RemovesCacheControl() + { + var content = new TextContent("Hello, world!"); + content.WithCacheControl(Anthropic.Models.Messages.Ttl.Ttl5m); + + Assert.NotNull(content.GetCacheControl()); + + content.WithCacheControl((Anthropic.Models.Messages.CacheControlEphemeral?)null); + + Assert.Null(content.GetCacheControl()); + } + + [Fact] + public async Task GetResponseAsync_WithCacheControlOnSystemMessage() + { + VerbatimHttpHandler handler = new( + expectedRequest: """ + { + "model": "claude-haiku-4-5", + "messages": [{ + "role": "user", + "content": [{ + "type": "text", + "text": "Hello" + }] + }], + "max_tokens": 1024, + "system": [{ + "type": "text", + "text": "You are a helpful assistant.", + "cache_control": { + "type": "ephemeral", + "ttl": "1h" + } + }] + } + """, + actualResponse: """ + { + "id": "msg_cache_01", + "type": "message", + "role": "assistant", + "model": "claude-haiku-4-5", + "content": [{ + "type": "text", + "text": "Hello!" + }], + "stop_reason": "end_turn", + "usage": { + "input_tokens": 10, + "output_tokens": 5 + } + } + """ + ); + + IChatClient chatClient = CreateChatClient(handler, "claude-haiku-4-5"); + + var systemContent = new TextContent("You are a helpful assistant.").WithCacheControl( + Anthropic.Models.Messages.Ttl.Ttl1h + ); + + List messages = + [ + new(ChatRole.System, [systemContent]), + new(ChatRole.User, "Hello"), + ]; + + ChatResponse response = await chatClient.GetResponseAsync( + messages, + cancellationToken: TestContext.Current.CancellationToken + ); + Assert.NotNull(response); + } + + [Fact] + public async Task GetResponseAsync_WithCacheControlOnUserMessage() + { + VerbatimHttpHandler handler = new( + expectedRequest: """ + { + "model": "claude-haiku-4-5", + "messages": [{ + "role": "user", + "content": [{ + "type": "text", + "text": "What is the meaning of life?", + "cache_control": { + "type": "ephemeral", + "ttl": "5m" + } + }] + }], + "max_tokens": 1024 + } + """, + actualResponse: """ + { + "id": "msg_cache_02", + "type": "message", + "role": "assistant", + "model": "claude-haiku-4-5", + "content": [{ + "type": "text", + "text": "42" + }], + "stop_reason": "end_turn", + "usage": { + "input_tokens": 15, + "output_tokens": 3 + } + } + """ + ); + + IChatClient chatClient = CreateChatClient(handler, "claude-haiku-4-5"); + + var userContent = new TextContent("What is the meaning of life?").WithCacheControl( + Anthropic.Models.Messages.Ttl.Ttl5m + ); + + List messages = [new(ChatRole.User, [userContent])]; + + ChatResponse response = await chatClient.GetResponseAsync( + messages, + cancellationToken: TestContext.Current.CancellationToken + ); + Assert.NotNull(response); + } + + [Fact] + public async Task GetResponseAsync_WithCacheControlOnImage() + { + VerbatimHttpHandler handler = new( + expectedRequest: """ + { + "model": "claude-haiku-4-5", + "messages": [{ + "role": "user", + "content": [{ + "type": "image", + "source": { + "type": "base64", + "media_type": "image/png", + "data": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg==" + }, + "cache_control": { + "type": "ephemeral", + "ttl": "1h" + } + }, { + "type": "text", + "text": "What do you see?" + }] + }], + "max_tokens": 1024 + } + """, + actualResponse: """ + { + "id": "msg_cache_03", + "type": "message", + "role": "assistant", + "model": "claude-haiku-4-5", + "content": [{ + "type": "text", + "text": "I see a small image." + }], + "stop_reason": "end_turn", + "usage": { + "input_tokens": 100, + "output_tokens": 10 + } + } + """ + ); + + IChatClient chatClient = CreateChatClient(handler, "claude-haiku-4-5"); + + var imageContent = new DataContent( + "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg==", + "image/png" + ).WithCacheControl(Anthropic.Models.Messages.Ttl.Ttl1h); + + List messages = + [ + new(ChatRole.User, [imageContent, new TextContent("What do you see?")]), + ]; + + ChatResponse response = await chatClient.GetResponseAsync( + messages, + cancellationToken: TestContext.Current.CancellationToken + ); + Assert.NotNull(response); + } + protected sealed class VerbatimHttpHandler(string expectedRequest, string actualResponse) : HttpMessageHandler { diff --git a/src/Anthropic.Tests/AnthropicTestClients.cs b/src/Anthropic.Tests/AnthropicTestClients.cs index 30c26cc5..02ba4e80 100644 --- a/src/Anthropic.Tests/AnthropicTestClients.cs +++ b/src/Anthropic.Tests/AnthropicTestClients.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Anthropic.Bedrock; using Anthropic.Foundry; +using Anthropic.Vertex; using Xunit.Sdk; using Xunit.v3; @@ -91,6 +92,25 @@ .. testData ) ); } + if (TestSupportTypes.HasFlag(TestSupportTypes.Vertex)) + { + rows.Add( + new TheoryDataRow( + [ + new AnthropicVertexClient( + new AnthropicVertexCredentials(Resource, "VertexProject") + ) + { + BaseUrl = DataServiceUrl, + }, + .. testData + .Where(e => e.TestSupport.HasFlag(TestSupportTypes.Vertex)) + .SelectMany(f => f.TestData) + .ToArray(), + ] + ) + ); + } return new ValueTask>(rows); } @@ -116,4 +136,5 @@ public enum TestSupportTypes Anthropic = 1 << 1, Foundry = 1 << 2, Bedrock = 1 << 3, + Vertex = 1 << 4, } diff --git a/src/Anthropic.Tests/Models/Beta/Messages/BetaInputTokensTriggerTest.cs b/src/Anthropic.Tests/Models/Beta/Messages/BetaInputTokensTriggerTest.cs index aafcbe4e..e9899703 100644 --- a/src/Anthropic.Tests/Models/Beta/Messages/BetaInputTokensTriggerTest.cs +++ b/src/Anthropic.Tests/Models/Beta/Messages/BetaInputTokensTriggerTest.cs @@ -9,19 +9,19 @@ public class BetaInputTokensTriggerTest : TestBase [Fact] public void FieldRoundtrip_Works() { - var model = new BetaInputTokensTrigger { Value = 1 }; + var model = new BetaInputTokensTrigger { ValueValue = 1 }; JsonElement expectedType = JsonSerializer.SerializeToElement("input_tokens"); - long expectedValue = 1; + long expectedValueValue = 1; Assert.True(JsonElement.DeepEquals(expectedType, model.Type)); - Assert.Equal(expectedValue, model.Value); + Assert.Equal(expectedValueValue, model.ValueValue); } [Fact] public void SerializationRoundtrip_Works() { - var model = new BetaInputTokensTrigger { Value = 1 }; + var model = new BetaInputTokensTrigger { ValueValue = 1 }; string json = JsonSerializer.Serialize(model, ModelBase.SerializerOptions); var deserialized = JsonSerializer.Deserialize( @@ -35,7 +35,7 @@ public void SerializationRoundtrip_Works() [Fact] public void FieldRoundtripThroughSerialization_Works() { - var model = new BetaInputTokensTrigger { Value = 1 }; + var model = new BetaInputTokensTrigger { ValueValue = 1 }; string element = JsonSerializer.Serialize(model, ModelBase.SerializerOptions); var deserialized = JsonSerializer.Deserialize( @@ -45,16 +45,16 @@ public void FieldRoundtripThroughSerialization_Works() Assert.NotNull(deserialized); JsonElement expectedType = JsonSerializer.SerializeToElement("input_tokens"); - long expectedValue = 1; + long expectedValueValue = 1; Assert.True(JsonElement.DeepEquals(expectedType, deserialized.Type)); - Assert.Equal(expectedValue, deserialized.Value); + Assert.Equal(expectedValueValue, deserialized.ValueValue); } [Fact] public void Validation_Works() { - var model = new BetaInputTokensTrigger { Value = 1 }; + var model = new BetaInputTokensTrigger { ValueValue = 1 }; model.Validate(); } @@ -62,7 +62,7 @@ public void Validation_Works() [Fact] public void CopyConstructor_Works() { - var model = new BetaInputTokensTrigger { Value = 1 }; + var model = new BetaInputTokensTrigger { ValueValue = 1 }; BetaInputTokensTrigger copied = new(model); diff --git a/src/Anthropic.Tests/Services/Beta/MessageServiceTest.cs b/src/Anthropic.Tests/Services/Beta/MessageServiceTest.cs index 56181335..a56b9b20 100644 --- a/src/Anthropic.Tests/Services/Beta/MessageServiceTest.cs +++ b/src/Anthropic.Tests/Services/Beta/MessageServiceTest.cs @@ -1,12 +1,62 @@ +using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; +using Anthropic.Helpers; using Anthropic.Models.Beta.Messages; -using Anthropic.Tests; +using Anthropic.Services.Beta; +using Moq; using Messages = Anthropic.Models.Messages; namespace Anthropic.Tests.Services.Beta; public class MessageServiceTest { + private static BetaMessage GenerateStartMessage => + new() + { + ID = "Test", + Content = [], + Model = Messages::Model.Claude3OpusLatest, + StopReason = BetaStopReason.ToolUse, + StopSequence = "", + Usage = new() + { + CacheCreation = null, + CacheCreationInputTokens = null, + CacheReadInputTokens = null, + InputTokens = 25, + OutputTokens = 25, + ServerToolUse = null, + ServiceTier = BetaUsageServiceTier.Standard, + InferenceGeo = "inference_geo", + Iterations = + [ + new BetaMessageIterationUsage() + { + CacheCreation = new() + { + Ephemeral1hInputTokens = 0, + Ephemeral5mInputTokens = 0, + }, + CacheCreationInputTokens = 0, + CacheReadInputTokens = 0, + InputTokens = 0, + OutputTokens = 0, + }, + ], + }, + Container = null, + ContextManagement = null, + }; + + private static MessageCreateParams StreamingParam => + new() + { + MaxTokens = 1024, + Messages = [new() { Content = new(""), Role = Role.User }], + Model = Messages::Model.Claude3OpusLatest, + }; + [Theory(Skip = "prism validates based on the non-beta endpoint")] [AnthropicTestClients] public async Task Create_Works(IAnthropicClient client) @@ -57,4 +107,321 @@ public async Task CountTokens_Works(IAnthropicClient client) ); betaMessageTokensCount.Validate(); } + + [Fact] + public async Task CreateStreamingAggregation_WorksNoContent_RawMessageStartEvent() + { + // arrange + + var messagesServiceMock = new Mock(); + static async IAsyncEnumerable GetTestValues() + { + yield return new(new BetaRawMessageStartEvent(GenerateStartMessage)); + yield return new(new BetaRawMessageStopEvent()); + await Task.CompletedTask; + } + messagesServiceMock + .Setup(e => + e.CreateStreaming(It.IsAny(), It.IsAny()) + ) + .Returns(GetTestValues); + + // act + + var stream = await messagesServiceMock + .Object.CreateStreaming(StreamingParam, TestContext.Current.CancellationToken) + .Aggregate(); + + // assert + + Assert.NotNull(stream); + Assert.Empty(stream.Content); + stream.Validate(); + } + + [Fact] + public async Task CreateStreamingAggregation_HandlesNoEndMessageInterrupt() + { + // arrange + + var messagesServiceMock = new Mock(); + static async IAsyncEnumerable GetTestValues() + { + yield return new(new BetaRawMessageStartEvent(GenerateStartMessage)); + await Task.CompletedTask; + } + messagesServiceMock + .Setup(e => + e.CreateStreaming(It.IsAny(), It.IsAny()) + ) + .Returns(GetTestValues); + + // act + + // assert + + await Assert.ThrowsAsync(async () => + await messagesServiceMock + .Object.CreateStreaming(StreamingParam, TestContext.Current.CancellationToken) + .Aggregate() + ); + } + + [Fact] + public async Task CreateStreamingAggregation_WorksNoContent_RawContentBlockStartEvent() + { + // arrange + + var messagesServiceMock = new Mock(); + static async IAsyncEnumerable GetTestValues() + { + yield return new(new BetaRawMessageStartEvent(GenerateStartMessage)); + yield return new( + new BetaRawContentBlockStartEvent() + { + Index = 0, + ContentBlock = new( + new BetaTextBlock() { Citations = [], Text = "Test Output" } + ), + } + ); + yield return new(new BetaRawContentBlockStopEvent() { Index = 0 }); + yield return new(new BetaRawMessageStopEvent()); + await Task.CompletedTask; + } + + messagesServiceMock + .Setup(e => + e.CreateStreaming(It.IsAny(), It.IsAny()) + ) + .Returns(GetTestValues); + + // act + + var stream = await messagesServiceMock + .Object.CreateStreaming(StreamingParam, TestContext.Current.CancellationToken) + .Aggregate(); + + // assert + + Assert.NotNull(stream); + stream.Validate(); + Assert.NotEmpty(stream.Content); + Assert.Single(stream.Content); + Assert.IsType(stream.Content[0].Value); + Assert.Equal("Test Output", ((BetaTextBlock)stream.Content[0].Value!).Text); + } + + [Fact] + public async Task CreateStreamingAggregation_WorksStopEndEvent() + { + // arrange + + var messagesServiceMock = new Mock(); + static async IAsyncEnumerable GetTestValues() + { + yield return new(new BetaRawMessageStartEvent(GenerateStartMessage)); + yield return new( + new BetaRawContentBlockStartEvent() + { + Index = 0, + ContentBlock = new BetaTextBlock() { Citations = [], Text = "this is a " }, + } + ); + yield return new(new BetaRawContentBlockStopEvent() { Index = 0 }); + yield return new( + new BetaRawContentBlockDeltaEvent() + { + Index = 0, + Delta = new(new BetaTextDelta("Test")), + } + ); + yield return new(new BetaRawMessageStopEvent()); + await Task.CompletedTask; + } + messagesServiceMock + .Setup(e => + e.CreateStreaming(It.IsAny(), It.IsAny()) + ) + .Returns(GetTestValues); + + // act + + var stream = await messagesServiceMock + .Object.CreateStreaming(StreamingParam, TestContext.Current.CancellationToken) + .Aggregate(); + + // assert + + Assert.NotNull(stream); + stream.Validate(); + Assert.NotEmpty(stream.Content); + Assert.Single(stream.Content); + Assert.IsType(stream.Content[0].Value); + Assert.Equal("this is a Test", ((BetaTextBlock)stream.Content[0].Value!).Text); + } + + [Fact] + public async Task CreateStreamingAggregationPartialAggregation_Throws() + { + // arrange + + var messagesServiceMock = new Mock(); + static async IAsyncEnumerable GetTestValues() + { + yield return new(new BetaRawMessageStartEvent(GenerateStartMessage)); + yield return new( + new BetaRawContentBlockStartEvent() + { + Index = 0, + ContentBlock = new(new BetaTextBlock() { Citations = [], Text = "This is a " }), + } + ); + yield return new( + new BetaRawContentBlockDeltaEvent() + { + Index = 0, + Delta = new(new BetaTextDelta("Test")), + } + ); + yield return new( + new BetaRawContentBlockDeltaEvent() + { + Index = 0, + Delta = new( + new BetaCitationsDelta( + new Citation( + new BetaCitationsWebSearchResultLocation() + { + CitedText = "Somewhere", + EncryptedIndex = "0", + Title = "Over", + Url = "the://rainbow", + } + ) + ) + ), + } + ); + yield return new(new BetaRawContentBlockStopEvent() { Index = 0 }); + yield return new( + new BetaRawContentBlockStartEvent() + { + Index = 1, + ContentBlock = new( + new BetaThinkingBlock() { Signature = "", Thinking = "Other Test" } + ), + } + ); + yield return new(new BetaRawContentBlockStopEvent() { Index = 1 }); + yield return new(new BetaRawMessageStopEvent()); + await Task.CompletedTask; + } + messagesServiceMock + .Setup(e => + e.CreateStreaming(It.IsAny(), It.IsAny()) + ) + .Returns(GetTestValues); + + // act + + var aggregator = new BetaMessageContentAggregator(); + var stream = messagesServiceMock + .Object.CreateStreaming(StreamingParam, TestContext.Current.CancellationToken) + .CollectAsync(aggregator); + await foreach (var _ in stream) + { + // don't iterate entirely + break; + } + + // assert + + var exception = Assert.Throws(() => + aggregator.Message() + ); + Assert.Equal("stop message not yet received", exception.Message); + } + + [Fact] + public async Task CreateStreamingAggregation_Works() + { + // arrange + + var messagesServiceMock = new Mock(); + static async IAsyncEnumerable GetTestValues() + { + yield return new(new BetaRawMessageStartEvent(GenerateStartMessage)); + yield return new( + new BetaRawContentBlockStartEvent() + { + Index = 0, + ContentBlock = new(new BetaTextBlock() { Citations = [], Text = "This is a " }), + } + ); + yield return new( + new BetaRawContentBlockDeltaEvent() + { + Index = 0, + Delta = new(new BetaTextDelta("Test")), + } + ); + yield return new( + new BetaRawContentBlockDeltaEvent() + { + Index = 0, + Delta = new( + new BetaCitationsDelta( + new Citation( + new BetaCitationsWebSearchResultLocation() + { + CitedText = "Somewhere", + EncryptedIndex = "0", + Title = "Over", + Url = "the://rainbow", + } + ) + ) + ), + } + ); + yield return new(new BetaRawContentBlockStopEvent() { Index = 0 }); + yield return new( + new BetaRawContentBlockStartEvent() + { + Index = 1, + ContentBlock = new( + new BetaThinkingBlock() { Signature = "", Thinking = "Other Test" } + ), + } + ); + yield return new(new BetaRawContentBlockStopEvent() { Index = 1 }); + yield return new(new BetaRawMessageStopEvent()); + await Task.CompletedTask; + } + messagesServiceMock + .Setup(e => + e.CreateStreaming(It.IsAny(), It.IsAny()) + ) + .Returns(GetTestValues); + + // act + + var stream = await messagesServiceMock + .Object.CreateStreaming(StreamingParam, TestContext.Current.CancellationToken) + .Aggregate(); + + // assert + + Assert.NotNull(stream); + stream.Validate(); + Assert.NotEmpty(stream.Content); + Assert.Equal(2, stream.Content.Count); + Assert.IsType(stream.Content[0].Value); + Assert.IsType(stream.Content[1].Value); + Assert.Equal("This is a Test", ((BetaTextBlock)stream.Content[0].Value!).Text); + Assert.NotNull(((BetaTextBlock)stream.Content[0].Value!).Citations); + Assert.NotEmpty(((BetaTextBlock)stream.Content[0].Value!).Citations!); + Assert.Equal("Other Test", ((BetaThinkingBlock)stream.Content[1].Value!).Thinking); + } } diff --git a/src/Anthropic.Tests/Services/MessageServiceTest.cs b/src/Anthropic.Tests/Services/MessageServiceTest.cs index 1ba047f2..6cf57182 100644 --- a/src/Anthropic.Tests/Services/MessageServiceTest.cs +++ b/src/Anthropic.Tests/Services/MessageServiceTest.cs @@ -1,16 +1,53 @@ +using System.Collections.Generic; +using System.Linq; +using System.Threading; using System.Threading.Tasks; using Anthropic.Bedrock; +using Anthropic.Helpers; using Anthropic.Models.Messages; +using Anthropic.Services; +using Anthropic.Tests; +using Moq; namespace Anthropic.Tests.Services; public class MessageServiceTest { + private static Message GenerateStartMessage => + new() + { + ID = "Test", + Content = [], + Model = Model.Claude3OpusLatest, + StopReason = StopReason.ToolUse, + StopSequence = "", + Usage = new() + { + CacheCreation = null, + CacheCreationInputTokens = null, + CacheReadInputTokens = null, + InputTokens = 25, + OutputTokens = 25, + ServerToolUse = null, + ServiceTier = UsageServiceTier.Standard, + InferenceGeo = "inference_geo", + }, + }; + + private static Anthropic.Models.Messages.MessageCreateParams StreamingParam => + new() + { + MaxTokens = 1024, + Messages = [new() { Content = new(""), Role = Anthropic.Models.Messages.Role.User }], + Model = Model.Claude3_7SonnetLatest, + }; + [Theory] [AnthropicTestClients] [AnthropicTestData(TestSupportTypes.Anthropic, "Claude3_7SonnetLatest")] [AnthropicTestData(TestSupportTypes.Foundry, "claude-sonnet-4-5")] [AnthropicTestData(TestSupportTypes.Bedrock, "global.anthropic.claude-haiku-4-5-20251001-v1:0")] + [AnthropicTestData(TestSupportTypes.Vertex, "claude-3-7-sonnet@20250219")] public async Task Create_Works(IAnthropicClient client, string modelName) { var message = await client.Messages.Create( @@ -64,4 +101,325 @@ public async Task CountTokens_Works(IAnthropicClient client, string modelName) ); messageTokensCount.Validate(); } + + [Fact] + public async Task CreateStreamingAggregation_WorksNoContent_RawMessageStartEvent() + { + // arrange + + var messagesServiceMock = new Mock(); + static async IAsyncEnumerable GetTestValues() + { + yield return new(new RawMessageStartEvent(GenerateStartMessage)); + yield return new(new RawMessageStopEvent()); + await Task.CompletedTask; + } + messagesServiceMock + .Setup(e => + e.CreateStreaming( + It.IsAny(), + It.IsAny() + ) + ) + .Returns(GetTestValues); + + // act + + var stream = await messagesServiceMock + .Object.CreateStreaming(StreamingParam, TestContext.Current.CancellationToken) + .Aggregate(); + + // assert + + Assert.NotNull(stream); + Assert.Empty(stream.Content); + stream.Validate(); + } + + [Fact] + public async Task CreateStreamingAggregation_HandlesNoEndMessageInterrupt() + { + // arrange + + var messagesServiceMock = new Mock(); + static async IAsyncEnumerable GetTestValues() + { + yield return new(new RawMessageStartEvent(GenerateStartMessage)); + await Task.CompletedTask; + } + messagesServiceMock + .Setup(e => + e.CreateStreaming( + It.IsAny(), + It.IsAny() + ) + ) + .Returns(GetTestValues); + + // act + + // assert + + await Assert.ThrowsAsync(async () => + await messagesServiceMock + .Object.CreateStreaming(StreamingParam, TestContext.Current.CancellationToken) + .Aggregate() + ); + } + + [Fact] + public async Task CreateStreamingAggregation_WorksNoContent_RawContentBlockStartEvent() + { + // arrange + + var messagesServiceMock = new Mock(); + static async IAsyncEnumerable GetTestValues() + { + yield return new(new RawMessageStartEvent(GenerateStartMessage)); + yield return new( + new RawContentBlockStartEvent() + { + Index = 0, + ContentBlock = new(new TextBlock() { Citations = [], Text = "Test Output" }), + } + ); + yield return new(new RawContentBlockStopEvent() { Index = 0 }); + yield return new(new RawMessageStopEvent()); + await Task.CompletedTask; + } + + messagesServiceMock + .Setup(e => + e.CreateStreaming( + It.IsAny(), + It.IsAny() + ) + ) + .Returns(GetTestValues); + + // act + + var stream = await messagesServiceMock + .Object.CreateStreaming(StreamingParam, TestContext.Current.CancellationToken) + .Aggregate(); + + // assert + + Assert.NotNull(stream); + stream.Validate(); + Assert.NotEmpty(stream.Content); + Assert.Single(stream.Content); + Assert.IsType(stream.Content[0].Value); + Assert.Equal("Test Output", ((TextBlock)stream.Content[0].Value!).Text); + } + + [Fact] + public async Task CreateStreamingAggregation_WorksStopEndEvent() + { + // arrange + + var messagesServiceMock = new Mock(); + static async IAsyncEnumerable GetTestValues() + { + yield return new(new RawMessageStartEvent(GenerateStartMessage)); + yield return new( + new RawContentBlockStartEvent() + { + Index = 0, + ContentBlock = new TextBlock() { Citations = [], Text = "this is a " }, + } + ); + yield return new(new RawContentBlockStopEvent() { Index = 0 }); + yield return new( + new RawContentBlockDeltaEvent() { Index = 0, Delta = new(new TextDelta("Test")) } + ); + yield return new(new RawMessageStopEvent()); + await Task.CompletedTask; + } + messagesServiceMock + .Setup(e => + e.CreateStreaming( + It.IsAny(), + It.IsAny() + ) + ) + .Returns(GetTestValues); + + // act + + var stream = await messagesServiceMock + .Object.CreateStreaming(StreamingParam, TestContext.Current.CancellationToken) + .Aggregate(); + + // assert + + Assert.NotNull(stream); + stream.Validate(); + Assert.NotEmpty(stream.Content); + Assert.Single(stream.Content); + Assert.IsType(stream.Content[0].Value); + Assert.Equal("this is a Test", ((TextBlock)stream.Content[0].Value!).Text); + } + + [Fact] + public async Task CreateStreamingAggregationPartialAggregation_Throws() + { + // arrange + + var messagesServiceMock = new Mock(); + static async IAsyncEnumerable GetTestValues() + { + yield return new(new RawMessageStartEvent(GenerateStartMessage)); + yield return new( + new RawContentBlockStartEvent() + { + Index = 0, + ContentBlock = new(new TextBlock() { Citations = [], Text = "This is a " }), + } + ); + yield return new( + new RawContentBlockDeltaEvent() { Index = 0, Delta = new(new TextDelta("Test")) } + ); + yield return new( + new RawContentBlockDeltaEvent() + { + Index = 0, + Delta = new( + new CitationsDelta( + new Anthropic.Models.Messages.Citation( + new CitationsWebSearchResultLocation() + { + CitedText = "Somewhere", + EncryptedIndex = "0", + Title = "Over", + Url = "the://rainbow", + } + ) + ) + ), + } + ); + yield return new(new RawContentBlockStopEvent() { Index = 0 }); + yield return new( + new RawContentBlockStartEvent() + { + Index = 1, + ContentBlock = new( + new ThinkingBlock() { Signature = "", Thinking = "Other Test" } + ), + } + ); + yield return new(new RawContentBlockStopEvent() { Index = 1 }); + yield return new(new RawMessageStopEvent()); + await Task.CompletedTask; + } + messagesServiceMock + .Setup(e => + e.CreateStreaming( + It.IsAny(), + It.IsAny() + ) + ) + .Returns(GetTestValues); + + // act + + var aggregator = new MessageContentAggregator(); + var stream = messagesServiceMock + .Object.CreateStreaming(StreamingParam, TestContext.Current.CancellationToken) + .CollectAsync(aggregator); + await foreach (var _ in stream) + { + // don't iterate entirely + break; + } + + // assert + + var exception = Assert.Throws(() => + aggregator.Message() + ); + Assert.Equal("stop message not yet received", exception.Message); + } + + [Fact] + public async Task CreateStreamingAggregation_Works() + { + // arrange + + var messagesServiceMock = new Mock(); + static async IAsyncEnumerable GetTestValues() + { + yield return new(new RawMessageStartEvent(GenerateStartMessage)); + yield return new( + new RawContentBlockStartEvent() + { + Index = 0, + ContentBlock = new(new TextBlock() { Citations = [], Text = "This is a " }), + } + ); + yield return new( + new RawContentBlockDeltaEvent() { Index = 0, Delta = new(new TextDelta("Test")) } + ); + yield return new( + new RawContentBlockDeltaEvent() + { + Index = 0, + Delta = new( + new CitationsDelta( + new Anthropic.Models.Messages.Citation( + new CitationsWebSearchResultLocation() + { + CitedText = "Somewhere", + EncryptedIndex = "0", + Title = "Over", + Url = "the://rainbow", + } + ) + ) + ), + } + ); + yield return new(new RawContentBlockStopEvent() { Index = 0 }); + yield return new( + new RawContentBlockStartEvent() + { + Index = 1, + ContentBlock = new( + new ThinkingBlock() { Signature = "", Thinking = "Other Test" } + ), + } + ); + yield return new(new RawContentBlockStopEvent() { Index = 1 }); + yield return new(new RawMessageStopEvent()); + await Task.CompletedTask; + } + messagesServiceMock + .Setup(e => + e.CreateStreaming( + It.IsAny(), + It.IsAny() + ) + ) + .Returns(GetTestValues); + + // act + + var stream = await messagesServiceMock + .Object.CreateStreaming(StreamingParam, TestContext.Current.CancellationToken) + .Aggregate(); + + // assert + + Assert.NotNull(stream); + stream.Validate(); + Assert.NotEmpty(stream.Content); + Assert.Equal(2, stream.Content.Count); + Assert.IsType(stream.Content[0].Value); + Assert.IsType(stream.Content[1].Value); + Assert.Equal("This is a Test", ((TextBlock)stream.Content[0].Value!).Text); + Assert.NotNull(((TextBlock)stream.Content[0].Value!).Citations); + Assert.NotEmpty(((TextBlock)stream.Content[0].Value!).Citations!); + Assert.Equal("Other Test", ((ThinkingBlock)stream.Content[1].Value!).Thinking); + } } diff --git a/src/Anthropic.Vertex/Anthropic.Vertex.csproj b/src/Anthropic.Vertex/Anthropic.Vertex.csproj new file mode 100644 index 00000000..41685899 --- /dev/null +++ b/src/Anthropic.Vertex/Anthropic.Vertex.csproj @@ -0,0 +1,27 @@ + + + + enable + enable + + Anthropic.Vertex + 0.1.0 + + + + + + + + + + + + + + + + + + + diff --git a/src/Anthropic.Vertex/AnthropicVertexClient.cs b/src/Anthropic.Vertex/AnthropicVertexClient.cs new file mode 100644 index 00000000..603bc285 --- /dev/null +++ b/src/Anthropic.Vertex/AnthropicVertexClient.cs @@ -0,0 +1,51 @@ +using Anthropic.Core; + +namespace Anthropic.Vertex; + +/// +/// Provides methods for invoking the vertex hosted Anthropic api. +/// +public class AnthropicVertexClient : AnthropicClient +{ + private readonly IAnthropicVertexCredentials _vertexCredentials; + + private readonly Lazy _withRawResponse; + + /// + /// Creates a new Instance of the . + /// + /// The credential Provider used to authenticate with the AWS Bedrock service. + public AnthropicVertexClient(IAnthropicVertexCredentials vertexCredentials) + : base() + { + _vertexCredentials = vertexCredentials; + BaseUrl = + $"https://{(_vertexCredentials.Region is "global" or null ? "" : _vertexCredentials.Region + "-")}aiplatform.googleapis.com"; + _withRawResponse = new(() => + new AnthropicVertexClientWithRawResponse(_vertexCredentials, _options) + ); + } + + private AnthropicVertexClient( + IAnthropicVertexCredentials vertexCredentials, + ClientOptions clientOptions + ) + : base(clientOptions) + { + _vertexCredentials = vertexCredentials; + BaseUrl = + $"https://{(_vertexCredentials.Region is "global" or null ? "" : _vertexCredentials.Region + "-")}aiplatform.googleapis.com"; + _withRawResponse = new(() => + new AnthropicVertexClientWithRawResponse(_vertexCredentials, _options) + ); + } + + /// + public override IAnthropicClient WithOptions(Func modifier) + { + return new AnthropicVertexClient(_vertexCredentials, modifier(this._options)); + } + + /// + public override IAnthropicClientWithRawResponse WithRawResponse => _withRawResponse.Value; +} diff --git a/src/Anthropic.Vertex/AnthropicVertexClientWithRawResponse.cs b/src/Anthropic.Vertex/AnthropicVertexClientWithRawResponse.cs new file mode 100644 index 00000000..c681fbf8 --- /dev/null +++ b/src/Anthropic.Vertex/AnthropicVertexClientWithRawResponse.cs @@ -0,0 +1,133 @@ +using System.Text; +using System.Text.Json; +using System.Text.Json.Nodes; +using Anthropic.Core; +using Anthropic.Exceptions; + +namespace Anthropic.Vertex; + +internal class AnthropicVertexClientWithRawResponse : AnthropicClientWithRawResponse +{ + private const string ANTHROPIC_VERSION = "vertex-2023-10-16"; + + private readonly IAnthropicVertexCredentials _vertexCredentials; + + /// + /// Creates a new Instance of the . + /// + /// The credential Provider used to authenticate with the AWS Bedrock service. + public AnthropicVertexClientWithRawResponse( + IAnthropicVertexCredentials vertexCredentials, + ClientOptions options + ) + : base(options) + { + _vertexCredentials = vertexCredentials; + } + + public override IAnthropicClientWithRawResponse WithOptions( + Func modifier + ) + { + return new AnthropicVertexClientWithRawResponse(_vertexCredentials, modifier(_options)); + } + + protected override async ValueTask BeforeSend( + HttpRequest request, + HttpRequestMessage requestMessage, + CancellationToken cancellationToken + ) + { + ValidateRequest(requestMessage, out var isCountEndpoint); + + var bodyContent = JsonNode.Parse( + await requestMessage.Content!.ReadAsStringAsync( +#if NET + cancellationToken +#endif + ).ConfigureAwait(false) + )!; + + bodyContent["anthropic_version"] = JsonValue.Create(ANTHROPIC_VERSION); + + var modelValue = + bodyContent["model"] + ?? throw new AnthropicInvalidDataException( + "Expected to find property model in request json but found none." + ); + + bodyContent.Root.AsObject().Remove("model"); + var parsedStreamValue = ((bool?)bodyContent["stream"]?.AsValue()) ?? false; + + var contentStream = new MemoryStream(); + requestMessage.Content = new StreamContent(contentStream); + using var writer = new Utf8JsonWriter(contentStream); + { + bodyContent.WriteTo(writer); + await writer.FlushAsync(cancellationToken).ConfigureAwait(false); + } + contentStream.Seek(0, SeekOrigin.Begin); + requestMessage.Headers.TryAddWithoutValidation( + "content-length", + contentStream.Length.ToString() + ); + + var requestBuilder = new StringBuilder( + $"{requestMessage.RequestUri!.Scheme}://{requestMessage.RequestUri.Host}/v1/projects/{_vertexCredentials.Project}/locations/{_vertexCredentials.Region}/publishers/anthropic/models/" + ); + if (isCountEndpoint) + { + requestBuilder.Append("count-tokens:rawPredict"); + } + else + { + requestBuilder.Append( + $"{modelValue.AsValue()}:{(parsedStreamValue ? "streamRawPredict" : "rawPredict")}" + ); + } + + requestMessage.RequestUri = new Uri(requestBuilder.ToString()); + + await _vertexCredentials.ApplyAsync(requestMessage).ConfigureAwait(false); + } + + private static void ValidateRequest(HttpRequestMessage requestMessage, out bool isCountEndpoint) + { + if (requestMessage.RequestUri is null) + { + throw new AnthropicInvalidDataException( + "Request is missing required path segments. Expected > 1 segments, found none." + ); + } + + if (requestMessage.RequestUri.Segments.Length < 1) + { + throw new AnthropicInvalidDataException( + "Request is missing required path segments. Expected > 1 segments, found none." + ); + } + + if (requestMessage.RequestUri.Segments[1].Trim('/') != "v1") + { + throw new AnthropicInvalidDataException( + $"Request is missing required path segments. Expected [0] segment to be 'v1', found {requestMessage.RequestUri.Segments[0]}." + ); + } + + if ( + requestMessage.RequestUri.Segments.Length >= 4 + && requestMessage.RequestUri.Segments[2].Trim('/') is "messages" + && requestMessage.RequestUri.Segments[3].Trim('/') is "batches" or "count_tokens" + ) + { + throw new AnthropicInvalidDataException( + $"The requested endpoint '{requestMessage.RequestUri.Segments.Last().Trim('/')}' is not yet supported." + ); + } + + isCountEndpoint = + requestMessage.RequestUri.Segments.Length >= 4 + && requestMessage.RequestUri.Segments[2].Trim('/') is "messages" + && requestMessage.RequestUri.Segments[3].Trim('/') is "count_tokens"; + } +} diff --git a/src/Anthropic.Vertex/AnthropicVertexCredentials.cs b/src/Anthropic.Vertex/AnthropicVertexCredentials.cs new file mode 100644 index 00000000..5664360f --- /dev/null +++ b/src/Anthropic.Vertex/AnthropicVertexCredentials.cs @@ -0,0 +1,52 @@ +using Google.Apis.Auth.OAuth2; + +namespace Anthropic.Vertex; + +/// +/// Defines methods to authenticate with vertex services using the api. +/// +public class AnthropicVertexCredentials : IAnthropicVertexCredentials +{ + private readonly GoogleCredential _googleCredentials; + + /// + /// Creates a new instance of the using the environment provided google authentication methods. + /// + /// The region string for the project or null for global. + /// The project string. + public AnthropicVertexCredentials(string? region, string project) + : this(region, project, GoogleCredential.GetApplicationDefault()) { } + + /// + /// Creates a new instance of the . + /// + /// The region string for the project or null for global. + /// The project string. + /// The authentication method. + public AnthropicVertexCredentials( + string? region, + string project, + GoogleCredential googleCredential + ) + { + Region = region ?? "global"; + Project = project; + _googleCredentials = googleCredential; + } + + /// + public string Region { get; } + + /// + public string Project { get; } + + /// + public async ValueTask ApplyAsync(HttpRequestMessage requestMessage) + { + var token = await _googleCredentials + .UnderlyingCredential.GetAccessTokenForRequestAsync() + .ConfigureAwait(false); + requestMessage.Headers.Authorization = + new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", token); + } +} diff --git a/src/Anthropic.Vertex/IAnthropicVertexCredentials.cs b/src/Anthropic.Vertex/IAnthropicVertexCredentials.cs new file mode 100644 index 00000000..f2123ebc --- /dev/null +++ b/src/Anthropic.Vertex/IAnthropicVertexCredentials.cs @@ -0,0 +1,75 @@ +using Google.Apis.Auth.OAuth2; + +namespace Anthropic.Vertex; + +/// +/// Defines methods for authenticating requests to the vertex api. +/// +public interface IAnthropicVertexCredentials +{ + /// + /// Gets the Region on the Project. + /// + string Region { get; } + + /// + /// Gets the Project name. + /// + string Project { get; } + + /// + /// Applies the authentication method to the request. + /// + /// The http Request message object. + /// A value task that is resolved when the authentication has been applied to the request message. + ValueTask ApplyAsync(HttpRequestMessage requestMessage); + +#if NET8_0_OR_GREATER + public static async Task FromEnvAsync() + { + return await DefaultAnthropicVertexCredentials.FromEnvAsync().ConfigureAwait(false); + } +#endif +} + +public static class DefaultAnthropicVertexCredentials +{ + /// + /// Creates a new instance of from environment variables. + /// + /// + /// Set the following environment variables: + /// + /// ANTHROPIC_VERTEX_PROJECT_ID=your_project_id + /// CLOUD_ML_REGION=region_name + /// VERTEX_ACCESS_TOKEN=vertex_access_token + /// + /// The CLOUD_ML_REGION environment variable is optional and if not set will fallback to global. + /// The VERTEX_ACCESS_TOKEN environment variable is optional and if unset the google system checks will be performed to obtain a valid set of credentials. See: https://docs.cloud.google.com/docs/authentication/application-default-credentials + /// + /// A new instance of an or null if it cannot be loaded from environment variables + public static async ValueTask FromEnvAsync() + { + var projId = Environment.GetEnvironmentVariable("ANTHROPIC_VERTEX_PROJECT_ID"); + var region = Environment.GetEnvironmentVariable("CLOUD_ML_REGION"); + var accessToken = Environment.GetEnvironmentVariable("VERTEX_ACCESS_TOKEN"); + + if (projId is null) + { + return null; + } + + region ??= "global"; + + var credentials = accessToken is null + ? GoogleCredential.FromAccessToken(accessToken) + : await GoogleCredential.GetApplicationDefaultAsync().ConfigureAwait(false); + + if (credentials.UnderlyingCredential is null) + { + return null; + } + + return new AnthropicVertexCredentials(region, projId, credentials); + } +} diff --git a/src/Anthropic/AIContentCacheExtensions.cs b/src/Anthropic/AIContentCacheExtensions.cs new file mode 100644 index 00000000..85e3eb82 --- /dev/null +++ b/src/Anthropic/AIContentCacheExtensions.cs @@ -0,0 +1,104 @@ +using Anthropic.Models.Messages; + +#pragma warning disable IDE0130 // Namespace does not match folder structure + +namespace Microsoft.Extensions.AI; + +/// +/// Extension methods for configuring Anthropic prompt caching on instances. +/// +/// +/// +/// Prompt caching allows you to cache frequently used context between API calls, reducing latency +/// and costs for repetitive workloads. Cache breakpoints are placed at the END of content blocks +/// that have cache control set. +/// +/// +/// These extensions are only effective when used with the returned by +/// . Other implementations will ignore the cache control settings. +/// +/// +public static class AIContentCacheExtensions +{ + private const string CacheControlKey = "anthropic:cache_control"; + + /// + /// Configures Anthropic prompt caching on this content block. + /// + /// The type of . + /// The content to configure caching for. + /// + /// The cache control configuration. Pass to remove any existing cache control. + /// + /// The same instance for method chaining. + /// + /// + /// The cache breakpoint is placed at the END of this content block. All content up to and including + /// this block will be cached together. + /// + /// + /// For optimal caching in agentic loops, place cache breakpoints on: + /// + /// System prompts (use for stable prompts) + /// The last content block before the current turn (use ) + /// Large tool results that won't change + /// + /// + /// + /// + /// + /// var systemContent = new TextContent(systemPrompt).WithCacheControl(new CacheControlEphemeral { Ttl = Ttl.Ttl1h }); + /// chatMessages.Add(new ChatMessage(ChatRole.System, [systemContent])); + /// + /// + public static T WithCacheControl(this T content, CacheControlEphemeral? cacheControl) + where T : AIContent + { + if (cacheControl is null) + { + content.AdditionalProperties?.Remove(CacheControlKey); + } + else + { + (content.AdditionalProperties ??= [])[CacheControlKey] = cacheControl; + } + + return content; + } + + /// + /// Configures Anthropic prompt caching on this content block with the specified TTL. + /// + /// The type of . + /// The content to configure caching for. + /// + /// The time-to-live for the cache. Use (5 minutes) for dynamic content + /// or (1 hour) for stable content like system prompts. + /// Pass for the default TTL (5 minutes). + /// + /// The same instance for method chaining. + /// + /// + /// // Cache system prompt for 1 hour + /// var systemContent = new TextContent(systemPrompt).WithCacheControl(Ttl.Ttl1h); + /// + /// // Cache conversation context for 5 minutes (default) + /// var lastMessage = messages[^1].Contents.Last(); + /// lastMessage.WithCacheControl(Ttl.Ttl5m); + /// + /// + public static T WithCacheControl(this T content, Ttl? ttl) + where T : AIContent => content.WithCacheControl(new CacheControlEphemeral { Ttl = ttl }); + + /// + /// Gets the cache control configuration for this content block, if any. + /// + /// The content to check. + /// + /// The if configured, or if no cache control is set. + /// + internal static CacheControlEphemeral? GetCacheControl(this AIContent content) => + content.AdditionalProperties?.TryGetValue(CacheControlKey, out var cc) == true + ? cc as CacheControlEphemeral + : null; +} diff --git a/src/Anthropic/Anthropic.csproj b/src/Anthropic/Anthropic.csproj index 82e43428..3b49b718 100644 --- a/src/Anthropic/Anthropic.csproj +++ b/src/Anthropic/Anthropic.csproj @@ -13,12 +13,12 @@ - - - - - - + + + + + + diff --git a/src/Anthropic/AnthropicClientExtensions.cs b/src/Anthropic/AnthropicClientExtensions.cs index a5f96cce..6ddc78ab 100644 --- a/src/Anthropic/AnthropicClientExtensions.cs +++ b/src/Anthropic/AnthropicClientExtensions.cs @@ -438,7 +438,8 @@ out List? systemMessages { if (content is TextContent tc) { - (systemMessages ??= []).Add(new() { Text = tc.Text }); + var block = new TextBlockParam { Text = tc.Text }; + (systemMessages ??= []).Add(WithCacheControlFrom(block, tc)); } } @@ -462,43 +463,59 @@ out List? systemMessages text = text.TrimEnd(); if (!string.IsNullOrWhiteSpace(text)) { - contents.Add(new TextBlockParam() { Text = text }); + contents.Add( + WithCacheControlFrom( + new TextBlockParam() { Text = text }, + tc + ) + ); } } else if (!string.IsNullOrWhiteSpace(text)) { - contents.Add(new TextBlockParam() { Text = text }); + contents.Add( + WithCacheControlFrom(new TextBlockParam() { Text = text }, tc) + ); } break; case TextReasoningContent trc when !string.IsNullOrEmpty(trc.Text): contents.Add( - new ThinkingBlockParam() - { - Thinking = trc.Text, - Signature = trc.ProtectedData ?? string.Empty, - } + WithCacheControlFrom( + new ThinkingBlockParam() + { + Thinking = trc.Text, + Signature = trc.ProtectedData ?? string.Empty, + }, + trc + ) ); break; case TextReasoningContent trc when !string.IsNullOrEmpty(trc.ProtectedData): contents.Add( - new RedactedThinkingBlockParam() { Data = trc.ProtectedData! } + WithCacheControlFrom( + new RedactedThinkingBlockParam() { Data = trc.ProtectedData! }, + trc + ) ); break; case DataContent dc when dc.HasTopLevelMediaType("image"): contents.Add( - new ImageBlockParam() - { - Source = new( - new Base64ImageSource() - { - Data = dc.Base64Data.ToString(), - MediaType = dc.MediaType, - } - ), - } + WithCacheControlFrom( + new ImageBlockParam() + { + Source = new( + new Base64ImageSource() + { + Data = dc.Base64Data.ToString(), + MediaType = dc.MediaType, + } + ), + }, + dc + ) ); break; @@ -509,35 +526,49 @@ when string.Equals( StringComparison.OrdinalIgnoreCase ): contents.Add( - new DocumentBlockParam() - { - Source = new( - new Base64PdfSource() { Data = dc.Base64Data.ToString() } - ), - } + WithCacheControlFrom( + new DocumentBlockParam() + { + Source = new( + new Base64PdfSource() + { + Data = dc.Base64Data.ToString(), + } + ), + }, + dc + ) ); break; case DataContent dc when dc.HasTopLevelMediaType("text"): contents.Add( - new DocumentBlockParam() - { - Source = new( - new PlainTextSource() - { - Data = Encoding.UTF8.GetString(dc.Data.ToArray()), - } - ), - } + WithCacheControlFrom( + new DocumentBlockParam() + { + Source = new( + new PlainTextSource() + { + Data = Encoding.UTF8.GetString(dc.Data.ToArray()), + } + ), + }, + dc + ) ); break; case UriContent uc when uc.HasTopLevelMediaType("image"): contents.Add( - new ImageBlockParam() - { - Source = new(new UrlImageSource() { Url = uc.Uri.AbsoluteUri }), - } + WithCacheControlFrom( + new ImageBlockParam() + { + Source = new( + new UrlImageSource() { Url = uc.Uri.AbsoluteUri } + ), + }, + uc + ) ); break; @@ -548,33 +579,41 @@ when string.Equals( StringComparison.OrdinalIgnoreCase ): contents.Add( - new DocumentBlockParam() - { - Source = new(new UrlPdfSource() { Url = uc.Uri.AbsoluteUri }), - } + WithCacheControlFrom( + new DocumentBlockParam() + { + Source = new( + new UrlPdfSource() { Url = uc.Uri.AbsoluteUri } + ), + }, + uc + ) ); break; case FunctionCallContent fcc: contents.Add( - new ToolUseBlockParam() - { - ID = fcc.CallId, - Name = fcc.Name, - Input = - fcc.Arguments?.ToDictionary( - e => e.Key, - e => - e.Value is JsonElement je - ? je - : JsonSerializer.SerializeToElement( - e.Value, - AIJsonUtilities.DefaultOptions.GetTypeInfo( - typeof(object) + WithCacheControlFrom( + new ToolUseBlockParam() + { + ID = fcc.CallId, + Name = fcc.Name, + Input = + fcc.Arguments?.ToDictionary( + e => e.Key, + e => + e.Value is JsonElement je + ? je + : JsonSerializer.SerializeToElement( + e.Value, + AIJsonUtilities.DefaultOptions.GetTypeInfo( + typeof(object) + ) ) - ) - ) ?? [], - } + ) ?? [], + }, + fcc + ) ); break; @@ -716,12 +755,15 @@ when string.Equals( } contents.Add( - new ToolResultBlockParam() - { - ToolUseID = frc.CallId, - IsError = frc.Exception is not null, - Content = result, - } + WithCacheControlFrom( + new ToolResultBlockParam() + { + ToolUseID = frc.CallId, + IsError = frc.Exception is not null, + Content = result, + }, + frc + ) ); break; } @@ -749,6 +791,33 @@ when string.Equals( return messageParams; } + /// + /// Applies cache control from an to a content block param if configured. + /// + /// + /// Note: ThinkingBlockParam and RedactedThinkingBlockParam do not support cache control. + /// + private static T WithCacheControlFrom(T block, AIContent content) + where T : class + { + var cacheControl = content.GetCacheControl(); + if (cacheControl is null) + { + return block; + } + + return block switch + { + TextBlockParam tb => (tb with { CacheControl = cacheControl }) as T ?? block, + ImageBlockParam ib => (ib with { CacheControl = cacheControl }) as T ?? block, + DocumentBlockParam db => (db with { CacheControl = cacheControl }) as T ?? block, + ToolUseBlockParam tub => (tub with { CacheControl = cacheControl }) as T ?? block, + ToolResultBlockParam trb => (trb with { CacheControl = cacheControl }) as T + ?? block, + _ => block, + }; + } + private MessageCreateParams GetMessageCreateParams( List messages, List? systemMessages, diff --git a/src/Anthropic/Core/ClientOptions.cs b/src/Anthropic/Core/ClientOptions.cs index a409b297..c8785733 100644 --- a/src/Anthropic/Core/ClientOptions.cs +++ b/src/Anthropic/Core/ClientOptions.cs @@ -97,4 +97,64 @@ public string? AuthToken readonly get { return _authToken.Value; } set { _authToken = new(() => value); } } + + internal static TimeSpan TimeoutFromMaxTokens( + long maxTokens, + bool isStreaming, + string? model = null + ) + { + // Check model-specific token limits for non-streaming requests + long? maxNonStreamingTokens = null; + + if (model != null) + { + maxNonStreamingTokens = model switch + { + "claude-opus-4-20250514" => 8_192, + "claude-4-opus-20250514" => 8_192, + "claude-opus-4-0" => 8_192, + "anthropic.claude-opus-4-20250514-v1:0" => 8_192, + "claude-opus-4@20250514" => 8_192, + "claude-opus-4-1-20250805" => 8_192, + "anthropic.claude-opus-4-1-20250805-v1:0" => 8_192, + "claude-opus-4-1@20250805" => 8_192, + _ => null, + }; + } + var exceedsModelLimit = maxNonStreamingTokens != null && maxTokens > maxNonStreamingTokens; + + long timeoutSeconds; + if (isStreaming) + { + timeoutSeconds = Math.Min( + 60 * 60, // 1 hour maximum + Math.Max( + 10 * 60, // 10 minute minimum + 60 * 60 * maxTokens / 128_000 + ) + ); + } + else + { + timeoutSeconds = Math.Min( + 10 * 60, // 10 minute maximum + Math.Max( + 30, // 30 second minimum + 30 * maxTokens / 1000 + ) + ); + } + + if (!isStreaming && (exceedsModelLimit || timeoutSeconds > 10 * 60)) // 10 minutes + { + throw new ArgumentOutOfRangeException( + nameof(maxTokens), + "Streaming is required for operations that may take longer than 10 minutes. " + + "For more information, see https://github.com/anthropics/anthropic-sdk-csharp#streaming" + ); + } + + return TimeSpan.FromSeconds(timeoutSeconds); + } } diff --git a/src/Anthropic/Helpers/BetaMessageContentAggregator.cs b/src/Anthropic/Helpers/BetaMessageContentAggregator.cs new file mode 100644 index 00000000..795ba2be --- /dev/null +++ b/src/Anthropic/Helpers/BetaMessageContentAggregator.cs @@ -0,0 +1,199 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Anthropic.Exceptions; +using Anthropic.Models.Beta.Messages; + +namespace Anthropic.Helpers; + +/// +/// The aggregation model for a stream of +/// +public sealed class BetaMessageContentAggregator + : SseAggregator +{ + protected override BetaMessage GetResult( + IReadOnlyDictionary> messages + ) + { + var content = messages[FilterResult.Content].GroupBy(e => e.Index); + + var startMessage = + messages[FilterResult.StartMessage] + .Select(e => e.Value) + .OfType() + .FirstOrDefault() + ?? throw new AnthropicInvalidDataException("start message not yet received"); + var endMessage = + messages[FilterResult.EndMessage] + .Select(e => e.Value) + .OfType() + .FirstOrDefault() + ?? throw new AnthropicInvalidDataException("stop message not yet received"); + + var contentBlocks = new List(); + foreach (var item in content) + { + var startContent = + item.Select(e => e.Value).OfType().FirstOrDefault() + ?? throw new AnthropicInvalidDataException( + "start content message not yet received" + ); + var blockContent = item.Select(e => e.Value) + .OfType() + .ToArray(); + + var contentBlock = startContent.ContentBlock; + contentBlocks.Add(MergeBlock(contentBlock, blockContent.Select(e => e.Delta))); + } + + var stopSequence = startMessage.Message.StopSequence; + var stopReason = startMessage.Message.StopReason; + var container = startMessage.Message.Container; + var usage = startMessage.Message.Usage; + + if (messages.TryGetValue(FilterResult.Delta, out var deltaEvents)) + { + var deltas = deltaEvents.Select(e => e.Value).OfType(); + + foreach (var delta in deltas ?? []) + { + stopReason = delta.Delta.StopReason; + stopSequence = delta.Delta.StopSequence; + + if (delta.Delta.Container != null) + { + container = delta.Delta.Container; + } + + usage = usage with { OutputTokens = delta.Usage.OutputTokens }; + if (delta.Usage.InputTokens != null) + { + usage = usage with { InputTokens = delta.Usage.InputTokens.Value }; + } + if (delta.Usage.CacheCreationInputTokens != null) + { + usage = usage with + { + CacheCreationInputTokens = delta.Usage.CacheCreationInputTokens, + }; + } + if (delta.Usage.CacheReadInputTokens != null) + { + usage = usage with { CacheReadInputTokens = delta.Usage.CacheReadInputTokens }; + } + if (delta.Usage.ServerToolUse != null) + { + usage = usage with { ServerToolUse = delta.Usage.ServerToolUse }; + } + } + } + + return new() + { + Content = [.. contentBlocks], + Container = container, + ContextManagement = startMessage.Message.ContextManagement, + ID = startMessage.Message.ID, + Model = startMessage.Message.Model, + StopReason = stopReason, + StopSequence = stopSequence, + Usage = usage, + }; + } + + private static BetaContentBlock MergeBlock( + ContentBlock contentBlock, + IEnumerable blockContents + ) + { + BetaContentBlock? resultBlock = null; + + string StringJoinHelper( + string source, + IEnumerable sources, + Func expression + ) + { + return string.Join(null, (string[])[source, .. sources.Select(expression)]); + } + + void As(Func, BetaContentBlock> factory) + { + // those blocks are delta variants not the source block + // e.g TextBlock and TextDelta + resultBlock = factory([.. blockContents.Select(e => e.Value).OfType()]); + } + + IEnumerable Of() + { + return blockContents.Select(e => e.Value).OfType(); + } + + void Single(T item) + { + resultBlock = ( + blockContents.Select(e => e.Value).OfType().Single() as BetaContentBlock + ); + } + + contentBlock.Switch( + textBlock => + As(blocks => new BetaTextBlock() + { + Text = StringJoinHelper(textBlock.Text, blocks, e => e.Text), + Citations = + [ + .. (textBlock.Citations ?? []), + .. Of() + .Select(e => + e.Citation.Match( + f => f, + f => f, + f => f, + f => f, + f => f + ) + ), + ], + }), + thinkingBlock => + As(blocks => new BetaThinkingBlock() + { + Signature = StringJoinHelper( + thinkingBlock.Signature, + Of(), + e => e.Signature + ), + Thinking = StringJoinHelper(thinkingBlock.Thinking, blocks, e => e.Thinking), + }), + e => Single(e), + e => Single(e), + e => Single(e), + e => Single(e), + e => Single(e), + e => Single(e), + e => Single(e), + e => Single(e), + e => Single(e), + e => Single(e), + e => Single(e), + e => Single(e), + e => Single(e) + ); + + return resultBlock ?? throw new AnthropicInvalidDataException("Missing result block"); + } + + protected override FilterResult Filter(BetaRawMessageStreamEvent message) => + message.Value switch + { + BetaRawContentBlockStartEvent _ => FilterResult.Content, + BetaRawContentBlockStopEvent _ => FilterResult.Content, + BetaRawContentBlockDeltaEvent _ => FilterResult.Content, + BetaRawMessageDeltaEvent => FilterResult.Delta, + BetaRawMessageStartEvent => FilterResult.StartMessage, + BetaRawMessageStopEvent _ => FilterResult.EndMessage, + _ => FilterResult.Ignore, + }; +} diff --git a/src/Anthropic/Helpers/MessageContentAggregator.cs b/src/Anthropic/Helpers/MessageContentAggregator.cs new file mode 100644 index 00000000..173a62d3 --- /dev/null +++ b/src/Anthropic/Helpers/MessageContentAggregator.cs @@ -0,0 +1,183 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Anthropic.Exceptions; +using Anthropic.Models.Messages; +using Anthropic.Services; + +namespace Anthropic.Helpers; + +/// +/// An implementation of the for aggregating BlockDeltaEvents from the method. +/// +public sealed class MessageContentAggregator : SseAggregator +{ + protected override Message GetResult( + IReadOnlyDictionary> messages + ) + { + var content = messages[FilterResult.Content].GroupBy(e => e.Index); + + var startMessage = + messages[FilterResult.StartMessage] + .Select(e => e.Value) + .OfType() + .FirstOrDefault() + ?? throw new AnthropicInvalidDataException("start message not yet received"); + + var endMessageCount = messages[FilterResult.EndMessage].Count; + if (endMessageCount == 0) + { + throw new AnthropicInvalidDataException("stop message not yet received"); + } + + var contentBlocks = new List(); + foreach (var item in content) + { + var startContent = + item.Select(e => e.Value).OfType().FirstOrDefault() + ?? throw new AnthropicInvalidDataException( + "start content message not yet received" + ); + var blockContent = item.Select(e => e.Value) + .OfType() + .ToArray(); + + var contentBlock = startContent.ContentBlock; + contentBlocks.Add(MergeBlock(contentBlock, [.. blockContent.Select(e => e.Delta)])); + } + + var stopSequence = startMessage.Message.StopSequence; + var stopReason = startMessage.Message.StopReason; + var usage = startMessage.Message.Usage; + + if (messages.TryGetValue(FilterResult.Delta, out var deltaEvents)) + { + var deltas = deltaEvents.Select(e => e.Value).OfType(); + foreach (var delta in deltas) + { + stopReason = delta.Delta.StopReason; + stopSequence = delta.Delta.StopSequence; + + usage = usage with { OutputTokens = delta.Usage.OutputTokens }; + if (delta.Usage.InputTokens != null) + { + usage = usage with { InputTokens = delta.Usage.InputTokens.Value }; + } + if (delta.Usage.CacheCreationInputTokens != null) + { + usage = usage with + { + CacheCreationInputTokens = delta.Usage.CacheCreationInputTokens, + }; + } + if (delta.Usage.CacheReadInputTokens != null) + { + usage = usage with { CacheReadInputTokens = delta.Usage.CacheReadInputTokens }; + } + if (delta.Usage.ServerToolUse != null) + { + usage = usage with { ServerToolUse = delta.Usage.ServerToolUse }; + } + } + } + + return new() + { + Content = [.. contentBlocks], + ID = startMessage.Message.ID, + Model = startMessage.Message.Model, + StopReason = stopReason, + StopSequence = stopSequence, + Usage = usage, + }; + } + + private static ContentBlock MergeBlock( + RawContentBlockStartEventContentBlock contentBlock, + IEnumerable blockContents + ) + { + ContentBlock? resultBlock = null; + + string StringJoinHelper( + string source, + IEnumerable sources, + Func expression + ) + { + return string.Join(null, (string[])[source, .. sources.Select(expression)]); + } + + void As(Func, ContentBlock> factory) + { + // those blocks are delta variants not the source block + // e.g TextBlock and TextDelta + resultBlock = factory([.. blockContents.Select(e => e.Value).OfType()]); + } + + IEnumerable Of() + { + return blockContents.Select(e => e.Value).OfType(); + } + + void Single(T item) + { + resultBlock = + (blockContents.Select(e => e.Value).OfType().Single() as ContentBlock) + ?? throw new AnthropicInvalidDataException( + "Could not convert block to content block" + ); + } + + contentBlock.Switch( + textBlock => + As(blocks => new TextBlock() + { + Text = StringJoinHelper(textBlock.Text, blocks, e => e.Text), + Citations = + [ + .. (textBlock.Citations ?? []), + .. Of() + .Select(e => + e.Citation.Match( + f => f, + f => f, + f => f, + f => f, + f => f + ) + ), + ], + }), + thinkingBlock => + As(blocks => new ThinkingBlock() + { + Signature = StringJoinHelper( + thinkingBlock.Signature, + Of(), + e => e.Signature + ), + Thinking = StringJoinHelper(thinkingBlock.Thinking, blocks, e => e.Thinking), + }), + e => Single(e), + e => Single(e), + e => Single(e), + e => Single(e) + ); + + return resultBlock ?? throw new AnthropicInvalidDataException("Missing result block"); + } + + protected override FilterResult Filter(RawMessageStreamEvent message) => + message.Value switch + { + RawContentBlockStartEvent _ => FilterResult.Content, + RawContentBlockStopEvent _ => FilterResult.Content, + RawContentBlockDeltaEvent _ => FilterResult.Content, + RawMessageDeltaEvent => FilterResult.Delta, + RawMessageStartEvent => FilterResult.StartMessage, + RawMessageStopEvent _ => FilterResult.EndMessage, + _ => FilterResult.Ignore, + }; +} diff --git a/src/Anthropic/Helpers/SseAggregator.cs b/src/Anthropic/Helpers/SseAggregator.cs new file mode 100644 index 00000000..b972f7d3 --- /dev/null +++ b/src/Anthropic/Helpers/SseAggregator.cs @@ -0,0 +1,119 @@ +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using Anthropic.Exceptions; + +namespace Anthropic.Helpers; + +/// +/// Defines the base for all Aggregators using ServerSideStreaming events. +/// +/// The raw message base element type. +/// The element type that defines an aggregated +public abstract class SseAggregator +{ + private Dictionary>? _messages; + private bool _streamEnded; + private TResult? _message; + + /// + /// Collects and filters the provided for aggregation with the method. + /// + /// An containing the messages to aggregate. + /// An of all content messages used to build the aggregation result. + /// Will be thrown if the aggregator is in an invalid state. + /// Will be thrown if the aggregator encounters an invalid state of the source message stream. + public virtual async IAsyncEnumerable CollectAsync( + IAsyncEnumerable messageStream + ) + { + if (_messages is not null) + { + throw new InvalidOperationException( + "Cannot collect multiple streams into same aggregator." + ); + } + + _messages = []; + + foreach (FilterResult item in Enum.GetValues(typeof(FilterResult))) + { + _messages[item] = []; + } + + var startMessageReceived = false; + FilterResult filterResult = FilterResult.Ignore; + await foreach (var message in messageStream) + { + Console.WriteLine(message); + if (!_streamEnded) + { + if (!startMessageReceived && Filter(message) != FilterResult.StartMessage) + { + _messages[FilterResult.StartMessage].Add(message); + continue; + } + + startMessageReceived = true; + filterResult = Filter(message); + _messages[filterResult].Add(message); + if (filterResult == FilterResult.EndMessage) + { + break; + } + } + + yield return message; + } + + if (filterResult != FilterResult.EndMessage) + { + throw new AnthropicInvalidDataException( + $"Expected last message to be the End message but found: {filterResult}" + ); + } + + _streamEnded = true; + } + + /// + /// Aggregates all items based on the Anthropic streaming protocol present in the provided on initialization. + /// + /// The result of the aggregation. + public virtual TResult Message() + { + if (_messages == null) + { + throw new AnthropicInvalidDataException("Stream was not passed to aggregator"); + } + + return _message ??= GetResult( + new ReadOnlyDictionary>(_messages) + ); + } + + /// + /// Applies a filter to each individual message. + /// + /// The message to filter. + /// [True] if the message should be included in the aggregation result, otherwise [False] + protected abstract FilterResult Filter(TMessage message); + + /// + /// Gets an aggregation result of the collected list of messages. + /// + /// The read only list of messages. + /// The aggregation result. + protected abstract TResult GetResult( + IReadOnlyDictionary> messages + ); + + protected enum FilterResult + { + Ignore = 0, + StartMessage = 1, + Delta = 2, + Content = 3, + EndMessage = 4, + } +} diff --git a/src/Anthropic/Models/Beta/Messages/BetaClearToolUses20250919Edit.cs b/src/Anthropic/Models/Beta/Messages/BetaClearToolUses20250919Edit.cs index cac1b505..d2ce73ff 100644 --- a/src/Anthropic/Models/Beta/Messages/BetaClearToolUses20250919Edit.cs +++ b/src/Anthropic/Models/Beta/Messages/BetaClearToolUses20250919Edit.cs @@ -446,9 +446,9 @@ public JsonElement Type get { return Match(betaInputTokens: (x) => x.Type, betaToolUses: (x) => x.Type); } } - public long Value1 + public long ValueValue { - get { return Match(betaInputTokens: (x) => x.Value, betaToolUses: (x) => x.Value); } + get { return Match(betaInputTokens: (x) => x.ValueValue, betaToolUses: (x) => x.Value); } } public Trigger(BetaInputTokensTrigger value, JsonElement? element = null) diff --git a/src/Anthropic/Models/Beta/Messages/BetaInputTokensTrigger.cs b/src/Anthropic/Models/Beta/Messages/BetaInputTokensTrigger.cs index f1f75820..2c298d3a 100644 --- a/src/Anthropic/Models/Beta/Messages/BetaInputTokensTrigger.cs +++ b/src/Anthropic/Models/Beta/Messages/BetaInputTokensTrigger.cs @@ -21,7 +21,7 @@ public JsonElement Type init { this._rawData.Set("type", value); } } - public required long Value + public required long ValueValue { get { @@ -38,7 +38,7 @@ public override void Validate() { throw new AnthropicInvalidDataException("Invalid value given for constant"); } - _ = this.Value; + _ = this.ValueValue; } public BetaInputTokensTrigger() @@ -76,10 +76,10 @@ IReadOnlyDictionary rawData } [SetsRequiredMembers] - public BetaInputTokensTrigger(long value) + public BetaInputTokensTrigger(long valueValue) : this() { - this.Value = value; + this.ValueValue = valueValue; } } diff --git a/src/Anthropic/Services/Beta/MessageService.cs b/src/Anthropic/Services/Beta/MessageService.cs index 47a99b95..b804354a 100644 --- a/src/Anthropic/Services/Beta/MessageService.cs +++ b/src/Anthropic/Services/Beta/MessageService.cs @@ -124,7 +124,13 @@ public async Task> Create( ._client.WithOptions(options => options with { - Timeout = options.Timeout ?? TimeSpan.FromMinutes(10), + Timeout = + options.Timeout + ?? ClientOptions.TimeoutFromMaxTokens( + parameters.MaxTokens, + isStreaming: false, + parameters.Model + ), } ) .Execute(request, cancellationToken) @@ -172,7 +178,13 @@ public async Task> CreateStream ._client.WithOptions(options => options with { - Timeout = options.Timeout ?? TimeSpan.FromMinutes(10), + Timeout = + options.Timeout + ?? ClientOptions.TimeoutFromMaxTokens( + parameters.MaxTokens, + isStreaming: true, + parameters.Model + ), } ) .Execute(request, cancellationToken) diff --git a/src/Anthropic/Services/Beta/Messages/AnthropicBetaClientExtensions.cs b/src/Anthropic/Services/Beta/Messages/AnthropicBetaClientExtensions.cs index 1736674e..72a8c46c 100644 --- a/src/Anthropic/Services/Beta/Messages/AnthropicBetaClientExtensions.cs +++ b/src/Anthropic/Services/Beta/Messages/AnthropicBetaClientExtensions.cs @@ -197,6 +197,87 @@ is string version { _ = schemaObj.Remove(propName); } + + // Handle nullable union types (e.g., "type": ["integer", "null"]) for + // non-required properties. The C# MCP SDK generates these for optional + // parameters, but structured outputs doesn't support union types. + // Only transform non-required properties; required + nullable is an + // incompatible schema that should fail at the API level. + if ( + schemaObj.TryGetPropertyValue("properties", out JsonNode? propsNode) + && propsNode is JsonObject propsObj + ) + { + HashSet requiredProps = []; + if ( + schemaObj.TryGetPropertyValue("required", out JsonNode? reqNode) + && reqNode is JsonArray reqArray + ) + { + foreach (JsonNode? req in reqArray) + { + if (req?.GetValue() is string reqName) + { + requiredProps.Add(reqName); + } + } + } + + foreach (var prop in propsObj) + { + if (requiredProps.Contains(prop.Key)) + { + continue; + } + + if (prop.Value is not JsonObject propSchema) + { + continue; + } + + if ( + !propSchema.TryGetPropertyValue("type", out JsonNode? typeNode) + || typeNode is not JsonArray typeArray + ) + { + continue; + } + + int nullIndex = -1; + for (int i = 0; i < typeArray.Count; i++) + { + if (typeArray[i]?.GetValue() == "null") + { + nullIndex = i; + break; + } + } + + if (nullIndex < 0) + { + continue; + } + + typeArray.RemoveAt(nullIndex); + + if (typeArray.Count == 1) + { + propSchema["type"] = typeArray[0]?.DeepClone(); + } + + if ( + propSchema.TryGetPropertyValue( + "default", + out JsonNode? defaultNode + ) + && defaultNode is JsonValue defaultValue + && defaultValue.GetValueKind() == JsonValueKind.Null + ) + { + propSchema.Remove("default"); + } + } + } } return schemaNode; @@ -516,7 +597,8 @@ out List? systemMessages { if (content is TextContent tc) { - (systemMessages ??= []).Add(new() { Text = tc.Text }); + var block = new BetaTextBlockParam { Text = tc.Text }; + (systemMessages ??= []).Add(WithCacheControlFrom(block, tc)); } } @@ -541,43 +623,65 @@ out List? systemMessages text = text.TrimEnd(); if (!string.IsNullOrWhiteSpace(text)) { - contents.Add(new BetaTextBlockParam() { Text = text }); + contents.Add( + WithCacheControlFrom( + new BetaTextBlockParam() { Text = text }, + tc + ) + ); } } else if (!string.IsNullOrWhiteSpace(text)) { - contents.Add(new BetaTextBlockParam() { Text = text }); + contents.Add( + WithCacheControlFrom( + new BetaTextBlockParam() { Text = text }, + tc + ) + ); } break; case TextReasoningContent trc when !string.IsNullOrEmpty(trc.Text): contents.Add( - new BetaThinkingBlockParam() - { - Thinking = trc.Text, - Signature = trc.ProtectedData ?? string.Empty, - } + WithCacheControlFrom( + new BetaThinkingBlockParam() + { + Thinking = trc.Text, + Signature = trc.ProtectedData ?? string.Empty, + }, + trc + ) ); break; case TextReasoningContent trc when !string.IsNullOrEmpty(trc.ProtectedData): contents.Add( - new BetaRedactedThinkingBlockParam() { Data = trc.ProtectedData! } + WithCacheControlFrom( + new BetaRedactedThinkingBlockParam() + { + Data = trc.ProtectedData!, + }, + trc + ) ); break; case DataContent dc when dc.HasTopLevelMediaType("image"): contents.Add( - new BetaImageBlockParam() - { - Source = new( - new BetaBase64ImageSource() - { - Data = dc.Base64Data.ToString(), - MediaType = dc.MediaType, - } - ), - } + WithCacheControlFrom( + new BetaImageBlockParam() + { + Source = new( + new BetaBase64ImageSource() + { + Data = dc.Base64Data.ToString(), + MediaType = dc.MediaType, + } + ), + }, + dc + ) ); break; @@ -588,44 +692,53 @@ when string.Equals( StringComparison.OrdinalIgnoreCase ): contents.Add( - new BetaRequestDocumentBlock() - { - Source = new( - new BetaBase64PdfSource() - { - Data = dc.Base64Data.ToString(), - } - ), - } + WithCacheControlFrom( + new BetaRequestDocumentBlock() + { + Source = new( + new BetaBase64PdfSource() + { + Data = dc.Base64Data.ToString(), + } + ), + }, + dc + ) ); break; case DataContent dc when dc.HasTopLevelMediaType("text"): contents.Add( - new BetaRequestDocumentBlock() - { - Source = new( - new BetaPlainTextSource() - { + WithCacheControlFrom( + new BetaRequestDocumentBlock() + { + Source = new( + new BetaPlainTextSource() + { #if NET - Data = Encoding.UTF8.GetString(dc.Data.Span), + Data = Encoding.UTF8.GetString(dc.Data.Span), #else - Data = Encoding.UTF8.GetString(dc.Data.ToArray()), + Data = Encoding.UTF8.GetString(dc.Data.ToArray()), #endif - } - ), - } + } + ), + }, + dc + ) ); break; case UriContent uc when uc.HasTopLevelMediaType("image"): contents.Add( - new BetaImageBlockParam() - { - Source = new( - new BetaUrlImageSource() { Url = uc.Uri.AbsoluteUri } - ), - } + WithCacheControlFrom( + new BetaImageBlockParam() + { + Source = new( + new BetaUrlImageSource() { Url = uc.Uri.AbsoluteUri } + ), + }, + uc + ) ); break; @@ -636,44 +749,53 @@ when string.Equals( StringComparison.OrdinalIgnoreCase ): contents.Add( - new BetaRequestDocumentBlock() - { - Source = new( - new BetaUrlPdfSource() { Url = uc.Uri.AbsoluteUri } - ), - } + WithCacheControlFrom( + new BetaRequestDocumentBlock() + { + Source = new( + new BetaUrlPdfSource() { Url = uc.Uri.AbsoluteUri } + ), + }, + uc + ) ); break; case HostedFileContent fc: contents.Add( - new BetaRequestDocumentBlock() - { - Source = new(new BetaFileDocumentSource(fc.FileId)), - } + WithCacheControlFrom( + new BetaRequestDocumentBlock() + { + Source = new(new BetaFileDocumentSource(fc.FileId)), + }, + fc + ) ); break; case FunctionCallContent fcc: contents.Add( - new BetaToolUseBlockParam() - { - ID = fcc.CallId, - Name = fcc.Name, - Input = - fcc.Arguments?.ToDictionary( - e => e.Key, - e => - e.Value is JsonElement je - ? je - : JsonSerializer.SerializeToElement( - e.Value, - AIJsonUtilities.DefaultOptions.GetTypeInfo( - typeof(object) + WithCacheControlFrom( + new BetaToolUseBlockParam() + { + ID = fcc.CallId, + Name = fcc.Name, + Input = + fcc.Arguments?.ToDictionary( + e => e.Key, + e => + e.Value is JsonElement je + ? je + : JsonSerializer.SerializeToElement( + e.Value, + AIJsonUtilities.DefaultOptions.GetTypeInfo( + typeof(object) + ) ) - ) - ) ?? [], - } + ) ?? [], + }, + fcc + ) ); break; @@ -824,12 +946,15 @@ when string.Equals( } contents.Add( - new BetaToolResultBlockParam() - { - ToolUseID = frc.CallId, - IsError = frc.Exception is not null, - Content = result, - } + WithCacheControlFrom( + new BetaToolResultBlockParam() + { + ToolUseID = frc.CallId, + IsError = frc.Exception is not null, + Content = result, + }, + frc + ) ); break; } @@ -857,6 +982,51 @@ when string.Equals( return messageParams; } + /// + /// Applies cache control from an to a beta content block param if configured. + /// + /// + /// Converts from (used by the extension) + /// to (used by the beta API). + /// Note: BetaThinkingBlockParam and BetaRedactedThinkingBlockParam do not support cache control. + /// + private static T WithCacheControlFrom(T block, AIContent content) + where T : class + { + var cacheControl = content.GetCacheControl(); + if (cacheControl is null) + { + return block; + } + + // Convert non-beta CacheControlEphemeral to BetaCacheControlEphemeral + // Note: Ttl enum exists in both namespaces, using fully qualified names to disambiguate + var betaCacheControl = new BetaCacheControlEphemeral + { + Ttl = cacheControl.Ttl?.Value() switch + { + Anthropic.Models.Messages.Ttl.Ttl5m => Anthropic.Models.Beta.Messages.Ttl.Ttl5m, + Anthropic.Models.Messages.Ttl.Ttl1h => Anthropic.Models.Beta.Messages.Ttl.Ttl1h, + _ => null, + }, + }; + + return block switch + { + BetaTextBlockParam tb => (tb with { CacheControl = betaCacheControl }) as T + ?? block, + BetaImageBlockParam ib => (ib with { CacheControl = betaCacheControl }) as T + ?? block, + BetaRequestDocumentBlock db => (db with { CacheControl = betaCacheControl }) as T + ?? block, + BetaToolUseBlockParam tub => (tub with { CacheControl = betaCacheControl }) as T + ?? block, + BetaToolResultBlockParam trb => (trb with { CacheControl = betaCacheControl }) as T + ?? block, + _ => block, + }; + } + private MessageCreateParams GetMessageCreateParams( List messages, List? systemMessages, @@ -998,20 +1168,6 @@ createParams.OutputFormat is null JsonElement inputSchema = af.JsonSchema; if (inputSchema.ValueKind is JsonValueKind.Object) { - if ( - inputSchema.TryGetProperty( - "properties", - out JsonElement propsElement - ) - && propsElement.ValueKind is JsonValueKind.Object - ) - { - foreach (JsonProperty p in propsElement.EnumerateObject()) - { - properties[p.Name] = p.Value; - } - } - if ( inputSchema.TryGetProperty( "required", @@ -1032,6 +1188,23 @@ r.ValueKind is JsonValueKind.String } } } + + if ( + inputSchema.TryGetProperty( + "properties", + out JsonElement propsElement + ) + && propsElement.ValueKind is JsonValueKind.Object + ) + { + foreach (JsonProperty p in propsElement.EnumerateObject()) + { + // Transform nullable union types for non-required properties + properties[p.Name] = required.Contains(p.Name) + ? p.Value + : TransformNullableUnionType(p.Value); + } + } } (createdTools ??= []).Add( @@ -1515,6 +1688,77 @@ out Uri? url return annotation; } + /// + /// Transforms a property schema with nullable union type (e.g., "type": ["integer", "null"]) + /// to a simple type (e.g., "type": "integer") and removes "default": null. + /// This handles schemas generated by the C# MCP SDK for optional nullable parameters. + /// + private static JsonElement TransformNullableUnionType(JsonElement propertySchema) + { + if (propertySchema.ValueKind is not JsonValueKind.Object) + { + return propertySchema; + } + + if ( + !propertySchema.TryGetProperty("type", out JsonElement typeElement) + || typeElement.ValueKind is not JsonValueKind.Array + ) + { + return propertySchema; + } + + List nonNullTypes = []; + bool hasNull = false; + foreach (JsonElement t in typeElement.EnumerateArray()) + { + if (t.ValueKind is JsonValueKind.String && t.GetString() == "null") + { + hasNull = true; + } + else + { + nonNullTypes.Add(t); + } + } + + if (!hasNull || nonNullTypes.Count == 0) + { + return propertySchema; + } + + var transformed = new Dictionary(); + foreach (JsonProperty prop in propertySchema.EnumerateObject()) + { + if (prop.Name == "type") + { + if (nonNullTypes.Count == 1) + { + transformed["type"] = nonNullTypes[0]; + } + else + { + transformed["type"] = JsonSerializer.SerializeToElement( + nonNullTypes.Select(t => t.GetString()) + ); + } + } + else if (prop.Name == "default") + { + if (prop.Value.ValueKind is not JsonValueKind.Null) + { + transformed[prop.Name] = prop.Value; + } + } + else + { + transformed[prop.Name] = prop.Value; + } + } + + return JsonSerializer.SerializeToElement(transformed); + } + private sealed class StreamingFunctionData { public string CallId { get; set; } = ""; diff --git a/src/Anthropic/Services/MessageService.cs b/src/Anthropic/Services/MessageService.cs index f90f19e6..425385af 100644 --- a/src/Anthropic/Services/MessageService.cs +++ b/src/Anthropic/Services/MessageService.cs @@ -124,7 +124,13 @@ public async Task> Create( ._client.WithOptions(options => options with { - Timeout = options.Timeout ?? TimeSpan.FromMinutes(10), + Timeout = + options.Timeout + ?? ClientOptions.TimeoutFromMaxTokens( + parameters.MaxTokens, + isStreaming: false, + parameters.Model + ), } ) .Execute(request, cancellationToken) @@ -170,7 +176,13 @@ public async Task> CreateStreaming( ._client.WithOptions(options => options with { - Timeout = options.Timeout ?? TimeSpan.FromMinutes(10), + Timeout = + options.Timeout + ?? ClientOptions.TimeoutFromMaxTokens( + parameters.MaxTokens, + isStreaming: true, + parameters.Model + ), } ) .Execute(request, cancellationToken) diff --git a/src/Anthropic/SseAggregatorExtensions.cs b/src/Anthropic/SseAggregatorExtensions.cs new file mode 100644 index 00000000..82a35701 --- /dev/null +++ b/src/Anthropic/SseAggregatorExtensions.cs @@ -0,0 +1,68 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using Anthropic.Helpers; +using Anthropic.Models.Beta.Messages; +using Anthropic.Models.Messages; + +namespace Anthropic; + +/// +/// Extension methods for providing easy access to Aggregators +/// +public static class SseAggregatorExtensions +{ + /// + /// Aggregates all messages received by the streaming event and aggregates them into a single object once the sender indicates a fully delivered stream. + /// + /// A enumerable as provided by the + /// A task that completes once all messages have been received or in the event of improper streaming and exception. + public static async Task Aggregate(this IAsyncEnumerable source) + { + return await new MessageContentAggregator().Aggregate(source).ConfigureAwait(false); + } + + /// + /// Aggregates all messages received by the streaming event and aggregates them into a single object once the sender indicates a fully delivered stream. + /// + /// A enumerable as provided by the + /// A task that completes once all messages have been received or in the event of improper streaming and exception. + public static async Task Aggregate( + this IAsyncEnumerable source + ) + { + return await new BetaMessageContentAggregator().Aggregate(source).ConfigureAwait(false); + } + + /// + /// Aggregates all messages received by the streaming event and aggregates them into a single object once the sender indicates a fully delivered stream. + /// + /// The type of message as provided by the Api. + /// The Result object that the aggregator should build. + /// The aggregator instance that will collect messages and build the result object. + /// The source stream of messages. + /// A task that completes after all messages from the source have been consumed. + public static async Task Aggregate( + this SseAggregator aggregator, + IAsyncEnumerable source + ) + { + await foreach (var _ in aggregator.CollectAsync(source)) { } + return aggregator.Message(); + } + + /// + /// Aggregates all messages received by the streaming event and collects them in the provided aggregator. + /// + /// The type of message as provided by the Api. + /// The Result object that the aggregator should build. + /// The source stream of messages. + /// The aggregator instance that will collect messages and build the result object. + /// An filtered by the aggregator. + public static IAsyncEnumerable CollectAsync( + this IAsyncEnumerable source, + SseAggregator aggregator + ) + { + return aggregator.CollectAsync(source); + } +}