d
This commit is contained in:
29
cli.py
29
cli.py
@@ -375,6 +375,35 @@ def search(
|
||||
typer.echo("No cards matched that filter.")
|
||||
raise typer.Exit(code=0)
|
||||
|
||||
|
||||
@app.command("import-db")
|
||||
def import_db(
|
||||
file: Path = typer.Argument(..., help="CSV or XLSX file to import"),
|
||||
table: str = typer.Option(..., "--table", "-t", help="Target table to import into"),
|
||||
sheet: Optional[str] = typer.Option(None, "--sheet", help="Excel sheet name (optional)"),
|
||||
delimiter: str = typer.Option(",", "--delimiter", "-d", help="CSV delimiter (default ',')"),
|
||||
db_path: Optional[Path] = typer.Option(None, "--db", help="Path to target SQLite DB"),
|
||||
) -> None:
|
||||
"""Import CSV or Excel data into the project's DB tables.
|
||||
|
||||
Examples:
|
||||
tarot import-db angels.xlsx --table card_angels
|
||||
tarot import-db angels.csv --table card_angels --delimiter ','
|
||||
"""
|
||||
try:
|
||||
from tarot.db_import import import_file
|
||||
except Exception as exc: # pragma: no cover - import errors bubble up in tests
|
||||
typer.echo(f"Error loading import helper: {exc}")
|
||||
raise typer.Exit(code=1)
|
||||
|
||||
try:
|
||||
res = import_file(file, table, db_path=db_path, sheet=sheet, delimiter=delimiter)
|
||||
except Exception as exc: # pragma: no cover - errors will be surfaced for visibility
|
||||
typer.echo(f"Import failed: {exc}")
|
||||
raise typer.Exit(code=1)
|
||||
|
||||
typer.echo(f"Inserted: {res.inserted}; Skipped: {res.skipped}; Updated: {res.updated}")
|
||||
|
||||
def _doc(obj: Any, fallback: str) -> str:
|
||||
return inspect.getdoc(obj) or fallback
|
||||
|
||||
|
||||
105
cli_renderers.py
105
cli_renderers.py
@@ -6,7 +6,7 @@ specialized handling for planets plus generic list/dict/object renderers.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Dict, List, Optional, Sequence
|
||||
from typing import Any, Dict, List, Optional, Sequence, Tuple
|
||||
|
||||
from rich import box
|
||||
from rich.console import Console, Group
|
||||
@@ -15,6 +15,8 @@ from rich.table import Table
|
||||
|
||||
from tarot import Card
|
||||
from utils.attributes import Color, Planet
|
||||
from utils.dates import format_month_day
|
||||
from utils.object_formatting import format_value, get_object_attributes, is_nested_object
|
||||
|
||||
|
||||
class Renderer:
|
||||
@@ -69,27 +71,10 @@ class Renderer:
|
||||
self.console.print(table)
|
||||
|
||||
def print_cards_table(self, cards: List[Card]) -> None:
|
||||
table = Table(title="Cards", show_lines=True)
|
||||
table.add_column("#", justify="right")
|
||||
table.add_column("Name")
|
||||
table.add_column("Arcana")
|
||||
table.add_column("Suit")
|
||||
table.add_column("Pip/Court")
|
||||
|
||||
for card in cards:
|
||||
suit = getattr(getattr(card, "suit", None), "name", "")
|
||||
pip = str(getattr(card, "pip", "")) if getattr(card, "pip", None) else ""
|
||||
court = getattr(card, "court_rank", "")
|
||||
pip_display = court or pip
|
||||
table.add_row(
|
||||
str(getattr(card, "number", "")),
|
||||
getattr(card, "name", ""),
|
||||
getattr(card, "arcana", ""),
|
||||
suit,
|
||||
pip_display,
|
||||
)
|
||||
|
||||
self.console.print(table)
|
||||
for idx, card in enumerate(cards):
|
||||
self._render_card(card)
|
||||
if idx < len(cards) - 1:
|
||||
self.console.print()
|
||||
|
||||
# Internal renderers -------------------------------------------------
|
||||
def _render_card(self, card: Card) -> None:
|
||||
@@ -99,7 +84,6 @@ class Renderer:
|
||||
arcana = getattr(card, "arcana", "")
|
||||
suit_obj = getattr(card, "suit", None)
|
||||
suit = getattr(suit_obj, "name", "") if suit_obj is not None else ""
|
||||
pip = getattr(card, "pip", None)
|
||||
number = getattr(card, "number", "")
|
||||
keywords = getattr(card, "keywords", None) or []
|
||||
meaning = getattr(card, "meaning", None)
|
||||
@@ -112,18 +96,7 @@ class Renderer:
|
||||
top_right = f"#{number}" if number not in (None, "") else ""
|
||||
top.add_row(top_left, top_center, top_right)
|
||||
|
||||
body_lines = [f"Arcana: {arcana or '-'}"]
|
||||
if suit:
|
||||
body_lines.append(f"Suit: {suit}")
|
||||
if pip:
|
||||
body_lines.append(f"Pip: {pip}")
|
||||
if getattr(card, "court_rank", ""):
|
||||
body_lines.append(f"Court Rank: {card.court_rank}")
|
||||
if keywords:
|
||||
body_lines.append(f"Keywords: {', '.join(keywords)}")
|
||||
guidance = getattr(card, "guidance", None)
|
||||
if guidance:
|
||||
body_lines.append(f"Guidance: {guidance}")
|
||||
body_lines = self._card_field_lines(card)
|
||||
body = Panel("\n".join(body_lines), box=box.MINIMAL, padding=(1, 1))
|
||||
|
||||
bottom = Table(
|
||||
@@ -292,6 +265,68 @@ class Renderer:
|
||||
safe_val = value if value not in (None, "") else "—"
|
||||
return f"{label}:\n{safe_val}"
|
||||
|
||||
def _card_field_lines(self, card: Card) -> List[str]:
|
||||
exclude = {"name", "number", "meaning", "keywords"}
|
||||
lines: List[str] = []
|
||||
for attr_name, attr_value in get_object_attributes(card):
|
||||
if attr_name in exclude:
|
||||
continue
|
||||
if attr_value in (None, "", [], {}, ()): # Skip empty values
|
||||
continue
|
||||
|
||||
label = self._humanize_key(attr_name).title()
|
||||
if is_nested_object(attr_value) and not hasattr(attr_value, "name"):
|
||||
lines.append(f"{label}:")
|
||||
nested = format_value(attr_value, indent=2)
|
||||
lines.extend(nested.splitlines())
|
||||
continue
|
||||
|
||||
lines.append(f"{label}: {self._format_card_value(attr_value)}")
|
||||
|
||||
if not lines:
|
||||
lines.append("(no fields)")
|
||||
return lines
|
||||
|
||||
def _format_card_value(self, value: Any) -> str:
|
||||
if value is None:
|
||||
return "—"
|
||||
if isinstance(value, (str, int, float, bool)):
|
||||
return str(value)
|
||||
if isinstance(value, tuple):
|
||||
date_range = self._format_date_range(value)
|
||||
if date_range is not None:
|
||||
return date_range
|
||||
return ", ".join(self._format_card_value(v) for v in value)
|
||||
if isinstance(value, list):
|
||||
return ", ".join(self._format_card_value(v) for v in value)
|
||||
if isinstance(value, dict):
|
||||
return ", ".join(
|
||||
f"{self._humanize_key(str(k)).title()}: {self._format_card_value(v)}"
|
||||
for k, v in value.items()
|
||||
)
|
||||
if hasattr(value, "name"):
|
||||
return str(getattr(value, "name", value))
|
||||
if is_nested_object(value):
|
||||
return format_value(value, indent=2)
|
||||
return str(value)
|
||||
|
||||
@staticmethod
|
||||
def _format_date_range(value: Tuple[Any, ...]) -> Optional[str]:
|
||||
if len(value) != 2:
|
||||
return None
|
||||
start, end = value
|
||||
if not (
|
||||
isinstance(start, tuple)
|
||||
and isinstance(end, tuple)
|
||||
and len(start) == 2
|
||||
and len(end) == 2
|
||||
):
|
||||
return None
|
||||
try:
|
||||
return f"{format_month_day(start)} - {format_month_day(end)}"
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
class PlanetRenderer:
|
||||
"""Type-specific table rendering for planets."""
|
||||
|
||||
@@ -21,7 +21,11 @@ Usage:
|
||||
registry.load_into_card(card)
|
||||
"""
|
||||
|
||||
from typing import TYPE_CHECKING, Any, Dict, Optional
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
|
||||
|
||||
from tarot.constants import build_position_map
|
||||
|
||||
@@ -43,11 +47,457 @@ class CardDetailsRegistry:
|
||||
- 65-78: Wands (same structure)
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize the card details registry with interpretive data."""
|
||||
self._details: Dict[str, Dict[str, Any]] = self._build_registry()
|
||||
_DETAIL_KEYS = {
|
||||
"explanation",
|
||||
"interpretation",
|
||||
"keywords",
|
||||
"reversed_keywords",
|
||||
"guidance",
|
||||
"date_range",
|
||||
"degrees",
|
||||
"day_angel",
|
||||
"night_angel",
|
||||
"day_demon",
|
||||
"night_demon",
|
||||
}
|
||||
|
||||
def __init__(self, *, use_db: bool = True, db_path: Optional[Path] = None) -> None:
|
||||
"""Initialize the card details registry with interpretive data.
|
||||
|
||||
Args:
|
||||
use_db: If True, load details from a local SQLite database (creating it
|
||||
on first run). If False, use the in-code registry only.
|
||||
db_path: Optional custom path for the SQLite file.
|
||||
"""
|
||||
# Map card positions (1-78) to registry keys
|
||||
self._position_map = self._build_position_map()
|
||||
self._use_db = use_db
|
||||
self._db_path = self._resolve_db_path(db_path)
|
||||
|
||||
if self._use_db:
|
||||
self._details = self._load_registry_from_db()
|
||||
else:
|
||||
self._details = self._build_registry()
|
||||
|
||||
@staticmethod
|
||||
def _resolve_db_path(db_path: Optional[Path]) -> Path:
|
||||
"""Resolve the database path with priority order.
|
||||
|
||||
Priority:
|
||||
1. Explicit db_path argument
|
||||
2. PY_TAROT_DB_PATH or TAROT_DB_PATH environment variable
|
||||
3. Project root (where pyproject.toml is located)
|
||||
4. User home directory ~/.py-tarot/
|
||||
"""
|
||||
if db_path is not None:
|
||||
return Path(db_path)
|
||||
|
||||
env_path = os.getenv("PY_TAROT_DB_PATH") or os.getenv("TAROT_DB_PATH")
|
||||
if env_path:
|
||||
return Path(env_path)
|
||||
|
||||
# Try to find project root by looking for pyproject.toml
|
||||
current = Path(__file__).resolve()
|
||||
for parent in current.parents:
|
||||
if (parent / "pyproject.toml").exists() or (parent / ".git").exists():
|
||||
return parent / "tarot.db"
|
||||
|
||||
# Fallback to user home directory
|
||||
return Path.home() / ".py-tarot" / "tarot.db"
|
||||
|
||||
def _load_registry_from_db(self) -> Dict[str, Dict[str, Any]]:
|
||||
try:
|
||||
self._ensure_db()
|
||||
except (OSError, sqlite3.Error):
|
||||
return self._build_registry()
|
||||
|
||||
conn = sqlite3.connect(self._db_path)
|
||||
conn.row_factory = sqlite3.Row
|
||||
try:
|
||||
# Check for new schema columns
|
||||
conn.execute("SELECT explanation_summary FROM card_details LIMIT 1")
|
||||
|
||||
rows = conn.execute(
|
||||
"SELECT registry_key, position, explanation_summary, explanation_waite, "
|
||||
"interpretation, guidance, date_start_month, date_start_day, "
|
||||
"date_end_month, date_end_day, degrees, extra_json FROM card_details"
|
||||
).fetchall()
|
||||
except sqlite3.OperationalError:
|
||||
conn.close()
|
||||
if self._rebuild_db():
|
||||
conn = sqlite3.connect(self._db_path)
|
||||
conn.row_factory = sqlite3.Row
|
||||
try:
|
||||
rows = conn.execute(
|
||||
"SELECT registry_key, position, explanation_summary, explanation_waite, "
|
||||
"interpretation, guidance, date_start_month, date_start_day, "
|
||||
"date_end_month, date_end_day, degrees, extra_json FROM card_details"
|
||||
).fetchall()
|
||||
finally:
|
||||
pass
|
||||
else:
|
||||
return self._build_registry()
|
||||
|
||||
# Load keywords
|
||||
keywords_map: Dict[str, Dict[str, List[str]]] = {}
|
||||
try:
|
||||
kw_rows = conn.execute(
|
||||
"SELECT card_key, keyword, is_reversed FROM card_keywords"
|
||||
).fetchall()
|
||||
for k_row in kw_rows:
|
||||
ckey = k_row["card_key"]
|
||||
kw = k_row["keyword"]
|
||||
is_rev = bool(k_row["is_reversed"])
|
||||
if ckey not in keywords_map:
|
||||
keywords_map[ckey] = {"keywords": [], "reversed_keywords": []}
|
||||
if is_rev:
|
||||
keywords_map[ckey]["reversed_keywords"].append(kw)
|
||||
else:
|
||||
keywords_map[ckey]["keywords"].append(kw)
|
||||
except sqlite3.OperationalError:
|
||||
pass
|
||||
|
||||
# Load angels and demons
|
||||
angels_map: Dict[str, Dict[str, str]] = {}
|
||||
demons_map: Dict[str, Dict[str, str]] = {}
|
||||
try:
|
||||
angel_rows = conn.execute(
|
||||
"SELECT card_key, name, is_night FROM card_angels"
|
||||
).fetchall()
|
||||
for a_row in angel_rows:
|
||||
ckey = a_row["card_key"]
|
||||
name = a_row["name"]
|
||||
is_night = bool(a_row["is_night"])
|
||||
if ckey not in angels_map:
|
||||
angels_map[ckey] = {}
|
||||
field = "night_angel" if is_night else "day_angel"
|
||||
angels_map[ckey][field] = name
|
||||
|
||||
demon_rows = conn.execute(
|
||||
"SELECT card_key, name, is_night FROM card_demons"
|
||||
).fetchall()
|
||||
for d_row in demon_rows:
|
||||
ckey = d_row["card_key"]
|
||||
name = d_row["name"]
|
||||
is_night = bool(d_row["is_night"])
|
||||
if ckey not in demons_map:
|
||||
demons_map[ckey] = {}
|
||||
field = "night_demon" if is_night else "day_demon"
|
||||
demons_map[ckey][field] = name
|
||||
except sqlite3.OperationalError:
|
||||
pass
|
||||
|
||||
details: Dict[str, Dict[str, Any]] = {}
|
||||
for row in rows:
|
||||
entry: Dict[str, Any] = {}
|
||||
ex = {}
|
||||
if row["explanation_summary"]:
|
||||
ex["summary"] = row["explanation_summary"]
|
||||
if row["explanation_waite"]:
|
||||
ex["waite"] = row["explanation_waite"]
|
||||
|
||||
if row["extra_json"]:
|
||||
try:
|
||||
extra = json.loads(row["extra_json"])
|
||||
if "explanation" in extra and isinstance(extra["explanation"], dict):
|
||||
ex.update(extra.pop("explanation"))
|
||||
entry.update(self._restore_details(extra))
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
|
||||
if ex:
|
||||
entry["explanation"] = ex
|
||||
|
||||
if row["interpretation"]:
|
||||
entry["interpretation"] = row["interpretation"]
|
||||
if row["guidance"]:
|
||||
entry["guidance"] = row["guidance"]
|
||||
|
||||
kws = keywords_map.get(row["registry_key"], {})
|
||||
if kws.get("keywords"):
|
||||
entry["keywords"] = kws["keywords"]
|
||||
if kws.get("reversed_keywords"):
|
||||
entry["reversed_keywords"] = kws["reversed_keywords"]
|
||||
|
||||
# Merge angels from table
|
||||
angels = angels_map.get(row["registry_key"], {})
|
||||
for field in ["day_angel", "night_angel"]:
|
||||
if field in angels:
|
||||
entry[field] = angels[field]
|
||||
|
||||
# Merge demons from table
|
||||
demons = demons_map.get(row["registry_key"], {})
|
||||
for field in ["day_demon", "night_demon"]:
|
||||
if field in demons:
|
||||
entry[field] = demons[field]
|
||||
|
||||
if (
|
||||
row["date_start_month"]
|
||||
and row["date_start_day"]
|
||||
and row["date_end_month"]
|
||||
and row["date_end_day"]
|
||||
):
|
||||
entry["date_range"] = (
|
||||
(int(row["date_start_month"]), int(row["date_start_day"])),
|
||||
(int(row["date_end_month"]), int(row["date_end_day"])),
|
||||
)
|
||||
|
||||
if row["degrees"]:
|
||||
entry["degrees"] = row["degrees"]
|
||||
|
||||
details[row["registry_key"]] = entry
|
||||
|
||||
conn.close()
|
||||
return details
|
||||
|
||||
def _ensure_db(self) -> None:
|
||||
self._db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
conn = sqlite3.connect(self._db_path)
|
||||
try:
|
||||
# Main card details table
|
||||
conn.execute(
|
||||
"CREATE TABLE IF NOT EXISTS card_details ("
|
||||
"registry_key TEXT PRIMARY KEY, "
|
||||
"position INTEGER, "
|
||||
"explanation_summary TEXT, "
|
||||
"explanation_waite TEXT, "
|
||||
"interpretation TEXT, "
|
||||
"guidance TEXT, "
|
||||
"date_start_month INTEGER, "
|
||||
"date_start_day INTEGER, "
|
||||
"date_end_month INTEGER, "
|
||||
"date_end_day INTEGER, "
|
||||
"degrees TEXT, "
|
||||
"extra_json TEXT"
|
||||
")"
|
||||
)
|
||||
|
||||
# Keywords table (normalized)
|
||||
conn.execute(
|
||||
"CREATE TABLE IF NOT EXISTS card_keywords ("
|
||||
"id INTEGER PRIMARY KEY AUTOINCREMENT, "
|
||||
"card_key TEXT NOT NULL, "
|
||||
"keyword TEXT NOT NULL, "
|
||||
"is_reversed BOOLEAN DEFAULT 0, "
|
||||
"FOREIGN KEY(card_key) REFERENCES card_details(registry_key) ON DELETE CASCADE"
|
||||
")"
|
||||
)
|
||||
|
||||
# Angels table (normalized)
|
||||
conn.execute(
|
||||
"CREATE TABLE IF NOT EXISTS card_angels ("
|
||||
"id INTEGER PRIMARY KEY AUTOINCREMENT, "
|
||||
"card_key TEXT NOT NULL, "
|
||||
"name TEXT NOT NULL, "
|
||||
"is_night BOOLEAN DEFAULT 0, "
|
||||
"FOREIGN KEY(card_key) REFERENCES card_details(registry_key) ON DELETE CASCADE"
|
||||
")"
|
||||
)
|
||||
|
||||
# Demons table (normalized)
|
||||
conn.execute(
|
||||
"CREATE TABLE IF NOT EXISTS card_demons ("
|
||||
"id INTEGER PRIMARY KEY AUTOINCREMENT, "
|
||||
"card_key TEXT NOT NULL, "
|
||||
"name TEXT NOT NULL, "
|
||||
"is_night BOOLEAN DEFAULT 0, "
|
||||
"FOREIGN KEY(card_key) REFERENCES card_details(registry_key) ON DELETE CASCADE"
|
||||
")"
|
||||
)
|
||||
|
||||
# Indexes
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_card_keywords_card_key ON card_keywords(card_key)"
|
||||
)
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_card_angels_card_key ON card_angels(card_key)"
|
||||
)
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_card_demons_card_key ON card_demons(card_key)"
|
||||
)
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_card_details_position ON card_details(position)"
|
||||
)
|
||||
|
||||
row = conn.execute("SELECT COUNT(1) FROM card_details").fetchone()
|
||||
count = row[0] if row else 0
|
||||
if count == 0:
|
||||
self._seed_db(conn)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def _rebuild_db(self) -> bool:
|
||||
try:
|
||||
if self._db_path.exists():
|
||||
self._db_path.unlink()
|
||||
self._ensure_db()
|
||||
except (OSError, sqlite3.Error):
|
||||
return False
|
||||
return True
|
||||
|
||||
def _seed_db(self, conn: sqlite3.Connection) -> None:
|
||||
registry = self._build_registry()
|
||||
key_to_position = {key: pos for pos, key in self._position_map.items()}
|
||||
|
||||
details_rows = []
|
||||
keyword_rows = []
|
||||
angel_rows = []
|
||||
demon_rows = []
|
||||
|
||||
for key, details in registry.items():
|
||||
row_data, kws, angels, demons = self._prepare_rows(key, details)
|
||||
row_data["position"] = key_to_position.get(key)
|
||||
|
||||
details_rows.append(
|
||||
(
|
||||
key,
|
||||
row_data["position"],
|
||||
row_data["explanation_summary"],
|
||||
row_data["explanation_waite"],
|
||||
row_data["interpretation"],
|
||||
row_data["guidance"],
|
||||
row_data["date_start_month"],
|
||||
row_data["date_start_day"],
|
||||
row_data["date_end_month"],
|
||||
row_data["date_end_day"],
|
||||
row_data["degrees"],
|
||||
row_data["extra_json"],
|
||||
)
|
||||
)
|
||||
|
||||
for kw in kws:
|
||||
keyword_rows.append((key, kw["keyword"], kw["is_reversed"]))
|
||||
|
||||
for angel in angels:
|
||||
angel_rows.append((key, angel["name"], angel["is_night"]))
|
||||
|
||||
for demon in demons:
|
||||
demon_rows.append((key, demon["name"], demon["is_night"]))
|
||||
|
||||
conn.executemany(
|
||||
"INSERT OR REPLACE INTO card_details ("
|
||||
"registry_key, position, explanation_summary, explanation_waite, "
|
||||
"interpretation, guidance, date_start_month, date_start_day, "
|
||||
"date_end_month, date_end_day, degrees, extra_json"
|
||||
") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
details_rows,
|
||||
)
|
||||
|
||||
conn.executemany(
|
||||
"INSERT INTO card_keywords (card_key, keyword, is_reversed) VALUES (?, ?, ?)",
|
||||
keyword_rows,
|
||||
)
|
||||
|
||||
conn.executemany(
|
||||
"INSERT INTO card_angels (card_key, name, is_night) VALUES (?, ?, ?)",
|
||||
angel_rows,
|
||||
)
|
||||
|
||||
conn.executemany(
|
||||
"INSERT INTO card_demons (card_key, name, is_night) VALUES (?, ?, ?)",
|
||||
demon_rows,
|
||||
)
|
||||
|
||||
conn.commit()
|
||||
|
||||
def _prepare_rows(
|
||||
self, key: str, details: Dict[str, Any]
|
||||
) -> Tuple[Dict[str, Any], List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
|
||||
explanation = details.get("explanation") or {}
|
||||
summary = explanation.get("summary") if isinstance(explanation, dict) else None
|
||||
waite = explanation.get("waite") if isinstance(explanation, dict) else None
|
||||
|
||||
extra_explanation = {}
|
||||
if isinstance(explanation, dict):
|
||||
extra_explanation = {
|
||||
k: v for k, v in explanation.items() if k not in ("summary", "waite")
|
||||
}
|
||||
|
||||
keywords = details.get("keywords") or []
|
||||
reversed_keywords = details.get("reversed_keywords") or []
|
||||
|
||||
date_start_month = None
|
||||
date_start_day = None
|
||||
date_end_month = None
|
||||
date_end_day = None
|
||||
date_range = details.get("date_range")
|
||||
if isinstance(date_range, tuple) and len(date_range) == 2:
|
||||
start, end = date_range
|
||||
if (
|
||||
isinstance(start, tuple)
|
||||
and isinstance(end, tuple)
|
||||
and len(start) == 2
|
||||
and len(end) == 2
|
||||
):
|
||||
date_start_month, date_start_day = start
|
||||
date_end_month, date_end_day = end
|
||||
|
||||
extras = {k: v for k, v in details.items() if k not in self._DETAIL_KEYS}
|
||||
if extra_explanation:
|
||||
extras["explanation"] = extra_explanation
|
||||
|
||||
row = {
|
||||
"explanation_summary": summary,
|
||||
"explanation_waite": waite,
|
||||
"interpretation": details.get("interpretation") or None,
|
||||
"guidance": details.get("guidance") or None,
|
||||
"date_start_month": date_start_month,
|
||||
"date_start_day": date_start_day,
|
||||
"date_end_month": date_end_month,
|
||||
"date_end_day": date_end_day,
|
||||
"degrees": details.get("degrees") or None,
|
||||
"extra_json": json.dumps(self._to_json_compatible(extras))
|
||||
if extras
|
||||
else None,
|
||||
}
|
||||
|
||||
kws_list = []
|
||||
for k in keywords:
|
||||
kws_list.append({"keyword": k, "is_reversed": False})
|
||||
for k in reversed_keywords:
|
||||
kws_list.append({"keyword": k, "is_reversed": True})
|
||||
|
||||
angels_list = []
|
||||
if details.get("day_angel"):
|
||||
angels_list.append({"name": details["day_angel"], "is_night": False})
|
||||
if details.get("night_angel"):
|
||||
angels_list.append({"name": details["night_angel"], "is_night": True})
|
||||
|
||||
demons_list = []
|
||||
if details.get("day_demon"):
|
||||
demons_list.append({"name": details["day_demon"], "is_night": False})
|
||||
if details.get("night_demon"):
|
||||
demons_list.append({"name": details["night_demon"], "is_night": True})
|
||||
|
||||
return row, kws_list, angels_list, demons_list
|
||||
|
||||
def _details_to_row(self, details: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Deprecated internal method, kept incase of direct calls but _prepare_rows is used now."""
|
||||
row, _ = self._prepare_rows("", details)
|
||||
return row
|
||||
|
||||
def _restore_details(self, details: Dict[str, Any]) -> Dict[str, Any]:
|
||||
date_range = details.get("date_range")
|
||||
if isinstance(date_range, list) and len(date_range) == 2:
|
||||
start, end = date_range
|
||||
if (
|
||||
isinstance(start, list)
|
||||
and isinstance(end, list)
|
||||
and len(start) == 2
|
||||
and len(end) == 2
|
||||
):
|
||||
details["date_range"] = (tuple(start), tuple(end))
|
||||
return details
|
||||
|
||||
@classmethod
|
||||
def _to_json_compatible(cls, value: Any) -> Any:
|
||||
if isinstance(value, dict):
|
||||
return {k: cls._to_json_compatible(v) for k, v in value.items()}
|
||||
if isinstance(value, tuple):
|
||||
return [cls._to_json_compatible(v) for v in value]
|
||||
if isinstance(value, list):
|
||||
return [cls._to_json_compatible(v) for v in value]
|
||||
return value
|
||||
|
||||
@staticmethod
|
||||
def key_to_roman(key: int) -> str:
|
||||
@@ -668,6 +1118,14 @@ class CardDetailsRegistry:
|
||||
"keywords": [],
|
||||
"reversed_keywords": [],
|
||||
"guidance": "",
|
||||
# Date range: month/day (no year) — (start_month, start_day) to (end_month, end_day)
|
||||
"date_range": ((6, 21), (7, 1)),
|
||||
# Optional fields (can be populated later):
|
||||
"degrees": 0-10,
|
||||
"day_angel": "EIAEL",
|
||||
"night_angel": "HABUIAH",
|
||||
"day_demon": "Buer",
|
||||
"night_demon": "Bifrons",
|
||||
},
|
||||
"Three of Cups": {
|
||||
"explanation": {
|
||||
@@ -1152,8 +1610,74 @@ class CardDetailsRegistry:
|
||||
card.reversed_keywords = details.get("reversed_keywords", [])
|
||||
card.guidance = details.get("guidance", "")
|
||||
|
||||
# Optional temporal/astrological correspondences for minor cards
|
||||
card.date_range = details.get("date_range")
|
||||
card.degrees = details.get("degrees")
|
||||
card.day_angel = details.get("day_angel")
|
||||
card.night_angel = details.get("night_angel")
|
||||
card.day_demon = details.get("day_demon")
|
||||
card.night_demon = details.get("night_demon")
|
||||
|
||||
return True
|
||||
|
||||
def __getitem__(self, card_name: str) -> Optional[Dict[str, Any]]:
|
||||
"""Allow dict-like access: registry['Princess of Swords']"""
|
||||
return self.get(card_name)
|
||||
|
||||
def find_positions_by_month_day(self, month: int, day: int) -> list:
|
||||
"""Return list of registry positions whose date_range includes the month/day.
|
||||
|
||||
Args:
|
||||
month: Month (1-12)
|
||||
day: Day (1-31)
|
||||
|
||||
Returns:
|
||||
List of integer positions (1-78) matching the date
|
||||
"""
|
||||
from utils.dates import is_month_day_in_range
|
||||
|
||||
matches: list[int] = []
|
||||
for pos, key in self._position_map.items():
|
||||
details = self._details.get(key)
|
||||
if not details:
|
||||
continue
|
||||
dr = details.get("date_range")
|
||||
if not dr:
|
||||
continue
|
||||
start, end = dr
|
||||
try:
|
||||
if is_month_day_in_range(month, day, start, end):
|
||||
matches.append(pos)
|
||||
except ValueError:
|
||||
# Invalid date in registry or input; ignore
|
||||
continue
|
||||
return matches
|
||||
|
||||
def find_cards_by_month_day(self, month: int, day: int, load_details: bool = True) -> list:
|
||||
"""Return a list of Card objects whose date_range includes month/day.
|
||||
|
||||
If load_details is True, registry details will be loaded into returned Card objects.
|
||||
"""
|
||||
from tarot.deck import Deck
|
||||
from .loader import load_card_details
|
||||
|
||||
deck = Deck()
|
||||
positions = self.find_positions_by_month_day(month, day)
|
||||
cards = []
|
||||
for pos in positions:
|
||||
# Positions are 1-indexed; deck.cards is list
|
||||
try:
|
||||
card = deck.cards[pos - 1]
|
||||
except Exception:
|
||||
continue
|
||||
if load_details:
|
||||
load_card_details(card, self)
|
||||
cards.append(card)
|
||||
return cards
|
||||
|
||||
def find_cards_by_date_str(self, date_str: str, load_details: bool = True) -> list:
|
||||
"""Parse a date string and return matching Card objects."""
|
||||
from utils.dates import parse_month_day
|
||||
|
||||
month, day = parse_month_day(date_str)
|
||||
return self.find_cards_by_month_day(month, day, load_details)
|
||||
|
||||
297
src/tarot/db_import.py
Normal file
297
src/tarot/db_import.py
Normal file
@@ -0,0 +1,297 @@
|
||||
"""Helpers to import CSV/Excel data into the project's SQLite DB.
|
||||
|
||||
This module provides a small, dependency-light importer that supports CSV out of the
|
||||
box and Excel (.xlsx) when ``openpyxl`` is installed. It intentionally keeps the
|
||||
mapping simple (header-driven) and provides safe deduplication for normalized
|
||||
keyword/angel/demon tables.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
import sqlite3
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
try:
|
||||
from openpyxl import load_workbook # type: ignore
|
||||
except Exception: # pragma: no cover - optional dependency
|
||||
load_workbook = None # type: ignore
|
||||
|
||||
from tarot.card.details import CardDetailsRegistry
|
||||
|
||||
|
||||
# Simple helpers -----------------------------------------------------------
|
||||
|
||||
def _read_csv(path: Path, delimiter: str = ",") -> Iterable[Dict[str, str]]:
|
||||
with path.open("r", encoding="utf-8-sig", newline="") as fh:
|
||||
reader = csv.DictReader(fh, delimiter=delimiter)
|
||||
for row in reader:
|
||||
yield {k.strip(): (v.strip() if isinstance(v, str) else v) for k, v in row.items()}
|
||||
|
||||
|
||||
def _read_excel(path: Path, sheet: Optional[str] = None) -> Iterable[Dict[str, str]]:
|
||||
if load_workbook is None:
|
||||
raise RuntimeError("openpyxl is required to import Excel files - pip install openpyxl")
|
||||
wb = load_workbook(path, read_only=True, data_only=True)
|
||||
ws = wb[sheet] if sheet and sheet in wb.sheetnames else wb[wb.sheetnames[0]]
|
||||
it = ws.iter_rows(values_only=True)
|
||||
try:
|
||||
headers = next(it)
|
||||
except StopIteration:
|
||||
return
|
||||
headers = [h.strip() if isinstance(h, str) else str(h) for h in headers]
|
||||
for row in it:
|
||||
yield {str(h).strip(): (str(v).strip() if v is not None else "") for h, v in zip(headers, row)}
|
||||
|
||||
|
||||
def _get_conn(db_path: Optional[Path]) -> sqlite3.Connection:
|
||||
if db_path is None:
|
||||
reg = CardDetailsRegistry(use_db=True)
|
||||
db_path = reg._db_path # use the same resolution logic
|
||||
else:
|
||||
# Ensure DB is seeded if missing
|
||||
CardDetailsRegistry(use_db=True, db_path=db_path)
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
|
||||
def _coerce_bool(val: Optional[str]) -> int:
|
||||
if val is None:
|
||||
return 0
|
||||
if isinstance(val, bool):
|
||||
return 1 if val else 0
|
||||
s = str(val).strip().lower()
|
||||
return 1 if s in {"1", "true", "yes", "y", "t"} else 0
|
||||
|
||||
|
||||
def _parse_date_to_parts(val: Optional[str]) -> Optional[Tuple[int, int]]:
|
||||
if not val:
|
||||
return None
|
||||
s = str(val).strip()
|
||||
if not s:
|
||||
return None
|
||||
# Accept formats: M/D, MM/DD, YYYY-MM-DD
|
||||
if "/" in s:
|
||||
parts = s.split("/")
|
||||
if len(parts) >= 2:
|
||||
try:
|
||||
m = int(parts[0])
|
||||
d = int(parts[1])
|
||||
return (m, d)
|
||||
except Exception:
|
||||
return None
|
||||
if "-" in s:
|
||||
parts = s.split("-")
|
||||
try:
|
||||
m = int(parts[1])
|
||||
d = int(parts[2])
|
||||
return (m, d)
|
||||
except Exception:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
# Table-specific importers -----------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class ImportResult:
|
||||
inserted: int = 0
|
||||
skipped: int = 0
|
||||
updated: int = 0
|
||||
|
||||
|
||||
def import_keywords(rows: Iterable[Dict[str, str]], db_path: Optional[Path] = None) -> ImportResult:
|
||||
conn = _get_conn(db_path)
|
||||
cur = conn.cursor()
|
||||
res = ImportResult()
|
||||
try:
|
||||
for row in rows:
|
||||
card_key = row.get("card_key") or row.get("registry_key") or row.get("card")
|
||||
keyword = row.get("keyword") or row.get("kw")
|
||||
is_rev = _coerce_bool(row.get("is_reversed") or row.get("reversed"))
|
||||
if not card_key or not keyword:
|
||||
res.skipped += 1
|
||||
continue
|
||||
# deduplicate
|
||||
exists = cur.execute(
|
||||
"SELECT 1 FROM card_keywords WHERE card_key=? AND keyword=? AND is_reversed=?",
|
||||
(card_key, keyword, is_rev),
|
||||
).fetchone()
|
||||
if exists:
|
||||
res.skipped += 1
|
||||
continue
|
||||
cur.execute(
|
||||
"INSERT INTO card_keywords (card_key, keyword, is_reversed) VALUES (?, ?, ?)",
|
||||
(card_key, keyword, is_rev),
|
||||
)
|
||||
res.inserted += 1
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
return res
|
||||
|
||||
|
||||
def import_angels(rows: Iterable[Dict[str, str]], db_path: Optional[Path] = None) -> ImportResult:
|
||||
conn = _get_conn(db_path)
|
||||
cur = conn.cursor()
|
||||
res = ImportResult()
|
||||
try:
|
||||
for row in rows:
|
||||
card_key = row.get("card_key") or row.get("registry_key") or row.get("card")
|
||||
name = row.get("name") or row.get("angel")
|
||||
is_night = _coerce_bool(row.get("is_night") or row.get("night") or row.get("isNight"))
|
||||
if not card_key or not name:
|
||||
res.skipped += 1
|
||||
continue
|
||||
exists = cur.execute(
|
||||
"SELECT 1 FROM card_angels WHERE card_key=? AND name=? AND is_night=?",
|
||||
(card_key, name, is_night),
|
||||
).fetchone()
|
||||
if exists:
|
||||
res.skipped += 1
|
||||
continue
|
||||
cur.execute(
|
||||
"INSERT INTO card_angels (card_key, name, is_night) VALUES (?, ?, ?)",
|
||||
(card_key, name, is_night),
|
||||
)
|
||||
res.inserted += 1
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
return res
|
||||
|
||||
|
||||
def import_demons(rows: Iterable[Dict[str, str]], db_path: Optional[Path] = None) -> ImportResult:
|
||||
conn = _get_conn(db_path)
|
||||
cur = conn.cursor()
|
||||
res = ImportResult()
|
||||
try:
|
||||
for row in rows:
|
||||
card_key = row.get("card_key") or row.get("registry_key") or row.get("card")
|
||||
name = row.get("name") or row.get("demon")
|
||||
is_night = _coerce_bool(row.get("is_night") or row.get("night") or row.get("isNight"))
|
||||
if not card_key or not name:
|
||||
res.skipped += 1
|
||||
continue
|
||||
exists = cur.execute(
|
||||
"SELECT 1 FROM card_demons WHERE card_key=? AND name=? AND is_night=?",
|
||||
(card_key, name, is_night),
|
||||
).fetchone()
|
||||
if exists:
|
||||
res.skipped += 1
|
||||
continue
|
||||
cur.execute(
|
||||
"INSERT INTO card_demons (card_key, name, is_night) VALUES (?, ?, ?)",
|
||||
(card_key, name, is_night),
|
||||
)
|
||||
res.inserted += 1
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
return res
|
||||
|
||||
|
||||
def import_card_details(rows: Iterable[Dict[str, str]], db_path: Optional[Path] = None) -> ImportResult:
|
||||
conn = _get_conn(db_path)
|
||||
cur = conn.cursor()
|
||||
res = ImportResult()
|
||||
allowed = {
|
||||
"registry_key",
|
||||
"explanation_summary",
|
||||
"explanation_waite",
|
||||
"interpretation",
|
||||
"guidance",
|
||||
"date_start",
|
||||
"date_end",
|
||||
"date_start_month",
|
||||
"date_start_day",
|
||||
"date_end_month",
|
||||
"date_end_day",
|
||||
"degrees",
|
||||
"extra_json",
|
||||
}
|
||||
try:
|
||||
for row in rows:
|
||||
if not row:
|
||||
res.skipped += 1
|
||||
continue
|
||||
reg_key = row.get("registry_key") or row.get("card_key") or row.get("card")
|
||||
if not reg_key:
|
||||
res.skipped += 1
|
||||
continue
|
||||
# Process date short forms
|
||||
ds = _parse_date_to_parts(row.get("date_start"))
|
||||
de = _parse_date_to_parts(row.get("date_end"))
|
||||
if ds and de:
|
||||
row["date_start_month"], row["date_start_day"] = ds
|
||||
row["date_end_month"], row["date_end_day"] = de
|
||||
|
||||
# Build columns/values to upsert
|
||||
cols = ["registry_key"]
|
||||
vals = [reg_key]
|
||||
up_cols = []
|
||||
up_vals = []
|
||||
for k, v in row.items():
|
||||
if k == "registry_key":
|
||||
continue
|
||||
if k not in allowed:
|
||||
continue
|
||||
if v is None or v == "":
|
||||
continue
|
||||
cols.append(k)
|
||||
vals.append(v)
|
||||
up_cols.append(f"{k}=excluded.{k}")
|
||||
placeholders = ",".join("?" for _ in vals)
|
||||
sql = (
|
||||
f"INSERT INTO card_details ({','.join(cols)}) VALUES ({placeholders}) "
|
||||
f"ON CONFLICT(registry_key) DO UPDATE SET {', '.join(up_cols)}"
|
||||
)
|
||||
cur.execute(sql, tuple(vals))
|
||||
# rough heuristic: treat as inserted if no row existed
|
||||
if cur.rowcount > 0:
|
||||
res.inserted += 1
|
||||
else:
|
||||
res.updated += 1
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
return res
|
||||
|
||||
|
||||
# Public helper -----------------------------------------------------------
|
||||
|
||||
def import_file(
|
||||
path: Path,
|
||||
table: str,
|
||||
*,
|
||||
db_path: Optional[Path] = None,
|
||||
sheet: Optional[str] = None,
|
||||
delimiter: str = ",",
|
||||
) -> ImportResult:
|
||||
"""Import a CSV or Excel file into one of the supported tables.
|
||||
|
||||
Supported tables: ``card_keywords``, ``card_angels``, ``card_demons``, ``card_details``.
|
||||
|
||||
The importer is header-driven and will look for reasonable alternate column
|
||||
names (e.g., ``card_key`` / ``card`` / ``registry_key``).
|
||||
"""
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(path)
|
||||
suffix = path.suffix.lower()
|
||||
if suffix in {".xls", ".xlsx"}:
|
||||
rows = _read_excel(path, sheet)
|
||||
else:
|
||||
rows = _read_csv(path, delimiter)
|
||||
|
||||
table = table.lower()
|
||||
if table == "card_keywords":
|
||||
return import_keywords(rows, db_path)
|
||||
if table == "card_angels":
|
||||
return import_angels(rows, db_path)
|
||||
if table == "card_demons":
|
||||
return import_demons(rows, db_path)
|
||||
if table == "card_details":
|
||||
return import_card_details(rows, db_path)
|
||||
raise ValueError(f"Unsupported table: {table}")
|
||||
@@ -73,6 +73,17 @@ class Card:
|
||||
# Image path for custom deck images
|
||||
image_path: Optional[str] = None
|
||||
|
||||
# Optional astrological/temporal correspondences (used primarily for Minor Arcana)
|
||||
# - date_range: ((start_month, start_day), (end_month, end_day))
|
||||
# - degrees: free-form degree information (string)
|
||||
# - day/night angels and demons
|
||||
date_range: Optional[Tuple[Tuple[int, int], Tuple[int, int]]] = None
|
||||
degrees: Optional[str] = None
|
||||
day_angel: Optional[str] = None
|
||||
night_angel: Optional[str] = None
|
||||
day_demon: Optional[str] = None
|
||||
night_demon: Optional[str] = None
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"{self.number}. {self.name}"
|
||||
|
||||
|
||||
139
src/utils/dates.py
Normal file
139
src/utils/dates.py
Normal file
@@ -0,0 +1,139 @@
|
||||
"""
|
||||
Date utilities for month/day ranges (no year).
|
||||
|
||||
Provides parsing and range-checking functions for month/day pairs
|
||||
(e.g., (6,21) meaning June 21).
|
||||
|
||||
Functions:
|
||||
- parse_month_day: parse '6/21', '06-21', '2026-06-21', 'June 21', 'Jun 21'
|
||||
- is_month_day_in_range: test whether a month/day lies within a start/end pair
|
||||
- month_day_to_day_of_year: convert a month/day to day-of-year for comparisons
|
||||
- format_month_day: display nicely
|
||||
|
||||
Note: Uses a fixed non-leap reference year for day-of-year calculations.
|
||||
"""
|
||||
|
||||
import re
|
||||
import calendar
|
||||
from datetime import date
|
||||
from typing import Tuple
|
||||
|
||||
# Reference non-leap year used for day-of-year calculations
|
||||
_REFERENCE_YEAR = 2001
|
||||
|
||||
_MONTH_NAME_MAP = {
|
||||
'jan': 1,
|
||||
'january': 1,
|
||||
'feb': 2,
|
||||
'february': 2,
|
||||
'mar': 3,
|
||||
'march': 3,
|
||||
'apr': 4,
|
||||
'april': 4,
|
||||
'may': 5,
|
||||
'jun': 6,
|
||||
'june': 6,
|
||||
'jul': 7,
|
||||
'july': 7,
|
||||
'aug': 8,
|
||||
'august': 8,
|
||||
'sep': 9,
|
||||
'sept': 9,
|
||||
'september': 9,
|
||||
'oct': 10,
|
||||
'october': 10,
|
||||
'nov': 11,
|
||||
'november': 11,
|
||||
'dec': 12,
|
||||
'december': 12,
|
||||
}
|
||||
|
||||
|
||||
def month_day_to_day_of_year(month: int, day: int) -> int:
|
||||
"""Convert (month, day) to day-of-year using a non-leap reference year."""
|
||||
try:
|
||||
dt = date(_REFERENCE_YEAR, month, day)
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Invalid month/day: {month}/{day}: {e}")
|
||||
return dt.timetuple().tm_yday
|
||||
|
||||
|
||||
def is_month_day_in_range(
|
||||
month: int, day: int, start: Tuple[int, int], end: Tuple[int, int]
|
||||
) -> bool:
|
||||
"""Return True if (month, day) is within the inclusive range start->end.
|
||||
|
||||
Handles ranges that cross year boundary (e.g., Dec 20 -> Jan 5).
|
||||
"""
|
||||
day_num = month_day_to_day_of_year(month, day)
|
||||
start_num = month_day_to_day_of_year(*start)
|
||||
end_num = month_day_to_day_of_year(*end)
|
||||
|
||||
if start_num <= end_num:
|
||||
return start_num <= day_num <= end_num
|
||||
# Wrap-around
|
||||
return day_num >= start_num or day_num <= end_num
|
||||
|
||||
|
||||
def parse_month_day(s: str) -> Tuple[int, int]:
|
||||
"""Parse a date string and return (month, day).
|
||||
|
||||
Accepted formats:
|
||||
- MM/DD or M/D
|
||||
- MM-DD
|
||||
- YYYY-MM-DD or YYYY/MM/DD (year ignored)
|
||||
- MonthName D or Mon D (e.g., 'June 21' or 'Jun 21')
|
||||
|
||||
Raises ValueError if cannot parse or the date is invalid.
|
||||
"""
|
||||
if not s or not isinstance(s, str):
|
||||
raise ValueError("Empty date string")
|
||||
|
||||
s = s.strip()
|
||||
|
||||
# Numeric forms: MM/DD or MM-DD
|
||||
m = re.match(r"^(?P<m>\d{1,2})\s*[/-]\s*(?P<d>\d{1,2})$", s)
|
||||
if m:
|
||||
month = int(m.group("m"))
|
||||
day = int(m.group("d"))
|
||||
# validate
|
||||
month_day_to_day_of_year(month, day)
|
||||
return month, day
|
||||
|
||||
# ISO with year: YYYY-MM-DD or YYYY/MM/DD
|
||||
m = re.match(r"^(?P<y>\d{4})[/-](?P<m>\d{1,2})[/-](?P<d>\d{1,2})$", s)
|
||||
if m:
|
||||
month = int(m.group("m"))
|
||||
day = int(m.group("d"))
|
||||
month_day_to_day_of_year(month, day)
|
||||
return month, day
|
||||
|
||||
# Month name forms: 'June 21' or '21 June'
|
||||
parts = re.split(r"[ ,]+", s)
|
||||
if len(parts) == 2:
|
||||
a, b = parts
|
||||
# If a is month name
|
||||
a_low = a.lower()
|
||||
if a_low in _MONTH_NAME_MAP:
|
||||
month = _MONTH_NAME_MAP[a_low]
|
||||
day = int(b)
|
||||
month_day_to_day_of_year(month, day)
|
||||
return month, day
|
||||
# If b is month name (e.g., '21 June')
|
||||
b_low = b.lower()
|
||||
if b_low in _MONTH_NAME_MAP:
|
||||
month = _MONTH_NAME_MAP[b_low]
|
||||
day = int(a)
|
||||
month_day_to_day_of_year(month, day)
|
||||
return month, day
|
||||
|
||||
raise ValueError(f"Unrecognized date format: '{s}'")
|
||||
|
||||
|
||||
def format_month_day(month: int, day: int) -> str:
|
||||
"""Return a short human-friendly representation like 'Jun 21'."""
|
||||
if not (1 <= month <= 12):
|
||||
raise ValueError("month must be 1..12")
|
||||
# Use abbreviated month name
|
||||
month_name = calendar.month_abbr[month]
|
||||
return f"{month_name} {day}"
|
||||
@@ -85,6 +85,64 @@ def _matches_filter(obj: Any, key: str, value: Any) -> bool:
|
||||
"""
|
||||
attr_value = getattr(obj, key, None)
|
||||
|
||||
# Special handling for date filters: accept strings like '6/21', 'Jun 21',
|
||||
# ISO dates (YYYY-MM-DD), or tuples like (6, 21). The object's `date_range`
|
||||
# (if present) will be checked; if not, fall back to registry lookup by
|
||||
# numeric position when available (e.g., Card.number).
|
||||
if key == "date":
|
||||
from utils.dates import parse_month_day, is_month_day_in_range
|
||||
|
||||
# Normalize the provided value(s) into a list of checks
|
||||
values_to_check = []
|
||||
if isinstance(value, str) and "," in value:
|
||||
values_to_check = [v.strip() for v in value.split(",")]
|
||||
elif isinstance(value, (list, tuple)):
|
||||
# Could be a single tuple (6,21) or list of such tuples/strings
|
||||
if len(value) and isinstance(value[0], (list, tuple)):
|
||||
values_to_check = list(value)
|
||||
else:
|
||||
values_to_check = [value]
|
||||
else:
|
||||
values_to_check = [value]
|
||||
|
||||
# Obtain the object's date_range, fallback to registry if absent
|
||||
dr = attr_value
|
||||
if dr is None:
|
||||
obj_num = getattr(obj, "number", None)
|
||||
if obj_num is not None:
|
||||
try:
|
||||
from tarot.card.details import CardDetailsRegistry
|
||||
|
||||
registry = CardDetailsRegistry()
|
||||
details = registry.get_by_position(obj_num)
|
||||
if details is not None:
|
||||
dr = details.get("date_range")
|
||||
except Exception:
|
||||
dr = None
|
||||
|
||||
if dr is None:
|
||||
return False
|
||||
|
||||
start, end = dr
|
||||
|
||||
for check_value in values_to_check:
|
||||
try:
|
||||
if isinstance(check_value, (list, tuple)):
|
||||
month, day = int(check_value[0]), int(check_value[1])
|
||||
elif isinstance(check_value, str):
|
||||
month, day = parse_month_day(check_value)
|
||||
else:
|
||||
# Fallback: try to interpret as sequence
|
||||
month, day = int(check_value[0]), int(check_value[1])
|
||||
|
||||
if is_month_day_in_range(month, day, start, end):
|
||||
return True
|
||||
except Exception:
|
||||
# Ignore parse/validation errors per individual check
|
||||
continue
|
||||
|
||||
return False
|
||||
|
||||
if attr_value is None:
|
||||
return False
|
||||
|
||||
|
||||
Reference in New Issue
Block a user