import os import logging import httpx from pathlib import Path from telegram import Update, BotCommand from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, MessageHandler, filters # Configuration from Environment TOKEN = os.getenv('TELEGRAM_TOKEN') BASE_DIR = Path(os.getenv('BASE_DIR', '/data')) TEMP_DIR = BASE_DIR / ".temp" DEFAULT_SUBDIR = os.getenv('DEFAULT_SUBDIR', 'books') USER_IDS_STR = os.getenv('USER_IDS', '0') ALLOWED_USER_IDS = [int(uid.strip()) for uid in USER_IDS_STR.split(',') if uid.strip().isdigit()] # Komga API Configuration KOMGA_URL = os.getenv('KOMGA_URL', 'http://komga:8080').rstrip('/') KOMGA_USER = os.getenv('KOMGA_USER') KOMGA_PASS = os.getenv('KOMGA_PASS') logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO) # Ensure required directories exist TEMP_DIR.mkdir(parents=True, exist_ok=True) (BASE_DIR / DEFAULT_SUBDIR).mkdir(parents=True, exist_ok=True) async def trigger_komga_scan(): """Triggers a scan for all libraries in Komga.""" if not (KOMGA_USER and KOMGA_PASS): return None try: async with httpx.AsyncClient(auth=(KOMGA_USER, KOMGA_PASS), timeout=10.0) as client: # First, get all libraries to find their IDs libs_res = await client.get(f"{KOMGA_URL}/api/v1/libraries") libs_res.raise_for_status() libraries = libs_res.json() for lib in libraries: lib_id = lib.get('id') if lib_id: scan_res = await client.post(f"{KOMGA_URL}/api/v1/libraries/{lib_id}/scan") scan_res.raise_for_status() return True except Exception as e: logging.error(f"Komga Scan Failed: {e}") return False async def post_init(application): """Sets up the bot menu button commands.""" commands = [ BotCommand("start", "Show help and status"), BotCommand("rescan", "Manually trigger Komga library scan") ] await application.bot.set_my_commands(commands) def is_authorized(update: Update) -> bool: """Checks if the user is in the whitelisted USER_IDS.""" return update.effective_user.id in ALLOWED_USER_IDS async def unauthorized_response(update: Update): """Sends a red cross to unauthorized users with a subtle ID tag.""" await update.message.reply_text(f"āŒ Access Denied. \n`id: {update.effective_user.id}`", parse_mode="Markdown") async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): """Shows the help menu.""" if not is_authorized(update): return await unauthorized_response(update) scan_status = "Enabled" if (KOMGA_USER and KOMGA_PASS) else "Disabled (Missing Credentials)" help_text = ( "šŸ‘‹ **Komga Manager Bot**\n\n" "Send me a **document** to save it to your library.\n\n" f"• **Base Directory:** `/data`\n" f"• **Default Subdirectory:** `/{DEFAULT_SUBDIR}`\n" f"• **Auto-Scan:** `{scan_status}`\n\n" "**Commands:**\n" "• `/rescan`: Manually trigger a Komga library scan" ) await update.message.reply_text(help_text, parse_mode="Markdown") async def manual_rescan(update: Update, context: ContextTypes.DEFAULT_TYPE): """Manually triggers a scan via command.""" if not is_authorized(update): return await unauthorized_response(update) msg = await update.message.reply_text("šŸ”„ Triggering Komga scan...") scan_triggered = await trigger_komga_scan() if scan_triggered: await msg.edit_text("āœ… Komga scan triggered successfully.") elif scan_triggered is None: await msg.edit_text("āŒ Scan disabled: Komga credentials not set in environment.") else: await msg.edit_text("āŒ Failed to trigger Komga scan. Check logs.") async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE): """Processes document attachments with atomic move and API scan.""" if not is_authorized(update): return await unauthorized_response(update) if not update.message.document: return await start(update, context) target_dir = (BASE_DIR / DEFAULT_SUBDIR).resolve() file_name = update.message.document.file_name counter = 1 original_stem = Path(file_name).stem suffix = Path(file_name).suffix final_path = target_dir / file_name while final_path.exists(): file_name = f"{original_stem}_{counter}{suffix}" final_path = target_dir / file_name counter += 1 if counter > 1: await update.message.reply_text(f"āš ļø Renamed to: `{file_name}`") msg = await update.message.reply_text("ā³ Downloading...") temp_path = TEMP_DIR / f"{update.message.document.file_id}{suffix}" try: new_file = await context.bot.get_file(update.message.document.file_id) await new_file.download_to_drive(custom_path=temp_path) os.replace(temp_path, final_path) status_msg = f"āœ… Saved to: `/data/{final_path.relative_to(BASE_DIR)}`" # Trigger Scan scan_triggered = await trigger_komga_scan() if scan_triggered: status_msg += "\nšŸ”„ Komga scan triggered." elif scan_triggered is False: status_msg += "\nāš ļø File saved, but Komga scan failed." await msg.edit_text(status_msg) except Exception as e: if temp_path.exists(): os.remove(temp_path) logging.error(f"Error handling document: {e}") await msg.edit_text("āŒ Error saving file.") if __name__ == '__main__': if not TOKEN: print("Error: TELEGRAM_TOKEN not set.") exit(1) app = ApplicationBuilder().token(TOKEN).post_init(post_init).build() app.add_handler(CommandHandler("start", start)) app.add_handler(CommandHandler("rescan", manual_rescan)) app.add_handler(MessageHandler(filters.Document.ALL, handle_document)) app.add_handler(MessageHandler(~filters.COMMAND & ~filters.Document.ALL, start)) print("Komga Bot started...") app.run_polling()