176 lines
5.5 KiB
Python
176 lines
5.5 KiB
Python
import hashlib
|
|
import hmac
|
|
import logging
|
|
from typing import Optional
|
|
|
|
from ..config import get_config
|
|
from ..models import GiteaPullRequest, WebhookEvent
|
|
from ..services import extract_issue_ids, get_gitea_client, get_linear_client
|
|
from ..services.linear_client import LinearIssueUpdate
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
MAGIC_WORDS = ["closes", "closes", "fixes", "resolves", "addresses"]
|
|
|
|
|
|
def verify_gitea_signature(payload: bytes, signature: str) -> bool:
|
|
config = get_config()
|
|
secret = config.gitea.webhook_secret
|
|
if not secret:
|
|
return True
|
|
expected = hmac.new(
|
|
secret.encode(), payload, hashlib.sha256
|
|
).hexdigest()
|
|
return hmac.compare_digest(f"sha256={expected}", signature)
|
|
|
|
|
|
def handle_push_event(payload: dict) -> list[str]:
|
|
if not get_config().features.commit_linking:
|
|
return []
|
|
|
|
commits = payload.get("commits", [])
|
|
repo = payload.get("repository", {}).get("full_name", "")
|
|
linked_issues = []
|
|
|
|
linear_client = get_linear_client()
|
|
|
|
for commit in commits:
|
|
message = commit.get("message", "")
|
|
issue_ids = extract_issue_ids(message)
|
|
|
|
for issue_id in issue_ids:
|
|
issue = linear_client.get_issue_by_identifier(issue_id)
|
|
if issue:
|
|
comment = f"Commit linked: {commit.get('id', '')[:7]} - {message[:100]}"
|
|
linear_client.add_comment(issue.id, comment)
|
|
linked_issues.append(issue_id)
|
|
logger.info(f"Linked commit to {issue_id}")
|
|
|
|
return linked_issues
|
|
|
|
|
|
def handle_pull_request_event(payload: dict) -> list[str]:
|
|
action = payload.get("action", "")
|
|
pr_data = payload.get("pull_request", {})
|
|
repo = payload.get("repository", {}).get("full_name", "")
|
|
|
|
if not repo or repo not in get_config().sync.enabled_repos:
|
|
return []
|
|
|
|
pr = GiteaPullRequest(
|
|
id=pr_data.get("id", 0),
|
|
number=pr_data.get("number", 0),
|
|
title=pr_data.get("title", ""),
|
|
body=pr_data.get("body"),
|
|
state=pr_data.get("state", "open"),
|
|
head_branch=pr_data.get("head", {}).get("ref", ""),
|
|
base_branch=pr_data.get("base", {}).get("ref", ""),
|
|
html_url=pr_data.get("html_url", ""),
|
|
merged=pr_data.get("merged", False),
|
|
repository=repo,
|
|
)
|
|
|
|
linked_issues = []
|
|
|
|
issue_ids = extract_issue_ids(pr.title)
|
|
if pr.body:
|
|
issue_ids.extend(extract_issue_ids(pr.body))
|
|
issue_ids.extend(extract_issue_ids(pr.head_branch))
|
|
|
|
issue_ids = list(set(issue_ids))
|
|
|
|
linear_client = get_linear_client()
|
|
|
|
for issue_id in issue_ids:
|
|
issue = linear_client.get_issue_by_identifier(issue_id)
|
|
if not issue:
|
|
continue
|
|
|
|
if get_config().features.pr_linking:
|
|
linear_client.link_pull_request(issue.id, pr.html_url, pr.title)
|
|
logger.info(f"Linked PR to {issue_id}")
|
|
|
|
if get_config().features.workflow_automation:
|
|
status_map = get_config().sync.status_mappings
|
|
|
|
if action == "opened" or action == "reopened":
|
|
new_state = status_map.pr_created
|
|
elif action == "closed" and pr.merged:
|
|
new_state = status_map.pr_merged
|
|
elif action == "closed" and not pr.merged:
|
|
new_state = status_map.pr_closed
|
|
else:
|
|
continue
|
|
|
|
states = linear_client.get_states_for_team(issue.team_id or "")
|
|
state_id = next(
|
|
(s["id"] for s in states if s["name"] == new_state), None
|
|
)
|
|
|
|
if state_id:
|
|
linear_client.update_issue(
|
|
issue.id, LinearIssueUpdate(state_id=state_id)
|
|
)
|
|
logger.info(f"Updated {issue_id} status to {new_state}")
|
|
|
|
linked_issues.append(issue_id)
|
|
|
|
return linked_issues
|
|
|
|
|
|
def handle_issues_event(payload: dict) -> Optional[str]:
|
|
if not get_config().features.issues_sync:
|
|
return None
|
|
|
|
action = payload.get("action", "")
|
|
if action not in ["opened", "closed"]:
|
|
return None
|
|
|
|
repo = payload.get("repository", {}).get("full_name", "")
|
|
if not repo or repo not in get_config().sync.enabled_repos:
|
|
return None
|
|
|
|
issue_data = payload.get("issue", {})
|
|
issue_id = issue_data.get("id", 0)
|
|
issue_number = issue_data.get("number", 0)
|
|
title = issue_data.get("title", "")
|
|
body = issue_data.get("body", "")
|
|
|
|
linear_client = get_linear_client()
|
|
gitea_client = get_gitea_client()
|
|
|
|
config = get_config()
|
|
sync_direction = config.sync.sync_direction
|
|
|
|
if sync_direction == "one-way":
|
|
existing_issue = linear_client.get_issue_by_identifier(f"GIT-{issue_number}")
|
|
if existing_issue:
|
|
return None
|
|
|
|
from ..models import LinearIssueCreate
|
|
|
|
linear_issue = linear_client.create_issue(
|
|
LinearIssueCreate(
|
|
team_id=config.sync.default_team_id,
|
|
title=title,
|
|
description=f"Synced from Gitea\n\n{body}",
|
|
)
|
|
)
|
|
if linear_issue:
|
|
logger.info(f"Created Linear issue {linear_issue.identifier} from Gitea issue #{issue_number}")
|
|
return linear_issue.identifier
|
|
|
|
return None
|
|
|
|
|
|
def handle_gitea_webhook(event: str, payload: dict) -> list[str]:
|
|
results = []
|
|
|
|
if event == "push":
|
|
results = handle_push_event(payload)
|
|
elif event == "pull_request":
|
|
results = handle_pull_request_event(payload)
|
|
elif event == "issues":
|
|
results = [handle_issues_event(payload)] if handle_issues_event(payload) else []
|
|
|
|
return results |