Skip to content

Instantly share code, notes, and snippets.

@luw2007
Last active May 12, 2026 04:20
Show Gist options
  • Select an option

  • Save luw2007/496968804795dabbebdf1ba70312b165 to your computer and use it in GitHub Desktop.

Select an option

Save luw2007/496968804795dabbebdf1ba70312b165 to your computer and use it in GitHub Desktop.
Unified AI conversation tool-use cleaner — strip tool-call noise from Trae/Coco/Claude exports | AI 对话记录 tool_use 统一清洗工具
#!/usr/bin/env python3
"""
tool_use_cleaner.py — Unified AI conversation tool-use cleaner
Strip tool-use noise from AI assistant conversation exports and produce
clean Markdown. Auto-detects multiple formats:
- Trae (toolName/status/filePath blocks)
- Coco CLI (⏺ ToolCall + ⎿ result blocks)
- Generic XML/JSON tool blocks (<tool_use>, {"type":"tool_use"} …)
- Fenced code blocks containing tool payloads
- Agent trace noise (Thought/skill calls/progress)
- Thinking / reasoning blocks
Supported input: Markdown · JSON / JSONL · plain text
Usage:
python3 tool_use_cleaner.py <input> [-o output] [--aggressive] [--stats]
echo "…" | python3 tool_use_cleaner.py -o output.md
python3 tool_use_cleaner.py input.md --trace --keep-json-code
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from pathlib import Path
from typing import List, Iterable, Tuple, Dict
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Pattern definitions
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Regex alternation source (items separated by "|") of English gerunds and
# short phrases. It is interpolated verbatim into the bold "thinking title"
# pattern compiled in ToolUseCleaner._compile_patterns, so every "|" here is
# a regex alternation operator, not literal text.
EN_THINKING_VERBS = (
"Defining|Pinpointing|Inspecting|Reviewing|Confirming|Analyzing|"
"Evaluating|Considering|Examining|Assessing|Planning|Thinking|"
"Reasoning|Reflecting|Processing|Understanding|Interpreting|"
"Clarifying|Exploring|Investigating|Determining|Identifying|"
"Recognizing|Formulating|Synthesizing|Breaking down|Mapping out|"
"Working through|Figuring out|Piecing together|Sorting through|"
"Narrowing down|Zeroing in|Homing in|Drilling into|Digging into|"
"Looking into|Checking|Verifying|Validating|Noting|Observing|"
"Realizing|Deciding|Weighing|Comparing|Contrasting|Refining|"
"Updating|Revising|Recalibrating|Adjusting|Pivoting|Shifting|"
"Transitioning|Organizing|Summarizing|Crafting|Preparing|"
"Assembling|Gathering|Collecting|Searching|Scanning|Reading|"
"Parsing|Extracting|Fetching|Retrieving|Loading|Running|"
"Executing|Calling|Invoking|Querying|Computing|Calculating"
)
# Chinese counterpart of EN_THINKING_VERBS; it is joined with the English
# list by "|" inside the same thinking-title pattern.
ZH_THINKING_VERBS = (
"分析|思考|推理|评估|检查|验证|确认|理解|审查|规划|"
"梳理|判断|识别|探索|调查|研究|处理|回顾|反思|"
"整理思路|深入分析|仔细检查|进一步|重新考虑|"
"初步判断|综合分析|逐步推理|查看|获取|读取|搜索|"
"扫描|解析|提取|加载|运行|执行|调用|查询|计算"
)
# Fence line of a Markdown code block: group 1 is the run of three or more
# backticks at the very start of the line, group 2 is the rest of the line
# (the info string). _clean_fenced_blocks matches both opening and closing
# fence lines with this one pattern.
FENCE_RE = re.compile(r"^(```+)(.*)$")
class CleanerStats:
    """Counters filled in by a cleaner run, plus a printable report."""

    def __init__(self):
        # Text size before and after cleaning.
        self.original_chars = 0
        self.cleaned_chars = 0
        # Number of removals, by category.
        self.thinking_blocks_removed = 0
        self.tool_blocks_removed = 0
        self.empty_lines_collapsed = 0
        self.metadata_lines_removed = 0
        self.trace_lines_removed = 0

    def summary(self):
        """Return the statistics as a multi-line, human-readable string.

        The reduction percentage is computed against ``original_chars``;
        a zero original size is treated as 1 to avoid dividing by zero.
        The trace line is only included when trace noise was removed.
        """
        kept_fraction = self.cleaned_chars / max(self.original_chars, 1)
        ratio = (1 - kept_fraction) * 100
        report = [
            f" Original chars: {self.original_chars:,}",
            f" Cleaned chars: {self.cleaned_chars:,}",
            f" Reduction: {ratio:.1f}%",
            f" Thinking blocks removed: {self.thinking_blocks_removed}",
            f" Tool blocks removed: {self.tool_blocks_removed}",
            f" Metadata lines removed: {self.metadata_lines_removed}",
            f" Blank lines collapsed: {self.empty_lines_collapsed}",
        ]
        if self.trace_lines_removed:
            report.append(f" Trace noise removed: {self.trace_lines_removed}")
        return "\n".join(report)
class ToolUseCleaner:
"""Main cleaner — auto-detects Trae / Coco CLI / generic formats."""
def __init__(self, keep_thinking=False, aggressive=False,
             trace=False, keep_json_code=False, keep_table_border=False):
    """Store the option flags, create fresh stats, and compile the patterns.

    Args:
        keep_thinking: when true, thinking/reasoning blocks are not removed.
        aggressive: when true, the extra aggressive-mode patterns are
            compiled and applied by ``clean``.
        trace: when true, ``clean`` runs the agent-trace pass first.
        keep_json_code: stored for the fenced-block heuristic.
        keep_table_border: when true, box-drawing border lines are kept.
    """
    self.stats = CleanerStats()
    # The flags must be set before _compile_patterns(), which reads
    # self.aggressive to decide whether to build the aggressive patterns.
    self.keep_table_border = keep_table_border
    self.keep_json_code = keep_json_code
    self.trace = trace
    self.aggressive = aggressive
    self.keep_thinking = keep_thinking
    self._compile_patterns()
def _compile_patterns(self):
    """Compile every regular expression used by the cleaning pipeline.

    Each pattern is stored as an attribute on ``self``. The two
    aggressive-mode patterns (``re_tool_params``, ``re_tool_transition``)
    are only created when ``self.aggressive`` is true.
    """
    line_ci = re.MULTILINE | re.IGNORECASE

    # Title line of a thinking block: a single bold span whose text starts
    # with one of the English or Chinese thinking verbs.
    self.re_thinking_title = re.compile(
        rf"^\*\*(?:{EN_THINKING_VERBS}|{ZH_THINKING_VERBS})[^*]*\*\*\s*$",
        line_ci,
    )

    # XML-style tool tags. Two patterns per tag, appended in tag order:
    # first the paired <tag ...> ... </tag> form, then self-closing <tag ... />.
    compiled_xml = []
    for tag_name in (
        "tool_use", "tool_result", "function_calls", "function_results",
        "invoke", "antml:invoke", "antml:function_calls",
        "tool_call", "tool_response", "search_results",
    ):
        tag = re.escape(tag_name)
        compiled_xml.append(
            re.compile(rf"<{tag}[\s>][\s\S]*?</{tag}>", re.DOTALL)
        )
        compiled_xml.append(
            re.compile(rf"<{tag}\s[^>]*/\s*>", re.DOTALL)
        )
    self.re_xml_blocks = compiled_xml

    # A whole line holding one flat (brace-free inside) JSON object whose
    # "type" is tool_use or tool_result.
    self.re_json_tool = re.compile(
        r'^\s*\{[^{}]*"type"\s*:\s*"tool_(?:use|result)"[^{}]*\}\s*$',
        re.MULTILINE,
    )

    # HTML comment lines of the form <!-- key: value --> for known keys.
    self.re_metadata = re.compile(
        r"^<!--\s*(?:clip2file|created|source|chars|tool|function|timestamp)\s*:.*-->$",
        line_ci,
    )

    # Table border lines: only box-drawing characters, spaces/tabs and
    # ASCII "+", "-", "|", with at least one real box-drawing character so
    # plain ASCII rows such as "| Name | Age |" or "----" are not matched.
    # The negative lookahead rejects any line containing a word character.
    box = "".join((
        "─━┄┅┈┉╌╍═",
        "┌┍┎┏┐┑┒┓└┕┖┗┘┙┚┛",
        "├┝┞┟┠┡┢┣┤┥┦┧┨┩┪┫",
        "┬┭┮┯┰┱┲┳┴┵┶┷┸┹┺┻",
        "┼┽┾┿╀╁╂╃╄╅╆╇╈╉╊╋",
        "╎╏║╒╓╔╕╖╗╘╙╚╛╜╝",
        "╞╟╠╡╢╣╤╥╦╧╨╩╪╫╬╭╮╯╰",
        "│┃┆┇┊┋",
    ))
    filler = rf"[{box} \t+\-|]*"
    self.re_table_border = re.compile(
        rf"^(?![^\n]*[\w一-鿿]){filler}[{box}]{filler}\n?",
        re.MULTILINE,
    )

    # Five or more repeats of the same horizontal box-drawing character;
    # clean() rewrites each such run to four characters.
    self.re_long_hline = re.compile(r"([─━═┄┅┈┉╌╍])\1{4,}")

    # Four or more newlines in a row, i.e. three or more blank lines.
    self.re_multi_blank = re.compile(r"\n{4,}")

    if not self.aggressive:
        return

    # Aggressive mode: "Parameters:" style label lines followed by indented
    # lines and/or fenced blocks.
    self.re_tool_params = re.compile(
        r"(?:^|\n)(?:Parameters|Arguments|Input|Output|Result|Tool):\s*\n"
        r"(?:(?:[ \t]+\S.*|```[\s\S]*?```)\n?)+",
        re.MULTILINE,
    )
    # Aggressive mode: "I'll use X tool" / "Let me call Y" transition
    # sentences (English and Chinese) that mention a tool or skill.
    self.re_tool_transition = re.compile(
        r"^(?:I(?:'ll| will) (?:use|call|invoke|run|execute)|"
        r"Let me (?:use|call|invoke|run|execute)|"
        r"(?:Using|Calling|Invoking|Running|Executing) the|"
        r"Now (?:I'll|let me) (?:use|call)|"
        r"我(?:来|将|会)?(?:使用|调用|运行|执行)|"
        r"让我(?:使用|调用|运行|执行)|"
        r"(?:正在|开始)?(?:使用|调用|运行|执行))"
        r".*(?:tool|工具|技能|skill).*$",
        line_ci,
    )
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Main cleaning pipeline
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def clean(self, text: str) -> str:
    """Run the full cleaning pipeline on *text* and return the result.

    Stages run in a fixed order: optional agent-trace pass, Trae blocks,
    Coco CLI blocks, metadata comments, XML tool blocks, JSON tool lines,
    fenced tool blocks, thinking blocks (unless ``keep_thinking``), table
    borders (unless ``keep_table_border``), long horizontal rules, the
    aggressive-mode patterns (if ``aggressive``), and finally whitespace
    normalisation.

    All statistics are accumulated onto ``self.stats``. The character and
    blank-line counters use ``+=`` like every other counter, so a cleaner
    that is called once per message (as ``clean_json_conversation`` does)
    reports totals for the whole run instead of only the last call.

    Args:
        text: raw conversation text.

    Returns:
        The cleaned text, stripped of surrounding whitespace and ending
        with exactly one newline.
    """
    stats = self.stats
    stats.original_chars += len(text)
    # 0. Agent trace preprocessing (optional)
    if self.trace:
        text = self._clean_agent_trace(text)
    # 1. Remove Trae format tool blocks (toolName/status/filePath etc.)
    text = self._clean_trae_blocks(text)
    # 2. Remove Coco CLI format tool blocks
    text = self._clean_coco_cli_blocks(text)
    # 3. Remove metadata comments
    text, n = self.re_metadata.subn("", text)
    stats.metadata_lines_removed += n
    # 4. Remove XML-style tool blocks (patterns are applied in list order)
    for pat in self.re_xml_blocks:
        text, n = pat.subn("", text)
        stats.tool_blocks_removed += n
    # 5. Remove JSON tool_use blocks
    text, n = self.re_json_tool.subn("", text)
    stats.tool_blocks_removed += n
    # 6. Fence-aware code block cleaning
    text = self._clean_fenced_blocks(text)
    # 7. Remove thinking blocks (if keep_thinking=False)
    if not self.keep_thinking:
        text = self._remove_thinking_blocks(text)
    # 8. Remove table box-drawing border lines (default on)
    if not self.keep_table_border:
        text, n = self.re_table_border.subn("", text)
        stats.metadata_lines_removed += n
    # 8b. Shorten long horizontal box-drawing runs to four characters
    text = self.re_long_hline.sub(r"\1\1\1\1", text)
    # 9. Aggressive mode extra cleaning
    if self.aggressive:
        text, n = self.re_tool_params.subn("\n", text)
        stats.tool_blocks_removed += n
        text, n = self.re_tool_transition.subn("", text)
        stats.tool_blocks_removed += n
    # 10. Turn whitespace-only lines into empty ones, then collapse runs of
    #     blank lines; the drop in newline count is the number collapsed.
    text = re.sub(r"(?m)^[ \t]+$", "", text)
    newlines_before = text.count("\n")
    text = self.re_multi_blank.sub("\n\n\n", text)
    stats.empty_lines_collapsed += newlines_before - text.count("\n")
    # 11. Trim leading/trailing whitespace, ensure a single final newline
    text = text.strip() + "\n"
    stats.cleaned_chars += len(text)
    return text
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Trae format cleaning
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Known Trae tool-block field prefixes
_RE_TRAE_FIELD = re.compile(
r"^\s*(?:toolName|status|filePath|file_pattern|command|query"
r"|changes|content|Todos updated)\s*:",
)
def _clean_trae_blocks(self, text: str) -> str:
"""Remove Trae-style tool blocks (toolName/status/filePath/command …).
State machine: on seeing ``toolName:`` enter *skip* mode. In skip
mode everything is discarded until a blank separator **followed by** a
content-looking line (CJK text or Markdown structure) is encountered.
This two-gate design prevents multi-line field values (heredoc commands,
inline code) from leaking through.
"""
if "toolName:" not in text:
return text
lines = text.split("\n")
out: List[str] = []
skip = False
saw_blank = False
removed = 0
for line in lines:
stripped = line.strip()
# Detect toolName: → enter skip
if stripped.startswith("toolName:"):
skip = True
saw_blank = False
removed += 1
continue
if skip:
# Blank / whitespace-only → set gate
if not stripped:
saw_blank = True
continue
# Known field line (resets blank gate — still inside block)
if self._RE_TRAE_FIELD.match(stripped):
removed += 1
saw_blank = False
continue
# Standalone "undefined"
if stripped == "undefined":
removed += 1
continue
# After blank separator: check if this looks like content
if saw_blank and self._is_content_line(stripped):
skip = False
out.append(line)
else:
# Still inside tool block (multi-line value / path / code)
removed += 1
continue
else:
# Normal mode: still strip orphaned field/status lines
if self._RE_TRAE_FIELD.match(stripped):
removed += 1
continue
if stripped == "undefined":
removed += 1
continue
out.append(line)
self.stats.tool_blocks_removed += removed
return "\n".join(out)
@staticmethod
def _is_content_line(s: str) -> bool:
"""Heuristic: does this stripped line look like assistant narrative?
Returns True for CJK text, Markdown structure, or sentence-like prose.
Returns False for code fragments, file paths, and short labels.
"""
# CJK characters (Chinese / Japanese / Korean)
if re.search(r'[\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff\uac00-\ud7af]', s):
return True
# Markdown heading / list / blockquote / table / bold / link
if re.match(r'^(?:[#\-*>|]|\*\*|\d+\.\s|\[)', s):
return True
return False
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Coco CLI format cleaning
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
_RE_COCO_TOOL = re.compile(r"[A-Z]\w*\(|^\[MCP\]|^research\(")
_RE_THOUGHT = re.compile(r"^Thought for \d+")
def _clean_coco_cli_blocks(self, text: str) -> str:
"""Remove Coco CLI tool-call blocks (⏺ ToolCall + ⎿ results).
Rules:
* ``⏺ Word(`` / ``⏺ [MCP]`` / ``⏺ research(`` → tool call, remove.
* ``⏺ <natural text>`` → narrative, keep (strip the ⏺ marker).
* ``⎿ …`` → result header, enter *result* mode (remove).
* While in result mode, remove continuation lines until a blank line
or a new ``⏺`` line is encountered.
* ``Thought for XmYs`` markers are removed.
"""
if "⏺" not in text and "⎿" not in text:
return text
lines = text.split("\n")
out: List[str] = []
in_result = False
removed = 0
for line in lines:
stripped = line.strip()
# Thought markers
if self._RE_THOUGHT.match(stripped):
removed += 1
continue
# ⏺ lines
if "⏺" in stripped:
in_result = False
after = stripped.split("⏺", 1)[1].strip()
if self._RE_COCO_TOOL.match(after):
removed += 1
continue
# Narrative — keep text after ⏺
if after:
out.append(after)
continue
# ⎿ result lines
if "⎿" in stripped:
in_result = True
removed += 1
continue
# Inside result continuation
if in_result:
if not stripped:
in_result = False
out.append(line)
else:
removed += 1
continue
# Regular content
out.append(line)
self.stats.tool_blocks_removed += removed
return "\n".join(out)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Fence-aware code block cleaning
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def _looks_like_tool_block(self, info: str, body_lines: List[str]) -> bool:
"""Determine whether a fenced code block is a tool call.
Heuristics:
1. Language tag explicitly marks it as a tool call
2. Body contains typical tool-call fields
3. When keep_json_code=True, preserve plain JSON without tool fields
"""
lang = info.strip().split()[0].lower() if info.strip() else ""
# 1) Explicitly tagged as tool call
if lang in {"tool", "tool_use", "tool-use", "toolcall", "tool_call",
"tool_code", "function_call"}:
return True
# Bash block containing tool call signatures
if lang == "bash":
first_lines = "\n".join(body_lines[:3])
if re.search(r"#.*tool|(?:Read|Write|Edit|Glob|Grep|Bash)\s*\(", first_lines):
return True
text = "\n".join(body_lines)
# 2) Trae / Coco tool-call field signatures
if re.search(r'"recipient_name"\s*:\s*"functions\.', text):
return True
if re.search(r'"tool"\s*:\s*"[A-Za-z0-9_.]+"', text) and (
'"arguments"' in text or '"params"' in text or '"parameters"' in text
):
return True
# 3) Protect JSON: no obvious tool fields and user wants to keep → don't remove
if self.keep_json_code and lang in {"json", ""}:
return False
return False
def _clean_fenced_blocks(self, text: str) -> str:
    """Drop fenced code blocks that ``_looks_like_tool_block`` flags.

    The text is scanned line by line. A fence line opens a block; the
    block is closed only by a later fence line whose backtick run is
    exactly the same as the opener's. Fence lines with a different run
    length are treated as ordinary body lines.

    A removed block leaves one blank line behind when the preceding output
    line is non-blank, so the surrounding paragraphs do not merge. Kept
    blocks are re-emitted as opener, body, and a bare closing fence. A
    block still open at end of input is kept as-is, without adding a
    closing fence.
    """
    kept: List[str] = []
    open_ticks = ""  # non-empty exactly while inside a fenced block
    open_info = ""
    body: List[str] = []

    for row in text.split("\n"):
        hit = FENCE_RE.match(row)
        if hit is None:
            # Ordinary line: goes to the current block or straight through.
            (body if open_ticks else kept).append(row)
            continue

        ticks, info = hit.groups()
        if not open_ticks:
            # Enter a fenced block.
            open_ticks, open_info, body = ticks, info, []
        elif ticks != open_ticks:
            # Different-length fence inside the block is plain content.
            body.append(row)
        else:
            # Matching fence: the block is complete, decide its fate.
            if self._looks_like_tool_block(open_info, body):
                self.stats.tool_blocks_removed += 1
                if kept and kept[-1].strip():
                    kept.append("")
            else:
                kept.append(f"{open_ticks}{open_info}")
                kept.extend(body)
                kept.append(open_ticks)
            open_ticks, open_info, body = "", "", []

    # Unterminated block at EOF — conservatively keep everything.
    if open_ticks:
        kept.append(f"{open_ticks}{open_info}")
        kept.extend(body)
    return "\n".join(kept)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Agent trace cleaning
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def _preprocess_single_line(self, raw: str) -> str:
"""If input is a single long line, insert newlines at semantic boundaries."""
# Boundary patterns and replacements
boundary_patterns = [
# Tool/noise boundaries
(r'(Thought)', r'\n§THOUGHT§\n'),
(r'(调用技能:\s*\S+)', r'\n§SKILL§\1\n'),
(r'(✬命令已执行[^\n]*)', r'\n§CMD§\1\n'),
(r'(在工作区搜索\s[^\n]*)', r'\n§SEARCH§\1\n'),
(r'(\d+/\d+\s*已完成)', r'\n§PROGRESS§\1\n'),
(r'(定位\s[^::]{3,40}(?=定位|packages/|tests/|\d+/\d+|前置说明|结论))',
r'\n§LOCATE§\1\n'),
# File paths
(r'((?:packages|tests|src|lib|configs?|scripts?)/[\w\-./]+\.'
r'(?:py|go|ts|tsx|js|json|yaml|yml|md|sh))',
r'\n§FILE§\1\n'),
# Meaningful section headers
(r'(前置说明[::])', r'\n\n## 前置说明\n'),
(r'(结论)', r'\n\n## 结论\n'),
(r'(逐项验证)', r'\n\n## 逐项验证\n'),
(r'(补充判断)', r'\n\n## 补充判断\n'),
(r'(工具调用简报)', r'\n\n§TOOL_SUMMARY§工具调用简报\n'),
# Sub-item fields
(r'exists[::]\s*', r'\n- **exists**: '),
(r'severity[::]\s*', r'\n- **severity**: '),
(r'简因[::]\s*', r'\n- **简因**: '),
(r'关键代码[::]\s*', r'\n- **关键代码**: '),
(r'直接证据[::]\s*', r'\n- **直接证据**: '),
(r'独立执行结果[::]\s*', r'\n- **独立执行结果**: '),
(r'最小复现实验结果[::]\s*', r'\n- **最小复现实验结果**: '),
]
text = raw
for pattern, replacement in boundary_patterns:
text = re.sub(pattern, replacement, text)
# Numbered items
text = re.sub(r'(?<=\n)(\d)\s+(平台未接入|cancel|evaluator)', r'\n### \1. \2', text)
text = re.sub(r'(## 结论\n+)(\d)\s+', r'\1### \2. ', text)
return text
def _clean_agent_trace(self, text: str) -> str:
"""Agent trace cleaning: remove Thought/skill calls/progress/file paths noise."""
original_chars = len(text)
# Phase 1: if input looks like a single long line, split it first
lines = text.splitlines()
if len(lines) <= 5 and original_chars > 500:
text = self._preprocess_single_line(text)
lines = text.splitlines()
# Phase 2: per-line filtering
output_lines: List[str] = []
removed = 0
skip_until_section = False # Used to skip "tool call summary" blocks
for line in lines:
stripped = line.strip()
# Skip blank lines (but preserve one blank line gap)
if not stripped:
if output_lines and output_lines[-1].strip():
output_lines.append("")
continue
# In "tool call summary" skip mode
if skip_until_section:
if stripped.startswith("## 前置说明") or stripped.startswith("## 结论"):
skip_until_section = False
# Continue processing this line
else:
removed += 1
continue
# === Noise patterns ===
# Preprocessor-tagged noise
if stripped.startswith("§THOUGHT§") or stripped == "§THOUGHT§":
removed += 1
continue
if any(stripped.startswith(tag) for tag in (
"§SKILL§", "§CMD§", "§SEARCH§", "§PROGRESS§", "§LOCATE§", "§FILE§"
)):
removed += 1
continue
if stripped.startswith("§TOOL_SUMMARY§"):
skip_until_section = True
removed += 1
continue
# Raw noise (cases not caught by preprocessor)
if stripped == "Thought":
removed += 1
continue
if re.match(r"^调用技能:", stripped):
removed += 1
continue
if stripped.startswith("✬命令已执行"):
removed += 1
continue
if re.match(r"^在工作区搜索\s", stripped):
removed += 1
continue
if re.match(r"^\d+/\d+\s*已完成", stripped):
removed += 1
continue
# Leftover code block tags
if stripped in ("Python", "text", "PY", "python3", "bash"):
removed += 1
continue
# Pure file path lines
if re.match(
r"^(packages|tests|src|lib|internal|cmd|configs?|scripts?|docs?)/"
r"[\w\-./]+\.(py|go|ts|tsx|js|jsx|yaml|yml|json|toml|md|txt|sh|sql)$",
stripped
):
removed += 1
continue
# "定位 xxx" short lines (insufficient context)
if re.match(r"^定位\s", stripped) and len(stripped) < 80:
removed += 1
continue
# Inline script fragments (multiple indicators present)
script_indicators = [
"from packages.", "from types import", "class Fake",
"async def main", "asyncio.run(", "SimpleNamespace(",
"print('PLATFORMS=", "print('passed=", "print('final_status=",
"model_dump()", "<<'PY'", "import asyncio",
]
if sum(1 for s in script_indicators if s in stripped) >= 2:
removed += 1
continue
# === Keep this line ===
output_lines.append(line)
# Phase 3: post-processing formatting
cleaned_lines: List[str] = []
for line in output_lines:
# Remove trailing code block tags
line = re.sub(r"\s{2,}Python\s*$", "", line)
line = re.sub(r"\s{2,}text\s*$", "", line)
line = re.sub(r"\s{2,}Python\s{2,}Python", "", line)
# "直接证据: Python" → "直接证据: (见源码)"
line = re.sub(r"(直接证据\*?\*?:\s*)Python", r"\1*(见源码)*", line)
cleaned_lines.append(line)
# Normalize blank lines
final: List[str] = []
blank_count = 0
for line in cleaned_lines:
if not line.strip():
blank_count += 1
if blank_count <= 2:
final.append("")
else:
blank_count = 0
final.append(line)
# Trim leading/trailing blank lines
while final and not final[0].strip():
final.pop(0)
while final and not final[-1].strip():
final.pop()
self.stats.trace_lines_removed += removed
return "\n".join(final)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Thinking block removal
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def _remove_thinking_blocks(self, text: str) -> str:
"""Remove thinking/reasoning blocks (title line + following body paragraphs).
Strategy: on encountering a thinking block title, enter skip mode —
skip the title line and all subsequent body text until a "content
boundary" is reached. Content boundary criteria:
- Markdown heading (# ...)
- Bold conclusion/solution marker (**推理 / **解决 / **总结 / **结论 / **方案 etc.)
- Horizontal rule (--- / ***)
- Two consecutive blank lines followed by non-thinking-title content
"""
lines = text.split("\n")
result = []
skip = False
consecutive_blank = 0
# Pre-compiled: bold markers for content boundaries
re_boundary_bold = re.compile(
r"^\*\*(?:推理|解决|总结|结论|方案|回答|答案|分析结果|最终|Summary|Solution|Answer|Conclusion|Result)",
re.IGNORECASE,
)
for i, line in enumerate(lines):
stripped = line.strip()
# Check if this is a thinking block title
if self.re_thinking_title.match(stripped):
if not skip:
skip = True
self.stats.thinking_blocks_removed += 1
consecutive_blank = 0
continue
if skip:
# ── Check if we've reached a content boundary ──
# 1. Markdown heading
if stripped.startswith("#"):
skip = False
consecutive_blank = 0
result.append(line)
continue
# 2. Bold conclusion marker
if re_boundary_bold.match(stripped):
skip = False
consecutive_blank = 0
result.append(line)
continue
# 3. Horizontal rule
if stripped in ("---", "***", "___") or re.match(r"^[-*_]{3,}$", stripped):
skip = False
consecutive_blank = 0
result.append(line)
continue
# 4. Blank line counting
if stripped == "":
consecutive_blank += 1
# On 2+ consecutive blank lines, look ahead
if consecutive_blank >= 2:
# Find next non-blank line
next_content = ""
for j in range(i + 1, len(lines)):
if lines[j].strip():
next_content = lines[j].strip()
break
# If next non-blank line isn't a thinking title → resume
if next_content and not self.re_thinking_title.match(next_content):
skip = False
result.append(line)
continue # Continue skipping blank lines
# 5. Regular body text → skip (thinking block body)
consecutive_blank = 0
continue
else:
consecutive_blank = 0 if stripped else consecutive_blank + 1
result.append(line)
return "\n".join(result)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# JSON conversation processing
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def clean_json_conversation(self, data) -> str:
    """Turn a JSON conversation into cleaned Markdown.

    Args:
        data: either a list of message dicts, or a dict that holds such a
            list under ``messages`` / ``conversation`` / ``data`` /
            ``items`` / ``turns``. A dict without any of those lists is
            treated as a single message.

    Returns:
        The kept messages joined by ``---`` separators and terminated by a
        newline, or an empty string when nothing is kept.

    Per message: tool-role messages and ``tool_use`` blocks inside
    assistant content are counted in ``stats.tool_blocks_removed`` before
    the text is inspected, so they are counted even when the message has
    no text at all. System messages are skipped without being cleaned, so
    they do not influence the statistics. Remaining text is passed through
    ``clean`` and emitted under a ``## User`` / ``## Assistant`` heading
    (other roles are emitted without a heading).
    """
    output_parts = []
    messages = []
    if isinstance(data, list):
        messages = data
    elif isinstance(data, dict):
        for key in ("messages", "conversation", "data", "items", "turns"):
            if isinstance(data.get(key), list):
                messages = data[key]
                break
        if not messages:
            messages = [data]

    for msg in messages:
        if not isinstance(msg, dict):
            continue
        role = msg.get("role", msg.get("type", ""))

        # Pure tool_use / tool_result messages never reach the output.
        if role in ("tool", "tool_result", "function"):
            self.stats.tool_blocks_removed += 1
            continue
        # System prompts are not part of the exported conversation.
        if role == "system":
            continue

        # Claude API format: assistant content is a list of typed blocks.
        # Count the tool_use blocks here; the text blocks are gathered by
        # _extract_content below.
        blocks = msg.get("content")
        if role == "assistant" and isinstance(blocks, list):
            self.stats.tool_blocks_removed += sum(
                1 for block in blocks
                if isinstance(block, dict) and block.get("type") == "tool_use"
            )

        content = self._extract_content(msg)
        if not content or not content.strip():
            continue
        cleaned = self.clean(content)
        if not cleaned.strip():
            continue

        if role == "user":
            output_parts.append(f"## User\n\n{cleaned}")
        elif role == "assistant":
            output_parts.append(f"## Assistant\n\n{cleaned}")
        else:
            output_parts.append(cleaned)

    return "\n\n---\n\n".join(output_parts) + "\n" if output_parts else ""
def _extract_content(self, msg: dict) -> str:
"""Extract text content from a message object."""
content = msg.get("content", msg.get("text", msg.get("message", "")))
if isinstance(content, str):
return content
elif isinstance(content, list):
parts = []
for item in content:
if isinstance(item, str):
parts.append(item)
elif isinstance(item, dict):
if item.get("type") == "text":
parts.append(item.get("text", ""))
return "\n\n".join(parts)
return ""
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# File processing entry point
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def process_file(input_path, output_path=None, keep_thinking=False,
                 aggressive=False, show_stats=False, trace=False,
                 keep_json_code=False, keep_table_border=False):
    """Clean one file and write the result next to it (or to *output_path*).

    Args:
        input_path: path of the file to clean. ``.json`` / ``.jsonl``
            inputs are parsed as conversations; anything else is cleaned
            as plain text.
        output_path: destination file; defaults to ``<stem>_cleaned.md``
            in the input file's directory.
        keep_thinking, aggressive, trace, keep_json_code,
        keep_table_border: forwarded to ``ToolUseCleaner``.
        show_stats: print the cleaning statistics after writing.

    Returns:
        True on success. False — with a message on stderr — when the input
        does not exist, is not a regular file, or cannot be read as UTF-8.
    """
    path = Path(input_path)
    if not path.exists():
        print(f"Error: file not found — {path}", file=sys.stderr)
        return False
    # exists() is also true for directories, which read_text() cannot open.
    if not path.is_file():
        print(f"Error: not a regular file — {path}", file=sys.stderr)
        return False

    if output_path is None:
        output_path = path.parent / f"{path.stem}_cleaned.md"
    else:
        output_path = Path(output_path)

    try:
        raw = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        print(f"Error: cannot read {path} — {exc}", file=sys.stderr)
        return False

    cleaner = ToolUseCleaner(
        keep_thinking=keep_thinking,
        aggressive=aggressive,
        trace=trace,
        keep_json_code=keep_json_code,
        keep_table_border=keep_table_border,
    )

    # Determine input format from the file extension
    suffix = path.suffix.lower()
    if suffix in (".json", ".jsonl"):
        result = _process_json(cleaner, raw, suffix)
    else:
        result = cleaner.clean(raw)

    output_path.write_text(result, encoding="utf-8")
    if show_stats:
        print(f"📊 Cleaning stats ({path.name}):")
        print(cleaner.stats.summary())
    print(f"✅ Output: {output_path}")
    return True
def process_stdin(output_path=None, keep_thinking=False, aggressive=False,
                  show_stats=False, trace=False, keep_json_code=False,
                  keep_table_border=False):
    """Clean text read from stdin.

    Input that starts with ``{`` or ``[`` is first tried as a JSON
    conversation; if it does not parse it is cleaned as plain text.

    With *output_path* the result is written to that file and the
    statistics and confirmation go to stdout. Without it the result is
    written to stdout and the statistics go to stderr, so piping stays
    clean.

    Returns:
        False when stdin is empty or whitespace only, otherwise True.
    """
    raw = sys.stdin.read()
    if not raw.strip():
        print("Error: stdin is empty", file=sys.stderr)
        return False

    cleaner = ToolUseCleaner(
        keep_thinking=keep_thinking,
        aggressive=aggressive,
        trace=trace,
        keep_json_code=keep_json_code,
        keep_table_border=keep_table_border,
    )

    # Try JSON first when the input looks like it; fall back to text.
    result = None
    if raw.strip().startswith(("{", "[")):
        try:
            result = cleaner.clean_json_conversation(json.loads(raw))
        except json.JSONDecodeError:
            result = None
    if result is None:
        result = cleaner.clean(raw)

    if not output_path:
        # No output path — the cleaned text itself goes to stdout.
        sys.stdout.write(result)
        if show_stats:
            print("📊 Cleaning stats (stdin):", file=sys.stderr)
            print(cleaner.stats.summary(), file=sys.stderr)
        return True

    Path(output_path).write_text(result, encoding="utf-8")
    if show_stats:
        print("📊 Cleaning stats (stdin):")
        print(cleaner.stats.summary())
    print(f"✅ Output: {output_path}")
    return True
def _process_json(cleaner, raw, suffix):
"""Process JSON/JSONL input."""
if suffix == ".jsonl":
messages = []
for line in raw.strip().split("\n"):
line = line.strip()
if line:
try:
messages.append(json.loads(line))
except json.JSONDecodeError:
pass
return cleaner.clean_json_conversation(messages)
else:
try:
data = json.loads(raw)
return cleaner.clean_json_conversation(data)
except json.JSONDecodeError:
# Invalid JSON — treat as plain text
return cleaner.clean(raw)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# CLI
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def main():
    """Command-line entry point: parse arguments, run, and exit.

    With an input path the file is processed by ``process_file``; without
    one the text is read from stdin by ``process_stdin``. The process exits
    with status 0 on success and 1 on failure.
    """
    parser = argparse.ArgumentParser(
        description="AI conversation tool-use cleaner",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python3 tool_use_cleaner.py chat_export.md
python3 tool_use_cleaner.py conversation.json -o clean.md --stats
python3 tool_use_cleaner.py messy.md --aggressive --stats
python3 tool_use_cleaner.py trace.md --trace --keep-json-code
cat input.md | python3 tool_use_cleaner.py -o output.md
cat input.md | python3 tool_use_cleaner.py > output.md
""",
    )
    parser.add_argument(
        "input", nargs="?", default=None,
        help="Input file path (.md / .json / .jsonl / .txt); omit to read from stdin",
    )
    parser.add_argument("-o", "--output", help="Output file path (default: <input>_cleaned.md)")

    # Boolean switches, registered in the order they appear in --help.
    switches = (
        ("--keep-thinking",
         "Keep thinking/reasoning blocks (removed by default)"),
        ("--aggressive",
         "Aggressive mode: additionally remove tool parameter details and intermediate output"),
        ("--trace",
         "Enable agent trace cleaning (remove Thought/skill calls/progress noise)"),
        ("--keep-json-code",
         "Keep plain JSON code blocks; only remove when obvious tool fields are detected"),
        ("--keep-table-border",
         "Keep table box-drawing border lines (removed by default)"),
        ("--stats",
         "Show cleaning statistics"),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action="store_true", help=help_text)

    args = parser.parse_args()

    # Options shared by file mode and stdin mode.
    options = {
        "keep_thinking": args.keep_thinking,
        "aggressive": args.aggressive,
        "show_stats": args.stats,
        "trace": args.trace,
        "keep_json_code": args.keep_json_code,
        "keep_table_border": args.keep_table_border,
    }
    if args.input:
        # File mode
        success = process_file(args.input, args.output, **options)
    else:
        # Stdin mode
        success = process_stdin(output_path=args.output, **options)
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment