130 lines
3.7 KiB
Python
Executable file
130 lines
3.7 KiB
Python
Executable file
import ast
|
|
import logging
|
|
import os
|
|
import runpy
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import psutil
|
|
|
|
from duskwm_logging import log
|
|
|
|
# Configs
def parse_flag(raw: str) -> bool:
    """Interpret an environment-variable string as a boolean flag.

    Valid Python literals (``True``, ``False``, ``0``, ``1``, ...) are
    evaluated with ``ast.literal_eval`` and reduced to their truthiness.
    Text that is not a literal (``true``, ``yes``, ``on``, an empty string,
    ...) is compared case-insensitively against a few affirmative words
    instead of raising while the module is being loaded.
    """
    try:
        return bool(ast.literal_eval(raw))
    except (ValueError, SyntaxError, TypeError):
        return raw.strip().lower() in {"true", "yes", "on", "y", "t"}


# Directory containing the entry-point script (resolved from sys.argv[0]).
main_dir = Path(sys.argv[0]).parent.resolve()
# Debug switch, read once from STATUS_DEBUG; off when the variable is unset.
debug = parse_flag(os.getenv("STATUS_DEBUG", "False"))
|
|
|
|
# Scripts by interval
# Each key is a refresh interval in seconds; each value lists
# (status number, script path) pairs refreshed on that interval.
# Script paths are written relative to the directory of the main script.
statuses = {
    5: [
        (2, "./scripts/get_volume.py"),
    ],
    60: [
        (0, "./scripts/clock.py"),
        (4, "./scripts/get_battery.py"),
        (3, "./scripts/get_wifi.py"),
        (7, "./scripts/get_uptime.py"),
        (1, "./scripts/unread_mail.py"),
    ],
    # 300: [],  (no 300-second tier is configured at the moment)
    3600: [
        (5, "./scripts/get_publicip.py"),
        (6, "./scripts/get_weather.py"),
    ],
}
|
|
|
|
|
|
def list_statuses():
    """Log every configured status script, grouped by refresh interval."""
    log.info("Status scripts being processed")
    for every, entries in statuses.items():
        # Singular wording only for a one-second interval.
        if every == 1:
            unit = "second"
        else:
            unit = "seconds"
        log.info(f" Updating every {every} {unit}")

        for slot, path in entries:
            log.info(f" {slot}: {path}")
|
|
|
|
|
|
def kill_existing_instances():
    """Kill other running copies of this script, pausing on a first run.

    A lock file in the user's runtime directory guards the routine: when the
    file already exists the current process exits with status 0. Otherwise
    every process whose command line mentions ``duskwm_status.py`` and that is
    either a ``uv run`` launcher or a python interpreter is killed, except the
    current process itself. When no python instance was killed the call logs
    a "first run" message and sleeps for 10 seconds.

    The lock file is removed on the way out even if an error occurs, so a
    failure here cannot leave a stale lock that blocks every later start.
    """
    # Create a temporary lock to ensure firstrun can run.
    # The directory is derived from the real UID rather than assuming 1000.
    lock_path = f"/run/user/{os.getuid()}/duskwm_status.lock"
    try:
        with open(lock_path, "x") as lock_file:
            lock_file.write("Running")
    except FileExistsError:
        sys.exit(0)

    try:
        current_pid = os.getpid()
        found_existing = False

        for proc in psutil.process_iter(["pid", "name", "cmdline"]):
            # psutil stores None for fields it was not allowed to read.
            cmdline_str = " ".join(proc.info["cmdline"] or [])
            name = proc.info["name"] or ""
            pid = proc.info["pid"]

            if pid == current_pid or "duskwm_status.py" not in cmdline_str:
                continue

            try:
                # Kill uv processes running our script
                # NOTE(review): a `uv run` launcher that started the current
                # process would match here too -- confirm that is intended.
                if "uv run" in cmdline_str:
                    # proc.kill() reports a vanished or protected process via
                    # psutil exceptions, which are handled below.
                    proc.kill()
                    log.warning(f"Killing uv process (PID {pid})")

                # Kill python processes running our script
                elif "python" in name:
                    proc.kill()
                    log.warning(f"Killing python process (PID {pid})")
                    # Only a killed python instance counts as "existing".
                    found_existing = True
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue

        if not found_existing:
            log.info("First run detected, sleeping to let system initialize...")
            time.sleep(10)
    finally:
        try:
            os.remove(lock_path)
        except FileNotFoundError:
            # Already gone; nothing left to clean up.
            pass
|
|
|
|
|
|
def update_status(status: Optional[int], content: Optional[str]):
    """Publish one status value, or just log it when debug mode is on.

    Args:
        status: Status number to update; ``None`` is replaced by 99.
        content: Text to show; ``None`` is replaced by an empty string.

    With ``debug`` enabled the logger level is set to DEBUG and the pair is
    logged instead of being sent. Otherwise ``duskc`` is invoked with the
    ``setstatus`` command; its exit status is not inspected.
    """
    slot = 99 if status is None else status
    text = "" if content is None else content

    if not debug:
        command = [
            "duskc",
            "--ignore-reply",
            "run_command",
            "setstatus",
            f"{slot}",
            f"{text}",
        ]
        subprocess.run(command)
        return

    log.setLevel(logging.DEBUG)
    log.debug(f"{slot:>3} : {text}")
|
|
|
|
|
|
def get_status_data(scriptpath: str) -> Optional[str]:
    """Execute a status script and return the ``content`` global it defines.

    Args:
        scriptpath: Script location, joined onto ``main_dir``.

    Returns:
        The value bound to ``content`` in the executed script's globals, or
        ``None`` when the script does not define that name.
    """
    script_file = (main_dir / scriptpath).as_posix()
    script_globals = runpy.run_path(script_file)
    return script_globals.get("content")
|
|
|
|
|
|
# Run at startup
log.info(f"Running in Debug mode: {debug}")
log.info(f"Root script found at: {main_dir}")
kill_existing_instances()
list_statuses()

# Tick counter: one tick per loop pass, nominally one second each.
seconds = 0
while True:
    tick_started = time.monotonic()

    # Run on loop: refresh every tier whose interval divides the current tick.
    # All tiers run on the very first pass because 0 % interval == 0.
    for interval, scripts in statuses.items():
        if seconds % interval != 0:
            continue
        for status, script in scripts:
            try:
                data = get_status_data(script)
            except Exception:
                # A single failing script must not stop the whole loop:
                # keep the traceback in the log and show an error marker.
                log.exception(f"Status script failed: {script}")
                data = f"Err: {script}"
            update_status(status, data)

    # Increment count and sleep only for what is left of this one-second
    # tick, so time spent running scripts does not add to the period.
    seconds += 1
    time.sleep(max(0.0, 1.0 - (time.monotonic() - tick_started)))
|