#!/usr/bin/env python3
"""Upload a document to a Paperless server and tag it.

Each tag name given on the command line is resolved to a Paperless tag ID
(the tag is created when it does not exist yet), the correspondent is always
set to "KHCN", and the file is then posted to the document upload endpoint.

Usage:
    python3 upload.py /path/to/file PAPERLESS_TOKEN "TITLE" "TAG1||TAG2||TAG3"
"""
# Provenance: introduced by commit 9bcf4a6b51272edeb5f5a3c6672736e8859afb3b
# ("Add upload.py", thanhtl, 2025-02-28).

import os
import sys

import requests

# Optional: caches for tag name -> ID and correspondent name -> ID
TAG_CACHE = {}
CORRESPONDENT_CACHE = {}

BASE_URL = "http://10.1.135.227:8777"  # Adjust if needed

# (connect, read) timeouts in seconds. Without a timeout `requests` waits
# forever on an unresponsive server, which would hang the calling script.
REQUEST_TIMEOUT = (10, 120)

# Every uploaded document is assigned to this correspondent.
CORRESPONDENT_NAME = "KHCN"


def _auth_headers(token):
    """Return the HTTP headers carrying the API token."""
    return {"Authorization": f"Token {token}"}


def _get_or_create_id(endpoint, name, token, cache):
    """Return the ID of the object called `name` under /api/<endpoint>/.

    The object is looked up by name (case-insensitively) and created with a
    POST when no match is found. Resolved IDs are stored in `cache`, keyed by
    the exact `name` passed in, so repeated calls skip the network.

    Raises:
        requests.HTTPError: if the lookup or the creation request fails.
    """
    if name in cache:
        return cache[name]

    url = f"{BASE_URL}/api/{endpoint}/"
    headers = _auth_headers(token)

    # 1) Look for an existing object with this name.
    # NOTE(review): `name__iexact` is used instead of a bare `name` parameter,
    # which Paperless-ngx is believed not to define as a filter. If it is
    # ignored, only the first page of *all* objects comes back and existing
    # entries can be missed. Confirm against the server's API docs.
    response = requests.get(
        url,
        headers=headers,
        params={"name__iexact": name},
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()

    # Re-check the name locally so a server that ignores the filter cannot
    # hand us an unrelated object.
    wanted = name.lower()
    object_id = next(
        (
            item["id"]
            for item in response.json().get("results", [])
            if "name" in item and item["name"].lower() == wanted
        ),
        None,
    )

    if object_id is None:
        # 2) Nothing found: create it.
        create_resp = requests.post(
            url,
            headers=headers,
            json={"name": name},
            timeout=REQUEST_TIMEOUT,
        )
        create_resp.raise_for_status()
        object_id = create_resp.json()["id"]

    cache[name] = object_id
    return object_id


def get_or_create_tag_id(tag_name, token):
    """Return the ID of the Paperless tag named `tag_name`.

    Creates the tag if it does not exist. Results are cached in TAG_CACHE.

    Raises:
        requests.HTTPError: if Paperless rejects the lookup or the creation.
    """
    return _get_or_create_id("tags", tag_name, token, TAG_CACHE)


def get_or_create_correspondent_id(corr_name, token):
    """Return the ID of the Paperless correspondent named `corr_name`.

    Creates the correspondent if it does not exist. Results are cached in
    CORRESPONDENT_CACHE.

    Raises:
        requests.HTTPError: if Paperless rejects the lookup or the creation.
    """
    return _get_or_create_id(
        "correspondents", corr_name, token, CORRESPONDENT_CACHE
    )


def upload_file_to_paperless(file_path, token, doc_title, devon_tags):
    """Upload `file_path` to Paperless with a title, tags and correspondent.

    1) Resolve each DEVONthink tag to a Paperless tag ID (creating it if
       necessary); blank or whitespace-only tags are skipped.
    2) Get/create the correspondent "KHCN".
    3) POST the file as multipart/form-data together with the title, the
       correspondent ID and one `tags` field per tag ID.

    Args:
        file_path: Path of the file to upload.
        token: Paperless API token.
        doc_title: Title to give the document.
        devon_tags: Iterable of tag names.

    Returns:
        The decoded JSON body of the upload response, or the raw response
        text if the body is not valid JSON.

    Raises:
        FileNotFoundError: if `file_path` is not an existing file.
        requests.HTTPError: if any request returns an error status.
    """
    # Fail before touching the server, so a bad path does not leave freshly
    # created tags/correspondents behind.
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"No such file: {file_path}")

    tag_names = [t.strip() for t in devon_tags if t.strip()]
    tag_ids = [get_or_create_tag_id(name, token) for name in tag_names]

    # Always set this document's correspondent to CORRESPONDENT_NAME.
    correspondent_id = get_or_create_correspondent_id(CORRESPONDENT_NAME, token)

    # A list of pairs (not a dict) so the "tags" field can be repeated.
    form_data = [
        ("title", doc_title),
        ("correspondent", str(correspondent_id)),
    ]
    form_data.extend(("tags", str(tag_id)) for tag_id in tag_ids)

    with open(file_path, "rb") as f:
        resp = requests.post(
            f"{BASE_URL}/api/documents/post_document/",
            headers=_auth_headers(token),
            files={"document": (os.path.basename(file_path), f)},
            data=form_data,
            timeout=REQUEST_TIMEOUT,
        )

    resp.raise_for_status()
    try:
        return resp.json()
    except ValueError:
        # The upload succeeded; do not fail just because the body is not JSON.
        return resp.text


def main():
    """Command-line entry point.

    Usage:
        python3 upload.py /path/to/file PAPERLESS_TOKEN "TITLE" "TAG1||TAG2||TAG3"

    Exits with status 1 on bad usage or when the upload fails.
    """
    if len(sys.argv) < 5:
        print("Usage: python3 upload.py /path/to/file TOKEN \"TITLE\" \"TAG1||TAG2||...\"")
        sys.exit(1)

    file_path = sys.argv[1]
    token = sys.argv[2]
    doc_title = sys.argv[3]
    raw_tags = sys.argv[4]

    # Split the passed tag string on '||' to get the list of tags
    devon_tags = raw_tags.split("||") if raw_tags else []

    try:
        result = upload_file_to_paperless(file_path, token, doc_title, devon_tags)
        print("Upload started. Paperless returned:\n", result)
    except Exception as e:
        # Top-level boundary: report any failure and signal it via exit code.
        print("Error uploading document:", e)
        sys.exit(1)


if __name__ == "__main__":
    main()