API Reference

This section documents git-autosquash's core components. It is generated from the docstrings in the source, so signatures and descriptions track the code.

Core Components

GitOps

The central interface for Git operations. Commands run as subprocesses, and failures are reported through return values rather than raised exceptions.

git_autosquash.git_ops

Git operations module for repository analysis and commands.

Classes
GitOps

Handles git operations for repository analysis and validation.

Source code in src/git_autosquash/git_ops.py
class GitOps:
    """Handles git operations for repository analysis and validation."""

    def __init__(self, repo_path: Optional[Path] = None) -> None:
        """Initialize GitOps with optional repository path.

        Args:
            repo_path: Path to git repository. Defaults to current directory.
        """
        self.repo_path = repo_path if repo_path is not None else Path.cwd()

    def _run_git_command(self, *args: str) -> tuple[bool, str]:
        """Run a git command and return success status and output.

        Args:
            *args: Git command arguments

        Returns:
            Tuple of (success, output/error_message)
        """
        try:
            result = subprocess.run(
                ["git", *args],
                cwd=self.repo_path,
                capture_output=True,
                text=True,
                check=False,
            )
            return (
                result.returncode == 0,
                result.stdout.strip() or result.stderr.strip(),
            )
        except (subprocess.SubprocessError, FileNotFoundError) as e:
            return False, f"Git command failed: {e}"

    def _run_git_command_with_input(
        self, *args: str, input_text: str
    ) -> tuple[bool, str]:
        """Run a git command with input text and return success status and output.

        Args:
            *args: Git command arguments
            input_text: Text to provide as stdin to the command

        Returns:
            Tuple of (success, output/error_message)
        """
        try:
            result = subprocess.run(
                ["git", *args],
                cwd=self.repo_path,
                input=input_text,
                capture_output=True,
                text=True,
                check=False,
            )
            return (
                result.returncode == 0,
                result.stdout.strip() or result.stderr.strip(),
            )
        except (subprocess.SubprocessError, FileNotFoundError) as e:
            return False, f"Git command failed: {e}"

    def is_git_repo(self) -> bool:
        """Check if repo_path is inside a git repository.

        Returns:
            True if in a git repository, False otherwise
        """
        success, _ = self._run_git_command("rev-parse", "--git-dir")
        return success

    def get_current_branch(self) -> Optional[str]:
        """Get the current branch name.

        Returns:
            Branch name if on a branch, None if detached HEAD
        """
        success, output = self._run_git_command("symbolic-ref", "--short", "HEAD")
        return output if success else None

    def get_merge_base_with_main(self, current_branch: str) -> Optional[str]:
        """Find merge base with main/master branch.

        Args:
            current_branch: Current branch name

        Returns:
            Commit hash of merge base, or None if not found
        """
        # Try merge-base directly, let git handle missing refs
        for main_branch in ["main", "master"]:
            if main_branch == current_branch:
                continue

            success, output = self._run_git_command(
                "merge-base", main_branch, current_branch
            )
            if success:
                return output

        return None

    def get_working_tree_status(self) -> dict[str, bool]:
        """Get working tree status information.

        Returns:
            Dictionary with status flags: has_staged, has_unstaged, is_clean
        """
        success, output = self._run_git_command("status", "--porcelain")
        if not success:
            return {"has_staged": False, "has_unstaged": False, "is_clean": True}

        lines = output.split("\n") if output else []
        has_staged = any(line and line[0] not in "? " for line in lines)
        has_unstaged = any(line and line[1] not in " " for line in lines)
        is_clean = not lines or all(not line.strip() for line in lines)

        return {
            "has_staged": has_staged,
            "has_unstaged": has_unstaged,
            "is_clean": is_clean,
        }

    def has_commits_since_merge_base(self, merge_base: str) -> bool:
        """Check if there are commits on current branch since merge base.

        Args:
            merge_base: Merge base commit hash

        Returns:
            True if there are commits to work with
        """
        success, output = self._run_git_command(
            "rev-list", "--count", f"{merge_base}..HEAD"
        )
        if not success:
            return False

        try:
            count = int(output)
            return count > 0
        except ValueError:
            return False

    def run_git_command(
        self, args: list[str], env: dict[str, str] | None = None
    ) -> subprocess.CompletedProcess[str]:
        """Run a git command and return the complete result.

        Args:
            args: Git command arguments (without 'git')
            env: Optional environment variables

        Returns:
            CompletedProcess with stdout, stderr, and return code
        """
        cmd = ["git"] + args
        try:
            result = subprocess.run(
                cmd,
                cwd=self.repo_path,
                capture_output=True,
                text=True,
                env=env,
                timeout=300,  # 5 minute timeout
            )
            return result
        except subprocess.TimeoutExpired as e:
            return subprocess.CompletedProcess(
                args=cmd,
                returncode=124,  # timeout exit code
                stdout=e.stdout.decode() if e.stdout else "",
                stderr=f"Command timed out after 300 seconds: {e}",
            )
        except (OSError, PermissionError, FileNotFoundError) as e:
            # File system or permission errors
            return subprocess.CompletedProcess(
                args=cmd,
                returncode=1,
                stdout="",
                stderr=f"System error: {e}",
            )
        except Exception as e:
            # Unexpected errors - wrap for better reporting
            wrapped = handle_unexpected_error(e, f"git command: {' '.join(cmd)}")
            return subprocess.CompletedProcess(
                args=cmd,
                returncode=1,
                stdout="",
                stderr=str(wrapped),
            )
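The private helpers in the listing above share one convention: they return a `(success, output)` tuple instead of raising. The sketch below reproduces that pattern outside the class so it can be tried in isolation. `run_command` is a hypothetical name, not part of git-autosquash, and it accepts any executable rather than only `git`.

```python
import subprocess
import sys


def run_command(cmd: list[str]) -> tuple[bool, str]:
    """Run a command and report (success, output) instead of raising."""
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=False)
    except (subprocess.SubprocessError, FileNotFoundError) as e:
        return False, f"Command failed: {e}"
    # Prefer stdout; fall back to stderr when stdout is empty.
    return result.returncode == 0, result.stdout.strip() or result.stderr.strip()


# A command that succeeds, and one whose executable does not exist.
ok, out = run_command([sys.executable, "-c", "print('hello')"])
missing_ok, missing_out = run_command(["definitely-not-a-real-binary-xyz"])
```

Because stdout and stderr are folded into one string, a caller cannot tell which stream the text came from. `run_git_command` returns the full `CompletedProcess` for cases where that distinction matters.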
Functions
__init__(repo_path: Optional[Path] = None) -> None

Initialize GitOps with optional repository path.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| repo_path | Optional[Path] | Path to git repository. Defaults to current directory. | None |

Source code in src/git_autosquash/git_ops.py
def __init__(self, repo_path: Optional[Path] = None) -> None:
    """Initialize GitOps with optional repository path.

    Args:
        repo_path: Path to git repository. Defaults to current directory.
    """
    self.repo_path = repo_path if repo_path is not None else Path.cwd()
is_git_repo() -> bool

Check if repo_path (the current directory by default) is inside a git repository.

Returns:

| Type | Description |
|---|---|
| bool | True if in a git repository, False otherwise |

Source code in src/git_autosquash/git_ops.py
def is_git_repo(self) -> bool:
    """Check if repo_path is inside a git repository.

    Returns:
        True if in a git repository, False otherwise
    """
    success, _ = self._run_git_command("rev-parse", "--git-dir")
    return success
get_current_branch() -> Optional[str]

Get the current branch name.

Returns:

| Type | Description |
|---|---|
| Optional[str] | Branch name if on a branch, None if detached HEAD |

Source code in src/git_autosquash/git_ops.py
def get_current_branch(self) -> Optional[str]:
    """Get the current branch name.

    Returns:
        Branch name if on a branch, None if detached HEAD
    """
    success, output = self._run_git_command("symbolic-ref", "--short", "HEAD")
    return output if success else None
get_merge_base_with_main(current_branch: str) -> Optional[str]

Find merge base with main/master branch.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| current_branch | str | Current branch name | required |

Returns:

| Type | Description |
|---|---|
| Optional[str] | Commit hash of merge base, or None if not found |

Source code in src/git_autosquash/git_ops.py
def get_merge_base_with_main(self, current_branch: str) -> Optional[str]:
    """Find merge base with main/master branch.

    Args:
        current_branch: Current branch name

    Returns:
        Commit hash of merge base, or None if not found
    """
    # Try merge-base directly, let git handle missing refs
    for main_branch in ["main", "master"]:
        if main_branch == current_branch:
            continue

        success, output = self._run_git_command(
            "merge-base", main_branch, current_branch
        )
        if success:
            return output

    return None
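The lookup above tries `main` first and `master` second, skipping whichever one is the current branch. A minimal sketch of that fallback follows; the runner is injected so the logic can be exercised without a repository. `find_merge_base`, `Runner`, and `fake_run` are hypothetical names used only for illustration.

```python
from typing import Callable, Optional

# Hypothetical runner type: takes git arguments, returns (success, output).
Runner = Callable[..., tuple[bool, str]]


def find_merge_base(run: Runner, current_branch: str) -> Optional[str]:
    """Return the merge base with the first usable trunk branch."""
    for trunk in ("main", "master"):
        if trunk == current_branch:
            continue  # never compare a branch with itself
        success, output = run("merge-base", trunk, current_branch)
        if success:
            return output
    return None


def fake_run(*args: str) -> tuple[bool, str]:
    """Stand-in for a repository that only has a 'master' branch."""
    if args[:2] == ("merge-base", "master"):
        return True, "abc1234"
    return False, "fatal: Not a valid object name main"


base_on_feature = find_merge_base(fake_run, "feature")
base_on_master = find_merge_base(fake_run, "master")
```

In this stand-in repository, running on `master` itself yields `None` because the only other candidate, `main`, does not exist. Callers should treat `None` as "no base found" rather than as an error.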
get_working_tree_status() -> dict[str, bool]

Get working tree status information.

Returns:

| Type | Description |
|---|---|
| dict[str, bool] | Dictionary with status flags: has_staged, has_unstaged, is_clean |

Source code in src/git_autosquash/git_ops.py
def get_working_tree_status(self) -> dict[str, bool]:
    """Get working tree status information.

    Returns:
        Dictionary with status flags: has_staged, has_unstaged, is_clean
    """
    success, output = self._run_git_command("status", "--porcelain")
    if not success:
        return {"has_staged": False, "has_unstaged": False, "is_clean": True}

    lines = output.split("\n") if output else []
    has_staged = any(line and line[0] not in "? " for line in lines)
    has_unstaged = any(line and line[1] not in " " for line in lines)
    is_clean = not lines or all(not line.strip() for line in lines)

    return {
        "has_staged": has_staged,
        "has_unstaged": has_unstaged,
        "is_clean": is_clean,
    }
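In `git status --porcelain` output, each line starts with two status columns: the first describes the index (staged state) and the second the working tree. The sketch below derives the same three flags from raw text. `parse_porcelain` is a hypothetical helper, and it assumes the text has not been stripped: `_run_git_command` strips its output, which would remove the leading space that marks an unstaged-only change on the first line.

```python
def parse_porcelain(output: str) -> dict[str, bool]:
    """Derive status flags from raw `git status --porcelain` text."""
    # Keep leading spaces: column 0 is the index state, column 1 the worktree state.
    lines = [line for line in output.split("\n") if len(line) >= 2 and line.strip()]
    has_staged = any(line[0] not in "? " for line in lines)
    has_unstaged = any(line[1] != " " for line in lines)
    return {
        "has_staged": has_staged,
        "has_unstaged": has_unstaged,
        "is_clean": not lines,
    }


staged_only = parse_porcelain("M  src/app.py\n")
unstaged_only = parse_porcelain(" M src/app.py\n")
mixed = parse_porcelain("MM src/app.py\n?? notes.txt\n")
clean = parse_porcelain("")
```

As in the method above, untracked files (`??`) count as unstaged changes because their second column is not a space.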
has_commits_since_merge_base(merge_base: str) -> bool

Check if there are commits on current branch since merge base.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| merge_base | str | Merge base commit hash | required |

Returns:

| Type | Description |
|---|---|
| bool | True if there are commits to work with |

Source code in src/git_autosquash/git_ops.py
def has_commits_since_merge_base(self, merge_base: str) -> bool:
    """Check if there are commits on current branch since merge base.

    Args:
        merge_base: Merge base commit hash

    Returns:
        True if there are commits to work with
    """
    success, output = self._run_git_command(
        "rev-list", "--count", f"{merge_base}..HEAD"
    )
    if not success:
        return False

    try:
        count = int(output)
        return count > 0
    except ValueError:
        return False
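The check above treats every failure mode as "no commits": a failed command, a non-numeric result, and a count of zero all return False. A small sketch of that interpretation step, with `has_new_commits` as a hypothetical name:

```python
def has_new_commits(success: bool, output: str) -> bool:
    """Interpret the result of `git rev-list --count <base>..HEAD`."""
    if not success:
        return False
    try:
        return int(output) > 0
    except ValueError:
        # Non-numeric output (for example an error message) counts as "no commits".
        return False


three_commits = has_new_commits(True, "3")
no_commits = has_new_commits(True, "0")
garbled = has_new_commits(True, "fatal: bad revision")
```

One consequence of this design is that a caller cannot distinguish "nothing to do" from "git failed"; a caller that needs the difference would have to run the command itself.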
run_git_command(args: list[str], env: dict[str, str] | None = None) -> subprocess.CompletedProcess[str]

Run a git command and return the complete result.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| args | list[str] | Git command arguments (without 'git') | required |
| env | dict[str, str] \| None | Optional environment variables | None |

Returns:

| Type | Description |
|---|---|
| CompletedProcess[str] | CompletedProcess with stdout, stderr, and return code |

Source code in src/git_autosquash/git_ops.py
def run_git_command(
    self, args: list[str], env: dict[str, str] | None = None
) -> subprocess.CompletedProcess[str]:
    """Run a git command and return the complete result.

    Args:
        args: Git command arguments (without 'git')
        env: Optional environment variables

    Returns:
        CompletedProcess with stdout, stderr, and return code
    """
    cmd = ["git"] + args
    try:
        result = subprocess.run(
            cmd,
            cwd=self.repo_path,
            capture_output=True,
            text=True,
            env=env,
            timeout=300,  # 5 minute timeout
        )
        return result
    except subprocess.TimeoutExpired as e:
        return subprocess.CompletedProcess(
            args=cmd,
            returncode=124,  # timeout exit code
            stdout=e.stdout.decode() if e.stdout else "",
            stderr=f"Command timed out after 300 seconds: {e}",
        )
    except (OSError, PermissionError, FileNotFoundError) as e:
        # File system or permission errors
        return subprocess.CompletedProcess(
            args=cmd,
            returncode=1,
            stdout="",
            stderr=f"System error: {e}",
        )
    except Exception as e:
        # Unexpected errors - wrap for better reporting
        wrapped = handle_unexpected_error(e, f"git command: {' '.join(cmd)}")
        return subprocess.CompletedProcess(
            args=cmd,
            returncode=1,
            stdout="",
            stderr=str(wrapped),
        )
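Since `run_git_command` never raises, callers have to read the outcome from the returned object. The sketch below shows one way to classify results using the conventions visible in the listing above: return code 124 for a timeout and a `System error:` prefix in stderr for OS-level failures. `summarize` is a hypothetical helper, and the sample results are constructed by hand rather than produced by git.

```python
import subprocess


def summarize(result: subprocess.CompletedProcess) -> str:
    """Classify a result using the conventions of run_git_command."""
    if result.returncode == 0:
        return "ok"
    if result.returncode == 124:
        return "timeout"  # synthetic code set after subprocess.TimeoutExpired
    if result.stderr.startswith("System error:"):
        return "system error"  # OSError while launching the process
    return "git error"


timed_out = subprocess.CompletedProcess(
    args=["git", "fetch"],
    returncode=124,
    stdout="",
    stderr="Command timed out after 300 seconds",
)
failed = subprocess.CompletedProcess(
    args=["git", "rebase"],
    returncode=1,
    stdout="",
    stderr="fatal: no rebase in progress",
)
```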

HunkParser

Parses Git diff output into structured hunk objects.

git_autosquash.hunk_parser

Diff parsing and hunk splitting module.
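Each hunk in unified diff output begins with a header of the form `@@ -old_start,old_count +new_start,new_count @@`, where either count may be omitted when it equals 1. The sketch below applies the same regular expression the parser uses to a few sample headers. `parse_hunk_header` is a hypothetical standalone function, not part of the module.

```python
import re

HUNK_HEADER = re.compile(r"@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@")


def parse_hunk_header(line: str) -> tuple[int, int, int, int]:
    """Return (old_start, old_count, new_start, new_count) for a hunk header."""
    match = HUNK_HEADER.match(line)
    if match is None:
        raise ValueError(f"not a hunk header: {line!r}")
    old_start, old_count, new_start, new_count = match.groups()
    # An omitted count means exactly one line.
    return (
        int(old_start),
        int(old_count) if old_count is not None else 1,
        int(new_start),
        int(new_count) if new_count is not None else 1,
    )


full = parse_hunk_header("@@ -12,4 +12,6 @@ def foo():")
short = parse_hunk_header("@@ -3 +3 @@")
new_file = parse_hunk_header("@@ -0,0 +1,5 @@")
```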

Classes
DiffHunk dataclass

Represents a single diff hunk with metadata.

Source code in src/git_autosquash/hunk_parser.py
@dataclass
class DiffHunk:
    """Represents a single diff hunk with metadata."""

    file_path: str
    old_start: int
    old_count: int
    new_start: int
    new_count: int
    lines: List[str]
    context_before: List[str]
    context_after: List[str]

    @property
    def affected_lines(self) -> range:
        """Get the range of lines affected in the new file."""
        return range(self.new_start, self.new_start + self.new_count)

    @property
    def has_additions(self) -> bool:
        """Check if hunk contains added lines."""
        return any(
            line.startswith("+") and not line.startswith("+++") for line in self.lines
        )

    @property
    def has_deletions(self) -> bool:
        """Check if hunk contains deleted lines."""
        return any(
            line.startswith("-") and not line.startswith("---") for line in self.lines
        )
Attributes
affected_lines: range property

Get the range of lines affected in the new file.

has_additions: bool property

Check if hunk contains added lines.

has_deletions: bool property

Check if hunk contains deleted lines.
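As a worked example of the three properties, the sketch below uses a reduced stand-in class. `MiniHunk` is hypothetical and carries only the fields the properties read; the property bodies follow the listing above.

```python
from dataclasses import dataclass, field


@dataclass
class MiniHunk:
    """Hypothetical stand-in carrying only the fields the properties read."""

    new_start: int
    new_count: int
    lines: list[str] = field(default_factory=list)

    @property
    def affected_lines(self) -> range:
        return range(self.new_start, self.new_start + self.new_count)

    @property
    def has_additions(self) -> bool:
        return any(ln.startswith("+") and not ln.startswith("+++") for ln in self.lines)

    @property
    def has_deletions(self) -> bool:
        return any(ln.startswith("-") and not ln.startswith("---") for ln in self.lines)


hunk = MiniHunk(
    new_start=10,
    new_count=3,
    lines=["@@ -10,2 +10,3 @@", " unchanged", "-old value", "+new value", "+extra line"],
)
affected = list(hunk.affected_lines)

# Edge case: an added line whose own text starts with "++" looks like a file header.
edge = MiniHunk(new_start=1, new_count=1, lines=["@@ -1,0 +1,1 @@", "+++i;"])
```

The `+++` and `---` guards exist to skip file header lines, but they also skip a changed line whose content itself begins with `++` or `--`, as the `edge` value illustrates.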

HunkParser

Parses git diff output into structured hunks.

Source code in src/git_autosquash/hunk_parser.py
class HunkParser:
    """Parses git diff output into structured hunks."""

    def __init__(self, git_ops: GitOps) -> None:
        """Initialize HunkParser with GitOps instance.

        Args:
            git_ops: GitOps instance for running git commands
        """
        self.git_ops = git_ops

    def get_diff_hunks(self, line_by_line: bool = False) -> List[DiffHunk]:
        """Extract hunks from current working tree or staged changes.

        Args:
            line_by_line: If True, split hunks line-by-line for finer granularity

        Returns:
            List of DiffHunk objects representing changes
        """
        # Get working tree status to determine what to diff
        status = self.git_ops.get_working_tree_status()

        if status["is_clean"]:
            # Working tree is clean: show the changes introduced by the HEAD commit
            success, diff_output = self.git_ops._run_git_command(
                "show", "--format=", "HEAD"
            )
        elif status["has_staged"] and not status["has_unstaged"]:
            # Only staged changes, diff them
            success, diff_output = self.git_ops._run_git_command("diff", "--cached")
        elif not status["has_staged"] and status["has_unstaged"]:
            # Only unstaged changes, diff them
            success, diff_output = self.git_ops._run_git_command("diff")
        else:
            # Mixed changes, diff all (staged + unstaged)
            success, diff_output = self.git_ops._run_git_command("diff", "HEAD")

        if not success:
            return []

        hunks = self._parse_diff_output(diff_output)

        if line_by_line:
            hunks = self._split_hunks_line_by_line(hunks)

        return hunks

    def _parse_diff_output(self, diff_output: str) -> List[DiffHunk]:
        """Parse git diff output into DiffHunk objects.

        Args:
            diff_output: Raw git diff output

        Returns:
            List of parsed DiffHunk objects
        """
        if not diff_output.strip():
            return []

        hunks = []
        lines = diff_output.split("\n")
        current_file = None
        i = 0

        while i < len(lines):
            line = lines[i]

            # Track current file being processed
            if line.startswith("diff --git"):
                # Extract file path from "diff --git a/path b/path"
                match = re.match(r"diff --git a/(.*) b/(.*)", line)
                if match:
                    current_file = match.group(2)  # Use the new file path

            elif line.startswith("@@") and current_file:
                # Parse hunk header: @@ -old_start,old_count +new_start,new_count @@
                hunk_match = re.match(
                    r"@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@", line
                )
                if hunk_match:
                    old_start = int(hunk_match.group(1))
                    old_count = int(hunk_match.group(2) or 1)
                    new_start = int(hunk_match.group(3))
                    new_count = int(hunk_match.group(4) or 1)

                    # Collect hunk lines
                    hunk_lines = [line]  # Include the @@ line
                    i += 1

                    while (
                        i < len(lines)
                        and not lines[i].startswith("@@")
                        and not lines[i].startswith("diff --git")
                    ):
                        hunk_lines.append(
                            lines[i]
                        )  # Preserve all lines including empty ones
                        i += 1

                    # Create DiffHunk object
                    hunk = DiffHunk(
                        file_path=current_file,
                        old_start=old_start,
                        old_count=old_count,
                        new_start=new_start,
                        new_count=new_count,
                        lines=hunk_lines,
                        context_before=[],
                        context_after=[],
                    )

                    hunks.append(hunk)
                    continue  # Don't increment i, it was already incremented in the loop

            i += 1

        return hunks

    def _split_hunks_line_by_line(self, hunks: List[DiffHunk]) -> List[DiffHunk]:
        """Split hunks into line-by-line changes for finer granularity.

        Args:
            hunks: List of original hunks to split

        Returns:
            List of line-by-line split hunks
        """
        split_hunks = []

        for hunk in hunks:
            # If hunk is already small (single line change), keep as-is
            change_lines = [
                line for line in hunk.lines[1:] if line.startswith(("+", "-"))
            ]
            if len(change_lines) <= 1:
                split_hunks.append(hunk)
                continue

            # Split into individual line changes
            current_old_line = hunk.old_start
            current_new_line = hunk.new_start

            i = 1  # Skip the @@ header line
            while i < len(hunk.lines):
                line = hunk.lines[i]

                if line.startswith("+"):
                    # Addition: create a single-line hunk with proper line counts
                    header = f"@@ -{current_old_line},0 +{current_new_line},1 @@"
                    new_hunk = DiffHunk(
                        file_path=hunk.file_path,
                        old_start=current_old_line,
                        old_count=0,
                        new_start=current_new_line,
                        new_count=1,
                        lines=[header, line],
                        context_before=[],
                        context_after=[],
                    )
                    split_hunks.append(new_hunk)
                    current_new_line += 1

                elif line.startswith("-"):
                    # Deletion: create a single-line hunk with proper line counts
                    header = f"@@ -{current_old_line},1 +{current_new_line},0 @@"
                    new_hunk = DiffHunk(
                        file_path=hunk.file_path,
                        old_start=current_old_line,
                        old_count=1,
                        new_start=current_new_line,
                        new_count=0,
                        lines=[header, line],
                        context_before=[],
                        context_after=[],
                    )
                    split_hunks.append(new_hunk)
                    current_old_line += 1

                else:
                    # Context line: advance both pointers
                    current_old_line += 1
                    current_new_line += 1

                i += 1

        return split_hunks

    def get_file_content_at_lines(
        self, file_path: str, start_line: int, end_line: int
    ) -> List[str]:
        """Get file content at specific line range for context.

        Args:
            file_path: Path to the file
            start_line: Starting line number (1-based)
            end_line: Ending line number (1-based, inclusive)

        Returns:
            List of lines from the file, empty list on error
        """
        # Fetch the whole file as of HEAD, then slice the requested range in memory
        success, output = self.git_ops._run_git_command("show", f"HEAD:{file_path}")

        if not success:
            return []

        try:
            lines = output.split("\n")
            # Convert to 0-based indexing and ensure bounds
            start_idx = max(0, start_line - 1)
            end_idx = min(len(lines), end_line)

            return lines[start_idx:end_idx]
        except Exception:
            # Handle any parsing errors gracefully
            return []
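The line-by-line split in the listing above walks a hunk body with two counters, one for the old file and one for the new file, and emits a single-line hunk for every added or removed line. The sketch below isolates the header arithmetic. `split_headers` is a hypothetical function that returns only the generated headers, not full `DiffHunk` objects.

```python
def split_headers(old_start: int, new_start: int, body: list[str]) -> list[str]:
    """Return one single-line hunk header per added or removed line."""
    headers = []
    old_line, new_line = old_start, new_start
    for line in body:
        if line.startswith("+"):
            # Addition: consumes no old lines, produces one new line.
            headers.append(f"@@ -{old_line},0 +{new_line},1 @@")
            new_line += 1
        elif line.startswith("-"):
            # Deletion: consumes one old line, produces no new lines.
            headers.append(f"@@ -{old_line},1 +{new_line},0 @@")
            old_line += 1
        else:
            # Context line: present on both sides, so both counters advance.
            old_line += 1
            new_line += 1
    return headers


headers = split_headers(10, 10, [" unchanged", "-old value", "+new value"])
```

Note that the real method leaves a hunk untouched when it contains at most one changed line, and that the resulting single-line hunks carry no context lines.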
Functions
__init__(git_ops: GitOps) -> None

Initialize HunkParser with GitOps instance.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| git_ops | GitOps | GitOps instance for running git commands | required |

Source code in src/git_autosquash/hunk_parser.py
def __init__(self, git_ops: GitOps) -> None:
    """Initialize HunkParser with GitOps instance.

    Args:
        git_ops: GitOps instance for running git commands
    """
    self.git_ops = git_ops
get_diff_hunks(line_by_line: bool = False) -> List[DiffHunk]

Extract hunks from current working tree or staged changes.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| line_by_line | bool | If True, split hunks line-by-line for finer granularity | False |

Returns:

| Type | Description |
|---|---|
| List[DiffHunk] | List of DiffHunk objects representing changes |

Source code in src/git_autosquash/hunk_parser.py
def get_diff_hunks(self, line_by_line: bool = False) -> List[DiffHunk]:
    """Extract hunks from current working tree or staged changes.

    Args:
        line_by_line: If True, split hunks line-by-line for finer granularity

    Returns:
        List of DiffHunk objects representing changes
    """
    # Get working tree status to determine what to diff
    status = self.git_ops.get_working_tree_status()

    if status["is_clean"]:
        # Working tree is clean: show the changes introduced by the HEAD commit
        success, diff_output = self.git_ops._run_git_command(
            "show", "--format=", "HEAD"
        )
    elif status["has_staged"] and not status["has_unstaged"]:
        # Only staged changes, diff them
        success, diff_output = self.git_ops._run_git_command("diff", "--cached")
    elif not status["has_staged"] and status["has_unstaged"]:
        # Only unstaged changes, diff them
        success, diff_output = self.git_ops._run_git_command("diff")
    else:
        # Mixed changes, diff all (staged + unstaged)
        success, diff_output = self.git_ops._run_git_command("diff", "HEAD")

    if not success:
        return []

    hunks = self._parse_diff_output(diff_output)

    if line_by_line:
        hunks = self._split_hunks_line_by_line(hunks)

    return hunks
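Which diff is parsed depends on the working tree state. The branch logic above can be condensed into a lookup from status flags to git arguments, sketched here with the hypothetical function `choose_diff_args`. The argument tuples are taken from the listing; the comments describe what each git command compares.

```python
def choose_diff_args(status: dict[str, bool]) -> tuple[str, ...]:
    """Pick git arguments for the change set described by the status flags."""
    if status["is_clean"]:
        return ("show", "--format=", "HEAD")  # changes of the latest commit
    if status["has_staged"] and not status["has_unstaged"]:
        return ("diff", "--cached")  # index vs HEAD
    if status["has_unstaged"] and not status["has_staged"]:
        return ("diff",)  # working tree vs index
    return ("diff", "HEAD")  # working tree vs HEAD: staged and unstaged together


clean_args = choose_diff_args({"is_clean": True, "has_staged": False, "has_unstaged": False})
staged_args = choose_diff_args({"is_clean": False, "has_staged": True, "has_unstaged": False})
mixed_args = choose_diff_args({"is_clean": False, "has_staged": True, "has_unstaged": True})
```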
get_file_content_at_lines(file_path: str, start_line: int, end_line: int) -> List[str]

Get file content at specific line range for context.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| file_path | str | Path to the file | required |
| start_line | int | Starting line number (1-based) | required |
| end_line | int | Ending line number (1-based, inclusive) | required |

Returns:

| Type | Description |
|---|---|
| List[str] | List of lines from the file, empty list on error |

Source code in src/git_autosquash/hunk_parser.py
def get_file_content_at_lines(
    self, file_path: str, start_line: int, end_line: int
) -> List[str]:
    """Get file content at specific line range for context.

    Args:
        file_path: Path to the file
        start_line: Starting line number (1-based)
        end_line: Ending line number (1-based, inclusive)

    Returns:
        List of lines from the file, empty list on error
    """
    # Fetch the whole file as of HEAD, then slice the requested range in memory
    success, output = self.git_ops._run_git_command("show", f"HEAD:{file_path}")

    if not success:
        return []

    try:
        lines = output.split("\n")
        # Convert to 0-based indexing and ensure bounds
        start_idx = max(0, start_line - 1)
        end_idx = min(len(lines), end_line)

        return lines[start_idx:end_idx]
    except Exception:
        # Handle any parsing errors gracefully
        return []
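The index arithmetic above converts a 1-based inclusive range into a Python slice and clamps it to the file length. A short sketch of just that conversion, with `slice_lines` as a hypothetical name:

```python
def slice_lines(lines: list[str], start_line: int, end_line: int) -> list[str]:
    """Return lines start_line..end_line (1-based, inclusive), clamped to bounds."""
    start_idx = max(0, start_line - 1)  # 1-based start becomes a 0-based index
    end_idx = min(len(lines), end_line)  # inclusive end equals the exclusive slice stop
    return lines[start_idx:end_idx]


content = ["alpha", "beta", "gamma", "delta"]
middle = slice_lines(content, 2, 3)
past_end = slice_lines(content, 3, 99)
out_of_range = slice_lines(content, 5, 6)
```

Out-of-range requests return a shorter or empty list instead of raising, which matches the "empty list on error" contract.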

BlameAnalyzer

Analyzes Git blame information to determine target commits for hunks.

git_autosquash.blame_analyzer

Git blame analysis and target commit resolution.
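Blame output can point at commits from before the branch diverged. The analyzer restricts candidates to commits within the branch scope, which appears to mean those after the merge base. The sketch below shows that filtering step on sample data. `BlameLine` is a hypothetical stand-in for `BlameInfo`; only the `commit_hash` attribute is taken from the source, and the hashes are made up.

```python
from dataclasses import dataclass


@dataclass
class BlameLine:
    """Hypothetical stand-in for BlameInfo; only commit_hash comes from the source."""

    commit_hash: str
    line_number: int


def filter_to_branch(blame: list[BlameLine], branch_commits: set[str]) -> list[BlameLine]:
    """Keep blame entries whose commit belongs to the current branch."""
    return [info for info in blame if info.commit_hash in branch_commits]


blame = [
    BlameLine("aaa111", 10),  # made on the branch
    BlameLine("old999", 11),  # predates the merge base
    BlameLine("bbb222", 12),  # made on the branch
]
relevant = filter_to_branch(blame, {"aaa111", "bbb222"})
relevant_hashes = [info.commit_hash for info in relevant]
```

When the filtered list is empty, the analyzer falls back to other targeting methods instead of picking a commit outside the branch.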

Classes
TargetingMethod

Bases: Enum

Enum for different targeting methods used to resolve a hunk.

Source code in src/git_autosquash/blame_analyzer.py
class TargetingMethod(Enum):
    """Enum for different targeting methods used to resolve a hunk."""

    BLAME_MATCH = "blame_match"
    CONTEXTUAL_BLAME_MATCH = "contextual_blame_match"  # Found via context lines
    FALLBACK_NEW_FILE = "fallback_new_file"
    FALLBACK_EXISTING_FILE = "fallback_existing_file"
    FALLBACK_CONSISTENCY = (
        "fallback_consistency"  # Same target as previous hunk from file
    )
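Because each member has a string value, a targeting method can be restored from stored text with the standard `Enum` value lookup. The sketch below copies the enum so that it runs on its own; `is_fallback` is a hypothetical helper that relies only on the `FALLBACK_` naming convention visible above.

```python
from enum import Enum


class TargetingMethod(Enum):
    """Copy of the enum above so the snippet runs on its own."""

    BLAME_MATCH = "blame_match"
    CONTEXTUAL_BLAME_MATCH = "contextual_blame_match"
    FALLBACK_NEW_FILE = "fallback_new_file"
    FALLBACK_EXISTING_FILE = "fallback_existing_file"
    FALLBACK_CONSISTENCY = "fallback_consistency"


def is_fallback(method: TargetingMethod) -> bool:
    """Hypothetical helper: True for the FALLBACK_* members."""
    return method.name.startswith("FALLBACK_")


# Look a member up by its string value, then read the value back.
restored = TargetingMethod("contextual_blame_match")
round_trip = restored.value
```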
HunkTargetMapping dataclass

Maps a hunk to its target commit for squashing.

Source code in src/git_autosquash/blame_analyzer.py
@dataclass
class HunkTargetMapping:
    """Maps a hunk to its target commit for squashing."""

    hunk: DiffHunk
    target_commit: Optional[str]
    confidence: str  # 'high', 'medium', 'low'
    blame_info: List[BlameInfo]
    targeting_method: TargetingMethod = TargetingMethod.BLAME_MATCH
    fallback_candidates: Optional[List[str]] = (
        None  # List of commit hashes for fallback scenarios
    )
    needs_user_selection: bool = False  # True if user needs to choose from candidates
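A consumer of these mappings typically needs to separate hunks that already have a target from those that still require a decision. The sketch below shows one possible triage, assuming a mapping is resolved when `target_commit` is set and `needs_user_selection` is False. `MiniMapping` and `partition` are hypothetical; `file_path` stands in for the full `hunk` object.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class MiniMapping:
    """Hypothetical stand-in modeled on a subset of HunkTargetMapping."""

    file_path: str
    target_commit: Optional[str]
    needs_user_selection: bool = False


def partition(
    mappings: list[MiniMapping],
) -> tuple[list[MiniMapping], list[MiniMapping]]:
    """Split mappings into (resolved, needs_review)."""
    resolved, needs_review = [], []
    for m in mappings:
        if m.target_commit is not None and not m.needs_user_selection:
            resolved.append(m)
        else:
            needs_review.append(m)
    return resolved, needs_review


mappings = [
    MiniMapping("src/app.py", "abc1234"),
    MiniMapping("src/new_module.py", None, needs_user_selection=True),
    MiniMapping("README.md", None),
]
resolved, needs_review = partition(mappings)
```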
BlameAnalyzer

Analyzes git blame to determine target commits for hunks.

Source code in src/git_autosquash/blame_analyzer.py
class BlameAnalyzer:
    """Analyzes git blame to determine target commits for hunks."""

    def __init__(self, git_ops: GitOps, merge_base: str) -> None:
        """Initialize BlameAnalyzer.

        Args:
            git_ops: GitOps instance for running git commands
            merge_base: Merge base commit hash to limit scope
        """
        self.git_ops = git_ops
        self.merge_base = merge_base
        self.batch_ops = BatchGitOperations(git_ops, merge_base)
        self._branch_commits_cache: Optional[Set[str]] = None
        self._commit_timestamp_cache: Dict[str, int] = {}
        self._file_target_cache: Dict[str, str] = {}  # Track previous targets by file
        self._new_files_cache: Optional[Set[str]] = None
        self._file_line_count_cache: Dict[str, int] = {}  # Cache file line counts

    def analyze_hunks(self, hunks: List[DiffHunk]) -> List[HunkTargetMapping]:
        """Analyze hunks and determine target commits for each.

        Args:
            hunks: List of DiffHunk objects to analyze

        Returns:
            List of HunkTargetMapping objects with target commit information
        """
        mappings = []

        for hunk in hunks:
            mapping = self._analyze_single_hunk(hunk)
            mappings.append(mapping)

        return mappings

    def _analyze_single_hunk(self, hunk: DiffHunk) -> HunkTargetMapping:
        """Analyze a single hunk to determine its target commit.

        Args:
            hunk: DiffHunk to analyze

        Returns:
            HunkTargetMapping with target commit information
        """
        # Check if this is a new file
        if self._is_new_file(hunk.file_path):
            return self._create_fallback_mapping(
                hunk, TargetingMethod.FALLBACK_NEW_FILE
            )

        # Check for previous target from same file (consistency)
        if hunk.file_path in self._file_target_cache:
            previous_target = self._file_target_cache[hunk.file_path]
            return HunkTargetMapping(
                hunk=hunk,
                target_commit=previous_target,
                confidence="medium",
                blame_info=[],
                targeting_method=TargetingMethod.FALLBACK_CONSISTENCY,
                needs_user_selection=False,
            )

        # Try blame-based analysis
        if hunk.has_deletions:
            # Get blame for the old lines being modified/deleted
            blame_info = self._get_blame_for_old_lines(hunk)
        else:
            # Pure addition, look at surrounding context
            blame_info = self._get_blame_for_context(hunk)

        if not blame_info:
            # Try contextual blame scanning as a fallback before giving up
            contextual_result = self._try_contextual_blame_fallback(
                hunk, self._get_branch_commits()
            )
            if contextual_result:
                return contextual_result

            return self._create_fallback_mapping(
                hunk, TargetingMethod.FALLBACK_EXISTING_FILE
            )

        # Filter commits to only those within our branch scope
        branch_commits = self._get_branch_commits()
        relevant_blame = [
            info for info in blame_info if info.commit_hash in branch_commits
        ]

        if not relevant_blame:
            # Try contextual blame scanning as a fallback before giving up
            contextual_result = self._try_contextual_blame_fallback(
                hunk, branch_commits
            )
            if contextual_result:
                return contextual_result

            return self._create_fallback_mapping(
                hunk, TargetingMethod.FALLBACK_EXISTING_FILE, blame_info
            )

        # Group by commit and count occurrences
        commit_counts: Dict[str, int] = {}
        for info in relevant_blame:
            commit_counts[info.commit_hash] = commit_counts.get(info.commit_hash, 0) + 1

        # Find most frequent commit, break ties by recency (requirement: take most recent)
        most_frequent_commit, max_count = max(
            commit_counts.items(),
            key=lambda x: (x[1], self._get_commit_timestamp(x[0])),
        )

        total_lines = len(relevant_blame)
        confidence_ratio = max_count / total_lines

        if confidence_ratio >= 0.8:
            confidence = "high"
        elif confidence_ratio >= 0.5:
            confidence = "medium"
        else:
            confidence = "low"

        # Store successful target for file consistency
        self._file_target_cache[hunk.file_path] = most_frequent_commit

        return HunkTargetMapping(
            hunk=hunk,
            target_commit=most_frequent_commit,
            confidence=confidence,
            blame_info=relevant_blame,
            targeting_method=TargetingMethod.BLAME_MATCH,
            needs_user_selection=False,
        )

    def _get_blame_for_old_lines(self, hunk: DiffHunk) -> List[BlameInfo]:
        """Get blame information for lines being deleted/modified.

        Args:
            hunk: DiffHunk with deletions

        Returns:
            List of BlameInfo objects for the deleted lines
        """
        # Run blame on the file at HEAD (before changes)
        success, blame_output = self.git_ops._run_git_command(
            "blame",
            f"-L{hunk.old_start},{hunk.old_start + hunk.old_count - 1}",
            "HEAD",
            "--",
            hunk.file_path,
        )

        if not success:
            return []

        return self._parse_blame_output(blame_output)

    def _get_blame_for_context(self, hunk: DiffHunk) -> List[BlameInfo]:
        """Get blame information for context around an addition.

        Args:
            hunk: DiffHunk with additions

        Returns:
            List of BlameInfo objects for surrounding context
        """
        # For additions, we need to map the new coordinates back to old coordinates
        # The insertion happens at new_start, so we look around old_start
        context_lines = 3

        # For pure additions, old_start is where the insertion point was in HEAD
        start_line = max(1, hunk.old_start - context_lines)
        end_line = hunk.old_start + context_lines

        success, blame_output = self.git_ops._run_git_command(
            "blame", f"-L{start_line},{end_line}", "HEAD", "--", hunk.file_path
        )

        if not success:
            return []

        return self._parse_blame_output(blame_output)

    def _get_contextual_lines_for_hunk(
        self, hunk: DiffHunk, context_lines: int = CONTEXTUAL_BLAME_LINES
    ) -> List[int]:
        """Get meaningful (non-whitespace) line numbers around a hunk.

        Uses OLD coordinates since git blame operates on HEAD (pre-change) state.
        For pure additions, searches around the insertion point in the original file.

        Args:
            hunk: DiffHunk to get context for
            context_lines: Number of lines above/below to consider

        Returns:
            List of line numbers that are meaningful context lines in HEAD
        """
        context_lines = min(context_lines, MAX_CONTEXTUAL_BLAME_LINES)

        # Use OLD coordinates for git blame HEAD compatibility
        if hunk.has_deletions or hunk.old_count > 0:
            # For modifications/deletions, use old coordinates
            base_start = hunk.old_start
            base_count = hunk.old_count
        else:
            # For pure additions, use insertion point in original file
            base_start = hunk.old_start
            base_count = 0  # No lines existed at insertion point

        # Get file line count to respect boundaries
        file_line_count = self._get_file_line_count(hunk.file_path)

        # Calculate context range around the hunk
        if base_count > 0:
            # For modifications/deletions: search around existing lines
            start_line = max(1, base_start - context_lines)
            end_line = min(file_line_count, base_start + base_count + context_lines - 1)
        else:
            # For pure additions: search around insertion point
            start_line = max(1, base_start - context_lines)
            end_line = min(file_line_count, base_start + context_lines)

        # Get all potential context lines
        potential_lines = list(range(start_line, end_line + 1))

        # Filter out whitespace-only lines
        meaningful_lines = self._filter_meaningful_lines(
            hunk.file_path, potential_lines
        )

        return meaningful_lines

    def _try_contextual_blame_fallback(
        self, hunk: DiffHunk, branch_commits: Set[str]
    ) -> Optional[HunkTargetMapping]:
        """Try contextual blame as fallback and return mapping if successful.

        Args:
            hunk: DiffHunk to analyze
            branch_commits: Set of commit hashes within branch scope

        Returns:
            HunkTargetMapping if contextual blame finds targets, None otherwise
        """
        contextual_blame_info = self._get_contextual_blame(hunk)

        if not contextual_blame_info:
            return None

        # Filter contextual blame to branch commits
        contextual_relevant_blame = [
            info for info in contextual_blame_info if info.commit_hash in branch_commits
        ]

        if contextual_relevant_blame:
            # Found contextual matches - analyze them
            return self._create_contextual_mapping(hunk, contextual_relevant_blame)

        return None

    def _get_file_line_count(self, file_path: str) -> int:
        """Get total line count for a file with safe fallback strategies.

        Args:
            file_path: Path to file

        Returns:
            Number of lines in file
        """
        if file_path in self._file_line_count_cache:
            return self._file_line_count_cache[file_path]

        # Try multiple approaches in order of preference
        line_count = self._try_get_line_count_from_head(file_path)
        if line_count is None:
            line_count = self._try_get_line_count_from_working_tree(file_path)
        if line_count is None:
            line_count = self._get_conservative_line_count_from_diff(file_path)

        # Cache and return
        self._file_line_count_cache[file_path] = line_count
        return line_count

    def _try_get_line_count_from_head(self, file_path: str) -> Optional[int]:
        """Try to get line count from HEAD version of file.

        Args:
            file_path: Path to file

        Returns:
            Line count if successful, None otherwise
        """
        success, output = self.git_ops._run_git_command("show", f"HEAD:{file_path}")
        if success and output is not None:
            return len(output.split("\n")) if output else 1
        return None

    def _try_get_line_count_from_working_tree(self, file_path: str) -> Optional[int]:
        """Try to get line count from working tree version of file.

        Args:
            file_path: Path to file

        Returns:
            Line count if successful, None otherwise
        """
        import os

        try:
            full_path = os.path.join(self.git_ops.repo_path, file_path)
            if os.path.exists(full_path):
                with open(full_path, "r", encoding="utf-8", errors="ignore") as f:
                    return sum(1 for _ in f)
        except (OSError, IOError):
            pass
        return None

    def _get_conservative_line_count_from_diff(self, file_path: str) -> int:
        """Get conservative line count based on diff information.

        Args:
            file_path: Path to file

        Returns:
            Conservative estimate of line count
        """
        success, output = self.git_ops._run_git_command(
            "diff", f"{self.merge_base}..HEAD", "--", file_path
        )

        if success and output:
            # Parse diff to find highest line number mentioned
            max_line = 0
            for line in output.split("\n"):
                if line.startswith("@@"):
                    # Parse hunk header: @@ -old_start,old_count +new_start,new_count @@
                    import re

                    match = re.match(
                        r"@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@", line
                    )
                    if match:
                        new_start = int(match.group(3))
                        new_count = int(match.group(4)) if match.group(4) else 1
                        max_line = max(max_line, new_start + new_count - 1)

            if max_line > 0:
                return max_line + 10  # Add buffer for safety

        # Ultra-conservative fallback
        return 50

    def _filter_meaningful_lines(
        self, file_path: str, line_numbers: List[int]
    ) -> List[int]:
        """Filter out whitespace-only lines from a list of line numbers.

        Args:
            file_path: Path to file
            line_numbers: List of line numbers to check

        Returns:
            List of line numbers that contain meaningful content
        """
        if not line_numbers:
            return []

        meaningful_lines = []

        # Get the file content to check line contents
        success, file_content = self.git_ops._run_git_command(
            "show", f"HEAD:{file_path}"
        )
        if not success:
            # If we can't read the file, assume all lines are meaningful
            return line_numbers

        lines = file_content.split("\n")

        for line_num in line_numbers:
            if 1 <= line_num <= len(lines):
                line_content = lines[line_num - 1]  # Convert to 0-based indexing
                # Skip empty lines and whitespace-only lines
                if line_content.strip():
                    meaningful_lines.append(line_num)

        return meaningful_lines

    def _get_contextual_blame(self, hunk: DiffHunk) -> List[BlameInfo]:
        """Get blame information for meaningful context lines around a hunk.

        This is used as a fallback when primary blame analysis fails.
        It searches ±1 line (excluding whitespace) for blame information.
        Uses batch operations for improved performance.

        Args:
            hunk: DiffHunk to get contextual blame for

        Returns:
            List of BlameInfo objects from context lines
        """
        context_lines = self._get_contextual_lines_for_hunk(hunk)
        if not context_lines:
            return []

        # Convert individual lines to ranges for batch blame
        line_ranges = [(line_num, line_num) for line_num in context_lines]

        # Use batch blame operation
        batch_blame_info = self.batch_ops.batch_blame_lines(hunk.file_path, line_ranges)

        # Convert batch blame info to regular blame info and expand hashes
        all_blame_info = []
        short_hashes = []

        for line_num in context_lines:
            if line_num in batch_blame_info:
                batch_info = batch_blame_info[line_num]
                short_hashes.append(batch_info.commit_hash)

        # Batch expand hashes
        expanded_hashes = (
            self.batch_ops.batch_expand_hashes(short_hashes) if short_hashes else {}
        )

        # Create BlameInfo objects with expanded hashes
        for line_num in context_lines:
            if line_num in batch_blame_info:
                batch_info = batch_blame_info[line_num]
                expanded_hash = expanded_hashes.get(
                    batch_info.commit_hash, batch_info.commit_hash
                )

                blame_info = BlameInfo(
                    commit_hash=expanded_hash,
                    author=batch_info.author,
                    timestamp=batch_info.timestamp,
                    line_number=batch_info.line_number,
                    line_content=batch_info.line_content,
                )
                all_blame_info.append(blame_info)

        return all_blame_info

    def _get_blame_for_single_line(
        self, file_path: str, line_num: int
    ) -> List[BlameInfo]:
        """Get blame information for a single line.

        Args:
            file_path: Path to file
            line_num: Line number to get blame for

        Returns:
            List of BlameInfo objects (usually just one)
        """
        success, blame_output = self.git_ops._run_git_command(
            "blame", f"-L{line_num},{line_num}", "HEAD", "--", file_path
        )

        if not success:
            return []

        return self._parse_blame_output(blame_output)

    def _create_contextual_mapping(
        self, hunk: DiffHunk, contextual_blame: List[BlameInfo]
    ) -> HunkTargetMapping:
        """Create a mapping from contextual blame analysis.

        Args:
            hunk: DiffHunk that was analyzed
            contextual_blame: List of BlameInfo from context lines

        Returns:
            HunkTargetMapping with contextual target commit
        """
        # Group by commit and count occurrences (same logic as primary blame)
        commit_counts: Dict[str, int] = {}
        for info in contextual_blame:
            commit_counts[info.commit_hash] = commit_counts.get(info.commit_hash, 0) + 1

        # Find most frequent commit, break ties by recency
        most_frequent_commit, max_count = max(
            commit_counts.items(),
            key=lambda x: (x[1], self._get_commit_timestamp(x[0])),
        )

        total_lines = len(contextual_blame)
        confidence_ratio = max_count / total_lines

        # Contextual matches have slightly lower confidence than direct matches
        if confidence_ratio >= 0.8:
            confidence = "medium"  # Reduced from "high"
        elif confidence_ratio >= 0.5:
            confidence = "medium"
        else:
            confidence = "low"

        # Store successful target for file consistency
        self._file_target_cache[hunk.file_path] = most_frequent_commit

        return HunkTargetMapping(
            hunk=hunk,
            target_commit=most_frequent_commit,
            confidence=confidence,
            blame_info=contextual_blame,
            targeting_method=TargetingMethod.CONTEXTUAL_BLAME_MATCH,
            needs_user_selection=False,
        )

    def _parse_blame_output(self, blame_output: str) -> List[BlameInfo]:
        """Parse git blame output into BlameInfo objects with batch hash expansion.

        Args:
            blame_output: Raw git blame output

        Returns:
            List of parsed BlameInfo objects with expanded commit hashes
        """
        # First pass: collect all blame entries with short hashes
        raw_blame_infos = []
        short_hashes = []

        for line in blame_output.split("\n"):
            if not line.strip():
                continue

            # Parse blame line format:
            # commit_hash (author timestamp line_num) line_content
            match = re.match(
                r"^([a-f0-9]+)\s+\(([^)]+)\s+(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} [+-]\d{4})\s+(\d+)\)\s*(.*)",
                line,
            )
            if match:
                commit_hash = match.group(1)
                author = match.group(2).strip()
                timestamp = match.group(3)
                line_number = int(match.group(4))
                line_content = match.group(5)

                raw_blame_infos.append(
                    {
                        "commit_hash": commit_hash,
                        "author": author,
                        "timestamp": timestamp,
                        "line_number": line_number,
                        "line_content": line_content,
                    }
                )

                if len(commit_hash) < 40:  # Short hash
                    short_hashes.append(commit_hash)

        # Batch expand short hashes
        expanded_hashes = (
            self.batch_ops.batch_expand_hashes(short_hashes) if short_hashes else {}
        )

        # Second pass: create BlameInfo objects with expanded hashes
        blame_infos = []
        for raw_info in raw_blame_infos:
            commit_hash = str(raw_info["commit_hash"])
            full_commit_hash = expanded_hashes.get(commit_hash, commit_hash)

            blame_info = BlameInfo(
                commit_hash=full_commit_hash,
                author=str(raw_info["author"]),
                timestamp=str(raw_info["timestamp"]),
                line_number=int(raw_info["line_number"]),
                line_content=str(raw_info["line_content"]),
            )
            blame_infos.append(blame_info)

        return blame_infos

    def _get_branch_commits(self) -> Set[str]:
        """Get all commits on current branch since merge base.

        Returns:
            Set of commit hashes within branch scope
        """
        if self._branch_commits_cache is not None:
            return self._branch_commits_cache

        branch_commits = self.batch_ops.get_branch_commits()
        self._branch_commits_cache = set(branch_commits)
        return self._branch_commits_cache

    def _get_commit_timestamp(self, commit_hash: str) -> int:
        """Get timestamp of a commit for recency comparison.

        Args:
            commit_hash: Commit hash to get timestamp for

        Returns:
            Unix timestamp of the commit
        """
        # Use batch operations for better performance
        commit_info = self.batch_ops.batch_load_commit_info([commit_hash])
        if commit_hash in commit_info:
            return commit_info[commit_hash].timestamp
        return 0

    def get_commit_summary(self, commit_hash: str) -> str:
        """Get a short summary of a commit for display.

        Args:
            commit_hash: Commit hash to summarize

        Returns:
            Short commit summary (hash + subject)
        """
        commit_info = self.batch_ops.batch_load_commit_info([commit_hash])
        if commit_hash in commit_info:
            info = commit_info[commit_hash]
            return f"{info.short_hash} {info.subject}"
        return commit_hash[:8]

    def _is_new_file(self, file_path: str) -> bool:
        """Check if a file is new (didn't exist at merge-base).

        Args:
            file_path: Path to check

        Returns:
            True if file is new, False if it existed at merge-base
        """
        if self._new_files_cache is None:
            self._new_files_cache = self.batch_ops.get_new_files()

        return file_path in self._new_files_cache

    def _create_fallback_mapping(
        self,
        hunk: DiffHunk,
        method: TargetingMethod,
        blame_info: Optional[List[BlameInfo]] = None,
    ) -> HunkTargetMapping:
        """Create a fallback mapping that needs user selection.

        Args:
            hunk: DiffHunk to create mapping for
            method: Fallback method used
            blame_info: Optional blame info if available

        Returns:
            HunkTargetMapping with fallback candidates
        """
        candidates = self._get_fallback_candidates(hunk.file_path, method)

        return HunkTargetMapping(
            hunk=hunk,
            target_commit=None,
            confidence="low",
            blame_info=blame_info or [],
            targeting_method=method,
            fallback_candidates=candidates,
            needs_user_selection=True,
        )

    def _get_fallback_candidates(
        self, file_path: str, method: TargetingMethod
    ) -> List[str]:
        """Get prioritized list of candidate commits for fallback scenarios.

        Args:
            file_path: Path of the file being processed
            method: Fallback method to determine candidate ordering

        Returns:
            List of commit hashes ordered by priority
        """
        branch_commits = self._get_ordered_branch_commits()

        if method == TargetingMethod.FALLBACK_NEW_FILE:
            # For new files, just return recent commits first, merges last
            return branch_commits

        elif method == TargetingMethod.FALLBACK_EXISTING_FILE:
            # For existing files, prioritize commits that touched this file
            file_commits = self._get_commits_touching_file(file_path)
            other_commits = [c for c in branch_commits if c not in file_commits]
            return file_commits + other_commits

        return branch_commits

    def _get_ordered_branch_commits(self) -> List[str]:
        """Get branch commits ordered by recency, with merge commits last.

        Returns:
            List of commit hashes ordered by priority
        """
        branch_commits = list(self._get_branch_commits())
        if not branch_commits:
            return []

        # Use batch operations to get ordered commits
        ordered_commits = self.batch_ops.get_ordered_commits_by_recency(branch_commits)
        return [commit.commit_hash for commit in ordered_commits]

    def _get_commits_touching_file(self, file_path: str) -> List[str]:
        """Get commits that modified a specific file, ordered by recency.

        Args:
            file_path: Path to check for modifications

        Returns:
            List of commit hashes that touched the file
        """
        return self.batch_ops.get_commits_touching_file(file_path)

    def _is_merge_commit(self, commit_hash: str) -> bool:
        """Check if a commit is a merge commit.

        Args:
            commit_hash: Commit to check

        Returns:
            True if commit is a merge commit
        """
        commit_info = self.batch_ops.batch_load_commit_info([commit_hash])
        if commit_hash in commit_info:
            return commit_info[commit_hash].is_merge
        return False

    def set_target_for_file(self, file_path: str, target_commit: str) -> None:
        """Set target commit for a file to ensure consistency.

        Args:
            file_path: File path
            target_commit: Commit hash to use as target
        """
        self._file_target_cache[file_path] = target_commit

    def clear_file_cache(self) -> None:
        """Clear the file target cache for a fresh analysis."""
        self._file_target_cache.clear()
        self.batch_ops.clear_caches()
        self._branch_commits_cache = None
        self._new_files_cache = None
        self._file_line_count_cache.clear()
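The blame-line format that `_parse_blame_output` matches can be tried in isolation. The following is a minimal sketch rather than the library's code: the regular expression is copied from the listing above, while `parse_blame_line`, `BLAME_LINE`, and the sample line are hypothetical names and data introduced only for illustration.

```python
import re
from typing import Optional, Tuple

# Pattern copied from _parse_blame_output in the listing above:
# <hash> (<author> <YYYY-MM-DD HH:MM:SS +ZZZZ> <line_no>) <content>
BLAME_LINE = re.compile(
    r"^([a-f0-9]+)\s+\(([^)]+)\s+"
    r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} [+-]\d{4})\s+(\d+)\)\s*(.*)"
)


def parse_blame_line(line: str) -> Optional[Tuple[str, str, str, int, str]]:
    """Hypothetical helper: split one blame line into its five fields."""
    match = BLAME_LINE.match(line)
    if match is None:
        # Lines that do not fit the pattern are skipped, as in the source.
        return None
    commit_hash, author, timestamp, line_number, content = match.groups()
    return commit_hash, author.strip(), timestamp, int(line_number), content


# Made-up sample line in the shape the pattern expects.
sample = "a1b2c3d4 (Jane Doe 2024-01-15 10:30:00 +0000 12) return x"
parsed = parse_blame_line(sample)
```

Here `parsed` holds the short hash, the author, the timestamp string, the line number as an `int`, and the line content. In the source above, short hashes collected this way are expanded afterwards in one batch call.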
Functions
__init__(git_ops: GitOps, merge_base: str) -> None

Initialize BlameAnalyzer.

Parameters:

Name | Type | Description | Default
git_ops | GitOps | GitOps instance for running git commands | required
merge_base | str | Merge base commit hash to limit scope | required
Source code in src/git_autosquash/blame_analyzer.py
def __init__(self, git_ops: GitOps, merge_base: str) -> None:
    """Initialize BlameAnalyzer.

    Args:
        git_ops: GitOps instance for running git commands
        merge_base: Merge base commit hash to limit scope
    """
    self.git_ops = git_ops
    self.merge_base = merge_base
    self.batch_ops = BatchGitOperations(git_ops, merge_base)
    self._branch_commits_cache: Optional[Set[str]] = None
    self._commit_timestamp_cache: Dict[str, int] = {}
    self._file_target_cache: Dict[str, str] = {}  # Track previous targets by file
    self._new_files_cache: Optional[Set[str]] = None
    self._file_line_count_cache: Dict[str, int] = {}  # Cache file line counts
analyze_hunks(hunks: List[DiffHunk]) -> List[HunkTargetMapping]

Analyze hunks and determine target commits for each.

Parameters:

Name | Type | Description | Default
hunks | List[DiffHunk] | List of DiffHunk objects to analyze | required

Returns:

Type | Description
List[HunkTargetMapping] | List of HunkTargetMapping objects with target commit information

Source code in src/git_autosquash/blame_analyzer.py
def analyze_hunks(self, hunks: List[DiffHunk]) -> List[HunkTargetMapping]:
    """Analyze hunks and determine target commits for each.

    Args:
        hunks: List of DiffHunk objects to analyze

    Returns:
        List of HunkTargetMapping objects with target commit information
    """
    mappings = []

    for hunk in hunks:
        mapping = self._analyze_single_hunk(hunk)
        mappings.append(mapping)

    return mappings
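For each hunk, `analyze_hunks` delegates to `_analyze_single_hunk`, whose blame-based path is a majority vote with a recency tie-break. The sketch below mirrors the thresholds shown in the class listing (0.8 and 0.5) but is not the library's implementation: `pick_target` and the `commit_timestamps` dictionary are hypothetical stand-ins for the blame results and for `_get_commit_timestamp`.

```python
from collections import Counter
from typing import Dict, List, Tuple


def pick_target(
    blamed_commits: List[str], commit_timestamps: Dict[str, int]
) -> Tuple[str, str]:
    """Hypothetical helper: majority vote over the commits blamed for a hunk.

    Ties on count go to the commit with the larger (more recent) timestamp.
    Assumes blamed_commits is non-empty, as the source checks beforehand.
    """
    counts = Counter(blamed_commits)
    winner, votes = max(
        counts.items(),
        key=lambda item: (item[1], commit_timestamps.get(item[0], 0)),
    )
    ratio = votes / len(blamed_commits)
    if ratio >= 0.8:
        confidence = "high"
    elif ratio >= 0.5:
        confidence = "medium"
    else:
        confidence = "low"
    return winner, confidence


# Made-up commit names and timestamps for illustration.
timestamps = {"aaa": 100, "bbb": 200, "ccc": 300}
clear_majority = pick_target(["aaa"] * 4 + ["bbb"], timestamps)
tie = pick_target(["aaa", "bbb"], timestamps)
scattered = pick_target(["aaa", "bbb", "ccc"], timestamps)
```

With these inputs, four of five lines from one commit give high confidence, an even split resolves to the newer commit at medium confidence, and three single-line commits give low confidence. The contextual fallback in the source uses the same vote but caps the result at medium.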
get_commit_summary(commit_hash: str) -> str

Get a short summary of a commit for display.

Parameters:

Name | Type | Description | Default
commit_hash | str | Commit hash to summarize | required

Returns:

Type | Description
str | Short commit summary (hash + subject)

Source code in src/git_autosquash/blame_analyzer.py
def get_commit_summary(self, commit_hash: str) -> str:
    """Get a short summary of a commit for display.

    Args:
        commit_hash: Commit hash to summarize

    Returns:
        Short commit summary (hash + subject)
    """
    commit_info = self.batch_ops.batch_load_commit_info([commit_hash])
    if commit_hash in commit_info:
        info = commit_info[commit_hash]
        return f"{info.short_hash} {info.subject}"
    return commit_hash[:8]
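The fallback in `get_commit_summary` is easy to miss: when no commit information is loaded, the method returns the first eight characters of the hash instead of raising. A minimal sketch of that behaviour, assuming a plain dictionary in place of `batch_load_commit_info` (the `summarize` helper and the sample data are hypothetical):

```python
from typing import Dict, Tuple


def summarize(commit_hash: str, known: Dict[str, Tuple[str, str]]) -> str:
    """Hypothetical helper: 'short_hash subject' if known, else an 8-char prefix."""
    info = known.get(commit_hash)
    if info is None:
        # Same fallback as the source: truncate the hash for display.
        return commit_hash[:8]
    short_hash, subject = info
    return f"{short_hash} {subject}"


# Made-up commit data: full hash -> (short hash, subject line).
full_hash = "0123456789abcdef0123456789abcdef01234567"
known_commits = {full_hash: ("0123456", "Fix parser")}

found = summarize(full_hash, known_commits)
missing = summarize("fedcba9876543210", known_commits)
```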
set_target_for_file(file_path: str, target_commit: str) -> None

Set target commit for a file to ensure consistency.

Parameters:

Name | Type | Description | Default
file_path | str | File path | required
target_commit | str | Commit hash to use as target | required
Source code in src/git_autosquash/blame_analyzer.py
def set_target_for_file(self, file_path: str, target_commit: str) -> None:
    """Set target commit for a file to ensure consistency.

    Args:
        file_path: File path
        target_commit: Commit hash to use as target
    """
    self._file_target_cache[file_path] = target_commit
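The effect of `set_target_for_file` shows up on the next analysis of the same file: per the class listing, a cached target short-circuits blame analysis and is reported with the consistency targeting method. The sketch below is a simplified stand-in under stated assumptions, not the library's code: `FileTargetCache`, `resolve`, and the string labels are hypothetical, and the new-file check that precedes the cache lookup in the source is omitted.

```python
from typing import Callable, Dict, Tuple


class FileTargetCache:
    """Hypothetical stand-in for the per-file target cache."""

    def __init__(self) -> None:
        self._targets: Dict[str, str] = {}

    def set_target_for_file(self, file_path: str, target_commit: str) -> None:
        # Pin a target so later hunks in this file reuse it.
        self._targets[file_path] = target_commit

    def resolve(
        self, file_path: str, analyze: Callable[[str], str]
    ) -> Tuple[str, str]:
        """Return (target, method label); run analyze only on a cache miss."""
        if file_path in self._targets:
            return self._targets[file_path], "fallback_consistency"
        target = analyze(file_path)
        self._targets[file_path] = target  # remember the successful target
        return target, "blame_match"


cache = FileTargetCache()
cache.set_target_for_file("src/app.py", "abc123")
pinned = cache.resolve("src/app.py", lambda path: "unused")
first = cache.resolve("src/other.py", lambda path: "def456")
second = cache.resolve("src/other.py", lambda path: "unused")
```

The pinned file never reaches the analysis callback, and the second lookup for `src/other.py` reuses the target found by the first.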
clear_file_cache() -> None

Clear the file target cache for a fresh analysis.

Source code in src/git_autosquash/blame_analyzer.py
def clear_file_cache(self) -> None:
    """Clear the file target cache for a fresh analysis."""
    self._file_target_cache.clear()
    self.batch_ops.clear_caches()
    self._branch_commits_cache = None
    self._new_files_cache = None
    self._file_line_count_cache.clear()

RebaseManager

Orchestrates interactive rebase operations to apply approved hunks.

git_autosquash.rebase_manager

Interactive rebase manager for applying hunk mappings to historical commits.

Classes
RebaseConflictError

Bases: Exception

Raised when rebase encounters conflicts that need user resolution.

Source code in src/git_autosquash/rebase_manager.py
class RebaseConflictError(Exception):
    """Raised when rebase encounters conflicts that need user resolution."""

    def __init__(self, message: str, conflicted_files: List[str]) -> None:
        """Initialize conflict error.

        Args:
            message: Error message
            conflicted_files: List of files with conflicts
        """
        super().__init__(message)
        self.conflicted_files = conflicted_files
Functions
__init__(message: str, conflicted_files: List[str]) -> None

Initialize conflict error.

Parameters:

Name | Type | Description | Default
message | str | Error message | required
conflicted_files | List[str] | List of files with conflicts | required
Source code in src/git_autosquash/rebase_manager.py
def __init__(self, message: str, conflicted_files: List[str]) -> None:
    """Initialize conflict error.

    Args:
        message: Error message
        conflicted_files: List of files with conflicts
    """
    super().__init__(message)
    self.conflicted_files = conflicted_files
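Because the exception carries `conflicted_files` alongside the message, a caller can report each conflicting path. The sketch below redefines the class locally so that it runs without the package installed; `fake_squash`, `run`, and the file names are hypothetical, and how the real `RebaseManager` raises this error is not shown here.

```python
from typing import List


class RebaseConflictError(Exception):
    """Local copy of the documented exception so the sketch runs standalone."""

    def __init__(self, message: str, conflicted_files: List[str]) -> None:
        super().__init__(message)
        self.conflicted_files = conflicted_files


def fake_squash() -> bool:
    """Hypothetical stand-in for an operation that stops on conflicts."""
    raise RebaseConflictError(
        "Rebase stopped on conflicts", ["src/app.py", "README.md"]
    )


def run() -> List[str]:
    """Collect one report line per conflicted file, then the message."""
    try:
        fake_squash()
    except RebaseConflictError as error:
        lines = [f"conflict: {path}" for path in error.conflicted_files]
        return lines + [str(error)]
    return []


report = run()
```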
RebaseManager

Manages interactive rebase operations for squashing hunks to commits.

Source code in src/git_autosquash/rebase_manager.py
class RebaseManager:
    """Manages interactive rebase operations for squashing hunks to commits."""

    def __init__(self, git_ops: GitOps, merge_base: str) -> None:
        """Initialize rebase manager.

        Args:
            git_ops: Git operations handler
            merge_base: Merge base commit hash
        """
        self.git_ops = git_ops
        self.merge_base = merge_base
        self._stash_ref: Optional[str] = None
        self._original_branch: Optional[str] = None
        self._batch_ops: Optional[BatchGitOperations] = None

    def execute_squash(self, mappings: List[HunkTargetMapping]) -> bool:
        """Execute the squash operation for approved mappings.

        Args:
            mappings: List of approved hunk to commit mappings

        Returns:
            True if successful, False if user aborted

        Raises:
            RebaseConflictError: If conflicts occur during rebase
            subprocess.SubprocessError: If git operations fail
        """
        if not mappings:
            return True

        # Store original branch for cleanup
        self._original_branch = self.git_ops.get_current_branch()
        if not self._original_branch:
            raise ValueError("Cannot determine current branch")

        try:
            # Group hunks by target commit
            commit_hunks = self._group_hunks_by_commit(mappings)

            # Check working tree state and handle stashing if needed
            self._handle_working_tree_state()

            # Execute rebase for each target commit
            target_commits = self._get_commit_order(set(commit_hunks.keys()))
            print(
                f"DEBUG: Processing {len(target_commits)} target commits in order: {[c[:8] for c in target_commits]}"
            )

            for target_commit in target_commits:
                hunks = commit_hunks[target_commit]
                print(
                    f"DEBUG: Processing target commit {target_commit[:8]} with {len(hunks)} hunks"
                )
                success = self._apply_hunks_to_commit(target_commit, hunks)
                if not success:
                    print(f"DEBUG: Failed to apply hunks to commit {target_commit[:8]}")
                    return False
                print(
                    f"DEBUG: Successfully applied hunks to commit {target_commit[:8]}"
                )
                print("=" * 80)

            return True

        except Exception:
            # Cleanup on any error
            self._cleanup_on_error()
            raise

    def _group_hunks_by_commit(
        self, mappings: List[HunkTargetMapping]
    ) -> Dict[str, List[DiffHunk]]:
        """Group hunks by their target commit.

        Args:
            mappings: List of hunk to commit mappings

        Returns:
            Dictionary mapping commit hash to list of hunks
        """
        commit_hunks: Dict[str, List[DiffHunk]] = {}

        for mapping in mappings:
            if mapping.target_commit:
                commit_hash = mapping.target_commit
                if commit_hash not in commit_hunks:
                    commit_hunks[commit_hash] = []
                commit_hunks[commit_hash].append(mapping.hunk)

        return commit_hunks

    def _get_commit_order(self, commit_hashes: Set[str]) -> List[str]:
        """Get commits in git topological order (oldest first).

        Args:
            commit_hashes: Set of commit hashes to order

        Returns:
            List of commit hashes in git topological order (oldest first)
        """
        # Lazy initialize batch operations
        if self._batch_ops is None:
            self._batch_ops = BatchGitOperations(self.git_ops, self.merge_base)

        # Get all branch commits in topological order (newest first)
        all_branch_commits = self._batch_ops.get_branch_commits()

        # Filter to only the commits we need and reverse to get oldest first
        ordered_commits = []
        for commit_hash in reversed(all_branch_commits):
            if commit_hash in commit_hashes:
                ordered_commits.append(commit_hash)

        # Handle any commits not found in branch (shouldn't happen, but be safe)
        missing_commits = commit_hashes - set(ordered_commits)
        if missing_commits:
            ordered_commits.extend(sorted(missing_commits))

        return ordered_commits

    def _handle_working_tree_state(self) -> None:
        """Handle working tree state before rebase."""
        status = self.git_ops.get_working_tree_status()

        if not status["is_clean"]:
            # Stash any uncommitted changes
            result = self.git_ops.run_git_command(
                ["stash", "push", "-m", "git-autosquash temp stash"]
            )
            if result.returncode == 0:
                self._stash_ref = "stash@{0}"
            else:
                raise subprocess.SubprocessError(
                    f"Failed to stash changes: {result.stderr}"
                )

    def _apply_hunks_to_commit(self, target_commit: str, hunks: List[DiffHunk]) -> bool:
        """Apply hunks to a specific commit via interactive rebase.

        Args:
            target_commit: Target commit hash
            hunks: List of hunks to apply to this commit

        Returns:
            True if successful, False if user aborted
        """
        print(f"DEBUG: Applying {len(hunks)} hunks to commit {target_commit[:8]}")
        for i, hunk in enumerate(hunks):
            print(
                f"DEBUG: Hunk {i + 1}: {hunk.file_path} @@ {hunk.lines[0] if hunk.lines else 'empty'}"
            )

        # Start interactive rebase to edit the target commit
        print(f"DEBUG: Starting interactive rebase to edit {target_commit[:8]}")
        if not self._start_rebase_edit(target_commit):
            print("DEBUG: Failed to start rebase edit")
            return False

        print("DEBUG: Interactive rebase started successfully")

        # Check what commit we're actually at
        result = self.git_ops.run_git_command(["rev-parse", "HEAD"])
        if result.returncode == 0:
            current_head = result.stdout.strip()
            print(f"DEBUG: Current HEAD during rebase: {current_head[:8]}")
            print(f"DEBUG: Target commit: {target_commit[:8]}")
            if current_head != target_commit:
                print(
                    f"DEBUG: WARNING - HEAD mismatch! We're at {current_head[:8]} but expected {target_commit[:8]}"
                )

        # Check the actual file content at lines 87 and 111
        try:
            with open("shared/runtime/pyexec.c", "r") as f:
                lines = f.readlines()
                if len(lines) >= 87:
                    print(f"DEBUG: Line 87 content: '{lines[86].strip()}'")
                if len(lines) >= 111:
                    print(f"DEBUG: Line 111 content: '{lines[110].strip()}'")
        except Exception as e:
            print(f"DEBUG: Failed to read file content: {e}")

        try:
            # Create patch with corrected line numbers for target commit
            print("DEBUG: Applying patch with corrected line numbers")
            patch_content = self._create_corrected_patch_for_hunks(hunks, target_commit)
            print(
                f"DEBUG: Created corrected patch content ({len(patch_content)} chars):"
            )
            print("=" * 50)
            print(patch_content)
            print("=" * 50)
            self._apply_patch(patch_content)
            print("DEBUG: Patch applied successfully")

            # Amend the commit
            print("DEBUG: Amending commit with changes")
            self._amend_commit()
            print("DEBUG: Commit amended successfully")

            # Continue the rebase
            print("DEBUG: Continuing rebase")
            self._continue_rebase()
            print("DEBUG: Rebase continued successfully")

            return True

        except RebaseConflictError:
            # Let the exception propagate for user handling
            raise
        except Exception as e:
            # Abort rebase on unexpected errors
            print(f"DEBUG: Exception occurred during rebase: {e}")
            print(f"DEBUG: Exception type: {type(e)}")
            self._abort_rebase()
            raise subprocess.SubprocessError(f"Failed to apply changes: {e}") from e

    def _apply_hunks_directly(self, hunks: List[DiffHunk]) -> None:
        """Apply hunks directly to files by modifying the file content.

        Args:
            hunks: List of hunks to apply

        Raises:
            subprocess.SubprocessError: If file modifications fail
        """
        print(f"DEBUG: Starting direct application of {len(hunks)} hunks")

        # Group hunks by file
        files_to_hunks: Dict[str, List[DiffHunk]] = {}
        for hunk in hunks:
            if hunk.file_path not in files_to_hunks:
                files_to_hunks[hunk.file_path] = []
            files_to_hunks[hunk.file_path].append(hunk)

        for file_path, file_hunks in files_to_hunks.items():
            print(f"DEBUG: Processing file {file_path} with {len(file_hunks)} hunks")
            self._apply_hunks_to_file(file_path, file_hunks)

    def _apply_hunks_to_file(self, file_path: str, hunks: List[DiffHunk]) -> None:
        """Apply hunks directly to a specific file.

        Args:
            file_path: Path to the file to modify
            hunks: List of hunks to apply to this file

        Raises:
            subprocess.SubprocessError: If file modification fails
        """

        print(f"DEBUG: Reading current content of {file_path}")
        try:
            with open(file_path, "r") as f:
                content = f.read()
        except IOError as e:
            raise subprocess.SubprocessError(f"Failed to read {file_path}: {e}") from e

        print(f"DEBUG: Original file has {len(content.splitlines())} lines")

        # Apply each hunk's changes
        modified_content = content
        for i, hunk in enumerate(hunks):
            print(f"DEBUG: Applying hunk {i + 1} to {file_path}")
            modified_content = self._apply_single_hunk_to_content(
                modified_content, hunk
            )

        print(f"DEBUG: Modified file has {len(modified_content.splitlines())} lines")

        # Write the modified content back
        try:
            with open(file_path, "w") as f:
                f.write(modified_content)
            print(f"DEBUG: Successfully wrote modified content to {file_path}")
        except IOError as e:
            raise subprocess.SubprocessError(f"Failed to write {file_path}: {e}") from e

    def _apply_single_hunk_to_content(self, content: str, hunk: DiffHunk) -> str:
        """Apply a single hunk's changes to file content.

        Args:
            content: Original file content
            hunk: Hunk to apply

        Returns:
            Modified content with hunk changes applied
        """
        # For this specific case, we know the hunks are simple replacements
        # MICROPY_PY___FILE__ -> MICROPY_MODULE___FILE__

        # Extract the actual change from the hunk lines
        old_pattern = None
        new_replacement = None

        for line in hunk.lines:
            if line.startswith("-") and "MICROPY_PY___FILE__" in line:
                # Extract the line without the '-' prefix and clean whitespace
                old_pattern = line[1:].strip()
            elif line.startswith("+") and "MICROPY_MODULE___FILE__" in line:
                # Extract the line without the '+' prefix and clean whitespace
                new_replacement = line[1:].strip()

        if old_pattern and new_replacement:
            print(f"DEBUG: Replacing '{old_pattern}' with '{new_replacement}'")
            # Use exact string replacement
            modified = content.replace(old_pattern, new_replacement)
            if modified != content:
                print("DEBUG: Successfully applied replacement")
                return modified
            else:
                print("DEBUG: Warning: No replacement made - pattern not found")

        print("DEBUG: Could not extract clear replacement pattern from hunk")
        return content

    def _consolidate_hunks_by_file(
        self, hunks: List[DiffHunk]
    ) -> Dict[str, List[DiffHunk]]:
        """Group hunks by file and detect potential conflicts."""
        files_to_hunks: Dict[str, List[DiffHunk]] = {}
        for hunk in hunks:
            if hunk.file_path not in files_to_hunks:
                files_to_hunks[hunk.file_path] = []
            files_to_hunks[hunk.file_path].append(hunk)
        return files_to_hunks

    def _extract_hunk_changes(self, hunk: DiffHunk) -> List[Dict]:
        """Extract all changes from a hunk, handling multiple changes per hunk.

        Returns:
            List of change dictionaries with 'old_line', 'new_line', and 'context'
        """
        changes = []
        current_change = {}

        for line in hunk.lines:
            if line.startswith("@@"):
                continue
            elif line.startswith("-") and not line.startswith("---"):
                current_change["old_line"] = line[1:].rstrip("\n")
            elif line.startswith("+") and not line.startswith("+++"):
                current_change["new_line"] = line[1:].rstrip("\n")
                # If we have both old and new, add the change
                if "old_line" in current_change:
                    changes.append(current_change.copy())
                    current_change = {}

        return changes

    def _find_target_with_context(
        self, change: Dict, file_lines: List[str], used_lines: Set[int]
    ) -> Optional[int]:
        """Find target line using context awareness to avoid duplicates.

        Args:
            change: Dictionary with 'old_line' and 'new_line'
            file_lines: Current file content
            used_lines: Set of line numbers already processed

        Returns:
            Target line number (1-based) or None if not found
        """
        old_line = change["old_line"].strip()
        candidates = []

        # Find all possible matches
        for i, file_line in enumerate(file_lines):
            line_num = i + 1  # 1-based
            if (
                file_line.rstrip("\n").strip() == old_line
                and line_num not in used_lines
            ):
                candidates.append(line_num)

        if not candidates:
            print(f"DEBUG: No unused matches found for line: '{old_line}'")
            return None

        if len(candidates) == 1:
            print(f"DEBUG: Found unique match at line {candidates[0]}")
            return candidates[0]

        # Multiple candidates - this is where we had the issue before
        print(f"DEBUG: Multiple candidates for '{old_line}': {candidates}")
        print(f"DEBUG: Used lines: {sorted(used_lines)}")

        # For now, use the first unused candidate
        # TODO: Could add more sophisticated context matching here
        selected = candidates[0]
        print(f"DEBUG: Selected first unused candidate: {selected}")
        return selected

    def _create_corrected_patch_for_hunks(
        self, hunks: List[DiffHunk], target_commit: str
    ) -> str:
        """Create a patch with line numbers corrected for the target commit state.
        Uses context-aware matching to avoid duplicate hunk conflicts.

        Args:
            hunks: List of hunks to include in patch
            target_commit: Target commit hash

        Returns:
            Patch content with corrected line numbers
        """
        print(
            f"DEBUG: Creating corrected patch for {len(hunks)} hunks targeting {target_commit[:8]}"
        )

        # Group hunks by file
        files_to_hunks: Dict[str, List[DiffHunk]] = self._consolidate_hunks_by_file(hunks)

        patch_lines = []

        for file_path, file_hunks in files_to_hunks.items():
            print(f"DEBUG: Processing {len(file_hunks)} hunks for file {file_path}")

            # Add file header
            patch_lines.extend([f"--- a/{file_path}", f"+++ b/{file_path}"])

            # Read the current file content to find correct line numbers
            try:
                with open(file_path, "r") as f:
                    file_lines = f.readlines()
                print(f"DEBUG: Read {len(file_lines)} lines from {file_path}")
            except IOError as e:
                print(f"DEBUG: Failed to read {file_path}: {e}")
                continue

            # Track which lines we've already used to prevent duplicates
            used_lines: Set[int] = set()

            # Extract all changes from all hunks for this file
            all_changes = []
            for hunk in file_hunks:
                changes = self._extract_hunk_changes(hunk)
                for change in changes:
                    change["original_hunk"] = hunk
                    all_changes.append(change)

            print(f"DEBUG: Extracted {len(all_changes)} total changes for {file_path}")

            # Process each change with context awareness
            for change in all_changes:
                target_line_num = self._find_target_with_context(
                    change, file_lines, used_lines
                )
                if target_line_num:
                    used_lines.add(target_line_num)
                    corrected_hunk = self._create_corrected_hunk_for_change(
                        change, target_line_num, file_lines
                    )
                    if corrected_hunk:
                        patch_lines.extend(corrected_hunk)

        return "\n".join(patch_lines) + "\n"

    def _create_corrected_hunk_for_change(
        self, change: Dict, target_line_num: int, file_lines: List[str]
    ) -> List[str]:
        """Create a corrected hunk for a single change at a specific line number.

        Args:
            change: Dictionary with 'old_line' and 'new_line'
            target_line_num: Target line number (1-based)
            file_lines: Current file content

        Returns:
            List of hunk lines for this change
        """
        new_line = change["new_line"]

        # Create context around the target line (3 lines before and after)
        context_start = max(1, target_line_num - 3)
        context_end = min(len(file_lines), target_line_num + 3)

        print(
            f"DEBUG: Creating hunk for change at line {target_line_num}, context {context_start}-{context_end}"
        )

        # Build the hunk header
        old_count = context_end - context_start + 1
        new_count = old_count  # Same count since we're replacing one line
        hunk_lines = []
        hunk_lines.append(
            f"@@ -{context_start},{old_count} +{context_start},{new_count} @@ "
        )

        # Build the hunk content
        for line_num in range(context_start, context_end + 1):
            if line_num > len(file_lines):
                break

            file_line = file_lines[line_num - 1].rstrip(
                "\n"
            )  # Convert to 0-based and remove newline

            if line_num == target_line_num:
                # This is the line to change
                hunk_lines.append(f"-{file_line}")
                hunk_lines.append(f"+{new_line}")
            else:
                # Context line
                hunk_lines.append(f" {file_line}")

        return hunk_lines

    def _create_corrected_hunk(
        self, hunk: DiffHunk, file_lines: List[str], file_path: str
    ) -> List[str]:
        """Create a corrected hunk with proper line numbers for the current file state.

        Args:
            hunk: Original hunk
            file_lines: Current file content as list of lines
            file_path: Path to the file

        Returns:
            List of corrected hunk lines
        """
        # Extract the old and new content from the hunk
        old_line = None
        new_line = None

        for line in hunk.lines:
            if line.startswith("-") and "MICROPY_PY___FILE__" in line:
                old_line = line[1:].rstrip("\n")  # Remove '-' and trailing newline
            elif line.startswith("+") and "MICROPY_MODULE___FILE__" in line:
                new_line = line[1:].rstrip("\n")  # Remove '+' and trailing newline

        if not old_line or not new_line:
            print("DEBUG: Could not extract old/new lines from hunk")
            return []

        print(f"DEBUG: Looking for line: '{old_line.strip()}'")

        # Find the line number in the current file
        target_line_num = None
        for i, file_line in enumerate(file_lines):
            if file_line.rstrip("\n").strip() == old_line.strip():
                target_line_num = i + 1  # Convert to 1-based line numbering
                print(f"DEBUG: Found target line at line {target_line_num}")
                break

        if target_line_num is None:
            print("DEBUG: Could not find target line in current file")
            return []

        # Create context around the target line (3 lines before and after)
        context_start = max(1, target_line_num - 3)
        context_end = min(len(file_lines), target_line_num + 3)

        print(
            f"DEBUG: Creating hunk for lines {context_start}-{context_end}, changing line {target_line_num}"
        )

        # Build the hunk
        hunk_lines = []
        hunk_lines.append(
            f"@@ -{context_start},{context_end - context_start + 1} +{context_start},{context_end - context_start + 1} @@ "
        )

        for line_num in range(context_start, context_end + 1):
            if line_num > len(file_lines):
                break

            file_line = file_lines[line_num - 1].rstrip(
                "\n"
            )  # Convert to 0-based and remove newline

            if line_num == target_line_num:
                # This is the line to change
                hunk_lines.append(f"-{file_line}")
                hunk_lines.append(f"+{new_line}")
            else:
                # Context line
                hunk_lines.append(f" {file_line}")

        return hunk_lines

    def _create_patch_for_hunks(self, hunks: List[DiffHunk]) -> str:
        """Create a patch string from a list of hunks.

        Args:
            hunks: List of hunks to include in patch

        Returns:
            Patch content as string
        """
        print(f"DEBUG: Creating patch for {len(hunks)} hunks")
        patch_lines = []
        current_file = None

        for hunk in hunks:
            print(f"DEBUG: Processing hunk for file {hunk.file_path}")
            print(f"DEBUG: Hunk has {len(hunk.lines)} lines")
            if hunk.lines:
                print(f"DEBUG: First line: {hunk.lines[0]}")
                print(f"DEBUG: Last line: {hunk.lines[-1]}")

            # Add file header if this is a new file
            if hunk.file_path != current_file:
                current_file = hunk.file_path
                patch_lines.extend(
                    [f"--- a/{hunk.file_path}", f"+++ b/{hunk.file_path}"]
                )
                print(f"DEBUG: Added file header for {hunk.file_path}")

            # Add hunk content
            patch_lines.extend(hunk.lines)
            print(f"DEBUG: Added {len(hunk.lines)} lines from hunk")

        patch_content = "\n".join(patch_lines) + "\n"
        print(f"DEBUG: Final patch content ({len(patch_content)} chars):")
        return patch_content

    def _start_rebase_edit(self, target_commit: str) -> bool:
        """Start interactive rebase to edit target commit.

        Args:
            target_commit: Commit to edit

        Returns:
            True if rebase started successfully
        """
        # Create rebase todo that marks target commit for editing
        todo_content = f"edit {target_commit}\n"

        # Write todo to temporary file
        with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
            f.write(todo_content)
            todo_file = f.name

        try:
            # Set git editor to use our todo file
            env = os.environ.copy()
            env["GIT_SEQUENCE_EDITOR"] = f"cp {todo_file}"

            # Start interactive rebase
            result = self.git_ops.run_git_command(
                ["rebase", "-i", f"{target_commit}^"], env=env
            )

            if result.returncode != 0:
                # Rebase failed to start
                return False

            return True

        finally:
            # Clean up temp file
            try:
                os.unlink(todo_file)
            except OSError:
                pass

    def _apply_patch(self, patch_content: str) -> None:
        """Apply patch content to working directory.

        Args:
            patch_content: Patch content to apply
        """
        # Write patch to temporary file
        with tempfile.NamedTemporaryFile(mode="w", suffix=".patch", delete=False) as f:
            f.write(patch_content)
            patch_file = f.name
            print(f"DEBUG: Wrote patch to temporary file: {patch_file}")

        try:
            # Apply patch using git apply with 3-way merge and fuzzy matching for better context handling
            print(
                f"DEBUG: Running git apply --3way --ignore-space-change --whitespace=nowarn {patch_file}"
            )
            result = self.git_ops.run_git_command(
                [
                    "apply",
                    "--3way",
                    "--ignore-space-change",
                    "--whitespace=nowarn",
                    patch_file,
                ]
            )
            print(f"DEBUG: git apply returned code: {result.returncode}")
            print(f"DEBUG: git apply stdout: {result.stdout}")
            print(f"DEBUG: git apply stderr: {result.stderr}")

            if result.returncode != 0:
                # Check if there are conflicts
                print("DEBUG: Patch application failed, checking for conflicts")
                conflicted_files = self._get_conflicted_files()
                print(f"DEBUG: Conflicted files: {conflicted_files}")
                if conflicted_files:
                    raise RebaseConflictError(
                        f"Patch application failed with conflicts: {result.stderr}",
                        conflicted_files,
                    )
                else:
                    raise subprocess.SubprocessError(
                        f"Patch application failed: {result.stderr}"
                    )

        finally:
            # Clean up temp file
            try:
                os.unlink(patch_file)
            except OSError:
                pass

    def _amend_commit(self) -> None:
        """Amend the current commit with changes, handling pre-commit hook modifications."""
        # Stage all changes
        result = self.git_ops.run_git_command(["add", "."])
        if result.returncode != 0:
            raise subprocess.SubprocessError(
                f"Failed to stage changes: {result.stderr}"
            )

        # Attempt to amend commit (keep original message)
        result = self.git_ops.run_git_command(["commit", "--amend", "--no-edit"])
        if result.returncode != 0:
            # Check if the failure was due to pre-commit hook modifications
            if "files were modified by this hook" in result.stderr:
                print(
                    "DEBUG: Pre-commit hook modified files, re-staging and retrying commit"
                )
                # Re-stage all changes after hook modifications
                stage_result = self.git_ops.run_git_command(["add", "."])
                if stage_result.returncode != 0:
                    raise subprocess.SubprocessError(
                        f"Failed to re-stage hook modifications: {stage_result.stderr}"
                    )
                # Retry the amend with hook modifications included
                retry_result = self.git_ops.run_git_command(
                    ["commit", "--amend", "--no-edit"]
                )
                if retry_result.returncode != 0:
                    raise subprocess.SubprocessError(
                        f"Failed to amend commit after hook modifications: {retry_result.stderr}"
                    )
                print(
                    "DEBUG: Successfully amended commit with pre-commit hook modifications"
                )
            else:
                raise subprocess.SubprocessError(
                    f"Failed to amend commit: {result.stderr}"
                )

    def _continue_rebase(self) -> None:
        """Continue the interactive rebase."""
        result = self.git_ops.run_git_command(["rebase", "--continue"])
        if result.returncode != 0:
            # Check for conflicts
            conflicted_files = self._get_conflicted_files()
            if conflicted_files:
                raise RebaseConflictError(
                    f"Rebase conflicts detected: {result.stderr}", conflicted_files
                )
            else:
                raise subprocess.SubprocessError(
                    f"Failed to continue rebase: {result.stderr}"
                )

    def _abort_rebase(self) -> None:
        """Abort the current rebase."""
        try:
            self.git_ops.run_git_command(["rebase", "--abort"])
        except subprocess.SubprocessError:
            # Ignore errors during abort
            pass

    def _get_conflicted_files(self) -> List[str]:
        """Get list of files with merge conflicts.

        Returns:
            List of file paths with conflicts
        """
        try:
            result = self.git_ops.run_git_command(
                ["diff", "--name-only", "--diff-filter=U"]
            )
            if result.returncode == 0:
                return [
                    line.strip() for line in result.stdout.split("\n") if line.strip()
                ]
        except subprocess.SubprocessError:
            pass

        return []

    def _cleanup_on_error(self) -> None:
        """Cleanup state after error."""
        # Abort any active rebase
        self._abort_rebase()

        # Restore stash if we created one
        if self._stash_ref:
            try:
                self.git_ops.run_git_command(["stash", "pop", self._stash_ref])
            except subprocess.SubprocessError:
                # Stash pop failed, but don't raise - user can manually recover
                pass
            finally:
                self._stash_ref = None

    def abort_operation(self) -> None:
        """Abort the current squash operation and restore original state."""
        self._cleanup_on_error()

    def is_rebase_in_progress(self) -> bool:
        """Check if a rebase is currently in progress.

        Returns:
            True if rebase is active
        """
        try:
            result = self.git_ops.run_git_command(["status", "--porcelain=v2"])
            if result.returncode == 0:
                # Look for rebase status indicators
                lines = result.stdout.split("\n")
                for line in lines:
                    if line.startswith("# rebase"):
                        return True
        except subprocess.SubprocessError:
            pass

        return False

    def get_rebase_status(self) -> Dict[str, Any]:
        """Get current rebase status information.

        Returns:
            Dictionary with rebase status details
        """
        status: Dict[str, Any] = {
            "in_progress": False,
            "current_commit": None,
            "conflicted_files": [],
            "step": None,
            "total_steps": None,
        }

        if not self.is_rebase_in_progress():
            return status

        status["in_progress"] = True
        status["conflicted_files"] = self._get_conflicted_files()

        # Try to get rebase step info
        try:
            rebase_dir = os.path.join(self.git_ops.repo_path, ".git", "rebase-merge")
            if os.path.exists(rebase_dir):
                # Read step info
                msgnum_file = os.path.join(rebase_dir, "msgnum")
                end_file = os.path.join(rebase_dir, "end")

                if os.path.exists(msgnum_file) and os.path.exists(end_file):
                    with open(msgnum_file, "r") as f:
                        status["step"] = int(f.read().strip())
                    with open(end_file, "r") as f:
                        status["total_steps"] = int(f.read().strip())
        except (OSError, ValueError):
            pass

        return status
Functions
__init__(git_ops: GitOps, merge_base: str) -> None

Initialize rebase manager.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| git_ops | GitOps | Git operations handler | required |
| merge_base | str | Merge base commit hash | required |
Source code in src/git_autosquash/rebase_manager.py
def __init__(self, git_ops: GitOps, merge_base: str) -> None:
    """Initialize rebase manager.

    Args:
        git_ops: Git operations handler
        merge_base: Merge base commit hash
    """
    self.git_ops = git_ops
    self.merge_base = merge_base
    self._stash_ref: Optional[str] = None
    self._original_branch: Optional[str] = None
    self._batch_ops: Optional[BatchGitOperations] = None
execute_squash(mappings: List[HunkTargetMapping]) -> bool

Execute the squash operation for approved mappings.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| mappings | List[HunkTargetMapping] | List of approved hunk to commit mappings | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if successful, False if user aborted |

Raises:

| Type | Description |
| --- | --- |
| RebaseConflictError | If conflicts occur during rebase |
| SubprocessError | If git operations fail |

Source code in src/git_autosquash/rebase_manager.py
def execute_squash(self, mappings: List[HunkTargetMapping]) -> bool:
    """Execute the squash operation for approved mappings.

    Args:
        mappings: List of approved hunk to commit mappings

    Returns:
        True if successful, False if user aborted

    Raises:
        RebaseConflictError: If conflicts occur during rebase
        subprocess.SubprocessError: If git operations fail
    """
    if not mappings:
        return True

    # Store original branch for cleanup
    self._original_branch = self.git_ops.get_current_branch()
    if not self._original_branch:
        raise ValueError("Cannot determine current branch")

    try:
        # Group hunks by target commit
        commit_hunks = self._group_hunks_by_commit(mappings)

        # Check working tree state and handle stashing if needed
        self._handle_working_tree_state()

        # Execute rebase for each target commit
        target_commits = self._get_commit_order(set(commit_hunks.keys()))
        print(
            f"DEBUG: Processing {len(target_commits)} target commits in order: {[c[:8] for c in target_commits]}"
        )

        for target_commit in target_commits:
            hunks = commit_hunks[target_commit]
            print(
                f"DEBUG: Processing target commit {target_commit[:8]} with {len(hunks)} hunks"
            )
            success = self._apply_hunks_to_commit(target_commit, hunks)
            if not success:
                print(f"DEBUG: Failed to apply hunks to commit {target_commit[:8]}")
                return False
            print(
                f"DEBUG: Successfully applied hunks to commit {target_commit[:8]}"
            )
            print("=" * 80)

        return True

    except Exception:
        # Cleanup on any error
        self._cleanup_on_error()
        raise
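
The first step of `execute_squash` groups the approved hunks by their target commit; the `_group_hunks_by_commit` helper itself is not listed on this page. The following is a minimal sketch of what such grouping might look like, not the project's implementation. `DemoHunk`, `DemoMapping`, and `group_hunks_by_commit` are hypothetical stand-ins that only mirror the `hunk` and `target_commit` attributes used elsewhere in this reference.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class DemoHunk:
    """Hypothetical stand-in for the real hunk type."""

    file_path: str


@dataclass
class DemoMapping:
    """Hypothetical stand-in for HunkTargetMapping."""

    hunk: DemoHunk
    target_commit: Optional[str]


def group_hunks_by_commit(mappings: List[DemoMapping]) -> Dict[str, List[DemoHunk]]:
    """Collect hunks under the commit hash they should be squashed into."""
    grouped: Dict[str, List[DemoHunk]] = defaultdict(list)
    for mapping in mappings:
        if mapping.target_commit:  # mappings without a target are skipped
            grouped[mapping.target_commit].append(mapping.hunk)
    return dict(grouped)


demo = [
    DemoMapping(DemoHunk("a.py"), "abc12345"),
    DemoMapping(DemoHunk("b.py"), "def67890"),
    DemoMapping(DemoHunk("c.py"), "abc12345"),
    DemoMapping(DemoHunk("d.py"), None),
]
grouped = group_hunks_by_commit(demo)
```

Grouping first means each target commit is visited once, with all of its hunks applied together, rather than once per hunk.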
abort_operation() -> None

Abort the current squash operation and restore original state.

Source code in src/git_autosquash/rebase_manager.py
def abort_operation(self) -> None:
    """Abort the current squash operation and restore original state."""
    self._cleanup_on_error()
is_rebase_in_progress() -> bool

Check if a rebase is currently in progress.

Returns:

| Type | Description |
| --- | --- |
| bool | True if rebase is active |

Source code in src/git_autosquash/rebase_manager.py
def is_rebase_in_progress(self) -> bool:
    """Check if a rebase is currently in progress.

    Returns:
        True if rebase is active
    """
    try:
        result = self.git_ops.run_git_command(["status", "--porcelain=v2"])
        if result.returncode == 0:
            # Look for rebase status indicators
            lines = result.stdout.split("\n")
            for line in lines:
                if line.startswith("# rebase"):
                    return True
    except subprocess.SubprocessError:
        pass

    return False
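
The method above scans `git status --porcelain=v2` output for a line starting with `# rebase`. The header lines documented for that format are `# branch.*` (with `--branch`) and `# stash` (with `--show-stash`), so it may be worth verifying that this check fires on your Git version. As an alternative sketch, assuming the caller already knows the repository's git directory, one can look for the state directories Git keeps while a rebase is unfinished. `rebase_in_progress` is a hypothetical helper, not part of git-autosquash.

```python
import tempfile
from pathlib import Path


def rebase_in_progress(git_dir: Path) -> bool:
    """Return True if git_dir holds state from an unfinished rebase.

    Git keeps rebase state in rebase-merge/ (merge backend, including
    interactive rebases) or rebase-apply/ (apply backend). Note that
    rebase-apply/ is also used by `git am`.
    """
    return any((git_dir / name).is_dir() for name in ("rebase-merge", "rebase-apply"))


# Demonstrate on a throwaway directory instead of a real repository.
with tempfile.TemporaryDirectory() as tmp:
    demo_git_dir = Path(tmp)
    before = rebase_in_progress(demo_git_dir)
    (demo_git_dir / "rebase-merge").mkdir()
    after = rebase_in_progress(demo_git_dir)
```

In a real repository the git directory can be resolved with `git rev-parse --git-dir`, which also works in linked worktrees.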
get_rebase_status() -> Dict[str, Any]

Get current rebase status information.

Returns:

| Type | Description |
| --- | --- |
| Dict[str, Any] | Dictionary with rebase status details |

Source code in src/git_autosquash/rebase_manager.py
def get_rebase_status(self) -> Dict[str, Any]:
    """Get current rebase status information.

    Returns:
        Dictionary with rebase status details
    """
    status: Dict[str, Any] = {
        "in_progress": False,
        "current_commit": None,
        "conflicted_files": [],
        "step": None,
        "total_steps": None,
    }

    if not self.is_rebase_in_progress():
        return status

    status["in_progress"] = True
    status["conflicted_files"] = self._get_conflicted_files()

    # Try to get rebase step info
    try:
        rebase_dir = os.path.join(self.git_ops.repo_path, ".git", "rebase-merge")
        if os.path.exists(rebase_dir):
            # Read step info
            msgnum_file = os.path.join(rebase_dir, "msgnum")
            end_file = os.path.join(rebase_dir, "end")

            if os.path.exists(msgnum_file) and os.path.exists(end_file):
                with open(msgnum_file, "r") as f:
                    status["step"] = int(f.read().strip())
                with open(end_file, "r") as f:
                    status["total_steps"] = int(f.read().strip())
    except (OSError, ValueError):
        pass

    return status
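
The step counters come from two small files, `msgnum` and `end`, inside the `rebase-merge` directory. The sketch below isolates that parsing so it can be exercised without a repository; `read_rebase_progress` is a hypothetical helper, and unlike the method above it takes the directory as an argument rather than assuming it lives at `<repo>/.git/rebase-merge`. That assumption does not hold in linked worktrees, where `.git` is a file; `git rev-parse --git-path rebase-merge` resolves the location in either case.

```python
import tempfile
from pathlib import Path
from typing import Optional, Tuple


def read_rebase_progress(rebase_dir: Path) -> Tuple[Optional[int], Optional[int]]:
    """Return (step, total_steps) from a rebase-merge directory.

    Returns (None, None) if either file is missing or does not hold an integer.
    """
    try:
        step = int((rebase_dir / "msgnum").read_text().strip())
        total = int((rebase_dir / "end").read_text().strip())
    except (OSError, ValueError):
        return None, None
    return step, total


# Demonstrate on a throwaway directory with hand-written counter files.
with tempfile.TemporaryDirectory() as tmp:
    demo_dir = Path(tmp)
    missing = read_rebase_progress(demo_dir)
    (demo_dir / "msgnum").write_text("2\n")
    (demo_dir / "end").write_text("5\n")
    progress = read_rebase_progress(demo_dir)
```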

CLI Entry Point

Main

Command-line interface and entry point for git-autosquash.

git_autosquash.main

CLI entry point for git-autosquash.

Functions
main() -> None

Main entry point for git-autosquash command.

Source code in src/git_autosquash/main.py
def main() -> None:
    """Main entry point for git-autosquash command."""
    parser = argparse.ArgumentParser(
        prog="git-autosquash",
        description="Automatically squash changes back into historical commits",
    )
    parser.add_argument(
        "--line-by-line",
        action="store_true",
        help="Use line-by-line hunk splitting instead of default git hunks",
    )
    parser.add_argument(
        "--auto-accept",
        action="store_true",
        help="Automatically accept all hunks with blame-identified targets, bypass TUI",
    )
    parser.add_argument(
        "--version",
        action="version",
        version=f"%(prog)s {__version__}",
    )

    # Add strategy management subcommands
    subparsers = parser.add_subparsers(dest="command", help="Available commands")

    # Import here to avoid circular imports
    from git_autosquash.cli_strategy import add_strategy_subcommands

    add_strategy_subcommands(subparsers)

    args = parser.parse_args()

    # Handle strategy subcommands
    if hasattr(args, "func"):
        sys.exit(args.func(args))

    try:
        git_ops = GitOps()

        # Phase 1: Validate git repository
        if not git_ops.is_git_repo():
            error = RepositoryStateError(
                "Not in a git repository",
                recovery_suggestion="Run this command from within a git repository",
            )
            ErrorReporter.report_error(error)
            sys.exit(1)

        current_branch = git_ops.get_current_branch()
        if not current_branch:
            error = RepositoryStateError(
                "Not on a branch (detached HEAD)",
                recovery_suggestion="Switch to a branch with 'git checkout <branch-name>'",
            )
            ErrorReporter.report_error(error)
            sys.exit(1)

        merge_base = git_ops.get_merge_base_with_main(current_branch)
        if not merge_base:
            error = RepositoryStateError(
                "Could not find merge base with main/master",
                current_state=f"on branch {current_branch}",
                recovery_suggestion="Ensure your branch is based on main/master",
            )
            ErrorReporter.report_error(error)
            sys.exit(1)

        # Check if there are commits to work with
        if not git_ops.has_commits_since_merge_base(merge_base):
            error = RepositoryStateError(
                "No commits found on current branch since merge base",
                current_state=f"merge base: {merge_base}",
                recovery_suggestion="Make some commits on your branch before running git-autosquash",
            )
            ErrorReporter.report_error(error)
            sys.exit(1)

        # Analyze working tree status
        status = git_ops.get_working_tree_status()
        print(f"Current branch: {current_branch}")
        print(f"Merge base: {merge_base}")
        print(
            f"Working tree status: staged={status['has_staged']}, unstaged={status['has_unstaged']}, clean={status['is_clean']}"
        )

        # Handle mixed staged/unstaged state
        if status["has_staged"] and status["has_unstaged"]:
            print("\nMixed staged and unstaged changes detected.")
            print("Choose an option:")
            print("  a) Process all changes (staged + unstaged)")
            print("  s) Stash unstaged changes and process only staged")
            print("  q) Quit")

            choice = input("Your choice [a/s/q]: ").lower().strip()
            if choice == "q":
                print("Operation cancelled")
                sys.exit(0)
            elif choice == "s":
                print("Stash-only mode selected (not yet implemented)")
            elif choice == "a":
                print("Process-all mode selected")
            else:
                print("Invalid choice, defaulting to process all")
        elif status["is_clean"]:
            print("Working tree is clean, will reset HEAD~1 and process those changes")
        elif status["has_staged"]:
            print("Processing staged changes")
        else:
            print("Processing unstaged changes")

        # Phase 2: Parse hunks and analyze blame
        print("\nAnalyzing changes and finding target commits...")

        hunk_parser = HunkParser(git_ops)
        hunks = hunk_parser.get_diff_hunks(line_by_line=args.line_by_line)

        if not hunks:
            print("No changes found to process", file=sys.stderr)
            sys.exit(1)

        print(f"Found {len(hunks)} hunks to process")

        # Analyze hunks with enhanced blame and fallback analysis
        resolver = HunkTargetResolver(git_ops, merge_base)
        mappings = resolver.resolve_targets(hunks)

        # Categorize mappings into automatic and fallback
        automatic_mappings = [m for m in mappings if not m.needs_user_selection]
        fallback_mappings = [m for m in mappings if m.needs_user_selection]

        print(f"Found target commits for {len(automatic_mappings)} hunks")
        if fallback_mappings:
            print(
                f"Found {len(fallback_mappings)} hunks requiring manual target selection"
            )

        # If we have no automatic targets and no fallbacks, something is wrong
        if not mappings:
            print("No hunks found to process", file=sys.stderr)
            sys.exit(1)

        # Phase 3: User approval - either auto-accept or interactive TUI
        if args.auto_accept:
            # Auto-accept mode: accept all hunks with automatic blame targets
            approved_mappings = []
            ignored_mappings = []

            print(f"\nAuto-accept mode: Processing {len(mappings)} hunks...")

            for mapping in mappings:
                if mapping.target_commit and not mapping.needs_user_selection:
                    # This hunk has an automatic blame-identified target
                    approved_mappings.append(mapping)
                    commit_summary = resolver.get_commit_summary(mapping.target_commit)
                    print(f"✓ Auto-accepted: {mapping.hunk.file_path}{commit_summary}")
                else:
                    # This hunk needs manual selection, leave in working tree
                    ignored_mappings.append(mapping)
                    if mapping.needs_user_selection:
                        print(f"⚠ Left in working tree: {mapping.hunk.file_path} (needs manual selection)")
                    else:
                        print(f"⚠ Left in working tree: {mapping.hunk.file_path} (no target found)")

            print(f"\nAuto-accepted {len(approved_mappings)} hunks with automatic targets")
            if ignored_mappings:
                print(f"Left {len(ignored_mappings)} hunks in working tree")

            # Execute the rebase for approved hunks
            if approved_mappings:
                success = _execute_rebase(approved_mappings, git_ops, merge_base, resolver)
                if not success:
                    print("✗ Squash operation was aborted or failed.")
                    return
            else:
                success = True  # No rebase needed, just apply ignored hunks

            # Apply ignored hunks back to working tree
            if success and ignored_mappings:
                print(
                    f"\nApplying {len(ignored_mappings)} ignored hunks back to working tree..."
                )
                ignore_success = _apply_ignored_hunks(ignored_mappings, git_ops)
                if ignore_success:
                    print("✓ Ignored hunks have been restored to working tree")
                else:
                    print(
                        "⚠️  Some ignored hunks could not be restored - check working tree status"
                    )

            # Report final results for auto-accept mode
            if success:
                if approved_mappings and ignored_mappings:
                    print(
                        "✓ Operation completed! Changes squashed to commits and ignored hunks restored to working tree."
                    )
                elif approved_mappings:
                    print("✓ Squash operation completed successfully!")
                    print(
                        "Your changes have been distributed to their target commits."
                    )
                elif ignored_mappings:
                    print("✓ Ignored hunks have been restored to working tree.")
            else:
                print("✗ Operation failed.")

        else:
            # Interactive TUI mode
            print("\nLaunching enhanced interactive approval interface...")

            try:
                # Create commit history analyzer for fallback suggestions
                from git_autosquash.commit_history_analyzer import CommitHistoryAnalyzer

                commit_analyzer = CommitHistoryAnalyzer(git_ops, merge_base)

                # Always use enhanced app for better display of commit information
                from git_autosquash.tui.enhanced_app import EnhancedAutoSquashApp

                app = EnhancedAutoSquashApp(mappings, commit_analyzer)

                approved = app.run()

                if approved and (app.approved_mappings or app.ignored_mappings):
                    approved_mappings = app.approved_mappings
                    ignored_mappings = app.ignored_mappings

                    print(f"\nUser selected {len(approved_mappings)} hunks for squashing")
                    if ignored_mappings:
                        print(
                            f"User selected {len(ignored_mappings)} hunks to ignore (keep in working tree)"
                        )

                    # Phase 4 - Execute the interactive rebase for approved hunks
                    if approved_mappings:
                        print("\nExecuting interactive rebase for approved hunks...")
                        success = _execute_rebase(
                            approved_mappings, git_ops, merge_base, resolver
                        )

                        if not success:
                            print("✗ Squash operation was aborted or failed.")
                            return
                    else:
                        success = True  # No rebase needed, just apply ignored hunks

                    # Phase 5 - Apply ignored hunks back to working tree
                    if success and ignored_mappings:
                        print(
                            f"\nApplying {len(ignored_mappings)} ignored hunks back to working tree..."
                        )
                        ignore_success = _apply_ignored_hunks(ignored_mappings, git_ops)
                        if ignore_success:
                            print("✓ Ignored hunks have been restored to working tree")
                        else:
                            print(
                                "⚠️  Some ignored hunks could not be restored - check working tree status"
                            )

                    if success:
                        if approved_mappings and ignored_mappings:
                            print(
                                "✓ Operation completed! Changes squashed to commits and ignored hunks restored to working tree."
                            )
                        elif approved_mappings:
                            print("✓ Squash operation completed successfully!")
                            print(
                                "Your changes have been distributed to their target commits."
                            )
                        elif ignored_mappings:
                            print("✓ Ignored hunks have been restored to working tree.")
                    else:
                        print("✗ Operation failed.")

                else:
                    print("\nOperation cancelled by user or no hunks selected")

            except ImportError as e:
                print(f"\nTextual TUI not available: {e}")
                print("Falling back to simple text-based approval...")
                result = _simple_approval_fallback(mappings, resolver, commit_analyzer)

                approved_mappings = result["approved"]
                ignored_mappings = result["ignored"]

                if approved_mappings:
                    print(f"\nApproved {len(approved_mappings)} hunks for squashing")
                    if ignored_mappings:
                        print(
                            f"Selected {len(ignored_mappings)} hunks to ignore (keep in working tree)"
                        )

                    # Phase 4 - Execute the interactive rebase
                    print("\nExecuting interactive rebase...")
                    success = _execute_rebase(
                        approved_mappings, git_ops, merge_base, resolver
                    )

                    if success:
                        print("✓ Squash operation completed successfully!")
                        print("Your changes have been distributed to their target commits.")

                        # Apply ignored hunks back to working tree
                        if ignored_mappings:
                            print(
                                f"\nApplying {len(ignored_mappings)} ignored hunks back to working tree..."
                            )
                            ignore_success = _apply_ignored_hunks(ignored_mappings, git_ops)
                            if ignore_success:
                                print("✓ Ignored hunks have been restored to working tree")
                            else:
                                print(
                                    "⚠️  Some ignored hunks could not be restored - check working tree status"
                                )
                    else:
                        print("✗ Squash operation was aborted or failed.")
                else:
                    print("\nOperation cancelled")

            except Exception as e:
                print(f"\nTUI encountered an error: {e}")
                print("Falling back to simple text-based approval...")
                result = _simple_approval_fallback(mappings, resolver, commit_analyzer)

                approved_mappings = result["approved"]
                ignored_mappings = result["ignored"]

                if approved_mappings:
                    print(f"\nApproved {len(approved_mappings)} hunks for squashing")
                    if ignored_mappings:
                        print(
                            f"Selected {len(ignored_mappings)} hunks to ignore (keep in working tree)"
                        )

                    # Phase 4 - Execute the interactive rebase
                    print("\nExecuting interactive rebase...")
                    success = _execute_rebase(
                        approved_mappings, git_ops, merge_base, resolver
                    )

                    if success:
                        print("✓ Squash operation completed successfully!")
                        print("Your changes have been distributed to their target commits.")

                        # Apply ignored hunks back to working tree
                        if ignored_mappings:
                            print(
                                f"\nApplying {len(ignored_mappings)} ignored hunks back to working tree..."
                            )
                            ignore_success = _apply_ignored_hunks(ignored_mappings, git_ops)
                            if ignore_success:
                                print("✓ Ignored hunks have been restored to working tree")
                            else:
                                print(
                                    "⚠️  Some ignored hunks could not be restored - check working tree status"
                                )
                    else:
                        print("✗ Squash operation was aborted or failed.")
                else:
                    print("\nOperation cancelled")

    except GitAutoSquashError as e:
        # Our custom exceptions with user-friendly messages
        ErrorReporter.report_error(e)
        sys.exit(1)
    except KeyboardInterrupt:
        error = UserCancelledError("git-autosquash operation")
        ErrorReporter.report_error(error)
        sys.exit(130)
    except (subprocess.SubprocessError, FileNotFoundError) as e:
        # Git/system operation failures
        wrapped = handle_unexpected_error(
            e, "git operation", "Check git installation and repository state"
        )
        ErrorReporter.report_error(wrapped)
        sys.exit(1)
    except Exception as e:
        # Catch-all for unexpected errors
        wrapped = handle_unexpected_error(e, "git-autosquash execution")
        ErrorReporter.report_error(wrapped)
        sys.exit(1)
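
In `--auto-accept` mode, `main()` approves a mapping only when it has a target commit and does not need user selection; everything else is left in the working tree. The sketch below restates that rule as a standalone function. `DemoMapping` and `partition_for_auto_accept` are hypothetical names used for illustration, with only the `target_commit` and `needs_user_selection` attributes taken from the listing above.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class DemoMapping:
    """Hypothetical stand-in for HunkTargetMapping."""

    file_path: str
    target_commit: Optional[str]
    needs_user_selection: bool = False


def partition_for_auto_accept(
    mappings: List[DemoMapping],
) -> Tuple[List[DemoMapping], List[DemoMapping]]:
    """Split mappings into (approved, ignored) using the auto-accept rule."""
    approved: List[DemoMapping] = []
    ignored: List[DemoMapping] = []
    for mapping in mappings:
        if mapping.target_commit and not mapping.needs_user_selection:
            approved.append(mapping)  # blame found a target automatically
        else:
            ignored.append(mapping)  # stays in the working tree
    return approved, ignored


approved, ignored = partition_for_auto_accept(
    [
        DemoMapping("a.py", "abc12345"),
        DemoMapping("b.py", "def67890", needs_user_selection=True),
        DemoMapping("c.py", None),
    ]
)
```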

TUI Components

AutoSquashApp

Main Textual application for the interactive approval workflow.

git_autosquash.tui.app

Main Textual application for hunk approval workflow.

Classes
AutoSquashApp

Bases: App[bool]

Main application for git-autosquash TUI.

Source code in src/git_autosquash/tui/app.py
class AutoSquashApp(App[bool]):
    """Main application for git-autosquash TUI."""

    TITLE = "git-autosquash"
    SUB_TITLE = "Interactive hunk approval"

    BINDINGS = [
        Binding("q", "quit", "Quit", priority=True),
        Binding("ctrl+c", "quit", "Quit", show=False),
    ]

    def __init__(self, mappings: List[HunkTargetMapping]) -> None:
        """Initialize the application.

        Args:
            mappings: List of hunk to target commit mappings to review
        """
        super().__init__()
        self.mappings = mappings
        self.approved_mappings: List[HunkTargetMapping] = []
        self.ignored_mappings: List[HunkTargetMapping] = []

    def compose(self) -> ComposeResult:
        """Compose the application layout."""
        yield Header()
        yield Footer()

    def on_mount(self) -> None:
        """Handle application startup."""
        if not self.mappings:
            # No mappings to review, exit immediately
            self.exit(False)
            return

        # Push approval screen with mappings
        self.push_screen(ApprovalScreen(self.mappings), self._handle_approval_result)

    def _handle_approval_result(
        self, result: bool | dict | List[HunkTargetMapping] | None
    ) -> None:
        """Handle result from approval screen.

        Args:
            result: Dict with 'approved' and 'ignored' keys containing mappings,
                   List of approved mappings (legacy format),
                   Boolean True/False for success/cancel
        """
        if isinstance(result, dict):
            # New format with both approved and ignored mappings
            self.approved_mappings = result.get("approved", [])
            self.ignored_mappings = result.get("ignored", [])
            self.exit(True)
        elif isinstance(result, list):
            # Legacy format - just approved mappings
            self.approved_mappings = result
            self.ignored_mappings = []
            self.exit(True)
        elif result:
            # Boolean True (should not happen with new implementation)
            self.exit(True)
        else:
            # User cancelled
            self.exit(False)

    async def action_quit(self) -> None:
        """Handle quit action."""
        self.exit(False)
Functions
__init__(mappings: List[HunkTargetMapping]) -> None

Initialize the application.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| mappings | List[HunkTargetMapping] | List of hunk to target commit mappings to review | required |
Source code in src/git_autosquash/tui/app.py
def __init__(self, mappings: List[HunkTargetMapping]) -> None:
    """Initialize the application.

    Args:
        mappings: List of hunk to target commit mappings to review
    """
    super().__init__()
    self.mappings = mappings
    self.approved_mappings: List[HunkTargetMapping] = []
    self.ignored_mappings: List[HunkTargetMapping] = []
compose() -> ComposeResult

Compose the application layout.

Source code in src/git_autosquash/tui/app.py
def compose(self) -> ComposeResult:
    """Compose the application layout."""
    yield Header()
    yield Footer()
on_mount() -> None

Handle application startup.

Source code in src/git_autosquash/tui/app.py
def on_mount(self) -> None:
    """Handle application startup."""
    if not self.mappings:
        # No mappings to review, exit immediately
        self.exit(False)
        return

    # Push approval screen with mappings
    self.push_screen(ApprovalScreen(self.mappings), self._handle_approval_result)
action_quit() -> None async

Handle quit action.

Source code in src/git_autosquash/tui/app.py
async def action_quit(self) -> None:
    """Handle quit action."""
    self.exit(False)
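
`_handle_approval_result` accepts three result shapes from the approval screen: a dict with `approved` and `ignored` keys, a bare list of approved mappings (legacy), or a boolean. The sketch below shows the same normalization as a pure function, assuming the application's mapping lists are still empty when a boolean arrives. `normalize_approval_result` is a hypothetical helper, not part of the Textual app.

```python
from typing import Any, List, Tuple


def normalize_approval_result(result: Any) -> Tuple[bool, List[Any], List[Any]]:
    """Return (accepted, approved_mappings, ignored_mappings)."""
    if isinstance(result, dict):
        # New format: both approved and ignored mappings are reported.
        return True, list(result.get("approved", [])), list(result.get("ignored", []))
    if isinstance(result, list):
        # Legacy format: only approved mappings are reported.
        return True, list(result), []
    # Boolean or None: truthy means accepted, anything else means cancelled.
    return bool(result), [], []


from_dict = normalize_approval_result({"approved": ["m1"], "ignored": ["m2"]})
from_list = normalize_approval_result(["m1", "m3"])
from_none = normalize_approval_result(None)
```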
WelcomeScreen

Bases: Screen[None]

Welcome screen showing project information.

Source code in src/git_autosquash/tui/app.py
class WelcomeScreen(Screen[None]):
    """Welcome screen showing project information."""

    def compose(self) -> ComposeResult:
        """Compose the welcome screen."""
        with Container():
            yield Static("Welcome to git-autosquash", id="title")
            yield Static(
                "This tool will help you automatically distribute changes "
                "back to the commits where they belong.",
                id="description",
            )
            with Horizontal():
                yield Button("Continue", variant="primary", id="continue")
                yield Button("Cancel", id="cancel")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Handle button presses."""
        if event.button.id == "continue":
            self.dismiss(None)
        else:
            self.dismiss(None)
Functions
compose() -> ComposeResult

Compose the welcome screen.

Source code in src/git_autosquash/tui/app.py
def compose(self) -> ComposeResult:
    """Compose the welcome screen."""
    with Container():
        yield Static("Welcome to git-autosquash", id="title")
        yield Static(
            "This tool will help you automatically distribute changes "
            "back to the commits where they belong.",
            id="description",
        )
        with Horizontal():
            yield Button("Continue", variant="primary", id="continue")
            yield Button("Cancel", id="cancel")
on_button_pressed(event: Button.Pressed) -> None

Handle button presses.

Source code in src/git_autosquash/tui/app.py
def on_button_pressed(self, event: Button.Pressed) -> None:
    """Handle button presses."""
    if event.button.id == "continue":
        self.dismiss(None)
    else:
        self.dismiss(None)

ApprovalScreen

Interactive screen for reviewing and approving hunk to commit mappings.

git_autosquash.tui.screens

Screen implementations for git-autosquash TUI.

Classes
ApprovalScreen

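Bases: Screen[Union[bool, Dict[str, List[HunkTargetMapping]]]]

Screen for approving hunk-to-commit mappings.

Source code in src/git_autosquash/tui/screens.py
class ApprovalScreen(Screen[Union[bool, Dict[str, List[HunkTargetMapping]]]]):
    """Screen for approving hunk-to-commit mappings.

    Dismisses with False on cancel, or with a dict holding the
    "approved" and "ignored" mapping lists on confirmation.
    """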

    BINDINGS = [
        Binding("enter", "approve_all", "Approve & Continue", priority=True),
        Binding("escape", "cancel", "Cancel", priority=True),
        Binding("a", "approve_all_toggle", "Toggle All", priority=False),
        Binding("i", "ignore_all_toggle", "Toggle All Ignore", priority=False),
        Binding("space", "toggle_current", "Toggle Current", priority=False),
        Binding("j", "next_hunk", "Next Hunk", show=False),
        Binding("k", "prev_hunk", "Prev Hunk", show=False),
        Binding("down", "next_hunk", "Next Hunk", show=False),
        Binding("up", "prev_hunk", "Prev Hunk", show=False),
    ]

    def __init__(self, mappings: List[HunkTargetMapping], **kwargs) -> None:
        """Initialize approval screen.

        Args:
            mappings: List of hunk to commit mappings to review
        """
        super().__init__(**kwargs)
        self.mappings = mappings
        self.current_hunk_index = 0
        self.hunk_widgets: List[HunkMappingWidget] = []
        self._selected_widget: HunkMappingWidget | None = None
        self._diff_viewer: DiffViewer | None = None

        # Centralized state management
        self.state_controller = UIStateController(mappings)

        # O(1) lookup cache for widget selection performance
        self._mapping_to_widget: Dict[HunkTargetMapping, HunkMappingWidget] = {}
        self._mapping_to_index: Dict[HunkTargetMapping, int] = {}

    def compose(self) -> ComposeResult:
        """Compose the screen layout."""
        yield Header()

        with Container(id="main-container"):
            # Title and summary
            yield Static("Hunk to Commit Mapping Review", id="screen-title")
            yield Static(
                f"Review {len(self.mappings)} hunks and their target commits. "
                "Use checkboxes to approve/reject individual hunks.",
                id="screen-description",
            )

            # Progress indicator
            yield ProgressIndicator(len(self.mappings), id="progress")

            # Main content area
            with Horizontal(id="content-area"):
                # Left panel: Hunk list
                with Vertical(id="hunk-list-panel"):
                    yield Static("Hunks", id="hunk-list-title")
                    with VerticalScroll(id="hunk-list"):
                        for i, mapping in enumerate(self.mappings):
                            hunk_widget = HunkMappingWidget(mapping)
                            self.hunk_widgets.append(hunk_widget)
                            # Build O(1) lookup caches
                            self._mapping_to_widget[mapping] = hunk_widget
                            self._mapping_to_index[mapping] = i
                            yield hunk_widget

                # Right panel: Diff viewer
                with Vertical(id="diff-panel"):
                    yield Static("Diff Preview", id="diff-title")
                    yield DiffViewer(id="diff-viewer")

            # Action buttons
            with Horizontal(id="action-buttons"):
                yield Button(
                    "Approve All & Continue", variant="success", id="approve-all"
                )
                yield Button("Continue with Selected", variant="primary", id="continue")
                yield Button("Cancel", variant="default", id="cancel")

        yield Footer()

    def on_mount(self) -> None:
        """Handle screen mounting."""
        # Cache diff viewer reference
        self._diff_viewer = self.query_one("#diff-viewer", DiffViewer)

        # Select first hunk if available
        if self.hunk_widgets:
            self._select_widget(self.hunk_widgets[0])

        # Update progress
        self._update_progress()

    @on(HunkMappingWidget.Selected)
    def on_hunk_selected(self, message: HunkMappingWidget.Selected) -> None:
        """Handle hunk selection using O(1) lookup."""
        # Use cached lookups for O(1) performance instead of O(n) iteration
        target_widget = self._mapping_to_widget.get(message.mapping)
        if target_widget:
            self.current_hunk_index = self._mapping_to_index[message.mapping]
            self._select_widget(target_widget)

    @on(HunkMappingWidget.ApprovalChanged)
    def on_approval_changed(self, message: HunkMappingWidget.ApprovalChanged) -> None:
        """Handle approval status changes."""
        self.state_controller.set_approved(message.mapping, message.approved)
        self._update_progress()

    @on(HunkMappingWidget.IgnoreChanged)
    def on_ignore_changed(self, message: HunkMappingWidget.IgnoreChanged) -> None:
        """Handle ignore status changes."""
        self.state_controller.set_ignored(message.mapping, message.ignored)
        self._update_progress()

    @on(Button.Pressed)
    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Handle button presses."""
        if event.button.id == "approve-all":
            self.action_approve_all()
        elif event.button.id == "continue":
            self.action_continue()
        elif event.button.id == "cancel":
            self.action_cancel()

    def action_approve_all(self) -> None:
        """Approve all hunks and continue."""
        self.state_controller.approve_all()
        self._sync_widgets_with_state()
        self._update_progress()

        result = {
            "approved": self.state_controller.get_approved_mappings(),
            "ignored": self.state_controller.get_ignored_mappings(),
        }
        self.dismiss(result)

    def action_approve_all_toggle(self) -> None:
        """Toggle approval status of all hunks."""
        self.state_controller.approve_all_toggle()
        self._sync_widgets_with_state()
        self._update_progress()

    def action_continue(self) -> None:
        """Continue with currently selected hunks."""
        if not self.state_controller.has_selections():
            # No hunks selected at all, cannot continue
            return

        result = {
            "approved": self.state_controller.get_approved_mappings(),
            "ignored": self.state_controller.get_ignored_mappings(),
        }
        self.dismiss(result)

    def action_cancel(self) -> None:
        """Cancel the approval process."""
        self.dismiss(False)

    def action_next_hunk(self) -> None:
        """Select next hunk."""
        if self.current_hunk_index < len(self.hunk_widgets) - 1:
            self.current_hunk_index += 1
            self._select_hunk_by_index(self.current_hunk_index)

    def action_prev_hunk(self) -> None:
        """Select previous hunk."""
        if self.current_hunk_index > 0:
            self.current_hunk_index -= 1
            self._select_hunk_by_index(self.current_hunk_index)

    def action_ignore_all_toggle(self) -> None:
        """Toggle ignore status of all hunks."""
        self.state_controller.ignore_all_toggle()
        self._sync_widgets_with_state()
        self._update_progress()

    def action_toggle_current(self) -> None:
        """Toggle the approval state of the currently selected hunk.

        Under the checkbox model this only flips the approve flag.
        """
        if not self.hunk_widgets or self.current_hunk_index >= len(self.hunk_widgets):
            return

        mapping = self.mappings[self.current_hunk_index]
        self.state_controller.toggle_approved(mapping)
        self._sync_widget_with_state(
            self.hunk_widgets[self.current_hunk_index], mapping
        )
        self._update_progress()

    def _select_widget(self, widget: HunkMappingWidget) -> None:
        """Select a specific widget, optimized to avoid O(n) operations.

        Args:
            widget: The widget to select
        """
        # Deselect previous widget (O(1) operation)
        if self._selected_widget and self._selected_widget != widget:
            self._selected_widget.selected = False

        # Select new widget
        widget.selected = True
        self._selected_widget = widget

        # Update diff viewer (cached reference)
        if self._diff_viewer:
            self._diff_viewer.show_hunk(widget.mapping.hunk)

        # Scroll to selected widget
        widget.scroll_visible()

    def _select_hunk_by_index(self, index: int) -> None:
        """Select hunk by index."""
        if 0 <= index < len(self.hunk_widgets):
            self.current_hunk_index = index
            self._select_widget(self.hunk_widgets[index])

    def _update_progress(self) -> None:
        """Update the progress indicator."""
        stats = self.state_controller.get_progress_stats()
        progress = self.query_one("#progress", ProgressIndicator)
        progress.update_progress(stats["approved"], stats["ignored"])

    def _sync_widgets_with_state(self) -> None:
        """Synchronize all widgets with the centralized state."""
        for widget in self.hunk_widgets:
            self._sync_widget_with_state(widget, widget.mapping)

    def _sync_widget_with_state(
        self, widget: HunkMappingWidget, mapping: HunkTargetMapping
    ) -> None:
        """Synchronize a single widget with the centralized state.

        Args:
            widget: The widget to synchronize
            mapping: The mapping associated with the widget
        """
        # Update widget reactive properties
        widget.approved = self.state_controller.is_approved(mapping)
        widget.ignored = self.state_controller.is_ignored(mapping)

        # Update checkboxes to reflect new state
        try:
            approve_checkbox = widget.query_one("#approve-checkbox", Checkbox)
            ignore_checkbox = widget.query_one("#ignore-checkbox", Checkbox)
            approve_checkbox.value = widget.approved
            ignore_checkbox.value = widget.ignored
        except (AttributeError, ValueError):
            # Checkboxes might not be available during initial setup or widget composition
            pass
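
The BINDINGS list above pairs each key with an action name, and Textual resolves an action name such as `next_hunk` to a method named `action_next_hunk`. The sketch below imitates that naming convention in plain Python so it runs without Textual. `KEYMAP`, `FakeScreen`, and `press` are hypothetical names used only for illustration; they are not part of git-autosquash or Textual.

```python
from typing import Dict, List

# Key -> action name, copied from a subset of the BINDINGS list above.
KEYMAP: Dict[str, str] = {
    "enter": "approve_all",
    "escape": "cancel",
    "space": "toggle_current",
    "j": "next_hunk",
    "down": "next_hunk",
    "k": "prev_hunk",
    "up": "prev_hunk",
}


class FakeScreen:
    """Hypothetical stand-in that records which actions were invoked."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def action_next_hunk(self) -> None:
        self.calls.append("next_hunk")

    def action_prev_hunk(self) -> None:
        self.calls.append("prev_hunk")

    def action_cancel(self) -> None:
        self.calls.append("cancel")


def press(screen: FakeScreen, key: str) -> bool:
    """Dispatch a key to ``action_<name>``; return False if nothing handled it."""
    action = KEYMAP.get(key)
    handler = getattr(screen, f"action_{action}", None) if action else None
    if handler is None:
        return False
    handler()
    return True


screen = FakeScreen()
handled = [press(screen, key) for key in ("j", "down", "k", "escape", "x", "enter")]
```

Here `"x"` has no binding and `"enter"` maps to an action this stand-in does not implement, so both report `False`, while `j` and `down` reach the same handler.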
Functions
__init__(mappings: List[HunkTargetMapping], **kwargs) -> None

Initialize approval screen.

Parameters:

Name      Type                     Description                                Default
mappings  List[HunkTargetMapping]  List of hunk-to-commit mappings to review  required

Source code in src/git_autosquash/tui/screens.py
def __init__(self, mappings: List[HunkTargetMapping], **kwargs) -> None:
    """Initialize approval screen.

    Args:
        mappings: List of hunk to commit mappings to review
    """
    super().__init__(**kwargs)
    self.mappings = mappings
    self.current_hunk_index = 0
    self.hunk_widgets: List[HunkMappingWidget] = []
    self._selected_widget: HunkMappingWidget | None = None
    self._diff_viewer: DiffViewer | None = None

    # Centralized state management
    self.state_controller = UIStateController(mappings)

    # O(1) lookup cache for widget selection performance
    self._mapping_to_widget: Dict[HunkTargetMapping, HunkMappingWidget] = {}
    self._mapping_to_index: Dict[HunkTargetMapping, int] = {}
compose() -> ComposeResult

Compose the screen layout.

Source code in src/git_autosquash/tui/screens.py
def compose(self) -> ComposeResult:
    """Compose the screen layout."""
    yield Header()

    with Container(id="main-container"):
        # Title and summary
        yield Static("Hunk to Commit Mapping Review", id="screen-title")
        yield Static(
            f"Review {len(self.mappings)} hunks and their target commits. "
            "Use checkboxes to approve/reject individual hunks.",
            id="screen-description",
        )

        # Progress indicator
        yield ProgressIndicator(len(self.mappings), id="progress")

        # Main content area
        with Horizontal(id="content-area"):
            # Left panel: Hunk list
            with Vertical(id="hunk-list-panel"):
                yield Static("Hunks", id="hunk-list-title")
                with VerticalScroll(id="hunk-list"):
                    for i, mapping in enumerate(self.mappings):
                        hunk_widget = HunkMappingWidget(mapping)
                        self.hunk_widgets.append(hunk_widget)
                        # Build O(1) lookup caches
                        self._mapping_to_widget[mapping] = hunk_widget
                        self._mapping_to_index[mapping] = i
                        yield hunk_widget

            # Right panel: Diff viewer
            with Vertical(id="diff-panel"):
                yield Static("Diff Preview", id="diff-title")
                yield DiffViewer(id="diff-viewer")

        # Action buttons
        with Horizontal(id="action-buttons"):
            yield Button(
                "Approve All & Continue", variant="success", id="approve-all"
            )
            yield Button("Continue with Selected", variant="primary", id="continue")
            yield Button("Cancel", variant="default", id="cancel")

    yield Footer()
on_mount() -> None

Handle screen mounting.

Source code in src/git_autosquash/tui/screens.py
def on_mount(self) -> None:
    """Handle screen mounting."""
    # Cache diff viewer reference
    self._diff_viewer = self.query_one("#diff-viewer", DiffViewer)

    # Select first hunk if available
    if self.hunk_widgets:
        self._select_widget(self.hunk_widgets[0])

    # Update progress
    self._update_progress()
on_hunk_selected(message: HunkMappingWidget.Selected) -> None

Handle hunk selection using O(1) lookup.

Source code in src/git_autosquash/tui/screens.py
@on(HunkMappingWidget.Selected)
def on_hunk_selected(self, message: HunkMappingWidget.Selected) -> None:
    """Handle hunk selection using O(1) lookup."""
    # Use cached lookups for O(1) performance instead of O(n) iteration
    target_widget = self._mapping_to_widget.get(message.mapping)
    if target_widget:
        self.current_hunk_index = self._mapping_to_index[message.mapping]
        self._select_widget(target_widget)
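
The handler above depends on two dictionaries that compose() fills once, which only works if the mapping objects are hashable. The following is a minimal sketch of that lookup pattern. `FakeMapping` is a hypothetical frozen dataclass standing in for HunkTargetMapping, whose definition is not shown in this section, and `index_of` is an illustrative helper name.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class FakeMapping:
    """Hypothetical, hashable stand-in for HunkTargetMapping."""

    file_path: str
    target_commit: Optional[str]


mappings: List[FakeMapping] = [
    FakeMapping("src/a.py", "1111111"),
    FakeMapping("src/b.py", None),
    FakeMapping("src/c.py", "3333333"),
]

# Built once, in list order, like the caches filled inside compose().
mapping_to_index: Dict[FakeMapping, int] = {m: i for i, m in enumerate(mappings)}


def index_of(mapping: FakeMapping) -> Optional[int]:
    """Return the position of ``mapping`` via a dict lookup, or None if unknown."""
    return mapping_to_index.get(mapping)


found = index_of(FakeMapping("src/c.py", "3333333"))
missing = index_of(FakeMapping("src/z.py", None))
```

One caveat of this design: when keys compare by value, as a frozen dataclass does, two equal mappings share a single dictionary slot, so the cache assumes mappings are distinct.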
on_approval_changed(message: HunkMappingWidget.ApprovalChanged) -> None

Handle approval status changes.

Source code in src/git_autosquash/tui/screens.py
@on(HunkMappingWidget.ApprovalChanged)
def on_approval_changed(self, message: HunkMappingWidget.ApprovalChanged) -> None:
    """Handle approval status changes."""
    self.state_controller.set_approved(message.mapping, message.approved)
    self._update_progress()
on_ignore_changed(message: HunkMappingWidget.IgnoreChanged) -> None

Handle ignore status changes.

Source code in src/git_autosquash/tui/screens.py
@on(HunkMappingWidget.IgnoreChanged)
def on_ignore_changed(self, message: HunkMappingWidget.IgnoreChanged) -> None:
    """Handle ignore status changes."""
    self.state_controller.set_ignored(message.mapping, message.ignored)
    self._update_progress()
on_button_pressed(event: Button.Pressed) -> None

Handle button presses.

Source code in src/git_autosquash/tui/screens.py
@on(Button.Pressed)
def on_button_pressed(self, event: Button.Pressed) -> None:
    """Handle button presses."""
    if event.button.id == "approve-all":
        self.action_approve_all()
    elif event.button.id == "continue":
        self.action_continue()
    elif event.button.id == "cancel":
        self.action_cancel()
action_approve_all() -> None

Approve all hunks and continue.

Source code in src/git_autosquash/tui/screens.py
def action_approve_all(self) -> None:
    """Approve all hunks and continue."""
    self.state_controller.approve_all()
    self._sync_widgets_with_state()
    self._update_progress()

    result = {
        "approved": self.state_controller.get_approved_mappings(),
        "ignored": self.state_controller.get_ignored_mappings(),
    }
    self.dismiss(result)
action_approve_all_toggle() -> None

Toggle approval status of all hunks.

Source code in src/git_autosquash/tui/screens.py
def action_approve_all_toggle(self) -> None:
    """Toggle approval status of all hunks."""
    self.state_controller.approve_all_toggle()
    self._sync_widgets_with_state()
    self._update_progress()
action_continue() -> None

Continue with currently selected hunks.

Source code in src/git_autosquash/tui/screens.py
def action_continue(self) -> None:
    """Continue with currently selected hunks."""
    if not self.state_controller.has_selections():
        # No hunks selected at all, cannot continue
        return

    result = {
        "approved": self.state_controller.get_approved_mappings(),
        "ignored": self.state_controller.get_ignored_mappings(),
    }
    self.dismiss(result)
action_cancel() -> None

Cancel the approval process.

Source code in src/git_autosquash/tui/screens.py
def action_cancel(self) -> None:
    """Cancel the approval process."""
    self.dismiss(False)
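
Taken together, action_approve_all() and action_continue() dismiss the screen with a dict containing "approved" and "ignored" lists, while action_cancel() dismisses with False. A caller therefore has to branch on the result type. The sketch below shows one way to do that; `describe_result` is a hypothetical helper, and plain strings stand in for HunkTargetMapping objects.

```python
from typing import Dict, List, Union

# Strings stand in for HunkTargetMapping objects in this sketch.
Result = Union[bool, Dict[str, List[str]]]


def describe_result(result: Result) -> str:
    """Summarize the value an ApprovalScreen-like screen was dismissed with."""
    if isinstance(result, dict):
        approved = result.get("approved", [])
        ignored = result.get("ignored", [])
        return f"{len(approved)} to squash, {len(ignored)} to ignore"
    # Any non-dict value is treated as a cancel; action_cancel() uses False.
    return "cancelled"


cancelled = describe_result(False)
confirmed = describe_result({"approved": ["hunk-1", "hunk-2"], "ignored": ["hunk-3"]})
```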
action_next_hunk() -> None

Select next hunk.

Source code in src/git_autosquash/tui/screens.py
def action_next_hunk(self) -> None:
    """Select next hunk."""
    if self.current_hunk_index < len(self.hunk_widgets) - 1:
        self.current_hunk_index += 1
        self._select_hunk_by_index(self.current_hunk_index)
action_prev_hunk() -> None

Select previous hunk.

Source code in src/git_autosquash/tui/screens.py
def action_prev_hunk(self) -> None:
    """Select previous hunk."""
    if self.current_hunk_index > 0:
        self.current_hunk_index -= 1
        self._select_hunk_by_index(self.current_hunk_index)
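
action_next_hunk() and action_prev_hunk() stop at the ends of the list instead of wrapping around. A small stand-alone sketch of that clamping rule, using the hypothetical helpers `next_index` and `prev_index`:

```python
from typing import List


def next_index(current: int, count: int) -> int:
    """Move down one row, staying on the last row when already there."""
    return current + 1 if current < count - 1 else current


def prev_index(current: int, count: int) -> int:
    """Move up one row, staying on the first row.

    ``count`` is unused; it is accepted so both helpers share a signature.
    """
    return current - 1 if current > 0 else current


index = 0
trace: List[int] = []
# Try to move above the top, walk past the bottom of a 3-item list, then back up.
for step in (prev_index, next_index, next_index, next_index, prev_index):
    index = step(index, 3)
    trace.append(index)
```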
action_ignore_all_toggle() -> None

Toggle ignore status of all hunks.

Source code in src/git_autosquash/tui/screens.py
def action_ignore_all_toggle(self) -> None:
    """Toggle ignore status of all hunks."""
    self.state_controller.ignore_all_toggle()
    self._sync_widgets_with_state()
    self._update_progress()
action_toggle_current() -> None

Toggle the approval state of the currently selected hunk. Under the checkbox model this only flips the approve flag.

Source code in src/git_autosquash/tui/screens.py
def action_toggle_current(self) -> None:
    """Toggle the approval state of the currently selected hunk.

    Under the checkbox model this only flips the approve flag.
    """
    if not self.hunk_widgets or self.current_hunk_index >= len(self.hunk_widgets):
        return

    mapping = self.mappings[self.current_hunk_index]
    self.state_controller.toggle_approved(mapping)
    self._sync_widget_with_state(
        self.hunk_widgets[self.current_hunk_index], mapping
    )
    self._update_progress()
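
The toggle above delegates to UIStateController, which is not documented in this section. To illustrate the idea of keeping approve and ignore flags in one place, here is a deliberately small sketch. `TinyState` is a hypothetical stand-in: it borrows method names from the calls shown above, but its behavior (independent approve and ignore sets) is an assumption and the real controller may differ.

```python
from typing import Dict, Hashable, Set


class TinyState:
    """Hypothetical stand-in for UIStateController; the real class may differ."""

    def __init__(self) -> None:
        self._approved: Set[Hashable] = set()
        self._ignored: Set[Hashable] = set()

    def is_approved(self, mapping: Hashable) -> bool:
        return mapping in self._approved

    def set_ignored(self, mapping: Hashable, ignored: bool) -> None:
        if ignored:
            self._ignored.add(mapping)
        else:
            self._ignored.discard(mapping)

    def toggle_approved(self, mapping: Hashable) -> None:
        # Flip only the approve flag; the ignore flag is left untouched.
        if mapping in self._approved:
            self._approved.remove(mapping)
        else:
            self._approved.add(mapping)

    def get_progress_stats(self) -> Dict[str, int]:
        return {"approved": len(self._approved), "ignored": len(self._ignored)}


state = TinyState()
state.toggle_approved("hunk-1")
state.set_ignored("hunk-2", True)
after_first = state.get_progress_stats()
state.toggle_approved("hunk-1")
after_second = state.get_progress_stats()
```

Keeping the flags in one object means widgets only need to read state back after each change, which is what _sync_widget_with_state() does.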

Widgets

Custom widgets for the TUI interface.

git_autosquash.tui.widgets

Custom widgets for git-autosquash TUI.

Classes
HunkMappingWidget

Bases: Widget

Widget displaying a single hunk-to-commit mapping.

Source code in src/git_autosquash/tui/widgets.py
class HunkMappingWidget(Widget):
    """Widget displaying a single hunk-to-commit mapping."""

    DEFAULT_CSS = """
    HunkMappingWidget {
        height: auto;
        margin: 1 0;
        border: round $primary;
    }

    HunkMappingWidget.selected {
        border: thick $accent;
    }

    HunkMappingWidget .hunk-header {
        background: $primary-background;
        color: $primary;
        text-style: bold;
        padding: 0 1;
    }

    HunkMappingWidget .commit-info {
        background: $secondary-background;
        color: $text;
        padding: 0 1;
    }

    HunkMappingWidget .confidence-high {
        color: $success;
    }

    HunkMappingWidget .confidence-medium {
        color: $warning;
    }

    HunkMappingWidget .confidence-low {
        color: $error;
    }
    """

    selected = reactive(False)
    approved = reactive(False)  # Default to unapproved for safety
    ignored = reactive(False)  # New state for ignoring hunks

    class Selected(Message):
        """Message sent when hunk is selected."""

        def __init__(self, mapping: HunkTargetMapping) -> None:
            self.mapping = mapping
            super().__init__()

    class ApprovalChanged(Message):
        """Message sent when approval status changes."""

        def __init__(self, mapping: HunkTargetMapping, approved: bool) -> None:
            self.mapping = mapping
            self.approved = approved
            super().__init__()

    class IgnoreChanged(Message):
        """Message sent when ignore status changes."""

        def __init__(self, mapping: HunkTargetMapping, ignored: bool) -> None:
            self.mapping = mapping
            self.ignored = ignored
            super().__init__()

    def __init__(self, mapping: HunkTargetMapping, **kwargs) -> None:
        """Initialize hunk mapping widget.

        Args:
            mapping: The hunk to commit mapping to display
        """
        super().__init__(**kwargs)
        self.mapping = mapping

    def compose(self) -> ComposeResult:
        """Compose the widget layout."""
        with Vertical():
            # Header with file and hunk info
            hunk_info = f"{self.mapping.hunk.file_path} @@ {self._format_hunk_range()}"
            yield Static(hunk_info, classes="hunk-header")

            # Target commit info
            if self.mapping.target_commit:
                # No trailing space here: the next f-string adds the separator
                commit_summary = f"→ {self.mapping.target_commit[:8]}"
                confidence_class = f"confidence-{self.mapping.confidence}"
                commit_info = f"{commit_summary} ({self.mapping.confidence} confidence)"
            else:
                commit_info = "→ No target commit found"
                confidence_class = "confidence-low"

            yield Static(commit_info, classes=f"commit-info {confidence_class}")

            # Action selection with separate concerns
            with Horizontal():
                yield Checkbox(
                    "Approve for squashing", value=self.approved, id="approve-checkbox"
                )
                yield Checkbox(
                    "Ignore (keep in working tree)",
                    value=self.ignored,
                    id="ignore-checkbox",
                )

    def _format_hunk_range(self) -> str:
        """Format hunk line range for display."""
        hunk = self.mapping.hunk
        return f"-{hunk.old_start},{hunk.old_count} +{hunk.new_start},{hunk.new_count}"

    def on_click(self, event: events.Click) -> None:
        """Handle click events."""
        self.selected = True
        self.post_message(self.Selected(self.mapping))

    def on_checkbox_changed(self, event: Checkbox.Changed) -> None:
        """Handle checkbox changes."""
        if event.checkbox.id == "approve-checkbox":
            self.approved = event.value
            self.post_message(self.ApprovalChanged(self.mapping, event.value))
        elif event.checkbox.id == "ignore-checkbox":
            self.ignored = event.value
            self.post_message(self.IgnoreChanged(self.mapping, event.value))

    def watch_selected(self, selected: bool) -> None:
        """React to selection changes."""
        self.set_class(selected, "selected")
Classes
Selected

Bases: Message

Message sent when hunk is selected.

Source code in src/git_autosquash/tui/widgets.py
class Selected(Message):
    """Message sent when hunk is selected."""

    def __init__(self, mapping: HunkTargetMapping) -> None:
        self.mapping = mapping
        super().__init__()
ApprovalChanged

Bases: Message

Message sent when approval status changes.

Source code in src/git_autosquash/tui/widgets.py
class ApprovalChanged(Message):
    """Message sent when approval status changes."""

    def __init__(self, mapping: HunkTargetMapping, approved: bool) -> None:
        self.mapping = mapping
        self.approved = approved
        super().__init__()
IgnoreChanged

Bases: Message

Message sent when ignore status changes.

Source code in src/git_autosquash/tui/widgets.py
class IgnoreChanged(Message):
    """Message sent when ignore status changes."""

    def __init__(self, mapping: HunkTargetMapping, ignored: bool) -> None:
        self.mapping = mapping
        self.ignored = ignored
        super().__init__()
Functions
__init__(mapping: HunkTargetMapping, **kwargs) -> None

Initialize hunk mapping widget.

Parameters:

Name     Type               Description                            Default
mapping  HunkTargetMapping  The hunk-to-commit mapping to display  required

Source code in src/git_autosquash/tui/widgets.py
def __init__(self, mapping: HunkTargetMapping, **kwargs) -> None:
    """Initialize hunk mapping widget.

    Args:
        mapping: The hunk to commit mapping to display
    """
    super().__init__(**kwargs)
    self.mapping = mapping
compose() -> ComposeResult

Compose the widget layout.

Source code in src/git_autosquash/tui/widgets.py
def compose(self) -> ComposeResult:
    """Compose the widget layout."""
    with Vertical():
        # Header with file and hunk info
        hunk_info = f"{self.mapping.hunk.file_path} @@ {self._format_hunk_range()}"
        yield Static(hunk_info, classes="hunk-header")

        # Target commit info
        if self.mapping.target_commit:
            # No trailing space here: the next f-string adds the separator
            commit_summary = f"→ {self.mapping.target_commit[:8]}"
            confidence_class = f"confidence-{self.mapping.confidence}"
            commit_info = f"{commit_summary} ({self.mapping.confidence} confidence)"
        else:
            commit_info = "→ No target commit found"
            confidence_class = "confidence-low"

        yield Static(commit_info, classes=f"commit-info {confidence_class}")

        # Action selection with separate concerns
        with Horizontal():
            yield Checkbox(
                "Approve for squashing", value=self.approved, id="approve-checkbox"
            )
            yield Checkbox(
                "Ignore (keep in working tree)",
                value=self.ignored,
                id="ignore-checkbox",
            )
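
The header line built in compose() combines the file path with a unified-diff style range produced by _format_hunk_range(). The sketch below reproduces that string outside the widget. `FakeHunk` is a hypothetical dataclass exposing only the DiffHunk fields the widget reads, and `format_hunk_header` is an illustrative name.

```python
from dataclasses import dataclass


@dataclass
class FakeHunk:
    """Hypothetical stand-in exposing the DiffHunk fields used by the widget."""

    file_path: str
    old_start: int
    old_count: int
    new_start: int
    new_count: int


def format_hunk_header(hunk: FakeHunk) -> str:
    """Build ``<path> @@ -old_start,old_count +new_start,new_count``."""
    hunk_range = f"-{hunk.old_start},{hunk.old_count} +{hunk.new_start},{hunk.new_count}"
    return f"{hunk.file_path} @@ {hunk_range}"


header = format_hunk_header(FakeHunk("src/app.py", 10, 3, 10, 5))
```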
on_click(event: events.Click) -> None

Handle click events.

Source code in src/git_autosquash/tui/widgets.py
def on_click(self, event: events.Click) -> None:
    """Handle click events."""
    self.selected = True
    self.post_message(self.Selected(self.mapping))
on_checkbox_changed(event: Checkbox.Changed) -> None

Handle checkbox changes.

Source code in src/git_autosquash/tui/widgets.py
def on_checkbox_changed(self, event: Checkbox.Changed) -> None:
    """Handle checkbox changes."""
    if event.checkbox.id == "approve-checkbox":
        self.approved = event.value
        self.post_message(self.ApprovalChanged(self.mapping, event.value))
    elif event.checkbox.id == "ignore-checkbox":
        self.ignored = event.value
        self.post_message(self.IgnoreChanged(self.mapping, event.value))
watch_selected(selected: bool) -> None

React to selection changes.

Source code in src/git_autosquash/tui/widgets.py
def watch_selected(self, selected: bool) -> None:
    """React to selection changes."""
    self.set_class(selected, "selected")
DiffViewer

Bases: Widget

Widget for displaying diff content with syntax highlighting.

Source code in src/git_autosquash/tui/widgets.py
class DiffViewer(Widget):
    """Widget for displaying diff content with syntax highlighting."""

    DEFAULT_CSS = """
    DiffViewer {
        border: round $primary;
        padding: 1;
    }

    DiffViewer .diff-header {
        color: $text-muted;
        text-style: bold;
    }
    """

    def __init__(self, **kwargs) -> None:
        """Initialize diff viewer."""
        super().__init__(**kwargs)
        self._current_hunk: Optional[DiffHunk] = None

    def compose(self) -> ComposeResult:
        """Compose the widget layout."""
        yield Static("Select a hunk to view diff", id="diff-content")

    def show_hunk(self, hunk: DiffHunk) -> None:
        """Display diff content for a hunk.

        Args:
            hunk: The hunk to display
        """
        self._current_hunk = hunk

        # Format diff content
        diff_text = "\n".join(hunk.lines)

        # Create syntax highlighted content
        try:
            # Use diff syntax highlighting regardless of file extension
            # since we're showing diff output, not the original file
            syntax = Syntax(diff_text, "diff", theme="monokai", line_numbers=False)
            content: Union[Syntax, Text] = syntax
        except (ImportError, ValueError, AttributeError):
            # Fallback to plain text if syntax highlighting fails or is unavailable
            content = Text(diff_text)

        # Update the display
        diff_widget = self.query_one("#diff-content", Static)
        diff_widget.update(content)

    def _get_language_from_file(self, file_path: str) -> str:
        """Get language identifier from file extension.

        Args:
            file_path: Path to the file

        Returns:
            Language identifier for syntax highlighting
        """
        extension = file_path.split(".")[-1].lower()

        language_map = {
            "py": "python",
            "js": "javascript",
            "ts": "typescript",
            "jsx": "jsx",
            "tsx": "tsx",
            "java": "java",
            "c": "c",
            "cpp": "cpp",
            "cc": "cpp",
            "h": "c",
            "hpp": "cpp",
            "rs": "rust",
            "go": "go",
            "rb": "ruby",
            "php": "php",
            "sh": "bash",
            "bash": "bash",
            "zsh": "bash",
            "fish": "bash",
            "ps1": "powershell",
            "html": "html",
            "css": "css",
            "scss": "scss",
            "sass": "sass",
            "less": "less",
            "json": "json",
            "yaml": "yaml",
            "yml": "yaml",
            "xml": "xml",
            "md": "markdown",
            "sql": "sql",
        }

        return language_map.get(extension, "text")
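
In the listing above, show_hunk() always highlights with the "diff" lexer, so _get_language_from_file() is not called by it. The helper also takes whatever follows the last dot, which means a file name without a dot is treated as its own extension. The sketch below is one possible alternative using `pathlib`, not the project's implementation. `LANGUAGE_MAP` is a trimmed copy of the table and `language_for` is a hypothetical name.

```python
from pathlib import PurePosixPath
from typing import Dict

# Trimmed copy of the extension table above; extend as needed.
LANGUAGE_MAP: Dict[str, str] = {
    "py": "python",
    "rs": "rust",
    "yml": "yaml",
    "yaml": "yaml",
    "sh": "bash",
}


def language_for(file_path: str) -> str:
    """Return a highlighter language for ``file_path``, defaulting to "text"."""
    # ``suffix`` is "" when the final path component has no extension.
    extension = PurePosixPath(file_path).suffix.lstrip(".").lower()
    return LANGUAGE_MAP.get(extension, "text")


results = [language_for(p) for p in ("src/main.PY", "deploy.yml", "sh", "Makefile")]
```

With this variant a file literally named `sh` falls back to "text", whereas splitting on "." would have looked up "sh" and returned "bash".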
Functions
__init__(**kwargs) -> None

Initialize diff viewer.

Source code in src/git_autosquash/tui/widgets.py
def __init__(self, **kwargs) -> None:
    """Initialize diff viewer."""
    super().__init__(**kwargs)
    self._current_hunk: Optional[DiffHunk] = None
compose() -> ComposeResult

Compose the widget layout.

Source code in src/git_autosquash/tui/widgets.py
def compose(self) -> ComposeResult:
    """Compose the widget layout."""
    yield Static("Select a hunk to view diff", id="diff-content")
show_hunk(hunk: DiffHunk) -> None

Display diff content for a hunk.

Parameters:

Name  Type      Description          Default
hunk  DiffHunk  The hunk to display  required

Source code in src/git_autosquash/tui/widgets.py
def show_hunk(self, hunk: DiffHunk) -> None:
    """Display diff content for a hunk.

    Args:
        hunk: The hunk to display
    """
    self._current_hunk = hunk

    # Format diff content
    diff_text = "\n".join(hunk.lines)

    # Create syntax highlighted content
    try:
        # Use diff syntax highlighting regardless of file extension
        # since we're showing diff output, not the original file
        syntax = Syntax(diff_text, "diff", theme="monokai", line_numbers=False)
        content: Union[Syntax, Text] = syntax
    except (ImportError, ValueError, AttributeError):
        # Fallback to plain text if syntax highlighting fails or is unavailable
        content = Text(diff_text)

    # Update the display
    diff_widget = self.query_one("#diff-content", Static)
    diff_widget.update(content)
ProgressIndicator

Bases: Widget

Widget showing progress through hunk approvals.

Source code in src/git_autosquash/tui/widgets.py
class ProgressIndicator(Widget):
    """Widget showing progress through hunk approvals."""

    DEFAULT_CSS = """
    ProgressIndicator {
        height: 1;
        background: $panel;
        color: $text;
        text-align: center;
    }
    """

    def __init__(self, total_hunks: int, **kwargs) -> None:
        """Initialize progress indicator.

        Args:
            total_hunks: Total number of hunks to process
        """
        super().__init__(**kwargs)
        self.total_hunks = total_hunks
        self.approved_count = 0
        self.ignored_count = 0

    def compose(self) -> ComposeResult:
        """Compose the widget layout."""
        yield Static(self._format_progress(), id="progress-text")

    def update_progress(self, approved_count: int, ignored_count: int = 0) -> None:
        """Update the progress display.

        Args:
            approved_count: Number of hunks approved so far
            ignored_count: Number of hunks ignored so far
        """
        self.approved_count = approved_count
        self.ignored_count = ignored_count
        progress_widget = self.query_one("#progress-text", Static)
        progress_widget.update(self._format_progress())

    def _format_progress(self) -> str:
        """Format progress text."""
        total_processed = self.approved_count + self.ignored_count
        percentage = (
            (total_processed / self.total_hunks * 100) if self.total_hunks > 0 else 0
        )
        status_parts = []
        if self.approved_count > 0:
            status_parts.append(f"{self.approved_count} squash")
        if self.ignored_count > 0:
            status_parts.append(f"{self.ignored_count} ignore")

        if status_parts:
            status = f"({', '.join(status_parts)})"
        else:
            status = ""

        return f"Progress: {total_processed}/{self.total_hunks} selected {status} ({percentage:.0f}%)"
Functions
__init__(total_hunks: int, **kwargs) -> None

Initialize progress indicator.

Parameters:

Name         Type  Description                       Default
total_hunks  int   Total number of hunks to process  required
Source code in src/git_autosquash/tui/widgets.py
def __init__(self, total_hunks: int, **kwargs) -> None:
    """Initialize progress indicator.

    Args:
        total_hunks: Total number of hunks to process
    """
    super().__init__(**kwargs)
    self.total_hunks = total_hunks
    self.approved_count = 0
    self.ignored_count = 0
compose() -> ComposeResult

Compose the widget layout.

Source code in src/git_autosquash/tui/widgets.py
def compose(self) -> ComposeResult:
    """Compose the widget layout."""
    yield Static(self._format_progress(), id="progress-text")
update_progress(approved_count: int, ignored_count: int = 0) -> None

Update the progress display.

Parameters:

Name            Type  Description                      Default
approved_count  int   Number of hunks approved so far  required
ignored_count   int   Number of hunks ignored so far   0
Source code in src/git_autosquash/tui/widgets.py
def update_progress(self, approved_count: int, ignored_count: int = 0) -> None:
    """Update the progress display.

    Args:
        approved_count: Number of hunks approved so far
        ignored_count: Number of hunks ignored so far
    """
    self.approved_count = approved_count
    self.ignored_count = ignored_count
    progress_widget = self.query_one("#progress-text", Static)
    progress_widget.update(self._format_progress())
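
The formatting logic in _format_progress can be checked without a running Textual app. The sketch below copies the same arithmetic into a stand-alone function; format_progress is a hypothetical name used only for illustration and is not part of git-autosquash.

```python
def format_progress(approved: int, ignored: int, total: int) -> str:
    """Stand-alone copy of the ProgressIndicator._format_progress logic."""
    processed = approved + ignored
    # Guard against division by zero when there are no hunks at all.
    percentage = (processed / total * 100) if total > 0 else 0

    parts = []
    if approved > 0:
        parts.append(f"{approved} squash")
    if ignored > 0:
        parts.append(f"{ignored} ignore")
    status = f"({', '.join(parts)})" if parts else ""

    return f"Progress: {processed}/{total} selected {status} ({percentage:.0f}%)"


print(format_progress(3, 1, 8))
# Progress: 4/8 selected (3 squash, 1 ignore) (50%)
```

Note that when nothing has been approved or ignored, the status part is empty and the result contains two consecutive spaces before the percentage.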

Usage Examples

Basic API Usage

Here's how to use the core components programmatically:

from pathlib import Path

from git_autosquash.git_ops import GitOps
from git_autosquash.hunk_parser import HunkParser
from git_autosquash.blame_analyzer import BlameAnalyzer

# Initialize components (GitOps expects a Path, not a str)
git_ops = GitOps(Path("."))
hunk_parser = HunkParser(git_ops)
blame_analyzer = BlameAnalyzer(git_ops, "main")

# Get diff hunks
hunks = hunk_parser.get_diff_hunks()

# Analyze hunks to find target commits
mappings = blame_analyzer.analyze_hunks(hunks)

# Print results
for mapping in mappings:
    print(f"{mapping.hunk.file_path}: {mapping.target_commit} ({mapping.confidence})")

Custom TUI Integration

from git_autosquash.tui.app import AutoSquashApp

# Create the TUI with the mappings returned by BlameAnalyzer.analyze_hunks()
app = AutoSquashApp(mappings)
approved = app.run()

if approved and app.approved_mappings:
    print(f"User approved {len(app.approved_mappings)} hunks")
    # Process approved mappings...

Rebase Execution

from git_autosquash.rebase_manager import RebaseManager, RebaseConflictError

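# git_ops is the GitOps instance from the Basic API Usage example;
# merge_base and approved_mappings must be supplied by the caller
# (for example, app.approved_mappings from the TUI example).
try: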
    rebase_manager = RebaseManager(git_ops, merge_base)
    success = rebase_manager.execute_squash(approved_mappings)

    if success:
        print("Squash operation completed successfully!")
    else:
        print("Operation was cancelled by user")

except RebaseConflictError as e:
    print(f"Conflicts in: {', '.join(e.conflicted_files)}")
    # Handle conflicts...

Type Definitions

Common Types

from typing import Any, Dict, List, Literal

# Confidence levels
ConfidenceLevel = Literal["high", "medium", "low"]

# Working tree status
WorkingTreeStatus = Dict[str, bool]  # {"is_clean": bool, "has_staged": bool, "has_unstaged": bool}

# Rebase status information
RebaseStatus = Dict[str, Any]  # {"in_progress": bool, "conflicted_files": List[str], ...}

Error Types

All components raise specific exception types for different error conditions:

  • GitOps: subprocess.SubprocessError for Git command failures (the _run_git_command helpers catch it and return a (False, message) tuple instead of raising)
  • HunkParser: ValueError for parsing errors
  • BlameAnalyzer: subprocess.SubprocessError for blame failures
  • RebaseManager: RebaseConflictError for merge conflicts
  • TUI: Standard Textual exceptions for interface errors

Configuration Options

Environment Variables

git-autosquash respects these environment variables:

  • GIT_SEQUENCE_EDITOR: Custom editor for interactive rebase (automatically managed)
  • TERM: Terminal type for TUI rendering
  • NO_COLOR: Disable colored output when set
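
As an illustration of how a caller might honor these variables, here is a minimal sketch. It is not git-autosquash's own implementation: color_enabled is a hypothetical helper, treating only a non-empty NO_COLOR as "set" follows the common NO_COLOR convention, and the TERM=dumb check is an additional assumption.

```python
import os
from typing import Mapping, Optional


def color_enabled(env: Optional[Mapping[str, str]] = None) -> bool:
    """Decide whether colored output should be used (hypothetical helper)."""
    env = os.environ if env is None else env
    # NO_COLOR disables color when present with a non-empty value.
    if env.get("NO_COLOR"):
        return False
    # Assumption: a "dumb" terminal cannot render color.
    return env.get("TERM", "") != "dumb"


print(color_enabled({"NO_COLOR": "1", "TERM": "xterm-256color"}))  # False
print(color_enabled({"TERM": "xterm-256color"}))  # True
```

Passing the environment as a mapping keeps the function easy to test without modifying os.environ.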

Git Configuration

The following Git configuration affects git-autosquash behavior:

  • core.editor: Default editor for conflict resolution
  • merge.tool: Merge tool for resolving conflicts
  • rebase.autoStash: Automatic stashing during rebase (overridden by git-autosquash)
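
To inspect what these settings currently resolve to in a repository, something like the following could be used. This is a sketch built on the standard `git config --get` command; git_config_value is a hypothetical helper, not a GitOps method.

```python
import subprocess
from typing import Optional


def git_config_value(key: str, cwd: str = ".") -> Optional[str]:
    """Return a Git config value, or None if it is unset or git is unavailable."""
    try:
        result = subprocess.run(
            ["git", "config", "--get", key],
            cwd=cwd,
            capture_output=True,
            text=True,
            check=False,
        )
    except (subprocess.SubprocessError, OSError):
        # git is not installed or could not be started.
        return None
    # `git config --get` exits non-zero when the key is not set.
    if result.returncode != 0:
        return None
    return result.stdout.strip()


for key in ("core.editor", "merge.tool", "rebase.autoStash"):
    print(f"{key} = {git_config_value(key)}")
```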

Performance Considerations

Caching

Several operations are cached for performance:

  • Branch commits: Expensive git rev-list operations
  • Commit timestamps: git show calls for chronological ordering
  • Commit summaries: git log output for display
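
The caching code itself is not reproduced in this section, so the following only illustrates the general idea using functools.lru_cache as a stand-in. Both _fetch_summary (which simulates an expensive git call) and commit_summary are hypothetical names.

```python
from functools import lru_cache
from typing import List

call_log: List[str] = []  # records each simulated git invocation


def _fetch_summary(commit_hash: str) -> str:
    """Stand-in for an expensive `git log` call (hypothetical)."""
    call_log.append(commit_hash)
    return f"summary for {commit_hash[:7]}"


@lru_cache(maxsize=None)
def commit_summary(commit_hash: str) -> str:
    """Return the summary for a commit, fetching it at most once per hash."""
    return _fetch_summary(commit_hash)


commit_summary("0123456789abcdef")
commit_summary("0123456789abcdef")  # served from the cache
print(len(call_log))  # 1
```

Commit hashes are immutable identifiers, which is what makes results keyed by hash safe to cache for the lifetime of a run.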

Memory Usage

  • Diff parsing: Streams large diffs to avoid memory issues
  • Blame analysis: Processes hunks individually to limit memory usage
  • TUI rendering: Efficient widget updates and syntax highlighting
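
A rough sketch of the streaming idea follows: a generator that yields one hunk at a time, so only the current hunk is held in memory. This is not HunkParser's actual implementation, and iter_hunks is a hypothetical name.

```python
from typing import Iterable, Iterator, List


def iter_hunks(diff_lines: Iterable[str]) -> Iterator[List[str]]:
    """Yield each hunk of a unified diff as a list of lines, one at a time."""
    current: List[str] = []
    for line in diff_lines:
        if line.startswith("@@"):
            # A hunk header starts a new hunk; emit the previous one first.
            if current:
                yield current
            current = [line]
        elif line.startswith("diff --git"):
            # A new file section ends the hunk in progress.
            if current:
                yield current
            current = []
        elif current:
            # Body line of the current hunk; file headers outside a hunk are skipped.
            current.append(line)
    if current:
        yield current


sample = [
    "diff --git a/a.py b/a.py",
    "--- a/a.py",
    "+++ b/a.py",
    "@@ -1,2 +1,2 @@",
    "-old",
    "+new",
    "@@ -10,1 +10,2 @@",
    " ctx",
    "+added",
]
for hunk in iter_hunks(sample):
    print(hunk[0], len(hunk) - 1, "body lines")
```

Because the input is any iterable of lines, the same function works on a list, a file object, or the stdout of a subprocess read line by line.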

Git Operation Optimization

  • Batch operations: Groups related Git commands where possible
  • Subprocess management: Proper timeout and resource cleanup
  • Working directory: Minimal file I/O and temporary file usage
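
For the subprocess point, one possible shape is shown below. It follows the (success, output) tuple convention of GitOps._run_git_command and adds the timeout parameter of subprocess.run. The run_with_timeout helper and its 30-second default are assumptions for illustration, not the library's code.

```python
import subprocess
import sys
from typing import Sequence, Tuple


def run_with_timeout(cmd: Sequence[str], timeout: float = 30.0) -> Tuple[bool, str]:
    """Run a command and return (success, output), giving up after `timeout` seconds."""
    try:
        result = subprocess.run(
            list(cmd),
            capture_output=True,
            text=True,
            check=False,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child process before raising this.
        return False, f"Command timed out after {timeout} seconds"
    except (subprocess.SubprocessError, OSError) as e:
        return False, f"Command failed: {e}"
    return result.returncode == 0, result.stdout.strip() or result.stderr.strip()


print(run_with_timeout([sys.executable, "-c", "print('ok')"]))
# (True, 'ok')
```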

Testing the API

The API components are extensively tested. To run tests for specific components:

# Test specific components
uv run pytest tests/test_git_ops.py -v
uv run pytest tests/test_hunk_parser.py -v
uv run pytest tests/test_blame_analyzer.py -v
uv run pytest tests/test_rebase_manager.py -v

# Test with coverage
uv run pytest --cov=git_autosquash tests/

Contributing to the API

When extending the API:

  1. Follow existing patterns: Maintain consistency with current design
  2. Add comprehensive docstrings: Use Google style docstrings
  3. Include type annotations: Full typing for all public methods
  4. Write tests: Unit tests with appropriate mocking
  5. Update documentation: This reference updates automatically from docstrings
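
As a small illustration of points 2 and 3, a function with a Google style docstring and full type annotations might look like this. count_additions is a hypothetical example, not an existing git-autosquash function.

```python
from typing import List


def count_additions(hunk_lines: List[str]) -> int:
    """Count added lines in a unified diff hunk.

    Args:
        hunk_lines: Lines of a single hunk, including the ``@@`` header.

    Returns:
        Number of lines starting with ``+``, excluding lines that start
        with ``+++`` (the form used by file headers).
    """
    return sum(
        1
        for line in hunk_lines
        if line.startswith("+") and not line.startswith("+++")
    )


print(count_additions(["@@ -1 +1,2 @@", "-a", "+b", "+c"]))  # 2
```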