Skip to content

API Reference

This section provides detailed API documentation for git-autosquash's core components. The documentation is auto-generated from docstrings to ensure accuracy.

Core Components

GitOps

The central interface for all Git operations with proper error handling.

git_autosquash.git_ops

Git operations module for repository analysis and commands.

Classes
GitOps

Handles git operations for repository analysis and validation.

Source code in src/git_autosquash/git_ops.py
class GitOps:
    """Handles git operations for repository analysis and validation."""

    def __init__(self, repo_path: Optional[Union[str, Path]] = None) -> None:
        """Initialize GitOps with optional repository path.

        Args:
            repo_path: Path to git repository. Defaults to current directory.
        """
        if repo_path is None:
            self.repo_path = Path.cwd()
        else:
            self.repo_path = Path(repo_path)

    def is_git_available(self) -> bool:
        """Check if git is installed and available.

        Returns:
            True if git command is available, False otherwise
        """
        try:
            result = subprocess.run(
                ["git", "--version"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            return result.returncode == 0
        except (subprocess.SubprocessError, FileNotFoundError, OSError):
            return False

    def _run_git_command(
        self, *args: str, preserve_line_endings: bool = False
    ) -> tuple[bool, str]:
        """Run a git command and return success status and output.

        Args:
            *args: Git command arguments
            preserve_line_endings: If True, don't do universal newline conversion.
                                  Use for diff/show output where CRLF must be preserved.

        Returns:
            Tuple of (success, output/error_message)
        """
        try:
            stdout: str
            stderr: str
            returncode: int

            if preserve_line_endings:
                # Use binary mode and decode manually to preserve \r characters
                bin_result = subprocess.run(
                    ["git", *args],
                    cwd=self.repo_path,
                    capture_output=True,
                    check=False,
                )
                stdout = bin_result.stdout.decode("utf-8", errors="replace")
                stderr = bin_result.stderr.decode("utf-8", errors="replace")
                returncode = bin_result.returncode
            else:
                text_result = subprocess.run(
                    ["git", *args],
                    cwd=self.repo_path,
                    capture_output=True,
                    text=True,
                    check=False,
                )
                stdout = text_result.stdout
                stderr = text_result.stderr
                returncode = text_result.returncode

            # For status --porcelain and diff/show commands, preserve trailing whitespace
            # as it's significant (blank context lines in diffs are represented as a single space)
            if (
                len(args) >= 2 and args[0] == "status" and args[1] == "--porcelain"
            ) or (len(args) >= 1 and args[0] in ("show", "diff")):
                output = stdout.rstrip("\n")  # Only remove trailing newlines
            else:
                output = stdout.strip()  # Full strip for other commands

            return (
                returncode == 0,
                output or stderr.strip(),
            )
        except (subprocess.SubprocessError, FileNotFoundError) as e:
            return False, f"Git command failed: {e}"

    def _run_git_command_with_input(
        self, *args: str, input_text: str
    ) -> tuple[bool, str]:
        """Run a git command with input text and return success status and output.

        Args:
            *args: Git command arguments
            input_text: Text to provide as stdin to the command

        Returns:
            Tuple of (success, output/error_message)
        """
        try:
            result = subprocess.run(
                ["git", *args],
                cwd=self.repo_path,
                input=input_text,
                capture_output=True,
                text=True,
                check=False,
            )
            return (
                result.returncode == 0,
                result.stdout.strip() or result.stderr.strip(),
            )
        except (subprocess.SubprocessError, FileNotFoundError) as e:
            return False, f"Git command failed: {e}"

    def is_git_repo(self) -> bool:
        """Check if current directory is inside a git repository.

        Returns:
            True if in a git repository, False otherwise
        """
        success, _ = self._run_git_command("rev-parse", "--git-dir")
        return success

    def get_current_branch(self) -> Optional[str]:
        """Get the current branch name.

        Returns:
            Branch name if on a branch, None if detached HEAD
        """
        success, output = self._run_git_command("symbolic-ref", "--short", "HEAD")
        return output if success else None

    def get_merge_base_with_main(self, current_branch: str) -> Optional[str]:
        """Find merge base with main/master branch.

        Args:
            current_branch: Current branch name

        Returns:
            Commit hash of merge base, or None if not found
        """
        # Try merge-base directly, let git handle missing refs
        for main_branch in ["main", "master"]:
            if main_branch == current_branch:
                continue

            success, output = self._run_git_command(
                "merge-base", main_branch, current_branch
            )
            if success:
                return output

        return None

    def get_working_tree_status(self) -> dict[str, bool]:
        """Get working tree status information.

        Returns:
            Dictionary with status flags: has_staged, has_unstaged, is_clean
        """
        success, output = self._run_git_command("status", "--porcelain")
        if not success:
            return {"has_staged": False, "has_unstaged": False, "is_clean": True}

        lines = output.split("\n") if output else []
        has_staged = any(line and line[0] not in "? " for line in lines)
        has_unstaged = any(line and line[1] not in " " for line in lines)
        is_clean = not lines or all(not line.strip() for line in lines)

        return {
            "has_staged": has_staged,
            "has_unstaged": has_unstaged,
            "is_clean": is_clean,
        }

    def has_commits_since_merge_base(self, merge_base: str) -> bool:
        """Check if there are commits on current branch since merge base.

        Args:
            merge_base: Merge base commit hash

        Returns:
            True if there are commits to work with
        """
        success, output = self._run_git_command(
            "rev-list", "--count", f"{merge_base}..HEAD"
        )
        if not success:
            return False

        try:
            count = int(output)
            return count > 0
        except ValueError:
            return False

    def validate_merge_base(self, base_ref: str) -> tuple[bool, str, Optional[str]]:
        """Validate that a base reference is valid and usable as a merge-base.

        Args:
            base_ref: Git reference (branch name, commit hash, etc.)

        Returns:
            Tuple of (is_valid, error_message, resolved_commit_hash)
            If is_valid is True, error_message will be empty and resolved_commit_hash will be set
            If is_valid is False, error_message will contain the reason and resolved_commit_hash will be None
        """
        # First, check if the ref exists and resolve it to a commit hash
        success, output = self._run_git_command("rev-parse", "--verify", base_ref)
        if not success:
            return False, f"Reference '{base_ref}' does not exist", None

        resolved_hash = output.strip()

        # Check if it's actually a commit (not a tree or blob)
        success, obj_type = self._run_git_command("cat-file", "-t", resolved_hash)
        if not success or obj_type.strip() != "commit":
            return (
                False,
                f"'{base_ref}' is not a commit (type: {obj_type.strip()})",
                None,
            )

        # Check if the base is an ancestor of HEAD
        success, _ = self._run_git_command(
            "merge-base", "--is-ancestor", resolved_hash, "HEAD"
        )
        if not success:
            return (
                False,
                f"'{base_ref}' is not an ancestor of HEAD (not in current branch history)",
                None,
            )

        # Check if there are commits between base and HEAD
        success, output = self._run_git_command(
            "rev-list", "--count", f"{resolved_hash}..HEAD"
        )
        if not success:
            return False, f"Failed to check commits since '{base_ref}'", None

        try:
            count = int(output.strip())
            if count == 0:
                return False, f"No commits between '{base_ref}' and HEAD", None
        except ValueError:
            return False, f"Failed to parse commit count for '{base_ref}'", None

        return True, "", resolved_hash

    def run_git_command(
        self,
        args: list[str],
        env: dict[str, str] | None = None,
        preserve_line_endings: bool = False,
    ) -> subprocess.CompletedProcess[str]:
        """Run a git command and return the complete result.

        Args:
            args: Git command arguments (without 'git')
            env: Optional environment variables
            preserve_line_endings: If True, don't do universal newline conversion.
                                  Use for diff/show output where CRLF must be preserved.

        Returns:
            CompletedProcess with stdout, stderr, and return code
        """
        cmd = ["git"] + args
        try:
            if preserve_line_endings:
                # Use binary mode and decode manually to preserve \r characters
                bin_result = subprocess.run(
                    cmd,
                    cwd=self.repo_path,
                    capture_output=True,
                    env=env,
                    timeout=300,
                )
                # Decode without universal newline conversion
                return subprocess.CompletedProcess(
                    args=bin_result.args,
                    returncode=bin_result.returncode,
                    stdout=bin_result.stdout.decode("utf-8", errors="replace"),
                    stderr=bin_result.stderr.decode("utf-8", errors="replace"),
                )
            else:
                text_result = subprocess.run(
                    cmd,
                    cwd=self.repo_path,
                    capture_output=True,
                    text=True,
                    env=env,
                    timeout=300,  # 5 minute timeout
                )
                return text_result
        except subprocess.TimeoutExpired as e:
            return subprocess.CompletedProcess(
                args=cmd,
                returncode=124,  # timeout exit code
                stdout=e.stdout.decode() if e.stdout else "",
                stderr=f"Command timed out after 300 seconds: {e}",
            )
        except (OSError, PermissionError, FileNotFoundError) as e:
            # File system or permission errors
            return subprocess.CompletedProcess(
                args=cmd,
                returncode=1,
                stdout="",
                stderr=f"System error: {e}",
            )
        except Exception as e:
            # Unexpected errors - wrap for better reporting
            wrapped = handle_unexpected_error(e, f"git command: {' '.join(cmd)}")
            return subprocess.CompletedProcess(
                args=cmd,
                returncode=1,
                stdout="",
                stderr=str(wrapped),
            )

    def run_git_command_with_input(
        self, args: list[str], input_text: str, env: dict[str, str] | None = None
    ) -> subprocess.CompletedProcess[str]:
        """Run a git command with stdin input and return the complete result.

        Args:
            args: Git command arguments (without 'git')
            input_text: Text to provide as stdin to the command
            env: Optional environment variables

        Returns:
            CompletedProcess with stdout, stderr, and return code
        """
        cmd = ["git"] + args
        try:
            result = subprocess.run(
                cmd,
                cwd=self.repo_path,
                input=input_text,
                capture_output=True,
                text=True,
                env=env,
                timeout=300,  # 5 minute timeout
            )
            return result
        except subprocess.TimeoutExpired as e:
            return subprocess.CompletedProcess(
                args=cmd,
                returncode=124,  # timeout exit code
                stdout=e.stdout.decode() if e.stdout else "",
                stderr=f"Command timed out after 300 seconds: {e}",
            )
        except (OSError, PermissionError, FileNotFoundError) as e:
            # File system or permission errors
            return subprocess.CompletedProcess(
                args=cmd,
                returncode=1,
                stdout="",
                stderr=f"System error: {e}",
            )
        except Exception as e:
            # Unexpected errors - wrap for better reporting
            wrapped = handle_unexpected_error(e, f"git command: {' '.join(cmd)}")
            return subprocess.CompletedProcess(
                args=cmd,
                returncode=1,
                stdout="",
                stderr=str(wrapped),
            )
Functions
__init__(repo_path: Optional[Union[str, Path]] = None) -> None

Initialize GitOps with optional repository path.

Parameters:

Name Type Description Default
repo_path Optional[Union[str, Path]]

Path to git repository. Defaults to current directory.

None
Source code in src/git_autosquash/git_ops.py
def __init__(self, repo_path: Optional[Union[str, Path]] = None) -> None:
    """Initialize GitOps with optional repository path.

    Args:
        repo_path: Path to git repository. Defaults to current directory.
    """
    if repo_path is None:
        self.repo_path = Path.cwd()
    else:
        self.repo_path = Path(repo_path)
is_git_available() -> bool

Check if git is installed and available.

Returns:

Type Description
bool

True if git command is available, False otherwise

Source code in src/git_autosquash/git_ops.py
def is_git_available(self) -> bool:
    """Check if git is installed and available.

    Returns:
        True if git command is available, False otherwise
    """
    try:
        result = subprocess.run(
            ["git", "--version"],
            capture_output=True,
            text=True,
            timeout=5,
        )
        return result.returncode == 0
    except (subprocess.SubprocessError, FileNotFoundError, OSError):
        return False
is_git_repo() -> bool

Check if current directory is inside a git repository.

Returns:

Type Description
bool

True if in a git repository, False otherwise

Source code in src/git_autosquash/git_ops.py
def is_git_repo(self) -> bool:
    """Check if current directory is inside a git repository.

    Returns:
        True if in a git repository, False otherwise
    """
    success, _ = self._run_git_command("rev-parse", "--git-dir")
    return success
get_current_branch() -> Optional[str]

Get the current branch name.

Returns:

Type Description
Optional[str]

Branch name if on a branch, None if detached HEAD

Source code in src/git_autosquash/git_ops.py
def get_current_branch(self) -> Optional[str]:
    """Get the current branch name.

    Returns:
        Branch name if on a branch, None if detached HEAD
    """
    success, output = self._run_git_command("symbolic-ref", "--short", "HEAD")
    return output if success else None
get_merge_base_with_main(current_branch: str) -> Optional[str]

Find merge base with main/master branch.

Parameters:

Name Type Description Default
current_branch str

Current branch name

required

Returns:

Type Description
Optional[str]

Commit hash of merge base, or None if not found

Source code in src/git_autosquash/git_ops.py
def get_merge_base_with_main(self, current_branch: str) -> Optional[str]:
    """Find merge base with main/master branch.

    Args:
        current_branch: Current branch name

    Returns:
        Commit hash of merge base, or None if not found
    """
    # Try merge-base directly, let git handle missing refs
    for main_branch in ["main", "master"]:
        if main_branch == current_branch:
            continue

        success, output = self._run_git_command(
            "merge-base", main_branch, current_branch
        )
        if success:
            return output

    return None
get_working_tree_status() -> dict[str, bool]

Get working tree status information.

Returns:

Type Description
dict[str, bool]

Dictionary with status flags: has_staged, has_unstaged, is_clean

Source code in src/git_autosquash/git_ops.py
def get_working_tree_status(self) -> dict[str, bool]:
    """Get working tree status information.

    Returns:
        Dictionary with status flags: has_staged, has_unstaged, is_clean
    """
    success, output = self._run_git_command("status", "--porcelain")
    if not success:
        return {"has_staged": False, "has_unstaged": False, "is_clean": True}

    lines = output.split("\n") if output else []
    has_staged = any(line and line[0] not in "? " for line in lines)
    has_unstaged = any(line and line[1] not in " " for line in lines)
    is_clean = not lines or all(not line.strip() for line in lines)

    return {
        "has_staged": has_staged,
        "has_unstaged": has_unstaged,
        "is_clean": is_clean,
    }
has_commits_since_merge_base(merge_base: str) -> bool

Check if there are commits on current branch since merge base.

Parameters:

Name Type Description Default
merge_base str

Merge base commit hash

required

Returns:

Type Description
bool

True if there are commits to work with

Source code in src/git_autosquash/git_ops.py
def has_commits_since_merge_base(self, merge_base: str) -> bool:
    """Check if there are commits on current branch since merge base.

    Args:
        merge_base: Merge base commit hash

    Returns:
        True if there are commits to work with
    """
    success, output = self._run_git_command(
        "rev-list", "--count", f"{merge_base}..HEAD"
    )
    if not success:
        return False

    try:
        count = int(output)
        return count > 0
    except ValueError:
        return False
validate_merge_base(base_ref: str) -> tuple[bool, str, Optional[str]]

Validate that a base reference is valid and usable as a merge-base.

Parameters:

Name Type Description Default
base_ref str

Git reference (branch name, commit hash, etc.)

required

Returns:

Type Description
bool

Tuple of (is_valid, error_message, resolved_commit_hash)

str

If is_valid is True, error_message will be empty and resolved_commit_hash will be set

Optional[str]

If is_valid is False, error_message will contain the reason and resolved_commit_hash will be None

Source code in src/git_autosquash/git_ops.py
def validate_merge_base(self, base_ref: str) -> tuple[bool, str, Optional[str]]:
    """Validate that a base reference is valid and usable as a merge-base.

    Args:
        base_ref: Git reference (branch name, commit hash, etc.)

    Returns:
        Tuple of (is_valid, error_message, resolved_commit_hash)
        If is_valid is True, error_message will be empty and resolved_commit_hash will be set
        If is_valid is False, error_message will contain the reason and resolved_commit_hash will be None
    """
    # First, check if the ref exists and resolve it to a commit hash
    success, output = self._run_git_command("rev-parse", "--verify", base_ref)
    if not success:
        return False, f"Reference '{base_ref}' does not exist", None

    resolved_hash = output.strip()

    # Check if it's actually a commit (not a tree or blob)
    success, obj_type = self._run_git_command("cat-file", "-t", resolved_hash)
    if not success or obj_type.strip() != "commit":
        return (
            False,
            f"'{base_ref}' is not a commit (type: {obj_type.strip()})",
            None,
        )

    # Check if the base is an ancestor of HEAD
    success, _ = self._run_git_command(
        "merge-base", "--is-ancestor", resolved_hash, "HEAD"
    )
    if not success:
        return (
            False,
            f"'{base_ref}' is not an ancestor of HEAD (not in current branch history)",
            None,
        )

    # Check if there are commits between base and HEAD
    success, output = self._run_git_command(
        "rev-list", "--count", f"{resolved_hash}..HEAD"
    )
    if not success:
        return False, f"Failed to check commits since '{base_ref}'", None

    try:
        count = int(output.strip())
        if count == 0:
            return False, f"No commits between '{base_ref}' and HEAD", None
    except ValueError:
        return False, f"Failed to parse commit count for '{base_ref}'", None

    return True, "", resolved_hash
run_git_command(args: list[str], env: dict[str, str] | None = None, preserve_line_endings: bool = False) -> subprocess.CompletedProcess[str]

Run a git command and return the complete result.

Parameters:

Name Type Description Default
args list[str]

Git command arguments (without 'git')

required
env dict[str, str] | None

Optional environment variables

None
preserve_line_endings bool

If True, don't do universal newline conversion. Use for diff/show output where CRLF must be preserved.

False

Returns:

Type Description
CompletedProcess[str]

CompletedProcess with stdout, stderr, and return code

Source code in src/git_autosquash/git_ops.py
def run_git_command(
    self,
    args: list[str],
    env: dict[str, str] | None = None,
    preserve_line_endings: bool = False,
) -> subprocess.CompletedProcess[str]:
    """Run a git command and return the complete result.

    Args:
        args: Git command arguments (without 'git')
        env: Optional environment variables
        preserve_line_endings: If True, don't do universal newline conversion.
                              Use for diff/show output where CRLF must be preserved.

    Returns:
        CompletedProcess with stdout, stderr, and return code
    """
    cmd = ["git"] + args
    try:
        if preserve_line_endings:
            # Use binary mode and decode manually to preserve \r characters
            bin_result = subprocess.run(
                cmd,
                cwd=self.repo_path,
                capture_output=True,
                env=env,
                timeout=300,
            )
            # Decode without universal newline conversion
            return subprocess.CompletedProcess(
                args=bin_result.args,
                returncode=bin_result.returncode,
                stdout=bin_result.stdout.decode("utf-8", errors="replace"),
                stderr=bin_result.stderr.decode("utf-8", errors="replace"),
            )
        else:
            text_result = subprocess.run(
                cmd,
                cwd=self.repo_path,
                capture_output=True,
                text=True,
                env=env,
                timeout=300,  # 5 minute timeout
            )
            return text_result
    except subprocess.TimeoutExpired as e:
        return subprocess.CompletedProcess(
            args=cmd,
            returncode=124,  # timeout exit code
            stdout=e.stdout.decode() if e.stdout else "",
            stderr=f"Command timed out after 300 seconds: {e}",
        )
    except (OSError, PermissionError, FileNotFoundError) as e:
        # File system or permission errors
        return subprocess.CompletedProcess(
            args=cmd,
            returncode=1,
            stdout="",
            stderr=f"System error: {e}",
        )
    except Exception as e:
        # Unexpected errors - wrap for better reporting
        wrapped = handle_unexpected_error(e, f"git command: {' '.join(cmd)}")
        return subprocess.CompletedProcess(
            args=cmd,
            returncode=1,
            stdout="",
            stderr=str(wrapped),
        )
run_git_command_with_input(args: list[str], input_text: str, env: dict[str, str] | None = None) -> subprocess.CompletedProcess[str]

Run a git command with stdin input and return the complete result.

Parameters:

Name Type Description Default
args list[str]

Git command arguments (without 'git')

required
input_text str

Text to provide as stdin to the command

required
env dict[str, str] | None

Optional environment variables

None

Returns:

Type Description
CompletedProcess[str]

CompletedProcess with stdout, stderr, and return code

Source code in src/git_autosquash/git_ops.py
def run_git_command_with_input(
    self, args: list[str], input_text: str, env: dict[str, str] | None = None
) -> subprocess.CompletedProcess[str]:
    """Run a git command with stdin input and return the complete result.

    Args:
        args: Git command arguments (without 'git')
        input_text: Text to provide as stdin to the command
        env: Optional environment variables

    Returns:
        CompletedProcess with stdout, stderr, and return code
    """
    cmd = ["git"] + args
    try:
        result = subprocess.run(
            cmd,
            cwd=self.repo_path,
            input=input_text,
            capture_output=True,
            text=True,
            env=env,
            timeout=300,  # 5 minute timeout
        )
        return result
    except subprocess.TimeoutExpired as e:
        return subprocess.CompletedProcess(
            args=cmd,
            returncode=124,  # timeout exit code
            stdout=e.stdout.decode() if e.stdout else "",
            stderr=f"Command timed out after 300 seconds: {e}",
        )
    except (OSError, PermissionError, FileNotFoundError) as e:
        # File system or permission errors
        return subprocess.CompletedProcess(
            args=cmd,
            returncode=1,
            stdout="",
            stderr=f"System error: {e}",
        )
    except Exception as e:
        # Unexpected errors - wrap for better reporting
        wrapped = handle_unexpected_error(e, f"git command: {' '.join(cmd)}")
        return subprocess.CompletedProcess(
            args=cmd,
            returncode=1,
            stdout="",
            stderr=str(wrapped),
        )
Functions

HunkParser

Parses Git diff output into structured hunk objects.

git_autosquash.hunk_parser

Diff parsing and hunk splitting module.

Classes
DiffHunk dataclass

Represents a single diff hunk with metadata.

Can represent either a content hunk (line changes) or a file-level operation like file deletion. For file deletions, is_file_deletion will be True and the line number fields (old_start, etc.) will be placeholders (0).

Source code in src/git_autosquash/hunk_parser.py
@dataclass
class DiffHunk:
    """Represents a single diff hunk with metadata.

    Can represent either a content hunk (line changes) or a file-level operation
    like file deletion. For file deletions, is_file_deletion will be True and
    the line number fields (old_start, etc.) will be placeholders (0).
    """

    file_path: str
    old_start: int
    old_count: int
    new_start: int
    new_count: int
    lines: List[str]
    context_before: List[str]
    context_after: List[str]
    # File deletion support
    is_file_deletion: bool = False
    deleted_file_mode: Optional[str] = None
    deleted_file_content: Optional[str] = None

    @property
    def affected_lines(self) -> range:
        """Get the range of lines affected in the new file."""
        return range(self.new_start, self.new_start + self.new_count)

    @property
    def has_additions(self) -> bool:
        """Check if hunk contains added lines."""
        return any(
            line.startswith("+") and not line.startswith("+++") for line in self.lines
        )

    @property
    def has_deletions(self) -> bool:
        """Check if hunk contains deleted lines."""
        return any(
            line.startswith("-") and not line.startswith("---") for line in self.lines
        )
Attributes
affected_lines: range property

Get the range of lines affected in the new file.

has_additions: bool property

Check if hunk contains added lines.

has_deletions: bool property

Check if hunk contains deleted lines.

HunkParser

Parses git diff output into structured hunks.

Source code in src/git_autosquash/hunk_parser.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
class HunkParser:
    """Parses git diff output into structured hunks."""

    def __init__(self, git_ops: GitOps) -> None:
        """Initialize HunkParser with GitOps instance.

        Args:
            git_ops: GitOps instance for running git commands
        """
        self.git_ops = git_ops

    def get_diff_hunks(
        self,
        line_by_line: bool = False,
        from_commit: Optional[str] = None,
        source: str = "auto",
    ) -> List[DiffHunk]:
        """Extract hunks from a commit or working tree.

        Recommended usage: Use from_commit with SourceNormalizer for consistent
        parsing from normalized commits. The source parameter is maintained for
        backward compatibility.

        Args:
            line_by_line: If True, split hunks line-by-line for finer granularity
            from_commit: Commit hash to parse (recommended path, use with SourceNormalizer)
            source: DEPRECATED - What to process. Use from_commit instead.
                   Values: 'auto' (detect based on tree status), 'working-tree',
                   'index', 'head', or a commit SHA

        Returns:
            List of DiffHunk objects representing changes
        """
        if from_commit:
            # New path: parse from normalized commit
            if source != "auto":
                logger.debug(
                    f"Ignoring 'source={source}' parameter because from_commit is provided"
                )

            success, diff_output = self.git_ops._run_git_command(
                "show", "--format=", from_commit, preserve_line_endings=True
            )
            if not success:
                logger.warning(
                    f"Failed to get diff from commit {from_commit}: git command failed"
                )
                return []
            # Pass parent ref for getting deleted file content
            parent_ref = f"{from_commit}~1"
            hunks = self._parse_diff_output(diff_output, parent_ref=parent_ref)
        else:
            # Legacy path: maintain backward compatibility
            # Note: Always emit warning since source parameter is deprecated
            logger.debug(
                "Using deprecated source-based parsing. "
                "Consider using SourceNormalizer with from_commit parameter."
            )
            hunks = self._get_hunks_from_source(source)

        if line_by_line:
            hunks = self._split_hunks_line_by_line(hunks)

        return hunks

    def _get_hunks_from_source(self, source: str) -> List[DiffHunk]:
        """Get hunks from legacy source specification (backward compatibility).

        Args:
            source: Source specification ('auto', 'working-tree', 'index', 'head', or commit SHA)

        Returns:
            List of DiffHunk objects
        """
        if source == "auto":
            # Auto-detect based on working tree status
            status = self.git_ops.get_working_tree_status()

            if status["is_clean"]:
                # Working tree is clean, diff HEAD~1 to get previous commit changes
                success, diff_output = self.git_ops._run_git_command(
                    "show", "--format=", "HEAD", preserve_line_endings=True
                )
            elif status["has_staged"] and not status["has_unstaged"]:
                # Only staged changes, diff them
                success, diff_output = self.git_ops._run_git_command(
                    "diff", "--cached", preserve_line_endings=True
                )
            elif not status["has_staged"] and status["has_unstaged"]:
                # Only unstaged changes, diff them
                success, diff_output = self.git_ops._run_git_command(
                    "diff", preserve_line_endings=True
                )
            else:
                # Both staged and unstaged changes - process only staged changes
                # Unstaged changes will be temporarily stashed by the rebase manager
                success, diff_output = self.git_ops._run_git_command(
                    "diff", "--cached", preserve_line_endings=True
                )
        elif source == "working-tree":
            # Explicitly diff working tree (unstaged changes)
            success, diff_output = self.git_ops._run_git_command(
                "diff", preserve_line_endings=True
            )
        elif source == "index":
            # Explicitly diff staged changes
            success, diff_output = self.git_ops._run_git_command(
                "diff", "--cached", preserve_line_endings=True
            )
        elif source == "head" or source == "HEAD":
            # Diff HEAD commit
            success, diff_output = self.git_ops._run_git_command(
                "show", "--format=", "HEAD", preserve_line_endings=True
            )
        else:
            # Assume it's a commit SHA
            success, diff_output = self.git_ops._run_git_command(
                "show", "--format=", source, preserve_line_endings=True
            )

        if not success:
            return []

        return self._parse_diff_output(diff_output, parent_ref="HEAD")

    def _parse_diff_output(
        self, diff_output: str, parent_ref: str = "HEAD"
    ) -> List[DiffHunk]:
        """Parse git diff output into DiffHunk objects.

        Handles both content hunks (line changes) and file-level operations
        (deletions). For file deletions, creates a synthetic DiffHunk with
        is_file_deletion=True.

        Args:
            diff_output: Raw git diff output
            parent_ref: Git ref to use when retrieving deleted file content

        Returns:
            List of parsed DiffHunk objects (content hunks and file deletions)
        """
        if not diff_output.strip():
            return []

        hunks = []
        lines = diff_output.split("\n")
        current_file = None
        current_file_deleted = False
        current_file_mode = None
        i = 0

        while i < len(lines):
            line = lines[i]

            # Track current file being processed
            if line.startswith("diff --git"):
                # Before processing new file, check if previous file was deleted with no hunks
                if (
                    current_file
                    and current_file_deleted
                    and not self._has_hunks_for_file(hunks, current_file)
                ):
                    # Create synthetic hunk for file deletion
                    deletion_hunk = self._create_file_deletion_hunk(
                        current_file, current_file_mode, parent_ref
                    )
                    hunks.append(deletion_hunk)

                # Extract file path from "diff --git a/path b/path"
                match = re.match(r"diff --git a/(.*) b/(.*)", line)
                if match:
                    current_file = match.group(2)  # Use the new file path
                    current_file_deleted = False
                    current_file_mode = None

            # Detect file deletion
            elif line.startswith("deleted file mode"):
                match = re.match(r"deleted file mode (\d+)", line)
                if match:
                    current_file_deleted = True
                    current_file_mode = match.group(1)
                    logger.debug(
                        f"Detected file deletion: {current_file} (mode {current_file_mode})"
                    )

            # Handle binary file deletion (no content hunks)
            elif line.startswith("Binary files") and "and /dev/null differ" in line:
                # Binary file deletion - already tracked via deleted file mode
                # No content hunks will follow, so synthetic hunk will be created
                logger.debug(f"Binary file deletion detected: {current_file}")

            elif line.startswith("@@") and current_file:
                # Parse hunk header: @@ -old_start,old_count +new_start,new_count @@
                hunk_match = re.match(
                    r"@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@", line
                )
                if hunk_match:
                    old_start = int(hunk_match.group(1))
                    old_count = int(hunk_match.group(2) or 1)
                    new_start = int(hunk_match.group(3))
                    new_count = int(hunk_match.group(4) or 1)

                    # Collect hunk lines
                    hunk_lines = [line]  # Include the @@ line
                    i += 1

                    while (
                        i < len(lines)
                        and not lines[i].startswith("@@")
                        and not lines[i].startswith("diff --git")
                    ):
                        hunk_lines.append(
                            lines[i]
                        )  # Preserve all lines including empty ones
                        i += 1

                    # Create DiffHunk object
                    hunk = DiffHunk(
                        file_path=current_file,
                        old_start=old_start,
                        old_count=old_count,
                        new_start=new_start,
                        new_count=new_count,
                        lines=hunk_lines,
                        context_before=[],
                        context_after=[],
                        # Mark as file deletion if the file was deleted
                        is_file_deletion=current_file_deleted,
                        deleted_file_mode=current_file_mode
                        if current_file_deleted
                        else None,
                    )

                    hunks.append(hunk)
                    continue  # Don't increment i, it was already incremented in the loop

            i += 1

        # Handle last file if it was deleted with no hunks
        if (
            current_file
            and current_file_deleted
            and not self._has_hunks_for_file(hunks, current_file)
        ):
            deletion_hunk = self._create_file_deletion_hunk(
                current_file, current_file_mode, parent_ref
            )
            hunks.append(deletion_hunk)

        return hunks

    def _has_hunks_for_file(self, hunks: List[DiffHunk], file_path: str) -> bool:
        """Check if hunks list already contains hunks for the given file.

        Args:
            hunks: List of hunks to search
            file_path: File path to check

        Returns:
            True if hunks contains at least one hunk for file_path
        """
        return any(hunk.file_path == file_path for hunk in hunks)

    def _create_file_deletion_hunk(
        self, file_path: str, file_mode: Optional[str], parent_ref: str
    ) -> DiffHunk:
        """Create a synthetic hunk for a file deletion.

        This is used when a file is deleted but has no content hunks
        (e.g., empty file deletion).

        Args:
            file_path: Path of deleted file
            file_mode: File mode (e.g., "100644")
            parent_ref: Git ref to use for getting file content

        Returns:
            DiffHunk representing the file deletion
        """
        # Get file content for TUI preview (from parent commit)
        deleted_content = self._get_deleted_file_content(file_path, parent_ref)

        return DiffHunk(
            file_path=file_path,
            old_start=0,
            old_count=0,
            new_start=0,
            new_count=0,
            lines=[],  # No hunk lines for pure file deletion
            context_before=[],
            context_after=[],
            is_file_deletion=True,
            deleted_file_mode=file_mode,
            deleted_file_content=deleted_content,
        )

    def _get_deleted_file_content(
        self, file_path: str, parent_ref: str
    ) -> Optional[str]:
        """Get the last content of a deleted file for preview.

        Limits content to first 1000 lines or 100KB to prevent memory exhaustion
        from large file deletions.

        Args:
            file_path: Path of deleted file
            parent_ref: Git ref to retrieve content from (typically parent commit)

        Returns:
            File content as string (truncated if large), or None if cannot retrieve
        """
        try:
            success, content = self.git_ops._run_git_command(
                "show", f"{parent_ref}:{file_path}"
            )
            if success:
                # Limit to 100KB to prevent memory exhaustion
                # MUST truncate BEFORE any string operations to handle single-line large files
                MAX_CONTENT_SIZE = 100 * 1024  # 100KB
                if len(content) > MAX_CONTENT_SIZE:
                    # Truncate first, then find clean line boundary
                    truncated = content[:MAX_CONTENT_SIZE]
                    # Find last newline for clean cut
                    last_newline = truncated.rfind("\n")
                    if last_newline > 0:
                        truncated = truncated[:last_newline]
                    return (
                        truncated
                        + "\n\n[Content truncated - file too large for preview]"
                    )
                return content
        except Exception as e:
            logger.debug(
                f"Could not retrieve deleted file content for {file_path}: {e}"
            )

        return None

    def _split_hunks_line_by_line(self, hunks: List[DiffHunk]) -> List[DiffHunk]:
        """Split hunks into line-by-line changes for finer granularity.

        Args:
            hunks: List of original hunks to split

        Returns:
            List of line-by-line split hunks
        """
        split_hunks = []

        for hunk in hunks:
            # If hunk is already small (single line change), keep as-is
            change_lines = [
                line for line in hunk.lines[1:] if line.startswith(("+", "-"))
            ]
            if len(change_lines) <= 1:
                split_hunks.append(hunk)
                continue

            # Split into individual line changes
            current_old_line = hunk.old_start
            current_new_line = hunk.new_start

            i = 1  # Skip the @@ header line
            while i < len(hunk.lines):
                line = hunk.lines[i]

                if line.startswith("+"):
                    # Addition: create a single-line hunk with proper line counts
                    header = f"@@ -{current_old_line},0 +{current_new_line},1 @@"
                    new_hunk = DiffHunk(
                        file_path=hunk.file_path,
                        old_start=current_old_line,
                        old_count=0,
                        new_start=current_new_line,
                        new_count=1,
                        lines=[header, line],
                        context_before=[],
                        context_after=[],
                    )
                    split_hunks.append(new_hunk)
                    current_new_line += 1

                elif line.startswith("-"):
                    # Deletion: create a single-line hunk with proper line counts
                    header = f"@@ -{current_old_line},1 +{current_new_line},0 @@"
                    new_hunk = DiffHunk(
                        file_path=hunk.file_path,
                        old_start=current_old_line,
                        old_count=1,
                        new_start=current_new_line,
                        new_count=0,
                        lines=[header, line],
                        context_before=[],
                        context_after=[],
                    )
                    split_hunks.append(new_hunk)
                    current_old_line += 1

                else:
                    # Context line: advance both pointers
                    current_old_line += 1
                    current_new_line += 1

                i += 1

        return split_hunks

    def get_file_content_at_lines(
        self, file_path: str, start_line: int, end_line: int, ref: str = "HEAD"
    ) -> List[str]:
        """Get file content at specific line range for context.

        Args:
            file_path: Path to the file
            start_line: Starting line number (1-based)
            end_line: Ending line number (1-based, inclusive)
            ref: Git ref to use for file content (default: HEAD)

        Returns:
            List of lines from the file, empty list on error
        """
        # Use git show with line range for efficiency on large files
        success, output = self.git_ops._run_git_command("show", f"{ref}:{file_path}")

        if not success:
            return []

        try:
            lines = output.split("\n")
            # Convert to 0-based indexing and ensure bounds
            start_idx = max(0, start_line - 1)
            end_idx = min(len(lines), end_line)

            return lines[start_idx:end_idx]
        except Exception:
            # Handle any parsing errors gracefully
            return []
Functions
__init__(git_ops: GitOps) -> None

Initialize HunkParser with GitOps instance.

Parameters:

Name Type Description Default
git_ops GitOps

GitOps instance for running git commands

required
Source code in src/git_autosquash/hunk_parser.py
def __init__(self, git_ops: GitOps) -> None:
    """Initialize HunkParser with GitOps instance.

    Args:
        git_ops: GitOps instance for running git commands
    """
    self.git_ops = git_ops
get_diff_hunks(line_by_line: bool = False, from_commit: Optional[str] = None, source: str = 'auto') -> List[DiffHunk]

Extract hunks from a commit or working tree.

Recommended usage: Use from_commit with SourceNormalizer for consistent parsing from normalized commits. The source parameter is maintained for backward compatibility.

Parameters:

Name Type Description Default
line_by_line bool

If True, split hunks line-by-line for finer granularity

False
from_commit Optional[str]

Commit hash to parse (recommended path, use with SourceNormalizer)

None
source str

DEPRECATED - What to process. Use from_commit instead. Values: 'auto' (detect based on tree status), 'working-tree', 'index', 'head', or a commit SHA

'auto'

Returns:

Type Description
List[DiffHunk]

List of DiffHunk objects representing changes

Source code in src/git_autosquash/hunk_parser.py
def get_diff_hunks(
    self,
    line_by_line: bool = False,
    from_commit: Optional[str] = None,
    source: str = "auto",
) -> List[DiffHunk]:
    """Extract hunks from a commit or working tree.

    Recommended usage: Use from_commit with SourceNormalizer for consistent
    parsing from normalized commits. The source parameter is maintained for
    backward compatibility.

    Args:
        line_by_line: If True, split hunks line-by-line for finer granularity
        from_commit: Commit hash to parse (recommended path, use with SourceNormalizer)
        source: DEPRECATED - What to process. Use from_commit instead.
               Values: 'auto' (detect based on tree status), 'working-tree',
               'index', 'head', or a commit SHA

    Returns:
        List of DiffHunk objects representing changes
    """
    if from_commit:
        # New path: parse from normalized commit
        if source != "auto":
            logger.debug(
                f"Ignoring 'source={source}' parameter because from_commit is provided"
            )

        success, diff_output = self.git_ops._run_git_command(
            "show", "--format=", from_commit, preserve_line_endings=True
        )
        if not success:
            logger.warning(
                f"Failed to get diff from commit {from_commit}: git command failed"
            )
            return []
        # Pass parent ref for getting deleted file content
        parent_ref = f"{from_commit}~1"
        hunks = self._parse_diff_output(diff_output, parent_ref=parent_ref)
    else:
        # Legacy path: maintain backward compatibility
        # Note: Always emit warning since source parameter is deprecated
        logger.debug(
            "Using deprecated source-based parsing. "
            "Consider using SourceNormalizer with from_commit parameter."
        )
        hunks = self._get_hunks_from_source(source)

    if line_by_line:
        hunks = self._split_hunks_line_by_line(hunks)

    return hunks
get_file_content_at_lines(file_path: str, start_line: int, end_line: int, ref: str = 'HEAD') -> List[str]

Get file content at specific line range for context.

Parameters:

Name Type Description Default
file_path str

Path to the file

required
start_line int

Starting line number (1-based)

required
end_line int

Ending line number (1-based, inclusive)

required
ref str

Git ref to use for file content (default: HEAD)

'HEAD'

Returns:

Type Description
List[str]

List of lines from the file, empty list on error

Source code in src/git_autosquash/hunk_parser.py
def get_file_content_at_lines(
    self, file_path: str, start_line: int, end_line: int, ref: str = "HEAD"
) -> List[str]:
    """Get file content at specific line range for context.

    Args:
        file_path: Path to the file
        start_line: Starting line number (1-based)
        end_line: Ending line number (1-based, inclusive)
        ref: Git ref to use for file content (default: HEAD)

    Returns:
        List of lines from the file, empty list on error
    """
    # Use git show with line range for efficiency on large files
    success, output = self.git_ops._run_git_command("show", f"{ref}:{file_path}")

    if not success:
        return []

    try:
        lines = output.split("\n")
        # Convert to 0-based indexing and ensure bounds
        start_idx = max(0, start_line - 1)
        end_idx = min(len(lines), end_line)

        return lines[start_idx:end_idx]
    except Exception:
        # Handle any parsing errors gracefully
        return []

BlameAnalyzer

Analyzes Git blame information to determine target commits for hunks.

git_autosquash.blame_analyzer

Git blame analysis and target commit resolution.

Classes
TargetingMethod

Bases: Enum

Enum for different targeting methods used to resolve a hunk.

Source code in src/git_autosquash/blame_analyzer.py
class TargetingMethod(Enum):
    """Enum for different targeting methods used to resolve a hunk."""

    BLAME_MATCH = "blame_match"
    CONTEXTUAL_BLAME_MATCH = "contextual_blame_match"  # Found via context lines
    FILE_DELETION = "file_deletion"  # File deletion targeting the addition commit
    FALLBACK_NEW_FILE = "fallback_new_file"
    FALLBACK_EXISTING_FILE = "fallback_existing_file"
    FALLBACK_CONSISTENCY = (
        "fallback_consistency"  # Same target as previous hunk from file
    )
HunkTargetMapping dataclass

Maps a hunk to its target commit for squashing.

Source code in src/git_autosquash/blame_analyzer.py
@dataclass
class HunkTargetMapping:
    """Maps a hunk to its target commit for squashing."""

    hunk: DiffHunk
    target_commit: Optional[str]
    confidence: str  # 'high', 'medium', 'low'
    blame_info: List[BlameInfo]
    targeting_method: TargetingMethod = TargetingMethod.BLAME_MATCH
    fallback_candidates: Optional[List[str]] = (
        None  # List of commit hashes for fallback scenarios
    )
    needs_user_selection: bool = False  # True if user needs to choose from candidates
BlameAnalyzer

Analyzes git blame to determine target commits for hunks.

Source code in src/git_autosquash/blame_analyzer.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
class BlameAnalyzer:
    """Analyzes git blame to determine target commits for hunks."""

    def __init__(
        self, git_ops: GitOps, merge_base: str, blame_ref: str = "HEAD"
    ) -> None:
        """Initialize BlameAnalyzer.

        Args:
            git_ops: GitOps instance for running git commands
            merge_base: Merge base commit hash to limit scope
            blame_ref: Git ref to use for blame operations (default: HEAD)
        """
        self.git_ops = git_ops
        self.merge_base = merge_base
        self.blame_ref = blame_ref
        self.batch_ops = BatchGitOperations(git_ops, merge_base, blame_ref=blame_ref)
        self._branch_commits_cache: Optional[Set[str]] = None
        self._commit_timestamp_cache: Dict[str, int] = {}
        self._file_target_cache: Dict[str, str] = {}  # Track previous targets by file
        self._new_files_cache: Optional[Set[str]] = None
        self._file_line_count_cache: Dict[str, int] = {}  # Cache file line counts

    def analyze_hunks(self, hunks: List[DiffHunk]) -> List[HunkTargetMapping]:
        """Analyze hunks and determine target commits for each.

        Args:
            hunks: List of DiffHunk objects to analyze

        Returns:
            List of HunkTargetMapping objects with target commit information
        """
        mappings = []

        for hunk in hunks:
            mapping = self._analyze_single_hunk(hunk)
            mappings.append(mapping)

        return mappings

    def _analyze_single_hunk(self, hunk: DiffHunk) -> HunkTargetMapping:
        """Analyze a single hunk to determine its target commit.

        Args:
            hunk: DiffHunk to analyze

        Returns:
            HunkTargetMapping with target commit information
        """
        # Handle file deletions specially - target the commit that added the file
        if hunk.is_file_deletion:
            addition_commit = self._find_file_addition_commit(hunk.file_path)
            if addition_commit:
                return HunkTargetMapping(
                    hunk=hunk,
                    target_commit=addition_commit,
                    confidence="high",
                    blame_info=[],
                    targeting_method=TargetingMethod.FILE_DELETION,
                    needs_user_selection=False,
                )
            # If we can't find addition commit, fall back to user selection
            return self._create_fallback_mapping(
                hunk, TargetingMethod.FALLBACK_EXISTING_FILE
            )

        # Check if this is a new file
        if self._is_new_file(hunk.file_path):
            return self._create_fallback_mapping(
                hunk, TargetingMethod.FALLBACK_NEW_FILE
            )

        # Check for previous target from same file (consistency)
        if hunk.file_path in self._file_target_cache:
            previous_target = self._file_target_cache[hunk.file_path]
            return HunkTargetMapping(
                hunk=hunk,
                target_commit=previous_target,
                confidence="medium",
                blame_info=[],
                targeting_method=TargetingMethod.FALLBACK_CONSISTENCY,
                needs_user_selection=False,
            )

        # Try blame-based analysis
        if hunk.has_deletions:
            # Get blame for the old lines being modified/deleted
            blame_info = self._get_blame_for_old_lines(hunk)
        else:
            # Pure addition, look at surrounding context
            blame_info = self._get_blame_for_context(hunk)

        if not blame_info:
            # Try contextual blame scanning as a fallback before giving up
            contextual_result = self._try_contextual_blame_fallback(
                hunk, self._get_branch_commits()
            )
            if contextual_result:
                return contextual_result

            return self._create_fallback_mapping(
                hunk, TargetingMethod.FALLBACK_EXISTING_FILE
            )

        # Filter commits to only those within our branch scope
        branch_commits = self._get_branch_commits()
        relevant_blame = [
            info for info in blame_info if info.commit_hash in branch_commits
        ]

        if not relevant_blame:
            # Try contextual blame scanning as a fallback before giving up
            contextual_result = self._try_contextual_blame_fallback(
                hunk, branch_commits
            )
            if contextual_result:
                return contextual_result

            return self._create_fallback_mapping(
                hunk, TargetingMethod.FALLBACK_EXISTING_FILE, blame_info
            )

        # Group by commit and count occurrences
        commit_counts: Dict[str, int] = {}
        for info in relevant_blame:
            commit_counts[info.commit_hash] = commit_counts.get(info.commit_hash, 0) + 1

        # Find most frequent commit, break ties by recency (requirement: take most recent)
        most_frequent_commit, max_count = max(
            commit_counts.items(),
            key=lambda x: (x[1], self._get_commit_timestamp(x[0])),
        )

        total_lines = len(relevant_blame)
        confidence_ratio = max_count / total_lines

        if confidence_ratio >= 0.8:
            confidence = "high"
        elif confidence_ratio >= 0.5:
            confidence = "medium"
        else:
            confidence = "low"

        # Store successful target for file consistency
        self._file_target_cache[hunk.file_path] = most_frequent_commit

        return HunkTargetMapping(
            hunk=hunk,
            target_commit=most_frequent_commit,
            confidence=confidence,
            blame_info=relevant_blame,
            targeting_method=TargetingMethod.BLAME_MATCH,
            needs_user_selection=False,
        )

    def _get_blame_for_old_lines(self, hunk: DiffHunk) -> List[BlameInfo]:
        """Get blame information for lines being deleted/modified.

        Args:
            hunk: DiffHunk with deletions

        Returns:
            List of BlameInfo objects for the deleted lines
        """
        # Run blame on the file at configured ref (before changes)
        success, blame_output = self.git_ops._run_git_command(
            "blame",
            f"-L{hunk.old_start},{hunk.old_start + hunk.old_count - 1}",
            self.blame_ref,
            "--",
            hunk.file_path,
        )

        if not success:
            return []

        return self._parse_blame_output(blame_output)

    def _get_blame_for_context(self, hunk: DiffHunk) -> List[BlameInfo]:
        """Get blame information for context around an addition.

        Args:
            hunk: DiffHunk with additions

        Returns:
            List of BlameInfo objects for surrounding context
        """
        # For additions, we need to map the new coordinates back to old coordinates
        # The insertion happens at new_start, so we look around old_start
        context_lines = 3

        # For pure additions, old_start is where the insertion point was at blame_ref
        start_line = max(1, hunk.old_start - context_lines)
        end_line = hunk.old_start + context_lines

        success, blame_output = self.git_ops._run_git_command(
            "blame", f"-L{start_line},{end_line}", self.blame_ref, "--", hunk.file_path
        )

        if not success:
            return []

        return self._parse_blame_output(blame_output)

    def _get_contextual_lines_for_hunk(
        self, hunk: DiffHunk, context_lines: int = CONTEXTUAL_BLAME_LINES
    ) -> List[int]:
        """Get meaningful (non-whitespace) line numbers around a hunk.

        Uses OLD coordinates since git blame operates on HEAD (pre-change) state.
        For pure additions, searches around the insertion point in the original file.

        Args:
            hunk: DiffHunk to get context for
            context_lines: Number of lines above/below to consider

        Returns:
            List of line numbers that are meaningful context lines in HEAD
        """
        context_lines = min(context_lines, MAX_CONTEXTUAL_BLAME_LINES)

        # Use OLD coordinates for git blame HEAD compatibility
        if hunk.has_deletions or hunk.old_count > 0:
            # For modifications/deletions, use old coordinates
            base_start = hunk.old_start
            base_count = hunk.old_count
        else:
            # For pure additions, use insertion point in original file
            base_start = hunk.old_start
            base_count = 0  # No lines existed at insertion point

        # Get file line count to respect boundaries
        file_line_count = self._get_file_line_count(hunk.file_path)

        # Calculate context range around the hunk
        if base_count > 0:
            # For modifications/deletions: search around existing lines
            start_line = max(1, base_start - context_lines)
            end_line = min(file_line_count, base_start + base_count + context_lines - 1)
        else:
            # For pure additions: search around insertion point
            start_line = max(1, base_start - context_lines)
            end_line = min(file_line_count, base_start + context_lines)

        # Get all potential context lines
        potential_lines = list(range(start_line, end_line + 1))

        # Filter out whitespace-only lines
        meaningful_lines = self._filter_meaningful_lines(
            hunk.file_path, potential_lines
        )

        return meaningful_lines

    def _try_contextual_blame_fallback(
        self, hunk: DiffHunk, branch_commits: Set[str]
    ) -> Optional[HunkTargetMapping]:
        """Try contextual blame as fallback and return mapping if successful.

        Args:
            hunk: DiffHunk to analyze
            branch_commits: Set of commit hashes within branch scope

        Returns:
            HunkTargetMapping if contextual blame finds targets, None otherwise
        """
        contextual_blame_info = self._get_contextual_blame(hunk)

        if not contextual_blame_info:
            return None

        # Filter contextual blame to branch commits
        contextual_relevant_blame = [
            info for info in contextual_blame_info if info.commit_hash in branch_commits
        ]

        if contextual_relevant_blame:
            # Found contextual matches - analyze them
            return self._create_contextual_mapping(hunk, contextual_relevant_blame)

        return None

    def _get_file_line_count(self, file_path: str) -> int:
        """Get total line count for a file with safe fallback strategies.

        Args:
            file_path: Path to file

        Returns:
            Number of lines in file
        """
        if file_path in self._file_line_count_cache:
            return self._file_line_count_cache[file_path]

        # Try multiple approaches in order of preference
        line_count = self._try_get_line_count_from_head(file_path)
        if line_count is None:
            line_count = self._try_get_line_count_from_working_tree(file_path)
        if line_count is None:
            line_count = self._get_conservative_line_count_from_diff(file_path)

        # Cache and return
        self._file_line_count_cache[file_path] = line_count
        return line_count

    def _try_get_line_count_from_head(self, file_path: str) -> Optional[int]:
        """Try to get line count from blame_ref version of file.

        Args:
            file_path: Path to file

        Returns:
            Line count if successful, None otherwise
        """
        success, output = self.git_ops._run_git_command(
            "show", f"{self.blame_ref}:{file_path}"
        )
        if success and output is not None:
            return len(output.split("\n")) if output else 1
        return None

    def _try_get_line_count_from_working_tree(self, file_path: str) -> Optional[int]:
        """Try to get line count from working tree version of file.

        Args:
            file_path: Path to file

        Returns:
            Line count if successful, None otherwise
        """
        import os

        try:
            full_path = os.path.join(self.git_ops.repo_path, file_path)
            if os.path.exists(full_path):
                with open(full_path, "r", encoding="utf-8", errors="ignore") as f:
                    return sum(1 for _ in f)
        except (OSError, IOError):
            pass
        return None

    def _get_conservative_line_count_from_diff(self, file_path: str) -> int:
        """Get conservative line count based on diff information.

        Args:
            file_path: Path to file

        Returns:
            Conservative estimate of line count
        """
        success, output = self.git_ops._run_git_command(
            "diff", f"{self.merge_base}..HEAD", "--", file_path
        )

        if success and output:
            # Parse diff to find highest line number mentioned
            max_line = 0
            for line in output.split("\n"):
                if line.startswith("@@"):
                    # Parse hunk header: @@ -old_start,old_count +new_start,new_count @@
                    import re

                    match = re.match(
                        r"@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@", line
                    )
                    if match:
                        new_start = int(match.group(3))
                        new_count = int(match.group(4)) if match.group(4) else 1
                        max_line = max(max_line, new_start + new_count - 1)

            if max_line > 0:
                return max_line + 10  # Add buffer for safety

        # Ultra-conservative fallback
        return 50

    def _filter_meaningful_lines(
        self, file_path: str, line_numbers: List[int]
    ) -> List[int]:
        """Filter out whitespace-only lines from a list of line numbers.

        Args:
            file_path: Path to file
            line_numbers: List of line numbers to check

        Returns:
            List of line numbers that contain meaningful content
        """
        if not line_numbers:
            return []

        meaningful_lines = []

        # Get the file content to check line contents
        success, file_content = self.git_ops._run_git_command(
            "show", f"{self.blame_ref}:{file_path}"
        )
        if not success:
            # If we can't read the file, assume all lines are meaningful
            return line_numbers

        lines = file_content.split("\n")

        for line_num in line_numbers:
            if 1 <= line_num <= len(lines):
                line_content = lines[line_num - 1]  # Convert to 0-based indexing
                # Skip empty lines and whitespace-only lines
                if line_content.strip():
                    meaningful_lines.append(line_num)

        return meaningful_lines

    def _get_contextual_blame(self, hunk: DiffHunk) -> List[BlameInfo]:
        """Get blame information for meaningful context lines around a hunk.

        This is used as a fallback when primary blame analysis fails.
        It searches ±1 line (excluding whitespace) for blame information.
        Uses batch operations for improved performance.

        Args:
            hunk: DiffHunk to get contextual blame for

        Returns:
            List of BlameInfo objects from context lines
        """
        context_lines = self._get_contextual_lines_for_hunk(hunk)
        if not context_lines:
            return []

        # Convert individual lines to ranges for batch blame
        line_ranges = [(line_num, line_num) for line_num in context_lines]

        # Use batch blame operation
        batch_blame_info = self.batch_ops.batch_blame_lines(hunk.file_path, line_ranges)

        # Convert batch blame info to regular blame info and expand hashes
        all_blame_info = []
        short_hashes = []

        for line_num in context_lines:
            if line_num in batch_blame_info:
                batch_info = batch_blame_info[line_num]
                short_hashes.append(batch_info.commit_hash)

        # Batch expand hashes
        expanded_hashes = (
            self.batch_ops.batch_expand_hashes(short_hashes) if short_hashes else {}
        )

        # Create BlameInfo objects with expanded hashes
        for line_num in context_lines:
            if line_num in batch_blame_info:
                batch_info = batch_blame_info[line_num]
                expanded_hash = expanded_hashes.get(
                    batch_info.commit_hash, batch_info.commit_hash
                )

                blame_info = BlameInfo(
                    commit_hash=expanded_hash,
                    author=batch_info.author,
                    timestamp=batch_info.timestamp,
                    line_number=batch_info.line_number,
                    line_content=batch_info.line_content,
                )
                all_blame_info.append(blame_info)

        return all_blame_info

    def _get_blame_for_single_line(
        self, file_path: str, line_num: int
    ) -> List[BlameInfo]:
        """Get blame information for a single line.

        Args:
            file_path: Path to file
            line_num: Line number to get blame for

        Returns:
            List of BlameInfo objects (usually just one)
        """
        success, blame_output = self.git_ops._run_git_command(
            "blame", f"-L{line_num},{line_num}", self.blame_ref, "--", file_path
        )

        if not success:
            return []

        return self._parse_blame_output(blame_output)

    def _create_contextual_mapping(
        self, hunk: DiffHunk, contextual_blame: List[BlameInfo]
    ) -> HunkTargetMapping:
        """Create a mapping from contextual blame analysis.

        Args:
            hunk: DiffHunk that was analyzed
            contextual_blame: List of BlameInfo from context lines

        Returns:
            HunkTargetMapping with contextual target commit
        """
        # Group by commit and count occurrences (same logic as primary blame)
        commit_counts: Dict[str, int] = {}
        for info in contextual_blame:
            commit_counts[info.commit_hash] = commit_counts.get(info.commit_hash, 0) + 1

        # Find most frequent commit, break ties by recency
        most_frequent_commit, max_count = max(
            commit_counts.items(),
            key=lambda x: (x[1], self._get_commit_timestamp(x[0])),
        )

        total_lines = len(contextual_blame)
        confidence_ratio = max_count / total_lines

        # Contextual matches have slightly lower confidence than direct matches
        if confidence_ratio >= 0.8:
            confidence = "medium"  # Reduced from "high"
        elif confidence_ratio >= 0.5:
            confidence = "medium"
        else:
            confidence = "low"

        # Store successful target for file consistency
        self._file_target_cache[hunk.file_path] = most_frequent_commit

        return HunkTargetMapping(
            hunk=hunk,
            target_commit=most_frequent_commit,
            confidence=confidence,
            blame_info=contextual_blame,
            targeting_method=TargetingMethod.CONTEXTUAL_BLAME_MATCH,
            needs_user_selection=False,
        )

    def _parse_blame_output(self, blame_output: str) -> List[BlameInfo]:
        """Parse git blame output into BlameInfo objects with batch hash expansion.

        Args:
            blame_output: Raw git blame output

        Returns:
            List of parsed BlameInfo objects with expanded commit hashes
        """
        # First pass: collect all blame entries with short hashes
        raw_blame_infos = []
        short_hashes = []

        for line in blame_output.split("\n"):
            if not line.strip():
                continue

            # Parse blame line format:
            # commit_hash (author timestamp line_num) line_content
            match = re.match(
                r"^([a-f0-9]+)\s+\(([^)]+)\s+(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} [+-]\d{4})\s+(\d+)\)\s*(.*)",
                line,
            )
            if match:
                commit_hash = match.group(1)
                author = match.group(2).strip()
                timestamp = match.group(3)
                line_number = int(match.group(4))
                line_content = match.group(5)

                raw_blame_infos.append(
                    {
                        "commit_hash": commit_hash,
                        "author": author,
                        "timestamp": timestamp,
                        "line_number": line_number,
                        "line_content": line_content,
                    }
                )

                if len(commit_hash) < 40:  # Short hash
                    short_hashes.append(commit_hash)

        # Batch expand short hashes
        expanded_hashes = (
            self.batch_ops.batch_expand_hashes(short_hashes) if short_hashes else {}
        )

        # Second pass: create BlameInfo objects with expanded hashes
        blame_infos = []
        for raw_info in raw_blame_infos:
            commit_hash = str(raw_info["commit_hash"])
            full_commit_hash = expanded_hashes.get(commit_hash, commit_hash)

            blame_info = BlameInfo(
                commit_hash=full_commit_hash,
                author=str(raw_info["author"]),
                timestamp=str(raw_info["timestamp"]),
                line_number=int(raw_info["line_number"]),
                line_content=str(raw_info["line_content"]),
            )
            blame_infos.append(blame_info)

        return blame_infos

    def _get_branch_commits(self) -> Set[str]:
        """Get all commits on current branch since merge base.

        Returns:
            Set of commit hashes within branch scope
        """
        if self._branch_commits_cache is not None:
            return self._branch_commits_cache

        branch_commits = self.batch_ops.get_branch_commits()
        self._branch_commits_cache = set(branch_commits)
        return self._branch_commits_cache

    def _get_commit_timestamp(self, commit_hash: str) -> int:
        """Get timestamp of a commit for recency comparison.

        Args:
            commit_hash: Commit hash to get timestamp for

        Returns:
            Unix timestamp of the commit
        """
        # Use batch operations for better performance
        commit_info = self.batch_ops.batch_load_commit_info([commit_hash])
        if commit_hash in commit_info:
            return commit_info[commit_hash].timestamp
        return 0

    def get_commit_summary(self, commit_hash: str) -> str:
        """Get a short summary of a commit for display.

        Args:
            commit_hash: Commit hash to summarize

        Returns:
            Short commit summary (hash + subject)
        """
        commit_info = self.batch_ops.batch_load_commit_info([commit_hash])
        if commit_hash in commit_info:
            info = commit_info[commit_hash]
            return f"{info.short_hash} {info.subject}"
        return commit_hash[:8]

    def _is_new_file(self, file_path: str) -> bool:
        """Check if a file is new (didn't exist at merge-base).

        Args:
            file_path: Path to check

        Returns:
            True if file is new, False if it existed at merge-base
        """
        if self._new_files_cache is None:
            self._new_files_cache = self.batch_ops.get_new_files()

        return file_path in self._new_files_cache

    def _find_file_addition_commit(self, file_path: str) -> Optional[str]:
        """Find the commit that added a file.

        Uses git log --follow --diff-filter=A to find when the file was first added.
        Only returns commits within the branch scope (between merge_base and blame_ref).

        Args:
            file_path: Path to the file

        Returns:
            Commit hash that added the file, or None if not found
        """
        logger.debug(f"Finding addition commit for {file_path}")

        # Use --follow to track renames, --diff-filter=A to find additions only
        # Limit search to commits between merge_base and blame_ref
        success, output = self.git_ops._run_git_command(
            "log",
            "--follow",
            "--diff-filter=A",
            "--format=%H",
            f"{self.merge_base}..{self.blame_ref}",
            "--",
            file_path,
        )

        if not success:
            logger.debug(f"git log failed for {file_path}")
            return None

        if not output:
            logger.debug(
                f"No addition commit found for {file_path} in range "
                f"{self.merge_base[:8]}..{self.blame_ref[:8] if isinstance(self.blame_ref, str) else self.blame_ref}"
            )
            return None

        # git log returns commits in reverse chronological order
        # The last line is the oldest commit = the one that added the file
        commits = output.strip().split("\n")
        if commits and commits[-1]:
            addition_commit = commits[-1]
            logger.debug(f"Found addition commit {addition_commit[:8]} for {file_path}")
            return addition_commit

        logger.debug(f"No valid commits in output for {file_path}")
        return None

    def _create_fallback_mapping(
        self,
        hunk: DiffHunk,
        method: TargetingMethod,
        blame_info: Optional[List[BlameInfo]] = None,
    ) -> HunkTargetMapping:
        """Create a fallback mapping that needs user selection.

        Args:
            hunk: DiffHunk to create mapping for
            method: Fallback method used
            blame_info: Optional blame info if available

        Returns:
            HunkTargetMapping with fallback candidates
        """
        candidates = self._get_fallback_candidates(hunk.file_path, method)

        return HunkTargetMapping(
            hunk=hunk,
            target_commit=None,
            confidence="low",
            blame_info=blame_info or [],
            targeting_method=method,
            fallback_candidates=candidates,
            needs_user_selection=True,
        )

    def _get_fallback_candidates(
        self, file_path: str, method: TargetingMethod
    ) -> List[str]:
        """Get prioritized list of candidate commits for fallback scenarios.

        Args:
            file_path: Path of the file being processed
            method: Fallback method to determine candidate ordering

        Returns:
            List of commit hashes ordered by priority
        """
        branch_commits = self._get_ordered_branch_commits()

        if method == TargetingMethod.FALLBACK_NEW_FILE:
            # For new files, just return recent commits first, merges last
            return branch_commits

        elif method == TargetingMethod.FALLBACK_EXISTING_FILE:
            # For existing files, prioritize commits that touched this file
            file_commits = self._get_commits_touching_file(file_path)
            other_commits = [c for c in branch_commits if c not in file_commits]
            return file_commits + other_commits

        return branch_commits

    def _get_ordered_branch_commits(self) -> List[str]:
        """Get branch commits ordered by recency, with merge commits last.

        Returns:
            List of commit hashes ordered by priority
        """
        branch_commits = list(self._get_branch_commits())
        if not branch_commits:
            return []

        # Use batch operations to get ordered commits
        ordered_commits = self.batch_ops.get_ordered_commits_by_recency(branch_commits)
        return [commit.commit_hash for commit in ordered_commits]

    def _get_commits_touching_file(self, file_path: str) -> List[str]:
        """Get commits that modified a specific file, ordered by recency.

        Args:
            file_path: Path to check for modifications

        Returns:
            List of commit hashes that touched the file
        """
        return self.batch_ops.get_commits_touching_file(file_path)

    def _is_merge_commit(self, commit_hash: str) -> bool:
        """Check if a commit is a merge commit.

        Args:
            commit_hash: Commit to check

        Returns:
            True if commit is a merge commit
        """
        commit_info = self.batch_ops.batch_load_commit_info([commit_hash])
        if commit_hash in commit_info:
            return commit_info[commit_hash].is_merge
        return False

    def set_target_for_file(self, file_path: str, target_commit: str) -> None:
        """Set target commit for a file to ensure consistency.

        Args:
            file_path: File path
            target_commit: Commit hash to use as target
        """
        self._file_target_cache[file_path] = target_commit

    def clear_file_cache(self) -> None:
        """Clear the file target cache for a fresh analysis."""
        self._file_target_cache.clear()
        self.batch_ops.clear_caches()
        self._branch_commits_cache = None
        self._new_files_cache = None
        self._file_line_count_cache.clear()
Functions
__init__(git_ops: GitOps, merge_base: str, blame_ref: str = 'HEAD') -> None

Initialize BlameAnalyzer.

Parameters:

Name Type Description Default
git_ops GitOps

GitOps instance for running git commands

required
merge_base str

Merge base commit hash to limit scope

required
blame_ref str

Git ref to use for blame operations (default: HEAD)

'HEAD'
Source code in src/git_autosquash/blame_analyzer.py
def __init__(
    self, git_ops: GitOps, merge_base: str, blame_ref: str = "HEAD"
) -> None:
    """Initialize BlameAnalyzer.

    Args:
        git_ops: GitOps instance for running git commands
        merge_base: Merge base commit hash to limit scope
        blame_ref: Git ref to use for blame operations (default: HEAD)
    """
    self.git_ops = git_ops
    self.merge_base = merge_base
    self.blame_ref = blame_ref
    self.batch_ops = BatchGitOperations(git_ops, merge_base, blame_ref=blame_ref)
    self._branch_commits_cache: Optional[Set[str]] = None
    self._commit_timestamp_cache: Dict[str, int] = {}
    self._file_target_cache: Dict[str, str] = {}  # Track previous targets by file
    self._new_files_cache: Optional[Set[str]] = None
    self._file_line_count_cache: Dict[str, int] = {}  # Cache file line counts
analyze_hunks(hunks: List[DiffHunk]) -> List[HunkTargetMapping]

Analyze hunks and determine target commits for each.

Parameters:

Name Type Description Default
hunks List[DiffHunk]

List of DiffHunk objects to analyze

required

Returns:

Type Description
List[HunkTargetMapping]

List of HunkTargetMapping objects with target commit information

Source code in src/git_autosquash/blame_analyzer.py
def analyze_hunks(self, hunks: List[DiffHunk]) -> List[HunkTargetMapping]:
    """Analyze hunks and determine target commits for each.

    Args:
        hunks: List of DiffHunk objects to analyze

    Returns:
        List of HunkTargetMapping objects with target commit information
    """
    mappings = []

    for hunk in hunks:
        mapping = self._analyze_single_hunk(hunk)
        mappings.append(mapping)

    return mappings
get_commit_summary(commit_hash: str) -> str

Get a short summary of a commit for display.

Parameters:

Name Type Description Default
commit_hash str

Commit hash to summarize

required

Returns:

Type Description
str

Short commit summary (hash + subject)

Source code in src/git_autosquash/blame_analyzer.py
def get_commit_summary(self, commit_hash: str) -> str:
    """Get a short summary of a commit for display.

    Args:
        commit_hash: Commit hash to summarize

    Returns:
        Short commit summary (hash + subject)
    """
    commit_info = self.batch_ops.batch_load_commit_info([commit_hash])
    if commit_hash in commit_info:
        info = commit_info[commit_hash]
        return f"{info.short_hash} {info.subject}"
    return commit_hash[:8]
set_target_for_file(file_path: str, target_commit: str) -> None

Set target commit for a file to ensure consistency.

Parameters:

Name Type Description Default
file_path str

File path

required
target_commit str

Commit hash to use as target

required
Source code in src/git_autosquash/blame_analyzer.py
def set_target_for_file(self, file_path: str, target_commit: str) -> None:
    """Set target commit for a file to ensure consistency.

    Args:
        file_path: File path
        target_commit: Commit hash to use as target
    """
    self._file_target_cache[file_path] = target_commit
clear_file_cache() -> None

Clear the file target cache for a fresh analysis.

Source code in src/git_autosquash/blame_analyzer.py
def clear_file_cache(self) -> None:
    """Clear the file target cache for a fresh analysis."""
    self._file_target_cache.clear()
    self.batch_ops.clear_caches()
    self._branch_commits_cache = None
    self._new_files_cache = None
    self._file_line_count_cache.clear()

RebaseManager

Orchestrates interactive rebase operations to apply approved hunks.

git_autosquash.rebase_manager

Interactive rebase manager for applying hunk mappings to historical commits.

Classes
RebaseConflictError

Bases: Exception

Raised when rebase encounters conflicts that need user resolution.

Source code in src/git_autosquash/rebase_manager.py
class RebaseConflictError(Exception):
    """Raised when rebase encounters conflicts that need user resolution."""

    def __init__(self, message: str, conflicted_files: List[str]) -> None:
        """Initialize conflict error.

        Args:
            message: Error message
            conflicted_files: List of files with conflicts
        """
        super().__init__(message)
        self.conflicted_files = conflicted_files
Functions
__init__(message: str, conflicted_files: List[str]) -> None

Initialize conflict error.

Parameters:

Name Type Description Default
message str

Error message

required
conflicted_files List[str]

List of files with conflicts

required
Source code in src/git_autosquash/rebase_manager.py
def __init__(self, message: str, conflicted_files: List[str]) -> None:
    """Initialize conflict error.

    Args:
        message: Error message
        conflicted_files: List of files with conflicts
    """
    super().__init__(message)
    self.conflicted_files = conflicted_files
RebaseManager

Manages interactive rebase operations for squashing hunks to commits.

Source code in src/git_autosquash/rebase_manager.py
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
class RebaseManager:
    """Manages interactive rebase operations for squashing hunks to commits."""

    def __init__(self, git_ops: GitOps, merge_base: str) -> None:
        """Initialize rebase manager.

        Args:
            git_ops: Git operations handler
            merge_base: Merge base commit hash
        """
        self.git_ops = git_ops
        self.merge_base = merge_base
        self._stash_ref: Optional[str] = None
        self._original_branch: Optional[str] = None
        self._batch_ops: Optional[BatchGitOperations] = None
        self._context: Optional[SquashContext] = None

    def execute_squash(
        self,
        mappings: List[HunkTargetMapping],
        ignored_mappings: List[HunkTargetMapping],
        context: SquashContext,
    ) -> bool:
        """Execute the squash operation for approved mappings.

        Args:
            mappings: List of approved hunk to commit mappings
            ignored_mappings: List of ignored hunk to commit mappings
            context: SquashContext for --source commits and blame configuration

        Returns:
            True if successful, False if user aborted

        Raises:
            RebaseConflictError: If conflicts occur during rebase
            subprocess.SubprocessError: If git operations fail
        """
        if not mappings and not ignored_mappings:
            return True

        self._context = context
        self._ignored_mappings = ignored_mappings

        # Store original branch for cleanup
        self._original_branch = self.git_ops.get_current_branch()
        if not self._original_branch:
            raise ValueError("Cannot determine current branch")

        try:
            # Group hunks by target commit
            commit_hunks = self._group_hunks_by_commit(mappings)

            # Build mapping from hunk to source_commit_sha for cherry-pick
            hunk_to_source_commit = {}
            for mapping in mappings:
                if mapping.source_commit_sha:
                    # Use hunk as key (need to find it in commit_hunks)
                    hunk_to_source_commit[id(mapping.hunk)] = mapping.source_commit_sha

            # Check working tree state and handle stashing if needed
            self._handle_working_tree_state()

            # Execute rebase for each target commit
            target_commits = self._get_commit_order(set(commit_hunks.keys()))
            logger.debug(
                f"Processing {len(target_commits)} target commits in order: {[c[:8] for c in target_commits]}"
            )

            for target_commit in target_commits:
                hunks = commit_hunks[target_commit]
                # Get source commit SHAs for these hunks
                source_commits = [hunk_to_source_commit.get(id(hunk)) for hunk in hunks]
                logger.debug(
                    f"Processing target commit {target_commit[:8]} with {len(hunks)} hunks"
                )
                success = self._apply_hunks_to_commit(
                    target_commit, hunks, source_commits
                )
                if not success:
                    logger.debug(f"Failed to apply hunks to commit {target_commit[:8]}")
                    return False
                logger.debug(
                    f"Successfully applied hunks to commit {target_commit[:8]}"
                )
                logger.debug("=" * 80)

            # Handle source commit if it has ignored hunks
            if hasattr(self, "_source_needs_edit") and self._ignored_mappings:
                logger.debug(
                    f"Processing source commit with {len(self._ignored_mappings)} ignored hunks"
                )
                success = self._process_source_with_ignored_hunks(
                    self._source_needs_edit, commit_hunks
                )
                if not success:
                    logger.debug(
                        f"Failed to process source commit {self._source_needs_edit[:8]}"
                    )
                    return False
                logger.debug(
                    f"Successfully processed source commit {self._source_needs_edit[:8]}"
                )
                logger.debug("=" * 80)

            # Restore stash if we created one (success path)
            if self._stash_ref:
                try:
                    success = self._restore_stash_by_sha(self._stash_ref)
                    if not success:
                        logger.error(f"Failed to restore stash: {self._stash_ref[:12]}")
                        logger.info(
                            f"Manual restore: git stash apply {self._stash_ref[:12]}"
                        )
                except Exception as e:
                    logger.error(f"Error restoring stash: {e}")
                    logger.info(
                        f"Manual restore: git stash apply {self._stash_ref[:12]}"
                    )
                finally:
                    self._stash_ref = None

            return True

        except RebaseConflictError:
            # Don't cleanup on rebase conflicts - let user resolve manually
            raise
        except Exception:
            # Cleanup on any other error
            self._cleanup_on_error()
            raise

    def _group_hunks_by_commit(
        self, mappings: List[HunkTargetMapping]
    ) -> Dict[str, List[DiffHunk]]:
        """Group hunks by their target commit.

        Args:
            mappings: List of hunk to commit mappings

        Returns:
            Dictionary mapping commit hash to list of hunks
        """
        commit_hunks: Dict[str, List[DiffHunk]] = {}

        for mapping in mappings:
            if mapping.target_commit:
                commit_hash = mapping.target_commit
                if commit_hash not in commit_hunks:
                    commit_hunks[commit_hash] = []
                commit_hunks[commit_hash].append(mapping.hunk)

        return commit_hunks

    def _get_commit_order(self, commit_hashes: Set[str]) -> List[str]:
        """Get commits in git topological order (newest first).

        Args:
            commit_hashes: Set of commit hashes to order

        Returns:
            List of commit hashes in git topological order (newest first)
        """
        # Lazy initialize batch operations
        if self._batch_ops is None:
            self._batch_ops = BatchGitOperations(self.git_ops, self.merge_base)

        # Get all branch commits in chronological order (oldest first)
        all_branch_commits = self._batch_ops.get_branch_commits()

        # Filter to only the commits we need, keeping chronological order
        ordered_commits = []
        for commit_hash in all_branch_commits:
            if commit_hash in commit_hashes:
                ordered_commits.append(commit_hash)

        # Handle any commits not found in branch (shouldn't happen, but be safe)
        missing_commits = commit_hashes - set(ordered_commits)
        if missing_commits:
            ordered_commits.extend(sorted(missing_commits))

        return ordered_commits

    def _handle_working_tree_state(self) -> None:
        """Handle working tree state before rebase."""
        status = self.git_ops.get_working_tree_status()

        # Validate status data
        if not isinstance(status, dict):
            raise ValueError("Invalid working tree status format")

        operation_type = None
        message = None
        stash_sha = None

        if status.get("has_staged", False) and status.get("has_unstaged", False):
            # Mixed changes: stash only unstaged changes, keep staged changes in index
            operation_type = "mixed"
            message = "git-autosquash: temporary stash of unstaged changes"

            # Use --keep-index to stash only unstaged changes
            stash_sha = self._create_stash_with_options(message, ["--keep-index"])

        elif status.get("has_staged", False) and not status.get("has_unstaged", False):
            # Staged changes only: must stash before rebase
            operation_type = "staged_only"
            message = "git-autosquash: temporary stash of staged changes"

            # Use --staged to stash only staged changes
            stash_sha = self._create_stash_with_options(message, ["--staged"])

        elif not status.get("has_staged", False) and status.get("has_unstaged", False):
            # Unstaged changes only: must stash before rebase
            operation_type = "unstaged_only"
            message = "git-autosquash: temporary stash of unstaged changes"

            # Stash all working tree changes
            stash_sha = self._create_and_store_stash(message)

        else:
            # Clean working tree, nothing to stash
            logger.debug("Working tree is clean, no stashing needed")
            return

        if stash_sha:
            self._stash_ref = stash_sha
            logger.info(f"Working tree prepared. Stash SHA: {stash_sha[:8]}")
        elif operation_type is not None:
            # Stash returned None but operation_type was set
            # This means status reported changes but there were no actual changes to stash
            # This can happen when processing historical commits with --source
            logger.debug(
                f"No actual changes to stash despite status indicating {operation_type} changes"
            )
            # Continue without stashing - working tree is effectively clean

    def _create_and_store_stash(self, message: str) -> Optional[str]:
        """Create a stash and return its SHA reference.

        Uses git stash create + store to get a reliable SHA reference
        instead of assuming stash@{0}.

        Args:
            message: Description for the stash

        Returns:
            SHA of created stash, or None if failed or no changes
        """
        # Step 1: Create stash object without modifying stash list
        # This returns a SHA that uniquely identifies the stash
        create_result = self.git_ops.run_git_command(["stash", "create", message])

        if create_result.returncode != 0:
            logger.error(f"Failed to create stash: {create_result.stderr}")
            return None

        stash_sha = create_result.stdout.strip()
        if not stash_sha:
            # No changes to stash (working tree might be clean)
            logger.info("No changes to stash")
            return None

        # Step 2: Store the stash object in the stash list
        # This makes it visible in 'git stash list'
        store_result = self.git_ops.run_git_command(
            ["stash", "store", "-m", message, stash_sha]
        )

        if store_result.returncode != 0:
            logger.error(f"Failed to store stash {stash_sha}: {store_result.stderr}")
            # The stash object exists but isn't in the list
            # We can still use it by SHA
            logger.warning(f"Stash created but not stored in list. SHA: {stash_sha}")

        logger.info(f"Created stash with SHA: {stash_sha}")
        return stash_sha

    def _create_stash_with_options(
        self, message: str, options: List[str]
    ) -> Optional[str]:
        """Create stash with specific options and return SHA.

        Args:
            message: Stash message
            options: List of git stash options (e.g., ['--keep-index'])

        Returns:
            SHA of created stash, or None if failed
        """
        # For options that require git stash push, we need to use a different approach
        # since git stash create doesn't support --staged or --keep-index

        # First, check if this is a standard creation without special options
        if not options or options == []:
            return self._create_and_store_stash(message)

        # For --staged: we need to temporarily manipulate the working tree
        if "--staged" in options:
            return self._create_staged_only_stash(message)

        # For --keep-index: we need to stash everything then restore index
        if "--keep-index" in options:
            return self._create_keep_index_stash(message)

        # For other unsupported options, use atomic stash create approach
        # This avoids race conditions but may not support all options perfectly
        logger.warning(
            f"Using generic atomic stash for options {options}. "
            f"Some options may not be fully supported."
        )

        # Save current HEAD for reference
        head_result = self.git_ops.run_git_command(["rev-parse", "HEAD"])
        if head_result.returncode != 0:
            logger.error("Failed to get HEAD reference")
            return None
        head_sha = head_result.stdout.strip()

        # Try to create a stash using the atomic approach
        # First, create a temporary commit with all changes
        result = self.git_ops.run_git_command(["add", "-A"])
        if result.returncode != 0:
            logger.error(f"Failed to stage all changes: {result.stderr}")
            return None

        # Create temporary commit
        commit_result = self.git_ops.run_git_command(
            ["commit", "--no-verify", "-m", f"TEMP: {message}"]
        )
        if commit_result.returncode != 0:
            # No changes to stash
            logger.info("No changes to stash")
            return None

        # Get the commit SHA
        temp_commit_result = self.git_ops.run_git_command(["rev-parse", "HEAD"])
        if temp_commit_result.returncode != 0:
            logger.error("Failed to get temporary commit SHA")
            # Reset to original state
            self.git_ops.run_git_command(["reset", "--hard", head_sha])
            return None
        # temp_commit_sha would be used here if we needed it for recovery
        # Currently we just use it to confirm the commit succeeded
        _ = temp_commit_result.stdout.strip()

        # Reset back to original HEAD but keep changes
        reset_result = self.git_ops.run_git_command(["reset", "--mixed", head_sha])
        if reset_result.returncode != 0:
            logger.error(
                f"Failed to reset after temporary commit: {reset_result.stderr}"
            )
            self.git_ops.run_git_command(["reset", "--hard", head_sha])
            return None

        # Now create stash from the temporary commit
        stash_create_result = self.git_ops.run_git_command(["stash", "create", message])
        if (
            stash_create_result.returncode != 0
            or not stash_create_result.stdout.strip()
        ):
            logger.error("Failed to create stash from changes")
            return None

        stash_sha = stash_create_result.stdout.strip()

        # Store the stash
        store_result = self.git_ops.run_git_command(
            ["stash", "store", "-m", message, stash_sha]
        )
        if store_result.returncode != 0:
            logger.warning(f"Failed to store stash in list: {store_result.stderr}")
            # Stash object exists but not in list - still usable

        logger.info(f"Created atomic stash with SHA: {stash_sha[:12]}")
        return stash_sha

    def _create_staged_only_stash(self, message: str) -> Optional[str]:
        """Create a stash containing only staged changes.

        Uses git stash create after manipulating the working tree to isolate staged changes.

        Args:
            message: Stash message

        Returns:
            SHA of created stash, or None if failed
        """
        # Strategy: git stash create doesn't support --staged, so we simulate it
        # 1. Create a temporary commit with staged changes
        # 2. Use git stash create to capture the difference
        # 3. Reset the temporary commit

        # Check if there are staged changes
        status_result = self.git_ops.run_git_command(["diff", "--cached", "--quiet"])
        if status_result.returncode == 0:
            logger.info("No staged changes to stash")
            return None

        # Create a temporary tree object from the index
        tree_result = self.git_ops.run_git_command(["write-tree"])
        if tree_result.returncode != 0:
            logger.error(f"Failed to write tree: {tree_result.stderr}")
            return None

        tree_sha = tree_result.stdout.strip()

        # Create stash commit object
        commit_result = self.git_ops.run_git_command(
            ["commit-tree", tree_sha, "-p", "HEAD", "-m", message]
        )
        if commit_result.returncode != 0:
            logger.error(f"Failed to create commit tree: {commit_result.stderr}")
            return None

        stash_sha = commit_result.stdout.strip()

        # Store in stash list
        store_result = self.git_ops.run_git_command(
            ["stash", "store", "-m", message, stash_sha]
        )
        if store_result.returncode != 0:
            logger.warning(f"Failed to store stash in list: {store_result.stderr}")

        logger.info(f"Created staged-only stash with SHA: {stash_sha}")
        return stash_sha

    def _create_keep_index_stash(self, message: str) -> Optional[str]:
        """Create a stash with --keep-index behavior using SHA references.

        Args:
            message: Stash message

        Returns:
            SHA of created stash, or None if failed
        """
        # Strategy for --keep-index:
        # 1. Save current index state
        # 2. Create stash of all changes
        # 3. Restore index to original state

        # Save index state
        index_tree_result = self.git_ops.run_git_command(["write-tree"])
        if index_tree_result.returncode != 0:
            logger.error(f"Failed to save index state: {index_tree_result.stderr}")
            return None

        index_tree = index_tree_result.stdout.strip()

        # Create stash of all changes (staged + unstaged)
        stash_sha = self._create_and_store_stash(message)
        if not stash_sha:
            return None

        # Restore index to saved state
        restore_result = self.git_ops.run_git_command(["read-tree", index_tree])
        if restore_result.returncode != 0:
            logger.error(f"Failed to restore index: {restore_result.stderr}")
            # The stash was created, but we couldn't restore index
            # This is a partial failure
            return stash_sha

        logger.info(f"Created keep-index stash with SHA: {stash_sha}")
        return stash_sha

    def _validate_stash_sha(self, stash_sha: str) -> bool:
        """Validate that a string is a valid SHA format.

        Args:
            stash_sha: String to validate

        Returns:
            True if valid SHA format, False otherwise
        """
        import re

        if not stash_sha or not isinstance(stash_sha, str):
            return False

        # Git SHA-1 is 40 hexadecimal characters
        # Git SHA-256 is 64 hexadecimal characters (future support)
        sha_pattern = re.compile(r"^[a-f0-9]{40}$|^[a-f0-9]{64}$")
        return bool(sha_pattern.match(stash_sha.lower()))

    def _verify_stash_exists(self, stash_sha: str) -> bool:
        """Verify that a stash SHA exists in the git repository.

        Args:
            stash_sha: SHA of the stash to verify

        Returns:
            True if stash exists and is a valid commit, False otherwise
        """
        if not self._validate_stash_sha(stash_sha):
            logger.error(f"Invalid SHA format: {stash_sha}")
            return False

        # Check if the object exists and is a commit
        result = self.git_ops.run_git_command(["cat-file", "-t", stash_sha])
        if result.returncode != 0:
            logger.error(f"Stash SHA does not exist: {stash_sha}")
            return False

        if result.stdout.strip() != "commit":
            logger.error(
                f"SHA exists but is not a commit: {stash_sha} (type: {result.stdout.strip()})"
            )
            return False

        return True

    def _find_stash_ref_by_sha(self, stash_sha: str) -> Optional[str]:
        """Find stash reference (stash@{n}) for a given SHA.

        Args:
            stash_sha: The SHA to find in stash list

        Returns:
            Stash reference like "stash@{0}" if found, None otherwise
        """
        if not self._validate_stash_sha(stash_sha):
            logger.error(f"Invalid SHA format: {stash_sha}")
            return None

        # List all stashes with their SHAs
        result = self.git_ops.run_git_command(["stash", "list", "--format=%H %gd"])

        if result.returncode != 0:
            logger.error(f"Failed to list stashes: {result.stderr}")
            return None

        # Parse output to find matching SHA
        for line in result.stdout.strip().split("\n"):
            if not line:
                continue
            parts = line.split(" ", 1)
            if len(parts) == 2:
                sha, ref = parts
                if sha == stash_sha:
                    # Extract stash@{n} from format like "(stash@{0})"
                    if ref.startswith("(") and ref.endswith(")"):
                        ref = ref[1:-1]
                    logger.debug(f"Found stash reference {ref} for SHA {stash_sha}")
                    return ref

        logger.warning(f"No stash reference found for SHA {stash_sha}")
        return None

    def _restore_stash_by_sha(self, stash_sha: str) -> bool:
        """Restore stash using its SHA reference.

        Args:
            stash_sha: SHA of the stash to restore

        Returns:
            True if successful, False otherwise
        """
        # Verify stash exists before attempting to restore
        if not self._verify_stash_exists(stash_sha):
            logger.error(f"Cannot restore stash - invalid or missing: {stash_sha[:12]}")
            return False

        logger.info(f"Restoring stash by SHA: {stash_sha[:12]}")

        # Apply the stash
        result = self.git_ops.run_git_command(["stash", "apply", stash_sha])

        if result.returncode != 0:
            # Check if it's a conflict during stash application
            if "CONFLICT" in result.stderr or "conflict" in result.stderr.lower():
                logger.error(
                    f"Conflicts occurred while applying stash {stash_sha[:12]}"
                )
            else:
                logger.error(f"Failed to apply stash {stash_sha}: {result.stderr}")
            return False

        # Successfully applied, now drop the stash using proper reference
        logger.info("Stash applied successfully, dropping from list")

        # Find the stash reference (stash@{n}) for this SHA
        stash_ref = self._find_stash_ref_by_sha(stash_sha)
        if stash_ref:
            # Use stash reference for drop command
            drop_result = self.git_ops.run_git_command(["stash", "drop", stash_ref])
            if drop_result.returncode != 0:
                logger.warning(
                    f"Failed to drop stash {stash_ref} (non-critical): {drop_result.stderr}"
                )
            else:
                logger.debug(
                    f"Successfully dropped stash {stash_ref} (SHA: {stash_sha[:12]})"
                )
        else:
            # Stash might have been dropped already or doesn't exist in list
            logger.warning(
                f"Could not find stash reference for SHA {stash_sha[:12]}. "
                "Stash may have been dropped already or created outside stash list."
            )

        return True

    def _apply_hunks_to_commit(
        self,
        target_commit: str,
        hunks: List[DiffHunk],
        source_commits: Optional[List[Optional[str]]] = None,
    ) -> bool:
        """Apply hunks to a specific commit via interactive rebase.

        Args:
            target_commit: Target commit hash
            hunks: List of hunks to apply to this commit

        Returns:
            True if successful, False if user aborted
        """
        logger.debug(f"Applying {len(hunks)} hunks to commit {target_commit[:8]}")
        for i, hunk in enumerate(hunks):
            logger.debug(
                f"Hunk {i + 1}: {hunk.file_path} @@ {hunk.lines[0] if hunk.lines else 'empty'}"
            )

        # Start interactive rebase to edit the target commit
        logger.debug(f"Starting interactive rebase to edit {target_commit[:8]}")
        if not self._start_rebase_edit(target_commit):
            logger.debug("Failed to start rebase edit")
            return False

        logger.debug("Interactive rebase started successfully")

        # Check what commit we're actually at
        result = self.git_ops.run_git_command(["rev-parse", "HEAD"])
        if result.returncode == 0:
            current_head = result.stdout.strip()
            logger.debug(f"Current HEAD during rebase: {current_head[:8]}")
            logger.debug(f"Target commit: {target_commit[:8]}")
            if current_head != target_commit:
                logger.debug(
                    f"WARNING - HEAD mismatch! We're at {current_head[:8]} but expected {target_commit[:8]}"
                )

        try:
            # Check if we have split commit SHAs (for cherry-pick)
            split_commits = [sc for sc in (source_commits or []) if sc is not None]

            if split_commits:
                # Verify alignment between split_commits and hunks
                if len(split_commits) != len(hunks):
                    raise subprocess.SubprocessError(
                        f"Split commits ({len(split_commits)}) and hunks ({len(hunks)}) count mismatch. "
                        f"This indicates a bug in the split-commit approach."
                    )
                # Use cherry-pick with 3-way merge (reliable)
                logger.debug(f"Cherry-picking {len(split_commits)} split commits")
                for i, (commit_sha, hunk) in enumerate(zip(split_commits, hunks), 1):
                    # Handle file deletions specially
                    if hunk.is_file_deletion:
                        logger.debug(
                            f"Applying file deletion {i}/{len(split_commits)}: {hunk.file_path}"
                        )
                        # Use --ignore-unmatch for idempotent deletion (no TOCTOU race)
                        # This succeeds even if file already deleted
                        result = self.git_ops.run_git_command(
                            ["rm", "--ignore-unmatch", hunk.file_path]
                        )
                        if result.returncode != 0:
                            logger.debug(f"git rm failed: {result.stderr}")
                            raise subprocess.SubprocessError(
                                f"Failed to delete file {hunk.file_path}: {result.stderr}"
                            )
                        logger.debug(f"File deletion {i} successful")
                    else:
                        # Regular hunk: cherry-pick the split commit
                        logger.debug(
                            f"Cherry-picking {i}/{len(split_commits)}: {commit_sha[:8]}"
                        )
                        result = self.git_ops.run_git_command(
                            ["cherry-pick", "--no-commit", commit_sha]
                        )
                        if result.returncode != 0:
                            logger.debug(f"Cherry-pick failed: {result.stderr}")
                            # Check for conflicts
                            conflicted_files = self._get_conflicted_files()
                            if conflicted_files:
                                raise RebaseConflictError(
                                    f"Cherry-pick failed with conflicts: {result.stderr}",
                                    conflicted_files,
                                )
                            else:
                                raise subprocess.SubprocessError(
                                    f"Cherry-pick failed: {result.stderr}"
                                )
                        logger.debug(f"Cherry-pick {i} successful")
                logger.debug("All operations successful")
            else:
                # Fall back to patch-based approach
                logger.debug("Creating patch from original hunk text")
                patch_content = self._create_patch_from_original_hunks(hunks)
                logger.debug(f"Created patch content ({len(patch_content)} chars):")
                logger.debug("=" * 50)
                logger.debug(patch_content)
                logger.debug("=" * 50)
                self._apply_patch_with_3way(patch_content)
                logger.debug("Patch applied successfully")

            # Amend the commit
            logger.debug("Amending commit with changes")
            self._amend_commit()
            logger.debug("Commit amended successfully")

            # Continue the rebase
            logger.debug("Continuing rebase")
            self._continue_rebase()
            logger.debug("Rebase continued successfully")

            return True

        except RebaseConflictError:
            # Let the exception propagate for user handling
            raise
        except Exception as e:
            # Abort rebase on unexpected errors
            logger.debug(f"Exception occurred during rebase: {e}")
            logger.debug(f"Exception type: {type(e)}")
            self._abort_rebase()
            raise subprocess.SubprocessError(f"Failed to apply changes: {e}")

    def _consolidate_hunks_by_file(
        self, hunks: List[DiffHunk]
    ) -> Dict[str, List[DiffHunk]]:
        """Group hunks by file and detect potential conflicts."""
        files_to_hunks: Dict[str, List[DiffHunk]] = {}
        for hunk in hunks:
            if hunk.file_path not in files_to_hunks:
                files_to_hunks[hunk.file_path] = []
            files_to_hunks[hunk.file_path].append(hunk)
        return files_to_hunks

    def _extract_hunk_changes(self, hunk: DiffHunk) -> List[Dict[str, Any]]:
        """Extract all changes from a hunk, handling multiple changes per hunk.

        Returns:
            List of change dictionaries with 'old_line', 'new_line', and 'context'
        """
        changes: List[Dict[str, Any]] = []
        current_change: Dict[str, Any] = {}
        context_before: List[str] = []

        for line in hunk.lines:
            if line.startswith("@@"):
                continue
            elif line.startswith(" "):
                # Context line
                context_line = line[1:].rstrip("\n")
                context_before.append(context_line)
                # Keep only last 3 context lines
                if len(context_before) > 3:
                    context_before.pop(0)

                # If we have a pending deletion and hit context, finalize it
                if "old_line" in current_change and "new_line" not in current_change:
                    current_change["is_deletion"] = True
                    changes.append(current_change.copy())
                    current_change = {}
            elif line.startswith("-") and not line.startswith("---"):
                # If we already have a pending deletion, finalize it first
                if "old_line" in current_change and "new_line" not in current_change:
                    current_change["is_deletion"] = True
                    changes.append(current_change.copy())
                    current_change = {}

                current_change["old_line"] = line[1:].rstrip("\n")
                current_change["context_before"] = context_before.copy()
            elif line.startswith("+") and not line.startswith("+++"):
                new_line = line[1:].rstrip("\n")

                if "old_line" in current_change:
                    # Modification: both old and new line
                    current_change["new_line"] = new_line
                    changes.append(current_change.copy())
                    current_change = {}
                else:
                    # Pure addition: only new line, use context to find insertion point
                    changes.append(
                        {
                            "new_line": new_line,
                            "context_before": context_before.copy(),
                            "is_addition": True,
                        }
                    )

        # Finalize any pending deletion at end of hunk
        if "old_line" in current_change and "new_line" not in current_change:
            current_change["is_deletion"] = True
            changes.append(current_change.copy())

        return changes

    def _find_target_with_context(
        self, change: Dict[str, Any], file_lines: List[str], used_lines: Set[int]
    ) -> Optional[int]:
        """Find target line using context awareness to avoid duplicates.

        Args:
            change: Dictionary with 'old_line' and 'new_line' (modifications) or
                   'new_line' and 'context_before' (additions)
            file_lines: Current file content
            used_lines: Set of line numbers already processed

        Returns:
            Target line number (1-based) or None if not found
        """
        # Handle pure additions using context matching
        if change.get("is_addition", False):
            context_before = change.get("context_before", [])
            if not context_before:
                logger.debug(
                    "Pure addition without context, cannot determine insertion point"
                )
                return None

            # Find where the context sequence appears in the file
            for i in range(len(file_lines) - len(context_before) + 1):
                # Check if context matches
                matches = True
                for j, ctx_line in enumerate(context_before):
                    file_line = file_lines[i + j].rstrip("\n").strip()
                    if file_line != ctx_line.strip():
                        matches = False
                        break

                if matches:
                    # Insert after the context
                    insertion_line = i + len(context_before) + 1  # 1-based
                    if insertion_line not in used_lines:
                        logger.debug(
                            f"Found insertion point for addition at line {insertion_line}"
                        )
                        return insertion_line

            logger.debug("Could not find context match for addition")
            return None

        # Handle modifications (existing logic)
        if "old_line" not in change:
            logger.debug("Change has neither old_line nor is_addition flag")
            return None

        old_line = change["old_line"].strip()
        candidates = []

        # Find all possible matches
        for i, file_line in enumerate(file_lines):
            line_num = i + 1  # 1-based
            file_line_stripped = file_line.rstrip("\n").strip()

            if file_line_stripped == old_line and line_num not in used_lines:
                candidates.append(line_num)

        if not candidates:
            logger.debug(f"No unused matches found for line: '{old_line}'")
            return None

        if len(candidates) == 1:
            logger.debug(f"Found unique match at line {candidates[0]}")
            return candidates[0]

        # Multiple candidates
        logger.debug(f"Multiple candidates for '{old_line}': {candidates}")
        logger.debug(f"Used lines: {sorted(used_lines)}")

        # Use the first unused candidate
        selected = candidates[0]
        logger.debug(f"Selected first unused candidate: {selected}")
        return selected

    def _create_corrected_patch_for_hunks(
        self, hunks: List[DiffHunk], target_commit: str
    ) -> str:
        """Create a patch with line numbers corrected for the target commit state.
        Uses context-aware matching to avoid duplicate hunk conflicts.

        Args:
            hunks: List of hunks to include in patch
            target_commit: Target commit hash

        Returns:
            Patch content with corrected line numbers
        """
        # Group hunks by file
        files_to_hunks: Dict[str, List[DiffHunk]] = self._consolidate_hunks_by_file(
            hunks
        )

        patch_lines = []

        for file_path, file_hunks in files_to_hunks.items():
            logger.debug(f"Processing {len(file_hunks)} hunks for file {file_path}")

            # Add file header
            patch_lines.extend([f"--- a/{file_path}", f"+++ b/{file_path}"])

            # Read the file content from target commit to find correct line numbers
            try:
                # Get file content at target commit
                logger.debug(f"Reading file content from commit {target_commit[:8]}")
                result = self.git_ops.run_git_command(
                    ["show", f"{target_commit}:{file_path}"]
                )
                if result.returncode != 0:
                    continue

                file_lines = result.stdout.splitlines(keepends=True)
            except Exception:
                continue

            # Extract all changes from all hunks for this file
            all_changes = []
            for hunk in file_hunks:
                changes = self._extract_hunk_changes(hunk)
                logger.debug(f"Extracted {len(changes)} changes from hunk")
                for change in changes:
                    change["original_hunk"] = hunk
                    all_changes.append(change)

            # Find target lines for all changes first
            changes_with_targets = []
            used_lines: Set[int] = set()

            logger.debug(f"Mapping {len(all_changes)} changes to target lines")
            for change in all_changes:
                target_line_num = self._find_target_with_context(
                    change, file_lines, used_lines
                )
                if target_line_num is not None:
                    used_lines.add(target_line_num)
                    changes_with_targets.append((change, target_line_num))

            # Sort changes by line number
            changes_with_targets.sort(key=lambda x: x[1])

            # Group overlapping changes to avoid hunk conflicts
            logger.debug(
                f"Consolidating {len(changes_with_targets)} changes into hunks"
            )
            consolidated_hunks = self._consolidate_overlapping_changes(
                changes_with_targets, file_lines
            )

            # Add consolidated hunks to patch
            for hunk_lines in consolidated_hunks:
                patch_lines.extend(hunk_lines)

        return "\n".join(patch_lines) + "\n"

    def _consolidate_overlapping_changes(
        self, changes_with_targets: List[tuple], file_lines: List[str]
    ) -> List[List[str]]:
        """Consolidate overlapping changes into non-overlapping hunks.

        Args:
            changes_with_targets: List of (change_dict, target_line_num) tuples sorted by line number
            file_lines: Current file content

        Returns:
            List of hunk line lists ready for patch inclusion
        """
        if not changes_with_targets:
            return []

        consolidated_hunks = []
        current_group: List[tuple] = []

        # Group changes that would create overlapping context
        for i, (change, line_num) in enumerate(changes_with_targets):
            if not current_group:
                # Start new group
                current_group = [(change, line_num)]
            else:
                # Check if this change overlaps with the current group's context
                group_start = min(line for _, line in current_group) - 6
                group_end = max(line for _, line in current_group) + 6

                change_start = line_num - 6
                change_end = line_num + 6

                # If contexts overlap, add to current group
                if change_start <= group_end and change_end >= group_start:
                    current_group.append((change, line_num))
                else:
                    # No overlap, create hunk for current group and start new group
                    hunk_lines = self._create_consolidated_hunk(
                        current_group, file_lines
                    )
                    if hunk_lines:
                        consolidated_hunks.append(hunk_lines)
                    current_group = [(change, line_num)]

        # Process final group
        if current_group:
            hunk_lines = self._create_consolidated_hunk(current_group, file_lines)
            if hunk_lines:
                consolidated_hunks.append(hunk_lines)

        return consolidated_hunks

    def _create_consolidated_hunk(
        self, changes_group: List[tuple], file_lines: List[str]
    ) -> List[str]:
        """Create a single hunk containing multiple changes.

        Args:
            changes_group: List of (change_dict, target_line_num) tuples to include in hunk
            file_lines: Current file content

        Returns:
            List of hunk lines, or empty list if creation failed
        """
        if not changes_group:
            return []

        # Determine the overall context range for all changes
        min_line = min(line_num for _, line_num in changes_group)
        max_line = max(line_num for _, line_num in changes_group)

        # Expand context to ensure good patch application (6 lines each side)
        context_start = max(1, min_line - 6)
        context_end = min(len(file_lines), max_line + 6)

        # Create change mapping for quick lookup
        changes_by_line = {line_num: change for change, line_num in changes_group}

        # Count additions and deletions to adjust new_count in hunk header
        num_additions = sum(
            1 for change, _ in changes_group if change.get("is_addition", False)
        )
        num_deletions = sum(
            1 for change, _ in changes_group if change.get("is_deletion", False)
        )

        # Build the hunk header
        old_count = context_end - context_start + 1
        new_count = (
            old_count + num_additions - num_deletions
        )  # Adjust for additions and deletions
        hunk_lines = []
        hunk_lines.append(
            f"@@ -{context_start},{old_count} +{context_start},{new_count} @@ "
        )

        # Build the hunk content
        for line_num in range(context_start, context_end + 1):
            if line_num > len(file_lines):
                break

            # Check if we need to insert additions/deletions/modifications at this line
            if line_num in changes_by_line:
                change = changes_by_line[line_num]

                if change.get("is_addition", False):
                    # Pure addition: insert new line before this line
                    new_line = change["new_line"]
                    hunk_lines.append(f"+{new_line}")
                    # Then output the current line as context
                    file_line = file_lines[line_num - 1].rstrip("\n")
                    hunk_lines.append(f" {file_line}")
                elif change.get("is_deletion", False):
                    # Pure deletion: remove this line (only output - line, no + line)
                    file_line = file_lines[line_num - 1].rstrip("\n")
                    hunk_lines.append(f"-{file_line}")
                    # Don't output context line after deletion - the line is being removed
                else:
                    # Modification: replace this line
                    file_line = file_lines[line_num - 1].rstrip("\n")
                    new_line = change["new_line"]
                    hunk_lines.append(f"-{file_line}")
                    hunk_lines.append(f"+{new_line}")
            else:
                # Context line
                file_line = file_lines[line_num - 1].rstrip("\n")
                hunk_lines.append(f" {file_line}")

        return hunk_lines

    def _create_corrected_hunk_for_change(
        self, change: Dict[str, Any], target_line_num: int, file_lines: List[str]
    ) -> List[str]:
        """Create a corrected hunk for a single change at a specific line number.

        Args:
            change: Dictionary with 'old_line' and 'new_line'
            target_line_num: Target line number (1-based)
            file_lines: Current file content

        Returns:
            List of hunk lines for this change
        """
        new_line = change["new_line"]

        # Create context around the target line (6 lines before and after for better resilience)
        context_start = max(1, target_line_num - 6)
        context_end = min(len(file_lines), target_line_num + 6)

        # Build the hunk header
        old_count = context_end - context_start + 1
        new_count = old_count  # Same count since we're replacing one line
        hunk_lines = []
        hunk_lines.append(
            f"@@ -{context_start},{old_count} +{context_start},{new_count} @@ "
        )

        # Build the hunk content
        for line_num in range(context_start, context_end + 1):
            if line_num > len(file_lines):
                break

            file_line = file_lines[line_num - 1].rstrip(
                "\n"
            )  # Convert to 0-based and remove newline

            if line_num == target_line_num:
                # This is the line to change
                hunk_lines.append(f"-{file_line}")
                hunk_lines.append(f"+{new_line}")
            else:
                # Context line
                hunk_lines.append(f" {file_line}")

        return hunk_lines

    def _generate_rebase_todo(self, target_commit: str) -> str:
        """Generate rebase todo list with target commit marked for editing.

        Args:
            target_commit: Commit to mark for editing

        Returns:
            Rebase todo content
        """
        # Get current HEAD commit
        head_result = self.git_ops.run_git_command(["rev-parse", "HEAD"])
        if head_result.returncode != 0:
            return f"edit {target_commit}\n"

        # Check if target commit is reachable from HEAD
        reachable_result = self.git_ops.run_git_command(
            ["merge-base", "--is-ancestor", target_commit, "HEAD"]
        )

        if reachable_result.returncode == 0:
            # Target commit is an ancestor of HEAD - use normal range
            result = self.git_ops.run_git_command(
                ["rev-list", "--reverse", f"{target_commit}^..HEAD"]
            )
        else:
            # Target commit is not in current branch history
            # Find common ancestor and create range from there
            merge_base_result = self.git_ops.run_git_command(
                ["merge-base", target_commit, "HEAD"]
            )

            if merge_base_result.returncode == 0:
                merge_base = merge_base_result.stdout.strip()
                # Get commits from merge base to HEAD that include our target
                result = self.git_ops.run_git_command(
                    ["rev-list", "--reverse", f"{merge_base}..HEAD", target_commit]
                )
            else:
                # No common ancestor found, fallback to simple edit
                return f"edit {target_commit}\n"

        if result.returncode != 0:
            # Fallback to simple edit if rev-list fails
            return f"edit {target_commit}\n"

        commit_list = [
            line.strip() for line in result.stdout.strip().split("\n") if line.strip()
        ]

        # If no commits found, use simple edit
        if not commit_list:
            return f"edit {target_commit}\n"

        # Check if source commit is in the commit list
        # When using --source <commit>, check if there are ignored hunks
        if self._context and self._context.source_commit:
            # Get full SHA of source commit for comparison
            source_result = self.git_ops.run_git_command(
                ["rev-parse", self._context.source_commit]
            )
            if source_result.returncode == 0:
                source_sha = source_result.stdout.strip()
                if source_sha in commit_list:
                    # Check if there are ignored hunks from source
                    has_ignored_hunks = bool(
                        hasattr(self, "_ignored_mappings") and self._ignored_mappings
                    )

                    if has_ignored_hunks:
                        # Mark source for special edit handling
                        self._source_needs_edit = source_sha
                    else:
                        # Remove source commit from the list - all its changes were squashed elsewhere
                        commit_list = [c for c in commit_list if c != source_sha]

                        if not commit_list:
                            # Only source commit was in the list, use simple edit
                            return f"edit {target_commit}\n"

        # Use comprehensive rebase approach
        todo_lines = []
        for commit_hash in commit_list:
            if commit_hash == target_commit:
                todo_lines.append(f"edit {commit_hash}")
            elif (
                hasattr(self, "_source_needs_edit")
                and commit_hash == self._source_needs_edit
            ):
                todo_lines.append(f"edit {commit_hash}")  # Source needs editing
            else:
                todo_lines.append(f"pick {commit_hash}")

        return "\n".join(todo_lines) + "\n"

    def _commit_might_conflict_with_target(
        self, commit_hash: str, target_commit: str, target_files: Optional[set] = None
    ) -> bool:
        """Check if a commit might conflict with changes to the target commit.

        Args:
            commit_hash: Commit to check for conflicts
            target_commit: Target commit being modified
            target_files: Set of files being modified in target (optional, will be computed if not provided)

        Returns:
            True if commit might conflict with target modifications
        """
        # Get files modified by the potentially conflicting commit
        result = self.git_ops.run_git_command(
            ["diff-tree", "--no-commit-id", "--name-only", "-r", commit_hash]
        )

        if result.returncode != 0:
            # If we can't determine files, assume potential conflict for safety
            return True

        commit_files = set(
            line.strip() for line in result.stdout.strip().split("\n") if line.strip()
        )

        # Get files modified in target commit if not provided
        if target_files is None:
            target_result = self.git_ops.run_git_command(
                ["diff-tree", "--no-commit-id", "--name-only", "-r", target_commit]
            )

            if target_result.returncode != 0:
                # If we can't determine target files, assume potential conflict
                return True

            target_files = set(
                line.strip()
                for line in target_result.stdout.strip().split("\n")
                if line.strip()
            )

        # Check for file overlap - if same files are modified, potential conflict
        file_overlap = commit_files.intersection(target_files)

        if file_overlap:
            return True

        return False

    def _should_use_simple_rebase(self, target_commit: str) -> bool:
        """Determine if we should use simple rebase approach to avoid conflicts.

        Args:
            target_commit: Target commit being modified

        Returns:
            True if simple rebase approach should be used
        """
        # Check if there are subsequent commits that might conflict
        result = self.git_ops.run_git_command(
            ["rev-list", "--reverse", f"{target_commit}^..HEAD"]
        )

        if result.returncode != 0:
            return False

        commit_list = [
            line.strip() for line in result.stdout.strip().split("\n") if line.strip()
        ]

        # Get target files once for efficiency
        target_result = self.git_ops.run_git_command(
            ["diff-tree", "--no-commit-id", "--name-only", "-r", target_commit]
        )

        if target_result.returncode != 0:
            return False

        target_files = set(
            line.strip()
            for line in target_result.stdout.strip().split("\n")
            if line.strip()
        )

        # Check if any subsequent commits might conflict
        for commit_hash in commit_list:
            if commit_hash != target_commit:
                if self._commit_might_conflict_with_target(
                    commit_hash, target_commit, target_files
                ):
                    return True

        return False

    def _create_corrected_hunk(
        self, hunk: DiffHunk, file_lines: List[str], file_path: str
    ) -> List[str]:
        """Create a corrected hunk with proper line numbers for the current file state.

        Args:
            hunk: Original hunk
            file_lines: Current file content as list of lines
            file_path: Path to the file

        Returns:
            List of corrected hunk lines
        """
        # Extract the old and new content from the hunk
        old_line = None
        new_line = None

        for line in hunk.lines:
            if line.startswith("-") and "MICROPY_PY___FILE__" in line:
                old_line = line[1:].rstrip("\n")  # Remove '-' and trailing newline
            elif line.startswith("+") and "MICROPY_MODULE___FILE__" in line:
                new_line = line[1:].rstrip("\n")  # Remove '+' and trailing newline

        if not old_line or not new_line:
            return []
        # Find the line number in the current file
        target_line_num = None
        for i, file_line in enumerate(file_lines):
            if file_line.rstrip("\n").strip() == old_line.strip():
                target_line_num = i + 1  # Convert to 1-based line numbering
                break

        if target_line_num is None:
            return []

        # Create context around the target line (6 lines before and after for better resilience)
        context_start = max(1, target_line_num - 6)
        context_end = min(len(file_lines), target_line_num + 6)

        # Build the hunk
        hunk_lines = []
        hunk_lines.append(
            f"@@ -{context_start},{context_end - context_start + 1} +{context_start},{context_end - context_start + 1} @@ "
        )

        for line_num in range(context_start, context_end + 1):
            if line_num > len(file_lines):
                break

            file_line = file_lines[line_num - 1].rstrip(
                "\n"
            )  # Convert to 0-based and remove newline

            if line_num == target_line_num:
                # This is the line to change
                hunk_lines.append(f"-{file_line}")
                hunk_lines.append(f"+{new_line}")
            else:
                # Context line
                hunk_lines.append(f" {file_line}")

        return hunk_lines

    def _create_patch_for_hunks(self, hunks: List[DiffHunk]) -> str:
        """Create a patch string from a list of hunks.

        Args:
            hunks: List of hunks to include in patch

        Returns:
            Patch content as string
        """
        patch_lines = []
        current_file = None

        for hunk in hunks:
            if hunk.lines:
                # Add file header if this is a new file
                if hunk.file_path != current_file:
                    current_file = hunk.file_path
                    patch_lines.extend(
                        [f"--- a/{hunk.file_path}", f"+++ b/{hunk.file_path}"]
                    )
                # Add hunk content
                patch_lines.extend(hunk.lines)
        patch_content = "\n".join(patch_lines) + "\n"
        return patch_content

    def _start_rebase_edit(self, target_commit: str) -> bool:
        """Start interactive rebase to edit target commit.

        Args:
            target_commit: Commit to edit

        Returns:
            True if rebase started successfully
        """
        # Check if rebase is already in progress
        if self.is_rebase_in_progress():
            # Complete the existing rebase by continuing through all edit points
            try:
                self._complete_remaining_rebase()
            except Exception:
                # Clean up the failed rebase state
                self._cleanup_rebase_state()

        # Create rebase todo that marks target commit for editing and picks all others
        todo_content = self._generate_rebase_todo(target_commit)

        # Write todo to temporary file
        with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
            f.write(todo_content)
            todo_file = f.name

        try:
            # Set git editor to use our todo file
            env = os.environ.copy()
            env["GIT_SEQUENCE_EDITOR"] = f"cp {todo_file}"

            # Start interactive rebase from target commit to include commits after it
            result = self.git_ops.run_git_command(
                ["rebase", "-i", f"{target_commit}^"], env=env
            )
            if result.returncode != 0:
                # Rebase failed to start
                return False

            return True

        finally:
            # Clean up temp file
            try:
                os.unlink(todo_file)
            except OSError:
                pass

    def _complete_remaining_rebase(self) -> None:
        """Complete remaining edit points in an active rebase without making changes.

        This is used when we need to finish a rebase that has edit points we don't
        need to handle (e.g., source commit edit points that will be handled later).
        """
        max_iterations = 20  # Prevent infinite loops
        iteration = 0

        while self.is_rebase_in_progress() and iteration < max_iterations:
            iteration += 1
            # Just continue without making any changes
            result = self.git_ops.run_git_command(["rebase", "--continue"])

            if result.returncode != 0:
                # Check if it's an empty commit
                if "nothing to commit" in result.stderr:
                    self.git_ops.run_git_command(["rebase", "--skip"])
                else:
                    # Some other error - re-raise
                    raise subprocess.SubprocessError(
                        f"Failed to complete rebase: {result.stderr}"
                    )

        if iteration >= max_iterations:
            raise subprocess.SubprocessError(
                "Failed to complete rebase after maximum iterations"
            )

    def _cleanup_rebase_state(self) -> None:
        """Clean up any existing rebase state that might interfere."""
        # Check if there's an ongoing rebase
        rebase_merge_dir = os.path.join(self.git_ops.repo_path, ".git", "rebase-merge")
        rebase_apply_dir = os.path.join(self.git_ops.repo_path, ".git", "rebase-apply")

        if os.path.exists(rebase_merge_dir) or os.path.exists(rebase_apply_dir):
            # Try to abort any existing rebase
            self.git_ops.run_git_command(["rebase", "--abort"])

    def _create_patch_from_original_hunks(self, hunks: List[DiffHunk]) -> str:
        """Create a patch using the ORIGINAL hunk text from git (not reconstructed).

        This preserves the exact hunk text extracted by HunkParser from 'git show',
        avoiding manual reconstruction bugs. Git will handle any line number
        differences with --3way and --recount.

        Args:
            hunks: List of hunks with original text from git

        Returns:
            Patch content string
        """
        patch_lines = []
        current_file = None

        for hunk in hunks:
            # Add file header if this is a new file
            if hunk.file_path != current_file:
                current_file = hunk.file_path

                # For file deletions, include the deletion metadata
                if hunk.is_file_deletion and hunk.deleted_file_mode:
                    patch_lines.extend(
                        [
                            f"--- a/{hunk.file_path}",
                            "+++ /dev/null",
                            f"deleted file mode {hunk.deleted_file_mode}",
                        ]
                    )
                else:
                    patch_lines.extend(
                        [f"--- a/{hunk.file_path}", f"+++ b/{hunk.file_path}"]
                    )

            # Add ORIGINAL hunk lines from git (not reconstructed)
            patch_lines.extend(hunk.lines)

        return "\n".join(patch_lines) + "\n"

    def _apply_patch_with_3way(self, patch_content: str) -> None:
        """Apply patch using git's 3-way merge and fuzzy matching.

        Uses git apply --3way --recount to let git handle:
        - Line number differences (--recount)
        - Context mismatches (--3way merge)
        - Explicit conflict reporting (not silent corruption)

        Args:
            patch_content: Patch content to apply

        Raises:
            RebaseConflictError: If patch application fails with conflicts
            subprocess.SubprocessError: If patch application fails
        """
        # Write patch to temporary file
        with tempfile.NamedTemporaryFile(mode="w", suffix=".patch", delete=False) as f:
            f.write(patch_content)
            patch_file = f.name
            logger.debug(f"Wrote patch to temporary file: {patch_file}")
        try:
            # Apply patch with 3-way merge and automatic line number recalculation
            logger.debug(f"Running git apply --3way --recount {patch_file}")
            result = self.git_ops.run_git_command(
                [
                    "apply",
                    "--3way",
                    "--recount",
                    patch_file,
                ]
            )
            logger.debug(f"git apply returned code: {result.returncode}")
            logger.debug(f"git apply stdout: {result.stdout}")
            logger.debug(f"git apply stderr: {result.stderr}")

            # Only show stderr if there was an error (returncode != 0)
            # Success can still have warnings about 3-way fallback which are noise
            if result.returncode != 0:
                logger.debug("Patch application failed, checking for conflicts")
                # Check if there are conflicts
                conflicted_files = self._get_conflicted_files()
                logger.debug(f"Conflicted files: {conflicted_files}")
                if conflicted_files:
                    raise RebaseConflictError(
                        f"Patch application failed with conflicts: {result.stderr}",
                        conflicted_files,
                    )
                else:
                    raise subprocess.SubprocessError(
                        f"Patch application failed: {result.stderr}"
                    )

        finally:
            # Clean up temp file
            try:
                os.unlink(patch_file)
            except OSError:
                pass

    def _apply_patch(self, patch_content: str) -> None:
        """Apply patch content to working directory (LEGACY - use _apply_patch_with_3way).

        Args:
            patch_content: Patch content to apply
        """
        # Write patch to temporary file
        with tempfile.NamedTemporaryFile(mode="w", suffix=".patch", delete=False) as f:
            f.write(patch_content)
            patch_file = f.name
            logger.debug(f"Wrote patch to temporary file: {patch_file}")
        try:
            # Apply patch using git apply with fuzzy matching for better context handling
            logger.debug(
                f"Running git apply --ignore-whitespace --whitespace=nowarn {patch_file}"
            )
            result = self.git_ops.run_git_command(
                [
                    "apply",
                    "--ignore-whitespace",
                    "--whitespace=nowarn",
                    patch_file,
                ]
            )
            logger.debug(f"git apply returned code: {result.returncode}")
            logger.debug(f"git apply stdout: {result.stdout}")
            logger.debug(f"git apply stderr: {result.stderr}")

            if result.returncode != 0:
                logger.debug("Patch application failed, checking for conflicts")
                # Check if there are conflicts
                conflicted_files = self._get_conflicted_files()
                logger.debug(f"Conflicted files: {conflicted_files}")
                if conflicted_files:
                    raise RebaseConflictError(
                        f"Patch application failed with conflicts: {result.stderr}",
                        conflicted_files,
                    )
                else:
                    raise subprocess.SubprocessError(
                        f"Patch application failed: {result.stderr}"
                    )

        finally:
            # Clean up temp file
            try:
                os.unlink(patch_file)
            except OSError:
                pass

    def _amend_commit(self) -> None:
        """Amend the current commit with changes, handling pre-commit hook modifications and empty commits."""
        # Stage all changes
        result = self.git_ops.run_git_command(["add", "."])
        if result.returncode != 0:
            raise subprocess.SubprocessError(
                f"Failed to stage changes: {result.stderr}"
            )

        # Attempt to amend commit (keep original message)
        result = self.git_ops.run_git_command(["commit", "--amend", "--no-edit"])
        if result.returncode != 0:
            # Check if the failure was due to empty commit
            if "would make" in result.stderr and "empty" in result.stderr:
                logger.debug("Amend would create empty commit, using --allow-empty")
                # Allow empty commit - this happens when changes cancel out the original
                retry_result = self.git_ops.run_git_command(
                    ["commit", "--amend", "--no-edit", "--allow-empty"]
                )
                if retry_result.returncode != 0:
                    raise subprocess.SubprocessError(
                        f"Failed to amend empty commit: {retry_result.stderr}"
                    )
                logger.debug("Successfully amended with --allow-empty")
            # Check if the failure was due to pre-commit hook modifications
            elif "files were modified by this hook" in result.stderr:
                logger.debug(
                    "Pre-commit hook modified files, re-staging and retrying commit"
                )
                # Re-stage all changes after hook modifications
                stage_result = self.git_ops.run_git_command(["add", "."])
                if stage_result.returncode != 0:
                    raise subprocess.SubprocessError(
                        f"Failed to re-stage hook modifications: {stage_result.stderr}"
                    )
                # Retry the amend with hook modifications included
                retry_result = self.git_ops.run_git_command(
                    ["commit", "--amend", "--no-edit"]
                )
                if retry_result.returncode != 0:
                    raise subprocess.SubprocessError(
                        f"Failed to amend commit after hook modifications: {retry_result.stderr}"
                    )
                logger.debug(
                    "Successfully amended commit with pre-commit hook modifications"
                )
            else:
                raise subprocess.SubprocessError(
                    f"Failed to amend commit: {result.stderr}"
                )

    def _continue_rebase(self) -> None:
        """Continue the interactive rebase, handling empty commits."""
        max_retries = 10  # Prevent infinite loops
        retry_count = 0

        while retry_count < max_retries:
            result = self.git_ops.run_git_command(["rebase", "--continue"])
            logger.debug(f"git rebase --continue returned: {result.returncode}")
            logger.debug(f"git rebase --continue stdout: {result.stdout}")
            logger.debug(f"git rebase --continue stderr: {result.stderr}")

            if result.returncode == 0:
                # Check if rebase is actually complete or just stopped at next edit point
                if not self.is_rebase_in_progress():
                    # Rebase completed successfully
                    logger.debug("Rebase completed successfully")
                    return
                else:
                    # Rebase stopped at next edit point - need to continue through remaining edits
                    logger.debug(
                        "Rebase stopped at next edit point, continuing through remaining edits"
                    )
                    # Continue through any remaining edit points automatically
                    retry_count += 1
                    continue

            # Check if this is an empty commit that should be skipped
            if (
                "The previous cherry-pick is now empty" in result.stderr
                or "nothing to commit, working tree clean" in result.stderr
            ):
                logger.debug("Skipping empty commit during rebase")
                skip_result = self.git_ops.run_git_command(["rebase", "--skip"])
                if skip_result.returncode == 0:
                    # Check if rebase is complete
                    status_result = self.git_ops.run_git_command(
                        ["status", "--porcelain=v1"]
                    )
                    if (
                        status_result.returncode == 0
                        and not self.is_rebase_in_progress()
                    ):
                        return  # Rebase completed
                    retry_count += 1
                    continue
                else:
                    raise subprocess.SubprocessError(
                        f"Failed to skip empty commit: {skip_result.stderr}"
                    )
            else:
                # Check for conflicts
                conflicted_files = self._get_conflicted_files()
                if conflicted_files:
                    raise RebaseConflictError(
                        f"Rebase conflicts detected: {result.stderr}", conflicted_files
                    )
                else:
                    raise subprocess.SubprocessError(
                        f"Failed to continue rebase: {result.stderr}"
                    )

        raise subprocess.SubprocessError(
            f"Rebase failed after {max_retries} attempts to handle empty commits"
        )

    def _abort_rebase(self) -> None:
        """Abort the current rebase."""
        try:
            self.git_ops.run_git_command(["rebase", "--abort"])
        except subprocess.SubprocessError:
            # Ignore errors during abort
            pass

    def _get_conflicted_files(self) -> List[str]:
        """Get list of files with merge conflicts.

        Returns:
            List of file paths with conflicts
        """
        try:
            result = self.git_ops.run_git_command(
                ["diff", "--name-only", "--diff-filter=U"]
            )
            if result.returncode == 0:
                return [
                    line.strip() for line in result.stdout.split("\n") if line.strip()
                ]
        except subprocess.SubprocessError:
            pass

        return []

    def _cleanup_on_error(self) -> None:
        """Cleanup state after error."""
        # Abort any active rebase
        self._abort_rebase()

        # Restore stash if we created one
        if self._stash_ref:
            try:
                success = self._restore_stash_by_sha(self._stash_ref)
                if not success:
                    logger.warning(
                        f"Failed to restore stash during cleanup: {self._stash_ref[:12]}"
                    )
            except Exception as e:
                # Stash restoration failed, but don't raise - user can manually recover
                logger.warning(f"Error restoring stash during cleanup: {e}")
            finally:
                self._stash_ref = None

    def abort_operation(self) -> None:
        """Abort the current squash operation and restore original state."""
        self._cleanup_on_error()

    def is_rebase_in_progress(self) -> bool:
        """Check if a rebase is currently in progress.

        Returns:
            True if rebase is active
        """
        # Check for rebase directories that indicate an active rebase
        rebase_merge_dir = os.path.join(self.git_ops.repo_path, ".git", "rebase-merge")
        rebase_apply_dir = os.path.join(self.git_ops.repo_path, ".git", "rebase-apply")

        if os.path.exists(rebase_merge_dir) or os.path.exists(rebase_apply_dir):
            return True

        # Also check git status output for rebase indicators
        try:
            result = self.git_ops.run_git_command(["status"])
            if result.returncode == 0 and "rebase in progress" in result.stdout:
                return True
        except subprocess.SubprocessError:
            pass

        return False

    def get_rebase_status(self) -> Dict[str, Any]:
        """Get current rebase status information.

        Returns:
            Dictionary with rebase status details
        """
        status: Dict[str, Any] = {
            "in_progress": False,
            "current_commit": None,
            "conflicted_files": [],
            "step": None,
            "total_steps": None,
        }

        if not self.is_rebase_in_progress():
            return status

        status["in_progress"] = True
        status["conflicted_files"] = self._get_conflicted_files()

        # Try to get rebase step info
        try:
            rebase_dir = os.path.join(self.git_ops.repo_path, ".git", "rebase-merge")
            if os.path.exists(rebase_dir):
                # Read step info
                msgnum_file = os.path.join(rebase_dir, "msgnum")
                end_file = os.path.join(rebase_dir, "end")

                if os.path.exists(msgnum_file) and os.path.exists(end_file):
                    with open(msgnum_file, "r") as f:
                        status["step"] = int(f.read().strip())
                    with open(end_file, "r") as f:
                        status["total_steps"] = int(f.read().strip())
        except (OSError, ValueError):
            pass

        return status

    def _process_source_with_ignored_hunks(
        self, source_commit: str, approved_hunks_by_target: Dict[str, List[DiffHunk]]
    ) -> bool:
        """Process source commit by removing approved hunks, keeping only ignored hunks.

        This is called when using --source with hunks that require manual selection.
        Instead of losing those hunks, we preserve them in the source commit at its
        original position.

        Args:
            source_commit: The source commit SHA to process
            approved_hunks_by_target: Dict mapping target commits to their approved hunks

        Returns:
            True if successful, False otherwise
        """
        logger.debug(f"Processing source commit {source_commit[:8]} with ignored hunks")

        # Get split commit SHAs for ignored hunks from the ignored mappings
        ignored_split_commits = []
        if hasattr(self, "_ignored_mappings"):
            for mapping in self._ignored_mappings:
                if mapping.source_commit_sha:
                    ignored_split_commits.append(mapping.source_commit_sha)
                    logger.debug(
                        f"Found ignored split commit: {mapping.source_commit_sha[:8]}"
                    )

        if not ignored_split_commits:
            logger.debug(
                "No ignored split commits found, source commit will be removed"
            )
            # Remove the source commit entirely (all hunks were squashed)
            result = self.git_ops.run_git_command(["rebase", "--skip"])
            if result.returncode != 0:
                logger.debug(f"Failed to skip source commit: {result.stderr}")
                return False
            return True

        logger.debug(
            f"Keeping {len(ignored_split_commits)} ignored hunks in source commit"
        )
        logger.debug("Ignored hunks are available in split commits:")
        for i, commit_sha in enumerate(ignored_split_commits, 1):
            logger.debug(f"  {i}. {commit_sha[:8]}")

        logger.debug(f"Source commit {source_commit[:8]} left unchanged")
        logger.debug(
            "To apply ignored hunks manually, cherry-pick the split commits above"
        )

        # Ignored hunks remain in split commits for manual review
        # This is safer than automatically modifying the source commit after rebases
        return True

    def _hunk_id(self, hunk: DiffHunk) -> str:
        """Create unique ID for hunk based on file, line range, and first line of content.

        Args:
            hunk: The hunk to create ID for

        Returns:
            Unique string identifier for the hunk
        """
        # Construct header from line numbers
        header = f"@@ -{hunk.old_start},{hunk.old_count} +{hunk.new_start},{hunk.new_count} @@"

        # Use file path, line numbers, and first line of actual changes
        first_change = next(
            (
                line
                for line in hunk.lines
                if line.startswith(("+", "-")) and not line.startswith(("+++", "---"))
            ),
            "",
        )
        return f"{hunk.file_path}:{header}:{first_change[:50]}"

    def _get_hunks_from_commit(self, commit: str) -> List[DiffHunk]:
        """Get all hunks from a commit.

        Args:
            commit: Commit SHA to get hunks from

        Returns:
            List of DiffHunk objects
        """
        from git_autosquash.hunk_parser import HunkParser

        parser = HunkParser(self.git_ops)
        return parser.get_diff_hunks(line_by_line=False, from_commit=commit)

    def _get_commit_message(self, commit: str) -> str:
        """Get commit message (excluding headers like commit SHA, author, etc).

        Args:
            commit: Commit SHA

        Returns:
            Commit message body
        """
        result = self.git_ops.run_git_command(["log", "-1", "--format=%B", commit])
        if result.returncode != 0:
            return "Unknown commit message"
        return result.stdout.strip()

    def _amend_commit_with_message(self, message: str) -> None:
        """Amend current commit with new message.

        Args:
            message: New commit message

        Raises:
            subprocess.SubprocessError: If amend fails
        """
        import tempfile

        with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f:
            f.write(message)
            msg_file = f.name

        try:
            # Stage all changes
            result = self.git_ops.run_git_command(["add", "-A"])
            if result.returncode != 0:
                raise subprocess.SubprocessError(f"Failed to stage: {result.stderr}")

            # Amend with message file
            result = self.git_ops.run_git_command(["commit", "--amend", "-F", msg_file])
            if result.returncode != 0:
                raise subprocess.SubprocessError(
                    f"Failed to amend commit: {result.stderr}"
                )
        finally:
            try:
                os.unlink(msg_file)
            except OSError:
                pass
Functions
__init__(git_ops: GitOps, merge_base: str) -> None

Initialize rebase manager.

Parameters:

Name Type Description Default
git_ops GitOps

Git operations handler

required
merge_base str

Merge base commit hash

required
Source code in src/git_autosquash/rebase_manager.py
def __init__(self, git_ops: GitOps, merge_base: str) -> None:
    """Initialize rebase manager.

    Args:
        git_ops: Git operations handler
        merge_base: Merge base commit hash
    """
    self.git_ops = git_ops
    self.merge_base = merge_base
    self._stash_ref: Optional[str] = None
    self._original_branch: Optional[str] = None
    self._batch_ops: Optional[BatchGitOperations] = None
    self._context: Optional[SquashContext] = None
execute_squash(mappings: List[HunkTargetMapping], ignored_mappings: List[HunkTargetMapping], context: SquashContext) -> bool

Execute the squash operation for approved mappings.

Parameters:

Name Type Description Default
mappings List[HunkTargetMapping]

List of approved hunk to commit mappings

required
ignored_mappings List[HunkTargetMapping]

List of ignored hunk to commit mappings

required
context SquashContext

SquashContext for --source commits and blame configuration

required

Returns:

Type Description
bool

True if successful, False if user aborted

Raises:

Type Description
RebaseConflictError

If conflicts occur during rebase

SubprocessError

If git operations fail

Source code in src/git_autosquash/rebase_manager.py
def execute_squash(
    self,
    mappings: List[HunkTargetMapping],
    ignored_mappings: List[HunkTargetMapping],
    context: SquashContext,
) -> bool:
    """Execute the squash operation for approved mappings.

    Args:
        mappings: List of approved hunk to commit mappings
        ignored_mappings: List of ignored hunk to commit mappings
        context: SquashContext for --source commits and blame configuration

    Returns:
        True if successful, False if user aborted

    Raises:
        RebaseConflictError: If conflicts occur during rebase
        subprocess.SubprocessError: If git operations fail
    """
    if not mappings and not ignored_mappings:
        return True

    self._context = context
    self._ignored_mappings = ignored_mappings

    # Store original branch for cleanup
    self._original_branch = self.git_ops.get_current_branch()
    if not self._original_branch:
        raise ValueError("Cannot determine current branch")

    try:
        # Group hunks by target commit
        commit_hunks = self._group_hunks_by_commit(mappings)

        # Build mapping from hunk to source_commit_sha for cherry-pick
        hunk_to_source_commit = {}
        for mapping in mappings:
            if mapping.source_commit_sha:
                # Use hunk as key (need to find it in commit_hunks)
                hunk_to_source_commit[id(mapping.hunk)] = mapping.source_commit_sha

        # Check working tree state and handle stashing if needed
        self._handle_working_tree_state()

        # Execute rebase for each target commit
        target_commits = self._get_commit_order(set(commit_hunks.keys()))
        logger.debug(
            f"Processing {len(target_commits)} target commits in order: {[c[:8] for c in target_commits]}"
        )

        for target_commit in target_commits:
            hunks = commit_hunks[target_commit]
            # Get source commit SHAs for these hunks
            source_commits = [hunk_to_source_commit.get(id(hunk)) for hunk in hunks]
            logger.debug(
                f"Processing target commit {target_commit[:8]} with {len(hunks)} hunks"
            )
            success = self._apply_hunks_to_commit(
                target_commit, hunks, source_commits
            )
            if not success:
                logger.debug(f"Failed to apply hunks to commit {target_commit[:8]}")
                return False
            logger.debug(
                f"Successfully applied hunks to commit {target_commit[:8]}"
            )
            logger.debug("=" * 80)

        # Handle source commit if it has ignored hunks
        if hasattr(self, "_source_needs_edit") and self._ignored_mappings:
            logger.debug(
                f"Processing source commit with {len(self._ignored_mappings)} ignored hunks"
            )
            success = self._process_source_with_ignored_hunks(
                self._source_needs_edit, commit_hunks
            )
            if not success:
                logger.debug(
                    f"Failed to process source commit {self._source_needs_edit[:8]}"
                )
                return False
            logger.debug(
                f"Successfully processed source commit {self._source_needs_edit[:8]}"
            )
            logger.debug("=" * 80)

        # Restore stash if we created one (success path)
        if self._stash_ref:
            try:
                success = self._restore_stash_by_sha(self._stash_ref)
                if not success:
                    logger.error(f"Failed to restore stash: {self._stash_ref[:12]}")
                    logger.info(
                        f"Manual restore: git stash apply {self._stash_ref[:12]}"
                    )
            except Exception as e:
                logger.error(f"Error restoring stash: {e}")
                logger.info(
                    f"Manual restore: git stash apply {self._stash_ref[:12]}"
                )
            finally:
                self._stash_ref = None

        return True

    except RebaseConflictError:
        # Don't cleanup on rebase conflicts - let user resolve manually
        raise
    except Exception:
        # Cleanup on any other error
        self._cleanup_on_error()
        raise
abort_operation() -> None

Abort the current squash operation and restore original state.

Source code in src/git_autosquash/rebase_manager.py
def abort_operation(self) -> None:
    """Abort the current squash operation and restore original state."""
    self._cleanup_on_error()
is_rebase_in_progress() -> bool

Check if a rebase is currently in progress.

Returns:

Type Description
bool

True if rebase is active

Source code in src/git_autosquash/rebase_manager.py
def is_rebase_in_progress(self) -> bool:
    """Check if a rebase is currently in progress.

    Returns:
        True if rebase is active
    """
    # Check for rebase directories that indicate an active rebase
    rebase_merge_dir = os.path.join(self.git_ops.repo_path, ".git", "rebase-merge")
    rebase_apply_dir = os.path.join(self.git_ops.repo_path, ".git", "rebase-apply")

    if os.path.exists(rebase_merge_dir) or os.path.exists(rebase_apply_dir):
        return True

    # Also check git status output for rebase indicators
    try:
        result = self.git_ops.run_git_command(["status"])
        if result.returncode == 0 and "rebase in progress" in result.stdout:
            return True
    except subprocess.SubprocessError:
        pass

    return False
get_rebase_status() -> Dict[str, Any]

Get current rebase status information.

Returns:

Type Description
Dict[str, Any]

Dictionary with rebase status details

Source code in src/git_autosquash/rebase_manager.py
def get_rebase_status(self) -> Dict[str, Any]:
    """Get current rebase status information.

    Returns:
        Dictionary with rebase status details
    """
    status: Dict[str, Any] = {
        "in_progress": False,
        "current_commit": None,
        "conflicted_files": [],
        "step": None,
        "total_steps": None,
    }

    if not self.is_rebase_in_progress():
        return status

    status["in_progress"] = True
    status["conflicted_files"] = self._get_conflicted_files()

    # Try to get rebase step info
    try:
        rebase_dir = os.path.join(self.git_ops.repo_path, ".git", "rebase-merge")
        if os.path.exists(rebase_dir):
            # Read step info
            msgnum_file = os.path.join(rebase_dir, "msgnum")
            end_file = os.path.join(rebase_dir, "end")

            if os.path.exists(msgnum_file) and os.path.exists(end_file):
                with open(msgnum_file, "r") as f:
                    status["step"] = int(f.read().strip())
                with open(end_file, "r") as f:
                    status["total_steps"] = int(f.read().strip())
    except (OSError, ValueError):
        pass

    return status

CLI Entry Point

Main

Command-line interface and entry point for git-autosquash.

git_autosquash.main

CLI entry point for git-autosquash.

Classes
Functions
setup_logging(verbose: bool) -> None

Configure logging for git-autosquash without affecting other loggers.

Source code in src/git_autosquash/main.py
def setup_logging(verbose: bool) -> None:
    """Configure logging for git-autosquash without affecting other loggers."""
    # Use named logger to avoid interfering with user's logging configuration
    logger = logging.getLogger("git_autosquash")

    # Only configure if not already configured
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
        logger.addHandler(handler)

    logger.setLevel(logging.DEBUG if verbose else logging.INFO)
    logger.propagate = False  # Don't propagate to root logger
complete_git_branches(incomplete: str) -> List[str]

Auto-complete git branch names for --base option.

Parameters:

Name Type Description Default
incomplete str

Partial branch name typed by user

required

Returns:

Type Description
List[str]

List of matching branch names

Source code in src/git_autosquash/main.py
def complete_git_branches(incomplete: str) -> List[str]:
    """Auto-complete git branch names for --base option.

    Args:
        incomplete: Partial branch name typed by user

    Returns:
        List of matching branch names
    """
    try:
        result = subprocess.run(
            ["git", "branch", "-a", "--format=%(refname:short)"],
            capture_output=True,
            text=True,
            check=False,
        )
        if result.returncode == 0:
            branches = [
                b.strip() for b in result.stdout.strip().split("\n") if b.strip()
            ]
            return [b for b in branches if b.startswith(incomplete)]
    except Exception:
        pass
    return []
complete_source_option(incomplete: str) -> List[str]

Auto-complete source options.

Parameters:

Name Type Description Default
incomplete str

Partial source value typed by user

required

Returns:

Type Description
List[str]

List of matching source options

Source code in src/git_autosquash/main.py
def complete_source_option(incomplete: str) -> List[str]:
    """Auto-complete source options.

    Args:
        incomplete: Partial source value typed by user

    Returns:
        List of matching source options
    """
    options = ["auto", "working-tree", "index", "head"]
    return [o for o in options if o.startswith(incomplete)]
validate_git_environment(git_ops: GitOps) -> str

Validate git environment and return current branch.

Parameters:

Name Type Description Default
git_ops GitOps

GitOps instance

required

Returns:

Type Description
str

Current branch name

Raises:

Type Description
SystemExit

If validation fails

Source code in src/git_autosquash/main.py
def validate_git_environment(git_ops: GitOps) -> str:
    """Validate git environment and return current branch.

    Args:
        git_ops: GitOps instance

    Returns:
        Current branch name

    Raises:
        SystemExit: If validation fails
    """
    # Check git availability
    if not git_ops.is_git_available():
        error = RepositoryStateError(
            "Git is not installed or not available in PATH",
            recovery_suggestion="Please install git and ensure it's available in your PATH environment variable",
        )
        ErrorReporter.report_error(error)
        sys.exit(1)

    # Validate git repository
    if not git_ops.is_git_repo():
        error = RepositoryStateError(
            "Not in a git repository",
            recovery_suggestion="Please run this command from within a git repository directory",
        )
        ErrorReporter.report_error(error)
        sys.exit(1)

    # Get current branch
    current_branch = git_ops.get_current_branch()
    if not current_branch:
        error = RepositoryStateError(
            "Not on a branch (detached HEAD)",
            recovery_suggestion="Please checkout a branch before using git-autosquash",
        )
        ErrorReporter.report_error(error)
        sys.exit(1)

    return current_branch
get_merge_base(git_ops: GitOps, current_branch: str, base_ref: Optional[str] = None) -> str

Get merge base with main branch or specified base.

Parameters:

Name Type Description Default
git_ops GitOps

GitOps instance

required
current_branch str

Current branch name

required
base_ref Optional[str]

Optional base reference (branch/commit) to use instead of main/master

None

Returns:

Type Description
str

Merge base commit hash

Raises:

Type Description
SystemExit

If merge base not found or invalid

Source code in src/git_autosquash/main.py
def get_merge_base(
    git_ops: GitOps, current_branch: str, base_ref: Optional[str] = None
) -> str:
    """Get merge base with main branch or specified base.

    Args:
        git_ops: GitOps instance
        current_branch: Current branch name
        base_ref: Optional base reference (branch/commit) to use instead of main/master

    Returns:
        Merge base commit hash

    Raises:
        SystemExit: If merge base not found or invalid
    """
    if base_ref:
        # User specified a base, validate it
        is_valid, error_msg, resolved_hash = git_ops.validate_merge_base(base_ref)
        if not is_valid:
            # Get tracking branch info for helpful error message
            result = git_ops.run_git_command(["branch", "-vv", current_branch])
            tracking_info = ""
            if result.returncode == 0 and result.stdout.strip():
                tracking_info = (
                    "\n\nYour current branch tracking info:\n" + result.stdout.strip()
                )

            error = RepositoryStateError(
                f"Invalid base reference: {error_msg}",
                recovery_suggestion=f"Please provide a valid base commit or branch.\n"
                f"Find your tracking branch with: git branch -vv{tracking_info}",
            )
            ErrorReporter.report_error(error)
            sys.exit(1)

        # Validation passed, resolved_hash is guaranteed to be a str
        assert resolved_hash is not None, (
            "validate_merge_base should return hash if valid"
        )
        return resolved_hash

    # Try automatic detection with main/master
    merge_base = git_ops.get_merge_base_with_main(current_branch)
    if not merge_base:
        # Get tracking branch info for helpful error message
        result = git_ops.run_git_command(["branch", "-vv", current_branch])
        tracking_info = ""
        if result.returncode == 0 and result.stdout.strip():
            # Parse tracking branch from output like: * branch_name hash [remote/branch] message
            import re

            match = re.search(r"\[([^\]]+)\]", result.stdout)
            if match:
                tracking_branch = (
                    match.group(1).split(":")[0].strip()
                )  # Remove ahead/behind info
                tracking_info = f"\n\nYour branch appears to track '{tracking_branch}'.\nTry using: git autosquash --base {tracking_branch}"

        error = RepositoryStateError(
            "Could not find merge base with main/master branch",
            recovery_suggestion=f"Your branch '{current_branch}' does not appear to be based on 'main' or 'master'.\n"
            f"Please specify the base branch explicitly using --base:\n\n"
            f"  git autosquash --base <branch-name>\n\n"
            f"Find your tracking branch with: git branch -vv{tracking_info}",
        )
        ErrorReporter.report_error(error)
        sys.exit(1)

    return merge_base
check_repository_state(git_ops: GitOps, merge_base: str, interactive_mode: bool = False) -> None

Check repository state and handle uncommitted changes.

Parameters:

Name Type Description Default
git_ops GitOps

GitOps instance

required
merge_base str

Merge base commit hash

required
interactive_mode bool

If True, prompt user for confirmation; if False, auto-continue

False

Raises:

Type Description
SystemExit

If repository state is invalid

Source code in src/git_autosquash/main.py
def check_repository_state(
    git_ops: GitOps, merge_base: str, interactive_mode: bool = False
) -> None:
    """Check repository state and handle uncommitted changes.

    Args:
        git_ops: GitOps instance
        merge_base: Merge base commit hash
        interactive_mode: If True, prompt user for confirmation; if False, auto-continue

    Raises:
        SystemExit: If repository state is invalid
    """
    # Check for commits to work with
    if not git_ops.has_commits_since_merge_base(merge_base):
        error = RepositoryStateError(
            "No commits to process since merge base",
            recovery_suggestion="Make some commits on your branch before running git-autosquash",
        )
        ErrorReporter.report_error(error)
        sys.exit(1)

    # Check working tree status
    status = git_ops.get_working_tree_status()

    # Only warn about stashing when we actually need to stash (both staged and unstaged)
    if status.get("has_staged", False) and status.get("has_unstaged", False):
        print(
            "⚠️  Working tree has both staged and unstaged changes. Unstaged changes will be temporarily stashed while processing staged changes."
        )
        if not interactive_mode:
            print(
                "✓ Auto-accepting mixed changes (unstaged will be temporarily stashed)"
            )
        else:
            choice = _get_user_choice_for_uncommitted_changes()
            if choice != "continue":
                print("Operation cancelled.")
                sys.exit(0)
process_hunks_and_mappings(git_ops: GitOps, merge_base: str, line_by_line: bool, source: str, blame_ref: str, context: SquashContext) -> tuple[List[HunkTargetMapping], List[HunkTargetMapping], str, bool, Optional[HunkCommitSplitter]]

Process hunks and create target mappings with validation.

Parameters:

Name Type Description Default
git_ops GitOps

GitOps instance

required
merge_base str

Merge base commit hash

required
line_by_line bool

Whether to use line-by-line splitting

required
source str

What to process (working-tree, index, head, commit SHA, or auto)

required
blame_ref str

Git ref to use for blame operations

required
context SquashContext

SquashContext for centralized blame/HEAD exclusion logic

required

Returns:

Type Description
tuple[List[HunkTargetMapping], List[HunkTargetMapping], str, bool, Optional[HunkCommitSplitter]]

Tuple of (automatic_mappings, fallback_mappings, starting_commit, temp_commit_created, splitter)

Raises:

Type Description
SystemExit

If no hunks found

Source code in src/git_autosquash/main.py
def process_hunks_and_mappings(
    git_ops: GitOps,
    merge_base: str,
    line_by_line: bool,
    source: str,
    blame_ref: str,
    context: SquashContext,
) -> tuple[
    List[HunkTargetMapping],
    List[HunkTargetMapping],
    str,
    bool,
    Optional["HunkCommitSplitter"],
]:
    """Process hunks and create target mappings with validation.

    Args:
        git_ops: GitOps instance
        merge_base: Merge base commit hash
        line_by_line: Whether to use line-by-line splitting
        source: What to process (working-tree, index, head, commit SHA, or auto)
        blame_ref: Git ref to use for blame operations
        context: SquashContext for centralized blame/HEAD exclusion logic

    Returns:
        Tuple of (automatic_mappings, fallback_mappings, starting_commit, temp_commit_created, splitter)

    Raises:
        SystemExit: If no hunks found
    """
    from git_autosquash.hunk_commit_splitter import HunkCommitSplitter

    # Phase 1: Normalize source to commit
    normalizer = SourceNormalizer(git_ops)
    starting_commit = normalizer.normalize_to_commit(source)
    logger.debug(f"Processing from commit: {starting_commit[:8]}")

    # Phase 2: Split source commit into per-hunk commits (ALWAYS for reliability)
    # This enables reliable 3-way merge during cherry-pick, which handles complex
    # cases like removing lines from the commit that added them.
    splitter: Optional[HunkCommitSplitter] = None
    split_commits: List[str] = []

    # Always use split-commit approach for reliable 3-way merge
    logger.debug(
        "Splitting source commit into per-hunk commits for reliable 3-way merge"
    )
    splitter = HunkCommitSplitter(git_ops)
    try:
        split_commits, hunks = splitter.split_commit_into_hunks(starting_commit)
        logger.debug(f"Created {len(split_commits)} split commits")
    except Exception as e:
        logger.debug(f"Failed to split commit: {e}")
        logger.warning(
            "Falling back to patch-based approach (may fail for complex cases like removing lines from the commit that added them)"
        )
        # Fall back to normal patch-based approach
        splitter = None
        split_commits = []

    # Phase 3: Parse hunks (use hunks from splitter if available)
    if not split_commits:
        hunk_parser = HunkParser(git_ops)
        hunks = hunk_parser.get_diff_hunks(line_by_line, from_commit=starting_commit)

    if not hunks:
        print("No hunks found to process.")
        normalizer.cleanup_temp_commit()
        if splitter:
            splitter.cleanup()
        sys.exit(0)

    # Phase 4: Pre-flight validation
    validator = ProcessingValidator(git_ops)
    validator.validate_hunk_count(starting_commit, hunks)

    # Phase 5: Resolve target commits for hunks with SquashContext
    # Exclude starting_commit from being a target - it's the commit we're squashing,
    # not a valid target. This prevents temp commits from targeting themselves.
    resolver = HunkTargetResolver(
        git_ops,
        merge_base,
        context,
        blame_ref=blame_ref,
        excluded_commits=[starting_commit],
    )
    mappings = resolver.resolve_targets(hunks)

    # Attach split commit SHAs to mappings (for cherry-pick)
    if split_commits:
        for i, mapping in enumerate(mappings):
            if i < len(split_commits):
                mapping.source_commit_sha = split_commits[i]
                logger.debug(
                    f"Mapped hunk {i + 1} to split commit {split_commits[i][:8]}"
                )

    # Separate automatic mappings from those requiring user input
    automatic_mappings = [m for m in mappings if not m.needs_user_selection]
    fallback_mappings = [m for m in mappings if m.needs_user_selection]

    return (
        automatic_mappings,
        fallback_mappings,
        starting_commit,
        normalizer.temp_commit_created,
        splitter,
    )
handle_automatic_mappings(automatic_mappings: List[HunkTargetMapping], interactive_mode: bool, git_ops: GitOps) -> tuple[List[HunkTargetMapping], List[HunkTargetMapping]]

Handle automatic mappings and return approved/ignored lists.

Parameters:

Name Type Description Default
automatic_mappings List[HunkTargetMapping]

List of automatic mappings

required
interactive_mode bool

If True, show mappings for user review; if False, auto-accept

required

Returns:

Type Description
tuple[List[HunkTargetMapping], List[HunkTargetMapping]]

Tuple of (approved_mappings, ignored_mappings)

Source code in src/git_autosquash/main.py
def handle_automatic_mappings(
    automatic_mappings: List[HunkTargetMapping], interactive_mode: bool, git_ops: GitOps
) -> tuple[List[HunkTargetMapping], List[HunkTargetMapping]]:
    """Handle automatic mappings and return approved/ignored lists.

    Args:
        automatic_mappings: List of automatic mappings
        interactive_mode: If True, show mappings for user review; if False, auto-accept

    Returns:
        Tuple of (approved_mappings, ignored_mappings)
    """
    if not automatic_mappings:
        return [], []

    if not interactive_mode:
        # Auto-accept mode (default): accept all automatic mappings
        # Count unique files
        files = set(m.hunk.file_path for m in automatic_mappings)
        print(
            f"\n✓ Auto-accepting {len(automatic_mappings)} hunks across {len(files)} files"
        )
        logger.debug(
            f"Auto-accepting {len(automatic_mappings)} hunks with blame-identified targets"
        )
        for mapping in automatic_mappings:
            commit_summary = (
                mapping.target_commit[:8] if mapping.target_commit else "unknown"
            )
            logger.debug(f"  → {mapping.hunk.file_path}: {commit_summary}")
        return automatic_mappings, []
    else:
        # Interactive mode: show automatic mappings for user confirmation
        _display_automatic_mappings(automatic_mappings)
        # Return mappings for TUI review instead of auto-accepting
        return automatic_mappings, []
run_interactive_ui(fallback_mappings: List[HunkTargetMapping], git_ops: GitOps, merge_base: str, context: SquashContext, blame_ref: str = 'HEAD') -> bool

Run the interactive TUI for user selections.

Parameters:

Name Type Description Default
fallback_mappings List[HunkTargetMapping]

Mappings requiring user input

required
git_ops GitOps

GitOps instance

required
merge_base str

Merge base commit hash

required
context SquashContext

SquashContext for centralized logic

required
blame_ref str

Git ref to use for blame operations (default: HEAD)

'HEAD'

Returns:

Type Description
bool

True if user approved changes, False if cancelled

Source code in src/git_autosquash/main.py
def run_interactive_ui(
    fallback_mappings: List[HunkTargetMapping],
    git_ops: GitOps,
    merge_base: str,
    context: SquashContext,
    blame_ref: str = "HEAD",
) -> bool:
    """Run the interactive TUI for user selections.

    Args:
        fallback_mappings: Mappings requiring user input
        git_ops: GitOps instance
        merge_base: Merge base commit hash
        context: SquashContext for centralized logic
        blame_ref: Git ref to use for blame operations (default: HEAD)

    Returns:
        True if user approved changes, False if cancelled
    """
    try:
        from git_autosquash.commit_history_analyzer import CommitHistoryAnalyzer
        from git_autosquash.tui.modern_app import ModernAutoSquashApp

        commit_analyzer = CommitHistoryAnalyzer(git_ops, merge_base)

        # Launch the modern TUI app
        app = ModernAutoSquashApp(fallback_mappings, commit_analyzer)
        approved = app.run()

        if approved:
            # Use the same rebase execution path as auto-accept mode for consistency
            result = _execute_rebase(
                app.approved_mappings,
                app.ignored_mappings,
                git_ops,
                merge_base,
                HunkTargetResolver(git_ops, merge_base, context, blame_ref=blame_ref),
                context,
                blame_ref=blame_ref,
            )
            if result:
                print("✓ Successfully applied selected hunks")
                return True
            else:
                print("✗ Failed to apply some hunks")
                return False
        else:
            print("Operation cancelled by user.")
            return False

    except Exception as e:
        if "Cancelled by user" in str(e):
            ErrorReporter.report_error(UserCancelledError("TUI cancelled by user"))
        else:
            error = handle_unexpected_error(e, "TUI execution")
            ErrorReporter.report_error(error)
        return False
run(ctx: typer.Context, line_by_line: Annotated[bool, typer.Option(--line - by - line, help='Use line-by-line hunk splitting instead of default git hunks')] = False, interactive: Annotated[bool, typer.Option(--interactive, -i, help='Launch interactive TUI for manual hunk review and approval')] = False, dry_run: Annotated[bool, typer.Option(--dry - run, -n, help='Show what would be done without making changes')] = False, source: Annotated[str, typer.Option(help="Specify what to process: 'auto' (detect based on tree status), 'working-tree', 'index', 'head', or a commit SHA. When 'head' or a commit SHA, git blame starts on <commit>~1", autocompletion=complete_source_option)] = 'auto', base: Annotated[Optional[str], typer.Option(help='Specify the base commit/branch for the merge-base. Use this when working on feature branches that are not based on main/master. Example: --base andrewleech/usbd_net or --base origin/develop', autocompletion=complete_git_branches)] = None, verbose: Annotated[bool, typer.Option(--verbose, -v, help='Enable verbose debug output for troubleshooting')] = False) -> None

Automatically squash changes back into historical commits.

Source code in src/git_autosquash/main.py
@app.callback(invoke_without_command=True)
def run(
    ctx: typer.Context,
    line_by_line: Annotated[
        bool,
        typer.Option(
            "--line-by-line",
            help="Use line-by-line hunk splitting instead of default git hunks",
        ),
    ] = False,
    interactive: Annotated[
        bool,
        typer.Option(
            "--interactive",
            "-i",
            help="Launch interactive TUI for manual hunk review and approval",
        ),
    ] = False,
    dry_run: Annotated[
        bool,
        typer.Option(
            "--dry-run",
            "-n",
            help="Show what would be done without making changes",
        ),
    ] = False,
    source: Annotated[
        str,
        typer.Option(
            help="Specify what to process: 'auto' (detect based on tree status), "
            "'working-tree', 'index', 'head', or a commit SHA. "
            "When 'head' or a commit SHA, git blame starts on <commit>~1",
            autocompletion=complete_source_option,
        ),
    ] = "auto",
    base: Annotated[
        Optional[str],
        typer.Option(
            help="Specify the base commit/branch for the merge-base. "
            "Use this when working on feature branches that are not based on main/master. "
            "Example: --base andrewleech/usbd_net or --base origin/develop",
            autocompletion=complete_git_branches,
        ),
    ] = None,
    verbose: Annotated[
        bool,
        typer.Option(
            "--verbose",
            "-v",
            help="Enable verbose debug output for troubleshooting",
        ),
    ] = False,
) -> None:
    """Automatically squash changes back into historical commits."""
    # Configure logging based on verbose flag
    setup_logging(verbose)
    # If a subcommand is being invoked, skip the main logic
    if ctx.invoked_subcommand is not None:
        return

    try:
        # Allow interactive dry-run (useful for previewing changes without applying)

        # Phase 2: Initialize git operations and validate environment
        git_ops = GitOps()
        current_branch = validate_git_environment(git_ops)
        merge_base = get_merge_base(git_ops, current_branch, base)
        check_repository_state(git_ops, merge_base, interactive_mode=interactive)

        # Save original HEAD for validation (before any processing)
        original_head_result = git_ops.run_git_command(["rev-parse", "HEAD"])
        original_head = (
            original_head_result.stdout.strip()
            if original_head_result.returncode == 0
            else None
        )

        # Create SquashContext from --source argument
        context = SquashContext.from_source(source, git_ops)
        blame_ref = context.blame_ref

        # Validate context
        validation_errors = context.validate(git_ops)
        if validation_errors:
            typer.echo("Error: Invalid context configuration:", err=True)
            for error in validation_errors:
                typer.echo(f"  • {error}", err=True)
            raise typer.Exit(code=1)

        # Phase 3: Process hunks and create mappings
        (
            automatic_mappings,
            fallback_mappings,
            starting_commit,
            temp_commit_created,
            splitter,
        ) = process_hunks_and_mappings(
            git_ops, merge_base, line_by_line, source, blame_ref, context
        )

        # Store normalizer for cleanup
        normalizer = SourceNormalizer(git_ops)
        normalizer.starting_commit = starting_commit
        normalizer.temp_commit_created = temp_commit_created

        try:
            logger.debug(f"Found target commits for {len(automatic_mappings)} hunks")
            if fallback_mappings:
                logger.debug(
                    f"Found {len(fallback_mappings)} hunks requiring manual target selection"
                )

            # Phase 4: Handle user interaction
            success = False
            rebase_performed = False  # Track if we actually did a rebase

            if dry_run:
                # Dry run mode: show what would be done without making changes
                _show_dry_run_output(automatic_mappings, fallback_mappings, git_ops)
                success = True
            elif interactive:
                # Interactive mode: combine all mappings for user review in TUI
                all_mappings = automatic_mappings + fallback_mappings

                if all_mappings:
                    success = run_interactive_ui(
                        all_mappings,
                        git_ops,
                        merge_base,
                        context,
                        blame_ref=blame_ref,
                    )
                else:
                    print("No hunks found to process.")
                    success = True
            else:
                # Auto-accept mode (DEFAULT): process only automatic mappings without TUI
                approved_mappings, ignored_mappings = handle_automatic_mappings(
                    automatic_mappings, interactive_mode=False, git_ops=git_ops
                )

                # Add fallback mappings to ignored list (they can't be auto-accepted)
                # These will be preserved in the source commit if using --source
                ignored_mappings.extend(fallback_mappings)

                if approved_mappings or ignored_mappings:
                    success = _execute_rebase(
                        approved_mappings,
                        ignored_mappings,
                        git_ops,
                        merge_base,
                        HunkTargetResolver(
                            git_ops, merge_base, context, blame_ref=blame_ref
                        ),
                        context,
                        blame_ref=blame_ref,
                    )
                    # Only mark rebase as performed if we actually approved hunks
                    # If only ignored_mappings, no commits were changed
                    rebase_performed = success and len(approved_mappings) > 0
                else:
                    success = True  # No rebase needed

            # Phase 5: Post-flight validation
            if success and not dry_run:
                validator = ProcessingValidator(git_ops)
                # For temp commits (staged/working-tree changes), use starting_commit as baseline
                # because that's the commit containing all the changes we're squashing.
                # For non-temp sources (--source HEAD or commit ref), use original_head
                # because the changes were already in history.
                if temp_commit_created:
                    validation_base = starting_commit
                else:
                    validation_base = (
                        original_head if original_head else starting_commit
                    )
                validator.validate_processing(
                    validation_base, description="squash operation"
                )
                print("[+] Validation passed - no corruption detected")

            # Phase 6: Report results
            if success:
                if dry_run:
                    print("\n✓ Dry run completed successfully!")
                else:
                    print("✓ Operation completed successfully!")
            else:
                print("✗ Operation failed or was cancelled.")
                raise typer.Exit(code=1)

        finally:
            # Cleanup temp commit logic:
            # - On failure: cleanup to restore original state
            # - On dry_run: cleanup to restore original state
            # - On success with rebase: don't cleanup (temp commit is orphaned, resetting would undo rebase)
            # - On success without rebase: cleanup (temp commit is HEAD, no work was done)
            if not success or dry_run or not rebase_performed:
                normalizer.cleanup_temp_commit()
            if splitter:
                splitter.cleanup()

    except GitAutoSquashError as e:
        ErrorReporter.report_error(e)
        raise typer.Exit(code=1)
    except KeyboardInterrupt:
        cancel_error = UserCancelledError("git-autosquash operation")
        ErrorReporter.report_error(cancel_error)
        raise typer.Exit(code=130)
    except (subprocess.SubprocessError, FileNotFoundError) as e:
        wrapped = handle_unexpected_error(
            e, "git operation", "Check git installation and repository state"
        )
        ErrorReporter.report_error(wrapped)
        raise typer.Exit(code=1)
    except Exception as e:
        # Check if it's a validation error
        from git_autosquash.validation import ValidationError

        if isinstance(e, ValidationError):
            typer.echo(f"\n✗ VALIDATION FAILED: {e}", err=True)
            typer.echo(
                "This indicates potential data corruption during processing.",
                err=True,
            )
            raise typer.Exit(code=1)

        wrapped = handle_unexpected_error(e, "git-autosquash execution")
        ErrorReporter.report_error(wrapped)
        raise typer.Exit(code=1)
main() -> None

Entry point wrapper for console_scripts.

Source code in src/git_autosquash/main.py
def main() -> None:
    """Entry point wrapper for console_scripts."""
    app()

TUI Components

ModernAutoSquashApp

Main Textual application for the interactive approval workflow.

git_autosquash.tui.modern_app

Modern Textual application implementing the 3-panel workflow from the mock.

Classes
ModernAutoSquashApp

Bases: App[bool]

Modern 3-panel Textual application for git-autosquash.

This application implements a completely different workflow from the enhanced app:

  1. Selection-based workflow: Select changes → Review targets → Choose → Continue
  2. Clean interface: No inline checkboxes or approval widgets
  3. Dynamic panels: Right panel updates based on left panel selection
  4. Live preview: Bottom panel shows diff content in real-time

This matches the workflow shown in the hero_screenshot.png mock.

Source code in src/git_autosquash/tui/modern_app.py
class ModernAutoSquashApp(App[bool]):
    """Modern 3-panel Textual application for git-autosquash.

    This application implements a completely different workflow from the enhanced app:

    1. **Selection-based workflow**: Select changes → Review targets → Choose → Continue
    2. **Clean interface**: No inline checkboxes or approval widgets
    3. **Dynamic panels**: Right panel updates based on left panel selection
    4. **Live preview**: Bottom panel shows diff content in real-time

    This matches the workflow shown in the hero_screenshot.png mock.
    """

    TITLE = "Git Autosquash"

    # Modern CSS styling will be defined in modern_screens.py
    CSS = """
    /* Base modern layout CSS - specific styling in screens */
    Screen {
        background: $surface;
    }
    """

    def __init__(
        self,
        mappings: List[HunkTargetMapping],
        commit_history_analyzer: CommitHistoryAnalyzer,
        **kwargs,
    ) -> None:
        """Initialize the modern git-autosquash app.

        Args:
            mappings: List of hunk to commit mappings to review
            commit_history_analyzer: Analyzer for generating commit suggestions
        """
        super().__init__(**kwargs)
        self.mappings = mappings
        self.commit_history_analyzer = commit_history_analyzer

        # Final selections - different from enhanced app's approach
        self.selected_targets: Dict[HunkTargetMapping, str] = {}
        self.ignored_mappings: List[HunkTargetMapping] = []

    def on_mount(self) -> None:
        """Handle app mounting."""
        # Launch the modern approval screen with 3-panel layout
        screen = ModernApprovalScreen(self.mappings, self.commit_history_analyzer)
        self.push_screen(screen, callback=self._on_approval_complete)

    def _on_approval_complete(self, result: Any) -> None:
        """Handle completion of approval screen.

        Args:
            result: Result from approval screen - either False (cancelled) or dict with selections
        """
        if result:
            # Modern workflow: result contains target assignments and ignored items
            self.selected_targets = result.get("targets", {})
            self.ignored_mappings = result.get("ignored", [])
            self.exit(True)
        else:
            self.exit(False)

    @property
    def approved_mappings(self) -> List[HunkTargetMapping]:
        """Get approved mappings (those with selected targets).

        Returns:
            List of mappings that have target commits assigned
        """
        approved = []
        for mapping in self.mappings:
            if mapping in self.selected_targets:
                # Update the mapping with selected target
                mapping.target_commit = self.selected_targets[mapping]
                mapping.needs_user_selection = False
                approved.append(mapping)
        return approved
Attributes
approved_mappings: List[HunkTargetMapping] property

Get approved mappings (those with selected targets).

Returns:

Type Description
List[HunkTargetMapping]

List of mappings that have target commits assigned

Functions
__init__(mappings: List[HunkTargetMapping], commit_history_analyzer: CommitHistoryAnalyzer, **kwargs) -> None

Initialize the modern git-autosquash app.

Parameters:

Name Type Description Default
mappings List[HunkTargetMapping]

List of hunk to commit mappings to review

required
commit_history_analyzer CommitHistoryAnalyzer

Analyzer for generating commit suggestions

required
Source code in src/git_autosquash/tui/modern_app.py
def __init__(
    self,
    mappings: List[HunkTargetMapping],
    commit_history_analyzer: CommitHistoryAnalyzer,
    **kwargs,
) -> None:
    """Initialize the modern git-autosquash app.

    Args:
        mappings: List of hunk to commit mappings to review
        commit_history_analyzer: Analyzer for generating commit suggestions
    """
    super().__init__(**kwargs)
    self.mappings = mappings
    self.commit_history_analyzer = commit_history_analyzer

    # Final selections - different from enhanced app's approach
    self.selected_targets: Dict[HunkTargetMapping, str] = {}
    self.ignored_mappings: List[HunkTargetMapping] = []
on_mount() -> None

Handle app mounting.

Source code in src/git_autosquash/tui/modern_app.py
def on_mount(self) -> None:
    """Handle app mounting."""
    # Launch the modern approval screen with 3-panel layout
    screen = ModernApprovalScreen(self.mappings, self.commit_history_analyzer)
    self.push_screen(screen, callback=self._on_approval_complete)

ModernScreens

Interactive screens for reviewing and approving hunk to commit mappings.

git_autosquash.tui.modern_screens

Screen implementations with 3-panel layout.

Classes
ModernApprovalScreen

Bases: Screen[Dict[str, Any]]

3-panel approval screen.

Layout: ┌─────────────────────────────────────────────────────────────┐ │ Header │ ├─────────────────────┬───────────────────────────────────────┤ │ Changes to Review │ Target Commits │ │ (Green border) │ (Cyan border) │ │ │ │ │ • file1.py:10-15 │ ○ commit abc123 Fix typo │ │ • file2.js:5-8 │ ○ commit def456 Update logic │ │ • file3.py:20-25 │ ○ commit ghi789 Refactor │ │ │ │ ├─────────────────────┴───────────────────────────────────────┤ │ Preview │ │ (White border) │ │ │ │ @@ -10,3 +10,3 @@ │ │ - old line │ │ + new line │ │ │ ├─────────────────────────────────────────────────────────────┤ │ Continue / Escape to Cancel │ └─────────────────────────────────────────────────────────────┘

Workflow: 1. User selects a change from left panel 2. Right panel shows suggested target commits for that change 3. Bottom panel shows diff preview of the selected change 4. User can select a target commit from right panel (updates the mapping) 5. User continues to next change or clicks Continue when done

Source code in src/git_autosquash/tui/modern_screens.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
class ModernApprovalScreen(Screen[Dict[str, Any]]):
    """3-panel approval screen.

    Layout:
    ┌─────────────────────────────────────────────────────────────┐
    │                        Header                               │
    ├─────────────────────┬───────────────────────────────────────┤
    │   Changes to Review │          Target Commits               │
    │   (Green border)    │          (Cyan border)                │
    │                     │                                       │
    │ • file1.py:10-15    │  ○ commit abc123 Fix typo             │
    │ • file2.js:5-8      │  ○ commit def456 Update logic         │
    │ • file3.py:20-25    │  ○ commit ghi789 Refactor             │
    │                     │                                       │
    ├─────────────────────┴───────────────────────────────────────┤
    │                     Preview                                 │
    │                   (White border)                            │
    │                                                             │
    │  @@ -10,3 +10,3 @@                                          │
    │  -    old line                                              │
    │  +    new line                                              │
    │                                                             │
    ├─────────────────────────────────────────────────────────────┤
    │                    Continue / Escape to Cancel              │
    └─────────────────────────────────────────────────────────────┘

    Workflow:
    1. User selects a change from left panel
    2. Right panel shows suggested target commits for that change
    3. Bottom panel shows diff preview of the selected change
    4. User can select a target commit from right panel (updates the mapping)
    5. User continues to next change or clicks Continue when done
    """

    BINDINGS = [
        Binding("enter", "continue", "Continue", priority=True),
        Binding("escape", "cancel", "Cancel", priority=True),
        Binding("j,down", "next_change", "Next Change", show=False),
        Binding("k,up", "prev_change", "Previous Change", show=False),
    ]

    # layout CSS with proper bordered panels
    CSS = """
    /* 3-panel layout */
    #main-container {
        layout: vertical;
        height: 100%;
    }

    #panels-row {
        layout: horizontal;
        height: 1fr;
    }

    #changes-panel {
        width: 40%;
        height: 100%;
        margin: 0 1 0 0;
        border: round green;
        border-title-style: bold;
        border-title-color: white;
        padding: 1;
    }

    #targets-panel {
        width: 60%;
        height: 100%;
        margin: 0 0 0 1;
        border: round cyan;
        border-title-style: bold;
        border-title-color: white;
        padding: 1;
        overflow: auto scroll;
    }

    #preview-panel {
        height: 1fr;
        margin: 1 0;
        border: round white;
        border-title-style: bold;
        border-title-color: white;
        padding: 1;
    }

    #action-buttons {
        height: 3;
        layout: horizontal;
        align: center middle;
        margin-bottom: 1;
    }

    #action-buttons Button {
        margin: 0 1;
        min-width: 15;
    }

    /* Changes list styling */
    #changes-list {
        height: 100%;
        width: 100%;
    }

    #changes-list ListItem {
        padding: 0 1;
        height: 1;
        width: 100%;
        min-width: 100%;
        text-wrap: nowrap;
    }

    #changes-list ListItem.--highlight {
        background: $surface-lighten-1;
        border-left: thick $primary;
        color: $text;
    }

    #changes-list ListItem Static {
        width: 100%;
        height: 1;
    }

    /* Target commits styling */
    #targets-container {
        height: 100%;
        width: 100%;
    }

    #targets-container RadioSet {
        height: 100%;
        width: 100%;
    }

    #targets-container RadioButton {
        padding: 0 1;
        height: 1;
        width: 100%;
        text-wrap: nowrap;
    }

    /* Auto-target styling */
    #targets-container RadioButton.auto-target {
        color: $success;
        text-style: bold;
    }

    /* Ignore option styling */
    #targets-container RadioButton.ignore-option {
        color: $warning;
        height: 2;
        margin-bottom: 1;
        padding: 0 1;
        border-bottom: thick $warning-muted;
        text-style: bold;
        text-wrap: wrap;
    }

    /* Preview panel styling */
    #diff-preview {
        height: 100%;
        width: 100%;
        overflow: auto;
    }
    """

    def __init__(
        self,
        mappings: List[HunkTargetMapping],
        commit_history_analyzer: CommitHistoryAnalyzer,
        **kwargs,
    ) -> None:
        """Initialize approval screen.

        Args:
            mappings: List of hunk to commit mappings to review
            commit_history_analyzer: Analyzer for generating commit suggestions
        """
        super().__init__(**kwargs)
        self.mappings = mappings
        self.commit_history_analyzer = commit_history_analyzer

        # Current state
        self.selected_mapping: Optional[HunkTargetMapping] = None
        self.current_targets: List[CommitInfo] = []

        # Final selections
        self.target_assignments: Dict[HunkTargetMapping, str] = {}
        self.ignored_mappings: List[HunkTargetMapping] = []

    def compose(self) -> ComposeResult:
        """Compose the 3-panel layout."""
        yield Header()

        with Container(id="main-container"):
            with Horizontal(id="panels-row"):
                # Left panel: Changes to Review (green border)
                with Container(id="changes-panel") as changes_container:
                    changes_container.border_title = "Changes to Review"
                    yield ListView(id="changes-list")

                # Right panel: Target Commits (cyan border)
                with Container(id="targets-panel") as targets_container:
                    targets_container.border_title = "Target Commits"
                    yield Vertical(id="targets-container")

            # Bottom panel: Preview (white border)
            with Container(id="preview-panel") as preview_container:
                preview_container.border_title = "Preview"
                yield Static("Select a change to view diff preview", id="diff-preview")

            # Action buttons
            with Horizontal(id="action-buttons"):
                yield Button("Continue", variant="success", id="continue-btn")
                yield Button("Cancel", variant="default", id="cancel-btn")

        yield Footer()

    async def on_mount(self) -> None:
        """Handle screen mounting."""
        # Populate changes list
        changes_list = self.query_one("#changes-list", ListView)
        for mapping in self.mappings:
            hunk = mapping.hunk
            # Format: "file.py:lines" or "[DELETED] file.py" for deletions
            if hunk.is_file_deletion:
                change_text = f"[DELETED] {hunk.file_path}"
            else:
                change_text = f"{hunk.file_path}:{hunk.new_start}-{hunk.new_start + hunk.new_count - 1}"
            item = ChangeListItem(change_text, mapping)
            await changes_list.append(item)

        # Auto-select first change if available
        if self.mappings:
            changes_list.index = 0
            if changes_list.index is not None:
                await self._handle_change_selection(changes_list.index)

    @on(ListView.Highlighted)
    async def on_list_highlighted(self, event: ListView.Highlighted) -> None:
        """Handle list item highlighting."""
        if event.list_view.id == "changes-list" and event.list_view.index is not None:
            await self._handle_change_selection(event.list_view.index)

    @on(RadioSet.Changed)
    async def on_radio_changed(self, event: RadioSet.Changed) -> None:
        """Handle radio button selection in targets panel."""
        if event.radio_set.id == "targets-radio" and event.pressed:
            # Get the commit hash from the custom attribute
            if hasattr(event.pressed, "commit_hash"):
                selected_hash = event.pressed.commit_hash
                if self.selected_mapping:
                    if selected_hash == "ignore-hunk":
                        # Handle ignore selection
                        if self.selected_mapping not in self.ignored_mappings:
                            self.ignored_mappings.append(self.selected_mapping)
                        # Remove from target assignments if present
                        if self.selected_mapping in self.target_assignments:
                            del self.target_assignments[self.selected_mapping]
                    else:
                        # Handle commit selection
                        self.target_assignments[self.selected_mapping] = selected_hash
                        # Remove from ignored if present
                        if self.selected_mapping in self.ignored_mappings:
                            self.ignored_mappings.remove(self.selected_mapping)

    async def _handle_change_selection(self, index: int) -> None:
        """Handle selection of a change from the left panel."""
        if 0 <= index < len(self.mappings):
            self.selected_mapping = self.mappings[index]

            # Update targets panel
            await self._update_targets_panel()

            # Update preview panel
            await self._update_preview_panel()

    async def _update_targets_panel(self) -> None:
        """Update the targets panel with commits for the selected change."""
        if not self.selected_mapping:
            return

        # Get commit suggestions for this hunk
        mapping = self.selected_mapping
        if mapping.target_commit and not mapping.needs_user_selection:
            # Blame match - show the target commit plus suggestions
            strategy = CommitSelectionStrategy.FILE_RELEVANCE
        else:
            # Fallback case - show general suggestions
            strategy = CommitSelectionStrategy.RECENCY

        self.current_targets = self.commit_history_analyzer.get_commit_suggestions(
            strategy, mapping.hunk.file_path
        )[:20]  # Limit to 20 for UI performance

        # Update the targets container - recreate RadioSet each time
        targets_container = self.query_one("#targets-container", Vertical)

        # Clear existing RadioSet
        await targets_container.remove_children()

        # Create radio buttons for each commit
        radio_buttons = []
        selected_value: Optional[str] = None

        # Add ignore option at the top
        ignore_btn = RadioButton("🚫 Ignore (keep in working tree)")
        # Store commit hash as custom attribute for event handling (same pattern as commits)
        ignore_btn.commit_hash = "ignore-hunk"  # type: ignore[attr-defined]
        ignore_btn.add_class("ignore-option")

        # Check if this hunk is already ignored
        if mapping in self.ignored_mappings:
            ignore_btn._should_be_selected = True  # type: ignore[attr-defined]
            selected_value = "ignore-hunk"

        radio_buttons.append(ignore_btn)

        for commit_info in self.current_targets:
            # Check if this is the automatic blame target
            is_auto_target = (
                mapping.target_commit
                and commit_info.commit_hash == mapping.target_commit
            )

            # Format with confidence indicators at the end to maintain alignment
            if is_auto_target:
                confidence = getattr(mapping, "confidence", "unknown")
                if confidence == "high":
                    confidence_text = " ✓HIGH"
                elif confidence == "medium":
                    confidence_text = " ~MED"
                else:
                    confidence_text = " ?LOW"
            else:
                confidence_text = ""

            # Don't truncate - let the panel handle text wrapping and sizing
            subject = commit_info.subject
            commit_text = f"{commit_info.commit_hash[:7]} {subject}{confidence_text}"

            # Create radio button - always start unselected, let RadioSet manage selection
            radio_btn = RadioButton(commit_text)
            # Store commit hash as custom attribute for event handling
            radio_btn.commit_hash = commit_info.commit_hash  # type: ignore[attr-defined]

            if is_auto_target:
                radio_btn.add_class("auto-target")

            # Determine if this should be the selected one and mark it
            should_select = False

            # Priority 1: Existing user assignment (always takes precedence)
            if mapping in self.target_assignments:
                if self.target_assignments[mapping] == commit_info.commit_hash:
                    should_select = True
            # Priority 2: Auto-target (if no user assignment exists)
            elif mapping not in self.target_assignments and is_auto_target:
                should_select = True

            if should_select:
                # Mark this button for post-mount selection
                radio_btn._should_be_selected = True  # type: ignore[attr-defined]
                selected_value = commit_info.commit_hash

            radio_buttons.append(radio_btn)

        # Create new RadioSet with all buttons
        targets_radio = RadioSet(*radio_buttons, id="targets-radio")

        # Mount the RadioSet
        await targets_container.mount(targets_radio)

        # Apply selection after mounting if we have something to select
        if selected_value is not None:
            # Only record in target_assignments if it's not the ignore option
            if (
                selected_value != "ignore-hunk"
                and mapping not in self.target_assignments
            ):
                self.target_assignments[mapping] = selected_value

            # Use call_after_refresh to ensure proper timing for selection
            # self.call_after_refresh(self._sync_radio_selection)
            self._sync_radio_selection()

    def _sync_radio_selection(self) -> None:
        """Sync focus to selected RadioButton after mounting."""
        try:
            # Find the target RadioSet
            targets_radio = self.query_one("#targets-radio", RadioSet)

            # Find the button marked for selection
            all_buttons = targets_radio.query(RadioButton).results()
            target_button = None

            for button in all_buttons:
                if (
                    hasattr(button, "_should_be_selected")
                    and button._should_be_selected
                ):
                    target_button = button
                    break

            if target_button:
                # Let RadioSet handle the selection properly
                # target_button.pressed = target_button
                target_button.value = False
                target_button.focus()
                target_button.value = True

        except Exception:
            # Log error but don't fail - selection is not critical for basic functionality
            pass

    async def _update_preview_panel(self) -> None:
        """Update the preview panel with diff content for the selected change."""
        if not self.selected_mapping:
            return

        # Format diff similar to the enhanced app
        hunk = self.selected_mapping.hunk
        diff_lines = []

        # Handle file deletions specially
        if hunk.is_file_deletion:
            diff_lines.append("[FILE DELETION]")
            diff_lines.append(f"--- {hunk.file_path}")
            diff_lines.append("+++ /dev/null")
            if hunk.deleted_file_mode:
                diff_lines.append(f"deleted file mode {hunk.deleted_file_mode}")
            diff_lines.append("")

            # Show deleted file content if available
            if hunk.deleted_file_content:
                diff_lines.append("Deleted content:")
                diff_lines.append("")
                for line in hunk.deleted_file_content.split("\n"):
                    diff_lines.append(f"-{line}")
            elif hunk.lines:
                # File deletion with content hunks
                for line in hunk.lines:
                    diff_lines.append(line)
            else:
                diff_lines.append("(empty file)")
        else:
            # Regular hunk display
            # Add file header
            diff_lines.append(f"--- {hunk.file_path}")
            diff_lines.append(f"+++ {hunk.file_path}")
            diff_lines.append(
                f"@@ -{hunk.old_start},{hunk.old_count} +{hunk.new_start},{hunk.new_count} @@"
            )

            # Add context before if available
            for line in hunk.context_before:
                diff_lines.append(f" {line}")

            # Add hunk lines
            for line in hunk.lines:
                diff_lines.append(line)

            # Add context after if available
            for line in hunk.context_after:
                diff_lines.append(f" {line}")

        diff_text = "\n".join(diff_lines)

        # Update preview with syntax highlighting
        try:
            from rich.syntax import Syntax
            from rich.text import Text

            content: Union["Syntax", "Text"] = Syntax(
                diff_text, "diff", theme="monokai", line_numbers=False
            )
        except (ImportError, ValueError):
            from rich.text import Text

            content = Text(diff_text)

        preview = self.query_one("#diff-preview", Static)
        preview.update(content)

    def action_continue(self) -> None:
        """Continue with current selections."""
        result = {"targets": self.target_assignments, "ignored": self.ignored_mappings}
        self.dismiss(result)

    def action_cancel(self) -> None:
        """Cancel the operation."""
        self.dismiss(None)

    def action_next_change(self) -> None:
        """Navigate to next change."""
        changes_list = self.query_one("#changes-list", ListView)
        if (
            changes_list.index is not None
            and changes_list.index < len(self.mappings) - 1
        ):
            changes_list.index += 1

    def action_prev_change(self) -> None:
        """Navigate to previous change."""
        changes_list = self.query_one("#changes-list", ListView)
        if changes_list.index is not None and changes_list.index > 0:
            changes_list.index -= 1

    @on(Button.Pressed)
    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Handle button presses."""
        if event.button.id == "continue-btn":
            self.action_continue()
        elif event.button.id == "cancel-btn":
            self.action_cancel()
Functions
__init__(mappings: List[HunkTargetMapping], commit_history_analyzer: CommitHistoryAnalyzer, **kwargs) -> None

Initialize approval screen.

Parameters:

Name Type Description Default
mappings List[HunkTargetMapping]

List of hunk to commit mappings to review

required
commit_history_analyzer CommitHistoryAnalyzer

Analyzer for generating commit suggestions

required
Source code in src/git_autosquash/tui/modern_screens.py
def __init__(
    self,
    mappings: List[HunkTargetMapping],
    commit_history_analyzer: CommitHistoryAnalyzer,
    **kwargs,
) -> None:
    """Initialize approval screen.

    Args:
        mappings: List of hunk to commit mappings to review
        commit_history_analyzer: Analyzer for generating commit suggestions
    """
    super().__init__(**kwargs)
    self.mappings = mappings
    self.commit_history_analyzer = commit_history_analyzer

    # Current state
    self.selected_mapping: Optional[HunkTargetMapping] = None
    self.current_targets: List[CommitInfo] = []

    # Final selections
    self.target_assignments: Dict[HunkTargetMapping, str] = {}
    self.ignored_mappings: List[HunkTargetMapping] = []
compose() -> ComposeResult

Compose the 3-panel layout.

Source code in src/git_autosquash/tui/modern_screens.py
def compose(self) -> ComposeResult:
    """Compose the 3-panel layout."""
    yield Header()

    with Container(id="main-container"):
        with Horizontal(id="panels-row"):
            # Left panel: Changes to Review (green border)
            with Container(id="changes-panel") as changes_container:
                changes_container.border_title = "Changes to Review"
                yield ListView(id="changes-list")

            # Right panel: Target Commits (cyan border)
            with Container(id="targets-panel") as targets_container:
                targets_container.border_title = "Target Commits"
                yield Vertical(id="targets-container")

        # Bottom panel: Preview (white border)
        with Container(id="preview-panel") as preview_container:
            preview_container.border_title = "Preview"
            yield Static("Select a change to view diff preview", id="diff-preview")

        # Action buttons
        with Horizontal(id="action-buttons"):
            yield Button("Continue", variant="success", id="continue-btn")
            yield Button("Cancel", variant="default", id="cancel-btn")

    yield Footer()
on_mount() -> None async

Handle screen mounting.

Source code in src/git_autosquash/tui/modern_screens.py
async def on_mount(self) -> None:
    """Handle screen mounting."""
    # Populate changes list
    changes_list = self.query_one("#changes-list", ListView)
    for mapping in self.mappings:
        hunk = mapping.hunk
        # Format: "file.py:lines" or "[DELETED] file.py" for deletions
        if hunk.is_file_deletion:
            change_text = f"[DELETED] {hunk.file_path}"
        else:
            change_text = f"{hunk.file_path}:{hunk.new_start}-{hunk.new_start + hunk.new_count - 1}"
        item = ChangeListItem(change_text, mapping)
        await changes_list.append(item)

    # Auto-select first change if available
    if self.mappings:
        changes_list.index = 0
        if changes_list.index is not None:
            await self._handle_change_selection(changes_list.index)
on_list_highlighted(event: ListView.Highlighted) -> None async

Handle list item highlighting.

Source code in src/git_autosquash/tui/modern_screens.py
@on(ListView.Highlighted)
async def on_list_highlighted(self, event: ListView.Highlighted) -> None:
    """Handle list item highlighting."""
    if event.list_view.id == "changes-list" and event.list_view.index is not None:
        await self._handle_change_selection(event.list_view.index)
on_radio_changed(event: RadioSet.Changed) -> None async

Handle radio button selection in targets panel.

Source code in src/git_autosquash/tui/modern_screens.py
@on(RadioSet.Changed)
async def on_radio_changed(self, event: RadioSet.Changed) -> None:
    """Handle radio button selection in targets panel."""
    if event.radio_set.id == "targets-radio" and event.pressed:
        # Get the commit hash from the custom attribute
        if hasattr(event.pressed, "commit_hash"):
            selected_hash = event.pressed.commit_hash
            if self.selected_mapping:
                if selected_hash == "ignore-hunk":
                    # Handle ignore selection
                    if self.selected_mapping not in self.ignored_mappings:
                        self.ignored_mappings.append(self.selected_mapping)
                    # Remove from target assignments if present
                    if self.selected_mapping in self.target_assignments:
                        del self.target_assignments[self.selected_mapping]
                else:
                    # Handle commit selection
                    self.target_assignments[self.selected_mapping] = selected_hash
                    # Remove from ignored if present
                    if self.selected_mapping in self.ignored_mappings:
                        self.ignored_mappings.remove(self.selected_mapping)
action_continue() -> None

Continue with current selections.

Source code in src/git_autosquash/tui/modern_screens.py
def action_continue(self) -> None:
    """Continue with current selections."""
    result = {"targets": self.target_assignments, "ignored": self.ignored_mappings}
    self.dismiss(result)
action_cancel() -> None

Cancel the operation.

Source code in src/git_autosquash/tui/modern_screens.py
def action_cancel(self) -> None:
    """Cancel the operation."""
    self.dismiss(None)
action_next_change() -> None

Navigate to next change.

Source code in src/git_autosquash/tui/modern_screens.py
def action_next_change(self) -> None:
    """Navigate to next change."""
    changes_list = self.query_one("#changes-list", ListView)
    if (
        changes_list.index is not None
        and changes_list.index < len(self.mappings) - 1
    ):
        changes_list.index += 1
action_prev_change() -> None

Navigate to previous change.

Source code in src/git_autosquash/tui/modern_screens.py
def action_prev_change(self) -> None:
    """Navigate to previous change."""
    changes_list = self.query_one("#changes-list", ListView)
    if changes_list.index is not None and changes_list.index > 0:
        changes_list.index -= 1
on_button_pressed(event: Button.Pressed) -> None

Handle button presses.

Source code in src/git_autosquash/tui/modern_screens.py
@on(Button.Pressed)
def on_button_pressed(self, event: Button.Pressed) -> None:
    """Handle button presses."""
    if event.button.id == "continue-btn":
        self.action_continue()
    elif event.button.id == "cancel-btn":
        self.action_cancel()
ChangeListItem

Bases: ListItem

List item for changes in the left panel.

Source code in src/git_autosquash/tui/modern_screens.py
class ChangeListItem(ListItem):
    """List item for changes in the left panel."""

    def __init__(self, text: str, mapping: HunkTargetMapping) -> None:
        super().__init__()
        self.mapping = mapping
        # Single-line text that doesn't wrap
        self._text = Static(text, expand=True)

    def compose(self) -> ComposeResult:
        yield self._text

UI Controllers

Controllers for managing TUI state and interactions.

git_autosquash.tui.ui_controllers

UI management controllers for robust event-driven TUI coordination.

Classes
UIState

Bases: Enum

UI lifecycle states.

Source code in src/git_autosquash/tui/ui_controllers.py
class UIState(Enum):
    """UI lifecycle states."""

    INITIALIZING = auto()
    MOUNTED = auto()
    FOCUS_READY = auto()
    SCROLL_READY = auto()
    FULLY_READY = auto()
FocusController

Manages focus state coordination without timing dependencies.

Source code in src/git_autosquash/tui/ui_controllers.py
class FocusController:
    """Manages focus state coordination without timing dependencies."""

    def __init__(self, widget: Widget) -> None:
        self.widget = widget
        self.focus_ready = asyncio.Event()
        self.focus_targets: Dict[str, Widget] = {}
        self._cleanup_tasks: List[asyncio.Task] = []

    async def wait_for_focus_ready(self) -> None:
        """Wait for focus system to be ready."""
        await self.focus_ready.wait()

    def mark_focus_ready(self) -> None:
        """Mark focus system as ready."""
        self.focus_ready.set()

    def register_focus_target(self, target_id: str, widget: Widget) -> None:
        """Register a widget that can receive focus."""
        self.focus_targets[target_id] = widget

    async def set_focus_to_selected(self, radio_set_id: str) -> bool:
        """Set focus to the currently selected radio button in a RadioSet.

        Returns:
            True if focus was set successfully, False otherwise
        """
        try:
            if radio_set_id not in self.focus_targets:
                return False

            radio_set = self.focus_targets[radio_set_id]
            if not hasattr(radio_set, "query") or not hasattr(radio_set, "focus"):
                return False

            # Find selected button using manual iteration
            try:
                all_buttons = radio_set.query("RadioButton").results()
                selected_button = None
                for btn in all_buttons:
                    if getattr(btn, "value", False):
                        selected_button = btn
                        break

                if selected_button:
                    # Focus the RadioSet, then navigate to selected button
                    radio_set.focus()

                    # Use RadioSet's internal navigation to reach the selected button
                    all_buttons = radio_set.query("RadioButton").results()
                    selected_index = None

                    for i, button in enumerate(all_buttons):
                        if button is selected_button:
                            selected_index = i
                            break

                    if selected_index is not None and selected_index > 0:
                        # Navigate to the selected position
                        for _ in range(selected_index):
                            if hasattr(radio_set, "action_next_button"):
                                radio_set.action_next_button()

                    return True

            except Exception:
                # Fallback to just focusing the RadioSet
                radio_set.focus()
                return True

        except Exception:
            return False

        return False

    def cleanup(self) -> None:
        """Clean up resources and cancel pending tasks."""
        for task in self._cleanup_tasks:
            if not task.done():
                task.cancel()
        self._cleanup_tasks.clear()
        self.focus_targets.clear()
Functions
wait_for_focus_ready() -> None async

Wait for focus system to be ready.

Source code in src/git_autosquash/tui/ui_controllers.py
async def wait_for_focus_ready(self) -> None:
    """Wait for focus system to be ready."""
    await self.focus_ready.wait()
mark_focus_ready() -> None

Mark focus system as ready.

Source code in src/git_autosquash/tui/ui_controllers.py
def mark_focus_ready(self) -> None:
    """Mark focus system as ready."""
    self.focus_ready.set()
register_focus_target(target_id: str, widget: Widget) -> None

Register a widget that can receive focus.

Source code in src/git_autosquash/tui/ui_controllers.py
def register_focus_target(self, target_id: str, widget: Widget) -> None:
    """Register a widget that can receive focus."""
    self.focus_targets[target_id] = widget
set_focus_to_selected(radio_set_id: str) -> bool async

Set focus to the currently selected radio button in a RadioSet.

Returns:

Type Description
bool

True if focus was set successfully, False otherwise

Source code in src/git_autosquash/tui/ui_controllers.py
async def set_focus_to_selected(self, radio_set_id: str) -> bool:
    """Set focus to the currently selected radio button in a RadioSet.

    Returns:
        True if focus was set successfully, False otherwise
    """
    try:
        if radio_set_id not in self.focus_targets:
            return False

        radio_set = self.focus_targets[radio_set_id]
        if not hasattr(radio_set, "query") or not hasattr(radio_set, "focus"):
            return False

        # Find selected button using manual iteration
        try:
            all_buttons = radio_set.query("RadioButton").results()
            selected_button = None
            for btn in all_buttons:
                if getattr(btn, "value", False):
                    selected_button = btn
                    break

            if selected_button:
                # Focus the RadioSet, then navigate to selected button
                radio_set.focus()

                # Use RadioSet's internal navigation to reach the selected button
                all_buttons = radio_set.query("RadioButton").results()
                selected_index = None

                for i, button in enumerate(all_buttons):
                    if button is selected_button:
                        selected_index = i
                        break

                if selected_index is not None and selected_index > 0:
                    # Navigate to the selected position
                    for _ in range(selected_index):
                        if hasattr(radio_set, "action_next_button"):
                            radio_set.action_next_button()

                return True

        except Exception:
            # Fallback to just focusing the RadioSet
            radio_set.focus()
            return True

    except Exception:
        return False

    return False
cleanup() -> None

Clean up resources and cancel pending tasks.

Source code in src/git_autosquash/tui/ui_controllers.py
def cleanup(self) -> None:
    """Clean up resources and cancel pending tasks."""
    for task in self._cleanup_tasks:
        if not task.done():
            task.cancel()
    self._cleanup_tasks.clear()
    self.focus_targets.clear()
ScrollManager

Centralized scroll state management.

Source code in src/git_autosquash/tui/ui_controllers.py
class ScrollManager:
    """Centralized scroll state management."""

    def __init__(self) -> None:
        self._target_position = (0, 0)
        self._scroll_lock = asyncio.Lock()
        self._scroll_targets: Dict[str, Widget] = {}
        self._scroll_ready = asyncio.Event()

    def register_scroll_target(self, target_id: str, widget: Widget) -> None:
        """Register a widget that can be scrolled."""
        self._scroll_targets[target_id] = widget

    def mark_scroll_ready(self) -> None:
        """Mark scroll system as ready."""
        self._scroll_ready.set()

    async def wait_for_scroll_ready(self) -> None:
        """Wait for scroll system to be ready."""
        await self._scroll_ready.wait()

    async def scroll_to_top(self, target_id: str) -> bool:
        """Scroll target widget to top position.

        Args:
            target_id: ID of the scroll target to move to top

        Returns:
            True if scroll was successful, False otherwise
        """
        async with self._scroll_lock:
            try:
                if target_id not in self._scroll_targets:
                    return False

                widget = self._scroll_targets[target_id]
                if hasattr(widget, "scroll_to"):
                    widget.scroll_to(0, 0, animate=False)
                    self._target_position = (0, 0)
                    return True

            except Exception:
                return False

        return False

    def cleanup(self) -> None:
        """Clean up scroll targets."""
        self._scroll_targets.clear()
Functions
register_scroll_target(target_id: str, widget: Widget) -> None

Register a widget that can be scrolled.

Source code in src/git_autosquash/tui/ui_controllers.py
def register_scroll_target(self, target_id: str, widget: Widget) -> None:
    """Register a widget that can be scrolled."""
    self._scroll_targets[target_id] = widget
mark_scroll_ready() -> None

Mark scroll system as ready.

Source code in src/git_autosquash/tui/ui_controllers.py
def mark_scroll_ready(self) -> None:
    """Mark scroll system as ready."""
    self._scroll_ready.set()
wait_for_scroll_ready() -> None async

Wait for scroll system to be ready.

Source code in src/git_autosquash/tui/ui_controllers.py
async def wait_for_scroll_ready(self) -> None:
    """Wait for scroll system to be ready."""
    await self._scroll_ready.wait()
scroll_to_top(target_id: str) -> bool async

Scroll target widget to top position.

Parameters:

Name Type Description Default
target_id str

ID of the scroll target to move to top

required

Returns:

Type Description
bool

True if scroll was successful, False otherwise

Source code in src/git_autosquash/tui/ui_controllers.py
async def scroll_to_top(self, target_id: str) -> bool:
    """Scroll target widget to top position.

    Args:
        target_id: ID of the scroll target to move to top

    Returns:
        True if scroll was successful, False otherwise
    """
    async with self._scroll_lock:
        try:
            if target_id not in self._scroll_targets:
                return False

            widget = self._scroll_targets[target_id]
            if hasattr(widget, "scroll_to"):
                widget.scroll_to(0, 0, animate=False)
                self._target_position = (0, 0)
                return True

        except Exception:
            return False

    return False
cleanup() -> None

Clean up scroll targets.

Source code in src/git_autosquash/tui/ui_controllers.py
def cleanup(self) -> None:
    """Clean up scroll targets."""
    self._scroll_targets.clear()
UILifecycleManager

Coordinates UI lifecycle without timing dependencies.

Source code in src/git_autosquash/tui/ui_controllers.py
class UILifecycleManager:
    """Coordinates UI lifecycle without timing dependencies."""

    def __init__(self, widget: Widget) -> None:
        self.widget = widget
        self.focus_controller = FocusController(widget)
        self.scroll_manager = ScrollManager()
        self.state = UIState.INITIALIZING
        self._ready_callbacks: List[Callable[[], None]] = []
        self._cleanup_callbacks: List[Callable[[], None]] = []

    def register_ready_callback(self, callback: Callable[[], None]) -> None:
        """Register callback to run when UI is fully ready."""
        if self.state == UIState.FULLY_READY:
            # Already ready, execute immediately
            try:
                callback()
            except Exception as e:
                # Log but don't crash
                if hasattr(self.widget, "log"):
                    self.widget.log.error(f"Ready callback failed: {e}")
        else:
            self._ready_callbacks.append(callback)

    def register_cleanup_callback(self, callback: Callable[[], None]) -> None:
        """Register cleanup callback for unmount."""
        self._cleanup_callbacks.append(callback)

    def advance_to_mounted(self) -> None:
        """Advance to mounted state."""
        if self.state == UIState.INITIALIZING:
            self.state = UIState.MOUNTED
            self._check_ready_state()

    def advance_to_focus_ready(self) -> None:
        """Advance to focus ready state."""
        if self.state in (UIState.INITIALIZING, UIState.MOUNTED):
            self.state = UIState.FOCUS_READY
            self.focus_controller.mark_focus_ready()
            self._check_ready_state()

    def advance_to_scroll_ready(self) -> None:
        """Advance to scroll ready state."""
        if self.state in (UIState.INITIALIZING, UIState.MOUNTED, UIState.FOCUS_READY):
            self.state = UIState.SCROLL_READY
            self.scroll_manager.mark_scroll_ready()
            self._check_ready_state()

    def _check_ready_state(self) -> None:
        """Check if we can advance to fully ready."""
        if (
            self.state in (UIState.FOCUS_READY, UIState.SCROLL_READY)
            and self.focus_controller.focus_ready.is_set()
            and self.scroll_manager._scroll_ready.is_set()
        ):
            self.state = UIState.FULLY_READY
            self._execute_ready_callbacks()

    def _execute_ready_callbacks(self) -> None:
        """Execute all ready callbacks."""
        for callback in self._ready_callbacks:
            try:
                callback()
            except Exception as e:
                if hasattr(self.widget, "log"):
                    self.widget.log.error(f"Ready callback failed: {e}")
        self._ready_callbacks.clear()

    def cleanup(self) -> None:
        """Clean up all resources."""
        # Execute cleanup callbacks
        for callback in self._cleanup_callbacks:
            try:
                callback()
            except Exception as e:
                if hasattr(self.widget, "log"):
                    self.widget.log.error(f"Cleanup callback failed: {e}")

        # Clean up controllers
        self.focus_controller.cleanup()
        self.scroll_manager.cleanup()

        # Clear callbacks
        self._ready_callbacks.clear()
        self._cleanup_callbacks.clear()
Functions
register_ready_callback(callback: Callable[[], None]) -> None

Register callback to run when UI is fully ready.

Source code in src/git_autosquash/tui/ui_controllers.py
def register_ready_callback(self, callback: Callable[[], None]) -> None:
    """Register callback to run when UI is fully ready."""
    if self.state == UIState.FULLY_READY:
        # Already ready, execute immediately
        try:
            callback()
        except Exception as e:
            # Log but don't crash
            if hasattr(self.widget, "log"):
                self.widget.log.error(f"Ready callback failed: {e}")
    else:
        self._ready_callbacks.append(callback)
register_cleanup_callback(callback: Callable[[], None]) -> None

Register cleanup callback for unmount.

Source code in src/git_autosquash/tui/ui_controllers.py
def register_cleanup_callback(self, callback: Callable[[], None]) -> None:
    """Register cleanup callback for unmount."""
    self._cleanup_callbacks.append(callback)
advance_to_mounted() -> None

Advance to mounted state.

Source code in src/git_autosquash/tui/ui_controllers.py
def advance_to_mounted(self) -> None:
    """Advance to mounted state."""
    if self.state == UIState.INITIALIZING:
        self.state = UIState.MOUNTED
        self._check_ready_state()
advance_to_focus_ready() -> None

Advance to focus ready state.

Source code in src/git_autosquash/tui/ui_controllers.py
def advance_to_focus_ready(self) -> None:
    """Advance to focus ready state."""
    if self.state in (UIState.INITIALIZING, UIState.MOUNTED):
        self.state = UIState.FOCUS_READY
        self.focus_controller.mark_focus_ready()
        self._check_ready_state()
advance_to_scroll_ready() -> None

Advance to scroll ready state.

Source code in src/git_autosquash/tui/ui_controllers.py
def advance_to_scroll_ready(self) -> None:
    """Advance to scroll ready state."""
    if self.state in (UIState.INITIALIZING, UIState.MOUNTED, UIState.FOCUS_READY):
        self.state = UIState.SCROLL_READY
        self.scroll_manager.mark_scroll_ready()
        self._check_ready_state()
cleanup() -> None

Clean up all resources.

Source code in src/git_autosquash/tui/ui_controllers.py
def cleanup(self) -> None:
    """Clean up all resources."""
    # Execute cleanup callbacks
    for callback in self._cleanup_callbacks:
        try:
            callback()
        except Exception as e:
            if hasattr(self.widget, "log"):
                self.widget.log.error(f"Cleanup callback failed: {e}")

    # Clean up controllers
    self.focus_controller.cleanup()
    self.scroll_manager.cleanup()

    # Clear callbacks
    self._ready_callbacks.clear()
    self._cleanup_callbacks.clear()

Usage Examples

Basic API Usage

Here's how to use the core components programmatically:

from git_autosquash.git_ops import GitOps
from git_autosquash.hunk_parser import HunkParser
from git_autosquash.blame_analyzer import BlameAnalyzer

# Initialize components
git_ops = GitOps(".")
hunk_parser = HunkParser(git_ops)
blame_analyzer = BlameAnalyzer(git_ops, "main")

# Get diff hunks
hunks = hunk_parser.get_diff_hunks()

# Analyze hunks to find target commits
mappings = blame_analyzer.analyze_hunks(hunks)

# Print results
for mapping in mappings:
    print(f"{mapping.hunk.file_path}: {mapping.target_commit} ({mapping.confidence})")

Custom TUI Integration

from git_autosquash.tui.modern_app import ModernAutoSquashApp

# Create custom TUI with your mappings  
app = ModernAutoSquashApp(mappings, commit_analyzer)
approved = app.run()

if approved and app.approved_mappings:
    print(f"User approved {len(app.approved_mappings)} hunks")
    # Process approved mappings...

Rebase Execution

from git_autosquash.rebase_manager import RebaseManager, RebaseConflictError

try:
    rebase_manager = RebaseManager(git_ops, merge_base)
    success = rebase_manager.execute_squash(approved_mappings)

    if success:
        print("Squash operation completed successfully!")
    else:
        print("Operation was cancelled by user")

except RebaseConflictError as e:
    print(f"Conflicts in: {', '.join(e.conflicted_files)}")
    # Handle conflicts...

Type Definitions

Common Types

from typing import List, Dict, Optional, Set

# Confidence levels
ConfidenceLevel = Literal["high", "medium", "low"]

# Working tree status
WorkingTreeStatus = Dict[str, bool]  # {"is_clean": bool, "has_staged": bool, "has_unstaged": bool}

# Rebase status information
RebaseStatus = Dict[str, Any]  # {"in_progress": bool, "conflicted_files": List[str], ...}

Error Types

All components raise specific exception types for different error conditions:

  • GitOps: subprocess.SubprocessError for Git command failures
  • HunkParser: ValueError for parsing errors
  • BlameAnalyzer: subprocess.SubprocessError for blame failures
  • RebaseManager: RebaseConflictError for merge conflicts
  • TUI: Standard Textual exceptions for interface errors

Configuration Options

Environment Variables

git-autosquash respects these environment variables:

  • GIT_SEQUENCE_EDITOR: Custom editor for interactive rebase (automatically managed)
  • TERM: Terminal type for TUI rendering
  • NO_COLOR: Disable colored output when set

Git Configuration

The following Git configuration affects git-autosquash behavior:

  • core.editor: Default editor for conflict resolution
  • merge.tool: Merge tool for resolving conflicts
  • rebase.autoStash: Automatic stashing during rebase (overridden by git-autosquash)

Performance Considerations

Caching

Several operations are cached for performance:

  • Branch commits: Expensive git rev-list operations
  • Commit timestamps: git show calls for chronological ordering
  • Commit summaries: git log output for display

Memory Usage

  • Diff parsing: Streams large diffs to avoid memory issues
  • Blame analysis: Processes hunks individually to limit memory usage
  • TUI rendering: Efficient widget updates and syntax highlighting

Git Operation Optimization

  • Batch operations: Groups related Git commands where possible
  • Subprocess management: Proper timeout and resource cleanup
  • Working directory: Minimal file I/O and temporary file usage

Testing the API

The API components are extensively tested. To run tests for specific components:

# Test specific components
uv run pytest tests/test_git_ops.py -v
uv run pytest tests/test_hunk_parser.py -v
uv run pytest tests/test_blame_analyzer.py -v
uv run pytest tests/test_rebase_manager.py -v

# Test with coverage
uv run pytest --cov=git_autosquash tests/

Contributing to the API

When extending the API:

  1. Follow existing patterns: Maintain consistency with current design
  2. Add comprehensive docstrings: Use Google style docstrings
  3. Include type annotations: Full typing for all public methods
  4. Write tests: Unit tests with appropriate mocking
  5. Update documentation: This reference updates automatically from docstrings