Skip to content

Commit

Permalink
DocumentInfo S3 downloads: Support parallel downloads.
Browse files Browse the repository at this point in the history
Otherwise, downloading a very large file could fail with
a memory error.
  • Loading branch information
kohler committed Aug 1, 2024
1 parent f23eb6c commit 042ed11
Showing 1 changed file with 50 additions and 34 deletions.
84 changes: 50 additions & 34 deletions src/documentinfo.php
Original file line number Diff line number Diff line change
Expand Up @@ -910,19 +910,26 @@ function s3_accel_redirect() {
}
}

/** @param string $dspath
* @return ?resource */
/** @param ?string $dspath
* @return ?array{resource,string} */
static private function fopen_docstore($dspath) {
if (!$dspath) {
return null;
}
$stream = @fopen("{$dspath}~", "x+b");
if ($stream === false
&& @filemtime("{$dspath}~") < Conf::$now - 3600
&& @unlink("{$dspath}~")) {
$stream = @fopen("{$dspath}~", "x+b");
for ($i = 0; $i < 50; ++$i) {
$dstmp = $i ? "{$dspath}~{$i}~" : "{$dspath}~";
$stream = @fopen($dstmp, "x+b");
if ($stream === false
&& ($t = @filemtime($dstmp)) !== false
&& $t < Conf::$now - 3600
&& @unlink($dstmp)) {
$stream = @fopen($dstmp, "x+b");
}
if ($stream) {
return [$stream, $dstmp];
}
}
return $stream ? : null;
return null;
}

/** @return bool */
Expand All @@ -933,51 +940,53 @@ private function load_s3() {
return false;
}
$dspath = Filer::docstore_path($this, Filer::FPATH_MKDIR);
if (function_exists("curl_init")) {
$stream = $dspath ? self::fopen_docstore($dspath) : null;
$s3l = $s3->start_curl_get($s3k)
->set_response_body_stream($stream)
->set_timeout_size($this->size(self::SIZE_NO_CONTENT));
$s3l->run();
return $this->handle_load_s3_curl($s3l, $stream ? $dspath : null);
} else {
if (!function_exists("curl_init")) {
return $this->load_s3_direct($s3, $s3k, $dspath);
}
$s3l = $s3->start_curl_get($s3k)->set_timeout_size($this->size(self::SIZE_NO_CONTENT));
if (($dsstmp = self::fopen_docstore($dspath))) {
$s3l->set_response_body_stream($dsstmp[0])->run();
return $this->handle_load_s3_curl($s3l, $dspath, $dsstmp[1]);
} else {
$s3l->run();
return $this->handle_load_s3_curl($s3l, null, null);
}
}

/** @param CurlS3Result $s3l
* @param ?string $dspath
* @param ?string $dstmp
* @return bool */
private function handle_load_s3_curl($s3l, $dspath) {
private function handle_load_s3_curl($s3l, $dspath, $dstmp) {
if ($s3l->status === 404
&& $this->s3_upgrade_extension($s3l->s3, $s3l->skey)) {
$s3l->reset()->run();
}
if ($s3l->status !== 200) {
error_log("S3 error: GET {$s3l->skey}: {$s3l->status} {$s3l->status_text} " . json_encode_db($s3l->response_headers));
$s3l->close_response_body_stream();
if ($dspath) {
@unlink("{$dspath}~");
if ($dstmp) {
@unlink($dstmp);
}
return false;
}
if (!$dspath) {
if (!$dstmp) {
$this->content = $s3l->response_body();
$s3l->close_response_body_stream();
return true;
}
$s3l->close_response_body_stream();
$sz = self::filesize_expected("{$dspath}~", $this->size);
$sz = self::filesize_expected($dstmp, $this->size);
if ($sz !== $this->size) {
error_log("Disk error: GET {$s3l->skey}: expected size {$this->size}, got " . json_encode($sz));
$s3l->status = 500;
} else if (rename("{$dspath}~", $dspath)) {
} else if (rename($dstmp, $dspath)) {
$this->filestore = $dspath;
return true;
} else {
$this->content = file_get_contents("{$dspath}~");
$this->content = file_get_contents($dstmp);
}
@unlink("{$dspath}~");
@unlink($dstmp);
return $s3l->status === 200;
}

Expand Down Expand Up @@ -1257,6 +1266,7 @@ static function prefetch_content($docs, $flags = 0) {
}

$adocs = [];
'@phan-var-force list<array{DocumentInfo,CurlS3Result,int|float,?string,?string}> $adocs';
$curlm = curl_multi_init();
$starttime = $stoptime = null;
$docstore = ($flags & self::FLAG_NO_DOCSTORE) === 0;
Expand All @@ -1279,14 +1289,19 @@ static function prefetch_content($docs, $flags = 0) {
// add documents to sliding window
while (count($adocs) < 8 && !empty($pfdocs)) {
$doc = array_pop($pfdocs);
$s3k = $doc->s3_key();
if (!$s3k) {
continue;
}
$s3 = $doc->conf->s3_client();
if (($s3k = $doc->s3_key())) {
$dspath = $docstore ? Filer::docstore_path($doc, Filer::FPATH_MKDIR) : null;
$stream = $dspath ? self::fopen_docstore($dspath) : null;
$s3l = $s3->start_curl_get($s3k)
->set_response_body_stream($stream)
->set_timeout_size($doc->size());
$adocs[] = [$doc, $s3l, 0, $stream ? $dspath : null];
$s3l = $s3->start_curl_get($s3k)->set_timeout_size($doc->size(self::SIZE_NO_CONTENT));
if ($docstore
&& ($dspath = Filer::docstore_path($doc, Filer::FPATH_MKDIR))
&& ($dsstmp = self::fopen_docstore($dspath))) {
$s3l->set_response_body_stream($dsstmp[0]);
$adocs[] = [$doc, $s3l, 0, $dspath, $dsstmp[1]];
} else {
$adocs[] = [$doc, $s3l, 0, null, null];
}
}
if (empty($adocs)) {
Expand Down Expand Up @@ -1322,10 +1337,11 @@ static function prefetch_content($docs, $flags = 0) {
$curlh = $minfo["handle"];
for ($i = 0; $i < count($adocs) && $adocs[$i][1]->curlh !== $curlh; ++$i) {
}
$s3l = $adocs[$i][1];
$adoc = $adocs[$i];
$s3l = $adoc[1];
curl_multi_remove_handle($curlm, $s3l->curlh);
if ($s3l->parse_result()) {
$adocs[$i][0]->handle_load_s3_curl($s3l, $adocs[$i][3]);
$adoc[0]->handle_load_s3_curl($s3l, $adoc[3], $adoc[4]);
array_splice($adocs, $i, 1);
} else {
$adocs[$i][2] = microtime(true) + 0.005 * (1 << $s3l->runindex);
Expand All @@ -1341,7 +1357,7 @@ static function prefetch_content($docs, $flags = 0) {
// clean up leftovers
foreach ($adocs as $adoc) {
$adoc[1]->status = null;
$adoc[0]->handle_load_s3_curl($adoc[1], $adoc[3]);
$adoc[0]->handle_load_s3_curl($adoc[1], $adoc[3], $adoc[4]);
}
curl_multi_close($curlm);
}
Expand Down

0 comments on commit 042ed11

Please sign in to comment.