import json  # for json.loads
import logging  # for logging
import os  # for os.path
import traceback  # for detailed error reports

import boto3  # for S3
import psycopg2  # for PostgreSQL
from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError  # for exceptions

# Import custom modules
import config
from db_utils import (  # for database operations
    add_file_record_and_relationship,
    check_inventory_in_db,
    check_objkey_in_file_db,
    get_db_connection,
    retrieve_digital_file_names,
)
from error_handler import notify_error
from file_utils import (  # for file operations
    check_related_files,
    extract_and_validate_file_info,
    retrieve_file_contents,
)


# Function to check the existence of related files and validate in PostgreSQL
def parse_s3_files(s3, s3_files, ach_variables, excluded_folders=None, s3_listing_cache=None):
    """
    Parse media files from an S3 listing and insert their records into PostgreSQL.

    Args:
        s3: The boto3 S3 client used for object lookups.
        s3_files (list): The list of S3 object keys to be processed.
        ach_variables (dict): Mutable working context, enriched per file with
            keys such as 'file_fullpath', 'inventory_code', 'objectKeys',
            'extension', 'media_disk_size', 'pdf_disk_size', etc.
        excluded_folders (list, optional): Key prefixes to skip entirely.
            Defaults to no exclusions (was a mutable ``[]`` default).
        s3_listing_cache (dict, optional): Mapping of S3 keys to listing objects
            used to avoid per-file head_object calls.

    Returns:
        tuple: (uploaded_files_count, warning_files_count, error_files_count)

    Raises:
        ValueError: If the database configuration is not loaded, or when
            ACH_SAFE_RUN=true turns a warning condition into an abort.
        Exception: Any other unexpected error (reported via notify_error first).
    """
    # Load the configuration from the .env file (only db config and bucket name are used here)
    _, db_config, _, bucket_name, _ = config.load_config()

    # Log ach_variables
    logging.info(f"ach_variables: {ach_variables}")

    # ---------------------------------------------------------------------
    # PHASE 3: PARSE & INSERT INTO DATABASE (DETAILS)
    #
    # 3.1) Filter out excluded prefixes and keep only files we care about.
    # 3.2) Validate each media file alongside its related sidecars (.json, .md5, .pdf).
    # 3.3) Cross-check the inventory code in the database and insert new records.
    # ---------------------------------------------------------------------
    logging.info("PHASE 3: Parse & insert - starting detailed file processing")

    conn = None
    cur = None
    try:
        # Ensure db_config is not None
        if db_config is None:
            raise ValueError("Database configuration is not loaded")

        conn = psycopg2.connect(**db_config)
        cur = conn.cursor()

        # Exclude files that start with any prefix in the excluded list
        excluded_prefix = excluded_folders or []
        filtered_files = [
            file for file in s3_files
            if not any(file.startswith(prefix) for prefix in excluded_prefix)
        ]
        logging.info(f"Array Length of filtered files: {len(filtered_files)}")

        # Counters
        error_files_count = 0
        warning_files_count = 0
        uploaded_files_count = 0
        total_files = len(filtered_files)

        for idx, file in enumerate(filtered_files, start=1):
            # Display progress to console only (not written to log files)
            print(f"--------------\n--- file {idx} of {total_files} ---\n--------------", flush=True)

            # Ensure we start each file with a clean transaction state.
            try:
                conn.rollback()
            except Exception as e:
                logging.error(f"Rollback failed before processing {file}: {e}")
                try:
                    cur.close()
                except Exception:
                    pass
                cur = conn.cursor()

            try:
                # Use a savepoint per file to allow rollback on individual failures
                # without aborting the full batch.
                cur.execute("SAVEPOINT file_save")
            except Exception as e:
                # If the transaction is aborted, log, reset the session and retry once.
                logging.error(f"Transaction aborted before processing {file}, retrying after reset: {e}")
                logging.error(traceback.format_exc())
                try:
                    conn.rollback()
                except Exception as rollback_err:
                    logging.error(f"Rollback failed while recovering from aborted transaction: {rollback_err}")
                try:
                    cur.close()
                except Exception:
                    pass
                cur = conn.cursor()
                # Retry savepoint once after recovery
                cur.execute("SAVEPOINT file_save")

            try:
                if file.endswith(('.mp4', '.mp3')):  # Check for both .mp4 and .mp3
                    logging.info("Processing file: %s in the bucket: %s", file, bucket_name)

                    # Skip files whose object key is already registered in the DB.
                    if check_objkey_in_file_db(cur, file):
                        warning_files_count += 1
                        if os.getenv('ACH_SAFE_RUN', 'true').lower() == 'true':
                            logging.error("ACH_SAFE_RUN=true: aborting Phase 3 due to warnings (file already exists in DB): %s", file)
                            raise ValueError("ACH_SAFE_RUN=true: aborting due to warnings in Phase 3")
                        # Rollback to savepoint to undo any partial changes for this file
                        cur.execute("ROLLBACK TO SAVEPOINT file_save")
                        continue

                    ach_variables['file_fullpath'] = file  # is the Object key
                    # Inventory code = first 12 chars of the bare file name.
                    ach_variables['inventory_code'] = os.path.splitext(os.path.basename(file))[0][:12]
                    logging.info(f"ach_variables['inventory_code'] {ach_variables['inventory_code']}: {file}")

                    ach_variables['objectKeys']['media'] = file
                    ach_variables['objectKeys']['pdf'] = f"{os.path.splitext(file)[0]}.pdf"
                    ach_variables['objectKeys']['pdf'] = ach_variables['objectKeys']['pdf'].replace('_H264', '')

                    # Conservative (archival) copy key: .mov for video, .wav for audio.
                    if file.endswith('.mp4'):
                        # _H264 removal is done later
                        ach_variables['objectKeys']['conservative_copy'] = f"{os.path.splitext(file)[0]}.mov"
                    elif file.endswith('.mp3'):
                        ach_variables['objectKeys']['conservative_copy'] = f"{os.path.splitext(file)[0]}.wav"
                    else:
                        # Defensive: unreachable given the endswith(('.mp4', '.mp3')) guard above.
                        logging.error(f"Unsupported file type: {file}")
                        error_files_count += 1
                        # Rollback to savepoint for this file
                        cur.execute("ROLLBACK TO SAVEPOINT file_save")
                        continue

                    # Extract the file extension
                    file_extension = os.path.splitext(file)[1]
                    ach_variables['extension'] = file_extension  # Store the file extension in ach_variables
                    logging.info(f"the file File extension: {file_extension}")

                    # Extract the file name with directory part (extension removed, path kept)
                    file_name_with_path = os.path.splitext(file)[0]
                    logging.info(f"File name with path: {file_name_with_path}")

                    # Extract the base name with the path removed
                    base_name = os.path.basename(file_name_with_path)
                    logging.info(f"Base name: {base_name}")

                    # Apply _H264 removal only for .mp4 files
                    if file.endswith('.mp4'):
                        logging.info(f"File is an mp4 file: {file}. remove _H264")
                        base_name = base_name.replace('_H264', '')
                        file_name_with_path = file_name_with_path.replace('_H264', '')
                        logging.info(f"Modified base name for mp4: {base_name}")
                        logging.info(f"Modified file name with path for mp4: {file_name_with_path}")

                    try:
                        # Retrieve and log the file size; check the listing cache first
                        # to avoid a head_object round-trip.
                        if s3_listing_cache and file in s3_listing_cache:
                            file_size = s3_listing_cache[file].get('Size')
                            logging.info(f"Retrieved file size from cache for: {file}")
                        else:
                            file_size = get_file_size(s3, bucket_name, file)

                        if file_size is not None:
                            ach_variables['media_disk_size'] = file_size
                            logging.info(f"The media file disk size is: {ach_variables['media_disk_size']}")
                        else:
                            logging.warning("Could not retrieve file size for %s.", file)
                            warning_files_count += 1
                            if os.getenv('ACH_SAFE_RUN', 'true').lower() == 'true':
                                logging.error("ACH_SAFE_RUN=true: aborting Phase 3 due to warnings (missing file size): %s", file)
                                raise ValueError("ACH_SAFE_RUN=true: aborting due to warnings in Phase 3")
                            continue  # Skip to the next file in the loop

                        logging.info("Start Validating files for %s...", base_name)

                        # Check whether the related sidecar files exist and retrieve the .pdf file size
                        try:
                            ach_variables['pdf_disk_size'] = check_related_files(
                                s3, file_name_with_path, file, bucket_name,
                                s3_listing_cache=s3_listing_cache,
                            )
                            logging.info(f"PDF disk size: {ach_variables['pdf_disk_size']}")
                        except FileNotFoundError as e:
                            # A required related file is missing in S3
                            logging.error(f"File not found error: {e}")
                            error_files_count += 1
                            continue  # Move on to the next file in the loop
                        except ValueError as e:
                            # Typically a zero-size related file
                            logging.error(f"Value error: {e} probably file size zero")
                            error_files_count += 1
                            continue  # Move on to the next file in the loop
                        except PermissionError as e:
                            logging.error(f"Permission error: {e}")
                            error_files_count += 1
                            continue  # Move on to the next file in the loop
                        except Exception as e:
                            logging.error(f"Validation step failed for {file}: {e}")
                            cur.execute("ROLLBACK TO SAVEPOINT file_save")
                            error_files_count += 1
                            continue

                        # Retrieve the file contents for related files: .md5, .json
                        try:
                            logging.info(f"Retrieving file contents for {file_name_with_path}...")
                            file_contents = retrieve_file_contents(s3, file_name_with_path)
                        except Exception as e:
                            logging.error(f"Error retrieving file contents for {file_name_with_path}: {e}")
                            cur.execute("ROLLBACK TO SAVEPOINT file_save")
                            error_files_count += 1
                            continue  # Move on to the next file in the loop

                        # Bail out if no contents could be retrieved
                        if file_contents is None:
                            logging.error(f"Error retrieving file contents for {file}.")
                            cur.execute("ROLLBACK TO SAVEPOINT file_save")
                            error_files_count += 1
                            continue  # Move on to the next file in the loop

                        # Ensure file_contents is a dictionary
                        if isinstance(file_contents, str):
                            file_contents = json.loads(file_contents)

                        # Extract and validate file information
                        (
                            ach_variables['custom_data_in'],
                            ach_variables['disk_size'],
                            ach_variables['conservative_copy_extension'],
                        ) = extract_and_validate_file_info(file_contents, file, ach_variables)
                        logging.info(f"Custom data extracted: {ach_variables['custom_data_in']}")
                        logging.info(f"Disk size extracted: {ach_variables['disk_size']}")
                        logging.info(f"Conservative copy extension extracted: {ach_variables['conservative_copy_extension']}")
                        logging.info(f"File {file} file validation completed")
                    except Exception as e:
                        logging.error(f"Error processing file {file}: {e}")
                        error_files_count += 1
                        continue  # Move on to the next file in the loop

                    # No need to truncate base_name at this point; check_inventory_in_db handles it.
                    logging.info(f"Checking database for {base_name}...")
                    try:
                        # Check the inventory code in the database
                        result, truncated_base_name = check_inventory_in_db(s3, cur, base_name)
                        logging.info(f"base name {base_name}, truncated_base_name: {truncated_base_name}")

                        if result:
                            logging.info(f"Inventory code {base_name} found in the database.")
                            # Only insert when no digital file record exists yet
                            if retrieve_digital_file_names(s3, cur, base_name, ach_variables['objectKeys']['media']):
                                # Add the file record and its relationship to the support record
                                add_file_record_and_relationship(s3, cur, base_name, ach_variables)
                            else:
                                logging.warning(f"File record already exists for {base_name}.")
                                cur.execute("ROLLBACK TO SAVEPOINT file_save")
                                warning_files_count += 1
                                continue
                        else:
                            logging.error(f"Inventory code {base_name} not found in the database.")
                            cur.execute("ROLLBACK TO SAVEPOINT file_save")
                            error_files_count += 1
                            continue
                    except Exception as e:
                        logging.error(f"DB operation failed for {base_name}: {e}")
                        cur.execute("ROLLBACK TO SAVEPOINT file_save")
                        error_files_count += 1
                        continue

                    # Commit the changes to the database only if everything is okay
                    logging.info(f"committing to database {base_name}...")
                    conn.commit()
                    uploaded_files_count += 1
            except Exception as e:
                # Roll back the changes done for this file only and continue processing others
                logging.error(f"Error processing {file}: {e}. Rolling back this file's changes.")
                logging.error(traceback.format_exc())
                try:
                    cur.execute("ROLLBACK TO SAVEPOINT file_save")
                except Exception as rollback_err:
                    logging.error(f"Failed to rollback savepoint for {file}: {rollback_err}")
                error_files_count += 1
                continue
            finally:
                # Release the savepoint so it doesn't linger in the session
                try:
                    cur.execute("RELEASE SAVEPOINT file_save")
                except Exception:
                    # Ignore release errors; rollback already cleaned up state if needed
                    pass
    except ValueError as e:
        # Handle specific validation errors
        logging.error(f"Validation error: {e}")
        raise  # Re-raise to the calling function (bare raise keeps the traceback)
    except Exception as e:
        # Handle any other unexpected errors
        notify_error("FATAL ERROR in Phase 3 process", e)
        raise  # Re-raise to the calling function
    finally:
        # Always release DB resources, even when an exception propagates
        # (the original only closed them on the success path).
        if cur is not None:
            try:
                cur.close()
            except Exception:
                pass
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass

    # Return the counters for the processed batch
    return uploaded_files_count, warning_files_count, error_files_count


# Function to create an S3 client
def create_s3_client(aws_config):
    """
    Create a boto3 S3 client from an aws_config mapping.

    Expects keys: 'endpoint_url', 'aws_access_key_id',
    'aws_secret_access_key', 'region_name'.

    Raises:
        NoCredentialsError, PartialCredentialsError: On missing credentials.
    """
    logging.info(f'Creating S3 client with endpoint: {aws_config["endpoint_url"]}')
    try:
        s3 = boto3.client(
            's3',
            endpoint_url=aws_config['endpoint_url'],
            aws_access_key_id=aws_config['aws_access_key_id'],
            aws_secret_access_key=aws_config['aws_secret_access_key'],
            region_name=aws_config['region_name'],
            config=boto3.session.Config(
                signature_version='s3v4',
                s3={'addressing_style': 'path'}
            )
        )
        logging.info('S3 client created successfully')
        return s3
    except (NoCredentialsError, PartialCredentialsError) as e:
        logging.error(f'Error creating S3 client: {e}')
        raise


# Function to list the contents of an S3 bucket
def list_s3_bucket(s3_client, bucket_name):
    """
    Lists S3 bucket contents with optional local JSON caching.

    Uses ACH_CACHE_S3_LIST from .env to decide if it should cache/read
    from 's3_cache.json'.
    """
    cache_enabled = os.getenv('ACH_CACHE_S3_LIST', 'false').lower() == 'true'
    cache_file = 's3_cache.json'

    if cache_enabled and os.path.exists(cache_file):
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                cached_data = json.load(f)
            logging.info(f"Loaded {len(cached_data)} items from local cache: {cache_file}")
            return cached_data
        except Exception as e:
            logging.warning(f"Failed to read S3 cache file: {e}. Falling back to S3 listing.")

    try:
        logging.info(f"Listing all objects in bucket: {bucket_name}...")
        paginator = s3_client.get_paginator('list_objects_v2')
        bucket_contents = []
        for page in paginator.paginate(Bucket=bucket_name):
            for obj in page.get('Contents', []):
                # Normalize dates for JSON compatibility
                if 'LastModified' in obj:
                    obj['LastModified'] = obj['LastModified'].isoformat()
                bucket_contents.append(obj)
        logging.info(f"Retrieved {len(bucket_contents)} items from the bucket.")

        # Save to cache if enabled
        if cache_enabled:
            try:
                with open(cache_file, 'w', encoding='utf-8') as f:
                    json.dump(bucket_contents, f)
                logging.info(f"S3 bucket listing saved to local cache: {cache_file}")
            except Exception as e:
                logging.warning(f"Failed to write S3 cache file: {e}")
        return bucket_contents
    except ClientError as e:
        logging.error(f'Error listing bucket contents: {e}')
        raise


# Function to get file size from S3
def get_file_size(s3_client, bucket_name, file_key):
    """
    Return the size in bytes of an S3 object via head_object,
    or None if the size could not be retrieved.
    """
    try:
        response = s3_client.head_object(Bucket=bucket_name, Key=file_key)
        return response['ContentLength']
    except ClientError as e:
        logging.error(f"Failed to retrieve file size for {file_key}: {e}")
        return None  # fallback value; callers treat None as "size unknown"
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
        return None


# Function to check if a file exists in S3
def check_file_exists_in_s3(s3, file_name, bucket_name):
    """
    Checks if a file exists in an S3 bucket.

    Parameters:
    - s3 (boto3.client): The S3 client object.
    - file_name (str): The name of the file to check.
    - bucket_name (str): The bucket to look in.

    Returns:
    - bool: True if the file exists, False otherwise.

    Raises:
    - ClientError: If there is an error other than 404 while checking.
    """
    try:
        s3.head_object(Bucket=bucket_name, Key=file_name)
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            return False
        logging.error(f'Error checking file {file_name}: {e}')
        raise


# Function to upload a file to S3
def upload_file_to_s3(s3_client, file_path, bucket_name, object_name=None):
    """
    Upload a local file to S3. When object_name is omitted,
    the local file path is used as the object key.
    """
    if object_name is None:
        object_name = file_path
    try:
        s3_client.upload_file(file_path, bucket_name, object_name)
        logging.info(f"File {file_path} uploaded to {bucket_name}/{object_name}")
    except ClientError as e:
        logging.error(f'Error uploading file {file_path} to bucket {bucket_name}: {e}')
        raise


# Function to download a file from S3
def download_file_from_s3(s3_client, bucket_name, object_name, file_path):
    """Download an S3 object to a local file path."""
    try:
        s3_client.download_file(bucket_name, object_name, file_path)
        logging.info(f"File {object_name} downloaded from {bucket_name} to {file_path}")
    except ClientError as e:
        logging.error(f'Error downloading file {object_name} from bucket {bucket_name}: {e}')
        raise