Most intrusions don’t announce themselves. An attacker who lands a shell and escalates privileges generates syscall patterns that look like normal admin activity — unless you’re measuring the right things.
Signature-based detection catches known attacks. But insider threats, novel exploits, and living-off-the-land techniques slip through because no rule exists yet. Statistical anomaly detection flips the approach: instead of defining what’s bad, you model what’s normal, and flag anything that deviates.
This tutorial builds a complete pipeline. You’ll collect auditd logs, engineer features from raw syscall data, train an Isolation Forest model, and score live sessions — all with Python and scikit-learn, no GPU required. By the end, you’ll have a detector that flags sessions like the one below as anomalous while ignoring routine cron jobs and SSH logins.
SESSION 4829 score: -0.38 *** ANOMALY ***
execve: 47 open: 312 connect: 23 ptrace: 4
unique_binaries: 19  duration: 12s  uid_changes: 3

How Isolation Forest works
Isolation Forest detects anomalies by exploiting a simple insight: outliers are easier to isolate than normal points.
The algorithm builds an ensemble of random decision trees (isolation trees). Each tree recursively partitions the data by selecting a random feature and a random split value. Normal points, surrounded by similar points, require many splits before they’re isolated into a leaf node. Anomalies, sitting far from the crowd, get isolated in just a few splits.
Normal point (deep path):          Anomaly (shallow path):

        split                              split
       /     \                            /     \
    split    ...                       [X] ←    ...
    /   \                              isolated
 split   ...                           in 1 split
   ...
   [X] ← isolated after 8 splits

The anomaly score is derived from the average path length across all trees. Short average path = anomaly. Long average path = normal. The contamination parameter tells the model what fraction of training data to treat as anomalous — this sets the decision boundary.
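The scoring behavior is easy to verify on toy data. The sketch below (synthetic 2D points, assumed cluster parameters) plants one obvious outlier in a dense cluster and shows that it receives the lowest — and a negative — decision score:

```python
import numpy as np
from sklearn.ensemble import IsolationForest

# Dense cluster of "normal" points plus one far-off outlier (synthetic data)
rng = np.random.RandomState(0)
normal = rng.normal(loc=0.0, scale=0.5, size=(200, 2))
outlier = np.array([[6.0, 6.0]])
X = np.vstack([normal, outlier])

model = IsolationForest(n_estimators=100, contamination=0.01, random_state=0)
model.fit(X)

# decision_function: higher = more normal, negative = anomalous.
# The outlier (last row) sits far from the cluster, so its average path
# length is short and its score is the lowest in the dataset.
scores = model.decision_function(X)
print(scores[-1], scores[:-1].min())
```

The same `decision_function` / `predict` pair is what the scoring script later in this tutorial relies on.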
Tip
Why Isolation Forest over other methods? One-Class SVM and autoencoders can also detect anomalies, but Isolation Forest handles high-dimensional, mixed-type data well, trains fast on small datasets, and requires almost no hyperparameter tuning. For log data, it’s the right starting point.
Setting up the environment
Create a virtual environment and install the dependencies. The full pipeline needs only three libraries.
python -m venv venv && source venv/bin/activate
pip install scikit-learn pandas numpy

Set up a project directory.
mkdir -p anomaly-detector/{data,models}

Configuring auditd
If auditd isn’t already capturing syscalls, add rules that cover the events most useful for anomaly detection. Create a rule file.
sudo tee /etc/audit/rules.d/anomaly.rules << 'EOF'
# Track process execution
-a always,exit -S execve -k exec_log
# Track file opens
-a always,exit -S openat -k file_log
# Track network connections
-a always,exit -S connect -k net_log
# Track privilege changes
-a always,exit -S setuid,setgid,setreuid,setregid -k priv_log
# Track ptrace (debugger attach)
-a always,exit -S ptrace -k trace_log
EOF

Load the rules and verify.
sudo augenrules --load
sudo auditctl -l

Warning
Audit volume These rules generate significant log volume on busy systems. On production machines, consider filtering by UID or limiting to specific directories. For this tutorial, a lab VM or development machine is ideal.
Let the system collect data for at least a few hours of normal activity — SSH sessions, cron jobs, package updates, whatever constitutes “normal” on your machine. A day or two of data produces better results.
Collecting and parsing audit logs
Auditd writes structured records to /var/log/audit/audit.log. Each syscall event spans one or more lines sharing a timestamp and serial number. The raw format looks like this:
type=SYSCALL msg=audit(1708300000.123:4567): arch=c000003e syscall=59 success=yes exit=0 a0=... ppid=1234 pid=1235 auid=1000 uid=0 gid=0 comm="curl" exe="/usr/bin/curl"
type=EXECVE msg=audit(1708300000.123:4567): argc=3 a0="curl" a1="-s" a2="http://example.com"

Create anomaly-detector/parse_audit.py to extract structured records.
import re
import sys
import subprocess
from collections import defaultdict

FALLBACK_SYSCALL_MAP = {
    '59': 'execve', '257': 'openat', '42': 'connect',
    '105': 'setuid', '106': 'setgid', '101': 'ptrace',
    '0': 'read', '1': 'write', '3': 'close',
    '62': 'kill', '56': 'clone', '2': 'open',
}

def load_syscall_map():
    """Build syscall number->name mapping for the current architecture."""
    try:
        result = subprocess.run(
            ['ausyscall', '--dump'],
            capture_output=True,
            text=True,
            check=True,
        )
        mapping = {}
        for line in result.stdout.splitlines():
            parts = line.split()
            if len(parts) >= 2 and parts[0].isdigit():
                mapping[parts[0]] = parts[1]
        if mapping:
            return mapping
    except Exception:
        pass
    return FALLBACK_SYSCALL_MAP

SYSCALL_MAP = load_syscall_map()

def parse_field(line, field):
    match = re.search(rf'{field}=(".*?"|\S+)', line)
    if not match:
        return None
    value = match.group(1).strip('"')
    return value

def parse_audit_log(path):
    """Parse audit.log into a list of syscall event dicts."""
    events = []
    with open(path) as f:
        for line in f:
            if 'type=SYSCALL' not in line:
                continue
            ts_match = re.search(r'msg=audit\((\d+\.\d+):(\d+)\)', line)
            if not ts_match:
                continue
            timestamp = float(ts_match.group(1))
            serial = ts_match.group(2)
            syscall_nr = parse_field(line, 'syscall') or ''
            syscall_name = SYSCALL_MAP.get(syscall_nr, f'sys_{syscall_nr}')
            events.append({
                'timestamp': timestamp,
                'serial': serial,
                'syscall': syscall_name,
                'uid': parse_field(line, 'uid') or '-1',
                'auid': parse_field(line, 'auid') or '-1',
                'pid': parse_field(line, 'pid') or '0',
                'ppid': parse_field(line, 'ppid') or '0',
                'exe': parse_field(line, 'exe') or '',
                'success': parse_field(line, 'success') or '',
                'comm': parse_field(line, 'comm') or '',
            })
    return events

def group_into_sessions(events, gap_seconds=300):
    """Group events into sessions by auid with a time gap threshold."""
    sessions = defaultdict(list)
    for event in events:
        key = event['auid']
        sessions[key].append(event)
    # Split on time gaps
    split_sessions = []
    for auid, evts in sessions.items():
        evts.sort(key=lambda e: e['timestamp'])
        current = [evts[0]]
        for e in evts[1:]:
            if e['timestamp'] - current[-1]['timestamp'] > gap_seconds:
                split_sessions.append(current)
                current = [e]
            else:
                current.append(e)
        split_sessions.append(current)
    return split_sessions

if __name__ == '__main__':
    log_path = sys.argv[1] if len(sys.argv) > 1 else '/var/log/audit/audit.log'
    events = parse_audit_log(log_path)
    sessions = group_into_sessions(events)
    print(f'Parsed {len(events)} events into {len(sessions)} sessions')

Run it against your audit log.
# Avoid running Python as root: copy the audit log once and read it as your user
sudo cp /var/log/audit/audit.log anomaly-detector/data/audit.log
sudo chown "$USER":"$USER" anomaly-detector/data/audit.log
python anomaly-detector/parse_audit.py anomaly-detector/data/audit.log
Parsed 48213 events into 312 sessions

The session grouping uses the audit UID (auid) and a 5-minute gap threshold: events from the same user within 5 minutes of each other belong to the same session. Tune the gap if your environment has longer idle periods.
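To see the gap rule in isolation, here is a self-contained sketch of the same splitting logic that `group_into_sessions` applies per user (synthetic timestamps; `split_on_gaps` is a hypothetical helper for illustration only):

```python
def split_on_gaps(timestamps, gap_seconds=300):
    """Split a sorted list of timestamps wherever the silence between
    consecutive events exceeds the threshold (mirrors group_into_sessions)."""
    sessions = [[timestamps[0]]]
    for t in timestamps[1:]:
        if t - sessions[-1][-1] > gap_seconds:
            sessions.append([t])      # gap too large: start a new session
        else:
            sessions[-1].append(t)    # still within the same session
    return sessions

# Three events within 5 minutes, then a 10-minute silence, then two more:
stamps = [0, 60, 120, 720, 780]
print(split_on_gaps(stamps))  # → [[0, 60, 120], [720, 780]]
```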
Feature engineering
Raw syscall events aren’t useful to a model. You need to transform each session into a fixed-length feature vector that captures behavior — what syscalls were used, how many unique binaries ran, whether privileges changed, and how the activity was distributed over time.
Create anomaly-detector/features.py.
import numpy as np
from collections import Counter

TRACKED_SYSCALLS = [
    'execve', 'openat', 'open', 'connect', 'read', 'write',
    'close', 'setuid', 'setgid', 'ptrace', 'clone', 'kill',
]

def extract_features(session):
    """Convert a session (list of event dicts) into a feature vector."""
    syscall_counts = Counter(e['syscall'] for e in session)
    exes = set(e['exe'] for e in session if e['exe'])
    uids = set(e['uid'] for e in session)
    timestamps = [e['timestamp'] for e in session]
    duration = max(timestamps) - min(timestamps) if len(timestamps) > 1 else 0
    total_events = len(session)
    failed = sum(1 for e in session if e['success'] == 'no')
    features = {}
    # Syscall frequency (normalized by total events)
    for sc in TRACKED_SYSCALLS:
        features[f'syscall_{sc}'] = syscall_counts.get(sc, 0) / max(total_events, 1)
    # Raw counts for key offensive syscalls
    features['raw_execve'] = syscall_counts.get('execve', 0)
    features['raw_connect'] = syscall_counts.get('connect', 0)
    features['raw_ptrace'] = syscall_counts.get('ptrace', 0)
    # Session metadata
    features['unique_binaries'] = len(exes)
    features['unique_uids'] = len(uids)
    features['uid_changes'] = max(len(uids) - 1, 0)
    features['total_events'] = total_events
    features['duration'] = duration
    features['failed_ratio'] = failed / max(total_events, 1)
    # Burstiness: stddev of inter-event times
    if len(timestamps) > 2:
        deltas = np.diff(sorted(timestamps))
        features['timing_stddev'] = float(np.std(deltas))
        features['timing_mean'] = float(np.mean(deltas))
    else:
        features['timing_stddev'] = 0.0
        features['timing_mean'] = 0.0
    # Syscall diversity (unique syscall types / total types tracked)
    features['syscall_diversity'] = len(syscall_counts) / max(len(TRACKED_SYSCALLS), 1)
    return features

def sessions_to_matrix(sessions):
    """Convert a list of sessions into a feature matrix (numpy array) and feature names."""
    if not sessions:
        return np.array([]), []
    feature_dicts = [extract_features(s) for s in sessions]
    feature_names = sorted(feature_dicts[0].keys())
    matrix = np.array([[fd[name] for name in feature_names] for fd in feature_dicts])
    return matrix, feature_names

Choosing features that matter
The features fall into four categories.
| Category | Features | What they capture |
|---|---|---|
| Syscall frequency | syscall_execve, syscall_connect, etc. | Behavioral profile — what the session does |
| Raw counts | raw_execve, raw_ptrace | Absolute volume of high-risk calls |
| Session metadata | unique_binaries, uid_changes, duration | Complexity and privilege behavior |
| Temporal | timing_stddev, timing_mean, failed_ratio | Rhythm — scripted attacks are bursty |
The ptrace count and uid_changes features are particularly useful. Legitimate sessions rarely attach a debugger or switch UIDs multiple times. Exploitation sessions — especially privilege escalation — do both.
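A quick synthetic example makes the contrast concrete. The event dicts and the `risk_markers` helper below are hypothetical, but they mirror how `extract_features` derives `raw_ptrace` and `uid_changes`:

```python
from collections import Counter

# Hypothetical sessions: each event is a dict like those parse_audit.py emits.
normal_session = [
    {'syscall': 'openat', 'uid': '1000'},
    {'syscall': 'read',   'uid': '1000'},
    {'syscall': 'execve', 'uid': '1000'},
]
exploit_session = [
    {'syscall': 'execve', 'uid': '1000'},
    {'syscall': 'ptrace', 'uid': '1000'},
    {'syscall': 'setuid', 'uid': '0'},   # privilege change mid-session
    {'syscall': 'ptrace', 'uid': '0'},
]

def risk_markers(session):
    """Mirror two features from extract_features: ptrace volume and UID switches."""
    counts = Counter(e['syscall'] for e in session)
    uid_changes = max(len({e['uid'] for e in session}) - 1, 0)
    return counts.get('ptrace', 0), uid_changes

print(risk_markers(normal_session))   # → (0, 0)
print(risk_markers(exploit_session))  # → (2, 1)
```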
Note
Feature engineering is where domain knowledge matters most. If your environment has specific patterns (e.g., a monitoring agent that calls ptrace legitimately), add features or filter events to account for them. The model is only as good as the features you give it.
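One way to encode that domain knowledge is to drop known-benign events before feature extraction. This is a sketch under assumptions: the allowlist path and the `filter_known_benign` helper are hypothetical examples, not part of the pipeline above:

```python
# Hypothetical allowlist: binaries that legitimately use ptrace in this environment.
PTRACE_ALLOWLIST = {'/usr/bin/monitoring-agent'}

def filter_known_benign(session):
    """Drop ptrace events from allowlisted binaries so they don't skew features."""
    return [
        e for e in session
        if not (e['syscall'] == 'ptrace' and e['exe'] in PTRACE_ALLOWLIST)
    ]

session = [
    {'syscall': 'ptrace', 'exe': '/usr/bin/monitoring-agent'},  # dropped: benign
    {'syscall': 'ptrace', 'exe': '/tmp/inject'},                # kept: suspicious
    {'syscall': 'execve', 'exe': '/usr/bin/ls'},                # kept
]
print(len(filter_known_benign(session)))  # → 2
```

You would call this on each session before `extract_features`, so the agent's noise never reaches the model.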
Training the model
Create anomaly-detector/train.py. The training script loads parsed logs, engineers features, trains the Isolation Forest, and saves the model.
import pickle
import sys
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from parse_audit import parse_audit_log, group_into_sessions
from features import sessions_to_matrix

def train(log_path, model_path='models/detector.pkl', contamination=0.05):
    print(f'Parsing {log_path}...')
    events = parse_audit_log(log_path)
    sessions = group_into_sessions(events)
    print(f'  {len(events)} events, {len(sessions)} sessions')
    if len(sessions) < 20:
        print('Not enough sessions to train. Collect more data.')
        sys.exit(1)
    X, feature_names = sessions_to_matrix(sessions)
    print(f'  Feature matrix: {X.shape}')
    # Scale features so no single dimension dominates
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    model = IsolationForest(
        n_estimators=200,
        contamination=contamination,
        max_samples='auto',
        random_state=42,
        n_jobs=-1,
    )
    model.fit(X_scaled)
    # Score the training data to show the baseline
    scores = model.decision_function(X_scaled)
    predictions = model.predict(X_scaled)
    n_anomalies = (predictions == -1).sum()
    print(f'  Anomalies in training data: {n_anomalies}/{len(sessions)}')
    print(f'  Score range: [{scores.min():.3f}, {scores.max():.3f}]')
    artifact = {
        'model': model,
        'scaler': scaler,
        'feature_names': feature_names,
        'contamination': contamination,
        'training_sessions': len(sessions),
    }
    with open(model_path, 'wb') as f:
        pickle.dump(artifact, f)
    print(f'  Model saved to {model_path}')
    return artifact

if __name__ == '__main__':
    log_path = sys.argv[1] if len(sys.argv) > 1 else '/var/log/audit/audit.log'
    train(log_path)

Run the training.
cd anomaly-detector
python train.py data/audit.log
Parsing data/audit.log...
  48213 events, 312 sessions
  Feature matrix: (312, 24)
  Anomalies in training data: 16/312
  Score range: [-0.421, 0.318]
  Model saved to models/detector.pkl

Understanding the contamination parameter
The contamination value (0.05 = 5%) tells the model how much of the training data it should expect to be anomalous. This directly affects the decision threshold.
- Too low (0.01): misses real anomalies — high precision, low recall
- Too high (0.15): floods you with false positives
- 0.03–0.07 is a reasonable starting range for audit logs
If you have a known-clean training set (no attacks during collection), set contamination to a small value like 0.01. If your training data might include some malicious activity, 0.05 gives the model room to exclude those sessions from “normal.” To see how threshold choice drives the precision-recall tradeoff interactively, try the Classifier Threshold Lab.
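A quick sweep on synthetic data (a stand-in normal distribution, not your audit features) shows how contamination moves the threshold — the fraction of training data flagged tracks the parameter almost exactly:

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.RandomState(42)
X = rng.normal(size=(300, 5))  # stand-in for a scaled session feature matrix

fractions = []
for contamination in (0.01, 0.05, 0.15):
    model = IsolationForest(n_estimators=100, contamination=contamination,
                            random_state=42).fit(X)
    # predict() == -1 marks points below the contamination-derived threshold
    flagged = float((model.predict(X) == -1).mean())
    fractions.append(flagged)
    print(f'contamination={contamination:.2f} -> flagged {flagged:.1%} of sessions')
```

Since the threshold is set as a percentile of training scores, raising contamination directly raises the alert volume — which is why 0.15 floods a real deployment.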
Scoring live sessions
Create anomaly-detector/score.py to score new log data against the trained model.
import pickle
import sys
from parse_audit import parse_audit_log, group_into_sessions
from features import extract_features

def load_model(model_path='models/detector.pkl'):
    with open(model_path, 'rb') as f:
        return pickle.load(f)

def score_sessions(log_path, model_path='models/detector.pkl'):
    artifact = load_model(model_path)
    model = artifact['model']
    scaler = artifact['scaler']
    feature_names = artifact['feature_names']
    events = parse_audit_log(log_path)
    sessions = group_into_sessions(events)
    results = []
    for i, session in enumerate(sessions):
        feat = extract_features(session)
        vector = [[feat[name] for name in feature_names]]
        vector_scaled = scaler.transform(vector)
        score = model.decision_function(vector_scaled)[0]
        prediction = model.predict(vector_scaled)[0]
        results.append({
            'session_idx': i,
            'n_events': len(session),
            'score': score,
            'anomaly': prediction == -1,
            'auid': session[0]['auid'],
            'start': session[0]['timestamp'],
            'features': feat,
        })
    return results

def print_results(results):
    results.sort(key=lambda r: r['score'])
    for r in results:
        flag = '*** ANOMALY ***' if r['anomaly'] else ''
        print(f"SESSION {r['session_idx']:>4} score: {r['score']:>7.3f} "
              f"events: {r['n_events']:>5} auid: {r['auid']} {flag}")
        if r['anomaly']:
            f = r['features']
            print(f"  execve: {f['raw_execve']:<4} connect: {f['raw_connect']:<4} "
                  f"ptrace: {f['raw_ptrace']:<4} uid_changes: {f['uid_changes']}")
            print(f"  unique_binaries: {f['unique_binaries']:<4} "
                  f"duration: {f['duration']:.0f}s "
                  f"failed_ratio: {f['failed_ratio']:.2f}")
            print()

if __name__ == '__main__':
    log_path = sys.argv[1] if len(sys.argv) > 1 else '/var/log/audit/audit.log'
    results = score_sessions(log_path)
    print_results(results)

python score.py data/audit.log
SESSION   42 score:  -0.381 events:   487 auid: 1000 *** ANOMALY ***
  execve: 47   connect: 23   ptrace: 4    uid_changes: 3
  unique_binaries: 19   duration: 12s   failed_ratio: 0.18

SESSION  187 score:  -0.294 events:   203 auid: 1000 *** ANOMALY ***
  execve: 31   connect: 45   ptrace: 0    uid_changes: 2
  unique_binaries: 14   duration: 8s   failed_ratio: 0.24

SESSION    1 score:   0.127 events:    34 auid: 1000
SESSION    2 score:   0.198 events:    12 auid: 0
SESSION    3 score:   0.231 events:    87 auid: 1000
...Anomalous sessions float to the top with negative scores. The feature breakdown tells you why a session was flagged — high execve count, ptrace usage, UID changes — which gives you something actionable to investigate.
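To rank which features drove a flag automatically, one option — a sketch, assuming you also persist the training matrix's per-feature mean and standard deviation alongside the model — is a z-score comparison against the training baseline. The `top_deviations` helper and the numbers below are hypothetical:

```python
import numpy as np

def top_deviations(session_vector, train_mean, train_std, feature_names, k=3):
    """Rank features by how far a flagged session sits from the training mean."""
    safe_std = np.where(train_std == 0, 1, train_std)  # guard zero-variance features
    z = np.abs((session_vector - train_mean) / safe_std)
    order = np.argsort(z)[::-1][:k]
    return [(feature_names[i], float(z[i])) for i in order]

# Hypothetical baseline statistics and one flagged session:
names = ['raw_execve', 'raw_ptrace', 'uid_changes', 'duration']
mean = np.array([5.0, 0.0, 0.1, 300.0])
std = np.array([3.0, 0.2, 0.3, 200.0])
flagged = np.array([47.0, 4.0, 3.0, 12.0])

for name, z in top_deviations(flagged, mean, std, names):
    print(f'{name}: z={z:.1f}')
```

This turns "the model said so" into "this session ran ptrace twenty standard deviations more than baseline," which is far easier to triage.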
Generating test attacks
To validate the detector, simulate the kind of activity an attacker generates. Run these in a test environment.
Warning
Lab only Run these commands only on machines you own and control. These simulate attacker behavior and will trigger security tools.
# Simulate recon: rapid enumeration
for cmd in id whoami hostname uname cat; do $cmd 2>/dev/null; done
ls /etc/shadow /etc/passwd /home/*/.ssh 2>/dev/null

# Simulate lateral movement: many outbound connections
for port in 22 80 443 8080 8443; do
    timeout 1 bash -c "echo >/dev/tcp/127.0.0.1/$port" 2>/dev/null
done

# Simulate privilege escalation pattern
sudo -l 2>/dev/null
find / -perm -4000 -type f 2>/dev/null | head -5

After running these, wait 15-30 seconds for auditd to write the events, then re-score.
python score.py data/audit.log

The simulated attack session should appear with a strongly negative score, high execve count, and elevated unique_binaries.
Integrating with alerting
The detector is useful only if it feeds into something you monitor. Here are two practical integration points.
Cron-based scoring
Run the scorer on a schedule and pipe anomalies to syslog.
cat > anomaly-detector/cron_score.sh << 'SCRIPT'
#!/bin/bash
cd /path/to/anomaly-detector
source venv/bin/activate
python -c "
import syslog
from score import score_sessions

results = score_sessions('/var/log/audit/audit.log')
anomalies = [r for r in results if r['anomaly']]
for a in anomalies:
    msg = (f\"ANOMALY_DETECTED session={a['session_idx']} \"
           f\"score={a['score']:.3f} auid={a['auid']} \"
           f\"events={a['n_events']}\")
    syslog.syslog(syslog.LOG_WARNING, msg)
"
SCRIPT
chmod +x anomaly-detector/cron_score.sh

# Run every 15 minutes (preserve existing root crontab entries)
(sudo crontab -l 2>/dev/null; echo "*/15 * * * * /path/to/anomaly-detector/cron_score.sh") | sudo crontab -

Wazuh integration
If you run Wazuh, write a custom decoder and rule to pick up the syslog messages.
<!-- /var/ossec/etc/decoders/local_decoder.xml -->
<decoder name="anomaly_detector">
  <prematch>ANOMALY_DETECTED</prematch>
  <regex>session=(\d+) score=(-?\S+) auid=(\S+) events=(\d+)</regex>
  <order>session_id, anomaly_score, audit_uid, event_count</order>
</decoder>

<!-- /var/ossec/etc/rules/local_rules.xml -->
<group name="anomaly_detection">
  <rule id="100200" level="10">
    <decoded_as>anomaly_detector</decoded_as>
    <description>ML anomaly detector flagged suspicious session (auid: $(audit_uid), score: $(anomaly_score))</description>
    <mitre>
      <id>T1059</id>
    </mitre>
  </rule>
</group>

Tip
Retraining cadence Retrain the model weekly or whenever your baseline changes significantly (new services deployed, team changes). Stale models drift — what was anomalous last month might be normal now.
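One lightweight way to notice drift between scheduled retrains — a sketch, assuming you log the flagged fraction from each scoring run — is to compare recent flag rates against the contamination the model was trained with. The `needs_retrain` helper is hypothetical:

```python
def needs_retrain(recent_flag_rates, contamination, tolerance=2.0):
    """Suggest retraining when the observed anomaly rate drifts well past
    the contamination the model was trained with (baseline has shifted)."""
    if not recent_flag_rates:
        return False
    avg = sum(recent_flag_rates) / len(recent_flag_rates)
    return avg > contamination * tolerance

# Stable baseline: ~5% flagged, matching contamination=0.05 -> no retrain.
print(needs_retrain([0.04, 0.05, 0.06], 0.05))  # → False
# After deploying a new service, ~18% of sessions flag -> baseline shifted.
print(needs_retrain([0.15, 0.18, 0.21], 0.05))  # → True
```

A sustained jump in flag rate usually means the environment changed, not that you are suddenly under constant attack.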
Limitations and next steps
Isolation Forest on audit logs is a solid starting point, but it has real limitations.
What it catches well:
- Noisy enumeration (nmap, linpeas, manual recon)
- Rapid-fire exploitation attempts
- Privilege escalation with unusual syscall patterns
- Scripted attacks (bursty timing signature)
What it misses:
- Low-and-slow attacks that mimic normal session rhythms
- Attacks that use only common binaries (pure living-off-the-land)
- Insider threats where the attacker is the baseline
Next steps to consider:
- Add per-user baselines instead of a global model — what’s normal for root is anomalous for www-data
- Incorporate sequence information (n-gram features over syscall sequences)
- Feed the anomaly scores into a RAG pipeline to generate investigation summaries — combine this tutorial with the RAG pipeline tutorial
- Export features to a time-series database (Prometheus) and build Grafana dashboards for trend visualization
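The n-gram idea from the list above fits in a few lines. This sketch counts adjacent syscall pairs (bigrams); the `bigram_` feature naming is hypothetical, and you would merge the resulting dict into the output of extract_features:

```python
from collections import Counter

def bigram_features(session, top_bigrams):
    """Count adjacent syscall pairs, normalized by transition count.
    Ordering carries sequence information that per-syscall counts lose."""
    calls = [e['syscall'] for e in session]
    counts = Counter(zip(calls, calls[1:]))
    total = max(len(calls) - 1, 1)
    return {f'bigram_{a}_{b}': counts.get((a, b), 0) / total
            for a, b in top_bigrams}

session = [{'syscall': s} for s in
           ['openat', 'read', 'close', 'openat', 'read', 'close']]
# Track two hypothetical pairs; (openat, read) occurs 2 times in 5 transitions.
feats = bigram_features(session, [('openat', 'read'), ('execve', 'connect')])
print(feats)
```

In practice you would pick the top N bigrams by frequency in the training data, so the feature vector stays fixed-length across sessions.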