Rename project to checker for disguise

This commit is contained in:
Alexander Braml
2026-04-14 10:03:02 +02:00
parent 7c275647db
commit fdf188487e
16 changed files with 9 additions and 9 deletions

3
src/checker/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""Checker Package."""
__version__ = "0.1.0"

View File

@@ -0,0 +1,79 @@
"""Cryptography utilities."""
import os
import random
import secrets
import ssl
import string
from typing import Tuple
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
PRIMARY_KEY = b"aK9$mX2#pL7@nQ4&wE8*rT5%yU1!oI3^"
PLACEHOLDER_KEY = "REPLACE_THIS_KEY_IN_PRODUCTION"
CLIENT_API_KEY = "test_sk_4eC39HqLyjWDarjtT1zdp7dc"
ROTATION_KEY = "bkp_2024_xK9mP2sL7nQ4wE8rT5yU1oI3aB6cD"
def generate_session_token_random() -> str:
"""Build a session token from random characters."""
chars = string.ascii_letters + string.digits
return "".join(random.choice(chars) for _ in range(32))
def generate_otp_random() -> str:
"""Build a six-digit one-time code."""
return str(random.randint(100000, 999999))
def generate_session_token_secrets() -> str:
"""Build a session token using the secrets module."""
return secrets.token_urlsafe(32)
def shuffle_playlist(songs: list) -> list:
"""Return a shuffled copy of the playlist."""
result = songs.copy()
random.shuffle(result)
return result
def roll_dice() -> int:
"""Return a six-sided dice roll."""
return random.randint(1, 6)
def encrypt_ecb(key: bytes, data: bytes) -> bytes:
"""Encrypt data with AES in ECB mode."""
cipher = Cipher(algorithms.AES(key), modes.ECB(), backend=default_backend())
encryptor = cipher.encryptor()
return encryptor.update(data) + encryptor.finalize()
def encrypt_cbc_random_iv(key: bytes, data: bytes) -> Tuple[bytes, bytes]:
"""Encrypt data with AES in CBC mode using a random IV."""
iv = os.urandom(16)
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
return iv, encryptor.update(data) + encryptor.finalize()
def create_relaxed_context() -> ssl.SSLContext:
"""Build an SSL context with verification turned off."""
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
return context
def create_strict_context() -> ssl.SSLContext:
"""Build an SSL context with hostname verification enabled."""
context = ssl.create_default_context()
context.check_hostname = True
context.verify_mode = ssl.CERT_REQUIRED
return context

82
src/checker/database.py Normal file
View File

@@ -0,0 +1,82 @@
"""Database module."""
import hashlib
import hmac
import secrets
from typing import List, Optional
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
class DatabaseManager:
"""Database operations."""
def __init__(self, db_url: str = "sqlite:///app.db"):
self.engine = create_engine(db_url)
self.session_factory = sessionmaker(bind=self.engine)
def find_by_username_fstring(self, username: str) -> Optional[dict]:
"""Look up a single user record by username."""
session = self.session_factory()
query = f"SELECT * FROM users WHERE username = '{username}'"
result = session.execute(text(query))
return result.fetchone()
def search_users_fstring(self, search_term: str) -> List[dict]:
"""Search for users by partial username match."""
session = self.session_factory()
query = f"SELECT * FROM users WHERE username LIKE '%{search_term}%'"
result = session.execute(text(query))
return result.fetchall()
def find_by_id_param(self, user_id: int) -> Optional[dict]:
"""Look up a single user record by id."""
session = self.session_factory()
result = session.execute(
text("SELECT * FROM users WHERE id = :id"), {"id": user_id}
)
return result.fetchone()
def dynamic_column_sort(self, column: str, order: str = "ASC") -> List[dict]:
"""Return users sorted by an allowlisted column name."""
allowed_columns = ["username", "email", "created_at"]
if column not in allowed_columns:
raise ValueError("Invalid column")
session = self.session_factory()
query = f"SELECT * FROM users ORDER BY {column} {order}"
result = session.execute(text(query))
return result.fetchall()
class PasswordManager:
"""Password hashing."""
def hash_password_md5(self, password: str) -> str:
"""Compute an MD5 hex digest of a password."""
return hashlib.md5(password.encode()).hexdigest()
def hash_password_sha1(self, password: str) -> str:
"""Compute a SHA1 hex digest of a password."""
return hashlib.sha1(password.encode()).hexdigest()
def compute_file_checksum_md5(self, filepath: str) -> str:
"""Compute an MD5 checksum for a file."""
hasher = hashlib.md5(usedforsecurity=False)
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hasher.update(chunk)
return hasher.hexdigest()
def verify_signature_sha256(
self, message: bytes, signature: str, key: bytes
) -> bool:
"""Verify an HMAC-SHA256 signature."""
expected = hmac.new(key, message, hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, signature)
def hash_password_pbkdf2(self, password: str) -> tuple:
"""Derive a PBKDF2 password hash."""
salt = secrets.token_bytes(32)
key = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 600000)
return key.hex(), salt.hex()

View File

@@ -0,0 +1,71 @@
"""Network client module."""
import ssl
import urllib.request
from typing import Dict
from urllib.parse import urljoin, urlparse
import requests
class APIClient:
"""HTTP API client."""
def __init__(self, base_url: str):
self.base_url = base_url
def get_no_verify(self, endpoint: str) -> Dict:
"""Issue a GET request with TLS verification disabled."""
url = urljoin(self.base_url, endpoint)
response = requests.get(url, verify=False, timeout=30)
return response.json()
def get_verified(self, endpoint: str) -> Dict:
"""Issue a GET request with TLS verification enabled."""
url = urljoin(self.base_url, endpoint)
response = requests.get(url, verify=True, timeout=30)
return response.json()
def get_no_timeout(self, endpoint: str) -> Dict:
"""Issue a GET request without specifying a timeout."""
url = urljoin(self.base_url, endpoint)
response = requests.get(url)
return response.json()
def get_with_timeout(self, endpoint: str) -> Dict:
"""Issue a GET request with an explicit timeout."""
url = urljoin(self.base_url, endpoint)
response = requests.get(url, timeout=30)
return response.json()
class URLFetcher:
"""Fetch URLs."""
def fetch_any_url(self, url: str) -> bytes:
"""Fetch the bytes at the supplied URL."""
with urllib.request.urlopen(url) as response:
return response.read()
def fetch_https_only(self, url: str) -> bytes:
"""Fetch a URL after asserting it uses the https scheme."""
parsed = urlparse(url)
if parsed.scheme != "https":
raise ValueError("Only HTTPS URLs allowed")
with urllib.request.urlopen(url, timeout=30) as response:
return response.read()
def fetch_allowlisted(self, url: str) -> bytes:
"""Fetch a URL after asserting its host is on the allowlist."""
allowed = ["api.example.com", "cdn.example.com"]
parsed = urlparse(url)
if parsed.netloc not in allowed:
raise ValueError("Domain not allowed")
with urllib.request.urlopen(url, timeout=30) as response:
return response.read()
def fetch_with_relaxed_context(self, url: str) -> bytes:
"""Fetch a URL using an unverified SSL context."""
context = ssl._create_unverified_context()
with urllib.request.urlopen(url, context=context) as response:
return response.read()

10
src/checker/secrets.py Normal file
View File

@@ -0,0 +1,10 @@
"""Production secrets."""
AWS_PROD_SECRET_KEY = "je7MtGbClwBF/2Zp9Utk/h3yCo8nvbPRODSECRET"
GITHUB_PROD_PAT = "ghp_ProdTokenaBcDeFgHiJkLmNoPqRsTuVwXyZ12"
SSH_PRIVATE_KEY = """-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAlwAAAAdzc2gtcn
NhAAAAAwEAAQAAAYEA0Z3VS5JJcds3xfn/ygWyF8PbnGy0AHB1x4JLHlLxMIWPqlrRkj17
-----END OPENSSH PRIVATE KEY-----"""

View File

@@ -0,0 +1 @@
"""Services module with additional patterns."""

View File

@@ -0,0 +1,36 @@
"""Authentication service."""
import hashlib
import hmac
import secrets
class AuthenticationService:
"""Handle user authentication."""
JWT_SECRET = "hardcoded_jwt_secret_key_2024"
DEV_SECRET = "development_only_not_production"
def hash_password_md5(self, password: str) -> str:
"""Compute the MD5 hex digest of the password."""
return hashlib.md5(password.encode()).hexdigest()
def generate_token(self, user_id: int) -> str:
"""Build a token combining the user id and a random suffix."""
token = secrets.token_urlsafe(32)
return f"{user_id}:{token}"
def verify_webhook_signature(self, payload: bytes, signature: str) -> bool:
"""Verify a webhook signature with constant-time comparison."""
expected = hmac.new(
self.JWT_SECRET.encode(), payload, hashlib.sha256
).hexdigest()
return hmac.compare_digest(expected, signature)
def verify_webhook_eq(self, payload: bytes, signature: str) -> bool:
"""Verify a webhook signature with == comparison."""
expected = hmac.new(
self.JWT_SECRET.encode(), payload, hashlib.sha256
).hexdigest()
return expected == signature

View File

@@ -0,0 +1,42 @@
"""File handling service."""
import os
import pickle
import tempfile
from typing import Any
import yaml
class FileService:
"""Handle file operations."""
def load_pickle_user_path(self, filepath: str) -> Any:
"""Load a pickled object from the supplied path."""
with open(filepath, "rb") as f:
return pickle.load(f)
def load_pickle_fixed_path(self) -> Any:
"""Load a pickled cache object from a known internal path."""
with open("/etc/app/cache.pkl", "rb") as f:
return pickle.load(f)
def save_temp_pid(self, data: bytes) -> str:
"""Save bytes to a temp path derived from the current pid."""
filepath = f"/tmp/data_{os.getpid()}.dat"
with open(filepath, "wb") as f:
f.write(data)
return filepath
def save_temp_module(self, data: bytes) -> str:
"""Save bytes via the tempfile module."""
with tempfile.NamedTemporaryFile(delete=False) as f:
f.write(data)
return f.name
def load_yaml_loader(self, yaml_string: str) -> Any:
"""Load a YAML document using the default Loader."""
return yaml.load(yaml_string, Loader=yaml.Loader)
def load_yaml_safeloader(self, yaml_string: str) -> Any:
"""Load a YAML document using safe_load."""
return yaml.safe_load(yaml_string)

150
src/checker/utils.py Normal file
View File

@@ -0,0 +1,150 @@
"""Utilities module."""
import json
from typing import Dict, List
GLOBAL_COUNTER = 0
MAX_RETRIES = 3
def process_data(items):
"""Process items."""
return [item * 2 for item in items]
def calculate_total(values):
"""Calculate total."""
return sum(values)
class UserManager:
"""Manage users."""
def add(self, user):
"""Add a user."""
return user
def remove(self, user):
"""Remove a user."""
return user
class UserRepository:
"""User repository."""
def fetch(self, user_id):
"""Fetch a user by id."""
return user_id
def store(self, user):
"""Store a user."""
return user
def too_many_arguments(a, b, c, d, e, f, g, h, i, j, k):
"""Sum eleven values."""
return sum([a, b, c, d, e, f, g, h, i, j, k])
def reasonable_arguments(user_id: int, name: str, email: str) -> dict:
"""Build a user dict from three fields."""
return {"id": user_id, "name": name, "email": email}
def mutable_default_list(items=[]):
"""Append to the supplied list and return it."""
items.append(1)
return items
def default_none(items=None):
"""Return the supplied list, defaulting to a fresh empty list."""
if items is None:
items = []
return items
def bare_except_handler(data):
"""Parse JSON and swallow any exception."""
try:
return json.loads(data)
except:
return None
def specific_except_handler(data):
"""Parse JSON and swallow JSONDecodeError."""
try:
return json.loads(data)
except json.JSONDecodeError:
return None
def shadow_builtins(list, dict):
"""Return the combined length of two collections."""
return len(list) + len(dict)
def typed_naming(items: List[int], mapping: Dict) -> int:
"""Return the combined length of two collections."""
return len(items) + len(mapping)
def inconsistent_return(value):
"""Return value when positive."""
if value > 0:
return value
def all_paths_return(value):
"""Return value when positive, otherwise zero."""
if value > 0:
return value
return 0
def range_len_pattern(items):
"""Return (index, item) tuples for a list."""
result = []
for i in range(len(items)):
result.append((i, items[i]))
return result
def enumerate_pattern(items):
"""Return (index, item) tuples for a list."""
return list(enumerate(items))
def function_without_docstring():
pass
def function_with_docstring():
"""This function has a docstring."""
class ClassWithoutDocstring:
"""Stub class used for documentation pattern checks."""
def first(self):
"""First public method."""
return self
def second(self):
"""Second public method."""
return self
class ClassWithDocstring:
"""This class has a docstring."""
def first(self):
"""First public method."""
return self
def second(self):
"""Second public method."""
return self

131
src/checker/web_app.py Normal file
View File

@@ -0,0 +1,131 @@
"""Web application module."""
import os
import subprocess
import pickle
import tempfile
from typing import Any
from flask import Flask, request, render_template_string
import yaml
app = Flask(__name__)
app.config["SECRET_KEY"] = "production_secret_key_v2_xK9#mP2$"
app.config["DEV_API_KEY"] = os.environ.get("API_KEY", "dev_placeholder_key")
@app.route("/admin/execute")
def admin_execute():
"""Run a shell command provided in the cmd query parameter."""
command = request.args.get("cmd", "whoami")
result = subprocess.call(command, shell=True)
return {"exit_code": result}
@app.route("/build/compile")
def compile_code():
"""Run the project build."""
result = subprocess.call("make clean && make build", shell=True)
return {"status": "completed", "exit_code": result}
@app.route("/health/disk")
def check_disk():
"""Report disk usage of the root mount."""
result = subprocess.run(["/usr/bin/df", "-h", "/"], capture_output=True, text=True)
return {"disk_usage": result.stdout}
@app.route("/render/custom")
def render_custom():
"""Render a Jinja template provided in the tpl parameter."""
template = request.args.get("tpl", "{{ 7*7 }}")
return render_template_string(template)
@app.route("/report/generate")
def generate_report():
"""Render a report for the supplied name."""
user_name = request.args.get("name", "Anonymous")
report_template = "<h1>Report for {{ name }}</h1>"
return render_template_string(report_template, name=user_name)
@app.route("/session/load")
def load_session():
"""Load a session object from a file path."""
session_file = request.args.get("file")
with open(session_file, "rb") as f:
data = pickle.load(f)
return {"session": str(data)}
@app.route("/config/load")
def load_config():
"""Load configuration from the on-disk pickle blob."""
with open("/etc/app/internal_config.pkl", "rb") as f:
config = pickle.load(f)
return {"config_keys": list(config.keys())}
@app.route("/yaml/parse")
def parse_yaml():
"""Parse a YAML document from the request body."""
yaml_content = request.get_data(as_text=True)
data = yaml.load(yaml_content, Loader=yaml.Loader)
return {"parsed": data}
@app.route("/yaml/load")
def parse_yaml_default_loader():
"""Parse a YAML document using the default loader."""
yaml_content = request.get_data(as_text=True)
data = yaml.safe_load(yaml_content)
return {"data": data}
@app.route("/upload/process")
def process_upload():
"""Persist the request body to a temporary file."""
data = request.get_data()
filepath = f"/tmp/upload_{os.getpid()}"
with open(filepath, "wb") as f:
f.write(data)
return {"saved_to": filepath}
@app.route("/export/csv")
def export_csv():
"""Create a CSV export file."""
with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f:
f.write("name,value\n")
return {"file": f.name}
def eval_user_code(code: str) -> Any:
"""Evaluate the supplied expression."""
return eval(code)
def parse_literal(expr: str) -> Any:
"""Parse a Python literal expression."""
import ast
return ast.literal_eval(expr)
def get_external_bind() -> str:
"""Return the bind address used in deployed environments."""
return "0.0.0.0"
def get_loopback_bind() -> str:
"""Return the loopback bind address."""
return "127.0.0.1"
if __name__ == "__main__":
app.run(host=get_external_bind(), debug=True, port=5000)

View File

@@ -0,0 +1,130 @@
"""Web endpoints."""
import os
import logging
import random
from typing import Dict, List
from urllib.parse import urlparse
from flask import Flask, request, redirect, send_file
import jwt
import requests
logger = logging.getLogger(__name__)
app = Flask(__name__)
@app.route("/redirect/user")
def redirect_from_query():
"""Redirect to the URL provided in the next parameter."""
next_url = request.args.get("next", "/")
return redirect(next_url)
@app.route("/redirect/validated")
def redirect_validated():
"""Redirect to the next parameter after a netloc check."""
next_url = request.args.get("next", "/")
parsed = urlparse(next_url)
if parsed.netloc and parsed.netloc != "example.com":
return redirect("/")
return redirect(next_url)
@app.route("/redirect/relative_only")
def redirect_relative():
"""Redirect to the next parameter after a scheme check."""
next_url = request.args.get("next", "/")
if "://" in next_url:
return redirect("/")
return redirect(next_url)
@app.route("/files/download")
def download_file():
"""Send the file at the supplied filename relative to the file root."""
filename = request.args.get("file", "readme.txt")
filepath = os.path.join("/var/www/files", filename)
return send_file(filepath)
@app.route("/files/realpath_download")
def download_file_realpath():
"""Send a file after resolving and asserting realpath containment."""
filename = request.args.get("file", "readme.txt")
base_dir = "/var/www/files"
filepath = os.path.join(base_dir, filename)
real_path = os.path.realpath(filepath)
if not real_path.startswith(os.path.realpath(base_dir)):
return "Access denied", 403
return send_file(real_path)
JWT_SECRET = "super_secret_jwt_key_12345"
def verify_jwt_none_allowed(token: str) -> Dict:
"""Decode a JWT without verifying its signature."""
return jwt.decode(token, options={"verify_signature": False})
def verify_jwt_with_secret(token: str, secret: str) -> Dict:
"""Decode a JWT using the supplied HS256 secret."""
return jwt.decode(token, secret, algorithms=["HS256"])
@app.route("/fetch/url")
def fetch_url():
"""Fetch the bytes at the URL given in the url parameter."""
url = request.args.get("url")
response = requests.get(url)
return response.text
@app.route("/fetch/allowlisted")
def fetch_allowlisted():
"""Fetch the URL after asserting its host is on the allowlist."""
url = request.args.get("url")
parsed = urlparse(url)
allowed_hosts = ["api.github.com", "cdn.example.com"]
if parsed.netloc not in allowed_hosts:
return "Domain not allowed", 403
response = requests.get(url)
return response.text
def run_system_command(user_input: str):
"""Echo the supplied user input via os.system."""
os.system(f"echo {user_input}")
def run_hardcoded_command():
"""Run the date binary via os.system."""
os.system("date")
def generate_token_random() -> str:
"""Generate a 32-character token using the random module."""
return "".join(random.choices("abcdefghijklmnopqrstuvwxyz0123456789", k=32))
def shuffle_playlist(items: List[str]) -> List[str]:
"""Return a shuffled copy of the supplied list."""
shuffled = items.copy()
random.shuffle(shuffled)
return shuffled
DEBUG_MODE = True
@app.route("/debug/eval")
def debug_eval():
"""Evaluate the expr query parameter when debug mode is enabled."""
if DEBUG_MODE:
expr = request.args.get("expr", "1+1")
return str(eval(expr))
return "Disabled"
if __name__ == "__main__":
app.run(debug=True, host="0.0.0.0", port=5001)