Skip to content

Commit

Permalink
Merge pull request #252 from raphaelrpl/release-0.8
Browse files Browse the repository at this point in the history
Fix build error and snippet for ssl disable
  • Loading branch information
raphaelrpl authored Apr 18, 2022
2 parents 33bb5f6 + 46f5f62 commit 0cbe2c9
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 53 deletions.
8 changes: 6 additions & 2 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@
Changes
=======

Version 0.8.3 (2022-04-18)
--------------------------

- Fix build error and snippet for ssl disable.


Version 0.8.2 (2022-03-25)
--------------------------

- Fix dependency deprecation git protocol for pip
- Fix MODIS nodata publishing
- Add support to use custom parameters in Catalog
- Publish Sentinel-2 PVI as quicklook
-

Version 0.8.1 (2021-05-07)
--------------------------
Expand Down
3 changes: 3 additions & 0 deletions bdc_collection_builder/celery/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,4 +498,7 @@ def publish_collection(scene_id: str, data: BaseCollection, collection: Collecti
if not destination_file.exists():
shutil.move(str(old_file_path), str(destination))

logging.info(f'Cleaning up {temporary_dir.name}')
shutil.rmtree(temporary_dir.name)

return item
48 changes: 29 additions & 19 deletions bdc_collection_builder/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
from flask import current_app as flask_app

from ..collections.models import RadcorActivity, RadcorActivityHistory
from ..collections.processor import sen2cor
from ..collections.utils import (get_or_create_model, get_provider,
is_valid_compressed_file, post_processing)
is_valid_compressed_file, post_processing, safe_request)
from ..config import Config
from .publish import get_item_path, publish_collection

Expand Down Expand Up @@ -151,8 +152,10 @@ def download(activity: dict, **kwargs):
catalog_name = activity['args']['catalog']
catalog_args.update(parallel=True, progress=False, lazy=True)

provider = get_provider(catalog=catalog_name, **catalog_args)
download_order = [provider]
provider, collector = get_provider(catalog=catalog_name, **catalog_args)
setattr(collector, 'instance', provider)
setattr(collector, 'provider_name', f'{provider.name} (CUSTOM)')
download_order = [collector]
else:
# Use parallel flag for providers which has number maximum of connections per client (Sentinel-Hub only)
download_order = collector_extension.get_provider_order(collection, lazy=True, parallel=True, progress=False,
Expand Down Expand Up @@ -182,9 +185,9 @@ def download(activity: dict, **kwargs):
Item.name == scene_id
).first()

if item:
if item and item.assets.get('asset'):
# TODO: Get asset name of download file
item_path = item.assets['asset']['href']
item_path = item.assets['asset'].get('href', '')
item_path = item_path if not item_path.startswith('/') else item_path[1:]
item_path = Path(prefix) / item_path

Expand Down Expand Up @@ -215,7 +218,9 @@ def download(activity: dict, **kwargs):
for collector in download_order:
try:
logging.info(f'Trying to download from {collector.provider_name}(id={collector.instance.id})')
temp_file = Path(collector.download(scene_id, output=tmp, dataset=activity['args']['dataset']))

with safe_request():
temp_file = Path(collector.download(scene_id, output=tmp, dataset=activity['args']['dataset']))

activity['args']['provider_id'] = collector.instance.id

Expand All @@ -231,6 +236,9 @@ def download(activity: dict, **kwargs):
raise RuntimeError(f'Download fails {activity["sceneid"]}.')

shutil.move(str(temp_file), str(download_file))
if tmp and Path(tmp).exists():
logging.info(f'Cleaning up {tmp}')
shutil.rmtree(tmp)

refresh_execution_args(execution, activity, compressed_file=str(download_file))

Expand All @@ -248,6 +256,7 @@ def correction(activity: dict, collection_id=None, **kwargs):
logging.info(f'Starting Correction Task for {collection.name}(id={collection.id}, scene_id={scene_id})')

data_collection = get_provider_collection_from_activity(activity)
tmp = None

try:
output_path = data_collection.path(collection, prefix=Config.PUBLISH_DATA_DIR)
Expand Down Expand Up @@ -293,15 +302,12 @@ def correction(activity: dict, collection_id=None, **kwargs):
logging.info(f'Removing {str(output_path_entry)} sen2cor file before.')
output_path_entry.unlink()

sen2cor_conf = Config.SEN2COR_CONFIG
logging.info(f'Using {entry} of sceneid {scene_id}')
# TODO: Use custom sen2cor version (2.5 or 2.8)
cmd = f'''docker run --rm -i \
-v $INDIR:/mnt/input-dir \
-v $OUTDIR:/mnt/output-dir \
-v {sen2cor_conf["SEN2COR_AUX_DIR"]}:/home/lib/python2.7/site-packages/sen2cor/aux_data \
{container_workdir} {sen2cor_conf["SEN2COR_DOCKER_IMAGE"]} {entry}'''
env['OUTDIR'] = str(Path(tmp) / 'output')

sen2cor(scene_id, input_dir=str(tmp), output_dir=env['OUTDIR'],
docker_container_work_dir=container_workdir.split(' '), **env)

logging.info(f'Using {entry} of sceneid {scene_id}')
else:
lasrc_conf = Config.LASRC_CONFIG

Expand All @@ -314,13 +320,13 @@ def correction(activity: dict, collection_id=None, **kwargs):
-v {lasrc_conf["LEDAPS_AUX_DIR"]}:/mnt/ledaps-aux:ro \
{container_workdir} {lasrc_conf["LASRC_DOCKER_IMAGE"]} {entry}'''

logging.debug(cmd)
logging.debug(cmd)

# Execute command line
process = subprocess.Popen(cmd, shell=True, env=env, stdin=subprocess.PIPE)
process.wait()
# Execute command line
process = subprocess.Popen(cmd, shell=True, env=env, stdin=subprocess.PIPE)
process.wait()

assert process.returncode == 0
assert process.returncode == 0

# TODO: We should be able to get output name from execution
if processor_name.lower() == 'sen2cor':
Expand All @@ -338,6 +344,10 @@ def correction(activity: dict, collection_id=None, **kwargs):
except Exception as e:
logging.error(f'Error in correction {scene_id} - {str(e)}', exc_info=True)
raise e
finally:
if tmp and Path(tmp).exists():
logging.info(f'Cleaning up {tmp}')
shutil.rmtree(tmp)

return activity

Expand Down
53 changes: 53 additions & 0 deletions bdc_collection_builder/collections/processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import logging
import subprocess
from pathlib import Path
from typing import Optional

from ..config import Config


def _sen2cor_image(version: str) -> str:
    """Return the configured Sen2Cor image reference pinned to ``version``.

    The configured ``SEN2COR_DOCKER_IMAGE`` may already carry a tag (or a
    digest). Appending ``:<version>`` to it verbatim would produce an invalid
    reference such as ``repo/sen2cor:2.8.0:2.10.0``, so any tag or digest on the
    last path component is dropped first. Only the last component is inspected
    so that a registry port (``host:5000/...``) is left untouched.
    """
    image = Config.SEN2COR_CONFIG["SEN2COR_DOCKER_IMAGE"]
    repository, slash, name = image.rpartition('/')
    name = name.split('@', 1)[0].split(':', 1)[0]
    return f'{repository}{slash}{name}:{version}'


def sen2cor(scene_id: str, input_dir: str, output_dir: str,
            docker_container_work_dir: list, version: Optional[str] = None, **env):
    """Run Sen2Cor through ``docker run`` for the given scene.

    Args:
        scene_id: Scene identifier. It is used as the container name and
            ``<scene_id>.SAFE`` is passed as the container argument.
        input_dir: Host directory mounted at ``/mnt/input_dir``.
        output_dir: Host directory mounted at ``/mnt/output_dir``.
        docker_container_work_dir: Extra ``docker run`` arguments inserted
            right before the image name.
        version: Sen2Cor version used as image tag. When ``None``, the
            versions ``2.10.0``, ``2.8.0`` and ``2.5.5`` are tried in this
            order and the first successful result is returned.
        **env: Environment variables given to the ``docker`` process.

    Returns:
        Path of the first entry found inside ``output_dir`` after the run.

    Raises:
        RuntimeError: When the container exits with a non-zero code, when it
            leaves no entry in ``output_dir``, or when every supported
            version fails.
    """
    if version is None:
        versions_supported = ['2.10.0', '2.8.0', '2.5.5']

        err = None
        for candidate in versions_supported:
            try:
                return sen2cor(scene_id, input_dir, output_dir, docker_container_work_dir,
                               version=candidate, **env)
            except RuntimeError as e:
                # Remember the failure and fall back to the next version.
                err = e

        raise RuntimeError(f'Could not execute Sen2Cor using {versions_supported} - {err}') from err

    # The auxiliary configuration is laid out per minor version (e.g. "2.10").
    version_minor = '.'.join(version.split('.')[:-1])
    sen2cor_dir = Config.SEN2COR_CONFIG["SEN2COR_DIR"]
    args = [
        'docker', 'run', '--rm', '-i',
        '--name', scene_id,
        '-v', f'{input_dir}:/mnt/input_dir',
        '-v', f'{output_dir}:/mnt/output_dir',
        '-v', f'{sen2cor_dir}/CCI4SEN2COR:/mnt/aux_data',
        '-v', f'{sen2cor_dir}/{version_minor}/cfg/L2A_GIPP.xml:/opt/sen2cor/{version}/cfg/L2A_GIPP.xml',
        *docker_container_work_dir,
        _sen2cor_image(version),
        f'{scene_id}.SAFE'
    ]

    logging.info(f'Using Sen2Cor {version}')

    # The context manager closes the stdin pipe once the process has finished.
    with subprocess.Popen(args, env=env, stdin=subprocess.PIPE) as process:
        return_code = process.wait()

    if return_code != 0:
        raise RuntimeError(f'Could not execute Sen2Cor using {version}')

    # NOTE(review): the first directory entry is taken as the product; leftovers
    # from a previously failed version in the same output_dir could be picked
    # instead - confirm that callers use a fresh directory.
    try:
        output_tmp = next(Path(output_dir).iterdir(), None)
    except OSError as e:
        raise RuntimeError(f'Could not read Sen2Cor {version} output from {output_dir}') from e

    if output_tmp is None:
        # Raise RuntimeError (not IndexError) so the version fallback keeps going.
        raise RuntimeError(f'Sen2Cor {version} produced no output in {output_dir}')

    return Path(output_dir) / output_tmp.name
46 changes: 46 additions & 0 deletions bdc_collection_builder/collections/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@


# Python Native
import contextlib
import datetime
import logging
import shutil
import tarfile
import warnings
from json import loads as json_parser
from os import path as resource_path
from os import remove as resource_remove
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import List, Tuple
from urllib3.exceptions import InsecureRequestWarning
from zipfile import BadZipfile, ZipFile
from zlib import error as zlib_error

Expand All @@ -29,6 +32,8 @@
import rasterio
import rasterio.features
import rasterio.warp
import requests

import shapely
import shapely.geometry
from bdc_catalog.models import Band, Collection, Provider, db
Expand Down Expand Up @@ -540,3 +545,44 @@ def is_sen2cor(collection: Collection) -> bool:
return True

return False


# Keep a reference to the unpatched method: ``safe_request`` delegates to it
# while its patch is active and restores it when the context exits.
_settings = requests.Session.merge_environment_settings


@contextlib.contextmanager
def safe_request():
    """Context manager that optionally disables SSL certificate validation.

    When ``Config.DISABLE_SSL`` is falsy the wrapped block runs unchanged.
    Otherwise ``requests.Session.merge_environment_settings`` is replaced while
    the block runs so that every request is sent with ``verify=False`` and
    ``InsecureRequestWarning`` is silenced. On exit the original method is
    restored and every adapter used meanwhile is closed.

    This snippet was adapted from https://stackoverflow.com/questions/15445981/how-do-i-disable-the-security-certificate-check-in-python-requests.

    Note:
        The patch is applied on the ``requests.Session`` class itself, so it
        is visible to every session in the process while the context is open.
    """
    if not Config.DISABLE_SSL:
        yield
        # A generator based context manager must yield exactly once: stop here
        # instead of falling through to the patching code below.
        return

    opened_adapters = set()

    logging.info('Disabling SSL validation')

    def _merge_environment_settings(self, url, proxies, stream, verify, cert):
        """Track the adapter used for ``url`` and force ``verify=False``."""
        opened_adapters.add(self.get_adapter(url))

        settings = _settings(self, url, proxies, stream, verify, cert)
        settings['verify'] = False

        return settings

    requests.Session.merge_environment_settings = _merge_environment_settings

    try:
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', InsecureRequestWarning)
            yield
    finally:
        requests.Session.merge_environment_settings = _settings

        for adapter in opened_adapters:
            try:
                adapter.close()
            except Exception:
                # Closing is best effort; never mask the error raised by the block.
                logging.debug('Could not close adapter opened during safe_request', exc_info=True)
4 changes: 4 additions & 0 deletions bdc_collection_builder/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class Config:
)
# Sen2Cor/Fmask Processor
SEN2COR_CONFIG = dict(
SEN2COR_DIR=os.getenv('SEN2COR_DIR', '/data/auxiliaries/sen2cor'),
SEN2COR_DOCKER_IMAGE=os.getenv('SEN2COR_DOCKER_IMAGE', 'registry.dpi.inpe.br/brazildatacube/sen2cor:2.8.0'),
SEN2COR_AUX_DIR=os.getenv('SEN2COR_AUX_DIR', '/data/auxiliaries/sen2cor/CCI4SEN2COR'),
SEN2COR_CONFIG_DIR=os.getenv('SEN2COR_CONFIG_DIR', '/data/auxiliaries/sen2cor/config/2.8'),
Expand Down Expand Up @@ -78,6 +79,9 @@ class Config:
# The optional directory where published collections will be stored (Default is DATA_DIR)
PUBLISH_DATA_DIR = os.environ.get('PUBLISH_DATA_DIR', DATA_DIR)

# When true, SSL certificate validation is disabled for requests made inside ``safe_request``.
DISABLE_SSL = strtobool(os.getenv('DISABLE_SSL', 'YES'))

TASK_RETRY_DELAY = int(os.environ.get('TASK_RETRY_DELAY', 60 * 60)) # a hour

CELERYD_PREFETCH_MULTIPLIER = int(os.environ.get('CELERYD_PREFETCH_MULTIPLIER', 4)) # disable
Expand Down
51 changes: 26 additions & 25 deletions bdc_collection_builder/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from .celery.tasks import correction, download, harmonization, post, publish
from .collections.models import (ActivitySRC, RadcorActivity,
RadcorActivityHistory, db)
from .collections.utils import get_or_create_model, get_provider
from .collections.utils import get_or_create_model, get_provider, safe_request
from .forms import CollectionForm, RadcorActivityForm, SimpleActivityForm


Expand Down Expand Up @@ -247,39 +247,40 @@ def radcor(cls, args: dict):
try:
catalog_provider, provider = get_provider(catalog=args['catalog'], **catalog_args)

if 'scenes' in args:
result = []
with safe_request():
if 'scenes' in args:
result = []

unique_scenes = set(args['scenes'])
unique_scenes = set(args['scenes'])

for scene in unique_scenes:
query_result = provider.search(
query=args['dataset'],
filename=f'{scene}*',
**options
)
for scene in unique_scenes:
query_result = provider.search(
query=args['dataset'],
filename=f'{scene}*',
**options
)

result.extend(query_result)
elif 'tiles' in args:
result = []
for tile in args['tiles']:
query_result = provider.search(
result.extend(query_result)
elif 'tiles' in args:
result = []
for tile in args['tiles']:
query_result = provider.search(
query=args['dataset'],
tile=tile,
start_date=args['start'],
end_date=args['end'],
cloud_cover=cloud,
**options
)
result.extend(query_result)
else:
result = provider.search(
query=args['dataset'],
tile=tile,
start_date=args['start'],
end_date=args['end'],
cloud_cover=cloud,
**options
)
result.extend(query_result)
else:
result = provider.search(
query=args['dataset'],
start_date=args['start'],
end_date=args['end'],
cloud_cover=cloud,
**options
)

def _recursive(scene, task, parent=None, parallel=True, pass_args=True):
"""Create task dispatcher recursive."""
Expand Down
2 changes: 1 addition & 1 deletion bdc_collection_builder/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
"""


__version__ = '0.8.2'
__version__ = '0.8.3'
1 change: 1 addition & 0 deletions docker/Dockerfile.atm
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ WORKDIR /app

RUN pip3 install -U pip && \
pip3 install wheel && \
pip3 install "Flask<2.1" "numpy==1.17.4" "imageio==2.10.3" && \
pip3 install -e .
Loading

0 comments on commit 0cbe2c9

Please sign in to comment.