11import re
2- from typing import Any , Optional
2+ from typing import Optional
33
44from pydantic import BaseModel , Field , field_validator , model_validator
55
6- from ..errors import InvalidDataTypeMappingError
76from .base import CaseInsensitiveStrEnum
8- from .data_types import kafka_to_clickhouse_data_type_mappings
7+ from .filter import FilterConfig , FilterConfigPatch
98from .join import JoinConfig , JoinConfigPatch
9+ from .metadata import MetadataConfig
10+ from .schema import Schema
1011from .sink import SinkConfig , SinkConfigPatch
1112from .source import SourceConfig , SourceConfigPatch
1213
@@ -24,11 +25,15 @@ class PipelineStatus(CaseInsensitiveStrEnum):
2425
2526
2627class PipelineConfig (BaseModel ):
28+ version : str = Field (default = "2.0.0" )
2729 pipeline_id : str
2830 name : Optional [str ] = Field (default = None )
2931 source : SourceConfig
3032 join : Optional [JoinConfig ] = Field (default = JoinConfig ())
33+ filter : Optional [FilterConfig ] = Field (default = FilterConfig ())
34+ metadata : Optional [MetadataConfig ] = Field (default = MetadataConfig ())
3135 sink : SinkConfig
36+ pipeline_schema : Schema = Field (alias = "schema" )
3237
3338 @field_validator ("pipeline_id" )
3439 @classmethod
@@ -48,147 +53,55 @@ def validate_pipeline_id(cls, v: str) -> str:
4853 return v
4954
5055 @model_validator (mode = "after" )
51- def set_pipeline_name (self ) -> "PipelineConfig" :
56+ def validate_config (self ) -> "PipelineConfig" :
5257 """
53- If name is not provided, use the pipeline_id and replace hyphens
54- with spaces.
58+ Set pipeline name if not provided and validate configuration.
5559 """
60+ # Set pipeline name if not provided
5661 if self .name is None :
5762 self .name = self .pipeline_id .replace ("-" , " " ).title ()
58- return self
5963
60- @field_validator ("join" )
61- @classmethod
62- def validate_join_config (
63- cls ,
64- v : Optional [JoinConfig ],
65- info : Any ,
66- ) -> Optional [JoinConfig ]:
67- if not v or not v .enabled :
68- return v
69-
70- # Get the source topics from the parent model's data
71- source = info .data .get ("source" , {})
72- if isinstance (source , dict ):
73- source_topics = source .get ("topics" , [])
74- else :
75- source_topics = source .topics
76- if not source_topics :
77- return v
78-
79- # Validate each source in the join config
80- for source in v .sources :
81- # Check if source_id exists in any topic
82- source_exists = any (
83- topic .name == source .source_id for topic in source_topics
84- )
85- if not source_exists :
64+ # Validate schema
65+ topic_names = [topic .name for topic in self .source .topics ]
66+ for field in self .pipeline_schema .fields :
67+ if field .source_id not in topic_names :
8668 raise ValueError (
87- f"Source ID ' { source .source_id } ' does not exist in any topic"
69+ f"Source ' { field .source_id } ' does not exist in any topic"
8870 )
8971
90- # Find the topic and check if join_key exists in its schema
91- topic = next (( t for t in source_topics if t . name == source .source_id ), None )
92- if not topic :
72+ # Validate deduplication ID fields
73+ for topic in self . source .topics :
74+ if topic . deduplication is None or not topic . deduplication . enabled :
9375 continue
9476
95- field_exists = any (
96- field .name == source .join_key for field in topic .event_schema .fields
97- )
98- if not field_exists :
77+ if not self .pipeline_schema .is_field_in_schema (
78+ topic .deduplication .id_field , topic .name
79+ ):
9980 raise ValueError (
100- f"Join key '{ source . join_key } ' does not exist in source "
101- f"' { source . source_id } ' schema "
81+ f"Deduplication id_field '{ topic . deduplication . id_field } ' "
82+ f"not found in schema from source ' { topic . name } ' "
10283 )
10384
104- return v
105-
106- @field_validator ("sink" )
107- @classmethod
108- def validate_sink_config (cls , v : SinkConfig , info : Any ) -> SinkConfig :
109- # Get the source topics from the parent model's data
110- source = info .data .get ("source" , {})
111- if isinstance (source , dict ):
112- source_topics = source .get ("topics" , [])
113- else :
114- source_topics = source .topics
115- if not source_topics :
116- return v
117-
118- # Validate each table mapping
119- for mapping in v .table_mapping :
120- # Check if source_id exists in any topic
121- source_exists = any (
122- topic .name == mapping .source_id for topic in source_topics
123- )
124- if not source_exists :
125- raise ValueError (
126- f"Source ID '{ mapping .source_id } ' does not exist in any topic"
127- )
128-
129- # Find the topic and check if field_name exists in its schema
130- topic = next (
131- (t for t in source_topics if t .name == mapping .source_id ), None
132- )
133- if not topic :
134- continue
135-
136- field_exists = any (
137- field .name == mapping .field_name for field in topic .event_schema .fields
138- )
139- if not field_exists :
140- raise ValueError (
141- f"Field '{ mapping .field_name } ' does not exist in source "
142- f"'{ mapping .source_id } ' event schema"
143- )
144-
145- return v
146-
147- @field_validator ("sink" )
148- @classmethod
149- def validate_data_type_compatibility (cls , v : SinkConfig , info : Any ) -> SinkConfig :
150- # Get the source topics from the parent model's data
151- source = info .data .get ("source" , {})
152- if isinstance (source , dict ):
153- source_topics = source .get ("topics" , [])
154- else :
155- source_topics = source .topics
156- if not source_topics :
157- return v
158-
159- # Validate each table mapping
160- for mapping in v .table_mapping :
161- # Find the topic
162- topic = next (
163- (t for t in source_topics if t .name == mapping .source_id ), None
164- )
165- if not topic :
166- continue
167-
168- # Find the source field
169- source_field = next (
170- (f for f in topic .event_schema .fields if f .name == mapping .field_name ),
171- None ,
172- )
173- if not source_field :
174- continue
175-
176- # Get the source and target data types
177- source_type = source_field .type
178- target_type = mapping .column_type
85+ # Validate join configuration
86+ if self .join and self .join .enabled :
87+ # Validate each source in the join config
88+ for join_source in self .join .sources :
89+ if join_source .source_id not in topic_names :
90+ raise ValueError (
91+ f"Join source '{ join_source .source_id } ' does not exist in any "
92+ "topic"
93+ )
94+
95+ if not self .pipeline_schema .is_field_in_schema (
96+ join_source .join_key ,
97+ join_source .source_id ,
98+ ):
99+ raise ValueError (
100+ f"Join key '{ join_source .join_key } ' does not exist in source "
101+ f"'{ join_source .source_id } ' schema"
102+ )
179103
180- # Check if the target type is compatible with the source type
181- compatible_types = kafka_to_clickhouse_data_type_mappings .get (
182- source_type , []
183- )
184- if target_type not in compatible_types :
185- raise InvalidDataTypeMappingError (
186- f"Data type '{ target_type } ' is not compatible with source type "
187- f"'{ source_type } ' for field '{ mapping .field_name } ' in source "
188- f"'{ mapping .source_id } '"
189- )
190-
191- return v
104+ return self
192105
193106 def update (self , config_patch : "PipelineConfigPatch" ) -> "PipelineConfig" :
194107 """
@@ -217,15 +130,31 @@ def update(self, config_patch: "PipelineConfigPatch") -> "PipelineConfig":
217130 config_patch .join
218131 )
219132
133+ # Update filter if provided
134+ if config_patch .filter is not None :
135+ updated_config .filter = (updated_config .filter or FilterConfig ()).update (
136+ config_patch .filter
137+ )
138+
220139 # Update sink if provided
221140 if config_patch .sink is not None :
222141 updated_config .sink = updated_config .sink .update (config_patch .sink )
223142
143+ # Update schema if provided
144+ if config_patch .pipeline_schema is not None :
145+ updated_config .pipeline_schema = config_patch .pipeline_schema
146+
147+ if config_patch .metadata is not None :
148+ updated_config .metadata = config_patch .metadata
149+
224150 return updated_config
225151
226152
227153class PipelineConfigPatch (BaseModel ):
228154 name : Optional [str ] = Field (default = None )
229- source : Optional [SourceConfigPatch ] = Field (default = None )
230155 join : Optional [JoinConfigPatch ] = Field (default = None )
156+ filter : Optional [FilterConfigPatch ] = Field (default = None )
157+ metadata : Optional [MetadataConfig ] = Field (default = None )
158+ pipeline_schema : Optional [Schema ] = Field (default = None , alias = "schema" )
231159 sink : Optional [SinkConfigPatch ] = Field (default = None )
160+ source : Optional [SourceConfigPatch ] = Field (default = None )
0 commit comments