h
This commit is contained in:
135
API/folder.py
135
API/folder.py
@@ -2028,39 +2028,40 @@ class API_folder_store:
|
|||||||
pipe: Optional[str] = None,
|
pipe: Optional[str] = None,
|
||||||
) -> int:
|
) -> int:
|
||||||
"""Insert a new worker entry into the database."""
|
"""Insert a new worker entry into the database."""
|
||||||
try:
|
with self._db_lock:
|
||||||
cursor = self.connection.cursor()
|
try:
|
||||||
cursor.execute(
|
cursor = self.connection.cursor()
|
||||||
"""
|
cursor.execute(
|
||||||
INSERT INTO worker (worker_id, worker_type, pipe, status, title, description, total_steps)
|
"""
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
INSERT INTO worker (worker_id, worker_type, pipe, status, title, description, total_steps)
|
||||||
""",
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||||
(
|
""",
|
||||||
worker_id,
|
(
|
||||||
worker_type,
|
worker_id,
|
||||||
pipe,
|
worker_type,
|
||||||
"running",
|
pipe,
|
||||||
title,
|
"running",
|
||||||
description,
|
title,
|
||||||
total_steps
|
description,
|
||||||
),
|
total_steps
|
||||||
)
|
),
|
||||||
worker_rowid = cursor.lastrowid or 0
|
)
|
||||||
|
worker_rowid = cursor.lastrowid or 0
|
||||||
# Prune occasionally (1 in 50 chance) or just run it to keep it clean
|
|
||||||
# Running it every time might be overkill, but let's do a light version
|
# Prune occasionally (1 in 50 chance) or just run it to keep it clean
|
||||||
cursor.execute(
|
# Running it every time might be overkill, but let's do a light version
|
||||||
"DELETE FROM worker WHERE status != 'running' AND id < (SELECT MAX(id) - ? FROM worker)",
|
cursor.execute(
|
||||||
(MAX_FINISHED_WORKERS * 2,)
|
"DELETE FROM worker WHERE status != 'running' AND id < (SELECT MAX(id) - ? FROM worker)",
|
||||||
)
|
(MAX_FINISHED_WORKERS * 2,)
|
||||||
|
)
|
||||||
|
|
||||||
self.connection.commit()
|
self.connection.commit()
|
||||||
return worker_rowid
|
return worker_rowid
|
||||||
except sqlite3.IntegrityError:
|
except sqlite3.IntegrityError:
|
||||||
return self.update_worker_status(worker_id, "running")
|
return self.update_worker_status(worker_id, "running")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error inserting worker: {e}", exc_info=True)
|
logger.error(f"Error inserting worker: {e}", exc_info=True)
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
def update_worker(self, worker_id: str, **kwargs) -> bool:
|
def update_worker(self, worker_id: str, **kwargs) -> bool:
|
||||||
"""Update worker entry with given fields."""
|
"""Update worker entry with given fields."""
|
||||||
@@ -2107,40 +2108,41 @@ class API_folder_store:
|
|||||||
|
|
||||||
def update_worker_status(self, worker_id: str, status: str) -> int:
|
def update_worker_status(self, worker_id: str, status: str) -> int:
|
||||||
"""Update worker status and return its database ID."""
|
"""Update worker status and return its database ID."""
|
||||||
try:
|
with self._db_lock:
|
||||||
cursor = self.connection.cursor()
|
try:
|
||||||
|
cursor = self.connection.cursor()
|
||||||
|
|
||||||
if status in ("completed", "error"):
|
if status in ("completed", "error"):
|
||||||
cursor.execute(
|
cursor.execute(
|
||||||
"""
|
"""
|
||||||
UPDATE worker
|
UPDATE worker
|
||||||
SET status = ?, completed_at = CURRENT_TIMESTAMP, last_updated = CURRENT_TIMESTAMP
|
SET status = ?, completed_at = CURRENT_TIMESTAMP, last_updated = CURRENT_TIMESTAMP
|
||||||
WHERE worker_id = ?
|
WHERE worker_id = ?
|
||||||
""",
|
""",
|
||||||
(status,
|
(status,
|
||||||
worker_id),
|
worker_id),
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
cursor.execute(
|
cursor.execute(
|
||||||
"""
|
"""
|
||||||
UPDATE worker
|
UPDATE worker
|
||||||
SET status = ?, last_updated = CURRENT_TIMESTAMP
|
SET status = ?, last_updated = CURRENT_TIMESTAMP
|
||||||
WHERE worker_id = ?
|
WHERE worker_id = ?
|
||||||
""",
|
""",
|
||||||
(status,
|
(status,
|
||||||
worker_id),
|
worker_id),
|
||||||
)
|
)
|
||||||
|
|
||||||
self.connection.commit()
|
self.connection.commit()
|
||||||
|
|
||||||
cursor.execute("SELECT id FROM worker WHERE worker_id = ?",
|
cursor.execute("SELECT id FROM worker WHERE worker_id = ?",
|
||||||
(worker_id,
|
(worker_id,
|
||||||
))
|
))
|
||||||
row = cursor.fetchone()
|
row = cursor.fetchone()
|
||||||
return row[0] if row else 0
|
return row[0] if row else 0
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error updating worker status: {e}", exc_info=True)
|
logger.error(f"Error updating worker status: {e}", exc_info=True)
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
def get_worker(self, worker_id: str) -> Optional[Dict[str, Any]]:
|
def get_worker(self, worker_id: str) -> Optional[Dict[str, Any]]:
|
||||||
"""Retrieve a worker entry by ID."""
|
"""Retrieve a worker entry by ID."""
|
||||||
@@ -2484,10 +2486,15 @@ class API_folder_store:
|
|||||||
logger.error(f"Error closing database: {e}", exc_info=True)
|
logger.error(f"Error closing database: {e}", exc_info=True)
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
|
if not self.connection:
|
||||||
|
self._init_db()
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
self.close()
|
try:
|
||||||
|
self.close()
|
||||||
|
finally:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
# ============================================================================
|
# ============================================================================
|
||||||
|
|||||||
@@ -372,6 +372,7 @@ class Folder(Store):
|
|||||||
tag: Optional list of tag values to add
|
tag: Optional list of tag values to add
|
||||||
url: Optional list of url to associate with the file
|
url: Optional list of url to associate with the file
|
||||||
title: Optional title (will be added as 'title:value' tag)
|
title: Optional title (will be added as 'title:value' tag)
|
||||||
|
file_hash: Optional pre-calculated SHA256 hash (skips re-hashing)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
File hash (SHA256 hex string) as identifier
|
File hash (SHA256 hex string) as identifier
|
||||||
@@ -380,6 +381,7 @@ class Folder(Store):
|
|||||||
tag_list = kwargs.get("tag", [])
|
tag_list = kwargs.get("tag", [])
|
||||||
url = kwargs.get("url", [])
|
url = kwargs.get("url", [])
|
||||||
title = kwargs.get("title")
|
title = kwargs.get("title")
|
||||||
|
file_hash = kwargs.get("file_hash")
|
||||||
|
|
||||||
# Extract title from tags if not explicitly provided
|
# Extract title from tags if not explicitly provided
|
||||||
if not title:
|
if not title:
|
||||||
@@ -400,7 +402,10 @@ class Folder(Store):
|
|||||||
tag_list = [title_tag] + list(tag_list)
|
tag_list = [title_tag] + list(tag_list)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
file_hash = sha256_file(file_path)
|
if not file_hash or len(str(file_hash)) != 64:
|
||||||
|
debug(f"[folder] Re-hashing file: {file_path}", file=sys.stderr)
|
||||||
|
file_hash = sha256_file(file_path)
|
||||||
|
|
||||||
debug(f"File hash: {file_hash}", file=sys.stderr)
|
debug(f"File hash: {file_hash}", file=sys.stderr)
|
||||||
|
|
||||||
# Preserve extension in the stored filename
|
# Preserve extension in the stored filename
|
||||||
|
|||||||
@@ -546,7 +546,7 @@ class Add_File(Cmdlet):
|
|||||||
delete_after_item = delete_after
|
delete_after_item = delete_after
|
||||||
try:
|
try:
|
||||||
if use_steps and (not steps_started):
|
if use_steps and (not steps_started):
|
||||||
progress.begin_steps(4)
|
progress.begin_steps(5)
|
||||||
progress.step("resolving source")
|
progress.step("resolving source")
|
||||||
steps_started = True
|
steps_started = True
|
||||||
|
|
||||||
@@ -560,6 +560,19 @@ class Add_File(Cmdlet):
|
|||||||
failures += 1
|
failures += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
if use_steps and steps_started:
|
||||||
|
progress.step("hashing file")
|
||||||
|
|
||||||
|
# Update pipe_obj with resolved path
|
||||||
|
pipe_obj.path = str(media_path)
|
||||||
|
|
||||||
|
# When using -path (filesystem export), allow all file types.
|
||||||
|
# When using -store (backend), restrict to SUPPORTED_MEDIA_EXTENSIONS.
|
||||||
|
allow_all_files = not (location and is_storage_backend_location)
|
||||||
|
if not self._validate_source(media_path, allow_all_extensions=allow_all_files):
|
||||||
|
failures += 1
|
||||||
|
continue
|
||||||
|
|
||||||
if use_steps and steps_started and (not step2_done):
|
if use_steps and steps_started and (not step2_done):
|
||||||
progress.step("ingesting file")
|
progress.step("ingesting file")
|
||||||
step2_done = True
|
step2_done = True
|
||||||
@@ -1973,16 +1986,18 @@ class Add_File(Cmdlet):
|
|||||||
debug("[add-file] Deferring tag application until after Hydrus upload")
|
debug("[add-file] Deferring tag application until after Hydrus upload")
|
||||||
|
|
||||||
debug(
|
debug(
|
||||||
f"[add-file] Storing into backend '{backend_name}' path='{media_path}' title='{title}'"
|
f"[add-file] Storing into backend '{backend_name}' path='{media_path}' title='{title}' hash='{f_hash[:12] if f_hash else 'N/A'}'"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Call backend's add_file with full metadata
|
# Call backend's add_file with full metadata
|
||||||
# Backend returns hash as identifier
|
# Backend returns hash as identifier. If we already know the hash from _resolve_source
|
||||||
|
# (which came from download-file emit), pass it to skip re-hashing the 4GB file.
|
||||||
file_identifier = backend.add_file(
|
file_identifier = backend.add_file(
|
||||||
media_path,
|
media_path,
|
||||||
title=title,
|
title=title,
|
||||||
tag=upload_tags,
|
tag=upload_tags,
|
||||||
url=[] if (defer_url_association and url) else url,
|
url=[] if (defer_url_association and url) else url,
|
||||||
|
file_hash=f_hash,
|
||||||
)
|
)
|
||||||
debug(
|
debug(
|
||||||
f"[add-file] backend.add_file returned identifier {file_identifier} (len={len(str(file_identifier)) if file_identifier is not None else 'None'})"
|
f"[add-file] backend.add_file returned identifier {file_identifier} (len={len(str(file_identifier)) if file_identifier is not None else 'None'})"
|
||||||
|
|||||||
Reference in New Issue
Block a user