-
Notifications
You must be signed in to change notification settings - Fork 2
fill transaction meta on chunk store and simplify full outer join #355
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
05ed9cc to
32e4854
Compare
| sa.Column("process_ts", sa.Float), # Время последней успешной обработки | ||
| sa.Column("is_success", sa.Boolean), # Успешно ли обработана строка | ||
| sa.Column("priority", sa.Integer), # Приоритет обработки (чем больше, тем выше) | ||
| sa.Column("status", sa.String), # Статус исполнения трансформации |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Возможно тогда is_success лишняя колонка
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
удаляю
|
Пока из того что я вижу, тут получится full table scan + cross joins при записи: O(N × T × M) где T - кол-во трансформаций, M - размер связанных таблиц. Вот в чем суть проблемы: Я бы покрыл новую логику |
|
@halconel суть вот в чем. Если на вход трансформации приходит две или больше таблиц, то по transform_keys этот джойн все равно будет исполняться. Ну просто по логике работы трубы. Насколько я понимаю, этот джойн еще нужно делать вручную в коде трансформации, потому что на вход трансформации приходит несколько таблиц и они там внутри джойнятся. И это не перекладывание проблемы из одного места в другое, это просто то, что задается человеком в пайплайне. От этого никуда не уйти. На примере: если есть таблица картинок и таблица моделей классификации, которые надо прогнать по этим картинкам, то тут будет кросс джойн, просто из-за логики того, что хочет человек. Получается, что раньше в этом месте потенциально было две больших операции: кросс джойн из-за логики и большой full outer join, от которого хотим уйти. Теперь большой full outer join уходит, а кросс джойн переносится на этап записи данных в таблицу |
@swsvc Я все еще очень надеюсь, что это будет покрыто нагрузочными тестами и мы увидим, как изменилась производительность при записи. |
|
| assert len(list(tmp_dir.glob("tbl2/*.png"))) == 10 | ||
|
|
||
|
|
||
| @pytest.mark.skip(reason="impossible to trace changes when they happen externally") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Этот тест изменяет данные на диске вручную. Так как изменения в мету трансформаций прилетают только когда эти изменения отлавливаются (а здесь они не отлавливаются), то заскипал тест
| out.write('{"id": "2", "text": "text2"}\n') | ||
|
|
||
|
|
||
| @pytest.mark.skip(reason="impossible to trace changes when they happen externally") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Точто так же как в предыдущем тесте: файл меняется не через трубу а напрямую через file IO. Скипнул
tests/test_table_store_qdrant.py
Outdated
| yield pd.DataFrame({"id": [1], "embedding": [[0.1]], "str_payload": ["foo"], "int_payload": [42]}) | ||
|
|
||
|
|
||
| @pytest.mark.skip(reason="qdrant store cannot read all the rows from the index") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Для корректной работы нужно чтение всего индекса из квадранта, что он делать не умеет. Тест скипнул
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Вот это теперь снова работает
d7b59d9 to
aaa2756
Compare
datapipe/step/batch_transform.py
Outdated
| if not transform_meta_table_exists: | ||
| meta_index = extract_transformation_meta(self.input_dts, self.transform_keys) | ||
| if not meta_index.empty: | ||
| self.meta_table.insert_rows(meta_index) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Я вот здесь не уверен, надо ли это прямо так делать. Может быть лучше это контролируемо через отдельные менеджмент команды?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
я убрал в метод класса и напрямую вызываю в тестах. Потом если будет юзкейс, то станет понятно, как лучше сделать
2153276 to
565deb4
Compare
|
В этом коммите есть реализованный алгоритм записи меты для транзакций полностью на sql. Однако он используется только в одном случае: когда все таблицы, участвующие в трансформации, лежат в одной sql базе данных (postgres, sqlite). Если это не так, то джойн происходит в пандасе (потребляет много памяти, потому что выкачивает таблицы для джойна) |
e9e6b1a to
bd2dbfb
Compare
No description provided.