Merge pull request #195 from qld-gov-au/upstream_updates
Quality of life improvements
ThrawnCA authored Feb 2, 2024
2 parents 959dfd2 + 6eb5658 commit 2b26209
Showing 22 changed files with 584 additions and 224 deletions.
18 changes: 18 additions & 0 deletions .github/dependabot.yml
@@ -0,0 +1,18 @@
version: 2
registries:
python-index-pypi-org:
type: python-index
url: https://pypi.org/
replaces-base: true
username: "${{secrets.PYTHON_INDEX_PYPI_ORG_USERNAME}}"
password: "${{secrets.PYTHON_INDEX_PYPI_ORG_PASSWORD}}"

updates:
- package-ecosystem: pip
directory: "/"
schedule:
interval: daily
time: "19:00"
open-pull-requests-limit: 10
registries:
- python-index-pypi-org
6 changes: 6 additions & 0 deletions README.rst
@@ -196,6 +196,12 @@ This setting is shared with other plugins that download resource files, such as

ckan.download_proxy = http://my-proxy:1234/

You may also wish to configure the database to use your preferred date input style on COPY.
For example, to make `PostgreSQL <https://www.postgresql.org/docs/current/runtime-config-client.html#RUNTIME-CONFIG-CLIENT-FORMAT>`_
expect European (day-first) dates, you could add to ``postgresql.conf``:

datestyle=ISO,DMY
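
The same effect can also be scoped to a single database rather than the whole server. A minimal sketch using psycopg2, where the connection details and database name are assumptions to replace with your own:

    import psycopg2

    # Sketch: scope the day-first setting to one database instead of
    # editing postgresql.conf. DSN and database name are assumptions.
    conn = psycopg2.connect("dbname=postgres user=postgres host=localhost")
    conn.autocommit = True  # apply immediately; new sessions pick it up
    with conn.cursor() as cur:
        cur.execute("ALTER DATABASE datastore_default SET datestyle TO 'ISO, DMY'")
    conn.close()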

------------------------
Developer installation
------------------------
11 changes: 9 additions & 2 deletions ckanext/xloader/action.py
@@ -152,10 +152,17 @@ def xloader_submit(context, data_dict):
'original_url': resource_dict.get('url'),
}
}
timeout = config.get('ckanext.xloader.job_timeout', '3600')
# Expand timeout for resources that have to be type-guessed
timeout = config.get(
'ckanext.xloader.job_timeout',
'3600' if utils.datastore_resource_exists(res_id) else '10800')
log.debug("Timeout for XLoading resource %s is %s", res_id, timeout)

try:
job = enqueue_job(
jobs.xloader_data_into_datastore, [data], rq_kwargs=dict(timeout=timeout)
jobs.xloader_data_into_datastore, [data],
title="xloader_submit: package: {} resource: {}".format(resource_dict.get('package_id'), res_id),
rq_kwargs=dict(timeout=timeout)
)
except Exception:
        log.exception('Unable to enqueue xloader res_id=%s', res_id)
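For context, CKAN's enqueue_job forwards rq_kwargs to RQ, so the value computed above becomes the job's execution limit. A rough sketch of the equivalent direct RQ call; a local Redis and RQ >= 1.0 are assumptions, not part of this change:

    from redis import Redis
    from rq import Queue

    queue = Queue(connection=Redis())
    # The worker kills the job if it runs longer than job_timeout seconds,
    # mirroring rq_kwargs=dict(timeout=timeout) above.
    job = queue.enqueue(print, 'hello xloader', job_timeout=3600)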
13 changes: 10 additions & 3 deletions ckanext/xloader/config_declaration.yaml
@@ -29,9 +29,7 @@ groups:
default: 1_000_000_000
example: 100000
description: |
The connection string for the jobs database used by XLoader. The
default of an sqlite file is fine for development. For production use a
Postgresql database.
The maximum file size that XLoader will attempt to load.
type: int
required: false
- key: ckanext.xloader.use_type_guessing
@@ -48,6 +46,15 @@
type: bool
required: false
legacy_key: ckanext.xloader.just_load_with_messytables
- key: ckanext.xloader.max_type_guessing_length
default: 0
example: 100000
description: |
The maximum file size that will be passed to Tabulator if the
use_type_guessing flag is enabled. Larger files will use COPY even if
the flag is set. Defaults to 1/10 of the maximum content length.
type: int
required: false
- key: ckanext.xloader.parse_dates_dayfirst
default: False
example: False
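The new option's description implies a two-part gate on type guessing. A minimal sketch of the intended size check, mirroring the constants added in jobs.py below; the helper name and file path are assumptions for illustration:

    import os

    MAX_CONTENT_LENGTH = 1_000_000_000
    # A configured value of 0 is falsy, so the documented default of
    # 1/10 of the maximum content length applies.
    MAX_TYPE_GUESSING_LENGTH = 0 or MAX_CONTENT_LENGTH // 10

    def should_type_guess(path, flag_enabled, already_in_datastore):
        # Type guessing runs only when the flag is on, the resource has
        # not been loaded before, and the file is small enough to pass
        # to Tabulator; otherwise COPY is used.
        return (flag_enabled
                and not already_in_datastore
                and os.path.getsize(path) <= MAX_TYPE_GUESSING_LENGTH)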
10 changes: 1 addition & 9 deletions ckanext/xloader/db.py
@@ -191,9 +191,7 @@ def add_pending_job(job_id, job_type, api_key,
if not metadata:
metadata = {}

conn = ENGINE.connect()
trans = conn.begin()
try:
with ENGINE.begin() as conn:
conn.execute(JOBS_TABLE.insert().values(
job_id=job_id,
job_type=job_type,
@@ -225,12 +223,6 @@
)
if inserts:
conn.execute(METADATA_TABLE.insert(), inserts)
trans.commit()
except Exception:
trans.rollback()
raise
finally:
conn.close()


class InvalidErrorObjectError(Exception):
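The rewrite drops the manual begin/commit/rollback/close sequence in favour of SQLAlchemy's transactional context manager. A minimal sketch of the pattern, with an assumed engine URL and table name:

    from sqlalchemy import create_engine, text

    engine = create_engine('postgresql://user:pass@localhost/xloader_jobs')  # assumed DSN

    # engine.begin() opens a connection and a transaction; the transaction
    # commits if the block exits normally, rolls back on any exception, and
    # the connection is closed either way.
    with engine.begin() as conn:
        conn.execute(text("INSERT INTO jobs (job_id) VALUES ('abc123')"))  # assumed table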
14 changes: 9 additions & 5 deletions ckanext/xloader/helpers.py
@@ -28,13 +28,17 @@ def xloader_status_description(status):
return _('Not Uploaded Yet')


def is_resource_supported_by_xloader(res_dict, check_access = True):
def is_resource_supported_by_xloader(res_dict, check_access=True):
is_supported_format = XLoaderFormats.is_it_an_xloader_format(res_dict.get('format'))
is_datastore_active = res_dict.get('datastore_active', False)
user_has_access = not check_access or toolkit.h.check_access('package_update',
{'id':res_dict.get('package_id')})
try:
is_supported_url_type = res_dict.get('url_type') not in toolkit.h.datastore_rw_resource_url_types()
except AttributeError:
is_supported_url_type = (res_dict.get('url_type') == 'upload' or not res_dict.get('url_type'))
url_type = res_dict.get('url_type')
if url_type:
try:
is_supported_url_type = url_type not in toolkit.h.datastore_rw_resource_url_types()
except AttributeError:
is_supported_url_type = (url_type == 'upload')
else:
is_supported_url_type = True
return (is_supported_format or is_datastore_active) and user_has_access and is_supported_url_type
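A short usage sketch of the reworked helper; the resource dict fields are assumptions for illustration:

    # An uploaded CSV that is not yet in the DataStore.
    res_dict = {
        'format': 'CSV',
        'url_type': 'upload',
        'datastore_active': False,
        'package_id': 'example-package',  # assumed id
    }
    # check_access=False skips the package_update permission check,
    # leaving only the format, datastore and url_type tests.
    if is_resource_supported_by_xloader(res_dict, check_access=False):
        print('resource can be submitted to XLoader')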
33 changes: 19 additions & 14 deletions ckanext/xloader/jobs.py
@@ -7,6 +7,7 @@
import tempfile
import json
import datetime
import os
import traceback
import sys

@@ -21,7 +22,7 @@

from . import db, loader
from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError
from .utils import set_resource_metadata
from .utils import datastore_resource_exists, set_resource_metadata

try:
from ckan.lib.api_token import get_user_from_token
@@ -35,10 +36,13 @@
requests.packages.urllib3.disable_warnings()

MAX_CONTENT_LENGTH = int(config.get('ckanext.xloader.max_content_length') or 1e9)
# Don't try Tabulator load on large files
MAX_TYPE_GUESSING_LENGTH = int(config.get('ckanext.xloader.max_type_guessing_length') or MAX_CONTENT_LENGTH / 10)
MAX_EXCERPT_LINES = int(config.get('ckanext.xloader.max_excerpt_lines') or 0)
CHUNK_SIZE = 16 * 1024 # 16kb
DOWNLOAD_TIMEOUT = 30

MAX_RETRIES = 1
RETRYABLE_ERRORS = (
errors.DeadlockDetected,
errors.LockNotAvailable,
@@ -89,18 +93,21 @@ def xloader_data_into_datastore(input):
db.mark_job_as_errored(job_id, str(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
log.error('xloader error: %s, %s', e, traceback.format_exc())
errored = True
except Exception as e:
if isinstance(e, RETRYABLE_ERRORS):
tries = job_dict['metadata'].get('tries', 0)
if tries == 0:
if tries < MAX_RETRIES:
tries = tries + 1
log.info("Job %s failed due to temporary error [%s], retrying", job_id, e)
job_dict['status'] = 'pending'
job_dict['metadata']['tries'] = tries + 1
job_dict['metadata']['tries'] = tries
enqueue_job(
xloader_data_into_datastore,
[input],
title="retry xloader_data_into_datastore: resource: {} attempt {}".format(
job_dict['metadata']['resource_id'], tries),
rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT)
)
return None
@@ -109,7 +116,7 @@ def xloader_data_into_datastore(input):
job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
log.error('xloader error: %s, %s', e, traceback.format_exc())
errored = True
finally:
# job_dict is defined in xloader_hook's docstring
@@ -226,11 +233,12 @@ def tabulator_load():
logger.info('Loading CSV')
# If ckanext.xloader.use_type_guessing is not configured, fall back to
# deprecated ckanext.xloader.just_load_with_messytables
use_type_guessing = asbool(config.get(
'ckanext.xloader.use_type_guessing', config.get(
'ckanext.xloader.just_load_with_messytables', False)))
logger.info("'use_type_guessing' mode is: %s",
use_type_guessing)
use_type_guessing = asbool(
config.get('ckanext.xloader.use_type_guessing', config.get(
'ckanext.xloader.just_load_with_messytables', False))) \
and not datastore_resource_exists(resource['id']) \
and os.path.getsize(tmp_file.name) <= MAX_TYPE_GUESSING_LENGTH
logger.info("'use_type_guessing' mode is: %s", use_type_guessing)
try:
if use_type_guessing:
tabulator_load()
@@ -558,8 +566,7 @@ def __init__(self, task_id, input):
self.input = input

def emit(self, record):
conn = db.ENGINE.connect()
try:
with db.ENGINE.connect() as conn:
# Turn strings into unicode to stop SQLAlchemy
# "Unicode type received non-unicode bind param value" warnings.
message = str(record.getMessage())
@@ -575,8 +582,6 @@ def emit(self, record):
module=module,
funcName=funcName,
lineno=record.lineno))
finally:
conn.close()


class DatetimeJsonEncoder(json.JSONEncoder):
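The retry handling above now expresses the single-retry limit through a MAX_RETRIES constant and gives re-enqueued jobs a descriptive title. A condensed sketch of that control flow; the function and return values are illustrative assumptions:

    MAX_RETRIES = 1

    def on_failure(job_dict, retryable):
        tries = job_dict['metadata'].get('tries', 0)
        if retryable and tries < MAX_RETRIES:
            # Record the attempt once, mark pending, and re-enqueue.
            job_dict['status'] = 'pending'
            job_dict['metadata']['tries'] = tries + 1
            return 'retry'
        job_dict['status'] = 'error'
        return 'errored'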
100 changes: 74 additions & 26 deletions ckanext/xloader/loader.py
@@ -9,15 +9,16 @@
from decimal import Decimal

import psycopg2
from chardet.universaldetector import UniversalDetector
from six.moves import zip
from tabulator import config as tabulator_config, Stream, TabulatorException
from tabulator import config as tabulator_config, EncodingError, Stream, TabulatorException
from unidecode import unidecode

import ckan.plugins as p

from .job_exceptions import FileCouldNotBeLoadedError, LoaderError
from .parser import CSV_SAMPLE_LINES, XloaderCSVParser
from .utils import headers_guess, type_guess
from .parser import CSV_SAMPLE_LINES, TypeConverter
from .utils import datastore_resource_exists, headers_guess, type_guess

from ckan.plugins.toolkit import config

@@ -30,6 +31,52 @@
MAX_COLUMN_LENGTH = 63
tabulator_config.CSV_SAMPLE_LINES = CSV_SAMPLE_LINES

SINGLE_BYTE_ENCODING = 'cp1252'


class UnknownEncodingStream(object):
""" Provides a context manager that wraps a Tabulator stream
and tries multiple encodings if one fails.
This is particularly relevant in cases like Latin-1 encoding,
which is usually ASCII and thus the sample could be sniffed as UTF-8,
only to run into problems later in the file.
"""

def __init__(self, filepath, file_format, decoding_result, **kwargs):
self.filepath = filepath
self.file_format = file_format
self.stream_args = kwargs
self.decoding_result = decoding_result # {'encoding': 'EUC-JP', 'confidence': 0.99}

def __enter__(self):
try:

if (self.decoding_result and self.decoding_result['confidence'] and self.decoding_result['confidence'] > 0.7):
self.stream = Stream(self.filepath, format=self.file_format, encoding=self.decoding_result['encoding'],
** self.stream_args).__enter__()
else:
self.stream = Stream(self.filepath, format=self.file_format, ** self.stream_args).__enter__()

except (EncodingError, UnicodeDecodeError):
self.stream = Stream(self.filepath, format=self.file_format,
encoding=SINGLE_BYTE_ENCODING, **self.stream_args).__enter__()
return self.stream

def __exit__(self, *args):
return self.stream.__exit__(*args)


def detect_encoding(file_path):
detector = UniversalDetector()
with open(file_path, 'rb') as file:
for line in file:
detector.feed(line)
if detector.done:
break
detector.close()
return detector.result # e.g. {'encoding': 'EUC-JP', 'confidence': 0.99}
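
Together, the encoding probe and the wrapper stream are used like this; a minimal usage sketch in which the file path and format are assumptions:

    # Sniff the encoding once up front, then stream the file; the wrapper
    # retries with a single-byte fallback if decoding fails mid-file.
    decoding_result = detect_encoding('/tmp/example.csv')  # assumed path
    with UnknownEncodingStream('/tmp/example.csv', 'csv', decoding_result) as stream:
        for row in stream:
            print(row)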


def _fields_match(fields, existing_fields, logger):
''' Check whether all columns have the same names and types as previously,
@@ -77,15 +124,17 @@ def _clear_datastore_resource(resource_id):
def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
'''Loads a CSV into DataStore. Does not create the indexes.'''

decoding_result = detect_encoding(csv_filepath)
logger.info("load_csv: Decoded encoding: %s", decoding_result)
# Determine the header row
try:
file_format = os.path.splitext(csv_filepath)[1].strip('.')
with Stream(csv_filepath, format=file_format) as stream:
with UnknownEncodingStream(csv_filepath, file_format, decoding_result) as stream:
header_offset, headers = headers_guess(stream.sample)
except TabulatorException:
try:
file_format = mimetype.lower().split('/')[-1]
with Stream(csv_filepath, format=file_format) as stream:
with UnknownEncodingStream(csv_filepath, file_format, decoding_result) as stream:
header_offset, headers = headers_guess(stream.sample)
except TabulatorException as e:
raise LoaderError('Tabulator error: {}'.format(e))
@@ -116,10 +165,16 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
logger.info('Ensuring character coding is UTF8')
f_write = tempfile.NamedTemporaryFile(suffix=file_format, delete=False)
try:
with Stream(csv_filepath, format=file_format, skip_rows=skip_rows) as stream:
stream.save(target=f_write.name, format='csv', encoding='utf-8',
delimiter=delimiter)
csv_filepath = f_write.name
save_args = {'target': f_write.name, 'format': 'csv', 'encoding': 'utf-8', 'delimiter': delimiter}
try:
with UnknownEncodingStream(csv_filepath, file_format, decoding_result,
skip_rows=skip_rows) as stream:
stream.save(**save_args)
except (EncodingError, UnicodeDecodeError):
with Stream(csv_filepath, format=file_format, encoding=SINGLE_BYTE_ENCODING,
skip_rows=skip_rows) as stream:
stream.save(**save_args)
csv_filepath = f_write.name

# datastore db connection
engine = get_write_engine()
@@ -287,16 +342,18 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None):

# Determine the header row
logger.info('Determining column names and types')
decoding_result = detect_encoding(table_filepath)
logger.info("load_table: Decoded encoding: %s", decoding_result)
try:
file_format = os.path.splitext(table_filepath)[1].strip('.')
with Stream(table_filepath, format=file_format,
custom_parsers={'csv': XloaderCSVParser}) as stream:
with UnknownEncodingStream(table_filepath, file_format, decoding_result,
post_parse=[TypeConverter().convert_types]) as stream:
header_offset, headers = headers_guess(stream.sample)
except TabulatorException:
try:
file_format = mimetype.lower().split('/')[-1]
with Stream(table_filepath, format=file_format,
custom_parsers={'csv': XloaderCSVParser}) as stream:
with UnknownEncodingStream(table_filepath, file_format, decoding_result,
post_parse=[TypeConverter().convert_types]) as stream:
header_offset, headers = headers_guess(stream.sample)
except TabulatorException as e:
raise LoaderError('Tabulator error: {}'.format(e))
@@ -332,9 +389,11 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None):
for t, h in zip(types, headers)]

headers = [header.strip()[:MAX_COLUMN_LENGTH] for header in headers if header.strip()]
type_converter = TypeConverter(types=types)

with Stream(table_filepath, format=file_format, skip_rows=skip_rows,
custom_parsers={'csv': XloaderCSVParser}) as stream:
with UnknownEncodingStream(table_filepath, file_format, decoding_result,
skip_rows=skip_rows,
post_parse=[type_converter.convert_types]) as stream:
def row_iterator():
for row in stream:
data_row = {}
@@ -457,17 +516,6 @@ def send_resource_to_datastore(resource_id, headers, records):
.format(str(e)))


def datastore_resource_exists(resource_id):
from ckan import model
context = {'model': model, 'ignore_auth': True}
try:
response = p.toolkit.get_action('datastore_search')(context, dict(
id=resource_id, limit=0))
except p.toolkit.ObjectNotFound:
return False
return response or {'fields': []}


def delete_datastore_resource(resource_id):
from ckan import model
context = {'model': model, 'user': '', 'ignore_auth': True}