Files
cc/app/sbin/api
Oliver 9726dc0060 certs
2025-08-25 07:10:14 +02:00

235 lines
8.0 KiB
Python
Executable File

#!/usr/bin/env python3
from fastapi import FastAPI, HTTPException, Depends, Response
from fastapi.security.api_key import APIKeyHeader
from fastapi.responses import RedirectResponse
from pydantic import BaseModel
import psutil
import sqlite3
import subprocess
import os
import uvicorn
from typing import Optional
from datetime import datetime
# Constants
DB_PATH = "/4server/data/contracts.db"
BIN_PATH = "/4server/sbin"
API_KEY = os.getenv("API_KEY", "your-secret-api-key")
VERSION = "API: 0.0.6"
# FastAPI app
app = FastAPI()
api_key_header = APIKeyHeader(name="X-API-Key")
def verify_api_key(key: str = Depends(api_key_header)):
if key != API_KEY:
raise HTTPException(status_code=403, detail="Unauthorized")
# ---------------------- Helpers ----------------------
def run_command(cmd: list[str]) -> str:
"""Run a shell command and return stdout or raise HTTPException on error."""
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise HTTPException(status_code=500, detail=result.stderr.strip() or "Unknown error")
return result.stdout.strip()
def init_db():
"""Initialize the database with containers table."""
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS containers (
ID INTEGER PRIMARY KEY AUTOINCREMENT,
UUID CHAR(50),
email CHAR(100),
expires DATE,
tags TEXT,
env TEXT,
affiliate CHAR(30),
image CHAR(50),
history TEXT,
comment TEXT,
domains TEXT,
status CHAR(20),
created DATE,
bump DATE
)
''')
conn.commit()
conn.close()
def execute_db(query: str, params: tuple = (), fetch: bool = False):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute(query, params)
conn.commit()
data = cursor.fetchall() if fetch else None
conn.close()
return data
# ---------------------- Models ----------------------
class ContainerModel(BaseModel):
UUID: Optional[str] = None
email: str
expires: str
tags: Optional[str] = None
env: Optional[str] = None
affiliate: Optional[str] = None
image: Optional[str] = None
history: Optional[str] = None
comment: Optional[str] = None
domains: Optional[str] = None
status: str
created: str
bump: Optional[str] = None
class ContainerIDRequest(BaseModel):
container_id: str
class UpdateContainerRequest(ContainerModel):
pass
class InfoContainerRequest(BaseModel):
container_id: Optional[str] = None
# ---------------------- Routes ----------------------
@app.get("/", include_in_schema=False)
def redirect_to_odoo():
return RedirectResponse(url="https://ODOO4PROJECTS.com")
@app.post("/container/update", dependencies=[Depends(verify_api_key)])
def update_container(request: UpdateContainerRequest):
existing = execute_db("SELECT * FROM containers WHERE UUID = ?", (request.UUID,), fetch=True)
if existing:
execute_db("""
UPDATE containers SET email=?, expires=?, tags=?, env=?, affiliate=?, image=?,
history=?, comment=?, domains=?, status=?, created=?, bump=?
WHERE UUID=?
""", (
request.email, request.expires, request.tags, request.env, request.affiliate,
request.image, request.history, request.comment, request.domains, request.status,
request.created, request.bump, request.UUID
))
else:
execute_db("""
INSERT INTO containers (UUID, email, expires, tags, env, affiliate, image, history,
comment, domains, status, created, bump)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
request.UUID, request.email, request.expires, request.tags, request.env,
request.affiliate, request.image, request.history, request.comment,
request.domains, request.status, request.created, request.bump
))
return {"message": "Container updated or created"}
@app.post("/container/start", dependencies=[Depends(verify_api_key)])
def start_container(request: ContainerIDRequest):
return {"message": run_command([f"{BIN_PATH}/startContainer", request.container_id])}
@app.post("/container/stop", dependencies=[Depends(verify_api_key)])
def stop_container(request: ContainerIDRequest):
return {"message": run_command([f"{BIN_PATH}/stopContainer", request.container_id])}
@app.post("/container/nuke", dependencies=[Depends(verify_api_key)])
def nuke_container(request: ContainerIDRequest):
status = execute_db("SELECT status FROM containers WHERE UUID=?", (request.container_id,), fetch=True)
if not status or status[0][0] != "nuke":
raise HTTPException(400, "Container status is not 'nuke'")
return {"message": run_command([f"{BIN_PATH}/nukeContainer", request.container_id])}
@app.post("/container/info", dependencies=[Depends(verify_api_key)])
def info_container(request: InfoContainerRequest):
if request.container_id:
rows = execute_db("SELECT * FROM containers WHERE UUID=?", (request.container_id,), fetch=True)
else:
rows = execute_db("SELECT * FROM containers", fetch=True)
return {"containers": rows}
@app.post("/container/bump", dependencies=[Depends(verify_api_key)])
def bump_container(request: ContainerIDRequest):
today = datetime.utcnow().strftime("%Y-%m-%d")
execute_db("UPDATE containers SET bump=? WHERE UUID=?", (today, request.container_id))
msg = run_command([f"{BIN_PATH}/bumpContainer", request.container_id])
return {"message": msg, "bump_date": today}
@app.post("/container/quota", dependencies=[Depends(verify_api_key)])
def container_quota(request: ContainerIDRequest):
output = run_command([
"docker", "stats", request.container_id, "--no-stream",
"--format", "{{.MemUsage}},{{.BlockIO}}"
])
mem_usage, disk_usage = output.split(",")
return {"memory_usage": mem_usage, "disk_io": disk_usage}
# ---------------------- SYSTEM ----------------------
@app.get("/system/containers", dependencies=[Depends(verify_api_key)])
def get_containers():
return Response(content=run_command([f"{BIN_PATH}/getContainers"]), media_type="application/json")
@app.get("/system/images", dependencies=[Depends(verify_api_key)])
def list_images():
images = run_command(["docker", "images", "--format", "{{.Repository}}:{{.Tag}}"])
return {"images": images.split("\n")}
@app.get("/system/info", dependencies=[Depends(verify_api_key)])
def get_system_info():
try:
alpine_version = None
last_update = None
bump_dates = execute_db("SELECT MAX(bump) FROM containers", fetch=True)[0][0]
if os.path.exists("/4server/data/update"):
with open("/4server/data/update") as f:
last_update = f.read().strip()
if os.path.exists("/etc/alpine-release"):
with open("/etc/alpine-release") as f:
alpine_version = f.read().strip()
mem = psutil.virtual_memory()
disk = psutil.disk_usage("/")
cpu_count = psutil.cpu_count(logical=True)
return {
"alpine_version": alpine_version,
"last_update": last_update,
"latest_bump": bump_dates,
"version": VERSION,
"resources": {
"memory": {"total": mem.total, "available": mem.available, "used": mem.used},
"disk": {"total": disk.total, "used": disk.used, "free": disk.free},
"cpu_count": cpu_count
}
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/system/pull", dependencies=[Depends(verify_api_key)])
def pull_all_images():
return {"message": run_command([f"{BIN_PATH}/pullAllContainers"])}
# ---------------------- Entry Point ----------------------
if __name__ == "__main__":
print(VERSION)
init_db()
uvicorn.run(app, host="10.5.0.1", port=8888)