11#!/usr/bin/env python3
22"""
3- Validate YAML files against their declared JSON Schema.
3+ Validate YAML and JSON files against their declared JSON Schema.
44
5- Looks for a $schema declaration at the top of YAML files and validates
6- the file content against that schema. The schema can be a URL or a local path.
5+ This script first performs a quick text search for $schema in the file.
6+ Only if a schema reference is found does it fully parse the file and validate.
7+
8+ Supported file types: .yml, .yaml, .json
79
810Exit codes:
911 0 - Validation passed (or no schema declared)
1214"""
1315
1416import json
17+ import re
1518import sys
1619import urllib .request
1720import urllib .error
2023try :
2124 import yaml
2225except ImportError :
23- print (json .dumps ({
24- "status" : "error" ,
25- "message" : "PyYAML is not installed. Run: pip install pyyaml"
26- }))
27- sys .exit (2 )
26+ yaml = None
2827
2928try :
3029 import jsonschema
31- from jsonschema import Draft7Validator , ValidationError
30+ from jsonschema import Draft7Validator
3231except ImportError :
3332 print (json .dumps ({
3433 "status" : "error" ,
3736 sys .exit (2 )
3837
3938
40- def load_yaml_file (file_path : str ) -> tuple [dict | list | None , str | None ]:
41- """Load a YAML file and return its contents."""
39+ # Pattern to quickly detect $schema in file content without full parsing
40+ # Matches both JSON ("$schema": "...") and YAML ($schema: ...)
41+ SCHEMA_PATTERN = re .compile (
42+ r'''["']?\$schema["']?\s*[:=]\s*["']?([^"'\s,}\]]+)''' ,
43+ re .IGNORECASE
44+ )
45+
46+
47+ def quick_detect_schema (file_path : str ) -> str | None :
48+ """
49+ Quickly scan file for $schema declaration without full parsing.
50+ Returns the schema reference if found, None otherwise.
51+ """
4252 try :
4353 with open (file_path , "r" , encoding = "utf-8" ) as f :
44- content = yaml .safe_load (f )
45- return content , None
54+ # Read first 4KB - schema should be near the top
55+ content = f .read (4096 )
56+
57+ match = SCHEMA_PATTERN .search (content )
58+ if match :
59+ return match .group (1 ).rstrip ("'\" " )
60+ return None
61+
62+ except Exception :
63+ return None
64+
65+
66+ def parse_file (file_path : str ) -> tuple [dict | list | None , str | None ]:
67+ """Parse a YAML or JSON file and return its contents."""
68+ path = Path (file_path )
69+ suffix = path .suffix .lower ()
70+
71+ try :
72+ with open (file_path , "r" , encoding = "utf-8" ) as f :
73+ content = f .read ()
74+
75+ if suffix == ".json" :
76+ return json .loads (content ), None
77+ elif suffix in (".yml" , ".yaml" ):
78+ if yaml is None :
79+ return None , "PyYAML is not installed. Run: pip install pyyaml"
80+ return yaml .safe_load (content ), None
81+ else :
82+ # Try JSON first, then YAML
83+ try :
84+ return json .loads (content ), None
85+ except json .JSONDecodeError :
86+ if yaml :
87+ return yaml .safe_load (content ), None
88+ return None , f"Unsupported file type: { suffix } "
89+
90+ except json .JSONDecodeError as e :
91+ return None , f"Invalid JSON syntax: { e } "
4692 except yaml .YAMLError as e :
4793 return None , f"Invalid YAML syntax: { e } "
4894 except FileNotFoundError :
@@ -52,7 +98,7 @@ def load_yaml_file(file_path: str) -> tuple[dict | list | None, str | None]:
5298
5399
54100def extract_schema_reference (content : dict ) -> str | None :
55- """Extract the $schema reference from YAML content."""
101+ """Extract the $schema reference from parsed content."""
56102 if not isinstance (content , dict ):
57103 return None
58104 return content .get ("$schema" )
@@ -63,7 +109,7 @@ def fetch_schema_from_url(url: str) -> tuple[dict | None, str | None]:
63109 try :
64110 req = urllib .request .Request (
65111 url ,
66- headers = {"User-Agent" : "yaml- schema-validator/1.0" }
112+ headers = {"User-Agent" : "schema-validator/1.0" }
67113 )
68114 with urllib .request .urlopen (req , timeout = 30 ) as response :
69115 schema_content = response .read ().decode ("utf-8" )
@@ -72,24 +118,26 @@ def fetch_schema_from_url(url: str) -> tuple[dict | None, str | None]:
72118 try :
73119 return json .loads (schema_content ), None
74120 except json .JSONDecodeError :
75- try :
76- return yaml .safe_load (schema_content ), None
77- except yaml .YAMLError as e :
78- return None , f"Invalid schema format at URL: { e } "
121+ if yaml :
122+ try :
123+ return yaml .safe_load (schema_content ), None
124+ except yaml .YAMLError as e :
125+ return None , f"Invalid schema format at URL: { e } "
126+ return None , "Invalid JSON schema format at URL"
79127
80128 except urllib .error .URLError as e :
81129 return None , f"Failed to fetch schema from URL: { e } "
82130 except Exception as e :
83131 return None , f"Error fetching schema: { e } "
84132
85133
86- def load_schema_from_path (schema_path : str , yaml_file_path : str ) -> tuple [dict | None , str | None ]:
134+ def load_schema_from_path (schema_path : str , source_file_path : str ) -> tuple [dict | None , str | None ]:
87135 """Load a JSON Schema from a local file path."""
88- # Resolve relative paths relative to the YAML file's directory
136+ # Resolve relative paths relative to the source file's directory
89137 path = Path (schema_path )
90138 if not path .is_absolute ():
91- yaml_dir = Path (yaml_file_path ).parent
92- path = yaml_dir / path
139+ source_dir = Path (source_file_path ).parent
140+ path = source_dir / path
93141
94142 path = path .resolve ()
95143
@@ -104,21 +152,23 @@ def load_schema_from_path(schema_path: str, yaml_file_path: str) -> tuple[dict |
104152 try :
105153 return json .loads (content ), None
106154 except json .JSONDecodeError :
107- try :
108- return yaml .safe_load (content ), None
109- except yaml .YAMLError as e :
110- return None , f"Invalid schema format: { e } "
155+ if yaml :
156+ try :
157+ return yaml .safe_load (content ), None
158+ except yaml .YAMLError as e :
159+ return None , f"Invalid schema format: { e } "
160+ return None , "Invalid JSON schema format"
111161
112162 except Exception as e :
113163 return None , f"Error reading schema file: { e } "
114164
115165
def load_schema(schema_ref: str, source_file_path: str) -> tuple[dict | None, str | None]:
    """
    Load a schema from either a URL or local path.

    Args:
        schema_ref: The schema reference. References beginning with
            "http://" or "https://" are fetched as URLs; anything else is
            treated as a filesystem path.
        source_file_path: Path of the file that declared the schema,
            forwarded to the local-path loader.

    Returns:
        The ``(schema, error)`` pair produced by the loader that was used.
    """
    # URI schemes are case-insensitive, so compare against a lowercased copy
    # while passing the reference through unchanged.
    if schema_ref.lower().startswith(("http://", "https://")):
        return fetch_schema_from_url(schema_ref)
    return load_schema_from_path(schema_ref, source_file_path)
122172
123173
124174def validate_against_schema (content : dict , schema : dict ) -> list [dict ]:
@@ -146,14 +196,25 @@ def main():
146196 if len (sys .argv ) < 2 :
147197 print (json .dumps ({
148198 "status" : "error" ,
149- "message" : "Usage: validate_yaml_schema .py <file.yaml>"
199+ "message" : "Usage: validate_schema .py <file.yaml|file.json >"
150200 }))
151201 sys .exit (2 )
152202
153203 file_path = sys .argv [1 ]
154204
155- # Load the YAML file
156- content , error = load_yaml_file (file_path )
205+ # Step 1: Quick detection - scan for $schema without parsing
206+ quick_schema = quick_detect_schema (file_path )
207+ if not quick_schema :
208+ # No schema found in quick scan - pass without full parsing
209+ print (json .dumps ({
210+ "status" : "pass" ,
211+ "file" : file_path ,
212+ "message" : "No $schema declared, skipping validation"
213+ }))
214+ sys .exit (0 )
215+
216+ # Step 2: Schema detected - now do full parsing
217+ content , error = parse_file (file_path )
157218 if error :
158219 print (json .dumps ({
159220 "status" : "error" ,
@@ -162,18 +223,18 @@ def main():
162223 }))
163224 sys .exit (2 )
164225
165- # Check for $ schema reference
226+ # Get the actual schema reference from parsed content
166227 schema_ref = extract_schema_reference (content )
167228 if not schema_ref :
168- # No schema declared - pass silently
229+ # Quick scan found something but it wasn't actually a $schema field
169230 print (json .dumps ({
170231 "status" : "pass" ,
171232 "file" : file_path ,
172233 "message" : "No $schema declared, skipping validation"
173234 }))
174235 sys .exit (0 )
175236
176- # Load the schema
237+ # Step 3: Load the schema
177238 schema , error = load_schema (schema_ref , file_path )
178239 if error :
179240 print (json .dumps ({
@@ -184,7 +245,7 @@ def main():
184245 }))
185246 sys .exit (2 )
186247
187- # Validate
248+ # Step 4: Validate
188249 errors = validate_against_schema (content , schema )
189250
190251 if not errors :
0 commit comments