@@ -0,0 +1,210 @@
+#!/usr/bin/env python3
+"""
+GitHub PR Analyzer for zed-industries/zed repository
+Downloads all open PRs and groups them by first assignee, with status, open date, and last-updated date.
+"""
+
+import urllib.request
+import urllib.parse
+import urllib.error
+import json
+from datetime import datetime
+from collections import defaultdict
+import sys
+import os
+
+# GitHub API configuration
+GITHUB_API_BASE = "https://api.github.com"
+REPO_OWNER = "zed-industries"
+REPO_NAME = "zed"
+GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
+
+def make_github_request(url, params=None):
+ """Make a request to GitHub API with proper headers and pagination support."""
+ if params:
+ url_parts = list(urllib.parse.urlparse(url))
+ query = dict(urllib.parse.parse_qsl(url_parts[4]))
+ query.update(params)
+ url_parts[4] = urllib.parse.urlencode(query)
+ url = urllib.parse.urlunparse(url_parts)
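+        # e.g. params={"state": "open", "page": 1} turns .../pulls into
+        #   .../pulls?state=open&page=1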
+
+ req = urllib.request.Request(url)
+ req.add_header("Accept", "application/vnd.github.v3+json")
+ req.add_header("User-Agent", "GitHub-PR-Analyzer")
+
+ if GITHUB_TOKEN:
+ req.add_header("Authorization", f"token {GITHUB_TOKEN}")
+
+    try:
+        return urllib.request.urlopen(req)
+    except urllib.error.HTTPError as e:
+        # HTTPError is a subclass of URLError, so it must be caught first;
+        # otherwise this branch is unreachable.
+        print(f"HTTP error {e.code} for {url}: {e.reason}")
+        return None
+    except urllib.error.URLError as e:
+        print(f"Error making request to {url}: {e}")
+        return None
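+
+# Optional sketch, not called by the script: GitHub's REST API reports the
+# remaining request quota in the X-RateLimit-Remaining response header, which
+# a caller could check to back off before hitting the rate limit.
+def remaining_rate_limit(response):
+    """Return the remaining request quota reported by GitHub, or None."""
+    value = response.getheader("X-RateLimit-Remaining")
+    return int(value) if value is not None else None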
+
+def fetch_all_prs():
+ """Fetch all PRs from the repository using pagination."""
+ prs = []
+ page = 1
+ per_page = 100
+
+ print("Fetching PRs from GitHub API...")
+
+ while True:
+ url = f"{GITHUB_API_BASE}/repos/{REPO_OWNER}/{REPO_NAME}/pulls"
+ params = {
+ "state": "open",
+ "sort": "updated",
+ "direction": "desc",
+ "per_page": per_page,
+ "page": page
+ }
+
+ response = make_github_request(url, params)
+ if not response:
+ break
+
+ try:
+ data = response.read().decode('utf-8')
+ page_prs = json.loads(data)
+ except (json.JSONDecodeError, UnicodeDecodeError) as e:
+ print(f"Error parsing response: {e}")
+ break
+
+ if not page_prs:
+ break
+
+ prs.extend(page_prs)
+ print(f"Fetched page {page}: {len(page_prs)} PRs (Total: {len(prs)})")
+
+ # Check if we have more pages
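+        # A typical Link header looks like (URLs shortened for illustration):
+        #   <https://api.github.com/...&page=2>; rel="next", <...>; rel="last"
+        # A substring check suffices here: we only need to know whether a
+        # next page exists, not its URL.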
+ link_header = response.getheader('Link', '')
+ if 'rel="next"' not in link_header:
+ break
+
+ page += 1
+
+ print(f"Total PRs fetched: {len(prs)}")
+ return prs
+
+def format_date_as_days_ago(date_string):
+ """Format ISO date string as 'X days ago'."""
+ if not date_string:
+ return "N/A days ago"
+
+ try:
+        # The 'Z' suffix isn't accepted by datetime.fromisoformat() before Python 3.11
+        dt = datetime.fromisoformat(date_string.replace('Z', '+00:00'))
+ now = datetime.now(dt.tzinfo)
+ days_diff = (now - dt).days
+
+ if days_diff == 0:
+ return "today"
+ elif days_diff == 1:
+ return "1 day ago"
+ else:
+ return f"{days_diff} days ago"
+    except (ValueError, AttributeError):
+        # Malformed or non-string timestamp.
+        return "unknown"
+
+def get_first_assignee(pr):
+ """Get the first assignee from a PR, or return 'Unassigned' if none."""
+ assignees = pr.get('assignees', [])
+ if assignees:
+ return assignees[0].get('login', 'Unknown')
+ return 'Unassigned'
+
+def get_pr_status(pr):
+ """Determine if PR is draft or ready for review."""
+ if pr.get('draft', False):
+ return "Draft"
+ return "Ready"
+
+def analyze_prs(prs):
+ """Group PRs by first assignee and organize the data."""
+ grouped_prs = defaultdict(list)
+
+ for pr in prs:
+ assignee = get_first_assignee(pr)
+
+ pr_info = {
+ 'number': pr['number'],
+ 'title': pr['title'],
+ 'status': get_pr_status(pr),
+ 'state': pr['state'],
+ 'created_at': format_date_as_days_ago(pr['created_at']),
+ 'updated_at': format_date_as_days_ago(pr['updated_at']),
+ 'updated_at_raw': pr['updated_at'],
+ 'url': pr['html_url'],
+ 'author': pr['user']['login']
+ }
+
+ grouped_prs[assignee].append(pr_info)
+
+    # Sort PRs within each group by update date, newest first. ISO 8601
+    # timestamps sort chronologically when compared as strings.
+ for assignee in grouped_prs:
+ grouped_prs[assignee].sort(key=lambda x: x['updated_at_raw'], reverse=True)
+
+ return dict(grouped_prs)
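+
+# The returned mapping has the shape (illustrative values):
+#   {"alice": [{"number": 123, "title": "...", "status": "Ready", ...}, ...],
+#    "Unassigned": [...]}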
+
+def print_pr_report(grouped_prs):
+ """Print formatted report of PRs grouped by assignee."""
+ print(f"OPEN PR REPORT FOR {REPO_OWNER}/{REPO_NAME}")
+ print()
+
+ # Sort assignees alphabetically, but put 'Unassigned' last
+ assignees = sorted(grouped_prs.keys())
+ if 'Unassigned' in assignees:
+ assignees.remove('Unassigned')
+ assignees.append('Unassigned')
+
+ total_prs = sum(len(prs) for prs in grouped_prs.values())
+ print(f"Total Open PRs: {total_prs}")
+ print()
+
+ for assignee in assignees:
+ prs = grouped_prs[assignee]
+ assignee_display = f"@{assignee}" if assignee != 'Unassigned' else assignee
+ print(f"assigned to {assignee_display} ({len(prs)} PRs):")
+
+ for pr in prs:
+ print(f"- {pr['author']}: [{pr['title']}]({pr['url']}) opened:{pr['created_at']} updated:{pr['updated_at']}")
+
+ print()
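+
+# An illustrative report block (hypothetical data; the PR link is elided):
+#   assigned to @alice (2 PRs):
+#   - bob: [Fix panel resize](https://github.com/zed-industries/zed/pull/...) opened:3 days ago updated:today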
+
+def save_json_report(grouped_prs, filename="pr_report.json"):
+ """Save the PR data to a JSON file."""
+ try:
+ with open(filename, 'w') as f:
+ json.dump(grouped_prs, f, indent=2)
+ print(f"📄 Report saved to {filename}")
+ except Exception as e:
+ print(f"Error saving JSON report: {e}")
+
+def main():
+ """Main function to orchestrate the PR analysis."""
+ print("GitHub PR Analyzer")
+ print("==================")
+
+ if not GITHUB_TOKEN:
+ print("⚠️ Warning: GITHUB_TOKEN not set. You may hit rate limits.")
+ print(" Set GITHUB_TOKEN environment variable for authenticated requests.")
+ print()
+
+ # Fetch all PRs
+ prs = fetch_all_prs()
+
+ if not prs:
+ print("❌ Failed to fetch PRs. Please check your connection and try again.")
+ sys.exit(1)
+
+ # Analyze and group PRs
+ grouped_prs = analyze_prs(prs)
+
+    # Print report and save a JSON copy
+    print_pr_report(grouped_prs)
+    save_json_report(grouped_prs)
+
+if __name__ == "__main__":
+ main()