diff --git a/src/infra/drive-server/client/axios.d.ts b/src/infra/drive-server/client/axios.d.ts new file mode 100644 index 000000000..99fdc3bb1 --- /dev/null +++ b/src/infra/drive-server/client/axios.d.ts @@ -0,0 +1,7 @@ +import { RETRY_CONFIG_KEY } from './drive-server.constants'; + +declare module 'axios' { + interface InternalAxiosRequestConfig { + [RETRY_CONFIG_KEY]?: number; + } +} diff --git a/src/infra/drive-server/client/drive-server.client.instance.test.ts b/src/infra/drive-server/client/drive-server.client.instance.test.ts index 6cc77d1a1..e499bf7da 100644 --- a/src/infra/drive-server/client/drive-server.client.instance.test.ts +++ b/src/infra/drive-server/client/drive-server.client.instance.test.ts @@ -1,8 +1,7 @@ import { createClient } from '../drive-server.client'; -import Bottleneck from 'bottleneck'; import eventBus from '../../../apps/main/event-bus'; import { logout } from '../../../apps/main/auth/service'; -import { Mock } from 'vitest'; +import { call } from 'tests/vitest/utils.helper'; vi.mock('../drive-server.client', () => ({ createClient: vi.fn(() => ({})), @@ -36,22 +35,19 @@ describe('driveServerClient instance', () => { it('should call createClient with expected options', async () => { await import('./drive-server.client.instance'); - expect(createClient).toHaveBeenCalledWith( - expect.objectContaining({ - baseUrl: expect.any(String), - limiter: expect.any(Bottleneck), - onUnauthorized: expect.any(Function), - }), - ); + call(createClient).toMatchObject({ + baseUrl: expect.any(String), + onUnauthorized: expect.any(Function), + }); }); it('should call eventBus.emit and logout when onUnauthorized is triggered', async () => { await import('./drive-server.client.instance'); - const clientOptionsArg = (createClient as Mock).mock.calls[0][0]; + const clientOptions = vi.mocked(createClient).mock.calls[0]![0]!; - clientOptionsArg.onUnauthorized(); + clientOptions.onUnauthorized!(); - expect(eventBus.emit).toHaveBeenCalledWith('USER_WAS_UNAUTHORIZED'); + call(eventBus.emit).toEqual('USER_WAS_UNAUTHORIZED'); expect(logout).toHaveBeenCalled(); }); @@ -63,8 +59,6 @@ describe('driveServerClient instance', () => { await import('./drive-server.client.instance'); - const mostRecentCall = (createClient as Mock).mock.calls[(createClient as Mock).mock.calls.length - 1]; - const clientOptions = mostRecentCall[0]; - expect(clientOptions.baseUrl).toBe('https://mock.api'); + call(createClient).toMatchObject({ baseUrl: 'https://mock.api' }); }); }); diff --git a/src/infra/drive-server/client/drive-server.client.instance.ts b/src/infra/drive-server/client/drive-server.client.instance.ts index b165161e6..536117cf5 100644 --- a/src/infra/drive-server/client/drive-server.client.instance.ts +++ b/src/infra/drive-server/client/drive-server.client.instance.ts @@ -1,5 +1,4 @@ import { paths } from '../../schemas'; -import Bottleneck from 'bottleneck'; import { logout } from '../../../apps/main/auth/service'; import eventBus from '../../../apps/main/event-bus'; import { ClientOptions, createClient } from '../drive-server.client'; @@ -9,14 +8,8 @@ function handleOnUserUnauthorized(): void { logout(); } -const limiter = new Bottleneck({ - maxConcurrent: 2, - minTime: 500, -}); - const clientOptions: ClientOptions = { baseUrl: process.env.NEW_DRIVE_URL || '', - limiter, onUnauthorized: handleOnUserUnauthorized, }; diff --git a/src/infra/drive-server/client/drive-server.constants.ts b/src/infra/drive-server/client/drive-server.constants.ts new file mode 100644 index 000000000..0676554e6 --- /dev/null +++ b/src/infra/drive-server/client/drive-server.constants.ts @@ -0,0 +1,2 @@ +export const RETRY_CONFIG_KEY = '__rateLimiterRetryCount'; +export const MAX_RETRIES = 3; diff --git a/src/infra/drive-server/client/interceptors/rate-limiter/add-jitter.test.ts b/src/infra/drive-server/client/interceptors/rate-limiter/add-jitter.test.ts new file mode 100644 index 000000000..33edc26fa --- /dev/null +++ b/src/infra/drive-server/client/interceptors/rate-limiter/add-jitter.test.ts @@ -0,0 +1,40 @@ +import { addJitter } from './add-jitter'; + +describe('addJitter - spreads retry timing to avoid thundering herd', () => { + it('should never return less than the base delay', () => { + const result = addJitter(1000); + expect(result).toBeGreaterThanOrEqual(1000); + }); + + it('should add up to maxJitter ms on top of the base delay', () => { + const result = addJitter(1000, 200); + expect(result).toBeGreaterThanOrEqual(1000); + expect(result).toBeLessThan(1200); + }); + + it(' should default maxJitter to 100ms', () => { + const result = addJitter(500); + expect(result).toBeGreaterThanOrEqual(500); + expect(result).toBeLessThan(600); + }); + + it('should add no jitter when randomness is 0', () => { + vi.spyOn(Math, 'random').mockReturnValue(0); + expect(addJitter(1000, 200)).toBe(1000); + }); + + it('should add the maximum jitter when randomness is near 1', () => { + vi.spyOn(Math, 'random').mockReturnValue(0.999); + expect(addJitter(1000, 100)).toBe(1099); + }); + + it('should return the base delay exactly when maxJitter is 0', () => { + expect(addJitter(500, 0)).toBe(500); + }); + + it('should work with a base delay of 0', () => { + const result = addJitter(0, 50); + expect(result).toBeGreaterThanOrEqual(0); + expect(result).toBeLessThan(50); + }); +}); diff --git a/src/infra/drive-server/client/interceptors/rate-limiter/add-jitter.ts b/src/infra/drive-server/client/interceptors/rate-limiter/add-jitter.ts new file mode 100644 index 000000000..65177b0b2 --- /dev/null +++ b/src/infra/drive-server/client/interceptors/rate-limiter/add-jitter.ts @@ -0,0 +1,3 @@ +export function addJitter(baseMs: number, maxJitter = 100): number { + return baseMs + Math.floor(Math.random() * maxJitter); +} diff --git a/src/infra/drive-server/client/interceptors/rate-limiter/attach-rate-limiter-interceptors.test.ts b/src/infra/drive-server/client/interceptors/rate-limiter/attach-rate-limiter-interceptors.test.ts new file mode 100644 index 000000000..61532eade --- /dev/null +++ b/src/infra/drive-server/client/interceptors/rate-limiter/attach-rate-limiter-interceptors.test.ts @@ -0,0 +1,69 @@ +import { type Mock } from 'vitest'; +import { call } from 'tests/vitest/utils.helper'; +import { attachRateLimiterInterceptors } from './attach-rate-limiter-interceptors'; +import { createRequestInterceptor } from './create-request-interceptor'; +import { createResponseInterceptor } from './create-response-interceptor'; + +vi.mock('./create-request-interceptor'); +vi.mock('./create-response-interceptor'); + +describe('attachRateLimiterInterceptors', () => { + const mockRequestInterceptor = vi.fn(); + const mockOnFulfilled = vi.fn(); + const mockOnRejected = vi.fn(); + + const mockRequestUse = vi.fn(); + const mockResponseUse = vi.fn(); + + const instance = { + interceptors: { + request: { use: mockRequestUse }, + response: { use: mockResponseUse }, + }, + } as any; + + beforeEach(() => { + (createRequestInterceptor as Mock).mockReturnValue(mockRequestInterceptor); + (createResponseInterceptor as Mock).mockReturnValue({ + onFulfilled: mockOnFulfilled, + onRejected: mockOnRejected, + }); + }); + + it('should create a request interceptor with a fresh delay state', () => { + attachRateLimiterInterceptors(instance); + + call(createRequestInterceptor).toMatchObject({ pending: null }); + }); + + it('should register the request interceptor on the instance', () => { + attachRateLimiterInterceptors(instance); + + call(mockRequestUse).toMatchObject(mockRequestInterceptor); + }); + + it('should create a response interceptor with the instance, fresh rate limit state, and delay state', () => { + attachRateLimiterInterceptors(instance); + + call(createResponseInterceptor).toMatchObject([ + instance, + { limit: null, remaining: null, reset: null }, + { pending: null }, + ]); + }); + + it('should register the response interceptor on the instance', () => { + attachRateLimiterInterceptors(instance); + + call(mockResponseUse).toMatchObject([mockOnFulfilled, mockOnRejected]); + }); + + it('should share the same delay state between request and response interceptors', () => { + const delayState = { pending: null }; + + attachRateLimiterInterceptors(instance); + + call(createRequestInterceptor).toMatchObject(delayState); + call(createResponseInterceptor).toMatchObject([expect.anything(), expect.anything(), delayState]); + }); +}); diff --git a/src/infra/drive-server/client/interceptors/rate-limiter/attach-rate-limiter-interceptors.ts b/src/infra/drive-server/client/interceptors/rate-limiter/attach-rate-limiter-interceptors.ts new file mode 100644 index 000000000..bea17de34 --- /dev/null +++ b/src/infra/drive-server/client/interceptors/rate-limiter/attach-rate-limiter-interceptors.ts @@ -0,0 +1,14 @@ +import type { AxiosInstance } from 'axios'; +import { DelayState, RateLimitState } from './rate-limiter.types'; +import { createRequestInterceptor } from './create-request-interceptor'; +import { createResponseInterceptor } from './create-response-interceptor'; + +export function attachRateLimiterInterceptors(instance: AxiosInstance): void { + const state: RateLimitState = { limit: null, remaining: null, reset: null }; + const delayState: DelayState = { pending: null }; + + instance.interceptors.request.use(createRequestInterceptor(delayState)); + + const { onFulfilled, onRejected } = createResponseInterceptor(instance, state, delayState); + instance.interceptors.response.use(onFulfilled, onRejected); +} diff --git a/src/infra/drive-server/client/interceptors/rate-limiter/create-request-interceptor.test.ts b/src/infra/drive-server/client/interceptors/rate-limiter/create-request-interceptor.test.ts new file mode 100644 index 000000000..9af3b5a0c --- /dev/null +++ b/src/infra/drive-server/client/interceptors/rate-limiter/create-request-interceptor.test.ts @@ -0,0 +1,64 @@ +import type { InternalAxiosRequestConfig } from 'axios'; +import { createRequestInterceptor } from './create-request-interceptor'; +import { DelayState } from './rate-limiter.types'; + +describe('createRequestInterceptor', () => { + const mockConfig = { url: '/test' } as InternalAxiosRequestConfig; + + it('should return the config immediately when there is no pending delay', async () => { + const state: DelayState = { pending: null }; + const interceptor = createRequestInterceptor(state); + + const result = await interceptor(mockConfig); + + expect(result).toBe(mockConfig); + }); + + it('should wait for the pending delay before returning the config', async () => { + let resolveDelay!: () => void; + const state: DelayState = { + pending: new Promise((resolve) => { + resolveDelay = resolve; + }), + }; + const interceptor = createRequestInterceptor(state); + + let resolved = false; + const resultPromise = interceptor(mockConfig).then((config) => { + resolved = true; + return config; + }); + + await Promise.resolve(); + expect(resolved).toBe(false); + + resolveDelay(); + const result = await resultPromise; + + expect(resolved).toBe(true); + expect(result).toBe(mockConfig); + }); + + it('should make multiple concurrent requests wait for the same delay', async () => { + let resolveDelay!: () => void; + const state: DelayState = { + pending: new Promise((resolve) => { + resolveDelay = resolve; + }), + }; + const interceptor = createRequestInterceptor(state); + + const configA = { url: '/a' } as InternalAxiosRequestConfig; + const configB = { url: '/b' } as InternalAxiosRequestConfig; + + const promiseA = interceptor(configA); + const promiseB = interceptor(configB); + + resolveDelay(); + + const [resultA, resultB] = await Promise.all([promiseA, promiseB]); + + expect(resultA).toBe(configA); + expect(resultB).toBe(configB); + }); +}); diff --git a/src/infra/drive-server/client/interceptors/rate-limiter/create-request-interceptor.ts b/src/infra/drive-server/client/interceptors/rate-limiter/create-request-interceptor.ts new file mode 100644 index 000000000..696ff992b --- /dev/null +++ b/src/infra/drive-server/client/interceptors/rate-limiter/create-request-interceptor.ts @@ -0,0 +1,14 @@ +import type { InternalAxiosRequestConfig } from 'axios'; +import { DelayState } from './rate-limiter.types'; + +export function createRequestInterceptor( + delayState: DelayState, +): (config: InternalAxiosRequestConfig) => Promise { + return async (config: InternalAxiosRequestConfig) => { + if (delayState.pending) { + await delayState.pending; + } + + return config; + }; +} diff --git a/src/infra/drive-server/client/interceptors/rate-limiter/create-response-interceptor.test.ts b/src/infra/drive-server/client/interceptors/rate-limiter/create-response-interceptor.test.ts new file mode 100644 index 000000000..29dcd6c5c --- /dev/null +++ b/src/infra/drive-server/client/interceptors/rate-limiter/create-response-interceptor.test.ts @@ -0,0 +1,182 @@ +import type { AxiosInstance, AxiosResponse, InternalAxiosRequestConfig } from 'axios'; +import { AxiosError } from 'axios'; +import { call } from 'tests/vitest/utils.helper'; +import { createResponseInterceptor } from './create-response-interceptor'; +import { DelayState, RateLimitState } from './rate-limiter.types'; +import { RETRY_CONFIG_KEY, MAX_RETRIES } from '../../drive-server.constants'; + +vi.unmock('axios'); + +vi.mock('@internxt/drive-desktop-core/build/backend/core/logger/logger', () => ({ + logger: { warn: vi.fn() }, +})); + +vi.mock('./update-state-from-headers', () => ({ + updateStateFromHeaders: vi.fn(), +})); + +vi.mock('./wait-for-delay', () => ({ + waitForDelay: vi.fn(), +})); + +vi.mock('./add-jitter', () => ({ + addJitter: vi.fn((ms: number) => ms), +})); + +import { updateStateFromHeaders } from './update-state-from-headers'; +import { waitForDelay } from './wait-for-delay'; +import { addJitter } from './add-jitter'; +import { logger } from '@internxt/drive-desktop-core/build/backend/core/logger/logger'; + +function makeResponse(headers: Record = {}): AxiosResponse { + return { + data: {}, + status: 200, + statusText: 'OK', + headers, + config: {} as InternalAxiosRequestConfig, + }; +} + +function makeConfig(): InternalAxiosRequestConfig { + return {} as InternalAxiosRequestConfig; +} + +function make429Error(config?: InternalAxiosRequestConfig, headers: Record = {}): AxiosError { + const error = new AxiosError('Rate limited', '429', config); + error.response = { + status: 429, + statusText: 'Too Many Requests', + headers, + data: {}, + config: config ?? makeConfig(), + }; + return error; +} + +function makeNon429Error(status: number): AxiosError { + const error = new AxiosError('Server error', String(status)); + error.response = { + status, + statusText: 'Error', + headers: {}, + data: {}, + config: makeConfig(), + }; + return error; +} + +describe('createResponseInterceptor', () => { + let state: RateLimitState; + let delayState: DelayState; + let instance: AxiosInstance; + let retryResponse: AxiosResponse; + + beforeEach(() => { + state = { limit: null, remaining: null, reset: null }; + delayState = { pending: null }; + retryResponse = makeResponse(); + instance = { request: vi.fn().mockResolvedValue(retryResponse) } as unknown as AxiosInstance; + }); + + describe('onFulfilled', () => { + it('should update state from response headers and return the response', () => { + const { onFulfilled } = createResponseInterceptor(instance, state, delayState); + const response = makeResponse({ 'x-internxt-ratelimit-remaining': '10' }); + + const result = onFulfilled(response); + + call(updateStateFromHeaders).toMatchObject([state, response.headers]); + expect(result).toBe(response); + }); + }); + + describe('onRejected', () => { + it('should reject non-429 errors without retrying', async () => { + const { onRejected } = createResponseInterceptor(instance, state, delayState); + const error = makeNon429Error(500); + + await expect(onRejected(error)).rejects.toBe(error); + expect(instance.request).not.toHaveBeenCalled(); + }); + + it('should reject if error has no config', async () => { + const { onRejected } = createResponseInterceptor(instance, state, delayState); + const error = make429Error(undefined); + error.config = undefined; + + await expect(onRejected(error)).rejects.toBe(error); + expect(instance.request).not.toHaveBeenCalled(); + }); + + it('should update state from 429 response headers', async () => { + const { onRejected } = createResponseInterceptor(instance, state, delayState); + const headers = { 'x-internxt-ratelimit-reset': '5000' }; + const config = makeConfig(); + const error = make429Error(config, headers); + + await onRejected(error); + + call(updateStateFromHeaders).toMatchObject([state, headers]); + }); + + it('should wait using the reset value from state with jitter and retry', async () => { + state.reset = 3000; + vi.mocked(addJitter).mockReturnValue(3050); + + const { onRejected } = createResponseInterceptor(instance, state, delayState); + const config = makeConfig(); + const error = make429Error(config); + + const result = await onRejected(error); + + call(addJitter).toStrictEqual(3000); + call(waitForDelay).toMatchObject([delayState, 3050]); + call(instance.request).toMatchObject(config); + expect(result).toBe(retryResponse); + }); + + it('should default to 5000ms when state.reset is null', async () => { + state.reset = null; + + const { onRejected } = createResponseInterceptor(instance, state, delayState); + const config = makeConfig(); + const error = make429Error(config); + + await onRejected(error); + + call(addJitter).toStrictEqual(5000); + }); + + it('should increment the retry count on the config', async () => { + const { onRejected } = createResponseInterceptor(instance, state, delayState); + const config = makeConfig(); + const error = make429Error(config); + + await onRejected(error); + + expect(config[RETRY_CONFIG_KEY]).toBe(1); + }); + + it('should reject when retry count reaches MAX_RETRIES', async () => { + const { onRejected } = createResponseInterceptor(instance, state, delayState); + const config = { ...makeConfig(), [RETRY_CONFIG_KEY]: MAX_RETRIES } as InternalAxiosRequestConfig; + const error = make429Error(config); + + await expect(onRejected(error)).rejects.toBe(error); + call(logger.warn).toMatchObject({ msg: '[RATE LIMITER] Max retries exceeded for 429 response' }); + expect(instance.request).not.toBeCalled(); + }); + + it('should reject errors without a response status', async () => { + const { onRejected } = createResponseInterceptor(instance, state, delayState); + const error = new AxiosError('Network error'); + + await expect(onRejected(error)).rejects.toBe(error); + expect(updateStateFromHeaders).not.toBeCalled(); + expect(addJitter).not.toBeCalled(); + expect(waitForDelay).not.toBeCalled(); + expect(instance.request).not.toBeCalled(); + }); + }); +}); diff --git a/src/infra/drive-server/client/interceptors/rate-limiter/create-response-interceptor.ts b/src/infra/drive-server/client/interceptors/rate-limiter/create-response-interceptor.ts new file mode 100644 index 000000000..dd64b7598 --- /dev/null +++ b/src/infra/drive-server/client/interceptors/rate-limiter/create-response-interceptor.ts @@ -0,0 +1,59 @@ +import { logger } from '@internxt/drive-desktop-core/build/backend/core/logger/logger'; +import type { AxiosInstance, AxiosResponse, AxiosError } from 'axios'; +import { DelayState, RateLimitState, ResponseInterceptor } from './rate-limiter.types'; +import { updateStateFromHeaders } from './update-state-from-headers'; +import { waitForDelay } from './wait-for-delay'; +import { addJitter } from './add-jitter'; +import { MAX_RETRIES, RETRY_CONFIG_KEY } from '../../drive-server.constants'; + +export function createResponseInterceptor( + instance: AxiosInstance, + state: RateLimitState, + delayState: DelayState, +): ResponseInterceptor { + const onFulfilled = (response: AxiosResponse): AxiosResponse => { + updateStateFromHeaders(state, response.headers); + return response; + }; + + const onRejected = async (error: AxiosError): Promise => { + if (error.response?.status !== 429) { + return Promise.reject(error); + } + + const config = error.config; + if (!config) { + return Promise.reject(error); + } + + updateStateFromHeaders(state, error.response.headers); + + const retryCount = config[RETRY_CONFIG_KEY] ?? 0; + + if (retryCount >= MAX_RETRIES) { + logger.warn({ + msg: '[RATE LIMITER] Max retries exceeded for 429 response', + url: config.url, + retryCount, + }); + return Promise.reject(error); + } + + const waitMs = addJitter(state.reset ?? 5000); + + logger.warn({ + msg: '[RATE LIMITER] Rate limit exceeded (429), waiting and retrying', + url: config.url, + waitMs, + retryCount: retryCount + 1, + maxRetries: MAX_RETRIES, + }); + + await waitForDelay(delayState, waitMs); + + config[RETRY_CONFIG_KEY] = retryCount + 1; + return instance.request(config); + }; + + return { onFulfilled, onRejected }; +} diff --git a/src/infra/drive-server/client/interceptors/rate-limiter/delay.ts b/src/infra/drive-server/client/interceptors/rate-limiter/delay.ts new file mode 100644 index 000000000..59d801911 --- /dev/null +++ b/src/infra/drive-server/client/interceptors/rate-limiter/delay.ts @@ -0,0 +1,3 @@ +export function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/src/infra/drive-server/client/interceptors/rate-limiter/rate-limiter.types.ts b/src/infra/drive-server/client/interceptors/rate-limiter/rate-limiter.types.ts new file mode 100644 index 000000000..07d6f9890 --- /dev/null +++ b/src/infra/drive-server/client/interceptors/rate-limiter/rate-limiter.types.ts @@ -0,0 +1,13 @@ +import { AxiosError, AxiosResponse } from 'axios'; + +export type RateLimitState = { + limit: number | null; + remaining: number | null; + reset: number | null; +}; +export type DelayState = { pending: Promise | null }; + +export type ResponseInterceptor = { + onFulfilled: (response: AxiosResponse) => AxiosResponse; + onRejected: (error: AxiosError) => Promise; +}; diff --git a/src/infra/drive-server/client/interceptors/rate-limiter/update-state-from-headers.test.ts b/src/infra/drive-server/client/interceptors/rate-limiter/update-state-from-headers.test.ts new file mode 100644 index 000000000..ba0cbbc6e --- /dev/null +++ b/src/infra/drive-server/client/interceptors/rate-limiter/update-state-from-headers.test.ts @@ -0,0 +1,54 @@ +import { RateLimitState } from './rate-limiter.types'; +import { updateStateFromHeaders } from './update-state-from-headers'; + +describe('updateStateFromHeaders', () => { + it('should update all state fields when all headers are present', () => { + const state: RateLimitState = { limit: null, remaining: null, reset: null }; + + updateStateFromHeaders(state, { + 'x-internxt-ratelimit-limit': '100', + 'x-internxt-ratelimit-remaining': '50', + 'x-internxt-ratelimit-reset': '3000', + }); + + expect(state.limit).toBe(100); + expect(state.remaining).toBe(50); + expect(state.reset).toBe(3000); + }); + + it('should not change state when no rate limit headers are present', () => { + const state: RateLimitState = { limit: 100, remaining: 50, reset: 3000 }; + + updateStateFromHeaders(state, { 'content-type': 'application/json' }); + + expect(state.limit).toBe(100); + expect(state.remaining).toBe(50); + expect(state.reset).toBe(3000); + }); + + it('should only update the headers that are present', () => { + const state: RateLimitState = { limit: null, remaining: null, reset: null }; + + updateStateFromHeaders(state, { + 'x-internxt-ratelimit-remaining': '25', + }); + + expect(state.limit).toBeNull(); + expect(state.remaining).toBe(25); + expect(state.reset).toBeNull(); + }); + + it('should overwrite existing state values with new header values', () => { + const state: RateLimitState = { limit: 100, remaining: 50, reset: 3000 }; + + updateStateFromHeaders(state, { + 'x-internxt-ratelimit-limit': '200', + 'x-internxt-ratelimit-remaining': '199', + 'x-internxt-ratelimit-reset': '5000', + }); + + expect(state.limit).toBe(200); + expect(state.remaining).toBe(199); + expect(state.reset).toBe(5000); + }); +}); diff --git a/src/infra/drive-server/client/interceptors/rate-limiter/update-state-from-headers.ts b/src/infra/drive-server/client/interceptors/rate-limiter/update-state-from-headers.ts new file mode 100644 index 000000000..6efc9cebe --- /dev/null +++ b/src/infra/drive-server/client/interceptors/rate-limiter/update-state-from-headers.ts @@ -0,0 +1,18 @@ +import { AxiosResponse } from 'axios'; +import { RateLimitState } from './rate-limiter.types'; + +export function updateStateFromHeaders(state: RateLimitState, headers: AxiosResponse['headers']): void { + const limitHeader = headers['x-internxt-ratelimit-limit']; + const remainingHeader = headers['x-internxt-ratelimit-remaining']; + const resetHeader = headers['x-internxt-ratelimit-reset']; + + if (limitHeader) { + state.limit = parseInt(limitHeader, 10); + } + if (remainingHeader) { + state.remaining = parseInt(remainingHeader, 10); + } + if (resetHeader) { + state.reset = parseInt(resetHeader, 10); + } +} diff --git a/src/infra/drive-server/client/interceptors/rate-limiter/wait-for-delay.test.ts b/src/infra/drive-server/client/interceptors/rate-limiter/wait-for-delay.test.ts new file mode 100644 index 000000000..48aa234c9 --- /dev/null +++ b/src/infra/drive-server/client/interceptors/rate-limiter/wait-for-delay.test.ts @@ -0,0 +1,40 @@ +import { waitForDelay } from './wait-for-delay'; + +describe('waitForDelay', () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('should clear the pending state after the delay resolves', async () => { + const state = { pending: null }; + + const promise = waitForDelay(state, 100); + expect(state.pending).not.toBeNull(); + + await vi.advanceTimersByTimeAsync(100); + await promise; + + expect(state.pending).toBeNull(); + }); + + it('should share the same delay for concurrent callers instead of creating separate ones', async () => { + const state = { pending: null }; + + const first = waitForDelay(state, 1000); + const pendingPromise = state.pending; + + const second = waitForDelay(state, 1000); + const third = waitForDelay(state, 1000); + + expect(state.pending).toBe(pendingPromise); + + await vi.advanceTimersByTimeAsync(1000); + await Promise.all([first, second, third]); + + expect(state.pending).toBeNull(); + }); +}); diff --git a/src/infra/drive-server/client/interceptors/rate-limiter/wait-for-delay.ts b/src/infra/drive-server/client/interceptors/rate-limiter/wait-for-delay.ts new file mode 100644 index 000000000..94a26222c --- /dev/null +++ b/src/infra/drive-server/client/interceptors/rate-limiter/wait-for-delay.ts @@ -0,0 +1,13 @@ +import { delay } from './delay'; +import { DelayState } from './rate-limiter.types'; + +export async function waitForDelay(delayState: DelayState, ms: number): Promise { + if (delayState.pending) { + await delayState.pending; + return; + } + + delayState.pending = delay(ms); + await delayState.pending; + delayState.pending = null; +} diff --git a/src/infra/drive-server/drive-server.client.ts b/src/infra/drive-server/drive-server.client.ts index b9d176877..c7b988072 100644 --- a/src/infra/drive-server/drive-server.client.ts +++ b/src/infra/drive-server/drive-server.client.ts @@ -1,5 +1,5 @@ import axios from 'axios'; -import Bottleneck from 'bottleneck'; +import { attachRateLimiterInterceptors } from './client/interceptors/rate-limiter/attach-rate-limiter-interceptors'; type HTTPMethod = 'get' | 'post' | 'put' | 'delete' | 'patch'; @@ -53,7 +53,6 @@ type OperationResponse = export interface ClientOptions { baseUrl: string; - limiter?: Bottleneck; onUnauthorized?: () => void; } @@ -72,9 +71,7 @@ export function createClient(opts: ClientOptions) { headers: { 'content-type': 'application/json' }, }); - if (opts.limiter) { - http.interceptors.request.use(opts.limiter.wrap(async (config: any) => config)); - } + attachRateLimiterInterceptors(http); if (opts.onUnauthorized) { http.interceptors.response.use( diff --git a/src/infra/drive-server/services/files/services/create-file.ts b/src/infra/drive-server/services/files/services/create-file.ts index 5d9b71a67..cbcccb4d6 100644 --- a/src/infra/drive-server/services/files/services/create-file.ts +++ b/src/infra/drive-server/services/files/services/create-file.ts @@ -1,27 +1,27 @@ import { logger } from '@internxt/drive-desktop-core/build/backend/core/logger/logger'; import { Result } from './../../../../../context/shared/domain/Result'; -import fetch from 'electron-fetch'; import { FileError } from '../file.error'; -import { errorHandler } from './file-error-handler'; -import { getNewApiHeadersIPC } from '../../../../ipc/get-new-api-headers-ipc'; import { mapError } from '../../utils/mapError'; import { FileDto, CreateFileDto } from '../../../out/dto'; +import { driveServerClient } from '../../../client/drive-server.client.instance'; +import { getNewApiHeaders } from '../../../../../apps/main/auth/service'; export async function createFile(body: CreateFileDto): Promise> { try { - const headers = await getNewApiHeadersIPC(); - const response = await fetch(`${process.env.NEW_DRIVE_URL}/files`, { - method: 'POST', - headers, - body: JSON.stringify(body), + const { data } = await driveServerClient.POST('/files', { + body, + headers: getNewApiHeaders(), }); - if (response.ok) { - const data: FileDto = await response.json(); + if (data) { return { data }; } - - return errorHandler(response); + logger.error({ + msg: 'unknown error creating a file', + path: '/files', + body, + }); + return { error: new FileError('UNKNOWN') }; } catch (error) { const mappedError = mapError(error); logger.error({ diff --git a/src/infra/ipc/get-new-api-headers-ipc.ts b/src/infra/ipc/get-new-api-headers-ipc.ts index a13e0d2c7..39b4d7625 100644 --- a/src/infra/ipc/get-new-api-headers-ipc.ts +++ b/src/infra/ipc/get-new-api-headers-ipc.ts @@ -1,8 +1,9 @@ const isMainProcess = process.type === 'browser'; /** - * Gets new API headers via IPC or direct call based on process type - * @returns Promise resolving to the API headers object + * @deprecated we dont need to do the disction between main and renderer process anymore, just use getNewApiHeaders + * ~~Gets new API headers via IPC or direct call based on process type~~ + * ~~@returns Promise resolving to the API headers object~~ */ export async function getNewApiHeadersIPC(): Promise> { if (isMainProcess) {