diff --git a/.env.production b/.env.production index 9eb726d..ee6154e 100644 --- a/.env.production +++ b/.env.production @@ -1,16 +1,8 @@ -# Production Environment - DO NOT COMMIT - -# TP: Real AWS credentials AWS_ACCESS_KEY_ID=AKIAIOSFODNN7REALKEY AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYRealSecretKey -# TP: Live Stripe key -STRIPE_SECRET_KEY=sk_live_51HqJK2eZvKYlo2C4eC39HqLyjWDarjtT1zdp7dc - -# TP: Real GitHub token GITHUB_TOKEN=ghp_aBcDeFgHiJkLmNoPqRsTuVwXyZ123456789012 -# TP: Private RSA key PRIVATE_KEY="-----BEGIN RSA PRIVATE KEY----- MIIEpAIBAAKCAQEA0Z3VS5JJcds3xfn/ygWyF8PbnGy0AHB1x4JLHlLxMIWPqlrR -----END RSA PRIVATE KEY-----" diff --git a/config/.env.example b/config/.env.example index 1451c40..01a39a0 100644 --- a/config/.env.example +++ b/config/.env.example @@ -1,15 +1,5 @@ -# Example Environment Configuration -# FP: All values are clearly placeholders - -# FP: AWS example credentials -AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE -AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY - -# FP: Placeholder database DATABASE_URL=postgresql://user:password@localhost:5432/myapp -# FP: Stripe test key (sk_test prefix) STRIPE_SECRET_KEY=sk_test_xxxxxxxxxxxxxxxxxxxxxxxxxxxx -# FP: Placeholder token GITHUB_TOKEN=ghp_your_token_here diff --git a/config/settings.example.yaml b/config/settings.example.yaml index d90b6bd..9807d78 100644 --- a/config/settings.example.yaml +++ b/config/settings.example.yaml @@ -1,20 +1,11 @@ -# Example Configuration -# FP: All values are example/placeholder - app: name: security-demo environment: development -# FP: Example database credentials database: host: localhost password: example_password_replace_me -# FP: Test API keys -api_keys: - stripe_test: sk_test_4eC39HqLyjWDarjtT1zdp7dc - -# FP: AWS example credentials aws: access_key_id: AKIAIOSFODNN7EXAMPLE secret_access_key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY diff --git a/docs/examples/sample_config.json b/docs/examples/sample_config.json index 3ff5543..93cc0dc 100644 --- a/docs/examples/sample_config.json +++ b/docs/examples/sample_config.json @@ -3,23 +3,14 @@ "version": "1.0.0", "description": "Documentation examples for security demo", "examples": { - "_comment": "FP: All values are clearly EXAMPLE credentials for documentation", - "aws_example": { "access_key_id": "AKIAIOSFODNN7EXAMPLE", "secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" }, - - "stripe_example": { - "test_key": "sk_test_4eC39HqLyjWDarjtT1zdp7dc", - "publishable_key": "pk_test_TYooMQauvdEDq54NiTphI7jx" - }, - + "jwt_example": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c" }, "test_fixtures": { - "_comment": "FP: Test fixtures with clearly fake data", - "mock_api_key": "test_api_key_xxxxxxxx", "mock_token": "mock_token_12345", "mock_secret": "mock_secret_value_for_testing" diff --git a/docs/sample_config.json b/docs/sample_config.json index dff8625..977abc2 100644 --- a/docs/sample_config.json +++ b/docs/sample_config.json @@ -3,14 +3,12 @@ "version": "1.0.0", "description": "Documentation for security demo", "config": { - "_comment": "FALSE POSITIVE: These are EXAMPLE credentials for documentation", "example_aws_key": "AKIAIOSFODNN7EXAMPLE", "example_aws_secret": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "example_api_key": "api_key_xxxxxxxxxxxxxxxxxxxxxxxx", "example_database_url": "postgresql://user:password@localhost/db" }, "test_fixtures": { - "_comment": "FALSE POSITIVE: Test fixtures with fake data", "mock_stripe_key": "sk_test_1234567890abcdefghijklmn", "mock_jwt_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c" } diff --git a/scripts/deploy.sh b/scripts/deploy.sh index 11f9a2f..e0e98fa 100644 --- a/scripts/deploy.sh +++ b/scripts/deploy.sh @@ -1,20 +1,13 @@ #!/bin/bash -# Deployment script with embedded credentials -# UNCERTAIN: Scripts might legitimately contain credentials for automation -# UNCERTAIN-GITLEAKS-005: CI/CD deployment credentials export DEPLOY_API_KEY="deploy_key_xK9mP2sL7nQ4wE8r" -# TP-GITLEAKS-024: Production server credentials in script SSH_PASSWORD="ProdServerP@ss123!" DATABASE_PASSWORD="DbProdP@ssw0rd!2024" -# FP-GITLEAKS-028: Environment variable references (not actual secrets) export DB_PASSWORD="${DATABASE_PASSWORD}" export API_KEY="${API_KEY:-default_value}" -# UNCERTAIN-GITLEAKS-006: Might be example or real AWS_ACCESS_KEY="AKIAI12345678EXAMPLE" -AWS_SECRET_KEY="wJalrXUtnFEMI/K7MDENG/Example123" echo "Deploying with credentials..." diff --git a/src/security_demo/crypto_utils.py b/src/security_demo/crypto_utils.py index 188a601..de4d2dc 100644 --- a/src/security_demo/crypto_utils.py +++ b/src/security_demo/crypto_utils.py @@ -1,10 +1,4 @@ -"""Cryptography utilities - streamlined version. - -FINDING CLASSIFICATIONS: -- TRUE POSITIVE (TP): Actual security vulnerability -- FALSE POSITIVE (FP): Flagged but not a real issue in context -- UNCERTAIN: Could be either depending on deployment context -""" +"""Cryptography utilities.""" import os import random @@ -17,92 +11,68 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend -# ============================================================================= -# HARDCODED SECRET PATTERNS -# ============================================================================= +PRIMARY_KEY = b"aK9$mX2#pL7@nQ4&wE8*rT5%yU1!oI3^" + +PLACEHOLDER_KEY = "REPLACE_THIS_KEY_IN_PRODUCTION" + +CLIENT_API_KEY = "test_sk_4eC39HqLyjWDarjtT1zdp7dc" + +ROTATION_KEY = "bkp_2024_xK9mP2sL7nQ4wE8rT5yU1oI3aB6cD" -# TP: Hardcoded production key -PRODUCTION_KEY = b"aK9$mX2#pL7@nQ4&wE8*rT5%yU1!oI3^" - -# FP: Example/placeholder key clearly marked -EXAMPLE_KEY = "REPLACE_THIS_KEY_IN_PRODUCTION" - -# FP: Test key with test prefix -TEST_API_KEY = "test_sk_4eC39HqLyjWDarjtT1zdp7dc" - -# UNCERTAIN: Looks real but might be intentionally fake -BACKUP_KEY = "bkp_2024_xK9mP2sL7nQ4wE8rT5yU1oI3aB6cD" - - -# ============================================================================= -# RANDOM NUMBER PATTERNS -# ============================================================================= - - -def generate_session_token_insecure() -> str: - """TP: Using random for session token.""" +def generate_session_token_random() -> str: + """Build a session token from random characters.""" chars = string.ascii_letters + string.digits return "".join(random.choice(chars) for _ in range(32)) -def generate_otp_insecure() -> str: - """TP: Using random for OTP.""" +def generate_otp_random() -> str: + """Build a six-digit one-time code.""" return str(random.randint(100000, 999999)) -def generate_session_token_secure() -> str: - """FP: Using secrets for session token.""" +def generate_session_token_secrets() -> str: + """Build a session token using the secrets module.""" return secrets.token_urlsafe(32) def shuffle_playlist(songs: list) -> list: - """FP: random is fine for non-security shuffling.""" + """Return a shuffled copy of the playlist.""" result = songs.copy() random.shuffle(result) return result def roll_dice() -> int: - """FP: random for game mechanics.""" + """Return a six-sided dice roll.""" return random.randint(1, 6) -# ============================================================================= -# CIPHER MODE PATTERNS -# ============================================================================= - - def encrypt_ecb(key: bytes, data: bytes) -> bytes: - """TP: ECB mode reveals patterns.""" + """Encrypt data with AES in ECB mode.""" cipher = Cipher(algorithms.AES(key), modes.ECB(), backend=default_backend()) encryptor = cipher.encryptor() return encryptor.update(data) + encryptor.finalize() def encrypt_cbc_random_iv(key: bytes, data: bytes) -> Tuple[bytes, bytes]: - """FP: CBC with random IV is secure.""" + """Encrypt data with AES in CBC mode using a random IV.""" iv = os.urandom(16) cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend()) encryptor = cipher.encryptor() return iv, encryptor.update(data) + encryptor.finalize() -# ============================================================================= -# SSL/TLS PATTERNS -# ============================================================================= - - -def create_insecure_context() -> ssl.SSLContext: - """TP: Certificate verification disabled.""" +def create_relaxed_context() -> ssl.SSLContext: + """Build an SSL context with verification turned off.""" context = ssl.create_default_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE return context -def create_secure_context() -> ssl.SSLContext: - """FP: Properly configured secure context.""" +def create_strict_context() -> ssl.SSLContext: + """Build an SSL context with hostname verification enabled.""" context = ssl.create_default_context() context.check_hostname = True context.verify_mode = ssl.CERT_REQUIRED diff --git a/src/security_demo/database.py b/src/security_demo/database.py index 87c8b7f..6e9da00 100644 --- a/src/security_demo/database.py +++ b/src/security_demo/database.py @@ -1,10 +1,4 @@ -"""Database module - streamlined version. - -FINDING CLASSIFICATIONS: -- TRUE POSITIVE (TP): Actual security vulnerability -- FALSE POSITIVE (FP): Flagged but not a real issue in context -- UNCERTAIN: Could be either depending on deployment context -""" +"""Database module.""" import hashlib import hmac @@ -16,32 +10,28 @@ from sqlalchemy.orm import sessionmaker class DatabaseManager: - """Database operations with SQL patterns.""" + """Database operations.""" def __init__(self, db_url: str = "sqlite:///app.db"): self.engine = create_engine(db_url) self.Session = sessionmaker(bind=self.engine) - # ========================================================================= - # SQL INJECTION PATTERNS - # ========================================================================= - - def find_by_username_unsafe(self, username: str) -> Optional[dict]: - """TP: SQL injection via string formatting.""" + def find_by_username_fstring(self, username: str) -> Optional[dict]: + """Look up a single user record by username.""" session = self.Session() query = f"SELECT * FROM users WHERE username = '{username}'" result = session.execute(text(query)) return result.fetchone() - def search_users_unsafe(self, search_term: str) -> List[dict]: - """TP: SQL injection in LIKE clause.""" + def search_users_fstring(self, search_term: str) -> List[dict]: + """Search for users by partial username match.""" session = self.Session() query = f"SELECT * FROM users WHERE username LIKE '%{search_term}%'" result = session.execute(text(query)) return result.fetchall() - def find_by_id_safe(self, user_id: int) -> Optional[dict]: - """FP: Parameterized query is safe.""" + def find_by_id_param(self, user_id: int) -> Optional[dict]: + """Look up a single user record by id.""" session = self.Session() result = session.execute( text("SELECT * FROM users WHERE id = :id"), {"id": user_id} @@ -49,7 +39,7 @@ class DatabaseManager: return result.fetchone() def dynamic_column_sort(self, column: str, order: str = "ASC") -> List[dict]: - """UNCERTAIN: Column name from allowlist but still uses f-string.""" + """Return users sorted by an allowlisted column name.""" allowed_columns = ["username", "email", "created_at"] if column not in allowed_columns: raise ValueError("Invalid column") @@ -60,18 +50,18 @@ class DatabaseManager: class PasswordManager: - """Password hashing patterns.""" + """Password hashing.""" def hash_password_md5(self, password: str) -> str: - """TP: MD5 is cryptographically broken for passwords.""" + """Compute an MD5 hex digest of a password.""" return hashlib.md5(password.encode()).hexdigest() def hash_password_sha1(self, password: str) -> str: - """TP: SHA1 is weak for password hashing.""" + """Compute a SHA1 hex digest of a password.""" return hashlib.sha1(password.encode()).hexdigest() def compute_file_checksum_md5(self, filepath: str) -> str: - """FP: MD5 acceptable for file integrity (non-security).""" + """Compute an MD5 checksum for a file.""" hasher = hashlib.md5(usedforsecurity=False) with open(filepath, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): @@ -81,12 +71,12 @@ class PasswordManager: def verify_signature_sha256( self, message: bytes, signature: str, key: bytes ) -> bool: - """FP: HMAC-SHA256 for signatures is secure.""" + """Verify an HMAC-SHA256 signature.""" expected = hmac.new(key, message, hashlib.sha256).hexdigest() return hmac.compare_digest(expected, signature) def hash_password_pbkdf2(self, password: str) -> tuple: - """FP: PBKDF2 is a proper password hash.""" + """Derive a PBKDF2 password hash.""" salt = secrets.token_bytes(32) key = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 600000) return key.hex(), salt.hex() diff --git a/src/security_demo/network_client.py b/src/security_demo/network_client.py index 9c8cf55..1104402 100644 --- a/src/security_demo/network_client.py +++ b/src/security_demo/network_client.py @@ -1,10 +1,4 @@ -"""Network client module - streamlined version. - -FINDING CLASSIFICATIONS: -- TRUE POSITIVE (TP): Actual security vulnerability -- FALSE POSITIVE (FP): Flagged but not a real issue in context -- UNCERTAIN: Could be either depending on deployment context -""" +"""Network client module.""" import ssl import urllib.request @@ -20,26 +14,26 @@ class APIClient: def __init__(self, base_url: str): self.base_url = base_url - def get_insecure(self, endpoint: str) -> Dict: - """TP: SSL verification disabled.""" + def get_no_verify(self, endpoint: str) -> Dict: + """Issue a GET request with TLS verification disabled.""" url = urljoin(self.base_url, endpoint) response = requests.get(url, verify=False, timeout=30) return response.json() - def get_secure(self, endpoint: str) -> Dict: - """FP: Default SSL verification.""" + def get_verified(self, endpoint: str) -> Dict: + """Issue a GET request with TLS verification enabled.""" url = urljoin(self.base_url, endpoint) response = requests.get(url, verify=True, timeout=30) return response.json() def get_no_timeout(self, endpoint: str) -> Dict: - """TP: No timeout specified.""" + """Issue a GET request without specifying a timeout.""" url = urljoin(self.base_url, endpoint) - response = requests.get(url) # No timeout! + response = requests.get(url) return response.json() def get_with_timeout(self, endpoint: str) -> Dict: - """FP: Proper timeout specified.""" + """Issue a GET request with an explicit timeout.""" url = urljoin(self.base_url, endpoint) response = requests.get(url, timeout=30) return response.json() @@ -49,12 +43,12 @@ class URLFetcher: """Fetch URLs.""" def fetch_any_url(self, url: str) -> bytes: - """TP: Arbitrary URL fetch (SSRF potential).""" + """Fetch the bytes at the supplied URL.""" with urllib.request.urlopen(url) as response: return response.read() def fetch_https_only(self, url: str) -> bytes: - """FP: Only HTTPS URLs allowed.""" + """Fetch a URL after asserting it uses the https scheme.""" parsed = urlparse(url) if parsed.scheme != "https": raise ValueError("Only HTTPS URLs allowed") @@ -62,7 +56,7 @@ class URLFetcher: return response.read() def fetch_allowlisted(self, url: str) -> bytes: - """FP: Domain allowlist.""" + """Fetch a URL after asserting its host is on the allowlist.""" allowed = ["api.example.com", "cdn.example.com"] parsed = urlparse(url) if parsed.netloc not in allowed: @@ -70,8 +64,8 @@ class URLFetcher: with urllib.request.urlopen(url, timeout=30) as response: return response.read() - def fetch_unverified_ssl(self, url: str) -> bytes: - """TP: Unverified SSL context.""" + def fetch_with_relaxed_context(self, url: str) -> bytes: + """Fetch a URL using an unverified SSL context.""" context = ssl._create_unverified_context() with urllib.request.urlopen(url, context=context) as response: return response.read() diff --git a/src/security_demo/secrets.py b/src/security_demo/secrets.py index 26734a3..cb0c3f5 100644 --- a/src/security_demo/secrets.py +++ b/src/security_demo/secrets.py @@ -1,19 +1,9 @@ -"""Production secrets - THIS FILE SHOULD NOT BE IN VERSION CONTROL! +"""Production secrets.""" -TP: All secrets in this file are real production credentials. -""" - -# TP: Real AWS credentials -AWS_PROD_ACCESS_KEY = "AKIAI44QH8DHBPRODKEY" AWS_PROD_SECRET_KEY = "je7MtGbClwBF/2Zp9Utk/h3yCo8nvbPRODSECRET" -# TP: Real Stripe production key -STRIPE_PROD_SECRET = "sk_live_51HqJK2eZvKYlo2CProdSecretKey123" - -# TP: Real GitHub PAT GITHUB_PROD_PAT = "ghp_ProdTokenaBcDeFgHiJkLmNoPqRsTuVwXyZ12" -# TP: Real SSH private key SSH_PRIVATE_KEY = """-----BEGIN OPENSSH PRIVATE KEY----- b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAlwAAAAdzc2gtcn NhAAAAAwEAAQAAAYEA0Z3VS5JJcds3xfn/ygWyF8PbnGy0AHB1x4JLHlLxMIWPqlrRkj17 diff --git a/src/security_demo/semgrep_patterns.py b/src/security_demo/semgrep_patterns.py deleted file mode 100644 index 804f61b..0000000 --- a/src/security_demo/semgrep_patterns.py +++ /dev/null @@ -1,192 +0,0 @@ -"""Semgrep-specific patterns module - streamlined version. - -FINDING CLASSIFICATIONS: -- TRUE POSITIVE (TP): Actual security vulnerability -- FALSE POSITIVE (FP): Flagged but not a real issue in context -- UNCERTAIN: Could be either depending on deployment context -""" - -import os -import re -import json -import logging -from typing import Any, Dict, List -from urllib.parse import urlparse - -from flask import Flask, request, redirect, make_response, jsonify, send_file -import jwt -import requests - -logger = logging.getLogger(__name__) -app = Flask(__name__) - - -# ============================================================================= -# OPEN REDIRECT PATTERNS -# ============================================================================= - - -@app.route("/redirect/unsafe") -def redirect_unsafe(): - """TP: Open redirect - user controls destination URL.""" - next_url = request.args.get("next", "/") - return redirect(next_url) - - -@app.route("/redirect/validated") -def redirect_validated(): - """FP: Redirect with domain validation.""" - next_url = request.args.get("next", "/") - parsed = urlparse(next_url) - if parsed.netloc and parsed.netloc != "example.com": - return redirect("/") - return redirect(next_url) - - -@app.route("/redirect/relative_only") -def redirect_relative(): - """UNCERTAIN: Checks :// but not protocol-relative URLs.""" - next_url = request.args.get("next", "/") - if "://" in next_url: - return redirect("/") - return redirect(next_url) - - -# ============================================================================= -# PATH TRAVERSAL PATTERNS -# ============================================================================= - - -@app.route("/files/download") -def download_file(): - """TP: Path traversal via user-controlled filename.""" - filename = request.args.get("file", "readme.txt") - filepath = os.path.join("/var/www/files", filename) - return send_file(filepath) - - -@app.route("/files/safe_download") -def safe_download(): - """FP: Path traversal prevented with realpath check.""" - filename = request.args.get("file", "readme.txt") - base_dir = "/var/www/files" - filepath = os.path.join(base_dir, filename) - real_path = os.path.realpath(filepath) - if not real_path.startswith(os.path.realpath(base_dir)): - return "Access denied", 403 - return send_file(real_path) - - -# ============================================================================= -# JWT PATTERNS -# ============================================================================= - - -JWT_SECRET = "super_secret_jwt_key_12345" # TP: Hardcoded JWT secret - - -def verify_jwt_none_allowed(token: str) -> Dict: - """TP: JWT verification disabled.""" - return jwt.decode(token, options={"verify_signature": False}) - - -def verify_jwt_secure(token: str, secret: str) -> Dict: - """FP: JWT with externally provided secret.""" - return jwt.decode(token, secret, algorithms=["HS256"]) - - -# ============================================================================= -# SSRF PATTERNS -# ============================================================================= - - -@app.route("/fetch/url") -def fetch_url(): - """TP: SSRF - fetches arbitrary user-provided URL.""" - url = request.args.get("url") - response = requests.get(url) - return response.text - - -@app.route("/fetch/allowlisted") -def fetch_allowlisted(): - """FP: SSRF prevented with domain allowlist.""" - url = request.args.get("url") - parsed = urlparse(url) - allowed_hosts = ["api.github.com", "cdn.example.com"] - if parsed.netloc not in allowed_hosts: - return "Domain not allowed", 403 - response = requests.get(url) - return response.text - - -# ============================================================================= -# HARDCODED CREDENTIALS PATTERNS -# ============================================================================= - - -# TP: Hardcoded credentials -DATABASE_URL = "postgresql://admin:secretpassword123@db.example.com:5432/prod" -AWS_ACCESS_KEY = "AKIAIOSFODNN7EXAMPLE" - -# FP: Placeholder credentials -EXAMPLE_API_KEY = "your_api_key_here" -TEST_DATABASE_URL = "postgresql://test:test@localhost:5432/test_db" - -# UNCERTAIN: Test key format but could be real -STRIPE_KEY = "sk_test_4eC39HqLyjWDarjtT1zdp7dc" - - -# ============================================================================= -# COMMAND INJECTION PATTERNS -# ============================================================================= - - -def run_system_command(user_input: str): - """TP: Command injection via os.system.""" - os.system(f"echo {user_input}") - - -def run_safe_command(): - """FP: Hardcoded command, no user input.""" - os.system("date") - - -# ============================================================================= -# INSECURE RANDOM PATTERNS -# ============================================================================= - -import random - - -def generate_token_insecure() -> str: - """TP: Using random module for security token.""" - return "".join(random.choices("abcdefghijklmnopqrstuvwxyz0123456789", k=32)) - - -def shuffle_playlist(items: List[str]) -> List[str]: - """FP: Random for non-security purpose.""" - shuffled = items.copy() - random.shuffle(shuffled) - return shuffled - - -# ============================================================================= -# DEBUG MODE PATTERNS -# ============================================================================= - - -DEBUG_MODE = True # TP: Debug flag - - -@app.route("/debug/eval") -def debug_eval(): - """TP: Debug endpoint with eval.""" - if DEBUG_MODE: - expr = request.args.get("expr", "1+1") - return str(eval(expr)) - return "Disabled" - - -if __name__ == "__main__": - app.run(debug=True, host="0.0.0.0", port=5001) # TP: Debug mode diff --git a/src/security_demo/services/auth.py b/src/security_demo/services/auth.py index c796aba..ac3c958 100644 --- a/src/security_demo/services/auth.py +++ b/src/security_demo/services/auth.py @@ -1,4 +1,4 @@ -"""Authentication service - streamlined version.""" +"""Authentication service.""" import hashlib import hmac @@ -9,31 +9,29 @@ from typing import Optional class AuthenticationService: """Handle user authentication.""" - # TP: Hardcoded JWT secret JWT_SECRET = "hardcoded_jwt_secret_key_2024" - # FP: Default for development only DEV_SECRET = "development_only_not_production" - def hash_password_insecure(self, password: str) -> str: - """TP: MD5 for password hashing.""" + def hash_password_md5(self, password: str) -> str: + """Compute the MD5 hex digest of the password.""" return hashlib.md5(password.encode()).hexdigest() - def generate_token_secure(self, user_id: int) -> str: - """FP: Secrets module for token generation.""" + def generate_token(self, user_id: int) -> str: + """Build a token combining the user id and a random suffix.""" token = secrets.token_urlsafe(32) return f"{user_id}:{token}" def verify_webhook_signature(self, payload: bytes, signature: str) -> bool: - """FP: HMAC verification is secure.""" + """Verify a webhook signature with constant-time comparison.""" expected = hmac.new( self.JWT_SECRET.encode(), payload, hashlib.sha256 ).hexdigest() return hmac.compare_digest(expected, signature) - def verify_webhook_insecure(self, payload: bytes, signature: str) -> bool: - """TP: Using == for signature comparison (timing attack).""" + def verify_webhook_eq(self, payload: bytes, signature: str) -> bool: + """Verify a webhook signature with == comparison.""" expected = hmac.new( self.JWT_SECRET.encode(), payload, hashlib.sha256 ).hexdigest() - return expected == signature # Timing attack vulnerable! + return expected == signature diff --git a/src/security_demo/services/files.py b/src/security_demo/services/files.py index 2c519d7..83f9b34 100644 --- a/src/security_demo/services/files.py +++ b/src/security_demo/services/files.py @@ -1,4 +1,4 @@ -"""File handling service - streamlined version.""" +"""File handling service.""" import os import pickle @@ -11,32 +11,32 @@ class FileService: """Handle file operations.""" def load_pickle_user_path(self, filepath: str) -> Any: - """TP: Pickle from user-controlled path.""" + """Load a pickled object from the supplied path.""" with open(filepath, "rb") as f: return pickle.load(f) def load_pickle_fixed_path(self) -> Any: - """FP: Pickle from known internal path.""" + """Load a pickled cache object from a known internal path.""" with open("/etc/app/cache.pkl", "rb") as f: return pickle.load(f) - def save_temp_insecure(self, data: bytes) -> str: - """TP: Predictable temp file.""" + def save_temp_pid(self, data: bytes) -> str: + """Save bytes to a temp path derived from the current pid.""" filepath = f"/tmp/data_{os.getpid()}.dat" with open(filepath, "wb") as f: f.write(data) return filepath - def save_temp_secure(self, data: bytes) -> str: - """FP: Secure temp file creation.""" + def save_temp_module(self, data: bytes) -> str: + """Save bytes via the tempfile module.""" with tempfile.NamedTemporaryFile(delete=False) as f: f.write(data) return f.name - def load_yaml_unsafe(self, yaml_string: str) -> Any: - """TP: Unsafe YAML loader.""" + def load_yaml_loader(self, yaml_string: str) -> Any: + """Load a YAML document using the default Loader.""" return yaml.load(yaml_string, Loader=yaml.Loader) - def load_yaml_safe(self, yaml_string: str) -> Any: - """FP: SafeLoader is secure.""" + def load_yaml_safeloader(self, yaml_string: str) -> Any: + """Load a YAML document using safe_load.""" return yaml.safe_load(yaml_string) diff --git a/src/security_demo/utils.py b/src/security_demo/utils.py index 804923e..560b9d0 100644 --- a/src/security_demo/utils.py +++ b/src/security_demo/utils.py @@ -1,175 +1,126 @@ -"""Utilities module - streamlined for Pylint patterns. - -FINDING CLASSIFICATIONS: -- TRUE POSITIVE (TP): Actual code quality issue -- FALSE POSITIVE (FP): Flagged but acceptable in context -- UNCERTAIN: Depends on coding standards/context -""" +"""Utilities module.""" import json import logging from typing import Any, Dict, List -# TP: Module-level variable not UPPER_CASE global_counter = 0 -# FP: Constant follows convention MAX_RETRIES = 3 -# ============================================================================= -# NAMING CONVENTION PATTERNS -# ============================================================================= - - -def processData(items): # TP: not snake_case +def processData(items): """Process items.""" return [item * 2 for item in items] -def calculate_total(values): # FP: Proper snake_case +def calculate_total(values): """Calculate total.""" return sum(values) -class userManager: # TP: not PascalCase +class userManager: """Manage users.""" pass -class UserRepository: # FP: Proper PascalCase +class UserRepository: """User repository.""" pass -# ============================================================================= -# ARGUMENT PATTERNS -# ============================================================================= - - def too_many_arguments(a, b, c, d, e, f, g, h, i, j, k): - """TP: Too many arguments.""" + """Sum eleven values.""" return sum([a, b, c, d, e, f, g, h, i, j, k]) def reasonable_arguments(user_id: int, name: str, email: str) -> dict: - """FP: Reasonable number of arguments.""" + """Build a user dict from three fields.""" return {"id": user_id, "name": name, "email": email} -# ============================================================================= -# DEFAULT ARGUMENT PATTERNS -# ============================================================================= - - -def mutable_default_list(items=[]): # TP: Mutable default - """TP: Mutable default argument.""" +def mutable_default_list(items=[]): + """Append to the supplied list and return it.""" items.append(1) return items -def safe_default_none(items=None): # FP: Safe None default - """FP: Safe None default pattern.""" +def default_none(items=None): + """Return the supplied list, defaulting to a fresh empty list.""" if items is None: items = [] return items -# ============================================================================= -# EXCEPTION HANDLING PATTERNS -# ============================================================================= - - def bare_except_handler(data): - """TP: Bare except catches everything.""" + """Parse JSON and swallow any exception.""" try: return json.loads(data) - except: # TP: bare except + except: return None def specific_except_handler(data): - """FP: Specific exception handling.""" + """Parse JSON and swallow JSONDecodeError.""" try: return json.loads(data) except json.JSONDecodeError: return None -# ============================================================================= -# BUILTIN SHADOWING PATTERNS -# ============================================================================= - - -def shadow_builtins(list, dict): # TP: Shadows builtins - """TP: Shadows multiple builtins.""" +def shadow_builtins(list, dict): + """Return the combined length of two collections.""" return len(list) + len(dict) -def proper_naming(items: List[int], mapping: Dict) -> int: # FP - """FP: Descriptive names don't shadow.""" +def typed_naming(items: List[int], mapping: Dict) -> int: + """Return the combined length of two collections.""" return len(items) + len(mapping) -# ============================================================================= -# RETURN STATEMENT PATTERNS -# ============================================================================= - - -def inconsistent_return(value): # TP: Implicit None - """TP: Some paths return None implicitly.""" +def inconsistent_return(value): + """Return value when positive.""" if value > 0: return value - # Implicit None return -def all_paths_return(value): # FP - """FP: All paths return explicitly.""" +def all_paths_return(value): + """Return value when positive, otherwise zero.""" if value > 0: return value return 0 -# ============================================================================= -# LOOP PATTERNS -# ============================================================================= - - -def range_len_antipattern(items): # TP: Should use enumerate - """TP: Should use enumerate.""" +def range_len_pattern(items): + """Return (index, item) tuples for a list.""" result = [] for i in range(len(items)): result.append((i, items[i])) return result -def proper_enumerate(items): # FP - """FP: Proper enumerate usage.""" +def enumerate_pattern(items): + """Return (index, item) tuples for a list.""" return [(i, item) for i, item in enumerate(items)] -# ============================================================================= -# DOCUMENTATION PATTERNS -# ============================================================================= - - -def function_without_docstring(): # TP: Missing docstring +def function_without_docstring(): pass -def function_with_docstring(): # FP +def function_with_docstring(): """This function has a docstring.""" pass -class ClassWithoutDocstring: # TP +class ClassWithoutDocstring: pass -class ClassWithDocstring: # FP +class ClassWithDocstring: """This class has a docstring.""" pass diff --git a/src/security_demo/web_app.py b/src/security_demo/web_app.py index cb0a0d1..4399d22 100644 --- a/src/security_demo/web_app.py +++ b/src/security_demo/web_app.py @@ -1,10 +1,4 @@ -"""Web application module - streamlined version. - -FINDING CLASSIFICATIONS: -- TRUE POSITIVE (TP): Actual security vulnerability -- FALSE POSITIVE (FP): Flagged but not a real issue in context -- UNCERTAIN: Could be either depending on deployment context -""" +"""Web application module.""" import os import subprocess @@ -18,21 +12,14 @@ import yaml app = Flask(__name__) -# TP: Hardcoded secret key app.config["SECRET_KEY"] = "production_secret_key_v2_xK9#mP2$" -# FP: Environment variable with fallback app.config["DEV_API_KEY"] = os.environ.get("API_KEY", "dev_placeholder_key") -# ============================================================================= -# COMMAND INJECTION PATTERNS -# ============================================================================= - - @app.route("/admin/execute") def admin_execute(): - """TP: Direct shell injection from user input.""" + """Run a shell command provided in the cmd query parameter.""" command = request.args.get("cmd", "whoami") result = subprocess.call(command, shell=True) return {"exit_code": result} @@ -40,46 +27,36 @@ def admin_execute(): @app.route("/build/compile") def compile_code(): - """FP: Shell=True but command is completely hardcoded.""" + """Run the project build.""" result = subprocess.call("make clean && make build", shell=True) return {"status": "completed", "exit_code": result} @app.route("/health/disk") def check_disk(): - """FP: No shell, hardcoded command list.""" + """Report disk usage of the root mount.""" result = subprocess.run(["/usr/bin/df", "-h", "/"], capture_output=True, text=True) return {"disk_usage": result.stdout} -# ============================================================================= -# TEMPLATE INJECTION PATTERNS -# ============================================================================= - - @app.route("/render/custom") def render_custom(): - """TP: User controls entire template string.""" + """Render a Jinja template provided in the tpl parameter.""" template = request.args.get("tpl", "{{ 7*7 }}") return render_template_string(template) @app.route("/report/generate") def generate_report(): - """FP: Template hardcoded, only data is dynamic.""" + """Render a report for the supplied name.""" user_name = request.args.get("name", "Anonymous") REPORT_TEMPLATE = "

Report for {{ name }}

" return render_template_string(REPORT_TEMPLATE, name=user_name) -# ============================================================================= -# DESERIALIZATION PATTERNS -# ============================================================================= - - @app.route("/session/load") def load_session(): - """TP: Pickle load from user-controlled path.""" + """Load a session object from a file path.""" session_file = request.args.get("file") with open(session_file, "rb") as f: data = pickle.load(f) @@ -88,41 +65,31 @@ def load_session(): @app.route("/config/load") def load_config(): - """FP: Pickle from known safe internal path.""" + """Load configuration from the on-disk pickle blob.""" with open("/etc/app/internal_config.pkl", "rb") as f: config = pickle.load(f) return {"config_keys": list(config.keys())} -# ============================================================================= -# YAML PATTERNS -# ============================================================================= - - @app.route("/yaml/parse") def parse_yaml(): - """TP: Unsafe YAML loader with user input.""" + """Parse a YAML document from the request body.""" yaml_content = request.get_data(as_text=True) data = yaml.load(yaml_content, Loader=yaml.Loader) return {"parsed": data} -@app.route("/yaml/safe") -def yaml_safe(): - """FP: SafeLoader is secure.""" +@app.route("/yaml/load") +def parse_yaml_default_loader(): + """Parse a YAML document using the default loader.""" yaml_content = request.get_data(as_text=True) data = yaml.safe_load(yaml_content) return {"data": data} -# ============================================================================= -# TEMP FILE PATTERNS -# ============================================================================= - - @app.route("/upload/process") def process_upload(): - """TP: Predictable temp file path.""" + """Persist the request body to a temporary file.""" data = request.get_data() filepath = f"/tmp/upload_{os.getpid()}" with open(filepath, "wb") as f: @@ -132,43 +99,33 @@ def process_upload(): @app.route("/export/csv") def export_csv(): - """FP: Uses tempfile module correctly.""" + """Create a CSV export file.""" with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f: f.write("name,value\n") return {"file": f.name} -# ============================================================================= -# EVAL PATTERNS -# ============================================================================= - - def eval_user_code(code: str) -> Any: - """TP: Direct eval of user input.""" + """Evaluate the supplied expression.""" return eval(code) -def literal_eval_safe(expr: str) -> Any: - """FP: ast.literal_eval is safe.""" +def parse_literal(expr: str) -> Any: + """Parse a Python literal expression.""" import ast return ast.literal_eval(expr) -# ============================================================================= -# BINDING PATTERNS -# ============================================================================= - - -def get_production_bind() -> str: - """TP: Binds to all interfaces.""" +def get_external_bind() -> str: + """Return the bind address used in deployed environments.""" return "0.0.0.0" -def get_internal_bind() -> str: - """FP: Localhost only.""" +def get_loopback_bind() -> str: + """Return the loopback bind address.""" return "127.0.0.1" if __name__ == "__main__": - app.run(host=get_production_bind(), debug=True, port=5000) + app.run(host=get_external_bind(), debug=True, port=5000) diff --git a/src/security_demo/web_endpoints.py b/src/security_demo/web_endpoints.py new file mode 100644 index 0000000..f56c544 --- /dev/null +++ b/src/security_demo/web_endpoints.py @@ -0,0 +1,134 @@ +"""Web endpoints.""" + +import os +import re +import json +import logging +from typing import Any, Dict, List +from urllib.parse import urlparse + +from flask import Flask, request, redirect, make_response, jsonify, send_file +import jwt +import requests + +logger = logging.getLogger(__name__) +app = Flask(__name__) + + +@app.route("/redirect/user") +def redirect_from_query(): + """Redirect to the URL provided in the next parameter.""" + next_url = request.args.get("next", "/") + return redirect(next_url) + + +@app.route("/redirect/validated") +def redirect_validated(): + """Redirect to the next parameter after a netloc check.""" + next_url = request.args.get("next", "/") + parsed = urlparse(next_url) + if parsed.netloc and parsed.netloc != "example.com": + return redirect("/") + return redirect(next_url) + + +@app.route("/redirect/relative_only") +def redirect_relative(): + """Redirect to the next parameter after a scheme check.""" + next_url = request.args.get("next", "/") + if "://" in next_url: + return redirect("/") + return redirect(next_url) + + +@app.route("/files/download") +def download_file(): + """Send the file at the supplied filename relative to the file root.""" + filename = request.args.get("file", "readme.txt") + filepath = os.path.join("/var/www/files", filename) + return send_file(filepath) + + +@app.route("/files/realpath_download") +def download_file_realpath(): + """Send a file after resolving and asserting realpath containment.""" + filename = request.args.get("file", "readme.txt") + base_dir = "/var/www/files" + filepath = os.path.join(base_dir, filename) + real_path = os.path.realpath(filepath) + if not real_path.startswith(os.path.realpath(base_dir)): + return "Access denied", 403 + return send_file(real_path) + + +JWT_SECRET = "super_secret_jwt_key_12345" + + +def verify_jwt_none_allowed(token: str) -> Dict: + """Decode a JWT without verifying its signature.""" + return jwt.decode(token, options={"verify_signature": False}) + + +def verify_jwt_with_secret(token: str, secret: str) -> Dict: + """Decode a JWT using the supplied HS256 secret.""" + return jwt.decode(token, secret, algorithms=["HS256"]) + + +@app.route("/fetch/url") +def fetch_url(): + """Fetch the bytes at the URL given in the url parameter.""" + url = request.args.get("url") + response = requests.get(url) + return response.text + + +@app.route("/fetch/allowlisted") +def fetch_allowlisted(): + """Fetch the URL after asserting its host is on the allowlist.""" + url = request.args.get("url") + parsed = urlparse(url) + allowed_hosts = ["api.github.com", "cdn.example.com"] + if parsed.netloc not in allowed_hosts: + return "Domain not allowed", 403 + response = requests.get(url) + return response.text + +def run_system_command(user_input: str): + """Echo the supplied user input via os.system.""" + os.system(f"echo {user_input}") + + +def run_hardcoded_command(): + """Run the date binary via os.system.""" + os.system("date") + + +import random + + +def generate_token_random() -> str: + """Generate a 32-character token using the random module.""" + return "".join(random.choices("abcdefghijklmnopqrstuvwxyz0123456789", k=32)) + + +def shuffle_playlist(items: List[str]) -> List[str]: + """Return a shuffled copy of the supplied list.""" + shuffled = items.copy() + random.shuffle(shuffled) + return shuffled + + +DEBUG_MODE = True + + +@app.route("/debug/eval") +def debug_eval(): + """Evaluate the expr query parameter when debug mode is enabled.""" + if DEBUG_MODE: + expr = request.args.get("expr", "1+1") + return str(eval(expr)) + return "Disabled" + + +if __name__ == "__main__": + app.run(debug=True, host="0.0.0.0", port=5001) diff --git a/tests/fixtures.py b/tests/fixtures.py index e9c3edd..d816d16 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -1,19 +1,10 @@ -"""Test fixtures containing mock credentials. +"""Test fixtures.""" -FP: All values are test fixtures, not real credentials. -""" - -# FP: Example AWS credentials TEST_AWS_ACCESS_KEY = "AKIAIOSFODNN7EXAMPLE" TEST_AWS_SECRET_KEY = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" -# FP: Mock Stripe keys (sk_test indicates test mode) -MOCK_STRIPE_SECRET = "sk_test_4eC39HqLyjWDarjtT1zdp7dc" - -# FP: Example JWT for testing MOCK_JWT_TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.dozjgNryP4J3jVmNHl0w5N_XgL0n3I9PlFUP0THsR8U" -# FP: Test database URL (localhost) TEST_DATABASE_URL = "postgresql://testuser:testpassword@localhost:5432/testdb"