"""Helpers for listing and parsing S3 bucket contents and recording media file metadata in PostgreSQL."""
import boto3 # for S3
|
|
from botocore.exceptions import NoCredentialsError, PartialCredentialsError, ClientError # for exceptions
|
|
import logging # for logging
|
|
import json # for json.loads
|
|
import os # for os.path
|
|
import psycopg2 # for PostgreSQL
|
|
|
|
# Import custom modules
|
|
from file_utils import retrieve_file_contents, check_related_files, extract_and_validate_file_info # for file operations
|
|
from email_utils import handle_error # for error handling depecradted?
|
|
from db_utils import get_db_connection, check_inventory_in_db, check_objkey_in_file_db, add_file_record_and_relationship, retrieve_digital_file_names # for database operations
|
|
|
|
import config
|
|
|
|
# Function to check the existence of related files and validate in PostgreSQL
|
|
def parse_s3_files(s3, s3_files, ach_variables, excluded_folders=None):
    """
    Parse media files (.mp4/.mp3) from S3 and register them in PostgreSQL.

    For every media object key in ``s3_files`` that is not already recorded,
    this derives the related object keys (pdf, conservative copy), retrieves
    file sizes and sidecar contents, validates them, and inserts a file
    record linked to its inventory entry. Each successfully processed file
    is committed individually.

    Args:
        s3 (S3): The S3 object for accessing S3 services.
        s3_files (list): The list of S3 object keys to be processed.
        ach_variables (dict): Mutable working context, populated per file
            with keys such as 'file_fullpath', 'inventory_code',
            'objectKeys', 'extension' and the various disk sizes.
        excluded_folders (list, optional): Key prefixes to skip entirely.
            Defaults to no exclusions.

    Returns:
        tuple: (uploaded_files_count, warning_files_count, error_files_count)

    Raises:
        ValueError: If the database configuration is not loaded, or a
            validation error escapes the per-file handling.
        Exception: If any other unexpected exception occurs (logged, then
            re-raised).
    """
    # BUGFIX: a mutable default argument ([]) is shared across calls; use a
    # None sentinel and build a fresh list instead. Behavior is unchanged.
    if excluded_folders is None:
        excluded_folders = []

    # Load the configuration from the .env file (only the db config and the
    # bucket name are needed here).
    _, db_config, _, bucket_name, _ = config.load_config()

    # log ach_variables
    logging.info(f"ach_variables: {ach_variables}")
    logging.info(f"Starting to parse S3 files from bucket {bucket_name}...")

    conn = None
    cur = None
    try:
        logging.info(f"Starting to parse S3 files from bucket {bucket_name}...")

        # Ensure db_config is not None before attempting a connection.
        if db_config is None:
            raise ValueError("Database configuration is not loaded")

        conn = psycopg2.connect(**db_config)
        cur = conn.cursor()

        # Exclude files that start with any prefix in the excluded list.
        excluded_prefix = excluded_folders
        filtered_files = [
            file for file in s3_files
            if not any(file.startswith(prefix) for prefix in excluded_prefix)
        ]
        logging.info(f"Array Length of filtered files: {len(filtered_files)}")

        # Outcome counters returned to the caller.
        error_files_count = 0
        warning_files_count = 0
        uploaded_files_count = 0

        for file in filtered_files:
            # Only .mp4 and .mp3 media files are processed; others are
            # silently skipped (same behavior as the original guard).
            if not file.endswith(('.mp4', '.mp3')):
                continue

            logging.info("Processing file: %s in the bucket: %s", file, bucket_name)

            # Skip files whose object key already exists in the database.
            if check_objkey_in_file_db(cur, file):
                warning_files_count += 1
                continue

            ach_variables['file_fullpath'] = file  # the S3 object key
            # Inventory code = first 12 characters of the bare file name.
            ach_variables['inventory_code'] = os.path.splitext(os.path.basename(file))[0][:12]
            logging.info(f"ach_variables['inventory_code'] {ach_variables['inventory_code']}: {file}")

            # Derive the related object keys from the media key.
            ach_variables['objectKeys']['media'] = file
            ach_variables['objectKeys']['pdf'] = f"{os.path.splitext(file)[0]}.pdf"
            ach_variables['objectKeys']['pdf'] = ach_variables['objectKeys']['pdf'].replace('_H264', '')

            if file.endswith('.mp4'):
                # _H264 removal for the conservative copy is done later.
                ach_variables['objectKeys']['conservative_copy'] = f"{os.path.splitext(file)[0]}.mov"
            elif file.endswith('.mp3'):
                ach_variables['objectKeys']['conservative_copy'] = f"{os.path.splitext(file)[0]}.wav"
            else:
                # Defensive branch (unreachable given the media-extension
                # guard above).
                # BUGFIX: was logging.KeyError(...), which is not a logging
                # API and would raise AttributeError at runtime.
                logging.error(f"Unsupported file type: {file}")
                error_files_count += 1
                continue

            # Extract and record the file extension.
            file_extension = os.path.splitext(file)[1]
            ach_variables['extension'] = file_extension
            logging.info(f"the file File extension: {file_extension}")

            # File name with the directory part but without the extension.
            file_name_with_path = os.path.splitext(file)[0]
            logging.info(f"File name with path: {file_name_with_path}")

            # Base name with the path removed.
            base_name = os.path.basename(file_name_with_path)
            logging.info(f"Base name: {base_name}")

            # Apply _H264 removal only for .mp4 files.
            if file.endswith('.mp4'):
                logging.info(f"File is an mp4 file: {file}. remove _H264")
                base_name = base_name.replace('_H264', '')
                file_name_with_path = file_name_with_path.replace('_H264', '')
                logging.info(f"Modified base name for mp4: {base_name}")
                logging.info(f"Modified file name with path for mp4: {file_name_with_path}")

            try:
                # Retrieve and log the media file size.
                file_size = get_file_size(s3, bucket_name, file)
                if file_size is not None:
                    ach_variables['media_disk_size'] = file_size
                    logging.info(f"The media file disk size is: {ach_variables['media_disk_size']}")
                else:
                    logging.warning("Could not retrieve file size for %s.", file)
                    warning_files_count += 1
                    continue  # Skip to the next file in the loop

                logging.info("Start Validating files for %s...", base_name)

                # Check that related files exist and retrieve the .pdf size.
                try:
                    ach_variables['pdf_disk_size'] = check_related_files(s3, file_name_with_path, file, bucket_name)
                    logging.info(f"PDF disk size: {ach_variables['pdf_disk_size']}")
                except FileNotFoundError as e:
                    logging.error(f"File not found error: {e}")
                    error_files_count += 1
                    continue
                except ValueError as e:
                    logging.error(f"Value error: {e} probabli filesize zero")
                    error_files_count += 1
                    continue
                except PermissionError as e:
                    logging.error(f"Permission error: {e}")
                    error_files_count += 1
                    continue
                except Exception as e:
                    # NOTE(review): this branch deliberately falls through
                    # (no `continue`) on unexpected errors, matching the
                    # original — confirm that is intended.
                    logging.error(f"An error occurred: {e}")

                # Retrieve the sidecar file contents (.md5, .json).
                try:
                    logging.info(f"Retrieving file contents for {file_name_with_path}...")
                    file_contents = retrieve_file_contents(s3, f"{file_name_with_path}")
                except Exception as e:
                    logging.error(f"Error retrieving file contents for {file_name_with_path}: {e}")
                    file_contents = None
                    error_files_count += 1
                    continue

                # Bail out if nothing could be retrieved.
                if file_contents is None:
                    logging.error(f"Error retrieving file contents for {file}.")
                    error_files_count += 1
                    continue

                # Ensure file_contents is a dictionary.
                if isinstance(file_contents, str):
                    file_contents = json.loads(file_contents)

                # Extract and validate file information.
                (ach_variables['custom_data_in'],
                 ach_variables['disk_size'],
                 ach_variables['conservative_copy_extension']) = extract_and_validate_file_info(
                    file_contents, file, ach_variables)
                logging.info(f"Custom data extracted: {ach_variables['custom_data_in']}")
                logging.info(f"Disk size extracted: {ach_variables['disk_size']}")
                logging.info(f"Conservative copy extension extracted: {ach_variables['conservative_copy_extension']}")
                logging.info(f"File {file} file validation completed")
            except Exception as e:
                logging.error(f"Error processing file {file}: {e}")
                error_files_count += 1
                continue

            # Check if the base name exists in the database.
            logging.info(f"Checking database for {base_name}...")
            try:
                result, truncated_base_name = check_inventory_in_db(s3, cur, base_name)
                logging.info(f"base name {base_name}, truncated_base_name: {truncated_base_name}")
                if result:
                    logging.info(f"Inventory code {base_name} found in the database.")
                    if retrieve_digital_file_names(s3, cur, base_name, ach_variables['objectKeys']['media']):
                        # Add the file record and its relationship to the
                        # support record.
                        add_file_record_and_relationship(s3, cur, base_name, ach_variables)
                    else:
                        logging.warning(f"File record already exists for {base_name}.")
                        warning_files_count += 1
                        continue
                else:
                    logging.error(f"Inventory code {base_name} not found in the database.")
                    error_files_count += 1
                    continue
            except ValueError as e:
                logging.error(f"An error occurred: {e}")
                error_files_count += 1
                continue

            # Commit this file's changes to the database.
            logging.info(f"commit to databse {base_name}...")
            conn.commit()
            uploaded_files_count += 1

    except ValueError as e:
        # Handle specific validation errors.
        logging.error(f"Validation error: {e}")
        raise e  # Raise the exception to the calling function
    except Exception as e:
        # Handle any other unexpected errors.
        logging.error(f"Unexpected error: {e}")
        raise e  # Raise the exception to the calling function
    finally:
        # BUGFIX: the cursor and connection were leaked on every error
        # path; always release them here.
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()

    # Return the outcome counters.
    return uploaded_files_count, warning_files_count, error_files_count
|
|
|
|
# Function to create an S3 client
|
|
def create_s3_client(aws_config):
    """Create and return a boto3 S3 client for the configured endpoint.

    Uses SigV4 signing and path-style addressing. Re-raises credential
    errors after logging them.

    Parameters:
    - aws_config (dict): must contain 'endpoint_url', 'aws_access_key_id',
      'aws_secret_access_key' and 'region_name'.

    Returns:
    - boto3 S3 client.

    Raises:
    - NoCredentialsError / PartialCredentialsError: when credentials are
      missing or incomplete.
    """
    logging.info(f'Creating S3 client with endpoint: {aws_config["endpoint_url"]}')
    try:
        client_kwargs = {
            'endpoint_url': aws_config['endpoint_url'],
            'aws_access_key_id': aws_config['aws_access_key_id'],
            'aws_secret_access_key': aws_config['aws_secret_access_key'],
            'region_name': aws_config['region_name'],
            # SigV4 + path-style addressing for non-AWS/custom endpoints.
            'config': boto3.session.Config(
                signature_version='s3v4',
                s3={'addressing_style': 'path'},
            ),
        }
        client = boto3.client('s3', **client_kwargs)
        logging.info('S3 client created successfully')
        return client
    except (NoCredentialsError, PartialCredentialsError) as exc:
        logging.error(f'Error creating S3 client: {exc}')
        raise exc
|
|
|
|
# Function to list the contents of an S3 bucket
|
|
def list_s3_bucket(s3_client, bucket_name):
    """Return every object record in *bucket_name*.

    Pages through list_objects_v2 and concatenates each page's 'Contents'
    entries. Client errors are logged and re-raised.
    """
    try:
        pages = s3_client.get_paginator('list_objects_v2').paginate(Bucket=bucket_name)
        # Empty pages carry no 'Contents' key; default to nothing.
        all_objects = []
        for page in pages:
            all_objects.extend(page.get('Contents', []))
        logging.info(f"Retrieved {len(all_objects)} items from the bucket.")
        return all_objects
    except ClientError as exc:
        logging.error(f'Error listing bucket contents: {exc}')
        raise exc
|
|
|
|
# Function to get file size from S3
|
|
def get_file_size(s3_client, bucket_name, file_key):
|
|
try:
|
|
response = s3_client.head_object(Bucket=bucket_name, Key=file_key)
|
|
return response['ContentLength']
|
|
except ClientError as e:
|
|
logging.error(f"Failed to retrieve file size for {file_key}: {e}")
|
|
return None # or an appropriate fallback value
|
|
except Exception as e:
|
|
logging.error(f"An unexpected error occurred: {e}")
|
|
return None
|
|
|
|
# Function to check if a file exists in S3
|
|
def check_file_exists_in_s3(s3, file_name, bucket_name):
    """
    Check whether a file exists in an S3 bucket via a HEAD request.

    Parameters:
    - s3 (boto3.client): The S3 client object.
    - file_name (str): The object key to check.
    - bucket_name (str): The bucket to look in.

    Returns:
    - bool: True if the object exists, False on a 404 response.

    Raises:
    - ClientError: for any non-404 error while checking.
    """
    try:
        s3.head_object(Bucket=bucket_name, Key=file_name)
    except ClientError as exc:
        # A 404 simply means "not there"; anything else is a real error.
        if exc.response['Error']['Code'] == '404':
            return False
        logging.error(f'Error checking file {file_name}: {exc}')
        raise exc
    return True
|
|
|
|
# Function to retrieve file contents from S3
|
|
def upload_file_to_s3(s3_client, file_path, bucket_name, object_name=None):
    """Upload a local file to S3.

    When *object_name* is omitted, the local path is reused as the object
    key. Client errors are logged and re-raised.
    """
    object_name = file_path if object_name is None else object_name
    try:
        s3_client.upload_file(file_path, bucket_name, object_name)
        logging.info(f"File {file_path} uploaded to {bucket_name}/{object_name}")
    except ClientError as exc:
        logging.error(f'Error uploading file {file_path} to bucket {bucket_name}: {exc}')
        raise exc
|
|
|
|
# Function to download a file from S3
|
|
def download_file_from_s3(s3_client, bucket_name, object_name, file_path):
    """Download an S3 object to a local path.

    Client errors are logged and re-raised to the caller.
    """
    try:
        s3_client.download_file(bucket_name, object_name, file_path)
    except ClientError as exc:
        logging.error(f'Error downloading file {object_name} from bucket {bucket_name}: {exc}')
        raise exc
    logging.info(f"File {object_name} downloaded from {bucket_name} to {file_path}")