X Tutup
from __future__ import annotations import contextlib import os import shutil import subprocess import tempfile from pathlib import Path from typing import TYPE_CHECKING, TypedDict import questionary from commitizen import factory, git, out from commitizen.cz.exceptions import CzException from commitizen.cz.utils import get_backup_file_path from commitizen.exceptions import ( CommitError, CommitMessageLengthExceededError, CustomError, DryRunExit, NoAnswersError, NoCommitBackupError, NotAGitProjectError, NotAllowed, NothingToCommitError, ) from commitizen.git import smart_open if TYPE_CHECKING: from commitizen.config import BaseConfig class CommitArgs(TypedDict, total=False): all: bool dry_run: bool edit: bool extra_cli_args: str message_length_limit: int no_retry: bool signoff: bool write_message_to_file: Path | None retry: bool class Commit: """Show prompt for the user to create a guided commit.""" def __init__(self, config: BaseConfig, arguments: CommitArgs) -> None: if not git.is_git_project(): raise NotAGitProjectError() self.config: BaseConfig = config self.cz = factory.committer_factory(self.config) self.arguments = arguments self.backup_file_path = get_backup_file_path() def _read_backup_message(self) -> str | None: # Check the commit backup file exists if not self.backup_file_path.is_file(): return None # Read commit message from backup return self.backup_file_path.read_text( encoding=self.config.settings["encoding"] ).strip() def _get_message_by_prompt_commit_questions(self) -> str: # Prompt user for the commit message questions = self.cz.questions() for question in (q for q in questions if q["type"] == "list"): question["use_shortcuts"] = self.config.settings["use_shortcuts"] try: answers = questionary.prompt(questions, style=self.cz.style) except ValueError as err: root_err = err.__context__ if isinstance(root_err, CzException): raise CustomError(str(root_err)) raise err if not answers: raise NoAnswersError() message = self.cz.message(answers) self._validate_subject_length(message) return message def _validate_subject_length(self, message: str) -> None: message_length_limit = self.arguments.get( "message_length_limit", self.config.settings.get("message_length_limit", 0) ) # By the contract, message_length_limit is set to 0 for no limit if ( message_length_limit is None or message_length_limit <= 0 ): # do nothing for no limit return subject = message.partition("\n")[0].strip() if len(subject) > message_length_limit: raise CommitMessageLengthExceededError( f"Length of commit message exceeds limit ({len(subject)}/{message_length_limit}), subject: '{subject}'" ) def manual_edit(self, message: str) -> str: editor = git.get_core_editor() if editor is None: raise RuntimeError("No 'editor' value given and no default available.") exec_path = shutil.which(editor) if exec_path is None: raise RuntimeError(f"Editor '{editor}' not found.") with tempfile.NamedTemporaryFile(mode="w", delete=False) as file: file.write(message) file_path = file.name argv = [exec_path, file_path] subprocess.call(argv) message = Path(file_path).read_text().strip() os.unlink(file.name) return message def _get_message(self) -> str: if self.arguments.get("retry"): commit_message = self._read_backup_message() if commit_message is None: raise NoCommitBackupError() return commit_message if ( self.config.settings.get("retry_after_failure") and not self.arguments.get("no_retry") and (backup_message := self._read_backup_message()) ): return backup_message return self._get_message_by_prompt_commit_questions() def __call__(self) -> None: extra_args = self.arguments.get("extra_cli_args", "") dry_run = bool(self.arguments.get("dry_run")) write_message_to_file = self.arguments.get("write_message_to_file") signoff = bool(self.arguments.get("signoff")) if signoff: out.warn( "Deprecated warning: `cz commit -s` is deprecated and will be removed in v5, please use `cz commit -- -s` instead." ) if self.arguments.get("all"): git.add("-u") if git.is_staging_clean() and not (dry_run or "--allow-empty" in extra_args): raise NothingToCommitError("No files added to staging!") if write_message_to_file is not None and write_message_to_file.is_dir(): raise NotAllowed(f"{write_message_to_file} is a directory") commit_message = self._get_message() if self.arguments.get("edit"): commit_message = self.manual_edit(commit_message) out.info(f"\n{commit_message}\n") if write_message_to_file: with smart_open( write_message_to_file, "w", encoding=self.config.settings["encoding"] ) as file: file.write(commit_message) if dry_run: raise DryRunExit() if self.config.settings["always_signoff"] or signoff: extra_args = f"{extra_args} -s".strip() c = git.commit(commit_message, args=extra_args) if c.return_code != 0: out.error(c.err) # Create commit backup with smart_open( self.backup_file_path, "w", encoding=self.config.settings["encoding"] ) as f: f.write(commit_message) raise CommitError() if any(s in c.out for s in ("nothing added", "no changes added to commit")): out.error(c.out) return with contextlib.suppress(FileNotFoundError): self.backup_file_path.unlink() out.write(c.err) out.write(c.out) out.success("Commit successful!")
X Tutup