97 lines
2.6 KiB
Python
97 lines
2.6 KiB
Python
|
#!/usr/bin/python
|
||
|
import os
|
||
|
import runpy
|
||
|
import subprocess
|
||
|
import sys
|
||
|
import time
|
||
|
from distutils.util import strtobool
|
||
|
from pathlib import Path
|
||
|
from typing import Optional
|
||
|
|
||
|
import psutil
|
||
|
|
||
|
# Configs
# STATUS_DEBUG is parsed by strtobool into 1/0; when truthy, statuses are
# printed to stdout instead of being sent to the window manager via duskc.
debug = strtobool(os.environ.get("STATUS_DEBUG", "False"))
# Absolute directory of the launched script; the relative script paths in
# the interval table are resolved against it.
main_dir = Path(sys.argv[0]).parent.resolve()
|
||
|
|
||
|
# Scripts by interval
# Maps an update interval in seconds to the (status slot, script path) pairs
# refreshed on that interval. Script paths are relative to the directory of
# this launcher.
statuses = {
    5: [
        (2, "./scripts/get_volume.py"),
    ],
    60: [
        (0, "./scripts/clock.py"),
        (4, "./scripts/get_battery.py"),
        (3, "./scripts/get_wifi.py"),
        (7, "./scripts/get_uptime.py"),
        (1, "./scripts/unread_mail.py"),
    ],
    # 300: [],  (placeholder for a five-minute interval, currently unused)
    3600: [
        (5, "./scripts/get_publicip.py"),
        (6, "./scripts/get_weather.py"),
    ],
}
|
||
|
|
||
|
|
||
|
def list_statuses():
    """Print the configured status scripts, grouped by update interval."""
    print("Status scripts being processed")
    for every, entries in statuses.items():
        # Singular unit only for a one-second interval.
        unit = "seconds"
        if every == 1:
            unit = "second"
        print(f" Updating every {every} {unit}")
        for slot, script_path in entries:
            print(f" {slot}: {script_path}")
|
||
|
|
||
|
|
||
|
def kill_existing_instances():
    """Kill every other running process whose name starts with ``duskwm_status``.

    The current process is always skipped. Processes that disappear before
    they can be killed are ignored, and processes that may not be killed by
    this user are reported on stderr instead of aborting startup.
    """
    current_pid = os.getpid()
    for proc in psutil.process_iter(["pid", "name"]):
        pid = proc.info["pid"]
        # psutil fills in None for attributes it could not read, so fall back
        # to an empty name rather than calling .startswith on None.
        name = proc.info["name"] or ""
        if pid == current_pid or not name.startswith("duskwm_status"):
            continue
        try:
            # Process.kill() sends SIGKILL and guards against the PID having
            # been reused by an unrelated process since it was listed.
            proc.kill()
        except psutil.NoSuchProcess:
            # Already gone: nothing left to kill.
            continue
        except psutil.AccessDenied:
            print(
                f"Could not kill existing instance (PID {pid}): access denied",
                file=sys.stderr,
            )
            continue
        print(f"Killing existing instance (PID {pid})")
|
||
|
|
||
|
|
||
|
def update_status(status: Optional[int], content: Optional[str]):
    """Publish ``content`` to status slot ``status``.

    A missing slot defaults to 99 and missing content to the empty string.
    In debug mode the update is only printed; otherwise it is handed to
    ``duskc`` through its ``setstatus`` command.
    """
    slot = 99 if status is None else status
    text = "" if content is None else content

    if debug:
        print(f"{slot:>3} : {text}")
        return

    command = [
        "duskc",
        "--ignore-reply",
        "run_command",
        "setstatus",
        format(slot),
        format(text),
    ]
    subprocess.run(command)
|
||
|
|
||
|
|
||
|
def get_status_data(scriptpath: str) -> Optional[str]:
    """Run a status script and return the ``content`` value it defines.

    ``scriptpath`` is resolved against ``main_dir`` and executed with
    ``runpy.run_path``. The result is whatever the script bound to the
    top-level name ``content``, or ``None`` when it defines no such name.
    """
    script_file = (main_dir / scriptpath).as_posix()
    script_globals = runpy.run_path(script_file)
    return script_globals.get("content")
|
||
|
|
||
|
|
||
|
# Run at startup
print(f"Running in Debug mode: {debug}")
print(f"Root script found at: {main_dir}")
kill_existing_instances()
list_statuses()

# Run on loop
# `seconds` counts loop iterations (one per ~1 s sleep), not wall-clock time,
# so each interval is stretched by however long the scripts take to run.
seconds = 0
while True:
    for interval, scripts in statuses.items():
        if seconds % interval != 0:
            continue
        for status, script in scripts:
            try:
                data = get_status_data(script)
            except Exception as err:
                # A single failing script (network down, missing tool, bug)
                # must not take the whole status bar down: report it on
                # stderr and show an error marker in that slot instead.
                print(f"Status script {script} failed: {err!r}", file=sys.stderr)
                data = f"Err: {script}"
            update_status(status, data)

    # Increment count and sleep for loop
    seconds += 1
    time.sleep(1)
|