-
Notifications
You must be signed in to change notification settings - Fork 0
/
run-consumer_1_2.py
175 lines (146 loc) · 7.69 KB
/
run-consumer_1_2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#!/usr/bin/python
# -*- coding: UTF-8
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/. */
# Authors:
# Michael Berg-Mohnicke <michael.berg@zalf.de>
#
# Maintainers:
# Currently maintained by the authors.
#
# This file has been created at the Institute of
# Landscape Systems Analysis at the ZALF.
# Copyright (C: Leibniz Centre for Agricultural Landscape Research (ZALF)
import csv
from datetime import datetime, timedelta
import os
import sys
from collections import defaultdict
import zmq
import shared
import monica_io3
def run_consumer(server=None, port=None):
config = {
"port": port if port else "7777",
"server": server if server else "localhost",
"path-to-output-dir": "./out",
}
shared.update_config(config, sys.argv, print_config=True, allow_new_keys=False)
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://" + config["server"] + ":" + config["port"])
socket.RCVTIMEO = 6000
path_to_out_dir = config["path-to-output-dir"]
if not os.path.exists(path_to_out_dir):
try:
os.makedirs(path_to_out_dir)
except OSError:
print(f"{os.path.basename(__file__)} Couldn't create dir {path_to_out_dir}! Exiting.")
exit(1)
daily_filepath = f"{path_to_out_dir}/Ex_1b_Daily_MONICA_calib_Results.csv"
daily_f = open(daily_filepath, "wt", newline="", encoding="utf-8")
daily_f.write(
"Tret: 1 to 13,Day after planting,Zadocks phenology stage,Total above biomass,Leaf Area Index,Daily transpiration,Actual evapotranspiration,Runoff,Deep Percolation,N Leaching,Soil Water Content_layer_1,Soil Water Content_layer_2,Soil Water Content_layer_3,Soil Water Content_layer_4,Soil Water Content_layer_5,Soil Water Content_layer_6,Soil Water Content_layer_7,Soil Water Content_layer_8,Soil Water Content_layer_9,Soil Water Content_layer_10,Soil Water Content_layer_11,Soil Water Content_layer_12,Soil Water Content_layer_13,Soil Water Content_layer_14,Soil Water Content_layer_15\n")
daily_f.write(
"Treatment,DAP,ZDPH,CWAD,LAI,TRANS,ETa,Roff,DPER,NLEA,SWC,SWC,SWC,SWC,SWC,SWC,SWC,SWC,SWC,SWC,SWC,SWC,SWC,SWC,SWC\n") ##This needs to change##
daily_f.flush()
daily_writer = csv.writer(daily_f, delimiter=",")
crop_filepath = f"{path_to_out_dir}/Ex_1b_MONICA_calib_Results.csv"
crop_f = open(crop_filepath, "wt", newline="", encoding="utf-8")
crop_f.write(
",Grain yield,Grain number ,grain unit weight ,Maximum Leaf Area Index,Total biomass at maturity ,Root Biomass\n") ##This needs to change##
crop_f.write("Treatment,GWAD,G#AD,GWGD,LAID,CWAD,RWAD\n")
crop_f.flush()
crop_writer = csv.writer(crop_f, delimiter=",")
no_of_trts_to_receive = None
no_of_trts_received = 0
while no_of_trts_to_receive != no_of_trts_received:
try:
msg: dict = socket.recv_json()
if msg.get("errors", []):
print(f"{os.path.basename(__file__)} received errors: {msg['errors']}")
continue
custom_id = msg.get("customId", {})
if custom_id.get("nodata", False):
no_of_trts_to_receive = custom_id.get("no_of_trts", None)
print(f"{os.path.basename(__file__)} received nodata=true -> done")
continue
no_of_trts_received += 1
trt_no = custom_id.get("trt_no", None)
soil_name = custom_id.get("soil_name", None)
#write_monica_out(trt_no, msg)
#continue
print(f"{os.path.basename(__file__)} received result trt_no: {trt_no}")
# [(layer_bottom_depth_cm, layer_index), ...]
layers = {
"CH5531001": [(5, 0), (10, 0), (20, 1), (30, 2), (40, 3), (50, 4), (60, 5), (70, 6),
(90, (7, 8)), (110, (9, 10)), (130, (11, 12)), (150, (13, 14)),
(170, (15, 16)), (190, (17, 18)), (210, 19)],
"CH5531002": [(5, 0), (10, 0), (20, 1), (30, 2), (40, 3), (50, 4), (60, 5), (70, 6),
(90, (7, 8)), (110, (9, 10)), (130, (11, 12)), (150, (13, 14)),
(170, (15, 16)), (190, (17, 18)), (210, 19)],
"LLWatelg": [(5, 0), (15, (0, 1)), (20, 1), (30, 2), (40, 3), (50, 4), (60, 5),
(70, 6), (90, (7, 8)), (110, (9, 10)), (125, (11, 13))],
"LLWatelg01": [(5, 0), (15, (0, 1)), (20, 1), (30, 2), (40, 3), (50, 4), (60, 5),
(70, 6), (90, (7, 8)), (110, (9, 10)), (125, (11, 13))]
}.get(soil_name, None)
if not layers:
continue
data: dict = msg["data"][0]
results: list = data["results"]
sowing_date = None
for vals in results:
if not sowing_date:
sowing_date = vals["Date"]
current_date = vals["Date"]
dap = (datetime.fromisoformat(current_date) - datetime.fromisoformat(sowing_date)).days
swcs = []
for layer_bottom_depth_cm, layer_indices in layers:
from_layer_index = layer_index = layer_indices if isinstance(layer_indices, int) else layer_indices[0]
to_layer_index = layer_indices if isinstance(layer_indices, int) else layer_indices[1]
layer_swc_vals = vals["SWC"][from_layer_index:to_layer_index+1]
swcs.append(sum(layer_swc_vals) / len(layer_swc_vals))
row = [trt_no, dap, vals["Stage"], vals["CWAD"], vals["LAI"], vals["TRANS"], vals["ETa"],
vals["Roff"], vals["DPER"][0], vals["NLEA"]] + swcs
daily_writer.writerow(row)
data: dict = msg["data"][1]
vals: dict = data["results"][0]
data2: dict = msg["data"][2]
vals2: dict = data2["results"][0]
row = [trt_no, vals2["GWAD"], -1, -1, vals["LAID"], vals2["CWAD"], vals2["RWAD"]]
crop_writer.writerow(row)
except Exception as e:
print(f"{os.path.basename(__file__)} Exception: {e}")
daily_f.close()
crop_f.close()
print(f"{os.path.basename(__file__)} exiting run_consumer()")
def write_monica_out(trt_no, msg):
path_to_out_dir = "out"
if not os.path.exists(path_to_out_dir):
try:
os.makedirs(path_to_out_dir)
except OSError:
print("c: Couldn't create dir:", path_to_out_dir, "! Exiting.")
exit(1)
# with open("out/out-" + str(i) + ".csv", 'wb') as _:
path_to_file = path_to_out_dir + "/trt_no-" + str(trt_no) + ".csv"
with open(path_to_file, "w", newline='') as _:
writer = csv.writer(_, delimiter=";")
for data_ in msg.get("data", []):
results = data_.get("results", [])
orig_spec = data_.get("origSpec", "")
output_ids = data_.get("outputIds", [])
if len(results) > 0:
writer.writerow([orig_spec.replace("\"", "")])
for row in monica_io3.write_output_header_rows(output_ids,
include_header_row=True,
include_units_row=False,
include_time_agg=False):
writer.writerow(row)
for row in monica_io3.write_output_obj(output_ids, results):
writer.writerow(row)
writer.writerow([])
print("wrote:", path_to_file)
if __name__ == "__main__":
run_consumer()