126 lines
3.6 KiB
Python
126 lines
3.6 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Any, Dict, List, Sequence
|
|
import sys
|
|
|
|
import pipeline as ctx
|
|
from . import _shared as sh
|
|
|
|
Cmdlet, SharedArgs, parse_cmdlet_args, get_field, normalize_hash = (
|
|
sh.Cmdlet,
|
|
sh.SharedArgs,
|
|
sh.parse_cmdlet_args,
|
|
sh.get_field,
|
|
sh.normalize_hash,
|
|
)
|
|
from SYS.logger import log
|
|
from Store import Store
|
|
|
|
|
|
@dataclass
|
|
class UrlItem:
|
|
url: str
|
|
hash: str
|
|
store: str
|
|
|
|
|
|
class Get_Url(Cmdlet):
|
|
"""Get url associated with files via hash+store."""
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__(
|
|
name="get-url",
|
|
summary="List url associated with a file",
|
|
usage="@1 | get-url",
|
|
arg=[
|
|
SharedArgs.QUERY,
|
|
SharedArgs.STORE,
|
|
],
|
|
detail=[
|
|
"- Lists all url associated with file identified by hash+store",
|
|
],
|
|
exec=self.run,
|
|
)
|
|
self.register()
|
|
|
|
def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
|
|
"""Get url for file via hash+store backend."""
|
|
parsed = parse_cmdlet_args(args, self)
|
|
|
|
query_hash = sh.parse_single_hash_query(parsed.get("query"))
|
|
if parsed.get("query") and not query_hash:
|
|
log("Error: -query must be of the form hash:<sha256>")
|
|
return 1
|
|
|
|
# Extract hash and store from result or args
|
|
file_hash = query_hash or get_field(result, "hash")
|
|
store_name = parsed.get("store") or get_field(result, "store")
|
|
|
|
if not file_hash:
|
|
log("Error: No file hash provided (pipe an item or use -query \"hash:<sha256>\")")
|
|
return 1
|
|
|
|
if not store_name:
|
|
log("Error: No store name provided")
|
|
return 1
|
|
|
|
# Normalize hash
|
|
file_hash = normalize_hash(file_hash)
|
|
if not file_hash:
|
|
log("Error: Invalid hash format")
|
|
return 1
|
|
|
|
# Get backend and retrieve url
|
|
try:
|
|
storage = Store(config)
|
|
backend = storage[store_name]
|
|
|
|
urls = backend.get_url(file_hash)
|
|
|
|
from result_table import ResultTable
|
|
|
|
title = str(get_field(result, "title") or "").strip()
|
|
table_title = "Title"
|
|
if title:
|
|
table_title = f"Title: {title}"
|
|
|
|
table = (
|
|
ResultTable(table_title, max_columns=1)
|
|
.set_preserve_order(True)
|
|
.set_table("url")
|
|
.set_value_case("preserve")
|
|
)
|
|
table.set_source_command("get-url", [])
|
|
|
|
items: List[UrlItem] = []
|
|
for u in list(urls or []):
|
|
u = str(u or "").strip()
|
|
if not u:
|
|
continue
|
|
row = table.add_row()
|
|
row.add_column("Url", u)
|
|
item = UrlItem(url=u, hash=file_hash, store=str(store_name))
|
|
items.append(item)
|
|
ctx.emit(item)
|
|
|
|
# Make this a real result table so @.. / @,, can navigate it
|
|
ctx.set_last_result_table(table if items else None, items, subject=result)
|
|
|
|
if not items:
|
|
log("No url found", file=sys.stderr)
|
|
|
|
return 0
|
|
|
|
except KeyError:
|
|
log(f"Error: Storage backend '{store_name}' not configured")
|
|
return 1
|
|
except Exception as exc:
|
|
log(f"Error retrieving url: {exc}", file=sys.stderr)
|
|
return 1
|
|
|
|
|
|
CMDLET = Get_Url()
|
|
|
|
|