"""Command-line tool that keeps AdGuard DNS rewrites in step with a JSON file.

Two mutually exclusive actions are offered:

* ``--sync`` pushes the entries of the JSON file to AdGuard (add, update and
  delete so that AdGuard matches the file).
* ``--pull`` fetches the rewrites currently stored in AdGuard and overwrites
  the JSON file with them (a ``.backup`` copy of the old file is kept).

Configuration is read from the environment variables ``ADGUARD_URL``,
``ADGUARD_USER``, ``ADGUARD_PASSWORD`` and ``JSON_FILE``; when they are not all
set, a ``.env`` file in the current working directory is loaded if present.
"""

import argparse
import json
import os
import sys
import traceback

import requests
from dotenv import load_dotenv

# Seconds to wait for each AdGuard API call before giving up.
REQUEST_TIMEOUT = 10

# Check if environment variables are already set (e.g., in CI/CD).
# If not, load them from the .env file.
required_vars = ["ADGUARD_URL", "ADGUARD_USER", "ADGUARD_PASSWORD", "JSON_FILE"]
env_vars_already_set = all(os.getenv(var) for var in required_vars)

if env_vars_already_set:
    print("Using environment variables from system/CI/CD")
elif os.path.exists(".env"):
    print("Loading environment variables from .env file")
    load_dotenv()
else:
    print("No .env file found, expecting environment variables to be set")

# Configuration globals; populated by validate_environment().
url = None
username = None
password = None
json_file = None

# One shared session so the cookies set by login() are sent on later calls.
session = requests.Session()


def validate_environment():
    """Read the configuration from the environment and validate it.

    Populates the module globals ``url``, ``username``, ``password`` and
    ``json_file``. Trailing slashes are stripped from ``ADGUARD_URL`` so that
    the endpoint paths appended later do not start with a double slash.

    Exits the process with status 1 (after listing every missing variable)
    when any required variable is unset or empty.
    """
    global url, username, password, json_file

    url = (os.getenv("ADGUARD_URL") or "").rstrip("/")
    username = os.getenv("ADGUARD_USER")
    password = os.getenv("ADGUARD_PASSWORD")
    json_file = os.getenv("JSON_FILE")

    settings = {
        "ADGUARD_URL": url,
        "ADGUARD_USER": username,
        "ADGUARD_PASSWORD": password,
        "JSON_FILE": json_file,
    }
    errors = [
        f"{name} environment variable is not set"
        for name, value in settings.items()
        if not value
    ]

    if errors:
        print("ERROR: Missing required environment variables:")
        for error in errors:
            print(f" - {error}")
        print("\nPlease set these in your .env file or environment.")
        sys.exit(1)

    # The password is deliberately not echoed.
    print(f"AdGuard URL: {url}")
    print(f"Username: {username}")
    print(f"JSON File: {json_file}")


def validate_json_structure(data):
    """Validate the parsed content of the rewrites JSON file.

    The expected shape is ``{"rewrites": [{"domain": str, "answer": str}, ...]}``
    where both strings are non-empty after stripping whitespace.

    Args:
        data: The object returned by ``json.load``.

    Returns:
        ``True`` when the structure is valid (an empty list is valid but
        produces a warning).

    Raises:
        ValueError: Describing the first structural problem found.
    """
    if not isinstance(data, dict):
        raise ValueError("JSON root must be an object/dictionary")
    if "rewrites" not in data:
        raise ValueError("JSON must contain a 'rewrites' key")

    rewrites = data["rewrites"]
    if not isinstance(rewrites, list):
        raise ValueError("'rewrites' must be an array/list")

    if not rewrites:
        print("WARNING: 'rewrites' array is empty")
        return True

    fields = ("domain", "answer")
    for idx, entry in enumerate(rewrites):
        if not isinstance(entry, dict):
            raise ValueError(f"Entry at index {idx} is not an object/dictionary")
        # Report missing fields before invalid ones, for both fields in turn.
        for field in fields:
            if field not in entry:
                raise ValueError(f"Entry at index {idx} is missing '{field}' field")
        for field in fields:
            value = entry[field]
            if not isinstance(value, str) or not value.strip():
                raise ValueError(
                    f"Entry at index {idx} has invalid '{field}' field "
                    "(must be non-empty string)"
                )

    print(f"JSON validation successful: {len(rewrites)} entries found")
    return True


def login():
    """Log in to the AdGuard API using the shared session.

    Exits the process with status 1 when the request fails or the server
    answers with anything other than HTTP 200.
    """
    try:
        response = session.post(
            f"{url}/control/login",
            json={"name": username, "password": password},
            timeout=REQUEST_TIMEOUT,
        )
    except requests.exceptions.Timeout:
        print("ERROR: Login request timed out")
        sys.exit(1)
    except requests.exceptions.ConnectionError:
        print(f"ERROR: Could not connect to AdGuard at {url}")
        sys.exit(1)
    except requests.exceptions.RequestException as e:
        print(f"ERROR: Login request failed: {e}")
        sys.exit(1)

    if response.status_code != 200:
        print(f"ERROR: Login failed with status {response.status_code}")
        print(f"Response: {response.text}")
        sys.exit(1)

    print("Login erfolgreich.")


def load_json():
    """Load the rewrites JSON file, validate it and return its entries.

    Returns:
        The list stored under the ``"rewrites"`` key.

    Exits the process with status 1 when the file is missing, unreadable,
    not valid JSON, or fails ``validate_json_structure``.
    """
    if not os.path.exists(json_file):
        print(f"ERROR: JSON file not found: {json_file}")
        print("Please create the file or run with --pull to fetch from AdGuard")
        sys.exit(1)

    try:
        # Explicit encoding: the platform default is locale dependent.
        with open(json_file, "r", encoding="utf-8") as f:
            data = json.load(f)
    except json.JSONDecodeError as e:
        print(f"ERROR: Invalid JSON in file {json_file}")
        print(f"JSON Error: {e}")
        sys.exit(1)
    except PermissionError:
        print(f"ERROR: Permission denied reading file: {json_file}")
        sys.exit(1)
    except Exception as e:
        print(f"ERROR: Could not read file {json_file}: {e}")
        sys.exit(1)

    try:
        validate_json_structure(data)
    except ValueError as e:
        print(f"ERROR: JSON validation failed: {e}")
        sys.exit(1)

    return data["rewrites"]


def save_json(data):
    """Write ``data`` to the JSON file as ``{"rewrites": data}``.

    When the file already exists, a byte-exact copy is first written to
    ``<json_file>.backup``; a failure to create the backup only produces a
    warning and does not stop the save.

    Args:
        data: The list of rewrite entries to store.

    Exits the process with status 1 when the file cannot be written.
    """
    try:
        if os.path.exists(json_file):
            backup_file = f"{json_file}.backup"
            try:
                # Binary copy so the backup is identical regardless of encoding.
                with open(json_file, "rb") as src, open(backup_file, "wb") as dst:
                    dst.write(src.read())
                print(f"Created backup: {backup_file}")
            except Exception as e:
                print(f"WARNING: Could not create backup: {e}")

        with open(json_file, "w", encoding="utf-8") as f:
            json.dump({"rewrites": data}, f, indent=2)
    except PermissionError:
        print(f"ERROR: Permission denied writing to file: {json_file}")
        sys.exit(1)
    except Exception as e:
        print(f"ERROR: Could not write to file {json_file}: {e}")
        sys.exit(1)


def get_rewrites():
    """Fetch the rewrites currently configured in AdGuard.

    Returns:
        The list decoded from the ``/control/rewrite/list`` response.

    Exits the process with status 1 on any request failure, on a non-JSON
    response body, or when the decoded body is not a list.
    """
    try:
        response = session.get(
            f"{url}/control/rewrite/list", timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()
    except requests.exceptions.Timeout:
        print("ERROR: Request to get rewrites timed out")
        sys.exit(1)
    except requests.exceptions.ConnectionError:
        print(f"ERROR: Could not connect to AdGuard at {url}")
        sys.exit(1)
    except requests.exceptions.HTTPError as e:
        print(f"ERROR: HTTP error while getting rewrites: {e}")
        sys.exit(1)
    except requests.exceptions.RequestException as e:
        print(f"ERROR: Request failed while getting rewrites: {e}")
        sys.exit(1)

    # Decode separately: the decode error raised by newer requests versions is
    # also a RequestException, so it must not share a try with the handlers
    # above or the JSON-specific message could never be reached. Every
    # decoder variant requests may use raises a ValueError subclass.
    try:
        data = response.json()
    except ValueError:
        print("ERROR: Invalid JSON response from AdGuard")
        sys.exit(1)

    if not isinstance(data, list):
        print("ERROR: AdGuard API returned unexpected data format")
        sys.exit(1)
    return data


def _response_text(error):
    """Return the body of the response attached to an HTTPError, if any.

    The check is ``is not None`` on purpose: a ``requests.Response`` is falsy
    for 4xx/5xx status codes, which are exactly the responses that produce an
    HTTPError, so a plain truthiness test would always hide the body.
    """
    response = error.response
    return response.text if response is not None else "No response"


def _send_rewrite_change(method, endpoint, payload, *, verb, gerund, subject,
                         success_message):
    """Send one modifying request to a rewrite endpoint and report the result.

    Args:
        method: HTTP method name, e.g. ``"POST"``.
        endpoint: Path appended to the configured base URL.
        payload: JSON body of the request.
        verb: Infinitive used in the HTTP-error message ("add", "delete", ...).
        gerund: Form used in the timeout/request-error messages ("adding", ...).
        subject: Entry shown in error messages.
        success_message: Line printed when the request succeeds.

    Returns:
        ``True`` on success, ``False`` when the request failed (the failure
        is printed, never raised).
    """
    try:
        response = session.request(
            method, f"{url}{endpoint}", json=payload, timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()
    except requests.exceptions.Timeout:
        print(f"ERROR: Timeout while {gerund} rewrite: {subject}")
        return False
    except requests.exceptions.HTTPError as e:
        print(f"ERROR: Failed to {verb} rewrite {subject}: {e}")
        print(f"Response: {_response_text(e)}")
        return False
    except requests.exceptions.RequestException as e:
        print(f"ERROR: Request failed while {gerund} rewrite {subject}: {e}")
        return False

    print(success_message)
    return True


def add_rewrite(entry):
    """Add a rewrite entry to AdGuard.

    Args:
        entry: Dict with ``"domain"`` and ``"answer"`` keys.

    Returns:
        ``True`` when the entry was added, ``False`` otherwise.
    """
    return _send_rewrite_change(
        "POST",
        "/control/rewrite/add",
        entry,
        verb="add",
        gerund="adding",
        subject=entry,
        success_message=f"Hinzugefügt: {entry}",
    )


def delete_rewrite(entry):
    """Delete a rewrite entry from AdGuard.

    Args:
        entry: Dict with ``"domain"`` and ``"answer"`` keys.

    Returns:
        ``True`` when the entry was deleted, ``False`` otherwise.
    """
    return _send_rewrite_change(
        "POST",
        "/control/rewrite/delete",
        entry,
        verb="delete",
        gerund="deleting",
        subject=entry,
        success_message=f"Gelöscht: {entry}",
    )


def update_rewrite(old_entry, new_entry):
    """Replace an existing rewrite entry in AdGuard with a new one.

    Args:
        old_entry: The entry as it currently exists in AdGuard.
        new_entry: The entry that should replace it.

    Returns:
        ``True`` when the entry was updated, ``False`` otherwise.
    """
    return _send_rewrite_change(
        "PUT",
        "/control/rewrite/update",
        {"target": old_entry, "update": new_entry},
        verb="update",
        gerund="updating",
        subject=old_entry,
        success_message=f"Aktualisiert: {old_entry} -> {new_entry}",
    )


def sync_to_adguard(desired):
    """Make the rewrites in AdGuard match ``desired``.

    Entries are compared by domain: a domain present on both sides with a
    different answer is updated, a domain only in ``desired`` is added, and a
    domain only in AdGuard is deleted. A summary is printed at the end.

    Args:
        desired: List of ``{"domain": ..., "answer": ...}`` dicts.

    Returns:
        ``True`` when every operation succeeded, ``False`` when at least one
        failed.
    """
    print("Fetching current rewrites from AdGuard...")
    existing = get_rewrites()

    # NOTE: keying by domain keeps only the last answer when the same domain
    # occurs more than once in either list.
    existing_map = {r["domain"]: r["answer"] for r in existing}
    desired_map = {r["domain"]: r["answer"] for r in desired}

    failed_operations = []

    # Changes and new entries.
    updates_count = 0
    additions_count = 0
    for domain, answer in desired_map.items():
        wanted = {"domain": domain, "answer": answer}

        if domain not in existing_map:
            if add_rewrite(wanted):
                additions_count += 1
            else:
                failed_operations.append(f"Add {domain}")
            continue

        current_answer = existing_map[domain]
        if current_answer == answer:
            continue  # already up to date

        if update_rewrite({"domain": domain, "answer": current_answer}, wanted):
            updates_count += 1
        else:
            failed_operations.append(f"Update {domain}")

    # Remove entries that are no longer wanted.
    deletions_count = 0
    for domain, answer in existing_map.items():
        if domain in desired_map:
            continue
        if delete_rewrite({"domain": domain, "answer": answer}):
            deletions_count += 1
        else:
            failed_operations.append(f"Delete {domain}")

    print("\nSync Summary:")
    print(f" Added: {additions_count}")
    print(f" Updated: {updates_count}")
    print(f" Deleted: {deletions_count}")

    if failed_operations:
        print(f"\nWARNING: {len(failed_operations)} operation(s) failed:")
        for op in failed_operations:
            print(f" - {op}")
        return False

    return True


def pull_from_adguard():
    """Fetch the current rewrites from AdGuard and save them to the JSON file."""
    print("Pulling current rewrites from AdGuard...")
    current = get_rewrites()
    save_json(current)
    print(f"Successfully saved {len(current)} rewrites to {json_file}")


def main():
    """Parse the command line and run the requested action.

    Exit status: 0 on success (including an empty JSON file on ``--sync``),
    1 on any error, 130 when interrupted with Ctrl+C. Argument errors exit
    through ``argparse`` with its usual status.
    """
    parser = argparse.ArgumentParser(
        description="AdGuard DNS Rewrite Management Tool",
        epilog="The rewrites.json file is the source of truth for --sync operations.",
    )
    parser.add_argument(
        "--sync",
        action="store_true",
        help="Sync rewrites from JSON file to AdGuard (add, update, delete to match JSON)",
    )
    parser.add_argument(
        "--pull",
        action="store_true",
        help="Pull current rewrites from AdGuard and update the local JSON file",
    )
    args = parser.parse_args()

    # Exactly one action must be chosen.
    if not args.sync and not args.pull:
        parser.error("No action specified. Use --sync or --pull.")
    if args.sync and args.pull:
        parser.error("Cannot use --sync and --pull together. Choose one.")

    # Validate environment variables only after the arguments are known good.
    validate_environment()

    try:
        print("\n=== Logging in to AdGuard ===")
        login()

        if args.sync:
            # JSON is the source of truth.
            print("\n=== Starting sync from JSON to AdGuard ===")
            desired = load_json()

            # An empty file is treated as "nothing to do" rather than as a
            # request to delete every rewrite in AdGuard.
            if not desired:
                print("WARNING: No rewrites found in JSON file. Nothing to sync.")
                sys.exit(0)

            if sync_to_adguard(desired):
                print("\n✓ Sync completed successfully.")
                sys.exit(0)
            print("\n✗ Sync completed with errors.")
            sys.exit(1)

        # Only --pull remains: overwrite the local JSON with AdGuard's state.
        print("\n=== Pulling from AdGuard to JSON ===")
        pull_from_adguard()
        print("\n✓ Pull completed successfully.")
        sys.exit(0)

    except KeyboardInterrupt:
        print("\n\nOperation cancelled by user.")
        sys.exit(130)
    except Exception as e:
        # Last-resort boundary; sys.exit() raises SystemExit, which is not an
        # Exception subclass and therefore passes through untouched.
        print(f"\nUnexpected error: {e}")
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()