273 lines
9.2 KiB
Python
273 lines
9.2 KiB
Python
from typing import List, Dict, Any, Optional, Sequence
|
|
|
|
from cmdlet._shared import Cmdlet, CmdletArg
|
|
from SYS.config import load_config, save_config
|
|
from SYS import pipeline as ctx
|
|
from SYS.result_table import ResultTable
|
|
|
|
CMDLET = Cmdlet(
|
|
name=".config",
|
|
summary="Manage configuration settings",
|
|
usage=".config [key] [value]",
|
|
arg=[
|
|
CmdletArg(
|
|
name="key",
|
|
description="Configuration key to update (dot-separated)",
|
|
required=False
|
|
),
|
|
CmdletArg(
|
|
name="value",
|
|
description="New value for the configuration key",
|
|
required=False
|
|
),
|
|
],
|
|
)
|
|
|
|
|
|
def flatten_config(config: Dict[str, Any], parent_key: str = "", sep: str = ".") -> List[Dict[str, Any]]:
|
|
items: List[Dict[str, Any]] = []
|
|
for k, v in config.items():
|
|
if k.startswith("_"):
|
|
continue
|
|
new_key = f"{parent_key}{sep}{k}" if parent_key else k
|
|
if isinstance(v, dict):
|
|
items.extend(flatten_config(v, new_key, sep=sep))
|
|
else:
|
|
items.append({
|
|
"key": new_key,
|
|
"value": v,
|
|
"value_display": str(v),
|
|
"type": type(v).__name__,
|
|
})
|
|
return items
|
|
|
|
|
|
def set_nested_config(config: Dict[str, Any], key: str, value: str) -> bool:
|
|
keys = key.split(".")
|
|
d = config
|
|
|
|
# Navigate to the parent dict
|
|
for k in keys[:-1]:
|
|
if k not in d or not isinstance(d[k], dict):
|
|
d[k] = {}
|
|
d = d[k]
|
|
|
|
last_key = keys[-1]
|
|
|
|
# Try to preserve type if key exists
|
|
if last_key in d:
|
|
current_val = d[last_key]
|
|
if isinstance(current_val, bool):
|
|
if value.lower() in ("true", "yes", "1", "on"):
|
|
d[last_key] = True
|
|
elif value.lower() in ("false", "no", "0", "off"):
|
|
d[last_key] = False
|
|
else:
|
|
# Fallback to boolean conversion of string (usually True for non-empty)
|
|
# But for config, explicit is better.
|
|
print(f"Warning: Could not convert '{value}' to boolean. Using string.")
|
|
d[last_key] = value
|
|
elif isinstance(current_val, int):
|
|
try:
|
|
d[last_key] = int(value)
|
|
except ValueError:
|
|
print(f"Warning: Could not convert '{value}' to int. Using string.")
|
|
d[last_key] = value
|
|
elif isinstance(current_val, float):
|
|
try:
|
|
d[last_key] = float(value)
|
|
except ValueError:
|
|
print(f"Warning: Could not convert '{value}' to float. Using string.")
|
|
d[last_key] = value
|
|
else:
|
|
d[last_key] = value
|
|
else:
|
|
# New key, try to infer type
|
|
if value.lower() in ("true", "false"):
|
|
d[last_key] = value.lower() == "true"
|
|
elif value.isdigit():
|
|
d[last_key] = int(value)
|
|
else:
|
|
d[last_key] = value
|
|
|
|
return True
|
|
|
|
|
|
def _extract_piped_value(result: Any) -> Optional[str]:
|
|
if isinstance(result, str):
|
|
return result.strip() if result.strip() else None
|
|
if isinstance(result, (int, float)):
|
|
return str(result)
|
|
if isinstance(result, dict):
|
|
val = result.get("value")
|
|
if val is not None:
|
|
return str(val).strip()
|
|
return None
|
|
|
|
|
|
def _extract_value_arg(args: Sequence[str]) -> Optional[str]:
|
|
if not args:
|
|
return None
|
|
tokens = [str(tok) for tok in args if tok is not None]
|
|
flags = {"-value", "--value", "-set-value", "--set-value"}
|
|
for idx, tok in enumerate(tokens):
|
|
text = tok.strip()
|
|
if not text:
|
|
continue
|
|
low = text.lower()
|
|
if low in flags and idx + 1 < len(tokens):
|
|
candidate = str(tokens[idx + 1]).strip()
|
|
if candidate:
|
|
return candidate
|
|
if "=" in low:
|
|
head, val = low.split("=", 1)
|
|
if head in flags and val:
|
|
return val.strip()
|
|
for tok in tokens:
|
|
text = str(tok).strip()
|
|
if text and not text.startswith("-"):
|
|
return text
|
|
return None
|
|
|
|
|
|
def _get_selected_config_key() -> Optional[str]:
|
|
try:
|
|
indices = ctx.get_last_selection() or []
|
|
except Exception:
|
|
indices = []
|
|
try:
|
|
items = ctx.get_last_result_items() or []
|
|
except Exception:
|
|
items = []
|
|
if not indices or not items:
|
|
return None
|
|
idx = indices[0]
|
|
if idx < 0 or idx >= len(items):
|
|
return None
|
|
item = items[idx]
|
|
if isinstance(item, dict):
|
|
return item.get("key")
|
|
return getattr(item, "key", None)
|
|
|
|
|
|
def _show_config_table(config_data: Dict[str, Any]) -> int:
|
|
items = flatten_config(config_data)
|
|
if not items:
|
|
print("No configuration entries available.")
|
|
return 0
|
|
items.sort(key=lambda x: x.get("key"))
|
|
|
|
table = ResultTable("Configuration")
|
|
table.set_table("config")
|
|
table.set_source_command(".config", [])
|
|
|
|
for item in items:
|
|
row = table.add_row()
|
|
row.add_column("Key", item.get("key", ""))
|
|
row.add_column("Value", item.get("value_display", ""))
|
|
row.add_column("Type", item.get("type", ""))
|
|
|
|
ctx.set_last_result_table_overlay(table, items)
|
|
ctx.set_current_stage_table(table)
|
|
return 0
|
|
|
|
|
|
def _strip_value_quotes(value: str) -> str:
|
|
if not value:
|
|
return value
|
|
if (value.startswith('"') and value.endswith('"')) or (value.startswith("'") and value.endswith("'")):
|
|
return value[1:-1]
|
|
return value
|
|
|
|
|
|
def _run(piped_result: Any, args: List[str], config: Dict[str, Any]) -> int:
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Load from workspace root, not SYS directory
|
|
workspace_root = Path(__file__).resolve().parent.parent
|
|
current_config = load_config(config_dir=workspace_root)
|
|
|
|
selection_key = _get_selected_config_key()
|
|
value_from_args = _extract_value_arg(args) if selection_key else None
|
|
value_from_pipe = _extract_piped_value(piped_result)
|
|
|
|
if selection_key:
|
|
new_value = value_from_pipe or value_from_args
|
|
if not new_value:
|
|
print(
|
|
"Provide a new value via pipe or argument: @N | .config <value>"
|
|
)
|
|
return 1
|
|
new_value = _strip_value_quotes(new_value)
|
|
try:
|
|
set_nested_config(current_config, selection_key, new_value)
|
|
save_config(current_config, config_dir=workspace_root)
|
|
print(f"Updated '{selection_key}' to '{new_value}'")
|
|
return 0
|
|
except Exception as exc:
|
|
print(f"Error updating config: {exc}")
|
|
return 1
|
|
|
|
if not args:
|
|
# Check if we're in an interactive terminal and can launch a Textual modal
|
|
if sys.stdin.isatty() and not piped_result:
|
|
try:
|
|
from textual.app import App, ComposeResult
|
|
from TUI.modalscreen.config_modal import ConfigModal
|
|
|
|
class ConfigApp(App):
|
|
def on_mount(self) -> None:
|
|
self.title = "Config Editor"
|
|
# We push the modal screen. It will sit on top of the main (blank) screen.
|
|
# Using a callback to exit the app when the modal is dismissed.
|
|
self.push_screen(ConfigModal(), callback=self.exit_on_close)
|
|
|
|
def exit_on_close(self, result: Any = None) -> None:
|
|
self.exit()
|
|
|
|
with ctx.suspend_live_progress():
|
|
app = ConfigApp()
|
|
app.run()
|
|
|
|
# After modal exits, show the new status table if possible
|
|
try:
|
|
from cmdlet._shared import SharedArgs
|
|
from cmdnat.status import CMDLET as STATUS_CMDLET
|
|
# We reload the config one more time because it might have changed on disk
|
|
fresh_config = load_config(config_dir=workspace_root)
|
|
|
|
# Force refresh of shared caches (especially stores)
|
|
SharedArgs._refresh_store_choices_cache(fresh_config)
|
|
# Update the global SharedArgs choices so cmdlets pick up new stores
|
|
SharedArgs.STORE.choices = SharedArgs.get_store_choices(fresh_config, force=True)
|
|
|
|
return STATUS_CMDLET.exec(None, [], fresh_config)
|
|
except Exception:
|
|
pass
|
|
return 0
|
|
except Exception as exc:
|
|
# Fall back to table display if Textual modal fails
|
|
print(f"Note: Could not launch interactive editor ({exc}). Showing configuration table:")
|
|
return _show_config_table(current_config)
|
|
|
|
return _show_config_table(current_config)
|
|
|
|
key = args[0]
|
|
if len(args) < 2:
|
|
print(f"Error: Value required for key '{key}'")
|
|
return 1
|
|
|
|
value = _strip_value_quotes(" ".join(args[1:]))
|
|
try:
|
|
set_nested_config(current_config, key, value)
|
|
save_config(current_config, config_dir=workspace_root)
|
|
print(f"Updated '{key}' to '{value}'")
|
|
return 0
|
|
except Exception as exc:
|
|
print(f"Error updating config: {exc}")
|
|
return 1
|
|
|
|
|
|
CMDLET.exec = _run
|