Skip to content

Commit

Permalink
Merge pull request #187 from patrickbkr/goaway-retry
Browse files Browse the repository at this point in the history
Implement a retry mechanism when GoAways are received
  • Loading branch information
jnthn authored Jan 22, 2024
2 parents d21b6c6 + 81121ce commit 0c3c5bc
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 127 deletions.
270 changes: 151 additions & 119 deletions lib/Cro/HTTP/Client.pm6
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ use Cro::HTTP::LogTimelineSchema;
use Cro::HTTP::Request;
use Cro::HTTP::RequestSerializer;
use Cro::HTTP::ResponseParser;
use Cro::HTTP2::Frame;
use Cro::HTTP2::FrameParser;
use Cro::HTTP2::FrameSerializer;
use Cro::HTTP2::RequestSerializer;
use Cro::HTTP2::ResponseParser;
use Cro::HTTP2::GeneralParser;
use Cro::TCP;
use Cro::TLS;
use Cro::Uri;
Expand Down Expand Up @@ -163,6 +165,10 @@ class Cro::HTTP::Client {
method close() { $!in.done }
}

my class GoAwayRetry is Exception {
has $.goaway-exception;
}

my class Pipeline2 {
has Lock $!lock = Lock.new;
has Bool $.secure;
Expand All @@ -174,18 +180,7 @@ class Cro::HTTP::Client {
has $!next-stream-id = 1;
has %!outstanding-stream-responses{Int};

submethod BUILD(:$!secure!, :$!host!, :$!port!, :$!in!, :$out!, :$go-away-supply!) {
$go-away-supply.tap: -> $last-processed-sid {
$!dead = True;
$!lock.protect: {
for %!outstanding-stream-responses.kv -> $sid, $vow {
if $sid > $last-processed-sid {
%!outstanding-stream-responses{$sid}:delete;
$vow.break(X::AdHoc.new(message => 'GoAway packet received'));
}
}
}
}
submethod BUILD(:$!secure!, :$!host!, :$!port!, :$!in!, :$out!) {

$!tap = supply {
whenever $out -> $response {
Expand All @@ -195,6 +190,22 @@ class Cro::HTTP::Client {
self.break-all-responses(X::AdHoc.new(message => 'Connection to server lost'));
}
QUIT {
when X::Cro::HTTP2::GoAway {
$!dead = True;
$!lock.protect: {
for %!outstanding-stream-responses.kv -> $sid, $vow {
if $sid > .last-processed-sid {
%!outstanding-stream-responses{$sid}:delete;
if .code == NO_ERROR {
$vow.break(GoAwayRetry.new(goaway-exception => $_));
}
else {
$vow.break($_);
}
}
}
}
}
default {
$!dead = True;
self.break-all-responses($_);
Expand Down Expand Up @@ -339,6 +350,10 @@ class Cro::HTTP::Client {
#| Request timeout policy.
has Cro::Policy::Timeout $.timeout-policy;

#| How often should we retry to send a request when the server answered
#| with a NO_ERROR GoAway packet?
has $.http2-goaway-retries;

has $!persistent;
has $!connection-cache = ConnectionCache.new;

Expand All @@ -357,7 +372,8 @@ class Cro::HTTP::Client {
:$http-proxy, :$https-proxy,
:$!follow = $DEFAULT-MAX-REDIRECTS, :%!auth, :$!http,
:$!persistent = True, :$!ca, :$!push-promises = False,
:ssl(:%!tls), :$timeout, :$!user-agent = 'Cro') {
:ssl(:%!tls), :$timeout, :$!http2-goaway-retries = 1,
:$!user-agent = 'Cro') {
if $cookie-jar ~~ Bool {
$!cookie-jar = Cro::HTTP::Client::CookieJar.new;
}
Expand Down Expand Up @@ -577,123 +593,145 @@ class Cro::HTTP::Client {
Promise(supply {
my $request-start-time = now;
my $conn-timeout = $timeout-policy.get-timeout(0, 'connection');
whenever self!get-pipeline($proxy-url // $parsed-url, $http, $conn-timeout, $request-log, ca => %options<ca>, tls => %options<tls> // %options<ssl>, :$enable-push) -> $pipeline {
# Handle connection persistence.
if $pipeline !~~ Pipeline2 {
unless self.persistent || $request-object.has-header('connection') {
$request-object.append-header('Connection', 'close');
my $goaway-retries = self ?? $!http2-goaway-retries !! %options<http2-goaway-retries> // 1;
my Supplier $retry-supplier .= new;
my $retry-supply = $retry-supplier.Supply;
sub do-request-on-pipeline() {
whenever self!get-pipeline($proxy-url // $parsed-url, $http, $conn-timeout, $request-log, ca => %options<ca>, tls => %options<tls> // %options<ssl>, :$enable-push) -> $pipeline {

# Handle connection persistence.
if $pipeline !~~ Pipeline2 {
unless self.persistent || $request-object.has-header('connection') {
$request-object.append-header('Connection', 'close');
}
}
}

# Set up any timeout for receiving the response headers.
my $timeout = $timeout-policy.get-timeout(now - $request-start-time, 'headers');
my Bool $headers-kept = False;
if $timeout !~~ Inf {
whenever Promise.in($timeout) {
die X::Cro::HTTP::Client::Timeout.new(phase => 'headers', uri => $url) unless $headers-kept;
# Set up any timeout for receiving the response headers.
my $timeout = $timeout-policy.get-timeout(now - $request-start-time, 'headers');
my Bool $headers-kept = False;
if $timeout !~~ Inf {
whenever Promise.in($timeout) {
die X::Cro::HTTP::Client::Timeout.new(phase => 'headers', uri => $url) unless $headers-kept || $pipeline.dead;
}
}
}

# Send the request.
whenever $pipeline.send-request($request-object) {
$headers-kept = True;
QUIT { $request-log.end }

# Consider adding the connection back into the cache to use it
# again.
if self && $!persistent {
unless .http-version eq '1.0' || (.header('connection') // '').lc eq 'close' {
$!connection-cache.add-pipeline($pipeline);
# Send the request.
whenever $pipeline.send-request($request-object) {
$headers-kept = True;
QUIT {
$request-log.end;
when GoAwayRetry {
if $goaway-retries > 0 && !$headers-kept {
$retry-supplier.emit: True;
}
else {
.goaway-exception.rethrow;
}
}
}
}
else {
$pipeline.close;
}

# If there's a body timeout, enforce it. Note that we need to detach
# this from the current supply, since it outlives it.
my $body-timeout = $timeout-policy.get-timeout(now - $request-start-time, 'body');
if $body-timeout != Inf {
my $response-to-timeout = $_;
Promise.in($body-timeout).then: { $response-to-timeout.cancel }
}
# Consider adding the connection back into the cache to use it
# again.
if self && $!persistent {
unless .http-version eq '1.0' || (.header('connection') // '').lc eq 'close' {
$!connection-cache.add-pipeline($pipeline);
}
}
else {
$pipeline.close;
}

# Set request object for received response.
.request = $request-object;
.request.http-version = $pipeline ~~ Pipeline2 ?? '2' !! '1.1';

# Pick next steps according to response.
if 200 <= .status < 400 || .status == 101 {
my $follow;
if self {
$follow = %options<follow> // $!follow // $DEFAULT-MAX-REDIRECTS;
} else {
$follow = %options<follow> // $DEFAULT-MAX-REDIRECTS;
# If there's a body timeout, enforce it. Note that we need to detach
# this from the current supply, since it outlives it.
my $body-timeout = $timeout-policy.get-timeout(now - $request-start-time, 'body');
if $body-timeout != Inf {
my $response-to-timeout = $_;
Promise.in($body-timeout).then: { $response-to-timeout.cancel }
}
if .status $redirect-codes && ($follow !=== False) {
my $remain = $follow === True ?? 4 !! $follow.Int - 1;
if $remain < 0 {
$request-log.end;
die X::Cro::HTTP::Client::TooManyRedirects.new;
}
my $new-method = .status == 302 | 303 ?? 'GET' !! $method;
my %new-opts = %options;
%new-opts<follow> = $remain;
if .status == 302 | 303 {
%new-opts<body>:delete;
%new-opts<content-type>:delete;
%new-opts<content-length>:delete;

# Set request object for received response.
.request = $request-object;
.request.http-version = $pipeline ~~ Pipeline2 ?? '2' !! '1.1';

# Pick next steps according to response.
if 200 <= .status < 400 || .status == 101 {
my $follow;
if self {
$follow = %options<follow> // $!follow // $DEFAULT-MAX-REDIRECTS;
} else {
$follow = %options<follow> // $DEFAULT-MAX-REDIRECTS;
}
my $new-url = $parsed-url.add(Cro::Uri::HTTP.parse-ref(.header('location')));
%new-opts<PARENT-REQUEST-LOG> = $request-log;
Cro::HTTP::LogTimeline::Redirected.log($request-log, :status(.status), :url($new-url));
my $req = self.request($new-method, $new-url, %new-opts);
CATCH { $request-log.end; }
whenever $req {
QUIT { $request-log.end; }
if .status $redirect-codes && ($follow !=== False) {
my $remain = $follow === True ?? 4 !! $follow.Int - 1;
if $remain < 0 {
$request-log.end;
die X::Cro::HTTP::Client::TooManyRedirects.new;
}
my $new-method = .status == 302 | 303 ?? 'GET' !! $method;
my %new-opts = %options;
%new-opts<follow> = $remain;
if .status == 302 | 303 {
%new-opts<body>:delete;
%new-opts<content-type>:delete;
%new-opts<content-length>:delete;
}
my $new-url = $parsed-url.add(Cro::Uri::HTTP.parse-ref(.header('location')));
%new-opts<PARENT-REQUEST-LOG> = $request-log;
Cro::HTTP::LogTimeline::Redirected.log($request-log, :status(.status), :url($new-url));
my $req = self.request($new-method, $new-url, %new-opts);
CATCH { $request-log.end; }
whenever $req {
QUIT { $request-log.end; }
$request-log.end;
.emit;
done;
};
} else {
if self && $.cookie-jar.defined {
$.cookie-jar.add-from-response($_, $parsed-url);
}
$request-log.end;
.emit;
done;
};
} else {
if self && $.cookie-jar.defined {
$.cookie-jar.add-from-response($_, $parsed-url);
}
$request-log.end;
.emit;
done;
}
} elsif 400 <= .status < 500 {
my $auth;
if self {
$auth = %options<auth> // %!auth;
} else {
$auth = %options<auth> // {};
}
if .status == 401 && (%options<auth><if-asked>:exists) {
my %opts = %options;
%opts<auth><if-asked>:delete;
%opts<PARENT-REQUEST-LOG> = $request-log;
Cro::HTTP::LogTimeline::AuthorizationRequested.log($request-log);
CATCH { $request-log.end; }
whenever self.request($method, $parsed-url, %opts) {
QUIT { $request-log.end; }
} elsif 400 <= .status < 500 {
my $auth;
if self {
$auth = %options<auth> // %!auth;
} else {
$auth = %options<auth> // {};
}
if .status == 401 && (%options<auth><if-asked>:exists) {
my %opts = %options;
%opts<auth><if-asked>:delete;
%opts<PARENT-REQUEST-LOG> = $request-log;
Cro::HTTP::LogTimeline::AuthorizationRequested.log($request-log);
CATCH { $request-log.end; }
whenever self.request($method, $parsed-url, %opts) {
QUIT { $request-log.end; }
$request-log.end;
.emit;
done;
};
} else {
Cro::HTTP::LogTimeline::ErrorResponse.log($request-log, :status(.status));
$request-log.end;
.emit;
done;
};
} else {
die X::Cro::HTTP::Error::Client.new(response => $_);
}
} elsif .status >= 500 {
Cro::HTTP::LogTimeline::ErrorResponse.log($request-log, :status(.status));
$request-log.end;
die X::Cro::HTTP::Error::Client.new(response => $_);
die X::Cro::HTTP::Error::Server.new(response => $_);
}
} elsif .status >= 500 {
Cro::HTTP::LogTimeline::ErrorResponse.log($request-log, :status(.status));
$request-log.end;
die X::Cro::HTTP::Error::Server.new(response => $_);
}
}
}

whenever $retry-supply {
$goaway-retries--;
do-request-on-pipeline();
}
do-request-on-pipeline();
})
}

Expand Down Expand Up @@ -808,15 +846,9 @@ class Cro::HTTP::Client {
}
push @parts, self.choose-connector($secure);

my $go-away-supply;
sub create-response-parser(*%params) {
my $res-parser = Cro::HTTP2::ResponseParser.new(|%params);
$go-away-supply = $res-parser.go-away-supply;
return $res-parser;
}
if $http eq '2' {
push @parts, Cro::HTTP2::FrameParser.new(:client);
push @parts, create-response-parser(:$enable-push);
push @parts, Cro::HTTP2::ResponseParser.new(:$enable-push);
}
elsif $http eq '1.1' || !$secure || !$supports-alpn {
push @parts, Cro::HTTP::ResponseParser.new();
Expand All @@ -825,7 +857,7 @@ class Cro::HTTP::Client {
push @parts, Cro::ConnectionConditional.new(
{ (.alpn-result // '') eq 'h2' } => [
Cro::HTTP2::FrameParser.new(:client),
create-response-parser()
Cro::HTTP2::ResponseParser.new()
],
Cro::HTTP::ResponseParser.new()
);
Expand Down Expand Up @@ -862,7 +894,7 @@ class Cro::HTTP::Client {
};
$version-decision.then: -> $version {
$version.result eq '2'
?? Pipeline2.new(:$secure, :$host, :$port, :$in, :$out, :$go-away-supply)
?? Pipeline2.new(:$secure, :$host, :$port, :$in, :$out)
!! Pipeline.new(:$secure, :$host, :$port, :$in, :$out)
}
}
Expand Down
Loading

0 comments on commit 0c3c5bc

Please sign in to comment.