My_Results.txt
Prerequisite 1: Download and Store the CSV files
```python
import requests
import os

# Define the base URLs (Florida and Cincinnati stations)
base_url_1 = "https://www.ncei.noaa.gov/data/global-summary-of-the-day/access/{}/99495199999.csv"
base_url_2 = "https://www.ncei.noaa.gov/data/global-summary-of-the-day/access/{}/72429793812.csv"

# Define the range of years
years = range(2015, 2025)

# Base directory to save the downloaded files
base_output_dir = "./weather_data/"

# Loop through each year and download the CSV files for both datasets
for year in years:
    # Create a directory for each year
    year_dir = os.path.join(base_output_dir, str(year))
    os.makedirs(year_dir, exist_ok=True)

    # Download each file (Florida and Cincinnati)
    for base_url, station_id in [(base_url_1, "99495199999"), (base_url_2, "72429793812")]:
        url = base_url.format(year)
        response = requests.get(url)

        # Check whether the request was successful
        if response.status_code == 200:
            # Save the file in the appropriate year directory
            file_path = os.path.join(year_dir, f"{station_id}.csv")
            with open(file_path, "wb") as file:
                file.write(response.content)
            print(f"Downloaded: {file_path}")
        else:
            print(f"Failed to download {url}. Status code: {response.status_code}")
```
Downloaded: ./weather_data/2015/99495199999.csv
Downloaded: ./weather_data/2015/72429793812.csv
Failed to download https://www.ncei.noaa.gov/data/global-summary-of-the-day/access/2016/99495199999.csv. Status code: 404
Downloaded: ./weather_data/2016/72429793812.csv
Downloaded: ./weather_data/2017/99495199999.csv
Downloaded: ./weather_data/2017/72429793812.csv
Downloaded: ./weather_data/2018/99495199999.csv
Downloaded: ./weather_data/2018/72429793812.csv
Downloaded: ./weather_data/2019/99495199999.csv
Downloaded: ./weather_data/2019/72429793812.csv
Downloaded: ./weather_data/2020/99495199999.csv
Downloaded: ./weather_data/2020/72429793812.csv
Downloaded: ./weather_data/2021/99495199999.csv
Downloaded: ./weather_data/2021/72429793812.csv
Downloaded: ./weather_data/2022/99495199999.csv
Downloaded: ./weather_data/2022/72429793812.csv
Downloaded: ./weather_data/2023/99495199999.csv
Downloaded: ./weather_data/2023/72429793812.csv
Downloaded: ./weather_data/2024/99495199999.csv
Downloaded: ./weather_data/2024/72429793812.csv
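As an optional sanity check (a sketch outside the assignment itself), the downloaded tree can be verified by listing each year's files and their sizes; the paths assume the ./weather_data/ layout created above.
```python
import os

base_output_dir = "./weather_data/"

# Walk the download directory and report each file's size;
# a zero-byte file would indicate a truncated download
for year in sorted(os.listdir(base_output_dir)):
    year_dir = os.path.join(base_output_dir, year)
    for name in sorted(os.listdir(year_dir)):
        size = os.path.getsize(os.path.join(year_dir, name))
        print(f"{year}/{name}: {size} bytes")
```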
Prerequisite 2: Clean the data while preserving the original files
```python
import os
import pandas as pd

# Define the base input and output directories
base_input_dir = "./weather_data/"
base_output_dir = "./cleaned_weather_data/"

# Sentinel values GSOD uses for missing data; only the columns needed
# later are cleaned here, the rest are left commented out
invalid_values = {
    # "TEMP": 9999.9,
    # "DEWP": 9999.9,
    # "SLP": 9999.9,
    # "STP": 9999.9,
    # "VISIB": 999.9,
    # "WDSP": 999.9,
    "MXSPD": 999.9,
    # "GUST": 999.9,
    "MAX": 9999.9,
    # "MIN": 9999.9,
    # "PRCP": 99.99,
    # "SNDP": 999.9
}

# Loop through each year directory
for year in range(2015, 2025):
    year_dir = os.path.join(base_input_dir, str(year))

    # Check if the year directory exists
    if os.path.exists(year_dir):
        # Loop through each station file in the year directory
        for station_id in ["99495199999", "72429793812"]:
            file_path = os.path.join(year_dir, f"{station_id}.csv")

            # Check if the file exists
            if os.path.exists(file_path):
                # Read the CSV file into a DataFrame
                df = pd.read_csv(file_path)

                # Filter out rows carrying sentinel values
                for column, invalid_value in invalid_values.items():
                    df = df[df[column] != invalid_value]

                # Create the output directory for the year if it doesn't exist
                output_year_dir = os.path.join(base_output_dir, str(year))
                os.makedirs(output_year_dir, exist_ok=True)

                # Save the cleaned DataFrame to the new directory
                cleaned_file_path = os.path.join(output_year_dir, f"{station_id}.csv")
                df.to_csv(cleaned_file_path, index=False)
                print(f"Cleaned data saved to: {cleaned_file_path}")
            else:
                print(f"File not found: {file_path}")
    else:
        print(f"Year directory not found: {year_dir}")
```
Cleaned data saved to: ./cleaned_weather_data/2015/99495199999.csv
Cleaned data saved to: ./cleaned_weather_data/2015/72429793812.csv
File not found: ./weather_data/2016/99495199999.csv
Cleaned data saved to: ./cleaned_weather_data/2016/72429793812.csv
Cleaned data saved to: ./cleaned_weather_data/2017/99495199999.csv
Cleaned data saved to: ./cleaned_weather_data/2017/72429793812.csv
Cleaned data saved to: ./cleaned_weather_data/2018/99495199999.csv
Cleaned data saved to: ./cleaned_weather_data/2018/72429793812.csv
Cleaned data saved to: ./cleaned_weather_data/2019/99495199999.csv
Cleaned data saved to: ./cleaned_weather_data/2019/72429793812.csv
Cleaned data saved to: ./cleaned_weather_data/2020/99495199999.csv
Cleaned data saved to: ./cleaned_weather_data/2020/72429793812.csv
Cleaned data saved to: ./cleaned_weather_data/2021/99495199999.csv
Cleaned data saved to: ./cleaned_weather_data/2021/72429793812.csv
Cleaned data saved to: ./cleaned_weather_data/2022/99495199999.csv
Cleaned data saved to: ./cleaned_weather_data/2022/72429793812.csv
Cleaned data saved to: ./cleaned_weather_data/2023/99495199999.csv
Cleaned data saved to: ./cleaned_weather_data/2023/72429793812.csv
Cleaned data saved to: ./cleaned_weather_data/2024/99495199999.csv
Cleaned data saved to: ./cleaned_weather_data/2024/72429793812.csv
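To see how much the cleaning step actually removed, a small sketch like the following (assuming the same directory layout) compares row counts before and after; it is illustrative, not part of the assignment output.
```python
import os
import pandas as pd

# Compare row counts between the raw and cleaned copies of each file
for year in range(2015, 2025):
    for station_id in ["99495199999", "72429793812"]:
        raw_path = os.path.join("./weather_data/", str(year), f"{station_id}.csv")
        clean_path = os.path.join("./cleaned_weather_data/", str(year), f"{station_id}.csv")
        if os.path.exists(raw_path) and os.path.exists(clean_path):
            raw_rows = len(pd.read_csv(raw_path))
            clean_rows = len(pd.read_csv(clean_path))
            print(f"{year}/{station_id}: {raw_rows} -> {clean_rows} rows "
                  f"({raw_rows - clean_rows} removed)")
```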
Question 2: Load the CSV files and display the count of each dataset.
```python
from pyspark.sql import SparkSession
import os

# Initialize Spark session
spark = SparkSession.builder.appName("WeatherDataCount").getOrCreate()

# Base path to the weather data
base_path = "./weather_data/"

# Dictionary to hold the row counts of the datasets
dataset_counts = {}

# Loop through each year from 2015 to 2024
for year in range(2015, 2025):
    for station_code in ['99495199999', '72429793812']:  # Florida and Cincinnati
        file_path = os.path.join(base_path, str(year), f"{station_code}.csv")

        # Load the CSV file if it exists
        if os.path.exists(file_path):
            df = spark.read.csv(file_path, header=True, inferSchema=True)
            count = df.count()  # Get the number of rows
            dataset_counts[f"{year}/{station_code}"] = count

# Display the counts of each dataset
for dataset, count in dataset_counts.items():
    print(f"Dataset: {dataset}, Count: {count}")

# Stop the Spark session
spark.stop()
```
Dataset: 2015/99495199999, Count: 355
Dataset: 2015/72429793812, Count: 365
Dataset: 2016/72429793812, Count: 366
Dataset: 2017/99495199999, Count: 283
Dataset: 2017/72429793812, Count: 365
Dataset: 2018/99495199999, Count: 363
Dataset: 2018/72429793812, Count: 365
Dataset: 2019/99495199999, Count: 345
Dataset: 2019/72429793812, Count: 365
Dataset: 2020/99495199999, Count: 365
Dataset: 2020/72429793812, Count: 366
Dataset: 2021/99495199999, Count: 104
Dataset: 2021/72429793812, Count: 365
Dataset: 2022/99495199999, Count: 259
Dataset: 2022/72429793812, Count: 365
Dataset: 2023/99495199999, Count: 276
Dataset: 2023/72429793812, Count: 365
Dataset: 2024/99495199999, Count: 133
Dataset: 2024/72429793812, Count: 296
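The same counts could be produced in a single Spark job by reading every file with a glob pattern and grouping on the source file name; the sketch below assumes the directory layout above and uses Spark's input_file_name() function. Since all GSOD files share one schema, reading them together is safe here.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name

spark = SparkSession.builder.appName("WeatherDataCountGlob").getOrCreate()

# Read every station CSV for every year in one pass
df = spark.read.csv("./weather_data/*/*.csv", header=True, inferSchema=True)

# Group by the source file to recover the per-dataset counts
df.withColumn("source", input_file_name()) \
  .groupBy("source").count() \
  .orderBy("source").show(truncate=False)

spark.stop()
```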
Question 3: Find the hottest day (column MAX) for each year.
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os

# Re-initialize the Spark session (the previous one was stopped)
spark = SparkSession.builder.appName("HottestDayPerYear").getOrCreate()

# Base path to the cleaned weather data
base_path = "./cleaned_weather_data/"

# Dictionary to store the hottest day per year
hottest_days = {}

# Loop through the years to find the hottest day
for year in range(2015, 2025):
    year_dir = os.path.join(base_path, str(year))
    for filename in os.listdir(year_dir):
        if filename.endswith('.csv'):
            # Read the CSV file into a DataFrame
            df = spark.read.csv(os.path.join(year_dir, filename), header=True, inferSchema=True)

            # Skip empty DataFrames
            if df.isEmpty():
                continue

            # Check that the "MAX" column exists
            if "MAX" not in df.columns:
                print(f"The 'MAX' column does not exist in {filename}.")
                continue

            # Find the hottest day in the current file
            max_day = df.orderBy(F.desc("MAX")).first()

            # Record the result for the year from the first non-empty file
            # (files are visited in directory-listing order)
            if max_day is not None and year not in hottest_days:
                hottest_days[year] = (max_day.STATION, max_day.NAME, max_day.DATE, max_day.MAX)

# Convert the results to a DataFrame for display
if hottest_days:
    hottest_days_list = [(year, *data) for year, data in hottest_days.items()]
    hottest_days_df = spark.createDataFrame(hottest_days_list, ["YEAR", "STATION", "NAME", "DATE", "MAX"])
    hottest_days_df.show()
else:
    print("No hottest days found across the datasets.")
```
+----+-----------+--------------------+----------+----+
|YEAR| STATION| NAME| DATE| MAX|
+----+-----------+--------------------+----------+----+
|2015|99495199999|SEBASTIAN INLET S...|2015-07-28|90.0|
|2016|72429793812|CINCINNATI MUNICI...|2016-07-24|93.9|
|2017|99495199999|SEBASTIAN INLET S...|2017-05-13|88.3|
|2018|99495199999|SEBASTIAN INLET S...|2018-09-15|90.1|
|2019|99495199999|SEBASTIAN INLET S...|2019-09-06|91.6|
|2020|99495199999|SEBASTIAN INLET S...|2020-04-13|91.8|
|2021|72429793812|CINCINNATI MUNICI...|2021-08-12|95.0|
|2022|72429793812|CINCINNATI MUNICI...|2022-06-14|96.1|
|2023|99495199999|SEBASTIAN INLET S...|2023-12-10|79.5|
|2024|99495199999|SEBASTIAN INLET S...|2024-05-14|86.7|
+----+-----------+--------------------+----------+----+
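Because the loop above records only the first non-empty file per year, a variant that weighs both stations together may be useful; the sketch below reads all cleaned files at once and uses a window over the year to pick each year's single hottest reading. The glob path and column names are assumptions carried over from the layout above.
```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("HottestDayWindow").getOrCreate()

# Read every cleaned station file for every year in one pass
df = spark.read.csv("./cleaned_weather_data/*/*.csv", header=True, inferSchema=True)

# Rank days within each year by MAX and keep the top row per year
w = Window.partitionBy(F.year("DATE")).orderBy(F.desc("MAX"))
hottest = (df.withColumn("YEAR", F.year("DATE"))
             .withColumn("rank", F.row_number().over(w))
             .filter(F.col("rank") == 1)
             .select("YEAR", "STATION", "NAME", "DATE", "MAX"))

hottest.orderBy("YEAR").show()
spark.stop()
```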
Question 4: Find the coldest day (column MIN) for the month of March across all years (2015-2024).
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os

# Initialize Spark session
spark = SparkSession.builder.appName("Coldest Day").getOrCreate()

# Base path to the cleaned weather data
base_path = "./cleaned_weather_data/"

# List to collect the coldest March day from each file
march_data = []

# Loop through the years to collect March data
for year in range(2015, 2025):
    year_dir = os.path.join(base_path, str(year))
    for filename in os.listdir(year_dir):
        if filename.endswith('.csv'):
            df = spark.read.csv(os.path.join(year_dir, filename), header=True, inferSchema=True)

            # Filter for March rows (DATE strings look like "2015-03-06")
            march_df = df.filter(df.DATE.contains('-03-'))
            if not march_df.isEmpty():
                # Get the coldest day for March in the current file
                coldest_day = march_df.orderBy(F.asc("MIN")).first()

                # Append the result
                if coldest_day is not None:
                    march_data.append((coldest_day.STATION, coldest_day.NAME, coldest_day.DATE, coldest_day.MIN))

# Reduce the per-file results to the single coldest March day overall
if march_data:
    coldest_day_df = spark.createDataFrame(march_data, ["STATION", "NAME", "DATE", "MIN"])
    overall_coldest_day = coldest_day_df.orderBy(F.asc("MIN")).first()
    overall_coldest_day_df = spark.createDataFrame([overall_coldest_day], ["STATION", "NAME", "DATE", "MIN"])
    overall_coldest_day_df.show()  # Display only the overall coldest day
else:
    print("No March data found across the datasets.")
```
+-----------+--------------------+----------+---+
| STATION| NAME| DATE|MIN|
+-----------+--------------------+----------+---+
|72429793812|CINCINNATI MUNICI...|2015-03-06|3.2|
+-----------+--------------------+----------+---+
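The DATE.contains('-03-') filter relies on DATE being read as a string; a variant using F.month, which also works if the column is ever parsed as a date, is sketched below (same glob-path assumption as the earlier sketches).
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ColdestMarchDay").getOrCreate()

# Read all cleaned files and keep only March rows, regardless of DATE's type
df = spark.read.csv("./cleaned_weather_data/*/*.csv", header=True, inferSchema=True)
march = df.filter(F.month("DATE") == 3)

# The single coldest March day across both stations and all years
march.orderBy(F.asc("MIN")).select("STATION", "NAME", "DATE", "MIN").show(1)
spark.stop()
```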
Question 5: Find the year with the most precipitation for Cincinnati and Florida.
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os

# Initialize Spark session
spark = SparkSession.builder.appName("Most Precipitation").getOrCreate()

# Base path to the cleaned weather data
base_path = "./cleaned_weather_data/"

# List to collect (station, name, year, mean PRCP) tuples
annual_precipitation = []

# Loop through the years to calculate mean precipitation
for year in range(2015, 2025):
    year_dir = os.path.join(base_path, str(year))
    for filename in os.listdir(year_dir):
        if filename.endswith('.csv'):
            # Read the CSV file into a DataFrame
            df = spark.read.csv(os.path.join(year_dir, filename), header=True, inferSchema=True)

            # Skip empty DataFrames
            if df.isEmpty():
                continue

            # Check that the DataFrame contains the 'PRCP' column
            if "PRCP" not in df.columns:
                print(f"'PRCP' column not found in {filename}")
                continue

            # Calculate the mean of PRCP
            mean_prcp = df.agg(F.mean("PRCP").alias("Mean_PRCP")).first().Mean_PRCP

            # Get the station info
            station_id = df.select("STATION").first().STATION
            station_name = df.select("NAME").first().NAME

            # Append the result
            annual_precipitation.append((station_id, station_name, year, mean_prcp))

# Create a DataFrame from the results
annual_precipitation_df = spark.createDataFrame(annual_precipitation, ["STATION", "NAME", "YEAR", "Mean_PRCP"])

# Find the year with the highest mean precipitation for each station
cincinnati_max_prcp = annual_precipitation_df.filter(annual_precipitation_df.STATION == "72429793812") \
    .orderBy(F.desc("Mean_PRCP")).first()
florida_max_prcp = annual_precipitation_df.filter(annual_precipitation_df.STATION == "99495199999") \
    .orderBy(F.desc("Mean_PRCP")).first()

# Display the results
if cincinnati_max_prcp:
    print(f"Cincinnati: STATION={cincinnati_max_prcp.STATION}, NAME={cincinnati_max_prcp.NAME}, YEAR={cincinnati_max_prcp.YEAR}, Mean of PRCP={cincinnati_max_prcp.Mean_PRCP}")
if florida_max_prcp:
    print(f"Florida: STATION={florida_max_prcp.STATION}, NAME={florida_max_prcp.NAME}, YEAR={florida_max_prcp.YEAR}, Mean of PRCP={florida_max_prcp.Mean_PRCP}")
```
Cincinnati: STATION=72429793812, NAME=CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US, YEAR=2024, Mean of PRCP=5.546203389830502
Florida: STATION=99495199999, NAME=SEBASTIAN INLET STATE PARK, FL US, YEAR=2017, Mean of PRCP=0.0
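Since "most precipitation" can also be read as the largest annual total rather than the largest daily mean, a sum-based variant is sketched below. Note that PRCP's 99.99 missing-data sentinel was left uncleaned in Prerequisite 2, so this sketch filters it out before aggregating; that is an assumption about intent, not part of the recorded results.
```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("TotalPrecipitation").getOrCreate()

df = spark.read.csv("./cleaned_weather_data/*/*.csv", header=True, inferSchema=True)

# Drop the 99.99 sentinel, then total PRCP per station and year
totals = (df.filter(F.col("PRCP") != 99.99)
            .withColumn("YEAR", F.year("DATE"))
            .groupBy("STATION", "NAME", "YEAR")
            .agg(F.sum("PRCP").alias("Total_PRCP")))

# Keep the highest-total year for each station
w = Window.partitionBy("STATION").orderBy(F.desc("Total_PRCP"))
totals.withColumn("rank", F.row_number().over(w)) \
      .filter(F.col("rank") == 1) \
      .select("STATION", "NAME", "YEAR", "Total_PRCP").show(truncate=False)

spark.stop()
```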
Question 6: Count the percentage of missing values for wind gust (column GUST) for Cincinnati and Florida in the year 2024.
```python
from pyspark.sql import SparkSession
import os

# Initialize Spark session
spark = SparkSession.builder.appName("Wind Gust Missing Values").getOrCreate()

# Base path to the cleaned weather data for 2024
base_path = "./cleaned_weather_data/2024/"

# Station codes for Florida and Cincinnati
station_codes = ['99495199999', '72429793812']  # Florida, Cincinnati
results = []

# Loop through each station code
for station_code in station_codes:
    file_path = os.path.join(base_path, f"{station_code}.csv")

    # Load the CSV file if it exists
    if os.path.exists(file_path):
        df = spark.read.csv(file_path, header=True, inferSchema=True)

        # Count total rows and rows where GUST carries the missing-data sentinel (999.9)
        total_count = df.count()
        missing_count = df.filter(df.GUST == 999.9).count()

        # Calculate the percentage of missing values
        if total_count > 0:
            missing_percentage = (missing_count / total_count) * 100
        else:
            missing_percentage = 0.0

        # Store the result for this station
        results.append((station_code, missing_percentage))

# Display the results
for station_code, missing_percentage in results:
    print(f"Station Code: {station_code}, Missing GUST Percentage in the year 2024: {missing_percentage:.2f}%")

# Stop the Spark session
spark.stop()
```
Station Code: 99495199999, Missing GUST Percentage in the year 2024: 100.00%
Station Code: 72429793812, Missing GUST Percentage in the year 2024: 40.00%
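The same sentinel-percentage calculation generalizes to any GSOD column; a small helper is sketched below, with the per-column sentinel map (999.9, 9999.9, 99.99) mirroring the cleaning step in Prerequisite 2.
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("SentinelPercentages").getOrCreate()

# Sentinel values per column, mirroring the map used in Prerequisite 2
sentinels = {"GUST": 999.9, "MXSPD": 999.9, "MAX": 9999.9, "MIN": 9999.9, "PRCP": 99.99}

def sentinel_percentages(df):
    """Percentage of rows carrying the missing-data sentinel, per column."""
    total = df.count()
    return {c: 100.0 * df.filter(F.col(c) == v).count() / total
            for c, v in sentinels.items() if c in df.columns and total > 0}

# Example: the raw (uncleaned) 2024 Cincinnati file
df = spark.read.csv("./weather_data/2024/72429793812.csv", header=True, inferSchema=True)
for column, pct in sentinel_percentages(df).items():
    print(f"{column}: {pct:.2f}% missing")

spark.stop()
```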
Question 7: Find the mean, median, mode, and standard deviation of the temperature (column TEMP) for Cincinnati in each month for the year 2020.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean, col, stddev, expr
from pyspark.sql import functions as F

# Initialize Spark session
spark = SparkSession.builder.appName("Temperature Analysis").getOrCreate()

# Load the 2020 Cincinnati data
df = spark.read.csv("./cleaned_weather_data/2020/72429793812.csv", header=True, inferSchema=True)

# Extract the month from the DATE column
df_cincinnati = df.withColumn("MONTH", F.month(col("DATE")))

# Group by month and calculate the statistics
result = df_cincinnati.groupBy("MONTH").agg(
    mean("TEMP").alias("Mean"),
    expr("percentile_approx(TEMP, 0.5)").alias("Median"),   # approximate median
    F.mode("TEMP").alias("Mode"),                           # mode (PySpark 3.4+)
    stddev("TEMP").alias("Standard Deviation")
)

# Show the results, ordered by month
result.orderBy("MONTH").show()
```
+-----+------------------+------+----+------------------+
|MONTH| Mean|Median|Mode|Standard Deviation|
+-----+------------------+------+----+------------------+
| 1| 37.94516129032259| 37.7|43.1| 8.345810873712928|
| 2| 36.5896551724138| 36.0|25.9| 7.90159770587055|
| 3| 49.0741935483871| 47.8|39.6| 8.779406500135623|
| 4|51.779999999999994| 51.0|48.4| 7.313162436838541|
| 5| 60.89032258064518| 63.7|73.9| 9.314768017820217|
| 6| 72.54666666666667| 73.7|74.2| 4.899946047087439|
| 7| 77.6| 77.9|72.5| 2.33794781806609|
| 8| 73.34516129032258| 73.7|73.2| 3.487868375734898|
| 9| 66.1| 65.8|60.6| 7.118262089331474|
| 10|55.193548387096776| 54.0|51.1| 6.72869157582517|
| 11|48.003333333333345| 47.7|47.7| 6.825938527529321|
| 12| 35.99354838709677| 35.2|32.1| 6.642787340861814|
+-----+------------------+------+----+------------------+
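percentile_approx returns an approximate median; PySpark 3.4+ (the same release that added F.mode used above) also provides F.median for an exact one. A minimal sketch of the same grouping with the exact median:
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ExactMedian").getOrCreate()

df = spark.read.csv("./cleaned_weather_data/2020/72429793812.csv", header=True, inferSchema=True)

# Same monthly grouping, but with the exact median (PySpark 3.4+)
df.withColumn("MONTH", F.month("DATE")) \
  .groupBy("MONTH") \
  .agg(F.median("TEMP").alias("Median")) \
  .orderBy("MONTH").show()

spark.stop()
```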
Question 8: Find the top 10 days with the lowest Wind Chill for Cincinnati in 2017.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format

# Initialize Spark session
spark = SparkSession.builder.appName("Wind Chill Analysis").getOrCreate()

# Load the 2017 Cincinnati data
df = spark.read.csv("./cleaned_weather_data/2017/72429793812.csv", header=True, inferSchema=True)

# The wind chill formula applies for TEMP below 50 F and wind speed above 3 mph
df_cincinnati = df.filter((col("TEMP") < 50) & (col("WDSP") > 3))

# Calculate wind chill using the NWS formula
df_cincinnati = df_cincinnati.withColumn(
    "Wind Chill",
    35.74 + (0.6215 * col("TEMP"))
    - (35.75 * (col("WDSP") ** 0.16))
    + (0.4275 * col("TEMP") * (col("WDSP") ** 0.16))
)

# Keep just the date part of the DATE column for display
df_cincinnati = df_cincinnati.withColumn("DATE", date_format("DATE", "yyyy-MM-dd"))

# Select the relevant columns and take the 10 lowest wind chill days
result = df_cincinnati.select("DATE", "Wind Chill").orderBy("Wind Chill").limit(10)

# Show the results
result.show()
```
+----------+-------------------+
| DATE| Wind Chill|
+----------+-------------------+
|2017-01-07|-0.4140156367932173|
|2017-12-31| 2.0339767075993116|
|2017-12-27| 3.820645509123832|
|2017-12-28| 4.533355269061226|
|2017-01-06| 4.868933041653884|
|2017-01-08| 7.929748208036862|
|2017-12-25| 14.285113218297408|
|2017-12-30| 14.539211253038193|
|2017-01-05| 14.748861828163854|
|2017-12-26| 15.688977805634499|
+----------+-------------------+
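The expression above is the NWS wind chill formula, WC = 35.74 + 0.6215*T - 35.75*V^0.16 + 0.4275*T*V^0.16 (T in degrees F, V in mph), defined for T <= 50 F and V > 3 mph. A plain-Python spot check of one value:
```python
def wind_chill(temp_f: float, wind_mph: float) -> float:
    """NWS wind chill (valid for temp_f <= 50 and wind_mph > 3)."""
    v = wind_mph ** 0.16
    return 35.74 + 0.6215 * temp_f - 35.75 * v + 0.4275 * temp_f * v

# 30 F with a 10 mph wind should come out near 21 F, matching the NWS chart
print(round(wind_chill(30.0, 10.0), 1))
```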
Question 9: Investigate how many days had extreme weather conditions for Florida (fog, rain, snow, etc.) using the FRSHTT column.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import os

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Extreme Weather Analysis for Florida") \
    .getOrCreate()

# Directory containing the cleaned weather data
base_directory = './cleaned_weather_data/'
file_paths = []

# Collect all available file paths for the Florida station
for year in range(2015, 2025):
    file_path = os.path.join(base_directory, str(year), '99495199999.csv')
    if os.path.exists(file_path):
        file_paths.append(file_path)

# Load all the CSV files into a single DataFrame
df = spark.read.csv(file_paths, header=True, inferSchema=True)

# Count days where any FRSHTT indicator (fog, rain, snow, hail, thunder, tornado) is set
extreme_weather_count = df.filter(col("FRSHTT") != 0).count()

# Show the result
print(f"Number of days with extreme weather conditions in Florida: {extreme_weather_count}")

# Stop the Spark session
spark.stop()
```
Number of days with extreme weather conditions in Florida: 0
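FRSHTT is six 0/1 indicator digits (Fog, Rain or drizzle, Snow or ice pellets, Hail, Thunder, Tornado or funnel cloud), so the zero count above means no flagged days appear in the Florida files. A sketch that breaks the field into per-condition day counts, left-padding to restore digits that inferSchema reads away as leading zeros; the glob path covers both stations and is an assumption from the layout above:
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("FRSHTTBreakdown").getOrCreate()

df = spark.read.csv("./cleaned_weather_data/*/*.csv", header=True, inferSchema=True)

# Restore the six-digit string, then count days per indicator position
flags = df.withColumn("FRSHTT_STR", F.lpad(F.col("FRSHTT").cast("string"), 6, "0"))
conditions = ["Fog", "Rain", "Snow", "Hail", "Thunder", "Tornado"]
counts = flags.agg(*[
    F.sum(F.substring("FRSHTT_STR", i + 1, 1).cast("int")).alias(name)
    for i, name in enumerate(conditions)
])
counts.show()

spark.stop()
```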
Question 10: Predict the maximum temperature (column MAX) for Cincinnati for November and December 2024, based on the previous two years of weather data.
```python
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, dayofyear, month, max as spark_max, when
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark session
spark = SparkSession.builder.appName("Weather Data Prediction").getOrCreate()

# Base directory for the cleaned CSV files
base_directory = './cleaned_weather_data'
file_paths = []

# Collect file paths for the previous two years (2022, 2023) for station "72429793812"
for year in [2022, 2023]:
    file_path = os.path.join(base_directory, str(year), '72429793812.csv')
    if os.path.exists(file_path):
        file_paths.append(file_path)

# Load all the CSV files into a single DataFrame
historical_data = spark.read.csv(file_paths, header=True, inferSchema=True)

# Filter for November and December (months 11 and 12) for station "72429793812"
historical_df = historical_data.filter(
    (col("STATION") == "72429793812") & (month("DATE").isin([11, 12]))
)

# Prepare the training data by adding the day of the year as the single feature
training_data = historical_df.withColumn("DAY_OF_YEAR", dayofyear("DATE"))

# Assemble the features
assembler = VectorAssembler(inputCols=["DAY_OF_YEAR"], outputCol="features")
train_data = assembler.transform(training_data).select("features", col("MAX").alias("label"))

# Train the linear regression model
lr = LinearRegression()
lr_model = lr.fit(train_data)

# Build prediction rows for each day of November and December
# (days 305-365, matching the non-leap calendar of the training years)
predictions_df = spark.createDataFrame(
    [(day,) for day in range(305, 366)], ["DAY_OF_YEAR"]
)

# Transform the prediction data with the same assembler
predictions = assembler.transform(predictions_df)

# Generate predictions using the trained model
predicted_temps = lr_model.transform(predictions)

# Day 335 is December 1 in a non-leap year, so split November/December there
max_predictions = predicted_temps.withColumn(
    "MONTH", when(col("DAY_OF_YEAR") < 335, 11).otherwise(12)
).groupBy("MONTH").agg(spark_max("prediction").alias("Max Predicted Temp"))

# Show the maximum temperature predictions for November and December 2024
max_predictions.show()

# Stop the Spark session
spark.stop()
```
24/10/28 17:41:07 WARN Instrumentation: [51f9c0f7] regParam is zero, which might cause numerical instability and overfitting.
+-----+------------------+
|MONTH|Max Predicted Temp|
+-----+------------------+
| 11| 65.71804308370848|
| 12| 55.37627286049107|
+-----+------------------+
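The "regParam is zero" warning comes from fitting an unregularized model. The sketch below shows one way to quantify the fit and add slight L2 (ridge) regularization, rebuilding the training frame so it runs standalone; training-set RMSE is an optimistic estimate, and a held-out split would be better, but this keeps the sketch short.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, dayofyear, month
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

spark = SparkSession.builder.appName("Weather Prediction Eval").getOrCreate()

# Rebuild the training frame exactly as in the block above
df = spark.read.csv(["./cleaned_weather_data/2022/72429793812.csv",
                     "./cleaned_weather_data/2023/72429793812.csv"],
                    header=True, inferSchema=True)
train = (df.filter(month("DATE").isin([11, 12]))
           .withColumn("DAY_OF_YEAR", dayofyear("DATE")))
train = VectorAssembler(inputCols=["DAY_OF_YEAR"], outputCol="features") \
    .transform(train).select("features", col("MAX").alias("label"))

# A little L2 regularization addresses the regParam warning
lr_model = LinearRegression(regParam=0.1).fit(train)

# RMSE on the training data (an optimistic estimate of fit quality)
evaluator = RegressionEvaluator(metricName="rmse")
print("Training RMSE:", evaluator.evaluate(lr_model.transform(train)))

spark.stop()
```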