diff --git a/.github/workflows/elixir.yml b/.github/workflows/elixir.yml index 907a251..d11b17e 100644 --- a/.github/workflows/elixir.yml +++ b/.github/workflows/elixir.yml @@ -10,24 +10,28 @@ permissions: contents: read jobs: - build: - - name: Build and test - runs-on: ubuntu-20 - + test: + name: Build and Test + runs-on: ubuntu-20.04 + strategy: + # Don't crash if the other workers fail (i.e. 27+1.17 currently) + fail-fast: false + matrix: + otp: ['25', '26', '27'] + elixir: ['1.15', '1.16', '1.17'] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Elixir - uses: erlef/setup-beam@988e02bfe678367a02564f65ca2e37726dc0268f + uses: erlef/setup-beam@v1 with: - elixir-version: '1.13.3' # Define the elixir version [required] - otp-version: '24.1' # Define the OTP version [required] + otp-version: ${{matrix.otp}} # Define the OTP version [required] + elixir-version: ${{matrix.elixir}} # Define the elixir version [required] - name: Restore dependencies cache uses: actions/cache@v3 with: path: deps - key: ${{ runner.os }}-mix-${{ hashFiles('**/mix.lock') }} - restore-keys: ${{ runner.os }}-mix- + key: otp-${{matrix.otp}}-ex-${{matrix.elixir}}-${{ runner.os }}-mix-${{ hashFiles('**/mix.lock') }} + restore-keys: otp-${{matrix.otp}}-ex-${{matrix.elixir}}-${{ runner.os }}-mix- - name: Install dependencies run: mix deps.get - name: Run tests diff --git a/CHANGELOG.md b/CHANGELOG.md index 4749436..4242ebd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,20 @@ +# 0.8.0 + +## Breaking Changes + +* Changed underlying XML encoding and decoder library to `saxy`, while encoding should remain the same, those handling XML responses must change their code to accept Saxy's "simple" format. + * If you use the `normalize_xml` option, keys are no longer atoms but strings. +* Removed `:jsonapi` response body type, it will just report as `:json` instead +* Remove `mode` option, do not use it the respective connection manager will set the mode for itself +* ContentClient will now return `:unaccepted` and `{:malformed, term}`` in addition to :unk to differentiate response bodies. + * `:unaccepted` will be returned when the response `content-type` was not acceptable from the request's accept header + * `:malformed` will be returned whenever the response body could not be parsed (or the `content-type` header was malformed) + * `:unk` will be returned for all other cases + +## Changes + +* Pepper.HTTP.ConnectionManager.PooledConnection is always in active mode + # 0.7.0 * `Pepper.HTTP.Client` and `Pepper.HTTP.ContentClient` can now accept a URI struct in place of a URL string diff --git a/README.md b/README.md index e6eae9f..a9f4556 100644 --- a/README.md +++ b/README.md @@ -19,3 +19,6 @@ resp.body # => "Hello, World" * [Clients](docs/clients.md) * [Connection Managers - Pools](docs/connection_managers.md) * [Response Body Handlers](docs/response_body_handlers.md) +* [Body Decoders](docs/body_decoders.md) +* [Body Encoders](docs/body_encoders.md) +* [Additional Headers](docs/additional_headers.md) diff --git a/docs/additional_headers.md b/docs/additional_headers.md new file mode 100644 index 0000000..8cd2da4 --- /dev/null +++ b/docs/additional_headers.md @@ -0,0 +1,37 @@ +## Additional Headers + +Sometimes it's nice to have a common module that decorates the request with some additional headers, for example adding authorization based on the request options. + +By default Pepper provides an Authorization module which + +Starting at `pepper_http >= 0.8.0`, ContentClient additional header modules can be added: + +```elixir +config :pepper_http, + additional_header_modules: [ + MyAdditionalHeaderModule + ] +``` + +Additional header modules are expected to define a `call/3` function which returns a tuple with the headers and options respectively: + +```elixir +defmodule MyAdditionalHeaderModule do + def call(_body, headers, options) do + # Note the body is also provided for reference, in case the header module needs to know + # the body ahead of time, the body is expected to be in its encoded form but may not be + # a valid binary + {headers, options} + end +end +``` + +As with any configurable property pepper also provides a `base_additional_headers_modules`: + +```elixir +config :pepper_http, + # by default the authorization module is provided + base_additional_header_modules: [ + Pepper.HTTP.ContentClient.Headers.Authorization, + ] +``` diff --git a/docs/body_decoders.md b/docs/body_decoders.md new file mode 100644 index 0000000..2b41e6b --- /dev/null +++ b/docs/body_decoders.md @@ -0,0 +1,50 @@ +## Body Decoders + +__Accept__ `Accept` + +__Content__ `Content-Type` + +Starting at `pepper_http >= 0.8.0`, ContentClient decoders can be set using the config: + +```elixir +config :pepper_http, + # Before you can use your decoder, pepper needs to know how to translate content-type headers into internal type names. + # The decoder_content_types (and base_decoder_content_types) config provides that information. + # Note the the decoder is expected to return the same type name or similar. + decoder_content_types: [ + {{"application", "x-my-type"}, :my_type} + ], + # Once you have your type name, you can finally decode it using the specified decoder module + # The module is expected to return {type_name::atom(), any()} + # This is the third element returned in the response tuple: + # {:ok, Pepper.HTTP.Response.t(), {type_name::atom(), any()}} + # Example: + # {:ok, _resp, {:json, doc}} + decoders: [ + my_type: MyType.Decoder + ] +``` + +As one may have noticed, `base_decoder_content_types` was mentioned, this config is the _default_ for pepper, in addition to `base_decoders`: + +```elixir +config :pepper_http + base_decoder_content_types: [ + {{"application", "json"}, :json}, + {{"application", "vnd.api+json"}, :json}, + {{"application", "xml"}, :xml}, + {{"application", "vnd.api+xml"}, :xml}, + {{"text", "xml"}, :xml}, + {{"text", "plain"}, :text}, + {{"application", "csv"}, :csv}, + {{"text", "csv"}, :csv}, + ], + base_decoders: [ + csv: Pepper.HTTP.BodyDecoder.CSV, + json: Pepper.HTTP.BodyDecoder.JSON, + xml: Pepper.HTTP.BodyDecoder.XML, + text: Pepper.HTTP.BodyDecoder.Text, + ] +``` + +Normally there is no need to overwrite that config, but it is provided just in case, otherwise you are expected to use `:decoder_content_types` and `:decoders` which will be added to the base. diff --git a/docs/body_decompressors.md b/docs/body_decompressors.md new file mode 100644 index 0000000..f5dd86c --- /dev/null +++ b/docs/body_decompressors.md @@ -0,0 +1,37 @@ +## Body Decompressors + +__Accept__ `Accept-Encoding` + +__Content__ `Content-Encoding` + +Starting at `pepper_http >= 0.8.0`, ContentClient decompressors can be set using the config: + +```elixir +config :pepper_http, + # As with most custom modules, content-encoding types must be translated to an internal name + decompressor_types: [ + {"br", :br}, + ], + # Once the name is available, the decompressor module can be specified + decompressors: [ + br: MyBrDecompressor + ] +``` + +Pepper provides modules for Identity, Gzip and Deflate out of the box + +```elixir +config :pepper_http + base_decompressor_types: [ + {"identity", :identity}, + {"deflate", :deflate}, + {"gzip", :gzip}, + ], + base_decompressors: [ + identity: Pepper.HTTP.BodyDecompressor.Identity, + deflate: Pepper.HTTP.BodyDecompressor.Deflate, + gzip: Pepper.HTTP.BodyDecompressor.Gzip, + ] +``` + +Normally there is no need to overwrite that config, but it is provided just in case, otherwise you are expected to use `:decompressor_types` and `:decompressors` which will be added to the base. diff --git a/docs/body_encoders.md b/docs/body_encoders.md new file mode 100644 index 0000000..d72b8a6 --- /dev/null +++ b/docs/body_encoders.md @@ -0,0 +1,30 @@ +## Body Encoders + +Starting at `pepper_http >= 0.8.0`, ContentClient encoders can be set using the config: + +```elixir +config :pepper_http, + # Unlike the decodes, the encoders use the type name from the body parameter during the request + # so there is no need to transform content types + encoders: [ + my_type: MyType.Encoder, + ] +``` + +Same as with the body decoders, encoders also have a `base_encoders` config: + +```elixir +config :pepper_http + base_encoders: [ + csv: Pepper.HTTP.BodyEncoder.CSV, + form: Pepper.HTTP.BodyEncoder.Form, + form_stream: Pepper.HTTP.BodyEncoder.FormStream, + form_urlencoded: Pepper.HTTP.BodyEncoder.FormUrlencoded, + json: Pepper.HTTP.BodyEncoder.JSON, + stream: Pepper.HTTP.BodyEncoder.Stream, + text: Pepper.HTTP.BodyEncoder.Text, + xml: Pepper.HTTP.BodyEncoder.XML, + ] +``` + +Normally there is no need to overwrite that config, but it is provided just in case, otherwise you are expected to use `:encoders` which will be added to the base. diff --git a/docs/clients.md b/docs/clients.md index 2294572..5fab6f2 100644 --- a/docs/clients.md +++ b/docs/clients.md @@ -85,7 +85,7 @@ case result do # Content-Type: application/xml # Content-Type: text/xml {:xml, doc} -> - # SweetXml will be used to parse the blob and returns the document + # `Saxy.SimpleForm.parse_string/1` will be used to parse the blob and returns the document :ok {:xmldoc, doc} -> diff --git a/lib/pepper/http/body_decoder.ex b/lib/pepper/http/body_decoder.ex new file mode 100644 index 0000000..cf4ad1c --- /dev/null +++ b/lib/pepper/http/body_decoder.ex @@ -0,0 +1,122 @@ +defmodule Pepper.HTTP.BodyDecoder do + alias Pepper.HTTP.Response + alias Pepper.HTTP.Proplist + + def decode_body(%Response{body: body} = response, options) do + accepted_content_type = determine_accepted_content_type(response, options) + + type = + case accepted_content_type do + nil -> + # no content-type + :unk + + :unaccepted -> + # mismatched content-type and accept, return unk(nown) + :unaccepted + + accepted_content_type -> + case Plug.Conn.Utils.content_type(accepted_content_type) do + {:ok, type, subtype, _params} -> + content_type_to_type(type, subtype) + + :error -> + {:malformed, :content_type} + end + end + + decode_body_by_type(type, body, options) + end + + defp determine_accepted_content_type(%Response{ + request: %{ + headers: req_headers, + }, + headers: res_headers, + }, _options) do + # retrieve the original request accept header, this will be used to "allow" the content-type + # to be parsed + accept = Proplist.get(req_headers, "accept") + # retrieve the response content-type + content_type = Proplist.get(res_headers, "content-type") + + # ensure that we only parse content for the given accept header to avoid parsing bodies we + # didn't want or even expect + case accept do + nil -> + # no accept header was given, expect to parse anything, this is dangerous + # but allows the default behaviour to continue + # you should ALWAYS specify an accept header + content_type + + "*/*" -> + content_type + + _ -> + if content_type do + # a content-type was returned, try negotiate with the accept header and content-type + case :accept_header.negotiate(accept, [content_type]) do + :undefined -> + # mismatch accept and content-type, refuse to parse the content and return + # nil for the accepted_content_type + :unaccepted + + name when is_binary(name) -> + # return the matched content_type + name + end + else + # there was no content-type, return nil + nil + end + end + end + + decoder_content_types = + Application.compile_env(:pepper_http, :base_decoder_content_types, [ + {{"application", "json"}, :json}, + {{"application", "vnd.api+json"}, :json}, + {{"application", "xml"}, :xml}, + {{"application", "vnd.api+xml"}, :xml}, + {{"text", "xml"}, :xml}, + {{"text", "plain"}, :text}, + {{"application", "csv"}, :csv}, + {{"text", "csv"}, :csv}, + ]) ++ Application.compile_env(:pepper_http, :decoder_content_types, []) + + Enum.each(decoder_content_types, fn {{type, subtype}, value} -> + def content_type_to_type(unquote(type), unquote(subtype)) do + unquote(value) + end + end) + + def content_type_to_type(_type, _subtype) do + :unk + end + + decoders = + Application.compile_env(:pepper_http, :base_decoders, [ + csv: Pepper.HTTP.BodyDecoder.CSV, + json: Pepper.HTTP.BodyDecoder.JSON, + xml: Pepper.HTTP.BodyDecoder.XML, + text: Pepper.HTTP.BodyDecoder.Text, + ]) ++ Application.compile_env(:pepper_http, :decoders, []) + + Enum.each(decoders, fn {type, module} -> + def decode_body_by_type(unquote(type), body, options) do + unquote(module).decode_body(body, options) + end + end) + + def decode_body_by_type(:unaccepted, body, _options) do + {:unaccepted, body} + end + + def decode_body_by_type(:unk, body, _options) do + {:unk, body} + end + + def decode_body_by_type({:malformed, _} = res, body, _options) do + {res, body} + end +end diff --git a/lib/pepper/http/body_decoder/csv.ex b/lib/pepper/http/body_decoder/csv.ex new file mode 100644 index 0000000..7398a61 --- /dev/null +++ b/lib/pepper/http/body_decoder/csv.ex @@ -0,0 +1,5 @@ +defmodule Pepper.HTTP.BodyDecoder.CSV do + def decode_body(body, _options) do + {:csv, body} + end +end diff --git a/lib/pepper/http/body_decoder/json.ex b/lib/pepper/http/body_decoder/json.ex new file mode 100644 index 0000000..0344cb3 --- /dev/null +++ b/lib/pepper/http/body_decoder/json.ex @@ -0,0 +1,11 @@ +defmodule Pepper.HTTP.BodyDecoder.JSON do + def decode_body(body, _options) do + case Jason.decode(body) do + {:ok, doc} -> + {:json, doc} + + {:error, _} -> + {{:malformed, :json}, body} + end + end +end diff --git a/lib/pepper/http/body_decoder/text.ex b/lib/pepper/http/body_decoder/text.ex new file mode 100644 index 0000000..a690e90 --- /dev/null +++ b/lib/pepper/http/body_decoder/text.ex @@ -0,0 +1,5 @@ +defmodule Pepper.HTTP.BodyDecoder.Text do + def decode_body(body, _options) do + {:text, body} + end +end diff --git a/lib/pepper/http/body_decoder/xml.ex b/lib/pepper/http/body_decoder/xml.ex new file mode 100644 index 0000000..20e4993 --- /dev/null +++ b/lib/pepper/http/body_decoder/xml.ex @@ -0,0 +1,13 @@ +defmodule Pepper.HTTP.BodyDecoder.XML do + import Pepper.HTTP.Utils + + def decode_body(body, options) do + # Parse XML + {:ok, doc} = Saxy.SimpleForm.parse_string(body) + if options[:normalize_xml] do + {:xmldoc, handle_xml_body(doc)} + else + {:xml, doc} + end + end +end diff --git a/lib/pepper/http/body_decompressor.ex b/lib/pepper/http/body_decompressor.ex new file mode 100644 index 0000000..a0c8605 --- /dev/null +++ b/lib/pepper/http/body_decompressor.ex @@ -0,0 +1,102 @@ +defmodule Pepper.HTTP.BodyDecompressor do + alias Pepper.HTTP.Response + alias Pepper.HTTP.Proplist + + def decompress_response(%Response{} = response, options) do + accepted_content_encoding = determine_accepted_content_encoding(response, options) + + encoding = + case accepted_content_encoding do + nil -> + # no content-encoding, assume identity + :identity + + value -> + content_encoding_to_encoding(value) + end + + + decode_body_by_encoding(encoding, response, options) + end + + defp determine_accepted_content_encoding(%Response{ + request: %{ + headers: req_headers, + }, + headers: res_headers, + }, _options) do + accept_encodings = case Proplist.get(req_headers, "accept-encoding") do + nil -> + :any + + value -> + Enum.map(:accept_encoding_header.parse(value), fn {:content_coding, name, _, _} -> + to_string(name) + end) + end + + content_encoding = + case Proplist.get(res_headers, "content-encoding") do + nil -> + "identity" + + value -> + [{:content_coding, name, _, _}] = :accept_encoding_header.parse(value) + to_string(name) + end + + # ensure that we only parse content for the given accept-encoding header to avoid parsing bodies we + # didn't want or even expect + if accept_encodings == :any or content_encoding in accept_encodings or "*" in accept_encodings do + content_encoding + else + {:unaccepted, content_encoding} + end + end + + decoder_content_types = + Application.compile_env(:pepper_http, :base_decompressor_types, [ + {"identity", :identity}, + {"deflate", :deflate}, + {"gzip", :gzip}, + ]) ++ Application.compile_env(:pepper_http, :decompressor_types, []) + + Enum.each(decoder_content_types, fn {encoding, value} -> + def content_encoding_to_encoding(unquote(encoding)) do + unquote(value) + end + end) + + def content_encoding_to_encoding({_, _} = res) do + res + end + + def content_encoding_to_encoding(encoding) when is_binary(encoding) do + {:unk, encoding} + end + + decoders = + Application.compile_env(:pepper_http, :base_decompressors, [ + identity: Pepper.HTTP.BodyDecompressor.Identity, + deflate: Pepper.HTTP.BodyDecompressor.Deflate, + gzip: Pepper.HTTP.BodyDecompressor.GZip, + ]) ++ Application.compile_env(:pepper_http, :decompressors, []) + + Enum.each(decoders, fn {type, module} -> + def decode_body_by_encoding(unquote(type), %Response{} = response, options) do + unquote(module).decompress_response(response, options) + end + end) + + def decode_body_by_encoding({:unaccepted, content_encoding}, response, _options) do + {:error, {:unaccepted_content_encoding, content_encoding, response}} + end + + def decode_body_by_encoding({:unk, content_encoding}, response, _options) do + {:error, {:unknown_content_encoding, content_encoding, response}} + end + + def decode_body_by_encoding(encoding, response, _options) do + {:error, {:unhandled_content_encoding, encoding, response}} + end +end diff --git a/lib/pepper/http/body_decompressor/deflate.ex b/lib/pepper/http/body_decompressor/deflate.ex new file mode 100644 index 0000000..c5cbba0 --- /dev/null +++ b/lib/pepper/http/body_decompressor/deflate.ex @@ -0,0 +1,15 @@ +defmodule Pepper.HTTP.BodyDecompressor.Deflate do + alias Pepper.HTTP.Response + + def decompress_response(%Response{} = response, _options) do + stream = :zlib.open() + try do + :ok = :zlib.inflateInit(stream) + blob = :zlib.inflate(stream, response.body) + :ok = :zlib.inflateEnd(stream) + {:ok, %{response | original_body: response.body, body: IO.iodata_to_binary(blob)}} + after + :zlib.close(stream) + end + end +end diff --git a/lib/pepper/http/body_decompressor/gzip.ex b/lib/pepper/http/body_decompressor/gzip.ex new file mode 100644 index 0000000..948418f --- /dev/null +++ b/lib/pepper/http/body_decompressor/gzip.ex @@ -0,0 +1,8 @@ +defmodule Pepper.HTTP.BodyDecompressor.GZip do + alias Pepper.HTTP.Response + + def decompress_response(%Response{} = response, _options) do + blob = :zlib.gunzip(response.body) + {:ok, %{response | original_body: response.body, body: blob}} + end +end diff --git a/lib/pepper/http/body_decompressor/identity.ex b/lib/pepper/http/body_decompressor/identity.ex new file mode 100644 index 0000000..ad26cbc --- /dev/null +++ b/lib/pepper/http/body_decompressor/identity.ex @@ -0,0 +1,7 @@ +defmodule Pepper.HTTP.BodyDecompressor.Identity do + alias Pepper.HTTP.Response + + def decompress_response(%Response{} = response, _options) do + {:ok, response} + end +end diff --git a/lib/pepper/http/body_encoder.ex b/lib/pepper/http/body_encoder.ex new file mode 100644 index 0000000..5b088b0 --- /dev/null +++ b/lib/pepper/http/body_encoder.ex @@ -0,0 +1,33 @@ +defmodule Pepper.HTTP.BodyEncoder do + def encode_body(body, options \\ []) + + def encode_body(nil, _options) do + {:ok, {[], ""}} + end + + def encode_body(binary, _options) when is_binary(binary) do + {:ok, {[], binary}} + end + + def encode_body(list, _options) when is_list(list) do + {:ok, {[], list}} + end + + encoders = + Application.compile_env(:pepper_http, :base_encoders, [ + csv: Pepper.HTTP.BodyEncoder.CSV, + form: Pepper.HTTP.BodyEncoder.Form, + form_stream: Pepper.HTTP.BodyEncoder.FormStream, + form_urlencoded: Pepper.HTTP.BodyEncoder.FormUrlencoded, + json: Pepper.HTTP.BodyEncoder.JSON, + stream: Pepper.HTTP.BodyEncoder.Stream, + text: Pepper.HTTP.BodyEncoder.Text, + xml: Pepper.HTTP.BodyEncoder.XML, + ]) ++ Application.compile_env(:pepper_http, :encoders, []) + + Enum.each(encoders, fn {name, module} -> + def encode_body({unquote(name), value}, options) do + unquote(module).encode_body(value, options) + end + end) +end diff --git a/lib/pepper/http/body_encoder/csv.ex b/lib/pepper/http/body_encoder/csv.ex new file mode 100644 index 0000000..62d9b7e --- /dev/null +++ b/lib/pepper/http/body_encoder/csv.ex @@ -0,0 +1,21 @@ +defmodule Pepper.HTTP.BodyEncoder.CSV do + def encode_body({csv_headers, rows}, _options) when is_list(rows) do + blob = + rows + |> CSV.encode(headers: csv_headers) + |> Enum.to_list() + + headers = [{"content-type", "application/csv"}] + {:ok, {headers, blob}} + end + + def encode_body(rows, _options) when is_list(rows) do + blob = + rows + |> CSV.encode() + |> Enum.to_list() + + headers = [{"content-type", "application/csv"}] + {:ok, {headers, blob}} + end +end diff --git a/lib/pepper/http/body_encoder/form.ex b/lib/pepper/http/body_encoder/form.ex new file mode 100644 index 0000000..89ae277 --- /dev/null +++ b/lib/pepper/http/body_encoder/form.ex @@ -0,0 +1,41 @@ +defmodule Pepper.HTTP.BodyEncoder.Form do + import Pepper.HTTP.Utils + + def encode_body(items, _options) when is_list(items) do + boundary = generate_boundary() + boundary = "------------#{boundary}" + + request_headers = [ + {"content-type", "multipart/form-data; boundary=#{boundary}"} + ] + + blob = + [ + Enum.map(items, fn {name, headers, blob} -> + [ + "--",boundary,"\r\n", + encode_item(name, headers, blob),"\r\n", + ] + end), + "--",boundary, "--\r\n" + ] + + {:ok, {request_headers, blob}} + end + + defp encode_item(name, headers, blob) when (is_atom(name) or is_binary(name)) and + is_list(headers) and + is_binary(blob) do + headers = [ + {"content-disposition", "form-data; name=\"#{name}\""}, + {"content-length", to_string(byte_size(blob))} + | headers + ] + + [ + encode_headers(headers), + "\r\n", + blob, + ] + end +end diff --git a/lib/pepper/http/body_encoder/form_stream.ex b/lib/pepper/http/body_encoder/form_stream.ex new file mode 100644 index 0000000..92a21fe --- /dev/null +++ b/lib/pepper/http/body_encoder/form_stream.ex @@ -0,0 +1,113 @@ +defmodule Pepper.HTTP.BodyEncoder.FormStream do + alias Pepper.HTTP.Proplist + + import Pepper.HTTP.Utils + + def encode_body(items, _options) when is_list(items) do + boundary = generate_boundary() + boundary = "------------#{boundary}" + + request_headers = [ + {"content-type", "multipart/form-data; boundary=#{boundary}"} + ] + + stream = + Stream.resource( + fn -> + {:next_item, boundary, items} + end, + &form_data_stream/1, + fn _ -> + :ok + end + ) + + {:ok, {request_headers, {:stream, stream}}} + end + + defp form_data_stream(:end) do + {:halt, :end} + end + + defp form_data_stream({:next_item, boundary, []}) do + {["--", boundary, "--\r\n"], :end} + end + + defp form_data_stream({:next_item, boundary, [item | items]}) do + form_data_stream({:send_item_start, boundary, item, items}) + end + + defp form_data_stream( + {:send_item_start, boundary, {name, headers, body}, items} + ) when is_binary(body) or is_list(body) do + headers = Proplist.merge([ + {"content-disposition", "form-data; name=\"#{name}\""}, + {"content-length", to_string(IO.iodata_length(body))}, + ], headers) + + iolist = [ + "--",boundary,"\r\n", + encode_headers(headers), + "\r\n" + ] + + {iolist, {:send_item_body, boundary, {name, headers, body}, items}} + end + + defp form_data_stream( + {:send_item_start, boundary, {name, headers, {:chunked, _stream} = res}, items} + ) do + headers = Proplist.merge([ + {"content-disposition", "form-data; name=\"#{name}\""}, + {"transfer-encoding", "chunked"}, + ], headers) + + iolist = [ + "--",boundary,"\r\n", + encode_headers(headers), + "\r\n" + ] + + {iolist, {:send_item_body, boundary, {name, headers, res}, items}} + end + + defp form_data_stream( + {:send_item_start, boundary, {name, headers, stream}, items} + ) do + headers = Proplist.merge([ + {"content-disposition", "form-data; name=\"#{name}\""}, + ], headers) + + iolist = [ + "--",boundary,"\r\n", + encode_headers(headers), + "\r\n" + ] + + {iolist, {:send_item_body, boundary, {name, headers, stream}, items}} + end + + defp form_data_stream( + {:send_item_body, boundary, {_name, _headers, body}, items} + ) when is_binary(body) do + {[body, "\r\n"], {:next_item, boundary, items}} + end + + defp form_data_stream( + {:send_item_body, boundary, {_name, _headers, {:chunked, stream}} = item, items} + ) do + {stream, {:end_current_item, boundary, item, items}} + end + + defp form_data_stream( + {:send_item_body, boundary, {_name, _headers, stream} = item, items} + ) do + {stream, {:end_current_item, boundary, item, items}} + end + + defp form_data_stream( + {:end_current_item, boundary, {_name, _headers, _stream}, items} + ) do + {["\r\n"], {:next_item, boundary, items}} + end +end diff --git a/lib/pepper/http/body_encoder/form_urlencoded.ex b/lib/pepper/http/body_encoder/form_urlencoded.ex new file mode 100644 index 0000000..133f515 --- /dev/null +++ b/lib/pepper/http/body_encoder/form_urlencoded.ex @@ -0,0 +1,9 @@ +defmodule Pepper.HTTP.BodyEncoder.FormUrlencoded do + import Pepper.HTTP.Utils + + def encode_body(term, _options) when is_list(term) or is_map(term) do + blob = encode_query_params(term, :default) + headers = [{"content-type", "application/x-www-form-urlencoded"}] + {:ok, {headers, blob}} + end +end diff --git a/lib/pepper/http/body_encoder/json.ex b/lib/pepper/http/body_encoder/json.ex new file mode 100644 index 0000000..0e75c82 --- /dev/null +++ b/lib/pepper/http/body_encoder/json.ex @@ -0,0 +1,12 @@ +defmodule Pepper.HTTP.BodyEncoder.JSON do + def encode_body(term, _options) do + case Jason.encode(term) do + {:ok, blob} -> + headers = [{"content-type", "application/json"}] + {:ok, {headers, blob}} + + {:error, _} = err -> + err + end + end +end diff --git a/lib/pepper/http/body_encoder/stream.ex b/lib/pepper/http/body_encoder/stream.ex new file mode 100644 index 0000000..45a5e36 --- /dev/null +++ b/lib/pepper/http/body_encoder/stream.ex @@ -0,0 +1,5 @@ +defmodule Pepper.HTTP.BodyEncoder.Stream do + def encode_body(stream, _options) do + {:ok, {[], {:stream, stream}}} + end +end diff --git a/lib/pepper/http/body_encoder/text.ex b/lib/pepper/http/body_encoder/text.ex new file mode 100644 index 0000000..b237c59 --- /dev/null +++ b/lib/pepper/http/body_encoder/text.ex @@ -0,0 +1,7 @@ +defmodule Pepper.HTTP.BodyEncoder.Text do + def encode_body(term, _options) do + blob = IO.iodata_to_binary(term) + headers = [{"content-type", "text/plain"}] + {:ok, {headers, blob}} + end +end diff --git a/lib/pepper/http/body_encoder/xml.ex b/lib/pepper/http/body_encoder/xml.ex new file mode 100644 index 0000000..274553e --- /dev/null +++ b/lib/pepper/http/body_encoder/xml.ex @@ -0,0 +1,7 @@ +defmodule Pepper.HTTP.BodyEncoder.XML do + def encode_body(term, _options) do + blob = Saxy.encode!(term) + headers = [{"content-type", "application/xml"}] + {:ok, {headers, blob}} + end +end diff --git a/lib/pepper/http/client.ex b/lib/pepper/http/client.ex index 614efed..b84797d 100644 --- a/lib/pepper/http/client.ex +++ b/lib/pepper/http/client.ex @@ -14,7 +14,7 @@ defmodule Pepper.HTTP.Client do import Pepper.HTTP.Utils - @type method :: String.t() | :head | :get | :post | :put | :patch | :delete | :options + @type method :: Pepper.HTTP.Utils.http_method() @typedoc """ By default ALL http requests are one-off requests, meaning the connection is opened and closed @@ -34,7 +34,6 @@ defmodule Pepper.HTTP.Client do @type request_option :: {:connect_timeout, timeout()} | {:recv_timeout, timeout()} | {:recv_size, non_neg_integer()} - | {:mode, :active | :passive} | {:connection_manager, connection_manager()} | {:connection_manager_id, term()} | {:response_body_handler, module()} @@ -116,7 +115,6 @@ defmodule Pepper.HTTP.Client do |> Keyword.put_new(:connect_timeout, 30_000) # 30 seconds |> Keyword.put_new(:recv_timeout, 30_000) # 30 seconds |> Keyword.put_new(:recv_size, 8 * 1024 * 1024) # 8 megabytes - |> Keyword.put_new(:mode, :passive) # passive will pull bytes off, safer for inline process |> Keyword.put_new(:response_body_handler, Pepper.HTTP.ResponseBodyHandler.Default) |> Keyword.put_new(:response_body_handler_options, []) |> Keyword.put_new(:connection_manager, :one_off) @@ -197,7 +195,6 @@ defmodule Pepper.HTTP.Client do :response_body_handler, :response_body_handler_options, :attempts, - :mode, :connection_manager, :connection_manager_id, :recv_size, diff --git a/lib/pepper/http/connection_manager/one_off.ex b/lib/pepper/http/connection_manager/one_off.ex index 8b784ff..83ec790 100644 --- a/lib/pepper/http/connection_manager/one_off.ex +++ b/lib/pepper/http/connection_manager/one_off.ex @@ -18,11 +18,10 @@ defmodule Pepper.HTTP.ConnectionManager.OneOff do @spec request(term(), Request.t()) :: {:ok, Response.t()} | {:error, error_reasons()} def request(_id, %Request{} = request) do - mode = request.options[:mode] connect_options = Keyword.merge( [ - mode: mode, + mode: :passive, transport_opts: [ timeout: request.options[:connect_timeout], ], @@ -42,11 +41,11 @@ defmodule Pepper.HTTP.ConnectionManager.OneOff do body_handler: request.response_body_handler, body_handler_options: request.response_body_handler_options, } - result = maybe_stream_request_body(conn, ref, request, is_stream?, []) + result = maybe_stream_request_body(:passive, conn, ref, request, is_stream?, []) case result do {:ok, conn, responses} -> - case read_responses(mode, conn, ref, response, request, responses) do + case read_responses(:passive, conn, ref, response, request, responses) do {:ok, conn, response} -> Mint.HTTP.close(conn) {:ok, response} @@ -54,10 +53,6 @@ defmodule Pepper.HTTP.ConnectionManager.OneOff do {:error, conn, reason} -> Mint.HTTP.close(conn) handle_receive_error(conn, reason, request) - - {:error, conn, reason, _} -> - Mint.HTTP.close(conn) - handle_receive_error(conn, reason, request) end {:error, conn, reason} -> diff --git a/lib/pepper/http/connection_manager.ex b/lib/pepper/http/connection_manager/pooled.ex similarity index 69% rename from lib/pepper/http/connection_manager.ex rename to lib/pepper/http/connection_manager/pooled.ex index b647ed2..322fad5 100644 --- a/lib/pepper/http/connection_manager.ex +++ b/lib/pepper/http/connection_manager/pooled.ex @@ -5,9 +5,18 @@ defmodule Pepper.HTTP.ConnectionManager.Pooled do available_connections: nil, default_lifespan: nil, pool_size: nil, + total_size: 0, busy_size: 0, available_size: 0, ] + + @type t :: %__MODULE__{ + busy_connections: :ets.table(), + available_connections: :ets.table(), + total_size: non_neg_integer(), + busy_size: non_neg_integer(), + available_size: non_neg_integer(), + } end @moduledoc """ @@ -41,6 +50,8 @@ defmodule Pepper.HTTP.ConnectionManager.Pooled do alias Pepper.HTTP.SendError alias Pepper.HTTP.ConnectError + import Pepper.HTTP.Utils, only: [safe_reduce_ets_table: 3] + @type conn_key :: {scheme::atom(), host::String.t(), port::integer(), Keyword.t()} @type connection_id :: GenServer.server() @@ -87,6 +98,10 @@ defmodule Pepper.HTTP.ConnectionManager.Pooled do end end + def close_all(server, reason \\ :normal, timeout \\ :infinity) do + GenServer.call(server, {:close_all, reason}, timeout) + end + @spec get_stats(connection_id(), timeout()) :: map() def get_stats(server, timeout \\ 15_000) do GenServer.call(server, :get_stats, timeout) @@ -98,26 +113,59 @@ defmodule Pepper.HTTP.ConnectionManager.Pooled do options |> Keyword.put_new(:pool_size, 100) |> Keyword.put_new(:default_lifespan, 30_000) + |> validate_start_options([]) + end + + defp validate_start_options([], acc) do + {:ok, Enum.reverse(acc)} + end + + defp validate_start_options( + [{:pool_size, pool_size} = pair | rest], + acc + ) when is_integer(pool_size) and pool_size > 0 do + validate_start_options(rest, [pair | acc]) + end + + defp validate_start_options( + [{:default_lifespan, lifespan} = pair | rest], + acc + ) when is_integer(lifespan) and lifespan > 0 do + validate_start_options(rest, [pair | acc]) + end + + defp validate_start_options([value | _rest], _acc) do + {:error, {:unexpected_start_option, value}} end @spec start(start_options(), GenServer.options()) :: GenServer.on_start() def start(opts, process_options \\ []) when is_list(opts) and is_list(process_options) do - opts = patch_start_options(opts) - GenServer.start(__MODULE__, opts, process_options) + case patch_start_options(opts) do + {:ok, opts} -> + GenServer.start(__MODULE__, opts, process_options) + + {:error, _} = err -> + err + end end @spec start_link(start_options(), GenServer.options()) :: GenServer.on_start() def start_link(opts, process_options \\ []) when is_list(opts) and is_list(process_options) do - opts = patch_start_options(opts) - GenServer.start_link(__MODULE__, opts, process_options) + case patch_start_options(opts) do + {:ok, opts} -> + GenServer.start_link(__MODULE__, opts, process_options) + + {:error, _} = err -> + err + end end @impl true def init(opts) do Process.flag(:trap_exit, true) - available_connections = :ets.new(:available_connections, [:duplicate_bag, :private]) - busy_connections = :ets.new(:busy_connections, [:duplicate_bag, :private]) + available_connections = :ets.new(:available_connections, [:bag, :private]) + busy_connections = :ets.new(:busy_connections, [:bag, :private]) state = %State{ @@ -130,9 +178,29 @@ defmodule Pepper.HTTP.ConnectionManager.Pooled do {:ok, state} end + @impl true + def terminate(_reason, %State{} = state) do + Process.flag(:trap_exit, false) + + safe_reduce_ets_table(state.available_connections, nil, fn {_key, pid}, acc -> + Process.exit(pid, :normal) + acc + end) + + safe_reduce_ets_table(state.busy_connections, nil, fn {_key, {pid, _from}}, acc -> + Process.exit(pid, :normal) + acc + end) + + :ets.delete(state.busy_connections) + :ets.delete(state.available_connections) + :ok + end + @impl true def handle_call(:get_stats, _from, %State{} = state) do stats = %{ + total_size: state.total_size, pool_size: state.pool_size, busy_size: state.busy_size, available_size: state.available_size, @@ -143,6 +211,8 @@ defmodule Pepper.HTTP.ConnectionManager.Pooled do @impl true def handle_call({:request, %Pepper.HTTP.Request{} = request}, from, %State{} = state) do + start_time = System.monotonic_time(:microsecond) + request = %{request | time: start_time} connect_options = Keyword.get(request.options, :connect_options, []) key = {request.scheme, request.uri.host, request.uri.port, connect_options} @@ -171,22 +241,9 @@ defmodule Pepper.HTTP.ConnectionManager.Pooled do end @impl true - def handle_cast({:connection_expired, pid}, %State{} = state) do - case get_available_connection_by_pid(pid, state) do - {[{_key, ^pid}], _continuation} -> - :ok = PooledConnection.schedule_stop(pid) - {:noreply, state} - - :'$end_of_table' -> - case get_busy_connection_by_pid(pid, state) do - {[{_key, {^pid, _from}}], _continuation} -> - :ok = PooledConnection.schedule_stop(pid) - {:noreply, state} - - :'$end_of_table' -> - {:noreply, state} - end - end + def handle_call({:close_all, reason}, _from, %State{} = state) do + %State{} = state = do_close_all(reason, state) + {:reply, :ok, state} end @impl true @@ -194,9 +251,9 @@ defmodule Pepper.HTTP.ConnectionManager.Pooled do case get_busy_connection_by_pid(pid, state) do {[{key, {^pid, _from}} = pair], _continuation} -> state = remove_busy_connection(pair, state) + state = add_available_connection({key, pid}, state) case reason do :ok -> - state = add_available_connection({key, pid}, state) {:noreply, state} {:error, _reason} -> @@ -207,7 +264,7 @@ defmodule Pepper.HTTP.ConnectionManager.Pooled do end :'$end_of_table' -> - Logger.warning "unexpected checkin", [pid: inspect(pid), reason: inspect(reason)] + Logger.warning "unexpected checkin", pid: inspect(pid), reason: inspect(reason) {:noreply, state} end end @@ -215,18 +272,21 @@ defmodule Pepper.HTTP.ConnectionManager.Pooled do @impl true def handle_info({:EXIT, pid, reason}, %State{} = state) do case get_busy_connection_by_pid(pid, state) do - {[{_key, {^pid, _from}} = pair], _continuation} -> + {[{_key, {^pid, from}} = pair], _continuation} -> state = remove_busy_connection(pair, state) + state = %{state | total_size: state.total_size - 1} + GenServer.reply(from, {:error, reason}) {:noreply, state} :'$end_of_table' -> case get_available_connection_by_pid(pid, state) do {[{_key, ^pid} = pair], _continuation} -> + state = %{state | total_size: state.total_size - 1} state = remove_available_connection(pair, state) {:noreply, state} :'$end_of_table' -> - Logger.error "an unknown process has terminated", [reason: inspect(reason)] + Logger.error "an unknown process has terminated", reason: inspect(reason) #{:stop, {:unexpected_exit, pid}, state} {:noreply, state} end @@ -279,9 +339,9 @@ defmodule Pepper.HTTP.ConnectionManager.Pooled do defp checkout_new_connection(key, %State{} = state) do if state.pool_size > calc_used_connections(state) do # there are new connections still available - case PooledConnection.start_link(self(), [lifespan: state.default_lifespan]) do + case PooledConnection.start_link(self(), make_ref(), [lifespan: state.default_lifespan]) do {:ok, pid} -> - {:ok, {key, pid}, state} + {:ok, {key, pid}, %{state | total_size: state.total_size + 1}} {:error, reason} -> {:error, reason, state} @@ -376,4 +436,50 @@ defmodule Pepper.HTTP.ConnectionManager.Pooled do true = :ets.insert(state.available_connections, pair) %{state | available_size: state.available_size + 1} end + + defp do_close_all(reason, %State{} = state) do + state = close_all_busy_connections(reason, state) + state = close_all_available_connections(reason, state) + state + end + + defp close_all_busy_connections(reason, state) do + reduce_ets_table(state.busy_connections, state, fn {_key, {pid, _from}}, state -> + :ok = PooledConnection.schedule_stop(pid, reason) + state + end) + end + + defp close_all_available_connections(reason, state) do + reduce_ets_table(state.available_connections, state, fn {_key, pid}, state -> + :ok = PooledConnection.schedule_stop(pid, reason) + state + end) + end + + defp reduce_ets_table(table, acc, callback) do + true = :ets.safe_fixtable(table, true) + try do + do_reduce_ets_table(:ets.first(table), table, acc, callback) + after + true = :ets.safe_fixtable(table, false) + end + end + + defp do_reduce_ets_table(:"$end_of_table", _table, acc, _callback) do + acc + end + + defp do_reduce_ets_table(key, table, acc, callback) do + acc = + case :ets.lookup(table, key) do + [] -> + acc + + [{^key, _value} = pair] -> + callback.(pair, acc) + end + + do_reduce_ets_table(:ets.next(table, key), table, acc, callback) + end end diff --git a/lib/pepper/http/connection_manager/pooled_connection.ex b/lib/pepper/http/connection_manager/pooled_connection.ex index 394967f..ec01dd9 100644 --- a/lib/pepper/http/connection_manager/pooled_connection.ex +++ b/lib/pepper/http/connection_manager/pooled_connection.ex @@ -1,6 +1,9 @@ defmodule Pepper.HTTP.ConnectionManager.PooledConnection do defmodule State do defstruct [ + ref: nil, + stage: :idle, + lifespan: nil, pool_pid: nil, conn: nil, scheme: nil, @@ -8,6 +11,10 @@ defmodule Pepper.HTTP.ConnectionManager.PooledConnection do port: nil, status: :ok, just_reconnected: false, + # + response: nil, + request: nil, + active_request: nil, ] end @@ -22,8 +29,8 @@ defmodule Pepper.HTTP.ConnectionManager.PooledConnection do import Pepper.HTTP.ConnectionManager.Utils - def start_link(pool_pid, opts, process_options \\ []) do - GenServer.start_link(__MODULE__, {pool_pid, opts}, process_options) + def start_link(pool_pid, ref, opts, process_options \\ []) do + GenServer.start_link(__MODULE__, {pool_pid, ref, opts}, process_options) end @doc """ @@ -35,14 +42,92 @@ defmodule Pepper.HTTP.ConnectionManager.PooledConnection do GenServer.cast(pid, {:request, request, from}) end - @spec schedule_stop(GenServer.server()) :: :ok - def schedule_stop(pid) do - GenServer.cast(pid, :stop) + @spec schedule_stop(GenServer.server(), reason::any()) :: :ok + def schedule_stop(pid, reason \\ :normal) do + GenServer.cast(pid, {:stop, reason}) end @impl true - def init({pool_pid, opts}) do - {:ok, %State{pool_pid: pool_pid}, Keyword.fetch!(opts, :lifespan)} + def init({pool_pid, ref, opts}) do + lifespan = Keyword.fetch!(opts, :lifespan) + {:ok, %State{ref: ref, pool_pid: pool_pid, lifespan: lifespan}, lifespan} + end + + @impl true + def handle_continue({:handle_responses, []}, %State{} = state) do + timeout = determine_timeout(state) + {:noreply, state, timeout} + end + + @impl true + def handle_continue( + {:handle_responses, [http_response | http_responses]}, + %State{ + conn: conn, + request: request, + response: response, + active_request: %{ + from: from, + ref: ref, + } + } = state + ) do + case handle_response(conn, ref, response, request, http_response) do + {:next, %Response{} = response} -> + state = %State{ + state + | response: response + } + {:noreply, state, {:continue, {:handle_responses, http_responses}}} + + {:done, %Response{} = response} -> + response = + %{ + response + | time: System.monotonic_time(:microsecond), + } + + :ok = GenServer.reply(from, {:ok, response}) + state = checkin(state, :ok) + state = %State{ + state + | stage: :idle, + response: nil, + request: nil, + active_request: nil, + } + {:noreply, state, state.lifespan} + + {:error, conn, reason} -> + handle_receive_error(conn, reason, request, from, state) + end + end + + @impl true + def handle_continue( + :send_body, + %State{ + conn: conn, + request: request, + active_request: %{ + from: from, + ref: ref, + is_stream?: is_stream?, + }, + } = state + ) do + case maybe_stream_request_body(:active, conn, ref, request, is_stream?, []) do + {:ok, conn, responses} -> + state = %{ + state + | stage: :recv, + conn: conn, + } + {:noreply, state, {:continue, {:handle_responses, responses}}} + + {:error, conn, reason} -> + handle_send_error(conn, reason, request, from, state) + end end @impl true @@ -54,80 +139,91 @@ defmodule Pepper.HTTP.ConnectionManager.PooledConnection do end @impl true - def handle_cast(:stop, %State{} = state) do - {:stop, :normal, state} + def handle_cast({:stop, reason}, %State{} = state) do + {:stop, reason, state} end @impl true def handle_cast({:request, %Request{} = request, from}, %State{} = state) do - do_request(request, from, state) + do_start_request(request, from, state) end @impl true - def handle_info(:timeout, %State{} = state) do - GenServer.cast(state.pool_pid, {:connection_expired, self()}) - {:noreply, %{state | status: :expired}} + def handle_info(:timeout, %State{stage: :idle} = state) do + {:stop, :normal, %{state | status: :expired}} end @impl true - def handle_info(message, %State{} = state) do - if state.conn do - case Mint.HTTP.stream(state.conn, message) do - {:error, conn, %Mint.TransportError{reason: :closed}, _rest} -> - {:stop, :normal, %{state | conn: conn}} + def handle_info(:timeout, %State{stage: :recv, active_request: %{from: from}} = state) do + reason = %Pepper.HTTP.ReceiveError{reason: %Mint.TransportError{reason: :timeout}} + err = {:error, reason} + :ok = GenServer.reply(from, err) + state = checkin(state, err) + {:stop, :normal, %{state | status: :recv_timeout}} + end + + @impl true + def handle_info(:timeout, %State{stage: _} = state) do + {:stop, :timeout, %{state | status: :expired}} + end - {:error, conn, reason, _rest} -> - {:stop, reason, %{state | conn: conn}} + @impl true + def handle_info(message, %State{conn: nil} = state) do + {:stop, {:unexpected_message_on_closed_connection, message}, state} + end - _ -> - {:noreply, state} - end - else - {:noreply, state} + @impl true + def handle_info(message, %State{conn: conn} = state) do + case Mint.HTTP.stream(conn, message) do + {:error, conn, %Mint.TransportError{reason: :closed}, _rest} -> + state = %State{state | conn: conn} + {:stop, :normal, state} + + {:error, conn, reason, _rest} -> + state = %State{state | conn: conn} + {:stop, reason, state} + + {:ok, conn, []} -> + state = %State{state | conn: conn} + timeout = determine_timeout(state) + {:noreply, state, timeout} + + {:ok, conn, responses} -> + state = %State{state | conn: conn} + {:noreply, state, {:continue, {:handle_responses, responses}}} end end - defp do_request(%Request{} = request, from, %State{} = state) do + defp do_start_request(%Request{} = request, from, %State{} = state) do case prepare_connection(request, state) do {:ok, state} -> - # go passive so messages can be captured manually - state = try_set_connection_mode(:passive, state) - {request, is_stream?, body} = determine_if_body_should_stream(state.conn, request) case Mint.HTTP.request(state.conn, request.method, request.path, request.headers, body) do {:ok, conn, ref} -> - # if the request was done, then reset the just_connected state - # it was only set initially to ensure that the request doesn't enter a reconnecting - # loop - state = %{state | just_reconnected: false} - response = %Response{ - protocol: Mint.HTTP.protocol(conn), - body_handler: request.response_body_handler, - body_handler_options: request.response_body_handler_options, + state = %{ + state + | stage: :send, + # if the request was done, then reset the just_connected state + # it was only set initially to ensure that the request doesn't enter a reconnecting + # loop + just_reconnected: false, + conn: conn, + request: request, + active_request: %{ + ref: ref, + from: from, + is_stream?: is_stream? + }, + response: %Response{ + ref: state.ref, + protocol: Mint.HTTP.protocol(conn), + body_handler: request.response_body_handler, + body_handler_options: request.response_body_handler_options + } } - result = maybe_stream_request_body(conn, ref, request, is_stream?, []) - - case result do - {:ok, conn, responses} -> - case read_responses(request.options[:mode], conn, ref, response, request, responses) do - {:ok, conn, result} -> - state = %{state | conn: conn} - :ok = GenServer.reply(from, {:ok, result}) - state = checkin(state, :ok) - {:noreply, state} - - {:error, conn, reason} -> - handle_receive_error(conn, reason, request, from, state) - - {:error, conn, reason, _} -> - handle_receive_error(conn, reason, request, from, state) - end - - {:error, conn, reason} -> - handle_send_error(conn, reason, request, from, state) - end + {:noreply, state, {:continue, :send_body}} {:error, conn, reason} -> should_reconnect? = @@ -153,7 +249,7 @@ defmodule Pepper.HTTP.ConnectionManager.PooledConnection do # if attempting a reconnect, close the existing connection (if its open at all) # and try the request again state = close_and_clear_connection(state) - do_request(request, from, state) + do_start_request(request, from, state) else # otherwise the request should not be retried and this worker will emit an error handle_request_error(conn, reason, request, from, state) @@ -163,16 +259,20 @@ defmodule Pepper.HTTP.ConnectionManager.PooledConnection do {:error, reason, state} -> handle_connect_error(reason, request, from, state) end - rescue ex -> - msg = {:exception, ex, __STACKTRACE__} - :ok = GenServer.reply(from, msg) - state = checkin(state, msg) - {:noreply, state} + end + + defp determine_timeout(%State{request: request} = state) do + case state.stage do + stage when stage in [:send, :idle] -> + state.lifespan + + :recv -> + request.options[:recv_timeout] + end end defp checkin(%State{} = state, reason) do - # return to active mode to capture errors and closed messages - state = try_set_connection_mode(:active, state) + # return the connection to its parent pool :ok = GenServer.cast(state.pool_pid, {:checkin, self(), reason}) state end @@ -205,7 +305,7 @@ defmodule Pepper.HTTP.ConnectionManager.PooledConnection do req_options = request.options connect_options = Keyword.merge( [ - mode: req_options[:mode], + mode: :active, transport_opts: [ timeout: req_options[:connect_timeout], ], @@ -236,20 +336,6 @@ defmodule Pepper.HTTP.ConnectionManager.PooledConnection do %{state | conn: nil} end - defp try_set_connection_mode(_mode, %State{conn: nil} = state) do - state - end - - defp try_set_connection_mode(mode, %State{} = state) do - case Mint.HTTP.set_mode(state.conn, mode) do - {:ok, conn} -> - %{state | conn: conn} - - {:error, _} -> - state - end - end - defp handle_request_error(conn, reason, request, from, %State{} = state) do state = %{state | conn: conn} ex = %RequestError{ @@ -259,7 +345,7 @@ defmodule Pepper.HTTP.ConnectionManager.PooledConnection do } :ok = GenServer.reply(from, {:error, ex}) state = checkin(state, {:error, ex}) - {:noreply, state} + {:noreply, state, state.lifespan} end defp handle_send_error(conn, reason, request, from, %State{} = state) do @@ -271,7 +357,7 @@ defmodule Pepper.HTTP.ConnectionManager.PooledConnection do } :ok = GenServer.reply(from, {:error, ex}) state = checkin(state, {:error, ex}) - {:noreply, state} + {:noreply, state, state.lifespan} end defp handle_receive_error(conn, reason, request, from, %State{} = state) do @@ -283,7 +369,7 @@ defmodule Pepper.HTTP.ConnectionManager.PooledConnection do } :ok = GenServer.reply(from, {:error, ex}) state = checkin(state, {:error, ex}) - {:noreply, state} + {:noreply, state, state.lifespan} end defp handle_connect_error(reason, request, from, %State{} = state) do @@ -294,6 +380,6 @@ defmodule Pepper.HTTP.ConnectionManager.PooledConnection do } :ok = GenServer.reply(from, {:error, ex}) state = checkin(state, {:error, ex}) - {:noreply, state} + {:noreply, state, state.lifespan} end end diff --git a/lib/pepper/http/connection_manager/utils.ex b/lib/pepper/http/connection_manager/utils.ex index 4527f39..718bf4b 100644 --- a/lib/pepper/http/connection_manager/utils.ex +++ b/lib/pepper/http/connection_manager/utils.ex @@ -4,16 +4,29 @@ defmodule Pepper.HTTP.ConnectionManager.Utils do import Pepper.HTTP.Utils - def read_responses(mode, conn, ref, response, %Request{} = request, []) do + @type mode :: :passive | :active + + @type conn :: Mint.Core.Conn.conn() + + @spec timespan(function(), System.time_unit()) :: + {{start_at::integer(), end_at::integer()}, result::any()} + def timespan(callback, unit \\ :microsecond) when is_function(callback, 0) do + start_at = :erlang.monotonic_time(unit) + result = callback.() + end_at = :erlang.monotonic_time(unit) + {{start_at, end_at}, result} + end + + @spec read_responses(mode(), conn(), reference(), Response.t(), Request.t(), [any()]) :: + {:ok, conn(), Response.t()} + | {:error, conn(), reason::any()} + def read_responses(mode, conn, ref, %Response{} = response, %Request{} = request, []) do case read_response(mode, conn, ref, request) do {:ok, conn, http_responses} -> read_responses(mode, conn, ref, response, request, http_responses) - {:error, conn, reason} -> - {:error, conn, reason, []} - - {:error, _conn, _reason, _responses} = err -> - err + {:error, conn, reason, []} -> + {:error, conn, reason} end end @@ -25,7 +38,7 @@ defmodule Pepper.HTTP.ConnectionManager.Utils do %Request{} = request, [http_response | http_responses] ) do - case handle_response(mode, conn, ref, response, request, http_response) do + case handle_response(conn, ref, response, request, http_response) do {:next, response} -> read_responses(mode, conn, ref, response, request, http_responses) @@ -33,44 +46,41 @@ defmodule Pepper.HTTP.ConnectionManager.Utils do {:ok, conn, response} {:error, conn, reason} -> - {:error, conn, reason, http_responses} + {:error, conn, reason} end end - @spec read_response(:passive | :active, Mint.Conn.t(), reference(), Pepper.HTTP.Request.t()) :: - {:ok, Mint.Conn.t(), [any()]} - | {:error, Mint.Conn.t(), reasonn::any(), responses::list()} + @spec read_response(:passive | :active, conn(), reference(), Request.t()) :: + {:ok, conn(), [any()]} + | {:error, conn(), reasonn::any(), responses::list()} def read_response(:passive, conn, _ref, %Request{} = request) do - case Mint.HTTP.recv(conn, 0, request.options[:recv_timeout]) do + recv_timeout = Keyword.fetch!(request.options, :recv_timeout) + case Mint.HTTP.recv(conn, 0, recv_timeout) do {:ok, _conn, _responses} = res -> res - {:error, conn, reason} -> - {:error, conn, reason, []} - {:error, _conn, _reason, _responses} = err -> err end end def read_response(:active, conn, _ref, %Request{} = request) do + recv_timeout = Keyword.fetch!(request.options, :recv_timeout) receive do message -> case Mint.HTTP.stream(conn, message) do {:ok, _conn, _responses} = res -> res - {:error, _conn, _reason} = err -> + {:error, _conn, _reason, _responses} = err -> err end - after - request.options[:recv_timeout] -> - {:error, conn, :timeout} + after recv_timeout -> + {:error, conn, :timeout, []} end end def handle_response( - _mode, conn, ref, %Response{} = response, @@ -97,6 +107,9 @@ defmodule Pepper.HTTP.ConnectionManager.Utils do {:ok, response} -> {:done, %{response | request: request}} end + + {:error, ^ref, reason} -> + {:error, conn, reason} end end @@ -147,15 +160,15 @@ defmodule Pepper.HTTP.ConnectionManager.Utils do {:error, conn, {:handle_data_error, reason}} end - def maybe_stream_request_body(conn, ref, %Request{} = request, is_stream?, responses) do + def maybe_stream_request_body(mode, conn, ref, %Request{} = request, is_stream?, responses) do if is_stream? do - stream_request_body(conn, ref, request, responses) + stream_request_body(mode, conn, ref, request, responses) else {:ok, conn, responses} end end - defp stream_request_body(conn, ref, %Request{} = request, responses) do + defp stream_request_body(mode, conn, ref, %Request{} = request, responses) do {:stream, stream} = request.body protocol = Mint.HTTP.protocol(conn) @@ -163,7 +176,7 @@ defmodule Pepper.HTTP.ConnectionManager.Utils do stream |> Enum.reduce_while( {:ok, conn, responses}, - &do_stream_request_body(protocol, &1, &2, ref, request) + &do_stream_request_body(mode, protocol, &1, &2, ref, request) ) case result do @@ -184,7 +197,7 @@ defmodule Pepper.HTTP.ConnectionManager.Utils do end end - defp do_stream_request_body(:http1, blob, {:ok, conn, responses}, ref, _request) do + defp do_stream_request_body(_mode, :http1, blob, {:ok, conn, responses}, ref, _request) do case Mint.HTTP.stream_request_body(conn, ref, blob) do {:ok, conn} -> {:cont, {:ok, conn, responses}} @@ -194,24 +207,24 @@ defmodule Pepper.HTTP.ConnectionManager.Utils do end end - defp do_stream_request_body(:http2, <<>>, {:ok, _conn, _responses} = res, _ref, _request) do + defp do_stream_request_body(_mode, :http2, <<>>, {:ok, _conn, _responses} = res, _ref, _request) do {:cont, res} end - defp do_stream_request_body(:http2, blob, {:ok, conn, responses}, ref, request) do + defp do_stream_request_body(mode, :http2, blob, {:ok, conn, responses}, ref, request) do conn_window_size = Mint.HTTP2.get_window_size(conn, :connection) window_size = Mint.HTTP2.get_window_size(conn, {:request, ref}) if conn_window_size <= 0 or window_size <= 0 do - case read_response(request.options[:mode], conn, ref, request) do + case read_response(mode, conn, ref, request) do {:ok, conn, []} -> - do_stream_request_body(:http2, blob, {:ok, conn, responses}, ref, request) + do_stream_request_body(mode, :http2, blob, {:ok, conn, responses}, ref, request) {:ok, conn, next_responses} -> {:halt, {:unexpected_responses, conn, responses ++ next_responses}} - {:error, _conn, _reason} = err -> - {:halt, err} + {:error, conn, reason, _responses} -> + {:halt, {:error, conn, reason}} end else blob = IO.iodata_to_binary(blob) @@ -228,7 +241,7 @@ defmodule Pepper.HTTP.ConnectionManager.Utils do case Mint.HTTP.stream_request_body(conn, ref, next_blob) do {:ok, conn} -> - do_stream_request_body(:http2, rest, {:ok, conn, responses}, ref, request) + do_stream_request_body(mode, :http2, rest, {:ok, conn, responses}, ref, request) {:error, _conn, _reason} = err -> {:halt, err} @@ -238,6 +251,9 @@ defmodule Pepper.HTTP.ConnectionManager.Utils do def determine_if_body_should_stream(conn, request) do case request.body do + nil -> + {request, false, ""} + {:stream, _stream} -> {request, true, :stream} diff --git a/lib/pepper/http/content_client.ex b/lib/pepper/http/content_client.ex index fddbb38..4f373fd 100644 --- a/lib/pepper/http/content_client.ex +++ b/lib/pepper/http/content_client.ex @@ -1,7 +1,10 @@ defmodule Pepper.HTTP.ContentClient do alias Pepper.HTTP.Client alias Pepper.HTTP.Response - alias Pepper.HTTP.Proplist + alias Pepper.HTTP.BodyDecoder + alias Pepper.HTTP.BodyDecompressor + alias Pepper.HTTP.BodyEncoder + alias Pepper.HTTP.ContentClient.Headers import Pepper.HTTP.Utils @@ -48,22 +51,47 @@ defmodule Pepper.HTTP.ContentClient do | {:auth_method, String.t() | :none | :basic | :bearer} | {:auth_identity, String.t()} | {:auth_secret, String.t()} + | {:query_params_encoding, :default | :duplicate} | Client.request_option() @type options :: [request_option()] - @type response_body :: {:json, term()} - | {:jsonapi, term()} - | {:xmldoc, term()} - | {:xml, term()} - | {:text, term()} - | {:csv, term()} + @type response_body :: {:unaccepted, term()} | {:unk, term()} + | {{:malformed, term()}, term()} + # For all other cases + | {atom(), term()} @type response_error :: Pepper.HTTP.BodyError.t() | Client.response_error() @type response :: {:ok, Response.t(), response_body()} | {:error, response_error()} + @no_options [] + + def post(url, query_params, headers, body, options \\ []) do + request(:post, url, query_params, headers, body, options) + end + + def patch(url, query_params, headers, body, options \\ []) do + request(:patch, url, query_params, headers, body, options) + end + + def put(url, query_params, headers, body, options \\ []) do + request(:put, url, query_params, headers, body, options) + end + + def delete(url, query_params \\ [], headers \\ [], options \\ []) do + request(:delete, url, query_params, headers, nil, options) + end + + def get(url, query_params \\ [], headers \\ [], options \\ []) do + request(:get, url, query_params, headers, nil, options) + end + + def options(url, query_params \\ [], headers \\ [], options \\ []) do + request(:options, url, query_params, headers, nil, options) + end + @doc """ Perform an HTTP Request @@ -76,15 +104,18 @@ defmodule Pepper.HTTP.ContentClient do """ @spec request(method(), url(), query_params(), headers(), body(), options()) :: response() def request(method, url, query_params, headers, body, options \\ []) do - case encode_body(body) do + {encoder_options, options} = Keyword.pop(options, :encoder_options, @no_options) + {uri_options, options} = Keyword.split(options, [:query_params_encoding]) + + case BodyEncoder.encode_body(body, encoder_options) do {:ok, {body_headers, blob}} -> - case encode_new_uri(url, query_params) do + case encode_new_uri(url, query_params, uri_options) do {:ok, new_uri} -> all_headers = body_headers ++ Enum.map(headers, fn {key, value} -> {String.downcase(key), value} end) - {all_headers, options} = maybe_add_auth_headers(all_headers, options) + {all_headers, options} = Headers.add_additional_headers(blob, all_headers, options) client_options = Keyword.drop(options, [:normalize_xml]) @@ -114,188 +145,24 @@ defmodule Pepper.HTTP.ContentClient do end end - def post(url, query_params, headers, body, options \\ []) do - request(:post, url, query_params, headers, body, options) - end - - def patch(url, query_params, headers, body, options \\ []) do - request(:patch, url, query_params, headers, body, options) - end - - def put(url, query_params, headers, body, options \\ []) do - request(:put, url, query_params, headers, body, options) - end - - def delete(url, query_params \\ [], headers \\ [], options \\ []) do - request(:delete, url, query_params, headers, nil, options) - end - - def get(url, query_params \\ [], headers \\ [], options \\ []) do - request(:get, url, query_params, headers, nil, options) - end - - defp maybe_add_auth_headers(headers, options) do - {auth_method, options} = Keyword.pop(options, :auth_method, "none") - {username, options} = Keyword.pop(options, :auth_identity) - {password, options} = Keyword.pop(options, :auth_secret) - - headers = - case to_string(auth_method) do - "none" -> - headers - - "basic" -> - auth = Base.encode64("#{username}:#{password}") - - [ - {"authorization", "Basic #{auth}"} - | headers - ] - - "bearer" -> - [ - {"authorization", "Bearer #{password}"} - | headers - ] - - _ -> - headers - end - - {headers, options} - end - defp handle_response({:ok, %Response{} = response}, options) do - {:ok, response, decode_body(response, options)} + with {:ok, response} <- BodyDecompressor.decompress_response(response, options) do + {:ok, response, BodyDecoder.decode_body(response, options)} + else + {:error, _} = err -> + err + end end defp handle_response({:error, reason}, _options) do {:error, reason} end - defp decode_body(%Response{ - request: %{ - headers: req_headers, - }, - headers: res_headers, - body: body - }, options) do - # retrieve the original request accept header, this will be used to "allow" the content-type - # to be parsed - accept = Proplist.get(req_headers, "accept") - # retrieve the response content-type - content_type = Proplist.get(res_headers, "content-type") - - # ensure that we only parse content for the given accept header to avoid parsing bodies we - # didn't want or even expect - accepted_content_type = - case accept do - nil -> - # no accept header was given, expect to parse anything, this is dangerous - # but allows the default behaviour to continue - # you should ALWAYS specify an accept header - content_type - - _ -> - if content_type do - # a content-type was returned, try negotiate with the accept header and content-type - case :accept_header.negotiate(accept, [content_type]) do - :undefined -> - # mismatch accept and content-type, refuse to parse the content and return - # nil for the accepted_content_type - nil - - name when is_binary(name) -> - # return the matched content_type - name - end - else - # there was no content-type, return nil - nil - end - end - - type = - if accepted_content_type do - case Plug.Conn.Utils.content_type(accepted_content_type) do - {:ok, "application", "json", _params} -> - # parse standard json - :json - - {:ok, "application", "vnd.api+json", _params} -> - # parse jsonapi - :jsonapi - - {:ok, "application", "xml", _params} -> - # parse application xml - :xml - - {:ok, "text", "xml", _params} -> - # parse text xml - :xml - - {:ok, "text", "plain", _params} -> - # return plain text as is - :text - - {:ok, "text", "csv", _params} -> - # return csv as is - :csv - - {:ok, "application", "csv", _params} -> - # return csv as is - :csv - - {:ok, _, _, _} -> - # some other content-type, return it as unknown - :unk - - :error -> - # the content-type failed to parse, return it as unknown as well - :unk - end - else - # no content-type or mismatched content-type and accept, return unk(nown) - :unk - end - - case type do - type when type in [:json, :jsonapi] -> - case Jason.decode(body) do - {:ok, doc} -> - {type, doc} - - {:error, _} -> - {:unk, body} - end - - :xml -> - data = SweetXml.parse(body) - # Parse XML - if options[:normalize_xml] do - {:xmldoc, handle_xml_body(data)} - else - {type, data} - end - - type -> - {type, body} - end - end - - defp encode_query_params(nil) do - nil - end - - defp encode_query_params(query_params) when is_list(query_params) or is_map(query_params) do - Plug.Conn.Query.encode(query_params) - end - - @spec encode_new_uri(URI.t() | String.t(), map() | Keyword.t()) :: URI.t() - defp encode_new_uri(url, query_params) do + @spec encode_new_uri(URI.t() | String.t(), map() | Keyword.t(), Keyword.t()) :: URI.t() + defp encode_new_uri(url, query_params, options) do case URI.new(url) do {:ok, %URI{} = uri} -> - case encode_query_params(query_params) do + case encode_query_params(query_params, Keyword.get(options, :query_params_encoding, :default)) do nil -> {:ok, uri} @@ -311,189 +178,4 @@ defmodule Pepper.HTTP.ContentClient do err end end - - defp encode_body(nil) do - {:ok, {[], ""}} - end - - defp encode_body({:csv, {csv_headers, rows}}) when is_list(rows) do - blob = - rows - |> CSV.encode(headers: csv_headers) - |> Enum.to_list() - - headers = [{"content-type", "application/csv"}] - {:ok, {headers, blob}} - end - - defp encode_body({:csv, rows}) when is_list(rows) do - blob = - rows - |> CSV.encode() - |> Enum.to_list() - - headers = [{"content-type", "application/csv"}] - {:ok, {headers, blob}} - end - - defp encode_body({:form_urlencoded, term}) when is_list(term) or is_map(term) do - blob = encode_query_params(term) - headers = [{"content-type", "application/x-www-form-urlencoded"}] - {:ok, {headers, blob}} - end - - defp encode_body({:form, items}) do - boundary = generate_boundary() - boundary = "------------#{boundary}" - - request_headers = [ - {"content-type", "multipart/form-data; boundary=#{boundary}"} - ] - - blob = - [ - Enum.map(items, fn {name, headers, blob} -> - [ - "--",boundary,"\r\n", - encode_item(name, headers, blob),"\r\n", - ] - end), - "--",boundary, "--\r\n" - ] - - {:ok, {request_headers, blob}} - end - - defp encode_body({:form_stream, items}) do - boundary = generate_boundary() - boundary = "------------#{boundary}" - - request_headers = [ - {"content-type", "multipart/form-data; boundary=#{boundary}"} - ] - - stream = - Stream.resource( - fn -> - {:next_item, boundary, items} - end, - &form_data_stream/1, - fn _ -> - :ok - end - ) - - {:ok, {request_headers, {:stream, stream}}} - end - - defp encode_body({:text, term}) do - blob = IO.iodata_to_binary(term) - headers = [{"content-type", "text/plain"}] - {:ok, {headers, blob}} - end - - defp encode_body({:xml, term}) do - blob = XmlBuilder.generate(term, format: :none) - headers = [{"content-type", "application/xml"}] - {:ok, {headers, blob}} - end - - defp encode_body({:json, term}) do - case Jason.encode(term) do - {:ok, blob} -> - headers = [{"content-type", "application/json"}] - {:ok, {headers, blob}} - - {:error, _} = err -> - err - end - end - - defp encode_body({:stream, stream}) do - {:ok, {[], {:stream, stream}}} - end - - defp encode_body(binary) when is_binary(binary) do - {:ok, {[], binary}} - end - - defp encode_item(name, headers, blob) when (is_atom(name) or is_binary(name)) and - is_list(headers) and - is_binary(blob) do - headers = [ - {"content-disposition", "form-data; name=\"#{name}\""}, - {"content-length", to_string(byte_size(blob))} - | headers - ] - - [ - encode_headers(headers), - "\r\n", - blob, - ] - end - - defp form_data_stream(:end) do - {:halt, :end} - end - - defp form_data_stream({:next_item, boundary, []}) do - {["--", boundary, "--\r\n"], :end} - end - - defp form_data_stream({:next_item, boundary, [item | items]}) do - form_data_stream({:send_item_start, boundary, item, items}) - end - - defp form_data_stream( - {:send_item_start, boundary, {name, headers, body}, items} - ) when is_binary(body) or is_list(body) do - headers = Proplist.merge([ - {"content-disposition", "form-data; name=\"#{name}\""}, - {"content-length", to_string(IO.iodata_length(body))}, - ], headers) - - iolist = [ - "--",boundary,"\r\n", - encode_headers(headers), - "\r\n" - ] - - {iolist, {:send_item_body, boundary, {name, headers, body}, items}} - end - - defp form_data_stream( - {:send_item_start, boundary, {name, headers, stream}, items} - ) do - headers = Proplist.merge([ - {"content-disposition", "form-data; name=\"#{name}\""}, - {"transfer-encoding", "chunked"}, - ], headers) - - iolist = [ - "--",boundary,"\r\n", - encode_headers(headers), - "\r\n" - ] - - {iolist, {:send_item_body, boundary, {name, headers, stream}, items}} - end - - defp form_data_stream( - {:send_item_body, boundary, {_name, _headers, body}, items} - ) when is_binary(body) do - {[body, "\r\n"], {:next_item, boundary, items}} - end - - defp form_data_stream( - {:send_item_body, boundary, {_name, _headers, stream} = item, items} - ) do - {stream, {:end_current_item, boundary, item, items}} - end - - defp form_data_stream( - {:end_current_item, boundary, {_name, _headers, _stream}, items} - ) do - {["\r\n"], {:next_item, boundary, items}} - end end diff --git a/lib/pepper/http/content_client/headers.ex b/lib/pepper/http/content_client/headers.ex new file mode 100644 index 0000000..e0afa54 --- /dev/null +++ b/lib/pepper/http/content_client/headers.ex @@ -0,0 +1,15 @@ +defmodule Pepper.HTTP.ContentClient.Headers do + @moduledoc """ + ContentClient additional headers module, will add additional headers based on the options. + """ + header_modules = + Application.compile_env(:pepper_http, :base_additional_header_modules, [ + Pepper.HTTP.ContentClient.Headers.Authorization, + ]) ++ Application.compile_env(:pepper_http, :additional_header_modules, []) + + def add_additional_headers(blob, headers, options) do + Enum.reduce(unquote(header_modules), {headers, options}, fn module, {headers, options} -> + module.call(blob, headers, options) + end) + end +end diff --git a/lib/pepper/http/content_client/headers/authorization.ex b/lib/pepper/http/content_client/headers/authorization.ex new file mode 100644 index 0000000..1b6f935 --- /dev/null +++ b/lib/pepper/http/content_client/headers/authorization.ex @@ -0,0 +1,56 @@ +defmodule Pepper.HTTP.ContentClient.Headers.Authorization do + defmodule UnhandledAuthMethodError do + defexception [:message, :method] + end + + @moduledoc """ + Header module for adding an Authorization Header + """ + + @spec call(any(), headers::Proplist.t(), options::Keyword.t()) :: + {headers::Proplist.t(), options::Keyword.t()} + def call(_blob, headers, options) do + {auth_method, options} = Keyword.pop(options, :auth_method, :none) + {username, options} = Keyword.pop(options, :auth_identity) + {password, options} = Keyword.pop(options, :auth_secret) + + headers = set_authorization_header(auth_method, username, password, headers) + {headers, options} + end + + def set_authorization_header(:none, _unused1, _unsued2, headers) do + headers + end + + def set_authorization_header(:basic, username, password, headers) do + auth = Base.encode64("#{username}:#{password}") + + [ + {"authorization", "Basic #{auth}"} + | headers + ] + end + + def set_authorization_header(:bearer, _unused, token, headers) do + [ + {"authorization", "Bearer #{token}"} + | headers + ] + end + + def set_authorization_header("none", _unused1, _unsued2, headers) do + headers + end + + def set_authorization_header("basic", username, password, headers) do + set_authorization_header(:basic, username, password, headers) + end + + def set_authorization_header("bearer", unused, token, headers) do + set_authorization_header(:bearer, unused, token, headers) + end + + def set_authorization_header(method, _username, _password, _headers) do + raise UnhandledAuthMethodError, method: method, message: "unhandled auth method" + end +end diff --git a/lib/pepper/http/request.ex b/lib/pepper/http/request.ex index f2340f5..73f116a 100644 --- a/lib/pepper/http/request.ex +++ b/lib/pepper/http/request.ex @@ -13,6 +13,7 @@ defmodule Pepper.HTTP.Request do response_body_handler: nil, response_body_handler_options: nil, options: nil, + time: nil, ] @type t :: %__MODULE__{ @@ -29,5 +30,6 @@ defmodule Pepper.HTTP.Request do response_body_handler: module(), response_body_handler_options: any(), options: Keyword.t(), + time: nil, } end diff --git a/lib/pepper/http/response.ex b/lib/pepper/http/response.ex index df4e5e5..0bc6d15 100644 --- a/lib/pepper/http/response.ex +++ b/lib/pepper/http/response.ex @@ -1,5 +1,6 @@ defmodule Pepper.HTTP.Response do defstruct [ + ref: nil, request: nil, headers: [], protocol: :unknown, @@ -7,12 +8,15 @@ defmodule Pepper.HTTP.Response do body_state: :none, body_handler: nil, body_handler_options: nil, + original_body: nil, body: "", data: nil, status_code: nil, + time: nil, ] @type t :: %__MODULE__{ + ref: reference(), request: Pepper.HTTP.Request.t(), headers: [{String.t(), String.t()}], protocol: :unknown | :http1 | :http2, @@ -20,8 +24,10 @@ defmodule Pepper.HTTP.Response do body_state: :none, body_handler: module(), body_handler_options: any(), + original_body: binary(), body: binary(), data: any(), status_code: non_neg_integer() | nil, + time: integer(), } end diff --git a/lib/pepper/http/response_body_handler/file.ex b/lib/pepper/http/response_body_handler/file.ex index 27b5be2..532cd57 100644 --- a/lib/pepper/http/response_body_handler/file.ex +++ b/lib/pepper/http/response_body_handler/file.ex @@ -53,7 +53,13 @@ defmodule Pepper.HTTP.ResponseBodyHandler.File do @impl true def cancel(%Response{data: {_filename, file}} = response) do - :ok = File.close(file) + case File.close(file) do + :ok -> + :ok + + {:error, :einval} -> + :ok + end {:ok, %{response | data: nil}} end diff --git a/lib/pepper/http/utils.ex b/lib/pepper/http/utils.ex index d6323a9..3eb2861 100644 --- a/lib/pepper/http/utils.ex +++ b/lib/pepper/http/utils.ex @@ -8,7 +8,53 @@ defmodule Pepper.HTTP.Utils do import Mint.HTTP1.Parse - require SweetXml + @type http_method :: String.t() + | :connect + | :delete + | :get + | :head + | :options + | :patch + | :post + | :put + | :trace + + @type ets_reducer :: (obj::tuple(), acc::any() -> acc::any()) + + @spec safe_reduce_ets_table(:ets.table(), any(), ets_reducer()) :: + (acc::any()) + def safe_reduce_ets_table(table, acc, callback) do + try do + :ets.safe_fixtable(table, true) + reduce_ets_table(table, acc, callback) + after + :ets.safe_fixtable(table, false) + end + end + + @spec reduce_ets_table(:ets.table(), any(), ets_reducer()) :: + (acc::any()) + def reduce_ets_table(table, acc, callback) do + match_spec = [ + { + :"$1", + [], + [:"$_"], + } + ] + do_reduce_ets_table_bag(:ets.select(table, match_spec, 1), acc, callback) + end + + defp do_reduce_ets_table_bag(res, acc, callback) do + case res do + :"$end_of_table" -> + acc + + {[row], continuation} -> + acc = callback.(row, acc) + do_reduce_ets_table_bag(:ets.select(continuation), acc, callback) + end + end def to_multipart_message(rows, state \\ {:headers, %Segment{}}) @@ -104,59 +150,34 @@ defmodule Pepper.HTTP.Utils do |> binary_part(0, len) end - def handle_xml_body(doc) do + @spec generate_random_binary(non_neg_integer()) :: binary() + def generate_random_binary(len) when is_integer(len) and len > 0 do + :crypto.strong_rand_bytes(len) + end + + def handle_xml_body(doc) when is_tuple(doc) or is_list(doc) do doc = doc |> List.wrap() - |> Enum.map(fn item -> - record_type = elem(item, 0) - xml_item_to_map(record_type, item) + |> Enum.map(fn {_elem_name, _attributes, _children} = item -> + xml_item_to_map(item) end) |> deflate_xml_map() doc end - for name <- [ - :xmlDecl, - :xmlAttribute, - :xmlNamespace, - :xmlNsNode, - :xmlElement, - :xmlText, - :xmlComment, - :xmlPI, - :xmlDocument, - :xmlObj, - ] do - def xml_item_to_map(unquote(name), item) do - SweetXml.unquote(name)(item) - |> xml_item_deep_to_map(unquote(name)) - end + @spec xml_item_to_map(tuple()) :: tuple() + def xml_item_to_map({elem_name, _attributes, children}) do + {elem_name, Enum.map(children, fn item -> + xml_item_to_map(item) + end)} end - def xml_item_deep_to_map(item, :xmlElement) do - #namespace = xml_item_to_map(:xmlNamespace, item[:namespace]) - #item = put_in(item[:namespace], namespace) - #put_in(item[:content], Enum.map(item[:content], fn item -> - # xml_item_to_map(elem(item, 0), item) - #end)) - - {item[:expanded_name], - Enum.map(item[:content], fn item -> - xml_item_to_map(elem(item, 0), item) - end) - } - end - - def xml_item_deep_to_map(item, :xmlNamespace) do + def xml_item_to_map(item) do item end - def xml_item_deep_to_map(item, :xmlText) do - to_string(item[:value]) - end - def deflate_xml_map([{_, _} | _] = list) when is_list(list) do [ Enum.reduce(list, %{}, fn @@ -186,13 +207,28 @@ defmodule Pepper.HTTP.Utils do end) end - def normalize_http_method(:head), do: "HEAD" + @spec encode_query_params(map() | Keyword.t(), :default | :duplicate) :: String.t() + def encode_query_params(nil, _encoding) do + nil + end + + def encode_query_params(query_params, :default) when is_list(query_params) or is_map(query_params) do + Plug.Conn.Query.encode(query_params) + end + + def encode_query_params(query_params, :duplicate) when is_list(query_params) or is_map(query_params) do + Pepper.HTTP.Utils.QP.encode(query_params) + end + + def normalize_http_method(:connect), do: "CONNECT" + def normalize_http_method(:delete), do: "DELETE" def normalize_http_method(:get), do: "GET" + def normalize_http_method(:head), do: "HEAD" + def normalize_http_method(:options), do: "OPTIONS" def normalize_http_method(:patch), do: "PATCH" def normalize_http_method(:post), do: "POST" def normalize_http_method(:put), do: "PUT" - def normalize_http_method(:delete), do: "DELETE" - def normalize_http_method(:options), do: "OPTIONS" + def normalize_http_method(:trace), do: "TRACE" def normalize_http_method(method) when is_binary(method) do String.upcase(method) @@ -221,7 +257,7 @@ defmodule Pepper.HTTP.Utils do end # Percent-encoding is not case sensitive so we have to account for lowercase and uppercase. - @hex_characters '0123456789abcdefABCDEF' + @hex_characters ~c'0123456789abcdefABCDEF' def validate_target!(target), do: validate_target!(target, target) @@ -256,7 +292,7 @@ defmodule Pepper.HTTP.Utils do def validate_header_value!(name, value) do _ = for <> do - unless is_vchar(char) or char in '\s\t' do + unless is_vchar(char) or char in ~c'\s\t' do throw({:mint, {:invalid_header_value, name, value}}) end end diff --git a/lib/pepper/http/utils/qp.ex b/lib/pepper/http/utils/qp.ex new file mode 100644 index 0000000..545f614 --- /dev/null +++ b/lib/pepper/http/utils/qp.ex @@ -0,0 +1,87 @@ +defmodule Pepper.HTTP.Utils.QP do + @moduledoc """ + Copy of Plug.Conn.Query, but with the uniqueness removed from lists. + """ + + @doc """ + Encodes the given map or list of tuples. + """ + @spec encode(Enumerable.t(), (term() -> binary())) :: binary() + def encode(kv, encoder \\ &to_string/1) do + IO.iodata_to_binary(encode_pair("", kv, encoder)) + end + + # covers structs + defp encode_pair(field, %{__struct__: struct} = map, encoder) when is_atom(struct) do + [field, ?= | encode_value(map, encoder)] + end + + # covers maps + defp encode_pair(parent_field, %{} = map, encoder) do + encode_kv(map, parent_field, encoder) + end + + # covers keyword lists + defp encode_pair(parent_field, list, encoder) when is_list(list) and is_tuple(hd(list)) do + encode_kv(list, parent_field, encoder) + end + + # covers non-keyword lists + defp encode_pair(parent_field, list, encoder) when is_list(list) do + mapper = fn + value when is_map(value) and map_size(value) != 1 -> + raise ArgumentError, + "cannot encode maps inside lists when the map has 0 or more than 1 element, " <> + "got: #{inspect(value)}" + + value -> + [?&, encode_pair(parent_field <> "[]", value, encoder)] + end + + list + |> Enum.flat_map(mapper) + |> prune() + end + + # covers nil + defp encode_pair(field, nil, _encoder) do + [field, ?=] + end + + # encoder fallback + defp encode_pair(field, value, encoder) do + [field, ?= | encode_value(value, encoder)] + end + + defp encode_kv(kv, parent_field, encoder) do + mapper = fn + {_, value} when value in [%{}, []] -> + [] + + {field, value} -> + field = + if parent_field == "" do + encode_key(field) + else + parent_field <> "[" <> encode_key(field) <> "]" + end + + [?&, encode_pair(field, value, encoder)] + end + + kv + |> Enum.flat_map(mapper) + |> prune() + end + + defp encode_key(item) do + item |> to_string |> URI.encode_www_form() + end + + defp encode_value(item, encoder) do + item |> encoder.() |> URI.encode_www_form() + end + + defp prune([?& | t]), do: t + defp prune([]), do: [] +end diff --git a/mix.exs b/mix.exs index f6f65f2..fbcbe3c 100644 --- a/mix.exs +++ b/mix.exs @@ -35,17 +35,15 @@ defmodule Pepper.HTTP.MixProject do {:plug, "~> 1.6"}, # JSON Parser {:jason, "~> 1.2"}, - # XML Parser - {:sweet_xml, "~> 0.6"}, - # XML Encoder - {:xml_builder, "~> 2.0"}, + # XML Decoder / Encoder + {:saxy, "~> 1.5"}, # CSV - {:csv, "~> 2.0"}, + {:csv, "~> 2.0 or ~> 3.0"}, # HTTP Library {:mint, "~> 1.0"}, # Certificate Store {:castore, "~> 0.1 or ~> 1.0"}, - {:bypass, "~> 1.0 or ~> 2.1", [only: :test]}, + {:bypass, "~> 2.1", git: "https://github.com/IceDragon200/bypass", branch: "awaiting-patch", only: [:test]}, ] end diff --git a/mix.lock b/mix.lock index 87f2aa7..abfb476 100644 --- a/mix.lock +++ b/mix.lock @@ -1,21 +1,19 @@ %{ "accept": {:hex, :accept, "0.3.5", "b33b127abca7cc948bbe6caa4c263369abf1347cfa9d8e699c6d214660f10cd1", [:rebar3], [], "hexpm", "11b18c220bcc2eab63b5470c038ef10eb6783bcb1fcdb11aa4137defa5ac1bb8"}, - "bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"}, - "castore": {:hex, :castore, "1.0.5", "9eeebb394cc9a0f3ae56b813459f990abb0a3dedee1be6b27fdb50301930502f", [:mix], [], "hexpm", "8d7c597c3e4a64c395980882d4bca3cebb8d74197c590dc272cfd3b6a6310578"}, - "cowboy": {:hex, :cowboy, "2.10.0", "ff9ffeff91dae4ae270dd975642997afe2a1179d94b1887863e43f681a203e26", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "3afdccb7183cc6f143cb14d3cf51fa00e53db9ec80cdcd525482f5e99bc41d6b"}, + "bypass": {:git, "https://github.com/IceDragon200/bypass", "36a0a15556b62f115408a0dc92ba012104f667c7", [branch: "awaiting-patch"]}, + "castore": {:hex, :castore, "1.0.7", "b651241514e5f6956028147fe6637f7ac13802537e895a724f90bf3e36ddd1dd", [:mix], [], "hexpm", "da7785a4b0d2a021cd1292a60875a784b6caef71e76bf4917bdee1f390455cf5"}, + "cowboy": {:hex, :cowboy, "2.8.0", "f3dc62e35797ecd9ac1b50db74611193c29815401e53bac9a5c0577bd7bc667d", [:rebar3], [{:cowlib, "~> 2.9.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "4643e4fba74ac96d4d152c75803de6fad0b3fa5df354c71afdd6cbeeb15fac8a"}, "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, - "cowlib": {:hex, :cowlib, "2.12.1", "a9fa9a625f1d2025fe6b462cb865881329b5caff8f1854d1cbc9f9533f00e1e1", [:make, :rebar3], [], "hexpm", "163b73f6367a7341b33c794c4e88e7dbfe6498ac42dcd69ef44c5bc5507c8db0"}, - "csv": {:hex, :csv, "2.5.0", "c47b5a5221bf2e56d6e8eb79e77884046d7fd516280dc7d9b674251e0ae46246", [:mix], [{:parallel_stream, "~> 1.0.4 or ~> 1.1.0", [hex: :parallel_stream, repo: "hexpm", optional: false]}], "hexpm", "e821f541487045c7591a1963eeb42afff0dfa99bdcdbeb3410795a2f59c77d34"}, - "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, + "cowlib": {:hex, :cowlib, "2.9.1", "61a6c7c50cf07fdd24b2f45b89500bb93b6686579b069a89f88cb211e1125c78", [:rebar3], [], "hexpm", "e4175dc240a70d996156160891e1c62238ede1729e45740bdd38064dad476170"}, + "csv": {:hex, :csv, "3.2.1", "6d401f1ed33acb2627682a9ab6021e96d33ca6c1c6bccc243d8f7e2197d032f5", [:mix], [], "hexpm", "8f55a0524923ae49e97ff2642122a2ce7c61e159e7fe1184670b2ce847aee6c8"}, + "hpax": {:hex, :hpax, "0.2.0", "5a58219adcb75977b2edce5eb22051de9362f08236220c9e859a47111c194ff5", [:mix], [], "hexpm", "bea06558cdae85bed075e6c036993d43cd54d447f76d8190a8db0dc5893fa2f1"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, - "mint": {:hex, :mint, "1.5.2", "4805e059f96028948870d23d7783613b7e6b0e2fb4e98d720383852a760067fd", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "d77d9e9ce4eb35941907f1d3df38d8f750c357865353e21d335bdcdf6d892a02"}, - "parallel_stream": {:hex, :parallel_stream, "1.1.0", "f52f73eb344bc22de335992377413138405796e0d0ad99d995d9977ac29f1ca9", [:mix], [], "hexpm", "684fd19191aedfaf387bbabbeb8ff3c752f0220c8112eb907d797f4592d6e871"}, - "plug": {:hex, :plug, "1.15.2", "94cf1fa375526f30ff8770837cb804798e0045fd97185f0bb9e5fcd858c792a3", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "02731fa0c2dcb03d8d21a1d941bdbbe99c2946c0db098eee31008e04c6283615"}, - "plug_cowboy": {:hex, :plug_cowboy, "2.6.1", "9a3bbfceeb65eff5f39dab529e5cd79137ac36e913c02067dba3963a26efe9b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "de36e1a21f451a18b790f37765db198075c25875c64834bcc82d90b309eb6613"}, - "plug_crypto": {:hex, :plug_crypto, "2.0.0", "77515cc10af06645abbfb5e6ad7a3e9714f805ae118fa1a70205f80d2d70fe73", [:mix], [], "hexpm", "53695bae57cc4e54566d993eb01074e4d894b65a3766f1c43e2c61a1b0f45ea9"}, - "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, - "sweet_xml": {:hex, :sweet_xml, "0.7.4", "a8b7e1ce7ecd775c7e8a65d501bc2cd933bff3a9c41ab763f5105688ef485d08", [:mix], [], "hexpm", "e7c4b0bdbf460c928234951def54fe87edf1a170f6896675443279e2dbeba167"}, + "mint": {:hex, :mint, "1.6.1", "065e8a5bc9bbd46a41099dfea3e0656436c5cbcb6e741c80bd2bad5cd872446f", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "4fc518dcc191d02f433393a72a7ba3f6f94b101d094cb6bf532ea54c89423780"}, + "plug": {:hex, :plug, "1.16.1", "40c74619c12f82736d2214557dedec2e9762029b2438d6d175c5074c933edc9d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a13ff6b9006b03d7e33874945b2755253841b238c34071ed85b0e86057f8cddc"}, + "plug_cowboy": {:hex, :plug_cowboy, "2.7.1", "87677ffe3b765bc96a89be7960f81703223fe2e21efa42c125fcd0127dd9d6b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "02dbd5f9ab571b864ae39418db7811618506256f6d13b4a45037e5fe78dc5de3"}, + "plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"}, + "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"}, + "saxy": {:hex, :saxy, "1.5.0", "0141127f2d042856f135fb2d94e0beecda7a2306f47546dbc6411fc5b07e28bf", [:mix], [], "hexpm", "ea7bb6328fbd1f2aceffa3ec6090bfb18c85aadf0f8e5030905e84235861cf89"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, - "xml_builder": {:hex, :xml_builder, "2.2.0", "cc5f1eeefcfcde6e90a9b77fb6c490a20bc1b856a7010ce6396f6da9719cbbab", [:mix], [], "hexpm", "9d66d52fb917565d358166a4314078d39ef04d552904de96f8e73f68f64a62c9"}, } diff --git a/test/pepper/http/accept_encoding_header_test.exs b/test/pepper/http/accept_encoding_header_test.exs new file mode 100644 index 0000000..c420482 --- /dev/null +++ b/test/pepper/http/accept_encoding_header_test.exs @@ -0,0 +1,24 @@ +defmodule Pepper.HTTP.AcceptEncodingHeaderTest do + @moduledoc """ + This test is a sanity check of :accept_encoding_header + """ + use ExUnit.Case + + describe "parse/1" do + test "can correctly parse" do + assert [ + {:content_coding, ~c'*', 1, []} + ] = :accept_encoding_header.parse("*") + + assert [ + {:content_coding, ~c'identity', 1, []} + ] = :accept_encoding_header.parse("identity") + + assert [ + {:content_coding, ~c'identity', 1, []}, + {:content_coding, ~c'deflate', 1, []}, + {:content_coding, ~c'gzip', 1, []}, + ] = :accept_encoding_header.parse("identity, deflate, gzip") + end + end +end diff --git a/test/pepper/http/client/get_test.exs b/test/pepper/http/client/get_test.exs new file mode 100644 index 0000000..0948285 --- /dev/null +++ b/test/pepper/http/client/get_test.exs @@ -0,0 +1,135 @@ +defmodule Pepper.HTTP.Client.GetTest do + use Pepper.HTTP.Support.ClientCase + + alias Pepper.HTTP.ContentClient, as: Client + + import Plug.Conn + + Enum.each([true, false], fn with_connection_pool -> + Enum.each([:http1, :http2], fn protocol -> + describe "request/6 (with_connection_pool:#{with_connection_pool}, protocol:#{protocol}, method:GET)" do + @describetag with_connection_pool: with_connection_pool, protocol: to_string(protocol), method: "GET" + + test "can perform a GET request with string url", %{client_options: client_options} do + bypass = Bypass.open() + + Bypass.expect bypass, "GET", "/path/to/glory", fn conn -> + conn = Plug.Conn.fetch_query_params(conn) + + assert %{ + "this" => "is a test", + "also" => %{ + "that" => "was a test", + } + } = conn.query_params + + send_resp(conn, 200, "") + end + + headers = [] + + assert {:ok, %{status_code: 200}, _} = + Client.request( + "GET", + "http://localhost:#{bypass.port}/path/to/glory", + [{"this", "is a test"}, {"also", %{"that" => "was a test"}}], + headers, + nil, + client_options + ) + end + + test "can perform a GET request with URI", %{client_options: client_options} do + bypass = Bypass.open() + + Bypass.expect bypass, "GET", "/path/to/glory", fn conn -> + conn = Plug.Conn.fetch_query_params(conn) + + assert %{ + "this" => "is a test", + "also" => %{ + "that" => "was a test", + } + } = conn.query_params + + send_resp(conn, 200, "") + end + + headers = [] + + uri = %URI{ + scheme: "http", + host: "localhost", + path: "/path/to/glory", + port: bypass.port, + } + + assert {:ok, %{status_code: 200}, _} = + Client.request( + "GET", + uri, + [{"this", "is a test"}, {"also", %{"that" => "was a test"}}], + headers, + nil, + client_options + ) + end + + test "can handle a request that fails to connect", %{client_options: client_options} do + bypass = Bypass.open() + + Bypass.down bypass + + headers = [] + + assert {:error, %Pepper.HTTP.ConnectError{reason: %Mint.TransportError{reason: :econnrefused}}} = + Client.request( + "GET", + "http://localhost:#{bypass.port}/path/to/glory", + [{"this", "is a test"}, {"also", %{"that" => "was a test"}}], + headers, + nil, + # timeout is intentionally lower than sleep timer in server + Keyword.merge(client_options, [ + recv_timeout: 1000, + connect_timeout: 1000, + ]) + ) + end + + test "can handle a timeout while receiving data from endpoint", %{client_options: client_options} do + bypass = Bypass.open() + + parent = self() + Bypass.expect bypass, "GET", "/path/to/glory", fn conn -> + # purposely stall + send(parent, {:stalling, self()}) + receive do + :abort -> + :ok + after 1000 -> + :ok + end + send_resp(conn, 200, "") + end + + headers = [] + + assert {:error, %Pepper.HTTP.ReceiveError{reason: %Mint.TransportError{reason: :timeout}}} = + Client.request( + "GET", + "http://localhost:#{bypass.port}/path/to/glory", + [{"this", "is a test"}, {"also", %{"that" => "was a test"}}], + headers, + nil, + # timeout is intentionally lower than sleep timer in server + Keyword.merge(client_options, [recv_timeout: 200]) + ) + + assert_received {:stalling, bypass_session_pid} + send bypass_session_pid, :abort + end + end + end) + end) +end diff --git a/test/pepper/http/client/post_test.exs b/test/pepper/http/client/post_test.exs new file mode 100644 index 0000000..9a55268 --- /dev/null +++ b/test/pepper/http/client/post_test.exs @@ -0,0 +1,48 @@ +defmodule Pepper.HTTP.Client.PostTest do + use Pepper.HTTP.Support.ClientCase + + alias Pepper.HTTP.ContentClient, as: Client + + import Plug.Conn + + Enum.each([true, false], fn with_connection_pool -> + Enum.each([:http1, :http2], fn protocol -> + describe "request/6 (with_connection_pool:#{with_connection_pool}, protocol:#{protocol}, method:POST)" do + @describetag with_connection_pool: with_connection_pool, protocol: to_string(protocol), method: "POST" + + test "can perform a POST request", %{client_options: client_options} do + bypass = Bypass.open() + + Bypass.expect bypass, "POST", "/path/to/fame", fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + conn = Plug.Conn.fetch_query_params(conn) + + assert "Hello, World" == body + + assert %{ + "this" => "is a test", + "also" => %{ + "that" => "was a test", + } + } = conn.query_params + + send_resp(conn, 200, "") + end + + headers = [] + body = {:text, "Hello, World"} + + assert {:ok, %{status_code: 200}, _} = + Client.request( + "POST", + "http://localhost:#{bypass.port}/path/to/fame", + [{"this", "is a test"}, {"also", %{"that" => "was a test"}}], + headers, + body, + client_options + ) + end + end + end) + end) +end diff --git a/test/pepper/http/client_test.exs b/test/pepper/http/client_test.exs deleted file mode 100644 index 8f4d467..0000000 --- a/test/pepper/http/client_test.exs +++ /dev/null @@ -1,157 +0,0 @@ -defmodule Pepper.HTTP.ClientTest do - use Pepper.HTTP.Support.ClientCase - - alias Pepper.HTTP.ContentClient, as: Client - - import Plug.Conn - - describe "request/6 (GET)" do - test "can perform a GET request with string url" do - bypass = Bypass.open() - - Bypass.expect bypass, "GET", "/path/to/glory", fn conn -> - conn = Plug.Conn.fetch_query_params(conn) - - assert %{ - "this" => "is a test", - "also" => %{ - "that" => "was a test", - } - } = conn.query_params - - send_resp(conn, 200, "") - end - - headers = [] - - assert {:ok, %{status_code: 200}, _} = - Client.request( - "GET", - "http://localhost:#{bypass.port}/path/to/glory", - [{"this", "is a test"}, {"also", %{"that" => "was a test"}}], - headers, - nil, - [] - ) - end - - test "can perform a GET request with URI" do - bypass = Bypass.open() - - Bypass.expect bypass, "GET", "/path/to/glory", fn conn -> - conn = Plug.Conn.fetch_query_params(conn) - - assert %{ - "this" => "is a test", - "also" => %{ - "that" => "was a test", - } - } = conn.query_params - - send_resp(conn, 200, "") - end - - headers = [] - - uri = %URI{ - scheme: "http", - host: "localhost", - path: "/path/to/glory", - port: bypass.port, - } - - assert {:ok, %{status_code: 200}, _} = - Client.request( - "GET", - uri, - [{"this", "is a test"}, {"also", %{"that" => "was a test"}}], - headers, - nil, - [] - ) - end - - test "can handle a request that fails to connect" do - bypass = Bypass.open() - - Bypass.down bypass - - headers = [] - - assert {:error, %Pepper.HTTP.ConnectError{reason: %Mint.TransportError{reason: :econnrefused}}} = - Client.request( - "GET", - "http://localhost:#{bypass.port}/path/to/glory", - [{"this", "is a test"}, {"also", %{"that" => "was a test"}}], - headers, - nil, - # timeout is intentionally lower than sleep timer in server - [ - recv_timeout: 1000, - connect_timeout: 1000, - ] - ) - end - - test "can handle a timeout while receiving data from endpoint" do - bypass = Bypass.open() - - Bypass.expect bypass, "GET", "/path/to/glory", fn conn -> - # purposely stall - Process.sleep 3000 - - send_resp(conn, 200, "") - end - - headers = [] - - assert {:error, %Pepper.HTTP.ReceiveError{reason: %Mint.TransportError{reason: :timeout}}} = - Client.request( - "GET", - "http://localhost:#{bypass.port}/path/to/glory", - [{"this", "is a test"}, {"also", %{"that" => "was a test"}}], - headers, - nil, - # timeout is intentionally lower than sleep timer in server - [recv_timeout: 1000] - ) - - Bypass.down bypass - end - end - - describe "request/6 (POST)" do - test "can perform a POST request" do - bypass = Bypass.open() - - Bypass.expect bypass, "POST", "/path/to/fame", fn conn -> - {:ok, body, conn} = Plug.Conn.read_body(conn) - conn = Plug.Conn.fetch_query_params(conn) - - assert "Hello, World" == body - - assert %{ - "this" => "is a test", - "also" => %{ - "that" => "was a test", - } - } = conn.query_params - - send_resp(conn, 200, "") - end - - headers = [] - body = {:text, "Hello, World"} - - assert {:ok, %{status_code: 200}, _} = - Client.request( - "POST", - "http://localhost:#{bypass.port}/path/to/fame", - [{"this", "is a test"}, {"also", %{"that" => "was a test"}}], - headers, - body, - [] - ) - end - end -end diff --git a/test/pepper/http/connection_manager_test.exs b/test/pepper/http/connection_manager_test.exs index 291a065..b3039c4 100644 --- a/test/pepper/http/connection_manager_test.exs +++ b/test/pepper/http/connection_manager_test.exs @@ -9,107 +9,237 @@ defmodule Pepper.HTTP.ConnectionManagerTest do @port 9899 - describe "connection pool" do - test "can perform GET requests without connection pool" do - bypass = Bypass.open(port: @port) + Enum.each([:http1, :http2], fn protocol -> + describe "without connection pool (protocol:#{protocol})" do + @describetag with_connection_pool: false, protocol: to_string(protocol) - Bypass.stub(bypass, "GET", "/path", fn conn -> - assert [@user_agent] == get_req_header(conn, "user-agent") - send_resp(conn, 200, "DONE") - end) + test "can perform GET requests without connection pool", %{client_options: client_options} do + bypass = Bypass.open(port: @port) - headers = [ - {"accept", "*/*"}, - {"user-agent", @user_agent} - ] + Bypass.stub(bypass, "GET", "/path", fn conn -> + assert [@user_agent] == get_req_header(conn, "user-agent") + send_resp(conn, 200, "DONE") + end) - query_params = [] + headers = [ + {"accept", "*/*"}, + {"user-agent", @user_agent} + ] + + query_params = [] + + for _ <- 1..1000 do + assert {:ok, %{status_code: 200}, {:unk, "DONE"}} = + Client.request( + :get, + "http://localhost:#{@port}/path", + query_params, + headers, + "", + client_options + ) + end + end + + test "can perform HEAD requests without connection pool", %{client_options: client_options} do + bypass = Bypass.open(port: @port) + + Bypass.stub(bypass, "HEAD", "/path", fn conn -> + assert [@user_agent] == get_req_header(conn, "user-agent") + send_resp(conn, 200, "") + end) + + headers = [ + {"accept", "*/*"}, + {"user-agent", @user_agent} + ] - options = [] + query_params = [] - for _ <- 1..1000 do - assert {:ok, %{status_code: 200}, {:unk, "DONE"}} = + for _ <- 1..1000 do + assert {:ok, %{status_code: 200}, {:unk, ""}} = + Client.request( + :head, + "http://localhost:#{@port}/path", + query_params, + headers, + "", + client_options + ) + end + end + end + + describe "with connection pool (protocol:#{protocol})" do + @describetag with_connection_pool: true, protocol: to_string(protocol) + + @tag connection_pool_options: [default_lifespan: 1000] + test "shortlived connections", %{connection_pool_pid: connection_pool_pid, client_options: client_options} do + bypass = Bypass.open(port: @port) + + Bypass.stub(bypass, "GET", "/path", fn conn -> + assert [@user_agent] == get_req_header(conn, "user-agent") + send_resp(conn, 200, "DONE") + end) + + headers = [ + {"accept", "*/*"}, + {"user-agent", @user_agent} + ] + + query_params = [] + + assert %{ + pool_size: 10, + total_size: 0, + busy_size: 0, + available_size: 0, + } = Pepper.HTTP.ConnectionManager.Pooled.get_stats(connection_pool_pid) + + assert {:ok, %{protocol: unquote(protocol), status_code: 200}, {:unk, "DONE"}} = Client.request( :get, "http://localhost:#{@port}/path", query_params, headers, "", - options + client_options ) - end - end - test "can perform a GET request with connection pool" do - bypass = Bypass.open(port: @port) + assert %{ + pool_size: 10, + total_size: 1, + busy_size: 0, + available_size: 1 + } = Pepper.HTTP.ConnectionManager.Pooled.get_stats(connection_pool_pid) - Bypass.stub(bypass, "GET", "/path", fn conn -> - assert [@user_agent] == get_req_header(conn, "user-agent") - send_resp(conn, 200, "DONE") - end) + Process.sleep 1100 - headers = [ - {"accept", "*/*"}, - {"user-agent", @user_agent} - ] + assert %{ + pool_size: 10, + total_size: 0, + busy_size: 0, + available_size: 0 + } = Pepper.HTTP.ConnectionManager.Pooled.get_stats(connection_pool_pid) + end - query_params = [] + test "can close all connections", %{connection_pool_pid: connection_pool_pid, client_options: client_options} do + bypass = Bypass.open(port: @port) - {:ok, pid} = Pepper.HTTP.ConnectionManager.Pooled.start_link([pool_size: 100], []) + Bypass.stub(bypass, "GET", "/path", fn conn -> + assert [@user_agent] == get_req_header(conn, "user-agent") + send_resp(conn, 200, "DONE") + end) - try do - options = [ - connection_manager: :pooled, - connection_manager_id: pid + headers = [ + {"accept", "*/*"}, + {"user-agent", @user_agent} ] - for _ <- 1..1000 do - assert {:ok, %{status_code: 200}, {:unk, "DONE"}} = + query_params = [] + + assert %{ + pool_size: 10, + total_size: 0, + busy_size: 0, + available_size: 0, + } = Pepper.HTTP.ConnectionManager.Pooled.get_stats(connection_pool_pid) + + assert {:ok, %{protocol: unquote(protocol), status_code: 200}, {:unk, "DONE"}} = + Client.request( + :get, + "http://localhost:#{@port}/path", + query_params, + headers, + "", + client_options + ) + + Enum.each(["a", "b", "c", "d", "e", "f"], fn prefix -> + assert {:ok, %{protocol: unquote(protocol), status_code: 200}, {:unk, "DONE"}} = Client.request( :get, - "http://localhost:#{@port}/path", + "http://#{prefix}.localhost:#{@port}/path", query_params, headers, "", - options + client_options ) - end + end) assert %{ - busy_size: 0, - available_size: 1 - } = Pepper.HTTP.ConnectionManager.Pooled.get_stats(pid) - after - Pepper.HTTP.ConnectionManager.Pooled.stop(pid) + pool_size: 10, + total_size: 7, + busy_size: 0, + available_size: 7 + } = Pepper.HTTP.ConnectionManager.Pooled.get_stats(connection_pool_pid) + + :ok = Pepper.HTTP.ConnectionManager.Pooled.close_all(connection_pool_pid, :nuke) end - end - test "can perform a GET request with connection pool and handle a connect timeout" do - bypass = Bypass.open(port: @port) + test "can perform a GET request with connection pool", %{connection_pool_pid: connection_pool_pid, client_options: client_options} do + bypass = Bypass.open(port: @port) - Bypass.down bypass + Bypass.stub(bypass, "GET", "/path", fn conn -> + assert [@user_agent] == get_req_header(conn, "user-agent") + send_resp(conn, 200, "DONE") + end) - headers = [ - {"accept", "*/*"}, - {"user-agent", @user_agent} - ] + headers = [ + {"accept", "*/*"}, + {"user-agent", @user_agent} + ] - query_params = [] + query_params = [] - {:ok, pid} = Pepper.HTTP.ConnectionManager.Pooled.start_link([pool_size: 100], []) + assert %{ + pool_size: 10, + total_size: 0, + busy_size: 0, + available_size: 0, + } = Pepper.HTTP.ConnectionManager.Pooled.get_stats(connection_pool_pid) - try do - options = [ - connection_manager: :pooled, - connection_manager_id: pid, + for _x <- 1..20 do + assert {:ok, %{protocol: unquote(protocol), status_code: 200}, {:unk, "DONE"}} = + Client.request( + :get, + "http://localhost:#{@port}/path", + query_params, + headers, + nil, + client_options + ) + end + + assert %{ + pool_size: 10, + total_size: 1, + busy_size: 0, + available_size: 1 + } = Pepper.HTTP.ConnectionManager.Pooled.get_stats(connection_pool_pid) + end + + test "can perform a GET request with connection pool and handle a connect timeout", %{client_options: client_options, connection_pool_pid: connection_pool_pid} do + bypass = Bypass.open(port: @port) + + Bypass.down bypass + + headers = [ + {"accept", "*/*"}, + {"user-agent", @user_agent} + ] + + query_params = [] + + client_options = Keyword.merge(client_options, [ connect_timeout: 1000, recv_timeout: 1000, - ] + ]) assert %{ busy_size: 0, available_size: 0 - } = Pepper.HTTP.ConnectionManager.Pooled.get_stats(pid) + } = Pepper.HTTP.ConnectionManager.Pooled.get_stats(connection_pool_pid) for _ <- 1..1000 do assert {:error, @@ -122,35 +252,24 @@ defmodule Pepper.HTTP.ConnectionManagerTest do query_params, headers, "", - options + client_options ) end assert %{ busy_size: 0, - available_size: 0 - } = Pepper.HTTP.ConnectionManager.Pooled.get_stats(pid) - after - Pepper.HTTP.ConnectionManager.Pooled.stop(pid) + available_size: 1 + } = Pepper.HTTP.ConnectionManager.Pooled.get_stats(connection_pool_pid) end - end - test "can perform a GET request with connection pool and non-connectable endpoint" do - headers = [ - {"accept", "*/*"}, - {"user-agent", @user_agent} - ] - - query_params = [] - - {:ok, pid} = Pepper.HTTP.ConnectionManager.Pooled.start_link([pool_size: 100], []) - - try do - options = [ - connection_manager: :pooled, - connection_manager_id: pid + test "can perform a GET request with connection pool and non-connectable endpoint", %{client_options: client_options, connection_pool_pid: connection_pool_pid} do + headers = [ + {"accept", "*/*"}, + {"user-agent", @user_agent} ] + query_params = [] + for _ <- 1..1000 do assert {:error, reason} = Client.request( @@ -159,7 +278,7 @@ defmodule Pepper.HTTP.ConnectionManagerTest do query_params, headers, "", - options + client_options ) assert %Pepper.HTTP.ConnectError{ @@ -169,11 +288,9 @@ defmodule Pepper.HTTP.ConnectionManagerTest do assert %{ busy_size: 0, - available_size: 0 - } = Pepper.HTTP.ConnectionManager.Pooled.get_stats(pid) - after - Pepper.HTTP.ConnectionManager.Pooled.stop(pid) + available_size: 1 + } = Pepper.HTTP.ConnectionManager.Pooled.get_stats(connection_pool_pid) end end - end + end) end diff --git a/test/pepper/http/content_client/accept_header_test.exs b/test/pepper/http/content_client/accept_header_test.exs new file mode 100644 index 0000000..d640f24 --- /dev/null +++ b/test/pepper/http/content_client/accept_header_test.exs @@ -0,0 +1,304 @@ +defmodule Pepper.HTTP.ContentClient.AcceptHeaderTest do + use Pepper.HTTP.Support.ClientCase + + Enum.each([true, false], fn with_connection_pool -> + Enum.each([:http1, :http2], fn protocol -> + Enum.each([{:get, "GET"}, {:delete, "DELETE"}], fn {method, method_string} -> + describe "request/6 [with_connection_pool:#{with_connection_pool}, protocol:#{protocol}, method:#{method}, body:json]" do + @describetag with_connection_pool: with_connection_pool, protocol: to_string(protocol), method: to_string(method) + + test "will not parse an json response if not accepted", %{client_options: client_options} do + test_json_unparsed_with_header(%{ + protocol: unquote(protocol), + method: unquote(method), + method_string: unquote(method_string), + client_options: client_options, + }) + end + + test "will parse an json response if no accept header is given", %{client_options: client_options} do + test_json_parsed_without_header(%{ + protocol: unquote(protocol), + method: unquote(method), + method_string: unquote(method_string), + client_options: client_options, + }) + end + + test "will parse an json response if accepted", %{client_options: client_options} do + test_json_parsed_with_accept(%{ + protocol: unquote(protocol), + method: unquote(method), + method_string: unquote(method_string), + client_options: client_options, + }) + end + end + + describe "request/6 [with_connection_pool:#{with_connection_pool}, protocol:#{protocol}, method:#{method}, body:xml]" do + @describetag with_connection_pool: with_connection_pool, protocol: to_string(protocol), method: to_string(method) + + test "will not parse an xml response if not accepted", %{client_options: client_options} do + test_xml_unparsed_with_unaccepted(%{ + protocol: unquote(protocol), + method: unquote(method), + method_string: unquote(method_string), + client_options: client_options, + }) + end + + test "will parse an xml response if no accept header is given", %{client_options: client_options} do + test_xml_parsed_without_header(%{ + protocol: unquote(protocol), + method: unquote(method), + method_string: unquote(method_string), + client_options: client_options, + }) + end + + test "will parse an xml response if accepted", %{client_options: client_options} do + test_xml_parsed_if_accepted(%{ + protocol: unquote(protocol), + method: unquote(method), + method_string: unquote(method_string), + client_options: client_options, + }) + end + end + end) + end) + end) + + defp test_json_unparsed_with_header(options) do + %{ + protocol: protocol, + method: method, + method_string: method_string, + client_options: client_options, + } = options + + bypass = Bypass.open() + + Bypass.expect bypass, method_string, "/path/to/json", fn conn -> + conn + |> put_resp_content_type("application/json") + |> send_resp(200, """ + { + "response": { + "status": "ok" + } + } + """) + end + + headers = [ + {"Accept", "application/xml"} + ] + + assert {:ok, %{protocol: ^protocol, status_code: 200}, {:unaccepted, _}} = + ContentClient.request( + method, + "http://localhost:#{bypass.port}/path/to/json", + @no_query_params, + headers, + @no_body, + client_options + ) + end + + defp test_json_parsed_without_header(options) do + %{ + protocol: protocol, + method: method, + method_string: method_string, + client_options: client_options, + } = options + + bypass = Bypass.open() + + Bypass.expect bypass, method_string, "/path/to/json", fn conn -> + conn + |> put_resp_content_type("application/json") + |> send_resp(200, """ + { + "response": { + "status": "ok" + } + } + """) + end + + assert {:ok, %{protocol: ^protocol, status_code: 200}, {:json, _}} = + ContentClient.request( + method, + "http://localhost:#{bypass.port}/path/to/json", + @no_query_params, + @no_headers, + @no_body, + client_options + ) + end + + defp test_json_parsed_with_accept(options) do + %{ + protocol: protocol, + method: method, + method_string: method_string, + client_options: client_options, + } = options + + bypass = Bypass.open() + + Bypass.expect bypass, method_string, "/path/to/json", fn conn -> + conn + |> put_resp_content_type("application/json") + |> send_resp(200, """ + { + "response": { + "status": "ok" + } + } + """) + end + + headers = [ + # testing downcased headers + {"Accept", "application/json"} + ] + + # regular json + assert {:ok, %{protocol: ^protocol, status_code: 200}, {:json, doc}} = + ContentClient.request( + method, + "http://localhost:#{bypass.port}/path/to/json", + @no_query_params, + headers, + @no_body, + client_options + ) + + assert %{ + "response" => %{ + "status" => "ok" + } + } = doc + end + + defp test_xml_unparsed_with_unaccepted(options) do + %{ + protocol: protocol, + method: method, + method_string: method_string, + client_options: client_options, + } = options + + bypass = Bypass.open() + + Bypass.expect bypass, method_string, "/path/to/xml", fn conn -> + conn + |> put_resp_content_type("application/xml") + |> send_resp(200, """ + + OK + + """) + end + + headers = [ + {"Accept", "application/json"} + ] + + assert {:ok, %{protocol: ^protocol, status_code: 200}, {:unaccepted, _}} = + ContentClient.request( + method, + "http://localhost:#{bypass.port}/path/to/xml", + @no_query_params, + headers, + @no_body, + client_options + ) + end + + defp test_xml_parsed_without_header(options) do + %{ + protocol: protocol, + method: method, + method_string: method_string, + client_options: client_options, + } = options + + bypass = Bypass.open() + + Bypass.expect bypass, method_string, "/path/to/xml", fn conn -> + conn + |> put_resp_content_type("application/xml") + |> send_resp(200, """ + + OK + + """) + end + + assert {:ok, %{protocol: ^protocol, status_code: 200}, {:xml, _}} = + ContentClient.request( + method, + "http://localhost:#{bypass.port}/path/to/xml", + @no_query_params, + @no_headers, + @no_body, + client_options + ) + end + + defp test_xml_parsed_if_accepted(options) do + %{ + protocol: protocol, + method: method, + method_string: method_string, + client_options: client_options, + } = options + + bypass = Bypass.open() + + Bypass.expect bypass, method_string, "/path/to/xml", fn conn -> + conn + |> put_resp_content_type("application/xml") + |> send_resp(200, """ + OK + """) + end + + headers = [ + # testing downcased headers + {"Accept", "application/xml"} + ] + + # regular xml + assert {:ok, %{protocol: ^protocol, status_code: 200}, {:xml, _}} = + ContentClient.request( + method, + "http://localhost:#{bypass.port}/path/to/xml", + @no_query_params, + headers, + @no_body, + client_options + ) + + # normalized xml + assert {:ok, %{protocol: ^protocol, status_code: 200}, {:xmldoc, doc}} = + ContentClient.request( + method, + "http://localhost:#{bypass.port}/path/to/xml", + @no_query_params, + headers, + @no_body, + Keyword.put(client_options, :normalize_xml, true) + ) + + assert [%{ + "Response" => [%{ + "Status" => ["OK"] + }] + }] = doc + end +end diff --git a/test/pepper/http/content_client/content_encoding_test.exs b/test/pepper/http/content_client/content_encoding_test.exs new file mode 100644 index 0000000..70f7e72 --- /dev/null +++ b/test/pepper/http/content_client/content_encoding_test.exs @@ -0,0 +1,188 @@ +defmodule Pepper.HTTP.ContentClient.GetTest do + use Pepper.HTTP.Support.ClientCase + + alias Pepper.HTTP.ContentClient, as: Client + + import Plug.Conn + + Enum.each([true, false], fn with_connection_pool -> + Enum.each([:http1, :http2], fn protocol -> + describe "request/6 (with_connection_pool:#{with_connection_pool}, protocol:#{protocol}, method:GET)" do + @describetag with_connection_pool: with_connection_pool, protocol: to_string(protocol), method: "GET" + + test "can perform a GET request with string url and no special encodings", %{client_options: client_options} do + bypass = Bypass.open() + + Bypass.expect bypass, "GET", "/path/to/glory", fn conn -> + conn = Plug.Conn.fetch_query_params(conn) + + assert %{ + "this" => "is a test", + "also" => %{ + "that" => "was a test", + } + } = conn.query_params + + send_resp(conn, 200, "") + end + + headers = [] + + assert {:ok, %{status_code: 200}, _} = + Client.request( + "GET", + "http://localhost:#{bypass.port}/path/to/glory", + [{"this", "is a test"}, {"also", %{"that" => "was a test"}}], + headers, + nil, + client_options + ) + end + + test "can perform a GET request with string url and with accept-encoding: identity", %{client_options: client_options} do + bypass = Bypass.open() + + Bypass.expect bypass, "GET", "/path/to/glory", fn conn -> + conn = Plug.Conn.fetch_query_params(conn) + + assert %{ + "this" => "is a test", + "also" => %{ + "that" => "was a test", + } + } = conn.query_params + + conn + |> put_resp_header("content-encoding", "identity") + |> send_resp(200, "") + end + + headers = [ + {"accept-encoding", "identity"} + ] + + assert {:ok, %{status_code: 200}, _} = + Client.request( + "GET", + "http://localhost:#{bypass.port}/path/to/glory", + [{"this", "is a test"}, {"also", %{"that" => "was a test"}}], + headers, + nil, + client_options + ) + end + + test "can perform a GET request with string url and with accept-encoding: gzip", %{client_options: client_options} do + bypass = Bypass.open() + + Bypass.expect bypass, "GET", "/path/to/glory", fn conn -> + conn = Plug.Conn.fetch_query_params(conn) + + assert %{ + "this" => "is a test", + "also" => %{ + "that" => "was a test", + } + } = conn.query_params + + conn + |> put_resp_header("content-encoding", "gzip") + |> send_resp(200, :zlib.gzip("Hello, World")) + end + + headers = [ + {"accept-encoding", "gzip"} + ] + + assert {:ok, %{status_code: 200}, {:unk, "Hello, World"}} = + Client.request( + "GET", + "http://localhost:#{bypass.port}/path/to/glory", + [{"this", "is a test"}, {"also", %{"that" => "was a test"}}], + headers, + nil, + client_options + ) + end + + test "can perform a GET request with string url and with accept-encoding: deflate", %{client_options: client_options} do + bypass = Bypass.open() + + Bypass.expect bypass, "GET", "/path/to/glory", fn conn -> + conn = Plug.Conn.fetch_query_params(conn) + + assert %{ + "this" => "is a test", + "also" => %{ + "that" => "was a test", + } + } = conn.query_params + + conn + |> put_resp_header("content-encoding", "deflate") + |> send_resp(200, deflate("Hello, World")) + end + + headers = [ + {"accept-encoding", "deflate"} + ] + + assert {:ok, %{status_code: 200}, {:unk, "Hello, World"}} = + Client.request( + "GET", + "http://localhost:#{bypass.port}/path/to/glory", + [{"this", "is a test"}, {"also", %{"that" => "was a test"}}], + headers, + nil, + client_options + ) + end + + test "can perform a GET request with string url and with accept-encoding that doesn't match", %{client_options: client_options} do + bypass = Bypass.open() + + Bypass.expect bypass, "GET", "/path/to/glory", fn conn -> + conn = Plug.Conn.fetch_query_params(conn) + + assert %{ + "this" => "is a test", + "also" => %{ + "that" => "was a test", + } + } = conn.query_params + + conn + |> put_resp_header("content-encoding", "gzip") + |> send_resp(200, :zlib.gzip("Hello, World")) + end + + headers = [ + {"accept-encoding", "deflate"} + ] + + assert {:error, {:unaccepted_content_encoding, "gzip", _}} = + Client.request( + "GET", + "http://localhost:#{bypass.port}/path/to/glory", + [{"this", "is a test"}, {"also", %{"that" => "was a test"}}], + headers, + nil, + client_options + ) + end + end + end) + end) + + defp deflate(message) do + z = :zlib.open() + try do + :ok = :zlib.deflateInit(z, :default) + blob = :zlib.deflate(z, message, :finish) + :ok = :zlib.deflateEnd(z) + blob + after + :zlib.close(z) + end + end +end diff --git a/test/pepper/http/content_client/large_blobs_test.exs b/test/pepper/http/content_client/large_blobs_test.exs new file mode 100644 index 0000000..db7957c --- /dev/null +++ b/test/pepper/http/content_client/large_blobs_test.exs @@ -0,0 +1,165 @@ +defmodule Pepper.HTTP.ContentClient.LargeBlobsTest do + use Pepper.HTTP.Support.ClientCase + + Enum.each([true, false], fn with_connection_pool -> + Enum.each([:http1, :http2], fn protocol -> + Enum.each([{:post, "POST"}, {:patch, "PATCH"}, {:put, "PUT"}], fn {method, method_string} -> + describe "request/6 [with_connection_pool:#{with_connection_pool}, protocol:#{protocol}, method:#{method}, body:text]" do + @describetag with_connection_pool: with_connection_pool, protocol: to_string(protocol), method: to_string(method) + + test "can send a large text blob (without server reading body)", %{client_options: client_options} do + test_send_large_text_blob(%{ + server_read_body: false, + protocol: unquote(protocol), + method: unquote(method), + method_string: unquote(method_string), + client_options: client_options, + }) + end + + test "can send a large text blob (with server reading body)", %{client_options: client_options} do + test_send_large_text_blob(%{ + server_read_body: true, + protocol: unquote(protocol), + method: unquote(method), + method_string: unquote(method_string), + client_options: client_options, + }) + end + end + + describe "request/6 [with_connection_pool:#{with_connection_pool}, protocol:#{protocol}, method:#{method}, body:binary]" do + @describetag with_connection_pool: with_connection_pool, protocol: to_string(protocol), method: to_string(method) + + test "can send a large binary blob (without server reading body)", %{client_options: client_options} do + test_send_large_binary_blob(%{ + server_read_body: false, + protocol: unquote(protocol), + method: unquote(method), + method_string: unquote(method_string), + client_options: client_options, + }) + end + + test "can send a large binary blob (with server reading body)", %{client_options: client_options} do + test_send_large_binary_blob(%{ + server_read_body: true, + protocol: unquote(protocol), + method: unquote(method), + method_string: unquote(method_string), + client_options: client_options, + }) + end + end + + describe "request/6 [with_connection_pool:#{with_connection_pool}, protocol:#{protocol}, method:#{method}, body:form-data]" do + @describetag with_connection_pool: with_connection_pool, protocol: to_string(protocol), method: to_string(method) + # test "can send a form-data" do + # end + end + end) + end) + end) + + # 16mb blobs + @text_blob Pepper.HTTP.Utils.generate_random_base32(0x100_0000) + @binary_blob Pepper.HTTP.Utils.generate_random_binary(0x100_0000) + + defp test_send_large_text_blob(options) do + %{ + protocol: protocol, + method: method, + method_string: method_string, + client_options: client_options, + server_read_body: server_read_body?, + } = options + + blob = @text_blob + + bypass = Bypass.open() + + Bypass.expect bypass, method_string, "/path/to/text", fn conn -> + if server_read_body? do + {:ok, body, conn} = read_all_body(conn) + + assert blob == body + + conn + |> send_resp(204, "") + else + conn + |> send_resp(204, "") + end + end + + headers = [ + {"accept", "*/*"}, + {"user-agent", @user_agent} + ] + + query_params = [] + + body = {:text, blob} + + assert {:ok, %{protocol: ^protocol, status_code: 204}, {:unk, ""}} = + ContentClient.request( + method, + "http://localhost:#{bypass.port}/path/to/text", + query_params, + headers, + body, + Keyword.merge([ + recv_timeout: 5000, + ], client_options) + ) + end + + defp test_send_large_binary_blob(options) do + %{ + protocol: protocol, + method: method, + method_string: method_string, + server_read_body: server_read_body?, + client_options: client_options, + } = options + + blob = @binary_blob + + bypass = Bypass.open() + + Bypass.expect bypass, method_string, "/path/to/text", fn conn -> + if server_read_body? do + {:ok, body, conn} = read_all_body(conn) + + assert blob == body + + conn + |> send_resp(204, "") + else + conn + |> send_resp(204, "") + end + end + + headers = [ + {"accept", "*/*"}, + {"user-agent", @user_agent} + ] + + query_params = [] + + body = {:text, blob} + + assert {:ok, %{protocol: ^protocol, status_code: 204}, {:unk, ""}} = + ContentClient.request( + method, + "http://localhost:#{bypass.port}/path/to/text", + query_params, + headers, + body, + Keyword.merge([ + recv_timeout: 5000, + ], client_options) + ) + end +end diff --git a/test/pepper/http/content_client/query_params_test.exs b/test/pepper/http/content_client/query_params_test.exs new file mode 100644 index 0000000..f5391ee --- /dev/null +++ b/test/pepper/http/content_client/query_params_test.exs @@ -0,0 +1,78 @@ +defmodule Pepper.HTTP.ContentClient.QueryParamsTest do + use Pepper.HTTP.Support.ClientCase + + alias Pepper.HTTP.ContentClient, as: Client + + import Plug.Conn + + Enum.each([true, false], fn with_connection_pool -> + Enum.each([:http1, :http2], fn protocol -> + describe "request/6 (with_connection_pool:#{with_connection_pool}, protocol:#{protocol}, method:GET)" do + @describetag with_connection_pool: with_connection_pool, protocol: to_string(protocol), method: "GET" + + test "can encode query params with default encoding", %{client_options: client_options} do + bypass = Bypass.open() + + Bypass.expect bypass, "GET", "/path/to/glory", fn conn -> + conn = Plug.Conn.fetch_query_params(conn) + + assert %{ + "a" => "1", + "b" => "2", + "c" => "3", + "d" => "5", + } = conn.query_params + + send_resp(conn, 200, "") + end + + headers = [] + + assert {:ok, %{status_code: 200}, _} = + Client.request( + "GET", + "http://localhost:#{bypass.port}/path/to/glory", + [ + {"a", "1"}, + {"b", "2"}, + {"c", "3"}, + {"d", "5"}, + {"d", "4"}, + ], + headers, + nil, + client_options + ) + end + + test "can encode query params with duplicate encoding", %{client_options: client_options} do + bypass = Bypass.open() + + Bypass.expect bypass, "GET", "/path/to/glory", fn conn -> + assert "a=1&b=2&c=3&d=5&d=4" == conn.query_string + + send_resp(conn, 200, "") + end + + headers = [] + + assert {:ok, %{status_code: 200}, _} = + Client.request( + "GET", + "http://localhost:#{bypass.port}/path/to/glory", + [ + {"a", "1"}, + {"b", "2"}, + {"c", "3"}, + {"d", "5"}, + {"d", "4"}, + ], + headers, + nil, + Keyword.put(client_options, :query_params_encoding, :duplicate) + ) + end + end + end) + end) +end diff --git a/test/pepper/http/content_client2_test.exs b/test/pepper/http/content_client2_test.exs deleted file mode 100644 index 97929f6..0000000 --- a/test/pepper/http/content_client2_test.exs +++ /dev/null @@ -1,403 +0,0 @@ -defmodule Pepper.HTTP.ContentClient2Test do - use Pepper.HTTP.Support.ClientCase - - for protocol <- [:http1, :http2] do - for {method, method_string} <- [{:post, "POST"}, {:patch, "PATCH"}, {:put, "PUT"}] do - describe "request/6 [protocol:#{protocol}, method:#{method}, body:text]" do - test "can send a large text blob (without server reading body)" do - test_send_large_text_blob(%{ - server_read_body: false, - protocol: unquote(protocol), - method: unquote(method), - method_string: unquote(method_string) - }) - end - - test "can send a large text blob (with server reading body)" do - test_send_large_text_blob(%{ - server_read_body: true, - protocol: unquote(protocol), - method: unquote(method), - method_string: unquote(method_string) - }) - end - end - - describe "request/6 [protocol:#{protocol}, method:#{method}, body:form-data]" do - # test "can send a form-data" do - # end - end - end - - for {method, method_string} <- [{:get, "GET"}, {:delete, "DELETE"}] do - describe "request/6 [protocol:#{protocol}, method:#{method}, body:json]" do - test "will not parse an json response if not accepted" do - test_json_unparsed_with_header(%{ - protocol: unquote(protocol), - method: unquote(method), - method_string: unquote(method_string) - }) - end - - test "will parse an json response if no accept header is given" do - test_json_parsed_without_header(%{ - protocol: unquote(protocol), - method: unquote(method), - method_string: unquote(method_string) - }) - end - - test "will parse an json response if accepted" do - test_json_parsed_with_accept(%{ - protocol: unquote(protocol), - method: unquote(method), - method_string: unquote(method_string) - }) - end - end - - describe "request/6 [protocol:#{protocol}, method:#{method}, body:xml]" do - test "will not parse an xml response if not accepted" do - test_xml_unparsed_with_unaccepted(%{ - protocol: unquote(protocol), - method: unquote(method), - method_string: unquote(method_string) - }) - end - - test "will parse an xml response if no accept header is given" do - test_xml_parsed_without_header(%{ - protocol: unquote(protocol), - method: unquote(method), - method_string: unquote(method_string) - }) - end - - test "will parse an xml response if accepted" do - test_xml_parsed_if_accepted(%{ - protocol: unquote(protocol), - method: unquote(method), - method_string: unquote(method_string) - }) - end - end - end - end - - defp test_send_large_text_blob(options) do - %{ - protocol: protocol, - method: method, - method_string: method_string, - server_read_body: server_read_body?, - } = options - - client_options = [ - connect_options: [ - protocols: [protocol] - ] - ] - - blob = Pepper.HTTP.Utils.generate_random_base32(0x100_0000) - - bypass = Bypass.open() - - Bypass.expect bypass, method_string, "/path/to/text", fn conn -> - if server_read_body? do - {:ok, body, conn} = read_all_body(conn) - - assert blob == body - - conn - |> send_resp(204, "") - else - conn - |> send_resp(204, "") - end - end - - headers = [ - {"accept", "*/*"}, - {"user-agent", @user_agent} - ] - - query_params = [] - - body = {:text, blob} - - assert {:ok, %{status_code: 204}, {:unk, ""}} = - ContentClient.request( - method, - "http://localhost:#{bypass.port}/path/to/text", - query_params, - headers, - body, - Keyword.merge([ - recv_timeout: 5000, - ], client_options) - ) - end - - defp test_json_unparsed_with_header(options) do - %{ - protocol: protocol, - method: method, - method_string: method_string, - } = options - - client_options = [ - connect_options: [ - protocols: [protocol] - ] - ] - - bypass = Bypass.open() - - Bypass.expect bypass, method_string, "/path/to/json", fn conn -> - conn - |> put_resp_content_type("application/json") - |> send_resp(200, """ - { - "response": { - "status": "ok" - } - } - """) - end - - headers = [ - {"Accept", "application/xml"} - ] - - assert {:ok, %{status_code: 200}, {:unk, _}} = - ContentClient.request( - method, - "http://localhost:#{bypass.port}/path/to/json", - @no_query_params, - headers, - @no_body, - client_options - ) - end - - defp test_json_parsed_without_header(options) do - %{ - protocol: protocol, - method: method, - method_string: method_string, - } = options - - client_options = [ - connect_options: [ - protocols: [protocol] - ] - ] - - bypass = Bypass.open() - - Bypass.expect bypass, method_string, "/path/to/json", fn conn -> - conn - |> put_resp_content_type("application/json") - |> send_resp(200, """ - { - "response": { - "status": "ok" - } - } - """) - end - - assert {:ok, %{status_code: 200}, {:json, _}} = - ContentClient.request( - method, - "http://localhost:#{bypass.port}/path/to/json", - @no_query_params, - @no_headers, - @no_body, - client_options - ) - end - - defp test_json_parsed_with_accept(options) do - %{ - protocol: protocol, - method: method, - method_string: method_string, - } = options - - client_options = [ - connect_options: [ - protocols: [protocol] - ] - ] - - bypass = Bypass.open() - - Bypass.expect bypass, method_string, "/path/to/json", fn conn -> - conn - |> put_resp_content_type("application/json") - |> send_resp(200, """ - { - "response": { - "status": "ok" - } - } - """) - end - - headers = [ - # testing downcased headers - {"Accept", "application/json"} - ] - - # regular json - assert {:ok, %{status_code: 200}, {:json, doc}} = - ContentClient.request( - method, - "http://localhost:#{bypass.port}/path/to/json", - @no_query_params, - headers, - @no_body, - client_options - ) - - assert %{ - "response" => %{ - "status" => "ok" - } - } = doc - end - - defp test_xml_unparsed_with_unaccepted(options) do - %{ - protocol: protocol, - method: method, - method_string: method_string, - } = options - - client_options = [ - connect_options: [ - protocols: [protocol] - ] - ] - - bypass = Bypass.open() - - Bypass.expect bypass, method_string, "/path/to/xml", fn conn -> - conn - |> put_resp_content_type("application/xml") - |> send_resp(200, """ - - OK - - """) - end - - headers = [ - {"Accept", "application/json"} - ] - - assert {:ok, %{status_code: 200}, {:unk, _}} = - ContentClient.request( - method, - "http://localhost:#{bypass.port}/path/to/xml", - @no_query_params, - headers, - @no_body, - client_options - ) - end - - defp test_xml_parsed_without_header(options) do - %{ - protocol: protocol, - method: method, - method_string: method_string, - } = options - - client_options = [ - connect_options: [ - protocols: [protocol] - ] - ] - - bypass = Bypass.open() - - Bypass.expect bypass, method_string, "/path/to/xml", fn conn -> - conn - |> put_resp_content_type("application/xml") - |> send_resp(200, """ - - OK - - """) - end - - assert {:ok, %{status_code: 200}, {:xml, _}} = - ContentClient.request( - method, - "http://localhost:#{bypass.port}/path/to/xml", - @no_query_params, - @no_headers, - @no_body, - client_options - ) - end - - defp test_xml_parsed_if_accepted(options) do - %{ - protocol: protocol, - method: method, - method_string: method_string, - } = options - - client_options = [ - connect_options: [ - protocols: [protocol] - ] - ] - - bypass = Bypass.open() - - Bypass.expect bypass, method_string, "/path/to/xml", fn conn -> - conn - |> put_resp_content_type("application/xml") - |> send_resp(200, """ - OK - """) - end - - headers = [ - # testing downcased headers - {"Accept", "application/xml"} - ] - - # regular xml - assert {:ok, %{status_code: 200}, {:xml, _}} = - ContentClient.request( - method, - "http://localhost:#{bypass.port}/path/to/xml", - @no_query_params, - headers, - @no_body, - client_options - ) - - # normalized xml - assert {:ok, %{status_code: 200}, {:xmldoc, doc}} = - ContentClient.request( - method, - "http://localhost:#{bypass.port}/path/to/xml", - @no_query_params, - headers, - @no_body, - Keyword.put(client_options, :normalize_xml, true) - ) - - assert [%{ - :Response => [%{ - :Status => ["OK"] - }] - }] = doc - end -end diff --git a/test/pepper/http/content_client_test.exs b/test/pepper/http/content_client_test.exs index bf7775b..ae9b8c8 100644 --- a/test/pepper/http/content_client_test.exs +++ b/test/pepper/http/content_client_test.exs @@ -4,7 +4,6 @@ defmodule Pepper.HTTP.ContentClientTest do alias Pepper.HTTP.ContentClient, as: Client import Plug.Conn - import Pepper.HTTP.Utils @user_agent "content-client-test/1.0" @@ -12,48 +11,9 @@ defmodule Pepper.HTTP.ContentClientTest do @response_body_types [:none, :json, :text, :xml, :csv, :other] - setup tags do - tags = - case tags[:with_connection_pool] do - true -> - {:ok, pid} = - start_supervised({Pepper.HTTP.ConnectionManager.Pooled, [ - [pool_size: 10], - [] - ]}) - - Map.put(tags, :connection_pool_pid, pid) - - false -> - tags - end - - protocol = String.to_existing_atom(Map.fetch!(tags, :protocol)) - - client_options = [ - connect_options: [ - protocols: [protocol] - ] - ] - - client_options = - case tags[:connection_pool_pid] do - nil -> - client_options - - pid -> - Keyword.merge(client_options, [ - connection_manager: :pooled, - connection_manager_id: pid, - ]) - end - - Map.put(tags, :client_options, client_options) - end - - for with_connection_pool <- [true, false] do - for protocol <- [:http1, :http2] do - for {method, method_string} <- [{:get, "GET"}, {:delete, "DELETE"}] do + Enum.each([true, false], fn with_connection_pool -> + Enum.each([:http1, :http2], fn protocol -> + Enum.each([{:get, "GET"}, {:delete, "DELETE"}], fn {method, method_string} -> describe "request/6 [with_connection_pool:#{with_connection_pool}, protocol:#{protocol}, method:#{method}]" do @describetag with_connection_pool: with_connection_pool, protocol: to_string(protocol), method: to_string(method) @@ -67,7 +27,7 @@ defmodule Pepper.HTTP.ContentClientTest do end end - for response_body_type <- @response_body_types do + Enum.each(@response_body_types, fn response_body_type -> describe "request/6 [with_connection_pool:#{with_connection_pool}, protocol:#{protocol}, method:#{method}, response_body:#{response_body_type}]" do @describetag with_connection_pool: with_connection_pool, protocol: to_string(protocol), method: to_string(method) @@ -101,10 +61,10 @@ defmodule Pepper.HTTP.ContentClientTest do }) end end - end - end + end) + end) - for {method, method_string} <- [{:post, "POST"}, {:patch, "PATCH"}, {:put, "PUT"}] do + Enum.each([{:post, "POST"}, {:patch, "PATCH"}, {:put, "PUT"}], fn {method, method_string} -> describe "request/6 [with_connection_pool:#{with_connection_pool}, protocol:#{protocol}, method:#{method}]" do @describetag with_connection_pool: with_connection_pool, protocol: to_string(protocol), method: to_string(method) @@ -143,94 +103,16 @@ defmodule Pepper.HTTP.ContentClientTest do client_options: client_options }) end - - test "can send a large text blob (without server reading)", %{client_options: client_options} do - test_large_text_request_with_no_content_response(%{ - protocol: unquote(protocol), - method: unquote(method), - method_string: unquote(method_string), - client_options: client_options, - server_read: false - }) - end - - test "can send a large text blob (with server reading)", %{client_options: client_options} do - test_large_text_request_with_no_content_response(%{ - protocol: unquote(protocol), - method: unquote(method), - method_string: unquote(method_string), - client_options: client_options, - server_read: true - }) - end end - for response_body_type <- @response_body_types do + Enum.each(@response_body_types, fn response_body_type -> describe "request/6 [with_connection_pool:#{with_connection_pool}, protocol:#{protocol}, method:#{method}, response_body_type:#{response_body_type}]" do @describetag with_connection_pool: with_connection_pool, protocol: to_string(protocol), method: to_string(method), response_body_type: response_body_type end - end - end - end - end - - # 16mb blob - @large_text_blob generate_random_base32(0x100_0000) - - defp large_text_blob do - @large_text_blob - end - - defp test_large_text_request_with_no_content_response(options) do - %{ - protocol: protocol, - method: method, - method_string: method_string, - client_options: client_options, - server_read: server_read? - } = options - - bypass = Bypass.open(port: @port) - - blob = large_text_blob() - - Bypass.expect(bypass, method_string, "/path", fn conn -> - assert [@user_agent] == get_req_header(conn, "user-agent") - assert ["text/plain"] == get_req_header(conn, "content-type") - - conn = - if server_read? do - {:ok, text, conn} = read_all_body(conn) - assert blob == text - conn - else - conn - end - - send_resp(conn, 204, "") + end) + end) end) - - headers = [ - {"accept", "*/*"}, - {"user-agent", @user_agent} - ] - - query_params = [] - - body = {:text, blob} - - assert {:ok, %{protocol: ^protocol, status_code: 204}, {:unk, ""}} = - Client.request( - method, - "http://localhost:#{bypass.port}/path", - query_params, - headers, - body, - Keyword.merge([ - recv_timeout: 5000, - ], client_options) - ) - end + end) defp test_xml_request_with_no_content_response(options) do %{ @@ -261,14 +143,15 @@ defmodule Pepper.HTTP.ContentClientTest do query_params = [] - body = - {:xml, - XmlBuilder.document([ - {:head, [], - [ - {:ref, [], ["Test Value"]} - ]} - ])} + body = {:xml, + { + "head", + [], + [ + {"ref", [], ["Test Value"]} + ] + } + } assert {:ok, %{protocol: ^protocol, status_code: 204}, {:unk, ""}} = Client.request( @@ -707,29 +590,37 @@ defmodule Pepper.HTTP.ContentClientTest do assert "Hello, World" == blob {:csv, blob} -> - rows = + {:ok, stream} = blob - |> String.split("\r\n") - |> CSV.decode(headers: true) - |> Enum.map(fn {:ok, row} -> - row - end) - |> Enum.into([]) - - assert [ - %{ - "header1" => "r1_value1", - "header2" => "r1_value2", - }, - %{ - "header1" => "r2_value1", - "header2" => "r2_value2", - }, - %{ - "header1" => "r3_value1", - "header2" => "r3_value2", - }, - ] == rows + |> StringIO.open() + + try do + rows = + stream + |> IO.binstream(:line) + |> CSV.decode(headers: true) + |> Enum.map(fn {:ok, row} -> + row + end) + |> Enum.to_list() + + assert [ + %{ + "header1" => "r1_value1", + "header2" => "r1_value2", + }, + %{ + "header1" => "r2_value1", + "header2" => "r2_value2", + }, + %{ + "header1" => "r3_value1", + "header2" => "r3_value2", + }, + ] == rows + after + StringIO.close(stream) + end {:json, %{"body" => "Hello, World"}} -> assert :json == response_body_type diff --git a/test/pepper/http/utils_test.exs b/test/pepper/http/utils_test.exs new file mode 100644 index 0000000..8f95573 --- /dev/null +++ b/test/pepper/http/utils_test.exs @@ -0,0 +1,56 @@ +defmodule Pepper.HTTP.UtilsTest do + use ExUnit.Case + + alias Pepper.HTTP.Utils + + describe "safe_reduce_ets_table/3" do + test "can reduce a bag" do + tab = :ets.new(:my_bag, [:bag, :private]) + :ets.insert(tab, {:a, 1}) + :ets.insert(tab, {:b, 2}) + :ets.insert(tab, {:c, 3}) + :ets.insert(tab, {:d, 5}) + :ets.insert(tab, {:d, 4}) + :ets.insert(tab, {:d, 5}) + + try do + assert [ + {:d, 5}, + {:d, 4}, + {:c, 3}, + {:b, 2}, + {:a, 1}, + ] = Utils.safe_reduce_ets_table(tab, [], fn item, acc -> + [item | acc] + end) + after + :ets.delete(tab) + end + end + + test "can reduce a duplicate_bag" do + tab = :ets.new(:my_bag, [:duplicate_bag, :private]) + :ets.insert(tab, {:a, 1}) + :ets.insert(tab, {:b, 2}) + :ets.insert(tab, {:c, 3}) + :ets.insert(tab, {:d, 5}) + :ets.insert(tab, {:d, 4}) + :ets.insert(tab, {:d, 5}) + + try do + assert [ + {:d, 4}, + {:d, 5}, + {:d, 5}, + {:c, 3}, + {:b, 2}, + {:a, 1}, + ] = Utils.safe_reduce_ets_table(tab, [], fn item, acc -> + [item | acc] + end) + after + :ets.delete(tab) + end + end + end +end diff --git a/test/support/client_case.ex b/test/support/client_case.ex index 2caf9a4..02e0acc 100644 --- a/test/support/client_case.ex +++ b/test/support/client_case.ex @@ -21,6 +21,48 @@ defmodule Pepper.HTTP.Support.ClientCase do end end + setup tags do + tags = + case Map.get(tags, :with_connection_pool, false) do + true -> + {:ok, pid} = + start_supervised({Pepper.HTTP.ConnectionManager.Pooled, [ + Keyword.merge( + [{:pool_size, 10}], + Map.get(tags, :connection_pool_options, []) + ), + [] + ]}) + + Map.put(tags, :connection_pool_pid, pid) + + false -> + tags + end + + protocol = String.to_existing_atom(Map.fetch!(tags, :protocol)) + + client_options = [ + connect_options: [ + protocols: [protocol] + ] + ] + + client_options = + case tags[:connection_pool_pid] do + nil -> + client_options + + pid -> + Keyword.merge(client_options, [ + connection_manager: :pooled, + connection_manager_id: pid, + ]) + end + + Map.put(tags, :client_options, client_options) + end + def bypass_for_configured_endpoint(application_name, field) do value = Application.get_env(application_name, field) diff --git a/test/support/plug_pipeline_helpers.ex b/test/support/plug_pipeline_helpers.ex index bdad893..4700325 100644 --- a/test/support/plug_pipeline_helpers.ex +++ b/test/support/plug_pipeline_helpers.ex @@ -28,7 +28,7 @@ defmodule Pepper.HTTP.Support.PlugPipelineHelpers do def parse_xml(conn, _) do {:ok, body, conn} = Plug.Conn.read_body(conn) try do - doc = SweetXml.parse(body) + doc = Saxy.SimpleForm.parse_string(body) put_in(conn.params["_xml"], doc) rescue ex -> raise Plug.Parsers.ParseError, exception: ex