main.py

  1import os
  2from collections import defaultdict
  3from datetime import datetime, timedelta
  4from typing import Optional
  5
  6from github import Github
  7from github.Issue import Issue
  8from github.Repository import Repository
  9from pytz import timezone
 10
 11import typer
 12from typer import Typer
 13
# Typer application object; `main` is registered on it as the CLI command.
app: Typer = typer.Typer()

# strftime format used for the timestamps that appear in the report.
DATETIME_FORMAT: str = "%m/%d/%Y %I:%M %p"
# Every open issue is expected to carry at least one of these labels; issues
# that carry none of them (and are not ignored) are reported as errors.
CORE_LABELS: set[str] = {
    "defect",
    "design",
    "documentation",
    "enhancement",
    "panic / crash",
}
# Labels that we want present in the final report, but that we don't want
# defined as core labels, since issues without core labels are flagged as
# errors.
ADDITIONAL_LABELS: set[str] = {
    "ai",
    "linux",
    "vim",
    "windows",
}
# Issues carrying this label are excluded from both the ranking queries and
# the "missing core label" error query.
IGNORED_LABEL_TEXT: str = "ignore top-ranking issues"
# Maximum number of issues listed under each label.
ISSUES_PER_LABEL: int = 20
 35
 36
 37class IssueData:
 38    def __init__(self, issue: Issue) -> None:
 39        self.url: str = issue.html_url
 40        self.like_count: int = issue._rawData["reactions"]["+1"]  # type: ignore [attr-defined]
 41        self.creation_datetime: str = issue.created_at.strftime(DATETIME_FORMAT)
 42        # TODO: Change script to support storing labels here, rather than directly in the script
 43        self.labels: set[str] = {label["name"] for label in issue._rawData["labels"]}  # type: ignore [attr-defined]
 44
 45
 46@app.command()
 47def main(
 48    github_token: Optional[str] = None,
 49    issue_reference_number: Optional[int] = None,
 50    query_day_interval: Optional[int] = None,
 51) -> None:
 52    start_time: datetime = datetime.now()
 53
 54    start_date: datetime | None = None
 55
 56    if query_day_interval:
 57        tz = timezone("america/new_york")
 58        current_time = datetime.now(tz).replace(
 59            hour=0, minute=0, second=0, microsecond=0
 60        )
 61        start_date = current_time - timedelta(days=query_day_interval)
 62
 63    # GitHub Workflow will pass in the token as an environment variable,
 64    # but we can place it in our env when running the script locally, for convenience
 65    github_token = github_token or os.getenv("GITHUB_ACCESS_TOKEN")
 66    github = Github(github_token)
 67
 68    remaining_requests_before: int = github.rate_limiting[0]
 69    print(f"Remaining requests before: {remaining_requests_before}")
 70
 71    repo_name: str = "zed-industries/zed"
 72    repository: Repository = github.get_repo(repo_name)
 73
 74    # There has to be a nice way of adding types to tuple unpacking
 75    label_to_issue_data: dict[str, list[IssueData]]
 76    error_message_to_erroneous_issue_data: dict[str, list[IssueData]]
 77    (
 78        label_to_issue_data,
 79        error_message_to_erroneous_issue_data,
 80    ) = get_issue_maps(github, repository, start_date)
 81
 82    issue_text: str = get_issue_text(
 83        label_to_issue_data,
 84        error_message_to_erroneous_issue_data,
 85    )
 86
 87    if issue_reference_number:
 88        top_ranking_issues_issue: Issue = repository.get_issue(issue_reference_number)
 89        top_ranking_issues_issue.edit(body=issue_text)
 90    else:
 91        print(issue_text)
 92
 93    remaining_requests_after: int = github.rate_limiting[0]
 94    print(f"Remaining requests after: {remaining_requests_after}")
 95    print(f"Requests used: {remaining_requests_before - remaining_requests_after}")
 96
 97    run_duration: timedelta = datetime.now() - start_time
 98    print(run_duration)
 99
100
def get_issue_maps(
    github: Github,
    repository: Repository,
    start_date: datetime | None = None,
) -> tuple[dict[str, list[IssueData]], dict[str, list[IssueData]]]:
    """Build the two mappings the report is rendered from.

    Args:
        github: Client used to run the issue searches.
        repository: Repository whose issues are searched.
        start_date: Optional lower bound on issue creation date, forwarded to
            the per-label search.

    Returns:
        A pair ``(label_to_issue_data, error_message_to_erroneous_issue_data)``.
        The first mapping is ordered by the total like count of each label's
        issues, highest first.
    """
    issues_by_label: defaultdict[str, list[Issue]] = get_label_to_issues(
        github,
        repository,
        start_date,
    )
    unranked: dict[str, list[IssueData]] = get_label_to_issue_data(issues_by_label)

    erroneous_issues: defaultdict[str, list[Issue]] = (
        get_error_message_to_erroneous_issues(github, repository)
    )
    error_message_to_erroneous_issue_data: dict[str, list[IssueData]] = (
        get_error_message_to_erroneous_issue_data(erroneous_issues)
    )

    def total_likes(entry: tuple[str, list[IssueData]]) -> int:
        # Sum of the likes over every issue listed under one label.
        _, issue_data_for_label = entry
        return sum(item.like_count for item in issue_data_for_label)

    # Rebuild the dictionary with labels ordered by the sum of the likes on
    # their issues (the sort is stable, so ties keep their existing order).
    label_to_issue_data: dict[str, list[IssueData]] = dict(
        sorted(unranked.items(), key=total_likes, reverse=True)
    )

    return label_to_issue_data, error_message_to_erroneous_issue_data
138
139
def get_label_to_issues(
    github: Github,
    repository: Repository,
    start_date: datetime | None = None,
) -> defaultdict[str, list[Issue]]:
    """Search the most-liked open issues for every core and additional label.

    One search is run per label, excluding issues that carry the ignored
    label and asking the search to order results by "+1" reactions,
    descending. At most ``ISSUES_PER_LABEL`` issues are kept per label.

    Args:
        github: Client used to run the searches.
        repository: Repository whose issues are searched.
        start_date: When given, only issues created on or after this date
            are considered.

    Returns:
        Mapping from label to its issues. Labels with no matching issue are
        absent from the mapping.
    """
    label_to_issues: defaultdict[str, list[Issue]] = defaultdict(list)

    date_query: str = (
        f"created:>={start_date.strftime('%Y-%m-%d')}" if start_date else ""
    )

    # Iterate in sorted order: set iteration order varies between runs, and
    # the insertion order of this mapping decides how labels with equal like
    # totals are ordered in the final report.
    for label in sorted(CORE_LABELS | ADDITIONAL_LABELS):
        query: str = f'repo:{repository.full_name} is:open is:issue {date_query} label:"{label}" -label:"{IGNORED_LABEL_TEXT}" sort:reactions-+1-desc'

        issues = github.search_issues(query)

        if issues.totalCount > 0:
            # Slicing the result keeps only the leading ISSUES_PER_LABEL hits.
            label_to_issues[label].extend(issues[0:ISSUES_PER_LABEL])

    return label_to_issues
163
164
def get_label_to_issue_data(
    label_to_issues: defaultdict[str, list[Issue]],
) -> dict[str, list[IssueData]]:
    """Convert each label's issues to ``IssueData``, ranked for the report.

    Within a label, issues are ordered by like count (highest first); issues
    with the same like count are ordered oldest first.

    Args:
        label_to_issues: Mapping from label to the issues found for it.

    Returns:
        Mapping from label to ranked ``IssueData``. Labels whose issue list
        is empty are left out.
    """
    label_to_issue_data: dict[str, list[IssueData]] = {}

    for label, issues in label_to_issues.items():
        if not issues:
            continue

        # Break ties on the real creation timestamp. The formatted
        # `creation_datetime` string starts with the month and uses a 12-hour
        # clock, so comparing it as text does not give chronological order.
        ranked = sorted(
            ((IssueData(issue), issue.created_at) for issue in issues),
            key=lambda pair: (-pair[0].like_count, pair[1]),
        )
        label_to_issue_data[label] = [issue_data for issue_data, _ in ranked]

    return label_to_issue_data
184
185
def get_error_message_to_erroneous_issues(
    github: Github, repository: Repository
) -> defaultdict[str, list[Issue]]:
    """Find open issues that are in a state the report treats as an error.

    Currently the only error is "missing core label": an open issue that has
    neither a core label nor the ignored label.

    Args:
        github: Client used to run the search.
        repository: Repository whose issues are searched.

    Returns:
        Mapping from error message to the offending issues. An error message
        only appears as a key when at least one issue matched.
    """
    # Exclude every core label and the ignored label; whatever is still
    # returned has none of them and is therefore erroneous.
    excluded_labels: set[str] = CORE_LABELS | {IGNORED_LABEL_TEXT}
    exclusions: str = " ".join(f'-label:"{name}"' for name in excluded_labels)
    query: str = f"repo:{repository.full_name} is:open is:issue {exclusions}"

    error_message_to_erroneous_issues: defaultdict[str, list[Issue]] = defaultdict(list)

    issues_without_core_label: list[Issue] = list(github.search_issues(query))
    if issues_without_core_label:
        error_message_to_erroneous_issues["missing core label"] = (
            issues_without_core_label
        )

    return error_message_to_erroneous_issues
200
201
def get_error_message_to_erroneous_issue_data(
    error_message_to_erroneous_issues: defaultdict[str, list[Issue]],
) -> dict[str, list[IssueData]]:
    """Convert the erroneous issues under each error message to ``IssueData``.

    Args:
        error_message_to_erroneous_issues: Mapping from error message to the
            issues it applies to.

    Returns:
        A plain dict with the same keys, in the same order, whose values are
        the corresponding ``IssueData`` lists (issue order is preserved).
    """
    return {
        error_message: [IssueData(issue) for issue in issues]
        for error_message, issues in error_message_to_erroneous_issues.items()
    }
213
214
def get_issue_text(
    label_to_issue_data: dict[str, list[IssueData]],
    error_message_to_erroneous_issue_data: dict[str, list[IssueData]],
) -> str:
    """Render the full markdown body of the top-ranking-issues report.

    The body consists of an "updated on" line, the ranked issues per label,
    an error section (only when there are erroneous issues), and a closing
    pointer to this script.

    Args:
        label_to_issue_data: Ranked issue data per label.
        error_message_to_erroneous_issue_data: Erroneous issue data per
            error message.

    Returns:
        The report as a single newline-joined string.
    """
    updated_on: str = datetime.now(timezone("america/new_york")).strftime(
        f"{DATETIME_FORMAT} (%Z)"
    )

    lines: list[str] = [f"*Updated on {updated_on}*"]
    lines += get_highest_ranking_issues_lines(label_to_issue_data)
    lines += ["", "---\n"]

    error_lines: list[str] = get_erroneous_issues_lines(
        error_message_to_erroneous_issue_data
    )

    if error_lines:
        quoted_core_labels: str = ", ".join([f'"{name}"' for name in CORE_LABELS])

        lines += [
            "## errors with issues (this section only shows when there are errors with issues)\n",
            f"This script expects every issue to have at least one of the following core labels: {quoted_core_labels}",
            f"This script currently ignores issues that have the following label: {IGNORED_LABEL_TEXT}\n",
            "### what to do?\n",
            "- Adjust the core labels on an issue to put it into a correct state or add a currently-ignored label to the issue",
            "- Adjust the core and ignored labels registered in this script",
        ]
        lines += error_lines
        lines += ["", "---\n"]

    lines.append(
        "*For details on how this issue is generated, [see the script](https://github.com/zed-industries/zed/blob/main/script/update_top_ranking_issues/main.py)*"
    )

    return "\n".join(lines)
263
264
def get_highest_ranking_issues_lines(
    label_to_issue_data: dict[str, list[IssueData]],
) -> list[str]:
    """Render the ranked issues of every label as markdown lines.

    Each label contributes a ``## <label>`` heading followed by one numbered
    line per issue (numbering starts at 1) showing its URL and like count.

    Args:
        label_to_issue_data: Ranked issue data per label; labels are emitted
            in the mapping's iteration order.

    Returns:
        The markdown lines, or an empty list when the mapping is empty.
    """
    if not label_to_issue_data:
        return []

    lines: list[str] = []

    for label, ranked_entries in label_to_issue_data.items():
        lines.append(f"\n## {label}\n")
        lines.extend(
            f"{rank}. {entry.url} ({entry.like_count} :thumbsup:)"
            for rank, entry in enumerate(ranked_entries, start=1)
        )

    return lines
283
284
def get_erroneous_issues_lines(
    error_message_to_erroneous_issue_data,
) -> list[str]:
    """Render the erroneous issues of every error message as markdown lines.

    Each error message contributes a ``#### <message>`` heading followed by
    one bullet per issue containing the issue's URL.

    Args:
        error_message_to_erroneous_issue_data: Mapping from error message to
            objects exposing a ``url`` attribute.

    Returns:
        The markdown lines, or an empty list when the mapping is empty.
    """
    if not error_message_to_erroneous_issue_data:
        return []

    lines: list[str] = []

    for error_message, entries in error_message_to_erroneous_issue_data.items():
        lines.append(f"\n#### {error_message}\n")
        lines.extend(f"- {entry.url}" for entry in entries)

    return lines
301
302
# Run the Typer application only when this file is executed as a script.
if __name__ == "__main__":
    app()
305
306# TODO: Sort label output into core and non core sections