pdfhandling/batchcompressed.py

#!/usr/bin/env python3
r"""
SMART PDF Compressor - Only Saves When Actually Smaller
Automatically keeps the smaller version (original vs compressed)
Features:
- Dynamic timeout based on file size
- Only keeps compressed version if actually smaller
- Shows compression success rate
- Moves processed files to 'processed' folder
Usage: python batch_compress_smart_v2.py <source> <destination> [workers] [--timeout-per-mb SECONDS]
Examples:
python batch_compress_smart_v2.py D:\C D:\D 60
python batch_compress_smart_v2.py D:\C D:\D 60 --timeout-per-mb 10
Notes:
- Files are moved from the source folder (original) to the destination folder (output/result)
- Ghostscript must be installed for this script to run
- Tuned for high-core-count machines (e.g. a Threadripper 3990X with an RTX 5090)
"""
import os
import sys
import subprocess
import platform
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import cpu_count
import time
from datetime import timedelta
import threading
import shutil
# Windows limit: ProcessPoolExecutor supports at most 61 workers on Windows
# (WaitForMultipleObjects handle limit)
MAX_WINDOWS_WORKERS = 61
def get_file_size_mb(filepath):
"""Get file size in MB"""
try:
return os.path.getsize(filepath) / (1024 * 1024)
    except OSError:
        return 0
def ensure_folder_exists(folder_path):
"""Create folder if it doesn't exist"""
Path(folder_path).mkdir(parents=True, exist_ok=True)
def find_pdf_files_with_sizes(folder_path, processed_folder):
"""Find all unprocessed PDF files"""
pdf_files = []
if not os.path.exists(folder_path):
return pdf_files
processed_files = set()
if os.path.exists(processed_folder):
processed_files = {f.lower() for f in os.listdir(processed_folder) if f.lower().endswith('.pdf')}
for file in os.listdir(folder_path):
if file.lower().endswith('.pdf'):
full_path = os.path.join(folder_path, file)
if os.path.isfile(full_path) and file.lower() not in processed_files:
size = get_file_size_mb(full_path)
pdf_files.append((file, size))
    # Sort by size (smallest first) so quick files finish early instead of waiting behind large ones
return sorted(pdf_files, key=lambda x: x[1])
def find_ghostscript_command():
"""Find Ghostscript"""
gs_commands = ['gswin64c', 'gswin32c', 'gs']
for cmd in gs_commands:
try:
subprocess.run([cmd, '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, timeout=5)
return cmd
        except (OSError, subprocess.SubprocessError):
            continue
return None
def calculate_timeout(file_size_mb, seconds_per_mb=6):
"""
Calculate timeout based on file size
Default: 6 seconds per MB
Minimum: 60 seconds
Maximum: 30 minutes
"""
timeout = int(file_size_mb * seconds_per_mb)
timeout = max(60, timeout) # At least 1 minute
timeout = min(1800, timeout) # At most 30 minutes
return timeout
def compress_pdf_smart(args):
"""
Compress PDF and only keep if smaller
Returns: (filename, success, original_size, final_size, compressed_smaller, error_msg, duration, moved, timeout_used)
"""
input_path, output_path, processed_path, gs_command, thread_count, seconds_per_mb = args
filename = os.path.basename(input_path)
start_time = time.time()
moved = False
compressed_smaller = False
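    # Compress into a temporary file first so the result can be compared
    # against the original before deciding which version to keep.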
temp_output = output_path + ".tmp"
try:
original_size = get_file_size_mb(input_path)
# Calculate dynamic timeout
timeout_seconds = calculate_timeout(original_size, seconds_per_mb)
# Ghostscript compression to temp file
gs_cmd = [
gs_command,
'-sDEVICE=pdfwrite',
'-dCompatibilityLevel=1.4',
'-dPDFSETTINGS=/ebook',
'-dNOPAUSE',
'-dQUIET',
'-dBATCH',
'-dDetectDuplicateImages=true',
'-dCompressFonts=true',
'-dCompressPages=true',
'-dColorImageResolution=150',
'-dGrayImageResolution=150',
'-dMonoImageResolution=300',
f'-dNumRenderingThreads={thread_count}',
'-dOptimize=true',
'-dDownsampleColorImages=true',
'-dDownsampleGrayImages=true',
'-dColorImageDownsampleType=/Bicubic',
'-dGrayImageDownsampleType=/Bicubic',
f'-sOutputFile={temp_output}',
input_path
]
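        # subprocess.HIGH_PRIORITY_CLASS only exists on Windows; elsewhere
        # creationflags=0 is the default and has no effect.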
creation_flags = subprocess.HIGH_PRIORITY_CLASS if platform.system() == "Windows" else 0
result = subprocess.run(
gs_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
text=True,
timeout=timeout_seconds,
creationflags=creation_flags
)
if os.path.exists(temp_output):
compressed_size = get_file_size_mb(temp_output)
# Compare sizes - only use compressed if smaller
if compressed_size < original_size:
# Compressed is smaller - use it!
shutil.move(temp_output, output_path)
final_size = compressed_size
compressed_smaller = True
else:
# Original is smaller - copy it instead
shutil.copy2(input_path, output_path)
final_size = original_size
compressed_smaller = False
# Clean up temp file
if os.path.exists(temp_output):
os.remove(temp_output)
# Move source file to processed folder
try:
shutil.move(input_path, processed_path)
moved = True
            except Exception:
                # Leave the source in place; it will be picked up again on the next run
                pass
duration = time.time() - start_time
return (filename, True, original_size, final_size, compressed_smaller, None, duration, moved, timeout_seconds)
else:
duration = time.time() - start_time
return (filename, False, original_size, 0, False, "Output not created", duration, False, timeout_seconds)
except subprocess.TimeoutExpired:
# Timeout - just copy original
if os.path.exists(temp_output):
os.remove(temp_output)
try:
shutil.copy2(input_path, output_path)
shutil.move(input_path, processed_path)
moved = True
duration = time.time() - start_time
            return (filename, True, original_size, original_size, False, "Timeout - used original", duration, moved, timeout_seconds)
        except Exception:
duration = time.time() - start_time
return (filename, False, original_size, 0, False, f"Timeout after {timeout_seconds}s", duration, False, timeout_seconds)
except subprocess.CalledProcessError as e:
# Compression failed - copy original
if os.path.exists(temp_output):
os.remove(temp_output)
try:
shutil.copy2(input_path, output_path)
shutil.move(input_path, processed_path)
moved = True
duration = time.time() - start_time
error_msg = e.stderr[:50] if e.stderr else "Compression failed"
return (filename, True, original_size, original_size, False, f"Failed - used original: {error_msg}", duration, moved, timeout_seconds)
        except Exception:
duration = time.time() - start_time
error_msg = e.stderr[:100] if e.stderr else "Ghostscript error"
return (filename, False, original_size, 0, False, error_msg, duration, False, timeout_seconds)
except Exception as e:
if os.path.exists(temp_output):
os.remove(temp_output)
duration = time.time() - start_time
timeout_seconds = calculate_timeout(original_size, seconds_per_mb)
return (filename, False, original_size, 0, False, str(e)[:100], duration, False, timeout_seconds)
class ProgressMonitor:
"""Real-time progress monitoring"""
def __init__(self, total_files):
self.total_files = total_files
self.completed = 0
self.successful = 0
self.failed = 0
self.moved = 0
self.compressed_better = 0
self.original_better = 0
self.total_original = 0
self.total_final = 0
self.start_time = time.time()
self.lock = threading.Lock()
def update(self, success, original_size, final_size, compressed_smaller, was_moved):
with self.lock:
self.completed += 1
self.total_original += original_size
if success:
self.successful += 1
self.total_final += final_size
if was_moved:
self.moved += 1
if compressed_smaller:
self.compressed_better += 1
elif original_size == final_size:
self.original_better += 1
else:
self.failed += 1
def get_stats(self):
with self.lock:
elapsed = time.time() - self.start_time
rate = self.completed / elapsed if elapsed > 0 else 0
eta = (self.total_files - self.completed) / rate if rate > 0 else 0
return {
'completed': self.completed,
'successful': self.successful,
'failed': self.failed,
'moved': self.moved,
'compressed_better': self.compressed_better,
'original_better': self.original_better,
'elapsed': elapsed,
'rate': rate,
'eta': eta,
'total_original': self.total_original,
'total_final': self.total_final
}
def batch_compress_smart_v2(source_folder, dest_folder, max_workers=None, seconds_per_mb=6):
"""Smart batch compression - only keeps smaller files"""
source_folder = os.path.abspath(os.path.expanduser(source_folder))
dest_folder = os.path.abspath(os.path.expanduser(dest_folder))
processed_folder = os.path.join(source_folder, "processed")
failed_log = os.path.join(dest_folder, "_failed_files.txt")
if not os.path.exists(source_folder):
print(f"❌ Error: Source folder does not exist: {source_folder}")
return 1
ensure_folder_exists(dest_folder)
ensure_folder_exists(processed_folder)
print("🔍 Scanning for PDF files...")
pdf_files = find_pdf_files_with_sizes(source_folder, processed_folder)
if not pdf_files:
print("✓ All files already processed!")
return 0
gs_command = find_ghostscript_command()
if not gs_command:
print("❌ Error: Ghostscript not found!")
print("\nInstall Ghostscript from: https://ghostscript.com/releases/gsdnld.html")
return 1
# Count processed
processed_count = len([f for f in os.listdir(processed_folder) if f.lower().endswith('.pdf')]) if os.path.exists(processed_folder) else 0
# Determine workers
cpu_cores = cpu_count()
if max_workers is None:
        max_workers = min(MAX_WINDOWS_WORKERS, max(1, int(cpu_cores * 0.75))) if platform.system() == "Windows" else max(1, int(cpu_cores * 0.85))
else:
if platform.system() == "Windows" and max_workers > MAX_WINDOWS_WORKERS:
print(f"⚠️ Adjusting workers from {max_workers} to {MAX_WINDOWS_WORKERS}")
max_workers = MAX_WINDOWS_WORKERS
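    # Example default on a 64C/128T part (e.g. 3990X): min(61, int(128 * 0.75)) = 61
    # workers on Windows, or int(128 * 0.85) = 108 on other platforms.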
    # max(1, ...) avoids a ZeroDivisionError when only one worker is requested
    threads_per_pdf = max(2, min(8, cpu_cores // max(1, max_workers // 2)))
# Analyze file sizes
total_size = sum(s for _, s in pdf_files)
max_file_size = max(s for _, s in pdf_files)
max_timeout = calculate_timeout(max_file_size, seconds_per_mb)
# Print header
print("\n" + "=" * 80)
print(" 🎯 SMART PDF COMPRESSOR V2 - BEST VERSION WINS")
print(" Only keeps compressed file if it's actually smaller!")
print("=" * 80)
print(f"\n🖥️ CPU: {cpu_cores} logical cores")
print(f"🔧 Workers: {max_workers} parallel processes")
print(f"🧵 Threads/PDF: {threads_per_pdf} per file")
print(f"⏱️ Timeout: {seconds_per_mb}s per MB (min: 60s, max: 1800s)")
print(f"📦 Ghostscript: {gs_command}")
print(f"\n📁 Folders:")
print(f" Source: {source_folder}")
print(f" Destination: {dest_folder}")
print(f" Processed: {processed_folder}")
if processed_count > 0:
print(f"\n✓ Already processed: {processed_count} files")
print(f"\n📄 Files to process: {len(pdf_files)} PDFs")
print(f" Total size: {total_size:.1f} MB ({total_size/1024:.1f} GB)")
print(f" Largest file: {max_file_size:.1f} MB (timeout: {max_timeout}s = {max_timeout/60:.1f} min)")
print(f"\n📋 Smart Strategy:")
print(f" 1. Try to compress each PDF")
print(f" 2. Compare: compressed vs original")
print(f" 3. Keep whichever is SMALLER")
print(f" 4. Move source to 'processed' folder")
print(f" 5. Result: Always get the best version!")
print("\n" + "=" * 80)
response = input("👉 Continue? (yes/no): ").lower().strip()
if response not in ['yes', 'y']:
print("❌ Cancelled")
return 1
print("\n🚀 Starting smart compression...\n")
# Prepare tasks
tasks = []
for pdf_file, _ in pdf_files:
input_path = os.path.join(source_folder, pdf_file)
output_path = os.path.join(dest_folder, pdf_file)
processed_path = os.path.join(processed_folder, pdf_file)
tasks.append((input_path, output_path, processed_path, gs_command, threads_per_pdf, seconds_per_mb))
monitor = ProgressMonitor(len(pdf_files))
failed_files = []
start_time = time.time()
try:
with ProcessPoolExecutor(max_workers=max_workers) as executor:
future_to_file = {executor.submit(compress_pdf_smart, task): task for task in tasks}
for future in as_completed(future_to_file):
filename, success, original_size, final_size, compressed_smaller, error_msg, duration, was_moved, timeout_used = future.result()
monitor.update(success, original_size, final_size, compressed_smaller, was_moved)
stats = monitor.get_stats()
if success:
if compressed_smaller:
reduction = ((original_size - final_size) / original_size) * 100
indicator = "✓ COMPRESSED"
print(f"✓ [{stats['completed']}/{len(pdf_files)}] {filename[:45]}")
print(f" {original_size:.1f}MB → {final_size:.1f}MB ({reduction:.1f}%↓) [{duration:.1f}s] {indicator}")
else:
if original_size == final_size:
indicator = "= ORIGINAL (better)"
else:
indicator = "= ORIGINAL (compression failed)"
print(f"✓ [{stats['completed']}/{len(pdf_files)}] {filename[:45]}")
print(f" {original_size:.1f}MB (kept original) [{duration:.1f}s] {indicator}")
if error_msg:
print(f" Note: {error_msg}")
else:
print(f"✗ [{stats['completed']}/{len(pdf_files)}] {filename[:45]}")
print(f" FAILED ({original_size:.1f}MB): {error_msg} [{duration:.1f}s]")
failed_files.append(f"{filename} ({original_size:.1f}MB) - {error_msg}")
# Stats every 50 files
if stats['completed'] % 50 == 0:
compression_rate = (stats['compressed_better'] / stats['successful'] * 100) if stats['successful'] > 0 else 0
print(f"\n 📊 Progress:")
print(f" Rate: {stats['rate']:.2f} files/sec | ETA: {timedelta(seconds=int(stats['eta']))}")
print(f" Compressed better: {stats['compressed_better']} ({compression_rate:.1f}%)")
print(f" Original better: {stats['original_better']}")
print(f" Failed: {stats['failed']}")
if stats['total_original'] > 0:
saved_gb = (stats['total_original'] - stats['total_final']) / 1024
reduction_pct = ((stats['total_original'] - stats['total_final']) / stats['total_original']) * 100
print(f" Total saved: {saved_gb:.2f} GB ({reduction_pct:.1f}% reduction)")
print()
except KeyboardInterrupt:
print("\n\n⚠️ Interrupted by user")
stats = monitor.get_stats()
# Save failed files log
if failed_files:
with open(failed_log, 'w', encoding='utf-8') as f:
f.write(f"Failed Files Report - {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"Total Failed: {len(failed_files)}\n")
f.write("=" * 80 + "\n\n")
for line in failed_files:
f.write(line + "\n")
print(f"\n📝 Failed files logged to: {failed_log}")
# Final Summary
elapsed_time = time.time() - start_time
stats = monitor.get_stats()
print("\n" + "=" * 80)
print("🏁 FINAL SUMMARY")
print("=" * 80)
print(f"⏱️ Total Time: {timedelta(seconds=int(elapsed_time))}")
print(f"⚡ Average Rate: {stats['completed'] / elapsed_time:.2f} files/sec")
print(f"\n📊 Results:")
print(f" Processed: {stats['completed']} files")
print(f" ✓ Successful: {stats['successful']}")
print(f" ✗ Failed: {stats['failed']}")
if stats['successful'] > 0:
compression_success_rate = (stats['compressed_better'] / stats['successful'] * 100)
print(f"\n🎯 Compression Results:")
print(f" Compressed better: {stats['compressed_better']} files ({compression_success_rate:.1f}%)")
print(f" Original better: {stats['original_better']} files ({100-compression_success_rate:.1f}%)")
if stats['total_original'] > 0:
print(f"\n💾 Storage:")
print(f" Original size: {stats['total_original']/1024:.2f} GB")
print(f" Final size: {stats['total_final']/1024:.2f} GB")
overall = ((stats['total_original'] - stats['total_final']) / stats['total_original']) * 100
saved = (stats['total_original'] - stats['total_final']) / 1024
print(f" Reduction: {overall:.1f}%")
print(f" 💰 Saved: {saved:.2f} GB")
print(f"\n📁 Locations:")
print(f" Final files: {dest_folder}")
print(f" Processed: {processed_folder}")
print(f" Remaining: {source_folder}")
if stats['failed'] > 0:
print(f"\n⚠️ {stats['failed']} files failed - see {failed_log}")
print("=" * 80)
return 0 if stats['failed'] == 0 else 1
def main():
"""Main entry point"""
if len(sys.argv) < 3:
cpu_cores = cpu_count()
        recommended = min(MAX_WINDOWS_WORKERS, max(1, int(cpu_cores * 0.75))) if platform.system() == "Windows" else max(1, int(cpu_cores * 0.85))
print("🎯 SMART PDF Compressor V2 - Best Version Wins!")
print(f" Detected: {cpu_cores} cores | Recommended: {recommended} workers")
print("\nUsage:")
print(f" python {sys.argv[0]} <source> <destination> [workers] [--timeout-per-mb SECONDS]")
print("\nExamples:")
print(r" python batch_compress_smart_v2.py D:\C D:\D 60")
print(r" python batch_compress_smart_v2.py D:\C D:\D 60 --timeout-per-mb 10")
print("\nKey Feature:")
print(" • Tries to compress each PDF")
print(" • Compares compressed vs original")
print(" • Always keeps the SMALLER version")
print(" • No more files getting bigger!")
return 1
source = sys.argv[1]
destination = sys.argv[2]
# Parse arguments
max_workers = None
seconds_per_mb = 6
i = 3
while i < len(sys.argv):
arg = sys.argv[i]
if arg == '--timeout-per-mb' and i + 1 < len(sys.argv):
try:
seconds_per_mb = int(sys.argv[i + 1])
i += 2
except ValueError:
print(f"Warning: Invalid timeout-per-mb, using default 6")
i += 2
else:
try:
max_workers = int(arg)
i += 1
except ValueError:
print(f"Warning: Unknown argument '{arg}', ignoring")
i += 1
return batch_compress_smart_v2(source, destination, max_workers, seconds_per_mb)
if __name__ == '__main__':
try:
sys.exit(main())
except KeyboardInterrupt:
print("\n\n❌ Cancelled by user")
sys.exit(1)
except Exception as e:
print(f"\n❌ Error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)