Database Designer - POWERFUL Tier Skill Overview A comprehensive database design skill that provides expert-level analysis, optimization, and migration capabilities for modern database systems. This skill combines theoretical principles with practical tools to help architects and developers create scalable, performant, and maintainable database schemas. Core Competencies Schema Design & Analysis - Normalization Analysis : Automated detection of normalization levels (1NF through BCNF) - Denormalization Strategy : Smart recommendations for performance optimization - Data Type Optimization : Ide…

, r'^id$', r'uuid', r'guid', r'email', r'username', r'ssn',
            r'account.*number', r'transaction.*id', r'reference.*number'
        ]

        self.medium_selectivity_patterns = [
            r'name$', r'title$', r'description$', r'address', r'phone', r'zip',
            r'postal.*code', r'serial.*number', r'sku', r'product.*code'
        ]

        self.low_selectivity_patterns = [
            r'status$', r'type$', r'category', r'state$', r'flag$', r'active$',
            r'enabled$', r'deleted$', r'visible$', r'gender$', r'priority$'
        ]

        self.very_low_selectivity_patterns = [
            r'is_.*', r'has_.*', r'can_.*', r'boolean', r'bool'
        ]

    def estimate_selectivity(self, column: Column, table_size_estimate: int = 10000) -> float:
        """Estimate column selectivity (0.0 = all same values, 1.0 = all unique values)."""
        column_name_lower = column.name.lower()

        # Primary key or unique columns
        if column.unique or column.name.lower() in ['id', 'uuid', 'guid']:
            return 1.0

        # Check cardinality estimate if available
        if column.cardinality_estimate:
            return min(column.cardinality_estimate / table_size_estimate, 1.0)

        # Pattern-based estimation
        for pattern in self.high_selectivity_patterns:
            if re.search(pattern, column_name_lower):
                return 0.9  # Very high selectivity

        for pattern in self.medium_selectivity_patterns:
            if re.search(pattern, column_name_lower):
                return 0.7  # Good selectivity

        for pattern in self.low_selectivity_patterns:
            if re.search(pattern, column_name_lower):
                return 0.2  # Poor selectivity

        for pattern in self.very_low_selectivity_patterns:
            if re.search(pattern, column_name_lower):
                return 0.1  # Very poor selectivity

        # Data type based estimation
        data_type_upper = column.data_type.upper()
        if data_type_upper.startswith('BOOL'):
            return 0.1
        elif data_type_upper.startswith(('TINYINT', 'SMALLINT')):
            return 0.3
        elif data_type_upper.startswith('INT'):
            return 0.8
        elif data_type_upper.startswith(('VARCHAR', 'TEXT')):
            # Estimate based on column name
            if 'name' in column_name_lower:
                return 0.7
            elif 'description' in column_name_lower or 'comment' in column_name_lower:
                return 0.9
            else:
                return 0.6

        # Default moderate selectivity
        return 0.5


class IndexOptimizer:
    def __init__(self):
        self.tables: Dict[str, Dict[str, Column]] = {}
        self.existing_indexes: Dict[str, List[Index]] = {}
        self.query_patterns: List[QueryPattern] = []
        self.selectivity_estimator = SelectivityEstimator()

        # Configuration
        self.max_composite_index_columns = 6
        self.min_selectivity_for_index = 0.1
        self.redundancy_overlap_threshold = 0.8

    def load_schema(self, schema_data: Dict[str, Any]) -> None:
        """Load schema definition."""
        if 'tables' not in schema_data:
            raise ValueError("Schema must contain 'tables' key")

        for table_name, table_def in schema_data['tables'].items():
            self.tables[table_name] = {}
            self.existing_indexes[table_name] = []

            # Load columns
            for col_name, col_def in table_def.get('columns', {}).items():
                column = Column(
                    name=col_name,
                    data_type=col_def.get('type', 'VARCHAR(255)'),
                    nullable=col_def.get('nullable', True),
                    unique=col_def.get('unique', False),
                    cardinality_estimate=col_def.get('cardinality_estimate')
                )
                self.tables[table_name][col_name] = column

            # Load existing indexes
            for idx_def in table_def.get('indexes', []):
                index = Index(
                    name=idx_def['name'],
                    table=table_name,
                    columns=idx_def['columns'],
                    unique=idx_def.get('unique', False),
                    index_type=idx_def.get('type', 'btree'),
                    partial_condition=idx_def.get('partial_condition'),
                    include_columns=idx_def.get('include_columns', [])
                )
                self.existing_indexes[table_name].append(index)

    def load_query_patterns(self, query_data: Dict[str, Any]) -> None:
        """Load query patterns for analysis."""
        if 'queries' not in query_data:
            raise ValueError("Query data must contain 'queries' key")

        for query_def in query_data['queries']:
            pattern = QueryPattern(
                query_id=query_def['id'],
                query_type=query_def.get('type', 'SELECT').upper(),
                table=query_def['table'],
                where_conditions=query_def.get('where_conditions', []),
                join_conditions=query_def.get('join_conditions', []),
                order_by=query_def.get('order_by', []),
                group_by=query_def.get('group_by', []),
                frequency=query_def.get('frequency', 1),
                avg_execution_time_ms=query_def.get('avg_execution_time_ms')
            )
            self.query_patterns.append(pattern)

    def analyze_missing_indexes(self) -> List[IndexRecommendation]:
        """Identify missing indexes based on query patterns."""
        recommendations = []

        for pattern in self.query_patterns:
            table_name = pattern.table
            if table_name not in self.tables:
                continue

            # Analyze WHERE conditions for single-column indexes
            for condition in pattern.where_conditions:
                column = condition.get('column')
                operator = condition.get('operator', '=')

                if column and column in self.tables[table_name]:
                    if not self._has_covering_index(table_name, [column]):
                        recommendation = self._create_single_column_recommendation(
                            table_name, column, pattern, operator
                        )
                        if recommendation:
                            recommendations.append(recommendation)

            # Analyze composite indexes for multi-column WHERE conditions
            where_columns = [cond.get('column') for cond in pattern.where_conditions
                             if cond.get('column') and cond.get('column') in self.tables[table_name]]

            if len(where_columns) > 1:
                composite_recommendation = self._create_composite_recommendation(
                    table_name, where_columns, pattern
                )
                if composite_recommendation:
                    recommendations.append(composite_recommendation)

            # Analyze covering indexes for SELECT with ORDER BY
            if pattern.order_by and where_columns:
                covering_recommendation = self._create_covering_index_recommendation(
                    table_name, where_columns, pattern
                )
                if covering_recommendation:
                    recommendations.append(covering_recommendation)

            # Analyze JOIN conditions
            for join_condition in pattern.join_conditions:
                local_column = join_condition.get('local_column')
                if local_column and local_column in self.tables[table_name]:
                    if not self._has_covering_index(table_name, [local_column]):
                        recommendation = self._create_join_index_recommendation(
                            table_name, local_column, pattern, join_condition
                        )
                        if recommendation:
                            recommendations.append(recommendation)

        # Remove duplicates and prioritize
        recommendations = self._deduplicate_recommendations(recommendations)
        recommendations = self._prioritize_recommendations(recommendations)

        return recommendations

    def _has_covering_index(self, table_name: str, columns: List[str]) -> bool:
        """Check if existing indexes cover the specified columns."""
        if table_name not in self.existing_indexes:
            return False

        for index in self.existing_indexes[table_name]:
            # Check if index starts with required columns (prefix match for composite)
            if len(index.columns) >= len(columns):
                if index.columns[:len(columns)] == columns:
                    return True

        return False

    def _create_single_column_recommendation(
        self,
        table_name: str,
        column: str,
        pattern: QueryPattern,
        operator: str
    ) -> Optional[IndexRecommendation]:
        """Create recommendation for single-column index."""
        column_obj = self.tables[table_name][column]
        selectivity = self.selectivity_estimator.estimate_selectivity(column_obj)

        # Skip very low selectivity columns unless frequently used
        if selectivity < self.min_selectivity_for_index and pattern.frequency < 100:
            return None

        index_name = f"idx_{table_name}_{column}"
        index = Index(
            name=index_name,
            table=table_name,
            columns=[column],
            unique=column_obj.unique,
            index_type="btree"
        )

        reason = f"Optimize WHERE {column} {operator} queries"
        if pattern.frequency > 10:
            reason += f" (used {pattern.frequency} times)"

        return IndexRecommendation(
            recommendation_id=self._generate_recommendation_id(table_name, [column]),
            table=table_name,
            recommended_index=index,
            reason=reason,
            query_patterns_helped=[pattern.query_id],
            estimated_benefit=self._estimate_benefit(selectivity, pattern.frequency),
            estimated_overhead="Low (single column)",
            priority=self._calculate_priority(selectivity, pattern.frequency, 1),
            sql_statement=f"CREATE INDEX {index_name} ON {table_name} ({column});",
            selectivity_analysis={
                "column_selectivity": selectivity,
                "estimated_reduction": f"{int(selectivity * 100)}%"
            }
        )

    def _create_composite_recommendation(
        self,
        table_name: str,
        columns: List[str],
        pattern: QueryPattern
    ) -> Optional[IndexRecommendation]:
        """Create recommendation for composite index."""
        if len(columns) > self.max_composite_index_columns:
            columns = columns[:self.max_composite_index_columns]

        # Order columns by selectivity (most selective first)
        column_selectivities = []
        for col in columns:
            col_obj = self.tables[table_name][col]
            selectivity = self.selectivity_estimator.estimate_selectivity(col_obj)
            column_selectivities.append((col, selectivity))

        # Sort by selectivity descending
        column_selectivities.sort(key=lambda x: x[1], reverse=True)
        ordered_columns = [col for col, _ in column_selectivities]

        # Calculate combined selectivity
        combined_selectivity = min(sum(sel for _, sel in column_selectivities) / len(columns), 0.95)

        index_name = f"idx_{table_name}_{'_'.join(ordered_columns)}"
        if len(index_name) > 63:  # PostgreSQL limit
            index_name = f"idx_{table_name}_composite_{abs(hash('_'.join(ordered_columns))) % 10000}"

        index = Index(
            name=index_name,
            table=table_name,
            columns=ordered_columns,
            index_type="btree"
        )

        reason = f"Optimize multi-column WHERE conditions: {', '.join(ordered_columns)}"

        return IndexRecommendation(
            recommendation_id=self._generate_recommendation_id(table_name, ordered_columns),
            table=table_name,
            recommended_index=index,
            reason=reason,
            query_patterns_helped=[pattern.query_id],
            estimated_benefit=self._estimate_benefit(combined_selectivity, pattern.frequency),
            estimated_overhead=f"Medium (composite index with {len(ordered_columns)} columns)",
            priority=self._calculate_priority(combined_selectivity, pattern.frequency, len(ordered_columns)),
            sql_statement=f"CREATE INDEX {index_name} ON {table_name} ({', '.join(ordered_columns)});",
            selectivity_analysis={
                "column_selectivities": {col: sel for col, sel in column_selectivities},
                "combined_selectivity": combined_selectivity,
                "column_order_rationale": "Ordered by selectivity (most selective first)"
            }
        )

    def _create_covering_index_recommendation(
        self,
        table_name: str,
        where_columns: List[str],
        pattern: QueryPattern
    ) -> Optional[IndexRecommendation]:
        """Create recommendation for covering index."""
        order_columns = [col['column'] for col in pattern.order_by if col['column'] in self.tables[table_name]]

        # Combine WHERE and ORDER BY columns
        index_columns = where_columns.copy()
        include_columns = []

        # Add ORDER BY columns to index columns
        for col in order_columns:
            if col not in index_columns:
                index_columns.append(col)

        # Limit index columns
        if len(index_columns) > self.max_composite_index_columns:
            include_columns = index_columns[self.max_composite_index_columns:]
            index_columns = index_columns[:self.max_composite_index_columns]

        index_name = f"idx_{table_name}_covering_{'_'.join(index_columns[:3])}"
        if len(index_name) > 63:
            index_name = f"idx_{table_name}_covering_{abs(hash('_'.join(index_columns))) % 10000}"

        index = Index(
            name=index_name,
            table=table_name,
            columns=index_columns,
            include_columns=include_columns,
            index_type="btree"
        )

        reason = f"Covering index for WHERE + ORDER BY optimization"

        # Calculate selectivity for main columns
        main_selectivity = 0.5  # Default for covering indexes
        if where_columns:
            selectivities = [
                self.selectivity_estimator.estimate_selectivity(self.tables[table_name][col])
                for col in where_columns[:2]  # Consider first 2 columns
            ]
            main_selectivity = max(selectivities)

        sql_parts = [f"CREATE INDEX {index_name} ON {table_name} ({', '.join(index_columns)})"]
        if include_columns:
            sql_parts.append(f" INCLUDE ({', '.join(include_columns)})")
        sql_statement = ''.join(sql_parts) + ";"

        return IndexRecommendation(
            recommendation_id=self._generate_recommendation_id(table_name, index_columns, "covering"),
            table=table_name,
            recommended_index=index,
            reason=reason,
            query_patterns_helped=[pattern.query_id],
            estimated_benefit="High (eliminates table lookups for SELECT)",
            estimated_overhead=f"High (covering index with {len(index_columns)} columns)",
            priority=self._calculate_priority(main_selectivity, pattern.frequency, len(index_columns)),
            sql_statement=sql_statement,
            selectivity_analysis={
                "main_columns_selectivity": main_selectivity,
                "covering_benefit": "Eliminates table lookup for SELECT queries"
            }
        )

    def _create_join_index_recommendation(
        self,
        table_name: str,
        column: str,
        pattern: QueryPattern,
        join_condition: Dict[str, Any]
    ) -> Optional[IndexRecommendation]:
        """Create recommendation for JOIN optimization index."""
        column_obj = self.tables[table_name][column]
        selectivity = self.selectivity_estimator.estimate_selectivity(column_obj)

        index_name = f"idx_{table_name}_{column}_join"
        index = Index(
            name=index_name,
            table=table_name,
            columns=[column],
            index_type="btree"
        )

        foreign_table = join_condition.get('foreign_table', 'unknown')
        reason = f"Optimize JOIN with {foreign_table} table on {column}"

        return IndexRecommendation(
            recommendation_id=self._generate_recommendation_id(table_name, [column], "join"),
            table=table_name,
            recommended_index=index,
            reason=reason,
            query_patterns_helped=[pattern.query_id],
            estimated_benefit=self._estimate_join_benefit(pattern.frequency),
            estimated_overhead="Low (single column for JOIN)",
            priority=2,  # JOINs are generally high priority
            sql_statement=f"CREATE INDEX {index_name} ON {table_name} ({column});",
            selectivity_analysis={
                "column_selectivity": selectivity,
                "join_optimization": True
            }
        )

    def _generate_recommendation_id(self, table: str, columns: List[str], suffix: str = "") -> str:
        """Generate unique recommendation ID."""
        content = f"{table}_{'_'.join(sorted(columns))}_{suffix}"
        return hashlib.md5(content.encode()).hexdigest()[:8]

    def _estimate_benefit(self, selectivity: float, frequency: int) -> str:
        """Estimate performance benefit of index."""
        if selectivity > 0.8 and frequency > 50:
            return "Very High"
        elif selectivity > 0.6 and frequency > 20:
            return "High"
        elif selectivity > 0.4 or frequency > 10:
            return "Medium"
        else:
            return "Low"

    def _estimate_join_benefit(self, frequency: int) -> str:
        """Estimate benefit for JOIN indexes."""
        if frequency > 50:
            return "Very High (frequent JOINs)"
        elif frequency > 20:
            return "High (regular JOINs)"
        elif frequency > 5:
            return "Medium (occasional JOINs)"
        else:
            return "Low (rare JOINs)"

    def _calculate_priority(self, selectivity: float, frequency: int, column_count: int) -> int:
        """Calculate priority score (1 = highest priority)."""
        # Base score calculation
        score = 0

        # Selectivity contribution (0-50 points)
        score += int(selectivity * 50)

        # Frequency contribution (0-30 points)
        score += min(frequency, 30)

        # Penalty for complex indexes (subtract points)
        score -= (column_count - 1) * 5

        # Convert to priority levels
        if score >= 70:
            return 1  # Highest
        elif score >= 50:
            return 2  # High
        elif score >= 30:
            return 3  # Medium
        else:
            return 4  # Low

    def _deduplicate_recommendations(self, recommendations: List[IndexRecommendation]) -> List[IndexRecommendation]:
        """Remove duplicate recommendations."""
        seen_indexes = set()
        unique_recommendations = []

        for rec in recommendations:
            index_signature = (rec.table, tuple(rec.recommended_index.columns))
            if index_signature not in seen_indexes:
                seen_indexes.add(index_signature)
                unique_recommendations.append(rec)
            else:
                # Merge query patterns helped
                for existing_rec in unique_recommendations:
                    if (existing_rec.table == rec.table and
                            existing_rec.recommended_index.columns == rec.recommended_index.columns):
                        existing_rec.query_patterns_helped.extend(rec.query_patterns_helped)
                        break

        return unique_recommendations

    def _prioritize_recommendations(self, recommendations: List[IndexRecommendation]) -> List[IndexRecommendation]:
        """Sort recommendations by priority."""
        return sorted(recommendations, key=lambda x: (x.priority, -len(x.query_patterns_helped)))

    def analyze_redundant_indexes(self) -> List[RedundancyIssue]:
        """Identify redundant, overlapping, and potentially unused indexes."""
        redundancy_issues = []

        for table_name, indexes in self.existing_indexes.items():
            if len(indexes) < 2:
                continue

            # Find duplicate indexes
            duplicates = self._find_duplicate_indexes(table_name, indexes)
            redundancy_issues.extend(duplicates)

            # Find overlapping indexes
            overlapping = self._find_overlapping_indexes(table_name, indexes)
            redundancy_issues.extend(overlapping)

            # Find potentially unused indexes
            unused = self._find_unused_indexes(table_name, indexes)
            redundancy_issues.extend(unused)

        return redundancy_issues

    def _find_duplicate_indexes(self, table_name: str, indexes: List[Index]) -> List[RedundancyIssue]:
        """Find exactly duplicate indexes."""
        issues = []
        seen_signatures = {}

        for index in indexes:
            signature = (tuple(index.columns), index.unique, index.partial_condition)
            if signature in seen_signatures:
                existing_index = seen_signatures[signature]
                issues.append(RedundancyIssue(
                    issue_type="DUPLICATE",
                    affected_indexes=[existing_index.name, index.name],
                    table=table_name,
                    description=f"Indexes '{existing_index.name}' and '{index.name}' are identical",
                    recommendation=f"Drop one of the duplicate indexes",
                    sql_statements=[f"DROP INDEX {index.name};"]
                ))
            else:
                seen_signatures[signature] = index

        return issues

    def _find_overlapping_indexes(self, table_name: str, indexes: List[Index]) -> List[RedundancyIssue]:
        """Find overlapping indexes that might be redundant."""
        issues = []

        for i, index1 in enumerate(indexes):
            for index2 in indexes[i+1:]:
                overlap_ratio = self._calculate_overlap_ratio(index1, index2)

                if overlap_ratio >= self.redundancy_overlap_threshold:
                    # Determine which index to keep
                    if len(index1.columns) <= len(index2.columns):
                        redundant_index = index1
                        keep_index = index2
                    else:
                        redundant_index = index2
                        keep_index = index1

                    issues.append(RedundancyIssue(
                        issue_type="OVERLAPPING",
                        affected_indexes=[index1.name, index2.name],
                        table=table_name,
                        description=f"Index '{redundant_index.name}' overlaps {int(overlap_ratio * 100)}% "
                                    f"with '{keep_index.name}'",
                        recommendation=f"Consider dropping '{redundant_index.name}' as it's largely "
                                       f"covered by '{keep_index.name}'",
                        sql_statements=[f"DROP INDEX {redundant_index.name};"]
                    ))

        return issues

    def _calculate_overlap_ratio(self, index1: Index, index2: Index) -> float:
        """Calculate overlap ratio between two indexes."""
        cols1 = set(index1.columns)
        cols2 = set(index2.columns)

        if not cols1 or not cols2:
            return 0.0

        intersection = len(cols1.intersection(cols2))
        union = len(cols1.union(cols2))

        return intersection / union if union > 0 else 0.0

    def _find_unused_indexes(self, table_name: str, indexes: List[Index]) -> List[RedundancyIssue]:
        """Find potentially unused indexes based on query patterns."""
        issues = []

        # Collect all columns used in query patterns for this table
        used_columns = set()
        table_patterns = [p for p in self.query_patterns if p.table == table_name]

        for pattern in table_patterns:
            # Add WHERE condition columns
            for condition in pattern.where_conditions:
                if condition.get('column'):
                    used_columns.add(condition['column'])

            # Add JOIN columns
            for join in pattern.join_conditions:
                if join.get('local_column'):
                    used_columns.add(join['local_column'])

            # Add ORDER BY columns
            for order in pattern.order_by:
                if order.get('column'):
                    used_columns.add(order['column'])

            # Add GROUP BY columns
            used_columns.update(pattern.group_by)

        if not used_columns:
            return issues  # Can't determine usage without query patterns

        for index in indexes:
            index_columns = set(index.columns)
            if not index_columns.intersection(used_columns):
                issues.append(RedundancyIssue(
                    issue_type="UNUSED",
                    affected_indexes=[index.name],
                    table=table_name,
                    description=f"Index '{index.name}' columns {index.columns} are not used in any query patterns",
                    recommendation="Consider dropping this index if it's truly unused (verify with query logs)",
                    sql_statements=[f"-- Review usage before dropping\n-- DROP INDEX {index.name};"]
                ))

        return issues

    def estimate_index_sizes(self) -> Dict[str, Dict[str, Any]]:
        """Estimate storage requirements for recommended indexes."""
        size_estimates = {}

        # This is a simplified estimation - in practice, would need actual table statistics
        for table_name in self.tables:
            size_estimates[table_name] = {
                "estimated_table_rows": 10000,  # Default estimate
                "existing_indexes_size_mb": len(self.existing_indexes.get(table_name, [])) * 5,  # Rough estimate
                "index_overhead_per_column_mb": 2  # Rough estimate per column
            }

        return size_estimates

    def generate_analysis_report(self) -> Dict[str, Any]:
        """Generate comprehensive analysis report."""
        recommendations = self.analyze_missing_indexes()
        redundancy_issues = self.analyze_redundant_indexes()
        size_estimates = self.estimate_index_sizes()

        # Calculate statistics
        total_existing_indexes = sum(len(indexes) for indexes in self.existing_indexes.values())
        tables_analyzed = len(self.tables)
        query_patterns_analyzed = len(self.query_patterns)

        # Categorize recommendations by priority
        high_priority = [r for r in recommendations if r.priority <= 2]
        medium_priority = [r for r in recommendations if r.priority == 3]
        low_priority = [r for r in recommendations if r.priority >= 4]

        return {
            "analysis_summary": {
                "tables_analyzed": tables_analyzed,
                "query_patterns_analyzed": query_patterns_analyzed,
                "existing_indexes": total_existing_indexes,
                "total_recommendations": len(recommendations),
                "high_priority_recommendations": len(high_priority),
                "redundancy_issues_found": len(redundancy_issues)
            },
            "index_recommendations": {
                "high_priority": [asdict(r) for r in high_priority],
                "medium_priority": [asdict(r) for r in medium_priority],
                "low_priority": [asdict(r) for r in low_priority]
            },
            "redundancy_analysis": [asdict(issue) for issue in redundancy_issues],
            "size_estimates": size_estimates,
            "sql_statements": {
                "create_indexes": [rec.sql_statement for rec in recommendations],
                "drop_redundant": [
                    stmt for issue in redundancy_issues
                    for stmt in issue.sql_statements
                ]
            },
            "performance_impact": self._generate_performance_impact_analysis(recommendations)
        }

    def _generate_performance_impact_analysis(self, recommendations: List[IndexRecommendation]) -> Dict[str, Any]:
        """Generate performance impact analysis."""
        impact_analysis = {
            "query_optimization": {},
            "write_overhead": {},
            "storage_impact": {}
        }

        # Analyze query optimization impact
        query_benefits = defaultdict(list)
        for rec in recommendations:
            for query_id in rec.query_patterns_helped:
                query_benefits[query_id].append(rec.estimated_benefit)

        impact_analysis["query_optimization"] = {
            "queries_improved": len(query_benefits),
            "high_impact_queries": len([q for q, benefits in query_benefits.items()
                                        if any("High" in benefit for benefit in benefits)]),
            "benefit_distribution": dict(Counter(
                rec.estimated_benefit for rec in recommendations
            ))
        }

        # Analyze write overhead
        impact_analysis["write_overhead"] = {
            "total_new_indexes": len(recommendations),
            "estimated_insert_overhead": f"{len(recommendations) * 5}%",  # Rough estimate
            "tables_most_affected": list(Counter(rec.table for rec in recommendations).most_common(3))
        }

        return impact_analysis

    def format_text_report(self, analysis: Dict[str, Any]) -> str:
        """Format analysis as human-readable text report."""
        lines = []
        lines.append("DATABASE INDEX OPTIMIZATION REPORT")
        lines.append("=" * 50)
        lines.append("")

        # Summary
        summary = analysis["analysis_summary"]
        lines.append("ANALYSIS SUMMARY")
        lines.append("-" * 16)
        lines.append(f"Tables Analyzed: {summary['tables_analyzed']}")
        lines.append(f"Query Patterns: {summary['query_patterns_analyzed']}")
        lines.append(f"Existing Indexes: {summary['existing_indexes']}")
        lines.append(f"New Recommendations: {summary['total_recommendations']}")
        lines.append(f"High Priority: {summary['high_priority_recommendations']}")
        lines.append(f"Redundancy Issues: {summary['redundancy_issues_found']}")
        lines.append("")

        # High Priority Recommendations
        high_priority = analysis["index_recommendations"]["high_priority"]
        if high_priority:
            lines.append(f"HIGH PRIORITY RECOMMENDATIONS ({len(high_priority)})")
            lines.append("-" * 35)
            for i, rec in enumerate(high_priority[:10], 1):  # Show top 10
                lines.append(f"{i}. {rec['table']}: {rec['reason']}")
                lines.append(f" Columns: {', '.join(rec['recommended_index']['columns'])}")
                lines.append(f" Benefit: {rec['estimated_benefit']}")
                lines.append(f" SQL: {rec['sql_statement']}")
                lines.append("")

        # Redundancy Issues
        redundancy = analysis["redundancy_analysis"]
        if redundancy:
            lines.append(f"REDUNDANCY ISSUES ({len(redundancy)})")
            lines.append("-" * 20)
            for issue in redundancy[:5]:  # Show first 5
                lines.append(f"• {issue['issue_type']}: {issue['description']}")
                lines.append(f" Recommendation: {issue['recommendation']}")
                if issue['sql_statements']:
                    lines.append(f" SQL: {issue['sql_statements'][0]}")
                lines.append("")

        # Performance Impact
        perf_impact = analysis["performance_impact"]
        lines.append("PERFORMANCE IMPACT ANALYSIS")
        lines.append("-" * 30)
        query_opt = perf_impact["query_optimization"]
        lines.append(f"Queries to be optimized: {query_opt['queries_improved']}")
        lines.append(f"High impact optimizations: {query_opt['high_impact_queries']}")

        write_overhead = perf_impact["write_overhead"]
        lines.append(f"Estimated insert overhead: {write_overhead['estimated_insert_overhead']}")
        lines.append("")

        # SQL Statements Summary
        sql_statements = analysis["sql_statements"]
        create_statements = sql_statements["create_indexes"]
        if create_statements:
            lines.append("RECOMMENDED CREATE INDEX STATEMENTS")
            lines.append("-" * 36)
            for i, stmt in enumerate(create_statements[:10], 1):
                lines.append(f"{i}. {stmt}")

            if len(create_statements) > 10:
                lines.append(f"... 
and {len(create_statements) - 10} more\")\n lines.append(\"\")\n \n return \"\\n\".join(lines)\n\n\ndef main():\n parser = argparse.ArgumentParser(description=\"Optimize database indexes based on schema and query patterns\")\n parser.add_argument(\"--schema\", \"-s\", required=True, help=\"Schema definition JSON file\")\n parser.add_argument(\"--queries\", \"-q\", required=True, help=\"Query patterns JSON file\")\n parser.add_argument(\"--output\", \"-o\", help=\"Output file (default: stdout)\")\n parser.add_argument(\"--format\", \"-f\", choices=[\"json\", \"text\"], default=\"text\", \n help=\"Output format\")\n parser.add_argument(\"--analyze-existing\", \"-e\", action=\"store_true\", \n help=\"Include analysis of existing indexes\")\n parser.add_argument(\"--min-priority\", \"-p\", type=int, default=4, \n help=\"Minimum priority level to include (1=highest, 4=lowest)\")\n \n args = parser.parse_args()\n \n try:\n # Load schema\n with open(args.schema, 'r') as f:\n schema_data = json.load(f)\n \n # Load queries\n with open(args.queries, 'r') as f:\n query_data = json.load(f)\n \n # Initialize optimizer\n optimizer = IndexOptimizer()\n optimizer.load_schema(schema_data)\n optimizer.load_query_patterns(query_data)\n \n # Generate analysis\n analysis = optimizer.generate_analysis_report()\n \n # Filter by priority if specified\n if args.min_priority \u003c 4:\n for priority_level in [\"high_priority\", \"medium_priority\", \"low_priority\"]:\n analysis[\"index_recommendations\"][priority_level] = [\n rec for rec in analysis[\"index_recommendations\"][priority_level]\n if rec[\"priority\"] \u003c= args.min_priority\n ]\n \n # Format output\n if args.format == \"json\":\n output = json.dumps(analysis, indent=2)\n else:\n output = optimizer.format_text_report(analysis)\n \n # Write output\n if args.output:\n with open(args.output, 'w') as f:\n f.write(output)\n else:\n print(output)\n \n return 0\n \n except Exception as e:\n print(f\"Error: {e}\", file=sys.stderr)\n 
return 1\n\n\nif __name__ == \"__main__\":\n sys.exit(main())","content_type":"text/x-python; charset=utf-8","language":"python","size":38896,"content_sha256":"0c73e6724d02eae6d48a9dd2a626e06e7d551a3fcae3ab5f13a044384b2db485"},{"filename":"migration_generator.py","content":"#!/usr/bin/env python3\n\"\"\"\nDatabase Migration Generator\n\nGenerates safe migration scripts between schema versions:\n- Compares current and target schemas\n- Generates ALTER TABLE statements for schema changes\n- Implements zero-downtime migration strategies (expand-contract pattern)\n- Creates rollback scripts for all changes\n- Generates validation queries to verify migrations\n- Handles complex changes like table splits/merges\n\nInput: Current schema JSON + Target schema JSON\nOutput: Migration SQL + Rollback SQL + Validation queries + Execution plan\n\nUsage:\n python migration_generator.py --current current_schema.json --target target_schema.json --output migration.sql\n python migration_generator.py --current current.json --target target.json --format json\n python migration_generator.py --current current.json --target target.json --zero-downtime\n python migration_generator.py --current current.json --target target.json --validate-only\n\"\"\"\n\nimport argparse\nimport json\nimport re\nimport sys\nfrom collections import defaultdict, OrderedDict\nfrom typing import Dict, List, Set, Tuple, Optional, Any, Union\nfrom dataclasses import dataclass, asdict\nfrom datetime import datetime\nimport hashlib\n\n\n@dataclass\nclass Column:\n name: str\n data_type: str\n nullable: bool = True\n primary_key: bool = False\n unique: bool = False\n foreign_key: Optional[str] = None\n default_value: Optional[str] = None\n check_constraint: Optional[str] = None\n \n\n@dataclass\nclass Table:\n name: str\n columns: Dict[str, Column]\n primary_key: List[str]\n foreign_keys: Dict[str, str] # column -> referenced_table.column\n unique_constraints: List[List[str]]\n check_constraints: Dict[str, str]\n 
indexes: List[Dict[str, Any]]\n\n\n@dataclass\nclass MigrationStep:\n step_id: str\n step_type: str\n table: str\n description: str\n sql_forward: str\n sql_rollback: str\n validation_sql: Optional[str] = None\n dependencies: Optional[List[str]] = None\n risk_level: str = \"LOW\" # LOW, MEDIUM, HIGH\n estimated_time: Optional[str] = None\n zero_downtime_phase: Optional[str] = None # EXPAND, CONTRACT, or None\n\n\n@dataclass\nclass MigrationPlan:\n migration_id: str\n created_at: str\n source_schema_hash: str\n target_schema_hash: str\n steps: List[MigrationStep]\n summary: Dict[str, Any]\n execution_order: List[str]\n rollback_order: List[str]\n\n\n@dataclass\nclass ValidationCheck:\n check_id: str\n check_type: str\n table: str\n description: str\n sql_query: str\n expected_result: Any\n critical: bool = True\n\n\nclass SchemaComparator:\n \"\"\"Compares two schema versions and identifies differences.\"\"\"\n \n def __init__(self):\n self.current_schema: Dict[str, Table] = {}\n self.target_schema: Dict[str, Table] = {}\n self.changes: Dict[str, List[Dict[str, Any]]] = {\n 'tables_added': [],\n 'tables_dropped': [],\n 'tables_renamed': [],\n 'columns_added': [],\n 'columns_dropped': [],\n 'columns_modified': [],\n 'columns_renamed': [],\n 'constraints_added': [],\n 'constraints_dropped': [],\n 'indexes_added': [],\n 'indexes_dropped': []\n }\n \n def load_schemas(self, current_data: Dict[str, Any], target_data: Dict[str, Any]):\n \"\"\"Load current and target schemas.\"\"\"\n self.current_schema = self._parse_schema(current_data)\n self.target_schema = self._parse_schema(target_data)\n \n def _parse_schema(self, schema_data: Dict[str, Any]) -> Dict[str, Table]:\n \"\"\"Parse schema JSON into Table objects.\"\"\"\n tables = {}\n \n if 'tables' not in schema_data:\n return tables\n \n for table_name, table_def in schema_data['tables'].items():\n columns = {}\n primary_key = table_def.get('primary_key', [])\n foreign_keys = {}\n \n # Parse columns\n for col_name, col_def in 
table_def.get('columns', {}).items():\n column = Column(\n name=col_name,\n data_type=col_def.get('type', 'VARCHAR(255)'),\n nullable=col_def.get('nullable', True),\n primary_key=col_name in primary_key,\n unique=col_def.get('unique', False),\n foreign_key=col_def.get('foreign_key'),\n default_value=col_def.get('default'),\n check_constraint=col_def.get('check_constraint')\n )\n columns[col_name] = column\n \n if column.foreign_key:\n foreign_keys[col_name] = column.foreign_key\n \n table = Table(\n name=table_name,\n columns=columns,\n primary_key=primary_key,\n foreign_keys=foreign_keys,\n unique_constraints=table_def.get('unique_constraints', []),\n check_constraints=table_def.get('check_constraints', {}),\n indexes=table_def.get('indexes', [])\n )\n tables[table_name] = table\n \n return tables\n \n def compare_schemas(self) -> Dict[str, List[Dict[str, Any]]]:\n \"\"\"Compare schemas and identify all changes.\"\"\"\n self._compare_tables()\n self._compare_columns()\n self._compare_constraints()\n self._compare_indexes()\n return self.changes\n \n def _compare_tables(self):\n \"\"\"Compare table-level changes.\"\"\"\n current_tables = set(self.current_schema.keys())\n target_tables = set(self.target_schema.keys())\n \n # Tables added\n for table_name in target_tables - current_tables:\n self.changes['tables_added'].append({\n 'table': table_name,\n 'definition': self.target_schema[table_name]\n })\n \n # Tables dropped\n for table_name in current_tables - target_tables:\n self.changes['tables_dropped'].append({\n 'table': table_name,\n 'definition': self.current_schema[table_name]\n })\n \n # Tables renamed (heuristic based on column similarity)\n self._detect_renamed_tables(current_tables - target_tables, target_tables - current_tables)\n \n def _detect_renamed_tables(self, dropped_tables: Set[str], added_tables: Set[str]):\n \"\"\"Detect renamed tables based on column similarity.\"\"\"\n if not dropped_tables or not added_tables:\n return\n \n # Calculate 
similarity scores\n similarity_scores = []\n for dropped_table in dropped_tables:\n for added_table in added_tables:\n score = self._calculate_table_similarity(dropped_table, added_table)\n if score > 0.7: # High similarity threshold\n similarity_scores.append((score, dropped_table, added_table))\n \n # Sort by similarity and identify renames\n similarity_scores.sort(reverse=True)\n used_tables = set()\n \n for score, old_name, new_name in similarity_scores:\n if old_name not in used_tables and new_name not in used_tables:\n self.changes['tables_renamed'].append({\n 'old_name': old_name,\n 'new_name': new_name,\n 'similarity_score': score\n })\n used_tables.add(old_name)\n used_tables.add(new_name)\n \n # Remove from added/dropped lists\n self.changes['tables_added'] = [t for t in self.changes['tables_added'] if t['table'] != new_name]\n self.changes['tables_dropped'] = [t for t in self.changes['tables_dropped'] if t['table'] != old_name]\n \n def _calculate_table_similarity(self, table1_name: str, table2_name: str) -> float:\n \"\"\"Calculate similarity between two tables based on columns.\"\"\"\n table1 = self.current_schema[table1_name]\n table2 = self.target_schema[table2_name]\n \n cols1 = set(table1.columns.keys())\n cols2 = set(table2.columns.keys())\n \n if not cols1 and not cols2:\n return 1.0\n elif not cols1 or not cols2:\n return 0.0\n \n intersection = len(cols1.intersection(cols2))\n union = len(cols1.union(cols2))\n \n return intersection / union\n \n def _compare_columns(self):\n \"\"\"Compare column-level changes.\"\"\"\n common_tables = set(self.current_schema.keys()).intersection(set(self.target_schema.keys()))\n \n for table_name in common_tables:\n current_table = self.current_schema[table_name]\n target_table = self.target_schema[table_name]\n \n current_columns = set(current_table.columns.keys())\n target_columns = set(target_table.columns.keys())\n \n # Columns added\n for col_name in target_columns - current_columns:\n 
self.changes['columns_added'].append({\n 'table': table_name,\n 'column': col_name,\n 'definition': target_table.columns[col_name]\n })\n \n # Columns dropped\n for col_name in current_columns - target_columns:\n self.changes['columns_dropped'].append({\n 'table': table_name,\n 'column': col_name,\n 'definition': current_table.columns[col_name]\n })\n \n # Columns modified\n for col_name in current_columns.intersection(target_columns):\n current_col = current_table.columns[col_name]\n target_col = target_table.columns[col_name]\n \n if self._columns_different(current_col, target_col):\n self.changes['columns_modified'].append({\n 'table': table_name,\n 'column': col_name,\n 'current_definition': current_col,\n 'target_definition': target_col,\n 'changes': self._describe_column_changes(current_col, target_col)\n })\n \n def _columns_different(self, col1: Column, col2: Column) -> bool:\n \"\"\"Check if two columns have different definitions.\"\"\"\n return (col1.data_type != col2.data_type or\n col1.nullable != col2.nullable or\n col1.default_value != col2.default_value or\n col1.unique != col2.unique or\n col1.foreign_key != col2.foreign_key or\n col1.check_constraint != col2.check_constraint)\n \n def _describe_column_changes(self, current_col: Column, target_col: Column) -> List[str]:\n \"\"\"Describe specific changes between column definitions.\"\"\"\n changes = []\n \n if current_col.data_type != target_col.data_type:\n changes.append(f\"type: {current_col.data_type} -> {target_col.data_type}\")\n \n if current_col.nullable != target_col.nullable:\n changes.append(f\"nullable: {current_col.nullable} -> {target_col.nullable}\")\n \n if current_col.default_value != target_col.default_value:\n changes.append(f\"default: {current_col.default_value} -> {target_col.default_value}\")\n \n if current_col.unique != target_col.unique:\n changes.append(f\"unique: {current_col.unique} -> {target_col.unique}\")\n \n if current_col.foreign_key != target_col.foreign_key:\n 
changes.append(f\"foreign_key: {current_col.foreign_key} -> {target_col.foreign_key}\")\n \n if current_col.check_constraint != target_col.check_constraint:\n changes.append(f\"check_constraint: {current_col.check_constraint} -> {target_col.check_constraint}\")\n \n return changes\n \n def _compare_constraints(self):\n \"\"\"Compare constraint changes.\"\"\"\n common_tables = set(self.current_schema.keys()).intersection(set(self.target_schema.keys()))\n \n for table_name in common_tables:\n current_table = self.current_schema[table_name]\n target_table = self.target_schema[table_name]\n \n # Compare primary keys\n if current_table.primary_key != target_table.primary_key:\n if current_table.primary_key:\n self.changes['constraints_dropped'].append({\n 'table': table_name,\n 'constraint_type': 'PRIMARY_KEY',\n 'columns': current_table.primary_key\n })\n \n if target_table.primary_key:\n self.changes['constraints_added'].append({\n 'table': table_name,\n 'constraint_type': 'PRIMARY_KEY',\n 'columns': target_table.primary_key\n })\n \n # Compare unique constraints\n current_unique = set(tuple(uc) for uc in current_table.unique_constraints)\n target_unique = set(tuple(uc) for uc in target_table.unique_constraints)\n \n for constraint in target_unique - current_unique:\n self.changes['constraints_added'].append({\n 'table': table_name,\n 'constraint_type': 'UNIQUE',\n 'columns': list(constraint)\n })\n \n for constraint in current_unique - target_unique:\n self.changes['constraints_dropped'].append({\n 'table': table_name,\n 'constraint_type': 'UNIQUE',\n 'columns': list(constraint)\n })\n \n # Compare check constraints\n current_checks = set(current_table.check_constraints.items())\n target_checks = set(target_table.check_constraints.items())\n \n for name, condition in target_checks - current_checks:\n self.changes['constraints_added'].append({\n 'table': table_name,\n 'constraint_type': 'CHECK',\n 'constraint_name': name,\n 'condition': condition\n })\n \n for name, condition in current_checks - target_checks:\n self.changes['constraints_dropped'].append({\n 'table': table_name,\n 'constraint_type': 'CHECK',\n 'constraint_name': name,\n 
'condition': condition\n })\n \n def _compare_indexes(self):\n \"\"\"Compare index changes.\"\"\"\n common_tables = set(self.current_schema.keys()).intersection(set(self.target_schema.keys()))\n \n for table_name in common_tables:\n current_indexes = {idx['name']: idx for idx in self.current_schema[table_name].indexes}\n target_indexes = {idx['name']: idx for idx in self.target_schema[table_name].indexes}\n \n current_names = set(current_indexes.keys())\n target_names = set(target_indexes.keys())\n \n # Indexes added\n for idx_name in target_names - current_names:\n self.changes['indexes_added'].append({\n 'table': table_name,\n 'index': target_indexes[idx_name]\n })\n \n # Indexes dropped\n for idx_name in current_names - target_names:\n self.changes['indexes_dropped'].append({\n 'table': table_name,\n 'index': current_indexes[idx_name]\n })\n\n\nclass MigrationGenerator:\n \"\"\"Generates migration steps from schema differences.\"\"\"\n \n def __init__(self, zero_downtime: bool = False):\n self.zero_downtime = zero_downtime\n self.migration_steps: List[MigrationStep] = []\n self.step_counter = 0\n \n # Data type conversion safety\n self.safe_type_conversions = {\n ('VARCHAR(50)', 'VARCHAR(100)'): True, # Expanding varchar\n ('INT', 'BIGINT'): True, # Expanding integer\n ('DECIMAL(10,2)', 'DECIMAL(12,2)'): True, # Expanding decimal precision\n }\n \n self.risky_type_conversions = {\n ('VARCHAR(100)', 'VARCHAR(50)'): 'Data truncation possible',\n ('BIGINT', 'INT'): 'Data loss possible for large values',\n ('TEXT', 'VARCHAR(255)'): 'Data truncation possible'\n }\n \n def generate_migration(self, changes: Dict[str, List[Dict[str, Any]]]) -> MigrationPlan:\n \"\"\"Generate complete migration plan from schema changes.\"\"\"\n self.migration_steps = []\n self.step_counter = 0\n \n # Generate steps in dependency order\n self._generate_table_creation_steps(changes['tables_added'])\n self._generate_column_addition_steps(changes['columns_added'])\n 
self._generate_constraint_addition_steps(changes['constraints_added'])\n self._generate_index_addition_steps(changes['indexes_added'])\n self._generate_column_modification_steps(changes['columns_modified'])\n self._generate_table_rename_steps(changes['tables_renamed'])\n self._generate_index_removal_steps(changes['indexes_dropped'])\n self._generate_constraint_removal_steps(changes['constraints_dropped'])\n self._generate_column_removal_steps(changes['columns_dropped'])\n self._generate_table_removal_steps(changes['tables_dropped'])\n \n # Create migration plan\n migration_id = self._generate_migration_id(changes)\n execution_order = [step.step_id for step in self.migration_steps]\n rollback_order = list(reversed(execution_order))\n \n return MigrationPlan(\n migration_id=migration_id,\n created_at=datetime.now().isoformat(),\n source_schema_hash=self._calculate_changes_hash(changes),\n target_schema_hash=\"\", # Would be calculated from target schema\n steps=self.migration_steps,\n summary=self._generate_summary(changes),\n execution_order=execution_order,\n rollback_order=rollback_order\n )\n \n def _generate_step_id(self) -> str:\n \"\"\"Generate unique step ID.\"\"\"\n self.step_counter += 1\n return f\"step_{self.step_counter:03d}\"\n \n def _generate_table_creation_steps(self, tables_added: List[Dict[str, Any]]):\n \"\"\"Generate steps for creating new tables.\"\"\"\n for table_info in tables_added:\n table = table_info['definition']\n step = self._create_table_step(table)\n self.migration_steps.append(step)\n \n def _create_table_step(self, table: Table) -> MigrationStep:\n \"\"\"Create migration step for table creation.\"\"\"\n columns_sql = []\n \n for col_name, column in table.columns.items():\n col_sql = f\"{col_name} {column.data_type}\"\n \n if not column.nullable:\n col_sql += \" NOT NULL\"\n \n if column.default_value:\n col_sql += f\" DEFAULT {column.default_value}\"\n \n if column.unique:\n col_sql += \" UNIQUE\"\n \n columns_sql.append(col_sql)\n 
\n # Add primary key\n if table.primary_key:\n pk_sql = f\"PRIMARY KEY ({', '.join(table.primary_key)})\"\n columns_sql.append(pk_sql)\n \n # Add foreign keys\n for col_name, ref in table.foreign_keys.items():\n fk_sql = f\"FOREIGN KEY ({col_name}) REFERENCES {ref}\"\n columns_sql.append(fk_sql)\n \n create_sql = f\"CREATE TABLE {table.name} (\\n \" + \",\\n \".join(columns_sql) + \"\\n);\"\n drop_sql = f\"DROP TABLE IF EXISTS {table.name};\"\n \n return MigrationStep(\n step_id=self._generate_step_id(),\n step_type=\"CREATE_TABLE\",\n table=table.name,\n description=f\"Create table {table.name} with {len(table.columns)} columns\",\n sql_forward=create_sql,\n sql_rollback=drop_sql,\n validation_sql=f\"SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{table.name}';\",\n risk_level=\"LOW\"\n )\n \n def _generate_column_addition_steps(self, columns_added: List[Dict[str, Any]]):\n \"\"\"Generate steps for adding columns.\"\"\"\n for col_info in columns_added:\n if self.zero_downtime:\n # For zero-downtime, add columns as nullable first\n step = self._add_column_zero_downtime_step(col_info)\n else:\n step = self._add_column_step(col_info)\n self.migration_steps.append(step)\n \n def _add_column_step(self, col_info: Dict[str, Any]) -> MigrationStep:\n \"\"\"Create step for adding a column.\"\"\"\n table = col_info['table']\n column = col_info['definition']\n \n col_sql = f\"{column.name} {column.data_type}\"\n \n if not column.nullable:\n if column.default_value:\n col_sql += f\" DEFAULT {column.default_value} NOT NULL\"\n else:\n # This is risky - adding NOT NULL without default\n col_sql += \" NOT NULL\"\n elif column.default_value:\n col_sql += f\" DEFAULT {column.default_value}\"\n \n add_sql = f\"ALTER TABLE {table} ADD COLUMN {col_sql};\"\n drop_sql = f\"ALTER TABLE {table} DROP COLUMN {column.name};\"\n \n risk_level = \"HIGH\" if not column.nullable and not column.default_value else \"LOW\"\n \n return MigrationStep(\n 
step_id=self._generate_step_id(),\n step_type=\"ADD_COLUMN\",\n table=table,\n description=f\"Add column {column.name} to {table}\",\n sql_forward=add_sql,\n sql_rollback=drop_sql,\n validation_sql=f\"SELECT COUNT(*) FROM information_schema.columns WHERE table_name = '{table}' AND column_name = '{column.name}';\",\n risk_level=risk_level\n )\n \n def _add_column_zero_downtime_step(self, col_info: Dict[str, Any]) -> MigrationStep:\n \"\"\"Create zero-downtime step for adding column.\"\"\"\n table = col_info['table']\n column = col_info['definition']\n \n # Phase 1: Add as nullable with default if needed\n col_sql = f\"{column.name} {column.data_type}\"\n if column.default_value:\n col_sql += f\" DEFAULT {column.default_value}\"\n \n add_sql = f\"ALTER TABLE {table} ADD COLUMN {col_sql};\"\n \n # If column should be NOT NULL, handle in separate phase\n if not column.nullable:\n # Add comment about needing follow-up step\n add_sql += f\"\\n-- Follow-up needed: Add NOT NULL constraint after data population\"\n \n drop_sql = f\"ALTER TABLE {table} DROP COLUMN {column.name};\"\n \n return MigrationStep(\n step_id=self._generate_step_id(),\n step_type=\"ADD_COLUMN_ZD\",\n table=table,\n description=f\"Add column {column.name} to {table} (zero-downtime phase 1)\",\n sql_forward=add_sql,\n sql_rollback=drop_sql,\n validation_sql=f\"SELECT COUNT(*) FROM information_schema.columns WHERE table_name = '{table}' AND column_name = '{column.name}';\",\n risk_level=\"LOW\",\n zero_downtime_phase=\"EXPAND\"\n )\n \n def _generate_column_modification_steps(self, columns_modified: List[Dict[str, Any]]):\n \"\"\"Generate steps for modifying columns.\"\"\"\n for col_info in columns_modified:\n if self.zero_downtime:\n steps = self._modify_column_zero_downtime_steps(col_info)\n self.migration_steps.extend(steps)\n else:\n step = self._modify_column_step(col_info)\n self.migration_steps.append(step)\n \n def _modify_column_step(self, col_info: Dict[str, Any]) -> MigrationStep:\n 
\"\"\"Create step for modifying a column.\"\"\"\n table = col_info['table']\n column = col_info['column']\n current_def = col_info['current_definition']\n target_def = col_info['target_definition']\n changes = col_info['changes']\n \n alter_statements = []\n rollback_statements = []\n \n # Handle different types of changes\n if current_def.data_type != target_def.data_type:\n alter_statements.append(f\"ALTER COLUMN {column} TYPE {target_def.data_type}\")\n rollback_statements.append(f\"ALTER COLUMN {column} TYPE {current_def.data_type}\")\n \n if current_def.nullable != target_def.nullable:\n if target_def.nullable:\n alter_statements.append(f\"ALTER COLUMN {column} DROP NOT NULL\")\n rollback_statements.append(f\"ALTER COLUMN {column} SET NOT NULL\")\n else:\n alter_statements.append(f\"ALTER COLUMN {column} SET NOT NULL\")\n rollback_statements.append(f\"ALTER COLUMN {column} DROP NOT NULL\")\n \n if current_def.default_value != target_def.default_value:\n if target_def.default_value:\n alter_statements.append(f\"ALTER COLUMN {column} SET DEFAULT {target_def.default_value}\")\n else:\n alter_statements.append(f\"ALTER COLUMN {column} DROP DEFAULT\")\n \n if current_def.default_value:\n rollback_statements.append(f\"ALTER COLUMN {column} SET DEFAULT {current_def.default_value}\")\n else:\n rollback_statements.append(f\"ALTER COLUMN {column} DROP DEFAULT\")\n \n # Build SQL\n alter_sql = f\"ALTER TABLE {table}\\n \" + \",\\n \".join(alter_statements) + \";\"\n rollback_sql = f\"ALTER TABLE {table}\\n \" + \",\\n \".join(rollback_statements) + \";\"\n \n # Assess risk\n risk_level = self._assess_column_modification_risk(current_def, target_def)\n \n return MigrationStep(\n step_id=self._generate_step_id(),\n step_type=\"MODIFY_COLUMN\",\n table=table,\n description=f\"Modify column {column}: {', '.join(changes)}\",\n sql_forward=alter_sql,\n sql_rollback=rollback_sql,\n validation_sql=f\"SELECT data_type, is_nullable FROM information_schema.columns WHERE table_name 
= '{table}' AND column_name = '{column}';\",\n risk_level=risk_level\n )\n \n def _modify_column_zero_downtime_steps(self, col_info: Dict[str, Any]) -> List[MigrationStep]:\n \"\"\"Create zero-downtime steps for column modification.\"\"\"\n table = col_info['table']\n column = col_info['column']\n current_def = col_info['current_definition']\n target_def = col_info['target_definition']\n \n steps = []\n \n # For zero-downtime, use expand-contract pattern\n temp_column = f\"{column}_new\"\n \n # Step 1: Add new column\n step1 = MigrationStep(\n step_id=self._generate_step_id(),\n step_type=\"ADD_TEMP_COLUMN\",\n table=table,\n description=f\"Add temporary column {temp_column} for zero-downtime migration\",\n sql_forward=f\"ALTER TABLE {table} ADD COLUMN {temp_column} {target_def.data_type};\",\n sql_rollback=f\"ALTER TABLE {table} DROP COLUMN {temp_column};\",\n zero_downtime_phase=\"EXPAND\"\n )\n steps.append(step1)\n \n # Step 2: Copy data\n step2 = MigrationStep(\n step_id=self._generate_step_id(),\n step_type=\"COPY_COLUMN_DATA\",\n table=table,\n description=f\"Copy data from {column} to {temp_column}\",\n sql_forward=f\"UPDATE {table} SET {temp_column} = {column};\",\n sql_rollback=f\"UPDATE {table} SET {temp_column} = NULL;\",\n zero_downtime_phase=\"EXPAND\"\n )\n steps.append(step2)\n \n # Step 3: Drop old column\n step3 = MigrationStep(\n step_id=self._generate_step_id(),\n step_type=\"DROP_OLD_COLUMN\",\n table=table,\n description=f\"Drop original column {column}\",\n sql_forward=f\"ALTER TABLE {table} DROP COLUMN {column};\",\n sql_rollback=f\"ALTER TABLE {table} ADD COLUMN {column} {current_def.data_type};\",\n zero_downtime_phase=\"CONTRACT\"\n )\n steps.append(step3)\n \n # Step 4: Rename new column\n step4 = MigrationStep(\n step_id=self._generate_step_id(),\n step_type=\"RENAME_COLUMN\",\n table=table,\n description=f\"Rename {temp_column} to {column}\",\n sql_forward=f\"ALTER TABLE {table} RENAME COLUMN {temp_column} TO {column};\",\n 
sql_rollback=f\"ALTER TABLE {table} RENAME COLUMN {column} TO {temp_column};\",\n zero_downtime_phase=\"CONTRACT\"\n )\n steps.append(step4)\n \n return steps\n \n def _assess_column_modification_risk(self, current: Column, target: Column) -> str:\n \"\"\"Assess risk level of column modification.\"\"\"\n if current.data_type != target.data_type:\n conversion_key = (current.data_type, target.data_type)\n if conversion_key in self.risky_type_conversions:\n return \"HIGH\"\n elif conversion_key not in self.safe_type_conversions:\n return \"MEDIUM\"\n \n if current.nullable and not target.nullable:\n return \"HIGH\" # Adding NOT NULL constraint\n \n return \"LOW\"\n \n def _generate_constraint_addition_steps(self, constraints_added: List[Dict[str, Any]]):\n \"\"\"Generate steps for adding constraints.\"\"\"\n for constraint_info in constraints_added:\n step = self._add_constraint_step(constraint_info)\n if step: # Skip unsupported constraint types, which yield None\n self.migration_steps.append(step)\n \n def _add_constraint_step(self, constraint_info: Dict[str, Any]) -> Optional[MigrationStep]:\n \"\"\"Create step for adding constraint.\"\"\"\n table = constraint_info['table']\n constraint_type = constraint_info['constraint_type']\n \n if constraint_type == 'PRIMARY_KEY':\n columns = constraint_info['columns']\n constraint_name = f\"pk_{table}\"\n add_sql = f\"ALTER TABLE {table} ADD CONSTRAINT {constraint_name} PRIMARY KEY ({', '.join(columns)});\"\n drop_sql = f\"ALTER TABLE {table} DROP CONSTRAINT {constraint_name};\"\n description = f\"Add primary key on {', '.join(columns)}\"\n \n elif constraint_type == 'UNIQUE':\n columns = constraint_info['columns']\n constraint_name = f\"uq_{table}_{'_'.join(columns)}\"\n add_sql = f\"ALTER TABLE {table} ADD CONSTRAINT {constraint_name} UNIQUE ({', '.join(columns)});\"\n drop_sql = f\"ALTER TABLE {table} DROP CONSTRAINT {constraint_name};\"\n description = f\"Add unique constraint on {', '.join(columns)}\"\n \n elif constraint_type == 'CHECK':\n constraint_name = constraint_info['constraint_name']\n 
condition = constraint_info['condition']\n add_sql = f\"ALTER TABLE {table} ADD CONSTRAINT {constraint_name} CHECK ({condition});\"\n drop_sql = f\"ALTER TABLE {table} DROP CONSTRAINT {constraint_name};\"\n description = f\"Add check constraint: {condition}\"\n \n else:\n return None\n \n return MigrationStep(\n step_id=self._generate_step_id(),\n step_type=\"ADD_CONSTRAINT\",\n table=table,\n description=description,\n sql_forward=add_sql,\n sql_rollback=drop_sql,\n risk_level=\"MEDIUM\" # Constraints can fail if data doesn't comply\n )\n \n def _generate_index_addition_steps(self, indexes_added: List[Dict[str, Any]]):\n \"\"\"Generate steps for adding indexes.\"\"\"\n for index_info in indexes_added:\n step = self._add_index_step(index_info)\n self.migration_steps.append(step)\n \n def _add_index_step(self, index_info: Dict[str, Any]) -> MigrationStep:\n \"\"\"Create step for adding index.\"\"\"\n table = index_info['table']\n index = index_info['index']\n \n unique_keyword = \"UNIQUE \" if index.get('unique', False) else \"\"\n columns_sql = ', '.join(index['columns'])\n \n create_sql = f\"CREATE {unique_keyword}INDEX {index['name']} ON {table} ({columns_sql});\"\n drop_sql = f\"DROP INDEX {index['name']};\"\n \n return MigrationStep(\n step_id=self._generate_step_id(),\n step_type=\"ADD_INDEX\",\n table=table,\n description=f\"Create index {index['name']} on ({columns_sql})\",\n sql_forward=create_sql,\n sql_rollback=drop_sql,\n estimated_time=\"1-5 minutes depending on table size\",\n risk_level=\"LOW\"\n )\n \n def _generate_table_rename_steps(self, tables_renamed: List[Dict[str, Any]]):\n \"\"\"Generate steps for renaming tables.\"\"\"\n for rename_info in tables_renamed:\n step = self._rename_table_step(rename_info)\n self.migration_steps.append(step)\n \n def _rename_table_step(self, rename_info: Dict[str, Any]) -> MigrationStep:\n \"\"\"Create step for renaming table.\"\"\"\n old_name = rename_info['old_name']\n new_name = rename_info['new_name']\n \n 
rename_sql = f\"ALTER TABLE {old_name} RENAME TO {new_name};\"\n rollback_sql = f\"ALTER TABLE {new_name} RENAME TO {old_name};\"\n \n return MigrationStep(\n step_id=self._generate_step_id(),\n step_type=\"RENAME_TABLE\",\n table=old_name,\n description=f\"Rename table {old_name} to {new_name}\",\n sql_forward=rename_sql,\n sql_rollback=rollback_sql,\n validation_sql=f\"SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{new_name}';\",\n risk_level=\"LOW\"\n )\n \n def _generate_column_removal_steps(self, columns_dropped: List[Dict[str, Any]]):\n \"\"\"Generate steps for removing columns.\"\"\"\n for col_info in columns_dropped:\n step = self._drop_column_step(col_info)\n self.migration_steps.append(step)\n \n def _drop_column_step(self, col_info: Dict[str, Any]) -> MigrationStep:\n \"\"\"Create step for dropping column.\"\"\"\n table = col_info['table']\n column = col_info['definition']\n \n drop_sql = f\"ALTER TABLE {table} DROP COLUMN {column.name};\"\n \n # Recreate column for rollback\n col_sql = f\"{column.name} {column.data_type}\"\n if not column.nullable:\n col_sql += \" NOT NULL\"\n if column.default_value:\n col_sql += f\" DEFAULT {column.default_value}\"\n \n add_sql = f\"ALTER TABLE {table} ADD COLUMN {col_sql};\"\n \n return MigrationStep(\n step_id=self._generate_step_id(),\n step_type=\"DROP_COLUMN\",\n table=table,\n description=f\"Drop column {column.name} from {table}\",\n sql_forward=drop_sql,\n sql_rollback=add_sql,\n risk_level=\"HIGH\" # Data loss risk\n )\n \n def _generate_constraint_removal_steps(self, constraints_dropped: List[Dict[str, Any]]):\n \"\"\"Generate steps for removing constraints.\"\"\"\n for constraint_info in constraints_dropped:\n step = self._drop_constraint_step(constraint_info)\n if step:\n self.migration_steps.append(step)\n \n def _drop_constraint_step(self, constraint_info: Dict[str, Any]) -> Optional[MigrationStep]:\n \"\"\"Create step for dropping constraint.\"\"\"\n table = 
constraint_info['table']\n constraint_type = constraint_info['constraint_type']\n \n if constraint_type == 'PRIMARY_KEY':\n constraint_name = f\"pk_{table}\"\n drop_sql = f\"ALTER TABLE {table} DROP CONSTRAINT {constraint_name};\"\n columns = constraint_info['columns']\n add_sql = f\"ALTER TABLE {table} ADD CONSTRAINT {constraint_name} PRIMARY KEY ({', '.join(columns)});\"\n description = f\"Drop primary key constraint\"\n \n elif constraint_type == 'UNIQUE':\n columns = constraint_info['columns']\n constraint_name = f\"uq_{table}_{'_'.join(columns)}\"\n drop_sql = f\"ALTER TABLE {table} DROP CONSTRAINT {constraint_name};\"\n add_sql = f\"ALTER TABLE {table} ADD CONSTRAINT {constraint_name} UNIQUE ({', '.join(columns)});\"\n description = f\"Drop unique constraint on {', '.join(columns)}\"\n \n elif constraint_type == 'CHECK':\n constraint_name = constraint_info['constraint_name']\n condition = constraint_info.get('condition', '')\n drop_sql = f\"ALTER TABLE {table} DROP CONSTRAINT {constraint_name};\"\n add_sql = f\"ALTER TABLE {table} ADD CONSTRAINT {constraint_name} CHECK ({condition});\"\n description = f\"Drop check constraint {constraint_name}\"\n \n else:\n return None\n \n return MigrationStep(\n step_id=self._generate_step_id(),\n step_type=\"DROP_CONSTRAINT\",\n table=table,\n description=description,\n sql_forward=drop_sql,\n sql_rollback=add_sql,\n risk_level=\"MEDIUM\"\n )\n \n def _generate_index_removal_steps(self, indexes_dropped: List[Dict[str, Any]]):\n \"\"\"Generate steps for removing indexes.\"\"\"\n for index_info in indexes_dropped:\n step = self._drop_index_step(index_info)\n self.migration_steps.append(step)\n \n def _drop_index_step(self, index_info: Dict[str, Any]) -> MigrationStep:\n \"\"\"Create step for dropping index.\"\"\"\n table = index_info['table']\n index = index_info['index']\n \n drop_sql = f\"DROP INDEX {index['name']};\"\n \n # Recreate for rollback\n unique_keyword = \"UNIQUE \" if index.get('unique', False) else \"\"\n 
columns_sql = ', '.join(index['columns'])\n create_sql = f\"CREATE {unique_keyword}INDEX {index['name']} ON {table} ({columns_sql});\"\n \n return MigrationStep(\n step_id=self._generate_step_id(),\n step_type=\"DROP_INDEX\",\n table=table,\n description=f\"Drop index {index['name']}\",\n sql_forward=drop_sql,\n sql_rollback=create_sql,\n risk_level=\"LOW\"\n )\n \n def _generate_table_removal_steps(self, tables_dropped: List[Dict[str, Any]]):\n \"\"\"Generate steps for removing tables.\"\"\"\n for table_info in tables_dropped:\n step = self._drop_table_step(table_info)\n self.migration_steps.append(step)\n \n def _drop_table_step(self, table_info: Dict[str, Any]) -> MigrationStep:\n \"\"\"Create step for dropping table.\"\"\"\n table = table_info['definition']\n \n drop_sql = f\"DROP TABLE {table.name};\"\n \n # Would need to recreate entire table for rollback\n # This is simplified - full implementation would generate CREATE TABLE statement\n create_sql = f\"-- Recreate table {table.name} (implementation needed)\"\n \n return MigrationStep(\n step_id=self._generate_step_id(),\n step_type=\"DROP_TABLE\",\n table=table.name,\n description=f\"Drop table {table.name}\",\n sql_forward=drop_sql,\n sql_rollback=create_sql,\n risk_level=\"HIGH\" # Data loss risk\n )\n \n def _generate_migration_id(self, changes: Dict[str, List[Dict[str, Any]]]) -> str:\n \"\"\"Generate unique migration ID.\"\"\"\n content = json.dumps(changes, sort_keys=True)\n return hashlib.md5(content.encode()).hexdigest()[:8]\n \n def _calculate_changes_hash(self, changes: Dict[str, List[Dict[str, Any]]]) -> str:\n \"\"\"Calculate hash of changes for versioning.\"\"\"\n content = json.dumps(changes, sort_keys=True)\n return hashlib.md5(content.encode()).hexdigest()\n \n def _generate_summary(self, changes: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]:\n \"\"\"Generate migration summary.\"\"\"\n summary = {\n \"total_steps\": len(self.migration_steps),\n \"changes_summary\": {\n 
\"tables_added\": len(changes['tables_added']),\n \"tables_dropped\": len(changes['tables_dropped']),\n \"tables_renamed\": len(changes['tables_renamed']),\n \"columns_added\": len(changes['columns_added']),\n \"columns_dropped\": len(changes['columns_dropped']),\n \"columns_modified\": len(changes['columns_modified']),\n \"constraints_added\": len(changes['constraints_added']),\n \"constraints_dropped\": len(changes['constraints_dropped']),\n \"indexes_added\": len(changes['indexes_added']),\n \"indexes_dropped\": len(changes['indexes_dropped'])\n },\n \"risk_assessment\": {\n \"high_risk_steps\": len([s for s in self.migration_steps if s.risk_level == \"HIGH\"]),\n \"medium_risk_steps\": len([s for s in self.migration_steps if s.risk_level == \"MEDIUM\"]),\n \"low_risk_steps\": len([s for s in self.migration_steps if s.risk_level == \"LOW\"])\n },\n \"zero_downtime\": self.zero_downtime\n }\n \n return summary\n\n\nclass ValidationGenerator:\n \"\"\"Generates validation queries for migration verification.\"\"\"\n \n def generate_validations(self, migration_plan: MigrationPlan) -> List[ValidationCheck]:\n \"\"\"Generate validation checks for migration plan.\"\"\"\n validations = []\n \n for step in migration_plan.steps:\n if step.step_type == \"CREATE_TABLE\":\n validations.append(self._create_table_validation(step))\n elif step.step_type == \"ADD_COLUMN\":\n validations.append(self._add_column_validation(step))\n elif step.step_type == \"MODIFY_COLUMN\":\n validations.append(self._modify_column_validation(step))\n elif step.step_type == \"ADD_INDEX\":\n validations.append(self._add_index_validation(step))\n \n return validations\n \n def _create_table_validation(self, step: MigrationStep) -> ValidationCheck:\n \"\"\"Create validation for table creation.\"\"\"\n return ValidationCheck(\n check_id=f\"validate_{step.step_id}\",\n check_type=\"TABLE_EXISTS\",\n table=step.table,\n description=f\"Verify table {step.table} exists\",\n sql_query=f\"SELECT COUNT(*) FROM 
information_schema.tables WHERE table_name = '{step.table}';\",\n expected_result=1\n )\n \n def _add_column_validation(self, step: MigrationStep) -> ValidationCheck:\n \"\"\"Create validation for column addition.\"\"\"\n # Extract column name from SQL\n column_match = re.search(r'ADD COLUMN (\\w+)', step.sql_forward)\n column_name = column_match.group(1) if column_match else \"unknown\"\n \n return ValidationCheck(\n check_id=f\"validate_{step.step_id}\",\n check_type=\"COLUMN_EXISTS\",\n table=step.table,\n description=f\"Verify column {column_name} exists in {step.table}\",\n sql_query=f\"SELECT COUNT(*) FROM information_schema.columns WHERE table_name = '{step.table}' AND column_name = '{column_name}';\",\n expected_result=1\n )\n \n def _modify_column_validation(self, step: MigrationStep) -> ValidationCheck:\n \"\"\"Create validation for column modification.\"\"\"\n return ValidationCheck(\n check_id=f\"validate_{step.step_id}\",\n check_type=\"COLUMN_MODIFIED\",\n table=step.table,\n description=f\"Verify column modification in {step.table}\",\n sql_query=step.validation_sql or f\"SELECT 1;\", # Use provided validation or default\n expected_result=1\n )\n \n def _add_index_validation(self, step: MigrationStep) -> ValidationCheck:\n \"\"\"Create validation for index addition.\"\"\"\n # Extract index name from SQL\n index_match = re.search(r'INDEX (\\w+)', step.sql_forward)\n index_name = index_match.group(1) if index_match else \"unknown\"\n \n return ValidationCheck(\n check_id=f\"validate_{step.step_id}\",\n check_type=\"INDEX_EXISTS\",\n table=step.table,\n description=f\"Verify index {index_name} exists\",\n sql_query=f\"SELECT COUNT(*) FROM information_schema.statistics WHERE index_name = '{index_name}';\",\n expected_result=1\n )\n\n\ndef format_migration_plan_text(plan: MigrationPlan, validations: List[ValidationCheck] = None) -> str:\n \"\"\"Format migration plan as human-readable text.\"\"\"\n lines = []\n lines.append(\"DATABASE MIGRATION PLAN\")\n 
lines.append(\"=\" * 50)\n lines.append(f\"Migration ID: {plan.migration_id}\")\n lines.append(f\"Created: {plan.created_at}\")\n lines.append(f\"Zero Downtime: {plan.summary['zero_downtime']}\")\n lines.append(\"\")\n \n # Summary\n summary = plan.summary\n lines.append(\"MIGRATION SUMMARY\")\n lines.append(\"-\" * 17)\n lines.append(f\"Total Steps: {summary['total_steps']}\")\n \n changes = summary['changes_summary']\n for change_type, count in changes.items():\n if count > 0:\n lines.append(f\"{change_type.replace('_', ' ').title()}: {count}\")\n lines.append(\"\")\n \n # Risk Assessment\n risk = summary['risk_assessment']\n lines.append(\"RISK ASSESSMENT\")\n lines.append(\"-\" * 15)\n lines.append(f\"High Risk Steps: {risk['high_risk_steps']}\")\n lines.append(f\"Medium Risk Steps: {risk['medium_risk_steps']}\")\n lines.append(f\"Low Risk Steps: {risk['low_risk_steps']}\")\n lines.append(\"\")\n \n # Migration Steps\n lines.append(\"MIGRATION STEPS\")\n lines.append(\"-\" * 15)\n for i, step in enumerate(plan.steps, 1):\n lines.append(f\"{i}. 
{step.description} ({step.risk_level} risk)\")\n lines.append(f\" Type: {step.step_type}\")\n if step.zero_downtime_phase:\n lines.append(f\" Phase: {step.zero_downtime_phase}\")\n lines.append(f\" Forward SQL: {step.sql_forward}\")\n lines.append(f\" Rollback SQL: {step.sql_rollback}\")\n if step.estimated_time:\n lines.append(f\" Estimated Time: {step.estimated_time}\")\n lines.append(\"\")\n \n # Validation Checks\n if validations:\n lines.append(\"VALIDATION CHECKS\")\n lines.append(\"-\" * 17)\n for validation in validations:\n lines.append(f\"• {validation.description}\")\n lines.append(f\" SQL: {validation.sql_query}\")\n lines.append(f\" Expected: {validation.expected_result}\")\n lines.append(\"\")\n \n return \"\\n\".join(lines)\n\n\ndef main():\n parser = argparse.ArgumentParser(description=\"Generate database migration scripts\")\n parser.add_argument(\"--current\", \"-c\", required=True, help=\"Current schema JSON file\")\n parser.add_argument(\"--target\", \"-t\", required=True, help=\"Target schema JSON file\")\n parser.add_argument(\"--output\", \"-o\", help=\"Output file (default: stdout)\")\n parser.add_argument(\"--format\", \"-f\", choices=[\"json\", \"text\", \"sql\"], default=\"text\",\n help=\"Output format\")\n parser.add_argument(\"--zero-downtime\", \"-z\", action=\"store_true\",\n help=\"Generate zero-downtime migration strategy\")\n parser.add_argument(\"--validate-only\", \"-v\", action=\"store_true\",\n help=\"Only generate validation queries\")\n parser.add_argument(\"--include-validations\", action=\"store_true\",\n help=\"Include validation queries in output\")\n \n args = parser.parse_args()\n \n try:\n # Load schemas\n with open(args.current, 'r') as f:\n current_schema = json.load(f)\n \n with open(args.target, 'r') as f:\n target_schema = json.load(f)\n \n # Compare schemas\n comparator = SchemaComparator()\n comparator.load_schemas(current_schema, target_schema)\n changes = comparator.compare_schemas()\n \n if not 
any(changes.values()):\n print(\"No schema changes detected.\")\n return 0\n \n # Generate migration\n generator = MigrationGenerator(zero_downtime=args.zero_downtime)\n migration_plan = generator.generate_migration(changes)\n \n # Generate validations if requested\n validations = None\n if args.include_validations or args.validate_only:\n validator = ValidationGenerator()\n validations = validator.generate_validations(migration_plan)\n \n # Format output\n if args.validate_only:\n output = json.dumps([asdict(v) for v in validations], indent=2)\n elif args.format == \"json\":\n result = {\"migration_plan\": asdict(migration_plan)}\n if validations:\n result[\"validations\"] = [asdict(v) for v in validations]\n output = json.dumps(result, indent=2)\n elif args.format == \"sql\":\n sql_lines = []\n sql_lines.append(\"-- Database Migration Script\")\n sql_lines.append(f\"-- Migration ID: {migration_plan.migration_id}\")\n sql_lines.append(f\"-- Created: {migration_plan.created_at}\")\n sql_lines.append(\"\")\n \n for step in migration_plan.steps:\n sql_lines.append(f\"-- Step: {step.description}\")\n sql_lines.append(step.sql_forward)\n sql_lines.append(\"\")\n \n output = \"\\n\".join(sql_lines)\n else: # text format\n output = format_migration_plan_text(migration_plan, validations)\n \n # Write output\n if args.output:\n with open(args.output, 'w') as f:\n f.write(output)\n else:\n print(output)\n \n return 0\n \n except Exception as e:\n print(f\"Error: {e}\", file=sys.stderr)\n return 1\n\n\nif __name__ == \"__main__\":\n sys.exit(main())","content_type":"text/x-python; charset=utf-8","language":"python","size":50373,"content_sha256":"420e49cc56fd5a3b1d9ad5983462f539389548dc885142fa0c0b596a44b3ec92"},{"filename":"README.md","content":"# Database Designer - POWERFUL Tier Skill\n\nA comprehensive database design and analysis toolkit that provides expert-level schema analysis, index optimization, and migration generation capabilities for modern database 
systems.\n\n## Features\n\n### 🔍 Schema Analyzer\n- **Normalization Analysis**: Automated detection of 1NF through BCNF violations\n- **Data Type Optimization**: Identifies antipatterns and inappropriate types\n- **Constraint Analysis**: Finds missing foreign keys, unique constraints, and checks\n- **ERD Generation**: Creates Mermaid diagrams from DDL or JSON schema\n- **Naming Convention Validation**: Ensures consistent naming patterns\n\n### ⚡ Index Optimizer \n- **Missing Index Detection**: Identifies indexes needed for query patterns\n- **Composite Index Design**: Optimizes column ordering for maximum efficiency\n- **Redundancy Analysis**: Finds duplicate and overlapping indexes\n- **Performance Modeling**: Estimates selectivity and query performance impact\n- **Covering Index Recommendations**: Eliminates table lookups\n\n### 🚀 Migration Generator\n- **Zero-Downtime Migrations**: Implements expand-contract patterns\n- **Schema Evolution**: Handles column changes, table renames, constraint updates\n- **Data Migration Scripts**: Automated data transformation and validation\n- **Rollback Planning**: Complete reversal capabilities for all changes\n- **Execution Orchestration**: Dependency-aware migration ordering\n\n## Quick Start\n\n### Prerequisites\n- Python 3.7+ (no external dependencies required)\n- Database schema in SQL DDL format or JSON\n- Query patterns (for index optimization)\n\n### Installation\n```bash\n# Clone or download the database-designer skill\ncd engineering/database-designer/\n\n# Make scripts executable\nchmod +x *.py\n```\n\n## Usage Examples\n\n### Schema Analysis\n\n**Analyze SQL DDL file:**\n```bash\npython schema_analyzer.py --input assets/sample_schema.sql --output-format text\n```\n\n**Generate ERD diagram:**\n```bash\npython schema_analyzer.py --input assets/sample_schema.sql --generate-erd --output analysis.txt\n```\n\n**JSON schema analysis:**\n```bash\npython schema_analyzer.py --input assets/sample_schema.json --output-format 
json --output results.json\n```\n\n### Index Optimization\n\n**Basic index analysis:**\n```bash\npython index_optimizer.py --schema assets/sample_schema.json --queries assets/sample_query_patterns.json\n```\n\n**High-priority recommendations only:**\n```bash\npython index_optimizer.py --schema assets/sample_schema.json --queries assets/sample_query_patterns.json --min-priority 2\n```\n\n**JSON output with existing index analysis:**\n```bash\npython index_optimizer.py --schema assets/sample_schema.json --queries assets/sample_query_patterns.json --format json --analyze-existing\n```\n\n### Migration Generation\n\n**Generate migration between schemas:**\n```bash\npython migration_generator.py --current assets/current_schema.json --target assets/target_schema.json\n```\n\n**Zero-downtime migration:**\n```bash\npython migration_generator.py --current current.json --target target.json --zero-downtime --format sql\n```\n\n**Include validation queries:**\n```bash\npython migration_generator.py --current current.json --target target.json --include-validations --output migration_plan.txt\n```\n\n## Tool Documentation\n\n### Schema Analyzer\n\n**Input Formats:**\n- SQL DDL files (.sql)\n- JSON schema definitions (.json)\n\n**Key Capabilities:**\n- Detects 1NF violations (non-atomic values, repeating groups)\n- Identifies 2NF issues (partial dependencies in composite keys)\n- Finds 3NF problems (transitive dependencies)\n- Checks BCNF compliance (determinant key requirements)\n- Validates data types (VARCHAR(255) antipattern, inappropriate types)\n- Missing constraints (NOT NULL, UNIQUE, CHECK, foreign keys)\n- Naming convention adherence\n\n**Sample Command:**\n```bash\npython schema_analyzer.py \\\n --input sample_schema.sql \\\n --generate-erd \\\n --output-format text \\\n --output analysis.txt\n```\n\n**Output:**\n- Comprehensive text or JSON analysis report\n- Mermaid ERD diagram\n- Prioritized recommendations\n- SQL statements for improvements\n\n### Index 
Optimizer\n\n**Input Requirements:**\n- Schema definition (JSON format)\n- Query patterns with frequency and selectivity data\n\n**Analysis Features:**\n- Selectivity estimation based on column patterns\n- Composite index column ordering optimization \n- Covering index recommendations for SELECT queries\n- Foreign key index validation\n- Redundancy detection (duplicates, overlaps, unused indexes)\n- Performance impact modeling\n\n**Sample Command:**\n```bash\npython index_optimizer.py \\\n --schema schema.json \\\n --queries query_patterns.json \\\n --format text \\\n --min-priority 3 \\\n --output recommendations.txt\n```\n\n**Output:**\n- Prioritized index recommendations\n- CREATE INDEX statements\n- Drop statements for redundant indexes\n- Performance impact analysis\n- Storage size estimates\n\n### Migration Generator\n\n**Input Requirements:**\n- Current schema (JSON format)\n- Target schema (JSON format)\n\n**Migration Strategies:**\n- Standard migrations with ALTER statements\n- Zero-downtime expand-contract patterns\n- Data migration and transformation scripts\n- Constraint management (add/drop in correct order)\n- Index management with timing estimates\n\n**Sample Command:**\n```bash\npython migration_generator.py \\\n --current current_schema.json \\\n --target target_schema.json \\\n --zero-downtime \\\n --include-validations \\\n --format text\n```\n\n**Output:**\n- Step-by-step migration plan\n- Forward and rollback SQL statements\n- Risk assessment for each step\n- Validation queries\n- Execution time estimates\n\n## File Structure\n\n```\ndatabase-designer/\n├── README.md # This file\n├── SKILL.md # Comprehensive database design guide\n├── schema_analyzer.py # Schema analysis tool\n├── index_optimizer.py # Index optimization tool \n├── migration_generator.py # Migration generation tool\n├── references/ # Reference documentation\n│ ├── normalization_guide.md # Normalization principles and patterns\n│ ├── index_strategy_patterns.md # Index design and 
optimization guide\n│ └── database_selection_decision_tree.md # Database technology selection\n├── assets/ # Sample files and test data\n│ ├── sample_schema.sql # Sample DDL with various issues\n│ ├── sample_schema.json # JSON schema definition\n│ └── sample_query_patterns.json # Query patterns for index analysis\n└── expected_outputs/ # Example tool outputs\n ├── schema_analysis_sample.txt # Sample schema analysis report\n ├── index_optimization_sample.txt # Sample index recommendations\n └── migration_sample.txt # Sample migration plan\n```\n\n## JSON Schema Format\n\nThe tools use a standardized JSON format for schema definitions:\n\n```json\n{\n \"tables\": {\n \"table_name\": {\n \"columns\": {\n \"column_name\": {\n \"type\": \"VARCHAR(255)\",\n \"nullable\": true,\n \"unique\": false,\n \"foreign_key\": \"other_table.column\",\n \"default\": \"default_value\",\n \"cardinality_estimate\": 1000\n }\n },\n \"primary_key\": [\"id\"],\n \"unique_constraints\": [[\"email\"], [\"username\"]],\n \"check_constraints\": {\n \"chk_positive_price\": \"price > 0\"\n },\n \"indexes\": [\n {\n \"name\": \"idx_table_column\",\n \"columns\": [\"column_name\"],\n \"unique\": false,\n \"partial_condition\": \"status = 'active'\"\n }\n ]\n }\n }\n}\n```\n\n## Query Patterns Format\n\nFor index optimization, provide query patterns in this format:\n\n```json\n{\n \"queries\": [\n {\n \"id\": \"user_lookup\",\n \"type\": \"SELECT\",\n \"table\": \"users\",\n \"where_conditions\": [\n {\n \"column\": \"email\",\n \"operator\": \"=\",\n \"selectivity\": 0.95\n }\n ],\n \"join_conditions\": [\n {\n \"local_column\": \"user_id\",\n \"foreign_table\": \"orders\",\n \"foreign_column\": \"id\",\n \"join_type\": \"INNER\"\n }\n ],\n \"order_by\": [\n {\"column\": \"created_at\", \"direction\": \"DESC\"}\n ],\n \"frequency\": 1000,\n \"avg_execution_time_ms\": 5.2\n }\n ]\n}\n```\n\n## Best Practices\n\n### Schema Analysis\n1. 
**Start with DDL**: Use actual CREATE TABLE statements when possible\n2. **Include Constraints**: Capture all existing constraints and indexes\n3. **Consider History**: Some denormalization may be intentional for performance\n4. **Validate Results**: Review recommendations against business requirements\n\n### Index Optimization \n1. **Real Query Patterns**: Use actual application queries, not theoretical ones\n2. **Include Frequency**: Query frequency is crucial for prioritization\n3. **Monitor Performance**: Validate recommendations with actual performance testing\n4. **Gradual Implementation**: Add indexes incrementally and monitor impact\n\n### Migration Planning\n1. **Test Migrations**: Always test on non-production environments first\n2. **Backup First**: Ensure complete backups before running migrations\n3. **Monitor Progress**: Watch for locks and performance impacts during execution\n4. **Rollback Ready**: Have rollback procedures tested and ready\n\n## Advanced Usage\n\n### Custom Selectivity Estimation\nThe index optimizer uses pattern-based selectivity estimation. You can improve accuracy by providing cardinality estimates in your schema JSON. In this example, the `status` column has only 5 distinct values:\n\n```json\n{\n \"columns\": {\n \"status\": {\n \"type\": \"VARCHAR(20)\",\n \"cardinality_estimate\": 5\n }\n }\n}\n```\n\n### Zero-Downtime Migration Strategy\nFor production systems, use the `--zero-downtime` flag to generate expand-contract migrations:\n\n1. **Expand Phase**: Add new columns/tables without constraints\n2. **Dual Write**: Application writes to both old and new structures \n3. **Backfill**: Populate new structures with existing data\n4. 
**Contract Phase**: Remove old structures after validation\n\n### Integration with CI/CD\nIntegrate these tools into your deployment pipeline:\n\n```bash\n# Schema validation in CI\npython schema_analyzer.py --input schema.sql --output-format json | \\\n jq '.constraint_analysis.total_issues' | \\\n test $(cat) -eq 0 || exit 1\n\n# Generate migrations automatically\npython migration_generator.py \\\n --current prod_schema.json \\\n --target new_schema.json \\\n --zero-downtime \\\n --format sql \\\n --output migration.sql\n```\n\n## Troubleshooting\n\n### Common Issues\n\n**\"No tables found in input file\"**\n- Ensure SQL DDL uses standard CREATE TABLE syntax\n- Check for syntax errors in DDL\n- Verify file encoding (UTF-8 recommended)\n\n**\"Invalid JSON schema\"** \n- Validate JSON syntax with a JSON validator\n- Ensure all required fields are present\n- Check that foreign key references use \"table.column\" format\n\n**\"Analysis shows no issues but problems exist\"**\n- The tools use heuristic analysis, so review recommendations carefully\n- Some design decisions may be intentional (denormalization for performance)\n- Consider domain-specific requirements not captured by general rules\n\n### Performance Tips\n\n**Large Schemas:**\n- Use `--output-format json` for machine processing\n- Consider analyzing subsets of tables for very large schemas\n- Provide cardinality estimates for better index recommendations\n\n**Complex Queries:**\n- Include actual execution times in query patterns\n- Provide realistic frequency estimates\n- Consider seasonal or usage pattern variations\n\n## Contributing\n\nThis is a self-contained skill with no external dependencies. To extend functionality:\n\n1. Follow the existing code patterns\n2. Keep the Python standard-library-only requirement\n3. Add comprehensive test cases for new features\n4. 
Update documentation and examples\n\n## License\n\nThis database designer skill is part of the claude-skills collection and follows the same licensing terms.","content_type":"text/markdown; charset=utf-8","language":"markdown","size":11966,"content_sha256":"18264cfce2a0cf29a83902a64909944eac6a6d3ae50d6366f307a6fd97c0bbb5"},{"filename":"references/database_selection_decision_tree.md","content":"# Database Selection Decision Tree\n\n## Overview\n\nChoosing the right database technology is crucial for application success. This guide provides a systematic approach to database selection based on specific requirements, data patterns, and operational constraints.\n\n## Decision Framework\n\n### Primary Questions\n\n1. **What is your primary use case?**\n - OLTP (Online Transaction Processing)\n - OLAP (Online Analytical Processing) \n - Real-time analytics\n - Content management\n - Search and discovery\n - Time-series data\n - Graph relationships\n\n2. **What are your consistency requirements?**\n - Strong consistency (ACID)\n - Eventual consistency\n - Causal consistency\n - Session consistency\n\n3. **What are your scalability needs?**\n - Vertical scaling sufficient\n - Horizontal scaling required\n - Global distribution needed\n - Multi-region requirements\n\n4. 
**What is your data structure?**\n - Structured (relational)\n - Semi-structured (JSON/XML)\n - Unstructured (documents, media)\n - Graph relationships\n - Time-series data\n - Key-value pairs\n\n## Decision Tree\n\n```\nSTART: What is your primary use case?\n│\n├── OLTP (Transactional Applications)\n│ │\n│ ├── Do you need strong ACID guarantees?\n│ │ ├── YES → Do you need horizontal scaling?\n│ │ │ ├── YES → Distributed SQL\n│ │ │ │ ├── CockroachDB (Global, multi-region)\n│ │ │ │ ├── TiDB (MySQL compatibility)\n│ │ │ │ └── Spanner (Google Cloud)\n│ │ │ └── NO → Traditional SQL\n│ │ │ ├── PostgreSQL (Feature-rich, extensions)\n│ │ │ ├── MySQL (Performance, ecosystem)\n│ │ │ └── SQL Server (Microsoft stack)\n│ │ └── NO → Are you primarily key-value access?\n│ │ ├── YES → Key-Value Stores\n│ │ │ ├── Redis (In-memory, caching)\n│ │ │ ├── DynamoDB (AWS managed)\n│ │ │ └── Cassandra (High availability)\n│ │ └── NO → Document Stores\n│ │ ├── MongoDB (General purpose)\n│ │ ├── CouchDB (Sync, replication)\n│ │ └── Amazon DocumentDB (MongoDB compatible)\n│ │\n├── OLAP (Analytics and Reporting)\n│ │\n│ ├── What is your data volume?\n│ │ ├── Small to Medium (\u003c 1TB) → Traditional SQL with optimization\n│ │ │ ├── PostgreSQL with columnar extensions\n│ │ │ ├── MySQL with analytics engine\n│ │ │ └── SQL Server with columnstore\n│ │ ├── Large (1TB - 100TB) → Data Warehouse Solutions\n│ │ │ ├── Snowflake (Cloud-native)\n│ │ │ ├── BigQuery (Google Cloud)\n│ │ │ ├── Redshift (AWS)\n│ │ │ └── Synapse (Azure)\n│ │ └── Very Large (> 100TB) → Big Data Platforms\n│ │ ├── Databricks (Unified analytics)\n│ │ ├── Apache Spark on cloud\n│ │ └── Hadoop ecosystem\n│ │\n├── Real-time Analytics\n│ │\n│ ├── Do you need sub-second query responses?\n│ │ ├── YES → Stream Processing + OLAP\n│ │ │ ├── ClickHouse (Fast analytics)\n│ │ │ ├── Apache Druid (Real-time OLAP)\n│ │ │ ├── Pinot (LinkedIn's real-time DB)\n│ │ │ └── TimescaleDB (Time-series)\n│ │ └── NO → Traditional OLAP solutions\n│ │\n├── 
Search and Discovery\n│ │\n│ ├── What type of search?\n│ │ ├── Full-text search → Search Engines\n│ │ │ ├── Elasticsearch (Full-featured)\n│ │ │ ├── OpenSearch (AWS fork of ES)\n│ │ │ └── Solr (Apache Lucene-based)\n│ │ ├── Vector/similarity search → Vector Databases\n│ │ │ ├── Pinecone (Managed vector DB)\n│ │ │ ├── Weaviate (Open source)\n│ │ │ ├── Chroma (Embeddings)\n│ │ │ └── PostgreSQL with pgvector\n│ │ └── Faceted search → Search + SQL combination\n│ │\n├── Graph Relationships\n│ │\n│ ├── Do you need complex graph traversals?\n│ │ ├── YES → Graph Databases\n│ │ │ ├── Neo4j (Property graph)\n│ │ │ ├── Amazon Neptune (Multi-model)\n│ │ │ ├── ArangoDB (Multi-model)\n│ │ │ └── TigerGraph (Analytics focused)\n│ │ └── NO → SQL with recursive queries\n│ │ └── PostgreSQL with recursive CTEs\n│ │\n└── Time-series Data\n │\n ├── What is your write volume?\n ├── High (millions/sec) → Specialized Time-series\n │ ├── InfluxDB (Purpose-built)\n │ ├── TimescaleDB (PostgreSQL extension)\n │ ├── Apache Druid (Analytics focused)\n │ └── Prometheus (Monitoring)\n └── Medium → SQL with time-series optimization\n └── PostgreSQL with partitioning\n```\n\n## Database Categories Deep Dive\n\n### Traditional SQL Databases\n\n**PostgreSQL**\n- **Best For**: Complex queries, JSON data, extensions, geospatial\n- **Strengths**: Feature-rich, reliable, strong consistency, extensible\n- **Use Cases**: OLTP, mixed workloads, JSON documents, geospatial applications\n- **Scaling**: Vertical scaling, read replicas, partitioning\n- **When to Choose**: Need SQL features, complex queries, moderate scale\n\n**MySQL**\n- **Best For**: Web applications, read-heavy workloads, simple schema\n- **Strengths**: Performance, replication, large ecosystem\n- **Use Cases**: Web apps, content management, e-commerce\n- **Scaling**: Read replicas, sharding, clustering (MySQL Cluster)\n- **When to Choose**: Simple schema, performance priority, large community\n\n**SQL Server**\n- **Best For**: Microsoft 
ecosystem, enterprise features, business intelligence\n- **Strengths**: Integration, tooling, enterprise features\n- **Use Cases**: Enterprise applications, .NET applications, BI\n- **Scaling**: Always On availability groups, partitioning\n- **When to Choose**: Microsoft stack, enterprise requirements\n\n### Distributed SQL (NewSQL)\n\n**CockroachDB**\n- **Best For**: Global applications, strong consistency, horizontal scaling\n- **Strengths**: ACID guarantees, automatic scaling, survivability (tolerates node failures)\n- **Use Cases**: Multi-region apps, financial services, global SaaS\n- **Trade-offs**: Complex setup, higher latency for global transactions\n- **When to Choose**: Need SQL + global scale + consistency\n\n**TiDB**\n- **Best For**: MySQL compatibility with horizontal scaling\n- **Strengths**: MySQL protocol, HTAP (hybrid transactional/analytical processing), cloud-native\n- **Use Cases**: MySQL migrations, hybrid workloads\n- **When to Choose**: Existing MySQL expertise, need scale\n\n### NoSQL Document Stores\n\n**MongoDB**\n- **Best For**: Flexible schema, rapid development, document-centric data\n- **Strengths**: Developer experience, flexible schema, rich queries\n- **Use Cases**: Content management, catalogs, user profiles, IoT\n- **Scaling**: Automatic sharding, replica sets\n- **When to Choose**: Schema evolution, document structure, rapid development\n\n**CouchDB**\n- **Best For**: Offline-first applications, multi-master replication\n- **Strengths**: HTTP API, replication, conflict resolution\n- **Use Cases**: Mobile apps, distributed systems, offline scenarios\n- **When to Choose**: Need offline capabilities, bi-directional sync\n\n### Key-Value Stores\n\n**Redis**\n- **Best For**: Caching, sessions, real-time applications, pub/sub\n- **Strengths**: Performance, data structures, persistence options\n- **Use Cases**: Caching, leaderboards, real-time analytics, queues\n- **Scaling**: Clustering, sentinel for HA\n- **When to Choose**: High performance, simple data model, caching\n\n**DynamoDB**\n- **Best 
For**: Serverless applications, predictable performance, AWS ecosystem\n- **Strengths**: Managed, auto-scaling, consistent performance\n- **Use Cases**: Web applications, gaming, IoT, mobile backends\n- **Trade-offs**: Vendor lock-in, limited querying\n- **When to Choose**: AWS ecosystem, serverless, managed solution\n\n### Column-Family Stores\n\n**Cassandra**\n- **Best For**: Write-heavy workloads, high availability, linear scalability\n- **Strengths**: No single point of failure, tunable consistency\n- **Use Cases**: Time-series, IoT, messaging, activity feeds\n- **Trade-offs**: Complex operations, eventual consistency\n- **When to Choose**: High write volume, availability over consistency\n\n**HBase**\n- **Best For**: Big data applications, Hadoop ecosystem\n- **Strengths**: Hadoop integration, consistent reads\n- **Use Cases**: Analytics on big data, time-series at scale\n- **When to Choose**: Hadoop ecosystem, very large datasets\n\n### Graph Databases\n\n**Neo4j**\n- **Best For**: Complex relationships, graph algorithms, traversals\n- **Strengths**: Mature ecosystem, Cypher query language, algorithms\n- **Use Cases**: Social networks, recommendation engines, fraud detection\n- **Trade-offs**: Specialized use case, learning curve\n- **When to Choose**: Relationship-heavy data, graph algorithms\n\n### Time-Series Databases\n\n**InfluxDB**\n- **Best For**: Time-series data, IoT, monitoring, analytics\n- **Strengths**: Purpose-built, efficient storage, query language\n- **Use Cases**: IoT sensors, monitoring, DevOps metrics\n- **When to Choose**: High-volume time-series data\n\n**TimescaleDB**\n- **Best For**: Time-series with SQL familiarity\n- **Strengths**: PostgreSQL compatibility, SQL queries, ecosystem\n- **Use Cases**: Financial data, IoT with complex queries\n- **When to Choose**: Time-series + SQL requirements\n\n### Search Engines\n\n**Elasticsearch**\n- **Best For**: Full-text search, log analysis, real-time search\n- **Strengths**: Powerful search, 
analytics, ecosystem (ELK stack)\n- **Use Cases**: Search applications, log analysis, monitoring\n- **Trade-offs**: Complex operations, resource intensive\n- **When to Choose**: Advanced search requirements, analytics\n\n### Data Warehouses\n\n**Snowflake**\n- **Best For**: Cloud-native analytics, data sharing, varied workloads\n- **Strengths**: Separation of compute/storage, automatic scaling\n- **Use Cases**: Data warehousing, analytics, data science\n- **When to Choose**: Cloud-native, analytics-focused, multi-cloud\n\n**BigQuery**\n- **Best For**: Serverless analytics, Google ecosystem, machine learning\n- **Strengths**: Serverless, petabyte scale, ML integration\n- **Use Cases**: Analytics, data science, reporting\n- **When to Choose**: Google Cloud, serverless analytics\n\n## Selection Criteria Matrix\n\n| Criterion | SQL | NewSQL | Document | Key-Value | Column-Family | Graph | Time-Series |\n|-----------|-----|--------|----------|-----------|---------------|-------|-------------|\n| ACID Guarantees | ✅ Strong | ✅ Strong | ⚠️ Eventual | ⚠️ Eventual | ⚠️ Tunable | ⚠️ Varies | ⚠️ Varies |\n| Horizontal Scaling | ❌ Limited | ✅ Native | ✅ Native | ✅ Native | ✅ Native | ⚠️ Limited | ✅ Native |\n| Query Flexibility | ✅ High | ✅ High | ⚠️ Moderate | ❌ Low | ❌ Low | ✅ High | ⚠️ Specialized |\n| Schema Flexibility | ❌ Rigid | ❌ Rigid | ✅ High | ✅ High | ⚠️ Moderate | ✅ High | ⚠️ Structured |\n| Performance (Reads) | ⚠️ Good | ⚠️ Good | ✅ Excellent | ✅ Excellent | ✅ Excellent | ⚠️ Good | ✅ Excellent |\n| Performance (Writes) | ⚠️ Good | ⚠️ Good | ✅ Excellent | ✅ Excellent | ✅ Excellent | ⚠️ Good | ✅ Excellent |\n| Operational Complexity | ✅ Low | ❌ High | ⚠️ Moderate | ✅ Low | ❌ High | ⚠️ Moderate | ⚠️ Moderate |\n| Ecosystem Maturity | ✅ Mature | ⚠️ Growing | ✅ Mature | ✅ Mature | ✅ Mature | ✅ Mature | ⚠️ Growing |\n\n## Decision Checklist\n\n### Requirements Analysis\n- [ ] **Data Volume**: Current and projected data size\n- [ ] **Transaction Volume**: Reads per 
second, writes per second\n- [ ] **Consistency Requirements**: Strong vs eventual consistency needs\n- [ ] **Query Patterns**: Simple lookups vs complex analytics\n- [ ] **Schema Evolution**: How often does schema change?\n- [ ] **Geographic Distribution**: Single region vs global\n- [ ] **Availability Requirements**: Acceptable downtime\n- [ ] **Team Expertise**: Existing knowledge and learning curve\n- [ ] **Budget Constraints**: Licensing, infrastructure, operational costs\n- [ ] **Compliance Requirements**: Data residency, audit trails\n\n### Technical Evaluation\n- [ ] **Performance Testing**: Benchmark with realistic data and queries\n- [ ] **Scalability Testing**: Test scaling limits and patterns\n- [ ] **Failure Scenarios**: Test backup, recovery, and failure handling\n- [ ] **Integration Testing**: APIs, connectors, ecosystem tools\n- [ ] **Migration Path**: How to migrate from current system\n- [ ] **Monitoring and Observability**: Available tooling and metrics\n\n### Operational Considerations\n- [ ] **Management Complexity**: Setup, configuration, maintenance\n- [ ] **Backup and Recovery**: Built-in vs external tools\n- [ ] **Security Features**: Authentication, authorization, encryption\n- [ ] **Upgrade Path**: Version compatibility and upgrade process\n- [ ] **Support Options**: Community vs commercial support\n- [ ] **Lock-in Risk**: Portability and vendor independence\n\n## Common Decision Patterns\n\n### E-commerce Platform\n**Typical Choice**: PostgreSQL or MySQL\n- **Primary Data**: Product catalog, orders, users (structured)\n- **Query Patterns**: OLTP with some analytics\n- **Consistency**: Strong consistency for financial data\n- **Scale**: Moderate with read replicas\n- **Additional**: Redis for caching, Elasticsearch for product search\n\n### IoT/Sensor Data Platform\n**Typical Choice**: TimescaleDB or InfluxDB\n- **Primary Data**: Time-series sensor readings\n- **Query Patterns**: Time-based aggregations, trend analysis\n- **Scale**: High 
write volume, moderate read volume\n- **Additional**: Kafka for ingestion, PostgreSQL for metadata\n\n### Social Media Application\n**Typical Choice**: Combination approach\n- **User Profiles**: MongoDB (flexible schema)\n- **Relationships**: Neo4j (graph relationships)\n- **Activity Feeds**: Cassandra (high write volume)\n- **Search**: Elasticsearch (content discovery)\n- **Caching**: Redis (sessions, real-time data)\n\n### Analytics Platform\n**Typical Choice**: Snowflake or BigQuery\n- **Primary Use**: Complex analytical queries\n- **Data Volume**: Large (TB to PB scale)\n- **Query Patterns**: Ad-hoc analytics, reporting\n- **Users**: Data analysts, data scientists\n- **Additional**: Data lake (S3/GCS) for raw data storage\n\n### Global SaaS Application\n**Typical Choice**: CockroachDB or DynamoDB\n- **Requirements**: Multi-region, strong consistency\n- **Scale**: Global user base\n- **Compliance**: Data residency requirements\n- **Availability**: High availability across regions\n\n## Migration Strategies\n\n### From Monolithic to Distributed\n1. **Assessment**: Identify scaling bottlenecks\n2. **Data Partitioning**: Plan how to split data\n3. **Gradual Migration**: Move non-critical data first\n4. **Dual Writes**: Run both systems temporarily\n5. **Validation**: Verify data consistency\n6. **Cutover**: Switch reads and writes gradually\n\n### Technology Stack Evolution\n1. **Start Simple**: Begin with PostgreSQL or MySQL\n2. **Identify Bottlenecks**: Monitor performance and scaling issues\n3. **Selective Scaling**: Move specific workloads to specialized databases\n4. **Polyglot Persistence**: Use multiple databases for different use cases\n5. **Service Boundaries**: Align database choice with service boundaries\n\n## Conclusion\n\nDatabase selection should be driven by:\n\n1. **Specific Use Case Requirements**: Not all applications need the same database\n2. **Data Characteristics**: Structure, volume, and access patterns matter\n3. 
**Non-functional Requirements**: Consistency, availability, performance targets\n4. **Team and Organizational Factors**: Expertise, operational capacity, budget\n5. **Evolution Path**: How requirements and scale will change over time\n\nThe best database choice is often not a single technology, but a combination of databases that each excel at their specific use case within your application architecture.","content_type":"text/markdown; charset=utf-8","language":"markdown","size":16344,"content_sha256":"18d1a914c6097e6e7e8fd38da4e5847c24191e24063efb3d39f83526079f20ee"},{"filename":"references/database-design-reference.md","content":"# database-designer reference\n\n## Database Design Principles\n\n### Normalization Forms\n\n#### First Normal Form (1NF)\n- **Atomic Values**: Each column contains indivisible values\n- **Unique Column Names**: No duplicate column names within a table\n- **Uniform Data Types**: Each column contains the same type of data\n- **Row Uniqueness**: No duplicate rows in the table\n\n**Example Violation:**\n```sql\n-- BAD: Multiple phone numbers in one column\nCREATE TABLE contacts (\n id INT PRIMARY KEY,\n name VARCHAR(100),\n phones VARCHAR(200) -- \"123-456-7890, 098-765-4321\"\n);\n\n-- GOOD: Separate table for phone numbers\nCREATE TABLE contacts (\n id INT PRIMARY KEY,\n name VARCHAR(100)\n);\n\nCREATE TABLE contact_phones (\n id INT PRIMARY KEY,\n contact_id INT REFERENCES contacts(id),\n phone_number VARCHAR(20),\n phone_type VARCHAR(10)\n);\n```\n\n#### Second Normal Form (2NF)\n- **1NF Compliance**: Must satisfy First Normal Form\n- **Full Functional Dependency**: Non-key attributes depend on the entire primary key\n- **Partial Dependency Elimination**: Remove attributes that depend on part of a composite key\n\n**Example Violation:**\n```sql\n-- BAD: Student course table with partial dependencies\nCREATE TABLE student_courses (\n student_id INT,\n course_id INT,\n student_name VARCHAR(100), -- Depends only on student_id\n course_name 
VARCHAR(100), -- Depends only on course_id\n grade CHAR(1),\n PRIMARY KEY (student_id, course_id)\n);\n\n-- GOOD: Separate tables eliminate partial dependencies\nCREATE TABLE students (\n id INT PRIMARY KEY,\n name VARCHAR(100)\n);\n\nCREATE TABLE courses (\n id INT PRIMARY KEY,\n name VARCHAR(100)\n);\n\nCREATE TABLE enrollments (\n student_id INT REFERENCES students(id),\n course_id INT REFERENCES courses(id),\n grade CHAR(1),\n PRIMARY KEY (student_id, course_id)\n);\n```\n\n#### Third Normal Form (3NF)\n- **2NF Compliance**: Must satisfy Second Normal Form\n- **Transitive Dependency Elimination**: Non-key attributes should not depend on other non-key attributes\n- **Direct Dependency**: Non-key attributes depend directly on the primary key\n\n**Example Violation:**\n```sql\n-- BAD: Employee table with transitive dependency\nCREATE TABLE employees (\n id INT PRIMARY KEY,\n name VARCHAR(100),\n department_id INT,\n department_name VARCHAR(100), -- Depends on department_id, not employee id\n department_budget DECIMAL(10,2) -- Transitive dependency\n);\n\n-- GOOD: Separate department information\nCREATE TABLE departments (\n id INT PRIMARY KEY,\n name VARCHAR(100),\n budget DECIMAL(10,2)\n);\n\nCREATE TABLE employees (\n id INT PRIMARY KEY,\n name VARCHAR(100),\n department_id INT REFERENCES departments(id)\n);\n```\n\n#### Boyce-Codd Normal Form (BCNF)\n- **3NF Compliance**: Must satisfy Third Normal Form\n- **Determinant Key Rule**: Every determinant must be a candidate key\n- **Stricter 3NF**: Handles anomalies not covered by 3NF\n\n### Denormalization Strategies\n\n#### When to Denormalize\n1. **Read-Heavy Workloads**: High query frequency with acceptable write trade-offs\n2. **Performance Bottlenecks**: Join operations causing significant latency\n3. **Aggregation Needs**: Frequent calculation of derived values\n4. 
**Caching Requirements**: Pre-computed results for common queries\n\n#### Common Denormalization Patterns\n\n**Redundant Storage**\n```sql\n-- Store calculated values to avoid expensive joins\nCREATE TABLE orders (\n id INT PRIMARY KEY,\n customer_id INT REFERENCES customers(id),\n customer_name VARCHAR(100), -- Denormalized from customers table\n order_total DECIMAL(10,2), -- Denormalized calculation\n created_at TIMESTAMP\n);\n```\n\n**Materialized Aggregates**\n```sql\n-- Pre-computed summary tables\nCREATE TABLE customer_statistics (\n customer_id INT PRIMARY KEY,\n total_orders INT,\n lifetime_value DECIMAL(12,2),\n last_order_date DATE,\n updated_at TIMESTAMP\n);\n```\n\n## Index Optimization Strategies\n\n### B-Tree Indexes\n- **Default Choice**: Best for range queries, sorting, and equality matches\n- **Column Order**: Most selective columns first for composite indexes\n- **Prefix Matching**: Supports leading column subset queries\n- **Maintenance Cost**: Balanced tree structure with logarithmic operations\n\n### Hash Indexes\n- **Equality Queries**: Optimal for exact match lookups\n- **Memory Efficiency**: Constant-time access for single-value queries\n- **Range Limitations**: Cannot support range or partial matches\n- **Use Cases**: Primary keys, unique constraints, cache keys\n\n### Composite Indexes\n```sql\n-- Query pattern determines optimal column order\n-- Query: WHERE status = 'active' AND created_date > '2023-01-01' ORDER BY priority DESC\nCREATE INDEX idx_task_status_date_priority \nON tasks (status, created_date, priority DESC);\n\n-- Query: WHERE user_id = 123 AND category IN ('A', 'B') AND date_field BETWEEN '...' 
AND '...'\nCREATE INDEX idx_user_category_date \nON user_activities (user_id, category, date_field);\n```\n\n### Covering Indexes\n```sql\n-- Include additional columns to avoid table lookups\nCREATE INDEX idx_user_email_covering \nON users (email) \nINCLUDE (first_name, last_name, status);\n\n-- Query can be satisfied entirely from the index\n-- SELECT first_name, last_name, status FROM users WHERE email = 'user@example.com';\n```\n\n### Partial Indexes\n```sql\n-- Index only relevant subset of data\nCREATE INDEX idx_active_users_email \nON users (email) \nWHERE status = 'active';\n\n-- Index for recent orders only\n-- PostgreSQL rejects non-immutable expressions such as CURRENT_DATE in an index predicate,\n-- so use a fixed cutoff (example value) and recreate the index as the window moves\nCREATE INDEX idx_recent_orders_customer \nON orders (customer_id, created_at) \nWHERE created_at > '2024-01-01';\n```\n\n## Query Analysis & Optimization\n\n### Query Pattern Recognition\n1. **Equality Filters**: Single-column B-tree indexes\n2. **Range Queries**: B-tree with equality columns ordered before the range column\n3. **Text Search**: Full-text indexes or trigram indexes\n4. **Join Operations**: Indexes on the join columns of both tables\n5. **Sorting Requirements**: Indexes matching ORDER BY clauses\n\n### Index Selection Algorithm\n```\n1. Identify WHERE clause columns\n2. Order equality columns before range columns, most selective first\n3. Consider JOIN conditions\n4. Include ORDER BY columns if possible\n5. Evaluate covering index opportunities\n6. 
Check for existing overlapping indexes\n```\n\n## Data Modeling Patterns\n\n### Star Schema (Data Warehousing)\n```sql\n-- Central fact table (create the referenced dimension tables first)\nCREATE TABLE sales_facts (\n sale_id BIGINT PRIMARY KEY,\n product_id INT REFERENCES products(id),\n customer_id INT REFERENCES customers(id),\n date_id INT REFERENCES date_dimension(id),\n store_id INT REFERENCES stores(id),\n quantity INT,\n unit_price DECIMAL(8,2),\n total_amount DECIMAL(10,2)\n);\n\n-- Dimension tables\nCREATE TABLE date_dimension (\n id INT PRIMARY KEY,\n date_value DATE,\n year INT,\n quarter INT,\n month INT,\n day_of_week INT,\n is_weekend BOOLEAN\n);\n```\n\n### Snowflake Schema\n```sql\n-- Normalized dimension tables\nCREATE TABLE products (\n id INT PRIMARY KEY,\n name VARCHAR(200),\n category_id INT REFERENCES product_categories(id),\n brand_id INT REFERENCES brands(id)\n);\n\nCREATE TABLE product_categories (\n id INT PRIMARY KEY,\n name VARCHAR(100),\n parent_category_id INT REFERENCES product_categories(id)\n);\n```\n\n### Document Model (JSON Storage)\n```sql\n-- Flexible document storage with indexing\nCREATE TABLE documents (\n id UUID PRIMARY KEY,\n document_type VARCHAR(50),\n data JSONB,\n created_at TIMESTAMP DEFAULT NOW(),\n updated_at TIMESTAMP DEFAULT NOW()\n);\n\n-- Index on JSON properties\n-- (an expression index on a ->> text value uses B-tree; GIN applies to the JSONB column itself)\nCREATE INDEX idx_documents_user_id \nON documents ((data->>'user_id'));\n\nCREATE INDEX idx_documents_status \nON documents ((data->>'status')) \nWHERE document_type = 'order';\n```\n\n### Graph Data Patterns\n```sql\n-- Adjacency list for hierarchical data\nCREATE TABLE categories (\n id INT PRIMARY KEY,\n name VARCHAR(100),\n parent_id INT REFERENCES categories(id),\n level INT,\n path VARCHAR(500) -- Materialized path: \"/1/5/12/\"\n);\n\n-- Many-to-many relationships\nCREATE TABLE relationships (\n id UUID PRIMARY KEY,\n from_entity_id UUID,\n to_entity_id UUID,\n relationship_type VARCHAR(50),\n created_at TIMESTAMP\n);\n\n-- Index both traversal directions (index names are illustrative)\nCREATE INDEX idx_relationships_from ON relationships (from_entity_id, relationship_type);\nCREATE INDEX 
idx_relationships_to ON relationships (to_entity_id, relationship_type);\n```\n\n## Migration Strategies\n\n### Zero-Downtime Migration (Expand-Contract Pattern)\n\n**Phase 1: Expand**\n```sql\n-- Add new column without constraints\nALTER TABLE users ADD COLUMN new_email VARCHAR(255);\n\n-- Backfill data in batches\nUPDATE users SET new_email = email WHERE id BETWEEN 1 AND 1000;\n-- Continue in batches...\n\n-- Add constraints after backfill\nALTER TABLE users ADD CONSTRAINT users_new_email_unique UNIQUE (new_email);\nALTER TABLE users ALTER COLUMN new_email SET NOT NULL;\n```\n\n**Phase 2: Contract**\n```sql\n-- Update application to use new column\n-- Deploy application changes\n-- Verify new column is being used\n\n-- Remove old column\nALTER TABLE users DROP COLUMN email;\n-- Rename new column\nALTER TABLE users RENAME COLUMN new_email TO email;\n```\n\n### Data Type Changes\n```sql\n-- Safe string to integer conversion\nALTER TABLE products ADD COLUMN sku_number INTEGER;\nUPDATE products SET sku_number = CAST(sku AS INTEGER) WHERE sku ~ '^[0-9]+
$'
;\n-- Validate conversion success before dropping old column\n```\n\n## Partitioning Strategies\n\n### Horizontal Partitioning (Sharding)\n```sql\n-- Range partitioning by date (parent table sales must be declared with PARTITION BY RANGE)\nCREATE TABLE sales_2023 PARTITION OF sales\nFOR VALUES FROM ('2023-01-01') TO ('2024-01-01');\n\nCREATE TABLE sales_2024 PARTITION OF sales\nFOR VALUES FROM ('2024-01-01') TO ('2025-01-01');\n\n-- Hash partitioning by user_id (parent table user_data must be declared with PARTITION BY HASH)\nCREATE TABLE user_data_0 PARTITION OF user_data\nFOR VALUES WITH (MODULUS 4, REMAINDER 0);\n\nCREATE TABLE user_data_1 PARTITION OF user_data\nFOR VALUES WITH (MODULUS 4, REMAINDER 1);\n```\n\n### Vertical Partitioning\n```sql\n-- Separate frequently accessed columns\nCREATE TABLE users_core (\n id INT PRIMARY KEY,\n email VARCHAR(255),\n status VARCHAR(20),\n created_at TIMESTAMP\n);\n\n-- Less frequently accessed profile data\nCREATE TABLE users_profile (\n user_id INT PRIMARY KEY REFERENCES users_core(id),\n bio TEXT,\n preferences JSONB,\n last_login TIMESTAMP\n);\n```\n\n## Connection Management\n\n### Connection Pooling\n- **Pool Size**: Start from (CPU cores × 2) + effective spindle count, then tune under load\n- **Connection Lifetime**: Rotate connections to prevent resource leaks\n- **Timeout Settings**: Connection, idle, and query timeouts\n- **Health Checks**: Regular connection validation\n\n### Read Replicas Strategy\n```sql\n-- Write queries to primary\nINSERT INTO users (email, name) VALUES ('user@example.com', 'John Doe');\n\n-- Read queries to replicas (with appropriate read preference)\nSELECT * FROM users WHERE status = 'active'; -- Route to read replica\n\n-- Consistent reads when required (LAST_INSERT_ID() is MySQL-specific)\nSELECT * FROM users WHERE id = LAST_INSERT_ID(); -- Route to primary\n```\n\n## Caching Layers\n\n### Cache-Aside Pattern\n```python\ndef get_user(user_id):\n # Try cache first\n user = cache.get(f\"user:{user_id}\")\n if user is None:\n # Cache miss - query database\n user = db.query(\"SELECT * FROM users WHERE id = %s\", user_id)\n # Store in cache\n cache.set(f\"user:{user_id}\", user, 
ttl=3600)\n return user\n```\n\n### Write-Through Cache\n- **Consistency**: Always keep cache and database in sync\n- **Write Latency**: Higher due to dual writes\n- **Data Safety**: No data loss on cache failures\n\n### Cache Invalidation Strategies\n1. **TTL-Based**: Time-based expiration\n2. **Event-Driven**: Invalidate on data changes\n3. **Version-Based**: Use version numbers for consistency\n4. **Tag-Based**: Group related cache entries\n\n## Database Selection Guide\n\n### SQL Databases\n**PostgreSQL**\n- **Strengths**: ACID compliance, complex queries, JSON support, extensibility\n- **Use Cases**: OLTP applications, data warehousing, geospatial data\n- **Scale**: Vertical scaling with read replicas\n\n**MySQL**\n- **Strengths**: Performance, replication, wide ecosystem support\n- **Use Cases**: Web applications, content management, e-commerce\n- **Scale**: Horizontal scaling through sharding\n\n### NoSQL Databases\n\n**Document Stores (MongoDB, CouchDB)**\n- **Strengths**: Flexible schema, horizontal scaling, developer productivity\n- **Use Cases**: Content management, catalogs, user profiles\n- **Trade-offs**: Eventual consistency, complex queries limitations\n\n**Key-Value Stores (Redis, DynamoDB)**\n- **Strengths**: High performance, simple model, excellent caching\n- **Use Cases**: Session storage, real-time analytics, gaming leaderboards\n- **Trade-offs**: Limited query capabilities, data modeling constraints\n\n**Column-Family (Cassandra, HBase)**\n- **Strengths**: Write-heavy workloads, linear scalability, fault tolerance\n- **Use Cases**: Time-series data, IoT applications, messaging systems\n- **Trade-offs**: Query flexibility, consistency model complexity\n\n**Graph Databases (Neo4j, Amazon Neptune)**\n- **Strengths**: Relationship queries, pattern matching, recommendation engines\n- **Use Cases**: Social networks, fraud detection, knowledge graphs\n- **Trade-offs**: Specialized use cases, learning curve\n\n### NewSQL Databases\n**Distributed SQL 
(CockroachDB, TiDB, Spanner)**\n- **Strengths**: SQL compatibility with horizontal scaling\n- **Use Cases**: Global applications requiring ACID guarantees\n- **Trade-offs**: Complexity, latency for distributed transactions\n\n## Tools & Scripts\n\n### Schema Analyzer\n- **Input**: SQL DDL files, JSON schema definitions\n- **Analysis**: Normalization compliance, constraint validation, naming conventions\n- **Output**: Analysis report, Mermaid ERD, improvement recommendations\n\n### Index Optimizer\n- **Input**: Schema definition, query patterns\n- **Analysis**: Missing indexes, redundancy detection, selectivity estimation\n- **Output**: Index recommendations, CREATE INDEX statements, performance projections\n\n### Migration Generator\n- **Input**: Current and target schemas\n- **Analysis**: Schema differences, dependency resolution, risk assessment\n- **Output**: Migration scripts, rollback plans, validation queries\n","content_type":"text/markdown; charset=utf-8","language":"markdown","size":14014,"content_sha256":"b189de7137b7a1acd9408f3324ca5fa0e1a1bbdc53bf82f2eeda0911239f0370"},{"filename":"references/index_strategy_patterns.md","content":"# Index Strategy Patterns\n\n## Overview\n\nDatabase indexes are critical for query performance, but they come with trade-offs. 
This guide covers proven patterns for index design, optimization strategies, and common pitfalls to avoid.\n\n## Index Types and Use Cases\n\n### B-Tree Indexes (Default)\n\n**Best For:**\n- Equality queries (`WHERE column = value`)\n- Range queries (`WHERE column BETWEEN x AND y`)\n- Sorting (`ORDER BY column`)\n- Prefix pattern matching with a trailing wildcard (`WHERE column LIKE 'prefix%'`); a leading wildcard (`LIKE '%suffix'`) cannot use a B-tree index\n\n**Characteristics:**\n- Logarithmic lookup time O(log n)\n- Supports leftmost-prefix matches on composite indexes\n- Most versatile index type\n\n**Example:**\n```sql\n-- Single column B-tree index\nCREATE INDEX idx_customers_email ON customers (email);\n\n-- Composite B-tree index\nCREATE INDEX idx_orders_customer_date ON orders (customer_id, order_date);\n```\n\n### Hash Indexes\n\n**Best For:**\n- Exact equality matches only\n- High-cardinality columns\n- Primary key lookups\n\n**Characteristics:**\n- Average constant lookup time O(1) for exact matches\n- Cannot support range queries or sorting\n- Memory-efficient for equality operations\n\n**Example:**\n```sql\n-- Hash index for exact lookups (PostgreSQL)\nCREATE INDEX idx_users_id_hash ON users USING HASH (user_id);\n```\n\n### Partial Indexes\n\n**Best For:**\n- Queries that filter on a subset of the data\n- Reducing index size and maintenance overhead\n- Query patterns that consistently use specific filters\n\n**Example:**\n```sql\n-- Index only active users\nCREATE INDEX idx_active_users_email \nON users (email) \nWHERE status = 'active';\n\n-- Index recent orders only\n-- PostgreSQL rejects non-immutable expressions such as CURRENT_DATE in an index predicate,\n-- so use a fixed cutoff (example value) and recreate the index as the window moves\nCREATE INDEX idx_recent_orders \nON orders (customer_id, created_at) \nWHERE created_at > '2024-01-01';\n\n-- Index non-null values only\nCREATE INDEX idx_customers_phone \nON customers (phone_number) \nWHERE phone_number IS NOT NULL;\n```\n\n### Covering Indexes\n\n**Best For:**\n- Eliminating table lookups for SELECT queries\n- Frequently accessed column combinations\n- Read-heavy workloads\n\n**Example:**\n```sql\n-- Covering index with INCLUDE clause (SQL 
Server/PostgreSQL)\nCREATE INDEX idx_orders_customer_covering \nON orders (customer_id, order_date) \nINCLUDE (order_total, status);\n\n-- Query can be satisfied entirely from index:\n-- SELECT order_total, status FROM orders \n-- WHERE customer_id = 123 AND order_date > '2024-01-01';\n```\n\n### Functional/Expression Indexes\n\n**Best For:**\n- Queries on transformed column values\n- Case-insensitive searches\n- Complex calculations\n\n**Example:**\n```sql\n-- Case-insensitive email searches\nCREATE INDEX idx_users_email_lower \nON users (LOWER(email));\n\n-- Date part extraction\nCREATE INDEX idx_orders_month \nON orders (EXTRACT(MONTH FROM order_date));\n\n-- JSON field indexing\nCREATE INDEX idx_users_preferences_theme \nON users ((preferences->>'theme'));\n```\n\n## Composite Index Design Patterns\n\n### Column Ordering Strategy\n\n**Rule: Equality Columns First (Most Selective First), Range Columns Last**\n```sql\n-- Query: WHERE status = 'active' AND city = 'New York' AND age > 25\n-- Assume: status has 3 values, city has 100 values, age has 80 values\n\n-- GOOD: Equality columns first (most selective leading), range column last\nCREATE INDEX idx_users_city_status_age ON users (city, status, age);\n\n-- BAD: Range column first, so city and status cannot narrow the index scan\nCREATE INDEX idx_users_age_city_status ON users (age, city, status);\n```\n\n**Selectivity Calculation:**\n```sql\n-- Estimate selectivity for each column\nSELECT \n 'status' as column_name,\n COUNT(DISTINCT status)::float / COUNT(*) as selectivity\nFROM users\nUNION ALL\nSELECT \n 'city' as column_name,\n COUNT(DISTINCT city)::float / COUNT(*) as selectivity\nFROM users\nUNION ALL\nSELECT \n 'age' as column_name,\n COUNT(DISTINCT age)::float / COUNT(*) as selectivity\nFROM users;\n```\n\n### Query Pattern Matching\n\n**Pattern 1: Equality + Range**\n```sql\n-- Query: WHERE customer_id = 123 AND order_date BETWEEN '2024-01-01' AND '2024-03-31'\nCREATE INDEX idx_orders_customer_date ON orders (customer_id, order_date);\n```\n\n**Pattern 2: Multiple Equality Conditions**\n```sql\n-- Query: WHERE status = 'active' 
AND category = 'premium' AND region = 'US'\nCREATE INDEX idx_users_status_category_region ON users (status, category, region);\n```\n\n**Pattern 3: Equality + Sorting**\n```sql\n-- Query: WHERE category = 'electronics' ORDER BY price DESC, created_at DESC\nCREATE INDEX idx_products_category_price_date ON products (category, price DESC, created_at DESC);\n```\n\n### Prefix Optimization\n\n**Efficient Prefix Usage:**\n```sql\n-- Index supports all these queries efficiently:\nCREATE INDEX idx_users_lastname_firstname_email ON users (last_name, first_name, email);\n\n-- ✓ Uses index: WHERE last_name = 'Smith'\n-- ✓ Uses index: WHERE last_name = 'Smith' AND first_name = 'John' \n-- ✓ Uses index: WHERE last_name = 'Smith' AND first_name = 'John' AND email = 'john@...'\n-- ✗ Cannot use index: WHERE first_name = 'John'\n-- ✗ Cannot use index: WHERE email = 'john@...'\n```\n\n## Performance Optimization Patterns\n\n### Index Intersection vs Composite Indexes\n\n**Scenario: Multiple single-column indexes**\n```sql\nCREATE INDEX idx_users_age ON users (age);\nCREATE INDEX idx_users_city ON users (city);\nCREATE INDEX idx_users_status ON users (status);\n\n-- Query: WHERE age > 25 AND city = 'NYC' AND status = 'active'\n-- Database may use index intersection (combining multiple indexes)\n-- Performance varies by database engine and data distribution\n```\n\n**Better: Purpose-built composite index**\n```sql\n-- More efficient for the specific query pattern\nCREATE INDEX idx_users_city_status_age ON users (city, status, age);\n```\n\n### Index Size vs Performance Trade-off\n\n**Wide Indexes (Many Columns):**\n```sql\n-- Pros: Covers many query patterns, excellent for covering queries\n-- Cons: Large index size, slower writes, more memory usage\nCREATE INDEX idx_orders_comprehensive \nON orders (customer_id, order_date, status, total_amount, shipping_method, created_at)\nINCLUDE (order_notes, billing_address);\n```\n\n**Narrow Indexes (Few Columns):**\n```sql\n-- Pros: Smaller 
size, faster writes, less memory\n-- Cons: May not cover all query patterns\nCREATE INDEX idx_orders_customer_date ON orders (customer_id, order_date);\nCREATE INDEX idx_orders_status ON orders (status);\n```\n\n### Maintenance Optimization\n\n**Regular Index Analysis:**\n```sql\n-- PostgreSQL: Check index usage statistics\n-- (pg_stat_user_indexes names these columns relname and indexrelname)\nSELECT \n schemaname,\n relname as tablename,\n indexrelname as indexname,\n idx_scan as index_scans,\n idx_tup_read as tuples_read,\n idx_tup_fetch as tuples_fetched\nFROM pg_stat_user_indexes\nWHERE idx_scan = 0 -- Potentially unused indexes\nORDER BY schemaname, relname;\n\n-- Check index size\nSELECT \n indexrelname as indexname,\n pg_size_pretty(pg_relation_size(indexrelid)) as index_size\nFROM pg_stat_user_indexes\nWHERE schemaname = 'public'\nORDER BY pg_relation_size(indexrelid) DESC;\n```\n\n## Common Anti-Patterns\n\n### 1. Over-Indexing\n\n**Problem:**\n```sql\n-- Too many similar indexes\nCREATE INDEX idx_orders_customer ON orders (customer_id);\nCREATE INDEX idx_orders_customer_date ON orders (customer_id, order_date); \nCREATE INDEX idx_orders_customer_status ON orders (customer_id, status);\nCREATE INDEX idx_orders_customer_date_status ON orders (customer_id, order_date, status);\n```\n\n**Solution:**\n```sql\n-- One well-designed composite index can often replace several\nCREATE INDEX idx_orders_customer_date_status ON orders (customer_id, order_date, status);\n-- Drop the leftmost-prefix duplicates: idx_orders_customer, idx_orders_customer_date\n-- Drop idx_orders_customer_status only after confirming no query depends on (customer_id, status)\n```\n\n### 2. Wrong Column Order\n\n**Problem:**\n```sql\n-- Query: WHERE active = true AND user_type = 'premium' AND city = 'Chicago'\n-- Bad order: boolean first (lowest selectivity)\nCREATE INDEX idx_users_active_type_city ON users (active, user_type, city);\n```\n\n**Solution:**\n```sql\n-- Good order: most selective first\nCREATE INDEX idx_users_city_type_active ON users (city, user_type, active);\n```\n\n### 3. 
Ignoring Query Patterns\n\n**Problem:**\n```sql\n-- Index doesn't match common query patterns\nCREATE INDEX idx_products_name ON products (product_name);\n\n-- But queries are: WHERE category = 'electronics' AND price BETWEEN 100 AND 500\n-- Index is not helpful for these queries\n```\n\n**Solution:**\n```sql\n-- Match actual query patterns\nCREATE INDEX idx_products_category_price ON products (category, price);\n```\n\n### 4. Function in WHERE Without Functional Index\n\n**Problem:**\n```sql\n-- Query uses function but no functional index\nSELECT * FROM users WHERE LOWER(email) = 'user@example.com';\n-- Regular index on email won't help\n```\n\n**Solution:**\n```sql\n-- Create functional index\nCREATE INDEX idx_users_email_lower ON users (LOWER(email));\n```\n\n## Advanced Patterns\n\n### Multi-Column Statistics\n\n**When Columns Are Correlated:**\n```sql\n-- If city and state are highly correlated, create extended statistics (PostgreSQL)\nCREATE STATISTICS stats_address_correlation ON city, state FROM addresses;\nANALYZE addresses;\n\n-- Helps query planner make better decisions for:\n-- WHERE city = 'New York' AND state = 'NY'\n```\n\n### Conditional Indexes for Data Lifecycle\n\n**Pattern: Different indexes for different data ages**\n```sql\n-- PostgreSQL rejects non-immutable expressions such as CURRENT_DATE in an index predicate,\n-- so the cutoffs below are fixed example dates; recreate the indexes on a schedule as the windows move\n\n-- Hot data (recent orders) - optimized for OLTP\nCREATE INDEX idx_orders_hot_customer_date \nON orders (customer_id, order_date DESC) \nWHERE order_date > '2024-12-01';\n\n-- Warm data (older orders) - optimized for analytics \nCREATE INDEX idx_orders_warm_date_total \nON orders (order_date, total_amount) \nWHERE order_date \u003c= '2024-12-01' \n AND order_date > '2024-01-01';\n\n-- Cold data (archived orders) - minimal indexing\nCREATE INDEX idx_orders_cold_date \nON orders (order_date) \nWHERE order_date \u003c= '2024-01-01';\n```\n\n### Index-Only Scan Optimization\n\n**Design indexes to avoid table access:**\n```sql\n-- Query: SELECT order_id, 
total_amount, status FROM orders WHERE customer_id = ?\nCREATE INDEX idx_orders_customer_covering \nON orders (customer_id) \nINCLUDE (order_id, total_amount, status);\n\n-- Or as composite index (if database doesn't support INCLUDE)\nCREATE INDEX idx_orders_customer_covering \nON orders (customer_id, order_id, total_amount, status);\n```\n\n## Index Monitoring and Maintenance\n\n### Performance Monitoring Queries\n\n**Find slow queries that might benefit from indexes:**\n```sql\n-- PostgreSQL 13+: Find queries with a high mean execution time\n-- (older versions name these columns total_time / mean_time)\nSELECT \n query,\n calls,\n total_exec_time,\n mean_exec_time,\n rows\nFROM pg_stat_statements\nWHERE mean_exec_time > 1000 -- Milliseconds: queries averaging > 1 second\nORDER BY mean_exec_time DESC;\n```\n\n**Identify missing indexes:**\n```sql\n-- Look for sequential scans on large tables\nSELECT \n schemaname,\n relname as tablename,\n seq_scan,\n seq_tup_read,\n idx_scan,\n n_tup_ins + n_tup_upd + n_tup_del as write_activity\nFROM pg_stat_user_tables\nWHERE seq_scan > 100 \n AND seq_tup_read > 100000 -- Large sequential scans\n AND (COALESCE(idx_scan, 0) = 0 OR seq_scan > idx_scan * 2) -- idx_scan is NULL for tables without indexes\nORDER BY seq_tup_read DESC;\n```\n\n### Index Maintenance Schedule\n\n**Regular Maintenance Tasks:**\n```sql\n-- Rebuild fragmented indexes (SQL Server)\nALTER INDEX ALL ON orders REBUILD;\n\n-- Update statistics (PostgreSQL)\nANALYZE orders;\n\n-- Check for unused indexes monthly\nSELECT * FROM pg_stat_user_indexes WHERE idx_scan = 0;\n```\n\n## Conclusion\n\nEffective index strategy requires:\n\n1. **Understanding Query Patterns**: Analyze actual application queries, not theoretical scenarios\n2. **Measuring Performance**: Use query execution plans and timing to validate index effectiveness \n3. **Balancing Trade-offs**: Indexes can speed up reads, but each one slows writes and increases storage\n4. **Regular Maintenance**: Monitor index usage and performance, remove unused indexes\n5. 
**Iterative Improvement**: Start with essential indexes, add and optimize based on real usage\n\nThe goal is not to index every possible query pattern, but to create a focused set of indexes that provide maximum benefit for your application's specific workload while minimizing maintenance overhead.","content_type":"text/markdown; charset=utf-8","language":"markdown","size":11831,"content_sha256":"8d85e83b63c7977ac1cbfd70b2d42ac6fbdbfbeb9219d1dee98f0fafdafaabe8"},{"filename":"references/normalization_guide.md","content":"# Database Normalization Guide\n\n## Overview\n\nDatabase normalization is the process of organizing data to minimize redundancy and dependency issues. It involves decomposing tables to eliminate data anomalies and improve data integrity.\n\n## Normal Forms\n\n### First Normal Form (1NF)\n\n**Requirements:**\n- Each column contains atomic (indivisible) values\n- Each column contains values of the same type\n- Each column has a unique name\n- The order of data storage doesn't matter\n\n**Violations and Solutions:**\n\n**Problem: Multiple values in single column**\n```sql\n-- BAD: Multiple phone numbers in one column\nCREATE TABLE customers (\n id INT PRIMARY KEY,\n name VARCHAR(100),\n phones VARCHAR(500) -- \"555-1234, 555-5678, 555-9012\"\n);\n\n-- GOOD: Separate table for multiple phones\nCREATE TABLE customers (\n id INT PRIMARY KEY,\n name VARCHAR(100)\n);\n\nCREATE TABLE customer_phones (\n id INT PRIMARY KEY,\n customer_id INT REFERENCES customers(id),\n phone VARCHAR(20),\n phone_type VARCHAR(10) -- 'mobile', 'home', 'work'\n);\n```\n\n**Problem: Repeating groups**\n```sql\n-- BAD: Repeating column patterns\nCREATE TABLE orders (\n order_id INT PRIMARY KEY,\n customer_id INT,\n item1_name VARCHAR(100),\n item1_qty INT,\n item1_price DECIMAL(8,2),\n item2_name VARCHAR(100),\n item2_qty INT,\n item2_price DECIMAL(8,2),\n item3_name VARCHAR(100),\n item3_qty INT,\n item3_price DECIMAL(8,2)\n);\n\n-- GOOD: Separate table for order items\nCREATE 
TABLE orders (\n order_id INT PRIMARY KEY,\n customer_id INT,\n order_date DATE\n);\n\nCREATE TABLE order_items (\n id INT PRIMARY KEY,\n order_id INT REFERENCES orders(order_id),\n item_name VARCHAR(100),\n quantity INT,\n unit_price DECIMAL(8,2)\n);\n```\n\n### Second Normal Form (2NF)\n\n**Requirements:**\n- Must be in 1NF\n- All non-key attributes must be fully functionally dependent on the primary key\n- No partial dependencies (applies only to tables with composite primary keys)\n\n**Violations and Solutions:**\n\n**Problem: Partial dependency on composite key**\n```sql\n-- BAD: Student course enrollment with partial dependencies\nCREATE TABLE student_courses (\n student_id INT,\n course_id INT,\n student_name VARCHAR(100), -- Depends only on student_id\n student_major VARCHAR(50), -- Depends only on student_id\n course_title VARCHAR(200), -- Depends only on course_id\n course_credits INT, -- Depends only on course_id\n grade CHAR(2), -- Depends on both student_id AND course_id\n PRIMARY KEY (student_id, course_id)\n);\n\n-- GOOD: Separate tables eliminate partial dependencies\nCREATE TABLE students (\n student_id INT PRIMARY KEY,\n student_name VARCHAR(100),\n student_major VARCHAR(50)\n);\n\nCREATE TABLE courses (\n course_id INT PRIMARY KEY,\n course_title VARCHAR(200),\n course_credits INT\n);\n\nCREATE TABLE enrollments (\n student_id INT,\n course_id INT,\n grade CHAR(2),\n enrollment_date DATE,\n PRIMARY KEY (student_id, course_id),\n FOREIGN KEY (student_id) REFERENCES students(student_id),\n FOREIGN KEY (course_id) REFERENCES courses(course_id)\n);\n```\n\n### Third Normal Form (3NF)\n\n**Requirements:**\n- Must be in 2NF\n- No transitive dependencies (non-key attributes should not depend on other non-key attributes)\n- All non-key attributes must depend directly on the primary key\n\n**Violations and Solutions:**\n\n**Problem: Transitive dependency**\n```sql\n-- BAD: Employee table with transitive dependency\nCREATE TABLE employees (\n employee_id 
INT PRIMARY KEY,\n employee_name VARCHAR(100),\n department_id INT,\n department_name VARCHAR(100), -- Depends on department_id, not employee_id\n department_location VARCHAR(100), -- Transitive dependency through department_id\n department_budget DECIMAL(10,2), -- Transitive dependency through department_id\n salary DECIMAL(8,2)\n);\n\n-- GOOD: Separate department information\nCREATE TABLE departments (\n department_id INT PRIMARY KEY,\n department_name VARCHAR(100),\n department_location VARCHAR(100),\n department_budget DECIMAL(10,2)\n);\n\nCREATE TABLE employees (\n employee_id INT PRIMARY KEY,\n employee_name VARCHAR(100),\n department_id INT,\n salary DECIMAL(8,2),\n FOREIGN KEY (department_id) REFERENCES departments(department_id)\n);\n```\n\n### Boyce-Codd Normal Form (BCNF)\n\n**Requirements:**\n- Must be in 3NF\n- Every determinant must be a candidate key\n- Stricter than 3NF - handles cases where 3NF doesn't eliminate all anomalies\n\n**Violations and Solutions:**\n\n**Problem: Determinant that's not a candidate key**\n```sql\n-- BAD: Student advisor relationship with BCNF violation\n-- Assumption: Each student has one advisor per subject, \n-- each advisor teaches only one subject, but can advise multiple students\nCREATE TABLE student_advisor (\n student_id INT,\n subject VARCHAR(50),\n advisor_id INT,\n PRIMARY KEY (student_id, subject)\n);\n-- Problem: advisor_id determines subject, but advisor_id is not a candidate key\n\n-- GOOD: Separate the functional dependencies\nCREATE TABLE advisors (\n advisor_id INT PRIMARY KEY,\n subject VARCHAR(50)\n);\n\nCREATE TABLE student_advisor_assignments (\n student_id INT,\n advisor_id INT,\n PRIMARY KEY (student_id, advisor_id),\n FOREIGN KEY (advisor_id) REFERENCES advisors(advisor_id)\n);\n```\n\n## Denormalization Strategies\n\n### When to Denormalize\n\n1. **Performance Requirements**: When query performance is more critical than storage efficiency\n2. 
**Read-Heavy Workloads**: When data is read much more frequently than it's updated\n3. **Reporting Systems**: When complex joins negatively impact reporting performance\n4. **Caching Strategies**: When pre-computed values eliminate expensive calculations\n\n### Common Denormalization Patterns\n\n**1. Redundant Storage for Performance**\n```sql\n-- Store frequently accessed calculated values\nCREATE TABLE orders (\n order_id INT PRIMARY KEY,\n customer_id INT,\n order_total DECIMAL(10,2), -- Denormalized: sum of order_items.total\n item_count INT, -- Denormalized: count of order_items\n created_at TIMESTAMP\n);\n\nCREATE TABLE order_items (\n item_id INT PRIMARY KEY,\n order_id INT,\n product_id INT,\n quantity INT,\n unit_price DECIMAL(8,2),\n total DECIMAL(10,2) -- quantity * unit_price (denormalized)\n);\n```\n\n**2. Materialized Aggregates**\n```sql\n-- Pre-computed summary tables for reporting\nCREATE TABLE monthly_sales_summary (\n year_month VARCHAR(7), -- '2024-03'\n product_category VARCHAR(50),\n total_sales DECIMAL(12,2),\n total_units INT,\n avg_order_value DECIMAL(8,2),\n unique_customers INT,\n updated_at TIMESTAMP\n);\n```\n\n**3. 
Historical Data Snapshots**\n```sql\n-- Store historical state to avoid complex temporal queries\nCREATE TABLE customer_status_history (\n id INT PRIMARY KEY,\n customer_id INT,\n status VARCHAR(20),\n tier VARCHAR(10),\n total_lifetime_value DECIMAL(12,2), -- Snapshot at this point in time\n snapshot_date DATE\n);\n```\n\n## Trade-offs Analysis\n\n### Normalization Benefits\n- **Data Integrity**: Reduced risk of inconsistent data\n- **Storage Efficiency**: Less data duplication\n- **Update Efficiency**: Changes need to be made in only one place\n- **Flexibility**: Easier to modify schema as requirements change\n\n### Normalization Costs\n- **Query Complexity**: More joins required for data retrieval\n- **Performance Impact**: Joins can be expensive on large datasets\n- **Development Complexity**: More complex data access patterns\n\n### Denormalization Benefits\n- **Query Performance**: Fewer joins, faster queries\n- **Simplified Queries**: Direct access to related data\n- **Read Optimization**: Optimized for data retrieval patterns\n- **Reduced Load**: Less database processing for common operations\n\n### Denormalization Costs\n- **Data Redundancy**: Increased storage requirements\n- **Update Complexity**: Multiple places may need updates\n- **Consistency Risk**: Higher risk of data inconsistencies\n- **Maintenance Overhead**: Additional code to maintain derived values\n\n## Best Practices\n\n### 1. Start with Full Normalization\n- Begin with a fully normalized design\n- Identify performance bottlenecks through testing\n- Selectively denormalize based on actual performance needs\n\n### 2. Use Triggers for Consistency\n```sql\n-- Trigger to maintain denormalized order_total (PostgreSQL 11+ syntax)\nCREATE FUNCTION refresh_order_total() RETURNS trigger AS $fn$\nDECLARE\n target_order_id INT;\nBEGIN\n -- NEW is not available for DELETE, so fall back to OLD\n IF TG_OP = 'DELETE' THEN\n target_order_id := OLD.order_id;\n ELSE\n target_order_id := NEW.order_id;\n END IF;\n UPDATE orders \n SET order_total = (\n SELECT COALESCE(SUM(quantity * unit_price), 0) \n FROM order_items \n WHERE order_id = target_order_id\n )\n WHERE order_id = target_order_id;\n RETURN NULL; -- Return value is ignored for AFTER row triggers\nEND;\n$fn$ LANGUAGE plpgsql;\n\nCREATE TRIGGER update_order_total\nAFTER INSERT OR UPDATE OR DELETE ON order_items\nFOR EACH ROW EXECUTE FUNCTION refresh_order_total();\n```\n\n### 3. 
Consider Materialized Views\n```sql\n-- Materialized view for complex aggregations\nCREATE MATERIALIZED VIEW customer_summary AS\nSELECT \n c.customer_id,\n c.customer_name,\n COUNT(o.order_id) as order_count,\n SUM(o.order_total) as lifetime_value,\n AVG(o.order_total) as avg_order_value,\n MAX(o.created_at) as last_order_date\nFROM customers c\nLEFT JOIN orders o ON c.customer_id = o.customer_id\nGROUP BY c.customer_id, c.customer_name;\n```\n\n### 4. Document Denormalization Decisions\n- Clearly document why denormalization was chosen\n- Specify which data is derived and how it's maintained\n- Include performance benchmarks that justify the decision\n\n### 5. Monitor and Validate\n- Implement validation checks for denormalized data\n- Regular audits to ensure data consistency\n- Performance monitoring to validate denormalization benefits\n\n## Common Anti-Patterns\n\n### 1. Premature Denormalization\nStarting with denormalized design without understanding actual performance requirements.\n\n### 2. Over-Normalization\nCreating too many small tables that require excessive joins for simple queries.\n\n### 3. Inconsistent Approach\nMixing normalized and denormalized patterns without clear strategy.\n\n### 4. Ignoring Maintenance\nDenormalizing without proper mechanisms to maintain data consistency.\n\n## Conclusion\n\nNormalization and denormalization are both valuable tools in database design. 
The key is understanding when to apply each approach:\n\n- **Use normalization** for transactional systems where data integrity is paramount\n- **Consider denormalization** for analytical systems or when performance testing reveals bottlenecks\n- **Apply selectively** based on actual usage patterns and performance requirements\n- **Maintain consistency** through proper design patterns and validation mechanisms\n\nThe goal is not to achieve perfect normalization or denormalization, but to create a design that best serves your application's specific needs while maintaining data quality and system performance.","content_type":"text/markdown; charset=utf-8","language":"markdown","size":10672,"content_sha256":"30f05dfb9c474203c3377cbd7814f2d61971bde0bbad0807c332ddf2d27a4c30"},{"filename":"schema_analyzer.py","content":"#!/usr/bin/env python3\n\"\"\"\nDatabase Schema Analyzer\n\nAnalyzes SQL DDL statements and JSON schema definitions for:\n- Normalization level compliance (1NF-BCNF)\n- Missing constraints (FK, NOT NULL, UNIQUE)\n- Data type issues and antipatterns\n- Naming convention violations\n- Missing indexes on foreign key columns\n- Table relationship mapping\n- Generates Mermaid ERD diagrams\n\nInput: SQL DDL file or JSON schema definition\nOutput: Analysis report + Mermaid ERD + recommendations\n\nUsage:\n python schema_analyzer.py --input schema.sql --output-format json\n python schema_analyzer.py --input schema.json --output-format text\n python schema_analyzer.py --input schema.sql --generate-erd --output analysis.json\n\"\"\"\n\nimport argparse\nimport json\nimport re\nimport sys\nfrom collections import defaultdict, namedtuple\nfrom typing import Dict, List, Set, Tuple, Optional, Any\nfrom dataclasses import dataclass, asdict\n\n\n@dataclass\nclass Column:\n name: str\n data_type: str\n nullable: bool = True\n primary_key: bool = False\n unique: bool = False\n foreign_key: Optional[str] = None\n default_value: Optional[str] = None\n check_constraint: 
Optional[str] = None\n\n\n@dataclass\nclass Index:\n name: str\n table: str\n columns: List[str]\n unique: bool = False\n index_type: str = \"btree\"\n\n\n@dataclass\nclass Table:\n name: str\n columns: List[Column]\n primary_key: List[str]\n foreign_keys: List[Tuple[str, str]] # (column, referenced_table.column)\n unique_constraints: List[List[str]]\n check_constraints: Dict[str, str]\n indexes: List[Index]\n\n\n@dataclass\nclass NormalizationIssue:\n table: str\n issue_type: str\n severity: str\n description: str\n suggestion: str\n columns_affected: List[str]\n\n\n@dataclass\nclass DataTypeIssue:\n table: str\n column: str\n current_type: str\n issue: str\n suggested_type: str\n rationale: str\n\n\n@dataclass\nclass ConstraintIssue:\n table: str\n issue_type: str\n severity: str\n description: str\n suggestion: str\n columns_affected: List[str]\n\n\n@dataclass\nclass NamingIssue:\n table: str\n column: Optional[str]\n issue: str\n current_name: str\n suggested_name: str\n\n\nclass SchemaAnalyzer:\n def __init__(self):\n self.tables: Dict[str, Table] = {}\n self.normalization_issues: List[NormalizationIssue] = []\n self.datatype_issues: List[DataTypeIssue] = []\n self.constraint_issues: List[ConstraintIssue] = []\n self.naming_issues: List[NamingIssue] = []\n \n # Data type antipatterns\n self.varchar_255_pattern = re.compile(r'VARCHAR\\(255\\)', re.IGNORECASE)\n self.bad_datetime_patterns = [\n re.compile(r'VARCHAR\\(\\d+\\)', re.IGNORECASE),\n re.compile(r'CHAR\\(\\d+\\)', re.IGNORECASE)\n ]\n \n # Naming conventions\n self.table_naming_pattern = re.compile(r'^[a-z][a-z0-9_]*[a-z0-9]

$')\n self.column_naming_pattern = re.compile(r'^[a-z][a-z0-9_]*[a-z0-9]

$')\n \n def parse_sql_ddl(self, ddl_content: str) -> None:\n \"\"\"Parse SQL DDL statements and extract schema information.\"\"\"\n # Remove comments and normalize whitespace\n ddl_content = re.sub(r'--.*

$'
, '', ddl_content, flags=re.MULTILINE)\n ddl_content = re.sub(r'/\\*.*?\\*/', '', ddl_content, flags=re.DOTALL)\n ddl_content = re.sub(r'\\s+', ' ', ddl_content.strip())\n \n # Extract CREATE TABLE statements\n # Match through the closing ')' that precedes the terminating ';' so that\n # type parentheses such as VARCHAR(100) do not end the match early.\n # Statements are therefore expected to end with ';'.\n create_table_pattern = re.compile(\n r'CREATE\\s+TABLE\\s+(\\w+)\\s*\\(\\s*(.*?)\\s*\\)[^;()]*;',\n re.IGNORECASE | re.DOTALL\n )\n \n for match in create_table_pattern.finditer(ddl_content):\n table_name = match.group(1).lower()\n table_definition = match.group(2)\n \n table = self._parse_table_definition(table_name, table_definition)\n self.tables[table_name] = table\n \n # Extract CREATE INDEX statements\n self._parse_indexes(ddl_content)\n \n def _parse_table_definition(self, table_name: str, definition: str) -> Table:\n \"\"\"Parse individual table definition.\"\"\"\n columns = []\n primary_key = []\n foreign_keys = []\n unique_constraints = []\n check_constraints = {}\n \n # Split by commas, but handle nested parentheses\n parts = self._split_table_parts(definition)\n \n for part in parts:\n part = part.strip()\n if not part:\n continue\n \n if part.upper().startswith('PRIMARY KEY'):\n primary_key = self._parse_primary_key(part)\n elif part.upper().startswith('FOREIGN KEY'):\n fk = self._parse_foreign_key(part)\n if fk:\n foreign_keys.append(fk)\n elif part.upper().startswith('UNIQUE'):\n unique = self._parse_unique_constraint(part)\n if unique:\n unique_constraints.append(unique)\n elif part.upper().startswith('CHECK'):\n check = self._parse_check_constraint(part)\n if check:\n check_constraints.update(check)\n else:\n # Column definition\n column = self._parse_column_definition(part)\n if column:\n columns.append(column)\n if column.primary_key:\n primary_key.append(column.name)\n \n return Table(\n name=table_name,\n columns=columns,\n primary_key=primary_key,\n foreign_keys=foreign_keys,\n unique_constraints=unique_constraints,\n check_constraints=check_constraints,\n indexes=[]\n )\n \n def _split_table_parts(self, definition: str) -> List[str]:\n 
\"\"\"Split table definition by commas, respecting nested parentheses.\"\"\"\n parts = []\n current_part = \"\"\n paren_count = 0\n \n for char in definition:\n if char == '(':\n paren_count += 1\n elif char == ')':\n paren_count -= 1\n elif char == ',' and paren_count == 0:\n parts.append(current_part.strip())\n current_part = \"\"\n continue\n \n current_part += char\n \n if current_part.strip():\n parts.append(current_part.strip())\n \n return parts\n \n def _parse_column_definition(self, definition: str) -> Optional[Column]:\n \"\"\"Parse individual column definition.\"\"\"\n # Pattern for column definition\n pattern = re.compile(\n r'(\\w+)\\s+([A-Z]+(?:\\(\\d+(?:,\\d+)?\\))?)\\s*(.*)',\n re.IGNORECASE\n )\n \n match = pattern.match(definition.strip())\n if not match:\n return None\n \n column_name = match.group(1).lower()\n data_type = match.group(2).upper()\n constraints = match.group(3).upper() if match.group(3) else \"\"\n \n column = Column(\n name=column_name,\n data_type=data_type,\n nullable='NOT NULL' not in constraints,\n primary_key='PRIMARY KEY' in constraints,\n unique='UNIQUE' in constraints\n )\n \n # Parse foreign key reference\n fk_pattern = re.compile(r'REFERENCES\\s+(\\w+)\\s*\\(\\s*(\\w+)\\s*\\)', re.IGNORECASE)\n fk_match = fk_pattern.search(constraints)\n if fk_match:\n column.foreign_key = f\"{fk_match.group(1).lower()}.{fk_match.group(2).lower()}\"\n \n # Parse default value\n default_pattern = re.compile(r'DEFAULT\\s+([^,\\s]+)', re.IGNORECASE)\n default_match = default_pattern.search(constraints)\n if default_match:\n column.default_value = default_match.group(1)\n \n return column\n \n def _parse_primary_key(self, definition: str) -> List[str]:\n \"\"\"Parse PRIMARY KEY constraint.\"\"\"\n pattern = re.compile(r'PRIMARY\\s+KEY\\s*\\(\\s*(.*?)\\s*\\)', re.IGNORECASE)\n match = pattern.search(definition)\n if match:\n columns = [col.strip().lower() for col in match.group(1).split(',')]\n return columns\n return []\n \n def 
_parse_foreign_key(self, definition: str) -> Optional[Tuple[str, str]]:\n \"\"\"Parse FOREIGN KEY constraint.\"\"\"\n pattern = re.compile(\n r'FOREIGN\\s+KEY\\s*\\(\\s*(\\w+)\\s*\\)\\s+REFERENCES\\s+(\\w+)\\s*\\(\\s*(\\w+)\\s*\\)',\n re.IGNORECASE\n )\n match = pattern.search(definition)\n if match:\n column = match.group(1).lower()\n ref_table = match.group(2).lower()\n ref_column = match.group(3).lower()\n return (column, f\"{ref_table}.{ref_column}\")\n return None\n \n def _parse_unique_constraint(self, definition: str) -> Optional[List[str]]:\n \"\"\"Parse UNIQUE constraint.\"\"\"\n pattern = re.compile(r'UNIQUE\\s*\\(\\s*(.*?)\\s*\\)', re.IGNORECASE)\n match = pattern.search(definition)\n if match:\n columns = [col.strip().lower() for col in match.group(1).split(',')]\n return columns\n return None\n \n def _parse_check_constraint(self, definition: str) -> Optional[Dict[str, str]]:\n \"\"\"Parse CHECK constraint.\"\"\"\n pattern = re.compile(r'CHECK\\s*\\(\\s*(.*?)\\s*\\)', re.IGNORECASE)\n match = pattern.search(definition)\n if match:\n constraint_name = f\"check_constraint_{len(self.tables)}\"\n return {constraint_name: match.group(1)}\n return None\n \n def _parse_indexes(self, ddl_content: str) -> None:\n \"\"\"Parse CREATE INDEX statements.\"\"\"\n index_pattern = re.compile(\n r'CREATE\\s+(?:(UNIQUE)\\s+)?INDEX\\s+(\\w+)\\s+ON\\s+(\\w+)\\s*\\(\\s*(.*?)\\s*\\)',\n re.IGNORECASE\n )\n \n for match in index_pattern.finditer(ddl_content):\n unique = match.group(1) is not None\n index_name = match.group(2).lower()\n table_name = match.group(3).lower()\n columns_str = match.group(4)\n \n columns = [col.strip().lower() for col in columns_str.split(',')]\n \n index = Index(\n name=index_name,\n table=table_name,\n columns=columns,\n unique=unique\n )\n \n if table_name in self.tables:\n self.tables[table_name].indexes.append(index)\n \n def parse_json_schema(self, json_content: str) -> None:\n \"\"\"Parse JSON schema definition.\"\"\"\n try:\n schema = 
json.loads(json_content)\n \n if 'tables' not in schema:\n raise ValueError(\"JSON schema must contain 'tables' key\")\n \n for table_name, table_def in schema['tables'].items():\n table = self._parse_json_table(table_name.lower(), table_def)\n self.tables[table_name.lower()] = table\n \n except json.JSONDecodeError as e:\n raise ValueError(f\"Invalid JSON: {e}\")\n \n def _parse_json_table(self, table_name: str, table_def: Dict[str, Any]) -> Table:\n \"\"\"Parse JSON table definition.\"\"\"\n columns = []\n primary_key = table_def.get('primary_key', [])\n foreign_keys = []\n unique_constraints = table_def.get('unique_constraints', [])\n check_constraints = table_def.get('check_constraints', {})\n \n for col_name, col_def in table_def.get('columns', {}).items():\n column = Column(\n name=col_name.lower(),\n data_type=col_def.get('type', 'VARCHAR(255)').upper(),\n nullable=col_def.get('nullable', True),\n primary_key=col_name.lower() in [pk.lower() for pk in primary_key],\n unique=col_def.get('unique', False),\n foreign_key=col_def.get('foreign_key'),\n default_value=col_def.get('default')\n )\n \n columns.append(column)\n \n if column.foreign_key:\n foreign_keys.append((column.name, column.foreign_key))\n \n return Table(\n name=table_name,\n columns=columns,\n primary_key=[pk.lower() for pk in primary_key],\n foreign_keys=foreign_keys,\n unique_constraints=unique_constraints,\n check_constraints=check_constraints,\n indexes=[]\n )\n \n def analyze_normalization(self) -> None:\n \"\"\"Analyze normalization compliance.\"\"\"\n for table_name, table in self.tables.items():\n self._check_first_normal_form(table)\n self._check_second_normal_form(table)\n self._check_third_normal_form(table)\n self._check_bcnf(table)\n \n def _check_first_normal_form(self, table: Table) -> None:\n \"\"\"Check First Normal Form compliance.\"\"\"\n # Check for atomic values (no arrays or delimited strings)\n for column in table.columns:\n if any(pattern in column.data_type.upper() for 
pattern in ['ARRAY', 'JSON', 'TEXT']):\n if 'JSON' in column.data_type.upper():\n # JSON columns can violate 1NF if storing arrays\n self.normalization_issues.append(NormalizationIssue(\n table=table.name,\n issue_type=\"1NF_VIOLATION\",\n severity=\"WARNING\",\n description=f\"Column '{column.name}' uses JSON type which may contain non-atomic values\",\n suggestion=\"Consider normalizing JSON arrays into separate tables\",\n columns_affected=[column.name]\n ))\n \n # Check for potential delimited values in VARCHAR/TEXT\n if column.data_type.upper().startswith(('VARCHAR', 'CHAR', 'TEXT')):\n if any(delimiter in column.name.lower() for delimiter in ['list', 'array', 'tags', 'items']):\n self.normalization_issues.append(NormalizationIssue(\n table=table.name,\n issue_type=\"1NF_VIOLATION\",\n severity=\"HIGH\",\n description=f\"Column '{column.name}' appears to store delimited values\",\n suggestion=\"Create separate table for individual values with foreign key relationship\",\n columns_affected=[column.name]\n ))\n \n def _check_second_normal_form(self, table: Table) -> None:\n \"\"\"Check Second Normal Form compliance.\"\"\"\n if len(table.primary_key) \u003c= 1:\n return # 2NF only applies to tables with composite primary keys\n \n # Look for potential partial dependencies\n non_key_columns = [col for col in table.columns if col.name not in table.primary_key]\n \n for column in non_key_columns:\n # Heuristic: columns that seem related to only part of the composite key\n for pk_part in table.primary_key:\n if pk_part in column.name or column.name.startswith(pk_part.split('_')[0]):\n self.normalization_issues.append(NormalizationIssue(\n table=table.name,\n issue_type=\"2NF_VIOLATION\",\n severity=\"MEDIUM\",\n description=f\"Column '{column.name}' may have partial dependency on '{pk_part}'\",\n suggestion=f\"Consider moving '{column.name}' to a separate table related to '{pk_part}'\",\n columns_affected=[column.name, pk_part]\n ))\n break\n \n def 
_check_third_normal_form(self, table: Table) -> None:\n \"\"\"Check Third Normal Form compliance.\"\"\"\n # Look for transitive dependencies\n non_key_columns = [col for col in table.columns if col.name not in table.primary_key]\n \n # Group columns by potential entities they describe\n entity_groups = defaultdict(list)\n for column in non_key_columns:\n # Simple heuristic: group by prefix before underscore\n prefix = column.name.split('_')[0]\n if prefix != column.name: # Has underscore\n entity_groups[prefix].append(column.name)\n \n for entity, columns in entity_groups.items():\n if len(columns) > 1 and entity != table.name.split('_')[0]:\n # Potential entity that should be in its own table\n id_column = f\"{entity}_id\"\n if id_column in [col.name for col in table.columns]:\n self.normalization_issues.append(NormalizationIssue(\n table=table.name,\n issue_type=\"3NF_VIOLATION\",\n severity=\"MEDIUM\",\n description=f\"Columns {columns} may have transitive dependency through '{id_column}'\",\n suggestion=f\"Consider creating separate '{entity}' table with these columns\",\n columns_affected=columns + [id_column]\n ))\n \n def _check_bcnf(self, table: Table) -> None:\n \"\"\"Check Boyce-Codd Normal Form compliance.\"\"\"\n # BCNF violations are complex to detect without functional dependencies\n # Provide general guidance for composite keys\n if len(table.primary_key) > 2:\n self.normalization_issues.append(NormalizationIssue(\n table=table.name,\n issue_type=\"BCNF_WARNING\",\n severity=\"LOW\",\n description=f\"Table has composite primary key with {len(table.primary_key)} columns\",\n suggestion=\"Review functional dependencies to ensure BCNF compliance\",\n columns_affected=table.primary_key\n ))\n \n def analyze_data_types(self) -> None:\n \"\"\"Analyze data type usage for antipatterns.\"\"\"\n for table_name, table in self.tables.items():\n for column in table.columns:\n self._check_varchar_255_antipattern(table.name, column)\n 
self._check_inappropriate_types(table.name, column)\n self._check_size_optimization(table.name, column)\n \n def _check_varchar_255_antipattern(self, table_name: str, column: Column) -> None:\n \"\"\"Check for VARCHAR(255) antipattern.\"\"\"\n if self.varchar_255_pattern.match(column.data_type):\n self.datatype_issues.append(DataTypeIssue(\n table=table_name,\n column=column.name,\n current_type=column.data_type,\n issue=\"VARCHAR(255) antipattern\",\n suggested_type=\"Appropriately sized VARCHAR or TEXT\",\n rationale=\"VARCHAR(255) is often used as default without considering actual data length requirements\"\n ))\n \n def _check_inappropriate_types(self, table_name: str, column: Column) -> None:\n \"\"\"Check for inappropriate data types.\"\"\"\n # Date/time stored as string\n if column.name.lower() in ['date', 'time', 'created', 'updated', 'modified', 'timestamp']:\n if column.data_type.upper().startswith(('VARCHAR', 'CHAR', 'TEXT')):\n self.datatype_issues.append(DataTypeIssue(\n table=table_name,\n column=column.name,\n current_type=column.data_type,\n issue=\"Date/time stored as string\",\n suggested_type=\"TIMESTAMP, DATE, or TIME\",\n rationale=\"Proper date/time types enable date arithmetic and indexing optimization\"\n ))\n \n # Boolean stored as string/integer\n if column.name.lower() in ['active', 'enabled', 'deleted', 'visible', 'published']:\n if not column.data_type.upper().startswith('BOOL'):\n self.datatype_issues.append(DataTypeIssue(\n table=table_name,\n column=column.name,\n current_type=column.data_type,\n issue=\"Boolean value stored as non-boolean type\",\n suggested_type=\"BOOLEAN\",\n rationale=\"Boolean type is more explicit and can be more storage efficient\"\n ))\n \n # Numeric IDs as VARCHAR\n if column.name.lower().endswith('_id') or column.name.lower() == 'id':\n if column.data_type.upper().startswith(('VARCHAR', 'CHAR')):\n self.datatype_issues.append(DataTypeIssue(\n table=table_name,\n column=column.name,\n 
current_type=column.data_type,\n issue=\"Numeric ID stored as string\",\n suggested_type=\"INTEGER, BIGINT, or UUID\",\n rationale=\"Numeric types are more efficient for ID columns and enable better indexing\"\n ))\n \n def _check_size_optimization(self, table_name: str, column: Column) -> None:\n \"\"\"Check for size optimization opportunities.\"\"\"\n # Oversized integer types\n if column.data_type.upper() == 'BIGINT':\n if not any(keyword in column.name.lower() for keyword in ['timestamp', 'big', 'large', 'count']):\n self.datatype_issues.append(DataTypeIssue(\n table=table_name,\n column=column.name,\n current_type=column.data_type,\n issue=\"Potentially oversized integer type\",\n suggested_type=\"INTEGER\",\n rationale=\"INTEGER is sufficient for most ID and count fields unless very large values are expected\"\n ))\n \n def analyze_constraints(self) -> None:\n \"\"\"Analyze missing constraints.\"\"\"\n for table_name, table in self.tables.items():\n self._check_missing_primary_key(table)\n self._check_missing_foreign_key_constraints(table)\n self._check_missing_not_null_constraints(table)\n self._check_missing_unique_constraints(table)\n self._check_missing_check_constraints(table)\n \n def _check_missing_primary_key(self, table: Table) -> None:\n \"\"\"Check for missing primary key.\"\"\"\n if not table.primary_key:\n self.constraint_issues.append(ConstraintIssue(\n table=table.name,\n issue_type=\"MISSING_PRIMARY_KEY\",\n severity=\"HIGH\",\n description=\"Table has no primary key defined\",\n suggestion=\"Add a primary key column (e.g., 'id' with auto-increment)\",\n columns_affected=[]\n ))\n \n def _check_missing_foreign_key_constraints(self, table: Table) -> None:\n \"\"\"Check for missing foreign key constraints.\"\"\"\n for column in table.columns:\n if column.name.endswith('_id') and column.name != 'id':\n # Potential foreign key column\n if not column.foreign_key:\n referenced_table = column.name[:-3] # Remove '_id' suffix\n if referenced_table in 
self.tables or referenced_table + 's' in self.tables:\n self.constraint_issues.append(ConstraintIssue(\n table=table.name,\n issue_type=\"MISSING_FOREIGN_KEY\",\n severity=\"MEDIUM\",\n description=f\"Column '{column.name}' appears to be a foreign key but has no constraint\",\n suggestion=f\"Add foreign key constraint referencing {referenced_table if referenced_table in self.tables else referenced_table + 's'} table\",\n columns_affected=[column.name]\n ))\n \n def _check_missing_not_null_constraints(self, table: Table) -> None:\n \"\"\"Check for missing NOT NULL constraints.\"\"\"\n for column in table.columns:\n if column.nullable and column.name in ['email', 'name', 'title', 'status']:\n self.constraint_issues.append(ConstraintIssue(\n table=table.name,\n issue_type=\"MISSING_NOT_NULL\",\n severity=\"LOW\",\n description=f\"Column '{column.name}' allows NULL but typically should not\",\n suggestion=f\"Consider adding NOT NULL constraint to '{column.name}'\",\n columns_affected=[column.name]\n ))\n \n def _check_missing_unique_constraints(self, table: Table) -> None:\n \"\"\"Check for missing unique constraints.\"\"\"\n for column in table.columns:\n if column.name in ['email', 'username', 'slug', 'code'] and not column.unique:\n if column.name not in table.primary_key:\n self.constraint_issues.append(ConstraintIssue(\n table=table.name,\n issue_type=\"MISSING_UNIQUE\",\n severity=\"MEDIUM\",\n description=f\"Column '{column.name}' should likely have UNIQUE constraint\",\n suggestion=f\"Add UNIQUE constraint to '{column.name}'\",\n columns_affected=[column.name]\n ))\n \n def _check_missing_check_constraints(self, table: Table) -> None:\n \"\"\"Check for missing check constraints.\"\"\"\n for column in table.columns:\n # Email format validation\n if column.name == 'email' and 'email' not in str(table.check_constraints):\n self.constraint_issues.append(ConstraintIssue(\n table=table.name,\n issue_type=\"MISSING_CHECK_CONSTRAINT\",\n severity=\"LOW\",\n description=\"Email column lacks format validation\",\n suggestion=\"Add
CHECK constraint for email format validation\",\n columns_affected=[column.name]\n ))\n \n # Positive values for counts, prices, etc.\n if column.name.lower() in ['price', 'amount', 'count', 'quantity', 'age']:\n if column.name not in str(table.check_constraints):\n self.constraint_issues.append(ConstraintIssue(\n table=table.name,\n issue_type=\"MISSING_CHECK_CONSTRAINT\",\n severity=\"LOW\",\n description=f\"Column '{column.name}' should validate positive values\",\n suggestion=f\"Add CHECK constraint: {column.name} > 0\",\n columns_affected=[column.name]\n ))\n \n def analyze_naming_conventions(self) -> None:\n \"\"\"Analyze naming convention compliance.\"\"\"\n for table_name, table in self.tables.items():\n self._check_table_naming(table_name)\n for column in table.columns:\n self._check_column_naming(table_name, column.name)\n \n def _check_table_naming(self, table_name: str) -> None:\n \"\"\"Check table naming conventions.\"\"\"\n if not self.table_naming_pattern.match(table_name):\n suggested_name = self._suggest_table_name(table_name)\n self.naming_issues.append(NamingIssue(\n table=table_name,\n column=None,\n issue=\"Invalid table naming convention\",\n current_name=table_name,\n suggested_name=suggested_name\n ))\n \n # Check for plural naming\n if not table_name.endswith('s') and table_name not in ['data', 'information']:\n self.naming_issues.append(NamingIssue(\n table=table_name,\n column=None,\n issue=\"Table name should be plural\",\n current_name=table_name,\n suggested_name=table_name + 's'\n ))\n \n def _check_column_naming(self, table_name: str, column_name: str) -> None:\n \"\"\"Check column naming conventions.\"\"\"\n if not self.column_naming_pattern.match(column_name):\n suggested_name = self._suggest_column_name(column_name)\n self.naming_issues.append(NamingIssue(\n table=table_name,\n column=column_name,\n issue=\"Invalid column naming convention\",\n current_name=column_name,\n suggested_name=suggested_name\n ))\n \n def 
_suggest_table_name(self, table_name: str) -> str:\n \"\"\"Suggest corrected table name.\"\"\"\n # Convert to snake_case and make plural\n name = re.sub(r'([A-Z])', r'_\\1', table_name).lower().strip('_')\n return name + 's' if not name.endswith('s') else name\n \n def _suggest_column_name(self, column_name: str) -> str:\n \"\"\"Suggest corrected column name.\"\"\"\n # Convert to snake_case\n return re.sub(r'([A-Z])', r'_\\1', column_name).lower().strip('_')\n \n def check_missing_indexes(self) -> List[Dict[str, Any]]:\n \"\"\"Check for missing indexes on foreign key columns.\"\"\"\n missing_indexes = []\n \n for table_name, table in self.tables.items():\n existing_indexed_columns = set()\n \n # Collect existing indexed columns\n for index in table.indexes:\n existing_indexed_columns.update(index.columns)\n \n # Primary key columns are automatically indexed\n existing_indexed_columns.update(table.primary_key)\n \n # Check foreign key columns\n for column in table.columns:\n if column.foreign_key and column.name not in existing_indexed_columns:\n missing_indexes.append({\n 'table': table_name,\n 'column': column.name,\n 'type': 'foreign_key',\n 'suggestion': f\"CREATE INDEX idx_{table_name}_{column.name} ON {table_name} ({column.name});\"\n })\n \n return missing_indexes\n \n def generate_mermaid_erd(self) -> str:\n \"\"\"Generate Mermaid ERD diagram.\"\"\"\n erd_lines = [\"erDiagram\"]\n \n # Add table definitions\n for table_name, table in self.tables.items():\n erd_lines.append(f\" {table_name.upper()} {{\")\n \n for column in table.columns:\n data_type = column.data_type\n constraints = []\n \n if column.primary_key:\n constraints.append(\"PK\")\n if column.foreign_key:\n constraints.append(\"FK\")\n if not column.nullable:\n constraints.append(\"NOT NULL\")\n if column.unique:\n constraints.append(\"UNIQUE\")\n \n constraint_str = \" \".join(constraints)\n if constraint_str:\n constraint_str = f\" \\\"{constraint_str}\\\"\"\n \n erd_lines.append(f\" {data_type} 
{column.name}{constraint_str}\")\n \n erd_lines.append(\" }\")\n \n # Add relationships\n relationships = set()\n for table_name, table in self.tables.items():\n for column in table.columns:\n if column.foreign_key:\n ref_table = column.foreign_key.split('.')[0]\n if ref_table in self.tables:\n relationship = f\" {ref_table.upper()} ||--o{{ {table_name.upper()} : has\"\n relationships.add(relationship)\n \n erd_lines.extend(sorted(relationships))\n \n return \"\\n\".join(erd_lines)\n \n def get_analysis_summary(self) -> Dict[str, Any]:\n \"\"\"Get comprehensive analysis summary.\"\"\"\n return {\n \"schema_overview\": {\n \"total_tables\": len(self.tables),\n \"total_columns\": sum(len(table.columns) for table in self.tables.values()),\n \"tables_with_primary_keys\": len([t for t in self.tables.values() if t.primary_key]),\n \"total_foreign_keys\": sum(len(table.foreign_keys) for table in self.tables.values()),\n \"total_indexes\": sum(len(table.indexes) for table in self.tables.values())\n },\n \"normalization_analysis\": {\n \"total_issues\": len(self.normalization_issues),\n \"by_severity\": {\n \"high\": len([i for i in self.normalization_issues if i.severity == \"HIGH\"]),\n \"medium\": len([i for i in self.normalization_issues if i.severity == \"MEDIUM\"]),\n \"low\": len([i for i in self.normalization_issues if i.severity == \"LOW\"]),\n \"warning\": len([i for i in self.normalization_issues if i.severity == \"WARNING\"])\n },\n \"issues\": [asdict(issue) for issue in self.normalization_issues]\n },\n \"data_type_analysis\": {\n \"total_issues\": len(self.datatype_issues),\n \"issues\": [asdict(issue) for issue in self.datatype_issues]\n },\n \"constraint_analysis\": {\n \"total_issues\": len(self.constraint_issues),\n \"by_severity\": {\n \"high\": len([i for i in self.constraint_issues if i.severity == \"HIGH\"]),\n \"medium\": len([i for i in self.constraint_issues if i.severity == \"MEDIUM\"]),\n \"low\": len([i for i in self.constraint_issues if 
i.severity == \"LOW\"])\n },\n \"issues\": [asdict(issue) for issue in self.constraint_issues]\n },\n \"naming_analysis\": {\n \"total_issues\": len(self.naming_issues),\n \"issues\": [asdict(issue) for issue in self.naming_issues]\n },\n \"missing_indexes\": self.check_missing_indexes(),\n \"recommendations\": self._generate_recommendations()\n }\n \n def _generate_recommendations(self) -> List[str]:\n \"\"\"Generate high-level recommendations.\"\"\"\n recommendations = []\n \n # High severity issues\n high_severity_issues = [\n i for i in self.normalization_issues + self.constraint_issues \n if i.severity == \"HIGH\"\n ]\n \n if high_severity_issues:\n recommendations.append(f\"Address {len(high_severity_issues)} high-severity issues immediately\")\n \n # Missing primary keys\n tables_without_pk = [name for name, table in self.tables.items() if not table.primary_key]\n if tables_without_pk:\n recommendations.append(f\"Add primary keys to tables: {', '.join(tables_without_pk)}\")\n \n # Data type improvements\n varchar_255_issues = [i for i in self.datatype_issues if \"VARCHAR(255)\" in i.issue]\n if varchar_255_issues:\n recommendations.append(f\"Review {len(varchar_255_issues)} VARCHAR(255) columns for right-sizing\")\n \n # Missing foreign keys\n missing_fks = [i for i in self.constraint_issues if i.issue_type == \"MISSING_FOREIGN_KEY\"]\n if missing_fks:\n recommendations.append(f\"Consider adding {len(missing_fks)} foreign key constraints for referential integrity\")\n \n # Normalization improvements\n normalization_issues_count = len(self.normalization_issues)\n if normalization_issues_count > 0:\n recommendations.append(f\"Review {normalization_issues_count} normalization issues for schema optimization\")\n \n return recommendations\n \n def format_text_report(self, analysis: Dict[str, Any]) -> str:\n \"\"\"Format analysis as human-readable text report.\"\"\"\n lines = []\n lines.append(\"DATABASE SCHEMA ANALYSIS REPORT\")\n lines.append(\"=\" * 50)\n 
lines.append(\"\")\n \n # Overview\n overview = analysis[\"schema_overview\"]\n lines.append(\"SCHEMA OVERVIEW\")\n lines.append(\"-\" * 15)\n lines.append(f\"Total Tables: {overview['total_tables']}\")\n lines.append(f\"Total Columns: {overview['total_columns']}\")\n lines.append(f\"Tables with Primary Keys: {overview['tables_with_primary_keys']}\")\n lines.append(f\"Total Foreign Keys: {overview['total_foreign_keys']}\")\n lines.append(f\"Total Indexes: {overview['total_indexes']}\")\n lines.append(\"\")\n \n # Recommendations\n if analysis[\"recommendations\"]:\n lines.append(\"KEY RECOMMENDATIONS\")\n lines.append(\"-\" * 18)\n for i, rec in enumerate(analysis[\"recommendations\"], 1):\n lines.append(f\"{i}. {rec}\")\n lines.append(\"\")\n \n # Normalization Issues\n norm_analysis = analysis[\"normalization_analysis\"]\n if norm_analysis[\"total_issues\"] > 0:\n lines.append(f\"NORMALIZATION ISSUES ({norm_analysis['total_issues']} total)\")\n lines.append(\"-\" * 25)\n severity_counts = norm_analysis[\"by_severity\"]\n lines.append(f\"High: {severity_counts['high']}, Medium: {severity_counts['medium']}, \"\n f\"Low: {severity_counts['low']}, Warning: {severity_counts['warning']}\")\n lines.append(\"\")\n \n for issue in norm_analysis[\"issues\"][:5]: # Show first 5\n lines.append(f\"• {issue['table']}: {issue['description']}\")\n lines.append(f\" Suggestion: {issue['suggestion']}\")\n lines.append(\"\")\n \n # Data Type Issues\n dt_analysis = analysis[\"data_type_analysis\"]\n if dt_analysis[\"total_issues\"] > 0:\n lines.append(f\"DATA TYPE ISSUES ({dt_analysis['total_issues']} total)\")\n lines.append(\"-\" * 20)\n for issue in dt_analysis[\"issues\"][:5]: # Show first 5\n lines.append(f\"• {issue['table']}.{issue['column']}: {issue['issue']}\")\n lines.append(f\" Current: {issue['current_type']} → Suggested: {issue['suggested_type']}\")\n lines.append(f\" Rationale: {issue['rationale']}\")\n lines.append(\"\")\n \n # Constraint Issues\n const_analysis = 
analysis[\"constraint_analysis\"]\n if const_analysis[\"total_issues\"] > 0:\n lines.append(f\"CONSTRAINT ISSUES ({const_analysis['total_issues']} total)\")\n lines.append(\"-\" * 20)\n severity_counts = const_analysis[\"by_severity\"]\n lines.append(f\"High: {severity_counts['high']}, Medium: {severity_counts['medium']}, \"\n f\"Low: {severity_counts['low']}\")\n lines.append(\"\")\n \n for issue in const_analysis[\"issues\"][:5]: # Show first 5\n lines.append(f\"• {issue['table']}: {issue['description']}\")\n lines.append(f\" Suggestion: {issue['suggestion']}\")\n lines.append(\"\")\n \n # Missing Indexes\n missing_idx = analysis[\"missing_indexes\"]\n if missing_idx:\n lines.append(f\"MISSING INDEXES ({len(missing_idx)} total)\")\n lines.append(\"-\" * 17)\n for idx in missing_idx[:5]: # Show first 5\n lines.append(f\"• {idx['table']}.{idx['column']} ({idx['type']})\")\n lines.append(f\" SQL: {idx['suggestion']}\")\n lines.append(\"\")\n \n return \"\\n\".join(lines)\n\n\ndef main():\n parser = argparse.ArgumentParser(description=\"Analyze database schema for design issues and generate ERD\")\n parser.add_argument(\"--input\", \"-i\", required=True, help=\"Input file (SQL DDL or JSON schema)\")\n parser.add_argument(\"--output\", \"-o\", help=\"Output file (default: stdout)\")\n parser.add_argument(\"--output-format\", \"-f\", choices=[\"json\", \"text\"], default=\"text\",\n help=\"Output format\")\n parser.add_argument(\"--generate-erd\", \"-e\", action=\"store_true\", help=\"Include Mermaid ERD in output\")\n parser.add_argument(\"--erd-only\", action=\"store_true\", help=\"Output only the Mermaid ERD\")\n \n args = parser.parse_args()\n \n try:\n # Read input file\n with open(args.input, 'r') as f:\n content = f.read()\n \n # Initialize analyzer\n analyzer = SchemaAnalyzer()\n \n # Parse input based on file extension\n if args.input.lower().endswith('.json'):\n analyzer.parse_json_schema(content)\n else:\n analyzer.parse_sql_ddl(content)\n \n if not 
analyzer.tables:\n print(\"Error: No tables found in input file\", file=sys.stderr)\n return 1\n \n if args.erd_only:\n # Output only ERD\n erd = analyzer.generate_mermaid_erd()\n if args.output:\n with open(args.output, 'w') as f:\n f.write(erd)\n else:\n print(erd)\n return 0\n \n # Perform analysis\n analyzer.analyze_normalization()\n analyzer.analyze_data_types()\n analyzer.analyze_constraints()\n analyzer.analyze_naming_conventions()\n \n # Generate report\n analysis = analyzer.get_analysis_summary()\n \n if args.generate_erd:\n analysis[\"mermaid_erd\"] = analyzer.generate_mermaid_erd()\n \n # Output results\n if args.output_format == \"json\":\n output = json.dumps(analysis, indent=2)\n else:\n output = analyzer.format_text_report(analysis)\n if args.generate_erd:\n output += \"\\n\\nMERMAID ERD\\n\" + \"=\" * 11 + \"\\n\"\n output += analysis[\"mermaid_erd\"]\n \n if args.output:\n with open(args.output, 'w') as f:\n f.write(output)\n else:\n print(output)\n \n return 0\n \n except Exception as e:\n print(f\"Error: {e}\", file=sys.stderr)\n return 1\n\n\nif __name__ == \"__main__\":\n sys.exit(main())","content_type":"text/x-python; charset=utf-8","language":"python","size":42034,"content_sha256":"4c44912b055045f278c938e6dc90f8559bded16cf6dfa066860d66a3a7a42e42"}],"content_json":{"type":"doc","content":[{"type":"heading","attrs":{"level":1},"content":[{"text":"Database Designer - POWERFUL Tier Skill","type":"text"}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Overview","type":"text"}]},{"type":"paragraph","content":[{"text":"A comprehensive database design skill that provides expert-level analysis, optimization, and migration capabilities for modern database systems. 
This skill combines theoretical principles with practical tools to help architects and developers create scalable, performant, and maintainable database schemas.","type":"text"}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Core Competencies","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Schema Design & Analysis","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Normalization Analysis","type":"text","marks":[{"type":"strong"}]},{"text":": Automated detection of normalization levels (1NF through BCNF)","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Denormalization Strategy","type":"text","marks":[{"type":"strong"}]},{"text":": Smart recommendations for performance optimization","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Data Type Optimization","type":"text","marks":[{"type":"strong"}]},{"text":": Identification of inappropriate types and size issues","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Constraint Analysis","type":"text","marks":[{"type":"strong"}]},{"text":": Missing foreign keys, unique constraints, and null checks","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Naming Convention Validation","type":"text","marks":[{"type":"strong"}]},{"text":": Consistent table and column naming patterns","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"ERD Generation","type":"text","marks":[{"type":"strong"}]},{"text":": Automatic Mermaid diagram creation from DDL","type":"text"}]}]}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Index Optimization","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Index Gap 
Analysis","type":"text","marks":[{"type":"strong"}]},{"text":": Identification of missing indexes on foreign keys and query patterns","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Composite Index Strategy","type":"text","marks":[{"type":"strong"}]},{"text":": Optimal column ordering for multi-column indexes","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Index Redundancy Detection","type":"text","marks":[{"type":"strong"}]},{"text":": Elimination of overlapping and unused indexes","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Performance Impact Modeling","type":"text","marks":[{"type":"strong"}]},{"text":": Selectivity estimation and query cost analysis","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Index Type Selection","type":"text","marks":[{"type":"strong"}]},{"text":": B-tree, hash, partial, covering, and specialized indexes","type":"text"}]}]}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Migration Management","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Zero-Downtime Migrations","type":"text","marks":[{"type":"strong"}]},{"text":": Expand-contract pattern implementation","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Schema Evolution","type":"text","marks":[{"type":"strong"}]},{"text":": Safe column additions, deletions, and type changes","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Data Migration Scripts","type":"text","marks":[{"type":"strong"}]},{"text":": Automated data transformation and validation","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Rollback Strategy","type":"text","marks":[{"type":"strong"}]},{"text":": Complete reversal capabilities with 
validation","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Execution Planning","type":"text","marks":[{"type":"strong"}]},{"text":": Ordered migration steps with dependency resolution","type":"text"}]}]}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Database Design Principles","type":"text"}]},{"type":"paragraph","content":[{"text":"→ See references/database-design-reference.md for details","type":"text"}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Best Practices","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Schema Design","type":"text"}]},{"type":"ordered_list","attrs":{"order":1,"listStyle":"number"},"content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Use meaningful names","type":"text","marks":[{"type":"strong"}]},{"text":": Clear, consistent naming conventions","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Choose appropriate data types","type":"text","marks":[{"type":"strong"}]},{"text":": Right-sized columns for storage efficiency","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Define proper constraints","type":"text","marks":[{"type":"strong"}]},{"text":": Foreign keys, check constraints, unique indexes","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Consider future growth","type":"text","marks":[{"type":"strong"}]},{"text":": Plan for scale from the beginning","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Document relationships","type":"text","marks":[{"type":"strong"}]},{"text":": Clear foreign key relationships and business rules","type":"text"}]}]}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Performance 
Optimization","type":"text"}]},{"type":"ordered_list","attrs":{"order":1,"listStyle":"number"},"content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Index strategically","type":"text","marks":[{"type":"strong"}]},{"text":": Cover common query patterns without over-indexing","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Monitor query performance","type":"text","marks":[{"type":"strong"}]},{"text":": Regular analysis of slow queries","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Partition large tables","type":"text","marks":[{"type":"strong"}]},{"text":": Improve query performance and maintenance","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Use appropriate isolation levels","type":"text","marks":[{"type":"strong"}]},{"text":": Balance consistency with performance","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Implement connection pooling","type":"text","marks":[{"type":"strong"}]},{"text":": Efficient resource utilization","type":"text"}]}]}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Security Considerations","type":"text"}]},{"type":"ordered_list","attrs":{"order":1,"listStyle":"number"},"content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Principle of least privilege","type":"text","marks":[{"type":"strong"}]},{"text":": Grant minimal necessary permissions","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Encrypt sensitive data","type":"text","marks":[{"type":"strong"}]},{"text":": At rest and in transit","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Audit access patterns","type":"text","marks":[{"type":"strong"}]},{"text":": Monitor and log database 
access","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Validate inputs","type":"text","marks":[{"type":"strong"}]},{"text":": Prevent SQL injection attacks","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Regular security updates","type":"text","marks":[{"type":"strong"}]},{"text":": Keep database software current","type":"text"}]}]}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Query Generation Patterns","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"SELECT with JOINs","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":"sql"},"content":[{"text":"-- INNER JOIN: only matching rows\nSELECT o.id, c.name, o.total\nFROM orders o\nINNER JOIN customers c ON c.id = o.customer_id;\n\n-- LEFT JOIN: all left rows, NULLs for non-matches\nSELECT c.name, COUNT(o.id) AS order_count\nFROM customers c\nLEFT JOIN orders o ON o.customer_id = c.id\nGROUP BY c.name;\n\n-- Self-join: hierarchical data (employees/managers)\nSELECT e.name AS employee, m.name AS manager\nFROM employees e\nLEFT JOIN employees m ON m.id = e.manager_id;","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Common Table Expressions (CTEs)","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":"sql"},"content":[{"text":"-- Recursive CTE for org chart\nWITH RECURSIVE org AS (\n SELECT id, name, manager_id, 1 AS depth\n FROM employees WHERE manager_id IS NULL\n UNION ALL\n SELECT e.id, e.name, e.manager_id, o.depth + 1\n FROM employees e INNER JOIN org o ON o.id = e.manager_id\n)\nSELECT * FROM org ORDER BY depth, name;","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Window Functions","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":"sql"},"content":[{"text":"-- ROW_NUMBER for pagination / dedup\nSELECT *, ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY created_at DESC) AS 
rn\nFROM orders;\n\n-- RANK with gaps, DENSE_RANK without gaps\nSELECT name, score, RANK() OVER (ORDER BY score DESC) AS rank FROM leaderboard;\n\n-- LAG/LEAD for comparing adjacent rows\nSELECT date, revenue,\n revenue - LAG(revenue) OVER (ORDER BY date) AS daily_change\nFROM daily_sales;","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Aggregation Patterns","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":"sql"},"content":[{"text":"-- FILTER clause (PostgreSQL) for conditional aggregation\nSELECT\n COUNT(*) AS total,\n COUNT(*) FILTER (WHERE status = 'active') AS active,\n AVG(amount) FILTER (WHERE amount > 0) AS avg_positive\nFROM accounts;\n\n-- GROUPING SETS for multi-level rollups\nSELECT region, product, SUM(revenue)\nFROM sales\nGROUP BY GROUPING SETS ((region, product), (region), ());","type":"text"}]},{"type":"hr","attrs":{"markup":"---"}},{"type":"heading","attrs":{"level":2},"content":[{"text":"Migration Patterns","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Up/Down Migration Scripts","type":"text"}]},{"type":"paragraph","content":[{"text":"Every migration must have a reversible counterpart. 
Name files with a timestamp prefix for ordering:","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":""},"content":[{"text":"migrations/\n├── 20260101_000001_create_users.up.sql\n├── 20260101_000001_create_users.down.sql\n├── 20260115_000002_add_users_email_index.up.sql\n└── 20260115_000002_add_users_email_index.down.sql","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Zero-Downtime Migrations (Expand/Contract)","type":"text"}]},{"type":"paragraph","content":[{"text":"Use the expand-contract pattern to avoid locking or breaking running code:","type":"text"}]},{"type":"ordered_list","attrs":{"order":1,"listStyle":"number"},"content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Expand","type":"text","marks":[{"type":"strong"}]},{"text":" — add the new column/table (nullable, with default)","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Migrate data","type":"text","marks":[{"type":"strong"}]},{"text":" — backfill in batches; dual-write from application","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Transition","type":"text","marks":[{"type":"strong"}]},{"text":" — application reads from new column; stop writing to old","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Contract","type":"text","marks":[{"type":"strong"}]},{"text":" — drop old column in a follow-up migration","type":"text"}]}]}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Data Backfill Strategies","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":"sql"},"content":[{"text":"-- Batch update to avoid long-running locks\nUPDATE users SET email_normalized = LOWER(email)\nWHERE id IN (SELECT id FROM users WHERE email_normalized IS NULL LIMIT 5000);\n-- Repeat in a loop until 0 rows affected","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Rollback 
Procedures","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Always test the ","type":"text"},{"text":"down.sql","type":"text","marks":[{"type":"code_inline"}]},{"text":" in staging before deploying ","type":"text"},{"text":"up.sql","type":"text","marks":[{"type":"code_inline"}]},{"text":" to production","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Keep rollback window short — if the contract step has run, rollback requires a new forward migration","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"For irreversible changes (dropping columns with data), take a logical backup first","type":"text"}]}]}]},{"type":"hr","attrs":{"markup":"---"}},{"type":"heading","attrs":{"level":2},"content":[{"text":"Performance Optimization","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Indexing Strategies","type":"text"}]},{"type":"table","attrs":{"layout":null},"content":[{"type":"tr","content":[{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Index Type","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Use Case","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Example","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"B-tree","type":"text","marks":[{"type":"strong"}]},{"text":" (default)","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Equality, range, ORDER 
BY","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"CREATE INDEX idx_users_email ON users(email);","type":"text","marks":[{"type":"code_inline"}]}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"GIN","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Full-text search, JSONB, arrays","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"CREATE INDEX idx_docs_body ON docs USING gin(to_tsvector('english', body));","type":"text","marks":[{"type":"code_inline"}]}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"GiST","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Geometry, range types, nearest-neighbor","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"CREATE INDEX idx_locations ON places USING gist(coords);","type":"text","marks":[{"type":"code_inline"}]}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Partial","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Subset of rows (reduce 
size)","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"CREATE INDEX idx_active ON users(email) WHERE active = true;","type":"text","marks":[{"type":"code_inline"}]}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Covering","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Index-only scans","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"CREATE INDEX idx_cov ON orders(customer_id) INCLUDE (total, created_at);","type":"text","marks":[{"type":"code_inline"}]}]}]}]}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"EXPLAIN Plan Reading","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":"sql"},"content":[{"text":"EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT) SELECT ...;","type":"text"}]},{"type":"paragraph","content":[{"text":"Key signals to watch:","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Seq Scan","type":"text","marks":[{"type":"strong"}]},{"text":" on large tables — often a missing index, unless the query genuinely reads most of the table","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Nested Loop","type":"text","marks":[{"type":"strong"}]},{"text":" with high row counts — check for a missing index on the inner side, or for stale statistics that steer the planner away from a hash/merge join","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Buffers shared read","type":"text","marks":[{"type":"strong"}]},{"text":" much higher than ","type":"text"},{"text":"hit","type":"text","marks":[{"type":"strong"}]},{"text":" — working set exceeds 
memory","type":"text"}]}]}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"N+1 Query Detection","type":"text"}]},{"type":"paragraph","content":[{"text":"Symptoms: application issues one query per row (e.g., fetching related records in a loop).","type":"text"}]},{"type":"paragraph","content":[{"text":"Fixes:","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Use ","type":"text"},{"text":"JOIN","type":"text","marks":[{"type":"code_inline"}]},{"text":" or subquery to fetch in one round-trip","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"ORM eager loading (","type":"text"},{"text":"select_related","type":"text","marks":[{"type":"code_inline"}]},{"text":" / ","type":"text"},{"text":"includes","type":"text","marks":[{"type":"code_inline"}]},{"text":" / ","type":"text"},{"text":"with","type":"text","marks":[{"type":"code_inline"}]},{"text":")","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"DataLoader pattern for GraphQL resolvers","type":"text"}]}]}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Connection Pooling","type":"text"}]},{"type":"table","attrs":{"layout":null},"content":[{"type":"tr","content":[{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Tool","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Protocol","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Best 
For","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"PgBouncer","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"PostgreSQL","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Transaction/statement pooling, low overhead","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"ProxySQL","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"MySQL","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Query routing, read/write splitting","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Built-in pool","type":"text","marks":[{"type":"strong"}]},{"text":" (HikariCP, SQLAlchemy pool)","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Any","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Application-level pooling","type":"text"}]}]}]}]},{"type":"paragraph","content":[{"text":"Rule of thumb:","type":"text","marks":[{"type":"strong"}]},{"text":" Set pool size to ","type":"text"},{"text":"(2 * CPU cores) + disk spindles","type":"text","marks":[{"type":"code_inline"}]},{"text":". 
For cloud SSDs, start with ","type":"text"},{"text":"2 * vCPUs","type":"text","marks":[{"type":"code_inline"}]},{"text":" and tune.","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Read Replicas and Query Routing","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Route ","type":"text"},{"text":"SELECT","type":"text","marks":[{"type":"code_inline"}]},{"text":" queries that tolerate slightly stale data to replicas; send writes and read-after-write queries to the primary","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Account for replication lag (often \u003c1s for async; near 0 for sync, although in PostgreSQL a standby can still return stale reads unless synchronous_commit is set to remote_apply)","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Use ","type":"text"},{"text":"pg_last_wal_replay_lsn()","type":"text","marks":[{"type":"code_inline"}]},{"text":" on the replica, compared with the primary's current WAL position, to detect lag before reading critical data","type":"text"}]}]}]},{"type":"hr","attrs":{"markup":"---"}},{"type":"heading","attrs":{"level":2},"content":[{"text":"Multi-Database Decision Matrix","type":"text"}]},{"type":"table","attrs":{"layout":null},"content":[{"type":"tr","content":[{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Criteria","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"PostgreSQL","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"MySQL","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"SQLite","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"SQL 
Server","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Best for","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Complex queries, JSONB, extensions","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Web apps, read-heavy workloads","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Embedded, dev/test, edge","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Enterprise .NET stacks","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"JSON support","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Excellent (JSONB + GIN)","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Good (JSON type)","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Basic (built-in JSON functions, no dedicated column type)","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Good 
(OPENJSON)","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Replication","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Streaming, logical","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Group replication, InnoDB cluster","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"N/A","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Always On AG","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Licensing","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Open source (PostgreSQL License)","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Open source (GPL) / commercial","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Public domain","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Commercial","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Max practical 
size","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Multi-TB","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Multi-TB","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"~1 TB (single-writer)","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Multi-TB","type":"text"}]}]}]}]},{"type":"paragraph","content":[{"text":"When to choose:","type":"text","marks":[{"type":"strong"}]}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"PostgreSQL","type":"text","marks":[{"type":"strong"}]},{"text":" — default choice for new projects; best extensibility and standards compliance","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"MySQL","type":"text","marks":[{"type":"strong"}]},{"text":" — existing MySQL ecosystem; simple read-heavy web applications","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"SQLite","type":"text","marks":[{"type":"strong"}]},{"text":" — mobile apps, CLI tools, unit test databases, IoT/edge","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"SQL Server","type":"text","marks":[{"type":"strong"}]},{"text":" — mandated by enterprise policy; deep .NET/Azure integration","type":"text"}]}]}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"NoSQL 
Considerations","type":"text"}]},{"type":"table","attrs":{"layout":null},"content":[{"type":"tr","content":[{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Database","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Model","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Use When","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"MongoDB","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Document","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Schema flexibility, rapid prototyping, content management","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Redis","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Key-value / cache","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Session store, rate limiting, leaderboards, 
pub/sub","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"DynamoDB","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Key-value / document","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Serverless AWS apps, single-digit-ms latency at any scale","type":"text"}]}]}]}]},{"type":"blockquote","content":[{"type":"paragraph","content":[{"text":"Use SQL as the default. Reach for NoSQL only when the access pattern clearly benefits from it.","type":"text"}]}]},{"type":"hr","attrs":{"markup":"---"}},{"type":"heading","attrs":{"level":2},"content":[{"text":"Sharding & Replication","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Horizontal vs Vertical Partitioning","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Vertical partitioning","type":"text","marks":[{"type":"strong"}]},{"text":": Split columns across tables (e.g., separate BLOB columns). Reduces I/O for narrow queries.","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Horizontal partitioning (sharding)","type":"text","marks":[{"type":"strong"}]},{"text":": Split rows across databases/servers. 
Required when a single node cannot hold the dataset or handle the throughput.","type":"text"}]}]}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Sharding Strategies","type":"text"}]},{"type":"table","attrs":{"layout":null},"content":[{"type":"tr","content":[{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Strategy","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"How It Works","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Pros","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Cons","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Hash","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"shard = hash(key) % N","type":"text","marks":[{"type":"code_inline"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Even distribution","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Resharding is expensive","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Range","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Shard by 
date or ID range","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Simple, good for time-series","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Hot spots on latest shard","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Geographic","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Shard by user region","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Data locality, compliance","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Cross-region queries are hard","type":"text"}]}]}]}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Replication Patterns","type":"text"}]},{"type":"table","attrs":{"layout":null},"content":[{"type":"tr","content":[{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Pattern","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Consistency","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Latency","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Use 
Case","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Synchronous","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Strong","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Higher write latency","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Financial transactions","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Asynchronous","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Eventual","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Low write latency","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Read-heavy web apps","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Semi-synchronous","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"At-least-one replica 
confirmed","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Moderate","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Balance of safety and speed","type":"text"}]}]}]}]},{"type":"hr","attrs":{"markup":"---"}},{"type":"heading","attrs":{"level":2},"content":[{"text":"Cross-References","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"sql-database-assistant","type":"text","marks":[{"type":"strong"}]},{"text":" — query writing, optimization, and debugging for day-to-day SQL work","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"database-schema-designer","type":"text","marks":[{"type":"strong"}]},{"text":" — ERD modeling, normalization analysis, and schema generation","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"migration-architect","type":"text","marks":[{"type":"strong"}]},{"text":" — large-scale migration planning across database engines or major schema overhauls","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"senior-backend","type":"text","marks":[{"type":"strong"}]},{"text":" — application-layer patterns (connection pooling, ORM best practices)","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"senior-devops","type":"text","marks":[{"type":"strong"}]},{"text":" — infrastructure provisioning for database clusters and replicas","type":"text"}]}]}]},{"type":"hr","attrs":{"markup":"---"}},{"type":"heading","attrs":{"level":2},"content":[{"text":"Conclusion","type":"text"}]},{"type":"paragraph","content":[{"text":"Effective database design requires balancing multiple competing concerns: performance, scalability, maintainability, and 
business requirements. This skill provides the tools and knowledge to make informed decisions throughout the database lifecycle, from initial schema design through production optimization and evolution.","type":"text"}]},{"type":"paragraph","content":[{"text":"The included tools automate common analysis and optimization tasks, while the comprehensive guides provide the theoretical foundation for making sound architectural decisions. Whether building a new system or optimizing an existing one, these resources provide expert-level guidance for creating robust, scalable database solutions.","type":"text"}]}]},"metadata":{"date":"2026-06-05","name":"database-designer","author":"@skillopedia","source":{"stars":16818,"repo_name":"claude-skills","origin_url":"https://github.com/alirezarezvani/claude-skills/blob/HEAD/engineering/skills/database-designer/SKILL.md","repo_owner":"alirezarezvani","body_sha256":"f94265486a718a36d124918c7f763bc87eec8ebaa8225b479cb9e20aa97b7139","cluster_key":"89b3c00215db939503ed1a10ce159d833dc733696a077c31a3f9c031cc6ce196","clean_bundle":{"format":"clean-skill-bundle-v1","source":"alirezarezvani/claude-skills/engineering/skills/database-designer/SKILL.md","attachments":[{"id":"7d454880-25b7-552a-a3d7-a59280ab3c9a","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/7d454880-25b7-552a-a3d7-a59280ab3c9a/attachment.md","path":"README.md","size":11966,"sha256":"18264cfce2a0cf29a83902a64909944eac6a6d3ae50d6366f307a6fd97c0bbb5","contentType":"text/markdown; charset=utf-8"},{"id":"3cff81b2-1050-503c-8873-8a2065fab955","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/3cff81b2-1050-503c-8873-8a2065fab955/attachment.json","path":"assets/sample_query_patterns.json","size":9244,"sha256":"de9d9fe65e8f8ff9aeac7f055d475e5b742999081caaf50ffdd3ff88aab98d4b","contentType":"application/json; 
charset=utf-8"},{"id":"ddb91aa0-4a46-5262-b0eb-6f93841045d7","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/ddb91aa0-4a46-5262-b0eb-6f93841045d7/attachment.json","path":"assets/sample_schema.json","size":9365,"sha256":"006d510d7807dc1cf7a23710fcfe4b0cf0f6f37a7fafb559e79f91d6c56ed177","contentType":"application/json; charset=utf-8"},{"id":"4209c8f2-b72b-530c-ba2d-3755a03b3b1b","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/4209c8f2-b72b-530c-ba2d-3755a03b3b1b/attachment.sql","path":"assets/sample_schema.sql","size":8301,"sha256":"8dfa99c27922537d64243a9dfddcee996d29f1c4d4ef23ce21db6f631bf828d5","contentType":"application/sql"},{"id":"ff8641e5-b435-5914-a42f-f0dcf672092e","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/ff8641e5-b435-5914-a42f-f0dcf672092e/attachment.txt","path":"expected_outputs/index_optimization_sample.txt","size":2639,"sha256":"a31ac7a8eaccc2b1db5701ec025304b12ea5dc82e81df25eb662ccbc0cc48f67","contentType":"text/plain; charset=utf-8"},{"id":"c9b3a7a2-8f79-5eba-84be-a76bf54840ba","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/c9b3a7a2-8f79-5eba-84be-a76bf54840ba/attachment.txt","path":"expected_outputs/migration_sample.txt","size":4509,"sha256":"bb34e3bf0728810756a184c0f1bd4e1ad61ee5adda2efcbce668f6d1dcadeee9","contentType":"text/plain; charset=utf-8"},{"id":"4ba3fb13-3d3e-5190-9b9b-1fe54e7f89b4","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/4ba3fb13-3d3e-5190-9b9b-1fe54e7f89b4/attachment.txt","path":"expected_outputs/schema_analysis_sample.txt","size":7474,"sha256":"97335e832292f3687d9e0f576eee50e8b55dc184376cd457ae56578c4802c987","contentType":"text/plain; charset=utf-8"},{"id":"8a7303b7-fbc9-5b26-b8fb-c54830aabaaa","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/8a7303b7-fbc9-5b26-b8fb-c54830aabaaa/attachment.py","path":"index_optimizer.py","size":38896,"sha256":"0c73e6724d02eae6d48a9dd2a626e06e7d551a3fcae3ab5f13a044384b2db485","contentType":"text/x-python; 
charset=utf-8"},{"id":"f35cde6e-f028-5bd0-9620-fef14c2117ee","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/f35cde6e-f028-5bd0-9620-fef14c2117ee/attachment.py","path":"migration_generator.py","size":50373,"sha256":"420e49cc56fd5a3b1d9ad5983462f539389548dc885142fa0c0b596a44b3ec92","contentType":"text/x-python; charset=utf-8"},{"id":"3b2b1c5d-918e-528d-bbd7-2940d9415f68","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/3b2b1c5d-918e-528d-bbd7-2940d9415f68/attachment.md","path":"references/database-design-reference.md","size":14014,"sha256":"b189de7137b7a1acd9408f3324ca5fa0e1a1bbdc53bf82f2eeda0911239f0370","contentType":"text/markdown; charset=utf-8"},{"id":"bbd0d5a5-627d-55a1-9f0e-9c2b5b2f7343","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/bbd0d5a5-627d-55a1-9f0e-9c2b5b2f7343/attachment.md","path":"references/database_selection_decision_tree.md","size":16344,"sha256":"18d1a914c6097e6e7e8fd38da4e5847c24191e24063efb3d39f83526079f20ee","contentType":"text/markdown; charset=utf-8"},{"id":"6bd1b861-a132-53e9-b679-8b2efad68a76","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/6bd1b861-a132-53e9-b679-8b2efad68a76/attachment.md","path":"references/index_strategy_patterns.md","size":11831,"sha256":"8d85e83b63c7977ac1cbfd70b2d42ac6fbdbfbeb9219d1dee98f0fafdafaabe8","contentType":"text/markdown; charset=utf-8"},{"id":"7a12734d-235e-5d6b-862e-41509852d1ee","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/7a12734d-235e-5d6b-862e-41509852d1ee/attachment.md","path":"references/normalization_guide.md","size":10672,"sha256":"30f05dfb9c474203c3377cbd7814f2d61971bde0bbad0807c332ddf2d27a4c30","contentType":"text/markdown; charset=utf-8"},{"id":"999bdfff-1eb2-5f80-a36d-e13e65556b36","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/999bdfff-1eb2-5f80-a36d-e13e65556b36/attachment.py","path":"schema_analyzer.py","size":42034,"sha256":"4c44912b055045f278c938e6dc90f8559bded16cf6dfa066860d66a3a7a42e42","contentType":"text/x-python; 
charset=utf-8"}],"bundle_sha256":"598c0887f1ad17cf2a302cb2181c9bea17ec6988feb0b7bfff42d8ba0dbf659c","attachment_count":14,"text_attachments":14,"attachment_storage":"skillopedia-attachments-v1","binary_attachments":0,"excluded_attachments":[]},"cluster_size":2,"skill_md_path":"engineering/skills/database-designer/SKILL.md","import_metadata":{"date":"2026-06-05","author":"@skillopedia","version":"v1","category":"data-analytics","category_label":"Data"},"exact_dupes_collapsed_into_this":1},"version":"v1","category":"data-analytics","import_tag":"clean-skills-v1","description":"Use when the user asks to design database schemas, plan data migrations, optimize queries, choose between SQL and NoSQL, or model data relationships."}},"renderedAt":1782979641093}
