Detecting Command and Control Over DNS When to Use - Investigating suspected DNS tunneling used for C2 communication or data exfiltration - Analyzing DNS query logs for signs of encoded payloads in subdomain strings - Classifying domains as DGA-generated vs. legitimate using statistical or ML methods - Detecting DNS beaconing patterns (regular intervals, consistent query sizes) - Hunting for Iodine, dnscat2, dns2tcp, Cobalt Strike DNS, or Sliver DNS traffic - Monitoring TXT record abuse for command delivery or staged payload download - Building DNS anomaly detection rules for SOC/SIEM deploym…

)\n if b64_pattern.match(txt_data.strip()):\n findings[\"indicators\"].append({\n \"type\": \"base64_encoded\",\n \"detail\": \"Content matches base64 pattern\",\n \"severity\": \"high\",\n })\n try:\n decoded = base64.b64decode(txt_data.strip())\n preview = decoded[:200]\n\n # Check for PE header (MZ)\n if preview[:2] == b'MZ':\n findings[\"indicators\"].append({\n \"type\": \"pe_executable\",\n \"detail\": \"Decoded base64 contains PE executable (MZ header)\",\n \"severity\": \"critical\",\n })\n\n # Check for ELF header\n if preview[:4] == b'\\x7fELF':\n findings[\"indicators\"].append({\n \"type\": \"elf_executable\",\n \"detail\": \"Decoded base64 contains ELF executable\",\n \"severity\": \"critical\",\n })\n\n # Check for PowerShell patterns\n decoded_str = decoded.decode(\"utf-8\", errors=\"ignore\")\n ps_patterns = [\n r\"Invoke-Expression\",\n r\"IEX\\s*\\(\",\n r\"New-Object\\s+System\\.Net\",\n r\"DownloadString\",\n r\"FromBase64String\",\n r\"Start-Process\",\n r\"\\-enc\\s\",\n r\"powershell\\s.*\\-e\\s\",\n ]\n for pattern in ps_patterns:\n if re.search(pattern, decoded_str, re.IGNORECASE):\n findings[\"indicators\"].append({\n \"type\": \"powershell_stager\",\n \"detail\": f\"Decoded content contains PowerShell pattern: {pattern}\",\n \"severity\": \"critical\",\n })\n break\n\n findings[\"decoded_preview\"] = repr(preview[:100])\n\n except Exception:\n pass\n\n # Known C2 TXT patterns\n cobalt_pattern = re.compile(r'^[a-f0-9]{32,}

Detecting Command and Control Over DNS When to Use - Investigating suspected DNS tunneling used for C2 communication or data exfiltration - Analyzing DNS query logs for signs of encoded payloads in subdomain strings - Classifying domains as DGA-generated vs. legitimate using statistical or ML methods - Detecting DNS beaconing patterns (regular intervals, consistent query sizes) - Hunting for Iodine, dnscat2, dns2tcp, Cobalt Strike DNS, or Sliver DNS traffic - Monitoring TXT record abuse for command delivery or staged payload download - Building DNS anomaly detection rules for SOC/SIEM deploym…

, re.IGNORECASE)\n if cobalt_pattern.match(txt_data.strip()):\n findings[\"indicators\"].append({\n \"type\": \"hex_encoded_payload\",\n \"detail\": \"Pure hex string in TXT record - possible Cobalt Strike beacon config\",\n \"severity\": \"high\",\n })\n\n # Multiple concatenated base64 blocks (common in staged delivery)\n b64_blocks = re.findall(r'[A-Za-z0-9+/]{50,}={0,2}', txt_data)\n if len(b64_blocks) > 3:\n findings[\"indicators\"].append({\n \"type\": \"multi_block_payload\",\n \"detail\": f\"{len(b64_blocks)} base64 blocks found - possible staged payload\",\n \"severity\": \"high\",\n })\n\n # Check for known legitimate TXT patterns to reduce false positives\n legitimate_patterns = [\n r'^v=spf1\\s', # SPF record\n r'^v=DKIM1', # DKIM record\n r'^v=DMARC1', # DMARC record\n r'^google-site-verification=',\n r'^MS=', # Microsoft domain verification\n r'^docusign=',\n r'^apple-domain-verification=',\n r'^facebook-domain-verification=',\n r'^_globalsign-domain-verification=',\n ]\n for pattern in legitimate_patterns:\n if re.match(pattern, txt_data, re.IGNORECASE):\n findings[\"indicators\"] = []\n findings[\"legitimate\"] = True\n return findings\n\n findings[\"suspicious\"] = len(findings[\"indicators\"]) > 0\n return findings\n\n\ndef analyze_txt_records_bulk(records):\n \"\"\"Analyze a batch of DNS TXT records.\"\"\"\n results = []\n for record in records:\n domain = record.get(\"domain\", record.get(\"query\", \"\"))\n txt_data = record.get(\"txt\", record.get(\"answer\", \"\"))\n if txt_data:\n finding = analyze_txt_record(txt_data, domain)\n if finding[\"suspicious\"]:\n results.append(finding)\n\n results.sort(\n key=lambda x: max((i.get(\"severity_score\", 0) for i in x[\"indicators\"]),\n default=0),\n reverse=True,\n )\n return results\n```\n\n### Step 4: DGA Domain Classification with Machine Learning\n\nTrain a classifier to distinguish DGA-generated domains from legitimate ones:\n\n```python\n#!/usr/bin/env python3\n\"\"\"\nDGA domain classification using character-level feature extraction and ML.\n\nFeatures extracted per domain:\n - Shannon entropy of the domain string\n - Domain length\n - Digit ratio, consonant ratio, vowel ratio\n - Longest consecutive consonant sequence\n - N-gram frequency deviation from English\n - Number of distinct characters\n - Presence of dictionary words\n\"\"\"\n\nimport math\nimport re\nimport string\nfrom collections import Counter\n\nimport numpy as np\n\ntry:\n from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier\n from sklearn.model_selection import train_test_split, cross_val_score\n from sklearn.metrics import classification_report, confusion_matrix\n from sklearn.preprocessing import StandardScaler\n HAS_SKLEARN = True\nexcept ImportError:\n HAS_SKLEARN = False\n\n\n# English language character bigram frequencies (normalized, top bigrams)\n# Source: Peter Norvig's English letter frequency analysis\nENGLISH_BIGRAMS = {\n \"th\": 0.0356, \"he\": 0.0307, \"in\": 0.0243, \"er\": 0.0205,\n \"an\": 0.0199, \"re\": 0.0185, \"on\": 0.0176, \"at\": 0.0149,\n \"en\": 0.0145, \"nd\": 0.0135, \"ti\": 0.0134, \"es\": 0.0134,\n \"or\": 0.0128, \"te\": 0.0120, \"of\": 0.0117, \"ed\": 0.0117,\n \"is\": 0.0113, \"it\": 0.0112, \"al\": 0.0109, \"ar\": 0.0107,\n \"st\": 0.0105, \"to\": 0.0104, \"nt\": 0.0104, \"ng\": 0.0095,\n \"se\": 0.0093, \"ha\": 0.0093, \"as\": 0.0087, \"ou\": 0.0087,\n \"io\": 0.0083, \"le\": 0.0083, \"ve\": 0.0083, \"co\": 0.0079,\n \"me\": 0.0079, \"de\": 0.0076, \"hi\": 0.0076, \"ri\": 0.0073,\n \"ro\": 0.0073, \"ic\": 0.0070, \"ne\": 0.0069, \"ea\": 0.0069,\n}\n\nVOWELS = set(\"aeiou\")\nCONSONANTS = set(\"bcdfghjklmnpqrstvwxyz\")\n\n\ndef extract_domain_features(domain):\n \"\"\"Extract numerical features from a domain name for ML classification.\"\"\"\n domain = domain.lower().strip(\".\")\n\n # Remove TLD for analysis (focus on SLD + subdomain)\n parts = domain.split(\".\")\n if len(parts) > 1:\n analysis_str = \".\".join(parts[:-1]) # Drop TLD\n else:\n analysis_str = domain\n\n # Remove dots for character analysis\n flat = analysis_str.replace(\".\", \"\")\n length = len(flat)\n\n if length == 0:\n return None\n\n # 1. Shannon entropy\n entropy = 0.0\n counter = Counter(flat)\n for count in counter.values():\n p = count / length\n entropy -= p * math.log2(p)\n\n # 2. Character ratios\n digit_count = sum(1 for c in flat if c.isdigit())\n vowel_count = sum(1 for c in flat if c in VOWELS)\n consonant_count = sum(1 for c in flat if c in CONSONANTS)\n special_count = sum(1 for c in flat if c == '-')\n\n digit_ratio = digit_count / length\n vowel_ratio = vowel_count / length\n consonant_ratio = consonant_count / length\n\n # 3. Longest consecutive consonant run\n max_consonant_run = 0\n current_run = 0\n for c in flat:\n if c in CONSONANTS:\n current_run += 1\n max_consonant_run = max(max_consonant_run, current_run)\n else:\n current_run = 0\n\n # 4. Distinct character count and ratio\n distinct_chars = len(set(flat))\n distinct_ratio = distinct_chars / length\n\n # 5. Bigram frequency deviation from English\n bigrams = [flat[i:i+2] for i in range(len(flat) - 1)]\n if bigrams:\n english_score = sum(\n ENGLISH_BIGRAMS.get(bg, 0) for bg in bigrams\n ) / len(bigrams)\n else:\n english_score = 0\n\n # 6. Number of labels (dots + 1)\n label_count = len(parts)\n\n # 7. Hex character ratio (common in DGA)\n hex_chars = set(\"0123456789abcdef\")\n hex_ratio = sum(1 for c in flat if c in hex_chars) / length\n\n # 8. Digit-letter transitions (DGA domains mix digits and letters)\n transitions = 0\n for i in range(1, len(flat)):\n if (flat[i].isdigit() != flat[i-1].isdigit()):\n transitions += 1\n transition_ratio = transitions / max(length - 1, 1)\n\n # 9. Repeated character ratio\n if length > 1:\n repeats = sum(1 for i in range(1, len(flat)) if flat[i] == flat[i-1])\n repeat_ratio = repeats / (length - 1)\n else:\n repeat_ratio = 0\n\n return {\n \"domain\": domain,\n \"length\": length,\n \"entropy\": round(entropy, 4),\n \"digit_ratio\": round(digit_ratio, 4),\n \"vowel_ratio\": round(vowel_ratio, 4),\n \"consonant_ratio\": round(consonant_ratio, 4),\n \"max_consonant_run\": max_consonant_run,\n \"distinct_chars\": distinct_chars,\n \"distinct_ratio\": round(distinct_ratio, 4),\n \"english_bigram_score\": round(english_score, 6),\n \"label_count\": label_count,\n \"hex_ratio\": round(hex_ratio, 4),\n \"transition_ratio\": round(transition_ratio, 4),\n \"repeat_ratio\": round(repeat_ratio, 4),\n \"special_count\": special_count,\n }\n\n\nFEATURE_COLUMNS = [\n \"length\", \"entropy\", \"digit_ratio\", \"vowel_ratio\", \"consonant_ratio\",\n \"max_consonant_run\", \"distinct_chars\", \"distinct_ratio\",\n \"english_bigram_score\", \"label_count\", \"hex_ratio\",\n \"transition_ratio\", \"repeat_ratio\", \"special_count\",\n]\n\n\ndef features_to_vector(features):\n \"\"\"Convert feature dict to numpy array.\"\"\"\n return np.array([features[col] for col in FEATURE_COLUMNS])\n\n\ndef train_dga_classifier(legitimate_domains, dga_domains, model_type=\"random_forest\"):\n \"\"\"\n Train a DGA classifier on labeled domain lists.\n\n Args:\n legitimate_domains: list of known-good domain strings\n dga_domains: list of known DGA domain strings\n model_type: 'random_forest' or 'gradient_boosting'\n\n Returns:\n trained model, scaler, and evaluation metrics\n \"\"\"\n if not HAS_SKLEARN:\n print(\"[ERROR] scikit-learn required: pip install scikit-learn\")\n return None, None, None\n\n # Extract features\n X_legit = []\n X_dga = []\n\n for d in legitimate_domains:\n feats = extract_domain_features(d)\n if feats:\n X_legit.append(features_to_vector(feats))\n\n for d in dga_domains:\n feats = extract_domain_features(d)\n if feats:\n X_dga.append(features_to_vector(feats))\n\n if not X_legit or not X_dga:\n print(\"[ERROR] Insufficient feature data\")\n return None, None, None\n\n X = np.vstack([np.array(X_legit), np.array(X_dga)])\n y = np.array([0] * len(X_legit) + [1] * len(X_dga))\n\n # Scale features\n scaler = StandardScaler()\n X_scaled = scaler.fit_transform(X)\n\n # Train/test split\n X_train, X_test, y_train, y_test = train_test_split(\n X_scaled, y, test_size=0.2, random_state=42, stratify=y\n )\n\n # Train model\n if model_type == \"gradient_boosting\":\n model = GradientBoostingClassifier(\n n_estimators=200, max_depth=6, learning_rate=0.1,\n min_samples_split=10, random_state=42,\n )\n else:\n model = RandomForestClassifier(\n n_estimators=200, max_depth=15, min_samples_split=5,\n random_state=42, n_jobs=-1,\n )\n\n model.fit(X_train, y_train)\n\n # Evaluate\n y_pred = model.predict(X_test)\n report = classification_report(y_test, y_pred, target_names=[\"legitimate\", \"dga\"],\n output_dict=True)\n cm = confusion_matrix(y_test, y_pred)\n\n # Cross-validation\n cv_scores = cross_val_score(model, X_scaled, y, cv=5, scoring=\"f1\")\n\n metrics = {\n \"accuracy\": report[\"accuracy\"],\n \"precision_dga\": report[\"dga\"][\"precision\"],\n \"recall_dga\": report[\"dga\"][\"recall\"],\n \"f1_dga\": report[\"dga\"][\"f1-score\"],\n \"precision_legit\": report[\"legitimate\"][\"precision\"],\n \"recall_legit\": report[\"legitimate\"][\"recall\"],\n \"confusion_matrix\": cm.tolist(),\n \"cv_f1_mean\": cv_scores.mean(),\n \"cv_f1_std\": cv_scores.std(),\n \"feature_importance\": dict(zip(\n FEATURE_COLUMNS,\n [round(float(x), 4) for x in model.feature_importances_]\n )),\n }\n\n print(f\"[+] Model trained: {model_type}\")\n print(f\" Accuracy: {metrics['accuracy']:.4f}\")\n print(f\" DGA F1: {metrics['f1_dga']:.4f}\")\n print(f\" DGA Recall: {metrics['recall_dga']:.4f}\")\n print(f\" CV F1 (5-fold): {metrics['cv_f1_mean']:.4f} +/- {metrics['cv_f1_std']:.4f}\")\n print(f\" Top features: \", end=\"\")\n top_feats = sorted(metrics[\"feature_importance\"].items(), key=lambda x: x[1], reverse=True)[:5]\n print(\", \".join(f\"{k}={v:.3f}\" for k, v in top_feats))\n\n return model, scaler, metrics\n\n\ndef classify_domains(domains, model, scaler):\n \"\"\"Classify a list of domains as legitimate or DGA using a trained model.\"\"\"\n results = []\n for domain in domains:\n feats = extract_domain_features(domain)\n if feats is None:\n continue\n\n vec = features_to_vector(feats).reshape(1, -1)\n vec_scaled = scaler.transform(vec)\n\n prediction = model.predict(vec_scaled)[0]\n probability = model.predict_proba(vec_scaled)[0]\n\n results.append({\n \"domain\": domain,\n \"prediction\": \"dga\" if prediction == 1 else \"legitimate\",\n \"confidence\": round(float(max(probability)), 4),\n \"dga_probability\": round(float(probability[1]), 4),\n \"features\": feats,\n })\n\n return results\n```\n\n### Step 5: DNS Beaconing Pattern Detection\n\nIdentify periodic DNS query patterns indicative of C2 check-ins:\n\n```python\n#!/usr/bin/env python3\n\"\"\"DNS beaconing detection - identifies periodic C2 check-in patterns.\"\"\"\n\nimport math\nfrom collections import defaultdict\nfrom datetime import datetime, timedelta\n\nimport numpy as np\n\n\ndef parse_timestamp(ts_str):\n \"\"\"Parse various timestamp formats to datetime.\"\"\"\n formats = [\n \"%Y-%m-%dT%H:%M:%S.%fZ\",\n \"%Y-%m-%dT%H:%M:%S.%f\",\n \"%Y-%m-%dT%H:%M:%S\",\n \"%Y-%m-%d %H:%M:%S.%f\",\n \"%Y-%m-%d %H:%M:%S\",\n ]\n for fmt in formats:\n try:\n return datetime.strptime(ts_str, fmt)\n except ValueError:\n continue\n\n # Try epoch timestamp\n try:\n ts_float = float(ts_str)\n return datetime.utcfromtimestamp(ts_float)\n except (ValueError, OverflowError, OSError):\n pass\n\n return None\n\n\ndef detect_beaconing(dns_queries, min_queries=10, max_jitter_pct=25,\n min_interval_sec=10, max_interval_sec=7200):\n \"\"\"\n Detect DNS beaconing by analyzing inter-query timing intervals.\n\n Beaconing indicators:\n - Regular inter-query intervals (low standard deviation)\n - Consistent query sizes\n - Single source IP to single domain over extended period\n - Low jitter (variation in timing)\n\n Args:\n dns_queries: list of dicts with 'src_ip', 'query', 'timestamp'\n min_queries: minimum queries to analyze (default 10)\n max_jitter_pct: maximum coefficient of variation for beacon (default 25%)\n min_interval_sec: minimum beacon interval to detect (default 10s)\n max_interval_sec: maximum beacon interval to detect (default 7200s / 2hr)\n\n Returns:\n list of detected beacon patterns with confidence scores\n \"\"\"\n # Group queries by (source IP, base domain)\n groups = defaultdict(list)\n\n for q in dns_queries:\n src_ip = q.get(\"src_ip\", \"\")\n fqdn = q.get(\"query\", \"\").lower().rstrip(\".\")\n ts_str = q.get(\"timestamp\", \"\")\n\n ts = parse_timestamp(ts_str)\n if not ts or not src_ip or not fqdn:\n continue\n\n # Extract base domain (last 2 labels)\n parts = fqdn.split(\".\")\n if len(parts) >= 2:\n base_domain = \".\".join(parts[-2:])\n else:\n base_domain = fqdn\n\n groups[(src_ip, base_domain)].append(ts)\n\n beacons = []\n\n for (src_ip, base_domain), timestamps in groups.items():\n if len(timestamps) \u003c min_queries:\n continue\n\n # Sort timestamps and compute intervals\n timestamps.sort()\n intervals = [\n (timestamps[i+1] - timestamps[i]).total_seconds()\n for i in range(len(timestamps) - 1)\n ]\n\n if not intervals:\n continue\n\n intervals = np.array(intervals)\n\n # Filter out zero intervals (duplicate timestamps)\n intervals = intervals[intervals > 0]\n if len(intervals) \u003c min_queries - 1:\n continue\n\n mean_interval = np.mean(intervals)\n std_interval = np.std(intervals)\n median_interval = np.median(intervals)\n\n # Skip if interval is outside detection range\n if mean_interval \u003c min_interval_sec or mean_interval > max_interval_sec:\n continue\n\n # Coefficient of variation (jitter)\n cv = (std_interval / mean_interval * 100) if mean_interval > 0 else 100\n\n # Time span of activity\n time_span = (timestamps[-1] - timestamps[0]).total_seconds()\n hours_active = time_span / 3600\n\n # Beacon scoring\n score = 0.0\n flags = []\n\n # Low jitter = strong beacon indicator\n if cv \u003c 5:\n score += 40\n flags.append(f\"very_low_jitter:CV={cv:.1f}%\")\n elif cv \u003c 15:\n score += 30\n flags.append(f\"low_jitter:CV={cv:.1f}%\")\n elif cv \u003c max_jitter_pct:\n score += 15\n flags.append(f\"moderate_jitter:CV={cv:.1f}%\")\n else:\n continue # Too much jitter, not a beacon\n\n # Long duration increases confidence\n if hours_active > 24:\n score += 20\n flags.append(f\"persistent:{hours_active:.1f}h\")\n elif hours_active > 4:\n score += 10\n flags.append(f\"sustained:{hours_active:.1f}h\")\n\n # High query count increases confidence\n if len(timestamps) > 100:\n score += 15\n flags.append(f\"high_volume:{len(timestamps)}\")\n elif len(timestamps) > 50:\n score += 10\n flags.append(f\"moderate_volume:{len(timestamps)}\")\n\n # Common C2 intervals (60s, 120s, 300s, 600s, 900s, 1800s, 3600s)\n common_intervals = [60, 120, 300, 600, 900, 1800, 3600]\n for ci in common_intervals:\n if abs(mean_interval - ci) \u003c ci * 0.1: # Within 10% of common interval\n score += 10\n flags.append(f\"common_c2_interval:~{ci}s\")\n break\n\n beacons.append({\n \"src_ip\": src_ip,\n \"base_domain\": base_domain,\n \"query_count\": len(timestamps),\n \"mean_interval_sec\": round(mean_interval, 2),\n \"median_interval_sec\": round(median_interval, 2),\n \"std_interval_sec\": round(std_interval, 2),\n \"jitter_cv_pct\": round(cv, 2),\n \"first_seen\": timestamps[0].isoformat(),\n \"last_seen\": timestamps[-1].isoformat(),\n \"duration_hours\": round(hours_active, 2),\n \"score\": round(score, 1),\n \"flags\": flags,\n })\n\n beacons.sort(key=lambda x: x[\"score\"], reverse=True)\n return beacons\n\n\ndef print_beacon_report(beacons, top_n=20):\n \"\"\"Print formatted beacon detection report.\"\"\"\n print(\"=\" * 80)\n print(\" DNS BEACONING DETECTION REPORT\")\n print(\"=\" * 80)\n print(f\" Beacon patterns detected: {len(beacons)}\")\n print()\n\n if not beacons:\n print(\" No beaconing patterns detected.\")\n return\n\n print(f\" TOP {min(top_n, len(beacons))} BEACON CANDIDATES\")\n print(\" \" + \"-\" * 76)\n\n for b in beacons[:top_n]:\n print(f\" Score: {b['score']:.1f} | {b['src_ip']} -> {b['base_domain']}\")\n print(f\" Queries: {b['query_count']} \"\n f\"Interval: {b['mean_interval_sec']:.1f}s +/- {b['std_interval_sec']:.1f}s \"\n f\"Jitter: {b['jitter_cv_pct']:.1f}%\")\n print(f\" Active: {b['duration_hours']:.1f}h \"\n f\"({b['first_seen']} to {b['last_seen']})\")\n print(f\" Flags: {', '.join(b['flags'])}\")\n print()\n```\n\n### Step 6: Integrated DNS C2 Detection Pipeline\n\nCombine all detection methods into a unified analysis:\n\n```\nDNS C2 Detection Pipeline Architecture:\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n ┌────────────────────────────────────────────────────────┐\n │ DATA SOURCES │\n │ Zeek dns.log | Suricata EVE | Recursive Resolver │\n │ Passive DNS | PCAP capture | EDR DNS telemetry │\n └───────────────────────┬────────────────────────────────┘\n │\n ┌───────────────────────▼────────────────────────────────┐\n │ PREPROCESSING │\n │ Parse timestamps | Extract subdomains | Normalize │\n │ FQDN | Resolve base domain | Lookup in whitelist │\n └───────────────────────┬────────────────────────────────┘\n │\n ┌───────────────┼───────────────┐\n │ │ │\n ┌───────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐\n │ ENTROPY │ │ BEACONING │ │ DGA │\n │ ANALYSIS │ │ DETECTION │ │ CLASSIFIER │\n │ │ │ │ │ │\n │ Shannon ent. │ │ Interval │ │ ML model │\n │ Subdomain │ │ analysis │ │ Random │\n │ length │ │ Jitter/CV │ │ Forest or │\n │ Encoding │ │ Duration │ │ Gradient │\n │ patterns │ │ Periodicity │ │ Boosting │\n └───────┬──────┘ └──────┬──────┘ └──────┬──────┘\n │ │ │\n ┌───────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐\n │ TXT RECORD │ │ TOOL │ │ PASSIVE │\n │ PAYLOAD │ │ SIGNATURE │ │ DNS │\n │ ANALYSIS │ │ MATCHING │ │ ENRICHMENT │\n │ │ │ │ │ │\n │ Base64 decode│ │ Iodine │ │ First seen │\n │ PE/ELF detect│ │ dnscat2 │ │ Registrar │\n │ PS stager │ │ dns2tcp │ │ Age check │\n │ Size anomaly │ │ Cobalt DNS │ │ Reputation │\n └───────┬──────┘ └──────┬──────┘ └──────┬──────┘\n │ │ │\n ┌───────▼───────────────▼───────────────▼────────────────┐\n │ CORRELATION ENGINE │\n │ Combine scores from all detectors │\n │ Weighted scoring: entropy(30%) + beacon(25%) + │\n │ DGA(20%) + TXT payload(15%) + signature(10%) │\n │ Threshold: score > 60 = alert, > 40 = investigate │\n └───────────────────────┬────────────────────────────────┘\n │\n ┌───────────────────────▼────────────────────────────────┐\n │ ALERTING & RESPONSE │\n │ Generate SIEM alert with all evidence │\n │ Block domain in DNS firewall / RPZ │\n │ Isolate endpoint via EDR │\n │ Create incident ticket with IOCs │\n └────────────────────────────────────────────────────────┘\n```\n\n### Step 7: SIEM Detection Rules\n\nDeploy detection queries in your SIEM platform:\n\n```\nSplunk SPL - DNS Tunneling Detection:\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n-- High entropy subdomain queries\nindex=dns sourcetype=\"bro:dns:json\" OR sourcetype=\"suricata:dns\"\n| eval subdomain=mvindex(split(query,\".\"),0)\n| eval sub_len=len(subdomain)\n| where sub_len > 30\n| eval char_counts=mvmap(split(subdomain,\"\"),1)\n| lookup dns_entropy_lookup subdomain OUTPUT entropy\n| where entropy > 3.5\n| stats count as query_count dc(query) as unique_queries\n avg(sub_len) as avg_sub_len values(query) as sample_queries\n by src_ip, domain\n| where query_count > 20\n| sort -query_count\n\n-- DNS TXT record abuse\nindex=dns (qtype=\"TXT\" OR qtype_name=\"TXT\")\n NOT (query=\"*._domainkey.*\" OR query=\"*._dmarc.*\" OR query=\"*._spf.*\")\n| stats count as txt_queries dc(query) as unique_txt_queries\n values(query) as domains\n by src_ip\n| where txt_queries > 50\n| sort -txt_queries\n\n-- DNS beaconing (regular interval queries)\nindex=dns sourcetype=\"bro:dns:json\"\n| bin _time span=60s\n| stats count by src_ip, query, _time\n| streamstats window=10 current=t avg(count) as avg_count stdev(count) as std_count by src_ip, query\n| eval cv = if(avg_count>0, (std_count/avg_count)*100, 100)\n| where cv \u003c 20 AND avg_count > 0\n| stats count as beacon_windows avg(cv) as avg_jitter\n min(_time) as first_seen max(_time) as last_seen\n by src_ip, query\n| where beacon_windows > 10\n| sort -beacon_windows\n\n-- Unusual record type volume (NULL, KEY, SRV for tunneling)\nindex=dns (qtype_name=\"NULL\" OR qtype_name=\"KEY\" OR qtype_name=\"SRV\"\n OR qtype_name=\"CNAME\" OR qtype_name=\"MX\")\n NOT qtype_name=\"A\" NOT qtype_name=\"AAAA\" NOT qtype_name=\"PTR\"\n| stats count by src_ip, qtype_name, query\n| where count > 10\n| sort -count\n```\n\n```\nElastic KQL - DNS C2 Detection:\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n-- Long subdomain queries (potential tunneling)\ndns.question.name: * and not dns.question.name: *.in-addr.arpa\n| where length(dns.question.subdomain) > 40\n\n-- High volume DNS to single domain\nevent.dataset: \"zeek.dns\" or event.dataset: \"suricata.dns\"\n| stats count by source.ip, dns.question.registered_domain\n| where count > 500\n\n-- TXT record queries to non-standard domains\ndns.question.type: \"TXT\"\n and not dns.question.name: (*._domainkey.* or *._dmarc.* or *._spf.*)\n```\n\n```\nZeek Script - DNS Tunneling Indicator:\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n# dns_tunnel_detect.zeek\n@load base/protocols/dns\n\nmodule DNSTunnel;\n\nexport {\n redef enum Notice::Type += {\n DNS_Tunneling_Suspected,\n DNS_High_Entropy_Query,\n DNS_Excessive_TXT_Queries,\n };\n\n const entropy_threshold = 3.5 &redef;\n const subdomain_length_threshold = 40 &redef;\n const txt_query_threshold = 50 &redef;\n const tracking_interval = 5min &redef;\n}\n\nglobal txt_query_tracker: table[addr] of count &create_expire=5min &default=0;\nglobal domain_query_tracker: table[addr, string] of count &create_expire=10min &default=0;\n\nfunction shannon_entropy(s: string): double\n{\n local counts: table[string] of count;\n local total = |s|;\n\n if (total == 0) return 0.0;\n\n for (i in s)\n {\n local c = s[i];\n if (c !in counts) counts[c] = 0;\n ++counts[c];\n }\n\n local ent = 0.0;\n for (ch, cnt in counts)\n {\n local p = cnt * 1.0 / total;\n ent -= p * log2(p);\n }\n\n return ent;\n}\n\nevent dns_request(c: connection, msg: dns_msg, query: string, qtype: count,\n qclass: count)\n{\n if (|query| == 0) return;\n\n # Track TXT queries\n if (qtype == 16) # TXT\n {\n ++txt_query_tracker[c$id$orig_h];\n if (txt_query_tracker[c$id$orig_h] == txt_query_threshold)\n {\n NOTICE([\n $note=DNS_Excessive_TXT_Queries,\n $conn=c,\n $msg=fmt(\"Host %s made %d TXT queries in tracking window\",\n c$id$orig_h, txt_query_threshold),\n $identifier=cat(c$id$orig_h),\n ]);\n }\n }\n\n # Extract subdomain and check entropy\n local parts = split_string(query, /\\./);\n if (|parts| \u003c 3) return;\n\n # Subdomain = everything except last two labels\n local subdomain = \"\";\n local i = 0;\n for (idx in parts)\n {\n if (i \u003c |parts| - 2)\n subdomain += parts[idx];\n ++i;\n }\n\n if (|subdomain| > subdomain_length_threshold)\n {\n local ent = shannon_entropy(subdomain);\n if (ent > entropy_threshold)\n {\n NOTICE([\n $note=DNS_High_Entropy_Query,\n $conn=c,\n $msg=fmt(\"High entropy DNS query: entropy=%.2f len=%d query=%s\",\n ent, |subdomain|, query),\n $identifier=cat(c$id$orig_h, query),\n ]);\n }\n }\n}\n```\n\n### Step 8: Suricata Rules for Known DNS C2 Tools\n\n```\n# suricata-dns-c2.rules\n# DNS Tunneling and C2 Detection Rules\n\n# Iodine DNS tunnel detection\nalert dns any any -> any any (msg:\"ET TROJAN Iodine DNS Tunnel Activity - NULL Query\"; \\\n dns.query; content:\".\"; pcre:\"/^[a-z0-9]{50,}\\.[a-z0-9.-]+$/i\"; \\\n dns_query; content:\"|00 0a|\"; \\\n classtype:trojan-activity; sid:2030001; rev:1;)\n\n# dnscat2 DNS tunnel detection\nalert dns any any -> any any (msg:\"ET TROJAN dnscat2 DNS Tunnel - Handshake\"; \\\n dns.query; content:\"dnscat.\"; nocase; fast_pattern; \\\n classtype:trojan-activity; sid:2030002; rev:1;)\n\nalert dns any any -> any any (msg:\"ET TROJAN dnscat2 DNS Tunnel - Data Channel\"; \\\n dns.query; pcre:\"/^[a-f0-9]{16,}\\./i\"; \\\n dns_query; content:\"|00 10|\"; \\\n classtype:trojan-activity; sid:2030003; rev:1;)\n\n# Cobalt Strike DNS beacon\nalert dns any any -> any any (msg:\"ET TROJAN Cobalt Strike DNS Beacon - A Record\"; \\\n dns.query; pcre:\"/^[a-f0-9]{12,}\\.[a-z0-9.-]+$/i\"; \\\n threshold:type both, track by_src, count 20, seconds 60; \\\n classtype:trojan-activity; sid:2030004; rev:1;)\n\n# Generic DNS tunneling - high volume TXT queries to single domain\nalert dns any any -> any any (msg:\"ET POLICY Excessive TXT DNS Queries - Possible Tunneling\"; \\\n dns_query; content:\"|00 10|\"; \\\n threshold:type threshold, track by_src, count 50, seconds 300; \\\n classtype:policy-violation; sid:2030005; rev:1;)\n\n# Long subdomain query (generic tunneling indicator)\nalert dns any any -> any any (msg:\"ET POLICY Unusually Long DNS Subdomain - Possible Tunneling\"; \\\n dns.query; pcre:\"/^[a-z0-9-]{52,}\\./i\"; \\\n threshold:type limit, track by_src, count 1, seconds 60; \\\n classtype:policy-violation; sid:2030006; rev:1;)\n\n# DNS query for known C2 TXT payload staging\nalert dns any any -> any any (msg:\"ET TROJAN DNS TXT Record Staged Payload Request\"; \\\n dns_query; content:\"|00 10|\"; \\\n dns.query; pcre:\"/^(stage|payload|cmd|exec|download|update|config)\\d*\\./i\"; \\\n classtype:trojan-activity; sid:2030007; rev:1;)\n```\n\n## Key Concepts\n\n| Term | Definition |\n|------|------------|\n| **DNS Tunneling** | Technique of encoding data within DNS queries and responses to create a covert communication channel, bypassing firewalls that allow DNS traffic |\n| **Shannon Entropy** | Information theory metric measuring randomness in a string; legitimate domains typically have entropy below 3.5, while encoded tunnel data exceeds 3.8-4.5 |\n| **Domain Generation Algorithm (DGA)** | Malware technique that algorithmically generates thousands of pseudo-random domain names for C2 rendezvous, making domain-based blocking impractical |\n| **DNS Beaconing** | Regular, periodic DNS queries from a compromised host to a C2 domain, identifiable by consistent inter-query intervals and low timing jitter |\n| **TXT Record Abuse** | Using DNS TXT records to deliver encoded C2 commands or staged payloads, exploiting the large payload capacity (up to 65535 bytes across multiple strings) |\n| **Iodine** | Open-source DNS tunneling tool that tunnels IPv4 traffic through DNS using NULL, TXT, or CNAME records, commonly used to bypass captive portals |\n| **dnscat2** | Encrypted C2 tool that creates a command channel over DNS, supporting file transfer, port forwarding, and shell access through DNS queries |\n| **Cobalt Strike DNS Beacon** | Commercial C2 framework's DNS communication mode that uses A, AAAA, and TXT records to receive tasks and return results via DNS resolution |\n| **Passive DNS (pDNS)** | Database of historical DNS resolution data collected by monitoring DNS traffic; used to identify infrastructure reuse and domain history |\n| **Response Policy Zone (RPZ)** | DNS firewall mechanism that allows real-time blocking of malicious domains by injecting override responses at the recursive resolver level |\n| **Coefficient of Variation** | Standard deviation divided by mean, expressed as percentage; used to measure beacon jitter -- lower CV indicates more regular (suspicious) timing |\n| **NXDOMAIN** | DNS response code indicating the queried domain does not exist; high NXDOMAIN rates from a host suggest DGA activity where most generated domains are unregistered |\n\n## Tools & Systems\n\n- **Zeek (Bro)**: Network security monitor that produces structured dns.log with query/response details for offline analysis\n- **Suricata**: IDS/IPS with DNS protocol parsing and signature-based detection of tunneling patterns\n- **tshark/Wireshark**: Packet capture and analysis tools for deep DNS protocol inspection\n- **tldextract**: Python library for accurate domain/subdomain extraction using the Public Suffix List\n- **dnspython**: Python DNS toolkit for programmatic query resolution and record parsing\n- **scikit-learn**: ML library used to train DGA classifiers (Random Forest, Gradient Boosting)\n- **Farsight DNSDB / CIRCL pDNS**: Passive DNS databases for historical domain resolution lookups\n- **DNS Response Policy Zone (RPZ)**: Recursive resolver feature for real-time DNS blocking of identified C2 domains\n- **Splunk / Elastic**: SIEM platforms for DNS log aggregation, entropy calculation, and beacon detection queries\n\n## Common Scenarios\n\n### Scenario: Investigating Suspected DNS Tunneling from an Internal Host\n\n**Context**: The SOC receives an alert from the DNS firewall showing a single internal host (10.1.5.42) making 15,000+ DNS queries to the domain `c8a3f1e2.tunnelsvc.example.com` in the past hour. All queries are TXT type with long, random-looking subdomains. Normal DNS volume for this host is ~200 queries/hour.\n\n**Approach**:\n1. Extract all DNS queries from 10.1.5.42 for the past 24 hours from Zeek dns.log\n2. Run entropy analysis on subdomain strings -- expect Shannon entropy > 4.0 for encoded tunnel data\n3. Check query timing intervals for beaconing pattern (likely sub-second for active tunnel)\n4. Examine TXT record responses for size anomalies (tunnel tools use maximum-size TXT responses)\n5. Compare subdomain patterns against known tool signatures (Iodine, dnscat2, dns2tcp)\n6. Query passive DNS for `tunnelsvc.example.com` registration date, nameserver, and historical resolutions\n7. If confirmed, add domain to DNS RPZ blocklist and isolate endpoint via EDR\n8. Capture full packet trace for forensic analysis of tunnel payload content\n\n**Pitfalls**:\n- Blocking the domain before capturing evidence (need packet captures for forensics)\n- Assuming all high-entropy DNS is malicious (CDN subdomains like Akamai can have high entropy)\n- Not checking for multiple tunnel domains (attacker may have fallback C2 channels)\n- Missing the initial compromise vector by focusing only on the DNS channel\n- Not checking other hosts for similar patterns (lateral movement may have already occurred)\n\n### Scenario: Building a DGA Detection Model for SOC Deployment\n\n**Context**: The threat intelligence team identified that a botnet family active in the industry uses DGA for C2 domain generation. The SOC needs an automated way to classify DNS queries as potentially DGA-generated and alert on matches.\n\n**Approach**:\n1. Collect training data: Tranco/Alexa top 1M for legitimate domains, DGArchive or OSINT feeds for known DGA domains\n2. Extract character-level features: entropy, length, digit ratio, consonant sequences, bigram scores\n3. Train Random Forest and Gradient Boosting classifiers, evaluate with 5-fold cross-validation\n4. Deploy the model as a scoring enrichment in the SIEM (Splunk ML Toolkit or Elastic ML)\n5. Set threshold: DGA probability > 0.85 generates alert, > 0.65 generates investigation ticket\n6. Create a whitelist of known high-entropy legitimate domains (CDNs, cloud services) to reduce false positives\n7. Retrain monthly with new DGA samples from threat intel feeds\n\n**Pitfalls**:\n- Training only on one DGA family and missing others (dictionary-based DGAs like Suppobox have low entropy)\n- Not whitelisting CDN and cloud service domains that have randomized subdomains\n- Setting the threshold too low, overwhelming the SOC with false positives\n- Not accounting for punycode/internationalized domain names in feature extraction\n- Deploying without a feedback loop for analysts to flag false positives for model retraining\n\n## Output Format\n\n```\nDNS C2 DETECTION ANALYSIS REPORT\n====================================\nAnalysis Period: 2026-03-15 00:00 to 2026-03-19 23:59\nData Source: Zeek dns.log (gateway sensor)\nTotal Queries: 14,283,501\nUnique Domains: 892,041\nHosts Analyzed: 3,847\n\nENTROPY ANALYSIS\nQueries with entropy > 3.5: 2,847 (0.02%)\nQueries with subdomain > 40 chars: 1,203 (0.008%)\nSuspicious base domains: 12\n\n [CRITICAL] tunnelsvc.example[.]com\n Queries: 15,247 Source: 10.1.5.42 Avg Entropy: 4.21\n Avg Subdomain Length: 63 Record Types: TXT (98%), A (2%)\n Tool Signature: dnscat2 (hex prefix pattern match)\n\n [HIGH] update-cdn.malicious[.]net\n Queries: 3,891 Source: 10.1.12.7 Avg Entropy: 3.87\n Avg Subdomain Length: 48 Record Types: A (60%), TXT (40%)\n Tool Signature: Cobalt Strike DNS beacon (interval pattern)\n\nBEACONING DETECTION\nBeacon patterns detected: 4\n\n Score: 85.0 10.1.5.42 -> tunnelsvc.example[.]com\n Interval: 0.5s +/- 0.1s Jitter: 8.2% Duration: 18.4h\n Queries: 15,247 Flags: very_low_jitter, persistent, high_volume\n\n Score: 72.0 10.1.12.7 -> update-cdn.malicious[.]net\n Interval: 60.2s +/- 3.1s Jitter: 5.1% Duration: 72.1h\n Queries: 3,891 Flags: very_low_jitter, persistent, common_c2_interval:~60s\n\nDGA CLASSIFICATION\nDomains classified: 892,041\nDGA predictions (>0.85 conf): 47\nDGA predictions (0.65-0.85): 183\n\n [HIGH] a8f3k2m1x9.com (DGA prob: 0.97, entropy: 3.92)\n [HIGH] j7t2p5q8w3.net (DGA prob: 0.95, entropy: 4.01)\n [HIGH] m3x8k1f6y2.org (DGA prob: 0.94, entropy: 3.88)\n\nTXT RECORD ANALYSIS\nSuspicious TXT responses: 8\nBase64 payloads detected: 3\nPowerShell stager patterns: 1\n\n [CRITICAL] cmd.staging[.]example.com\n TXT Length: 4,096 Entropy: 5.82\n Finding: Base64-encoded PowerShell stager with IEX pattern\n\nRECOMMENDED ACTIONS\n[CRITICAL] Block tunnelsvc.example[.]com and update-cdn.malicious[.]net in DNS RPZ\n[CRITICAL] Isolate hosts 10.1.5.42 and 10.1.12.7 for forensic investigation\n[HIGH] Block 47 high-confidence DGA domains in DNS firewall\n[HIGH] Investigate cmd.staging[.]example.com TXT payload staging\n[MEDIUM] Review 183 moderate-confidence DGA domains with threat intel\n[MEDIUM] Deploy Suricata rules for dnscat2 and Cobalt Strike DNS signatures\n```\n---","attachment_filenames":["references/api-reference.md","scripts/agent.py"],"attachments":[{"filename":"references/api-reference.md","content":"# DNS C2 Detection API Reference\n\n## MITRE ATT&CK Mapping\n\n| Technique | ID | Description |\n|-----------|----|-------------|\n| Application Layer Protocol: DNS | T1071.004 | C2 communication over DNS protocol |\n| Exfiltration Over Alternative Protocol | T1048 | Data exfiltration via DNS tunneling |\n| Dynamic Resolution: Domain Generation Algorithms | T1568.002 | DGA-based C2 infrastructure |\n| Protocol Tunneling | T1572 | Tunneling arbitrary traffic through DNS |\n| Encrypted Channel | T1573 | Encrypted C2 payloads in DNS records |\n\n## DNS Record Types Used in C2\n\n| Record Type | Typical C2 Use | Max Data Per Query |\n|-------------|----------------|--------------------|\n| A | Beacon check-in, small responses (IP-encoded) | 4 bytes (IPv4 address) |\n| AAAA | Beacon check-in, slightly larger responses | 16 bytes (IPv6 address) |\n| TXT | Command delivery, large payload transfer | ~255 bytes per string, multiple strings |\n| CNAME | Data exfiltration in subdomain, response tunneling | ~253 bytes |\n| MX | Data tunneling via preference + exchange fields | ~253 bytes |\n| NULL | Iodine tunnel primary record type | ~65535 bytes |\n| SRV | C2 with port/priority metadata | ~253 bytes |\n\n## Shannon Entropy Thresholds\n\n| Entropy Range | Classification | Typical Source |\n|---------------|----------------|----------------|\n| 0.0 - 2.0 | Very low | Single-character or trivial labels |\n| 2.0 - 3.0 | Normal | Common English-based domain labels |\n| 3.0 - 3.5 | Elevated | Long or mixed-case labels, some CDNs |\n| 3.5 - 4.0 | Suspicious | Hex-encoded data, base32 encoding, DGA |\n| 4.0 - 4.5 | High | DNS tunneling (Iodine, dnscat2, dns2tcp) |\n| 4.5+ | Very high | Encrypted or base64-encoded payloads |\n\n## Known Tunneling Tool Signatures\n\n### Iodine\n- **Encoding**: Base32, Base64, Base128, Raw\n- **Record types**: NULL (primary), TXT, CNAME, MX, A\n- **Subdomain pattern**: Long alphanumeric strings (50+ chars)\n- **Entropy range**: 3.8 - 4.2\n- **Detection**: High query volume to single domain, NULL record type queries\n\n### dnscat2\n- **Encoding**: Hex-encoded, encrypted\n- **Record types**: TXT, CNAME, MX, A\n- **Subdomain pattern**: Hex strings (16+ chars), optional `dnscat.` prefix\n- **Entropy range**: 3.5 - 4.5\n- **Detection**: Consistent query intervals, hex-only subdomain labels\n\n### dns2tcp\n- **Encoding**: Base32\n- **Record types**: TXT, KEY\n- **Subdomain pattern**: Base32 strings (20+ chars)\n- **Entropy range**: 3.6 - 4.0\n- **Detection**: KEY record type usage, base32 character set\n\n### Cobalt Strike DNS Beacon\n- **Encoding**: Hex-encoded metadata\n- **Record types**: A, AAAA, TXT\n- **Subdomain pattern**: Short hex strings (8-20 chars)\n- **Entropy range**: 3.2 - 4.0\n- **Detection**: Regular beacon intervals (default 60s), A-record check-ins followed by TXT downloads\n\n### Sliver DNS C2\n- **Encoding**: Base32/custom\n- **Record types**: A, TXT\n- **Subdomain pattern**: Alphanumeric strings (30+ chars)\n- **Entropy range**: 3.5 - 4.2\n- **Detection**: High subdomain length variance, mixed record types\n\n## DGA Feature Extraction\n\n| Feature | Description | DGA Indicator |\n|---------|-------------|---------------|\n| Shannon entropy | Bits per character of domain label | > 3.5 |\n| Label length | Character count of domain (excl. TLD) | > 15 unusual |\n| Consonant ratio | Consonants / total alphabetic chars | > 0.7 |\n| Digit ratio | Digits / total characters | > 0.3 |\n| Vowel-consonant ratio | Vowels / consonants | \u003c 0.3 |\n| Bigram frequency score | Average English bigram match frequency | \u003c 0.002 |\n| Hex character ratio | Hex chars / total chars | > 0.8 |\n| Max consecutive consonants | Longest consonant run | > 4 |\n| Unique character ratio | Unique chars / total chars | \u003c 0.4 |\n| Has dictionary words | Whether label contains English words | No = DGA indicator |\n\n## Beaconing Detection Parameters\n\n| Parameter | Typical Threshold | Description |\n|-----------|-------------------|-------------|\n| Interval regularity | Jitter \u003c 10% of mean interval | Low variance indicates automated beaconing |\n| Min queries | > 50 queries to same domain | Sufficient data for statistical analysis |\n| Time span | > 1 hour | Beacon must persist across time |\n| Consistent query size | Std dev \u003c 5 bytes | Tunnel payloads have consistent sizes |\n| Night-time activity | Queries during 00:00-06:00 | Unusual for legitimate user browsing |\n| Single source | 1-3 source IPs per domain | C2 typically from compromised host only |\n\n## Zeek DNS Log Fields\n\n| Field | Type | Forensic Use |\n|-------|------|--------------|\n| ts | time | Query timestamp |\n| uid | string | Connection UID |\n| id.orig_h | addr | Source IP (compromised host) |\n| id.resp_h | addr | DNS resolver IP |\n| query | string | Full queried domain name |\n| qtype_name | string | Query type (A, TXT, NULL, CNAME) |\n| rcode_name | string | Response code (NOERROR, NXDOMAIN) |\n| answers | vector | Response records |\n| TTLs | vector | TTL values for answers |\n| rejected | bool | Whether query was rejected |\n\n## Suricata EVE DNS Fields\n\n| Field | Type | Forensic Use |\n|-------|------|--------------|\n| timestamp | string | Event timestamp (ISO 8601) |\n| src_ip | string | Source IP |\n| dest_ip | string | Destination IP (resolver) |\n| dns.type | string | \"query\" or \"answer\" |\n| dns.rrname | string | Queried domain name |\n| dns.rrtype | string | Record type |\n| dns.rcode | string | Response code |\n| dns.answers | array | Response answer records |\n| dns.tx_id | int | Transaction ID |\n\n## Suricata Rules for DNS C2\n\n```\n# Detect high-entropy DNS queries (potential tunneling)\nalert dns any any -> any any (msg:\"ET DNS Potential DNS Tunneling - High Entropy Query\"; dns.query; pcre:\"/^[a-z0-9]{30,}\\./i\"; threshold:type threshold, track by_src, count 10, seconds 60; sid:9000001; rev:1;)\n\n# Detect TXT record queries to unusual domains\nalert dns any any -> any any (msg:\"ET DNS Suspicious TXT Record Query Volume\"; dns.query; dns_query; content:\"|00 10|\"; threshold:type threshold, track by_src, count 20, seconds 60; sid:9000002; rev:1;)\n\n# Detect NULL record queries (Iodine indicator)\nalert dns any any -> any any (msg:\"ET DNS NULL Record Query - Possible Iodine Tunnel\"; dns.query; content:\"|00 0a|\"; threshold:type threshold, track by_src, count 5, seconds 60; sid:9000003; rev:1;)\n```\n\n## Splunk SPL Queries\n\n```spl\n# High-entropy DNS subdomain detection\nindex=dns sourcetype=zeek_dns\n| eval subdomain=mvindex(split(query,\".\"),0)\n| eval sub_len=len(subdomain)\n| where sub_len > 20\n| eval entropy=0\n| stats count dc(query) as unique_queries avg(sub_len) as avg_len by src_ip query_type\n| where count > 50 AND avg_len > 25\n\n# DNS beaconing detection via standard deviation\nindex=dns sourcetype=zeek_dns\n| sort 0 _time\n| streamstats current=f last(_time) as prev_time by src_ip query\n| eval interval=_time - prev_time\n| stats count avg(interval) as avg_interval stdev(interval) as stdev_interval by src_ip query\n| where count > 50 AND stdev_interval \u003c (avg_interval * 0.1)\n| table src_ip query count avg_interval stdev_interval\n```\n\n## Python API - Key Functions\n\n```python\n# Shannon entropy calculation\nimport math\nfrom collections import Counter\n\ndef shannon_entropy(data):\n counter = Counter(data)\n length = len(data)\n return -sum((c / length) * math.log2(c / length) for c in counter.values())\n\n# DGA feature extraction\ndef extract_features(domain):\n return {\n \"length\": len(domain),\n \"entropy\": shannon_entropy(domain),\n \"digit_ratio\": sum(c.isdigit() for c in domain) / len(domain),\n \"consonant_ratio\": sum(c in \"bcdfghjklmnpqrstvwxyz\" for c in domain.lower()) / max(sum(c.isalpha() for c in domain), 1),\n }\n```\n\n## References\n\n- Zeek DNS logging: https://docs.zeek.org/en/current/scripts/base/protocols/dns/main.zeek.html\n- Suricata DNS rules: https://docs.suricata.io/en/latest/rules/dns-keywords.html\n- Iodine DNS tunnel: https://github.com/yarrick/iodine\n- dnscat2: https://github.com/iagox86/dnscat2\n- dns2tcp: https://github.com/alex-sector/dns2tcp\n- Cobalt Strike DNS beacon: https://hstechdocs.helpsystems.com/manuals/cobaltstrike/current/userguide/content/topics/listener-setup_dns-beacon.htm\n- SANS DNS tunneling detection: https://www.sans.org/white-papers/34152/\n- MITRE T1071.004: https://attack.mitre.org/techniques/T1071/004/\n- MITRE T1568.002: https://attack.mitre.org/techniques/T1568/002/\n","content_type":"text/markdown; charset=utf-8","language":"markdown","size":8333,"content_sha256":"d7715d9e11858d671cb4e5a3ac8b5e0ba6f00d9ba315c9d4aac6cdaa1a518cfc"},{"filename":"scripts/agent.py","content":"#!/usr/bin/env python3\n\"\"\"\nDNS C2 Detection Agent\n\nComprehensive detection pipeline for command-and-control communications over DNS.\nCombines Shannon entropy analysis, DNS beaconing detection, DGA classification,\nTXT record payload inspection, and known tool signature matching.\n\nUsage:\n python agent.py --dns-log /path/to/dns.log --format zeek\n python agent.py --dns-log /path/to/eve.json --format suricata\n python agent.py --dns-log /path/to/queries.csv --format csv\n python agent.py --mode train-dga --legit-domains legit.txt --dga-domains dga.txt\n python agent.py --mode entropy --dns-log dns.log --format zeek\n\nRequirements:\n pip install numpy scikit-learn tldextract\n\"\"\"\n\nimport argparse\nimport base64\nimport csv\nimport json\nimport math\nimport os\nimport re\nimport sys\nfrom collections import Counter, defaultdict\nfrom datetime import datetime, timedelta\nfrom pathlib import Path\n\nimport numpy as np\n\ntry:\n import tldextract\n HAS_TLDEXTRACT = True\nexcept ImportError:\n HAS_TLDEXTRACT = False\n\ntry:\n from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier\n from sklearn.model_selection import train_test_split, cross_val_score\n from sklearn.metrics import classification_report, confusion_matrix\n from sklearn.preprocessing import StandardScaler\n import pickle\n HAS_SKLEARN = True\nexcept ImportError:\n HAS_SKLEARN = False\n\n\n# ---------------------------------------------------------------------------\n# Constants\n# ---------------------------------------------------------------------------\n\nVOWELS = set(\"aeiou\")\nCONSONANTS = set(\"bcdfghjklmnpqrstvwxyz\")\nHEX_CHARS = set(\"0123456789abcdef\")\nBASE32_CHARS = set(\"abcdefghijklmnopqrstuvwxyz234567\")\n\n# English bigram frequencies (top 40, from Peter Norvig's analysis)\nENGLISH_BIGRAMS = {\n \"th\": 0.0356, \"he\": 0.0307, \"in\": 0.0243, \"er\": 0.0205,\n \"an\": 0.0199, \"re\": 0.0185, \"on\": 0.0176, \"at\": 0.0149,\n \"en\": 0.0145, \"nd\": 0.0135, \"ti\": 0.0134, \"es\": 0.0134,\n \"or\": 0.0128, \"te\": 0.0120, \"of\": 0.0117, \"ed\": 0.0117,\n \"is\": 0.0113, \"it\": 0.0112, \"al\": 0.0109, \"ar\": 0.0107,\n \"st\": 0.0105, \"to\": 0.0104, \"nt\": 0.0104, \"ng\": 0.0095,\n \"se\": 0.0093, \"ha\": 0.0093, \"as\": 0.0087, \"ou\": 0.0087,\n \"io\": 0.0083, \"le\": 0.0083, \"ve\": 0.0083, \"co\": 0.0079,\n \"me\": 0.0079, \"de\": 0.0076, \"hi\": 0.0076, \"ri\": 0.0073,\n \"ro\": 0.0073, \"ic\": 0.0070, \"ne\": 0.0069, \"ea\": 0.0069,\n}\n\n# Known tunneling tool signatures\nTOOL_SIGNATURES = {\n \"iodine\": {\n \"pattern\": re.compile(r\"^[a-z0-9]{50,}\\.\", re.IGNORECASE),\n \"qtypes\": {\"NULL\", \"TXT\", \"CNAME\", \"MX\", \"A\"},\n \"entropy_range\": (3.8, 4.2),\n \"description\": \"Iodine DNS tunnel - IPv4 over DNS\",\n },\n \"dnscat2\": {\n \"pattern\": re.compile(r\"^(dnscat\\.)|^[a-f0-9]{16,}\\.\", re.IGNORECASE),\n \"qtypes\": {\"TXT\", \"CNAME\", \"MX\", \"A\"},\n \"entropy_range\": (3.5, 4.5),\n \"description\": \"dnscat2 encrypted C2 channel\",\n },\n \"dns2tcp\": {\n \"pattern\": re.compile(r\"^[a-z2-7]{20,}\\.\", re.IGNORECASE),\n \"qtypes\": {\"TXT\", \"KEY\"},\n \"entropy_range\": (3.6, 4.0),\n \"description\": \"dns2tcp TCP-over-DNS tunnel\",\n },\n \"cobalt_strike_dns\": {\n \"pattern\": re.compile(r\"^[a-f0-9]{8,20}\\.\", re.IGNORECASE),\n \"qtypes\": {\"A\", \"AAAA\", \"TXT\"},\n \"entropy_range\": (3.2, 4.0),\n \"description\": \"Cobalt Strike DNS beacon\",\n },\n \"sliver_dns\": {\n \"pattern\": re.compile(r\"^[a-z0-9]{30,}\\.\", re.IGNORECASE),\n \"qtypes\": {\"A\", \"TXT\"},\n \"entropy_range\": (3.5, 4.2),\n \"description\": \"Sliver C2 DNS implant\",\n },\n}\n\n# Common legitimate high-entropy domains to whitelist\nWHITELIST_PATTERNS = [\n re.compile(r\".*\\.in-addr\\.arpa$\"),\n re.compile(r\".*\\.ip6\\.arpa$\"),\n re.compile(r\".*\\._domainkey\\..*\"),\n re.compile(r\".*\\._dmarc\\..*\"),\n re.compile(r\".*\\._spf\\..*\"),\n re.compile(r\".*\\.akadns\\.net$\"),\n re.compile(r\".*\\.akamaiedge\\.net$\"),\n re.compile(r\".*\\.cloudfront\\.net$\"),\n re.compile(r\".*\\.googleapis\\.com$\"),\n re.compile(r\".*\\.windows\\.net$\"),\n re.compile(r\".*\\.azure-dns\\..*\"),\n re.compile(r\".*\\.1e100\\.net$\"),\n]\n\n\n# ---------------------------------------------------------------------------\n# Core Functions\n# ---------------------------------------------------------------------------\n\ndef shannon_entropy(data):\n \"\"\"Calculate Shannon entropy of a string in bits per character.\"\"\"\n if not data:\n return 0.0\n counter = Counter(data)\n length = len(data)\n return -sum((c / length) * math.log2(c / length) for c in counter.values())\n\n\ndef extract_subdomain(fqdn):\n \"\"\"Extract subdomain and base domain from FQDN.\"\"\"\n fqdn = fqdn.lower().rstrip(\".\")\n if HAS_TLDEXTRACT:\n ext = tldextract.extract(fqdn)\n subdomain = ext.subdomain or \"\"\n base = f\"{ext.domain}.{ext.suffix}\" if ext.suffix else ext.domain\n return subdomain, base\n else:\n parts = fqdn.split(\".\")\n if len(parts) > 2:\n return \".\".join(parts[:-2]), \".\".join(parts[-2:])\n return \"\", fqdn\n\n\ndef is_whitelisted(fqdn):\n \"\"\"Check if domain matches a known-legitimate pattern.\"\"\"\n for pattern in WHITELIST_PATTERNS:\n if pattern.match(fqdn.lower()):\n return True\n return False\n\n\ndef parse_timestamp(ts_str):\n \"\"\"Parse various timestamp formats.\"\"\"\n formats = [\n \"%Y-%m-%dT%H:%M:%S.%fZ\",\n \"%Y-%m-%dT%H:%M:%S.%f\",\n \"%Y-%m-%dT%H:%M:%S\",\n \"%Y-%m-%d %H:%M:%S.%f\",\n \"%Y-%m-%d %H:%M:%S\",\n ]\n for fmt in formats:\n try:\n return datetime.strptime(ts_str, fmt)\n except ValueError:\n continue\n try:\n return datetime.utcfromtimestamp(float(ts_str))\n except (ValueError, OverflowError, OSError):\n return None\n\n\n# ---------------------------------------------------------------------------\n# Log Parsers\n# ---------------------------------------------------------------------------\n\ndef parse_zeek_dns_log(filepath):\n \"\"\"Parse Zeek dns.log (tab-separated format).\"\"\"\n queries = []\n with open(filepath, \"r\", encoding=\"utf-8\", errors=\"replace\") as f:\n headers = None\n for line in f:\n line = line.strip()\n if line.startswith(\"#fields\"):\n headers = line.split(\"\\t\")[1:]\n continue\n if line.startswith(\"#\") or not line:\n continue\n\n fields = line.split(\"\\t\")\n if headers and len(fields) >= len(headers):\n record = dict(zip(headers, fields))\n elif len(fields) >= 10:\n record = {\n \"ts\": fields[0],\n \"id.orig_h\": fields[2],\n \"query\": fields[9] if len(fields) > 9 else \"\",\n \"qtype_name\": fields[13] if len(fields) > 13 else \"\",\n \"answers\": fields[21] if len(fields) > 21 else \"\",\n }\n else:\n continue\n\n ts = record.get(\"ts\", \"\")\n src_ip = record.get(\"id.orig_h\", \"\")\n query = record.get(\"query\", \"\")\n qtype = record.get(\"qtype_name\", record.get(\"qtype\", \"\"))\n answers = record.get(\"answers\", \"\")\n\n if query and query != \"-\":\n queries.append({\n \"timestamp\": ts,\n \"src_ip\": src_ip,\n \"query\": query,\n \"qtype\": qtype,\n \"answers\": answers,\n })\n\n return queries\n\n\ndef parse_suricata_eve(filepath):\n \"\"\"Parse Suricata EVE JSON log for DNS events.\"\"\"\n queries = []\n with open(filepath, \"r\", encoding=\"utf-8\", errors=\"replace\") as f:\n for line in f:\n line = line.strip()\n if not line:\n continue\n try:\n event = json.loads(line)\n except json.JSONDecodeError:\n continue\n\n if event.get(\"event_type\") != \"dns\":\n continue\n\n dns = event.get(\"dns\", {})\n query = dns.get(\"rrname\", dns.get(\"query\", \"\"))\n qtype = dns.get(\"rrtype\", dns.get(\"type\", \"\"))\n src_ip = event.get(\"src_ip\", \"\")\n ts = event.get(\"timestamp\", \"\")\n\n answers_list = dns.get(\"answers\", [])\n answers = \"\"\n if isinstance(answers_list, list):\n answers = \",\".join(\n a.get(\"rdata\", \"\") for a in answers_list if isinstance(a, dict)\n )\n\n if query:\n queries.append({\n \"timestamp\": ts,\n \"src_ip\": src_ip,\n \"query\": query,\n \"qtype\": str(qtype),\n \"answers\": answers,\n })\n\n return queries\n\n\ndef parse_csv_dns(filepath):\n \"\"\"Parse CSV DNS log with columns: timestamp, src_ip, query, qtype, answers.\"\"\"\n queries = []\n with open(filepath, \"r\", encoding=\"utf-8\", errors=\"replace\") as f:\n reader = csv.DictReader(f)\n for row in reader:\n query = row.get(\"query\", row.get(\"domain\", row.get(\"qname\", \"\")))\n if query:\n queries.append({\n \"timestamp\": row.get(\"timestamp\", row.get(\"ts\", \"\")),\n \"src_ip\": row.get(\"src_ip\", row.get(\"source\", row.get(\"client_ip\", \"\"))),\n \"query\": query,\n \"qtype\": row.get(\"qtype\", row.get(\"type\", row.get(\"qtype_name\", \"\"))),\n \"answers\": row.get(\"answers\", row.get(\"answer\", \"\")),\n })\n return queries\n\n\ndef load_dns_queries(filepath, fmt=\"zeek\"):\n \"\"\"Load DNS queries from log file.\"\"\"\n parsers = {\n \"zeek\": parse_zeek_dns_log,\n \"suricata\": parse_suricata_eve,\n \"csv\": parse_csv_dns,\n }\n parser = parsers.get(fmt)\n if not parser:\n print(f\"[ERROR] Unknown format '{fmt}'. Supported: {', '.join(parsers.keys())}\")\n return []\n return parser(filepath)\n\n\n# ---------------------------------------------------------------------------\n# Entropy Analysis\n# ---------------------------------------------------------------------------\n\ndef analyze_entropy(queries, entropy_threshold=3.5, length_threshold=30):\n \"\"\"Analyze DNS queries for tunneling indicators via entropy and subdomain length.\"\"\"\n results = []\n\n for q in queries:\n fqdn = q.get(\"query\", \"\").lower().rstrip(\".\")\n if not fqdn or is_whitelisted(fqdn):\n continue\n\n subdomain, base_domain = extract_subdomain(fqdn)\n if not subdomain:\n continue\n\n flat = subdomain.replace(\".\", \"\")\n if not flat:\n continue\n\n entropy = shannon_entropy(flat)\n length = len(flat)\n label_count = subdomain.count(\".\") + 1\n\n score = 0.0\n flags = []\n\n # Entropy scoring\n if entropy > 4.0:\n score += (entropy - 3.5) * 30\n flags.append(f\"very_high_entropy:{entropy:.2f}\")\n elif entropy > entropy_threshold:\n score += (entropy - entropy_threshold) * 25\n flags.append(f\"high_entropy:{entropy:.2f}\")\n\n # Length scoring\n if length > 50:\n score += (length - 30) * 0.8\n flags.append(f\"very_long_subdomain:{length}\")\n elif length > length_threshold:\n score += (length - length_threshold) * 0.5\n flags.append(f\"long_subdomain:{length}\")\n\n # Label count\n if label_count > 5:\n score += label_count * 3\n flags.append(f\"many_labels:{label_count}\")\n\n # Encoding detection\n hex_ratio = sum(1 for c in flat if c in HEX_CHARS) / len(flat)\n if hex_ratio > 0.85 and length > 20:\n score += 20\n flags.append(\"hex_encoded\")\n\n b32_ratio = sum(1 for c in flat if c in BASE32_CHARS) / len(flat)\n if b32_ratio > 0.95 and length > 20 and hex_ratio \u003c= 0.85:\n score += 15\n flags.append(\"base32_encoded\")\n\n # Tool signature matching\n for tool_name, sig in TOOL_SIGNATURES.items():\n if sig[\"pattern\"].match(fqdn):\n qtype = q.get(\"qtype\", \"\").upper()\n if not qtype or qtype in sig[\"qtypes\"]:\n ent_low, ent_high = sig[\"entropy_range\"]\n if ent_low \u003c= entropy \u003c= ent_high or entropy > ent_high:\n score += 25\n flags.append(f\"tool_sig:{tool_name}\")\n break\n\n if flags:\n results.append({\n \"fqdn\": fqdn,\n \"subdomain\": subdomain,\n \"base_domain\": base_domain,\n \"entropy\": round(entropy, 4),\n \"subdomain_length\": length,\n \"label_count\": label_count,\n \"score\": round(score, 2),\n \"flags\": flags,\n \"src_ip\": q.get(\"src_ip\", \"\"),\n \"timestamp\": q.get(\"timestamp\", \"\"),\n \"qtype\": q.get(\"qtype\", \"\"),\n })\n\n results.sort(key=lambda x: x[\"score\"], reverse=True)\n return results\n\n\n# ---------------------------------------------------------------------------\n# Beaconing Detection\n# ---------------------------------------------------------------------------\n\ndef detect_beaconing(queries, min_queries=10, max_jitter_pct=25,\n min_interval=10, max_interval=7200):\n \"\"\"Detect periodic DNS beaconing patterns.\"\"\"\n groups = defaultdict(list)\n\n for q in queries:\n src_ip = q.get(\"src_ip\", \"\")\n fqdn = q.get(\"query\", \"\").lower().rstrip(\".\")\n ts = parse_timestamp(q.get(\"timestamp\", \"\"))\n if not ts or not src_ip or not fqdn:\n continue\n\n _, base_domain = extract_subdomain(fqdn)\n if is_whitelisted(fqdn):\n continue\n groups[(src_ip, base_domain)].append(ts)\n\n beacons = []\n\n for (src_ip, base_domain), timestamps in groups.items():\n if len(timestamps) \u003c min_queries:\n continue\n\n timestamps.sort()\n intervals = np.array([\n (timestamps[i+1] - timestamps[i]).total_seconds()\n for i in range(len(timestamps) - 1)\n ])\n\n # Remove zero/negative intervals\n intervals = intervals[intervals > 0]\n if len(intervals) \u003c min_queries - 1:\n continue\n\n mean_int = float(np.mean(intervals))\n std_int = float(np.std(intervals))\n median_int = float(np.median(intervals))\n\n if mean_int \u003c min_interval or mean_int > max_interval:\n continue\n\n cv = (std_int / mean_int * 100) if mean_int > 0 else 100\n if cv > max_jitter_pct:\n continue\n\n time_span = (timestamps[-1] - timestamps[0]).total_seconds()\n hours = time_span / 3600\n\n score = 0.0\n flags = []\n\n if cv \u003c 5:\n score += 40\n flags.append(f\"very_low_jitter:CV={cv:.1f}%\")\n elif cv \u003c 15:\n score += 30\n flags.append(f\"low_jitter:CV={cv:.1f}%\")\n else:\n score += 15\n flags.append(f\"moderate_jitter:CV={cv:.1f}%\")\n\n if hours > 24:\n score += 20\n flags.append(f\"persistent:{hours:.1f}h\")\n elif hours > 4:\n score += 10\n flags.append(f\"sustained:{hours:.1f}h\")\n\n if len(timestamps) > 100:\n score += 15\n flags.append(f\"high_volume:{len(timestamps)}\")\n elif len(timestamps) > 50:\n score += 10\n\n common_intervals = [60, 120, 300, 600, 900, 1800, 3600]\n for ci in common_intervals:\n if abs(mean_int - ci) \u003c ci * 0.1:\n score += 10\n flags.append(f\"common_c2_interval:~{ci}s\")\n break\n\n beacons.append({\n \"src_ip\": src_ip,\n \"base_domain\": base_domain,\n \"query_count\": len(timestamps),\n \"mean_interval\": round(mean_int, 2),\n \"median_interval\": round(median_int, 2),\n \"std_interval\": round(std_int, 2),\n \"jitter_cv\": round(cv, 2),\n \"first_seen\": timestamps[0].isoformat(),\n \"last_seen\": timestamps[-1].isoformat(),\n \"duration_hours\": round(hours, 2),\n \"score\": round(score, 1),\n \"flags\": flags,\n })\n\n beacons.sort(key=lambda x: x[\"score\"], reverse=True)\n return beacons\n\n\n# ---------------------------------------------------------------------------\n# TXT Record Analysis\n# ---------------------------------------------------------------------------\n\ndef analyze_txt_records(queries):\n \"\"\"Analyze TXT record queries and responses for C2 payload indicators.\"\"\"\n findings = []\n\n # Filter TXT queries\n txt_queries = [\n q for q in queries\n if q.get(\"qtype\", \"\").upper() in (\"TXT\", \"16\")\n ]\n\n if not txt_queries:\n return findings\n\n # Group by base domain\n domain_groups = defaultdict(list)\n for q in txt_queries:\n fqdn = q.get(\"query\", \"\").lower().rstrip(\".\")\n if is_whitelisted(fqdn):\n continue\n _, base_domain = extract_subdomain(fqdn)\n domain_groups[base_domain].append(q)\n\n for base_domain, group in domain_groups.items():\n count = len(group)\n src_ips = set(q.get(\"src_ip\", \"\") for q in group)\n\n indicators = []\n\n # Volume anomaly\n if count > 50:\n indicators.append({\n \"type\": \"high_txt_volume\",\n \"detail\": f\"{count} TXT queries to {base_domain}\",\n \"severity\": \"high\",\n })\n elif count > 20:\n indicators.append({\n \"type\": \"elevated_txt_volume\",\n \"detail\": f\"{count} TXT queries to {base_domain}\",\n \"severity\": \"medium\",\n })\n\n # Check answer content\n for q in group:\n answer = q.get(\"answers\", \"\")\n if not answer or answer == \"-\":\n continue\n\n # Large TXT response\n if len(answer) > 500:\n indicators.append({\n \"type\": \"oversized_txt_response\",\n \"detail\": f\"TXT response length: {len(answer)}\",\n \"severity\": \"high\",\n })\n\n # High entropy in response\n ent = shannon_entropy(answer)\n if ent > 4.5 and len(answer) > 100:\n indicators.append({\n \"type\": \"high_entropy_txt\",\n \"detail\": f\"TXT response entropy: {ent:.3f}\",\n \"severity\": \"high\",\n })\n\n # Base64 pattern in response\n b64_pattern = re.compile(r'[A-Za-z0-9+/]{40,}={0,2}')\n if b64_pattern.search(answer):\n indicators.append({\n \"type\": \"base64_in_txt\",\n \"detail\": \"Base64-encoded content in TXT response\",\n \"severity\": \"high\",\n })\n\n # Try to decode and check for executable\n try:\n match = b64_pattern.search(answer)\n decoded = base64.b64decode(match.group())\n if decoded[:2] == b'MZ':\n indicators.append({\n \"type\": \"pe_in_txt\",\n \"detail\": \"PE executable found in decoded TXT response\",\n \"severity\": \"critical\",\n })\n if decoded[:4] == b'\\x7fELF':\n indicators.append({\n \"type\": \"elf_in_txt\",\n \"detail\": \"ELF executable found in decoded TXT response\",\n \"severity\": \"critical\",\n })\n decoded_str = decoded.decode(\"utf-8\", errors=\"ignore\")\n ps_patterns = [\n r\"Invoke-Expression\", r\"IEX\\s*\\(\", r\"DownloadString\",\n r\"FromBase64String\", r\"New-Object\\s+System\\.Net\",\n ]\n for pat in ps_patterns:\n if re.search(pat, decoded_str, re.IGNORECASE):\n indicators.append({\n \"type\": \"powershell_stager_in_txt\",\n \"detail\": f\"PowerShell pattern in decoded TXT: {pat}\",\n \"severity\": \"critical\",\n })\n break\n except Exception:\n pass\n\n if indicators:\n findings.append({\n \"base_domain\": base_domain,\n \"txt_query_count\": count,\n \"source_ips\": sorted(src_ips),\n \"indicators\": indicators,\n \"max_severity\": max(\n (i[\"severity\"] for i in indicators),\n key=lambda s: {\"critical\": 4, \"high\": 3, \"medium\": 2, \"low\": 1}.get(s, 0)\n ),\n \"sample_queries\": [q[\"query\"] for q in group[:5]],\n })\n\n findings.sort(\n key=lambda x: {\"critical\": 4, \"high\": 3, \"medium\": 2, \"low\": 1}.get(\n x[\"max_severity\"], 0),\n reverse=True,\n )\n return findings\n\n\n# ---------------------------------------------------------------------------\n# DGA Classification\n# ---------------------------------------------------------------------------\n\nDGA_FEATURE_COLUMNS = [\n \"length\", \"entropy\", \"digit_ratio\", \"vowel_ratio\", \"consonant_ratio\",\n \"max_consonant_run\", \"distinct_chars\", \"distinct_ratio\",\n \"english_bigram_score\", \"label_count\", \"hex_ratio\",\n \"transition_ratio\", \"repeat_ratio\", \"special_count\",\n]\n\n\ndef extract_domain_features(domain):\n \"\"\"Extract numerical features from a domain for DGA classification.\"\"\"\n domain = domain.lower().strip(\".\")\n parts = domain.split(\".\")\n analysis_str = \".\".join(parts[:-1]) if len(parts) > 1 else domain\n flat = analysis_str.replace(\".\", \"\")\n length = len(flat)\n\n if length == 0:\n return None\n\n entropy = shannon_entropy(flat)\n\n digit_count = sum(1 for c in flat if c.isdigit())\n vowel_count = sum(1 for c in flat if c in VOWELS)\n consonant_count = sum(1 for c in flat if c in CONSONANTS)\n\n max_consonant_run = 0\n current_run = 0\n for c in flat:\n if c in CONSONANTS:\n current_run += 1\n max_consonant_run = max(max_consonant_run, current_run)\n else:\n current_run = 0\n\n distinct_chars = len(set(flat))\n bigrams = [flat[i:i+2] for i in range(len(flat) - 1)]\n english_score = (\n sum(ENGLISH_BIGRAMS.get(bg, 0) for bg in bigrams) / len(bigrams)\n if bigrams else 0\n )\n\n hex_ratio = sum(1 for c in flat if c in HEX_CHARS) / length\n transitions = sum(\n 1 for i in range(1, len(flat))\n if flat[i].isdigit() != flat[i-1].isdigit()\n )\n repeats = sum(1 for i in range(1, len(flat)) if flat[i] == flat[i-1]) if length > 1 else 0\n\n return {\n \"domain\": domain,\n \"length\": length,\n \"entropy\": round(entropy, 4),\n \"digit_ratio\": round(digit_count / length, 4),\n \"vowel_ratio\": round(vowel_count / length, 4),\n \"consonant_ratio\": round(consonant_count / length, 4),\n \"max_consonant_run\": max_consonant_run,\n \"distinct_chars\": distinct_chars,\n \"distinct_ratio\": round(distinct_chars / length, 4),\n \"english_bigram_score\": round(english_score, 6),\n \"label_count\": len(parts),\n \"hex_ratio\": round(hex_ratio, 4),\n \"transition_ratio\": round(transitions / max(length - 1, 1), 4),\n \"repeat_ratio\": round(repeats / max(length - 1, 1), 4),\n \"special_count\": sum(1 for c in flat if c == '-'),\n }\n\n\ndef features_to_vector(features):\n \"\"\"Convert feature dict to numpy array.\"\"\"\n return np.array([features[col] for col in DGA_FEATURE_COLUMNS])\n\n\ndef train_dga_model(legit_domains, dga_domains, model_type=\"random_forest\",\n output_model=None):\n \"\"\"Train and evaluate a DGA classification model.\"\"\"\n if not HAS_SKLEARN:\n print(\"[ERROR] scikit-learn required: pip install scikit-learn\")\n return None, None, None\n\n print(f\"[*] Extracting features from {len(legit_domains)} legitimate \"\n f\"and {len(dga_domains)} DGA domains...\")\n\n X_legit = [features_to_vector(f) for d in legit_domains\n if (f := extract_domain_features(d)) is not None]\n X_dga = [features_to_vector(f) for d in dga_domains\n if (f := extract_domain_features(d)) is not None]\n\n if len(X_legit) \u003c 100 or len(X_dga) \u003c 100:\n print(f\"[ERROR] Insufficient data: {len(X_legit)} legit, {len(X_dga)} DGA\")\n return None, None, None\n\n print(f\" Features extracted: {len(X_legit)} legit, {len(X_dga)} DGA\")\n\n X = np.vstack([np.array(X_legit), np.array(X_dga)])\n y = np.array([0] * len(X_legit) + [1] * len(X_dga))\n\n scaler = StandardScaler()\n X_scaled = scaler.fit_transform(X)\n\n X_train, X_test, y_train, y_test = train_test_split(\n X_scaled, y, test_size=0.2, random_state=42, stratify=y\n )\n\n if model_type == \"gradient_boosting\":\n model = GradientBoostingClassifier(\n n_estimators=200, max_depth=6, learning_rate=0.1,\n min_samples_split=10, random_state=42,\n )\n else:\n model = RandomForestClassifier(\n n_estimators=200, max_depth=15, min_samples_split=5,\n random_state=42, n_jobs=-1,\n )\n\n print(f\"[*] Training {model_type} classifier...\")\n model.fit(X_train, y_train)\n\n y_pred = model.predict(X_test)\n report = classification_report(y_test, y_pred, target_names=[\"legitimate\", \"dga\"],\n output_dict=True)\n cm = confusion_matrix(y_test, y_pred)\n cv_scores = cross_val_score(model, X_scaled, y, cv=5, scoring=\"f1\")\n\n metrics = {\n \"model_type\": model_type,\n \"train_size\": len(X_train),\n \"test_size\": len(X_test),\n \"accuracy\": round(report[\"accuracy\"], 4),\n \"dga_precision\": round(report[\"dga\"][\"precision\"], 4),\n \"dga_recall\": round(report[\"dga\"][\"recall\"], 4),\n \"dga_f1\": round(report[\"dga\"][\"f1-score\"], 4),\n \"legit_precision\": round(report[\"legitimate\"][\"precision\"], 4),\n \"legit_recall\": round(report[\"legitimate\"][\"recall\"], 4),\n \"confusion_matrix\": cm.tolist(),\n \"cv_f1_mean\": round(float(cv_scores.mean()), 4),\n \"cv_f1_std\": round(float(cv_scores.std()), 4),\n \"feature_importance\": {\n k: round(float(v), 4)\n for k, v in zip(DGA_FEATURE_COLUMNS, model.feature_importances_)\n },\n }\n\n print(f\"[+] Model trained successfully\")\n print(f\" Accuracy: {metrics['accuracy']}\")\n print(f\" DGA F1: {metrics['dga_f1']}\")\n print(f\" DGA Recall: {metrics['dga_recall']}\")\n print(f\" CV F1 (5-fold): {metrics['cv_f1_mean']} +/- {metrics['cv_f1_std']}\")\n\n top_feats = sorted(metrics[\"feature_importance\"].items(),\n key=lambda x: x[1], reverse=True)[:5]\n print(f\" Top features: {', '.join(f'{k}={v:.3f}' for k, v in top_feats)}\")\n\n if output_model:\n with open(output_model, \"wb\") as f:\n pickle.dump({\"model\": model, \"scaler\": scaler, \"metrics\": metrics}, f)\n print(f\"[+] Model saved to {output_model}\")\n\n return model, scaler, metrics\n\n\ndef classify_domains_dga(domains, model, scaler, threshold=0.65):\n \"\"\"Classify domains as DGA or legitimate.\"\"\"\n results = []\n for domain in domains:\n feats = extract_domain_features(domain)\n if feats is None:\n continue\n\n vec = features_to_vector(feats).reshape(1, -1)\n vec_scaled = scaler.transform(vec)\n prob = model.predict_proba(vec_scaled)[0]\n\n if prob[1] >= threshold:\n results.append({\n \"domain\": domain,\n \"prediction\": \"dga\" if prob[1] >= 0.5 else \"legitimate\",\n \"dga_probability\": round(float(prob[1]), 4),\n \"confidence\": \"high\" if prob[1] > 0.85 else \"medium\",\n \"entropy\": feats[\"entropy\"],\n \"length\": feats[\"length\"],\n })\n\n results.sort(key=lambda x: x[\"dga_probability\"], reverse=True)\n return results\n\n\n# ---------------------------------------------------------------------------\n# Reporting\n# ---------------------------------------------------------------------------\n\ndef print_report(entropy_results, beacons, txt_findings, dga_results,\n total_queries, unique_domains):\n \"\"\"Print unified DNS C2 detection report.\"\"\"\n print(\"=\" * 80)\n print(\" DNS C2 DETECTION ANALYSIS REPORT\")\n print(\"=\" * 80)\n print(f\" Generated: {datetime.utcnow().isoformat()}Z\")\n print(f\" Total Queries: {total_queries:,}\")\n print(f\" Unique Domains: {unique_domains:,}\")\n print()\n\n # Entropy section\n print(\" ENTROPY ANALYSIS\")\n print(\" \" + \"-\" * 76)\n print(f\" Suspicious queries: {len(entropy_results)}\")\n\n if entropy_results:\n # Group by base domain\n domain_agg = defaultdict(lambda: {\"count\": 0, \"max_ent\": 0, \"max_score\": 0, \"ips\": set()})\n for r in entropy_results:\n bd = r[\"base_domain\"]\n domain_agg[bd][\"count\"] += 1\n domain_agg[bd][\"max_ent\"] = max(domain_agg[bd][\"max_ent\"], r[\"entropy\"])\n domain_agg[bd][\"max_score\"] = max(domain_agg[bd][\"max_score\"], r[\"score\"])\n domain_agg[bd][\"ips\"].add(r[\"src_ip\"])\n\n sorted_domains = sorted(domain_agg.items(), key=lambda x: x[1][\"max_score\"], reverse=True)\n for domain, data in sorted_domains[:10]:\n severity = \"CRITICAL\" if data[\"max_score\"] > 60 else \"HIGH\" if data[\"max_score\"] > 30 else \"MEDIUM\"\n print(f\"\\n [{severity}] {domain}\")\n print(f\" Suspicious queries: {data['count']} Max entropy: {data['max_ent']:.3f}\")\n print(f\" Source IPs: {', '.join(sorted(data['ips']))}\")\n\n # Show tool signature if matched\n for r in entropy_results:\n if r[\"base_domain\"] == domain:\n tool_flags = [f for f in r[\"flags\"] if f.startswith(\"tool_sig:\")]\n if tool_flags:\n print(f\" Tool match: {tool_flags[0].split(':')[1]}\")\n break\n print()\n\n # Beaconing section\n print(\" BEACONING DETECTION\")\n print(\" \" + \"-\" * 76)\n print(f\" Beacon patterns: {len(beacons)}\")\n for b in beacons[:10]:\n severity = \"CRITICAL\" if b[\"score\"] > 70 else \"HIGH\" if b[\"score\"] > 50 else \"MEDIUM\"\n print(f\"\\n [{severity}] {b['src_ip']} -> {b['base_domain']}\")\n print(f\" Score: {b['score']} Queries: {b['query_count']} \"\n f\"Interval: {b['mean_interval']:.1f}s +/- {b['std_interval']:.1f}s\")\n print(f\" Jitter: {b['jitter_cv']:.1f}% Duration: {b['duration_hours']:.1f}h\")\n print(f\" Flags: {', '.join(b['flags'])}\")\n print()\n\n # TXT record section\n print(\" TXT RECORD ANALYSIS\")\n print(\" \" + \"-\" * 76)\n print(f\" Suspicious TXT patterns: {len(txt_findings)}\")\n for finding in txt_findings[:10]:\n print(f\"\\n [{finding['max_severity'].upper()}] {finding['base_domain']}\")\n print(f\" TXT queries: {finding['txt_query_count']} \"\n f\"Sources: {', '.join(finding['source_ips'][:3])}\")\n for ind in finding[\"indicators\"][:3]:\n print(f\" - {ind['type']}: {ind['detail']}\")\n print()\n\n # DGA section\n if dga_results:\n print(\" DGA CLASSIFICATION\")\n print(\" \" + \"-\" * 76)\n high_conf = [r for r in dga_results if r[\"confidence\"] == \"high\"]\n med_conf = [r for r in dga_results if r[\"confidence\"] == \"medium\"]\n print(f\" High confidence DGA: {len(high_conf)}\")\n print(f\" Medium confidence: {len(med_conf)}\")\n for r in dga_results[:15]:\n print(f\" [{r['confidence'].upper()}] {r['domain']} \"\n f\"(prob: {r['dga_probability']:.3f}, ent: {r['entropy']:.2f})\")\n print()\n\n # Recommendations\n print(\" RECOMMENDED ACTIONS\")\n print(\" \" + \"-\" * 76)\n action_num = 1\n\n critical_domains = set()\n for r in entropy_results:\n if r[\"score\"] > 60:\n critical_domains.add(r[\"base_domain\"])\n for b in beacons:\n if b[\"score\"] > 70:\n critical_domains.add(b[\"base_domain\"])\n for f in txt_findings:\n if f[\"max_severity\"] == \"critical\":\n critical_domains.add(f[\"base_domain\"])\n\n if critical_domains:\n print(f\" {action_num}. [CRITICAL] Block in DNS RPZ/firewall: \"\n f\"{', '.join(sorted(critical_domains)[:5])}\")\n action_num += 1\n\n critical_ips = set()\n for r in entropy_results[:5]:\n if r[\"score\"] > 60 and r[\"src_ip\"]:\n critical_ips.add(r[\"src_ip\"])\n for b in beacons[:5]:\n if b[\"score\"] > 70:\n critical_ips.add(b[\"src_ip\"])\n\n if critical_ips:\n print(f\" {action_num}. [CRITICAL] Isolate and investigate hosts: \"\n f\"{', '.join(sorted(critical_ips)[:5])}\")\n action_num += 1\n\n if dga_results:\n high_dga = [r[\"domain\"] for r in dga_results if r[\"confidence\"] == \"high\"]\n if high_dga:\n print(f\" {action_num}. [HIGH] Block {len(high_dga)} high-confidence DGA domains\")\n action_num += 1\n\n if txt_findings:\n print(f\" {action_num}. [HIGH] Review {len(txt_findings)} domains with suspicious TXT activity\")\n action_num += 1\n\n print(f\" {action_num}. [MEDIUM] Deploy Zeek/Suricata DNS tunneling signatures\")\n print()\n\n\n# ---------------------------------------------------------------------------\n# Main\n# ---------------------------------------------------------------------------\n\ndef main():\n parser = argparse.ArgumentParser(\n description=\"DNS C2 Detection Agent - Tunneling, DGA, Beaconing, TXT Payload Analysis\"\n )\n parser.add_argument(\"--dns-log\", help=\"Path to DNS log file\")\n parser.add_argument(\"--format\", choices=[\"zeek\", \"suricata\", \"csv\"],\n default=\"zeek\", help=\"DNS log format\")\n parser.add_argument(\"--mode\", choices=[\"full\", \"entropy\", \"beacon\", \"txt\",\n \"dga-classify\", \"train-dga\"],\n default=\"full\", help=\"Analysis mode\")\n\n # Thresholds\n parser.add_argument(\"--entropy-threshold\", type=float, default=3.5,\n help=\"Shannon entropy threshold for suspicious queries\")\n parser.add_argument(\"--length-threshold\", type=int, default=30,\n help=\"Subdomain length threshold\")\n parser.add_argument(\"--beacon-min-queries\", type=int, default=10,\n help=\"Minimum queries for beacon detection\")\n parser.add_argument(\"--beacon-max-jitter\", type=float, default=25,\n help=\"Maximum jitter CV%% for beacon detection\")\n parser.add_argument(\"--dga-threshold\", type=float, default=0.65,\n help=\"DGA probability threshold for reporting\")\n\n # DGA training\n parser.add_argument(\"--legit-domains\", help=\"File with legitimate domains (one per line)\")\n parser.add_argument(\"--dga-domains\", help=\"File with DGA domains (one per line)\")\n parser.add_argument(\"--model-type\", choices=[\"random_forest\", \"gradient_boosting\"],\n default=\"random_forest\", help=\"ML model type for DGA\")\n parser.add_argument(\"--dga-model\", help=\"Path to saved DGA model (pickle)\")\n\n # Output\n parser.add_argument(\"--output\", default=\"dns_c2_report.json\",\n help=\"Output path for JSON report\")\n parser.add_argument(\"--output-model\", default=\"dga_model.pkl\",\n help=\"Output path for trained DGA model\")\n\n args = parser.parse_args()\n\n print(\"[*] DNS C2 Detection Agent\")\n print(f\" Mode: {args.mode}\")\n print()\n\n # DGA training mode\n if args.mode == \"train-dga\":\n if not args.legit_domains or not args.dga_domains:\n print(\"[ERROR] --legit-domains and --dga-domains required for training\")\n sys.exit(1)\n\n with open(args.legit_domains) as f:\n legit = [line.strip() for line in f if line.strip()]\n with open(args.dga_domains) as f:\n dga = [line.strip() for line in f if line.strip()]\n\n print(f\"[*] Loaded {len(legit)} legitimate and {len(dga)} DGA domains\")\n model, scaler, metrics = train_dga_model(\n legit, dga, args.model_type, args.output_model\n )\n if metrics:\n with open(args.output, \"w\") as f:\n json.dump(metrics, f, indent=2)\n print(f\"[+] Metrics saved to {args.output}\")\n return\n\n # Analysis modes require DNS log\n if not args.dns_log:\n print(\"[ERROR] --dns-log required for analysis\")\n sys.exit(1)\n\n print(f\"[*] Loading DNS queries from {args.dns_log} (format: {args.format})...\")\n queries = load_dns_queries(args.dns_log, args.format)\n print(f\" Loaded {len(queries):,} queries\")\n\n if not queries:\n print(\"[ERROR] No queries loaded. Check file path and format.\")\n sys.exit(1)\n\n unique_domains = len(set(q.get(\"query\", \"\") for q in queries))\n print(f\" Unique domains: {unique_domains:,}\")\n print()\n\n entropy_results = []\n beacons = []\n txt_findings = []\n dga_results = []\n\n # Entropy analysis\n if args.mode in (\"full\", \"entropy\"):\n print(\"[*] Running entropy analysis...\")\n entropy_results = analyze_entropy(\n queries, args.entropy_threshold, args.length_threshold\n )\n print(f\" Suspicious queries: {len(entropy_results)}\")\n\n # Beaconing detection\n if args.mode in (\"full\", \"beacon\"):\n print(\"[*] Running beacon detection...\")\n beacons = detect_beaconing(\n queries, args.beacon_min_queries, args.beacon_max_jitter\n )\n print(f\" Beacon patterns: {len(beacons)}\")\n\n # TXT record analysis\n if args.mode in (\"full\", \"txt\"):\n print(\"[*] Running TXT record analysis...\")\n txt_findings = analyze_txt_records(queries)\n print(f\" Suspicious TXT patterns: {len(txt_findings)}\")\n\n # DGA classification\n if args.mode in (\"full\", \"dga-classify\"):\n model = None\n scaler = None\n\n if args.dga_model and os.path.exists(args.dga_model):\n print(f\"[*] Loading DGA model from {args.dga_model}...\")\n with open(args.dga_model, \"rb\") as f:\n saved = pickle.load(f)\n model = saved[\"model\"]\n scaler = saved[\"scaler\"]\n elif HAS_SKLEARN:\n print(\"[*] No DGA model provided, using feature-based heuristic scoring\")\n else:\n print(\"[WARN] scikit-learn not available, skipping DGA classification\")\n\n if model and scaler:\n domains = list(set(q.get(\"query\", \"\").lower().rstrip(\".\")\n for q in queries if q.get(\"query\")))\n print(f\"[*] Classifying {len(domains)} unique domains...\")\n dga_results = classify_domains_dga(domains, model, scaler, args.dga_threshold)\n print(f\" DGA candidates: {len(dga_results)}\")\n\n print()\n\n # Print report\n print_report(entropy_results, beacons, txt_findings, dga_results,\n len(queries), unique_domains)\n\n # Save JSON report\n report = {\n \"generated_at\": datetime.utcnow().isoformat() + \"Z\",\n \"total_queries\": len(queries),\n \"unique_domains\": unique_domains,\n \"entropy_analysis\": {\n \"threshold\": args.entropy_threshold,\n \"suspicious_count\": len(entropy_results),\n \"results\": entropy_results[:100],\n },\n \"beaconing\": {\n \"min_queries\": args.beacon_min_queries,\n \"max_jitter_pct\": args.beacon_max_jitter,\n \"patterns_detected\": len(beacons),\n \"results\": beacons[:50],\n },\n \"txt_analysis\": {\n \"suspicious_count\": len(txt_findings),\n \"results\": txt_findings[:50],\n },\n \"dga_classification\": {\n \"threshold\": args.dga_threshold,\n \"candidates\": len(dga_results),\n \"results\": dga_results[:100],\n },\n }\n\n with open(args.output, \"w\", encoding=\"utf-8\") as f:\n json.dump(report, f, indent=2, default=str)\n print(f\"[+] Report saved to {args.output}\")\n print(\"[*] Done.\")\n\n\nif __name__ == \"__main__\":\n main()\n","content_type":"text/x-python; charset=utf-8","language":"python","size":40670,"content_sha256":"7dddc52c41603c926bc638441a0405e1d23cbd44c133437af14375ce7f5bbba2"}],"content_json":{"type":"doc","content":[{"type":"heading","attrs":{"level":1},"content":[{"text":"Detecting Command and Control Over DNS","type":"text"}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"When to Use","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Investigating suspected DNS tunneling used for C2 communication or data exfiltration","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Analyzing DNS query logs for signs of encoded payloads in subdomain strings","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Classifying domains as DGA-generated vs. legitimate using statistical or ML methods","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Detecting DNS beaconing patterns (regular intervals, consistent query sizes)","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Hunting for Iodine, dnscat2, dns2tcp, Cobalt Strike DNS, or Sliver DNS traffic","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Monitoring TXT record abuse for command delivery or staged payload download","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Building DNS anomaly detection rules for SOC/SIEM deployment","type":"text"}]}]}]},{"type":"paragraph","content":[{"text":"Do not use","type":"text","marks":[{"type":"strong"}]},{"text":" for general DNS performance monitoring or DNS configuration auditing; use DNS health monitoring tools for those. For HTTP/HTTPS-based C2 detection, use network traffic analysis skills focused on web protocols.","type":"text"}]},{"type":"paragraph","content":[{"text":"DISCLAIMER","type":"text","marks":[{"type":"strong"}]},{"text":": DNS tunneling tools referenced in this skill (Iodine, dnscat2, dns2tcp) are dual-use. They have legitimate uses (bypassing captive portals, security research) and malicious uses (C2 channels, exfiltration). Only deploy detection in networks you are authorized to monitor. Testing tunneling tools requires explicit authorization.","type":"text"}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Prerequisites","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"DNS query logs from recursive resolver, Zeek/Bro, Suricata, or passive DNS tap","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Python 3.9+ with ","type":"text"},{"text":"numpy","type":"text","marks":[{"type":"code_inline"}]},{"text":", ","type":"text"},{"text":"scikit-learn","type":"text","marks":[{"type":"code_inline"}]},{"text":", ","type":"text"},{"text":"pandas","type":"text","marks":[{"type":"code_inline"}]},{"text":", ","type":"text"},{"text":"tldextract","type":"text","marks":[{"type":"code_inline"}]},{"text":", and ","type":"text"},{"text":"dnspython","type":"text","marks":[{"type":"code_inline"}]}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Zeek (formerly Bro) with dns.log output or Suricata with DNS EVE JSON logging","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"SIEM access (Splunk, Elastic, Microsoft Sentinel) for log correlation","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Passive DNS database access (CIRCL pDNS, Farsight DNSDB, or internal) for enrichment","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Wireshark/tshark for packet-level DNS inspection","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Known-good domain whitelist (Alexa/Tranco top 1M or Majestic Million)","type":"text"}]}]}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Workflow","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Step 1: Collect and Parse DNS Query Logs","type":"text"}]},{"type":"paragraph","content":[{"text":"Ingest DNS traffic from network sensors and parse into analyzable format:","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":"bash"},"content":[{"text":"# Zeek - extract dns.log fields\n# Default Zeek dns.log columns:\n# ts uid id.orig_h id.orig_p id.resp_h id.resp_p proto trans_id rtt query\n# qclass qclass_name qtype qtype_name rcode rcode_name AA TC RD RA Z\n# answers TTLs rejected\n\n# Filter for potentially suspicious record types\ncat dns.log | zeek-cut ts id.orig_h query qtype_name answers rcode_name | \\\n grep -E \"TXT|NULL|CNAME|MX\" > suspicious_qtypes.log\n\n# Extract unique queried domains\ncat dns.log | zeek-cut query | sort -u > unique_domains.txt\n\n# Suricata EVE JSON - extract DNS events\ncat eve.json | jq -r 'select(.event_type==\"dns\") |\n [.timestamp, .src_ip, .dns.rrname, .dns.rrtype, .dns.rcode] |\n @tsv' > dns_events.tsv\n\n# tshark - extract DNS queries from pcap\ntshark -r capture.pcap -T fields \\\n -e frame.time -e ip.src -e ip.dst \\\n -e dns.qry.name -e dns.qry.type \\\n -e dns.resp.type -e dns.txt \\\n -Y \"dns\" > dns_queries.tsv\n\n# Count queries per domain (find high-volume destinations)\ncat dns.log | zeek-cut query | \\\n awk -F. '{print $(NF-1)\".\"$NF}' | \\\n sort | uniq -c | sort -rn | head -50","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Step 2: Shannon Entropy Analysis of DNS Queries","type":"text"}]},{"type":"paragraph","content":[{"text":"Calculate entropy of subdomain strings to identify encoded/encrypted data:","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":"python"},"content":[{"text":"#!/usr/bin/env python3\n\"\"\"Shannon entropy analysis for DNS query subdomains.\"\"\"\n\nimport math\nimport csv\nimport sys\nfrom collections import Counter\n\ntry:\n import tldextract\n HAS_TLDEXTRACT = True\nexcept ImportError:\n HAS_TLDEXTRACT = False\n\n\ndef shannon_entropy(data):\n \"\"\"Calculate Shannon entropy of a string (bits per character).\"\"\"\n if not data:\n return 0.0\n counter = Counter(data)\n length = len(data)\n entropy = -sum(\n (count / length) * math.log2(count / length)\n for count in counter.values()\n )\n return entropy\n\n\ndef extract_subdomain(fqdn):\n \"\"\"Extract the subdomain portion from a fully qualified domain name.\"\"\"\n if HAS_TLDEXTRACT:\n ext = tldextract.extract(fqdn)\n if ext.subdomain:\n return ext.subdomain, f\"{ext.domain}.{ext.suffix}\"\n return \"\", f\"{ext.domain}.{ext.suffix}\"\n else:\n # Fallback: assume last two labels are domain + TLD\n parts = fqdn.rstrip(\".\").split(\".\")\n if len(parts) > 2:\n return \".\".join(parts[:-2]), \".\".join(parts[-2:])\n return \"\", fqdn\n\n\ndef analyze_dns_entropy(queries, entropy_threshold=3.5, length_threshold=30):\n \"\"\"\n Analyze DNS queries for tunneling indicators using entropy.\n\n Thresholds (tunable per environment):\n - entropy_threshold: Shannon entropy above this flags as suspicious (3.5-4.0 typical)\n - length_threshold: Subdomain length above this flags as suspicious (30-50 chars)\n\n Returns list of flagged queries with scores.\n \"\"\"\n results = []\n\n for query_record in queries:\n fqdn = query_record.get(\"query\", \"\").lower().rstrip(\".\")\n if not fqdn:\n continue\n\n subdomain, base_domain = extract_subdomain(fqdn)\n if not subdomain:\n continue\n\n # Remove dots from subdomain for entropy calculation\n subdomain_flat = subdomain.replace(\".\", \"\")\n if not subdomain_flat:\n continue\n\n entropy = shannon_entropy(subdomain_flat)\n length = len(subdomain_flat)\n label_count = subdomain.count(\".\") + 1\n\n # Scoring: higher = more suspicious\n score = 0.0\n flags = []\n\n if entropy > entropy_threshold:\n score += (entropy - entropy_threshold) * 25\n flags.append(f\"high_entropy:{entropy:.2f}\")\n\n if length > length_threshold:\n score += (length - length_threshold) * 0.5\n flags.append(f\"long_subdomain:{length}\")\n\n if label_count > 4:\n score += label_count * 2\n flags.append(f\"many_labels:{label_count}\")\n\n # Check for hex/base32/base64 encoding patterns\n hex_ratio = sum(1 for c in subdomain_flat if c in \"0123456789abcdef\") / max(len(subdomain_flat), 1)\n if hex_ratio > 0.85 and length > 20:\n score += 20\n flags.append(\"hex_encoded\")\n\n b32_chars = set(\"abcdefghijklmnopqrstuvwxyz234567\")\n b32_ratio = sum(1 for c in subdomain_flat if c in b32_chars) / max(len(subdomain_flat), 1)\n if b32_ratio > 0.95 and length > 20:\n score += 15\n flags.append(\"base32_encoded\")\n\n # Only report if at least one flag triggered\n if flags:\n results.append({\n \"fqdn\": fqdn,\n \"subdomain\": subdomain,\n \"base_domain\": base_domain,\n \"entropy\": round(entropy, 4),\n \"subdomain_length\": length,\n \"label_count\": label_count,\n \"score\": round(score, 2),\n \"flags\": flags,\n \"src_ip\": query_record.get(\"src_ip\", \"\"),\n \"timestamp\": query_record.get(\"timestamp\", \"\"),\n \"qtype\": query_record.get(\"qtype\", \"\"),\n })\n\n # Sort by score descending\n results.sort(key=lambda x: x[\"score\"], reverse=True)\n return results\n\n\n# Thresholds for known tunneling tools\nTOOL_SIGNATURES = {\n \"iodine\": {\n \"subdomain_pattern\": r\"^[a-z0-9]{50,}$\", # Long hex-like subdomains\n \"common_qtypes\": [\"NULL\", \"TXT\", \"CNAME\", \"MX\", \"A\"],\n \"typical_entropy\": (3.8, 4.2),\n \"description\": \"Iodine DNS tunnel - IPv4 over DNS, uses NULL/TXT records\",\n },\n \"dnscat2\": {\n \"subdomain_pattern\": r\"^dnscat\\.|^[a-f0-9]{16,}\",\n \"common_qtypes\": [\"TXT\", \"CNAME\", \"MX\", \"A\"],\n \"typical_entropy\": (3.5, 4.5),\n \"description\": \"dnscat2 encrypted C2 channel over DNS\",\n },\n \"dns2tcp\": {\n \"subdomain_pattern\": r\"^[a-z2-7]{20,}\", # Base32 encoding\n \"common_qtypes\": [\"TXT\", \"KEY\"],\n \"typical_entropy\": (3.6, 4.0),\n \"description\": \"dns2tcp tunnel - TCP over DNS using TXT/KEY records\",\n },\n \"cobalt_strike_dns\": {\n \"subdomain_pattern\": r\"^[a-f0-9]{12,}\\.\",\n \"common_qtypes\": [\"A\", \"AAAA\", \"TXT\"],\n \"typical_entropy\": (3.2, 4.0),\n \"description\": \"Cobalt Strike DNS beacon - encoded commands in A/TXT records\",\n },\n}\n\n\ndef print_entropy_report(results, top_n=25):\n \"\"\"Print formatted entropy analysis report.\"\"\"\n print(\"=\" * 80)\n print(\" DNS ENTROPY ANALYSIS - TUNNELING DETECTION\")\n print(\"=\" * 80)\n print(f\" Suspicious queries found: {len(results)}\")\n print()\n\n if not results:\n print(\" No suspicious queries detected.\")\n return\n\n # Group by base domain\n domain_groups = {}\n for r in results:\n bd = r[\"base_domain\"]\n if bd not in domain_groups:\n domain_groups[bd] = {\"count\": 0, \"max_entropy\": 0, \"max_score\": 0, \"queries\": []}\n domain_groups[bd][\"count\"] += 1\n domain_groups[bd][\"max_entropy\"] = max(domain_groups[bd][\"max_entropy\"], r[\"entropy\"])\n domain_groups[bd][\"max_score\"] = max(domain_groups[bd][\"max_score\"], r[\"score\"])\n domain_groups[bd][\"queries\"].append(r)\n\n # Sort domains by total suspicious query count\n sorted_domains = sorted(domain_groups.items(), key=lambda x: x[1][\"count\"], reverse=True)\n\n print(\" TOP SUSPICIOUS BASE DOMAINS\")\n print(\" \" + \"-\" * 76)\n print(f\" {'Domain':\u003c35} {'Queries':>8} {'Max Ent':>8} {'Max Score':>10}\")\n print(\" \" + \"-\" * 76)\n for domain, data in sorted_domains[:20]:\n print(f\" {domain:\u003c35} {data['count']:>8} {data['max_entropy']:>8.3f} {data['max_score']:>10.1f}\")\n print()\n\n print(f\" TOP {top_n} HIGHEST-SCORING QUERIES\")\n print(\" \" + \"-\" * 76)\n for r in results[:top_n]:\n print(f\" Score: {r['score']:.1f} Entropy: {r['entropy']:.3f} Len: {r['subdomain_length']}\")\n print(f\" FQDN: {r['fqdn'][:75]}\")\n print(f\" Flags: {', '.join(r['flags'])}\")\n print(f\" Source: {r['src_ip']} Type: {r['qtype']}\")\n print()","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Step 3: TXT Record Payload Detection","type":"text"}]},{"type":"paragraph","content":[{"text":"Identify C2 commands or staged payloads delivered via DNS TXT records:","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":"python"},"content":[{"text":"#!/usr/bin/env python3\n\"\"\"DNS TXT record payload detection for C2 command delivery.\"\"\"\n\nimport base64\nimport re\nimport math\nfrom collections import Counter\n\n\ndef shannon_entropy(data):\n \"\"\"Calculate Shannon entropy.\"\"\"\n if not data:\n return 0.0\n counter = Counter(data)\n length = len(data)\n return -sum((c / length) * math.log2(c / length) for c in counter.values())\n\n\ndef analyze_txt_record(txt_data, domain=\"\"):\n \"\"\"\n Analyze a DNS TXT record response for C2 payload indicators.\n\n Indicators:\n - High entropy content (encoded/encrypted payloads)\n - Base64-encoded executable content\n - PowerShell stager patterns\n - Unusually large TXT records (>255 bytes per string, multiple strings)\n - Known C2 framework patterns\n \"\"\"\n findings = {\n \"domain\": domain,\n \"txt_length\": len(txt_data),\n \"entropy\": shannon_entropy(txt_data),\n \"suspicious\": False,\n \"indicators\": [],\n \"decoded_preview\": None,\n }\n\n # Length check - legitimate TXT records are typically short (SPF, DKIM, verification)\n if len(txt_data) > 500:\n findings[\"indicators\"].append({\n \"type\": \"oversized_txt\",\n \"detail\": f\"TXT record length {len(txt_data)} exceeds normal threshold (500)\",\n \"severity\": \"medium\",\n })\n\n # High entropy - suggests encoded/encrypted payload\n if findings[\"entropy\"] > 4.5 and len(txt_data) > 100:\n findings[\"indicators\"].append({\n \"type\": \"high_entropy_payload\",\n \"detail\": f\"Entropy {findings['entropy']:.3f} suggests encoded data\",\n \"severity\": \"high\",\n })\n\n # Base64 detection\n b64_pattern = re.compile(r'^[A-Za-z0-9+/]{40,}={0,2}

Detecting Command and Control Over DNS When to Use - Investigating suspected DNS tunneling used for C2 communication or data exfiltration - Analyzing DNS query logs for signs of encoded payloads in subdomain strings - Classifying domains as DGA-generated vs. legitimate using statistical or ML methods - Detecting DNS beaconing patterns (regular intervals, consistent query sizes) - Hunting for Iodine, dnscat2, dns2tcp, Cobalt Strike DNS, or Sliver DNS traffic - Monitoring TXT record abuse for command delivery or staged payload download - Building DNS anomaly detection rules for SOC/SIEM deploym…

)\n if b64_pattern.match(txt_data.strip()):\n findings[\"indicators\"].append({\n \"type\": \"base64_encoded\",\n \"detail\": \"Content matches base64 pattern\",\n \"severity\": \"high\",\n })\n try:\n decoded = base64.b64decode(txt_data.strip())\n preview = decoded[:200]\n\n # Check for PE header (MZ)\n if preview[:2] == b'MZ':\n findings[\"indicators\"].append({\n \"type\": \"pe_executable\",\n \"detail\": \"Decoded base64 contains PE executable (MZ header)\",\n \"severity\": \"critical\",\n })\n\n # Check for ELF header\n if preview[:4] == b'\\x7fELF':\n findings[\"indicators\"].append({\n \"type\": \"elf_executable\",\n \"detail\": \"Decoded base64 contains ELF executable\",\n \"severity\": \"critical\",\n })\n\n # Check for PowerShell patterns\n decoded_str = decoded.decode(\"utf-8\", errors=\"ignore\")\n ps_patterns = [\n r\"Invoke-Expression\",\n r\"IEX\\s*\\(\",\n r\"New-Object\\s+System\\.Net\",\n r\"DownloadString\",\n r\"FromBase64String\",\n r\"Start-Process\",\n r\"\\-enc\\s\",\n r\"powershell\\s.*\\-e\\s\",\n ]\n for pattern in ps_patterns:\n if re.search(pattern, decoded_str, re.IGNORECASE):\n findings[\"indicators\"].append({\n \"type\": \"powershell_stager\",\n \"detail\": f\"Decoded content contains PowerShell pattern: {pattern}\",\n \"severity\": \"critical\",\n })\n break\n\n findings[\"decoded_preview\"] = repr(preview[:100])\n\n except Exception:\n pass\n\n # Known C2 TXT patterns\n cobalt_pattern = re.compile(r'^[a-f0-9]{32,}

Detecting Command and Control Over DNS When to Use - Investigating suspected DNS tunneling used for C2 communication or data exfiltration - Analyzing DNS query logs for signs of encoded payloads in subdomain strings - Classifying domains as DGA-generated vs. legitimate using statistical or ML methods - Detecting DNS beaconing patterns (regular intervals, consistent query sizes) - Hunting for Iodine, dnscat2, dns2tcp, Cobalt Strike DNS, or Sliver DNS traffic - Monitoring TXT record abuse for command delivery or staged payload download - Building DNS anomaly detection rules for SOC/SIEM deploym…

, re.IGNORECASE)\n if cobalt_pattern.match(txt_data.strip()):\n findings[\"indicators\"].append({\n \"type\": \"hex_encoded_payload\",\n \"detail\": \"Pure hex string in TXT record - possible Cobalt Strike beacon config\",\n \"severity\": \"high\",\n })\n\n # Multiple concatenated base64 blocks (common in staged delivery)\n b64_blocks = re.findall(r'[A-Za-z0-9+/]{50,}={0,2}', txt_data)\n if len(b64_blocks) > 3:\n findings[\"indicators\"].append({\n \"type\": \"multi_block_payload\",\n \"detail\": f\"{len(b64_blocks)} base64 blocks found - possible staged payload\",\n \"severity\": \"high\",\n })\n\n # Check for known legitimate TXT patterns to reduce false positives\n legitimate_patterns = [\n r'^v=spf1\\s', # SPF record\n r'^v=DKIM1', # DKIM record\n r'^v=DMARC1', # DMARC record\n r'^google-site-verification=',\n r'^MS=', # Microsoft domain verification\n r'^docusign=',\n r'^apple-domain-verification=',\n r'^facebook-domain-verification=',\n r'^_globalsign-domain-verification=',\n ]\n for pattern in legitimate_patterns:\n if re.match(pattern, txt_data, re.IGNORECASE):\n findings[\"indicators\"] = []\n findings[\"legitimate\"] = True\n return findings\n\n findings[\"suspicious\"] = len(findings[\"indicators\"]) > 0\n return findings\n\n\ndef analyze_txt_records_bulk(records):\n \"\"\"Analyze a batch of DNS TXT records.\"\"\"\n results = []\n for record in records:\n domain = record.get(\"domain\", record.get(\"query\", \"\"))\n txt_data = record.get(\"txt\", record.get(\"answer\", \"\"))\n if txt_data:\n finding = analyze_txt_record(txt_data, domain)\n if finding[\"suspicious\"]:\n results.append(finding)\n\n results.sort(\n key=lambda x: max((i.get(\"severity_score\", 0) for i in x[\"indicators\"]),\n default=0),\n reverse=True,\n )\n return results","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Step 4: DGA Domain Classification with Machine Learning","type":"text"}]},{"type":"paragraph","content":[{"text":"Train a classifier to distinguish DGA-generated domains from legitimate ones:","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":"python"},"content":[{"text":"#!/usr/bin/env python3\n\"\"\"\nDGA domain classification using character-level feature extraction and ML.\n\nFeatures extracted per domain:\n - Shannon entropy of the domain string\n - Domain length\n - Digit ratio, consonant ratio, vowel ratio\n - Longest consecutive consonant sequence\n - N-gram frequency deviation from English\n - Number of distinct characters\n - Presence of dictionary words\n\"\"\"\n\nimport math\nimport re\nimport string\nfrom collections import Counter\n\nimport numpy as np\n\ntry:\n from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier\n from sklearn.model_selection import train_test_split, cross_val_score\n from sklearn.metrics import classification_report, confusion_matrix\n from sklearn.preprocessing import StandardScaler\n HAS_SKLEARN = True\nexcept ImportError:\n HAS_SKLEARN = False\n\n\n# English language character bigram frequencies (normalized, top bigrams)\n# Source: Peter Norvig's English letter frequency analysis\nENGLISH_BIGRAMS = {\n \"th\": 0.0356, \"he\": 0.0307, \"in\": 0.0243, \"er\": 0.0205,\n \"an\": 0.0199, \"re\": 0.0185, \"on\": 0.0176, \"at\": 0.0149,\n \"en\": 0.0145, \"nd\": 0.0135, \"ti\": 0.0134, \"es\": 0.0134,\n \"or\": 0.0128, \"te\": 0.0120, \"of\": 0.0117, \"ed\": 0.0117,\n \"is\": 0.0113, \"it\": 0.0112, \"al\": 0.0109, \"ar\": 0.0107,\n \"st\": 0.0105, \"to\": 0.0104, \"nt\": 0.0104, \"ng\": 0.0095,\n \"se\": 0.0093, \"ha\": 0.0093, \"as\": 0.0087, \"ou\": 0.0087,\n \"io\": 0.0083, \"le\": 0.0083, \"ve\": 0.0083, \"co\": 0.0079,\n \"me\": 0.0079, \"de\": 0.0076, \"hi\": 0.0076, \"ri\": 0.0073,\n \"ro\": 0.0073, \"ic\": 0.0070, \"ne\": 0.0069, \"ea\": 0.0069,\n}\n\nVOWELS = set(\"aeiou\")\nCONSONANTS = set(\"bcdfghjklmnpqrstvwxyz\")\n\n\ndef extract_domain_features(domain):\n \"\"\"Extract numerical features from a domain name for ML classification.\"\"\"\n domain = domain.lower().strip(\".\")\n\n # Remove TLD for analysis (focus on SLD + subdomain)\n parts = domain.split(\".\")\n if len(parts) > 1:\n analysis_str = \".\".join(parts[:-1]) # Drop TLD\n else:\n analysis_str = domain\n\n # Remove dots for character analysis\n flat = analysis_str.replace(\".\", \"\")\n length = len(flat)\n\n if length == 0:\n return None\n\n # 1. Shannon entropy\n entropy = 0.0\n counter = Counter(flat)\n for count in counter.values():\n p = count / length\n entropy -= p * math.log2(p)\n\n # 2. Character ratios\n digit_count = sum(1 for c in flat if c.isdigit())\n vowel_count = sum(1 for c in flat if c in VOWELS)\n consonant_count = sum(1 for c in flat if c in CONSONANTS)\n special_count = sum(1 for c in flat if c == '-')\n\n digit_ratio = digit_count / length\n vowel_ratio = vowel_count / length\n consonant_ratio = consonant_count / length\n\n # 3. Longest consecutive consonant run\n max_consonant_run = 0\n current_run = 0\n for c in flat:\n if c in CONSONANTS:\n current_run += 1\n max_consonant_run = max(max_consonant_run, current_run)\n else:\n current_run = 0\n\n # 4. Distinct character count and ratio\n distinct_chars = len(set(flat))\n distinct_ratio = distinct_chars / length\n\n # 5. Bigram frequency deviation from English\n bigrams = [flat[i:i+2] for i in range(len(flat) - 1)]\n if bigrams:\n english_score = sum(\n ENGLISH_BIGRAMS.get(bg, 0) for bg in bigrams\n ) / len(bigrams)\n else:\n english_score = 0\n\n # 6. Number of labels (dots + 1)\n label_count = len(parts)\n\n # 7. Hex character ratio (common in DGA)\n hex_chars = set(\"0123456789abcdef\")\n hex_ratio = sum(1 for c in flat if c in hex_chars) / length\n\n # 8. Digit-letter transitions (DGA domains mix digits and letters)\n transitions = 0\n for i in range(1, len(flat)):\n if (flat[i].isdigit() != flat[i-1].isdigit()):\n transitions += 1\n transition_ratio = transitions / max(length - 1, 1)\n\n # 9. Repeated character ratio\n if length > 1:\n repeats = sum(1 for i in range(1, len(flat)) if flat[i] == flat[i-1])\n repeat_ratio = repeats / (length - 1)\n else:\n repeat_ratio = 0\n\n return {\n \"domain\": domain,\n \"length\": length,\n \"entropy\": round(entropy, 4),\n \"digit_ratio\": round(digit_ratio, 4),\n \"vowel_ratio\": round(vowel_ratio, 4),\n \"consonant_ratio\": round(consonant_ratio, 4),\n \"max_consonant_run\": max_consonant_run,\n \"distinct_chars\": distinct_chars,\n \"distinct_ratio\": round(distinct_ratio, 4),\n \"english_bigram_score\": round(english_score, 6),\n \"label_count\": label_count,\n \"hex_ratio\": round(hex_ratio, 4),\n \"transition_ratio\": round(transition_ratio, 4),\n \"repeat_ratio\": round(repeat_ratio, 4),\n \"special_count\": special_count,\n }\n\n\nFEATURE_COLUMNS = [\n \"length\", \"entropy\", \"digit_ratio\", \"vowel_ratio\", \"consonant_ratio\",\n \"max_consonant_run\", \"distinct_chars\", \"distinct_ratio\",\n \"english_bigram_score\", \"label_count\", \"hex_ratio\",\n \"transition_ratio\", \"repeat_ratio\", \"special_count\",\n]\n\n\ndef features_to_vector(features):\n \"\"\"Convert feature dict to numpy array.\"\"\"\n return np.array([features[col] for col in FEATURE_COLUMNS])\n\n\ndef train_dga_classifier(legitimate_domains, dga_domains, model_type=\"random_forest\"):\n \"\"\"\n Train a DGA classifier on labeled domain lists.\n\n Args:\n legitimate_domains: list of known-good domain strings\n dga_domains: list of known DGA domain strings\n model_type: 'random_forest' or 'gradient_boosting'\n\n Returns:\n trained model, scaler, and evaluation metrics\n \"\"\"\n if not HAS_SKLEARN:\n print(\"[ERROR] scikit-learn required: pip install scikit-learn\")\n return None, None, None\n\n # Extract features\n X_legit = []\n X_dga = []\n\n for d in legitimate_domains:\n feats = extract_domain_features(d)\n if feats:\n X_legit.append(features_to_vector(feats))\n\n for d in dga_domains:\n feats = extract_domain_features(d)\n if feats:\n X_dga.append(features_to_vector(feats))\n\n if not X_legit or not X_dga:\n print(\"[ERROR] Insufficient feature data\")\n return None, None, None\n\n X = np.vstack([np.array(X_legit), np.array(X_dga)])\n y = np.array([0] * len(X_legit) + [1] * len(X_dga))\n\n # Scale features\n scaler = StandardScaler()\n X_scaled = scaler.fit_transform(X)\n\n # Train/test split\n X_train, X_test, y_train, y_test = train_test_split(\n X_scaled, y, test_size=0.2, random_state=42, stratify=y\n )\n\n # Train model\n if model_type == \"gradient_boosting\":\n model = GradientBoostingClassifier(\n n_estimators=200, max_depth=6, learning_rate=0.1,\n min_samples_split=10, random_state=42,\n )\n else:\n model = RandomForestClassifier(\n n_estimators=200, max_depth=15, min_samples_split=5,\n random_state=42, n_jobs=-1,\n )\n\n model.fit(X_train, y_train)\n\n # Evaluate\n y_pred = model.predict(X_test)\n report = classification_report(y_test, y_pred, target_names=[\"legitimate\", \"dga\"],\n output_dict=True)\n cm = confusion_matrix(y_test, y_pred)\n\n # Cross-validation\n cv_scores = cross_val_score(model, X_scaled, y, cv=5, scoring=\"f1\")\n\n metrics = {\n \"accuracy\": report[\"accuracy\"],\n \"precision_dga\": report[\"dga\"][\"precision\"],\n \"recall_dga\": report[\"dga\"][\"recall\"],\n \"f1_dga\": report[\"dga\"][\"f1-score\"],\n \"precision_legit\": report[\"legitimate\"][\"precision\"],\n \"recall_legit\": report[\"legitimate\"][\"recall\"],\n \"confusion_matrix\": cm.tolist(),\n \"cv_f1_mean\": cv_scores.mean(),\n \"cv_f1_std\": cv_scores.std(),\n \"feature_importance\": dict(zip(\n FEATURE_COLUMNS,\n [round(float(x), 4) for x in model.feature_importances_]\n )),\n }\n\n print(f\"[+] Model trained: {model_type}\")\n print(f\" Accuracy: {metrics['accuracy']:.4f}\")\n print(f\" DGA F1: {metrics['f1_dga']:.4f}\")\n print(f\" DGA Recall: {metrics['recall_dga']:.4f}\")\n print(f\" CV F1 (5-fold): {metrics['cv_f1_mean']:.4f} +/- {metrics['cv_f1_std']:.4f}\")\n print(f\" Top features: \", end=\"\")\n top_feats = sorted(metrics[\"feature_importance\"].items(), key=lambda x: x[1], reverse=True)[:5]\n print(\", \".join(f\"{k}={v:.3f}\" for k, v in top_feats))\n\n return model, scaler, metrics\n\n\ndef classify_domains(domains, model, scaler):\n \"\"\"Classify a list of domains as legitimate or DGA using a trained model.\"\"\"\n results = []\n for domain in domains:\n feats = extract_domain_features(domain)\n if feats is None:\n continue\n\n vec = features_to_vector(feats).reshape(1, -1)\n vec_scaled = scaler.transform(vec)\n\n prediction = model.predict(vec_scaled)[0]\n probability = model.predict_proba(vec_scaled)[0]\n\n results.append({\n \"domain\": domain,\n \"prediction\": \"dga\" if prediction == 1 else \"legitimate\",\n \"confidence\": round(float(max(probability)), 4),\n \"dga_probability\": round(float(probability[1]), 4),\n \"features\": feats,\n })\n\n return results","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Step 5: DNS Beaconing Pattern Detection","type":"text"}]},{"type":"paragraph","content":[{"text":"Identify periodic DNS query patterns indicative of C2 check-ins:","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":"python"},"content":[{"text":"#!/usr/bin/env python3\n\"\"\"DNS beaconing detection - identifies periodic C2 check-in patterns.\"\"\"\n\nimport math\nfrom collections import defaultdict\nfrom datetime import datetime, timedelta\n\nimport numpy as np\n\n\ndef parse_timestamp(ts_str):\n \"\"\"Parse various timestamp formats to datetime.\"\"\"\n formats = [\n \"%Y-%m-%dT%H:%M:%S.%fZ\",\n \"%Y-%m-%dT%H:%M:%S.%f\",\n \"%Y-%m-%dT%H:%M:%S\",\n \"%Y-%m-%d %H:%M:%S.%f\",\n \"%Y-%m-%d %H:%M:%S\",\n ]\n for fmt in formats:\n try:\n return datetime.strptime(ts_str, fmt)\n except ValueError:\n continue\n\n # Try epoch timestamp\n try:\n ts_float = float(ts_str)\n return datetime.utcfromtimestamp(ts_float)\n except (ValueError, OverflowError, OSError):\n pass\n\n return None\n\n\ndef detect_beaconing(dns_queries, min_queries=10, max_jitter_pct=25,\n min_interval_sec=10, max_interval_sec=7200):\n \"\"\"\n Detect DNS beaconing by analyzing inter-query timing intervals.\n\n Beaconing indicators:\n - Regular inter-query intervals (low standard deviation)\n - Consistent query sizes\n - Single source IP to single domain over extended period\n - Low jitter (variation in timing)\n\n Args:\n dns_queries: list of dicts with 'src_ip', 'query', 'timestamp'\n min_queries: minimum queries to analyze (default 10)\n max_jitter_pct: maximum coefficient of variation for beacon (default 25%)\n min_interval_sec: minimum beacon interval to detect (default 10s)\n max_interval_sec: maximum beacon interval to detect (default 7200s / 2hr)\n\n Returns:\n list of detected beacon patterns with confidence scores\n \"\"\"\n # Group queries by (source IP, base domain)\n groups = defaultdict(list)\n\n for q in dns_queries:\n src_ip = q.get(\"src_ip\", \"\")\n fqdn = q.get(\"query\", \"\").lower().rstrip(\".\")\n ts_str = q.get(\"timestamp\", \"\")\n\n ts = parse_timestamp(ts_str)\n if not ts or not src_ip or not fqdn:\n continue\n\n # Extract base domain (last 2 labels)\n parts = fqdn.split(\".\")\n if len(parts) >= 2:\n base_domain = \".\".join(parts[-2:])\n else:\n base_domain = fqdn\n\n groups[(src_ip, base_domain)].append(ts)\n\n beacons = []\n\n for (src_ip, base_domain), timestamps in groups.items():\n if len(timestamps) \u003c min_queries:\n continue\n\n # Sort timestamps and compute intervals\n timestamps.sort()\n intervals = [\n (timestamps[i+1] - timestamps[i]).total_seconds()\n for i in range(len(timestamps) - 1)\n ]\n\n if not intervals:\n continue\n\n intervals = np.array(intervals)\n\n # Filter out zero intervals (duplicate timestamps)\n intervals = intervals[intervals > 0]\n if len(intervals) \u003c min_queries - 1:\n continue\n\n mean_interval = np.mean(intervals)\n std_interval = np.std(intervals)\n median_interval = np.median(intervals)\n\n # Skip if interval is outside detection range\n if mean_interval \u003c min_interval_sec or mean_interval > max_interval_sec:\n continue\n\n # Coefficient of variation (jitter)\n cv = (std_interval / mean_interval * 100) if mean_interval > 0 else 100\n\n # Time span of activity\n time_span = (timestamps[-1] - timestamps[0]).total_seconds()\n hours_active = time_span / 3600\n\n # Beacon scoring\n score = 0.0\n flags = []\n\n # Low jitter = strong beacon indicator\n if cv \u003c 5:\n score += 40\n flags.append(f\"very_low_jitter:CV={cv:.1f}%\")\n elif cv \u003c 15:\n score += 30\n flags.append(f\"low_jitter:CV={cv:.1f}%\")\n elif cv \u003c max_jitter_pct:\n score += 15\n flags.append(f\"moderate_jitter:CV={cv:.1f}%\")\n else:\n continue # Too much jitter, not a beacon\n\n # Long duration increases confidence\n if hours_active > 24:\n score += 20\n flags.append(f\"persistent:{hours_active:.1f}h\")\n elif hours_active > 4:\n score += 10\n flags.append(f\"sustained:{hours_active:.1f}h\")\n\n # High query count increases confidence\n if len(timestamps) > 100:\n score += 15\n flags.append(f\"high_volume:{len(timestamps)}\")\n elif len(timestamps) > 50:\n score += 10\n flags.append(f\"moderate_volume:{len(timestamps)}\")\n\n # Common C2 intervals (60s, 120s, 300s, 600s, 900s, 1800s, 3600s)\n common_intervals = [60, 120, 300, 600, 900, 1800, 3600]\n for ci in common_intervals:\n if abs(mean_interval - ci) \u003c ci * 0.1: # Within 10% of common interval\n score += 10\n flags.append(f\"common_c2_interval:~{ci}s\")\n break\n\n beacons.append({\n \"src_ip\": src_ip,\n \"base_domain\": base_domain,\n \"query_count\": len(timestamps),\n \"mean_interval_sec\": round(mean_interval, 2),\n \"median_interval_sec\": round(median_interval, 2),\n \"std_interval_sec\": round(std_interval, 2),\n \"jitter_cv_pct\": round(cv, 2),\n \"first_seen\": timestamps[0].isoformat(),\n \"last_seen\": timestamps[-1].isoformat(),\n \"duration_hours\": round(hours_active, 2),\n \"score\": round(score, 1),\n \"flags\": flags,\n })\n\n beacons.sort(key=lambda x: x[\"score\"], reverse=True)\n return beacons\n\n\ndef print_beacon_report(beacons, top_n=20):\n \"\"\"Print formatted beacon detection report.\"\"\"\n print(\"=\" * 80)\n print(\" DNS BEACONING DETECTION REPORT\")\n print(\"=\" * 80)\n print(f\" Beacon patterns detected: {len(beacons)}\")\n print()\n\n if not beacons:\n print(\" No beaconing patterns detected.\")\n return\n\n print(f\" TOP {min(top_n, len(beacons))} BEACON CANDIDATES\")\n print(\" \" + \"-\" * 76)\n\n for b in beacons[:top_n]:\n print(f\" Score: {b['score']:.1f} | {b['src_ip']} -> {b['base_domain']}\")\n print(f\" Queries: {b['query_count']} \"\n f\"Interval: {b['mean_interval_sec']:.1f}s +/- {b['std_interval_sec']:.1f}s \"\n f\"Jitter: {b['jitter_cv_pct']:.1f}%\")\n print(f\" Active: {b['duration_hours']:.1f}h \"\n f\"({b['first_seen']} to {b['last_seen']})\")\n print(f\" Flags: {', '.join(b['flags'])}\")\n print()","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Step 6: Integrated DNS C2 Detection Pipeline","type":"text"}]},{"type":"paragraph","content":[{"text":"Combine all detection methods into a unified analysis:","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":""},"content":[{"text":"DNS C2 Detection Pipeline Architecture:\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n ┌────────────────────────────────────────────────────────┐\n │ DATA SOURCES │\n │ Zeek dns.log | Suricata EVE | Recursive Resolver │\n │ Passive DNS | PCAP capture | EDR DNS telemetry │\n └───────────────────────┬────────────────────────────────┘\n │\n ┌───────────────────────▼────────────────────────────────┐\n │ PREPROCESSING │\n │ Parse timestamps | Extract subdomains | Normalize │\n │ FQDN | Resolve base domain | Lookup in whitelist │\n └───────────────────────┬────────────────────────────────┘\n │\n ┌───────────────┼───────────────┐\n │ │ │\n ┌───────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐\n │ ENTROPY │ │ BEACONING │ │ DGA │\n │ ANALYSIS │ │ DETECTION │ │ CLASSIFIER │\n │ │ │ │ │ │\n │ Shannon ent. │ │ Interval │ │ ML model │\n │ Subdomain │ │ analysis │ │ Random │\n │ length │ │ Jitter/CV │ │ Forest or │\n │ Encoding │ │ Duration │ │ Gradient │\n │ patterns │ │ Periodicity │ │ Boosting │\n └───────┬──────┘ └──────┬──────┘ └──────┬──────┘\n │ │ │\n ┌───────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐\n │ TXT RECORD │ │ TOOL │ │ PASSIVE │\n │ PAYLOAD │ │ SIGNATURE │ │ DNS │\n │ ANALYSIS │ │ MATCHING │ │ ENRICHMENT │\n │ │ │ │ │ │\n │ Base64 decode│ │ Iodine │ │ First seen │\n │ PE/ELF detect│ │ dnscat2 │ │ Registrar │\n │ PS stager │ │ dns2tcp │ │ Age check │\n │ Size anomaly │ │ Cobalt DNS │ │ Reputation │\n └───────┬──────┘ └──────┬──────┘ └──────┬──────┘\n │ │ │\n ┌───────▼───────────────▼───────────────▼────────────────┐\n │ CORRELATION ENGINE │\n │ Combine scores from all detectors │\n │ Weighted scoring: entropy(30%) + beacon(25%) + │\n │ DGA(20%) + TXT payload(15%) + signature(10%) │\n │ Threshold: score > 60 = alert, > 40 = investigate │\n └───────────────────────┬────────────────────────────────┘\n │\n ┌───────────────────────▼────────────────────────────────┐\n │ ALERTING & RESPONSE │\n │ Generate SIEM alert with all evidence │\n │ Block domain in DNS firewall / RPZ │\n │ Isolate endpoint via EDR │\n │ Create incident ticket with IOCs │\n └────────────────────────────────────────────────────────┘","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Step 7: SIEM Detection Rules","type":"text"}]},{"type":"paragraph","content":[{"text":"Deploy detection queries in your SIEM platform:","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":""},"content":[{"text":"Splunk SPL - DNS Tunneling Detection:\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n-- High entropy subdomain queries\nindex=dns sourcetype=\"bro:dns:json\" OR sourcetype=\"suricata:dns\"\n| eval subdomain=mvindex(split(query,\".\"),0)\n| eval sub_len=len(subdomain)\n| where sub_len > 30\n| eval char_counts=mvmap(split(subdomain,\"\"),1)\n| lookup dns_entropy_lookup subdomain OUTPUT entropy\n| where entropy > 3.5\n| stats count as query_count dc(query) as unique_queries\n avg(sub_len) as avg_sub_len values(query) as sample_queries\n by src_ip, domain\n| where query_count > 20\n| sort -query_count\n\n-- DNS TXT record abuse\nindex=dns (qtype=\"TXT\" OR qtype_name=\"TXT\")\n NOT (query=\"*._domainkey.*\" OR query=\"*._dmarc.*\" OR query=\"*._spf.*\")\n| stats count as txt_queries dc(query) as unique_txt_queries\n values(query) as domains\n by src_ip\n| where txt_queries > 50\n| sort -txt_queries\n\n-- DNS beaconing (regular interval queries)\nindex=dns sourcetype=\"bro:dns:json\"\n| bin _time span=60s\n| stats count by src_ip, query, _time\n| streamstats window=10 current=t avg(count) as avg_count stdev(count) as std_count by src_ip, query\n| eval cv = if(avg_count>0, (std_count/avg_count)*100, 100)\n| where cv \u003c 20 AND avg_count > 0\n| stats count as beacon_windows avg(cv) as avg_jitter\n min(_time) as first_seen max(_time) as last_seen\n by src_ip, query\n| where beacon_windows > 10\n| sort -beacon_windows\n\n-- Unusual record type volume (NULL, KEY, SRV for tunneling)\nindex=dns (qtype_name=\"NULL\" OR qtype_name=\"KEY\" OR qtype_name=\"SRV\"\n OR qtype_name=\"CNAME\" OR qtype_name=\"MX\")\n NOT qtype_name=\"A\" NOT qtype_name=\"AAAA\" NOT qtype_name=\"PTR\"\n| stats count by src_ip, qtype_name, query\n| where count > 10\n| sort -count","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":""},"content":[{"text":"Elastic KQL - DNS C2 Detection:\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n-- Long subdomain queries (potential tunneling)\ndns.question.name: * and not dns.question.name: *.in-addr.arpa\n| where length(dns.question.subdomain) > 40\n\n-- High volume DNS to single domain\nevent.dataset: \"zeek.dns\" or event.dataset: \"suricata.dns\"\n| stats count by source.ip, dns.question.registered_domain\n| where count > 500\n\n-- TXT record queries to non-standard domains\ndns.question.type: \"TXT\"\n and not dns.question.name: (*._domainkey.* or *._dmarc.* or *._spf.*)","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":""},"content":[{"text":"Zeek Script - DNS Tunneling Indicator:\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n# dns_tunnel_detect.zeek\n@load base/protocols/dns\n\nmodule DNSTunnel;\n\nexport {\n redef enum Notice::Type += {\n DNS_Tunneling_Suspected,\n DNS_High_Entropy_Query,\n DNS_Excessive_TXT_Queries,\n };\n\n const entropy_threshold = 3.5 &redef;\n const subdomain_length_threshold = 40 &redef;\n const txt_query_threshold = 50 &redef;\n const tracking_interval = 5min &redef;\n}\n\nglobal txt_query_tracker: table[addr] of count &create_expire=5min &default=0;\nglobal domain_query_tracker: table[addr, string] of count &create_expire=10min &default=0;\n\nfunction shannon_entropy(s: string): double\n{\n local counts: table[string] of count;\n local total = |s|;\n\n if (total == 0) return 0.0;\n\n for (i in s)\n {\n local c = s[i];\n if (c !in counts) counts[c] = 0;\n ++counts[c];\n }\n\n local ent = 0.0;\n for (ch, cnt in counts)\n {\n local p = cnt * 1.0 / total;\n ent -= p * log2(p);\n }\n\n return ent;\n}\n\nevent dns_request(c: connection, msg: dns_msg, query: string, qtype: count,\n qclass: count)\n{\n if (|query| == 0) return;\n\n # Track TXT queries\n if (qtype == 16) # TXT\n {\n ++txt_query_tracker[c$id$orig_h];\n if (txt_query_tracker[c$id$orig_h] == txt_query_threshold)\n {\n NOTICE([\n $note=DNS_Excessive_TXT_Queries,\n $conn=c,\n $msg=fmt(\"Host %s made %d TXT queries in tracking window\",\n c$id$orig_h, txt_query_threshold),\n $identifier=cat(c$id$orig_h),\n ]);\n }\n }\n\n # Extract subdomain and check entropy\n local parts = split_string(query, /\\./);\n if (|parts| \u003c 3) return;\n\n # Subdomain = everything except last two labels\n local subdomain = \"\";\n local i = 0;\n for (idx in parts)\n {\n if (i \u003c |parts| - 2)\n subdomain += parts[idx];\n ++i;\n }\n\n if (|subdomain| > subdomain_length_threshold)\n {\n local ent = shannon_entropy(subdomain);\n if (ent > entropy_threshold)\n {\n NOTICE([\n $note=DNS_High_Entropy_Query,\n $conn=c,\n $msg=fmt(\"High entropy DNS query: entropy=%.2f len=%d query=%s\",\n ent, |subdomain|, query),\n $identifier=cat(c$id$orig_h, query),\n ]);\n }\n }\n}","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Step 8: Suricata Rules for Known DNS C2 Tools","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":""},"content":[{"text":"# suricata-dns-c2.rules\n# DNS Tunneling and C2 Detection Rules\n\n# Iodine DNS tunnel detection\nalert dns any any -> any any (msg:\"ET TROJAN Iodine DNS Tunnel Activity - NULL Query\"; \\\n dns.query; content:\".\"; pcre:\"/^[a-z0-9]{50,}\\.[a-z0-9.-]+$/i\"; \\\n dns_query; content:\"|00 0a|\"; \\\n classtype:trojan-activity; sid:2030001; rev:1;)\n\n# dnscat2 DNS tunnel detection\nalert dns any any -> any any (msg:\"ET TROJAN dnscat2 DNS Tunnel - Handshake\"; \\\n dns.query; content:\"dnscat.\"; nocase; fast_pattern; \\\n classtype:trojan-activity; sid:2030002; rev:1;)\n\nalert dns any any -> any any (msg:\"ET TROJAN dnscat2 DNS Tunnel - Data Channel\"; \\\n dns.query; pcre:\"/^[a-f0-9]{16,}\\./i\"; \\\n dns_query; content:\"|00 10|\"; \\\n classtype:trojan-activity; sid:2030003; rev:1;)\n\n# Cobalt Strike DNS beacon\nalert dns any any -> any any (msg:\"ET TROJAN Cobalt Strike DNS Beacon - A Record\"; \\\n dns.query; pcre:\"/^[a-f0-9]{12,}\\.[a-z0-9.-]+$/i\"; \\\n threshold:type both, track by_src, count 20, seconds 60; \\\n classtype:trojan-activity; sid:2030004; rev:1;)\n\n# Generic DNS tunneling - high volume TXT queries to single domain\nalert dns any any -> any any (msg:\"ET POLICY Excessive TXT DNS Queries - Possible Tunneling\"; \\\n dns_query; content:\"|00 10|\"; \\\n threshold:type threshold, track by_src, count 50, seconds 300; \\\n classtype:policy-violation; sid:2030005; rev:1;)\n\n# Long subdomain query (generic tunneling indicator)\nalert dns any any -> any any (msg:\"ET POLICY Unusually Long DNS Subdomain - Possible Tunneling\"; \\\n dns.query; pcre:\"/^[a-z0-9-]{52,}\\./i\"; \\\n threshold:type limit, track by_src, count 1, seconds 60; \\\n classtype:policy-violation; sid:2030006; rev:1;)\n\n# DNS query for known C2 TXT payload staging\nalert dns any any -> any any (msg:\"ET TROJAN DNS TXT Record Staged Payload Request\"; \\\n dns_query; content:\"|00 10|\"; \\\n dns.query; pcre:\"/^(stage|payload|cmd|exec|download|update|config)\\d*\\./i\"; \\\n classtype:trojan-activity; sid:2030007; rev:1;)","type":"text"}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Key Concepts","type":"text"}]},{"type":"table","attrs":{"layout":null},"content":[{"type":"tr","content":[{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Term","type":"text"}]}]},{"type":"th","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Definition","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"DNS Tunneling","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Technique of encoding data within DNS queries and responses to create a covert communication channel, bypassing firewalls that allow DNS traffic","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Shannon Entropy","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Information theory metric measuring randomness in a string; legitimate domains typically have entropy below 3.5, while encoded tunnel data exceeds 3.8-4.5","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Domain Generation Algorithm (DGA)","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Malware technique that algorithmically generates thousands of pseudo-random domain names for C2 rendezvous, making domain-based blocking impractical","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"DNS Beaconing","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Regular, periodic DNS queries from a compromised host to a C2 domain, identifiable by consistent inter-query intervals and low timing jitter","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"TXT Record Abuse","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Using DNS TXT records to deliver encoded C2 commands or staged payloads, exploiting the large payload capacity (up to 65535 bytes across multiple strings)","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Iodine","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Open-source DNS tunneling tool that tunnels IPv4 traffic through DNS using NULL, TXT, or CNAME records, commonly used to bypass captive portals","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"dnscat2","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Encrypted C2 tool that creates a command channel over DNS, supporting file transfer, port forwarding, and shell access through DNS queries","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Cobalt Strike DNS Beacon","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Commercial C2 framework's DNS communication mode that uses A, AAAA, and TXT records to receive tasks and return results via DNS resolution","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Passive DNS (pDNS)","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Database of historical DNS resolution data collected by monitoring DNS traffic; used to identify infrastructure reuse and domain history","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Response Policy Zone (RPZ)","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"DNS firewall mechanism that allows real-time blocking of malicious domains by injecting override responses at the recursive resolver level","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Coefficient of Variation","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"Standard deviation divided by mean, expressed as percentage; used to measure beacon jitter -- lower CV indicates more regular (suspicious) timing","type":"text"}]}]}]},{"type":"tr","content":[{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"NXDOMAIN","type":"text","marks":[{"type":"strong"}]}]}]},{"type":"td","attrs":{"colspan":1,"rowspan":1,"colwidth":null,"alignment":""},"content":[{"type":"paragraph","content":[{"text":"DNS response code indicating the queried domain does not exist; high NXDOMAIN rates from a host suggest DGA activity where most generated domains are unregistered","type":"text"}]}]}]}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Tools & Systems","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Zeek (Bro)","type":"text","marks":[{"type":"strong"}]},{"text":": Network security monitor that produces structured dns.log with query/response details for offline analysis","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Suricata","type":"text","marks":[{"type":"strong"}]},{"text":": IDS/IPS with DNS protocol parsing and signature-based detection of tunneling patterns","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"tshark/Wireshark","type":"text","marks":[{"type":"strong"}]},{"text":": Packet capture and analysis tools for deep DNS protocol inspection","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"tldextract","type":"text","marks":[{"type":"strong"}]},{"text":": Python library for accurate domain/subdomain extraction using the Public Suffix List","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"dnspython","type":"text","marks":[{"type":"strong"}]},{"text":": Python DNS toolkit for programmatic query resolution and record parsing","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"scikit-learn","type":"text","marks":[{"type":"strong"}]},{"text":": ML library used to train DGA classifiers (Random Forest, Gradient Boosting)","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Farsight DNSDB / CIRCL pDNS","type":"text","marks":[{"type":"strong"}]},{"text":": Passive DNS databases for historical domain resolution lookups","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"DNS Response Policy Zone (RPZ)","type":"text","marks":[{"type":"strong"}]},{"text":": Recursive resolver feature for real-time DNS blocking of identified C2 domains","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Splunk / Elastic","type":"text","marks":[{"type":"strong"}]},{"text":": SIEM platforms for DNS log aggregation, entropy calculation, and beacon detection queries","type":"text"}]}]}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Common Scenarios","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Scenario: Investigating Suspected DNS Tunneling from an Internal Host","type":"text"}]},{"type":"paragraph","content":[{"text":"Context","type":"text","marks":[{"type":"strong"}]},{"text":": The SOC receives an alert from the DNS firewall showing a single internal host (10.1.5.42) making 15,000+ DNS queries to the domain ","type":"text"},{"text":"c8a3f1e2.tunnelsvc.example.com","type":"text","marks":[{"type":"code_inline"}]},{"text":" in the past hour. All queries are TXT type with long, random-looking subdomains. Normal DNS volume for this host is ~200 queries/hour.","type":"text"}]},{"type":"paragraph","content":[{"text":"Approach","type":"text","marks":[{"type":"strong"}]},{"text":":","type":"text"}]},{"type":"ordered_list","attrs":{"order":1,"listStyle":"number"},"content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Extract all DNS queries from 10.1.5.42 for the past 24 hours from Zeek dns.log","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Run entropy analysis on subdomain strings -- expect Shannon entropy > 4.0 for encoded tunnel data","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Check query timing intervals for beaconing pattern (likely sub-second for active tunnel)","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Examine TXT record responses for size anomalies (tunnel tools use maximum-size TXT responses)","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Compare subdomain patterns against known tool signatures (Iodine, dnscat2, dns2tcp)","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Query passive DNS for ","type":"text"},{"text":"tunnelsvc.example.com","type":"text","marks":[{"type":"code_inline"}]},{"text":" registration date, nameserver, and historical resolutions","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"If confirmed, add domain to DNS RPZ blocklist and isolate endpoint via EDR","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Capture full packet trace for forensic analysis of tunnel payload content","type":"text"}]}]}]},{"type":"paragraph","content":[{"text":"Pitfalls","type":"text","marks":[{"type":"strong"}]},{"text":":","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Blocking the domain before capturing evidence (need packet captures for forensics)","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Assuming all high-entropy DNS is malicious (CDN subdomains like Akamai can have high entropy)","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Not checking for multiple tunnel domains (attacker may have fallback C2 channels)","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Missing the initial compromise vector by focusing only on the DNS channel","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Not checking other hosts for similar patterns (lateral movement may have already occurred)","type":"text"}]}]}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Scenario: Building a DGA Detection Model for SOC Deployment","type":"text"}]},{"type":"paragraph","content":[{"text":"Context","type":"text","marks":[{"type":"strong"}]},{"text":": The threat intelligence team identified that a botnet family active in the industry uses DGA for C2 domain generation. The SOC needs an automated way to classify DNS queries as potentially DGA-generated and alert on matches.","type":"text"}]},{"type":"paragraph","content":[{"text":"Approach","type":"text","marks":[{"type":"strong"}]},{"text":":","type":"text"}]},{"type":"ordered_list","attrs":{"order":1,"listStyle":"number"},"content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Collect training data: Tranco/Alexa top 1M for legitimate domains, DGArchive or OSINT feeds for known DGA domains","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Extract character-level features: entropy, length, digit ratio, consonant sequences, bigram scores","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Train Random Forest and Gradient Boosting classifiers, evaluate with 5-fold cross-validation","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Deploy the model as a scoring enrichment in the SIEM (Splunk ML Toolkit or Elastic ML)","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Set threshold: DGA probability > 0.85 generates alert, > 0.65 generates investigation ticket","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Create a whitelist of known high-entropy legitimate domains (CDNs, cloud services) to reduce false positives","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Retrain monthly with new DGA samples from threat intel feeds","type":"text"}]}]}]},{"type":"paragraph","content":[{"text":"Pitfalls","type":"text","marks":[{"type":"strong"}]},{"text":":","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Training only on one DGA family and missing others (dictionary-based DGAs like Suppobox have low entropy)","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Not whitelisting CDN and cloud service domains that have randomized subdomains","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Setting the threshold too low, overwhelming the SOC with false positives","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Not accounting for punycode/internationalized domain names in feature extraction","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Deploying without a feedback loop for analysts to flag false positives for model retraining","type":"text"}]}]}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Output Format","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":""},"content":[{"text":"DNS C2 DETECTION ANALYSIS REPORT\n====================================\nAnalysis Period: 2026-03-15 00:00 to 2026-03-19 23:59\nData Source: Zeek dns.log (gateway sensor)\nTotal Queries: 14,283,501\nUnique Domains: 892,041\nHosts Analyzed: 3,847\n\nENTROPY ANALYSIS\nQueries with entropy > 3.5: 2,847 (0.02%)\nQueries with subdomain > 40 chars: 1,203 (0.008%)\nSuspicious base domains: 12\n\n [CRITICAL] tunnelsvc.example[.]com\n Queries: 15,247 Source: 10.1.5.42 Avg Entropy: 4.21\n Avg Subdomain Length: 63 Record Types: TXT (98%), A (2%)\n Tool Signature: dnscat2 (hex prefix pattern match)\n\n [HIGH] update-cdn.malicious[.]net\n Queries: 3,891 Source: 10.1.12.7 Avg Entropy: 3.87\n Avg Subdomain Length: 48 Record Types: A (60%), TXT (40%)\n Tool Signature: Cobalt Strike DNS beacon (interval pattern)\n\nBEACONING DETECTION\nBeacon patterns detected: 4\n\n Score: 85.0 10.1.5.42 -> tunnelsvc.example[.]com\n Interval: 0.5s +/- 0.1s Jitter: 8.2% Duration: 18.4h\n Queries: 15,247 Flags: very_low_jitter, persistent, high_volume\n\n Score: 72.0 10.1.12.7 -> update-cdn.malicious[.]net\n Interval: 60.2s +/- 3.1s Jitter: 5.1% Duration: 72.1h\n Queries: 3,891 Flags: very_low_jitter, persistent, common_c2_interval:~60s\n\nDGA CLASSIFICATION\nDomains classified: 892,041\nDGA predictions (>0.85 conf): 47\nDGA predictions (0.65-0.85): 183\n\n [HIGH] a8f3k2m1x9.com (DGA prob: 0.97, entropy: 3.92)\n [HIGH] j7t2p5q8w3.net (DGA prob: 0.95, entropy: 4.01)\n [HIGH] m3x8k1f6y2.org (DGA prob: 0.94, entropy: 3.88)\n\nTXT RECORD ANALYSIS\nSuspicious TXT responses: 8\nBase64 payloads detected: 3\nPowerShell stager patterns: 1\n\n [CRITICAL] cmd.staging[.]example.com\n TXT Length: 4,096 Entropy: 5.82\n Finding: Base64-encoded PowerShell stager with IEX pattern\n\nRECOMMENDED ACTIONS\n[CRITICAL] Block tunnelsvc.example[.]com and update-cdn.malicious[.]net in DNS RPZ\n[CRITICAL] Isolate hosts 10.1.5.42 and 10.1.12.7 for forensic investigation\n[HIGH] Block 47 high-confidence DGA domains in DNS firewall\n[HIGH] Investigate cmd.staging[.]example.com TXT payload staging\n[MEDIUM] Review 183 moderate-confidence DGA domains with threat intel\n[MEDIUM] Deploy Suricata rules for dnscat2 and Cobalt Strike DNS signatures","type":"text"}]},{"type":"hr","attrs":{"markup":"---"}}]},"metadata":{"date":"2026-06-05","name":"detecting-command-and-control-over-dns","tags":["dns","c2","tunneling","dga","network-forensics","threat-detection"],"author":"@skillopedia","domain":"cybersecurity","source":{"stars":13207,"repo_name":"anthropic-cybersecurity-skills","origin_url":"https://github.com/mukul975/anthropic-cybersecurity-skills/blob/HEAD/skills/detecting-command-and-control-over-dns/SKILL.md","repo_owner":"mukul975","body_sha256":"43236a6530575316cf9f973bfa67952bc4a1b6c3d95741dbd82abc0fc689fa38","cluster_key":"5866ab8ebd2aa14a77984ca6ccdbbd086a151167e170b387b2d970b5e87df58c","clean_bundle":{"format":"clean-skill-bundle-v1","source":"mukul975/anthropic-cybersecurity-skills/skills/detecting-command-and-control-over-dns/SKILL.md","attachments":[{"id":"7f4c0453-81e2-575f-bcb2-560891615eda","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/7f4c0453-81e2-575f-bcb2-560891615eda/attachment.md","path":"references/api-reference.md","size":8333,"sha256":"d7715d9e11858d671cb4e5a3ac8b5e0ba6f00d9ba315c9d4aac6cdaa1a518cfc","contentType":"text/markdown; charset=utf-8"},{"id":"dbd19598-6bea-5018-ad4d-5389e7ac6e4f","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/dbd19598-6bea-5018-ad4d-5389e7ac6e4f/attachment.py","path":"scripts/agent.py","size":40670,"sha256":"7dddc52c41603c926bc638441a0405e1d23cbd44c133437af14375ce7f5bbba2","contentType":"text/x-python; charset=utf-8"}],"bundle_sha256":"8cdbfc6625d442c439574a27dfe544340db870d601cd0b6eb633d96b93f00b90","attachment_count":2,"text_attachments":2,"attachment_storage":"skillopedia-attachments-v1","binary_attachments":0,"excluded_attachments":[]},"cluster_size":1,"skill_md_path":"skills/detecting-command-and-control-over-dns/SKILL.md","import_metadata":{"date":"2026-06-05","author":"@skillopedia","version":"v1","category":"security","category_label":"Security"},"exact_dupes_collapsed_into_this":0},"license":"Apache-2.0","version":"v1","category":"security","nist_csf":["PR.IR-01","DE.CM-01","ID.AM-03","PR.DS-02"],"subdomain":"network-security","import_tag":"clean-skills-v1","description":"Detects command-and-control (C2) communications tunneled through DNS protocol including DNS tunneling tools (Iodine, dnscat2, dns2tcp, Cobalt Strike DNS beacon), domain generation algorithms (DGA), encoded payload delivery via TXT/CNAME records, and DNS beaconing patterns. Covers Shannon entropy analysis of query subdomains, statistical anomaly detection, ML-based DGA classification, passive DNS correlation, and Zeek/Suricata signature development. Activates for requests involving DNS-based C2 detection, DNS tunnel identification, suspicious DNS traffic investigation, or DGA domain classification.\n"}},"renderedAt":1782980881603}

Detecting Command and Control Over DNS When to Use - Investigating suspected DNS tunneling used for C2 communication or data exfiltration - Analyzing DNS query logs for signs of encoded payloads in subdomain strings - Classifying domains as DGA-generated vs. legitimate using statistical or ML methods - Detecting DNS beaconing patterns (regular intervals, consistent query sizes) - Hunting for Iodine, dnscat2, dns2tcp, Cobalt Strike DNS, or Sliver DNS traffic - Monitoring TXT record abuse for command delivery or staged payload download - Building DNS anomaly detection rules for SOC/SIEM deploym…