This commit is contained in:
2026-01-22 02:45:08 -08:00
parent 3d571b007b
commit ba23c0606f
18 changed files with 75 additions and 5355 deletions

View File

@@ -1,773 +0,0 @@
"""Remote Storage Server - REST API for file management on mobile devices.
This server runs on a mobile device (Android with Termux, iOS with iSH, etc.)
and exposes the local library database as a REST API. Your PC connects to this
server and uses it as a remote storage backend through the RemoteStorageBackend.
## INSTALLATION
### On Android (Termux):
1. Install Termux from Play Store: https://play.google.com/store/apps/details?id=com.termux
2. In Termux:
$ apt update && apt install python
$ pip install flask flask-cors
3. Copy this file to your device
4. Run it (with optional API key):
$ python remote_storage_server.py --storage-path /path/to/storage --port 999
$ python remote_storage_server.py --storage-path /path/to/storage --api-key mysecretkey
5. Server prints connection info automatically (IP, port, API key)
### On PC:
1. Install requests: pip install requests
2. Add to config.conf:
[store=remote]
name="phone"
url="http://192.168.1.100:999"
api_key="mysecretkey"
timeout=30
Note: API key is optional. Works on WiFi or cellular data.
## USAGE
After setup, all cmdlet work with the phone:
$ search-file zohar -store phone
$ @1-3 | add-relationship -king @4 -store phone
$ @1 | get-relationship -store phone
The server exposes REST endpoints that RemoteStorageBackend uses internally.
"""
from __future__ import annotations
import os
import sys
import argparse
import logging
import threading
import time
from pathlib import Path
from typing import Optional, Dict, Any
from datetime import datetime
from functools import wraps
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
# ============================================================================
# CONFIGURATION
# ============================================================================
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] %(levelname)s: %(message)s"
)
logger = logging.getLogger(__name__)
STORAGE_PATH: Optional[Path] = None
API_KEY: Optional[str] = None # API key for authentication (None = no auth required)
# Cache for database connection to prevent "database is locked" on high frequency requests
_DB_CACHE: Dict[str, Any] = {}
def get_db(path: Path):
from API.folder import LocalLibrarySearchOptimizer
p_str = str(path)
if p_str not in _DB_CACHE:
_DB_CACHE[p_str] = LocalLibrarySearchOptimizer(path)
_DB_CACHE[p_str].__enter__()
return _DB_CACHE[p_str]
# Try importing Flask - will be used in main() only
try:
from flask import Flask, request, jsonify
from flask_cors import CORS
HAS_FLASK = True
except ImportError:
HAS_FLASK = False
# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================
def monitor_parent(parent_pid: int):
"""Monitor the parent process and shut down if it dies."""
if parent_pid <= 1:
return
logger.info(f"Monitoring parent process {parent_pid}")
# On Windows, we might need a different approach if os.kill(pid, 0) is unreliable
is_windows = sys.platform == "win32"
while True:
try:
if is_windows:
# OpenProcess with PROCESS_QUERY_LIMITED_INFORMATION (0x1000)
# This is safer than os.kill on Windows for existence checks
import ctypes
PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
handle = ctypes.windll.kernel32.OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, False, parent_pid)
if handle:
exit_code = ctypes.c_ulong()
ctypes.windll.kernel32.GetExitCodeProcess(handle, ctypes.byref(exit_code))
ctypes.windll.kernel32.CloseHandle(handle)
# STILL_ACTIVE is 259
if exit_code.value != 259:
logger.info(f"Parent process {parent_pid} finished with code {exit_code.value}. Shutting down...")
os._exit(0)
else:
# On Windows, sometimes we lose access to the handle if the parent is transitioning
# or if it was started from a shell that already closed.
# We'll ignore handle failures for now unless we want to be very strict.
pass
else:
os.kill(parent_pid, 0)
except Exception as e:
# Parent is dead or inaccessible
logger.info(f"Parent process {parent_pid} no longer accessible: {e}. Shutting down server...")
os._exit(0)
time.sleep(5) # Increase check interval to be less aggressive
def get_local_ip() -> Optional[str]:
"""Get the local IP address that would be used for external connections."""
import socket
try:
# Create a socket to determine which interface would be used
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80)) # Google DNS
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return None
# ============================================================================
# FLASK APP FACTORY
# ============================================================================
def create_app():
"""Create and configure Flask app with all routes."""
if not HAS_FLASK:
raise ImportError(
"Flask not installed. Install with: pip install flask flask-cors"
)
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
app = Flask(__name__)
CORS(app)
# ========================================================================
# HELPER DECORATORS
# ========================================================================
def require_auth():
"""Decorator to check API key authentication if configured."""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if API_KEY:
# Get API key from header or query parameter
provided_key = request.headers.get("X-API-Key"
) or request.args.get("api_key")
if not provided_key or provided_key != API_KEY:
return jsonify({"error": "Unauthorized. Invalid or missing API key."}), 401
return f(*args, **kwargs)
return decorated_function
return decorator
def require_storage():
"""Decorator to ensure storage path is configured."""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not STORAGE_PATH:
return jsonify({"error": "Storage path not configured"}), 500
return f(*args, **kwargs)
return decorated_function
return decorator
# ========================================================================
# HEALTH CHECK
# ========================================================================
@app.route("/health", methods=["GET"])
def health():
"""Check server health and storage availability."""
# Check auth manually to allow discovery even if locked
authed = True
if API_KEY:
provided_key = request.headers.get("X-API-Key") or request.args.get("api_key")
if not provided_key or provided_key != API_KEY:
authed = False
status = {
"status": "ok",
"service": "remote_storage",
"name": os.environ.get("MM_SERVER_NAME", "Remote Storage"),
"storage_configured": STORAGE_PATH is not None,
"timestamp": datetime.now().isoformat(),
"locked": not authed and API_KEY is not None
}
# If not authed but API_KEY is required, return minimal info for discovery
if not authed and API_KEY:
return jsonify(status), 200
if STORAGE_PATH:
status["storage_path"] = str(STORAGE_PATH)
status["storage_exists"] = STORAGE_PATH.exists()
try:
search_db = get_db(STORAGE_PATH)
status["database_accessible"] = True
except Exception as e:
status["database_accessible"] = False
status["database_error"] = str(e)
return jsonify(status), 200
# ========================================================================
# FILE OPERATIONS
# ========================================================================
@app.route("/files/search", methods=["GET"])
@require_auth()
@require_storage()
def search_files():
"""Search for files by name or tag."""
query = request.args.get("q", "")
limit = request.args.get("limit", 100, type=int)
# Allow empty query or '*' for "list everything"
db_query = query if query and query != "*" else ""
try:
search_db = get_db(STORAGE_PATH)
results = search_db.search_by_name(db_query, limit)
tag_results = search_db.search_by_tag(db_query, limit)
all_results_dict = {
r["hash"]: r
for r in (results + tag_results)
}
# Fetch tags for each result to support title extraction on client
if search_db.db:
for res in all_results_dict.values():
file_hash = res.get("hash")
if file_hash:
tags = search_db.db.get_tags(file_hash)
res["tag"] = tags
return (
jsonify(
{
"query": query,
"count": len(all_results_dict),
"files": list(all_results_dict.values()),
}
),
200,
)
except Exception as e:
logger.error(f"Search error: {e}", exc_info=True)
return jsonify({"error": f"Search failed: {str(e)}"}), 500
@app.route("/files/<file_hash>", methods=["GET"])
@require_auth()
@require_storage()
def get_file_metadata(file_hash: str):
"""Get metadata for a specific file by hash."""
try:
search_db = get_db(STORAGE_PATH)
db = search_db.db
if not db:
return jsonify({"error": "Database unavailable"}), 500
file_path = db.search_hash(file_hash)
if not file_path or not file_path.exists():
return jsonify({"error": "File not found"}), 404
metadata = db.get_metadata(file_hash)
tags = db.get_tags(file_hash) # Use hash string
return (
jsonify(
{
"hash": file_hash,
"path": str(file_path),
"size": file_path.stat().st_size,
"metadata": metadata,
"tag": tags,
}
),
200,
)
except Exception as e:
logger.error(f"Get metadata error: {e}", exc_info=True)
return jsonify({"error": f"Failed to get metadata: {str(e)}"}), 500
@app.route("/files/raw/<file_hash>", methods=["GET"])
@require_auth()
@require_storage()
def download_file(file_hash: str):
"""Download a raw file by hash."""
try:
search_db = get_db(STORAGE_PATH)
db = search_db.db
if not db:
return jsonify({"error": "Database unavailable"}), 500
file_path = db.search_hash(file_hash)
if not file_path or not file_path.exists():
return jsonify({"error": "File not found"}), 404
return send_file(file_path)
except Exception as e:
logger.error(f"Download error: {e}", exc_info=True)
return jsonify({"error": f"Download failed: {str(e)}"}), 500
@app.route("/files/index", methods=["POST"])
@require_auth()
@require_storage()
def index_file():
"""Index a new file in the storage."""
from SYS.utils import sha256_file
data = request.get_json() or {}
file_path_str = data.get("path")
tags = data.get("tag", [])
url = data.get("url", [])
if not file_path_str:
return jsonify({"error": "File path required"}), 400
try:
file_path = Path(file_path_str)
if not file_path.exists():
return jsonify({"error": "File does not exist"}), 404
search_db = get_db(STORAGE_PATH)
db = search_db.db
if not db:
return jsonify({"error": "Database unavailable"}), 500
db.get_or_create_file_entry(file_path)
if tags:
db.add_tags(file_path, tags)
if url:
db.add_url(file_path, url)
file_hash = sha256_file(file_path)
return (
jsonify(
{
"hash": file_hash,
"path": str(file_path),
"tags_added": len(tags),
"url_added": len(url),
}
),
201,
)
except Exception as e:
logger.error(f"Index error: {e}", exc_info=True)
return jsonify({"error": f"Indexing failed: {str(e)}"}), 500
@app.route("/files/upload", methods=["POST"])
@require_auth()
@require_storage()
def upload_file():
"""Upload a file into storage (multipart/form-data).
Accepts form fields:
- file: uploaded file (required)
- tag: repeated tag parameters or comma-separated string
- url: repeated url parameters or comma-separated string
"""
from API.folder import API_folder_store
from SYS.utils import sha256_file, sanitize_filename, ensure_directory, unique_path
if 'file' not in request.files:
return jsonify({"error": "file required"}), 400
file_storage = request.files.get('file')
if file_storage is None:
return jsonify({"error": "file required"}), 400
filename = sanitize_filename(file_storage.filename or "upload")
incoming_dir = STORAGE_PATH / "incoming"
target_path = incoming_dir / filename
target_path = unique_path(target_path)
try:
# Initialize the DB first (run safety checks) before creating any files.
with API_folder_store(STORAGE_PATH) as db:
# Ensure the incoming directory exists only after DB safety checks pass.
ensure_directory(incoming_dir)
# Save uploaded file to storage
file_storage.save(str(target_path))
# Extract optional metadata
tags = []
if 'tag' in request.form:
# Support repeated form fields or comma-separated list
tags = request.form.getlist('tag') or []
if not tags and request.form.get('tag'):
tags = [t.strip() for t in str(request.form.get('tag') or "").split(",") if t.strip()]
urls = []
if 'url' in request.form:
urls = request.form.getlist('url') or []
if not urls and request.form.get('url'):
urls = [u.strip() for u in str(request.form.get('url') or "").split(",") if u.strip()]
db.get_or_create_file_entry(target_path)
if tags:
db.add_tags(target_path, tags)
if urls:
db.add_url(target_path, urls)
file_hash = sha256_file(target_path)
return (
jsonify({
"hash": file_hash,
"path": str(target_path),
"tags_added": len(tags),
"url_added": len(urls),
}),
201,
)
except Exception as e:
logger.error(f"Upload error: {e}", exc_info=True)
return jsonify({"error": f"Upload failed: {str(e)}"}), 500
# ========================================================================
# TAG OPERATIONS
# ========================================================================
@app.route("/tags/<file_hash>", methods=["GET"])
@require_auth()
@require_storage()
def get_tags(file_hash: str):
"""Get tags for a file."""
from API.folder import API_folder_store
try:
with API_folder_store(STORAGE_PATH) as db:
file_path = db.search_hash(file_hash)
if not file_path:
return jsonify({"error": "File not found"}), 404
tags = db.get_tags(file_path)
return jsonify({"hash": file_hash, "tag": tags}), 200
except Exception as e:
logger.error(f"Get tags error: {e}", exc_info=True)
return jsonify({"error": f"Failed: {str(e)}"}), 500
@app.route("/tags/<file_hash>", methods=["POST"])
@require_auth()
@require_storage()
def add_tags(file_hash: str):
"""Add tags to a file."""
from API.folder import API_folder_store
data = request.get_json() or {}
tags = data.get("tag", [])
mode = data.get("mode", "add")
if not tags:
return jsonify({"error": "Tag required"}), 400
try:
with API_folder_store(STORAGE_PATH) as db:
file_path = db.search_hash(file_hash)
if not file_path:
return jsonify({"error": "File not found"}), 404
if mode == "replace":
db.remove_tags(file_path, db.get_tags(file_path))
db.add_tags(file_path, tags)
return jsonify({"hash": file_hash, "tag_added": len(tags), "mode": mode}), 200
except Exception as e:
logger.error(f"Add tags error: {e}", exc_info=True)
return jsonify({"error": f"Failed: {str(e)}"}), 500
@app.route("/tags/<file_hash>", methods=["DELETE"])
@require_auth()
@require_storage()
def remove_tags(file_hash: str):
"""Remove tags from a file."""
from API.folder import API_folder_store
tags_str = request.args.get("tag", "")
try:
with API_folder_store(STORAGE_PATH) as db:
file_path = db.search_hash(file_hash)
if not file_path:
return jsonify({"error": "File not found"}), 404
if tags_str:
tags_to_remove = [t.strip() for t in tags_str.split(",")]
else:
tags_to_remove = db.get_tags(file_path)
db.remove_tags(file_path, tags_to_remove)
return jsonify({"hash": file_hash, "tags_removed": len(tags_to_remove)}), 200
except Exception as e:
logger.error(f"Remove tags error: {e}", exc_info=True)
return jsonify({"error": f"Failed: {str(e)}"}), 500
# ========================================================================
# RELATIONSHIP OPERATIONS
# ========================================================================
@app.route("/relationships/<file_hash>", methods=["GET"])
@require_auth()
@require_storage()
def get_relationships(file_hash: str):
"""Get relationships for a file."""
from API.folder import API_folder_store
try:
with API_folder_store(STORAGE_PATH) as db:
file_path = db.search_hash(file_hash)
if not file_path:
return jsonify({"error": "File not found"}), 404
metadata = db.get_metadata(file_path)
relationships = metadata.get("relationships",
{}) if metadata else {}
return jsonify({"hash": file_hash, "relationships": relationships}), 200
except Exception as e:
logger.error(f"Get relationships error: {e}", exc_info=True)
return jsonify({"error": f"Failed: {str(e)}"}), 500
@app.route("/relationships", methods=["POST"])
@require_auth()
@require_storage()
def set_relationship():
"""Set a relationship between two files."""
from API.folder import API_folder_store
data = request.get_json() or {}
from_hash = data.get("from_hash")
to_hash = data.get("to_hash")
rel_type = data.get("type", "alt")
if not from_hash or not to_hash:
return jsonify({"error": "from_hash and to_hash required"}), 400
try:
with API_folder_store(STORAGE_PATH) as db:
from_path = db.search_hash(from_hash)
to_path = db.search_hash(to_hash)
if not from_path or not to_path:
return jsonify({"error": "File not found"}), 404
db.set_relationship(from_path, to_path, rel_type)
return jsonify({"from_hash": from_hash, "to_hash": to_hash, "type": rel_type}), 200
except Exception as e:
logger.error(f"Set relationship error: {e}", exc_info=True)
return jsonify({"error": f"Failed: {str(e)}"}), 500
# ========================================================================
# URL OPERATIONS
# ========================================================================
@app.route("/url/<file_hash>", methods=["GET"])
@require_auth()
@require_storage()
def get_url(file_hash: str):
"""Get known url for a file."""
from API.folder import API_folder_store
try:
with API_folder_store(STORAGE_PATH) as db:
file_path = db.search_hash(file_hash)
if not file_path:
return jsonify({"error": "File not found"}), 404
metadata = db.get_metadata(file_path)
url = metadata.get("url", []) if metadata else []
return jsonify({"hash": file_hash, "url": url}), 200
except Exception as e:
logger.error(f"Get url error: {e}", exc_info=True)
return jsonify({"error": f"Failed: {str(e)}"}), 500
@app.route("/url/<file_hash>", methods=["POST"])
@require_auth()
@require_storage()
def add_url(file_hash: str):
"""Add url to a file."""
from API.folder import API_folder_store
data = request.get_json() or {}
url = data.get("url", [])
if not url:
return jsonify({"error": "url required"}), 400
try:
with API_folder_store(STORAGE_PATH) as db:
file_path = db.search_hash(file_hash)
if not file_path:
return jsonify({"error": "File not found"}), 404
db.add_url(file_path, url)
return jsonify({"hash": file_hash, "url_added": len(url)}), 200
except Exception as e:
logger.error(f"Add url error: {e}", exc_info=True)
return jsonify({"error": f"Failed: {str(e)}"}), 500
return app
# ============================================================================
# MAIN
# ============================================================================
def main():
if not HAS_FLASK:
print("ERROR: Flask and flask-cors required")
print("Install with: pip install flask flask-cors")
sys.exit(1)
parser = argparse.ArgumentParser(
description="Remote Storage Server for Medios-Macina",
epilog=
"Example: python remote_storage_server.py --storage-path /storage/media --port 999 --api-key mysecretkey",
)
parser.add_argument(
"--storage-path",
type=str,
required=True,
help="Path to storage directory"
)
parser.add_argument(
"--host",
type=str,
default="0.0.0.0",
help="Server host (default: 0.0.0.0)"
)
parser.add_argument(
"--port",
type=int,
default=999,
help="Server port (default: 999)"
)
parser.add_argument(
"--api-key",
type=str,
default=None,
help="API key for authentication (optional)"
)
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
parser.add_argument(
"--monitor",
action="store_true",
help="Shut down if parent process dies"
)
parser.add_argument(
"--parent-pid",
type=int,
default=None,
help="Explicit PID to monitor (defaults to the immediate parent process)",
)
args = parser.parse_args()
# Start monitor thread if requested
if args.monitor:
monitor_pid = args.parent_pid or os.getppid()
if monitor_pid > 1:
monitor_thread = threading.Thread(
target=monitor_parent,
args=(monitor_pid, ),
daemon=True
)
monitor_thread.start()
global STORAGE_PATH, API_KEY
STORAGE_PATH = Path(args.storage_path).resolve()
API_KEY = args.api_key
if not STORAGE_PATH.exists():
print(f"ERROR: Storage path does not exist: {STORAGE_PATH}")
sys.exit(1)
# Get local IP address
local_ip = get_local_ip()
if not local_ip:
local_ip = "127.0.0.1"
print(f"\n{'='*70}")
print("Remote Storage Server - Medios-Macina")
print(f"{'='*70}")
print(f"Storage Path: {STORAGE_PATH}")
print(f"Local IP: {local_ip}")
print(f"Server URL: http://{local_ip}:{args.port}")
print(f"Health URL: http://{local_ip}:{args.port}/health")
print(
f"API Key: {'Enabled - ' + ('***' + args.api_key[-4:]) if args.api_key else 'Disabled (no auth)'}"
)
print(f"Debug Mode: {args.debug}")
print("\n📋 Config for config.conf:")
print("[store=remote]")
print('name="phone"')
print(f'url="http://{local_ip}:{args.port}"')
if args.api_key:
print(f'api_key="{args.api_key}"')
print("timeout=30")
print("\nOR use ZeroTier Networking (Server Side):")
print("[networking=zerotier]")
print(f'serve="{STORAGE_PATH.name}"')
print(f'port="{args.port}"')
if args.api_key:
print(f'api_key="{args.api_key}"')
print(f"\n{'='*70}\n")
try:
from API.folder import API_folder_store
with API_folder_store(STORAGE_PATH) as db:
logger.info("Database initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize database: {e}")
sys.exit(1)
app = create_app()
app.run(host=args.host, port=args.port, debug=args.debug, use_reloader=False)
if __name__ == "__main__":
main()

View File

@@ -1,133 +0,0 @@
#!/usr/bin/env python3
"""Simple ZeroTier helper for joining networks and discovering peers.
Usage:
python scripts/zerotier_setup.py --join <network_id>
python scripts/zerotier_setup.py --list
python scripts/zerotier_setup.py --discover <network_id>
This is a convenience tool to exercise the API/zerotier.py functionality while
prototyping and bringing up remote peers for store testing.
"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from SYS.logger import log
try:
from API import zerotier
except Exception:
zerotier = None
def main(argv=None):
parser = argparse.ArgumentParser(description="ZeroTier helper for Medios-Macina")
parser.add_argument("--list", action="store_true", help="List local ZeroTier networks")
parser.add_argument("--join", type=str, help="Join a ZeroTier network by ID")
parser.add_argument("--leave", type=str, help="Leave a ZeroTier network by ID")
parser.add_argument("--discover", type=str, help="Discover services on a ZeroTier network ID")
parser.add_argument("--upload", type=str, help="Upload a file to a discovered 'remote' service on this ZeroTier network ID")
parser.add_argument("--file", type=str, help="Local file to upload (used with --upload)")
parser.add_argument("--tag", action="append", help="Tag to attach (repeatable)", default=[])
parser.add_argument("--url", action="append", help="URL to associate (repeatable)", default=[])
parser.add_argument("--api-key", type=str, help="API key to use for uploads (optional)")
parser.add_argument("--json", action="store_true", help="Output JSON when appropriate")
args = parser.parse_args(argv)
if zerotier is None:
log("ZeroTier API module not available; ensure API/zerotier.py is importable and zerotier or zerotier-cli is installed")
return 1
if args.list:
nets = zerotier.list_networks()
if args.json:
print(json.dumps([n.__dict__ for n in nets], indent=2))
else:
for n in nets:
print(f"{n.id}\t{name:=}{n.name}\t{n.status}\t{n.assigned_addresses}")
return 0
if args.join:
try:
ok = zerotier.join_network(args.join)
print("Joined" if ok else "Failed to join")
return 0 if ok else 2
except Exception as exc:
log(f"Join failed: {exc}")
print(f"Join failed: {exc}")
return 2
if args.leave:
try:
ok = zerotier.leave_network(args.leave)
print("Left" if ok else "Failed to leave")
return 0 if ok else 2
except Exception as exc:
log(f"Leave failed: {exc}")
print(f"Leave failed: {exc}")
return 2
if args.discover:
probes = zerotier.discover_services_on_network(args.discover)
if args.json:
print(json.dumps([p.__dict__ for p in probes], indent=2, default=str))
else:
for p in probes:
print(f"{p.address}:{p.port}{p.path} -> status={p.status_code} hint={p.service_hint}")
return 0
if args.upload:
# Upload a file to the first discovered remote service on the network
if not args.file:
print("ERROR: --file is required for --upload")
return 2
probe = zerotier.find_peer_service(args.upload, service_hint="remote")
if not probe:
print("No remote service found on network")
return 2
base = f"http://{probe.address}:{probe.port}"
try:
import httpx
url = base.rstrip("/") + "/files/upload"
headers = {}
if args.api_key:
headers["X-API-Key"] = args.api_key
with open(args.file, "rb") as fh:
files = {"file": (Path(args.file).name, fh)}
data = []
for t in (args.tag or []):
data.append(("tag", t))
for u in (args.url or []):
data.append(("url", u))
resp = httpx.post(url, files=files, data=data, headers=headers, timeout=30)
print(resp.status_code, resp.text)
return 0 if resp.status_code in (200, 201) else 2
except Exception:
import requests
url = base.rstrip("/") + "/files/upload"
headers = {}
if args.api_key:
headers["X-API-Key"] = args.api_key
with open(args.file, "rb") as fh:
files = {"file": (Path(args.file).name, fh)}
data = []
for t in (args.tag or []):
data.append(("tag", t))
for u in (args.url or []):
data.append(("url", u))
resp = requests.post(url, files=files, data=data, headers=headers, timeout=30)
print(resp.status_code, resp.text)
return 0 if resp.status_code in (200, 201) else 2
parser.print_help()
return 0
if __name__ == "__main__":
sys.exit(main())