File descriptor leak in mixed pipeline & help offer #114

Open
bandoos opened this issue Mar 2, 2024 · 0 comments
Labels
bug Something isn't working

Comments


bandoos commented Mar 2, 2024

Describe the bug

NOTE: This is also a response to #112

Hey,
I too am about to start using your great lib in production, so I'd like to eventually help out.
I do not currently satisfy the clauses in #112 to become a maintainer, but:

I think I have found a bug, and maybe I can write a PR to fix it.
Before I go dig, though, I'd like some feedback
on whether I am doing something wrong or whether I am onto something.

So I might satisfy those clauses after this potential bug-hunt.

tl;dr

In a mixed async-task/multiprocess pipeline I observe a file descriptor leak.
Looking at (h)top you can see the leaking subprocesses, but the included script
measures the leak using psutil.

The leak only occurs in the mixed setting, and only if the async stage precedes the multiprocess one.
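As a side note, on Linux the same descriptor count that psutil reports can also be read directly from /proc/self/fd with the standard library alone. A minimal sketch (the helper name `count_open_fds` is mine, not part of the reproducer):

```python
import os


def count_open_fds():
    # Each entry in /proc/self/fd is one open file descriptor of the
    # current process (Linux-specific; psutil.Process().num_fds() reads
    # the same directory under the hood on Linux).
    return len(os.listdir("/proc/self/fd"))


if __name__ == "__main__":
    before = count_open_fds()
    f = open("/dev/null")  # opening a file adds exactly one descriptor
    print(count_open_fds() - before)  # → 1
    f.close()
```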

Minimal code to reproduce

import pypeln as pl
import time
import psutil
import asyncio
import gc


# ---- unit functions -----


def filter_int(x):
    return isinstance(x, int)


# --- sync implems ---
def op_1(x, cost_sec=0.01):
    time.sleep(cost_sec)
    return x + 1


def op_2(x, cost_sec=0.01):
    time.sleep(cost_sec)
    return x**2


# --- async implems ----
async def aop_1(x, cost_sec=0.01):
    await asyncio.sleep(cost_sec)
    return x + 1


async def aop_2(x, cost_sec=0.01):
    await asyncio.sleep(cost_sec)
    return x**2


def sink_print(x):
    print("===:", x, end="\r")


if __name__ == "__main__":
    REPEATS = 10

    # grab master process info
    self_proc = psutil.Process()

    fds = []
    for rep in range(REPEATS):
        xs = range(100)
        stage = (
            xs
            | pl.sync.filter(filter_int)
            # run the first stage as async task
            | pl.task.map(aop_1, workers=4)
            # the leak does not arise if both stages are process based
            # | pl.process.map(op_1, workers=4)
            | pl.process.map(op_2, workers=4)
            | pl.sync.each(sink_print)
        )

        # equivalent explicit form, without the pipe/partial syntax
        #
        # stage = pl.sync.filter(filter_int, xs)
        # stage = pl.task.map(aop_1, stage, workers=4)
        # stage = pl.process.map(op_2, stage, workers=4)
        # stage = pl.sync.each(sink_print, stage)

        pl.sync.run(stage)

        fds.append(self_proc.num_fds())
        print(f"\n[{rep}] FDS:", self_proc.num_fds())

        # forcing garbage collection does not fix the leak
        # gc.collect()

    print(fds)

Observed

Output when using the sync -> task -> process combination: a growing number of open file descriptors.

===: 10000
[0] FDS: 9
===: 10000
[1] FDS: 12
===: 10000
[2] FDS: 15
===: 10000
[3] FDS: 18
===: 10000
[4] FDS: 21
===: 10000
[5] FDS: 24
===: 10000
[6] FDS: 27
===: 98010
[7] FDS: 30
===: 10000
[8] FDS: 33
===: 10000
[9] FDS: 36
[9, 12, 15, 18, 21, 24, 27, 30, 33, 36]

Expected behavior

This is what happens if we do not mix process and async stages.

Note that the actual number of FDs (6) is not important; what matters is that it stays constant.

===: 10000
[0] FDS: 6
===: 10000
[1] FDS: 6
===: 10000
[2] FDS: 6
===: 10000
[3] FDS: 6
===: 10000
[4] FDS: 6
===: 10000
[5] FDS: 6
===: 10000
[6] FDS: 6
===: 10000
[7] FDS: 6
===: 10000
[8] FDS: 6
===: 10000
[9] FDS: 6
[6, 6, 6, 6, 6, 6, 6, 6, 6, 6]


Library Info

pypeln == 0.4.9
psutil == 5.9.8

Additional context

Say we have a 4-stage pipeline where:

  • stage 1 is a fixed filter;
  • stages 2-3 are map operations with a 10-millisecond time cost standing in for some CPU-bound load;
  • stage 4 is a sequential write operation.

Say we have, for stages 2 and 3, the option to execute
the code locally (as a regular def)
or to simulate delegating to a service RPC-style
(here represented by the async def version of the operation unit-function).

In the small example this means changing the time.sleep faking the CPU-bound
load into an asyncio.sleep simulating the same load off-loaded over the network.

So we could have mixed variations such as:

  • filter
  • op_1 (proc)
  • op_2 (task)
  • write

or

  • filter
  • op_1 (task)
  • op_2 (proc)
  • write