diff --git a/cli.py b/cli.py index b12efbe..eb45eb4 100644 --- a/cli.py +++ b/cli.py @@ -375,6 +375,35 @@ def search( typer.echo("No cards matched that filter.") raise typer.Exit(code=0) + +@app.command("import-db") +def import_db( + file: Path = typer.Argument(..., help="CSV or XLSX file to import"), + table: str = typer.Option(..., "--table", "-t", help="Target table to import into"), + sheet: Optional[str] = typer.Option(None, "--sheet", help="Excel sheet name (optional)"), + delimiter: str = typer.Option(",", "--delimiter", "-d", help="CSV delimiter (default ',')"), + db_path: Optional[Path] = typer.Option(None, "--db", help="Path to target SQLite DB"), +) -> None: + """Import CSV or Excel data into the project's DB tables. + + Examples: + tarot import-db angels.xlsx --table card_angels + tarot import-db angels.csv --table card_angels --delimiter ',' + """ + try: + from tarot.db_import import import_file + except Exception as exc: # pragma: no cover - import errors bubble up in tests + typer.echo(f"Error loading import helper: {exc}") + raise typer.Exit(code=1) + + try: + res = import_file(file, table, db_path=db_path, sheet=sheet, delimiter=delimiter) + except Exception as exc: # pragma: no cover - errors will be surfaced for visibility + typer.echo(f"Import failed: {exc}") + raise typer.Exit(code=1) + + typer.echo(f"Inserted: {res.inserted}; Skipped: {res.skipped}; Updated: {res.updated}") + def _doc(obj: Any, fallback: str) -> str: return inspect.getdoc(obj) or fallback diff --git a/cli_renderers.py b/cli_renderers.py index be239db..1da557c 100644 --- a/cli_renderers.py +++ b/cli_renderers.py @@ -6,7 +6,7 @@ specialized handling for planets plus generic list/dict/object renderers. from __future__ import annotations -from typing import Any, Dict, List, Optional, Sequence +from typing import Any, Dict, List, Optional, Sequence, Tuple from rich import box from rich.console import Console, Group @@ -15,6 +15,8 @@ from rich.table import Table from tarot import Card from utils.attributes import Color, Planet +from utils.dates import format_month_day +from utils.object_formatting import format_value, get_object_attributes, is_nested_object class Renderer: @@ -69,27 +71,10 @@ class Renderer: self.console.print(table) def print_cards_table(self, cards: List[Card]) -> None: - table = Table(title="Cards", show_lines=True) - table.add_column("#", justify="right") - table.add_column("Name") - table.add_column("Arcana") - table.add_column("Suit") - table.add_column("Pip/Court") - - for card in cards: - suit = getattr(getattr(card, "suit", None), "name", "") - pip = str(getattr(card, "pip", "")) if getattr(card, "pip", None) else "" - court = getattr(card, "court_rank", "") - pip_display = court or pip - table.add_row( - str(getattr(card, "number", "")), - getattr(card, "name", ""), - getattr(card, "arcana", ""), - suit, - pip_display, - ) - - self.console.print(table) + for idx, card in enumerate(cards): + self._render_card(card) + if idx < len(cards) - 1: + self.console.print() # Internal renderers ------------------------------------------------- def _render_card(self, card: Card) -> None: @@ -99,7 +84,6 @@ class Renderer: arcana = getattr(card, "arcana", "") suit_obj = getattr(card, "suit", None) suit = getattr(suit_obj, "name", "") if suit_obj is not None else "" - pip = getattr(card, "pip", None) number = getattr(card, "number", "") keywords = getattr(card, "keywords", None) or [] meaning = getattr(card, "meaning", None) @@ -112,18 +96,7 @@ class Renderer: top_right = f"#{number}" if number not in (None, "") else "" top.add_row(top_left, top_center, top_right) - body_lines = [f"Arcana: {arcana or '-'}"] - if suit: - body_lines.append(f"Suit: {suit}") - if pip: - body_lines.append(f"Pip: {pip}") - if getattr(card, "court_rank", ""): - body_lines.append(f"Court Rank: {card.court_rank}") - if keywords: - body_lines.append(f"Keywords: {', '.join(keywords)}") - guidance = getattr(card, "guidance", None) - if guidance: - body_lines.append(f"Guidance: {guidance}") + body_lines = self._card_field_lines(card) body = Panel("\n".join(body_lines), box=box.MINIMAL, padding=(1, 1)) bottom = Table( @@ -292,6 +265,68 @@ class Renderer: safe_val = value if value not in (None, "") else "—" return f"{label}:\n{safe_val}" + def _card_field_lines(self, card: Card) -> List[str]: + exclude = {"name", "number", "meaning", "keywords"} + lines: List[str] = [] + for attr_name, attr_value in get_object_attributes(card): + if attr_name in exclude: + continue + if attr_value in (None, "", [], {}, ()): # Skip empty values + continue + + label = self._humanize_key(attr_name).title() + if is_nested_object(attr_value) and not hasattr(attr_value, "name"): + lines.append(f"{label}:") + nested = format_value(attr_value, indent=2) + lines.extend(nested.splitlines()) + continue + + lines.append(f"{label}: {self._format_card_value(attr_value)}") + + if not lines: + lines.append("(no fields)") + return lines + + def _format_card_value(self, value: Any) -> str: + if value is None: + return "—" + if isinstance(value, (str, int, float, bool)): + return str(value) + if isinstance(value, tuple): + date_range = self._format_date_range(value) + if date_range is not None: + return date_range + return ", ".join(self._format_card_value(v) for v in value) + if isinstance(value, list): + return ", ".join(self._format_card_value(v) for v in value) + if isinstance(value, dict): + return ", ".join( + f"{self._humanize_key(str(k)).title()}: {self._format_card_value(v)}" + for k, v in value.items() + ) + if hasattr(value, "name"): + return str(getattr(value, "name", value)) + if is_nested_object(value): + return format_value(value, indent=2) + return str(value) + + @staticmethod + def _format_date_range(value: Tuple[Any, ...]) -> Optional[str]: + if len(value) != 2: + return None + start, end = value + if not ( + isinstance(start, tuple) + and isinstance(end, tuple) + and len(start) == 2 + and len(end) == 2 + ): + return None + try: + return f"{format_month_day(start)} - {format_month_day(end)}" + except Exception: + return None + class PlanetRenderer: """Type-specific table rendering for planets.""" diff --git a/src/tarot/card/details.py b/src/tarot/card/details.py index d04a340..b9018c8 100644 --- a/src/tarot/card/details.py +++ b/src/tarot/card/details.py @@ -21,7 +21,11 @@ Usage: registry.load_into_card(card) """ -from typing import TYPE_CHECKING, Any, Dict, Optional +import json +import os +import sqlite3 +from pathlib import Path +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple from tarot.constants import build_position_map @@ -43,11 +47,457 @@ class CardDetailsRegistry: - 65-78: Wands (same structure) """ - def __init__(self) -> None: - """Initialize the card details registry with interpretive data.""" - self._details: Dict[str, Dict[str, Any]] = self._build_registry() + _DETAIL_KEYS = { + "explanation", + "interpretation", + "keywords", + "reversed_keywords", + "guidance", + "date_range", + "degrees", + "day_angel", + "night_angel", + "day_demon", + "night_demon", + } + + def __init__(self, *, use_db: bool = True, db_path: Optional[Path] = None) -> None: + """Initialize the card details registry with interpretive data. + + Args: + use_db: If True, load details from a local SQLite database (creating it + on first run). If False, use the in-code registry only. + db_path: Optional custom path for the SQLite file. + """ # Map card positions (1-78) to registry keys self._position_map = self._build_position_map() + self._use_db = use_db + self._db_path = self._resolve_db_path(db_path) + + if self._use_db: + self._details = self._load_registry_from_db() + else: + self._details = self._build_registry() + + @staticmethod + def _resolve_db_path(db_path: Optional[Path]) -> Path: + """Resolve the database path with priority order. + + Priority: + 1. Explicit db_path argument + 2. PY_TAROT_DB_PATH or TAROT_DB_PATH environment variable + 3. Project root (where pyproject.toml is located) + 4. User home directory ~/.py-tarot/ + """ + if db_path is not None: + return Path(db_path) + + env_path = os.getenv("PY_TAROT_DB_PATH") or os.getenv("TAROT_DB_PATH") + if env_path: + return Path(env_path) + + # Try to find project root by looking for pyproject.toml + current = Path(__file__).resolve() + for parent in current.parents: + if (parent / "pyproject.toml").exists() or (parent / ".git").exists(): + return parent / "tarot.db" + + # Fallback to user home directory + return Path.home() / ".py-tarot" / "tarot.db" + + def _load_registry_from_db(self) -> Dict[str, Dict[str, Any]]: + try: + self._ensure_db() + except (OSError, sqlite3.Error): + return self._build_registry() + + conn = sqlite3.connect(self._db_path) + conn.row_factory = sqlite3.Row + try: + # Check for new schema columns + conn.execute("SELECT explanation_summary FROM card_details LIMIT 1") + + rows = conn.execute( + "SELECT registry_key, position, explanation_summary, explanation_waite, " + "interpretation, guidance, date_start_month, date_start_day, " + "date_end_month, date_end_day, degrees, extra_json FROM card_details" + ).fetchall() + except sqlite3.OperationalError: + conn.close() + if self._rebuild_db(): + conn = sqlite3.connect(self._db_path) + conn.row_factory = sqlite3.Row + try: + rows = conn.execute( + "SELECT registry_key, position, explanation_summary, explanation_waite, " + "interpretation, guidance, date_start_month, date_start_day, " + "date_end_month, date_end_day, degrees, extra_json FROM card_details" + ).fetchall() + finally: + pass + else: + return self._build_registry() + + # Load keywords + keywords_map: Dict[str, Dict[str, List[str]]] = {} + try: + kw_rows = conn.execute( + "SELECT card_key, keyword, is_reversed FROM card_keywords" + ).fetchall() + for k_row in kw_rows: + ckey = k_row["card_key"] + kw = k_row["keyword"] + is_rev = bool(k_row["is_reversed"]) + if ckey not in keywords_map: + keywords_map[ckey] = {"keywords": [], "reversed_keywords": []} + if is_rev: + keywords_map[ckey]["reversed_keywords"].append(kw) + else: + keywords_map[ckey]["keywords"].append(kw) + except sqlite3.OperationalError: + pass + + # Load angels and demons + angels_map: Dict[str, Dict[str, str]] = {} + demons_map: Dict[str, Dict[str, str]] = {} + try: + angel_rows = conn.execute( + "SELECT card_key, name, is_night FROM card_angels" + ).fetchall() + for a_row in angel_rows: + ckey = a_row["card_key"] + name = a_row["name"] + is_night = bool(a_row["is_night"]) + if ckey not in angels_map: + angels_map[ckey] = {} + field = "night_angel" if is_night else "day_angel" + angels_map[ckey][field] = name + + demon_rows = conn.execute( + "SELECT card_key, name, is_night FROM card_demons" + ).fetchall() + for d_row in demon_rows: + ckey = d_row["card_key"] + name = d_row["name"] + is_night = bool(d_row["is_night"]) + if ckey not in demons_map: + demons_map[ckey] = {} + field = "night_demon" if is_night else "day_demon" + demons_map[ckey][field] = name + except sqlite3.OperationalError: + pass + + details: Dict[str, Dict[str, Any]] = {} + for row in rows: + entry: Dict[str, Any] = {} + ex = {} + if row["explanation_summary"]: + ex["summary"] = row["explanation_summary"] + if row["explanation_waite"]: + ex["waite"] = row["explanation_waite"] + + if row["extra_json"]: + try: + extra = json.loads(row["extra_json"]) + if "explanation" in extra and isinstance(extra["explanation"], dict): + ex.update(extra.pop("explanation")) + entry.update(self._restore_details(extra)) + except (TypeError, ValueError): + pass + + if ex: + entry["explanation"] = ex + + if row["interpretation"]: + entry["interpretation"] = row["interpretation"] + if row["guidance"]: + entry["guidance"] = row["guidance"] + + kws = keywords_map.get(row["registry_key"], {}) + if kws.get("keywords"): + entry["keywords"] = kws["keywords"] + if kws.get("reversed_keywords"): + entry["reversed_keywords"] = kws["reversed_keywords"] + + # Merge angels from table + angels = angels_map.get(row["registry_key"], {}) + for field in ["day_angel", "night_angel"]: + if field in angels: + entry[field] = angels[field] + + # Merge demons from table + demons = demons_map.get(row["registry_key"], {}) + for field in ["day_demon", "night_demon"]: + if field in demons: + entry[field] = demons[field] + + if ( + row["date_start_month"] + and row["date_start_day"] + and row["date_end_month"] + and row["date_end_day"] + ): + entry["date_range"] = ( + (int(row["date_start_month"]), int(row["date_start_day"])), + (int(row["date_end_month"]), int(row["date_end_day"])), + ) + + if row["degrees"]: + entry["degrees"] = row["degrees"] + + details[row["registry_key"]] = entry + + conn.close() + return details + + def _ensure_db(self) -> None: + self._db_path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(self._db_path) + try: + # Main card details table + conn.execute( + "CREATE TABLE IF NOT EXISTS card_details (" + "registry_key TEXT PRIMARY KEY, " + "position INTEGER, " + "explanation_summary TEXT, " + "explanation_waite TEXT, " + "interpretation TEXT, " + "guidance TEXT, " + "date_start_month INTEGER, " + "date_start_day INTEGER, " + "date_end_month INTEGER, " + "date_end_day INTEGER, " + "degrees TEXT, " + "extra_json TEXT" + ")" + ) + + # Keywords table (normalized) + conn.execute( + "CREATE TABLE IF NOT EXISTS card_keywords (" + "id INTEGER PRIMARY KEY AUTOINCREMENT, " + "card_key TEXT NOT NULL, " + "keyword TEXT NOT NULL, " + "is_reversed BOOLEAN DEFAULT 0, " + "FOREIGN KEY(card_key) REFERENCES card_details(registry_key) ON DELETE CASCADE" + ")" + ) + + # Angels table (normalized) + conn.execute( + "CREATE TABLE IF NOT EXISTS card_angels (" + "id INTEGER PRIMARY KEY AUTOINCREMENT, " + "card_key TEXT NOT NULL, " + "name TEXT NOT NULL, " + "is_night BOOLEAN DEFAULT 0, " + "FOREIGN KEY(card_key) REFERENCES card_details(registry_key) ON DELETE CASCADE" + ")" + ) + + # Demons table (normalized) + conn.execute( + "CREATE TABLE IF NOT EXISTS card_demons (" + "id INTEGER PRIMARY KEY AUTOINCREMENT, " + "card_key TEXT NOT NULL, " + "name TEXT NOT NULL, " + "is_night BOOLEAN DEFAULT 0, " + "FOREIGN KEY(card_key) REFERENCES card_details(registry_key) ON DELETE CASCADE" + ")" + ) + + # Indexes + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_card_keywords_card_key ON card_keywords(card_key)" + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_card_angels_card_key ON card_angels(card_key)" + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_card_demons_card_key ON card_demons(card_key)" + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_card_details_position ON card_details(position)" + ) + + row = conn.execute("SELECT COUNT(1) FROM card_details").fetchone() + count = row[0] if row else 0 + if count == 0: + self._seed_db(conn) + finally: + conn.close() + + def _rebuild_db(self) -> bool: + try: + if self._db_path.exists(): + self._db_path.unlink() + self._ensure_db() + except (OSError, sqlite3.Error): + return False + return True + + def _seed_db(self, conn: sqlite3.Connection) -> None: + registry = self._build_registry() + key_to_position = {key: pos for pos, key in self._position_map.items()} + + details_rows = [] + keyword_rows = [] + angel_rows = [] + demon_rows = [] + + for key, details in registry.items(): + row_data, kws, angels, demons = self._prepare_rows(key, details) + row_data["position"] = key_to_position.get(key) + + details_rows.append( + ( + key, + row_data["position"], + row_data["explanation_summary"], + row_data["explanation_waite"], + row_data["interpretation"], + row_data["guidance"], + row_data["date_start_month"], + row_data["date_start_day"], + row_data["date_end_month"], + row_data["date_end_day"], + row_data["degrees"], + row_data["extra_json"], + ) + ) + + for kw in kws: + keyword_rows.append((key, kw["keyword"], kw["is_reversed"])) + + for angel in angels: + angel_rows.append((key, angel["name"], angel["is_night"])) + + for demon in demons: + demon_rows.append((key, demon["name"], demon["is_night"])) + + conn.executemany( + "INSERT OR REPLACE INTO card_details (" + "registry_key, position, explanation_summary, explanation_waite, " + "interpretation, guidance, date_start_month, date_start_day, " + "date_end_month, date_end_day, degrees, extra_json" + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + details_rows, + ) + + conn.executemany( + "INSERT INTO card_keywords (card_key, keyword, is_reversed) VALUES (?, ?, ?)", + keyword_rows, + ) + + conn.executemany( + "INSERT INTO card_angels (card_key, name, is_night) VALUES (?, ?, ?)", + angel_rows, + ) + + conn.executemany( + "INSERT INTO card_demons (card_key, name, is_night) VALUES (?, ?, ?)", + demon_rows, + ) + + conn.commit() + + def _prepare_rows( + self, key: str, details: Dict[str, Any] + ) -> Tuple[Dict[str, Any], List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: + explanation = details.get("explanation") or {} + summary = explanation.get("summary") if isinstance(explanation, dict) else None + waite = explanation.get("waite") if isinstance(explanation, dict) else None + + extra_explanation = {} + if isinstance(explanation, dict): + extra_explanation = { + k: v for k, v in explanation.items() if k not in ("summary", "waite") + } + + keywords = details.get("keywords") or [] + reversed_keywords = details.get("reversed_keywords") or [] + + date_start_month = None + date_start_day = None + date_end_month = None + date_end_day = None + date_range = details.get("date_range") + if isinstance(date_range, tuple) and len(date_range) == 2: + start, end = date_range + if ( + isinstance(start, tuple) + and isinstance(end, tuple) + and len(start) == 2 + and len(end) == 2 + ): + date_start_month, date_start_day = start + date_end_month, date_end_day = end + + extras = {k: v for k, v in details.items() if k not in self._DETAIL_KEYS} + if extra_explanation: + extras["explanation"] = extra_explanation + + row = { + "explanation_summary": summary, + "explanation_waite": waite, + "interpretation": details.get("interpretation") or None, + "guidance": details.get("guidance") or None, + "date_start_month": date_start_month, + "date_start_day": date_start_day, + "date_end_month": date_end_month, + "date_end_day": date_end_day, + "degrees": details.get("degrees") or None, + "extra_json": json.dumps(self._to_json_compatible(extras)) + if extras + else None, + } + + kws_list = [] + for k in keywords: + kws_list.append({"keyword": k, "is_reversed": False}) + for k in reversed_keywords: + kws_list.append({"keyword": k, "is_reversed": True}) + + angels_list = [] + if details.get("day_angel"): + angels_list.append({"name": details["day_angel"], "is_night": False}) + if details.get("night_angel"): + angels_list.append({"name": details["night_angel"], "is_night": True}) + + demons_list = [] + if details.get("day_demon"): + demons_list.append({"name": details["day_demon"], "is_night": False}) + if details.get("night_demon"): + demons_list.append({"name": details["night_demon"], "is_night": True}) + + return row, kws_list, angels_list, demons_list + + def _details_to_row(self, details: Dict[str, Any]) -> Dict[str, Any]: + """Deprecated internal method, kept incase of direct calls but _prepare_rows is used now.""" + row, _ = self._prepare_rows("", details) + return row + + def _restore_details(self, details: Dict[str, Any]) -> Dict[str, Any]: + date_range = details.get("date_range") + if isinstance(date_range, list) and len(date_range) == 2: + start, end = date_range + if ( + isinstance(start, list) + and isinstance(end, list) + and len(start) == 2 + and len(end) == 2 + ): + details["date_range"] = (tuple(start), tuple(end)) + return details + + @classmethod + def _to_json_compatible(cls, value: Any) -> Any: + if isinstance(value, dict): + return {k: cls._to_json_compatible(v) for k, v in value.items()} + if isinstance(value, tuple): + return [cls._to_json_compatible(v) for v in value] + if isinstance(value, list): + return [cls._to_json_compatible(v) for v in value] + return value @staticmethod def key_to_roman(key: int) -> str: @@ -668,6 +1118,14 @@ class CardDetailsRegistry: "keywords": [], "reversed_keywords": [], "guidance": "", + # Date range: month/day (no year) — (start_month, start_day) to (end_month, end_day) + "date_range": ((6, 21), (7, 1)), + # Optional fields (can be populated later): + "degrees": 0-10, + "day_angel": "EIAEL", + "night_angel": "HABUIAH", + "day_demon": "Buer", + "night_demon": "Bifrons", }, "Three of Cups": { "explanation": { @@ -1152,8 +1610,74 @@ class CardDetailsRegistry: card.reversed_keywords = details.get("reversed_keywords", []) card.guidance = details.get("guidance", "") + # Optional temporal/astrological correspondences for minor cards + card.date_range = details.get("date_range") + card.degrees = details.get("degrees") + card.day_angel = details.get("day_angel") + card.night_angel = details.get("night_angel") + card.day_demon = details.get("day_demon") + card.night_demon = details.get("night_demon") + return True def __getitem__(self, card_name: str) -> Optional[Dict[str, Any]]: """Allow dict-like access: registry['Princess of Swords']""" return self.get(card_name) + + def find_positions_by_month_day(self, month: int, day: int) -> list: + """Return list of registry positions whose date_range includes the month/day. + + Args: + month: Month (1-12) + day: Day (1-31) + + Returns: + List of integer positions (1-78) matching the date + """ + from utils.dates import is_month_day_in_range + + matches: list[int] = [] + for pos, key in self._position_map.items(): + details = self._details.get(key) + if not details: + continue + dr = details.get("date_range") + if not dr: + continue + start, end = dr + try: + if is_month_day_in_range(month, day, start, end): + matches.append(pos) + except ValueError: + # Invalid date in registry or input; ignore + continue + return matches + + def find_cards_by_month_day(self, month: int, day: int, load_details: bool = True) -> list: + """Return a list of Card objects whose date_range includes month/day. + + If load_details is True, registry details will be loaded into returned Card objects. + """ + from tarot.deck import Deck + from .loader import load_card_details + + deck = Deck() + positions = self.find_positions_by_month_day(month, day) + cards = [] + for pos in positions: + # Positions are 1-indexed; deck.cards is list + try: + card = deck.cards[pos - 1] + except Exception: + continue + if load_details: + load_card_details(card, self) + cards.append(card) + return cards + + def find_cards_by_date_str(self, date_str: str, load_details: bool = True) -> list: + """Parse a date string and return matching Card objects.""" + from utils.dates import parse_month_day + + month, day = parse_month_day(date_str) + return self.find_cards_by_month_day(month, day, load_details) diff --git a/src/tarot/db_import.py b/src/tarot/db_import.py new file mode 100644 index 0000000..898a978 --- /dev/null +++ b/src/tarot/db_import.py @@ -0,0 +1,297 @@ +"""Helpers to import CSV/Excel data into the project's SQLite DB. + +This module provides a small, dependency-light importer that supports CSV out of the +box and Excel (.xlsx) when ``openpyxl`` is installed. It intentionally keeps the +mapping simple (header-driven) and provides safe deduplication for normalized +keyword/angel/demon tables. +""" +from __future__ import annotations + +import csv +import sqlite3 +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, Iterable, List, Optional, Tuple + +try: + from openpyxl import load_workbook # type: ignore +except Exception: # pragma: no cover - optional dependency + load_workbook = None # type: ignore + +from tarot.card.details import CardDetailsRegistry + + +# Simple helpers ----------------------------------------------------------- + +def _read_csv(path: Path, delimiter: str = ",") -> Iterable[Dict[str, str]]: + with path.open("r", encoding="utf-8-sig", newline="") as fh: + reader = csv.DictReader(fh, delimiter=delimiter) + for row in reader: + yield {k.strip(): (v.strip() if isinstance(v, str) else v) for k, v in row.items()} + + +def _read_excel(path: Path, sheet: Optional[str] = None) -> Iterable[Dict[str, str]]: + if load_workbook is None: + raise RuntimeError("openpyxl is required to import Excel files - pip install openpyxl") + wb = load_workbook(path, read_only=True, data_only=True) + ws = wb[sheet] if sheet and sheet in wb.sheetnames else wb[wb.sheetnames[0]] + it = ws.iter_rows(values_only=True) + try: + headers = next(it) + except StopIteration: + return + headers = [h.strip() if isinstance(h, str) else str(h) for h in headers] + for row in it: + yield {str(h).strip(): (str(v).strip() if v is not None else "") for h, v in zip(headers, row)} + + +def _get_conn(db_path: Optional[Path]) -> sqlite3.Connection: + if db_path is None: + reg = CardDetailsRegistry(use_db=True) + db_path = reg._db_path # use the same resolution logic + else: + # Ensure DB is seeded if missing + CardDetailsRegistry(use_db=True, db_path=db_path) + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + return conn + + +def _coerce_bool(val: Optional[str]) -> int: + if val is None: + return 0 + if isinstance(val, bool): + return 1 if val else 0 + s = str(val).strip().lower() + return 1 if s in {"1", "true", "yes", "y", "t"} else 0 + + +def _parse_date_to_parts(val: Optional[str]) -> Optional[Tuple[int, int]]: + if not val: + return None + s = str(val).strip() + if not s: + return None + # Accept formats: M/D, MM/DD, YYYY-MM-DD + if "/" in s: + parts = s.split("/") + if len(parts) >= 2: + try: + m = int(parts[0]) + d = int(parts[1]) + return (m, d) + except Exception: + return None + if "-" in s: + parts = s.split("-") + try: + m = int(parts[1]) + d = int(parts[2]) + return (m, d) + except Exception: + return None + return None + + +# Table-specific importers ----------------------------------------------- + +@dataclass +class ImportResult: + inserted: int = 0 + skipped: int = 0 + updated: int = 0 + + +def import_keywords(rows: Iterable[Dict[str, str]], db_path: Optional[Path] = None) -> ImportResult: + conn = _get_conn(db_path) + cur = conn.cursor() + res = ImportResult() + try: + for row in rows: + card_key = row.get("card_key") or row.get("registry_key") or row.get("card") + keyword = row.get("keyword") or row.get("kw") + is_rev = _coerce_bool(row.get("is_reversed") or row.get("reversed")) + if not card_key or not keyword: + res.skipped += 1 + continue + # deduplicate + exists = cur.execute( + "SELECT 1 FROM card_keywords WHERE card_key=? AND keyword=? AND is_reversed=?", + (card_key, keyword, is_rev), + ).fetchone() + if exists: + res.skipped += 1 + continue + cur.execute( + "INSERT INTO card_keywords (card_key, keyword, is_reversed) VALUES (?, ?, ?)", + (card_key, keyword, is_rev), + ) + res.inserted += 1 + conn.commit() + finally: + conn.close() + return res + + +def import_angels(rows: Iterable[Dict[str, str]], db_path: Optional[Path] = None) -> ImportResult: + conn = _get_conn(db_path) + cur = conn.cursor() + res = ImportResult() + try: + for row in rows: + card_key = row.get("card_key") or row.get("registry_key") or row.get("card") + name = row.get("name") or row.get("angel") + is_night = _coerce_bool(row.get("is_night") or row.get("night") or row.get("isNight")) + if not card_key or not name: + res.skipped += 1 + continue + exists = cur.execute( + "SELECT 1 FROM card_angels WHERE card_key=? AND name=? AND is_night=?", + (card_key, name, is_night), + ).fetchone() + if exists: + res.skipped += 1 + continue + cur.execute( + "INSERT INTO card_angels (card_key, name, is_night) VALUES (?, ?, ?)", + (card_key, name, is_night), + ) + res.inserted += 1 + conn.commit() + finally: + conn.close() + return res + + +def import_demons(rows: Iterable[Dict[str, str]], db_path: Optional[Path] = None) -> ImportResult: + conn = _get_conn(db_path) + cur = conn.cursor() + res = ImportResult() + try: + for row in rows: + card_key = row.get("card_key") or row.get("registry_key") or row.get("card") + name = row.get("name") or row.get("demon") + is_night = _coerce_bool(row.get("is_night") or row.get("night") or row.get("isNight")) + if not card_key or not name: + res.skipped += 1 + continue + exists = cur.execute( + "SELECT 1 FROM card_demons WHERE card_key=? AND name=? AND is_night=?", + (card_key, name, is_night), + ).fetchone() + if exists: + res.skipped += 1 + continue + cur.execute( + "INSERT INTO card_demons (card_key, name, is_night) VALUES (?, ?, ?)", + (card_key, name, is_night), + ) + res.inserted += 1 + conn.commit() + finally: + conn.close() + return res + + +def import_card_details(rows: Iterable[Dict[str, str]], db_path: Optional[Path] = None) -> ImportResult: + conn = _get_conn(db_path) + cur = conn.cursor() + res = ImportResult() + allowed = { + "registry_key", + "explanation_summary", + "explanation_waite", + "interpretation", + "guidance", + "date_start", + "date_end", + "date_start_month", + "date_start_day", + "date_end_month", + "date_end_day", + "degrees", + "extra_json", + } + try: + for row in rows: + if not row: + res.skipped += 1 + continue + reg_key = row.get("registry_key") or row.get("card_key") or row.get("card") + if not reg_key: + res.skipped += 1 + continue + # Process date short forms + ds = _parse_date_to_parts(row.get("date_start")) + de = _parse_date_to_parts(row.get("date_end")) + if ds and de: + row["date_start_month"], row["date_start_day"] = ds + row["date_end_month"], row["date_end_day"] = de + + # Build columns/values to upsert + cols = ["registry_key"] + vals = [reg_key] + up_cols = [] + up_vals = [] + for k, v in row.items(): + if k == "registry_key": + continue + if k not in allowed: + continue + if v is None or v == "": + continue + cols.append(k) + vals.append(v) + up_cols.append(f"{k}=excluded.{k}") + placeholders = ",".join("?" for _ in vals) + sql = ( + f"INSERT INTO card_details ({','.join(cols)}) VALUES ({placeholders}) " + f"ON CONFLICT(registry_key) DO UPDATE SET {', '.join(up_cols)}" + ) + cur.execute(sql, tuple(vals)) + # rough heuristic: treat as inserted if no row existed + if cur.rowcount > 0: + res.inserted += 1 + else: + res.updated += 1 + conn.commit() + finally: + conn.close() + return res + + +# Public helper ----------------------------------------------------------- + +def import_file( + path: Path, + table: str, + *, + db_path: Optional[Path] = None, + sheet: Optional[str] = None, + delimiter: str = ",", +) -> ImportResult: + """Import a CSV or Excel file into one of the supported tables. + + Supported tables: ``card_keywords``, ``card_angels``, ``card_demons``, ``card_details``. + + The importer is header-driven and will look for reasonable alternate column + names (e.g., ``card_key`` / ``card`` / ``registry_key``). + """ + if not path.exists(): + raise FileNotFoundError(path) + suffix = path.suffix.lower() + if suffix in {".xls", ".xlsx"}: + rows = _read_excel(path, sheet) + else: + rows = _read_csv(path, delimiter) + + table = table.lower() + if table == "card_keywords": + return import_keywords(rows, db_path) + if table == "card_angels": + return import_angels(rows, db_path) + if table == "card_demons": + return import_demons(rows, db_path) + if table == "card_details": + return import_card_details(rows, db_path) + raise ValueError(f"Unsupported table: {table}") diff --git a/src/tarot/deck/deck.py b/src/tarot/deck/deck.py index c72681f..dee9585 100644 --- a/src/tarot/deck/deck.py +++ b/src/tarot/deck/deck.py @@ -73,6 +73,17 @@ class Card: # Image path for custom deck images image_path: Optional[str] = None + # Optional astrological/temporal correspondences (used primarily for Minor Arcana) + # - date_range: ((start_month, start_day), (end_month, end_day)) + # - degrees: free-form degree information (string) + # - day/night angels and demons + date_range: Optional[Tuple[Tuple[int, int], Tuple[int, int]]] = None + degrees: Optional[str] = None + day_angel: Optional[str] = None + night_angel: Optional[str] = None + day_demon: Optional[str] = None + night_demon: Optional[str] = None + def __str__(self) -> str: return f"{self.number}. {self.name}" diff --git a/src/utils/dates.py b/src/utils/dates.py new file mode 100644 index 0000000..93d9f5a --- /dev/null +++ b/src/utils/dates.py @@ -0,0 +1,139 @@ +""" +Date utilities for month/day ranges (no year). + +Provides parsing and range-checking functions for month/day pairs +(e.g., (6,21) meaning June 21). + +Functions: +- parse_month_day: parse '6/21', '06-21', '2026-06-21', 'June 21', 'Jun 21' +- is_month_day_in_range: test whether a month/day lies within a start/end pair +- month_day_to_day_of_year: convert a month/day to day-of-year for comparisons +- format_month_day: display nicely + +Note: Uses a fixed non-leap reference year for day-of-year calculations. +""" + +import re +import calendar +from datetime import date +from typing import Tuple + +# Reference non-leap year used for day-of-year calculations +_REFERENCE_YEAR = 2001 + +_MONTH_NAME_MAP = { + 'jan': 1, + 'january': 1, + 'feb': 2, + 'february': 2, + 'mar': 3, + 'march': 3, + 'apr': 4, + 'april': 4, + 'may': 5, + 'jun': 6, + 'june': 6, + 'jul': 7, + 'july': 7, + 'aug': 8, + 'august': 8, + 'sep': 9, + 'sept': 9, + 'september': 9, + 'oct': 10, + 'october': 10, + 'nov': 11, + 'november': 11, + 'dec': 12, + 'december': 12, +} + + +def month_day_to_day_of_year(month: int, day: int) -> int: + """Convert (month, day) to day-of-year using a non-leap reference year.""" + try: + dt = date(_REFERENCE_YEAR, month, day) + except ValueError as e: + raise ValueError(f"Invalid month/day: {month}/{day}: {e}") + return dt.timetuple().tm_yday + + +def is_month_day_in_range( + month: int, day: int, start: Tuple[int, int], end: Tuple[int, int] +) -> bool: + """Return True if (month, day) is within the inclusive range start->end. + + Handles ranges that cross year boundary (e.g., Dec 20 -> Jan 5). + """ + day_num = month_day_to_day_of_year(month, day) + start_num = month_day_to_day_of_year(*start) + end_num = month_day_to_day_of_year(*end) + + if start_num <= end_num: + return start_num <= day_num <= end_num + # Wrap-around + return day_num >= start_num or day_num <= end_num + + +def parse_month_day(s: str) -> Tuple[int, int]: + """Parse a date string and return (month, day). + + Accepted formats: + - MM/DD or M/D + - MM-DD + - YYYY-MM-DD or YYYY/MM/DD (year ignored) + - MonthName D or Mon D (e.g., 'June 21' or 'Jun 21') + + Raises ValueError if cannot parse or the date is invalid. + """ + if not s or not isinstance(s, str): + raise ValueError("Empty date string") + + s = s.strip() + + # Numeric forms: MM/DD or MM-DD + m = re.match(r"^(?P\d{1,2})\s*[/-]\s*(?P\d{1,2})$", s) + if m: + month = int(m.group("m")) + day = int(m.group("d")) + # validate + month_day_to_day_of_year(month, day) + return month, day + + # ISO with year: YYYY-MM-DD or YYYY/MM/DD + m = re.match(r"^(?P\d{4})[/-](?P\d{1,2})[/-](?P\d{1,2})$", s) + if m: + month = int(m.group("m")) + day = int(m.group("d")) + month_day_to_day_of_year(month, day) + return month, day + + # Month name forms: 'June 21' or '21 June' + parts = re.split(r"[ ,]+", s) + if len(parts) == 2: + a, b = parts + # If a is month name + a_low = a.lower() + if a_low in _MONTH_NAME_MAP: + month = _MONTH_NAME_MAP[a_low] + day = int(b) + month_day_to_day_of_year(month, day) + return month, day + # If b is month name (e.g., '21 June') + b_low = b.lower() + if b_low in _MONTH_NAME_MAP: + month = _MONTH_NAME_MAP[b_low] + day = int(a) + month_day_to_day_of_year(month, day) + return month, day + + raise ValueError(f"Unrecognized date format: '{s}'") + + +def format_month_day(month: int, day: int) -> str: + """Return a short human-friendly representation like 'Jun 21'.""" + if not (1 <= month <= 12): + raise ValueError("month must be 1..12") + # Use abbreviated month name + month_name = calendar.month_abbr[month] + return f"{month_name} {day}" \ No newline at end of file diff --git a/src/utils/filter.py b/src/utils/filter.py index cc096f6..893d183 100644 --- a/src/utils/filter.py +++ b/src/utils/filter.py @@ -85,6 +85,64 @@ def _matches_filter(obj: Any, key: str, value: Any) -> bool: """ attr_value = getattr(obj, key, None) + # Special handling for date filters: accept strings like '6/21', 'Jun 21', + # ISO dates (YYYY-MM-DD), or tuples like (6, 21). The object's `date_range` + # (if present) will be checked; if not, fall back to registry lookup by + # numeric position when available (e.g., Card.number). + if key == "date": + from utils.dates import parse_month_day, is_month_day_in_range + + # Normalize the provided value(s) into a list of checks + values_to_check = [] + if isinstance(value, str) and "," in value: + values_to_check = [v.strip() for v in value.split(",")] + elif isinstance(value, (list, tuple)): + # Could be a single tuple (6,21) or list of such tuples/strings + if len(value) and isinstance(value[0], (list, tuple)): + values_to_check = list(value) + else: + values_to_check = [value] + else: + values_to_check = [value] + + # Obtain the object's date_range, fallback to registry if absent + dr = attr_value + if dr is None: + obj_num = getattr(obj, "number", None) + if obj_num is not None: + try: + from tarot.card.details import CardDetailsRegistry + + registry = CardDetailsRegistry() + details = registry.get_by_position(obj_num) + if details is not None: + dr = details.get("date_range") + except Exception: + dr = None + + if dr is None: + return False + + start, end = dr + + for check_value in values_to_check: + try: + if isinstance(check_value, (list, tuple)): + month, day = int(check_value[0]), int(check_value[1]) + elif isinstance(check_value, str): + month, day = parse_month_day(check_value) + else: + # Fallback: try to interpret as sequence + month, day = int(check_value[0]), int(check_value[1]) + + if is_month_day_in_range(month, day, start, end): + return True + except Exception: + # Ignore parse/validation errors per individual check + continue + + return False + if attr_value is None: return False diff --git a/tarot.db b/tarot.db new file mode 100644 index 0000000..2223df3 Binary files /dev/null and b/tarot.db differ