
HTTP Streams, not possible to implement backpressure or to detect when a client disconnects #294

Open
ftokarev opened this issue Aug 27, 2024 · 4 comments

Comments

@ftokarev

I am writing a function that retrieves the files from Azure Blob Storage and returns a zip file containing them. HTTP Streams seem like a perfect match for this use case: I can just start loading files one by one from storage and stream the archive to the client on the fly.

The proof of concept works, but I'm facing the following issues:

  1. There seems to be no way to detect that a client has closed the connection. The function keeps writing data somewhere, and only after everything has been written does an error appear in the log ([Error] Executed '<function name>' (Failed, Id=<id>, Duration=54018ms)).
  2. There is no way to implement backpressure. The "drain" event is never emitted on the stream I return in the response body; every chunk I pass into the stream is consumed immediately and in full. I tested with slow clients: the function keeps sending data while the function app's memory usage climbs rapidly. Out of curiosity I looked around the codebase, and the reason seems to be this code, where the response is written unconditionally, without checking the value returned by ServerResponse.write.

For completeness, this is the PoC code I have:

import { InvocationContext, HttpHandler, HttpRequest } from '@azure/functions';
import * as archiver from "archiver";
import * as stream from "stream";

// `input` ({ filenames, archiveName }) and `containerClient` are defined elsewhere.

export const Download: HttpHandler = async (request: HttpRequest, context: InvocationContext) => {
    const ptStream = new stream.PassThrough({ highWaterMark: 64 * 1024 });

    ptStream.on("drain", () => {
        context.log("This is never called");
    });

    const archive = archiver("zip", { zlib: { level: 1 } });
    archive.pipe(ptStream);

    const queue = [...input.filenames];

    const processNext = async () => {
        const filename = queue.pop()!;
        const blockBlobClient = containerClient.getBlockBlobClient(filename);
        const downloadResponse = await blockBlobClient.download();
        const blobStream = downloadResponse.readableStreamBody as NodeJS.ReadableStream;

        archive.append(blobStream, { name: filename });
    };

    // Each finished entry triggers the next download until the queue is empty.
    archive.on('entry', () => {
        if (queue.length === 0) {
            return archive.finalize();
        }

        processNext();
    });

    processNext();

    return {
        body: ptStream,
        status: 200,
        headers: {
            "Content-Type": "application/zip",
            "Content-Disposition": `attachment; filename=${input.archiveName}`
        }
    };
};
@ftokarev
Author

For the second issue, perhaps the fix could be as simple as replacing this code with the following?

for await (const chunk of userRes.body.values()) {
    const canWrite = proxyRes.write(chunk);

    // Pause until the client catches up.
    if (!canWrite) {
        await new Promise((resolve) => proxyRes.once("drain", resolve));
    }
}

@ejizba ejizba added this to the September 2024 milestone Aug 30, 2024
@ejizba
Contributor

ejizba commented Aug 30, 2024

Hi @ftokarev thanks for the detailed report and suggested fix - we'll look into it

@DawidRubch

Hey, any updates on this? I'm struggling with the stream-cancellation part. I might have to move my entire API somewhere else if that's not possible.

@DawidRubch

Or maybe (if possible) is there a way to cancel a running request given its invocationId?

That would do it, though it's not an ideal approach.

@ejizba ejizba self-assigned this Sep 11, 2024
@ejizba ejizba modified the milestones: September 2024, October 2024 Sep 30, 2024
@ejizba ejizba removed their assignment Oct 15, 2024