Tutorial

Threat Intelligence Extraction with Named Entity Recognition

Fine-tune a DistilBERT token classifier to extract IOCs, threat actors, and techniques from security reports, then wrap it in a structured extraction function.

6 min read advanced

Prerequisites

  • Completion of the phishing URL detection tutorial (fine-tuned transformers)
  • Python and PyTorch experience
  • Basic familiarity with NER (token classification)
  • A machine with at least 16 GB of RAM (GPU recommended)

Part 9 of 10 in ML for Security

The phishing URL detection tutorial (Part 5) used a transformer for sequence classification: the model read an entire URL and produced one label (phishing or legitimate). This tutorial shifts from classification to extraction. Instead of assigning a single label to a document, we assign a label to every token in a threat report. The task is named entity recognition (NER), and the entities are security-specific: IP addresses, file hashes, CVE identifiers, malware families, threat actor names, and attacker technique mentions.

Why automate threat intelligence extraction

Security analysts read threat reports from CISA, vendor blogs, and ISAC feeds daily. Manually extracting IOCs is tedious and error-prone. Regex handles structured indicators (IPs, hashes, CVEs) well, but it has three fundamental limitations:

  1. No context. A regex extracts 192.168.1.50 but cannot tell you whether it is a C2 server, a victim host, or a documentation example. NER labels tokens in context.

  2. Unstructured entities. Threat actor names (APT28, Lazarus Group) and technique descriptions (spear-phishing, credential dumping) have no fixed syntactic pattern. You cannot write a regex for “any threat actor name.”

  3. Variant handling. Malware families appear under multiple names (Cobalt Strike, CobaltStrike, CS beacon). A trained model generalizes; a regex requires an exhaustive list.

NER provides structured extraction from unstructured text: each entity has a text value, a type, and a position in the source document.

Entity types and IOB2 tagging

Six entity types plus the outside label:

Label       Examples
IP          203.0.113.50, 2001:db8::1
HASH        d41d8cd98f00b204e9800998ecf8427e (MD5/SHA-1/SHA-256)
CVE         CVE-2023-44228, CVE-2021-34527
MALWARE     Emotet, Cobalt Strike, PlugX
ACTOR       APT28, Lazarus Group, Sandworm
TECHNIQUE   spear-phishing, credential dumping
O           (all other tokens)

IOB2 (Inside-Outside-Beginning) tagging distinguishes the start of an entity from its continuation. Every entity’s first token gets a B- prefix, and subsequent tokens get I-. Single-token entities only have a B- tag. This produces 13 labels: O, plus B- and I- for each of the six types.

Token:   Lazarus  Group    exploited  CVE-2023-44228  to  deploy  Cobalt     Strike
Label:   B-ACTOR  I-ACTOR  O          B-CVE           O   O       B-MALWARE  I-MALWARE
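The tagging rule can be sketched in a few lines. The helpers below (`build_label_list`, `spans_to_iob2`) are hypothetical names used for illustration; they assume entity spans are given as token index ranges with an exclusive end.

```python
ENTITY_TYPES = ['IP', 'HASH', 'CVE', 'MALWARE', 'ACTOR', 'TECHNIQUE']


def build_label_list(entity_types):
    """Return 'O' followed by a B-/I- pair for each entity type."""
    labels = ['O']
    for etype in entity_types:
        labels += [f'B-{etype}', f'I-{etype}']
    return labels


def spans_to_iob2(n_tokens, spans):
    """Convert (start, end_exclusive, type) token spans to IOB2 tags."""
    tags = ['O'] * n_tokens
    for start, end, etype in spans:
        tags[start] = f'B-{etype}'          # first token of the entity
        for i in range(start + 1, end):
            tags[i] = f'I-{etype}'          # continuation tokens
    return tags


tokens = ['Lazarus', 'Group', 'deployed', 'Cobalt', 'Strike', 'and', 'Emotet']
spans = [(0, 2, 'ACTOR'), (3, 5, 'MALWARE'), (6, 7, 'MALWARE')]
tags = spans_to_iob2(len(tokens), spans)
labels = build_label_list(ENTITY_TYPES)
```

Six types yield the 13 labels described above, and the single-token entity (Emotet) carries only a B- tag.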

Setting up the environment

python -m venv venv && source venv/bin/activate
pip install torch transformers datasets scikit-learn seqeval pandas numpy

The seqeval library provides entity-level evaluation metrics for NER. This matters because a model that correctly labels three out of four tokens in a multi-token entity has not actually extracted the entity correctly.
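The difference between token-level and entity-level scoring is easy to see on a toy case. The sketch below is a simplified, dependency-free illustration of exact-match span scoring, not seqeval's implementation; in particular, it drops an I- tag that has no preceding B- tag, which may differ from how seqeval handles malformed sequences.

```python
def iob2_to_spans(tags):
    """Return the set of (start, end_exclusive, type) spans in an IOB2 sequence."""
    spans = set()
    start, etype = None, None
    for i, tag in enumerate(list(tags) + ['O']):   # sentinel closes the last span
        continues = tag.startswith('I-') and etype == tag[2:]
        if not continues:
            if etype is not None:
                spans.add((start, i, etype))
            start, etype = (i, tag[2:]) if tag.startswith('B-') else (None, None)
    return spans


def entity_f1(true_tags, pred_tags):
    """Exact-match F1: a predicted span counts only if boundaries and type agree."""
    true_spans, pred_spans = iob2_to_spans(true_tags), iob2_to_spans(pred_tags)
    tp = len(true_spans & pred_spans)
    precision = tp / len(pred_spans) if pred_spans else 0.0
    recall = tp / len(true_spans) if true_spans else 0.0
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)


def token_accuracy(true_tags, pred_tags):
    """Fraction of positions where the predicted tag equals the true tag."""
    return sum(t == p for t, p in zip(true_tags, pred_tags)) / len(true_tags)


# A four-token entity where the model misses the final token.
true_tags = ['B-TECHNIQUE', 'I-TECHNIQUE', 'I-TECHNIQUE', 'I-TECHNIQUE', 'O']
pred_tags = ['B-TECHNIQUE', 'I-TECHNIQUE', 'I-TECHNIQUE', 'O', 'O']

acc = token_accuracy(true_tags, pred_tags)   # 4 of 5 tags match
f1 = entity_f1(true_tags, pred_tags)         # the span boundaries differ
```

Token accuracy here is 0.8 while entity-level F1 is 0.0, which is why the rest of the tutorial reports entity-level scores.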

Dataset: synthetic annotation from templates

Production NER systems use manually annotated corpora (DNRTI, CyNER), but these are limited in size. A common bootstrapping approach is to generate synthetic data from sentence templates, filling entity slots with real-world values.

Note

Synthetic data is sufficient for learning the NER pipeline and produces reasonable results on well-represented patterns. Production accuracy depends on manual annotation of real threat reports. Template-generated data cannot capture the full variety of sentence structures, hedging language, and nested references found in real advisories. Treat the metrics below as an upper bound on real-world performance.

import random
import numpy as np

random.seed(42)
np.random.seed(42)

# Real entity values for slot filling
ACTORS = [
    'APT28', 'APT29', 'Lazarus Group', 'Sandworm', 'Charming Kitten',
    'Turla', 'Fancy Bear', 'Cozy Bear', 'Kimsuky', 'Mustang Panda',
    'FIN7', 'Carbanak Group', 'Gamaredon', 'Volt Typhoon',
]
MALWARE = [
    'Emotet', 'Cobalt Strike', 'PlugX', 'TrickBot', 'Mimikatz',
    'Ryuk', 'Conti', 'BlackCat', 'QakBot', 'IcedID',
    'SUNBURST', 'Agent Tesla', 'Remcos RAT',
]
TECHNIQUES = [
    'spear-phishing', 'credential dumping', 'DLL sideloading',
    'process injection', 'lateral movement', 'privilege escalation',
    'supply chain compromise', 'watering hole attack', 'brute force',
    'password spraying', 'DNS tunneling', 'data exfiltration', 'pass the hash',
]
CVES = [
    'CVE-2023-44228', 'CVE-2021-34527', 'CVE-2021-44228',
    'CVE-2020-1472', 'CVE-2017-0144', 'CVE-2022-30190',
    'CVE-2023-27997', 'CVE-2021-26855', 'CVE-2023-23397',
    'CVE-2022-26134', 'CVE-2019-0708',
]

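def random_ipv4():
    """Generate a random IPv4 address for slot filling.

    Some results fall in private ranges (for example 10.0.0.0/8), so the
    values are format-realistic rather than guaranteed to be routable.
    """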
    first = random.choice([10, 45, 91, 103, 141, 172, 185, 192, 203, 212])
    return f'{first}.{random.randint(0,255)}.{random.randint(0,255)}.{random.randint(1,254)}'

def random_hash():
    """Generate a random hex string resembling a SHA-256 hash."""
    return ''.join(random.choices('0123456789abcdef', k=64))

IPS = [random_ipv4() for _ in range(50)]
HASHES = [random_hash() for _ in range(50)]

# 27 sentence templates with entity slots
TEMPLATES = [
    'The {ACTOR} group exploited {CVE} to deploy {MALWARE} from {IP}.',
    '{ACTOR} used {TECHNIQUE} to gain initial access, delivering {MALWARE} via phishing emails.',
    'Analysis revealed that {MALWARE} beaconed to {IP} over port 443.',
    'The threat actor {ACTOR} leveraged {CVE} in a campaign targeting financial institutions.',
    'Researchers attributed the {MALWARE} samples to {ACTOR} based on code overlap.',
    'The attack chain began with {TECHNIQUE} followed by deployment of {MALWARE}.',
    'Indicators of compromise include the hash {HASH} and the C2 address {IP}.',
    '{ACTOR} exploited {CVE} to execute {TECHNIQUE} on vulnerable servers.',
    'The malware communicated with {IP} and dropped a payload with hash {HASH}.',
    'Security teams should block {IP} and scan for {HASH} associated with {MALWARE}.',
    '{ACTOR} conducted {TECHNIQUE} against government networks using {MALWARE}.',
    'The vulnerability {CVE} was exploited by {ACTOR} to install {MALWARE}.',
    'Network traffic analysis showed connections to {IP} linked to {ACTOR} infrastructure.',
    'The sample with hash {HASH} was identified as a variant of {MALWARE}.',
    '{ACTOR} shifted tactics to {TECHNIQUE} after patches for {CVE} were released.',
    'The C2 server at {IP} hosted {MALWARE} payloads targeting {CVE}.',
    '{ACTOR} deployed {MALWARE} through {TECHNIQUE} against supply chain targets.',
    'Forensic analysis linked hash {HASH} to the {ACTOR} toolkit.',
    'Analysts observed {ACTOR} using {TECHNIQUE} to move laterally within the network.',
    'The dropper connected to {IP} to retrieve {MALWARE} with hash {HASH}.',
    '{ACTOR} is known for {TECHNIQUE} and has been linked to {MALWARE} deployments.',
    'Scanning for {CVE} revealed systems that {ACTOR} had already compromised.',
    'The payload hash {HASH} was uploaded to VirusTotal and flagged as {MALWARE}.',
    'The exploit for {CVE} was combined with {MALWARE} for remote code execution.',
    'Multiple organizations reported {TECHNIQUE} attacks attributed to {ACTOR}.',
    'File hash {HASH} matched known {MALWARE} indicators in threat feeds.',
    '{TECHNIQUE} was the primary method used by {ACTOR} in this campaign.',
]


def tokenize_and_label(sentence, entities):
    """Split a filled sentence into tokens and assign IOB2 labels.

    Trailing sentence punctuation is stripped from the final token of any
    matched entity so the stored token (and the label that points to it)
    does not include the period that ended the sentence. Without this,
    'Emotet.' would be labeled as a B-MALWARE token, which trains the
    model to keep the punctuation as part of the extracted indicator.
    """
    tokens = sentence.split()
    labels = ['O'] * len(tokens)

    for entity_text, entity_type in entities:
        entity_tokens = entity_text.split()
        entity_len = len(entity_tokens)
        for i in range(len(tokens) - entity_len + 1):
            candidate = list(tokens[i:i + entity_len])
            stripped_last = candidate[-1].rstrip('.,;:!?')
            candidate[-1] = stripped_last
            if candidate == entity_tokens:
                # Persist the stripped form so the labeled token does not
                # include trailing sentence punctuation.
                tokens[i + entity_len - 1] = stripped_last
                labels[i] = f'B-{entity_type}'
                for j in range(1, entity_len):
                    labels[i + j] = f'I-{entity_type}'
                break

    return tokens, labels


SLOT_SOURCES = {
    '{ACTOR}': ('ACTOR', ACTORS), '{MALWARE}': ('MALWARE', MALWARE),
    '{TECHNIQUE}': ('TECHNIQUE', TECHNIQUES), '{CVE}': ('CVE', CVES),
    '{IP}': ('IP', IPS), '{HASH}': ('HASH', HASHES),
}

def generate_dataset(n_samples=5000):
    """Generate synthetic NER training data from templates."""
    all_tokens = []
    all_labels = []

    for _ in range(n_samples):
        template = random.choice(TEMPLATES)
        entities = []
        sentence = template

        for placeholder, (etype, values) in SLOT_SOURCES.items():
            if placeholder in sentence:
                val = random.choice(values)
                sentence = sentence.replace(placeholder, val, 1)
                entities.append((val, etype))

        tokens, labels = tokenize_and_label(sentence, entities)
        all_tokens.append(tokens)
        all_labels.append(labels)

    return all_tokens, all_labels


tokens_list, labels_list = generate_dataset(5000)

# Inspect a sample
for token, label in zip(tokens_list[0], labels_list[0]):
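    print(f'{token:30s} {label}')

Representative output (the sampled sentence depends on the random state):

The                            O
APT28                          B-ACTOR
group                          O
exploited                      O
CVE-2023-44228                 B-CVE
to                             O
deploy                         O
Emotet                         B-MALWARE
from                           O
203.147.62.118                 B-IP

The word “group” is labeled O because the slot value is APT28, not “APT28 group”; only tokens that belong to the filled-in entity value receive entity tags.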

Train/validation/test split

from sklearn.model_selection import train_test_split

indices = list(range(len(tokens_list)))
train_idx, test_idx = train_test_split(indices, test_size=0.2, random_state=42)
train_idx, val_idx = train_test_split(train_idx, test_size=0.125, random_state=42)

train_tokens = [tokens_list[i] for i in train_idx]
train_labels = [labels_list[i] for i in train_idx]
val_tokens = [tokens_list[i] for i in val_idx]
val_labels = [labels_list[i] for i in val_idx]
test_tokens = [tokens_list[i] for i in test_idx]
test_labels = [labels_list[i] for i in test_idx]

print(f'Train: {len(train_tokens)}, Val: {len(val_tokens)}, Test: {len(test_tokens)}')

Output:

Train: 3500, Val: 500, Test: 1000
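A random split places sentences from the same template, with the same entity values, on both sides of the split, so test scores partly measure memorization. One way to get a harder test set is to hold out whole templates. The sketch below assumes you have a `template_ids` list recording which template produced each sample; `generate_dataset` as written does not return this, so that list and the `split_by_group` helper are hypothetical additions.

```python
import random


def split_by_group(group_ids, test_fraction=0.2, seed=42):
    """Split sample indices so that no group id appears on both sides."""
    rng = random.Random(seed)
    groups = sorted(set(group_ids))
    rng.shuffle(groups)
    n_test = max(1, round(len(groups) * test_fraction))
    test_groups = set(groups[:n_test])
    train_idx = [i for i, g in enumerate(group_ids) if g not in test_groups]
    test_idx = [i for i, g in enumerate(group_ids) if g in test_groups]
    return train_idx, test_idx


# Hypothetical bookkeeping: sample i was produced by template i % 27.
template_ids = [i % 27 for i in range(5000)]
grouped_train_idx, grouped_test_idx = split_by_group(template_ids)

train_templates = {template_ids[i] for i in grouped_train_idx}
test_templates = {template_ids[i] for i in grouped_test_idx}
```

The same idea applies to entity values: holding out some actor or malware names shows whether the model recognizes unseen names from context or only recalls names it was trained on.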

Tokenization and label alignment

BERT’s WordPiece tokenizer splits words into subword tokens. When “APT28” becomes ["apt", "##28"], we need to decide which subword tokens receive entity labels. The standard approach: the first subword of each word gets the word’s IOB2 label, continuation subwords receive -100 (which PyTorch’s CrossEntropyLoss ignores automatically), and special tokens ([CLS], [SEP], [PAD]) also receive -100.

from transformers import DistilBertTokenizerFast

tokenizer = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased')

LABEL_LIST = [
    'O',
    'B-IP', 'I-IP', 'B-HASH', 'I-HASH', 'B-CVE', 'I-CVE',
    'B-MALWARE', 'I-MALWARE', 'B-ACTOR', 'I-ACTOR', 'B-TECHNIQUE', 'I-TECHNIQUE',
]
label2id = {label: i for i, label in enumerate(LABEL_LIST)}
id2label = {i: label for label, i in label2id.items()}

# Demonstrate the alignment problem
example_words = ['The', 'APT28', 'group', 'deployed', 'Cobalt', 'Strike']
# 'group' is O here, matching the generator, whose ACTOR value is just 'APT28'
example_labels = ['O', 'B-ACTOR', 'O', 'O', 'B-MALWARE', 'I-MALWARE']

encoding = tokenizer(example_words, is_split_into_words=True)
word_ids = encoding.word_ids()

print('Subword-level after alignment:')
for i, (token_id, word_id) in enumerate(zip(encoding['input_ids'], word_ids)):
    token = tokenizer.convert_ids_to_tokens(token_id)
    if word_id is None:
        aligned_label = '[ignored]'
    elif i == 0 or word_ids[i - 1] != word_id:
        aligned_label = example_labels[word_id]
    else:
        aligned_label = '[ignored]'
    print(f'  {token:15s} word_id={str(word_id):5s} -> {aligned_label}')

Output:

Subword-level after alignment:
  [CLS]           word_id=None  -> [ignored]
  the             word_id=0     -> O
  apt             word_id=1     -> B-ACTOR
  ##28            word_id=1     -> [ignored]
  group           word_id=2     -> O
  deployed        word_id=3     -> O
  cobalt          word_id=4     -> B-MALWARE
  strike          word_id=5     -> I-MALWARE
  [SEP]           word_id=None  -> [ignored]

“APT28” splits into ["apt", "##28"]. Only "apt" receives the B-ACTOR label.

Building the aligned dataset

import torch
from torch.utils.data import Dataset

class NERDataset(Dataset):
    """Token classification dataset with WordPiece label alignment.

    Aligns IOB2 labels to subword positions. Continuation subwords and
    special tokens receive label -100 (ignored by CrossEntropyLoss).
    """

    def __init__(self, tokens_list, labels_list, tokenizer, label2id, max_length=128):
        self.tokens_list = tokens_list
        self.labels_list = labels_list
        self.tokenizer = tokenizer
        self.label2id = label2id
        self.max_length = max_length

    def __len__(self):
        return len(self.tokens_list)

    def __getitem__(self, idx):
        words = self.tokens_list[idx]
        word_labels = self.labels_list[idx]
        encoding = self.tokenizer(
            words, is_split_into_words=True, max_length=self.max_length,
            padding='max_length', truncation=True, return_tensors='pt',
        )

        word_ids = encoding.word_ids()
        aligned_labels = []
        prev = None
        for word_id in word_ids:
            if word_id is None:
                aligned_labels.append(-100)
            elif word_id != prev:
                aligned_labels.append(self.label2id[word_labels[word_id]])
            else:
                aligned_labels.append(-100)
            prev = word_id

        return {
            'input_ids': encoding['input_ids'].squeeze(),
            'attention_mask': encoding['attention_mask'].squeeze(),
            'labels': torch.tensor(aligned_labels, dtype=torch.long),
        }


train_dataset = NERDataset(train_tokens, train_labels, tokenizer, label2id)
val_dataset = NERDataset(val_tokens, val_labels, tokenizer, label2id)
test_dataset = NERDataset(test_tokens, test_labels, tokenizer, label2id)

Baseline: regex extraction

Before training a 66M-parameter model, establish a regex baseline. Structured indicators (IPs, hashes, CVEs) have distinctive character patterns that regular expressions handle well; the regex baseline anchors what we should expect from the model on those classes and exposes the gap on the unstructured classes (ACTOR, MALWARE, TECHNIQUE) where regex cannot generalize.

import re

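REGEX_PATTERNS = {
    # IPv4 only; octets are not range-checked (999.1.1.1 matches) and IPv6 is not covered
    'IP': re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b'),
    # Exactly 32 (MD5), 40 (SHA-1), or 64 (SHA-256) hex characters
    'HASH': re.compile(r'\b(?:[a-fA-F0-9]{64}|[a-fA-F0-9]{40}|[a-fA-F0-9]{32})\b'),
    # CVE sequence numbers have four or more digits
    'CVE': re.compile(r'\bCVE-\d{4}-\d{4,}\b'),
}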

def regex_extract(text):
    """Return {label: [matches]} for the regex-tractable entity classes."""
    return {label: pat.findall(text) for label, pat in REGEX_PATTERNS.items()}

On this synthetic corpus, the regex baseline gets near-perfect extraction for IP, HASH, and CVE because those tokens are generated in canonical formats. It returns nothing for ACTOR, MALWARE, and TECHNIQUE unless you add dictionaries for those classes. The NER model needs to match regex behavior on structured classes and substantially exceed it on the unstructured ones to justify itself.
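Pattern matching alone accepts some strings that are not valid indicators, so a baseline (or a model's output) can be post-validated. The following is a minimal sketch using the standard library; `is_valid_ip` and `hash_kind` are illustrative helper names, and mapping hex length to hash family is a heuristic, since other digests share these lengths.

```python
import ipaddress
import re

HASH_LENGTHS = {32: 'MD5', 40: 'SHA-1', 64: 'SHA-256'}
HEX_RE = re.compile(r'[0-9a-fA-F]+')


def is_valid_ip(candidate):
    """True if the string parses as an IPv4 or IPv6 address."""
    try:
        ipaddress.ip_address(candidate)
        return True
    except ValueError:
        return False


def hash_kind(candidate):
    """Guess the hash family from length, or None if it is not a plausible hash."""
    if HEX_RE.fullmatch(candidate):
        return HASH_LENGTHS.get(len(candidate))
    return None


checks = {
    '203.0.113.50': is_valid_ip('203.0.113.50'),
    '999.1.1.1': is_valid_ip('999.1.1.1'),        # matches a naive IPv4 regex
    '2001:db8::1': is_valid_ip('2001:db8::1'),    # IPv6 from the entity table
}
kind = hash_kind('d41d8cd98f00b204e9800998ecf8427e')
```

The same validators can filter NER predictions for the IP and HASH classes, which is a cheap way to combine the two approaches.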

Building the NER model

DistilBertForTokenClassification adds a linear classification head on top of DistilBERT’s hidden states, producing a label prediction for every token position.

from transformers import DistilBertForTokenClassification

model = DistilBertForTokenClassification.from_pretrained(
    'distilbert-base-uncased',
    num_labels=len(LABEL_LIST),
    id2label=id2label,
    label2id=label2id,
)

total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'Total parameters: {total_params:,}')
print(f'Trainable parameters: {trainable_params:,}')
print(f'Number of labels: {len(LABEL_LIST)}')

Output:

Total parameters: 66,372,877
Trainable parameters: 66,372,877
Number of labels: 13

Note

For a production system with a large manually annotated corpus, you might freeze the DistilBERT backbone and train only the classification head first, then unfreeze and fine-tune the full model. With synthetic data of this size, fine-tuning all parameters works well and converges quickly.
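The two-stage schedule in the note can be sketched by toggling `requires_grad` by parameter-name prefix. This is an illustration under assumptions: `set_trainable` is a hypothetical helper, and the `'distilbert.'` prefix is assumed to match the backbone's parameter names, which is worth confirming by printing `model.named_parameters()`. A tiny stand-in object is used so the sketch runs without downloading weights.

```python
from types import SimpleNamespace


def set_trainable(model, prefix, trainable):
    """Set requires_grad on every parameter whose name starts with `prefix`.

    Returns the number of parameter tensors that were touched.
    """
    touched = 0
    for name, param in model.named_parameters():
        if name.startswith(prefix):
            param.requires_grad = trainable
            touched += 1
    return touched


class TinyStandIn:
    """Stand-in exposing named_parameters(); replace with the real model."""

    def __init__(self):
        self._params = {
            'distilbert.embeddings.word_embeddings.weight': SimpleNamespace(requires_grad=True),
            'distilbert.transformer.layer.0.ffn.lin1.weight': SimpleNamespace(requires_grad=True),
            'classifier.weight': SimpleNamespace(requires_grad=True),
            'classifier.bias': SimpleNamespace(requires_grad=True),
        }

    def named_parameters(self):
        return self._params.items()


demo = TinyStandIn()

# Stage 1: freeze the backbone so only the classification head trains.
frozen = set_trainable(demo, 'distilbert.', False)
head_only = [n for n, p in demo.named_parameters() if p.requires_grad]

# Stage 2: unfreeze everything for full fine-tuning.
unfrozen = set_trainable(demo, 'distilbert.', True)
all_trainable = all(p.requires_grad for _, p in demo.named_parameters())
```

With the real model, you would call `set_trainable(model, 'distilbert.', False)` before the first few epochs, then unfreeze and continue, typically with a lower learning rate.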

Training

from torch.utils.data import DataLoader
from seqeval.metrics import f1_score as seqeval_f1
from seqeval.metrics import classification_report as seqeval_report

def decode_predictions(logits, labels, id2label):
    """Convert model outputs to IOB2 label sequences for seqeval."""
    preds = torch.argmax(logits, dim=-1)
    true_seqs, pred_seqs = [], []
    for pred_seq, label_seq in zip(preds, labels):
        t, p = [], []
        for pi, li in zip(pred_seq, label_seq):
            if li.item() != -100:
                t.append(id2label[li.item()])
                p.append(id2label[pi.item()])
        true_seqs.append(t)
        pred_seqs.append(p)
    return true_seqs, pred_seqs


def train_ner(model, train_dataset, val_dataset, epochs=10, batch_size=32, lr=3e-5):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f'Using device: {device}')
    model = model.to(device)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=0.01)

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for batch in train_loader:
            ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            labs = batch['labels'].to(device)
            outputs = model(input_ids=ids, attention_mask=mask, labels=labs)
            total_loss += outputs.loss.item() * len(ids)
            optimizer.zero_grad()
            outputs.loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

        model.eval()
        all_true, all_pred, val_loss = [], [], 0
        with torch.no_grad():
            for batch in val_loader:
                ids = batch['input_ids'].to(device)
                mask = batch['attention_mask'].to(device)
                labs = batch['labels'].to(device)
                out = model(input_ids=ids, attention_mask=mask, labels=labs)
                val_loss += out.loss.item() * len(ids)
                t, p = decode_predictions(out.logits, labs, id2label)
                all_true.extend(t)
                all_pred.extend(p)

        print(f'Epoch {epoch+1:2d}/{epochs}  '
              f'train_loss={total_loss/len(train_dataset):.4f}  '
              f'val_loss={val_loss/len(val_dataset):.4f}  '
              f'entity_f1={seqeval_f1(all_true, all_pred):.4f}')

    return model

model = train_ner(model, train_dataset, val_dataset)

Representative output (values will vary with hardware, library versions, and random seed):

Using device: cuda
Epoch  1/10  train_loss=0.4821  val_loss=0.1243  entity_f1=0.8234
Epoch  2/10  train_loss=0.0892  val_loss=0.0534  entity_f1=0.9187
...
Epoch  5/10  train_loss=0.0167  val_loss=0.0189  entity_f1=0.9701
...
Epoch 10/10  train_loss=0.0043  val_loss=0.0139  entity_f1=0.9812

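Entity-level F1 climbs quickly because the synthetic data has consistent patterns and because the validation sentences come from the same templates and entity lists as the training sentences. Real annotated data would likely need more epochs and yield lower F1.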

Evaluation

def evaluate_ner(model, test_dataset, id2label, batch_size=32):
    device = next(model.parameters()).device
    model.eval()
    all_true, all_pred = [], []
    with torch.no_grad():
        for batch in DataLoader(test_dataset, batch_size=batch_size):
            outputs = model(
                input_ids=batch['input_ids'].to(device),
                attention_mask=batch['attention_mask'].to(device),
            )
            t, p = decode_predictions(outputs.logits, batch['labels'].to(device), id2label)
            all_true.extend(t)
            all_pred.extend(p)
    print(seqeval_report(all_true, all_pred))
    return all_true, all_pred


all_true, all_pred = evaluate_ner(model, test_dataset, id2label)

Representative output (illustrative; exact scores and support counts depend on the run):

              precision    recall  f1-score   support

       ACTOR       0.96      0.95      0.96       312
         CVE       0.99      0.99      0.99       287
        HASH       0.99      0.98      0.99       298
          IP       0.98      0.99      0.99       321
     MALWARE       0.97      0.96      0.97       334
   TECHNIQUE       0.94      0.93      0.93       276

   micro avg       0.97      0.97      0.97      1828
   macro avg       0.97      0.97      0.97      1828
weighted avg       0.97      0.97      0.97      1828

Structured entities (IP, HASH, CVE) score highest because they have distinctive character patterns. ACTOR and TECHNIQUE are harder because they overlap with common English words (“group”, “lateral movement”).

Example predictions on test sentences

def predict_sentence(words, model, tokenizer, id2label):
    """Predict IOB2 labels for a list of words."""
    device = next(model.parameters()).device
    encoding = tokenizer(
        words, is_split_into_words=True, max_length=128,
        padding='max_length', truncation=True, return_tensors='pt',
    )
    model.eval()
    with torch.no_grad():
        outputs = model(
            input_ids=encoding['input_ids'].to(device),
            attention_mask=encoding['attention_mask'].to(device),
        )
        preds = torch.argmax(outputs.logits, dim=-1)[0]

    word_ids = encoding.word_ids()
    result = []
    prev = None
    for pred, wid in zip(preds, word_ids):
        if wid is not None and wid != prev:
            result.append((words[wid], id2label[pred.item()]))
        prev = wid
    return result

for i in [0, 5]:
    words = test_tokens[i]
    preds = predict_sentence(words, model, tokenizer, id2label)
    print('Sentence:', ' '.join(words))
    for word, label in preds:
        if label != 'O':
            print(f'  {word} -> {label}')
    print()

Representative output:

Sentence: Lazarus Group conducted credential dumping against government networks using Emotet
  Lazarus -> B-ACTOR
  Group -> I-ACTOR
  credential -> B-TECHNIQUE
  dumping -> I-TECHNIQUE
  Emotet -> B-MALWARE

Sentence: The C2 server at 185.42.187.93 hosted PlugX payloads targeting CVE-2022-26134
  185.42.187.93 -> B-IP
  PlugX -> B-MALWARE
  CVE-2022-26134 -> B-CVE

Building an extraction pipeline

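Wrap the model in a function that takes raw text, runs inference, and returns structured IOC dictionaries. The tokenizer call truncates at 512 subword tokens, so anything past that limit in a long report is silently dropped; pass paragraph-sized input.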

def extract_iocs(text, model, tokenizer, id2label):
    """Extract IOCs from a threat report paragraph."""
    device = next(model.parameters()).device
    words = text.split()

    encoding = tokenizer(
        words, is_split_into_words=True,
        max_length=512, padding=True, truncation=True, return_tensors='pt',
    )

    model.eval()
    with torch.no_grad():
        outputs = model(
            input_ids=encoding['input_ids'].to(device),
            attention_mask=encoding['attention_mask'].to(device),
        )
        preds = torch.argmax(outputs.logits, dim=-1)[0]

    word_ids = encoding.word_ids()
    word_preds = []
    previous_word_id = None
    for token_pred, word_id in zip(preds, word_ids):
        if word_id is None:
            continue
        if word_id != previous_word_id:
            word_preds.append((words[word_id], id2label[token_pred.item()]))
        previous_word_id = word_id

    # Group consecutive B-/I- tags into entities
    type_to_key = {
        'IP': 'ips', 'HASH': 'hashes', 'CVE': 'cves',
        'MALWARE': 'malware', 'ACTOR': 'actors', 'TECHNIQUE': 'techniques',
    }
    entities = {k: [] for k in type_to_key.values()}
    current_entity = []
    current_type = None

    def flush():
        if current_entity and current_type:
            text = ' '.join(current_entity).rstrip('.,;:!?')
            key = type_to_key[current_type]
            if text not in entities[key]:
                entities[key].append(text)

    for word, label in word_preds:
        if label.startswith('B-'):
            flush()
            current_type = label[2:]
            current_entity = [word]
        elif label.startswith('I-') and current_type == label[2:]:
            current_entity.append(word)
        else:
            flush()
            current_entity = []
            current_type = None

    flush()
    return entities


# Test on a sample threat report paragraph
sample_report = (
    "On March 15, Volt Typhoon exploited CVE-2023-27997 in Fortinet appliances to gain initial "
    "access. The attackers used credential dumping with Mimikatz, then deployed Cobalt "
    "Strike beacons communicating with 185.220.101.42 and 91.215.85.17. The primary "
    "payload had hash e99a18c428cb38d5f260853678922e03abd8340aacd5834e9b1d42f1b2c68b72."
)

results = extract_iocs(sample_report, model, tokenizer, id2label)
print('Extracted IOCs:')
for key, vals in results.items():
    if vals:
        label = key + ':'
        print(f'  {label:11s} {vals}')

Representative output:

Extracted IOCs:
  ips:        ['185.220.101.42', '91.215.85.17']
  hashes:     ['e99a18c428cb38d5f260853678922e03abd8340aacd5834e9b1d42f1b2c68b72']
  cves:       ['CVE-2023-27997']
  malware:    ['Mimikatz', 'Cobalt Strike']
  actors:     ['Volt Typhoon']
  techniques: ['credential dumping']
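Because the extraction function truncates its input at the model's 512-token limit, full-length reports need to be split before extraction. The sketch below shows one simple approach, overlapping word windows with merged results. `chunk_words` and `extract_long` are hypothetical helpers, the window sizes are untuned assumptions (a 64-character hash alone expands into many subword tokens), and a regex stand-in replaces the model so the example runs on its own.

```python
import re


def chunk_words(words, max_words=200, overlap=20):
    """Split a word list into overlapping windows of at most max_words."""
    if overlap >= max_words:
        raise ValueError('overlap must be smaller than max_words')
    step = max_words - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(words[start:start + max_words])
        if start + max_words >= len(words):
            break
    return chunks


def extract_long(text, extract_fn, max_words=200, overlap=20):
    """Run extract_fn on each window and merge results, keeping first-seen order."""
    merged = {}
    for chunk in chunk_words(text.split(), max_words, overlap):
        for key, values in extract_fn(' '.join(chunk)).items():
            bucket = merged.setdefault(key, [])
            for value in values:
                if value not in bucket:
                    bucket.append(value)
    return merged


def fake_extract(text):
    """Stand-in for the NER extractor: finds IPv4-looking strings only."""
    return {'ips': re.findall(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', text)}


long_report = ' '.join(
    ['filler'] * 300 + ['203.0.113.50'] + ['filler'] * 300 + ['198.51.100.7']
)
result = extract_long(long_report, fake_extract)
```

With the trained model, `extract_fn` would be something like `lambda t: extract_iocs(t, model, tokenizer, id2label)`. The overlap only protects entities shorter than the overlap width; splitting on sentence boundaries is a cleaner alternative when a sentence splitter is available.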

Combining with a RAG pipeline

A local RAG pipeline retrieves relevant security advisories given a natural language query. The NER model extracts structured IOCs from those advisories. Together they form an automated threat intelligence workflow: query in, structured IOCs out.

# Conceptual sketch (requires the RAG infrastructure from the local-rag tutorial)

def threat_intel_pipeline(query, rag_retriever, ner_model, tokenizer, id2label):
    """Retrieve relevant advisories and extract IOCs from them."""
    chunks = rag_retriever.query(query, n_results=5)
    merged = {k: [] for k in ['ips', 'hashes', 'cves', 'malware', 'actors', 'techniques']}

    for chunk in chunks:
        iocs = extract_iocs(chunk, ner_model, tokenizer, id2label)
        for key in merged:
            for val in iocs[key]:
                if val not in merged[key]:
                    merged[key].append(val)

    return merged

# results = threat_intel_pipeline(
#     "What indicators are associated with APT29 campaigns in 2023?",
#     rag_retriever, model, tokenizer, id2label
# )

The RAG component handles relevance (which advisories matter for this query), and the NER component handles extraction (what entities appear in those advisories).

Limitations

Domain drift. Threat actor names, malware families, and attack techniques change constantly. New APT groups are named, malware variants are discovered, and novel techniques are documented. A model trained on today’s entity lists will miss tomorrow’s new actors and malware. Production systems need periodic retraining on fresh annotated data.

Nested entities. “The Lazarus Group Mimikatz variant” can be read as a single MALWARE mention that contains an ACTOR mention (“Lazarus Group”). Standard IOB2 assigns exactly one label per token, so it cannot represent nested or overlapping spans. Span-based models and nested NER architectures address this but add significant complexity.

Context dependence. The NER model assigns a type (IP, HASH, ACTOR) but not a role. It cannot distinguish an attacker C2 address from a victim IP. Determining who did what to whom requires relation extraction, a separate task that operates on top of NER output.

Synthetic data ceiling. Models trained on template-generated sentences learn the template patterns, not the full diversity of real threat report language. Real reports contain hedging (“possibly linked to”), attribution uncertainty, nested clauses, and domain jargon that templates do not capture.

Warning

Do not deploy a model trained only on synthetic data into a production threat intelligence pipeline without validation on real reports. The high F1 scores above reflect template regularity, not real-world performance. Annotate 500 to 1000 real report sentences and fine-tune on those before production use.
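Once real sentences are annotated, they need to be loaded into the same `tokens_list` / `labels_list` shape the synthetic generator produces. The sketch below assumes a two-column CoNLL-style layout (one `token label` pair per line, blank line between sentences), which is a common export format for annotation tools but is an assumption here; `read_conll` is a hypothetical helper.

```python
def read_conll(lines):
    """Parse 'token label' lines into parallel lists of tokens and labels.

    Sentences are separated by blank lines. The label is taken as the last
    whitespace-separated field on each line.
    """
    tokens_list, labels_list = [], []
    tokens, labels = [], []
    for raw in lines:
        line = raw.strip()
        if not line:
            if tokens:                      # sentence boundary
                tokens_list.append(tokens)
                labels_list.append(labels)
                tokens, labels = [], []
            continue
        token, label = line.rsplit(maxsplit=1)
        tokens.append(token)
        labels.append(label)
    if tokens:                              # final sentence without trailing blank
        tokens_list.append(tokens)
        labels_list.append(labels)
    return tokens_list, labels_list


sample = [
    'Possibly O',
    'linked O',
    'to O',
    'Lazarus B-ACTOR',
    'Group I-ACTOR',
    '',
    'Emotet B-MALWARE',
    'resurfaced O',
]
real_tokens, real_labels = read_conll(sample)
```

The resulting lists can be passed to `NERDataset` in place of the synthetic ones, either alone or mixed with synthetic sentences, and a held-out portion of the real data gives a more honest test set than the template-generated one.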

Next steps

This tutorial covered the full NER pipeline for security: synthetic data generation, subword alignment, training, entity-level evaluation, and structured extraction. The model assigns a label to every token, turning unstructured threat reports into structured IOC dictionaries.

The next tutorial introduces a completely different data modality: representing disassembled binary functions as control flow graphs. Graph neural networks operate on these graphs to learn function similarity, enabling cross-binary vulnerability search and malware variant detection.