threat intelligence, production

Azure DevOps Security Alert Agent

11/15/2024
15 stars
Python, Azure DevOps API, SQLite, OAuth 2.0, REST APIs, YAML

A Python tool that collects, analyzes, and manages Advanced Security alerts from Azure DevOps repositories. It gives security teams centralized visibility into their organization's security posture across multiple repositories and projects.

Project Overview

Modern software development organizations often have hundreds of repositories spread across multiple Azure DevOps projects, which makes it hard to maintain visibility into security alerts and vulnerabilities. The Azure DevOps Security Alert Agent addresses this by automatically collecting security alerts from across the entire organization and supporting filtering, grouping, trend analysis, and reporting on the collected data.

The tool gathers alert data through the Azure DevOps Advanced Security API, stores it in a SQLite database, and supports analysis that helps security teams prioritize remediation and track security trends over time.

Core Features

Alert Collection and Management

Comprehensive Alert Retrieval

# Example: Collecting alerts from multiple repositories
from src.client.azure_devops_client import AzureDevOpsClient
from src.models.alert import Alert

# `config` and `storage` are assumed to be created during application setup
client = AzureDevOpsClient(config)
total_alerts = 0

# Collect alerts from all repositories in an organization
for project in client.get_projects():
    for repository in client.get_repositories(project.id):
        alerts = client.get_security_alerts(project.id, repository.id)

        for alert_data in alerts:
            alert = Alert.from_api_response(alert_data)
            storage.save_alert(alert)

        # Keep a running total; `alerts` only holds the current repository
        total_alerts += len(alerts)

print(f"Collected {total_alerts} security alerts")

Rich Metadata Extraction

  • Alert severity levels (Critical, High, Medium, Low)
  • Alert states (Active, Dismissed, Fixed)
  • Vulnerability types (Code Quality, Secret Scanning, Dependency Vulnerabilities)
  • Repository and project context
  • Timeline and historical data
  • Remediation status tracking
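
The fields listed above could be captured in a small data model. The sketch below is one assumption about how `Alert.from_api_response` might look; the payload keys (`alertId`, `alertType`, `firstSeenDate`, `fixedDate`) are illustrative and should be checked against the actual API response before use.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Optional


def _parse_ts(value: Optional[str]) -> Optional[datetime]:
    """Parse an ISO 8601 timestamp, tolerating a trailing 'Z'."""
    if not value:
        return None
    # Assumes at most microsecond precision in the fractional seconds.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


@dataclass
class Alert:
    id: str
    severity: str
    state: str
    alert_type: str
    title: str
    created_date: Optional[datetime] = None
    fixed_date: Optional[datetime] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_api_response(cls, data: Dict[str, Any]) -> "Alert":
        # Key names below are assumptions, not a confirmed payload schema.
        known = {"alertId", "severity", "state", "alertType", "title",
                 "firstSeenDate", "fixedDate"}
        return cls(
            id=str(data["alertId"]),
            severity=str(data.get("severity", "")).capitalize(),
            state=str(data.get("state", "")).capitalize(),
            alert_type=str(data.get("alertType", "")),
            title=str(data.get("title", "")),
            created_date=_parse_ts(data.get("firstSeenDate")),
            fixed_date=_parse_ts(data.get("fixedDate")),
            # Keep unrecognised keys so no context is lost.
            metadata={k: v for k, v in data.items() if k not in known},
        )


sample = {"alertId": 42, "severity": "critical", "state": "active",
          "alertType": "secret", "title": "Hard-coded token",
          "firstSeenDate": "2024-11-01T08:30:00Z", "repo": "payments-api"}
alert = Alert.from_api_response(sample)
```

Normalizing severity and state at the boundary keeps later filters such as `state == 'Active'` independent of how the API happens to capitalize its values.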

Advanced Analysis Engine

Multi-dimensional Filtering and Grouping

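# Example: Advanced alert analysis
from datetime import datetime, timedelta

from src.analysis.query import AlertAnalyzer

# `database_connection` is assumed to be an open connection to the alerts database
analyzer = AlertAnalyzer(database_connection)

# Analyze the last 30 days
end_date = datetime.now()
start_date = end_date - timedelta(days=30)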

# Filter alerts by multiple criteria
critical_alerts = analyzer.filter_alerts(
    severity=['Critical', 'High'],
    state='Active',
    repository_pattern='*-production-*',
    date_range=(start_date, end_date)
)

# Group alerts by various dimensions
severity_breakdown = analyzer.group_by_severity()
repository_distribution = analyzer.group_by_repository()
trend_analysis = analyzer.analyze_trends(period='monthly')

# Generate comprehensive reports
report = analyzer.generate_executive_summary()

Trend Analysis and Reporting

  • Historical trend analysis with configurable time periods
  • Alert velocity tracking (new alerts vs. resolved alerts)
  • Security posture improvement metrics
  • Repository-level security scoring
  • Executive dashboard data generation
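
Alert velocity, as listed above, compares how many alerts are opened against how many are resolved in each period. A minimal sketch, assuming each alert is reduced to a `(created_on, fixed_on)` pair; the function name and record shape are hypothetical, not the project's API:

```python
from collections import Counter
from datetime import date
from typing import Dict, Iterable, Optional, Tuple

# Each record is (created_on, fixed_on); fixed_on is None while the alert is open.
AlertDates = Tuple[date, Optional[date]]


def monthly_velocity(records: Iterable[AlertDates]) -> Dict[str, Dict[str, int]]:
    """Count new and resolved alerts per calendar month (YYYY-MM)."""
    opened: Counter = Counter()
    resolved: Counter = Counter()
    for created_on, fixed_on in records:
        opened[created_on.strftime("%Y-%m")] += 1
        if fixed_on is not None:
            resolved[fixed_on.strftime("%Y-%m")] += 1

    result: Dict[str, Dict[str, int]] = {}
    for month in sorted(set(opened) | set(resolved)):
        new, done = opened[month], resolved[month]
        # A positive net value means the backlog grew that month.
        result[month] = {"new": new, "resolved": done, "net": new - done}
    return result


records = [
    (date(2024, 9, 3), date(2024, 9, 20)),
    (date(2024, 9, 15), date(2024, 10, 2)),
    (date(2024, 10, 7), None),
]
velocity = monthly_velocity(records)
```

With this sample data, September shows two new alerts and one resolved, while October breaks even with one of each.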

Flexible Authentication

Multiple Authentication Methods

# config/config.yaml - Personal Access Token
authentication:
  method: "pat"
  personal_access_token: "${AZURE_DEVOPS_PAT}"
  organization: "your-organization"

# config/config.yaml - OAuth 2.0 (alternative to the PAT block above; keep only one authentication section)
authentication:
  method: "oauth"
  client_id: "${AZURE_CLIENT_ID}"
  client_secret: "${AZURE_CLIENT_SECRET}"
  tenant_id: "${AZURE_TENANT_ID}"
  organization: "your-organization"

Secure Credential Management

  • Environment variable support for sensitive data
  • OAuth 2.0 flow implementation for enterprise scenarios
  • Token refresh and rotation handling
  • Secure storage of authentication credentials
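
The `${AZURE_DEVOPS_PAT}` placeholders in the configuration imply that values are resolved from the environment at load time. The sketch below shows one way this could work, together with the Basic authentication header that Azure DevOps accepts for a PAT (empty user name, token as the password). The helper names `expand_env` and `pat_headers` are hypothetical.

```python
import base64
import os
import re
from typing import Dict, Mapping, Optional

_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Replace ${NAME} placeholders with values from the environment."""
    source = os.environ if env is None else env

    def _substitute(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in source:
            # Fail loudly rather than sending an empty credential.
            raise KeyError(f"Environment variable {name} is not set")
        return source[name]

    return _PLACEHOLDER.sub(_substitute, value)


def pat_headers(pat: str) -> Dict[str, str]:
    """Build Basic auth headers: empty user name, PAT as the password."""
    token = base64.b64encode(f":{pat}".encode("ascii")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


# Usage with an explicit mapping so the example does not depend on the shell.
pat = expand_env("${AZURE_DEVOPS_PAT}", {"AZURE_DEVOPS_PAT": "example-pat"})
headers = pat_headers(pat)
```

Raising on a missing variable surfaces misconfiguration at startup instead of as a confusing 401 response later.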

Technical Architecture

Core Components

Modular Architecture Design

src/
├── authentication/       # Authentication modules
│   ├── pat_auth.py      # Personal Access Token auth
│   └── oauth_auth.py    # OAuth 2.0 authentication
├── client/              # Azure DevOps API client
│   └── azure_devops_client.py
├── models/              # Data models
│   ├── alert.py         # Alert data structure
│   ├── repository.py    # Repository information
│   └── project.py       # Project metadata
├── storage/             # Data persistence layer
│   ├── sqlite_storage.py # SQLite database operations
│   └── schema.sql       # Database schema
├── analysis/            # Analysis and reporting
│   ├── query.py         # Alert querying and filtering
│   ├── trends.py        # Trend analysis
│   └── reports.py       # Report generation
└── enrichment/          # Data enrichment (extensible)
    └── __init__.py

Robust Data Storage

-- Database schema for alert storage
CREATE TABLE alerts (
    id TEXT PRIMARY KEY,
    repository_id TEXT NOT NULL,
    project_id TEXT NOT NULL,
    alert_type TEXT NOT NULL,
    severity TEXT NOT NULL,
    state TEXT NOT NULL,
    title TEXT NOT NULL,
    description TEXT,
    created_date DATETIME NOT NULL,
    updated_date DATETIME NOT NULL,
    dismissed_date DATETIME,
    fixed_date DATETIME,
    file_path TEXT,
    line_number INTEGER,
    metadata JSON
);

CREATE INDEX idx_severity ON alerts(severity);
CREATE INDEX idx_state ON alerts(state);
CREATE INDEX idx_repository ON alerts(repository_id);
CREATE INDEX idx_created_date ON alerts(created_date);

API Integration

Azure DevOps REST API Client

import requests


class AzureDevOpsClient:
    def __init__(self, config):
        # _setup_authentication (not shown) returns the PAT or OAuth helper
        self.auth = self._setup_authentication(config)
        self.base_url = f"https://dev.azure.com/{config.organization}"
        # Advanced Security alert endpoints are served from the advsec host
        self.alerts_url = f"https://advsec.dev.azure.com/{config.organization}"
        self.session = requests.Session()

    def get_security_alerts(self, project_id, repository_id):
        """Retrieve security alerts for a specific repository"""
        url = f"{self.alerts_url}/{project_id}/_apis/alert/repositories/{repository_id}/alerts"

        response = self.session.get(
            url,
            headers=self.auth.get_headers(),
            params={
                'api-version': '7.1-preview.1',
                'includeResolved': 'true'
            },
            timeout=30
        )

        # Fail fast on HTTP errors instead of parsing an error body
        response.raise_for_status()
        return response.json()['value']

    def get_alert_details(self, project_id, repository_id, alert_id):
        """Get detailed information for a specific alert"""
        url = f"{self.alerts_url}/{project_id}/_apis/alert/repositories/{repository_id}/alerts/{alert_id}"

        response = self.session.get(
            url,
            headers=self.auth.get_headers(),
            params={'api-version': '7.1-preview.1'},
            timeout=30
        )

        response.raise_for_status()
        return response.json()

Rate Limiting and Error Handling

  • Intelligent request throttling to respect API limits
  • Exponential backoff retry logic for transient failures
  • Comprehensive error logging and reporting
  • Graceful handling of authentication token expiration
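
The retry behaviour described above can be sketched as a small wrapper. This is a minimal illustration, not the project's implementation: the function name, the default exception types, and the 10% jitter are assumptions. A real client would also treat HTTP 429 and 5xx responses as retryable and honour a `Retry-After` header when the server sends one.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def with_backoff(
    func: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying transient failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error.
            # Delay doubles each attempt (1s, 2s, 4s, ...), capped at max_delay.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Up to 10% jitter spreads out retries from parallel workers.
            sleep(delay + random.uniform(0, delay * 0.1))
    # Only reached when max_attempts is less than 1.
    raise ValueError("max_attempts must be at least 1")


# Demonstration: a call that fails twice, then succeeds. The sleep function
# is replaced with list.append so the example records delays without waiting.
calls = {"count": 0}


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


delays: list = []
result = with_backoff(flaky, sleep=delays.append)
```

Injecting `sleep` keeps the wrapper easy to test; production code would leave the default in place.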

Analysis Capabilities

Security Metrics and KPIs

Alert Distribution Analysis

from datetime import datetime, timedelta


class SecurityMetrics:
    def __init__(self, storage):
        self.storage = storage
        
    def calculate_security_score(self, repository_id, time_period=30):
        """Calculate a security score for a repository"""
        alerts = self.storage.get_alerts_by_repository(
            repository_id, 
            days=time_period
        )
        
        # Weight alerts by severity
        severity_weights = {
            'Critical': 10,
            'High': 7,
            'Medium': 4,
            'Low': 1
        }
        
        total_weight = sum(
            severity_weights.get(alert.severity, 0) 
            for alert in alerts if alert.state == 'Active'
        )
        
        # Calculate score (0-100, where 100 is most secure)
        max_possible_score = len(alerts) * 10  # All critical
        if max_possible_score == 0:
            return 100
            
        score = max(0, 100 - (total_weight / max_possible_score * 100))
        return round(score, 2)
        
    def generate_trend_report(self, days=90):
        """Generate security trend analysis"""
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)
        
        daily_counts = {}
        current_date = start_date
        
        while current_date <= end_date:
            day_str = current_date.strftime('%Y-%m-%d')
            alerts = self.storage.get_alerts_by_date(current_date)
            
            daily_counts[day_str] = {
                'new_alerts': len([a for a in alerts if a.created_date.date() == current_date.date()]),
                'resolved_alerts': len([a for a in alerts if a.fixed_date and a.fixed_date.date() == current_date.date()]),
                'active_alerts': len([a for a in alerts if a.state == 'Active'])
            }
            
            current_date += timedelta(days=1)
            
        return daily_counts
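
A worked example makes the scoring arithmetic concrete. The standalone function below mirrors the formula in `calculate_security_score`, using `(severity, state)` pairs in place of alert objects as a simplification. Note that resolved alerts still count toward the worst-case denominator, so a repository with many fixed alerts scores higher than one with the same active alerts and no history.

```python
from typing import Iterable, Tuple

SEVERITY_WEIGHTS = {"Critical": 10, "High": 7, "Medium": 4, "Low": 1}


def security_score(alerts: Iterable[Tuple[str, str]]) -> float:
    """Score from 0 to 100 for (severity, state) pairs; 100 is most secure."""
    alerts = list(alerts)
    if not alerts:
        return 100.0
    # Only active alerts add weight.
    active_weight = sum(
        SEVERITY_WEIGHTS.get(severity, 0)
        for severity, state in alerts
        if state == "Active"
    )
    # Worst case: every alert in the window is an active Critical.
    worst_case = len(alerts) * max(SEVERITY_WEIGHTS.values())
    return round(max(0.0, 100 - active_weight / worst_case * 100), 2)


example = [
    ("Critical", "Active"),  # weight 10
    ("High", "Active"),      # weight 7
    ("Low", "Fixed"),        # resolved: no weight
    ("Low", "Fixed"),        # resolved: no weight
]
# active_weight = 17, worst_case = 4 * 10 = 40, score = 100 - 42.5
score = security_score(example)
```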

Executive Reporting

def generate_executive_summary(self, time_period=30):
    """Generate executive-level security summary"""
    summary = {
        'total_alerts': self.storage.count_alerts(),
        'active_alerts': self.storage.count_active_alerts(),
        'critical_alerts': self.storage.count_alerts_by_severity('Critical'),
        'top_vulnerable_repositories': self.get_top_vulnerable_repos(limit=10),
        'security_trends': self.generate_trend_report(time_period),
        'remediation_velocity': self.calculate_remediation_velocity(),
        'compliance_status': self.assess_compliance_status()
    }
    
    return summary

Customizable Analysis Queries

Flexible Query Interface

# Example: Custom analysis queries
from src.analysis.query import AlertQuery

# Find all critical secrets in production repositories
critical_secrets = AlertQuery(storage) \
    .filter_by_severity('Critical') \
    .filter_by_type('Secret Scanning') \
    .filter_by_repository_pattern('*-prod-*') \
    .filter_by_state('Active') \
    .execute()

# Analyze dependency vulnerabilities by project
dependency_analysis = AlertQuery(storage) \
    .filter_by_type('Dependency Vulnerability') \
    .group_by('project_id') \
    .aggregate('count', 'avg_severity') \
    .execute()

# Track remediation progress over time
remediation_trends = AlertQuery(storage) \
    .filter_by_date_range(start_date, end_date) \
    .filter_by_state('Fixed') \
    .group_by_date('week') \
    .count() \
    .execute()
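
A chainable interface like the one above is typically built by having each filter record a SQL clause and return `self`. The sketch below illustrates that pattern under stated assumptions: the class name `AlertQuerySketch` is hypothetical, it supports only three filters, and it matches patterns against `repository_id` for brevity, whereas the real tool presumably matches repository names.

```python
import sqlite3
from typing import Any, List, Tuple


class AlertQuerySketch:
    """Chainable filters compiled into one parameterised SQL statement."""

    def __init__(self, connection: sqlite3.Connection):
        self._conn = connection
        self._where: List[str] = []
        self._params: List[Any] = []

    def _add(self, clause: str, *params: Any) -> "AlertQuerySketch":
        self._where.append(clause)
        self._params.extend(params)
        return self  # Returning self is what makes the calls chainable.

    def filter_by_severity(self, severity: str) -> "AlertQuerySketch":
        return self._add("severity = ?", severity)

    def filter_by_state(self, state: str) -> "AlertQuerySketch":
        return self._add("state = ?", state)

    def filter_by_repository_pattern(self, pattern: str) -> "AlertQuerySketch":
        # SQLite's GLOB operator understands the * wildcard directly.
        return self._add("repository_id GLOB ?", pattern)

    def to_sql(self) -> Tuple[str, List[Any]]:
        sql = "SELECT id, repository_id, severity, state FROM alerts"
        if self._where:
            sql += " WHERE " + " AND ".join(self._where)
        return sql + " ORDER BY id", list(self._params)

    def execute(self) -> List[tuple]:
        sql, params = self.to_sql()
        return self._conn.execute(sql, params).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE alerts (id TEXT PRIMARY KEY, repository_id TEXT, "
    "severity TEXT, state TEXT)"
)
conn.executemany(
    "INSERT INTO alerts VALUES (?, ?, ?, ?)",
    [
        ("a1", "web-prod-eu", "Critical", "Active"),
        ("a2", "web-dev-eu", "Critical", "Active"),
        ("a3", "api-prod-us", "Low", "Active"),
    ],
)
rows = (
    AlertQuerySketch(conn)
    .filter_by_severity("Critical")
    .filter_by_repository_pattern("*-prod-*")
    .filter_by_state("Active")
    .execute()
)
```

Binding values as parameters instead of formatting them into the SQL string keeps user-supplied patterns from altering the query.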

Installation and Setup

Prerequisites

System Requirements

# Required software
- Python 3.8 or higher
- pip package manager
- Git

# Azure DevOps permissions
- Organization-level access to Advanced Security
- Project Collection Administrator or Project Administrator role
- Personal Access Token with appropriate scopes:
  - Code (read)
  - Project and team (read)
  - Advanced Security (read)

Quick Start Guide

1. Clone and Setup

# Clone the repository
git clone https://github.com/kfb1007/azure_devops_security_agent.git
cd azure_devops_security_agent

# Install dependencies
pip install -r requirements.txt

# Create configuration file
cp config/config.yaml.example config/config.yaml

2. Configuration

# config/config.yaml
azure_devops:
  organization: "your-organization-name"
  projects:
    - "project1"
    - "project2"
    # or use "*" for all projects

authentication:
  method: "pat"  # or "oauth"
  personal_access_token: "${AZURE_DEVOPS_PAT}"

storage:
  type: "sqlite"
  database_path: "data/alerts.db"

collection:
  include_resolved: true
  batch_size: 100
  rate_limit_requests_per_minute: 60

analysis:
  default_time_range_days: 30
  severity_weights:
    Critical: 10
    High: 7
    Medium: 4
    Low: 1

3. Set Environment Variables

# Set your Azure DevOps Personal Access Token
export AZURE_DEVOPS_PAT="your_personal_access_token_here"

# Optional: Set organization if not in config
export AZURE_DEVOPS_ORG="your-organization"

Running the Agent

Basic Usage

# Collect alerts from configured repositories
python main.py --config config/config.yaml

# Run analysis on collected data
python analyze.py --config config/config.yaml

# Generate reports
python reports.py --config config/config.yaml --format json --output reports/

Advanced Usage Options

# Collect alerts from specific project
python main.py --config config/config.yaml --project "MyProject"

# Collect only critical and high severity alerts
python main.py --config config/config.yaml --severity Critical,High

# Run incremental collection (only new/updated alerts)
python main.py --config config/config.yaml --incremental

# Generate trend analysis for last 90 days
python analyze.py --config config/config.yaml --trends --days 90
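
The collection flags shown above could be parsed with `argparse` along the following lines. The flag names come from the examples; the validation logic and defaults are assumptions, and the actual `main.py` may differ.

```python
import argparse
from typing import List

VALID_SEVERITIES = ("Critical", "High", "Medium", "Low")


def parse_severities(value: str) -> List[str]:
    """Turn 'Critical,High' into a validated list of severity names."""
    items = [item.strip().capitalize() for item in value.split(",") if item.strip()]
    unknown = [item for item in items if item not in VALID_SEVERITIES]
    if unknown:
        raise argparse.ArgumentTypeError(f"unknown severity: {', '.join(unknown)}")
    return items


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Collect Advanced Security alerts")
    parser.add_argument("--config", required=True, help="path to config.yaml")
    parser.add_argument("--project", help="limit collection to one project")
    parser.add_argument(
        "--severity",
        type=parse_severities,
        default=list(VALID_SEVERITIES),
        help="comma-separated severities to collect (default: all)",
    )
    parser.add_argument(
        "--incremental",
        action="store_true",
        help="collect only alerts created or updated since the last run",
    )
    return parser


# Parse an explicit argument list so the example does not read sys.argv.
args = build_parser().parse_args(
    ["--config", "config/config.yaml", "--severity", "Critical,High", "--incremental"]
)
```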

Extensibility and Customization

Adding Custom Analysis

Creating Custom Analyzers

# src/analysis/custom_analyzer.py
from .base_analyzer import BaseAnalyzer

class CustomSecurityAnalyzer(BaseAnalyzer):
    def analyze_repository_risk(self, repository_id):
        """Custom risk analysis for a repository"""
        alerts = self.storage.get_alerts_by_repository(repository_id)
        
        # Custom risk calculation logic
        risk_factors = {
            'critical_secrets': len([a for a in alerts if a.type == 'Secret Scanning' and a.severity == 'Critical']),
            'dependency_age': self.calculate_dependency_age(alerts),
            'remediation_velocity': self.calculate_remediation_velocity(alerts),
            'code_quality_issues': len([a for a in alerts if a.type == 'Code Quality'])
        }
        
        # Calculate composite risk score
        risk_score = self.calculate_composite_risk(risk_factors)
        
        return {
            'repository_id': repository_id,
            'risk_score': risk_score,
            'risk_factors': risk_factors,
            'recommendations': self.generate_recommendations(risk_factors)
        }

Data Enrichment Pipeline

Enrichment Framework

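# src/enrichment/enricher.py
import logging


class AlertEnricher:
    def __init__(self, config):
        self.config = config
        # Logger used by enrich_alert to report enrichment failures
        self.logger = logging.getLogger(__name__)
        self.enrichers = [
            CVEEnricher(),
            ThreatIntelligenceEnricher(),
            BusinessContextEnricher()
        ]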
        
    def enrich_alert(self, alert):
        """Apply all enrichment modules to an alert"""
        enriched_data = {}
        
        for enricher in self.enrichers:
            try:
                enrichment = enricher.enrich(alert)
                enriched_data.update(enrichment)
            except Exception as e:
                self.logger.warning(f"Enrichment failed for {enricher.__class__.__name__}: {e}")
                
        alert.enriched_data = enriched_data
        return alert

class CVEEnricher:
    def enrich(self, alert):
        """Enrich with CVE database information"""
        if alert.type == 'Dependency Vulnerability':
            cve_id = self.extract_cve_id(alert.description)
            if cve_id:
                cve_data = self.lookup_cve(cve_id)
                return {
                    'cve_score': cve_data.get('cvss_score'),
                    'cve_vector': cve_data.get('attack_vector'),
                    'exploitability': cve_data.get('exploitability_score')
                }
        return {}
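
`CVEEnricher` relies on an `extract_cve_id` helper that is not shown. A plausible sketch, assuming the CVE identifier appears verbatim somewhere in the alert description, is a regular expression over the standard `CVE-YYYY-NNNN` format:

```python
import re
from typing import Optional

# CVE IDs are "CVE-", a four-digit year, then a sequence of at least four digits.
_CVE_PATTERN = re.compile(r"\bCVE-\d{4}-\d{4,}\b", re.IGNORECASE)


def extract_cve_id(text: Optional[str]) -> Optional[str]:
    """Return the first CVE identifier found in text, upper-cased, or None."""
    if not text:
        return None
    match = _CVE_PATTERN.search(text)
    return match.group(0).upper() if match else None


found = extract_cve_id("Remote code execution (cve-2021-44228) in log4j-core")
missing = extract_cve_id("Outdated dependency with no advisory ID")
```

Returning `None` when nothing matches fits the existing `if cve_id:` guard, so alerts without a CVE reference simply skip the lookup.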

Integration Capabilities

CI/CD Pipeline Integration

Azure Pipelines Integration

# azure-pipelines.yml
trigger:
  branches:
    include:
    - main

stages:
- stage: SecurityMonitoring
  displayName: 'Security Alert Monitoring'
  jobs:
  - job: CollectAlerts
    displayName: 'Collect Security Alerts'
    steps:
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.9'
        
    - script: |
        pip install -r requirements.txt
        python main.py --config config/config.yaml --project $(System.TeamProject)
      displayName: 'Run Security Alert Collection'
      env:
        AZURE_DEVOPS_PAT: $(AZURE_DEVOPS_PAT)
        
    - task: PublishTestResults@2
      inputs:
        testResultsFormat: 'JUnit'
        testResultsFiles: 'reports/security-report.xml'
        testRunTitle: 'Security Alert Analysis'
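
The pipeline above publishes `reports/security-report.xml` in JUnit format, but the step that produces the file is not shown. One way to generate it, assuming alerts are available as dictionaries with `id`, `title`, `severity`, `state`, and `repository` keys (hypothetical names), is to map each alert to a test case and mark active alerts as failures:

```python
import xml.etree.ElementTree as ET
from typing import Dict, Iterable


def alerts_to_junit(
    alerts: Iterable[Dict[str, str]],
    suite_name: str = "Security Alert Analysis",
) -> str:
    """Render alerts as a JUnit <testsuite>; active alerts become failures."""
    alerts = list(alerts)
    active = [a for a in alerts if a["state"] == "Active"]
    suite = ET.Element(
        "testsuite",
        name=suite_name,
        tests=str(len(alerts)),
        failures=str(len(active)),
    )
    for a in alerts:
        case = ET.SubElement(
            suite,
            "testcase",
            classname=a["repository"],
            name=f"{a['id']}: {a['title']}",
        )
        if a["state"] == "Active":
            failure = ET.SubElement(
                case, "failure", message=f"{a['severity']} severity alert is active"
            )
            failure.text = a["title"]
    return ET.tostring(suite, encoding="unicode")


report_xml = alerts_to_junit([
    {"id": "a1", "title": "Hard-coded token", "severity": "Critical",
     "state": "Active", "repository": "payments-api"},
    {"id": "a2", "title": "Outdated lodash", "severity": "Medium",
     "state": "Fixed", "repository": "web-frontend"},
])
```

Written to `reports/security-report.xml`, output of this shape would let the Tests tab show one entry per alert, grouped by repository.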

External Tool Integration

SIEM Integration Example

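# src/integrations/siem_integration.py
import requests


class SIEMIntegration:
    def __init__(self, siem_config):
        self.siem_endpoint = siem_config['endpoint']
        self.api_key = siem_config['api_key']

    def send_critical_alerts(self, alerts):
        """Send critical alerts to SIEM system"""
        critical_alerts = [a for a in alerts if a.severity == 'Critical']

        for alert in critical_alerts:
            siem_event = {
                'timestamp': alert.created_date.isoformat(),
                'source': 'Azure DevOps Security Agent',
                'severity': alert.severity,
                'event_type': 'security_alert',
                'repository': alert.repository_id,
                'alert_type': alert.type,
                'description': alert.description,
                'remediation_required': True
            }

            self.send_event(siem_event)

    def send_event(self, event):
        """Post one event to the SIEM.

        Assumption: the SIEM exposes a JSON-over-HTTPS ingest endpoint
        with bearer-token authentication; adapt this to the actual product.
        """
        response = requests.post(
            self.siem_endpoint,
            json=event,
            headers={'Authorization': f'Bearer {self.api_key}'},
            timeout=10
        )
        response.raise_for_status()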

Performance and Scalability

Optimization Features

Efficient Data Collection

  • Incremental collection to minimize API calls
  • Parallel processing for multiple repositories
  • Configurable batch sizes for large organizations
  • Rate limiting to respect Azure DevOps API constraints
  • Resume capability for interrupted collection runs
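
The `rate_limit_requests_per_minute` setting suggests a simple client-side throttle. The sketch below spaces requests evenly across each minute; the class name and the injectable `clock` and `sleep` parameters are assumptions made so the behaviour can be demonstrated without real waiting.

```python
import time
from typing import Callable


class MinuteRateLimiter:
    """Space requests evenly so at most `per_minute` are sent each minute."""

    def __init__(
        self,
        per_minute: int,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ):
        if per_minute <= 0:
            raise ValueError("per_minute must be positive")
        self._interval = 60.0 / per_minute
        self._clock = clock
        self._sleep = sleep
        self._next_allowed = clock()

    def wait(self) -> float:
        """Block until the next request may be sent; return the time slept."""
        now = self._clock()
        delay = max(0.0, self._next_allowed - now)
        if delay:
            self._sleep(delay)
        # Schedule the following slot one interval after this one.
        self._next_allowed = max(now, self._next_allowed) + self._interval
        return delay


class FakeClock:
    """Deterministic clock for the demonstration: sleeping advances time."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


fake = FakeClock()
limiter = MinuteRateLimiter(120, clock=fake.time, sleep=fake.sleep)
# 120 requests per minute means one request every 0.5 seconds.
waits = [limiter.wait() for _ in range(3)]
```

In a collector, `limiter.wait()` would be called immediately before each API request; with parallel workers, the limiter would need a lock or one instance per worker with a divided budget.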

Storage Optimization

# Example: Optimized batch operations
class OptimizedStorage:
    def batch_insert_alerts(self, alerts, batch_size=1000):
        """Efficiently insert large numbers of alerts"""
        for i in range(0, len(alerts), batch_size):
            batch = alerts[i:i + batch_size]
            
            with self.connection:
                self.connection.executemany(
                    """
                    INSERT OR REPLACE INTO alerts 
                    (id, repository_id, project_id, alert_type, severity, state, 
                     title, description, created_date, updated_date, metadata)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    [alert.to_tuple() for alert in batch]
                )
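
One caveat with `INSERT OR REPLACE`: SQLite implements it as a delete followed by an insert, so columns omitted from the statement (here `dismissed_date`, `fixed_date`, `file_path`, and `line_number`) are reset to NULL each time an existing alert is refreshed. An alternative sketch using SQLite's upsert syntax (`ON CONFLICT ... DO UPDATE`, available since SQLite 3.24) updates only the listed columns. The table is trimmed to a few columns for brevity.

```python
import sqlite3

UPSERT = """
INSERT INTO alerts (id, severity, state, updated_date)
VALUES (?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
    severity = excluded.severity,
    state = excluded.state,
    updated_date = excluded.updated_date
"""

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE alerts (
        id TEXT PRIMARY KEY,
        severity TEXT NOT NULL,
        state TEXT NOT NULL,
        updated_date TEXT NOT NULL,
        fixed_date TEXT
    )
    """
)

# An alert stored earlier, including a fixed_date the batch does not carry.
with conn:
    conn.execute(
        "INSERT INTO alerts VALUES ('a1', 'High', 'Fixed', '2024-10-01', '2024-10-01')"
    )

# Refresh a1 and add a2 in one transaction.
with conn:
    conn.executemany(
        UPSERT,
        [
            ("a1", "Critical", "Fixed", "2024-11-01"),
            ("a2", "Low", "Active", "2024-11-01"),
        ],
    )

rows = conn.execute(
    "SELECT id, severity, fixed_date FROM alerts ORDER BY id"
).fetchall()
```

After the refresh, `a1` carries its new severity while keeping the previously stored `fixed_date`, which `INSERT OR REPLACE` would have cleared.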

Performance Metrics

| Operation | Performance | Scalability |
|-----------|-------------|-------------|
| Alert Collection | 100-500 alerts/minute | Supports 1000+ repositories |
| Database Operations | 10,000+ inserts/second | Multi-GB databases |
| Analysis Queries | <1 second for 100K alerts | Complex aggregations |
| Report Generation | <5 seconds | Executive summaries |
| Trend Analysis | <10 seconds for 1 year | Historical data analysis |

Use Cases and Benefits

Enterprise Security Management

Centralized Security Visibility

  • Organization-wide security alert dashboard
  • Cross-project vulnerability tracking
  • Executive reporting and KPI monitoring
  • Compliance audit trail maintenance

Risk-Based Prioritization

  • Severity-weighted security scoring
  • Business context integration
  • Resource allocation optimization
  • Remediation workflow automation

Development Team Enablement

Developer-Friendly Security

  • Repository-specific security insights
  • Integration with existing development workflows
  • Actionable remediation guidance
  • Progress tracking and gamification

Security Culture Building

  • Team-level security metrics
  • Trend visualization and improvement tracking
  • Security awareness through data-driven insights
  • Recognition of security achievements

Future Roadmap

Planned Enhancements

Advanced Analytics

  • Machine learning-based alert prioritization
  • Predictive vulnerability analysis
  • Anomaly detection in security patterns
  • Natural language processing for alert categorization

Enhanced Integrations

  • GitHub Advanced Security support
  • GitLab security integration
  • Microsoft Sentinel connector
  • Slack/Teams notification bots

Enterprise Features

  • Multi-tenant organization support
  • Advanced role-based access control
  • Custom dashboard builder
  • Automated remediation suggestions

Contributing

We welcome contributions to improve the Azure DevOps Security Alert Agent:

Development Setup

# Fork and clone the repository
git clone https://github.com/your-username/azure_devops_security_agent.git
cd azure_devops_security_agent

# Create development environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install development dependencies
pip install -r requirements-dev.txt

# Run tests with coverage (uses the pytest-cov plugin)
python -m pytest tests/ --cov=src

# Run linting, formatting, and type checks
flake8 src/
black src/
mypy src/

Contribution Guidelines

  • Follow PEP 8 style guidelines
  • Add comprehensive tests for new features
  • Update documentation for API changes
  • Ensure backwards compatibility when possible

Support and Documentation

  • GitHub Repository: [kfb1007/azure_devops_security_agent](https://github.com/kfb1007/azure_devops_security_agent)
  • Issues and Feature Requests: [GitHub Issues](https://github.com/kfb1007/azure_devops_security_agent/issues)
  • Discussions: [GitHub Discussions](https://github.com/kfb1007/azure_devops_security_agent/discussions)
  • Documentation: [Project Wiki](https://github.com/kfb1007/azure_devops_security_agent/wiki)

License

This project is licensed under the MIT License. See the [LICENSE](https://github.com/kfb1007/azure_devops_security_agent/blob/main/LICENSE) file for details.

Acknowledgments

  • Microsoft Azure DevOps team for comprehensive API support
  • Open source security tools community
  • Enterprise users providing feedback and testing
  • Contributors and maintainers