Python|CSV差分比較ツールを作る
作成日:2026-02-16
更新日:2026-02-17

README
# csv_diff
2つのCSVファイルを比較し、差分レポートをExcel + CSVとして出力するGUIツール。
## できること
- ファイル選択ダイアログで2つのCSVを選択
- ファイル名の日付パターン(YYYYMMDD)や更新日時から新旧を自動判定
- キー列の候補を一意性の高い順に提示し、ユーザーが選択
- 差分を **added** / **deleted** / **updated** の3種類で検出
- キー列に重複がある場合は警告の上、行レベル比較(added/deleted のみ)にフォールバック
- キー列の空文字は欠損扱い(空文字 `""` を無効値として除外。キー候補評価・重複判定の両方で除外され、updated の対応付けにも使われない)
- 指定した列を差分比較から除外できる(`EXCLUDE_COMPARE_COLUMNS`)
- 指定した列の updated を専用シート/ファイルに分割して出力(`SPLIT_COLUMNS`)
- UTF-8 / UTF-8 BOM / CP932 の自動判定読み込み(エンコーディングエラーのみフォールバック)
- 読み込み時にセル値の前後空白・全角スペースを自動で正規化(トリム)
- 2つのCSVで列構成が異なる場合、差分のある列名を表示して続行するか確認
- ターミナルに進捗ログを表示(旧/新ファイル名、件数、処理時間、保存先)
- 出力先ディレクトリを `config.py` で変更可能(デフォルトは `Output/`。`CSV_DIFF_BASEDIR` が設定されている場合はその配下)
## できないこと・制限事項
- **列の増減は差分レポートに含まれない** — 比較は両CSVの共通列のみで行われる。片方にしかない列は警告表示のみで、レポートには出力されない
- **重複キー時のセルレベル比較(updated)は不可** — キー列に重複がある場合、どの行同士を対応付けるか確定できないため、行単位の added/deleted のみ出力される
- **複合キーには未対応** — キー列は1列のみ指定可能。複数列の組み合わせをキーにすることはできない
- **大規模ファイルは環境依存** — 全データをメモリに読み込むため、行数・列数・文字列長と実行環境のメモリにより、処理が遅くなったりメモリ不足になる可能性がある
- **重複キーモードは特に高負荷** — 重複キーがある場合は行単位比較となり、全行をタプル化して Counter で差分を取るため、通常モードよりメモリ/CPU負荷が上がりやすい
## ディレクトリ構成
```
csv_diff/
├── __init__.py # パッケージ初期化
├── __main__.py # python -m csv_diff 用エントリポイント
├── app.py # メインフロー(logging, RunStatus)
├── config.py # 設定値(比較・出力等)
├── ui_tk.py # tkinter GUI(ダイアログ類)
├── logic/ # ロジック層
│ ├── __init__.py # re-export
│ ├── io_csv.py # CSV読み込み・正規化
│ ├── keying.py # 共通列・キー検証・キー候補
│ ├── old_new.py # 新旧推定
│ └── report/ # レポート生成・出力
│ ├── __init__.py # re-export
│ ├── diff_report.py # 差分生成の本体
│ ├── report_processor.py # シート分割処理
│ └── file_exporter.py # Excel/CSV物理出力
└── tests/
├── __init__.py
└── test_diff_logic.py
```
## 必要環境
- Python 3.9+
- 依存パッケージ: `pip install -r requirements.txt`
- pandas(必須)
- openpyxl(必須。Excel出力に使用)
- tkinter(標準ライブラリ。macOS / Windows は通常同梱)
## 使い方
### Windows(推奨)
プロジェクトルートの `run_csv_diff.bat` をダブルクリック。
- 正常終了時はターミナルが自動で閉じる
- キャンセル・エラー時はターミナルが開いたままになり、ログを確認できる
### コマンドライン
```bash
python -m csv_diff
```
### 操作手順
1. ファイル選択ダイアログが開くので、比較したいCSVを2つ選択
2. 新旧の判定結果を確認(入れ替え・キャンセル可能)
3. キー列を候補から選択、または直接入力
4. 重複キーがある場合は警告が表示され、続行するか選択
5. `Output/` に以下のファイルが出力される
- `diff_report_YYYYMMDD_HHMMSS.xlsx` — Excel(SPLIT_COLUMNS ごとにシート分割)
- `diff_report_YYYYMMDD_HHMMSS_<シート名>.csv` — シートごとのCSV
- `diff_report_YYYYMMDD_HHMMSS_all_combined.csv` — 全差分をまとめたCSV
- 差分が0件の場合はファイル出力をスキップする(ログのみ表示)
## 設定(config.py)
`csv_diff/config.py` で以下の設定を変更できる。
### 比較設定
| 設定値 | デフォルト | 説明 |
|--------|-----------|------|
| `EXCLUDE_COMPARE_COLUMNS` | `["Date"]` | 差分比較から除外する列名のリスト |
| `SPLIT_COLUMNS` | `["Status", "Source"]` | 専用シートに分割する列名のリスト |
### 出力先ディレクトリ
| 設定値 | 説明 |
|--------|------|
| `OUTPUT_DIR` | **優先**。絶対パスを指定すると、そのフォルダに出力する |
| `OUTPUT_DIR_NAME` | `OUTPUT_DIR` が空の場合のみ使用。`CSV_DIFF_BASEDIR` 環境変数があればその配下、未設定ならプロジェクトルート配下にこの名前のフォルダを作って出力する(デフォルト: `"Output"`) |
```python
# 例: デスクトップに出力する場合
OUTPUT_DIR = r"C:\Users\<USERNAME>\Desktop\csv_diff_output"
# 例: デフォルトのまま使う場合
OUTPUT_DIR = ""
```
### その他の設定
| 設定値 | デフォルト | 説明 |
|--------|-----------|------|
| `OUTPUT_BASE_NAME` | `"diff_report"` | 出力ファイル名のベース名 |
| `KEY_CANDIDATES_TOP_N` | `10` | キー候補の最大表示数 |
| `KEY_CANDIDATES_SAMPLE_SIZE` | `10_000` | キー候補判定時のサンプリング行数 |
## 出力形式
補足: `EXCLUDE_COMPARE_COLUMNS` は updated の比較対象から除外する設定。added/deleted(行の増減)は抑制しない。
| 列名 | 内容 |
|------|------|
| change_type | `added` / `deleted` / `updated` |
| (キー列名) | 対象行のキー値 |
| column | 変更された列名(added/deleted の場合は `__ROW__`) |
| old_value | 変更前の値(added の場合は空) |
| new_value | 変更後の値(deleted の場合は空) |
## テスト
```bash
# venv を作成して依存をインストール(初回のみ)
python -m venv .venv
# macOS / Linux
source .venv/bin/activate
# Windows
# .venv\Scripts\activate
pip install -r requirements.txt
pip install pytest
# テスト実行
python -m pytest csv_diff/tests/ -v
```
Core
config.py
# config.py
"""csv_diff パッケージの設定値を一箇所に集約するモジュール。
入力は CSV 専用。出力は Excel + CSV の両方を自動生成する。"""
# ---------- 比較設定 ----------
# 差分比較から除外する列名(例: システムが自動更新する日時やIDなど)
# 複数指定可能: ["Date", "UPDATE_TIME"]
EXCLUDE_COMPARE_COLUMNS = ["Date"]
# 列ごとにシートを分割したい列名のリスト
# ここに指定した列は専用シートになり、それ以外の更新(および added/deleted)は「その他」になります
SPLIT_COLUMNS = ["Status", "Source"]
# ---------- 出力系 ----------
# 出力先ディレクトリ
# OUTPUT_DIR(優先): 絶対パスを指定すると、そのフォルダに出力する
# 例: OUTPUT_DIR = "C:\Users\<USERNAME>\Desktop\csv_diff_output"
# OUTPUT_DIR_NAME: OUTPUT_DIR が空の場合のみ使用される
# run_csv_diff.bat と同じ階層にこの名前のフォルダを作って出力する
OUTPUT_DIR = ""
OUTPUT_DIR_NAME = "Output"
# 出力ファイル名のベース名
OUTPUT_BASE_NAME = "diff_report"
# キー候補の最大表示数
KEY_CANDIDATES_TOP_N = 10
# suggest_key_candidates のサンプリング行数
KEY_CANDIDATES_SAMPLE_SIZE = 10_000
# ---------- 入力系(原則は CSV 専用のため変更不可) ----------
# このツールは CSV 入力専用。UI のファイルフィルタとロジックの拡張子チェックに使用される。
INPUT_EXT = ".csv"
FILE_TYPES = [("CSV files", f"*{INPUT_EXT}")]app.py
# app.py
# ツール全体のメインフローを制御するモジュール
from __future__ import annotations
import logging
import os
import sys
import time
from datetime import datetime
from enum import Enum, auto
from pathlib import Path
from . import ui_tk
from .config import (
INPUT_EXT,
FILE_TYPES,
OUTPUT_DIR,
OUTPUT_DIR_NAME,
OUTPUT_BASE_NAME,
KEY_CANDIDATES_TOP_N,
KEY_CANDIDATES_SAMPLE_SIZE,
EXCLUDE_COMPARE_COLUMNS,
)
from .logic import (
read_csv_safely,
get_common_columns,
suggest_key_candidates,
validate_key_column,
build_diff_report,
infer_old_new,
split_by_config,
export_reports,
)
logger = logging.getLogger(__name__)
class RunStatus(Enum):
"""_run() の終了理由"""
SUCCESS = auto() # 正常終了
CANCELLED = auto() # ユーザーキャンセル/中断
ERROR = auto() # 異常終了
# 拡張子バリデーション
def _validate_csv_paths(paths: list[str]) -> list[str]:
bad = []
for p in paths:
pp = Path(p)
if pp.suffix.lower() != INPUT_EXT:
bad.append(f"{pp.name} ({pp.suffix or '拡張子なし'})")
return bad
# 経過時間のフォーマット
def format_elapsed(seconds: float) -> str:
s = int(seconds)
m = s // 60
r = s % 60
return f"{m}分 {r}秒" if m > 0 else f"{r}秒"
# 出力先ディレクトリの解決
# 優先順位: OUTPUT_DIR(明示設定) > CSV_DIFF_BASEDIR(bat基準) > プロジェクトルート
def _resolve_output_dir() -> Path:
if OUTPUT_DIR.strip():
return Path(OUTPUT_DIR)
base = os.environ.get("CSV_DIFF_BASEDIR")
if base:
return Path(base) / OUTPUT_DIR_NAME
return Path(__file__).resolve().parent.parent / OUTPUT_DIR_NAME
# エントリポイント
def main() -> None:
logging.basicConfig(level=logging.INFO, format="%(message)s")
output_dir = _resolve_output_dir()
status: RunStatus = RunStatus.ERROR
try:
status = _run(output_dir)
finally:
ui_tk.cleanup()
if status == RunStatus.ERROR:
sys.exit(1)
if status == RunStatus.CANCELLED:
logger.info("キャンセルされました。終了します。")
sys.exit(2)
# メインフローの実行
def _run(output_dir: Path) -> RunStatus:
# 1) select files
logger.info("=" * 40)
logger.info(" CSV Diff Tool Started...")
logger.info("=" * 40)
while True:
paths = ui_tk.select_csv_files(filetypes=FILE_TYPES)
if not paths:
return RunStatus.CANCELLED
if len(paths) != 2:
ui_tk.show_error("CSVは必ず2つ選択してください。")
continue
# 1.5) validate extension
bad = _validate_csv_paths(paths)
if bad:
ui_tk.show_error("CSVファイルのみ対応しています。\n\n選択されたファイル:\n" + "\n".join(bad))
continue
break
# 2) infer old/new
old_path, new_path = infer_old_new(paths)
logger.info("* 推定 旧: %s", Path(old_path).name)
logger.info("* 推定 新: %s", Path(new_path).name)
answer = ui_tk.ask_confirm_old_new(old_path, new_path)
if answer is None:
return RunStatus.CANCELLED
if answer is False:
old_path, new_path = new_path, old_path
logger.info("入れ替えました")
logger.info("確定 旧: %s", Path(old_path).name)
logger.info("確定 新: %s", Path(new_path).name)
# 3) read
start = time.time()
logger.info("データを読み込んでいます...")
try:
old_df = read_csv_safely(old_path)
new_df = read_csv_safely(new_path)
except Exception as e:
ui_tk.show_error(f"CSVの読み込みに失敗しました。\n\n{e}")
return RunStatus.ERROR
# 4) common columns
common_cols = get_common_columns(old_df, new_df)
if not common_cols:
ui_tk.show_error("両方のCSVに共通する列がありません。")
return RunStatus.ERROR
# 4.5) warn if columns differ
added_cols = sorted(set(new_df.columns) - set(old_df.columns))
deleted_cols = sorted(set(old_df.columns) - set(new_df.columns))
if added_cols or deleted_cols:
if not ui_tk.ask_column_diff_proceed(added_cols, deleted_cols):
return RunStatus.CANCELLED
# 5) suggest key candidates
candidates = suggest_key_candidates(
old_df, new_df, common_cols,
top_n=KEY_CANDIDATES_TOP_N,
sample_size=KEY_CANDIDATES_SAMPLE_SIZE,
)
# 6) user selects key
key_col = ui_tk.select_key_variable(candidates=candidates, common_columns=common_cols)
if not key_col:
return RunStatus.CANCELLED
# 7) validate key column for duplicates
has_duplicates, dup_warning = validate_key_column(old_df, new_df, key_col, common_cols)
if has_duplicates and dup_warning is not None:
if not ui_tk.ask_duplicate_key_proceed(dup_warning.message):
return RunStatus.CANCELLED
# 8) diff
try:
result = build_diff_report(
old_df=old_df,
new_df=new_df,
key_col=key_col,
common_cols=common_cols,
has_duplicates=has_duplicates,
exclude_cols=EXCLUDE_COMPARE_COLUMNS
)
except Exception as e:
ui_tk.show_error(f"差分処理でエラーが発生しました。\n\n{e}")
return RunStatus.ERROR
logger.info("比較完了: 合計 %d 件の差分", result.diff_count)
# 9) output
# 出力フォルダの作成
try:
output_dir.mkdir(parents=True, exist_ok=True)
except Exception as e:
ui_tk.show_error(f"出力フォルダの作成に失敗しました。\n\n{e}")
return RunStatus.ERROR
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
base_filename = f"{OUTPUT_BASE_NAME}_{timestamp}"
try:
# report_processor によるデータの分割
report_dict = split_by_config(result.report)
# file_exporter による物理出力(Excel & CSV)
export_reports(
report_dict=report_dict,
output_dir=str(output_dir),
base_name=base_filename
)
except Exception as e:
ui_tk.show_error(f"ファイル出力に失敗しました。\n\n{e}")
return RunStatus.ERROR
elapsed = time.time() - start
logger.info("処理時間: %s", format_elapsed(elapsed))
logger.info("保存完了 -> %s", output_dir)
ui_tk.show_complete(result.diff_count, format_elapsed(elapsed), str(output_dir))
return RunStatus.SUCCESSui_tk.py
# ui_tk.py
from __future__ import annotations
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
# ---------- Tk root singleton ----------
# Tk() を複数回呼ぶとダイアログが表示されない・フリーズ・TclError 等が起きる
# (特に Windows で再現しやすく原因特定が困難)。
# そのため Tk root はシングルトンで管理し、全ダイアログは Toplevel で作る。
_root: tk.Tk | None = None
def _get_root() -> tk.Tk:
"""Return (or create) the single hidden Tk root window."""
global _root
if _root is None or not _root.winfo_exists():
_root = tk.Tk()
_root.withdraw()
return _root
def cleanup() -> None:
"""Destroy the Tk root. Call once at program exit."""
global _root
if _root is not None:
try:
_root.destroy()
except tk.TclError:
pass
_root = None
# ---------- Dialogs ----------
def select_csv_files(
filetypes: list[tuple[str, str]] | None = None,
) -> list[str]:
"""Returns list of selected file paths (may be empty)."""
if filetypes is None:
filetypes = [("CSV files", "*.csv")]
root = _get_root()
paths = filedialog.askopenfilenames(
parent=root,
title="比較するCSVを2つ選択してください(Ctrl+クリックで複数選択)",
filetypes=filetypes,
)
return list(paths)
def ask_confirm_old_new(old_path: str, new_path: str) -> bool | None:
"""
True = そのまま (Yes)
False = 入れ替える (No)
None = 中断 (Cancel)
"""
root = _get_root()
# askyesnocancel を使用
result = messagebox.askyesnocancel(
"確認",
"ファイルの新旧を確認してください。\n\n"
f"旧: {old_path}\n"
f"新: {new_path}\n\n"
"この並びでOKなら「Yes」\n"
"入れ替えるなら「No」\n"
"中止するなら「Cancel」",
parent=root,
)
return result
def ask_duplicate_key_proceed(warning_message: str) -> bool:
"""Show duplicate key warning and ask whether to proceed."""
root = _get_root()
ok = messagebox.askyesno(
"重複キー警告",
warning_message + "\n\nこのまま続行しますか?",
parent=root,
)
return bool(ok)
def ask_column_diff_proceed(
added_cols: list[str],
deleted_cols: list[str],
) -> bool:
"""Show column difference warning and ask whether to proceed with common columns only."""
root = _get_root()
parts = ["旧ファイルと新ファイルで列構成が異なります。\n"]
if added_cols:
parts.append(f"新ファイルのみに存在する列 ({len(added_cols)}件):\n {', '.join(added_cols)}\n")
if deleted_cols:
parts.append(f"旧ファイルのみに存在する列 ({len(deleted_cols)}件):\n {', '.join(deleted_cols)}\n")
parts.append("共通列のみで比較を続行しますか?")
ok = messagebox.askyesno("列構成の差異", "\n".join(parts), parent=root)
return bool(ok)
def select_key_variable(candidates: list[str], common_columns: list[str]) -> str | None:
"""
UI:
- buttons for candidates
- manual entry with validation against common_columns
Returns selected key column name or None (cancel).
Uses Toplevel window instead of creating a new Tk root.
"""
root = _get_root()
selected: dict[str, str | None] = {"value": None}
win = tk.Toplevel(root)
win.title("キー列の選択")
win.protocol("WM_DELETE_WINDOW", lambda: _safe_close(win))
def finalize(val: str | None) -> None:
if not val or not str(val).strip():
return
target = str(val).strip()
if target not in common_columns:
messagebox.showwarning(
"入力エラー",
f"列名 '{target}' は両方のCSVに存在しません。\n正しい列名を入力してください。",
parent=win,
)
return
selected["value"] = target
_safe_close(win)
# headline
ttk.Label(
win,
text="キーとなる列を選択するか、直接入力してください。",
padding=(10, 10),
).pack()
# candidates
if candidates:
frame_btn = ttk.LabelFrame(win, text="候補から選択", padding=(10, 10))
frame_btn.pack(padx=20, pady=5, fill="x")
for col in candidates:
ttk.Button(
frame_btn,
text=col,
command=lambda c=col: finalize(c),
).pack(pady=2, fill="x")
# manual entry
frame_in = ttk.LabelFrame(win, text="直接入力(候補にない場合)", padding=(10, 10))
frame_in.pack(padx=20, pady=10, fill="x")
entry_var = tk.StringVar()
entry = ttk.Entry(frame_in, textvariable=entry_var)
entry.pack(side="left", padx=5, expand=True, fill="x")
ttk.Button(frame_in, text="決定", command=lambda: finalize(entry_var.get())).pack(side="right")
entry.bind("<Return>", lambda e: finalize(entry_var.get()))
# cancel
ttk.Button(
win,
text="キャンセル",
command=lambda: _safe_close(win),
).pack(pady=10)
# ウィンドウを画面中央に配置
win.update_idletasks()
w = win.winfo_width()
h = win.winfo_height()
sw = win.winfo_screenwidth()
sh = win.winfo_screenheight()
win.geometry(f"+{(sw - w) // 2}+{(sh - h) // 2}")
# focus
entry.focus_set()
win.grab_set()
root.wait_window(win)
return selected["value"]
def _safe_close(win: tk.Toplevel) -> None:
"""Safely close a Toplevel window."""
try:
win.grab_release()
except tk.TclError:
pass
try:
win.destroy()
except tk.TclError:
pass
def show_error(msg: str) -> None:
root = _get_root()
messagebox.showerror("エラー", msg, parent=root)
def show_complete(diff_count: int, time_str: str, out_path: str) -> None:
root = _get_root()
messagebox.showinfo(
"完了",
"比較が完了しました。\n\n"
f"差分件数: {diff_count} 件\n"
f"処理時間: {time_str}\n"
f"保存先:\n{out_path}",
parent=root,
)logic
__init__.py
# logic/__init__.py
"""ロジック層の re-export。app.py からは from .logic import ... で使える。"""
from .io_csv import read_csv_safely
from .keying import (
get_common_columns,
suggest_key_candidates,
validate_key_column,
DuplicateKeyWarning,
)
from .old_new import infer_old_new
from .report import (
build_diff_report,
DiffResult,
split_by_config,
export_reports,
)
__all__ = [
"read_csv_safely",
"get_common_columns",
"suggest_key_candidates",
"validate_key_column",
"DuplicateKeyWarning",
"build_diff_report",
"DiffResult",
"infer_old_new",
"split_by_config",
"export_reports",
]io_csv.py
# io_csv.py
"""CSV読み込みと正規化。"""
from __future__ import annotations
from pathlib import Path
import pandas as pd
def read_csv_safely(path: str | Path) -> pd.DataFrame:
"""
Read CSV with fallback encodings. Always returns dtype=str DataFrame filled with "".
Only encoding errors trigger the next fallback; other errors are raised immediately.
"""
p = str(Path(path))
last_err: UnicodeDecodeError | None = None
for enc in ("utf-8-sig", "cp932", "utf-8"):
try:
df = pd.read_csv(p, dtype=str, encoding=enc)
return _normalize_df(df)
except UnicodeDecodeError as e:
last_err = e
continue
raise ValueError(f"Failed to read CSV with supported encodings: {p}\n{last_err}")
def _normalize_df(df: pd.DataFrame) -> pd.DataFrame:
"""Fill NaN and strip whitespace from all cells."""
df = df.fillna("")
for c in df.columns:
df[c] = df[c].astype(str).str.replace("\u3000", " ", regex=False).str.strip()
return dfkeying.py
# keying.py
"""共通列の特定、キー列の検証、キー候補の提案。"""
from __future__ import annotations
from dataclasses import dataclass, field
import pandas as pd
# ---------- Columns ----------
# It returns the column names that exist in both CSVs, sorted alphabetically.
def get_common_columns(old_df: pd.DataFrame, new_df: pd.DataFrame) -> list[str]:
"""Return intersection of columns (sorted)."""
return sorted(set(old_df.columns) & set(new_df.columns))
# ---------- Key Validation ----------
# 警告情報を格納するデータクラス
@dataclass(frozen=True)
class DuplicateKeyWarning:
"""Warning returned when the chosen key column has duplicate values."""
key_col: str
old_dup_count: int
new_dup_count: int
old_examples: list[str] = field(default_factory=list)
new_examples: list[str] = field(default_factory=list)
@property
def message(self) -> str:
parts = [f"キー列 '{self.key_col}' に重複値があります。"]
if self.old_dup_count > 0:
parts.append(f" 旧ファイル: {self.old_dup_count} 件の重複キー (例: {', '.join(self.old_examples[:5])})")
if self.new_dup_count > 0:
parts.append(f" 新ファイル: {self.new_dup_count} 件の重複キー (例: {', '.join(self.new_examples[:5])})")
parts.append("セルレベルの差分比較 (updated) は行えません。added/deleted のみ出力されます。")
return "\n".join(parts)
def validate_key_column(
old_df: pd.DataFrame,
new_df: pd.DataFrame,
key_col: str,
common_cols: list[str],
) -> tuple[bool, DuplicateKeyWarning | None]:
"""
Validate key column:
- Must exist in common_cols (raises ValueError if not)
- Checks for duplicates in both DataFrames
- Empty strings are treated as missing, not as duplicates
Returns (has_duplicates, warning).
has_duplicates: True if key column has duplicate values.
warning: DuplicateKeyWarning with details, or None if no duplicates.
"""
if key_col not in common_cols:
raise ValueError(f"Key column '{key_col}' must exist in BOTH CSVs.")
# Exclude empty strings (treated as missing, consistent with suggest_key_candidates)
old_keys = old_df[key_col][old_df[key_col] != ""]
new_keys = new_df[key_col][new_df[key_col] != ""]
old_dups = old_keys[old_keys.duplicated(keep=False)]
new_dups = new_keys[new_keys.duplicated(keep=False)]
old_dup_unique = old_dups.unique()
new_dup_unique = new_dups.unique()
if len(old_dup_unique) == 0 and len(new_dup_unique) == 0:
return False, None
warning = DuplicateKeyWarning(
key_col=key_col,
old_dup_count=len(old_dup_unique),
new_dup_count=len(new_dup_unique),
old_examples=list(old_dup_unique[:5]),
new_examples=list(new_dup_unique[:5]),
)
return True, warning
# ---------- Key Suggestion ----------
def suggest_key_candidates(
old_df: pd.DataFrame,
new_df: pd.DataFrame,
common_cols: list[str],
top_n: int = 10,
max_null_rate: float = 0.10,
sample_size: int = 10_000,
) -> list[str]:
"""
Suggest key candidates:
- only common columns
- low null/blank rate
- high uniqueness ratio in BOTH files (use min of the two)
- for large DataFrames (> sample_size rows), sample to estimate nunique
"""
if old_df.empty or new_df.empty or not common_cols:
return []
# Sample if large
old_s = old_df.sample(n=sample_size, random_state=42) if len(old_df) > sample_size else old_df
new_s = new_df.sample(n=sample_size, random_state=42) if len(new_df) > sample_size else new_df
candidates: list[tuple[str, float]] = [] # (col, uniq_ratio)
for c in common_cols:
o = old_s[c]
n = new_s[c]
null_rate = max((o == "").mean(), (n == "").mean())
if null_rate > max_null_rate:
continue
denom = min(len(o), len(n))
if denom <= 0:
continue
# 空文字を除外してユニーク数を算出(validate_key_column と同じ扱い)
uniq_ratio = min(o[o != ""].nunique(), n[n != ""].nunique()) / denom
candidates.append((c, uniq_ratio))
candidates.sort(key=lambda x: x[1], reverse=True)
return [c for c, _ in candidates[:top_n]]old_new.py
# old_new.py
"""ファイル名の日付パターンや更新日時から新旧を推定する。"""
from __future__ import annotations
from pathlib import Path
import re
_DATE8 = re.compile(r"(\d{8})")
def infer_old_new(paths: list[str]) -> tuple[str, str]:
"""
Decide (old, new) from two file paths.
Priority:
1) 8-digit date in filename (YYYYMMDD) if both have it and are different
2) file modified time (older -> old)
3) fallback: first is old
"""
if len(paths) != 2:
raise ValueError("infer_old_new expects exactly 2 paths")
p1, p2 = map(Path, paths)
m1 = _DATE8.search(p1.name)
m2 = _DATE8.search(p2.name)
if m1 and m2 and m1.group(1) != m2.group(1):
return (str(p1), str(p2)) if m1.group(1) < m2.group(1) else (str(p2), str(p1))
try:
t1 = p1.stat().st_mtime
t2 = p2.stat().st_mtime
return (str(p1), str(p2)) if t1 <= t2 else (str(p2), str(p1))
except Exception:
return str(p1), str(p2)logic/report
__init__.py
# logic/report/__init__.py
from .diff_report import build_diff_report, DiffResult
from .report_processor import split_by_config
from .file_exporter import export_reportsdiff_report.py
# diff_report.py
"""差分レポート生成の本体。"""
from __future__ import annotations
from collections import Counter
from dataclasses import dataclass
import json
import pandas as pd
@dataclass(frozen=True)
class DiffResult:
report: pd.DataFrame
diff_count: int
def build_diff_report(
old_df: pd.DataFrame,
new_df: pd.DataFrame,
key_col: str,
common_cols: list[str],
has_duplicates: bool,
exclude_cols: list[str] | None = None,
) -> DiffResult:
"""
Output columns: change_type, <key_col>, column, old_value, new_value
exclude_cols に指定された列は updated の比較対象から除外する。
"""
if key_col not in common_cols:
raise ValueError(f"Key column '{key_col}' must exist in BOTH CSVs.")
# 除外リストを小文字の set に変換しておく(正規化)
exclude_lower = {c.lower() for c in (exclude_cols or [])}
# 1. 共通列のみに絞り込む
old_i = old_df[common_cols].copy()
new_i = new_df[common_cols].copy()
# 重複キーモードの場合は専用処理へ
if has_duplicates:
return _build_diff_report_dupkey(old_i, new_i, key_col, common_cols)
# Caller guarantees uniqueness when has_duplicates is False
old_nonblank = old_i[key_col][old_i[key_col] != ""]
new_nonblank = new_i[key_col][new_i[key_col] != ""]
old_dup = old_nonblank.duplicated().any()
new_dup = new_nonblank.duplicated().any()
if old_dup or new_dup:
raise ValueError(
f"has_duplicates=False but '{key_col}' contains duplicates "
f"(old={old_dup}, new={new_dup}). "
"Caller must set has_duplicates=True (validate_key_column result)."
)
# インデックス設定
old_i = old_i.set_index(key_col, drop=False)
new_i = new_i.set_index(key_col, drop=False)
added_keys = new_i.index.difference(old_i.index)
deleted_keys = old_i.index.difference(new_i.index)
common_keys = old_i.index.intersection(new_i.index)
# 比較対象の列(キーを除き、かつ除外リストに入っていないもの)
compare_cols = [
c for c in common_cols
if c != key_col and c.strip().lower() not in exclude_lower
]
rows: list[dict] = []
# --- added ---
for k in added_keys:
snapshot = new_i.loc[k, common_cols].to_dict()
rows.append({
"change_type": "added",
key_col: str(k),
"column": "__ROW__",
"old_value": "",
"new_value": json.dumps(snapshot, ensure_ascii=False),
})
# --- deleted ---
for k in deleted_keys:
snapshot = old_i.loc[k, common_cols].to_dict()
rows.append({
"change_type": "deleted",
key_col: str(k),
"column": "__ROW__",
"old_value": json.dumps(snapshot, ensure_ascii=False),
"new_value": "",
})
# --- updated (cell diffs) ---
if not common_keys.empty:
old_common = old_i.loc[common_keys, compare_cols]
new_common = new_i.loc[common_keys, compare_cols]
diff = old_common.compare(new_common, keep_equal=False)
if not diff.empty:
stacked = diff.stack(level=0)
def is_empty_like(v):
return pd.isna(v) or v == ""
def format_value(v):
if pd.isna(v): return "<NA>"
return "<EMPTY>" if str(v) == "" else str(v)
for (k, col), row in stacked.iterrows():
raw_old = row.get("self")
raw_new = row.get("other")
if is_empty_like(raw_old) and is_empty_like(raw_new):
continue
rows.append({
"change_type": "updated",
key_col: str(k),
"column": str(col),
"old_value": format_value(raw_old),
"new_value": format_value(raw_new),
})
out_cols = ["change_type", key_col, "column", "old_value", "new_value"]
report = pd.DataFrame(rows, columns=out_cols)
if not report.empty:
report = report.sort_values(["change_type", key_col, "column"], kind="stable")
return DiffResult(report=report, diff_count=len(report))
def _build_diff_report_dupkey(
old_df: pd.DataFrame,
new_df: pd.DataFrame,
key_col: str,
common_cols: list[str],
) -> DiffResult:
"""重複キーがある場合のフォールバック(行単位の比較のみ)"""
old_tuples = [tuple(r) for r in old_df.values]
new_tuples = [tuple(r) for r in new_df.values]
old_cnt, new_cnt = Counter(old_tuples), Counter(new_tuples)
rows = []
# added
for tup, count in new_cnt.items():
excess = count - old_cnt.get(tup, 0)
if excess > 0:
d = dict(zip(common_cols, tup))
val = json.dumps(d, ensure_ascii=False)
for _ in range(excess):
rows.append({
"change_type": "added",
key_col: d.get(key_col, ""),
"column": "__ROW__",
"old_value": "",
"new_value": val,
})
# deleted
for tup, count in old_cnt.items():
excess = count - new_cnt.get(tup, 0)
if excess > 0:
d = dict(zip(common_cols, tup))
val = json.dumps(d, ensure_ascii=False)
for _ in range(excess):
rows.append({
"change_type": "deleted",
key_col: d.get(key_col, ""),
"column": "__ROW__",
"old_value": val,
"new_value": "",
})
report = pd.DataFrame(rows, columns=["change_type", key_col, "column", "old_value", "new_value"])
return DiffResult(report=report.sort_values(["change_type", key_col], kind="stable"), diff_count=len(report))file_exporter.py
# file_exporter.py
from __future__ import annotations
import logging
import os
import re
import pandas as pd
logger = logging.getLogger(__name__)
# Excel シート名の禁止文字: \ / ? * [ ] :
_INVALID_SHEET_CHARS = re.compile(r'[\\/?*\[\]:]')
# Windows ファイル名の禁止文字: < > : " / \ | ? * と制御文字
_INVALID_FILE_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1f]')
def _sanitize_sheet_name(name: str) -> str:
"""Excel シート名として安全な文字列に変換する(最大31文字)。"""
sanitized = _INVALID_SHEET_CHARS.sub("_", name).strip()
if not sanitized:
sanitized = "Sheet"
return sanitized[:31]
def _sanitize_file_part(name: str) -> str:
"""ファイル名の一部として安全な文字列に変換する(Windows対応)。"""
sanitized = _INVALID_FILE_CHARS.sub("_", name).strip().strip(".")
return sanitized or "part"
def export_reports(report_dict: dict[str, pd.DataFrame], output_dir: str, base_name: str) -> None:
"""
辞書形式のレポートデータを、Excel(シート分け)とCSV(ファイル分け)の両方で出力する。
出力に失敗した場合は例外を上位に伝播させる。
"""
if not report_dict:
logger.info("No changes found. Skipping export.")
return
# 保存先のベースパス(例: Output/diff_report_20250216_123456)
base_path = os.path.join(output_dir, base_name)
# --- 1. Excel形式で保存(1ファイルにシート分け) ---
xlsx_path = f"{base_path}.xlsx"
try:
with pd.ExcelWriter(xlsx_path, engine='openpyxl') as writer:
for sheet_name, df in report_dict.items():
df.to_excel(writer, sheet_name=_sanitize_sheet_name(sheet_name), index=False)
logger.info("Excel report saved: %s", xlsx_path)
except Exception:
logger.exception("Failed to save Excel")
raise
# --- 2. CSV形式で保存(ファイルごとに分離) ---
for suffix, df in report_dict.items():
safe_suffix = _sanitize_file_part(suffix)
csv_path = f"{base_path}_{safe_suffix}.csv"
try:
# 日本語Excelで開くことを想定して utf-8-sig (BOM付き)
df.to_csv(csv_path, index=False, encoding="utf-8-sig")
logger.info("CSV report saved: %s", csv_path)
except Exception:
logger.exception("Failed to save CSV (%s)", suffix)
raise
# --- 3. 全データが入ったマスターCSV ---
master_csv_path = f"{base_path}_all_combined.csv"
try:
full_df = pd.concat(report_dict.values(), ignore_index=True)
full_df.to_csv(master_csv_path, index=False, encoding="utf-8-sig")
logger.info("Master CSV saved: %s", master_csv_path)
except Exception:
logger.exception("Failed to save master CSV")
raise
report_processor.py
# report_processor.py
from __future__ import annotations
import pandas as pd
from ... import config
def split_by_config(report_df: pd.DataFrame) -> dict[str, pd.DataFrame]:
"""configの設定に基づき、DataFrameを分割する"""
split_cols = config.SPLIT_COLUMNS or []
result: dict[str, pd.DataFrame] = {}
if report_df.empty:
return result
remaining = report_df.copy()
# 1. 指定された各列について分割処理を行う
for col in split_cols:
col_norm = col.lower()
# updated かつ 対象の列名に一致するものを抽出
mask = (
(remaining["change_type"] == "updated") &
(remaining["column"].str.lower() == col_norm)
)
target = remaining[mask]
if not target.empty:
result[col] = target
# 分割した分を「残り」から除外する
remaining = remaining[~mask]
# 2. すべてのループが終わった後、残ったデータ(added/deleted/その他のupdated)をOthersへ
if not remaining.empty:
result["Others"] = remaining
return resultテスト
テストでやること
このテストファイルは、csv_diff ツールのロジック層(logic/ パッケージ)の各関数が正しく動作するかを検証するものです。
UIを介さずにロジック単体で正しく動くことを保証するためのテストで、app.py や tk_ui は対象外で、純粋なデータ処理部分だけを検証しています。
TestReadCsvSafely|CSV読み込みの堅牢性を確認
- UTF-8 / UTF-8 BOM / CP932 のエンコーディング自動判定
- dtype=str で先頭ゼロが消えないこと(”001″ → “001”)
- 空セルが NaN ではなく “” に統一されること
- 存在しないファイルでエラーが出ること
TestGetCommonColumns |2つのCSVに共通する列名の抽出を確認
- 部分一致、完全不一致、完全一致の3パターン
TestSuggestKeyCandidates |キー列候補の自動提案ロジックを確認
- 一意性が高い列(id)が上位に来ること
- 空DataFrameで空リストを返すこと
- 空白率が高い列が除外されること
- sample_size パラメータでクラッシュしないこと
TestValidateKeyColumn|キー列の重複チェックを確認
- 重複なし → (False, None)
- 旧/新ファイルに重複あり → (True, DuplicateKeyWarning)
- 存在しない列を指定 → ValueError
TestBuildDiffReport|差分レポート生成の正確性を確認
- 行の追加(added)、削除(deleted)、セル変更(updated)の検出
- 差分なしで diff_count == 0
- 存在しないキーで ValueError
- 重複キーモード(has_duplicates=True)で added/deleted のみ出力されること
- 契約違反(has_duplicates=False なのに実際に重複あり)で ValueError
TestInferOldNew|新旧ファイルの自動判定を確認
- ファイル名の日付パターン(20230101 vs 20230201)で判定
- 日付が同じ場合は更新日時(mtime)にフォールバック
- ファイル数が2でなければ ValueError
TestSplitByConfig | SPLIT_COLUMNS によるシート分割を確認
- 設定列ごとのキー + Others が生成されること
- 分割された行が Others に重複しないこと
- 空 DataFrame → 空 dict
- マッチなし → 全て Others
- 分割後の合計行数が元と一致すること
TestSanitizeSheetName | Excelシート名のサニタイズを確認
- 禁止文字(\ / ? * [ ] :)が _ に置換されること
- 31文字に切り詰められること
- 空文字 → “Sheet” フォールバック
TestSanitizeFilePart | Windowsファイル名のサニタイズを確認
- 禁止文字(< > : ” / \ | ? *)が置換されること
- 末尾ドット・制御文字の除去
- 空文字やドットのみ → “part” フォールバック
TestExportReports | ファイル出力の動作を確認
- Excel + 分割CSV + マスターCSV が生成されること
- マスターCSVに全分割の行が含まれること
- 空 dict で何も出力しないこと
- CSVファイル名の禁止文字がサニタイズされること
TestNormalizeFullwidth | 全角スペースの正規化を確認
- 全角スペース(\u3000)が除去されること
- 半角・全角混在の空白が正規化されること
test_diff_logic.py
# tests/test_diff_logic.py
from __future__ import annotations
import os
from pathlib import Path
import pandas as pd
import pytest
from csv_diff.logic import (
read_csv_safely,
get_common_columns,
suggest_key_candidates,
validate_key_column,
build_diff_report,
infer_old_new,
DuplicateKeyWarning,
split_by_config,
export_reports,
)
from csv_diff.logic.report.file_exporter import _sanitize_sheet_name, _sanitize_file_part
# ---------- Helpers ----------
def _write_csv(path: Path, content: str, encoding: str = "utf-8") -> None:
path.write_text(content, encoding=encoding)
# ---------- read_csv_safely ----------
class TestReadCsvSafely:
def test_utf8(self, tmp_path: Path):
p = tmp_path / "utf8.csv"
_write_csv(p, "id,name\n1,Alice\n2,Bob\n")
df = read_csv_safely(p)
assert list(df.columns) == ["id", "name"]
assert len(df) == 2
assert df["name"].iloc[0] == "Alice"
def test_utf8_bom(self, tmp_path: Path):
p = tmp_path / "bom.csv"
_write_csv(p, "id,name\n1,テスト\n", encoding="utf-8-sig")
df = read_csv_safely(p)
assert "id" in df.columns
def test_cp932(self, tmp_path: Path):
p = tmp_path / "cp932.csv"
_write_csv(p, "id,名前\n1,太郎\n", encoding="cp932")
df = read_csv_safely(p)
assert "名前" in df.columns
assert df["名前"].iloc[0] == "太郎"
def test_dtype_str(self, tmp_path: Path):
p = tmp_path / "nums.csv"
_write_csv(p, "id,val\n001,002\n")
df = read_csv_safely(p)
# dtype=str preserves leading zeros
assert df["id"].iloc[0] == "001"
def test_fillna(self, tmp_path: Path):
p = tmp_path / "na.csv"
_write_csv(p, "id,val\n1,\n2,x\n")
df = read_csv_safely(p)
assert df["val"].iloc[0] == ""
def test_invalid_file(self, tmp_path: Path):
p = tmp_path / "nope.csv"
with pytest.raises(FileNotFoundError):
read_csv_safely(p)
# ---------- get_common_columns ----------
class TestGetCommonColumns:
def test_basic(self):
old = pd.DataFrame({"a": [1], "b": [2], "c": [3]})
new = pd.DataFrame({"b": [1], "c": [2], "d": [3]})
assert get_common_columns(old, new) == ["b", "c"]
def test_no_overlap(self):
old = pd.DataFrame({"a": [1]})
new = pd.DataFrame({"b": [1]})
assert get_common_columns(old, new) == []
def test_identical(self):
df = pd.DataFrame({"x": [1], "y": [2]})
assert get_common_columns(df, df) == ["x", "y"]
# ---------- suggest_key_candidates ----------
class TestSuggestKeyCandidates:
def test_basic(self):
old = pd.DataFrame({"id": ["1", "2", "3"], "name": ["a", "a", "b"]})
new = pd.DataFrame({"id": ["1", "2", "3"], "name": ["a", "b", "c"]})
common = ["id", "name"]
result = suggest_key_candidates(old, new, common)
# id should rank higher (more unique)
assert result[0] == "id"
def test_empty_df(self):
old = pd.DataFrame({"id": pd.Series([], dtype=str)})
new = pd.DataFrame({"id": pd.Series([], dtype=str)})
assert suggest_key_candidates(old, new, ["id"]) == []
def test_high_null_rate_excluded(self):
old = pd.DataFrame({"id": ["1", "2", ""], "val": ["a", "b", "c"]})
new = pd.DataFrame({"id": ["1", "", ""], "val": ["a", "b", "c"]})
common = ["id", "val"]
# id has > 10% blanks in new, should be excluded or ranked lower
result = suggest_key_candidates(old, new, common, max_null_rate=0.10)
# val should be included (no blanks) but id excluded (>10% blank in new)
assert "val" in result
def test_sample_size_param(self):
# Just verify it doesn't crash with sample_size
n = 100
old = pd.DataFrame({"id": [str(i) for i in range(n)]})
new = pd.DataFrame({"id": [str(i) for i in range(n)]})
result = suggest_key_candidates(old, new, ["id"], sample_size=50)
assert "id" in result
# ---------- validate_key_column ----------
class TestValidateKeyColumn:
def test_no_duplicates(self):
old = pd.DataFrame({"id": ["1", "2"], "v": ["a", "b"]})
new = pd.DataFrame({"id": ["1", "2"], "v": ["c", "d"]})
has_dup, warning = validate_key_column(old, new, "id", ["id", "v"])
assert has_dup is False
assert warning is None
def test_duplicates_old(self):
old = pd.DataFrame({"id": ["1", "1", "2"], "v": ["a", "b", "c"]})
new = pd.DataFrame({"id": ["1", "2", "3"], "v": ["d", "e", "f"]})
has_dup, warning = validate_key_column(old, new, "id", ["id", "v"])
assert has_dup is True
assert isinstance(warning, DuplicateKeyWarning)
assert warning.old_dup_count > 0
assert "重複" in warning.message
def test_duplicates_new(self):
old = pd.DataFrame({"id": ["1", "2"], "v": ["a", "b"]})
new = pd.DataFrame({"id": ["1", "1"], "v": ["c", "d"]})
has_dup, warning = validate_key_column(old, new, "id", ["id", "v"])
assert has_dup is True
assert isinstance(warning, DuplicateKeyWarning)
assert warning.new_dup_count > 0
def test_invalid_key_col(self):
old = pd.DataFrame({"id": ["1"]})
new = pd.DataFrame({"id": ["1"]})
with pytest.raises(ValueError, match="must exist in BOTH"):
validate_key_column(old, new, "nope", ["id"])
# ---------- build_diff_report ----------
class TestBuildDiffReport:
def test_added(self):
old = pd.DataFrame({"id": ["1"], "v": ["a"]})
new = pd.DataFrame({"id": ["1", "2"], "v": ["a", "b"]})
r = build_diff_report(old, new, "id", ["id", "v"], has_duplicates=False)
added = r.report[r.report["change_type"] == "added"]
assert len(added) == 1
assert added.iloc[0]["id"] == "2"
def test_deleted(self):
old = pd.DataFrame({"id": ["1", "2"], "v": ["a", "b"]})
new = pd.DataFrame({"id": ["1"], "v": ["a"]})
r = build_diff_report(old, new, "id", ["id", "v"], has_duplicates=False)
deleted = r.report[r.report["change_type"] == "deleted"]
assert len(deleted) == 1
assert deleted.iloc[0]["id"] == "2"
def test_updated(self):
old = pd.DataFrame({"id": ["1", "2"], "v": ["a", "b"]})
new = pd.DataFrame({"id": ["1", "2"], "v": ["a", "X"]})
r = build_diff_report(old, new, "id", ["id", "v"], has_duplicates=False)
updated = r.report[r.report["change_type"] == "updated"]
assert len(updated) == 1
assert updated.iloc[0]["old_value"] == "b"
assert updated.iloc[0]["new_value"] == "X"
def test_no_diff(self):
df = pd.DataFrame({"id": ["1", "2"], "v": ["a", "b"]})
r = build_diff_report(df, df.copy(), "id", ["id", "v"], has_duplicates=False)
assert r.diff_count == 0
def test_invalid_key(self):
df = pd.DataFrame({"id": ["1"]})
with pytest.raises(ValueError):
build_diff_report(df, df, "nope", ["id"], has_duplicates=False)
def test_duplicate_keys_fallback(self):
"""With duplicate keys, should use row-level comparison instead of raising."""
old = pd.DataFrame({"id": ["1", "1", "2"], "v": ["a", "b", "c"]})
new = pd.DataFrame({"id": ["1", "1", "2"], "v": ["a", "b", "d"]})
r = build_diff_report(old, new, "id", ["id", "v"], has_duplicates=True)
# id=2 changed from c->d: should appear as 1 deleted + 1 added
assert r.diff_count == 2
types = set(r.report["change_type"])
assert "added" in types
assert "deleted" in types
def test_duplicate_keys_added_only(self):
"""Duplicate keys: new has extra rows."""
old = pd.DataFrame({"id": ["1", "1"], "v": ["a", "b"]})
new = pd.DataFrame({"id": ["1", "1", "1"], "v": ["a", "b", "c"]})
r = build_diff_report(old, new, "id", ["id", "v"], has_duplicates=True)
added = r.report[r.report["change_type"] == "added"]
assert len(added) == 1
def test_contract_violation_raises(self):
"""has_duplicates=False with actual duplicates should raise ValueError."""
old = pd.DataFrame({"id": ["1", "1"], "v": ["a", "b"]})
new = pd.DataFrame({"id": ["1", "2"], "v": ["c", "d"]})
with pytest.raises(ValueError, match="has_duplicates=False"):
build_diff_report(old, new, "id", ["id", "v"], has_duplicates=False)
# ---------- infer_old_new ----------
class TestInferOldNew:
def test_date_pattern(self, tmp_path: Path):
p1 = tmp_path / "data_20230101.csv"
p2 = tmp_path / "data_20230201.csv"
p1.write_text("x")
p2.write_text("x")
old, new = infer_old_new([str(p1), str(p2)])
assert old == str(p1)
assert new == str(p2)
def test_date_pattern_reversed(self, tmp_path: Path):
p1 = tmp_path / "data_20230201.csv"
p2 = tmp_path / "data_20230101.csv"
p1.write_text("x")
p2.write_text("x")
old, new = infer_old_new([str(p1), str(p2)])
assert old == str(p2) # 20230101 is older
assert new == str(p1)
def test_mtime_fallback(self, tmp_path: Path):
p1 = tmp_path / "a.csv"
p2 = tmp_path / "b.csv"
p1.write_text("x")
# Set p1 to older mtime
import time
old_time = time.time() - 100
os.utime(p1, (old_time, old_time))
p2.write_text("x")
old, new = infer_old_new([str(p1), str(p2)])
assert old == str(p1)
assert new == str(p2)
def test_wrong_count(self):
with pytest.raises(ValueError, match="exactly 2"):
infer_old_new(["a.csv"])
def test_same_date(self, tmp_path: Path):
"""Same date in both names -> falls through to mtime."""
p1 = tmp_path / "data_20230101_a.csv"
p2 = tmp_path / "data_20230101_b.csv"
p1.write_text("x")
p2.write_text("x")
# Should not raise
old, new = infer_old_new([str(p1), str(p2)])
assert {old, new} == {str(p1), str(p2)}
# ---------- build_diff_report: exclude_cols ----------
class TestBuildDiffReportExcludeCols:
def test_exclude_cols_hides_updated(self):
"""Excluded column changes should not appear in updated."""
old = pd.DataFrame({"id": ["1"], "v": ["a"], "ts": ["old_ts"]})
new = pd.DataFrame({"id": ["1"], "v": ["a"], "ts": ["new_ts"]})
r = build_diff_report(
old, new, "id", ["id", "v", "ts"],
has_duplicates=False, exclude_cols=["ts"],
)
assert r.diff_count == 0
def test_exclude_cols_case_insensitive(self):
"""Exclude matching should be case-insensitive."""
old = pd.DataFrame({"id": ["1"], "Status": ["X"]})
new = pd.DataFrame({"id": ["1"], "Status": ["Y"]})
r = build_diff_report(
old, new, "id", ["id", "Status"],
has_duplicates=False, exclude_cols=["status"],
)
assert r.diff_count == 0
def test_exclude_cols_does_not_affect_added_deleted(self):
"""Excluded cols should not suppress added/deleted rows."""
old = pd.DataFrame({"id": ["1"], "ts": ["a"]})
new = pd.DataFrame({"id": ["1", "2"], "ts": ["a", "b"]})
r = build_diff_report(
old, new, "id", ["id", "ts"],
has_duplicates=False, exclude_cols=["ts"],
)
added = r.report[r.report["change_type"] == "added"]
assert len(added) == 1
def test_exclude_cols_none(self):
"""exclude_cols=None should behave the same as no exclusion."""
old = pd.DataFrame({"id": ["1"], "v": ["a"]})
new = pd.DataFrame({"id": ["1"], "v": ["b"]})
r = build_diff_report(
old, new, "id", ["id", "v"],
has_duplicates=False, exclude_cols=None,
)
assert r.diff_count == 1
# ---------- split_by_config ----------
class TestSplitByConfig:
def _make_report(self) -> pd.DataFrame:
"""Create a sample report DataFrame for testing."""
return pd.DataFrame({
"change_type": ["updated", "updated", "updated", "added", "deleted"],
"id": ["1", "2", "3", "4", "5"],
"column": ["Status", "Source", "other_col", "__ROW__", "__ROW__"],
"old_value": ["a", "b", "c", "", "x"],
"new_value": ["A", "B", "C", "y", ""],
})
def test_split_creates_expected_keys(self, monkeypatch):
"""Should create keys for each SPLIT_COLUMNS match + Others."""
from csv_diff import config
monkeypatch.setattr(config, "SPLIT_COLUMNS", ["Status", "Source"])
result = split_by_config(self._make_report())
assert "Status" in result
assert "Source" in result
assert "Others" in result
def test_split_no_overlap(self, monkeypatch):
"""Split rows should not appear in Others."""
from csv_diff import config
monkeypatch.setattr(config, "SPLIT_COLUMNS", ["Status"])
result = split_by_config(self._make_report())
others_cols = result["Others"]["column"].tolist()
assert "Status" not in [c.upper() for c in others_cols]
def test_split_empty_df(self, monkeypatch):
"""Empty DataFrame should return empty dict."""
from csv_diff import config
monkeypatch.setattr(config, "SPLIT_COLUMNS", ["X"])
result = split_by_config(pd.DataFrame())
assert result == {}
def test_split_no_match(self, monkeypatch):
"""If no SPLIT_COLUMNS match, everything goes to Others."""
from csv_diff import config
monkeypatch.setattr(config, "SPLIT_COLUMNS", ["NONEXISTENT"])
result = split_by_config(self._make_report())
assert list(result.keys()) == ["Others"]
assert len(result["Others"]) == 5
def test_split_preserves_total_rows(self, monkeypatch):
"""Total rows across all splits should equal the original."""
from csv_diff import config
monkeypatch.setattr(config, "SPLIT_COLUMNS", ["Status", "Source"])
report = self._make_report()
result = split_by_config(report)
total = sum(len(df) for df in result.values())
assert total == len(report)
# ---------- sanitize ----------
class TestSanitizeSheetName:
def test_normal(self):
assert _sanitize_sheet_name("Hello") == "Hello"
def test_forbidden_chars(self):
result = _sanitize_sheet_name("a\\b/c?d*e[f]g:h")
assert "\\" not in result
assert "/" not in result
assert "?" not in result
assert "*" not in result
assert "[" not in result
assert "]" not in result
assert ":" not in result
def test_max_length(self):
long_name = "A" * 50
assert len(_sanitize_sheet_name(long_name)) == 31
def test_empty_fallback(self):
assert _sanitize_sheet_name("") == "Sheet"
def test_only_forbidden_chars(self):
assert _sanitize_sheet_name("?*[]") == "____"
class TestSanitizeFilePart:
def test_normal(self):
assert _sanitize_file_part("hello") == "hello"
def test_windows_forbidden_chars(self):
result = _sanitize_file_part('a<b>c:d"e/f\\g|h?i*j')
for ch in '<>:"/\\|?*':
assert ch not in result
def test_trailing_dots(self):
assert _sanitize_file_part("name...") == "name"
def test_empty_fallback(self):
assert _sanitize_file_part("") == "part"
def test_only_dots(self):
assert _sanitize_file_part("...") == "part"
def test_control_chars(self):
result = _sanitize_file_part("a\x00b\x1fc")
assert "\x00" not in result
assert "\x1f" not in result
# ---------- export_reports ----------
class TestExportReports:
def test_creates_xlsx_and_csvs(self, tmp_path: Path):
"""Should create Excel, per-split CSVs, and master CSV."""
report_dict = {
"Sheet1": pd.DataFrame({"a": [1, 2]}),
"Sheet2": pd.DataFrame({"a": [3, 4]}),
}
export_reports(report_dict, str(tmp_path), "test_report")
assert (tmp_path / "test_report.xlsx").exists()
assert (tmp_path / "test_report_Sheet1.csv").exists()
assert (tmp_path / "test_report_Sheet2.csv").exists()
assert (tmp_path / "test_report_all_combined.csv").exists()
def test_master_csv_has_all_rows(self, tmp_path: Path):
"""Master CSV should contain rows from all splits."""
report_dict = {
"A": pd.DataFrame({"x": ["1", "2"]}),
"B": pd.DataFrame({"x": ["3"]}),
}
export_reports(report_dict, str(tmp_path), "out")
master = pd.read_csv(tmp_path / "out_all_combined.csv", encoding="utf-8-sig")
assert len(master) == 3
def test_empty_dict_skips(self, tmp_path: Path):
"""Empty report_dict should not create any files."""
export_reports({}, str(tmp_path), "empty")
assert list(tmp_path.iterdir()) == []
def test_sanitized_suffix_in_csv(self, tmp_path: Path):
"""Forbidden characters in key should be sanitized in CSV filename."""
report_dict = {"a/b:c": pd.DataFrame({"x": [1]})}
export_reports(report_dict, str(tmp_path), "test")
# The forbidden chars should be replaced with _
csv_files = [f.name for f in tmp_path.glob("test_*.csv") if "combined" not in f.name]
assert len(csv_files) == 1
for ch in '/:':
assert ch not in csv_files[0]
# ---------- read_csv_safely: fullwidth space normalization ----------
class TestNormalizeFullwidth:
def test_fullwidth_space_stripped(self, tmp_path: Path):
"""Full-width spaces (\\u3000) should be replaced and stripped."""
p = tmp_path / "fw.csv"
_write_csv(p, "id,val\n1,\u3000hello\u3000\n")
df = read_csv_safely(p)
assert df["val"].iloc[0] == "hello"
def test_mixed_whitespace(self, tmp_path: Path):
"""Mixed half-width and full-width spaces should all be normalized."""
p = tmp_path / "mix.csv"
_write_csv(p, "id,val\n1, \u3000 test \u3000 \n")
df = read_csv_safely(p)
assert df["val"].iloc[0] == "test"
2026-02-16
編集後記:
この記事の内容がベストではないかもしれません。
記事一覧
-

[Python]Write a 1D arrayvia pandas 【Python】Excelに書き出す|pandas.ExcelWriter -

[Python]endswith+ lower 【Python】拡張子の存在確認|endswith + lower(Method Chaining) -

[Python]Stringsplit 【Python】文字列を抜き出す|.split -

[Python]pathlibfor extension 【Python】文字列から拡張子を取得して、文字列で返す|pathlib.Path.suffix -

[Python]Excelto CSV 【Python】ExcelからCSVに書き出す|pandas,csv -

[Python]Learn FastAPIStep by Step Python【FastAPI Explained】Build High-Performance APIs with Ease