# ACH-ARKIVO-ImportMedia/file_utils.py
#
# Helpers for retrieving and validating media sidecar files (.json, .md5,
# .pdf) stored in S3 alongside conservative media copies.
import os
import logging
from logging.handlers import RotatingFileHandler
import json
from utils import check_video_info, check_audio_info
from error_handler import handle_error
from botocore.exceptions import ClientError
#from config import load_config, aws_config, bucket_name
import config
def retrieve_file_contents(s3, base_name):
    """
    Fetch the .json and .md5 sidecar files for *base_name* from S3.

    Parameters:
    - s3: The boto3 S3 client object.
    - base_name: S3 key prefix; sidecar keys are "<base_name>.json" and
      "<base_name>.md5".

    Returns:
    - str: a JSON-formatted string mapping 'json'/'md5' to their parsed
      (or raw, when not valid JSON) contents. Returns "{}" when retrieval
      fails entirely; missing individual sidecars are simply omitted.

    Raises:
    - TypeError / ValueError: if the retrieved contents cannot be
      serialized back to JSON.
    """
    file_contents = {}
    # Only the bucket name is needed out of the configuration tuple.
    _, _, _, bucket_name, _ = config.load_config()
    try:
        # (s3 extension, result key) pairs for the sidecar files.
        file_extensions = [['json', 'json'], ['md5', 'md5']]
        for ext_pair in file_extensions:
            file_name = f"{base_name}.{ext_pair[0]}"
            try:
                response = s3.get_object(Bucket=bucket_name, Key=file_name)
                file_contents[ext_pair[1]] = response['Body'].read().decode('utf-8')
                logging.info(f"Retrieved {ext_pair[1]} file content for base_name {base_name}.")
            except ClientError as e:
                # S3 reports a missing key as NoSuchKey (or 404/NotFound
                # depending on the call path).
                code = e.response.get('Error', {}).get('Code', '')
                if code in ('NoSuchKey', '404', 'NotFound'):
                    logging.warning(f"{file_name} not found in S3 (code={code}).")
                    # Missing sidecars are non-fatal; keep checking the rest.
                    continue
                logging.error(f"Error retrieving {file_name}: {e}", exc_info=True)
                # Any other S3 error is unexpected; re-raise to the outer handler.
                raise
    except Exception as e:
        logging.error(f'Error retrieving file contents for {base_name}: {e}', exc_info=True)
        # Return an empty JSON object instead of raising so callers never see
        # a traceback from here.  BUGFIX: json.dumps({}) cannot fail, so the
        # original try/except fallback around it was dead code.
        return json.dumps({})
    # Normalize each retrieved payload: parse as JSON where possible,
    # otherwise keep the stripped raw text (e.g. a bare md5 hex digest).
    try:
        cleaned_contents = {}
        for key, value in file_contents.items():
            if isinstance(value, str):
                cleaned_value = value.strip()
                try:
                    cleaned_contents[key] = json.loads(cleaned_value)
                except json.JSONDecodeError:
                    cleaned_contents[key] = cleaned_value
            else:
                cleaned_contents[key] = value
        return json.dumps(cleaned_contents, indent=4)
    except (TypeError, ValueError) as e:
        logging.error(f'Error formatting file contents as JSON: {e}', exc_info=True)
        raise
def check_related_files(s3, file_name_with_path, file, bucket_name):
    """
    Check that the sidecar files required for *file* exist in S3 and are
    non-empty, and report the size of the companion .pdf.

    Parameters:
    - s3: The boto3 S3 client object.
    - file_name_with_path: Key prefix of the media file; sidecar keys are
      built as "<prefix>.<ext>".
    - file: The media file name; its extension decides which sidecars are
      required (.mp4 -> json/md5/pdf, .mp3 -> json/md5, otherwise none).
    - bucket_name: The name of the S3 bucket.  NOTE(review): this argument
      is immediately overwritten by the configured bucket name below —
      confirm callers expect the configured bucket to win.

    Returns:
    - int: size in bytes of the companion .pdf (0 when no pdf is required).

    Raises:
    - ValueError: if a required sidecar file has zero size.
    - FileNotFoundError: if the required .pdf file is missing.
    """
    from s3_utils import check_file_exists_in_s3, get_file_size  # avoid circular import
    import config
    # The configured bucket name takes precedence over the parameter.
    _, _, _, bucket_name, _ = config.load_config()
    ach_pdf_disk_size = 0
    # Required sidecar extensions depend on the media type.
    if file.endswith('.mp4'):
        required_extensions = ['json', 'md5', 'pdf']
    elif file.endswith('.mp3'):
        required_extensions = ['json', 'md5']
    else:
        required_extensions = []
    logging.info(f"Required extensions: {required_extensions}")
    for ext in required_extensions:
        related_file = f"{file_name_with_path}.{ext}"
        logging.info(f"Checking for related file: {related_file}")
        # Existence check.  BUGFIX: the original raised FileNotFoundError and
        # immediately caught it in the same loop body, so a missing json/md5
        # sidecar was only ever logged — that intent is now explicit.  The
        # pdf gets a hard (raising) check further down.
        try:
            if not check_file_exists_in_s3(s3, related_file, bucket_name):
                logging.error(f"Required file {related_file} not found in S3.")
            else:
                logging.info(f"Found related file: {related_file}")
        except Exception as e:
            logging.error(f"Caught an unexpected exception: {e}")
        # Size check: a zero-byte sidecar is fatal.  (The original guarded on
        # ext in ['json','md5','pdf'], which is always true here.)
        try:
            file_size = get_file_size(s3, bucket_name, related_file)
            if file_size == 0:
                error_message = f"File {related_file} has zero size."
                logging.error(error_message)
                logging.error(f"Caught a ValueError file Size is zero: {error_message}")
                raise ValueError(error_message)
            logging.info(f"File {related_file} size: {file_size}")
        except ValueError:
            # Zero-size errors must propagate to the caller.
            raise
        except Exception as e:
            # Lookup failures (e.g. the file was missing above) are tolerated.
            logging.error(f"Caught an unexpected exception: {e}")
        # The pdf must exist: its size is reported back to the caller.
        if ext == 'pdf':
            pdf_file = f"{file_name_with_path}.pdf"
            if check_file_exists_in_s3(s3, pdf_file, bucket_name):
                pdf_file_size = get_file_size(s3, bucket_name, pdf_file)
                ach_pdf_disk_size = pdf_file_size
                logging.info(f"PDF disk size: {ach_pdf_disk_size}")
            else:
                logging.error(f"PDF file {pdf_file} not found.")
                raise FileNotFoundError(f"PDF file {pdf_file} not found.")
    return ach_pdf_disk_size
def extract_and_validate_file_info(file_contents, file, ach_variables):
    """
    Extract mediainfo/ffprobe metadata from the sidecar JSON, normalize the
    media paths stored inside it, and validate them against the media file.

    Parameters:
    - file_contents: dict with a 'json' key (parsed sidecar JSON dict) and
      an optional 'md5' key (checksum string).
    - file: the media file name; '.mp4' triggers video validation, '.mp3'
      audio validation (validation failures are logged, not raised).
    - ach_variables: mutable dict that must contain 'file_fullpath'; its
      'custom_data_in' entry is (re)written by this function.

    Returns:
    - tuple: (ach_custom_data_in dict, disk size as int or None,
      conservative-copy extension such as '.mp4', or None when absent).

    Raises:
    - ValueError: when the JSON contains no recognizable metadata, or the
      media basename matches neither the mediainfo nor the ffprobe path.
    """
    ach_custom_data_in = file_contents
    logging.info(f"Extracted JSON contents: {ach_custom_data_in['json']}")
    # Work out whether the JSON carries mediainfo metadata, ffprobe
    # metadata, or both (already wrapped under explicit top-level keys).
    if 'mediainfo' in ach_custom_data_in['json'] and 'ffprobe' in ach_custom_data_in['json']:
        ach_variables['custom_data_in'] = {
            "mediainfo": ach_custom_data_in['json'].get('mediainfo', {}),
            "ffprobe": ach_custom_data_in['json'].get('ffprobe', {}),
            "filename": ach_custom_data_in['json'].get('filename', ''),
            "md5": ach_custom_data_in.get('md5', '')
        }
        logging.info("mediainfo and ffprobe metadata found in JSON file.")
    # Bare MediaInfo export, recognized by its creatingLibrary stamp.
    # BUGFIX: default was '' (a str), so .get('name','') raised
    # AttributeError whenever the guard key was absent at this point.
    elif 'creatingLibrary' in ach_custom_data_in['json'] and ach_custom_data_in['json'].get('creatingLibrary', {}).get('name', '') == 'MediaInfoLib':
        ach_variables['custom_data_in'] = {
            "mediainfo": ach_custom_data_in['json'],
            "md5": ach_custom_data_in.get('md5', '')
        }
        logging.info("mediainfo metadata found in JSON file.")
    # Bare ffprobe export, recognized by its 'streams' key.
    elif 'streams' in ach_custom_data_in['json']:
        ach_variables['custom_data_in'] = {
            "ffprobe": ach_custom_data_in['json'],
            "md5": ach_custom_data_in.get('md5', '')
        }
        logging.info("ffprobe metadata found in JSON file.")
    else:
        ach_variables['custom_data_in'] = {
            "md5": ach_custom_data_in.get('md5', '')
        }
        logging.error(f"No recognized data found in JSON file.{ach_custom_data_in} - {file_contents}")
        # No usable metadata: abort.
        raise ValueError("No recognized data found in JSON file.")
    logging.info(f"Extracted JSON contents: {ach_variables['custom_data_in']}")
    # Pull FileSize/FileExtension from the mediainfo "General" track.
    ach_disk_size = None
    # BUGFIX: initialize so the return at the bottom cannot hit a NameError
    # when no "General" track (or no mediainfo at all) is present.
    ach_conservative_copy_extension = None
    tracks = ach_variables['custom_data_in'].get('mediainfo', {}).get('media', {}).get('track', [])
    for track in tracks:
        if track.get('@type') == 'General':
            ach_disk_size = track.get('FileSize', None)
            logging.info(f"Disk size from JSON media.track.General: {ach_disk_size}")
            file_extension = track.get('FileExtension', None)
            # BUGFIX: the original computed '.' + None (TypeError) when the
            # FileExtension key was missing.
            if file_extension:
                ach_conservative_copy_extension = '.' + file_extension
            logging.info(f"FileExtension JSON media.track.General: {ach_conservative_copy_extension}")
            break  # Only the General track matters.
    if ach_disk_size is not None:
        ach_disk_size = int(ach_disk_size)
    # MEDIAINFO: truncate the stored media path to "<parent>/<name>".
    if "mediainfo" in ach_variables['custom_data_in'] and "media" in ach_variables['custom_data_in'].get("mediainfo"):
        media_ref = ach_variables['custom_data_in'].get('mediainfo', {}).get("media", {}).get("@ref", "")
        # Normalize Windows-style backslashes before splitting.
        media_ref = media_ref.replace("\\", "/")
        logging.info(f"Media ref medianfo: {media_ref}")
        # BUGFIX: joining the last-two slice avoids the IndexError the
        # original [-2] indexing raised on slash-less paths.
        file_name = '/'.join(media_ref.split('/')[-2:])
        logging.info(f"Media file name (copia conservativa): {file_name}")
        ach_variables['custom_data_in']["mediainfo"]["media"]["@ref"] = file_name
        logging.info(f"Updated the truncated file_name at mediainfo.media.@ref {ach_variables['custom_data_in']['mediainfo']['media']['@ref']}")
    else:
        logging.warning("mediainfo.media.@ref not found in JSON file.")
    # FFPROBE: same truncation for format.filename.
    if "ffprobe" in ach_variables['custom_data_in'] and "format" in ach_variables['custom_data_in'].get("ffprobe"):
        media_ref = ach_variables['custom_data_in'].get('ffprobe', {}).get("format", {}).get("filename", "")
        media_ref = media_ref.replace("\\", "/")
        logging.info(f"Media ref medianfo: {media_ref}")
        file_name = '/'.join(media_ref.split('/')[-2:])
        logging.info(f"Media file name (copia conservativa): {file_name}")
        ach_variables['custom_data_in']["ffprobe"]["format"]["filename"] = file_name
        # BUGFIX: the original logged custom_data_in['mediainfo']['media']['@ref']
        # here, which raised KeyError for ffprobe-only metadata.
        logging.info(f"Updated the truncated file_name at ffprobe.format.filename {ach_variables['custom_data_in']['ffprobe']['format']['filename']}")
    else:
        logging.warning("ffprobe.format.filename not found in JSON file.")
        # BUGFIX: dropped a leftover log line here that referenced a
        # possibly-unbound file_name variable (NameError risk).
    logging.info(f"JSON contents: {ach_variables['custom_data_in']}")
    # Defensive: accept a JSON string, although the code above always stores
    # a dict.
    if isinstance(ach_variables['custom_data_in'], str):
        ach_custom_data_in = json.loads(ach_variables['custom_data_in'])
    else:
        ach_custom_data_in = ach_variables['custom_data_in']
    # Cross-check the media basename against the names recorded in the JSON.
    json_ref_mediainfo_path = ach_custom_data_in.get('mediainfo', {}).get("media", {}).get("@ref", "")
    json_ref_ffprobe_path = ach_custom_data_in.get('ffprobe', {}).get("format", {}).get("filename", "")
    logging.info(f"JSON file names: mediainfo: '{json_ref_mediainfo_path}', ffprobe: '{json_ref_ffprobe_path}', ach_file_fullpath: '{ach_variables['file_fullpath']}'")
    basename_fullpath = os.path.splitext(os.path.basename(ach_variables['file_fullpath']))[0]
    # Transcoded copies carry an _H264 suffix that the sidecars do not.
    basename_fullpath = basename_fullpath.replace('_H264', '')
    basename_mediainfo = os.path.splitext(os.path.basename(json_ref_mediainfo_path))[0]
    basename_ffprobe = os.path.splitext(os.path.basename(json_ref_ffprobe_path))[0]
    if basename_fullpath != basename_mediainfo:
        logging.warning(f"ach_file_fullpath '{basename_fullpath}' does not match JSON mediainfo file name '{basename_mediainfo}'.")
    else:
        logging.info(f"ach_file_fullpath '{basename_fullpath}' matches JSON mediainfo file name '{basename_mediainfo}'.")
    if basename_fullpath != basename_ffprobe:
        logging.warning(f"ach_file_fullpath '{basename_fullpath}' does not match JSON ffprobe file name '{basename_ffprobe}'.")
    else:
        logging.info(f"ach_file_fullpath '{basename_fullpath}' matches JSON ffprobe file name '{basename_ffprobe}'.")
    # Matching at least one of the two recorded names is mandatory.
    if basename_fullpath != basename_mediainfo and basename_fullpath != basename_ffprobe:
        logging.error(f"ach_file_fullpath '{basename_fullpath}' does not match either JSON file name '{basename_mediainfo}' or '{basename_ffprobe}'.")
        raise ValueError(f"ach_file_fullpath '{basename_fullpath}' does not match either JSON file name '{basename_mediainfo}' or '{basename_ffprobe}'.")
    # Validate the technical metadata for the media type; failures are
    # logged but deliberately non-fatal (handle_error calls are disabled).
    try:
        if file.endswith('.mp4'):
            result, message = check_video_info(ach_custom_data_in.get('mediainfo', {}))
            logging.info(f"Validation result for {file}: {message}")
        elif file.endswith('.mp3'):
            result, message = check_audio_info(ach_custom_data_in.get('mediainfo', {}))
            logging.info(f"Validation result for {file}: {message}")
        else:
            raise ValueError(f"Unsupported file type: {file}")
        if not result:
            error_message = f"Validation failed for {file}: {message}"
            logging.error(error_message)
            # handle_error(ValueError(error_message)) # Create and handle the exception
    except ValueError as e:
        logging.error(f"Caught a ValueError: {e}")
        # handle_error(e) # Pass the ValueError to handle_error
    except Exception as e:
        logging.error(f"Caught an unexpected exception: {e}")
        # handle_error(e) # Pass unexpected exceptions to handle_error
    # 'filename' is transient bookkeeping; drop it before returning.
    ach_custom_data_in.pop('filename', None)
    return ach_custom_data_in, ach_disk_size, ach_conservative_copy_extension
def is_file_empty(file_path):
    """Return True only when *file_path* exists and holds zero bytes."""
    if not os.path.exists(file_path):
        return False
    return os.path.getsize(file_path) == 0
# NOTE: read_file is currently unused within this module; kept for callers elsewhere.
def read_file(file_path):
    """Read and return the entire text content of *file_path*.

    Logs and re-raises FileNotFoundError / IOError from the open or read.
    """
    try:
        with open(file_path, 'r') as handle:
            contents = handle.read()
    except FileNotFoundError as exc:
        logging.error(f"File not found: {exc}")
        raise
    except IOError as exc:
        logging.error(f"IO error: {exc}")
        raise
    return contents
def write_file(file_path, content):
    """Write *content* to *file_path* (truncating), logging and re-raising IO errors."""
    try:
        with open(file_path, 'w') as handle:
            handle.write(content)
    except IOError as exc:
        logging.error(f"IO error: {exc}")
        raise