diff --git a/SYS/config.py b/SYS/config.py index c08967b..93ab9cd 100644 --- a/SYS/config.py +++ b/SYS/config.py @@ -486,53 +486,7 @@ def load_config() -> Dict[str, Any]: ) log(summary) - # Try to detect if the most recent audit indicates we previously saved items - # that are no longer present in the loaded config (possible overwrite/restore) - try: - audit_path = Path(db.db_path).with_name("config_audit.log") - if audit_path.exists(): - last_line = None - with audit_path.open("r", encoding="utf-8") as fh: - for line in fh: - if line and line.strip(): - last_line = line - if last_line: - try: - last_entry = json.loads(last_line) - last_provs = set(last_entry.get("providers") or []) - current_provs = set(provs) - missing = sorted(list(last_provs - current_provs)) - if missing: - log( - f"WARNING: Config mismatch on load - last saved providers {sorted(list(last_provs))} " - f"are missing from loaded config: {missing} (last saved {last_entry.get('dt')})" - ) - try: - # Write a forensic mismatch record to help diagnose potential overwrites - mismatch_path = Path(db.db_path).with_name("config_mismatch.log") - record = { - "detected": datetime.datetime.utcnow().isoformat() + "Z", - "db": str(db.db_path), - "db_mtime": mtime, - "last_saved_dt": last_entry.get("dt"), - "last_saved_providers": sorted(list(last_provs)), - "missing": missing, - } - try: - backup_dir = Path(db.db_path).with_name("config_backups") - if backup_dir.exists(): - files = sorted(backup_dir.glob("medios-backup-*.db"), key=lambda p: p.stat().st_mtime, reverse=True) - record["latest_backup"] = str(files[0]) if files else None - except Exception: - pass - with mismatch_path.open("a", encoding="utf-8") as fh: - fh.write(json.dumps(record) + "\n") - except Exception: - pass - except Exception: - pass - except Exception: - pass + # Forensics disabled: audit/mismatch/backup detection removed to simplify code. except Exception: pass return db_config @@ -722,120 +676,13 @@ def save_config(config: Dict[str, Any]) -> int: except Exception as exc: log(f"Warning: WAL checkpoint failed: {exc}") - # Audit to disk so we can correlate saves across restarts and processes. - - # Audit to disk so we can correlate saves across restarts and processes. + # Forensics disabled: audit/logs/backups removed to keep save lean. + # Release the save lock we acquired earlier try: - audit_path = Path(db.db_path).with_name("config_audit.log") - - # Gather non-secret summary info (provider/store names) - provider_names = [] - store_names = [] - try: - pblock = config.get("provider") - if isinstance(pblock, dict): - provider_names = [str(k) for k in pblock.keys()] - except Exception: - provider_names = [] - try: - sblock = config.get("store") - if isinstance(sblock, dict): - store_names = [str(k) for k in sblock.keys()] - except Exception: - store_names = [] - - stack = traceback.format_stack() - caller = stack[-1].strip() if stack else "" - - # Try to include the database file modification time for correlation - db_mtime = None - try: - db_mtime = datetime.datetime.utcfromtimestamp(db.db_path.stat().st_mtime).isoformat() + "Z" - except Exception: - db_mtime = None - - # Create a consistent timestamped backup of the DB so we can recover later - backup_path = None - try: - backup_dir = Path(db.db_path).with_name("config_backups") - backup_dir.mkdir(parents=False, exist_ok=True) - ts = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") - candidate = backup_dir / f"medios-backup-{ts}.db" - try: - # Use sqlite backup API for a consistent copy - src_con = sqlite3.connect(str(db.db_path)) - dest_con = sqlite3.connect(str(candidate)) - src_con.backup(dest_con) - dest_con.close() - src_con.close() - backup_path = str(candidate) - except Exception as e: - log(f"Warning: Failed to create DB backup: {e}") - - # Prune older backups (keep last 20) - try: - files = sorted(backup_dir.glob("medios-backup-*.db"), key=lambda p: p.stat().st_mtime, reverse=True) - for old in files[20:]: - try: - old.unlink() - except Exception: - pass - except Exception: - pass - except Exception: - backup_path = None - - # Collect process/exec info and a short hash of the config for forensic tracing - try: - exe = sys.executable - argv = list(sys.argv) - cwd = os.getcwd() - user = getpass.getuser() - try: - cfg_hash = hashlib.md5(json.dumps(config, sort_keys=True).encode('utf-8')).hexdigest() - except Exception: - cfg_hash = None - except Exception: - exe = None - argv = None - cwd = None - user = None - cfg_hash = None - - entry = { - "ts": time.time(), - "dt": datetime.datetime.utcnow().isoformat() + "Z", - "pid": os.getpid(), - "exe": exe, - "argv": argv, - "cwd": cwd, - "user": user, - "stack": "".join(stack[-20:]), - "caller": caller, - "config_hash": cfg_hash, - "saved_entries": saved_entries, - "changed_count": changed_count, - "db": str(db.db_path), - "db_mtime": db_mtime, - "backup": backup_path, - "providers": provider_names, - "stores": store_names, - } - try: - with audit_path.open("a", encoding="utf-8") as fh: - fh.write(json.dumps(entry) + "\n") - except Exception: - # Best-effort; don't fail the save if audit write fails - log("Warning: Failed to write config audit file") + if lock_dir is not None and lock_dir.exists(): + _release_save_lock(lock_dir) except Exception: pass - finally: - # Release the save lock we acquired earlier - try: - if lock_dir is not None and lock_dir.exists(): - _release_save_lock(lock_dir) - except Exception: - pass break except sqlite3.OperationalError as exc: diff --git a/SYS/database.py b/SYS/database.py index 34a73b0..ea5105c 100644 --- a/SYS/database.py +++ b/SYS/database.py @@ -10,7 +10,7 @@ from typing import Any, Dict, List, Optional from contextlib import contextmanager import time import datetime -from SYS.logger import log +from SYS.logger import debug, log # DB execute retry settings (for transient 'database is locked' errors) _DB_EXEC_RETRY_MAX = 5 @@ -61,9 +61,9 @@ class Database: self.db_path = DB_PATH db_existed = self.db_path.exists() if db_existed: - log(f"Opening existing medios.db at {self.db_path}") + debug(f"Opening existing medios.db at {self.db_path}") else: - log(f"Creating medios.db at {self.db_path}") + debug(f"Creating medios.db at {self.db_path}") self.conn = sqlite3.connect( str(self.db_path), diff --git a/TUI/modalscreen/config_modal.py b/TUI/modalscreen/config_modal.py index d156942..173a718 100644 --- a/TUI/modalscreen/config_modal.py +++ b/TUI/modalscreen/config_modal.py @@ -154,6 +154,7 @@ class ConfigModal(ModalScreen): yield ListItem(Label("Global Settings"), id="cat-globals") yield ListItem(Label("Stores"), id="cat-stores") yield ListItem(Label("Providers"), id="cat-providers") + yield ListItem(Label("Tools"), id="cat-tools") with Vertical(id="config-content"): yield ScrollableContainer(id="fields-container") @@ -161,14 +162,17 @@ class ConfigModal(ModalScreen): yield Button("Save", variant="success", id="save-btn") yield Button("Add Store", variant="primary", id="add-store-btn") yield Button("Add Provider", variant="primary", id="add-provider-btn") + yield Button("Add Tool", variant="primary", id="add-tool-btn") yield Button("Back", id="back-btn") - yield Button("Restore Backup", id="restore-backup-btn") - yield Button("Copy DB Path", id="copy-db-btn") yield Button("Close", variant="error", id="cancel-btn") def on_mount(self) -> None: self.query_one("#add-store-btn", Button).display = False self.query_one("#add-provider-btn", Button).display = False + try: + self.query_one("#add-tool-btn", Button).display = False + except Exception: + pass # Update DB path and last-saved on mount try: self.query_one("#config-db-path", Static).update(self._db_path) @@ -213,6 +217,7 @@ class ConfigModal(ModalScreen): try: self.query_one("#add-store-btn", Button).display = (self.current_category == "stores" and self.editing_item_name is None) self.query_one("#add-provider-btn", Button).display = (self.current_category == "providers" and self.editing_item_name is None) + self.query_one("#add-tool-btn", Button).display = (self.current_category == "tools" and self.editing_item_name is None) self.query_one("#back-btn", Button).display = (self.editing_item_name is not None) self.query_one("#save-btn", Button).display = (self.editing_item_name is not None or self.current_category == "globals") except Exception: @@ -238,6 +243,8 @@ class ConfigModal(ModalScreen): self.render_stores(container) elif self.current_category == "providers": self.render_providers(container) + elif self.current_category == "tools": + self.render_tools(container) self.call_after_refresh(do_mount) @@ -354,6 +361,26 @@ class ConfigModal(ModalScreen): ) container.mount(row) + def render_tools(self, container: ScrollableContainer) -> None: + container.mount(Label("Configured Tools", classes="config-label")) + tools = self.config_data.get("tool", {}) + if not tools: + container.mount(Static("No tools configured.")) + else: + for i, (name, _) in enumerate(tools.items()): + edit_id = f"edit-tool-{i}" + del_id = f"del-tool-{i}" + self._button_id_map[edit_id] = ("edit", "tool", name) + self._button_id_map[del_id] = ("del", "tool", name) + + row = Horizontal( + Static(name, classes="item-label"), + Button("Edit", id=edit_id), + Button("Delete", variant="error", id=del_id), + classes="item-row" + ) + container.mount(row) + def render_item_editor(self, container: ScrollableContainer) -> None: item_type = str(self.editing_item_type or "") item_name = str(self.editing_item_name or "") @@ -392,6 +419,18 @@ class ConfigModal(ModalScreen): provider_schema_map[k.upper()] = field_def except Exception: pass + # Fetch Tool schema + if item_type == "tool": + try: + import importlib + mod = importlib.import_module(f"tool.{item_name}") + if hasattr(mod, "config_schema") and callable(mod.config_schema): + for field_def in mod.config_schema(): + k = field_def.get("key") + if k: + provider_schema_map[k.upper()] = field_def + except Exception: + pass # Use columns for better layout of inputs with paste buttons container.mount(Label("Edit Settings")) @@ -569,6 +608,8 @@ class ConfigModal(ModalScreen): self.current_category = "stores" elif event.item.id == "cat-providers": self.current_category = "providers" + elif event.item.id == "cat-tools": + self.current_category = "tools" self.editing_item_name = None self.editing_item_type = None @@ -645,6 +686,10 @@ class ConfigModal(ModalScreen): removed = True if str(name).strip().lower() == "alldebrid": self._remove_alldebrid_store_entry() + elif itype == "tool": + if "tool" in self.config_data and name in self.config_data["tool"]: + del self.config_data["tool"][name] + removed = True if removed: try: saved = self.save_all() @@ -676,31 +721,33 @@ class ConfigModal(ModalScreen): except Exception: pass self.app.push_screen(SelectionModal("Select Provider Type", options), callback=self.on_provider_type_selected) + elif bid == "add-tool-btn": + # Discover tool modules that advertise a config_schema() + options = [] + try: + import pkgutil + import importlib + import tool as _tool_pkg + + for _mod in pkgutil.iter_modules(_tool_pkg.__path__): + try: + mod = importlib.import_module(f"tool.{_mod.name}") + if hasattr(mod, "config_schema") and callable(mod.config_schema): + options.append(_mod.name) + except Exception: + continue + except Exception: + # Fallback to known entry + options = ["ytdlp"] + + if options: + options.sort() + self.app.push_screen(SelectionModal("Select Tool", options), callback=self.on_tool_type_selected) elif bid == "matrix-test-btn": self._request_matrix_test() elif bid == "matrix-rooms-btn": self._open_matrix_room_picker() - elif bid == "restore-backup-btn": - try: - backup = self._get_last_backup_path() - if not backup: - self.notify("No backups available", severity="warning") - else: - # Ask for confirmation via a simple notification and perform restore - self.notify(f"Restoring {backup.name}...", timeout=2) - self._restore_backup_background(str(backup)) - except Exception as exc: - self.notify(f"Restore failed: {exc}", severity="error") - elif bid == "copy-db-btn": - try: - if hasattr(self.app, "copy_to_clipboard"): - self.app.copy_to_clipboard(str(db.db_path)) - self.notify("DB path copied to clipboard") - else: - # Fall back to a visible notification - self.notify(str(db.db_path)) - except Exception: - self.notify("Failed to copy DB path", severity="warning") + # Restore UI removed: backups/audit remain available programmatically elif bid.startswith("paste-"): # Programmatic paste button target_id = bid.replace("paste-", "") @@ -735,64 +782,7 @@ class ConfigModal(ModalScreen): else: self.notify("Clipboard not supported in this terminal", severity="warning") - def _get_last_backup_path(self): - try: - backup_dir = Path(db.db_path).with_name("config_backups") - if not backup_dir.exists(): - return None - files = sorted(backup_dir.glob("medios-backup-*.db"), key=lambda p: p.stat().st_mtime, reverse=True) - return files[0] if files else None - except Exception: - return None - - @work(thread=True) - def _restore_backup_background(self, backup_path: str) -> None: - try: - import sqlite3, json - cfg = {} - with sqlite3.connect(backup_path) as conn: - cur = conn.cursor() - cur.execute("SELECT category, subtype, item_name, key, value FROM config") - rows = cur.fetchall() - for cat, sub, name, key, val in rows: - try: - parsed = json.loads(val) - except Exception: - parsed = val - if cat == 'global': - cfg[key] = parsed - elif cat in ('provider', 'tool'): - cd = cfg.setdefault(cat, {}) - sd = cd.setdefault(sub, {}) - sd[key] = parsed - elif cat == 'store': - cd = cfg.setdefault('store', {}) - sd = cd.setdefault(sub, {}) - nd = sd.setdefault(name, {}) - nd[key] = parsed - else: - cfg.setdefault(cat, {})[key] = parsed - - # Persist restored config using save_config - from SYS.config import save_config, reload_config - saved = save_config(cfg) - # Reload and update UI from main thread - self.app.call_from_thread(self._on_restore_complete, True, backup_path, saved) - except Exception as exc: - self.app.call_from_thread(self._on_restore_complete, False, backup_path, str(exc)) - - def _on_restore_complete(self, success: bool, backup_path: str, saved_or_error): - if success: - # Refresh our in-memory view and UI - try: - from SYS.config import reload_config - self.config_data = reload_config() - self.refresh_view() - except Exception: - pass - self.notify(f"Restore complete: re-saved {saved_or_error} entries from {Path(backup_path).name}") - else: - self.notify(f"Restore failed: {saved_or_error}", severity="error") + # Backup/restore helpers removed: forensics/audit mode disabled and restore UI removed. def on_store_type_selected(self, stype: str) -> None: if not stype: @@ -878,6 +868,31 @@ class ConfigModal(ModalScreen): self.editing_item_name = ptype self.refresh_view() + def on_tool_type_selected(self, tname: str) -> None: + if not tname: + return + self._capture_editor_snapshot() + if "tool" not in self.config_data: + self.config_data["tool"] = {} + + if tname not in self.config_data["tool"]: + new_config = {} + try: + import importlib + mod = importlib.import_module(f"tool.{tname}") + if hasattr(mod, "config_schema") and callable(mod.config_schema): + for field_def in mod.config_schema(): + key = field_def.get("key") + if key: + new_config[key] = field_def.get("default", "") + except Exception: + pass + self.config_data["tool"][tname] = new_config + + self.editing_item_type = "tool" + self.editing_item_name = tname + self.refresh_view() + def _update_config_value(self, widget_id: str, value: Any) -> None: if widget_id not in self._input_id_map: return @@ -1282,6 +1297,19 @@ class ConfigModal(ModalScreen): except Exception: pass section = self.config_data.get("provider", {}).get(item_name, {}) + elif item_type == "tool": + try: + import importlib + mod = importlib.import_module(f"tool.{item_name}") + if hasattr(mod, "config_schema") and callable(mod.config_schema): + for field_def in mod.config_schema(): + if field_def.get("required"): + k = field_def.get("key") + if k and k not in required_keys: + required_keys.append(k) + except Exception: + pass + section = self.config_data.get("tool", {}).get(item_name, {}) # Check required keys for rk in required_keys: diff --git a/cmdlet/download_file.py b/cmdlet/download_file.py index eba88be..4d873a2 100644 --- a/cmdlet/download_file.py +++ b/cmdlet/download_file.py @@ -1561,7 +1561,7 @@ class Download_File(Cmdlet): PipelineProgress(pipeline_context).step("downloading") debug(f"Starting download for {url} (format: {actual_format or 'default'}) with {download_timeout_seconds}s activity timeout...") - result_obj = _download_with_timeout(opts, timeout_seconds=download_timeout_seconds) + result_obj = _download_with_timeout(opts, timeout_seconds=download_timeout_seconds, config=config) debug(f"Download completed for {url}, building pipe object...") break except DownloadError as e: @@ -2318,7 +2318,7 @@ class Download_File(Cmdlet): ) try: - result_obj = _download_with_timeout(opts, timeout_seconds=300) + result_obj = _download_with_timeout(opts, timeout_seconds=300, config=config) except Exception as exc: log(f"[download-file] Download failed for {url_str}: {exc}", file=sys.stderr) return [] diff --git a/tool/playwright.py b/tool/playwright.py index c05cecf..729ed69 100644 --- a/tool/playwright.py +++ b/tool/playwright.py @@ -118,9 +118,10 @@ class PlaywrightTool: FFmpeg resolution (in order): 1. Config key: playwright.ffmpeg_path - 2. Environment variable: PLAYWRIGHT_FFMPEG_PATH - 3. Project bundled: MPV/ffmpeg/bin/ffmpeg[.exe] - 4. System PATH: which ffmpeg + 2. Environment variable: FFMPEG_PATH (global shared env) + 3. Environment variable: PLAYWRIGHT_FFMPEG_PATH (legacy) + 4. Project bundled: MPV/ffmpeg/bin/ffmpeg[.exe] + 5. System PATH: which ffmpeg """ def __init__(self, config: Optional[Dict[str, Any]] = None) -> None: @@ -158,7 +159,19 @@ class PlaywrightTool: headless_raw = _get("headless", defaults.headless) headless = bool(headless_raw) - ua = str(_get("user_agent", defaults.user_agent)) + # Resolve user_agent configuration. Support a 'custom' token which reads + # the actual UA from `user_agent_custom` so the UI can offer a simple + # dropdown without scattering long UA strings to users by default. + ua_raw = _get("user_agent", None) + if ua_raw is None: + ua = defaults.user_agent + else: + ua_s = str(ua_raw).strip() + if ua_s.lower() == "custom": + ua_custom = _get("user_agent_custom", "") + ua = str(ua_custom).strip() or defaults.user_agent + else: + ua = ua_s def _int(name: str, fallback: int) -> int: raw = _get(name, fallback) @@ -173,26 +186,35 @@ class PlaywrightTool: ignore_https = bool(_get("ignore_https_errors", defaults.ignore_https_errors)) - # Try to find ffmpeg: config override, environment variable, bundled, then system - # This checks if ffmpeg is actually available (not just the path to it) + # Try to find ffmpeg: config override, global env FFMPEG_PATH, legacy + # PLAYWRIGHT_FFMPEG_PATH, bundled, then system. This allows Playwright to + # use the shared ffmpeg path used by other tools (FFMPEG_PATH env). ffmpeg_path: Optional[str] = None config_ffmpeg = _get("ffmpeg_path", None) - + if config_ffmpeg: # User explicitly configured ffmpeg path candidate = str(config_ffmpeg).strip() - if Path(candidate).exists(): + if candidate and Path(candidate).exists(): ffmpeg_path = candidate else: debug(f"Configured ffmpeg path does not exist: {candidate}") - + if not ffmpeg_path: - # Check environment variable (supports project ffmpeg) - env_ffmpeg = os.environ.get("PLAYWRIGHT_FFMPEG_PATH") + # Prefer a global FFMPEG_PATH env var (shared by tools) before Playwright-specific one + env_ffmpeg = os.environ.get("FFMPEG_PATH") if env_ffmpeg and Path(env_ffmpeg).exists(): ffmpeg_path = env_ffmpeg elif env_ffmpeg: - debug(f"PLAYWRIGHT_FFMPEG_PATH set but path does not exist: {env_ffmpeg}") + debug(f"FFMPEG_PATH set but path does not exist: {env_ffmpeg}") + + if not ffmpeg_path: + # Backward-compatible Playwright-specific env var + env_ffmpeg2 = os.environ.get("PLAYWRIGHT_FFMPEG_PATH") + if env_ffmpeg2 and Path(env_ffmpeg2).exists(): + ffmpeg_path = env_ffmpeg2 + elif env_ffmpeg2: + debug(f"PLAYWRIGHT_FFMPEG_PATH set but path does not exist: {env_ffmpeg2}") if not ffmpeg_path: # Try to find bundled ffmpeg in the project (Windows-only, in MPV/ffmpeg/bin) @@ -229,6 +251,77 @@ class PlaywrightTool: ffmpeg_path=ffmpeg_path, ) + +def config_schema() -> List[Dict[str, Any]]: + """Return a schema describing editable Playwright tool defaults for the config UI. + + Notes: + - `user_agent` is a dropdown with a `custom` option; put the real UA in + `user_agent_custom` when choosing `custom`. + - Viewport dimensions are offered as convenient choices. + - `ffmpeg_path` intentionally defaults to empty; Playwright will consult + a global `FFMPEG_PATH` environment variable (or fallback to bundled/system). + """ + _defaults = PlaywrightDefaults() + + browser_choices = ["chromium", "firefox", "webkit"] + viewport_width_choices = [1920, 1366, 1280, 1024, 800] + viewport_height_choices = [1080, 900, 768, 720, 600] + + return [ + { + "key": "browser", + "label": "Playwright browser", + "default": _defaults.browser, + "choices": browser_choices, + }, + { + "key": "headless", + "label": "Headless", + "default": str(_defaults.headless), + "choices": ["true", "false"], + }, + { + "key": "user_agent", + "label": "User Agent", + "default": "default", + "choices": ["default", "native", "custom"], + }, + { + "key": "user_agent_custom", + "label": "Custom User Agent (used when User Agent = custom)", + "default": "", + }, + { + "key": "viewport_width", + "label": "Viewport width", + "default": _defaults.viewport_width, + "choices": viewport_width_choices, + }, + { + "key": "viewport_height", + "label": "Viewport height", + "default": _defaults.viewport_height, + "choices": viewport_height_choices, + }, + { + "key": "navigation_timeout_ms", + "label": "Navigation timeout (ms)", + "default": _defaults.navigation_timeout_ms, + }, + { + "key": "ignore_https_errors", + "label": "Ignore HTTPS errors", + "default": str(_defaults.ignore_https_errors), + "choices": ["true", "false"], + }, + { + "key": "ffmpeg_path", + "label": "FFmpeg path (leave empty to use global/bundled)", + "default": "", + }, + ] + def require(self) -> None: """Ensure Playwright is present; raise a helpful RuntimeError if not.""" try: diff --git a/tool/ytdlp.py b/tool/ytdlp.py index 1af0122..b54646f 100644 --- a/tool/ytdlp.py +++ b/tool/ytdlp.py @@ -220,11 +220,96 @@ def _has_browser_cookie_database() -> bool: return False -def _add_browser_cookies_if_available(options: Dict[str, Any]) -> None: +def _browser_cookie_path_for(browser_name: str) -> Optional[Path]: + """Return the cookie DB Path for a specific browser if present, else None. + + Supported browsers (case-insensitive): "chrome", "chromium", "brave". + """ + name = str(browser_name or "").strip().lower() + if not name: + return None + + try: + home = Path.home() + except Exception: + home = Path.cwd() + + # Windows + if os.name == "nt": + for env_value in (os.getenv("LOCALAPPDATA"), os.getenv("APPDATA")): + if not env_value: + continue + base = Path(env_value) + if name in ("chrome", "google-chrome"): + p = base / "Google" / "Chrome" / "User Data" / "Default" / "Cookies" + if p.is_file(): + return p + if name == "chromium": + p = base / "Chromium" / "User Data" / "Default" / "Cookies" + if p.is_file(): + return p + if name in ("brave", "brave-browser"): + p = base / "BraveSoftware" / "Brave-Browser" / "User Data" / "Default" / "Cookies" + if p.is_file(): + return p + + # *nix and macOS + if sys.platform == "darwin": + if name in ("chrome", "google-chrome"): + p = home / "Library" / "Application Support" / "Google" / "Chrome" / "Default" / "Cookies" + if p.is_file(): + return p + if name == "chromium": + p = home / "Library" / "Application Support" / "Chromium" / "Default" / "Cookies" + if p.is_file(): + return p + if name in ("brave", "brave-browser"): + p = home / "Library" / "Application Support" / "BraveSoftware" / "Brave-Browser" / "Default" / "Cookies" + if p.is_file(): + return p + + # Linux and other + if name in ("chrome", "google-chrome"): + p = home / ".config" / "google-chrome" / "Default" / "Cookies" + if p.is_file(): + return p + if name == "chromium": + p = home / ".config" / "chromium" / "Default" / "Cookies" + if p.is_file(): + return p + if name in ("brave", "brave-browser"): + p = home / ".config" / "BraveSoftware" / "Brave-Browser" / "Default" / "Cookies" + if p.is_file(): + return p + + return None + + +def _add_browser_cookies_if_available(options: Dict[str, Any], preferred_browser: Optional[str] = None) -> None: global _BROWSER_COOKIE_WARNING_EMITTED - if _has_browser_cookie_database(): - options["cookiesfrombrowser"] = ["chrome"] - return + + # If a preferred browser is specified, try to use it if available + if preferred_browser: + try: + if _browser_cookie_path_for(preferred_browser) is not None: + options["cookiesfrombrowser"] = [preferred_browser] + return + else: + if not _BROWSER_COOKIE_WARNING_EMITTED: + log(f"Requested browser cookie DB '{preferred_browser}' not found; falling back to autodetect.") + _BROWSER_COOKIE_WARNING_EMITTED = True + except Exception: + pass + + # Auto-detect in common order (chrome/chromium/brave) + for candidate in ("chrome", "chromium", "brave"): + try: + if _browser_cookie_path_for(candidate) is not None: + options["cookiesfrombrowser"] = [candidate] + return + except Exception: + continue + if not _BROWSER_COOKIE_WARNING_EMITTED: log( "Browser cookie extraction skipped because no Chrome-compatible cookie database was found. " @@ -624,17 +709,19 @@ class YtDlpDefaults: """User-tunable defaults for yt-dlp behavior. Recommended config.conf keys (top-level dotted keys): - - ytdlp.video_format="bestvideo+bestaudio/best" - - ytdlp.audio_format="251/140/bestaudio" + - format="best|1080|720|640|audio" - ytdlp.format_sort="res:2160,res:1440,res:1080,res:720,res" Cookies: - cookies="C:\\path\\cookies.txt" (already supported by config.resolve_cookies_path) + - cookies_from_browser="auto|none|chrome|brave|chromium" """ + format: str = "best" video_format: str = "bestvideo+bestaudio/best" audio_format: str = "251/140/bestaudio" format_sort: Optional[List[str]] = None + cookies_from_browser: Optional[str] = None class YtDlpTool: @@ -737,7 +824,29 @@ class YtDlpTool: ) fmt_sort = _parse_csv_list(fmt_sort_val) + # Cookie source preference: allow forcing a browser DB or 'auto'/'none' + cookies_pref = ( + tool_block.get("cookies_from_browser") + or tool_block.get("cookiesfrombrowser") + or ytdlp_block.get("cookies_from_browser") + or ytdlp_block.get("cookiesfrombrowser") + or cfg.get("ytdlp_cookies_from_browser") + or _get_nested(cfg, "ytdlp", "cookies_from_browser") + ) + + # Unified format preference: prefer explicit 'format' key but accept legacy keys + format_pref = ( + tool_block.get("format") + or tool_block.get("video_format") + or ytdlp_block.get("format") + or ytdlp_block.get("video_format") + or cfg.get("ytdlp_format") + or cfg.get("ytdlp_video_format") + or _get_nested(cfg, "ytdlp", "format") + ) + defaults = YtDlpDefaults( + format=str(format_pref).strip() if format_pref else "best", video_format=str( nested_video or video_format or _fallback_defaults.video_format ), @@ -745,18 +854,36 @@ class YtDlpTool: nested_audio or audio_format or _fallback_defaults.audio_format ), format_sort=fmt_sort, + cookies_from_browser=(str(cookies_pref).strip() if cookies_pref else None), ) return defaults + + def resolve_cookiefile(self) -> Optional[Path]: return self._cookiefile def default_format(self, mode: str) -> str: + """Determine the final yt-dlp format string. + + Priority: + - If caller explicitly requested audio mode (mode == 'audio'), return audio format. + - If configured default format is 'audio', return audio format. + - If configured default is 'best' or blank, return video_format. + - Otherwise return the configured format value (e.g., '720'). + """ m = str(mode or "").lower().strip() if m == "audio": return self.defaults.audio_format - return self.defaults.video_format + + cfg = (str(self.defaults.format or "")).strip() + lc = cfg.lower() + if lc == "audio": + return self.defaults.audio_format + if not cfg or lc == "best": + return self.defaults.video_format + return cfg def build_ytdlp_options(self, opts: DownloadOptions) -> Dict[str, Any]: """Translate DownloadOptions into yt-dlp API options.""" @@ -796,21 +923,69 @@ class YtDlpTool: if cookiefile is not None and cookiefile.is_file(): base_options["cookiefile"] = str(cookiefile) else: - # Add browser cookies support "just in case" if no file found (best effort) - # This uses yt-dlp's support for extracting from common browsers. - # Defaulting to 'chrome' as the most common path. - _add_browser_cookies_if_available(base_options) + # Respect configured browser cookie preference if provided; otherwise fall back to auto-detect. + pref = (self.defaults.cookies_from_browser or "").lower().strip() + if pref: + if pref in {"none", "off", "false"}: + # Explicitly disabled + pass + elif pref in {"auto", "detect"}: + _add_browser_cookies_if_available(base_options) + else: + # Try the preferred browser first; fall back to auto-detect if not present + _add_browser_cookies_if_available(base_options, preferred_browser=pref) + else: + # Add browser cookies support "just in case" if no file found (best effort) + _add_browser_cookies_if_available(base_options) - # Special handling for format keywords + # Special handling for format keywords explicitly passed in via options if opts.ytdl_format == "audio": - opts = opts._replace(mode="audio", ytdl_format=None) + try: + opts = opts._replace(mode="audio", ytdl_format=None) + except Exception: + try: + import dataclasses as _dc + + opts = _dc.replace(opts, mode="audio", ytdl_format=None) + except Exception: + pass elif opts.ytdl_format == "video": - opts = opts._replace(mode="video", ytdl_format=None) + try: + opts = opts._replace(mode="video", ytdl_format=None) + except Exception: + try: + import dataclasses as _dc + + opts = _dc.replace(opts, mode="video", ytdl_format=None) + except Exception: + pass if opts.no_playlist: base_options["noplaylist"] = True + # If no explicit format was provided, honor the configured default format ytdl_format = opts.ytdl_format + if not ytdl_format: + configured_format = (str(self.defaults.format or "")).strip() + if configured_format: + if configured_format.lower() == "audio": + # Default to audio-only downloads + try: + opts = opts._replace(mode="audio") + except Exception: + try: + import dataclasses as _dc + + opts = _dc.replace(opts, mode="audio") + except Exception: + pass + ytdl_format = None + else: + # Leave ytdl_format None so that default_format(opts.mode) + # returns the configured format literally (e.g., '720') and + # we don't auto-convert it to an internal selector. + pass + if ytdl_format and opts.mode != "audio": resolved = self.resolve_height_selector(ytdl_format) if resolved: @@ -958,6 +1133,45 @@ class YtDlpTool: pass +def config_schema() -> List[Dict[str, Any]]: + """Return a schema describing editable YT-DLP tool defaults for the config UI.""" + format_choices = [ + "best", + "1080", + "720", + "640", + "audio", + ] + + # Offer browser choices depending on what's present on the host system + browser_choices = ["auto", "none"] + for b in ("chrome", "chromium", "brave"): + try: + if _browser_cookie_path_for(b) is not None: + browser_choices.append(b) + except Exception: + continue + + return [ + { + "key": "format", + "label": "Default format", + "default": YtDlpDefaults.format, + "choices": format_choices, + }, + { + "key": "cookies", + "label": "Cookie file (path)", + "default": "", + }, + { + "key": "cookies_from_browser", + "label": "Browser cookie source (used if no cookie file)", + "default": "auto", + "choices": browser_choices, + }, + ] + # Progress + utility helpers for yt-dlp driven downloads (previously in cmdlet/download_media). _YTDLP_PROGRESS_BAR = ProgressBar() _YTDLP_PROGRESS_ACTIVITY_LOCK = threading.Lock() @@ -1483,8 +1697,12 @@ except ImportError: extract_ytdlp_tags = None # type: ignore -def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger] = None) -> Any: - """Download streaming media exclusively via yt-dlp.""" +def download_media(opts: DownloadOptions, *, config: Optional[Dict[str, Any]] = None, debug_logger: Optional[DebugLogger] = None) -> Any: + """Download streaming media exclusively via yt-dlp. + + Optional `config` dict may be provided so tool defaults (e.g., cookies, default + format) are applied when constructing the YtDlpTool instance. + """ debug(f"[download_media] start: {opts.url}") try: @@ -1533,7 +1751,8 @@ def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger] ensure_yt_dlp_ready() - ytdlp_tool = YtDlpTool() + # Use provided config when available so user tool settings are honored + ytdlp_tool = YtDlpTool(config or {}) ytdl_options = ytdlp_tool.build_ytdlp_options(opts) hooks = ytdl_options.get("progress_hooks") if not isinstance(hooks, list): @@ -1817,7 +2036,7 @@ def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger] return DownloadMediaResult(path=media_path, info=entry, tag=tags_res, source_url=source_url, hash_value=hash_value) -def _download_with_timeout(opts: DownloadOptions, timeout_seconds: int = 300) -> Any: +def _download_with_timeout(opts: DownloadOptions, timeout_seconds: int = 300, config: Optional[Dict[str, Any]] = None) -> Any: import threading from typing import cast @@ -1825,7 +2044,7 @@ def _download_with_timeout(opts: DownloadOptions, timeout_seconds: int = 300) -> def _do_download() -> None: try: - result_container[0] = download_media(opts) + result_container[0] = download_media(opts, config=config) except Exception as exc: result_container[1] = exc