Senior Data Engineer Production-grade data engineering skill for building scalable, reliable data systems. Table of Contents 1. Trigger Phrases 2. Quick Start 3. Workflows 4. Architecture Decision Framework 5. Tech Stack 6. Reference Documentation 7. Troubleshooting --- Trigger Phrases Activate this skill when you see: Pipeline Design: - "Design a data pipeline for..." - "Build an ETL/ELT process..." - "How should I ingest data from..." - "Set up data extraction from..." Architecture: - "Should I use batch or streaming?" - "Lambda vs Kappa architecture" - "How to handle late-arriving data" -…

        'uuid': r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$',
        'phone': r'^\+?[\d\s\-\(\)]{10,}$',
        'url': r'^https?://[^\s]+$',
        'ipv4': r'^(\d{1,3}\.){3}\d{1,3}$',
        'date_iso': r'^\d{4}-\d{2}-\d{2}$',
        'datetime_iso': r'^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}',
        'credit_card': r'^\d{4}[\s\-]?\d{4}[\s\-]?\d{4}[\s\-]?\d{4}$',
    }

    @classmethod
    def detect_type(cls, values: List[str]) -> str:
        """Detect the most likely data type from a sample of values"""
        non_empty = [v for v in values if v and v.strip()]
        if not non_empty:
            return "string"

        # Check for patterns first: a pattern wins if > 90% of values match it
        for pattern_name, pattern in cls.PATTERNS.items():
            regex = re.compile(pattern, re.IGNORECASE)
            matches = sum(1 for v in non_empty if regex.match(v.strip()))
            if matches / len(non_empty) > 0.9:
                return pattern_name

        # Check for numeric types
        int_count = 0
        float_count = 0
        bool_count = 0

        for v in non_empty:
            v = v.strip()
            if v.lower() in ('true', 'false', 'yes', 'no', '1', '0'):
                bool_count += 1
            try:
                int(v)
                int_count += 1
            except ValueError:
                try:
                    float(v)
                    float_count += 1
                except ValueError:
                    pass

        if bool_count / len(non_empty) > 0.9:
            return "boolean"
        if int_count / len(non_empty) > 0.9:
            return "integer"
        if (int_count + float_count) / len(non_empty) > 0.9:
            return "float"

        return "string"

    @classmethod
    def detect_pattern(cls, values: List[str]) -> Optional[str]:
        """Try to detect a common pattern in string values"""
        non_empty = [v for v in values if v and v.strip()]
        if not non_empty or len(non_empty) < 10:
            return None

        for pattern_name, pattern in cls.PATTERNS.items():
            regex = re.compile(pattern, re.IGNORECASE)
            matches = sum(1 for v in non_empty if regex.match(v.strip()))
            if matches / len(non_empty) > 0.8:
                return pattern_name

        return None


# =============================================================================
# Validators
# =============================================================================

class BaseValidator(ABC):
    """Base class for validators"""

    @abstractmethod
    def validate(self, data: List[Dict], schema: Optional[DataSchema] = None) -> List[ValidationResult]:
        pass


class SchemaValidator(BaseValidator):
    """Validate data against a schema"""

    def validate(self, data: List[Dict], schema: DataSchema) -> List[ValidationResult]:
        results = []

        if not data:
            results.append(ValidationResult(
                check_name="data_not_empty",
                column=None,
                passed=False,
                expected="non-empty dataset",
                actual="empty dataset",
                severity="error",
                message="Dataset is empty"
            ))
            return results

        # Validate row count
        row_count = len(data)
        if schema.row_count_min and row_count < schema.row_count_min:
            results.append(ValidationResult(
                check_name="row_count_min",
                column=None,
                passed=False,
                expected=f">= {schema.row_count_min}",
                actual=row_count,
                severity="error",
                message=f"Row count {row_count} is below minimum {schema.row_count_min}"
            ))

        if schema.row_count_max and row_count > schema.row_count_max:
            results.append(ValidationResult(
                check_name="row_count_max",
                column=None,
                passed=False,
                expected=f"<= {schema.row_count_max}",
                actual=row_count,
                severity="warning",
                message=f"Row count {row_count} exceeds maximum {schema.row_count_max}"
            ))

        # Validate each column
        for col_schema in schema.columns:
            col_results = self._validate_column(data, col_schema)
            results.extend(col_results)

        # Validate primary key uniqueness
        if schema.primary_key:
            pk_results = self._validate_primary_key(data, schema.primary_key)
            results.extend(pk_results)

        return results

    def _validate_column(self, data: List[Dict], col_schema: ColumnSchema) -> List[ValidationResult]:
        results = []
        col_name = col_schema.name

        # Check column exists
        if data and col_name not in data[0]:
            results.append(ValidationResult(
                check_name="column_exists",
                column=col_name,
                passed=False,
                expected="column present",
                actual="column missing",
                severity="error",
                message=f"Column '{col_name}' not found in data"
            ))
            return results

        values = [row.get(col_name) for row in data]
        failed_rows = []

        # Null check
        null_count = sum(1 for v in values if v is None or v == '')
        if not col_schema.nullable and null_count > 0:
            failed_rows = [i for i, v in enumerate(values) if v is None or v == '']
            results.append(ValidationResult(
                check_name="not_null",
                column=col_name,
                passed=False,
                expected="no nulls",
                actual=f"{null_count} nulls",
                severity="error",
                message=f"Column '{col_name}' has {null_count} null values but is not nullable",
                failed_rows=failed_rows[:100]  # Limit to first 100
            ))

        non_null_values = [v for v in values if v is not None and v != '']
        # Position in `data` of each entry in non_null_values. The helper
        # validators below return indices into non_null_values, so they are
        # mapped back through this list before being reported as failed_rows.
        non_null_rows = [i for i, v in enumerate(values) if v is not None and v != '']

        # Uniqueness check
        if col_schema.unique and non_null_values:
            unique_count = len(set(non_null_values))
            if unique_count != len(non_null_values):
                duplicate_values = [v for v, count in Counter(non_null_values).items() if count > 1]
                results.append(ValidationResult(
                    check_name="unique",
                    column=col_name,
                    passed=False,
                    expected="all unique",
                    actual=f"{len(non_null_values) - unique_count} duplicates",
                    severity="error",
                    message=f"Column '{col_name}' has duplicate values: {duplicate_values[:5]}"
                ))

        # Type validation
        type_failures = self._validate_type(non_null_values, col_schema.data_type)
        if type_failures:
            results.append(ValidationResult(
                check_name="data_type",
                column=col_name,
                passed=False,
                expected=col_schema.data_type,
                actual=f"{len(type_failures)} invalid values",
                severity="error",
                message=f"Column '{col_name}' has {len(type_failures)} values not matching type {col_schema.data_type}",
                failed_rows=[non_null_rows[j] for j in type_failures[:100]]
            ))

        # Range validation for numeric columns
        if col_schema.min_value is not None or col_schema.max_value is not None:
            range_failures = self._validate_range(non_null_values, col_schema)
            if range_failures:
                results.append(ValidationResult(
                    check_name="value_range",
                    column=col_name,
                    passed=False,
                    expected=f"[{col_schema.min_value}, {col_schema.max_value}]",
                    actual=f"{len(range_failures)} out of range",
                    severity="error",
                    message=f"Column '{col_name}' has values outside range",
                    failed_rows=[non_null_rows[j] for j in range_failures[:100]]
                ))

        # Length validation for string columns
        if col_schema.min_length is not None or col_schema.max_length is not None:
            length_failures = self._validate_length(non_null_values, col_schema)
            if length_failures:
                results.append(ValidationResult(
                    check_name="string_length",
                    column=col_name,
                    passed=False,
                    expected=f"length [{col_schema.min_length}, {col_schema.max_length}]",
                    actual=f"{len(length_failures)} out of range",
                    severity="warning",
                    message=f"Column '{col_name}' has values with invalid length",
                    failed_rows=[non_null_rows[j] for j in length_failures[:100]]
                ))

        # Pattern validation
        if col_schema.pattern:
            pattern_failures = self._validate_pattern(non_null_values, col_schema.pattern)
            if pattern_failures:
                results.append(ValidationResult(
                    check_name="pattern_match",
                    column=col_name,
                    passed=False,
                    expected=f"matches {col_schema.pattern}",
                    actual=f"{len(pattern_failures)} non-matching",
                    severity="error",
                    message=f"Column '{col_name}' has values not matching pattern",
                    failed_rows=[non_null_rows[j] for j in pattern_failures[:100]]
                ))

        # Allowed values validation
        if col_schema.allowed_values:
            # Compare as strings on both sides so that non-string allowed
            # values (e.g. integers) still match the stringified data values.
            allowed_set = {str(a) for a in col_schema.allowed_values}
            invalid = [i for i, v in enumerate(non_null_values) if str(v) not in allowed_set]
            if invalid:
                results.append(ValidationResult(
                    check_name="allowed_values",
                    column=col_name,
                    passed=False,
                    expected=f"one of {col_schema.allowed_values}",
                    actual=f"{len(invalid)} invalid values",
                    severity="error",
                    message=f"Column '{col_name}' has values not in allowed list",
                    failed_rows=[non_null_rows[j] for j in invalid[:100]]
                ))

        return results

    def _validate_type(self, values: List[Any], expected_type: str) -> List[int]:
        """Return indices of values that don't match expected type"""
        failures = []

        for i, v in enumerate(values):
            v_str = str(v)
            valid = False

            if expected_type == "integer":
                try:
                    int(v_str)
                    valid = True
                except ValueError:
                    pass
            elif expected_type == "float":
                try:
                    float(v_str)
                    valid = True
                except ValueError:
                    pass
            elif expected_type == "boolean":
                valid = v_str.lower() in ('true', 'false', 'yes', 'no', '1', '0')
            elif expected_type == "email":
                valid = bool(re.match(TypeDetector.PATTERNS['email'], v_str, re.IGNORECASE))
            elif expected_type == "uuid":
                valid = bool(re.match(TypeDetector.PATTERNS['uuid'], v_str, re.IGNORECASE))
            elif expected_type in ("date", "date_iso"):
                valid = bool(re.match(TypeDetector.PATTERNS['date_iso'], v_str))
            elif expected_type in ("datetime", "datetime_iso"):
                valid = bool(re.match(TypeDetector.PATTERNS['datetime_iso'], v_str))
            else:
                valid = True  # string accepts anything

            if not valid:
                failures.append(i)

        return failures

    def _validate_range(self, values: List[Any], col_schema: ColumnSchema) -> List[int]:
        """Return indices of values outside the specified range"""
        failures = []
        for i, v in enumerate(values):
            try:
                num = float(v)
                if col_schema.min_value is not None and num < col_schema.min_value:
                    failures.append(i)
                elif col_schema.max_value is not None and num > col_schema.max_value:
                    failures.append(i)
            except (ValueError, TypeError):
                pass
        return failures

    def _validate_length(self, values: List[Any], col_schema: ColumnSchema) -> List[int]:
        """Return indices of values with invalid string length"""
        failures = []
        for i, v in enumerate(values):
            length = len(str(v))
            if col_schema.min_length is not None and length < col_schema.min_length:
                failures.append(i)
            elif col_schema.max_length is not None and length > col_schema.max_length:
                failures.append(i)
        return failures

    def _validate_pattern(self, values: List[Any], pattern: str) -> List[int]:
        """Return indices of values not matching the pattern"""
        regex = re.compile(pattern)
        return [i for i, v in enumerate(values) if not regex.match(str(v))]

    def _validate_primary_key(self, data: List[Dict], pk_columns: List[str]) -> List[ValidationResult]:
        """Validate primary key uniqueness"""
        results = []
        pk_values = []

        for row in data:
            pk = tuple(row.get(col) for col in pk_columns)
            pk_values.append(pk)

        pk_counts = Counter(pk_values)
        duplicates = {pk: count for pk, count in pk_counts.items() if count > 1}

        if duplicates:
            results.append(ValidationResult(
                check_name="primary_key_unique",
                column=",".join(pk_columns),
                passed=False,
                expected="all unique",
                actual=f"{len(duplicates)} duplicate keys",
                severity="error",
                message=f"Primary key has {len(duplicates)} duplicate combinations"
            ))

        return results


class AnomalyDetector(BaseValidator):
    """Detect anomalies in data"""

    def __init__(self, z_threshold: float = 3.0, iqr_multiplier: float = 1.5):
        self.z_threshold = z_threshold
        self.iqr_multiplier = iqr_multiplier

    def validate(self, data: List[Dict], schema: Optional[DataSchema] = None) -> List[ValidationResult]:
        results = []

        if not data:
            return results

        # Get numeric columns (judged on the first 100 non-null values)
        numeric_columns = []
        for col in data[0].keys():
            values = [row.get(col) for row in data]
            non_null = [v for v in values if v is not None and v != '']
            try:
                [float(v) for v in non_null[:100]]
                numeric_columns.append(col)
            except (ValueError, TypeError):
                pass

        for col in numeric_columns:
            col_results = self._detect_numeric_anomalies(data, col)
            results.extend(col_results)

        return results

    def _detect_numeric_anomalies(self, data: List[Dict], column: str) -> List[ValidationResult]:
        results = []

        # Keep (row index, value) pairs so that failed_rows refers to rows in
        # `data`, not to positions in the filtered list of numeric values.
        indexed_values = []
        for row_idx, row in enumerate(data):
            v = row.get(column)
            if v is not None and v != '':
                try:
                    indexed_values.append((row_idx, float(v)))
                except (ValueError, TypeError):
                    pass
        values = [v for _, v in indexed_values]

        if len(values) < 10:
            return results

        # Z-score method
        mean = statistics.mean(values)
        std = statistics.stdev(values) if len(values) > 1 else 0

        if std > 0:
            z_outliers = []
            for row_idx, v in indexed_values:
                z_score = abs((v - mean) / std)
                if z_score > self.z_threshold:
                    z_outliers.append((row_idx, v, z_score))

            if z_outliers:
                results.append(ValidationResult(
                    check_name="z_score_outlier",
                    column=column,
                    passed=len(z_outliers) == 0,
                    expected=f"z-score <= {self.z_threshold}",
                    actual=f"{len(z_outliers)} outliers",
                    severity="warning",
                    message=f"Column '{column}' has {len(z_outliers)} statistical outliers (z-score method)",
                    failed_rows=[o[0] for o in z_outliers[:100]]
                ))

        # IQR method: quartiles are approximated by indexing into the sorted
        # values (no interpolation between neighboring points)
        sorted_values = sorted(values)
        q1_idx = len(sorted_values) // 4
        q3_idx = (3 * len(sorted_values)) // 4
        q1 = sorted_values[q1_idx]
        q3 = sorted_values[q3_idx]
        iqr = q3 - q1

        lower_bound = q1 - self.iqr_multiplier * iqr
        upper_bound = q3 + self.iqr_multiplier * iqr

        iqr_outliers = [(row_idx, v) for row_idx, v in indexed_values
                        if v < lower_bound or v > upper_bound]

        if iqr_outliers:
            results.append(ValidationResult(
                check_name="iqr_outlier",
                column=column,
                passed=len(iqr_outliers) == 0,
                expected=f"value in [{lower_bound:.2f}, {upper_bound:.2f}]",
                actual=f"{len(iqr_outliers)} outliers",
                severity="warning",
                message=f"Column '{column}' has {len(iqr_outliers)} outliers (IQR method)",
                failed_rows=[o[0] for o in iqr_outliers[:100]]
            ))

        return results


# =============================================================================
# Data Profiler
# =============================================================================

class DataProfiler:
    """Generate statistical profiles of datasets"""

    def profile(self, data: List[Dict], name: str = "dataset") -> DataProfile:
        """Generate a complete profile of the dataset"""
        if not data:
            return DataProfile(
                name=name,
                row_count=0,
                column_count=0,
                columns=[],
                duplicate_rows=0,
                memory_size_bytes=0,
                profile_timestamp=datetime.now().isoformat()
            )

        columns = list(data[0].keys())
        column_profiles = []

        for col in columns:
            profile = self._profile_column(data, col)
            column_profiles.append(profile)

        # Count duplicates
        row_tuples = [tuple(sorted(row.items())) for row in data]
        duplicate_count = len(row_tuples) - len(set(row_tuples))

        # Estimate memory size
        memory_size = sys.getsizeof(data) + sum(
            sys.getsizeof(row) + sum(sys.getsizeof(v) for v in row.values())
            for row in data
        )

        return DataProfile(
            name=name,
            row_count=len(data),
            column_count=len(columns),
            columns=column_profiles,
            duplicate_rows=duplicate_count,
            memory_size_bytes=memory_size,
            profile_timestamp=datetime.now().isoformat()
        )

    def _profile_column(self, data: List[Dict], column: str) -> ColumnProfile:
        """Generate profile for a single column"""
        values = [row.get(column) for row in data]
        non_null = [v for v in values if v is not None and v != '']

        total_count = len(values)
        null_count = total_count - len(non_null)
        null_pct = (null_count / total_count * 100) if total_count > 0 else 0

        unique_values = set(str(v) for v in non_null)
        unique_count = len(unique_values)
        unique_pct = (unique_count / len(non_null) * 100) if non_null else 0

        # Detect type
        sample = [str(v) for v in non_null[:1000]]
        detected_type = TypeDetector.detect_type(sample)
        detected_pattern = TypeDetector.detect_pattern(sample)

        # Top values
        value_counts = Counter(str(v) for v in non_null)
        top_values = value_counts.most_common(10)

        profile = ColumnProfile(
            name=column,
            data_type=detected_type,
            total_count=total_count,
            null_count=null_count,
            null_percentage=null_pct,
            unique_count=unique_count,
            unique_percentage=unique_pct,
            detected_pattern=detected_pattern,
            top_values=top_values
        )

        # Add numeric stats if applicable
        if detected_type in ('integer', 'float'):
            numeric_values = []
            for v in non_null:
                try:
                    numeric_values.append(float(v))
                except (ValueError, TypeError):
                    pass

            if numeric_values:
                sorted_vals = sorted(numeric_values)
                profile.min_value = min(numeric_values)
                profile.max_value = max(numeric_values)
                profile.mean = statistics.mean(numeric_values)
                profile.median = statistics.median(numeric_values)
                if len(numeric_values) > 1:
                    profile.std_dev = statistics.stdev(numeric_values)
                profile.percentile_25 = sorted_vals[len(sorted_vals) // 4]
                profile.percentile_75 = sorted_vals[(3 * len(sorted_vals)) // 4]

        # Add string stats
        if detected_type == 'string':
            lengths = [len(str(v)) for v in non_null]
            if lengths:
                profile.min_length = min(lengths)
                profile.max_length = max(lengths)
                profile.avg_length = statistics.mean(lengths)

        return profile


# =============================================================================
# Great Expectations Suite Generator
# =============================================================================

class GreatExpectationsGenerator:
    """Generate Great Expectations validation suites"""

    def generate_suite(self, profile: DataProfile) -> Dict:
        """Generate a Great Expectations suite from a data profile"""
        expectations = []

        for col_profile in profile.columns:
            col_expectations = self._generate_column_expectations(col_profile)
            expectations.extend(col_expectations)

        # Table-level expectations: row count between half and twice the profiled count
        expectations.append({
            "expectation_type": "expect_table_row_count_to_be_between",
            "kwargs": {
                "min_value": max(1, int(profile.row_count * 0.5)),
                "max_value": int(profile.row_count * 2)
            }
        })

        expectations.append({
            "expectation_type": "expect_table_column_count_to_equal",
            "kwargs": {
                "value": profile.column_count
            }
        })

        suite = {
            "expectation_suite_name": f"{profile.name}_suite",
            "expectations": expectations,
            "meta": {
                "generated_at": datetime.now().isoformat(),
                "generator": "data_quality_validator",
                "source_profile": profile.name
            }
        }

        return suite

    def _generate_column_expectations(self, col_profile: ColumnProfile) -> List[Dict]:
        """Generate expectations for a single column"""
        expectations = []
        col_name = col_profile.name

        # Column exists
        expectations.append({
            "expectation_type": "expect_column_to_exist",
            "kwargs": {"column": col_name}
        })

        # Null percentage
        if col_profile.null_percentage < 1:
            expectations.append({
                "expectation_type": "expect_column_values_to_not_be_null",
                "kwargs": {"column": col_name}
            })
        elif col_profile.null_percentage < 50:
            # Tolerate up to 1.5x the observed null rate
            expectations.append({
                "expectation_type": "expect_column_values_to_not_be_null",
                "kwargs": {
                    "column": col_name,
                    "mostly": 1 - (col_profile.null_percentage / 100 * 1.5)
                }
            })

        # Uniqueness
        if col_profile.unique_percentage > 99:
            expectations.append({
                "expectation_type": "expect_column_values_to_be_unique",
                "kwargs": {"column": col_name}
            })

        # Type-specific expectations
        if col_profile.data_type == 'integer':
            expectations.append({
                "expectation_type": "expect_column_values_to_be_in_type_list",
                "kwargs": {
                    "column": col_name,
                    "type_list": ["int", "int64", "INTEGER", "BIGINT"]
                }
            })
            if col_profile.min_value is not None:
                expectations.append({
                    "expectation_type": "expect_column_values_to_be_between",
                    "kwargs": {
                        "column": col_name,
                        "min_value": col_profile.min_value,
                        "max_value": col_profile.max_value
                    }
                })

        elif col_profile.data_type == 'float':
            expectations.append({
                "expectation_type": "expect_column_values_to_be_in_type_list",
                "kwargs": {
                    "column": col_name,
                    "type_list": ["float", "float64", "FLOAT", "DOUBLE"]
                }
            })
            if col_profile.min_value is not None:
                expectations.append({
                    "expectation_type": "expect_column_values_to_be_between",
                    "kwargs": {
                        "column": col_name,
                        "min_value": col_profile.min_value,
                        "max_value": col_profile.max_value
                    }
                })

        elif col_profile.data_type == 'email':
            expectations.append({
                "expectation_type": "expect_column_values_to_match_regex",
                "kwargs": {
                    "column": col_name,
                    "regex": r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
                }
            })

        elif col_profile.data_type in ('date_iso', 'date'):
            expectations.append({
                "expectation_type": "expect_column_values_to_match_strftime_format",
                "kwargs": {
                    "column": col_name,
                    "strftime_format": "%Y-%m-%d"
                }
            })

        # String length expectations
        if col_profile.min_length is not None:
            expectations.append({
                "expectation_type": "expect_column_value_lengths_to_be_between",
                "kwargs": {
                    "column": col_name,
                    "min_value": max(1, col_profile.min_length),
                    "max_value": col_profile.max_length * 2 if col_profile.max_length else None
                }
            })

        # Categorical (low cardinality) columns
        if col_profile.unique_count <= 20 and col_profile.unique_percentage < 10:
            top_values = [v[0] for v in col_profile.top_values if v[1] > col_profile.total_count * 0.01]
            if top_values:
                expectations.append({
                    "expectation_type": "expect_column_values_to_be_in_set",
                    "kwargs": {
                        "column": col_name,
                        "value_set": top_values,
                        "mostly": 0.95
                    }
                })

        return expectations


# =============================================================================
# Quality Score Calculator
# =============================================================================

class QualityScoreCalculator:
    """Calculate overall data quality scores"""

    def calculate(self, profile: DataProfile, validation_results: List[ValidationResult]) -> QualityScore:
        """Calculate quality score from profile and validation results"""
        # Completeness: average non-null percentage
        completeness = (100 - statistics.mean([c.null_percentage for c in profile.columns])) if profile.columns else 0

        # Uniqueness: average unique percentage over columns that look like
        # keys (more than 90% distinct values)
        unique_cols = [c for c in profile.columns if c.unique_percentage > 90]
        uniqueness = statistics.mean([c.unique_percentage for c in unique_cols]) if unique_cols else 100

        # Validity: percentage of passed checks
        total_checks = len(validation_results)
        passed_checks = sum(1 for r in validation_results if r.passed)
        validity = (passed_checks / total_checks * 100) if total_checks > 0 else 100

        # Consistency: percentage of non-error results
        error_checks = sum(1 for r in validation_results if not r.passed and r.severity == "error")
        consistency = ((total_checks - error_checks) / total_checks * 100) if total_checks > 0 else 100

        # Accuracy: based on pattern matching and type detection
        pattern_detected = sum(1 for c in profile.columns if c.detected_pattern)
        accuracy = min(100, 50 + (pattern_detected / len(profile.columns) * 50)) if profile.columns else 50

        # Overall: weighted average
        overall = (
            completeness * 0.25 +
            uniqueness * 0.15 +
            validity * 0.30 +
            consistency * 0.20 +
            accuracy * 0.10
        )

        return QualityScore(
            completeness=round(completeness, 2),
            uniqueness=round(uniqueness, 2),
            validity=round(validity, 2),
            consistency=round(consistency, 2),
            accuracy=round(accuracy, 2),
            overall=round(overall, 2)
        )


# =============================================================================
# Data Contract Validator
# =============================================================================

class DataContractValidator:
    """Validate data against a data contract"""

    def load_contract(self, contract_path: str) -> Dict:
        """Load a data contract from file"""
        with open(contract_path, 'r') as f:
            content = f.read()

        # Support both YAML and JSON
        if contract_path.endswith('.yaml') or contract_path.endswith('.yml'):
            # Simple YAML parsing (for basic contracts)
            contract = self._parse_simple_yaml(content)
        else:
            contract = json.loads(content)

        return contract

    def _parse_simple_yaml(self, content: str) -> Dict:
        """Parse simple YAML-like format"""
        result = {}
        current_section = result
        section_stack = [(result, -1)]

        for line in content.split('\n'):
            if not line.strip() or line.strip().startswith('#'):
                continue

            # Calculate indentation
            indent = len(line) - len(line.lstrip())
            line = line.strip()

            # Pop sections with greater or equal indentation
            while section_stack and section_stack[-1][1] >= indent:
                section_stack.pop()

            current_section = section_stack[-1][0]

            if ':' in line:
                key, value = line.split(':', 1)
                key = key.strip()
                value = value.strip()

                if value:
                    # Handle lists
                    if value.startswith('[') and value.endswith(']'):
                        current_section[key] = [v.strip().strip('"\'') for v in value[1:-1].split(',')]
                    elif value.lower() in ('true', 'false'):
                        current_section[key] = value.lower() == 'true'
                    elif value.isdigit():
                        current_section[key] = int(value)
                    else:
                        current_section[key] = value.strip('"\'')
                else:
                    current_section[key] = {}
                    section_stack.append((current_section[key], indent))
            elif line.startswith('- '):
                # List item. Items containing ':' (e.g. '- name: id') are caught
                # by the key/value branch above, so only scalar items reach here.
                if not isinstance(current_section, list):
                    # Convert to list
                    parent = section_stack[-2][0] if len(section_stack) > 1 else result
                    for k, v in parent.items():
                        if v is current_section:
                            parent[k] = [current_section] if current_section else []
                            current_section = parent[k]
                            section_stack[-1] = (current_section, section_stack[-1][1])
                            break
                current_section.append(line[2:].strip())

        return result

    def validate_contract(self, data: List[Dict], contract: Dict) -> List[ValidationResult]:
        """Validate data against contract"""
        results = []

        # Validate schema section
        if 'schema' in contract:
            schema_def = contract['schema']
            columns = schema_def.get('columns', schema_def.get('fields', []))

            for col_def in columns:
                col_name = col_def.get('name', col_def.get('column', ''))
                if not col_name:
                    continue

                # Check column exists
                if data and col_name not in data[0]:
                    results.append(ValidationResult(
                        check_name="contract_column_exists",
                        column=col_name,
                        passed=False,
                        expected="column present",
                        actual="column missing",
                        severity="error",
                        message=f"Contract requires column '{col_name}' but it's missing"
                    ))
                    continue

                # Check data type
                expected_type = col_def.get('type', col_def.get('data_type', 'string'))
                values = [row.get(col_name) for row in data]
                non_null = [str(v) for v in values if v is not None and v != '']

                if non_null:
                    detected_type = TypeDetector.detect_type(non_null[:1000])
                    type_compatible = self._types_compatible(detected_type, expected_type)

                    if not type_compatible:
                        results.append(ValidationResult(
                            check_name="contract_data_type",
                            column=col_name,
                            passed=False,
                            expected=expected_type,
                            actual=detected_type,
                            severity="error",
                            message=f"Contract expects type '{expected_type}' but detected '{detected_type}'"
                        ))

                # Check nullable
                if not col_def.get('nullable', True):
                    null_count = sum(1 for v in values if v is None or v == '')
                    if null_count > 0:
                        results.append(ValidationResult(
                            check_name="contract_not_null",
                            column=col_name,
                            passed=False,
                            expected="no nulls",
                            actual=f"{null_count} nulls",
                            severity="error",
                            message=f"Contract requires non-null but found {null_count} nulls"
                        ))

        # Validate SLA section
        if 'sla' in contract:
            sla = contract['sla']

            # Row count bounds
            min_rows = sla.get('min_rows', sla.get('minimum_records'))
            max_rows = sla.get('max_rows', sla.get('maximum_records'))

            row_count = len(data)
            if min_rows and row_count < min_rows:
                results.append(ValidationResult(
                    check_name="contract_min_rows",
                    column=None,
                    passed=False,
                    expected=f">= {min_rows} rows",
                    actual=f"{row_count} rows",
                    severity="error",
                    message=f"Contract requires at least {min_rows} rows"
                ))

            if max_rows and row_count > max_rows:
                results.append(ValidationResult(
                    check_name="contract_max_rows",
                    column=None,
                    passed=False,
                    expected=f"<= {max_rows} rows",
                    actual=f"{row_count} rows",
                    severity="warning",
                    message=f"Contract allows at most {max_rows} rows"
                ))

        return results

    def _types_compatible(self, detected: str, expected: str) -> bool:
        """Check if detected type is compatible with expected type"""
        expected = expected.lower()
        detected = detected.lower()

        type_groups = {
            'numeric': ['integer', 'int', 'float', 'double', 'decimal', 'number'],
            'string': ['string', 'varchar', 'char', 'text'],
            'boolean': ['boolean', 'bool'],
            'date': ['date', 'date_iso'],
            'datetime': ['datetime', 'datetime_iso', 'timestamp'],
        }

        for group, types in type_groups.items():
            if expected in types and detected in types:
                return True

        return detected == expected


# =============================================================================
# Report Generator
# =============================================================================

class ReportGenerator:
    """Generate validation reports"""

    def generate_text_report(self,
                             profile: DataProfile,
                             results: List[ValidationResult],
                             score: QualityScore) -> str:
        """Generate a text report"""
        lines = []
        lines.append("=" * 80)
        lines.append("DATA QUALITY VALIDATION REPORT")
        lines.append("=" * 80)
        lines.append(f"\nDataset: {profile.name}")
        lines.append(f"Generated: {datetime.now().isoformat()}")
        lines.append(f"Rows: {profile.row_count:,}")
        lines.append(f"Columns: {profile.column_count}")
        lines.append(f"Duplicate Rows: {profile.duplicate_rows:,}")

        # Quality Score
        lines.append("\n" + "-" * 40)
        lines.append("QUALITY SCORES")
        lines.append("-" * 40)
        lines.append(f"  Overall:      {score.overall:>6.1f}% {'✓' if score.overall >= 80 else '✗'}")
        lines.append(f"  Completeness: {score.completeness:>6.1f}%")
        lines.append(f"  Uniqueness:   {score.uniqueness:>6.1f}%")
        lines.append(f"  Validity:     {score.validity:>6.1f}%")
        lines.append(f"  Consistency:  {score.consistency:>6.1f}%")
        lines.append(f"  Accuracy:     {score.accuracy:>6.1f}%")

        # Validation Results Summary
        passed = sum(1 for r in results if r.passed)
        failed = len(results) - passed
        errors = sum(1 for r in results if not r.passed and r.severity == "error")
        warnings = sum(1 for r in results if not r.passed and r.severity == "warning")

        lines.append("\n" + "-" * 40)
        lines.append("VALIDATION SUMMARY")
        lines.append("-" * 40)
        lines.append(f"  Total Checks: {len(results)}")
        lines.append(f"  Passed:       {passed} ✓")
        lines.append(f"  Failed:       {failed} ✗")
        lines.append(f"  Errors:       {errors}")
        lines.append(f"  Warnings:     {warnings}")

        # Failed checks details
        if failed > 0:
            lines.append("\n" + "-" * 40)
            lines.append("FAILED CHECKS")
            lines.append("-" * 40)

            for r in results:
                if not r.passed:
                    severity_icon = "❌" if r.severity == "error" else "⚠️"
                    col_str = f"[{r.column}]" if r.column else ""
                    lines.append(f"\n{severity_icon} {r.check_name} {col_str}")
                    lines.append(f"   Expected: {r.expected}")
                    lines.append(f"   Actual:   {r.actual}")
                    if r.message:
                        lines.append(f"   Message:  {r.message}")

        # Column profiles
        lines.append("\n" + "-" * 40)
        lines.append("COLUMN PROFILES")
        lines.append("-" * 40)

        for col in profile.columns:
            lines.append(f"\n  {col.name}")
            lines.append(f"    Type: {col.data_type}")
            lines.append(f"    Nulls: {col.null_count:,} ({col.null_percentage:.1f}%)")
            lines.append(f"    Unique: {col.unique_count:,} ({col.unique_percentage:.1f}%)")

            if col.min_value is not None:
                lines.append(f"    Range: [{col.min_value:.2f}, {col.max_value:.2f}]")
                lines.append(f"    Mean: {col.mean:.2f}, Median: {col.median:.2f}")

            if col.min_length is not None:
                lines.append(f"    Length: [{col.min_length}, {col.max_length}] (avg: {col.avg_length:.1f})")

            if 
col.detected_pattern:\n lines.append(f\" Pattern: {col.detected_pattern}\")\n\n if col.top_values:\n top_3 = col.top_values[:3]\n lines.append(f\" Top values: {', '.join(f'{v[0]} ({v[1]})' for v in top_3)}\")\n\n lines.append(\"\\n\" + \"=\" * 80)\n\n return \"\\n\".join(lines)\n\n def generate_json_report(self,\n profile: DataProfile,\n results: List[ValidationResult],\n score: QualityScore) -> Dict:\n \"\"\"Generate a JSON report\"\"\"\n return {\n \"report_type\": \"data_quality_validation\",\n \"generated_at\": datetime.now().isoformat(),\n \"dataset\": {\n \"name\": profile.name,\n \"row_count\": profile.row_count,\n \"column_count\": profile.column_count,\n \"duplicate_rows\": profile.duplicate_rows,\n \"memory_bytes\": profile.memory_size_bytes\n },\n \"quality_score\": asdict(score),\n \"validation_summary\": {\n \"total_checks\": len(results),\n \"passed\": sum(1 for r in results if r.passed),\n \"failed\": sum(1 for r in results if not r.passed),\n \"errors\": sum(1 for r in results if not r.passed and r.severity == \"error\"),\n \"warnings\": sum(1 for r in results if not r.passed and r.severity == \"warning\")\n },\n \"validation_results\": [\n {\n \"check\": r.check_name,\n \"column\": r.column,\n \"passed\": r.passed,\n \"severity\": r.severity,\n \"expected\": str(r.expected),\n \"actual\": str(r.actual),\n \"message\": r.message\n }\n for r in results\n ],\n \"column_profiles\": [asdict(c) for c in profile.columns]\n }\n\n\n# =============================================================================\n# Data Loader\n# =============================================================================\n\nclass DataLoader:\n \"\"\"Load data from various formats\"\"\"\n\n @staticmethod\n def load(file_path: str) -> List[Dict]:\n \"\"\"Load data from file\"\"\"\n path = Path(file_path)\n\n if not path.exists():\n raise FileNotFoundError(f\"File not found: {file_path}\")\n\n suffix = path.suffix.lower()\n\n if suffix == '.csv':\n return 
DataLoader._load_csv(file_path)\n elif suffix == '.json':\n return DataLoader._load_json(file_path)\n elif suffix == '.jsonl':\n return DataLoader._load_jsonl(file_path)\n else:\n raise ValueError(f\"Unsupported file format: {suffix}\")\n\n @staticmethod\n def _load_csv(file_path: str) -> List[Dict]:\n \"\"\"Load CSV file\"\"\"\n data = []\n with open(file_path, 'r', newline='', encoding='utf-8') as f:\n reader = csv.DictReader(f)\n for row in reader:\n data.append(dict(row))\n return data\n\n @staticmethod\n def _load_json(file_path: str) -> List[Dict]:\n \"\"\"Load JSON file\"\"\"\n with open(file_path, 'r', encoding='utf-8') as f:\n content = json.load(f)\n\n if isinstance(content, list):\n return content\n elif isinstance(content, dict):\n # Check for common data keys\n for key in ['data', 'records', 'rows', 'items']:\n if key in content and isinstance(content[key], list):\n return content[key]\n return [content]\n else:\n raise ValueError(\"JSON must contain array or object with data key\")\n\n @staticmethod\n def _load_jsonl(file_path: str) -> List[Dict]:\n \"\"\"Load JSON Lines file\"\"\"\n data = []\n with open(file_path, 'r', encoding='utf-8') as f:\n for line in f:\n line = line.strip()\n if line:\n data.append(json.loads(line))\n return data\n\n\n# =============================================================================\n# Schema Loader\n# =============================================================================\n\nclass SchemaLoader:\n \"\"\"Load schema definitions\"\"\"\n\n @staticmethod\n def load(file_path: str) -> DataSchema:\n \"\"\"Load schema from JSON file\"\"\"\n with open(file_path, 'r', encoding='utf-8') as f:\n schema_dict = json.load(f)\n\n columns = []\n for col_def in schema_dict.get('columns', []):\n columns.append(ColumnSchema(\n name=col_def['name'],\n data_type=col_def.get('type', col_def.get('data_type', 'string')),\n nullable=col_def.get('nullable', True),\n unique=col_def.get('unique', False),\n 
min_value=col_def.get('min_value'),\n max_value=col_def.get('max_value'),\n min_length=col_def.get('min_length'),\n max_length=col_def.get('max_length'),\n pattern=col_def.get('pattern'),\n allowed_values=col_def.get('allowed_values'),\n description=col_def.get('description', '')\n ))\n\n return DataSchema(\n name=schema_dict.get('name', 'unknown'),\n version=schema_dict.get('version', '1.0'),\n columns=columns,\n primary_key=schema_dict.get('primary_key'),\n row_count_min=schema_dict.get('row_count_min'),\n row_count_max=schema_dict.get('row_count_max')\n )\n\n\n# =============================================================================\n# CLI Interface\n# =============================================================================\n\ndef cmd_validate(args):\n \"\"\"Run validation against schema\"\"\"\n logger.info(f\"Loading data from {args.input}\")\n data = DataLoader.load(args.input)\n\n results = []\n\n if args.schema:\n logger.info(f\"Loading schema from {args.schema}\")\n schema = SchemaLoader.load(args.schema)\n\n validator = SchemaValidator()\n results = validator.validate(data, schema)\n\n if args.detect_anomalies:\n logger.info(\"Running anomaly detection\")\n anomaly_detector = AnomalyDetector()\n anomaly_results = anomaly_detector.validate(data)\n results.extend(anomaly_results)\n\n # Profile data\n profiler = DataProfiler()\n profile = profiler.profile(data, name=Path(args.input).stem)\n\n # Calculate score\n score_calc = QualityScoreCalculator()\n score = score_calc.calculate(profile, results)\n\n # Generate report\n reporter = ReportGenerator()\n\n if args.json:\n report = reporter.generate_json_report(profile, results, score)\n output = json.dumps(report, indent=2)\n else:\n output = reporter.generate_text_report(profile, results, score)\n\n if args.output:\n with open(args.output, 'w') as f:\n f.write(output)\n logger.info(f\"Report saved to {args.output}\")\n else:\n print(output)\n\n # Exit with error if validation failed\n errors = sum(1 
for r in results if not r.passed and r.severity == \"error\")\n if errors > 0:\n sys.exit(1)\n\n\ndef cmd_profile(args):\n \"\"\"Generate data profile\"\"\"\n logger.info(f\"Loading data from {args.input}\")\n data = DataLoader.load(args.input)\n\n profiler = DataProfiler()\n profile = profiler.profile(data, name=Path(args.input).stem)\n\n if args.json or args.output:\n output = json.dumps(asdict(profile), indent=2, default=str)\n else:\n # Text output\n lines = []\n lines.append(f\"Dataset: {profile.name}\")\n lines.append(f\"Rows: {profile.row_count:,}\")\n lines.append(f\"Columns: {profile.column_count}\")\n lines.append(f\"Duplicate rows: {profile.duplicate_rows:,}\")\n lines.append(f\"\\nColumn Profiles:\")\n\n for col in profile.columns:\n lines.append(f\"\\n {col.name} ({col.data_type})\")\n lines.append(f\" Nulls: {col.null_percentage:.1f}%\")\n lines.append(f\" Unique: {col.unique_percentage:.1f}%\")\n if col.mean is not None:\n lines.append(f\" Stats: min={col.min_value}, max={col.max_value}, mean={col.mean:.2f}\")\n\n output = \"\\n\".join(lines)\n\n if args.output:\n with open(args.output, 'w') as f:\n f.write(output)\n logger.info(f\"Profile saved to {args.output}\")\n else:\n print(output)\n\n\ndef cmd_generate_suite(args):\n \"\"\"Generate Great Expectations suite\"\"\"\n logger.info(f\"Loading data from {args.input}\")\n data = DataLoader.load(args.input)\n\n # Profile first\n profiler = DataProfiler()\n profile = profiler.profile(data, name=Path(args.input).stem)\n\n # Generate suite\n generator = GreatExpectationsGenerator()\n suite = generator.generate_suite(profile)\n\n output = json.dumps(suite, indent=2)\n\n if args.output:\n with open(args.output, 'w') as f:\n f.write(output)\n logger.info(f\"Expectation suite saved to {args.output}\")\n else:\n print(output)\n\n\ndef cmd_contract(args):\n \"\"\"Validate against data contract\"\"\"\n logger.info(f\"Loading data from {args.input}\")\n data = DataLoader.load(args.input)\n\n 
logger.info(f\"Loading contract from {args.contract}\")\n contract_validator = DataContractValidator()\n contract = contract_validator.load_contract(args.contract)\n\n results = contract_validator.validate_contract(data, contract)\n\n # Profile data\n profiler = DataProfiler()\n profile = profiler.profile(data, name=Path(args.input).stem)\n\n # Calculate score\n score_calc = QualityScoreCalculator()\n score = score_calc.calculate(profile, results)\n\n # Generate report\n reporter = ReportGenerator()\n\n if args.json:\n report = reporter.generate_json_report(profile, results, score)\n output = json.dumps(report, indent=2)\n else:\n output = reporter.generate_text_report(profile, results, score)\n\n if args.output:\n with open(args.output, 'w') as f:\n f.write(output)\n logger.info(f\"Report saved to {args.output}\")\n else:\n print(output)\n\n # Exit with error if contract validation failed\n errors = sum(1 for r in results if not r.passed and r.severity == \"error\")\n if errors > 0:\n sys.exit(1)\n\n\ndef cmd_schema(args):\n \"\"\"Generate schema from data\"\"\"\n logger.info(f\"Loading data from {args.input}\")\n data = DataLoader.load(args.input)\n\n if not data:\n logger.error(\"Empty dataset\")\n sys.exit(1)\n\n # Profile to detect types\n profiler = DataProfiler()\n profile = profiler.profile(data, name=Path(args.input).stem)\n\n # Generate schema\n schema = {\n \"name\": profile.name,\n \"version\": \"1.0\",\n \"columns\": []\n }\n\n for col in profile.columns:\n col_schema = {\n \"name\": col.name,\n \"type\": col.data_type,\n \"nullable\": col.null_percentage > 0,\n \"description\": \"\"\n }\n\n if col.unique_percentage > 99:\n col_schema[\"unique\"] = True\n\n if col.min_value is not None:\n col_schema[\"min_value\"] = col.min_value\n col_schema[\"max_value\"] = col.max_value\n\n if col.min_length is not None:\n col_schema[\"min_length\"] = col.min_length\n col_schema[\"max_length\"] = col.max_length\n\n if col.detected_pattern:\n col_schema[\"pattern\"] 
= col.detected_pattern\n\n # Add allowed values for low-cardinality columns\n if col.unique_count \u003c= 20 and col.unique_percentage \u003c 10:\n col_schema[\"allowed_values\"] = [v[0] for v in col.top_values]\n\n schema[\"columns\"].append(col_schema)\n\n output = json.dumps(schema, indent=2)\n\n if args.output:\n with open(args.output, 'w') as f:\n f.write(output)\n logger.info(f\"Schema saved to {args.output}\")\n else:\n print(output)\n\n\ndef main():\n \"\"\"Main entry point\"\"\"\n parser = argparse.ArgumentParser(\n description=\"Data Quality Validator - Comprehensive data quality validation\",\n formatter_class=argparse.RawDescriptionHelpFormatter,\n epilog=\"\"\"\nExamples:\n # Validate data against schema\n python data_quality_validator.py validate data.csv --schema schema.json\n\n # Profile data\n python data_quality_validator.py profile data.csv --output profile.json\n\n # Generate Great Expectations suite\n python data_quality_validator.py generate-suite data.csv --output expectations.json\n\n # Validate against data contract\n python data_quality_validator.py contract data.csv --contract contract.yaml\n\n # Generate schema from data\n python data_quality_validator.py schema data.csv --output schema.json\n \"\"\"\n )\n\n parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')\n\n subparsers = parser.add_subparsers(dest='command', help='Command to run')\n\n # Validate command\n validate_parser = subparsers.add_parser('validate', help='Validate data against schema')\n validate_parser.add_argument('input', help='Input data file (CSV, JSON, JSONL)')\n validate_parser.add_argument('--schema', '-s', help='Schema file (JSON)')\n validate_parser.add_argument('--output', '-o', help='Output report file')\n validate_parser.add_argument('--json', action='store_true', help='Output as JSON')\n validate_parser.add_argument('--detect-anomalies', action='store_true', help='Detect statistical anomalies')\n 
validate_parser.set_defaults(func=cmd_validate)\n\n # Profile command\n profile_parser = subparsers.add_parser('profile', help='Generate data profile')\n profile_parser.add_argument('input', help='Input data file')\n profile_parser.add_argument('--output', '-o', help='Output profile file')\n profile_parser.add_argument('--json', action='store_true', help='Output as JSON')\n profile_parser.set_defaults(func=cmd_profile)\n\n # Generate suite command\n suite_parser = subparsers.add_parser('generate-suite', help='Generate Great Expectations suite')\n suite_parser.add_argument('input', help='Input data file')\n suite_parser.add_argument('--output', '-o', help='Output expectations file')\n suite_parser.set_defaults(func=cmd_generate_suite)\n\n # Contract command\n contract_parser = subparsers.add_parser('contract', help='Validate against data contract')\n contract_parser.add_argument('input', help='Input data file')\n contract_parser.add_argument('--contract', '-c', required=True, help='Data contract file (YAML or JSON)')\n contract_parser.add_argument('--output', '-o', help='Output report file')\n contract_parser.add_argument('--json', action='store_true', help='Output as JSON')\n contract_parser.set_defaults(func=cmd_contract)\n\n # Schema command\n schema_parser = subparsers.add_parser('schema', help='Generate schema from data')\n schema_parser.add_argument('input', help='Input data file')\n schema_parser.add_argument('--output', '-o', help='Output schema file')\n schema_parser.set_defaults(func=cmd_schema)\n\n args = parser.parse_args()\n\n if args.verbose:\n logging.getLogger().setLevel(logging.DEBUG)\n\n if not args.command:\n parser.print_help()\n sys.exit(1)\n\n try:\n args.func(args)\n except Exception as e:\n logger.error(f\"Error: {e}\")\n if args.verbose:\n import traceback\n traceback.print_exc()\n sys.exit(1)\n\n\nif __name__ == '__main__':\n main()\n","content_type":"text/x-python; 
charset=utf-8","language":"python","size":60660,"content_sha256":"2a1d1bc2c5f13692eeed3b35961338cc8e5ef38a2c4b1e3c062233931175ee4d"},{"filename":"scripts/etl_performance_optimizer.py","content":"#!/usr/bin/env python3\n\"\"\"\nETL Performance Optimizer\nComprehensive ETL/ELT performance analysis and optimization tool.\n\nFeatures:\n- SQL query analysis and optimization recommendations\n- Spark job configuration analysis\n- Data skew detection and mitigation\n- Partition strategy recommendations\n- Join optimization suggestions\n- Memory and shuffle analysis\n- Cost estimation for cloud warehouses\n\nUsage:\n python etl_performance_optimizer.py analyze-sql query.sql\n python etl_performance_optimizer.py analyze-spark spark-history.json\n python etl_performance_optimizer.py optimize-partition data_stats.json\n python etl_performance_optimizer.py estimate-cost query.sql --warehouse snowflake\n\"\"\"\n\nimport os\nimport sys\nimport json\nimport re\nimport argparse\nimport logging\nimport math\nfrom pathlib import Path\nfrom typing import Dict, List, Optional, Any, Tuple, Set\nfrom dataclasses import dataclass, field, asdict\nfrom datetime import datetime\nfrom collections import defaultdict\nfrom abc import ABC, abstractmethod\n\nlogging.basicConfig(\n level=logging.INFO,\n format='%(asctime)s - %(levelname)s - %(message)s'\n)\nlogger = logging.getLogger(__name__)\n\n\n# =============================================================================\n# Data Classes\n# =============================================================================\n\n@dataclass\nclass SQLQueryInfo:\n \"\"\"Parsed information about a SQL query\"\"\"\n query_type: str # SELECT, INSERT, UPDATE, DELETE, MERGE, CREATE\n tables: List[str]\n columns: List[str]\n joins: List[Dict[str, str]]\n where_conditions: List[str]\n group_by: List[str]\n order_by: List[str]\n aggregations: List[str]\n subqueries: int\n distinct: bool\n limit: Optional[int]\n ctes: List[str]\n window_functions: List[str]\n 
estimated_complexity: str # low, medium, high, very_high\n\n\n@dataclass\nclass OptimizationRecommendation:\n \"\"\"A single optimization recommendation\"\"\"\n category: str # index, partition, join, filter, aggregation, memory, shuffle\n severity: str # critical, high, medium, low\n title: str\n description: str\n current_issue: str\n recommendation: str\n expected_improvement: str\n implementation: str\n priority: int = 1\n\n\n@dataclass\nclass SparkJobMetrics:\n \"\"\"Metrics from a Spark job\"\"\"\n job_id: str\n duration_ms: int\n stages: int\n tasks: int\n shuffle_read_bytes: int\n shuffle_write_bytes: int\n input_bytes: int\n output_bytes: int\n peak_memory_bytes: int\n gc_time_ms: int\n failed_tasks: int\n speculative_tasks: int\n skew_ratio: float # max_task_time / median_task_time\n\n\n@dataclass\nclass PartitionStrategy:\n \"\"\"Recommended partition strategy\"\"\"\n column: str\n partition_type: str # range, hash, list\n num_partitions: Optional[int]\n partition_size_mb: float\n reasoning: str\n implementation: str\n\n\n@dataclass\nclass CostEstimate:\n \"\"\"Cost estimate for a query\"\"\"\n warehouse: str\n compute_cost: float\n storage_cost: float\n data_transfer_cost: float\n total_cost: float\n currency: str = \"USD\"\n assumptions: List[str] = field(default_factory=list)\n\n\n# =============================================================================\n# SQL Parser\n# =============================================================================\n\nclass SQLParser:\n \"\"\"Parse and analyze SQL queries\"\"\"\n\n # Common SQL patterns\n PATTERNS = {\n 'select': re.compile(r'\\bSELECT\\b', re.IGNORECASE),\n 'from': re.compile(r'\\bFROM\\b', re.IGNORECASE),\n 'join': re.compile(r'\\b(INNER|LEFT|RIGHT|FULL|CROSS)?\\s*JOIN\\b', re.IGNORECASE),\n 'where': re.compile(r'\\bWHERE\\b', re.IGNORECASE),\n 'group_by': re.compile(r'\\bGROUP\\s+BY\\b', re.IGNORECASE),\n 'order_by': re.compile(r'\\bORDER\\s+BY\\b', re.IGNORECASE),\n 'having': 
re.compile(r'\\bHAVING\\b', re.IGNORECASE),\n 'distinct': re.compile(r'\\bDISTINCT\\b', re.IGNORECASE),\n 'limit': re.compile(r'\\bLIMIT\\s+(\\d+)', re.IGNORECASE),\n 'cte': re.compile(r'\\bWITH\\b', re.IGNORECASE),\n 'subquery': re.compile(r'\\(\\s*SELECT\\b', re.IGNORECASE),\n 'window': re.compile(r'\\bOVER\\s*\\(', re.IGNORECASE),\n 'aggregation': re.compile(r'\\b(COUNT|SUM|AVG|MIN|MAX|STDDEV|VARIANCE)\\s*\\(', re.IGNORECASE),\n 'insert': re.compile(r'\\bINSERT\\s+INTO\\b', re.IGNORECASE),\n 'update': re.compile(r'\\bUPDATE\\b', re.IGNORECASE),\n 'delete': re.compile(r'\\bDELETE\\s+FROM\\b', re.IGNORECASE),\n 'merge': re.compile(r'\\bMERGE\\s+INTO\\b', re.IGNORECASE),\n 'create': re.compile(r'\\bCREATE\\s+(TABLE|VIEW|INDEX)\\b', re.IGNORECASE),\n }\n\n def parse(self, sql: str) -> SQLQueryInfo:\n \"\"\"Parse a SQL query and extract information\"\"\"\n # Clean up the query\n sql = self._clean_sql(sql)\n\n # Determine query type\n query_type = self._detect_query_type(sql)\n\n # Extract tables\n tables = self._extract_tables(sql)\n\n # Extract columns (for SELECT queries)\n columns = self._extract_columns(sql) if query_type == 'SELECT' else []\n\n # Extract joins\n joins = self._extract_joins(sql)\n\n # Extract WHERE conditions\n where_conditions = self._extract_where_conditions(sql)\n\n # Extract GROUP BY\n group_by = self._extract_group_by(sql)\n\n # Extract ORDER BY\n order_by = self._extract_order_by(sql)\n\n # Extract aggregations\n aggregations = self._extract_aggregations(sql)\n\n # Count subqueries\n subqueries = len(self.PATTERNS['subquery'].findall(sql))\n\n # Check for DISTINCT\n distinct = bool(self.PATTERNS['distinct'].search(sql))\n\n # Extract LIMIT\n limit_match = self.PATTERNS['limit'].search(sql)\n limit = int(limit_match.group(1)) if limit_match else None\n\n # Extract CTEs\n ctes = self._extract_ctes(sql)\n\n # Extract window functions\n window_functions = self._extract_window_functions(sql)\n\n # Estimate complexity\n complexity = 
self._estimate_complexity(\n tables, joins, subqueries, aggregations, window_functions\n )\n\n return SQLQueryInfo(\n query_type=query_type,\n tables=tables,\n columns=columns,\n joins=joins,\n where_conditions=where_conditions,\n group_by=group_by,\n order_by=order_by,\n aggregations=aggregations,\n subqueries=subqueries,\n distinct=distinct,\n limit=limit,\n ctes=ctes,\n window_functions=window_functions,\n estimated_complexity=complexity\n )\n\n def _clean_sql(self, sql: str) -> str:\n \"\"\"Clean and normalize SQL\"\"\"\n # Remove comments\n sql = re.sub(r'--.*$'
, '', sql, flags=re.MULTILINE)\n sql = re.sub(r'/\\*.*?\\*/', '', sql, flags=re.DOTALL)\n # Normalize whitespace\n sql = ' '.join(sql.split())\n return sql\n\n def _detect_query_type(self, sql: str) -> str:\n \"\"\"Detect the type of SQL query\"\"\"\n sql_upper = sql.upper().strip()\n\n if sql_upper.startswith('WITH') or sql_upper.startswith('SELECT'):\n return 'SELECT'\n elif self.PATTERNS['insert'].search(sql):\n return 'INSERT'\n elif self.PATTERNS['update'].search(sql):\n return 'UPDATE'\n elif self.PATTERNS['delete'].search(sql):\n return 'DELETE'\n elif self.PATTERNS['merge'].search(sql):\n return 'MERGE'\n elif self.PATTERNS['create'].search(sql):\n return 'CREATE'\n else:\n return 'UNKNOWN'\n\n def _extract_tables(self, sql: str) -> List[str]:\n \"\"\"Extract table names from SQL\"\"\"\n tables = []\n\n # FROM clause tables\n from_pattern = re.compile(\n r'\\bFROM\\s+([a-zA-Z_][a-zA-Z0-9_]*(?:\\.[a-zA-Z_][a-zA-Z0-9_]*)?)',\n re.IGNORECASE\n )\n tables.extend(from_pattern.findall(sql))\n\n # JOIN clause tables\n join_pattern = re.compile(\n r'\\bJOIN\\s+([a-zA-Z_][a-zA-Z0-9_]*(?:\\.[a-zA-Z_][a-zA-Z0-9_]*)?)',\n re.IGNORECASE\n )\n tables.extend(join_pattern.findall(sql))\n\n # INSERT INTO table\n insert_pattern = re.compile(\n r'\\bINSERT\\s+INTO\\s+([a-zA-Z_][a-zA-Z0-9_]*(?:\\.[a-zA-Z_][a-zA-Z0-9_]*)?)',\n re.IGNORECASE\n )\n tables.extend(insert_pattern.findall(sql))\n\n # UPDATE table\n update_pattern = re.compile(\n r'\\bUPDATE\\s+([a-zA-Z_][a-zA-Z0-9_]*(?:\\.[a-zA-Z_][a-zA-Z0-9_]*)?)',\n re.IGNORECASE\n )\n tables.extend(update_pattern.findall(sql))\n\n return list(set(tables))\n\n def _extract_columns(self, sql: str) -> List[str]:\n \"\"\"Extract column references from SELECT clause\"\"\"\n # Find SELECT ... 
FROM\n match = re.search(r'\\bSELECT\\s+(.*?)\\s+FROM\\b', sql, re.IGNORECASE | re.DOTALL)\n if not match:\n return []\n\n select_clause = match.group(1)\n\n # Handle SELECT *\n if '*' in select_clause and 'COUNT(*)' not in select_clause.upper():\n return ['*']\n\n # Extract column names (simplified)\n columns = []\n for part in select_clause.split(','):\n part = part.strip()\n # Handle aliases\n alias_match = re.search(r'\\bAS\\s+(\\w+)\\s*$'
, part, re.IGNORECASE)\n if alias_match:\n columns.append(alias_match.group(1))\n else:\n # Get the last identifier\n col_match = re.search(r'([a-zA-Z_][a-zA-Z0-9_]*)(?:\\s*$|\\s+AS\\b)', part, re.IGNORECASE)\n if col_match:\n columns.append(col_match.group(1))\n\n return columns\n\n def _extract_joins(self, sql: str) -> List[Dict[str, str]]:\n \"\"\"Extract join information\"\"\"\n joins = []\n\n # OUTER is optional ('LEFT JOIN' == 'LEFT OUTER JOIN'); a clause keyword is never taken\n # as the alias; the ON condition ends at the next join/clause keyword or at the end of the query\n join_pattern = re.compile(\n r'\\b(INNER|(?:LEFT|RIGHT|FULL)(?:\\s+OUTER)?|CROSS)?\\s*JOIN\\s+'\n r'([a-zA-Z_][a-zA-Z0-9_.]*)\\s*(?:AS\\s+)?'\n r'(?:(?!(?:ON|INNER|LEFT|RIGHT|FULL|CROSS|JOIN|WHERE|GROUP|ORDER|HAVING|LIMIT)\\b)(\\w+))?\\s*'\n r'(?:ON\\s+(.+?))?'\n r'(?=\\s+(?:INNER|LEFT|RIGHT|FULL|CROSS|JOIN|WHERE|GROUP|ORDER|HAVING|LIMIT)\\b|\\s*$)',\n re.IGNORECASE | re.DOTALL\n )\n\n for match in join_pattern.finditer(sql):\n # Normalize the join type, e.g. 'left' -> 'LEFT OUTER'\n join_type = ' '.join((match.group(1) or 'INNER').upper().split())\n if join_type in ('LEFT', 'RIGHT', 'FULL'):\n join_type += ' OUTER'\n table = match.group(2)\n alias = match.group(3)\n condition = match.group(4)\n\n joins.append({\n 'type': join_type,\n 'table': table,\n 'alias': alias,\n 'condition': condition.strip() if condition else None\n })\n\n return joins\n\n def _extract_where_conditions(self, sql: str) -> List[str]:\n \"\"\"Extract WHERE clause conditions\"\"\"\n # Find WHERE ... 
(GROUP BY | ORDER BY | HAVING | LIMIT | end)\n match = re.search(\n r'\\bWHERE\\s+(.*?)(?=\\s+(?:GROUP\\s+BY|ORDER\\s+BY|HAVING|LIMIT)|$)',\n sql, re.IGNORECASE | re.DOTALL\n )\n if not match:\n return []\n\n where_clause = match.group(1).strip()\n\n # Split by AND/OR (simplified)\n conditions = re.split(r'\\s+AND\\s+|\\s+OR\\s+', where_clause, flags=re.IGNORECASE)\n return [c.strip() for c in conditions if c.strip()]\n\n def _extract_group_by(self, sql: str) -> List[str]:\n \"\"\"Extract GROUP BY columns\"\"\"\n match = re.search(\n r'\\bGROUP\\s+BY\\s+(.*?)(?=\\s+(?:HAVING|ORDER\\s+BY|LIMIT)|$)',\n sql, re.IGNORECASE | re.DOTALL\n )\n if not match:\n return []\n\n group_clause = match.group(1).strip()\n columns = [c.strip() for c in group_clause.split(',')]\n return columns\n\n def _extract_order_by(self, sql: str) -> List[str]:\n \"\"\"Extract ORDER BY columns\"\"\"\n match = re.search(\n r'\\bORDER\\s+BY\\s+(.*?)(?=\\s+LIMIT|$)',\n sql, re.IGNORECASE | re.DOTALL\n )\n if not match:\n return []\n\n order_clause = match.group(1).strip()\n columns = [c.strip() for c in order_clause.split(',')]\n return columns\n\n def _extract_aggregations(self, sql: str) -> List[str]:\n \"\"\"Extract aggregation functions used\"\"\"\n agg_pattern = re.compile(\n r'\\b(COUNT|SUM|AVG|MIN|MAX|STDDEV|VARIANCE|MEDIAN|PERCENTILE_CONT|PERCENTILE_DISC)\\s*\\(',\n re.IGNORECASE\n )\n return list(set(m.upper() for m in agg_pattern.findall(sql)))\n\n def _extract_ctes(self, sql: str) -> List[str]:\n \"\"\"Extract CTE names\"\"\"\n cte_pattern = re.compile(\n r'\\bWITH\\s+(\\w+)\\s+AS\\s*\\(|,\\s*(\\w+)\\s+AS\\s*\\(',\n re.IGNORECASE\n )\n ctes = []\n for match in cte_pattern.finditer(sql):\n cte_name = match.group(1) or match.group(2)\n if cte_name:\n ctes.append(cte_name)\n return ctes\n\n def _extract_window_functions(self, sql: str) -> List[str]:\n \"\"\"Extract window function patterns\"\"\"\n window_pattern = re.compile(\n r'\\b(\\w+)\\s*\\([^)]*\\)\\s+OVER\\s*\\(',\n re.IGNORECASE\n 
        )
        return list(set(m.upper() for m in window_pattern.findall(sql)))

    def _estimate_complexity(self, tables: List[str], joins: List[Dict],
                             subqueries: int, aggregations: List[str],
                             window_functions: List[str]) -> str:
        """Estimate query complexity"""
        score = 0

        # Table count
        score += len(tables) * 10

        # Join count and types
        for join in joins:
            if join['type'] in ('CROSS', 'FULL OUTER'):
                score += 30
            elif join['type'] in ('LEFT OUTER', 'RIGHT OUTER'):
                score += 20
            else:
                score += 15

        # Subqueries
        score += subqueries * 25

        # Aggregations
        score += len(aggregations) * 5

        # Window functions
        score += len(window_functions) * 15

        if score < 30:
            return 'low'
        elif score < 60:
            return 'medium'
        elif score < 100:
            return 'high'
        else:
            return 'very_high'


# =============================================================================
# SQL Optimizer
# =============================================================================

class SQLOptimizer:
    """Analyze SQL queries and provide optimization recommendations"""

    def analyze(self, query_info: SQLQueryInfo, sql: str) -> List[OptimizationRecommendation]:
        """Analyze a SQL query and generate optimization recommendations"""
        recommendations = []

        # Check for SELECT *
        if '*' in query_info.columns:
            recommendations.append(self._recommend_explicit_columns())

        # Check for missing WHERE clause on large tables
        if not query_info.where_conditions and query_info.tables:
            recommendations.append(self._recommend_add_filters())

        # Check for inefficient joins
        join_recs = self._analyze_joins(query_info)
        recommendations.extend(join_recs)

        # Check for DISTINCT usage
        if query_info.distinct:
            recommendations.append(self._recommend_distinct_alternative())

        # Check for ORDER BY without LIMIT
        if query_info.order_by and not query_info.limit:
            recommendations.append(self._recommend_add_limit())

        # Check for subquery optimization
        if query_info.subqueries > 0:
            recommendations.append(self._recommend_cte_conversion())

        # Check for index opportunities
        index_recs = self._analyze_index_opportunities(query_info)
        recommendations.extend(index_recs)

        # Check for partition pruning
        partition_recs = self._analyze_partition_pruning(query_info, sql)
        recommendations.extend(partition_recs)

        # Check for aggregation optimization
        if query_info.aggregations and query_info.group_by:
            agg_recs = self._analyze_aggregation(query_info)
            recommendations.extend(agg_recs)

        # Sort by priority
        recommendations.sort(key=lambda r: r.priority)

        return recommendations

    def _recommend_explicit_columns(self) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="query_structure",
            severity="medium",
            title="Avoid SELECT *",
            description="Using SELECT * retrieves all columns, increasing I/O and memory usage.",
            current_issue="Query uses SELECT * which fetches unnecessary columns",
            recommendation="Specify only the columns you need",
            expected_improvement="10-50% reduction in data scanned depending on table width",
            implementation="Replace SELECT * with SELECT col1, col2, col3 ...",
            priority=2
        )

    def _recommend_add_filters(self) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="filter",
            severity="high",
            title="Add WHERE Clause Filters",
            description="Query scans entire tables without filtering, causing full table scans.",
            current_issue="No WHERE clause filters found - full table scan required",
            recommendation="Add appropriate WHERE conditions to filter data early",
            expected_improvement="Up to 90%+ reduction in data processed if highly selective",
            implementation="Add WHERE column = value or WHERE date_column >= '2024-01-01'",
            priority=1
        )

    def _analyze_joins(self, query_info: SQLQueryInfo) -> List[OptimizationRecommendation]:
        """Analyze joins for optimization opportunities"""
        recommendations = []

        for join in query_info.joins:
            # Check for CROSS JOIN
            if join['type'] == 'CROSS':
                recommendations.append(OptimizationRecommendation(
                    category="join",
                    severity="critical",
                    title="Avoid CROSS JOIN",
                    description="CROSS JOIN creates a Cartesian product, which can explode data volume.",
                    current_issue=f"CROSS JOIN with table {join['table']} detected",
                    recommendation="Replace with appropriate INNER/LEFT JOIN with ON condition",
                    expected_improvement="Avoids n*m row growth in intermediate results",
                    implementation=f"Convert CROSS JOIN {join['table']} to INNER JOIN {join['table']} ON ...",
                    priority=1
                ))

            # Check for missing join condition
            if not join.get('condition'):
                recommendations.append(OptimizationRecommendation(
                    category="join",
                    severity="high",
                    title="Missing Join Condition",
                    description="Join without explicit ON condition may cause Cartesian product.",
                    current_issue=f"JOIN with {join['table']} has no explicit ON condition",
                    recommendation="Add explicit ON condition to the join",
                    expected_improvement="Prevents accidental Cartesian products",
                    implementation=f"Add ON {join['table']}.id = other_table.foreign_key",
                    priority=1
                ))

        # Check for many joins
        if len(query_info.joins) > 5:
            recommendations.append(OptimizationRecommendation(
                category="join",
                severity="medium",
                title="High Number of Joins",
                description="Many joins can lead to complex execution plans and performance issues.",
                current_issue=f"{len(query_info.joins)} joins detected in single query",
                recommendation="Consider breaking into smaller queries or pre-aggregating",
                expected_improvement="Better plan optimization and memory usage",
                implementation="Use CTEs to materialize intermediate results, or denormalize frequently joined data",
                priority=3
            ))

        return recommendations

    def _recommend_distinct_alternative(self) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="query_structure",
            severity="medium",
            title="Consider Alternatives to DISTINCT",
            description="DISTINCT requires sorting/hashing all rows which can be expensive.",
            current_issue="DISTINCT used - may indicate data quality or join issues",
            recommendation="Review if DISTINCT is necessary or if joins produce duplicates",
            expected_improvement="Eliminates expensive deduplication step if not needed",
            implementation="Review join conditions, or use GROUP BY if aggregating anyway",
            priority=3
        )

    def _recommend_add_limit(self) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="query_structure",
            severity="low",
            title="Add LIMIT to ORDER BY",
            description="ORDER BY without LIMIT sorts the entire result set, which is wasteful if only the top rows are needed.",
            current_issue="ORDER BY present without LIMIT clause",
            recommendation="Add LIMIT if only top N rows are needed",
            expected_improvement="Significant reduction in sorting overhead for large results",
            implementation="Add LIMIT 100 (or appropriate number) after ORDER BY",
            priority=4
        )

    def _recommend_cte_conversion(self) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="query_structure",
            severity="medium",
            title="Convert Subqueries to CTEs",
            description="Subqueries can be harder to optimize and maintain than CTEs.",
            current_issue="Subqueries detected in the query",
            recommendation="Convert correlated subqueries to CTEs or JOINs",
            expected_improvement="Better query plan optimization and readability",
            implementation="WITH subquery_name AS (SELECT ...) SELECT ... FROM main_table JOIN subquery_name",
            priority=3
        )

    def _analyze_index_opportunities(self, query_info: SQLQueryInfo) -> List[OptimizationRecommendation]:
        """Identify potential index opportunities"""
        recommendations = []

        # Columns in WHERE clause are index candidates
        where_columns = set()
        for condition in query_info.where_conditions:
            # Extract column names from conditions
            col_pattern = re.compile(r'\b([a-zA-Z_][a-zA-Z0-9_]*)\s*(?:=|>|<|>=|<=|<>|!=|LIKE|IN|BETWEEN)', re.IGNORECASE)
            where_columns.update(col_pattern.findall(condition))

        if where_columns:
            recommendations.append(OptimizationRecommendation(
                category="index",
                severity="medium",
                title="Consider Indexes on Filter Columns",
                description="Columns used in WHERE clauses benefit from indexes.",
                current_issue=f"Filter columns detected: {', '.join(where_columns)}",
                recommendation="Create indexes on frequently filtered columns",
                expected_improvement="Orders of magnitude faster for selective queries",
                implementation=f"CREATE INDEX idx_name ON table ({', '.join(list(where_columns)[:3])})",
                priority=2
            ))

        # JOIN columns are index candidates
        join_columns = set()
        for join in query_info.joins:
            if join.get('condition'):
                col_pattern = re.compile(r'\.([a-zA-Z_][a-zA-Z0-9_]*)\s*=', re.IGNORECASE)
                join_columns.update(col_pattern.findall(join['condition']))

        if join_columns:
            recommendations.append(OptimizationRecommendation(
                category="index",
                severity="high",
                title="Index Join Columns",
                description="Join columns without indexes cause expensive full table scans.",
                current_issue=f"Join columns detected: {', '.join(join_columns)}",
                recommendation="Ensure indexes exist on join key columns",
                expected_improvement="Dramatic improvement in join performance",
                implementation=f"CREATE INDEX idx_join ON table ({list(join_columns)[0]})",
                priority=1
            ))

        return recommendations

    def _analyze_partition_pruning(self, query_info: SQLQueryInfo, sql: str) -> List[OptimizationRecommendation]:
        """Check for partition pruning opportunities"""
        recommendations = []

        # Look for date/time columns in WHERE clause
        date_pattern = re.compile(
            r'\b(date|time|timestamp|created|updated|modified)_?\w*\s*(?:=|>|<|>=|<=|BETWEEN)',
            re.IGNORECASE
        )

        if date_pattern.search(sql):
            recommendations.append(OptimizationRecommendation(
                category="partition",
                severity="medium",
                title="Leverage Partition Pruning",
                description="Date-based filters can leverage partitioned tables for massive speedups.",
                current_issue="Date/time filter detected - ensure table is partitioned",
                recommendation="Partition table by date column and ensure filter format matches",
                expected_improvement="90%+ reduction in data scanned for time-bounded queries",
                implementation="CREATE TABLE ... PARTITION BY RANGE (date_column) or use dynamic partitioning",
                priority=2
            ))

        return recommendations

    def _analyze_aggregation(self, query_info: SQLQueryInfo) -> List[OptimizationRecommendation]:
        """Analyze aggregation patterns"""
        recommendations = []

        # High cardinality GROUP BY warning
        if len(query_info.group_by) > 3:
            recommendations.append(OptimizationRecommendation(
                category="aggregation",
                severity="medium",
                title="High Cardinality GROUP BY",
                description="Grouping by many columns increases memory usage and reduces aggregation benefit.",
                current_issue=f"GROUP BY with {len(query_info.group_by)} columns detected",
                recommendation="Review if all group by columns are necessary",
                expected_improvement="Reduced memory and faster aggregation",
                implementation="Remove non-essential GROUP BY columns or pre-aggregate",
                priority=3
            ))

        # COUNT DISTINCT optimization
        if 'COUNT' in query_info.aggregations and query_info.distinct:
            recommendations.append(OptimizationRecommendation(
                category="aggregation",
                severity="medium",
                title="Optimize COUNT DISTINCT",
                description="COUNT DISTINCT can be expensive for high cardinality columns.",
                current_issue="COUNT DISTINCT pattern detected",
                recommendation="Consider HyperLogLog approximation for very large datasets",
                expected_improvement="Massive speedup with ~2% error tolerance",
                implementation="Use APPROX_COUNT_DISTINCT() if available in your warehouse",
                priority=3
            ))

        return recommendations


# =============================================================================
# Spark Job Analyzer
# =============================================================================

class SparkJobAnalyzer:
    """Analyze Spark job metrics and provide optimization recommendations"""

    def analyze(self, metrics: SparkJobMetrics) -> List[OptimizationRecommendation]:
        """Analyze Spark job metrics"""
        recommendations = []

        # Check for data skew
        if metrics.skew_ratio > 5:
            recommendations.append(self._recommend_skew_mitigation(metrics))

        # Check for excessive shuffle
        shuffle_ratio = metrics.shuffle_write_bytes / max(metrics.input_bytes, 1)
        if shuffle_ratio > 1.5:
            recommendations.append(self._recommend_reduce_shuffle(metrics, shuffle_ratio))

        # Check for GC overhead
        gc_ratio = metrics.gc_time_ms / max(metrics.duration_ms, 1)
        if gc_ratio > 0.1:
            recommendations.append(self._recommend_memory_tuning(metrics, gc_ratio))

        # Check for failed tasks
        if metrics.failed_tasks > 0:
            fail_ratio = metrics.failed_tasks / max(metrics.tasks, 1)
            recommendations.append(self._recommend_failure_handling(metrics, fail_ratio))

        # Check for speculative execution overhead
        if metrics.speculative_tasks > metrics.tasks * 0.1:
            recommendations.append(self._recommend_reduce_speculation(metrics))

        # Check task count
        if metrics.tasks > 10000:
            recommendations.append(self._recommend_reduce_tasks(metrics))
        elif metrics.tasks < 10 and metrics.input_bytes > 1e9:
            recommendations.append(self._recommend_increase_parallelism(metrics))

        return recommendations

    def _recommend_skew_mitigation(self, metrics: SparkJobMetrics) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="skew",
            severity="critical",
            title="Severe Data Skew Detected",
            description=f"Skew ratio of {metrics.skew_ratio:.1f}x indicates uneven data distribution.",
            current_issue=f"Task execution time varies by {metrics.skew_ratio:.1f}x, causing stragglers",
            recommendation="Apply skew handling techniques to rebalance data",
            expected_improvement="Up to 80% reduction in job time by eliminating stragglers",
            implementation="""Options:
1. Salting: Append a random salt to skewed keys
   df.withColumn("salted_key", concat(col("key"), lit("_"), (rand() * 10).cast("int")))
2. Broadcast join for small tables:
   df1.join(broadcast(df2), "key")
3. Adaptive Query Execution (Spark 3.0+):
   spark.conf.set("spark.sql.adaptive.enabled", "true")
   spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")""",
            priority=1
        )

    def _recommend_reduce_shuffle(self, metrics: SparkJobMetrics, ratio: float) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="shuffle",
            severity="high",
            title="Excessive Shuffle Data",
            description=f"Shuffle writes {ratio:.1f}x the input data size.",
            current_issue=f"Shuffle write: {metrics.shuffle_write_bytes / 1e9:.2f} GB vs input: {metrics.input_bytes / 1e9:.2f} GB",
            recommendation="Reduce shuffle through partitioning and early aggregation",
            expected_improvement="Significant network I/O and storage reduction",
            implementation="""Options:
1. Pre-aggregate before shuffle:
   df.groupBy("key").agg(sum("value")).repartition("key")
2. Use map-side combining (RDD API; DataFrame aggregations already combine map-side):
   rdd.reduceByKey(lambda a, b: a + b)
3. Optimize partition count:
   spark.conf.set("spark.sql.shuffle.partitions", optimal_count)
4. Use bucketing for repeated joins:
   df.write.bucketBy(200, "key").saveAsTable("bucketed_table")""",
            priority=1
        )

    def _recommend_memory_tuning(self, metrics: SparkJobMetrics, gc_ratio: float) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="memory",
            severity="high",
            title="High GC Overhead",
            description=f"GC time is {gc_ratio * 100:.1f}% of total execution time.",
            current_issue=f"GC time: {metrics.gc_time_ms / 1000:.1f}s out of {metrics.duration_ms / 1000:.1f}s total",
            recommendation="Tune memory settings to reduce garbage collection",
            expected_improvement="20-50% faster execution with proper memory config",
            implementation="""Memory tuning options:
1. Increase executor memory:
   --executor-memory 8g
2. Adjust memory fractions:
   spark.memory.fraction=0.6
   spark.memory.storageFraction=0.5
3. Use off-heap memory:
   spark.memory.offHeap.enabled=true
   spark.memory.offHeap.size=4g
4. Reduce cached data:
   df.unpersist() when no longer needed
5. Use Kryo serialization:
   spark.serializer=org.apache.spark.serializer.KryoSerializer""",
            priority=2
        )

    def _recommend_failure_handling(self, metrics: SparkJobMetrics, fail_ratio: float) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="reliability",
            severity="high" if fail_ratio > 0.1 else "medium",
            title="Task Failures Detected",
            description=f"{metrics.failed_tasks} tasks failed ({fail_ratio * 100:.1f}% failure rate).",
            current_issue="Task failures increase job time and resource usage due to retries",
            recommendation="Investigate failure causes and add resilience",
            expected_improvement="Reduced retries and more predictable job times",
            implementation="""Failure handling options:
1. Check executor logs for OOM:
   spark.executor.memoryOverhead=2g
2. Handle data issues:
   df.filter(col("value").isNotNull())
3. Increase task retries (default is 4):
   spark.task.maxFailures=8
4. Add checkpointing for long jobs:
   df.checkpoint()
5. Check for network timeouts:
   spark.network.timeout=300s""",
            priority=1
        )

    def _recommend_reduce_speculation(self, metrics: SparkJobMetrics) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="execution",
            severity="medium",
            title="High Speculative Execution",
            description=f"{metrics.speculative_tasks} speculative tasks launched.",
            current_issue="Excessive speculation wastes resources and indicates underlying issues",
            recommendation="Address root cause of slow tasks instead of speculation",
            expected_improvement="Better resource utilization",
            implementation="""Options:
1. Disable speculation if not needed:
   spark.speculation=false
2. Or tune speculation settings:
   spark.speculation.multiplier=1.5
   spark.speculation.quantile=0.9
3. Fix underlying skew/memory issues first""",
            priority=3
        )

    def _recommend_reduce_tasks(self, metrics: SparkJobMetrics) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="parallelism",
            severity="medium",
            title="Too Many Tasks",
            description=f"{metrics.tasks} tasks may cause excessive scheduling overhead.",
            current_issue="Very high task count increases driver overhead",
            recommendation="Reduce partition count for better efficiency",
            expected_improvement="Reduced scheduling overhead and driver memory usage",
            implementation=f"""
1. Reduce shuffle partitions:
   spark.sql.shuffle.partitions={max(200, metrics.tasks // 10)}
2. Coalesce partitions:
   df.coalesce({max(200, metrics.tasks // 10)})
3. Use adaptive partitioning (Spark 3.0+):
   spark.sql.adaptive.enabled=true""",
            priority=3
        )

    def _recommend_increase_parallelism(self, metrics: SparkJobMetrics) -> OptimizationRecommendation:
        recommended_partitions = max(200, int(metrics.input_bytes / (128 * 1e6)))  # 128MB per partition
        return OptimizationRecommendation(
            category="parallelism",
            severity="high",
            title="Low Parallelism",
            description=f"Only {metrics.tasks} tasks for {metrics.input_bytes / 1e9:.2f} GB of data.",
            current_issue="Under-utilization of cluster resources",
            recommendation="Increase parallelism to better utilize cluster",
            expected_improvement="Near-linear speedup until all cluster cores are in use",
            implementation=f"""
1. Increase shuffle partitions:
   spark.sql.shuffle.partitions={recommended_partitions}
2. Repartition input:
   df.repartition({recommended_partitions})
3. Adjust default parallelism:
   spark.default.parallelism={recommended_partitions}""",
            priority=2
        )


# =============================================================================
# Partition Strategy Advisor
# =============================================================================

class PartitionAdvisor:
    """Recommend partitioning strategies based on data characteristics"""

    def recommend(self, data_stats: Dict) -> List[PartitionStrategy]:
        """Generate partition recommendations from data statistics"""
        recommendations = []

        columns = data_stats.get('columns', {})
        total_size_bytes = data_stats.get('total_size_bytes', 0)
        row_count = data_stats.get('row_count', 0)

        for col_name, col_stats in columns.items():
            strategy = self._evaluate_column(col_name, col_stats, total_size_bytes, row_count)
            if strategy:
                recommendations.append(strategy)

        # Sort by estimated partition size (smallest first)
        recommendations.sort(key=lambda s: s.partition_size_mb)

        return recommendations[:3]  # Top 3 recommendations

    def _evaluate_column(self, col_name: str, col_stats: Dict,
                         total_size_bytes: int, row_count: int) -> Optional[PartitionStrategy]:
        """Evaluate a column for partitioning potential"""
        cardinality = col_stats.get('cardinality', 0)
        data_type = col_stats.get('data_type', 'string')
        null_percentage = col_stats.get('null_percentage', 0)

        # Skip high-null columns
        if null_percentage > 20:
            return None

        # Date/timestamp columns are ideal for range partitioning
        if data_type in ('date', 'timestamp', 'datetime'):
            return self._recommend_date_partition(col_name, col_stats, total_size_bytes, row_count)

        # Low cardinality columns are good for list partitioning
        if cardinality and cardinality <= 100:
            return self._recommend_list_partition(col_name, col_stats, total_size_bytes, cardinality)

        # Medium cardinality columns can use hash partitioning
        if cardinality and 100 < cardinality <= 10000:
            return self._recommend_hash_partition(col_name, col_stats, total_size_bytes)

        return None

    def _recommend_date_partition(self, col_name: str, col_stats: Dict,
                                  total_size_bytes: int, row_count: int) -> PartitionStrategy:
        # Estimate daily partition size (assume 365 days of data)
        estimated_days = 365
        partition_size_mb = (total_size_bytes / estimated_days) / (1024 * 1024)

        return PartitionStrategy(
            column=col_name,
            partition_type="range",
            num_partitions=None,  # Dynamic based on date range
            partition_size_mb=partition_size_mb,
            reasoning=f"Date column '{col_name}' is ideal for range partitioning. "
                      f"Estimated daily partition size: {partition_size_mb:.1f} MB",
            implementation=f"""
-- BigQuery
CREATE TABLE table_name
PARTITION BY DATE({col_name})
AS SELECT * FROM source_table;

-- Snowflake
CREATE TABLE table_name (...)
CLUSTER BY (DATE_TRUNC('DAY', {col_name}));

-- Spark/Hive
df.write.partitionBy("{col_name}").parquet("path")

-- PostgreSQL
CREATE TABLE table_name (...)
PARTITION BY RANGE ({col_name});"""
        )

    def _recommend_list_partition(self, col_name: str, col_stats: Dict,
                                  total_size_bytes: int, cardinality: int) -> PartitionStrategy:
        partition_size_mb = (total_size_bytes / cardinality) / (1024 * 1024)

        return PartitionStrategy(
            column=col_name,
            partition_type="list",
            num_partitions=cardinality,
            partition_size_mb=partition_size_mb,
            reasoning=f"Column '{col_name}' has {cardinality} distinct values - ideal for list partitioning. "
                      f"Estimated partition size: {partition_size_mb:.1f} MB",
            implementation=f"""
-- Spark/Hive
df.write.partitionBy("{col_name}").parquet("path")

-- PostgreSQL
CREATE TABLE table_name (...)
PARTITION BY LIST ({col_name});

-- Note: List partitioning works best with stable, low-cardinality values"""
        )

    def _recommend_hash_partition(self, col_name: str, col_stats: Dict,
                                  total_size_bytes: int) -> PartitionStrategy:
        # Target ~128MB partitions
        target_partition_size = 128 * 1024 * 1024
        num_partitions = max(1, int(total_size_bytes / target_partition_size))

        # Round to the nearest power of 2 for better distribution
        num_partitions = 2 ** int(math.log2(num_partitions) + 0.5)
        partition_size_mb = (total_size_bytes / num_partitions) / (1024 * 1024)

        return PartitionStrategy(
            column=col_name,
            partition_type="hash",
            num_partitions=num_partitions,
            partition_size_mb=partition_size_mb,
            reasoning=f"Column '{col_name}' has medium cardinality - hash partitioning provides even distribution. "
                      f"Recommended {num_partitions} partitions (~{partition_size_mb:.1f} MB each)",
            implementation=f"""
-- Spark
df.repartition({num_partitions}, col("{col_name}"))

-- PostgreSQL
CREATE TABLE table_name (...)
PARTITION BY HASH ({col_name});

-- Snowflake (clustering)
ALTER TABLE table_name CLUSTER BY ({col_name});"""
        )


# =============================================================================
# Cost Estimator
# =============================================================================

class CostEstimator:
    """Estimate query costs for cloud data warehouses"""

    # Pricing (approximate, varies by region and contract)
    PRICING = {
        'snowflake': {
            'compute_per_credit': 2.00,  # USD per credit
            'credits_per_hour': {
                'x-small': 1,
                'small': 2,
                'medium': 4,
                'large': 8,
                'x-large': 16,
            },
            'storage_per_tb_month': 23.00,
        },
        'bigquery': {
            'on_demand_per_tb': 5.00,  # USD per TB scanned
            'storage_per_tb_month': 20.00,
            'streaming_insert_per_gb': 0.01,
        },
        'redshift': {
            'dc2_large_per_hour': 0.25,
            'ra3_xlarge_per_hour': 1.086,
            'storage_per_gb_month': 0.024,
        },
        'databricks': {
            'dbu_per_hour_sql': 0.22,
            'dbu_per_hour_jobs': 0.15,
        }
    }

    def estimate(self, query_info: SQLQueryInfo, warehouse: str,
                 data_stats: Optional[Dict] = None) -> CostEstimate:
        """Estimate query cost"""
        warehouse = warehouse.lower()

        if warehouse not in self.PRICING:
            raise ValueError(f"Unknown warehouse: {warehouse}. Supported: {list(self.PRICING.keys())}")

        # Estimate data scanned
        data_scanned_bytes = self._estimate_data_scanned(query_info, data_stats)
        data_scanned_tb = data_scanned_bytes / (1024 ** 4)

        if warehouse == 'bigquery':
            return self._estimate_bigquery(query_info, data_scanned_tb, data_stats)
        elif warehouse == 'snowflake':
            return self._estimate_snowflake(query_info, data_scanned_tb, data_stats)
        elif warehouse == 'redshift':
            return self._estimate_redshift(query_info, data_scanned_tb, data_stats)
        elif warehouse == 'databricks':
            return self._estimate_databricks(query_info, data_scanned_tb, data_stats)

    def _estimate_data_scanned(self, query_info: SQLQueryInfo,
                               data_stats: Optional[Dict]) -> int:
        """Estimate bytes of data that will be scanned"""
        if data_stats and 'total_size_bytes' in data_stats:
            base_size = data_stats['total_size_bytes']
        else:
            # Default assumption: 1GB per table
            base_size = len(query_info.tables) * 1e9

        # Adjust for filters
        filter_factor = 1.0
        if query_info.where_conditions:
            # Assume each filter reduces data by 50% (very rough)
            filter_factor = 0.5 ** min(len(query_info.where_conditions), 3)

        # Adjust for column projection
        if '*' not in query_info.columns and query_info.columns:
            # Assume selecting specific columns reduces scan by 50%
            filter_factor *= 0.5

        return int(base_size * filter_factor)

    def _estimate_bigquery(self, query_info: SQLQueryInfo,
                           data_scanned_tb: float, data_stats: Optional[Dict]) -> CostEstimate:
        pricing = self.PRICING['bigquery']

        compute_cost = data_scanned_tb * pricing['on_demand_per_tb']

        # Minimum billing of 10MB
        if data_scanned_tb < 10 / (1024 ** 2):
            compute_cost = 10 / (1024 ** 2) * pricing['on_demand_per_tb']

        return CostEstimate(
            warehouse='BigQuery',
            compute_cost=compute_cost,
            storage_cost=0,  # Storage cost separate
            data_transfer_cost=0,
            total_cost=compute_cost,
            assumptions=[
                f"Estimated {data_scanned_tb * 1024:.2f} GB data scanned",
                "Using on-demand pricing ($5/TB)",
                "Assumes no slot reservations",
                "Actual cost depends on partitioning and clustering"
            ]
        )

    def _estimate_snowflake(self, query_info: SQLQueryInfo,
                            data_scanned_tb: float, data_stats: Optional[Dict]) -> CostEstimate:
        pricing = self.PRICING['snowflake']

        # Estimate warehouse size and time
        complexity_to_size = {
            'low': 'x-small',
            'medium': 'small',
            'high': 'medium',
            'very_high': 'large'
        }
        warehouse_size = complexity_to_size.get(query_info.estimated_complexity, 'small')
        credits_per_hour = pricing['credits_per_hour'][warehouse_size]

        # Estimate runtime (very rough)
        estimated_seconds = max(1, data_scanned_tb * 1024 * 10)  # 10 seconds per GB
        estimated_hours = estimated_seconds / 3600

        credits_used = credits_per_hour * estimated_hours
        compute_cost = credits_used * pricing['compute_per_credit']

        # Minimum 1 minute billing
        min_cost = (credits_per_hour / 60) * pricing['compute_per_credit']
        compute_cost = max(compute_cost, min_cost)

        return CostEstimate(
            warehouse='Snowflake',
            compute_cost=compute_cost,
            storage_cost=0,
            data_transfer_cost=0,
            total_cost=compute_cost,
            assumptions=[
                f"Warehouse size: {warehouse_size}",
                f"Estimated runtime: {estimated_seconds:.1f} seconds",
                f"Credits used: {credits_used:.4f}",
                "Minimum 1-minute billing applies",
                "Actual cost depends on warehouse auto-suspend settings"
            ]
        )

    def _estimate_redshift(self, query_info: SQLQueryInfo,
                           data_scanned_tb: float, data_stats: Optional[Dict]) -> CostEstimate:
        pricing = self.PRICING['redshift']

        # Assume a single ra3.xlplus node
        hourly_rate = pricing['ra3_xlarge_per_hour']

        # Estimate runtime
        estimated_seconds = max(1, data_scanned_tb * 1024 * 15)  # 15 seconds per GB
        estimated_hours = estimated_seconds / 3600

        compute_cost = hourly_rate * estimated_hours

        return CostEstimate(
            warehouse='Redshift',
            compute_cost=compute_cost,
            storage_cost=0,
            data_transfer_cost=0,
            total_cost=compute_cost,
            assumptions=[
                "Using RA3.xlplus node type",
                f"Estimated runtime: {estimated_seconds:.1f} seconds",
                "Assumes dedicated cluster (not serverless)",
                "Actual cost depends on cluster configuration"
            ]
        )

    def _estimate_databricks(self, query_info: SQLQueryInfo,
                             data_scanned_tb: float, data_stats: Optional[Dict]) -> CostEstimate:
        pricing = self.PRICING['databricks']

        # Estimate DBUs
        estimated_seconds = max(1, data_scanned_tb * 1024 * 12)
        estimated_hours = estimated_seconds / 3600

        dbu_cost = pricing['dbu_per_hour_sql'] * estimated_hours

        return CostEstimate(
            warehouse='Databricks',
            compute_cost=dbu_cost,
            storage_cost=0,
            data_transfer_cost=0,
            total_cost=dbu_cost,
            assumptions=[
                "Using SQL warehouse",
                f"Estimated runtime: {estimated_seconds:.1f} seconds",
                "DBU rate may vary by workspace tier",
                "Does not include underlying cloud costs"
            ]
        )


# =============================================================================
# Report Generator
# =============================================================================

class ReportGenerator:
    """Generate optimization reports"""

    def generate_text_report(self, query_info: SQLQueryInfo,
                             recommendations: List[OptimizationRecommendation],
                             cost_estimate: Optional[CostEstimate] = None) -> str:
        """Generate a text report"""
        lines = []
        lines.append("=" * 80)
        lines.append("ETL PERFORMANCE OPTIMIZATION REPORT")
        lines.append("=" * 80)
        lines.append(f"\nGenerated: {datetime.now().isoformat()}")

        # Query summary
        lines.append("\n" + "-" * 40)
        lines.append("QUERY ANALYSIS")
        lines.append("-" * 40)
        lines.append(f"Query Type: {query_info.query_type}")
        lines.append(f"Tables: {', '.join(query_info.tables) or 'None'}")
        lines.append(f"Joins: {len(query_info.joins)}")
        lines.append(f"Subqueries: {query_info.subqueries}")
        lines.append(f"Aggregations: {', '.join(query_info.aggregations) or 'None'}")
        lines.append(f"Window Functions: {', '.join(query_info.window_functions) or 'None'}")
        lines.append(f"Complexity: {query_info.estimated_complexity.upper()}")

        # Cost estimate
        if cost_estimate:
            lines.append("\n" + "-" * 40)
            lines.append("COST ESTIMATE")
            lines.append("-" * 40)
            lines.append(f"Warehouse: {cost_estimate.warehouse}")
            lines.append(f"Estimated Cost: ${cost_estimate.total_cost:.4f} {cost_estimate.currency}")
            lines.append("Assumptions:")
            for assumption in cost_estimate.assumptions:
                lines.append(f"  - {assumption}")

        # Recommendations
        if recommendations:
            lines.append("\n" + "-" * 40)
            lines.append(f"OPTIMIZATION RECOMMENDATIONS ({len(recommendations)} found)")
            lines.append("-" * 40)

            for i, rec in enumerate(recommendations, 1):
                severity_icon = {
                    'critical': '🔴',
                    'high': '🟠',
                    'medium': '🟡',
                    'low': '🟢'
                }.get(rec.severity, '⚪')

                lines.append(f"\n{i}. {severity_icon} [{rec.severity.upper()}] {rec.title}")
                lines.append(f"   Category: {rec.category}")
                lines.append(f"   Issue: {rec.current_issue}")
                lines.append(f"   Recommendation: {rec.recommendation}")
                lines.append(f"   Expected Improvement: {rec.expected_improvement}")
                lines.append("\n   Implementation:")
                for impl_line in rec.implementation.strip().split('\n'):
                    lines.append(f"      {impl_line}")
        else:
            lines.append("\n✅ No optimization issues detected")

        lines.append("\n" + "=" * 80)

        return "\n".join(lines)

    def generate_json_report(self, query_info: SQLQueryInfo,
                             recommendations: List[OptimizationRecommendation],
                             cost_estimate: Optional[CostEstimate] = None) -> Dict:
        """Generate a JSON report"""
        return {
            "report_type": "etl_performance_optimization",
            "generated_at": datetime.now().isoformat(),
            "query_analysis": {
                "query_type": query_info.query_type,
                "tables": query_info.tables,
                "joins": query_info.joins,
                "subqueries": query_info.subqueries,
                "aggregations": query_info.aggregations,
                "window_functions": query_info.window_functions,
                "complexity": query_info.estimated_complexity
            },
            "cost_estimate": asdict(cost_estimate) if cost_estimate else None,
            "recommendations": [asdict(r) for r in recommendations],
            "summary": {
                "total_recommendations": len(recommendations),
                "critical": sum(1 for r in recommendations if r.severity == "critical"),
                "high": sum(1 for r in recommendations if r.severity == "high"),
                "medium": sum(1 for r in recommendations if r.severity == "medium"),
                "low": sum(1 for r in recommendations if r.severity == "low")
            }
        }


# =============================================================================
# CLI Commands
# =============================================================================

def cmd_analyze_sql(args):
    """Analyze SQL query for optimization opportunities"""
    # Load SQL
    sql_path = Path(args.input)
    if sql_path.exists():
        with open(sql_path, 'r') as f:
            sql = f.read()
    else:
        sql = args.input  # Treat as inline SQL

    # Parse and analyze
    parser = SQLParser()
    query_info = parser.parse(sql)

    optimizer = SQLOptimizer()
    recommendations = optimizer.analyze(query_info, sql)

    # Cost estimate if warehouse specified
    cost_estimate = None
    if args.warehouse:
        estimator = CostEstimator()
        data_stats = None
        if args.stats:
            with open(args.stats, 'r') as f:
                data_stats = json.load(f)
        cost_estimate = estimator.estimate(query_info, args.warehouse, data_stats)

    # Generate report
    reporter = ReportGenerator()

    if args.json:
        report = reporter.generate_json_report(query_info, recommendations, cost_estimate)
        output = json.dumps(report, indent=2)
    else:
        output = reporter.generate_text_report(query_info, recommendations, cost_estimate)

    if args.output:
        with open(args.output, 'w') as f:
            f.write(output)
        logger.info(f"Report saved to {args.output}")
    else:
        print(output)


def cmd_analyze_spark(args):
    """Analyze Spark job metrics"""
    with open(args.input, 'r') as f:
        metrics_data = json.load(f)

    # Handle both single job and array of jobs
    if isinstance(metrics_data, list):
        jobs = metrics_data
    else:
        jobs = [metrics_data]

    all_recommendations = []
    analyzer = SparkJobAnalyzer()

    for job_data in jobs:
        metrics = SparkJobMetrics(
            job_id=job_data.get('jobId', 'unknown'),
            duration_ms=job_data.get('duration', 0),
            stages=job_data.get('numStages', 0),
            tasks=job_data.get('numTasks', 0),
            shuffle_read_bytes=job_data.get('shuffleReadBytes', 0),
            shuffle_write_bytes=job_data.get('shuffleWriteBytes', 0),
            input_bytes=job_data.get('inputBytes', 0),
            output_bytes=job_data.get('outputBytes', 0),
            peak_memory_bytes=job_data.get('peakMemoryBytes', 0),
            gc_time_ms=job_data.get('gcTime', 0),
            failed_tasks=job_data.get('failedTasks', 0),
            speculative_tasks=job_data.get('speculativeTasks', 0),
            skew_ratio=job_data.get('skewRatio', 1.0)
        )

        recommendations = analyzer.analyze(metrics)
        all_recommendations.extend(recommendations)

    # Deduplicate similar recommendations
    unique_recs = []
    seen_titles = set()
    for rec in all_recommendations:
        if rec.title not in seen_titles:
            unique_recs.append(rec)
            seen_titles.add(rec.title)

    # Output
    if args.json:
        output = json.dumps([asdict(r) for r in unique_recs], indent=2)
    else:
        lines = []
        lines.append("=" * 60)
        lines.append("SPARK JOB OPTIMIZATION REPORT")
        lines.append("=" * 60)
        lines.append(f"\nJobs Analyzed: {len(jobs)}")
        lines.append(f"Recommendations: {len(unique_recs)}")

        for i, rec in enumerate(unique_recs, 1):
            lines.append(f"\n{i}. [{rec.severity.upper()}] {rec.title}")
            lines.append(f"   {rec.description}")
            lines.append(f"   Implementation: {rec.implementation[:200]}...")

        output = "\n".join(lines)

    if args.output:
        with open(args.output, 'w') as f:
            f.write(output)
    else:
        print(output)


def cmd_optimize_partition(args):
    """Recommend partition strategies"""
    with open(args.input, 'r') as f:
        data_stats = json.load(f)

    advisor = PartitionAdvisor()
    strategies = advisor.recommend(data_stats)

    if args.json:
        output = json.dumps([asdict(s) for s in strategies], indent=2)
    else:
        lines = []
        lines.append("=" * 60)
        lines.append("PARTITION STRATEGY RECOMMENDATIONS")
        lines.append("=" * 60)

        if not strategies:
            lines.append("\nNo partition recommendations based on provided data statistics.")
        else:
            for i, strategy in enumerate(strategies, 1):
                lines.append(f"\n{i}. 
Partition by: {strategy.column}\")\n lines.append(f\" Type: {strategy.partition_type}\")\n if strategy.num_partitions:\n lines.append(f\" Partitions: {strategy.num_partitions}\")\n lines.append(f\" Estimated size: {strategy.partition_size_mb:.1f} MB per partition\")\n lines.append(f\" Reasoning: {strategy.reasoning}\")\n lines.append(f\"\\n Implementation:\")\n for impl_line in strategy.implementation.strip().split('\\n'):\n lines.append(f\" {impl_line}\")\n\n output = \"\\n\".join(lines)\n\n if args.output:\n with open(args.output, 'w') as f:\n f.write(output)\n else:\n print(output)\n\n\ndef cmd_estimate_cost(args):\n \"\"\"Estimate query cost\"\"\"\n # Load SQL\n sql_path = Path(args.input)\n if sql_path.exists():\n with open(sql_path, 'r') as f:\n sql = f.read()\n else:\n sql = args.input\n\n # Parse\n parser = SQLParser()\n query_info = parser.parse(sql)\n\n # Load data stats if provided\n data_stats = None\n if args.stats:\n with open(args.stats, 'r') as f:\n data_stats = json.load(f)\n\n # Estimate cost\n estimator = CostEstimator()\n cost = estimator.estimate(query_info, args.warehouse, data_stats)\n\n if args.json:\n output = json.dumps(asdict(cost), indent=2)\n else:\n lines = []\n lines.append(f\"Cost Estimate for {cost.warehouse}\")\n lines.append(\"=\" * 40)\n lines.append(f\"Compute Cost: ${cost.compute_cost:.4f}\")\n lines.append(f\"Storage Cost: ${cost.storage_cost:.4f}\")\n lines.append(f\"Data Transfer: ${cost.data_transfer_cost:.4f}\")\n lines.append(\"-\" * 40)\n lines.append(f\"Total: ${cost.total_cost:.4f} {cost.currency}\")\n lines.append(\"\\nAssumptions:\")\n for assumption in cost.assumptions:\n lines.append(f\" - {assumption}\")\n\n output = \"\\n\".join(lines)\n\n if args.output:\n with open(args.output, 'w') as f:\n f.write(output)\n else:\n print(output)\n\n\ndef cmd_generate_template(args):\n \"\"\"Generate template files\"\"\"\n templates = {\n 'data_stats': {\n \"total_size_bytes\": 10737418240,\n \"row_count\": 10000000,\n 
\"columns\": {\n \"id\": {\n \"data_type\": \"integer\",\n \"cardinality\": 10000000,\n \"null_percentage\": 0\n },\n \"created_at\": {\n \"data_type\": \"timestamp\",\n \"cardinality\": 1000000,\n \"null_percentage\": 0\n },\n \"category\": {\n \"data_type\": \"string\",\n \"cardinality\": 50,\n \"null_percentage\": 2\n },\n \"amount\": {\n \"data_type\": \"float\",\n \"cardinality\": 100000,\n \"null_percentage\": 5\n }\n }\n },\n 'spark_metrics': {\n \"jobId\": \"job_12345\",\n \"duration\": 300000,\n \"numStages\": 5,\n \"numTasks\": 200,\n \"shuffleReadBytes\": 5368709120,\n \"shuffleWriteBytes\": 2147483648,\n \"inputBytes\": 10737418240,\n \"outputBytes\": 1073741824,\n \"peakMemoryBytes\": 4294967296,\n \"gcTime\": 15000,\n \"failedTasks\": 2,\n \"speculativeTasks\": 5,\n \"skewRatio\": 3.5\n }\n }\n\n if args.template not in templates:\n logger.error(f\"Unknown template: {args.template}. Available: {list(templates.keys())}\")\n sys.exit(1)\n\n output = json.dumps(templates[args.template], indent=2)\n\n if args.output:\n with open(args.output, 'w') as f:\n f.write(output)\n logger.info(f\"Template saved to {args.output}\")\n else:\n print(output)\n\n\ndef main():\n \"\"\"Main entry point\"\"\"\n parser = argparse.ArgumentParser(\n description=\"ETL Performance Optimizer - Analyze and optimize data pipelines\",\n formatter_class=argparse.RawDescriptionHelpFormatter,\n epilog=\"\"\"\nExamples:\n # Analyze SQL query\n python etl_performance_optimizer.py analyze-sql query.sql\n\n # Analyze with cost estimate\n python etl_performance_optimizer.py analyze-sql query.sql --warehouse bigquery\n\n # Analyze Spark job metrics\n python etl_performance_optimizer.py analyze-spark spark-history.json\n\n # Get partition recommendations\n python etl_performance_optimizer.py optimize-partition data_stats.json\n\n # Estimate query cost\n python etl_performance_optimizer.py estimate-cost query.sql --warehouse snowflake\n\n # Generate template files\n python 
etl_performance_optimizer.py template data_stats --output stats.json\n \"\"\"\n )\n\n parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')\n\n subparsers = parser.add_subparsers(dest='command', help='Command to run')\n\n # Analyze SQL command\n sql_parser = subparsers.add_parser('analyze-sql', help='Analyze SQL query')\n sql_parser.add_argument('input', help='SQL file or inline query')\n sql_parser.add_argument('--warehouse', '-w', choices=['bigquery', 'snowflake', 'redshift', 'databricks'],\n help='Warehouse for cost estimation')\n sql_parser.add_argument('--stats', '-s', help='Data statistics JSON file')\n sql_parser.add_argument('--output', '-o', help='Output file')\n sql_parser.add_argument('--json', action='store_true', help='Output as JSON')\n sql_parser.set_defaults(func=cmd_analyze_sql)\n\n # Analyze Spark command\n spark_parser = subparsers.add_parser('analyze-spark', help='Analyze Spark job metrics')\n spark_parser.add_argument('input', help='Spark metrics JSON file')\n spark_parser.add_argument('--output', '-o', help='Output file')\n spark_parser.add_argument('--json', action='store_true', help='Output as JSON')\n spark_parser.set_defaults(func=cmd_analyze_spark)\n\n # Optimize partition command\n partition_parser = subparsers.add_parser('optimize-partition', help='Recommend partition strategies')\n partition_parser.add_argument('input', help='Data statistics JSON file')\n partition_parser.add_argument('--output', '-o', help='Output file')\n partition_parser.add_argument('--json', action='store_true', help='Output as JSON')\n partition_parser.set_defaults(func=cmd_optimize_partition)\n\n # Estimate cost command\n cost_parser = subparsers.add_parser('estimate-cost', help='Estimate query cost')\n cost_parser.add_argument('input', help='SQL file or inline query')\n cost_parser.add_argument('--warehouse', '-w', required=True,\n choices=['bigquery', 'snowflake', 'redshift', 'databricks'],\n help='Target warehouse')\n 
cost_parser.add_argument('--stats', '-s', help='Data statistics JSON file')\n cost_parser.add_argument('--output', '-o', help='Output file')\n cost_parser.add_argument('--json', action='store_true', help='Output as JSON')\n cost_parser.set_defaults(func=cmd_estimate_cost)\n\n # Template command\n template_parser = subparsers.add_parser('template', help='Generate template files')\n template_parser.add_argument('template', choices=['data_stats', 'spark_metrics'],\n help='Template type')\n template_parser.add_argument('--output', '-o', help='Output file')\n template_parser.set_defaults(func=cmd_generate_template)\n\n args = parser.parse_args()\n\n if args.verbose:\n logging.getLogger().setLevel(logging.DEBUG)\n\n if not args.command:\n parser.print_help()\n sys.exit(1)\n\n try:\n args.func(args)\n except Exception as e:\n logger.error(f\"Error: {e}\")\n if args.verbose:\n import traceback\n traceback.print_exc()\n sys.exit(1)\n\n\nif __name__ == '__main__':\n main()\n","content_type":"text/x-python; charset=utf-8","language":"python","size":66066,"content_sha256":"6359b714f17bc1538518f8764daa921f86d09fac297c733f6448771714b5e194"},{"filename":"scripts/pipeline_orchestrator.py","content":"#!/usr/bin/env python3\n\"\"\"\nPipeline Orchestrator\n\nGenerate pipeline configurations for Airflow, Prefect, and Dagster.\nSupports ETL pattern generation, dependency management, and scheduling.\n\nUsage:\n python pipeline_orchestrator.py generate --type airflow --source postgres --destination snowflake\n python pipeline_orchestrator.py generate --type prefect --config pipeline.yaml\n python pipeline_orchestrator.py visualize --dag dags/my_dag.py\n python pipeline_orchestrator.py validate --dag dags/my_dag.py\n\"\"\"\n\nimport os\nimport sys\nimport json\ntry:\n import yaml\n HAS_YAML = True\nexcept ImportError:\n HAS_YAML = False\nimport logging\nimport argparse\nfrom pathlib import Path\nfrom typing import Dict, List, Optional, Any\nfrom datetime import datetime, timedelta\nfrom 
dataclasses import dataclass, field, asdict\nfrom abc import ABC, abstractmethod\n\nlogging.basicConfig(\n level=logging.INFO,\n format='%(asctime)s - %(levelname)s - %(message)s'\n)\nlogger = logging.getLogger(__name__)\n\n\n# ============================================================================\n# Data Classes\n# ============================================================================\n\n@dataclass\nclass SourceConfig:\n \"\"\"Source system configuration.\"\"\"\n type: str # postgres, mysql, s3, kafka, api\n connection_id: str\n schema: Optional[str] = None\n tables: List[str] = field(default_factory=list)\n query: Optional[str] = None\n incremental_column: Optional[str] = None\n incremental_strategy: str = \"timestamp\" # timestamp, id, cdc\n\n@dataclass\nclass DestinationConfig:\n \"\"\"Destination system configuration.\"\"\"\n type: str # snowflake, bigquery, redshift, s3, delta\n connection_id: str\n schema: str = \"raw\"\n write_mode: str = \"append\" # append, overwrite, merge\n partition_by: Optional[str] = None\n cluster_by: List[str] = field(default_factory=list)\n\n@dataclass\nclass TaskConfig:\n \"\"\"Individual task configuration.\"\"\"\n task_id: str\n operator: str\n dependencies: List[str] = field(default_factory=list)\n params: Dict[str, Any] = field(default_factory=dict)\n retries: int = 2\n retry_delay_minutes: int = 5\n timeout_minutes: int = 60\n pool: Optional[str] = None\n priority_weight: int = 1\n\n@dataclass\nclass PipelineConfig:\n \"\"\"Complete pipeline configuration.\"\"\"\n name: str\n description: str\n schedule: str # cron expression or @daily, @hourly\n owner: str = \"data-team\"\n tags: List[str] = field(default_factory=list)\n catchup: bool = False\n max_active_runs: int = 1\n default_retries: int = 2\n source: Optional[SourceConfig] = None\n destination: Optional[DestinationConfig] = None\n tasks: List[TaskConfig] = field(default_factory=list)\n\n\n# 
============================================================================\n# Pipeline Generators\n# ============================================================================\n\nclass PipelineGenerator(ABC):\n \"\"\"Abstract base class for pipeline generators.\"\"\"\n\n @abstractmethod\n def generate(self, config: PipelineConfig) -> str:\n \"\"\"Generate pipeline code from config.\"\"\"\n pass\n\n @abstractmethod\n def validate(self, code: str) -> Dict[str, Any]:\n \"\"\"Validate generated pipeline code.\"\"\"\n pass\n\n\nclass AirflowGenerator(PipelineGenerator):\n \"\"\"Generate Airflow DAG code.\"\"\"\n\n OPERATOR_IMPORTS = {\n 'python': 'from airflow.operators.python import PythonOperator',\n 'bash': 'from airflow.operators.bash import BashOperator',\n 'postgres': 'from airflow.providers.postgres.operators.postgres import PostgresOperator',\n 'snowflake': 'from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator',\n 's3': 'from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator',\n 's3_to_snowflake': 'from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator',\n 'sensor': 'from airflow.sensors.base import BaseSensorOperator',\n 'trigger': 'from airflow.operators.trigger_dagrun import TriggerDagRunOperator',\n 'email': 'from airflow.operators.email import EmailOperator',\n 'slack': 'from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator',\n }\n\n def generate(self, config: PipelineConfig) -> str:\n \"\"\"Generate Airflow DAG from configuration.\"\"\"\n\n # Collect required imports\n imports = self._collect_imports(config)\n\n # Generate DAG code\n code = self._generate_header(imports)\n code += self._generate_default_args(config)\n code += self._generate_dag_definition(config)\n code += self._generate_tasks(config)\n code += self._generate_dependencies(config)\n\n return code\n\n def _collect_imports(self, config: PipelineConfig) -> List[str]:\n \"\"\"Collect 
required import statements.\"\"\"\n imports = [\n \"from airflow import DAG\",\n \"from airflow.utils.dates import days_ago\",\n \"from datetime import datetime, timedelta\",\n ]\n\n operators_used = {'python'} # PythonOperator also backs generic placeholder tasks\n for task in config.tasks:\n op_type = task.operator.split('_')[0].lower()\n if op_type == 'sql':\n op_type = 'postgres' # SQL tasks (e.g. sql_sensor) are rendered as PostgresOperator\n if op_type in self.OPERATOR_IMPORTS:\n operators_used.add(op_type)\n\n # Add source/destination specific imports\n if config.source:\n if config.source.type == 'postgres':\n operators_used.add('postgres')\n elif config.source.type == 's3':\n operators_used.add('s3')\n\n if config.destination:\n if config.destination.type == 'snowflake':\n operators_used.add('snowflake')\n operators_used.add('s3_to_snowflake')\n\n # Sort so generated files have a stable import order\n for op in sorted(operators_used):\n if op in self.OPERATOR_IMPORTS:\n imports.append(self.OPERATOR_IMPORTS[op])\n\n return imports\n\n def _generate_header(self, imports: List[str]) -> str:\n \"\"\"Generate file header with imports.\"\"\"\n header = '''\"\"\"\nAuto-generated Airflow DAG\nGenerated by Pipeline Orchestrator\n\"\"\"\n\n'''\n header += '\\n'.join(imports)\n header += '\\n\\n'\n return header\n\n def _generate_default_args(self, config: PipelineConfig) -> str:\n \"\"\"Generate default_args dictionary.\"\"\"\n return f'''\ndefault_args = {{\n 'owner': '{config.owner}',\n 'depends_on_past': False,\n 'email_on_failure': True,\n 'email_on_retry': False,\n 'retries': {config.default_retries},\n 'retry_delay': timedelta(minutes=5),\n}}\n\n'''\n\n def _generate_dag_definition(self, config: PipelineConfig) -> str:\n \"\"\"Generate DAG definition.\"\"\"\n tags_str = str(config.tags) if config.tags else \"[]\"\n\n return f'''\nwith DAG(\n dag_id='{config.name}',\n default_args=default_args,\n description='{config.description}',\n schedule_interval='{config.schedule}',\n start_date=days_ago(1),\n catchup={config.catchup},\n max_active_runs={config.max_active_runs},\n tags={tags_str},\n) as dag:\n\n'''\n\n def _generate_tasks(self, config: PipelineConfig) -> str:\n \"\"\"Generate task 
definitions.\"\"\"\n tasks_code = \"\"\n\n for task in config.tasks:\n if 'python' in task.operator.lower():\n tasks_code += self._generate_python_task(task)\n elif 'bash' in task.operator.lower():\n tasks_code += self._generate_bash_task(task)\n elif 'sql' in task.operator.lower() or 'postgres' in task.operator.lower():\n tasks_code += self._generate_sql_task(task, config)\n elif 'snowflake' in task.operator.lower():\n tasks_code += self._generate_snowflake_task(task)\n else:\n tasks_code += self._generate_generic_task(task)\n\n return tasks_code\n\n def _generate_python_task(self, task: TaskConfig) -> str:\n \"\"\"Generate PythonOperator task.\"\"\"\n callable_name = task.params.get('callable', 'process_data')\n return f'''\n def {callable_name}(**kwargs):\n \"\"\"Task: {task.task_id}\"\"\"\n # Add your processing logic here\n execution_date = kwargs.get('ds')\n print(f\"Processing data for {{execution_date}}\")\n return True\n\n {task.task_id} = PythonOperator(\n task_id='{task.task_id}',\n python_callable={callable_name},\n retries={task.retries},\n retry_delay=timedelta(minutes={task.retry_delay_minutes}),\n execution_timeout=timedelta(minutes={task.timeout_minutes}),\n )\n\n'''\n\n def _generate_bash_task(self, task: TaskConfig) -> str:\n \"\"\"Generate BashOperator task.\"\"\"\n command = task.params.get('command', 'echo \"Hello World\"')\n return f'''\n {task.task_id} = BashOperator(\n task_id='{task.task_id}',\n bash_command='{command}',\n retries={task.retries},\n retry_delay=timedelta(minutes={task.retry_delay_minutes}),\n execution_timeout=timedelta(minutes={task.timeout_minutes}),\n )\n\n'''\n\n def _generate_sql_task(self, task: TaskConfig, config: PipelineConfig) -> str:\n \"\"\"Generate SQL operator task.\"\"\"\n sql = task.params.get('sql', 'SELECT 1')\n conn_id = config.source.connection_id if config.source else 'default_conn'\n\n return f'''\n {task.task_id} = PostgresOperator(\n task_id='{task.task_id}',\n postgres_conn_id='{conn_id}',\n 
sql=\"\"\"{sql}\"\"\",\n retries={task.retries},\n retry_delay=timedelta(minutes={task.retry_delay_minutes}),\n )\n\n'''\n\n def _generate_snowflake_task(self, task: TaskConfig) -> str:\n \"\"\"Generate SnowflakeOperator task.\"\"\"\n sql = task.params.get('sql', 'SELECT 1')\n return f'''\n {task.task_id} = SnowflakeOperator(\n task_id='{task.task_id}',\n snowflake_conn_id='snowflake_default',\n sql=\"\"\"{sql}\"\"\",\n retries={task.retries},\n retry_delay=timedelta(minutes={task.retry_delay_minutes}),\n )\n\n'''\n\n def _generate_generic_task(self, task: TaskConfig) -> str:\n \"\"\"Generate generic task placeholder.\"\"\"\n return f'''\n # TODO: Implement {task.operator} for {task.task_id}\n {task.task_id} = PythonOperator(\n task_id='{task.task_id}',\n python_callable=lambda: print(\"{task.task_id}\"),\n )\n\n'''\n\n def _generate_dependencies(self, config: PipelineConfig) -> str:\n \"\"\"Generate task dependencies.\"\"\"\n deps_code = \"\\n # Task dependencies\\n\"\n\n for task in config.tasks:\n if task.dependencies:\n for dep in task.dependencies:\n deps_code += f\" {dep} >> {task.task_id}\\n\"\n\n return deps_code\n\n def validate(self, code: str) -> Dict[str, Any]:\n \"\"\"Validate generated DAG code.\"\"\"\n issues = []\n warnings = []\n\n # Check for common issues\n if 'default_args' not in code:\n issues.append(\"Missing default_args definition\")\n\n if 'with DAG' not in code:\n issues.append(\"Missing DAG context manager\")\n\n if 'schedule_interval' not in code:\n warnings.append(\"No schedule_interval defined, DAG won't run automatically\")\n\n # Try to parse the code\n try:\n compile(code, '\u003cstring>', 'exec')\n except SyntaxError as e:\n issues.append(f\"Syntax error: {e}\")\n\n return {\n 'valid': len(issues) == 0,\n 'issues': issues,\n 'warnings': warnings\n }\n\n\nclass PrefectGenerator(PipelineGenerator):\n \"\"\"Generate Prefect flow code.\"\"\"\n\n def generate(self, config: PipelineConfig) -> str:\n \"\"\"Generate Prefect flow from 
configuration.\"\"\"\n\n code = self._generate_header()\n code += self._generate_tasks(config)\n code += self._generate_flow(config)\n\n return code\n\n def _generate_header(self) -> str:\n \"\"\"Generate file header.\"\"\"\n return '''\"\"\"\nAuto-generated Prefect Flow\nGenerated by Pipeline Orchestrator\n\"\"\"\n\nfrom prefect import flow, task, get_run_logger\nfrom prefect.tasks import task_input_hash\nfrom datetime import timedelta\nimport pandas as pd\n\n'''\n\n def _generate_tasks(self, config: PipelineConfig) -> str:\n \"\"\"Generate Prefect tasks.\"\"\"\n tasks_code = \"\"\n\n for task_config in config.tasks:\n cache_expiration = task_config.params.get('cache_hours', 1)\n tasks_code += f'''\n@task(\n name=\"{task_config.task_id}\",\n retries={task_config.retries},\n retry_delay_seconds={task_config.retry_delay_minutes * 60},\n cache_key_fn=task_input_hash,\n cache_expiration=timedelta(hours={cache_expiration}),\n)\ndef {task_config.task_id}(input_data=None):\n \"\"\"Task: {task_config.task_id}\"\"\"\n logger = get_run_logger()\n logger.info(f\"Executing {task_config.task_id}\")\n\n # Add processing logic here\n result = input_data\n\n return result\n\n'''\n return tasks_code\n\n def _generate_flow(self, config: PipelineConfig) -> str:\n \"\"\"Generate Prefect flow.\"\"\"\n flow_code = f'''\n@flow(\n name=\"{config.name}\",\n description=\"{config.description}\",\n version=\"1.0.0\",\n)\ndef {config.name.replace('-', '_')}_flow():\n \"\"\"Main flow orchestrating all tasks.\"\"\"\n logger = get_run_logger()\n logger.info(\"Starting flow: {config.name}\")\n\n'''\n # Generate task calls with dependencies\n task_vars = {}\n for i, task_config in enumerate(config.tasks):\n task_name = task_config.task_id\n var_name = f\"result_{i}\"\n task_vars[task_name] = var_name\n\n if task_config.dependencies:\n # Get input from first dependency\n dep_var = task_vars.get(task_config.dependencies[0], \"None\")\n flow_code += f\" {var_name} = {task_name}({dep_var})\\n\"\n 
else:\n flow_code += f\" {var_name} = {task_name}()\\n\"\n\n flow_code += '''\n logger.info(\"Flow completed successfully\")\n return True\n\n\nif __name__ == \"__main__\":\n ''' + f'{config.name.replace(\"-\", \"_\")}_flow()' + '\\n'\n\n return flow_code\n\n def validate(self, code: str) -> Dict[str, Any]:\n \"\"\"Validate Prefect flow code.\"\"\"\n issues = []\n\n if '@flow' not in code:\n issues.append(\"Missing @flow decorator\")\n\n if '@task' not in code:\n issues.append(\"No tasks defined with @task decorator\")\n\n try:\n compile(code, '\u003cstring>', 'exec')\n except SyntaxError as e:\n issues.append(f\"Syntax error: {e}\")\n\n return {\n 'valid': len(issues) == 0,\n 'issues': issues,\n 'warnings': []\n }\n\n\nclass DagsterGenerator(PipelineGenerator):\n \"\"\"Generate Dagster job code.\"\"\"\n\n def generate(self, config: PipelineConfig) -> str:\n \"\"\"Generate Dagster job from configuration.\"\"\"\n\n code = self._generate_header()\n code += self._generate_ops(config)\n code += self._generate_job(config)\n\n return code\n\n def _generate_header(self) -> str:\n \"\"\"Generate file header.\"\"\"\n return '''\"\"\"\nAuto-generated Dagster Job\nGenerated by Pipeline Orchestrator\n\"\"\"\n\nfrom dagster import op, job, In, Out, Output, DynamicOut, graph\nfrom dagster import AssetMaterialization, MetadataValue\nimport pandas as pd\n\n'''\n\n def _generate_ops(self, config: PipelineConfig) -> str:\n \"\"\"Generate Dagster ops.\"\"\"\n ops_code = \"\"\n\n for task_config in config.tasks:\n has_input = len(task_config.dependencies) > 0\n\n if has_input:\n ops_code += f'''\n@op(\n ins={{\"input_data\": In()}},\n out=Out(),\n)\ndef {task_config.task_id}(context, input_data):\n \"\"\"Op: {task_config.task_id}\"\"\"\n context.log.info(f\"Executing {task_config.task_id}\")\n\n # Add processing logic here\n result = input_data\n\n # Log asset materialization\n yield AssetMaterialization(\n asset_key=\"{task_config.task_id}\",\n metadata={{\n \"row_count\": 
MetadataValue.int(len(result) if hasattr(result, '__len__') else 0),\n }}\n )\n yield Output(result)\n\n'''\n else:\n ops_code += f'''\n@op(out=Out())\ndef {task_config.task_id}(context):\n \"\"\"Op: {task_config.task_id}\"\"\"\n context.log.info(f\"Executing {task_config.task_id}\")\n\n # Add processing logic here\n result = {{}}\n\n yield AssetMaterialization(\n asset_key=\"{task_config.task_id}\",\n )\n yield Output(result)\n\n'''\n return ops_code\n\n def _generate_job(self, config: PipelineConfig) -> str:\n \"\"\"Generate Dagster job.\"\"\"\n job_code = f'''\n@job(\n name=\"{config.name}\",\n description=\"{config.description}\",\n tags={{\n \"owner\": \"{config.owner}\",\n \"schedule\": \"{config.schedule}\",\n }},\n)\ndef {config.name.replace('-', '_')}_job():\n \"\"\"Main job orchestrating all ops.\"\"\"\n'''\n # Build dependency graph\n task_outputs = {}\n for task_config in config.tasks:\n task_name = task_config.task_id\n\n if task_config.dependencies:\n dep_output = task_outputs.get(task_config.dependencies[0], None)\n if dep_output:\n job_code += f\" {task_name}_output = {task_name}({dep_output})\\n\"\n else:\n job_code += f\" {task_name}_output = {task_name}()\\n\"\n else:\n job_code += f\" {task_name}_output = {task_name}()\\n\"\n\n task_outputs[task_name] = f\"{task_name}_output\"\n\n return job_code\n\n def validate(self, code: str) -> Dict[str, Any]:\n \"\"\"Validate Dagster job code.\"\"\"\n issues = []\n\n if '@job' not in code:\n issues.append(\"Missing @job decorator\")\n\n if '@op' not in code:\n issues.append(\"No ops defined with @op decorator\")\n\n try:\n compile(code, '\u003cstring>', 'exec')\n except SyntaxError as e:\n issues.append(f\"Syntax error: {e}\")\n\n return {\n 'valid': len(issues) == 0,\n 'issues': issues,\n 'warnings': []\n }\n\n\n# ============================================================================\n# ETL Pattern Templates\n# ============================================================================\n\nclass 
ETLPatternGenerator:\n \"\"\"Generate common ETL patterns.\"\"\"\n\n @staticmethod\n def generate_extract_load(\n source_type: str,\n destination_type: str,\n tables: List[str],\n mode: str = \"incremental\"\n ) -> PipelineConfig:\n \"\"\"Generate extract-load pipeline configuration.\"\"\"\n\n tasks = []\n\n # Extract tasks\n for table in tables:\n extract_task = TaskConfig(\n task_id=f\"extract_{table}\",\n operator=\"python_operator\",\n params={\n 'callable': f'extract_{table}',\n 'sql': f'SELECT * FROM {table}' + (\n \" WHERE updated_at > '{{ prev_ds }}'\" if mode == 'incremental' else '' # plain string: Jinja braces are not doubled\n )\n }\n )\n tasks.append(extract_task)\n\n # Load tasks with dependencies\n for table in tables:\n load_task = TaskConfig(\n task_id=f\"load_{table}\",\n operator=\"python_operator\",\n dependencies=[f\"extract_{table}\"],\n params={'callable': f'load_{table}'}\n )\n tasks.append(load_task)\n\n # Quality check task\n quality_task = TaskConfig(\n task_id=\"quality_check\",\n operator=\"python_operator\",\n dependencies=[f\"load_{table}\" for table in tables],\n params={'callable': 'run_quality_checks'}\n )\n tasks.append(quality_task)\n\n return PipelineConfig(\n name=f\"el_{source_type}_to_{destination_type}\",\n description=f\"Extract from {source_type}, load to {destination_type}\",\n schedule=\"0 5 * * *\", # Daily at 5 AM\n tags=[\"etl\", source_type, destination_type],\n source=SourceConfig(\n type=source_type,\n connection_id=f\"{source_type}_default\",\n tables=tables,\n incremental_strategy=\"timestamp\" if mode == \"incremental\" else \"full\"\n ),\n destination=DestinationConfig(\n type=destination_type,\n connection_id=f\"{destination_type}_default\",\n write_mode=\"append\" if mode == \"incremental\" else \"overwrite\"\n ),\n tasks=tasks\n )\n\n @staticmethod\n def generate_transform_pipeline(\n source_tables: List[str],\n target_table: str,\n dbt_models: List[str]\n ) -> PipelineConfig:\n \"\"\"Generate transformation pipeline with dbt.\"\"\"\n\n tasks = []\n\n 
```python
        # Sensor for source freshness
        for table in source_tables:
            sensor_task = TaskConfig(
                task_id=f"wait_for_{table}",
                operator="sql_sensor",
                params={
                    'sql': f"SELECT MAX(updated_at) FROM {table} WHERE updated_at > '{{{{ ds }}}}'"
                }
            )
            tasks.append(sensor_task)

        # dbt run task
        dbt_run = TaskConfig(
            task_id="dbt_run",
            operator="bash_operator",
            dependencies=[f"wait_for_{t}" for t in source_tables],
            params={
                'command': f'cd /opt/dbt && dbt run --select {" ".join(dbt_models)}'
            },
            timeout_minutes=120
        )
        tasks.append(dbt_run)

        # dbt test task
        dbt_test = TaskConfig(
            task_id="dbt_test",
            operator="bash_operator",
            dependencies=["dbt_run"],
            params={
                'command': f'cd /opt/dbt && dbt test --select {" ".join(dbt_models)}'
            }
        )
        tasks.append(dbt_test)

        return PipelineConfig(
            name=f"transform_{target_table}",
            description=f"Transform data into {target_table} using dbt",
            schedule="0 6 * * *",  # Daily at 6 AM (after extraction)
            tags=["transform", "dbt"],
            tasks=tasks
        )


# ============================================================================
# CLI Interface
# ============================================================================

def main():
    parser = argparse.ArgumentParser(
        description="Pipeline Orchestrator - Generate and manage data pipeline configurations",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  Generate Airflow DAG:
    python pipeline_orchestrator.py generate --type airflow --source postgres --destination snowflake --tables orders,customers

  Generate from config file:
    python pipeline_orchestrator.py generate --config pipeline.yaml --type prefect

  Validate existing DAG:
    python pipeline_orchestrator.py validate --dag dags/my_dag.py --type airflow
        """
    )

    subparsers = parser.add_subparsers(dest='command', help='Command to run')

    # Generate command
    gen_parser = subparsers.add_parser('generate', help='Generate pipeline code')
    gen_parser.add_argument('--type', '-t', required=True,
                            choices=['airflow', 'prefect', 'dagster'],
                            help='Pipeline framework type')
    gen_parser.add_argument('--source', '-s', help='Source system type')
    gen_parser.add_argument('--destination', '-d', help='Destination system type')
    gen_parser.add_argument('--tables', help='Comma-separated list of tables')
    gen_parser.add_argument('--config', '-c', help='Configuration YAML file')
    gen_parser.add_argument('--output', '-o', help='Output file path')
    gen_parser.add_argument('--name', '-n', help='Pipeline name')
    gen_parser.add_argument('--schedule', default='0 5 * * *', help='Cron schedule')
    gen_parser.add_argument('--mode', default='incremental',
                            choices=['incremental', 'full'],
                            help='Load mode')

    # Validate command
    val_parser = subparsers.add_parser('validate', help='Validate pipeline code')
    val_parser.add_argument('--dag', required=True, help='DAG file to validate')
    val_parser.add_argument('--type', '-t', required=True,
                            choices=['airflow', 'prefect', 'dagster'])

    # Template command
    tmpl_parser = subparsers.add_parser('template', help='Generate from template')
    tmpl_parser.add_argument('--pattern', '-p', required=True,
                             choices=['extract-load', 'transform', 'cdc'],
                             help='ETL pattern to generate')
    tmpl_parser.add_argument('--type', '-t', required=True,
                             choices=['airflow', 'prefect', 'dagster'])
    tmpl_parser.add_argument('--source', '-s', required=True)
    tmpl_parser.add_argument('--destination', '-d', required=True)
    tmpl_parser.add_argument('--tables', required=True)
    tmpl_parser.add_argument('--output', '-o', help='Output file path')

    args = parser.parse_args()

    if args.command is None:
        parser.print_help()
        sys.exit(1)

    try:
        if args.command == 'generate':
            # Load config if provided
            if args.config:
                with open(args.config) as f:
                    if HAS_YAML:
                        config_data = yaml.safe_load(f)
                    else:
                        config_data = json.load(f)
                config = PipelineConfig(**config_data)
            else:
                # Build config from arguments
                tables = args.tables.split(',') if args.tables else []

                config = ETLPatternGenerator.generate_extract_load(
                    source_type=args.source or 'postgres',
                    destination_type=args.destination or 'snowflake',
                    tables=tables,
                    mode=args.mode
                )

                # CLI overrides apply only when the config is built from arguments,
                # so a config file's own schedule is not replaced by the CLI default.
                if args.name:
                    config.name = args.name
                config.schedule = args.schedule

            # Generate code
            generators = {
                'airflow': AirflowGenerator(),
                'prefect': PrefectGenerator(),
                'dagster': DagsterGenerator()
            }

            generator = generators[args.type]
            code = generator.generate(config)

            # Validate
            validation = generator.validate(code)
            if not validation['valid']:
                logger.warning(f"Validation issues: {validation['issues']}")

            # Output
            if args.output:
                with open(args.output, 'w') as f:
                    f.write(code)
                logger.info(f"Generated pipeline saved to {args.output}")
            else:
                print(code)

        elif args.command == 'validate':
            with open(args.dag) as f:
                code = f.read()

            generators = {
                'airflow': AirflowGenerator(),
                'prefect': PrefectGenerator(),
                'dagster': DagsterGenerator()
            }

            generator = generators[args.type]
            result = generator.validate(code)

            print(json.dumps(result, indent=2))
            sys.exit(0 if result['valid'] else 1)

        elif args.command == 'template':
            tables = args.tables.split(',')

            if args.pattern == 'extract-load':
                config = ETLPatternGenerator.generate_extract_load(
                    source_type=args.source,
                    destination_type=args.destination,
                    tables=tables
                )
            elif args.pattern == 'transform':
                config = ETLPatternGenerator.generate_transform_pipeline(
                    source_tables=tables,
                    target_table='fct_output',
                    dbt_models=['stg_*', 'fct_*']
                )
            else:
                logger.error(f"Pattern {args.pattern} not yet implemented")
                sys.exit(1)

            generators = {
                'airflow': AirflowGenerator(),
                'prefect': PrefectGenerator(),
                'dagster': DagsterGenerator()
            }

            generator = generators[args.type]
            code = generator.generate(config)

            if args.output:
                with open(args.output, 'w') as f:
                    f.write(code)
                logger.info(f"Generated {args.pattern} pipeline saved to {args.output}")
            else:
                print(code)

        sys.exit(0)

    except Exception as e:
        logger.error(f"Error: {e}")
        sys.exit(1)


if __name__ == '__main__':
    main()
```

# Senior Data Engineer

Production-grade data engineering skill for building scalable, reliable data systems.

## Table of Contents

1. [Trigger Phrases](#trigger-phrases)
2. [Quick Start](#quick-start)
3. [Workflows](#workflows)
4. [Architecture Decision Framework](#architecture-decision-framework)
5. [Tech Stack](#tech-stack)
6. [Reference Documentation](#reference-documentation)
7. [Troubleshooting](#troubleshooting)

---

## Trigger Phrases

Activate this skill when you see:

**Pipeline Design:**
- "Design a data pipeline for..."
- "Build an ETL/ELT process..."
- "How should I ingest data from..."
- "Set up data extraction from..."

**Architecture:**
- "Should I use batch or streaming?"
- "Lambda vs Kappa architecture"
- "How to handle late-arriving data"
- "Design a data lakehouse"

**Data Modeling:**
- "Create a dimensional model..."
- "Star schema vs snowflake"
- "Implement slowly changing dimensions"
- "Design a data vault"

**Data Quality:**
- "Add data validation to..."
- "Set up data quality checks"
- "Monitor data freshness"
- "Implement data contracts"

**Performance:**
- "Optimize this Spark job"
- "Query is running slow"
- "Reduce pipeline execution time"
- "Tune Airflow DAG"

---

## Quick Start

### Core Tools

```bash
# Generate pipeline orchestration config
python scripts/pipeline_orchestrator.py generate \
  --type airflow \
  --source postgres \
  --destination snowflake \
  --schedule "0 5 * * *"

# Validate data quality
python scripts/data_quality_validator.py validate \
  --input data/sales.parquet \
  --schema schemas/sales.json \
  --checks freshness,completeness,uniqueness

# Optimize ETL performance
python scripts/etl_performance_optimizer.py analyze \
  --query queries/daily_aggregation.sql \
  --engine spark \
  --recommend
```

---

## Workflows

→ See `references/workflows.md` for details.

## Architecture Decision Framework

Use this framework to choose the right approach for your data pipeline.

### Batch vs Streaming

| Criteria | Batch | Streaming |
|---|---|---|
| **Latency requirement** | Hours to days | Seconds to minutes |
| **Data volume** | Large historical datasets | Continuous event streams |
| **Processing complexity** | Complex transformations, ML | Simple aggregations, filtering |
| **Cost sensitivity** | More cost-effective | Higher infrastructure cost |
| **Error handling** | Easier to reprocess | Requires careful design (retries, replay, dead letter queues) |

**Decision Tree:**

```
Is real-time insight required?
├── Yes → Use streaming
│   └── Are exactly-once semantics needed?
│       ├── Yes → Kafka + Flink/Spark Structured Streaming
│       └── No → Kafka + consumer groups
└── No → Use batch
    └── Is data volume > 1 TB per day?
        ├── Yes → Spark/Databricks
        └── No → dbt + warehouse compute
```

### Lambda vs Kappa Architecture

| Aspect | Lambda | Kappa |
|---|---|---|
| **Complexity** | Two codebases (batch + stream) | Single codebase |
| **Maintenance** | Higher (batch and stream logic must be kept in sync) | Lower |
| **Reprocessing** | Native batch layer | Replay from source |
| **Use case** | ML training + real-time serving | Pure event-driven |

**When to choose Lambda:**
- You need to train ML models on historical data
- Complex batch transformations are not feasible in streaming
- Batch infrastructure already exists

**When to choose Kappa:**
- The architecture is event-sourced
- All processing can be expressed as stream operations
- You are starting fresh without legacy systems

### Data Warehouse vs Data Lakehouse

| Feature | Warehouse (Snowflake/BigQuery) | Lakehouse (Delta/Iceberg) |
|---|---|---|
| **Best for** | BI, SQL analytics | ML, unstructured data |
| **Storage cost** | Higher (proprietary format) | Lower (open formats) |
| **Flexibility** | Schema-on-write | Schema-on-read for raw files; schema enforcement with evolution for Delta/Iceberg tables |
| **Performance** | Excellent for SQL | Good, improving |
| **Ecosystem** | Mature BI tools | Growing ML tooling |

---

## Tech Stack

| Category | Technologies |
|---|---|
| **Languages** | Python, SQL, Scala |
| **Orchestration** | Airflow, Prefect, Dagster |
| **Transformation** | dbt, Spark, Flink |
| **Streaming** | Kafka, Kinesis, Pub/Sub |
| **Storage** | S3, GCS, Delta Lake, Iceberg |
| **Warehouses** | Snowflake, BigQuery, Redshift, Databricks |
| **Quality** | Great Expectations, dbt tests, Monte Carlo |
| **Monitoring** | Prometheus, Grafana, Datadog |

---

## Reference Documentation

### 1. Data Pipeline Architecture

See `references/data_pipeline_architecture.md` for:
- Lambda vs Kappa architecture patterns
- Batch processing with Spark and Airflow
- Stream processing with Kafka and Flink
- Exactly-once semantics implementation
- Error handling and dead letter queues

### 2. Data Modeling Patterns

See `references/data_modeling_patterns.md` for:
- Dimensional modeling (Star/Snowflake)
- Slowly Changing Dimensions (SCD Types 1-6)
- Data Vault modeling
- dbt best practices
- Partitioning and clustering

### 3. DataOps Best Practices

See `references/dataops_best_practices.md` for:
- Data testing frameworks
- Data contracts and schema validation
- CI/CD for data pipelines
- Observability and lineage
- Incident response

---

## Troubleshooting

→ See `references/troubleshooting.md` for details.

---

Skill metadata: `senior-data-engineer` v1 (category: DevOps), from the `alirezarezvani/claude-skills` repository (`engineering-team/skills/senior-data-engineer/SKILL.md`), imported by @skillopedia on 2026-06-05. Description: Data engineering skill for building scalable data pipelines, ETL/ELT systems, and data infrastructure. Expertise in Python, SQL, Spark, Airflow, dbt, Kafka, and the modern data stack. Includes data modeling, pipeline orchestration, data quality, and DataOps. Use when designing data architectures, building data pipelines, optimizing data workflows, implementing data governance, or troubleshooting data issues.
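The sensor SQL in `generate_transform_pipeline` is built with a Python f-string, so the quadruple braces in `'{{{{ ds }}}}'` act as an escape. Each `{{` renders as a single literal `{`, which leaves `{{ ds }}` in the string for Airflow's Jinja templating to replace with the run's logical date (`YYYY-MM-DD`) at execution time. A quick check of what the f-string actually produces, using `orders` as an example table name:

```python
# In an f-string, '{{' and '}}' are escapes for literal braces,
# so '{{{{ ds }}}}' renders as '{{ ds }}' (a Jinja placeholder for Airflow).
table = "orders"  # example table name
sql = f"SELECT MAX(updated_at) FROM {table} WHERE updated_at > '{{{{ ds }}}}'"
print(sql)
# SELECT MAX(updated_at) FROM orders WHERE updated_at > '{{ ds }}'
```

Writing `'{{ ds }}'` with only double braces would leave `{ ds }` after f-string formatting, and Jinja would not recognize that as a template expression.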
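The transform pipeline creates one `wait_for_<table>` sensor per source table, makes `dbt_run` depend on all of them, and makes `dbt_test` depend on `dbt_run`. The following minimal sketch rebuilds that dependency graph and derives an execution order with the standard library's `graphlib.TopologicalSorter` (Python 3.9+). The `Task` dataclass and both helper functions are hypothetical stand-ins for the script's `TaskConfig`, whose full definition is not shown here. Only the dependency shape is taken from the code above.

```python
from dataclasses import dataclass, field
from graphlib import TopologicalSorter


@dataclass
class Task:
    """Hypothetical stand-in for TaskConfig: just an id and its upstream ids."""
    task_id: str
    dependencies: list[str] = field(default_factory=list)


def build_transform_tasks(source_tables: list[str]) -> list[Task]:
    """Mirror the generated shape: sensors -> dbt_run -> dbt_test."""
    tasks = [Task(f"wait_for_{t}") for t in source_tables]
    tasks.append(Task("dbt_run", [f"wait_for_{t}" for t in source_tables]))
    tasks.append(Task("dbt_test", ["dbt_run"]))
    return tasks


def execution_stages(tasks: list[Task]) -> list[list[str]]:
    """Group tasks into stages; tasks within one stage may run in parallel."""
    # TopologicalSorter expects {node: iterable_of_predecessors}.
    sorter = TopologicalSorter({t.task_id: t.dependencies for t in tasks})
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        stages.append(ready)
        sorter.done(*ready)
    return stages


stages = execution_stages(build_transform_tasks(["orders", "customers"]))
for i, stage in enumerate(stages, 1):
    print(i, stage)
# 1 ['wait_for_customers', 'wait_for_orders']
# 2 ['dbt_run']
# 3 ['dbt_test']
```

Real orchestrators (Airflow, Prefect, Dagster) do this scheduling themselves. The sketch only makes visible why all sensors can wait in parallel while `dbt_run` cannot start until every one of them succeeds.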
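The Quick Start runs `data_quality_validator.py` with `--checks freshness,completeness,uniqueness`. That script's implementation is not shown here, so the following is only an illustrative sketch of what these three checks commonly compute, over rows held as a list of dicts. The function names, column names, and thresholds are hypothetical and are not the validator's actual API.

```python
from datetime import datetime, timedelta, timezone


def check_freshness(rows, ts_column, max_age, now=None):
    """Pass if the newest timestamp in ts_column is no older than max_age."""
    now = now or datetime.now(timezone.utc)
    timestamps = [r[ts_column] for r in rows if r.get(ts_column) is not None]
    return bool(timestamps) and now - max(timestamps) <= max_age


def check_completeness(rows, column, min_ratio=1.0):
    """Pass if the share of non-null values in column is at least min_ratio."""
    if not rows:
        return False
    non_null = sum(1 for r in rows if r.get(column) is not None)
    return non_null / len(rows) >= min_ratio


def check_uniqueness(rows, key_column):
    """Pass if key_column contains no duplicate non-null values."""
    keys = [r[key_column] for r in rows if r.get(key_column) is not None]
    return len(keys) == len(set(keys))


# Hypothetical sample data: one null amount and a duplicated order_id.
now = datetime(2024, 1, 2, 6, 0, tzinfo=timezone.utc)
rows = [
    {"order_id": 1, "amount": 10.0, "updated_at": now - timedelta(hours=2)},
    {"order_id": 2, "amount": None, "updated_at": now - timedelta(hours=30)},
    {"order_id": 2, "amount": 5.0, "updated_at": now - timedelta(hours=1)},
]
results = {
    "freshness": check_freshness(rows, "updated_at", timedelta(hours=24), now=now),
    "completeness": check_completeness(rows, "amount", min_ratio=0.9),
    "uniqueness": check_uniqueness(rows, "order_id"),
}
print(results)
# {'freshness': True, 'completeness': False, 'uniqueness': False}
```

Passing `now` explicitly keeps the freshness check deterministic in tests. In production you would typically push these checks down into the warehouse as SQL (for example dbt `unique` and `not_null` tests) rather than loading rows into Python.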
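The Batch vs Streaming decision tree can be encoded as a small function, which makes each branch explicit and easy to unit-test. This is a sketch: the function name and parameters are hypothetical, and the 1 TB per day cut-off is the tree's rule of thumb, not a hard limit.

```python
def recommend_stack(real_time: bool, exactly_once: bool = False,
                    daily_volume_tb: float = 0.0) -> str:
    """Walk the Batch vs Streaming decision tree and return the suggested stack."""
    if real_time:
        # Streaming branch: the delivery guarantee decides the processing engine.
        if exactly_once:
            return "Kafka + Flink/Spark Structured Streaming"
        return "Kafka + consumer groups"
    # Batch branch: daily volume decides between a distributed engine and in-warehouse SQL.
    if daily_volume_tb > 1:
        return "Spark/Databricks"
    return "dbt + warehouse compute"


print(recommend_stack(real_time=False, daily_volume_tb=0.2))
# dbt + warehouse compute
```

The `exactly_once` flag is ignored on the batch branch, matching the tree, where that question is only asked for streaming.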
