#!/usr/bin/env python3
"""HTTP API for managing containers and their contract records.

The service exposes FastAPI routes that

* store and query container records in the SQLite database at ``DB_PATH``,
* invoke helper executables located in ``BIN_PATH`` and the ``docker`` CLI,
* report basic host information.

Every route except ``/`` requires the ``X-API-Key`` header to match
``API_KEY``.
"""
import os
import secrets
import sqlite3
import subprocess
from contextlib import closing
from datetime import datetime, timezone
from typing import Optional

import psutil
import uvicorn
from fastapi import FastAPI, HTTPException, Depends, Response
from fastapi.security.api_key import APIKeyHeader
from fastapi.responses import RedirectResponse
from pydantic import BaseModel

# Constants
DB_PATH = "/4server/data/contracts.db"
BIN_PATH = "/4server/sbin"
# NOTE(review): the fallback value is a publicly known placeholder; deployments
# should always set the API_KEY environment variable.
API_KEY = os.getenv("API_KEY", "your-secret-api-key")
VERSION = "API: 0.0.7"

# FastAPI app
app = FastAPI()
api_key_header = APIKeyHeader(name="X-API-Key")


def verify_api_key(key: str = Depends(api_key_header)):
    """Reject the request with HTTP 403 unless *key* equals ``API_KEY``.

    The comparison is done with ``secrets.compare_digest`` so that the time
    taken does not reveal how many leading characters matched.
    """
    # Compare as bytes: compare_digest only accepts ASCII-only str values.
    if not secrets.compare_digest(key.encode("utf-8"), API_KEY.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Unauthorized")


# ---------------------- Helpers ----------------------
def run_command(cmd: list[str]) -> str:
    """Run *cmd* (an argument list, no shell) and return its stripped stdout.

    Raises:
        HTTPException: status 500 when the executable cannot be started or
            exits with a non-zero return code. The detail carries the
            stripped stderr, or "Unknown error" when stderr is empty.
    """
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as exc:
        # Missing or non-executable binary: report it like any other failure
        # instead of letting the raw OSError escape the route handler.
        raise HTTPException(
            status_code=500, detail=f"Failed to run {cmd[0]}: {exc}"
        ) from exc
    if result.returncode != 0:
        raise HTTPException(
            status_code=500, detail=result.stderr.strip() or "Unknown error"
        )
    return result.stdout.strip()


def init_db():
    """Initialize the database with containers table."""
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    # closing() guarantees the connection is released even if execute() fails.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS containers (
                ID INTEGER PRIMARY KEY AUTOINCREMENT,
                UUID CHAR(50),
                email CHAR(100),
                expires DATE,
                tags TEXT,
                env TEXT,
                affiliate CHAR(30),
                image CHAR(50),
                history TEXT,
                comment TEXT,
                domains TEXT,
                status CHAR(20),
                created DATE,
                bump DATE
            )
        ''')
        conn.commit()


def execute_db(query: str, params: tuple = (), fetch: bool = False):
    """Execute one SQL statement against ``DB_PATH`` and commit.

    Args:
        query: SQL text with ``?`` placeholders.
        params: Values bound to the placeholders.
        fetch: When true, return the result rows.

    Returns:
        ``None`` when *fetch* is false; otherwise a list with one ``dict`` per
        row keyed by column name (an empty list when nothing matched).
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.execute(query, params)
        rows = cursor.fetchall() if fetch else None
        conn.commit()
    if rows is None:
        return None
    return [dict(row) for row in rows]  # Convert each row to dict


def _require_container_id(container_id: Optional[str]) -> str:
    """Return *container_id*, or raise HTTP 400 when it is unusable.

    The value is later passed as a command-line argument, so ``None`` would
    crash ``subprocess.run`` and a leading ``-`` would be parsed as an option
    by the invoked program.
    """
    if not container_id:
        raise HTTPException(status_code=400, detail="container_id is required")
    if container_id.startswith("-"):
        raise HTTPException(status_code=400, detail="Invalid container_id")
    return container_id


def _read_text_if_exists(path: str) -> Optional[str]:
    """Return the stripped contents of *path*, or ``None`` if it is absent."""
    try:
        with open(path, encoding="utf-8") as handle:
            return handle.read().strip()
    except FileNotFoundError:
        return None


# ---------------------- Models ----------------------
class ContainerModel(BaseModel):
    """Columns of the ``containers`` table (all optional strings)."""

    UUID: Optional[str] = None
    email: Optional[str] = None
    expires: Optional[str] = None
    tags: Optional[str] = None
    env: Optional[str] = None
    affiliate: Optional[str] = None
    image: Optional[str] = None
    history: Optional[str] = None
    comment: Optional[str] = None
    domains: Optional[str] = None
    status: Optional[str] = None
    created: Optional[str] = None
    bump: Optional[str] = None


class ContainerIDRequest(BaseModel):
    """Request body naming the container an action applies to."""

    container_id: Optional[str] = None


class UpdateContainerRequest(ContainerModel):
    """Request body for ``/container/update``; same fields as the table."""


class InfoContainerRequest(BaseModel):
    """Request body for ``/container/info``; omit the id to list all rows."""

    container_id: Optional[str] = None


# ---------------------- Routes ----------------------
@app.get("/", include_in_schema=False)
def redirect_to_odoo():
    """Redirect the root URL to https://ODOO4PROJECTS.com."""
    return RedirectResponse(url="https://ODOO4PROJECTS.com")


@app.post("/container/update", dependencies=[Depends(verify_api_key)])
def update_container(request: UpdateContainerRequest):
    """Create the record for ``request.UUID`` or overwrite all of its fields.

    Fields omitted from the request are written as NULL.

    Raises:
        HTTPException: status 400 when no UUID is supplied. Without one the
            lookup can never match and every call would add another
            NULL-UUID row.
    """
    if not request.UUID:
        raise HTTPException(status_code=400, detail="UUID is required")

    fields = (
        request.email, request.expires, request.tags, request.env,
        request.affiliate, request.image, request.history, request.comment,
        request.domains, request.status, request.created, request.bump,
    )
    existing = execute_db(
        "SELECT * FROM containers WHERE UUID = ?", (request.UUID,), fetch=True
    )
    if existing:
        execute_db("""
            UPDATE containers SET email=?, expires=?, tags=?, env=?, affiliate=?, image=?,
            history=?, comment=?, domains=?, status=?, created=?, bump=?
            WHERE UUID=?
        """, (*fields, request.UUID))
    else:
        execute_db("""
            INSERT INTO containers (UUID, email, expires, tags, env, affiliate, image, history,
            comment, domains, status, created, bump)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (request.UUID, *fields))
    return {"message": "Container updated or created"}


@app.post("/container/start", dependencies=[Depends(verify_api_key)])
def start_container(request: ContainerIDRequest):
    """Run ``startContainer`` for the given container and return its output."""
    container_id = _require_container_id(request.container_id)
    return {"message": run_command([f"{BIN_PATH}/startContainer", container_id])}


@app.post("/container/stop", dependencies=[Depends(verify_api_key)])
def stop_container(request: ContainerIDRequest):
    """Run ``stopContainer`` for the given container and return its output."""
    container_id = _require_container_id(request.container_id)
    return {"message": run_command([f"{BIN_PATH}/stopContainer", container_id])}


@app.post("/container/nuke", dependencies=[Depends(verify_api_key)])
def nuke_container(request: ContainerIDRequest):
    """Run ``nukeContainer``, but only if the stored status is ``"nuke"``.

    Raises:
        HTTPException: status 400 when the container has no record or its
            status is anything other than ``"nuke"``.
    """
    container_id = _require_container_id(request.container_id)
    rows = execute_db(
        "SELECT status FROM containers WHERE UUID=?", (container_id,), fetch=True
    )
    # execute_db returns dict rows, so the column is read by name.
    if not rows or rows[0]["status"] != "nuke":
        raise HTTPException(400, "Container status is not 'nuke'")
    return {"message": run_command([f"{BIN_PATH}/nukeContainer", container_id])}


@app.post("/container/info", dependencies=[Depends(verify_api_key)])
def info_container(request: InfoContainerRequest):
    """Return the record for ``container_id``, or every record if omitted."""
    if request.container_id:
        rows = execute_db(
            "SELECT * FROM containers WHERE UUID=?",
            (request.container_id,),
            fetch=True,
        )
    else:
        rows = execute_db("SELECT * FROM containers", fetch=True)
    return {"containers": rows}


@app.post("/container/bump", dependencies=[Depends(verify_api_key)])
def bump_container(request: ContainerIDRequest):
    """Record today's UTC date as the bump date and run ``bumpContainer``."""
    container_id = _require_container_id(request.container_id)
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    # NOTE(review): the date is stored before the command runs, so a failing
    # bumpContainer still leaves the new date in the database. Order kept as
    # in the original because the script may depend on it - confirm.
    execute_db("UPDATE containers SET bump=? WHERE UUID=?", (today, container_id))
    msg = run_command([f"{BIN_PATH}/bumpContainer", container_id])
    return {"message": msg, "bump_date": today}


@app.post("/container/quota", dependencies=[Depends(verify_api_key)])
def container_quota(request: ContainerIDRequest):
    """Return the ``MemUsage`` and ``BlockIO`` columns of ``docker stats``.

    Raises:
        HTTPException: status 500 when the command output does not contain
            the comma separating the two requested columns.
    """
    container_id = _require_container_id(request.container_id)
    output = run_command([
        "docker", "stats", container_id, "--no-stream",
        "--format", "{{.MemUsage}},{{.BlockIO}}"
    ])
    mem_usage, separator, disk_usage = output.partition(",")
    if not separator:
        raise HTTPException(status_code=500, detail="Unexpected docker stats output")
    return {"memory_usage": mem_usage, "disk_io": disk_usage}


# ---------------------- SYSTEM ----------------------
@app.get("/system/containers", dependencies=[Depends(verify_api_key)])
def get_containers():
    """Return the output of ``getContainers`` with an application/json type."""
    return Response(
        content=run_command([f"{BIN_PATH}/getContainers"]),
        media_type="application/json",
    )


@app.get("/system/images", dependencies=[Depends(verify_api_key)])
def list_images():
    """Return the local docker images as ``repository:tag`` strings."""
    images = run_command(["docker", "images", "--format", "{{.Repository}}:{{.Tag}}"])
    # splitlines() yields [] for empty output, where split("\n") gave [""].
    return {"images": images.splitlines()}


@app.get("/system/info", dependencies=[Depends(verify_api_key)])
def get_system_info():
    """Return version, update, bump and resource information for the host.

    Raises:
        HTTPException: status 500 carrying the message of any error raised
            while collecting the information.
    """
    try:
        # The alias gives the aggregate a stable key in the dict row.
        rows = execute_db(
            "SELECT MAX(bump) AS latest_bump FROM containers", fetch=True
        )
        latest_bump = rows[0]["latest_bump"] if rows else None

        last_update = _read_text_if_exists("/4server/data/update")
        alpine_version = _read_text_if_exists("/etc/alpine-release")

        mem = psutil.virtual_memory()
        disk = psutil.disk_usage("/")
        cpu_count = psutil.cpu_count(logical=True)

        return {
            "alpine_version": alpine_version,
            "last_update": last_update,
            "latest_bump": latest_bump,
            "version": VERSION,
            "resources": {
                "memory": {"total": mem.total, "available": mem.available, "used": mem.used},
                "disk": {"total": disk.total, "used": disk.used, "free": disk.free},
                "cpu_count": cpu_count
            }
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/system/pull", dependencies=[Depends(verify_api_key)])
def pull_all_images():
    """Run ``pullAllContainers`` and return its output."""
    return {"message": run_command([f"{BIN_PATH}/pullAllContainers"])}


# ---------------------- Entry Point ----------------------
if __name__ == "__main__":
    print(VERSION)
    init_db()
    uvicorn.run(app, host="10.5.0.1", port=8888)