yihong1120/Construction-Hazard-Detection

Fix code scanning alert - Uncontrolled data used in path expression


Potential solution

To solve the bug related to "Uncontrolled data used in path expression," we need to ensure that all user inputs used in path expressions are properly validated and sanitized. This will prevent path traversal vulnerabilities and other security issues. The plan involves:

  1. Sanitizing and validating user inputs: Ensure that inputs like URLs, file names, and directory paths do not contain malicious content.
  2. Using secure methods for path handling: Utilize libraries and functions that help in securely constructing and validating paths.

What is causing this bug?

The bug is caused by the lack of validation and sanitization of user inputs used in path expressions. Specifically:

  • In src/stream_capture.py, the stream URL is taken directly from user input without validation.
  • In src/model_fetcher.py, the model_name parameter is used directly to construct file paths without validation.
  • In src/live_stream_detection.py, the output_folder and model_key parameters are used in path expressions without proper validation.

Code

Here are the implementation details and code snippets to fix the issues:

src/stream_capture.py

Changes:

  1. Sanitize and validate the stream URL.
import re
from urllib.parse import urlparse
import argparse
import gc
import cv2
import time

class StreamCapture:
    def __init__(self, stream_url: str, capture_interval: int = 15):
        self.stream_url = stream_url
        self.cap: cv2.VideoCapture | None = None
        self.capture_interval = capture_interval

    def initialise_stream(self, stream_url: str) -> None:
        self.cap = cv2.VideoCapture(stream_url)
        self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
        if not self.cap.isOpened():
            time.sleep(5)
            self.cap.open(stream_url)

    def execute_capture(self):
        # Implementation of frame capture logic (elided here); it yields
        # (frame, timestamp) pairs, so this stub is an empty generator.
        yield from ()

def is_valid_url(url: str) -> bool:
    parsed_url = urlparse(url)
    return all([parsed_url.scheme, parsed_url.netloc])

def sanitize_url(url: str) -> str:
    url = re.sub(r'[^\w\-/:.?&=]', '', url)
    return url

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Capture video stream frames.')
    parser.add_argument('--url', type=str, help='Live stream URL', required=True)
    args = parser.parse_args()

    sanitized_url = sanitize_url(args.url)
    if not is_valid_url(sanitized_url):
        raise ValueError("Invalid URL provided")

    stream_capture = StreamCapture(sanitized_url)
    for frame, timestamp in stream_capture.execute_capture():
        print(f"Frame at {timestamp} displayed")
        del frame
        gc.collect()

src/model_fetcher.py

Changes:

  1. Sanitize the model_name parameter to prevent path traversal.
from pathlib import Path
from typing import TypedDict
import requests
import re

class ModelInfo(TypedDict):
    model_name: str
    url: str

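def sanitize_model_name(model_name: str) -> str:
    # Allow word characters and hyphens, with single dots only between
    # segments (e.g. 'best_yolov8l.pt'); '/', '\\' and '..' are rejected.
    if not re.fullmatch(r'[\w\-]+(?:\.[\w\-]+)*', model_name):
        raise ValueError(f"Invalid model name: {model_name}")
    return model_name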

def download_model(model_name, url):
    model_name = sanitize_model_name(model_name)
    LOCAL_MODEL_DIRECTORY = Path('models/pt/')
    LOCAL_MODEL_DIRECTORY.mkdir(parents=True, exist_ok=True)
    local_file_path = LOCAL_MODEL_DIRECTORY / model_name

    if local_file_path.exists():
        print(f"'{model_name}' exists. Skipping download.")
        return

    response = requests.get(url, stream=True)
    if response.status_code == 200:
        with open(local_file_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"'{model_name}' saved to '{local_file_path}'.")
    else:
        print(f"Error downloading '{model_name}': {response.status_code}")

def main():
    MODEL_URLS = {
        'best_yolov8l.pt': 'http://changdar-server.mooo.com:28000/models/best_yolov8l.pt',
        'best_yolov8x.pt': 'http://changdar-server.mooo.com:28000/models/best_yolov8x.pt',
    }

    for model_name, url in MODEL_URLS.items():
        download_model(model_name, url)

if __name__ == '__main__':
    main()

src/live_stream_detection.py

Changes:

  1. Validate and sanitize the output_folder and model_key parameters.
  2. Validate the stream_url parameter.
import os
import argparse
import cv2
from pathlib import Path
import re
from urllib.parse import urlparse

class LiveStreamDetector:
    def __init__(self, api_url: str = 'http://localhost:5000', model_key: str = 'yolov8l', output_folder: str | None = None, run_local: bool = True):
        self.api_url = api_url
        self.model_key = self.sanitize_model_key(model_key)
        self.output_folder = self.sanitize_output_folder(output_folder)
        self.run_local = run_local

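    def sanitize_output_folder(self, output_folder: str | None) -> str | None:
        if output_folder:
            # Resolve '..' segments and symlinks before comparing paths
            base_dir = os.path.realpath(os.getcwd())
            output_folder = os.path.realpath(output_folder)
            # Compare whole path components; a plain startswith check would
            # accept a sibling such as '<cwd>-other'
            if os.path.commonpath([base_dir, output_folder]) != base_dir:
                raise ValueError("Invalid output folder path")
        return output_folder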

    def sanitize_model_key(self, model_key: str) -> str:
        # fullmatch, unlike match with '$', also rejects a trailing newline
        if not re.fullmatch(r'[\w\-]+', model_key):
            raise ValueError(f"Invalid model key: {model_key}")
        return model_key

    def generate_detections_local(self, frame: cv2.Mat) -> list[list[float]]:
        model_path = Path('models/pt/') / f"best_{self.model_key}.pt"
        # Implementation of detection logic
        pass

    def run_detection(self, stream_url: str) -> None:
        if not self.is_valid_url(stream_url):
            raise ValueError("Invalid stream URL")
        cap = cv2.VideoCapture(stream_url)
        # Implementation of detection logic
        pass

    def is_valid_url(self, url: str) -> bool:
        parsed_url = urlparse(url)
        return all([parsed_url.scheme, parsed_url.netloc])

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Perform live stream detection and tracking using YOLOv8.')
    parser.add_argument('--url', type=str, help='Live stream URL', required=True)
    parser.add_argument('--api_url', type=str, default='http://localhost:5000', help='API URL for detection')
    parser.add_argument('--model_key', type=str, default='yolov8n', help='Model key for detection')
    parser.add_argument('--output_folder', type=str, help='Output folder for detected frames')
    parser.add_argument('--run_local', action='store_true', help='Run detection using local model')
    args = parser.parse_args()

    detector = LiveStreamDetector(api_url=args.api_url, model_key=args.model_key, output_folder=args.output_folder, run_local=args.run_local)
    detector.run_detection(args.url)

How to replicate the bug

  1. For src/stream_capture.py:

    • Run the script with a --url value that is a local file path rather than a network URL. cv2.VideoCapture accepts both, so the unvalidated value is passed straight to the file system.
    • Example: python src/stream_capture.py --url "../../etc/passwd"
  2. For src/model_fetcher.py:

    • Run the script with a malicious model name that contains path traversal sequences.
    • Example: Modify MODEL_URLS to include a key like ../etc/passwd.
  3. For src/live_stream_detection.py:

    • Run the script with malicious inputs for --output_folder and --model_key.
    • Example: python src/live_stream_detection.py --url "http://example.com" --output_folder "../../etc" --model_key "../etc/passwd"

By implementing the recommended changes, these vulnerabilities will be mitigated, ensuring that user inputs are properly validated and sanitized.


Files used for this task:

Changes on src/stream_capture.py

Analysis Report

Overview

The file src/stream_capture.py is responsible for capturing frames from a video stream. It includes functionalities to initialize the stream, capture frames at specified intervals, check internet speed, select stream quality based on speed, and handle failures in capturing frames.

Potential Security Issue

The primary concern raised in the issue is the "Uncontrolled data used in path expression" alert, which points to a possible path traversal vulnerability. In this file, the main user-controlled input is the stream URL provided via command-line arguments.

Key Areas to Review

  1. Stream URL Handling:
    • The stream URL is taken directly from user input without any validation or sanitization.
    • This URL is used in methods like initialise_stream and select_quality_based_on_speed.

Detailed Analysis

Stream URL Handling

  • Initialization:

    def __init__(self, stream_url: str, capture_interval: int = 15):
        self.stream_url = stream_url
        self.cap: cv2.VideoCapture | None = None
        self.capture_interval = capture_interval
  • Stream Initialization:

    def initialise_stream(self, stream_url: str) -> None:
        self.cap = cv2.VideoCapture(stream_url)
        self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
        if not self.cap.isOpened():
            time.sleep(5)
            self.cap.open(stream_url)
  • Command-line Argument Parsing:

    if __name__ == '__main__':
        parser = argparse.ArgumentParser(
            description='Capture video stream frames.',
        )
        parser.add_argument(
            '--url',
            type=str,
            help='Live stream URL',
            required=True,
        )
        args = parser.parse_args()
    
        stream_capture = StreamCapture(args.url)
        for frame, timestamp in stream_capture.execute_capture():
            print(f"Frame at {timestamp} displayed")
            del frame
            gc.collect()

Recommendations

  1. Sanitize and Validate Stream URL:

    • Ensure that the stream URL is a valid URL and does not contain any malicious content.
    • Use libraries like urllib.parse to parse and validate the URL.
  2. Secure Methods for Handling URLs:

    • Avoid directly using user inputs in critical operations without validation.
    • Implement a whitelist of allowed URL patterns if possible.
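
The allowlist idea in the last recommendation could be sketched as follows. This is a minimal illustration, not the project's implementation: the names ALLOWED_SCHEMES and validate_stream_url, and the particular set of schemes, are assumptions chosen for the example.

```python
from urllib.parse import urlparse

# Hypothetical allowlist; adjust to the protocols the deployment actually uses.
ALLOWED_SCHEMES = {"http", "https", "rtsp", "rtmp"}


def validate_stream_url(url: str) -> str:
    """Return the URL unchanged if it passes the checks, else raise ValueError."""
    parsed = urlparse(url)
    # A bare file path has an empty scheme; file:// URLs are not on the list.
    if parsed.scheme.lower() not in ALLOWED_SCHEMES:
        raise ValueError(f"Unsupported URL scheme: {parsed.scheme!r}")
    if not parsed.hostname:
        raise ValueError("URL has no host component")
    return url
```

Rejecting a bad value outright, rather than stripping characters from it, avoids silently turning one URL into a different one.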

Example Fix

Here is an example of how you can sanitize and validate the stream URL:

import argparse
import gc
import re
from urllib.parse import urlparse

def is_valid_url(url: str) -> bool:
    # Basic URL validation
    parsed_url = urlparse(url)
    return all([parsed_url.scheme, parsed_url.netloc])

def sanitize_url(url: str) -> str:
    # Remove any unwanted characters or patterns
    url = re.sub(r'[^\w\-/:.?&=]', '', url)
    return url

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Capture video stream frames.',
    )
    parser.add_argument(
        '--url',
        type=str,
        help='Live stream URL',
        required=True,
    )
    args = parser.parse_args()

    sanitized_url = sanitize_url(args.url)
    if not is_valid_url(sanitized_url):
        raise ValueError("Invalid URL provided")

    stream_capture = StreamCapture(sanitized_url)
    for frame, timestamp in stream_capture.execute_capture():
        print(f"Frame at {timestamp} displayed")
        del frame
        gc.collect()

Conclusion

The primary issue in this file is the lack of validation and sanitization of the stream URL provided by the user. By implementing the recommended changes, you can mitigate the risk of path traversal and other related vulnerabilities.

Changes on src/model_fetcher.py

Analysis Report for src/model_fetcher.py

Overview

The src/model_fetcher.py file is responsible for downloading model files from specified URLs and saving them locally. The main function iterates over a dictionary of model names and URLs, and the download_model function handles the actual downloading and saving of the files.

Potential Security Issue

The primary concern in this file is the potential for path traversal vulnerabilities due to uncontrolled data being used in path expressions. Specifically, the model_name parameter in the download_model function is used directly to construct file paths without any validation or sanitization.

Detailed Analysis

  1. Path Construction:

    LOCAL_MODEL_DIRECTORY = Path('models/pt/')
    local_file_path = LOCAL_MODEL_DIRECTORY / model_name

    The model_name is directly appended to the LOCAL_MODEL_DIRECTORY to form the local_file_path. If model_name contains path traversal characters (e.g., ../), it could lead to writing files outside the intended directory.

  2. File Existence Check:

    if local_file_path.exists():
        print(f"'{model_name}' exists. Skipping download.")
        return

    This check ensures that the file is not downloaded if it already exists, but it does not mitigate the risk of path traversal.

  3. File Writing:

    with open(local_file_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)

    Writing to the file without validating the model_name can lead to overwriting critical files if path traversal is exploited.
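
The path-construction risk described in item 1 can be reproduced without touching the file system. The following is a minimal sketch that uses PurePosixPath so the result is the same on any platform; the malicious names are made up for illustration.

```python
import posixpath
from pathlib import PurePosixPath

base = PurePosixPath("models/pt")

# Enough '..' segments climb out of the base directory once normalised.
escaped = base / "../../../etc/passwd"
print(posixpath.normpath(str(escaped)))  # ../etc/passwd

# An absolute component discards everything to its left.
print(base / "/etc/passwd")  # /etc/passwd
```

Both results point outside models/pt, which is why the name has to be checked before it is joined.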

Recommendations

To mitigate the risk of path traversal vulnerabilities, the following steps should be taken:

  1. Sanitize model_name:
    Ensure that model_name does not contain any path traversal characters. This can be done by validating the input and allowing only safe characters (e.g., alphanumeric characters, underscores, and hyphens).

  2. Use Secure Methods for Path Handling:
    Utilize secure methods to handle file paths, ensuring that the constructed paths are within the intended directory.
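
The second recommendation could look roughly like the sketch below, which resolves the joined path and then checks that it is still under the base directory. The helper name resolve_inside is hypothetical, and the sketch assumes Python 3.9 or later for Path.is_relative_to.

```python
from pathlib import Path


def resolve_inside(base_dir: Path, name: str) -> Path:
    """Join name onto base_dir and refuse results that leave base_dir."""
    base = base_dir.resolve()
    # resolve() collapses '..' segments and follows symlinks.
    candidate = (base / name).resolve()
    # is_relative_to compares whole path components, not string prefixes.
    if not candidate.is_relative_to(base):
        raise ValueError(f"Path escapes {base}: {name!r}")
    return candidate
```

A call such as resolve_inside(Path('models/pt'), model_name) could then stand in for the bare LOCAL_MODEL_DIRECTORY / model_name join, as a second line of defence behind the character check.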

Example Implementation

Here is an example of how to implement these recommendations:

from __future__ import annotations

from pathlib import Path
from typing import TypedDict
import requests
import re

class ModelInfo(TypedDict):
    model_name: str
    url: str

def sanitize_model_name(model_name: str) -> str:
    """
    Sanitize the model name to prevent path traversal attacks.

    Args:
        model_name (str): The name of the model file.

    Returns:
        str: The sanitized model name.
    """
    # Allow word characters and hyphens, with single dots only between
    # segments (e.g. 'best_yolov8l.pt'); '/', '\\' and '..' are rejected
    if not re.fullmatch(r'[\w\-]+(?:\.[\w\-]+)*', model_name):
        raise ValueError(f"Invalid model name: {model_name}")
    return model_name

def download_model(model_name, url):
    """
    Download a model file if it doesn't already exist.

    Args:
        model_name (str): The name of the model file.
        url (str): The URL of the model file.
    """
    # Sanitize the model name
    model_name = sanitize_model_name(model_name)

    # Define the local directory to store the model files
    LOCAL_MODEL_DIRECTORY = Path('models/pt/')

    # Ensure the local directory exists
    LOCAL_MODEL_DIRECTORY.mkdir(parents=True, exist_ok=True)
    # Build the full local file path
    local_file_path = LOCAL_MODEL_DIRECTORY / model_name

    # Check if the model already exists
    if local_file_path.exists():
        print(f"'{model_name}' exists. Skipping download.")
        return

    # Send an HTTP GET request to fetch the model file
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        # Begin the download and write to the file
        with open(local_file_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"'{model_name}' saved to '{local_file_path}'.")
    else:
        print(f"Error downloading '{model_name}': {response.status_code}")

def main():
    # Define the URLs for the model files
    MODEL_URLS = {
        'best_yolov8l.pt': 'http://changdar-server.mooo.com:28000/models/best_yolov8l.pt',
        'best_yolov8x.pt': 'http://changdar-server.mooo.com:28000/models/best_yolov8x.pt',
    }

    # Iterate over all models and download them if they don't already exist
    for model_name, url in MODEL_URLS.items():
        download_model(model_name, url)

if __name__ == '__main__':
    main()

Conclusion

The primary issue in the src/model_fetcher.py file is the lack of validation and sanitization of the model_name parameter, which can lead to path traversal vulnerabilities. By implementing input sanitization and using secure methods for path handling, this vulnerability can be mitigated.

Changes on src/live_stream_detection.py

Analysis Report for src/live_stream_detection.py

Overview

The src/live_stream_detection.py file is responsible for performing live stream detection and tracking using YOLOv8 with SAHI. It includes functionalities for both local and cloud-based detection. The goal of this review is to identify user inputs used in path expressions and sanitize them to prevent path traversal attacks.

Key Areas of Concern

  1. User Inputs in Path Expressions:
    • The output_folder parameter in the LiveStreamDetector class.
    • The model_key parameter in the LiveStreamDetector class.
    • The stream_url parameter in the run_detection method.
    • Command-line arguments parsed using argparse.

Detailed Analysis

1. output_folder Parameter

The output_folder parameter is used to specify the folder for detected frames. This parameter can potentially be manipulated to perform path traversal attacks.

Current Usage:

def __init__(self, api_url: str = 'http://localhost:5000', model_key: str = 'yolov8l', output_folder: str | None = None, run_local: bool = True):
    ...
    self.output_folder = output_folder
    ...

Recommendation:

  • Validate and sanitize the output_folder parameter to ensure it does not contain any path traversal sequences.
  • Use os.path.abspath to convert the path to an absolute path and check if it is within a designated directory.
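
One detail deserves care in such a containment check. Comparing path strings with startswith treats /srv/app-evil as being inside /srv/app. The sketch below does a component-wise check with os.path.commonpath instead; the function name is_within is an assumption for illustration.

```python
import os.path


def is_within(base_dir: str, target: str) -> bool:
    """True if target resolves to base_dir or to something beneath it."""
    base = os.path.realpath(base_dir)
    resolved = os.path.realpath(target)
    try:
        # commonpath works on whole components, so 'app' and 'app-evil'
        # share only their parent directory.
        return os.path.commonpath([base, resolved]) == base
    except ValueError:
        # Raised on Windows when the two paths are on different drives.
        return False
```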

2. model_key Parameter

The model_key parameter is used to construct the model path. This parameter can also be manipulated to perform path traversal attacks.

Current Usage:

def generate_detections_local(self, frame: cv2.Mat) -> list[list[float]]:
    ...
    model_path = Path('models/pt/') / f"best_{self.model_key}.pt"
    ...

Recommendation:

  • Validate the model_key parameter to ensure it does not contain any path traversal sequences.
  • Use a whitelist of allowed model keys to prevent arbitrary file access.
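
A whitelist of model keys might be sketched like this. The set ALLOWED_MODEL_KEYS and the helper model_path_for are hypothetical; the keys listed are only those that appear elsewhere in this report, and the real set depends on which weights the project ships.

```python
from pathlib import Path

# Hypothetical allowlist of model keys.
ALLOWED_MODEL_KEYS = frozenset({"yolov8n", "yolov8l", "yolov8x"})


def model_path_for(model_key: str, model_dir: Path = Path("models/pt")) -> Path:
    """Map an allowlisted key to its weights file; reject anything else."""
    if model_key not in ALLOWED_MODEL_KEYS:
        raise ValueError(f"Unknown model key: {model_key!r}")
    return model_dir / f"best_{model_key}.pt"
```

Because only known keys are accepted, user input never contributes path separators or dots to the file name.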

3. stream_url Parameter

The stream_url parameter is used to specify the URL of the live stream. While this is not directly related to path traversal, it is important to ensure that the URL is valid and safe.

Current Usage:

def run_detection(self, stream_url: str) -> None:
    ...
    cap = cv2.VideoCapture(stream_url)
    ...

Recommendation:

  • Validate the stream_url parameter to ensure it is a valid URL.
  • Use a library like validators to check the URL format.

4. Command-line Arguments

The command-line arguments parsed using argparse include --url, --api_url, --model_key, --output_folder, and --run_local.

Current Usage:

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Perform live stream detection and tracking using YOLOv8.')
    parser.add_argument('--url', type=str, help='Live stream URL', required=True)
    parser.add_argument('--api_url', type=str, default='http://localhost:5000', help='API URL for detection')
    parser.add_argument('--model_key', type=str, default='yolov8n', help='Model key for detection')
    parser.add_argument('--output_folder', type=str, help='Output folder for detected frames')
    parser.add_argument('--run_local', action='store_true', help='Run detection using local model')
    args = parser.parse_args()

Recommendation:

  • Validate and sanitize the command-line arguments, especially --output_folder and --model_key.
  • Implement validation checks to ensure these parameters do not contain any malicious input.
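
One way to apply these checks at the command-line boundary is to pass a validating callable as the type= argument of argparse, so that bad values are rejected before any class is constructed. This is a sketch under that assumption; model_key_arg and build_parser are illustrative names, not part of the project.

```python
import argparse
import re


def model_key_arg(value: str) -> str:
    """argparse type= callable: accept only word characters and hyphens."""
    if not re.fullmatch(r"[\w\-]+", value):
        raise argparse.ArgumentTypeError(f"invalid model key: {value!r}")
    return value


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Validation sketch")
    parser.add_argument("--model_key", type=model_key_arg, default="yolov8n")
    return parser
```

With this in place, build_parser().parse_args(["--model_key", "../etc/passwd"]) makes argparse print a usage error and exit instead of returning the value.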

Conclusion

The primary areas of concern in this file are the output_folder and model_key parameters, which are used in path expressions and can potentially be exploited for path traversal attacks. Additionally, the stream_url parameter and command-line arguments should be validated to ensure they are safe and valid.

Action Items

  1. Implement validation and sanitization for the output_folder parameter.
  2. Implement validation and sanitization for the model_key parameter.
  3. Validate the stream_url parameter to ensure it is a valid URL.
  4. Validate and sanitize command-line arguments, especially --output_folder and --model_key.

By addressing these concerns, we can mitigate the risk of path traversal attacks and ensure the security of the application.

Disclaimer: This comment was entirely generated using AI. Be aware that the information provided may be incorrect.
