Convert Sigma Cloud Rules into D&R and Upload them (AWS, Azure, GCP)

Hi everyone,

The standard Sigma converter tool for LimaCharlie doesn’t handle cloud rules. I get most of my detectors from here and wanted to upload the AWS, Azure, and GCP related rules. This led me to write a Python script that both converts those rules and uploads them into your chosen OID.

Hope this helps!

#!/usr/bin/env python3
"""
SIGMA to LimaCharlie D&R Rule Converter for Cloud Platforms

A converter for SIGMA detection rules targeting cloud platforms
(AWS CloudTrail, Azure Activity/Audit/SignIn Logs, GCP Audit Logs) into
LimaCharlie Detection & Response (D&R) rules.

SUPPORTED PLATFORMS:
    - AWS CloudTrail
    - Azure Activity Logs, Audit Logs, SignIn Logs, Identity Protection
    - GCP Audit Logs, Google Workspace

SIGMA FEATURES SUPPORTED:
    - Field matching: eventName, Operation, methodName, etc.
    - Modifiers: |contains, |startswith, |endswith, |re (regex)
    - Null checks: field: null
    - Array notation: field{}.subfield
    - Conditions: and, or, not, all of selection*, 1 of selection*
    - Nested field paths: userIdentity.arn, data.protoPayload.serviceName

REQUIREMENTS:
    pip install pyyaml limacharlie

USAGE:
    # Convert a single rule and print output
    python sigma_to_limacharlie.py --single rule.yaml

    # Convert all rules in a folder
    python sigma_to_limacharlie.py --sigma-folder ./sigma-rules --output-dir ./converted

    # Dry run (validate without saving)
    python sigma_to_limacharlie.py --sigma-folder ./sigma-rules --dry-run

UPLOAD TO LIMACHARLIE:
    After converting, upload using the LimaCharlie CLI:

    # 1. Set credentials as environment variables
    export LC_OID="your-org-id"
    export LC_UID="your-user-id"
    export LC_API_KEY="your-api-key"

    # 2. Upload all converted rules
    for f in converted/*.yaml; do
        python -m limacharlie dr add -r "$(basename "$f" .yaml)" -f "$f" --replace
    done

    # Or upload a single rule
    python -m limacharlie dr add -r "AWS-bucket-deleted" -f converted/AWS-bucket-deleted.yaml

CONVERSION MAPPING:
    SIGMA                           LimaCharlie D&R
    ─────────────────────────────   ─────────────────────────────────────
    eventName: DeleteBucket         op: is, path: event/eventName, value: DeleteBucket
    field|contains: value           op: contains, path: event/field, value: value
    field|startswith: value         op: starts with, path: event/field, value: value
    field|endswith: value           op: ends with, path: event/field, value: value
    field|re: pattern               op: matches, path: event/field, re: pattern
    field: null                     op: exists, path: event/field, not: true
    field{}.subfield                path: event/field/?/subfield (wildcard)
    userIdentity.arn                path: event/userIdentity/arn
    condition: A and B              op: and, rules: [A, B]
    condition: A or B               op: or, rules: [A, B]
    condition: A and not B          op: and, rules: [A, {B, not: true}]
    all of selection*               op: and with all matching selections
    1 of selection*                 op: or with all matching selections

AUTHOR:
    Joshua Strickland / Secnap

LICENSE:
    MIT License - Free to use, modify, and distribute

"""

import os
import re
import sys
import yaml
import argparse
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass, field


# ============================================================================
# Configuration (loaded from environment variables)
# ============================================================================

def get_config() -> Dict[str, str]:
    """Load configuration from environment variables."""
    return {
        "oid": os.environ.get("LC_OID", ""),
        "uid": os.environ.get("LC_UID", ""),
        "api_key": os.environ.get("LC_API_KEY", ""),
        "api_base": os.environ.get("LC_API_BASE", "https://api.limacharlie.io/v1"),
    }


# ============================================================================
# SIGMA Rule Parser
# ============================================================================

@dataclass
class SigmaRule:
    """
    Parsed SIGMA rule structure.

    Represents a SIGMA detection rule with all its components extracted
    from the YAML source file.
    """
    title: str
    id: str
    status: str
    description: str
    author: str
    date: str
    level: str
    logsource: Dict[str, str]
    detection: Dict[str, Any]
    condition: str
    tags: List[str] = field(default_factory=list)
    references: List[str] = field(default_factory=list)
    falsepositives: List[str] = field(default_factory=list)
    modified: Optional[str] = None

    @classmethod
    def from_yaml(cls, data: Dict[str, Any]) -> "SigmaRule":
        """Parse SIGMA rule from YAML data."""
        detection = data.get("detection", {})
        condition = detection.pop("condition", "")

        return cls(
            title=data.get("title", "Untitled"),
            id=data.get("id", ""),
            status=data.get("status", "experimental"),
            description=data.get("description", "").strip() if data.get("description") else "",
            author=data.get("author", "Unknown"),
            date=str(data.get("date", "")),
            modified=str(data.get("modified", "")) if data.get("modified") else None,
            level=data.get("level", "medium"),
            logsource=data.get("logsource", {}),
            detection=detection,
            condition=condition,
            tags=data.get("tags", []),
            references=data.get("references", []),
            falsepositives=data.get("falsepositives", []),
        )


def parse_sigma_file(filepath: Path) -> Optional[SigmaRule]:
    """
    Parse a SIGMA YAML file into a SigmaRule object.

    Args:
        filepath: Path to the SIGMA YAML file

    Returns:
        SigmaRule object or None if parsing fails
    """
    try:
        with open(filepath, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
        if not data:
            return None
        return SigmaRule.from_yaml(data)
    except Exception as e:
        print(f"  ERROR parsing {filepath.name}: {e}")
        return None


# ============================================================================
# Field Path Converter
# ============================================================================

def convert_field_path(sigma_path: str) -> str:
    """
    Convert SIGMA field path to LimaCharlie path format.

    SIGMA uses dot notation for nested fields and {} for array access.
    LimaCharlie uses forward slashes and ? for wildcards.

    Examples:
        eventName                       -> event/eventName
        userIdentity.arn                -> event/userIdentity/arn
        data.protoPayload.serviceName   -> event/data/protoPayload/serviceName
        ModifiedProperties{}.NewValue   -> event/ModifiedProperties/?/NewValue

    Args:
        sigma_path: SIGMA field path (dot notation)

    Returns:
        LimaCharlie field path (slash notation with event/ prefix)
    """
    # Handle array notation: field{} or field{}.subfield -> field/?/subfield
    path = re.sub(r'\{\}', '/?', sigma_path)

    # Replace dots with slashes for nested fields
    path = path.replace('.', '/')

    # Ensure path starts with event/ (LimaCharlie event data is under "event")
    if not path.startswith('event/') and not path.startswith('routing/'):
        path = f"event/{path}"

    return path


def parse_field_with_modifier(field_key: str) -> Tuple[str, Optional[str]]:
    """
    Parse SIGMA field key that may contain a modifier.

    SIGMA modifiers are appended to field names with a pipe character.

    Examples:
        fieldName           -> (fieldName, None)
        fieldName|contains  -> (fieldName, contains)
        fieldName|re        -> (fieldName, re)
        fieldName|endswith  -> (fieldName, endswith)

    Args:
        field_key: SIGMA field key, possibly with modifier

    Returns:
        Tuple of (field_name, modifier) where modifier may be None
    """
    if '|' in field_key:
        parts = field_key.split('|', 1)
        return parts[0], parts[1]
    return field_key, None


# ============================================================================
# Detection Logic Converter
# ============================================================================

def convert_value_to_rule(path: str, value: Any, modifier: Optional[str] = None) -> Dict[str, Any]:
    """
    Convert a single SIGMA field/value pair to a LimaCharlie rule.

    Handles:
        - Null values (converts to exists check with not: true)
        - Lists of values (converts to OR condition)
        - Modifiers (contains, startswith, endswith, re, all)
        - Simple equality matching

    Args:
        path: SIGMA field path
        value: Value to match (can be string, list, or None)
        modifier: Optional SIGMA modifier (contains, re, etc.)

    Returns:
        LimaCharlie rule dict with op, path, and value/re
    """
    lc_path = convert_field_path(path)

    # Handle null values -> check field doesn't exist
    if value is None:
        return {
            "op": "exists",
            "path": lc_path,
            "not": True
        }

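    # Handle list of values -> OR condition (AND when the |all modifier is set)
    if isinstance(value, list):
        # Chained modifiers arrive as e.g. "contains|all"; peel off "all" here
        mods = modifier.lower().split('|') if modifier else []
        list_op = "and" if 'all' in mods else "or"
        inner_modifier = '|'.join(m for m in mods if m != 'all') or None

        if len(value) == 1:
            return convert_value_to_rule(path, value[0], inner_modifier)

        rules = [convert_value_to_rule(path, v, inner_modifier) for v in value]
        return {
            "op": list_op,
            "rules": rules
        }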

    # Handle modifiers
    if modifier:
        modifier = modifier.lower()

        if modifier == 're':
            # Regex matching
            return {
                "op": "matches",
                "path": lc_path,
                "re": str(value),
                "case sensitive": False
            }
        elif modifier == 'contains':
            # Substring matching
            return {
                "op": "contains",
                "path": lc_path,
                "value": str(value),
                "case sensitive": False
            }
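        elif modifier == 'startswith':
            # Prefix matching (case-insensitive, like the contains branch)
            return {
                "op": "starts with",
                "path": lc_path,
                "value": str(value),
                "case sensitive": False
            }
        elif modifier == 'endswith':
            # Suffix matching (case-insensitive, like the contains branch)
            return {
                "op": "ends with",
                "path": lc_path,
                "value": str(value),
                "case sensitive": False
            }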
        elif modifier == 'all':
            # All values must match (AND condition for lists)
            if isinstance(value, list):
                rules = [convert_value_to_rule(path, v, None) for v in value]
                return {
                    "op": "and",
                    "rules": rules
                }

    # Default: exact equality match
    return {
        "op": "is",
        "path": lc_path,
        "value": value if isinstance(value, (str, int, float, bool)) else str(value)
    }


def convert_selection_block(selection: Dict[str, Any]) -> Dict[str, Any]:
    """
    Convert a SIGMA selection block to LimaCharlie rules.

    A selection block can be:
        - A dict of field conditions (AND them together)
        - A list of dicts (OR them together)

    Args:
        selection: SIGMA selection block (dict or list)

    Returns:
        LimaCharlie rule dict
    """
    if isinstance(selection, list):
        # List of dicts = OR condition
        rules = []
        for item in selection:
            if isinstance(item, dict):
                rules.append(convert_selection_block(item))
            else:
                rules.append({"op": "is", "value": item})

        if len(rules) == 1:
            return rules[0]
        return {"op": "or", "rules": rules}

    if not isinstance(selection, dict):
        return {"op": "is", "value": selection}

    # Dict of field conditions - AND them together
    rules = []
    for field_key, value in selection.items():
        field_name, modifier = parse_field_with_modifier(field_key)
        rule = convert_value_to_rule(field_name, value, modifier)
        rules.append(rule)

    if len(rules) == 1:
        return rules[0]

    return {"op": "and", "rules": rules}


def parse_condition(condition: str, selections: Dict[str, Dict]) -> Dict[str, Any]:
    """
    Parse SIGMA condition string and build LimaCharlie detection logic.

    Supports:
        - Simple selection reference: "selection"
        - AND conditions: "selection1 and selection2"
        - OR conditions: "selection1 or selection2"
        - NOT conditions: "selection and not filter"
        - Wildcards: "all of selection*", "1 of selection*"

    Args:
        condition: SIGMA condition string
        selections: Dict of selection name -> selection block

    Returns:
        LimaCharlie detection rule dict
    """
    condition = condition.strip()

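    # Handle "all of selection*" or "all of them"
    # fullmatch keeps compound conditions (e.g. "... and not filter") from
    # being swallowed by a prefix-only match
    all_match = re.fullmatch(r'all\s+of\s+(?:(\w+)\*|them)', condition)
    if all_match:
        prefix = all_match.group(1) or ""  # "them" -> every selection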
        matching = [k for k in selections if k.startswith(prefix)]
        if matching:
            rules = [convert_selection_block(selections[k]) for k in matching]
            if len(rules) == 1:
                return rules[0]
            return {"op": "and", "rules": rules}

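    # Handle "1 of selection*" or "1 of them"
    # fullmatch so that "1 of selection* and not filter" reaches the
    # compound-condition branches below instead of dropping the filter
    one_match = re.fullmatch(r'1\s+of\s+(?:(\w+)\*|them)', condition)
    if one_match:
        prefix = one_match.group(1) or ""  # "them" -> every selection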
        matching = [k for k in selections if k.startswith(prefix)]
        if matching:
            rules = [convert_selection_block(selections[k]) for k in matching]
            if len(rules) == 1:
                return rules[0]
            return {"op": "or", "rules": rules}

    # Handle "X and not Y" pattern
    and_not_match = re.match(r'(.+)\s+and\s+not\s+(.+)', condition, re.IGNORECASE)
    if and_not_match:
        positive_part = and_not_match.group(1).strip()
        negative_part = and_not_match.group(2).strip()

        positive_rule = parse_condition(positive_part, selections)
        negative_rule = parse_condition(negative_part, selections)
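        # Flip rather than overwrite, so an already-negated rule
        # (e.g. a null check) is not left negated
        negative_rule["not"] = not negative_rule.get("not", False)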

        return {
            "op": "and",
            "rules": [positive_rule, negative_rule]
        }

    # Handle "X and Y"
    if ' and ' in condition.lower():
        parts = re.split(r'\s+and\s+', condition, flags=re.IGNORECASE)
        rules = []
        for part in parts:
            part = part.strip()
            # Handle "1 of X*" / "all of X*" within compound conditions
            inner_match = re.fullmatch(r'(1|all)\s+of\s+(\w+)\*', part)
            if inner_match:
                quantifier, prefix = inner_match.groups()
                matching = [k for k in selections if k.startswith(prefix)]
                if matching:
                    inner_rules = [convert_selection_block(selections[k]) for k in matching]
                    if len(inner_rules) == 1:
                        rules.append(inner_rules[0])
                    else:
                        inner_op = "or" if quantifier == "1" else "and"
                        rules.append({"op": inner_op, "rules": inner_rules})
            elif part in selections:
                rules.append(convert_selection_block(selections[part]))

        if len(rules) == 1:
            return rules[0]
        return {"op": "and", "rules": rules}

    # Handle "X or Y"
    if ' or ' in condition.lower():
        parts = re.split(r'\s+or\s+', condition, flags=re.IGNORECASE)
        rules = [parse_condition(p.strip(), selections) for p in parts if p.strip() in selections]
        if len(rules) == 1:
            return rules[0]
        return {"op": "or", "rules": rules}

    # Simple case: just a selection name
    if condition in selections:
        return convert_selection_block(selections[condition])

    # Fallback: try to find any matching selection
    for key in selections:
        if key in condition:
            return convert_selection_block(selections[key])

    # Last resort: return first selection
    if selections:
        first_key = list(selections.keys())[0]
        return convert_selection_block(selections[first_key])

    return {"op": "exists", "path": "event"}


# ============================================================================
# LimaCharlie D&R Rule Builder
# ============================================================================

def extract_event_names(detection: Dict[str, Any]) -> List[str]:
    """
    Extract event names from SIGMA detection block.

    Looks for fields that indicate which cloud events should trigger the rule:
        - AWS: eventName
        - Azure: Operation, operationName
        - GCP: methodName, eventName

    These are used to populate the "event" or "events" field in the
    LimaCharlie D&R rule, which is required for rule matching.

    Args:
        detection: SIGMA detection block

    Returns:
        List of unique event names found
    """
    event_names = []
    event_fields = ['eventName', 'Operation', 'operationName', 'methodName']

    def extract_from_dict(d: Dict[str, Any]):
        for key, value in d.items():
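            # Split off any modifier; only unmodified keys hold exact event
            # names (a value under eventName|startswith is just a fragment)
            base_key, _, modifier = key.partition('|')
            if base_key in event_fields and not modifier: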
                if isinstance(value, list):
                    event_names.extend([str(v) for v in value if v])
                elif value:
                    event_names.append(str(value))
            elif isinstance(value, dict):
                extract_from_dict(value)
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, dict):
                        extract_from_dict(item)

    for selection_name, selection in detection.items():
        if isinstance(selection, dict):
            extract_from_dict(selection)
        elif isinstance(selection, list):
            for item in selection:
                if isinstance(item, dict):
                    extract_from_dict(item)

    # Remove duplicates while keeping first-seen order, so output is deterministic
    return list(dict.fromkeys(event_names))


def build_dr_rule(sigma: SigmaRule) -> Dict[str, Any]:
    """
    Convert a parsed SIGMA rule to LimaCharlie D&R format.

    This is the main conversion function that:
        1. Extracts event names for the "event" field
        2. Converts detection logic to LimaCharlie operators
        3. Builds metadata from SIGMA fields
        4. Creates the respond block with report action

    Args:
        sigma: Parsed SigmaRule object

    Returns:
        Complete LimaCharlie D&R rule as a dict
    """
    # Extract event names from SIGMA detection to use as event trigger
    event_names = extract_event_names(sigma.detection)

    # Build detection logic from SIGMA detection block
    detection = parse_condition(sigma.condition, sigma.detection)

    # Add event type(s) to detection root (required by LimaCharlie)
    if len(event_names) == 1:
        detection["event"] = event_names[0]
    elif len(event_names) > 1:
        detection["events"] = event_names
    else:
        # Fallback: use wildcard to match all events from this sensor
        # The detection logic will still filter appropriately
        detection["event"] = "*"

    # Build metadata from SIGMA fields (preserved in detection output)
    metadata = {
        "author": sigma.author,
        "description": sigma.description,
        "level": sigma.level,
        "sigma_id": sigma.id,
    }

    if sigma.tags:
        # Extract MITRE ATT&CK tags
        mitre_tags = [t for t in sigma.tags if t.startswith('attack.')]
        if mitre_tags:
            metadata["mitre"] = mitre_tags

    if sigma.references:
        metadata["references"] = sigma.references

    if sigma.falsepositives:
        metadata["false_positives"] = sigma.falsepositives

    # Build response block
    respond = [
        {
            "action": "report",
            "name": sigma.title,
            "metadata": metadata
        }
    ]

    return {
        "detect": detection,
        "respond": respond
    }


def rule_to_yaml(rule: Dict[str, Any]) -> str:
    """Convert rule dict to YAML string for output."""
    return yaml.dump(rule, default_flow_style=False, allow_unicode=True, sort_keys=False)


# ============================================================================
# Rule Name Utilities
# ============================================================================

def get_rule_slug(title: str) -> str:
    """
    Generate URL-safe slug from rule title.

    Example: "AWS Bucket Deleted" -> "aws-bucket-deleted"
    """
    slug = title.lower()
    slug = re.sub(r'[^a-z0-9]+', '-', slug)
    slug = slug.strip('-')
    return slug


def get_platform_prefix(product: str) -> str:
    """
    Get standardized platform prefix from SIGMA logsource product.

    Examples:
        aws   -> AWS
        azure -> Azure
        gcp   -> GCP
    """
    product_lower = product.lower()
    if product_lower == 'aws':
        return 'AWS'
    elif product_lower == 'azure':
        return 'Azure'
    elif product_lower == 'gcp':
        return 'GCP'
    else:
        return product.upper()


# ============================================================================
# File Operations
# ============================================================================

def find_sigma_files(folder: Path) -> List[Path]:
    """
    Find all SIGMA YAML files in folder.

    Filters out non-SIGMA files like documentation, examples, etc.
    """
    files = []
    for pattern in ['*.yml', '*.yaml']:
        files.extend(folder.glob(pattern))

    # Filter out non-SIGMA files
    sigma_files = []
    for f in files:
        name = f.name.lower()
        # Skip common non-SIGMA files; match on the start of the name so rules
        # that merely contain one of these words (e.g. "config") are kept
        if name.startswith(('claude', 'readme', 'example', 'template', 'config')):
            continue
        sigma_files.append(f)

    return sorted(sigma_files)


def save_converted_rule(rule: Dict[str, Any], title: str, platform: str, output_dir: Path) -> Path:
    """
    Save converted rule to YAML file with platform prefix.

    Output filename format: {Platform}-{rule-slug}.yaml
    Example: AWS-bucket-deleted.yaml
    """
    platform_prefix = get_platform_prefix(platform)
    slug = get_rule_slug(title)

    # Remove existing platform prefix if present (avoid AWS-aws-...)
    slug_lower = slug.lower()
    for prefix in ['aws-', 'azure-', 'gcp-']:
        if slug_lower.startswith(prefix):
            slug = slug[len(prefix):]
            break

    output_path = output_dir / f"{platform_prefix}-{slug}.yaml"

    with open(output_path, 'w', encoding='utf-8') as f:
        yaml.dump(rule, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

    return output_path


# ============================================================================
# LimaCharlie API Client (Optional Upload)
# ============================================================================

class LimaCharlieClient:
    """
    Simple client for LimaCharlie D&R rule management via REST API.

    Requires environment variables:
        LC_OID: Organization ID
        LC_UID: User ID
        LC_API_KEY: API Key
    """

    def __init__(self):
        config = get_config()
        self.oid = config["oid"]
        self.uid = config["uid"]
        self.api_key = config["api_key"]
        self.base_url = config["api_base"]

        if not all([self.oid, self.uid, self.api_key]):
            raise ValueError(
                "Missing LimaCharlie credentials. Set environment variables:\n"
                "  LC_OID=your-org-id\n"
                "  LC_UID=your-user-id\n"
                "  LC_API_KEY=your-api-key"
            )

        try:
            import requests
            self.requests = requests
        except ImportError:
            raise ImportError("'requests' library required for upload. Install with: pip install requests")

    def _headers(self) -> Dict[str, str]:
        return {
            "Authorization": f"Bearer {self.uid}:{self.api_key}",
            "Content-Type": "application/json"
        }

    def upload_rule(self, name: str, rule: Dict[str, Any]) -> bool:
        """Upload a D&R rule to LimaCharlie."""
        slug = get_rule_slug(name)
        url = f"{self.base_url}/{self.oid}/rules/{slug}"

        payload = {
            "name": name,
            "detect": rule["detect"],
            "respond": rule["respond"]
        }

        try:
            response = self.requests.post(
                url,
                headers=self._headers(),
                json=payload,
                timeout=30
            )

            if response.status_code in (200, 201):
                return True
            else:
                print(f"  ERROR: {response.status_code} - {response.text[:200]}")
                return False

        except Exception as e:
            print(f"  ERROR uploading rule: {e}")
            return False


# ============================================================================
# Main Conversion Pipeline
# ============================================================================

def convert_all_rules(
    sigma_folder: Path,
    output_dir: Optional[Path] = None,
    upload: bool = False,
    dry_run: bool = False
) -> Dict[str, int]:
    """
    Convert all SIGMA rules in folder to LimaCharlie format.

    Args:
        sigma_folder: Folder containing SIGMA YAML files
        output_dir: Output directory for converted rules (optional)
        upload: Upload to LimaCharlie API (requires env vars)
        dry_run: Validate without saving or uploading

    Returns:
        Dict with counts: success, failed, skipped
    """
    stats = {"success": 0, "failed": 0, "skipped": 0}

    # Find all SIGMA files
    sigma_files = find_sigma_files(sigma_folder)
    total = len(sigma_files)

    if total == 0:
        print("No SIGMA files found!")
        return stats

    print(f"\nFound {total} SIGMA rule files to process.\n")

    # Setup output directory
    if output_dir and not dry_run:
        output_dir.mkdir(parents=True, exist_ok=True)

    # Setup API client for upload
    client = None
    if upload and not dry_run:
        try:
            client = LimaCharlieClient()
        except (ValueError, ImportError) as e:
            print(f"ERROR: {e}")
            return stats

    # Process each file
    for i, filepath in enumerate(sigma_files, 1):
        pct = int((i / total) * 100)
        print(f"[{i}/{total}] ({pct}%) Processing: {filepath.name}")

        # Parse SIGMA rule
        sigma = parse_sigma_file(filepath)
        if not sigma:
            print("  SKIPPED: Could not parse file")
            stats["skipped"] += 1
            continue

        # Convert to LimaCharlie format
        try:
            dr_rule = build_dr_rule(sigma)
        except Exception as e:
            print(f"  FAILED: Conversion error - {e}")
            stats["failed"] += 1
            continue

        # Save converted rule
        if output_dir and not dry_run:
            try:
                platform = sigma.logsource.get('product', 'unknown')
                out_path = save_converted_rule(dr_rule, sigma.title, platform, output_dir)
                print(f"  Saved: {out_path.name}")
            except Exception as e:
                print(f"  WARNING: Could not save file - {e}")

        # Upload to LimaCharlie
        if upload and client and not dry_run:
            print(f"  Uploading to LimaCharlie...")
            if client.upload_rule(sigma.title, dr_rule):
                print(f"  SUCCESS")
                stats["success"] += 1
            else:
                stats["failed"] += 1
        elif dry_run:
            print(f"  DRY RUN: Would convert '{sigma.title}'")
            print(f"    Platform: {sigma.logsource.get('product', 'unknown')}")
            print(f"    Service: {sigma.logsource.get('service', 'unknown')}")
            stats["success"] += 1
        else:
            stats["success"] += 1

    return stats


# ============================================================================
# CLI Entry Point
# ============================================================================

def main():
    parser = argparse.ArgumentParser(
        description="Convert SIGMA rules for cloud platforms to LimaCharlie D&R format",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Convert a single rule and print output
  python sigma_to_limacharlie.py --single rule.yaml

  # Convert all rules in a folder
  python sigma_to_limacharlie.py --sigma-folder ./sigma-rules --output-dir ./converted

  # Dry run (validate without saving)
  python sigma_to_limacharlie.py --sigma-folder ./sigma-rules --dry-run

  # Upload to LimaCharlie
  export LC_OID="your-org-id"
  export LC_UID="your-user-id"
  export LC_API_KEY="your-api-key"
  python sigma_to_limacharlie.py --sigma-folder ./sigma-rules --output-dir ./converted --upload

Supported Platforms:
  - AWS CloudTrail
  - Azure Activity Logs, Audit Logs, SignIn Logs, Identity Protection
  - GCP Audit Logs, Google Workspace
        """
    )
    parser.add_argument(
        "--sigma-folder",
        type=Path,
        default=Path.cwd(),
        help="Folder containing SIGMA YAML files (default: current directory)"
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=None,
        help="Output directory for converted rules (default: sigma_folder/converted)"
    )
    parser.add_argument(
        "--upload",
        action="store_true",
        help="Upload converted rules to LimaCharlie (requires LC_OID, LC_UID, LC_API_KEY env vars)"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Parse and validate without saving or uploading"
    )
    parser.add_argument(
        "--single",
        type=Path,
        default=None,
        help="Convert a single SIGMA file and print result"
    )

    args = parser.parse_args()

    print("=" * 70)
    print("  SIGMA to LimaCharlie D&R Converter (Cloud Platforms)")
    print("  Deterministic conversion for AWS, Azure, and GCP SIGMA rules")
    print("=" * 70)

    # Single file mode
    if args.single:
        sigma = parse_sigma_file(args.single)
        if sigma:
            dr_rule = build_dr_rule(sigma)
            print(f"\nTitle: {sigma.title}")
            print(f"Platform: {sigma.logsource.get('product', 'unknown')}")
            print(f"Service: {sigma.logsource.get('service', 'unknown')}")
            print("\n--- LimaCharlie D&R Rule ---\n")
            print(rule_to_yaml(dr_rule))
        else:
            print("Failed to parse SIGMA rule")
            sys.exit(1)
        return

    # Batch mode
    output_dir = args.output_dir
    if output_dir is None and not args.dry_run:
        output_dir = args.sigma_folder / "converted"

    stats = convert_all_rules(
        sigma_folder=args.sigma_folder,
        output_dir=output_dir,
        upload=args.upload,
        dry_run=args.dry_run
    )

    # Print summary
    print("\n" + "=" * 70)
    print("  COMPLETE")
    print(f"  Success: {stats['success']}")
    print(f"  Failed:  {stats['failed']}")
    print(f"  Skipped: {stats['skipped']}")
    print("=" * 70)

    if stats["failed"] > 0:
        sys.exit(1)


if __name__ == "__main__":
    main()
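
To sanity-check the conversion mapping table from the docstring without running the whole script, here is a minimal standalone sketch of the field and modifier mapping. The names `MODIFIER_TO_OP`, `to_lc_path`, and `to_lc_rule` are made up for this example and are not part of the script above. It only covers single values and the modifiers listed in the table, so treat it as an illustration, not a replacement for `convert_value_to_rule`.

```python
from typing import Any, Dict

# Sigma modifier -> LimaCharlie operator, following the mapping table above.
# (Hypothetical helper table for illustration only.)
MODIFIER_TO_OP = {
    None: "is",
    "contains": "contains",
    "startswith": "starts with",
    "endswith": "ends with",
    "re": "matches",
}


def to_lc_path(sigma_field: str) -> str:
    """Turn a Sigma field (dots, {} for arrays) into an event/... path."""
    path = sigma_field.replace("{}", "/?").replace(".", "/")
    return f"event/{path}"


def to_lc_rule(field_key: str, value: Any) -> Dict[str, Any]:
    """Convert one 'field|modifier: value' pair into a D&R operator dict."""
    name, _, modifier = field_key.partition("|")
    path = to_lc_path(name)
    if value is None:
        # Sigma "field: null" -> the field must not exist
        return {"op": "exists", "path": path, "not": True}
    op = MODIFIER_TO_OP[modifier or None]
    # Regex rules carry the pattern under "re", everything else under "value"
    key = "re" if op == "matches" else "value"
    return {"op": op, "path": path, key: value}


if __name__ == "__main__":
    print(to_lc_rule("eventName", "DeleteBucket"))
    print(to_lc_rule("userIdentity.arn|contains", "assumed-role"))
    print(to_lc_rule("ModifiedProperties{}.NewValue|endswith", "Admin"))
    print(to_lc_rule("errorCode", None))
```

Comparing these small dicts against the `detect` block of a converted rule is a quick way to spot a field that was mapped differently than expected.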

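The `all of selection*` and `1 of selection*` conditions are the part that tends to surprise people, so here is a small sketch of how the prefix expansion works in isolation. `expand_quantifier` is a hypothetical helper written for this post. It returns the matching selection names instead of converted rules, which keeps the example short.

```python
import re
from typing import Any, Dict


def expand_quantifier(condition: str, selections: Dict[str, Any]) -> Dict[str, Any]:
    """Resolve 'all of prefix*' / '1 of prefix*' into and/or over selection names."""
    match = re.fullmatch(r"(all|1)\s+of\s+(\w+)\*", condition.strip())
    if not match:
        raise ValueError(f"not a quantifier condition: {condition!r}")
    quantifier, prefix = match.groups()
    # Every selection whose name starts with the prefix takes part
    names = [name for name in selections if name.startswith(prefix)]
    if not names:
        raise ValueError(f"no selection starts with {prefix!r}")
    return {"op": "and" if quantifier == "all" else "or", "rules": names}


if __name__ == "__main__":
    # Selection bodies are irrelevant here, only the names matter
    selections = {"selection_api": {}, "selection_user": {}, "filter": {}}
    print(expand_quantifier("all of selection*", selections))
    print(expand_quantifier("1 of selection*", selections))
```

Note that `filter` is left out in both cases because it does not share the `selection` prefix, which is why filters are normally combined with a separate `and not filter`.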