The CleanStart Source Intelligence Core SDK for Python provides programmatic access to vulnerability data, package metadata, and supply chain intelligence. Query CVEs, track fix status, and monitor dependencies in your codebase.
Installation
Install the SDK from PyPI using `pip install cleanstart-intelligence`. After installation, verify it was successful by running `python -c "import cleanstart_intelligence; print(cleanstart_intelligence.__version__)"`.
Configuration
Set your API key as an environment variable with `export CLEANSTART_API_KEY="your-api-key-here"`. Alternatively, pass it directly when initializing the client as shown below.
```python
from cleanstart_intelligence import IntelligenceClient

client = IntelligenceClient(api_key="your-api-key-here")
```
To obtain an API key, visit dashboard.cleanstart.com and follow these steps. First, log in to your account. Next, navigate to Settings > API Keys. Then click Generate New Key. Select the scope Source Intelligence Read (minimum). Finally, copy the key and store it securely.
First Query
Query vulnerabilities by CVE ID using the code below.
```python
from cleanstart_intelligence import IntelligenceClient

# Initialize client
client = IntelligenceClient()

# Query CVE
result = client.vulnerabilities.get_by_cve_id("CVE-2024-1234")

print(result)
# Output:
# {
#   "cve_id": "CVE-2024-1234",
#   "description": "SQL injection in ...",
#   "severity": "HIGH",
#   "cvss_score": 8.6,
#   "affected_packages": [
#     {
#       "name": "django",
#       "ecosystem": "pypi",
#       "versions": ["<4.2.8", "<5.0.1"]
#     }
#   ],
#   "fix_available": true,
#   "disclosure_date": "2024-01-15"
# }
```
Querying Vulnerabilities
By CVE ID
```python
# Get vulnerability details by CVE ID
vuln = client.vulnerabilities.get_by_cve_id("CVE-2024-1234")

print(f"Severity: {vuln['severity']}")
print(f"CVSS Score: {vuln['cvss_score']}")
print(f"Description: {vuln['description']}")
```
By Package Name
```python
# Get vulnerabilities for a specific package
vulns = client.vulnerabilities.get_by_package(
    package_name="django",
    ecosystem="pypi",
    limit=10
)

for vuln in vulns['results']:
    print(f"{vuln['cve_id']}: {vuln['description']}")
```
By Ecosystem
```python
# Get all vulnerabilities for an ecosystem
vulns = client.vulnerabilities.get_by_ecosystem(
    ecosystem="npm",
    severity="HIGH",
    limit=50
)

print(f"Found {len(vulns['results'])} HIGH severity vulnerabilities")
for vuln in vulns['results']:
    print(f"- {vuln['cve_id']}: {vuln['affected_packages']}")
```
Filtering and Pagination
```python
# Query with multiple filters
vulns = client.vulnerabilities.search(
    severity_min="HIGH",
    cvss_score_min=7.0,
    has_fix=True,
    published_after="2024-01-01",
    page=1,
    limit=20
)

print(f"Total results: {vulns['total']}")
print(f"Page: {vulns['page']} of {vulns['pages']}")

# Iterate through all pages
for page in range(1, vulns['pages'] + 1):
    page_results = client.vulnerabilities.search(
        severity_min="HIGH",
        page=page
    )
    # Process results
```
Package and Version Information
Check Package Metadata
```python
# Get package details
pkg = client.packages.get_metadata(
    name="requests",
    ecosystem="pypi"
)

print(f"Name: {pkg['name']}")
print(f"Latest version: {pkg['latest_version']}")
print(f"Repository: {pkg['repository_url']}")
print(f"Description: {pkg['description']}")
print(f"License: {pkg['license']}")
```
Check Fix Status
```python
# Check if a vulnerability is fixed in a specific version
is_fixed = client.packages.is_vulnerability_fixed(
    package_name="django",
    ecosystem="pypi",
    version="4.2.8",
    cve_id="CVE-2024-1234"
)

if is_fixed:
    print("Vulnerability is fixed in Django 4.2.8")
else:
    print("Upgrade required")
```
Get Version History
```python
# Get all versions and their vulnerability status
versions = client.packages.get_versions(
    package_name="requests",
    ecosystem="pypi"
)

for version in versions['results']:
    if version['vulnerabilities']:
        print(f"{version['version']}: {len(version['vulnerabilities'])} vulnerabilities")
    else:
        print(f"{version['version']}: Clean")
```
Find Latest Safe Version
```python
# Find the latest version without known vulnerabilities
safe_version = client.packages.get_latest_safe_version(
    package_name="requests",
    ecosystem="pypi"
)

if safe_version:
    print(f"Latest safe version: {safe_version}")
    print(f"Current version: X.Y.Z")
    print(f"Upgrade available: {safe_version > 'X.Y.Z'}")
else:
    print("No safe version found")
```
Dependency Graph Traversal
Get Package Dependencies
```python
# Get direct dependencies
deps = client.dependencies.get_dependencies(
    package_name="django",
    version="4.2.0",
    ecosystem="pypi"
)

for dep in deps['results']:
    print(f"- {dep['package_name']} {dep['version_spec']}")
```
Get Transitive Dependencies
```python
# Get all transitive dependencies
all_deps = client.dependencies.get_transitive_dependencies(
    package_name="requests",
    version="2.31.0",
    ecosystem="pypi"
)

for dep in all_deps['results']:
    path = " -> ".join(dep['path'])
    print(f"{path} (version: {dep['version']})")
```
Dependency Vulnerability Analysis
```python
# Find all vulnerabilities in dependency tree
vulns_in_deps = client.dependencies.find_vulnerabilities_in_tree(
    package_name="django",
    version="4.2.0",
    ecosystem="pypi"
)

for vuln in vulns_in_deps['results']:
    print(f"Vulnerability: {vuln['cve_id']}")
    print(f"Path: {' -> '.join(vuln['path'])}")
    print(f"Severity: {vuln['severity']}")
    print()
```
Exploit Prediction Scoring (EPSS)
Get EPSS Score
```python
# Get exploit probability score
epss = client.vulnerabilities.get_epss(cve_id="CVE-2024-1234")

print(f"EPSS Score: {epss['score']}")  # 0.0 to 1.0
print(f"Percentile: {epss['percentile']}")  # 0-100
print(f"Severity: {epss['severity']}")  # None, Low, Medium, High, Critical

if epss['score'] > 0.5:
    print("High exploit probability - prioritize patching")
```
Batch EPSS Lookup
```python
# Get EPSS for multiple CVEs
cves = ["CVE-2024-1234", "CVE-2024-5678", "CVE-2024-9999"]
epss_data = client.vulnerabilities.get_epss_batch(cve_ids=cves)

for cve_id, epss in epss_data.items():
    print(f"{cve_id}: {epss['score']:.2f} (percentile: {epss['percentile']})")
```
Advisory Subscriptions
Get Security Advisories
```python
# Get advisories for a package
advisories = client.advisories.get_by_package(
    package_name="django",
    ecosystem="pypi"
)

for advisory in advisories['results']:
    print(f"Title: {advisory['title']}")
    print(f"Description: {advisory['description']}")
    print(f"Affected versions: {advisory['affected_versions']}")
    print(f"Fix version: {advisory['fixed_version']}")
    print(f"Published: {advisory['published_date']}")
    print()
```
Subscribe to Advisories (Webhook)
```python
# Register webhook for package advisories
webhook = client.webhooks.register(
    url="https://your-app.example.com/webhooks/advisories",
    events=["advisory.created", "advisory.updated"],
    packages=[
        {"name": "django", "ecosystem": "pypi"},
        {"name": "requests", "ecosystem": "pypi"}
    ]
)

print(f"Webhook ID: {webhook['id']}")
print(f"Endpoint: {webhook['url']}")
print(f"Created: {webhook['created_at']}")
```
Webhook Payload Example
Your endpoint will receive POST requests with this structure:
```json
{
  "event": "advisory.created",
  "timestamp": "2024-01-15T10:30:00Z",
  "advisory": {
    "id": "GHSA-1234-5678-9999",
    "cve_id": "CVE-2024-1234",
    "title": "SQL injection in Django ORM",
    "description": "...",
    "severity": "HIGH",
    "affected_packages": [
      {
        "name": "django",
        "ecosystem": "pypi",
        "affected_versions": ["<4.2.8", "<5.0.1"]
      }
    ],
    "fixed_version": "4.2.8",
    "advisory_url": "https://github.com/advisories/GHSA-1234-5678-9999"
  }
}
```
Verify Webhook Signature
```python
import hmac
import hashlib
import json
import os

def verify_webhook(secret, payload, signature):
    """Verify webhook signature"""
    expected = hmac.new(
        secret.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(signature, expected)

# In your webhook handler
@app.post("/webhooks/advisories")
def handle_advisory(request):
    payload = request.body
    signature = request.headers.get("X-Signature")

    if not verify_webhook(os.environ["WEBHOOK_SECRET"], payload, signature):
        return {"error": "Invalid signature"}, 401

    advisory = json.loads(payload)
    # Process advisory
    return {"status": "received"}
```
Response Format Examples
All API responses follow a consistent JSON structure:
Success Response
```json
{
  "success": true,
  "data": {
    "cve_id": "CVE-2024-1234",
    "severity": "HIGH",
    "description": "..."
  },
  "meta": {
    "request_id": "req-abc123def456",
    "timestamp": "2024-01-15T10:30:00Z"
  }
}
```
Paginated Response
```json
{
  "success": true,
  "data": [
    {
      "cve_id": "CVE-2024-1234",
      "severity": "HIGH"
    },
    {
      "cve_id": "CVE-2024-5678",
      "severity": "MEDIUM"
    }
  ],
  "pagination": {
    "page": 1,
    "limit": 20,
    "total": 542,
    "pages": 28,
    "has_next": true
  },
  "meta": {
    "request_id": "req-abc123def456",
    "timestamp": "2024-01-15T10:30:00Z"
  }
}
```
Error Response
```json
{
  "success": false,
  "error": {
    "code": "NOT_FOUND",
    "message": "CVE-2024-9999 not found",
    "details": "This CVE ID does not exist in our database"
  },
  "meta": {
    "request_id": "req-abc123def456",
    "timestamp": "2024-01-15T10:30:00Z"
  }
}
```
Error Handling
Try-Except Patterns
```python
from cleanstart_intelligence import (
    IntelligenceClient,
    APIError,
    NotFoundError,
    RateLimitError,
    ValidationError
)

client = IntelligenceClient()

try:
    vuln = client.vulnerabilities.get_by_cve_id("CVE-2024-1234")
except NotFoundError as e:
    print(f"CVE not found: {e.message}")
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after} seconds")
except ValidationError as e:
    print(f"Invalid input: {e.message}")
except APIError as e:
    print(f"API error: {e.message}")
```
Specific Error Codes
| Error Code | HTTP Status | Meaning |
|---|---|---|
| NOT_FOUND | 404 | Resource doesn't exist |
| INVALID_INPUT | 400 | Validation failed |
| UNAUTHORIZED | 401 | API key invalid or missing |
| FORBIDDEN | 403 | Insufficient permissions |
| RATE_LIMITED | 429 | Too many requests |
| SERVER_ERROR | 500 | Internal server error |
Retry Logic
```python
import time
from cleanstart_intelligence import RateLimitError

def query_with_retry(client, cve_id, max_retries=3):
    for attempt in range(max_retries):
        try:
            return client.vulnerabilities.get_by_cve_id(cve_id)
        except RateLimitError as e:
            if attempt < max_retries - 1:
                wait_time = e.retry_after or (2 ** attempt)
                print(f"Rate limited. Waiting {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                raise
        except Exception as e:
            print(f"Error: {e}")
            raise
```
Bulk Operations
Batch Query Multiple CVEs
```python
cves = [
    "CVE-2024-1234",
    "CVE-2024-5678",
    "CVE-2024-9999",
    "CVE-2024-1111",
    "CVE-2024-2222"
]

results = client.vulnerabilities.get_batch(cve_ids=cves)

for cve_id, vuln in results.items():
    if vuln:
        print(f"{cve_id}: {vuln['severity']}")
    else:
        print(f"{cve_id}: Not found")
```
Scan Local Requirements File
```python
def scan_requirements(requirements_file):
    """Scan requirements.txt for vulnerabilities"""
    vulns_found = []

    with open(requirements_file) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#'):
                continue

            # Parse package name and version
            parts = line.split('==')
            if len(parts) != 2:
                continue

            package_name, version = parts

            # Check for vulnerabilities
            pkg_vulns = client.vulnerabilities.get_by_package(
                package_name=package_name,
                ecosystem="pypi"
            )

            for vuln in pkg_vulns['results']:
                # Check if current version is affected
                if _is_version_affected(version, vuln['affected_versions']):
                    vulns_found.append({
                        'package': package_name,
                        'version': version,
                        'cve_id': vuln['cve_id'],
                        'severity': vuln['severity']
                    })

    return vulns_found

vulns = scan_requirements('requirements.txt')
for vuln in vulns:
    print(f"{vuln['package']}: {vuln['cve_id']} ({vuln['severity']})")
```
Logging and Debugging
Enable Debug Logging
```python
import logging

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('cleanstart_intelligence').setLevel(logging.DEBUG)

client = IntelligenceClient()
# All API calls will now log request/response details
```
Request Inspection
```python
from cleanstart_intelligence import IntelligenceClient

client = IntelligenceClient()

# Access last request details
response = client.vulnerabilities.get_by_cve_id("CVE-2024-1234")
print(f"Request ID: {response['meta']['request_id']}")
print(f"Response time: {response['meta']['duration_ms']}ms")
```
Rate Limiting and Quotas
Default rate limits are 1,000 requests per minute, 10,000 requests per hour, and 100,000 requests per day.
Check Rate Limit Status
```python
# Rate limit info available after each request
response = client.vulnerabilities.get_by_cve_id("CVE-2024-1234")

# Access rate limit headers
print(f"Remaining this minute: {response['meta']['rate_limit_remaining']}")
print(f"Limit resets at: {response['meta']['rate_limit_reset']}")
```
Caching
The SDK includes built-in caching to reduce API calls:
```python
from cleanstart_intelligence import IntelligenceClient

# Enable caching (default: 3600 seconds = 1 hour)
client = IntelligenceClient(cache_ttl=3600)

# First call hits API
vuln1 = client.vulnerabilities.get_by_cve_id("CVE-2024-1234")

# Second call uses cache (within TTL)
vuln2 = client.vulnerabilities.get_by_cve_id("CVE-2024-1234")

# Clear cache if needed
client.clear_cache()

# Disable cache for specific queries
vuln3 = client.vulnerabilities.get_by_cve_id(
    "CVE-2024-1234",
    use_cache=False
)
```
Examples
Security Scan Integration
```python
import json
from cleanstart_intelligence import IntelligenceClient

def scan_project(requirements_file):
    """Scan Python project for vulnerabilities"""
    client = IntelligenceClient()
    vulns = []

    with open(requirements_file) as f:
        for line in f:
            if not line.strip() or line.startswith('#'):
                continue

            parts = line.strip().split('==')
            if len(parts) != 2:
                continue

            pkg_name, version = parts

            # Check package for vulnerabilities
            result = client.vulnerabilities.get_by_package(
                package_name=pkg_name,
                ecosystem="pypi"
            )

            for vuln in result['results']:
                vulns.append({
                    'package': pkg_name,
                    'version': version,
                    'cve': vuln['cve_id'],
                    'severity': vuln['severity'],
                    'fix_version': vuln.get('fixed_version'),
                    'epss_score': vuln.get('epss_score')
                })

    return sorted(vulns, key=lambda x: ['LOW', 'MEDIUM', 'HIGH', 'CRITICAL'].index(x['severity']), reverse=True)

# Run scan
vulns = scan_project('requirements.txt')

# Output report
print(f"Found {len(vulns)} vulnerabilities\n")
for vuln in vulns:
    print(f"{vuln['package']} {vuln['version']}")
    print(f" CVE: {vuln['cve']}")
    print(f" Severity: {vuln['severity']}")
    print(f" Fix: Upgrade to {vuln['fix_version']}")
    if vuln['epss_score']:
        print(f" EPSS: {vuln['epss_score']:.2f}")
    print()
```
Automated Patching Check
```python
def check_patch_availability(package_name, current_version, ecosystem="pypi"):
    """Check if a patch is available for a package"""
    client = IntelligenceClient()

    # Get package info
    pkg = client.packages.get_metadata(
        name=package_name,
        ecosystem=ecosystem
    )

    # Compare versions
    latest = pkg['latest_version']
    current_tuple = tuple(map(int, current_version.split('.')))
    latest_tuple = tuple(map(int, latest.split('.')))

    if latest_tuple > current_tuple:
        # Get vulnerabilities fixed in newer versions
        fixed_vulns = client.packages.get_versions(
            package_name=package_name,
            ecosystem=ecosystem
        )

        critical_fixes = [
            v for v in fixed_vulns['results']
            if v['version'] == latest and v['vulnerabilities']
        ]

        return {
            'update_available': True,
            'current': current_version,
            'latest': latest,
            'critical_fixes': len(critical_fixes)
        }
    else:
        return {'update_available': False}

# Check updates
result = check_patch_availability('django', '4.2.0')
print(f"Update available: {result['update_available']}")
if result['update_available']:
    print(f"Upgrade to: {result['latest']}")
    print(f"Critical fixes: {result['critical_fixes']}")
```
Next Steps
- API Reference: See Source Intelligence Core API docs for complete endpoint documentation (internal use).
- Authentication: Learn about API key management.
- Webhooks: Set up security advisory subscriptions.
- Examples: Explore integration samples for your use case.
