Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add distribution strategy and the ByRemainingRequests implementation #11

Merged
merged 6 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,31 @@ $payload= [
Console::writeLine($ai->api('/chat/completions')->invoke($payload));
```

Distributing requests
---------------------
The *Distributed* endpoint allows to distribute requests over multiple endpoints. The *ByRemainingRequests* class uses the `x-ratelimit-remaining-requests` header to determine the target. See https://platform.openai.com/docs/guides/rate-limits

```php
use com\openai\rest\{AzureAIEndpoint, Distributed, ByRemainingRequests};
use util\cmd\Console;

$endpoints= [
new AzureAIEndpoint('https://...@r1.openai.azure.com/openai/deployments/mini', '2024-02-01'),
new AzureAIEndpoint('https://...@r2.openai.azure.com/openai/deployments/mini', '2024-02-01'),
];

$ai= new Distributed($endpoints, new ByRemainingRequests());
$payload= [
'model' => 'gpt-4o-mini',
'messages' => [['role' => 'user', 'content' => $prompt]],
];

Console::writeLine($ai->api('/chat/completions')->invoke($payload));
foreach ($endpoints as $i => $endpoint) {
Console::writeLine('Endpoint #', $i, ': ', $endpoint->rateLimit());
}
```

Realtime API
------------
*Coming soon*
Expand Down
3 changes: 3 additions & 0 deletions src/main/php/com/openai/rest/ApiEndpoint.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
abstract class ApiEndpoint implements Traceable, Value {
use Comparison;

/** Returns rate limit */
public abstract function rateLimit(): RateLimit;

/** Returns an API */
public abstract function api(string $path, array $segments= []): Api;
}
7 changes: 5 additions & 2 deletions src/main/php/com/openai/rest/AzureAIEndpoint.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
* @test com.openai.unittest.AzureAIEndpointTest
*/
class AzureAIEndpoint extends ApiEndpoint {
private $endpoint;
public $version, $rateLimit;
private $endpoint, $rateLimit;
public $version;

/**
* Creates a new OpenAI endpoint
Expand All @@ -32,6 +32,9 @@ public function __construct($arg, $version= null) {
$this->rateLimit= new RateLimit();
}

/** Returns rate limit */
public function rateLimit(): RateLimit { return $this->rateLimit; }

/** @return [:var] */
public function headers() { return $this->endpoint->headers(); }

Expand Down
38 changes: 38 additions & 0 deletions src/main/php/com/openai/rest/ByRemainingRequests.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php namespace com\openai\rest;

/**
* Distributes requests using the rate limits returned in the response headers
* as weights for selecting the target.
*
* @see https://platform.openai.com/docs/guides/rate-limits
* @see https://learn.microsoft.com/en-us/azure/ai-services/openai/quotas-limits
* @test com.openai.unittest.ByRemainingRequestsTest
*/
class ByRemainingRequests implements Distribution {

/** Distributes API calls */
public function distribute(array $endpoints): ApiEndpoint {
$max= 0;
$most= null;
$candidates= [];
foreach ($endpoints as $i => $endpoint) {
$rateLimit= $endpoint->rateLimit();
if (null === $rateLimit->remaining) {
$candidates[]= $endpoint;
} else if ($rateLimit->remaining > $max) {
$most= $endpoint;
$max= $rateLimit->remaining;
}
}

// Select between the one with the most remaining requests, including any
// unlimited ones, and fall back to a random endpoint.
if ($most) {
$candidates[]= $most;
} else if (empty($candidates)) {
$candidates= $endpoints;
}

return $candidates[rand(0, sizeof($candidates) - 1)];
}
}
46 changes: 16 additions & 30 deletions src/main/php/com/openai/rest/Distributed.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,36 @@

/**
* Supports distributing requests over multiple endpoints to increase
* performance, using the rate limits returned in the response headers
* as weights for selecting the target.
* performance, using a given distribution strategy to select target
* endpoints.
*
* @test com.openai.unittest.DistributedTest
*/
class Distributed extends ApiEndpoint {
private $endpoints;
private $endpoints, $distribution;

/**
* Creates a new distributed endpoint from a list of endpoints
*
* @param com.openai.rest.ApiEndpoint[] $endpoints
* @param com.openai.rest.Distribution $distribution
* @throws lang.IllegalArgumentException;
*/
public function __construct(array $endpoints) {
public function __construct(array $endpoints, Distribution $distribution) {
if (empty($endpoints)) {
throw new IllegalArgumentException('Endpoints cannot be empty');
}
$this->endpoints= $endpoints;
$this->distribution= $distribution;
}

/** Returns rate limit */
public function rateLimit(): RateLimit {
$r= new RateLimit();
foreach ($this->endpoints as $endpoint) {
$r->remaining+= $endpoint->rateLimit()->remaining;
}
return $r;
}

/**
Expand All @@ -37,34 +48,9 @@ public function setTrace($cat) {
}
}

/** Distributes API calls */
public function distribute(): ApiEndpoint {
$max= 0;
$most= null;
$candidates= [];
foreach ($this->endpoints as $i => $endpoint) {
if (null === $endpoint->rateLimit->remaining) {
$candidates[]= $endpoint;
} else if ($endpoint->rateLimit->remaining > $max) {
$most= $endpoint;
$max= $endpoint->rateLimit->remaining;
}
}

// Select between the one with the most remaining requests, including any
// unlimited ones, and fall back to a random endpoint.
if ($most) {
$candidates[]= $most;
} else if (empty($candidates)) {
$candidates= $this->endpoints;
}

return $candidates[rand(0, sizeof($candidates) - 1)];
}

/** Distributes request and returns an API */
public function api(string $path, array $segments= []): Api {
return $this->distribute()->api($path, $segments);
return $this->distribution->distribute($this->endpoints)->api($path, $segments);
}

/** @return string */
Expand Down
7 changes: 7 additions & 0 deletions src/main/php/com/openai/rest/Distribution.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php namespace com\openai\rest;

interface Distribution {

/** Distributes API calls */
public function distribute(array $endpoints): ApiEndpoint;
}
6 changes: 4 additions & 2 deletions src/main/php/com/openai/rest/OpenAIEndpoint.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
* @test com.openai.unittest.OpenAIEndpointTest
*/
class OpenAIEndpoint extends ApiEndpoint {
private $endpoint;
public $rateLimit;
private $endpoint, $rateLimit;

/**
* Creates a new OpenAI endpoint
Expand All @@ -30,6 +29,9 @@ public function __construct($arg, $organization= null, $project= null) {
$headers && $this->endpoint->with($headers);
}

/** Returns rate limit */
public function rateLimit(): RateLimit { return $this->rateLimit; }

/** @return [:var] */
public function headers() { return $this->endpoint->headers(); }

Expand Down
62 changes: 62 additions & 0 deletions src/test/php/com/openai/unittest/ByRemainingRequestsTest.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php namespace com\openai\unittest;

use com\openai\rest\{Distributed, ByRemainingRequests};
use test\{Assert, Before, Expect, Test};

class ByRemainingRequestsTest {
use TestingEndpoint;

private $fixture;

#[Before]
public function fixture() {
$this->fixture= new ByRemainingRequests();
}

#[Test]
public function using_single_endpoint() {
$a= $this->testingEndpoint(1000);

Assert::equals($a, $this->fixture->distribute([$a]));
}

#[Test]
public function distributes_to_endpoint_with_most_remaining_requests() {
$a= $this->testingEndpoint(1000);
$b= $this->testingEndpoint(100);

Assert::equals($a, $this->fixture->distribute([$a, $b]));
}

#[Test]
public function chooses_randomly_if_ratelimit_unknown() {
$a= $this->testingEndpoint(null);
$b= $this->testingEndpoint(null);

Assert::true(in_array($this->fixture->distribute([$a, $b]), [$a, $b], true));
}

#[Test]
public function chooses_randomly_if_ratelimit_zero() {
$a= $this->testingEndpoint(0);
$b= $this->testingEndpoint(0);

Assert::true(in_array($this->fixture->distribute([$a, $b]), [$a, $b], true));
}

#[Test]
public function invokes_endpoint_with_most_remaining_requests() {
$a= $this->testingEndpoint(1000);
$b= $this->testingEndpoint(997);

// Invoke in a distributed manner. All requests will go to $a, since it
// has more remaining requests than $b
$distributed= new Distributed([$a, $b], $this->fixture);
for ($i= 0; $i < 3; $i++) {
$distributed->api('/chat/completions')->invoke(['prompt' => 'Test']);
}

Assert::equals(997, $a->rateLimit()->remaining);
Assert::equals(997, $b->rateLimit()->remaining);
}
}
94 changes: 28 additions & 66 deletions src/test/php/com/openai/unittest/DistributedTest.class.php
Original file line number Diff line number Diff line change
@@ -1,94 +1,56 @@
<?php namespace com\openai\unittest;

use com\openai\rest\{Api, Distributed, OpenAIEndpoint};
use com\openai\rest\{Api, ApiEndpoint, Distributed, Distribution};
use lang\IllegalArgumentException;
use test\{Assert, Before, Expect, Test};
use webservices\rest\TestEndpoint;
use test\{Assert, Before, Expect, Test, Values};

class DistributedTest {
private $endpoints;
use TestingEndpoint;

/** Returns a testing API endpoint */
private function testingEndpoint(int $remaining= 0): OpenAIEndpoint {
return new OpenAIEndpoint(new TestEndpoint([
'POST /chat/completions' => function($call) use(&$remaining) {
$remaining--;
return $call->respond(
200, 'OK',
['x-ratelimit-remaining-requests' => max(0, $remaining), 'Content-Type' => 'application/json'],
'{"choices":[{"message":{"role":"assistant","content":"Test"}}]}'
);
}
]));
}
private $strategy;

#[Before]
public function endpoints() {
$this->endpoints= [
new OpenAIEndpoint('https://sk-123@api.openai.example.com/v1'),
new OpenAIEndpoint('https://sk-234@api.openai.example.com/v1'),
];
/** Returns single and multiple endpoints */
private function endpoints(): iterable {
yield [[$this->testingEndpoint(1000)]];
yield [[$this->testingEndpoint(1000), $this->testingEndpoint(null)]];
}

#[Test]
public function can_create() {
new Distributed($this->endpoints);
#[Before]
public function strategy() {
$this->strategy= new class() implements Distribution {
public function distribute(array $endpoints): ApiEndpoint {
return $endpoints[rand(0, sizeof($endpoints) - 1)];
}
};
}

#[Test, Expect(IllegalArgumentException::class)]
public function cannot_be_empty() {
new Distributed([]);
new Distributed([], $this->strategy);
}

#[Test]
public function distribute_to_one_of_the_given_endpoints() {
$target= (new Distributed($this->endpoints))->distribute();
Assert::true(in_array($target, $this->endpoints, true));
#[Test, Values(from: 'endpoints')]
public function can_create($endpoints) {
new Distributed($endpoints, $this->strategy);
}

#[Test]
public function api_endpoint_returned() {
Assert::instance(Api::class, (new Distributed($this->endpoints))->api('/embeddings'));
#[Test, Values(from: 'endpoints')]
public function api_endpoint_returned($endpoints) {
Assert::instance(Api::class, (new Distributed($endpoints, $this->strategy))->api('/embeddings'));
}

#[Test]
public function rate_limit_updated() {
$target= (new Distributed([$this->testingEndpoint(1000)]))->distribute();
$target->api('/chat/completions')->invoke(['prompt' => 'Test']);
$target= $this->testingEndpoint(1000);
(new Distributed([$target], $this->strategy))->api('/chat/completions')->invoke(['prompt' => 'Test']);

Assert::equals(999, $target->rateLimit->remaining);
Assert::equals(999, $target->rateLimit()->remaining);
}

#[Test]
public function distributes_to_endpoint_with_most_remaining_requests() {
$a= $this->testingEndpoint(1000);
$b= $this->testingEndpoint(100);

// Invoke both as the limits are not updated until after a request
$a->api('/chat/completions')->invoke(['prompt' => 'Test a']);
$b->api('/chat/completions')->invoke(['prompt' => 'Test b']);

Assert::equals($a, (new Distributed([$a, $b]))->distribute());
}

#[Test]
public function invokes_endpoint_with_most_remaining_requests() {
$a= $this->testingEndpoint(1000);
$b= $this->testingEndpoint(997);

// Invoke both as the limits are not updated until after a request
// The rate limits will be $a= 999, $b= 996 after these.
$a->api('/chat/completions')->invoke(['prompt' => 'Test a']);
$b->api('/chat/completions')->invoke(['prompt' => 'Test b']);

// Now invoke in a distributed manner. All requests will go to $a,
// since it has more remaining requests than $b
$distributed= new Distributed([$a, $b]);
for ($i= 0; $i < 3; $i++) {
$distributed->api('/chat/completions')->invoke(['prompt' => 'Test']);
}
public function rate_limit_reflects_sum_of_limits() {
$endpoints= [$this->testingEndpoint(1000), $this->testingEndpoint(100)];

Assert::equals(996, $a->rateLimit->remaining);
Assert::equals(996, $b->rateLimit->remaining);
Assert::equals(1100, (new Distributed($endpoints, $this->strategy))->rateLimit()->remaining);
}
}
Loading
Loading