-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathGetTask.py
More file actions
114 lines (106 loc) · 4.33 KB
/
GetTask.py
File metadata and controls
114 lines (106 loc) · 4.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# -*- coding: utf-8 -*-
# 引入依赖包
# pip install alibabacloud_viapi20230117
import os
import json
from urllib.request import urlopen
from alibabacloud_tea_openapi.models import Config
from alibabacloud_tea_util.models import RuntimeOptions
from alibabacloud_viapi20230117.client import Client
from alibabacloud_viapi20230117.models import QueryAsyncJobListRequest,GetAsyncJobResultRequest
# 查询异步调用列表
def GetTaskList():
config = Config(
# 创建AccessKey ID和AccessKey Secret,请参考https://help.aliyun.com/document_detail/175144.html。
# 如果您用的是RAM用户的AccessKey,还需要为RAM用户授予权限AliyunVIAPIFullAccess,请参考https://help.aliyun.com/document_detail/145025.html。
# 从环境变量读取配置的AccessKey ID和AccessKey Secret。运行代码示例前必须先配置环境变量。
access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
# 访问的域名
endpoint='viapi.cn-shanghai.aliyuncs.com',
# 访问的域名对应的region
region_id='cn-shanghai'
)
query_async_job_list_request = QueryAsyncJobListRequest(
# start_time='2023-02-23 00:00:00',
# end_time='2023-02-23 23:00:00',
# job_id='1299348D-DFF2-5FDA-8C9C-C2D14EBF63F2'
)
runtime = RuntimeOptions()
try:
# 初始化Client
client = Client(config)
response = client.query_async_job_list_with_options(query_async_job_list_request, runtime)
# 获取整体结果
# print(response.body)
except Exception as error:
# 获取整体报错信息
print(error)
# 获取单个字段
print(error.code)
result_dict = json.loads(response.body.data.result)
return result_dict
#查询异步任务状态
def GetTaskState(RequestId):
config = Config(
access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
# 访问的域名
endpoint='viapi.cn-shanghai.aliyuncs.com',
# 访问的域名对应的region
region_id='cn-shanghai'
)
query_async_job_list_request = QueryAsyncJobListRequest(
# start_time='2023-02-23 00:00:00',
# end_time='2023-02-23 23:00:00',
job_id = RequestId
)
runtime = RuntimeOptions()
try:
# 初始化Client
client = Client(config)
response = client.query_async_job_list_with_options(query_async_job_list_request, runtime)
# 获取整体结果
# print(response.body)
except Exception as error:
# 获取整体报错信息
print(error)
# 获取单个字段
print(error.code)
# print(response.body)
# print(type(response.body.data.result[0]))
# print(response.body.data.result[0].status)
# # result_dict = json.loads(response.body.data.result)
# # print(result_dict[0]["Status"])
# print(1)
return response.body.data.result[0].status
#查询异步调用结果
def GetTaskResult(RequestId):
config = Config(
# 创建AccessKey ID和AccessKey Secret,请参考https://help.aliyun.com/document_detail/175144.html。
# 如果您用的是RAM用户的AccessKey,还需要为RAM用户授予权限AliyunVIAPIFullAccess,请参考https://help.aliyun.com/document_detail/145025.html。
# 从环境变量读取配置的AccessKey ID和AccessKey Secret。运行代码示例前必须先配置环境变量。
access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
# 访问的域名
endpoint='viapi.cn-shanghai.aliyuncs.com',
# 访问的域名对应的region
region_id='cn-shanghai'
)
#获取结果
get_async_job_result_request = GetAsyncJobResultRequest(
job_id= RequestId
)
runtime = RuntimeOptions()
try:
# 初始化Client
client = Client(config)
response = client.get_async_job_result_with_options(get_async_job_result_request, runtime)
# 获取整体结果
# print(response.body)
except Exception as error:
# 获取整体报错信息
print(error)
# 获取单个字段
print(error.code)
return response.body