銀河鉄道

Python|CSV差分比較ツールを作る

サムネイル
[Python]compares2 CSV files

README

# csv_diff

2つのCSVファイルを比較し、差分レポートをExcel + CSVとして出力するGUIツール。

## できること

- ファイル選択ダイアログで2つのCSVを選択
- ファイル名の日付パターン(YYYYMMDD)や更新日時から新旧を自動判定
- キー列の候補を一意性の高い順に提示し、ユーザーが選択
- 差分を **added** / **deleted** / **updated** の3種類で検出
- キー列に重複がある場合は警告の上、行レベル比較(added/deleted のみ)にフォールバック
- キー列の空文字は欠損扱い(空文字 `""` を無効値として除外。キー候補評価・重複判定の両方で除外され、updated の対応付けにも使われない)
- 指定した列を差分比較から除外できる(`EXCLUDE_COMPARE_COLUMNS`)
- 指定した列の updated を専用シート/ファイルに分割して出力(`SPLIT_COLUMNS`)
- UTF-8 / UTF-8 BOM / CP932 の自動判定読み込み(エンコーディングエラーのみフォールバック)
- 読み込み時にセル値の前後空白・全角スペースを自動で正規化(トリム)
- 2つのCSVで列構成が異なる場合、差分のある列名を表示して続行するか確認
- ターミナルに進捗ログを表示(旧/新ファイル名、件数、処理時間、保存先)
- 出力先ディレクトリを `config.py` で変更可能(デフォルトは `Output/`。`CSV_DIFF_BASEDIR` が設定されている場合はその配下)

## できないこと・制限事項

- **列の増減は差分レポートに含まれない** — 比較は両CSVの共通列のみで行われる。片方にしかない列は警告表示のみで、レポートには出力されない
- **重複キー時のセルレベル比較(updated)は不可** — キー列に重複がある場合、どの行同士を対応付けるか確定できないため、行単位の added/deleted のみ出力される
- **複合キーには未対応** — キー列は1列のみ指定可能。複数列の組み合わせをキーにすることはできない
- **大規模ファイルは環境依存** — 全データをメモリに読み込むため、行数・列数・文字列長と実行環境のメモリにより、処理が遅くなったりメモリ不足になる可能性がある
- **重複キーモードは特に高負荷** — 重複キーがある場合は行単位比較となり、全行をタプル化して Counter で差分を取るため、通常モードよりメモリ/CPU負荷が上がりやすい

## ディレクトリ構成

```
csv_diff/
├── __init__.py          # パッケージ初期化
├── __main__.py          # python -m csv_diff 用エントリポイント
├── app.py               # メインフロー(logging, RunStatus)
├── config.py            # 設定値(比較・出力等)
├── ui_tk.py             # tkinter GUI(ダイアログ類)
├── logic/               # ロジック層
│   ├── __init__.py      # re-export
│   ├── io_csv.py        # CSV読み込み・正規化
│   ├── keying.py        # 共通列・キー検証・キー候補
│   ├── old_new.py       # 新旧推定
│   └── report/          # レポート生成・出力
│       ├── __init__.py      # re-export
│       ├── diff_report.py   # 差分生成の本体
│       ├── report_processor.py  # シート分割処理
│       └── file_exporter.py     # Excel/CSV物理出力
└── tests/
    ├── __init__.py
    └── test_diff_logic.py
```

## 必要環境

- Python 3.9+
- 依存パッケージ: `pip install -r requirements.txt`
  - pandas(必須)
  - openpyxl(必須。Excel出力に使用)
- tkinter(標準ライブラリ。macOS / Windows は通常同梱)

## 使い方

### Windows(推奨)

プロジェクトルートの `run_csv_diff.bat` をダブルクリック。

- 正常終了時はターミナルが自動で閉じる
- キャンセル・エラー時はターミナルが開いたままになり、ログを確認できる

### コマンドライン

```bash
python -m csv_diff
```

### 操作手順

1. ファイル選択ダイアログが開くので、比較したいCSVを2つ選択
2. 新旧の判定結果を確認(入れ替え・キャンセル可能)
3. キー列を候補から選択、または直接入力
4. 重複キーがある場合は警告が表示され、続行するか選択
5. `Output/` に以下のファイルが出力される
   - `diff_report_YYYYMMDD_HHMMSS.xlsx` — Excel(SPLIT_COLUMNS ごとにシート分割)
   - `diff_report_YYYYMMDD_HHMMSS_<シート名>.csv` — シートごとのCSV
   - `diff_report_YYYYMMDD_HHMMSS_all_combined.csv` — 全差分をまとめたCSV
   - 差分が0件の場合はファイル出力をスキップする(ログのみ表示)

## 設定(config.py)

`csv_diff/config.py` で以下の設定を変更できる。

### 比較設定

| 設定値 | デフォルト | 説明 |
|--------|-----------|------|
| `EXCLUDE_COMPARE_COLUMNS` | `["Date"]` | 差分比較から除外する列名のリスト |
| `SPLIT_COLUMNS` | `["Status", "Source"]` | 専用シートに分割する列名のリスト |

### 出力先ディレクトリ

| 設定値 | 説明 |
|--------|------|
| `OUTPUT_DIR` | **優先**。絶対パスを指定すると、そのフォルダに出力する |
| `OUTPUT_DIR_NAME` | `OUTPUT_DIR` が空の場合のみ使用。`CSV_DIFF_BASEDIR` 環境変数があればその配下、未設定ならプロジェクトルート配下にこの名前のフォルダを作って出力する(デフォルト: `"Output"`) |

```python
# 例: デスクトップに出力する場合
OUTPUT_DIR = r"C:\Users\<USERNAME>\Desktop\csv_diff_output"

# 例: デフォルトのまま使う場合
OUTPUT_DIR = ""
```

### その他の設定

| 設定値 | デフォルト | 説明 |
|--------|-----------|------|
| `OUTPUT_BASE_NAME` | `"diff_report"` | 出力ファイル名のベース名 |
| `KEY_CANDIDATES_TOP_N` | `10` | キー候補の最大表示数 |
| `KEY_CANDIDATES_SAMPLE_SIZE` | `10_000` | キー候補判定時のサンプリング行数 |

## 出力形式

補足: `EXCLUDE_COMPARE_COLUMNS` は updated の比較対象から除外する設定。added/deleted(行の増減)は抑制しない。

| 列名 | 内容 |
|------|------|
| change_type | `added` / `deleted` / `updated` |
| (キー列名) | 対象行のキー値 |
| column | 変更された列名(added/deleted の場合は `__ROW__`) |
| old_value | 変更前の値(added の場合は空) |
| new_value | 変更後の値(deleted の場合は空) |

## テスト

```bash
# venv を作成して依存をインストール(初回のみ)
python -m venv .venv

# macOS / Linux
source .venv/bin/activate

# Windows
# .venv\Scripts\activate

pip install -r requirements.txt
pip install pytest

# テスト実行
python -m pytest csv_diff/tests/ -v
```

Core

config.py

# config.py
"""csv_diff パッケージの設定値を一箇所に集約するモジュール。
入力は CSV 専用。出力は Excel + CSV の両方を自動生成する。"""

# ---------- 比較設定 ----------

# 差分比較から除外する列名(例: システムが自動更新する日時やIDなど)
# 複数指定可能: ["Date", "UPDATE_TIME"]
EXCLUDE_COMPARE_COLUMNS = ["Date"]

# 列ごとにシートを分割したい列名のリスト
# ここに指定した列は専用シートになり、それ以外の更新(および added/deleted)は「その他」になります
SPLIT_COLUMNS = ["Status", "Source"]


# ---------- 出力系 ----------

# 出力先ディレクトリ
# OUTPUT_DIR(優先): 絶対パスを指定すると、そのフォルダに出力する
#   例: OUTPUT_DIR = "C:\Users\<USERNAME>\Desktop\csv_diff_output"
# OUTPUT_DIR_NAME:    OUTPUT_DIR が空の場合のみ使用される
#                     run_csv_diff.bat と同じ階層にこの名前のフォルダを作って出力する
OUTPUT_DIR = ""
OUTPUT_DIR_NAME = "Output"

# 出力ファイル名のベース名
OUTPUT_BASE_NAME = "diff_report"

# キー候補の最大表示数
KEY_CANDIDATES_TOP_N = 10

# suggest_key_candidates のサンプリング行数
KEY_CANDIDATES_SAMPLE_SIZE = 10_000


# ---------- 入力系(原則は CSV 専用のため変更不可) ----------
# このツールは CSV 入力専用。UI のファイルフィルタとロジックの拡張子チェックに使用される。
INPUT_EXT = ".csv"
FILE_TYPES = [("CSV files", f"*{INPUT_EXT}")]

app.py

# app.py
# ツール全体のメインフローを制御するモジュール

from __future__ import annotations

import logging
import os
import sys
import time
from datetime import datetime
from enum import Enum, auto
from pathlib import Path

from . import ui_tk
from .config import (
    INPUT_EXT,
    FILE_TYPES,
    OUTPUT_DIR,
    OUTPUT_DIR_NAME,
    OUTPUT_BASE_NAME,
    KEY_CANDIDATES_TOP_N,
    KEY_CANDIDATES_SAMPLE_SIZE,
    EXCLUDE_COMPARE_COLUMNS,
)
from .logic import (
    read_csv_safely,
    get_common_columns,
    suggest_key_candidates,
    validate_key_column,
    build_diff_report,
    infer_old_new,
    split_by_config,
    export_reports,
)

logger = logging.getLogger(__name__)


class RunStatus(Enum):
    """_run() の終了理由"""
    SUCCESS = auto()    # 正常終了
    CANCELLED = auto()  # ユーザーキャンセル/中断
    ERROR = auto()      # 異常終了


# 拡張子バリデーション
def _validate_csv_paths(paths: list[str]) -> list[str]:
    bad = []
    for p in paths:
        pp = Path(p)
        if pp.suffix.lower() != INPUT_EXT:
            bad.append(f"{pp.name} ({pp.suffix or '拡張子なし'})")
    return bad


# 経過時間のフォーマット
def format_elapsed(seconds: float) -> str:
    s = int(seconds)
    m = s // 60
    r = s % 60
    return f"{m}分 {r}秒" if m > 0 else f"{r}秒"


# 出力先ディレクトリの解決
# 優先順位: OUTPUT_DIR(明示設定) > CSV_DIFF_BASEDIR(bat基準) > プロジェクトルート
def _resolve_output_dir() -> Path:
    if OUTPUT_DIR.strip():
        return Path(OUTPUT_DIR)

    base = os.environ.get("CSV_DIFF_BASEDIR")
    if base:
        return Path(base) / OUTPUT_DIR_NAME

    return Path(__file__).resolve().parent.parent / OUTPUT_DIR_NAME


# エントリポイント
def main() -> None:
    logging.basicConfig(level=logging.INFO, format="%(message)s")

    output_dir = _resolve_output_dir()

    status: RunStatus = RunStatus.ERROR
    try:
        status = _run(output_dir)
    finally:
        ui_tk.cleanup()

    if status == RunStatus.ERROR:
        sys.exit(1)
    if status == RunStatus.CANCELLED:
        logger.info("キャンセルされました。終了します。")
        sys.exit(2)


# メインフローの実行
def _run(output_dir: Path) -> RunStatus:
    # 1) select files
    logger.info("=" * 40)
    logger.info("   CSV Diff Tool Started...")
    logger.info("=" * 40)
    while True:
        paths = ui_tk.select_csv_files(filetypes=FILE_TYPES)
        if not paths:
            return RunStatus.CANCELLED
        if len(paths) != 2:
            ui_tk.show_error("CSVは必ず2つ選択してください。")
            continue
        # 1.5) validate extension
        bad = _validate_csv_paths(paths)
        if bad:
            ui_tk.show_error("CSVファイルのみ対応しています。\n\n選択されたファイル:\n" + "\n".join(bad))
            continue
        break

    # 2) infer old/new
    old_path, new_path = infer_old_new(paths)
    logger.info("* 推定 旧: %s", Path(old_path).name)
    logger.info("* 推定 新: %s", Path(new_path).name)

    answer = ui_tk.ask_confirm_old_new(old_path, new_path)
    if answer is None:
        return RunStatus.CANCELLED
    if answer is False:
        old_path, new_path = new_path, old_path
        logger.info("入れ替えました")

    logger.info("確定 旧: %s", Path(old_path).name)
    logger.info("確定 新: %s", Path(new_path).name)

    # 3) read
    start = time.time()
    logger.info("データを読み込んでいます...")
    try:
        old_df = read_csv_safely(old_path)
        new_df = read_csv_safely(new_path)
    except Exception as e:
        ui_tk.show_error(f"CSVの読み込みに失敗しました。\n\n{e}")
        return RunStatus.ERROR

    # 4) common columns
    common_cols = get_common_columns(old_df, new_df)
    if not common_cols:
        ui_tk.show_error("両方のCSVに共通する列がありません。")
        return RunStatus.ERROR

    # 4.5) warn if columns differ
    added_cols = sorted(set(new_df.columns) - set(old_df.columns))
    deleted_cols = sorted(set(old_df.columns) - set(new_df.columns))
    if added_cols or deleted_cols:
        if not ui_tk.ask_column_diff_proceed(added_cols, deleted_cols):
            return RunStatus.CANCELLED

    # 5) suggest key candidates
    candidates = suggest_key_candidates(
        old_df, new_df, common_cols,
        top_n=KEY_CANDIDATES_TOP_N,
        sample_size=KEY_CANDIDATES_SAMPLE_SIZE,
    )

    # 6) user selects key
    key_col = ui_tk.select_key_variable(candidates=candidates, common_columns=common_cols)
    if not key_col:
        return RunStatus.CANCELLED

    # 7) validate key column for duplicates
    has_duplicates, dup_warning = validate_key_column(old_df, new_df, key_col, common_cols)
    if has_duplicates and dup_warning is not None:
        if not ui_tk.ask_duplicate_key_proceed(dup_warning.message):
            return RunStatus.CANCELLED

    # 8) diff
    try:
        result = build_diff_report(
            old_df=old_df,
            new_df=new_df,
            key_col=key_col,
            common_cols=common_cols,
            has_duplicates=has_duplicates,
            exclude_cols=EXCLUDE_COMPARE_COLUMNS
        )
    except Exception as e:
        ui_tk.show_error(f"差分処理でエラーが発生しました。\n\n{e}")
        return RunStatus.ERROR

    logger.info("比較完了: 合計 %d 件の差分", result.diff_count)

    # 9) output
    # 出力フォルダの作成
    try:
        output_dir.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        ui_tk.show_error(f"出力フォルダの作成に失敗しました。\n\n{e}")
        return RunStatus.ERROR

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    base_filename = f"{OUTPUT_BASE_NAME}_{timestamp}"

    try:
        # report_processor によるデータの分割
        report_dict = split_by_config(result.report)

        # file_exporter による物理出力(Excel & CSV)
        export_reports(
            report_dict=report_dict,
            output_dir=str(output_dir),
            base_name=base_filename
        )
    except Exception as e:
        ui_tk.show_error(f"ファイル出力に失敗しました。\n\n{e}")
        return RunStatus.ERROR

    elapsed = time.time() - start
    logger.info("処理時間: %s", format_elapsed(elapsed))
    logger.info("保存完了 -> %s", output_dir)

    ui_tk.show_complete(result.diff_count, format_elapsed(elapsed), str(output_dir))
    return RunStatus.SUCCESS

ui_tk.py

# ui_tk.py
from __future__ import annotations

import tkinter as tk
from tkinter import filedialog, messagebox, ttk


# ---------- Tk root singleton ----------
# Tk() を複数回呼ぶとダイアログが表示されない・フリーズ・TclError 等が起きる
# (特に Windows で再現しやすく原因特定が困難)。
# そのため Tk root はシングルトンで管理し、全ダイアログは Toplevel で作る。

_root: tk.Tk | None = None


def _get_root() -> tk.Tk:
    """Return (or create) the single hidden Tk root window."""
    global _root
    if _root is None or not _root.winfo_exists():
        _root = tk.Tk()
        _root.withdraw()
    return _root


def cleanup() -> None:
    """Destroy the Tk root. Call once at program exit."""
    global _root
    if _root is not None:
        try:
            _root.destroy()
        except tk.TclError:
            pass
        _root = None


# ---------- Dialogs ----------

def select_csv_files(
    filetypes: list[tuple[str, str]] | None = None,
) -> list[str]:
    """Returns list of selected file paths (may be empty)."""
    if filetypes is None:
        filetypes = [("CSV files", "*.csv")]
    root = _get_root()
    paths = filedialog.askopenfilenames(
        parent=root,
        title="比較するCSVを2つ選択してください(Ctrl+クリックで複数選択)",
        filetypes=filetypes,
    )
    return list(paths)


def ask_confirm_old_new(old_path: str, new_path: str) -> bool | None:
    """
    True  = そのまま (Yes)
    False = 入れ替える (No)
    None  = 中断 (Cancel)
    """
    root = _get_root()

    # askyesnocancel を使用
    result = messagebox.askyesnocancel(
        "確認",
        "ファイルの新旧を確認してください。\n\n"
        f"旧: {old_path}\n"
        f"新: {new_path}\n\n"
        "この並びでOKなら「Yes」\n"
        "入れ替えるなら「No」\n"
        "中止するなら「Cancel」",
        parent=root,
    )
    return result


def ask_duplicate_key_proceed(warning_message: str) -> bool:
    """Show duplicate key warning and ask whether to proceed."""
    root = _get_root()
    ok = messagebox.askyesno(
        "重複キー警告",
        warning_message + "\n\nこのまま続行しますか?",
        parent=root,
    )
    return bool(ok)


def ask_column_diff_proceed(
    added_cols: list[str],
    deleted_cols: list[str],
) -> bool:
    """Show column difference warning and ask whether to proceed with common columns only."""
    root = _get_root()
    parts = ["旧ファイルと新ファイルで列構成が異なります。\n"]
    if added_cols:
        parts.append(f"新ファイルのみに存在する列 ({len(added_cols)}件):\n  {', '.join(added_cols)}\n")
    if deleted_cols:
        parts.append(f"旧ファイルのみに存在する列 ({len(deleted_cols)}件):\n  {', '.join(deleted_cols)}\n")
    parts.append("共通列のみで比較を続行しますか?")
    ok = messagebox.askyesno("列構成の差異", "\n".join(parts), parent=root)
    return bool(ok)


def select_key_variable(candidates: list[str], common_columns: list[str]) -> str | None:
    """
    UI:
        - buttons for candidates
        - manual entry with validation against common_columns
    Returns selected key column name or None (cancel).
    Uses Toplevel window instead of creating a new Tk root.
    """
    root = _get_root()
    selected: dict[str, str | None] = {"value": None}

    win = tk.Toplevel(root)
    win.title("キー列の選択")
    win.protocol("WM_DELETE_WINDOW", lambda: _safe_close(win))

    def finalize(val: str | None) -> None:
        if not val or not str(val).strip():
            return

        target = str(val).strip()

        if target not in common_columns:
            messagebox.showwarning(
                "入力エラー",
                f"列名 '{target}' は両方のCSVに存在しません。\n正しい列名を入力してください。",
                parent=win,
            )
            return

        selected["value"] = target
        _safe_close(win)

    # headline
    ttk.Label(
        win,
        text="キーとなる列を選択するか、直接入力してください。",
        padding=(10, 10),
    ).pack()

    # candidates
    if candidates:
        frame_btn = ttk.LabelFrame(win, text="候補から選択", padding=(10, 10))
        frame_btn.pack(padx=20, pady=5, fill="x")

        for col in candidates:
            ttk.Button(
                frame_btn,
                text=col,
                command=lambda c=col: finalize(c),
            ).pack(pady=2, fill="x")

    # manual entry
    frame_in = ttk.LabelFrame(win, text="直接入力(候補にない場合)", padding=(10, 10))
    frame_in.pack(padx=20, pady=10, fill="x")

    entry_var = tk.StringVar()
    entry = ttk.Entry(frame_in, textvariable=entry_var)
    entry.pack(side="left", padx=5, expand=True, fill="x")

    ttk.Button(frame_in, text="決定", command=lambda: finalize(entry_var.get())).pack(side="right")

    entry.bind("<Return>", lambda e: finalize(entry_var.get()))

    # cancel
    ttk.Button(
        win,
        text="キャンセル",
        command=lambda: _safe_close(win),
    ).pack(pady=10)

    # ウィンドウを画面中央に配置
    win.update_idletasks()
    w = win.winfo_width()
    h = win.winfo_height()
    sw = win.winfo_screenwidth()
    sh = win.winfo_screenheight()
    win.geometry(f"+{(sw - w) // 2}+{(sh - h) // 2}")

    # focus
    entry.focus_set()
    win.grab_set()
    root.wait_window(win)
    return selected["value"]


def _safe_close(win: tk.Toplevel) -> None:
    """Safely close a Toplevel window."""
    try:
        win.grab_release()
    except tk.TclError:
        pass
    try:
        win.destroy()
    except tk.TclError:
        pass


def show_error(msg: str) -> None:
    root = _get_root()
    messagebox.showerror("エラー", msg, parent=root)


def show_complete(diff_count: int, time_str: str, out_path: str) -> None:
    root = _get_root()
    messagebox.showinfo(
        "完了",
        "比較が完了しました。\n\n"
        f"差分件数: {diff_count} 件\n"
        f"処理時間: {time_str}\n"
        f"保存先:\n{out_path}",
        parent=root,
    )

logic

__init__.py

# logic/__init__.py
"""ロジック層の re-export。app.py からは from .logic import ... で使える。"""

from .io_csv import read_csv_safely
from .keying import (
    get_common_columns,
    suggest_key_candidates,
    validate_key_column,
    DuplicateKeyWarning,
)
from .old_new import infer_old_new
from .report import (
    build_diff_report,
    DiffResult,
    split_by_config,
    export_reports,
)

__all__ = [
    "read_csv_safely",
    "get_common_columns",
    "suggest_key_candidates",
    "validate_key_column",
    "DuplicateKeyWarning",
    "build_diff_report",
    "DiffResult",
    "infer_old_new",
    "split_by_config",
    "export_reports",
]

io_csv.py

# io_csv.py
"""CSV読み込みと正規化。"""
from __future__ import annotations

from pathlib import Path
import pandas as pd


def read_csv_safely(path: str | Path) -> pd.DataFrame:
    """
    Read CSV with fallback encodings. Always returns dtype=str DataFrame filled with "".
    Only encoding errors trigger the next fallback; other errors are raised immediately.
    """
    p = str(Path(path))
    last_err: UnicodeDecodeError | None = None
    for enc in ("utf-8-sig", "cp932", "utf-8"):
        try:
            df = pd.read_csv(p, dtype=str, encoding=enc)
            return _normalize_df(df)
        except UnicodeDecodeError as e:
            last_err = e
            continue
    raise ValueError(f"Failed to read CSV with supported encodings: {p}\n{last_err}")


def _normalize_df(df: pd.DataFrame) -> pd.DataFrame:
    """Fill NaN and strip whitespace from all cells."""
    df = df.fillna("")
    for c in df.columns:
        df[c] = df[c].astype(str).str.replace("\u3000", " ", regex=False).str.strip()
    return df

keying.py

# keying.py
"""共通列の特定、キー列の検証、キー候補の提案。"""
from __future__ import annotations

from dataclasses import dataclass, field

import pandas as pd


# ---------- Columns ----------

# It returns the column names that exist in both CSVs, sorted alphabetically.
def get_common_columns(old_df: pd.DataFrame, new_df: pd.DataFrame) -> list[str]:
    """Return intersection of columns (sorted)."""
    return sorted(set(old_df.columns) & set(new_df.columns))


# ---------- Key Validation ----------

# 警告情報を格納するデータクラス
@dataclass(frozen=True)
class DuplicateKeyWarning:
    """Warning returned when the chosen key column has duplicate values."""
    key_col: str
    old_dup_count: int
    new_dup_count: int
    old_examples: list[str] = field(default_factory=list)
    new_examples: list[str] = field(default_factory=list)

    @property
    def message(self) -> str:
        parts = [f"キー列 '{self.key_col}' に重複値があります。"]
        if self.old_dup_count > 0:
            parts.append(f"  旧ファイル: {self.old_dup_count} 件の重複キー (例: {', '.join(self.old_examples[:5])})")
        if self.new_dup_count > 0:
            parts.append(f"  新ファイル: {self.new_dup_count} 件の重複キー (例: {', '.join(self.new_examples[:5])})")
        parts.append("セルレベルの差分比較 (updated) は行えません。added/deleted のみ出力されます。")
        return "\n".join(parts)


def validate_key_column(
    old_df: pd.DataFrame,
    new_df: pd.DataFrame,
    key_col: str,
    common_cols: list[str],
) -> tuple[bool, DuplicateKeyWarning | None]:
    """
    Validate key column:
      - Must exist in common_cols (raises ValueError if not)
      - Checks for duplicates in both DataFrames
      - Empty strings are treated as missing, not as duplicates
    Returns (has_duplicates, warning).
      has_duplicates: True if key column has duplicate values.
      warning: DuplicateKeyWarning with details, or None if no duplicates.
    """
    if key_col not in common_cols:
        raise ValueError(f"Key column '{key_col}' must exist in BOTH CSVs.")

    # Exclude empty strings (treated as missing, consistent with suggest_key_candidates)
    old_keys = old_df[key_col][old_df[key_col] != ""]
    new_keys = new_df[key_col][new_df[key_col] != ""]

    old_dups = old_keys[old_keys.duplicated(keep=False)]
    new_dups = new_keys[new_keys.duplicated(keep=False)]

    old_dup_unique = old_dups.unique()
    new_dup_unique = new_dups.unique()

    if len(old_dup_unique) == 0 and len(new_dup_unique) == 0:
        return False, None

    warning = DuplicateKeyWarning(
        key_col=key_col,
        old_dup_count=len(old_dup_unique),
        new_dup_count=len(new_dup_unique),
        old_examples=list(old_dup_unique[:5]),
        new_examples=list(new_dup_unique[:5]),
    )
    return True, warning


# ---------- Key Suggestion ----------

def suggest_key_candidates(
    old_df: pd.DataFrame,
    new_df: pd.DataFrame,
    common_cols: list[str],
    top_n: int = 10,
    max_null_rate: float = 0.10,
    sample_size: int = 10_000,
) -> list[str]:
    """
    Suggest key candidates:
      - only common columns
      - low null/blank rate
      - high uniqueness ratio in BOTH files (use min of the two)
      - for large DataFrames (> sample_size rows), sample to estimate nunique
    """
    if old_df.empty or new_df.empty or not common_cols:
        return []

    # Sample if large
    old_s = old_df.sample(n=sample_size, random_state=42) if len(old_df) > sample_size else old_df
    new_s = new_df.sample(n=sample_size, random_state=42) if len(new_df) > sample_size else new_df

    candidates: list[tuple[str, float]] = []  # (col, uniq_ratio)

    for c in common_cols:
        o = old_s[c]
        n = new_s[c]

        null_rate = max((o == "").mean(), (n == "").mean())
        if null_rate > max_null_rate:
            continue

        denom = min(len(o), len(n))
        if denom <= 0:
            continue

        # 空文字を除外してユニーク数を算出(validate_key_column と同じ扱い)
        uniq_ratio = min(o[o != ""].nunique(), n[n != ""].nunique()) / denom
        candidates.append((c, uniq_ratio))

    candidates.sort(key=lambda x: x[1], reverse=True)
    return [c for c, _ in candidates[:top_n]]

old_new.py

# old_new.py
"""ファイル名の日付パターンや更新日時から新旧を推定する。"""
from __future__ import annotations

from pathlib import Path
import re

_DATE8 = re.compile(r"(\d{8})")


def infer_old_new(paths: list[str]) -> tuple[str, str]:
    """
    Decide (old, new) from two file paths.
    Priority:
      1) 8-digit date in filename (YYYYMMDD) if both have it and are different
      2) file modified time (older -> old)
      3) fallback: first is old
    """
    if len(paths) != 2:
        raise ValueError("infer_old_new expects exactly 2 paths")

    p1, p2 = map(Path, paths)

    m1 = _DATE8.search(p1.name)
    m2 = _DATE8.search(p2.name)
    if m1 and m2 and m1.group(1) != m2.group(1):
        return (str(p1), str(p2)) if m1.group(1) < m2.group(1) else (str(p2), str(p1))

    try:
        t1 = p1.stat().st_mtime
        t2 = p2.stat().st_mtime
        return (str(p1), str(p2)) if t1 <= t2 else (str(p2), str(p1))
    except Exception:
        return str(p1), str(p2)

logic/report

__init__.py

# logic/report/__init__.py

from .diff_report import build_diff_report, DiffResult
from .report_processor import split_by_config
from .file_exporter import export_reports

diff_report.py

# diff_report.py
"""差分レポート生成の本体。"""
from __future__ import annotations

from collections import Counter
from dataclasses import dataclass
import json

import pandas as pd


@dataclass(frozen=True)
class DiffResult:
    report: pd.DataFrame
    diff_count: int


def build_diff_report(
    old_df: pd.DataFrame,
    new_df: pd.DataFrame,
    key_col: str,
    common_cols: list[str],
    has_duplicates: bool,
    exclude_cols: list[str] | None = None,
) -> DiffResult:
    """
    Output columns: change_type, <key_col>, column, old_value, new_value
    exclude_cols に指定された列は updated の比較対象から除外する。
    """
    if key_col not in common_cols:
        raise ValueError(f"Key column '{key_col}' must exist in BOTH CSVs.")

    # 除外リストを小文字の set に変換しておく(正規化)
    exclude_lower = {c.lower() for c in (exclude_cols or [])}

    # 1. 共通列のみに絞り込む
    old_i = old_df[common_cols].copy()
    new_i = new_df[common_cols].copy()

    # 重複キーモードの場合は専用処理へ
    if has_duplicates:
        return _build_diff_report_dupkey(old_i, new_i, key_col, common_cols)

    # Caller guarantees uniqueness when has_duplicates is False
    old_nonblank = old_i[key_col][old_i[key_col] != ""]
    new_nonblank = new_i[key_col][new_i[key_col] != ""]
    old_dup = old_nonblank.duplicated().any()
    new_dup = new_nonblank.duplicated().any()
    if old_dup or new_dup:
        raise ValueError(
            f"has_duplicates=False but '{key_col}' contains duplicates "
            f"(old={old_dup}, new={new_dup}). "
            "Caller must set has_duplicates=True (validate_key_column result)."
        )

    # インデックス設定
    old_i = old_i.set_index(key_col, drop=False)
    new_i = new_i.set_index(key_col, drop=False)

    added_keys = new_i.index.difference(old_i.index)
    deleted_keys = old_i.index.difference(new_i.index)
    common_keys = old_i.index.intersection(new_i.index)

    # 比較対象の列(キーを除き、かつ除外リストに入っていないもの)
    compare_cols = [
        c for c in common_cols
        if c != key_col and c.strip().lower() not in exclude_lower
    ]

    rows: list[dict] = []

    # --- added ---
    for k in added_keys:
        snapshot = new_i.loc[k, common_cols].to_dict()
        rows.append({
            "change_type": "added",
            key_col: str(k),
            "column": "__ROW__",
            "old_value": "",
            "new_value": json.dumps(snapshot, ensure_ascii=False),
        })

    # --- deleted ---
    for k in deleted_keys:
        snapshot = old_i.loc[k, common_cols].to_dict()
        rows.append({
            "change_type": "deleted",
            key_col: str(k),
            "column": "__ROW__",
            "old_value": json.dumps(snapshot, ensure_ascii=False),
            "new_value": "",
        })

    # --- updated (cell diffs) ---
    if not common_keys.empty:
        old_common = old_i.loc[common_keys, compare_cols]
        new_common = new_i.loc[common_keys, compare_cols]

        diff = old_common.compare(new_common, keep_equal=False)

        if not diff.empty:
            stacked = diff.stack(level=0)

            def is_empty_like(v):
                return pd.isna(v) or v == ""

            def format_value(v):
                if pd.isna(v): return "<NA>"
                return "<EMPTY>" if str(v) == "" else str(v)

            for (k, col), row in stacked.iterrows():
                raw_old = row.get("self")
                raw_new = row.get("other")

                if is_empty_like(raw_old) and is_empty_like(raw_new):
                    continue

                rows.append({
                    "change_type": "updated",
                    key_col: str(k),
                    "column": str(col),
                    "old_value": format_value(raw_old),
                    "new_value": format_value(raw_new),
                })

    out_cols = ["change_type", key_col, "column", "old_value", "new_value"]
    report = pd.DataFrame(rows, columns=out_cols)
    if not report.empty:
        report = report.sort_values(["change_type", key_col, "column"], kind="stable")

    return DiffResult(report=report, diff_count=len(report))

def _build_diff_report_dupkey(
    old_df: pd.DataFrame,
    new_df: pd.DataFrame,
    key_col: str,
    common_cols: list[str],
) -> DiffResult:
    """重複キーがある場合のフォールバック(行単位の比較のみ)"""
    old_tuples = [tuple(r) for r in old_df.values]
    new_tuples = [tuple(r) for r in new_df.values]
    old_cnt, new_cnt = Counter(old_tuples), Counter(new_tuples)

    rows = []

    # added
    for tup, count in new_cnt.items():
        excess = count - old_cnt.get(tup, 0)
        if excess > 0:
            d = dict(zip(common_cols, tup))
            val = json.dumps(d, ensure_ascii=False)
            for _ in range(excess):
                rows.append({
                    "change_type": "added",
                    key_col: d.get(key_col, ""),
                    "column": "__ROW__",
                    "old_value": "",
                    "new_value": val,
                })

    # deleted
    for tup, count in old_cnt.items():
        excess = count - new_cnt.get(tup, 0)
        if excess > 0:
            d = dict(zip(common_cols, tup))
            val = json.dumps(d, ensure_ascii=False)
            for _ in range(excess):
                rows.append({
                    "change_type": "deleted",
                    key_col: d.get(key_col, ""),
                    "column": "__ROW__",
                    "old_value": val,
                    "new_value": "",
                })

    report = pd.DataFrame(rows, columns=["change_type", key_col, "column", "old_value", "new_value"])
    return DiffResult(report=report.sort_values(["change_type", key_col], kind="stable"), diff_count=len(report))

file_exporter.py

# file_exporter.py
from __future__ import annotations

import logging
import os
import re

import pandas as pd

logger = logging.getLogger(__name__)

# Excel シート名の禁止文字: \ / ? * [ ] :
_INVALID_SHEET_CHARS = re.compile(r'[\\/?*\[\]:]')

# Windows ファイル名の禁止文字: < > : " / \ | ? * と制御文字
_INVALID_FILE_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1f]')


def _sanitize_sheet_name(name: str) -> str:
    """Excel シート名として安全な文字列に変換する(最大31文字)。"""
    sanitized = _INVALID_SHEET_CHARS.sub("_", name).strip()
    if not sanitized:
        sanitized = "Sheet"
    return sanitized[:31]


def _sanitize_file_part(name: str) -> str:
    """ファイル名の一部として安全な文字列に変換する(Windows対応)。"""
    sanitized = _INVALID_FILE_CHARS.sub("_", name).strip().strip(".")
    return sanitized or "part"


def export_reports(report_dict: dict[str, pd.DataFrame], output_dir: str, base_name: str) -> None:
    """
    辞書形式のレポートデータを、Excel(シート分け)とCSV(ファイル分け)の両方で出力する。
    出力に失敗した場合は例外を上位に伝播させる。
    """
    if not report_dict:
        logger.info("No changes found. Skipping export.")
        return

    # 保存先のベースパス(例: Output/diff_report_20250216_123456)
    base_path = os.path.join(output_dir, base_name)

    # --- 1. Excel形式で保存(1ファイルにシート分け) ---
    xlsx_path = f"{base_path}.xlsx"
    try:
        with pd.ExcelWriter(xlsx_path, engine='openpyxl') as writer:
            for sheet_name, df in report_dict.items():
                df.to_excel(writer, sheet_name=_sanitize_sheet_name(sheet_name), index=False)
        logger.info("Excel report saved: %s", xlsx_path)
    except Exception:
        logger.exception("Failed to save Excel")
        raise

    # --- 2. CSV形式で保存(ファイルごとに分離) ---
    for suffix, df in report_dict.items():
        safe_suffix = _sanitize_file_part(suffix)
        csv_path = f"{base_path}_{safe_suffix}.csv"
        try:
            # 日本語Excelで開くことを想定して utf-8-sig (BOM付き)
            df.to_csv(csv_path, index=False, encoding="utf-8-sig")
            logger.info("CSV report saved: %s", csv_path)
        except Exception:
            logger.exception("Failed to save CSV (%s)", suffix)
            raise

    # --- 3. 全データが入ったマスターCSV ---
    master_csv_path = f"{base_path}_all_combined.csv"
    try:
        full_df = pd.concat(report_dict.values(), ignore_index=True)
        full_df.to_csv(master_csv_path, index=False, encoding="utf-8-sig")
        logger.info("Master CSV saved: %s", master_csv_path)
    except Exception:
        logger.exception("Failed to save master CSV")
        raise

report_processor.py

# report_processor.py
from __future__ import annotations

import pandas as pd
from ... import config

def split_by_config(report_df: pd.DataFrame) -> dict[str, pd.DataFrame]:
    """configの設定に基づき、DataFrameを分割する"""
    split_cols = config.SPLIT_COLUMNS or []
    result: dict[str, pd.DataFrame] = {}

    if report_df.empty:
        return result

    remaining = report_df.copy()

    # 1. 指定された各列について分割処理を行う
    for col in split_cols:
        col_norm = col.lower()

        # updated かつ 対象の列名に一致するものを抽出
        mask = (
            (remaining["change_type"] == "updated") &
            (remaining["column"].str.lower() == col_norm)
        )

        target = remaining[mask]

        if not target.empty:
            result[col] = target
            # 分割した分を「残り」から除外する
            remaining = remaining[~mask]

    # 2. すべてのループが終わった後、残ったデータ(added/deleted/その他のupdated)をOthersへ
    if not remaining.empty:
        result["Others"] = remaining

    return result

テスト

テストでやること

このテストファイルは、csv_diff ツールのロジック層(logic/ パッケージ)の各関数が正しく動作するかを検証するものです。

UIを介さずにロジック単体で正しく動くことを保証するためのテストで、app.py や tk_ui は対象外で、純粋なデータ処理部分だけを検証しています。

TestReadCsvSafely|CSV読み込みの堅牢性を確認

  • UTF-8 / UTF-8 BOM / CP932 のエンコーディング自動判定
  • dtype=str で先頭ゼロが消えないこと(”001″ → “001”)
  • 空セルが NaN ではなく “” に統一されること
  • 存在しないファイルでエラーが出ること

TestGetCommonColumns |2つのCSVに共通する列名の抽出を確認

  • 部分一致、完全不一致、完全一致の3パターン

TestSuggestKeyCandidates |キー列候補の自動提案ロジックを確認

  • 一意性が高い列(id)が上位に来ること
  • 空DataFrameで空リストを返すこと
  • 空白率が高い列が除外されること
  • sample_size パラメータでクラッシュしないこと

TestValidateKeyColumn|キー列の重複チェックを確認

  • 重複なし → (False, None)
  • 旧/新ファイルに重複あり → (True, DuplicateKeyWarning)
  • 存在しない列を指定 → ValueError

TestBuildDiffReport|差分レポート生成の正確性を確認

  • 行の追加(added)、削除(deleted)、セル変更(updated)の検出
  • 差分なしで diff_count == 0
  • 存在しないキーで ValueError
  • 重複キーモード(has_duplicates=True)で added/deleted のみ出力されること
  • 契約違反(has_duplicates=False なのに実際に重複あり)で ValueError

TestInferOldNew|新旧ファイルの自動判定を確認

  • ファイル名の日付パターン(20230101 vs 20230201)で判定
  • 日付が同じ場合は更新日時(mtime)にフォールバック
  • ファイル数が2でなければ ValueError

TestSplitByConfig | SPLIT_COLUMNS によるシート分割を確認

  • 設定列ごとのキー + Others が生成されること
  • 分割された行が Others に重複しないこと
  • 空 DataFrame → 空 dict
  • マッチなし → 全て Others
  • 分割後の合計行数が元と一致すること

  TestSanitizeSheetName | Excelシート名のサニタイズを確認

  • 禁止文字(\ / ? * [ ] :)が _ に置換されること
  • 31文字に切り詰められること
  • 空文字 → “Sheet” フォールバック

  TestSanitizeFilePart | Windowsファイル名のサニタイズを確認

  • 禁止文字(< > : ” / \ | ? *)が置換されること
  • 末尾ドット・制御文字の除去
  • 空文字やドットのみ → “part” フォールバック

  TestExportReports | ファイル出力の動作を確認

  • Excel + 分割CSV + マスターCSV が生成されること
  • マスターCSVに全分割の行が含まれること
  • 空 dict で何も出力しないこと
  • CSVファイル名の禁止文字がサニタイズされること

  TestNormalizeFullwidth | 全角スペースの正規化を確認

  • 全角スペース(\u3000)が除去されること
  • 半角・全角混在の空白が正規化されること

test_diff_logic.py
# tests/test_diff_logic.py
from __future__ import annotations

import os
from pathlib import Path

import pandas as pd
import pytest

from csv_diff.logic import (
    read_csv_safely,
    get_common_columns,
    suggest_key_candidates,
    validate_key_column,
    build_diff_report,
    infer_old_new,
    DuplicateKeyWarning,
    split_by_config,
    export_reports,
)
from csv_diff.logic.report.file_exporter import _sanitize_sheet_name, _sanitize_file_part


# ---------- Helpers ----------

def _write_csv(path: Path, content: str, encoding: str = "utf-8") -> None:
    path.write_text(content, encoding=encoding)


# ---------- read_csv_safely ----------

class TestReadCsvSafely:
    def test_utf8(self, tmp_path: Path):
        p = tmp_path / "utf8.csv"
        _write_csv(p, "id,name\n1,Alice\n2,Bob\n")
        df = read_csv_safely(p)
        assert list(df.columns) == ["id", "name"]
        assert len(df) == 2
        assert df["name"].iloc[0] == "Alice"

    def test_utf8_bom(self, tmp_path: Path):
        p = tmp_path / "bom.csv"
        _write_csv(p, "id,name\n1,テスト\n", encoding="utf-8-sig")
        df = read_csv_safely(p)
        assert "id" in df.columns

    def test_cp932(self, tmp_path: Path):
        p = tmp_path / "cp932.csv"
        _write_csv(p, "id,名前\n1,太郎\n", encoding="cp932")
        df = read_csv_safely(p)
        assert "名前" in df.columns
        assert df["名前"].iloc[0] == "太郎"

    def test_dtype_str(self, tmp_path: Path):
        p = tmp_path / "nums.csv"
        _write_csv(p, "id,val\n001,002\n")
        df = read_csv_safely(p)
        # dtype=str preserves leading zeros
        assert df["id"].iloc[0] == "001"

    def test_fillna(self, tmp_path: Path):
        p = tmp_path / "na.csv"
        _write_csv(p, "id,val\n1,\n2,x\n")
        df = read_csv_safely(p)
        assert df["val"].iloc[0] == ""

    def test_invalid_file(self, tmp_path: Path):
        p = tmp_path / "nope.csv"
        with pytest.raises(FileNotFoundError):
            read_csv_safely(p)


# ---------- get_common_columns ----------

class TestGetCommonColumns:
    def test_basic(self):
        old = pd.DataFrame({"a": [1], "b": [2], "c": [3]})
        new = pd.DataFrame({"b": [1], "c": [2], "d": [3]})
        assert get_common_columns(old, new) == ["b", "c"]

    def test_no_overlap(self):
        old = pd.DataFrame({"a": [1]})
        new = pd.DataFrame({"b": [1]})
        assert get_common_columns(old, new) == []

    def test_identical(self):
        df = pd.DataFrame({"x": [1], "y": [2]})
        assert get_common_columns(df, df) == ["x", "y"]


# ---------- suggest_key_candidates ----------

class TestSuggestKeyCandidates:
    def test_basic(self):
        old = pd.DataFrame({"id": ["1", "2", "3"], "name": ["a", "a", "b"]})
        new = pd.DataFrame({"id": ["1", "2", "3"], "name": ["a", "b", "c"]})
        common = ["id", "name"]
        result = suggest_key_candidates(old, new, common)
        # id should rank higher (more unique)
        assert result[0] == "id"

    def test_empty_df(self):
        old = pd.DataFrame({"id": pd.Series([], dtype=str)})
        new = pd.DataFrame({"id": pd.Series([], dtype=str)})
        assert suggest_key_candidates(old, new, ["id"]) == []

    def test_high_null_rate_excluded(self):
        old = pd.DataFrame({"id": ["1", "2", ""], "val": ["a", "b", "c"]})
        new = pd.DataFrame({"id": ["1", "", ""], "val": ["a", "b", "c"]})
        common = ["id", "val"]
        # id has > 10% blanks in new, should be excluded or ranked lower
        result = suggest_key_candidates(old, new, common, max_null_rate=0.10)
        # val should be included (no blanks) but id excluded (>10% blank in new)
        assert "val" in result

    def test_sample_size_param(self):
        # Just verify it doesn't crash with sample_size
        n = 100
        old = pd.DataFrame({"id": [str(i) for i in range(n)]})
        new = pd.DataFrame({"id": [str(i) for i in range(n)]})
        result = suggest_key_candidates(old, new, ["id"], sample_size=50)
        assert "id" in result


# ---------- validate_key_column ----------

class TestValidateKeyColumn:
    def test_no_duplicates(self):
        old = pd.DataFrame({"id": ["1", "2"], "v": ["a", "b"]})
        new = pd.DataFrame({"id": ["1", "2"], "v": ["c", "d"]})
        has_dup, warning = validate_key_column(old, new, "id", ["id", "v"])
        assert has_dup is False
        assert warning is None

    def test_duplicates_old(self):
        old = pd.DataFrame({"id": ["1", "1", "2"], "v": ["a", "b", "c"]})
        new = pd.DataFrame({"id": ["1", "2", "3"], "v": ["d", "e", "f"]})
        has_dup, warning = validate_key_column(old, new, "id", ["id", "v"])
        assert has_dup is True
        assert isinstance(warning, DuplicateKeyWarning)
        assert warning.old_dup_count > 0
        assert "重複" in warning.message

    def test_duplicates_new(self):
        old = pd.DataFrame({"id": ["1", "2"], "v": ["a", "b"]})
        new = pd.DataFrame({"id": ["1", "1"], "v": ["c", "d"]})
        has_dup, warning = validate_key_column(old, new, "id", ["id", "v"])
        assert has_dup is True
        assert isinstance(warning, DuplicateKeyWarning)
        assert warning.new_dup_count > 0

    def test_invalid_key_col(self):
        old = pd.DataFrame({"id": ["1"]})
        new = pd.DataFrame({"id": ["1"]})
        with pytest.raises(ValueError, match="must exist in BOTH"):
            validate_key_column(old, new, "nope", ["id"])


# ---------- build_diff_report ----------

class TestBuildDiffReport:
    def test_added(self):
        old = pd.DataFrame({"id": ["1"], "v": ["a"]})
        new = pd.DataFrame({"id": ["1", "2"], "v": ["a", "b"]})
        r = build_diff_report(old, new, "id", ["id", "v"], has_duplicates=False)
        added = r.report[r.report["change_type"] == "added"]
        assert len(added) == 1
        assert added.iloc[0]["id"] == "2"

    def test_deleted(self):
        old = pd.DataFrame({"id": ["1", "2"], "v": ["a", "b"]})
        new = pd.DataFrame({"id": ["1"], "v": ["a"]})
        r = build_diff_report(old, new, "id", ["id", "v"], has_duplicates=False)
        deleted = r.report[r.report["change_type"] == "deleted"]
        assert len(deleted) == 1
        assert deleted.iloc[0]["id"] == "2"

    def test_updated(self):
        old = pd.DataFrame({"id": ["1", "2"], "v": ["a", "b"]})
        new = pd.DataFrame({"id": ["1", "2"], "v": ["a", "X"]})
        r = build_diff_report(old, new, "id", ["id", "v"], has_duplicates=False)
        updated = r.report[r.report["change_type"] == "updated"]
        assert len(updated) == 1
        assert updated.iloc[0]["old_value"] == "b"
        assert updated.iloc[0]["new_value"] == "X"

    def test_no_diff(self):
        df = pd.DataFrame({"id": ["1", "2"], "v": ["a", "b"]})
        r = build_diff_report(df, df.copy(), "id", ["id", "v"], has_duplicates=False)
        assert r.diff_count == 0

    def test_invalid_key(self):
        df = pd.DataFrame({"id": ["1"]})
        with pytest.raises(ValueError):
            build_diff_report(df, df, "nope", ["id"], has_duplicates=False)

    def test_duplicate_keys_fallback(self):
        """With duplicate keys, should use row-level comparison instead of raising."""
        old = pd.DataFrame({"id": ["1", "1", "2"], "v": ["a", "b", "c"]})
        new = pd.DataFrame({"id": ["1", "1", "2"], "v": ["a", "b", "d"]})
        r = build_diff_report(old, new, "id", ["id", "v"], has_duplicates=True)
        # id=2 changed from c->d: should appear as 1 deleted + 1 added
        assert r.diff_count == 2
        types = set(r.report["change_type"])
        assert "added" in types
        assert "deleted" in types

    def test_duplicate_keys_added_only(self):
        """Duplicate keys: new has extra rows."""
        old = pd.DataFrame({"id": ["1", "1"], "v": ["a", "b"]})
        new = pd.DataFrame({"id": ["1", "1", "1"], "v": ["a", "b", "c"]})
        r = build_diff_report(old, new, "id", ["id", "v"], has_duplicates=True)
        added = r.report[r.report["change_type"] == "added"]
        assert len(added) == 1

    def test_contract_violation_raises(self):
        """has_duplicates=False with actual duplicates should raise ValueError."""
        old = pd.DataFrame({"id": ["1", "1"], "v": ["a", "b"]})
        new = pd.DataFrame({"id": ["1", "2"], "v": ["c", "d"]})
        with pytest.raises(ValueError, match="has_duplicates=False"):
            build_diff_report(old, new, "id", ["id", "v"], has_duplicates=False)


# ---------- infer_old_new ----------

class TestInferOldNew:
    def test_date_pattern(self, tmp_path: Path):
        p1 = tmp_path / "data_20230101.csv"
        p2 = tmp_path / "data_20230201.csv"
        p1.write_text("x")
        p2.write_text("x")
        old, new = infer_old_new([str(p1), str(p2)])
        assert old == str(p1)
        assert new == str(p2)

    def test_date_pattern_reversed(self, tmp_path: Path):
        p1 = tmp_path / "data_20230201.csv"
        p2 = tmp_path / "data_20230101.csv"
        p1.write_text("x")
        p2.write_text("x")
        old, new = infer_old_new([str(p1), str(p2)])
        assert old == str(p2)  # 20230101 is older
        assert new == str(p1)

    def test_mtime_fallback(self, tmp_path: Path):
        p1 = tmp_path / "a.csv"
        p2 = tmp_path / "b.csv"
        p1.write_text("x")
        # Set p1 to older mtime
        import time
        old_time = time.time() - 100
        os.utime(p1, (old_time, old_time))
        p2.write_text("x")
        old, new = infer_old_new([str(p1), str(p2)])
        assert old == str(p1)
        assert new == str(p2)

    def test_wrong_count(self):
        with pytest.raises(ValueError, match="exactly 2"):
            infer_old_new(["a.csv"])

    def test_same_date(self, tmp_path: Path):
        """Same date in both names -> falls through to mtime."""
        p1 = tmp_path / "data_20230101_a.csv"
        p2 = tmp_path / "data_20230101_b.csv"
        p1.write_text("x")
        p2.write_text("x")
        # Should not raise
        old, new = infer_old_new([str(p1), str(p2)])
        assert {old, new} == {str(p1), str(p2)}


# ---------- build_diff_report: exclude_cols ----------

class TestBuildDiffReportExcludeCols:
    def test_exclude_cols_hides_updated(self):
        """Excluded column changes should not appear in updated."""
        old = pd.DataFrame({"id": ["1"], "v": ["a"], "ts": ["old_ts"]})
        new = pd.DataFrame({"id": ["1"], "v": ["a"], "ts": ["new_ts"]})
        r = build_diff_report(
            old, new, "id", ["id", "v", "ts"],
            has_duplicates=False, exclude_cols=["ts"],
        )
        assert r.diff_count == 0

    def test_exclude_cols_case_insensitive(self):
        """Exclude matching should be case-insensitive."""
        old = pd.DataFrame({"id": ["1"], "Status": ["X"]})
        new = pd.DataFrame({"id": ["1"], "Status": ["Y"]})
        r = build_diff_report(
            old, new, "id", ["id", "Status"],
            has_duplicates=False, exclude_cols=["status"],
        )
        assert r.diff_count == 0

    def test_exclude_cols_does_not_affect_added_deleted(self):
        """Excluded cols should not suppress added/deleted rows."""
        old = pd.DataFrame({"id": ["1"], "ts": ["a"]})
        new = pd.DataFrame({"id": ["1", "2"], "ts": ["a", "b"]})
        r = build_diff_report(
            old, new, "id", ["id", "ts"],
            has_duplicates=False, exclude_cols=["ts"],
        )
        added = r.report[r.report["change_type"] == "added"]
        assert len(added) == 1

    def test_exclude_cols_none(self):
        """exclude_cols=None should behave the same as no exclusion."""
        old = pd.DataFrame({"id": ["1"], "v": ["a"]})
        new = pd.DataFrame({"id": ["1"], "v": ["b"]})
        r = build_diff_report(
            old, new, "id", ["id", "v"],
            has_duplicates=False, exclude_cols=None,
        )
        assert r.diff_count == 1


# ---------- split_by_config ----------

class TestSplitByConfig:
    def _make_report(self) -> pd.DataFrame:
        """Create a sample report DataFrame for testing."""
        return pd.DataFrame({
            "change_type": ["updated", "updated", "updated", "added", "deleted"],
            "id": ["1", "2", "3", "4", "5"],
            "column": ["Status", "Source", "other_col", "__ROW__", "__ROW__"],
            "old_value": ["a", "b", "c", "", "x"],
            "new_value": ["A", "B", "C", "y", ""],
        })

    def test_split_creates_expected_keys(self, monkeypatch):
        """Should create keys for each SPLIT_COLUMNS match + Others."""
        from csv_diff import config
        monkeypatch.setattr(config, "SPLIT_COLUMNS", ["Status", "Source"])
        result = split_by_config(self._make_report())
        assert "Status" in result
        assert "Source" in result
        assert "Others" in result

    def test_split_no_overlap(self, monkeypatch):
        """Split rows should not appear in Others."""
        from csv_diff import config
        monkeypatch.setattr(config, "SPLIT_COLUMNS", ["Status"])
        result = split_by_config(self._make_report())
        others_cols = result["Others"]["column"].tolist()
        assert "Status" not in [c.upper() for c in others_cols]

    def test_split_empty_df(self, monkeypatch):
        """Empty DataFrame should return empty dict."""
        from csv_diff import config
        monkeypatch.setattr(config, "SPLIT_COLUMNS", ["X"])
        result = split_by_config(pd.DataFrame())
        assert result == {}

    def test_split_no_match(self, monkeypatch):
        """If no SPLIT_COLUMNS match, everything goes to Others."""
        from csv_diff import config
        monkeypatch.setattr(config, "SPLIT_COLUMNS", ["NONEXISTENT"])
        result = split_by_config(self._make_report())
        assert list(result.keys()) == ["Others"]
        assert len(result["Others"]) == 5

    def test_split_preserves_total_rows(self, monkeypatch):
        """Total rows across all splits should equal the original."""
        from csv_diff import config
        monkeypatch.setattr(config, "SPLIT_COLUMNS", ["Status", "Source"])
        report = self._make_report()
        result = split_by_config(report)
        total = sum(len(df) for df in result.values())
        assert total == len(report)


# ---------- sanitize ----------

class TestSanitizeSheetName:
    def test_normal(self):
        assert _sanitize_sheet_name("Hello") == "Hello"

    def test_forbidden_chars(self):
        result = _sanitize_sheet_name("a\\b/c?d*e[f]g:h")
        assert "\\" not in result
        assert "/" not in result
        assert "?" not in result
        assert "*" not in result
        assert "[" not in result
        assert "]" not in result
        assert ":" not in result

    def test_max_length(self):
        long_name = "A" * 50
        assert len(_sanitize_sheet_name(long_name)) == 31

    def test_empty_fallback(self):
        assert _sanitize_sheet_name("") == "Sheet"

    def test_only_forbidden_chars(self):
        assert _sanitize_sheet_name("?*[]") == "____"


class TestSanitizeFilePart:
    def test_normal(self):
        assert _sanitize_file_part("hello") == "hello"

    def test_windows_forbidden_chars(self):
        result = _sanitize_file_part('a<b>c:d"e/f\\g|h?i*j')
        for ch in '<>:"/\\|?*':
            assert ch not in result

    def test_trailing_dots(self):
        assert _sanitize_file_part("name...") == "name"

    def test_empty_fallback(self):
        assert _sanitize_file_part("") == "part"

    def test_only_dots(self):
        assert _sanitize_file_part("...") == "part"

    def test_control_chars(self):
        result = _sanitize_file_part("a\x00b\x1fc")
        assert "\x00" not in result
        assert "\x1f" not in result


# ---------- export_reports ----------

class TestExportReports:
    def test_creates_xlsx_and_csvs(self, tmp_path: Path):
        """Should create Excel, per-split CSVs, and master CSV."""
        report_dict = {
            "Sheet1": pd.DataFrame({"a": [1, 2]}),
            "Sheet2": pd.DataFrame({"a": [3, 4]}),
        }
        export_reports(report_dict, str(tmp_path), "test_report")
        assert (tmp_path / "test_report.xlsx").exists()
        assert (tmp_path / "test_report_Sheet1.csv").exists()
        assert (tmp_path / "test_report_Sheet2.csv").exists()
        assert (tmp_path / "test_report_all_combined.csv").exists()

    def test_master_csv_has_all_rows(self, tmp_path: Path):
        """Master CSV should contain rows from all splits."""
        report_dict = {
            "A": pd.DataFrame({"x": ["1", "2"]}),
            "B": pd.DataFrame({"x": ["3"]}),
        }
        export_reports(report_dict, str(tmp_path), "out")
        master = pd.read_csv(tmp_path / "out_all_combined.csv", encoding="utf-8-sig")
        assert len(master) == 3

    def test_empty_dict_skips(self, tmp_path: Path):
        """Empty report_dict should not create any files."""
        export_reports({}, str(tmp_path), "empty")
        assert list(tmp_path.iterdir()) == []

    def test_sanitized_suffix_in_csv(self, tmp_path: Path):
        """Forbidden characters in key should be sanitized in CSV filename."""
        report_dict = {"a/b:c": pd.DataFrame({"x": [1]})}
        export_reports(report_dict, str(tmp_path), "test")
        # The forbidden chars should be replaced with _
        csv_files = [f.name for f in tmp_path.glob("test_*.csv") if "combined" not in f.name]
        assert len(csv_files) == 1
        for ch in '/:':
            assert ch not in csv_files[0]


# ---------- read_csv_safely: fullwidth space normalization ----------

class TestNormalizeFullwidth:
    def test_fullwidth_space_stripped(self, tmp_path: Path):
        """Full-width spaces (\\u3000) should be replaced and stripped."""
        p = tmp_path / "fw.csv"
        _write_csv(p, "id,val\n1,\u3000hello\u3000\n")
        df = read_csv_safely(p)
        assert df["val"].iloc[0] == "hello"

    def test_mixed_whitespace(self, tmp_path: Path):
        """Mixed half-width and full-width spaces should all be normalized."""
        p = tmp_path / "mix.csv"
        _write_csv(p, "id,val\n1, \u3000 test \u3000 \n")
        df = read_csv_safely(p)
        assert df["val"].iloc[0] == "test"

著者

author
月うさぎ

編集後記:
この記事の内容がベストではないかもしれません。

記事一覧