524 lines
21 KiB
Python
524 lines
21 KiB
Python
#!/usr/bin/env python3
r"""
SMART PDF Compressor - Only Saves When Actually Smaller

Automatically keeps the smaller version (original vs compressed).

Features:
- Dynamic timeout based on file size
- Only keeps compressed version if actually smaller
- Shows compression success rate
- Moves processed files to 'processed' folder

Usage: python batch_compress_smart_v2.py <source> <destination> [workers] [--timeout-per-mb SECONDS]

Examples:
    python batch_compress_smart_v2.py D:\C D:\D 60
    python batch_compress_smart_v2.py D:\C D:\D 60 --timeout-per-mb 10

Notes:
- Files flow from folder A (originals) to folder B (output/results); finished
  originals are moved into <source>/processed.
- Ghostscript must be installed for this script to run.
- Tuned for high-core-count machines (e.g. Threadripper 3990X + RTX 5090).
"""
|
|
|
|
import os
|
|
import sys
|
|
import subprocess
|
|
import platform
|
|
from pathlib import Path
|
|
from concurrent.futures import ProcessPoolExecutor, as_completed
|
|
from multiprocessing import cpu_count
|
|
import time
|
|
from datetime import timedelta
|
|
import threading
|
|
import shutil
|
|
|
|
# Windows limit: ProcessPoolExecutor on Windows caps max_workers at 61
# (WaitForMultipleObjects handle limit), so never request more than this.
MAX_WINDOWS_WORKERS = 61
|
|
|
|
def get_file_size_mb(filepath):
    """Return the size of *filepath* in megabytes, or 0 if it cannot be read.

    A missing or unreadable file yields 0 rather than raising, so callers can
    treat "no size" and "empty file" uniformly.
    """
    try:
        return os.path.getsize(filepath) / (1024 * 1024)
    except OSError:
        # Narrowed from a bare `except:` — os.path.getsize raises OSError
        # for missing files / permission errors; anything else is a bug we
        # want to see.
        return 0
|
|
|
|
def ensure_folder_exists(folder_path):
    """Create *folder_path* (and any missing parents) if it does not exist."""
    target = Path(folder_path)
    target.mkdir(parents=True, exist_ok=True)
|
|
|
|
def find_pdf_files_with_sizes(folder_path, processed_folder):
    """Return (filename, size_mb) pairs for PDFs awaiting processing.

    Files whose (lower-cased) name already appears in *processed_folder* are
    skipped.  The result is ordered smallest-first so quick wins come early.
    A missing source folder yields an empty list.
    """
    if not os.path.exists(folder_path):
        return []

    # Lower-cased names already present in the processed folder.
    already_done = set()
    if os.path.exists(processed_folder):
        already_done = {
            name.lower()
            for name in os.listdir(processed_folder)
            if name.lower().endswith('.pdf')
        }

    candidates = []
    for name in os.listdir(folder_path):
        if not name.lower().endswith('.pdf'):
            continue
        path = os.path.join(folder_path, name)
        if name.lower() in already_done or not os.path.isfile(path):
            continue
        try:
            size_mb = os.path.getsize(path) / (1024 * 1024)
        except OSError:
            size_mb = 0
        candidates.append((name, size_mb))

    # Smallest first.
    candidates.sort(key=lambda pair: pair[1])
    return candidates
|
|
|
|
def find_ghostscript_command():
    """Locate a working Ghostscript executable.

    Tries the common Windows names (gswin64c, gswin32c) and the Unix name
    (gs), returning the first one that answers `--version`, or None when
    Ghostscript is not installed.
    """
    for cmd in ('gswin64c', 'gswin32c', 'gs'):
        try:
            subprocess.run([cmd, '--version'],
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                           check=True, timeout=5)
        except (OSError, subprocess.SubprocessError):
            # Narrowed from a bare `except:` — OSError covers a missing
            # executable; SubprocessError covers CalledProcessError and
            # TimeoutExpired.
            continue
        return cmd
    return None
|
|
|
|
def calculate_timeout(file_size_mb, seconds_per_mb=6):
    """Derive a per-file Ghostscript timeout (in seconds) from its size.

    The budget is *seconds_per_mb* seconds per megabyte, clamped to the
    range [60, 1800] — at least one minute, at most thirty minutes.
    """
    raw = int(file_size_mb * seconds_per_mb)
    return min(1800, max(60, raw))
|
|
|
|
def compress_pdf_smart(args):
    """Compress one PDF with Ghostscript and keep whichever version is smaller.

    *args* is a tuple:
        (input_path, output_path, processed_path, gs_command,
         thread_count, seconds_per_mb)

    Returns a 9-tuple:
        (filename, success, original_size_mb, final_size_mb,
         compressed_smaller, error_msg, duration_s, moved, timeout_used_s)

    On timeout or a Ghostscript failure the original file is copied to the
    destination instead, so the pipeline still produces an output whenever
    the filesystem cooperates.
    """
    (input_path, output_path, processed_path,
     gs_command, thread_count, seconds_per_mb) = args
    filename = os.path.basename(input_path)
    start_time = time.time()
    moved = False
    compressed_smaller = False
    # Ghostscript writes here first so a failed run never clobbers the
    # destination file.
    temp_output = output_path + ".tmp"

    try:
        original_size = get_file_size_mb(input_path)

        # Larger files get proportionally more time (clamped 60s..1800s).
        timeout_seconds = calculate_timeout(original_size, seconds_per_mb)

        gs_cmd = [
            gs_command,
            '-sDEVICE=pdfwrite',
            '-dCompatibilityLevel=1.4',
            '-dPDFSETTINGS=/ebook',
            '-dNOPAUSE',
            '-dQUIET',
            '-dBATCH',
            '-dDetectDuplicateImages=true',
            '-dCompressFonts=true',
            '-dCompressPages=true',
            '-dColorImageResolution=150',
            '-dGrayImageResolution=150',
            '-dMonoImageResolution=300',
            f'-dNumRenderingThreads={thread_count}',
            '-dOptimize=true',
            '-dDownsampleColorImages=true',
            '-dDownsampleGrayImages=true',
            '-dColorImageDownsampleType=/Bicubic',
            '-dGrayImageDownsampleType=/Bicubic',
            f'-sOutputFile={temp_output}',
            input_path
        ]

        # HIGH_PRIORITY_CLASS only exists (and is only valid) on Windows;
        # 0 is the harmless default elsewhere.
        creation_flags = subprocess.HIGH_PRIORITY_CLASS if platform.system() == "Windows" else 0

        # Return value was previously bound to an unused `result` variable;
        # check=True already raises CalledProcessError on failure.
        subprocess.run(
            gs_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
            timeout=timeout_seconds,
            creationflags=creation_flags
        )

        if os.path.exists(temp_output):
            compressed_size = get_file_size_mb(temp_output)

            # Compare sizes — only use the compressed version if it is
            # actually smaller than the original.
            if compressed_size < original_size:
                shutil.move(temp_output, output_path)
                final_size = compressed_size
                compressed_smaller = True
            else:
                # Original wins (or ties): ship it and drop the temp file.
                shutil.copy2(input_path, output_path)
                final_size = original_size
                compressed_smaller = False
                if os.path.exists(temp_output):
                    os.remove(temp_output)

            # Move the source into the processed folder. Best-effort on
            # purpose: a move failure must not discard an otherwise
            # successful compression (narrowed from a swallowed
            # `except Exception`).
            try:
                shutil.move(input_path, processed_path)
                moved = True
            except OSError:
                pass

            duration = time.time() - start_time
            return (filename, True, original_size, final_size,
                    compressed_smaller, None, duration, moved, timeout_seconds)
        else:
            duration = time.time() - start_time
            return (filename, False, original_size, 0, False,
                    "Output not created", duration, False, timeout_seconds)

    except subprocess.TimeoutExpired:
        # Ran out of time — discard partial output and fall back to the
        # original file.
        if os.path.exists(temp_output):
            os.remove(temp_output)

        try:
            shutil.copy2(input_path, output_path)
            shutil.move(input_path, processed_path)
            moved = True
            duration = time.time() - start_time
            return (filename, True, original_size, original_size, False,
                    "Timeout - used original", duration, moved, timeout_seconds)
        except OSError:
            # Narrowed from a bare `except:` — the copy/move above only
            # fail with OSError (incl. shutil.Error).
            duration = time.time() - start_time
            return (filename, False, original_size, 0, False,
                    f"Timeout after {timeout_seconds}s", duration, False, timeout_seconds)

    except subprocess.CalledProcessError as e:
        # Ghostscript exited non-zero — fall back to copying the original.
        if os.path.exists(temp_output):
            os.remove(temp_output)

        try:
            shutil.copy2(input_path, output_path)
            shutil.move(input_path, processed_path)
            moved = True
            duration = time.time() - start_time
            error_msg = e.stderr[:50] if e.stderr else "Compression failed"
            return (filename, True, original_size, original_size, False,
                    f"Failed - used original: {error_msg}", duration, moved, timeout_seconds)
        except OSError:
            # Narrowed from a bare `except:` (see TimeoutExpired handler).
            duration = time.time() - start_time
            error_msg = e.stderr[:100] if e.stderr else "Ghostscript error"
            return (filename, False, original_size, 0, False,
                    error_msg, duration, False, timeout_seconds)

    except Exception as e:
        # Unexpected failure (e.g. the Ghostscript binary vanished mid-run).
        if os.path.exists(temp_output):
            os.remove(temp_output)
        duration = time.time() - start_time
        # Recompute for the report: timeout_seconds may not be bound if the
        # failure happened before the run started.
        timeout_seconds = calculate_timeout(original_size, seconds_per_mb)
        return (filename, False, original_size, 0, False,
                str(e)[:100], duration, False, timeout_seconds)
|
|
|
|
class ProgressMonitor:
    """Thread-safe accumulator for batch-compression statistics.

    All counters are guarded by a single lock so `update` may be called
    from multiple result-collecting threads.
    """

    def __init__(self, total_files):
        self.total_files = total_files
        self.completed = 0
        self.successful = 0
        self.failed = 0
        self.moved = 0
        self.compressed_better = 0
        self.original_better = 0
        self.total_original = 0
        self.total_final = 0
        self.start_time = time.time()
        self.lock = threading.Lock()

    def update(self, success, original_size, final_size, compressed_smaller, was_moved):
        """Record the outcome of a single file."""
        with self.lock:
            self.completed += 1
            self.total_original += original_size
            if not success:
                self.failed += 1
                return
            self.successful += 1
            self.total_final += final_size
            if was_moved:
                self.moved += 1
            if compressed_smaller:
                self.compressed_better += 1
            elif original_size == final_size:
                # Sizes equal means the original was kept as the better copy.
                self.original_better += 1

    def get_stats(self):
        """Return a snapshot dict of counters plus derived rate/ETA figures."""
        with self.lock:
            elapsed = time.time() - self.start_time
            rate = self.completed / elapsed if elapsed > 0 else 0
            remaining = self.total_files - self.completed
            eta = remaining / rate if rate > 0 else 0
            return {
                'completed': self.completed,
                'successful': self.successful,
                'failed': self.failed,
                'moved': self.moved,
                'compressed_better': self.compressed_better,
                'original_better': self.original_better,
                'elapsed': elapsed,
                'rate': rate,
                'eta': eta,
                'total_original': self.total_original,
                'total_final': self.total_final,
            }
|
|
|
|
def batch_compress_smart_v2(source_folder, dest_folder, max_workers=None, seconds_per_mb=6):
    """Smart batch compression - only keeps smaller files.

    Scans *source_folder* for PDFs not yet processed, compresses them in
    parallel with Ghostscript, writes the smaller of (original, compressed)
    into *dest_folder*, and moves finished sources into <source>/processed.
    Prompts interactively before starting.

    Returns a process exit code: 0 when everything succeeded, 1 otherwise.
    """
    source_folder = os.path.abspath(os.path.expanduser(source_folder))
    dest_folder = os.path.abspath(os.path.expanduser(dest_folder))
    processed_folder = os.path.join(source_folder, "processed")
    failed_log = os.path.join(dest_folder, "_failed_files.txt")

    if not os.path.exists(source_folder):
        print(f"❌ Error: Source folder does not exist: {source_folder}")
        return 1

    ensure_folder_exists(dest_folder)
    ensure_folder_exists(processed_folder)

    print("🔍 Scanning for PDF files...")
    pdf_files = find_pdf_files_with_sizes(source_folder, processed_folder)

    if not pdf_files:
        print("✓ All files already processed!")
        return 0

    gs_command = find_ghostscript_command()
    if not gs_command:
        print("❌ Error: Ghostscript not found!")
        print("\nInstall Ghostscript from: https://ghostscript.com/releases/gsdnld.html")
        return 1

    # Count previously processed files for the status banner.
    processed_count = len([f for f in os.listdir(processed_folder) if f.lower().endswith('.pdf')]) if os.path.exists(processed_folder) else 0

    # Determine worker count; Windows caps ProcessPoolExecutor at 61 workers.
    cpu_cores = cpu_count()
    if max_workers is None:
        max_workers = min(MAX_WINDOWS_WORKERS, int(cpu_cores * 0.75)) if platform.system() == "Windows" else int(cpu_cores * 0.85)
    else:
        if platform.system() == "Windows" and max_workers > MAX_WINDOWS_WORKERS:
            print(f"⚠️ Adjusting workers from {max_workers} to {MAX_WINDOWS_WORKERS}")
            max_workers = MAX_WINDOWS_WORKERS

    # BUG FIX: with max_workers == 1 the old expression divided by
    # (max_workers // 2) == 0 and raised ZeroDivisionError.
    threads_per_pdf = max(2, min(8, cpu_cores // max(1, max_workers // 2)))

    # Analyze file sizes for the banner.
    total_size = sum(s for _, s in pdf_files)
    max_file_size = max(s for _, s in pdf_files)
    max_timeout = calculate_timeout(max_file_size, seconds_per_mb)

    # Print header
    print("\n" + "=" * 80)
    print(" 🎯 SMART PDF COMPRESSOR V2 - BEST VERSION WINS")
    print(" Only keeps compressed file if it's actually smaller!")
    print("=" * 80)
    print(f"\n🖥️ CPU: {cpu_cores} logical cores")
    print(f"🔧 Workers: {max_workers} parallel processes")
    print(f"🧵 Threads/PDF: {threads_per_pdf} per file")
    print(f"⏱️ Timeout: {seconds_per_mb}s per MB (min: 60s, max: 1800s)")
    print(f"📦 Ghostscript: {gs_command}")

    print(f"\n📁 Folders:")
    print(f" Source: {source_folder}")
    print(f" Destination: {dest_folder}")
    print(f" Processed: {processed_folder}")

    if processed_count > 0:
        print(f"\n✓ Already processed: {processed_count} files")

    print(f"\n📄 Files to process: {len(pdf_files)} PDFs")
    print(f" Total size: {total_size:.1f} MB ({total_size/1024:.1f} GB)")
    print(f" Largest file: {max_file_size:.1f} MB (timeout: {max_timeout}s = {max_timeout/60:.1f} min)")

    print(f"\n📋 Smart Strategy:")
    print(f" 1. Try to compress each PDF")
    print(f" 2. Compare: compressed vs original")
    print(f" 3. Keep whichever is SMALLER")
    print(f" 4. Move source to 'processed' folder")
    print(f" 5. Result: Always get the best version!")

    print("\n" + "=" * 80)
    response = input("👉 Continue? (yes/no): ").lower().strip()
    if response not in ['yes', 'y']:
        print("❌ Cancelled")
        return 1

    print("\n🚀 Starting smart compression...\n")

    # Prepare one task tuple per file for the worker pool.
    tasks = []
    for pdf_file, _ in pdf_files:
        input_path = os.path.join(source_folder, pdf_file)
        output_path = os.path.join(dest_folder, pdf_file)
        processed_path = os.path.join(processed_folder, pdf_file)
        tasks.append((input_path, output_path, processed_path, gs_command, threads_per_pdf, seconds_per_mb))

    monitor = ProgressMonitor(len(pdf_files))
    failed_files = []

    start_time = time.time()

    try:
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            future_to_file = {executor.submit(compress_pdf_smart, task): task for task in tasks}

            for future in as_completed(future_to_file):
                (filename, success, original_size, final_size, compressed_smaller,
                 error_msg, duration, was_moved, timeout_used) = future.result()

                monitor.update(success, original_size, final_size, compressed_smaller, was_moved)
                stats = monitor.get_stats()

                if success:
                    if compressed_smaller:
                        reduction = ((original_size - final_size) / original_size) * 100
                        indicator = "✓ COMPRESSED"
                        print(f"✓ [{stats['completed']}/{len(pdf_files)}] {filename[:45]}")
                        print(f" {original_size:.1f}MB → {final_size:.1f}MB ({reduction:.1f}%↓) [{duration:.1f}s] {indicator}")
                    else:
                        if original_size == final_size:
                            indicator = "= ORIGINAL (better)"
                        else:
                            indicator = "= ORIGINAL (compression failed)"
                        print(f"✓ [{stats['completed']}/{len(pdf_files)}] {filename[:45]}")
                        print(f" {original_size:.1f}MB (kept original) [{duration:.1f}s] {indicator}")
                        if error_msg:
                            print(f" Note: {error_msg}")
                else:
                    print(f"✗ [{stats['completed']}/{len(pdf_files)}] {filename[:45]}")
                    print(f" FAILED ({original_size:.1f}MB): {error_msg} [{duration:.1f}s]")
                    # BUG FIX: previously logged the literal "(unknown)"
                    # instead of the failing file's name.
                    failed_files.append(f"{filename} ({original_size:.1f}MB) - {error_msg}")

                # Print a progress digest every 50 files.
                if stats['completed'] % 50 == 0:
                    compression_rate = (stats['compressed_better'] / stats['successful'] * 100) if stats['successful'] > 0 else 0
                    print(f"\n 📊 Progress:")
                    print(f" Rate: {stats['rate']:.2f} files/sec | ETA: {timedelta(seconds=int(stats['eta']))}")
                    print(f" Compressed better: {stats['compressed_better']} ({compression_rate:.1f}%)")
                    print(f" Original better: {stats['original_better']}")
                    print(f" Failed: {stats['failed']}")
                    if stats['total_original'] > 0:
                        saved_gb = (stats['total_original'] - stats['total_final']) / 1024
                        reduction_pct = ((stats['total_original'] - stats['total_final']) / stats['total_original']) * 100
                        print(f" Total saved: {saved_gb:.2f} GB ({reduction_pct:.1f}% reduction)")
                    print()

    except KeyboardInterrupt:
        # Fall through to the summary with whatever finished so far.
        print("\n\n⚠️ Interrupted by user")

    # Save failed files log
    if failed_files:
        with open(failed_log, 'w', encoding='utf-8') as f:
            f.write(f"Failed Files Report - {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"Total Failed: {len(failed_files)}\n")
            f.write("=" * 80 + "\n\n")
            for line in failed_files:
                f.write(line + "\n")
        print(f"\n📝 Failed files logged to: {failed_log}")

    # Final Summary
    elapsed_time = time.time() - start_time
    stats = monitor.get_stats()

    print("\n" + "=" * 80)
    print("🏁 FINAL SUMMARY")
    print("=" * 80)
    print(f"⏱️ Total Time: {timedelta(seconds=int(elapsed_time))}")
    print(f"⚡ Average Rate: {stats['completed'] / elapsed_time:.2f} files/sec")
    print(f"\n📊 Results:")
    print(f" Processed: {stats['completed']} files")
    print(f" ✓ Successful: {stats['successful']}")
    print(f" ✗ Failed: {stats['failed']}")

    if stats['successful'] > 0:
        compression_success_rate = (stats['compressed_better'] / stats['successful'] * 100)
        print(f"\n🎯 Compression Results:")
        print(f" Compressed better: {stats['compressed_better']} files ({compression_success_rate:.1f}%)")
        print(f" Original better: {stats['original_better']} files ({100-compression_success_rate:.1f}%)")

    if stats['total_original'] > 0:
        print(f"\n💾 Storage:")
        print(f" Original size: {stats['total_original']/1024:.2f} GB")
        print(f" Final size: {stats['total_final']/1024:.2f} GB")
        overall = ((stats['total_original'] - stats['total_final']) / stats['total_original']) * 100
        saved = (stats['total_original'] - stats['total_final']) / 1024
        print(f" Reduction: {overall:.1f}%")
        print(f" 💰 Saved: {saved:.2f} GB")

    print(f"\n📁 Locations:")
    print(f" Final files: {dest_folder}")
    print(f" Processed: {processed_folder}")
    print(f" Remaining: {source_folder}")

    if stats['failed'] > 0:
        print(f"\n⚠️ {stats['failed']} files failed - see {failed_log}")

    print("=" * 80)

    return 0 if stats['failed'] == 0 else 1
|
|
|
|
def main():
    """CLI entry point: parse sys.argv and launch the batch run.

    Positional args: <source> <destination>, optional integer worker count,
    optional `--timeout-per-mb SECONDS`.  Returns an exit code.
    """
    if len(sys.argv) < 3:
        cores = cpu_count()
        if platform.system() == "Windows":
            recommended = min(MAX_WINDOWS_WORKERS, max(1, int(cores * 0.75)))
        else:
            recommended = int(cores * 0.85)

        print("🎯 SMART PDF Compressor V2 - Best Version Wins!")
        print(f" Detected: {cores} cores | Recommended: {recommended} workers")
        print("\nUsage:")
        print(f" python {sys.argv[0]} <source> <destination> [workers] [--timeout-per-mb SECONDS]")
        print("\nExamples:")
        print(r" python batch_compress_smart_v2.py D:\C D:\D 60")
        print(r" python batch_compress_smart_v2.py D:\C D:\D 60 --timeout-per-mb 10")
        print("\nKey Feature:")
        print(" • Tries to compress each PDF")
        print(" • Compares compressed vs original")
        print(" • Always keeps the SMALLER version")
        print(" • No more files getting bigger!")
        return 1

    source, destination = sys.argv[1], sys.argv[2]

    # Defaults; overridden by the optional trailing arguments.
    max_workers = None
    seconds_per_mb = 6

    idx = 3
    while idx < len(sys.argv):
        token = sys.argv[idx]
        if token == '--timeout-per-mb' and idx + 1 < len(sys.argv):
            try:
                seconds_per_mb = int(sys.argv[idx + 1])
            except ValueError:
                print(f"Warning: Invalid timeout-per-mb, using default 6")
            idx += 2
        else:
            # Any other token is treated as the worker count if numeric.
            try:
                max_workers = int(token)
            except ValueError:
                print(f"Warning: Unknown argument '{token}', ignoring")
            idx += 1

    return batch_compress_smart_v2(source, destination, max_workers, seconds_per_mb)
|
|
|
|
if __name__ == '__main__':
    # Script entry point: the process exit code mirrors main()'s return value.
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        # Ctrl-C outside main()'s own handling lands here.
        print("\n\n❌ Cancelled by user")
        sys.exit(1)
    except Exception as e:
        # Last-resort handler: report the error and dump the traceback so
        # unexpected failures are diagnosable from the console.
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)