select-sentry-crash-candidates

  1#!/usr/bin/env python3
  2"""Select top Sentry crash candidates ranked by solvability x impact.
  3
  4Usage:
  5    script/select-sentry-crash-candidates --top 3 --output /tmp/candidates.json
  6"""
  7
  8import argparse
  9import configparser
 10import json
 11import math
 12import os
 13import sys
 14import urllib.error
 15import urllib.parse
 16import urllib.request
 17
# Root of the Sentry REST API; api_get() appends the request path to this.
SENTRY_BASE_URL = "https://sentry.io/api/0"
# Organization slug used when --org is not given; candidate_payload() also
# uses it when building issue links.
DEFAULT_SENTRY_ORG = "zed-dev"
# Issue search string used when --query is not given.
DEFAULT_QUERY = "is:unresolved issue.category:error"
 21
 22
 23class FetchError(Exception):
 24    pass
 25
 26
 27def find_auth_token() -> str | None:
 28    token = os.environ.get("SENTRY_AUTH_TOKEN")
 29    if token:
 30        return token
 31
 32    sentryclirc_path = os.path.expanduser("~/.sentryclirc")
 33    if os.path.isfile(sentryclirc_path):
 34        config = configparser.ConfigParser()
 35        try:
 36            config.read(sentryclirc_path)
 37            token = config.get("auth", "token", fallback=None)
 38            if token:
 39                return token
 40        except configparser.Error:
 41            return None
 42
 43    return None
 44
 45
 46def api_get(path: str, token: str):
 47    url = f"{SENTRY_BASE_URL}{path}"
 48    request = urllib.request.Request(url)
 49    request.add_header("Authorization", f"Bearer {token}")
 50    request.add_header("Accept", "application/json")
 51
 52    try:
 53        with urllib.request.urlopen(request, timeout=30) as response:
 54            return json.loads(response.read().decode("utf-8"))
 55    except urllib.error.HTTPError as error:
 56        body = error.read().decode("utf-8", errors="replace")
 57        try:
 58            detail = json.loads(body).get("detail", body)
 59        except (json.JSONDecodeError, AttributeError):
 60            detail = body
 61        raise FetchError(f"Sentry API returned HTTP {error.code} for {path}: {detail}")
 62    except urllib.error.URLError as error:
 63        raise FetchError(f"Failed to connect to Sentry API: {error.reason}")
 64
 65
 66def fetch_issues(token: str, organization: str, limit: int, query: str):
 67    encoded_query = urllib.parse.quote(query)
 68    path = (
 69        f"/organizations/{organization}/issues/"
 70        f"?limit={limit}&sort=freq&query={encoded_query}"
 71    )
 72    return api_get(path, token)
 73
 74
 75def fetch_latest_event(token: str, issue_id: str):
 76    return api_get(f"/issues/{issue_id}/events/latest/", token)
 77
 78
 79def parse_int(value, fallback=0) -> int:
 80    try:
 81        return int(value)
 82    except (TypeError, ValueError):
 83        return fallback
 84
 85
 86def in_app_frame_count(event) -> int:
 87    entries = event.get("entries", [])
 88    count = 0
 89    for entry in entries:
 90        if entry.get("type") != "exception":
 91            continue
 92        exceptions = entry.get("data", {}).get("values", [])
 93        for exception in exceptions:
 94            frames = (exception.get("stacktrace") or {}).get("frames") or []
 95            count += sum(1 for frame in frames if frame.get("inApp") or frame.get("in_app"))
 96    return count
 97
 98
 99def crash_signal_text(issue, event) -> str:
100    title = (issue.get("title") or "").lower()
101    culprit = (issue.get("culprit") or "").lower()
102    message = ""
103
104    entries = event.get("entries", [])
105    for entry in entries:
106        if entry.get("type") != "exception":
107            continue
108        exceptions = entry.get("data", {}).get("values", [])
109        for exception in exceptions:
110            value = exception.get("value")
111            if value:
112                message = value.lower()
113                break
114        if message:
115            break
116
117    return f"{title} {culprit} {message}".strip()
118
119
def solvable_factor(issue, event) -> tuple[float, list[str]]:
    """Estimate how tractable an issue looks, as a multiplier plus reasons.

    Starts from 0.6 and adjusts:
      * in-app frames: >= 6 adds 0.5, >= 3 adds 0.3, fewer subtracts 0.1;
      * "panic" or "assert" in the crash text adds 0.2;
      * "out of memory" or a standalone "oom" token subtracts 0.35;
      * "segmentation fault" or "sigsegv" subtracts 0.2;
      * issue level "error" adds 0.1 (no reason is recorded for this one).

    Args:
        issue: Decoded issue mapping.
        event: Decoded latest-event mapping for that issue.

    Returns:
        ``(factor, reasons)`` where ``factor`` is clamped to [0.2, 1.5] and
        ``reasons`` lists the human-readable adjustments that applied.
    """
    import re  # local import: only this heuristic needs pattern matching

    factor = 0.6
    reasons: list[str] = []

    in_app_frames = in_app_frame_count(event)
    if in_app_frames >= 6:
        factor += 0.5
        reasons.append("strong in-app stack coverage")
    elif in_app_frames >= 3:
        factor += 0.3
        reasons.append("moderate in-app stack coverage")
    else:
        factor -= 0.1
        reasons.append("limited in-app stack coverage")

    signal_text = crash_signal_text(issue, event)
    if "panic" in signal_text or "assert" in signal_text:
        factor += 0.2
        reasons.append("panic/assert style failure")

    # Match "oom" only as a standalone token: a bare substring test also hits
    # unrelated words such as "room" or "zoom". The text is already lowercase.
    mentions_oom = re.search(r"(?<![a-z0-9])oom(?![a-z0-9])", signal_text) is not None
    if "out of memory" in signal_text or mentions_oom:
        factor -= 0.35
        reasons.append("likely resource/system failure")

    if "segmentation fault" in signal_text or "sigsegv" in signal_text:
        factor -= 0.2
        reasons.append("low-level crash signal")

    level = (issue.get("level") or "").lower()
    if level == "error":
        factor += 0.1

    return max(0.2, min(1.5, factor)), reasons
153
154
def candidate_payload(issue, event, organization: str | None = None):
    """Build the ranked-candidate record for one issue.

    Args:
        issue: Decoded issue mapping (``id``, ``shortId``, ``title``,
            ``count`` and ``userCount`` are read).
        event: Decoded latest-event mapping, used for the solvability factor.
        organization: Organization slug used in ``sentry_url``. Defaults to
            DEFAULT_SENTRY_ORG when omitted or empty.

    Returns:
        A JSON-serializable dict with the issue identifiers, counts, the
        population score, the rounded solvability factor, the final integer
        score, the issue URL and the solvability reasons.
    """
    issue_id = str(issue.get("id"))
    short_id = issue.get("shortId") or issue_id
    issue_count = parse_int(issue.get("count"), 0)
    user_count = parse_int(issue.get("userCount"), 0)
    # Each affected user is weighted like ten occurrences.
    population_score = issue_count + (user_count * 10)
    solvability, reasons = solvable_factor(issue, event)

    score = int(math.floor(population_score * solvability))
    org_slug = organization or DEFAULT_SENTRY_ORG
    issue_url = f"https://sentry.io/organizations/{org_slug}/issues/{issue_id}/"

    return {
        "issue_id": issue_id,
        "short_id": short_id,
        "title": issue.get("title") or "Unknown",
        "count": issue_count,
        "user_count": user_count,
        "population_score": population_score,
        "solvability_factor": round(solvability, 2),
        "score": score,
        "sentry_url": issue_url,
        "reasons": reasons,
    }
178
179
def main() -> int:
    """Command-line entry point: rank Sentry issues and write the top candidates.

    Fetches ``--sample-size`` issues for ``--org`` matching ``--query``, scores
    each one from its latest event, keeps the ``--top`` highest scores (at
    least one), writes the result as JSON to ``--output`` and echoes it to
    stdout.

    Returns:
        0 on success; 1 when no auth token is found, the issue list cannot be
        fetched or has an unexpected shape, or the output file cannot be
        written.
    """
    parser = argparse.ArgumentParser(
        description="Select top Sentry crash candidates ranked by solvability x impact."
    )
    parser.add_argument("--org", default=DEFAULT_SENTRY_ORG, help="Sentry organization slug")
    parser.add_argument("--query", default=DEFAULT_QUERY, help="Sentry issue query")
    parser.add_argument("--top", type=int, default=3, help="Number of candidates to select")
    parser.add_argument(
        "--sample-size",
        type=int,
        default=25,
        help="Number of unresolved issues to consider before ranking",
    )
    parser.add_argument("--output", required=True, help="Output JSON file path")
    args = parser.parse_args()

    token = find_auth_token()
    if not token:
        print(
            "Error: No Sentry auth token found. Set SENTRY_AUTH_TOKEN or run sentry-cli login.",
            file=sys.stderr,
        )
        return 1

    try:
        issues = fetch_issues(token, args.org, args.sample_size, args.query)
    except FetchError as error:
        print(f"Error fetching issues: {error}", file=sys.stderr)
        return 1

    if not isinstance(issues, list):
        print(
            "Error fetching issues: expected a JSON list of issues from the Sentry API.",
            file=sys.stderr,
        )
        return 1

    candidates = []
    for issue in issues:
        issue_id = issue.get("id")
        if not issue_id:
            continue

        try:
            event = fetch_latest_event(token, str(issue_id))
        except FetchError as error:
            # Best effort: one unreachable event must not abort the whole run,
            # but say so instead of dropping the issue silently.
            print(f"Warning: skipping issue {issue_id}: {error}", file=sys.stderr)
            continue

        candidate = candidate_payload(issue, event)
        # candidate_payload() links to DEFAULT_SENTRY_ORG when called without
        # an organization; point the link at the organization actually queried.
        candidate["sentry_url"] = (
            f"https://sentry.io/organizations/{args.org}/issues/{candidate['issue_id']}/"
        )
        candidates.append(candidate)

    candidates.sort(key=lambda candidate: candidate["score"], reverse=True)
    # Always keep at least one candidate, even for --top values below 1.
    selected = candidates[: max(1, args.top)]

    output = {
        "organization": args.org,
        "query": args.query,
        "sample_size": args.sample_size,
        "top": args.top,
        "selected": selected,
    }

    try:
        with open(args.output, "w", encoding="utf-8") as file:
            json.dump(output, file, indent=2)
    except OSError as error:
        print(f"Error writing {args.output}: {error}", file=sys.stderr)
        return 1

    print(json.dumps(output, indent=2))
    return 0
239
240
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())