OpenCTI integration with SIEM (Splunk)

OpenCTI integration with Splunk

Using OpenCTI Add-on for Splunk

The OpenCTI Add-on for Splunk lets Splunk users connect Splunk to OpenCTI and leverage threat intelligence to improve detection capabilities and the response to security incidents.

Features

Ability to ingest indicators exposed through an OpenCTI live stream.

Ability to trigger OpenCTI actions in response to alerts and to investigate them directly in OpenCTI.

Events format

The Live Stream feature in OpenCTI simplifies the use of real-time data streams through connectors. Users can easily create a data stream with specific filters right in the user interface (UI), and access this stream through the /stream/{STREAM_ID} path.

id: {Event stream id} -> Like 1620249512318-0
event: {Event type} -> create / update / delete
data: { -> The complete event data
    version -> The version number of the event
    type -> The inner type of the event
    scope -> The scope of the event [internal or external]
    data: {STIX data} -> The STIX representation of the data.
    message -> A simple string that makes the event easy to understand
    origin: {Data Origin} -> Complex object with different information about the origin of the event
    context: {Event context} -> Complex object with meta information depending on the event type
}
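The layout above can be read with a few lines of Python. This is a minimal sketch, assuming the stream delivers standard server-sent events in which each message is a block of `field: value` lines and the `data` field carries JSON on one or more `data:` lines; the function name `parse_sse_message` and the sample message are illustrative and not part of OpenCTI.

```python
import json


def parse_sse_message(raw: str) -> dict:
    """Parse one server-sent event block into a dict with id, event and data."""
    fields = {"id": None, "event": None}
    data_lines = []
    for line in raw.splitlines():
        if not line or line.startswith(":"):
            continue  # skip blank lines and SSE comments (keep-alives)
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space is not part of the value
        if name == "data":
            data_lines.append(value)
        elif name in fields:
            fields[name] = value
    # Multiple data lines are joined with newlines before JSON decoding
    fields["data"] = json.loads("\n".join(data_lines)) if data_lines else None
    return fields


# Illustrative message that follows the structure described above
sample = (
    "id: 1620249512318-0\n"
    "event: create\n"
    'data: {"version": "4", "type": "create", "scope": "external"}\n'
)
message = parse_sse_message(sample)
print(message["id"], message["event"], message["data"]["scope"])
```

Documentation usually pretty-prints the JSON body for readability; on the wire it is typically carried on `data:` lines, which is what this sketch expects.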

Example raw event:

id: 1752727242987-0
event: update
data: {
  "version": "4",
  "type": "update",
  "scope": "external",
  "message": "replaces `Agressive IP known malicious on AbuseIPDB - countryCode: KR - abuseConfidenceScore: 100 - lastReportedAt: 2025-07-17T02:56:48+00:00` in `Description`",
  "origin": {
    "socket": "query",
    "ip": "::ffff:10.0.2.4",
    "user_id": "88ec0c6a-13ce-5e39-b486-354fe4a7084f",
    "group_ids": [
      "31a99f81-cb65-4bb3-8aaa-ed8f8c3680cb"
    ],
    "organization_ids": [],
    "user_metadata": {},
    "applicant_id": "88ec0c6a-13ce-5e39-b486-354fe4a7084f",
    "call_retry_number": "0"
  },
  "data": {
    "id": "indicator--64262f4b-d143-5740-89b4-bd7589e94b39",
    "spec_version": "2.1",
    "type": "indicator",
    "extensions": {
      "extension-definition--ea279b3e-5c71-4632-ac08-831c66a786ba": {
        "extension_type": "property-extension",
        "id": "88ca01ed-1352-4cc5-8a6b-088b212d16bf",
        "type": "Indicator",
        "created_at": "2025-06-30T01:51:38.534Z",
        "updated_at": "2025-07-17T04:40:42.937Z",
        "is_inferred": false,
        "creator_ids": [
          "88ec0c6a-13ce-5e39-b486-354fe4a7084f"
        ],
        "labels_ids": [
          "1f253634-c047-46a5-89f4-afad63f75dce"
        ],
        "created_by_ref_id": "3ea86573-9b53-4fb0-bc98-f09034016dfe",
        "detection": false,
        "score": 100,
        "main_observable_type": "IPv4-Addr",
        "observable_values": [
          {
            "type": "IPv4-Addr",
            "value": "211.248.172.135"
          }
        ]
      },
      "extension-definition--322b8f77-262a-4cb8-a915-1e441e00329b": {
        "extension_type": "property-extension"
      }
    },
    "created": "2025-06-30T01:44:23.594Z",
    "modified": "2025-07-17T04:40:42.937Z",
    "revoked": false,
    "confidence": 100,
    "lang": "en",
    "labels": [
      "osint:source-type=\"block-or-filter-list\""
    ],
    "object_marking_refs": [
      "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
    ],
    "created_by_ref": "identity--ecab27e5-47a3-5d28-8ba1-bba10f4b6dbb",
    "external_references": [
      {
        "source_name": "AbuseIPDB database",
        "description": "AbuseIPDB database URL",
        "url": "https://www.abuseipdb.com/"
      }
    ],
    "name": "211.248.172.135",
    "description": "Agressive IP known malicious on AbuseIPDB - countryCode: KR - abuseConfidenceScore: 100 - lastReportedAt: 2025-07-17T02:56:48+00:00",
    "pattern": "[ipv4-addr:value = '211.248.172.135']",
    "pattern_type": "stix",
    "pattern_version": "2.1",
    "valid_from": "2025-07-16T03:08:10.196Z",
    "valid_until": "2025-08-17T15:41:54.518Z"
  },
  "context": {
    "patch": [
      {
        "op": "replace",
        "path": "/description",
        "value": "Agressive IP known malicious on AbuseIPDB - countryCode: KR - abuseConfidenceScore: 100 - lastReportedAt: 2025-07-17T02:56:48+00:00"
      },
      {
        "op": "replace",
        "path": "/modified",
        "value": "2025-07-17T04:40:42.937Z"
      },
      {
        "op": "replace",
        "path": "/extensions/extension-definition--ea279b3e-5c71-4632-ac08-831c66a786ba/updated_at",
        "value": "2025-07-17T04:40:42.937Z"
      }
    ],
    "reverse_patch": [
      {
        "op": "replace",
        "path": "/description",
        "value": "Agressive IP known malicious on AbuseIPDB - countryCode: KR - abuseConfidenceScore: 100 - lastReportedAt: 2025-07-16T01:44:05+00:00"
      },
      {
        "op": "replace",
        "path": "/modified",
        "value": "2025-07-16T04:18:18.630Z"
      },
      {
        "op": "replace",
        "path": "/extensions/extension-definition--ea279b3e-5c71-4632-ac08-831c66a786ba/updated_at",
        "value": "2025-07-16T04:18:18.630Z"
      }
    ],
    "related_restrictions": {
      "markings": []
    },
    "pir_ids": []
  },
  "event_id": "1752727242937-0"
}
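To make the structure of such an event more concrete, the following sketch flattens the parts that are usually of interest for detection into one record. It assumes the event has already been decoded into a Python dict; the extension id is the one that appears in the example above, while the function name `indicator_record` and the output field names are hypothetical and do not reflect the add-on's actual KV store schema.

```python
# Extension id taken from the example event above; it holds OpenCTI-specific fields
OPENCTI_EXTENSION = "extension-definition--ea279b3e-5c71-4632-ac08-831c66a786ba"


def indicator_record(event_data: dict) -> dict:
    """Flatten the STIX indicator carried by a stream event (hypothetical layout)."""
    stix = event_data.get("data") or {}
    ext = (stix.get("extensions") or {}).get(OPENCTI_EXTENSION) or {}
    observables = ext.get("observable_values") or []
    return {
        "_key": stix.get("id"),
        "name": stix.get("name"),
        "pattern": stix.get("pattern"),
        "score": ext.get("score"),
        "observable_type": ext.get("main_observable_type"),
        "values": [obs.get("value") for obs in observables],
        "labels": stix.get("labels") or [],
        "valid_until": stix.get("valid_until"),
    }


# Trimmed copy of the example event, keeping only the fields used here
event_data = {
    "type": "update",
    "data": {
        "id": "indicator--64262f4b-d143-5740-89b4-bd7589e94b39",
        "name": "211.248.172.135",
        "pattern": "[ipv4-addr:value = '211.248.172.135']",
        "valid_until": "2025-08-17T15:41:54.518Z",
        "labels": ['osint:source-type="block-or-filter-list"'],
        "extensions": {
            OPENCTI_EXTENSION: {
                "score": 100,
                "main_observable_type": "IPv4-Addr",
                "observable_values": [
                    {"type": "IPv4-Addr", "value": "211.248.172.135"}
                ],
            }
        },
    },
}
record = indicator_record(event_data)
print(record["values"], record["score"])
```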

Configuration

Before configuring the add-on, create a live stream in OpenCTI that suits your organization.

OpenCTI shows the status of each live stream, and you can start, stop, or delete it.

To configure the add-on, open Splunk Web, open the OpenCTI Add-on for Splunk, and go to the Configuration page.

Parameter | Description
OpenCTI URL | The URL of the OpenCTI platform (an HTTPS connection is required)
OpenCTI API Key | The API token of the previously created user

OpenCTI Indicators Inputs Configuration

The “OpenCTI Add-On for Splunk” enables Splunk to be fed with indicators exposed through a live stream. To do this, the add-on implements and manages Splunk modular inputs. Indicators are stored in a dedicated KV store collection named “opencti_indicators”. A default lookup definition named "opencti_lookup" is also provided to facilitate indicator management.

Parameter | Description
Name | Unique name for the input being configured
Interval | Time interval of the input in seconds. Leave as default (0) to allow continuous execution of the ingestion process
Index | The index that the data will be stored in (default)
Stream Id | The Live Stream ID of the OpenCTI stream to consume
Import from | The number of days to go back for the initial data collection (default: 30) (optional)

Once the input parameters have been correctly configured, click Add.
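As an illustration of what the "Import from" setting means in terms of the stream, the sketch below converts a number of days into a starting event id. It assumes that event ids such as `1620249512318-0` are a millisecond Unix timestamp followed by a sequence number, which is consistent with the example event shown earlier (its id matches its modification time). How the add-on itself computes its starting point is not documented here, so the function name `start_event_id` and the approach are assumptions.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def start_event_id(import_from_days: int, now: datetime) -> str:
    """Return a '<milliseconds>-0' event id located import_from_days before now."""
    start = now - timedelta(days=import_from_days)
    # Integer division of timedeltas avoids floating-point rounding
    millis = (start - EPOCH) // timedelta(milliseconds=1)
    return f"{millis}-0"


# Default of 30 days, counted back from the current time
print(start_event_id(30, datetime.now(timezone.utc)))
```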

You can also consult the Indicators Dashboard, which gives an overview of the ingested data.

Because the indicators provided by OpenCTI are stored in the KV store, they are easy to use in SPL searches.

Example Splunk detection rule enriched with IOCs:

| tstats `security_content_summariesonly` count min(_time) as firstTime max(_time) as lastTime 
  from datamodel=Endpoint.Processes 
  where (Processes.process_name="mimikatz.exe" OR Processes.original_file_name="mimikatz.exe") 
  by Processes.action Processes.dest Processes.original_file_name Processes.parent_process 
     Processes.parent_process_exec Processes.parent_process_guid Processes.parent_process_id 
     Processes.parent_process_name Processes.parent_process_path Processes.process Processes.process_exec 
     Processes.process_guid Processes.process_hash Processes.process_id 
     Processes.process_integrity_level Processes.process_name Processes.process_path 
     Processes.user Processes.user_id Processes.vendor_product 
| `drop_dm_object_name(Processes)` 
| `security_content_ctime(firstTime)` 
| `security_content_ctime(lastTime)` 
| `windows_mimikatz_binary_execution_filter`
| rex field=process_hash "SHA256=(?<process_sha256>[A-F0-9]+)"
| eval process_sha256=lower(process_sha256) 
| lookup opencti_lookup value AS process_sha256 OUTPUT value, labels AS Tag, created_by AS Author_CTI
| table action, dest, process_sha256, process_exec, Tag, Author_CTI

The Tag field makes it easy to identify file hashes related to APT groups, malware types, and so on.
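The last steps of the search above (extract the SHA-256, lowercase it, look it up, rename the output fields) can be sketched in Python to make the logic explicit. This is only an illustration under assumptions: the in-memory dict stands in for the lookup, the hash and label values are made up, and the regular expression accepts lowercase hex as well, which the SPL `rex` above does not.

```python
import re
from typing import Optional

# Matches a 64-character SHA-256 inside a string such as "MD5=...,SHA256=..."
SHA256_RE = re.compile(r"SHA256=([A-Fa-f0-9]{64})")


def enrich_process_hash(process_hash: str, indicators: dict) -> Optional[dict]:
    """Return enrichment fields when the SHA-256 is a known indicator, else None."""
    match = SHA256_RE.search(process_hash)
    if not match:
        return None
    sha256 = match.group(1).lower()  # indicator values are compared in lowercase
    hit = indicators.get(sha256)
    if hit is None:
        return None
    return {
        "process_sha256": sha256,
        "Tag": hit.get("labels"),
        "Author_CTI": hit.get("created_by"),
    }


# Made-up lookup content and event value, for illustration only
indicators = {"ab" * 32: {"labels": ["example-label"], "created_by": "Example Org"}}
print(enrich_process_hash("MD5=00,SHA256=" + "AB" * 32, indicators))
```

Unlike the SPL `lookup`, which keeps events that have no match and leaves the output fields empty, this sketch returns `None` for them.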

Using OpenCTI GraphQL API + Splunk HEC

GraphQL is used as the query language. GraphQL allows users to flexibly select the required data fields and filter by specific conditions such as entity type, relationship type, creation time, confidence level, etc.

Some important STIX abbreviations are:

  • STIX Domain Objects (SDO): Attack Patterns, Malware, Threat Actors, etc.

  • STIX Cyber-observable Objects (SCO): IP addresses, domain names, hashes, etc.

  • STIX Relationship Objects (SRO): Relationships, Sightings

OpenCTI Data Model: OpenCTI builds the entire CTI system on the STIX 2.1 model, but also extends it to support real-world use cases such as:

  • Non-STIX entities: Narrative, Channel, Event

  • Extended relationships such as: amplifies, publishes, drops

This model is implemented as a knowledge graph, which helps express tight connections between objects and supports flexible queries.

For this approach, first create a new Splunk HEC input to receive the data. Navigate to Settings > Data Inputs > HTTP Event Collector > Add new.

Next, select the source type and either choose an existing index or create a new one.

Review the settings and submit. On the HTTP Event Collector page, you can also set the port that HEC listens on.

You can now find the token of the input you just created in the HTTP Event Collector list.

Example GraphQL query for the use case of retrieving location, malware, and indicator relationships:

query {
  locations {
    edges {
      node {
        id
        name
        stixCoreRelationships(
          fromTypes: "Malware"
          relationship_type: "targets"
        ) {
          edges {
            node {
              from {
                ... on Malware {
                  id
                  name
                  description
                  stixCoreRelationships(
                    fromTypes: "Indicator"
                    relationship_type: "indicates"
                    orderBy: created
                    orderMode: desc
                    first: 10
                  ) {
                    edges {
                      node {
                        from {
                          ... on Indicator {
                            id
                            name
                            pattern
                            valid_from
                            indicator_types
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

You can also pass parameters to the query.
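If the note above refers to parameterizing the query, one common way is GraphQL variables: the request body carries a `variables` object next to the `query` string. The sketch below turns the indicator limit (`first: 10` in the query above) into a variable. The operation name `MalwareIndicators`, the variable name `$indicatorLimit`, and the assumption that `first` is of type `Int` are illustrative and should be checked against your OpenCTI schema.

```python
import json

# Shortened version of the query above with the indicator limit as a variable
QUERY = """
query MalwareIndicators($indicatorLimit: Int) {
  locations {
    edges {
      node {
        name
        stixCoreRelationships(fromTypes: "Malware", relationship_type: "targets") {
          edges {
            node {
              from {
                ... on Malware {
                  name
                  stixCoreRelationships(
                    fromTypes: "Indicator"
                    relationship_type: "indicates"
                    first: $indicatorLimit
                  ) {
                    edges { node { from { ... on Indicator { name pattern } } } }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}
"""


def build_payload(indicator_limit: int = 10) -> dict:
    """Build the JSON body of a GraphQL request with variables."""
    return {"query": QUERY, "variables": {"indicatorLimit": indicator_limit}}


payload = build_payload(5)
print(json.dumps(payload["variables"]))
```

The resulting dict can be sent as the `json=` argument of an HTTP POST to the GraphQL endpoint.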

Example Python script that:

  • Gets data from OpenCTI (relationships Location -> Malware -> Indicator)

  • Sends the data to Splunk (Splunk HEC) in a suitable format (index=opencti sourcetype=opencti:relationships)

import requests
import json
import time
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

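# Replace the placeholders below with your own hosts and tokens
OPENCTI_URL = "https://ip-opencti/graphql"
OPENCTI_TOKEN = "your_token"
SPLUNK_HEC_URL = "https://ip-splunk:8088/services/collector"
SPLUNK_HEC_TOKEN = "your_token"
SPLUNK_SOURCETYPE = "opencti:relationships"
SPLUNK_INDEX = "opencti"
# Certificate verification is disabled for lab use; set to True in production
VERIFY_SSL = False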

query = """
query {
  locations {
    edges {
      node {
        id
        name
        stixCoreRelationships(
          fromTypes: "Malware"
          relationship_type: "targets"
        ) {
          edges {
            node {
              from {
                ... on Malware {
                  id
                  name
                  description
                  stixCoreRelationships(
                    fromTypes: "Indicator"
                    relationship_type: "indicates"
                    orderBy: created
                    orderMode: desc
                    first: 10
                  ) {
                    edges {
                      node {
                        from {
                          ... on Indicator {
                            id
                            name
                            pattern
                            valid_from
                            indicator_types
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}
"""
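def query_opencti():
    """Run the GraphQL query against OpenCTI and return the decoded JSON response."""
    headers = {"Authorization": f"Bearer {OPENCTI_TOKEN}"}
    response = requests.post(
        OPENCTI_URL,
        json={"query": query},
        headers=headers,
        verify=VERIFY_SSL,
        timeout=60  # avoid hanging forever if OpenCTI is unreachable
    )
    if response.status_code != 200:
        raise Exception(f"Failed to query OpenCTI: {response.status_code} - {response.text}")
    result = response.json()
    # GraphQL servers can report query errors in the body even with HTTP 200
    if result.get("errors"):
        raise Exception(f"OpenCTI returned GraphQL errors: {result['errors']}")
    return result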

def send_to_splunk(event_data):
    headers = {
        "Authorization": f"Splunk {SPLUNK_HEC_TOKEN}",
        "Content-Type": "application/json"
    }
    response = requests.post(
        SPLUNK_HEC_URL,
        headers=headers,
        data=json.dumps(event_data),
        verify=VERIFY_SSL,
        timeout=30  # avoid hanging forever if Splunk is unreachable
    )
    if response.status_code != 200:
        print(f"[!] Failed to send to Splunk: {response.status_code} - {response.text}")
    else:
        print("[+] Sent event to Splunk successfully.")

def main():
    data = query_opencti()

    for location_edge in data["data"]["locations"]["edges"]:
        location = location_edge["node"]
        location_id = location["id"]
        location_name = location["name"]

        for rel_edge in (location.get("stixCoreRelationships") or {}).get("edges", []):
            # "from" can be null or empty when the source entity is not returned
            malware = rel_edge["node"].get("from") or {}
            if "id" not in malware:
                continue
            malware_id = malware["id"]
            malware_name = malware.get("name", "N/A")
            malware_desc = malware.get("description", "")

            for ind_edge in (malware.get("stixCoreRelationships") or {}).get("edges", []):
                indicator = ind_edge["node"].get("from") or {}
                if "id" not in indicator:
                    continue
                event = {
                    "event": {
                        "location_id": location_id,
                        "location_name": location_name,
                        "malware_id": malware_id,
                        "malware_name": malware_name,
                        "malware_description": malware_desc,
                        "indicator_id": indicator["id"],
                        "indicator_name": indicator.get("name"),
                        "indicator_pattern": indicator.get("pattern"),
                        "indicator_valid_from": indicator.get("valid_from"),
                        "indicator_types": indicator.get("indicator_types"),
                    },
                    "time": int(time.time()),
                    "host": "opencti",
                    "source": "opencti:graphql",
                    "sourcetype": SPLUNK_SOURCETYPE,
                    "index": SPLUNK_INDEX
                }
                send_to_splunk(event)

if __name__ == "__main__":
    main()
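The script above sends one HTTP request per event. HEC also accepts several event objects in a single request body, so for larger result sets the events can be grouped first. The following is a minimal sketch of that idea; the helper names `chunked` and `build_hec_batch` and the batch size are hypothetical, and the resulting string would be passed as `data=` to the same `requests.post` call used in `send_to_splunk`.

```python
import json
from typing import Iterator, List


def chunked(items: List[dict], size: int) -> Iterator[List[dict]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def build_hec_batch(events: List[dict]) -> str:
    """Serialize HEC event objects one after another into a single request body."""
    return "\n".join(json.dumps(event) for event in events)


# Illustrative events using the same envelope as the script above
events = [
    {"event": {"indicator_name": f"example-{i}"}, "sourcetype": "opencti:relationships"}
    for i in range(5)
]
for batch in chunked(events, 2):
    body = build_hec_batch(batch)
    print(len(batch), "events,", len(body), "bytes")
```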

Switch to Splunk Search & Reporting. Searching index=opencti shows the data that OpenCTI has provided.

Example SPL query for indicators and malware related to the location Viet Nam:

index="opencti" sourcetype="opencti:relationships" location_name="Viet Nam"
| table _time, index, indicator_name, indicator_pattern, malware_name, malware_description

Raw log

{
  "location_id": "7679abc2-ee95-4f07-9552-e01fca12a92b",
  "location_name": "Viet Nam",
  "malware_id": "70296f1b-5a12-4282-9838-1957a87049a5",
  "malware_name": "Emotet",
  "malware_description": "[Emotet](https://attack.mitre.org/software/S0367) is a modular malware variant which is primarily used as a downloader for other malware variants such as [TrickBot](https://attack.mitre.org/software/S0266) and [IcedID](https://attack.mitre.org/software/S0483). Emotet first emerged in June 2014, initially targeting the financial sector, and has expanded to multiple verticals over time.(Citation: Trend Micro Banking Malware Jan 2019)",
  "indicator_id": "93355348-32c7-4a62-9e76-0667ea29d4e1",
  "indicator_name": "8704bb4e2b42e5f0d63abb4a0a3ea95d7e4f5165c96f2654b1e57af1b0937ffa",
  "indicator_pattern": "[file:hashes.'SHA-256' = '8704bb4e2b42e5f0d63abb4a0a3ea95d7e4f5165c96f2654b1e57af1b0937ffa']",
  "indicator_valid_from": "2023-03-25T21:46:49.000Z",
  "indicator_types": null
}
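The `indicator_pattern` field holds a STIX pattern, so the observable value has to be extracted before it can be compared with log fields. The sketch below handles only simple single-comparison patterns like the one in the raw log above; compound patterns (AND/OR, multiple comparisons) and escaped quotes are deliberately not supported and return `None`. The function name `parse_simple_pattern` is hypothetical.

```python
import re
from typing import Optional

# [<object-type>:<property path> = '<value>'] with exactly one comparison
SIMPLE_PATTERN_RE = re.compile(r"\[\s*([a-z0-9-]+):([^\s=]+)\s*=\s*'([^']*)'\s*\]")


def parse_simple_pattern(pattern: str) -> Optional[dict]:
    """Split a single-comparison STIX pattern into object type, path and value."""
    match = SIMPLE_PATTERN_RE.fullmatch(pattern.strip())
    if not match:
        return None  # not a simple pattern; leave it to a full STIX parser
    return {
        "object_type": match.group(1),
        "path": match.group(2),
        "value": match.group(3),
    }


parsed = parse_simple_pattern(
    "[file:hashes.'SHA-256' = "
    "'8704bb4e2b42e5f0d63abb4a0a3ea95d7e4f5165c96f2654b1e57af1b0937ffa']"
)
print(parsed["object_type"], parsed["value"])
```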
