dotfiles/duskwm/.config/dusk/status_scripts/duskwm_status.py

111 lines
3 KiB
Python
Executable file

import ast
import logging
import os
import runpy
import subprocess
import sys
import time
from pathlib import Path
from typing import Optional
import psutil
from duskwm_logging import log
# Configs
# Directory of the launched script; relative status-script paths are joined
# onto it before they are executed.
main_dir = Path(sys.argv[0]).parent.resolve()

# STATUS_DEBUG is read as a Python literal ("True", "False", "1", ...), and a
# valid literal is kept exactly as parsed. Values that are not valid literals
# (e.g. "true", "yes", or an empty string) would otherwise abort startup with
# ValueError/SyntaxError, so those fall back to a case-insensitive keyword
# check instead.
_raw_debug = os.getenv("STATUS_DEBUG", "False")
try:
    debug = ast.literal_eval(_raw_debug)
except (ValueError, TypeError, SyntaxError):
    debug = _raw_debug.strip().lower() in {"1", "true", "yes", "on"}
# Scripts by interval
# Each key is a refresh interval in seconds; each value lists
# (status slot, script path) pairs. Script paths are relative to the
# directory of the launched script. Entries run in the order written here.
statuses = {
    5: [
        (2, "./scripts/get_volume.py"),
    ],
    60: [
        (0, "./scripts/clock.py"),
        (4, "./scripts/get_battery.py"),
        (3, "./scripts/get_wifi.py"),
        (7, "./scripts/get_uptime.py"),
        (1, "./scripts/unread_mail.py"),
    ],
    # 300: [],
    3600: [
        (5, "./scripts/get_publicip.py"),
        (6, "./scripts/get_weather.py"),
    ],
}
def list_statuses():
    """Log every configured status script, grouped by refresh interval."""
    log.info("Status scripts being processed")
    for every, entries in statuses.items():
        # Singular unit only for a one-second interval.
        unit = "seconds" if every != 1 else "second"
        log.info(f" Updating every {every} {unit}")
        for slot, path in entries:
            log.info(f" {slot}: {path}")
def kill_existing_instances():
    """Kill other running copies of this status script.

    Scans every process for a command line with an argument containing
    ``duskwm_status.py`` and force-kills each match except the current
    process. If no other instance was killed, this is treated as the first
    run and the function sleeps for 10 seconds before returning.
    """
    current_pid = os.getpid()
    script_name = "duskwm_status.py"
    found_existing = False
    for proc in psutil.process_iter(["pid", "name", "cmdline"]):
        try:
            cmdline = proc.info["cmdline"]
            if not cmdline or not any(script_name in arg for arg in cmdline):
                continue
            pid = proc.info["pid"]
            if pid == current_pid:
                continue
            # Use Process.kill() rather than os.kill(pid, 9): a process that
            # has already exited or cannot be signalled is reported as
            # NoSuchProcess/AccessDenied, which are handled below. os.kill()
            # raises ProcessLookupError/PermissionError instead, which would
            # escape this handler and abort startup.
            proc.kill()
            log.warning(f"Killing existing instance (PID {pid})")
            found_existing = True
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    if not found_existing:
        log.info("First run detected, sleeping to let system initialize...")
        time.sleep(10)
def update_status(status: Optional[int], content: Optional[str]):
    """Publish one status value, or only log it when debug mode is on.

    Args:
        status: Status number passed to ``setstatus``; ``None`` becomes 99.
        content: Text to publish; ``None`` becomes an empty string.
    """
    if status is None:
        status = 99
    if content is None:
        content = ""

    if debug:
        # Debug mode: log the value and do not invoke duskc.
        log.setLevel(logging.DEBUG)
        log.debug(f"{status:>3} : {content}")
        return

    command = [
        "duskc",
        "--ignore-reply",
        "run_command",
        "setstatus",
        str(status),
        str(content),
    ]
    subprocess.run(command)
def get_status_data(scriptpath: str) -> Optional[str]:
    """Execute a status script and return its ``content`` global.

    Args:
        scriptpath: Script location, joined onto ``main_dir``.

    Returns:
        The value bound to ``content`` in the executed script's globals, or
        ``None`` when the script defines no such name.
    """
    script_file = (main_dir / scriptpath).as_posix()
    script_globals = runpy.run_path(script_file)
    return script_globals.get("content")
# Run at startup
log.info(f"Running in Debug mode: {debug}")
log.info(f"Root script found at: {main_dir}")
kill_existing_instances()
list_statuses()

# Loop-iteration counter. It advances once per pass (after a one-second
# sleep), so it only approximates elapsed seconds when scripts are slow.
seconds = 0
while True:
    # Run on loop
    for interval, scripts in statuses.items():
        if seconds % interval == 0:
            for status, script in scripts:
                # A single failing script (network down, missing tool, ...)
                # must not take down the whole status loop: log the failure
                # and publish an error marker for that status instead.
                try:
                    data = get_status_data(script)
                except Exception:
                    log.exception(f"Status script failed: {script}")
                    data = f"Err: {script}"
                update_status(status, data)
    # Increment count and sleep for loop
    seconds += 1
    time.sleep(1)