#!/usr/bin/env python3
"""HTTP API for managing containers.

The service keeps one row per container in the SQLite table ``containers``
(see ``init_db``) and delegates lifecycle operations to executables located
in ``BIN_PATH`` and to the ``docker`` CLI.  Every route except ``/`` requires
the ``X-API-Key`` header to match ``API_KEY``.
"""
import hmac
import json
import os
import re
import sqlite3
import subprocess
import time
from collections import deque
from contextlib import closing
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import psutil
import uvicorn
from fastapi import Depends, FastAPI, HTTPException, Response
from fastapi.responses import RedirectResponse
from fastapi.security.api_key import APIKeyHeader
from pydantic import BaseModel

# ---------------------- Constants ----------------------
DB_PATH = "/4server/data/contracts.db"
BIN_PATH = "/4server/sbin"
# NOTE(review): falls back to a publicly known placeholder when the API_KEY
# environment variable is unset -- make sure it is always set in deployment.
API_KEY = os.getenv("API_KEY", "your-secret-api-key")
VERSION = "API: 0.0.7"

# ---------------------- FastAPI App ----------------------
app = FastAPI()
api_key_header = APIKeyHeader(name="X-API-Key")


def verify_api_key(key: str = Depends(api_key_header)):
    """FastAPI dependency that rejects requests with a wrong ``X-API-Key``.

    Raises:
        HTTPException: 403 when the supplied key differs from ``API_KEY``.
    """
    # Constant-time comparison so response timing does not leak how much of
    # the key matched.  Compare bytes: compare_digest rejects non-ASCII str.
    if not hmac.compare_digest(key.encode("utf-8"), API_KEY.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Unauthorized")


# ---------------------- Helpers ----------------------
def run_command(cmd: list[str]) -> str:
    """Run *cmd* (argv list, no shell) and return its stripped stdout.

    Raises:
        HTTPException: 500 when the program cannot be started or exits with
            a non-zero return code.  The detail contains the command line,
            return code, stdout and stderr.
    """
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as exc:
        # Missing / non-executable binary: report it like any other failure
        # instead of letting a bare exception escape the handler.
        raise HTTPException(
            status_code=500,
            detail=f"Command failed\nCommand: {' '.join(cmd)}\nError: {exc}",
        ) from exc

    if result.returncode != 0:
        error_msg = (
            f"Command failed\n"
            f"Command: {' '.join(cmd)}\n"
            f"Return code: {result.returncode}\n"
            f"Stdout: {result.stdout.strip()}\n"
            f"Stderr: {result.stderr.strip() or 'None'}"
        )
        raise HTTPException(status_code=500, detail=error_msg)
    return result.stdout.strip()


def init_db():
    """Create the database directory and the ``containers`` table if missing."""
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    # closing() guarantees the connection is released even if the DDL fails.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS containers (
                ID INTEGER PRIMARY KEY AUTOINCREMENT,
                UUID CHAR(50) UNIQUE,
                email CHAR(100),
                expires DATE,
                tags TEXT,
                env TEXT,
                affiliate CHAR(30),
                image CHAR(50),
                history TEXT,
                comment TEXT,
                domains TEXT,
                status CHAR(20),
                created DATE,
                bump DATE,
                secret TEXT
            )
        ''')
        conn.commit()


def execute_db(query: str, params: tuple = (), fetch: bool = False):
    """Execute one SQL statement against ``DB_PATH`` and commit.

    Args:
        query: SQL text with ``?`` placeholders.
        params: Values bound to the placeholders.
        fetch: When true, return the result rows.

    Returns:
        A list of ``dict`` rows when ``fetch`` is true and at least one row
        was returned; otherwise ``None`` (callers must handle ``None``).
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute(query, params)
        # Read the rows before committing so the cursor is fully consumed.
        rows = [dict(row) for row in cursor.fetchall()] if fetch else None
        conn.commit()
    return rows or None


def _read_text_or_none(path: str) -> Optional[str]:
    """Return the stripped contents of *path*, or ``None`` if it is missing."""
    try:
        with open(path) as f:
            return f.read().strip()
    except FileNotFoundError:
        return None


# Lines of the Odoo log that are worth returning to the client.
_IMPORTANT_LOG_PATTERNS = (
    re.compile(r"odoo\.addons\."),        # Any Odoo addon log
    re.compile(r"Job '.*' starting"),     # Cron job start
    re.compile(r"Job '.*' fully done"),   # Cron job end
    re.compile(r"ERROR"),                 # Errors
    re.compile(r"WARNING"),               # Warnings
    re.compile(r"Traceback"),             # Tracebacks
)


def _is_important_line(line: str) -> bool:
    """Return True when *line* matches any of ``_IMPORTANT_LOG_PATTERNS``."""
    return any(pattern.search(line) for pattern in _IMPORTANT_LOG_PATTERNS)


def _read_last_lines(file_path: str, max_lines: int) -> list[str]:
    """Return the last *max_lines* stripped lines of *file_path*.

    Returns an empty list when the file does not exist.  The file is
    streamed through a bounded deque, so memory use is capped by
    *max_lines* rather than by the file size.
    """
    if not os.path.isfile(file_path):
        return []
    with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
        return list(deque((line.strip() for line in f), maxlen=max_lines))


# ---------------------- Models ----------------------
class ContainerModel(BaseModel):
    """Request body for ``/container/update``; mirrors the table columns."""

    UUID: str
    email: Optional[str] = None
    expires: Optional[str] = None
    tags: Optional[str] = None
    env: Optional[Dict[str, Any]] = None
    affiliate: Optional[str] = None
    image: Optional[str] = None
    history: Optional[str] = None
    comment: Optional[str] = None
    domains: Optional[str] = None
    status: Optional[str] = None
    created: Optional[str] = None
    bump: Optional[str] = None
    secret: Optional[Dict[str, Any]] = None


class UUIDRequest(BaseModel):
    """Request body carrying only the container UUID."""

    UUID: str


# Field names of ContainerModel in declaration order.  ``model_fields`` is the
# Pydantic v2 spelling; ``__fields__`` is the v1 one (deprecated in v2).
_CONTAINER_FIELDS = tuple(
    getattr(ContainerModel, "model_fields", None) or ContainerModel.__fields__
)


# ---------------------- Routes ----------------------
@app.get("/", include_in_schema=False)
def redirect_to_odoo():
    """Redirect the bare root URL to the project web site."""
    return RedirectResponse(url="https://ODOO4PROJECTS.com")


@app.post("/container/update", dependencies=[Depends(verify_api_key)])
def update_container(request: ContainerModel):
    """Insert the container row, or update only the fields that were sent.

    ``env`` and ``secret`` are stored as JSON text.  Fields left as ``None``
    in the request are not touched on an existing row.

    Returns:
        ``{"UUID", "status"}`` with status ``created``, ``updated`` (plus
        ``fields_updated``) or ``no_change``.
    """
    existing = execute_db(
        "SELECT 1 FROM containers WHERE UUID = ?", (request.UUID,), fetch=True
    )

    if not existing:
        # No row yet: insert one with every field as given.
        env_str = json.dumps(request.env) if isinstance(request.env, dict) else None
        secret_str = (
            json.dumps(request.secret) if isinstance(request.secret, dict) else None
        )
        execute_db("""
            INSERT INTO containers
                (UUID, email, expires, tags, env, affiliate, image, history,
                 comment, domains, status, created, bump, secret)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            request.UUID, request.email, request.expires, request.tags, env_str,
            request.affiliate, request.image, request.history, request.comment,
            request.domains, request.status, request.created, request.bump,
            secret_str,
        ))
        return {"UUID": request.UUID, "status": "created"}

    # Existing row: partial update with the non-None fields only.
    updates = {}
    for field in _CONTAINER_FIELDS:
        if field == "UUID":
            continue
        value = getattr(request, field)
        if value is None:
            continue
        if field in ("env", "secret"):
            value = json.dumps(value)
        updates[field] = value

    if not updates:
        return {"UUID": request.UUID, "status": "no_change"}

    # Column names come from the model definition, never from user input,
    # so building the SET clause by string formatting is safe here.
    set_clause = ", ".join(f"{column}=?" for column in updates)
    execute_db(
        f"UPDATE containers SET {set_clause} WHERE UUID=?",
        (*updates.values(), request.UUID),
    )
    return {
        "UUID": request.UUID,
        "status": "updated",
        "fields_updated": list(updates),
    }


# NOTE(review): the handlers below pass the caller-supplied UUID straight to
# an executable as an argument; only /client/logs validates its format.
# Consider applying the same validation everywhere once the UUID format
# contract is confirmed.
@app.post("/container/start", dependencies=[Depends(verify_api_key)])
def start_container(request: UUIDRequest):
    """Run ``startContainer`` for the UUID and return its output."""
    return {"message": run_command([f"{BIN_PATH}/startContainer", request.UUID])}


@app.post("/container/stop", dependencies=[Depends(verify_api_key)])
def stop_container(request: UUIDRequest):
    """Run ``stopContainer`` for the UUID and return its output."""
    return {"message": run_command([f"{BIN_PATH}/stopContainer", request.UUID])}


@app.post("/container/nuke", dependencies=[Depends(verify_api_key)])
def nuke_container(request: UUIDRequest):
    """Run ``nukeContainer``, but only if the row's status is ``nuke``.

    Raises:
        HTTPException: 400 when the row is missing or has another status.
    """
    status = execute_db(
        "SELECT status FROM containers WHERE UUID=?", (request.UUID,), fetch=True
    )
    if not status or status[0]["status"] != "nuke":
        raise HTTPException(400, "Container status is not 'nuke'")
    return {"message": run_command([f"{BIN_PATH}/nukeContainer", request.UUID])}


@app.post("/container/info", dependencies=[Depends(verify_api_key)])
def info_container(request: Optional[UUIDRequest] = None):
    """Return container rows, for one UUID or for all containers.

    The ``bump`` and ``secret`` columns are not part of the selection.

    Returns:
        A list of ``{"json": row}`` items (n8n item format); empty when no
        row matches.
    """
    fields = [
        "ID", "UUID", "email", "expires", "tags", "env", "affiliate",
        "image", "history", "comment", "domains", "status", "created",
    ]
    field_str = ", ".join(fields)

    if request:
        rows = execute_db(
            f"SELECT {field_str} FROM containers WHERE UUID=?",
            (str(request.UUID),),
            fetch=True,
        )
    else:
        rows = execute_db(f"SELECT {field_str} FROM containers", fetch=True)

    # execute_db returns None (not []) when nothing matched; rows are dicts.
    # Wrap in n8n JSON format.
    return [{"json": container} for container in rows or []]


@app.post("/container/bump", dependencies=[Depends(verify_api_key)])
def bump_container(request: UUIDRequest):
    """Store today's UTC date as the bump date, then run ``bumpContainer``."""
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    execute_db("UPDATE containers SET bump=? WHERE UUID=?", (today, request.UUID))
    msg = run_command([f"{BIN_PATH}/bumpContainer", request.UUID])
    return {"message": msg, "bump_date": today}


@app.post("/container/quota", dependencies=[Depends(verify_api_key)])
def container_quota(request: UUIDRequest):
    """Return memory usage and block I/O of the container from ``docker stats``.

    Raises:
        HTTPException: 500 when docker fails or prints an unexpected format.
    """
    output = run_command([
        "docker", "stats", request.UUID, "--no-stream",
        "--format", "{{.MemUsage}},{{.BlockIO}}",
    ])
    mem_usage, separator, disk_usage = output.partition(",")
    if not separator:
        raise HTTPException(
            status_code=500,
            detail=f"Unexpected docker stats output: {output!r}",
        )
    return {"memory_usage": mem_usage, "disk_io": disk_usage}


# ---------------------- SYSTEM ----------------------
@app.get("/system/containers", dependencies=[Depends(verify_api_key)])
def get_containers():
    """Return the raw output of ``getContainers`` with a JSON media type."""
    return Response(
        content=run_command([f"{BIN_PATH}/getContainers"]),
        media_type="application/json",
    )


@app.get("/system/images", dependencies=[Depends(verify_api_key)])
def list_images():
    """Return the local docker images as ``repository:tag`` strings."""
    images = run_command(["docker", "images", "--format", "{{.Repository}}:{{.Tag}}"])
    # splitlines() yields [] for empty output, where split("\n") gave [""].
    return {"images": images.splitlines()}


@app.get("/system/info", dependencies=[Depends(verify_api_key)])
def get_system_info():
    """Return version, latest bump date, update marker and host resources.

    Raises:
        HTTPException: 500 with the error text when anything fails.
    """
    try:
        # MAX() always yields exactly one row, so [0] is safe here.
        latest_bump = execute_db(
            "SELECT MAX(bump) AS latest_bump FROM containers", fetch=True
        )[0]["latest_bump"]
        last_update = _read_text_or_none("/4server/data/update")
        alpine_version = _read_text_or_none("/etc/alpine-release")

        mem = psutil.virtual_memory()
        disk = psutil.disk_usage("/")
        cpu_count = psutil.cpu_count(logical=True)

        return {
            "alpine_version": alpine_version,
            "last_update": last_update,
            "latest_bump": latest_bump,
            "version": VERSION,
            "resources": {
                "memory": {
                    "total": mem.total,
                    "available": mem.available,
                    "used": mem.used,
                },
                "disk": {"total": disk.total, "used": disk.used, "free": disk.free},
                "cpu_count": cpu_count,
            },
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/system/pull", dependencies=[Depends(verify_api_key)])
def pull_all_images():
    """Run ``pullAllContainers`` and return its output."""
    return {"message": run_command([f"{BIN_PATH}/pullAllContainers"])}


@app.post("/client/revert/{uuid}", dependencies=[Depends(verify_api_key)])
def revert_changes(uuid: str):
    """Run ``gitRevert`` for the UUID and return its output."""
    return {"message": run_command([f"{BIN_PATH}/gitRevert", uuid])}


@app.get("/client/logs/{uuid}", dependencies=[Depends(verify_api_key)])
def get_odoo_log_summary(uuid: str):
    """Return the notable recent Odoo log lines and the tail of the git log.

    Declared as a plain ``def`` (not ``async def``) because it performs
    blocking file reads; FastAPI then runs it in its threadpool instead of
    on the event loop.

    Returns:
        ``uuid``, the lines among the last 500 of ``odoo.log`` that match
        ``_IMPORTANT_LOG_PATTERNS``, and the last 50 lines of ``git.log``
        (empty when that file is missing).

    Raises:
        HTTPException: 400 for a malformed UUID, 404 when ``odoo.log`` does
            not exist, 500 when reading fails.
    """
    # The UUID becomes part of a filesystem path: allow hex digits and '-'
    # only, which rules out '.' and '/' and therefore path traversal.
    if not re.fullmatch(r"[0-9a-fA-F\-]+", uuid):
        raise HTTPException(
            status_code=400,
            detail="Invalid UUID format. Only numbers, letters a-f, and '-' are allowed.",
        )

    BASE_LOG_DIR = "/4server/data"
    project_dir = os.path.join(BASE_LOG_DIR, uuid)
    odoo_log_file = os.path.join(project_dir, "logs", "odoo.log")
    git_log_file = os.path.join(project_dir, "logs", "git.log")

    if not os.path.isfile(odoo_log_file):
        raise HTTPException(status_code=404, detail="Odoo log file not found")

    try:
        last_500_lines = _read_last_lines(odoo_log_file, 500)
        important_odoo_lines = [
            line for line in last_500_lines if _is_important_line(line)
        ]
        last_50_git_lines = _read_last_lines(git_log_file, 50)
        return {
            "uuid": str(uuid),
            "important_odoo_log_lines": important_odoo_lines,
            "last_git_log_lines": last_50_git_lines,
        }
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error reading log files: {e}"
        ) from e


# ---------------------- Entry Point ----------------------
if __name__ == "__main__":
    print(VERSION)
    init_db()
    # NOTE(review): fixed 25 s delay before binding -- presumably waits for
    # the 10.5.0.1 interface to come up; confirm before changing.
    time.sleep(25)
    uvicorn.run(app, host="10.5.0.1", port=8888)