Remove FP/TP identifiers and rename methods and variables

This commit is contained in:
Alexander Braml
2026-04-09 13:44:42 +02:00
parent 5f95942b7b
commit 4d57410125
17 changed files with 259 additions and 521 deletions

View File

@@ -1,16 +1,8 @@
# Production Environment - DO NOT COMMIT
# TP: Real AWS credentials
AWS_ACCESS_KEY_ID=AKIAIOSFODNN7REALKEY
AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYRealSecretKey
# TP: Live Stripe key
STRIPE_SECRET_KEY=sk_live_51HqJK2eZvKYlo2C4eC39HqLyjWDarjtT1zdp7dc
# TP: Real GitHub token
GITHUB_TOKEN=ghp_aBcDeFgHiJkLmNoPqRsTuVwXyZ123456789012
# TP: Private RSA key
PRIVATE_KEY="-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA0Z3VS5JJcds3xfn/ygWyF8PbnGy0AHB1x4JLHlLxMIWPqlrR
-----END RSA PRIVATE KEY-----"

View File

@@ -1,15 +1,5 @@
# Example Environment Configuration
# FP: All values are clearly placeholders
# FP: AWS example credentials
AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
# FP: Placeholder database
DATABASE_URL=postgresql://user:password@localhost:5432/myapp
# FP: Stripe test key (sk_test prefix)
STRIPE_SECRET_KEY=sk_test_xxxxxxxxxxxxxxxxxxxxxxxxxxxx
# FP: Placeholder token
GITHUB_TOKEN=ghp_your_token_here

View File

@@ -1,20 +1,11 @@
# Example Configuration
# FP: All values are example/placeholder
app:
name: security-demo
environment: development
# FP: Example database credentials
database:
host: localhost
password: example_password_replace_me
# FP: Test API keys
api_keys:
stripe_test: sk_test_4eC39HqLyjWDarjtT1zdp7dc
# FP: AWS example credentials
aws:
access_key_id: AKIAIOSFODNN7EXAMPLE
secret_access_key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

View File

@@ -3,23 +3,14 @@
"version": "1.0.0",
"description": "Documentation examples for security demo",
"examples": {
"_comment": "FP: All values are clearly EXAMPLE credentials for documentation",
"aws_example": {
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
},
"stripe_example": {
"test_key": "sk_test_4eC39HqLyjWDarjtT1zdp7dc",
"publishable_key": "pk_test_TYooMQauvdEDq54NiTphI7jx"
},
"jwt_example": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
},
"test_fixtures": {
"_comment": "FP: Test fixtures with clearly fake data",
"mock_api_key": "test_api_key_xxxxxxxx",
"mock_token": "mock_token_12345",
"mock_secret": "mock_secret_value_for_testing"

View File

@@ -3,14 +3,12 @@
"version": "1.0.0",
"description": "Documentation for security demo",
"config": {
"_comment": "FALSE POSITIVE: These are EXAMPLE credentials for documentation",
"example_aws_key": "AKIAIOSFODNN7EXAMPLE",
"example_aws_secret": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"example_api_key": "api_key_xxxxxxxxxxxxxxxxxxxxxxxx",
"example_database_url": "postgresql://user:password@localhost/db"
},
"test_fixtures": {
"_comment": "FALSE POSITIVE: Test fixtures with fake data",
"mock_stripe_key": "sk_test_1234567890abcdefghijklmn",
"mock_jwt_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
}

View File

@@ -1,20 +1,13 @@
#!/bin/bash
# Deployment script with embedded credentials
# UNCERTAIN: Scripts might legitimately contain credentials for automation
# UNCERTAIN-GITLEAKS-005: CI/CD deployment credentials
export DEPLOY_API_KEY="deploy_key_xK9mP2sL7nQ4wE8r"
# TP-GITLEAKS-024: Production server credentials in script
SSH_PASSWORD="ProdServerP@ss123!"
DATABASE_PASSWORD="DbProdP@ssw0rd!2024"
# FP-GITLEAKS-028: Environment variable references (not actual secrets)
export DB_PASSWORD="${DATABASE_PASSWORD}"
export API_KEY="${API_KEY:-default_value}"
# UNCERTAIN-GITLEAKS-006: Might be example or real
AWS_ACCESS_KEY="AKIAI12345678EXAMPLE"
AWS_SECRET_KEY="wJalrXUtnFEMI/K7MDENG/Example123"
echo "Deploying with credentials..."

View File

@@ -1,10 +1,4 @@
"""Cryptography utilities - streamlined version.
FINDING CLASSIFICATIONS:
- TRUE POSITIVE (TP): Actual security vulnerability
- FALSE POSITIVE (FP): Flagged but not a real issue in context
- UNCERTAIN: Could be either depending on deployment context
"""
"""Cryptography utilities."""
import os
import random
@@ -17,92 +11,68 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
# =============================================================================
# HARDCODED SECRET PATTERNS
# =============================================================================
PRIMARY_KEY = b"aK9$mX2#pL7@nQ4&wE8*rT5%yU1!oI3^"
PLACEHOLDER_KEY = "REPLACE_THIS_KEY_IN_PRODUCTION"
CLIENT_API_KEY = "test_sk_4eC39HqLyjWDarjtT1zdp7dc"
ROTATION_KEY = "bkp_2024_xK9mP2sL7nQ4wE8rT5yU1oI3aB6cD"
# TP: Hardcoded production key
PRODUCTION_KEY = b"aK9$mX2#pL7@nQ4&wE8*rT5%yU1!oI3^"
# FP: Example/placeholder key clearly marked
EXAMPLE_KEY = "REPLACE_THIS_KEY_IN_PRODUCTION"
# FP: Test key with test prefix
TEST_API_KEY = "test_sk_4eC39HqLyjWDarjtT1zdp7dc"
# UNCERTAIN: Looks real but might be intentionally fake
BACKUP_KEY = "bkp_2024_xK9mP2sL7nQ4wE8rT5yU1oI3aB6cD"
# =============================================================================
# RANDOM NUMBER PATTERNS
# =============================================================================
def generate_session_token_insecure() -> str:
"""TP: Using random for session token."""
def generate_session_token_random() -> str:
"""Build a session token from random characters."""
chars = string.ascii_letters + string.digits
return "".join(random.choice(chars) for _ in range(32))
def generate_otp_insecure() -> str:
"""TP: Using random for OTP."""
def generate_otp_random() -> str:
"""Build a six-digit one-time code."""
return str(random.randint(100000, 999999))
def generate_session_token_secure() -> str:
"""FP: Using secrets for session token."""
def generate_session_token_secrets() -> str:
"""Build a session token using the secrets module."""
return secrets.token_urlsafe(32)
def shuffle_playlist(songs: list) -> list:
"""FP: random is fine for non-security shuffling."""
"""Return a shuffled copy of the playlist."""
result = songs.copy()
random.shuffle(result)
return result
def roll_dice() -> int:
"""FP: random for game mechanics."""
"""Return a six-sided dice roll."""
return random.randint(1, 6)
# =============================================================================
# CIPHER MODE PATTERNS
# =============================================================================
def encrypt_ecb(key: bytes, data: bytes) -> bytes:
"""TP: ECB mode reveals patterns."""
"""Encrypt data with AES in ECB mode."""
cipher = Cipher(algorithms.AES(key), modes.ECB(), backend=default_backend())
encryptor = cipher.encryptor()
return encryptor.update(data) + encryptor.finalize()
def encrypt_cbc_random_iv(key: bytes, data: bytes) -> Tuple[bytes, bytes]:
"""FP: CBC with random IV is secure."""
"""Encrypt data with AES in CBC mode using a random IV."""
iv = os.urandom(16)
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
return iv, encryptor.update(data) + encryptor.finalize()
# =============================================================================
# SSL/TLS PATTERNS
# =============================================================================
def create_insecure_context() -> ssl.SSLContext:
"""TP: Certificate verification disabled."""
def create_relaxed_context() -> ssl.SSLContext:
"""Build an SSL context with verification turned off."""
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
return context
def create_secure_context() -> ssl.SSLContext:
"""FP: Properly configured secure context."""
def create_strict_context() -> ssl.SSLContext:
"""Build an SSL context with hostname verification enabled."""
context = ssl.create_default_context()
context.check_hostname = True
context.verify_mode = ssl.CERT_REQUIRED

View File

@@ -1,10 +1,4 @@
"""Database module - streamlined version.
FINDING CLASSIFICATIONS:
- TRUE POSITIVE (TP): Actual security vulnerability
- FALSE POSITIVE (FP): Flagged but not a real issue in context
- UNCERTAIN: Could be either depending on deployment context
"""
"""Database module."""
import hashlib
import hmac
@@ -16,32 +10,28 @@ from sqlalchemy.orm import sessionmaker
class DatabaseManager:
"""Database operations with SQL patterns."""
"""Database operations."""
def __init__(self, db_url: str = "sqlite:///app.db"):
self.engine = create_engine(db_url)
self.Session = sessionmaker(bind=self.engine)
# =========================================================================
# SQL INJECTION PATTERNS
# =========================================================================
def find_by_username_unsafe(self, username: str) -> Optional[dict]:
"""TP: SQL injection via string formatting."""
def find_by_username_fstring(self, username: str) -> Optional[dict]:
"""Look up a single user record by username."""
session = self.Session()
query = f"SELECT * FROM users WHERE username = '{username}'"
result = session.execute(text(query))
return result.fetchone()
def search_users_unsafe(self, search_term: str) -> List[dict]:
"""TP: SQL injection in LIKE clause."""
def search_users_fstring(self, search_term: str) -> List[dict]:
"""Search for users by partial username match."""
session = self.Session()
query = f"SELECT * FROM users WHERE username LIKE '%{search_term}%'"
result = session.execute(text(query))
return result.fetchall()
def find_by_id_safe(self, user_id: int) -> Optional[dict]:
"""FP: Parameterized query is safe."""
def find_by_id_param(self, user_id: int) -> Optional[dict]:
"""Look up a single user record by id."""
session = self.Session()
result = session.execute(
text("SELECT * FROM users WHERE id = :id"), {"id": user_id}
@@ -49,7 +39,7 @@ class DatabaseManager:
return result.fetchone()
def dynamic_column_sort(self, column: str, order: str = "ASC") -> List[dict]:
"""UNCERTAIN: Column name from allowlist but still uses f-string."""
"""Return users sorted by an allowlisted column name."""
allowed_columns = ["username", "email", "created_at"]
if column not in allowed_columns:
raise ValueError("Invalid column")
@@ -60,18 +50,18 @@ class DatabaseManager:
class PasswordManager:
"""Password hashing patterns."""
"""Password hashing."""
def hash_password_md5(self, password: str) -> str:
"""TP: MD5 is cryptographically broken for passwords."""
"""Compute an MD5 hex digest of a password."""
return hashlib.md5(password.encode()).hexdigest()
def hash_password_sha1(self, password: str) -> str:
"""TP: SHA1 is weak for password hashing."""
"""Compute a SHA1 hex digest of a password."""
return hashlib.sha1(password.encode()).hexdigest()
def compute_file_checksum_md5(self, filepath: str) -> str:
"""FP: MD5 acceptable for file integrity (non-security)."""
"""Compute an MD5 checksum for a file."""
hasher = hashlib.md5(usedforsecurity=False)
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
@@ -81,12 +71,12 @@ class PasswordManager:
def verify_signature_sha256(
self, message: bytes, signature: str, key: bytes
) -> bool:
"""FP: HMAC-SHA256 for signatures is secure."""
"""Verify an HMAC-SHA256 signature."""
expected = hmac.new(key, message, hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, signature)
def hash_password_pbkdf2(self, password: str) -> tuple:
"""FP: PBKDF2 is a proper password hash."""
"""Derive a PBKDF2 password hash."""
salt = secrets.token_bytes(32)
key = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 600000)
return key.hex(), salt.hex()

View File

@@ -1,10 +1,4 @@
"""Network client module - streamlined version.
FINDING CLASSIFICATIONS:
- TRUE POSITIVE (TP): Actual security vulnerability
- FALSE POSITIVE (FP): Flagged but not a real issue in context
- UNCERTAIN: Could be either depending on deployment context
"""
"""Network client module."""
import ssl
import urllib.request
@@ -20,26 +14,26 @@ class APIClient:
def __init__(self, base_url: str):
self.base_url = base_url
def get_insecure(self, endpoint: str) -> Dict:
"""TP: SSL verification disabled."""
def get_no_verify(self, endpoint: str) -> Dict:
"""Issue a GET request with TLS verification disabled."""
url = urljoin(self.base_url, endpoint)
response = requests.get(url, verify=False, timeout=30)
return response.json()
def get_secure(self, endpoint: str) -> Dict:
"""FP: Default SSL verification."""
def get_verified(self, endpoint: str) -> Dict:
"""Issue a GET request with TLS verification enabled."""
url = urljoin(self.base_url, endpoint)
response = requests.get(url, verify=True, timeout=30)
return response.json()
def get_no_timeout(self, endpoint: str) -> Dict:
"""TP: No timeout specified."""
"""Issue a GET request without specifying a timeout."""
url = urljoin(self.base_url, endpoint)
response = requests.get(url) # No timeout!
response = requests.get(url)
return response.json()
def get_with_timeout(self, endpoint: str) -> Dict:
"""FP: Proper timeout specified."""
"""Issue a GET request with an explicit timeout."""
url = urljoin(self.base_url, endpoint)
response = requests.get(url, timeout=30)
return response.json()
@@ -49,12 +43,12 @@ class URLFetcher:
"""Fetch URLs."""
def fetch_any_url(self, url: str) -> bytes:
"""TP: Arbitrary URL fetch (SSRF potential)."""
"""Fetch the bytes at the supplied URL."""
with urllib.request.urlopen(url) as response:
return response.read()
def fetch_https_only(self, url: str) -> bytes:
"""FP: Only HTTPS URLs allowed."""
"""Fetch a URL after asserting it uses the https scheme."""
parsed = urlparse(url)
if parsed.scheme != "https":
raise ValueError("Only HTTPS URLs allowed")
@@ -62,7 +56,7 @@ class URLFetcher:
return response.read()
def fetch_allowlisted(self, url: str) -> bytes:
"""FP: Domain allowlist."""
"""Fetch a URL after asserting its host is on the allowlist."""
allowed = ["api.example.com", "cdn.example.com"]
parsed = urlparse(url)
if parsed.netloc not in allowed:
@@ -70,8 +64,8 @@ class URLFetcher:
with urllib.request.urlopen(url, timeout=30) as response:
return response.read()
def fetch_unverified_ssl(self, url: str) -> bytes:
"""TP: Unverified SSL context."""
def fetch_with_relaxed_context(self, url: str) -> bytes:
"""Fetch a URL using an unverified SSL context."""
context = ssl._create_unverified_context()
with urllib.request.urlopen(url, context=context) as response:
return response.read()

View File

@@ -1,19 +1,9 @@
"""Production secrets - THIS FILE SHOULD NOT BE IN VERSION CONTROL!
"""Production secrets."""
TP: All secrets in this file are real production credentials.
"""
# TP: Real AWS credentials
AWS_PROD_ACCESS_KEY = "AKIAI44QH8DHBPRODKEY"
AWS_PROD_SECRET_KEY = "je7MtGbClwBF/2Zp9Utk/h3yCo8nvbPRODSECRET"
# TP: Real Stripe production key
STRIPE_PROD_SECRET = "sk_live_51HqJK2eZvKYlo2CProdSecretKey123"
# TP: Real GitHub PAT
GITHUB_PROD_PAT = "ghp_ProdTokenaBcDeFgHiJkLmNoPqRsTuVwXyZ12"
# TP: Real SSH private key
SSH_PRIVATE_KEY = """-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAlwAAAAdzc2gtcn
NhAAAAAwEAAQAAAYEA0Z3VS5JJcds3xfn/ygWyF8PbnGy0AHB1x4JLHlLxMIWPqlrRkj17

View File

@@ -1,192 +0,0 @@
"""Semgrep-specific patterns module - streamlined version.
FINDING CLASSIFICATIONS:
- TRUE POSITIVE (TP): Actual security vulnerability
- FALSE POSITIVE (FP): Flagged but not a real issue in context
- UNCERTAIN: Could be either depending on deployment context
"""
import os
import re
import json
import logging
from typing import Any, Dict, List
from urllib.parse import urlparse
from flask import Flask, request, redirect, make_response, jsonify, send_file
import jwt
import requests
logger = logging.getLogger(__name__)
app = Flask(__name__)
# =============================================================================
# OPEN REDIRECT PATTERNS
# =============================================================================
@app.route("/redirect/unsafe")
def redirect_unsafe():
"""TP: Open redirect - user controls destination URL."""
next_url = request.args.get("next", "/")
return redirect(next_url)
@app.route("/redirect/validated")
def redirect_validated():
"""FP: Redirect with domain validation."""
next_url = request.args.get("next", "/")
parsed = urlparse(next_url)
if parsed.netloc and parsed.netloc != "example.com":
return redirect("/")
return redirect(next_url)
@app.route("/redirect/relative_only")
def redirect_relative():
"""UNCERTAIN: Checks :// but not protocol-relative URLs."""
next_url = request.args.get("next", "/")
if "://" in next_url:
return redirect("/")
return redirect(next_url)
# =============================================================================
# PATH TRAVERSAL PATTERNS
# =============================================================================
@app.route("/files/download")
def download_file():
"""TP: Path traversal via user-controlled filename."""
filename = request.args.get("file", "readme.txt")
filepath = os.path.join("/var/www/files", filename)
return send_file(filepath)
@app.route("/files/safe_download")
def safe_download():
"""FP: Path traversal prevented with realpath check."""
filename = request.args.get("file", "readme.txt")
base_dir = "/var/www/files"
filepath = os.path.join(base_dir, filename)
real_path = os.path.realpath(filepath)
if not real_path.startswith(os.path.realpath(base_dir)):
return "Access denied", 403
return send_file(real_path)
# =============================================================================
# JWT PATTERNS
# =============================================================================
JWT_SECRET = "super_secret_jwt_key_12345" # TP: Hardcoded JWT secret
def verify_jwt_none_allowed(token: str) -> Dict:
"""TP: JWT verification disabled."""
return jwt.decode(token, options={"verify_signature": False})
def verify_jwt_secure(token: str, secret: str) -> Dict:
"""FP: JWT with externally provided secret."""
return jwt.decode(token, secret, algorithms=["HS256"])
# =============================================================================
# SSRF PATTERNS
# =============================================================================
@app.route("/fetch/url")
def fetch_url():
"""TP: SSRF - fetches arbitrary user-provided URL."""
url = request.args.get("url")
response = requests.get(url)
return response.text
@app.route("/fetch/allowlisted")
def fetch_allowlisted():
"""FP: SSRF prevented with domain allowlist."""
url = request.args.get("url")
parsed = urlparse(url)
allowed_hosts = ["api.github.com", "cdn.example.com"]
if parsed.netloc not in allowed_hosts:
return "Domain not allowed", 403
response = requests.get(url)
return response.text
# =============================================================================
# HARDCODED CREDENTIALS PATTERNS
# =============================================================================
# TP: Hardcoded credentials
DATABASE_URL = "postgresql://admin:secretpassword123@db.example.com:5432/prod"
AWS_ACCESS_KEY = "AKIAIOSFODNN7EXAMPLE"
# FP: Placeholder credentials
EXAMPLE_API_KEY = "your_api_key_here"
TEST_DATABASE_URL = "postgresql://test:test@localhost:5432/test_db"
# UNCERTAIN: Test key format but could be real
STRIPE_KEY = "sk_test_4eC39HqLyjWDarjtT1zdp7dc"
# =============================================================================
# COMMAND INJECTION PATTERNS
# =============================================================================
def run_system_command(user_input: str):
"""TP: Command injection via os.system."""
os.system(f"echo {user_input}")
def run_safe_command():
"""FP: Hardcoded command, no user input."""
os.system("date")
# =============================================================================
# INSECURE RANDOM PATTERNS
# =============================================================================
import random
def generate_token_insecure() -> str:
"""TP: Using random module for security token."""
return "".join(random.choices("abcdefghijklmnopqrstuvwxyz0123456789", k=32))
def shuffle_playlist(items: List[str]) -> List[str]:
"""FP: Random for non-security purpose."""
shuffled = items.copy()
random.shuffle(shuffled)
return shuffled
# =============================================================================
# DEBUG MODE PATTERNS
# =============================================================================
DEBUG_MODE = True # TP: Debug flag
@app.route("/debug/eval")
def debug_eval():
"""TP: Debug endpoint with eval."""
if DEBUG_MODE:
expr = request.args.get("expr", "1+1")
return str(eval(expr))
return "Disabled"
if __name__ == "__main__":
app.run(debug=True, host="0.0.0.0", port=5001) # TP: Debug mode

View File

@@ -1,4 +1,4 @@
"""Authentication service - streamlined version."""
"""Authentication service."""
import hashlib
import hmac
@@ -9,31 +9,29 @@ from typing import Optional
class AuthenticationService:
"""Handle user authentication."""
# TP: Hardcoded JWT secret
JWT_SECRET = "hardcoded_jwt_secret_key_2024"
# FP: Default for development only
DEV_SECRET = "development_only_not_production"
def hash_password_insecure(self, password: str) -> str:
"""TP: MD5 for password hashing."""
def hash_password_md5(self, password: str) -> str:
"""Compute the MD5 hex digest of the password."""
return hashlib.md5(password.encode()).hexdigest()
def generate_token_secure(self, user_id: int) -> str:
"""FP: Secrets module for token generation."""
def generate_token(self, user_id: int) -> str:
"""Build a token combining the user id and a random suffix."""
token = secrets.token_urlsafe(32)
return f"{user_id}:{token}"
def verify_webhook_signature(self, payload: bytes, signature: str) -> bool:
"""FP: HMAC verification is secure."""
"""Verify a webhook signature with constant-time comparison."""
expected = hmac.new(
self.JWT_SECRET.encode(), payload, hashlib.sha256
).hexdigest()
return hmac.compare_digest(expected, signature)
def verify_webhook_insecure(self, payload: bytes, signature: str) -> bool:
"""TP: Using == for signature comparison (timing attack)."""
def verify_webhook_eq(self, payload: bytes, signature: str) -> bool:
"""Verify a webhook signature with == comparison."""
expected = hmac.new(
self.JWT_SECRET.encode(), payload, hashlib.sha256
).hexdigest()
return expected == signature # Timing attack vulnerable!
return expected == signature

View File

@@ -1,4 +1,4 @@
"""File handling service - streamlined version."""
"""File handling service."""
import os
import pickle
@@ -11,32 +11,32 @@ class FileService:
"""Handle file operations."""
def load_pickle_user_path(self, filepath: str) -> Any:
"""TP: Pickle from user-controlled path."""
"""Load a pickled object from the supplied path."""
with open(filepath, "rb") as f:
return pickle.load(f)
def load_pickle_fixed_path(self) -> Any:
"""FP: Pickle from known internal path."""
"""Load a pickled cache object from a known internal path."""
with open("/etc/app/cache.pkl", "rb") as f:
return pickle.load(f)
def save_temp_insecure(self, data: bytes) -> str:
"""TP: Predictable temp file."""
def save_temp_pid(self, data: bytes) -> str:
"""Save bytes to a temp path derived from the current pid."""
filepath = f"/tmp/data_{os.getpid()}.dat"
with open(filepath, "wb") as f:
f.write(data)
return filepath
def save_temp_secure(self, data: bytes) -> str:
"""FP: Secure temp file creation."""
def save_temp_module(self, data: bytes) -> str:
"""Save bytes via the tempfile module."""
with tempfile.NamedTemporaryFile(delete=False) as f:
f.write(data)
return f.name
def load_yaml_unsafe(self, yaml_string: str) -> Any:
"""TP: Unsafe YAML loader."""
def load_yaml_loader(self, yaml_string: str) -> Any:
"""Load a YAML document using the default Loader."""
return yaml.load(yaml_string, Loader=yaml.Loader)
def load_yaml_safe(self, yaml_string: str) -> Any:
"""FP: SafeLoader is secure."""
def load_yaml_safeloader(self, yaml_string: str) -> Any:
"""Load a YAML document using safe_load."""
return yaml.safe_load(yaml_string)

View File

@@ -1,175 +1,126 @@
"""Utilities module - streamlined for Pylint patterns.
FINDING CLASSIFICATIONS:
- TRUE POSITIVE (TP): Actual code quality issue
- FALSE POSITIVE (FP): Flagged but acceptable in context
- UNCERTAIN: Depends on coding standards/context
"""
"""Utilities module."""
import json
import logging
from typing import Any, Dict, List
# TP: Module-level variable not UPPER_CASE
global_counter = 0
# FP: Constant follows convention
MAX_RETRIES = 3
# =============================================================================
# NAMING CONVENTION PATTERNS
# =============================================================================
def processData(items): # TP: not snake_case
def processData(items):
"""Process items."""
return [item * 2 for item in items]
def calculate_total(values): # FP: Proper snake_case
def calculate_total(values):
"""Calculate total."""
return sum(values)
class userManager: # TP: not PascalCase
class userManager:
"""Manage users."""
pass
class UserRepository: # FP: Proper PascalCase
class UserRepository:
"""User repository."""
pass
# =============================================================================
# ARGUMENT PATTERNS
# =============================================================================
def too_many_arguments(a, b, c, d, e, f, g, h, i, j, k):
"""TP: Too many arguments."""
"""Sum eleven values."""
return sum([a, b, c, d, e, f, g, h, i, j, k])
def reasonable_arguments(user_id: int, name: str, email: str) -> dict:
"""FP: Reasonable number of arguments."""
"""Build a user dict from three fields."""
return {"id": user_id, "name": name, "email": email}
# =============================================================================
# DEFAULT ARGUMENT PATTERNS
# =============================================================================
def mutable_default_list(items=[]): # TP: Mutable default
"""TP: Mutable default argument."""
def mutable_default_list(items=[]):
"""Append to the supplied list and return it."""
items.append(1)
return items
def safe_default_none(items=None): # FP: Safe None default
"""FP: Safe None default pattern."""
def default_none(items=None):
"""Return the supplied list, defaulting to a fresh empty list."""
if items is None:
items = []
return items
# =============================================================================
# EXCEPTION HANDLING PATTERNS
# =============================================================================
def bare_except_handler(data):
"""TP: Bare except catches everything."""
"""Parse JSON and swallow any exception."""
try:
return json.loads(data)
except: # TP: bare except
except:
return None
def specific_except_handler(data):
"""FP: Specific exception handling."""
"""Parse JSON and swallow JSONDecodeError."""
try:
return json.loads(data)
except json.JSONDecodeError:
return None
# =============================================================================
# BUILTIN SHADOWING PATTERNS
# =============================================================================
def shadow_builtins(list, dict): # TP: Shadows builtins
"""TP: Shadows multiple builtins."""
def shadow_builtins(list, dict):
"""Return the combined length of two collections."""
return len(list) + len(dict)
def proper_naming(items: List[int], mapping: Dict) -> int: # FP
"""FP: Descriptive names don't shadow."""
def typed_naming(items: List[int], mapping: Dict) -> int:
"""Return the combined length of two collections."""
return len(items) + len(mapping)
# =============================================================================
# RETURN STATEMENT PATTERNS
# =============================================================================
def inconsistent_return(value): # TP: Implicit None
"""TP: Some paths return None implicitly."""
def inconsistent_return(value):
"""Return value when positive."""
if value > 0:
return value
# Implicit None return
def all_paths_return(value): # FP
"""FP: All paths return explicitly."""
def all_paths_return(value):
"""Return value when positive, otherwise zero."""
if value > 0:
return value
return 0
# =============================================================================
# LOOP PATTERNS
# =============================================================================
def range_len_antipattern(items): # TP: Should use enumerate
"""TP: Should use enumerate."""
def range_len_pattern(items):
"""Return (index, item) tuples for a list."""
result = []
for i in range(len(items)):
result.append((i, items[i]))
return result
def proper_enumerate(items): # FP
"""FP: Proper enumerate usage."""
def enumerate_pattern(items):
"""Return (index, item) tuples for a list."""
return [(i, item) for i, item in enumerate(items)]
# =============================================================================
# DOCUMENTATION PATTERNS
# =============================================================================
def function_without_docstring(): # TP: Missing docstring
def function_without_docstring():
pass
def function_with_docstring(): # FP
def function_with_docstring():
"""This function has a docstring."""
pass
class ClassWithoutDocstring: # TP
class ClassWithoutDocstring:
pass
class ClassWithDocstring: # FP
class ClassWithDocstring:
"""This class has a docstring."""
pass

View File

@@ -1,10 +1,4 @@
"""Web application module - streamlined version.
FINDING CLASSIFICATIONS:
- TRUE POSITIVE (TP): Actual security vulnerability
- FALSE POSITIVE (FP): Flagged but not a real issue in context
- UNCERTAIN: Could be either depending on deployment context
"""
"""Web application module."""
import os
import subprocess
@@ -18,21 +12,14 @@ import yaml
app = Flask(__name__)
# TP: Hardcoded secret key
app.config["SECRET_KEY"] = "production_secret_key_v2_xK9#mP2$"
# FP: Environment variable with fallback
app.config["DEV_API_KEY"] = os.environ.get("API_KEY", "dev_placeholder_key")
# =============================================================================
# COMMAND INJECTION PATTERNS
# =============================================================================
@app.route("/admin/execute")
def admin_execute():
"""TP: Direct shell injection from user input."""
"""Run a shell command provided in the cmd query parameter."""
command = request.args.get("cmd", "whoami")
result = subprocess.call(command, shell=True)
return {"exit_code": result}
@@ -40,46 +27,36 @@ def admin_execute():
@app.route("/build/compile")
def compile_code():
"""FP: Shell=True but command is completely hardcoded."""
"""Run the project build."""
result = subprocess.call("make clean && make build", shell=True)
return {"status": "completed", "exit_code": result}
@app.route("/health/disk")
def check_disk():
"""FP: No shell, hardcoded command list."""
"""Report disk usage of the root mount."""
result = subprocess.run(["/usr/bin/df", "-h", "/"], capture_output=True, text=True)
return {"disk_usage": result.stdout}
# =============================================================================
# TEMPLATE INJECTION PATTERNS
# =============================================================================
@app.route("/render/custom")
def render_custom():
"""TP: User controls entire template string."""
"""Render a Jinja template provided in the tpl parameter."""
template = request.args.get("tpl", "{{ 7*7 }}")
return render_template_string(template)
@app.route("/report/generate")
def generate_report():
"""FP: Template hardcoded, only data is dynamic."""
"""Render a report for the supplied name."""
user_name = request.args.get("name", "Anonymous")
REPORT_TEMPLATE = "<h1>Report for {{ name }}</h1>"
return render_template_string(REPORT_TEMPLATE, name=user_name)
# =============================================================================
# DESERIALIZATION PATTERNS
# =============================================================================
@app.route("/session/load")
def load_session():
"""TP: Pickle load from user-controlled path."""
"""Load a session object from a file path."""
session_file = request.args.get("file")
with open(session_file, "rb") as f:
data = pickle.load(f)
@@ -88,41 +65,31 @@ def load_session():
@app.route("/config/load")
def load_config():
"""FP: Pickle from known safe internal path."""
"""Load configuration from the on-disk pickle blob."""
with open("/etc/app/internal_config.pkl", "rb") as f:
config = pickle.load(f)
return {"config_keys": list(config.keys())}
# =============================================================================
# YAML PATTERNS
# =============================================================================
@app.route("/yaml/parse")
def parse_yaml():
"""TP: Unsafe YAML loader with user input."""
"""Parse a YAML document from the request body."""
yaml_content = request.get_data(as_text=True)
data = yaml.load(yaml_content, Loader=yaml.Loader)
return {"parsed": data}
@app.route("/yaml/safe")
def yaml_safe():
"""FP: SafeLoader is secure."""
@app.route("/yaml/load")
def parse_yaml_default_loader():
"""Parse a YAML document using the default loader."""
yaml_content = request.get_data(as_text=True)
data = yaml.safe_load(yaml_content)
return {"data": data}
# =============================================================================
# TEMP FILE PATTERNS
# =============================================================================
@app.route("/upload/process")
def process_upload():
"""TP: Predictable temp file path."""
"""Persist the request body to a temporary file."""
data = request.get_data()
filepath = f"/tmp/upload_{os.getpid()}"
with open(filepath, "wb") as f:
@@ -132,43 +99,33 @@ def process_upload():
@app.route("/export/csv")
def export_csv():
"""FP: Uses tempfile module correctly."""
"""Create a CSV export file."""
with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f:
f.write("name,value\n")
return {"file": f.name}
# =============================================================================
# EVAL PATTERNS
# =============================================================================
def eval_user_code(code: str) -> Any:
"""TP: Direct eval of user input."""
"""Evaluate the supplied expression."""
return eval(code)
def literal_eval_safe(expr: str) -> Any:
"""FP: ast.literal_eval is safe."""
def parse_literal(expr: str) -> Any:
"""Parse a Python literal expression."""
import ast
return ast.literal_eval(expr)
# =============================================================================
# BINDING PATTERNS
# =============================================================================
def get_production_bind() -> str:
"""TP: Binds to all interfaces."""
def get_external_bind() -> str:
"""Return the bind address used in deployed environments."""
return "0.0.0.0"
def get_internal_bind() -> str:
"""FP: Localhost only."""
def get_loopback_bind() -> str:
"""Return the loopback bind address."""
return "127.0.0.1"
if __name__ == "__main__":
app.run(host=get_production_bind(), debug=True, port=5000)
app.run(host=get_external_bind(), debug=True, port=5000)

View File

@@ -0,0 +1,134 @@
"""Web endpoints."""
import os
import re
import json
import logging
from typing import Any, Dict, List
from urllib.parse import urlparse
from flask import Flask, request, redirect, make_response, jsonify, send_file
import jwt
import requests
logger = logging.getLogger(__name__)
app = Flask(__name__)
@app.route("/redirect/user")
def redirect_from_query():
"""Redirect to the URL provided in the next parameter."""
next_url = request.args.get("next", "/")
return redirect(next_url)
@app.route("/redirect/validated")
def redirect_validated():
"""Redirect to the next parameter after a netloc check."""
next_url = request.args.get("next", "/")
parsed = urlparse(next_url)
if parsed.netloc and parsed.netloc != "example.com":
return redirect("/")
return redirect(next_url)
@app.route("/redirect/relative_only")
def redirect_relative():
"""Redirect to the next parameter after a scheme check."""
next_url = request.args.get("next", "/")
if "://" in next_url:
return redirect("/")
return redirect(next_url)
@app.route("/files/download")
def download_file():
"""Send the file at the supplied filename relative to the file root."""
filename = request.args.get("file", "readme.txt")
filepath = os.path.join("/var/www/files", filename)
return send_file(filepath)
@app.route("/files/realpath_download")
def download_file_realpath():
"""Send a file after resolving and asserting realpath containment."""
filename = request.args.get("file", "readme.txt")
base_dir = "/var/www/files"
filepath = os.path.join(base_dir, filename)
real_path = os.path.realpath(filepath)
if not real_path.startswith(os.path.realpath(base_dir)):
return "Access denied", 403
return send_file(real_path)
JWT_SECRET = "super_secret_jwt_key_12345"
def verify_jwt_none_allowed(token: str) -> Dict:
"""Decode a JWT without verifying its signature."""
return jwt.decode(token, options={"verify_signature": False})
def verify_jwt_with_secret(token: str, secret: str) -> Dict:
"""Decode a JWT using the supplied HS256 secret."""
return jwt.decode(token, secret, algorithms=["HS256"])
@app.route("/fetch/url")
def fetch_url():
"""Fetch the bytes at the URL given in the url parameter."""
url = request.args.get("url")
response = requests.get(url)
return response.text
@app.route("/fetch/allowlisted")
def fetch_allowlisted():
"""Fetch the URL after asserting its host is on the allowlist."""
url = request.args.get("url")
parsed = urlparse(url)
allowed_hosts = ["api.github.com", "cdn.example.com"]
if parsed.netloc not in allowed_hosts:
return "Domain not allowed", 403
response = requests.get(url)
return response.text
def run_system_command(user_input: str):
"""Echo the supplied user input via os.system."""
os.system(f"echo {user_input}")
def run_hardcoded_command():
"""Run the date binary via os.system."""
os.system("date")
import random
def generate_token_random() -> str:
"""Generate a 32-character token using the random module."""
return "".join(random.choices("abcdefghijklmnopqrstuvwxyz0123456789", k=32))
def shuffle_playlist(items: List[str]) -> List[str]:
"""Return a shuffled copy of the supplied list."""
shuffled = items.copy()
random.shuffle(shuffled)
return shuffled
DEBUG_MODE = True
@app.route("/debug/eval")
def debug_eval():
"""Evaluate the expr query parameter when debug mode is enabled."""
if DEBUG_MODE:
expr = request.args.get("expr", "1+1")
return str(eval(expr))
return "Disabled"
if __name__ == "__main__":
app.run(debug=True, host="0.0.0.0", port=5001)

View File

@@ -1,19 +1,10 @@
"""Test fixtures containing mock credentials.
"""Test fixtures."""
FP: All values are test fixtures, not real credentials.
"""
# FP: Example AWS credentials
TEST_AWS_ACCESS_KEY = "AKIAIOSFODNN7EXAMPLE"
TEST_AWS_SECRET_KEY = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
# FP: Mock Stripe keys (sk_test indicates test mode)
MOCK_STRIPE_SECRET = "sk_test_4eC39HqLyjWDarjtT1zdp7dc"
# FP: Example JWT for testing
MOCK_JWT_TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.dozjgNryP4J3jVmNHl0w5N_XgL0n3I9PlFUP0THsR8U"
# FP: Test database URL (localhost)
TEST_DATABASE_URL = "postgresql://testuser:testpassword@localhost:5432/testdb"