main.py

  1import os
  2from collections import defaultdict
  3from datetime import datetime, timedelta
  4from typing import Optional
  5
  6from github import Github
  7from github.Issue import Issue
  8from github.Repository import Repository
  9from pytz import timezone
 10
 11import typer
 12from typer import Typer
 13
 14app: Typer = typer.Typer()
 15
 16DATETIME_FORMAT: str = "%m/%d/%Y %I:%M %p"
 17CORE_LABELS: set[str] = set(
 18    [
 19        "defect",
 20        "design",
 21        "documentation",
 22        "enhancement",
 23        "panic / crash",
 24    ]
 25)
 26# A set of labels for adding in labels that we want present in the final
 27# report, but that we don't want being defined as a core label, since issues
 28# with without core labels are flagged as errors.
 29ADDITIONAL_LABELS: set[str] = set(
 30    [
 31        "ai",
 32        "linux",
 33        "vim",
 34        "windows",
 35    ]
 36)
 37IGNORED_LABELS: set[str] = set(
 38    [
 39        "ignore top-ranking issues",
 40        "meta",
 41    ]
 42)
 43ISSUES_PER_LABEL: int = 20
 44
 45
 46class IssueData:
 47    def __init__(self, issue: Issue) -> None:
 48        self.url: str = issue.html_url
 49        self.like_count: int = issue._rawData["reactions"]["+1"]  # type: ignore [attr-defined]
 50        self.creation_datetime: str = issue.created_at.strftime(DATETIME_FORMAT)
 51        # TODO: Change script to support storing labels here, rather than directly in the script
 52        self.labels: set[str] = set(label["name"] for label in issue._rawData["labels"])  # type: ignore [attr-defined]
 53
 54
 55@app.command()
 56def main(
 57    github_token: Optional[str] = None,
 58    issue_reference_number: Optional[int] = None,
 59    query_day_interval: Optional[int] = None,
 60) -> None:
 61    start_time: datetime = datetime.now()
 62
 63    start_date: datetime | None = None
 64
 65    if query_day_interval:
 66        tz = timezone("america/new_york")
 67        current_time = datetime.now(tz).replace(
 68            hour=0, minute=0, second=0, microsecond=0
 69        )
 70        start_date = current_time - timedelta(days=query_day_interval)
 71
 72    # GitHub Workflow will pass in the token as an environment variable,
 73    # but we can place it in our env when running the script locally, for convenience
 74    github_token = github_token or os.getenv("GITHUB_ACCESS_TOKEN")
 75    github = Github(github_token)
 76
 77    remaining_requests_before: int = github.rate_limiting[0]
 78    print(f"Remaining requests before: {remaining_requests_before}")
 79
 80    repo_name: str = "zed-industries/zed"
 81    repository: Repository = github.get_repo(repo_name)
 82
 83    # There has to be a nice way of adding types to tuple unpacking
 84    label_to_issue_data: dict[str, list[IssueData]]
 85    error_message_to_erroneous_issue_data: dict[str, list[IssueData]]
 86    (
 87        label_to_issue_data,
 88        error_message_to_erroneous_issue_data,
 89    ) = get_issue_maps(github, repository, start_date)
 90
 91    issue_text: str = get_issue_text(
 92        label_to_issue_data,
 93        error_message_to_erroneous_issue_data,
 94    )
 95
 96    if issue_reference_number:
 97        top_ranking_issues_issue: Issue = repository.get_issue(issue_reference_number)
 98        top_ranking_issues_issue.edit(body=issue_text)
 99    else:
100        print(issue_text)
101
102    remaining_requests_after: int = github.rate_limiting[0]
103    print(f"Remaining requests after: {remaining_requests_after}")
104    print(f"Requests used: {remaining_requests_before - remaining_requests_after}")
105
106    run_duration: timedelta = datetime.now() - start_time
107    print(run_duration)
108
109
110def get_issue_maps(
111    github: Github,
112    repository: Repository,
113    start_date: datetime | None = None,
114) -> tuple[dict[str, list[IssueData]], dict[str, list[IssueData]]]:
115    label_to_issues: defaultdict[str, list[Issue]] = get_label_to_issues(
116        github,
117        repository,
118        start_date,
119    )
120    label_to_issue_data: dict[str, list[IssueData]] = get_label_to_issue_data(
121        label_to_issues
122    )
123
124    error_message_to_erroneous_issues: defaultdict[str, list[Issue]] = (
125        get_error_message_to_erroneous_issues(github, repository)
126    )
127    error_message_to_erroneous_issue_data: dict[str, list[IssueData]] = (
128        get_error_message_to_erroneous_issue_data(error_message_to_erroneous_issues)
129    )
130
131    # Create a new dictionary with labels ordered by the summation the of likes on the associated issues
132    labels = list(label_to_issue_data.keys())
133
134    labels.sort(
135        key=lambda label: sum(
136            issue_data.like_count for issue_data in label_to_issue_data[label]
137        ),
138        reverse=True,
139    )
140
141    label_to_issue_data = {label: label_to_issue_data[label] for label in labels}
142
143    return (
144        label_to_issue_data,
145        error_message_to_erroneous_issue_data,
146    )
147
148
149def get_label_to_issues(
150    github: Github,
151    repository: Repository,
152    start_date: datetime | None = None,
153) -> defaultdict[str, list[Issue]]:
154    label_to_issues: defaultdict[str, list[Issue]] = defaultdict(list)
155
156    labels: set[str] = CORE_LABELS | ADDITIONAL_LABELS
157    ignored_labels_text: str = " ".join(
158        [f'-label:"{label}"' for label in IGNORED_LABELS]
159    )
160
161    date_query: str = (
162        f"created:>={start_date.strftime('%Y-%m-%d')}" if start_date else ""
163    )
164
165    for label in labels:
166        query: str = f'repo:{repository.full_name} is:open is:issue {date_query} label:"{label}" {ignored_labels_text} sort:reactions-+1-desc'
167
168        issues = github.search_issues(query)
169
170        if issues.totalCount > 0:
171            for issue in issues[0:ISSUES_PER_LABEL]:
172                label_to_issues[label].append(issue)
173
174    return label_to_issues
175
176
177def get_label_to_issue_data(
178    label_to_issues: defaultdict[str, list[Issue]],
179) -> dict[str, list[IssueData]]:
180    label_to_issue_data: dict[str, list[IssueData]] = {}
181
182    for label in label_to_issues:
183        issues: list[Issue] = label_to_issues[label]
184        issue_data: list[IssueData] = [IssueData(issue) for issue in issues]
185        issue_data.sort(
186            key=lambda issue_data: (
187                -issue_data.like_count,
188                issue_data.creation_datetime,
189            )
190        )
191
192        if issue_data:
193            label_to_issue_data[label] = issue_data
194
195    return label_to_issue_data
196
197
198def get_error_message_to_erroneous_issues(
199    github: Github, repository: Repository
200) -> defaultdict[str, list[Issue]]:
201    error_message_to_erroneous_issues: defaultdict[str, list[Issue]] = defaultdict(list)
202
203    # Query for all open issues that don't have either a core or ignored label and mark those as erroneous
204    filter_labels: set[str] = CORE_LABELS | IGNORED_LABELS
205    filter_labels_text: str = " ".join([f'-label:"{label}"' for label in filter_labels])
206    query: str = f"repo:{repository.full_name} is:open is:issue {filter_labels_text}"
207
208    for issue in github.search_issues(query):
209        error_message_to_erroneous_issues["missing core label"].append(issue)
210
211    return error_message_to_erroneous_issues
212
213
214def get_error_message_to_erroneous_issue_data(
215    error_message_to_erroneous_issues: defaultdict[str, list[Issue]],
216) -> dict[str, list[IssueData]]:
217    error_message_to_erroneous_issue_data: dict[str, list[IssueData]] = {}
218
219    for label in error_message_to_erroneous_issues:
220        issues: list[Issue] = error_message_to_erroneous_issues[label]
221        issue_data: list[IssueData] = [IssueData(issue) for issue in issues]
222        error_message_to_erroneous_issue_data[label] = issue_data
223
224    return error_message_to_erroneous_issue_data
225
226
227def get_issue_text(
228    label_to_issue_data: dict[str, list[IssueData]],
229    error_message_to_erroneous_issue_data: dict[str, list[IssueData]],
230) -> str:
231    tz = timezone("america/new_york")
232    current_datetime: str = datetime.now(tz).strftime(f"{DATETIME_FORMAT} (%Z)")
233
234    highest_ranking_issues_lines: list[str] = get_highest_ranking_issues_lines(
235        label_to_issue_data
236    )
237
238    issue_text_lines: list[str] = [
239        f"*Updated on {current_datetime}*",
240        *highest_ranking_issues_lines,
241        "",
242        "---\n",
243    ]
244
245    erroneous_issues_lines: list[str] = get_erroneous_issues_lines(
246        error_message_to_erroneous_issue_data
247    )
248
249    if erroneous_issues_lines:
250        core_labels_text: str = ", ".join(
251            f'"{core_label}"' for core_label in CORE_LABELS
252        )
253        ignored_labels_text: str = ", ".join(
254            f'"{ignored_label}"' for ignored_label in IGNORED_LABELS
255        )
256
257        issue_text_lines.extend(
258            [
259                "## errors with issues (this section only shows when there are errors with issues)\n",
260                f"This script expects every issue to have at least one of the following core labels: {core_labels_text}",
261                f"This script currently ignores issues that have one of the following labels: {ignored_labels_text}\n",
262                "### what to do?\n",
263                "- Adjust the core labels on an issue to put it into a correct state or add a currently-ignored label to the issue",
264                "- Adjust the core and ignored labels registered in this script",
265                *erroneous_issues_lines,
266                "",
267                "---\n",
268            ]
269        )
270
271    issue_text_lines.extend(
272        [
273            "*For details on how this issue is generated, [see the script](https://github.com/zed-industries/zed/blob/main/script/update_top_ranking_issues/main.py)*",
274        ]
275    )
276
277    return "\n".join(issue_text_lines)
278
279
280def get_highest_ranking_issues_lines(
281    label_to_issue_data: dict[str, list[IssueData]],
282) -> list[str]:
283    highest_ranking_issues_lines: list[str] = []
284
285    if label_to_issue_data:
286        for label, issue_data in label_to_issue_data.items():
287            highest_ranking_issues_lines.append(f"\n## {label}\n")
288
289            for i, issue_data in enumerate(issue_data):
290                markdown_bullet_point: str = (
291                    f"{issue_data.url} ({issue_data.like_count} :thumbsup:)"
292                )
293
294                markdown_bullet_point = f"{i + 1}. {markdown_bullet_point}"
295                highest_ranking_issues_lines.append(markdown_bullet_point)
296
297    return highest_ranking_issues_lines
298
299
300def get_erroneous_issues_lines(
301    error_message_to_erroneous_issue_data,
302) -> list[str]:
303    erroneous_issues_lines: list[str] = []
304
305    if error_message_to_erroneous_issue_data:
306        for (
307            error_message,
308            erroneous_issue_data,
309        ) in error_message_to_erroneous_issue_data.items():
310            erroneous_issues_lines.append(f"\n#### {error_message}\n")
311
312            for erroneous_issue_data in erroneous_issue_data:
313                erroneous_issues_lines.append(f"- {erroneous_issue_data.url}")
314
315    return erroneous_issues_lines
316
317
318if __name__ == "__main__":
319    app()
320
321# TODO: Sort label output into core and non core sections