# ACH-ARKIVO-ImportMedia/s3_utils.py
# S3 helper utilities for the ARKIVO media import pipeline.
# (file stats from extraction: 367 lines, 19 KiB, Python)
import boto3 # for S3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError, ClientError # for exceptions
import logging # for logging
import json # for json.loads
import os # for os.path
import psycopg2 # for PostgreSQL
# Import custom modules
from file_utils import retrieve_file_contents, check_related_files, extract_and_validate_file_info # for file operations
from email_utils import handle_error # for error handling (deprecated?)
from db_utils import get_db_connection, check_inventory_in_db, check_objkey_in_file_db, add_file_record_and_relationship, retrieve_digital_file_names # for database operations
import config
# Parse S3 media files, validate their related sidecar files, and insert records into PostgreSQL
def parse_s3_files(s3, s3_files, ach_variables, excluded_folders=None):
    """
    Parse the S3 media files and insert validated records into PostgreSQL.

    For every .mp4/.mp3 object key this function:
      1) skips keys already recorded in the file table,
      2) derives the inventory code and the related object keys
         (.pdf sidecar, conservative copy .mov/.wav),
      3) validates sidecar files (.json, .md5, .pdf) and retrieves sizes,
      4) cross-checks the inventory code in the database and inserts the
         file record plus its relationship to the support record.

    Each file is processed inside a SQL SAVEPOINT so that a failure rolls
    back only that file's changes, not the whole batch.

    Args:
        s3: The S3 client for accessing S3 services.
        s3_files (list): The list of S3 object keys to be processed.
        ach_variables (dict): Mutable working context, enriched in place
            with per-file data (inventory_code, objectKeys, sizes, ...).
        excluded_folders (list | None): Key prefixes to exclude from
            processing. Defaults to no exclusions.

    Returns:
        tuple: (uploaded_files_count, warning_files_count, error_files_count)

    Raises:
        ValueError: If the DB configuration is missing, or when
            ACH_SAFE_RUN=true and a warning condition is encountered.
        Exception: Any other unexpected error (logged and re-raised).
    """
    # BUGFIX: a mutable default ([]) is shared across calls; use a None sentinel.
    if excluded_folders is None:
        excluded_folders = []
    # Load the configuration from the .env file
    _, db_config, _, bucket_name, _ = config.load_config()
    # Log ach_variables
    logging.info(f"ach_variables: {ach_variables}")
    # ---------------------------------------------------------------------
    # PHASE 3: PARSE & INSERT INTO DATABASE (DETAILS)
    #
    # 3.1) Filter out excluded prefixes and keep only files we care about.
    # 3.2) Validate each media file alongside its related sidecars (.json, .md5, .pdf).
    # 3.3) Cross-check the inventory code in the database and insert new records.
    # ---------------------------------------------------------------------
    logging.info("PHASE 3: Parse & insert - starting detailed file processing")
    conn = None
    cur = None
    try:
        # Ensure db_config is not None
        if db_config is None:
            raise ValueError("Database configuration is not loaded")
        conn = psycopg2.connect(**db_config)
        cur = conn.cursor()
        # Read the safety switch once; when "true" (the default) any warning
        # condition aborts the whole phase via ValueError.
        safe_run = os.getenv('ACH_SAFE_RUN', 'true').lower() == 'true'
        # Exclude files that start with any prefix in excluded_folders
        filtered_files = [
            file for file in s3_files
            if not any(file.startswith(prefix) for prefix in excluded_folders)
        ]
        logging.info(f"Array Length of filtered files: {len(filtered_files)}")
        # Per-batch counters returned to the caller
        error_files_count = 0
        warning_files_count = 0
        uploaded_files_count = 0
        total_files = len(filtered_files)
        for idx, file in enumerate(filtered_files, start=1):
            # Display progress to console only (not written to log files)
            print(f"--------------\n--- file {idx} of {total_files} ---\n--------------", flush=True)
            # Use a savepoint per file to allow rollback on individual failures
            # without aborting the full batch.
            cur.execute("SAVEPOINT file_save")
            try:
                if file.endswith(('.mp4', '.mp3')):  # only media keys are processed
                    logging.info("Processing file: %s in the bucket: %s", file, bucket_name)
                    # Skip files that already exist in the database
                    result = check_objkey_in_file_db(cur, file)
                    if result:
                        warning_files_count += 1
                        if safe_run:
                            logging.error("ACH_SAFE_RUN=true: aborting Phase 3 due to warnings (file already exists in DB): %s", file)
                            raise ValueError("ACH_SAFE_RUN=true: aborting due to warnings in Phase 3")
                        # Rollback to savepoint to undo any partial changes for this file
                        cur.execute("ROLLBACK TO SAVEPOINT file_save")
                        continue
                    ach_variables['file_fullpath'] = file  # the S3 object key
                    # Inventory code = first 12 chars of the base file name
                    ach_variables['inventory_code'] = os.path.splitext(os.path.basename(file))[0][:12]
                    logging.info(f"ach_variables['inventory_code'] {ach_variables['inventory_code']}: {file}")
                    # Derive the related object keys; the .pdf sidecar shares the
                    # stem with the media file, minus any '_H264' suffix.
                    ach_variables['objectKeys']['media'] = file
                    ach_variables['objectKeys']['pdf'] = f"{os.path.splitext(file)[0]}.pdf".replace('_H264', '')
                    # Conservative (archival) copy: .mov for video, .wav for audio.
                    # The endswith check above guarantees one of the two branches,
                    # so the old unreachable "unsupported type" else was removed.
                    if file.endswith('.mp4'):
                        ach_variables['objectKeys']['conservative_copy'] = f"{os.path.splitext(file)[0]}.mov"  # remove _H264 is done later
                    else:
                        ach_variables['objectKeys']['conservative_copy'] = f"{os.path.splitext(file)[0]}.wav"
                    # Extract the file extension
                    file_extension = os.path.splitext(file)[1]
                    ach_variables['extension'] = file_extension
                    logging.info(f"the file File extension: {file_extension}")
                    # File name with the directory part kept, extension removed
                    file_name_with_path = os.path.splitext(file)[0]
                    logging.info(f"File name with path: {file_name_with_path}")
                    # Base name with the path removed
                    base_name = os.path.basename(file_name_with_path)
                    logging.info(f"Base name: {base_name}")
                    # Apply _H264 removal only for .mp4 files
                    if file.endswith('.mp4'):
                        logging.info(f"File is an mp4 file: {file}. remove _H264")
                        base_name = base_name.replace('_H264', '')
                        file_name_with_path = file_name_with_path.replace('_H264', '')
                        logging.info(f"Modified base name for mp4: {base_name}")
                        logging.info(f"Modified file name with path for mp4: {file_name_with_path}")
                    try:
                        # Retrieve and log the media file size (None means HEAD failed)
                        file_size = get_file_size(s3, bucket_name, file)
                        if file_size is not None:
                            ach_variables['media_disk_size'] = file_size
                            logging.info(f"The media file disk size is: {ach_variables['media_disk_size']}")
                        else:
                            logging.warning("Could not retrieve file size for %s.", file)
                            warning_files_count += 1
                            if safe_run:
                                logging.error("ACH_SAFE_RUN=true: aborting Phase 3 due to warnings (missing file size): %s", file)
                                raise ValueError("ACH_SAFE_RUN=true: aborting due to warnings in Phase 3")
                            continue  # Skip to the next file in the loop
                        logging.info("Start Validating files for %s...", base_name)
                        # Check that the related sidecar files exist and get the .pdf size
                        try:
                            ach_variables['pdf_disk_size'] = check_related_files(s3, file_name_with_path, file, bucket_name)
                            logging.info(f"PDF disk size: {ach_variables['pdf_disk_size']}")
                        except FileNotFoundError as e:
                            logging.error(f"File not found error: {e}")
                            error_files_count += 1
                            continue  # Move on to the next file in the loop
                        except ValueError as e:
                            logging.error(f"Value error: {e} probabli filesize zero")
                            error_files_count += 1
                            continue  # Move on to the next file in the loop
                        except PermissionError as e:
                            logging.error(f"Permission error: {e}")
                            error_files_count += 1
                            continue  # Move on to the next file in the loop
                        except Exception as e:
                            # BUGFIX: previously this handler fell through and kept
                            # processing a file whose sidecar check failed; count it
                            # as an error and skip, like the handlers above.
                            logging.error(f"An error occurred: {e}")
                            error_files_count += 1
                            continue  # Move on to the next file in the loop
                        # Retrieve the contents of the related .md5/.json sidecars
                        try:
                            logging.info(f"Retrieving file contents for {file_name_with_path}...")
                            file_contents = retrieve_file_contents(s3, f"{file_name_with_path}")
                        except Exception as e:
                            logging.error(f"Error retrieving file contents for {file_name_with_path}: {e}")
                            error_files_count += 1
                            continue  # Move on to the next file in the loop
                        if file_contents is None:
                            logging.error(f"Error retrieving file contents for {file}.")
                            error_files_count += 1
                            continue  # Move on to the next file in the loop
                        # Ensure file_contents is a dictionary
                        if isinstance(file_contents, str):
                            file_contents = json.loads(file_contents)
                        # Extract and validate file information from the sidecar data
                        ach_variables['custom_data_in'], ach_variables['disk_size'], ach_variables['conservative_copy_extension'] = extract_and_validate_file_info(file_contents, file, ach_variables)
                        logging.info(f"Custom data extracted: {ach_variables['custom_data_in']}")
                        logging.info(f"Disk size extracted: {ach_variables['disk_size']}")
                        logging.info(f"Conservative copy extension extracted: {ach_variables['conservative_copy_extension']}")
                        logging.info(f"File {file} file validation completed")
                    except Exception as e:
                        # BUGFIX: let the deliberate ACH_SAFE_RUN abort propagate
                        # instead of being swallowed and turned into a 'continue'.
                        if isinstance(e, ValueError) and str(e).startswith("ACH_SAFE_RUN"):
                            raise
                        logging.error(f"Error processing file {file}: {e}")
                        error_files_count += 1
                        continue  # Move on to the next file in the loop
                    # Check if the inventory code exists in the database
                    logging.info(f"Checking database for {base_name}...")
                    try:
                        result, truncated_base_name = check_inventory_in_db(s3, cur, base_name)
                        logging.info(f"base name {base_name}, truncated_base_name: {truncated_base_name}")
                        if result:
                            logging.info(f"Inventory code {base_name} found in the database.")
                            # True means no file record exists yet for this support
                            if retrieve_digital_file_names(s3, cur, base_name, ach_variables['objectKeys']['media']) == True:
                                # Add the file record and its relationship to the support record
                                add_file_record_and_relationship(s3, cur, base_name, ach_variables)
                            else:
                                logging.warning(f"File record already exists for {base_name}.")
                                warning_files_count += 1
                                continue
                        else:
                            logging.error(f"Inventory code {base_name} not found in the database.")
                            error_files_count += 1
                            continue
                    except ValueError as e:
                        logging.error(f"An error occurred: {e}")
                        # BUGFIX: undo any partial inserts before moving on so they
                        # cannot be committed together with a later successful file.
                        cur.execute("ROLLBACK TO SAVEPOINT file_save")
                        error_files_count += 1
                        continue
                    # Commit only after the whole file was processed successfully
                    logging.info(f"commit to databse {base_name}...")
                    conn.commit()
                    uploaded_files_count += 1
            except Exception as e:
                # BUGFIX: re-raise the deliberate safe-run abort so Phase 3
                # actually stops instead of logging and continuing.
                if isinstance(e, ValueError) and str(e).startswith("ACH_SAFE_RUN"):
                    raise
                # Roll back the changes done for this file only and continue
                logging.error(f"Error processing {file}: {e}. Rolling back this file's changes.")
                try:
                    cur.execute("ROLLBACK TO SAVEPOINT file_save")
                except Exception as rollback_err:
                    logging.error(f"Failed to rollback savepoint for {file}: {rollback_err}")
                error_files_count += 1
                continue
            finally:
                # Release the savepoint so it doesn't linger in the session
                try:
                    cur.execute("RELEASE SAVEPOINT file_save")
                except Exception:
                    # BUGFIX: after a successful commit the savepoint no longer
                    # exists, so RELEASE fails and leaves the fresh transaction
                    # aborted (which made the next SAVEPOINT fail). Roll back to
                    # clear that state before the next file.
                    conn.rollback()
    except ValueError as e:
        # Validation errors (and safe-run aborts) propagate to the caller
        logging.error(f"Validation error: {e}")
        raise
    except Exception as e:
        # Any other unexpected error: log and propagate
        logging.error(f"Unexpected error: {e}")
        raise
    finally:
        # BUGFIX: always release DB resources, even when an exception escapes
        # (previously cur/conn leaked on any raised error).
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()
    # Return the per-batch counters
    return uploaded_files_count, warning_files_count, error_files_count
# Function to create an S3 client
def create_s3_client(aws_config):
    """
    Build a boto3 S3 client from the given AWS configuration.

    Args:
        aws_config (dict): Must contain 'endpoint_url', 'aws_access_key_id',
            'aws_secret_access_key' and 'region_name'.

    Returns:
        A boto3 S3 client configured for SigV4 signing and path-style
        addressing (required by many S3-compatible endpoints).

    Raises:
        NoCredentialsError, PartialCredentialsError: When credentials are
            missing or incomplete.
    """
    endpoint = aws_config['endpoint_url']
    logging.info(f'Creating S3 client with endpoint: {endpoint}')
    try:
        client_config = boto3.session.Config(
            signature_version='s3v4',
            s3={'addressing_style': 'path'},
        )
        client = boto3.client(
            's3',
            endpoint_url=endpoint,
            aws_access_key_id=aws_config['aws_access_key_id'],
            aws_secret_access_key=aws_config['aws_secret_access_key'],
            region_name=aws_config['region_name'],
            config=client_config,
        )
        logging.info('S3 client created successfully')
        return client
    except (NoCredentialsError, PartialCredentialsError) as e:
        logging.error(f'Error creating S3 client: {e}')
        raise e
# Function to list the contents of an S3 bucket
def list_s3_bucket(s3_client, bucket_name):
    """
    Return every object entry in an S3 bucket, following pagination.

    Args:
        s3_client: boto3 S3 client.
        bucket_name (str): Name of the bucket to list.

    Returns:
        list: All 'Contents' entries accumulated across every result page.

    Raises:
        ClientError: If the listing request fails.
    """
    try:
        objects = []
        pages = s3_client.get_paginator('list_objects_v2').paginate(Bucket=bucket_name)
        for page in pages:
            # An empty page has no 'Contents' key at all
            objects.extend(page.get('Contents', []))
        logging.info(f"Retrieved {len(objects)} items from the bucket.")
        return objects
    except ClientError as e:
        logging.error(f'Error listing bucket contents: {e}')
        raise e
# Function to get file size from S3
def get_file_size(s3_client, bucket_name, file_key):
    """
    Return the size in bytes of an S3 object, or None when it cannot be
    determined.

    Args:
        s3_client: boto3 S3 client.
        bucket_name (str): Bucket holding the object.
        file_key (str): Object key to inspect.

    Returns:
        int | None: 'ContentLength' from a HEAD request, or None on any error.
    """
    try:
        head = s3_client.head_object(Bucket=bucket_name, Key=file_key)
        return head['ContentLength']
    except ClientError as e:
        logging.error(f"Failed to retrieve file size for {file_key}: {e}")
        return None  # caller treats None as "size unknown"
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
        return None
# Function to check if a file exists in S3
def check_file_exists_in_s3(s3, file_name, bucket_name):
    """
    Test whether an object exists in an S3 bucket via a HEAD request.

    Parameters:
    - s3 (boto3.client): The S3 client object.
    - file_name (str): The object key to check.
    - bucket_name (str): The bucket to look in.

    Returns:
    - bool: True if the object exists, False otherwise.

    Raises:
    - ClientError: For any S3 error other than a 404 (not found).
    """
    try:
        s3.head_object(Bucket=bucket_name, Key=file_name)
    except ClientError as e:
        # A 404 simply means "not there"; anything else is a real failure
        if e.response['Error']['Code'] != '404':
            logging.error(f'Error checking file {file_name}: {e}')
            raise e
        return False
    return True
# Function to upload a local file to S3
def upload_file_to_s3(s3_client, file_path, bucket_name, object_name=None):
    """
    Upload a local file to an S3 bucket.

    Args:
        s3_client: boto3 S3 client.
        file_path (str): Path of the local file to upload.
        bucket_name (str): Destination bucket.
        object_name (str, optional): Destination key; defaults to file_path.

    Raises:
        ClientError: If the upload fails.
    """
    # Default the destination key to the local path
    object_name = file_path if object_name is None else object_name
    try:
        s3_client.upload_file(file_path, bucket_name, object_name)
    except ClientError as e:
        logging.error(f'Error uploading file {file_path} to bucket {bucket_name}: {e}')
        raise e
    logging.info(f"File {file_path} uploaded to {bucket_name}/{object_name}")
# Function to download a file from S3
def download_file_from_s3(s3_client, bucket_name, object_name, file_path):
    """
    Download an object from an S3 bucket to a local path.

    Args:
        s3_client: boto3 S3 client.
        bucket_name (str): Source bucket.
        object_name (str): Key of the object to fetch.
        file_path (str): Local destination path.

    Raises:
        ClientError: If the download fails.
    """
    try:
        s3_client.download_file(bucket_name, object_name, file_path)
    except ClientError as e:
        logging.error(f'Error downloading file {object_name} from bucket {bucket_name}: {e}')
        raise e
    logging.info(f"File {object_name} downloaded from {bucket_name} to {file_path}")