sweepai/sweep

Sweep: Downgrade this to a warning

kevinlu1248 opened this issue ยท 1 comments

Here's the logs that trigger this, find it in the codebase.

2024-05-20 19:13:08.533 | ERROR    | sweepai.utils.ticket_rendering_utils:get_failing_gha_logs:203 - no jobs for this run: WorkflowRun(url="https://api.github.com/repos/sweepai/sweep/actions/runs/9163747975", id=9163747975), continuing...

๐Ÿš€ Here's the PR! #3808

๐Ÿ’Ž Sweep Pro: You have unlimited Sweep issues

Actions

  • โ†ป Restart Sweep

Step 1: ๐Ÿ”Ž Searching

Here are the code search results. I'm now analyzing these search results to write the PR.

Relevant files (click to expand). Mentioned files will always appear here.

"""
on_ticket is the main function that is called when a new issue is created.
It is only called by the webhook handler in sweepai/api.py.
"""
import difflib
import io
import os
import re
import zipfile
import markdown
import requests
from github import Github, Repository
from github.PullRequest import PullRequest
from github.Issue import Issue
from loguru import logger
from tqdm import tqdm
import hashlib
from sweepai.agents.modify_utils import parse_fcr
from sweepai.agents.pr_description_bot import PRDescriptionBot
from sweepai.chat.api import posthog_trace
from sweepai.config.client import (
RESTART_SWEEP_BUTTON,
SweepConfig,
)
from sweepai.core.entities import (
FileChangeRequest,
SandboxResponse,
)
from sweepai.core.entities import create_error_logs as entities_create_error_logs
from sweepai.dataclasses.codereview import CodeReview, CodeReviewIssue
from sweepai.handlers.create_pr import (
safe_delete_sweep_branch,
)
from sweepai.handlers.on_check_suite import clean_gh_logs
from sweepai.utils.buttons import create_action_buttons
from sweepai.utils.chat_logger import ChatLogger
from sweepai.utils.diff import generate_diff
from sweepai.utils.github_utils import (
CURRENT_USERNAME,
get_github_client,
get_token,
)
from sweepai.utils.str_utils import (
BOT_SUFFIX,
blockquote,
bot_suffix,
clean_logs,
create_collapsible,
discord_suffix,
format_sandbox_success,
sep,
stars_suffix,
)
from sweepai.utils.ticket_utils import (
center,
fire_and_forget_wrapper,
)
from sweepai.utils.user_settings import UserSettings
sweeping_gif = """<a href="https://github.com/sweepai/sweep"><img class="swing" src="https://raw.githubusercontent.com/sweepai/sweep/main/.assets/sweeping.gif" width="100" style="width:50px; margin-bottom:10px" alt="Sweeping"></a>"""
custom_config = """
extends: relaxed
rules:
line-length: disable
indentation: disable
"""
INSTRUCTIONS_FOR_REVIEW = """\
### ๐Ÿ’ก To get Sweep to edit this pull request, you can:
* Comment below, and Sweep can edit the entire PR
* Comment on a file, Sweep will only modify the commented file
* Edit the original issue to get Sweep to recreate the PR from scratch"""
email_template = """Hey {name},
<br/><br/>
๐Ÿš€ I just finished creating a pull request for your issue ({repo_full_name}#{issue_number}) at <a href="{pr_url}">{repo_full_name}#{pr_number}</a>!
<br/><br/>
<h2>Summary</h2>
<blockquote>
{summary}
</blockquote>
<h2>Files Changed</h2>
<ul>
{files_changed}
</ul>
{sweeping_gif}
<br/>
Cheers,
<br/>
Sweep
<br/>"""
FAILING_GITHUB_ACTION_PROMPT = """\
The following Github Actions failed on a previous attempt at fixing this issue.
Propose a fix to the failing github actions. You must edit the source code, not the github action itself.
{github_action_log}
"""
SWEEP_PR_REVIEW_HEADER = "# Sweep: PR Review"
# Add :eyes: emoji to ticket
def add_emoji(issue: Issue, comment_id: int = None, reaction_content="eyes"):
item_to_react_to = issue.get_comment(comment_id) if comment_id else issue
item_to_react_to.create_reaction(reaction_content)
# Add :eyes: emoji to ticket
def add_emoji_to_pr(pr: PullRequest, comment_id: int = None, reaction_content="eyes"):
item_to_react_to = pr.get_comment(comment_id) if comment_id else pr
item_to_react_to.create_reaction(reaction_content)
# If SWEEP_BOT reacted to item_to_react_to with "rocket", then remove it.
def remove_emoji(issue: Issue, comment_id: int = None, content_to_delete="eyes"):
item_to_react_to = issue.get_comment(comment_id) if comment_id else issue
reactions = item_to_react_to.get_reactions()
for reaction in reactions:
if (
reaction.content == content_to_delete
and reaction.user.login == CURRENT_USERNAME
):
item_to_react_to.delete_reaction(reaction.id)
def create_error_logs(
commit_url_display: str,
sandbox_response: SandboxResponse,
status: str = "โœ“",
):
return (
(
"<br/>"
+ create_collapsible(
f"Sandbox logs for {commit_url_display} {status}",
blockquote(
"\n\n".join(
[
create_collapsible(
f"<code>{output}</code> {i + 1}/{len(sandbox_response.outputs)} {format_sandbox_success(sandbox_response.success)}",
f"<pre>{clean_logs(output)}</pre>",
i == len(sandbox_response.outputs) - 1,
)
for i, output in enumerate(sandbox_response.outputs)
if len(sandbox_response.outputs) > 0
]
)
),
opened=True,
)
)
if sandbox_response
else ""
)
# takes in a list of workflow runs and returns a list of messages containing the logs of the failing runs
def get_failing_gha_logs(runs, installation_id) -> str:
token = get_token(installation_id)
all_logs = ""
for run in runs:
# jobs_url
jobs_url = run.jobs_url
jobs_response = requests.get(
jobs_url,
headers={
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {token}",
"X-GitHub-Api-Version": "2022-11-28",
},
)
if jobs_response.status_code == 200:
failed_jobs = []
jobs = jobs_response.json()["jobs"]
for job in jobs:
if job["conclusion"] == "failure":
failed_jobs.append(job)
failed_jobs_name_list = []
for job in failed_jobs:
# add failed steps
for step in job["steps"]:
if step["conclusion"] == "failure":
failed_jobs_name_list.append(
f"{job['name']}/{step['number']}_{step['name']}"
)
else:
logger.error(
"Failed to get jobs for failing github actions, possible a credentials issue"
)
return all_logs
# make sure jobs in valid
if jobs_response.json()['total_count'] == 0:
logger.error(f"no jobs for this run: {run}, continuing...")
continue
# logs url
logs_url = run.logs_url
logs_response = requests.get(
logs_url,
headers={
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {token}",
"X-GitHub-Api-Version": "2022-11-28",
},
allow_redirects=True,
)
# Check if the request was successful
if logs_response.status_code == 200:
zip_data = io.BytesIO(logs_response.content)
zip_file = zipfile.ZipFile(zip_data, "r")
zip_file_names = zip_file.namelist()
for file in failed_jobs_name_list:
if f"{file}.txt" in zip_file_names:
logs = zip_file.read(f"{file}.txt").decode("utf-8")
logs_prompt = clean_gh_logs(logs)
all_logs += logs_prompt + "\n"
else:
logger.error(
"Failed to get logs for failing github actions, likely a credentials issue"
)
return all_logs
def delete_old_prs(repo: Repository, issue_number: int):
logger.info("Deleting old PRs...")
prs = repo.get_pulls(
state="open",
sort="created",
direction="desc",
base=SweepConfig.get_branch(repo),
)
for pr in tqdm(prs.get_page(0)):
# # Check if this issue is mentioned in the PR, and pr is owned by bot
# # This is done in create_pr, (pr_description = ...)
if pr.user.login == CURRENT_USERNAME and f"Fixes #{issue_number}.\n" in pr.body:
safe_delete_sweep_branch(pr, repo)
break
def get_comment_header(
index: int,
g: Github,
repo_full_name: str,
progress_headers: list[None | str],
tracking_id: str | None,
payment_message_start: str,
errored: bool = False,
pr_message: str = "",
done: bool = False,
initial_sandbox_response: int | SandboxResponse = -1,
initial_sandbox_response_file=None,
config_pr_url: str | None = None,
):
config_pr_message = (
"\n"
+ f"<div align='center'>Install Sweep Configs: <a href='{config_pr_url}'>Pull Request</a></div>"
if config_pr_url is not None
else ""
)
actions_message = create_action_buttons(
[
RESTART_SWEEP_BUTTON,
]
)
sandbox_execution_message = "\n\n## GitHub Actions failed\n\nThe sandbox appears to be unavailable or down.\n\n"
if initial_sandbox_response == -1:
sandbox_execution_message = ""
elif initial_sandbox_response is not None:
repo = g.get_repo(repo_full_name)
commit_hash = repo.get_commits()[0].sha
success = initial_sandbox_response.outputs and initial_sandbox_response.success
status = "โœ“" if success else "X"
sandbox_execution_message = (
"\n\n## GitHub Actions"
+ status
+ "\n\nHere are the GitHub Actions logs prior to making any changes:\n\n"
)
sandbox_execution_message += entities_create_error_logs(
f'<a href="https://github.com/{repo_full_name}/commit/{commit_hash}"><code>{commit_hash[:7]}</code></a>',
initial_sandbox_response,
initial_sandbox_response_file,
)
if success:
sandbox_execution_message += f"\n\nSandbox passed on the latest `{repo.default_branch}`, so sandbox checks will be enabled for this issue."
else:
sandbox_execution_message += "\n\nSandbox failed, so all sandbox checks will be disabled for this issue."
if index < 0:
index = 0
if index == 4:
return (
pr_message
+ config_pr_message
+ f"\n\n{actions_message}"
)
total = len(progress_headers)
index += 1 if done else 0
index *= 100 / total
index = int(index)
index = min(100, index)
if errored:
pbar = f"\n\n<img src='https://progress-bar.dev/{index}/?&title=Errored&width=600' alt='{index}%' />"
return (
f"{center(sweeping_gif)}<br/>{center(pbar)}\n\n"
+ f"\n\n{actions_message}"
)
pbar = f"\n\n<img src='https://progress-bar.dev/{index}/?&title=Progress&width=600' alt='{index}%' />"
return (
f"{center(sweeping_gif)}"
+ f"<br/>{center(pbar)}"
+ ("\n" + stars_suffix if index != -1 else "")
+ "\n"
+ center(payment_message_start)
+ config_pr_message
+ f"\n\n{actions_message}"
)
def process_summary(summary, issue_number, repo_full_name, installation_id):
summary = summary or ""
summary = re.sub(
"<details (open)?>(\r)?\n<summary>Checklist</summary>.*",
"",
summary,
flags=re.DOTALL,
).strip()
summary = re.sub(
"---\s+Checklist:(\r)?\n(\r)?\n- \[[ X]\].*",
"",
summary,
flags=re.DOTALL,
).strip()
summary = re.sub(
"### Details\n\n_No response_", "", summary, flags=re.DOTALL
)
summary = re.sub("\n\n", "\n", summary, flags=re.DOTALL)
repo_name = repo_full_name
user_token, g = get_github_client(installation_id)
repo = g.get_repo(repo_full_name)
current_issue: Issue = repo.get_issue(number=issue_number)
assignee = current_issue.assignee.login if current_issue.assignee else None
if assignee is None:
assignee = current_issue.user.login
branch_match = re.search(
r"([B|b]ranch:) *(?P<branch_name>.+?)(\s|$)", summary
)
overrided_branch_name = None
if branch_match and "branch_name" in branch_match.groupdict():
overrided_branch_name = (
branch_match.groupdict()["branch_name"].strip().strip("`\"'")
)
# TODO: this code might be finicky, might have missed edge cases
if overrided_branch_name.startswith("https://github.com/"):
overrided_branch_name = overrided_branch_name.split("?")[0].split(
"tree/"
)[-1]
SweepConfig.get_branch(repo, overrided_branch_name)
return summary,repo_name,user_token,g,repo,current_issue,assignee,overrided_branch_name
def raise_on_no_file_change_requests(title, summary, edit_sweep_comment, file_change_requests):
if not file_change_requests:
if len(title + summary) < 60:
edit_sweep_comment(
(
"Sorry, I could not find any files to modify, can you please"
" provide more details? Please make sure that the title and"
" summary of the issue are at least 60 characters."
),
-1,
)
else:
edit_sweep_comment(
(
"Sorry, I could not find any files to modify, can you please"
" provide more details?"
),
-1,
)
raise Exception("No files to modify.")
def rewrite_pr_description(issue_number, repo, overrided_branch_name, pull_request, pr_changes):
# change the body here
diff_text = get_branch_diff_text(
repo=repo,
branch=pull_request.branch_name,
base_branch=overrided_branch_name,
)
new_description = PRDescriptionBot().describe_diffs(
diff_text,
pull_request.title,
) # TODO: update the title as well
if new_description:
pr_changes.body = (
f"{new_description}\n\nFixes"
f" #{issue_number}.\n\n---\n\n{INSTRUCTIONS_FOR_REVIEW}{BOT_SUFFIX}"
)
return pr_changes
def send_email_to_user(title, issue_number, username, repo_full_name, tracking_id, repo_name, g, file_change_requests, pr_changes, pr):
user_settings = UserSettings.from_username(username=username)
user = g.get_user(username)
full_name = user.name or user.login
name = full_name.split(" ")[0]
files_changed = []
for fcr in file_change_requests:
if fcr.change_type in ("create", "modify"):
diff = list(
difflib.unified_diff(
(fcr.old_content or "").splitlines() or [],
(fcr.new_content or "").splitlines() or [],
lineterm="",
)
)
added = sum(
1
for line in diff
if line.startswith("+") and not line.startswith("+++")
)
removed = sum(
1
for line in diff
if line.startswith("-") and not line.startswith("---")
)
files_changed.append(
f"<code>{fcr.filename}</code> (+{added}/-{removed})"
)
user_settings.send_email(
subject=f"Sweep Pull Request Complete for {repo_name}#{issue_number} {title}",
html=email_template.format(
name=name,
pr_url=pr.html_url,
issue_number=issue_number,
repo_full_name=repo_full_name,
pr_number=pr.number,
summary=markdown.markdown(pr_changes.body),
files_changed="\n".join(
[f"<li>{item}</li>" for item in files_changed]
),
sweeping_gif=sweeping_gif,
),
)
def handle_empty_repository(comment_id, current_issue, progress_headers, issue_comment):
first_comment = (
"Sweep is currently not supported on empty repositories. Please add some"
f" code to your repository and try again.\n{sep}##"
f" {progress_headers[1]}\n{bot_suffix}{discord_suffix}"
)
if issue_comment is None:
issue_comment = current_issue.create_comment(
first_comment + BOT_SUFFIX
)
else:
issue_comment.edit(first_comment + BOT_SUFFIX)
fire_and_forget_wrapper(add_emoji)(
current_issue, comment_id, reaction_content="confused"
)
fire_and_forget_wrapper(remove_emoji)(content_to_delete="eyes")
def get_branch_diff_text(repo, branch, base_branch=None):
base_branch = base_branch or SweepConfig.get_branch(repo)
comparison = repo.compare(base_branch, branch)
file_diffs = comparison.files
pr_diffs = []
for file in file_diffs:
diff = file.patch
if (
file.status == "added"
or file.status == "modified"
or file.status == "removed"
):
pr_diffs.append((file.filename, diff))
else:
logger.info(
f"File status {file.status} not recognized"
) # TODO(sweep): We don't handle renamed files
return "\n".join([f"{filename}\n{diff}" for filename, diff in pr_diffs])
def get_payment_messages(chat_logger: ChatLogger):
if chat_logger:
is_paying_user = chat_logger.is_paying_user()
is_consumer_tier = chat_logger.is_consumer_tier()
use_faster_model = chat_logger.use_faster_model()
else:
is_paying_user = True
is_consumer_tier = False
use_faster_model = False
# Find the first comment made by the bot
tickets_allocated = 5
if is_consumer_tier:
tickets_allocated = 15
if is_paying_user:
tickets_allocated = 500
purchased_ticket_count = (
chat_logger.get_ticket_count(purchased=True) if chat_logger else 0
)
ticket_count = (
max(tickets_allocated - chat_logger.get_ticket_count(), 0)
+ purchased_ticket_count
if chat_logger
else 999
)
daily_ticket_count = (
(3 - chat_logger.get_ticket_count(use_date=True) if not use_faster_model else 0)
if chat_logger
else 999
)
single_payment_link = "https://buy.stripe.com/00g3fh7qF85q0AE14d"
pro_payment_link = "https://buy.stripe.com/00g5npeT71H2gzCfZ8"
daily_message = (
f" and {daily_ticket_count} for the day"
if not is_paying_user and not is_consumer_tier
else ""
)
user_type = "๐Ÿ’Ž <b>Sweep Pro</b>" if is_paying_user else "โšก <b>Sweep Basic Tier</b>"
gpt_tickets_left_message = (
f"{ticket_count} Sweep issues left for the month"
if not is_paying_user
else "unlimited Sweep issues"
)
purchase_message = f"<br/><br/> For more Sweep issues, visit <a href={single_payment_link}>our payment portal</a>. For a one week free trial, try <a href={pro_payment_link}>Sweep Pro</a> (unlimited GPT-4 tickets)."
payment_message = (
f"{user_type}: You have {gpt_tickets_left_message}{daily_message}"
+ (purchase_message if not is_paying_user else "")
)
payment_message_start = (
f"{user_type}: You have {gpt_tickets_left_message}{daily_message}"
+ (purchase_message if not is_paying_user else "")
)
return payment_message, payment_message_start
def parse_issues_from_code_review(issue_string: str):
issue_regex = r'<issue>(?P<issue>.*?)<\/issue>'
issue_matches = list(re.finditer(issue_regex, issue_string, re.DOTALL))
potential_issues = set()
for issue in issue_matches:
issue_content = issue.group('issue')
issue_params = ['issue_description', 'start_line', 'end_line']
issue_args = {}
issue_failed = False
for param in issue_params:
regex = rf'<{param}>(?P<{param}>.*?)<\/{param}>'
result = re.search(regex, issue_content, re.DOTALL)
try:
issue_args[param] = result.group(param).strip()
except AttributeError:
issue_failed = True
break
if not issue_failed:
potential_issues.add(CodeReviewIssue(**issue_args))
return list(potential_issues)
# converts the list of issues inside a code_review into markdown text to display in a github comment
@posthog_trace
def render_code_review_issues(username: str, pr: PullRequest, code_review: CodeReview, issue_type: str = "", metadata: dict = {}):
files_to_blobs = {file.filename: file.blob_url for file in list(pr.get_files())}
# generate the diff urls
files_to_diffs = {}
for file_name, _ in files_to_blobs.items():
sha_256 = hashlib.sha256(file_name.encode('utf-8')).hexdigest()
files_to_diffs[file_name] = f"{pr.html_url}/files#diff-{sha_256}"
code_issues = code_review.issues
if issue_type == "potential":
code_issues = code_review.potential_issues
code_issues_string = ""
for issue in code_issues:
if code_review.file_name in files_to_blobs:
if issue.start_line == issue.end_line:
issue_blob_url = f"{files_to_blobs[code_review.file_name]}#L{issue.start_line}"
issue_diff_url = f"{files_to_diffs[code_review.file_name]}R{issue.start_line}"
else:
issue_blob_url = f"{files_to_blobs[code_review.file_name]}#L{issue.start_line}-L{issue.end_line}"
issue_diff_url = f"{files_to_diffs[code_review.file_name]}R{issue.start_line}-R{issue.end_line}"
code_issues_string += f"<li>{issue.issue_description}</li>\n\n{issue_blob_url}\n[View Diff]({issue_diff_url})"
return code_issues_string
def escape_html(text: str) -> str:
return text.replace('<', '&lt;').replace('>', '&gt;')
# make sure code blocks are render properly in github comments markdown
def format_code_sections(text: str) -> str:
backtick_count = text.count("`")
if backtick_count % 2 != 0:
# If there's an odd number of backticks, return the original text
return text
result = []
last_index = 0
inside_code = False
while True:
try:
index = text.index('`', last_index)
result.append(text[last_index:index])
if inside_code:
result.append('</code>')
else:
result.append('<code>')
inside_code = not inside_code
last_index = index + 1
except ValueError:
# No more backticks found
break
result.append(text[last_index:])
formatted_text = ''.join(result)
# Escape HTML characters within <code> tags
formatted_text = formatted_text.replace('<code>', '<code>').replace('</code>', '</code>')
parts = formatted_text.split('<code>')
for i in range(1, len(parts)):
code_content, rest = parts[i].split('</code>', 1)
parts[i] = escape_html(code_content) + '</code>' + rest
return '<code>'.join(parts)
# turns code_review_by_file into markdown string
@posthog_trace
def render_pr_review_by_file(username: str, pr: PullRequest, code_review_by_file: dict[str, CodeReview], dropped_files: list[str] = [], metadata: dict = {}) -> str:
body = f"{SWEEP_PR_REVIEW_HEADER}\n"
reviewed_files = ""
for file_name, code_review in code_review_by_file.items():
sweep_issues = code_review.issues
potential_issues = code_review.potential_issues
reviewed_files += f"""<details open>
<summary>{file_name}</summary>
<p>{format_code_sections(code_review.diff_summary)}</p>"""
if sweep_issues:
sweep_issues_string = render_code_review_issues(username, pr, code_review)
reviewed_files += f"<p><strong>Sweep Found These Issues</strong></p><ul>{format_code_sections(sweep_issues_string)}</ul>"
if potential_issues:
potential_issues_string = render_code_review_issues(username, pr, code_review, issue_type="potential")
reviewed_files += f"<details><summary><strong>Potential Issues</strong></summary><p>Sweep isn't 100% sure if the following are issues or not but they may be worth taking a look at.</p><ul>{format_code_sections(potential_issues_string)}</ul></details>"
reviewed_files += "</details><hr>"
if len(dropped_files) == 1:
reviewed_files += f"<p>{dropped_files[0]} was not reviewed because our filter identified it as typically a non-human-readable or less important file (e.g., dist files, package.json, images). If this is an error, please let us know.</p>"
elif len(dropped_files) > 1:
dropped_files_string = "".join([f"<li>{file}</li>" for file in dropped_files])
reviewed_files += f"<p>The following files were not reviewed because our filter identified them as typically non-human-readable or less important files (e.g., dist files, package.json, images). If this is an error, please let us know.</p><ul>{dropped_files_string}</ul>"
return body + reviewed_files
# handles the creation or update of the Sweep comment letting the user know that Sweep is reviewing a pr
# returns the comment_id
@posthog_trace
def create_update_review_pr_comment(username: str, pr: PullRequest, code_review_by_file: dict[str, CodeReview] | None = None, dropped_files: list[str] = [], metadata: dict = {}) -> int:
comment_id = -1
sweep_comment = None
# comments that appear in the github ui in the conversation tab are considered issue comments
pr_comments = list(pr.get_issue_comments())
# make sure we don't already have a comment created
for comment in pr_comments:
# a comment has already been created
if comment.body.startswith(SWEEP_PR_REVIEW_HEADER):
comment_id = comment.id
sweep_comment = comment
break
# comment has not yet been created
if not sweep_comment:
sweep_comment = pr.create_issue_comment(f"{SWEEP_PR_REVIEW_HEADER}\nSweep is currently reviewing your pr...")
# update body of sweep_comment
if code_review_by_file:
rendered_pr_review = render_pr_review_by_file(username, pr, code_review_by_file, dropped_files=dropped_files)
sweep_comment.edit(rendered_pr_review)
comment_id = sweep_comment.id
return comment_id
def render_fcrs(file_change_requests: list[FileChangeRequest]):
# Render plan start
planning_markdown = ""
for fcr in file_change_requests:
parsed_fcr = parse_fcr(fcr)
if parsed_fcr and parsed_fcr["new_code"]:
planning_markdown += f"#### `{fcr.filename}`\n"
planning_markdown += f"{blockquote(parsed_fcr['justification'])}\n\n"
if parsed_fcr["original_code"] and parsed_fcr["original_code"][0].strip():
planning_markdown += f"""```diff\n{generate_diff(
parsed_fcr["original_code"][0],
parsed_fcr["new_code"][0],
)}\n```\n"""
else:
_file_base_name, ext = os.path.splitext(fcr.filename)
planning_markdown += f"```{ext}\n{parsed_fcr['new_code'][0]}\n```\n"
else:

from __future__ import annotations
import os
import traceback
from functools import lru_cache
import github
import yaml
from github.Repository import Repository
from loguru import logger
from pydantic import BaseModel
from sweepai.core.entities import EmptyRepository
from sweepai.utils.file_utils import read_file_with_fallback_encodings
class SweepConfig(BaseModel):
include_dirs: list[str] = []
exclude_dirs: list[str] = [
".git",
"node_modules",
"build",
".venv",
"venv",
"patch",
"packages/blobs",
"dist",
]
exclude_path_dirs: list[str] = ["node_modules", "build", ".venv", "venv", ".git", "dist"]
exclude_substrings_aggressive: list[str] = [ # aggressively filter out file paths, may drop some relevant files
"integration",
".spec",
".test",
".json",
"test"
]
include_exts: list[str] = [
".cs",
".csharp",
".py",
".md",
".txt",
".ts",
".tsx",
".js",
".jsx",
".mjs",
]
exclude_exts: list[str] = [
".min.js",
".min.js.map",
".min.css",
".min.css.map",
".tfstate",
".tfstate.backup",
".jar",
".ipynb",
".png",
".jpg",
".jpeg",
".download",
".gif",
".bmp",
".tiff",
".ico",
".mp3",
".wav",
".wma",
".ogg",
".flac",
".mp4",
".avi",
".mkv",
".mov",
".patch",
".patch.disabled",
".wmv",
".m4a",
".m4v",
".3gp",
".3g2",
".rm",
".swf",
".flv",
".iso",
".bin",
".tar",
".zip",
".7z",
".gz",
".rar",
".pdf",
".doc",
".docx",
".xls",
".xlsx",
".ppt",
".pptx",
".svg",
".parquet",
".pyc",
".pub",
".pem",
".ttf",
".dfn",
".dfm",
".feature",
"sweep.yaml",
"pnpm-lock.yaml",
"LICENSE",
"poetry.lock",
'package-lock.json',
'package.json',
'pyproject.toml',
'requirements.txt',
'yarn.lock',
'.lockb',
]
# cutoff for when we output truncated versions of strings, this is an arbitrary number and can be changed
truncation_cutoff: int = 20000
# Image formats
max_file_limit: int = 60_000
# github comments
max_github_comment_body_length: int = 65535
# allowed image types for vision
allowed_image_types: list[str] = [
"jpg",
"jpeg",
"webp",
"png"
]
def to_yaml(self) -> str:
return yaml.safe_dump(self.dict())
@classmethod
def from_yaml(cls, yaml_str: str) -> "SweepConfig":
data = yaml.safe_load(yaml_str)
return cls.parse_obj(data)
@staticmethod
@lru_cache()
def get_branch(repo: Repository, override_branch: str | None = None) -> str:
if override_branch:
branch_name = override_branch
try:
repo.get_branch(branch_name)
return branch_name
except github.GithubException:
# try a more robust branch test
branch_name_parts = branch_name.split(" ")[0].split("/")
branch_name_combos = []
for i in range(len(branch_name_parts)):
branch_name_combos.append("/".join(branch_name_parts[i:]))
try:
for i in range(len(branch_name_combos)):
branch_name = branch_name_combos[i]
try:
repo.get_branch(branch_name)
return branch_name
except Exception as e:
if i < len(branch_name_combos) - 1:
continue
else:
raise Exception(f"Branch not found: {e}")
except Exception as e:
logger.exception(
f"Error when getting branch {branch_name}: {e}, traceback: {traceback.format_exc()}"
)
except Exception as e:
logger.exception(
f"Error when getting branch {branch_name}: {e}, traceback: {traceback.format_exc()}"
)
default_branch = repo.default_branch
try:
sweep_yaml_dict = {}
contents = repo.get_contents("sweep.yaml")
sweep_yaml_dict = yaml.safe_load(
contents.decoded_content.decode("utf-8")
)
if "branch" not in sweep_yaml_dict:
return default_branch
branch_name = sweep_yaml_dict["branch"]
try:
repo.get_branch(branch_name)
return branch_name
except Exception as e:
logger.exception(
f"Error when getting branch: {e}, traceback: {traceback.format_exc()}, creating branch"
)
repo.create_git_ref(
f"refs/heads/{branch_name}",
repo.get_branch(default_branch).commit.sha,
)
return branch_name
except Exception:
return default_branch
@staticmethod
def get_config(repo: Repository):
try:
contents = repo.get_contents("sweep.yaml")
config = yaml.safe_load(contents.decoded_content.decode("utf-8"))
return SweepConfig(**config)
except Exception as e:
logger.warning(f"Error when getting config: {e}, returning empty dict")
if "This repository is empty." in str(e):
raise EmptyRepository()
return SweepConfig()
@staticmethod
def get_draft(repo: Repository):
try:
contents = repo.get_contents("sweep.yaml")
config = yaml.safe_load(contents.decoded_content.decode("utf-8"))
return config.get("draft", False)
except Exception as e:
logger.warning(f"Error when getting draft: {e}, returning False")
return False
# returns if file is excluded or not
def is_file_excluded(self, file_path: str) -> bool:
parts = file_path.split(os.path.sep)
for part in parts:
if part in self.exclude_dirs or part in self.exclude_exts:
return True
return False
# returns if file is excluded or not, this version may drop actual relevant files
def is_file_excluded_aggressive(self, dir: str, file_path: str) -> bool:
# tiktoken_client = Tiktoken()
# must exist
if not os.path.exists(os.path.join(dir, file_path)) and not os.path.exists(file_path):
return True
full_path = os.path.join(dir, file_path)
if os.stat(full_path).st_size > 240000 or os.stat(full_path).st_size < 5:
return True
# exclude binary
with open(full_path, "rb") as f:
is_binary = False
for block in iter(lambda: f.read(1024), b""):
if b"\0" in block:
is_binary = True
break
if is_binary:
return True
try:
# fetch file
data = read_file_with_fallback_encodings(full_path)
lines = data.split("\n")
except UnicodeDecodeError:
logger.warning(f"UnicodeDecodeError in is_file_excluded_aggressive: {full_path}, skipping")
return True
line_count = len(lines)
# if average line length is greater than 200, then it is likely not human readable
if len(data)/line_count > 200:
return True
# check token density, if it is greater than 2, then it is likely not human readable
# token_count = tiktoken_client.count(data)
# if token_count == 0:
# return True
# if len(data)/token_count < 2:
# return True
# now check the file name
parts = file_path.split(os.path.sep)
for part in parts:
if part in self.exclude_dirs or part in self.exclude_exts:
return True
for part in self.exclude_substrings_aggressive:
if part in file_path:
return True
return False
@lru_cache(maxsize=None)
def get_gha_enabled(repo: Repository) -> bool:
try:
contents = repo.get_contents("sweep.yaml")
gha_enabled = yaml.safe_load(contents.decoded_content.decode("utf-8")).get(
"gha_enabled", False
)
return gha_enabled
except Exception:
logger.info(
"Error when getting gha enabled, falling back to False"
)
return False
@lru_cache(maxsize=None)
def get_description(repo: Repository) -> dict:
try:
contents = repo.get_contents("sweep.yaml")
sweep_yaml = yaml.safe_load(contents.decoded_content.decode("utf-8"))
description = sweep_yaml.get("description", "")
rules = sweep_yaml.get("rules", [])
rules = "\n * ".join(rules[:3])
return {"description": description, "rules": rules}
except Exception:
return {"description": "", "rules": ""}
@lru_cache(maxsize=None)
def get_sandbox_config(repo: Repository):
try:
contents = repo.get_contents("sweep.yaml")
description = yaml.safe_load(contents.decoded_content.decode("utf-8")).get(
"sandbox", {}
)
return description
except Exception:
return {}
@lru_cache(maxsize=None)
def get_branch_name_config(repo: Repository):
try:
contents = repo.get_contents("sweep.yaml")
description = yaml.safe_load(contents.decoded_content.decode("utf-8")).get(
"branch_use_underscores", False
)
return description
except Exception:
return False
@lru_cache(maxsize=None)
def get_documentation_dict(repo: Repository):
try:
sweep_yaml_content = repo.get_contents("sweep.yaml").decoded_content.decode(
"utf-8"
)
sweep_yaml = yaml.safe_load(sweep_yaml_content)
docs = sweep_yaml.get("docs", {})
return docs
except Exception:
return {}
@lru_cache(maxsize=None)
def get_blocked_dirs(repo: Repository):
try:
sweep_yaml_content = repo.get_contents("sweep.yaml").decoded_content.decode(
"utf-8"
)
sweep_yaml = yaml.safe_load(sweep_yaml_content)
dirs = sweep_yaml.get("blocked_dirs", [])
return dirs
except Exception:
return []
@lru_cache(maxsize=None)
def get_rules(repo: Repository):
try:
sweep_yaml_content = repo.get_contents("sweep.yaml").decoded_content.decode(
"utf-8"
)
sweep_yaml = yaml.safe_load(sweep_yaml_content)
rules = sweep_yaml.get("rules", [])
return rules
except Exception:
return []
# optional, can leave env var blank
GITHUB_APP_CLIENT_ID = os.environ.get("GITHUB_APP_CLIENT_ID", "Iv1.91fd31586a926a9f")
RESTART_SWEEP_BUTTON = "โ†ป Restart Sweep"
SWEEP_GOOD_FEEDBACK = "๐Ÿ‘ Sweep Did Well"
SWEEP_BAD_FEEDBACK = "๐Ÿ‘Ž Sweep Needs Improvement"
RESET_FILE = "Rollback changes to "
REVERT_CHANGED_FILES_TITLE = "## Rollback Files For Sweep"
RULES_TITLE = (
"## Apply [Sweep Rules](https://docs.sweep.dev/usage/config#rules) to your PR?"
)
RULES_LABEL = "**Apply:** "
DEFAULT_RULES = [
"All new business logic should have corresponding unit tests.",
"Refactor large functions to be more modular.",
"Add docstrings to all functions and file headers.",
]
DEFAULT_RULES_STRING = """\
- "All new business logic should have corresponding unit tests."
- "Refactor large functions to be more modular."

"""
on_ticket is the main function that is called when a new issue is created.
It is only called by the webhook handler in sweepai/api.py.
"""
import difflib
import io
import os
import re
import zipfile
import markdown
import requests
from github import Github, Repository
from github.PullRequest import PullRequest
from github.Issue import Issue
from loguru import logger
from tqdm import tqdm
import hashlib
from sweepai.agents.modify_utils import parse_fcr
from sweepai.agents.pr_description_bot import PRDescriptionBot
from sweepai.chat.api import posthog_trace
from sweepai.config.client import (
RESTART_SWEEP_BUTTON,
SweepConfig,
)
from sweepai.core.entities import (
FileChangeRequest,
SandboxResponse,
)
from sweepai.core.entities import create_error_logs as entities_create_error_logs
from sweepai.dataclasses.codereview import CodeReview, CodeReviewIssue
from sweepai.handlers.create_pr import (
safe_delete_sweep_branch,
)
from sweepai.handlers.on_check_suite import clean_gh_logs
from sweepai.utils.buttons import create_action_buttons
from sweepai.utils.chat_logger import ChatLogger
from sweepai.utils.diff import generate_diff
from sweepai.utils.github_utils import (
CURRENT_USERNAME,
get_github_client,
get_token,
)
from sweepai.utils.str_utils import (
BOT_SUFFIX,
blockquote,
bot_suffix,
clean_logs,
create_collapsible,
discord_suffix,
format_sandbox_success,
sep,
stars_suffix,
)
from sweepai.utils.ticket_utils import (
center,
fire_and_forget_wrapper,
)
from sweepai.utils.user_settings import UserSettings
sweeping_gif = """<a href="https://github.com/sweepai/sweep"><img class="swing" src="https://raw.githubusercontent.com/sweepai/sweep/main/.assets/sweeping.gif" width="100" style="width:50px; margin-bottom:10px" alt="Sweeping"></a>"""
custom_config = """
extends: relaxed
rules:
line-length: disable
indentation: disable
"""
INSTRUCTIONS_FOR_REVIEW = """\
### ๐Ÿ’ก To get Sweep to edit this pull request, you can:
* Comment below, and Sweep can edit the entire PR
* Comment on a file, Sweep will only modify the commented file
* Edit the original issue to get Sweep to recreate the PR from scratch"""
email_template = """Hey {name},
<br/><br/>
๐Ÿš€ I just finished creating a pull request for your issue ({repo_full_name}#{issue_number}) at <a href="{pr_url}">{repo_full_name}#{pr_number}</a>!
<br/><br/>
<h2>Summary</h2>
<blockquote>
{summary}
</blockquote>
<h2>Files Changed</h2>
<ul>
{files_changed}
</ul>
{sweeping_gif}
<br/>
Cheers,
<br/>
Sweep
<br/>"""
FAILING_GITHUB_ACTION_PROMPT = """\
The following Github Actions failed on a previous attempt at fixing this issue.
Propose a fix to the failing github actions. You must edit the source code, not the github action itself.
{github_action_log}
"""
SWEEP_PR_REVIEW_HEADER = "# Sweep: PR Review"
# Add :eyes: emoji to ticket
def add_emoji(issue: Issue, comment_id: int = None, reaction_content="eyes"):
item_to_react_to = issue.get_comment(comment_id) if comment_id else issue
item_to_react_to.create_reaction(reaction_content)
# Add :eyes: emoji to ticket
def add_emoji_to_pr(pr: PullRequest, comment_id: int = None, reaction_content="eyes"):
item_to_react_to = pr.get_comment(comment_id) if comment_id else pr
item_to_react_to.create_reaction(reaction_content)
# If SWEEP_BOT reacted to item_to_react_to with "rocket", then remove it.
def remove_emoji(issue: Issue, comment_id: int = None, content_to_delete="eyes"):
item_to_react_to = issue.get_comment(comment_id) if comment_id else issue
reactions = item_to_react_to.get_reactions()
for reaction in reactions:
if (
reaction.content == content_to_delete
and reaction.user.login == CURRENT_USERNAME
):
item_to_react_to.delete_reaction(reaction.id)
def create_error_logs(
commit_url_display: str,
sandbox_response: SandboxResponse,
status: str = "โœ“",
):
return (
(
"<br/>"
+ create_collapsible(
f"Sandbox logs for {commit_url_display} {status}",
blockquote(
"\n\n".join(
[
create_collapsible(
f"<code>{output}</code> {i + 1}/{len(sandbox_response.outputs)} {format_sandbox_success(sandbox_response.success)}",
f"<pre>{clean_logs(output)}</pre>",
i == len(sandbox_response.outputs) - 1,
)
for i, output in enumerate(sandbox_response.outputs)
if len(sandbox_response.outputs) > 0
]
)
),
opened=True,
)
)
if sandbox_response
else ""
)
# takes in a list of workflow runs and returns a list of messages containing the logs of the failing runs
def get_failing_gha_logs(runs, installation_id) -> str:
token = get_token(installation_id)
all_logs = ""
for run in runs:
# jobs_url
jobs_url = run.jobs_url
jobs_response = requests.get(
jobs_url,
headers={
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {token}",
"X-GitHub-Api-Version": "2022-11-28",
},
)
if jobs_response.status_code == 200:
failed_jobs = []
jobs = jobs_response.json()["jobs"]
for job in jobs:
if job["conclusion"] == "failure":
failed_jobs.append(job)
failed_jobs_name_list = []
for job in failed_jobs:
# add failed steps
for step in job["steps"]:
if step["conclusion"] == "failure":
failed_jobs_name_list.append(
f"{job['name']}/{step['number']}_{step['name']}"
)
else:
logger.error(
"Failed to get jobs for failing github actions, possible a credentials issue"
)
return all_logs
# make sure jobs in valid
if jobs_response.json()['total_count'] == 0:
logger.error(f"no jobs for this run: {run}, continuing...")
continue
# logs url
logs_url = run.logs_url
logs_response = requests.get(
logs_url,
headers={
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {token}",
"X-GitHub-Api-Version": "2022-11-28",
},
allow_redirects=True,
)
# Check if the request was successful
if logs_response.status_code == 200:
zip_data = io.BytesIO(logs_response.content)
zip_file = zipfile.ZipFile(zip_data, "r")
zip_file_names = zip_file.namelist()
for file in failed_jobs_name_list:
if f"{file}.txt" in zip_file_names:
logs = zip_file.read(f"{file}.txt").decode("utf-8")
logs_prompt = clean_gh_logs(logs)
all_logs += logs_prompt + "\n"
else:
logger.error(
"Failed to get logs for failing github actions, likely a credentials issue"
)
return all_logs
def delete_old_prs(repo: Repository, issue_number: int):
logger.info("Deleting old PRs...")
prs = repo.get_pulls(
state="open",
sort="created",
direction="desc",
base=SweepConfig.get_branch(repo),
)
for pr in tqdm(prs.get_page(0)):
# # Check if this issue is mentioned in the PR, and pr is owned by bot
# # This is done in create_pr, (pr_description = ...)
if pr.user.login == CURRENT_USERNAME and f"Fixes #{issue_number}.\n" in pr.body:
safe_delete_sweep_branch(pr, repo)
break
def get_comment_header(
index: int,
g: Github,
repo_full_name: str,
progress_headers: list[None | str],
tracking_id: str | None,
payment_message_start: str,
errored: bool = False,
pr_message: str = "",
done: bool = False,
initial_sandbox_response: int | SandboxResponse = -1,
initial_sandbox_response_file=None,
config_pr_url: str | None = None,
):
config_pr_message = (
"\n"
+ f"<div align='center'>Install Sweep Configs: <a href='{config_pr_url}'>Pull Request</a></div>"
if config_pr_url is not None
else ""
)
actions_message = create_action_buttons(
[
RESTART_SWEEP_BUTTON,
]
)
sandbox_execution_message = "\n\n## GitHub Actions failed\n\nThe sandbox appears to be unavailable or down.\n\n"
if initial_sandbox_response == -1:
sandbox_execution_message = ""
elif initial_sandbox_response is not None:
repo = g.get_repo(repo_full_name)
commit_hash = repo.get_commits()[0].sha
success = initial_sandbox_response.outputs and initial_sandbox_response.success
status = "โœ“" if success else "X"
sandbox_execution_message = (
"\n\n## GitHub Actions"
+ status
+ "\n\nHere are the GitHub Actions logs prior to making any changes:\n\n"
)
sandbox_execution_message += entities_create_error_logs(
f'<a href="https://github.com/{repo_full_name}/commit/{commit_hash}"><code>{commit_hash[:7]}</code></a>',
initial_sandbox_response,
initial_sandbox_response_file,
)
if success:
sandbox_execution_message += f"\n\nSandbox passed on the latest `{repo.default_branch}`, so sandbox checks will be enabled for this issue."
else:
sandbox_execution_message += "\n\nSandbox failed, so all sandbox checks will be disabled for this issue."
if index < 0:
index = 0
if index == 4:
return (
pr_message
+ config_pr_message
+ f"\n\n{actions_message}"
)
total = len(progress_headers)
index += 1 if done else 0
index *= 100 / total
index = int(index)
index = min(100, index)
if errored:
pbar = f"\n\n<img src='https://progress-bar.dev/{index}/?&title=Errored&width=600' alt='{index}%' />"
return (
f"{center(sweeping_gif)}<br/>{center(pbar)}\n\n"
+ f"\n\n{actions_message}"
)
pbar = f"\n\n<img src='https://progress-bar.dev/{index}/?&title=Progress&width=600' alt='{index}%' />"
return (
f"{center(sweeping_gif)}"
+ f"<br/>{center(pbar)}"
+ ("\n" + stars_suffix if index != -1 else "")
+ "\n"
+ center(payment_message_start)
+ config_pr_message
+ f"\n\n{actions_message}"
)
def process_summary(summary, issue_number, repo_full_name, installation_id):
summary = summary or ""
summary = re.sub(
"<details (open)?>(\r)?\n<summary>Checklist</summary>.*",
"",
summary,
flags=re.DOTALL,
).strip()
summary = re.sub(
"---\s+Checklist:(\r)?\n(\r)?\n- \[[ X]\].*",
"",
summary,
flags=re.DOTALL,
).strip()
summary = re.sub(
"### Details\n\n_No response_", "", summary, flags=re.DOTALL
)
summary = re.sub("\n\n", "\n", summary, flags=re.DOTALL)
repo_name = repo_full_name
user_token, g = get_github_client(installation_id)
repo = g.get_repo(repo_full_name)
current_issue: Issue = repo.get_issue(number=issue_number)
assignee = current_issue.assignee.login if current_issue.assignee else None
if assignee is None:
assignee = current_issue.user.login
branch_match = re.search(
r"([B|b]ranch:) *(?P<branch_name>.+?)(\s|$)", summary
)
overrided_branch_name = None
if branch_match and "branch_name" in branch_match.groupdict():
overrided_branch_name = (
branch_match.groupdict()["branch_name"].strip().strip("`\"'")
)
# TODO: this code might be finicky, might have missed edge cases
if overrided_branch_name.startswith("https://github.com/"):
overrided_branch_name = overrided_branch_name.split("?")[0].split(
"tree/"
)[-1]
SweepConfig.get_branch(repo, overrided_branch_name)
return summary,repo_name,user_token,g,repo,current_issue,assignee,overrided_branch_name
def raise_on_no_file_change_requests(title, summary, edit_sweep_comment, file_change_requests):
if not file_change_requests:
if len(title + summary) < 60:
edit_sweep_comment(
(
"Sorry, I could not find any files to modify, can you please"
" provide more details? Please make sure that the title and"
" summary of the issue are at least 60 characters."
),
-1,
)
else:
edit_sweep_comment(
(
"Sorry, I could not find any files to modify, can you please"
" provide more details?"
),
-1,
)
raise Exception("No files to modify.")
def rewrite_pr_description(issue_number, repo, overrided_branch_name, pull_request, pr_changes):
# change the body here
diff_text = get_branch_diff_text(
repo=repo,
branch=pull_request.branch_name,
base_branch=overrided_branch_name,
)
new_description = PRDescriptionBot().describe_diffs(
diff_text,
pull_request.title,
) # TODO: update the title as well
if new_description:
pr_changes.body = (
f"{new_description}\n\nFixes"
f" #{issue_number}.\n\n---\n\n{INSTRUCTIONS_FOR_REVIEW}{BOT_SUFFIX}"
)
return pr_changes
def send_email_to_user(title, issue_number, username, repo_full_name, tracking_id, repo_name, g, file_change_requests, pr_changes, pr):
user_settings = UserSettings.from_username(username=username)
user = g.get_user(username)
full_name = user.name or user.login
name = full_name.split(" ")[0]
files_changed = []
for fcr in file_change_requests:
if fcr.change_type in ("create", "modify"):
diff = list(
difflib.unified_diff(
(fcr.old_content or "").splitlines() or [],
(fcr.new_content or "").splitlines() or [],
lineterm="",
)
)
added = sum(
1
for line in diff
if line.startswith("+") and not line.startswith("+++")
)
removed = sum(
1
for line in diff
if line.startswith("-") and not line.startswith("---")
)
files_changed.append(
f"<code>{fcr.filename}</code> (+{added}/-{removed})"
)
user_settings.send_email(
subject=f"Sweep Pull Request Complete for {repo_name}#{issue_number} {title}",
html=email_template.format(
name=name,
pr_url=pr.html_url,
issue_number=issue_number,
repo_full_name=repo_full_name,
pr_number=pr.number,
summary=markdown.markdown(pr_changes.body),
files_changed="\n".join(
[f"<li>{item}</li>" for item in files_changed]
),
sweeping_gif=sweeping_gif,
),
)
def handle_empty_repository(comment_id, current_issue, progress_headers, issue_comment):
first_comment = (
"Sweep is currently not supported on empty repositories. Please add some"
f" code to your repository and try again.\n{sep}##"
f" {progress_headers[1]}\n{bot_suffix}{discord_suffix}"
)
if issue_comment is None:
issue_comment = current_issue.create_comment(
first_comment + BOT_SUFFIX
)
else:
issue_comment.edit(first_comment + BOT_SUFFIX)
fire_and_forget_wrapper(add_emoji)(
current_issue, comment_id, reaction_content="confused"
)
fire_and_forget_wrapper(remove_emoji)(content_to_delete="eyes")
def get_branch_diff_text(repo, branch, base_branch=None):
base_branch = base_branch or SweepConfig.get_branch(repo)
comparison = repo.compare(base_branch, branch)
file_diffs = comparison.files
pr_diffs = []
for file in file_diffs:
diff = file.patch
if (
file.status == "added"
or file.status == "modified"
or file.status == "removed"
):
pr_diffs.append((file.filename, diff))
else:
logger.info(
f"File status {file.status} not recognized"
) # TODO(sweep): We don't handle renamed files
return "\n".join([f"{filename}\n{diff}" for filename, diff in pr_diffs])
def get_payment_messages(chat_logger: ChatLogger):
if chat_logger:
is_paying_user = chat_logger.is_paying_user()
is_consumer_tier = chat_logger.is_consumer_tier()
use_faster_model = chat_logger.use_faster_model()
else:
is_paying_user = True
is_consumer_tier = False
use_faster_model = False
# Find the first comment made by the bot
tickets_allocated = 5
if is_consumer_tier:
tickets_allocated = 15
if is_paying_user:
tickets_allocated = 500
purchased_ticket_count = (
chat_logger.get_ticket_count(purchased=True) if chat_logger else 0
)
ticket_count = (
max(tickets_allocated - chat_logger.get_ticket_count(), 0)
+ purchased_ticket_count
if chat_logger
else 999
)
daily_ticket_count = (
(3 - chat_logger.get_ticket_count(use_date=True) if not use_faster_model else 0)
if chat_logger
else 999
)
single_payment_link = "https://buy.stripe.com/00g3fh7qF85q0AE14d"
pro_payment_link = "https://buy.stripe.com/00g5npeT71H2gzCfZ8"
daily_message = (
f" and {daily_ticket_count} for the day"
if not is_paying_user and not is_consumer_tier
else ""
)
user_type = "๐Ÿ’Ž <b>Sweep Pro</b>" if is_paying_user else "โšก <b>Sweep Basic Tier</b>"
gpt_tickets_left_message = (
f"{ticket_count} Sweep issues left for the month"
if not is_paying_user
else "unlimited Sweep issues"
)
purchase_message = f"<br/><br/> For more Sweep issues, visit <a href={single_payment_link}>our payment portal</a>. For a one week free trial, try <a href={pro_payment_link}>Sweep Pro</a> (unlimited GPT-4 tickets)."
payment_message = (
f"{user_type}: You have {gpt_tickets_left_message}{daily_message}"
+ (purchase_message if not is_paying_user else "")
)
payment_message_start = (
f"{user_type}: You have {gpt_tickets_left_message}{daily_message}"
+ (purchase_message if not is_paying_user else "")
)
return payment_message, payment_message_start
def parse_issues_from_code_review(issue_string: str):
issue_regex = r'<issue>(?P<issue>.*?)<\/issue>'
issue_matches = list(re.finditer(issue_regex, issue_string, re.DOTALL))
potential_issues = set()
for issue in issue_matches:
issue_content = issue.group('issue')
issue_params = ['issue_description', 'start_line', 'end_line']
issue_args = {}
issue_failed = False
for param in issue_params:
regex = rf'<{param}>(?P<{param}>.*?)<\/{param}>'
result = re.search(regex, issue_content, re.DOTALL)
try:
issue_args[param] = result.group(param).strip()
except AttributeError:
issue_failed = True
break
if not issue_failed:
potential_issues.add(CodeReviewIssue(**issue_args))
return list(potential_issues)
# converts the list of issues inside a code_review into markdown text to display in a github comment
@posthog_trace
def render_code_review_issues(username: str, pr: PullRequest, code_review: CodeReview, issue_type: str = "", metadata: dict = {}):
files_to_blobs = {file.filename: file.blob_url for file in list(pr.get_files())}
# generate the diff urls
files_to_diffs = {}
for file_name, _ in files_to_blobs.items():
sha_256 = hashlib.sha256(file_name.encode('utf-8')).hexdigest()
files_to_diffs[file_name] = f"{pr.html_url}/files#diff-{sha_256}"
code_issues = code_review.issues
if issue_type == "potential":
code_issues = code_review.potential_issues
code_issues_string = ""
for issue in code_issues:
if code_review.file_name in files_to_blobs:
if issue.start_line == issue.end_line:
issue_blob_url = f"{files_to_blobs[code_review.file_name]}#L{issue.start_line}"
issue_diff_url = f"{files_to_diffs[code_review.file_name]}R{issue.start_line}"
else:
issue_blob_url = f"{files_to_blobs[code_review.file_name]}#L{issue.start_line}-L{issue.end_line}"
issue_diff_url = f"{files_to_diffs[code_review.file_name]}R{issue.start_line}-R{issue.end_line}"
code_issues_string += f"<li>{issue.issue_description}</li>\n\n{issue_blob_url}\n[View Diff]({issue_diff_url})"
return code_issues_string
def escape_html(text: str) -> str:
return text.replace('<', '&lt;').replace('>', '&gt;')
# make sure code blocks are render properly in github comments markdown
def format_code_sections(text: str) -> str:
backtick_count = text.count("`")
if backtick_count % 2 != 0:
# If there's an odd number of backticks, return the original text
return text
result = []
last_index = 0
inside_code = False
while True:
try:
index = text.index('`', last_index)
result.append(text[last_index:index])
if inside_code:
result.append('</code>')
else:
result.append('<code>')
inside_code = not inside_code
last_index = index + 1
except ValueError:
# No more backticks found
break
result.append(text[last_index:])
formatted_text = ''.join(result)
# Escape HTML characters within <code> tags
formatted_text = formatted_text.replace('<code>', '<code>').replace('</code>', '</code>')
parts = formatted_text.split('<code>')
for i in range(1, len(parts)):
code_content, rest = parts[i].split('</code>', 1)
parts[i] = escape_html(code_content) + '</code>' + rest
return '<code>'.join(parts)
# turns code_review_by_file into markdown string
@posthog_trace
def render_pr_review_by_file(username: str, pr: PullRequest, code_review_by_file: dict[str, CodeReview], dropped_files: list[str] = [], metadata: dict = {}) -> str:
body = f"{SWEEP_PR_REVIEW_HEADER}\n"
reviewed_files = ""
for file_name, code_review in code_review_by_file.items():
sweep_issues = code_review.issues
potential_issues = code_review.potential_issues
reviewed_files += f"""<details open>
<summary>{file_name}</summary>
<p>{format_code_sections(code_review.diff_summary)}</p>"""
if sweep_issues:
sweep_issues_string = render_code_review_issues(username, pr, code_review)
reviewed_files += f"<p><strong>Sweep Found These Issues</strong></p><ul>{format_code_sections(sweep_issues_string)}</ul>"
if potential_issues:
potential_issues_string = render_code_review_issues(username, pr, code_review, issue_type="potential")
reviewed_files += f"<details><summary><strong>Potential Issues</strong></summary><p>Sweep isn't 100% sure if the following are issues or not but they may be worth taking a look at.</p><ul>{format_code_sections(potential_issues_string)}</ul></details>"
reviewed_files += "</details><hr>"
if len(dropped_files) == 1:
reviewed_files += f"<p>{dropped_files[0]} was not reviewed because our filter identified it as typically a non-human-readable or less important file (e.g., dist files, package.json, images). If this is an error, please let us know.</p>"
elif len(dropped_files) > 1:
dropped_files_string = "".join([f"<li>{file}</li>" for file in dropped_files])
reviewed_files += f"<p>The following files were not reviewed because our filter identified them as typically non-human-readable or less important files (e.g., dist files, package.json, images). If this is an error, please let us know.</p><ul>{dropped_files_string}</ul>"
return body + reviewed_files
# handles the creation or update of the Sweep comment letting the user know that Sweep is reviewing a pr
# returns the comment_id
@posthog_trace
def create_update_review_pr_comment(username: str, pr: PullRequest, code_review_by_file: dict[str, CodeReview] | None = None, dropped_files: list[str] = [], metadata: dict = {}) -> int:
comment_id = -1
sweep_comment = None
# comments that appear in the github ui in the conversation tab are considered issue comments
pr_comments = list(pr.get_issue_comments())
# make sure we don't already have a comment created
for comment in pr_comments:
# a comment has already been created
if comment.body.startswith(SWEEP_PR_REVIEW_HEADER):
comment_id = comment.id
sweep_comment = comment
break
# comment has not yet been created
if not sweep_comment:
sweep_comment = pr.create_issue_comment(f"{SWEEP_PR_REVIEW_HEADER}\nSweep is currently reviewing your pr...")
# update body of sweep_comment
if code_review_by_file:
rendered_pr_review = render_pr_review_by_file(username, pr, code_review_by_file, dropped_files=dropped_files)
sweep_comment.edit(rendered_pr_review)
comment_id = sweep_comment.id
return comment_id
def render_fcrs(file_change_requests: list[FileChangeRequest]):
# Render plan start
planning_markdown = ""
for fcr in file_change_requests:
parsed_fcr = parse_fcr(fcr)
if parsed_fcr and parsed_fcr["new_code"]:
planning_markdown += f"#### `{fcr.filename}`\n"
planning_markdown += f"{blockquote(parsed_fcr['justification'])}\n\n"
if parsed_fcr["original_code"] and parsed_fcr["original_code"][0].strip():
planning_markdown += f"""```diff\n{generate_diff(
parsed_fcr["original_code"][0],
parsed_fcr["new_code"][0],
)}\n```\n"""
else:
_file_base_name, ext = os.path.splitext(fcr.filename)
planning_markdown += f"```{ext}\n{parsed_fcr['new_code'][0]}\n```\n"
else:

from __future__ import annotations
import os
import traceback
from functools import lru_cache
import github
import yaml
from github.Repository import Repository
from loguru import logger
from pydantic import BaseModel
from sweepai.core.entities import EmptyRepository
from sweepai.utils.file_utils import read_file_with_fallback_encodings
class SweepConfig(BaseModel):
include_dirs: list[str] = []
exclude_dirs: list[str] = [
".git",
"node_modules",
"build",
".venv",
"venv",
"patch",
"packages/blobs",
"dist",
]
exclude_path_dirs: list[str] = ["node_modules", "build", ".venv", "venv", ".git", "dist"]
exclude_substrings_aggressive: list[str] = [ # aggressively filter out file paths, may drop some relevant files
"integration",
".spec",
".test",
".json",
"test"
]
include_exts: list[str] = [
".cs",
".csharp",
".py",
".md",
".txt",
".ts",
".tsx",
".js",
".jsx",
".mjs",
]
exclude_exts: list[str] = [
".min.js",
".min.js.map",
".min.css",
".min.css.map",
".tfstate",
".tfstate.backup",
".jar",
".ipynb",
".png",
".jpg",
".jpeg",
".download",
".gif",
".bmp",
".tiff",
".ico",
".mp3",
".wav",
".wma",
".ogg",
".flac",
".mp4",
".avi",
".mkv",
".mov",
".patch",
".patch.disabled",
".wmv",
".m4a",
".m4v",
".3gp",
".3g2",
".rm",
".swf",
".flv",
".iso",
".bin",
".tar",
".zip",
".7z",
".gz",
".rar",
".pdf",
".doc",
".docx",
".xls",
".xlsx",
".ppt",
".pptx",
".svg",
".parquet",
".pyc",
".pub",
".pem",
".ttf",
".dfn",
".dfm",
".feature",
"sweep.yaml",
"pnpm-lock.yaml",
"LICENSE",
"poetry.lock",
'package-lock.json',
'package.json',
'pyproject.toml',
'requirements.txt',
'yarn.lock',
'.lockb',
]
# cutoff for when we output truncated versions of strings, this is an arbitrary number and can be changed
truncation_cutoff: int = 20000
# Image formats
max_file_limit: int = 60_000
# github comments
max_github_comment_body_length: int = 65535
# allowed image types for vision
allowed_image_types: list[str] = [
"jpg",
"jpeg",
"webp",
"png"
]
def to_yaml(self) -> str:
return yaml.safe_dump(self.dict())
@classmethod
def from_yaml(cls, yaml_str: str) -> "SweepConfig":
data = yaml.safe_load(yaml_str)
return cls.parse_obj(data)
@staticmethod
@lru_cache()
def get_branch(repo: Repository, override_branch: str | None = None) -> str:
if override_branch:
branch_name = override_branch
try:
repo.get_branch(branch_name)
return branch_name
except github.GithubException:
# try a more robust branch test
branch_name_parts = branch_name.split(" ")[0].split("/")
branch_name_combos = []
for i in range(len(branch_name_parts)):
branch_name_combos.append("/".join(branch_name_parts[i:]))
try:
for i in range(len(branch_name_combos)):
branch_name = branch_name_combos[i]
try:
repo.get_branch(branch_name)
return branch_name
except Exception as e:
if i < len(branch_name_combos) - 1:
continue
else:
raise Exception(f"Branch not found: {e}")
except Exception as e:
logger.exception(
f"Error when getting branch {branch_name}: {e}, traceback: {traceback.format_exc()}"
)
except Exception as e:
logger.exception(
f"Error when getting branch {branch_name}: {e}, traceback: {traceback.format_exc()}"
)
default_branch = repo.default_branch
try:
sweep_yaml_dict = {}
contents = repo.get_contents("sweep.yaml")
sweep_yaml_dict = yaml.safe_load(
contents.decoded_content.decode("utf-8")
)
if "branch" not in sweep_yaml_dict:
return default_branch
branch_name = sweep_yaml_dict["branch"]
try:
repo.get_branch(branch_name)
return branch_name
except Exception as e:
logger.exception(
f"Error when getting branch: {e}, traceback: {traceback.format_exc()}, creating branch"
)
repo.create_git_ref(
f"refs/heads/{branch_name}",
repo.get_branch(default_branch).commit.sha,
)
return branch_name
except Exception:
return default_branch
@staticmethod
def get_config(repo: Repository):
try:
contents = repo.get_contents("sweep.yaml")
config = yaml.safe_load(contents.decoded_content.decode("utf-8"))
return SweepConfig(**config)
except Exception as e:
logger.warning(f"Error when getting config: {e}, returning empty dict")
if "This repository is empty." in str(e):
raise EmptyRepository()
return SweepConfig()
@staticmethod
def get_draft(repo: Repository):
try:
contents = repo.get_contents("sweep.yaml")
config = yaml.safe_load(contents.decoded_content.decode("utf-8"))
return config.get("draft", False)
except Exception as e:
logger.warning(f"Error when getting draft: {e}, returning False")
return False
# returns if file is excluded or not
def is_file_excluded(self, file_path: str) -> bool:
parts = file_path.split(os.path.sep)
for part in parts:
if part in self.exclude_dirs or part in self.exclude_exts:
return True
return False
# returns if file is excluded or not, this version may drop actual relevant files
def is_file_excluded_aggressive(self, dir: str, file_path: str) -> bool:
# tiktoken_client = Tiktoken()
# must exist
if not os.path.exists(os.path.join(dir, file_path)) and not os.path.exists(file_path):
return True
full_path = os.path.join(dir, file_path)
if os.stat(full_path).st_size > 240000 or os.stat(full_path).st_size < 5:
return True
# exclude binary
with open(full_path, "rb") as f:
is_binary = False
for block in iter(lambda: f.read(1024), b""):
if b"\0" in block:
is_binary = True
break
if is_binary:
return True
try:
# fetch file
data = read_file_with_fallback_encodings(full_path)
lines = data.split("\n")
except UnicodeDecodeError:
logger.warning(f"UnicodeDecodeError in is_file_excluded_aggressive: {full_path}, skipping")
return True
line_count = len(lines)
# if average line length is greater than 200, then it is likely not human readable
if len(data)/line_count > 200:
return True
# check token density, if it is greater than 2, then it is likely not human readable
# token_count = tiktoken_client.count(data)
# if token_count == 0:
# return True
# if len(data)/token_count < 2:
# return True
# now check the file name
parts = file_path.split(os.path.sep)
for part in parts:
if part in self.exclude_dirs or part in self.exclude_exts:
return True
for part in self.exclude_substrings_aggressive:
if part in file_path:
return True
return False
@lru_cache(maxsize=None)
def get_gha_enabled(repo: Repository) -> bool:
try:
contents = repo.get_contents("sweep.yaml")
gha_enabled = yaml.safe_load(contents.decoded_content.decode("utf-8")).get(
"gha_enabled", False
)
return gha_enabled
except Exception:
logger.info(
"Error when getting gha enabled, falling back to False"
)
return False
@lru_cache(maxsize=None)
def get_description(repo: Repository) -> dict:
try:
contents = repo.get_contents("sweep.yaml")
sweep_yaml = yaml.safe_load(contents.decoded_content.decode("utf-8"))
description = sweep_yaml.get("description", "")
rules = sweep_yaml.get("rules", [])
rules = "\n * ".join(rules[:3])
return {"description": description, "rules": rules}
except Exception:
return {"description": "", "rules": ""}
@lru_cache(maxsize=None)
def get_sandbox_config(repo: Repository):
try:
contents = repo.get_contents("sweep.yaml")
description = yaml.safe_load(contents.decoded_content.decode("utf-8")).get(
"sandbox", {}
)
return description
except Exception:
return {}
@lru_cache(maxsize=None)
def get_branch_name_config(repo: Repository):
try:
contents = repo.get_contents("sweep.yaml")
description = yaml.safe_load(contents.decoded_content.decode("utf-8")).get(
"branch_use_underscores", False
)
return description
except Exception:
return False
@lru_cache(maxsize=None)
def get_documentation_dict(repo: Repository):
try:
sweep_yaml_content = repo.get_contents("sweep.yaml").decoded_content.decode(
"utf-8"
)
sweep_yaml = yaml.safe_load(sweep_yaml_content)
docs = sweep_yaml.get("docs", {})
return docs
except Exception:
return {}
@lru_cache(maxsize=None)
def get_blocked_dirs(repo: Repository):
try:
sweep_yaml_content = repo.get_contents("sweep.yaml").decoded_content.decode(
"utf-8"
)
sweep_yaml = yaml.safe_load(sweep_yaml_content)
dirs = sweep_yaml.get("blocked_dirs", [])
return dirs
except Exception:
return []
@lru_cache(maxsize=None)
def get_rules(repo: Repository):
try:
sweep_yaml_content = repo.get_contents("sweep.yaml").decoded_content.decode(
"utf-8"
)
sweep_yaml = yaml.safe_load(sweep_yaml_content)
rules = sweep_yaml.get("rules", [])
return rules
except Exception:
return []
# optional, can leave env var blank
GITHUB_APP_CLIENT_ID = os.environ.get("GITHUB_APP_CLIENT_ID", "Iv1.91fd31586a926a9f")
RESTART_SWEEP_BUTTON = "โ†ป Restart Sweep"
SWEEP_GOOD_FEEDBACK = "๐Ÿ‘ Sweep Did Well"
SWEEP_BAD_FEEDBACK = "๐Ÿ‘Ž Sweep Needs Improvement"
RESET_FILE = "Rollback changes to "
REVERT_CHANGED_FILES_TITLE = "## Rollback Files For Sweep"
RULES_TITLE = (
"## Apply [Sweep Rules](https://docs.sweep.dev/usage/config#rules) to your PR?"
)
RULES_LABEL = "**Apply:** "
DEFAULT_RULES = [
"All new business logic should have corresponding unit tests.",
"Refactor large functions to be more modular.",
"Add docstrings to all functions and file headers.",
]
DEFAULT_RULES_STRING = """\
- "All new business logic should have corresponding unit tests."
- "Refactor large functions to be more modular."

Step 2: โŒจ๏ธ Coding

sweepai/utils/ticket_rendering_utils.py

Modify the logging level for the "no jobs for this run" message in the `get_failing_gha_logs` function.
--- 
+++ 
@@ -1,4 +1,4 @@
         # make sure jobs in valid
         if jobs_response.json()['total_count'] == 0:
-            logger.error(f"no jobs for this run: {run}, continuing...")
+            logger.warning(f"no jobs for this run: {run}, continuing...")
             continue

Step 3: ๐Ÿ”„๏ธ Validating

Your changes have been successfully made to the branch sweep/downgrade_this_to_a_warning. I have validated these changes using a syntax checker and a linter.


Tip

To recreate the pull request, edit the issue title or description.

This is an automated message generated by Sweep AI.