update
This commit is contained in:
+431
-65
@@ -15,11 +15,44 @@ from SYS.logger import log
|
||||
from SYS import pipeline as ctx
|
||||
from SYS.result_table import Table
|
||||
from cmdnat._parsing import (
|
||||
VALUE_ARG_FLAGS,
|
||||
extract_piped_value as _extract_piped_value,
|
||||
extract_arg_value as _extract_arg_value,
|
||||
extract_value_arg as _extract_value_arg,
|
||||
has_flag as _has_flag,
|
||||
)
|
||||
|
||||
|
||||
_PREFERENCES_BROWSE_PATH = "__preferences__"
|
||||
_PLUGINS_BROWSE_PATH = "__plugins__"
|
||||
_PLUGIN_CATEGORY_KEYS = ("plugin", "provider", "tool")
|
||||
_KNOWN_SECTION_LABELS = {
|
||||
"plugin": "Plugins",
|
||||
"provider": "Plugins",
|
||||
"tool": "Plugins",
|
||||
}
|
||||
_KNOWN_SECTION_DESCRIPTIONS = {
|
||||
_PREFERENCES_BROWSE_PATH: "Global preferences and simple values",
|
||||
_PLUGINS_BROWSE_PATH: "All configured plugins and plugin instances",
|
||||
"provider": "Plugin configuration",
|
||||
"plugin": "Plugin configuration",
|
||||
"tool": "Plugin configuration",
|
||||
}
|
||||
_SENSITIVE_CONFIG_KEYS = {
|
||||
"access_key",
|
||||
"access_token",
|
||||
"api",
|
||||
"api_key",
|
||||
"apikey",
|
||||
"authorization",
|
||||
"bearer_token",
|
||||
"cookie",
|
||||
"cookies",
|
||||
"password",
|
||||
"secret",
|
||||
"token",
|
||||
}
|
||||
|
||||
CMDLET = Cmdlet(
|
||||
name=".config",
|
||||
summary="Manage configuration settings",
|
||||
@@ -173,29 +206,316 @@ def _show_config_logs(args: Sequence[str]) -> int:
|
||||
return 0
|
||||
|
||||
|
||||
def flatten_config(config: Dict[str, Any], parent_key: str = "", sep: str = ".") -> List[Dict[str, Any]]:
|
||||
items: List[Dict[str, Any]] = []
|
||||
for k, v in config.items():
|
||||
if k.startswith("_"):
|
||||
continue
|
||||
new_key = f"{parent_key}{sep}{k}" if parent_key else k
|
||||
if isinstance(v, dict):
|
||||
items.extend(flatten_config(v, new_key, sep=sep))
|
||||
else:
|
||||
items.append({
|
||||
"key": new_key,
|
||||
"value": v,
|
||||
"value_display": str(v),
|
||||
"type": type(v).__name__,
|
||||
})
|
||||
return items
|
||||
|
||||
|
||||
def set_nested_config(config: Dict[str, Any], key: str, value: str) -> bool:
|
||||
return set_nested_config_value(config, key, value, on_error=print)
|
||||
|
||||
|
||||
def _get_selected_config_key() -> Optional[str]:
|
||||
def _visible_config_entries(config_data: Any) -> List[tuple[str, Any]]:
|
||||
if not isinstance(config_data, dict):
|
||||
return []
|
||||
return [
|
||||
(str(key), value)
|
||||
for key, value in config_data.items()
|
||||
if isinstance(key, str) and not key.startswith("_")
|
||||
]
|
||||
|
||||
|
||||
def _format_config_label(value: Any) -> str:
|
||||
text = str(value or "").strip()
|
||||
if not text:
|
||||
return "Configuration"
|
||||
if text == _PREFERENCES_BROWSE_PATH:
|
||||
return "Preferences"
|
||||
if text == _PLUGINS_BROWSE_PATH:
|
||||
return "Plugins"
|
||||
return text.replace("_", " ").replace("-", " ").strip().title()
|
||||
|
||||
|
||||
def _format_config_path_label(browse_path: Optional[str]) -> str:
|
||||
text = str(browse_path or "").strip()
|
||||
if not text:
|
||||
return "Root"
|
||||
if text == _PREFERENCES_BROWSE_PATH:
|
||||
return "Preferences"
|
||||
if text == _PLUGINS_BROWSE_PATH:
|
||||
return "Plugins"
|
||||
parts = [part for part in text.split(".") if part]
|
||||
formatted: List[str] = []
|
||||
for idx, part in enumerate(parts):
|
||||
if idx == 0 and part in _PLUGIN_CATEGORY_KEYS:
|
||||
formatted.append("Plugins")
|
||||
else:
|
||||
formatted.append(_format_config_label(part))
|
||||
return " / ".join(formatted)
|
||||
|
||||
|
||||
def _format_config_value(value: Any) -> str:
|
||||
if isinstance(value, bool):
|
||||
return "true" if value else "false"
|
||||
if value is None:
|
||||
return "null"
|
||||
if isinstance(value, (list, tuple, set)):
|
||||
return ", ".join(str(item) for item in value)
|
||||
return str(value)
|
||||
|
||||
|
||||
def _is_sensitive_config_key(key_path: str) -> bool:
|
||||
leaf = str(key_path or "").split(".")[-1].strip().lower()
|
||||
return leaf in _SENSITIVE_CONFIG_KEYS
|
||||
|
||||
|
||||
def _format_config_entry_count(value: Any) -> str:
|
||||
count = len(_visible_config_entries(value)) if isinstance(value, dict) else 0
|
||||
if count == 1:
|
||||
return "1 entry"
|
||||
return f"{count} entries"
|
||||
|
||||
|
||||
def _iter_plugin_branches(config_data: Dict[str, Any]) -> List[tuple[str, str, Any]]:
|
||||
branches: List[tuple[str, str, Any]] = []
|
||||
if not isinstance(config_data, dict):
|
||||
return branches
|
||||
|
||||
for category in _PLUGIN_CATEGORY_KEYS:
|
||||
category_block = config_data.get(category)
|
||||
if not isinstance(category_block, dict):
|
||||
continue
|
||||
for name, value in _visible_config_entries(category_block):
|
||||
branches.append((category, name, value))
|
||||
return branches
|
||||
|
||||
|
||||
def _collect_plugin_root_items(config_data: Dict[str, Any]) -> List[Dict[str, Any]]:
|
||||
plugin_items: Dict[str, Dict[str, Any]] = {}
|
||||
for category, name, value in _iter_plugin_branches(config_data):
|
||||
key = str(name or "").strip().lower()
|
||||
if not key:
|
||||
continue
|
||||
existing = plugin_items.get(key)
|
||||
if existing is None:
|
||||
plugin_items[key] = {
|
||||
"kind": "section",
|
||||
"title": _format_config_label(name),
|
||||
"browse_path": f"{category}.{name}",
|
||||
"summary": _format_config_entry_count(value),
|
||||
"type": "section",
|
||||
"description": "Plugin configuration",
|
||||
}
|
||||
continue
|
||||
|
||||
if str(category) == "plugin" and not str(existing.get("browse_path") or "").startswith("plugin."):
|
||||
existing["browse_path"] = f"{category}.{name}"
|
||||
try:
|
||||
current_count = int(str(existing.get("summary") or "0").split()[0])
|
||||
except Exception:
|
||||
current_count = 0
|
||||
extra_count = len(_visible_config_entries(value)) if isinstance(value, dict) else 0
|
||||
merged_count = current_count + extra_count
|
||||
existing["summary"] = "1 entry" if merged_count == 1 else f"{merged_count} entries"
|
||||
|
||||
return sorted(plugin_items.values(), key=lambda item: str(item.get("title") or "").lower())
|
||||
|
||||
|
||||
def _resolve_config_branch(
|
||||
config_data: Dict[str, Any],
|
||||
browse_path: Optional[str],
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
text = str(browse_path or "").strip()
|
||||
if not text:
|
||||
return config_data if isinstance(config_data, dict) else None
|
||||
|
||||
if text == _PREFERENCES_BROWSE_PATH:
|
||||
return {
|
||||
key: value
|
||||
for key, value in _visible_config_entries(config_data)
|
||||
if not isinstance(value, dict)
|
||||
}
|
||||
|
||||
if text == _PLUGINS_BROWSE_PATH:
|
||||
return {
|
||||
str(item.get("title") or ""): item
|
||||
for item in _collect_plugin_root_items(config_data)
|
||||
}
|
||||
|
||||
current: Any = config_data
|
||||
for part in text.split("."):
|
||||
if not isinstance(current, dict):
|
||||
return None
|
||||
current = current.get(part)
|
||||
return current if isinstance(current, dict) else None
|
||||
|
||||
|
||||
def _build_section_item(
|
||||
*,
|
||||
title: str,
|
||||
browse_path: str,
|
||||
value: Any,
|
||||
description: Optional[str] = None,
|
||||
) -> Dict[str, Any]:
|
||||
return {
|
||||
"kind": "section",
|
||||
"title": title,
|
||||
"browse_path": browse_path,
|
||||
"summary": _format_config_entry_count(value),
|
||||
"type": "section",
|
||||
"description": str(description or "").strip() or _KNOWN_SECTION_DESCRIPTIONS.get(browse_path, ""),
|
||||
}
|
||||
|
||||
|
||||
def _build_value_item(
|
||||
*,
|
||||
key_path: str,
|
||||
name: str,
|
||||
value: Any,
|
||||
) -> Dict[str, Any]:
|
||||
display_value = "***" if _is_sensitive_config_key(key_path) else _format_config_value(value)
|
||||
path_parts = [part for part in str(key_path or "").split(".") if part]
|
||||
display_path = " / ".join(
|
||||
[_format_config_path_label(".".join(path_parts[:-1]))] if len(path_parts) > 1 else []
|
||||
+ [_format_config_label(path_parts[-1])] if path_parts else [_format_config_label(name)]
|
||||
)
|
||||
return {
|
||||
"kind": "value",
|
||||
"key": key_path,
|
||||
"name": name,
|
||||
"title": _format_config_label(name),
|
||||
"value": value,
|
||||
"value_display": display_value,
|
||||
"display_path": display_path,
|
||||
"type": type(value).__name__,
|
||||
}
|
||||
|
||||
|
||||
def _build_root_config_items(config_data: Dict[str, Any]) -> List[Dict[str, Any]]:
|
||||
items: List[Dict[str, Any]] = []
|
||||
visible_entries = _visible_config_entries(config_data)
|
||||
|
||||
preferences = {
|
||||
key: value
|
||||
for key, value in visible_entries
|
||||
if not isinstance(value, dict)
|
||||
}
|
||||
if preferences:
|
||||
items.append(
|
||||
_build_section_item(
|
||||
title="Preferences",
|
||||
browse_path=_PREFERENCES_BROWSE_PATH,
|
||||
value=preferences,
|
||||
)
|
||||
)
|
||||
|
||||
plugin_items = _collect_plugin_root_items(config_data)
|
||||
if plugin_items:
|
||||
items.append(
|
||||
_build_section_item(
|
||||
title="Plugins",
|
||||
browse_path=_PLUGINS_BROWSE_PATH,
|
||||
value={item["title"]: item for item in plugin_items},
|
||||
)
|
||||
)
|
||||
|
||||
other_sections: List[Dict[str, Any]] = []
|
||||
for key, value in visible_entries:
|
||||
if key in set(_PLUGIN_CATEGORY_KEYS) or not isinstance(value, dict):
|
||||
continue
|
||||
other_sections.append(
|
||||
_build_section_item(
|
||||
title=_format_config_label(key),
|
||||
browse_path=key,
|
||||
value=value,
|
||||
)
|
||||
)
|
||||
|
||||
other_sections.sort(key=lambda item: str(item.get("title") or "").lower())
|
||||
items.extend(other_sections)
|
||||
return items
|
||||
|
||||
|
||||
def _build_nested_config_items(
|
||||
config_data: Dict[str, Any],
|
||||
browse_path: str,
|
||||
) -> List[Dict[str, Any]]:
|
||||
if browse_path == _PLUGINS_BROWSE_PATH:
|
||||
return _collect_plugin_root_items(config_data)
|
||||
|
||||
branch = _resolve_config_branch(config_data, browse_path)
|
||||
if branch is None:
|
||||
return []
|
||||
|
||||
section_items: List[Dict[str, Any]] = []
|
||||
value_items: List[Dict[str, Any]] = []
|
||||
is_preferences_view = browse_path == _PREFERENCES_BROWSE_PATH
|
||||
|
||||
for key, value in _visible_config_entries(branch):
|
||||
full_key = key if is_preferences_view else f"{browse_path}.{key}"
|
||||
if isinstance(value, dict):
|
||||
section_items.append(
|
||||
_build_section_item(
|
||||
title=_format_config_label(key),
|
||||
browse_path=full_key,
|
||||
value=value,
|
||||
)
|
||||
)
|
||||
else:
|
||||
value_items.append(
|
||||
_build_value_item(
|
||||
key_path=full_key,
|
||||
name=key,
|
||||
value=value,
|
||||
)
|
||||
)
|
||||
|
||||
section_items.sort(key=lambda item: str(item.get("title") or "").lower())
|
||||
value_items.sort(key=lambda item: str(item.get("name") or "").lower())
|
||||
return section_items + value_items
|
||||
|
||||
|
||||
def _build_config_items(
|
||||
config_data: Dict[str, Any],
|
||||
browse_path: Optional[str] = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
text = str(browse_path or "").strip()
|
||||
if not text:
|
||||
return _build_root_config_items(config_data)
|
||||
return _build_nested_config_items(config_data, text)
|
||||
|
||||
|
||||
def _build_config_table_title(browse_path: Optional[str]) -> str:
|
||||
text = str(browse_path or "").strip()
|
||||
if not text:
|
||||
return "Configuration"
|
||||
return f"Configuration: {_format_config_path_label(text)}"
|
||||
|
||||
|
||||
def _build_config_header_lines(browse_path: Optional[str]) -> List[str]:
|
||||
text = str(browse_path or "").strip()
|
||||
if not text:
|
||||
return [
|
||||
"Use @N on a section to drill in. Use @.. to go back.",
|
||||
]
|
||||
return [
|
||||
f"Path: {_format_config_path_label(text)}",
|
||||
"Use @N on a section to drill in. Use @N | .config <value> to update a setting. Use @.. to go back.",
|
||||
]
|
||||
|
||||
|
||||
def _extract_browse_arg(args: Sequence[str]) -> Optional[str]:
|
||||
return _extract_arg_value(args, flags={"-browse", "--browse"}, allow_positional=False)
|
||||
|
||||
|
||||
def _extract_selected_update_value(args: Sequence[str]) -> Optional[str]:
|
||||
explicit = _extract_arg_value(args, flags=VALUE_ARG_FLAGS, allow_positional=False)
|
||||
if explicit is not None:
|
||||
return explicit
|
||||
|
||||
tokens = [str(arg).strip() for arg in (args or []) if str(arg).strip()]
|
||||
positional = [token for token in tokens if not token.startswith("-")]
|
||||
if len(positional) == 1:
|
||||
return positional[0]
|
||||
return None
|
||||
|
||||
|
||||
def _get_selected_config_item() -> Optional[Dict[str, Any]]:
|
||||
try:
|
||||
indices = ctx.get_last_selection() or []
|
||||
except Exception:
|
||||
@@ -211,32 +531,83 @@ def _get_selected_config_key() -> Optional[str]:
|
||||
return None
|
||||
item = items[idx]
|
||||
if isinstance(item, dict):
|
||||
return item.get("key")
|
||||
return getattr(item, "key", None)
|
||||
return item
|
||||
|
||||
normalized: Dict[str, Any] = {}
|
||||
for key in ("kind", "key", "title", "browse_path", "name", "value", "value_display", "type"):
|
||||
try:
|
||||
value = getattr(item, key, None)
|
||||
except Exception:
|
||||
value = None
|
||||
if value is not None:
|
||||
normalized[key] = value
|
||||
return normalized or None
|
||||
|
||||
|
||||
def _show_config_table(config_data: Dict[str, Any]) -> int:
|
||||
items = flatten_config(config_data)
|
||||
def _show_config_table(
|
||||
config_data: Dict[str, Any],
|
||||
*,
|
||||
browse_path: Optional[str] = None,
|
||||
) -> int:
|
||||
items = _build_config_items(config_data, browse_path=browse_path)
|
||||
if not items:
|
||||
print("No configuration entries available.")
|
||||
path_text = _format_config_path_label(browse_path)
|
||||
print(f"No configuration entries available for {path_text}.")
|
||||
return 0
|
||||
items.sort(key=lambda x: x.get("key"))
|
||||
|
||||
table = Table("Configuration")
|
||||
table = Table(_build_config_table_title(browse_path), preserve_order=True)
|
||||
table.set_table("config")
|
||||
table.set_source_command(".config", [])
|
||||
if browse_path:
|
||||
table.set_source_command(".config", ["-browse", str(browse_path)])
|
||||
else:
|
||||
table.set_source_command(".config", [])
|
||||
table.set_header_lines(_build_config_header_lines(browse_path))
|
||||
|
||||
for item in items:
|
||||
for idx, item in enumerate(items):
|
||||
row = table.add_row()
|
||||
row.add_column("Key", item.get("key", ""))
|
||||
row.add_column("Value", item.get("value_display", ""))
|
||||
row.add_column("Name", item.get("title", ""))
|
||||
row.add_column("Value", item.get("summary") or item.get("value_display", ""))
|
||||
row.add_column("Type", item.get("type", ""))
|
||||
if item.get("kind") == "section" and item.get("browse_path"):
|
||||
table.set_row_selection_action(
|
||||
idx,
|
||||
[".config", "-browse", str(item.get("browse_path"))],
|
||||
)
|
||||
|
||||
ctx.set_last_result_table_overlay(table, items)
|
||||
ctx.set_last_result_table(table, items)
|
||||
ctx.set_current_stage_table(table)
|
||||
return 0
|
||||
|
||||
|
||||
def _save_updated_config(config_data: Dict[str, Any], key_path: str) -> None:
|
||||
try:
|
||||
key_l = str(key_path or "").lower()
|
||||
except Exception:
|
||||
key_l = ""
|
||||
if "alldebrid" in key_l or "all-debrid" in key_l:
|
||||
save_config_and_verify(config_data)
|
||||
return
|
||||
save_config(config_data)
|
||||
|
||||
|
||||
def _resolve_direct_browse_path(
|
||||
config_data: Dict[str, Any],
|
||||
token: str,
|
||||
) -> Optional[str]:
|
||||
text = str(token or "").strip()
|
||||
if not text:
|
||||
return None
|
||||
lowered = text.lower()
|
||||
if lowered in {"preferences", "prefs"}:
|
||||
return _PREFERENCES_BROWSE_PATH
|
||||
if lowered in {"plugins", "plugin", "providers", "provider", "tools", "tool"}:
|
||||
return _PLUGINS_BROWSE_PATH
|
||||
branch = _resolve_config_branch(config_data, text)
|
||||
if isinstance(branch, dict):
|
||||
return text
|
||||
return None
|
||||
|
||||
|
||||
def _strip_value_quotes(value: str) -> str:
|
||||
if not value:
|
||||
return value
|
||||
@@ -254,41 +625,33 @@ def _run(piped_result: Any, args: List[str], config: Dict[str, Any]) -> int:
|
||||
# Load configuration from the database
|
||||
current_config = load_config()
|
||||
|
||||
selection_key = _get_selected_config_key()
|
||||
value_from_args = _extract_value_arg(args) if selection_key else None
|
||||
value_from_pipe = _extract_piped_value(piped_result)
|
||||
browse_path = _extract_browse_arg(args)
|
||||
if browse_path:
|
||||
return _show_config_table(current_config, browse_path=browse_path)
|
||||
|
||||
if selection_key:
|
||||
new_value = value_from_pipe or value_from_args
|
||||
if not new_value:
|
||||
print(
|
||||
"Provide a new value via pipe or argument: @N | .config <value>"
|
||||
)
|
||||
return 1
|
||||
new_value = _strip_value_quotes(new_value)
|
||||
try:
|
||||
set_nested_config(current_config, selection_key, new_value)
|
||||
# For AllDebrid API changes, use the verified save path to ensure
|
||||
# the new API key persisted to disk; otherwise fall back to normal save.
|
||||
selection_item = _get_selected_config_item()
|
||||
value_from_pipe = _extract_piped_value(piped_result)
|
||||
selection_kind = str((selection_item or {}).get("kind") or "").strip().lower()
|
||||
selection_key = str((selection_item or {}).get("key") or "").strip() or None
|
||||
selection_browse_path = str((selection_item or {}).get("browse_path") or "").strip() or None
|
||||
selection_display_path = str((selection_item or {}).get("display_path") or selection_key or "").strip() or selection_key
|
||||
|
||||
if selection_kind == "section" and selection_browse_path and not args and value_from_pipe is None:
|
||||
return _show_config_table(current_config, browse_path=selection_browse_path)
|
||||
|
||||
if selection_kind == "value" and selection_key:
|
||||
new_value = value_from_pipe or _extract_selected_update_value(args)
|
||||
if new_value is not None:
|
||||
new_value = _strip_value_quotes(new_value)
|
||||
try:
|
||||
key_l = str(selection_key or "").lower()
|
||||
except Exception:
|
||||
key_l = ""
|
||||
if "alldebrid" in key_l or "all-debrid" in key_l:
|
||||
try:
|
||||
save_config_and_verify(current_config)
|
||||
except Exception as exc:
|
||||
log(f"Configuration save verification failed for '{selection_key}': {exc}")
|
||||
print(f"Error saving configuration (verification failed): {exc}")
|
||||
return 1
|
||||
else:
|
||||
save_config(current_config)
|
||||
print(f"Updated '{selection_key}' to '{new_value}'")
|
||||
return 0
|
||||
except Exception as exc:
|
||||
log(f"Error updating config '{selection_key}': {exc}")
|
||||
print(f"Error updating config: {exc}")
|
||||
return 1
|
||||
set_nested_config(current_config, selection_key, new_value)
|
||||
_save_updated_config(current_config, selection_key)
|
||||
print(f"Updated '{selection_display_path}' to '{new_value}'")
|
||||
return 0
|
||||
except Exception as exc:
|
||||
log(f"Error updating config '{selection_key}': {exc}")
|
||||
print(f"Error updating config: {exc}")
|
||||
return 1
|
||||
|
||||
if not args:
|
||||
if sys.stdin.isatty() and not piped_result:
|
||||
@@ -300,13 +663,16 @@ def _run(piped_result: Any, args: List[str], config: Dict[str, Any]) -> int:
|
||||
|
||||
key = args[0]
|
||||
if len(args) < 2:
|
||||
browse_target = _resolve_direct_browse_path(current_config, key)
|
||||
if browse_target:
|
||||
return _show_config_table(current_config, browse_path=browse_target)
|
||||
print(f"Error: Value required for key '{key}'")
|
||||
return 1
|
||||
|
||||
value = _strip_value_quotes(" ".join(args[1:]))
|
||||
try:
|
||||
set_nested_config(current_config, key, value)
|
||||
save_config(current_config)
|
||||
_save_updated_config(current_config, key)
|
||||
print(f"Updated '{key}' to '{value}'")
|
||||
return 0
|
||||
except Exception as exc:
|
||||
|
||||
Reference in New Issue
Block a user