diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 0000000..d1c262a
--- /dev/null
+++ b/.editorconfig
@@ -0,0 +1,114 @@
+# EditorConfig to support per-solution formatting.
+# Use the EditorConfig VS add-in to make this work.
+# http://editorconfig.org/
+root = true
+
+[*]
+indent_style = space
+charset = utf-8
+trim_trailing_whitespace = true
+insert_final_newline = false
+
+[*.cs]
+indent_size = 4
+#
+# Language Conventions
+#
+# https://docs.microsoft.com/en-us/visualstudio/ide/editorconfig-language-conventions?view=vs-2017#this-and-me
+dotnet_style_qualification_for_field = false
+dotnet_style_qualification_for_property = false
+dotnet_style_qualification_for_method = false
+dotnet_style_qualification_for_event = false
+# https://docs.microsoft.com/en-us/visualstudio/ide/editorconfig-language-conventions?view=vs-2017#language-keywords
+dotnet_style_predefined_type_for_locals_parameters_members = true
+dotnet_style_predefined_type_for_member_access = true
+# https://docs.microsoft.com/en-us/visualstudio/ide/editorconfig-language-conventions?view=vs-2017#normalize-modifiers
+dotnet_style_require_accessibility_modifiers = always
+dotnet_style_readonly_field = true
+csharp_preferred_modifier_order = public,private,protected,internal,static,extern,new,virtual,abstract,sealed,override,readonly,unsafe,volatile,async
+# https://docs.microsoft.com/en-us/visualstudio/ide/editorconfig-language-conventions?view=vs-2017#parentheses-preferences
+dotnet_style_parentheses_in_arithmetic_binary_operators = always_for_clarity
+dotnet_style_parentheses_in_relational_binary_operators = always_for_clarity
+dotnet_style_parentheses_in_other_binary_operators = always_for_clarity
+dotnet_style_parentheses_in_other_operators = never_if_unnecessary
+# https://docs.microsoft.com/en-us/visualstudio/ide/editorconfig-language-conventions?view=vs-2017#expression-level-preferences
+dotnet_style_object_initializer = true
+dotnet_style_collection_initializer = true
+dotnet_style_explicit_tuple_names = true
+dotnet_style_prefer_inferred_tuple_names = true
+dotnet_style_prefer_inferred_anonymous_type_member_names = true
+dotnet_style_prefer_auto_properties = true
+dotnet_style_prefer_conditional_expression_over_assignment = true
+dotnet_style_prefer_conditional_expression_over_return = true
+dotnet_style_prefer_compound_assignment = true
+# https://docs.microsoft.com/en-us/visualstudio/ide/editorconfig-language-conventions?view=vs-2017#null-checking-preferences
+dotnet_style_coalesce_expression = true
+dotnet_style_null_propagation = true
+#
+# Formatting Conventions
+#
+# https://docs.microsoft.com/en-us/visualstudio/ide/editorconfig-formatting-conventions?view=vs-2017#organize-using-directives
+dotnet_sort_system_directives_first = true
+dotnet_separate_import_directive_groups = false
+# https://docs.microsoft.com/en-us/visualstudio/ide/editorconfig-formatting-conventions?view=vs-2017#indentation-options
+csharp_indent_case_contents = true
+csharp_indent_switch_labels = true
+csharp_indent_labels = flush_left
+csharp_indent_block_contents = true
+csharp_indent_braces = false
+csharp_indent_case_contents_when_block = true
+# https://docs.microsoft.com/en-us/visualstudio/ide/editorconfig-formatting-conventions?view=vs-2017#spacing-options
+csharp_space_after_cast = true
+csharp_space_after_keywords_in_control_flow_statements = true
+csharp_space_between_parentheses = false
+csharp_space_before_colon_in_inheritance_clause = true
+csharp_space_after_colon_in_inheritance_clause = true
+csharp_space_around_binary_operators = before_and_after
+csharp_space_between_method_declaration_parameter_list_parentheses = false
+csharp_space_between_method_declaration_empty_parameter_list_parentheses = false
+csharp_space_between_method_declaration_name_and_open_parenthesis = false
+csharp_space_between_method_call_parameter_list_parentheses = false
+csharp_space_between_method_call_empty_parameter_list_parentheses = false
+csharp_space_between_method_call_name_and_opening_parenthesis = false
+csharp_space_after_comma = true
+csharp_space_before_comma = false
+csharp_space_after_dot = false
+csharp_space_before_dot = false
+csharp_space_after_semicolon_in_for_statement = true
+csharp_space_before_semicolon_in_for_statement = false
+csharp_space_around_declaration_statements = false
+csharp_space_before_open_square_brackets = false
+csharp_space_between_empty_square_brackets = false
+csharp_space_between_square_brackets = false
+# https://docs.microsoft.com/en-us/visualstudio/ide/editorconfig-formatting-conventions?view=vs-2017#wrap-options
+csharp_preserve_single_line_statements = false
+csharp_preserve_single_line_blocks = true
+
+# name all constant fields using PascalCase
+dotnet_naming_rule.constant_fields_should_be_pascal_case.severity = suggestion
+dotnet_naming_rule.constant_fields_should_be_pascal_case.symbols = constant_fields
+dotnet_naming_rule.constant_fields_should_be_pascal_case.style = pascal_case_style
+dotnet_naming_symbols.constant_fields.applicable_kinds = field
+dotnet_naming_symbols.constant_fields.required_modifiers = const
+dotnet_naming_style.pascal_case_style.capitalization = pascal_case
+# internal and private fields should be _camelCase
+dotnet_naming_rule.camel_case_for_private_internal_fields.severity = suggestion
+dotnet_naming_rule.camel_case_for_private_internal_fields.symbols = private_internal_fields
+dotnet_naming_rule.camel_case_for_private_internal_fields.style = camel_case_underscore_style
+dotnet_naming_symbols.private_internal_fields.applicable_kinds = field
+dotnet_naming_symbols.private_internal_fields.applicable_accessibilities = private, internal
+dotnet_naming_style.camel_case_underscore_style.required_prefix = _
+dotnet_naming_style.camel_case_underscore_style.capitalization = camel_case
+
+[*.{xml,config,*proj,nuspec,props,resx,targets,yml,tasks}]
+indent_size = 2
+
+[*.json]
+indent_size = 2
+
+[*.{ps1,psm1}]
+indent_size = 4
+
+[*.sh]
+indent_size = 4
+end_of_line = lf
\ No newline at end of file
diff --git a/.github/workflows/ci-pipeline.yml b/.github/workflows/ci-pipeline.yml
new file mode 100644
index 0000000..11c0604
--- /dev/null
+++ b/.github/workflows/ci-pipeline.yml
@@ -0,0 +1,25 @@
+name: CI Pipeline
+
+on:
+ push:
+ branches: [ master ]
+ pull_request:
+ branches: [ master ]
+
+jobs:
+ build:
+
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v2
+ - name: Setup .NET Core
+ uses: actions/setup-dotnet@v1
+ with:
+ dotnet-version: 3.1.401
+ - name: Install dependencies
+ run: dotnet restore
+ - name: Build
+ run: dotnet build --configuration Release --no-restore
+ - name: Test
+ run: dotnet test --no-restore --verbosity normal
\ No newline at end of file
diff --git a/.github/workflows/release-builds.yml b/.github/workflows/release-builds.yml
new file mode 100644
index 0000000..b4312ab
--- /dev/null
+++ b/.github/workflows/release-builds.yml
@@ -0,0 +1,26 @@
+name: Release Builds
+
+on:
+ create:
+ tags:
+ - '*'
+
+jobs:
+ build:
+
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v2
+ - name: Setup .NET Core
+ uses: actions/setup-dotnet@v1
+ with:
+ dotnet-version: 3.1.401
+ - name: Install dependencies
+ run: dotnet restore
+ - name: Build
+ run: dotnet build --configuration Release --no-restore
+ - name: Test
+ run: dotnet test --no-restore --verbosity normal
+ - name: Publish Package
+ run: dotnet nuget push "artifacts/nupkgs/*.nupkg" -s ${{ secrets.NUGET_URL }} -k ${{ secrets.NUGET_TOKEN }}
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 3a2238d..d4412b8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,7 +1,10 @@
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.
+##
+## Get latest from https://github.com/github/gitignore/blob/master/VisualStudio.gitignore
# User-specific files
+*.rsuser
*.suo
*.user
*.userosscache
@@ -10,48 +13,68 @@
# User-specific files (MonoDevelop/Xamarin Studio)
*.userprefs
+# Mono auto generated files
+mono_crash.*
+
# Build results
[Dd]ebug/
[Dd]ebugPublic/
[Rr]elease/
[Rr]eleases/
-[Xx]64/
-[Xx]86/
-[Bb]uild/
+x64/
+x86/
+[Aa][Rr][Mm]/
+[Aa][Rr][Mm]64/
bld/
[Bb]in/
[Oo]bj/
+[Ll]og/
-# Visual Studio 2015 cache/options directory
+# Visual Studio 2015/2017 cache/options directory & Rider
.vs/
+.idea/
# Uncomment if you have tasks that create the project's static files in wwwroot
#wwwroot/
+# Visual Studio 2017 auto generated files
+Generated\ Files/
+
# MSTest test Results
[Tt]est[Rr]esult*/
[Bb]uild[Ll]og.*
-# NUNIT
+# NUnit
*.VisualState.xml
TestResult.xml
+nunit-*.xml
# Build Results of an ATL Project
[Dd]ebugPS/
[Rr]eleasePS/
dlldata.c
-# DNX
+# Benchmark Results
+BenchmarkDotNet.Artifacts/
+
+# .NET Core
project.lock.json
+project.fragment.lock.json
artifacts/
+# StyleCop
+StyleCopReport.xml
+
+# Files built by Visual Studio
*_i.c
*_p.c
-*_i.h
+*_h.h
*.ilk
*.meta
*.obj
+*.iobj
*.pch
*.pdb
+*.ipdb
*.pgc
*.pgd
*.rsp
@@ -61,6 +84,7 @@ artifacts/
*.tlh
*.tmp
*.tmp_proj
+*_wpftmp.csproj
*.log
*.vspscc
*.vssscc
@@ -81,6 +105,7 @@ ipch/
*.sdf
*.cachefile
*.VC.db
+*.VC.VC.opendb
# Visual Studio profiler
*.psess
@@ -88,6 +113,9 @@ ipch/
*.vspx
*.sap
+# Visual Studio Trace Files
+*.e2e
+
# TFS 2012 Local Workspace
$tf/
@@ -108,6 +136,14 @@ _TeamCity*
# DotCover is a Code Coverage Tool
*.dotCover
+# AxoCover is a Code Coverage Tool
+.axoCover/*
+!.axoCover/settings.json
+
+# Visual Studio code coverage results
+*.coverage
+*.coveragexml
+
# NCrunch
_NCrunch_*
.*crunch*.local.xml
@@ -139,22 +175,27 @@ publish/
# Publish Web Output
*.[Pp]ublish.xml
*.azurePubxml
-
-# TODO: Un-comment the next line if you do not want to checkin
-# your web deploy settings because they may include unencrypted
-# passwords
-#*.pubxml
+# Note: Comment the next line if you want to checkin your web deploy settings,
+# but database connection strings (with potential passwords) will be unencrypted
+*.pubxml
*.publishproj
+# Microsoft Azure Web App publish settings. Comment the next line if you want to
+# checkin your Azure Web App publish settings, but sensitive information contained
+# in these scripts will be unencrypted
+PublishScripts/
+
# NuGet Packages
*.nupkg
+# NuGet Symbol Packages
+*.snupkg
# The packages folder can be ignored because of Package Restore
-**/packages/*
+**/[Pp]ackages/*
# except build/, which is used as an MSBuild target.
-!**/packages/build/
+!**/[Pp]ackages/build/
# Uncomment if necessary however generally it will be regenerated when needed
-#!**/packages/repositories.config
-# NuGet v3's project.json files produces more ignoreable files
+#!**/[Pp]ackages/repositories.config
+# NuGet v3's project.json files produces more ignorable files
*.nuget.props
*.nuget.targets
@@ -166,31 +207,40 @@ csx/
ecf/
rcf/
-# Microsoft Azure ApplicationInsights config file
-ApplicationInsights.config
-
-# Windows Store app package directory
+# Windows Store app package directories and files
AppPackages/
BundleArtifacts/
+Package.StoreAssociation.xml
+_pkginfo.txt
+*.appx
+*.appxbundle
+*.appxupload
# Visual Studio cache files
# files ending in .cache can be ignored
*.[Cc]ache
# but keep track of directories ending in .cache
-!*.[Cc]ache/
+!?*.[Cc]ache/
# Others
ClientBin/
-[Ss]tyle[Cc]op.*
~$*
*~
*.dbmdl
*.dbproj.schemaview
+*.jfm
*.pfx
*.publishsettings
-node_modules/
orleans.codegen.cs
+# Including strong name files can present a security risk
+# (https://github.com/github/gitignore/pull/2483#issue-259490424)
+#*.snk
+
+# Since there are multiple workflows, uncomment next line to ignore bower_components
+# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622)
+#bower_components/
+
# RIA/Silverlight projects
Generated_Code/
@@ -201,15 +251,22 @@ _UpgradeReport_Files/
Backup*/
UpgradeLog*.XML
UpgradeLog*.htm
+ServiceFabricBackup/
+*.rptproj.bak
# SQL Server files
*.mdf
*.ldf
+*.ndf
# Business Intelligence projects
*.rdl.data
*.bim.layout
*.bim_*.settings
+*.rptproj.rsuser
+*- [Bb]ackup.rdl
+*- [Bb]ackup ([0-9]).rdl
+*- [Bb]ackup ([0-9][0-9]).rdl
# Microsoft Fakes
FakesAssemblies/
@@ -219,6 +276,7 @@ FakesAssemblies/
# Node.js Tools for Visual Studio
.ntvs_analysis.dat
+node_modules/
# Visual Studio 6 build log
*.plg
@@ -226,6 +284,9 @@ FakesAssemblies/
# Visual Studio 6 workspace options file
*.opt
+# Visual Studio 6 auto-generated workspace file (contains which files were open etc.)
+*.vbw
+
# Visual Studio LightSwitch build output
**/*.HTMLClient/GeneratedArtifacts
**/*.DesktopClient/GeneratedArtifacts
@@ -234,12 +295,59 @@ FakesAssemblies/
**/*.Server/ModelManifest.xml
_Pvt_Extensions
-# LightSwitch generated files
-GeneratedArtifacts/
-ModelManifest.xml
-
# Paket dependency manager
.paket/paket.exe
+paket-files/
# FAKE - F# Make
-.fake/
\ No newline at end of file
+.fake/
+
+# CodeRush personal settings
+.cr/personal
+
+# Python Tools for Visual Studio (PTVS)
+__pycache__/
+*.pyc
+
+# Cake - Uncomment if you are using it
+# tools/**
+# !tools/packages.config
+
+# Tabs Studio
+*.tss
+
+# Telerik's JustMock configuration file
+*.jmconfig
+
+# BizTalk build output
+*.btp.cs
+*.btm.cs
+*.odx.cs
+*.xsd.cs
+
+# OpenCover UI analysis results
+OpenCover/
+
+# Azure Stream Analytics local run output
+ASALocalRun/
+
+# MSBuild Binary and Structured Log
+*.binlog
+
+# NVidia Nsight GPU debugger configuration file
+*.nvuser
+
+# MFractors (Xamarin productivity tool) working folder
+.mfractor/
+
+# Local History for Visual Studio
+.localhistory/
+
+# BeatPulse healthcheck temp database
+healthchecksdb
+
+# Backup folder for Package Reference Convert tool in Visual Studio 2017
+MigrationBackup/
+
+# Ionide (cross platform F# VS Code tools) working folder
+.ionide/
\ No newline at end of file
diff --git a/Directory.Build.props b/Directory.Build.props
new file mode 100644
index 0000000..3d2f34a
--- /dev/null
+++ b/Directory.Build.props
@@ -0,0 +1,21 @@
+
+
+ Im5tu;Stuart Blackler
+ OpenMessage;Messaging;ServiceBus;AspNetCore;ESB
+ https://github.com/Im5tu/OpenMessage
+ git
+ https://github.com/Im5tu/OpenMessage.git
+
+ Latest
+
+ enable
+
+ true
+
+ $(WarningsNotAsErrors);CS1591
+
+ $(WarningsNotAsErrors);xUnit1004
+
+ $(MSBuildThisFileDirectory)
+
+
\ No newline at end of file
diff --git a/Directory.Build.targets b/Directory.Build.targets
new file mode 100644
index 0000000..5960090
--- /dev/null
+++ b/Directory.Build.targets
@@ -0,0 +1,5 @@
+
+
+
+
+
\ No newline at end of file
diff --git a/OpenMessage.sln b/OpenMessage.sln
index a1ae23c..31957ca 100644
--- a/OpenMessage.sln
+++ b/OpenMessage.sln
@@ -1,30 +1,101 @@
Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio 14
-VisualStudioVersion = 14.0.25420.1
+# Visual Studio Version 16
+VisualStudioVersion = 16.0.28721.148
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{E8751802-CCE4-4C40-9241-E5E01DBEF627}"
+ ProjectSection(SolutionItems) = preProject
+ src\Directory.Build.props = src\Directory.Build.props
+ src\Directory.Build.targets = src\Directory.Build.targets
+ EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{F8E4FD3C-F93D-40B6-9330-9415FB35B3E2}"
ProjectSection(SolutionItems) = preProject
- global.json = global.json
+ .editorconfig = .editorconfig
+ .gitattributes = .gitattributes
+ .gitignore = .gitignore
+ build.ps1 = build.ps1
+ Directory.Build.props = Directory.Build.props
+ Directory.Build.targets = Directory.Build.targets
+ readme.md = readme.md
EndProjectSection
EndProject
-Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "OpenMessage", "src\OpenMessage\OpenMessage.xproj", "{7D68A283-6D90-4A50-B015-D8C2BB5C7184}"
-EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{B4E58D4F-40CD-45CA-AC6F-1DF4D9CB9A49}"
+ ProjectSection(SolutionItems) = preProject
+ tests\Directory.Build.props = tests\Directory.Build.props
+ tests\Directory.Build.targets = tests\Directory.Build.targets
+ EndProjectSection
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage", "src\OpenMessage\OpenMessage.csproj", "{7D68A283-6D90-4A50-B015-D8C2BB5C7184}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{B4CC2690-ADC2-4B78-A961-06C0D6CD948F}"
+ ProjectSection(SolutionItems) = preProject
+ docker-compose.yml = docker-compose.yml
+ EndProjectSection
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Azure.ServiceBus", "src\OpenMessage.Azure.ServiceBus\OpenMessage.Azure.ServiceBus.csproj", "{46AD3311-B953-49F4-8B7B-9E9AE5E368F7}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Azure.EventHubs", "src\OpenMessage.Azure.EventHubs\OpenMessage.Azure.EventHubs.csproj", "{D2EBDC48-95C2-4EBD-9F07-5E3379ECDCF9}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.NATS", "src\OpenMessage.NATS\OpenMessage.NATS.csproj", "{D2AF2B8F-947B-447E-BC30-B13ECD28A40E}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.RabbitMq", "src\OpenMessage.RabbitMq\OpenMessage.RabbitMq.csproj", "{46A8FD07-E04E-48F0-8D38-5B61320A3CB8}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.AWS.SQS", "src\OpenMessage.AWS.SQS\OpenMessage.AWS.SQS.csproj", "{ECF700A0-BC08-4796-8424-6EEB31C649A9}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Apache.Kafka", "src\OpenMessage.Apache.Kafka\OpenMessage.Apache.Kafka.csproj", "{92AD2CB7-A31F-41CD-9B0E-B7677EDACF2F}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Samples.Core", "samples\OpenMessage.Samples.Core\OpenMessage.Samples.Core.csproj", "{4C2BF517-296A-4559-B092-C5F639E66DD7}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Samples.Memory", "samples\OpenMessage.Samples.Memory\OpenMessage.Samples.Memory.csproj", "{75E4E6D9-D4EF-4C53-BE74-9CDA176ECDFC}"
EndProject
-Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "OpenMessage.Tests", "tests\OpenMessage.Tests\OpenMessage.Tests.xproj", "{D5C758B5-055C-41F0-A339-C8E4141076EB}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Serializer.Hyperion", "src\OpenMessage.Serializer.Hyperion\OpenMessage.Serializer.Hyperion.csproj", "{8C796D60-D79F-46D5-8CE9-8372CCF5F317}"
EndProject
-Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "OpenMessage.Serializer.JsonNet", "src\OpenMessage.Serializer.JsonNet\OpenMessage.Serializer.JsonNet.xproj", "{3AB720E9-27B4-4A28-9C1F-B89EA1A4257F}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Serializer.Jil", "src\OpenMessage.Serializer.Jil\OpenMessage.Serializer.Jil.csproj", "{5F633833-217B-429D-86F1-8A2327751144}"
EndProject
-Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "OpenMessage.Serializer.Jil", "src\OpenMessage.Serializer.Jil\OpenMessage.Serializer.Jil.xproj", "{E70C427C-7285-48AC-A13B-D18519463786}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Serializer.JsonDotNet", "src\OpenMessage.Serializer.JsonDotNet\OpenMessage.Serializer.JsonDotNet.csproj", "{5448B5C6-8C3A-4B71-8789-4AA8880066C4}"
EndProject
-Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "OpenMessage.Serializer.ProtobufNet", "src\OpenMessage.Serializer.ProtobufNet\OpenMessage.Serializer.ProtobufNet.xproj", "{987E4E03-FF13-496A-9B72-BC2ADF3A35D2}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Serializer.MessagePack", "src\OpenMessage.Serializer.MessagePack\OpenMessage.Serializer.MessagePack.csproj", "{ED4255C2-9EAA-4EE4-AA71-EA7C219FC09E}"
EndProject
-Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "OpenMessage.Providers.Azure", "src\OpenMessage.Providers.Azure\OpenMessage.Providers.Azure.xproj", "{66F710F3-460A-450B-8B07-9CD1476E5CF3}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Serializer.MsgPackCli", "src\OpenMessage.Serializer.MsgPackCli\OpenMessage.Serializer.MsgPackCli.csproj", "{00411544-72BC-4569-9EBD-491362002F96}"
EndProject
-Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "OpenMessage.Providers.Azure.Tests", "tests\OpenMessage.Providers.Azure.Tests\OpenMessage.Providers.Azure.Tests.xproj", "{48B14E17-6FE3-4B0D-BACC-966D4273C07B}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Serializer.Protobuf", "src\OpenMessage.Serializer.Protobuf\OpenMessage.Serializer.Protobuf.csproj", "{A4A83F41-2CA1-4D90-B1F1-9EBEBC90B00D}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Serializer.ServiceStackJson", "src\OpenMessage.Serializer.ServiceStackJson\OpenMessage.Serializer.ServiceStackJson.csproj", "{62E00C13-154C-4ACC-962B-A0DCD06522E0}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Serializer.Utf8Json", "src\OpenMessage.Serializer.Utf8Json\OpenMessage.Serializer.Utf8Json.csproj", "{F36E3B9C-E14D-4F54-B7B7-FC9D54320A71}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Serializer.Wire", "src\OpenMessage.Serializer.Wire\OpenMessage.Serializer.Wire.csproj", "{69CDC76A-75D0-4B77-80D4-AD09D19AD2F1}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Samples.Kafka", "samples\OpenMessage.Samples.Kafka\OpenMessage.Samples.Kafka.csproj", "{058D8B78-1999-427E-A12E-D09785EDA977}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.EventStore", "src\OpenMessage.EventStore\OpenMessage.EventStore.csproj", "{6D2199AB-4445-413A-8FE9-0E1898BA39F0}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.MediatR", "src\OpenMessage.MediatR\OpenMessage.MediatR.csproj", "{21B732EF-AE9A-4156-B7C7-D5AF87753430}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.AWS.SNS", "src\OpenMessage.AWS.SNS\OpenMessage.AWS.SNS.csproj", "{73607B4A-82A7-4E12-8488-C7E1A1B58EC4}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Redis", "src\OpenMessage.Redis\OpenMessage.Redis.csproj", "{0D0AD4AE-EA78-4342-8E78-E572EBC2DDC4}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Samples.AWS", "samples\OpenMessage.Samples.AWS\OpenMessage.Samples.AWS.csproj", "{8F9B755C-8AD8-4B90-A2D9-8F870309CA6B}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Samples.Setup", "samples\OpenMessage.Samples.Setup\OpenMessage.Samples.Setup.csproj", "{BF9B5246-3F07-4583-8793-7732F13CF5ED}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.Tests", "tests\OpenMessage.Tests\OpenMessage.Tests.csproj", "{EB0F85A1-258F-45B2-8C76-BD786F717250}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenMessage.MediatR.Tests", "tests\OpenMessage.MediatR.Tests\OpenMessage.MediatR.Tests.csproj", "{FD4CBC75-B898-46B9-B58D-A30228273F5D}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenMessage.Testing", "src\OpenMessage.Testing\OpenMessage.Testing.csproj", "{DA130D21-63D3-4457-98CA-CA99A351EFA2}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenMessage.Testing.Tests", "tests\OpenMessage.Testing.Tests\OpenMessage.Testing.Tests.csproj", "{86D9756B-65B2-46CA-9626-F439B387C7CB}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenMessage.Polly", "src\OpenMessage.Polly\OpenMessage.Polly.csproj", "{3942DBA5-5FCC-4337-8FC7-C026DA054611}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "workflows", "workflows", "{9BA7DF3A-585E-429B-9129-0188FD4F7DB7}"
+ProjectSection(SolutionItems) = preProject
+ .github\workflows\ci-pipeline.yml = .github\workflows\ci-pipeline.yml
+ .github\workflows\release-builds.yml = .github\workflows\release-builds.yml
+EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -36,41 +107,159 @@ Global
{7D68A283-6D90-4A50-B015-D8C2BB5C7184}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7D68A283-6D90-4A50-B015-D8C2BB5C7184}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7D68A283-6D90-4A50-B015-D8C2BB5C7184}.Release|Any CPU.Build.0 = Release|Any CPU
- {D5C758B5-055C-41F0-A339-C8E4141076EB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {D5C758B5-055C-41F0-A339-C8E4141076EB}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {D5C758B5-055C-41F0-A339-C8E4141076EB}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {D5C758B5-055C-41F0-A339-C8E4141076EB}.Release|Any CPU.Build.0 = Release|Any CPU
- {3AB720E9-27B4-4A28-9C1F-B89EA1A4257F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {3AB720E9-27B4-4A28-9C1F-B89EA1A4257F}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {3AB720E9-27B4-4A28-9C1F-B89EA1A4257F}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {3AB720E9-27B4-4A28-9C1F-B89EA1A4257F}.Release|Any CPU.Build.0 = Release|Any CPU
- {E70C427C-7285-48AC-A13B-D18519463786}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {E70C427C-7285-48AC-A13B-D18519463786}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {E70C427C-7285-48AC-A13B-D18519463786}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {E70C427C-7285-48AC-A13B-D18519463786}.Release|Any CPU.Build.0 = Release|Any CPU
- {987E4E03-FF13-496A-9B72-BC2ADF3A35D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {987E4E03-FF13-496A-9B72-BC2ADF3A35D2}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {987E4E03-FF13-496A-9B72-BC2ADF3A35D2}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {987E4E03-FF13-496A-9B72-BC2ADF3A35D2}.Release|Any CPU.Build.0 = Release|Any CPU
- {66F710F3-460A-450B-8B07-9CD1476E5CF3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {66F710F3-460A-450B-8B07-9CD1476E5CF3}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {66F710F3-460A-450B-8B07-9CD1476E5CF3}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {66F710F3-460A-450B-8B07-9CD1476E5CF3}.Release|Any CPU.Build.0 = Release|Any CPU
- {48B14E17-6FE3-4B0D-BACC-966D4273C07B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {48B14E17-6FE3-4B0D-BACC-966D4273C07B}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {48B14E17-6FE3-4B0D-BACC-966D4273C07B}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {48B14E17-6FE3-4B0D-BACC-966D4273C07B}.Release|Any CPU.Build.0 = Release|Any CPU
+ {46AD3311-B953-49F4-8B7B-9E9AE5E368F7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {46AD3311-B953-49F4-8B7B-9E9AE5E368F7}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {46AD3311-B953-49F4-8B7B-9E9AE5E368F7}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {46AD3311-B953-49F4-8B7B-9E9AE5E368F7}.Release|Any CPU.Build.0 = Release|Any CPU
+ {D2EBDC48-95C2-4EBD-9F07-5E3379ECDCF9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {D2EBDC48-95C2-4EBD-9F07-5E3379ECDCF9}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {D2EBDC48-95C2-4EBD-9F07-5E3379ECDCF9}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {D2EBDC48-95C2-4EBD-9F07-5E3379ECDCF9}.Release|Any CPU.Build.0 = Release|Any CPU
+ {D2AF2B8F-947B-447E-BC30-B13ECD28A40E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {D2AF2B8F-947B-447E-BC30-B13ECD28A40E}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {D2AF2B8F-947B-447E-BC30-B13ECD28A40E}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {D2AF2B8F-947B-447E-BC30-B13ECD28A40E}.Release|Any CPU.Build.0 = Release|Any CPU
+ {46A8FD07-E04E-48F0-8D38-5B61320A3CB8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {46A8FD07-E04E-48F0-8D38-5B61320A3CB8}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {46A8FD07-E04E-48F0-8D38-5B61320A3CB8}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {46A8FD07-E04E-48F0-8D38-5B61320A3CB8}.Release|Any CPU.Build.0 = Release|Any CPU
+ {ECF700A0-BC08-4796-8424-6EEB31C649A9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {ECF700A0-BC08-4796-8424-6EEB31C649A9}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {ECF700A0-BC08-4796-8424-6EEB31C649A9}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {ECF700A0-BC08-4796-8424-6EEB31C649A9}.Release|Any CPU.Build.0 = Release|Any CPU
+ {92AD2CB7-A31F-41CD-9B0E-B7677EDACF2F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {92AD2CB7-A31F-41CD-9B0E-B7677EDACF2F}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {92AD2CB7-A31F-41CD-9B0E-B7677EDACF2F}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {92AD2CB7-A31F-41CD-9B0E-B7677EDACF2F}.Release|Any CPU.Build.0 = Release|Any CPU
+ {4C2BF517-296A-4559-B092-C5F639E66DD7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {4C2BF517-296A-4559-B092-C5F639E66DD7}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {4C2BF517-296A-4559-B092-C5F639E66DD7}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {4C2BF517-296A-4559-B092-C5F639E66DD7}.Release|Any CPU.Build.0 = Release|Any CPU
+ {75E4E6D9-D4EF-4C53-BE74-9CDA176ECDFC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {75E4E6D9-D4EF-4C53-BE74-9CDA176ECDFC}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {75E4E6D9-D4EF-4C53-BE74-9CDA176ECDFC}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {75E4E6D9-D4EF-4C53-BE74-9CDA176ECDFC}.Release|Any CPU.Build.0 = Release|Any CPU
+ {8C796D60-D79F-46D5-8CE9-8372CCF5F317}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {8C796D60-D79F-46D5-8CE9-8372CCF5F317}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {8C796D60-D79F-46D5-8CE9-8372CCF5F317}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {8C796D60-D79F-46D5-8CE9-8372CCF5F317}.Release|Any CPU.Build.0 = Release|Any CPU
+ {5F633833-217B-429D-86F1-8A2327751144}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {5F633833-217B-429D-86F1-8A2327751144}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {5F633833-217B-429D-86F1-8A2327751144}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {5F633833-217B-429D-86F1-8A2327751144}.Release|Any CPU.Build.0 = Release|Any CPU
+ {5448B5C6-8C3A-4B71-8789-4AA8880066C4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {5448B5C6-8C3A-4B71-8789-4AA8880066C4}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {5448B5C6-8C3A-4B71-8789-4AA8880066C4}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {5448B5C6-8C3A-4B71-8789-4AA8880066C4}.Release|Any CPU.Build.0 = Release|Any CPU
+ {ED4255C2-9EAA-4EE4-AA71-EA7C219FC09E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {ED4255C2-9EAA-4EE4-AA71-EA7C219FC09E}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {ED4255C2-9EAA-4EE4-AA71-EA7C219FC09E}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {ED4255C2-9EAA-4EE4-AA71-EA7C219FC09E}.Release|Any CPU.Build.0 = Release|Any CPU
+ {00411544-72BC-4569-9EBD-491362002F96}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {00411544-72BC-4569-9EBD-491362002F96}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {00411544-72BC-4569-9EBD-491362002F96}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {00411544-72BC-4569-9EBD-491362002F96}.Release|Any CPU.Build.0 = Release|Any CPU
+ {A4A83F41-2CA1-4D90-B1F1-9EBEBC90B00D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {A4A83F41-2CA1-4D90-B1F1-9EBEBC90B00D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {A4A83F41-2CA1-4D90-B1F1-9EBEBC90B00D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {A4A83F41-2CA1-4D90-B1F1-9EBEBC90B00D}.Release|Any CPU.Build.0 = Release|Any CPU
+ {62E00C13-154C-4ACC-962B-A0DCD06522E0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {62E00C13-154C-4ACC-962B-A0DCD06522E0}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {62E00C13-154C-4ACC-962B-A0DCD06522E0}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {62E00C13-154C-4ACC-962B-A0DCD06522E0}.Release|Any CPU.Build.0 = Release|Any CPU
+ {F36E3B9C-E14D-4F54-B7B7-FC9D54320A71}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {F36E3B9C-E14D-4F54-B7B7-FC9D54320A71}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {F36E3B9C-E14D-4F54-B7B7-FC9D54320A71}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {F36E3B9C-E14D-4F54-B7B7-FC9D54320A71}.Release|Any CPU.Build.0 = Release|Any CPU
+ {69CDC76A-75D0-4B77-80D4-AD09D19AD2F1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {69CDC76A-75D0-4B77-80D4-AD09D19AD2F1}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {69CDC76A-75D0-4B77-80D4-AD09D19AD2F1}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {69CDC76A-75D0-4B77-80D4-AD09D19AD2F1}.Release|Any CPU.Build.0 = Release|Any CPU
+ {058D8B78-1999-427E-A12E-D09785EDA977}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {058D8B78-1999-427E-A12E-D09785EDA977}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {058D8B78-1999-427E-A12E-D09785EDA977}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {058D8B78-1999-427E-A12E-D09785EDA977}.Release|Any CPU.Build.0 = Release|Any CPU
+ {6D2199AB-4445-413A-8FE9-0E1898BA39F0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {6D2199AB-4445-413A-8FE9-0E1898BA39F0}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {6D2199AB-4445-413A-8FE9-0E1898BA39F0}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {6D2199AB-4445-413A-8FE9-0E1898BA39F0}.Release|Any CPU.Build.0 = Release|Any CPU
+ {21B732EF-AE9A-4156-B7C7-D5AF87753430}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {21B732EF-AE9A-4156-B7C7-D5AF87753430}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {21B732EF-AE9A-4156-B7C7-D5AF87753430}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {21B732EF-AE9A-4156-B7C7-D5AF87753430}.Release|Any CPU.Build.0 = Release|Any CPU
+ {73607B4A-82A7-4E12-8488-C7E1A1B58EC4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {73607B4A-82A7-4E12-8488-C7E1A1B58EC4}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {73607B4A-82A7-4E12-8488-C7E1A1B58EC4}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {73607B4A-82A7-4E12-8488-C7E1A1B58EC4}.Release|Any CPU.Build.0 = Release|Any CPU
+ {0D0AD4AE-EA78-4342-8E78-E572EBC2DDC4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {0D0AD4AE-EA78-4342-8E78-E572EBC2DDC4}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {0D0AD4AE-EA78-4342-8E78-E572EBC2DDC4}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {0D0AD4AE-EA78-4342-8E78-E572EBC2DDC4}.Release|Any CPU.Build.0 = Release|Any CPU
+ {8F9B755C-8AD8-4B90-A2D9-8F870309CA6B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {8F9B755C-8AD8-4B90-A2D9-8F870309CA6B}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {8F9B755C-8AD8-4B90-A2D9-8F870309CA6B}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {8F9B755C-8AD8-4B90-A2D9-8F870309CA6B}.Release|Any CPU.Build.0 = Release|Any CPU
+ {BF9B5246-3F07-4583-8793-7732F13CF5ED}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {BF9B5246-3F07-4583-8793-7732F13CF5ED}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {BF9B5246-3F07-4583-8793-7732F13CF5ED}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {BF9B5246-3F07-4583-8793-7732F13CF5ED}.Release|Any CPU.Build.0 = Release|Any CPU
+ {EB0F85A1-258F-45B2-8C76-BD786F717250}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {EB0F85A1-258F-45B2-8C76-BD786F717250}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {EB0F85A1-258F-45B2-8C76-BD786F717250}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {EB0F85A1-258F-45B2-8C76-BD786F717250}.Release|Any CPU.Build.0 = Release|Any CPU
+ {FD4CBC75-B898-46B9-B58D-A30228273F5D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {FD4CBC75-B898-46B9-B58D-A30228273F5D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {FD4CBC75-B898-46B9-B58D-A30228273F5D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {FD4CBC75-B898-46B9-B58D-A30228273F5D}.Release|Any CPU.Build.0 = Release|Any CPU
+ {DA130D21-63D3-4457-98CA-CA99A351EFA2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {DA130D21-63D3-4457-98CA-CA99A351EFA2}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {DA130D21-63D3-4457-98CA-CA99A351EFA2}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {DA130D21-63D3-4457-98CA-CA99A351EFA2}.Release|Any CPU.Build.0 = Release|Any CPU
+ {86D9756B-65B2-46CA-9626-F439B387C7CB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {86D9756B-65B2-46CA-9626-F439B387C7CB}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {86D9756B-65B2-46CA-9626-F439B387C7CB}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {86D9756B-65B2-46CA-9626-F439B387C7CB}.Release|Any CPU.Build.0 = Release|Any CPU
+ {3942DBA5-5FCC-4337-8FC7-C026DA054611}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {3942DBA5-5FCC-4337-8FC7-C026DA054611}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {3942DBA5-5FCC-4337-8FC7-C026DA054611}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {3942DBA5-5FCC-4337-8FC7-C026DA054611}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{7D68A283-6D90-4A50-B015-D8C2BB5C7184} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
- {D5C758B5-055C-41F0-A339-C8E4141076EB} = {B4E58D4F-40CD-45CA-AC6F-1DF4D9CB9A49}
- {3AB720E9-27B4-4A28-9C1F-B89EA1A4257F} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
- {E70C427C-7285-48AC-A13B-D18519463786} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
- {987E4E03-FF13-496A-9B72-BC2ADF3A35D2} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
- {66F710F3-460A-450B-8B07-9CD1476E5CF3} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
- {48B14E17-6FE3-4B0D-BACC-966D4273C07B} = {B4E58D4F-40CD-45CA-AC6F-1DF4D9CB9A49}
+ {46AD3311-B953-49F4-8B7B-9E9AE5E368F7} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {D2EBDC48-95C2-4EBD-9F07-5E3379ECDCF9} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {D2AF2B8F-947B-447E-BC30-B13ECD28A40E} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {46A8FD07-E04E-48F0-8D38-5B61320A3CB8} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {ECF700A0-BC08-4796-8424-6EEB31C649A9} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {92AD2CB7-A31F-41CD-9B0E-B7677EDACF2F} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {4C2BF517-296A-4559-B092-C5F639E66DD7} = {B4CC2690-ADC2-4B78-A961-06C0D6CD948F}
+ {75E4E6D9-D4EF-4C53-BE74-9CDA176ECDFC} = {B4CC2690-ADC2-4B78-A961-06C0D6CD948F}
+ {8C796D60-D79F-46D5-8CE9-8372CCF5F317} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {5F633833-217B-429D-86F1-8A2327751144} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {5448B5C6-8C3A-4B71-8789-4AA8880066C4} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {ED4255C2-9EAA-4EE4-AA71-EA7C219FC09E} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {00411544-72BC-4569-9EBD-491362002F96} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {A4A83F41-2CA1-4D90-B1F1-9EBEBC90B00D} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {62E00C13-154C-4ACC-962B-A0DCD06522E0} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {F36E3B9C-E14D-4F54-B7B7-FC9D54320A71} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {69CDC76A-75D0-4B77-80D4-AD09D19AD2F1} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {058D8B78-1999-427E-A12E-D09785EDA977} = {B4CC2690-ADC2-4B78-A961-06C0D6CD948F}
+ {6D2199AB-4445-413A-8FE9-0E1898BA39F0} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {21B732EF-AE9A-4156-B7C7-D5AF87753430} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {73607B4A-82A7-4E12-8488-C7E1A1B58EC4} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {0D0AD4AE-EA78-4342-8E78-E572EBC2DDC4} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {8F9B755C-8AD8-4B90-A2D9-8F870309CA6B} = {B4CC2690-ADC2-4B78-A961-06C0D6CD948F}
+ {BF9B5246-3F07-4583-8793-7732F13CF5ED} = {B4CC2690-ADC2-4B78-A961-06C0D6CD948F}
+ {EB0F85A1-258F-45B2-8C76-BD786F717250} = {B4E58D4F-40CD-45CA-AC6F-1DF4D9CB9A49}
+ {FD4CBC75-B898-46B9-B58D-A30228273F5D} = {B4E58D4F-40CD-45CA-AC6F-1DF4D9CB9A49}
+ {DA130D21-63D3-4457-98CA-CA99A351EFA2} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ {86D9756B-65B2-46CA-9626-F439B387C7CB} = {B4E58D4F-40CD-45CA-AC6F-1DF4D9CB9A49}
+ {3942DBA5-5FCC-4337-8FC7-C026DA054611} = {E8751802-CCE4-4C40-9241-E5E01DBEF627}
+ EndGlobalSection
+ GlobalSection(ExtensibilityGlobals) = postSolution
+ SolutionGuid = {C6B88150-594B-4719-8B95-AF622870727B}
EndGlobalSection
EndGlobal
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..ca083ee
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,102 @@
+version: '3.5'
+
+services:
+ eventstore:
+ image: eventstore/eventstore:latest
+ hostname: eventstore
+ container_name: eventstore
+ restart: always
+ networks: [ "OpenMessage" ]
+ ports:
+ - 1113:1113
+ - 2112:2112
+ - 2113:2113
+ environment:
+ EVENTSTORE_CLUSTER_DNS: eventstore
+ EVENTSTORE_CLUSTER_SIZE: 1
+ EVENTSTORE_CLUSTER_GOSSIP_PORT: 2112
+
+ zookeeper:
+ image: confluentinc/cp-zookeeper:latest
+ hostname: zookeeper
+ container_name: zookeeper
+ restart: always
+ networks: [ "OpenMessage" ]
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+
+ kafka:
+ image: confluentinc/cp-enterprise-kafka:latest
+ hostname: kafka
+ container_name: kafka
+ restart: always
+ networks: [ "OpenMessage" ]
+ depends_on:
+ - zookeeper
+ ports:
+ - "9092:9092"
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_DELETE_TOPIC_ENABLE: "true"
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+
+ rabbit:
+ image: rabbitmq:3-management-alpine
+ hostname: rabbitmq
+ container_name: rabbitmq
+ restart: always
+ networks: [ "OpenMessage" ]
+ ports:
+ - 4369:4369
+ - 5671:5671
+ - 5672:5672
+ - 15672:15672
+ - 25672:25672
+
+ redis:
+ image: redis:alpine
+ hostname: redis
+ container_name: redis
+ restart: always
+ networks: [ "OpenMessage" ]
+ ports:
+ - 6379:6379
+
+ redisUI:
+ image: rediscommander/redis-commander
+ hostname: redisui
+ container_name: redisui
+ restart: always
+ networks: [ "OpenMessage" ]
+ depends_on:
+ - redis
+ environment:
+ - REDIS_HOSTS=local:redis:6379
+ ports:
+ - 8081:8081
+
+ localstack:
+ image: localstack/localstack
+ hostname: localstack
+ container_name: localstack
+ restart: always
+ networks: [ "OpenMessage" ]
+ environment:
+ SERVICES: "sns,sqs"
+ AWS_DEFAULT_REGION: eu-west-2
+ PORT_WEB_UI: 8082
+ AWS_ACCESS_KEY_ID: XXX
+ AWS_SECRET_ACCESS_KEY: XXX
+ DEBUG: 1
+ ports:
+ - 4575:4575
+ - 4576:4576
+ - 8082:8082
+
+networks:
+ OpenMessage:
+ driver: bridge
\ No newline at end of file
diff --git a/global.json b/global.json
deleted file mode 100644
index 37db865..0000000
--- a/global.json
+++ /dev/null
@@ -1,6 +0,0 @@
-{
- "projects": [ "src", "tests" ],
- "sdk": {
- "version": "1.0.0-preview2-003121"
- }
-}
diff --git a/readme.md b/readme.md
index b455b2b..dc3692c 100644
--- a/readme.md
+++ b/readme.md
@@ -1,86 +1,97 @@
-#OpenMessage
-
-Master Branch: 
-
-Dev Branch: 
+# OpenMessage
OpenMessage aims to simplify the service bus paradigm by using pre-existing patterns to build an extensible architecture.
-##Getting Started
+Designed for the generic hosting model that .Net Core 3 supports, the library aims to be able to cater for a wide range of scenarios, including receiving the same type from multiple sources - aiding a whole host of scenarios.
-The library is based around the `Microsoft.Extensions.*` packages and relies upon the abstractions for depenency injection and logging allowing you the freedom to pick the implementations that best suit your scenario.
+The core library `OpenMessage` ships with an InMemory provider and a JSON serializer from the AspNetCore team (`System.Text.Json`).
-Assuming you want to connect to Azure Service Bus, here is how you configure OpenMessage:
+## Getting Started
-1 - Add the provider:
+The library is based around the `Microsoft.Extensions.*` packages and relies upon the abstractions for dependency injection and logging allowing you the freedom to pick the implementations that best suit your scenario.
- PM> Install-Package OpenMessage.Providers.Azure
+_Note: The rest of this guide requires you to be using version 3 of `Microsoft.Extensions.*`._
-2 - Add the serializer (or write your own):
+1 - Install the `OpenMessage` package:
- PM> Install-Package OpenMessage.Serializer.JsonNet
-
-3 - Add the services to your service collection:
+> PM> Install-Package OpenMessage
- using OpenMessage;
- using OpenMessage.Providers.Azure.Configuration;
- using OpenMessage.Serializer.JsonNet;
-
- ...
-
- IServiceCollection AddServices(IServiceCollection services)
- {
- return services
- .AddOpenMessage()
- .AddJsonNetSerializer()
- .Configure(config => {
- config.ConnectionString = "YOUR CONNECTION STRING HERE");
- });
- }
-
-###Sending Messages
+_You may also any of the providers listed below for this sample as the Memory provider ships out of the box._
-4 - Add either a Queue/Topic dispatcher to the service collection:
+2 - Configure your host:
- IServiceCollection AddQueueBindings(IServiceCollection services)
+ internal class Program
{
- return services.AddQueueDispatcher();
+ private static async Task Main()
+ {
+ await Host.CreateDefaultBuilder()
+ .ConfigureServices(services => services.AddOptions().AddLogging())
+ // Configure OpenMessage
+ .ConfigureMessaging(host =>
+ {
+ // Adds a memory based consumer and dispatcher
+ host.ConfigureMemory();
+
+ // Adds a handler that writes the entire message in json format to the console
+ host.ConfigureHandler(msg => Console.WriteLine($"Hello {msg.Value.Name}"));
+ })
+ .Build()
+ .RunAsync();
+ }
}
-5 - Inject an `IDispatcher` into your class of choice:
+### Sending Messages
- internal class CommandGenerator
- {
- private readonly IDispatcher _dispatcher;
-
- public CommandGenerator(IDispatcher dispatcher)
- {
- _dispatcher = dispatcher;
- }
- }
+To send messages, inject `IDispatcher` and call `DispatchAsync` and the library will route your message to the configured dispatcher for that type.
-###Receiving Messages
+### Receiving Messages
-4 - Add either a Queue/Subscription observable to the service collection:
+When a message is received, it flows as follows:
- IServiceCollection AddQueueBindings(IServiceCollection services)
- {
- return services.AddQueueObservable();
- }
+> Message Pump > Channel > Consumer Pump > Pipeline > Handler
+
+This library takes care of everything except the handlers. You have a few choices for implementing a handler, all registered via `.ConfigureHandler`:
-5 - When done, resolve an `IEnumerable` from the service collection to begin receiving messages.
+1. Use a simple `Action>`
+2. Use a simple `Func, Task>`
+3. Inherit from `Handler`
+4. Implement `IHandler`
-##Serializers
+By default, after your handler has been run, and assuming the underlying provider supports it, the message is automatically acknowledged. This can be configured by calling `ConfigurePipelineOptions` as well as options for the consumer pump and handler timeout.
+
+## Serializers
You can add more than one serializer to OpenMessage. In this scenario, all registered serializers are checked to see whether they can deserialize the message. When serializing the last registered serializer is used, service collection provider depending.
-- [x] [Json.Net](http://www.nuget.org/packages/OpenMessage.Serializer.JsonNet/)
-- [x] [Jil](http://www.nuget.org/packages/OpenMessage.Serializer.Jil/)
-- [x] [Protobuf](http://www.nuget.org/packages/OpenMessage.Serializer.ProtobufNet/)
+Here is a list of the available serializers:
+
+- [x] Hyperion
+- [x] Jil
+- [x] JsonDotNet
+- [x] MessagePack
+- [x] MsgPack
+- [x] Protobuf
+- [x] ServiceStackJson
+- [x] Utf8Json
+- [x] Wire
+
+## Providers
+
+With OpenMessage you can easily receive from multiple sources in a centralised pipeline whilst providing as much of the underlying providers flexibility as possible.
-##Providers
+Here is a list of the available providers:
-- [x] [Azure Service Bus](http://www.nuget.org/packages/OpenMessage.Providers.Azure/)
+- [x] Apache Kafka
+- [x] AWS SQS
+- [x] AWS SNS
+- [ ] AWS Kinesis
+- [ ] AWS EventBridge
- [ ] Azure Event Hubs
-- [ ] In Memory
-- [ ] Rabbit MQ
\ No newline at end of file
+- [ ] Azure Service Bus
+- [ ] Eventstore
+- [x] InMemory
+- [x] MediatR
+- [ ] NATS
+- [ ] RabbitMq
+
+_Note: Any unchecked providers are currently a work in progress_.
diff --git a/samples/OpenMessage.Samples.AWS/OpenMessage.Samples.AWS.csproj b/samples/OpenMessage.Samples.AWS/OpenMessage.Samples.AWS.csproj
new file mode 100644
index 0000000..4bb3179
--- /dev/null
+++ b/samples/OpenMessage.Samples.AWS/OpenMessage.Samples.AWS.csproj
@@ -0,0 +1,14 @@
+
+
+
+ Exe
+ netcoreapp3.1
+
+
+
+
+
+
+
+
+
diff --git a/samples/OpenMessage.Samples.AWS/Program.cs b/samples/OpenMessage.Samples.AWS/Program.cs
new file mode 100644
index 0000000..4ceed8f
--- /dev/null
+++ b/samples/OpenMessage.Samples.AWS/Program.cs
@@ -0,0 +1,85 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using OpenMessage.AWS.SQS;
+using OpenMessage.Samples.Core.Models;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace OpenMessage.Samples.AWS
+{
+ internal class Program
+ {
+ private static int _counter;
+
+ private static async Task Main(string[] args)
+ {
+ Environment.SetEnvironmentVariable("AWS_ACCESS_KEY_ID", "XXX", EnvironmentVariableTarget.Process);
+ Environment.SetEnvironmentVariable("AWS_SECRET_ACCESS_KEY", "XXX", EnvironmentVariableTarget.Process);
+ Environment.SetEnvironmentVariable("AWS_SESSION_TOKEN", "XXX", EnvironmentVariableTarget.Process);
+ Environment.SetEnvironmentVariable("AWS_DEFAULT_REGION", "eu-west-2", EnvironmentVariableTarget.Process);
+
+ var verbose = false;
+
+ await Host.CreateDefaultBuilder()
+ .ConfigureServices(services => services.AddOptions()
+ .AddLogging()
+ .AddSampleCore()
+ .AddMassProducerService() // Adds a producer that calls configured dispatcher
+ )
+ .ConfigureMessaging(host =>
+ {
+ // Adds a handler that writes to console every 100 messages
+ host.ConfigureHandler(msg =>
+ {
+ var counter = Interlocked.Increment(ref _counter);
+
+ if (verbose)
+ {
+ var properties = msg is ISupportProperties sp
+ ? sp.Properties
+ : Enumerable.Empty>();
+
+ Console.WriteLine($"Received: #{counter} Received: {DateTime.UtcNow} Properties: {string.Join(",", properties)}");
+ }
+ else if(counter % 100 == 1)
+ {
+ Console.WriteLine($"Received: #{counter}");
+ }
+ });
+
+ // Allow us to write to SNS
+ // host.ConfigureSnsDispatcher()
+ // .FromConfiguration(config =>
+ // {
+ // config.TopicArn = "arn:aws:sns:eu-west-2:000000000000:openmessage_samples_core_models_simplemodel";
+ // config.ServiceURL = "http://localhost:4575";
+ // })
+ // .Build();
+
+ // For testing the dispatchers
+ host.ConfigureSqsDispatcher()
+ .FromConfiguration(config =>
+ {
+ config.QueueUrl = "http://localhost:4576/000000000000/openmessage_samples_core_models_simplemodel.queue";
+ config.ServiceURL = "http://localhost:4576";
+ })
+ .WithBatchedDispatcher(true)
+ .Build();
+
+ // Consume from the same topic as we are writing to
+ host.ConfigureSqsConsumer()
+ .FromConfiguration(config =>
+ {
+ config.QueueUrl = "http://localhost:4576/000000000000/openmessage_samples_core_models_simplemodel.queue";
+ config.ServiceURL = "http://localhost:4576";
+ })
+ .Build();
+ })
+ .Build()
+ .RunAsync();
+ }
+ }
+}
\ No newline at end of file
diff --git a/samples/OpenMessage.Samples.Core/Extensions.cs b/samples/OpenMessage.Samples.Core/Extensions.cs
new file mode 100644
index 0000000..6571570
--- /dev/null
+++ b/samples/OpenMessage.Samples.Core/Extensions.cs
@@ -0,0 +1,18 @@
+using OpenMessage.Samples.Core.Services;
+
+namespace Microsoft.Extensions.DependencyInjection
+{
+ public static class Extensions
+ {
+ public static IServiceCollection AddSampleCore(this IServiceCollection services)
+ => services.AddHostedService();
+
+ public static IServiceCollection AddMassProducerService(this IServiceCollection services)
+ where T : class, new()
+ => services.AddHostedService>();
+
+ public static IServiceCollection AddProducerService(this IServiceCollection services)
+ where T : class, new()
+ => services.AddHostedService>();
+ }
+}
\ No newline at end of file
diff --git a/samples/OpenMessage.Samples.Core/Models/CoreModel.cs b/samples/OpenMessage.Samples.Core/Models/CoreModel.cs
new file mode 100644
index 0000000..29434f6
--- /dev/null
+++ b/samples/OpenMessage.Samples.Core/Models/CoreModel.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace OpenMessage.Samples.Core.Models
+{
+ public abstract class CoreModel
+ {
+ public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
+ }
+}
\ No newline at end of file
diff --git a/samples/OpenMessage.Samples.Core/Models/SimpleModel.cs b/samples/OpenMessage.Samples.Core/Models/SimpleModel.cs
new file mode 100644
index 0000000..2870f94
--- /dev/null
+++ b/samples/OpenMessage.Samples.Core/Models/SimpleModel.cs
@@ -0,0 +1,16 @@
+using System;
+
+namespace OpenMessage.Samples.Core.Models
+{
+ public class SimpleModel : CoreModel
+ {
+ public string Property1 { get; set; } = Guid.NewGuid()
+ .ToString("n");
+
+ public string Property2 { get; set; } = Guid.NewGuid()
+ .ToString("n");
+
+ public string Property3 { get; set; } = Guid.NewGuid()
+ .ToString("n");
+ }
+}
\ No newline at end of file
diff --git a/samples/OpenMessage.Samples.Core/OpenMessage.Samples.Core.csproj b/samples/OpenMessage.Samples.Core/OpenMessage.Samples.Core.csproj
new file mode 100644
index 0000000..a57c244
--- /dev/null
+++ b/samples/OpenMessage.Samples.Core/OpenMessage.Samples.Core.csproj
@@ -0,0 +1,26 @@
+
+
+
+ netcoreapp3.1
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/samples/OpenMessage.Samples.Core/Services/DiagnosticService.cs b/samples/OpenMessage.Samples.Core/Services/DiagnosticService.cs
new file mode 100644
index 0000000..e4c640e
--- /dev/null
+++ b/samples/OpenMessage.Samples.Core/Services/DiagnosticService.cs
@@ -0,0 +1,55 @@
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Diagnostics.Tracing;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Hosting;
+
+namespace OpenMessage.Samples.Core.Services
+{
+ internal sealed class DiagnosticService : EventListener, IHostedService
+ {
+ private static readonly string EventCounterType = "CounterType";
+ private static readonly string EventCountEventName = "EventCounters";
+ private static readonly string EventName = "Name";
+ private static readonly string IncrementName = "Increment";
+ private static readonly string MeanType = "Mean";
+ private static readonly string SumType = "Sum";
+
+ protected override void OnEventWritten(EventWrittenEventArgs eventData)
+ {
+ if (eventData.EventName == EventCountEventName
+ && eventData.Payload?.Count > 0
+ && eventData.Payload[0] is IDictionary data
+ && data.TryGetValue(EventCounterType, out var counterType)
+ && data.TryGetValue(EventName, out var name))
+ {
+ if (name is null || counterType is null)
+ return;
+
+ var metricName = name.ToString();
+ var metricType = counterType.ToString();
+
+ if (SumType.Equals(metricType) && data.TryGetValue(IncrementName, out var increment))
+ {
+ Debug.WriteLine("{0}: {1}", metricName, increment);
+ }
+ else if (MeanType.Equals(metricType) && data.TryGetValue(MeanType, out var mean))
+ {
+ Debug.WriteLine("{0}: {1}", metricName, mean);
+ }
+ }
+ }
+
+ public Task StartAsync(CancellationToken cancellationToken)
+ {
+ EnableEvents(OpenMessageEventSource.Instance, EventLevel.LogAlways, EventKeywords.All, new Dictionary {{"EventCounterIntervalSec", "1"}});
+ return Task.CompletedTask;
+ }
+
+ public Task StopAsync(CancellationToken cancellationToken)
+ {
+ return Task.CompletedTask;
+ }
+ }
+}
\ No newline at end of file
diff --git a/samples/OpenMessage.Samples.Core/Services/MassProducerService.cs b/samples/OpenMessage.Samples.Core/Services/MassProducerService.cs
new file mode 100644
index 0000000..98332bc
--- /dev/null
+++ b/samples/OpenMessage.Samples.Core/Services/MassProducerService.cs
@@ -0,0 +1,54 @@
+using AutoFixture;
+using Microsoft.Extensions.Hosting;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace OpenMessage.Samples.Core.Services
+{
+ internal sealed class MassProducerService : BackgroundService
+ where T : new()
+ {
+ private readonly IDispatcher _dispatcher;
+ private readonly Fixture _fixture = new Fixture();
+ private const int DispatchBatchSize = 100;
+
+ public MassProducerService(IDispatcher dispatcher) => _dispatcher = dispatcher ?? throw new ArgumentNullException(nameof(dispatcher));
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ // Without this line we can encounter a blocking issue such as: https://github.com/dotnet/extensions/issues/2816
+ await Task.Yield();
+
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ await Task.WhenAll(Enumerable.Range(1, DispatchBatchSize)
+ .Select(async x =>
+ {
+ try
+ {
+ await _dispatcher.DispatchAsync(new ExtendedMessage(_fixture.Create())
+ {
+ //SendDelay = TimeSpan.FromSeconds(15),
+ Properties = new List>
+ {
+ new KeyValuePair("Dispatched", DateTime.UtcNow.ToString())
+ }
+ }, stoppingToken);
+ }
+ catch (Exception e)
+ {
+ if (stoppingToken.IsCancellationRequested)
+ return;
+
+ Console.WriteLine("MassProducer: " + e.Message);
+ }
+ }));
+
+ Console.WriteLine($"Dispatched: {DispatchBatchSize}");
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/samples/OpenMessage.Samples.Core/Services/ProducerService.cs b/samples/OpenMessage.Samples.Core/Services/ProducerService.cs
new file mode 100644
index 0000000..c5d7e73
--- /dev/null
+++ b/samples/OpenMessage.Samples.Core/Services/ProducerService.cs
@@ -0,0 +1,37 @@
+using AutoFixture;
+using Microsoft.Extensions.Hosting;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace OpenMessage.Samples.Core.Services
+{
+ internal sealed class ProducerService : BackgroundService
+ where T : new()
+ {
+ private readonly IDispatcher _dispatcher;
+ private readonly Fixture _fixture = new Fixture();
+
+ public ProducerService(IDispatcher dispatcher) => _dispatcher = dispatcher ?? throw new ArgumentNullException(nameof(dispatcher));
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ // Without this line we can encounter a blocking issue such as: https://github.com/dotnet/extensions/issues/2816
+ await Task.Yield();
+
+ while (!stoppingToken.IsCancellationRequested)
+ try
+ {
+ await _dispatcher.DispatchAsync(_fixture.Create(), stoppingToken);
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Producer: " + e.Message);
+ }
+ finally
+ {
+ await Task.Delay(1000);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/samples/OpenMessage.Samples.Kafka/OpenMessage.Samples.Kafka.csproj b/samples/OpenMessage.Samples.Kafka/OpenMessage.Samples.Kafka.csproj
new file mode 100644
index 0000000..84ab4eb
--- /dev/null
+++ b/samples/OpenMessage.Samples.Kafka/OpenMessage.Samples.Kafka.csproj
@@ -0,0 +1,14 @@
+
+
+
+ Exe
+ netcoreapp3.1
+
+
+
+
+
+
+
+
+
diff --git a/samples/OpenMessage.Samples.Kafka/Program.cs b/samples/OpenMessage.Samples.Kafka/Program.cs
new file mode 100644
index 0000000..4a2849f
--- /dev/null
+++ b/samples/OpenMessage.Samples.Kafka/Program.cs
@@ -0,0 +1,45 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using OpenMessage.Samples.Core.Models;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace OpenMessage.Samples.Kafka
+{
+ internal class Program
+ {
+ private static int _counter;
+
+ private static async Task Main()
+ {
+ await Host.CreateDefaultBuilder()
+ .ConfigureServices(services => services.AddOptions()
+ .AddLogging()
+ .AddSampleCore()
+ .AddMassProducerService() // Adds a producer that calls configured dispatcher
+ )
+ .ConfigureMessaging(host =>
+ {
+ // Adds a handler that writes to console every 1000 messages
+ host.ConfigureHandler(msg =>
+ {
+ var counter = Interlocked.Increment(ref _counter);
+
+ if (counter % 1000 == 0)
+ Console.WriteLine($"Counter: {counter}");
+ });
+
+ // Allow us to write to kafka
+ host.ConfigureKafkaDispatcher(options => { });
+
+ // Consume from the same topic as we are writing to
+ host.ConfigureKafkaConsumer()
+ .FromTopic("OpenMessage.Samples.Core.Models.SimpleModel".ToLowerInvariant())
+ .Build();
+ })
+ .Build()
+ .RunAsync();
+ }
+ }
+}
\ No newline at end of file
diff --git a/samples/OpenMessage.Samples.Memory/OpenMessage.Samples.Memory.csproj b/samples/OpenMessage.Samples.Memory/OpenMessage.Samples.Memory.csproj
new file mode 100644
index 0000000..f5325b1
--- /dev/null
+++ b/samples/OpenMessage.Samples.Memory/OpenMessage.Samples.Memory.csproj
@@ -0,0 +1,13 @@
+
+
+
+ Exe
+ netcoreapp3.1
+
+
+
+
+
+
+
+
diff --git a/samples/OpenMessage.Samples.Memory/Program.cs b/samples/OpenMessage.Samples.Memory/Program.cs
new file mode 100644
index 0000000..2182d35
--- /dev/null
+++ b/samples/OpenMessage.Samples.Memory/Program.cs
@@ -0,0 +1,40 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using OpenMessage.Samples.Core.Models;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace OpenMessage.Samples.Memory
+{
+ internal class Program
+ {
+ private static int _counter;
+
+ private static async Task Main()
+ {
+ await Host.CreateDefaultBuilder()
+ .ConfigureServices(services => services.AddOptions()
+ .AddLogging()
+ .AddMassProducerService() // Adds a producer that calls configured dispatcher
+ )
+ .ConfigureMessaging(host =>
+ {
+ // Adds a memory based consumer and dispatcher
+ host.ConfigureMemory()
+ .Build();
+
+ // Adds a handler that writes to console every 1000 messages
+ host.ConfigureHandler(msg =>
+ {
+ var counter = Interlocked.Increment(ref _counter);
+
+ if (counter % 1000 == 0)
+ Console.WriteLine($"Counter: {counter}");
+ });
+ })
+ .Build()
+ .RunAsync();
+ }
+ }
+}
\ No newline at end of file
diff --git a/samples/OpenMessage.Samples.Setup/OpenMessage.Samples.Setup.csproj b/samples/OpenMessage.Samples.Setup/OpenMessage.Samples.Setup.csproj
new file mode 100644
index 0000000..13a19b6
--- /dev/null
+++ b/samples/OpenMessage.Samples.Setup/OpenMessage.Samples.Setup.csproj
@@ -0,0 +1,20 @@
+
+
+
+ Exe
+ netcoreapp3.1
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/samples/OpenMessage.Samples.Setup/Program.cs b/samples/OpenMessage.Samples.Setup/Program.cs
new file mode 100644
index 0000000..d67a90d
--- /dev/null
+++ b/samples/OpenMessage.Samples.Setup/Program.cs
@@ -0,0 +1,121 @@
+using Amazon.SimpleNotificationService;
+using Amazon.SimpleNotificationService.Model;
+using Amazon.SQS;
+using Amazon.SQS.Model;
+using Confluent.Kafka;
+using Confluent.Kafka.Admin;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+
+namespace OpenMessage.Samples.Setup
+{
+ internal class Program
+ {
+ private static readonly string topic = "OpenMessage.Samples.Core.Models.SimpleModel".ToLowerInvariant();
+
+ private static async Task Main(string[] args)
+ {
+ await Task.WhenAll(SetupKafka(), SetupAws());
+ }
+
+ private static async Task SetupAws()
+ {
+ try
+ {
+ Environment.SetEnvironmentVariable("AWS_ACCESS_KEY_ID", "XXX", EnvironmentVariableTarget.Process);
+ Environment.SetEnvironmentVariable("AWS_SECRET_ACCESS_KEY", "XXX", EnvironmentVariableTarget.Process);
+ Environment.SetEnvironmentVariable("AWS_SESSION_TOKEN", "XXX", EnvironmentVariableTarget.Process);
+ Environment.SetEnvironmentVariable("AWS_DEFAULT_REGION", "us-east-1", EnvironmentVariableTarget.Process);
+
+ var snsClient = new AmazonSimpleNotificationServiceClient(new AmazonSimpleNotificationServiceConfig
+ {
+ ServiceURL = "http://localhost:4575"
+ });
+
+ var sqsClient = new AmazonSQSClient(new AmazonSQSConfig
+ {
+ ServiceURL = "http://localhost:4576"
+ });
+
+ var topicName = topic.Replace(".", "_");
+
+ var topicRequest = new CreateTopicRequest(topicName);
+ var topicResponse = await snsClient.CreateTopicAsync(topicRequest);
+
+ var queueRequest = new CreateQueueRequest($"{topicName}.queue");
+ var queueResponse = await sqsClient.CreateQueueAsync(queueRequest);
+
+ var subscribeRequest = new SubscribeRequest
+ {
+ Endpoint = queueResponse.QueueUrl,
+ TopicArn = topicResponse.TopicArn,
+ Protocol = "sqs",
+ ReturnSubscriptionArn = true,
+ Attributes = new Dictionary
+ {
+ ["RawMessageDelivery"] = "true"
+ }
+ };
+ var subscribeResponse = await snsClient.SubscribeAsync(subscribeRequest);
+
+ (await snsClient.ListTopicsAsync()).Topics.ForEach(x => Console.WriteLine($"[AWS] Topic: {x.TopicArn}"));
+ (await sqsClient.ListQueuesAsync(new ListQueuesRequest())).QueueUrls.ForEach(x => Console.WriteLine($"[AWS] Queue: {x}"));
+ (await snsClient.ListSubscriptionsAsync(new ListSubscriptionsRequest())).Subscriptions.ForEach(x => Console.WriteLine($"[AWS] Subscription: {x.TopicArn} -> {x.Endpoint}"));
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine($"[AWS] {e.Message}");
+ }
+ }
+
+ private static async Task SetupKafka()
+ {
+ try
+ {
+ var client = new AdminClientBuilder(new Dictionary
+ {
+ ["bootstrap.servers"] = "localhost:9092",
+ ["topic.metadata.refresh.interval.ms"] = "500"
+ }).SetLogHandler((client, message) => Console.WriteLine($"[Kafka] [{message.Level}] {message.Message}"))
+ .Build();
+
+ var topics = client.GetMetadata(TimeSpan.FromMinutes(1))
+ .Topics;
+
+ if (topics.Any(x => x.Topic.StartsWith("OpenMessage", StringComparison.OrdinalIgnoreCase)))
+ {
+ await client.DeleteTopicsAsync(topics.Where(x => x.Topic.StartsWith("OpenMessage", StringComparison.OrdinalIgnoreCase))
+ .Select(x => x.Topic));
+
+ await Task.Delay(1000);
+ }
+
+ await client.CreateTopicsAsync(new[]
+ {
+ new TopicSpecification
+ {
+ Name = topic,
+ NumPartitions = 5,
+ ReplicationFactor = 1,
+ Configs = new Dictionary
+ {
+ ["retention.ms"] = TimeSpan.FromMinutes(15)
+ .Milliseconds.ToString()
+ }
+ }
+ });
+ await Task.Delay(1000);
+
+ topics = client.GetMetadata(TimeSpan.FromMinutes(1))
+ .Topics;
+ topics.ForEach(x => Console.WriteLine($"[Kafka] Topic: {x.Topic}(Partitions: {x.Partitions.Count})"));
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine($"[Kafka] {e.Message}");
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Directory.Build.props b/src/Directory.Build.props
new file mode 100644
index 0000000..d437ad4
--- /dev/null
+++ b/src/Directory.Build.props
@@ -0,0 +1,41 @@
+
+
+
+
+ netstandard2.1;netcoreapp3.1
+ icon.png
+
+ preview
+
+
+
+ True
+
+
+
+
+
+ true
+ true
+ $(RepositoryRoot)/artifacts
+ Preview release for use with .Net Core 3
+ $(MSBuildProjectName)
+ MIT
+
+
+ $(NoWarn);NU5048;NU5105
+
+ true
+
+ true
+
+ $(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb
+
+ true
+ false
+ false
+ true
+ true
+ true
+
+
\ No newline at end of file
diff --git a/src/Directory.Build.targets b/src/Directory.Build.targets
new file mode 100644
index 0000000..0a14c6a
--- /dev/null
+++ b/src/Directory.Build.targets
@@ -0,0 +1,22 @@
+
+
+
+
+
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SNS/Configuration/ISnsDispatcherBuilder.cs b/src/OpenMessage.AWS.SNS/Configuration/ISnsDispatcherBuilder.cs
new file mode 100644
index 0000000..536b4c9
--- /dev/null
+++ b/src/OpenMessage.AWS.SNS/Configuration/ISnsDispatcherBuilder.cs
@@ -0,0 +1,33 @@
+using Microsoft.Extensions.Hosting;
+using OpenMessage.Builders;
+using System;
+
+namespace OpenMessage.AWS.SNS.Configuration
+{
+ ///
+ /// SNS Dispatcher Builder
+ ///
+ public interface ISnsDispatcherBuilder : IBuilder
+ {
+ ///
+ /// Configure the dispatcher with the specified options
+ ///
+ /// The configuration action
+ /// The SQS dispatcher builder
+ ISnsDispatcherBuilder FromConfiguration(Action> configuration);
+
+ ///
+ /// Configure the dispatcher with the specified options
+ ///
+ /// The configuration action
+ /// The SQS dispatcher builder
+ ISnsDispatcherBuilder FromConfiguration(Action> configuration);
+
+ ///
+ /// Configure the dispatcher with the specified options
+ ///
+ /// The configuration section to use
+ /// The SNS dispatcher builder
+ ISnsDispatcherBuilder FromConfiguration(string configurationSection);
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SNS/Configuration/SNSOptions.cs b/src/OpenMessage.AWS.SNS/Configuration/SNSOptions.cs
new file mode 100644
index 0000000..4f3c058
--- /dev/null
+++ b/src/OpenMessage.AWS.SNS/Configuration/SNSOptions.cs
@@ -0,0 +1,31 @@
+using Amazon.SimpleNotificationService;
+using System;
+
+namespace OpenMessage.AWS.SNS.Configuration
+{
+ ///
+ /// Configuration options for dispatchers
+ ///
+ public class SNSOptions
+ {
+ ///
+ /// Allow the configuration of the raw AWS SNS Dispatcher Config during initialization of the dispatcher.
+ ///
+ public Action? AwsDispatcherConfiguration { get; set; }
+
+ ///
+ /// The region endpoint to use
+ ///
+ public string? RegionEndpoint { get; set; }
+
+ ///
+ /// The url to use for authentication
+ ///
+ public string? ServiceURL { get; set; }
+
+ ///
+ /// The topic ARN to send to
+ ///
+ public string? TopicArn { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SNS/Configuration/SnsDispatcherBuilder.cs b/src/OpenMessage.AWS.SNS/Configuration/SnsDispatcherBuilder.cs
new file mode 100644
index 0000000..02f6c84
--- /dev/null
+++ b/src/OpenMessage.AWS.SNS/Configuration/SnsDispatcherBuilder.cs
@@ -0,0 +1,42 @@
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using OpenMessage.Builders;
+using System;
+
+namespace OpenMessage.AWS.SNS.Configuration
+{
+ internal sealed class SnsDispatcherBuilder : Builder, ISnsDispatcherBuilder
+ {
+ private Action>? _configuration;
+
+ public SnsDispatcherBuilder(IMessagingBuilder hostBuilder)
+ : base(hostBuilder) { }
+
+ public override void Build()
+ {
+ if (_configuration is {})
+ ConfigureOptions(_configuration, true);
+ HostBuilder.Services.AddSingleton, SnsDispatcher>();
+ }
+
+ public ISnsDispatcherBuilder FromConfiguration(Action> configuration)
+ {
+ return FromConfiguration((context, options) => configuration(options));
+ }
+
+ public ISnsDispatcherBuilder FromConfiguration(Action> configuration)
+ {
+ _configuration = configuration;
+
+ return this;
+ }
+
+ public ISnsDispatcherBuilder FromConfiguration(string configurationSection)
+ {
+ _configuration = (context, options) => context.Configuration.Bind(configurationSection, options);
+
+ return this;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SNS/OpenMessage.AWS.SNS.csproj b/src/OpenMessage.AWS.SNS/OpenMessage.AWS.SNS.csproj
new file mode 100644
index 0000000..04691ac
--- /dev/null
+++ b/src/OpenMessage.AWS.SNS/OpenMessage.AWS.SNS.csproj
@@ -0,0 +1,13 @@
+
+
+
+ $(ProjectTargetFrameworks)
+ SNS dispatcher implementation for OpenMessage
+
+
+
+
+
+
+
+
diff --git a/src/OpenMessage.AWS.SNS/SnsDispatcher.cs b/src/OpenMessage.AWS.SNS/SnsDispatcher.cs
new file mode 100644
index 0000000..ab56d1c
--- /dev/null
+++ b/src/OpenMessage.AWS.SNS/SnsDispatcher.cs
@@ -0,0 +1,165 @@
+using Amazon;
+using Amazon.SimpleNotificationService;
+using Amazon.SimpleNotificationService.Model;
+using Microsoft.Extensions.Options;
+using OpenMessage.AWS.SNS.Configuration;
+using OpenMessage.Serialization;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Net;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+namespace OpenMessage.AWS.SNS
+{
+ internal sealed class SnsDispatcher : DispatcherBase
+ {
+ private static readonly string AttributeType = "String";
+ private readonly AmazonSimpleNotificationServiceClient _client;
+ private readonly MessageAttributeValue _contentType;
+ private readonly ISerializer _serializer;
+ private readonly string _topicArn;
+ private readonly MessageAttributeValue _valueTypeName;
+
+ public SnsDispatcher(IOptions> options, ISerializer serializer, ILogger> logger)
+ : base(logger)
+ {
+ _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
+ var config = options?.Value ?? throw new ArgumentNullException(nameof(options));
+
+ var snsConfig = new AmazonSimpleNotificationServiceConfig
+ {
+ ServiceURL = config.ServiceURL
+ };
+
+ if (!string.IsNullOrEmpty(config.RegionEndpoint))
+ snsConfig.RegionEndpoint = RegionEndpoint.GetBySystemName(config.RegionEndpoint);
+
+ config.AwsDispatcherConfiguration?.Invoke(snsConfig);
+ _client = new AmazonSimpleNotificationServiceClient(snsConfig);
+
+ _contentType = new MessageAttributeValue
+ {
+ DataType = AttributeType,
+ StringValue = _serializer.ContentType
+ };
+
+ _valueTypeName = new MessageAttributeValue
+ {
+ DataType = AttributeType,
+ StringValue = typeof(T).AssemblyQualifiedName
+ };
+ _topicArn = config.TopicArn ?? throw new Exception("No topic arn set for type: " + (TypeCache.FriendlyName ?? string.Empty));
+ }
+
+ public override async Task DispatchAsync(Message message, CancellationToken cancellationToken)
+ {
+ LogDispatch(message);
+
+ if (message.Value is null)
+ Throw.Exception("Message value cannot be null");
+
+ var msg = _serializer.AsString(message.Value);
+ if (string.IsNullOrWhiteSpace(msg))
+ Throw.Exception("Message could not be serialized");
+
+ var request = new PublishRequest
+ {
+ MessageAttributes = GetMessageProperties(message),
+ Message = msg,
+ TopicArn = _topicArn
+ };
+
+#if NETCOREAPP3_1
+ var stopwatch = OpenMessageEventSource.Instance.ProcessMessageDispatchStart();
+#endif
+
+ try
+ {
+ var response = await _client.PublishAsync(request, cancellationToken);
+
+ if (response.HttpStatusCode != HttpStatusCode.OK)
+ ThrowExceptionFromHttpResponse(response.HttpStatusCode);
+ }
+ catch (AmazonSimpleNotificationServiceException e) when (e.ErrorCode == "NotFound")
+ {
+ ThrowExceptionFromHttpResponse(e.StatusCode, e);
+ }
+ finally
+ {
+#if NETCOREAPP3_1
+ if (stopwatch.HasValue)
+ OpenMessageEventSource.Instance.ProcessMessageDispatchStop(stopwatch.Value);
+#endif
+ }
+ }
+
+ private Dictionary GetMessageProperties(Message message)
+ {
+ var result = new Dictionary
+ {
+ [KnownProperties.ContentType] = _contentType,
+ [KnownProperties.ValueTypeName] = _valueTypeName
+ };
+
+ if (Activity.Current is {})
+ result[KnownProperties.ActivityId] = new MessageAttributeValue
+ {
+ DataType = AttributeType,
+ StringValue = Activity.Current.Id
+ };
+
+ switch (message)
+ {
+ case ISupportProperties p:
+ {
+ foreach (var prop in p.Properties)
+ result[prop.Key] = new MessageAttributeValue
+ {
+ DataType = AttributeType,
+ StringValue = prop.Value
+ };
+
+ break;
+ }
+ case ISupportProperties p2:
+ {
+ foreach (var prop in p2.Properties)
+ result[prop.Key] = new MessageAttributeValue
+ {
+ DataType = AttributeType,
+ StringValue = Encoding.UTF8.GetString(prop.Value)
+ };
+
+ break;
+ }
+ case ISupportProperties p3:
+ {
+ foreach (var prop in p3.Properties)
+ result[Encoding.UTF8.GetString(prop.Key)] = new MessageAttributeValue
+ {
+ DataType = AttributeType,
+ StringValue = Encoding.UTF8.GetString(prop.Value)
+ };
+
+ break;
+ }
+ }
+
+ return result;
+ }
+
+ private void ThrowExceptionFromHttpResponse(HttpStatusCode statusCode, Exception? innerException = null)
+ {
+ var msg = $"Failed to send the message to SNS. Type: '{TypeCache.FriendlyName}' Topic ARN: '{_topicArn ?? ""}' Status Code: '{statusCode}'.";
+
+ if (innerException is null)
+ throw new Exception(msg);
+
+ throw new Exception(msg, innerException);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SNS/SnsServiceExtensions.cs b/src/OpenMessage.AWS.SNS/SnsServiceExtensions.cs
new file mode 100644
index 0000000..0bf36ee
--- /dev/null
+++ b/src/OpenMessage.AWS.SNS/SnsServiceExtensions.cs
@@ -0,0 +1,19 @@
+using Microsoft.Extensions.DependencyInjection;
+using OpenMessage.AWS.SNS.Configuration;
+
+namespace OpenMessage.AWS.SNS
+{
+ ///
+ /// SNS Extensions
+ ///
+ public static class SnsServiceExtensions
+ {
+ ///
+ /// Returns an SNS dispatcher builder
+ ///
+ /// The host the dispatcher belongs to
+ /// The type of message to dispatch
+ /// An SNS dispatcher builder
+ public static ISnsDispatcherBuilder ConfigureSnsDispatcher(this IMessagingBuilder messagingBuilder) => new SnsDispatcherBuilder(messagingBuilder);
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SQS/Configuration/ISqsConsumerBuilder.cs b/src/OpenMessage.AWS.SQS/Configuration/ISqsConsumerBuilder.cs
new file mode 100644
index 0000000..da8de1c
--- /dev/null
+++ b/src/OpenMessage.AWS.SQS/Configuration/ISqsConsumerBuilder.cs
@@ -0,0 +1,34 @@
+using Microsoft.Extensions.Hosting;
+using OpenMessage.Builders;
+using System;
+
+namespace OpenMessage.AWS.SQS.Configuration
+{
+ ///
+ /// The builder for an SQS consumer
+ ///
+ /// The type to be consumed
+ public interface ISqsConsumerBuilder : IBuilder
+ {
+ ///
+ /// Configure the consumer with the specified options
+ ///
+ /// The configuration action
+ /// The SQS consumer builder
+ ISqsConsumerBuilder FromConfiguration(Action configuration);
+
+ ///
+ /// Configure the consumer with the specified options
+ ///
+ /// The configuration action
+ /// The SQS consumer builder
+ ISqsConsumerBuilder FromConfiguration(Action configuration);
+
+ ///
+ /// Configure the dispatcher with the specified options
+ ///
+ /// The configuration section to use
+ /// The SQS dispatcher builder
+ ISqsConsumerBuilder FromConfiguration(string configurationSection);
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SQS/Configuration/ISqsDispatcherBuilder.cs b/src/OpenMessage.AWS.SQS/Configuration/ISqsDispatcherBuilder.cs
new file mode 100644
index 0000000..f71104c
--- /dev/null
+++ b/src/OpenMessage.AWS.SQS/Configuration/ISqsDispatcherBuilder.cs
@@ -0,0 +1,41 @@
+using Microsoft.Extensions.Hosting;
+using OpenMessage.Builders;
+using System;
+
+namespace OpenMessage.AWS.SQS.Configuration
+{
+ ///
+ /// The builder for an SQS consumer
+ ///
+ /// The type to be dispatched
+ public interface ISqsDispatcherBuilder : IBuilder
+ {
+ ///
+ /// Configure the dispatcher with the specified options
+ ///
+ /// The configuration action
+ /// The SQS dispatcher builder
+ ISqsDispatcherBuilder FromConfiguration(Action> configuration);
+
+ ///
+ /// Configure the dispatcher with the specified options
+ ///
+ /// The configuration action
+ /// The SQS dispatcher builder
+ ISqsDispatcherBuilder FromConfiguration(Action> configuration);
+
+ ///
+ /// Configure the dispatcher with the specified options
+ ///
+ /// The configuration section to use
+ /// The SQS dispatcher builder
+ ISqsDispatcherBuilder FromConfiguration(string configurationSection);
+
+ ///
+ /// Enables the batched dispatcher mechanism
+ ///
+ /// Whether or not to enable the batched dispatcher
+ /// The SQS dispatcher builder
+ ISqsDispatcherBuilder WithBatchedDispatcher(bool enabled = true);
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SQS/Configuration/SQSConsumerOptions.cs b/src/OpenMessage.AWS.SQS/Configuration/SQSConsumerOptions.cs
new file mode 100644
index 0000000..eab969b
--- /dev/null
+++ b/src/OpenMessage.AWS.SQS/Configuration/SQSConsumerOptions.cs
@@ -0,0 +1,67 @@
+using Amazon.SQS;
+using System;
+using System.Collections.Generic;
+
+namespace OpenMessage.AWS.SQS.Configuration
+{
+ ///
+ /// Options for an SQS consumer
+ ///
+ public class SQSConsumerOptions
+ {
+ ///
+ /// Allow the configuration of the raw AWS SQS Client Config during initialization of the consumer.
+ ///
+ public Action? AwsConsumerConfiguration { get; set; }
+
+ ///
+ /// The maximum number of messages to consume
+ ///
+ public int MaxNumberOfMessages { get; set; } = 10;
+
+ ///
+ /// The url of the queue to consume from
+ ///
+ public string? QueueUrl { get; set; }
+
+ ///
+ /// The region endpoint to use
+ ///
+ public string? RegionEndpoint { get; set; }
+
+ ///
+ /// The service url to use
+ ///
+ public string? ServiceURL { get; set; }
+
+ ///
+ /// How long to leave the message on the queue before it becomes consumable again
+ ///
+ public int? VisibilityTimeout { get; set; }
+
+ ///
+ /// The waiting period before return the messages, in seconds
+ ///
+ public int WaitTimeSeconds { get; set; }
+
+ ///
+ /// The minimum number of consumers to manage
+ ///
+ public byte MinimumConsumerCount { get; set; } = 1;
+
+ ///
+ /// The maximum number of consumers to manage
+ ///
+ public byte MaximumConsumerCount { get; set; } = 10;
+
+ ///
+ /// The SQS specific messages attributes to retrieve, eg: ApproximateFirstReceiveTimestamp, ApproximateReceiveCount, AWSTraceHeader, SenderId, SentTimestamp, MessageDeduplicationId, MessageGroupId, SequenceNumber
+ ///
+ public List SQSMessageAttributes { get; set; } = new List(0);
+
+ ///
+ /// The custom message properties, eg: ContentType, to load from the message
+ ///
+ public List CustomMessageAttributes { get; set; } = new List { "All" };
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SQS/Configuration/SQSDispatcherOptions.cs b/src/OpenMessage.AWS.SQS/Configuration/SQSDispatcherOptions.cs
new file mode 100644
index 0000000..90ebe08
--- /dev/null
+++ b/src/OpenMessage.AWS.SQS/Configuration/SQSDispatcherOptions.cs
@@ -0,0 +1,31 @@
+using Amazon.SQS;
+using System;
+
+namespace OpenMessage.AWS.SQS.Configuration
+{
+ ///
+ /// Configuration options for dispatchers
+ ///
+ public class SQSDispatcherOptions
+ {
+ ///
+ /// Allow the configuration of the raw AWS SQS Dispatcher Config during initialization of the dispatcher.
+ ///
+ public Action? AwsDispatcherConfiguration { get; set; }
+
+ ///
+ /// The queue url to dispatch to
+ ///
+ public string? QueueUrl { get; set; }
+
+ ///
+ /// The region endpoint to use
+ ///
+ public string? RegionEndpoint { get; set; }
+
+ ///
+ /// The url to use for authentication
+ ///
+ public string? ServiceURL { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SQS/Configuration/SqsConsumerBuilder.cs b/src/OpenMessage.AWS.SQS/Configuration/SqsConsumerBuilder.cs
new file mode 100644
index 0000000..cb9fd1d
--- /dev/null
+++ b/src/OpenMessage.AWS.SQS/Configuration/SqsConsumerBuilder.cs
@@ -0,0 +1,49 @@
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using Microsoft.Extensions.Hosting;
+using OpenMessage.Builders;
+using System;
+
+namespace OpenMessage.AWS.SQS.Configuration
+{
+ internal sealed class SqsConsumerBuilder : Builder, ISqsConsumerBuilder
+ {
+ private Action? _configuration;
+
+ public SqsConsumerBuilder(IMessagingBuilder hostBuilder)
+ : base(hostBuilder) { }
+
+ public override void Build()
+ {
+ HostBuilder.Services.TryAddConsumerService();
+ HostBuilder.TryConfigureDefaultPipeline();
+
+ if (_configuration is {})
+ ConfigureOptions(_configuration);
+
+ HostBuilder.Services.TryAddTransient, SqsConsumer>();
+ HostBuilder.Services.TryAddTransient, QueueMonitor>();
+ HostBuilder.Services.AddConsumerService>(ConsumerId);
+ }
+
+ public ISqsConsumerBuilder FromConfiguration(Action configuration)
+ {
+ return FromConfiguration((context, options) => configuration(options));
+ }
+
+ public ISqsConsumerBuilder FromConfiguration(Action configuration)
+ {
+ _configuration = configuration;
+
+ return this;
+ }
+
+ public ISqsConsumerBuilder FromConfiguration(string configurationSection)
+ {
+ _configuration = (context, options) => context.Configuration.Bind(configurationSection, options);
+
+ return this;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SQS/Configuration/SqsDispatcherBuilder.cs b/src/OpenMessage.AWS.SQS/Configuration/SqsDispatcherBuilder.cs
new file mode 100644
index 0000000..72a2b00
--- /dev/null
+++ b/src/OpenMessage.AWS.SQS/Configuration/SqsDispatcherBuilder.cs
@@ -0,0 +1,60 @@
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using OpenMessage.Builders;
+using System;
+using System.Threading.Channels;
+
+namespace OpenMessage.AWS.SQS.Configuration
+{
+ internal sealed class SqsDispatcherBuilder : Builder, ISqsDispatcherBuilder
+ {
+ private Action>? _configuration;
+ private bool _batchedDispatcher = true;
+
+ public SqsDispatcherBuilder(IMessagingBuilder hostBuilder)
+ : base(hostBuilder) { }
+
+ public override void Build()
+ {
+ if (_configuration is {})
+ ConfigureOptions(_configuration, true);
+
+ if (_batchedDispatcher)
+ {
+ HostBuilder.Services.AddHostedService();
+ HostBuilder.Services.TryAddChannel(sp => Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true, SingleWriter = false }));
+ HostBuilder.Services.AddSingleton, SqsBatchedDispatcher>();
+ }
+ else
+ {
+ HostBuilder.Services.AddSingleton, SqsDispatcher>();
+ }
+ }
+
+ public ISqsDispatcherBuilder FromConfiguration(Action> configuration)
+ {
+ return FromConfiguration((context, options) => configuration(options));
+ }
+
+ public ISqsDispatcherBuilder FromConfiguration(Action> configuration)
+ {
+ _configuration = configuration;
+
+ return this;
+ }
+
+ public ISqsDispatcherBuilder FromConfiguration(string configurationSection)
+ {
+ _configuration = (context, options) => context.Configuration.Bind(configurationSection, options);
+
+ return this;
+ }
+
+ public ISqsDispatcherBuilder WithBatchedDispatcher(bool enabled = true)
+ {
+ _batchedDispatcher = enabled;
+ return this;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SQS/IQueueMonitor.cs b/src/OpenMessage.AWS.SQS/IQueueMonitor.cs
new file mode 100644
index 0000000..0ba3787
--- /dev/null
+++ b/src/OpenMessage.AWS.SQS/IQueueMonitor.cs
@@ -0,0 +1,10 @@
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace OpenMessage.AWS.SQS
+{
+ internal interface IQueueMonitor
+ {
+ Task GetQueueCountAsync(string consumerId, CancellationToken cancellationToken);
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SQS/ISqsConsumer.cs b/src/OpenMessage.AWS.SQS/ISqsConsumer.cs
new file mode 100644
index 0000000..1b90c79
--- /dev/null
+++ b/src/OpenMessage.AWS.SQS/ISqsConsumer.cs
@@ -0,0 +1,12 @@
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace OpenMessage.AWS.SQS
+{
+ internal interface ISqsConsumer
+ {
+ Task>> ConsumeAsync(CancellationToken cancellationToken);
+ void Initialize(string consumerId, CancellationToken cancellationToken);
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SQS/OpenMessage.AWS.SQS.csproj b/src/OpenMessage.AWS.SQS/OpenMessage.AWS.SQS.csproj
new file mode 100644
index 0000000..74456b3
--- /dev/null
+++ b/src/OpenMessage.AWS.SQS/OpenMessage.AWS.SQS.csproj
@@ -0,0 +1,13 @@
+
+
+
+ $(ProjectTargetFrameworks)
+ SQS consumer and dispatcher implementation for OpenMessage
+
+
+
+
+
+
+
+
diff --git a/src/OpenMessage.AWS.SQS/QueueMonitor.cs b/src/OpenMessage.AWS.SQS/QueueMonitor.cs
new file mode 100644
index 0000000..1d47dbe
--- /dev/null
+++ b/src/OpenMessage.AWS.SQS/QueueMonitor.cs
@@ -0,0 +1,59 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Amazon;
+using Amazon.SQS;
+using Amazon.SQS.Model;
+using Microsoft.Extensions.Options;
+using OpenMessage.AWS.SQS.Configuration;
+
+namespace OpenMessage.AWS.SQS
+{
+ internal sealed class QueueMonitor : IQueueMonitor
+ {
+ private readonly IOptionsMonitor _sqsOptions;
+ private readonly List QueueAttributes = new List
+ {
+ "ApproximateNumberOfMessages",
+ "ApproximateNumberOfMessagesDelayed",
+ "ApproximateNumberOfMessagesNotVisible"
+ };
+ private readonly ConcurrentDictionary _clients = new ConcurrentDictionary();
+
+ public QueueMonitor(IOptionsMonitor sqsOptions)
+ {
+ _sqsOptions = sqsOptions ?? throw new ArgumentNullException(nameof(sqsOptions));
+ }
+
+ public async Task GetQueueCountAsync(string consumerId, CancellationToken cancellationToken)
+ {
+ if (string.IsNullOrWhiteSpace(consumerId))
+ throw new ArgumentNullException(nameof(consumerId));
+
+ var options = _sqsOptions.Get(consumerId);
+ var client = _clients.GetOrAdd(consumerId, id =>
+ {
+ var config = new AmazonSQSConfig
+ {
+ ServiceURL = options.ServiceURL
+ };
+
+ if (!string.IsNullOrEmpty(options.RegionEndpoint))
+ config.RegionEndpoint = RegionEndpoint.GetBySystemName(options.RegionEndpoint);
+
+ options.AwsConsumerConfiguration?.Invoke(config);
+ return new AmazonSQSClient(config);
+ });
+
+ var attributes = await client.GetQueueAttributesAsync(new GetQueueAttributesRequest
+ {
+ QueueUrl = options.QueueUrl,
+ AttributeNames = QueueAttributes
+ }, cancellationToken);
+
+ return attributes.ApproximateNumberOfMessages;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SQS/SendSqsMessageCommand.cs b/src/OpenMessage.AWS.SQS/SendSqsMessageCommand.cs
new file mode 100644
index 0000000..5aae698
--- /dev/null
+++ b/src/OpenMessage.AWS.SQS/SendSqsMessageCommand.cs
@@ -0,0 +1,57 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Amazon.SQS.Model;
+
+namespace OpenMessage.AWS.SQS
+{
+ internal class SendSqsMessageCommand
+ {
+ private string? _lookupKey;
+#if NETCOREAPP3_1
+ private OpenMessageEventSource.ValueStopwatch? _stopwatch;
+#endif
+ private TaskCompletionSource _taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ internal string? QueueUrl { get; set; }
+ internal SendMessageBatchRequestEntry? Message { get; set; }
+ internal string? ServiceUrl { get; set; }
+ internal string? RegionEndpoint { get; set; }
+ internal string LookupKey => _lookupKey ??= $"{QueueUrl ?? string.Empty}|{ServiceUrl ?? string.Empty}|{RegionEndpoint ?? string.Empty}";
+
+ public SendSqsMessageCommand()
+ {
+#if NETCOREAPP3_1
+ _stopwatch = OpenMessageEventSource.Instance.ProcessMessageDispatchStart();
+#endif
+ }
+
+ internal void Complete()
+ {
+ _taskCompletionSource.TrySetResult(true);
+ CompleteCore();
+ }
+
+ internal void Cancel(CancellationToken ct)
+ {
+ _taskCompletionSource.TrySetCanceled(ct);
+ CompleteCore();
+ }
+
+ internal void Exception(Exception ex)
+ {
+ _taskCompletionSource.TrySetException(ex);
+ CompleteCore();
+ }
+
+ internal Task WaitForCompletion() => _taskCompletionSource.Task;
+
+ private void CompleteCore()
+ {
+#if NETCOREAPP3_1
+ if (_stopwatch.HasValue)
+ OpenMessageEventSource.Instance.ProcessMessageDispatchStop(_stopwatch.Value);
+#endif
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SQS/SqsBatchedDispatcher.cs b/src/OpenMessage.AWS.SQS/SqsBatchedDispatcher.cs
new file mode 100644
index 0000000..3364a99
--- /dev/null
+++ b/src/OpenMessage.AWS.SQS/SqsBatchedDispatcher.cs
@@ -0,0 +1,155 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Text;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Amazon.SQS.Model;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using OpenMessage.AWS.SQS.Configuration;
+using OpenMessage.Serialization;
+
+namespace OpenMessage.AWS.SQS
+{
+ internal sealed class SqsBatchedDispatcher : DispatcherBase
+ {
+ //15min = 900sec is the maximum delay supported by sqs
+ private const int MaximumSqsDelaySeconds = 900;
+ private static readonly string AttributeType = "String";
+ private readonly MessageAttributeValue _contentType;
+ private readonly IOptionsMonitor> _options;
+ private readonly ISerializer _serializer;
+ private readonly ChannelWriter _messageWriter;
+
+ public SqsBatchedDispatcher(IOptionsMonitor> options, ISerializer serializer, ILogger> logger, ChannelWriter messageWriter)
+ : base(logger)
+ {
+ _options = options ?? throw new ArgumentNullException(nameof(options));
+ _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
+ _messageWriter = messageWriter ?? throw new ArgumentNullException(nameof(messageWriter));
+
+ _contentType = new MessageAttributeValue
+ {
+ DataType = AttributeType,
+ StringValue = _serializer.ContentType
+ };
+ }
+
+ public override async Task DispatchAsync(Message message, CancellationToken cancellationToken)
+ {
+ if (message.Value is null)
+ Throw.Exception("Message value cannot be null");
+
+ var json = _serializer.AsString(message.Value);
+ if (string.IsNullOrWhiteSpace(json))
+ Throw.Exception("Message could not be serialized");
+
+ var options = _options.CurrentValue;
+ if (options is null)
+ Throw.Exception("Options cannot be null");
+
+ LogDispatch(message);
+
+ var request = new SendMessageBatchRequestEntry
+ {
+ Id = Guid.NewGuid().ToString("N"),
+ MessageAttributes = GetMessageProperties(message),
+ DelaySeconds = DelaySeconds(message),
+ MessageBody = json
+ };
+
+ var msg = new SendSqsMessageCommand
+ {
+ Message = request,
+ QueueUrl = options.QueueUrl,
+ ServiceUrl = options.ServiceURL,
+ RegionEndpoint = options.RegionEndpoint
+ };
+
+ await _messageWriter.WriteAsync(msg, cancellationToken);
+
+ var taskCancellation = cancellationToken.Register(() => msg.Cancel(cancellationToken));
+ try
+ {
+ await msg.WaitForCompletion();
+ }
+ finally
+ {
+ taskCancellation.Dispose();
+ }
+ }
+
+ private static int DelaySeconds(Message message)
+ {
+ if (message is ISupportSendDelay delay && delay.SendDelay > TimeSpan.Zero)
+ {
+ return Math.Min(MaximumSqsDelaySeconds, (int) delay.SendDelay.TotalSeconds);
+ }
+
+ return 0;
+ }
+
+ private Dictionary GetMessageProperties(Message message)
+ {
+ var result = new Dictionary
+ {
+ [KnownProperties.ContentType] = _contentType
+ };
+
+ if (!(message.Value is null))
+ result[KnownProperties.ValueTypeName] = new MessageAttributeValue
+ {
+ DataType = AttributeType,
+ StringValue = message.Value.GetType().AssemblyQualifiedName
+ };
+
+ if (Activity.Current is {})
+ result[KnownProperties.ActivityId] = new MessageAttributeValue
+ {
+ DataType = AttributeType,
+ StringValue = Activity.Current.Id
+ };
+
+ switch (message)
+ {
+ case ISupportProperties p:
+ {
+ foreach (var prop in p.Properties)
+ result[prop.Key] = new MessageAttributeValue
+ {
+ DataType = AttributeType,
+ StringValue = prop.Value
+ };
+
+ break;
+ }
+ case ISupportProperties p2:
+ {
+ foreach (var prop in p2.Properties)
+ result[prop.Key] = new MessageAttributeValue
+ {
+ DataType = AttributeType,
+ StringValue = Encoding.UTF8.GetString(prop.Value)
+ };
+
+ break;
+ }
+ case ISupportProperties p3:
+ {
+ foreach (var prop in p3.Properties)
+ result[Encoding.UTF8.GetString(prop.Key)] = new MessageAttributeValue
+ {
+ DataType = AttributeType,
+ StringValue = Encoding.UTF8.GetString(prop.Value)
+ };
+
+ break;
+ }
+ }
+
+ return result;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SQS/SqsConsumer.cs b/src/OpenMessage.AWS.SQS/SqsConsumer.cs
new file mode 100644
index 0000000..4185045
--- /dev/null
+++ b/src/OpenMessage.AWS.SQS/SqsConsumer.cs
@@ -0,0 +1,123 @@
+using Amazon;
+using Amazon.SQS;
+using Amazon.SQS.Model;
+using Microsoft.Extensions.Options;
+using OpenMessage.AWS.SQS.Configuration;
+using OpenMessage.Serialization;
+using System;
+using System.Collections.Generic;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+namespace OpenMessage.AWS.SQS
+{
+ internal sealed class SqsConsumer : ISqsConsumer
+ {
+ private static readonly string MisconfiguredConsumerMessage = "Consumer has not been initialized. Please call Initialize with the configured consumer id.";
+ private readonly IDeserializationProvider _deserializationProvider;
+ private readonly ILogger> _logger;
+ private readonly List> _emptyList = new List>(0);
+
+ private readonly IOptionsMonitor _options;
+ private Func, Task>? _acknowledgementAction;
+ private IAmazonSQS? _client;
+ private SQSConsumerOptions? _currentConsumerOptions;
+
+ public SqsConsumer(IOptionsMonitor options, IDeserializationProvider deserializationProvider, ILogger> logger)
+ {
+ _options = options ?? throw new ArgumentNullException(nameof(options));
+ _deserializationProvider = deserializationProvider ?? throw new ArgumentNullException(nameof(deserializationProvider));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ }
+
+ public async Task>> ConsumeAsync(CancellationToken cancellationToken)
+ {
+ if (_currentConsumerOptions is null || _client is null)
+ Throw.Exception(MisconfiguredConsumerMessage);
+
+ var request = new ReceiveMessageRequest
+ {
+ QueueUrl = _currentConsumerOptions.QueueUrl,
+ MaxNumberOfMessages = _currentConsumerOptions.MaxNumberOfMessages,
+ WaitTimeSeconds = _currentConsumerOptions.WaitTimeSeconds,
+ AttributeNames = _currentConsumerOptions.SQSMessageAttributes,
+ MessageAttributeNames = _currentConsumerOptions.CustomMessageAttributes
+ };
+
+ if (_currentConsumerOptions.VisibilityTimeout.HasValue)
+ request.VisibilityTimeout = _currentConsumerOptions.VisibilityTimeout.Value;
+
+ var response = await _client.ReceiveMessageAsync(request, cancellationToken);
+
+ if (response is null || response.HttpStatusCode != HttpStatusCode.OK || response.Messages is null || response.Messages.Count == 0)
+ return _emptyList;
+
+ var result = new List>(response.Messages.Count);
+
+ foreach (var message in response.Messages)
+ {
+ var properties = new Dictionary(message.Attributes.Count + message.MessageAttributes.Count, StringComparer.Ordinal);
+
+ foreach (var attribute in message.Attributes)
+ properties[attribute.Key] = attribute.Value;
+
+ foreach (var msgAttribute in message.MessageAttributes)
+ properties[msgAttribute.Key] = msgAttribute.Value.StringValue;
+
+ var contentType = ContentTypes.Json;
+
+ if (message.MessageAttributes.TryGetValue(KnownProperties.ContentType, out var cta))
+ contentType = cta.StringValue;
+
+ var messageType = default(string);
+ if (message.MessageAttributes.TryGetValue(KnownProperties.ValueTypeName, out var vtn))
+ messageType = vtn.StringValue;
+
+ if (_acknowledgementAction is null)
+ Throw.Exception("Acknowledgement action cannot be null for SQS message");
+
+ result.Add(new SqsMessage(_acknowledgementAction)
+ {
+ Id = message.MessageId,
+ Properties = properties,
+ ReceiptHandle = message.ReceiptHandle,
+ QueueUrl = _currentConsumerOptions.QueueUrl,
+ Value = _deserializationProvider.From(message.Body, contentType, messageType ?? string.Empty)
+ });
+ }
+
+ return result;
+ }
+
+ public void Initialize(string consumerId, CancellationToken cancellationToken)
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ try
+ {
+ _currentConsumerOptions = _options.Get(consumerId);
+
+ var config = new AmazonSQSConfig
+ {
+ ServiceURL = _currentConsumerOptions.ServiceURL
+ };
+
+ if (!string.IsNullOrEmpty(_currentConsumerOptions.RegionEndpoint))
+ config.RegionEndpoint = RegionEndpoint.GetBySystemName(_currentConsumerOptions.RegionEndpoint);
+
+ _currentConsumerOptions.AwsConsumerConfiguration?.Invoke(config);
+ _client = new AmazonSQSClient(config);
+ _acknowledgementAction = msg => _client?.DeleteMessageAsync(msg.QueueUrl, msg.ReceiptHandle, default) ?? Task.CompletedTask;
+
+ return;
+ }
+ catch (Exception e)
+ {
+ _logger.LogError(e, e.Message);
+ if (!cancellationToken.IsCancellationRequested)
+ Thread.Sleep(TimeSpan.FromSeconds(2));
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SQS/SqsDispatcher.cs b/src/OpenMessage.AWS.SQS/SqsDispatcher.cs
new file mode 100644
index 0000000..0af9f97
--- /dev/null
+++ b/src/OpenMessage.AWS.SQS/SqsDispatcher.cs
@@ -0,0 +1,168 @@
+using Amazon;
+using Amazon.SQS;
+using Amazon.SQS.Model;
+using Microsoft.Extensions.Options;
+using OpenMessage.AWS.SQS.Configuration;
+using OpenMessage.Serialization;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Net;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+namespace OpenMessage.AWS.SQS
+{
+ internal sealed class SqsDispatcher : DispatcherBase
+ {
+ //15min = 900sec is the maximum delay supported by sqs
+ private const int MaximumSqsDelaySeconds = 900;
+ private static readonly string AttributeType = "String";
+ private readonly AmazonSQSClient _client;
+ private readonly MessageAttributeValue _contentType;
+ private readonly string _queueUrl;
+ private readonly ISerializer _serializer;
+
+ public SqsDispatcher(IOptions> options, ISerializer serializer, ILogger> logger)
+ : base(logger)
+ {
+ _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
+ var config = options?.Value ?? throw new ArgumentNullException(nameof(options));
+ _queueUrl = config.QueueUrl ?? throw new Exception("No queue url set for type: " + (TypeCache.FriendlyName ?? string.Empty));
+
+ var sqsConfig = new AmazonSQSConfig
+ {
+ ServiceURL = config.ServiceURL
+ };
+
+ if (!string.IsNullOrEmpty(config.RegionEndpoint))
+ sqsConfig.RegionEndpoint = RegionEndpoint.GetBySystemName(config.RegionEndpoint);
+
+ config.AwsDispatcherConfiguration?.Invoke(sqsConfig);
+
+ _client = new AmazonSQSClient(sqsConfig);
+
+ _contentType = new MessageAttributeValue
+ {
+ DataType = AttributeType,
+ StringValue = _serializer.ContentType
+ };
+ }
+
+ public override async Task DispatchAsync(Message message, CancellationToken cancellationToken)
+ {
+ LogDispatch(message);
+
+ if (message.Value is null)
+ Throw.Exception("Message value cannot be null");
+
+ var msg = _serializer.AsString(message.Value);
+ if (string.IsNullOrWhiteSpace(msg))
+ Throw.Exception("Message could not be serialized");
+
+ var request = new SendMessageRequest
+ {
+ MessageAttributes = GetMessageProperties(message),
+ DelaySeconds = DelaySeconds(message),
+ MessageBody = msg,
+ QueueUrl = _queueUrl
+ };
+
+#if NETCOREAPP3_1
+ var stopwatch = OpenMessageEventSource.Instance.ProcessMessageDispatchStart();
+#endif
+
+ try
+ {
+ var response = await _client.SendMessageAsync(request, cancellationToken);
+ if (response.HttpStatusCode != HttpStatusCode.OK)
+ ThrowExceptionFromHttpResponse(response);
+ }
+ finally
+ {
+#if NETCOREAPP3_1
+ if (stopwatch.HasValue)
+ OpenMessageEventSource.Instance.ProcessMessageDispatchStop(stopwatch.Value);
+#endif
+ }
+ }
+
+ private static int DelaySeconds(Message message)
+ {
+ if (message is ISupportSendDelay delay && delay.SendDelay > TimeSpan.Zero)
+ {
+ return Math.Min(MaximumSqsDelaySeconds, (int) delay.SendDelay.TotalSeconds);
+ }
+
+ return 0;
+ }
+
+ private Dictionary GetMessageProperties(Message message)
+ {
+ var result = new Dictionary
+ {
+ [KnownProperties.ContentType] = _contentType
+ };
+
+ if (!(message.Value is null))
+ result[KnownProperties.ValueTypeName] = new MessageAttributeValue
+ {
+ DataType = AttributeType,
+ StringValue = message.Value.GetType().AssemblyQualifiedName
+ };
+
+ if (Activity.Current is {})
+ result[KnownProperties.ActivityId] = new MessageAttributeValue
+ {
+ DataType = AttributeType,
+ StringValue = Activity.Current.Id
+ };
+
+ switch (message)
+ {
+ case ISupportProperties p:
+ {
+ foreach (var prop in p.Properties)
+ result[prop.Key] = new MessageAttributeValue
+ {
+ DataType = AttributeType,
+ StringValue = prop.Value
+ };
+
+ break;
+ }
+ case ISupportProperties p2:
+ {
+ foreach (var prop in p2.Properties)
+ result[prop.Key] = new MessageAttributeValue
+ {
+ DataType = AttributeType,
+ StringValue = Encoding.UTF8.GetString(prop.Value)
+ };
+
+ break;
+ }
+ case ISupportProperties p3:
+ {
+ foreach (var prop in p3.Properties)
+ result[Encoding.UTF8.GetString(prop.Key)] = new MessageAttributeValue
+ {
+ DataType = AttributeType,
+ StringValue = Encoding.UTF8.GetString(prop.Value)
+ };
+
+ break;
+ }
+ }
+
+ return result;
+ }
+
+ private void ThrowExceptionFromHttpResponse(SendMessageResponse response)
+ {
+ throw new Exception($"Failed to send the message to SQS. Type: '{TypeCache.FriendlyName}' Queue Url: '{_queueUrl ?? ""}' Status Code: '{response.HttpStatusCode}'.");
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SQS/SqsDispatcherService.cs b/src/OpenMessage.AWS.SQS/SqsDispatcherService.cs
new file mode 100644
index 0000000..2466875
--- /dev/null
+++ b/src/OpenMessage.AWS.SQS/SqsDispatcherService.cs
@@ -0,0 +1,155 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Amazon;
+using Amazon.SQS;
+using Amazon.SQS.Model;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace OpenMessage.AWS.SQS
+{
+ internal sealed class SqsDispatcherService : BackgroundService
+ {
+ private readonly ChannelReader _messageReader;
+ private readonly ILogger _logger;
+ private readonly Dictionary _clients = new Dictionary(StringComparer.Ordinal);
+
+ private readonly Dictionary> _channels = new Dictionary>(StringComparer.Ordinal);
+ private readonly Dictionary _channelReaderTasks = new Dictionary();
+
+ public SqsDispatcherService(ChannelReader messageReader, ILogger logger)
+ {
+ _messageReader = messageReader;
+ _logger = logger;
+ }
+
+ protected override async Task ExecuteAsync(CancellationToken cancellationToken)
+ {
+ // Without this line we can encounter a blocking issue such as: https://github.com/dotnet/extensions/issues/2816
+ await Task.Yield();
+
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ try
+ {
+ if (_messageReader.TryRead(out var msg))
+ {
+ if (msg.QueueUrl is null)
+ {
+ msg.Exception(new Exception("Cannot process message without a destination queue url"));
+ continue;
+ }
+
+ if (msg.Message is null)
+ {
+ msg.Exception(new Exception("Cannot process message without a message to send"));
+ continue;
+ }
+
+ if (!_channels.TryGetValue(msg.LookupKey, out var channel))
+ {
+ _channels[msg.LookupKey] = channel = Channel.CreateUnbounded(new UnboundedChannelOptions
+ {
+ SingleReader = true,
+ SingleWriter = true
+ });
+ _channelReaderTasks[msg.LookupKey] = Task.Run(async () =>
+ {
+ var messagesToSend = new List(10);
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ try
+ {
+ if (messagesToSend is null)
+ continue;
+
+ var readMessage = channel.Reader.TryRead(out var msg);
+ if (readMessage)
+ messagesToSend.Add(msg);
+
+ if (messagesToSend.Count == 10 || messagesToSend.Count > 0 && !readMessage)
+ {
+ var messages = Interlocked.Exchange(ref messagesToSend, new List(10));
+ if (messages is {})
+ _ = ProcessMessages(messages);
+ }
+ else if (!cancellationToken.IsCancellationRequested && !readMessage)
+ await channel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+ {
+ if (messagesToSend is {})
+ foreach (var msg in messagesToSend)
+ msg.Cancel(cancellationToken);
+ }
+ catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
+ {
+ if (messagesToSend is {})
+ foreach (var msg in messagesToSend)
+ msg.Exception(ex);
+ }
+ }
+ });
+ }
+
+ await channel.Writer.WriteAsync(msg, cancellationToken).ConfigureAwait(false);
+ }
+ else
+ await _messageReader.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { }
+ catch (Exception e)
+ {
+ _logger.LogError(e, e.Message);
+ }
+ }
+ }
+
+ private async Task ProcessMessages(List messages)
+ {
+ if (messages.Count == 0)
+ return;
+
+ var firstMessage = messages[0];
+
+ try
+ {
+ var entries = new List(messages.Count);
+ foreach(var msg in messages)
+ if (msg.Message is {})
+ entries.Add(msg.Message);
+
+ var request = new SendMessageBatchRequest(firstMessage.QueueUrl, entries);
+ if (!_clients.TryGetValue(firstMessage.LookupKey, out var client))
+ {
+ var config = new AmazonSQSConfig
+ {
+ ServiceURL = firstMessage.ServiceUrl
+ };
+
+ if (firstMessage.RegionEndpoint != null)
+ config.RegionEndpoint = RegionEndpoint.GetBySystemName(firstMessage.RegionEndpoint);
+
+ _clients[firstMessage.LookupKey] = client = new AmazonSQSClient(config);
+ }
+
+ var response = await client.SendMessageBatchAsync(request);
+
+ // TODO :: we should be able to complete certain messages here
+ if (response.Failed.Count > 0)
+ Throw.Exception("One or more messages failed to send");
+
+ foreach (var msg in messages)
+ msg.Complete();
+ }
+ catch (Exception e)
+ {
+ foreach (var msg in messages)
+ msg.Exception(e);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SQS/SqsMessage.cs b/src/OpenMessage.AWS.SQS/SqsMessage.cs
new file mode 100644
index 0000000..51e6c91
--- /dev/null
+++ b/src/OpenMessage.AWS.SQS/SqsMessage.cs
@@ -0,0 +1,55 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq;
+using System.Threading.Tasks;
+
+namespace OpenMessage.AWS.SQS
+{
+ internal sealed class SqsMessage : Message, ISupportAcknowledgement, ISupportIdentification, ISupportProperties
+ {
+ private readonly Func, Task> _acknowledgementAction;
+#if NETCOREAPP3_1
+ private OpenMessageEventSource.ValueStopwatch? _stopwatch;
+#endif
+
+ public AcknowledgementState AcknowledgementState { get; private set; }
+ [MaybeNull, AllowNull] public string Id { get; internal set; } = default;
+ public IEnumerable> Properties { get; internal set; } = Enumerable.Empty>();
+ internal string? ReceiptHandle { get; set; }
+ internal string? QueueUrl { get; set; }
+
+ public SqsMessage(Func, Task> acknowledgementAction)
+ {
+ _acknowledgementAction = acknowledgementAction;
+#if NETCOREAPP3_1
+ _stopwatch = OpenMessageEventSource.Instance.ProcessMessageStart();
+#endif
+ }
+
+ public async Task AcknowledgeAsync(bool positivelyAcknowledge = true, Exception? exception = null)
+ {
+ try
+ {
+ if (!positivelyAcknowledge)
+ {
+ AcknowledgementState = AcknowledgementState.NegativelyAcknowledged;
+ return;
+ }
+
+ await _acknowledgementAction(this);
+ AcknowledgementState = AcknowledgementState.Acknowledged;
+ }
+ finally
+ {
+#if NETCOREAPP3_1
+ if (_stopwatch.HasValue)
+ {
+ OpenMessageEventSource.Instance.ProcessMessageStop(_stopwatch.Value);
+ _stopwatch = null;
+ }
+#endif
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SQS/SqsMessagePump.cs b/src/OpenMessage.AWS.SQS/SqsMessagePump.cs
new file mode 100644
index 0000000..c94a8f9
--- /dev/null
+++ b/src/OpenMessage.AWS.SQS/SqsMessagePump.cs
@@ -0,0 +1,177 @@
+using Amazon.SQS;
+using Amazon.SQS.Model;
+using Microsoft.Extensions.Logging;
+using OpenMessage.Pipelines.Pumps;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+using OpenMessage.AWS.SQS.Configuration;
+
+namespace OpenMessage.AWS.SQS
+{
+ internal sealed class SqsMessagePump : MessagePump
+ {
+ private readonly string _consumerId;
+ private readonly IQueueMonitor _queueMonitor;
+ private readonly IOptionsMonitor _sqsOptions;
+ private readonly IServiceProvider _services;
+ private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
+ private Task? _consumerCheckTask;
+ private List<(Task tsk, CancellationTokenSource cts)> _consumers = new List<(Task tsk, CancellationTokenSource cts)>();
+
+ public SqsMessagePump(ChannelWriter> channelWriter,
+ ILogger> logger,
+ IQueueMonitor queueMonitor,
+ IServiceScopeFactory serviceScopeFactory,
+ IOptionsMonitor sqsOptions,
+ string consumerId)
+ : base(channelWriter, logger)
+ {
+ _queueMonitor = queueMonitor ?? throw new ArgumentNullException(nameof(queueMonitor));
+ _sqsOptions = sqsOptions ?? throw new ArgumentNullException(nameof(sqsOptions));
+ if (serviceScopeFactory == null)
+ throw new ArgumentNullException(nameof(serviceScopeFactory));
+ _services = serviceScopeFactory.CreateScope().ServiceProvider;
+ _consumerId = consumerId ?? throw new ArgumentNullException(nameof(consumerId));
+ }
+
+ public override Task StartAsync(CancellationToken cancellationToken)
+ {
+ _consumerCheckTask = Task.Run(async () =>
+ {
+ await Task.Delay(100);
+ var token = _cancellationTokenSource.Token;
+ while (!token.IsCancellationRequested)
+ {
+ try
+ {
+ // This is hacky POC
+ var count = await _queueMonitor.GetQueueCountAsync(_consumerId, token);
+
+ lock (_consumers)
+ {
+ const int targetCountPerConsumer = 50;
+ var options = _sqsOptions.Get(_consumerId);
+ if (_consumers.Count == 0)
+ {
+ // This is the startup essentially
+ var newConsumerCount = Math.Min(count == 0 ? options.MinimumConsumerCount : Math.Max(count / targetCountPerConsumer, options.MinimumConsumerCount), options.MaximumConsumerCount);
+ for (var i = 0; i < newConsumerCount; i++)
+ {
+ InitialiseConsumer(count, cancellationToken);
+ }
+ }
+ else if (count >= 0)
+ {
+ var maxCapacity = _consumers.Count * targetCountPerConsumer;
+ if (count > (maxCapacity + targetCountPerConsumer * 3) && _consumers.Count < options.MaximumConsumerCount)
+ {
+ InitialiseConsumer(count, cancellationToken);
+ }
+ else if (count < (maxCapacity / 2) && _consumers.Count - 1 >= options.MinimumConsumerCount)
+ {
+ RemoveConsumer();
+ }
+ }
+ }
+ }
+ catch (OperationCanceledException) { }
+ catch (Exception ex)
+ {
+ Logger.LogError(ex, $"Error occurred while running '{TypeCache.FriendlyName}' {nameof(SqsMessagePump)}. {ex.Message}");
+ }
+ finally
+ {
+ if (!cancellationToken.IsCancellationRequested)
+ await Task.Delay(5000, cancellationToken);
+ }
+ }
+ });
+
+ return base.StartAsync(cancellationToken);
+ }
+
+ public override Task StopAsync(CancellationToken cancellationToken)
+ {
+ _cancellationTokenSource.Cancel();
+ return base.StopAsync(cancellationToken);
+ }
+
+ protected override async Task ExecuteAsync(CancellationToken cancellationToken)
+ {
+ await Task.Yield();
+ }
+
+ protected override Task ConsumeAsync(CancellationToken cancellationToken) => Task.CompletedTask;
+
+ private async Task HandleMissingQueueAsync(TException exception, CancellationToken cancellationToken)
+ where TException : Exception
+ {
+ Logger.LogError(exception, $"Queue for type '{TypeCache.FriendlyName}' does not exist. Retrying in 15 seconds.");
+ await Task.Delay(TimeSpan.FromSeconds(15), cancellationToken);
+ }
+
+ private void InitialiseConsumer(int queueLength, CancellationToken cancellationToken)
+ {
+ lock (_consumers)
+ {
+ var consumer = _services.GetRequiredService>();
+ consumer.Initialize(_consumerId, cancellationToken);
+ var cts = new CancellationTokenSource();
+ var ct = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts.Token);
+ var consumerTask = RunConsumerTask(consumer, ct.Token);
+ _consumers.Add((consumerTask, cts));
+ Logger.LogInformation("Initialized new '{0}' consumer. Current consumer count: {1}. Queue Length: {2}", TypeCache.FriendlyName, _consumers.Count, queueLength);
+ }
+ }
+
+ private void RemoveConsumer()
+ {
+ lock (_consumers)
+ {
+ if (_consumers.Count == 0)
+ return;
+
+ var index = _consumers.Count - 1;
+ var tskGroup = _consumers[index];
+ _consumers.RemoveAt(index);
+ tskGroup.cts.Cancel(false);
+ Logger.LogInformation("Removed '{0}' consumer. Current consumer count: {1}", TypeCache.FriendlyName, _consumers.Count);
+ }
+ }
+
+ private async Task RunConsumerTask(ISqsConsumer consumer, CancellationToken cancellationToken)
+ {
+ var writer = ChannelWriter;
+
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ try
+ {
+ var messages = await consumer.ConsumeAsync(cancellationToken);
+ foreach (var message in messages)
+ if (!writer.TryWrite(message))
+ await writer.WriteAsync(message, cancellationToken);
+ }
+ catch (QueueDoesNotExistException queueException)
+ {
+ await HandleMissingQueueAsync(queueException, cancellationToken);
+ }
+ catch (AmazonSQSException sqsException) when (sqsException.ErrorCode == "AWS.SimpleQueueService.NonExistentQueue")
+ {
+ await HandleMissingQueueAsync(sqsException, cancellationToken);
+ }
+ catch (OperationCanceledException) { }
+ catch (Exception e)
+ {
+ Logger.LogError(e, e.Message);
+ throw;
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.AWS.SQS/SqsServiceExtensions.cs b/src/OpenMessage.AWS.SQS/SqsServiceExtensions.cs
new file mode 100644
index 0000000..6742154
--- /dev/null
+++ b/src/OpenMessage.AWS.SQS/SqsServiceExtensions.cs
@@ -0,0 +1,27 @@
+using Microsoft.Extensions.DependencyInjection;
+using OpenMessage.AWS.SQS.Configuration;
+
+namespace OpenMessage.AWS.SQS
+{
+ ///
+ /// SQS Extensions
+ ///
+ public static class SqsServiceExtensions
+ {
+ ///
+ /// Returns an SQS consumer builder
+ ///
+ /// The host the consumer belongs to
+ /// The type of message to consume
+ /// An SQS consumer builder
+ public static ISqsConsumerBuilder ConfigureSqsConsumer(this IMessagingBuilder messagingBuilder) => new SqsConsumerBuilder(messagingBuilder);
+
+ ///
+ /// Returns an SQS dispatcher builder
+ ///
+ /// The host the dispatcher belongs to
+ /// The type of message to dispatch
+ /// An SQS dispatcher builder
+ public static ISqsDispatcherBuilder ConfigureSqsDispatcher(this IMessagingBuilder messagingBuilder) => new SqsDispatcherBuilder(messagingBuilder);
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.Apache.Kafka/Configuration/IKafkaConsumerBuilder.cs b/src/OpenMessage.Apache.Kafka/Configuration/IKafkaConsumerBuilder.cs
new file mode 100644
index 0000000..2784d41
--- /dev/null
+++ b/src/OpenMessage.Apache.Kafka/Configuration/IKafkaConsumerBuilder.cs
@@ -0,0 +1,32 @@
+using Microsoft.Extensions.Hosting;
+using OpenMessage.Builders;
+using System;
+
+namespace OpenMessage.Apache.Kafka.Configuration
+{
+ ///
+ ///
+ public interface IKafkaConsumerBuilder : IBuilder
+ {
+ ///
+ /// Configures the consumer with the specified options
+ ///
+ /// The configuration to use
+ /// The modified consumer builder
+ IKafkaConsumerBuilder FromConfiguration(Action configuration);
+
+ ///
+ /// Configures the consumer with the specified options
+ ///
+ /// The configuration to use
+ /// The modified consumer builder
+ IKafkaConsumerBuilder FromConfiguration(Action configuration);
+
+ ///
+ /// Configures the consumer to consume from the specified topic
+ ///
+ /// The name of the topic to consume from
+ /// The modified consumer builder
+ IKafkaConsumerBuilder FromTopic(string topicName);
+ }
+}
\ No newline at end of file
diff --git a/src/OpenMessage.Apache.Kafka/Configuration/KafkaConsumerBuilder.cs b/src/OpenMessage.Apache.Kafka/Configuration/KafkaConsumerBuilder.cs
new file mode 100644
index 0000000..d8e4c65
--- /dev/null
+++ b/src/OpenMessage.Apache.Kafka/Configuration/KafkaConsumerBuilder.cs
@@ -0,0 +1,66 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Options;
+using OpenMessage.Apache.Kafka.HostedServices;
+using OpenMessage.Builders;
+using System;
+
+namespace OpenMessage.Apache.Kafka.Configuration
+{
+ internal sealed class KafkaConsumerBuilder : Builder, IKafkaConsumerBuilder
+ {
+ private Action? _options;
+
+ private string? _topicName = TypeCache.FriendlyName?.ToLowerInvariant()
+ .Replace("<", "_")
+ .Replace(">", "_");
+
+ public KafkaConsumerBuilder(IMessagingBuilder hostBuilder)
+ : base(hostBuilder) { }
+
+ public override void Build()
+ {
+ var appName = HostBuilder.Context.HostingEnvironment.ApplicationName;
+ HostBuilder.Services.AddTransient, KafkaConsumer>();
+ HostBuilder.Services.AddConsumerService>(ConsumerId);
+
+ HostBuilder.Services.TryAddConsumerService()
+ .TryAddSingleton, KafkaOptionsPostConfigurationProvider>();
+ HostBuilder.TryConfigureDefaultPipeline();
+
+ ConfigureOptions((cntx, o) =>
+ {
+ o.TopicName = _topicName;
+
+ _options?.Invoke(cntx, o);
+ });
+
+ HostBuilder.Services.PostConfigure(ConsumerId, options =>
+ {
+ if (!options.KafkaConfiguration.ContainsKey("group.id"))
+ options.KafkaConfiguration["group.id"] = appName;
+ });
+ }
+
+ public IKafkaConsumerBuilder FromConfiguration(Action configuration)
+ {
+ return FromConfiguration((context, options) => configuration(options));
+ }
+
+ public IKafkaConsumerBuilder FromConfiguration(Action