main.py

  1import os
  2from collections import defaultdict
  3from datetime import datetime, timedelta
  4from typing import Optional
  5
  6from github import Github
  7from github.Issue import Issue
  8from github.Repository import Repository
  9from pytz import timezone
 10
 11import typer
 12from typer import Typer
 13
 14app: Typer = typer.Typer()
 15
 16DATETIME_FORMAT: str = "%m/%d/%Y %I:%M %p"
 17CORE_LABELS: set[str] = set(
 18    [
 19        "defect",
 20        "design",
 21        "documentation",
 22        "enhancement",
 23        "panic / crash",
 24        "platform support",
 25    ]
 26)
 27# A set of labels for adding in labels that we want present in the final
 28# report, but that we don't want being defined as a core label, since issues
 29# with without core labels are flagged as errors.
 30ADDITIONAL_LABELS: set[str] = set(["ai", "vim"])
 31IGNORED_LABELS: set[str] = set(
 32    [
 33        "meta",
 34    ]
 35)
 36ISSUES_PER_LABEL: int = 20
 37
 38
 39class IssueData:
 40    def __init__(self, issue: Issue) -> None:
 41        self.url: str = issue.html_url
 42        self.like_count: int = issue._rawData["reactions"]["+1"]  # type: ignore [attr-defined]
 43        self.creation_datetime: str = issue.created_at.strftime(DATETIME_FORMAT)
 44        # TODO: Change script to support storing labels here, rather than directly in the script
 45        self.labels: set[str] = set(label["name"] for label in issue._rawData["labels"])  # type: ignore [attr-defined]
 46
 47
 48@app.command()
 49def main(
 50    github_token: Optional[str] = None,
 51    prod: bool = False,
 52    query_day_interval: Optional[int] = None,
 53) -> None:
 54    start_time: datetime = datetime.now()
 55
 56    start_date: datetime | None = None
 57
 58    if query_day_interval:
 59        tz = timezone("america/new_york")
 60        start_date = datetime.now(tz) - timedelta(days=query_day_interval)
 61
 62    # GitHub Workflow will pass in the token as an environment variable,
 63    # but we can place it in our env when running the script locally, for convenience
 64    github_token = github_token or os.getenv("GITHUB_ACCESS_TOKEN")
 65    github = Github(github_token)
 66
 67    remaining_requests_before: int = github.rate_limiting[0]
 68    print(f"Remaining requests before: {remaining_requests_before}")
 69
 70    repo_name: str = "zed-industries/zed"
 71    repository: Repository = github.get_repo(repo_name)
 72
 73    # There has to be a nice way of adding types to tuple unpacking
 74    label_to_issue_data: dict[str, list[IssueData]]
 75    error_message_to_erroneous_issue_data: dict[str, list[IssueData]]
 76    (
 77        label_to_issue_data,
 78        error_message_to_erroneous_issue_data,
 79    ) = get_issue_maps(github, repository, start_date)
 80
 81    issue_text: str = get_issue_text(
 82        label_to_issue_data,
 83        error_message_to_erroneous_issue_data,
 84    )
 85
 86    if prod:
 87        top_ranking_issues_issue: Issue = repository.get_issue(5393)
 88        top_ranking_issues_issue.edit(body=issue_text)
 89    else:
 90        print(issue_text)
 91
 92    remaining_requests_after: int = github.rate_limiting[0]
 93    print(f"Remaining requests after: {remaining_requests_after}")
 94    print(f"Requests used: {remaining_requests_before - remaining_requests_after}")
 95
 96    run_duration: timedelta = datetime.now() - start_time
 97    print(run_duration)
 98
 99
def get_issue_maps(
    github: Github,
    repository: Repository,
    start_date: datetime | None = None,
) -> tuple[dict[str, list[IssueData]], dict[str, list[IssueData]]]:
    """Collect the two maps the report is rendered from.

    Args:
        github: Client used for the issue searches.
        repository: Repository whose issues are searched.
        start_date: Optional start date forwarded to get_label_to_issues.

    Returns:
        A pair `(label_to_issue_data, error_message_to_erroneous_issue_data)`.
        The first map is ordered by the total like count of each label's
        issues, highest first.
    """
    issues_by_label: defaultdict[str, list[Issue]] = get_label_to_issues(
        github,
        repository,
        start_date,
    )
    unordered_issue_data: dict[str, list[IssueData]] = get_label_to_issue_data(
        issues_by_label
    )

    erroneous_issues_by_message: defaultdict[
        str, list[Issue]
    ] = get_error_message_to_erroneous_issues(github, repository)
    error_message_to_erroneous_issue_data: dict[
        str, list[IssueData]
    ] = get_error_message_to_erroneous_issue_data(erroneous_issues_by_message)

    def total_likes(entry: tuple[str, list[IssueData]]) -> int:
        # Sum of the likes on all issues listed under one label
        _, issues_for_label = entry
        return sum(item.like_count for item in issues_for_label)

    # Rebuild the dictionary with labels ordered by the summation of the
    # likes on the associated issues (the sort is stable, so ties keep their
    # existing relative order).
    label_to_issue_data: dict[str, list[IssueData]] = dict(
        sorted(unordered_issue_data.items(), key=total_likes, reverse=True)
    )

    return label_to_issue_data, error_message_to_erroneous_issue_data
137
138
def get_label_to_issues(
    github: Github,
    repository: Repository,
    start_date: datetime | None = None,
) -> defaultdict[str, list[Issue]]:
    """Search the open issues for each core and additional label.

    One search is issued per label in `CORE_LABELS | ADDITIONAL_LABELS`. Each
    search is sorted by "+1" reactions (descending), excludes issues carrying
    an ignored label, and contributes at most ISSUES_PER_LABEL issues.

    Args:
        github: Client used to run the searches.
        repository: Repository whose `full_name` scopes every query.
        start_date: When given, adds a `created:>=YYYY-MM-DD` filter.

    Returns:
        Map from label to the issues found for it; labels whose search
        reports no results get no entry.
    """
    label_to_issues: defaultdict[str, list[Issue]] = defaultdict(list)

    if start_date:
        date_query: str = f"created:>={start_date.strftime('%Y-%m-%d')}"
    else:
        date_query = ""

    ignored_labels_text: str = " ".join(
        f'-label:"{ignored}"' for ignored in IGNORED_LABELS
    )

    for label in CORE_LABELS | ADDITIONAL_LABELS:
        query: str = f'repo:{repository.full_name} is:open is:issue {date_query} label:"{label}" {ignored_labels_text} sort:reactions-+1-desc'

        search_results = github.search_issues(query)

        # Nothing matched this label; move on without creating an entry
        if search_results.totalCount <= 0:
            continue

        for found_issue in search_results[0:ISSUES_PER_LABEL]:
            label_to_issues[label].append(found_issue)

    return label_to_issues
165
166
def get_label_to_issue_data(
    label_to_issues: defaultdict[str, list[Issue]]
) -> dict[str, list[IssueData]]:
    """Convert each label's issues to IssueData and rank them.

    Within a label, entries are ordered by like count (highest first); ties
    are broken by creation time, oldest first.

    Args:
        label_to_issues: Map from label to the issues found for that label.

    Returns:
        Map from label to the ranked IssueData list. Labels with no issues
        are left out.
    """
    label_to_issue_data: dict[str, list[IssueData]] = {}

    def ranking_key(entry: IssueData) -> tuple[int, datetime]:
        # `creation_datetime` is text in DATETIME_FORMAT (month first, 12-hour
        # clock), so comparing the raw strings is not chronological: e.g.
        # "12/31/2022 ..." would sort after "01/01/2023 ...". Parse it back
        # into a datetime so the tie-break really is oldest-first.
        created_at = datetime.strptime(entry.creation_datetime, DATETIME_FORMAT)
        return (-entry.like_count, created_at)

    for label, issues in label_to_issues.items():
        ranked: list[IssueData] = sorted(
            (IssueData(issue) for issue in issues),
            key=ranking_key,
        )

        if ranked:
            label_to_issue_data[label] = ranked

    return label_to_issue_data
186
187
def get_error_message_to_erroneous_issues(
    github: Github, repository: Repository
) -> defaultdict[str, list[Issue]]:
    """Find open issues that have neither a core label nor an ignored label.

    Args:
        github: Client used to run the search.
        repository: Repository whose `full_name` scopes the query.

    Returns:
        Map from error message to the offending issues. The only message
        produced here is "missing core label", and it is only present when
        the search returned at least one issue.
    """
    error_message_to_erroneous_issues: defaultdict[str, list[Issue]] = defaultdict(list)

    # Exclude every core and every ignored label; whatever is still returned
    # is marked as erroneous.
    filter_labels: set[str] = CORE_LABELS | IGNORED_LABELS
    filter_labels_text: str = " ".join(
        f'-label:"{excluded}"' for excluded in filter_labels
    )
    query: str = f"repo:{repository.full_name} is:open is:issue {filter_labels_text}"

    unlabeled_issues: list[Issue] = list(github.search_issues(query))
    if unlabeled_issues:
        error_message_to_erroneous_issues["missing core label"].extend(
            unlabeled_issues
        )

    return error_message_to_erroneous_issues
202
203
def get_error_message_to_erroneous_issue_data(
    error_message_to_erroneous_issues: defaultdict[str, list[Issue]],
) -> dict[str, list[IssueData]]:
    """Wrap every erroneous issue in an IssueData, keeping the grouping.

    Args:
        error_message_to_erroneous_issues: Map from error message to issues.

    Returns:
        A plain dict with the same keys, in the same order, whose values are
        the corresponding IssueData lists.
    """
    return {
        error_message: [IssueData(issue) for issue in issues]
        for error_message, issues in error_message_to_erroneous_issues.items()
    }
215
216
def get_issue_text(
    label_to_issue_data: dict[str, list[IssueData]],
    error_message_to_erroneous_issue_data: dict[str, list[IssueData]],
) -> str:
    """Render the full markdown text of the report.

    The text consists of an "Updated on" line, the per-label rankings, an
    optional section describing issues with errors (only emitted when there
    are any), and a closing link to the script.

    Args:
        label_to_issue_data: Ranked issue data per label.
        error_message_to_erroneous_issue_data: Erroneous issue data per
            error message.

    Returns:
        The report lines joined with newlines.
    """
    new_york = timezone("america/new_york")
    updated_at: str = datetime.now(new_york).strftime(f"{DATETIME_FORMAT} (%Z)")

    lines: list[str] = [f"*Updated on {updated_at}*"]
    lines += get_highest_ranking_issues_lines(label_to_issue_data)
    lines += ["", "---\n"]

    error_lines: list[str] = get_erroneous_issues_lines(
        error_message_to_erroneous_issue_data
    )

    if error_lines:
        quoted_core_labels: str = ", ".join(f'"{name}"' for name in CORE_LABELS)
        quoted_ignored_labels: str = ", ".join(
            f'"{name}"' for name in IGNORED_LABELS
        )

        lines.append(
            "## errors with issues (this section only shows when there are errors with issues)\n"
        )
        lines.append(
            f"This script expects every issue to have at least one of the following core labels: {quoted_core_labels}"
        )
        lines.append(
            f"This script currently ignores issues that have one of the following labels: {quoted_ignored_labels}\n"
        )
        lines.append("### what to do?\n")
        lines.append(
            "- Adjust the core labels on an issue to put it into a correct state or add a currently-ignored label to the issue"
        )
        lines.append("- Adjust the core and ignored labels registered in this script")
        lines += error_lines
        lines += ["", "---\n"]

    lines.append(
        "*For details on how this issue is generated, [see the script](https://github.com/zed-industries/zed/blob/main/script/update_top_ranking_issues/main.py)*"
    )

    return "\n".join(lines)
268
269
def get_highest_ranking_issues_lines(
    label_to_issue_data: dict[str, list[IssueData]],
) -> list[str]:
    """Render the per-label rankings as markdown lines.

    Each label produces a `## label` heading followed by a numbered list
    (starting at 1) of issue URLs with their like counts.

    Args:
        label_to_issue_data: Ranked issue data per label.

    Returns:
        The markdown lines; an empty list when the map is empty.
    """
    if not label_to_issue_data:
        return []

    lines: list[str] = []

    for label, ranked_issues in label_to_issue_data.items():
        lines.append(f"\n## {label}\n")
        lines.extend(
            f"{rank}. {entry.url} ({entry.like_count} :thumbsup:)"
            for rank, entry in enumerate(ranked_issues, start=1)
        )

    return lines
288
289
def get_erroneous_issues_lines(
    error_message_to_erroneous_issue_data,
) -> list[str]:
    """Render the erroneous issues as markdown lines.

    Each error message produces a `#### message` heading followed by one
    bullet per issue, showing that issue's `url`.

    Args:
        error_message_to_erroneous_issue_data: Map from error message to the
            issue data entries affected by it.

    Returns:
        The markdown lines; an empty list when the map is empty.
    """
    if not error_message_to_erroneous_issue_data:
        return []

    lines: list[str] = []

    for error_message, entries in error_message_to_erroneous_issue_data.items():
        lines.append(f"\n#### {error_message}\n")
        lines.extend(f"- {entry.url}" for entry in entries)

    return lines
306
307
# Invoke the Typer app only when this file is executed as a script.
if __name__ == "__main__":
    app()
310
# TODO: Sort label output into core and non-core sections