# Data Ingestion Patterns

This skill provides patterns for getting data INTO systems from external sources.

## When to Use This Skill

- Importing CSV, JSON, Parquet, or Excel files
- Loading data from S3, GCS, or Azure Blob storage
- Consuming REST/GraphQL API feeds
- Building ETL/ELT pipelines
- Database migration and CDC (Change Data Capture)
- Streaming data ingestion from Kafka/Kinesis

## Ingestion Pattern Decision Tree

## Quick Start by Language

### Python (Recommended for ETL)

dlt (data load tool) - Modern Python ETL: Polars for file processing (faster than pandas):

### TypeScript/Node.js

S3 ingestion: AP…

)),\n \"amount\": pa.Column(float, pa.Check.ge(0)),\n \"status\": pa.Column(str, pa.Check.isin([\"pending\", \"completed\", \"failed\"]))\n})\n\n# Validate\ndf = pl.read_csv(\"data.csv\")\nvalidated = schema.validate(df.to_pandas())\n```\n\n## Format Selection Guide\n\n| Use Case | Format | Why |\n|----------|--------|-----|\n| Analytics/BI | Parquet | Columnar, compressed, fast |\n| Streaming/Logs | JSON Lines | Appendable, streamable |\n| Data Exchange | CSV | Universal compatibility |\n| Human Editing | Excel/CSV | Familiar tools |\n| Configuration | JSON/YAML | Structured, readable |\n","content_type":"text/markdown; charset=utf-8","language":"markdown","size":4194,"content_sha256":"5eab7f14b836a202e7e7edb43aab75db52ead5b8e3cb2b4692c21d2243848c63"},{"filename":"references/streaming-sources.md","content":"# Streaming Data Ingestion\n\n\n## Table of Contents\n\n- [Apache Kafka](#apache-kafka)\n - [Python (confluent-kafka)](#python-confluent-kafka)\n - [TypeScript (kafkajs)](#typescript-kafkajs)\n - [Rust (rdkafka)](#rust-rdkafka)\n- [AWS Kinesis](#aws-kinesis)\n - [Python (boto3)](#python-boto3)\n- [Google Pub/Sub](#google-pubsub)\n - [Python](#python)\n- [Exactly-Once Semantics](#exactly-once-semantics)\n - [Pattern: Idempotent Processing](#pattern-idempotent-processing)\n - [Pattern: Outbox for Reliability](#pattern-outbox-for-reliability)\n- [Dead Letter Queues](#dead-letter-queues)\n- [Backpressure Handling](#backpressure-handling)\n\n## Apache Kafka\n\n### Python (confluent-kafka)\n```python\nfrom confluent_kafka import Consumer, KafkaError\nimport json\n\ndef create_consumer(group_id: str, topics: list[str]) -> Consumer:\n conf = {\n 'bootstrap.servers': 'localhost:9092',\n 'group.id': group_id,\n 'auto.offset.reset': 'earliest',\n 'enable.auto.commit': False # Manual commit for exactly-once\n }\n consumer = Consumer(conf)\n consumer.subscribe(topics)\n return consumer\n\ndef consume_messages(consumer: Consumer, batch_size: int = 100):\n \"\"\"Consume 
messages with manual commit.\"\"\"\n messages = []\n\n while True:\n msg = consumer.poll(timeout=1.0)\n\n if msg is None:\n continue\n if msg.error():\n if msg.error().code() == KafkaError._PARTITION_EOF:\n continue\n raise Exception(msg.error())\n\n messages.append(json.loads(msg.value().decode()))\n\n if len(messages) >= batch_size:\n yield messages\n consumer.commit()\n messages = []\n\n# Usage\nconsumer = create_consumer(\"my-group\", [\"events\"])\nfor batch in consume_messages(consumer):\n process_batch(batch)\n```\n\n### TypeScript (kafkajs)\n```typescript\nimport { Kafka, Consumer, EachBatchPayload } from \"kafkajs\";\n\nconst kafka = new Kafka({\n clientId: \"my-app\",\n brokers: [\"localhost:9092\"]\n});\n\nconst consumer = kafka.consumer({ groupId: \"my-group\" });\n\nasync function startConsumer() {\n await consumer.connect();\n await consumer.subscribe({ topic: \"events\", fromBeginning: true });\n\n await consumer.run({\n eachBatch: async ({ batch, resolveOffset, heartbeat }: EachBatchPayload) => {\n for (const message of batch.messages) {\n const event = JSON.parse(message.value!.toString());\n await processEvent(event);\n resolveOffset(message.offset);\n await heartbeat();\n }\n }\n });\n}\n```\n\n### Rust (rdkafka)\n```rust\nuse rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};\nuse rdkafka::Message;\n\nasync fn consume_events(consumer: StreamConsumer) -> Result\u003c()> {\n loop {\n match consumer.recv().await {\n Ok(msg) => {\n if let Some(payload) = msg.payload() {\n let event: Event = serde_json::from_slice(payload)?;\n process_event(event).await?;\n }\n consumer.commit_message(&msg, CommitMode::Async)?;\n }\n Err(e) => eprintln!(\"Kafka error: {}\", e),\n }\n }\n}\n```\n\n## AWS Kinesis\n\n### Python (boto3)\n```python\nimport boto3\nimport json\n\nkinesis = boto3.client('kinesis')\n\ndef consume_kinesis(stream_name: str, shard_id: str):\n # Get shard iterator\n response = kinesis.get_shard_iterator(\n StreamName=stream_name,\n 
ShardId=shard_id,\n ShardIteratorType='LATEST'\n )\n shard_iterator = response['ShardIterator']\n\n while True:\n response = kinesis.get_records(\n ShardIterator=shard_iterator,\n Limit=100\n )\n\n for record in response['Records']:\n data = json.loads(record['Data'])\n yield data\n\n shard_iterator = response['NextShardIterator']\n```\n\n## Google Pub/Sub\n\n### Python\n```python\nfrom google.cloud import pubsub_v1\nimport json\n\nsubscriber = pubsub_v1.SubscriberClient()\nsubscription_path = subscriber.subscription_path(\"project\", \"subscription\")\n\ndef callback(message):\n data = json.loads(message.data.decode())\n process_message(data)\n message.ack()\n\nstreaming_pull = subscriber.subscribe(subscription_path, callback=callback)\n\n# Run forever\nstreaming_pull.result()\n```\n\n## Exactly-Once Semantics\n\n### Pattern: Idempotent Processing\n```python\nfrom datetime import datetime, timezone\n\nasync def process_with_idempotency(event: dict):\n event_id = event[\"id\"]\n\n # Check if already processed\n existing = await db.execute(\n \"SELECT 1 FROM processed_events WHERE event_id = ?\",\n (event_id,)\n )\n if existing:\n return # Skip duplicate\n\n # Process in transaction\n async with db.transaction():\n await process_event(event)\n await db.execute(\n \"INSERT INTO processed_events (event_id, processed_at) VALUES (?, ?)\",\n (event_id, datetime.now(timezone.utc))\n )\n```\n\n### Pattern: Outbox for Reliability\n```python\nimport json\n\nasync def process_with_outbox(event: dict):\n async with db.transaction():\n # 1. Write to outbox\n await db.execute(\n \"INSERT INTO outbox (event_id, payload, status) VALUES (?, ?, 'pending')\",\n (event[\"id\"], json.dumps(event))\n )\n\n # 2. Process event\n result = await process_event(event)\n\n # 3. 
Mark complete\n await db.execute(\n \"UPDATE outbox SET status = 'complete' WHERE event_id = ?\",\n (event[\"id\"],)\n )\n\n return result\n```\n\n## Dead Letter Queues\n\n```python\nasync def consume_with_dlq(consumer, dlq_producer):\n for msg in consumer:\n try:\n await process_message(msg)\n consumer.commit()\n except Exception as e:\n # Send to DLQ after max retries\n if msg.retry_count >= 3:\n await dlq_producer.send(\n topic=\"events-dlq\",\n value={\n \"original\": msg.value,\n \"error\": str(e),\n \"failed_at\": datetime.utcnow().isoformat()\n }\n )\n consumer.commit()\n else:\n # Re-queue for retry\n raise\n```\n\n## Backpressure Handling\n\n```python\nimport asyncio\nfrom asyncio import Semaphore\n\nclass BackpressureConsumer:\n def __init__(self, max_concurrent: int = 100):\n self.semaphore = Semaphore(max_concurrent)\n\n async def process_with_backpressure(self, message):\n async with self.semaphore:\n await process_message(message)\n\n async def consume(self, consumer):\n tasks = []\n async for msg in consumer:\n task = asyncio.create_task(self.process_with_backpressure(msg))\n tasks.append(task)\n\n # Periodically clean completed tasks\n if len(tasks) > 1000:\n tasks = [t for t in tasks if not t.done()]\n```\n","content_type":"text/markdown; charset=utf-8","language":"markdown","size":6852,"content_sha256":"1b20afed902507c178dc50767b8cd2c6804bf748e7f9e98474028536903c87a0"},{"filename":"scripts/generate_dlt_pipeline.py","content":"#!/usr/bin/env python3\n\"\"\"\ndlt Pipeline Generator\n\nGenerates a dlt (data load tool) pipeline scaffold for common ingestion patterns.\nCreates boilerplate code for API, file, or database sources.\n\nUsage:\n python generate_dlt_pipeline.py --source api --name github_issues --output ./pipelines\n python generate_dlt_pipeline.py --source s3 --name raw_events --output ./pipelines\n python generate_dlt_pipeline.py --source database --name legacy_users --output ./pipelines\n\"\"\"\n\nimport argparse\nimport sys\nfrom pathlib 
import Path\nfrom textwrap import dedent\n\n\ndef generate_api_pipeline(name: str) -> str:\n \"\"\"Generate API source pipeline.\"\"\"\n return dedent(f'''\n \"\"\"\n dlt Pipeline: {name}\n\n Ingests data from REST API with incremental loading support.\n\n Usage:\n python {name}_pipeline.py\n \"\"\"\n\n import dlt\n import requests\n from typing import Iterator\n\n\n @dlt.source\n def {name}_source(api_base_url: str, api_key: str = dlt.secrets.value):\n \"\"\"\n Source for {name} API data.\n\n Args:\n api_base_url: Base URL of the API\n api_key: API key for authentication\n \"\"\"\n\n @dlt.resource(\n write_disposition=\"merge\",\n primary_key=\"id\"\n )\n def items(\n updated_at=dlt.sources.incremental(\"updated_at\", initial_value=\"2024-01-01T00:00:00Z\")\n ) -> Iterator[dict]:\n \"\"\"Fetch items with incremental loading.\"\"\"\n headers = {{\"Authorization\": f\"Bearer {{api_key}}\"}}\n page = 1\n\n while True:\n response = requests.get(\n f\"{{api_base_url}}/items\",\n headers=headers,\n params={{\n \"updated_after\": updated_at.last_value,\n \"page\": page,\n \"per_page\": 100\n }}\n )\n response.raise_for_status()\n data = response.json()\n\n if not data[\"items\"]:\n break\n\n yield from data[\"items\"]\n page += 1\n\n return items\n\n\n def main():\n # Create pipeline\n pipeline = dlt.pipeline(\n pipeline_name=\"{name}\",\n destination=\"duckdb\", # Change to postgres, bigquery, etc.\n dataset_name=\"{name}_data\"\n )\n\n # Run pipeline\n load_info = pipeline.run(\n {name}_source(api_base_url=\"https://api.example.com\")\n )\n\n print(load_info)\n\n\n if __name__ == \"__main__\":\n main()\n ''').strip()\n\n\ndef generate_s3_pipeline(name: str) -> str:\n \"\"\"Generate S3 source pipeline.\"\"\"\n return dedent(f'''\n \"\"\"\n dlt Pipeline: {name}\n\n Ingests Parquet/CSV files from S3 bucket.\n\n Usage:\n python {name}_pipeline.py\n \"\"\"\n\n import dlt\n import boto3\n import polars as pl\n from io import BytesIO\n from typing import Iterator\n\n\n 
@dlt.source\n def {name}_source(bucket: str, prefix: str):\n \"\"\"\n Source for {name} S3 data.\n\n Args:\n bucket: S3 bucket name\n prefix: Object prefix to filter\n \"\"\"\n s3 = boto3.client(\"s3\")\n\n @dlt.resource(write_disposition=\"append\")\n def files() -> Iterator[dict]:\n \"\"\"Ingest files from S3.\"\"\"\n paginator = s3.get_paginator(\"list_objects_v2\")\n\n for page in paginator.paginate(Bucket=bucket, Prefix=prefix):\n for obj in page.get(\"Contents\", []):\n key = obj[\"Key\"]\n\n # Skip non-data files\n if not (key.endswith(\".parquet\") or key.endswith(\".csv\")):\n continue\n\n # Download and parse\n response = s3.get_object(Bucket=bucket, Key=key)\n body = response[\"Body\"].read()\n\n if key.endswith(\".parquet\"):\n df = pl.read_parquet(BytesIO(body))\n else:\n df = pl.read_csv(BytesIO(body))\n\n # Yield records\n for record in df.to_dicts():\n record[\"_source_file\"] = key\n yield record\n\n return files\n\n\n def main():\n # Create pipeline\n pipeline = dlt.pipeline(\n pipeline_name=\"{name}\",\n destination=\"duckdb\",\n dataset_name=\"{name}_data\"\n )\n\n # Run pipeline\n load_info = pipeline.run(\n {name}_source(\n bucket=\"my-data-bucket\",\n prefix=\"raw/events/\"\n )\n )\n\n print(load_info)\n\n\n if __name__ == \"__main__\":\n main()\n ''').strip()\n\n\ndef generate_database_pipeline(name: str) -> str:\n \"\"\"Generate database source pipeline.\"\"\"\n return dedent(f'''\n \"\"\"\n dlt Pipeline: {name}\n\n Migrates data from source database with incremental loading.\n\n Usage:\n python {name}_pipeline.py\n \"\"\"\n\n import dlt\n from dlt.sources.sql_database import sql_database\n\n\n def main():\n # Source database connection\n source_db = sql_database(\n credentials=dlt.secrets[\"sources.{name}.credentials\"],\n schema=\"public\",\n table_names=[\"users\", \"orders\", \"products\"],\n incremental=dlt.sources.incremental(\"updated_at\")\n )\n\n # Create pipeline\n pipeline = dlt.pipeline(\n pipeline_name=\"{name}\",\n 
destination=\"postgres\",\n dataset_name=\"{name}_data\"\n )\n\n # Run pipeline\n load_info = pipeline.run(source_db)\n\n print(load_info)\n\n\n if __name__ == \"__main__\":\n main()\n ''').strip()\n\n\nTEMPLATES = {\n \"api\": generate_api_pipeline,\n \"s3\": generate_s3_pipeline,\n \"database\": generate_database_pipeline,\n}\n\n\ndef main():\n parser = argparse.ArgumentParser(\n description=\"Generate dlt pipeline scaffold\"\n )\n parser.add_argument(\n \"--source\",\n choices=[\"api\", \"s3\", \"database\"],\n required=True,\n help=\"Source type\"\n )\n parser.add_argument(\n \"--name\",\n required=True,\n help=\"Pipeline name (snake_case)\"\n )\n parser.add_argument(\n \"--output\",\n type=Path,\n default=Path(\".\"),\n help=\"Output directory\"\n )\n\n args = parser.parse_args()\n\n # Generate code\n generator = TEMPLATES[args.source]\n code = generator(args.name)\n\n # Write file\n output_file = args.output / f\"{args.name}_pipeline.py\"\n args.output.mkdir(parents=True, exist_ok=True)\n output_file.write_text(code)\n\n print(f\"Generated: {output_file}\")\n print(\"\")\n print(\"Next steps:\")\n print(f\" 1. Edit {output_file} to configure your source\")\n print(\" 2. Add credentials to .dlt/secrets.toml\")\n print(f\" 3. 
Run: python {output_file}\")\n\n\nif __name__ == \"__main__\":\n main()\n","content_type":"text/x-python; charset=utf-8","language":"python","size":7517,"content_sha256":"278c9f40c088094b914f3e8f37dbdd4cda2d96243b1b21f1ad795192dfd8626e"},{"filename":"scripts/test_s3_connection.py","content":"#!/usr/bin/env python3\n\"\"\"\nS3 Connection Testing Tool\n\nTests S3 bucket connectivity, permissions, and file listing.\nUseful for validating AWS credentials before ingestion.\n\nUsage:\n python test_s3_connection.py --bucket my-bucket\n python test_s3_connection.py --bucket my-bucket --prefix data/2024/\n python test_s3_connection.py --bucket my-bucket --profile production\n\"\"\"\n\nimport argparse\nimport sys\nfrom datetime import datetime\n\ntry:\n import boto3\n from botocore.exceptions import ClientError, NoCredentialsError\nexcept ImportError:\n print(\"Error: boto3 library not installed\")\n print(\"Install with: pip install boto3\")\n sys.exit(1)\n\n\ndef test_connection(bucket: str, prefix: str = \"\", profile: str = None) -> bool:\n \"\"\"Test S3 bucket connection and permissions.\"\"\"\n\n # Create session\n if profile:\n session = boto3.Session(profile_name=profile)\n print(f\"Using AWS profile: {profile}\")\n else:\n session = boto3.Session()\n print(\"Using default AWS credentials\")\n\n s3 = session.client(\"s3\")\n print(f\"Bucket: {bucket}\")\n print(f\"Prefix: {prefix or '(root)'}\")\n print(\"\")\n\n # Test 1: Check bucket exists and is accessible\n print(\"1. 
Testing bucket access...\")\n try:\n s3.head_bucket(Bucket=bucket)\n print(\" Bucket exists and is accessible\")\n except ClientError as e:\n error_code = e.response[\"Error\"][\"Code\"]\n if error_code == \"404\":\n print(f\" Bucket not found: {bucket}\")\n elif error_code == \"403\":\n print(f\" Access denied to bucket: {bucket}\")\n else:\n print(f\" Error: {e}\")\n return False\n except NoCredentialsError:\n print(\" No AWS credentials found\")\n print(\" Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY\")\n return False\n\n # Test 2: List objects\n print(\"2. Testing list objects permission...\")\n try:\n response = s3.list_objects_v2(\n Bucket=bucket,\n Prefix=prefix,\n MaxKeys=10\n )\n count = response.get(\"KeyCount\", 0)\n print(f\" Found {count} objects (showing up to 10)\")\n\n if \"Contents\" in response:\n for obj in response[\"Contents\"][:5]:\n size = obj[\"Size\"]\n key = obj[\"Key\"]\n print(f\" - {key} ({format_size(size)})\")\n if count > 5:\n print(f\" ... and {count - 5} more\")\n except ClientError as e:\n print(f\" List objects failed: {e}\")\n return False\n\n # Test 3: Check read permission on first object\n print(\"3. Testing read permission...\")\n if \"Contents\" in response and response[\"Contents\"]:\n test_key = response[\"Contents\"][0][\"Key\"]\n try:\n s3.head_object(Bucket=bucket, Key=test_key)\n print(f\" Can read: {test_key}\")\n except ClientError as e:\n print(f\" Read permission denied: {e}\")\n return False\n else:\n print(\" No objects to test (bucket may be empty)\")\n\n # Test 4: Check region\n print(\"4. 
Checking bucket region...\")\n try:\n location = s3.get_bucket_location(Bucket=bucket)\n region = location.get(\"LocationConstraint\") or \"us-east-1\"\n print(f\" Bucket region: {region}\")\n except ClientError:\n print(\" Could not determine region\")\n\n print(\"\")\n print(\"Connection test PASSED\")\n return True\n\n\ndef format_size(size: int) -> str:\n \"\"\"Format byte size to human readable.\"\"\"\n for unit in [\"B\", \"KB\", \"MB\", \"GB\"]:\n if size \u003c 1024:\n return f\"{size:.1f} {unit}\"\n size /= 1024\n return f\"{size:.1f} TB\"\n\n\ndef main():\n parser = argparse.ArgumentParser(\n description=\"Test S3 bucket connectivity\"\n )\n parser.add_argument(\n \"--bucket\",\n required=True,\n help=\"S3 bucket name\"\n )\n parser.add_argument(\n \"--prefix\",\n default=\"\",\n help=\"Object prefix to filter (optional)\"\n )\n parser.add_argument(\n \"--profile\",\n help=\"AWS profile name (optional)\"\n )\n\n args = parser.parse_args()\n\n success = test_connection(\n bucket=args.bucket,\n prefix=args.prefix,\n profile=args.profile\n )\n\n sys.exit(0 if success else 1)\n\n\nif __name__ == \"__main__\":\n main()\n","content_type":"text/x-python; charset=utf-8","language":"python","size":4320,"content_sha256":"54d5f35ccd891b4cb18938e281712038439dd7f56cd96dcb7cc5367bd697cf97"},{"filename":"scripts/validate_csv_schema.py","content":"#!/usr/bin/env python3\n\"\"\"\nCSV Schema Validation Tool\n\nValidates CSV files against expected schema before ingestion.\nChecks column names, data types, and value constraints.\n\nUsage:\n python validate_csv_schema.py --file data.csv --schema schema.json\n python validate_csv_schema.py --file data.csv --columns id:int,name:str,amount:float\n\"\"\"\n\nimport argparse\nimport json\nimport sys\nfrom pathlib import Path\nfrom typing import Dict, List, Any\n\ntry:\n import polars as pl\nexcept ImportError:\n print(\"Error: polars library not installed\")\n print(\"Install with: pip install polars\")\n sys.exit(1)\n\n\ndef 
parse_column_spec(spec: str) -> Dict[str, str]:\n \"\"\"Parse column:type specification.\"\"\"\n columns = {}\n for item in spec.split(\",\"):\n name, dtype = item.split(\":\")\n columns[name.strip()] = dtype.strip()\n return columns\n\n\ndef validate_columns(df: pl.DataFrame, expected: Dict[str, str]) -> List[str]:\n \"\"\"Validate column names and types.\"\"\"\n errors = []\n\n # Check for missing columns\n for col in expected:\n if col not in df.columns:\n errors.append(f\"Missing column: {col}\")\n\n # Check for extra columns\n for col in df.columns:\n if col not in expected:\n errors.append(f\"Unexpected column: {col}\")\n\n # Check types\n type_map = {\n \"int\": [pl.Int8, pl.Int16, pl.Int32, pl.Int64],\n \"float\": [pl.Float32, pl.Float64],\n \"str\": [pl.Utf8, pl.String],\n \"bool\": [pl.Boolean],\n \"date\": [pl.Date],\n \"datetime\": [pl.Datetime],\n }\n\n for col, expected_type in expected.items():\n if col in df.columns:\n actual_type = df[col].dtype\n valid_types = type_map.get(expected_type, [])\n\n if valid_types and actual_type not in valid_types:\n errors.append(\n f\"Column '{col}': expected {expected_type}, got {actual_type}\"\n )\n\n return errors\n\n\ndef validate_nulls(df: pl.DataFrame, required: List[str]) -> List[str]:\n \"\"\"Check for null values in required columns.\"\"\"\n errors = []\n\n for col in required:\n if col in df.columns:\n null_count = df[col].null_count()\n if null_count > 0:\n errors.append(f\"Column '{col}' has {null_count} null values\")\n\n return errors\n\n\ndef validate_unique(df: pl.DataFrame, unique: List[str]) -> List[str]:\n \"\"\"Check for unique constraints.\"\"\"\n errors = []\n\n for col in unique:\n if col in df.columns:\n total = len(df)\n unique_count = df[col].n_unique()\n if unique_count \u003c total:\n errors.append(\n f\"Column '{col}' has {total - unique_count} duplicate values\"\n )\n\n return errors\n\n\ndef main():\n parser = argparse.ArgumentParser(\n description=\"Validate CSV file against 
schema\"\n )\n parser.add_argument(\n \"--file\",\n type=Path,\n required=True,\n help=\"CSV file to validate\"\n )\n parser.add_argument(\n \"--schema\",\n type=Path,\n help=\"JSON schema file\"\n )\n parser.add_argument(\n \"--columns\",\n help=\"Column spec (e.g., id:int,name:str,amount:float)\"\n )\n parser.add_argument(\n \"--required\",\n help=\"Comma-separated list of required (non-null) columns\"\n )\n parser.add_argument(\n \"--unique\",\n help=\"Comma-separated list of unique columns\"\n )\n parser.add_argument(\n \"--sample\",\n type=int,\n default=1000,\n help=\"Number of rows to sample for validation (default: 1000)\"\n )\n\n args = parser.parse_args()\n\n # Load CSV\n try:\n df = pl.read_csv(args.file, n_rows=args.sample)\n print(f\"Loaded {len(df)} rows from {args.file}\")\n print(f\"Columns: {df.columns}\")\n print(\"\")\n except Exception as e:\n print(f\"Error reading CSV: {e}\")\n sys.exit(1)\n\n all_errors = []\n\n # Load schema\n if args.schema:\n with open(args.schema) as f:\n schema = json.load(f)\n expected_columns = schema.get(\"columns\", {})\n required = schema.get(\"required\", [])\n unique = schema.get(\"unique\", [])\n elif args.columns:\n expected_columns = parse_column_spec(args.columns)\n required = args.required.split(\",\") if args.required else []\n unique = args.unique.split(\",\") if args.unique else []\n else:\n print(\"Error: Either --schema or --columns must be provided\")\n sys.exit(1)\n\n # Validate columns\n print(\"Validating columns...\")\n all_errors.extend(validate_columns(df, expected_columns))\n\n # Validate nulls\n if required:\n print(\"Checking required columns...\")\n all_errors.extend(validate_nulls(df, required))\n\n # Validate unique\n if unique:\n print(\"Checking unique constraints...\")\n all_errors.extend(validate_unique(df, unique))\n\n # Print results\n print(\"\")\n if all_errors:\n print(\"Validation FAILED:\")\n for error in all_errors:\n print(f\" - {error}\")\n sys.exit(1)\n else:\n 
print(\"Validation PASSED\")\n print(\"\")\n print(\"Schema summary:\")\n for col in df.columns:\n print(f\" {col}: {df[col].dtype}\")\n sys.exit(0)\n\n\nif __name__ == \"__main__\":\n main()\n","content_type":"text/x-python; charset=utf-8","language":"python","size":5325,"content_sha256":"bbfc67c3fc42f8b85b35a5e8286d958f4465f406261086e2a53b25337d5fa1b1"}],"content_json":{"type":"doc","content":[{"type":"heading","attrs":{"level":1},"content":[{"text":"Data Ingestion Patterns","type":"text"}]},{"type":"paragraph","content":[{"text":"This skill provides patterns for getting data INTO systems from external sources.","type":"text"}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"When to Use This Skill","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Importing CSV, JSON, Parquet, or Excel files","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Loading data from S3, GCS, or Azure Blob storage","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Consuming REST/GraphQL API feeds","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Building ETL/ELT pipelines","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Database migration and CDC (Change Data Capture)","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Streaming data ingestion from Kafka/Kinesis","type":"text"}]}]}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Ingestion Pattern Decision Tree","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":""},"content":[{"text":"What is your data source?\n├── Cloud Storage (S3, GCS, Azure) → See cloud-storage.md\n├── Files (CSV, JSON, Parquet) → See file-formats.md\n├── REST/GraphQL APIs → See api-feeds.md\n├── Streaming (Kafka, Kinesis) → See streaming-sources.md\n├── 
Legacy Database → See database-migration.md\n└── Need full ETL framework → See etl-tools.md","type":"text"}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Quick Start by Language","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Python (Recommended for ETL)","type":"text"}]},{"type":"paragraph","content":[{"text":"dlt (data load tool) - Modern Python ETL:","type":"text","marks":[{"type":"strong"}]}]},{"type":"code_block","attrs":{"wrap":false,"language":"python"},"content":[{"text":"import dlt\nimport requests\n\n# Define a source\n@dlt.source\ndef github_source(repo: str):\n @dlt.resource(write_disposition=\"merge\", primary_key=\"id\")\n def issues():\n response = requests.get(f\"https://api.github.com/repos/{repo}/issues\")\n yield response.json()\n return issues\n\n# Load to destination\npipeline = dlt.pipeline(\n pipeline_name=\"github_issues\",\n destination=\"postgres\", # or duckdb, bigquery, snowflake\n dataset_name=\"github_data\"\n)\n\nload_info = pipeline.run(github_source(\"owner/repo\"))\nprint(load_info)","type":"text"}]},{"type":"paragraph","content":[{"text":"Polars for file processing (faster than pandas):","type":"text","marks":[{"type":"strong"}]}]},{"type":"code_block","attrs":{"wrap":false,"language":"python"},"content":[{"text":"import polars as pl\n\n# Read CSV with schema inference\ndf = pl.read_csv(\"data.csv\")\n\n# Read Parquet (columnar, efficient)\ndf = pl.read_parquet(\"s3://bucket/data.parquet\")\n\n# Read JSON lines\ndf = pl.read_ndjson(\"events.jsonl\")\n\n# Write to database\ndf.write_database(\n table_name=\"events\",\n connection=\"postgresql://user:pass@localhost/db\",\n if_table_exists=\"append\"\n)","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"TypeScript/Node.js","type":"text"}]},{"type":"paragraph","content":[{"text":"S3 
ingestion:","type":"text","marks":[{"type":"strong"}]}]},{"type":"code_block","attrs":{"wrap":false,"language":"typescript"},"content":[{"text":"import { S3Client, GetObjectCommand } from \"@aws-sdk/client-s3\";\nimport { parse } from \"csv-parse/sync\";\n\nconst s3 = new S3Client({ region: \"us-east-1\" });\n\nasync function ingestFromS3(bucket: string, key: string) {\n const response = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));\n const body = await response.Body?.transformToString();\n\n // Parse CSV\n const records = parse(body, { columns: true, skip_empty_lines: true });\n\n // Insert to database\n await db.insert(eventsTable).values(records);\n}","type":"text"}]},{"type":"paragraph","content":[{"text":"API feed polling:","type":"text","marks":[{"type":"strong"}]}]},{"type":"code_block","attrs":{"wrap":false,"language":"typescript"},"content":[{"text":"import { Hono } from \"hono\";\n\n// Webhook receiver for real-time ingestion\nconst app = new Hono();\n\napp.post(\"/webhooks/stripe\", async (c) => {\n const event = await c.req.json();\n\n // Validate webhook signature\n const signature = c.req.header(\"stripe-signature\");\n // ... 
validation logic\n\n // Ingest event\n await db.insert(stripeEventsTable).values({\n eventId: event.id,\n type: event.type,\n data: event.data,\n receivedAt: new Date()\n });\n\n return c.json({ received: true });\n});","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Rust","type":"text"}]},{"type":"paragraph","content":[{"text":"High-performance file ingestion:","type":"text","marks":[{"type":"strong"}]}]},{"type":"code_block","attrs":{"wrap":false,"language":"rust"},"content":[{"text":"use polars::prelude::*;\nuse aws_sdk_s3::Client;\n\nasync fn ingest_parquet(client: &Client, bucket: &str, key: &str) -> Result\u003cDataFrame> {\n // Download from S3\n let resp = client.get_object()\n .bucket(bucket)\n .key(key)\n .send()\n .await?;\n\n let bytes = resp.body.collect().await?.into_bytes();\n\n // Parse with Polars\n let df = ParquetReader::new(Cursor::new(bytes))\n .finish()?;\n\n Ok(df)\n}","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Go","type":"text"}]},{"type":"paragraph","content":[{"text":"Concurrent file processing:","type":"text","marks":[{"type":"strong"}]}]},{"type":"code_block","attrs":{"wrap":false,"language":"go"},"content":[{"text":"package main\n\nimport (\n \"context\"\n \"encoding/csv\"\n \"github.com/aws/aws-sdk-go-v2/service/s3\"\n)\n\nfunc ingestCSV(ctx context.Context, client *s3.Client, bucket, key string) error {\n resp, err := client.GetObject(ctx, &s3.GetObjectInput{\n Bucket: &bucket,\n Key: &key,\n })\n if err != nil {\n return err\n }\n defer resp.Body.Close()\n\n reader := csv.NewReader(resp.Body)\n records, err := reader.ReadAll()\n if err != nil {\n return err\n }\n\n // Batch insert to database\n return batchInsert(ctx, records)\n}","type":"text"}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Ingestion Patterns","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"1. 
Batch Ingestion (Files/Storage)","type":"text"}]},{"type":"paragraph","content":[{"text":"For periodic bulk loads:","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":""},"content":[{"text":"Source → Extract → Transform → Load → Validate\n ↓ ↓ ↓ ↓ ↓\n S3 Download Clean/Map Insert Count check","type":"text"}]},{"type":"paragraph","content":[{"text":"Key considerations:","type":"text","marks":[{"type":"strong"}]}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Use chunked reading for large files (>100MB)","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Implement idempotency with checksums","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Track file processing state","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Handle partial failures","type":"text"}]}]}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"2. 
Streaming Ingestion (Real-time)","type":"text"}]},{"type":"paragraph","content":[{"text":"For continuous data flow:","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":""},"content":[{"text":"Source → Buffer → Process → Load → Ack\n ↓ ↓ ↓ ↓ ↓\nKafka In-memory Transform DB Commit offset","type":"text"}]},{"type":"paragraph","content":[{"text":"Key considerations:","type":"text","marks":[{"type":"strong"}]}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"At-least-once vs exactly-once semantics","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Backpressure handling","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Dead letter queues for failures","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Checkpoint management","type":"text"}]}]}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"3. 
API Polling (Feeds)","type":"text"}]},{"type":"paragraph","content":[{"text":"For external API data:","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":""},"content":[{"text":"Schedule → Fetch → Dedupe → Load → Update cursor\n ↓ ↓ ↓ ↓ ↓\n Cron API call By ID Insert Last timestamp","type":"text"}]},{"type":"paragraph","content":[{"text":"Key considerations:","type":"text","marks":[{"type":"strong"}]}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Rate limiting and backoff","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Incremental loading (cursors, timestamps)","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"API pagination handling","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Retry with exponential backoff","type":"text"}]}]}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"4. 
Change Data Capture (CDC)","type":"text"}]},{"type":"paragraph","content":[{"text":"For database replication:","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":""},"content":[{"text":"Source DB → Capture changes → Transform → Target DB\n ↓ ↓ ↓ ↓\n Postgres Debezium/WAL Map schema Insert/Update","type":"text"}]},{"type":"paragraph","content":[{"text":"Key considerations:","type":"text","marks":[{"type":"strong"}]}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Initial snapshot + streaming changes","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Schema evolution handling","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Ordering guarantees","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Conflict resolution","type":"text"}]}]}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Library Recommendations","type":"text"}]},{"type":"table","attrs":{"layout":null},"content":[{"type":"tr","content":[{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Use 
Case","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Python","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"TypeScript","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Rust","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Go","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"ETL Framework","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"dlt, Meltano, Dagster","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"-","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"-","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"-","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Cloud Storage","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"boto3, gcsfs, 
adlfs","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"@aws-sdk/","type":"text"},{"text":", @google-cloud/","type":"text","marks":[{"type":"em"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"aws-sdk-s3, object_store","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"aws-sdk-go-v2","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"File Processing","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"polars, pandas, pyarrow","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"papaparse, xlsx, parquetjs","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"polars-rs, arrow-rs","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"encoding/csv, parquet-go","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Streaming","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"confluent-kafka, 
aiokafka","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"kafkajs","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"rdkafka-rs","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"franz-go, sarama","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"CDC","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Debezium, pg_logical","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"-","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"-","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"-","type":"text"}]}]}]}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Reference Documentation","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"references/cloud-storage.md","type":"text","marks":[{"type":"code_inline"}]},{"text":" - S3, GCS, Azure Blob patterns","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"references/file-formats.md","type":"text","marks":[{"type":"code_inline"}]},{"text":" - CSV, JSON, Parquet, Excel 
handling","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"references/api-feeds.md","type":"text","marks":[{"type":"code_inline"}]},{"text":" - REST polling, webhooks, GraphQL subscriptions","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"references/streaming-sources.md","type":"text","marks":[{"type":"code_inline"}]},{"text":" - Kafka, Kinesis, Pub/Sub","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"references/database-migration.md","type":"text","marks":[{"type":"code_inline"}]},{"text":" - Schema migration, CDC patterns","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"references/etl-tools.md","type":"text","marks":[{"type":"code_inline"}]},{"text":" - dlt, Meltano, Airbyte, Fivetran","type":"text"}]}]}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Scripts","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"scripts/validate_csv_schema.py","type":"text","marks":[{"type":"code_inline"}]},{"text":" - Validate CSV against expected schema","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"scripts/test_s3_connection.py","type":"text","marks":[{"type":"code_inline"}]},{"text":" - Test S3 bucket connectivity","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"scripts/generate_dlt_pipeline.py","type":"text","marks":[{"type":"code_inline"}]},{"text":" - Generate dlt pipeline scaffold","type":"text"}]}]}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Chaining with Database Skills","type":"text"}]},{"type":"paragraph","content":[{"text":"After ingestion, chain to appropriate database 
skill:","type":"text"}]},{"type":"table","attrs":{"layout":null},"content":[{"type":"tr","content":[{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Destination","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Chain to Skill","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"PostgreSQL, MySQL","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"databases-relational","type":"text","marks":[{"type":"code_inline"}]}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"MongoDB, DynamoDB","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"databases-document","type":"text","marks":[{"type":"code_inline"}]}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Qdrant, Pinecone","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"databases-vector","type":"text","marks":[{"type":"code_inline"}]},{"text":" (after embedding)","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"ClickHouse, 
TimescaleDB","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"databases-timeseries","type":"text","marks":[{"type":"code_inline"}]}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Neo4j","type":"text"}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"databases-graph","type":"text","marks":[{"type":"code_inline"}]}]}]}]}]},{"type":"paragraph","content":[{"text":"For vector databases, chain through ","type":"text"},{"text":"ai-data-engineering","type":"text","marks":[{"type":"code_inline"}]},{"text":" for embedding:","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":""},"content":[{"text":"ingesting-data → ai-data-engineering → databases-vector","type":"text"}]},{"type":"hr","attrs":{"markup":"---"}}]},"metadata":{"date":"2026-06-05","name":"ingesting-data","author":"@skillopedia","source":{"stars":368,"repo_name":"ai-design-components","origin_url":"https://github.com/ancoleman/ai-design-components/blob/HEAD/skills/ingesting-data/SKILL.md","repo_owner":"ancoleman","body_sha256":"5edcf6b9921b0fe55d3f31e3f4e8ce00f2fe21530b4e247b46094f37687822c3","cluster_key":"4e316951bebc44c3cacc9c7bae8cdee69fd8301ba8cc141492c39bcde888e1e9","clean_bundle":{"format":"clean-skill-bundle-v1","source":"ancoleman/ai-design-components/skills/ingesting-data/SKILL.md","attachments":[{"id":"f3fb0dfe-253f-5184-898a-87ad3f8cfa88","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/f3fb0dfe-253f-5184-898a-87ad3f8cfa88/attachment.yaml","path":"outputs.yaml","size":7351,"sha256":"98bb61179e11ee65ba5fe3992b5b20d6ef877fa91ca24b5aafb6ea4b94d6f079","contentType":"application/yaml; 
charset=utf-8"},{"id":"bc277833-d77b-5f1b-b37b-bfdada45427d","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/bc277833-d77b-5f1b-b37b-bfdada45427d/attachment.md","path":"references/api-feeds.md","size":6023,"sha256":"5760942d54309439834b5ac1f394af8601a0630b742f346f02a7a9114a77529f","contentType":"text/markdown; charset=utf-8"},{"id":"491ec5f6-dd99-5017-a2b7-947c0bcd7128","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/491ec5f6-dd99-5017-a2b7-947c0bcd7128/attachment.md","path":"references/cloud-storage.md","size":4032,"sha256":"54db0a51c368e5547b22420e6108bb6fef707918dd1d47a2e5f75a741f112d3b","contentType":"text/markdown; charset=utf-8"},{"id":"dd8cb038-2ad9-5ff8-9bcb-f5252325eb4b","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/dd8cb038-2ad9-5ff8-9bcb-f5252325eb4b/attachment.md","path":"references/database-migration.md","size":6500,"sha256":"0074e66156289360d42817ac69d143c4b718fe81248655c57c2ba12dde60cebb","contentType":"text/markdown; charset=utf-8"},{"id":"3cd5f6e4-dd6c-5bff-a0b7-6aeb45b047e9","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/3cd5f6e4-dd6c-5bff-a0b7-6aeb45b047e9/attachment.md","path":"references/etl-tools.md","size":5725,"sha256":"0b635f6b0c9afa378c86e86d7d12f64bea4029cdd1383c774462ccbdc956a6c2","contentType":"text/markdown; charset=utf-8"},{"id":"c9f85027-a433-520d-a8cd-0c138f91daf2","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/c9f85027-a433-520d-a8cd-0c138f91daf2/attachment.md","path":"references/file-formats.md","size":4194,"sha256":"5eab7f14b836a202e7e7edb43aab75db52ead5b8e3cb2b4692c21d2243848c63","contentType":"text/markdown; charset=utf-8"},{"id":"1abec664-7d58-5e41-9088-31bf32f41ec1","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/1abec664-7d58-5e41-9088-31bf32f41ec1/attachment.md","path":"references/streaming-sources.md","size":6852,"sha256":"1b20afed902507c178dc50767b8cd2c6804bf748e7f9e98474028536903c87a0","contentType":"text/markdown; 
charset=utf-8"},{"id":"b117a34b-e407-581e-9a08-8d42cb5fe8f3","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/b117a34b-e407-581e-9a08-8d42cb5fe8f3/attachment.py","path":"scripts/generate_dlt_pipeline.py","size":7517,"sha256":"278c9f40c088094b914f3e8f37dbdd4cda2d96243b1b21f1ad795192dfd8626e","contentType":"text/x-python; charset=utf-8"},{"id":"931de7cf-157f-516c-a9be-fa6f76156b2a","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/931de7cf-157f-516c-a9be-fa6f76156b2a/attachment.py","path":"scripts/test_s3_connection.py","size":4320,"sha256":"54d5f35ccd891b4cb18938e281712038439dd7f56cd96dcb7cc5367bd697cf97","contentType":"text/x-python; charset=utf-8"},{"id":"75b88745-8add-5c4b-87a9-5bbde74c239b","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/75b88745-8add-5c4b-87a9-5bbde74c239b/attachment.py","path":"scripts/validate_csv_schema.py","size":5325,"sha256":"bbfc67c3fc42f8b85b35a5e8286d958f4465f406261086e2a53b25337d5fa1b1","contentType":"text/x-python; charset=utf-8"}],"bundle_sha256":"f35ae566130fb3102bf1af19dcb611ccb71fc0b4238bc5c3cee3c713a0fc146f","attachment_count":10,"text_attachments":10,"attachment_storage":"skillopedia-attachments-v1","binary_attachments":0,"excluded_attachments":[]},"cluster_size":1,"skill_md_path":"skills/ingesting-data/SKILL.md","import_metadata":{"date":"2026-06-05","author":"@skillopedia","version":"v1","category":"web-development","category_label":"Web"},"exact_dupes_collapsed_into_this":0},"version":"v1","category":"web-development","import_tag":"clean-skills-v1","description":"Data ingestion patterns for loading data from cloud storage, APIs, files, and streaming sources into databases. Use when importing CSV/JSON/Parquet files, pulling from S3/GCS buckets, consuming API feeds, or building ETL pipelines."}},"renderedAt":1782979321911}
