[Bug] maxsize is ignored when chaining multiple stages #111

Open
bzamecnik opened this issue Nov 10, 2023 · 0 comments
Labels
bug Something isn't working

Comments

@bzamecnik

Describe the bug

Although maxsize on stages such as map() should limit the queue of items in that stage, it does not work as expected when multiple map() stages are chained: the first stage is drained immediately.

Note: in addition, each stage buffers two extra items beyond maxsize.
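
For reference, the backpressure that maxsize is expected to provide can be sketched with the standard library alone. This is a minimal model, not pypeln's implementation: `run_bounded` is a hypothetical helper in which one producer thread feeds a slow consumer through a `queue.Queue(maxsize=...)`, and it reports the peak number of items produced but not yet taken by the consumer.

```python
import queue
import threading
import time

def run_bounded(n_items, maxsize, consumer_delay=0.01):
    """Hypothetical helper (not pypeln): one producer thread feeds a slow
    consumer through queue.Queue(maxsize=maxsize). Returns (items, peak),
    where peak is the most items produced but not yet taken by the consumer."""
    q = queue.Queue(maxsize=maxsize)  # maxsize <= 0 means unbounded
    lock = threading.Lock()
    produced = consumed = peak = 0
    done = object()  # end-of-stream sentinel

    def producer():
        nonlocal produced, peak
        for i in range(n_items):
            with lock:
                produced += 1
                # peak <= maxsize + 2: up to maxsize items queued, one freshly
                # produced, and one dequeued by the consumer but not yet counted
                peak = max(peak, produced - consumed)
            q.put(i)  # blocks while the queue already holds maxsize items
        q.put(done)

    thread = threading.Thread(target=producer)
    thread.start()
    items = []
    while (item := q.get()) is not done:
        with lock:
            consumed += 1
        time.sleep(consumer_delay)  # simulate a slow downstream step
        items.append(item)
    thread.join()
    return items, peak
```

In this model `run_bounded(10, maxsize=1)` returns the items in order with a peak of at most 3, whereas `maxsize=0` lets the producer run as far ahead of the consumer as it likes, which is what the first stage does in the reported two-stage run. Under this way of counting, even a plain bounded queue allows some slack beyond maxsize.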

Minimal code to reproduce

import logging
import threading
import time

import pypeln as pl

# use logging instead of print(): print() calls from multiple threads can interleave or appear out of order
logger = logging.getLogger('foo')
logging.basicConfig(level=logging.DEBUG)

def load(x):
    logger.debug(f"{threading.get_ident()} loading {x}")
    return x

def process(x):
    time.sleep(0.1)  # simulate a slow computation
    return f"processed {x}"

def show_results(stage):
    for result in stage:
        logger.debug(f"{threading.get_ident()} result '{result}'")

stage = pl.thread.map(load, range(10), workers=1, maxsize=1)
stage = pl.thread.map(process, stage, workers=1)

show_results(stage)

Output (wrong):

DEBUG:foo:140324684944960 loading 0
DEBUG:foo:140324684944960 loading 1
DEBUG:foo:140324684944960 loading 2
DEBUG:foo:140324684944960 loading 3
DEBUG:foo:140324684944960 loading 4
DEBUG:foo:140324684944960 loading 5
DEBUG:foo:140324684944960 loading 6
DEBUG:foo:140324684944960 loading 7
DEBUG:foo:140324684944960 loading 8
DEBUG:foo:140324684944960 loading 9
DEBUG:foo:140325310869696 result 'processed 0'
DEBUG:foo:140325310869696 result 'processed 1'
DEBUG:foo:140325310869696 result 'processed 2'
DEBUG:foo:140325310869696 result 'processed 3'
DEBUG:foo:140325310869696 result 'processed 4'
DEBUG:foo:140325310869696 result 'processed 5'
DEBUG:foo:140325310869696 result 'processed 6'
DEBUG:foo:140325310869696 result 'processed 7'
DEBUG:foo:140325310869696 result 'processed 8'
DEBUG:foo:140325310869696 result 'processed 9'

With only a single stage, maxsize works as expected (OK):

stage = pl.thread.map(load, range(10), workers=1, maxsize=1)
show_results(stage)

Output (OK):
DEBUG:foo:140324659766848 loading 0
DEBUG:foo:140324659766848 loading 1
DEBUG:foo:140325310869696 result '0'
DEBUG:foo:140324659766848 loading 2
DEBUG:foo:140325310869696 result '1'
DEBUG:foo:140324659766848 loading 3
DEBUG:foo:140325310869696 result '2'
DEBUG:foo:140324659766848 loading 4
DEBUG:foo:140325310869696 result '3'
DEBUG:foo:140324659766848 loading 5
DEBUG:foo:140325310869696 result '4'
DEBUG:foo:140324659766848 loading 6
DEBUG:foo:140325310869696 result '5'
DEBUG:foo:140324659766848 loading 7
DEBUG:foo:140325310869696 result '6'
DEBUG:foo:140324659766848 loading 8
DEBUG:foo:140325310869696 result '7'
DEBUG:foo:140324659766848 loading 9
DEBUG:foo:140325310869696 result '8'
DEBUG:foo:140325310869696 result '9'

If we set maxsize on the second stage instead, it limits the queue of the first stage rather than the second (wrong):

stage = pl.thread.map(load, range(10), workers=1)
stage = pl.thread.map(process, stage, workers=1, maxsize=1)

show_results(stage)

Output (wrong):
DEBUG:foo:140324684944960 loading 0
DEBUG:foo:140324684944960 loading 1
DEBUG:foo:140324684944960 loading 2
DEBUG:foo:140325310869696 result 'processed 0'
DEBUG:foo:140324684944960 loading 3
DEBUG:foo:140324684944960 loading 4
DEBUG:foo:140325310869696 result 'processed 1'
DEBUG:foo:140325310869696 result 'processed 2'
DEBUG:foo:140324684944960 loading 5
DEBUG:foo:140325310869696 result 'processed 3'
DEBUG:foo:140324684944960 loading 6
DEBUG:foo:140325310869696 result 'processed 4'
DEBUG:foo:140324684944960 loading 7
DEBUG:foo:140324684944960 loading 8
DEBUG:foo:140325310869696 result 'processed 5'
DEBUG:foo:140325310869696 result 'processed 6'
DEBUG:foo:140324684944960 loading 9
DEBUG:foo:140325310869696 result 'processed 7'
DEBUG:foo:140325310869696 result 'processed 8'
DEBUG:foo:140325310869696 result 'processed 9'
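
To compare these runs numerically, one can track how many items have been loaded but not yet reported as results at each point in a log. The helper below is a hypothetical sketch (`max_lead` is not part of pypeln) and assumes the exact log format produced by the repro script above.

```python
import re

# Patterns for the two kinds of log lines emitted by the repro script.
LOAD_RE = re.compile(r"\bloading \d+$")
RESULT_RE = re.compile(r"\bresult '.*'$")

def max_lead(log_lines):
    """Return the peak number of items loaded but not yet reported as results."""
    lead = peak = 0
    for line in log_lines:
        line = line.strip()
        if LOAD_RE.search(line):
            lead += 1  # an item was loaded by the first stage
        elif RESULT_RE.search(line):
            lead -= 1  # an item reached the final consumer
        peak = max(peak, lead)
    return peak
```

Applied to the three logs in this issue, it yields 10 for the first run (every item is loaded before the first result), 2 for the single-stage run, and 4 for the run with maxsize on the second stage.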

Expected behavior

  • when maxsize=1 is set on the first stage, the output should look like the second one above (the single-stage run): the queue of the first stage should be limited
  • when maxsize=1 is set on the second stage, the output should look like the first one above: the queue of the first stage should not be limited
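
The expected behavior amounts to each stage owning its own bounded queue, independent of the stages around it. The sketch below models that with the standard library only. `source`, `stage`, `drain` and `run` are hypothetical helpers, not pypeln's API or implementation, and the assumption that a stage's maxsize bounds its output queue is ours.

```python
import queue
import threading
import time

_DONE = object()  # end-of-stream sentinel passed between stages

def source(items):
    """Put all input items (plus the sentinel) on an unbounded queue."""
    q = queue.Queue()
    for x in items:
        q.put(x)
    q.put(_DONE)
    return q

def stage(fn, inbox, maxsize):
    """Hypothetical map stage: one worker applies fn to items from `inbox`
    and puts results on the stage's own output queue of size `maxsize`."""
    outbox = queue.Queue(maxsize=maxsize)  # maxsize <= 0 means unbounded

    def worker():
        while (item := inbox.get()) is not _DONE:
            outbox.put(fn(item))  # blocks while `maxsize` results are waiting
        outbox.put(_DONE)

    threading.Thread(target=worker, daemon=True).start()
    return outbox

def drain(q):
    """Consume a stage's output until the sentinel arrives."""
    results = []
    while (item := q.get()) is not _DONE:
        results.append(item)
    return results

def run(maxsize1, maxsize2, n=10):
    """Chain load -> process like the repro; return (results, peak), where
    peak is the most items loaded but not yet picked up by process."""
    lock = threading.Lock()
    loaded = taken = peak = 0

    def load(x):
        nonlocal loaded, peak
        with lock:
            loaded += 1
            peak = max(peak, loaded - taken)
        return x

    def process(x):
        nonlocal taken
        with lock:
            taken += 1
        time.sleep(0.01)  # slow computation, as in the repro's process()
        return f"processed {x}"

    first = stage(load, source(range(n)), maxsize1)
    second = stage(process, first, maxsize2)
    return drain(second), peak
```

In this model, `run(1, 0)` keeps the first stage at most three items ahead of `process` (one queued, one just loaded, one being handed over) regardless of `maxsize2`, while `run(0, 1)` bounds only the second stage's output and leaves the first stage free to run ahead. That independence is what the two bullets above ask for.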

Library Info

import pypeln
print(pypeln.__version__)

0.4.9

@bzamecnik bzamecnik added the bug Something isn't working label Nov 10, 2023