Fix code scanning alert - Uncontrolled data used in path expression
Opened this issue · 1 comments
Potential solution
To solve the bug related to "Uncontrolled data used in path expression," we need to ensure that all user inputs used in path expressions are properly validated and sanitized. This will prevent path traversal vulnerabilities and other security issues. The plan involves:
- Sanitizing and validating user inputs: Ensure that inputs like URLs, file names, and directory paths do not contain malicious content.
- Using secure methods for path handling: Utilize libraries and functions that help in securely constructing and validating paths.
What is causing this bug?
The bug is caused by the lack of validation and sanitization of user inputs used in path expressions. Specifically:
- In
src/stream_capture.py
, the stream URL is taken directly from user input without validation. - In
src/model_fetcher.py
, themodel_name
parameter is used directly to construct file paths without validation. - In
src/live_stream_detection.py
, theoutput_folder
andmodel_key
parameters are used in path expressions without proper validation.
Code
Here are the implementation details and code snippets to fix the issues:
src/stream_capture.py
Changes:
- Sanitize and validate the stream URL.
import re
from urllib.parse import urlparse
import argparse
import gc
import cv2
import time
class StreamCapture:
def __init__(self, stream_url: str, capture_interval: int = 15):
self.stream_url = stream_url
self.cap: cv2.VideoCapture | None = None
self.capture_interval = capture_interval
def initialise_stream(self, stream_url: str) -> None:
self.cap = cv2.VideoCapture(stream_url)
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
if not self.cap.isOpened():
time.sleep(5)
self.cap.open(stream_url)
def execute_capture(self):
# Implementation of frame capture logic
pass
def is_valid_url(url: str) -> bool:
parsed_url = urlparse(url)
return all([parsed_url.scheme, parsed_url.netloc])
def sanitize_url(url: str) -> str:
url = re.sub(r'[^\w\-/:.?&=]', '', url)
return url
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Capture video stream frames.')
parser.add_argument('--url', type=str, help='Live stream URL', required=True)
args = parser.parse_args()
sanitized_url = sanitize_url(args.url)
if not is_valid_url(sanitized_url):
raise ValueError("Invalid URL provided")
stream_capture = StreamCapture(sanitized_url)
for frame, timestamp in stream_capture.execute_capture():
print(f"Frame at {timestamp} displayed")
del frame
gc.collect()
src/model_fetcher.py
Changes:
- Sanitize the
model_name
parameter to prevent path traversal.
from pathlib import Path
import requests
import re
class ModelInfo(TypedDict):
model_name: str
url: str
def sanitize_model_name(model_name: str) -> str:
if not re.match(r'^[\w\-]+$', model_name):
raise ValueError(f"Invalid model name: {model_name}")
return model_name
def download_model(model_name, url):
model_name = sanitize_model_name(model_name)
LOCAL_MODEL_DIRECTORY = Path('models/pt/')
LOCAL_MODEL_DIRECTORY.mkdir(parents=True, exist_ok=True)
local_file_path = LOCAL_MODEL_DIRECTORY / model_name
if local_file_path.exists():
print(f"'{model_name}' exists. Skipping download.")
return
response = requests.get(url, stream=True)
if response.status_code == 200:
with open(local_file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"'{model_name}' saved to '{local_file_path}'.")
else:
print(f"Error downloading '{model_name}': {response.status_code}")
def main():
MODEL_URLS = {
'best_yolov8l.pt': 'http://changdar-server.mooo.com:28000/models/best_yolov8l.pt',
'best_yolov8x.pt': 'http://changdar-server.mooo.com:28000/models/best_yolov8x.pt',
}
for model_name, url in MODEL_URLS.items():
download_model(model_name, url)
if __name__ == '__main__':
main()
src/live_stream_detection.py
Changes:
- Validate and sanitize the
output_folder
andmodel_key
parameters. - Validate the
stream_url
parameter.
import os
import argparse
import cv2
from pathlib import Path
import re
from urllib.parse import urlparse
class LiveStreamDetector:
def __init__(self, api_url: str = 'http://localhost:5000', model_key: str = 'yolov8l', output_folder: str | None = None, run_local: bool = True):
self.api_url = api_url
self.model_key = self.sanitize_model_key(model_key)
self.output_folder = self.sanitize_output_folder(output_folder)
self.run_local = run_local
def sanitize_output_folder(self, output_folder: str) -> str:
if output_folder:
output_folder = os.path.abspath(output_folder)
if not output_folder.startswith(os.getcwd()):
raise ValueError("Invalid output folder path")
return output_folder
def sanitize_model_key(self, model_key: str) -> str:
if not re.match(r'^[\w\-]+$', model_key):
raise ValueError(f"Invalid model key: {model_key}")
return model_key
def generate_detections_local(self, frame: cv2.Mat) -> list[list[float]]:
model_path = Path('models/pt/') / f"best_{self.model_key}.pt"
# Implementation of detection logic
pass
def run_detection(self, stream_url: str) -> None:
if not self.is_valid_url(stream_url):
raise ValueError("Invalid stream URL")
cap = cv2.VideoCapture(stream_url)
# Implementation of detection logic
pass
def is_valid_url(self, url: str) -> bool:
parsed_url = urlparse(url)
return all([parsed_url.scheme, parsed_url.netloc])
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Perform live stream detection and tracking using YOLOv8.')
parser.add_argument('--url', type=str, help='Live stream URL', required=True)
parser.add_argument('--api_url', type=str, default='http://localhost:5000', help='API URL for detection')
parser.add_argument('--model_key', type=str, default='yolov8n', help='Model key for detection')
parser.add_argument('--output_folder', type=str, help='Output folder for detected frames')
parser.add_argument('--run_local', action='store_true', help='Run detection using local model')
args = parser.parse_args()
detector = LiveStreamDetector(api_url=args.api_url, model_key=args.model_key, output_folder=args.output_folder, run_local=args.run_local)
detector.run_detection(args.url)
How to replicate the bug
-
For
src/stream_capture.py
:- Run the script with a malicious URL input that contains path traversal sequences.
- Example:
python src/stream_capture.py --url "http://example.com/../../etc/passwd"
-
For
src/model_fetcher.py
:- Run the script with a malicious model name that contains path traversal sequences.
- Example: Modify
MODEL_URLS
to include a key like../etc/passwd
.
-
For
src/live_stream_detection.py
:- Run the script with malicious inputs for
--output_folder
and--model_key
. - Example:
python src/live_stream_detection.py --url "http://example.com" --output_folder "../../etc" --model_key "../etc/passwd"
- Run the script with malicious inputs for
By implementing the recommended changes, these vulnerabilities will be mitigated, ensuring that user inputs are properly validated and sanitized.
Click here to create a Pull Request with the proposed solution
Files used for this task:
Changes on src/stream_capture.py
Analysis Report
Overview
The file src/stream_capture.py
is responsible for capturing frames from a video stream. It includes functionalities to initialize the stream, capture frames at specified intervals, check internet speed, select stream quality based on speed, and handle failures in capturing frames.
Potential Security Issue
The primary concern mentioned in the issue is the "Uncontrolled data used in path expression," which can lead to path traversal vulnerabilities. In this file, the user input is primarily the stream URL provided via command-line arguments.
Key Areas to Review
- Stream URL Handling:
- The stream URL is taken directly from user input without any validation or sanitization.
- This URL is used in methods like
initialise_stream
andselect_quality_based_on_speed
.
Detailed Analysis
Stream URL Handling
-
Initialization:
def __init__(self, stream_url: str, capture_interval: int = 15): self.stream_url = stream_url self.cap: cv2.VideoCapture | None = None self.capture_interval = capture_interval
-
Stream Initialization:
def initialise_stream(self, stream_url: str) -> None: self.cap = cv2.VideoCapture(stream_url) self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) if not self.cap.isOpened(): time.sleep(5) self.cap.open(stream_url)
-
Command-line Argument Parsing:
if __name__ == '__main__': parser = argparse.ArgumentParser( description='Capture video stream frames.', ) parser.add_argument( '--url', type=str, help='Live stream URL', required=True, ) args = parser.parse_args() stream_capture = StreamCapture(args.url) for frame, timestamp in stream_capture.execute_capture(): print(f"Frame at {timestamp} displayed") del frame gc.collect()
Recommendations
-
Sanitize and Validate Stream URL:
- Ensure that the stream URL is a valid URL and does not contain any malicious content.
- Use libraries like
urllib.parse
to parse and validate the URL.
-
Secure Methods for Handling URLs:
- Avoid directly using user inputs in critical operations without validation.
- Implement a whitelist of allowed URL patterns if possible.
Example Fix
Here is an example of how you can sanitize and validate the stream URL:
import re
from urllib.parse import urlparse
def is_valid_url(url: str) -> bool:
# Basic URL validation
parsed_url = urlparse(url)
return all([parsed_url.scheme, parsed_url.netloc])
def sanitize_url(url: str) -> str:
# Remove any unwanted characters or patterns
url = re.sub(r'[^\w\-/:.?&=]', '', url)
return url
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Capture video stream frames.',
)
parser.add_argument(
'--url',
type=str,
help='Live stream URL',
required=True,
)
args = parser.parse_args()
sanitized_url = sanitize_url(args.url)
if not is_valid_url(sanitized_url):
raise ValueError("Invalid URL provided")
stream_capture = StreamCapture(sanitized_url)
for frame, timestamp in stream_capture.execute_capture():
print(f"Frame at {timestamp} displayed")
del frame
gc.collect()
Conclusion
The primary issue in this file is the lack of validation and sanitization of the stream URL provided by the user. By implementing the recommended changes, you can mitigate the risk of path traversal and other related vulnerabilities.
Changes on src/model_fetcher.py
Analysis Report for src/model_fetcher.py
Overview
The src/model_fetcher.py
file is responsible for downloading model files from specified URLs and saving them locally. The main function iterates over a dictionary of model names and URLs, and the download_model
function handles the actual downloading and saving of the files.
Potential Security Issue
The primary concern in this file is the potential for path traversal vulnerabilities due to uncontrolled data being used in path expressions. Specifically, the model_name
parameter in the download_model
function is used directly to construct file paths without any validation or sanitization.
Detailed Analysis
-
Path Construction:
LOCAL_MODEL_DIRECTORY = Path('models/pt/') local_file_path = LOCAL_MODEL_DIRECTORY / model_name
The
model_name
is directly appended to theLOCAL_MODEL_DIRECTORY
to form thelocal_file_path
. Ifmodel_name
contains path traversal characters (e.g.,../
), it could lead to writing files outside the intended directory. -
File Existence Check:
if local_file_path.exists(): print(f"'{model_name}' exists. Skipping download.") return
This check ensures that the file is not downloaded if it already exists, but it does not mitigate the risk of path traversal.
-
File Writing:
with open(local_file_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk)
Writing to the file without validating the
model_name
can lead to overwriting critical files if path traversal is exploited.
Recommendations
To mitigate the risk of path traversal vulnerabilities, the following steps should be taken:
-
Sanitize
model_name
:
Ensure thatmodel_name
does not contain any path traversal characters. This can be done by validating the input and allowing only safe characters (e.g., alphanumeric characters, underscores, and hyphens). -
Use Secure Methods for Path Handling:
Utilize secure methods to handle file paths, ensuring that the constructed paths are within the intended directory.
Example Implementation
Here is an example of how to implement these recommendations:
from __future__ import annotations
from pathlib import Path
from typing import TypedDict
import requests
import re
class ModelInfo(TypedDict):
model_name: str
url: str
def sanitize_model_name(model_name: str) -> str:
"""
Sanitize the model name to prevent path traversal attacks.
Args:
model_name (str): The name of the model file.
Returns:
str: The sanitized model name.
"""
# Allow only alphanumeric characters, underscores, and hyphens
if not re.match(r'^[\w\-]+$', model_name):
raise ValueError(f"Invalid model name: {model_name}")
return model_name
def download_model(model_name, url):
"""
Download a model file if it doesn't already exist.
Args:
model_name (str): The name of the model file.
url (str): The URL of the model file.
"""
# Sanitize the model name
model_name = sanitize_model_name(model_name)
# Define the local directory to store the model files
LOCAL_MODEL_DIRECTORY = Path('models/pt/')
# Ensure the local directory exists
LOCAL_MODEL_DIRECTORY.mkdir(parents=True, exist_ok=True)
# Build the full local file path
local_file_path = LOCAL_MODEL_DIRECTORY / model_name
# Check if the model already exists
if local_file_path.exists():
print(f"'{model_name}' exists. Skipping download.")
return
# Send an HTTP GET request to fetch the model file
response = requests.get(url, stream=True)
if response.status_code == 200:
# Begin the download and write to the file
with open(local_file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"'{model_name}' saved to '{local_file_path}'.")
else:
print(f"Error downloading '{model_name}': {response.status_code}")
def main():
# Define the URLs for the model files
MODEL_URLS = {
'best_yolov8l.pt': 'http://changdar-server.mooo.com:28000/models/best_yolov8l.pt',
'best_yolov8x.pt': 'http://changdar-server.mooo.com:28000/models/best_yolov8x.pt',
}
# Iterate over all models and download them if they don't already exist
for model_name, url in MODEL_URLS.items():
download_model(model_name, url)
if __name__ == '__main__':
main()
Conclusion
The primary issue in the src/model_fetcher.py
file is the lack of validation and sanitization of the model_name
parameter, which can lead to path traversal vulnerabilities. By implementing input sanitization and using secure methods for path handling, this vulnerability can be mitigated.
Changes on src/live_stream_detection.py
Analysis Report for src/live_stream_detection.py
Overview
The src/live_stream_detection.py
file is responsible for performing live stream detection and tracking using YOLOv8 with SAHI. It includes functionalities for both local and cloud-based detection. The primary concern is to review and sanitize any user inputs used in path expressions to prevent path traversal attacks.
Key Areas of Concern
- User Inputs in Path Expressions:
- The
output_folder
parameter in theLiveStreamDetector
class. - The
model_key
parameter in theLiveStreamDetector
class. - The
stream_url
parameter in therun_detection
method. - Command-line arguments parsed using
argparse
.
- The
Detailed Analysis
1. output_folder
Parameter
The output_folder
parameter is used to specify the folder for detected frames. This parameter can potentially be manipulated to perform path traversal attacks.
Current Usage:
def __init__(self, api_url: str = 'http://localhost:5000', model_key: str = 'yolov8l', output_folder: str | None = None, run_local: bool = True):
...
self.output_folder = output_folder
...
Recommendation:
- Validate and sanitize the
output_folder
parameter to ensure it does not contain any path traversal sequences. - Use
os.path.abspath
to convert the path to an absolute path and check if it is within a designated directory.
2. model_key
Parameter
The model_key
parameter is used to construct the model path. This parameter can also be manipulated to perform path traversal attacks.
Current Usage:
def generate_detections_local(self, frame: cv2.Mat) -> list[list[float]]:
...
model_path = Path('models/pt/') / f"best_{self.model_key}.pt"
...
Recommendation:
- Validate the
model_key
parameter to ensure it does not contain any path traversal sequences. - Use a whitelist of allowed model keys to prevent arbitrary file access.
3. stream_url
Parameter
The stream_url
parameter is used to specify the URL of the live stream. While this is not directly related to path traversal, it is important to ensure that the URL is valid and safe.
Current Usage:
def run_detection(self, stream_url: str) -> None:
...
cap = cv2.VideoCapture(stream_url)
...
Recommendation:
- Validate the
stream_url
parameter to ensure it is a valid URL. - Use a library like
validators
to check the URL format.
4. Command-line Arguments
The command-line arguments parsed using argparse
include --url
, --api_url
, --model_key
, --output_folder
, and --run_local
.
Current Usage:
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Perform live stream detection and tracking using YOLOv8.')
parser.add_argument('--url', type=str, help='Live stream URL', required=True)
parser.add_argument('--api_url', type=str, default='http://localhost:5000', help='API URL for detection')
parser.add_argument('--model_key', type=str, default='yolov8n', help='Model key for detection')
parser.add_argument('--output_folder', type=str, help='Output folder for detected frames')
parser.add_argument('--run_local', action='store_true', help='Run detection using local model')
args = parser.parse_args()
Recommendation:
- Validate and sanitize the command-line arguments, especially
--output_folder
and--model_key
. - Implement validation checks to ensure these parameters do not contain any malicious input.
Conclusion
The primary areas of concern in this file are the output_folder
and model_key
parameters, which are used in path expressions and can potentially be exploited for path traversal attacks. Additionally, the stream_url
parameter and command-line arguments should be validated to ensure they are safe and valid.
Action Items
- Implement validation and sanitization for the
output_folder
parameter. - Implement validation and sanitization for the
model_key
parameter. - Validate the
stream_url
parameter to ensure it is a valid URL. - Validate and sanitize command-line arguments, especially
--output_folder
and--model_key
.
By addressing these concerns, we can mitigate the risk of path traversal attacks and ensure the security of the application.
Disclaimer: This comment was entirely generated using AI. Be aware that the information provided may be incorrect.
Current plan usage: 32.02%
Have feedback or need help?
Discord
Documentation
support@codeautopilot.com