Implementing TAXII Server with OpenTAXII Overview TAXII (Trusted Automated eXchange of Intelligence Information) is an OASIS standard protocol for exchanging cyber threat intelligence over HTTPS. OpenTAXII is an open-source TAXII server implementation by EclecticIQ that supports TAXII 1.x, while the OASIS cti-taxii-server provides a TAXII 2.1 reference implementation. This skill covers deploying a TAXII server, configuring collections for threat intelligence feeds, publishing STIX 2.1 bundles, and integrating with SIEM/SOAR platforms for automated indicator ingestion. When to Use - When deplo…

]\",\n pattern_type=\"stix\",\n valid_from=datetime(2020, 12, 13),\n indicator_types=[\"malicious-activity\"],\n object_marking_refs=[TLP_WHITE],\n )\n\n rel = Relationship(\n relationship_type=\"indicates\",\n source_ref=indicator_hash.id,\n target_ref=malware.id,\n )\n\n return [malware, indicator_hash, indicator_domain, rel]\n\npublisher = TAXIIPublisher(\n \"https://taxii.organization.com/taxii2/\",\n \"admin\", \"admin_password_change_me\"\n)\ncollections = publisher.list_collections()\nindicators = publisher.create_malware_indicators()\npublisher.publish_indicators(\"91a7b528-80eb-42ed-a74d-c6fbd5a26116\", indicators)\n```\n\n### Step 4: Consume Intelligence from TAXII Collections\n\n```python\nfrom taxii2client.v21 import Server, Collection, as_pages\nimport json\n\nclass TAXIIConsumer:\n def __init__(self, server_url, username, password):\n self.server = Server(server_url, user=username, password=password)\n\n def poll_collection(self, collection_id, added_after=None):\n \"\"\"Poll a collection for new STIX objects.\"\"\"\n api_root = self.server.api_roots[0]\n collection = Collection(\n f\"{api_root.url}collections/{collection_id}/\",\n user=self.server._user,\n password=self.server._password,\n )\n\n kwargs = {}\n if added_after:\n kwargs[\"added_after\"] = added_after\n\n all_objects = []\n for bundle in as_pages(collection.get_objects, per_request=50, **kwargs):\n objects = json.loads(bundle).get(\"objects\", [])\n all_objects.extend(objects)\n\n indicators = [o for o in all_objects if o.get(\"type\") == \"indicator\"]\n malware = [o for o in all_objects if o.get(\"type\") == \"malware\"]\n relationships = [o for o in all_objects if o.get(\"type\") == \"relationship\"]\n\n print(f\"[+] Polled {len(all_objects)} objects: \"\n f\"{len(indicators)} indicators, {len(malware)} malware, \"\n f\"{len(relationships)} relationships\")\n return all_objects\n\n def extract_iocs_for_siem(self, stix_objects):\n \"\"\"Extract IOCs from STIX objects for SIEM ingestion.\"\"\"\n iocs = []\n for obj in stix_objects:\n if obj.get(\"type\") == \"indicator\":\n pattern = obj.get(\"pattern\", \"\")\n iocs.append({\n \"id\": obj.get(\"id\"),\n \"name\": obj.get(\"name\", \"\"),\n \"pattern\": pattern,\n \"valid_from\": obj.get(\"valid_from\", \"\"),\n \"indicator_types\": obj.get(\"indicator_types\", []),\n \"confidence\": obj.get(\"confidence\", 0),\n })\n return iocs\n\nconsumer = TAXIIConsumer(\n \"https://taxii.organization.com/taxii2/\",\n \"analyst\", \"analyst_password_change_me\"\n)\nobjects = consumer.poll_collection(\"91a7b528-80eb-42ed-a74d-c6fbd5a26116\")\niocs = consumer.extract_iocs_for_siem(objects)\n```\n\n### Step 5: Integrate with SIEM/SOAR\n\n```python\nimport requests\n\ndef push_to_splunk(iocs, splunk_url, hec_token):\n \"\"\"Push extracted IOCs to Splunk via HEC.\"\"\"\n headers = {\"Authorization\": f\"Splunk {hec_token}\"}\n for ioc in iocs:\n event = {\n \"event\": ioc,\n \"sourcetype\": \"stix:indicator\",\n \"source\": \"taxii-server\",\n \"index\": \"threat_intel\",\n }\n resp = requests.post(\n f\"{splunk_url}/services/collector/event\",\n headers=headers,\n json=event,\n verify=not os.environ.get(\"SKIP_TLS_VERIFY\", \"\").lower() == \"true\", # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments\n )\n if resp.status_code != 200:\n print(f\"[-] Splunk HEC error: {resp.text}\")\n print(f\"[+] Pushed {len(iocs)} IOCs to Splunk\")\n\ndef push_to_elasticsearch(iocs, es_url, index=\"threat-intel\"):\n \"\"\"Push IOCs to Elasticsearch.\"\"\"\n for ioc in iocs:\n resp = requests.post(\n f\"{es_url}/{index}/_doc\",\n json=ioc,\n headers={\"Content-Type\": \"application/json\"},\n )\n if resp.status_code not in (200, 201):\n print(f\"[-] ES error: {resp.text}\")\n print(f\"[+] Indexed {len(iocs)} IOCs in Elasticsearch\")\n```\n\n## Validation Criteria\n\n- TAXII 2.1 server deployed and accessible via HTTPS\n- Collections created with appropriate read/write permissions\n- STIX 2.1 bundles published successfully to collections\n- Consumer can poll and retrieve objects with filtering\n- IOCs extracted and forwarded to SIEM platform\n- Authentication and authorization enforced correctly\n\n## References\n\n- [TAXII 2.1 Specification](https://docs.oasis-open.org/cti/taxii/v2.1/os/taxii-v2.1-os.html)\n- [OASIS CTI Documentation](https://oasis-open.github.io/cti-documentation/)\n- [EclecticIQ OpenTAXII](https://www.eclecticiq.com/open-source)\n- [cti-taxii-server (Medallion)](https://github.com/oasis-open/cti-taxii-server)\n- [taxii2-client Python Library](https://github.com/oasis-open/cti-taxii-client)\n- [Kraven Security: STIX/TAXII Complete Guide](https://kravensecurity.com/stix-and-taxii-a-full-guide/)\n---","attachment_filenames":["references/api-reference.md","scripts/agent.py"],"attachments":[{"filename":"references/api-reference.md","content":"# API Reference: OpenTAXII Server\n\n## Libraries Used\n\n| Library | Purpose |\n|---------|---------|\n| `opentaxii` | TAXII 1.x and 2.x server implementation |\n| `taxii2-client` | TAXII 2.1 client for testing and integration |\n| `stix2` | Create and parse STIX 2.1 objects |\n| `requests` | HTTP client for direct API testing |\n\n## Installation\n\n```bash\n# Server\npip install opentaxii\n\n# Client and testing\npip install taxii2-client stix2 requests\n```\n\n## Server Configuration\n\n### opentaxii.yml\n```yaml\n---\npersistence_api:\n class: opentaxii.persistence.sqldb.SQLDatabaseAPI\n parameters:\n db_connection: sqlite:////tmp/opentaxii.db\n create_tables: true\n\nauth_api:\n class: opentaxii.auth.sqldb.SQLDatabaseAuth\n parameters:\n db_connection: sqlite:////tmp/opentaxii.db\n create_tables: true\n secret: \"change-this-secret-in-production\"\n\ntaxii1:\n save_raw_inbox_messages: false\n\nlogging:\n opentaxii: info\n root: info\n```\n\n### Start the Server\n```bash\n# Set config path\nexport OPENTAXII_CONFIG=/path/to/opentaxii.yml\n\n# Run the server\nopentaxii-run-dev --host 0.0.0.0 --port 9000\n\n# Production (with gunicorn)\ngunicorn opentaxii.http:app --bind 0.0.0.0:9000\n```\n\n## TAXII 2.1 API Endpoints\n\n| Method | Endpoint | Description |\n|--------|----------|-------------|\n| GET | `/taxii2/` | Server discovery |\n| GET | `/{api-root}/` | API root information |\n| GET | `/{api-root}/collections/` | List collections |\n| GET | `/{api-root}/collections/{id}/` | Get collection details |\n| GET | `/{api-root}/collections/{id}/objects/` | Get STIX objects |\n| POST | `/{api-root}/collections/{id}/objects/` | Add STIX objects |\n| GET | `/{api-root}/collections/{id}/manifest/` | Object manifest |\n| GET | `/{api-root}/status/{id}/` | Async operation status |\n\n## Server Administration\n\n### Create Collections via CLI\n```bash\nopentaxii-create-services -c services.yml\nopentaxii-create-collections -c collections.yml\nopentaxii-create-account --username admin --password admin123\n```\n\n### collections.yml\n```yaml\n---\n- name: \"threat-indicators\"\n id: \"collection-001\"\n description: \"Threat intelligence indicators\"\n type: \"DATA_FEED\"\n accept_all_content: true\n can_read: true\n can_write: true\n\n- name: \"malware-samples\"\n id: \"collection-002\"\n description: \"Malware sample hashes and metadata\"\n type: \"DATA_SET\"\n can_read: true\n can_write: false\n```\n\n## Client Operations\n\n### Discover Server and Collections\n```python\nfrom taxii2client.v21 import Server\nimport os\n\nserver = Server(\n os.environ.get(\"OPENTAXII_URL\", \"http://localhost:9000/taxii2/\"),\n user=os.environ.get(\"TAXII_USER\", \"admin\"),\n password=os.environ.get(\"TAXII_PASS\", \"admin123\"),\n)\n\nfor api_root in server.api_roots:\n print(f\"API Root: {api_root.title}\")\n for coll in api_root.collections:\n print(f\" {coll.title} (ID: {coll.id})\")\n print(f\" Read: {coll.can_read} | Write: {coll.can_write}\")\n```\n\n### Push STIX Objects to a Collection\n```python\nimport stix2\nfrom taxii2client.v21 import Collection\n\ncollection = Collection(\n f\"http://localhost:9000/collections/collection-001/\",\n user=\"admin\",\n password=\"admin123\",\n)\n\nindicator = stix2.Indicator(\n name=\"Malicious C2 Domain\",\n pattern=\"[domain-name:value = 'evil.example.com']\",\n pattern_type=\"stix\",\n valid_from=\"2025-01-15T00:00:00Z\",\n labels=[\"malicious-activity\"],\n)\n\nbundle = stix2.Bundle(objects=[indicator])\ncollection.add_objects(bundle.serialize())\n```\n\n### Fetch Objects from a Collection\n```python\nobjects = collection.get_objects()\nfor obj in objects.get(\"objects\", []):\n print(f\" {obj['type']}: {obj.get('name', obj['id'])}\")\n```\n\n## Health Check\n\n```python\nimport requests\n\nresp = requests.get(\n \"http://localhost:9000/taxii2/\",\n auth=(\"admin\", \"admin123\"),\n timeout=10,\n)\nif resp.status_code == 200:\n discovery = resp.json()\n print(f\"Server title: {discovery.get('title')}\")\n print(f\"API roots: {discovery.get('api_roots', [])}\")\n```\n\n## Output Format\n\n```json\n{\n \"title\": \"OpenTAXII TAXII 2.1 Server\",\n \"description\": \"Threat intelligence sharing server\",\n \"api_roots\": [\"http://localhost:9000/api/\"],\n \"collections\": [\n {\n \"id\": \"collection-001\",\n \"title\": \"threat-indicators\",\n \"can_read\": true,\n \"can_write\": true,\n \"media_types\": [\"application/stix+json;version=2.1\"]\n }\n ]\n}\n```\n","content_type":"text/markdown; charset=utf-8","language":"markdown","size":4332,"content_sha256":"ba86d9d17e6f60fb540fcc4e9928e1d5a90fd263481e32e034b55bcd69dc4a48"},{"filename":"scripts/agent.py","content":"#!/usr/bin/env python3\n\"\"\"OpenTAXII server configuration and health audit agent.\n\nAudits an OpenTAXII server instance by checking service discovery,\ncollection availability, content block statistics, and API health.\nSupports both TAXII 1.1 and 2.0/2.1 endpoints.\n\"\"\"\nimport argparse\nimport json\nimport os\nimport sys\nfrom datetime import datetime, timezone\n\ntry:\n import requests\nexcept ImportError:\n print(\"[!] 'requests' required: pip install requests\", file=sys.stderr)\n sys.exit(1)\n\ntry:\n from taxii2client.v20 import Server as Server20\n from taxii2client.v21 import Server as Server21\n HAS_TAXII_CLIENT = True\nexcept ImportError:\n HAS_TAXII_CLIENT = False\n\n\ndef check_taxii1_discovery(base_url, username=None, password=None):\n \"\"\"Check TAXII 1.1 discovery service.\"\"\"\n findings = []\n discovery_url = f\"{base_url}/services/discovery\"\n print(f\"[*] Checking TAXII 1.1 discovery: {discovery_url}\")\n\n headers = {\"Content-Type\": \"application/xml\",\n \"X-TAXII-Content-Type\": \"urn:taxii.mitre.org:message:xml:1.1\",\n \"X-TAXII-Protocol\": \"urn:taxii.mitre.org:protocol:http:1.0\"}\n discovery_xml = (\n '\u003cDiscovery_Request xmlns=\"http://taxii.mitre.org/messages/taxii_xml_binding-1.1\" '\n 'message_id=\"1\"/>'\n )\n\n auth = (username, password) if username else None\n try:\n resp = requests.post(discovery_url, data=discovery_xml, headers=headers,\n auth=auth, timeout=15)\n if resp.status_code == 200:\n findings.append({\n \"check\": \"TAXII 1.1 Discovery\",\n \"status\": \"PASS\",\n \"severity\": \"INFO\",\n \"detail\": f\"Discovery service responding ({len(resp.content)} bytes)\",\n })\n else:\n findings.append({\n \"check\": \"TAXII 1.1 Discovery\",\n \"status\": \"FAIL\",\n \"severity\": \"HIGH\",\n \"detail\": f\"HTTP {resp.status_code}\",\n })\n except requests.RequestException as e:\n findings.append({\n \"check\": \"TAXII 1.1 Discovery\",\n \"status\": \"FAIL\",\n \"severity\": \"HIGH\",\n \"detail\": str(e)[:100],\n })\n\n return findings\n\n\ndef check_taxii2_discovery(base_url, username=None, password=None, version=\"2.1\"):\n \"\"\"Check TAXII 2.0/2.1 discovery and collections.\"\"\"\n findings = []\n print(f\"[*] Checking TAXII {version} discovery: {base_url}\")\n\n if HAS_TAXII_CLIENT:\n try:\n kwargs = {}\n if username and password:\n kwargs[\"user\"] = username\n kwargs[\"password\"] = password\n if version == \"2.0\":\n server = Server20(base_url, **kwargs)\n else:\n server = Server21(base_url, **kwargs)\n\n findings.append({\n \"check\": f\"TAXII {version} Discovery\",\n \"status\": \"PASS\",\n \"severity\": \"INFO\",\n \"detail\": f\"Server: {server.title or 'Untitled'}\",\n })\n\n for api_root in server.api_roots:\n collections = list(api_root.collections)\n findings.append({\n \"check\": f\"API Root: {api_root.title or api_root.url}\",\n \"status\": \"PASS\",\n \"severity\": \"INFO\",\n \"detail\": f\"{len(collections)} collections\",\n \"collections\": [{\n \"id\": c.id,\n \"title\": c.title,\n \"can_read\": getattr(c, \"can_read\", True),\n \"can_write\": getattr(c, \"can_write\", False),\n } for c in collections],\n })\n\n except Exception as e:\n findings.append({\n \"check\": f\"TAXII {version} Discovery\",\n \"status\": \"FAIL\",\n \"severity\": \"HIGH\",\n \"detail\": str(e)[:100],\n })\n else:\n # Fallback to raw HTTP\n auth = (username, password) if username else None\n headers = {\"Accept\": \"application/taxii+json;version=2.1\"}\n try:\n resp = requests.get(base_url, headers=headers, auth=auth, timeout=15)\n if resp.status_code == 200:\n data = resp.json()\n findings.append({\n \"check\": f\"TAXII {version} Discovery\",\n \"status\": \"PASS\",\n \"severity\": \"INFO\",\n \"detail\": f\"Title: {data.get('title', 'N/A')}, \"\n f\"API Roots: {len(data.get('api_roots', []))}\",\n })\n else:\n findings.append({\n \"check\": f\"TAXII {version} Discovery\",\n \"status\": \"FAIL\",\n \"severity\": \"HIGH\",\n \"detail\": f\"HTTP {resp.status_code}\",\n })\n except requests.RequestException as e:\n findings.append({\n \"check\": f\"TAXII {version} Discovery\",\n \"status\": \"FAIL\",\n \"severity\": \"HIGH\",\n \"detail\": str(e)[:100],\n })\n\n return findings\n\n\ndef check_server_health(base_url):\n \"\"\"Check basic server health.\"\"\"\n findings = []\n print(f\"[*] Checking server health: {base_url}\")\n\n # Check TLS\n if base_url.startswith(\"https://\"):\n findings.append({\n \"check\": \"TLS enabled\",\n \"status\": \"PASS\",\n \"severity\": \"INFO\",\n })\n else:\n findings.append({\n \"check\": \"TLS enabled\",\n \"status\": \"FAIL\",\n \"severity\": \"HIGH\",\n \"detail\": \"TAXII server not using HTTPS\",\n })\n\n # Check response time\n try:\n resp = requests.get(base_url, timeout=10)\n response_time = resp.elapsed.total_seconds()\n if response_time > 5:\n findings.append({\n \"check\": \"Response time\",\n \"status\": \"WARN\",\n \"severity\": \"MEDIUM\",\n \"detail\": f\"{response_time:.2f}s (slow)\",\n })\n else:\n findings.append({\n \"check\": \"Response time\",\n \"status\": \"PASS\",\n \"severity\": \"INFO\",\n \"detail\": f\"{response_time:.2f}s\",\n })\n except requests.RequestException:\n pass\n\n return findings\n\n\ndef format_summary(all_findings, base_url):\n \"\"\"Print audit summary.\"\"\"\n print(f\"\\n{'='*60}\")\n print(f\" OpenTAXII Server Audit Report\")\n print(f\"{'='*60}\")\n print(f\" Server : {base_url}\")\n print(f\" Findings : {len(all_findings)}\")\n\n pass_count = sum(1 for f in all_findings if f[\"status\"] == \"PASS\")\n fail_count = sum(1 for f in all_findings if f[\"status\"] == \"FAIL\")\n print(f\" Passed : {pass_count}\")\n print(f\" Failed : {fail_count}\")\n\n for f in all_findings:\n icon = \"OK\" if f[\"status\"] == \"PASS\" else \"!!\" if f[\"status\"] == \"FAIL\" else \"~~\"\n print(f\" [{icon}] {f['check']}: {f.get('detail', '')[:50]}\")\n\n severity_counts = {}\n for f in all_findings:\n sev = f.get(\"severity\", \"INFO\")\n severity_counts[sev] = severity_counts.get(sev, 0) + 1\n return severity_counts\n\n\ndef main():\n parser = argparse.ArgumentParser(description=\"OpenTAXII server audit agent\")\n parser.add_argument(\"--url\", required=True, help=\"TAXII server base URL\")\n parser.add_argument(\"--username\", help=\"Authentication username\")\n parser.add_argument(\"--password\", help=\"Authentication password\")\n parser.add_argument(\"--version\", choices=[\"1.1\", \"2.0\", \"2.1\"], default=\"2.1\")\n parser.add_argument(\"--output\", \"-o\", help=\"Output JSON report\")\n parser.add_argument(\"--verbose\", \"-v\", action=\"store_true\")\n args = parser.parse_args()\n\n all_findings = []\n all_findings.extend(check_server_health(args.url))\n\n if args.version == \"1.1\":\n all_findings.extend(check_taxii1_discovery(args.url, args.username, args.password))\n else:\n all_findings.extend(check_taxii2_discovery(args.url, args.username, args.password, args.version))\n\n severity_counts = format_summary(all_findings, args.url)\n\n report = {\n \"timestamp\": datetime.now(timezone.utc).isoformat(),\n \"tool\": \"OpenTAXII Audit\",\n \"server\": args.url,\n \"version\": args.version,\n \"findings\": all_findings,\n \"severity_counts\": severity_counts,\n }\n\n if args.output:\n with open(args.output, \"w\") as f:\n json.dump(report, f, indent=2)\n print(f\"\\n[+] Report saved to {args.output}\")\n elif args.verbose:\n print(json.dumps(report, indent=2))\n\n\nif __name__ == \"__main__\":\n main()\n","content_type":"text/x-python; charset=utf-8","language":"python","size":8729,"content_sha256":"23d622156fcca4117fc21dac67cef90e4c256a1c3db3a602d2f8fdc828b08227"}],"content_json":{"type":"doc","content":[{"type":"heading","attrs":{"level":1},"content":[{"text":"Implementing TAXII Server with OpenTAXII","type":"text"}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Overview","type":"text"}]},{"type":"paragraph","content":[{"text":"TAXII (Trusted Automated eXchange of Intelligence Information) is an OASIS standard protocol for exchanging cyber threat intelligence over HTTPS. OpenTAXII is an open-source TAXII server implementation by EclecticIQ that supports TAXII 1.x, while the OASIS cti-taxii-server provides a TAXII 2.1 reference implementation. This skill covers deploying a TAXII server, configuring collections for threat intelligence feeds, publishing STIX 2.1 bundles, and integrating with SIEM/SOAR platforms for automated indicator ingestion.","type":"text"}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"When to Use","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"When deploying or configuring implementing taxii server with opentaxii capabilities in your environment","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"When establishing security controls aligned to compliance requirements","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"When building or improving security architecture for this domain","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"When conducting security assessments that require this implementation","type":"text"}]}]}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Prerequisites","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Python 3.9+ with ","type":"text"},{"text":"medallion","type":"text","marks":[{"type":"code_inline"}]},{"text":", ","type":"text"},{"text":"stix2","type":"text","marks":[{"type":"code_inline"}]},{"text":", ","type":"text"},{"text":"taxii2-client","type":"text","marks":[{"type":"code_inline"}]},{"text":", ","type":"text"},{"text":"opentaxii","type":"text","marks":[{"type":"code_inline"}]},{"text":", ","type":"text"},{"text":"cabby","type":"text","marks":[{"type":"code_inline"}]},{"text":" libraries","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Docker and Docker Compose for containerized deployment","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Understanding of STIX 2.1 objects (Indicator, Malware, Attack Pattern, Relationship)","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Familiarity with REST APIs and HTTPS configuration","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"TLS certificates for production deployment","type":"text"}]}]}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Key Concepts","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"TAXII 2.1 Architecture","type":"text"}]},{"type":"paragraph","content":[{"text":"TAXII 2.1 defines three services: Discovery (find available API roots), API Root (entry point for collections), and Collections (repositories of CTI objects). Collections support two access models: the Collection endpoint allows consumers to poll for objects, and the Status endpoint tracks the result of add operations. TAXII uses HTTP content negotiation with ","type":"text"},{"text":"application/taxii+json;version=2.1","type":"text","marks":[{"type":"code_inline"}]},{"text":".","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Sharing Models","type":"text"}]},{"type":"paragraph","content":[{"text":"TAXII supports hub-and-spoke (central server distributes to consumers), peer-to-peer (bidirectional sharing between partners), and source-subscriber (producer publishes, consumers subscribe) models. Each collection can have read-only, write-only, or read-write access controls.","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"STIX 2.1 Content","type":"text"}]},{"type":"paragraph","content":[{"text":"TAXII transports STIX 2.1 bundles containing Structured Threat Information objects: Indicators (detection patterns), Observed Data, Malware, Attack Patterns, Threat Actors, Intrusion Sets, Campaigns, Relationships, and Sightings. Each object has a unique STIX ID, creation/modification timestamps, and optional TLP marking definitions.","type":"text"}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Workflow","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Step 1: Deploy TAXII 2.1 Server with Medallion","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":"python"},"content":[{"text":"# Install medallion (OASIS reference implementation)\n# pip install medallion\n\n# medallion_config.json\nimport json\n\nconfig = {\n \"backend\": {\n \"module_class\": \"MemoryBackend\",\n \"filename\": \"taxii_data.json\"\n },\n \"users\": {\n \"admin\": \"admin_password_change_me\",\n \"analyst\": \"analyst_password_change_me\",\n \"readonly\": \"readonly_password_change_me\"\n },\n \"taxii\": {\n \"max_content_length\": 10485760\n }\n}\n\n# Create initial data store\ntaxii_data = {\n \"discovery\": {\n \"title\": \"Threat Intelligence TAXII Server\",\n \"description\": \"TAXII 2.1 server for sharing CTI indicators\",\n \"contact\": \"[email protected]\",\n \"default\": \"https://taxii.organization.com/api/\",\n \"api_roots\": [\"https://taxii.organization.com/api/\"]\n },\n \"api_roots\": {\n \"api\": {\n \"title\": \"Threat Intelligence API Root\",\n \"description\": \"Primary API root for threat intelligence sharing\",\n \"versions\": [\"application/taxii+json;version=2.1\"],\n \"max_content_length\": 10485760,\n \"collections\": {\n \"malware-iocs\": {\n \"id\": \"91a7b528-80eb-42ed-a74d-c6fbd5a26116\",\n \"title\": \"Malware IOCs\",\n \"description\": \"Indicators of compromise from malware analysis\",\n \"can_read\": True,\n \"can_write\": True,\n \"media_types\": [\"application/stix+json;version=2.1\"]\n },\n \"apt-intelligence\": {\n \"id\": \"52892447-4d7e-4f70-b94a-5460e242dd23\",\n \"title\": \"APT Intelligence\",\n \"description\": \"Advanced persistent threat group intelligence\",\n \"can_read\": True,\n \"can_write\": True,\n \"media_types\": [\"application/stix+json;version=2.1\"]\n },\n \"phishing-indicators\": {\n \"id\": \"64993447-4d7e-4f70-b94a-5460e242ee34\",\n \"title\": \"Phishing Indicators\",\n \"description\": \"Phishing URLs, domains, and email indicators\",\n \"can_read\": True,\n \"can_write\": True,\n \"media_types\": [\"application/stix+json;version=2.1\"]\n }\n }\n }\n }\n}\n\nwith open(\"medallion_config.json\", \"w\") as f:\n json.dump(config, f, indent=2)\nwith open(\"taxii_data.json\", \"w\") as f:\n json.dump(taxii_data, f, indent=2)\nprint(\"[+] TAXII server configuration created\")","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Step 2: Docker Deployment","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":"yaml"},"content":[{"text":"# docker-compose.yml\nversion: '3.8'\nservices:\n taxii-server:\n image: python:3.11-slim\n container_name: taxii-server\n working_dir: /app\n volumes:\n - ./medallion_config.json:/app/medallion_config.json\n - ./taxii_data.json:/app/taxii_data.json\n - ./certs:/app/certs\n ports:\n - \"6100:6100\"\n command: >\n bash -c \"pip install medallion &&\n medallion --host 0.0.0.0 --port 6100\n --config /app/medallion_config.json\"\n restart: unless-stopped\n healthcheck:\n test: [\"CMD\", \"curl\", \"-f\", \"http://localhost:6100/taxii2/\"]\n interval: 30s\n timeout: 10s\n retries: 3","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Step 3: Publish STIX 2.1 Objects to Collections","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":"python"},"content":[{"text":"from stix2 import Indicator, Malware, Relationship, Bundle, TLP_WHITE\nfrom taxii2client.v21 import Server, Collection, as_pages\nimport json\nfrom datetime import datetime\n\nclass TAXIIPublisher:\n def __init__(self, server_url, username, password):\n self.server = Server(\n server_url,\n user=username,\n password=password,\n )\n\n def list_collections(self):\n \"\"\"List all available collections.\"\"\"\n api_root = self.server.api_roots[0]\n for collection in api_root.collections:\n print(f\" [{collection.id}] {collection.title} \"\n f\"(read={collection.can_read}, write={collection.can_write})\")\n return api_root.collections\n\n def publish_indicators(self, collection_id, indicators):\n \"\"\"Publish STIX indicators to a TAXII collection.\"\"\"\n api_root = self.server.api_roots[0]\n collection = Collection(\n f\"{api_root.url}collections/{collection_id}/\",\n user=self.server._user,\n password=self.server._password,\n )\n bundle = Bundle(objects=indicators)\n response = collection.add_objects(bundle.serialize())\n print(f\"[+] Published {len(indicators)} objects to {collection_id}\")\n print(f\" Status: {response.status}\")\n return response\n\n def create_malware_indicators(self):\n \"\"\"Create sample STIX malware indicators.\"\"\"\n malware = Malware(\n name=\"SUNBURST\",\n description=\"Backdoor used in SolarWinds supply chain attack (2020). \"\n \"Trojanized SolarWinds.Orion.Core.BusinessLayer.dll module.\",\n malware_types=[\"backdoor\", \"trojan\"],\n is_family=True,\n object_marking_refs=[TLP_WHITE],\n )\n\n indicator_hash = Indicator(\n name=\"SUNBURST SHA-256 Hash\",\n description=\"SHA-256 hash of trojanized SolarWinds Orion DLL\",\n pattern=\"[file:hashes.'SHA-256' = \"\n \"'32519b85c0b422e4656de6e6c41878e95fd95026267daab4215ee59c107d6c77']\",\n pattern_type=\"stix\",\n valid_from=datetime(2020, 12, 13),\n indicator_types=[\"malicious-activity\"],\n object_marking_refs=[TLP_WHITE],\n )\n\n indicator_domain = Indicator(\n name=\"SUNBURST C2 Domain Pattern\",\n description=\"DGA domain pattern used by SUNBURST for C2\",\n pattern=\"[domain-name:value MATCHES \"\n \"'^[a-z0-9]{4,}\\\\.appsync-api\\\\..*\\\\.avsvmcloud\\\\.com

Implementing TAXII Server with OpenTAXII Overview TAXII (Trusted Automated eXchange of Intelligence Information) is an OASIS standard protocol for exchanging cyber threat intelligence over HTTPS. OpenTAXII is an open-source TAXII server implementation by EclecticIQ that supports TAXII 1.x, while the OASIS cti-taxii-server provides a TAXII 2.1 reference implementation. This skill covers deploying a TAXII server, configuring collections for threat intelligence feeds, publishing STIX 2.1 bundles, and integrating with SIEM/SOAR platforms for automated indicator ingestion. When to Use - When deplo…

]\",\n pattern_type=\"stix\",\n valid_from=datetime(2020, 12, 13),\n indicator_types=[\"malicious-activity\"],\n object_marking_refs=[TLP_WHITE],\n )\n\n rel = Relationship(\n relationship_type=\"indicates\",\n source_ref=indicator_hash.id,\n target_ref=malware.id,\n )\n\n return [malware, indicator_hash, indicator_domain, rel]\n\npublisher = TAXIIPublisher(\n \"https://taxii.organization.com/taxii2/\",\n \"admin\", \"admin_password_change_me\"\n)\ncollections = publisher.list_collections()\nindicators = publisher.create_malware_indicators()\npublisher.publish_indicators(\"91a7b528-80eb-42ed-a74d-c6fbd5a26116\", indicators)","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Step 4: Consume Intelligence from TAXII Collections","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":"python"},"content":[{"text":"from taxii2client.v21 import Server, Collection, as_pages\nimport json\n\nclass TAXIIConsumer:\n def __init__(self, server_url, username, password):\n self.server = Server(server_url, user=username, password=password)\n\n def poll_collection(self, collection_id, added_after=None):\n \"\"\"Poll a collection for new STIX objects.\"\"\"\n api_root = self.server.api_roots[0]\n collection = Collection(\n f\"{api_root.url}collections/{collection_id}/\",\n user=self.server._user,\n password=self.server._password,\n )\n\n kwargs = {}\n if added_after:\n kwargs[\"added_after\"] = added_after\n\n all_objects = []\n for bundle in as_pages(collection.get_objects, per_request=50, **kwargs):\n objects = json.loads(bundle).get(\"objects\", [])\n all_objects.extend(objects)\n\n indicators = [o for o in all_objects if o.get(\"type\") == \"indicator\"]\n malware = [o for o in all_objects if o.get(\"type\") == \"malware\"]\n relationships = [o for o in all_objects if o.get(\"type\") == \"relationship\"]\n\n print(f\"[+] Polled {len(all_objects)} objects: \"\n f\"{len(indicators)} indicators, {len(malware)} malware, \"\n f\"{len(relationships)} relationships\")\n return all_objects\n\n def extract_iocs_for_siem(self, stix_objects):\n \"\"\"Extract IOCs from STIX objects for SIEM ingestion.\"\"\"\n iocs = []\n for obj in stix_objects:\n if obj.get(\"type\") == \"indicator\":\n pattern = obj.get(\"pattern\", \"\")\n iocs.append({\n \"id\": obj.get(\"id\"),\n \"name\": obj.get(\"name\", \"\"),\n \"pattern\": pattern,\n \"valid_from\": obj.get(\"valid_from\", \"\"),\n \"indicator_types\": obj.get(\"indicator_types\", []),\n \"confidence\": obj.get(\"confidence\", 0),\n })\n return iocs\n\nconsumer = TAXIIConsumer(\n \"https://taxii.organization.com/taxii2/\",\n \"analyst\", \"analyst_password_change_me\"\n)\nobjects = consumer.poll_collection(\"91a7b528-80eb-42ed-a74d-c6fbd5a26116\")\niocs = consumer.extract_iocs_for_siem(objects)","type":"text"}]},{"type":"heading","attrs":{"level":3},"content":[{"text":"Step 5: Integrate with SIEM/SOAR","type":"text"}]},{"type":"code_block","attrs":{"wrap":false,"language":"python"},"content":[{"text":"import requests\n\ndef push_to_splunk(iocs, splunk_url, hec_token):\n \"\"\"Push extracted IOCs to Splunk via HEC.\"\"\"\n headers = {\"Authorization\": f\"Splunk {hec_token}\"}\n for ioc in iocs:\n event = {\n \"event\": ioc,\n \"sourcetype\": \"stix:indicator\",\n \"source\": \"taxii-server\",\n \"index\": \"threat_intel\",\n }\n resp = requests.post(\n f\"{splunk_url}/services/collector/event\",\n headers=headers,\n json=event,\n verify=not os.environ.get(\"SKIP_TLS_VERIFY\", \"\").lower() == \"true\", # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments\n )\n if resp.status_code != 200:\n print(f\"[-] Splunk HEC error: {resp.text}\")\n print(f\"[+] Pushed {len(iocs)} IOCs to Splunk\")\n\ndef push_to_elasticsearch(iocs, es_url, index=\"threat-intel\"):\n \"\"\"Push IOCs to Elasticsearch.\"\"\"\n for ioc in iocs:\n resp = requests.post(\n f\"{es_url}/{index}/_doc\",\n json=ioc,\n headers={\"Content-Type\": \"application/json\"},\n )\n if resp.status_code not in (200, 201):\n print(f\"[-] ES error: {resp.text}\")\n print(f\"[+] Indexed {len(iocs)} IOCs in Elasticsearch\")","type":"text"}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"Validation Criteria","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"TAXII 2.1 server deployed and accessible via HTTPS","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Collections created with appropriate read/write permissions","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"STIX 2.1 bundles published successfully to collections","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Consumer can poll and retrieve objects with filtering","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"IOCs extracted and forwarded to SIEM platform","type":"text"}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Authentication and authorization enforced correctly","type":"text"}]}]}]},{"type":"heading","attrs":{"level":2},"content":[{"text":"References","type":"text"}]},{"type":"bullet_list","content":[{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"TAXII 2.1 Specification","type":"text","marks":[{"type":"link","attrs":{"href":"https://docs.oasis-open.org/cti/taxii/v2.1/os/taxii-v2.1-os.html","title":null}}]}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"OASIS CTI Documentation","type":"text","marks":[{"type":"link","attrs":{"href":"https://oasis-open.github.io/cti-documentation/","title":null}}]}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"EclecticIQ OpenTAXII","type":"text","marks":[{"type":"link","attrs":{"href":"https://www.eclecticiq.com/open-source","title":null}}]}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"cti-taxii-server (Medallion)","type":"text","marks":[{"type":"link","attrs":{"href":"https://github.com/oasis-open/cti-taxii-server","title":null}}]}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"taxii2-client Python Library","type":"text","marks":[{"type":"link","attrs":{"href":"https://github.com/oasis-open/cti-taxii-client","title":null}}]}]}]},{"type":"list_item","content":[{"type":"paragraph","content":[{"text":"Kraven Security: STIX/TAXII Complete Guide","type":"text","marks":[{"type":"link","attrs":{"href":"https://kravensecurity.com/stix-and-taxii-a-full-guide/","title":null}}]}]}]}]},{"type":"hr","attrs":{"markup":"---"}}]},"metadata":{"date":"2026-06-05","name":"implementing-taxii-server-with-opentaxii","tags":["taxii","stix","opentaxii","threat-sharing","cti","indicator-exchange","taxii-server","automation"],"author":"@skillopedia","domain":"cybersecurity","source":{"stars":13207,"repo_name":"anthropic-cybersecurity-skills","origin_url":"https://github.com/mukul975/anthropic-cybersecurity-skills/blob/HEAD/skills/implementing-taxii-server-with-opentaxii/SKILL.md","repo_owner":"mukul975","body_sha256":"0ef8c2eeab4b8d276cad0dfb5dc6c838aa7b98f2d86de3a513872b1a94321587","cluster_key":"5aee30ff4eaf99c0c145c3c0be351d26cba4a54d76d8242e509a7bfc668f5c3d","clean_bundle":{"format":"clean-skill-bundle-v1","source":"mukul975/anthropic-cybersecurity-skills/skills/implementing-taxii-server-with-opentaxii/SKILL.md","attachments":[{"id":"589882e2-fcfd-584b-a382-95732fa9c908","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/589882e2-fcfd-584b-a382-95732fa9c908/attachment.md","path":"references/api-reference.md","size":4332,"sha256":"ba86d9d17e6f60fb540fcc4e9928e1d5a90fd263481e32e034b55bcd69dc4a48","contentType":"text/markdown; charset=utf-8"},{"id":"6a78788f-ff2d-5359-9f16-6dff31196a7d","key":"uploads/10433ee7-ad12-4ae0-b34e-97553e46c6c8/6a78788f-ff2d-5359-9f16-6dff31196a7d/attachment.py","path":"scripts/agent.py","size":8729,"sha256":"23d622156fcca4117fc21dac67cef90e4c256a1c3db3a602d2f8fdc828b08227","contentType":"text/x-python; charset=utf-8"}],"bundle_sha256":"d893b28ee26547dd2e9cd47f52e056399a7edf710aa9d290e1cc746c0f35f284","attachment_count":2,"text_attachments":2,"attachment_storage":"skillopedia-attachments-v1","binary_attachments":0,"excluded_attachments":[]},"cluster_size":1,"skill_md_path":"skills/implementing-taxii-server-with-opentaxii/SKILL.md","import_metadata":{"date":"2026-06-05","author":"@skillopedia","version":"v1","category":"security","category_label":"Security"},"exact_dupes_collapsed_into_this":0},"license":"Apache-2.0","version":"v1","category":"security","nist_csf":["ID.RA-01","ID.RA-05","DE.CM-01","DE.AE-02"],"subdomain":"threat-intelligence","import_tag":"clean-skills-v1","description":"Deploy and configure an OpenTAXII server to share and consume STIX-formatted cyber threat intelligence using the TAXII 2.1 protocol for automated indicator exchange between organizations."}},"renderedAt":1782981927806}

Implementing TAXII Server with OpenTAXII Overview TAXII (Trusted Automated eXchange of Intelligence Information) is an OASIS standard protocol for exchanging cyber threat intelligence over HTTPS. OpenTAXII is an open-source TAXII server implementation by EclecticIQ that supports TAXII 1.x, while the OASIS cti-taxii-server provides a TAXII 2.1 reference implementation. This skill covers deploying a TAXII server, configuring collections for threat intelligence feeds, publishing STIX 2.1 bundles, and integrating with SIEM/SOAR platforms for automated indicator ingestion. When to Use - When deplo…