1
- import time
2
1
from datetime import datetime , timedelta
3
2
from typing import Any , Dict , List , Tuple , cast
4
3
8
7
from shapely .geometry import Point
9
8
10
9
from vibe_core .client import FarmvibesAiClient , get_default_vibe_client
11
- from vibe_core .datamodel import RunConfig , RunConfigUser , RunDetails , SpatioTemporalJson
10
+ from vibe_core .datamodel import RunConfig , RunConfigUser , SpatioTemporalJson
12
11
13
12
14
13
class Forecast :
@@ -31,7 +30,8 @@ def submit_download_request(self):
31
30
"""
32
31
Submit request to worker to download forecast data
33
32
"""
34
- run_list = []
33
+ run_metadata_list = []
34
+ runs = []
35
35
for parameter in self .parameters :
36
36
run_name = f"forecast_{ parameter ['weather_type' ]} "
37
37
run = self .client .run (
@@ -42,57 +42,40 @@ def submit_download_request(self):
42
42
parameters = parameter ,
43
43
)
44
44
45
- try :
46
- run .block_until_complete (5 )
47
- except RuntimeError :
48
- print (run )
49
-
50
- run_list .append (
45
+ run_metadata_list .append (
51
46
{
52
47
"id" : run .id ,
53
48
"weather_type" : parameter ["weather_type" ],
54
49
}
55
50
)
51
+ runs .append (run )
52
+
53
+ self .client .monitor (runs , 5 )
56
54
57
- return run_list
55
+ return run_metadata_list
58
56
59
57
def get_run_status (self , run_list : List [Dict [str , str ]]):
60
58
clear_output (wait = True )
61
- all_done = True
62
- out_ = []
59
+ out = []
63
60
for run_item in run_list :
64
61
o = self .client .describe_run (run_item ["id" ])
65
62
print (f"Execution status for { run_item ['weather_type' ]} : { o .details .status } " )
66
63
67
64
if o .details .status == "done" :
68
- out_ .append (o )
69
- elif o .details .status == "failed" :
70
- print (o .details )
65
+ out .append (o )
71
66
else :
72
- all_done = False
73
- cnt_complete = 0
74
- for key , value in o .task_details .items ():
75
- value = cast (RunDetails , value )
76
- assert value .subtasks is not None , "Subtasks don't exist"
77
- for subtask in value .subtasks :
78
- if subtask .status == "done" :
79
- cnt_complete += 1
80
- print (
81
- "\t " ,
82
- f"Subtask { key } " ,
83
- cnt_complete ,
84
- "/" ,
85
- len (value .subtasks ),
86
- )
87
- cnt_complete = 0
88
- return all_done , out_
67
+ raise Exception (
68
+ f"Execution status for { run_item ['weather_type' ]} : { o .details .status } "
69
+ )
70
+
71
+ return out
89
72
90
73
def get_all_assets (self , details : RunConfigUser ):
91
74
asset_files = []
92
75
output = details .output ["weather_forecast" ]
93
76
record : Dict [str , Any ]
94
77
for record in cast (List [Dict [str , Any ]], output ):
95
- for _ , value in record ["assets" ].items ():
78
+ for value in record ["assets" ].values ():
96
79
asset_files .append (value ["href" ])
97
80
df_assets = [pd .read_csv (f , index_col = False ) for f in asset_files ]
98
81
df_out = pd .concat (df_assets )
@@ -104,21 +87,15 @@ def get_downloaded_data(self, run_list: List[Dict[str, str]], offset_hours: int
104
87
check the download status. If status is done, fetch the downloaded data
105
88
"""
106
89
forecast_dataset = pd .DataFrame ()
107
- status = False
108
- out_ = []
109
- while status is False :
110
- status , out_ = self .get_run_status (run_list )
111
- time .sleep (10 )
112
-
113
- if status :
114
- for detail in out_ :
115
- df = self .get_all_assets (detail )
90
+ out = self .get_run_status (run_list )
91
+ for detail in out :
92
+ df = self .get_all_assets (detail )
116
93
117
- # Offset from UTC to specified timezone
118
- df .index = df .index + pd .offsets .Hour (offset_hours )
94
+ # Offset from UTC to specified timezone
95
+ df .index = df .index + pd .offsets .Hour (offset_hours )
119
96
120
- if not df .empty :
121
- forecast_dataset = pd .concat ([forecast_dataset , df ], axis = 1 )
97
+ if not df .empty :
98
+ forecast_dataset = pd .concat ([forecast_dataset , df ], axis = 1 )
122
99
123
100
return forecast_dataset
124
101
0 commit comments