OpenCTI integration with SIEM (Splunk)
OpenCTI integration with Splunk
Using OpenCTI Add-on for Splunk
The OpenCTI Add-on for Splunk lets Splunk users connect Splunk with OpenCTI and use threat information to improve detection capabilities and the response to security incidents.
Features
Ability to ingest indicators exposed through an OpenCTI live stream.
Ability to trigger OpenCTI actions in response to alerts and to investigate them directly in OpenCTI.
Events format
The Live Stream feature in OpenCTI simplifies the use of real-time data streams through connectors. Users can create a data stream with specific filters directly in the user interface (UI) and access it through the /stream/{STREAM_ID} path. Each event in the stream has the following format:
id: {Event stream id} -> Like 1620249512318-0
event: {Event type} -> create / update / delete
data: { -> The complete event data
version -> The version number of the event
type -> The inner type of the event
scope -> The scope of the event [internal or external]
data: {STIX data} -> The STIX representation of the data.
message -> A short human-readable description of the event
origin: {Data Origin} -> Complex object with information about the origin of the event
context: {Event context} -> Complex object with meta information that depends on the event type
}
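The format above can be read with a few lines of Python. This is a minimal sketch, assuming the stream uses standard Server-Sent Events framing (one `id`, one `event`, and a JSON `data` field per event). The function name `parse_sse_event` and the sample message are made up for illustration.

```python
import json


def parse_sse_event(block: str) -> dict:
    """Parse one 'id / event / data' block into a dict with a decoded payload."""
    fields = {"id": None, "event": None, "data": []}
    for line in block.splitlines():
        if not line or line.startswith(":"):
            continue  # blank lines and SSE comment lines carry no fields
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is not part of the value
        if name == "data":
            fields["data"].append(value)
        elif name in ("id", "event"):
            fields[name] = value
    payload = json.loads("\n".join(fields["data"])) if fields["data"] else None
    return {"id": fields["id"], "event": fields["event"], "data": payload}


# Illustrative event only; real events carry the full STIX object in data.data.
sample = (
    "id: 1620249512318-0\n"
    "event: create\n"
    'data: {"version": "4", "type": "create", "scope": "external", "message": "example"}\n'
)
parsed = parse_sse_event(sample)
print(parsed["id"], parsed["event"], parsed["data"]["scope"])
```

The add-on performs this parsing itself; the sketch is only useful when inspecting a stream by hand or writing a custom consumer.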
Example raw event:
id: 1752727242987-0
event: update
data: {
"version": "4",
"type": "update",
"scope": "external",
"message": "replaces `Agressive IP known malicious on AbuseIPDB - countryCode: KR - abuseConfidenceScore: 100 - lastReportedAt: 2025-07-17T02:56:48+00:00` in `Description`",
"origin": {
"socket": "query",
"ip": "::ffff:10.0.2.4",
"user_id": "88ec0c6a-13ce-5e39-b486-354fe4a7084f",
"group_ids": [
"31a99f81-cb65-4bb3-8aaa-ed8f8c3680cb"
],
"organization_ids": [],
"user_metadata": {},
"applicant_id": "88ec0c6a-13ce-5e39-b486-354fe4a7084f",
"call_retry_number": "0"
},
"data": {
"id": "indicator--64262f4b-d143-5740-89b4-bd7589e94b39",
"spec_version": "2.1",
"type": "indicator",
"extensions": {
"extension-definition--ea279b3e-5c71-4632-ac08-831c66a786ba": {
"extension_type": "property-extension",
"id": "88ca01ed-1352-4cc5-8a6b-088b212d16bf",
"type": "Indicator",
"created_at": "2025-06-30T01:51:38.534Z",
"updated_at": "2025-07-17T04:40:42.937Z",
"is_inferred": false,
"creator_ids": [
"88ec0c6a-13ce-5e39-b486-354fe4a7084f"
],
"labels_ids": [
"1f253634-c047-46a5-89f4-afad63f75dce"
],
"created_by_ref_id": "3ea86573-9b53-4fb0-bc98-f09034016dfe",
"detection": false,
"score": 100,
"main_observable_type": "IPv4-Addr",
"observable_values": [
{
"type": "IPv4-Addr",
"value": "211.248.172.135"
}
]
},
"extension-definition--322b8f77-262a-4cb8-a915-1e441e00329b": {
"extension_type": "property-extension"
}
},
"created": "2025-06-30T01:44:23.594Z",
"modified": "2025-07-17T04:40:42.937Z",
"revoked": false,
"confidence": 100,
"lang": "en",
"labels": [
"osint:source-type=\"block-or-filter-list\""
],
"object_marking_refs": [
"marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
],
"created_by_ref": "identity--ecab27e5-47a3-5d28-8ba1-bba10f4b6dbb",
"external_references": [
{
"source_name": "AbuseIPDB database",
"description": "AbuseIPDB database URL",
"url": "https://www.abuseipdb.com/"
}
],
"name": "211.248.172.135",
"description": "Agressive IP known malicious on AbuseIPDB - countryCode: KR - abuseConfidenceScore: 100 - lastReportedAt: 2025-07-17T02:56:48+00:00",
"pattern": "[ipv4-addr:value = '211.248.172.135']",
"pattern_type": "stix",
"pattern_version": "2.1",
"valid_from": "2025-07-16T03:08:10.196Z",
"valid_until": "2025-08-17T15:41:54.518Z"
},
"context": {
"patch": [
{
"op": "replace",
"path": "/description",
"value": "Agressive IP known malicious on AbuseIPDB - countryCode: KR - abuseConfidenceScore: 100 - lastReportedAt: 2025-07-17T02:56:48+00:00"
},
{
"op": "replace",
"path": "/modified",
"value": "2025-07-17T04:40:42.937Z"
},
{
"op": "replace",
"path": "/extensions/extension-definition--ea279b3e-5c71-4632-ac08-831c66a786ba/updated_at",
"value": "2025-07-17T04:40:42.937Z"
}
],
"reverse_patch": [
{
"op": "replace",
"path": "/description",
"value": "Agressive IP known malicious on AbuseIPDB - countryCode: KR - abuseConfidenceScore: 100 - lastReportedAt: 2025-07-16T01:44:05+00:00"
},
{
"op": "replace",
"path": "/modified",
"value": "2025-07-16T04:18:18.630Z"
},
{
"op": "replace",
"path": "/extensions/extension-definition--ea279b3e-5c71-4632-ac08-831c66a786ba/updated_at",
"value": "2025-07-16T04:18:18.630Z"
}
],
"related_restrictions": {
"markings": []
},
"pir_ids": []
},
"event_id": "1752727242937-0"
}
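In the update event above, `context.patch` and `context.reverse_patch` look like JSON Patch (RFC 6902) operation lists. The sketch below assumes that reading and assumes that applying `reverse_patch` to the current object yields its previous state. It only supports the `replace` operation on nested objects (no array indices), which is all the example uses; `apply_replace_ops` is a hypothetical helper, not part of OpenCTI.

```python
import copy


def apply_replace_ops(document: dict, ops: list) -> dict:
    """Return a copy of `document` with JSON Patch 'replace' operations applied."""
    result = copy.deepcopy(document)
    for op in ops:
        if op.get("op") != "replace":
            raise ValueError(f"unsupported operation: {op.get('op')}")
        # JSON Pointer: '/a/b' -> ['a', 'b'], unescaping ~1 and ~0
        keys = [k.replace("~1", "/").replace("~0", "~") for k in op["path"].split("/")[1:]]
        target = result
        for key in keys[:-1]:
            target = target[key]
        target[keys[-1]] = op["value"]
    return result


# Trimmed-down version of the event shown above.
ext = "extension-definition--ea279b3e-5c71-4632-ac08-831c66a786ba"
current = {
    "modified": "2025-07-17T04:40:42.937Z",
    "extensions": {ext: {"updated_at": "2025-07-17T04:40:42.937Z"}},
}
reverse_patch = [
    {"op": "replace", "path": "/modified", "value": "2025-07-16T04:18:18.630Z"},
    {"op": "replace", "path": f"/extensions/{ext}/updated_at", "value": "2025-07-16T04:18:18.630Z"},
]
previous = apply_replace_ops(current, reverse_patch)
print(previous["modified"])
```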
Configuration
Before configuring the add-on, create a new live stream in OpenCTI that suits your organization.

The status of each live stream is displayed in the stream list, where you can start, stop, and delete a stream.

Add-on configuration: in Splunk Web, open the OpenCTI Add-on for Splunk and go to the Configuration page.
OpenCTI URL
The URL of the OpenCTI platform (an HTTPS connection is required)
OpenCTI API Key
The API token of the previously created user

OpenCTI Indicators Inputs Configuration
The OpenCTI Add-on for Splunk enables Splunk to be fed with indicators exposed through a live stream. To do this, the add-on implements and manages Splunk modular inputs. Indicators are stored in a dedicated KV store collection named "opencti_indicators". A default lookup definition named "opencti_lookup" is also provided to facilitate indicator management.
Name
Unique name for the input being configured
Interval
Time interval of input in seconds. Leave as default (0) to allow continuous execution of the ingestion process
Index
The index that the data will be stored in (default)
Stream Id
The Live Stream ID of the OpenCTI stream to consume
Import from
The number of days to go back for the initial data collection (default: 30) (optional)
Once the input parameters have been configured correctly, click Add.

You can also consult the Indicators Dashboard, which gives an overview of the ingested data.

The indicators that OpenCTI provides are stored in the KV store and can be queried directly from SPL.

Example search with IOCs (Splunk rule):
| tstats `security_content_summariesonly` count min(_time) as firstTime max(_time) as lastTime
from datamodel=Endpoint.Processes
where (Processes.process_name="mimikatz.exe" OR Processes.original_file_name="mimikatz.exe")
by Processes.action Processes.dest Processes.original_file_name Processes.parent_process
Processes.parent_process_exec Processes.parent_process_guid Processes.parent_process_id
Processes.parent_process_name Processes.parent_process_path Processes.process Processes.process_exec
Processes.process_guid Processes.process_hash Processes.process_id
Processes.process_integrity_level Processes.process_name Processes.process_path
Processes.user Processes.user_id Processes.vendor_product
| `drop_dm_object_name(Processes)`
| `security_content_ctime(firstTime)`
| `security_content_ctime(lastTime)`
| `windows_mimikatz_binary_execution_filter`
| rex field=process_hash "SHA256=(?<process_sha256>[A-F0-9]+)"
| eval process_sha256=lower(process_sha256)
| lookup opencti_lookup value AS process_sha256 OUTPUT value, labels AS Tag, created_by AS Author_CTI
| table action, dest, process_sha256, process_exec, Tag, Author_CTI
With the Tag field in the results, it is easy to identify file hashes related to APT groups, malware types, and so on.
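The last steps of the search (the `rex`, the `eval lower(...)`, and the lookup) can be illustrated outside Splunk. The sketch below mirrors them in Python under a few assumptions: the `process_hash` field has the form `SHA256=<hex>,...`, the pattern is tightened to exactly 64 hex characters, and the dictionary stands in for `opencti_lookup` with invented label and author values.

```python
import re
from typing import Optional

SHA256_RE = re.compile(r"SHA256=([A-Fa-f0-9]{64})")


def extract_sha256(process_hash: str) -> Optional[str]:
    """Return the lowercased SHA-256 from a 'SHA256=...' field, or None."""
    match = SHA256_RE.search(process_hash or "")
    return match.group(1).lower() if match else None


# Hypothetical in-memory stand-in for the opencti_lookup KV store lookup.
known_hash = "8704bb4e2b42e5f0d63abb4a0a3ea95d7e4f5165c96f2654b1e57af1b0937ffa"
lookup = {known_hash: {"labels": "example-label", "created_by": "Example Author"}}

# Invented sample value in the shape the rex expects.
sample = "SHA256=" + known_hash.upper() + ",IMPHASH=00000000000000000000000000000000"
sha256 = extract_sha256(sample)
enrichment = lookup.get(sha256, {})
print(sha256, enrichment.get("labels"))
```

Lowercasing matters because the match against the stored indicator value is an exact string comparison.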


Using OpenCTI GraphQL API + Splunk HEC
GraphQL is used as the query language. GraphQL allows users to flexibly select the required data fields and filter by specific conditions such as entity type, relationship type, creation time, confidence level, etc.
The most important STIX abbreviations are:
STIX Domain Objects (SDO): Attack Patterns, Malware, Threat Actors, etc.
STIX Cyber-observable Objects (SCO): IP addresses, domain names, hashes, etc.
STIX Relationship Objects (SRO): Relationships, Sightings

OpenCTI Data Model: OpenCTI builds its entire CTI system on the STIX 2.1 model and extends it to support real-world use cases such as:
Non-STIX entities: Narrative, Channel, Event
Extended relationships such as: amplifies, publishes, drops
This model is implemented as a knowledge graph, which makes it possible to express tight connections between objects and to query them flexibly.
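As a quick illustration of the three STIX object families, the sketch below classifies a STIX identifier by the type prefix before `--`. The type sets are a partial selection from STIX 2.1 chosen for this example, and `stix_category` is a hypothetical helper; OpenCTI's own extended entity types are simply reported as "other" here.

```python
SDO_TYPES = {
    "attack-pattern", "campaign", "identity", "indicator", "intrusion-set",
    "location", "malware", "report", "threat-actor", "tool", "vulnerability",
}
SCO_TYPES = {"ipv4-addr", "ipv6-addr", "domain-name", "url", "file", "email-addr"}
SRO_TYPES = {"relationship", "sighting"}


def stix_category(stix_id: str) -> str:
    """Classify a STIX id such as 'malware--<uuid>' as SDO, SCO, SRO, or other."""
    stix_type = stix_id.split("--", 1)[0]
    if stix_type in SDO_TYPES:
        return "SDO"
    if stix_type in SCO_TYPES:
        return "SCO"
    if stix_type in SRO_TYPES:
        return "SRO"
    return "other"


print(stix_category("indicator--64262f4b-d143-5740-89b4-bd7589e94b39"))
```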
For this approach, first create a new Splunk HTTP Event Collector (HEC) input to receive the data. Navigate to Settings > Data Inputs > HTTP Event Collector > Add new.

Next, select the source type and choose an existing index or create a new one.

Review the settings and submit. In the HTTP Event Collector tab you can also change the port that HEC listens on.

The token of the input you just created is now shown in the HTTP Event Collector list.

Example GraphQL query for the use case of retrieving the Location - Malware - Indicator relationships:
query {
  locations {
    edges {
      node {
        id
        name
        stixCoreRelationships(
          fromTypes: "Malware"
          relationship_type: "targets"
        ) {
          edges {
            node {
              from {
                ... on Malware {
                  id
                  name
                  description
                  stixCoreRelationships(
                    fromTypes: "Indicator"
                    relationship_type: "indicates"
                    orderBy: created
                    orderMode: desc
                    first: 10
                  ) {
                    edges {
                      node {
                        from {
                          ... on Indicator {
                            id
                            name
                            pattern
                            valid_from
                            indicator_types
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}
You can also pass parameters to the query.
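One common way to parameterize a GraphQL query is to declare variables in the query and send their values in the `variables` field of the request body. The sketch below shows the payload shape only. It assumes that the `locations` field accepts a `first` argument in the same way the nested `stixCoreRelationships` field does in the query above; check this against your platform's schema. `build_graphql_payload` is a hypothetical helper.

```python
import json

# Assumption: locations(first: Int) is available in the OpenCTI schema.
QUERY = """
query LocationNames($first: Int) {
  locations(first: $first) {
    edges {
      node {
        id
        name
      }
    }
  }
}
"""


def build_graphql_payload(query: str, variables: dict) -> dict:
    """Build the JSON body of a GraphQL-over-HTTP POST request."""
    return {"query": query, "variables": variables}


payload = build_graphql_payload(QUERY, {"first": 5})
print(json.dumps(payload["variables"]))
```

The resulting dictionary can be passed as the `json=` argument of `requests.post` together with the `Authorization: Bearer` header.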

Example Python script that:
Gets data from OpenCTI (relationship Location -> Malware -> Indicator)
Sends the data to Splunk through HEC in a suitable format (index=opencti sourcetype=opencti:relationships)
import json
import time

import requests
import urllib3

# TLS warnings are silenced only because VERIFY_SSL is False below
# (for example, a lab with self-signed certificates). Prefer VERIFY_SSL = True.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

OPENCTI_URL = "https://ip-opencti/graphql"
OPENCTI_TOKEN = "your_token"
SPLUNK_HEC_URL = "https://ip-splunk:8088/services/collector"
SPLUNK_HEC_TOKEN = "your_token"
SPLUNK_SOURCETYPE = "opencti:relationships"
SPLUNK_INDEX = "opencti"
VERIFY_SSL = False
REQUEST_TIMEOUT = 30  # seconds

# Location <- targets <- Malware <- indicates <- Indicator
query = """
query {
  locations {
    edges {
      node {
        id
        name
        stixCoreRelationships(
          fromTypes: "Malware"
          relationship_type: "targets"
        ) {
          edges {
            node {
              from {
                ... on Malware {
                  id
                  name
                  description
                  stixCoreRelationships(
                    fromTypes: "Indicator"
                    relationship_type: "indicates"
                    orderBy: created
                    orderMode: desc
                    first: 10
                  ) {
                    edges {
                      node {
                        from {
                          ... on Indicator {
                            id
                            name
                            pattern
                            valid_from
                            indicator_types
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}
"""


def query_opencti():
    """Run the GraphQL query against OpenCTI and return the decoded response."""
    headers = {"Authorization": f"Bearer {OPENCTI_TOKEN}"}
    response = requests.post(
        OPENCTI_URL,
        json={"query": query},
        headers=headers,
        verify=VERIFY_SSL,
        timeout=REQUEST_TIMEOUT,
    )
    if response.status_code != 200:
        raise Exception(f"Failed to query OpenCTI: {response.status_code} - {response.text}")
    result = response.json()
    # GraphQL can answer HTTP 200 and still return no data, only errors.
    if result.get("data") is None:
        raise Exception(f"OpenCTI returned no data: {result.get('errors')}")
    return result


def send_to_splunk(event_data):
    """Send one event to the Splunk HTTP Event Collector."""
    headers = {
        "Authorization": f"Splunk {SPLUNK_HEC_TOKEN}",
        "Content-Type": "application/json",
    }
    response = requests.post(
        SPLUNK_HEC_URL,
        headers=headers,
        data=json.dumps(event_data),
        verify=VERIFY_SSL,
        timeout=REQUEST_TIMEOUT,
    )
    if response.status_code != 200:
        print(f"[!] Failed to send to Splunk: {response.status_code} - {response.text}")
    else:
        print("[+] Sent event to Splunk successfully.")


def main():
    data = query_opencti()
    for location_edge in data["data"]["locations"]["edges"]:
        location = location_edge["node"]
        location_id = location["id"]
        location_name = location["name"]

        relationships = location.get("stixCoreRelationships") or {}
        for rel_edge in relationships.get("edges", []):
            # "from" can be null or empty when the source entity is not a Malware.
            malware = rel_edge["node"].get("from") or {}
            if "id" not in malware:
                continue
            malware_id = malware["id"]
            malware_name = malware.get("name", "N/A")
            malware_desc = malware.get("description", "")

            indicators = malware.get("stixCoreRelationships") or {}
            for ind_edge in indicators.get("edges", []):
                indicator = ind_edge["node"].get("from") or {}
                if "id" not in indicator:
                    continue
                # One Splunk event per Location / Malware / Indicator triple.
                event = {
                    "event": {
                        "location_id": location_id,
                        "location_name": location_name,
                        "malware_id": malware_id,
                        "malware_name": malware_name,
                        "malware_description": malware_desc,
                        "indicator_id": indicator["id"],
                        "indicator_name": indicator.get("name"),
                        "indicator_pattern": indicator.get("pattern"),
                        "indicator_valid_from": indicator.get("valid_from"),
                        "indicator_types": indicator.get("indicator_types"),
                    },
                    "time": int(time.time()),
                    "host": "opencti",
                    "source": "opencti:graphql",
                    "sourcetype": SPLUNK_SOURCETYPE,
                    "index": SPLUNK_INDEX,
                }
                send_to_splunk(event)


if __name__ == "__main__":
    main()
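The script above sends one HTTP request per indicator. HEC also accepts several event objects placed one after another in a single request body, so larger exports can be sent in batches. The following is a minimal sketch of that idea; `chunked` and `build_batch_body` are hypothetical helpers, and the batch size of 2 is chosen only to keep the example small.

```python
import json
from typing import Dict, Iterator, List


def chunked(events: List[Dict], size: int) -> Iterator[List[Dict]]:
    """Yield consecutive slices of at most `size` events."""
    for start in range(0, len(events), size):
        yield events[start:start + size]


def build_batch_body(events: List[Dict]) -> str:
    """Serialize events as JSON objects placed one after another, one per line."""
    return "\n".join(json.dumps(event) for event in events)


# Invented sample events in the same envelope the script uses.
events = [
    {
        "event": {"indicator_name": f"example-{i}"},
        "sourcetype": "opencti:relationships",
        "index": "opencti",
    }
    for i in range(5)
]
bodies = [build_batch_body(batch) for batch in chunked(events, 2)]
print(len(bodies))
```

Each body can then be posted as the `data=` argument of a single `requests.post` call to the same HEC URL with the same headers as in the script.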
Switch to Splunk Search & Reporting and search on index=opencti to see the data that OpenCTI has provided.

Example SPL query for Indicators and Malware related to the Location Viet Nam:
index="opencti" sourcetype="opencti:relationships" location_name="Viet Nam"
| table _time, index, indicator_name, indicator_pattern, malware_name, malware_description

Raw log
{
"location_id": "7679abc2-ee95-4f07-9552-e01fca12a92b",
"location_name": "Viet Nam",
"malware_id": "70296f1b-5a12-4282-9838-1957a87049a5",
"malware_name": "Emotet",
"malware_description": "[Emotet](https://attack.mitre.org/software/S0367) is a modular malware variant which is primarily used as a downloader for other malware variants such as [TrickBot](https://attack.mitre.org/software/S0266) and [IcedID](https://attack.mitre.org/software/S0483). Emotet first emerged in June 2014, initially targeting the financial sector, and has expanded to multiple verticals over time.(Citation: Trend Micro Banking Malware Jan 2019)",
"indicator_id": "93355348-32c7-4a62-9e76-0667ea29d4e1",
"indicator_name": "8704bb4e2b42e5f0d63abb4a0a3ea95d7e4f5165c96f2654b1e57af1b0937ffa",
"indicator_pattern": "[file:hashes.'SHA-256' = '8704bb4e2b42e5f0d63abb4a0a3ea95d7e4f5165c96f2654b1e57af1b0937ffa']",
"indicator_valid_from": "2023-03-25T21:46:49.000Z",
"indicator_types": null
}
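The `indicator_pattern` field holds a STIX pattern, so the observable value has to be extracted before it can be matched against log fields. The sketch below handles only the simplest case, a single equality comparison such as the one in the raw log above. Compound patterns (AND, OR, other operators) are deliberately not supported and return None. `parse_simple_pattern` is a hypothetical helper.

```python
import re
from typing import Optional

# Matches patterns of the form [<type>:<property> = '<value>'] only.
SIMPLE_PATTERN_RE = re.compile(r"\[\s*([a-z0-9-]+):([^\s=]+)\s*=\s*'([^']*)'\s*\]")


def parse_simple_pattern(pattern: str) -> Optional[dict]:
    """Split a single-comparison STIX pattern into type, property, and value."""
    match = SIMPLE_PATTERN_RE.fullmatch(pattern.strip())
    if not match:
        return None
    return {
        "object_type": match.group(1),
        "property": match.group(2),
        "value": match.group(3),
    }


file_pattern = "[file:hashes.'SHA-256' = '8704bb4e2b42e5f0d63abb4a0a3ea95d7e4f5165c96f2654b1e57af1b0937ffa']"
parsed_file = parse_simple_pattern(file_pattern)
parsed_ip = parse_simple_pattern("[ipv4-addr:value = '211.248.172.135']")
print(parsed_file["object_type"], parsed_ip["value"])
```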