🎉 3 years since first release 0.1.0 🎉
Breaking Changes
- Add Python 3.13 support. (#298)
- Change the logic of FileConnection.walk and FileConnection.list_dir. (#327)

  Previously limits.stops_at(path) == True was considered as "return the current file and stop", which could lead to exceeding the limit. Now it means "stop immediately".
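
  The user-visible effect is that limits are no longer exceeded. A minimal sketch, assuming MaxFilesCount is used as the limit (FileDownloader relies on these methods internally):

  ```python
  from onetl.file import FileDownloader
  from onetl.file.limit import MaxFilesCount

  # with the new "stop immediately" semantics, no more than 100 files
  # are returned; the limit is never exceeded
  downloader = FileDownloader(
      ...,
      limits=[MaxFilesCount(100)],
  )
  ```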
- Change default value for FileDFWriter.Options(if_exists=...) from error to append, to make it consistent with other .Options() classes within onETL. (#343)
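
  To keep the previous behavior, the option can be set explicitly. A minimal sketch, assuming a CSV file format and placeholder connection/paths:

  ```python
  from onetl.file import FileDFWriter
  from onetl.file.format import CSV

  # restore the pre-0.13 default by setting if_exists explicitly
  writer = FileDFWriter(
      connection=...,  # any file DataFrame connection, e.g. SparkLocalFS or SparkS3
      format=CSV(),
      target_path="/some/path",  # placeholder path
      options=FileDFWriter.Options(if_exists="error"),
  )
  writer.run(df)  # df is an existing Spark DataFrame
  ```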
Features
- Add support for FileModifiedTimeHWM HWM class (see etl-entities 2.5.0):

  ```python
  from etl_entities.hwm import FileModifiedTimeHWM

  from onetl.file import FileDownloader
  from onetl.strategy import IncrementalStrategy

  downloader = FileDownloader(
      ...,
      hwm=FileModifiedTimeHWM(name="somename"),
  )

  with IncrementalStrategy():
      downloader.run()
  ```
- Introduce FileSizeRange(min=..., max=...) filter class. (#325)

  Now users can set FileDownloader/FileMover to download/move only files within a specific size range:

  ```python
  from onetl.file import FileDownloader
  from onetl.file.filter import FileSizeRange

  downloader = FileDownloader(
      ...,
      filters=[FileSizeRange(min="10KiB", max="1GiB")],
  )
  ```
- Introduce TotalFilesSize(...) limit class. (#326)

  Now users can set FileDownloader/FileMover to stop downloading/moving files after reaching a certain amount of data:

  ```python
  from onetl.file import FileDownloader
  from onetl.file.limit import TotalFilesSize

  downloader = FileDownloader(
      ...,
      limits=[TotalFilesSize("1GiB")],
  )
  ```
- Implement FileModifiedTime(since=..., until=...) file filter. (#330)

  Now users can set FileDownloader/FileMover to download/move only files with a specific file modification time:

  ```python
  from datetime import datetime, timedelta

  from onetl.file import FileDownloader
  from onetl.file.filter import FileModifiedTime

  downloader = FileDownloader(
      ...,
      filters=[FileModifiedTime(until=datetime.now() - timedelta(hours=1))],
  )
  ```
- Add SparkS3.get_exclude_packages() and Kafka.get_exclude_packages() methods. (#341)

  Using them allows skipping the download of dependencies which are not required by the specific connector, or which are already a part of Spark/PySpark:

  ```python
  from pyspark.sql import SparkSession

  from onetl.connection import Kafka, SparkS3

  maven_packages = [
      *SparkS3.get_packages(spark_version="3.5.4"),
      *Kafka.get_packages(spark_version="3.5.4"),
  ]
  exclude_packages = SparkS3.get_exclude_packages() + Kafka.get_exclude_packages()

  spark = (
      SparkSession.builder.appName("spark_app_onetl_demo")
      .config("spark.jars.packages", ",".join(maven_packages))
      .config("spark.jars.excludes", ",".join(exclude_packages))
      .getOrCreate()
  )
  ```
Improvements
- All DB connections opened by JDBC.fetch(...), JDBC.execute(...) or JDBC.check() are immediately closed after the statement is executed. (#334)

  Previously a Spark session with master=local[3] actually opened up to 5 connections to the target DB: one for JDBC.check(), another for the Spark driver interacting with the DB to create tables, and one for each Spark executor. Now at most 4 connections are opened, as JDBC.check() does not hold an open connection.

  This is important for RDBMS like Postgres or Greenplum, where the number of connections is strictly limited and the limit is usually quite low.
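
  For example, a connectivity check or an ad-hoc query no longer keeps the connection open afterwards. A minimal sketch, assuming a Postgres connection with placeholder credentials:

  ```python
  from onetl.connection import Postgres

  postgres = Postgres(
      host="postgres.domain.com",  # placeholder connection details
      user="someuser",
      password="***",
      database="somedb",
      spark=spark,  # an existing SparkSession
  )

  postgres.check()  # connection is closed right after the check
  df = postgres.fetch("SELECT 1 AS value")  # closed right after the query result is fetched
  postgres.execute("DROP TABLE IF EXISTS tmp_table")  # same for DDL/DML statements
  ```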
- Set up ApplicationName (client info) for Clickhouse, MongoDB, MSSQL, MySQL and Oracle. (#339, #248)

  Also update ApplicationName format for Greenplum, Postgres, Kafka and SparkS3. Now all connectors have the same ApplicationName format:

  ```text
  ${spark.applicationId} ${spark.appName} onETL/${onetl.version} Spark/${spark.version}
  ```

  The only connections not sending ApplicationName are Teradata and FileConnection implementations.
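
  The reported client info can be observed on the DB side. A minimal sketch for Postgres, assuming access to the standard pg_stat_activity view:

  ```python
  # "postgres" is an onETL Postgres connection, as in the sketch above;
  # pg_stat_activity is a standard PostgreSQL view exposing client info
  df = postgres.fetch("SELECT application_name, state FROM pg_stat_activity")
  df.show(truncate=False)
  ```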
- Now DB.check() tests connection availability not only on the Spark driver, but also from a Spark executor. (#346)

  This allows failing immediately if the Spark driver host has network access to the target DB, but Spark executors do not.
Bug Fixes
- Avoid suppressing Hive Metastore errors while using DBWriter. (#329)

  Previously this was implemented as:

  ```python
  try:
      spark.sql(f"SELECT * FROM {table}")
      table_exists = True
  except Exception:
      table_exists = False
  ```

  If Hive Metastore was overloaded and responded with an exception, the table was considered non-existing, resulting in a full table override instead of appending or overriding only a subset of partitions.
- Fix using onETL to write data to PostgreSQL or Greenplum instances behind pgbouncer with pool_mode=transaction. (#336)

  Previously Postgres.check() opened a read-only transaction, pgbouncer changed the entire connection type from read-write to read-only, and then DBWriter.run(df) was executed on a read-only connection, producing errors like:

  ```text
  org.postgresql.util.PSQLException: ERROR: cannot execute INSERT in a read-only transaction
  org.postgresql.util.PSQLException: ERROR: cannot execute TRUNCATE TABLE in a read-only transaction
  ```

  Added a workaround by passing readOnly=True to JDBC params for read-only connections, so pgbouncer can distinguish read-only and read-write connections properly.

  After upgrading to onETL 0.13.x or higher, the same error may still appear if pgbouncer still holds read-only connections and returns them for DBWriter. To fix this, users can manually convert a read-only connection to read-write:

  ```python
  postgres.execute("BEGIN READ WRITE;")  # <-- add this line

  DBWriter(...).run(df)
  ```

  After all connections in the pgbouncer pool have been converted from read-only to read-write and the error is gone, this additional line can be removed.
- Fix MSSQL.fetch(...) and MySQL.fetch(...) opening a read-write connection instead of a read-only one. (#337)

  Now this is fixed:
  - MSSQL.fetch(...) establishes the connection with ApplicationIntent=ReadOnly.
  - MySQL.fetch(...) executes a SET SESSION TRANSACTION READ ONLY statement.
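
  So read-only queries issued via .fetch(...) can no longer accidentally modify data. A minimal sketch, assuming an MSSQL connection with placeholder credentials:

  ```python
  from onetl.connection import MSSQL

  mssql = MSSQL(
      host="mssql.domain.com",  # placeholder connection details
      user="someuser",
      password="***",
      database="somedb",
      spark=spark,  # an existing SparkSession
  )

  # runs on a read-only connection (ApplicationIntent=ReadOnly)
  df = mssql.fetch("SELECT TOP 10 * FROM some_table")

  # statements modifying data should go through .execute(), which opens a read-write connection
  mssql.execute("TRUNCATE TABLE some_table")
  ```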
- Fixed passing multiple filters to FileDownloader and FileMover. (#338)

  It was caused by sorting the filters list in an internal logging method, while FileFilter subclasses are not sortable.
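
  A minimal sketch of passing several filters at once (the Glob pattern and size range are placeholders):

  ```python
  from onetl.file import FileDownloader
  from onetl.file.filter import FileSizeRange, Glob

  # combining several filters now works as expected
  downloader = FileDownloader(
      ...,
      filters=[
          Glob("*.csv"),
          FileSizeRange(min="10KiB", max="1GiB"),
      ],
  )
  ```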
- Fix a false warning about a lot of parallel connections to Greenplum. (#342)

  Creating a Spark session with .master("local[5]") may open up to 6 connections to Greenplum (= number of Spark executors + 1 for the driver), but onETL instead used the number of CPU cores on the host as the number of parallel connections. This led to showing a false warning that the number of Greenplum connections is too high, which actually should be the case only if the number of executors is higher than 30.
- Fix MongoDB trying to use the current database name as authSource. (#347)

  Now the default connector value is used, which is the admin database. Previous onETL versions could be fixed by:

  ```python
  from onetl.connection import MongoDB

  mongodb = MongoDB(
      ...,
      database="mydb",
      extra={
          "authSource": "admin",
      },
  )
  ```
Dependencies
- Update DB connectors/drivers to latest versions: (#345)
  - Clickhouse 0.6.5 → 0.7.2
  - MongoDB 10.4.0 → 10.4.1
  - MySQL 9.0.0 → 9.2.0
  - Oracle 23.5.0.24.07 → 23.7.0.25.01
  - Postgres 42.7.4 → 42.7.5
Doc only Changes
- Split large code examples into tabs. (#344)