j
This commit is contained in:
171
cmdnat/config.py
171
cmdnat/config.py
@@ -1,7 +1,9 @@
|
||||
from typing import List, Dict, Any
|
||||
from typing import List, Dict, Any, Optional, Sequence
|
||||
|
||||
from cmdlet._shared import Cmdlet, CmdletArg
|
||||
from SYS.config import load_config, save_config
|
||||
from SYS import pipeline as ctx
|
||||
from SYS.result_table import ResultTable
|
||||
|
||||
CMDLET = Cmdlet(
|
||||
name=".config",
|
||||
@@ -22,28 +24,21 @@ CMDLET = Cmdlet(
|
||||
)
|
||||
|
||||
|
||||
def flatten_config(config: Dict[str,
|
||||
Any],
|
||||
parent_key: str = "",
|
||||
sep: str = ".") -> List[Dict[str,
|
||||
Any]]:
|
||||
items = []
|
||||
def flatten_config(config: Dict[str, Any], parent_key: str = "", sep: str = ".") -> List[Dict[str, Any]]:
|
||||
items: List[Dict[str, Any]] = []
|
||||
for k, v in config.items():
|
||||
if k.startswith("_"): # Skip internal keys
|
||||
if k.startswith("_"):
|
||||
continue
|
||||
|
||||
new_key = f"{parent_key}{sep}{k}" if parent_key else k
|
||||
if isinstance(v, dict):
|
||||
items.extend(flatten_config(v, new_key, sep=sep))
|
||||
else:
|
||||
items.append(
|
||||
{
|
||||
"Key": new_key,
|
||||
"Value": str(v),
|
||||
"Type": type(v).__name__,
|
||||
"_selection_args": [new_key],
|
||||
}
|
||||
)
|
||||
items.append({
|
||||
"key": new_key,
|
||||
"value": v,
|
||||
"value_display": str(v),
|
||||
"type": type(v).__name__,
|
||||
})
|
||||
return items
|
||||
|
||||
|
||||
@@ -98,53 +93,133 @@ def set_nested_config(config: Dict[str, Any], key: str, value: str) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def _run(piped_result: Any, args: List[str], config: Dict[str, Any]) -> int:
|
||||
# Reload config to ensure we have the latest on disk
|
||||
# We don't use the passed 'config' because we want to edit the file
|
||||
# and 'config' might contain runtime objects (like worker manager)
|
||||
# But load_config() returns a fresh dict from disk (or cache)
|
||||
# We should use load_config()
|
||||
def _extract_piped_value(result: Any) -> Optional[str]:
|
||||
if isinstance(result, str):
|
||||
return result.strip() if result.strip() else None
|
||||
if isinstance(result, (int, float)):
|
||||
return str(result)
|
||||
if isinstance(result, dict):
|
||||
val = result.get("value")
|
||||
if val is not None:
|
||||
return str(val).strip()
|
||||
return None
|
||||
|
||||
|
||||
def _extract_value_arg(args: Sequence[str]) -> Optional[str]:
|
||||
if not args:
|
||||
return None
|
||||
tokens = [str(tok) for tok in args if tok is not None]
|
||||
flags = {"-value", "--value", "-set-value", "--set-value"}
|
||||
for idx, tok in enumerate(tokens):
|
||||
text = tok.strip()
|
||||
if not text:
|
||||
continue
|
||||
low = text.lower()
|
||||
if low in flags and idx + 1 < len(tokens):
|
||||
candidate = str(tokens[idx + 1]).strip()
|
||||
if candidate:
|
||||
return candidate
|
||||
if "=" in low:
|
||||
head, val = low.split("=", 1)
|
||||
if head in flags and val:
|
||||
return val.strip()
|
||||
for tok in tokens:
|
||||
text = str(tok).strip()
|
||||
if text and not text.startswith("-"):
|
||||
return text
|
||||
return None
|
||||
|
||||
|
||||
def _get_selected_config_key() -> Optional[str]:
|
||||
try:
|
||||
indices = ctx.get_last_selection() or []
|
||||
except Exception:
|
||||
indices = []
|
||||
try:
|
||||
items = ctx.get_last_result_items() or []
|
||||
except Exception:
|
||||
items = []
|
||||
if not indices or not items:
|
||||
return None
|
||||
idx = indices[0]
|
||||
if idx < 0 or idx >= len(items):
|
||||
return None
|
||||
item = items[idx]
|
||||
if isinstance(item, dict):
|
||||
return item.get("key")
|
||||
return getattr(item, "key", None)
|
||||
|
||||
|
||||
def _show_config_table(config_data: Dict[str, Any]) -> int:
|
||||
items = flatten_config(config_data)
|
||||
if not items:
|
||||
print("No configuration entries available.")
|
||||
return 0
|
||||
items.sort(key=lambda x: x.get("key"))
|
||||
|
||||
table = ResultTable("Configuration")
|
||||
table.set_table("config")
|
||||
table.set_source_command(".config", [])
|
||||
|
||||
for item in items:
|
||||
row = table.add_row()
|
||||
row.add_column("Key", item.get("key", ""))
|
||||
row.add_column("Value", item.get("value_display", ""))
|
||||
row.add_column("Type", item.get("type", ""))
|
||||
|
||||
ctx.set_last_result_table_overlay(table, items)
|
||||
ctx.set_current_stage_table(table)
|
||||
return 0
|
||||
|
||||
|
||||
def _strip_value_quotes(value: str) -> str:
|
||||
if not value:
|
||||
return value
|
||||
if (value.startswith('"') and value.endswith('"')) or (value.startswith("'") and value.endswith("'")):
|
||||
return value[1:-1]
|
||||
return value
|
||||
|
||||
|
||||
def _run(piped_result: Any, args: List[str], config: Dict[str, Any]) -> int:
|
||||
current_config = load_config()
|
||||
|
||||
# Parse args
|
||||
# We handle args manually because of the potential for spaces in values
|
||||
# and the @ expansion logic in CLI.py passing args
|
||||
selection_key = _get_selected_config_key()
|
||||
value_from_args = _extract_value_arg(args) if selection_key else None
|
||||
value_from_pipe = _extract_piped_value(piped_result)
|
||||
|
||||
if selection_key:
|
||||
new_value = value_from_pipe or value_from_args
|
||||
if not new_value:
|
||||
print(
|
||||
"Provide a new value via pipe or argument: @N | .config <value>"
|
||||
)
|
||||
return 1
|
||||
new_value = _strip_value_quotes(new_value)
|
||||
try:
|
||||
set_nested_config(current_config, selection_key, new_value)
|
||||
save_config(current_config)
|
||||
print(f"Updated '{selection_key}' to '{new_value}'")
|
||||
return 0
|
||||
except Exception as exc:
|
||||
print(f"Error updating config: {exc}")
|
||||
return 1
|
||||
|
||||
if not args:
|
||||
# List mode
|
||||
items = flatten_config(current_config)
|
||||
# Sort by key
|
||||
items.sort(key=lambda x: x["Key"])
|
||||
return _show_config_table(current_config)
|
||||
|
||||
# Emit items for ResultTable
|
||||
from SYS import pipeline as ctx
|
||||
|
||||
for item in items:
|
||||
ctx.emit(item)
|
||||
return 0
|
||||
|
||||
# Update mode
|
||||
key = args[0]
|
||||
|
||||
if len(args) < 2:
|
||||
print(f"Error: Value required for key '{key}'")
|
||||
return 1
|
||||
|
||||
value = " ".join(args[1:])
|
||||
|
||||
# Remove quotes if present
|
||||
if (value.startswith('"') and value.endswith('"')) or (value.startswith("'")
|
||||
and value.endswith("'")):
|
||||
value = value[1:-1]
|
||||
|
||||
value = _strip_value_quotes(" ".join(args[1:]))
|
||||
try:
|
||||
set_nested_config(current_config, key, value)
|
||||
save_config(current_config)
|
||||
print(f"Updated '{key}' to '{value}'")
|
||||
return 0
|
||||
except Exception as e:
|
||||
print(f"Error updating config: {e}")
|
||||
except Exception as exc:
|
||||
print(f"Error updating config: {exc}")
|
||||
return 1
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user