#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Utilities for finding Sentinel scenes on the Copernicus Open Access Hub,
locating them in a local eodata store, and downloading any scenes that are
missing from the store.

Created on Fri Oct 27 11:30:29 2017

@author: root
"""
from collections import OrderedDict
from datetime import datetime
import glob
import os
import re
import zipfile

from sentinelsat.sentinel import SentinelAPI, SentinelAPIError, read_geojson, geojson_to_wkt


def find_sentinel_images(area_of_interest, date_start, date_end, platform_name, user, password,
                         datastore_base_path, download_path,
                         hub_address="https://scihub.copernicus.eu/apihub",
                         area_relation="Intersects", limit_to_tiles=[], other_search_keywords={},
                         limit_to_scenes=[], download=True, silent=False):
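    """Search for Sentinel scenes and return paths to the matching products.

    Scenes matching the search parameters are first looked up on the hub at
    hub_address. Each found scene is then located in the eodata store under
    datastore_base_path; scenes missing from the store are downloaded to
    download_path instead. If download is False, the hub search results are
    returned without locating or downloading any files.
    """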
    def sprint(string):
        if not silent:
            print(string)

    ###################################
    identifiers = []
    products = {}
    product_paths = []

    sprint("Searching for scenes on " + hub_address)
    sprint(date_start + " - " + date_end)

    # Search by polygon, time, and hub query keywords
    file_name = []
    if limit_to_tiles:
        file_name = ["*_" + tile + "_*" for tile in limit_to_tiles]
    file_name = file_name + limit_to_scenes
    if len(file_name) == 0:
        file_name = "*"
    elif len(file_name) == 1:
        file_name = file_name[0]
    else:
        file_name = " OR ".join(file_name)
        file_name = "(" + file_name + ")"
    footprint = geojson_to_wkt(read_geojson(area_of_interest))
    products = _search_on_hub(user, password, hub_address, area=footprint,
                              area_relation=area_relation, date=(date_start, date_end),
                              platformname=platform_name, filename=file_name,
                              **other_search_keywords)
    products = _remove_duplicate_acquisitions(products)
    sprint("Found %i scenes" % len(products))
    for k in products.keys():
        identifiers.append(products[k]["identifier"])
        sprint(products[k]["identifier"])
    if not download:
        return list(products.values())

    ##################################
    # Then locate the scenes in the IPT eodata store.
    sprint("Locating scenes in eodata store...")
    for i, identifier in enumerate(identifiers):
        path = _search_on_datastore(datastore_base_path, identifier)
        # If a scene is not in the IPT eodata store (some S3 images are
        # missing), download it from the hub to the download directory,
        # unless it has already been downloaded.
        if not path:
            if products:
                product = products[list(products.keys())[i]]
            else:
                # The hub query returns a dict keyed by product UUID, so
                # extract the single matching product from it.
                found = _search_on_hub(user, password, hub_address, filename=identifier)
                product = list(found.values())[0] if found else None
            if not product:
                print("Product " + identifier + " does not exist and will not be downloaded!")
                continue
            sprint("Scene not found in eodata store, downloading from " + hub_address + "...")
            path = _download_from_hub(product, download_path, user, password, hub_address,
                                      False)
            if not path:
                sprint("Could not download...")
                continue
        sprint(path)
        product_paths.append(path)

    return product_paths


def _search_on_hub(user, password, hub_address, **search_keywords):
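    """Query the hub with the given sentinelsat search keywords and return the
    found products, or an empty dict if the query fails. The formatted query
    is printed to aid debugging.
    """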
    # Connect to the hub and search
    try:
        print(SentinelAPI.format_query(**search_keywords))
        hub = SentinelAPI(user, password, hub_address)
        products = hub.query(**search_keywords)
    except SentinelAPIError as e:
        print(e)
        print(SentinelAPI.format_query(**search_keywords))
        products = {}
    return products


def _search_on_datastore(datastore_base_path, product_identifier):
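    """Look for a product in the date-organised eodata store and return the
    path to it, or None if it is not present.
    """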
    # The store is organised into date-based subdirectories, so parse the
    # date out of the product identifier to build the search pattern.
    m = re.findall(r"_(\d{4})(\d{2})(\d{2})T(\d{6})_", product_identifier)[-1]
    path = os.path.join(datastore_base_path, m[0], m[1], m[2], product_identifier + ".*")
    matches = glob.glob(path)
    if matches:
        return matches[0]
    else:
        return None


def _download_from_hub(product, download_path, user, password,
                       hub_address="https://scihub.copernicus.eu/apihub", overwrite=False):
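    """Download a product from the hub into download_path, unzip it and return
    the path to the unpacked product, or an empty string if the download
    fails. An existing local copy is reused unless overwrite is True.
    """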
    path = os.path.join(download_path, product["identifier"] + ".*")
    if glob.glob(path) and not overwrite:
        return glob.glob(path)[0]
    else:
        # Connect to the hub and download
        try:
            hub = SentinelAPI(user, password, hub_address)
            p = hub.download(product["uuid"], download_path)
        except SentinelAPIError as e:
            print(e)
            return ""
        # Unpack the downloaded archive and remove the zip file
        with zipfile.ZipFile(p["path"], "r") as z:
            z.extractall(download_path)
        os.remove(p["path"])
        return glob.glob(path)[0]


# Sometimes multiple copies of the same product (acquisition image) are returned from SciHub
# due to different processing dates. In that case keep only the copy with the latest
# ingestion date.
def _remove_duplicate_acquisitions(products):
    ingestion_dates = []
    acquisitions = []
    for product in products.values():
        # Derive an acquisition key from the product title by stripping out
        # the processing timestamp, which differs between duplicate copies.
        if product["platformname"] == "Sentinel-2":
            match = re.match(r"(.*)(\d{4})(\d{2})(\d{2})T(\d{2})(\d{2})(\d{2})$",
                             product["title"])
            acquisitions.append(match.group(1))
        else:
            match = re.match(r"(.*\d{8}T\d{6}_\d{8}T\d{6}_)\d{8}T\d{6}(.*)", product["title"])
            acquisitions.append(match.group(1) + match.group(2))
        ingestion_dates.append(product["ingestiondate"])

    unique_acquisitions = set(acquisitions)
    keep_products = OrderedDict()
    for acquisition in unique_acquisitions:
        # Of all the copies of this acquisition, keep the most recently ingested one.
        scene_index = [i for i, e in enumerate(acquisitions) if e == acquisition]
        keep = None
        latest_date = datetime(1, 1, 1)
        for i in scene_index:
            if ingestion_dates[i] > latest_date:
                keep = i
                latest_date = ingestion_dates[i]
        keep_products[list(products.keys())[keep]] = list(products.values())[keep]
    return keep_products
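

# A minimal usage sketch: find (and, if needed, download) Sentinel-2 scenes
# for October 2017 over an area of interest given as a GeoJSON polygon. The
# credentials, paths, AOI file and tile ID below are placeholders; substitute
# values for your own environment.
if __name__ == "__main__":
    paths = find_sentinel_images(
        area_of_interest="aoi.geojson",  # hypothetical GeoJSON file
        date_start="20171001", date_end="20171031",
        platform_name="Sentinel-2",
        user="scihub_user", password="scihub_password",  # hypothetical credentials
        datastore_base_path="/eodata/Sentinel-2/MSI/L1C",  # hypothetical store layout
        download_path="/tmp/sentinel_downloads",
        limit_to_tiles=["T32TMT"],  # hypothetical tile ID
        other_search_keywords={"producttype": "S2MSI1C"})
    for p in paths:
        print(p)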