Quality of life improvements #195

Merged
merged 7 commits into from Feb 2, 2024

Changes from 4 commits
4 changes: 0 additions & 4 deletions .flake8
@@ -17,8 +17,4 @@ max-line-length=127

# List ignore rules one per line.
ignore =
E501
C901
W503
F401
F403
18 changes: 18 additions & 0 deletions .github/dependabot.yml
@@ -0,0 +1,18 @@
version: 2
registries:
python-index-pypi-org:
type: python-index
url: https://pypi.org/
replaces-base: true
username: "${{secrets.PYTHON_INDEX_PYPI_ORG_USERNAME}}"
password: "${{secrets.PYTHON_INDEX_PYPI_ORG_PASSWORD}}"

updates:
- package-ecosystem: pip
directory: "/"
schedule:
interval: daily
time: "19:00"
open-pull-requests-limit: 10
registries:
- python-index-pypi-org
7 changes: 6 additions & 1 deletion ckanext/xloader/action.py
@@ -152,7 +152,12 @@ def xloader_submit(context, data_dict):
'original_url': resource_dict.get('url'),
}
}
timeout = config.get('ckanext.xloader.job_timeout', '3600')
# Expand timeout for resources that have to be type-guessed
timeout = config.get(
'ckanext.xloader.job_timeout',
'3600' if utils.datastore_resource_exists(res_id) else '10800')
log.debug("Timeout for XLoading resource %s is %s", res_id, timeout)

try:
job = enqueue_job(
jobs.xloader_data_into_datastore, [data], rq_kwargs=dict(timeout=timeout)
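The timeout change above derives the default from whether the resource already has a datastore table: a plain COPY load keeps the one-hour default, while a load that must type-guess gets three hours. A minimal sketch of that selection logic; `get_config` and `resource_in_datastore` are hypothetical stand-ins for `config.get` and `utils.datastore_resource_exists`:

    # A minimal sketch of the timeout selection above (stand-in names).
    DEFAULT_TIMEOUT = '3600'         # 1 hour: plain COPY load
    TYPE_GUESSING_TIMEOUT = '10800'  # 3 hours: Tabulator must guess types

    def select_job_timeout(res_id, get_config, resource_in_datastore):
        # An explicit ckanext.xloader.job_timeout setting always wins;
        # otherwise resources without a datastore table get extra time.
        default = (DEFAULT_TIMEOUT if resource_in_datastore(res_id)
                   else TYPE_GUESSING_TIMEOUT)
        return get_config('ckanext.xloader.job_timeout', default)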
3 changes: 1 addition & 2 deletions ckanext/xloader/command.py
@@ -3,6 +3,7 @@
import sys
import logging
import ckan.plugins.toolkit as tk
from ckanext.xloader.utils import XLoaderFormats


class XloaderCmd:
@@ -84,8 +85,6 @@ def _submit_resource(self, resource, user, indent=0):
'''resource: resource dictionary
'''
indentation = ' ' * indent
# import here, so that that loggers are setup
from ckanext.xloader.plugin import XLoaderFormats

if not XLoaderFormats.is_it_an_xloader_format(resource['format']):
print(indentation
13 changes: 10 additions & 3 deletions ckanext/xloader/config_declaration.yaml
@@ -29,9 +29,7 @@ groups:
default: 1_000_000_000
example: 100000
description: |
The connection string for the jobs database used by XLoader. The
default of an sqlite file is fine for development. For production use a
Postgresql database.
The maximum file size that XLoader will attempt to load.
type: int
required: false
- key: ckanext.xloader.use_type_guessing
@@ -48,6 +46,15 @@
type: bool
required: false
legacy_key: ckanext.xloader.just_load_with_messytables
- key: ckanext.xloader.max_type_guessing_length
default: 0
example: 100000
description: |
The maximum file size that will be passed to Tabulator if the
use_type_guessing flag is enabled. Larger files will use COPY even if
the flag is set. Defaults to 1/10 of the maximum content length.
type: int
required: false
- key: ckanext.xloader.parse_dates_dayfirst
default: False
example: False
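For illustration, the two size limits declared above might be set together in a CKAN ini file roughly like this (example values, not recommendations):

    ckanext.xloader.max_content_length = 1000000000
    ckanext.xloader.use_type_guessing = true
    ckanext.xloader.max_type_guessing_length = 100000000

Leaving max_type_guessing_length unset (default 0) falls back to one tenth of max_content_length, as wired up in jobs.py below.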
19 changes: 19 additions & 0 deletions ckanext/xloader/helpers.py
@@ -1,4 +1,5 @@
import ckan.plugins.toolkit as toolkit
from ckanext.xloader.utils import XLoaderFormats


def xloader_status(resource_id):
@@ -25,3 +26,21 @@ def xloader_status_description(status):
return captions.get(status['status'], status['status'].capitalize())
else:
return _('Not Uploaded Yet')


def is_resource_supported_by_xloader(res_dict, check_access=True):
is_supported_format = XLoaderFormats.is_it_an_xloader_format(res_dict.get('format'))
is_datastore_active = res_dict.get('datastore_active', False)
if check_access:
user_has_access = toolkit.h.check_access('package_update', {'id': res_dict.get('package_id')})
else:
user_has_access = True
url_type = res_dict.get('url_type')
if url_type:
try:
is_supported_url_type = url_type not in toolkit.h.datastore_rw_resource_url_types()
except AttributeError:
is_supported_url_type = (url_type == 'upload')
else:
is_supported_url_type = True
return (is_supported_format or is_datastore_active) and user_has_access and is_supported_url_type
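A hedged usage sketch for the new helper: a plugin or template might consult it before offering an XLoader action for a resource. The resource dict below is illustrative:

    # Hypothetical caller; the dict mirrors a standard CKAN resource dict.
    res_dict = {
        'format': 'CSV',
        'url_type': 'upload',
        'package_id': 'my-dataset',
        'datastore_active': False,
    }
    # check_access=False skips the package_update permission check, e.g.
    # when the caller has already authorised the current user.
    if is_resource_supported_by_xloader(res_dict, check_access=False):
        print('Resource can be offered an XLoader (re)submit action')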
25 changes: 14 additions & 11 deletions ckanext/xloader/jobs.py
@@ -7,6 +7,7 @@
import tempfile
import json
import datetime
import os
import traceback
import sys

@@ -16,23 +17,26 @@
import sqlalchemy as sa

from ckan import model
from ckan.plugins.toolkit import get_action, asbool, ObjectNotFound, config, check_ckan_version
from ckan.plugins.toolkit import get_action, asbool, ObjectNotFound, config

from . import loader
from . import db
from . import db, loader
from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError
from .utils import set_resource_metadata
from .utils import datastore_resource_exists, set_resource_metadata

try:
from ckan.lib.api_token import get_user_from_token
except ImportError:
get_user_from_token = None

log = logging.getLogger(__name__)

SSL_VERIFY = asbool(config.get('ckanext.xloader.ssl_verify', True))
if not SSL_VERIFY:
requests.packages.urllib3.disable_warnings()

MAX_CONTENT_LENGTH = int(config.get('ckanext.xloader.max_content_length') or 1e9)
# Don't try Tabulator load on large files
MAX_TYPE_GUESSING_LENGTH = int(config.get('ckanext.xloader.max_type_guessing_length') or MAX_CONTENT_LENGTH / 10)
MAX_EXCERPT_LINES = int(config.get('ckanext.xloader.max_excerpt_lines') or 0)
CHUNK_SIZE = 16 * 1024 # 16kb
DOWNLOAD_TIMEOUT = 30
@@ -80,15 +84,13 @@ def xloader_data_into_datastore(input):
db.mark_job_as_errored(job_id, str(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log = logging.getLogger(__name__)
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
errored = True
except Exception as e:
db.mark_job_as_errored(
job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log = logging.getLogger(__name__)
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
errored = True
finally:
@@ -206,11 +208,12 @@ def tabulator_load():
logger.info('Loading CSV')
# If ckanext.xloader.use_type_guessing is not configured, fall back to
# deprecated ckanext.xloader.just_load_with_messytables
use_type_guessing = asbool(config.get(
'ckanext.xloader.use_type_guessing', config.get(
'ckanext.xloader.just_load_with_messytables', False)))
logger.info("'use_type_guessing' mode is: %s",
use_type_guessing)
use_type_guessing = asbool(
config.get('ckanext.xloader.use_type_guessing', config.get(
'ckanext.xloader.just_load_with_messytables', False))) \
and not datastore_resource_exists(resource['id']) \
and os.path.getsize(tmp_file.name) <= MAX_TYPE_GUESSING_LENGTH
logger.info("'use_type_guessing' mode is: %s", use_type_guessing)
try:
if use_type_guessing:
tabulator_load()
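Taken together, the jobs.py changes gate type guessing on three conditions: the flag (or its legacy messytables alias) is enabled, the resource has no existing datastore table, and the downloaded file is under the size cap. A condensed sketch of that gate, assuming the same module-level names as in this PR:

    # Condensed sketch of the type-guessing gate (names as in this PR).
    def should_use_type_guessing(flag_enabled, resource_id, tmp_file_path):
        return (flag_enabled
                and not datastore_resource_exists(resource_id)
                and os.path.getsize(tmp_file_path) <= MAX_TYPE_GUESSING_LENGTH)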
109 changes: 82 additions & 27 deletions ckanext/xloader/loader.py
@@ -9,16 +9,16 @@
from decimal import Decimal

import psycopg2
from chardet.universaldetector import UniversalDetector
from six.moves import zip
from tabulator import Stream, TabulatorException
from tabulator import config as tabulator_config, EncodingError, Stream, TabulatorException
from unidecode import unidecode

import ckan.plugins as p
import ckan.plugins.toolkit as tk

from .job_exceptions import FileCouldNotBeLoadedError, LoaderError
from .parser import XloaderCSVParser
from .utils import headers_guess, type_guess
from .parser import CSV_SAMPLE_LINES, TypeConverter
from .utils import datastore_resource_exists, headers_guess, type_guess

from ckan.plugins.toolkit import config

@@ -29,20 +29,69 @@
_drop_indexes = datastore_db._drop_indexes

MAX_COLUMN_LENGTH = 63
tabulator_config.CSV_SAMPLE_LINES = CSV_SAMPLE_LINES

SINGLE_BYTE_ENCODING = 'cp1252'


class UnknownEncodingStream(object):
""" Provides a context manager that wraps a Tabulator stream
and tries multiple encodings if one fails.

This is particularly relevant in cases like Latin-1 encoding,
which is usually ASCII and thus the sample could be sniffed as UTF-8,
only to run into problems later in the file.
"""

def __init__(self, filepath, file_format, decoding_result, **kwargs):
self.filepath = filepath
self.file_format = file_format
self.stream_args = kwargs
self.decoding_result = decoding_result # {'encoding': 'EUC-JP', 'confidence': 0.99}

def __enter__(self):
try:

if (self.decoding_result and self.decoding_result['confidence'] and self.decoding_result['confidence'] > 0.7):
self.stream = Stream(self.filepath, format=self.file_format, encoding=self.decoding_result['encoding'],
** self.stream_args).__enter__()
else:
self.stream = Stream(self.filepath, format=self.file_format, ** self.stream_args).__enter__()

except (EncodingError, UnicodeDecodeError):
self.stream = Stream(self.filepath, format=self.file_format,
encoding=SINGLE_BYTE_ENCODING, **self.stream_args).__enter__()
return self.stream

def __exit__(self, *args):
return self.stream.__exit__(*args)


def detect_encoding(file_path):
detector = UniversalDetector()
with open(file_path, 'rb') as file:
for line in file:
detector.feed(line)
if detector.done:
break
detector.close()
return detector.result # e.g. {'encoding': 'EUC-JP', 'confidence': 0.99}
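A short usage sketch of the two helpers above (file path illustrative): detect the encoding once, hand the result to the wrapper, and rely on its cp1252 fallback if decoding still fails mid-file.

    # Hypothetical caller combining detect_encoding and UnknownEncodingStream.
    decoding_result = detect_encoding('/tmp/data.csv')
    # e.g. {'encoding': 'EUC-JP', 'confidence': 0.99}; low-confidence
    # results are ignored and Tabulator's own sniffing is used instead.
    with UnknownEncodingStream('/tmp/data.csv', 'csv', decoding_result) as stream:
        for row in stream:
            pass  # each row arrives as a parsed list of cell values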


def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
'''Loads a CSV into DataStore. Does not create the indexes.'''

decoding_result = detect_encoding(csv_filepath)
logger.info("load_csv: Decoded encoding: %s", decoding_result)
# Determine the header row
try:
file_format = os.path.splitext(csv_filepath)[1].strip('.')
with Stream(csv_filepath, format=file_format) as stream:
with UnknownEncodingStream(csv_filepath, file_format, decoding_result) as stream:
header_offset, headers = headers_guess(stream.sample)
except TabulatorException:
try:
file_format = mimetype.lower().split('/')[-1]
with Stream(csv_filepath, format=file_format) as stream:
with UnknownEncodingStream(csv_filepath, file_format, decoding_result) as stream:
header_offset, headers = headers_guess(stream.sample)
except TabulatorException as e:
raise LoaderError('Tabulator error: {}'.format(e))
@@ -73,10 +122,16 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
logger.info('Ensuring character coding is UTF8')
f_write = tempfile.NamedTemporaryFile(suffix=file_format, delete=False)
try:
with Stream(csv_filepath, format=file_format, skip_rows=skip_rows) as stream:
stream.save(target=f_write.name, format='csv', encoding='utf-8',
delimiter=delimiter)
csv_filepath = f_write.name
save_args = {'target': f_write.name, 'format': 'csv', 'encoding': 'utf-8', 'delimiter': delimiter}
try:
with UnknownEncodingStream(csv_filepath, file_format, decoding_result,
skip_rows=skip_rows) as stream:
stream.save(**save_args)
except (EncodingError, UnicodeDecodeError):
with Stream(csv_filepath, format=file_format, encoding=SINGLE_BYTE_ENCODING,
skip_rows=skip_rows) as stream:
stream.save(**save_args)
csv_filepath = f_write.name

# datastore db connection
engine = get_write_engine()
@@ -235,16 +290,18 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None):

# Determine the header row
logger.info('Determining column names and types')
decoding_result = detect_encoding(table_filepath)
logger.info("load_table: Decoded encoding: %s", decoding_result)
try:
file_format = os.path.splitext(table_filepath)[1].strip('.')
with Stream(table_filepath, format=file_format,
custom_parsers={'csv': XloaderCSVParser}) as stream:
with UnknownEncodingStream(table_filepath, file_format, decoding_result,
post_parse=[TypeConverter().convert_types]) as stream:
header_offset, headers = headers_guess(stream.sample)
except TabulatorException:
try:
file_format = mimetype.lower().split('/')[-1]
with Stream(table_filepath, format=file_format,
custom_parsers={'csv': XloaderCSVParser}) as stream:
with UnknownEncodingStream(table_filepath, file_format, decoding_result,
post_parse=[TypeConverter().convert_types]) as stream:
header_offset, headers = headers_guess(stream.sample)
except TabulatorException as e:
raise LoaderError('Tabulator error: {}'.format(e))
@@ -279,9 +336,11 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None):
for t, h in zip(types, headers)]

headers = [header.strip()[:MAX_COLUMN_LENGTH] for header in headers if header.strip()]
type_converter = TypeConverter(types=types)

with Stream(table_filepath, format=file_format, skip_rows=skip_rows,
custom_parsers={'csv': XloaderCSVParser}) as stream:
with UnknownEncodingStream(table_filepath, file_format, decoding_result,
skip_rows=skip_rows,
post_parse=[type_converter.convert_types]) as stream:
def row_iterator():
for row in stream:
data_row = {}
@@ -318,9 +377,16 @@ def row_iterator():

logger.info('Copying to database...')
count = 0
# Some types cannot be stored as empty strings and must be converted to None,
# https://github.com/ckan/ckanext-xloader/issues/182
non_empty_types = ['timestamp', 'numeric']
for i, records in enumerate(chunky(result, 250)):
count += len(records)
logger.info('Saving chunk {number}'.format(number=i))
for row in records:
for column_index, column_name in enumerate(row):
if headers_dicts[column_index]['type'] in non_empty_types and row[column_name] == '':
row[column_name] = None
send_resource_to_datastore(resource_id, headers_dicts, records)
logger.info('...copying done')
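For context on the loop above: Postgres rejects the empty string as a timestamp or numeric literal, so blank cells in those columns are nulled out before the datastore insert (issue #182). A tiny illustration with a hypothetical record:

    # Hypothetical record; blank cells in timestamp/numeric columns
    # become None (NULL) before insertion, since '' is not a valid
    # Postgres literal for those types.
    headers_dicts = [{'id': 'created', 'type': 'timestamp'},
                     {'id': 'amount', 'type': 'numeric'}]
    record = {'created': '', 'amount': '3.5'}
    for column_index, column_name in enumerate(record):
        if headers_dicts[column_index]['type'] in ('timestamp', 'numeric') \
                and record[column_name] == '':
            record[column_name] = None
    assert record == {'created': None, 'amount': '3.5'}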

@@ -395,17 +461,6 @@ def send_resource_to_datastore(resource_id, headers, records):
.format(str(e)))


def datastore_resource_exists(resource_id):
from ckan import model
context = {'model': model, 'ignore_auth': True}
try:
response = p.toolkit.get_action('datastore_search')(context, dict(
id=resource_id, limit=0))
except p.toolkit.ObjectNotFound:
return False
return response or {'fields': []}


def delete_datastore_resource(resource_id):
from ckan import model
context = {'model': model, 'user': '', 'ignore_auth': True}