fabriziosalmi/blacklists

v2

fabriziosalmi opened this issue · 6 comments

sanitize.py

import re
import tldextract
from tqdm import tqdm

# Pre-compiled regex pattern for FQDN validation
fqdn_pattern = re.compile('^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$')

def is_valid_fqdn(s):
    """Check if the string is a valid FQDN."""
    if '*' in s or not s:
        return False
    extracted = tldextract.extract(s)
    if not all([extracted.domain, extracted.suffix]):
        return False
    return all(fqdn_pattern.match(x) for x in s.split('.'))

def remove_prefix(line, prefix):
    """General function to remove specified prefix from a line."""
    if line.startswith(prefix):
        potential_fqdn = line[len(prefix):]
        if is_valid_fqdn(potential_fqdn):
            return potential_fqdn
    return line

def sanitize_line(line, rules):
    """Apply all sanitization rules to a line."""
    for rule in rules:
        line = rule(line.strip())
        if line is None:
            return None
    return line

def get_sanitization_rules():
    """Returns a list of sanitization rules."""
    return [
        lambda line: None if line.startswith("#") else line,       # Remove comment lines
        lambda line: remove_prefix(line, "127.0.0.1"),             # Remove IP prefix 127.0.0.1 without space
        lambda line: remove_prefix(line, "127.0.0.1 "),            # Remove IP prefix 127.0.0.1 with space
        lambda line: remove_prefix(line, "0.0.0.0"),               # Remove IP prefix 0.0.0.0 without space
        lambda line: remove_prefix(line, "0.0.0.0 "),              # Remove IP prefix 0.0.0.0 with space
        lambda line: remove_prefix(line, "||"),                    # Remove double pipes
        lambda line: remove_prefix(line, "http://"),               # Remove http prefix
        lambda line: remove_prefix(line, "https://"),              # Remove https prefix
        lambda line: line.rstrip('.'),                             # Remove trailing dot
        lambda line: line.lower()                                  # Convert to lowercase
    ]

def process_large_file(input_file_path, output_file_path):
    """Process large files line by line and track progress."""
    unique_domains = set()
    rules = get_sanitization_rules()

    with open(input_file_path, 'r') as infile:
        total_lines = sum(1 for _ in infile)
        infile.seek(0)  # Reset file pointer to start
        with tqdm(total=total_lines, desc="Processing") as pbar:
            for line in infile:
                sanitized_line = sanitize_line(line, rules)
                if sanitized_line and is_valid_fqdn(sanitized_line):
                    unique_domains.add(sanitized_line)
                pbar.update(1)

    return unique_domains

def write_to_output_file(unique_domains, output_file_path):
    """Write unique domains to the output file and track progress."""
    # Sort the unique domain names in alphabetical order
    sorted_unique_domains = sorted(unique_domains)

    with open(output_file_path, 'w') as outfile:
        with tqdm(total=len(sorted_unique_domains), desc="Writing") as pbar:
            for domain in sorted_unique_domains:
                outfile.write(domain + '\n')
                pbar.update(1)

if __name__ == "__main__":
    input_file_path = 'input.txt'
    output_file_path = 'output.txt'

    unique_domains = process_large_file(input_file_path, output_file_path)
    write_to_output_file(unique_domains, output_file_path)

whitelist.py

import os
from pathlib import Path
import argparse
from tqdm import tqdm

def read_fqdn_from_file(file_path: Path, description: str) -> set:
    """Read the file and return a set of FQDNs with a progress bar."""
    with file_path.open('r') as file:
        fqdns = set()
        total_lines = sum(1 for _ in file)
        file.seek(0)
        with tqdm(total=total_lines, desc=description, unit="lines", leave=False) as pbar:
            for line in file:
                fqdn = line.strip()
                fqdns.add(fqdn)
                pbar.update(1)
        return fqdns

def write_fqdn_to_file(file_path: Path, content: set, description: str) -> None:
    """Write a set of FQDNs to the specified file with a progress bar."""
    with file_path.open('w') as file:
        total_fqdns = len(content)
        with tqdm(total=total_fqdns, desc=description, unit="lines", leave=False) as pbar:
            for fqdn in content:
                file.write(fqdn + '\n')
                pbar.update(1)

def ensure_file_exists(file_path: Path) -> None:
    """Check if a file exists or exit the program."""
    if not file_path.is_file():
        print(f"ERROR: File '{file_path}' not found.")
        exit(1)

def main(blacklist_path: Path, whitelist_path: Path, output_path: Path) -> None:
    """Main function to process blacklist and whitelist files."""
    
    # Check if files exist
    ensure_file_exists(blacklist_path)
    ensure_file_exists(whitelist_path)

    blacklist_fqdns = read_fqdn_from_file(blacklist_path, f"Reading {blacklist_path}")
    whitelist_fqdns = read_fqdn_from_file(whitelist_path, f"Reading {whitelist_path}")

    # Filter out whitelisted FQDNs from the blacklist
    filtered_fqdns = blacklist_fqdns - whitelist_fqdns

    write_fqdn_to_file(output_path, filtered_fqdns, f"Writing {output_path}")

    print(f"Blacklist: {len(blacklist_fqdns)} FQDNs.")
    print(f"Whitelist: {len(whitelist_fqdns)} FQDNs.")
    print(f"After Filtering: {len(filtered_fqdns)} FQDNs.")

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Process blacklist and whitelist files.")
    parser.add_argument('--blacklist', default='blacklist.txt', type=Path, help='Path to blacklist file')
    parser.add_argument('--whitelist', default='whitelist.txt', type=Path, help='Path to whitelist file')
    parser.add_argument('--output', default='filtered_blacklist.txt', type=Path, help='Path to output file')
    
    args = parser.parse_args()

    try:
        main(args.blacklist, args.whitelist, args.output)
    except Exception as e:
        print(f"ERROR: {e}")
        exit(1)

generate_fqdn.sh

#!/bin/bash

# Description: Setup script for maintaining a domain blacklist.

# Function to display an error message and exit
die() {
  echo "$1" >&2
  exit 1
}

# Check if running with sudo
[ "$EUID" -eq 0 ] || die "Please run this script with sudo."

# Update and install prerequisites
echo "Updating package list..."
sudo apt-get update || die "Failed to update package list."
echo "Installing required packages..."
sudo apt-get install -y python3 python3-pip pv ncftp || die "Failed to install packages."

# Upgrade Python and pip
echo "Upgrading Python and pip..."
python3 -m ensurepip --upgrade || die "Failed to upgrade pip."
pip3 install --no-cache-dir --upgrade pip setuptools tldextract tqdm || die "Failed to upgrade pip packages."

# Function to download a URL
download_url() {
  local url="$1"
  local random_filename=$(uuidgen | tr -dc '[:alnum:]')

  echo "Downloading blacklist: $url"
  
  if wget -q --progress=bar:force -O "$random_filename.fqdn.list" "$url"; then
    echo "Downloaded: $url"
  else
    echo "Failed to download: $url"
  fi
}

# Download URLs from the list
LISTS="blacklists.fqdn.urls"
echo "Download blacklists"
while read -r url; do
  download_url "$url"
done < "$LISTS"

# Aggregate blacklists
echo "Aggregate blacklists"
cat *.fqdn.list | sort -u > all.fqdn.blacklist
rm -f *.fqdn.list

# Sanitize blacklists
mv all.fqdn.blacklist input.txt
python sanitize.py
mv output.txt all.fqdn.blacklist

# Remove whitelisted domains
mv all.fqdn.blacklist blacklist.txt
python whitelist.py
mv filtered_blacklist.txt all.fqdn.blacklist
rm blacklist.txt input.txt

total_lines_new=$(wc -l < all.fqdn.blacklist)
echo "Total domains: $total_lines_new."

scripts/update_rpz_blacklist.sh

#!/bin/bash

# ==========================================
# RPZ BLACKLIST UPDATER SCRIPT
# ==========================================

# List of required commands
REQUIRED_COMMANDS=("wget" "tar" "systemctl" "grep" "mkdir" "cat" "date" "named-checkconf")

# Check if required commands are installed
for cmd in "${REQUIRED_COMMANDS[@]}"; do
  if ! command -v "$cmd" >/dev/null 2>&1; then
    echo "Error: $cmd is required but not installed. Exiting."
    exit 1
  fi
done

# Directory to store the RPZ blacklist
RPZ_DIRECTORY="/path/to/store/rpz_blacklist"
# URL of the RPZ blacklist
RPZ_URL="https://github.com/fabriziosalmi/blacklists/raw/main/rpz_blacklist.tar.gz"
# BIND configuration file
BIND_CONFIG="/etc/bind/named.conf.local"

# Ensure the directory for the RPZ blacklist exists
mkdir -p "$RPZ_DIRECTORY"

# Download the latest RPZ blacklist from the repository
wget -O "$RPZ_DIRECTORY/rpz_blacklist.tar.gz" "$RPZ_URL"

# Extract the blacklist
tar -xzf "$RPZ_DIRECTORY/rpz_blacklist.tar.gz" -C "$RPZ_DIRECTORY"

# Check if the configuration is already added to avoid duplicate entries
if ! grep -q "rpz.blacklist" "$BIND_CONFIG"; then
    # Append configuration to BIND's config file
    echo "zone \"rpz.blacklist\" {
        type master;
        file \"$RPZ_DIRECTORY/rpz_blacklist.txt\";
    };" >> "$BIND_CONFIG"

    echo "options {
        response-policy { zone \"rpz.blacklist\"; };
    };" >> "$BIND_CONFIG"
fi

# Check BIND configuration
if ! named-checkconf "$BIND_CONFIG"; then
    echo "Error in BIND configuration. Please check manually!"
    exit 1
fi

echo "Script executed successfully!"

# To manually reload BIND and apply the new blacklist:
# sudo systemctl reload bind9
# You can also schedule this script using cron for automation.
# For example, to run it daily at 2 AM:
# crontab -e
# Add:
# 0 2 * * * /path/to/this_script/update_rpz_blacklist.sh

scripts/nft_blacklist_fqdn.sh

#!/bin/bash

print_error() {
  echo "Error: $1" >&2
  exit 1
}

print_success() {
  echo "Success: $1"
}

validate_domain() {
  local domain="$1"
  local domain_regex="^((?!-)[A-Za-z0-9-]{1,63}(?<!-)\.)+[A-Za-z]{2,63}$"
  [[ ! "$domain" =~ $domain_regex ]] && print_error "Invalid domain name: $domain"
}

readonly BLACKLIST_URL="https://github.com/fabriziosalmi/blacklists/releases/download/latest/blacklist.txt"
readonly INPUT_FILE="/tmp/all.fqdn.blacklist"
readonly RULES_FILE="nftables_rules.nft"
readonly TABLE_NAME="filter"
readonly CHAIN_NAME="input_drop"

if ! wget -q -O "$INPUT_FILE" "$BLACKLIST_URL"; then
  print_error "Failed to download the blacklist from $BLACKLIST_URL"
fi

[[ ! -r "$INPUT_FILE" ]] && print_error "Input file not found or not readable: $INPUT_FILE"

{
  echo "#!/usr/sbin/nft -f"
  echo "flush ruleset"
  echo "table $TABLE_NAME {"
  echo "    chain $CHAIN_NAME {"

  while IFS= read -r domain || [[ -n "$domain" ]]; do
    validate_domain "$domain"
    echo "        drop ip daddr $domain"
    echo "        drop ip saddr $domain"
  done < "$INPUT_FILE"

  echo "    }"
  echo "}"
} > "$RULES_FILE"

nft -f "$RULES_FILE" || print_error "Error applying nftables rules. Ensure you have the necessary privileges."

rm -f "$INPUT_FILE" "$RULES_FILE"

docker/pihole-squid/squid/update_blacklist.sh

#!/bin/bash

# Define the URL for the latest blacklist
blacklist_url="https://get.domainsblacklists.com/blacklist.txt"
blacklist_file="/etc/squid/conf.d/blacklist.txt"

# Check if 'wget' is installed
if ! command -v wget &> /dev/null; then
    echo "Error: 'wget' is not installed. Please install it."
    exit 1
fi

# Download the latest blacklist and handle errors
if wget -O "$blacklist_file" "$blacklist_url"; then
    echo "Blacklist updated successfully."

    # Check if Squid is installed and restart it
    if command -v squid &> /dev/null; then
        service squid restart
        echo "Squid restarted to apply the changes."
    else
        echo "Warning: Squid is not installed. Please install and configure it separately."
    fi
else
    echo "Error: Failed to update the blacklist. Please check the URL or your internet connection."
    exit 1
fi

sanitize.py

import re
import tldextract
from tqdm import tqdm

# Improved regex pattern for FQDN validation
fqdn_pattern = re.compile(r'^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$')

def is_valid_fqdn(s):
    """Check if the string is a valid FQDN."""
    if '*' in s or not s:
        return False
    extracted = tldextract.extract(s)
    if not all([extracted.domain, extracted.suffix]):
        return False
    return all(fqdn_pattern.match(x) for x in s.split('.'))

def remove_prefix(line, prefixes):
    """General function to remove specified prefixes from a line."""
    for prefix in prefixes:
        if line.startswith(prefix):
            potential_fqdn = line[len(prefix):]
            if is_valid_fqdn(potential_fqdn):
                return potential_fqdn
    return line

def sanitize_line(line, rules):
    """Apply all sanitization rules to a line using list comprehension for efficiency."""
    for rule in rules:
        line = rule(line.strip())
        if line is None:
            return None
    return line

def get_sanitization_rules():
    """Returns a list of sanitization rules, utilizing a single function for prefix removal."""
    prefixes = ["127.0.0.1 ", "127.0.0.1", "0.0.0.0 ", "0.0.0.0", "||", "http://", "https://"]
    return [
        lambda line: None if line.startswith("#") else line,
        lambda line: remove_prefix(line, prefixes),
        lambda line: line.rstrip('.'),
        lambda line: line.lower()
    ]

def process_large_file(input_file_path, output_file_path):
    """Process large files line by line with optimized file reading and writing."""
    unique_domains = set()
    rules = get_sanitization_rules()

    with open(input_file_path, 'r') as infile, open(output_file_path, 'w') as outfile:
        total_lines = sum(1 for _ in infile)
        infile.seek(0)  # Reset file pointer to start
        for line in tqdm(infile, total=total_lines, desc="Processing"):
            sanitized_line = sanitize_line(line, rules)
            if sanitized_line and is_valid_fqdn(sanitized_line):
                unique_domains.add(sanitized_line)

        # Sort and write the unique domain names to the output file
        for domain in tqdm(sorted(unique_domains), desc="Writing"):
            outfile.write(domain + '\n')

# Example usage
process_large_file('input.txt', 'output.txt')