The base RAG tutorial uses three static advisory files. That is enough to prove the pipeline works, but it is not useful for real vulnerability management. New CVEs are published daily. If your knowledge base does not grow with them, your pipeline answers stale questions.
This tutorial connects the pipeline to live CVE feeds. You will pull vulnerability data from the NVD (National Vulnerability Database) API, normalize it into the format your pipeline expects, implement incremental ingestion so you only embed new CVEs, and add metadata filtering so you can query by severity, date, or product. You will also see how to use OSV.dev as an alternative source for open-source vulnerabilities.
Everything still runs locally. The only network calls are to public APIs to fetch CVE data.
Architecture
The pipeline gains a new front end: a fetcher that pulls from external feeds and writes normalized documents for the existing ingestion flow to process.
┌───────────┐ ┌───────────┐
│ NVD API │ │ OSV.dev │
└─────┬─────┘ └─────┬─────┘
│ │
└─────────┬─────────┘
│
┌──────▼───────┐
│ Fetcher │
│ (normalize) │
└──────┬───────┘
│
┌──────▼───────┐
│ State File │
│ (seen CVEs) │
└──────┬───────┘
│
new CVEs only
│
┌──────▼───────┐
│ chunk + embed│
└──────┬───────┘
│
┌──────▼───────┐
│ ChromaDB │
└──────────────┘The state file tracks which CVEs have already been ingested. On each run, the fetcher pulls recent CVEs, skips ones it has already processed, and embeds only the new ones. This makes the pipeline efficient for scheduled runs (cron or systemd timer) without re-embedding the entire corpus each time.
Step 1: Understand the NVD API
The NVD API v2.0 is the authoritative source for CVE data. It is free to use without an API key, but requests without a key are rate-limited to 5 per 30-second window. With a key (free registration), the limit increases to 50 per 30 seconds.
Register for an API key at https://nvd.nist.gov/developers/request-an-api-key. The key arrives by email within minutes. It is optional for this tutorial, but you will want it for production use.
A basic request:
curl -s "https://services.nvd.nist.gov/rest/json/cves/2.0?resultsPerPage=2" | python -m json.tool | head -40The response structure (simplified):
{
"resultsPerPage": 2,
"totalResults": 274000,
"vulnerabilities": [
{
"cve": {
"id": "CVE-2024-3094",
"descriptions": [
{"lang": "en", "value": "Malicious code was discovered in..."}
],
"metrics": {
"cvssMetricV31": [
{"cvssData": {"baseScore": 10.0, "baseSeverity": "CRITICAL"}}
]
},
"published": "2024-03-29T17:15:00",
"configurations": [...]
}
}
]
}Key fields:
cve.id: The CVE identifiercve.descriptions: Array of descriptions, filter forlang: "en"cve.metrics.cvssMetricV31: CVSS v3.1 scoring (may also appear undercvssMetricV30orcvssMetricV40)cve.published: Publication timestampcve.configurations: Affected product information (CPE strings)
Step 2: Build the CVE fetcher
Install the HTTP library:
pip install httpxCreate cve_fetcher.py:
import httpx
import json
import os
import time
from datetime import datetime, timedelta, timezone
NVD_API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"
STATE_FILE = "ingestion_state.json"
# Set to your API key, or None for unauthenticated requests
NVD_API_KEY = os.environ.get("NVD_API_KEY")
def load_state():
"""Load the ingestion state.
Schema: {"ingested": {cve_id: last_modified_iso_ts}, "last_fetch": iso_ts}
Older versions of this tutorial used a list of CVE IDs; we accept that on
read and let fetch_new_cves migrate it on first run.
"""
if os.path.exists(STATE_FILE):
with open(STATE_FILE) as f:
return json.load(f)
return {"ingested": {}, "last_fetch": None}
def save_state(state):
with open(STATE_FILE, "w") as f:
json.dump(state, f, indent=2)
def fetch_cves(days_back=7, keyword=None):
"""Fetch CVEs changed recently from the NVD API.
We query by lastModified, not published date, so edits to older CVEs are
picked up when NVD revises descriptions, CVSS scores, or affected products.
Newly published CVEs are included too because their first publication also
sets lastModified.
"""
now = datetime.now(timezone.utc)
start = now - timedelta(days=days_back)
params = {
"lastModStartDate": start.strftime("%Y-%m-%dT%H:%M:%S.000"),
"lastModEndDate": now.strftime("%Y-%m-%dT%H:%M:%S.000"),
"resultsPerPage": 100,
"startIndex": 0,
}
if keyword:
params["keywordSearch"] = keyword
headers = {}
if NVD_API_KEY:
headers["apiKey"] = NVD_API_KEY
all_cves = []
total = None
while True:
response = httpx.get(
NVD_API_URL, params=params, headers=headers, timeout=30
)
response.raise_for_status()
data = response.json()
if total is None:
total = data["totalResults"]
print(f"Found {total} CVEs changed in the last {days_back} days")
all_cves.extend(data["vulnerabilities"])
if len(all_cves) >= total:
break
params["startIndex"] += data["resultsPerPage"]
# Respect rate limits
wait = 1 if NVD_API_KEY else 6
time.sleep(wait)
return all_cves
def extract_cvss(cve_data):
"""Extract the best available CVSS score and severity.
Returns score=None (not 0.0) when nothing is available, so callers can
distinguish "genuinely unscored" from "score is exactly zero". Returning
0.0 silently buries unscored CVEs at the bottom of severity-sorted lists,
which is exactly the wrong behaviour for triage.
"""
metrics = cve_data.get("metrics", {})
for version in ["cvssMetricV40", "cvssMetricV31", "cvssMetricV30", "cvssMetricV2"]:
entries = metrics.get(version)
if not entries:
continue
cvss = entries[0].get("cvssData", {})
score = cvss.get("baseScore")
severity = cvss.get("baseSeverity") or cvss.get("severity") # v2 uses "severity"
if score is None:
continue
return {
"score": float(score),
"severity": severity or "UNKNOWN",
"version": version,
}
return {"score": None, "severity": "UNKNOWN", "version": None}
def normalize_cve(vuln):
"""Convert NVD API response to plain text advisory format."""
cve = vuln["cve"]
cve_id = cve["id"]
# English description
description = ""
for desc in cve.get("descriptions", []):
if desc["lang"] == "en":
description = desc["value"]
break
cvss = extract_cvss(cve)
published = cve.get("published", "")[:10]
last_modified = cve.get("lastModified", "")
score_text = f"CVSS {cvss['score']}" if cvss["score"] is not None else "CVSS unscored"
text = f"""{cve_id}
Severity: {cvss['severity']} ({score_text})
Published: {published}
Last modified: {last_modified}
{description}"""
return {
"id": cve_id,
"text": text,
"metadata": {
"cve_id": cve_id,
"severity": cvss["severity"],
# ChromaDB requires scalar metadata; use -1 sentinel for unscored so
# downstream filters like {"cvss_score": {"$gte": 7.0}} can still exclude them.
"cvss_score": cvss["score"] if cvss["score"] is not None else -1.0,
"cvss_version": cvss["version"] or "none",
"published": published,
"last_modified": last_modified,
},
}
def fetch_new_cves(days_back=7, keyword=None):
"""Fetch CVEs and return ones that are new OR have been modified since
the last time we ingested them.
NVD edits CVE descriptions and CVSS scores after publication; treating
an ID as "done forever" leaves stale embeddings in the index. We track
the last_modified timestamp per ID and re-ingest when it advances.
"""
state = load_state()
# state["ingested"] is now a dict: cve_id -> last_modified seen
if isinstance(state.get("ingested"), list):
# Backwards-compat with the old list-of-IDs schema.
state["ingested"] = {cve_id: "" for cve_id in state["ingested"]}
raw_cves = fetch_cves(days_back=days_back, keyword=keyword)
normalized = [normalize_cve(v) for v in raw_cves]
new_or_updated = []
skipped = 0
for c in normalized:
prev = state["ingested"].get(c["id"])
current = c["metadata"]["last_modified"]
if prev is None or (current and current > prev):
new_or_updated.append(c)
else:
skipped += 1
print(
f"To ingest: {len(new_or_updated)} "
f"(skipped {skipped} already-current; "
f"of those to ingest, "
f"{sum(1 for c in new_or_updated if c['id'] in state['ingested'])} are updates)"
)
return new_or_updated
if __name__ == "__main__":
cves = fetch_new_cves(days_back=7)
for cve in cves[:3]:
print(f"\n{'='*60}")
print(cve["text"][:300])Test it:
python cve_fetcher.pyYou should see recent CVEs pulled from the NVD. If you get a 403, you are being rate-limited. Wait 30 seconds and try again, or set the NVD_API_KEY environment variable:
export NVD_API_KEY="your-key-here"
python cve_fetcher.pyTip
Keyword filtering The NVD API supports keyword search. Use it to focus on your domain:
fetch_new_cves(keyword="linux kernel")orfetch_new_cves(keyword="openssh"). This dramatically reduces the number of CVEs to process and improves relevance.
Step 3: Incremental ingestion
Now wire the fetcher into the embedding pipeline. Create live_ingest.py:
import chromadb
import ollama
from datetime import datetime, timezone
from cve_fetcher import fetch_new_cves, load_state, save_state
COLLECTION_NAME = "security_advisories"
EMBED_MODEL = "nomic-embed-text"
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50
def chunk_text(text, size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
chunks = []
start = 0
while start < len(text):
end = start + size
chunks.append(text[start:end])
start += size - overlap
return chunks
def embed(text):
return ollama.embed(model=EMBED_MODEL, input=text)["embeddings"][0]
def ingest_cves(days_back=7, keyword=None):
"""Fetch new CVEs and add them to ChromaDB."""
new_cves = fetch_new_cves(days_back=days_back, keyword=keyword)
if not new_cves:
print("No new CVEs to ingest.")
return
client = chromadb.PersistentClient(path="./chroma_db")
# Get or create the collection (don't delete existing data)
try:
collection = client.get_collection(COLLECTION_NAME)
print(f"Adding to existing collection '{COLLECTION_NAME}'")
except ValueError:
collection = client.create_collection(
name=COLLECTION_NAME,
metadata={"hnsw:space": "cosine"},
)
print(f"Created new collection '{COLLECTION_NAME}'")
ids, embeddings, documents, metadatas = [], [], [], []
state = load_state()
if isinstance(state.get("ingested"), list):
state["ingested"] = {cve_id: "" for cve_id in state["ingested"]}
for cve in new_cves:
# If we've ingested this CVE before, drop its previous chunks first
# so the description/CVSS edit replaces (not duplicates) what's there.
if cve["id"] in state["ingested"]:
collection.delete(where={"source": cve["id"]})
chunks = chunk_text(cve["text"])
print(f"{cve['id']}: {len(chunks)} chunks")
for i, chunk in enumerate(chunks):
chunk_id = f"{cve['id']}::chunk{i}"
ids.append(chunk_id)
embeddings.append(embed(chunk))
documents.append(chunk)
metadatas.append({
**cve["metadata"],
"chunk_index": i,
"source": cve["id"],
})
state["ingested"][cve["id"]] = cve["metadata"]["last_modified"]
if ids:
collection.add(
ids=ids,
embeddings=embeddings,
documents=documents,
metadatas=metadatas,
)
state["last_fetch"] = datetime.now(timezone.utc).isoformat()
save_state(state)
print(f"\nIngested {len(new_cves)} new or updated CVEs ({len(ids)} chunks)")
if __name__ == "__main__":
import sys
days = int(sys.argv[1]) if len(sys.argv) > 1 else 7
keyword = sys.argv[2] if len(sys.argv) > 2 else None
ingest_cves(days_back=days, keyword=keyword)The key differences from the base tutorial’s ingest.py:
- It uses
get_collection()instead of deleting and recreating the collection, so new chunks are added alongside existing ones. - The state file maps
cve_id -> last_modified, not just a flat list of IDs. NVD frequently edits descriptions and re-scores CVEs after publication (sometimes weeks later, when a vendor confirms the affected versions). The fetcher querieslastModStartDate/lastModEndDate, and when the fulllastModifiedtimestamp advances we re-ingest, dropping the previous chunks first so the index doesn’t drift away from the source over time. - Unscored CVEs are kept with
cvss_score = -1so severity-threshold filters ({"cvss_score": {"$gte": 7.0}}) still exclude them rather than treating them as 0.0 critical-floor matches.
# Ingest CVEs from the last 7 days
python live_ingest.py
# Ingest CVEs from the last 30 days, filtered to Linux
python live_ingest.py 30 linuxStep 4: Add metadata filtering
With CVSS scores and severity levels stored in ChromaDB metadata, you can filter queries to show only relevant results. Update your query function:
import chromadb
import ollama
COLLECTION_NAME = "security_advisories"
EMBED_MODEL = "nomic-embed-text"
CHAT_MODEL = "llama3.2"
def query_with_filters(question, severity=None, min_cvss=None, n_results=3):
"""Query the pipeline with optional metadata filters."""
client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection(COLLECTION_NAME)
query_embedding = ollama.embed(
model=EMBED_MODEL, input=question
)["embeddings"][0]
# Build filter clause
where = None
conditions = []
if severity:
conditions.append({"severity": severity.upper()})
if min_cvss is not None:
conditions.append({"cvss_score": {"$gte": min_cvss}})
if len(conditions) == 1:
where = conditions[0]
elif len(conditions) > 1:
where = {"$and": conditions}
results = collection.query(
query_embeddings=[query_embedding],
n_results=n_results,
where=where,
include=["documents", "metadatas", "distances"],
)
return results
# Examples
results = query_with_filters(
"remote code execution vulnerabilities",
severity="CRITICAL",
)
results = query_with_filters(
"container escape",
min_cvss=7.0,
)ChromaDB’s where clause supports comparison operators ($eq, $gte, $lte, $in) and logical operators ($and, $or). This is efficient because filtering happens at the metadata level before the distance calculation, not after.
Warning
Metadata filters reduce result count If you filter for
severity: "CRITICAL"but your corpus only has two critical CVEs, ChromaDB returns at most two results even if you requested five. Always check the length of the results array and fall back to unfiltered queries when the filtered set is too small.
Step 5: OSV.dev as an alternative source
OSV.dev (Open Source Vulnerabilities) is a free, open-source vulnerability database focused on packages and ecosystems. Its API is simpler than the NVD and does not require an API key.
Create osv_fetcher.py:
import httpx
OSV_API_URL = "https://api.osv.dev/v1/query"
def fetch_osv_vulns(package, version, ecosystem="PyPI"):
"""Query OSV.dev for vulnerabilities affecting a package version."""
payload = {
"package": {
"name": package,
"ecosystem": ecosystem,
},
"version": version,
}
vulns = []
page_token = None
while True:
request_payload = dict(payload)
if page_token:
request_payload["page_token"] = page_token
response = httpx.post(OSV_API_URL, json=request_payload, timeout=30)
response.raise_for_status()
data = response.json()
vulns.extend(data.get("vulns", []))
page_token = data.get("next_page_token")
if not page_token:
break
print(
f"Found {len(vulns)} vulnerabilities for "
f"{ecosystem}/{package}@{version}"
)
normalized = []
for vuln in vulns:
vuln_id = vuln.get("id", "UNKNOWN")
summary = vuln.get("summary", "No summary available.")
details = vuln.get("details", "")
published = vuln.get("published", "")[:10]
# OSV stores severity.score as a CVSS vector string
# (e.g., "CVSS:3.1/AV:N/AC:L/..."), not a numeric base score.
severity_info = ""
for severity in vuln.get("severity", []):
if severity.get("type") in ("CVSS_V4", "CVSS_V3", "CVSS_V2"):
severity_info = f"CVSS vector ({severity['type']}): {severity['score']}"
break
affected_text = ""
for affected in vuln.get("affected", []):
pkg = affected.get("package", {})
ranges = affected.get("ranges", [])
for r in ranges:
events = r.get("events", [])
for event in events:
if "fixed" in event:
affected_text += f"Fixed in: {event['fixed']}\n"
text = f"""{vuln_id}: {summary}
{severity_info}
Published: {published}
{details[:500]}
{affected_text}"""
normalized.append({
"id": vuln_id,
"text": text.strip(),
"metadata": {
"source": vuln_id,
"ecosystem": ecosystem,
"package": package,
"published": published,
},
})
return normalized
if __name__ == "__main__":
vulns = fetch_osv_vulns("chromadb", "0.5.23", ecosystem="PyPI")
for v in vulns[:3]:
print(f"\n{'='*60}")
print(v["text"][:300])OSV.dev is especially useful when you want to track vulnerabilities in your project’s dependencies. Query it with an ecosystem, package name, and version, or use /v1/querybatch when you need to check many dependencies at once.
Tip
OSV queries are version-scoped Unlike the NVD example above, OSV’s
/v1/queryAPI is designed around package+version (or commit) lookups. If you only have a package name and want to enumerate all historical advisories, use OSV’s data exports,/v1/vulns/{id}after collecting IDs elsewhere, or a tool such asosv-scanner.
Note
Metadata schema asymmetry The OSV normalizer above does not emit
severityorcvss_scoremetadata fields, because OSV’sseverity.scoreis a CVSS vector string rather than a numeric base score. If you mix NVD and OSV documents in the same collection,query_with_filters(severity=...)andmin_cvss=...will silently exclude the OSV-sourced ones. Either parse the CVSS vector into a numeric score (e.g., with thecvssPyPI package) or maintain two collections and route queries accordingly.
Note
Pagination matters OSV may return a
next_page_tokenfor larger result sets. The loop above keeps following that token until exhaustion so you do not silently miss vulnerabilities for heavily used packages.
python osv_fetcher.pyThe output follows the same {id, text, metadata} format as the NVD fetcher, so you can feed it into live_ingest.py with minimal changes. Add a --source osv flag to your ingestion script to choose between feeds.
Step 6: Schedule automated ingestion
For a pipeline that stays current, run the fetcher on a schedule. A cron job is the simplest option:
crontab -eAdd a line to run daily at 6 AM:
0 6 * * * cd /path/to/rag-pipeline && /path/to/.venv/bin/python live_ingest.py 1 >> /var/log/rag-ingest.log 2>&1The 1 argument tells the fetcher to look back 1 day, matching the daily schedule. Use the full path to the virtual environment’s Python to avoid environment issues.
For systemd, create a timer:
# /etc/systemd/system/rag-ingest.service
[Unit]
Description=RAG Pipeline CVE Ingestion
[Service]
Type=oneshot
WorkingDirectory=/path/to/rag-pipeline
ExecStart=/path/to/.venv/bin/python live_ingest.py 1
User=your-username# /etc/systemd/system/rag-ingest.timer
[Unit]
Description=Daily RAG CVE ingestion
[Timer]
OnCalendar=*-*-* 06:00:00
Persistent=true
[Install]
WantedBy=timers.targetsudo systemctl enable --now rag-ingest.timerNote
Treat your state file like data The
ingestion_state.jsonfile tracks what has been ingested. If you delete it, the next run will fetch and re-embed everything again. Because the chunk IDs are deterministic, ChromaDB ignores existing IDs rather than storing duplicate records, but you still waste time and compute. Back it up alongside yourchroma_db/directory.
Common mistakes
Hitting API rate limits. The NVD allows 5 requests per 30 seconds without a key. If you are fetching thousands of CVEs, you will hit this quickly. The fetcher includes time.sleep() between pages, but bursts from other tools using the same IP can still trigger limits. Use an API key for production.
Stale embeddings after CVE updates. The NVD sometimes updates CVE descriptions after initial publication (adding analysis, correcting severity scores). The simple state file approach skips already-ingested CVEs even if they have been updated. For production, track the lastModifiedDate and re-embed CVEs whose descriptions changed.
Context window bloat from large corpora. After a month of daily ingestion, you might have thousands of CVEs in ChromaDB. Retrieving 5 chunks from 5 different CVEs produces a large context. Use metadata filtering to narrow results before retrieval, not after.
Forgetting to handle pagination. The NVD API returns at most 2,000 results per request. The fetcher handles pagination with startIndex, but if you modify the code, make sure the loop runs until startIndex + resultsPerPage >= totalResults.
Next steps
- Add PDF support for vendor advisories that arrive as formatted reports. See Local RAG with PDF Documents.
- Understand the adversarial risk. When your pipeline ingests from external feeds, poisoning becomes a real concern. RAG Poisoning walks through the attack surface and defenses.
- Build a SOC triage assistant that combines CVE data with Wazuh alerts. See Run a Private AI Assistant in Your SOC.
- Add a chat interface to query your growing knowledge base interactively. See Add a Chat Interface to Your Local RAG Pipeline.