-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathasync_subprocess.py
64 lines (57 loc) · 2.16 KB
/
async_subprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
"""asyncio subprocess create_subprocess_exec"""
import asyncio
import subprocess
from exception import AsyncCalledProcessError
async def check_call(args, *, cwd, env=None, shell=False):
"""
Similar to subprocess.check_call but adapted for asyncio. Please add new arguments as needed.
cwd is added as an explicit argument because asyncio will not work well with os.chdir, which is not bound to the
context of the running coroutine.
"""
returncode = await call(args, cwd=cwd, env=env, shell=shell)
if returncode != 0:
raise AsyncCalledProcessError(returncode, args if shell else args[0])
async def check_output(args, *, cwd, env=None, shell=False):
"""
Similar to subprocess.check_output but adapted for asyncio. Please add new arguments as needed.
cwd is added as an explicit argument because asyncio will not work well with os.chdir, which is not bound to the
context of the running coroutine.
"""
create_func = (
asyncio.create_subprocess_shell if shell else asyncio.create_subprocess_exec
)
popenargs = [args] if shell else args
proc = await create_func(
*popenargs,
stdin=None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
cwd=cwd,
)
stdout_data, stderr_data = await proc.communicate(input=None)
returncode = await proc.wait()
if returncode != 0:
raise AsyncCalledProcessError(
returncode, popenargs[0], output=stdout_data, stderr=stderr_data
)
return stdout_data
async def call(args, *, cwd, env=None, shell=False):
"""
Similar to subprocess.call but adapted for asyncio. Please add new arguments as needed.
cwd is added as an explicit argument because asyncio will not work well with os.chdir, which is not bound to the
context of the running coroutine.
"""
create_func = (
asyncio.create_subprocess_shell if shell else asyncio.create_subprocess_exec
)
popenargs = [args] if shell else args
proc = await create_func(
*popenargs,
stdin=None,
stdout=None,
stderr=None,
cwd=cwd,
env=env,
)
return await proc.wait()