diff --git a/README.md b/README.md index 42f38f3..bf3eacc 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,9 @@ Automated agents and agentic workflows (like Ralph, AutoGPT, or custom CI/CD bui * **Directory Scanning:** Scan a single file OR an entire folder of specs (Distributed Compliance). * **Compliance Reporting:** Generate executive summaries (`--output compliance`) with % completion metrics. +* **Code-to-Spec Verification (Drift Detection):** 🆕 + * **Reality Checks:** Verify if claims in the Spec (e.g., "Database: Postgres") actually exist in the code (e.g., `requirements.txt`). + * **Drift Reporting:** Flag contradictions between documentation and implementation files. * **Deep Validation:** * **Field Patterns:** Validate specific values (e.g., "Retention: 30 days") using regex. * **Structure:** Ensure sections contain specific subsections (`must_contain`). @@ -31,7 +34,7 @@ Automated agents and agentic workflows (like Ralph, AutoGPT, or custom CI/CD bui * **Gap Severity Model:** Categorizes issues as **CRITICAL**, **HIGH**, **MEDIUM**, or **LOW**. * **SARIF Output:** Native integration with GitHub Advanced Security and GitLab Security Dashboards. * **Exception Management:** Formalize risk acceptance using a `.nodignore` file. -* **Remote Rule Registry:** Securely fetch industry-standard rules via HTTPS with strict SSL verification. +* **Remote Rule Registry:** Securely fetch industry-standard rules via HTTPS with strict SSL verification. * **Community Rules Library:** https://github.com/mraml/nod-rules ## **⚠️ Important Disclaimer** @@ -53,6 +56,7 @@ Don't know what headers strict compliance requires? Let `nod` build the skeleton ``` # Generate a spec with all headers for EU AI Act, NIST, and OWASP nod ai-spec.md --init --rules rules.yaml + ``` ### **2\. Build: Agentic Context Injection (`--export`)** @@ -65,6 +69,7 @@ nod --export --rules rules.yaml # Generate Cursor/Windsurf rules nod --export cursor + ``` ### **3\. Audit: The Gatekeeper** @@ -77,6 +82,7 @@ nod docs/ --strict --min-severity HIGH # Generate Manager Report nod docs/ --output compliance + ``` ### **4\. Maintain: Auto-Fix (`--fix`)** @@ -85,6 +91,7 @@ Did you miss a new requirement? `nod` can append the missing sections for you. ``` nod docs/ --fix --rules rules.yaml + ``` ### **5\. Secure: Integrity Signing** @@ -95,6 +102,7 @@ To verify that an audit result hasn't been tampered with, set the `NOD_SECRET_KE export NOD_SECRET_KEY="my-secret-ci-key" nod ai-spec.md --output json # Output includes "signature": "a1b2c3..." + ``` ### **6\. Baseline: Freeze & Verify** @@ -107,12 +115,55 @@ nod docs/ --freeze # Verify current state against lockfile (CI/CD) nod docs/ --verify + +``` + +## **💡 CLI Power Tips** + +* **Registry Shorthand:** Skip manually downloading files. Use `registry:name` to fetch from the official library. + +``` + nod docs/ --rules registry:owasp-llm +``` + +* + **Silent Mode (`-q`):** Suppress banner art and success messages. Perfect for clean CI logs. + +``` + nod docs/ -q --strict +``` + +* + **File Output (`--save-to`):** Save reports directly to a file without piping. + +``` + nod docs/ --output sarif --save-to report.sarif ``` ## **🧠 Advanced Rule Logic** **nod** supports sophisticated rule definitions in `rules.yaml` to handle complex compliance scenarios. +### **Reality Checks (Drift Detection)** + +Ensure that what is written in the Spec actually exists in the Code. 
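+For instance, an illustrative spec fragment such as:
+
+```
+Database: Postgres
+Isolation: Alpine
+```
+
+can be cross-checked against `requirements.txt` and the `Dockerfile` with the rules below: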
+
+```
+reality_checks:
+  # Check if the DB defined in Spec matches requirements.txt
+  - spec_pattern: "Database:\\s*(\\w+)" # Captures 'Postgres'
+    target_file: "requirements.txt" # Scans this file
+    reality_pattern: "(?i)\\1" # Looks for 'Postgres' (case-insensitive)
+    severity: "HIGH"
+
+  # Check if Isolation claims match Dockerfile
+  - spec_pattern: "Isolation:\\s*(\\w+)" # Captures 'Alpine'
+    target_file: "Dockerfile"
+    reality_pattern: "(?i)FROM.*\\1" # Looks for 'FROM ... Alpine'
+    severity: "CRITICAL"
+
+```
+
 ### **Enforcement Modes**
 
 Control *where* a requirement must appear.
@@ -121,6 +172,7 @@ Control *where* a requirement must appear.
   - id: "## Data Privacy"
     mode: "in_all_files" # Must exist in EVERY file scanned (e.g., footer policy)
     # Default mode is "at_least_one" (Distributed compliance)
+
 ```
 
 ### **Field Validation**
@@ -132,6 +184,7 @@ Go beyond headers. Check for specific content patterns.
     must_match:
      - pattern: "Retention Period: \d+ (days|years)"
        message: "Must specify numeric retention period"
+
 ```
 
 ### **Cross-Reference Validation**
@@ -142,6 +195,7 @@ Ensure traceability between documents (e.g., Threats must have Controls).
 cross_references:
   - source: "Threat T-(\d+)"
     must_have: "Control C-\1"
+
 ```
 
 ## **⚙️ Configuration (`rules.yaml`)**
@@ -172,7 +226,7 @@ jobs:
       # Run nod using the Official Action
       - name: Run nod Gatekeeper
-        uses: mraml/nod@v2.0.0
+        uses: mraml/nod@v2.1.0
         with:
          target: 'docs/'
          rules: 'rules.yaml'
@@ -187,11 +241,12 @@ jobs:
        if: always()
        with:
          sarif_file: nod-results.sarif
+
 ```
 
 ## **🤝 Contributing**
 
 We welcome contributions\! Please see [CONTRIBUTING.md](https://github.com/mraml/nod/blob/main/CONTRIBUTING.md) for details on how to add new rules or features.
 
 If you find **nod** useful for your organization, please consider **starring the repository** to help others find it.
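The `--export-schema` flag introduced in `src/nod/cli.py` below prints the active policy (profiles, requirements, red flags) as a draft-07 JSON Schema. A minimal invocation sketch, assuming the flag is combined with `--rules` the same way the existing `--export` mode is:

```
# Sketch: dump the loaded policy as JSON Schema on stdout and save it
nod --export-schema --rules rules.yaml > nod-policy.schema.json
```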
@@ -201,6 +256,7 @@ Add this to your `README.md` to show if your specs are currently passing the gat ``` ![Nod Gatekeeper](https://github.com///actions/workflows/nod-gatekeeper.yml/badge.svg) + ``` ## **🤖 Transparency** @@ -216,3 +272,5 @@ Apache 2.0 + + diff --git a/src/nod/cli.py b/src/nod/cli.py index 531d878..d680a69 100644 --- a/src/nod/cli.py +++ b/src/nod/cli.py @@ -4,7 +4,7 @@ import json from .config import load_rules, load_ignore from .scanner import Scanner, SEVERITY_MAP -from .generator import gen_template, gen_context, apply_fix +from .generator import gen_template, gen_context, gen_schema, apply_fix from .reporters import gen_sarif, gen_report from .security import sign_attestation, freeze, verify from .utils import Colors, colorize @@ -17,6 +17,7 @@ def main(): parser.add_argument("--init", action="store_true") parser.add_argument("--fix", action="store_true") parser.add_argument("--export", nargs="?", const="context", choices=["context", "cursor", "windsurf"], help="Export context/rules") + parser.add_argument("--export-schema", action="store_true", help="Export active rules as JSON Schema") parser.add_argument("--strict", action="store_true") parser.add_argument("--freeze", action="store_true") parser.add_argument("--verify", action="store_true") @@ -35,6 +36,10 @@ def main(): policy_version = config.get("version", "unknown") ignored = load_ignore(".nodignore") + if args.export_schema: + print(gen_schema(config, policy_version)) + sys.exit(0) + if args.export: print(gen_context(config, policy_version, ignored, args.export)) sys.exit(0) @@ -101,8 +106,6 @@ def main(): min_val = SEVERITY_MAP.get(args.min_severity, 0) for data in results.values(): - # In quiet mode, skip profile headers unless there's a failure inside? - # Or just print failures. Let's print failures only in quiet mode. 
profile_buffer = [] if not args.quiet: profile_buffer.append(f"\n[{colorize(data['label'], Colors.BOLD)}]") @@ -127,7 +130,6 @@ def main(): else: profile_buffer.append(f" {colorize('✅', Colors.GREEN)} [PASS] {name}") - # In quiet mode, only append buffer if there were failures if not args.quiet or has_failures: summary.extend(profile_buffer) @@ -141,7 +143,6 @@ def main(): output_content = "\n".join(summary) - # Check exit code based on severity for non-text outputs too if SEVERITY_MAP.get(max_sev_label, 0) >= SEVERITY_MAP.get(args.min_severity, 0): exit_code = 1 @@ -155,7 +156,6 @@ def main(): print(f"Error saving file: {e}", file=sys.stderr) sys.exit(1) else: - # Only print if there is content (quiet mode with no errors might be empty) if output_content.strip(): print(output_content) diff --git a/src/nod/generator.py b/src/nod/generator.py index b9753a1..c76fcdc 100644 --- a/src/nod/generator.py +++ b/src/nod/generator.py @@ -1,4 +1,5 @@ import sys +import json from typing import Dict, Any from .utils import clean_header @@ -49,6 +50,41 @@ def gen_context(config: Dict[str, Any], policy_version: str, ignored: list, fmt: return "\n".join(lines) +def gen_schema(config: Dict[str, Any], policy_version: str) -> str: + """Generates a JSON Schema representation of the active policy.""" + schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "nod Compliance Policy", + "version": policy_version, + "description": "Active compliance rules loaded by nod.", + "type": "object", + "properties": { + "profiles": { + "type": "object", + "properties": {} + } + } + } + + for name, data in config.get("profiles", {}).items(): + profile_schema = { + "type": "object", + "description": data.get('badge_label', name), + "properties": { + "requirements": {"type": "array", "items": {"type": "string"}}, + "red_flags": {"type": "array", "items": {"type": "string"}} + } + } + + # Populate specific requirements as enums/examples + reqs = [r.get("label") or clean_header(r['id']) for r in data.get("requirements", [])] + if reqs: + profile_schema["properties"]["requirements"]["examples"] = reqs + + schema["properties"]["profiles"]["properties"][name] = profile_schema + + return json.dumps(schema, indent=2) + def apply_fix(path: str, results: Dict[str, Any]) -> None: """Appends missing sections to the target file.""" # Logic to determine target file (if directory passed, create nod-compliance.md) diff --git a/src/nod/reporters.py b/src/nod/reporters.py index 0c57858..42c9a74 100644 --- a/src/nod/reporters.py +++ b/src/nod/reporters.py @@ -39,6 +39,8 @@ def gen_sarif(attestation: Dict[str, Any], path: str) -> Dict[str, Any]: if c.get("control_id"): props["compliance-ref"] = c["control_id"] props["security-severity"] = SARIF_SCORE_MAP.get(c["severity"], "1.0") + if c.get("type") == "contradiction": + props["tags"] = ["drift", "spec-contradiction"] desc = c.get("label") or rule_id rules.append({ @@ -82,11 +84,19 @@ def gen_sarif(attestation: Dict[str, Any], path: str) -> Dict[str, Any]: def gen_report(attestation: Dict[str, Any]) -> str: """Generates a human-readable text report.""" out = [] + contradictions = [] + for data in attestation["results"].values(): chks = data.get("checks", []) pct = int((len([c for c in chks if c["status"] != "FAIL"]) / len(chks) * 100) if chks else 0) out.append(f"{data['label']} Report ({datetime.utcnow().strftime('%Y-%m-%d')})\nStatus: {pct}% Compliant\n") + for c in chks: + # Separate Contradictions/Drift for special section + if c.get("type") == "contradiction" and 
c["status"] == "FAIL": + contradictions.append(f"⚠️ {c['remediation']} (Line {c.get('line')} in {c.get('source')})") + continue + icon = {"FAIL": "❌", "EXCEPTION": "⚪", "SKIPPED": "⏭️"}.get(c["status"], "✅") ref = c.get("article") or c.get("control_id") name = c.get("label") or clean_header(c['id']) @@ -98,4 +108,13 @@ def gen_report(attestation: Dict[str, Any]) -> str: out.append(f" Ev: {c['source']}:{c.get('line')}") out.append("") out.append("-" * 40) + + # Append Drift Report if contradictions found + if contradictions: + out.append("\n" + "="*40) + out.append("📊 POTENTIAL CODE CONTRADICTIONS (DRIFT)") + out.append("="*40) + out.extend(contradictions) + out.append("") + return "\n".join(out) diff --git a/src/nod/scanner.py b/src/nod/scanner.py index 10b8355..cbe2ec5 100644 --- a/src/nod/scanner.py +++ b/src/nod/scanner.py @@ -12,30 +12,77 @@ SEVERITY_MAP = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1, "INFO": 0} class Scanner: + """ + Core engine for scanning files against compliance profiles. + """ + def __init__(self, config: Dict[str, Any], ignored_rules: List[str]): + """ + Initialize the Scanner. + + Args: + config: The loaded rule configuration. + ignored_rules: List of rule IDs or patterns to ignore. + """ self.config = config self.ignored_rules = ignored_rules self.attestation = {} def _collect_files(self, path: str) -> List[str]: + """ + recursively collects files to scan, respecting ignore patterns. + + Args: + path: The directory or file path to scan. + + Returns: + A list of valid file paths. + """ if os.path.isfile(path): return [path] + found = [] + valid_extensions = { + '.md', '.markdown', '.mdx', '.json', '.txt', + '.py', '.js', '.ts', '.yml', '.yaml', '.dockerfile' + } + valid_filenames = {'Dockerfile', 'Makefile'} + for root, dirs, files in os.walk(path): # Performance Fix: Prune directory tree based on ignores - # Modify dirs in-place to prevent os.walk from entering them - dirs[:] = [d for d in dirs if not should_ignore(os.path.join(root, d), self.ignored_rules)] + dirs[:] = [ + d for d in dirs + if not should_ignore(os.path.join(root, d), self.ignored_rules) + ] for f in files: fpath = os.path.join(root, f) if should_ignore(fpath, self.ignored_rules): continue - - if os.path.splitext(f)[1].lower() in {'.md', '.markdown', '.mdx', '.json', '.txt'}: + + # Collect implementation files for Reality Checks, not just specs + ext = os.path.splitext(f)[1].lower() + if ext in valid_extensions or f in valid_filenames: found.append(fpath) return found - def scan_input(self, path: str, strict: bool = False, version: str = "unknown") -> Tuple[Dict[str, Any], str]: + def scan_input( + self, + path: str, + strict: bool = False, + version: str = "unknown" + ) -> Tuple[Dict[str, Any], str]: + """ + Orchestrates the scanning process. + + Args: + path: Target path to scan. + strict: Enforce stricter validation rules. + version: Policy version string. + + Returns: + A tuple containing the results dict and the max severity label. + """ files = self._collect_files(path) if not files: return {"error": f"No files in {path}"}, "NONE" @@ -45,34 +92,46 @@ def scan_input(self, path: str, strict: bool = False, version: str = "unknown") hashes = [] file_map = {} base_dir = path if os.path.isdir(path) else os.path.dirname(path) + + # Determine if we are scanning a single JSON (e.g. 
an API spec) is_single_json = len(files) == 1 and files[0].endswith(".json") default_source = files[0] if is_single_json else None + spec_extensions = ('.md', '.markdown', '.mdx', '.json', '.txt') for file_path in files: try: size = os.path.getsize(file_path) if size > MAX_FILE_SIZE: - print(f"Warning: Skipping {file_path} (Size limit)", file=sys.stderr) + print(f"Warn: Skipping {file_path} (Size)", file=sys.stderr) continue total_size += size if total_size > MAX_TOTAL_SIZE: - return {"error": "Total aggregation size exceeds memory limit"}, "NONE" + return { + "error": "Total aggregation size exceeds memory limit" + }, "NONE" with open(file_path, "r", encoding="utf-8") as f: raw = f.read() file_map[file_path] = raw hashes.append(hashlib.sha256(raw.encode()).hexdigest()) - if is_single_json: - agg_content = raw - else: - agg_content += f"\n\n\n{raw}" + + # Only aggregate "Spec" files for the main compliance audit + if file_path.endswith(spec_extensions): + if is_single_json: + agg_content = raw + else: + agg_content += ( + f"\n\n\n{raw}" + ) except Exception as e: print(f"Warn: {e}", file=sys.stderr) agg_hash = hashlib.sha256("".join(sorted(hashes)).encode()).hexdigest() ext = ".json" if is_single_json else ".md" - results = self._audit(agg_content, ext, strict, base_dir, default_source, file_map) + results = self._audit( + agg_content, ext, strict, base_dir, default_source, file_map + ) max_sev_val = -1 max_sev_label = "NONE" @@ -97,7 +156,16 @@ def scan_input(self, path: str, strict: bool = False, version: str = "unknown") } return results, max_sev_label - def _check_req(self, text: str, ext: str, req: Dict, strict: bool) -> Tuple[bool, int, int, str]: + def _check_req( + self, + text: str, + ext: str, + req: Dict, + strict: bool + ) -> Tuple[bool, int, int, str]: + """ + Validates a single requirement against text content. 
+ """ rule_id = req["id"] passed = False line = 1 @@ -112,10 +180,12 @@ def _check_req(self, text: str, ext: str, req: Dict, strict: bool) -> Tuple[bool if not strict or val.strip(): passed = True for p in req.get("must_match", []): - if p.get("pattern") and not re.search(p["pattern"], val, re.I | re.M): + pat = p.get("pattern") + if pat and not re.search(pat, val, re.I | re.M): passed = False err = p.get('message', 'Value mismatch') - except Exception: pass + except Exception: + pass else: try: match = re.search(rule_id, text, re.I | re.M) @@ -124,83 +194,148 @@ def _check_req(self, text: str, ext: str, req: Dict, strict: bool) -> Tuple[bool line = get_line_number(text, start_idx) passed = True match_str = match.group(0).strip() - level = len(match_str) - len(match_str.lstrip('#')) if match_str.startswith('#') else 0 + + # Calculate header level to define section boundary + stripped = match_str.lstrip('#') + level = len(match_str) - len(stripped) if match_str.startswith('#') else 0 + section = text[match.end():] - next_pattern = r"^#{1," + str(level) + r"}\s" if level else r"^#+\s" + next_pattern = ( + r"^#{1," + str(level) + r"}\s" if level else r"^#+\s" + ) + if next_match := re.search(next_pattern, section, re.M): section = section[:next_match.start()] - if strict and len(section.strip()) <= 15: passed = False + if strict and len(section.strip()) <= 15: + passed = False if passed: - if missing := [s for s in req.get("must_contain", []) if not re.search(re.escape(s), section, re.I)]: + missing = [ + s for s in req.get("must_contain", []) + if not re.search(re.escape(s), section, re.I) + ] + if missing: passed = False err = f"Missing: {', '.join(missing)}" + for p in req.get("must_match", []): - if p.get("pattern") and not re.search(p["pattern"], section, re.I | re.M): + pat = p.get("pattern") + if pat and not re.search(pat, section, re.I | re.M): passed = False err = p.get('message', 'Pattern mismatch') - except re.error: pass + except re.error: + pass return passed, line, start_idx, err - def _audit(self, content: str, ext: str, strict: bool, base: str, def_src: str, fmap: Dict) -> Dict: + def _audit( + self, + content: str, + ext: str, + strict: bool, + base: str, + def_src: str, + fmap: Dict + ) -> Dict: + """ + Performs the main audit loop against all profiles. + """ report = {} for name, data in self.config.get("profiles", {}).items(): checks, skip, added_reqs = [], [], [] + # 1. Evaluate Conditions for c in data.get("conditions", []): try: if re.search(c["if"]["regex_match"], content, re.I | re.M): skip.extend(c["then"].get("skip", [])) for r in c["then"].get("require", []): - if isinstance(r, str): added_reqs.append({"id": r, "severity": "HIGH", "remediation": "Conditional Req"}) - elif isinstance(r, dict): added_reqs.append(r) - except re.error as e: print(f"Warning: Regex error: {e}", file=sys.stderr) + if isinstance(r, str): + added_reqs.append({ + "id": r, + "severity": "HIGH", + "remediation": "Conditional Req" + }) + elif isinstance(r, dict): + added_reqs.append(r) + except re.error as e: + print(f"Warning: Regex error: {e}", file=sys.stderr) + # 2. 
Check Requirements for req in data.get("requirements", []) + added_reqs: rule_id = req["id"] status, passed, line, src = "FAIL", False, 1, def_src remediation = req.get("remediation", "") - if rule_id in skip: status, passed = "SKIPPED", True - elif rule_id in self.ignored_rules: status, passed = "EXCEPTION", True + if rule_id in skip: + status, passed = "SKIPPED", True + elif rule_id in self.ignored_rules: + status, passed = "EXCEPTION", True else: mode = req.get("mode", "at_least_one") if mode == "in_all_files": - # Check EVERY file individually - missing = [os.path.basename(fp) for fp, txt in fmap.items() - if not self._check_req(txt, os.path.splitext(fp)[1], req, strict)[0]] - if missing: remediation = f"Missing in: {', '.join(missing)}. " + remediation - else: status, passed, src = "PASS", True, "all_files" + spec_files = [ + fp for fp in fmap.keys() + if fp.endswith(('.md', '.markdown', '.json')) + ] + missing = [ + os.path.basename(fp) for fp in spec_files + if not self._check_req( + fmap[fp], + os.path.splitext(fp)[1], + req, + strict + )[0] + ] + if missing: + remediation = ( + f"Missing in: {', '.join(missing)}. " + + remediation + ) + else: + status, passed, src = "PASS", True, "all_files" else: - # Logic Fix for Distributed JSON: - # Instead of checking 'agg_content' which might be mangled JSON text, - # iterate through the files in fmap and see if ANY satisfy the req. - # This preserves 'at_least_one' logic without relying on text aggregation for JSON. any_pass = False - - # Optimization: Try the aggregate first for Markdown (fast), but iterate for JSON/Mixed if ext == ".md": - p_ok, ln, idx, err = self._check_req(content, ext, req, strict) + p_ok, ln, idx, err = self._check_req( + content, ext, req, strict + ) if p_ok: status, passed, line = "PASS", True, ln - if not src and idx >= 0: src = resolve_source(content, idx) + if not src and idx >= 0: + src = resolve_source(content, idx) any_pass = True - elif err: remediation = f"{err}. " + remediation + elif err: + remediation = f"{err}. " + remediation - # Fallback for JSON or if aggregate failed (double check individual files) if not any_pass: for fp, txt in fmap.items(): - p_ok, ln, _, _ = self._check_req(txt, os.path.splitext(fp)[1], req, strict) + if not fp.endswith(('.md', '.markdown', '.json')): + continue + f_ext = os.path.splitext(fp)[1] + p_ok, ln, _, _ = self._check_req( + txt, f_ext, req, strict + ) if p_ok: status, passed, line, src = "PASS", True, ln, fp any_pass = True break - checks.append({"id": rule_id, "label": req.get("label"), "passed": passed, "status": status, "severity": req.get("severity", "HIGH"), "remediation": remediation, "source": src, "line": line, "control_id": req.get("control_id"), "article": req.get("article")}) + checks.append({ + "id": rule_id, + "label": req.get("label"), + "passed": passed, + "status": status, + "severity": req.get("severity", "HIGH"), + "remediation": remediation, + "source": src, + "line": line, + "control_id": req.get("control_id"), + "article": req.get("article") + }) - # ... (Red Flags & Cross Refs remain similar, using content for regex is safe for text scan) ... + # 3. 
Check Red Flags for flag in data.get("red_flags", []): rule_id = flag["pattern"] status, passed, line, src = "PASS", True, 1, def_src @@ -208,29 +343,134 @@ def _audit(self, content: str, ext: str, strict: bool, base: str, def_src: str, match = re.search(rule_id, content, re.I | re.M) if match: line = get_line_number(content, match.start()) - if not src: src = resolve_source(content, match.start()) - if rule_id in self.ignored_rules: status = "EXCEPTION" - elif rule_id in skip: status = "SKIPPED" - else: status, passed = "FAIL", False - except re.error: pass - checks.append({"id": rule_id, "label": flag.get("label"), "passed": passed, "status": status, "severity": flag.get("severity", "CRITICAL"), "type": "red_flag", "remediation": flag.get("remediation"), "source": src, "line": line, "control_id": flag.get("control_id"), "article": flag.get("article")}) + if not src: + src = resolve_source(content, match.start()) + + if rule_id in self.ignored_rules: + status = "EXCEPTION" + elif rule_id in skip: + status = "SKIPPED" + else: + status, passed = "FAIL", False + except re.error: + pass + + checks.append({ + "id": rule_id, + "label": flag.get("label"), + "passed": passed, + "status": status, + "severity": flag.get("severity", "CRITICAL"), + "type": "red_flag", + "remediation": flag.get("remediation"), + "source": src, + "line": line, + "control_id": flag.get("control_id"), + "article": flag.get("article") + }) + # 4. Cross-Reference Validation for xr in data.get("cross_references", []): try: for match in re.finditer(xr["source"], content, re.I | re.M): expected = match.expand(xr["must_have"]) line = get_line_number(content, match.start()) passed = expected in content - checks.append({"id": f"XRef: {match.group(0)}->{expected}", "label": "Cross-Reference Validation", "passed": passed, "status": "PASS" if passed else "FAIL", "severity": xr.get("severity", "HIGH"), "remediation": f"Missing {expected}", "line": line, "source": resolve_source(content, match.start(), def_src)}) - except re.error: pass + checks.append({ + "id": f"XRef: {match.group(0)}->{expected}", + "label": "Cross-Reference Validation", + "passed": passed, + "status": "PASS" if passed else "FAIL", + "severity": xr.get("severity", "HIGH"), + "remediation": f"Missing {expected}", + "line": line, + "source": resolve_source( + content, match.start(), def_src + ) + }) + except re.error: + pass + + # 5. 
Reality Checks (Code-to-Spec Verification) + for rc in data.get("reality_checks", []): + try: + # Find the assertion in the Spec + for match in re.finditer(rc["spec_pattern"], content, re.I | re.M): + spec_val = match.group(1) if match.groups() else match.group(0) + + target_pat = rc["reality_pattern"].replace("\\1", spec_val) + target_file_suffix = rc["target_file"] + + target_contents = [] + for fp, txt in fmap.items(): + if fp.endswith(target_file_suffix): + target_contents.append((fp, txt)) + + if not target_contents: + checks.append({ + "id": f"RealityCheck: {spec_val} -> {target_file_suffix}", + "label": "Code-to-Spec Missing File", + "passed": False, + "status": "FAIL", + "severity": rc.get("severity", "MEDIUM"), + "type": "contradiction", + "remediation": f"Spec claims '{spec_val}', but {target_file_suffix} missing.", + "line": get_line_number(content, match.start()), + "source": resolve_source(content, match.start(), def_src) + }) + continue + + found_in_code = False + for fp, txt in target_contents: + if re.search(target_pat, txt, re.I | re.M): + found_in_code = True + break + + checks.append({ + "id": f"RealityCheck: {spec_val}", + "label": "Code-to-Spec Alignment", + "passed": found_in_code, + "status": "PASS" if found_in_code else "FAIL", + "severity": rc.get("severity", "MEDIUM"), + "type": "contradiction", + "remediation": ( + f"Spec claims '{spec_val}', but pattern " + f"'{target_pat}' not found in {target_file_suffix}" + ), + "line": get_line_number(content, match.start()), + "source": resolve_source(content, match.start(), def_src) + }) + + except re.error as e: + print(f"Reality Check Regex Error: {e}", file=sys.stderr) + # 6. Strict Evidence Check if strict and ext != ".json" and ("security" in name or "baseline" in name): for match in re.finditer(r"\[([^\]]+)\]\((?!http)([^)]+)\)", content): path = match.group(2).strip() if not path.startswith("#"): exists = os.path.exists(os.path.join(base, path)) - checks.append({"id": f"Ev: {match.group(1)}", "label": "Evidence Check", "passed": exists, "status": "PASS" if exists else "FAIL", "severity": "MEDIUM", "remediation": f"Missing: {path}", "line": 1, "source": resolve_source(content, match.start(), def_src)}) + checks.append({ + "id": f"Ev: {match.group(1)}", + "label": "Evidence Check", + "passed": exists, + "status": "PASS" if exists else "FAIL", + "severity": "MEDIUM", + "remediation": f"Missing: {path}", + "line": 1, + "source": resolve_source( + content, match.start(), def_src + ) + }) - block = [c for c in checks if c["status"] == "FAIL" and SEVERITY_MAP.get(c["severity"], 0) >= 3] - report[name] = {"label": data.get("badge_label", name), "checks": checks, "passed": not block} + block = [ + c for c in checks + if c["status"] == "FAIL" + and SEVERITY_MAP.get(c["severity"], 0) >= 3 + ] + report[name] = { + "label": data.get("badge_label", name), + "checks": checks, + "passed": not block + } return report diff --git a/templates/.gitlab-ci.yml b/templates/.gitlab-ci.yml new file mode 100644 index 0000000..ba0035f --- /dev/null +++ b/templates/.gitlab-ci.yml @@ -0,0 +1,23 @@ +# nod Compliance Gatekeeper for GitLab CI +# Usage: Copy this file to your repo root or include it in your existing .gitlab-ci.yml + +stages: + - compliance + +nod_check: + stage: compliance + image: python:3.10-slim + script: + - pip install nod-linter + # Scan the current directory + - nod . 
--strict --min-severity HIGH --output sarif --save-to nod-results.sarif + artifacts: + when: always + reports: + # Allows GitLab Ultimate users to see results in the Vulnerability Report + sast: nod-results.sarif + paths: + - nod-results.sarif + rules: + - if: $CI_PIPELINE_SOURCE == "merge_request_event" + - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH diff --git a/templates/azure-pipelines.yml b/templates/azure-pipelines.yml new file mode 100644 index 0000000..1373c51 --- /dev/null +++ b/templates/azure-pipelines.yml @@ -0,0 +1,27 @@ +# nod Compliance Gatekeeper for Azure DevOps +# Usage: Add this as a step in your pipeline job + +jobs: +- job: ComplianceAudit + displayName: 'nod Compliance Check' + pool: + vmImage: 'ubuntu-latest' + + steps: + - task: UsePythonVersion@0 + inputs: + versionSpec: '3.10' + addToPath: true + + - script: | + pip install nod-linter + nod . --strict --min-severity HIGH --output sarif --save-to nod-results.sarif + displayName: 'Run nod Gatekeeper' + + # Publish SARIF for Azure Advanced Security (if enabled) + - task: PublishBuildArtifacts@1 + condition: always() + inputs: + PathtoPublish: 'nod-results.sarif' + ArtifactName: 'CodeAnalysisLogs' + publishLocation: 'Container' diff --git a/tests/test_core.py b/tests/test_core.py index 5e158b2..e791b41 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -30,6 +30,14 @@ def setUp(self): "severity": "CRITICAL", "remediation": "Do not include forbidden text" } + ], + "reality_checks": [ + { + "spec_pattern": "Database: (\\w+)", + "target_file": "requirements.txt", + "reality_pattern": "(?i)\\1", + "severity": "HIGH" + } ] } } @@ -51,44 +59,47 @@ def test_scanner_pass(self): self.assertTrue(checks[0]["passed"]) self.assertEqual(checks[0]["status"], "PASS") - def test_scanner_fail_missing_header(self): - content = "# Wrong Header" - results = self.scanner._audit(content, ".md", strict=True, base_dir=".", def_src="test.md", fmap={}) + def test_reality_check_pass(self): + # Spec says Database: Postgres + spec_content = "# Database\nDatabase: Postgres" + # Requirements has postgres + req_content = "psycopg2-binary\nPostgres==13.0" + + fmap = {"test.md": spec_content, "requirements.txt": req_content} + + results = self.scanner._audit(spec_content, ".md", strict=True, base_dir=".", def_src="test.md", fmap=fmap) checks = results["test_profile"]["checks"] - self.assertFalse(checks[0]["passed"]) - self.assertEqual(checks[0]["status"], "FAIL") + # Find Reality Check + rc = next(c for c in checks if c["id"].startswith("RealityCheck")) + self.assertTrue(rc["passed"]) + self.assertEqual(rc["status"], "PASS") - def test_scanner_fail_deep_validation(self): - # Header present, but value is wrong (ABC instead of number) - content = "# Required Header\nValue: ABC" - results = self.scanner._audit(content, ".md", strict=True, base_dir=".", def_src="test.md", fmap={}) + def test_reality_check_fail(self): + # Spec says Database: Postgres + spec_content = "# Database\nDatabase: Postgres" + # Requirements has MySQL + req_content = "mysql-connector" + + fmap = {"test.md": spec_content, "requirements.txt": req_content} + + results = self.scanner._audit(spec_content, ".md", strict=True, base_dir=".", def_src="test.md", fmap=fmap) checks = results["test_profile"]["checks"] - self.assertFalse(checks[0]["passed"]) - self.assertIn("Must be number", checks[0]["remediation"]) + # Find Reality Check + rc = next(c for c in checks if c["id"].startswith("RealityCheck")) + self.assertFalse(rc["passed"]) + self.assertEqual(rc["status"], "FAIL") + 
self.assertEqual(rc["type"], "contradiction") def test_red_flag_detection(self): content = "Some text with FORBIDDEN_TEXT inside." results = self.scanner._audit(content, ".md", strict=True, base_dir=".", def_src="test.md", fmap={}) checks = results["test_profile"]["checks"] - # Should have 2 checks: 1 Req (Fail) + 1 Red Flag (Fail) - self.assertEqual(len(checks), 2) - flag_check = next(c for c in checks if c["type"] == "red_flag") self.assertFalse(flag_check["passed"]) self.assertEqual(flag_check["severity"], "CRITICAL") - def test_ignore_logic(self): - # Ignore the requirement - self.scanner.ignored_rules = ["#+.*Required Header"] - content = "# Wrong Header" - results = self.scanner._audit(content, ".md", strict=True, base_dir=".", def_src="test.md", fmap={}) - checks = results["test_profile"]["checks"] - - self.assertTrue(checks[0]["passed"]) - self.assertEqual(checks[0]["status"], "EXCEPTION") - if __name__ == '__main__': unittest.main()
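A sketch of one additional test for the drift-detection feature, covering the `Code-to-Spec Missing File` branch of `Scanner._audit` (the spec makes a claim, but the configured `target_file` is absent from the scanned file map). It assumes the same `test_profile` fixture as the tests above; arguments are passed positionally because `_audit` names its fourth parameter `base` rather than `base_dir`:

```
    def test_reality_check_missing_target_file(self):
        # Spec makes a claim, but no requirements.txt is present in the file map
        spec_content = "# Database\nDatabase: Postgres"
        fmap = {"test.md": spec_content}

        # Positional args: _audit's parameter is named 'base', not 'base_dir'
        results = self.scanner._audit(spec_content, ".md", True, ".", "test.md", fmap)
        checks = results["test_profile"]["checks"]

        rc = next(c for c in checks if c["id"].startswith("RealityCheck"))
        self.assertFalse(rc["passed"])
        self.assertEqual(rc["status"], "FAIL")
        self.assertEqual(rc["label"], "Code-to-Spec Missing File")
        self.assertEqual(rc["type"], "contradiction")
```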