k
This commit is contained in:
300
src/utils/query.py
Normal file
300
src/utils/query.py
Normal file
@@ -0,0 +1,300 @@
|
||||
"""
|
||||
Generic query builder for fluent, chainable access to tarot data.
|
||||
|
||||
Provides Query and QueryResult classes for building filters and accessing data
|
||||
in a dynamic, expressive way.
|
||||
|
||||
Usage:
|
||||
# By name
|
||||
result = letter.iching().name('peace')
|
||||
result = letter.alphabet().name('english')
|
||||
|
||||
# By filter expressions
|
||||
result = letter.iching().filter('number:1')
|
||||
result = letter.alphabet().filter('name:hebrew')
|
||||
result = number.number().filter('value:5')
|
||||
|
||||
# Get all results
|
||||
results = letter.iching().all() # Dict[int, Hexagram]
|
||||
results = letter.iching().list() # List[Hexagram]
|
||||
"""
|
||||
|
||||
from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union
|
||||
from utils.object_formatting import format_value, get_object_attributes, is_nested_object
|
||||
|
||||
T = TypeVar('T')
|
||||
|
||||
|
||||
class QueryResult:
|
||||
"""Single result from a query."""
|
||||
|
||||
def __init__(self, data: Any) -> None:
|
||||
self.data = data
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if hasattr(self.data, '__repr__'):
|
||||
return repr(self.data)
|
||||
return f"{self.__class__.__name__}({self.data})"
|
||||
|
||||
def __str__(self) -> str:
|
||||
if hasattr(self.data, '__str__'):
|
||||
return str(self.data)
|
||||
return repr(self)
|
||||
|
||||
def __getattr__(self, name: str) -> Any:
|
||||
"""Pass through attribute access to the wrapped data."""
|
||||
if name.startswith('_'):
|
||||
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")
|
||||
return getattr(self.data, name)
|
||||
|
||||
|
||||
class Query:
|
||||
"""
|
||||
Fluent query builder for accessing and filtering tarot data.
|
||||
|
||||
Supports chaining: .filter() → .name() → .get()
|
||||
"""
|
||||
|
||||
def __init__(self, data: Union[Dict[Any, T], List[T]]) -> None:
|
||||
"""Initialize with data source (dict or list)."""
|
||||
self._original_data = data
|
||||
self._data = data if isinstance(data, list) else list(data.values())
|
||||
self._filters: List[Callable[[T], bool]] = []
|
||||
|
||||
def filter(self, expression: str) -> 'Query':
|
||||
"""
|
||||
Filter by key:value expression.
|
||||
|
||||
Examples:
|
||||
.filter('name:peace')
|
||||
.filter('number:1')
|
||||
.filter('sephera:gevurah')
|
||||
.filter('value:5')
|
||||
|
||||
Supports multiple filters by chaining:
|
||||
.filter('number:1').filter('name:creative')
|
||||
"""
|
||||
key, value = expression.split(':', 1) if ':' in expression else (expression, '')
|
||||
|
||||
def filter_func(item: T) -> bool:
|
||||
# Special handling for 'name' key
|
||||
if key == 'name':
|
||||
if hasattr(item, 'name'):
|
||||
value_lower = value.lower()
|
||||
item_name = str(item.name).lower()
|
||||
return value_lower == item_name or value_lower in item_name
|
||||
return False
|
||||
|
||||
if not hasattr(item, key):
|
||||
return False
|
||||
item_value = getattr(item, key)
|
||||
# Flexible comparison
|
||||
if isinstance(item_value, str):
|
||||
return value.lower() in str(item_value).lower()
|
||||
else:
|
||||
return str(value) in str(item_value)
|
||||
|
||||
self._filters.append(filter_func)
|
||||
return self
|
||||
|
||||
def name(self, value: str) -> Optional['QueryResult']:
|
||||
"""
|
||||
Deprecated: Use .filter('name:value') instead.
|
||||
|
||||
Find item by name (exact or partial match, case-insensitive).
|
||||
Returns QueryResult wrapping the found item, or None if not found.
|
||||
"""
|
||||
return self.filter(f'name:{value}').first()
|
||||
|
||||
def get(self) -> Optional['QueryResult']:
|
||||
"""
|
||||
Get first result matching all applied filters.
|
||||
|
||||
Returns QueryResult or None if no match.
|
||||
"""
|
||||
for item in self._data:
|
||||
if all(f(item) for f in self._filters):
|
||||
return QueryResult(item)
|
||||
return None
|
||||
|
||||
def all(self) -> Dict[Any, T]:
|
||||
"""
|
||||
Get all results matching filters as dict.
|
||||
|
||||
Returns original dict structure (if input was dict) with filtered values.
|
||||
"""
|
||||
filtered = {}
|
||||
if isinstance(self._original_data, dict):
|
||||
for key, item in self._original_data.items():
|
||||
if all(f(item) for f in self._filters):
|
||||
filtered[key] = item
|
||||
else:
|
||||
for i, item in enumerate(self._data):
|
||||
if all(f(item) for f in self._filters):
|
||||
filtered[i] = item
|
||||
return filtered
|
||||
|
||||
def list(self) -> List[T]:
|
||||
"""
|
||||
Get all results matching filters as list.
|
||||
|
||||
Returns list of filtered items.
|
||||
"""
|
||||
return [item for item in self._data if all(f(item) for f in self._filters)]
|
||||
|
||||
def first(self) -> Optional['QueryResult']:
|
||||
"""Alias for get() - returns first matching item."""
|
||||
return self.get()
|
||||
|
||||
def count(self) -> int:
|
||||
"""Count items matching all filters."""
|
||||
return len(self.list())
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"Query({self.count()} items)"
|
||||
|
||||
def __str__(self) -> str:
|
||||
items = self.list()
|
||||
if not items:
|
||||
return "Query(0 items)"
|
||||
return f"Query({self.count()} items): {items[0]}"
|
||||
|
||||
|
||||
class CollectionAccessor(Generic[T]):
|
||||
"""Context-aware accessor that provides a scoped .query() interface."""
|
||||
|
||||
def __init__(self, data_provider: Callable[[], Union[Dict[Any, T], List[T]]]) -> None:
|
||||
self._data_provider = data_provider
|
||||
|
||||
def _new_query(self) -> Query:
|
||||
data = self._data_provider()
|
||||
return Query(data)
|
||||
|
||||
def query(self) -> Query:
|
||||
"""Return a new Query scoped to this collection."""
|
||||
return self._new_query()
|
||||
|
||||
def filter(self, expression: str) -> Query:
|
||||
"""Shortcut to run a filtered query."""
|
||||
return self.query().filter(expression)
|
||||
|
||||
def name(self, value: str) -> Optional[QueryResult]:
|
||||
"""Shortcut to look up by name."""
|
||||
return self.query().name(value)
|
||||
|
||||
def all(self) -> Dict[Any, T]:
|
||||
"""Return all entries (optionally filtered)."""
|
||||
return self.query().all()
|
||||
|
||||
def list(self) -> List[T]:
|
||||
"""Return all entries as a list."""
|
||||
return self.query().list()
|
||||
|
||||
def first(self) -> Optional[QueryResult]:
|
||||
"""Return first matching entry."""
|
||||
return self.query().first()
|
||||
|
||||
def count(self) -> int:
|
||||
"""Count entries for this collection."""
|
||||
return self.query().count()
|
||||
|
||||
def display(self) -> str:
|
||||
"""
|
||||
Format all entries for user-friendly display with proper indentation.
|
||||
|
||||
Returns a formatted string with each item separated by blank lines.
|
||||
Nested objects are indented and separated with their own sections.
|
||||
"""
|
||||
from utils.object_formatting import is_nested_object, get_object_attributes
|
||||
|
||||
data = self.all()
|
||||
if not data:
|
||||
return "(empty collection)"
|
||||
|
||||
lines = []
|
||||
for key, item in data.items():
|
||||
lines.append(f"--- {key} ---")
|
||||
|
||||
# Format all attributes with proper nesting
|
||||
for attr_name, attr_value in get_object_attributes(item):
|
||||
if is_nested_object(attr_value):
|
||||
lines.append(f" {attr_name}:")
|
||||
lines.append(f" --- {attr_name.replace('_', ' ').title()} ---")
|
||||
nested = format_value(attr_value, indent=4)
|
||||
lines.append(nested)
|
||||
else:
|
||||
lines.append(f" {attr_name}: {attr_value}")
|
||||
|
||||
lines.append("") # Blank line between items
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
"""Full detailed representation."""
|
||||
return self.display()
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""Full detailed string representation."""
|
||||
return self.display()
|
||||
|
||||
|
||||
class FilterableDict(dict):
|
||||
"""Dict subclass that provides .filter() method for dynamic querying."""
|
||||
|
||||
def filter(self, expression: str = '') -> Query:
|
||||
"""
|
||||
Filter dict values by attribute:value expression.
|
||||
|
||||
Examples:
|
||||
data.filter('name:peace')
|
||||
data.filter('number:1')
|
||||
data.filter('') # Returns query of all items
|
||||
"""
|
||||
return Query(self).filter(expression) if expression else Query(self)
|
||||
|
||||
def display(self) -> str:
|
||||
"""
|
||||
Format all items in the dict for user-friendly display.
|
||||
|
||||
Returns a formatted string with each item separated by blank lines.
|
||||
Nested objects are indented and separated with their own sections.
|
||||
"""
|
||||
from utils.object_formatting import is_nested_object, get_object_attributes, format_value
|
||||
|
||||
if not self:
|
||||
return "(empty collection)"
|
||||
|
||||
lines = []
|
||||
for key, item in self.items():
|
||||
lines.append(f"--- {key} ---")
|
||||
|
||||
# Format all attributes with proper nesting
|
||||
for attr_name, attr_value in get_object_attributes(item):
|
||||
if is_nested_object(attr_value):
|
||||
lines.append(f" {attr_name}:")
|
||||
lines.append(f" --- {attr_name.replace('_', ' ').title()} ---")
|
||||
nested = format_value(attr_value, indent=4)
|
||||
lines.append(nested)
|
||||
else:
|
||||
lines.append(f" {attr_name}: {attr_value}")
|
||||
|
||||
lines.append("") # Blank line between items
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def make_filterable(data: Union[Dict[Any, T], List[T]]) -> Union['FilterableDict', Query]:
|
||||
"""
|
||||
Convert dict or list to a filterable object with .filter() support.
|
||||
|
||||
Examples:
|
||||
walls = make_filterable(Cube.wall())
|
||||
peace = walls.filter('name:North').first()
|
||||
"""
|
||||
if isinstance(data, dict):
|
||||
# Create a FilterableDict from the regular dict
|
||||
filterable = FilterableDict(data)
|
||||
return filterable
|
||||
else:
|
||||
# For lists, wrap in a Query
|
||||
return Query(data)
|
||||
Reference in New Issue
Block a user