Tutorial

Connect Your RAG Pipeline to Live CVE Feeds

Pull from the NVD API and OSV.dev, implement incremental ingestion, and add metadata filtering to your local RAG pipeline.

7 min read intermediate

Prerequisites

  • Tutorial: Build a Local RAG Pipeline with Ollama and ChromaDB
  • Basic Python knowledge

Part 4 of 5 in Local RAG Pipeline


The base RAG tutorial uses three static advisory files. That is enough to prove the pipeline works, but not enough for real vulnerability management. New CVEs are published daily. If your knowledge base does not keep up with them, your pipeline answers questions from stale data.

This tutorial connects the pipeline to live CVE feeds. You will pull vulnerability data from the NVD (National Vulnerability Database) API, normalize it into the format your pipeline expects, implement incremental ingestion so you only embed new or updated CVEs, and add metadata filtering so you can narrow queries by severity or CVSS score. You will also see how to use OSV.dev as an alternative source for open-source vulnerabilities.

Everything still runs locally. The only network calls are to public APIs to fetch CVE data.

Architecture

The pipeline gains a new front end: a fetcher that pulls from external feeds and writes normalized documents for the existing ingestion flow to process.

   ┌───────────┐      ┌───────────┐
   │  NVD API  │      │  OSV.dev  │
   └─────┬─────┘      └─────┬─────┘
         │                  │
         └─────────┬────────┘
                   │
            ┌──────▼───────┐
            │   Fetcher    │
            │  (normalize) │
            └──────┬───────┘
                   │
            ┌──────▼───────┐
            │ State File   │
            │ (seen CVEs)  │
            └──────┬───────┘
                   │
        new or updated CVEs only
                   │
            ┌──────▼───────┐
            │ chunk + embed│
            └──────┬───────┘
                   │
            ┌──────▼───────┐
            │  ChromaDB    │
            └──────────────┘

The state file records which CVEs have already been ingested and the lastModified timestamp seen for each. On each run, the fetcher pulls recently changed CVEs, skips those whose timestamp has not advanced, and embeds only new or updated ones. This makes the pipeline efficient for scheduled runs (cron or systemd timer) without re-embedding the entire corpus each time.

Step 1: Understand the NVD API

The NVD CVE API 2.0 is the main public source for CVE records enriched with CVSS scores and affected-product (CPE) data. It is free to use without an API key, but unauthenticated requests are limited to 5 per rolling 30-second window. With a key (free registration), the limit rises to 50 per 30 seconds.

Register for an API key at https://nvd.nist.gov/developers/request-an-api-key. The key arrives by email within minutes. It is optional for this tutorial, but you will want it for production use.
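The fetcher in the next step simply sleeps between pages. If other scripts share your key or IP, a limiter that tracks request times against NVD's rolling 30-second window is a little more robust. A minimal sketch; RollingWindowLimiter is a hypothetical helper, not part of the tutorial's files, and its clock and sleep functions are injectable so it can be tested without waiting:

```python
import time
from collections import deque


class RollingWindowLimiter:
    """Allow at most `max_calls` calls in any `window` seconds (sliding window)."""

    def __init__(self, max_calls, window, clock=time.monotonic, sleep=time.sleep):
        self.max_calls = max_calls
        self.window = window
        self.clock = clock
        self.sleep = sleep
        self.calls = deque()  # timestamps of recent calls, oldest first

    def wait(self):
        """Block until another call is allowed, then record it."""
        now = self.clock()
        # Forget calls that have already left the window.
        while self.calls and now - self.calls[0] >= self.window:
            self.calls.popleft()
        if len(self.calls) >= self.max_calls:
            # Sleep until the oldest recorded call ages out of the window.
            self.sleep(self.window - (now - self.calls[0]))
            now = self.clock()
            self.calls.popleft()
        self.calls.append(now)


# 5 requests per 30 s without a key; use max_calls=50 with one.
limiter = RollingWindowLimiter(max_calls=5, window=30.0)
```

To use it, create one limiter per process and call limiter.wait() immediately before each httpx.get in fetch_cves, in place of the fixed time.sleep().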

A basic request:

curl -s "https://services.nvd.nist.gov/rest/json/cves/2.0?resultsPerPage=2" | python -m json.tool | head -40

The response structure (simplified):

{
  "resultsPerPage": 2,
  "totalResults": 274000,
  "vulnerabilities": [
    {
      "cve": {
        "id": "CVE-2024-3094",
        "descriptions": [
          {"lang": "en", "value": "Malicious code was discovered in..."}
        ],
        "metrics": {
          "cvssMetricV31": [
            {"cvssData": {"baseScore": 10.0, "baseSeverity": "CRITICAL"}}
          ]
        },
        "published": "2024-03-29T17:15:00",
        "configurations": [...]
      }
    }
  ]
}

Key fields:

  • cve.id: The CVE identifier
  • cve.descriptions: Array of descriptions, filter for lang: "en"
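  • cve.metrics.cvssMetricV31: CVSS v3.1 scoring (may also appear under cvssMetricV40, cvssMetricV30, or, for older CVEs, cvssMetricV2)
  • cve.published: Publication timestamp
  • cve.lastModified: Timestamp of the most recent change to the record; the fetcher uses it for incremental ingestion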
  • cve.configurations: Affected product information (CPE strings)
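NVD limits every date-range query (lastModStartDate/lastModEndDate, pubStartDate/pubEndDate) to 120 consecutive days, so a longer backfill has to be split into windows and fetched one window at a time. A minimal sketch; nvd_date_windows and nvd_params are hypothetical helpers, and the timestamp format matches the one the fetcher below uses:

```python
from datetime import datetime, timedelta, timezone

NVD_MAX_RANGE = timedelta(days=120)  # NVD's limit for any date-range query


def nvd_date_windows(start, end, max_range=NVD_MAX_RANGE):
    """Split [start, end] into consecutive windows no longer than max_range."""
    windows = []
    cursor = start
    while cursor < end:
        window_end = min(cursor + max_range, end)
        windows.append((cursor, window_end))
        cursor = window_end
    return windows


def nvd_params(window_start, window_end):
    """Format one window as lastModStartDate/lastModEndDate query parameters."""
    fmt = "%Y-%m-%dT%H:%M:%S.000"
    return {
        "lastModStartDate": window_start.strftime(fmt),
        "lastModEndDate": window_end.strftime(fmt),
    }


# Example: one year of changes becomes four requests' worth of windows.
end = datetime(2024, 12, 31, tzinfo=timezone.utc)
for s, e in nvd_date_windows(end - timedelta(days=365), end):
    print(nvd_params(s, e))
```

Adjacent windows share a boundary timestamp, so a CVE modified at exactly that instant may be returned twice; deduplicate by CVE ID before ingesting.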

Step 2: Build the CVE fetcher

Install the HTTP library:

pip install httpx

Create cve_fetcher.py:

import httpx
import json
import os
import time
from datetime import datetime, timedelta, timezone

NVD_API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"
STATE_FILE = "ingestion_state.json"

# Set to your API key, or None for unauthenticated requests
NVD_API_KEY = os.environ.get("NVD_API_KEY")


def load_state():
    """Load the ingestion state.

    Schema: {"ingested": {cve_id: last_modified_iso_ts}, "last_fetch": iso_ts}
    Older versions of this tutorial used a list of CVE IDs; we accept that on
    read and let fetch_new_cves migrate it on first run.
    """
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE) as f:
            return json.load(f)
    return {"ingested": {}, "last_fetch": None}


def save_state(state):
    with open(STATE_FILE, "w") as f:
        json.dump(state, f, indent=2)


def fetch_cves(days_back=7, keyword=None):
    """Fetch CVEs changed recently from the NVD API.

    We query by lastModified, not published date, so edits to older CVEs are
    picked up when NVD revises descriptions, CVSS scores, or affected products.
    Newly published CVEs are included too because their first publication also
    sets lastModified.
    """
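    # NVD rejects any date range longer than 120 consecutive days.
    if days_back > 120:
        raise ValueError("days_back must be 120 or less for NVD date-range queries")

    now = datetime.now(timezone.utc)
    start = now - timedelta(days=days_back)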

    params = {
        "lastModStartDate": start.strftime("%Y-%m-%dT%H:%M:%S.000"),
        "lastModEndDate": now.strftime("%Y-%m-%dT%H:%M:%S.000"),
        "resultsPerPage": 100,
        "startIndex": 0,
    }
    if keyword:
        params["keywordSearch"] = keyword

    headers = {}
    if NVD_API_KEY:
        headers["apiKey"] = NVD_API_KEY

    all_cves = []
    total = None

    while True:
        response = httpx.get(
            NVD_API_URL, params=params, headers=headers, timeout=30
        )
        response.raise_for_status()
        data = response.json()

        if total is None:
            total = data["totalResults"]
            print(f"Found {total} CVEs changed in the last {days_back} days")

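        page = data["vulnerabilities"]
        all_cves.extend(page)

        # Stop once everything is fetched; an empty page also ends the loop
        # so a shifting totalResults can never cause an endless loop.
        if not page or len(all_cves) >= total:
            break

        params["startIndex"] += len(page)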

        # Respect rate limits
        wait = 1 if NVD_API_KEY else 6
        time.sleep(wait)

    return all_cves


def extract_cvss(cve_data):
    """Extract the best available CVSS score and severity.

    Returns score=None (not 0.0) when nothing is available, so callers can
    distinguish "genuinely unscored" from "score is exactly zero". Returning
    0.0 silently buries unscored CVEs at the bottom of severity-sorted lists,
    which is exactly the wrong behaviour for triage.
    """
    metrics = cve_data.get("metrics", {})

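    for version in ["cvssMetricV40", "cvssMetricV31", "cvssMetricV30", "cvssMetricV2"]:
        entries = metrics.get(version)
        if not entries:
            continue
        # Prefer NVD's own ("Primary") assessment over a CNA's ("Secondary").
        entry = next((e for e in entries if e.get("type") == "Primary"), entries[0])
        cvss = entry.get("cvssData", {})
        score = cvss.get("baseScore")
        # v3.x and v4.0 keep baseSeverity inside cvssData; v2 keeps it on the entry.
        severity = cvss.get("baseSeverity") or entry.get("baseSeverity")
        if score is None:
            continue
        return {
            "score": float(score),
            "severity": severity or "UNKNOWN",
            "version": version,
        }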

    return {"score": None, "severity": "UNKNOWN", "version": None}


def normalize_cve(vuln):
    """Convert NVD API response to plain text advisory format."""
    cve = vuln["cve"]
    cve_id = cve["id"]

    # English description
    description = ""
    for desc in cve.get("descriptions", []):
        if desc["lang"] == "en":
            description = desc["value"]
            break

    cvss = extract_cvss(cve)
    published = cve.get("published", "")[:10]
    last_modified = cve.get("lastModified", "")

    score_text = f"CVSS {cvss['score']}" if cvss["score"] is not None else "CVSS unscored"
    text = f"""{cve_id}

Severity: {cvss['severity']} ({score_text})
Published: {published}
Last modified: {last_modified}

{description}"""

    return {
        "id": cve_id,
        "text": text,
        "metadata": {
            "cve_id": cve_id,
            "severity": cvss["severity"],
            # ChromaDB requires scalar metadata; use -1 sentinel for unscored so
            # downstream filters like {"cvss_score": {"$gte": 7.0}} can still exclude them.
            "cvss_score": cvss["score"] if cvss["score"] is not None else -1.0,
            "cvss_version": cvss["version"] or "none",
            "published": published,
            "last_modified": last_modified,
        },
    }


def fetch_new_cves(days_back=7, keyword=None):
    """Fetch CVEs and return ones that are new OR have been modified since
    the last time we ingested them.

    NVD edits CVE descriptions and CVSS scores after publication; treating
    an ID as "done forever" leaves stale embeddings in the index. We track
    the last_modified timestamp per ID and re-ingest when it advances.
    """
    state = load_state()
    # state["ingested"] is now a dict: cve_id -> last_modified seen
    if isinstance(state.get("ingested"), list):
        # Backwards-compat with the old list-of-IDs schema.
        state["ingested"] = {cve_id: "" for cve_id in state["ingested"]}

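    raw_cves = fetch_cves(days_back=days_back, keyword=keyword)

    # A CVE can show up twice if results shift between page requests; keep
    # the copy with the latest last_modified so chunk IDs stay unique.
    latest = {}
    for c in (normalize_cve(v) for v in raw_cves):
        kept = latest.get(c["id"])
        if kept is None or c["metadata"]["last_modified"] > kept["metadata"]["last_modified"]:
            latest[c["id"]] = c
    normalized = list(latest.values())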

    new_or_updated = []
    skipped = 0
    for c in normalized:
        prev = state["ingested"].get(c["id"])
        current = c["metadata"]["last_modified"]
        if prev is None or (current and current > prev):
            new_or_updated.append(c)
        else:
            skipped += 1

    print(
        f"To ingest: {len(new_or_updated)} "
        f"(skipped {skipped} already-current; "
        f"of those to ingest, "
        f"{sum(1 for c in new_or_updated if c['id'] in state['ingested'])} are updates)"
    )

    return new_or_updated


if __name__ == "__main__":
    cves = fetch_new_cves(days_back=7)
    for cve in cves[:3]:
        print(f"\n{'='*60}")
        print(cve["text"][:300])

Test it:

python cve_fetcher.py

You should see recent CVEs pulled from the NVD. If you get a 403, you are being rate-limited. Wait 30 seconds and try again, or set the NVD_API_KEY environment variable:

export NVD_API_KEY="your-key-here"
python cve_fetcher.py

Tip

Keyword filtering. The NVD API supports keyword search. Use it to focus on your domain: fetch_new_cves(keyword="linux kernel") or fetch_new_cves(keyword="openssh"). With several words, NVD returns CVEs whose description contains all of them, though not necessarily together. This dramatically reduces the number of CVEs to process and improves relevance.

Step 3: Incremental ingestion

Now wire the fetcher into the embedding pipeline. Create live_ingest.py:

import chromadb
import ollama
from datetime import datetime, timezone
from cve_fetcher import fetch_new_cves, load_state, save_state

COLLECTION_NAME = "security_advisories"
EMBED_MODEL = "nomic-embed-text"
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50


def chunk_text(text, size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
    chunks = []
    start = 0
    while start < len(text):
        end = start + size
        chunks.append(text[start:end])
        start += size - overlap
    return chunks


def embed(text):
    return ollama.embed(model=EMBED_MODEL, input=text)["embeddings"][0]


def ingest_cves(days_back=7, keyword=None):
    """Fetch new CVEs and add them to ChromaDB."""
    new_cves = fetch_new_cves(days_back=days_back, keyword=keyword)
    if not new_cves:
        print("No new CVEs to ingest.")
        return

    client = chromadb.PersistentClient(path="./chroma_db")

    # Reuse the existing collection (never delete it); create it on first run.
    # get_or_create_collection avoids depending on which exception type a
    # given ChromaDB version raises for a missing collection.
    collection = client.get_or_create_collection(
        name=COLLECTION_NAME,
        metadata={"hnsw:space": "cosine"},
    )
    print(f"Collection '{COLLECTION_NAME}' holds {collection.count()} chunks")

    state = load_state()
    if isinstance(state.get("ingested"), list):
        state["ingested"] = {cve_id: "" for cve_id in state["ingested"]}

    total_chunks = 0
    for cve in new_cves:
        # If we've ingested this CVE before, drop its previous chunks first
        # so the description/CVSS edit replaces (not duplicates) what's there.
        if cve["id"] in state["ingested"]:
            collection.delete(where={"source": cve["id"]})

        chunks = chunk_text(cve["text"])
        print(f"{cve['id']}: {len(chunks)} chunks")

        # Add one CVE at a time: small batches stay well under ChromaDB's
        # maximum batch size, even when a backfill returns thousands of CVEs.
        collection.add(
            ids=[f"{cve['id']}::chunk{i}" for i in range(len(chunks))],
            embeddings=[embed(chunk) for chunk in chunks],
            documents=chunks,
            metadatas=[
                {**cve["metadata"], "chunk_index": i, "source": cve["id"]}
                for i in range(len(chunks))
            ],
        )
        total_chunks += len(chunks)
        state["ingested"][cve["id"]] = cve["metadata"]["last_modified"]

    state["last_fetch"] = datetime.now(timezone.utc).isoformat()
    save_state(state)
    print(f"\nIngested {len(new_cves)} new or updated CVEs ({total_chunks} chunks)")


if __name__ == "__main__":
    import sys

    days = int(sys.argv[1]) if len(sys.argv) > 1 else 7
    keyword = sys.argv[2] if len(sys.argv) > 2 else None
    ingest_cves(days_back=days, keyword=keyword)

The key differences from the base tutorial’s ingest.py:

  1. It reuses the existing collection instead of deleting and recreating it, so new chunks are added alongside existing ones.
  2. The state file maps cve_id -> last_modified, not just a flat list of IDs. NVD frequently edits descriptions and re-scores CVEs after publication (sometimes weeks later, when a vendor confirms the affected versions). The fetcher queries lastModStartDate / lastModEndDate, and when the full lastModified timestamp advances we re-ingest, dropping the previous chunks first so the index doesn’t drift away from the source over time.
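  3. Unscored CVEs get cvss_score = -1.0 instead of None, because ChromaDB metadata values must be scalars (str, int, float, or bool). The sentinel keeps them out of threshold filters such as {"cvss_score": {"$gte": 7.0}} and, unlike 0.0, cannot be confused with a genuine zero score, so you can still find unscored CVEs with {"cvss_score": {"$lt": 0}}.

Run the ingestion:

# Ingest CVEs from the last 7 days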
python live_ingest.py

# Ingest CVEs from the last 30 days, filtered to Linux
python live_ingest.py 30 linux
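The update check in fetch_new_cves compares lastModified values as plain strings. That works while every timestamp comes from NVD in one fixed format, but OSV records use RFC 3339 timestamps ending in Z, and string comparison across formats is unreliable. A minimal sketch that parses before comparing; parse_ts and needs_ingest are hypothetical helpers that mirror the existing condition prev is None or (current and current > prev):

```python
from datetime import datetime, timezone


def parse_ts(value):
    """Parse an NVD or OSV timestamp into an aware UTC datetime, or None."""
    if not value:
        return None
    # fromisoformat() before Python 3.11 rejects a trailing "Z".
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    ts = datetime.fromisoformat(value)
    # NVD timestamps carry no offset; treat them as UTC.
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    return ts


def needs_ingest(previous, current):
    """True if a record was never ingested or its timestamp moved forward."""
    if previous is None:
        return True
    prev_ts, cur_ts = parse_ts(previous), parse_ts(current)
    if cur_ts is None:
        return False  # nothing to compare; keep what is already indexed
    # prev_ts is None for IDs migrated from the old list-based state file.
    return prev_ts is None or cur_ts > prev_ts
```

In fetch_new_cves, the loop body would then become if needs_ingest(prev, current): new_or_updated.append(c).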

Step 4: Add metadata filtering

With CVSS scores and severity levels stored in ChromaDB metadata, you can filter queries to show only relevant results. Update your query function:

import chromadb
import ollama

COLLECTION_NAME = "security_advisories"
EMBED_MODEL = "nomic-embed-text"
CHAT_MODEL = "llama3.2"


def query_with_filters(question, severity=None, min_cvss=None, n_results=3):
    """Query the pipeline with optional metadata filters."""
    client = chromadb.PersistentClient(path="./chroma_db")
    collection = client.get_collection(COLLECTION_NAME)

    query_embedding = ollama.embed(
        model=EMBED_MODEL, input=question
    )["embeddings"][0]

    # Build filter clause
    where = None
    conditions = []

    if severity:
        conditions.append({"severity": severity.upper()})
    if min_cvss is not None:
        conditions.append({"cvss_score": {"$gte": min_cvss}})

    if len(conditions) == 1:
        where = conditions[0]
    elif len(conditions) > 1:
        where = {"$and": conditions}

    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=n_results,
        where=where,
        include=["documents", "metadatas", "distances"],
    )

    return results


# Examples
results = query_with_filters(
    "remote code execution vulnerabilities",
    severity="CRITICAL",
)

results = query_with_filters(
    "container escape",
    min_cvss=7.0,
)

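ChromaDB’s where clause supports comparison operators ($eq, $ne, $gt, $gte, $lt, $lte), membership operators ($in, $nin), and logical operators ($and, $or). The range operators ($gt, $gte, $lt, $lte) work on numeric values, which is why cvss_score is stored as a float. Filtering is efficient because it narrows the candidate set before the nearest-neighbor search rather than discarding results afterward.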
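Date filtering needs one extra step: published is stored as a "YYYY-MM-DD" string, and the range operators compare numbers, so a date cutoff needs a numeric copy of the date. A minimal sketch, assuming you add a hypothetical published_int field (for example "published_int": published_to_int(published)) to the metadata in normalize_cve; build_where and published_to_int are illustrative helpers, not part of the tutorial's code:

```python
def published_to_int(published):
    """'2024-03-29' -> 20240329; -1 when the date is missing."""
    return int(published.replace("-", "")) if published else -1


def build_where(severities=None, min_cvss=None, published_after=None):
    """Build a ChromaDB `where` dict from optional filters (None = no filter).

    Assumes the hypothetical integer metadata field `published_int`.
    """
    conditions = []
    if severities:
        conditions.append({"severity": {"$in": [s.upper() for s in severities]}})
    if min_cvss is not None:
        conditions.append({"cvss_score": {"$gte": min_cvss}})
    if published_after:
        conditions.append(
            {"published_int": {"$gte": published_to_int(published_after)}}
        )
    if not conditions:
        return None
    if len(conditions) == 1:
        return conditions[0]
    return {"$and": conditions}
```

Pass the result straight to collection.query(..., where=build_where(...)); a None result means no filtering, which collection.query accepts.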

Warning

Metadata filters reduce result count. If you filter for severity: "CRITICAL" but your corpus only has two critical CVEs, ChromaDB returns at most two results even if you requested five. Always check the length of the results array and fall back to unfiltered queries when the filtered set is too small.
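One way to implement that fallback, as a sketch: search is any callable you supply that runs collection.query with a given where clause (None meaning unfiltered), and query_with_fallback is a hypothetical helper. It returns a flag so the caller can tell the user when results are no longer filtered:

```python
def query_with_fallback(search, where, n_results, min_results=None):
    """Run a filtered search; fall back to unfiltered if too few hits return.

    `search(where)` must return a ChromaDB-style result dict, e.g.
    lambda w: collection.query(query_embeddings=[emb], n_results=n_results, where=w)
    Returns (results, used_filter).
    """
    min_results = n_results if min_results is None else min_results
    if where is not None:
        results = search(where)
        # Chroma nests per-query lists: results["ids"][0] holds this query's hits.
        if len(results["ids"][0]) >= min_results:
            return results, True
    return search(None), False
```

Setting min_results below n_results (say, 1) keeps filtered results whenever at least one match exists, which is often a better trade-off than silently dropping the filter.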

Step 5: OSV.dev as an alternative source

OSV.dev (Open Source Vulnerabilities) is a free, open-source vulnerability database focused on packages and ecosystems. Its API is simpler than the NVD and does not require an API key.

Create osv_fetcher.py:

import httpx


OSV_API_URL = "https://api.osv.dev/v1/query"


def fetch_osv_vulns(package, version, ecosystem="PyPI"):
    """Query OSV.dev for vulnerabilities affecting a package version."""
    payload = {
        "package": {
            "name": package,
            "ecosystem": ecosystem,
        },
        "version": version,
    }

    vulns = []
    page_token = None

    while True:
        request_payload = dict(payload)
        if page_token:
            request_payload["page_token"] = page_token

        response = httpx.post(OSV_API_URL, json=request_payload, timeout=30)
        response.raise_for_status()
        data = response.json()

        vulns.extend(data.get("vulns", []))
        page_token = data.get("next_page_token")
        if not page_token:
            break

    print(
        f"Found {len(vulns)} vulnerabilities for "
        f"{ecosystem}/{package}@{version}"
    )

    normalized = []
    for vuln in vulns:
        vuln_id = vuln.get("id", "UNKNOWN")
        summary = vuln.get("summary", "No summary available.")
        details = vuln.get("details", "")
        published = vuln.get("published", "")[:10]

        # OSV stores severity.score as a CVSS vector string
        # (e.g., "CVSS:3.1/AV:N/AC:L/..."), not a numeric base score.
        severity_info = ""
        for severity in vuln.get("severity", []):
            if severity.get("type") in ("CVSS_V4", "CVSS_V3", "CVSS_V2"):
                severity_info = f"CVSS vector ({severity['type']}): {severity['score']}"
                break

        affected_text = ""
        for affected in vuln.get("affected", []):
            for r in affected.get("ranges", []):
                # GIT ranges record commit hashes, not release versions,
                # so they only add noise to a "Fixed in" list.
                if r.get("type") == "GIT":
                    continue
                for event in r.get("events", []):
                    if "fixed" in event:
                        affected_text += f"Fixed in: {event['fixed']}\n"

        text = f"""{vuln_id}: {summary}

{severity_info}
Published: {published}

{details[:500]}

{affected_text}"""

        normalized.append({
            "id": vuln_id,
            "text": text.strip(),
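            "metadata": {
                "source": vuln_id,
                "ecosystem": ecosystem,
                "package": package,
                "published": published,
                # OSV's "modified" timestamp lets live_ingest.py track
                # updates the same way it does for NVD's lastModified.
                "last_modified": vuln.get("modified", ""),
            },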
        })

    return normalized


if __name__ == "__main__":
    vulns = fetch_osv_vulns("chromadb", "0.5.23", ecosystem="PyPI")
    for v in vulns[:3]:
        print(f"\n{'='*60}")
        print(v["text"][:300])

OSV.dev is especially useful when you want to track vulnerabilities in your project’s dependencies. Query it with an ecosystem, package name, and version, or use /v1/querybatch when you need to check many dependencies at once.
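For a dependency list, one /v1/querybatch call replaces a /v1/query call per package. A minimal sketch of building the request body and reading the response; build_querybatch and vuln_ids_by_dependency are hypothetical helpers. OSV returns batch results in query order and includes only each vulnerability's id and modified timestamp, so fetch full records from /v1/vulns/{id} before normalizing them. Very large per-package result sets may also carry a next_page_token, which this sketch does not follow:

```python
def build_querybatch(dependencies, ecosystem="PyPI"):
    """Turn [(name, version), ...] into an OSV /v1/querybatch request body."""
    return {
        "queries": [
            {"package": {"name": name, "ecosystem": ecosystem}, "version": version}
            for name, version in dependencies
        ]
    }


def vuln_ids_by_dependency(dependencies, response):
    """Pair each dependency with the vulnerability IDs OSV returned for it.

    A query with no matches comes back as an empty object, hence .get().
    """
    return {
        f"{name}@{version}": [v["id"] for v in result.get("vulns", [])]
        for (name, version), result in zip(dependencies, response["results"])
    }
```

Send the body with httpx.post("https://api.osv.dev/v1/querybatch", json=body, timeout=30) and pass response.json() to vuln_ids_by_dependency.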

Tip

OSV queries are version-scoped. Unlike the NVD example above, OSV’s /v1/query API is designed around package+version (or commit) lookups. If you only have a package name and want to enumerate all historical advisories, use OSV’s data exports, /v1/vulns/{id} after collecting IDs elsewhere, or a tool such as osv-scanner.

Note

Metadata schema asymmetry. The OSV normalizer above does not emit severity or cvss_score metadata fields, because OSV’s severity.score is a CVSS vector string rather than a numeric base score. If you mix NVD and OSV documents in the same collection, query_with_filters(severity=...) and min_cvss=... will silently exclude the OSV-sourced ones. Either parse the CVSS vector into a numeric score (e.g., with the cvss PyPI package) or maintain two collections and route queries accordingly.
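If you would rather not add the cvss dependency, CVSS v3.x base scores can be computed from the vector with the formulas in the FIRST CVSS v3.1 specification. A minimal sketch covering base metrics only: it ignores temporal and environmental metrics, rejects v2 and v4 vectors, and applies v3.1 rounding to v3.0 vectors as well (the base formulas are the same apart from the rounding function, so results rarely differ). cvss3_base_score and severity_from_score are illustrative names, not part of any library:

```python
import math

# CVSS v3.1 base metric weights.
WEIGHTS = {
    "AV": {"N": 0.85, "A": 0.62, "L": 0.55, "P": 0.2},
    "AC": {"L": 0.77, "H": 0.44},
    "UI": {"N": 0.85, "R": 0.62},
    "CIA": {"H": 0.56, "L": 0.22, "N": 0.0},
}
# Privileges Required depends on Scope: (unchanged, changed).
PR_WEIGHTS = {"N": (0.85, 0.85), "L": (0.62, 0.68), "H": (0.27, 0.5)}


def roundup(value):
    """v3.1 Roundup: smallest one-decimal number >= value, float-safe."""
    int_input = round(value * 100000)
    if int_input % 10000 == 0:
        return int_input / 100000.0
    return (math.floor(int_input / 10000) + 1) / 10.0


def cvss3_base_score(vector):
    """Base score of a 'CVSS:3.x/AV:../AC:../...' vector string."""
    parts = vector.split("/")
    if not parts[0].startswith("CVSS:3"):
        raise ValueError(f"not a CVSS v3 vector: {vector}")
    m = dict(p.split(":", 1) for p in parts[1:])  # extra metrics are ignored
    changed = m["S"] == "C"

    # Impact sub-score from Confidentiality, Integrity, Availability.
    iss = 1 - (
        (1 - WEIGHTS["CIA"][m["C"]])
        * (1 - WEIGHTS["CIA"][m["I"]])
        * (1 - WEIGHTS["CIA"][m["A"]])
    )
    if changed:
        impact = 7.52 * (iss - 0.029) - 3.25 * (iss - 0.02) ** 15
    else:
        impact = 6.42 * iss
    exploitability = (
        8.22 * WEIGHTS["AV"][m["AV"]] * WEIGHTS["AC"][m["AC"]]
        * PR_WEIGHTS[m["PR"]][1 if changed else 0] * WEIGHTS["UI"][m["UI"]]
    )
    if impact <= 0:
        return 0.0
    if changed:
        return roundup(min(1.08 * (impact + exploitability), 10))
    return roundup(min(impact + exploitability, 10))


def severity_from_score(score):
    """Map a base score to the CVSS v3 qualitative severity rating."""
    if score == 0:
        return "NONE"
    if score < 4.0:
        return "LOW"
    if score < 7.0:
        return "MEDIUM"
    if score < 9.0:
        return "HIGH"
    return "CRITICAL"
```

In fetch_osv_vulns, when a CVSS_V3 entry is found, you could then add "cvss_score": cvss3_base_score(severity["score"]) and the matching "severity" to the metadata, so OSV documents pass the same filters as NVD ones.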

Note

Pagination matters. OSV may return a next_page_token for larger result sets. The loop above keeps following that token until exhaustion so you do not silently miss vulnerabilities for heavily used packages.

python osv_fetcher.py

The output follows the same {id, text, metadata} format as the NVD fetcher, so you can feed it into live_ingest.py with minimal changes. Add a --source osv flag to your ingestion script to choose between feeds.
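A sketch of that flag using argparse; the flag names are assumptions, not an existing interface. Switching live_ingest.py from positional arguments to flags also means updating the cron and systemd commands in Step 6:

```python
import argparse


def parse_args(argv=None):
    """Hypothetical CLI for live_ingest.py that chooses between NVD and OSV."""
    parser = argparse.ArgumentParser(description="Ingest vulnerability feeds")
    parser.add_argument("--source", choices=["nvd", "osv"], default="nvd")
    parser.add_argument("--days", type=int, default=7,
                        help="NVD: how many days of changes to fetch")
    parser.add_argument("--keyword", help="NVD: optional keywordSearch filter")
    parser.add_argument("--package", help="OSV: package name")
    parser.add_argument("--version", dest="pkg_version", help="OSV: package version")
    parser.add_argument("--ecosystem", default="PyPI", help="OSV: ecosystem")
    args = parser.parse_args(argv)
    # OSV lookups are version-scoped, so both values are required.
    if args.source == "osv" and not (args.package and args.pkg_version):
        parser.error("--source osv requires --package and --version")
    return args
```

After parsing, branch on args.source: call fetch_new_cves(days_back=args.days, keyword=args.keyword) for NVD, or fetch_osv_vulns(args.package, args.pkg_version, ecosystem=args.ecosystem) for OSV, then pass the documents through the same chunk-and-embed loop.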

Step 6: Schedule automated ingestion

For a pipeline that stays current, run the fetcher on a schedule. A cron job is the simplest option:

crontab -e

Add a line to run daily at 6 AM:

0 6 * * * cd /path/to/rag-pipeline && /path/to/.venv/bin/python live_ingest.py 1 >> /path/to/rag-pipeline/ingest.log 2>&1

The 1 argument tells the fetcher to look back 1 day, matching the daily schedule. Use the full path to the virtual environment’s Python to avoid environment issues.
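A fixed 1-day lookback has a gap: if a run is skipped (machine off, NVD outage), changes made during the missed day are never fetched. Systemd's Persistent=true setting reruns a missed timer, but still with a 1-day window. One option is to derive the lookback from the last_fetch timestamp the ingestion script already writes to the state file. A sketch; days_back_since is a hypothetical helper:

```python
import math
from datetime import datetime, timezone


def days_back_since(last_fetch_iso, now=None, default=7, max_days=120):
    """Choose a lookback that covers every day since the last successful run.

    Rounds up and adds one day of overlap so edits near the boundary are not
    missed; re-fetching a few already-current CVEs is cheap because the state
    file skips them. Capped at NVD's 120-day range limit.
    """
    if not last_fetch_iso:
        return default
    now = now or datetime.now(timezone.utc)
    last = datetime.fromisoformat(last_fetch_iso)
    if last.tzinfo is None:
        last = last.replace(tzinfo=timezone.utc)
    days = math.ceil((now - last).total_seconds() / 86400) + 1
    return max(1, min(days, max_days))
```

For example, call ingest_cves(days_back=days_back_since(load_state().get("last_fetch"))) instead of passing a fixed 1.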

For systemd, create a service unit and a timer that triggers it:

# /etc/systemd/system/rag-ingest.service
[Unit]
Description=RAG Pipeline CVE Ingestion

[Service]
Type=oneshot
WorkingDirectory=/path/to/rag-pipeline
ExecStart=/path/to/.venv/bin/python live_ingest.py 1
User=your-username

# /etc/systemd/system/rag-ingest.timer
[Unit]
Description=Daily RAG CVE ingestion

[Timer]
OnCalendar=*-*-* 06:00:00
Persistent=true

[Install]
WantedBy=timers.target

Reload systemd so it picks up the new units, then enable the timer:

sudo systemctl daemon-reload
sudo systemctl enable --now rag-ingest.timer

Note

Treat your state file like data. The ingestion_state.json file records what has been ingested and when each CVE last changed. If you delete it, the next run treats every CVE in the lookback window as new and re-embeds it. Because the chunk IDs are deterministic, ChromaDB’s add() skips IDs it already stores rather than creating duplicates, but that also means a CVE whose text changed keeps its old chunks, and you still waste time and compute. Back it up alongside your chroma_db/ directory.
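Since the state file matters this much, it is worth writing it atomically: a crash or full disk in the middle of json.dump would otherwise leave a truncated file that load_state cannot parse. A sketch of a drop-in alternative to save_state using the write-to-temp-then-os.replace pattern; save_state_atomic is a hypothetical name:

```python
import json
import os
import tempfile


def save_state_atomic(state, path="ingestion_state.json"):
    """Write the state file so a crash never leaves it half-written."""
    directory = os.path.dirname(os.path.abspath(path))
    # Write to a temp file in the same directory (same filesystem) ...
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".state-", suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes reach the disk
        # ... then swap it into place in a single atomic step.
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)
        raise
```

Readers see either the old file or the new one, never a partial write, so load_state keeps working after an interrupted run.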

Common mistakes

Hitting API rate limits. The NVD allows 5 requests per 30 seconds without a key. If you are fetching thousands of CVEs, you will hit this quickly. The fetcher includes time.sleep() between pages, but bursts from other tools using the same IP can still trigger limits. Use an API key for production.
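Fixed sleeps do not help once a 403 has already arrived. A sketch of retrying with exponential backoff and jitter; get_with_backoff is a hypothetical helper, and treating 429 and 503 as retryable alongside NVD's 403 is an assumption you can tune:

```python
import random
import time

RETRYABLE_STATUS = {403, 429, 503}  # NVD signals rate limiting with 403


def get_with_backoff(do_request, max_attempts=5, base_delay=6.0, sleep=time.sleep):
    """Call do_request() until it returns a non-retryable response.

    do_request is any zero-argument callable returning an object with a
    .status_code attribute, e.g. lambda: httpx.get(url, params=p, headers=h).
    Waits base_delay * 2**attempt seconds (plus up to 1 s of jitter) between
    attempts and returns the last response if every attempt is retryable.
    """
    for attempt in range(max_attempts):
        response = do_request()
        if response.status_code not in RETRYABLE_STATUS:
            return response
        if attempt < max_attempts - 1:
            sleep(base_delay * 2 ** attempt + random.uniform(0, 1))
    return response
```

In fetch_cves, wrap the httpx.get call with it and keep response.raise_for_status() afterwards, so a persistent failure still raises.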

Stale embeddings after CVE updates. NVD often revises CVE descriptions and CVSS scores after initial publication (adding analysis, correcting severity). The fetcher handles this by querying on lastModified and storing each CVE's last-seen timestamp, but it only sees changes inside the days_back window. If a scheduled run is skipped, edits made during the gap are missed unless the next run looks back far enough to cover it.

Context window bloat from large corpora. After a month of daily ingestion, you might have thousands of CVEs in ChromaDB. Retrieving 5 chunks from 5 different CVEs produces a large context. Use metadata filtering to narrow results before retrieval, not after.

Forgetting to handle pagination. The NVD API returns at most 2,000 results per request. The fetcher handles pagination with startIndex, but if you modify the code, make sure the loop runs until startIndex + resultsPerPage >= totalResults.

Next steps