Files
adguard-dns-tools/main.py
2025-11-06 23:30:13 +01:00

383 lines
13 KiB
Python

import requests
import json
import os
import argparse
import sys
from dotenv import load_dotenv
# Check if environment variables are already set (e.g., in CI/CD)
# If not, load them from .env file
required_vars = ["ADGUARD_URL", "ADGUARD_USER", "ADGUARD_PASSWORD", "JSON_FILE"]
env_vars_already_set = all(os.getenv(var) for var in required_vars)
if env_vars_already_set:
print("Using environment variables from system/CI/CD")
else:
# Try to load from .env file
if os.path.exists(".env"):
print("Loading environment variables from .env file")
load_dotenv()
else:
print("No .env file found, expecting environment variables to be set")
# Load environment variables (validation happens later)
url = None
username = None
password = None
json_file = None
session = requests.Session()
def validate_environment():
"""Validate required environment variables."""
global url, username, password, json_file
url = os.getenv("ADGUARD_URL")
username = os.getenv("ADGUARD_USER")
password = os.getenv("ADGUARD_PASSWORD")
json_file = os.getenv("JSON_FILE")
errors = []
if not url:
errors.append("ADGUARD_URL environment variable is not set")
if not username:
errors.append("ADGUARD_USER environment variable is not set")
if not password:
errors.append("ADGUARD_PASSWORD environment variable is not set")
if not json_file:
errors.append("JSON_FILE environment variable is not set")
if errors:
print("ERROR: Missing required environment variables:")
for error in errors:
print(f" - {error}")
print("\nPlease set these in your .env file or environment.")
sys.exit(1)
print(f"AdGuard URL: {url}")
print(f"Username: {username}")
print(f"JSON File: {json_file}")
def validate_json_structure(data):
"""Validate the JSON structure and content."""
if not isinstance(data, dict):
raise ValueError("JSON root must be an object/dictionary")
if "rewrites" not in data:
raise ValueError("JSON must contain a 'rewrites' key")
rewrites = data["rewrites"]
if not isinstance(rewrites, list):
raise ValueError("'rewrites' must be an array/list")
if len(rewrites) == 0:
print("WARNING: 'rewrites' array is empty")
return True
for idx, entry in enumerate(rewrites):
if not isinstance(entry, dict):
raise ValueError(f"Entry at index {idx} is not an object/dictionary")
if "domain" not in entry:
raise ValueError(f"Entry at index {idx} is missing 'domain' field")
if "answer" not in entry:
raise ValueError(f"Entry at index {idx} is missing 'answer' field")
if not isinstance(entry["domain"], str) or not entry["domain"].strip():
raise ValueError(f"Entry at index {idx} has invalid 'domain' field (must be non-empty string)")
if not isinstance(entry["answer"], str) or not entry["answer"].strip():
raise ValueError(f"Entry at index {idx} has invalid 'answer' field (must be non-empty string)")
print(f"JSON validation successful: {len(rewrites)} entries found")
return True
def login():
"""Login to AdGuard API."""
try:
r = session.post(f"{url}/control/login", json={"name": username, "password": password}, timeout=10)
if r.status_code != 200:
print(f"ERROR: Login failed with status {r.status_code}")
print(f"Response: {r.text}")
sys.exit(1)
print("Login erfolgreich.")
except requests.exceptions.Timeout:
print("ERROR: Login request timed out")
sys.exit(1)
except requests.exceptions.ConnectionError:
print(f"ERROR: Could not connect to AdGuard at {url}")
sys.exit(1)
except requests.exceptions.RequestException as e:
print(f"ERROR: Login request failed: {e}")
sys.exit(1)
def load_json():
"""Load and validate JSON file."""
if not os.path.exists(json_file):
print(f"ERROR: JSON file not found: {json_file}")
print("Please create the file or run with --pull to fetch from AdGuard")
sys.exit(1)
try:
with open(json_file, "r") as f:
data = json.load(f)
except json.JSONDecodeError as e:
print(f"ERROR: Invalid JSON in file {json_file}")
print(f"JSON Error: {e}")
sys.exit(1)
except PermissionError:
print(f"ERROR: Permission denied reading file: {json_file}")
sys.exit(1)
except Exception as e:
print(f"ERROR: Could not read file {json_file}: {e}")
sys.exit(1)
# Validate JSON structure
try:
validate_json_structure(data)
except ValueError as e:
print(f"ERROR: JSON validation failed: {e}")
sys.exit(1)
return data["rewrites"]
def save_json(data):
"""Save rewrites to JSON file."""
try:
# Create backup if file exists
if os.path.exists(json_file):
backup_file = f"{json_file}.backup"
try:
with open(json_file, "r") as src, open(backup_file, "w") as dst:
dst.write(src.read())
print(f"Created backup: {backup_file}")
except Exception as e:
print(f"WARNING: Could not create backup: {e}")
with open(json_file, "w") as f:
json.dump(
{"rewrites": data},
f,
indent=2
)
except PermissionError:
print(f"ERROR: Permission denied writing to file: {json_file}")
sys.exit(1)
except Exception as e:
print(f"ERROR: Could not write to file {json_file}: {e}")
sys.exit(1)
def get_rewrites():
"""Get current rewrites from AdGuard."""
try:
r = session.get(f"{url}/control/rewrite/list", timeout=10)
r.raise_for_status()
data = r.json()
if not isinstance(data, list):
print("ERROR: AdGuard API returned unexpected data format")
sys.exit(1)
return data
except requests.exceptions.Timeout:
print("ERROR: Request to get rewrites timed out")
sys.exit(1)
except requests.exceptions.ConnectionError:
print(f"ERROR: Could not connect to AdGuard at {url}")
sys.exit(1)
except requests.exceptions.HTTPError as e:
print(f"ERROR: HTTP error while getting rewrites: {e}")
sys.exit(1)
except requests.exceptions.RequestException as e:
print(f"ERROR: Request failed while getting rewrites: {e}")
sys.exit(1)
except json.JSONDecodeError:
print("ERROR: Invalid JSON response from AdGuard")
sys.exit(1)
def add_rewrite(entry):
"""Add a rewrite entry to AdGuard."""
try:
r = session.post(f"{url}/control/rewrite/add", json=entry, timeout=10)
r.raise_for_status()
print(f"Hinzugefügt: {entry}")
return True
except requests.exceptions.Timeout:
print(f"ERROR: Timeout while adding rewrite: {entry}")
return False
except requests.exceptions.HTTPError as e:
print(f"ERROR: Failed to add rewrite {entry}: {e}")
print(f"Response: {e.response.text if e.response else 'No response'}")
return False
except requests.exceptions.RequestException as e:
print(f"ERROR: Request failed while adding rewrite {entry}: {e}")
return False
def delete_rewrite(entry):
"""Delete a rewrite entry from AdGuard."""
try:
r = session.post(f"{url}/control/rewrite/delete", json=entry, timeout=10)
r.raise_for_status()
print(f"Gelöscht: {entry}")
return True
except requests.exceptions.Timeout:
print(f"ERROR: Timeout while deleting rewrite: {entry}")
return False
except requests.exceptions.HTTPError as e:
print(f"ERROR: Failed to delete rewrite {entry}: {e}")
print(f"Response: {e.response.text if e.response else 'No response'}")
return False
except requests.exceptions.RequestException as e:
print(f"ERROR: Request failed while deleting rewrite {entry}: {e}")
return False
def update_rewrite(old_entry, new_entry):
"""Update a rewrite entry in AdGuard."""
try:
r = session.put(f"{url}/control/rewrite/update", json={"target": old_entry, "update": new_entry}, timeout=10)
r.raise_for_status()
print(f"Aktualisiert: {old_entry} -> {new_entry}")
return True
except requests.exceptions.Timeout:
print(f"ERROR: Timeout while updating rewrite: {old_entry}")
return False
except requests.exceptions.HTTPError as e:
print(f"ERROR: Failed to update rewrite {old_entry}: {e}")
print(f"Response: {e.response.text if e.response else 'No response'}")
return False
except requests.exceptions.RequestException as e:
print(f"ERROR: Request failed while updating rewrite {old_entry}: {e}")
return False
def sync_to_adguard(desired):
"""Sync desired rewrites to AdGuard."""
print("Fetching current rewrites from AdGuard...")
existing = get_rewrites()
existing_map = {r["domain"]: r["answer"] for r in existing}
desired_map = {r["domain"]: r["answer"] for r in desired}
failed_operations = []
# Änderungen oder Neueinträge
updates_count = 0
additions_count = 0
for domain, ip in desired_map.items():
if domain in existing_map:
if existing_map[domain] != ip:
if not update_rewrite(
{"domain": domain, "answer": existing_map[domain]},
{"domain": domain, "answer": ip}
):
failed_operations.append(f"Update {domain}")
else:
updates_count += 1
else:
if not add_rewrite({"domain": domain, "answer": ip}):
failed_operations.append(f"Add {domain}")
else:
additions_count += 1
# Entferne veraltete
deletions_count = 0
for domain, ip in existing_map.items():
if domain not in desired_map:
if not delete_rewrite({"domain": domain, "answer": ip}):
failed_operations.append(f"Delete {domain}")
else:
deletions_count += 1
# Summary
print(f"\nSync Summary:")
print(f" Added: {additions_count}")
print(f" Updated: {updates_count}")
print(f" Deleted: {deletions_count}")
if failed_operations:
print(f"\nWARNING: {len(failed_operations)} operation(s) failed:")
for op in failed_operations:
print(f" - {op}")
return False
return True
def pull_from_adguard():
"""Pull current rewrites from AdGuard and save to JSON file."""
print("Pulling current rewrites from AdGuard...")
current = get_rewrites()
save_json(current)
print(f"Successfully saved {len(current)} rewrites to {json_file}")
def main():
parser = argparse.ArgumentParser(
description="AdGuard DNS Rewrite Management Tool",
epilog="The rewrites.json file is the source of truth for --sync operations."
)
parser.add_argument(
"--sync",
action="store_true",
help="Sync rewrites from JSON file to AdGuard (add, update, delete to match JSON)"
)
parser.add_argument(
"--pull",
action="store_true",
help="Pull current rewrites from AdGuard and update the local JSON file"
)
try:
args = parser.parse_args()
except SystemExit:
raise
# If no arguments provided, show error and help
if not args.sync and not args.pull:
parser.error("No action specified. Use --sync or --pull.")
# Both options cannot be used together
if args.sync and args.pull:
parser.error("Cannot use --sync and --pull together. Choose one.")
# Validate environment variables after parsing args
validate_environment()
try:
# Login to AdGuard
print("\n=== Logging in to AdGuard ===")
login()
if args.sync:
# Sync from JSON to AdGuard (JSON is source of truth)
print("\n=== Starting sync from JSON to AdGuard ===")
desired = load_json()
if len(desired) == 0:
print("WARNING: No rewrites found in JSON file. Nothing to sync.")
sys.exit(0)
success = sync_to_adguard(desired)
if success:
print("\n✓ Sync completed successfully.")
sys.exit(0)
else:
print("\n✗ Sync completed with errors.")
sys.exit(1)
elif args.pull:
# Pull from AdGuard to JSON (overwrite local JSON)
print("\n=== Pulling from AdGuard to JSON ===")
pull_from_adguard()
print("\n✓ Pull completed successfully.")
sys.exit(0)
except KeyboardInterrupt:
print("\n\nOperation cancelled by user.")
sys.exit(130)
except Exception as e:
print(f"\nUnexpected error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()