import re
import tldextract
from tqdm import tqdm
# Pre-compiled regex pattern for validating a single DNS label (the text
# between two dots): 1-63 letters, digits or hyphens, with no hyphen at
# either end. A raw string is used for the pattern, and \Z (rather than $)
# anchors the end so a label followed by a newline is rejected.
fqdn_pattern = re.compile(r'^(?!-)[A-Za-z0-9-]{1,63}(?<!-)\Z')
def is_valid_fqdn(s):
    """Check if the string is a valid FQDN.

    A string is accepted when it is non-empty, contains no wildcard,
    is at most 253 characters long, has both a domain and a public suffix
    according to ``tldextract``, and every dot-separated label matches
    ``fqdn_pattern``.

    Args:
        s: Candidate host name.

    Returns:
        True if ``s`` passes all checks, False otherwise.
    """
    # Test emptiness first so that None/"" is rejected instead of raising
    # on the membership test.
    if not s or '*' in s:
        return False
    # 253 characters is the maximum length of a textual domain name.
    if len(s) > 253:
        return False
    extracted = tldextract.extract(s)
    if not (extracted.domain and extracted.suffix):
        return False
    return all(fqdn_pattern.match(label) for label in s.split('.'))
def remove_prefix(line, prefix):
    """General function to remove specified prefix from a line.

    The prefix is only removed when what remains is a valid FQDN
    (per ``is_valid_fqdn``); otherwise the line is returned unchanged.

    Args:
        line: Input line.
        prefix: Literal prefix to strip.

    Returns:
        The line without the prefix, or the original line.
    """
    if line.startswith(prefix):
        potential_fqdn = line[len(prefix):]
        if is_valid_fqdn(potential_fqdn):
            return potential_fqdn
    return line
def sanitize_line(line, rules):
    """Apply all sanitization rules to a line.

    Each rule receives the current line with surrounding whitespace
    stripped. A rule returning None discards the line immediately.

    Args:
        line: Raw input line.
        rules: Iterable of callables ``rule(str) -> str | None``.

    Returns:
        The sanitized line, or None if any rule rejected it.
    """
    for rule in rules:
        line = rule(line.strip())
        if line is None:
            return None
    return line
def get_sanitization_rules():
    """Returns a list of sanitization rules.

    The rules are applied in order by ``sanitize_line``: drop comment
    lines, strip known prefixes (via ``remove_prefix``), strip a trailing
    dot and lower-case the result.

    Returns:
        List of callables ``rule(str) -> str | None``.
    """
    def strip_prefix(prefix):
        # Bind the prefix per rule so every closure keeps its own value.
        return lambda line: remove_prefix(line, prefix)

    # NOTE(review): the four "IP prefix" rules carried empty/blank prefixes,
    # which made them no-ops. The addresses below are the usual hosts-file
    # sink addresses and are an assumption - confirm against the input lists.
    return [
        lambda line: None if line.startswith("#") else line,  # Remove comment lines
        strip_prefix("0.0.0.0"),     # Remove IP prefix without space
        strip_prefix("0.0.0.0 "),    # Remove IP prefix with space
        strip_prefix("127.0.0.1"),   # Remove IP prefix without space
        strip_prefix("127.0.0.1 "),  # Remove IP prefix with space
        strip_prefix("||"),          # Remove double pipes
        strip_prefix("http://"),     # Remove http prefix
        strip_prefix("https://"),    # Remove https prefix
        lambda line: line.rstrip('.'),  # Remove trailing dot
        lambda line: line.lower(),      # Convert to lowercase
    ]
def process_large_file(input_file_path, output_file_path):
    """Process large files line by line and track progress.

    Every line is sanitized with the rules from
    ``get_sanitization_rules``; lines that end up as valid FQDNs are
    collected into a set.

    Args:
        input_file_path: Path of the file to read.
        output_file_path: Unused here; kept so existing callers keep working.

    Returns:
        Set of unique, sanitized domain names.
    """
    unique_domains = set()
    rules = get_sanitization_rules()
    with open(input_file_path, 'r') as infile:
        # First pass only counts lines so the progress bar has a total.
        total_lines = sum(1 for _ in infile)
        infile.seek(0)  # Reset file pointer to start
        for line in tqdm(infile, total=total_lines, desc="Processing"):
            sanitized_line = sanitize_line(line, rules)
            if sanitized_line and is_valid_fqdn(sanitized_line):
                unique_domains.add(sanitized_line)
    return unique_domains
def write_to_output_file(unique_domains, output_file_path):
    """Write unique domains to the output file and track progress.

    Args:
        unique_domains: Iterable of domain names.
        output_file_path: Path of the file to (over)write, one domain per line.
    """
    # Sort the unique domain names in alphabetical order
    sorted_unique_domains = sorted(unique_domains)
    with open(output_file_path, 'w') as outfile:
        # Wrapping the iterable lets tqdm advance the bar on every item.
        for domain in tqdm(sorted_unique_domains, desc="Writing"):
            outfile.write(domain + '\n')
if __name__ == "__main__":
    # Script entry point: sanitize input.txt into a sorted output.txt.
    input_file_path = 'input.txt'
    output_file_path = 'output.txt'
    unique_domains = process_large_file(input_file_path, output_file_path)
    write_to_output_file(unique_domains, output_file_path)
import os
from pathlib import Path
import argparse
from tqdm import tqdm
def read_fqdn_from_file(file_path: Path, description: str) -> set:
    """Read the file and return a set of FQDNs with a progress bar.

    Args:
        file_path: File containing one FQDN per line.
        description: Label shown on the progress bar.

    Returns:
        Set of the non-empty, whitespace-stripped lines of the file.
    """
    fqdns = set()
    with file_path.open('r') as file:
        # Counting the lines consumes the file, so rewind before reading.
        total_lines = sum(1 for _ in file)
        file.seek(0)
        for line in tqdm(file, total=total_lines, desc=description, unit="lines", leave=False):
            fqdn = line.strip()
            if fqdn:
                fqdns.add(fqdn)
    return fqdns
def write_fqdn_to_file(file_path: Path, content: set, description: str) -> None:
    """Write a set of FQDNs to the specified file with a progress bar.

    Args:
        file_path: File to (over)write, one FQDN per line.
        content: FQDNs to write.
        description: Label shown on the progress bar.
    """
    with file_path.open('w') as file:
        # Sorting makes the output deterministic; set order is arbitrary.
        for fqdn in tqdm(sorted(content), desc=description, unit="lines", leave=False):
            file.write(fqdn + '\n')
def ensure_file_exists(file_path: Path) -> None:
    """Check if a file exists or exit the program.

    Args:
        file_path: Path that must refer to an existing regular file.

    Raises:
        SystemExit: With exit status 1 when the file is missing.
    """
    if not file_path.is_file():
        print(f"ERROR: File '{file_path}' not found.")
        # Actually stop the program, as the docstring promises.
        raise SystemExit(1)
def main(blacklist_path: Path, whitelist_path: Path, output_path: Path) -> None:
    """Main function to process blacklist and whitelist files.

    Reads both lists, writes the blacklist entries that are not
    whitelisted to ``output_path`` and prints the three counts.

    Args:
        blacklist_path: File with blacklisted FQDNs, one per line.
        whitelist_path: File with whitelisted FQDNs, one per line.
        output_path: File that receives the filtered blacklist.
    """
    # Check if files exist
    ensure_file_exists(blacklist_path)
    ensure_file_exists(whitelist_path)
    blacklist_fqdns = read_fqdn_from_file(blacklist_path, f"Reading {blacklist_path}")
    whitelist_fqdns = read_fqdn_from_file(whitelist_path, f"Reading {whitelist_path}")
    # Filter out whitelisted FQDNs from the blacklist
    filtered_fqdns = blacklist_fqdns - whitelist_fqdns
    write_fqdn_to_file(output_path, filtered_fqdns, f"Writing {output_path}")
    print(f"Blacklist: {len(blacklist_fqdns)} FQDNs.")
    print(f"Whitelist: {len(whitelist_fqdns)} FQDNs.")
    print(f"After Filtering: {len(filtered_fqdns)} FQDNs.")
if __name__ == '__main__':
    # Command-line entry point: all three paths are optional.
    parser = argparse.ArgumentParser(description="Process blacklist and whitelist files.")
    parser.add_argument('--blacklist', default='blacklist.txt', type=Path, help='Path to blacklist file')
    parser.add_argument('--whitelist', default='whitelist.txt', type=Path, help='Path to whitelist file')
    parser.add_argument('--output', default='filtered_blacklist.txt', type=Path, help='Path to output file')
    args = parser.parse_args()
    # Report failures as a one-line error and a non-zero exit status.
    try:
        main(args.blacklist, args.whitelist, args.output)
    except Exception as e:
        print(f"ERROR: {e}")
        raise SystemExit(1)
# Description: Setup script for maintaining a domain blacklist.
# Function to display an error message and exit
# Arguments:
#   $1 - message, written to stderr
# Exits the script with status 1.
die() {
    echo "$1" >&2
    exit 1
}
# Check if running with sudo
# id -u is used instead of $EUID so the test also works in shells that do
# not define EUID.
[ "$(id -u)" -eq 0 ] || die "Please run this script with sudo."

# Update and install prerequisites
echo "Updating package list..."
sudo apt-get update || die "Failed to update package list."
echo "Installing required packages..."
sudo apt-get install -y python3 python3-pip pv ncftp || die "Failed to install packages."

# Upgrade Python and pip
echo "Upgrading Python and pip..."
# ensurepip is commonly disabled for the distribution's system interpreter
# on apt-based systems; pip was already installed through python3-pip
# above, so a failure here is only reported instead of aborting.
python3 -m ensurepip --upgrade || echo "Warning: ensurepip is not available; using the pip installed by apt." >&2
pip3 install --no-cache-dir --upgrade pip setuptools tldextract tqdm || die "Failed to upgrade pip packages."
# Function to download a URL
# Saves the content to a randomly named <name>.fqdn.list file in the
# current directory.
# Arguments:
#   $1 - URL to download
download_url() {
    local url="$1"
    # Declare and assign separately so a uuidgen failure is not masked by
    # the exit status of `local`.
    local random_filename
    random_filename=$(uuidgen | tr -dc '[:alnum:]')
    echo "Downloading blacklist: $url"
    if wget -q --progress=bar:force -O "$random_filename.fqdn.list" "$url"; then
        echo "Downloaded: $url"
    else
        echo "Failed to download: $url"
        # wget -O creates the file even on failure; do not let an empty or
        # partial file end up in the aggregated list.
        rm -f "$random_filename.fqdn.list"
    fi
}
# Download URLs from the list
# LISTS must name a file with one URL per line; abort with a clear message
# if it is unset instead of failing on an empty redirect target.
echo "Download blacklists"
while IFS= read -r url || [ -n "$url" ]; do
    # Skip blank lines; the extra test above keeps a last line that has
    # no trailing newline.
    [ -n "$url" ] || continue
    download_url "$url"
done < "${LISTS:?LISTS must point to the file containing the blacklist URLs}"

# Aggregate blacklists
echo "Aggregate blacklists"
if ! ls ./*.fqdn.list >/dev/null 2>&1; then
    die "No blacklist could be downloaded."
fi
cat ./*.fqdn.list | sort -u > all.fqdn.blacklist
rm -f ./*.fqdn.list
# Sanitize blacklists
# sanitize.py reads input.txt and writes output.txt.
# python3 is the interpreter installed by this script; a bare `python`
# command is not guaranteed to exist.
mv all.fqdn.blacklist input.txt
python3 sanitize.py || die "Failed to sanitize the blacklist."
mv output.txt all.fqdn.blacklist

# Remove whitelisted domains
# whitelist.py reads blacklist.txt and writes filtered_blacklist.txt.
mv all.fqdn.blacklist blacklist.txt
python3 whitelist.py || die "Failed to remove whitelisted domains."
mv filtered_blacklist.txt all.fqdn.blacklist
rm blacklist.txt input.txt

total_lines_new=$(wc -l < all.fqdn.blacklist)
echo "Total domains: $total_lines_new."
# ==========================================
# ==========================================
# List of required commands
REQUIRED_COMMANDS=("wget" "tar" "systemctl" "grep" "mkdir" "cat" "date" "named-checkconf")

# Check if required commands are installed
# Stops at the first missing command with exit status 1.
for cmd in "${REQUIRED_COMMANDS[@]}"; do
    if ! command -v "$cmd" >/dev/null 2>&1; then
        echo "Error: $cmd is required but not installed. Exiting." >&2
        exit 1
    fi
done
# The three settings below are used throughout the rest of the script but
# are not assigned anywhere visible; fail early with a clear message
# rather than running mkdir/wget/tar with empty arguments.
# Directory to store the RPZ blacklist
: "${RPZ_DIRECTORY:?RPZ_DIRECTORY must be set to the directory that stores the RPZ blacklist}"
# URL of the RPZ blacklist
: "${RPZ_URL:?RPZ_URL must be set to the URL of the RPZ blacklist archive}"
# BIND configuration file
: "${BIND_CONFIG:?BIND_CONFIG must be set to the path of the BIND configuration file}"

# Ensure the directory for the RPZ blacklist exists
mkdir -p "$RPZ_DIRECTORY" || { echo "Error: cannot create $RPZ_DIRECTORY." >&2; exit 1; }

# Download the latest RPZ blacklist from the repository
wget -O "$RPZ_DIRECTORY/rpz_blacklist.tar.gz" "$RPZ_URL" || { echo "Error: failed to download $RPZ_URL." >&2; exit 1; }

# Extract the blacklist
tar -xzf "$RPZ_DIRECTORY/rpz_blacklist.tar.gz" -C "$RPZ_DIRECTORY" || { echo "Error: failed to extract the RPZ blacklist." >&2; exit 1; }
# Check if the configuration is already added to avoid duplicate entries
# -F matches the zone name literally (a bare "." would match any character).
if ! grep -qF "rpz.blacklist" "$BIND_CONFIG"; then
    # Append configuration to BIND's config file
    echo "zone \"rpz.blacklist\" {
type master;
file \"$RPZ_DIRECTORY/rpz_blacklist.txt\";
};" >> "$BIND_CONFIG"
    # NOTE(review): this appends a separate options block. If the BIND
    # configuration already contains one, the named-checkconf step is
    # expected to reject the duplicate - confirm, and merge
    # response-policy into the existing block if so.
    echo "options {
response-policy { zone \"rpz.blacklist\"; };
};" >> "$BIND_CONFIG"
fi
# Check BIND configuration
# Abort with status 1 when named-checkconf reports a problem.
if ! named-checkconf "$BIND_CONFIG"; then
    echo "Error in BIND configuration. Please check manually!"
    exit 1
fi

echo "Script executed successfully!"
# To manually reload BIND and apply the new blacklist:
# sudo systemctl reload bind9
# You can also schedule this script using cron for automation.
# For example, to run it daily at 2 AM:
# crontab -e
# Add:
# 0 2 * * * /path/to/this_script/update_rpz_blacklist.sh
# Print "Error: <message>" to stderr and exit with status 1.
# Arguments:
#   $1 - message text
print_error() {
    echo "Error: $1" >&2
    exit 1
}
# Print "Success: <message>" to stdout.
# Arguments:
#   $1 - message text
print_success() {
    echo "Success: $1"
}
# Validate a domain name; calls print_error (which exits) when invalid.
# Arguments:
#   $1 - domain name to check
validate_domain() {
    local domain="$1"
    # [[ =~ ]] uses POSIX extended regular expressions, which have no
    # lookahead/lookbehind. The "no leading or trailing hyphen, 1-63
    # characters" rule is therefore spelled out per label.
    local label='[A-Za-z0-9]([A-Za-z0-9-]{0,61}[A-Za-z0-9])?'
    local domain_regex="^(${label}\.)+[A-Za-z]{2,63}\$"
    # An if statement (rather than `[[ ... ]] && ...`) keeps the function's
    # exit status 0 for valid domains.
    if [[ ! "$domain" =~ $domain_regex ]]; then
        print_error "Invalid domain name: $domain"
    fi
}
# Settings: blacklist source, local download path, generated rules file,
# and the nftables table/chain names used in the generated rules.
readonly BLACKLIST_URL="https://github.com/fabriziosalmi/blacklists/releases/download/latest/blacklist.txt"
readonly INPUT_FILE="/tmp/all.fqdn.blacklist"
readonly RULES_FILE="nftables_rules.nft"
readonly TABLE_NAME="filter"
readonly CHAIN_NAME="input_drop"

# Download the blacklist and make sure the result is readable.
if ! wget -q -O "$INPUT_FILE" "$BLACKLIST_URL"; then
    print_error "Failed to download the blacklist from $BLACKLIST_URL"
fi
if [[ ! -r "$INPUT_FILE" ]]; then
    print_error "Input file not found or not readable: $INPUT_FILE"
fi
# Generate the nftables rules file, one pair of drop rules per domain.
# The output of the whole group is redirected into $RULES_FILE, which is
# the file loaded by nft below.
# NOTE(review): "flush ruleset" removes every existing nftables rule, and
# the chain is declared without a hook - confirm both are intended.
{
    echo "#!/usr/sbin/nft -f"
    echo "flush ruleset"
    echo "table $TABLE_NAME {"
    echo " chain $CHAIN_NAME {"
    while IFS= read -r domain || [[ -n "$domain" ]]; do
        # Blank lines are skipped rather than rejected as invalid domains.
        [[ -z "$domain" ]] && continue
        validate_domain "$domain"
        # The match comes first and the drop verdict last; a verdict is
        # terminal, so nothing may follow it in a rule.
        echo " ip daddr $domain drop"
        echo " ip saddr $domain drop"
    done < "$INPUT_FILE"
    echo " }"
    echo "}"
} > "$RULES_FILE"

nft -f "$RULES_FILE" || print_error "Error applying nftables rules. Ensure you have the necessary privileges."
# Define the URL for the latest blacklist
# Check if 'wget' is installed
# Exit with status 1 when it is missing.
if ! command -v wget &> /dev/null; then
    echo "Error: 'wget' is not installed. Please install it."
    exit 1
fi
# Download the latest blacklist and handle errors
# blacklist_url and blacklist_file are not assigned anywhere visible in
# this script; abort with a clear message if either is unset or empty.
: "${blacklist_url:?blacklist_url must be set to the URL of the latest blacklist}"
: "${blacklist_file:?blacklist_file must be set to the path of the local blacklist file}"
if wget -O "$blacklist_file" "$blacklist_url"; then
    echo "Blacklist updated successfully."
    # Check if Squid is installed and restart it
    if command -v squid &> /dev/null; then
        service squid restart
        echo "Squid restarted to apply the changes."
    else
        echo "Warning: Squid is not installed. Please install and configure it separately."
    fi
else
    echo "Error: Failed to update the blacklist. Please check the URL or your internet connection."
    exit 1
fi
import re
import tldextract
from tqdm import tqdm
# Improved regex pattern for validating a single DNS label (the text
# between two dots): 1-63 letters, digits or hyphens, with no hyphen at
# either end. \Z (rather than $) anchors the end so a label followed by a
# newline is rejected.
fqdn_pattern = re.compile(r'^(?!-)[A-Za-z0-9-]{1,63}(?<!-)\Z')
def is_valid_fqdn(s):
    """Check if the string is a valid FQDN.

    A string is accepted when it is non-empty, contains no wildcard,
    is at most 253 characters long, has both a domain and a public suffix
    according to ``tldextract``, and every dot-separated label matches
    ``fqdn_pattern``.

    Args:
        s: Candidate host name.

    Returns:
        True if ``s`` passes all checks, False otherwise.
    """
    # Test emptiness first so that None/"" is rejected instead of raising
    # on the membership test.
    if not s or '*' in s:
        return False
    # 253 characters is the maximum length of a textual domain name.
    if len(s) > 253:
        return False
    extracted = tldextract.extract(s)
    if not (extracted.domain and extracted.suffix):
        return False
    return all(fqdn_pattern.match(label) for label in s.split('.'))
def remove_prefix(line, prefixes):
    """General function to remove specified prefixes from a line.

    The prefixes are tried in order; the first one whose removal leaves
    a valid FQDN (per ``is_valid_fqdn``) wins. If none qualifies, the
    line is returned unchanged.

    Args:
        line: Input line.
        prefixes: Iterable of literal prefixes; a single string is also
            accepted and treated as one prefix.

    Returns:
        The line with one prefix removed, or the original line.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(prefixes, str):
        prefixes = (prefixes,)
    for prefix in prefixes:
        if line.startswith(prefix):
            potential_fqdn = line[len(prefix):]
            if is_valid_fqdn(potential_fqdn):
                return potential_fqdn
    return line
def sanitize_line(line, rules):
    """Apply all sanitization rules to a line, in order.

    Each rule receives the current line with surrounding whitespace
    stripped. A rule returning None discards the line immediately.

    Args:
        line: Raw input line.
        rules: Iterable of callables ``rule(str) -> str | None``.

    Returns:
        The sanitized line, or None if any rule rejected it.
    """
    for rule in rules:
        line = rule(line.strip())
        if line is None:
            return None
    return line
def get_sanitization_rules():
    """Returns a list of sanitization rules, utilizing a single function for prefix removal.

    The rules are applied in order by ``sanitize_line``: drop comment
    lines, strip one known prefix (via ``remove_prefix``), strip a trailing
    dot and lower-case the result.

    Returns:
        List of callables ``rule(str) -> str | None``.
    """
    # NOTE(review): the first four prefixes were empty/blank strings, which
    # can never strip anything. The addresses below are the usual hosts-file
    # sink addresses and are an assumption - confirm against the input lists.
    prefixes = ["0.0.0.0 ", "0.0.0.0", "127.0.0.1 ", "127.0.0.1", "||", "http://", "https://"]
    return [
        lambda line: None if line.startswith("#") else line,
        lambda line: remove_prefix(line, prefixes),
        lambda line: line.rstrip('.'),
        lambda line: line.lower(),
    ]
def process_large_file(input_file_path, output_file_path):
    """Process large files line by line and write the sorted unique domains.

    Every input line is sanitized with the rules from
    ``get_sanitization_rules``; lines that end up as valid FQDNs are
    de-duplicated, sorted and written one per line.

    Args:
        input_file_path: Path of the file to read.
        output_file_path: Path of the file to (over)write.
    """
    unique_domains = set()
    rules = get_sanitization_rules()
    with open(input_file_path, 'r') as infile:
        # First pass only counts lines so the progress bar has a total.
        total_lines = sum(1 for _ in infile)
        infile.seek(0)  # Reset file pointer to start
        for line in tqdm(infile, total=total_lines, desc="Processing"):
            sanitized_line = sanitize_line(line, rules)
            if sanitized_line and is_valid_fqdn(sanitized_line):
                unique_domains.add(sanitized_line)
    # The output is opened only after reading has finished, so a failure
    # while processing does not leave a truncated output file behind.
    with open(output_file_path, 'w') as outfile:
        # Sort and write the unique domain names to the output file
        for domain in tqdm(sorted(unique_domains), desc="Writing"):
            outfile.write(domain + '\n')
# Example usage
# Guarded so that importing this module does not start processing files.
if __name__ == "__main__":
    process_large_file('input.txt', 'output.txt')