from __future__ import annotations import shutil from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from PluginCore.base import Provider from SYS.metadata import write_metadata, write_tags from SYS.utils import sanitize_filename, sha256_file, unique_path def _copy_sidecars(source_path: Path, target_path: Path) -> None: possible_sidecars = [ source_path.with_suffix(source_path.suffix + ".json"), source_path.with_name(source_path.name + ".tag"), source_path.with_name(source_path.name + ".metadata"), source_path.with_name(source_path.name + ".notes"), ] for sidecar in possible_sidecars: try: if not sidecar.exists(): continue suffix_part = sidecar.name.replace(source_path.name, "", 1) target_sidecar = target_path.parent / f"{target_path.name}{suffix_part}" target_sidecar.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(str(sidecar), target_sidecar) except Exception: continue def _copy_with_progress( source_path: Path, target_path: Path, *, pipeline_progress: Any = None, label: str = "local export", chunk_size: int = 1024 * 1024, ) -> None: total_bytes: Optional[int] = None try: total_bytes = int(source_path.stat().st_size) except Exception: total_bytes = None transfer_started = False completed = 0 transfer_label = str(label or target_path.name or source_path.name) try: if pipeline_progress is not None and hasattr(pipeline_progress, "begin_transfer"): pipeline_progress.begin_transfer( label=transfer_label, total=total_bytes if isinstance(total_bytes, int) and total_bytes > 0 else None, ) transfer_started = True with source_path.open("rb") as src, target_path.open("wb") as dst: while True: chunk = src.read(max(4096, int(chunk_size or 0) or 1024 * 1024)) if not chunk: break dst.write(chunk) completed += len(chunk) if pipeline_progress is not None and hasattr(pipeline_progress, "update_transfer"): pipeline_progress.update_transfer( label=transfer_label, completed=completed, total=total_bytes if isinstance(total_bytes, int) and total_bytes > 0 else None, ) shutil.copystat(str(source_path), str(target_path)) finally: if pipeline_progress is not None and transfer_started and hasattr(pipeline_progress, "finish_transfer"): try: pipeline_progress.finish_transfer(label=transfer_label) except Exception: pass class Local(Provider): PLUGIN_NAME = "local" PLUGIN_ALIASES = ("filesystem", "fs") MULTI_INSTANCE = True SUPPORTED_CMDLETS = frozenset({"add-file"}) @property def label(self) -> str: return "Local Filesystem" @classmethod def config_schema(cls) -> List[Dict[str, Any]]: return [ { "key": "path", "label": "Destination Path", "type": "path", "default": "", "required": True, "placeholder": r"C:\Users\Me\Downloads", }, { "key": "create_dirs", "label": "Create Missing Directories", "type": "boolean", "default": True, }, ] def config_helper_text(self) -> str: return "Configure named local export destinations and use add-file -plugin local -instance ." @staticmethod def _looks_like_path(value: Any) -> bool: text = str(value or "").strip() if not text: return False if text.startswith((".", "~")): return True if "\\" in text or "/" in text: return True if len(text) >= 2 and text[1] == ":": return True return False def _settings_from_config( self, conf: Optional[Dict[str, Any]], *, instance_name: Optional[str] = None, ) -> Dict[str, Any]: entry = dict(conf or {}) path_value = str(entry.get("path") or entry.get("PATH") or "").strip() return { "instance": str(instance_name or entry.get("_instance_name") or "").strip() or None, "path": path_value, "create_dirs": bool(entry.get("create_dirs", entry.get("createDirs", True))), } def resolve_destination( self, instance_name: Optional[str] = None, *, require_explicit: bool = False, ) -> Tuple[Optional[str], Dict[str, Any]]: requested = str(instance_name or "").strip() if requested: resolved_name, conf = self.resolve_plugin_instance(requested, require_explicit=True) settings = self._settings_from_config(conf, instance_name=resolved_name) if settings.get("path"): return resolved_name or requested, settings if self._looks_like_path(requested): return requested, { "instance": requested, "path": requested, "create_dirs": True, } if require_explicit: return None, {} resolved_name, conf = self.resolve_plugin_instance(None, require_explicit=False) settings = self._settings_from_config(conf, instance_name=resolved_name) if settings.get("path"): return resolved_name, settings return None, {} def validate(self) -> bool: return True def upload(self, file_path: str, **kwargs: Any) -> Dict[str, Any]: source_path = Path(str(file_path or "")).expanduser() if not source_path.exists() or not source_path.is_file(): raise FileNotFoundError(f"File not found: {source_path}") pipeline_progress = kwargs.get("pipeline_progress") def _set_status(text: str) -> None: if pipeline_progress is None or not hasattr(pipeline_progress, "set_status"): return try: pipeline_progress.set_status(f"local: {text}") except Exception: pass def _clear_status() -> None: if pipeline_progress is None or not hasattr(pipeline_progress, "clear_status"): return try: pipeline_progress.clear_status() except Exception: pass try: requested_instance = str(kwargs.get("instance") or kwargs.get("store") or "").strip() or None resolved_name, settings = self.resolve_destination( requested_instance, require_explicit=bool(requested_instance), ) destination_text = str(settings.get("path") or "").strip() if not destination_text: requested_label = requested_instance or "" raise ValueError( f"Local destination '{requested_label}' is not configured. Use -plugin local -instance ." ) destination_root = Path(destination_text).expanduser() create_dirs = bool(settings.get("create_dirs", True)) if create_dirs: destination_root.mkdir(parents=True, exist_ok=True) elif not destination_root.exists(): raise FileNotFoundError(f"Destination directory does not exist: {destination_root}") elif not destination_root.is_dir(): raise NotADirectoryError(f"Destination is not a directory: {destination_root}") title = str(kwargs.get("title") or "").strip() if not title: title = source_path.stem.replace("_", " ").strip() base_name = sanitize_filename(title or source_path.stem) file_ext = source_path.suffix if file_ext and base_name.lower().endswith(file_ext.lower()): target_name = base_name else: target_name = base_name + file_ext direct_export_download = bool(kwargs.get("direct_export_download", False)) target_path = source_path if direct_export_download else destination_root / target_name if not direct_export_download: if target_path.exists(): target_path = unique_path(target_path) _set_status(f"copying {target_path.name}") _copy_with_progress( source_path, target_path, pipeline_progress=pipeline_progress, label=str(target_path.name or source_path.name or "local export"), ) _copy_sidecars(source_path, target_path) else: _set_status(f"finalizing {target_path.name}") tags = list(kwargs.get("tags") or []) urls = list(kwargs.get("urls") or []) hash_value = str(kwargs.get("hash_value") or "").strip() or None if not hash_value: try: hash_value = sha256_file(target_path) except Exception: hash_value = None relationships = kwargs.get("relationships") try: _set_status(f"writing metadata for {target_path.name}") write_tags( target_path, tags, urls, hash_value=hash_value, emit_debug=False, ) write_metadata( target_path, hash_value=hash_value, url=urls, relationships=relationships or [], emit_debug=False, ) except Exception: pass extra_updates: Dict[str, Any] = { "url": urls, "export_path": str(destination_root), } if resolved_name: extra_updates["instance"] = resolved_name if relationships: extra_updates["relationships"] = relationships return { "hash": hash_value or "unknown", "store": "local", "provider": self.name, "path": str(target_path), "tag": tags, "title": title or target_path.name, "relationships": relationships, "extra": extra_updates, } finally: _clear_status()