# Multiprocessing on top of async #38
Hard to tell apart, but I recognise Thomas Grainger among the contributors to aiomultiprocess; he is also a contributor to httpx and active in Python IRC / the general community / stdlib contributions, I think.
I've now looked at the source code for aiostream, and I'm getting a better understanding of what the following does under the hood: `range-streams/src/range_streams/async_utils.py`, lines 160 to 167 at 4dad2c9.
The example code for aiomultiprocess is:

```python
import asyncio
from aiohttp import request
from aiomultiprocess import Pool

async def get(url):
    async with request("GET", url) as response:
        return await response.text("utf-8")

async def main(urls: list[str]):
    async with Pool() as pool:
        async for result in pool.map(get, urls):
            ...  # process result

asyncio.run(main(["https://example.com"]))  # placeholder URL list; main() requires one
```

The `map` here is:

```python
def map(
    self,
    func: Callable[[T], Awaitable[R]],
    iterable: Sequence[T],
    # chunksize: int = None,  # todo: implement chunking maybe
) -> PoolResult[R]:
    """Run a coroutine once for each item in the iterable."""
    if not self.running:
        raise RuntimeError("pool is closed")
    tids = [self.queue_work(func, (item,), {}) for item in iterable]
    return PoolResult(self, tids)
```

Note that there is nothing special about the `iterable` here: it is simply looped over (in an ordinary list comprehension) to queue the work items.
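Since `map` just loops over the `iterable` synchronously, any ordinary iterable should work, not only a list; a minimal sketch (with a trivial coroutine, not the real fetch):

```python
import asyncio
from aiomultiprocess import Pool

async def double(x: int) -> int:
    return x * 2

async def main():
    async with Pool() as pool:
        # range() is not a list, but map() only needs to iterate it
        async for result in pool.map(double, range(10)):
            print(result)

if __name__ == "__main__":
    asyncio.run(main())
```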
I think then it would work to change the assignment accordingly. However, note that the example given above is for the fetch (they make an async funcdef `get` taking a single URL), whereas for me the fetch takes multiple arguments, which is what `starmap` is for:

```python
def starmap(
    self,
    func: Callable[..., Awaitable[R]],
    iterable: Sequence[Sequence[T]],
    # chunksize: int = None,  # todo: implement chunking maybe
) -> PoolResult[R]:
    """Run a coroutine once for each sequence of items in the iterable."""
    if not self.running:
        raise RuntimeError("pool is closed")
    tids = [self.queue_work(func, args, {}) for args in iterable]
    return PoolResult(self, tids)
```
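For illustration, a minimal sketch of `starmap` with a multi-argument coroutine (trivial stand-in arguments, not the real fetch):

```python
import asyncio
from aiomultiprocess import Pool

async def add(a: int, b: int) -> int:
    return a + b

async def main():
    pairs = [(1, 2), (3, 4), (5, 6)]
    async with Pool() as pool:
        # each inner tuple is unpacked into add()'s positional arguments
        async for result in pool.starmap(add, pairs):
            print(result)  # 3, 7, 11

if __name__ == "__main__":
    asyncio.run(main())
```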
I think that this means the `fetch_and_process` method could become:

```python
async def fetch_and_process(self, urls: Iterator[str], client):
    assert isinstance(client, httpx.AsyncClient)  # Not type checked due to Sphinx
    client.timeout = self.timeout
    ws = stream.repeat(client)
    xs = stream.zip(ws, stream.iterate(urls))
    async with Pool() as pool:
        ys = pool.starmap(iterable=xs, func=self.fetch)
        zs = stream.map(ys, self.process_stream, ordered=False, task_limit=20)
        return await zs
```

Note that the timeout is configured on the client here. This fails, however.

Given how these don't seem to work well together, perhaps I should just replace the aiostream usage: it doesn't seem to work well with aiomultiprocess. An alternative approach then would be to clone the client (there is nothing preventing me from doing this other than the time it takes, but that can be done in parallel).
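A minimal sketch of the clone-per-worker idea (the `fetch` function and timeout value here are placeholders, not the real implementation): since an `httpx.AsyncClient` can't be pickled across process boundaries, each task builds its own client from shared configuration:

```python
import asyncio

import httpx
from aiomultiprocess import Pool

TIMEOUT = httpx.Timeout(5.0)  # assumed shared configuration

async def fetch(url: str) -> int:
    # a fresh client per task for simplicity; cloning once per process
    # would amortise the setup cost
    async with httpx.AsyncClient(timeout=TIMEOUT) as client:
        response = await client.get(url)
        return response.status_code

async def main(urls: list[str]):
    async with Pool() as pool:
        async for status in pool.map(fetch, urls):
            print(status)

if __name__ == "__main__":
    asyncio.run(main(["https://example.com"]))  # placeholder URL
```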
You can share memory between processes in Python 3.8+ (`multiprocessing.shared_memory`); perhaps a client could reside in shared memory for access on all cores... (it has a known bug though, so maybe not).
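For reference, the 3.8+ API shares raw byte buffers rather than live Python objects, so a client couldn't simply reside there as-is; a minimal sketch of the API:

```python
from multiprocessing import shared_memory

shm = shared_memory.SharedMemory(create=True, size=16)
shm.buf[:5] = b"hello"  # writable bytes visible to other processes

# another process would attach by name
peer = shared_memory.SharedMemory(name=shm.name)
print(bytes(peer.buf[:5]))  # b'hello'

peer.close()
shm.close()
shm.unlink()  # free the segment once, from a single process
```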
I got it to work as follows:

```python
# Multiple processes
from multiprocessing import cpu_count, freeze_support
from aiomultiprocess import Pool
from more_itertools import divide
import httpx
import time
import asyncio

CPU_COUNT = cpu_count()

l_alphabet = "abcdefghijklmnopqrstuvwxyz"
u_alphabet = l_alphabet.upper()

# Triples from Aaa, Aab, ... to ... Zzy, Zzz
all_combos = [
    f"{u}{l}{ll}" for u in u_alphabet for l in l_alphabet for ll in l_alphabet
]
assert len(all_combos) == 26 ** 3

# If you divide N items into L parts where N < L, the final (L-N) parts will be
# empty, which would mean creating AsyncFetcher with empty URL lists, causing
# errors. Avoid that problem by always taking the minimum of the CPU count and
# the iterable size:
n_parts = min(CPU_COUNT, len(all_combos))
split_combos_it = divide(n_parts, all_combos)  # split `all_combos` into `n_parts` lists
split_combos_lists = map(list, split_combos_it)

async def sleep(items):
    # async with httpx.AsyncClient() as client:
    client = httpx.AsyncClient()
    await asyncio.sleep(2)
    print(items)
    await client.aclose()

async def multi_sleep():
    async with Pool() as pool:
        t0 = time.time()
        async for result in pool.map(sleep, split_combos_lists):
            await asyncio.sleep(0)  # this executes in the process pool context
        t1 = time.time()
        print(f"{t1-t0}s")

if __name__ == "__main__":
    freeze_support()
    asyncio.run(multi_sleep())
```
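The `min()` guard is needed because `more_itertools.divide` pads with empty parts when asked for more parts than there are items:

```python
from more_itertools import divide

# 3 items into 4 parts: the final part comes back empty
parts = [list(p) for p in divide(4, [1, 2, 3])]
print(parts)  # [[1], [2], [3], []]
```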
This approach requires multiple clients where the previous one required only one. I also suspect this means they cannot be reused across files (unless provisioned above the level at which the files are looped over!), as the client cannot be passed out of the process pool context (as annotated in the code comment above). An alternative approach would make such a problem negligible.
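One possibility along those lines (a hedged sketch, not necessarily the alternative alluded to above; `get_client` and `fetch` are hypothetical names): keep one lazily created client per worker process in a module-level global, so it is reused across tasks, and across files, within that process:

```python
import httpx

_client = None  # one httpx.AsyncClient per worker process

def get_client() -> httpx.AsyncClient:
    global _client
    if _client is None:
        _client = httpx.AsyncClient()
    return _client

async def fetch(url: str) -> int:
    # runs inside a worker; the client never crosses the process boundary
    response = await get_client().get(url)
    return response.status_code
```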
Now with tqdm and logging:

```python
# Multiple processes
from multiprocessing import cpu_count, freeze_support
from aiomultiprocess import Pool
from more_itertools import divide
from tqdm.asyncio import tqdm
import httpx
import time
import asyncio
import logging

def write_log(msg):
    log = logging.getLogger()
    log.setLevel(logging.DEBUG)
    log_format = logging.Formatter("[%(asctime)s] [%(levelname)s] - %(message)s")
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    console.setFormatter(log_format)
    log.addHandler(console)
    log.debug(msg)

CPU_COUNT = cpu_count()

l_alphabet = "abcdefghijklmnopqrstuvwxyz"
u_alphabet = l_alphabet.upper()

# Triples from Aaa, Aab, ... to ... Zzy, Zzz
all_combos = [
    f"{u}{l}{ll}" for u in u_alphabet for l in l_alphabet for ll in l_alphabet
]
assert len(all_combos) == 26 ** 3

# If you divide N items into L parts where N < L, the final (L-N) parts will be
# empty, which would mean creating AsyncFetcher with empty URL lists, causing
# errors. Avoid that problem by always taking the minimum of the CPU count and
# the iterable size:
n_parts = min(CPU_COUNT, len(all_combos))
split_combos_it = divide(n_parts, all_combos)  # split `all_combos` into `n_parts` lists
split_combos_lists = map(list, split_combos_it)

async def sleep(items):
    # async with httpx.AsyncClient() as client:
    client = httpx.AsyncClient()
    await asyncio.sleep(2)
    # print(items)
    await client.aclose()

async def multi_sleep():
    pbar = tqdm(total=n_parts)
    async with Pool() as pool:
        async for result in pool.map(sleep, split_combos_lists):
            pbar.update()
            await asyncio.sleep(0)  # this executes in the process pool context
    pbar.close()

if __name__ == "__main__":
    freeze_support()
    t0 = time.time()
    asyncio.run(multi_sleep())
    t1 = time.time()
    write_log(f"{t1-t0}s")
```
The speed of the AsyncFetcher appears to be limited by the GIL (i.e. a single CPU core is pinned at 100% when the fetcher makes calls), which could be resolved by this solution (with slight updates for current asyncio usage).
I don't really understand this example, and I've just rewritten my code to be non-blocking, so I'm not sure why it's assumed I will be multiprocessing a synchronous function rather than a top-level async one... However, ignoring that, I think the principle may still apply (of passing the ProcessPoolExecutor)?
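A minimal sketch of that principle with current asyncio usage (`cpu_bound` is a hypothetical blocking function, standing in for whatever would be offloaded):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_bound(n: int) -> int:
    # a CPU-bound stand-in that would otherwise pin one core under the GIL
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # the executor is passed in; each call runs in a separate process
        results = await asyncio.gather(
            *(loop.run_in_executor(pool, cpu_bound, n) for n in (10**6, 2 * 10**6))
        )
    print(results)

if __name__ == "__main__":
    asyncio.run(main())
```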
The other matter is that of splitting the work up into separate processes in the pool (which I'll need to do regardless of how the multiprocessing is to be achieved).

I think the "data parallelism" should be at the file level, i.e. when processing one TSV at a time (not doing multiple), implemented by splitting the URL list for the one file into `n_splits` splits (corresponding to the number of processes in the pool, `mp.cpu_count` I think), and then creating an `AsyncFetcher` for each of these `(100/n_splits)%` URL lists. Resuming would still be fairly trivial: instead of the last seen PNG as in serial, you'd want to use the minimum non-completed row, i.e. the end value of the minimum range in the `RangeSet` formed from conjoining all of the `AsyncFetcher.completed` `RangeSet`s for each of the fetchers. A sketch of this follows.
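A hedged sketch of that split (the `AsyncFetcher` constructor arguments and import path are assumptions, as is the `RangeSet` API used for the resume point):

```python
import multiprocessing as mp

from more_itertools import divide
from range_streams.async_utils import AsyncFetcher  # assumed import path

def make_fetchers(urls: list[str]) -> list:
    # one AsyncFetcher per pool process, each with an equal share of the URLs
    n_splits = min(mp.cpu_count(), len(urls))
    return [AsyncFetcher(urls=list(part)) for part in divide(n_splits, urls)]

def resume_point(fetchers) -> int:
    # conjoin every fetcher's completed RangeSet and take the end of the
    # earliest range; .completed and .ranges() are assumed attribute names
    completed = [rng for f in fetchers for rng in f.completed.ranges()]
    return min(completed, key=lambda r: r.start).end
```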