check_syncthing/check_syncthing

218 lines
7.1 KiB
Python
Executable file

#!/usr/bin/python3
"""Monitoring plugin to check Syncthing status"""
import argparse
import datetime
import json
import logging
import sys
import urllib.parse

import dateutil.parser
import requests
# Command-line interface: connection settings, device/folder filters and
# the device last-seen thresholds.
parser = argparse.ArgumentParser(
    formatter_class=argparse.ArgumentDefaultsHelpFormatter, description=__doc__
)

# Verbosity switches
parser.add_argument("-d", "--debug", help="Enable debug mode", action="store_true")
parser.add_argument("-v", "--verbose", help="Enable verbose mode", action="store_true")

# How to reach the Syncthing REST API
parser.add_argument("-H", "--host", default="127.0.0.1", help="Syncthing host")
parser.add_argument("-p", "--port", default=8384, type=int, help="Syncthing port")
parser.add_argument("-k", "--api-key", required=True, help="Syncthing REST API key")
parser.add_argument("-S", "--ssl", help="Enable SSL", action="store_true")
parser.add_argument("-t", "--timeout", default=5, type=int, help="Requests timeout (in seconds)")

# Repeatable include/exclude filters; declared in the same order as before so
# the generated help text is unchanged.
for short_opt, long_opt, help_text in (
    ("-D", "--devices", "Monitor only specified devices"),
    ("-x", "--excluded-devices", "Do not monitor specified devices"),
    ("-F", "--folders", "Monitor only specified folders"),
    ("-X", "--excluded-folders", "Do not monitor specified folders"),
):
    # A new list is created on every iteration, so no default is shared.
    parser.add_argument(short_opt, long_opt, action="append", help=help_text, default=[])

# Device last-seen thresholds, expressed in hours
for short_opt, severity, default_hours in (("-w", "warning", 12), ("-c", "critical", 24)):
    parser.add_argument(
        short_opt,
        f"--{severity}-device-last-seen",
        type=int,
        help=f"{severity.capitalize()} threshold for device last seen time in hours",
        default=default_hours,
    )

args = parser.parse_args()
# Logging setup: --debug takes precedence over --verbose; without either,
# only warnings and above are emitted.
logformat = f"%(asctime)s - {sys.argv[0]} - %(levelname)s - %(message)s"
loglevel = logging.WARNING
if args.debug:
    loglevel = logging.DEBUG
elif args.verbose:
    loglevel = logging.INFO
logging.basicConfig(format=logformat, level=loglevel)
def call_api(uri, method=None, data=None, headers=None, what=None):
    """Call the Syncthing REST API and return the decoded JSON response.

    Any failure (connection error, HTTP error status, undecodable body)
    prints an ``UNKNOWN`` status line and terminates the plugin with exit
    code 3, so callers only ever see a successfully decoded document.

    :param uri: Path below ``/rest/`` (may include a query string)
    :param method: HTTP method name, case-insensitive (default: ``get``)
    :param data: Optional payload, sent JSON-encoded as the request body
    :param headers: Optional extra HTTP headers; ``X-API-Key`` is always set
                    from the command-line API key
    :param what: Human-readable description of the requested data, used in
                 error messages (default: ``data``)
    :return: The decoded JSON document
    """
    method = method.lower() if method else "get"
    # Build a new dict so the caller's headers are never modified.
    headers = {**(headers or {}), "X-API-Key": args.api_key}
    try:
        r = getattr(requests, method)(
            f"http{'s' if args.ssl else ''}://{args.host}:{args.port}/rest/{uri}",
            data=json.dumps(data) if data else None,
            headers=headers,
            timeout=args.timeout,
        )
        r.raise_for_status()
    # Deliberately broad: whatever goes wrong, the plugin must answer UNKNOWN.
    except Exception:  # pylint: disable=broad-exception-caught
        print(f"UNKNOWN - Fail to retrieve {what if what else 'data'} from Syncthing REST API")
        if data:
            print(f"Request data: {json.dumps(data, indent=2)}")
        logging.exception("Error on requesting %s %s", method, uri)
        sys.exit(3)

    try:
        return r.json()
    # requests.exceptions.JSONDecodeError only exists since requests 2.27;
    # every JSON decoding error raised by Response.json() is a ValueError.
    except ValueError:
        print(
            "UNKNOWN - Fail to decode Syncthing REST API data on requesting "
            f"{what if what else 'data'}"
        )
        print("Raw API return:")
        print(r.text)
        sys.exit(3)
# Load the Syncthing configuration and work out which remote devices to
# monitor.
config = call_api("config", what="configuration")
# Log the configuration before indexing into it, so it is available in debug
# mode even when an expected key is missing.
logging.debug("Config: %s", json.dumps(config, indent=2))

# NOTE(review): this takes the first device of the default folder template as
# the local device. Syncthing's /rest/system/status endpoint reports the local
# ID as "myID" - confirm whether that would be a more reliable source.
my_dev_id = config["defaults"]["folder"]["devices"][0]["deviceID"]

# {deviceID: name} of the monitored devices: not paused, not the local device,
# selected by --devices (when given) and not listed in --excluded-devices.
devices = {
    dev["deviceID"]: dev["name"]
    for dev in config["devices"]
    if (
        not dev["paused"]
        and dev["deviceID"] != my_dev_id
        and (not args.devices or dev["name"] in args.devices)
        and dev["name"] not in args.excluded_devices
    )
}
logging.debug("Devices from config: %s", json.dumps(devices, indent=2))

# Every explicitly requested device must have been retained above. A requested
# device that is paused or excluded is therefore reported here as well.
if args.devices:
    unknown_devices = set(args.devices) - set(devices.values())
    if unknown_devices:
        # Sorted so the output does not depend on set iteration order.
        print(f"UNKNOWN - Device(s) are unknown: {', '.join(sorted(unknown_devices))}")
        sys.exit(3)

devices_stats = call_api("stats/device", what="devices stats")
logging.debug("Devices stats: %s", json.dumps(devices_stats, indent=2))

# {deviceID: last seen datetime}; devices whose last connection duration is
# not positive are left out and later reported as never seen.
devices_last_seen = {
    dev_id: dateutil.parser.isoparse(info["lastSeen"])
    for dev_id, info in devices_stats.items()
    if info["lastConnectionDurationS"] > 0
}
logging.debug("Device last seen: %s", json.dumps(devices_last_seen, indent=2, default=str))
# Result state shared by the checks below.

# Monitoring plugin exit code of each status level; a higher code is treated
# as more severe when levels are compared.
exit_status = {
    level: code for code, level in enumerate(("OK", "WARNING", "CRITICAL", "UNKNOWN"))
}
error_level = "OK"  # current overall status level
errors = []  # one-line problem summaries, joined into the status line
extra_lines = []  # detail lines printed below the status line
messages = []
def add_error(level, msg):
    """Record a problem and raise the overall status if it is more severe"""
    global error_level  # pylint: disable=global-statement
    # Only escalate: a less severe report never lowers the current level.
    if exit_status[level] > exit_status[error_level]:
        error_level = level
    errors.append(msg)
# Check when each monitored remote device was last seen.
warning_device_last_seen = datetime.timedelta(hours=args.warning_device_last_seen)
critical_device_last_seen = datetime.timedelta(hours=args.critical_device_last_seen)
# Timezone-aware "now" in UTC: the difference between two aware datetimes does
# not depend on their zones, and UTC has no DST ambiguity.
now = datetime.datetime.now(datetime.timezone.utc)

extra_lines.append("Remote devices:")
# `devices` is already restricted by --devices / --excluded-devices, so no
# further filtering is needed here.
for dev_id, dev_name in devices.items():
    last_seen = devices_last_seen.get(dev_id)
    if last_seen is None:
        add_error("WARNING", f"Never seen device {dev_name}")
        extra_lines.append(f"- {dev_name}: Never seen")
        continue

    delta = now - last_seen
    # Drop the microseconds part for display
    delta_str = str(delta).split(".", maxsplit=1)[0]
    extra_lines.append(f"- {dev_name}: last seen on {last_seen} ({delta_str})")

    # Test the critical threshold first so the most severe level wins.
    if delta >= critical_device_last_seen:
        add_error("CRITICAL", f"Device {dev_name} not seen since {delta_str}")
    elif delta >= warning_device_last_seen:
        add_error("WARNING", f"Device {dev_name} not seen since {delta_str}")
# Check the errors reported on each monitored folder. A folder can be
# designated by its ID or its label in --folders / --excluded-folders.
if args.folders:
    known_folders = {
        name for folder in config["folders"] for name in (folder["id"], folder["label"])
    }
    unknown_folders = set(args.folders) - known_folders
    if unknown_folders:
        # Same policy as for devices: asking for something that does not exist
        # must not silently result in an OK status.
        print(f"UNKNOWN - Folder(s) are unknown: {', '.join(sorted(unknown_folders))}")
        sys.exit(3)

extra_lines.append("Folders:")
for folder in config["folders"]:
    folder_names = {folder["id"], folder["label"]}
    # Honor --folders (only these) and --excluded-folders (never these)
    if args.folders and folder_names.isdisjoint(args.folders):
        continue
    if not folder_names.isdisjoint(args.excluded_folders):
        continue

    share_with = [
        devices[dev["deviceID"]] for dev in folder["devices"] if dev["deviceID"] in devices
    ]
    if not share_with:
        # When devices are filtered, skip folders shared with no monitored device
        if args.devices or args.excluded_devices:
            continue
        share_with = ["no device"]
    extra_lines.append(
        f"- {folder['label']} ({folder['path']}, share with {', '.join(share_with)})"
    )

    # URL-encode the folder ID, so characters such as spaces, '&' or '#'
    # cannot break the query string.
    folder_errors = call_api(
        f"folder/errors?folder={urllib.parse.quote(folder['id'], safe='')}",
        what=f"folder '{folder['label']}' errors",
    )
    if folder_errors["errors"]:
        add_error(
            "WARNING",
            f"{len(folder_errors['errors'])} errors detected on folder '{folder['label']}'",
        )
        extra_lines.append(f"Folder '{folder['label']}' errors:")
        extra_lines.extend(
            f"{error['path']}: {error['error']}" for error in folder_errors["errors"]
        )
# Check the errors reported by Syncthing itself.
system_errors = call_api("system/error", what="system errors")
logging.debug("System errors: %s", json.dumps(system_errors, indent=2))
# An empty or null "errors" value means there is nothing to report.
reported_errors = system_errors["errors"] or []
if reported_errors:
    # Count the reported errors, not the keys of the response document.
    add_error("WARNING", f"{len(reported_errors)} system errors detected")
    extra_lines.extend(f"{error['when']}: {error['message']}" for error in reported_errors)
# Print the plugin result: a "<LEVEL> - <summary>" status line followed by the
# detail lines, then exit with the code matching the level.
if errors:
    summary = ", ".join(errors)
else:
    # State what was checked and in which unit the threshold is expressed.
    summary = (
        f"All {len(devices)} monitored devices seen within the last "
        f"{args.warning_device_last_seen} hours, no error detected"
    )
status = [error_level, summary]
print(" - ".join(status))
print("\n".join(extra_lines))
sys.exit(exit_status[error_level])
# vim: tabstop=4 shiftwidth=4 softtabstop=4 expandtab