Tutorial

Anomaly Detection on Linux Audit Logs with Isolation Forest

Build a lightweight anomaly detection pipeline that flags suspicious Linux sessions using auditd logs, feature engineering, and scikit-learn — no GPU required.

7 min read · intermediate

Prerequisites

  • Basic Python knowledge
  • A Linux machine with auditd installed
  • Familiarity with the command line

Part 2 of 3 in ML for Security

Most intrusions don’t announce themselves. An attacker who lands a shell and escalates privileges generates syscall patterns that look like normal admin activity — unless you’re measuring the right things.

Signature-based detection catches known attacks. But insider threats, novel exploits, and living-off-the-land techniques slip through because no rule exists yet. Statistical anomaly detection flips the approach: instead of defining what’s bad, you model what’s normal, and flag anything that deviates.

This tutorial builds a complete pipeline. You’ll collect auditd logs, engineer features from raw syscall data, train an Isolation Forest model, and score live sessions — all with Python and scikit-learn, no GPU required. By the end, you’ll have a detector that flags sessions like the one below as anomalous while ignoring routine cron jobs and SSH logins.

SESSION 4829  score: -0.38  *** ANOMALY ***
  execve: 47   open: 312   connect: 23   ptrace: 4
  unique_binaries: 19   duration: 12s   uid_changes: 3

How Isolation Forest works

Isolation Forest detects anomalies by exploiting a simple insight: outliers are easier to isolate than normal points.

The algorithm builds an ensemble of random decision trees (isolation trees). Each tree recursively partitions the data by selecting a random feature and a random split value. Normal points, surrounded by similar points, require many splits before they’re isolated into a leaf node. Anomalies, sitting far from the crowd, get isolated in just a few splits.

Normal point (deep path):        Anomaly (shallow path):

       split                          split
      /     \                        /     \
   split     ...                  [X] ←     ...
  /     \                        isolated
split    ...                     in 1 split
 ...
 [X] ← isolated after 8 splits

The anomaly score is derived from the average path length across all trees. Short average path = anomaly. Long average path = normal. The contamination parameter tells the model what fraction of training data to treat as anomalous — this sets the decision boundary.
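As a quick sanity check of the scoring convention (a standalone sketch, not part of the pipeline), fit an Isolation Forest on a tight cluster plus one far-away point and compare scores:

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(0)
# 200 "normal" points clustered near the origin, plus one far-away outlier
normal = rng.normal(loc=0.0, scale=1.0, size=(200, 2))
outlier = np.array([[8.0, 8.0]])
X = np.vstack([normal, outlier])

model = IsolationForest(n_estimators=100, contamination=0.01, random_state=42)
model.fit(X)

scores = model.decision_function(X)  # positive = normal, negative = anomalous
print('outlier score:', scores[-1])                     # lowest score in the set
print('outlier prediction:', model.predict(outlier)[0])  # -1 means anomaly
```

The isolated point ends up with the lowest `decision_function` value and is predicted `-1` — exactly the convention the session scorer relies on later.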

Tip

Why Isolation Forest over other methods? One-Class SVM and autoencoders can also detect anomalies, but Isolation Forest handles high-dimensional, mixed-type data well, trains fast on small datasets, and requires almost no hyperparameter tuning. For log data, it’s the right starting point.

Setting up the environment

Create a virtual environment and install the dependencies. The full pipeline needs only three libraries.

python -m venv venv && source venv/bin/activate
pip install scikit-learn pandas numpy

Set up a project directory.

mkdir -p anomaly-detector/{data,models}

Configuring auditd

If auditd isn’t already capturing syscalls, add rules that cover the events most useful for anomaly detection. Create a rule file.

sudo tee /etc/audit/rules.d/anomaly.rules << 'EOF'
# Track process execution (64-bit syscall table)
-a always,exit -F arch=b64 -S execve -k exec_log

# Track file opens
-a always,exit -F arch=b64 -S openat -k file_log

# Track network connections
-a always,exit -F arch=b64 -S connect -k net_log

# Track privilege changes
-a always,exit -F arch=b64 -S setuid,setgid,setreuid,setregid -k priv_log

# Track ptrace (debugger attach)
-a always,exit -F arch=b64 -S ptrace -k trace_log
EOF

Load the rules and verify.

sudo augenrules --load
sudo auditctl -l

Warning

Audit volume: These rules generate significant log volume on busy systems. On production machines, consider filtering by UID or limiting to specific directories. For this tutorial, a lab VM or development machine is ideal.

Let the system collect data for at least a few hours of normal activity — SSH sessions, cron jobs, package updates, whatever constitutes “normal” on your machine. A day or two of data produces better results.

Collecting and parsing audit logs

Auditd writes structured records to /var/log/audit/audit.log. Each syscall event spans one or more lines sharing a timestamp and serial number. The raw format looks like this:

type=SYSCALL msg=audit(1708300000.123:4567): arch=c000003e syscall=59 success=yes exit=0 a0=... ppid=1234 pid=1235 auid=1000 uid=0 gid=0 comm="curl" exe="/usr/bin/curl"
type=EXECVE msg=audit(1708300000.123:4567): argc=3 a0="curl" a1="-s" a2="http://example.com"

Create anomaly-detector/parse_audit.py to extract structured records.

import re
import sys
import subprocess
from collections import defaultdict

FALLBACK_SYSCALL_MAP = {
    '59': 'execve', '257': 'openat', '42': 'connect',
    '105': 'setuid', '106': 'setgid', '101': 'ptrace',
    '0': 'read', '1': 'write', '3': 'close',
    '62': 'kill', '56': 'clone', '2': 'open',
}

def load_syscall_map():
    """Build syscall number->name mapping for the current architecture."""
    try:
        result = subprocess.run(
            ['ausyscall', '--dump'],
            capture_output=True,
            text=True,
            check=True,
        )
        mapping = {}
        for line in result.stdout.splitlines():
            parts = line.split()
            if len(parts) >= 2 and parts[0].isdigit():
                mapping[parts[0]] = parts[1]
        if mapping:
            return mapping
    except Exception:
        pass
    return FALLBACK_SYSCALL_MAP

SYSCALL_MAP = load_syscall_map()

def parse_field(line, field):
    # Anchor on a word boundary so 'uid=' cannot match inside 'auid=' and
    # 'pid=' cannot match inside 'ppid='.
    match = re.search(rf'\b{field}=(".*?"|\S+)', line)
    if not match:
        return None
    return match.group(1).strip('"')

def parse_audit_log(path):
    """Parse audit.log into a list of syscall event dicts."""
    events = []
    with open(path) as f:
        for line in f:
            if 'type=SYSCALL' not in line:
                continue

            ts_match = re.search(r'msg=audit\((\d+\.\d+):(\d+)\)', line)
            if not ts_match:
                continue

            timestamp = float(ts_match.group(1))
            serial = ts_match.group(2)
            syscall_nr = parse_field(line, 'syscall') or ''
            syscall_name = SYSCALL_MAP.get(syscall_nr, f'sys_{syscall_nr}')

            events.append({
                'timestamp': timestamp,
                'serial': serial,
                'syscall': syscall_name,
                'uid': parse_field(line, 'uid') or '-1',
                'auid': parse_field(line, 'auid') or '-1',
                'pid': parse_field(line, 'pid') or '0',
                'ppid': parse_field(line, 'ppid') or '0',
                'exe': parse_field(line, 'exe') or '',
                'success': parse_field(line, 'success') or '',
                'comm': parse_field(line, 'comm') or '',
            })

    return events

def group_into_sessions(events, gap_seconds=300):
    """Group events into sessions by auid with a time gap threshold."""
    sessions = defaultdict(list)
    for event in events:
        key = event['auid']
        sessions[key].append(event)

    # Split on time gaps
    split_sessions = []
    for auid, evts in sessions.items():
        evts.sort(key=lambda e: e['timestamp'])
        current = [evts[0]]
        for e in evts[1:]:
            if e['timestamp'] - current[-1]['timestamp'] > gap_seconds:
                split_sessions.append(current)
                current = [e]
            else:
                current.append(e)
        split_sessions.append(current)

    return split_sessions

if __name__ == '__main__':
    log_path = sys.argv[1] if len(sys.argv) > 1 else '/var/log/audit/audit.log'
    events = parse_audit_log(log_path)
    sessions = group_into_sessions(events)
    print(f'Parsed {len(events)} events into {len(sessions)} sessions')

Run it against your audit log.

# Avoid running Python as root: copy the audit log once and read it as your user
sudo cp /var/log/audit/audit.log anomaly-detector/data/audit.log
sudo chown "$USER":"$USER" anomaly-detector/data/audit.log
python anomaly-detector/parse_audit.py anomaly-detector/data/audit.log
Parsed 48213 events into 312 sessions

The session grouping uses the audit UID (auid) and a 5-minute gap threshold. Events from the same user within 5 minutes belong to the same session. Tune the gap if your environment has longer idle periods.
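The gap rule is easy to sanity-check in isolation. This standalone sketch applies the same splitting logic as group_into_sessions to a few synthetic timestamps:

```python
def split_on_gaps(timestamps, gap_seconds=300):
    """Split a sorted list of timestamps into runs separated by > gap_seconds."""
    runs = [[timestamps[0]]]
    for t in timestamps[1:]:
        if t - runs[-1][-1] > gap_seconds:
            runs.append([t])      # gap exceeded: start a new session
        else:
            runs[-1].append(t)    # still within the same session
    return runs

# Three events within two minutes, then one 10 minutes after the last
ts = [0.0, 10.0, 120.0, 720.0]
print(split_on_gaps(ts))  # → [[0.0, 10.0, 120.0], [720.0]]
```

The 600-second gap between the third and fourth event exceeds the 300-second threshold, so the last event starts a new session.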

Feature engineering

Raw syscall events aren’t useful to a model. You need to transform each session into a fixed-length feature vector that captures behavior — what syscalls were used, how many unique binaries ran, whether privileges changed, and how the activity was distributed over time.

Create anomaly-detector/features.py.

import numpy as np
from collections import Counter

TRACKED_SYSCALLS = [
    'execve', 'openat', 'open', 'connect', 'read', 'write',
    'close', 'setuid', 'setgid', 'ptrace', 'clone', 'kill',
]

def extract_features(session):
    """Convert a session (list of event dicts) into a feature vector."""
    syscall_counts = Counter(e['syscall'] for e in session)
    exes = set(e['exe'] for e in session if e['exe'])
    uids = set(e['uid'] for e in session)

    timestamps = [e['timestamp'] for e in session]
    duration = max(timestamps) - min(timestamps) if len(timestamps) > 1 else 0

    total_events = len(session)
    failed = sum(1 for e in session if e['success'] == 'no')

    features = {}

    # Syscall frequency (normalized by total events)
    for sc in TRACKED_SYSCALLS:
        features[f'syscall_{sc}'] = syscall_counts.get(sc, 0) / max(total_events, 1)

    # Raw counts for key offensive syscalls
    features['raw_execve'] = syscall_counts.get('execve', 0)
    features['raw_connect'] = syscall_counts.get('connect', 0)
    features['raw_ptrace'] = syscall_counts.get('ptrace', 0)

    # Session metadata
    features['unique_binaries'] = len(exes)
    features['unique_uids'] = len(uids)
    features['uid_changes'] = max(len(uids) - 1, 0)
    features['total_events'] = total_events
    features['duration'] = duration
    features['failed_ratio'] = failed / max(total_events, 1)

    # Burstiness: stddev of inter-event times
    if len(timestamps) > 2:
        deltas = np.diff(sorted(timestamps))
        features['timing_stddev'] = float(np.std(deltas))
        features['timing_mean'] = float(np.mean(deltas))
    else:
        features['timing_stddev'] = 0.0
        features['timing_mean'] = 0.0

    # Syscall diversity: fraction of tracked syscall types seen in this session
    # (counting only tracked syscalls keeps the ratio in [0, 1])
    tracked_seen = sum(1 for sc in TRACKED_SYSCALLS if syscall_counts.get(sc))
    features['syscall_diversity'] = tracked_seen / len(TRACKED_SYSCALLS)

    return features

def sessions_to_matrix(sessions):
    """Convert a list of sessions into a feature matrix (numpy array) and feature names."""
    if not sessions:
        return np.array([]), []

    feature_dicts = [extract_features(s) for s in sessions]
    feature_names = sorted(feature_dicts[0].keys())
    matrix = np.array([[fd[name] for name in feature_names] for fd in feature_dicts])

    return matrix, feature_names

Choosing features that matter

The features fall into four categories.

Category            Features                                    What they capture
Syscall frequency   syscall_execve, syscall_connect, etc.       Behavioral profile: what the session does
Raw counts          raw_execve, raw_ptrace                      Absolute volume of high-risk calls
Session metadata    unique_binaries, uid_changes, duration      Complexity and privilege behavior
Temporal            timing_stddev, timing_mean, failed_ratio    Rhythm: scripted attacks are bursty

The ptrace count and uid_changes features are particularly useful. Legitimate sessions rarely attach a debugger or switch UIDs multiple times. Exploitation sessions — especially privilege escalation — do both.

Note

Feature engineering is where domain knowledge matters most. If your environment has specific patterns (e.g., a monitoring agent that calls ptrace legitimately), add features or filter events to account for them. The model is only as good as the features you give it.
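A sketch of that kind of pre-filtering, applied to a session before extract_features — the allowlisted binary paths here are hypothetical placeholders for whatever is legitimately noisy in your environment:

```python
# Hypothetical allowlist: binaries whose ptrace activity is expected and benign
BENIGN_PTRACE_EXES = {'/usr/bin/strace', '/opt/monitoring/agent'}

def filter_known_benign(session):
    """Drop ptrace events emitted by allowlisted binaries so they don't
    inflate the raw_ptrace feature for otherwise normal sessions."""
    return [
        e for e in session
        if not (e['syscall'] == 'ptrace' and e['exe'] in BENIGN_PTRACE_EXES)
    ]

session = [
    {'syscall': 'ptrace', 'exe': '/opt/monitoring/agent'},  # expected noise
    {'syscall': 'execve', 'exe': '/usr/bin/ls'},            # kept
]
print(len(filter_known_benign(session)))  # → 1
```

Run the filter between group_into_sessions and extract_features; the same pattern extends to any syscall/binary pair that is routine on your hosts.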

Training the model

Create anomaly-detector/train.py. The training script loads parsed logs, engineers features, trains the Isolation Forest, and saves the model.

import pickle
import sys
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

from parse_audit import parse_audit_log, group_into_sessions
from features import sessions_to_matrix

def train(log_path, model_path='models/detector.pkl', contamination=0.05):
    print(f'Parsing {log_path}...')
    events = parse_audit_log(log_path)
    sessions = group_into_sessions(events)
    print(f'  {len(events)} events, {len(sessions)} sessions')

    if len(sessions) < 20:
        print('Not enough sessions to train. Collect more data.')
        sys.exit(1)

    X, feature_names = sessions_to_matrix(sessions)
    print(f'  Feature matrix: {X.shape}')

    # Scale features so no single dimension dominates
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    model = IsolationForest(
        n_estimators=200,
        contamination=contamination,
        max_samples='auto',
        random_state=42,
        n_jobs=-1,
    )
    model.fit(X_scaled)

    # Score the training data to show the baseline
    scores = model.decision_function(X_scaled)
    predictions = model.predict(X_scaled)
    n_anomalies = (predictions == -1).sum()

    print(f'  Anomalies in training data: {n_anomalies}/{len(sessions)}')
    print(f'  Score range: [{scores.min():.3f}, {scores.max():.3f}]')

    artifact = {
        'model': model,
        'scaler': scaler,
        'feature_names': feature_names,
        'contamination': contamination,
        'training_sessions': len(sessions),
    }
    with open(model_path, 'wb') as f:
        pickle.dump(artifact, f)

    print(f'  Model saved to {model_path}')
    return artifact

if __name__ == '__main__':
    log_path = sys.argv[1] if len(sys.argv) > 1 else '/var/log/audit/audit.log'
    train(log_path)

Run the training.

cd anomaly-detector
python train.py data/audit.log
Parsing data/audit.log...
  48213 events, 312 sessions
  Feature matrix: (312, 24)
  Anomalies in training data: 16/312
  Score range: [-0.421, 0.318]
  Model saved to models/detector.pkl

Understanding the contamination parameter

The contamination value (0.05 = 5%) tells the model how much of the training data it should expect to be anomalous. This directly affects the decision threshold.

  • Too low (0.01): misses real anomalies — high precision, low recall
  • Too high (0.15): floods you with false positives
  • 0.03–0.07 is a reasonable starting range for audit logs

If you have a known-clean training set (no attacks during collection), set contamination to a small value like 0.01. If your training data might include some malicious activity, 0.05 gives the model room to exclude those sessions from “normal.” To see how threshold choice drives the precision-recall tradeoff interactively, try the Classifier Threshold Lab.
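The effect is easy to see directly: on the same data, the contamination value alone determines how many sessions land past the decision boundary. A standalone sketch on synthetic feature vectors:

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(1)
X = rng.normal(size=(300, 5))  # stand-in for a scaled session feature matrix

counts = {}
for contamination in (0.01, 0.05, 0.15):
    model = IsolationForest(contamination=contamination, random_state=42)
    # fit_predict returns -1 for points past the decision boundary
    counts[contamination] = int((model.fit_predict(X) == -1).sum())
    print(f'contamination={contamination}: {counts[contamination]}/300 flagged')
```

The flagged count tracks the contamination fraction (roughly 3, 15, and 45 of 300 sessions), even though the underlying anomaly scores are identical — only the threshold moves.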

Scoring live sessions

Create anomaly-detector/score.py to score new log data against the trained model.

import pickle
import sys
from parse_audit import parse_audit_log, group_into_sessions
from features import extract_features

def load_model(model_path='models/detector.pkl'):
    with open(model_path, 'rb') as f:
        return pickle.load(f)

def score_sessions(log_path, model_path='models/detector.pkl'):
    artifact = load_model(model_path)
    model = artifact['model']
    scaler = artifact['scaler']
    feature_names = artifact['feature_names']

    events = parse_audit_log(log_path)
    sessions = group_into_sessions(events)

    results = []
    for i, session in enumerate(sessions):
        feat = extract_features(session)
        vector = [[feat[name] for name in feature_names]]
        vector_scaled = scaler.transform(vector)

        score = model.decision_function(vector_scaled)[0]
        prediction = model.predict(vector_scaled)[0]

        results.append({
            'session_idx': i,
            'n_events': len(session),
            'score': score,
            'anomaly': prediction == -1,
            'auid': session[0]['auid'],
            'start': session[0]['timestamp'],
            'features': feat,
        })

    return results

def print_results(results):
    results.sort(key=lambda r: r['score'])
    for r in results:
        flag = '*** ANOMALY ***' if r['anomaly'] else ''
        print(f"SESSION {r['session_idx']:>4}  score: {r['score']:>7.3f}  "
              f"events: {r['n_events']:>5}  auid: {r['auid']}  {flag}")
        if r['anomaly']:
            f = r['features']
            print(f"  execve: {f['raw_execve']:<4}  connect: {f['raw_connect']:<4}  "
                  f"ptrace: {f['raw_ptrace']:<4}  uid_changes: {f['uid_changes']}")
            print(f"  unique_binaries: {f['unique_binaries']:<4}  "
                  f"duration: {f['duration']:.0f}s  "
                  f"failed_ratio: {f['failed_ratio']:.2f}")
            print()

if __name__ == '__main__':
    log_path = sys.argv[1] if len(sys.argv) > 1 else '/var/log/audit/audit.log'
    results = score_sessions(log_path)
    print_results(results)
Run the scorer against the same log.

python score.py data/audit.log
SESSION   42  score:  -0.381  events:   487  auid: 1000  *** ANOMALY ***
  execve: 47   connect: 23   ptrace: 4   uid_changes: 3
  unique_binaries: 19   duration: 12s   failed_ratio: 0.18

SESSION  187  score:  -0.294  events:   203  auid: 1000  *** ANOMALY ***
  execve: 31   connect: 45   ptrace: 0   uid_changes: 2
  unique_binaries: 14   duration: 8s   failed_ratio: 0.24

SESSION    1  score:   0.127  events:    34  auid: 1000
SESSION    2  score:   0.198  events:    12  auid: 0
SESSION    3  score:   0.231  events:    87  auid: 1000
...

Anomalous sessions float to the top with negative scores. The feature breakdown tells you why a session was flagged — high execve count, ptrace usage, UID changes — which gives you something actionable to investigate.
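One simple way to rank those "why" signals (a hypothetical helper, not part of the pipeline above) is to look at the scaled feature vector itself: after StandardScaler, each value is a z-score against the training baseline, so the largest absolute values are the features that deviate most:

```python
import numpy as np

def top_deviations(vector_scaled, feature_names, k=3):
    """Rank features by |z-score| after StandardScaler — a rough 'why flagged' view."""
    z = np.abs(np.asarray(vector_scaled).ravel())
    order = np.argsort(z)[::-1][:k]   # indices of the k largest deviations
    return [(feature_names[i], float(z[i])) for i in order]

# One already-scaled session vector (values are illustrative)
names = ['raw_execve', 'raw_ptrace', 'uid_changes', 'duration']
scaled = [[0.2, 4.1, 2.7, -0.3]]
print(top_deviations(scaled, names))
```

Here raw_ptrace (4.1 standard deviations above baseline) and uid_changes (2.7) dominate — the same story the printed feature breakdown tells, but computed rather than eyeballed.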

Generating test attacks

To validate the detector, simulate the kind of activity an attacker generates. Run these in a test environment.

Warning

Lab only: Run these commands only on machines you own and control. These simulate attacker behavior and will trigger security tools.

# Simulate recon: rapid enumeration
for cmd in id whoami hostname uname date; do $cmd 2>/dev/null; done
ls /etc/shadow /etc/passwd /home/*/.ssh 2>/dev/null

# Simulate lateral movement: many outbound connections
for port in 22 80 443 8080 8443; do
  timeout 1 bash -c "echo >/dev/tcp/127.0.0.1/$port" 2>/dev/null
done

# Simulate privilege escalation pattern
sudo -l 2>/dev/null
find / -perm -4000 -type f 2>/dev/null | head -5

After running these, wait 15–30 seconds for auditd to write the events, then re-score.

python score.py data/audit.log

The simulated attack session should appear with a strongly negative score, high execve count, and elevated unique_binaries.

Integrating with alerting

The detector is useful only if it feeds into something you monitor. Here are two practical integration points.

Cron-based scoring

Run the scorer on a schedule and pipe anomalies to syslog.

cat > anomaly-detector/cron_score.sh << 'SCRIPT'
#!/bin/bash
cd /path/to/anomaly-detector
source venv/bin/activate

python -c "
import syslog
from score import score_sessions

results = score_sessions('/var/log/audit/audit.log')
for a in (r for r in results if r['anomaly']):
    msg = (f\"ANOMALY_DETECTED session={a['session_idx']} \"
           f\"score={a['score']:.3f} auid={a['auid']} \"
           f\"events={a['n_events']}\")
    syslog.syslog(syslog.LOG_WARNING, msg)
"
SCRIPT
chmod +x anomaly-detector/cron_score.sh
# Run every 15 minutes (preserve existing root crontab entries)
(sudo crontab -l 2>/dev/null; echo "*/15 * * * * /path/to/anomaly-detector/cron_score.sh") | sudo crontab -

Wazuh integration

If you run Wazuh, write a custom decoder and rule to pick up the syslog messages.

<!-- /var/ossec/etc/decoders/local_decoder.xml -->
<decoder name="anomaly_detector">
  <prematch>ANOMALY_DETECTED</prematch>
  <regex>session=(\d+) score=(-?\S+) auid=(\S+) events=(\d+)</regex>
  <order>session_id, anomaly_score, audit_uid, event_count</order>
</decoder>
<!-- /var/ossec/etc/rules/local_rules.xml -->
<group name="anomaly_detection">
  <rule id="100200" level="10">
    <decoded_as>anomaly_detector</decoded_as>
    <description>ML anomaly detector flagged suspicious session (auid: $(audit_uid), score: $(anomaly_score))</description>
    <mitre>
      <id>T1059</id>
    </mitre>
  </rule>
</group>

Tip

Retraining cadence: Retrain the model weekly or whenever your baseline changes significantly (new services deployed, team changes). Stale models drift: what was anomalous last month might be normal now.

Limitations and next steps

Isolation Forest on audit logs is a solid starting point, but it has real limitations.

What it catches well:

  • Noisy enumeration (nmap, linpeas, manual recon)
  • Rapid-fire exploitation attempts
  • Privilege escalation with unusual syscall patterns
  • Scripted attacks (bursty timing signature)

What it misses:

  • Low-and-slow attacks that mimic normal session rhythms
  • Attacks that use only common binaries (pure living-off-the-land)
  • Insider threats where the attacker is the baseline

Next steps to consider:

  • Add per-user baselines instead of a global model — what’s normal for root is anomalous for www-data
  • Incorporate sequence information (n-gram features over syscall sequences)
  • Feed the anomaly scores into a RAG pipeline to generate investigation summaries — combine this tutorial with the RAG pipeline tutorial
  • Export features to a time-series database (Prometheus) and build Grafana dashboards for trend visualization
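The sequence idea in particular is a small step from the existing feature code. A sketch of counting syscall bigrams per session (in practice you would cap or hash the vocabulary before feeding counts to a model):

```python
from collections import Counter

def syscall_ngrams(session, n=2):
    """Count n-grams over the session's time-ordered syscall sequence.

    Captures transitions (e.g. execve -> connect) that per-syscall
    frequency features throw away.
    """
    seq = [e['syscall'] for e in session]
    return Counter(tuple(seq[i:i + n]) for i in range(len(seq) - n + 1))

session = [{'syscall': s} for s in ['execve', 'openat', 'openat', 'connect']]
print(syscall_ngrams(session))
```

The resulting counts — ('execve', 'openat'), ('openat', 'openat'), ('openat', 'connect'), one each — can be turned into fixed-length features the same way sessions_to_matrix handles the per-syscall frequencies.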